Use Muted in TrackInfo to propagated published track muted. (#4453)

* Use Muted in TrackInfo to propagated published track muted.

When the track is muted as a receiver is created, the receiver
potentially was not getting the muted property. That would result in
quality scorer expecting packets.

Use TrackInfo consistently for mute and apply the mute on start up of a
receiver.

* update mute of subscriptions
This commit is contained in:
Raja Subramanian
2026-04-16 01:03:40 +05:30
committed by GitHub
parent 69aa94797b
commit 3cfb71e7ca
6 changed files with 38 additions and 90 deletions

View File

@@ -193,7 +193,6 @@ func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack {
}
})
t.SetMuted(ti.Muted)
return t
}

View File

@@ -503,15 +503,9 @@ func (t *MediaTrackReceiver) SetMuted(muted bool) {
trackInfo := t.TrackInfoClone()
trackInfo.Muted = muted
t.trackInfo.Store(trackInfo)
receivers := t.receivers
t.lock.Unlock()
for _, receiver := range receivers {
receiver.SetUpTrackPaused(muted)
}
t.MediaTrackSubscriptions.SetMuted(muted)
t.updateTrackInfoOfReceivers()
}
func (t *MediaTrackReceiver) IsEncrypted() bool {
@@ -652,6 +646,8 @@ func (t *MediaTrackReceiver) updateTrackInfoOfReceivers() {
for _, r := range t.loadReceivers() {
r.UpdateTrackInfo(ti)
}
t.MediaTrackSubscriptions.SetMuted(ti.GetMuted())
}
func (t *MediaTrackReceiver) SetLayerSsrcsForRid(mimeType mime.MimeType, rid string, ssrc uint32, repairSSRC uint32) {
@@ -852,7 +848,6 @@ func (t *MediaTrackReceiver) UpdateCodecRids(mimeType mime.MimeType, rids buffer
}
func (t *MediaTrackReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) {
updateMute := false
clonedInfo := utils.CloneProto(ti)
t.lock.Lock()
@@ -893,16 +888,9 @@ func (t *MediaTrackReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) {
clonedInfo.Layers = ci.Layers
}
}
if trackInfo.Muted != clonedInfo.Muted {
updateMute = true
}
t.trackInfo.Store(clonedInfo)
t.lock.Unlock()
if updateMute {
t.SetMuted(clonedInfo.Muted)
}
t.updateTrackInfoOfReceivers()
}

View File

@@ -196,8 +196,6 @@ type DummyReceiver struct {
settingsLock sync.Mutex
maxExpectedLayerValid bool
maxExpectedLayer int32
pausedValid bool
paused bool
redReceiver, primaryReceiver *DummyRedReceiver
}
@@ -246,19 +244,12 @@ func (d *DummyReceiver) Upgrade(receiver sfu.TrackReceiver) {
d.settingsLock.Lock()
maxExpectedLayerValid := d.maxExpectedLayerValid
d.maxExpectedLayerValid = false
pausedValid := d.pausedValid
d.pausedValid = false
d.settingsLock.Unlock()
if maxExpectedLayerValid {
receiver.SetMaxExpectedSpatialLayer(d.maxExpectedLayer)
}
if pausedValid {
receiver.SetUpTrackPaused(d.paused)
}
d.settingsLock.Lock()
if d.primaryReceiver != nil {
d.primaryReceiver.upgrade(receiver)
@@ -332,22 +323,6 @@ func (d *DummyReceiver) SendPLI(layer int32, force bool) {
}
}
func (d *DummyReceiver) SetUpTrackPaused(paused bool) {
d.settingsLock.Lock()
receiver := d.getReceiver()
if receiver != nil {
d.pausedValid = false
} else {
d.pausedValid = true
d.paused = paused
}
d.settingsLock.Unlock()
if receiver != nil {
receiver.SetUpTrackPaused(paused)
}
}
func (d *DummyReceiver) SetMaxExpectedSpatialLayer(layer int32) {
d.settingsLock.Lock()
receiver := d.getReceiver()

View File

@@ -145,6 +145,8 @@ func NewWebRTCReceiver(
mime.IsMimeTypeStringRED(codec.MimeType) || strings.Contains(strings.ToLower(codec.SDPFmtpLine), "useinbandfec=1"),
)
w.UpdateTrackInfo(trackInfo)
return w
}
@@ -195,10 +197,9 @@ func (w *WebRTCReceiver) AddUpTrack(track TrackRemote, buff *buffer.Buffer) erro
return nil
}
func (w *WebRTCReceiver) SetUpTrackPaused(paused bool) {
w.ReceiverBase.SetUpTrackPaused(paused)
w.connectionStats.UpdateMute(paused)
func (w *WebRTCReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) {
w.ReceiverBase.UpdateTrackInfo(ti)
w.connectionStats.UpdateMute(ti.GetMuted())
}
func (w *WebRTCReceiver) notifyMaxExpectedLayer(layer int32) {

View File

@@ -116,7 +116,6 @@ type TrackReceiver interface {
SendPLI(layer int32, force bool)
SetUpTrackPaused(paused bool)
SetMaxExpectedSpatialLayer(layer int32)
AddDownTrack(track TrackSender) error
@@ -352,7 +351,15 @@ func (r *ReceiverBase) UpdateTrackInfo(ti *livekit.TrackInfo) {
)
}
r.trackInfo = utils.CloneProto(ti)
// MUTABLE-TRACKINFO-TODO: notify buffers, buffers may need to resize retransmission buffer if there is layer change
paused := r.trackInfo.GetMuted()
for _, buff := range r.buffers {
if buff == nil {
continue
}
buff.SetPaused(paused)
}
r.bufferMu.Unlock()
r.streamTrackerManager.UpdateTrackInfo(ti)
@@ -540,23 +547,6 @@ func (r *ReceiverBase) StreamTrackerManager() *StreamTrackerManager {
return r.streamTrackerManager
}
// SetUpTrackPaused indicates upstream will not be sending any data.
// this will reflect the "muted" status and will pause streamtracker to ensure we don't turn off
// the layer
func (r *ReceiverBase) SetUpTrackPaused(paused bool) {
r.streamTrackerManager.SetPaused(paused)
r.bufferMu.RLock()
for _, buff := range r.buffers {
if buff == nil {
continue
}
buff.SetPaused(paused)
}
r.bufferMu.RUnlock()
}
func (r *ReceiverBase) AddDownTrack(track TrackSender) error {
if r.IsClosed() {
return ErrReceiverClosed
@@ -760,13 +750,14 @@ func (r *ReceiverBase) GetOrCreateBuffer(layer int32) buffer.BufferProvider {
r.bufferMu.Lock()
r.buffers[layer] = buff
rtt := r.rtt
paused := r.trackInfo.GetMuted()
r.bufferMu.Unlock()
r.setupBuffer(buff, layer, rtt)
r.setupBuffer(buff, layer, rtt, paused)
return buff
}
func (r *ReceiverBase) setupBuffer(buff buffer.BufferProvider, layer int32, rtt uint32) {
func (r *ReceiverBase) setupBuffer(buff buffer.BufferProvider, layer int32, rtt uint32, paused bool) {
buff.SetLogger(r.params.Logger.WithValues("layer", layer))
buff.SetAudioLevelConfig(r.audioConfig.AudioLevelConfig)
buff.SetStreamRestartDetection(r.enableRTPStreamRestartDetection)
@@ -818,16 +809,17 @@ func (r *ReceiverBase) setupBuffer(buff buffer.BufferProvider, layer int32, rtt
}
buff.SetRTT(rtt)
buff.SetPaused(r.streamTrackerManager.IsPaused())
buff.SetPaused(paused)
}
func (r *ReceiverBase) AddBuffer(buff buffer.BufferProvider, layer int32) {
r.bufferMu.Lock()
r.buffers[layer] = buff
rtt := r.rtt
paused := r.trackInfo.GetMuted()
r.bufferMu.Unlock()
r.setupBuffer(buff, layer, rtt)
r.setupBuffer(buff, layer, rtt, paused)
}
func (r *ReceiverBase) StartBuffer(buff buffer.BufferProvider, layer int32) {

View File

@@ -20,7 +20,6 @@ import (
"time"
"github.com/frostbyte73/core"
"go.uber.org/atomic"
"github.com/livekit/protocol/codecs/mime"
"github.com/livekit/protocol/livekit"
@@ -112,7 +111,6 @@ var (
type StreamTrackerManager struct {
logger logger.Logger
trackInfo atomic.Pointer[livekit.TrackInfo]
mimeType mime.MimeType
videoLayerMode livekit.VideoLayer_Mode
clockRate uint32
@@ -120,6 +118,7 @@ type StreamTrackerManager struct {
trackerConfig StreamTrackerConfig
lock sync.RWMutex
trackInfo *livekit.TrackInfo
maxPublishedLayer int32
maxTemporalLayerSeen int32
@@ -128,7 +127,6 @@ type StreamTrackerManager struct {
availableLayers []int32
maxExpectedLayer int32
paused bool
closed core.Fuse
@@ -146,11 +144,11 @@ func NewStreamTrackerManager(
logger: logger,
mimeType: mimeType,
videoLayerMode: buffer.GetVideoLayerModeForMimeType(mimeType, trackInfo),
trackInfo: utils.CloneProto(trackInfo),
maxPublishedLayer: buffer.InvalidLayerSpatial,
maxTemporalLayerSeen: buffer.InvalidLayerTemporal,
clockRate: clockRate,
}
s.trackInfo.Store(utils.CloneProto(trackInfo))
switch trackInfo.Source {
case livekit.TrackSource_SCREEN_SHARE:
@@ -294,7 +292,7 @@ func (s *StreamTrackerManager) AddTracker(layer int32) streamtracker.StreamTrack
})
s.lock.Lock()
paused := s.paused
paused := s.trackInfo.GetMuted()
s.trackers[layer] = tracker
notify := false
@@ -337,8 +335,6 @@ func (s *StreamTrackerManager) RemoveAllTrackers() {
s.maxExpectedLayer = buffer.InvalidLayerSpatial
s.maxExpectedLayerFromTrackInfoLocked(true)
s.paused = false
ddTracker := s.ddTracker
s.ddTracker = nil
s.lock.Unlock()
@@ -364,9 +360,8 @@ func (s *StreamTrackerManager) GetTracker(layer int32) streamtracker.StreamTrack
return s.trackers[layer]
}
func (s *StreamTrackerManager) SetPaused(paused bool) {
func (s *StreamTrackerManager) setPaused(paused bool) {
s.lock.Lock()
s.paused = paused
trackers := s.trackers
s.lock.Unlock()
@@ -377,16 +372,15 @@ func (s *StreamTrackerManager) SetPaused(paused bool) {
}
}
func (s *StreamTrackerManager) IsPaused() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.paused
}
func (s *StreamTrackerManager) UpdateTrackInfo(ti *livekit.TrackInfo) {
s.trackInfo.Store(utils.CloneProto(ti))
s.maxExpectedLayerFromTrackInfo(false)
s.lock.Lock()
s.trackInfo = utils.CloneProto(ti)
s.maxExpectedLayerFromTrackInfoLocked(false)
paused := s.trackInfo.GetMuted()
s.lock.Unlock()
s.setPaused(paused)
}
func (s *StreamTrackerManager) SetMaxExpectedSpatialLayer(layer int32) int32 {
@@ -425,7 +419,7 @@ func (s *StreamTrackerManager) DistanceToDesired() float64 {
s.lock.RLock()
defer s.lock.RUnlock()
if s.paused || s.maxExpectedLayer < 0 || s.maxTemporalLayerSeen < 0 {
if s.trackInfo.GetMuted() || s.maxExpectedLayer < 0 || s.maxTemporalLayerSeen < 0 {
return 0
}
@@ -594,9 +588,8 @@ func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfo(force bool) {
func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfoLocked(force bool) {
maxExpectedLayer := buffer.InvalidLayerSpatial
ti := s.trackInfo.Load()
if ti != nil {
for _, layer := range buffer.GetVideoLayersForMimeType(s.mimeType, ti) {
if s.trackInfo != nil {
for _, layer := range buffer.GetVideoLayersForMimeType(s.mimeType, s.trackInfo) {
if layer.SpatialLayer > maxExpectedLayer {
maxExpectedLayer = layer.SpatialLayer
}