From 5b975af55fdf7d813991bd9d306f54980ee929b1 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Tue, 27 Jun 2023 15:11:06 +0800 Subject: [PATCH] Refine dependency descriptor based selection forwarder (#1808) * Don't update dependency info if unordered packet received * Trace all active svc chains for downtrack * Try to keep lower decode target decodable * remove comments * Test case * clean code * solve comments --- pkg/sfu/buffer/buffer.go | 2 +- pkg/sfu/buffer/dependencydescriptorparser.go | 90 +++-- pkg/sfu/buffer/fps_test.go | 2 +- pkg/sfu/forwarder.go | 4 +- pkg/sfu/receiver.go | 16 +- pkg/sfu/streamtracker/interfaces.go | 2 +- pkg/sfu/streamtracker/streamtracker.go | 2 +- pkg/sfu/streamtracker/streamtracker_dd.go | 5 +- .../streamtracker/streamtracker_dd_test.go | 7 +- pkg/sfu/videolayerselector/base.go | 6 + pkg/sfu/videolayerselector/decodetarget.go | 56 +++ .../dependencydescriptor.go | 280 +++++++------ .../dependencydescriptor_test.go | 377 ++++++++++++++++++ pkg/sfu/videolayerselector/framechain.go | 114 ++++++ .../selectordecisioncache.go | 91 ++++- .../videolayerselector/videolayerselector.go | 2 + 16 files changed, 851 insertions(+), 205 deletions(-) create mode 100644 pkg/sfu/videolayerselector/decodetarget.go create mode 100644 pkg/sfu/videolayerselector/dependencydescriptor_test.go create mode 100644 pkg/sfu/videolayerselector/framechain.go diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index bc5b45e20..b242662dd 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -42,7 +42,7 @@ type ExtPacket struct { Payload interface{} KeyFrame bool RawPacket []byte - DependencyDescriptor *DependencyDescriptorWithDecodeTarget + DependencyDescriptor *ExtDependencyDescriptor } // Buffer contains all packets diff --git a/pkg/sfu/buffer/dependencydescriptorparser.go b/pkg/sfu/buffer/dependencydescriptorparser.go index c52e86c83..71b4aac2e 100644 --- a/pkg/sfu/buffer/dependencydescriptorparser.go +++ b/pkg/sfu/buffer/dependencydescriptorparser.go @@ -7,6 +7,7 @@ import ( "github.com/pion/rtp" dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/protocol/logger" ) @@ -17,6 +18,11 @@ type DependencyDescriptorParser struct { logger logger.Logger onMaxLayerChanged func(int32, int32) decodeTargets []DependencyDescriptorDecodeTarget + + wrapAround *utils.WrapAround[uint16, uint64] + structureExtSeq uint64 + activeDecodeTargetsExtSeq uint64 + activeDecodeTargetsMask uint32 } func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLayerChanged func(int32, int32)) *DependencyDescriptorParser { @@ -25,16 +31,19 @@ func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLay ddExtID: ddExtID, logger: logger, onMaxLayerChanged: onMaxLayerChanged, + wrapAround: utils.NewWrapAround[uint16, uint64](), } } -type DependencyDescriptorWithDecodeTarget struct { - Descriptor *dd.DependencyDescriptor - DecodeTargets []DependencyDescriptorDecodeTarget +type ExtDependencyDescriptor struct { + Descriptor *dd.DependencyDescriptor + + DecodeTargets []DependencyDescriptorDecodeTarget + StructureUpdated bool + ActiveDecodeTargetsUpdated bool } -func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*DependencyDescriptorWithDecodeTarget, VideoLayer, error) { - // DD-TODO: make sure out-of-order RTP packets do not update decode targets +func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescriptor, VideoLayer, error) { var videoLayer VideoLayer ddBuf := pkt.GetExtension(r.ddExtID) if ddBuf == nil { @@ -52,49 +61,53 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*DependencyDescript return nil, videoLayer, err } + extSeq := r.wrapAround.Update(pkt.SequenceNumber).ExtendedVal + if ddVal.FrameDependencies != nil { videoLayer.Spatial, videoLayer.Temporal = int32(ddVal.FrameDependencies.SpatialId), int32(ddVal.FrameDependencies.TemporalId) } - if ddVal.AttachedStructure != nil && !ddVal.FirstPacketInFrame { - // r.logger.Debugw("ignoring non-first packet in frame with attached structure") - return nil, videoLayer, nil + + extDD := &ExtDependencyDescriptor{ + Descriptor: &ddVal, } if ddVal.AttachedStructure != nil { - r.structure = ddVal.AttachedStructure - r.decodeTargets = ProcessFrameDependencyStructure(ddVal.AttachedStructure) - if len(r.decodeTargets) != 0 { - r.logger.Debugw(fmt.Sprintf("update decode targets: %v", r.decodeTargets)) - r.onMaxLayerChanged(r.decodeTargets[0].Layer.Spatial, r.decodeTargets[0].Layer.Temporal) + r.logger.Debugw(fmt.Sprintf("parsed dependency descriptor\n%s", ddVal.String())) + if extSeq > r.structureExtSeq { + r.structure = ddVal.AttachedStructure + r.decodeTargets = ProcessFrameDependencyStructure(ddVal.AttachedStructure) + r.structureExtSeq = extSeq + extDD.StructureUpdated = true + extDD.ActiveDecodeTargetsUpdated = true + // The dependency descriptor reader will always set ActiveDecodeTargetsBitmask for TemplateDependencyStructure is present, + // so don't need to notify max layer change here. } } - if ddVal.AttachedStructure != nil && ddVal.FirstPacketInFrame { - r.logger.Debugw(fmt.Sprintf("parsed dependency descriptor\n%s", ddVal.String())) - } - - if mask := ddVal.ActiveDecodeTargetsBitmask; mask != nil { - var maxSpatial, maxTemporal int32 - for _, dt := range r.decodeTargets { - if *mask&(1< r.activeDecodeTargetsExtSeq { + r.activeDecodeTargetsExtSeq = extSeq + if *mask != r.activeDecodeTargetsMask { + r.activeDecodeTargetsMask = *mask + extDD.ActiveDecodeTargetsUpdated = true + var maxSpatial, maxTemporal int32 + for _, dt := range r.decodeTargets { + if *mask&(1< maxSpatial { - maxSpatial = dt.Layer.Spatial - } - if dt.Layer.Temporal > maxTemporal { - maxTemporal = dt.Layer.Temporal - } if dt.Layer.Spatial <= layer.Spatial && dt.Layer.Temporal <= layer.Temporal { activeBitMask |= 1 << dt.Target } } - if layer.Spatial == maxSpatial && layer.Temporal == maxTemporal { - // all the decode targets are selected - return nil - } return &activeBitMask } diff --git a/pkg/sfu/buffer/fps_test.go b/pkg/sfu/buffer/fps_test.go index 206041acf..b090d2770 100644 --- a/pkg/sfu/buffer/fps_test.go +++ b/pkg/sfu/buffer/fps_test.go @@ -31,7 +31,7 @@ func (f *testFrameInfo) toVP8() *ExtPacket { func (f *testFrameInfo) toDD() *ExtPacket { return &ExtPacket{ Packet: &rtp.Packet{Header: f.header}, - DependencyDescriptor: &DependencyDescriptorWithDecodeTarget{ + DependencyDescriptor: &ExtDependencyDescriptor{ Descriptor: &dependencydescriptor.DependencyDescriptor{ FrameNumber: f.framenumber, FrameDependencies: &dependencydescriptor.FrameDependencyTemplate{ diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 8929cf736..75e00baa9 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1409,9 +1409,7 @@ func (f *Forwarder) CheckSync() (locked bool, layer int32) { f.lock.RLock() defer f.lock.RUnlock() - layer = f.vls.GetRequestSpatial() - locked = layer == f.vls.GetCurrent().Spatial || f.vls.GetParked().IsValid() - return + return f.vls.CheckSync() } func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [buffer.DefaultMaxLayerSpatial + 1]bool) { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 80ef3f007..26ca98ad7 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -645,6 +645,14 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } } + w.downTrackSpreader.Broadcast(func(dt TrackSender) { + _ = dt.WriteRTP(pkt, spatialLayer) + }) + + if redPktWriter != nil { + redPktWriter(pkt, spatialLayer) + } + if spatialTracker != nil { spatialTracker.Observe( pkt.Temporal, @@ -655,14 +663,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { pkt.DependencyDescriptor, ) } - - w.downTrackSpreader.Broadcast(func(dt TrackSender) { - _ = dt.WriteRTP(pkt, spatialLayer) - }) - - if redPktWriter != nil { - redPktWriter(pkt, spatialLayer) - } } } diff --git a/pkg/sfu/streamtracker/interfaces.go b/pkg/sfu/streamtracker/interfaces.go index 934032f68..a9135e631 100644 --- a/pkg/sfu/streamtracker/interfaces.go +++ b/pkg/sfu/streamtracker/interfaces.go @@ -52,5 +52,5 @@ type StreamTrackerWorker interface { Status() StreamStatus BitrateTemporalCumulative() []int64 SetPaused(paused bool) - Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, dd *buffer.DependencyDescriptorWithDecodeTarget) + Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, dd *buffer.ExtDependencyDescriptor) } diff --git a/pkg/sfu/streamtracker/streamtracker.go b/pkg/sfu/streamtracker/streamtracker.go index a1b645fb2..1be7dc0fc 100644 --- a/pkg/sfu/streamtracker/streamtracker.go +++ b/pkg/sfu/streamtracker/streamtracker.go @@ -176,7 +176,7 @@ func (s *StreamTracker) Observe( payloadSize int, hasMarker bool, ts uint32, - _ *buffer.DependencyDescriptorWithDecodeTarget, + _ *buffer.ExtDependencyDescriptor, ) { s.lock.Lock() diff --git a/pkg/sfu/streamtracker/streamtracker_dd.go b/pkg/sfu/streamtracker/streamtracker_dd.go index b6daee387..2d7301f7c 100644 --- a/pkg/sfu/streamtracker/streamtracker_dd.go +++ b/pkg/sfu/streamtracker/streamtracker_dd.go @@ -123,7 +123,7 @@ func (s *StreamTrackerDependencyDescriptor) SetPaused(paused bool) { } -func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, ddVal *buffer.DependencyDescriptorWithDecodeTarget) { +func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize int, payloadSize int, hasMarker bool, ts uint32, ddVal *buffer.ExtDependencyDescriptor) { s.lock.Lock() if s.isStopped || s.paused || payloadSize == 0 || ddVal == nil { @@ -133,7 +133,7 @@ func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize var notifyFns []func(status StreamStatus) var notifyStatus StreamStatus - if mask := ddVal.Descriptor.ActiveDecodeTargetsBitmask; mask != nil { + if mask := ddVal.Descriptor.ActiveDecodeTargetsBitmask; mask != nil && ddVal.ActiveDecodeTargetsUpdated { var maxSpatial, maxTemporal int32 for _, dt := range ddVal.DecodeTargets { if *mask&(1< d.targetLayer.Spatial || dt.Layer.Temporal > d.targetLayer.Temporal { + continue + } + + frameResult, err := dt.OnFrame(extFrameNum, fd) + if err != nil { + d.decodeTargetsLock.RUnlock() + // dtis error, dependency descriptor might lost + d.logger.Debugw(fmt.Sprintf("drop packet for frame detection error, incoming: %v", + incomingLayer, + ), "err", err) + d.decisions.AddDropped(extFrameNum) + return + } + + // Keep forwarding the lower spatial with temporal layer 0 to keep the lower frame chain intact, + // it will cost a few extra bits as those frames might not be present in the current target + // but will make the subscriber switch to lower layer seamlessly without pli. + if frameResult.TargetValid { + if highestDecodeTarget.Target == -1 { + highestDecodeTarget = dt.DependencyDescriptorDecodeTarget + dti = frameResult.DTI + } else if dt.Layer.Spatial < highestDecodeTarget.Layer.Spatial && dt.Layer.Temporal == 0 && + frameResult.DTI != dede.DecodeTargetNotPresent && frameResult.DTI != dede.DecodeTargetDiscardable { + dti = frameResult.DTI + } + } + } + d.decodeTargetsLock.RUnlock() + + // DD-TODO : we don't have a rtp queue to ensure the order of packets now, + // so we don't know packet is lost/out of order, that cause us can't detect + // frame integrity, entire frame is forwareded, whether frame chain is broken. + // So use a simple check here, assume all the reference frame is forwarded and + // only check DTI of the active decode target. + // it is not effeciency, at last we need check frame chain integrity. + + if highestDecodeTarget.Target < 0 { + // no active decode target, do not select + // d.logger.Debugw(fmt.Sprintf("drop packet for no target found, decodeTargets %v, tagetLayer %v, incoming %v", + // d.decodeTargets, + // d.targetLayer, + // incomingLayer, + // )) + d.decisions.AddDropped(extFrameNum) + return + } + + // // DD-TODO : if bandwidth in congest, could drop the 'Discardable' frame + if dti == dede.DecodeTargetNotPresent { + // d.logger.Debugw(fmt.Sprintf("drop packet for decode target not present, highestDecodeTarget %d, incoming %v, fn: %d/%d", + // highestDecodeTarget, + // incomingLayer, + // dd.FrameNumber, + // extFrameNum, + // )) + d.decisions.AddDropped(extFrameNum) + return + } + // check decodability using reference frames isDecodable := true for _, fdiff := range fd.FrameDiffs { @@ -90,122 +170,14 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r continue } - if sd, _ := d.decisions.GetDecision(extFrameNum - uint64(fdiff)); sd != selectorDecisionForwarded { + // use relaxed check for frame diff that we have chain intact detection and don't want + // to drop packet due to out-of-order packet or recoverable packet loss + if sd, _ := d.decisions.GetDecision(extFrameNum - uint64(fdiff)); sd == selectorDecisionDropped { isDecodable = false break } } if !isDecodable { - // DD-TODO START - // Not decodable could happen due to packet loss or out-of-order packets, - // Need to figure out better ways to handle this. - // - // 1. Should definitely check if this frame is not part of current decode target OR discardable. - // In that case, forwarding can proceed without disruption. - // 2. Add a packet queue and try to de-jitter for some time. Safest is to packet copy to local queue on - // all down tracks. - // 3. Force a PLI and wait for a key frame. - // DD-TODO END - d.decisions.AddDropped(extFrameNum) - return - } - - // DD-TODO should not update for out-of-order RTP packets - if dd.AttachedStructure != nil { - // update decode target layer and active decode targets - // DD-TODO : these targets info can be shared by all the downtracks, no need calculate in every selector - d.updateDependencyStructure(dd.AttachedStructure) - } - - // DD-TODO : we don't have a rtp queue to ensure the order of packets now, - // so we don't know packet is lost/out of order, that cause us can't detect - // frame integrity, entire frame is forwareded, whether frame chain is broken. - // So use a simple check here, assume all the reference frame is forwarded and - // only check DTI of the active decode target. - // it is not effeciency, at last we need check frame chain integrity. - - activeDecodeTargets := dd.ActiveDecodeTargetsBitmask - if activeDecodeTargets != nil { - d.logger.Debugw("active decode targets", "activeDecodeTargets", *activeDecodeTargets) - } - - // find decode target closest to targetLayer - highestDecodeTarget := buffer.DependencyDescriptorDecodeTarget{ - Target: -1, - Layer: buffer.InvalidLayer, - } - for _, dt := range ddwdt.DecodeTargets { - if dt.Layer.Spatial > d.targetLayer.Spatial || dt.Layer.Temporal > d.targetLayer.Temporal { - continue - } - - if activeDecodeTargets != nil && ((*activeDecodeTargets)&(1< 0, each Decode target MUST be protected by exactly one Chain. + if structure.NumChains > 0 { + chainIdx := structure.DecodeTargetProtectedByChain[dt.Target] + if chainIdx >= len(d.chains) { + // should not happen + d.logger.Errorw("DecodeTargetProtectedByChain chainIdx out of range", nil, "chainIdx", chainIdx, "NumChains", len(d.chains)) + } else { + chain = d.chains[chainIdx] + } + } + newTargets = append(newTargets, NewDecodeTarget(dt, chain)) + } + d.decodeTargetsLock.Lock() + d.decodeTargets = newTargets + d.decodeTargetsLock.Unlock() +} + +func (d *DependencyDescriptor) updateActiveDecodeTargets(activeDecodeTargetsBitmask uint32) { + for _, chain := range d.chains { + chain.BeginUpdateActive() + } + + d.decodeTargetsLock.RLock() + for _, dt := range d.decodeTargets { + dt.UpdateActive(activeDecodeTargetsBitmask) + } + d.decodeTargetsLock.RUnlock() + + for _, chain := range d.chains { + chain.EndUpdateActive() + } +} + +func (d *DependencyDescriptor) CheckSync() (locked bool, layer int32) { + layer = d.GetRequestSpatial() + if d.GetParked().IsValid() { + return true, layer + } + + d.decodeTargetsLock.RLock() + defer d.decodeTargetsLock.RUnlock() + for _, dt := range d.decodeTargets { + if dt.Active() && dt.Layer.Spatial == layer && dt.Valid() { + return true, layer + } + } + return false, layer } diff --git a/pkg/sfu/videolayerselector/dependencydescriptor_test.go b/pkg/sfu/videolayerselector/dependencydescriptor_test.go new file mode 100644 index 000000000..21e416691 --- /dev/null +++ b/pkg/sfu/videolayerselector/dependencydescriptor_test.go @@ -0,0 +1,377 @@ +package videolayerselector + +import ( + "sort" + "testing" + + "github.com/livekit/livekit-server/pkg/sfu/buffer" + dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + "github.com/livekit/protocol/logger" + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +func TestDecodeTarget(t *testing.T) { + target := buffer.DependencyDescriptorDecodeTarget{ + Target: 1, + Layer: buffer.VideoLayer{Spatial: 1, Temporal: 2}, + } + + t.Run("No Chain", func(t *testing.T) { + dt := NewDecodeTarget(target, nil) + require.True(t, dt.Valid()) + // no indication found + _, err := dt.OnFrame(1, &dd.FrameDependencyTemplate{ + DecodeTargetIndications: []dd.DecodeTargetIndication{}, + }) + require.Error(t, err) + + ret, err := dt.OnFrame(1, &dd.FrameDependencyTemplate{ + DecodeTargetIndications: []dd.DecodeTargetIndication{dd.DecodeTargetNotPresent, dd.DecodeTargetRequired}, + }) + require.NoError(t, err) + require.True(t, ret.TargetValid) + require.Equal(t, dd.DecodeTargetRequired, ret.DTI) + }) + + t.Run("With Chain", func(t *testing.T) { + decisions := NewSelectorDecisionCache(256, 80) + chain := NewFrameChain(decisions, 1, logger.GetLogger()) + dt := NewDecodeTarget(target, chain) + chain.BeginUpdateActive() + dt.UpdateActive(1 << dt.Target) + chain.EndUpdateActive() + require.True(t, dt.Active()) + require.False(t, dt.Valid()) + + // chain intact + frame := &dd.FrameDependencyTemplate{ + DecodeTargetIndications: []dd.DecodeTargetIndication{dd.DecodeTargetNotPresent, dd.DecodeTargetRequired}, + ChainDiffs: []int{0, 0}, + } + chain.OnFrame(1, frame) + require.True(t, dt.Valid()) + ret, err := dt.OnFrame(1, frame) + require.NoError(t, err) + require.True(t, ret.TargetValid) + require.Equal(t, dd.DecodeTargetRequired, ret.DTI) + + }) +} + +func TestFrameChain(t *testing.T) { + decisions := NewSelectorDecisionCache(256, 3) + chain := NewFrameChain(decisions, 0, logger.GetLogger()) + require.True(t, chain.Broken()) + + // chain intact + frameNoDiff := &dd.FrameDependencyTemplate{ + ChainDiffs: []int{0}, + } + // not active + require.False(t, chain.OnFrame(1, frameNoDiff)) + + chain.BeginUpdateActive() + chain.UpdateActive(true) + chain.EndUpdateActive() + + require.True(t, chain.OnFrame(1, frameNoDiff)) + decisions.AddForwarded(1) + + frameDiff1 := &dd.FrameDependencyTemplate{ + ChainDiffs: []int{1}, + } + + require.True(t, chain.OnFrame(2, frameDiff1)) + decisions.AddForwarded(2) + + // frame 5 arrives first , but frame 4 can be recovered by NACK + require.True(t, chain.OnFrame(5, frameDiff1)) + decisions.AddForwarded(5) + + // frame 4 arrives, chain remains intact + require.True(t, chain.OnFrame(4, frameDiff1)) + decisions.AddForwarded(4) + + // frame 3 missed by out of nack range, chain broken + decisions.AddForwarded(7) + require.True(t, chain.Broken()) + + // recovery by non-diff frame + require.True(t, chain.OnFrame(1000, frameNoDiff)) + require.False(t, chain.Broken()) + decisions.AddForwarded(1000) + + // broken by dropped frame + require.True(t, chain.OnFrame(1002, frameDiff1)) + decisions.AddDropped(1001) + require.True(t, chain.Broken()) + + // recovery by non-diff frame + require.True(t, chain.OnFrame(2000, frameNoDiff)) + decisions.AddForwarded(2000) + decisions.AddDropped(2001) + require.False(t, chain.OnFrame(2002, frameDiff1)) + require.True(t, chain.Broken()) +} + +func TestDependencyDescriptor(t *testing.T) { + ddSelector := NewDependencyDescriptor(logger.GetLogger()) + targetLayer := buffer.VideoLayer{Spatial: 1, Temporal: 2} + ddSelector.SetTarget(targetLayer) + ddSelector.SetRequestSpatial(1) + + // no dd ext, dropped + ret := ddSelector.Select(&buffer.ExtPacket{}, 0) + require.False(t, ret.IsSelected) + require.False(t, ret.IsRelevant) + + // non key frame, dropped + ret = ddSelector.Select(&buffer.ExtPacket{ + KeyFrame: false, + DependencyDescriptor: &buffer.ExtDependencyDescriptor{ + Descriptor: &dd.DependencyDescriptor{ + FrameNumber: 1, + FrameDependencies: &dd.FrameDependencyTemplate{ + SpatialId: int(targetLayer.Spatial), + TemporalId: int(targetLayer.Temporal), + }, + }, + }, + }, 0) + require.False(t, ret.IsSelected) + require.True(t, ret.IsRelevant) + + frames := createDDFrames(buffer.VideoLayer{Spatial: 2, Temporal: 2}, 3) + // key frame, update structure and decode targets + ret = ddSelector.Select(frames[0], 0) + require.True(t, ret.IsSelected) + require.Equal(t, ddSelector.GetCurrent(), ddSelector.GetTarget()) + sync, _ := ddSelector.CheckSync() + require.True(t, sync) + + // forward frame belongs to target layer + // drop frame exceeds target layer (not present in target layer or lower layer) + // forward frame not present in target layer but present in lower layer + var ( + belongTargetCase bool + exceedTargetCase bool + lowerTargetCase bool + ) + idx := 1 + var frameForwarded, frameDropped []*buffer.ExtPacket + for ; idx < len(frames); idx++ { + fd := frames[idx].DependencyDescriptor.Descriptor.FrameDependencies + ret = ddSelector.Select(frames[idx], 0) + switch { + case fd.SpatialId == int(targetLayer.Spatial) && fd.TemporalId == int(targetLayer.Temporal): + require.True(t, ret.IsSelected) + belongTargetCase = true + frameForwarded = append(frameForwarded, frames[idx]) + case fd.SpatialId < int(targetLayer.Spatial) && fd.TemporalId == 0: + require.True(t, ret.IsSelected) + lowerTargetCase = true + frameForwarded = append(frameForwarded, frames[idx]) + case fd.SpatialId > int(targetLayer.Spatial) || fd.TemporalId > int(targetLayer.Temporal): + require.False(t, ret.IsSelected) + exceedTargetCase = true + frameDropped = append(frameDropped, frames[idx]) + } + + if belongTargetCase && exceedTargetCase && lowerTargetCase { + break + } + } + + require.True(t, belongTargetCase && exceedTargetCase && lowerTargetCase) + + // select frame already forwarded + ret = ddSelector.Select(frameForwarded[0], 0) + require.True(t, ret.IsSelected) + + // drop frame already dropped + ret = ddSelector.Select(frameDropped[0], 0) + require.False(t, ret.IsSelected) + + // drop frame present but not decodable (dependency frame missed) + idx++ + for ; idx < len(frames); idx++ { + fd := frames[idx].DependencyDescriptor.Descriptor.FrameDependencies + ret = ddSelector.Select(frames[idx], 0) + if fd.SpatialId == int(targetLayer.Spatial) && fd.TemporalId == int(targetLayer.Temporal) { + break + } + } + notDecodableFrame := frames[idx] + notDecodableFrame.DependencyDescriptor.Descriptor.FrameDependencies.FrameDiffs = []int{ + int(notDecodableFrame.DependencyDescriptor.Descriptor.FrameNumber - frameDropped[0].DependencyDescriptor.Descriptor.FrameNumber), + } + ret = ddSelector.Select(notDecodableFrame, 0) + require.False(t, ret.IsSelected) + + // target layer broken + idx++ + for ; idx < len(frames); idx++ { + fd := frames[idx].DependencyDescriptor.Descriptor.FrameDependencies + ret = ddSelector.Select(frames[idx], 0) + if fd.SpatialId == int(targetLayer.Spatial) && fd.TemporalId == int(targetLayer.Temporal) { + break + } + } + brokenFrame := frames[idx] + brokenFrame.DependencyDescriptor.Descriptor.FrameDependencies.ChainDiffs[targetLayer.Spatial] = + int(notDecodableFrame.DependencyDescriptor.Descriptor.FrameNumber - frameDropped[0].DependencyDescriptor.Descriptor.FrameNumber) + ret = ddSelector.Select(brokenFrame, 0) + require.False(t, ret.IsSelected) + + // switch to lower layer, forward frame + idx++ + var switchToLower bool + for ; idx < len(frames); idx++ { + ret = ddSelector.Select(frames[idx], 0) + if ret.IsSelected { + require.True(t, targetLayer.GreaterThan(ddSelector.GetCurrent())) + switchToLower = true + break + } + } + require.True(t, switchToLower) + + // not sync with requested layer + ddSelector.SetRequestSpatial(targetLayer.Spatial) + locked, layer := ddSelector.CheckSync() + require.False(t, locked) + require.Equal(t, targetLayer.Spatial, layer) + + // request to current layer, sync + ddSelector.SetRequestSpatial(ddSelector.GetCurrent().Spatial) + locked, _ = ddSelector.CheckSync() + require.True(t, locked) +} + +func createDDFrames(maxLayer buffer.VideoLayer, startFrameNumber uint16) []*buffer.ExtPacket { + var frames []*buffer.ExtPacket + var activeBitMask uint32 + var decodeTargets []buffer.DependencyDescriptorDecodeTarget + var decodeTargetsProtectByChain []int + for i := 0; i <= int(maxLayer.Spatial); i++ { + for j := 0; j <= int(maxLayer.Temporal); j++ { + decodeTargets = append(decodeTargets, buffer.DependencyDescriptorDecodeTarget{ + Target: i*int(maxLayer.Temporal+1) + j, + Layer: buffer.VideoLayer{Spatial: int32(i), Temporal: int32(j)}, + }) + decodeTargetsProtectByChain = append(decodeTargetsProtectByChain, i) + activeBitMask |= 1 << uint(i*int(maxLayer.Temporal+1)+j) + } + } + sort.Slice(decodeTargets, func(i, j int) bool { + return decodeTargets[i].Layer.GreaterThan(decodeTargets[j].Layer) + }) + + chainDiffs := make([]int, len(decodeTargets)) + dtis := make([]dd.DecodeTargetIndication, len(decodeTargets)) + for _, dt := range decodeTargets { + dtis[dt.Target] = dd.DecodeTargetSwitch + } + + templates := make([]*dd.FrameDependencyTemplate, len(decodeTargets)) + + for _, dt := range decodeTargets { + templates[dt.Target] = &dd.FrameDependencyTemplate{ + SpatialId: int(dt.Layer.Spatial), + TemporalId: int(dt.Layer.Temporal), + ChainDiffs: chainDiffs, + DecodeTargetIndications: dtis, + } + } + keyFrame := &buffer.ExtPacket{ + KeyFrame: true, + DependencyDescriptor: &buffer.ExtDependencyDescriptor{ + Descriptor: &dd.DependencyDescriptor{ + FrameNumber: startFrameNumber, + FrameDependencies: &dd.FrameDependencyTemplate{ + SpatialId: 0, + TemporalId: 0, + ChainDiffs: chainDiffs, + DecodeTargetIndications: dtis, + }, + AttachedStructure: &dd.FrameDependencyStructure{ + NumDecodeTargets: int((maxLayer.Spatial + 1) * (maxLayer.Temporal + 1)), + NumChains: int(maxLayer.Spatial) + 1, + DecodeTargetProtectedByChain: decodeTargetsProtectByChain, + Templates: templates, + }, + ActiveDecodeTargetsBitmask: &activeBitMask, + }, + DecodeTargets: decodeTargets, + StructureUpdated: true, + ActiveDecodeTargetsUpdated: true, + }, + Packet: &rtp.Packet{ + Header: rtp.Header{ + SSRC: 1234, + }, + }, + } + + frames = append(frames, keyFrame) + + chainPrevFrame := make(map[int]int) + for i := 0; i <= int(maxLayer.Spatial); i++ { + chainPrevFrame[i] = int(startFrameNumber) + } + startFrameNumber++ + for i := 0; i < 10; i++ { + for j := len(decodeTargets) - 1; j >= 0; j-- { + dt := decodeTargets[j] + frameChainDiffs := make([]int, len(chainDiffs)) + for i := range frameChainDiffs { + frameChainDiffs[i] = int(startFrameNumber) - chainPrevFrame[i] + } + + frameDtis := make([]dd.DecodeTargetIndication, len(dtis)) + for k := range frameDtis { + if k >= dt.Target { + if dt.Layer.Temporal == 0 { + frameDtis[k] = dd.DecodeTargetRequired + } else { + frameDtis[k] = dd.DecodeTargetDiscardable + } + } else { + frameDtis[k] = dd.DecodeTargetNotPresent + } + } + + frame := &buffer.ExtPacket{ + KeyFrame: true, + DependencyDescriptor: &buffer.ExtDependencyDescriptor{ + Descriptor: &dd.DependencyDescriptor{ + FrameNumber: startFrameNumber, + FrameDependencies: &dd.FrameDependencyTemplate{ + SpatialId: int(dt.Layer.Spatial), + TemporalId: int(dt.Layer.Temporal), + ChainDiffs: frameChainDiffs, + DecodeTargetIndications: frameDtis, + }, + }, + DecodeTargets: decodeTargets, + }, + Packet: &rtp.Packet{ + Header: rtp.Header{ + SSRC: 1234, + }, + }, + } + + startFrameNumber++ + + if dt.Layer.Temporal == 0 { + chainPrevFrame[int(dt.Layer.Spatial)] = int(startFrameNumber) + } + + frames = append(frames, frame) + } + } + + return frames +} diff --git a/pkg/sfu/videolayerselector/framechain.go b/pkg/sfu/videolayerselector/framechain.go new file mode 100644 index 000000000..60cb4ea73 --- /dev/null +++ b/pkg/sfu/videolayerselector/framechain.go @@ -0,0 +1,114 @@ +package videolayerselector + +import ( + dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" + "github.com/livekit/protocol/logger" +) + +type FrameChain struct { + logger logger.Logger + decisions *SelectorDecisionCache + broken bool + chainIdx int + active bool + updatingActive bool + + expectFrames []uint64 +} + +func NewFrameChain(decisions *SelectorDecisionCache, chainIdx int, logger logger.Logger) *FrameChain { + return &FrameChain{ + logger: logger, + decisions: decisions, + broken: true, + chainIdx: chainIdx, + active: false, + } +} + +func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate) bool { + if !fc.active { + return false + } + + // A decodable frame with frame_chain_fdiff equal to 0 indicates that the Chain is intact. + if fd.ChainDiffs[fc.chainIdx] == 0 { + if fc.broken { + fc.broken = false + fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx) + } + fc.expectFrames = fc.expectFrames[:0] + return true + } + + if fc.broken { + return false + } + + prevFrameInChain := extFrameNum - uint64(fd.ChainDiffs[fc.chainIdx]) + sd, err := fc.decisions.GetDecision(prevFrameInChain) + if err != nil { + fc.logger.Debugw("could not get decision", "err", err, "frame", extFrameNum, "prevFrame", prevFrameInChain) + } + + var intact bool + switch { + case sd == selectorDecisionForwarded: + intact = true + + case sd == selectorDecisionUnknown: + // If the previous frame is unknown, means it has not arrived but could be recovered by NACK / out-of-order arrival, + // set up a expected callback here to determine if the chain is broken or intact + if fc.decisions.ExpectDecision(prevFrameInChain, fc.OnExpectFrameChanged) { + intact = true + fc.expectFrames = append(fc.expectFrames, prevFrameInChain) + } + } + + if !intact { + fc.broken = true + fc.logger.Debugw("frame chain broken", "chanIdx", fc.chainIdx, "sd", sd, "frame", extFrameNum, "prevFrame", prevFrameInChain) + } + return intact +} + +func (fc *FrameChain) OnExpectFrameChanged(frameNum uint64, decision selectorDecision) { + for i, f := range fc.expectFrames { + if f == frameNum { + if decision != selectorDecisionForwarded { + fc.broken = true + } + fc.expectFrames[i] = fc.expectFrames[len(fc.expectFrames)-1] + fc.expectFrames = fc.expectFrames[:len(fc.expectFrames)-1] + break + } + } +} + +func (fc *FrameChain) Broken() bool { + return fc.broken +} + +func (fc *FrameChain) BeginUpdateActive() { + fc.updatingActive = false +} + +func (fc *FrameChain) UpdateActive(active bool) { + fc.updatingActive = fc.updatingActive || active +} + +func (fc *FrameChain) EndUpdateActive() { + active := fc.updatingActive + fc.updatingActive = false + + if active == fc.active { + return + } + + // if the chain transit from inactive to active, reset broken to wait a decodable SWITCH frame + if !fc.active { + fc.broken = true + } + + fc.active = active +} diff --git a/pkg/sfu/videolayerselector/selectordecisioncache.go b/pkg/sfu/videolayerselector/selectordecisioncache.go index b83bf9559..c98d13984 100644 --- a/pkg/sfu/videolayerselector/selectordecisioncache.go +++ b/pkg/sfu/videolayerselector/selectordecisioncache.go @@ -33,18 +33,23 @@ func (s selectorDecision) String() string { // ---------------------------------------------------------------------- type SelectorDecisionCache struct { - initialized bool - base uint64 - last uint64 - masks []uint64 - numEntries uint64 + initialized bool + base uint64 + last uint64 + masks []uint64 + numEntries uint64 + numNackEntries uint64 + + onExpectEntityChanged map[uint64][]func(entity uint64, decision selectorDecision) } -func NewSelectorDecisionCache(maxNumElements uint64) *SelectorDecisionCache { +func NewSelectorDecisionCache(maxNumElements uint64, numNackEntries uint64) *SelectorDecisionCache { numElements := (maxNumElements*2 + 63) / 64 return &SelectorDecisionCache{ - masks: make([]uint64, numElements), - numEntries: numElements * 32, // 2 bits per entry + masks: make([]uint64, numElements), + numEntries: numElements * 32, // 2 bits per entry + numNackEntries: numNackEntries, + onExpectEntityChanged: make(map[uint64][]func(entity uint64, decision selectorDecision)), } } @@ -57,19 +62,39 @@ func (s *SelectorDecisionCache) AddDropped(entity uint64) { } func (s *SelectorDecisionCache) GetDecision(entity uint64) (selectorDecision, error) { - if !s.initialized || entity > s.last || entity < s.base { + if !s.initialized || entity < s.base { + return selectorDecisionMissing, nil + } + + if entity > s.last { return selectorDecisionUnknown, nil } offset := s.last - entity if offset >= s.numEntries { // asking for something too old - return selectorDecisionUnknown, fmt.Errorf("too old, oldest: %d, asking: %d", s.last-s.numEntries+1, entity) + return selectorDecisionMissing, fmt.Errorf("too old, oldest: %d, asking: %d", s.last-s.numEntries+1, entity) } return s.getEntity(entity), nil } +func (s *SelectorDecisionCache) ExpectDecision(entity uint64, f func(entity uint64, decision selectorDecision)) bool { + if !s.initialized || entity < s.base { + return false + } + + if entity < s.last { + offset := s.last - entity + if offset >= s.numEntries { + return false // too old + } + } + + s.onExpectEntityChanged[entity] = append(s.onExpectEntityChanged[entity], f) + return true +} + func (s *SelectorDecisionCache) addEntity(entity uint64, sd selectorDecision) { if !s.initialized { s.initialized = true @@ -90,16 +115,60 @@ func (s *SelectorDecisionCache) addEntity(entity uint64, sd selectorDecision) { } for e := s.last + 1; e != entity; e++ { - s.setEntity(e, selectorDecisionMissing) + s.setEntity(e, selectorDecisionUnknown) } + + // update [last+1-nack, entity-nack) to missing + missingStart := s.last + if missingStart > s.numNackEntries+s.base { + missingStart -= s.numNackEntries + } else { + missingStart = s.base + } + missingEnd := entity + if missingEnd > s.numNackEntries+s.base { + missingEnd -= s.numNackEntries + } else { + missingEnd = s.base + } + if missingEnd > missingStart { + for e := missingStart; e != missingEnd; e++ { + s.setEntityIfUnknown(e, selectorDecisionMissing) + } + } + s.setEntity(entity, sd) s.last = entity + + for e, fns := range s.onExpectEntityChanged { + if e+s.numEntries < s.last { + delete(s.onExpectEntityChanged, e) + for _, f := range fns { + f(e, selectorDecisionMissing) + } + } + } +} + +func (s *SelectorDecisionCache) setEntityIfUnknown(entity uint64, sd selectorDecision) { + if s.getEntity(entity) == selectorDecisionUnknown { + s.setEntity(entity, sd) + } } func (s *SelectorDecisionCache) setEntity(entity uint64, sd selectorDecision) { index, bitpos := s.getPos(entity) s.masks[index] &= ^(0x3 << bitpos) // clear before bitwise OR s.masks[index] |= (uint64(sd) & 0x3) << bitpos + + if sd != selectorDecisionUnknown { + if fns, ok := s.onExpectEntityChanged[entity]; ok { + delete(s.onExpectEntityChanged, entity) + for _, f := range fns { + f(entity, sd) + } + } + } } func (s *SelectorDecisionCache) getEntity(entity uint64) selectorDecision { diff --git a/pkg/sfu/videolayerselector/videolayerselector.go b/pkg/sfu/videolayerselector/videolayerselector.go index fa1e71d83..b108976bf 100644 --- a/pkg/sfu/videolayerselector/videolayerselector.go +++ b/pkg/sfu/videolayerselector/videolayerselector.go @@ -32,6 +32,8 @@ type VideoLayerSelector interface { SetRequestSpatial(layer int32) GetRequestSpatial() int32 + CheckSync() (locked bool, layer int32) + SetMaxSeen(maxSeenLayer buffer.VideoLayer) SetMaxSeenSpatial(layer int32) SetMaxSeenTemporal(layer int32)