diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 886b84506..b1337c24e 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -193,7 +193,6 @@ func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack { } }) - t.SetMuted(ti.Muted) return t } diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 04af89215..9c896b644 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -503,15 +503,9 @@ func (t *MediaTrackReceiver) SetMuted(muted bool) { trackInfo := t.TrackInfoClone() trackInfo.Muted = muted t.trackInfo.Store(trackInfo) - - receivers := t.receivers t.lock.Unlock() - for _, receiver := range receivers { - receiver.SetUpTrackPaused(muted) - } - - t.MediaTrackSubscriptions.SetMuted(muted) + t.updateTrackInfoOfReceivers() } func (t *MediaTrackReceiver) IsEncrypted() bool { @@ -652,6 +646,8 @@ func (t *MediaTrackReceiver) updateTrackInfoOfReceivers() { for _, r := range t.loadReceivers() { r.UpdateTrackInfo(ti) } + + t.MediaTrackSubscriptions.SetMuted(ti.GetMuted()) } func (t *MediaTrackReceiver) SetLayerSsrcsForRid(mimeType mime.MimeType, rid string, ssrc uint32, repairSSRC uint32) { @@ -852,7 +848,6 @@ func (t *MediaTrackReceiver) UpdateCodecRids(mimeType mime.MimeType, rids buffer } func (t *MediaTrackReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) { - updateMute := false clonedInfo := utils.CloneProto(ti) t.lock.Lock() @@ -893,16 +888,9 @@ func (t *MediaTrackReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) { clonedInfo.Layers = ci.Layers } } - if trackInfo.Muted != clonedInfo.Muted { - updateMute = true - } t.trackInfo.Store(clonedInfo) t.lock.Unlock() - if updateMute { - t.SetMuted(clonedInfo.Muted) - } - t.updateTrackInfoOfReceivers() } diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 7b979eae9..6623b3c59 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -196,8 +196,6 @@ type DummyReceiver struct { settingsLock sync.Mutex maxExpectedLayerValid bool maxExpectedLayer int32 - pausedValid bool - paused bool redReceiver, primaryReceiver *DummyRedReceiver } @@ -246,19 +244,12 @@ func (d *DummyReceiver) Upgrade(receiver sfu.TrackReceiver) { d.settingsLock.Lock() maxExpectedLayerValid := d.maxExpectedLayerValid d.maxExpectedLayerValid = false - - pausedValid := d.pausedValid - d.pausedValid = false d.settingsLock.Unlock() if maxExpectedLayerValid { receiver.SetMaxExpectedSpatialLayer(d.maxExpectedLayer) } - if pausedValid { - receiver.SetUpTrackPaused(d.paused) - } - d.settingsLock.Lock() if d.primaryReceiver != nil { d.primaryReceiver.upgrade(receiver) @@ -332,22 +323,6 @@ func (d *DummyReceiver) SendPLI(layer int32, force bool) { } } -func (d *DummyReceiver) SetUpTrackPaused(paused bool) { - d.settingsLock.Lock() - receiver := d.getReceiver() - if receiver != nil { - d.pausedValid = false - } else { - d.pausedValid = true - d.paused = paused - } - d.settingsLock.Unlock() - - if receiver != nil { - receiver.SetUpTrackPaused(paused) - } -} - func (d *DummyReceiver) SetMaxExpectedSpatialLayer(layer int32) { d.settingsLock.Lock() receiver := d.getReceiver() diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 68886a208..d2cae63a1 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -145,6 +145,8 @@ func NewWebRTCReceiver( mime.IsMimeTypeStringRED(codec.MimeType) || strings.Contains(strings.ToLower(codec.SDPFmtpLine), "useinbandfec=1"), ) + w.UpdateTrackInfo(trackInfo) + return w } @@ -195,10 +197,9 @@ func (w *WebRTCReceiver) AddUpTrack(track TrackRemote, buff *buffer.Buffer) erro return nil } -func (w *WebRTCReceiver) SetUpTrackPaused(paused bool) { - w.ReceiverBase.SetUpTrackPaused(paused) - - w.connectionStats.UpdateMute(paused) +func (w *WebRTCReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) { + w.ReceiverBase.UpdateTrackInfo(ti) + w.connectionStats.UpdateMute(ti.GetMuted()) } func (w *WebRTCReceiver) notifyMaxExpectedLayer(layer int32) { diff --git a/pkg/sfu/receiver_base.go b/pkg/sfu/receiver_base.go index 1b9047ce7..e1f1a1169 100644 --- a/pkg/sfu/receiver_base.go +++ b/pkg/sfu/receiver_base.go @@ -116,7 +116,6 @@ type TrackReceiver interface { SendPLI(layer int32, force bool) - SetUpTrackPaused(paused bool) SetMaxExpectedSpatialLayer(layer int32) AddDownTrack(track TrackSender) error @@ -352,7 +351,15 @@ func (r *ReceiverBase) UpdateTrackInfo(ti *livekit.TrackInfo) { ) } r.trackInfo = utils.CloneProto(ti) - // MUTABLE-TRACKINFO-TODO: notify buffers, buffers may need to resize retransmission buffer if there is layer change + + paused := r.trackInfo.GetMuted() + for _, buff := range r.buffers { + if buff == nil { + continue + } + + buff.SetPaused(paused) + } r.bufferMu.Unlock() r.streamTrackerManager.UpdateTrackInfo(ti) @@ -540,23 +547,6 @@ func (r *ReceiverBase) StreamTrackerManager() *StreamTrackerManager { return r.streamTrackerManager } -// SetUpTrackPaused indicates upstream will not be sending any data. -// this will reflect the "muted" status and will pause streamtracker to ensure we don't turn off -// the layer -func (r *ReceiverBase) SetUpTrackPaused(paused bool) { - r.streamTrackerManager.SetPaused(paused) - - r.bufferMu.RLock() - for _, buff := range r.buffers { - if buff == nil { - continue - } - - buff.SetPaused(paused) - } - r.bufferMu.RUnlock() -} - func (r *ReceiverBase) AddDownTrack(track TrackSender) error { if r.IsClosed() { return ErrReceiverClosed @@ -760,13 +750,14 @@ func (r *ReceiverBase) GetOrCreateBuffer(layer int32) buffer.BufferProvider { r.bufferMu.Lock() r.buffers[layer] = buff rtt := r.rtt + paused := r.trackInfo.GetMuted() r.bufferMu.Unlock() - r.setupBuffer(buff, layer, rtt) + r.setupBuffer(buff, layer, rtt, paused) return buff } -func (r *ReceiverBase) setupBuffer(buff buffer.BufferProvider, layer int32, rtt uint32) { +func (r *ReceiverBase) setupBuffer(buff buffer.BufferProvider, layer int32, rtt uint32, paused bool) { buff.SetLogger(r.params.Logger.WithValues("layer", layer)) buff.SetAudioLevelConfig(r.audioConfig.AudioLevelConfig) buff.SetStreamRestartDetection(r.enableRTPStreamRestartDetection) @@ -818,16 +809,17 @@ func (r *ReceiverBase) setupBuffer(buff buffer.BufferProvider, layer int32, rtt } buff.SetRTT(rtt) - buff.SetPaused(r.streamTrackerManager.IsPaused()) + buff.SetPaused(paused) } func (r *ReceiverBase) AddBuffer(buff buffer.BufferProvider, layer int32) { r.bufferMu.Lock() r.buffers[layer] = buff rtt := r.rtt + paused := r.trackInfo.GetMuted() r.bufferMu.Unlock() - r.setupBuffer(buff, layer, rtt) + r.setupBuffer(buff, layer, rtt, paused) } func (r *ReceiverBase) StartBuffer(buff buffer.BufferProvider, layer int32) { diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 7e0347906..6eb28e5f2 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -20,7 +20,6 @@ import ( "time" "github.com/frostbyte73/core" - "go.uber.org/atomic" "github.com/livekit/protocol/codecs/mime" "github.com/livekit/protocol/livekit" @@ -112,7 +111,6 @@ var ( type StreamTrackerManager struct { logger logger.Logger - trackInfo atomic.Pointer[livekit.TrackInfo] mimeType mime.MimeType videoLayerMode livekit.VideoLayer_Mode clockRate uint32 @@ -120,6 +118,7 @@ type StreamTrackerManager struct { trackerConfig StreamTrackerConfig lock sync.RWMutex + trackInfo *livekit.TrackInfo maxPublishedLayer int32 maxTemporalLayerSeen int32 @@ -128,7 +127,6 @@ type StreamTrackerManager struct { availableLayers []int32 maxExpectedLayer int32 - paused bool closed core.Fuse @@ -146,11 +144,11 @@ func NewStreamTrackerManager( logger: logger, mimeType: mimeType, videoLayerMode: buffer.GetVideoLayerModeForMimeType(mimeType, trackInfo), + trackInfo: utils.CloneProto(trackInfo), maxPublishedLayer: buffer.InvalidLayerSpatial, maxTemporalLayerSeen: buffer.InvalidLayerTemporal, clockRate: clockRate, } - s.trackInfo.Store(utils.CloneProto(trackInfo)) switch trackInfo.Source { case livekit.TrackSource_SCREEN_SHARE: @@ -294,7 +292,7 @@ func (s *StreamTrackerManager) AddTracker(layer int32) streamtracker.StreamTrack }) s.lock.Lock() - paused := s.paused + paused := s.trackInfo.GetMuted() s.trackers[layer] = tracker notify := false @@ -337,8 +335,6 @@ func (s *StreamTrackerManager) RemoveAllTrackers() { s.maxExpectedLayer = buffer.InvalidLayerSpatial s.maxExpectedLayerFromTrackInfoLocked(true) - s.paused = false - ddTracker := s.ddTracker s.ddTracker = nil s.lock.Unlock() @@ -364,9 +360,8 @@ func (s *StreamTrackerManager) GetTracker(layer int32) streamtracker.StreamTrack return s.trackers[layer] } -func (s *StreamTrackerManager) SetPaused(paused bool) { +func (s *StreamTrackerManager) setPaused(paused bool) { s.lock.Lock() - s.paused = paused trackers := s.trackers s.lock.Unlock() @@ -377,16 +372,15 @@ func (s *StreamTrackerManager) SetPaused(paused bool) { } } -func (s *StreamTrackerManager) IsPaused() bool { - s.lock.RLock() - defer s.lock.RUnlock() - - return s.paused -} - func (s *StreamTrackerManager) UpdateTrackInfo(ti *livekit.TrackInfo) { - s.trackInfo.Store(utils.CloneProto(ti)) - s.maxExpectedLayerFromTrackInfo(false) + s.lock.Lock() + s.trackInfo = utils.CloneProto(ti) + s.maxExpectedLayerFromTrackInfoLocked(false) + + paused := s.trackInfo.GetMuted() + s.lock.Unlock() + + s.setPaused(paused) } func (s *StreamTrackerManager) SetMaxExpectedSpatialLayer(layer int32) int32 { @@ -425,7 +419,7 @@ func (s *StreamTrackerManager) DistanceToDesired() float64 { s.lock.RLock() defer s.lock.RUnlock() - if s.paused || s.maxExpectedLayer < 0 || s.maxTemporalLayerSeen < 0 { + if s.trackInfo.GetMuted() || s.maxExpectedLayer < 0 || s.maxTemporalLayerSeen < 0 { return 0 } @@ -594,9 +588,8 @@ func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfo(force bool) { func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfoLocked(force bool) { maxExpectedLayer := buffer.InvalidLayerSpatial - ti := s.trackInfo.Load() - if ti != nil { - for _, layer := range buffer.GetVideoLayersForMimeType(s.mimeType, ti) { + if s.trackInfo != nil { + for _, layer := range buffer.GetVideoLayersForMimeType(s.mimeType, s.trackInfo) { if layer.SpatialLayer > maxExpectedLayer { maxExpectedLayer = layer.SpatialLayer }