mirror of
https://github.com/livekit/livekit.git
synced 2026-05-25 14:15:15 +00:00
Refine dependency descriptor based selection forwarder (#1808)
* Don't update dependency info if an out-of-order packet is received * Trace all active SVC chains for downtrack * Try to keep lower decode target decodable * Remove comments * Add test case * Clean up code * Address review comments
This commit is contained in:
@@ -42,7 +42,7 @@ type ExtPacket struct {
|
||||
Payload interface{}
|
||||
KeyFrame bool
|
||||
RawPacket []byte
|
||||
DependencyDescriptor *DependencyDescriptorWithDecodeTarget
|
||||
DependencyDescriptor *ExtDependencyDescriptor
|
||||
}
|
||||
|
||||
// Buffer contains all packets
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/pion/rtp"
|
||||
|
||||
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/utils"
|
||||
|
||||
"github.com/livekit/protocol/logger"
|
||||
)
|
||||
@@ -17,6 +18,11 @@ type DependencyDescriptorParser struct {
|
||||
logger logger.Logger
|
||||
onMaxLayerChanged func(int32, int32)
|
||||
decodeTargets []DependencyDescriptorDecodeTarget
|
||||
|
||||
wrapAround *utils.WrapAround[uint16, uint64]
|
||||
structureExtSeq uint64
|
||||
activeDecodeTargetsExtSeq uint64
|
||||
activeDecodeTargetsMask uint32
|
||||
}
|
||||
|
||||
func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLayerChanged func(int32, int32)) *DependencyDescriptorParser {
|
||||
@@ -25,16 +31,19 @@ func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLay
|
||||
ddExtID: ddExtID,
|
||||
logger: logger,
|
||||
onMaxLayerChanged: onMaxLayerChanged,
|
||||
wrapAround: utils.NewWrapAround[uint16, uint64](),
|
||||
}
|
||||
}
|
||||
|
||||
type DependencyDescriptorWithDecodeTarget struct {
|
||||
Descriptor *dd.DependencyDescriptor
|
||||
DecodeTargets []DependencyDescriptorDecodeTarget
|
||||
type ExtDependencyDescriptor struct {
|
||||
Descriptor *dd.DependencyDescriptor
|
||||
|
||||
DecodeTargets []DependencyDescriptorDecodeTarget
|
||||
StructureUpdated bool
|
||||
ActiveDecodeTargetsUpdated bool
|
||||
}
|
||||
|
||||
func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*DependencyDescriptorWithDecodeTarget, VideoLayer, error) {
|
||||
// DD-TODO: make sure out-of-order RTP packets do not update decode targets
|
||||
func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescriptor, VideoLayer, error) {
|
||||
var videoLayer VideoLayer
|
||||
ddBuf := pkt.GetExtension(r.ddExtID)
|
||||
if ddBuf == nil {
|
||||
@@ -52,49 +61,53 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*DependencyDescript
|
||||
return nil, videoLayer, err
|
||||
}
|
||||
|
||||
extSeq := r.wrapAround.Update(pkt.SequenceNumber).ExtendedVal
|
||||
|
||||
if ddVal.FrameDependencies != nil {
|
||||
videoLayer.Spatial, videoLayer.Temporal = int32(ddVal.FrameDependencies.SpatialId), int32(ddVal.FrameDependencies.TemporalId)
|
||||
}
|
||||
if ddVal.AttachedStructure != nil && !ddVal.FirstPacketInFrame {
|
||||
// r.logger.Debugw("ignoring non-first packet in frame with attached structure")
|
||||
return nil, videoLayer, nil
|
||||
|
||||
extDD := &ExtDependencyDescriptor{
|
||||
Descriptor: &ddVal,
|
||||
}
|
||||
|
||||
if ddVal.AttachedStructure != nil {
|
||||
r.structure = ddVal.AttachedStructure
|
||||
r.decodeTargets = ProcessFrameDependencyStructure(ddVal.AttachedStructure)
|
||||
if len(r.decodeTargets) != 0 {
|
||||
r.logger.Debugw(fmt.Sprintf("update decode targets: %v", r.decodeTargets))
|
||||
r.onMaxLayerChanged(r.decodeTargets[0].Layer.Spatial, r.decodeTargets[0].Layer.Temporal)
|
||||
r.logger.Debugw(fmt.Sprintf("parsed dependency descriptor\n%s", ddVal.String()))
|
||||
if extSeq > r.structureExtSeq {
|
||||
r.structure = ddVal.AttachedStructure
|
||||
r.decodeTargets = ProcessFrameDependencyStructure(ddVal.AttachedStructure)
|
||||
r.structureExtSeq = extSeq
|
||||
extDD.StructureUpdated = true
|
||||
extDD.ActiveDecodeTargetsUpdated = true
|
||||
// The dependency descriptor reader always sets ActiveDecodeTargetsBitmask when a TemplateDependencyStructure is present,
|
||||
// so there is no need to notify a max layer change here.
|
||||
}
|
||||
}
|
||||
|
||||
if ddVal.AttachedStructure != nil && ddVal.FirstPacketInFrame {
|
||||
r.logger.Debugw(fmt.Sprintf("parsed dependency descriptor\n%s", ddVal.String()))
|
||||
}
|
||||
|
||||
if mask := ddVal.ActiveDecodeTargetsBitmask; mask != nil {
|
||||
var maxSpatial, maxTemporal int32
|
||||
for _, dt := range r.decodeTargets {
|
||||
if *mask&(1<<dt.Target) != uint32(dd.DecodeTargetNotPresent) {
|
||||
if maxSpatial < dt.Layer.Spatial {
|
||||
maxSpatial = dt.Layer.Spatial
|
||||
}
|
||||
if maxTemporal < dt.Layer.Temporal {
|
||||
maxTemporal = dt.Layer.Temporal
|
||||
if mask := ddVal.ActiveDecodeTargetsBitmask; mask != nil && extSeq > r.activeDecodeTargetsExtSeq {
|
||||
r.activeDecodeTargetsExtSeq = extSeq
|
||||
if *mask != r.activeDecodeTargetsMask {
|
||||
r.activeDecodeTargetsMask = *mask
|
||||
extDD.ActiveDecodeTargetsUpdated = true
|
||||
var maxSpatial, maxTemporal int32
|
||||
for _, dt := range r.decodeTargets {
|
||||
if *mask&(1<<dt.Target) != uint32(dd.DecodeTargetNotPresent) {
|
||||
if maxSpatial < dt.Layer.Spatial {
|
||||
maxSpatial = dt.Layer.Spatial
|
||||
}
|
||||
if maxTemporal < dt.Layer.Temporal {
|
||||
maxTemporal = dt.Layer.Temporal
|
||||
}
|
||||
}
|
||||
}
|
||||
r.logger.Debugw("max layer changed", "maxSpatial", maxSpatial, "maxTemporal", maxTemporal)
|
||||
r.onMaxLayerChanged(maxSpatial, maxTemporal)
|
||||
}
|
||||
r.logger.Debugw("max layer changed", "maxSpatial", maxSpatial, "maxTemporal", maxTemporal)
|
||||
r.onMaxLayerChanged(maxSpatial, maxTemporal)
|
||||
}
|
||||
|
||||
withDecodeTargets := &DependencyDescriptorWithDecodeTarget{
|
||||
Descriptor: &ddVal,
|
||||
DecodeTargets: r.decodeTargets,
|
||||
}
|
||||
extDD.DecodeTargets = r.decodeTargets
|
||||
|
||||
return withDecodeTargets, videoLayer, nil
|
||||
return extDD, videoLayer, nil
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------
|
||||
@@ -131,22 +144,11 @@ func ProcessFrameDependencyStructure(structure *dd.FrameDependencyStructure) []D
|
||||
|
||||
func GetActiveDecodeTargetBitmask(layer VideoLayer, decodeTargets []DependencyDescriptorDecodeTarget) *uint32 {
|
||||
activeBitMask := uint32(0)
|
||||
var maxSpatial, maxTemporal int32
|
||||
for _, dt := range decodeTargets {
|
||||
if dt.Layer.Spatial > maxSpatial {
|
||||
maxSpatial = dt.Layer.Spatial
|
||||
}
|
||||
if dt.Layer.Temporal > maxTemporal {
|
||||
maxTemporal = dt.Layer.Temporal
|
||||
}
|
||||
if dt.Layer.Spatial <= layer.Spatial && dt.Layer.Temporal <= layer.Temporal {
|
||||
activeBitMask |= 1 << dt.Target
|
||||
}
|
||||
}
|
||||
if layer.Spatial == maxSpatial && layer.Temporal == maxTemporal {
|
||||
// all the decode targets are selected
|
||||
return nil
|
||||
}
|
||||
|
||||
return &activeBitMask
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func (f *testFrameInfo) toVP8() *ExtPacket {
|
||||
func (f *testFrameInfo) toDD() *ExtPacket {
|
||||
return &ExtPacket{
|
||||
Packet: &rtp.Packet{Header: f.header},
|
||||
DependencyDescriptor: &DependencyDescriptorWithDecodeTarget{
|
||||
DependencyDescriptor: &ExtDependencyDescriptor{
|
||||
Descriptor: &dependencydescriptor.DependencyDescriptor{
|
||||
FrameNumber: f.framenumber,
|
||||
FrameDependencies: &dependencydescriptor.FrameDependencyTemplate{
|
||||
|
||||
@@ -1409,9 +1409,7 @@ func (f *Forwarder) CheckSync() (locked bool, layer int32) {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
|
||||
layer = f.vls.GetRequestSpatial()
|
||||
locked = layer == f.vls.GetCurrent().Spatial || f.vls.GetParked().IsValid()
|
||||
return
|
||||
return f.vls.CheckSync()
|
||||
}
|
||||
|
||||
func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [buffer.DefaultMaxLayerSpatial + 1]bool) {
|
||||
|
||||
+8
-8
@@ -645,6 +645,14 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
|
||||
}
|
||||
}
|
||||
|
||||
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
||||
_ = dt.WriteRTP(pkt, spatialLayer)
|
||||
})
|
||||
|
||||
if redPktWriter != nil {
|
||||
redPktWriter(pkt, spatialLayer)
|
||||
}
|
||||
|
||||
if spatialTracker != nil {
|
||||
spatialTracker.Observe(
|
||||
pkt.Temporal,
|
||||
@@ -655,14 +663,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
|
||||
pkt.DependencyDescriptor,
|
||||
)
|
||||
}
|
||||
|
||||
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
||||
_ = dt.WriteRTP(pkt, spatialLayer)
|
||||
})
|
||||
|
||||
if redPktWriter != nil {
|
||||
redPktWriter(pkt, spatialLayer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -52,5 +52,5 @@ type StreamTrackerWorker interface {
|
||||
Status() StreamStatus
|
||||
BitrateTemporalCumulative() []int64
|
||||
SetPaused(paused bool)
|
||||
Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, dd *buffer.DependencyDescriptorWithDecodeTarget)
|
||||
Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, dd *buffer.ExtDependencyDescriptor)
|
||||
}
|
||||
|
||||
@@ -176,7 +176,7 @@ func (s *StreamTracker) Observe(
|
||||
payloadSize int,
|
||||
hasMarker bool,
|
||||
ts uint32,
|
||||
_ *buffer.DependencyDescriptorWithDecodeTarget,
|
||||
_ *buffer.ExtDependencyDescriptor,
|
||||
) {
|
||||
s.lock.Lock()
|
||||
|
||||
|
||||
@@ -123,7 +123,7 @@ func (s *StreamTrackerDependencyDescriptor) SetPaused(paused bool) {
|
||||
|
||||
}
|
||||
|
||||
func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, ddVal *buffer.DependencyDescriptorWithDecodeTarget) {
|
||||
func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, ddVal *buffer.ExtDependencyDescriptor) {
|
||||
s.lock.Lock()
|
||||
|
||||
if s.isStopped || s.paused || payloadSize == 0 || ddVal == nil {
|
||||
@@ -133,7 +133,7 @@ func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize
|
||||
|
||||
var notifyFns []func(status StreamStatus)
|
||||
var notifyStatus StreamStatus
|
||||
if mask := ddVal.Descriptor.ActiveDecodeTargetsBitmask; mask != nil {
|
||||
if mask := ddVal.Descriptor.ActiveDecodeTargetsBitmask; mask != nil && ddVal.ActiveDecodeTargetsUpdated {
|
||||
var maxSpatial, maxTemporal int32
|
||||
for _, dt := range ddVal.DecodeTargets {
|
||||
if *mask&(1<<dt.Target) != uint32(dd.DecodeTargetNotPresent) {
|
||||
@@ -173,7 +173,6 @@ func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize
|
||||
notifyFns = append(notifyFns, s.onStatusChanged[i])
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
dtis := ddVal.Descriptor.FrameDependencies.DecodeTargetIndications
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"github.com/livekit/protocol/logger"
|
||||
)
|
||||
|
||||
func createDescriptorDependencyForTargets(maxSpatial, maxTemporal int) *buffer.DependencyDescriptorWithDecodeTarget {
|
||||
func createDescriptorDependencyForTargets(maxSpatial, maxTemporal int) *buffer.ExtDependencyDescriptor {
|
||||
var targets []buffer.DependencyDescriptorDecodeTarget
|
||||
var mask uint32
|
||||
for i := 0; i <= maxSpatial; i++ {
|
||||
@@ -26,14 +26,15 @@ func createDescriptorDependencyForTargets(maxSpatial, maxTemporal int) *buffer.D
|
||||
dtis[t.Target] = dd.DecodeTargetRequired
|
||||
}
|
||||
|
||||
return &buffer.DependencyDescriptorWithDecodeTarget{
|
||||
return &buffer.ExtDependencyDescriptor{
|
||||
Descriptor: &dd.DependencyDescriptor{
|
||||
ActiveDecodeTargetsBitmask: &mask,
|
||||
FrameDependencies: &dd.FrameDependencyTemplate{
|
||||
DecodeTargetIndications: dtis,
|
||||
},
|
||||
},
|
||||
DecodeTargets: targets,
|
||||
DecodeTargets: targets,
|
||||
ActiveDecodeTargetsUpdated: true,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -73,6 +73,12 @@ func (b *Base) GetRequestSpatial() int32 {
|
||||
return b.requestSpatial
|
||||
}
|
||||
|
||||
func (b *Base) CheckSync() (locked bool, layer int32) {
|
||||
layer = b.GetRequestSpatial()
|
||||
locked = layer == b.GetCurrent().Spatial || b.GetParked().IsValid()
|
||||
return
|
||||
}
|
||||
|
||||
func (b *Base) SetMaxSeen(maxSeenLayer buffer.VideoLayer) {
|
||||
b.maxSeenLayer = maxSeenLayer
|
||||
}
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
package videolayerselector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
|
||||
)
|
||||
|
||||
type DecodeTarget struct {
|
||||
buffer.DependencyDescriptorDecodeTarget
|
||||
chain *FrameChain
|
||||
active bool
|
||||
}
|
||||
|
||||
type FrameDetectionResult struct {
|
||||
TargetValid bool
|
||||
DTI dd.DecodeTargetIndication
|
||||
}
|
||||
|
||||
func NewDecodeTarget(target buffer.DependencyDescriptorDecodeTarget, chain *FrameChain) *DecodeTarget {
|
||||
return &DecodeTarget{
|
||||
DependencyDescriptorDecodeTarget: target,
|
||||
chain: chain,
|
||||
}
|
||||
}
|
||||
|
||||
func (dt *DecodeTarget) Valid() bool {
|
||||
return dt.chain == nil || !dt.chain.Broken()
|
||||
}
|
||||
|
||||
func (dt *DecodeTarget) Active() bool {
|
||||
return dt.active
|
||||
}
|
||||
|
||||
func (dt *DecodeTarget) UpdateActive(activeBitmask uint32) {
|
||||
active := (activeBitmask & (1 << dt.Target)) != 0
|
||||
dt.active = active
|
||||
if dt.chain != nil {
|
||||
dt.chain.UpdateActive(active)
|
||||
}
|
||||
}
|
||||
|
||||
func (dt *DecodeTarget) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate) (FrameDetectionResult, error) {
|
||||
result := FrameDetectionResult{}
|
||||
if len(fd.DecodeTargetIndications) <= dt.Target {
|
||||
return result, fmt.Errorf("mismatch target %d and len(DecodeTargetIndications) %d", dt.Target, len(fd.DecodeTargetIndications))
|
||||
}
|
||||
|
||||
result.DTI = fd.DecodeTargetIndications[dt.Target]
|
||||
// The encoder can choose not to use frame chain in theory, and we need to trace every required frame is decodable in this case.
|
||||
// But we don't observe this in browser and it makes no sense to not use the chain with svc, so only use chain to detect decode target broken now,
|
||||
// and always return decodable if it is not protect by chain.
|
||||
result.TargetValid = dt.Valid()
|
||||
return result, nil
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package videolayerselector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
dede "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
|
||||
@@ -15,16 +16,20 @@ type DependencyDescriptor struct {
|
||||
frameNum *utils.WrapAround[uint16, uint64]
|
||||
decisions *SelectorDecisionCache
|
||||
|
||||
needsDecodeTargetBitmask bool
|
||||
activeDecodeTargetsBitmask *uint32
|
||||
structure *dede.FrameDependencyStructure
|
||||
|
||||
chains []*FrameChain
|
||||
|
||||
decodeTargetsLock sync.RWMutex
|
||||
decodeTargets []*DecodeTarget
|
||||
}
|
||||
|
||||
func NewDependencyDescriptor(logger logger.Logger) *DependencyDescriptor {
|
||||
return &DependencyDescriptor{
|
||||
Base: NewBase(logger),
|
||||
frameNum: utils.NewWrapAround[uint16, uint64](),
|
||||
decisions: NewSelectorDecisionCache(256),
|
||||
decisions: NewSelectorDecisionCache(256, 80),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,7 +37,7 @@ func NewDependencyDescriptorFromNull(vls VideoLayerSelector) *DependencyDescript
|
||||
return &DependencyDescriptor{
|
||||
Base: vls.(*Null).Base,
|
||||
frameNum: utils.NewWrapAround[uint16, uint64](),
|
||||
decisions: NewSelectorDecisionCache(256),
|
||||
decisions: NewSelectorDecisionCache(256, 80),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,11 +73,6 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
|
||||
return
|
||||
}
|
||||
switch sd {
|
||||
case selectorDecisionForwarded:
|
||||
// a packet of an already forwarded frame, maintain decision
|
||||
result.RTPMarker = extPkt.Packet.Header.Marker || (dd.LastPacketInFrame && d.currentLayer.Spatial == int32(fd.SpatialId))
|
||||
result.IsSelected = true
|
||||
|
||||
case selectorDecisionDropped:
|
||||
// a packet of an already dropped frame, maintain decision
|
||||
return
|
||||
@@ -83,6 +83,86 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
|
||||
return
|
||||
}
|
||||
|
||||
if ddwdt.StructureUpdated {
|
||||
d.updateDependencyStructure(dd.AttachedStructure, ddwdt.DecodeTargets)
|
||||
}
|
||||
|
||||
if ddwdt.ActiveDecodeTargetsUpdated {
|
||||
d.updateActiveDecodeTargets(*dd.ActiveDecodeTargetsBitmask)
|
||||
}
|
||||
|
||||
for _, chain := range d.chains {
|
||||
chain.OnFrame(extFrameNum, fd)
|
||||
}
|
||||
|
||||
// find decode target closest to targetLayer
|
||||
highestDecodeTarget := buffer.DependencyDescriptorDecodeTarget{
|
||||
Target: -1,
|
||||
Layer: buffer.InvalidLayer,
|
||||
}
|
||||
var dti dede.DecodeTargetIndication
|
||||
d.decodeTargetsLock.RLock()
|
||||
for _, dt := range d.decodeTargets {
|
||||
if !dt.Active() || dt.Layer.Spatial > d.targetLayer.Spatial || dt.Layer.Temporal > d.targetLayer.Temporal {
|
||||
continue
|
||||
}
|
||||
|
||||
frameResult, err := dt.OnFrame(extFrameNum, fd)
|
||||
if err != nil {
|
||||
d.decodeTargetsLock.RUnlock()
|
||||
// dtis error, dependency descriptor might be lost
|
||||
d.logger.Debugw(fmt.Sprintf("drop packet for frame detection error, incoming: %v",
|
||||
incomingLayer,
|
||||
), "err", err)
|
||||
d.decisions.AddDropped(extFrameNum)
|
||||
return
|
||||
}
|
||||
|
||||
// Keep forwarding the lower spatial with temporal layer 0 to keep the lower frame chain intact,
|
||||
// it will cost a few extra bits as those frames might not be present in the current target
|
||||
// but will make the subscriber switch to lower layer seamlessly without pli.
|
||||
if frameResult.TargetValid {
|
||||
if highestDecodeTarget.Target == -1 {
|
||||
highestDecodeTarget = dt.DependencyDescriptorDecodeTarget
|
||||
dti = frameResult.DTI
|
||||
} else if dt.Layer.Spatial < highestDecodeTarget.Layer.Spatial && dt.Layer.Temporal == 0 &&
|
||||
frameResult.DTI != dede.DecodeTargetNotPresent && frameResult.DTI != dede.DecodeTargetDiscardable {
|
||||
dti = frameResult.DTI
|
||||
}
|
||||
}
|
||||
}
|
||||
d.decodeTargetsLock.RUnlock()
|
||||
|
||||
// DD-TODO : we don't have a rtp queue to ensure the order of packets now,
|
||||
// so we don't know packet is lost/out of order, that cause us can't detect
|
||||
// frame integrity, whether the entire frame is forwarded, or whether the frame chain is broken.
|
||||
// So use a simple check here, assume all the reference frame is forwarded and
|
||||
// only check DTI of the active decode target.
|
||||
// This is not efficient; eventually we need to check frame chain integrity.
|
||||
|
||||
if highestDecodeTarget.Target < 0 {
|
||||
// no active decode target, do not select
|
||||
// d.logger.Debugw(fmt.Sprintf("drop packet for no target found, decodeTargets %v, tagetLayer %v, incoming %v",
|
||||
// d.decodeTargets,
|
||||
// d.targetLayer,
|
||||
// incomingLayer,
|
||||
// ))
|
||||
d.decisions.AddDropped(extFrameNum)
|
||||
return
|
||||
}
|
||||
|
||||
// // DD-TODO : if bandwidth in congest, could drop the 'Discardable' frame
|
||||
if dti == dede.DecodeTargetNotPresent {
|
||||
// d.logger.Debugw(fmt.Sprintf("drop packet for decode target not present, highestDecodeTarget %d, incoming %v, fn: %d/%d",
|
||||
// highestDecodeTarget,
|
||||
// incomingLayer,
|
||||
// dd.FrameNumber,
|
||||
// extFrameNum,
|
||||
// ))
|
||||
d.decisions.AddDropped(extFrameNum)
|
||||
return
|
||||
}
|
||||
|
||||
// check decodability using reference frames
|
||||
isDecodable := true
|
||||
for _, fdiff := range fd.FrameDiffs {
|
||||
@@ -90,122 +170,14 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
|
||||
continue
|
||||
}
|
||||
|
||||
if sd, _ := d.decisions.GetDecision(extFrameNum - uint64(fdiff)); sd != selectorDecisionForwarded {
|
||||
// use relaxed check for frame diff that we have chain intact detection and don't want
|
||||
// to drop packet due to out-of-order packet or recoverable packet loss
|
||||
if sd, _ := d.decisions.GetDecision(extFrameNum - uint64(fdiff)); sd == selectorDecisionDropped {
|
||||
isDecodable = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !isDecodable {
|
||||
// DD-TODO START
|
||||
// Not decodable could happen due to packet loss or out-of-order packets,
|
||||
// Need to figure out better ways to handle this.
|
||||
//
|
||||
// 1. Should definitely check if this frame is not part of current decode target OR discardable.
|
||||
// In that case, forwarding can proceed without disruption.
|
||||
// 2. Add a packet queue and try to de-jitter for some time. Safest is to packet copy to local queue on
|
||||
// all down tracks.
|
||||
// 3. Force a PLI and wait for a key frame.
|
||||
// DD-TODO END
|
||||
d.decisions.AddDropped(extFrameNum)
|
||||
return
|
||||
}
|
||||
|
||||
// DD-TODO should not update for out-of-order RTP packets
|
||||
if dd.AttachedStructure != nil {
|
||||
// update decode target layer and active decode targets
|
||||
// DD-TODO : these targets info can be shared by all the downtracks, no need calculate in every selector
|
||||
d.updateDependencyStructure(dd.AttachedStructure)
|
||||
}
|
||||
|
||||
// DD-TODO : we don't have a rtp queue to ensure the order of packets now,
|
||||
// so we don't know packet is lost/out of order, that cause us can't detect
|
||||
// frame integrity, entire frame is forwareded, whether frame chain is broken.
|
||||
// So use a simple check here, assume all the reference frame is forwarded and
|
||||
// only check DTI of the active decode target.
|
||||
// it is not effeciency, at last we need check frame chain integrity.
|
||||
|
||||
activeDecodeTargets := dd.ActiveDecodeTargetsBitmask
|
||||
if activeDecodeTargets != nil {
|
||||
d.logger.Debugw("active decode targets", "activeDecodeTargets", *activeDecodeTargets)
|
||||
}
|
||||
|
||||
// find decode target closest to targetLayer
|
||||
highestDecodeTarget := buffer.DependencyDescriptorDecodeTarget{
|
||||
Target: -1,
|
||||
Layer: buffer.InvalidLayer,
|
||||
}
|
||||
for _, dt := range ddwdt.DecodeTargets {
|
||||
if dt.Layer.Spatial > d.targetLayer.Spatial || dt.Layer.Temporal > d.targetLayer.Temporal {
|
||||
continue
|
||||
}
|
||||
|
||||
if activeDecodeTargets != nil && ((*activeDecodeTargets)&(1<<dt.Target) == 0) {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(d.structure.DecodeTargetProtectedByChain) == 0 {
|
||||
highestDecodeTarget = dt
|
||||
//d.logger.Debugw("select target", "highestDecodeTarget", highestDecodeTarget, "dtis", fd.DecodeTargetIndications)
|
||||
break
|
||||
}
|
||||
|
||||
if len(d.structure.DecodeTargetProtectedByChain) < dt.Target {
|
||||
// look for lower target
|
||||
continue
|
||||
}
|
||||
|
||||
chainIdx := d.structure.DecodeTargetProtectedByChain[dt.Target]
|
||||
if len(fd.ChainDiffs) < chainIdx {
|
||||
// look for lower target
|
||||
continue
|
||||
}
|
||||
|
||||
prevFrameInChain := extFrameNum - uint64(fd.ChainDiffs[chainIdx])
|
||||
if prevFrameInChain != 0 && prevFrameInChain != extFrameNum {
|
||||
if sd, err := d.decisions.GetDecision(prevFrameInChain); err != nil || sd != selectorDecisionForwarded {
|
||||
// look for lower target
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
highestDecodeTarget = dt
|
||||
//d.logger.Debugw("select target", "highestDecodeTarget", highestDecodeTarget, "dtis", fd.DecodeTargetIndications)
|
||||
break
|
||||
}
|
||||
|
||||
if highestDecodeTarget.Target < 0 {
|
||||
// no active decode target, do not select
|
||||
//d.logger.Debugw(fmt.Sprintf("drop packet for no target found, decodeTargets %v, tagetLayer %v, incoming %v",
|
||||
//d.decodeTargets,
|
||||
//d.targetLayer,
|
||||
//incomingLayer,
|
||||
//))
|
||||
d.decisions.AddDropped(extFrameNum)
|
||||
return
|
||||
}
|
||||
|
||||
dtis := fd.DecodeTargetIndications
|
||||
if len(dtis) < highestDecodeTarget.Target {
|
||||
// dtis error, dependency descriptor might lost
|
||||
d.logger.Debugw(fmt.Sprintf("drop packet for dtis error, dtis %v, highestDecodeTarget %+v, incoming: %v",
|
||||
dtis,
|
||||
highestDecodeTarget,
|
||||
incomingLayer,
|
||||
))
|
||||
d.decisions.AddDropped(extFrameNum)
|
||||
return
|
||||
}
|
||||
|
||||
// DD-TODO : if bandwidth in congest, could drop the 'Discardable' packet
|
||||
dti := dtis[highestDecodeTarget.Target]
|
||||
if dti == dede.DecodeTargetNotPresent {
|
||||
//d.logger.Debugw(fmt.Sprintf("drop packet for decode target not present, dtis %v, highestDecodeTarget %d, incoming %v, fn: %d/%d",
|
||||
//dtis,
|
||||
//highestDecodeTarget,
|
||||
//incomingLayer,
|
||||
//dd.FrameNumber,
|
||||
//extFrameNum,
|
||||
//))
|
||||
d.decisions.AddDropped(extFrameNum)
|
||||
return
|
||||
}
|
||||
@@ -225,6 +197,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
|
||||
)
|
||||
}
|
||||
d.currentLayer = highestDecodeTarget.Layer
|
||||
d.activeDecodeTargetsBitmask = buffer.GetActiveDecodeTargetBitmask(d.currentLayer, ddwdt.DecodeTargets)
|
||||
if d.currentLayer.Spatial == d.requestSpatial {
|
||||
result.IsSwitchingToRequestSpatial = true
|
||||
}
|
||||
@@ -249,15 +222,9 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
|
||||
Structure: d.structure,
|
||||
}
|
||||
if dd.AttachedStructure == nil {
|
||||
if d.needsDecodeTargetBitmask {
|
||||
d.needsDecodeTargetBitmask = false
|
||||
|
||||
d.activeDecodeTargetsBitmask = buffer.GetActiveDecodeTargetBitmask(d.targetLayer, ddwdt.DecodeTargets)
|
||||
d.logger.Debugw("setting decode target bitmask", "activeDecodeTargetsBitmask", d.activeDecodeTargetsBitmask)
|
||||
}
|
||||
|
||||
if d.activeDecodeTargetsBitmask != nil {
|
||||
// clone and override activebitmask
|
||||
// DD-TODO: if the packet that contains the bitmask is acknowledged by RR, then we don't need it until it changed.
|
||||
ddClone := *ddExtension.Descriptor
|
||||
ddClone.ActiveDecodeTargetsBitmask = d.activeDecodeTargetsBitmask
|
||||
ddExtension.Descriptor = &ddClone
|
||||
@@ -294,10 +261,65 @@ func (d *DependencyDescriptor) SetTarget(targetLayer buffer.VideoLayer) {
|
||||
}
|
||||
|
||||
d.Base.SetTarget(targetLayer)
|
||||
|
||||
d.needsDecodeTargetBitmask = true
|
||||
}
|
||||
|
||||
func (d *DependencyDescriptor) updateDependencyStructure(structure *dede.FrameDependencyStructure) {
|
||||
func (d *DependencyDescriptor) updateDependencyStructure(structure *dede.FrameDependencyStructure, decodeTargets []buffer.DependencyDescriptorDecodeTarget) {
|
||||
d.structure = structure
|
||||
|
||||
d.chains = d.chains[:0]
|
||||
|
||||
for chainIdx := 0; chainIdx < structure.NumChains; chainIdx++ {
|
||||
d.chains = append(d.chains, NewFrameChain(d.decisions, chainIdx, d.logger))
|
||||
}
|
||||
|
||||
newTargets := make([]*DecodeTarget, 0, len(decodeTargets))
|
||||
for _, dt := range decodeTargets {
|
||||
var chain *FrameChain
|
||||
// When chain_cnt > 0, each Decode target MUST be protected by exactly one Chain.
|
||||
if structure.NumChains > 0 {
|
||||
chainIdx := structure.DecodeTargetProtectedByChain[dt.Target]
|
||||
if chainIdx >= len(d.chains) {
|
||||
// should not happen
|
||||
d.logger.Errorw("DecodeTargetProtectedByChain chainIdx out of range", nil, "chainIdx", chainIdx, "NumChains", len(d.chains))
|
||||
} else {
|
||||
chain = d.chains[chainIdx]
|
||||
}
|
||||
}
|
||||
newTargets = append(newTargets, NewDecodeTarget(dt, chain))
|
||||
}
|
||||
d.decodeTargetsLock.Lock()
|
||||
d.decodeTargets = newTargets
|
||||
d.decodeTargetsLock.Unlock()
|
||||
}
|
||||
|
||||
func (d *DependencyDescriptor) updateActiveDecodeTargets(activeDecodeTargetsBitmask uint32) {
|
||||
for _, chain := range d.chains {
|
||||
chain.BeginUpdateActive()
|
||||
}
|
||||
|
||||
d.decodeTargetsLock.RLock()
|
||||
for _, dt := range d.decodeTargets {
|
||||
dt.UpdateActive(activeDecodeTargetsBitmask)
|
||||
}
|
||||
d.decodeTargetsLock.RUnlock()
|
||||
|
||||
for _, chain := range d.chains {
|
||||
chain.EndUpdateActive()
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DependencyDescriptor) CheckSync() (locked bool, layer int32) {
|
||||
layer = d.GetRequestSpatial()
|
||||
if d.GetParked().IsValid() {
|
||||
return true, layer
|
||||
}
|
||||
|
||||
d.decodeTargetsLock.RLock()
|
||||
defer d.decodeTargetsLock.RUnlock()
|
||||
for _, dt := range d.decodeTargets {
|
||||
if dt.Active() && dt.Layer.Spatial == layer && dt.Valid() {
|
||||
return true, layer
|
||||
}
|
||||
}
|
||||
return false, layer
|
||||
}
|
||||
|
||||
@@ -0,0 +1,377 @@
|
||||
package videolayerselector
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDecodeTarget(t *testing.T) {
|
||||
target := buffer.DependencyDescriptorDecodeTarget{
|
||||
Target: 1,
|
||||
Layer: buffer.VideoLayer{Spatial: 1, Temporal: 2},
|
||||
}
|
||||
|
||||
t.Run("No Chain", func(t *testing.T) {
|
||||
dt := NewDecodeTarget(target, nil)
|
||||
require.True(t, dt.Valid())
|
||||
// no indication found
|
||||
_, err := dt.OnFrame(1, &dd.FrameDependencyTemplate{
|
||||
DecodeTargetIndications: []dd.DecodeTargetIndication{},
|
||||
})
|
||||
require.Error(t, err)
|
||||
|
||||
ret, err := dt.OnFrame(1, &dd.FrameDependencyTemplate{
|
||||
DecodeTargetIndications: []dd.DecodeTargetIndication{dd.DecodeTargetNotPresent, dd.DecodeTargetRequired},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.True(t, ret.TargetValid)
|
||||
require.Equal(t, dd.DecodeTargetRequired, ret.DTI)
|
||||
})
|
||||
|
||||
t.Run("With Chain", func(t *testing.T) {
|
||||
decisions := NewSelectorDecisionCache(256, 80)
|
||||
chain := NewFrameChain(decisions, 1, logger.GetLogger())
|
||||
dt := NewDecodeTarget(target, chain)
|
||||
chain.BeginUpdateActive()
|
||||
dt.UpdateActive(1 << dt.Target)
|
||||
chain.EndUpdateActive()
|
||||
require.True(t, dt.Active())
|
||||
require.False(t, dt.Valid())
|
||||
|
||||
// chain intact
|
||||
frame := &dd.FrameDependencyTemplate{
|
||||
DecodeTargetIndications: []dd.DecodeTargetIndication{dd.DecodeTargetNotPresent, dd.DecodeTargetRequired},
|
||||
ChainDiffs: []int{0, 0},
|
||||
}
|
||||
chain.OnFrame(1, frame)
|
||||
require.True(t, dt.Valid())
|
||||
ret, err := dt.OnFrame(1, frame)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ret.TargetValid)
|
||||
require.Equal(t, dd.DecodeTargetRequired, ret.DTI)
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
func TestFrameChain(t *testing.T) {
|
||||
decisions := NewSelectorDecisionCache(256, 3)
|
||||
chain := NewFrameChain(decisions, 0, logger.GetLogger())
|
||||
require.True(t, chain.Broken())
|
||||
|
||||
// chain intact
|
||||
frameNoDiff := &dd.FrameDependencyTemplate{
|
||||
ChainDiffs: []int{0},
|
||||
}
|
||||
// not active
|
||||
require.False(t, chain.OnFrame(1, frameNoDiff))
|
||||
|
||||
chain.BeginUpdateActive()
|
||||
chain.UpdateActive(true)
|
||||
chain.EndUpdateActive()
|
||||
|
||||
require.True(t, chain.OnFrame(1, frameNoDiff))
|
||||
decisions.AddForwarded(1)
|
||||
|
||||
frameDiff1 := &dd.FrameDependencyTemplate{
|
||||
ChainDiffs: []int{1},
|
||||
}
|
||||
|
||||
require.True(t, chain.OnFrame(2, frameDiff1))
|
||||
decisions.AddForwarded(2)
|
||||
|
||||
// frame 5 arrives first , but frame 4 can be recovered by NACK
|
||||
require.True(t, chain.OnFrame(5, frameDiff1))
|
||||
decisions.AddForwarded(5)
|
||||
|
||||
// frame 4 arrives, chain remains intact
|
||||
require.True(t, chain.OnFrame(4, frameDiff1))
|
||||
decisions.AddForwarded(4)
|
||||
|
||||
// frame 3 missed by out of nack range, chain broken
|
||||
decisions.AddForwarded(7)
|
||||
require.True(t, chain.Broken())
|
||||
|
||||
// recovery by non-diff frame
|
||||
require.True(t, chain.OnFrame(1000, frameNoDiff))
|
||||
require.False(t, chain.Broken())
|
||||
decisions.AddForwarded(1000)
|
||||
|
||||
// broken by dropped frame
|
||||
require.True(t, chain.OnFrame(1002, frameDiff1))
|
||||
decisions.AddDropped(1001)
|
||||
require.True(t, chain.Broken())
|
||||
|
||||
// recovery by non-diff frame
|
||||
require.True(t, chain.OnFrame(2000, frameNoDiff))
|
||||
decisions.AddForwarded(2000)
|
||||
decisions.AddDropped(2001)
|
||||
require.False(t, chain.OnFrame(2002, frameDiff1))
|
||||
require.True(t, chain.Broken())
|
||||
}
|
||||
|
||||
func TestDependencyDescriptor(t *testing.T) {
|
||||
ddSelector := NewDependencyDescriptor(logger.GetLogger())
|
||||
targetLayer := buffer.VideoLayer{Spatial: 1, Temporal: 2}
|
||||
ddSelector.SetTarget(targetLayer)
|
||||
ddSelector.SetRequestSpatial(1)
|
||||
|
||||
// no dd ext, dropped
|
||||
ret := ddSelector.Select(&buffer.ExtPacket{}, 0)
|
||||
require.False(t, ret.IsSelected)
|
||||
require.False(t, ret.IsRelevant)
|
||||
|
||||
// non key frame, dropped
|
||||
ret = ddSelector.Select(&buffer.ExtPacket{
|
||||
KeyFrame: false,
|
||||
DependencyDescriptor: &buffer.ExtDependencyDescriptor{
|
||||
Descriptor: &dd.DependencyDescriptor{
|
||||
FrameNumber: 1,
|
||||
FrameDependencies: &dd.FrameDependencyTemplate{
|
||||
SpatialId: int(targetLayer.Spatial),
|
||||
TemporalId: int(targetLayer.Temporal),
|
||||
},
|
||||
},
|
||||
},
|
||||
}, 0)
|
||||
require.False(t, ret.IsSelected)
|
||||
require.True(t, ret.IsRelevant)
|
||||
|
||||
frames := createDDFrames(buffer.VideoLayer{Spatial: 2, Temporal: 2}, 3)
|
||||
// key frame, update structure and decode targets
|
||||
ret = ddSelector.Select(frames[0], 0)
|
||||
require.True(t, ret.IsSelected)
|
||||
require.Equal(t, ddSelector.GetCurrent(), ddSelector.GetTarget())
|
||||
sync, _ := ddSelector.CheckSync()
|
||||
require.True(t, sync)
|
||||
|
||||
// forward frame belongs to target layer
|
||||
// drop frame exceeds target layer (not present in target layer or lower layer)
|
||||
// forward frame not present in target layer but present in lower layer
|
||||
var (
|
||||
belongTargetCase bool
|
||||
exceedTargetCase bool
|
||||
lowerTargetCase bool
|
||||
)
|
||||
idx := 1
|
||||
var frameForwarded, frameDropped []*buffer.ExtPacket
|
||||
for ; idx < len(frames); idx++ {
|
||||
fd := frames[idx].DependencyDescriptor.Descriptor.FrameDependencies
|
||||
ret = ddSelector.Select(frames[idx], 0)
|
||||
switch {
|
||||
case fd.SpatialId == int(targetLayer.Spatial) && fd.TemporalId == int(targetLayer.Temporal):
|
||||
require.True(t, ret.IsSelected)
|
||||
belongTargetCase = true
|
||||
frameForwarded = append(frameForwarded, frames[idx])
|
||||
case fd.SpatialId < int(targetLayer.Spatial) && fd.TemporalId == 0:
|
||||
require.True(t, ret.IsSelected)
|
||||
lowerTargetCase = true
|
||||
frameForwarded = append(frameForwarded, frames[idx])
|
||||
case fd.SpatialId > int(targetLayer.Spatial) || fd.TemporalId > int(targetLayer.Temporal):
|
||||
require.False(t, ret.IsSelected)
|
||||
exceedTargetCase = true
|
||||
frameDropped = append(frameDropped, frames[idx])
|
||||
}
|
||||
|
||||
if belongTargetCase && exceedTargetCase && lowerTargetCase {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
require.True(t, belongTargetCase && exceedTargetCase && lowerTargetCase)
|
||||
|
||||
// select frame already forwarded
|
||||
ret = ddSelector.Select(frameForwarded[0], 0)
|
||||
require.True(t, ret.IsSelected)
|
||||
|
||||
// drop frame already dropped
|
||||
ret = ddSelector.Select(frameDropped[0], 0)
|
||||
require.False(t, ret.IsSelected)
|
||||
|
||||
// drop frame present but not decodable (dependency frame missed)
|
||||
idx++
|
||||
for ; idx < len(frames); idx++ {
|
||||
fd := frames[idx].DependencyDescriptor.Descriptor.FrameDependencies
|
||||
ret = ddSelector.Select(frames[idx], 0)
|
||||
if fd.SpatialId == int(targetLayer.Spatial) && fd.TemporalId == int(targetLayer.Temporal) {
|
||||
break
|
||||
}
|
||||
}
|
||||
notDecodableFrame := frames[idx]
|
||||
notDecodableFrame.DependencyDescriptor.Descriptor.FrameDependencies.FrameDiffs = []int{
|
||||
int(notDecodableFrame.DependencyDescriptor.Descriptor.FrameNumber - frameDropped[0].DependencyDescriptor.Descriptor.FrameNumber),
|
||||
}
|
||||
ret = ddSelector.Select(notDecodableFrame, 0)
|
||||
require.False(t, ret.IsSelected)
|
||||
|
||||
// target layer broken
|
||||
idx++
|
||||
for ; idx < len(frames); idx++ {
|
||||
fd := frames[idx].DependencyDescriptor.Descriptor.FrameDependencies
|
||||
ret = ddSelector.Select(frames[idx], 0)
|
||||
if fd.SpatialId == int(targetLayer.Spatial) && fd.TemporalId == int(targetLayer.Temporal) {
|
||||
break
|
||||
}
|
||||
}
|
||||
brokenFrame := frames[idx]
|
||||
brokenFrame.DependencyDescriptor.Descriptor.FrameDependencies.ChainDiffs[targetLayer.Spatial] =
|
||||
int(notDecodableFrame.DependencyDescriptor.Descriptor.FrameNumber - frameDropped[0].DependencyDescriptor.Descriptor.FrameNumber)
|
||||
ret = ddSelector.Select(brokenFrame, 0)
|
||||
require.False(t, ret.IsSelected)
|
||||
|
||||
// switch to lower layer, forward frame
|
||||
idx++
|
||||
var switchToLower bool
|
||||
for ; idx < len(frames); idx++ {
|
||||
ret = ddSelector.Select(frames[idx], 0)
|
||||
if ret.IsSelected {
|
||||
require.True(t, targetLayer.GreaterThan(ddSelector.GetCurrent()))
|
||||
switchToLower = true
|
||||
break
|
||||
}
|
||||
}
|
||||
require.True(t, switchToLower)
|
||||
|
||||
// not sync with requested layer
|
||||
ddSelector.SetRequestSpatial(targetLayer.Spatial)
|
||||
locked, layer := ddSelector.CheckSync()
|
||||
require.False(t, locked)
|
||||
require.Equal(t, targetLayer.Spatial, layer)
|
||||
|
||||
// request to current layer, sync
|
||||
ddSelector.SetRequestSpatial(ddSelector.GetCurrent().Spatial)
|
||||
locked, _ = ddSelector.CheckSync()
|
||||
require.True(t, locked)
|
||||
}
|
||||
|
||||
func createDDFrames(maxLayer buffer.VideoLayer, startFrameNumber uint16) []*buffer.ExtPacket {
|
||||
var frames []*buffer.ExtPacket
|
||||
var activeBitMask uint32
|
||||
var decodeTargets []buffer.DependencyDescriptorDecodeTarget
|
||||
var decodeTargetsProtectByChain []int
|
||||
for i := 0; i <= int(maxLayer.Spatial); i++ {
|
||||
for j := 0; j <= int(maxLayer.Temporal); j++ {
|
||||
decodeTargets = append(decodeTargets, buffer.DependencyDescriptorDecodeTarget{
|
||||
Target: i*int(maxLayer.Temporal+1) + j,
|
||||
Layer: buffer.VideoLayer{Spatial: int32(i), Temporal: int32(j)},
|
||||
})
|
||||
decodeTargetsProtectByChain = append(decodeTargetsProtectByChain, i)
|
||||
activeBitMask |= 1 << uint(i*int(maxLayer.Temporal+1)+j)
|
||||
}
|
||||
}
|
||||
sort.Slice(decodeTargets, func(i, j int) bool {
|
||||
return decodeTargets[i].Layer.GreaterThan(decodeTargets[j].Layer)
|
||||
})
|
||||
|
||||
chainDiffs := make([]int, len(decodeTargets))
|
||||
dtis := make([]dd.DecodeTargetIndication, len(decodeTargets))
|
||||
for _, dt := range decodeTargets {
|
||||
dtis[dt.Target] = dd.DecodeTargetSwitch
|
||||
}
|
||||
|
||||
templates := make([]*dd.FrameDependencyTemplate, len(decodeTargets))
|
||||
|
||||
for _, dt := range decodeTargets {
|
||||
templates[dt.Target] = &dd.FrameDependencyTemplate{
|
||||
SpatialId: int(dt.Layer.Spatial),
|
||||
TemporalId: int(dt.Layer.Temporal),
|
||||
ChainDiffs: chainDiffs,
|
||||
DecodeTargetIndications: dtis,
|
||||
}
|
||||
}
|
||||
keyFrame := &buffer.ExtPacket{
|
||||
KeyFrame: true,
|
||||
DependencyDescriptor: &buffer.ExtDependencyDescriptor{
|
||||
Descriptor: &dd.DependencyDescriptor{
|
||||
FrameNumber: startFrameNumber,
|
||||
FrameDependencies: &dd.FrameDependencyTemplate{
|
||||
SpatialId: 0,
|
||||
TemporalId: 0,
|
||||
ChainDiffs: chainDiffs,
|
||||
DecodeTargetIndications: dtis,
|
||||
},
|
||||
AttachedStructure: &dd.FrameDependencyStructure{
|
||||
NumDecodeTargets: int((maxLayer.Spatial + 1) * (maxLayer.Temporal + 1)),
|
||||
NumChains: int(maxLayer.Spatial) + 1,
|
||||
DecodeTargetProtectedByChain: decodeTargetsProtectByChain,
|
||||
Templates: templates,
|
||||
},
|
||||
ActiveDecodeTargetsBitmask: &activeBitMask,
|
||||
},
|
||||
DecodeTargets: decodeTargets,
|
||||
StructureUpdated: true,
|
||||
ActiveDecodeTargetsUpdated: true,
|
||||
},
|
||||
Packet: &rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
SSRC: 1234,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
frames = append(frames, keyFrame)
|
||||
|
||||
chainPrevFrame := make(map[int]int)
|
||||
for i := 0; i <= int(maxLayer.Spatial); i++ {
|
||||
chainPrevFrame[i] = int(startFrameNumber)
|
||||
}
|
||||
startFrameNumber++
|
||||
for i := 0; i < 10; i++ {
|
||||
for j := len(decodeTargets) - 1; j >= 0; j-- {
|
||||
dt := decodeTargets[j]
|
||||
frameChainDiffs := make([]int, len(chainDiffs))
|
||||
for i := range frameChainDiffs {
|
||||
frameChainDiffs[i] = int(startFrameNumber) - chainPrevFrame[i]
|
||||
}
|
||||
|
||||
frameDtis := make([]dd.DecodeTargetIndication, len(dtis))
|
||||
for k := range frameDtis {
|
||||
if k >= dt.Target {
|
||||
if dt.Layer.Temporal == 0 {
|
||||
frameDtis[k] = dd.DecodeTargetRequired
|
||||
} else {
|
||||
frameDtis[k] = dd.DecodeTargetDiscardable
|
||||
}
|
||||
} else {
|
||||
frameDtis[k] = dd.DecodeTargetNotPresent
|
||||
}
|
||||
}
|
||||
|
||||
frame := &buffer.ExtPacket{
|
||||
KeyFrame: true,
|
||||
DependencyDescriptor: &buffer.ExtDependencyDescriptor{
|
||||
Descriptor: &dd.DependencyDescriptor{
|
||||
FrameNumber: startFrameNumber,
|
||||
FrameDependencies: &dd.FrameDependencyTemplate{
|
||||
SpatialId: int(dt.Layer.Spatial),
|
||||
TemporalId: int(dt.Layer.Temporal),
|
||||
ChainDiffs: frameChainDiffs,
|
||||
DecodeTargetIndications: frameDtis,
|
||||
},
|
||||
},
|
||||
DecodeTargets: decodeTargets,
|
||||
},
|
||||
Packet: &rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
SSRC: 1234,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
startFrameNumber++
|
||||
|
||||
if dt.Layer.Temporal == 0 {
|
||||
chainPrevFrame[int(dt.Layer.Spatial)] = int(startFrameNumber)
|
||||
}
|
||||
|
||||
frames = append(frames, frame)
|
||||
}
|
||||
}
|
||||
|
||||
return frames
|
||||
}
|
||||
@@ -0,0 +1,114 @@
|
||||
package videolayerselector
|
||||
|
||||
import (
|
||||
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
|
||||
"github.com/livekit/protocol/logger"
|
||||
)
|
||||
|
||||
type FrameChain struct {
|
||||
logger logger.Logger
|
||||
decisions *SelectorDecisionCache
|
||||
broken bool
|
||||
chainIdx int
|
||||
active bool
|
||||
updatingActive bool
|
||||
|
||||
expectFrames []uint64
|
||||
}
|
||||
|
||||
func NewFrameChain(decisions *SelectorDecisionCache, chainIdx int, logger logger.Logger) *FrameChain {
|
||||
return &FrameChain{
|
||||
logger: logger,
|
||||
decisions: decisions,
|
||||
broken: true,
|
||||
chainIdx: chainIdx,
|
||||
active: false,
|
||||
}
|
||||
}
|
||||
|
||||
func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate) bool {
|
||||
if !fc.active {
|
||||
return false
|
||||
}
|
||||
|
||||
// A decodable frame with frame_chain_fdiff equal to 0 indicates that the Chain is intact.
|
||||
if fd.ChainDiffs[fc.chainIdx] == 0 {
|
||||
if fc.broken {
|
||||
fc.broken = false
|
||||
fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx)
|
||||
}
|
||||
fc.expectFrames = fc.expectFrames[:0]
|
||||
return true
|
||||
}
|
||||
|
||||
if fc.broken {
|
||||
return false
|
||||
}
|
||||
|
||||
prevFrameInChain := extFrameNum - uint64(fd.ChainDiffs[fc.chainIdx])
|
||||
sd, err := fc.decisions.GetDecision(prevFrameInChain)
|
||||
if err != nil {
|
||||
fc.logger.Debugw("could not get decision", "err", err, "frame", extFrameNum, "prevFrame", prevFrameInChain)
|
||||
}
|
||||
|
||||
var intact bool
|
||||
switch {
|
||||
case sd == selectorDecisionForwarded:
|
||||
intact = true
|
||||
|
||||
case sd == selectorDecisionUnknown:
|
||||
// If the previous frame is unknown, means it has not arrived but could be recovered by NACK / out-of-order arrival,
|
||||
// set up a expected callback here to determine if the chain is broken or intact
|
||||
if fc.decisions.ExpectDecision(prevFrameInChain, fc.OnExpectFrameChanged) {
|
||||
intact = true
|
||||
fc.expectFrames = append(fc.expectFrames, prevFrameInChain)
|
||||
}
|
||||
}
|
||||
|
||||
if !intact {
|
||||
fc.broken = true
|
||||
fc.logger.Debugw("frame chain broken", "chanIdx", fc.chainIdx, "sd", sd, "frame", extFrameNum, "prevFrame", prevFrameInChain)
|
||||
}
|
||||
return intact
|
||||
}
|
||||
|
||||
func (fc *FrameChain) OnExpectFrameChanged(frameNum uint64, decision selectorDecision) {
|
||||
for i, f := range fc.expectFrames {
|
||||
if f == frameNum {
|
||||
if decision != selectorDecisionForwarded {
|
||||
fc.broken = true
|
||||
}
|
||||
fc.expectFrames[i] = fc.expectFrames[len(fc.expectFrames)-1]
|
||||
fc.expectFrames = fc.expectFrames[:len(fc.expectFrames)-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fc *FrameChain) Broken() bool {
|
||||
return fc.broken
|
||||
}
|
||||
|
||||
func (fc *FrameChain) BeginUpdateActive() {
|
||||
fc.updatingActive = false
|
||||
}
|
||||
|
||||
func (fc *FrameChain) UpdateActive(active bool) {
|
||||
fc.updatingActive = fc.updatingActive || active
|
||||
}
|
||||
|
||||
func (fc *FrameChain) EndUpdateActive() {
|
||||
active := fc.updatingActive
|
||||
fc.updatingActive = false
|
||||
|
||||
if active == fc.active {
|
||||
return
|
||||
}
|
||||
|
||||
// if the chain transit from inactive to active, reset broken to wait a decodable SWITCH frame
|
||||
if !fc.active {
|
||||
fc.broken = true
|
||||
}
|
||||
|
||||
fc.active = active
|
||||
}
|
||||
@@ -33,18 +33,23 @@ func (s selectorDecision) String() string {
|
||||
// ----------------------------------------------------------------------
|
||||
|
||||
type SelectorDecisionCache struct {
|
||||
initialized bool
|
||||
base uint64
|
||||
last uint64
|
||||
masks []uint64
|
||||
numEntries uint64
|
||||
initialized bool
|
||||
base uint64
|
||||
last uint64
|
||||
masks []uint64
|
||||
numEntries uint64
|
||||
numNackEntries uint64
|
||||
|
||||
onExpectEntityChanged map[uint64][]func(entity uint64, decision selectorDecision)
|
||||
}
|
||||
|
||||
func NewSelectorDecisionCache(maxNumElements uint64) *SelectorDecisionCache {
|
||||
func NewSelectorDecisionCache(maxNumElements uint64, numNackEntries uint64) *SelectorDecisionCache {
|
||||
numElements := (maxNumElements*2 + 63) / 64
|
||||
return &SelectorDecisionCache{
|
||||
masks: make([]uint64, numElements),
|
||||
numEntries: numElements * 32, // 2 bits per entry
|
||||
masks: make([]uint64, numElements),
|
||||
numEntries: numElements * 32, // 2 bits per entry
|
||||
numNackEntries: numNackEntries,
|
||||
onExpectEntityChanged: make(map[uint64][]func(entity uint64, decision selectorDecision)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,19 +62,39 @@ func (s *SelectorDecisionCache) AddDropped(entity uint64) {
|
||||
}
|
||||
|
||||
func (s *SelectorDecisionCache) GetDecision(entity uint64) (selectorDecision, error) {
|
||||
if !s.initialized || entity > s.last || entity < s.base {
|
||||
if !s.initialized || entity < s.base {
|
||||
return selectorDecisionMissing, nil
|
||||
}
|
||||
|
||||
if entity > s.last {
|
||||
return selectorDecisionUnknown, nil
|
||||
}
|
||||
|
||||
offset := s.last - entity
|
||||
if offset >= s.numEntries {
|
||||
// asking for something too old
|
||||
return selectorDecisionUnknown, fmt.Errorf("too old, oldest: %d, asking: %d", s.last-s.numEntries+1, entity)
|
||||
return selectorDecisionMissing, fmt.Errorf("too old, oldest: %d, asking: %d", s.last-s.numEntries+1, entity)
|
||||
}
|
||||
|
||||
return s.getEntity(entity), nil
|
||||
}
|
||||
|
||||
func (s *SelectorDecisionCache) ExpectDecision(entity uint64, f func(entity uint64, decision selectorDecision)) bool {
|
||||
if !s.initialized || entity < s.base {
|
||||
return false
|
||||
}
|
||||
|
||||
if entity < s.last {
|
||||
offset := s.last - entity
|
||||
if offset >= s.numEntries {
|
||||
return false // too old
|
||||
}
|
||||
}
|
||||
|
||||
s.onExpectEntityChanged[entity] = append(s.onExpectEntityChanged[entity], f)
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *SelectorDecisionCache) addEntity(entity uint64, sd selectorDecision) {
|
||||
if !s.initialized {
|
||||
s.initialized = true
|
||||
@@ -90,16 +115,60 @@ func (s *SelectorDecisionCache) addEntity(entity uint64, sd selectorDecision) {
|
||||
}
|
||||
|
||||
for e := s.last + 1; e != entity; e++ {
|
||||
s.setEntity(e, selectorDecisionMissing)
|
||||
s.setEntity(e, selectorDecisionUnknown)
|
||||
}
|
||||
|
||||
// update [last+1-nack, entity-nack) to missing
|
||||
missingStart := s.last
|
||||
if missingStart > s.numNackEntries+s.base {
|
||||
missingStart -= s.numNackEntries
|
||||
} else {
|
||||
missingStart = s.base
|
||||
}
|
||||
missingEnd := entity
|
||||
if missingEnd > s.numNackEntries+s.base {
|
||||
missingEnd -= s.numNackEntries
|
||||
} else {
|
||||
missingEnd = s.base
|
||||
}
|
||||
if missingEnd > missingStart {
|
||||
for e := missingStart; e != missingEnd; e++ {
|
||||
s.setEntityIfUnknown(e, selectorDecisionMissing)
|
||||
}
|
||||
}
|
||||
|
||||
s.setEntity(entity, sd)
|
||||
s.last = entity
|
||||
|
||||
for e, fns := range s.onExpectEntityChanged {
|
||||
if e+s.numEntries < s.last {
|
||||
delete(s.onExpectEntityChanged, e)
|
||||
for _, f := range fns {
|
||||
f(e, selectorDecisionMissing)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SelectorDecisionCache) setEntityIfUnknown(entity uint64, sd selectorDecision) {
|
||||
if s.getEntity(entity) == selectorDecisionUnknown {
|
||||
s.setEntity(entity, sd)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SelectorDecisionCache) setEntity(entity uint64, sd selectorDecision) {
|
||||
index, bitpos := s.getPos(entity)
|
||||
s.masks[index] &= ^(0x3 << bitpos) // clear before bitwise OR
|
||||
s.masks[index] |= (uint64(sd) & 0x3) << bitpos
|
||||
|
||||
if sd != selectorDecisionUnknown {
|
||||
if fns, ok := s.onExpectEntityChanged[entity]; ok {
|
||||
delete(s.onExpectEntityChanged, entity)
|
||||
for _, f := range fns {
|
||||
f(entity, sd)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SelectorDecisionCache) getEntity(entity uint64) selectorDecision {
|
||||
|
||||
@@ -32,6 +32,8 @@ type VideoLayerSelector interface {
|
||||
SetRequestSpatial(layer int32)
|
||||
GetRequestSpatial() int32
|
||||
|
||||
CheckSync() (locked bool, layer int32)
|
||||
|
||||
SetMaxSeen(maxSeenLayer buffer.VideoLayer)
|
||||
SetMaxSeenSpatial(layer int32)
|
||||
SetMaxSeenTemporal(layer int32)
|
||||
|
||||
Reference in New Issue
Block a user