diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index 7af09f19f..6fe4ef218 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -1,6 +1,7 @@ package connectionquality import ( + "math" "testing" "time" @@ -48,7 +49,7 @@ func TestConnectionQuality(t *testing.T) { require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) - // introduce loss and the score should drop - 10% loss for Opus -> POOR + // introduce loss and the score should drop - 12% loss for Opus -> POOR now = now.Add(duration) streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: &buffer.StreamStatsWithLayers{ @@ -56,7 +57,7 @@ func TestConnectionQuality(t *testing.T) { StartTime: now, Duration: duration, Packets: 250, - PacketsLost: 25, + PacketsLost: 30, }, }, } @@ -65,25 +66,8 @@ func TestConnectionQuality(t *testing.T) { require.Greater(t, float32(3.2), mos) require.Equal(t, livekit.ConnectionQuality_POOR, quality) - // should stay at POOR quality for poor threshold wait even if the conditions improve - for i := 0; i < 3; i++ { - now = now.Add(duration) - streams = map[uint32]*buffer.StreamStatsWithLayers{ - 1: &buffer.StreamStatsWithLayers{ - RTPStats: &buffer.RTPDeltaInfo{ - StartTime: now, - Duration: duration, - Packets: 250, - }, - }, - } - cs.updateScore(streams, now.Add(duration)) - mos, quality = cs.GetScoreAndQuality() - require.Greater(t, float32(3.2), mos) - require.Equal(t, livekit.ConnectionQuality_POOR, quality) - } - - // should return median quality which should be EXCELLENT as all windows in above loop have great conditions + // should stay at POOR quality for one iteration even if the conditions improve + // due to significant loss (12%) in the previous window now = now.Add(duration) streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: &buffer.StreamStatsWithLayers{ @@ -96,98 +80,10 @@ func TestConnectionQuality(t *testing.T) { } cs.updateScore(streams, now.Add(duration)) mos, quality = cs.GetScoreAndQuality() - require.Greater(t, float32(4.6), mos) - require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) - - // introduce loss and the score should drop - 3% loss for Opus -> GOOD - now = now.Add(duration) - streams = map[uint32]*buffer.StreamStatsWithLayers{ - 1: &buffer.StreamStatsWithLayers{ - RTPStats: &buffer.RTPDeltaInfo{ - StartTime: now, - Duration: duration, - Packets: 250, - PacketsLost: 8, - }, - }, - } - cs.updateScore(streams, now.Add(duration)) - mos, quality = cs.GetScoreAndQuality() - require.Greater(t, float32(4.1), mos) - require.Equal(t, livekit.ConnectionQuality_GOOD, quality) - - // should stay at GOOD quality for good threshold (which is shorter than poor threshold) wait even if the conditions improve - for i := 0; i < 1; i++ { - now = now.Add(duration) - streams = map[uint32]*buffer.StreamStatsWithLayers{ - 1: &buffer.StreamStatsWithLayers{ - RTPStats: &buffer.RTPDeltaInfo{ - StartTime: now, - Duration: duration, - Packets: 250, - }, - }, - } - cs.updateScore(streams, now.Add(duration)) - mos, quality = cs.GetScoreAndQuality() - require.Greater(t, float32(4.1), mos) - require.Equal(t, livekit.ConnectionQuality_GOOD, quality) - } - - // should return median quality which should be EXCELLENT as all windows in above loop have great conditions - now = now.Add(duration) - streams = map[uint32]*buffer.StreamStatsWithLayers{ - 1: &buffer.StreamStatsWithLayers{ - RTPStats: &buffer.RTPDeltaInfo{ - StartTime: now, - Duration: duration, - Packets: 250, - }, - }, - } - cs.updateScore(streams, now.Add(duration)) - mos, quality = cs.GetScoreAndQuality() - require.Greater(t, float32(4.6), mos) - require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) - - // POOR -> GOOD -> EXCELLENT should take longer - now = now.Add(duration) - streams = map[uint32]*buffer.StreamStatsWithLayers{ - 1: &buffer.StreamStatsWithLayers{ - RTPStats: &buffer.RTPDeltaInfo{ - StartTime: now, - Duration: duration, - Packets: 250, - PacketsLost: 25, - }, - }, - } - cs.updateScore(streams, now.Add(duration)) - mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(3.2), mos) require.Equal(t, livekit.ConnectionQuality_POOR, quality) - // should stay at POOR quality for poor threshold wait even if the conditions improve - for i := 0; i < 3; i++ { - now = now.Add(duration) - streams = map[uint32]*buffer.StreamStatsWithLayers{ - 1: &buffer.StreamStatsWithLayers{ - RTPStats: &buffer.RTPDeltaInfo{ - StartTime: now, - Duration: duration, - Packets: 250, - PacketsLost: 8, - }, - }, - } - cs.updateScore(streams, now.Add(duration)) - mos, quality = cs.GetScoreAndQuality() - require.Greater(t, float32(3.2), mos) - require.Equal(t, livekit.ConnectionQuality_POOR, quality) - } - - // should return median quality which should be GOOD as all windows in above loop have some loss (i. e. GOOD quality). - // although the below update has no loss (EXCELLENT quality), median should be at GOOO + // should climb up to GOOD if conditions continue to be good now = now.Add(duration) streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: &buffer.StreamStatsWithLayers{ @@ -203,7 +99,56 @@ func TestConnectionQuality(t *testing.T) { require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) - // another EXCELLENT quality window and the median should switch to EXCELLENT + // should climb up to EXCELLENT if conditions continue to be good + now = now.Add(duration) + streams = map[uint32]*buffer.StreamStatsWithLayers{ + 1: &buffer.StreamStatsWithLayers{ + RTPStats: &buffer.RTPDeltaInfo{ + StartTime: now, + Duration: duration, + Packets: 250, + }, + }, + } + cs.updateScore(streams, now.Add(duration)) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(4.6), mos) + require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) + + // introduce loss and the score should drop - 5% loss for Opus -> GOOD + now = now.Add(duration) + streams = map[uint32]*buffer.StreamStatsWithLayers{ + 1: &buffer.StreamStatsWithLayers{ + RTPStats: &buffer.RTPDeltaInfo{ + StartTime: now, + Duration: duration, + Packets: 250, + PacketsLost: 13, + }, + }, + } + cs.updateScore(streams, now.Add(duration)) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(4.1), mos) + require.Equal(t, livekit.ConnectionQuality_GOOD, quality) + + // should stay at GOOD quality for another iteration even if the conditions improve + now = now.Add(duration) + streams = map[uint32]*buffer.StreamStatsWithLayers{ + 1: &buffer.StreamStatsWithLayers{ + RTPStats: &buffer.RTPDeltaInfo{ + StartTime: now, + Duration: duration, + Packets: 250, + }, + }, + } + cs.updateScore(streams, now.Add(duration)) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(4.1), mos) + require.Equal(t, livekit.ConnectionQuality_GOOD, quality) + + // should climb up to EXCELLENT if conditions continue to be good now = now.Add(duration) streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: &buffer.StreamStatsWithLayers{ @@ -367,6 +312,7 @@ func TestConnectionQuality(t *testing.T) { packetsExpected uint32 expectedQualities []expectedQuality }{ + // NOTE: Because of EWMA (Exponentially Weighted Moving Average), these cut off points are not exact // "audio/opus" - no fec - 0 <= loss < 2.5%: EXCELLENT, 2.5% <= loss < 5%: GOOD, >= 5%: POOR { name: "audio/opus - no fec", @@ -385,7 +331,7 @@ func TestConnectionQuality(t *testing.T) { expectedQuality: livekit.ConnectionQuality_GOOD, }, { - packetLossPercentage: 5.0, + packetLossPercentage: 5.2, expectedMOS: 3.2, expectedQuality: livekit.ConnectionQuality_POOR, }, @@ -404,12 +350,12 @@ func TestConnectionQuality(t *testing.T) { expectedQuality: livekit.ConnectionQuality_EXCELLENT, }, { - packetLossPercentage: 4.0, + packetLossPercentage: 4.1, expectedMOS: 4.1, expectedQuality: livekit.ConnectionQuality_GOOD, }, { - packetLossPercentage: 8.0, + packetLossPercentage: 8.2, expectedMOS: 3.2, expectedQuality: livekit.ConnectionQuality_POOR, }, @@ -457,7 +403,7 @@ func TestConnectionQuality(t *testing.T) { expectedQuality: livekit.ConnectionQuality_GOOD, }, { - packetLossPercentage: 20.0, + packetLossPercentage: 22.0, expectedMOS: 3.2, expectedQuality: livekit.ConnectionQuality_POOR, }, @@ -476,7 +422,7 @@ func TestConnectionQuality(t *testing.T) { expectedQuality: livekit.ConnectionQuality_EXCELLENT, }, { - packetLossPercentage: 2.0, + packetLossPercentage: 2.5, expectedMOS: 4.1, expectedQuality: livekit.ConnectionQuality_GOOD, }, @@ -504,7 +450,7 @@ func TestConnectionQuality(t *testing.T) { StartTime: now, Duration: duration, Packets: tc.packetsExpected, - PacketsLost: uint32(eq.packetLossPercentage * float64(tc.packetsExpected) / 100.0), + PacketsLost: uint32(math.Ceil(eq.packetLossPercentage * float64(tc.packetsExpected) / 100.0)), }, }, } @@ -531,6 +477,7 @@ func TestConnectionQuality(t *testing.T) { expectedMOS float32 expectedQuality livekit.ConnectionQuality }{ + // NOTE: Because of EWMA (Exponentially Weighted Moving Average), these cut off points are not exact // 1.0 <= expectedBits / actualBits < ~2.7 = EXCELLENT // ~2.7 <= expectedBits / actualBits < ~7.5 = GOOD // expectedBits / actualBits >= ~7.5 = POOR @@ -560,7 +507,7 @@ func TestConnectionQuality(t *testing.T) { offset: 3 * time.Second, }, }, - bytes: 7_000_000 / 8 / 3, + bytes: uint64(math.Ceil(7_000_000.0 / 8.0 / 3.5)), expectedMOS: 4.1, expectedQuality: livekit.ConnectionQuality_GOOD, }, @@ -575,7 +522,7 @@ func TestConnectionQuality(t *testing.T) { offset: 3 * time.Second, }, }, - bytes: 8_000_000 / 8 / 8, + bytes: uint64(math.Ceil(8_000_000.0 / 8.0 / 13.0)), expectedMOS: 3.2, expectedQuality: livekit.ConnectionQuality_POOR, }, diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index 1b935be6f..8d77f31e6 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -3,7 +3,6 @@ package connectionquality import ( "fmt" "math" - "sort" "sync" "time" @@ -17,8 +16,8 @@ const ( maxScore = float64(100.0) poorScore = float64(50.0) - waitForQualityPoor = 5 - waitForQualityGood = 3 + increaseFactor = float64(0.4) // slow increase + decreaseFactor = float64(0.8) // fast decrease unmuteTimeThreshold = float64(0.5) ) @@ -101,62 +100,6 @@ func (w *windowStat) String() string { // ------------------------------------------ -type windowScore struct { - stat *windowStat - score float64 -} - -func newWindowScorePacket(stat *windowStat, plw float64) *windowScore { - return &windowScore{ - stat: stat, - score: stat.calculatePacketScore(plw), - } -} - -func newWindowScoreByte(stat *windowStat, expectedBitrate int64) *windowScore { - return &windowScore{ - stat: stat, - score: stat.calculateByteScore(expectedBitrate), - } -} - -func newWindowScoreWithScore(stat *windowStat, score float64) *windowScore { - return &windowScore{ - stat: stat, - score: score, - } -} - -func (w *windowScore) getScore() float64 { - return w.score -} - -func (w *windowScore) String() string { - return fmt.Sprintf("stat: {%+v}, score: %0.2f", w.stat, w.score) -} - -// ------------------------------------------ - -type qualityScorerState int - -const ( - qualityScorerStateStable qualityScorerState = iota - qualityScorerStateRecovering -) - -func (q qualityScorerState) String() string { - switch q { - case qualityScorerStateStable: - return "STABLE" - case qualityScorerStateRecovering: - return "RECOVERING" - default: - return fmt.Sprintf("%d", int(q)) - } -} - -// ------------------------------------------ - type layerTransition struct { startedAt time.Time bitrate int64 @@ -173,9 +116,7 @@ type qualityScorer struct { lock sync.RWMutex lastUpdateAt time.Time - score float64 - state qualityScorerState - windows []*windowScore + score float64 mutedAt time.Time unmutedAt time.Time @@ -192,7 +133,6 @@ func newQualityScorer(params qualityScorerParams) *qualityScorer { return &qualityScorer{ params: params, score: maxScore, - state: qualityScorerStateStable, } } @@ -209,10 +149,6 @@ func (q *qualityScorer) UpdateMute(isMuted bool, at time.Time) { if isMuted { q.mutedAt = at - - // stable when muted - q.state = qualityScorerStateStable - q.windows = q.windows[:0] q.score = maxScore } else { q.unmutedAt = at @@ -230,10 +166,6 @@ func (q *qualityScorer) AddTransition(bitrate int64, at time.Time) { if bitrate == 0 { q.layersMutedAt = at - - // stable when no layers expected - q.state = qualityScorerStateStable - q.windows = q.windows[:0] q.score = maxScore } else { if q.layersUnmutedAt.IsZero() || q.layersMutedAt.After(q.layersUnmutedAt) { @@ -261,64 +193,42 @@ func (q *qualityScorer) Update(stat *windowStat, at time.Time) { } reason := "none" - var ws *windowScore + var score float64 if stat.packetsExpected == 0 { reason = "dry" - ws = newWindowScoreWithScore(stat, poorScore) + score = poorScore } else { - wsPacket := newWindowScorePacket(stat, q.getPacketLossWeight(stat)) - wsByte := newWindowScoreByte(stat, expectedBitrate) - if wsPacket.getScore() < wsByte.getScore() { + packetScore := stat.calculatePacketScore(q.getPacketLossWeight(stat)) + byteScore := stat.calculateByteScore(expectedBitrate) + if packetScore < byteScore { reason = "packet" - ws = wsPacket + score = packetScore } else { reason = "bitrate" - ws = wsByte + score = byteScore } } - score := ws.getScore() - cq := scoreToConnectionQuality(score) - - q.lastUpdateAt = at - - // transition to start of recovering on any quality drop + factor := increaseFactor + if score < q.score { + factor = decreaseFactor + } + score = factor*score + (1.0-factor)*q.score // WARNING NOTE: comparing protobuf enum values directly (livekit.ConnectionQuality) - if scoreToConnectionQuality(q.score) > cq { - q.windows = []*windowScore{ws} - q.state = qualityScorerStateRecovering - q.score = score + if scoreToConnectionQuality(q.score) > scoreToConnectionQuality(score) { q.params.Logger.Infow( "quality drop", "reason", reason, - "score", q.score, - "quality", scoreToConnectionQuality(q.score), - "window", ws, + "prevScore", q.score, + "prevQuality", scoreToConnectionQuality(q.score), + "score", score, + "quality", scoreToConnectionQuality(score), + "stat", stat, "expectedBitrate", expectedBitrate, ) - return } - // if stable and quality continues to be EXCELLENT, stay there. - if q.state == qualityScorerStateStable && cq == livekit.ConnectionQuality_EXCELLENT { - q.score = score - return - } - - // when recovering, look at a longer window - q.windows = append(q.windows, ws) - if !q.prune(at) { - // minimum recovery duration not satisfied, hold at current quality - return - } - - // take median of scores in a longer window to prevent quality reporting oscillations - sort.Slice(q.windows, func(i, j int) bool { return q.windows[i].getScore() < q.windows[j].getScore() }) - mid := (len(q.windows)+1)/2 - 1 - q.score = q.windows[mid].getScore() - if scoreToConnectionQuality(q.score) == livekit.ConnectionQuality_EXCELLENT { - q.state = qualityScorerStateStable - q.windows = q.windows[:0] - } + q.score = score + q.lastUpdateAt = at } func (q *qualityScorer) isMuted() bool { @@ -372,25 +282,6 @@ func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 { return math.Sqrt(pps/q.maxPPS) * q.params.PacketLossWeight } -func (q *qualityScorer) prune(at time.Time) bool { - cq := scoreToConnectionQuality(q.score) - - var wait int - if cq == livekit.ConnectionQuality_POOR { - wait = waitForQualityPoor - } else { - wait = waitForQualityGood - } - - oldest := len(q.windows) - wait - if oldest < 0 { - oldest = 0 - } - q.windows = q.windows[oldest:] - - return len(q.windows) >= wait -} - func (q *qualityScorer) getExpectedBitsAndUpdateTransitions(at time.Time) int64 { if len(q.transitions) == 0 { return 0