From d7750a60ec566bc4f8c4bc24eee6f99baa978d38 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 23 Mar 2023 17:23:14 +0530 Subject: [PATCH] Unify the forwarder between dependency descriptor and no DD case. (#1543) --- pkg/sfu/downtrack.go | 8 +- pkg/sfu/forwarder.go | 180 ++++++++++++++++++++----------------------- pkg/sfu/receiver.go | 11 +-- 3 files changed, 92 insertions(+), 107 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index e1469d4df..9a4507fae 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1277,10 +1277,10 @@ func (d *DownTrack) handleRTCP(bytes []byte) { pliOnce := true sendPliOnce := func() { if pliOnce { - targetLayers := d.forwarder.TargetLayers() - if targetLayers != InvalidLayers && !d.forwarder.IsAnyMuted() { - d.logger.Debugw("sending PLI RTCP", "layer", targetLayers.Spatial) - d.receiver.SendPLI(targetLayers.Spatial, false) + _, layer := d.forwarder.CheckSync() + if layer != InvalidLayerSpatial && !d.forwarder.IsAnyMuted() { + d.logger.Debugw("sending PLI RTCP", "layer", layer) + d.receiver.SendPLI(layer, false) d.isNACKThrottled.Store(true) d.rtpStats.UpdatePliTime() pliOnce = false diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 27266091c..69aab504c 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1480,94 +1480,37 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in f.rtpMunger.UpdateAndGetSnTs(extPkt) // call to update highest incoming sequence number and other internal structures f.rtpMunger.PacketDropped(extPkt) return tp, nil - } else if f.targetLayers.Spatial != f.currentLayers.Spatial && f.targetLayers.Spatial == layer && (extPkt.KeyFrame || tp.isSwitchingToTargetLayer) { - // lock to target layer - f.logger.Infow( - "locking to target layer", - "current", f.currentLayers, - "target", f.targetLayers, - "req", f.requestLayerSpatial, - "feed", extPkt.Packet.SSRC, - ) - f.currentLayers.Spatial = f.targetLayers.Spatial - if !f.isTemporalSupported { - f.currentLayers.Temporal = f.targetLayers.Temporal - } - // TODO : we switch to target layer immediately now since we assume all frame chain is integrity - // if we have frame chain check, should switch only if target chain is not broken and decodable - // if f.ddLayerSelector != nil { - // f.ddLayerSelector.SelectLayer(f.currentLayers) - // } - if f.currentLayers.Spatial >= f.maxLayers.Spatial { - tp.isSwitchingToMaxLayer = true - } } - } else { - if f.currentLayers.Spatial != f.targetLayers.Spatial { - // Three things to check when not locked to target - // 1. Resumable layer - don't need a key frame - // 2. Opportunistic layer upgrade - needs a key frame - // 3. Need to downgrade - needs a key frame - found := false - if f.parkedLayers.IsValid() { - if f.parkedLayers.Spatial == layer { - f.logger.Infow( - "resuming at parked layer", - "current", f.currentLayers, - "target", f.targetLayers, - "parked", f.parkedLayers, - "feed", extPkt.Packet.SSRC, - ) - f.currentLayers = f.parkedLayers - found = true - } - } else { - if extPkt.KeyFrame { - if layer > f.currentLayers.Spatial && layer <= f.targetLayers.Spatial { - f.logger.Infow( - "upgrading layer", - "current", f.currentLayers, - "target", f.targetLayers, - "max", f.maxLayers, - "layer", layer, - "req", f.requestLayerSpatial, - "maxPublished", f.maxPublishedLayer, - "feed", extPkt.Packet.SSRC, - ) - found = true - } + } - if layer < f.currentLayers.Spatial && layer >= f.targetLayers.Spatial { - f.logger.Infow( - "downgrading layer", - "current", f.currentLayers, - "target", f.targetLayers, - "max", f.maxLayers, - "layer", layer, - "req", f.requestLayerSpatial, - "maxPublished", f.maxPublishedLayer, - "feed", extPkt.Packet.SSRC, - ) - found = true - } - - if found { - f.currentLayers.Spatial = layer - if !f.isTemporalSupported { - f.currentLayers.Temporal = extPkt.Temporal - } - } - } + // at this point, either + // 1. dependency description has selected the layer for forwarding OR + // 2. non-dependency deescriptor is yet to make decision, but it can potentially switch to the incoming layer and start forwarding + // + // both cases cases upgrade/downgrade to current layer under the right conditions + if f.currentLayers.Spatial != f.targetLayers.Spatial { + // Three things to check when not locked to target + // 1. Resumable layer - don't need a key frame + // 2. Opportunistic layer upgrade - needs a key frame if not using depedency descriptor + // 3. Need to downgrade - needs a key frame if not using dependency descriptor + found := false + if f.parkedLayers.IsValid() { + if f.parkedLayers.Spatial == layer { + f.logger.Infow( + "resuming at parked layer", + "current", f.currentLayers, + "target", f.targetLayers, + "parked", f.parkedLayers, + "feed", extPkt.Packet.SSRC, + ) + f.currentLayers = f.parkedLayers + found = true } - - if found { - tp.isSwitchingToTargetLayer = true - f.clearParkedLayers() - if f.currentLayers.Spatial >= f.maxLayers.Spatial { - tp.isSwitchingToMaxLayer = true - + } else { + if extPkt.KeyFrame || tp.isSwitchingToTargetLayer { + if layer > f.currentLayers.Spatial && layer <= f.targetLayers.Spatial { f.logger.Infow( - "reached max layer", + "upgrading layer", "current", f.currentLayers, "target", f.targetLayers, "max", f.maxLayers, @@ -1576,19 +1519,40 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in "maxPublished", f.maxPublishedLayer, "feed", extPkt.Packet.SSRC, ) + found = true } - if f.currentLayers.Spatial >= f.maxLayers.Spatial || f.currentLayers.Spatial == f.maxPublishedLayer { - f.targetLayers.Spatial = f.currentLayers.Spatial + if layer < f.currentLayers.Spatial && layer >= f.targetLayers.Spatial { + f.logger.Infow( + "downgrading layer", + "current", f.currentLayers, + "target", f.targetLayers, + "max", f.maxLayers, + "layer", layer, + "req", f.requestLayerSpatial, + "maxPublished", f.maxPublishedLayer, + "feed", extPkt.Packet.SSRC, + ) + found = true + } + + if found { + f.currentLayers.Spatial = layer + if !f.isTemporalSupported { + f.currentLayers.Temporal = f.targetLayers.Temporal + } } } } - // if locked to higher than max layer due to overshoot, check if it can be dialed back - if f.currentLayers.Spatial > f.maxLayers.Spatial { - if layer <= f.maxLayers.Spatial && extPkt.KeyFrame { + if found { + tp.isSwitchingToTargetLayer = true + f.clearParkedLayers() + if f.currentLayers.Spatial >= f.maxLayers.Spatial { + tp.isSwitchingToMaxLayer = true + f.logger.Infow( - "adjusting overshoot", + "reached max layer", "current", f.currentLayers, "target", f.targetLayers, "max", f.maxLayers, @@ -1597,14 +1561,40 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in "maxPublished", f.maxPublishedLayer, "feed", extPkt.Packet.SSRC, ) - f.currentLayers.Spatial = layer + } - if f.currentLayers.Spatial >= f.maxLayers.Spatial { - tp.isSwitchingToMaxLayer = true + if f.currentLayers.Spatial >= f.maxLayers.Spatial || f.currentLayers.Spatial == f.maxPublishedLayer { + f.targetLayers.Spatial = f.currentLayers.Spatial + if f.ddLayerSelector != nil { + f.ddLayerSelector.SelectLayer(f.targetLayers) } + } + } + } - if f.currentLayers.Spatial >= f.maxLayers.Spatial || f.currentLayers.Spatial == f.maxPublishedLayer { - f.targetLayers.Spatial = layer + // if locked to higher than max layer due to overshoot, check if it can be dialed back + if f.currentLayers.Spatial > f.maxLayers.Spatial { + if layer <= f.maxLayers.Spatial && (extPkt.KeyFrame || tp.isSwitchingToTargetLayer) { + f.logger.Infow( + "adjusting overshoot", + "current", f.currentLayers, + "target", f.targetLayers, + "max", f.maxLayers, + "layer", layer, + "req", f.requestLayerSpatial, + "maxPublished", f.maxPublishedLayer, + "feed", extPkt.Packet.SSRC, + ) + f.currentLayers.Spatial = layer + + if f.currentLayers.Spatial >= f.maxLayers.Spatial { + tp.isSwitchingToMaxLayer = true + } + + if f.currentLayers.Spatial >= f.maxLayers.Spatial || f.currentLayers.Spatial == f.maxPublishedLayer { + f.targetLayers.Spatial = layer + if f.ddLayerSelector != nil { + f.ddLayerSelector.SelectLayer(f.targetLayers) } } } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 67f00f22c..c8c5d9a63 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -298,11 +298,6 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff } layer := int32(0) - // for svc codecs, use layer full quality instead. - // we only have buffer for full quality - if w.isSVC { - layer = int32(len(w.buffers)) - 1 - } if w.Kind() == webrtc.RTPCodecTypeVideo && !w.isSVC { layer = buffer.RidToSpatialLayer(track.RID(), w.trackInfo) } @@ -510,10 +505,10 @@ func (w *WebRTCReceiver) getBuffer(layer int32) *buffer.Buffer { } func (w *WebRTCReceiver) getBufferLocked(layer int32) *buffer.Buffer { - // for svc codecs, use layer full quality instead. - // we only have buffer for full quality + // for svc codecs, use layer = 0 always. + // spatial layers are in-built and handled by single buffer if w.isSVC { - layer = int32(len(w.buffers)) - 1 + layer = 0 } if int(layer) >= len(w.buffers) {