diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 9cb5c716c..fb5c7e710 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -16,8 +16,6 @@ import ( const ( UpdateInterval = 5 * time.Second - processThreshold = 0.95 - noStatsTooLongMultiplier = 2 noReceiverReportTooLongThreshold = 30 * time.Second ) @@ -120,9 +118,9 @@ func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, at return mos } -func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) float32 { +func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) { if cs.params.GetDeltaStatsOverridden == nil || cs.params.GetLastReceiverReportTime == nil { - return MinMOS + return MinMOS, nil } cs.lock.RLock() @@ -131,7 +129,7 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) float32 { if streamingStartedAt.IsZero() { // not streaming, just return current score mos, _ := cs.scorer.GetMOSAndQuality() - return mos + return mos, nil } streams := cs.params.GetDeltaStatsOverridden() @@ -143,12 +141,12 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) float32 { } if time.Since(marker) > noReceiverReportTooLongThreshold { // have not received receiver report for a long time when streaming, run with nil stat - return cs.updateScoreWithAggregate(nil, at) + return cs.updateScoreWithAggregate(nil, at), nil } // wait for receiver report, return current score mos, _ := cs.scorer.GetMOSAndQuality() - return mos + return mos, nil } // delta stat duration could be large due to not receiving receiver report for a long time (for example, due to mute), @@ -157,17 +155,27 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) float32 { if streamingStartedAt.After(cs.params.GetLastReceiverReportTime()) { // last receiver report was before streaming started, wait for next one mos, _ := cs.scorer.GetMOSAndQuality() - return mos + return mos, streams } if streamingStartedAt.After(agg.StartTime) { agg.Duration = agg.StartTime.Add(agg.Duration).Sub(streamingStartedAt) agg.StartTime = streamingStartedAt } - return cs.updateScoreWithAggregate(agg, at) + return cs.updateScoreWithAggregate(agg, at), streams } -func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWithLayers, at time.Time) float32 { +func (cs *ConnectionStats) updateScore(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) { + if cs.params.GetDeltaStats == nil { + return MinMOS, nil + } + + streams := cs.params.GetDeltaStats() + if len(streams) == 0 { + mos, _ := cs.scorer.GetMOSAndQuality() + return mos, nil + } + deltaInfoList := make([]*buffer.RTPDeltaInfo, 0, len(streams)) for _, s := range streams { deltaInfoList = append(deltaInfoList, s.RTPStats) @@ -185,7 +193,7 @@ func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWit return cs.updateScoreFromReceiverReport(at) } - return cs.updateScoreWithAggregate(agg, at) + return cs.updateScoreWithAggregate(agg, at), streams } func (cs *ConnectionStats) maybeSetStreamingStart(at time.Time) { @@ -203,18 +211,9 @@ func (cs *ConnectionStats) clearStreamingStart() { } func (cs *ConnectionStats) getStat(at time.Time) { - if cs.params.GetDeltaStats == nil { - return - } + score, streams := cs.updateScore(at) - streams := cs.params.GetDeltaStats() - if len(streams) == 0 { - return - } - - score := cs.updateScore(streams, at) - - if cs.onStatsUpdate != nil { + if cs.onStatsUpdate != nil && len(streams) != 0 { analyticsStreams := make([]*livekit.AnalyticsStream, 0, len(streams)) for ssrc, stream := range streams { as := toAnalyticsStream(ssrc, stream.RTPStats) @@ -317,6 +316,13 @@ func toAggregateDeltaInfo(streams map[uint32]*buffer.StreamStatsWithLayers) *buf } func toAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.AnalyticsStream { + // discount the feed side loss when reporting forwarded track stats + packetsLost := deltaStats.PacketsLost + if deltaStats.PacketsMissing > packetsLost { + packetsLost = 0 + } else { + packetsLost -= deltaStats.PacketsMissing + } return &livekit.AnalyticsStream{ Ssrc: ssrc, PrimaryPackets: deltaStats.Packets, @@ -325,7 +331,7 @@ func toAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.An RetransmitBytes: deltaStats.BytesDuplicate, PaddingPackets: deltaStats.PacketsPadding, PaddingBytes: deltaStats.BytesPadding, - PacketsLost: deltaStats.PacketsLost, + PacketsLost: packetsLost, Frames: deltaStats.Frames, Rtt: deltaStats.RttMax, Jitter: uint32(deltaStats.JitterMax), diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index 57793ef85..dce11601d 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -12,19 +12,30 @@ import ( "github.com/livekit/protocol/logger" ) -func newConnectionStats(mimeType string, isFECEnabled bool, includeRTT bool, includeJitter bool) *ConnectionStats { +func newConnectionStats( + mimeType string, + isFECEnabled bool, + includeRTT bool, + includeJitter bool, + getDeltaStats func() map[uint32]*buffer.StreamStatsWithLayers, +) *ConnectionStats { return NewConnectionStats(ConnectionStatsParams{ MimeType: mimeType, IsFECEnabled: isFECEnabled, IncludeRTT: includeRTT, IncludeJitter: includeJitter, + GetDeltaStats: getDeltaStats, Logger: logger.GetLogger(), }) } func TestConnectionQuality(t *testing.T) { t.Run("quality scorer state machine", func(t *testing.T) { - cs := newConnectionStats("audio/opus", false, true, true) + var streams map[uint32]*buffer.StreamStatsWithLayers + getDeltaStats := func() map[uint32]*buffer.StreamStatsWithLayers { + return streams + } + cs := newConnectionStats("audio/opus", false, true, true, getDeltaStats) duration := 5 * time.Second now := time.Now() @@ -32,13 +43,13 @@ func TestConnectionQuality(t *testing.T) { cs.UpdateMute(false, now.Add(-1*time.Second)) // no data and not enough unmute time should return default state which is EXCELLENT quality - cs.updateScore(nil, now) + cs.updateScore(now) mos, quality := cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) // best conditions (no loss, jitter/rtt = 0) - quality should stay EXCELLENT - streams := map[uint32]*buffer.StreamStatsWithLayers{ + streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, @@ -47,7 +58,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -72,7 +83,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(2.1), mos) require.Equal(t, livekit.ConnectionQuality_POOR, quality) @@ -90,7 +101,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) @@ -106,7 +117,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) @@ -122,7 +133,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -139,7 +150,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) @@ -155,7 +166,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) @@ -171,7 +182,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -188,7 +199,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(2.1), mos) require.Equal(t, livekit.ConnectionQuality_POOR, quality) @@ -212,7 +223,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -228,7 +239,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(2.1), mos) require.Equal(t, livekit.ConnectionQuality_POOR, quality) @@ -250,7 +261,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -274,7 +285,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) @@ -298,7 +309,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) @@ -324,7 +335,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) @@ -349,14 +360,18 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) }) t.Run("quality scorer dependent rtt", func(t *testing.T) { - cs := newConnectionStats("audio/opus", false, false, true) + var streams map[uint32]*buffer.StreamStatsWithLayers + getDeltaStats := func() map[uint32]*buffer.StreamStatsWithLayers { + return streams + } + cs := newConnectionStats("audio/opus", false, false, true, getDeltaStats) duration := 5 * time.Second now := time.Now() @@ -366,7 +381,7 @@ func TestConnectionQuality(t *testing.T) { // RTT does not knock quality down because it is dependent and hence not taken into account // at 2% loss, quality should stay at EXCELLENT purely based on loss. With high RTT (700 ms) // quality should drop to GOOD if RTT were taken into consideration - streams := map[uint32]*buffer.StreamStatsWithLayers{ + streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, @@ -377,14 +392,18 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality := cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) }) t.Run("quality scorer dependent jitter", func(t *testing.T) { - cs := newConnectionStats("audio/opus", false, true, false) + var streams map[uint32]*buffer.StreamStatsWithLayers + getDeltaStats := func() map[uint32]*buffer.StreamStatsWithLayers { + return streams + } + cs := newConnectionStats("audio/opus", false, true, false, getDeltaStats) duration := 5 * time.Second now := time.Now() @@ -394,7 +413,7 @@ func TestConnectionQuality(t *testing.T) { // Jitter does not knock quality down because it is dependent and hence not taken into account // at 2% loss, quality should stay at EXCELLENT purely based on loss. With high jitter (200 ms) // quality should drop to GOOD if jitter were taken into consideration - streams := map[uint32]*buffer.StreamStatsWithLayers{ + streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, @@ -405,7 +424,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality := cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -549,14 +568,18 @@ func TestConnectionQuality(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - cs := newConnectionStats(tc.mimeType, tc.isFECEnabled, true, true) + var streams map[uint32]*buffer.StreamStatsWithLayers + getDeltaStats := func() map[uint32]*buffer.StreamStatsWithLayers { + return streams + } + cs := newConnectionStats(tc.mimeType, tc.isFECEnabled, true, true, getDeltaStats) duration := 5 * time.Second now := time.Now() cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) for _, eq := range tc.expectedQualities { - streams := map[uint32]*buffer.StreamStatsWithLayers{ + streams = map[uint32]*buffer.StreamStatsWithLayers{ 123: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, @@ -566,7 +589,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality := cs.GetScoreAndQuality() require.Greater(t, eq.expectedMOS, mos) require.Equal(t, eq.expectedQuality, quality) @@ -642,7 +665,11 @@ func TestConnectionQuality(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - cs := newConnectionStats("video/vp8", false, true, true) + var streams map[uint32]*buffer.StreamStatsWithLayers + getDeltaStats := func() map[uint32]*buffer.StreamStatsWithLayers { + return streams + } + cs := newConnectionStats("video/vp8", false, true, true, getDeltaStats) duration := 5 * time.Second now := time.Now() @@ -652,7 +679,7 @@ func TestConnectionQuality(t *testing.T) { cs.AddBitrateTransition(tr.bitrate, now.Add(tr.offset)) } - streams := map[uint32]*buffer.StreamStatsWithLayers{ + streams = map[uint32]*buffer.StreamStatsWithLayers{ 123: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, @@ -662,7 +689,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality := cs.GetScoreAndQuality() require.Greater(t, tc.expectedMOS, mos) require.Equal(t, tc.expectedQuality, quality) @@ -729,7 +756,11 @@ func TestConnectionQuality(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - cs := newConnectionStats("video/vp8", false, true, true) + var streams map[uint32]*buffer.StreamStatsWithLayers + getDeltaStats := func() map[uint32]*buffer.StreamStatsWithLayers { + return streams + } + cs := newConnectionStats("video/vp8", false, true, true, getDeltaStats) duration := 5 * time.Second now := time.Now() @@ -739,7 +770,7 @@ func TestConnectionQuality(t *testing.T) { cs.AddLayerTransition(tr.distance, now.Add(tr.offset)) } - streams := map[uint32]*buffer.StreamStatsWithLayers{ + streams = map[uint32]*buffer.StreamStatsWithLayers{ 123: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, @@ -748,7 +779,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(streams, now.Add(duration)) + cs.updateScore(now.Add(duration)) mos, quality := cs.GetScoreAndQuality() require.Greater(t, tc.expectedMOS, mos) require.Equal(t, tc.expectedQuality, quality)