Merge remote-tracking branch 'origin/master' into raja_1833

This commit is contained in:
boks1971
2023-08-08 09:04:29 +05:30
8 changed files with 193 additions and 73 deletions
+9 -5
View File
@@ -149,8 +149,10 @@ type CongestionControlChannelObserverConfig struct {
}
type CongestionControlConfig struct {
Enabled bool `yaml:"enabled"`
AllowPause bool `yaml:"allow_pause"`
Enabled bool `yaml:"enabled,omitempty"`
AllowPause bool `yaml:"allow_pause,omitempty"`
NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"`
ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"`
UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"`
ProbeMode CongestionControlProbeMode `yaml:"padding_mode,omitempty"`
MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"`
@@ -316,9 +318,11 @@ var DefaultConfig = Config{
HighQuality: time.Second,
},
CongestionControl: CongestionControlConfig{
Enabled: true,
AllowPause: false,
ProbeMode: CongestionControlProbeModePadding,
Enabled: true,
AllowPause: false,
NackRatioAttenuator: 0.4,
ExpectedUsageThreshold: 0.95,
ProbeMode: CongestionControlProbeModePadding,
ProbeConfig: CongestionControlProbeConfig{
BaseInterval: 3 * time.Second,
BackoffFactor: 1.5,
+1
View File
@@ -24,6 +24,7 @@ var (
ErrIdentityEmpty = psrpc.NewErrorf(psrpc.InvalidArgument, "identity cannot be empty")
ErrIngressNotConnected = psrpc.NewErrorf(psrpc.Internal, "ingress not connected (redis required)")
ErrIngressNotFound = psrpc.NewErrorf(psrpc.NotFound, "ingress does not exist")
ErrIngressNonReusable = psrpc.NewErrorf(psrpc.InvalidArgument, "ingress is not reusable and cannot be modified")
ErrMetadataExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "metadata size exceeds limits")
ErrOperationFailed = psrpc.NewErrorf(psrpc.Internal, "operation cannot be completed")
ErrParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "participant does not exist")
+97 -16
View File
@@ -16,6 +16,8 @@ package service
import (
"context"
"fmt"
"net/url"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/telemetry"
@@ -27,6 +29,10 @@ import (
"github.com/livekit/psrpc"
)
// IngressLauncher starts a pull (URL input) ingress session for the given
// ingress info. IngressService implements it by issuing a StartIngress RPC;
// tests or alternate deployments may supply a different implementation via
// NewIngressServiceWithIngressLauncher.
type IngressLauncher interface {
	// LaunchPullIngress requests that a pull ingress described by info be
	// started. NOTE(review): presumably returns the info with updated state
	// from the ingress worker — verify against the RPC implementation.
	LaunchPullIngress(ctx context.Context, info *livekit.IngressInfo) (*livekit.IngressInfo, error)
}
type IngressService struct {
conf *config.IngressConfig
nodeID livekit.NodeID
@@ -35,6 +41,30 @@ type IngressService struct {
store IngressStore
roomService livekit.RoomService
telemetry telemetry.TelemetryService
launcher IngressLauncher
}
// NewIngressServiceWithIngressLauncher creates an IngressService that
// delegates the startup of pull (URL input) ingresses to the provided
// IngressLauncher. Pass a nil launcher only if pull ingress creation will
// never be attempted through the returned service.
func NewIngressServiceWithIngressLauncher(
	conf *config.IngressConfig,
	nodeID livekit.NodeID,
	bus psrpc.MessageBus,
	psrpcClient rpc.IngressClient,
	store IngressStore,
	rs livekit.RoomService,
	ts telemetry.TelemetryService,
	launcher IngressLauncher,
) *IngressService {
	svc := &IngressService{
		conf:        conf,
		nodeID:      nodeID,
		bus:         bus,
		psrpcClient: psrpcClient,
		store:       store,
		roomService: rs,
		telemetry:   ts,
		launcher:    launcher,
	}
	return svc
}
func NewIngressService(
@@ -46,16 +76,11 @@ func NewIngressService(
rs livekit.RoomService,
ts telemetry.TelemetryService,
) *IngressService {
s := NewIngressServiceWithIngressLauncher(conf, nodeID, bus, psrpcClient, store, rs, ts, nil)
return &IngressService{
conf: conf,
nodeID: nodeID,
bus: bus,
psrpcClient: psrpcClient,
store: store,
roomService: rs,
telemetry: ts,
}
s.launcher = s
return s
}
func (s *IngressService) CreateIngress(ctx context.Context, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) {
@@ -70,17 +95,18 @@ func (s *IngressService) CreateIngress(ctx context.Context, req *livekit.CreateI
AppendLogFields(ctx, fields...)
}()
var urlPrefix string
var url string
switch req.InputType {
case livekit.IngressInput_RTMP_INPUT:
urlPrefix = s.conf.RTMPBaseURL
url = s.conf.RTMPBaseURL
case livekit.IngressInput_WHIP_INPUT:
urlPrefix = s.conf.WHIPBaseURL
url = s.conf.WHIPBaseURL
case livekit.IngressInput_URL_INPUT:
default:
return nil, ingress.ErrInvalidIngressType
}
ig, err := s.CreateIngressWithUrlPrefix(ctx, urlPrefix, req)
ig, err := s.CreateIngressWithUrl(ctx, url, req)
if err != nil {
return nil, err
}
@@ -89,7 +115,7 @@ func (s *IngressService) CreateIngress(ctx context.Context, req *livekit.CreateI
return ig, nil
}
func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPrefix string, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) {
func (s *IngressService) CreateIngressWithUrl(ctx context.Context, urlStr string, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) {
err := EnsureIngressAdminPermission(ctx)
if err != nil {
return nil, twirpAuthError(err)
@@ -98,13 +124,28 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref
return nil, ErrIngressNotConnected
}
if req.InputType == livekit.IngressInput_URL_INPUT {
if req.Url == "" {
return nil, ingress.ErrInvalidIngress("missing URL parameter")
}
urlObj, err := url.Parse(req.Url)
if err != nil {
return nil, psrpc.NewError(psrpc.InvalidArgument, err)
}
if urlObj.Scheme != "http" && urlObj.Scheme != "https" {
return nil, ingress.ErrInvalidIngress(fmt.Sprintf("invalid url scheme %s", urlObj.Scheme))
}
// Marshal the URL again for sanitization
urlStr = urlObj.String()
}
sk := utils.NewGuid("")
info := &livekit.IngressInfo{
IngressId: utils.NewGuid(utils.IngressPrefix),
Name: req.Name,
StreamKey: sk,
Url: urlPrefix,
Url: urlStr,
InputType: req.InputType,
Audio: req.Audio,
Video: req.Video,
@@ -112,14 +153,41 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref
RoomName: req.RoomName,
ParticipantIdentity: req.ParticipantIdentity,
ParticipantName: req.ParticipantName,
Reusable: req.InputType == livekit.IngressInput_RTMP_INPUT,
State: &livekit.IngressState{},
}
switch req.InputType {
case livekit.IngressInput_RTMP_INPUT,
livekit.IngressInput_WHIP_INPUT:
info.Reusable = true
if err := ingress.ValidateForSerialization(info); err != nil {
return nil, err
}
case livekit.IngressInput_URL_INPUT:
if err := ingress.Validate(info); err != nil {
return nil, err
}
default:
return nil, ingress.ErrInvalidIngressType
}
if err := ingress.ValidateForSerialization(info); err != nil {
return nil, err
}
if req.InputType == livekit.IngressInput_URL_INPUT {
retInfo, err := s.launcher.LaunchPullIngress(ctx, info)
if retInfo != nil {
info = retInfo
} else {
info.State.Status = livekit.IngressState_ENDPOINT_ERROR
info.State.Error = err.Error()
}
if err != nil {
return info, err
}
}
if err = s.store.StoreIngress(ctx, info); err != nil {
logger.Errorw("could not write ingress info", err)
return nil, err
@@ -129,6 +197,14 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref
return info, nil
}
// LaunchPullIngress implements IngressLauncher by asking an ingress worker,
// via the psrpc client, to start the pull ingress described by info.
func (s *IngressService) LaunchPullIngress(ctx context.Context, info *livekit.IngressInfo) (*livekit.IngressInfo, error) {
	return s.psrpcClient.StartIngress(ctx, &rpc.StartIngressRequest{
		Info: info,
	})
}
func updateInfoUsingRequest(req *livekit.UpdateIngressRequest, info *livekit.IngressInfo) error {
if req.Name != "" {
info.Name = req.Name
@@ -183,6 +259,11 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI
return nil, err
}
if !info.Reusable {
logger.Infow("ingress update attempted on non reusable ingress", "ingressID", info.IngressId)
return info, ErrIngressNonReusable
}
switch info.State.Status {
case livekit.IngressState_ENDPOINT_ERROR:
info.State.Status = livekit.IngressState_ENDPOINT_INACTIVE
@@ -97,8 +97,15 @@ func (d *DependencyDescriptor) MarshalSizeWithActiveChains(activeChains uint32)
}
func (d *DependencyDescriptor) String() string {
return fmt.Sprintf("DependencyDescriptor{FirstPacketInFrame: %v, LastPacketInFrame: %v, FrameNumber: %v, FrameDependencies: %+v, Resolution: %+v, ActiveDecodeTargetsBitmask: %v, AttachedStructure: %v}",
d.FirstPacketInFrame, d.LastPacketInFrame, d.FrameNumber, *d.FrameDependencies, *d.Resolution, formatBitmask(d.ActiveDecodeTargetsBitmask), d.AttachedStructure)
resolution, dependencies := "-", "-"
if d.Resolution != nil {
resolution = fmt.Sprintf("%+v", *d.Resolution)
}
if d.FrameDependencies != nil {
dependencies = fmt.Sprintf("%+v", *d.FrameDependencies)
}
return fmt.Sprintf("DependencyDescriptor{FirstPacketInFrame: %v, LastPacketInFrame: %v, FrameNumber: %v, FrameDependencies: %s, Resolution: %s, ActiveDecodeTargetsBitmask: %v, AttachedStructure: %v}",
d.FirstPacketInFrame, d.LastPacketInFrame, d.FrameNumber, dependencies, resolution, formatBitmask(d.ActiveDecodeTargetsBitmask), d.AttachedStructure)
}
// ------------------------------------------------------------------------------
+18 -8
View File
@@ -36,8 +36,6 @@ import (
const (
ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps
NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate
PriorityMin = uint8(1)
PriorityMax = uint8(255)
PriorityDefaultScreenshare = PriorityMax
@@ -785,30 +783,42 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() {
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
switch reason {
case ChannelCongestionReasonLoss:
estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - NackRatioAttenuator*s.channelObserver.GetNackRatio()))
if estimateToCommit > s.lastReceivedEstimate {
estimateToCommit = s.lastReceivedEstimate
}
estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - s.params.Config.NackRatioAttenuator*s.channelObserver.GetNackRatio()))
default:
estimateToCommit = s.lastReceivedEstimate
}
if estimateToCommit > s.lastReceivedEstimate {
estimateToCommit = s.lastReceivedEstimate
}
commitThreshold := int64(s.params.Config.ExpectedUsageThreshold * float64(expectedBandwidthUsage))
action := "applying"
if estimateToCommit > commitThreshold {
action = "skipping"
}
s.params.Logger.Infow(
"stream allocator: channel congestion detected, updating channel capacity",
fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity update", action),
"reason", reason,
"old(bps)", s.committedChannelCapacity,
"new(bps)", estimateToCommit,
"lastReceived(bps)", s.lastReceivedEstimate,
"expectedUsage(bps)", expectedBandwidthUsage,
"commitThreshold(bps)", commitThreshold,
"channel", s.channelObserver.ToString(),
)
s.params.Logger.Infow(
"stream allocator: channel congestion detected, updating channel capacity: experimental",
fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity: experimental", action),
"rateHistory", s.rateMonitor.GetHistory(),
"expectedQueuing", s.rateMonitor.GetQueuingGuess(),
"nackHistory", s.channelObserver.GetNackHistory(),
"trackHistory", s.getTracksHistory(),
)
if estimateToCommit > commitThreshold {
// estimate to commit is either higher or within tolerance of expected usage, skip committing and re-allocating
return
}
s.committedChannelCapacity = estimateToCommit
// reset to get new set of samples for next trend
+27 -19
View File
@@ -51,6 +51,23 @@ type trendDetectorSample struct {
at time.Time
}
// trendDetectorSampleListToString renders samples as
// "[value(msSinceFirst), value(msSinceFirst), ...]", where each offset is the
// sample time relative to the first sample, in milliseconds.
// An empty slice renders as the empty string.
func trendDetectorSampleListToString(samples []trendDetectorSample) string {
	if len(samples) == 0 {
		return ""
	}

	base := samples[0].at
	out := "["
	for idx, s := range samples {
		if idx != 0 {
			out += ", "
		}
		out += fmt.Sprintf("%d(%d)", s.value, s.at.Sub(base).Milliseconds())
	}
	return out + "]"
}
// ------------------------------------------------
type TrendDetectorParams struct {
@@ -145,28 +162,16 @@ func (t *TrendDetector) HasEnoughSamples() bool {
func (t *TrendDetector) ToString() string {
now := time.Now()
elapsed := now.Sub(t.startTime).Seconds()
samplesStr := ""
if len(t.samples) > 0 {
firstTime := t.samples[0].at
samplesStr += "["
for i, sample := range t.samples {
suffix := ", "
if i == len(t.samples)-1 {
suffix = ""
}
samplesStr += fmt.Sprintf("%d(%d)%s", sample.value, sample.at.Sub(firstTime).Milliseconds(), suffix)
}
samplesStr += "]"
}
return fmt.Sprintf("n: %s, t: %+v|%+v|%.2fs, v: %d|%d|%d|%s|%.2f",
t.params.Name,
t.startTime.Format(time.UnixDate), now.Format(time.UnixDate), elapsed,
t.numSamples, t.lowestValue, t.highestValue, samplesStr, kendallsTau(t.samples),
t.numSamples, t.lowestValue, t.highestValue, trendDetectorSampleListToString(t.samples), kendallsTau(t.samples),
)
}
func (t *TrendDetector) prune() {
// prune based on a few rules
// 1. If there are more than required samples
if len(t.samples) > t.params.RequiredSamples {
t.samples = t.samples[len(t.samples)-t.params.RequiredSamples:]
@@ -187,18 +192,21 @@ func (t *TrendDetector) prune() {
}
}
// 3. If all sample values are same, collapse to just the last one
// 3. collapse same values at the front to just the last of those samples
if len(t.samples) != 0 {
sameValue := true
cutoffIndex := -1
firstValue := t.samples[0].value
for i := 0; i < len(t.samples); i++ {
for i := 1; i < len(t.samples); i++ {
if t.samples[i].value != firstValue {
sameValue = false
cutoffIndex = i - 1
break
}
}
if sameValue {
if cutoffIndex >= 0 {
t.samples = t.samples[cutoffIndex:]
} else {
// all values are the same, just keep the last one
t.samples = t.samples[len(t.samples)-1:]
}
}