diff --git a/pkg/config/config.go b/pkg/config/config.go index 325eec68c..77bfe4adf 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -105,12 +105,31 @@ type PLIThrottleConfig struct { HighQuality time.Duration `yaml:"high_quality,omitempty"` } +type CongestionControlProbeConfig struct { + BaseInterval time.Duration `yaml:"base_interval,omitempty"` + BackoffFactor float64 `yaml:"backoff_factor,omitempty"` + MaxInterval time.Duration `yaml:"max_interval,omitempty"` + + SettleWait time.Duration `yaml:"settle_wait,omitempty"` + SettleWaitMax time.Duration `yaml:"settle_wait_max,omitempty"` + + TrendWait time.Duration `yaml:"trend_wait,omitempty"` + + OveragePct int64 `yaml:"overage_pct,omitempty"` + MinBps int64 `yaml:"min_bps,omitempty"` + MinDuration time.Duration `yaml:"min_duration,omitempty"` + MaxDuration time.Duration `yaml:"max_duration,omitempty"` + DurationOverflowFactor float64 `yaml:"duration_overflow_factor,omitempty"` + DurationIncreaseFactor float64 `yaml:"duration_increase_factor,omitempty"` +} + type CongestionControlConfig struct { - Enabled bool `yaml:"enabled"` - AllowPause bool `yaml:"allow_pause"` - UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"` - ProbeMode CongestionControlProbeMode `yaml:"padding_mode,omitempty"` - MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"` + Enabled bool `yaml:"enabled"` + AllowPause bool `yaml:"allow_pause"` + UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"` + ProbeMode CongestionControlProbeMode `yaml:"padding_mode,omitempty"` + MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"` + ProbeConfig CongestionControlProbeConfig `yaml:"probe_config,omitempty"` } type AudioConfig struct { @@ -269,6 +288,23 @@ func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []c Enabled: true, AllowPause: false, ProbeMode: CongestionControlProbeModePadding, + ProbeConfig: CongestionControlProbeConfig{ + BaseInterval: 3 * time.Second, + BackoffFactor: 1.5, + MaxInterval: 2 * time.Minute, + + SettleWait: 250 * time.Millisecond, + SettleWaitMax: 10 * time.Second, + + TrendWait: 2 * time.Second, + + OveragePct: 120, + MinBps: 200_000, + MinDuration: 200 * time.Millisecond, + MaxDuration: 20 * time.Second, + DurationOverflowFactor: 1.25, + DurationIncreaseFactor: 1.5, + }, }, }, Audio: AudioConfig{ @@ -596,6 +632,13 @@ func GenerateCLIFlags(existingFlags []cli.Flag, hidden bool) ([]cli.Flag, error) Usage: generatedCLIFlagUsage, Hidden: hidden, } + case reflect.Float64: + flag = &cli.Float64Flag{ + Name: name, + EnvVars: []string{envVar}, + Usage: generatedCLIFlagUsage, + Hidden: hidden, + } case reflect.Slice: // TODO continue @@ -647,6 +690,8 @@ func (conf *Config) updateFromCLI(c *cli.Context, baseFlags []cli.Flag) error { configValue.SetUint(c.Uint64(flagName)) case reflect.Float32: configValue.SetFloat(c.Float64(flagName)) + case reflect.Float64: + configValue.SetFloat(c.Float64(flagName)) // case reflect.Slice: // // TODO // case reflect.Map: diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index c3c2e4302..4a8b31f9c 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -11,6 +11,7 @@ import ( const ( defaultRtt = 70 ignoreRetransmission = 100 // Ignore packet retransmission after ignoreRetransmission milliseconds + maxAck = 3 ) func btoi(b bool) int { @@ -187,7 +188,7 @@ func (s *sequencer) getPacketsMeta(seqNo []uint16) []packetMeta { continue } - if seq.lastNack == 0 || refTime-seq.lastNack > uint32(math.Min(float64(ignoreRetransmission), float64(2*s.rtt))) { + if (seq.lastNack == 0 || refTime-seq.lastNack > uint32(math.Min(float64(ignoreRetransmission), float64(2*s.rtt)))) && seq.nacked < maxAck { seq.nacked++ seq.lastNack = refTime diff --git a/pkg/sfu/streamallocator/channelobserver.go b/pkg/sfu/streamallocator/channelobserver.go index 9c99e90da..0e094bdad 100644 --- a/pkg/sfu/streamallocator/channelobserver.go +++ b/pkg/sfu/streamallocator/channelobserver.go @@ -120,6 +120,10 @@ func (c *ChannelObserver) GetHighestEstimate() int64 { return c.estimateTrend.GetHighest() } +func (c *ChannelObserver) HasEnoughEstimateSamples() bool { + return c.estimateTrend.HasEnoughSamples() +} + func (c *ChannelObserver) GetNackRatio() float64 { return c.nackTracker.GetRatio() } diff --git a/pkg/sfu/streamallocator/probe_controller.go b/pkg/sfu/streamallocator/probe_controller.go index 9fc80ad53..1d6bfd352 100644 --- a/pkg/sfu/streamallocator/probe_controller.go +++ b/pkg/sfu/streamallocator/probe_controller.go @@ -4,26 +4,14 @@ import ( "sync" "time" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/logger" ) -const ( - ProbeWaitBase = 5 * time.Second - ProbeBackoffFactor = 1.5 - ProbeWaitMax = 2 * time.Minute - ProbeSettleWait = 250 - ProbeSettleWaitMax = 10 * time.Second - ProbeTrendWait = 2 * time.Second - - ProbePct = 120 - ProbeMinBps = 200 * 1000 // 200 kbps - ProbeMinDuration = 20 * time.Second - ProbeMaxDuration = 21 * time.Second -) - // --------------------------------------------------------------------------- type ProbeControllerParams struct { + Config config.CongestionControlProbeConfig Prober *Prober Logger logger.Logger } @@ -31,19 +19,23 @@ type ProbeControllerParams struct { type ProbeController struct { params ProbeControllerParams - lock sync.RWMutex - probeInterval time.Duration - lastProbeStartTime time.Time - probeGoalBps int64 - probeClusterId ProbeClusterId - abortedProbeClusterId ProbeClusterId - probeTrendObserved bool - probeEndTime time.Time + lock sync.RWMutex + probeInterval time.Duration + lastProbeStartTime time.Time + probeGoalBps int64 + probeClusterId ProbeClusterId + doneProbeClusterInfo ProbeClusterInfo + abortedProbeClusterId ProbeClusterId + goalReachedProbeClusterId ProbeClusterId + probeTrendObserved bool + probeEndTime time.Time + probeDuration time.Duration } func NewProbeController(params ProbeControllerParams) *ProbeController { p := &ProbeController{ - params: params, + params: params, + probeDuration: params.Config.MinDuration, } p.Reset() @@ -57,45 +49,21 @@ func (p *ProbeController) Reset() { p.lastProbeStartTime = time.Now() p.resetProbeIntervalLocked() + p.resetProbeDurationLocked() p.clearProbeLocked() } -func (p *ProbeController) ProbeClusterDone(info ProbeClusterInfo, lowestEstimate int64) bool { +func (p *ProbeController) ProbeClusterDone(info ProbeClusterInfo) { p.lock.Lock() defer p.lock.Unlock() if p.probeClusterId != info.Id { p.params.Logger.Infow("not expected probe cluster", "probeClusterId", p.probeClusterId, "resetProbeClusterId", info.Id) - return false + return } - if p.abortedProbeClusterId == ProbeClusterIdInvalid { - // successful probe, finalize - return p.finalizeProbeLocked() - } - - // ensure probe queue is flushed - // STREAM-ALLOCATOR-TODO: ProbeSettleWait should actually be a certain number of RTTs. - expectedDuration := float64(info.BytesSent*8*1000) / float64(lowestEstimate) - queueTime := expectedDuration - float64(info.Duration.Milliseconds()) - if queueTime < 0.0 { - queueTime = 0.0 - } - queueWait := time.Duration(queueTime+float64(ProbeSettleWait)) * time.Millisecond - if queueWait > ProbeSettleWaitMax { - queueWait = ProbeSettleWaitMax - } - p.probeEndTime = p.lastProbeStartTime.Add(queueWait + info.Duration) - p.params.Logger.Infow( - "setting probe end time", - "probeClusterId", p.probeClusterId, - "expectedDuration", expectedDuration, - "queueTime", queueTime, - "queueWait", queueWait, - "probeEndTime", p.probeEndTime, - ) - return false + p.doneProbeClusterInfo = info } func (p *ProbeController) CheckProbe(trend ChannelTrend, highestEstimate int64) { @@ -111,7 +79,7 @@ func (p *ProbeController) CheckProbe(trend ChannelTrend, highestEstimate int64) } switch { - case !p.probeTrendObserved && time.Since(p.lastProbeStartTime) > ProbeTrendWait: + case !p.probeTrendObserved && time.Since(p.lastProbeStartTime) > p.params.Config.TrendWait: // // More of a safety net. // In rare cases, the estimate gets stuck. Prevent from probe running amok @@ -133,41 +101,87 @@ func (p *ProbeController) CheckProbe(trend ChannelTrend, highestEstimate int64) "goal", p.probeGoalBps, "highest", highestEstimate, ) + p.goalReachedProbeClusterId = p.probeClusterId p.StopProbe() } } -func (p *ProbeController) MaybeFinalizeProbe() (isHandled bool, isSuccessful bool) { +func (p *ProbeController) MaybeFinalizeProbe( + isComplete bool, + trend ChannelTrend, + lowestEstimate int64, +) (isHandled bool, isNotFailing bool, isGoalReached bool) { p.lock.Lock() defer p.lock.Unlock() - if p.isInProbeLocked() && !p.probeEndTime.IsZero() && time.Now().After(p.probeEndTime) { - return true, p.finalizeProbeLocked() + if !p.isInProbeLocked() { + return false, false, false } - return false, false + if p.goalReachedProbeClusterId != ProbeClusterIdInvalid { + // finalise goal reached probe cluster + p.finalizeProbeLocked(ChannelTrendNeutral) + return true, true, true + } + + if (isComplete || p.abortedProbeClusterId != ProbeClusterIdInvalid) && p.probeEndTime.IsZero() && p.doneProbeClusterInfo.Id != ProbeClusterIdInvalid && p.doneProbeClusterInfo.Id == p.probeClusterId { + // ensure any queueing due to probing is flushed + // STREAM-ALLOCATOR-TODO: CongestionControlProbeConfig.SettleWait should actually be a certain number of RTTs. + expectedDuration := float64(9.0) + if lowestEstimate != 0 { + expectedDuration = float64(p.doneProbeClusterInfo.BytesSent*8*1000) / float64(lowestEstimate) + } + queueTime := expectedDuration - float64(p.doneProbeClusterInfo.Duration.Milliseconds()) + if queueTime < 0.0 { + queueTime = 0.0 + } + queueWait := (time.Duration(queueTime) * time.Millisecond) + p.params.Config.SettleWait + if queueWait > p.params.Config.SettleWaitMax { + queueWait = p.params.Config.SettleWaitMax + } + p.probeEndTime = p.lastProbeStartTime.Add(queueWait + p.doneProbeClusterInfo.Duration) + p.params.Logger.Infow( + "setting probe end time", + "probeClusterId", p.probeClusterId, + "expectedDuration", expectedDuration, + "queueTime", queueTime, + "queueWait", queueWait, + "probeEndTime", p.probeEndTime, + ) + } + + if !p.probeEndTime.IsZero() && time.Now().After(p.probeEndTime) { + // finalisze aborted or non-failing but non-goal-reached probe cluster + return true, p.finalizeProbeLocked(trend), false + } + + return false, false, false } func (p *ProbeController) DoesProbeNeedFinalize() bool { p.lock.RLock() defer p.lock.RUnlock() - return p.abortedProbeClusterId != ProbeClusterIdInvalid + return p.abortedProbeClusterId != ProbeClusterIdInvalid || p.goalReachedProbeClusterId != ProbeClusterIdInvalid } -func (p *ProbeController) finalizeProbeLocked() bool { +func (p *ProbeController) finalizeProbeLocked(trend ChannelTrend) (isNotFailing bool) { aborted := p.probeClusterId == p.abortedProbeClusterId p.clearProbeLocked() - if aborted { + if aborted || trend == ChannelTrendCongesting { // failed probe, backoff p.backoffProbeIntervalLocked() + p.resetProbeDurationLocked() return false } - // reset probe interval on a successful probe + // reset probe interval and increase probe duration on a upward trending probe p.resetProbeIntervalLocked() + if trend == ChannelTrendClearing { + p.increaseProbeDurationLocked() + } return true } @@ -178,13 +192,15 @@ func (p *ProbeController) InitProbe(probeGoalDeltaBps int64, expectedBandwidthUs p.lastProbeStartTime = time.Now() // overshoot a bit to account for noise (in measurement/estimate etc) - desiredIncreaseBps := (probeGoalDeltaBps * ProbePct) / 100 - if desiredIncreaseBps < ProbeMinBps { - desiredIncreaseBps = ProbeMinBps + desiredIncreaseBps := (probeGoalDeltaBps * p.params.Config.OveragePct) / 100 + if desiredIncreaseBps < p.params.Config.MinBps { + desiredIncreaseBps = p.params.Config.MinBps } p.probeGoalBps = expectedBandwidthUsage + desiredIncreaseBps + p.doneProbeClusterInfo = ProbeClusterInfo{Id: ProbeClusterIdInvalid} p.abortedProbeClusterId = ProbeClusterIdInvalid + p.goalReachedProbeClusterId = ProbeClusterIdInvalid p.probeTrendObserved = false @@ -194,8 +210,8 @@ func (p *ProbeController) InitProbe(probeGoalDeltaBps int64, expectedBandwidthUs ProbeClusterModeUniform, int(p.probeGoalBps), int(expectedBandwidthUsage), - ProbeMinDuration, - ProbeMaxDuration, + p.probeDuration, + time.Duration(float64(p.probeDuration.Milliseconds())*p.params.Config.DurationOverflowFactor)*time.Millisecond, ) return p.probeClusterId, p.probeGoalBps @@ -203,18 +219,31 @@ func (p *ProbeController) InitProbe(probeGoalDeltaBps int64, expectedBandwidthUs func (p *ProbeController) clearProbeLocked() { p.probeClusterId = ProbeClusterIdInvalid + p.doneProbeClusterInfo = ProbeClusterInfo{Id: ProbeClusterIdInvalid} p.abortedProbeClusterId = ProbeClusterIdInvalid + p.goalReachedProbeClusterId = ProbeClusterIdInvalid } func (p *ProbeController) backoffProbeIntervalLocked() { - p.probeInterval = time.Duration(p.probeInterval.Seconds()*ProbeBackoffFactor) * time.Second - if p.probeInterval > ProbeWaitMax { - p.probeInterval = ProbeWaitMax + p.probeInterval = time.Duration(p.probeInterval.Seconds()*p.params.Config.BackoffFactor) * time.Second + if p.probeInterval > p.params.Config.MaxInterval { + p.probeInterval = p.params.Config.MaxInterval } } func (p *ProbeController) resetProbeIntervalLocked() { - p.probeInterval = ProbeWaitBase + p.probeInterval = p.params.Config.BaseInterval +} + +func (p *ProbeController) resetProbeDurationLocked() { + p.probeDuration = p.params.Config.MinDuration +} + +func (p *ProbeController) increaseProbeDurationLocked() { + p.probeDuration = time.Duration(float64(p.probeDuration.Milliseconds())*p.params.Config.DurationIncreaseFactor) * time.Millisecond + if p.probeDuration > p.params.Config.MaxDuration { + p.probeDuration = p.params.Config.MaxDuration + } } func (p *ProbeController) StopProbe() { diff --git a/pkg/sfu/streamallocator/prober.go b/pkg/sfu/streamallocator/prober.go index 66ef47b8a..be774f9b2 100644 --- a/pkg/sfu/streamallocator/prober.go +++ b/pkg/sfu/streamallocator/prober.go @@ -354,7 +354,7 @@ type ProbeClusterId uint32 const ( ProbeClusterIdInvalid ProbeClusterId = 0 - bucketDuration = time.Second + bucketDuration = 100 * time.Millisecond bytesPerProbe = 1000 minProbeRateBps = 10000 ) @@ -423,14 +423,14 @@ func NewCluster(id ProbeClusterId, mode ProbeClusterMode, desiredRateBps int, ex } func (c *Cluster) initBuckets(desiredRateBps int, expectedRateBps int, minDuration time.Duration) { - // split into 1-second bucket + // split into granular buckets // NOTE: splitting even if mode is unitform numBuckets := int((minDuration.Milliseconds() + bucketDuration.Milliseconds() - 1) / bucketDuration.Milliseconds()) if numBuckets < 1 { numBuckets = 1 } - expectedRateBytes := (expectedRateBps + 7) / 8 + expectedRateBytesPerSec := (expectedRateBps + 7) / 8 baseProbeRateBps := (desiredRateBps - expectedRateBps + numBuckets - 1) / numBuckets runningDesiredBytes := 0 @@ -447,13 +447,13 @@ func (c *Cluster) initBuckets(desiredRateBps int, expectedRateBps int, minDurati if bucketProbeRateBps < minProbeRateBps { bucketProbeRateBps = minProbeRateBps } - bucketProbeRateBytes := (bucketProbeRateBps + 7) / 8 + bucketProbeRateBytesPerSec := (bucketProbeRateBps + 7) / 8 // pace based on bytes per probe - numProbes := (bucketProbeRateBytes + bytesPerProbe - 1) / bytesPerProbe - sleepDurationMicroSeconds := int(float64(1_000_000)/float64(numProbes) + 0.5) + numProbesPerSec := (bucketProbeRateBytesPerSec + bytesPerProbe - 1) / bytesPerProbe + sleepDurationMicroSeconds := int(float64(1_000_000)/float64(numProbesPerSec) + 0.5) - runningDesiredBytes += bucketProbeRateBytes + expectedRateBytes + runningDesiredBytes += (((bucketProbeRateBytesPerSec + expectedRateBytesPerSec) * int(bucketDuration.Milliseconds())) + 999) / 1000 runningDesiredElapsedTime += bucketDuration c.buckets = append(c.buckets, clusterBucket{ @@ -537,10 +537,10 @@ func (c *Cluster) Process(pl ProberListener) { if bytesShortFall < 0 { bytesShortFall = 0 } - // cap short fall to limit to 8 packets in an iteration + // cap short fall to limit to 5 packets in an iteration // 275 bytes per packet (255 max RTP padding payload + 20 bytes RTP header) - if bytesShortFall > (275 * 8) { - bytesShortFall = 275 * 8 + if bytesShortFall > (275 * 5) { + bytesShortFall = 275 * 5 } // round up to packet size bytesShortFall = ((bytesShortFall + 274) / 275) * 275 diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index dba10d7f2..475b38e94 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -2,7 +2,6 @@ package streamallocator import ( "fmt" - "math" "sort" "sync" "time" @@ -199,6 +198,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { } s.probeController = NewProbeController(ProbeControllerParams{ + Config: s.params.Config.ProbeConfig, Prober: s.prober, Logger: params.Logger, }) @@ -555,7 +555,7 @@ func (s *StreamAllocator) processEvents() { } func (s *StreamAllocator) ping() { - ticker := time.NewTicker(500 * time.Millisecond) + ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { @@ -641,9 +641,14 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) { func (s *StreamAllocator) handleSignalPeriodicPing(event *Event) { // finalize probe if necessary - isHandled, isSuccessful := s.probeController.MaybeFinalizeProbe() + trend, _ := s.channelObserver.GetTrend() + isHandled, isNotFailing, isGoalReached := s.probeController.MaybeFinalizeProbe( + s.channelObserver.HasEnoughEstimateSamples(), + trend, + s.channelObserver.GetLowestEstimate(), + ) if isHandled { - s.onProbeDone(isSuccessful) + s.onProbeDone(isNotFailing, isGoalReached) } // probe if necessary and timing is right @@ -677,10 +682,7 @@ func (s *StreamAllocator) handleSignalSendProbe(event *Event) { func (s *StreamAllocator) handleSignalProbeClusterDone(event *Event) { info, _ := event.Data.(ProbeClusterInfo) - isHandled := s.probeController.ProbeClusterDone(info, int64(math.Min(float64(s.committedChannelCapacity), float64(s.channelObserver.GetLowestEstimate())))) - if isHandled { - s.onProbeDone(true) - } + s.probeController.ProbeClusterDone(info) } func (s *StreamAllocator) handleSignalResume(event *Event) { @@ -925,7 +927,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) { s.adjustState() } -func (s *StreamAllocator) onProbeDone(isSuccessful bool) { +func (s *StreamAllocator) onProbeDone(isNotFailing bool, isGoalReached bool) { highestEstimateInProbe := s.channelObserver.GetHighestEstimate() // @@ -939,18 +941,23 @@ func (s *StreamAllocator) onProbeDone(isSuccessful bool) { // NOTE: With TWCC, it is possible to reset bandwidth estimation to clean state as // the send side is in full control of bandwidth estimation. // + channelObserverString := s.channelObserver.ToString() s.channelObserver = s.newChannelObserverNonProbe() - if !isSuccessful { + s.params.Logger.Infow( + "probe done", + "isNotFailing", isNotFailing, + "isGoalReached", isGoalReached, + "committedEstimate", s.committedChannelCapacity, + "highestEstimate", highestEstimateInProbe, + "channel", channelObserverString, + ) + if !isNotFailing { return } - // probe estimate is same or higher, commit it and try to allocate deficient tracks - s.params.Logger.Infow( - "successful probe, updating channel capacity", - "old(bps)", s.committedChannelCapacity, - "new(bps)", highestEstimateInProbe, - ) - s.committedChannelCapacity = highestEstimateInProbe + if highestEstimateInProbe > s.committedChannelCapacity { + s.committedChannelCapacity = highestEstimateInProbe + } s.maybeBoostDeficientTracks() } diff --git a/pkg/sfu/streamallocator/trenddetector.go b/pkg/sfu/streamallocator/trenddetector.go index 2a5281230..ed0ae1d71 100644 --- a/pkg/sfu/streamallocator/trenddetector.go +++ b/pkg/sfu/streamallocator/trenddetector.go @@ -122,6 +122,10 @@ func (t *TrendDetector) GetDirection() TrendDirection { return t.direction } +func (t *TrendDetector) HasEnoughSamples() bool { + return t.numSamples >= t.params.RequiredSamples +} + func (t *TrendDetector) ToString() string { now := time.Now() elapsed := now.Sub(t.startTime).Seconds()