From e893d30fd08e86a6a690d5bf3041ac57eee5c543 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 9 Mar 2023 13:52:01 +0530 Subject: [PATCH] Use EWMA (Exponentially Weighted Moving Average) for score updates. (#1507) * Use EWMA (Exponentially Weighted Moving Average) for score updates. Makes code simpler, but makes it harder to test as the inflection points are not exact. Score falls a bit slower to be conservative on dropping quality too quickly. Still fall factor is higher (i. e. newer scores get more weight) than rise factor (i. e. newer scores get lower weight). Slower rise factor to introduce hysteresis on things climibing back too quickly. In the extreme case, asympttotic conditions could cause unexpected results. For example, having 4% loss of video continously will never drop quality to `POOR`. It will get close to 60, but it will always stay above 60 forever and hence quality will never drop to POOR. Maybe, need some sort of variable thresholding to deal with that. But, that is an extreme case and may not happen in real life. * remove unused stuff --- .../connectionquality/connectionstats_test.go | 185 +++++++----------- pkg/sfu/connectionquality/scorer.go | 155 +++------------ 2 files changed, 89 insertions(+), 251 deletions(-) diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index 7af09f19f..6fe4ef218 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -1,6 +1,7 @@ package connectionquality import ( + "math" "testing" "time" @@ -48,7 +49,7 @@ func TestConnectionQuality(t *testing.T) { require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) - // introduce loss and the score should drop - 10% loss for Opus -> POOR + // introduce loss and the score should drop - 12% loss for Opus -> POOR now = now.Add(duration) streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: &buffer.StreamStatsWithLayers{ @@ -56,7 +57,7 @@ func TestConnectionQuality(t *testing.T) { StartTime: now, Duration: duration, Packets: 250, - PacketsLost: 25, + PacketsLost: 30, }, }, } @@ -65,25 +66,8 @@ func TestConnectionQuality(t *testing.T) { require.Greater(t, float32(3.2), mos) require.Equal(t, livekit.ConnectionQuality_POOR, quality) - // should stay at POOR quality for poor threshold wait even if the conditions improve - for i := 0; i < 3; i++ { - now = now.Add(duration) - streams = map[uint32]*buffer.StreamStatsWithLayers{ - 1: &buffer.StreamStatsWithLayers{ - RTPStats: &buffer.RTPDeltaInfo{ - StartTime: now, - Duration: duration, - Packets: 250, - }, - }, - } - cs.updateScore(streams, now.Add(duration)) - mos, quality = cs.GetScoreAndQuality() - require.Greater(t, float32(3.2), mos) - require.Equal(t, livekit.ConnectionQuality_POOR, quality) - } - - // should return median quality which should be EXCELLENT as all windows in above loop have great conditions + // should stay at POOR quality for one iteration even if the conditions improve + // due to significant loss (12%) in the previous window now = now.Add(duration) streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: &buffer.StreamStatsWithLayers{ @@ -96,98 +80,10 @@ func TestConnectionQuality(t *testing.T) { } cs.updateScore(streams, now.Add(duration)) mos, quality = cs.GetScoreAndQuality() - require.Greater(t, float32(4.6), mos) - require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) - - // introduce loss and the score should drop - 3% loss for Opus -> GOOD - now = now.Add(duration) - streams = map[uint32]*buffer.StreamStatsWithLayers{ - 1: &buffer.StreamStatsWithLayers{ - RTPStats: &buffer.RTPDeltaInfo{ - StartTime: now, - Duration: duration, - Packets: 250, - PacketsLost: 8, - }, - }, - } - cs.updateScore(streams, now.Add(duration)) - mos, quality = cs.GetScoreAndQuality() - require.Greater(t, float32(4.1), mos) - require.Equal(t, livekit.ConnectionQuality_GOOD, quality) - - // should stay at GOOD quality for good threshold (which is shorter than poor threshold) wait even if the conditions improve - for i := 0; i < 1; i++ { - now = now.Add(duration) - streams = map[uint32]*buffer.StreamStatsWithLayers{ - 1: &buffer.StreamStatsWithLayers{ - RTPStats: &buffer.RTPDeltaInfo{ - StartTime: now, - Duration: duration, - Packets: 250, - }, - }, - } - cs.updateScore(streams, now.Add(duration)) - mos, quality = cs.GetScoreAndQuality() - require.Greater(t, float32(4.1), mos) - require.Equal(t, livekit.ConnectionQuality_GOOD, quality) - } - - // should return median quality which should be EXCELLENT as all windows in above loop have great conditions - now = now.Add(duration) - streams = map[uint32]*buffer.StreamStatsWithLayers{ - 1: &buffer.StreamStatsWithLayers{ - RTPStats: &buffer.RTPDeltaInfo{ - StartTime: now, - Duration: duration, - Packets: 250, - }, - }, - } - cs.updateScore(streams, now.Add(duration)) - mos, quality = cs.GetScoreAndQuality() - require.Greater(t, float32(4.6), mos) - require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) - - // POOR -> GOOD -> EXCELLENT should take longer - now = now.Add(duration) - streams = map[uint32]*buffer.StreamStatsWithLayers{ - 1: &buffer.StreamStatsWithLayers{ - RTPStats: &buffer.RTPDeltaInfo{ - StartTime: now, - Duration: duration, - Packets: 250, - PacketsLost: 25, - }, - }, - } - cs.updateScore(streams, now.Add(duration)) - mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(3.2), mos) require.Equal(t, livekit.ConnectionQuality_POOR, quality) - // should stay at POOR quality for poor threshold wait even if the conditions improve - for i := 0; i < 3; i++ { - now = now.Add(duration) - streams = map[uint32]*buffer.StreamStatsWithLayers{ - 1: &buffer.StreamStatsWithLayers{ - RTPStats: &buffer.RTPDeltaInfo{ - StartTime: now, - Duration: duration, - Packets: 250, - PacketsLost: 8, - }, - }, - } - cs.updateScore(streams, now.Add(duration)) - mos, quality = cs.GetScoreAndQuality() - require.Greater(t, float32(3.2), mos) - require.Equal(t, livekit.ConnectionQuality_POOR, quality) - } - - // should return median quality which should be GOOD as all windows in above loop have some loss (i. e. GOOD quality). - // although the below update has no loss (EXCELLENT quality), median should be at GOOO + // should climb up to GOOD if conditions continue to be good now = now.Add(duration) streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: &buffer.StreamStatsWithLayers{ @@ -203,7 +99,56 @@ func TestConnectionQuality(t *testing.T) { require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) - // another EXCELLENT quality window and the median should switch to EXCELLENT + // should climb up to EXCELLENT if conditions continue to be good + now = now.Add(duration) + streams = map[uint32]*buffer.StreamStatsWithLayers{ + 1: &buffer.StreamStatsWithLayers{ + RTPStats: &buffer.RTPDeltaInfo{ + StartTime: now, + Duration: duration, + Packets: 250, + }, + }, + } + cs.updateScore(streams, now.Add(duration)) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(4.6), mos) + require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) + + // introduce loss and the score should drop - 5% loss for Opus -> GOOD + now = now.Add(duration) + streams = map[uint32]*buffer.StreamStatsWithLayers{ + 1: &buffer.StreamStatsWithLayers{ + RTPStats: &buffer.RTPDeltaInfo{ + StartTime: now, + Duration: duration, + Packets: 250, + PacketsLost: 13, + }, + }, + } + cs.updateScore(streams, now.Add(duration)) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(4.1), mos) + require.Equal(t, livekit.ConnectionQuality_GOOD, quality) + + // should stay at GOOD quality for another iteration even if the conditions improve + now = now.Add(duration) + streams = map[uint32]*buffer.StreamStatsWithLayers{ + 1: &buffer.StreamStatsWithLayers{ + RTPStats: &buffer.RTPDeltaInfo{ + StartTime: now, + Duration: duration, + Packets: 250, + }, + }, + } + cs.updateScore(streams, now.Add(duration)) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(4.1), mos) + require.Equal(t, livekit.ConnectionQuality_GOOD, quality) + + // should climb up to EXCELLENT if conditions continue to be good now = now.Add(duration) streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: &buffer.StreamStatsWithLayers{ @@ -367,6 +312,7 @@ func TestConnectionQuality(t *testing.T) { packetsExpected uint32 expectedQualities []expectedQuality }{ + // NOTE: Because of EWMA (Exponentially Weighted Moving Average), these cut off points are not exact // "audio/opus" - no fec - 0 <= loss < 2.5%: EXCELLENT, 2.5% <= loss < 5%: GOOD, >= 5%: POOR { name: "audio/opus - no fec", @@ -385,7 +331,7 @@ func TestConnectionQuality(t *testing.T) { expectedQuality: livekit.ConnectionQuality_GOOD, }, { - packetLossPercentage: 5.0, + packetLossPercentage: 5.2, expectedMOS: 3.2, expectedQuality: livekit.ConnectionQuality_POOR, }, @@ -404,12 +350,12 @@ func TestConnectionQuality(t *testing.T) { expectedQuality: livekit.ConnectionQuality_EXCELLENT, }, { - packetLossPercentage: 4.0, + packetLossPercentage: 4.1, expectedMOS: 4.1, expectedQuality: livekit.ConnectionQuality_GOOD, }, { - packetLossPercentage: 8.0, + packetLossPercentage: 8.2, expectedMOS: 3.2, expectedQuality: livekit.ConnectionQuality_POOR, }, @@ -457,7 +403,7 @@ func TestConnectionQuality(t *testing.T) { expectedQuality: livekit.ConnectionQuality_GOOD, }, { - packetLossPercentage: 20.0, + packetLossPercentage: 22.0, expectedMOS: 3.2, expectedQuality: livekit.ConnectionQuality_POOR, }, @@ -476,7 +422,7 @@ func TestConnectionQuality(t *testing.T) { expectedQuality: livekit.ConnectionQuality_EXCELLENT, }, { - packetLossPercentage: 2.0, + packetLossPercentage: 2.5, expectedMOS: 4.1, expectedQuality: livekit.ConnectionQuality_GOOD, }, @@ -504,7 +450,7 @@ func TestConnectionQuality(t *testing.T) { StartTime: now, Duration: duration, Packets: tc.packetsExpected, - PacketsLost: uint32(eq.packetLossPercentage * float64(tc.packetsExpected) / 100.0), + PacketsLost: uint32(math.Ceil(eq.packetLossPercentage * float64(tc.packetsExpected) / 100.0)), }, }, } @@ -531,6 +477,7 @@ func TestConnectionQuality(t *testing.T) { expectedMOS float32 expectedQuality livekit.ConnectionQuality }{ + // NOTE: Because of EWMA (Exponentially Weighted Moving Average), these cut off points are not exact // 1.0 <= expectedBits / actualBits < ~2.7 = EXCELLENT // ~2.7 <= expectedBits / actualBits < ~7.5 = GOOD // expectedBits / actualBits >= ~7.5 = POOR @@ -560,7 +507,7 @@ func TestConnectionQuality(t *testing.T) { offset: 3 * time.Second, }, }, - bytes: 7_000_000 / 8 / 3, + bytes: uint64(math.Ceil(7_000_000.0 / 8.0 / 3.5)), expectedMOS: 4.1, expectedQuality: livekit.ConnectionQuality_GOOD, }, @@ -575,7 +522,7 @@ func TestConnectionQuality(t *testing.T) { offset: 3 * time.Second, }, }, - bytes: 8_000_000 / 8 / 8, + bytes: uint64(math.Ceil(8_000_000.0 / 8.0 / 13.0)), expectedMOS: 3.2, expectedQuality: livekit.ConnectionQuality_POOR, }, diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index 1b935be6f..8d77f31e6 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -3,7 +3,6 @@ package connectionquality import ( "fmt" "math" - "sort" "sync" "time" @@ -17,8 +16,8 @@ const ( maxScore = float64(100.0) poorScore = float64(50.0) - waitForQualityPoor = 5 - waitForQualityGood = 3 + increaseFactor = float64(0.4) // slow increase + decreaseFactor = float64(0.8) // fast decrease unmuteTimeThreshold = float64(0.5) ) @@ -101,62 +100,6 @@ func (w *windowStat) String() string { // ------------------------------------------ -type windowScore struct { - stat *windowStat - score float64 -} - -func newWindowScorePacket(stat *windowStat, plw float64) *windowScore { - return &windowScore{ - stat: stat, - score: stat.calculatePacketScore(plw), - } -} - -func newWindowScoreByte(stat *windowStat, expectedBitrate int64) *windowScore { - return &windowScore{ - stat: stat, - score: stat.calculateByteScore(expectedBitrate), - } -} - -func newWindowScoreWithScore(stat *windowStat, score float64) *windowScore { - return &windowScore{ - stat: stat, - score: score, - } -} - -func (w *windowScore) getScore() float64 { - return w.score -} - -func (w *windowScore) String() string { - return fmt.Sprintf("stat: {%+v}, score: %0.2f", w.stat, w.score) -} - -// ------------------------------------------ - -type qualityScorerState int - -const ( - qualityScorerStateStable qualityScorerState = iota - qualityScorerStateRecovering -) - -func (q qualityScorerState) String() string { - switch q { - case qualityScorerStateStable: - return "STABLE" - case qualityScorerStateRecovering: - return "RECOVERING" - default: - return fmt.Sprintf("%d", int(q)) - } -} - -// ------------------------------------------ - type layerTransition struct { startedAt time.Time bitrate int64 @@ -173,9 +116,7 @@ type qualityScorer struct { lock sync.RWMutex lastUpdateAt time.Time - score float64 - state qualityScorerState - windows []*windowScore + score float64 mutedAt time.Time unmutedAt time.Time @@ -192,7 +133,6 @@ func newQualityScorer(params qualityScorerParams) *qualityScorer { return &qualityScorer{ params: params, score: maxScore, - state: qualityScorerStateStable, } } @@ -209,10 +149,6 @@ func (q *qualityScorer) UpdateMute(isMuted bool, at time.Time) { if isMuted { q.mutedAt = at - - // stable when muted - q.state = qualityScorerStateStable - q.windows = q.windows[:0] q.score = maxScore } else { q.unmutedAt = at @@ -230,10 +166,6 @@ func (q *qualityScorer) AddTransition(bitrate int64, at time.Time) { if bitrate == 0 { q.layersMutedAt = at - - // stable when no layers expected - q.state = qualityScorerStateStable - q.windows = q.windows[:0] q.score = maxScore } else { if q.layersUnmutedAt.IsZero() || q.layersMutedAt.After(q.layersUnmutedAt) { @@ -261,64 +193,42 @@ func (q *qualityScorer) Update(stat *windowStat, at time.Time) { } reason := "none" - var ws *windowScore + var score float64 if stat.packetsExpected == 0 { reason = "dry" - ws = newWindowScoreWithScore(stat, poorScore) + score = poorScore } else { - wsPacket := newWindowScorePacket(stat, q.getPacketLossWeight(stat)) - wsByte := newWindowScoreByte(stat, expectedBitrate) - if wsPacket.getScore() < wsByte.getScore() { + packetScore := stat.calculatePacketScore(q.getPacketLossWeight(stat)) + byteScore := stat.calculateByteScore(expectedBitrate) + if packetScore < byteScore { reason = "packet" - ws = wsPacket + score = packetScore } else { reason = "bitrate" - ws = wsByte + score = byteScore } } - score := ws.getScore() - cq := scoreToConnectionQuality(score) - - q.lastUpdateAt = at - - // transition to start of recovering on any quality drop + factor := increaseFactor + if score < q.score { + factor = decreaseFactor + } + score = factor*score + (1.0-factor)*q.score // WARNING NOTE: comparing protobuf enum values directly (livekit.ConnectionQuality) - if scoreToConnectionQuality(q.score) > cq { - q.windows = []*windowScore{ws} - q.state = qualityScorerStateRecovering - q.score = score + if scoreToConnectionQuality(q.score) > scoreToConnectionQuality(score) { q.params.Logger.Infow( "quality drop", "reason", reason, - "score", q.score, - "quality", scoreToConnectionQuality(q.score), - "window", ws, + "prevScore", q.score, + "prevQuality", scoreToConnectionQuality(q.score), + "score", score, + "quality", scoreToConnectionQuality(score), + "stat", stat, "expectedBitrate", expectedBitrate, ) - return } - // if stable and quality continues to be EXCELLENT, stay there. - if q.state == qualityScorerStateStable && cq == livekit.ConnectionQuality_EXCELLENT { - q.score = score - return - } - - // when recovering, look at a longer window - q.windows = append(q.windows, ws) - if !q.prune(at) { - // minimum recovery duration not satisfied, hold at current quality - return - } - - // take median of scores in a longer window to prevent quality reporting oscillations - sort.Slice(q.windows, func(i, j int) bool { return q.windows[i].getScore() < q.windows[j].getScore() }) - mid := (len(q.windows)+1)/2 - 1 - q.score = q.windows[mid].getScore() - if scoreToConnectionQuality(q.score) == livekit.ConnectionQuality_EXCELLENT { - q.state = qualityScorerStateStable - q.windows = q.windows[:0] - } + q.score = score + q.lastUpdateAt = at } func (q *qualityScorer) isMuted() bool { @@ -372,25 +282,6 @@ func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 { return math.Sqrt(pps/q.maxPPS) * q.params.PacketLossWeight } -func (q *qualityScorer) prune(at time.Time) bool { - cq := scoreToConnectionQuality(q.score) - - var wait int - if cq == livekit.ConnectionQuality_POOR { - wait = waitForQualityPoor - } else { - wait = waitForQualityGood - } - - oldest := len(q.windows) - wait - if oldest < 0 { - oldest = 0 - } - q.windows = q.windows[oldest:] - - return len(q.windows) >= wait -} - func (q *qualityScorer) getExpectedBitsAndUpdateTransitions(at time.Time) int64 { if len(q.transitions) == 0 { return 0