From 3cfb71e7ca4756a6b723dc2d40a5debf97ccf982 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 16 Apr 2026 01:03:40 +0530 Subject: [PATCH] Use Muted in TrackInfo to propagated published track muted. (#4453) * Use Muted in TrackInfo to propagated published track muted. When the track is muted as a receiver is created, the receiver potentially was not getting the muted property. That would result in quality scorer expecting packets. Use TrackInfo consistently for mute and apply the mute on start up of a receiver. * update mute of subscriptions --- pkg/rtc/mediatrack.go | 1 - pkg/rtc/mediatrackreceiver.go | 18 +++------------- pkg/rtc/wrappedreceiver.go | 25 ---------------------- pkg/sfu/receiver.go | 9 ++++---- pkg/sfu/receiver_base.go | 38 +++++++++++++-------------------- pkg/sfu/streamtrackermanager.go | 37 +++++++++++++------------------- 6 files changed, 38 insertions(+), 90 deletions(-) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 886b84506..b1337c24e 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -193,7 +193,6 @@ func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack { } }) - t.SetMuted(ti.Muted) return t } diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 04af89215..9c896b644 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -503,15 +503,9 @@ func (t *MediaTrackReceiver) SetMuted(muted bool) { trackInfo := t.TrackInfoClone() trackInfo.Muted = muted t.trackInfo.Store(trackInfo) - - receivers := t.receivers t.lock.Unlock() - for _, receiver := range receivers { - receiver.SetUpTrackPaused(muted) - } - - t.MediaTrackSubscriptions.SetMuted(muted) + t.updateTrackInfoOfReceivers() } func (t *MediaTrackReceiver) IsEncrypted() bool { @@ -652,6 +646,8 @@ func (t *MediaTrackReceiver) updateTrackInfoOfReceivers() { for _, r := range t.loadReceivers() { r.UpdateTrackInfo(ti) } + + t.MediaTrackSubscriptions.SetMuted(ti.GetMuted()) } func (t *MediaTrackReceiver) SetLayerSsrcsForRid(mimeType mime.MimeType, rid string, ssrc uint32, repairSSRC uint32) { @@ -852,7 +848,6 @@ func (t *MediaTrackReceiver) UpdateCodecRids(mimeType mime.MimeType, rids buffer } func (t *MediaTrackReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) { - updateMute := false clonedInfo := utils.CloneProto(ti) t.lock.Lock() @@ -893,16 +888,9 @@ func (t *MediaTrackReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) { clonedInfo.Layers = ci.Layers } } - if trackInfo.Muted != clonedInfo.Muted { - updateMute = true - } t.trackInfo.Store(clonedInfo) t.lock.Unlock() - if updateMute { - t.SetMuted(clonedInfo.Muted) - } - t.updateTrackInfoOfReceivers() } diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 7b979eae9..6623b3c59 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -196,8 +196,6 @@ type DummyReceiver struct { settingsLock sync.Mutex maxExpectedLayerValid bool maxExpectedLayer int32 - pausedValid bool - paused bool redReceiver, primaryReceiver *DummyRedReceiver } @@ -246,19 +244,12 @@ func (d *DummyReceiver) Upgrade(receiver sfu.TrackReceiver) { d.settingsLock.Lock() maxExpectedLayerValid := d.maxExpectedLayerValid d.maxExpectedLayerValid = false - - pausedValid := d.pausedValid - d.pausedValid = false d.settingsLock.Unlock() if maxExpectedLayerValid { receiver.SetMaxExpectedSpatialLayer(d.maxExpectedLayer) } - if pausedValid { - receiver.SetUpTrackPaused(d.paused) - } - d.settingsLock.Lock() if d.primaryReceiver != nil { d.primaryReceiver.upgrade(receiver) @@ -332,22 +323,6 @@ func (d *DummyReceiver) SendPLI(layer int32, force bool) { } } -func (d *DummyReceiver) SetUpTrackPaused(paused bool) { - d.settingsLock.Lock() - receiver := d.getReceiver() - if receiver != nil { - d.pausedValid = false - } else { - d.pausedValid = true - d.paused = paused - } - d.settingsLock.Unlock() - - if receiver != nil { - receiver.SetUpTrackPaused(paused) - } -} - func (d *DummyReceiver) SetMaxExpectedSpatialLayer(layer int32) { d.settingsLock.Lock() receiver := d.getReceiver() diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 68886a208..d2cae63a1 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -145,6 +145,8 @@ func NewWebRTCReceiver( mime.IsMimeTypeStringRED(codec.MimeType) || strings.Contains(strings.ToLower(codec.SDPFmtpLine), "useinbandfec=1"), ) + w.UpdateTrackInfo(trackInfo) + return w } @@ -195,10 +197,9 @@ func (w *WebRTCReceiver) AddUpTrack(track TrackRemote, buff *buffer.Buffer) erro return nil } -func (w *WebRTCReceiver) SetUpTrackPaused(paused bool) { - w.ReceiverBase.SetUpTrackPaused(paused) - - w.connectionStats.UpdateMute(paused) +func (w *WebRTCReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) { + w.ReceiverBase.UpdateTrackInfo(ti) + w.connectionStats.UpdateMute(ti.GetMuted()) } func (w *WebRTCReceiver) notifyMaxExpectedLayer(layer int32) { diff --git a/pkg/sfu/receiver_base.go b/pkg/sfu/receiver_base.go index 1b9047ce7..e1f1a1169 100644 --- a/pkg/sfu/receiver_base.go +++ b/pkg/sfu/receiver_base.go @@ -116,7 +116,6 @@ type TrackReceiver interface { SendPLI(layer int32, force bool) - SetUpTrackPaused(paused bool) SetMaxExpectedSpatialLayer(layer int32) AddDownTrack(track TrackSender) error @@ -352,7 +351,15 @@ func (r *ReceiverBase) UpdateTrackInfo(ti *livekit.TrackInfo) { ) } r.trackInfo = utils.CloneProto(ti) - // MUTABLE-TRACKINFO-TODO: notify buffers, buffers may need to resize retransmission buffer if there is layer change + + paused := r.trackInfo.GetMuted() + for _, buff := range r.buffers { + if buff == nil { + continue + } + + buff.SetPaused(paused) + } r.bufferMu.Unlock() r.streamTrackerManager.UpdateTrackInfo(ti) @@ -540,23 +547,6 @@ func (r *ReceiverBase) StreamTrackerManager() *StreamTrackerManager { return r.streamTrackerManager } -// SetUpTrackPaused indicates upstream will not be sending any data. -// this will reflect the "muted" status and will pause streamtracker to ensure we don't turn off -// the layer -func (r *ReceiverBase) SetUpTrackPaused(paused bool) { - r.streamTrackerManager.SetPaused(paused) - - r.bufferMu.RLock() - for _, buff := range r.buffers { - if buff == nil { - continue - } - - buff.SetPaused(paused) - } - r.bufferMu.RUnlock() -} - func (r *ReceiverBase) AddDownTrack(track TrackSender) error { if r.IsClosed() { return ErrReceiverClosed @@ -760,13 +750,14 @@ func (r *ReceiverBase) GetOrCreateBuffer(layer int32) buffer.BufferProvider { r.bufferMu.Lock() r.buffers[layer] = buff rtt := r.rtt + paused := r.trackInfo.GetMuted() r.bufferMu.Unlock() - r.setupBuffer(buff, layer, rtt) + r.setupBuffer(buff, layer, rtt, paused) return buff } -func (r *ReceiverBase) setupBuffer(buff buffer.BufferProvider, layer int32, rtt uint32) { +func (r *ReceiverBase) setupBuffer(buff buffer.BufferProvider, layer int32, rtt uint32, paused bool) { buff.SetLogger(r.params.Logger.WithValues("layer", layer)) buff.SetAudioLevelConfig(r.audioConfig.AudioLevelConfig) buff.SetStreamRestartDetection(r.enableRTPStreamRestartDetection) @@ -818,16 +809,17 @@ func (r *ReceiverBase) setupBuffer(buff buffer.BufferProvider, layer int32, rtt } buff.SetRTT(rtt) - buff.SetPaused(r.streamTrackerManager.IsPaused()) + buff.SetPaused(paused) } func (r *ReceiverBase) AddBuffer(buff buffer.BufferProvider, layer int32) { r.bufferMu.Lock() r.buffers[layer] = buff rtt := r.rtt + paused := r.trackInfo.GetMuted() r.bufferMu.Unlock() - r.setupBuffer(buff, layer, rtt) + r.setupBuffer(buff, layer, rtt, paused) } func (r *ReceiverBase) StartBuffer(buff buffer.BufferProvider, layer int32) { diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 7e0347906..6eb28e5f2 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -20,7 +20,6 @@ import ( "time" "github.com/frostbyte73/core" - "go.uber.org/atomic" "github.com/livekit/protocol/codecs/mime" "github.com/livekit/protocol/livekit" @@ -112,7 +111,6 @@ var ( type StreamTrackerManager struct { logger logger.Logger - trackInfo atomic.Pointer[livekit.TrackInfo] mimeType mime.MimeType videoLayerMode livekit.VideoLayer_Mode clockRate uint32 @@ -120,6 +118,7 @@ type StreamTrackerManager struct { trackerConfig StreamTrackerConfig lock sync.RWMutex + trackInfo *livekit.TrackInfo maxPublishedLayer int32 maxTemporalLayerSeen int32 @@ -128,7 +127,6 @@ type StreamTrackerManager struct { availableLayers []int32 maxExpectedLayer int32 - paused bool closed core.Fuse @@ -146,11 +144,11 @@ func NewStreamTrackerManager( logger: logger, mimeType: mimeType, videoLayerMode: buffer.GetVideoLayerModeForMimeType(mimeType, trackInfo), + trackInfo: utils.CloneProto(trackInfo), maxPublishedLayer: buffer.InvalidLayerSpatial, maxTemporalLayerSeen: buffer.InvalidLayerTemporal, clockRate: clockRate, } - s.trackInfo.Store(utils.CloneProto(trackInfo)) switch trackInfo.Source { case livekit.TrackSource_SCREEN_SHARE: @@ -294,7 +292,7 @@ func (s *StreamTrackerManager) AddTracker(layer int32) streamtracker.StreamTrack }) s.lock.Lock() - paused := s.paused + paused := s.trackInfo.GetMuted() s.trackers[layer] = tracker notify := false @@ -337,8 +335,6 @@ func (s *StreamTrackerManager) RemoveAllTrackers() { s.maxExpectedLayer = buffer.InvalidLayerSpatial s.maxExpectedLayerFromTrackInfoLocked(true) - s.paused = false - ddTracker := s.ddTracker s.ddTracker = nil s.lock.Unlock() @@ -364,9 +360,8 @@ func (s *StreamTrackerManager) GetTracker(layer int32) streamtracker.StreamTrack return s.trackers[layer] } -func (s *StreamTrackerManager) SetPaused(paused bool) { +func (s *StreamTrackerManager) setPaused(paused bool) { s.lock.Lock() - s.paused = paused trackers := s.trackers s.lock.Unlock() @@ -377,16 +372,15 @@ func (s *StreamTrackerManager) SetPaused(paused bool) { } } -func (s *StreamTrackerManager) IsPaused() bool { - s.lock.RLock() - defer s.lock.RUnlock() - - return s.paused -} - func (s *StreamTrackerManager) UpdateTrackInfo(ti *livekit.TrackInfo) { - s.trackInfo.Store(utils.CloneProto(ti)) - s.maxExpectedLayerFromTrackInfo(false) + s.lock.Lock() + s.trackInfo = utils.CloneProto(ti) + s.maxExpectedLayerFromTrackInfoLocked(false) + + paused := s.trackInfo.GetMuted() + s.lock.Unlock() + + s.setPaused(paused) } func (s *StreamTrackerManager) SetMaxExpectedSpatialLayer(layer int32) int32 { @@ -425,7 +419,7 @@ func (s *StreamTrackerManager) DistanceToDesired() float64 { s.lock.RLock() defer s.lock.RUnlock() - if s.paused || s.maxExpectedLayer < 0 || s.maxTemporalLayerSeen < 0 { + if s.trackInfo.GetMuted() || s.maxExpectedLayer < 0 || s.maxTemporalLayerSeen < 0 { return 0 } @@ -594,9 +588,8 @@ func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfo(force bool) { func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfoLocked(force bool) { maxExpectedLayer := buffer.InvalidLayerSpatial - ti := s.trackInfo.Load() - if ti != nil { - for _, layer := range buffer.GetVideoLayersForMimeType(s.mimeType, ti) { + if s.trackInfo != nil { + for _, layer := range buffer.GetVideoLayersForMimeType(s.mimeType, s.trackInfo) { if layer.SpatialLayer > maxExpectedLayer { maxExpectedLayer = layer.SpatialLayer }