Use EWMA (Exponentially Weighted Moving Average) for score updates. (#1507)

* Use EWMA (Exponentially Weighted Moving Average) for score updates.

Makes code simpler, but makes it harder to test as the inflection points
are not exact.

Score falls a bit slower to be conservative on dropping quality too
quickly. Still fall factor is higher (i. e. newer scores get more
weight) than rise factor (i. e. newer scores get lower weight).
Slower rise factor to introduce hysteresis on things climibing back too
quickly.

In the extreme case, asympttotic conditions could cause unexpected
results. For example, having 4% loss of video continously will never
drop quality to `POOR`. It will get close to 60, but it will always
stay above 60 forever and hence quality will never drop to POOR.
Maybe, need some sort of variable thresholding to deal with that. But,
that is an extreme case and may not happen in real life.

* remove unused stuff
This commit is contained in:
Raja Subramanian
2023-03-09 13:52:01 +05:30
committed by GitHub
parent 14b0b48b15
commit e893d30fd0
2 changed files with 89 additions and 251 deletions

View File

@@ -1,6 +1,7 @@
package connectionquality
import (
"math"
"testing"
"time"
@@ -48,7 +49,7 @@ func TestConnectionQuality(t *testing.T) {
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
// introduce loss and the score should drop - 10% loss for Opus -> POOR
// introduce loss and the score should drop - 12% loss for Opus -> POOR
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
@@ -56,7 +57,7 @@ func TestConnectionQuality(t *testing.T) {
StartTime: now,
Duration: duration,
Packets: 250,
PacketsLost: 25,
PacketsLost: 30,
},
},
}
@@ -65,25 +66,8 @@ func TestConnectionQuality(t *testing.T) {
require.Greater(t, float32(3.2), mos)
require.Equal(t, livekit.ConnectionQuality_POOR, quality)
// should stay at POOR quality for poor threshold wait even if the conditions improve
for i := 0; i < 3; i++ {
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
Packets: 250,
},
},
}
cs.updateScore(streams, now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(3.2), mos)
require.Equal(t, livekit.ConnectionQuality_POOR, quality)
}
// should return median quality which should be EXCELLENT as all windows in above loop have great conditions
// should stay at POOR quality for one iteration even if the conditions improve
// due to significant loss (12%) in the previous window
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
@@ -96,98 +80,10 @@ func TestConnectionQuality(t *testing.T) {
}
cs.updateScore(streams, now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
// introduce loss and the score should drop - 3% loss for Opus -> GOOD
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
Packets: 250,
PacketsLost: 8,
},
},
}
cs.updateScore(streams, now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
// should stay at GOOD quality for good threshold (which is shorter than poor threshold) wait even if the conditions improve
for i := 0; i < 1; i++ {
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
Packets: 250,
},
},
}
cs.updateScore(streams, now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
}
// should return median quality which should be EXCELLENT as all windows in above loop have great conditions
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
Packets: 250,
},
},
}
cs.updateScore(streams, now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
// POOR -> GOOD -> EXCELLENT should take longer
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
Packets: 250,
PacketsLost: 25,
},
},
}
cs.updateScore(streams, now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(3.2), mos)
require.Equal(t, livekit.ConnectionQuality_POOR, quality)
// should stay at POOR quality for poor threshold wait even if the conditions improve
for i := 0; i < 3; i++ {
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
Packets: 250,
PacketsLost: 8,
},
},
}
cs.updateScore(streams, now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(3.2), mos)
require.Equal(t, livekit.ConnectionQuality_POOR, quality)
}
// should return median quality which should be GOOD as all windows in above loop have some loss (i. e. GOOD quality).
// although the below update has no loss (EXCELLENT quality), median should be at GOOO
// should climb up to GOOD if conditions continue to be good
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
@@ -203,7 +99,56 @@ func TestConnectionQuality(t *testing.T) {
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
// another EXCELLENT quality window and the median should switch to EXCELLENT
// should climb up to EXCELLENT if conditions continue to be good
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
Packets: 250,
},
},
}
cs.updateScore(streams, now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
// introduce loss and the score should drop - 5% loss for Opus -> GOOD
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
Packets: 250,
PacketsLost: 13,
},
},
}
cs.updateScore(streams, now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
// should stay at GOOD quality for another iteration even if the conditions improve
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
Packets: 250,
},
},
}
cs.updateScore(streams, now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
// should climb up to EXCELLENT if conditions continue to be good
now = now.Add(duration)
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: &buffer.StreamStatsWithLayers{
@@ -367,6 +312,7 @@ func TestConnectionQuality(t *testing.T) {
packetsExpected uint32
expectedQualities []expectedQuality
}{
// NOTE: Because of EWMA (Exponentially Weighted Moving Average), these cut off points are not exact
// "audio/opus" - no fec - 0 <= loss < 2.5%: EXCELLENT, 2.5% <= loss < 5%: GOOD, >= 5%: POOR
{
name: "audio/opus - no fec",
@@ -385,7 +331,7 @@ func TestConnectionQuality(t *testing.T) {
expectedQuality: livekit.ConnectionQuality_GOOD,
},
{
packetLossPercentage: 5.0,
packetLossPercentage: 5.2,
expectedMOS: 3.2,
expectedQuality: livekit.ConnectionQuality_POOR,
},
@@ -404,12 +350,12 @@ func TestConnectionQuality(t *testing.T) {
expectedQuality: livekit.ConnectionQuality_EXCELLENT,
},
{
packetLossPercentage: 4.0,
packetLossPercentage: 4.1,
expectedMOS: 4.1,
expectedQuality: livekit.ConnectionQuality_GOOD,
},
{
packetLossPercentage: 8.0,
packetLossPercentage: 8.2,
expectedMOS: 3.2,
expectedQuality: livekit.ConnectionQuality_POOR,
},
@@ -457,7 +403,7 @@ func TestConnectionQuality(t *testing.T) {
expectedQuality: livekit.ConnectionQuality_GOOD,
},
{
packetLossPercentage: 20.0,
packetLossPercentage: 22.0,
expectedMOS: 3.2,
expectedQuality: livekit.ConnectionQuality_POOR,
},
@@ -476,7 +422,7 @@ func TestConnectionQuality(t *testing.T) {
expectedQuality: livekit.ConnectionQuality_EXCELLENT,
},
{
packetLossPercentage: 2.0,
packetLossPercentage: 2.5,
expectedMOS: 4.1,
expectedQuality: livekit.ConnectionQuality_GOOD,
},
@@ -504,7 +450,7 @@ func TestConnectionQuality(t *testing.T) {
StartTime: now,
Duration: duration,
Packets: tc.packetsExpected,
PacketsLost: uint32(eq.packetLossPercentage * float64(tc.packetsExpected) / 100.0),
PacketsLost: uint32(math.Ceil(eq.packetLossPercentage * float64(tc.packetsExpected) / 100.0)),
},
},
}
@@ -531,6 +477,7 @@ func TestConnectionQuality(t *testing.T) {
expectedMOS float32
expectedQuality livekit.ConnectionQuality
}{
// NOTE: Because of EWMA (Exponentially Weighted Moving Average), these cut off points are not exact
// 1.0 <= expectedBits / actualBits < ~2.7 = EXCELLENT
// ~2.7 <= expectedBits / actualBits < ~7.5 = GOOD
// expectedBits / actualBits >= ~7.5 = POOR
@@ -560,7 +507,7 @@ func TestConnectionQuality(t *testing.T) {
offset: 3 * time.Second,
},
},
bytes: 7_000_000 / 8 / 3,
bytes: uint64(math.Ceil(7_000_000.0 / 8.0 / 3.5)),
expectedMOS: 4.1,
expectedQuality: livekit.ConnectionQuality_GOOD,
},
@@ -575,7 +522,7 @@ func TestConnectionQuality(t *testing.T) {
offset: 3 * time.Second,
},
},
bytes: 8_000_000 / 8 / 8,
bytes: uint64(math.Ceil(8_000_000.0 / 8.0 / 13.0)),
expectedMOS: 3.2,
expectedQuality: livekit.ConnectionQuality_POOR,
},

