From e7e8bbe72cd22a7ca2c1a2714bddaa3c2e316b2d Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 10 Mar 2023 23:22:22 +0530 Subject: [PATCH] Use an interface instead of a lot of callbacks. (#1510) --- pkg/sfu/receiver.go | 40 +++++++++----- pkg/sfu/streamtrackermanager.go | 93 ++++++++++++++------------------- 2 files changed, 66 insertions(+), 67 deletions(-) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index b0b4f1576..3eab251e4 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -108,8 +108,8 @@ type WebRTCReceiver struct { connectionStats *connectionquality.ConnectionStats - // update stats - onStatsUpdate func(w *WebRTCReceiver, stat *livekit.AnalyticsStat) + onStatsUpdate func(w *WebRTCReceiver, stat *livekit.AnalyticsStat) + onMaxLayerChange func(maxLayer int32) primaryReceiver atomic.Value // *RedPrimaryReceiver redReceiver atomic.Value // *RedReceiver @@ -192,11 +192,7 @@ func NewWebRTCReceiver( } w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig) - w.streamTrackerManager.OnAvailableLayersChanged(w.downTrackLayerChange) - w.streamTrackerManager.OnBitrateAvailabilityChanged(w.downTrackBitrateAvailabilityChange) - w.streamTrackerManager.OnMaxPublishedLayerChanged(w.downTrackMaxPublishedLayerChange) - w.streamTrackerManager.OnMaxTemporalLayerSeenChanged(w.downTrackMaxTemporalLayerSeenChange) - w.streamTrackerManager.OnBitrateReport(w.downTrackBitrateReport) + w.streamTrackerManager.SetListener(w) for _, opt := range opts { w = opt(w) @@ -232,7 +228,9 @@ func (w *WebRTCReceiver) OnStatsUpdate(fn func(w *WebRTCReceiver, stat *livekit. } func (w *WebRTCReceiver) OnMaxLayerChange(fn func(maxLayer int32)) { - w.streamTrackerManager.OnMaxLayerChanged(fn) + w.upTrackMu.Lock() + w.onMaxLayerChange = fn + w.upTrackMu.Unlock() } func (w *WebRTCReceiver) GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality) { @@ -409,19 +407,22 @@ func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32) { w.notifyMaxExpectedLayer(layer) } -func (w *WebRTCReceiver) downTrackLayerChange() { +// StreamTrackerManagerListener.OnAvailableLayersChanged +func (w *WebRTCReceiver) OnAvailableLayersChanged() { for _, dt := range w.downTrackSpreader.GetDownTracks() { dt.UpTrackLayersChange() } } -func (w *WebRTCReceiver) downTrackBitrateAvailabilityChange() { +// StreamTrackerManagerListener.OnBitrateAvailabilityChanged +func (w *WebRTCReceiver) OnBitrateAvailabilityChanged() { for _, dt := range w.downTrackSpreader.GetDownTracks() { dt.UpTrackBitrateAvailabilityChange() } } -func (w *WebRTCReceiver) downTrackMaxPublishedLayerChange(maxPublishedLayer int32) { +// StreamTrackerManagerListener.OnMaxPublishedLayerChanged +func (w *WebRTCReceiver) OnMaxPublishedLayerChanged(maxPublishedLayer int32) { for _, dt := range w.downTrackSpreader.GetDownTracks() { dt.UpTrackMaxPublishedLayerChange(maxPublishedLayer) } @@ -429,7 +430,8 @@ func (w *WebRTCReceiver) downTrackMaxPublishedLayerChange(maxPublishedLayer int3 w.notifyMaxExpectedLayer(maxPublishedLayer) } -func (w *WebRTCReceiver) downTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen int32) { +// StreamTrackerManagerListener.OnMaxTemporalLayerSeenChanged +func (w *WebRTCReceiver) OnMaxTemporalLayerSeenChanged(maxTemporalLayerSeen int32) { for _, dt := range w.downTrackSpreader.GetDownTracks() { dt.UpTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen) } @@ -439,7 +441,19 @@ func (w *WebRTCReceiver) downTrackMaxTemporalLayerSeenChange(maxTemporalLayerSee } } -func (w *WebRTCReceiver) downTrackBitrateReport(availableLayers []int32, bitrates Bitrates) { +// StreamTrackerManagerListener.OnMaxAvailableLayerChanged +func (w *WebRTCReceiver) OnMaxAvailableLayerChanged(maxAvailableLayer int32) { + w.upTrackMu.RLock() + onMaxLayerChange := w.onMaxLayerChange + w.upTrackMu.RUnlock() + + if onMaxLayerChange != nil { + onMaxLayerChange(maxAvailableLayer) + } +} + +// StreamTrackerManagerListener.OnBitrateReport +func (w *WebRTCReceiver) OnBitrateReport(availableLayers []int32, bitrates Bitrates) { for _, dt := range w.downTrackSpreader.GetDownTracks() { dt.UpTrackBitrateReport(availableLayers, bitrates) } diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index ead3c2095..af4532af4 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -14,6 +14,15 @@ import ( "github.com/livekit/protocol/logger" ) +type StreamTrackerManagerListener interface { + OnAvailableLayersChanged() + OnBitrateAvailabilityChanged() + OnMaxPublishedLayerChanged(maxPublishedLayer int32) + OnMaxTemporalLayerSeenChanged(maxTemporalLayerSeen int32) + OnMaxAvailableLayerChanged(maxAvailableLayer int32) + OnBitrateReport(availableLayers []int32, bitrates Bitrates) +} + type StreamTrackerManager struct { logger logger.Logger trackInfo *livekit.TrackInfo @@ -37,12 +46,7 @@ type StreamTrackerManager struct { closed core.Fuse - onAvailableLayersChanged func() - onBitrateAvailabilityChanged func() - onMaxPublishedLayerChanged func(maxPublishedLayer int32) - onMaxTemporalLayerSeenChanged func(maxTemporalLayerSeen int32) - onMaxAvailableLayerChanged func(maxAvailableLayer int32) - onBitrateReport func(availableLayers []int32, bitrates Bitrates) + listener StreamTrackerManagerListener } func NewStreamTrackerManager( @@ -81,32 +85,17 @@ func (s *StreamTrackerManager) Close() { s.closed.Break() } -func (s *StreamTrackerManager) OnAvailableLayersChanged(f func()) { - s.onAvailableLayersChanged = f -} - -func (s *StreamTrackerManager) OnBitrateAvailabilityChanged(f func()) { - s.onBitrateAvailabilityChanged = f -} - -func (s *StreamTrackerManager) OnMaxPublishedLayerChanged(f func(maxPublishedLayer int32)) { - s.onMaxPublishedLayerChanged = f -} - -func (s *StreamTrackerManager) OnMaxTemporalLayerSeenChanged(f func(maxTemporalLayerSeen int32)) { +func (s *StreamTrackerManager) SetListener(listener StreamTrackerManagerListener) { s.lock.Lock() - s.onMaxTemporalLayerSeenChanged = f + s.listener = listener s.lock.Unlock() } -func (s *StreamTrackerManager) OnMaxLayerChanged(f func(maxAvailableLayer int32)) { - s.onMaxAvailableLayerChanged = f -} +func (s *StreamTrackerManager) getListener() StreamTrackerManagerListener { + s.lock.RLock() + defer s.lock.RUnlock() -func (s *StreamTrackerManager) OnBitrateReport(f func(availableLayers []int32, bitrates Bitrates)) { - s.lock.Lock() - s.onBitrateReport = f - s.lock.Unlock() + return s.listener } func (s *StreamTrackerManager) createStreamTrackerPacket(layer int32) streamtracker.StreamTrackerImpl { @@ -169,8 +158,8 @@ func (s *StreamTrackerManager) AddTracker(layer int32) *streamtracker.StreamTrac } }) tracker.OnBitrateAvailable(func() { - if s.onBitrateAvailabilityChanged != nil { - s.onBitrateAvailabilityChanged() + if listener := s.getListener(); listener != nil { + listener.OnBitrateAvailabilityChanged() } }) @@ -178,15 +167,17 @@ func (s *StreamTrackerManager) AddTracker(layer int32) *streamtracker.StreamTrac paused := s.paused s.trackers[layer] = tracker - var onMaxPublishedLayerChanged func(maxPublishedLayer int32) + notify := false if layer > s.maxPublishedLayer { s.maxPublishedLayer = layer - onMaxPublishedLayerChanged = s.onMaxPublishedLayerChanged + notify = true } s.lock.Unlock() - if onMaxPublishedLayerChanged != nil { - go onMaxPublishedLayerChanged(layer) + if notify { + if listener := s.getListener(); listener != nil { + go listener.OnMaxPublishedLayerChanged(layer) + } } tracker.SetPaused(paused) @@ -423,12 +414,12 @@ func (s *StreamTrackerManager) addAvailableLayer(layer int32) { ) s.lock.Unlock() - if s.onAvailableLayersChanged != nil { - s.onAvailableLayersChanged() - } + if listener := s.getListener(); listener != nil { + listener.OnAvailableLayersChanged() - if isMaxLayerChange && s.onMaxAvailableLayerChanged != nil { - s.onMaxAvailableLayerChanged(layer) + if isMaxLayerChange { + listener.OnMaxAvailableLayerChanged(layer) + } } } @@ -462,13 +453,13 @@ func (s *StreamTrackerManager) removeAvailableLayer(layer int32) { s.lock.Unlock() // need to immediately switch off unavailable layers - if s.onAvailableLayersChanged != nil { - s.onAvailableLayersChanged() - } + if listener := s.getListener(); listener != nil { + listener.OnAvailableLayersChanged() - // if maxLayer was removed, send the new maxLayer - if curMaxLayer != prevMaxLayer && s.onMaxAvailableLayerChanged != nil { - s.onMaxAvailableLayerChanged(curMaxLayer) + // if maxLayer was removed, send the new maxLayer + if curMaxLayer != prevMaxLayer { + listener.OnMaxAvailableLayerChanged(curMaxLayer) + } } } @@ -565,11 +556,10 @@ done: } s.maxTemporalLayerSeen = maxTemporalLayerSeen - onMaxTemporalLayerSeenChanged := s.onMaxTemporalLayerSeenChanged s.lock.Unlock() - if onMaxTemporalLayerSeenChanged != nil { - onMaxTemporalLayerSeenChanged(maxTemporalLayerSeen) + if listener := s.getListener(); listener != nil { + listener.OnMaxTemporalLayerSeenChanged(maxTemporalLayerSeen) } } @@ -586,14 +576,9 @@ func (s *StreamTrackerManager) bitrateReporter() { al, brs := s.GetLayeredBitrate() s.updateMaxTemporalLayerSeen(brs) - s.lock.RLock() - onBitrateReport := s.onBitrateReport - s.lock.RUnlock() - - if onBitrateReport != nil { - onBitrateReport(al, brs) + if listener := s.getListener(); listener != nil { + listener.OnBitrateReport(al, brs) } - } } }