Limit dynacast to video and media loss proxy to audio (#902)

* Limit dynacast to video and media loss proxy to audio

Was looking at keeping the track type out of those modules
and do a check at a higher level, but it is a bit unwieldy.
So, adding checks to the modules.

Also, ensuring that media loss proxy does not reset unconditionally
every second. Audio RTCP happens once in 5 seconds or so.
So, if server proxied let say 2% at t = 5, t = 6 would have
proxied 0 loss which may or may not be true. So, ensure that
a report was received and proxy value is updated by an actual
report.

* Remove track type from modules
This commit is contained in:
Raja Subramanian
2022-08-10 11:30:49 +05:30
committed by GitHub
parent 49cf15cdca
commit 2192b0fc8d
2 changed files with 57 additions and 39 deletions

View File

@@ -23,9 +23,10 @@ type MediaLossProxyParams struct {
type MediaLossProxy struct {
params MediaLossProxyParams
lock sync.Mutex
maxDownFracLost uint8
maxDownFracLostTs time.Time
lock sync.Mutex
maxDownFracLost uint8
maxDownFracLostTs time.Time
maxDownFracLostValid bool
onMediaLossUpdate func(fractionalLoss uint8)
}
@@ -43,6 +44,7 @@ func (m *MediaLossProxy) OnMediaLossUpdate(f func(fractionalLoss uint8)) {
func (m *MediaLossProxy) HandleMaxLossFeedback(_ *sfu.DownTrack, report *rtcp.ReceiverReport) {
m.lock.Lock()
for _, rr := range report.Reports {
m.maxDownFracLostValid = true
if m.maxDownFracLost < rr.FractionLost {
m.maxDownFracLost = rr.FractionLost
}
@@ -54,6 +56,7 @@ func (m *MediaLossProxy) HandleMaxLossFeedback(_ *sfu.DownTrack, report *rtcp.Re
func (m *MediaLossProxy) NotifySubscriberNodeMediaLoss(_nodeID livekit.NodeID, fractionalLoss uint8) {
m.lock.Lock()
m.maxDownFracLostValid = true
if m.maxDownFracLost < fractionalLoss {
m.maxDownFracLost = fractionalLoss
}
@@ -70,11 +73,12 @@ func (m *MediaLossProxy) maybeUpdateLoss() {
m.lock.Lock()
now := time.Now()
if now.Sub(m.maxDownFracLostTs) > downLostUpdateDelta {
if now.Sub(m.maxDownFracLostTs) > downLostUpdateDelta && m.maxDownFracLostValid {
shouldUpdate = true
maxLost = m.maxDownFracLost
m.maxDownFracLost = 0
m.maxDownFracLostTs = now
m.maxDownFracLostValid = false
}
onMediaLossUpdate := m.onMediaLossUpdate
m.lock.Unlock()

View File

@@ -60,10 +60,6 @@ type MediaTrackParams struct {
func NewMediaTrack(params MediaTrackParams) *MediaTrack {
t := &MediaTrack{
params: params,
dynacastManager: NewDynacastManager(DynacastManagerParams{
DynacastPauseDelay: params.VideoConfig.DynacastPauseDelay,
Logger: params.Logger,
}),
}
t.MediaTrackReceiver = NewMediaTrackReceiver(MediaTrackReceiverParams{
@@ -89,43 +85,55 @@ func NewMediaTrack(params MediaTrackParams) *MediaTrack {
})
})
t.MediaLossProxy = NewMediaLossProxy(MediaLossProxyParams{
Logger: params.Logger,
})
t.MediaLossProxy.OnMediaLossUpdate(func(fractionalLoss uint8) {
if t.buffer != nil && t.Kind() == livekit.TrackType_AUDIO {
// ok to access buffer since receivers are added before subscribers
t.buffer.SetLastFractionLostReport(fractionalLoss)
}
})
if params.TrackInfo.Type == livekit.TrackType_AUDIO {
t.MediaLossProxy = NewMediaLossProxy(MediaLossProxyParams{
Logger: params.Logger,
})
t.MediaLossProxy.OnMediaLossUpdate(func(fractionalLoss uint8) {
if t.buffer != nil {
// ok to access buffer since receivers are added before subscribers
t.buffer.SetLastFractionLostReport(fractionalLoss)
}
})
t.MediaTrackReceiver.OnMediaLossFeedback(t.MediaLossProxy.HandleMaxLossFeedback)
}
t.MediaTrackReceiver.OnSetupReceiver(func(mime string) {
t.dynacastManager.AddCodec(mime)
})
t.MediaTrackReceiver.OnSubscriberMaxQualityChange(func(subscriberID livekit.ParticipantID, codec webrtc.RTPCodecCapability, layer int32) {
t.dynacastManager.NotifySubscriberMaxQuality(subscriberID, codec.MimeType, utils.QualityForSpatialLayer(layer))
})
t.MediaTrackReceiver.OnMediaLossFeedback(t.MediaLossProxy.HandleMaxLossFeedback)
if params.TrackInfo.Type == livekit.TrackType_VIDEO {
t.dynacastManager = NewDynacastManager(DynacastManagerParams{
DynacastPauseDelay: params.VideoConfig.DynacastPauseDelay,
Logger: params.Logger,
})
t.MediaTrackReceiver.OnSetupReceiver(func(mime string) {
t.dynacastManager.AddCodec(mime)
})
t.MediaTrackReceiver.OnSubscriberMaxQualityChange(func(subscriberID livekit.ParticipantID, codec webrtc.RTPCodecCapability, layer int32) {
t.dynacastManager.NotifySubscriberMaxQuality(subscriberID, codec.MimeType, utils.QualityForSpatialLayer(layer))
})
}
return t
}
func (t *MediaTrack) OnSubscribedMaxQualityChange(f func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) error) {
t.dynacastManager.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) {
if f != nil && !t.IsMuted() {
_ = f(t.ID(), subscribedQualities, maxSubscribedQualities)
}
for _, q := range maxSubscribedQualities {
receiver := t.Receiver(q.CodecMime)
if receiver != nil {
receiver.SetMaxExpectedSpatialLayer(utils.SpatialLayerForQuality(q.Quality))
if t.dynacastManager != nil {
t.dynacastManager.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) {
if f != nil && !t.IsMuted() {
_ = f(t.ID(), subscribedQualities, maxSubscribedQualities)
}
}
})
for _, q := range maxSubscribedQualities {
receiver := t.Receiver(q.CodecMime)
if receiver != nil {
receiver.SetMaxExpectedSpatialLayer(utils.SpatialLayerForQuality(q.Quality))
}
}
})
}
}
func (t *MediaTrack) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []types.SubscribedCodecQuality) {
t.dynacastManager.NotifySubscriberNodeMaxQuality(nodeID, qualities)
if t.dynacastManager != nil {
t.dynacastManager.NotifySubscriberNodeMaxQuality(nodeID, qualities)
}
}
func (t *MediaTrack) SignalCid() string {
@@ -220,7 +228,9 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
t.RemoveAllSubscribers(false)
t.MediaTrackReceiver.ClearReceiver(mime)
if t.MediaTrackReceiver.TryClose() {
t.dynacastManager.Close()
if t.dynacastManager != nil {
t.dynacastManager.Close()
}
t.params.Telemetry.TrackUnpublished(
context.Background(),
t.PublisherID(),
@@ -323,11 +333,15 @@ func (t *MediaTrack) OnMaxLayerChange(maxLayer int32) {
func (t *MediaTrack) Restart() {
t.MediaTrackReceiver.Restart()
t.dynacastManager.Restart()
if t.dynacastManager != nil {
t.dynacastManager.Restart()
}
}
func (t *MediaTrack) Close() {
t.dynacastManager.Close()
if t.dynacastManager != nil {
t.dynacastManager.Close()
}
t.MediaTrackReceiver.Close()
}
@@ -336,7 +350,7 @@ func (t *MediaTrack) SetMuted(muted bool) {
// update quality based on subscription if unmuting.
// This will queue up the current state, but subscriber
// driven changes could update it.
if !muted {
if !muted && t.dynacastManager != nil {
t.dynacastManager.ForceUpdate()
}