View File

@@ -3,7 +3,6 @@ package connectionquality
import (
"fmt"
"math"
"sort"
"sync"
"time"
@@ -17,8 +16,8 @@ const (
maxScore = float64(100.0)
poorScore = float64(50.0)
waitForQualityPoor = 5
waitForQualityGood = 3
increaseFactor = float64(0.4) // slow increase
decreaseFactor = float64(0.8) // fast decrease
unmuteTimeThreshold = float64(0.5)
)
@@ -101,62 +100,6 @@ func (w *windowStat) String() string {
// ------------------------------------------
type windowScore struct {
stat *windowStat
score float64
}
func newWindowScorePacket(stat *windowStat, plw float64) *windowScore {
return &windowScore{
stat: stat,
score: stat.calculatePacketScore(plw),
}
}
func newWindowScoreByte(stat *windowStat, expectedBitrate int64) *windowScore {
return &windowScore{
stat: stat,
score: stat.calculateByteScore(expectedBitrate),
}
}
func newWindowScoreWithScore(stat *windowStat, score float64) *windowScore {
return &windowScore{
stat: stat,
score: score,
}
}
func (w *windowScore) getScore() float64 {
return w.score
}
func (w *windowScore) String() string {
return fmt.Sprintf("stat: {%+v}, score: %0.2f", w.stat, w.score)
}
// ------------------------------------------
type qualityScorerState int
const (
qualityScorerStateStable qualityScorerState = iota
qualityScorerStateRecovering
)
func (q qualityScorerState) String() string {
switch q {
case qualityScorerStateStable:
return "STABLE"
case qualityScorerStateRecovering:
return "RECOVERING"
default:
return fmt.Sprintf("%d", int(q))
}
}
// ------------------------------------------
type layerTransition struct {
startedAt time.Time
bitrate int64
@@ -173,9 +116,7 @@ type qualityScorer struct {
lock sync.RWMutex
lastUpdateAt time.Time
score float64
state qualityScorerState
windows []*windowScore
score float64
mutedAt time.Time
unmutedAt time.Time
@@ -192,7 +133,6 @@ func newQualityScorer(params qualityScorerParams) *qualityScorer {
return &qualityScorer{
params: params,
score: maxScore,
state: qualityScorerStateStable,
}
}
@@ -209,10 +149,6 @@ func (q *qualityScorer) UpdateMute(isMuted bool, at time.Time) {
if isMuted {
q.mutedAt = at
// stable when muted
q.state = qualityScorerStateStable
q.windows = q.windows[:0]
q.score = maxScore
} else {
q.unmutedAt = at
@@ -230,10 +166,6 @@ func (q *qualityScorer) AddTransition(bitrate int64, at time.Time) {
if bitrate == 0 {
q.layersMutedAt = at
// stable when no layers expected
q.state = qualityScorerStateStable
q.windows = q.windows[:0]
q.score = maxScore
} else {
if q.layersUnmutedAt.IsZero() || q.layersMutedAt.After(q.layersUnmutedAt) {
@@ -261,64 +193,42 @@ func (q *qualityScorer) Update(stat *windowStat, at time.Time) {
}
reason := "none"
var ws *windowScore
var score float64
if stat.packetsExpected == 0 {
reason = "dry"
ws = newWindowScoreWithScore(stat, poorScore)
score = poorScore
} else {
wsPacket := newWindowScorePacket(stat, q.getPacketLossWeight(stat))
wsByte := newWindowScoreByte(stat, expectedBitrate)
if wsPacket.getScore() < wsByte.getScore() {
packetScore := stat.calculatePacketScore(q.getPacketLossWeight(stat))
byteScore := stat.calculateByteScore(expectedBitrate)
if packetScore < byteScore {
reason = "packet"
ws = wsPacket
score = packetScore
} else {
reason = "bitrate"
ws = wsByte
score = byteScore
}
}
score := ws.getScore()
cq := scoreToConnectionQuality(score)
q.lastUpdateAt = at
// transition to start of recovering on any quality drop
factor := increaseFactor
if score < q.score {
factor = decreaseFactor
}
score = factor*score + (1.0-factor)*q.score
// WARNING NOTE: comparing protobuf enum values directly (livekit.ConnectionQuality)
if scoreToConnectionQuality(q.score) > cq {
q.windows = []*windowScore{ws}
q.state = qualityScorerStateRecovering
q.score = score
if scoreToConnectionQuality(q.score) > scoreToConnectionQuality(score) {
q.params.Logger.Infow(
"quality drop",
"reason", reason,
"score", q.score,
"quality", scoreToConnectionQuality(q.score),
"window", ws,
"prevScore", q.score,
"prevQuality", scoreToConnectionQuality(q.score),
"score", score,
"quality", scoreToConnectionQuality(score),
"stat", stat,
"expectedBitrate", expectedBitrate,
)
return
}
// if stable and quality continues to be EXCELLENT, stay there.
if q.state == qualityScorerStateStable && cq == livekit.ConnectionQuality_EXCELLENT {
q.score = score
return
}
// when recovering, look at a longer window
q.windows = append(q.windows, ws)
if !q.prune(at) {
// minimum recovery duration not satisfied, hold at current quality
return
}
// take median of scores in a longer window to prevent quality reporting oscillations
sort.Slice(q.windows, func(i, j int) bool { return q.windows[i].getScore() < q.windows[j].getScore() })
mid := (len(q.windows)+1)/2 - 1
q.score = q.windows[mid].getScore()
if scoreToConnectionQuality(q.score) == livekit.ConnectionQuality_EXCELLENT {
q.state = qualityScorerStateStable
q.windows = q.windows[:0]
}
q.score = score
q.lastUpdateAt = at
}
func (q *qualityScorer) isMuted() bool {
@@ -372,25 +282,6 @@ func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 {
return math.Sqrt(pps/q.maxPPS) * q.params.PacketLossWeight
}
func (q *qualityScorer) prune(at time.Time) bool {
cq := scoreToConnectionQuality(q.score)
var wait int
if cq == livekit.ConnectionQuality_POOR {
wait = waitForQualityPoor
} else {
wait = waitForQualityGood
}
oldest := len(q.windows) - wait
if oldest < 0 {
oldest = 0
}
q.windows = q.windows[oldest:]
return len(q.windows) >= wait
}
func (q *qualityScorer) getExpectedBitsAndUpdateTransitions(at time.Time) int64 {
if len(q.transitions) == 0 {
return 0