From 45581433ccf9500cfb2a672defbc878fa3453528 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 27 Mar 2024 18:45:53 +0530 Subject: [PATCH] Add option to enable bitrate based scoring (#2600) --- pkg/sfu/connectionquality/connectionstats.go | 26 ++++--- .../connectionquality/connectionstats_test.go | 73 +++++++++++++------ pkg/sfu/connectionquality/scorer.go | 15 ++-- pkg/sfu/downtrack.go | 3 +- pkg/sfu/receiver.go | 24 ++++++ 5 files changed, 98 insertions(+), 43 deletions(-) diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 16ac76d1d..c221ea130 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -45,14 +45,15 @@ type ConnectionStatsSenderProvider interface { } type ConnectionStatsParams struct { - UpdateInterval time.Duration - MimeType string - IsFECEnabled bool - IncludeRTT bool - IncludeJitter bool - ReceiverProvider ConnectionStatsReceiverProvider - SenderProvider ConnectionStatsSenderProvider - Logger logger.Logger + UpdateInterval time.Duration + MimeType string + IsFECEnabled bool + IncludeRTT bool + IncludeJitter bool + EnableBitrateScore bool + ReceiverProvider ConnectionStatsReceiverProvider + SenderProvider ConnectionStatsSenderProvider + Logger logger.Logger } type ConnectionStats struct { @@ -76,10 +77,11 @@ func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats { return &ConnectionStats{ params: params, scorer: newQualityScorer(qualityScorerParams{ - PacketLossWeight: getPacketLossWeight(params.MimeType, params.IsFECEnabled), // LK-TODO: have to notify codec change? - IncludeRTT: params.IncludeRTT, - IncludeJitter: params.IncludeJitter, - Logger: params.Logger, + PacketLossWeight: getPacketLossWeight(params.MimeType, params.IsFECEnabled), // LK-TODO: have to notify codec change? + IncludeRTT: params.IncludeRTT, + IncludeJitter: params.IncludeJitter, + EnableBitrateScore: params.EnableBitrateScore, + Logger: params.Logger, }), } } diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index 7b07ba8bc..569238a40 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -26,23 +26,6 @@ import ( "github.com/livekit/protocol/logger" ) -func newConnectionStats( - mimeType string, - isFECEnabled bool, - includeRTT bool, - includeJitter bool, - receiverProvider ConnectionStatsReceiverProvider, -) *ConnectionStats { - return NewConnectionStats(ConnectionStatsParams{ - MimeType: mimeType, - IsFECEnabled: isFECEnabled, - IncludeRTT: includeRTT, - IncludeJitter: includeJitter, - ReceiverProvider: receiverProvider, - Logger: logger.GetLogger(), - }) -} - // ----------------------------------------------- type testReceiverProvider struct { @@ -66,7 +49,15 @@ func (trp *testReceiverProvider) GetDeltaStats() map[uint32]*buffer.StreamStatsW func TestConnectionQuality(t *testing.T) { trp := newTestReceiverProvider() t.Run("quality scorer operation", func(t *testing.T) { - cs := newConnectionStats("audio/opus", false, true, true, trp) + cs := NewConnectionStats(ConnectionStatsParams{ + MimeType: "audio/opus", + IsFECEnabled: false, + IncludeRTT: true, + IncludeJitter: true, + EnableBitrateScore: true, + ReceiverProvider: trp, + Logger: logger.GetLogger(), + }) duration := 5 * time.Second now := time.Now() @@ -441,7 +432,14 @@ func TestConnectionQuality(t *testing.T) { }) t.Run("quality scorer dependent rtt", func(t *testing.T) { - cs := newConnectionStats("audio/opus", false, false, true, trp) + cs := NewConnectionStats(ConnectionStatsParams{ + MimeType: "audio/opus", + IsFECEnabled: false, + IncludeRTT: false, + IncludeJitter: true, + ReceiverProvider: trp, + Logger: logger.GetLogger(), + }) duration := 5 * time.Second now := time.Now() @@ -469,7 +467,14 @@ func TestConnectionQuality(t *testing.T) { }) t.Run("quality scorer dependent jitter", func(t *testing.T) { - cs := newConnectionStats("audio/opus", false, true, false, trp) + cs := NewConnectionStats(ConnectionStatsParams{ + MimeType: "audio/opus", + IsFECEnabled: false, + IncludeRTT: true, + IncludeJitter: false, + ReceiverProvider: trp, + Logger: logger.GetLogger(), + }) duration := 5 * time.Second now := time.Now() @@ -634,7 +639,14 @@ func TestConnectionQuality(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - cs := newConnectionStats(tc.mimeType, tc.isFECEnabled, true, true, trp) + cs := NewConnectionStats(ConnectionStatsParams{ + MimeType: tc.mimeType, + IsFECEnabled: tc.isFECEnabled, + IncludeRTT: true, + IncludeJitter: true, + ReceiverProvider: trp, + Logger: logger.GetLogger(), + }) duration := 5 * time.Second now := time.Now() @@ -727,7 +739,15 @@ func TestConnectionQuality(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - cs := newConnectionStats("video/vp8", false, true, true, trp) + cs := NewConnectionStats(ConnectionStatsParams{ + MimeType: "video/vp8", + IsFECEnabled: false, + IncludeRTT: true, + IncludeJitter: true, + EnableBitrateScore: true, + ReceiverProvider: trp, + Logger: logger.GetLogger(), + }) duration := 5 * time.Second now := time.Now() @@ -814,7 +834,14 @@ func TestConnectionQuality(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - cs := newConnectionStats("video/vp8", false, true, true, trp) + cs := NewConnectionStats(ConnectionStatsParams{ + MimeType: "video/vp8", + IsFECEnabled: false, + IncludeRTT: true, + IncludeJitter: true, + ReceiverProvider: trp, + Logger: logger.GetLogger(), + }) duration := 5 * time.Second now := time.Now() diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index 33421fcb9..4ed30faa3 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -123,8 +123,8 @@ func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJ return score } -func (w *windowStat) calculateBitrateScore(expectedBitrate int64) float64 { - if expectedBitrate == 0 { +func (w *windowStat) calculateBitrateScore(expectedBitrate int64, isEnabled bool) float64 { + if expectedBitrate == 0 || !isEnabled { // unsupported mode OR all layers stopped return cMaxScore } @@ -180,10 +180,11 @@ func (w *windowStat) MarshalLogObject(e zapcore.ObjectEncoder) error { // ------------------------------------------ type qualityScorerParams struct { - PacketLossWeight float64 - IncludeRTT bool - IncludeJitter bool - Logger logger.Logger + PacketLossWeight float64 + IncludeRTT bool + IncludeJitter bool + EnableBitrateScore bool + Logger logger.Logger } type qualityScorer struct { @@ -396,7 +397,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { score = qualityTransitionScore[livekit.ConnectionQuality_LOST] } else { packetScore := stat.calculatePacketScore(plw, q.params.IncludeRTT, q.params.IncludeJitter) - bitrateScore := stat.calculateBitrateScore(expectedBitrate) + bitrateScore := stat.calculateBitrateScore(expectedBitrate, q.params.EnableBitrateScore) layerScore := math.Max(math.Min(cMaxScore, cMaxScore-(expectedDistance*distanceWeight)), 0.0) minScore := math.Min(packetScore, bitrateScore) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 8312aeafd..8c0a38494 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1090,7 +1090,7 @@ func (d *DownTrack) UpTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen int32 } } -func (d *DownTrack) maybeAddTransition(_ int64, distance float64, pauseReason VideoPauseReason) { +func (d *DownTrack) maybeAddTransition(bitrate int64, distance float64, pauseReason VideoPauseReason) { if d.kind == webrtc.RTPCodecTypeAudio { return } @@ -1100,6 +1100,7 @@ func (d *DownTrack) maybeAddTransition(_ int64, distance float64, pauseReason Vi } else { d.connectionStats.UpdatePause(false) d.connectionStats.AddLayerTransition(distance) + d.connectionStats.AddBitrateTransition(bitrate) } } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 3028e98e3..2192c6a66 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -430,8 +430,31 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error { return nil } +func (w *WebRTCReceiver) notifyMaxExpectedLayer(layer int32) { + ti := w.TrackInfo() + if ti == nil { + return + } + + if w.Kind() == webrtc.RTPCodecTypeAudio || ti.Source == livekit.TrackSource_SCREEN_SHARE { + // screen share tracks have highly variable bitrate, do not use bit rate based quality for those + return + } + + expectedBitrate := int64(0) + for _, vl := range ti.Layers { + l := buffer.VideoQualityToSpatialLayer(vl.Quality, ti) + if l <= layer { + expectedBitrate += int64(vl.Bitrate) + } + } + + w.connectionStats.AddBitrateTransition(expectedBitrate) +} + func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32) { w.streamTrackerManager.SetMaxExpectedSpatialLayer(layer) + w.notifyMaxExpectedLayer(layer) if layer == buffer.InvalidLayerSpatial { w.connectionStats.UpdateLayerMute(true) @@ -463,6 +486,7 @@ func (w *WebRTCReceiver) OnMaxPublishedLayerChanged(maxPublishedLayer int32) { dt.UpTrackMaxPublishedLayerChange(maxPublishedLayer) }) + w.notifyMaxExpectedLayer(maxPublishedLayer) w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired()) }