diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go
index bc5b45e20..b242662dd 100644
--- a/pkg/sfu/buffer/buffer.go
+++ b/pkg/sfu/buffer/buffer.go
@@ -42,7 +42,7 @@ type ExtPacket struct {
Payload interface{}
KeyFrame bool
RawPacket []byte
- DependencyDescriptor *DependencyDescriptorWithDecodeTarget
+ DependencyDescriptor *ExtDependencyDescriptor
}
// Buffer contains all packets
diff --git a/pkg/sfu/buffer/dependencydescriptorparser.go b/pkg/sfu/buffer/dependencydescriptorparser.go
index c52e86c83..71b4aac2e 100644
--- a/pkg/sfu/buffer/dependencydescriptorparser.go
+++ b/pkg/sfu/buffer/dependencydescriptorparser.go
@@ -7,6 +7,7 @@ import (
"github.com/pion/rtp"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
+ "github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/protocol/logger"
)
@@ -17,6 +18,11 @@ type DependencyDescriptorParser struct {
logger logger.Logger
onMaxLayerChanged func(int32, int32)
decodeTargets []DependencyDescriptorDecodeTarget
+
+ wrapAround *utils.WrapAround[uint16, uint64]
+ structureExtSeq uint64
+ activeDecodeTargetsExtSeq uint64
+ activeDecodeTargetsMask uint32
}
func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLayerChanged func(int32, int32)) *DependencyDescriptorParser {
@@ -25,16 +31,19 @@ func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLay
ddExtID: ddExtID,
logger: logger,
onMaxLayerChanged: onMaxLayerChanged,
+ wrapAround: utils.NewWrapAround[uint16, uint64](),
}
}
-type DependencyDescriptorWithDecodeTarget struct {
- Descriptor *dd.DependencyDescriptor
- DecodeTargets []DependencyDescriptorDecodeTarget
+// ExtDependencyDescriptor pairs a parsed dependency descriptor with state
+// derived by DependencyDescriptorParser while parsing the packet.
+type ExtDependencyDescriptor struct {
+ // Descriptor is the dependency descriptor parsed from the RTP header extension.
+ Descriptor *dd.DependencyDescriptor
+
+ // DecodeTargets lists the decode targets associated with this descriptor.
+ DecodeTargets []DependencyDescriptorDecodeTarget
+ // StructureUpdated is set by the parser when the packet carries an attached
+ // structure and its extended sequence number is newer than the last applied one.
+ StructureUpdated bool
+ // ActiveDecodeTargetsUpdated is set by the parser when the structure was updated
+ // or when the active decode targets bitmask differs from the last recorded one.
+ ActiveDecodeTargetsUpdated bool
}
-func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*DependencyDescriptorWithDecodeTarget, VideoLayer, error) {
- // DD-TODO: make sure out-of-order RTP packets do not update decode targets
+func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescriptor, VideoLayer, error) {
var videoLayer VideoLayer
ddBuf := pkt.GetExtension(r.ddExtID)
if ddBuf == nil {
@@ -52,49 +61,53 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*DependencyDescript
return nil, videoLayer, err
}
+ extSeq := r.wrapAround.Update(pkt.SequenceNumber).ExtendedVal
+
if ddVal.FrameDependencies != nil {
videoLayer.Spatial, videoLayer.Temporal = int32(ddVal.FrameDependencies.SpatialId), int32(ddVal.FrameDependencies.TemporalId)
}
- if ddVal.AttachedStructure != nil && !ddVal.FirstPacketInFrame {
- // r.logger.Debugw("ignoring non-first packet in frame with attached structure")
- return nil, videoLayer, nil
+
+ extDD := &ExtDependencyDescriptor{
+ Descriptor: &ddVal,
}
if ddVal.AttachedStructure != nil {
- r.structure = ddVal.AttachedStructure
- r.decodeTargets = ProcessFrameDependencyStructure(ddVal.AttachedStructure)
- if len(r.decodeTargets) != 0 {
- r.logger.Debugw(fmt.Sprintf("update decode targets: %v", r.decodeTargets))
- r.onMaxLayerChanged(r.decodeTargets[0].Layer.Spatial, r.decodeTargets[0].Layer.Temporal)
+ r.logger.Debugw(fmt.Sprintf("parsed dependency descriptor\n%s", ddVal.String()))
+ if extSeq > r.structureExtSeq {
+ r.structure = ddVal.AttachedStructure
+ r.decodeTargets = ProcessFrameDependencyStructure(ddVal.AttachedStructure)
+ r.structureExtSeq = extSeq
+ extDD.StructureUpdated = true
+ extDD.ActiveDecodeTargetsUpdated = true
+ // The dependency descriptor reader always sets ActiveDecodeTargetsBitmask when a TemplateDependencyStructure is present,
+ // so there is no need to notify a max layer change here.
}
}
- if ddVal.AttachedStructure != nil && ddVal.FirstPacketInFrame {
- r.logger.Debugw(fmt.Sprintf("parsed dependency descriptor\n%s", ddVal.String()))
- }
-
- if mask := ddVal.ActiveDecodeTargetsBitmask; mask != nil {
- var maxSpatial, maxTemporal int32
- for _, dt := range r.decodeTargets {
- if *mask&(1<
r.activeDecodeTargetsExtSeq {
+ r.activeDecodeTargetsExtSeq = extSeq
+ if *mask != r.activeDecodeTargetsMask {
+ r.activeDecodeTargetsMask = *mask
+ extDD.ActiveDecodeTargetsUpdated = true
+ var maxSpatial, maxTemporal int32
+ for _, dt := range r.decodeTargets {
+ if *mask&(1< maxSpatial {
- maxSpatial = dt.Layer.Spatial
- }
- if dt.Layer.Temporal > maxTemporal {
- maxTemporal = dt.Layer.Temporal
- }
if dt.Layer.Spatial <= layer.Spatial && dt.Layer.Temporal <= layer.Temporal {
activeBitMask |= 1 << dt.Target
}
}
- if layer.Spatial == maxSpatial && layer.Temporal == maxTemporal {
- // all the decode targets are selected
- return nil
- }
return &activeBitMask
}
diff --git a/pkg/sfu/buffer/fps_test.go b/pkg/sfu/buffer/fps_test.go
index 206041acf..b090d2770 100644
--- a/pkg/sfu/buffer/fps_test.go
+++ b/pkg/sfu/buffer/fps_test.go
@@ -31,7 +31,7 @@ func (f *testFrameInfo) toVP8() *ExtPacket {
func (f *testFrameInfo) toDD() *ExtPacket {
return &ExtPacket{
Packet: &rtp.Packet{Header: f.header},
- DependencyDescriptor: &DependencyDescriptorWithDecodeTarget{
+ DependencyDescriptor: &ExtDependencyDescriptor{
Descriptor: &dependencydescriptor.DependencyDescriptor{
FrameNumber: f.framenumber,
FrameDependencies: &dependencydescriptor.FrameDependencyTemplate{
diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go
index 8929cf736..75e00baa9 100644
--- a/pkg/sfu/forwarder.go
+++ b/pkg/sfu/forwarder.go
@@ -1409,9 +1409,7 @@ func (f *Forwarder) CheckSync() (locked bool, layer int32) {
f.lock.RLock()
defer f.lock.RUnlock()
- layer = f.vls.GetRequestSpatial()
- locked = layer == f.vls.GetCurrent().Spatial || f.vls.GetParked().IsValid()
- return
+ return f.vls.CheckSync()
}
func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [buffer.DefaultMaxLayerSpatial + 1]bool) {
diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go
index 80ef3f007..26ca98ad7 100644
--- a/pkg/sfu/receiver.go
+++ b/pkg/sfu/receiver.go
@@ -645,6 +645,14 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
}
}
+ w.downTrackSpreader.Broadcast(func(dt TrackSender) {
+ _ = dt.WriteRTP(pkt, spatialLayer)
+ })
+
+ if redPktWriter != nil {
+ redPktWriter(pkt, spatialLayer)
+ }
+
if spatialTracker != nil {
spatialTracker.Observe(
pkt.Temporal,
@@ -655,14 +663,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
pkt.DependencyDescriptor,
)
}
-
- w.downTrackSpreader.Broadcast(func(dt TrackSender) {
- _ = dt.WriteRTP(pkt, spatialLayer)
- })
-
- if redPktWriter != nil {
- redPktWriter(pkt, spatialLayer)
- }
}
}
diff --git a/pkg/sfu/streamtracker/interfaces.go b/pkg/sfu/streamtracker/interfaces.go
index 934032f68..a9135e631 100644
--- a/pkg/sfu/streamtracker/interfaces.go
+++ b/pkg/sfu/streamtracker/interfaces.go
@@ -52,5 +52,5 @@ type StreamTrackerWorker interface {
Status() StreamStatus
BitrateTemporalCumulative() []int64
SetPaused(paused bool)
- Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, dd *buffer.DependencyDescriptorWithDecodeTarget)
+ Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, dd *buffer.ExtDependencyDescriptor)
}
diff --git a/pkg/sfu/streamtracker/streamtracker.go b/pkg/sfu/streamtracker/streamtracker.go
index a1b645fb2..1be7dc0fc 100644
--- a/pkg/sfu/streamtracker/streamtracker.go
+++ b/pkg/sfu/streamtracker/streamtracker.go
@@ -176,7 +176,7 @@ func (s *StreamTracker) Observe(
payloadSize int,
hasMarker bool,
ts uint32,
- _ *buffer.DependencyDescriptorWithDecodeTarget,
+ _ *buffer.ExtDependencyDescriptor,
) {
s.lock.Lock()
diff --git a/pkg/sfu/streamtracker/streamtracker_dd.go b/pkg/sfu/streamtracker/streamtracker_dd.go
index b6daee387..2d7301f7c 100644
--- a/pkg/sfu/streamtracker/streamtracker_dd.go
+++ b/pkg/sfu/streamtracker/streamtracker_dd.go
@@ -123,7 +123,7 @@ func (s *StreamTrackerDependencyDescriptor) SetPaused(paused bool) {
}
-func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, ddVal *buffer.DependencyDescriptorWithDecodeTarget) {
+func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, ddVal *buffer.ExtDependencyDescriptor) {
s.lock.Lock()
if s.isStopped || s.paused || payloadSize == 0 || ddVal == nil {
@@ -133,7 +133,7 @@ func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize
var notifyFns []func(status StreamStatus)
var notifyStatus StreamStatus
- if mask := ddVal.Descriptor.ActiveDecodeTargetsBitmask; mask != nil {
+ if mask := ddVal.Descriptor.ActiveDecodeTargetsBitmask; mask != nil && ddVal.ActiveDecodeTargetsUpdated {
var maxSpatial, maxTemporal int32
for _, dt := range ddVal.DecodeTargets {
if *mask&(1< d.targetLayer.Spatial || dt.Layer.Temporal > d.targetLayer.Temporal {
+ continue
+ }
+
+ frameResult, err := dt.OnFrame(extFrameNum, fd)
+ if err != nil {
+ d.decodeTargetsLock.RUnlock()
+ // dtis error, dependency descriptor might lost
+ d.logger.Debugw(fmt.Sprintf("drop packet for frame detection error, incoming: %v",
+ incomingLayer,
+ ), "err", err)
+ d.decisions.AddDropped(extFrameNum)
+ return
+ }
+
+ // Keep forwarding the lower spatial with temporal layer 0 to keep the lower frame chain intact,
+ // it will cost a few extra bits as those frames might not be present in the current target
+ // but will make the subscriber switch to lower layer seamlessly without pli.
+ if frameResult.TargetValid {
+ if highestDecodeTarget.Target == -1 {
+ highestDecodeTarget = dt.DependencyDescriptorDecodeTarget
+ dti = frameResult.DTI
+ } else if dt.Layer.Spatial < highestDecodeTarget.Layer.Spatial && dt.Layer.Temporal == 0 &&
+ frameResult.DTI != dede.DecodeTargetNotPresent && frameResult.DTI != dede.DecodeTargetDiscardable {
+ dti = frameResult.DTI
+ }
+ }
+ }
+ d.decodeTargetsLock.RUnlock()
+
+ // DD-TODO : we don't have an RTP queue to ensure the order of packets yet,
+ // so we cannot tell whether a packet is lost or out of order. As a result we cannot detect
+ // frame integrity, i.e. whether the entire frame was forwarded or whether the frame chain is broken.
+ // So use a simple check here: assume all the reference frames are forwarded and
+ // only check the DTI of the active decode target.
+ // This is not efficient; eventually we need to check frame chain integrity.
+
+ if highestDecodeTarget.Target < 0 {
+ // no active decode target, do not select
+ // d.logger.Debugw(fmt.Sprintf("drop packet for no target found, decodeTargets %v, tagetLayer %v, incoming %v",
+ // d.decodeTargets,
+ // d.targetLayer,
+ // incomingLayer,
+ // ))
+ d.decisions.AddDropped(extFrameNum)
+ return
+ }
+
+ // // DD-TODO : if bandwidth in congest, could drop the 'Discardable' frame
+ if dti == dede.DecodeTargetNotPresent {
+ // d.logger.Debugw(fmt.Sprintf("drop packet for decode target not present, highestDecodeTarget %d, incoming %v, fn: %d/%d",
+ // highestDecodeTarget,
+ // incomingLayer,
+ // dd.FrameNumber,
+ // extFrameNum,
+ // ))
+ d.decisions.AddDropped(extFrameNum)
+ return
+ }
+
// check decodability using reference frames
isDecodable := true
for _, fdiff := range fd.FrameDiffs {
@@ -90,122 +170,14 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
continue
}
- if sd, _ := d.decisions.GetDecision(extFrameNum - uint64(fdiff)); sd != selectorDecisionForwarded {
+ // use relaxed check for frame diff that we have chain intact detection and don't want
+ // to drop packet due to out-of-order packet or recoverable packet loss
+ if sd, _ := d.decisions.GetDecision(extFrameNum - uint64(fdiff)); sd == selectorDecisionDropped {
isDecodable = false
break
}
}
if !isDecodable {
- // DD-TODO START
- // Not decodable could happen due to packet loss or out-of-order packets,
- // Need to figure out better ways to handle this.
- //
- // 1. Should definitely check if this frame is not part of current decode target OR discardable.
- // In that case, forwarding can proceed without disruption.
- // 2. Add a packet queue and try to de-jitter for some time. Safest is to packet copy to local queue on
- // all down tracks.
- // 3. Force a PLI and wait for a key frame.
- // DD-TODO END
- d.decisions.AddDropped(extFrameNum)
- return
- }
-
- // DD-TODO should not update for out-of-order RTP packets
- if dd.AttachedStructure != nil {
- // update decode target layer and active decode targets
- // DD-TODO : these targets info can be shared by all the downtracks, no need calculate in every selector
- d.updateDependencyStructure(dd.AttachedStructure)
- }
-
- // DD-TODO : we don't have a rtp queue to ensure the order of packets now,
- // so we don't know packet is lost/out of order, that cause us can't detect
- // frame integrity, entire frame is forwareded, whether frame chain is broken.
- // So use a simple check here, assume all the reference frame is forwarded and
- // only check DTI of the active decode target.
- // it is not effeciency, at last we need check frame chain integrity.
-
- activeDecodeTargets := dd.ActiveDecodeTargetsBitmask
- if activeDecodeTargets != nil {
- d.logger.Debugw("active decode targets", "activeDecodeTargets", *activeDecodeTargets)
- }
-
- // find decode target closest to targetLayer
- highestDecodeTarget := buffer.DependencyDescriptorDecodeTarget{
- Target: -1,
- Layer: buffer.InvalidLayer,
- }
- for _, dt := range ddwdt.DecodeTargets {
- if dt.Layer.Spatial > d.targetLayer.Spatial || dt.Layer.Temporal > d.targetLayer.Temporal {
- continue
- }
-
- if activeDecodeTargets != nil && ((*activeDecodeTargets)&(1< 0, each Decode target MUST be protected by exactly one Chain.
+ if structure.NumChains > 0 {
+ chainIdx := structure.DecodeTargetProtectedByChain[dt.Target]
+ if chainIdx >= len(d.chains) {
+ // should not happen
+ d.logger.Errorw("DecodeTargetProtectedByChain chainIdx out of range", nil, "chainIdx", chainIdx, "NumChains", len(d.chains))
+ } else {
+ chain = d.chains[chainIdx]
+ }
+ }
+ newTargets = append(newTargets, NewDecodeTarget(dt, chain))
+ }
+ d.decodeTargetsLock.Lock()
+ d.decodeTargets = newTargets
+ d.decodeTargetsLock.Unlock()
+}
+
+// updateActiveDecodeTargets applies a new active decode targets bitmask.
+// Every chain is bracketed with BeginUpdateActive / EndUpdateActive, and each
+// decode target is updated in between while holding the decode targets read lock.
+func (d *DependencyDescriptor) updateActiveDecodeTargets(activeDecodeTargetsBitmask uint32) {
+	chains := d.chains
+	for i := range chains {
+		chains[i].BeginUpdateActive()
+	}
+
+	// The read lock only covers the walk over the decode targets.
+	d.decodeTargetsLock.RLock()
+	for _, target := range d.decodeTargets {
+		target.UpdateActive(activeDecodeTargetsBitmask)
+	}
+	d.decodeTargetsLock.RUnlock()
+
+	chains = d.chains
+	for i := range chains {
+		chains[i].EndUpdateActive()
+	}
+}
+
+// CheckSync reports whether the selector is locked to the requested spatial layer.
+// The returned layer is always the requested spatial layer. The selector counts as
+// locked when a parked layer is valid, or when some decode target on the requested
+// spatial layer is both active and valid.
+func (d *DependencyDescriptor) CheckSync() (locked bool, layer int32) {
+	requested := d.GetRequestSpatial()
+	if d.GetParked().IsValid() {
+		return true, requested
+	}
+
+	d.decodeTargetsLock.RLock()
+	defer d.decodeTargetsLock.RUnlock()
+
+	for _, target := range d.decodeTargets {
+		if !target.Active() || target.Layer.Spatial != requested || !target.Valid() {
+			continue
+		}
+		return true, requested
+	}
+	return false, requested
+}
diff --git a/pkg/sfu/videolayerselector/dependencydescriptor_test.go b/pkg/sfu/videolayerselector/dependencydescriptor_test.go
new file mode 100644
index 000000000..21e416691
--- /dev/null
+++ b/pkg/sfu/videolayerselector/dependencydescriptor_test.go
@@ -0,0 +1,377 @@
+package videolayerselector
+
+import (
+ "sort"
+ "testing"
+
+ "github.com/livekit/livekit-server/pkg/sfu/buffer"
+ dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
+ "github.com/livekit/protocol/logger"
+ "github.com/pion/rtp"
+ "github.com/stretchr/testify/require"
+)
+
+// TestDecodeTarget exercises a decode target (index 1, spatial 1 / temporal 2)
+// both without a frame chain and when protected by a frame chain.
+func TestDecodeTarget(t *testing.T) {
+ target := buffer.DependencyDescriptorDecodeTarget{
+ Target: 1,
+ Layer: buffer.VideoLayer{Spatial: 1, Temporal: 2},
+ }
+
+ t.Run("No Chain", func(t *testing.T) {
+ dt := NewDecodeTarget(target, nil)
+ require.True(t, dt.Valid())
+ // no indication found for target index 1, an error is expected
+ _, err := dt.OnFrame(1, &dd.FrameDependencyTemplate{
+ DecodeTargetIndications: []dd.DecodeTargetIndication{},
+ })
+ require.Error(t, err)
+
+ // the indication at index 1 is returned as the DTI of the frame
+ ret, err := dt.OnFrame(1, &dd.FrameDependencyTemplate{
+ DecodeTargetIndications: []dd.DecodeTargetIndication{dd.DecodeTargetNotPresent, dd.DecodeTargetRequired},
+ })
+ require.NoError(t, err)
+ require.True(t, ret.TargetValid)
+ require.Equal(t, dd.DecodeTargetRequired, ret.DTI)
+ })
+
+ t.Run("With Chain", func(t *testing.T) {
+ decisions := NewSelectorDecisionCache(256, 80)
+ chain := NewFrameChain(decisions, 1, logger.GetLogger())
+ dt := NewDecodeTarget(target, chain)
+ // activate the decode target through the bitmask; the new chain starts out broken
+ chain.BeginUpdateActive()
+ dt.UpdateActive(1 << dt.Target)
+ chain.EndUpdateActive()
+ require.True(t, dt.Active())
+ require.False(t, dt.Valid())
+
+ // chain intact
+ frame := &dd.FrameDependencyTemplate{
+ DecodeTargetIndications: []dd.DecodeTargetIndication{dd.DecodeTargetNotPresent, dd.DecodeTargetRequired},
+ ChainDiffs: []int{0, 0},
+ }
+ chain.OnFrame(1, frame)
+ require.True(t, dt.Valid())
+ ret, err := dt.OnFrame(1, frame)
+ require.NoError(t, err)
+ require.True(t, ret.TargetValid)
+ require.Equal(t, dd.DecodeTargetRequired, ret.DTI)
+
+ })
+}
+
+// TestFrameChain walks a single chain (index 0) through activation, in-order and
+// out-of-order frames, breakage by missing or dropped frames, and recovery by a
+// frame whose chain diff is 0. The decision cache is created with 3 NACK entries.
+func TestFrameChain(t *testing.T) {
+ decisions := NewSelectorDecisionCache(256, 3)
+ chain := NewFrameChain(decisions, 0, logger.GetLogger())
+ require.True(t, chain.Broken())
+
+ // chain intact
+ frameNoDiff := &dd.FrameDependencyTemplate{
+ ChainDiffs: []int{0},
+ }
+ // not active
+ require.False(t, chain.OnFrame(1, frameNoDiff))
+
+ chain.BeginUpdateActive()
+ chain.UpdateActive(true)
+ chain.EndUpdateActive()
+
+ require.True(t, chain.OnFrame(1, frameNoDiff))
+ decisions.AddForwarded(1)
+
+ frameDiff1 := &dd.FrameDependencyTemplate{
+ ChainDiffs: []int{1},
+ }
+
+ require.True(t, chain.OnFrame(2, frameDiff1))
+ decisions.AddForwarded(2)
+
+ // frame 5 arrives first, but frame 4 can be recovered by NACK
+ require.True(t, chain.OnFrame(5, frameDiff1))
+ decisions.AddForwarded(5)
+
+ // frame 4 arrives, chain remains intact
+ require.True(t, chain.OnFrame(4, frameDiff1))
+ decisions.AddForwarded(4)
+
+ // frame 3 is missed once it falls out of the NACK range, chain broken
+ decisions.AddForwarded(7)
+ require.True(t, chain.Broken())
+
+ // recovery by non-diff frame
+ require.True(t, chain.OnFrame(1000, frameNoDiff))
+ require.False(t, chain.Broken())
+ decisions.AddForwarded(1000)
+
+ // broken by dropped frame
+ require.True(t, chain.OnFrame(1002, frameDiff1))
+ decisions.AddDropped(1001)
+ require.True(t, chain.Broken())
+
+ // recovery by non-diff frame
+ require.True(t, chain.OnFrame(2000, frameNoDiff))
+ decisions.AddForwarded(2000)
+ decisions.AddDropped(2001)
+ require.False(t, chain.OnFrame(2002, frameDiff1))
+ require.True(t, chain.Broken())
+}
+
+// TestDependencyDescriptor drives the dependency descriptor selector with a
+// synthetic stream from createDDFrames (max layer spatial 2 / temporal 2) while
+// targeting spatial 1 / temporal 2. It checks selection of frames at, below and
+// above the target, repeated packets, undecodable frames, a broken target chain,
+// the switch to a lower layer and the CheckSync result.
+func TestDependencyDescriptor(t *testing.T) {
+ ddSelector := NewDependencyDescriptor(logger.GetLogger())
+ targetLayer := buffer.VideoLayer{Spatial: 1, Temporal: 2}
+ ddSelector.SetTarget(targetLayer)
+ ddSelector.SetRequestSpatial(1)
+
+ // no dd ext, dropped
+ ret := ddSelector.Select(&buffer.ExtPacket{}, 0)
+ require.False(t, ret.IsSelected)
+ require.False(t, ret.IsRelevant)
+
+ // non key frame, dropped
+ ret = ddSelector.Select(&buffer.ExtPacket{
+ KeyFrame: false,
+ DependencyDescriptor: &buffer.ExtDependencyDescriptor{
+ Descriptor: &dd.DependencyDescriptor{
+ FrameNumber: 1,
+ FrameDependencies: &dd.FrameDependencyTemplate{
+ SpatialId: int(targetLayer.Spatial),
+ TemporalId: int(targetLayer.Temporal),
+ },
+ },
+ },
+ }, 0)
+ require.False(t, ret.IsSelected)
+ require.True(t, ret.IsRelevant)
+
+ frames := createDDFrames(buffer.VideoLayer{Spatial: 2, Temporal: 2}, 3)
+ // key frame, update structure and decode targets
+ ret = ddSelector.Select(frames[0], 0)
+ require.True(t, ret.IsSelected)
+ require.Equal(t, ddSelector.GetCurrent(), ddSelector.GetTarget())
+ sync, _ := ddSelector.CheckSync()
+ require.True(t, sync)
+
+ // forward frame belongs to target layer
+ // drop frame exceeds target layer (not present in target layer or lower layer)
+ // forward frame not present in target layer but present in lower layer
+ var (
+ belongTargetCase bool
+ exceedTargetCase bool
+ lowerTargetCase bool
+ )
+ idx := 1
+ var frameForwarded, frameDropped []*buffer.ExtPacket
+ // feed frames until each of the three cases above has been observed at least once
+ for ; idx < len(frames); idx++ {
+ fd := frames[idx].DependencyDescriptor.Descriptor.FrameDependencies
+ ret = ddSelector.Select(frames[idx], 0)
+ switch {
+ case fd.SpatialId == int(targetLayer.Spatial) && fd.TemporalId == int(targetLayer.Temporal):
+ require.True(t, ret.IsSelected)
+ belongTargetCase = true
+ frameForwarded = append(frameForwarded, frames[idx])
+ case fd.SpatialId < int(targetLayer.Spatial) && fd.TemporalId == 0:
+ require.True(t, ret.IsSelected)
+ lowerTargetCase = true
+ frameForwarded = append(frameForwarded, frames[idx])
+ case fd.SpatialId > int(targetLayer.Spatial) || fd.TemporalId > int(targetLayer.Temporal):
+ require.False(t, ret.IsSelected)
+ exceedTargetCase = true
+ frameDropped = append(frameDropped, frames[idx])
+ }
+
+ if belongTargetCase && exceedTargetCase && lowerTargetCase {
+ break
+ }
+ }
+
+ require.True(t, belongTargetCase && exceedTargetCase && lowerTargetCase)
+
+ // select frame already forwarded
+ ret = ddSelector.Select(frameForwarded[0], 0)
+ require.True(t, ret.IsSelected)
+
+ // drop frame already dropped
+ ret = ddSelector.Select(frameDropped[0], 0)
+ require.False(t, ret.IsSelected)
+
+ // drop frame present but not decodable (dependency frame missed)
+ idx++
+ for ; idx < len(frames); idx++ {
+ fd := frames[idx].DependencyDescriptor.Descriptor.FrameDependencies
+ ret = ddSelector.Select(frames[idx], 0)
+ if fd.SpatialId == int(targetLayer.Spatial) && fd.TemporalId == int(targetLayer.Temporal) {
+ break
+ }
+ }
+ // rewrite the frame diffs so the frame references the previously dropped frame
+ notDecodableFrame := frames[idx]
+ notDecodableFrame.DependencyDescriptor.Descriptor.FrameDependencies.FrameDiffs = []int{
+ int(notDecodableFrame.DependencyDescriptor.Descriptor.FrameNumber - frameDropped[0].DependencyDescriptor.Descriptor.FrameNumber),
+ }
+ ret = ddSelector.Select(notDecodableFrame, 0)
+ require.False(t, ret.IsSelected)
+
+ // target layer broken
+ idx++
+ for ; idx < len(frames); idx++ {
+ fd := frames[idx].DependencyDescriptor.Descriptor.FrameDependencies
+ ret = ddSelector.Select(frames[idx], 0)
+ if fd.SpatialId == int(targetLayer.Spatial) && fd.TemporalId == int(targetLayer.Temporal) {
+ break
+ }
+ }
+ // overwrite the chain diff of the target spatial layer's chain on this frame
+ brokenFrame := frames[idx]
+ brokenFrame.DependencyDescriptor.Descriptor.FrameDependencies.ChainDiffs[targetLayer.Spatial] =
+ int(notDecodableFrame.DependencyDescriptor.Descriptor.FrameNumber - frameDropped[0].DependencyDescriptor.Descriptor.FrameNumber)
+ ret = ddSelector.Select(brokenFrame, 0)
+ require.False(t, ret.IsSelected)
+
+ // switch to lower layer, forward frame
+ idx++
+ var switchToLower bool
+ for ; idx < len(frames); idx++ {
+ ret = ddSelector.Select(frames[idx], 0)
+ if ret.IsSelected {
+ require.True(t, targetLayer.GreaterThan(ddSelector.GetCurrent()))
+ switchToLower = true
+ break
+ }
+ }
+ require.True(t, switchToLower)
+
+ // not sync with requested layer
+ ddSelector.SetRequestSpatial(targetLayer.Spatial)
+ locked, layer := ddSelector.CheckSync()
+ require.False(t, locked)
+ require.Equal(t, targetLayer.Spatial, layer)
+
+ // request to current layer, sync
+ ddSelector.SetRequestSpatial(ddSelector.GetCurrent().Spatial)
+ locked, _ = ddSelector.CheckSync()
+ require.True(t, locked)
+}
+
+// createDDFrames builds a synthetic packet sequence for selector tests.
+// The first packet is a key frame numbered startFrameNumber that carries an
+// attached structure with (Spatial+1)*(Temporal+1) decode targets, Spatial+1
+// chains (one per spatial layer) and an active bitmask covering every target.
+// It is followed by 10 rounds of packets, one per decode target in each round,
+// with frame numbers increasing by one per packet.
+func createDDFrames(maxLayer buffer.VideoLayer, startFrameNumber uint16) []*buffer.ExtPacket {
+ var frames []*buffer.ExtPacket
+ var activeBitMask uint32
+ var decodeTargets []buffer.DependencyDescriptorDecodeTarget
+ var decodeTargetsProtectByChain []int
+ // one decode target per (spatial, temporal) pair, protected by the chain of its spatial layer
+ for i := 0; i <= int(maxLayer.Spatial); i++ {
+ for j := 0; j <= int(maxLayer.Temporal); j++ {
+ decodeTargets = append(decodeTargets, buffer.DependencyDescriptorDecodeTarget{
+ Target: i*int(maxLayer.Temporal+1) + j,
+ Layer: buffer.VideoLayer{Spatial: int32(i), Temporal: int32(j)},
+ })
+ decodeTargetsProtectByChain = append(decodeTargetsProtectByChain, i)
+ activeBitMask |= 1 << uint(i*int(maxLayer.Temporal+1)+j)
+ }
+ }
+ // order decode targets from highest layer to lowest
+ sort.Slice(decodeTargets, func(i, j int) bool {
+ return decodeTargets[i].Layer.GreaterThan(decodeTargets[j].Layer)
+ })
+
+ // the key frame uses chain diff 0 for every entry and marks every target as a switch point
+ chainDiffs := make([]int, len(decodeTargets))
+ dtis := make([]dd.DecodeTargetIndication, len(decodeTargets))
+ for _, dt := range decodeTargets {
+ dtis[dt.Target] = dd.DecodeTargetSwitch
+ }
+
+ templates := make([]*dd.FrameDependencyTemplate, len(decodeTargets))
+
+ for _, dt := range decodeTargets {
+ templates[dt.Target] = &dd.FrameDependencyTemplate{
+ SpatialId: int(dt.Layer.Spatial),
+ TemporalId: int(dt.Layer.Temporal),
+ ChainDiffs: chainDiffs,
+ DecodeTargetIndications: dtis,
+ }
+ }
+ keyFrame := &buffer.ExtPacket{
+ KeyFrame: true,
+ DependencyDescriptor: &buffer.ExtDependencyDescriptor{
+ Descriptor: &dd.DependencyDescriptor{
+ FrameNumber: startFrameNumber,
+ FrameDependencies: &dd.FrameDependencyTemplate{
+ SpatialId: 0,
+ TemporalId: 0,
+ ChainDiffs: chainDiffs,
+ DecodeTargetIndications: dtis,
+ },
+ AttachedStructure: &dd.FrameDependencyStructure{
+ NumDecodeTargets: int((maxLayer.Spatial + 1) * (maxLayer.Temporal + 1)),
+ NumChains: int(maxLayer.Spatial) + 1,
+ DecodeTargetProtectedByChain: decodeTargetsProtectByChain,
+ Templates: templates,
+ },
+ ActiveDecodeTargetsBitmask: &activeBitMask,
+ },
+ DecodeTargets: decodeTargets,
+ StructureUpdated: true,
+ ActiveDecodeTargetsUpdated: true,
+ },
+ Packet: &rtp.Packet{
+ Header: rtp.Header{
+ SSRC: 1234,
+ },
+ },
+ }
+
+ frames = append(frames, keyFrame)
+
+ // per-chain frame number used to compute the chain diffs of later frames
+ chainPrevFrame := make(map[int]int)
+ for i := 0; i <= int(maxLayer.Spatial); i++ {
+ chainPrevFrame[i] = int(startFrameNumber)
+ }
+ startFrameNumber++
+ for i := 0; i < 10; i++ {
+ // walk the sorted decode targets in reverse, i.e. lowest layer first
+ for j := len(decodeTargets) - 1; j >= 0; j-- {
+ dt := decodeTargets[j]
+ // frameChainDiffs has one entry per decode target; chainPrevFrame only has
+ // entries per spatial layer, so indexes beyond that read the map's zero value
+ frameChainDiffs := make([]int, len(chainDiffs))
+ for i := range frameChainDiffs {
+ frameChainDiffs[i] = int(startFrameNumber) - chainPrevFrame[i]
+ }
+
+ // targets with index >= dt.Target get Required (temporal 0) or Discardable,
+ // all lower indexes are marked NotPresent
+ frameDtis := make([]dd.DecodeTargetIndication, len(dtis))
+ for k := range frameDtis {
+ if k >= dt.Target {
+ if dt.Layer.Temporal == 0 {
+ frameDtis[k] = dd.DecodeTargetRequired
+ } else {
+ frameDtis[k] = dd.DecodeTargetDiscardable
+ }
+ } else {
+ frameDtis[k] = dd.DecodeTargetNotPresent
+ }
+ }
+
+ frame := &buffer.ExtPacket{
+ KeyFrame: true,
+ DependencyDescriptor: &buffer.ExtDependencyDescriptor{
+ Descriptor: &dd.DependencyDescriptor{
+ FrameNumber: startFrameNumber,
+ FrameDependencies: &dd.FrameDependencyTemplate{
+ SpatialId: int(dt.Layer.Spatial),
+ TemporalId: int(dt.Layer.Temporal),
+ ChainDiffs: frameChainDiffs,
+ DecodeTargetIndications: frameDtis,
+ },
+ },
+ DecodeTargets: decodeTargets,
+ },
+ Packet: &rtp.Packet{
+ Header: rtp.Header{
+ SSRC: 1234,
+ },
+ },
+ }
+
+ startFrameNumber++
+
+ // NOTE(review): this records the already-incremented frame number, i.e. the
+ // number of the NEXT frame rather than of the frame just built - confirm
+ // this is intended before relying on the generated chain diffs.
+ if dt.Layer.Temporal == 0 {
+ chainPrevFrame[int(dt.Layer.Spatial)] = int(startFrameNumber)
+ }
+
+ frames = append(frames, frame)
+ }
+ }
+
+ return frames
+}
diff --git a/pkg/sfu/videolayerselector/framechain.go b/pkg/sfu/videolayerselector/framechain.go
new file mode 100644
index 000000000..60cb4ea73
--- /dev/null
+++ b/pkg/sfu/videolayerselector/framechain.go
@@ -0,0 +1,114 @@
+package videolayerselector
+
+import (
+ dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
+ "github.com/livekit/protocol/logger"
+)
+
+// FrameChain tracks whether one chain of a dependency descriptor stream is
+// intact, using the forwarding decisions recorded in a SelectorDecisionCache.
+type FrameChain struct {
+ logger logger.Logger
+ // decisions is consulted for the decision made on the previous frame in the chain.
+ decisions *SelectorDecisionCache
+ // broken starts as true and is cleared by an active frame whose chain diff is 0.
+ broken bool
+ // chainIdx is this chain's index into FrameDependencyTemplate.ChainDiffs.
+ chainIdx int
+ // active is committed by EndUpdateActive; OnFrame returns false while it is unset.
+ active bool
+ // updatingActive accumulates UpdateActive calls between BeginUpdateActive and EndUpdateActive.
+ updatingActive bool
+
+ // expectFrames holds frame numbers whose decision is still pending and for which
+ // a callback was registered with the decision cache.
+ expectFrames []uint64
+}
+
+// NewFrameChain creates the tracker for chain chainIdx backed by decisions.
+// A new chain is inactive and reported as broken.
+func NewFrameChain(decisions *SelectorDecisionCache, chainIdx int, logger logger.Logger) *FrameChain {
+	// active is left at its zero value (false).
+	fc := &FrameChain{broken: true}
+	fc.logger = logger
+	fc.decisions = decisions
+	fc.chainIdx = chainIdx
+	return fc
+}
+
+// OnFrame evaluates the chain at the frame extFrameNum described by fd and
+// updates the broken state. It returns whether the chain is intact at this frame.
+//
+// It returns false when the chain is inactive, when fd is nil or carries no chain
+// diff for this chain, or when the chain is (or becomes) broken. A chain diff of 0
+// always marks the chain as intact. Otherwise the decision recorded for the
+// previous frame in the chain decides: forwarded keeps the chain intact, unknown
+// keeps it intact provisionally while a callback waits for the final decision,
+// and anything else breaks the chain.
+func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate) bool {
+	if !fc.active {
+		return false
+	}
+
+	// A malformed or truncated descriptor may carry fewer chain diffs than this
+	// chain's index; indexing blindly would panic on the packet path.
+	if fd == nil || fc.chainIdx < 0 || fc.chainIdx >= len(fd.ChainDiffs) {
+		fc.logger.Debugw("frame chain diff not present", "chanIdx", fc.chainIdx, "frame", extFrameNum)
+		return false
+	}
+	chainDiff := fd.ChainDiffs[fc.chainIdx]
+
+	// A decodable frame with frame_chain_fdiff equal to 0 indicates that the Chain is intact.
+	if chainDiff == 0 {
+		if fc.broken {
+			fc.broken = false
+			fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx)
+		}
+		// Pending expectations from before this point no longer matter; callbacks for
+		// them are ignored by OnExpectFrameChanged once they are off the list.
+		fc.expectFrames = fc.expectFrames[:0]
+		return true
+	}
+
+	if fc.broken {
+		return false
+	}
+
+	prevFrameInChain := extFrameNum - uint64(chainDiff)
+	sd, err := fc.decisions.GetDecision(prevFrameInChain)
+	if err != nil {
+		fc.logger.Debugw("could not get decision", "err", err, "frame", extFrameNum, "prevFrame", prevFrameInChain)
+	}
+
+	var intact bool
+	switch sd {
+	case selectorDecisionForwarded:
+		intact = true
+
+	case selectorDecisionUnknown:
+		// If the previous frame is unknown, it has not arrived yet but could still be recovered
+		// by NACK / out-of-order arrival. Register a callback to learn whether the chain ends up
+		// broken or intact.
+		if fc.decisions.ExpectDecision(prevFrameInChain, fc.OnExpectFrameChanged) {
+			intact = true
+			fc.expectFrames = append(fc.expectFrames, prevFrameInChain)
+		}
+	}
+
+	if !intact {
+		fc.broken = true
+		fc.logger.Debugw("frame chain broken", "chanIdx", fc.chainIdx, "sd", sd, "frame", extFrameNum, "prevFrame", prevFrameInChain)
+	}
+	return intact
+}
+
+// OnExpectFrameChanged is the decision cache callback for a frame the chain was
+// waiting on. Frames that are not on the pending list are ignored. Otherwise the
+// frame is removed from the list, and the chain is marked broken unless the frame
+// was forwarded.
+func (fc *FrameChain) OnExpectFrameChanged(frameNum uint64, decision selectorDecision) {
+	pos := -1
+	for i := range fc.expectFrames {
+		if fc.expectFrames[i] == frameNum {
+			pos = i
+			break
+		}
+	}
+	if pos < 0 {
+		return
+	}
+
+	if decision != selectorDecisionForwarded {
+		fc.broken = true
+	}
+
+	// Unordered removal: move the last entry into the vacated slot and shrink.
+	last := len(fc.expectFrames) - 1
+	fc.expectFrames[pos] = fc.expectFrames[last]
+	fc.expectFrames = fc.expectFrames[:last]
+}
+
+// Broken reports whether the chain is currently marked as broken.
+func (fc *FrameChain) Broken() bool {
+ return fc.broken
+}
+
+// BeginUpdateActive starts an active-state update by clearing the accumulator
+// that UpdateActive writes to and EndUpdateActive commits.
+func (fc *FrameChain) BeginUpdateActive() {
+ fc.updatingActive = false
+}
+
+// UpdateActive records one vote during an active-state update. A single true
+// vote is enough for the pending state to be active; false votes never undo it.
+func (fc *FrameChain) UpdateActive(active bool) {
+	if active {
+		fc.updatingActive = true
+	}
+}
+
+// EndUpdateActive commits the state accumulated since BeginUpdateActive and
+// resets the accumulator. Nothing else changes when the state is unchanged.
+func (fc *FrameChain) EndUpdateActive() {
+	next := fc.updatingActive
+	fc.updatingActive = false
+
+	if next == fc.active {
+		return
+	}
+
+	// The state differs here, so next == true means an inactive -> active transition:
+	// reset broken to wait for a decodable SWITCH frame.
+	if next {
+		fc.broken = true
+	}
+
+	fc.active = next
+}
diff --git a/pkg/sfu/videolayerselector/selectordecisioncache.go b/pkg/sfu/videolayerselector/selectordecisioncache.go
index b83bf9559..c98d13984 100644
--- a/pkg/sfu/videolayerselector/selectordecisioncache.go
+++ b/pkg/sfu/videolayerselector/selectordecisioncache.go
@@ -33,18 +33,23 @@ func (s selectorDecision) String() string {
// ----------------------------------------------------------------------
type SelectorDecisionCache struct {
- initialized bool
- base uint64
- last uint64
- masks []uint64
- numEntries uint64
+ initialized bool
+ base uint64
+ last uint64
+ masks []uint64
+ numEntries uint64
+ numNackEntries uint64
+
+ onExpectEntityChanged map[uint64][]func(entity uint64, decision selectorDecision)
}
-func NewSelectorDecisionCache(maxNumElements uint64) *SelectorDecisionCache {
+func NewSelectorDecisionCache(maxNumElements uint64, numNackEntries uint64) *SelectorDecisionCache {
numElements := (maxNumElements*2 + 63) / 64
return &SelectorDecisionCache{
- masks: make([]uint64, numElements),
- numEntries: numElements * 32, // 2 bits per entry
+ masks: make([]uint64, numElements),
+ numEntries: numElements * 32, // 2 bits per entry
+ numNackEntries: numNackEntries,
+ onExpectEntityChanged: make(map[uint64][]func(entity uint64, decision selectorDecision)),
}
}
@@ -57,19 +62,39 @@ func (s *SelectorDecisionCache) AddDropped(entity uint64) {
}
func (s *SelectorDecisionCache) GetDecision(entity uint64) (selectorDecision, error) {
- if !s.initialized || entity > s.last || entity < s.base {
+ if !s.initialized || entity < s.base {
+ return selectorDecisionMissing, nil
+ }
+
+ if entity > s.last {
return selectorDecisionUnknown, nil
}
offset := s.last - entity
if offset >= s.numEntries {
// asking for something too old
- return selectorDecisionUnknown, fmt.Errorf("too old, oldest: %d, asking: %d", s.last-s.numEntries+1, entity)
+ return selectorDecisionMissing, fmt.Errorf("too old, oldest: %d, asking: %d", s.last-s.numEntries+1, entity)
}
return s.getEntity(entity), nil
}
+// ExpectDecision registers f to be called when a decision for entity is recorded.
+// It returns false, registering nothing, when the cache is uninitialized, when
+// entity precedes the cache base, or when entity is older than the cache window.
+func (s *SelectorDecisionCache) ExpectDecision(entity uint64, f func(entity uint64, decision selectorDecision)) bool {
+	if !s.initialized || entity < s.base {
+		return false
+	}
+
+	// Entities behind the latest one must still fall inside the cache window.
+	if entity < s.last && s.last-entity >= s.numEntries {
+		return false // too old
+	}
+
+	waiters := s.onExpectEntityChanged[entity]
+	s.onExpectEntityChanged[entity] = append(waiters, f)
+	return true
+}
+
func (s *SelectorDecisionCache) addEntity(entity uint64, sd selectorDecision) {
if !s.initialized {
s.initialized = true
@@ -90,16 +115,60 @@ func (s *SelectorDecisionCache) addEntity(entity uint64, sd selectorDecision) {
}
for e := s.last + 1; e != entity; e++ {
- s.setEntity(e, selectorDecisionMissing)
+ s.setEntity(e, selectorDecisionUnknown)
}
+
+ // update [last+1-nack, entity-nack) to missing
+ missingStart := s.last
+ if missingStart > s.numNackEntries+s.base {
+ missingStart -= s.numNackEntries
+ } else {
+ missingStart = s.base
+ }
+ missingEnd := entity
+ if missingEnd > s.numNackEntries+s.base {
+ missingEnd -= s.numNackEntries
+ } else {
+ missingEnd = s.base
+ }
+ if missingEnd > missingStart {
+ for e := missingStart; e != missingEnd; e++ {
+ s.setEntityIfUnknown(e, selectorDecisionMissing)
+ }
+ }
+
s.setEntity(entity, sd)
s.last = entity
+
+ for e, fns := range s.onExpectEntityChanged {
+ if e+s.numEntries < s.last {
+ delete(s.onExpectEntityChanged, e)
+ for _, f := range fns {
+ f(e, selectorDecisionMissing)
+ }
+ }
+ }
+}
+
+// setEntityIfUnknown stores sd for entity only when the currently stored
+// decision is selectorDecisionUnknown; any other stored decision is kept.
+func (s *SelectorDecisionCache) setEntityIfUnknown(entity uint64, sd selectorDecision) {
+	if s.getEntity(entity) != selectorDecisionUnknown {
+		return
+	}
+	s.setEntity(entity, sd)
}
+// setEntity stores the 2-bit decision sd for entity and, for any decision other
+// than selectorDecisionUnknown, fires and removes the callbacks waiting on entity.
func (s *SelectorDecisionCache) setEntity(entity uint64, sd selectorDecision) {
index, bitpos := s.getPos(entity)
s.masks[index] &= ^(0x3 << bitpos) // clear before bitwise OR
s.masks[index] |= (uint64(sd) & 0x3) << bitpos
+
+ // an unknown decision is not final, so waiters stay registered for it
+ if sd != selectorDecisionUnknown {
+ if fns, ok := s.onExpectEntityChanged[entity]; ok {
+ // remove the entry before invoking the callbacks
+ delete(s.onExpectEntityChanged, entity)
+ for _, f := range fns {
+ f(entity, sd)
+ }
+ }
+ }
}
func (s *SelectorDecisionCache) getEntity(entity uint64) selectorDecision {
diff --git a/pkg/sfu/videolayerselector/videolayerselector.go b/pkg/sfu/videolayerselector/videolayerselector.go
index fa1e71d83..b108976bf 100644
--- a/pkg/sfu/videolayerselector/videolayerselector.go
+++ b/pkg/sfu/videolayerselector/videolayerselector.go
@@ -32,6 +32,8 @@ type VideoLayerSelector interface {
SetRequestSpatial(layer int32)
GetRequestSpatial() int32
+ CheckSync() (locked bool, layer int32)
+
SetMaxSeen(maxSeenLayer buffer.VideoLayer)
SetMaxSeenSpatial(layer int32)
SetMaxSeenTemporal(layer int32)