Use an interface instead of a lot of callbacks. (#1510)

This commit is contained in:
Raja Subramanian
2023-03-10 23:22:22 +05:30
committed by GitHub
parent 53c8e611f6
commit e7e8bbe72c
2 changed files with 66 additions and 67 deletions
+27 -13
View File
@@ -108,8 +108,8 @@ type WebRTCReceiver struct {
connectionStats *connectionquality.ConnectionStats
// update stats
onStatsUpdate func(w *WebRTCReceiver, stat *livekit.AnalyticsStat)
onStatsUpdate func(w *WebRTCReceiver, stat *livekit.AnalyticsStat)
onMaxLayerChange func(maxLayer int32)
primaryReceiver atomic.Value // *RedPrimaryReceiver
redReceiver atomic.Value // *RedReceiver
@@ -192,11 +192,7 @@ func NewWebRTCReceiver(
}
w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig)
w.streamTrackerManager.OnAvailableLayersChanged(w.downTrackLayerChange)
w.streamTrackerManager.OnBitrateAvailabilityChanged(w.downTrackBitrateAvailabilityChange)
w.streamTrackerManager.OnMaxPublishedLayerChanged(w.downTrackMaxPublishedLayerChange)
w.streamTrackerManager.OnMaxTemporalLayerSeenChanged(w.downTrackMaxTemporalLayerSeenChange)
w.streamTrackerManager.OnBitrateReport(w.downTrackBitrateReport)
w.streamTrackerManager.SetListener(w)
for _, opt := range opts {
w = opt(w)
@@ -232,7 +228,9 @@ func (w *WebRTCReceiver) OnStatsUpdate(fn func(w *WebRTCReceiver, stat *livekit.
}
func (w *WebRTCReceiver) OnMaxLayerChange(fn func(maxLayer int32)) {
w.streamTrackerManager.OnMaxLayerChanged(fn)
w.upTrackMu.Lock()
w.onMaxLayerChange = fn
w.upTrackMu.Unlock()
}
func (w *WebRTCReceiver) GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality) {
@@ -409,19 +407,22 @@ func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32) {
w.notifyMaxExpectedLayer(layer)
}
func (w *WebRTCReceiver) downTrackLayerChange() {
// StreamTrackerManagerListener.OnAvailableLayersChanged
func (w *WebRTCReceiver) OnAvailableLayersChanged() {
for _, dt := range w.downTrackSpreader.GetDownTracks() {
dt.UpTrackLayersChange()
}
}
func (w *WebRTCReceiver) downTrackBitrateAvailabilityChange() {
// StreamTrackerManagerListener.OnBitrateAvailabilityChanged
func (w *WebRTCReceiver) OnBitrateAvailabilityChanged() {
for _, dt := range w.downTrackSpreader.GetDownTracks() {
dt.UpTrackBitrateAvailabilityChange()
}
}
func (w *WebRTCReceiver) downTrackMaxPublishedLayerChange(maxPublishedLayer int32) {
// StreamTrackerManagerListener.OnMaxPublishedLayerChanged
func (w *WebRTCReceiver) OnMaxPublishedLayerChanged(maxPublishedLayer int32) {
for _, dt := range w.downTrackSpreader.GetDownTracks() {
dt.UpTrackMaxPublishedLayerChange(maxPublishedLayer)
}
@@ -429,7 +430,8 @@ func (w *WebRTCReceiver) downTrackMaxPublishedLayerChange(maxPublishedLayer int3
w.notifyMaxExpectedLayer(maxPublishedLayer)
}
func (w *WebRTCReceiver) downTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen int32) {
// StreamTrackerManagerListener.OnMaxTemporalLayerSeenChanged
func (w *WebRTCReceiver) OnMaxTemporalLayerSeenChanged(maxTemporalLayerSeen int32) {
for _, dt := range w.downTrackSpreader.GetDownTracks() {
dt.UpTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen)
}
@@ -439,7 +441,19 @@ func (w *WebRTCReceiver) downTrackMaxTemporalLayerSeenChange(maxTemporalLayerSee
}
}
func (w *WebRTCReceiver) downTrackBitrateReport(availableLayers []int32, bitrates Bitrates) {
// StreamTrackerManagerListener.OnMaxAvailableLayerChanged
func (w *WebRTCReceiver) OnMaxAvailableLayerChanged(maxAvailableLayer int32) {
w.upTrackMu.RLock()
onMaxLayerChange := w.onMaxLayerChange
w.upTrackMu.RUnlock()
if onMaxLayerChange != nil {
onMaxLayerChange(maxAvailableLayer)
}
}
// StreamTrackerManagerListener.OnBitrateReport
func (w *WebRTCReceiver) OnBitrateReport(availableLayers []int32, bitrates Bitrates) {
for _, dt := range w.downTrackSpreader.GetDownTracks() {
dt.UpTrackBitrateReport(availableLayers, bitrates)
}
+39 -54
View File
@@ -14,6 +14,15 @@ import (
"github.com/livekit/protocol/logger"
)
type StreamTrackerManagerListener interface {
OnAvailableLayersChanged()
OnBitrateAvailabilityChanged()
OnMaxPublishedLayerChanged(maxPublishedLayer int32)
OnMaxTemporalLayerSeenChanged(maxTemporalLayerSeen int32)
OnMaxAvailableLayerChanged(maxAvailableLayer int32)
OnBitrateReport(availableLayers []int32, bitrates Bitrates)
}
type StreamTrackerManager struct {
logger logger.Logger
trackInfo *livekit.TrackInfo
@@ -37,12 +46,7 @@ type StreamTrackerManager struct {
closed core.Fuse
onAvailableLayersChanged func()
onBitrateAvailabilityChanged func()
onMaxPublishedLayerChanged func(maxPublishedLayer int32)
onMaxTemporalLayerSeenChanged func(maxTemporalLayerSeen int32)
onMaxAvailableLayerChanged func(maxAvailableLayer int32)
onBitrateReport func(availableLayers []int32, bitrates Bitrates)
listener StreamTrackerManagerListener
}
func NewStreamTrackerManager(
@@ -81,32 +85,17 @@ func (s *StreamTrackerManager) Close() {
s.closed.Break()
}
func (s *StreamTrackerManager) OnAvailableLayersChanged(f func()) {
s.onAvailableLayersChanged = f
}
func (s *StreamTrackerManager) OnBitrateAvailabilityChanged(f func()) {
s.onBitrateAvailabilityChanged = f
}
func (s *StreamTrackerManager) OnMaxPublishedLayerChanged(f func(maxPublishedLayer int32)) {
s.onMaxPublishedLayerChanged = f
}
func (s *StreamTrackerManager) OnMaxTemporalLayerSeenChanged(f func(maxTemporalLayerSeen int32)) {
func (s *StreamTrackerManager) SetListener(listener StreamTrackerManagerListener) {
s.lock.Lock()
s.onMaxTemporalLayerSeenChanged = f
s.listener = listener
s.lock.Unlock()
}
func (s *StreamTrackerManager) OnMaxLayerChanged(f func(maxAvailableLayer int32)) {
s.onMaxAvailableLayerChanged = f
}
func (s *StreamTrackerManager) getListener() StreamTrackerManagerListener {
s.lock.RLock()
defer s.lock.RUnlock()
func (s *StreamTrackerManager) OnBitrateReport(f func(availableLayers []int32, bitrates Bitrates)) {
s.lock.Lock()
s.onBitrateReport = f
s.lock.Unlock()
return s.listener
}
func (s *StreamTrackerManager) createStreamTrackerPacket(layer int32) streamtracker.StreamTrackerImpl {
@@ -169,8 +158,8 @@ func (s *StreamTrackerManager) AddTracker(layer int32) *streamtracker.StreamTrac
}
})
tracker.OnBitrateAvailable(func() {
if s.onBitrateAvailabilityChanged != nil {
s.onBitrateAvailabilityChanged()
if listener := s.getListener(); listener != nil {
listener.OnBitrateAvailabilityChanged()
}
})
@@ -178,15 +167,17 @@ func (s *StreamTrackerManager) AddTracker(layer int32) *streamtracker.StreamTrac
paused := s.paused
s.trackers[layer] = tracker
var onMaxPublishedLayerChanged func(maxPublishedLayer int32)
notify := false
if layer > s.maxPublishedLayer {
s.maxPublishedLayer = layer
onMaxPublishedLayerChanged = s.onMaxPublishedLayerChanged
notify = true
}
s.lock.Unlock()
if onMaxPublishedLayerChanged != nil {
go onMaxPublishedLayerChanged(layer)
if notify {
if listener := s.getListener(); listener != nil {
go listener.OnMaxPublishedLayerChanged(layer)
}
}
tracker.SetPaused(paused)
@@ -423,12 +414,12 @@ func (s *StreamTrackerManager) addAvailableLayer(layer int32) {
)
s.lock.Unlock()
if s.onAvailableLayersChanged != nil {
s.onAvailableLayersChanged()
}
if listener := s.getListener(); listener != nil {
listener.OnAvailableLayersChanged()
if isMaxLayerChange && s.onMaxAvailableLayerChanged != nil {
s.onMaxAvailableLayerChanged(layer)
if isMaxLayerChange {
listener.OnMaxAvailableLayerChanged(layer)
}
}
}
@@ -462,13 +453,13 @@ func (s *StreamTrackerManager) removeAvailableLayer(layer int32) {
s.lock.Unlock()
// need to immediately switch off unavailable layers
if s.onAvailableLayersChanged != nil {
s.onAvailableLayersChanged()
}
if listener := s.getListener(); listener != nil {
listener.OnAvailableLayersChanged()
// if maxLayer was removed, send the new maxLayer
if curMaxLayer != prevMaxLayer && s.onMaxAvailableLayerChanged != nil {
s.onMaxAvailableLayerChanged(curMaxLayer)
// if maxLayer was removed, send the new maxLayer
if curMaxLayer != prevMaxLayer {
listener.OnMaxAvailableLayerChanged(curMaxLayer)
}
}
}
@@ -565,11 +556,10 @@ done:
}
s.maxTemporalLayerSeen = maxTemporalLayerSeen
onMaxTemporalLayerSeenChanged := s.onMaxTemporalLayerSeenChanged
s.lock.Unlock()
if onMaxTemporalLayerSeenChanged != nil {
onMaxTemporalLayerSeenChanged(maxTemporalLayerSeen)
if listener := s.getListener(); listener != nil {
listener.OnMaxTemporalLayerSeenChanged(maxTemporalLayerSeen)
}
}
@@ -586,14 +576,9 @@ func (s *StreamTrackerManager) bitrateReporter() {
al, brs := s.GetLayeredBitrate()
s.updateMaxTemporalLayerSeen(brs)
s.lock.RLock()
onBitrateReport := s.onBitrateReport
s.lock.RUnlock()
if onBitrateReport != nil {
onBitrateReport(al, brs)
if listener := s.getListener(); listener != nil {
listener.OnBitrateReport(al, brs)
}
}
}
}