From 2192b0fc8d3fab5beca4e6524653c6bb3c38739d Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 10 Aug 2022 11:30:49 +0530 Subject: [PATCH] Limit dynacast to video and media loss proxy to audio (#902) * Limit dynacast to video and media loss proxy to audio Was looking at keeping the track type out of those modules and do a check at a higher level, but it is a bit unwieldy. So, adding checks to the modules. Also, ensuring that media loss proxy does not reset unconditionally every second. Audio RTCP happens once in 5 seconds or so. So, if server proxied let say 2% at t = 5, t = 6 would have proxied 0 loss which may or may not be true. So, ensure that a report was received and proxy value is updated by an actual report. * Remove track type from modules --- pkg/rtc/medialossproxy.go | 12 ++++-- pkg/rtc/mediatrack.go | 84 +++++++++++++++++++++++---------------- 2 files changed, 57 insertions(+), 39 deletions(-) diff --git a/pkg/rtc/medialossproxy.go b/pkg/rtc/medialossproxy.go index ec47092fc..4b25d479a 100644 --- a/pkg/rtc/medialossproxy.go +++ b/pkg/rtc/medialossproxy.go @@ -23,9 +23,10 @@ type MediaLossProxyParams struct { type MediaLossProxy struct { params MediaLossProxyParams - lock sync.Mutex - maxDownFracLost uint8 - maxDownFracLostTs time.Time + lock sync.Mutex + maxDownFracLost uint8 + maxDownFracLostTs time.Time + maxDownFracLostValid bool onMediaLossUpdate func(fractionalLoss uint8) } @@ -43,6 +44,7 @@ func (m *MediaLossProxy) OnMediaLossUpdate(f func(fractionalLoss uint8)) { func (m *MediaLossProxy) HandleMaxLossFeedback(_ *sfu.DownTrack, report *rtcp.ReceiverReport) { m.lock.Lock() for _, rr := range report.Reports { + m.maxDownFracLostValid = true if m.maxDownFracLost < rr.FractionLost { m.maxDownFracLost = rr.FractionLost } @@ -54,6 +56,7 @@ func (m *MediaLossProxy) HandleMaxLossFeedback(_ *sfu.DownTrack, report *rtcp.Re func (m *MediaLossProxy) NotifySubscriberNodeMediaLoss(_nodeID livekit.NodeID, fractionalLoss uint8) { m.lock.Lock() + m.maxDownFracLostValid = true if m.maxDownFracLost < fractionalLoss { m.maxDownFracLost = fractionalLoss } @@ -70,11 +73,12 @@ func (m *MediaLossProxy) maybeUpdateLoss() { m.lock.Lock() now := time.Now() - if now.Sub(m.maxDownFracLostTs) > downLostUpdateDelta { + if now.Sub(m.maxDownFracLostTs) > downLostUpdateDelta && m.maxDownFracLostValid { shouldUpdate = true maxLost = m.maxDownFracLost m.maxDownFracLost = 0 m.maxDownFracLostTs = now + m.maxDownFracLostValid = false } onMediaLossUpdate := m.onMediaLossUpdate m.lock.Unlock() diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 221cc393b..3bfc96639 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -60,10 +60,6 @@ type MediaTrackParams struct { func NewMediaTrack(params MediaTrackParams) *MediaTrack { t := &MediaTrack{ params: params, - dynacastManager: NewDynacastManager(DynacastManagerParams{ - DynacastPauseDelay: params.VideoConfig.DynacastPauseDelay, - Logger: params.Logger, - }), } t.MediaTrackReceiver = NewMediaTrackReceiver(MediaTrackReceiverParams{ @@ -89,43 +85,55 @@ func NewMediaTrack(params MediaTrackParams) *MediaTrack { }) }) - t.MediaLossProxy = NewMediaLossProxy(MediaLossProxyParams{ - Logger: params.Logger, - }) - t.MediaLossProxy.OnMediaLossUpdate(func(fractionalLoss uint8) { - if t.buffer != nil && t.Kind() == livekit.TrackType_AUDIO { - // ok to access buffer since receivers are added before subscribers - t.buffer.SetLastFractionLostReport(fractionalLoss) - } - }) + if params.TrackInfo.Type == livekit.TrackType_AUDIO { + t.MediaLossProxy = NewMediaLossProxy(MediaLossProxyParams{ + Logger: params.Logger, + }) + t.MediaLossProxy.OnMediaLossUpdate(func(fractionalLoss uint8) { + if t.buffer != nil { + // ok to access buffer since receivers are added before subscribers + t.buffer.SetLastFractionLostReport(fractionalLoss) + } + }) + t.MediaTrackReceiver.OnMediaLossFeedback(t.MediaLossProxy.HandleMaxLossFeedback) + } - t.MediaTrackReceiver.OnSetupReceiver(func(mime string) { - t.dynacastManager.AddCodec(mime) - }) - t.MediaTrackReceiver.OnSubscriberMaxQualityChange(func(subscriberID livekit.ParticipantID, codec webrtc.RTPCodecCapability, layer int32) { - t.dynacastManager.NotifySubscriberMaxQuality(subscriberID, codec.MimeType, utils.QualityForSpatialLayer(layer)) - }) - t.MediaTrackReceiver.OnMediaLossFeedback(t.MediaLossProxy.HandleMaxLossFeedback) + if params.TrackInfo.Type == livekit.TrackType_VIDEO { + t.dynacastManager = NewDynacastManager(DynacastManagerParams{ + DynacastPauseDelay: params.VideoConfig.DynacastPauseDelay, + Logger: params.Logger, + }) + t.MediaTrackReceiver.OnSetupReceiver(func(mime string) { + t.dynacastManager.AddCodec(mime) + }) + t.MediaTrackReceiver.OnSubscriberMaxQualityChange(func(subscriberID livekit.ParticipantID, codec webrtc.RTPCodecCapability, layer int32) { + t.dynacastManager.NotifySubscriberMaxQuality(subscriberID, codec.MimeType, utils.QualityForSpatialLayer(layer)) + }) + } return t } func (t *MediaTrack) OnSubscribedMaxQualityChange(f func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) error) { - t.dynacastManager.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) { - if f != nil && !t.IsMuted() { - _ = f(t.ID(), subscribedQualities, maxSubscribedQualities) - } - for _, q := range maxSubscribedQualities { - receiver := t.Receiver(q.CodecMime) - if receiver != nil { - receiver.SetMaxExpectedSpatialLayer(utils.SpatialLayerForQuality(q.Quality)) + if t.dynacastManager != nil { + t.dynacastManager.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) { + if f != nil && !t.IsMuted() { + _ = f(t.ID(), subscribedQualities, maxSubscribedQualities) } - } - }) + for _, q := range maxSubscribedQualities { + receiver := t.Receiver(q.CodecMime) + if receiver != nil { + receiver.SetMaxExpectedSpatialLayer(utils.SpatialLayerForQuality(q.Quality)) + } + } + }) + } } func (t *MediaTrack) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []types.SubscribedCodecQuality) { - t.dynacastManager.NotifySubscriberNodeMaxQuality(nodeID, qualities) + if t.dynacastManager != nil { + t.dynacastManager.NotifySubscriberNodeMaxQuality(nodeID, qualities) + } } func (t *MediaTrack) SignalCid() string { @@ -220,7 +228,9 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra t.RemoveAllSubscribers(false) t.MediaTrackReceiver.ClearReceiver(mime) if t.MediaTrackReceiver.TryClose() { - t.dynacastManager.Close() + if t.dynacastManager != nil { + t.dynacastManager.Close() + } t.params.Telemetry.TrackUnpublished( context.Background(), t.PublisherID(), @@ -323,11 +333,15 @@ func (t *MediaTrack) OnMaxLayerChange(maxLayer int32) { func (t *MediaTrack) Restart() { t.MediaTrackReceiver.Restart() - t.dynacastManager.Restart() + if t.dynacastManager != nil { + t.dynacastManager.Restart() + } } func (t *MediaTrack) Close() { - t.dynacastManager.Close() + if t.dynacastManager != nil { + t.dynacastManager.Close() + } t.MediaTrackReceiver.Close() } @@ -336,7 +350,7 @@ func (t *MediaTrack) SetMuted(muted bool) { // update quality based on subscription if unmuting. // This will queue up the current state, but subscriber // driven changes could update it. - if !muted { + if !muted && t.dynacastManager != nil { t.dynacastManager.ForceUpdate() }