diff --git a/pkg/rtc/mediatrack_test.go b/pkg/rtc/mediatrack_test.go index 7a397dbbe..3be331d16 100644 --- a/pkg/rtc/mediatrack_test.go +++ b/pkg/rtc/mediatrack_test.go @@ -2,7 +2,9 @@ package rtc import ( "testing" + "time" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/livekit" "github.com/stretchr/testify/require" ) @@ -148,29 +150,34 @@ func TestSubscribedMaxQuality(t *testing.T) { }) t.Run("subscribers max quality", func(t *testing.T) { - mt := NewMediaTrack(MediaTrackParams{TrackInfo: &livekit.TrackInfo{ - Sid: "v1", - Type: livekit.TrackType_VIDEO, - Width: 1080, - Height: 720, - Layers: []*livekit.VideoLayer{ - { - Quality: livekit.VideoQuality_LOW, - Width: 480, - Height: 270, - }, - { - Quality: livekit.VideoQuality_MEDIUM, - Width: 960, - Height: 540, - }, - { - Quality: livekit.VideoQuality_HIGH, - Width: 1080, - Height: 720, + mt := NewMediaTrack(MediaTrackParams{ + TrackInfo: &livekit.TrackInfo{ + Sid: "v1", + Type: livekit.TrackType_VIDEO, + Width: 1080, + Height: 720, + Layers: []*livekit.VideoLayer{ + { + Quality: livekit.VideoQuality_LOW, + Width: 480, + Height: 270, + }, + { + Quality: livekit.VideoQuality_MEDIUM, + Width: 960, + Height: 540, + }, + { + Quality: livekit.VideoQuality_HIGH, + Width: 1080, + Height: 720, + }, }, }, - }}) + VideoConfig: config.VideoConfig{ + SubscribedQualityUpdateThrottle: 100 * time.Millisecond, + }, + }) actualTrackID := livekit.TrackID("") actualSubscribedQualities := make([]*livekit.SubscribedQuality, 0) @@ -194,6 +201,9 @@ func TestSubscribedMaxQuality(t *testing.T) { // "s1" dropping to MEDIUM should disable HIGH layer mt.notifySubscriberMaxQuality("s1", livekit.VideoQuality_MEDIUM) + // wait for throttle to kick in + time.Sleep(110 * time.Millisecond) + expectedSubscribedQualities = []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: true}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, @@ -206,6 +216,9 @@ func TestSubscribedMaxQuality(t *testing.T) { mt.notifySubscriberMaxQuality("s1", livekit.VideoQuality_LOW) mt.notifySubscriberMaxQuality("s2", livekit.VideoQuality_LOW) + // wait for throttle to kick in + time.Sleep(110 * time.Millisecond) + expectedSubscribedQualities = []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: true}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 9a32a7729..04d9d0656 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -20,18 +20,6 @@ import ( const ( ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps - NumRequiredEstimatesNonProbe = 8 - NumRequiredEstimatesProbe = 3 - - DownwardTrendThresholdNonProbe = -0.5 - DownwardTrendThresholdProbe = 0.0 - - NackWindowDurationProbe = 0 * time.Second - NackWindowDurationNonProbe = 2 * time.Second - - NackRatioThresholdNonProbe = 0.08 - NackRatioThresholdProbe = 0.04 - NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate ProbeWaitBase = 5 * time.Second @@ -51,6 +39,28 @@ const ( PriorityDefaultVideo = PriorityMin ) +var ( + ChannelObserverParamsProbe = ChannelObserverParams{ + Name: "probe", + EstimateRequiredSamples: 3, + EstimateDownwardTrendThreshold: 0.0, + EstimateCollapseValues: false, + NackWindowMinDuration: 500 * time.Millisecond, + NackWindowMaxDuration: 1 * time.Second, + NackRatioThreshold: 0.04, + } + + ChannelObserverParamsNonProbe = ChannelObserverParams{ + Name: "non-probe", + EstimateRequiredSamples: 8, + EstimateDownwardTrendThreshold: -0.5, + EstimateCollapseValues: true, + NackWindowMinDuration: 1 * time.Second, + NackWindowMaxDuration: 2 * time.Second, + NackRatioThreshold: 0.08, + } +) + type State int const ( @@ -709,11 +719,8 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() { expectedBandwidthUsage := s.getExpectedBandwidthUsage() switch reason { case ChannelCongestionReasonLoss: - estimateToCommit = expectedBandwidthUsage nackRatio = s.channelObserver.GetNackRatio() - if nackRatio > NackRatioThresholdNonProbe { - estimateToCommit = int64(float64(estimateToCommit) * (1.0 - NackRatioAttenuator*nackRatio)) - } + estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - NackRatioAttenuator*nackRatio)) default: estimateToCommit = s.lastReceivedEstimate } @@ -1033,27 +1040,11 @@ func (s *StreamAllocator) getNackDelta() (uint32, uint32) { } func (s *StreamAllocator) newChannelObserverProbe() *ChannelObserver { - return NewChannelObserver(ChannelObserverParams{ - Name: "probe", - Logger: s.params.Logger, - EstimateRequiredSamples: NumRequiredEstimatesProbe, - EstimateDownwardTrendThreshold: DownwardTrendThresholdProbe, - EstimateCollapseValues: false, - NackWindowDuration: NackWindowDurationProbe, - NackRatioThreshold: NackRatioThresholdProbe, - }) + return NewChannelObserver(ChannelObserverParamsProbe, s.params.Logger) } func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver { - return NewChannelObserver(ChannelObserverParams{ - Name: "non-probe", - Logger: s.params.Logger, - EstimateRequiredSamples: NumRequiredEstimatesNonProbe, - EstimateDownwardTrendThreshold: DownwardTrendThresholdNonProbe, - EstimateCollapseValues: true, - NackWindowDuration: NackWindowDurationNonProbe, - NackRatioThreshold: NackRatioThresholdNonProbe, - }) + return NewChannelObserver(ChannelObserverParamsNonProbe, s.params.Logger) } func (s *StreamAllocator) initProbe(probeRateBps int64) { @@ -1528,16 +1519,17 @@ func (c ChannelCongestionReason) String() string { type ChannelObserverParams struct { Name string - Logger logger.Logger EstimateRequiredSamples int EstimateDownwardTrendThreshold float64 EstimateCollapseValues bool - NackWindowDuration time.Duration + NackWindowMinDuration time.Duration + NackWindowMaxDuration time.Duration NackRatioThreshold float64 } type ChannelObserver struct { params ChannelObserverParams + logger logger.Logger estimateTrend *TrendDetector @@ -1546,17 +1538,17 @@ type ChannelObserver struct { repeatedNacks uint32 } -func NewChannelObserver(params ChannelObserverParams) *ChannelObserver { +func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *ChannelObserver { return &ChannelObserver{ params: params, + logger: logger, estimateTrend: NewTrendDetector(TrendDetectorParams{ Name: params.Name + "-estimate", - Logger: params.Logger, + Logger: logger, RequiredSamples: params.EstimateRequiredSamples, DownwardTrendThreshold: params.EstimateDownwardTrendThreshold, CollapseValues: params.EstimateCollapseValues, }), - nackWindowStartTime: time.Now(), } } @@ -1574,14 +1566,26 @@ func (c *ChannelObserver) AddEstimate(estimate int64) { } func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32) { - if c.params.NackWindowDuration != 0 && time.Since(c.nackWindowStartTime) > c.params.NackWindowDuration { - c.nackWindowStartTime = time.Now() + if c.params.NackWindowMaxDuration != 0 && !c.nackWindowStartTime.IsZero() && time.Since(c.nackWindowStartTime) > c.params.NackWindowMaxDuration { + c.nackWindowStartTime = time.Time{} c.packets = 0 c.repeatedNacks = 0 } - c.packets += packets - c.repeatedNacks += repeatedNacks + // + // Start NACK monitoring window only when a repeated NACK happens. + // This allows locking tightly to when NACKs start happening and + // check if the NACKs keep adding up (potentially a sign of congestion) + // or isolated losses + // + if c.repeatedNacks == 0 && repeatedNacks != 0 { + c.nackWindowStartTime = time.Now() + } + + if !c.nackWindowStartTime.IsZero() { + c.packets += packets + c.repeatedNacks += repeatedNacks + } } func (c *ChannelObserver) GetLowestEstimate() int64 { @@ -1610,14 +1614,14 @@ func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) { switch { case estimateDirection == TrendDirectionDownward: - c.params.Logger.Debugw( + c.logger.Debugw( "channel observer: estimate is trending downward", "name", c.params.Name, "estimate", c.estimateTrend.ToString(), ) return ChannelTrendCongesting, ChannelCongestionReasonEstimate - case nackRatio > c.params.NackRatioThreshold: - c.params.Logger.Debugw( + case c.params.NackWindowMinDuration != 0 && !c.nackWindowStartTime.IsZero() && time.Since(c.nackWindowStartTime) > c.params.NackWindowMinDuration && nackRatio > c.params.NackRatioThreshold: + c.logger.Debugw( "channel observer: high rate of repeated NACKs", "name", c.params.Name, "ratio", nackRatio,