From ed2eaaabb2b39efa1984c906bb64bd2123dd4dd6 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 15 Mar 2023 15:24:17 +0530 Subject: [PATCH] Add layer mute notification (#1522) * Layer mute * clean up * clean up * set max temporal layer seen on down track add --- pkg/sfu/connectionquality/connectionstats.go | 4 ++ .../connectionquality/connectionstats_test.go | 44 +++++++++++++++ pkg/sfu/connectionquality/scorer.go | 56 ++++++++++++++----- pkg/sfu/receiver.go | 9 ++- pkg/sfu/streamtrackermanager.go | 16 +++--- 5 files changed, 104 insertions(+), 25 deletions(-) diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index a1a320249..d2e4e30a3 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -86,6 +86,10 @@ func (cs *ConnectionStats) AddBitrateTransition(bitrate int64, at time.Time) { cs.scorer.AddBitrateTransition(bitrate, at) } +func (cs *ConnectionStats) UpdateLayerMute(isMuted bool, at time.Time) { + cs.scorer.UpdateLayerMute(isMuted, at) +} + func (cs *ConnectionStats) AddLayerTransition(distance float64, at time.Time) { cs.scorer.AddLayerTransition(distance, at) } diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index f29d4f3e8..91667cb39 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -297,6 +297,50 @@ func TestConnectionQuality(t *testing.T) { mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) + + // test layer mute via UpdateLayerMute API + cs.AddBitrateTransition(1_000_000, now) + cs.AddBitrateTransition(2_000_000, now.Add(2*time.Second)) + + streams = map[uint32]*buffer.StreamStatsWithLayers{ + 1: &buffer.StreamStatsWithLayers{ + RTPStats: &buffer.RTPDeltaInfo{ + StartTime: now, + Duration: duration, + Packets: 250, + Bytes: 8_000_000 / 8 / 4, + }, + }, + } + cs.updateScore(streams, now.Add(duration)) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(4.1), mos) + require.Equal(t, livekit.ConnectionQuality_GOOD, quality) + + now = now.Add(duration) + cs.UpdateLayerMute(true, now) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(4.6), mos) + require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) + + // setting bit rate after layer mute should layer unmute automatically + cs.AddBitrateTransition(1_000_000, now) + cs.AddBitrateTransition(2_000_000, now.Add(2*time.Second)) + + streams = map[uint32]*buffer.StreamStatsWithLayers{ + 1: &buffer.StreamStatsWithLayers{ + RTPStats: &buffer.RTPDeltaInfo{ + StartTime: now, + Duration: duration, + Packets: 250, + Bytes: 8_000_000 / 8 / 4, + }, + }, + } + cs.updateScore(streams, now.Add(duration)) + mos, quality = cs.GetScoreAndQuality() + require.Greater(t, float32(4.1), mos) + require.Equal(t, livekit.ConnectionQuality_GOOD, quality) }) t.Run("codecs - packet", func(t *testing.T) { diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index f094c8ecc..9e7a1b047 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -128,8 +128,8 @@ type qualityScorer struct { mutedAt time.Time unmutedAt time.Time - bitrateMutedAt time.Time - bitrateUnmutedAt time.Time + layerMutedAt time.Time + layerUnmutedAt time.Time maxPPS float64 @@ -173,11 +173,37 @@ func (q *qualityScorer) AddBitrateTransition(bitrate int64, at time.Time) { }) if bitrate == 0 { - q.bitrateMutedAt = at - q.score = maxScore + if !q.isLayerMuted() { + q.layerMutedAt = at + q.score = maxScore + } } else { - if q.bitrateUnmutedAt.IsZero() || q.bitrateMutedAt.After(q.bitrateUnmutedAt) { - q.bitrateUnmutedAt = at + if q.isLayerMuted() { + q.layerUnmutedAt = at + } + } +} + +func (q *qualityScorer) UpdateLayerMute(isMuted bool, at time.Time) { + q.lock.Lock() + defer q.lock.Unlock() + + if isMuted { + if !q.isLayerMuted() { + q.bitrateTransitions = append(q.bitrateTransitions, bitrateTransition{ + startedAt: at, + bitrate: 0, + }) + q.layerTransitions = append(q.layerTransitions, layerTransition{ + startedAt: at, + distance: 0.0, + }) + q.layerMutedAt = at + q.score = maxScore + } + } else { + if q.isLayerMuted() { + q.layerUnmutedAt = at } } } @@ -206,7 +232,7 @@ func (q *qualityScorer) Update(stat *windowStat, at time.Time) { // to stable and quality EXCELLENT for responsiveness. On an unmute, the // entire window data is considered (as long as enough time has passed since // unmute) including the data before mute. - if q.isMuted() || !q.isUnmutedEnough(at) || q.isBitrateMuted() { + if q.isMuted() || !q.isUnmutedEnough(at) || q.isLayerMuted() { q.lastUpdateAt = at return } @@ -274,16 +300,16 @@ func (q *qualityScorer) isUnmutedEnough(at time.Time) bool { sinceUnmute = at.Sub(q.unmutedAt) } - var sinceLayersUnmute time.Duration - if q.bitrateUnmutedAt.IsZero() { - sinceLayersUnmute = at.Sub(q.lastUpdateAt) + var sinceLayerUnmute time.Duration + if q.layerUnmutedAt.IsZero() { + sinceLayerUnmute = at.Sub(q.lastUpdateAt) } else { - sinceLayersUnmute = at.Sub(q.bitrateUnmutedAt) + sinceLayerUnmute = at.Sub(q.layerUnmutedAt) } validDuration := sinceUnmute - if sinceLayersUnmute < validDuration { - validDuration = sinceLayersUnmute + if sinceLayerUnmute < validDuration { + validDuration = sinceLayerUnmute } sinceLastUpdate := at.Sub(q.lastUpdateAt) @@ -291,8 +317,8 @@ func (q *qualityScorer) isUnmutedEnough(at time.Time) bool { return validDuration.Seconds()/sinceLastUpdate.Seconds() > unmuteTimeThreshold } -func (q *qualityScorer) isBitrateMuted() bool { - return !q.bitrateMutedAt.IsZero() && (q.bitrateUnmutedAt.IsZero() || q.bitrateMutedAt.After(q.bitrateUnmutedAt)) +func (q *qualityScorer) isLayerMuted() bool { + return !q.layerMutedAt.IsZero() && (q.layerUnmutedAt.IsZero() || q.layerMutedAt.After(q.layerUnmutedAt)) } func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index ae7493fc5..52ba65fde 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -380,6 +380,7 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error { track.TrackInfoAvailable() track.UpTrackMaxPublishedLayerChange(w.streamTrackerManager.GetMaxPublishedLayer()) + track.UpTrackMaxTemporalLayerSeenChange(w.streamTrackerManager.GetMaxTemporalLayerSeen()) w.downTrackSpreader.Store(track) return nil @@ -388,7 +389,13 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error { func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32) { w.streamTrackerManager.SetMaxExpectedSpatialLayer(layer) - w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired(), time.Now()) + now := time.Now() + if layer == InvalidLayerSpatial { + w.connectionStats.UpdateLayerMute(true, now) + } else { + w.connectionStats.UpdateLayerMute(false, now) + w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired(), now) + } } // StreamTrackerManagerListener.OnAvailableLayersChanged diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 6fd9585f7..dee95f5a7 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -321,15 +321,6 @@ done: return float64(distance) / float64(s.maxTemporalLayerSeen+1) } -func (s *StreamTrackerManager) getMaxExpectedLayerLocked() int32 { - // find min of layer - maxExpectedLayer := s.maxExpectedLayer - if maxExpectedLayer > s.maxPublishedLayer { - maxExpectedLayer = s.maxPublishedLayer - } - return maxExpectedLayer -} - func (s *StreamTrackerManager) GetMaxPublishedLayer() int32 { s.lock.RLock() defer s.lock.RUnlock() @@ -538,6 +529,13 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in return ts + (srRef.SenderReportData.RTPTimestamp - normalizedTS), nil } +func (s *StreamTrackerManager) GetMaxTemporalLayerSeen() int32 { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.maxTemporalLayerSeen +} + func (s *StreamTrackerManager) updateMaxTemporalLayerSeen(brs Bitrates) { maxTemporalLayerSeen := InvalidLayerTemporal done: