mirror of
https://github.com/livekit/livekit.git
synced 2026-05-24 19:05:36 +00:00
Unify the forwarder between dependency descriptor and no DD case. (#1543)
This commit is contained in:
@@ -1277,10 +1277,10 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
|
||||
pliOnce := true
|
||||
sendPliOnce := func() {
|
||||
if pliOnce {
|
||||
targetLayers := d.forwarder.TargetLayers()
|
||||
if targetLayers != InvalidLayers && !d.forwarder.IsAnyMuted() {
|
||||
d.logger.Debugw("sending PLI RTCP", "layer", targetLayers.Spatial)
|
||||
d.receiver.SendPLI(targetLayers.Spatial, false)
|
||||
_, layer := d.forwarder.CheckSync()
|
||||
if layer != InvalidLayerSpatial && !d.forwarder.IsAnyMuted() {
|
||||
d.logger.Debugw("sending PLI RTCP", "layer", layer)
|
||||
d.receiver.SendPLI(layer, false)
|
||||
d.isNACKThrottled.Store(true)
|
||||
d.rtpStats.UpdatePliTime()
|
||||
pliOnce = false
|
||||
|
||||
+85
-95
@@ -1480,94 +1480,37 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
|
||||
f.rtpMunger.UpdateAndGetSnTs(extPkt) // call to update highest incoming sequence number and other internal structures
|
||||
f.rtpMunger.PacketDropped(extPkt)
|
||||
return tp, nil
|
||||
} else if f.targetLayers.Spatial != f.currentLayers.Spatial && f.targetLayers.Spatial == layer && (extPkt.KeyFrame || tp.isSwitchingToTargetLayer) {
|
||||
// lock to target layer
|
||||
f.logger.Infow(
|
||||
"locking to target layer",
|
||||
"current", f.currentLayers,
|
||||
"target", f.targetLayers,
|
||||
"req", f.requestLayerSpatial,
|
||||
"feed", extPkt.Packet.SSRC,
|
||||
)
|
||||
f.currentLayers.Spatial = f.targetLayers.Spatial
|
||||
if !f.isTemporalSupported {
|
||||
f.currentLayers.Temporal = f.targetLayers.Temporal
|
||||
}
|
||||
// TODO : we switch to target layer immediately now since we assume all frame chain is integrity
|
||||
// if we have frame chain check, should switch only if target chain is not broken and decodable
|
||||
// if f.ddLayerSelector != nil {
|
||||
// f.ddLayerSelector.SelectLayer(f.currentLayers)
|
||||
// }
|
||||
if f.currentLayers.Spatial >= f.maxLayers.Spatial {
|
||||
tp.isSwitchingToMaxLayer = true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if f.currentLayers.Spatial != f.targetLayers.Spatial {
|
||||
// Three things to check when not locked to target
|
||||
// 1. Resumable layer - don't need a key frame
|
||||
// 2. Opportunistic layer upgrade - needs a key frame
|
||||
// 3. Need to downgrade - needs a key frame
|
||||
found := false
|
||||
if f.parkedLayers.IsValid() {
|
||||
if f.parkedLayers.Spatial == layer {
|
||||
f.logger.Infow(
|
||||
"resuming at parked layer",
|
||||
"current", f.currentLayers,
|
||||
"target", f.targetLayers,
|
||||
"parked", f.parkedLayers,
|
||||
"feed", extPkt.Packet.SSRC,
|
||||
)
|
||||
f.currentLayers = f.parkedLayers
|
||||
found = true
|
||||
}
|
||||
} else {
|
||||
if extPkt.KeyFrame {
|
||||
if layer > f.currentLayers.Spatial && layer <= f.targetLayers.Spatial {
|
||||
f.logger.Infow(
|
||||
"upgrading layer",
|
||||
"current", f.currentLayers,
|
||||
"target", f.targetLayers,
|
||||
"max", f.maxLayers,
|
||||
"layer", layer,
|
||||
"req", f.requestLayerSpatial,
|
||||
"maxPublished", f.maxPublishedLayer,
|
||||
"feed", extPkt.Packet.SSRC,
|
||||
)
|
||||
found = true
|
||||
}
|
||||
}
|
||||
|
||||
if layer < f.currentLayers.Spatial && layer >= f.targetLayers.Spatial {
|
||||
f.logger.Infow(
|
||||
"downgrading layer",
|
||||
"current", f.currentLayers,
|
||||
"target", f.targetLayers,
|
||||
"max", f.maxLayers,
|
||||
"layer", layer,
|
||||
"req", f.requestLayerSpatial,
|
||||
"maxPublished", f.maxPublishedLayer,
|
||||
"feed", extPkt.Packet.SSRC,
|
||||
)
|
||||
found = true
|
||||
}
|
||||
|
||||
if found {
|
||||
f.currentLayers.Spatial = layer
|
||||
if !f.isTemporalSupported {
|
||||
f.currentLayers.Temporal = extPkt.Temporal
|
||||
}
|
||||
}
|
||||
}
|
||||
// at this point, either
|
||||
// 1. dependency description has selected the layer for forwarding OR
|
||||
// 2. non-dependency deescriptor is yet to make decision, but it can potentially switch to the incoming layer and start forwarding
|
||||
//
|
||||
// both cases cases upgrade/downgrade to current layer under the right conditions
|
||||
if f.currentLayers.Spatial != f.targetLayers.Spatial {
|
||||
// Three things to check when not locked to target
|
||||
// 1. Resumable layer - don't need a key frame
|
||||
// 2. Opportunistic layer upgrade - needs a key frame if not using depedency descriptor
|
||||
// 3. Need to downgrade - needs a key frame if not using dependency descriptor
|
||||
found := false
|
||||
if f.parkedLayers.IsValid() {
|
||||
if f.parkedLayers.Spatial == layer {
|
||||
f.logger.Infow(
|
||||
"resuming at parked layer",
|
||||
"current", f.currentLayers,
|
||||
"target", f.targetLayers,
|
||||
"parked", f.parkedLayers,
|
||||
"feed", extPkt.Packet.SSRC,
|
||||
)
|
||||
f.currentLayers = f.parkedLayers
|
||||
found = true
|
||||
}
|
||||
|
||||
if found {
|
||||
tp.isSwitchingToTargetLayer = true
|
||||
f.clearParkedLayers()
|
||||
if f.currentLayers.Spatial >= f.maxLayers.Spatial {
|
||||
tp.isSwitchingToMaxLayer = true
|
||||
|
||||
} else {
|
||||
if extPkt.KeyFrame || tp.isSwitchingToTargetLayer {
|
||||
if layer > f.currentLayers.Spatial && layer <= f.targetLayers.Spatial {
|
||||
f.logger.Infow(
|
||||
"reached max layer",
|
||||
"upgrading layer",
|
||||
"current", f.currentLayers,
|
||||
"target", f.targetLayers,
|
||||
"max", f.maxLayers,
|
||||
@@ -1576,19 +1519,40 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
|
||||
"maxPublished", f.maxPublishedLayer,
|
||||
"feed", extPkt.Packet.SSRC,
|
||||
)
|
||||
found = true
|
||||
}
|
||||
|
||||
if f.currentLayers.Spatial >= f.maxLayers.Spatial || f.currentLayers.Spatial == f.maxPublishedLayer {
|
||||
f.targetLayers.Spatial = f.currentLayers.Spatial
|
||||
if layer < f.currentLayers.Spatial && layer >= f.targetLayers.Spatial {
|
||||
f.logger.Infow(
|
||||
"downgrading layer",
|
||||
"current", f.currentLayers,
|
||||
"target", f.targetLayers,
|
||||
"max", f.maxLayers,
|
||||
"layer", layer,
|
||||
"req", f.requestLayerSpatial,
|
||||
"maxPublished", f.maxPublishedLayer,
|
||||
"feed", extPkt.Packet.SSRC,
|
||||
)
|
||||
found = true
|
||||
}
|
||||
|
||||
if found {
|
||||
f.currentLayers.Spatial = layer
|
||||
if !f.isTemporalSupported {
|
||||
f.currentLayers.Temporal = f.targetLayers.Temporal
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if locked to higher than max layer due to overshoot, check if it can be dialed back
|
||||
if f.currentLayers.Spatial > f.maxLayers.Spatial {
|
||||
if layer <= f.maxLayers.Spatial && extPkt.KeyFrame {
|
||||
if found {
|
||||
tp.isSwitchingToTargetLayer = true
|
||||
f.clearParkedLayers()
|
||||
if f.currentLayers.Spatial >= f.maxLayers.Spatial {
|
||||
tp.isSwitchingToMaxLayer = true
|
||||
|
||||
f.logger.Infow(
|
||||
"adjusting overshoot",
|
||||
"reached max layer",
|
||||
"current", f.currentLayers,
|
||||
"target", f.targetLayers,
|
||||
"max", f.maxLayers,
|
||||
@@ -1597,14 +1561,40 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
|
||||
"maxPublished", f.maxPublishedLayer,
|
||||
"feed", extPkt.Packet.SSRC,
|
||||
)
|
||||
f.currentLayers.Spatial = layer
|
||||
}
|
||||
|
||||
if f.currentLayers.Spatial >= f.maxLayers.Spatial {
|
||||
tp.isSwitchingToMaxLayer = true
|
||||
if f.currentLayers.Spatial >= f.maxLayers.Spatial || f.currentLayers.Spatial == f.maxPublishedLayer {
|
||||
f.targetLayers.Spatial = f.currentLayers.Spatial
|
||||
if f.ddLayerSelector != nil {
|
||||
f.ddLayerSelector.SelectLayer(f.targetLayers)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if f.currentLayers.Spatial >= f.maxLayers.Spatial || f.currentLayers.Spatial == f.maxPublishedLayer {
|
||||
f.targetLayers.Spatial = layer
|
||||
// if locked to higher than max layer due to overshoot, check if it can be dialed back
|
||||
if f.currentLayers.Spatial > f.maxLayers.Spatial {
|
||||
if layer <= f.maxLayers.Spatial && (extPkt.KeyFrame || tp.isSwitchingToTargetLayer) {
|
||||
f.logger.Infow(
|
||||
"adjusting overshoot",
|
||||
"current", f.currentLayers,
|
||||
"target", f.targetLayers,
|
||||
"max", f.maxLayers,
|
||||
"layer", layer,
|
||||
"req", f.requestLayerSpatial,
|
||||
"maxPublished", f.maxPublishedLayer,
|
||||
"feed", extPkt.Packet.SSRC,
|
||||
)
|
||||
f.currentLayers.Spatial = layer
|
||||
|
||||
if f.currentLayers.Spatial >= f.maxLayers.Spatial {
|
||||
tp.isSwitchingToMaxLayer = true
|
||||
}
|
||||
|
||||
if f.currentLayers.Spatial >= f.maxLayers.Spatial || f.currentLayers.Spatial == f.maxPublishedLayer {
|
||||
f.targetLayers.Spatial = layer
|
||||
if f.ddLayerSelector != nil {
|
||||
f.ddLayerSelector.SelectLayer(f.targetLayers)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+3
-8
@@ -298,11 +298,6 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
|
||||
}
|
||||
|
||||
layer := int32(0)
|
||||
// for svc codecs, use layer full quality instead.
|
||||
// we only have buffer for full quality
|
||||
if w.isSVC {
|
||||
layer = int32(len(w.buffers)) - 1
|
||||
}
|
||||
if w.Kind() == webrtc.RTPCodecTypeVideo && !w.isSVC {
|
||||
layer = buffer.RidToSpatialLayer(track.RID(), w.trackInfo)
|
||||
}
|
||||
@@ -510,10 +505,10 @@ func (w *WebRTCReceiver) getBuffer(layer int32) *buffer.Buffer {
|
||||
}
|
||||
|
||||
func (w *WebRTCReceiver) getBufferLocked(layer int32) *buffer.Buffer {
|
||||
// for svc codecs, use layer full quality instead.
|
||||
// we only have buffer for full quality
|
||||
// for svc codecs, use layer = 0 always.
|
||||
// spatial layers are in-built and handled by single buffer
|
||||
if w.isSVC {
|
||||
layer = int32(len(w.buffers)) - 1
|
||||
layer = 0
|
||||
}
|
||||
|
||||
if int(layer) >= len(w.buffers) {
|
||||
|
||||
Reference in New Issue
Block a user