diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 957aaa452..0550f9cf6 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -54,6 +54,7 @@ type pendingPacket struct { type ExtPacket struct { VideoLayer Arrival time.Time + ExtSequenceNumber uint32 Packet *rtp.Packet Payload interface{} KeyFrame bool @@ -83,7 +84,6 @@ type Buffer struct { snRangeMap *utils.RangeMap[uint32, uint32] - // supported feedbacks latestTSForAudioLevelInitialized bool latestTSForAudioLevel uint32 @@ -441,7 +441,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { b.doReports(arrivalTime) - ep := b.getExtPacket(&rtpPacket, arrivalTime) + ep := b.getExtPacket(&rtpPacket, extSeqNumber, arrivalTime) if ep == nil { return } @@ -546,10 +546,11 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) { } } -func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time) *ExtPacket { +func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, extSeqNumber uint32, arrivalTime time.Time) *ExtPacket { ep := &ExtPacket{ - Packet: rtpPacket, - Arrival: arrivalTime, + Arrival: arrivalTime, + ExtSequenceNumber: extSeqNumber, + Packet: rtpPacket, VideoLayer: VideoLayer{ Spatial: InvalidLayerSpatial, Temporal: InvalidLayerTemporal, diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index cc690576b..6cc495e90 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -84,6 +84,7 @@ var ( ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache") ErrPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded") ErrDuplicatePacket = errors.New("duplicate packet") + ErrSequenceNumberOffsetNotFound = errors.New("sequence number offset not found") ErrPaddingNotOnFrameBoundary = errors.New("padding cannot send on non-frame boundary") ErrDownTrackAlreadyBound = errors.New("already bound") ) @@ -1634,16 +1635,10 @@ func (d *DownTrack) translateVP8PacketTo(pkt *rtp.Packet, incomingVP8 *buffer.VP } func (d *DownTrack) DebugInfo() map[string]interface{} { - rtpMungerParams := d.forwarder.GetRTPMungerParams() stats := map[string]interface{}{ - "HighestIncomingSN": rtpMungerParams.highestIncomingSN, - "LastSN": rtpMungerParams.lastSN, - "SNOffset": rtpMungerParams.snOffset, - "LastTS": rtpMungerParams.lastTS, - "TSOffset": rtpMungerParams.tsOffset, - "LastMarker": rtpMungerParams.lastMarker, - "LastPli": d.rtpStats.LastPli(), + "LastPli": d.rtpStats.LastPli(), } + stats["RTPMunger"] = d.forwarder.RTPMungerDebugInfo() senderReport := d.CreateSenderReport() if senderReport != nil { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 88876d307..69363e65a 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1585,7 +1585,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "expectedTS", expectedTS, "nextTS", nextTS, "tsJump", nextTS-lastTS, - "nextSN", rtpMungerState.LastSN+1, + "nextSN", rtpMungerState.ExtLastSN+1, ) f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, nextTS-lastTS) @@ -1792,11 +1792,11 @@ func (f *Forwarder) GetPadding(frameEndNeeded bool) ([]byte, error) { return f.codecMunger.UpdateAndGetPadding(!frameEndNeeded) } -func (f *Forwarder) GetRTPMungerParams() RTPMungerParams { +func (f *Forwarder) RTPMungerDebugInfo() map[string]interface{} { f.lock.RLock() defer f.lock.RUnlock() - return f.rtpMunger.GetParams() + return f.rtpMunger.DebugInfo() } // ----------------------------------------------------------------------------- diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 01b224634..9c3be7c7a 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -1196,9 +1196,13 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { require.NoError(t, err) require.Equal(t, expectedTP, *actualTP) + // add a missing sequence number to the cache + f.rtpMunger.snRangeMap.IncValue(10) + f.rtpMunger.snRangeMap.AddRange(23332, 23333) + // out-of-order packet not in cache should be dropped params = &testutils.TestExtPacketParams{ - SequenceNumber: 23332, + SequenceNumber: 23331, Timestamp: 0xabcdef, SSRC: 0x12345678, PayloadSize: 20, @@ -1239,7 +1243,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingContiguous, - sequenceNumber: 23334, + sequenceNumber: 23324, timestamp: 0xabcdef, }, } @@ -1247,7 +1251,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { require.NoError(t, err) require.Equal(t, expectedTP, *actualTP) - // padding only packet after a gap should be forwarded + // padding only packet after a gap should not be dropped params = &testutils.TestExtPacketParams{ SequenceNumber: 23337, Timestamp: 0xabcdef, @@ -1258,7 +1262,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingGap, - sequenceNumber: 23336, + sequenceNumber: 23326, timestamp: 0xabcdef, }, } @@ -1278,7 +1282,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, - sequenceNumber: 23335, + sequenceNumber: 23325, timestamp: 0xabcdef, }, } @@ -1298,7 +1302,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingContiguous, - sequenceNumber: 23337, + sequenceNumber: 23327, timestamp: 0xabcdf0, }, } @@ -1716,7 +1720,7 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { require.Equal(t, f.lastSSRC, params.SSRC) } -func TestForwardGetSnTsForPadding(t *testing.T) { +func TestForwarderGetSnTsForPadding(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) params := &testutils.TestExtPacketParams{ @@ -1783,7 +1787,7 @@ func TestForwardGetSnTsForPadding(t *testing.T) { require.Equal(t, sntsExpected, snts) } -func TestForwardGetSnTsForBlankFrames(t *testing.T) { +func TestForwarderGetSnTsForBlankFrames(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) params := &testutils.TestExtPacketParams{ @@ -1860,7 +1864,7 @@ func TestForwardGetSnTsForBlankFrames(t *testing.T) { require.Equal(t, sntsExpected, snts) } -func TestForwardGetPaddingVP8(t *testing.T) { +func TestForwarderGetPaddingVP8(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) params := &testutils.TestExtPacketParams{ diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 52faa8124..3a79cac93 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -20,6 +20,7 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/sfu/utils" ) // @@ -36,9 +37,6 @@ const ( const ( RtxGateWindow = 2000 - - SnOffsetCacheSize = 4096 - SnOffsetCacheMask = SnOffsetCacheSize - 1 ) type TranslationParamsRTP struct { @@ -55,97 +53,96 @@ type SnTs struct { // ---------------------------------------------------------------------- type RTPMungerState struct { - LastSN uint16 - LastTS uint32 + ExtLastSN uint32 + LastTS uint32 } func (r RTPMungerState) String() string { - return fmt.Sprintf("RTPMungerState{lastSN: %d, lastTS: %d)", r.LastSN, r.LastTS) + return fmt.Sprintf("RTPMungerState{extLastSN: %d, lastTS: %d)", r.ExtLastSN, r.LastTS) } // ---------------------------------------------------------------------- -type RTPMungerParams struct { - highestIncomingSN uint16 - lastSN uint16 - snOffset uint16 - lastTS uint32 - tsOffset uint32 - lastMarker bool - - snOffsets [SnOffsetCacheSize]uint16 - snOffsetsWritePtr int - snOffsetsOccupancy int - - rtxGateSn uint16 - isInRtxGateRegion bool -} - type RTPMunger struct { logger logger.Logger - RTPMungerParams + extHighestIncomingSN uint32 + snRangeMap *utils.RangeMap[uint32, uint32] + + extLastSN uint32 + lastTS uint32 + tsOffset uint32 + lastMarker bool + + extRtxGateSn uint32 + isInRtxGateRegion bool } func NewRTPMunger(logger logger.Logger) *RTPMunger { return &RTPMunger{ - logger: logger, + logger: logger, + snRangeMap: utils.NewRangeMap[uint32, uint32](100), } } -func (r *RTPMunger) GetParams() RTPMungerParams { - return RTPMungerParams{ - highestIncomingSN: r.highestIncomingSN, - lastSN: r.lastSN, - snOffset: r.snOffset, - lastTS: r.lastTS, - tsOffset: r.tsOffset, - lastMarker: r.lastMarker, +func (r *RTPMunger) DebugInfo() map[string]interface{} { + snOffset, _ := r.snRangeMap.GetValue(r.extHighestIncomingSN) + return map[string]interface{}{ + "ExtHighestIncomingSN": r.extHighestIncomingSN, + "ExtLastSN": r.extLastSN, + "SNOffset": snOffset, + "LastTS": r.lastTS, + "TSOffset": r.tsOffset, + "LastMarker": r.lastMarker, } } func (r *RTPMunger) GetLast() RTPMungerState { return RTPMungerState{ - LastSN: r.lastSN, - LastTS: r.lastTS, + ExtLastSN: r.extLastSN, + LastTS: r.lastTS, } } func (r *RTPMunger) SeedLast(state RTPMungerState) { - r.lastSN = state.LastSN + r.extLastSN = state.ExtLastSN r.lastTS = state.LastTS } func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { - r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 - r.lastSN = extPkt.Packet.SequenceNumber + r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1 + r.extLastSN = extPkt.ExtSequenceNumber r.lastTS = extPkt.Packet.Timestamp } -func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32) { - r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 - r.snOffset = extPkt.Packet.SequenceNumber - r.lastSN - snAdjust +func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint32, tsAdjust uint32) { + r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1 + r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust) r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust - - // clear offsets cache layer/source switch - r.snOffsetsWritePtr = 0 - r.snOffsetsOccupancy = 0 } func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) { - if r.highestIncomingSN != extPkt.Packet.SequenceNumber { + if r.extHighestIncomingSN != extPkt.ExtSequenceNumber { return } - r.snOffset++ - r.lastSN = extPkt.Packet.SequenceNumber - r.snOffset + + r.snRangeMap.IncValue(1) + + snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber) + if err != nil { + r.logger.Errorw("could not get sequence number offset", err) + return + } + + r.extLastSN = extPkt.ExtSequenceNumber - snOffset } func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) { - // if out-of-order, look up sequence number offset cache - diff := extPkt.Packet.SequenceNumber - r.highestIncomingSN - if diff > (1 << 15) { - snOffset, isValid := r.getSnOffset(extPkt.Packet.SequenceNumber) - if !isValid { + diff := extPkt.ExtSequenceNumber - r.extHighestIncomingSN + if diff > (1 << 31) { + // out-of-order, look up sequence number offset cache + snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber) + if err != nil { return &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, }, ErrOutOfOrderSequenceNumberCacheMiss @@ -153,72 +150,60 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara return &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, - sequenceNumber: extPkt.Packet.SequenceNumber - snOffset, + sequenceNumber: uint16(extPkt.ExtSequenceNumber - snOffset), timestamp: extPkt.Packet.Timestamp - r.tsOffset, }, nil } - // record sn offset - for i := r.highestIncomingSN + 1; i != extPkt.Packet.SequenceNumber+1; i++ { - r.snOffsets[r.snOffsetsWritePtr] = r.snOffset - r.snOffsetsWritePtr = (r.snOffsetsWritePtr + 1) & SnOffsetCacheMask - r.snOffsetsOccupancy++ - } - if r.snOffsetsOccupancy > SnOffsetCacheSize { - r.snOffsetsOccupancy = SnOffsetCacheSize + // can get duplicate packet due to FEC + if diff == 0 { + return &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingDuplicate, + }, ErrDuplicatePacket } ordering := SequenceNumberOrderingContiguous if diff > 1 { ordering = SequenceNumberOrderingGap - } else { - // can get duplicate packet due to FEC - if diff == 0 { - return &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingDuplicate, - }, ErrDuplicatePacket - } - - // if padding only packet, can be dropped and sequence number adjusted - // as it is contiguous and in order. That means this is the highest - // incoming sequence number, and it is a good point to adjust - // sequence number offset. - if len(extPkt.Packet.Payload) == 0 { - r.highestIncomingSN = extPkt.Packet.SequenceNumber - r.snOffset++ - - return &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingContiguous, - }, ErrPaddingOnlyPacket - } + r.snRangeMap.AddRange(r.extHighestIncomingSN+1, extPkt.ExtSequenceNumber) } - // in-order incoming packet, may or may not be contiguous. - // In the case of loss (i.e. incoming sequence number is not contiguous), - // forward even if it is a padding only packet. With temporal scalability, - // it is unclear if the current packet should be dropped if it is not - // contiguous. Hence, forward anything that is not contiguous. - // Reference: http://www.rtcbits.com/2017/04/howto-implement-temporal-scalability.html - mungedSN := extPkt.Packet.SequenceNumber - r.snOffset + r.extHighestIncomingSN = extPkt.ExtSequenceNumber + + // if padding only packet, can be dropped and sequence number adjusted, if contiguous + if diff == 1 && len(extPkt.Packet.Payload) == 0 { + r.snRangeMap.IncValue(1) + return &TranslationParamsRTP{ + snOrdering: ordering, + }, ErrPaddingOnlyPacket + } + + snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber) + if err != nil { + return &TranslationParamsRTP{ + snOrdering: ordering, + }, ErrSequenceNumberOffsetNotFound + } + + extMungedSN := extPkt.ExtSequenceNumber - snOffset mungedTS := extPkt.Packet.Timestamp - r.tsOffset - r.highestIncomingSN = extPkt.Packet.SequenceNumber - r.lastSN = mungedSN + r.extLastSN = extMungedSN r.lastTS = mungedTS r.lastMarker = extPkt.Packet.Marker if extPkt.KeyFrame { - r.rtxGateSn = mungedSN + r.extRtxGateSn = extMungedSN r.isInRtxGateRegion = true } - if r.isInRtxGateRegion && (mungedSN-r.rtxGateSn) < (1<<15) && (mungedSN-r.rtxGateSn) > RtxGateWindow { + if r.isInRtxGateRegion && (extMungedSN-r.extRtxGateSn) > RtxGateWindow { r.isInRtxGateRegion = false } return &TranslationParamsRTP{ snOrdering: ordering, - sequenceNumber: mungedSN, + sequenceNumber: uint16(extMungedSN), timestamp: mungedTS, }, nil } @@ -230,7 +215,7 @@ func (r *RTPMunger) FilterRTX(nacks []uint16) []uint16 { filtered := make([]uint16, 0, len(nacks)) for _, sn := range nacks { - if (sn - r.rtxGateSn) < (1 << 15) { + if (sn - uint16(r.extRtxGateSn)) < (1 << 15) { filtered = append(filtered, sn) } } @@ -251,10 +236,12 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate tsOffset = 1 } + extLastSN := r.extLastSN lastTS := r.lastTS vals := make([]SnTs, num) for i := 0; i < num; i++ { - vals[i].sequenceNumber = r.lastSN + uint16(i) + 1 + extLastSN++ + vals[i].sequenceNumber = uint16(extLastSN) if frameRate != 0 { if useLastTSForFirst && i == 0 { vals[i].timestamp = r.lastTS @@ -271,8 +258,8 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate } } - r.lastSN = vals[num-1].sequenceNumber - r.snOffset -= uint16(num) + r.extLastSN = extLastSN + r.snRangeMap.DecValue(uint32(num)) r.tsOffset -= vals[num-1].timestamp - r.lastTS r.lastTS = vals[num-1].timestamp @@ -287,13 +274,3 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate func (r *RTPMunger) IsOnFrameBoundary() bool { return r.lastMarker } - -func (r *RTPMunger) getSnOffset(sn uint16) (uint16, bool) { - diff := r.highestIncomingSN - sn - if int(diff) >= r.snOffsetsOccupancy { - return 0, false - } - - readPtr := (r.snOffsetsWritePtr - int(diff) - 1) & SnOffsetCacheMask - return r.snOffsets[readPtr], true -} diff --git a/pkg/sfu/rtpmunger_test.go b/pkg/sfu/rtpmunger_test.go index 63a611a1f..9f4ea976d 100644 --- a/pkg/sfu/rtpmunger_test.go +++ b/pkg/sfu/rtpmunger_test.go @@ -41,10 +41,12 @@ func TestSetLastSnTs(t *testing.T) { require.NotNil(t, extPkt) r.SetLastSnTs(extPkt) - require.Equal(t, uint16(23332), r.highestIncomingSN) - require.Equal(t, uint16(23333), r.lastSN) + require.Equal(t, uint32(23332), r.extHighestIncomingSN) + require.Equal(t, uint32(23333), r.extLastSN) require.Equal(t, uint32(0xabcdef), r.lastTS) - require.Equal(t, uint16(0), r.snOffset) + snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(0), snOffset) require.Equal(t, uint32(0), r.tsOffset) } @@ -66,10 +68,12 @@ func TestUpdateSnTsOffsets(t *testing.T) { } extPkt, _ = testutils.GetTestExtPacket(params) r.UpdateSnTsOffsets(extPkt, 1, 1) - require.Equal(t, uint16(33332), r.highestIncomingSN) - require.Equal(t, uint16(23333), r.lastSN) + require.Equal(t, uint32(33332), r.extHighestIncomingSN) + require.Equal(t, uint32(23333), r.extLastSN) require.Equal(t, uint32(0xabcdef), r.lastTS) - require.Equal(t, uint16(9999), r.snOffset) + snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(9999), snOffset) require.Equal(t, uint32(0xffffffff), r.tsOffset) } @@ -84,14 +88,15 @@ func TestPacketDropped(t *testing.T) { } extPkt, _ := testutils.GetTestExtPacket(params) r.SetLastSnTs(extPkt) - require.Equal(t, uint16(23332), r.highestIncomingSN) - require.Equal(t, uint16(23333), r.lastSN) + require.Equal(t, uint32(23332), r.extHighestIncomingSN) + require.Equal(t, uint32(23333), r.extLastSN) require.Equal(t, uint32(0xabcdef), r.lastTS) - require.Equal(t, uint16(0), r.snOffset) + snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(0), snOffset) require.Equal(t, uint32(0), r.tsOffset) r.UpdateAndGetSnTs(extPkt) // update sequence number offset - require.Equal(t, 1, r.snOffsetsWritePtr) // drop a non-head packet, should cause no change in internals params = &testutils.TestExtPacketParams{ @@ -101,9 +106,11 @@ func TestPacketDropped(t *testing.T) { } extPkt, _ = testutils.GetTestExtPacket(params) r.PacketDropped(extPkt) - require.Equal(t, uint16(23333), r.highestIncomingSN) - require.Equal(t, uint16(23333), r.lastSN) - require.Equal(t, uint16(0), r.snOffset) + require.Equal(t, uint32(23333), r.extHighestIncomingSN) + require.Equal(t, uint32(23333), r.extLastSN) + snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(0), snOffset) // drop a head packet and check offset increases params = &testutils.TestExtPacketParams{ @@ -115,13 +122,12 @@ func TestPacketDropped(t *testing.T) { extPkt, _ = testutils.GetTestExtPacket(params) r.UpdateAndGetSnTs(extPkt) // update sequence number offset - snOffsetWritePtr := (44444 - 23333 + 1) & SnOffsetCacheMask - require.Equal(t, snOffsetWritePtr, r.snOffsetsWritePtr) - require.Equal(t, SnOffsetCacheSize, r.snOffsetsOccupancy) r.PacketDropped(extPkt) - require.Equal(t, r.lastSN, uint16(44443)) - require.Equal(t, uint16(1), r.snOffset) + require.Equal(t, uint32(44443), r.extLastSN) + snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(1), snOffset) params = &testutils.TestExtPacketParams{ SequenceNumber: 44445, @@ -132,11 +138,10 @@ func TestPacketDropped(t *testing.T) { extPkt, _ = testutils.GetTestExtPacket(params) r.UpdateAndGetSnTs(extPkt) // update sequence number offset - require.Equal(t, uint16(1), r.snOffsets[snOffsetWritePtr]) - snOffsetWritePtr = (snOffsetWritePtr + 1) & SnOffsetCacheMask - require.Equal(t, snOffsetWritePtr, r.snOffsetsWritePtr) - require.Equal(t, uint16(44444), r.lastSN) - require.Equal(t, uint16(1), r.snOffset) + require.Equal(t, r.extLastSN, uint32(44444)) + snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(1), snOffset) } func TestOutOfOrderSequenceNumber(t *testing.T) { @@ -152,9 +157,13 @@ func TestOutOfOrderSequenceNumber(t *testing.T) { r.SetLastSnTs(extPkt) r.UpdateAndGetSnTs(extPkt) + // add a missing sequence number to the cache + r.snRangeMap.IncValue(10) + r.snRangeMap.AddRange(23332, 23333) + // out-of-order sequence number not in the missing sequence number cache params = &testutils.TestExtPacketParams{ - SequenceNumber: 23332, + SequenceNumber: 23331, Timestamp: 0xabcdef, SSRC: 0x12345678, PayloadSize: 10, @@ -170,9 +179,13 @@ func TestOutOfOrderSequenceNumber(t *testing.T) { require.ErrorIs(t, err, ErrOutOfOrderSequenceNumberCacheMiss) require.Equal(t, tpExpected, *tp) - // add missing sequence number to the cache and try again - r.snOffsets[SnOffsetCacheSize-1] = 10 - r.snOffsetsOccupancy++ + params = &testutils.TestExtPacketParams{ + SequenceNumber: 23332, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 10, + } + extPkt, _ = testutils.GetTestExtPacket(params) tpExpected = TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, @@ -230,9 +243,11 @@ func TestPaddingOnlyPacket(t *testing.T) { require.Error(t, err) require.ErrorIs(t, err, ErrPaddingOnlyPacket) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint16(23333), r.highestIncomingSN) - require.Equal(t, uint16(23333), r.lastSN) - require.Equal(t, uint16(1), r.snOffset) + require.Equal(t, uint32(23333), r.extHighestIncomingSN) + require.Equal(t, uint32(23333), r.extLastSN) + snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(1), snOffset) // padding only packet with a gap should not report an error params = &testutils.TestExtPacketParams{ @@ -251,9 +266,11 @@ func TestPaddingOnlyPacket(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint16(23335), r.highestIncomingSN) - require.Equal(t, uint16(23334), r.lastSN) - require.Equal(t, uint16(1), r.snOffset) + require.Equal(t, uint32(23335), r.extHighestIncomingSN) + require.Equal(t, uint32(23334), r.extLastSN) + snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(1), snOffset) } func TestGapInSequenceNumber(t *testing.T) { @@ -274,6 +291,7 @@ func TestGapInSequenceNumber(t *testing.T) { // three lost packets params = &testutils.TestExtPacketParams{ SequenceNumber: 1, + SNCycles: 1, Timestamp: 0xabcdef, SSRC: 0x12345678, PayloadSize: 33, @@ -289,22 +307,25 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err := r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint16(1), r.highestIncomingSN) - require.Equal(t, uint16(1), r.lastSN) - require.Equal(t, uint16(0), r.snOffset) + require.Equal(t, uint32(65536+1), r.extHighestIncomingSN) + require.Equal(t, uint32(65536+1), r.extLastSN) + snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(0), snOffset) // ensure missing sequence numbers got recorded in cache // last received, three missing in between and current received should all be in cache - for i := uint16(65533); i != 2; i++ { - offset, ok := r.getSnOffset(i) - require.True(t, ok) - require.Equal(t, uint16(0), offset) + for i := uint32(65534); i != 65536+1; i++ { + offset, err := r.snRangeMap.GetValue(i) + require.NoError(t, err) + require.Equal(t, uint32(0), offset) } // a padding only packet should be dropped params = &testutils.TestExtPacketParams{ SequenceNumber: 2, + SNCycles: 1, Timestamp: 0xabcdef, SSRC: 0x12345678, } @@ -317,13 +338,16 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.ErrorIs(t, err, ErrPaddingOnlyPacket) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint16(2), r.highestIncomingSN) - require.Equal(t, uint16(1), r.lastSN) - require.Equal(t, uint16(1), r.snOffset) + require.Equal(t, uint32(65536+2), r.extHighestIncomingSN) + require.Equal(t, uint32(65536+1), r.extLastSN) + snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(1), snOffset) // a packet with a gap should be adding to missing cache params = &testutils.TestExtPacketParams{ SequenceNumber: 4, + SNCycles: 1, Timestamp: 0xabcdef, SSRC: 0x12345678, PayloadSize: 22, @@ -339,13 +363,16 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint16(4), r.highestIncomingSN) - require.Equal(t, uint16(3), r.lastSN) - require.Equal(t, uint16(1), r.snOffset) + require.Equal(t, uint32(65536+4), r.extHighestIncomingSN) + require.Equal(t, uint32(65536+3), r.extLastSN) + snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(1), snOffset) // another contiguous padding only packet should be dropped params = &testutils.TestExtPacketParams{ SequenceNumber: 5, + SNCycles: 1, Timestamp: 0xabcdef, SSRC: 0x12345678, } @@ -358,13 +385,16 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.ErrorIs(t, err, ErrPaddingOnlyPacket) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint16(5), r.highestIncomingSN) - require.Equal(t, uint16(3), r.lastSN) - require.Equal(t, uint16(2), r.snOffset) + require.Equal(t, uint32(65536+5), r.extHighestIncomingSN) + require.Equal(t, uint32(65536+3), r.extLastSN) + snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(2), snOffset) // a packet with a gap should be adding to missing cache params = &testutils.TestExtPacketParams{ SequenceNumber: 7, + SNCycles: 1, Timestamp: 0xabcdef, SSRC: 0x12345678, PayloadSize: 22, @@ -380,13 +410,16 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint16(7), r.highestIncomingSN) - require.Equal(t, uint16(5), r.lastSN) - require.Equal(t, uint16(2), r.snOffset) + require.Equal(t, uint32(65536+7), r.extHighestIncomingSN) + require.Equal(t, uint32(65536+5), r.extLastSN) + snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(2), snOffset) // check the missing packets params = &testutils.TestExtPacketParams{ SequenceNumber: 6, + SNCycles: 1, Timestamp: 0xabcdef, SSRC: 0x12345678, } @@ -401,12 +434,15 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint16(7), r.highestIncomingSN) - require.Equal(t, uint16(5), r.lastSN) - require.Equal(t, uint16(2), r.snOffset) + require.Equal(t, uint32(65536+7), r.extHighestIncomingSN) + require.Equal(t, uint32(65536+5), r.extLastSN) + snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(2), snOffset) params = &testutils.TestExtPacketParams{ SequenceNumber: 3, + SNCycles: 1, Timestamp: 0xabcdef, SSRC: 0x12345678, } @@ -421,9 +457,11 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint16(7), r.highestIncomingSN) - require.Equal(t, uint16(5), r.lastSN) - require.Equal(t, uint16(2), r.snOffset) + require.Equal(t, uint32(65536+7), r.extHighestIncomingSN) + require.Equal(t, uint32(65536+5), r.extLastSN) + snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.NoError(t, err) + require.Equal(t, uint32(2), snOffset) } func TestUpdateAndGetPaddingSnTs(t *testing.T) { diff --git a/pkg/sfu/testutils/data.go b/pkg/sfu/testutils/data.go index 38640d96b..c30757b3d 100644 --- a/pkg/sfu/testutils/data.go +++ b/pkg/sfu/testutils/data.go @@ -30,6 +30,7 @@ type TestExtPacketParams struct { IsKeyFrame bool PayloadType uint8 SequenceNumber uint16 + SNCycles int Timestamp uint32 SSRC uint32 PayloadSize int @@ -61,11 +62,12 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) { } ep := &buffer.ExtPacket{ - VideoLayer: params.VideoLayer, - Arrival: params.ArrivalTime, - Packet: &packet, - KeyFrame: params.IsKeyFrame, - RawPacket: raw, + VideoLayer: params.VideoLayer, + ExtSequenceNumber: uint32(params.SNCycles<<16) + uint32(params.SequenceNumber), + Arrival: params.ArrivalTime, + Packet: &packet, + KeyFrame: params.IsKeyFrame, + RawPacket: raw, } return ep, nil diff --git a/pkg/sfu/utils/rangemap.go b/pkg/sfu/utils/rangemap.go index 75ffd1646..3654791f0 100644 --- a/pkg/sfu/utils/rangemap.go +++ b/pkg/sfu/utils/rangemap.go @@ -59,12 +59,21 @@ func NewRangeMap[RT rangeType, VT valueType](size int) *RangeMap[RT, VT] { } } +func (r *RangeMap[RT, VT]) ClearAndResetValue(val VT) { + r.ranges = r.ranges[:0] + r.runningValue = val +} + func (r *RangeMap[RT, VT]) IncValue(inc VT) { r.runningValue += inc } +func (r *RangeMap[RT, VT]) DecValue(dec VT) { + r.runningValue -= dec +} + func (r *RangeMap[RT, VT]) AddRange(startInclusive RT, endExclusive RT) error { - if endExclusive-startInclusive > r.halfRange { + if endExclusive == startInclusive || endExclusive-startInclusive > r.halfRange { return errReversedOrder } diff --git a/pkg/sfu/utils/rangemap_test.go b/pkg/sfu/utils/rangemap_test.go index dcc6a745d..ef154c1cd 100644 --- a/pkg/sfu/utils/rangemap_test.go +++ b/pkg/sfu/utils/rangemap_test.go @@ -56,6 +56,7 @@ func TestRangeMapUint32(t *testing.T) { value, err = r.GetValue(22) require.NoError(t, err) require.Equal(t, uint32(2), value) + // outside range should return 3 value, err = r.GetValue(662) require.NoError(t, err) @@ -119,4 +120,10 @@ func TestRangeMapUint32(t *testing.T) { value, err = r.GetValue(3000) require.NoError(t, err) require.Equal(t, uint32(13), value) + + // decrement running value + r.DecValue(23) + value, err = r.GetValue(3000) + require.NoError(t, err) + require.Equal(t, uint32((1<<32)-10), value) }