diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index f621fd587..5e58e3e41 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -1237,11 +1237,23 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { require.Equal(t, expectedTP, *actualTP) // add a missing sequence number to the cache - f.rtpMunger.snRangeMap.ExcludeRange(23332, 23333) + err = f.rtpMunger.snRangeMap.ExcludeRange(23334, 23335) + require.NoError(t, err) + + params = &testutils.TestExtPacketParams{ + SequenceNumber: 23336, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + extPkt, _ = testutils.GetTestExtPacket(params) + + _, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) // out-of-order packet should get offset from cache params = &testutils.TestExtPacketParams{ - SequenceNumber: 23331, + SequenceNumber: 23335, Timestamp: 0xabcdef, SSRC: 0x12345678, PayloadSize: 20, @@ -1251,7 +1263,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, - extSequenceNumber: 23331, + extSequenceNumber: 23334, extTimestamp: 0xabcdef, }, } @@ -1261,7 +1273,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { // padding only packet in order should be dropped params = &testutils.TestExtPacketParams{ - SequenceNumber: 23334, + SequenceNumber: 23337, Timestamp: 0xabcdef, SSRC: 0x12345678, } @@ -1276,7 +1288,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { // in order packet should be forwarded params = &testutils.TestExtPacketParams{ - SequenceNumber: 23335, + SequenceNumber: 23338, Timestamp: 0xabcdef, SSRC: 0x12345678, PayloadSize: 20, @@ -1286,7 +1298,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingContiguous, - extSequenceNumber: 23333, + extSequenceNumber: 23336, extTimestamp: 0xabcdef, }, } @@ -1296,7 +1308,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { // padding only packet after a gap should not be dropped params = &testutils.TestExtPacketParams{ - SequenceNumber: 23337, + SequenceNumber: 23340, Timestamp: 0xabcdef, SSRC: 0x12345678, } @@ -1305,7 +1317,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingGap, - extSequenceNumber: 23335, + extSequenceNumber: 23338, extTimestamp: 0xabcdef, }, } @@ -1325,7 +1337,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, - extSequenceNumber: 23334, + extSequenceNumber: 23335, extTimestamp: 0xabcdef, }, } @@ -1345,7 +1357,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingContiguous, - extSequenceNumber: 23336, + extSequenceNumber: 23339, extTimestamp: 0xabcdf0, }, } diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index c30b8a53b..b162158fe 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -23,9 +23,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/utils" ) -// // RTPMunger -// type SequenceNumberOrdering int const ( @@ -121,6 +119,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { r.extLastSN = extPkt.ExtSequenceNumber r.extSecondLastSN = r.extLastSN - 1 + r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber, 0) r.updateSnOffset() r.extLastTS = extPkt.ExtTimestamp @@ -129,7 +128,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint64, tsAdjust uint64) { r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1 - r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust) + r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber, extPkt.ExtSequenceNumber-r.extLastSN-snAdjust) r.updateSnOffset() r.tsOffset = extPkt.ExtTimestamp - r.extLastTS - tsAdjust diff --git a/pkg/sfu/rtpmunger_test.go b/pkg/sfu/rtpmunger_test.go index ea8842457..7c202ce1e 100644 --- a/pkg/sfu/rtpmunger_test.go +++ b/pkg/sfu/rtpmunger_test.go @@ -45,8 +45,11 @@ func TestSetLastSnTs(t *testing.T) { require.Equal(t, uint64(23333), r.extLastSN) require.Equal(t, uint64(0xabcdef), r.extLastTS) snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.Error(t, err) + snOffset, err = r.snRangeMap.GetValue(r.extLastSN) require.NoError(t, err) require.Equal(t, uint64(0), snOffset) + require.Equal(t, uint64(0), r.snOffset) require.Equal(t, uint64(0), r.tsOffset) } @@ -71,9 +74,11 @@ func TestUpdateSnTsOffsets(t *testing.T) { require.Equal(t, uint64(33332), r.extHighestIncomingSN) require.Equal(t, uint64(23333), r.extLastSN) require.Equal(t, uint64(0xabcdef), r.extLastTS) - snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) - require.NoError(t, err) - require.Equal(t, uint64(9999), snOffset) + _, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.Error(t, err) + _, err = r.snRangeMap.GetValue(r.extLastSN) + require.Error(t, err) + require.Equal(t, uint64(9999), r.snOffset) require.Equal(t, uint64(0xffff_ffff_ffff_ffff), r.tsOffset) } @@ -92,6 +97,8 @@ func TestPacketDropped(t *testing.T) { require.Equal(t, uint64(23333), r.extLastSN) require.Equal(t, uint64(0xabcdef), r.extLastTS) snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.Error(t, err) + snOffset, err = r.snRangeMap.GetValue(r.extLastSN) require.NoError(t, err) require.Equal(t, uint64(0), snOffset) require.Equal(t, uint64(0), r.tsOffset) @@ -160,10 +167,11 @@ func TestOutOfOrderSequenceNumber(t *testing.T) { r.SetLastSnTs(extPkt) r.UpdateAndGetSnTs(extPkt) - // add a missing sequence number to the cache - r.snRangeMap.ExcludeRange(23332, 23333) + // should not be able to add a missing sequence number to the cache that is before start + err := r.snRangeMap.ExcludeRange(23332, 23333) + require.Error(t, err) - // out-of-order sequence number should be munged using cache + // out-of-order sequence number before start should miss params = &testutils.TestExtPacketParams{ SequenceNumber: 23331, Timestamp: 0xabcdef, @@ -172,13 +180,38 @@ func TestOutOfOrderSequenceNumber(t *testing.T) { } extPkt, _ = testutils.GetTestExtPacket(params) + tp, err := r.UpdateAndGetSnTs(extPkt) + require.Error(t, err) + + // add a missing sequence number to the cache + err = r.snRangeMap.ExcludeRange(23334, 23335) + require.NoError(t, err) + + params = &testutils.TestExtPacketParams{ + SequenceNumber: 23336, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 10, + } + extPkt, _ = testutils.GetTestExtPacket(params) + r.UpdateAndGetSnTs(extPkt) + + // out-of-order sequence number should be munged from cache + params = &testutils.TestExtPacketParams{ + SequenceNumber: 23335, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 10, + } + extPkt, _ = testutils.GetTestExtPacket(params) + tpExpected := TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, - extSequenceNumber: 23331, + extSequenceNumber: 23334, extTimestamp: 0xabcdef, } - tp, err := r.UpdateAndGetSnTs(extPkt) + tp, err = r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) diff --git a/pkg/sfu/utils/rangemap.go b/pkg/sfu/utils/rangemap.go index 036c24494..fe5480832 100644 --- a/pkg/sfu/utils/rangemap.go +++ b/pkg/sfu/utils/rangemap.go @@ -59,12 +59,12 @@ func NewRangeMap[RT rangeType, VT valueType](size int) *RangeMap[RT, VT] { halfRange: 1 << ((unsafe.Sizeof(t) * 8) - 1), size: int(math.Max(float64(size), float64(minRanges))), } - r.initRanges(0) + r.initRanges(0, 0) return r } -func (r *RangeMap[RT, VT]) ClearAndResetValue(val VT) { - r.initRanges(val) +func (r *RangeMap[RT, VT]) ClearAndResetValue(start RT, val VT) { + r.initRanges(start, val) } func (r *RangeMap[RT, VT]) DecValue(end RT, dec VT) { @@ -87,10 +87,10 @@ func (r *RangeMap[RT, VT]) DecValue(end RT, dec VT) { r.prune() } -func (r *RangeMap[RT, VT]) initRanges(val VT) { +func (r *RangeMap[RT, VT]) initRanges(start RT, val VT) { r.ranges = []rangeVal[RT, VT]{ { - start: 0, + start: start, end: 0, value: val, }, diff --git a/pkg/sfu/utils/rangemap_test.go b/pkg/sfu/utils/rangemap_test.go index 36a9daa97..657481fc7 100644 --- a/pkg/sfu/utils/rangemap_test.go +++ b/pkg/sfu/utils/rangemap_test.go @@ -219,10 +219,10 @@ func TestRangeMapUint32(t *testing.T) { require.Equal(t, uint32(17), value) // reset - r.ClearAndResetValue(23) + r.ClearAndResetValue(24, 23) expectedRanges = []rangeVal[uint32, uint32]{ { - start: 0, + start: 24, end: 0, value: 23, }, @@ -233,12 +233,13 @@ func TestRangeMapUint32(t *testing.T) { require.NoError(t, err) require.Equal(t, uint32(23), value) - // decrement value and ensure that any key returns that value + // decrement value and ensure that any key after start in ClearAndResetValue above returns that value + // (as given end is higher than open range start, open range should be closed and a new range added) r.DecValue(34, 12) expectedRanges = []rangeVal[uint32, uint32]{ { - start: 0, + start: 24, end: 34, value: 23, }, @@ -260,7 +261,7 @@ func TestRangeMapUint32(t *testing.T) { expectedRanges = []rangeVal[uint32, uint32]{ { - start: 0, + start: 24, end: 34, value: 23, }, @@ -277,8 +278,12 @@ func TestRangeMapUint32(t *testing.T) { } require.Equal(t, expectedRanges, r.ranges) - // first range access + // before first range access value, err = r.GetValue(5) + require.ErrorIs(t, err, errKeyTooOld) + + // first range access + value, err = r.GetValue(25) require.NoError(t, err) require.Equal(t, uint32(23), value) @@ -314,7 +319,7 @@ func TestRangeMapUint32(t *testing.T) { require.Equal(t, expectedRanges, r.ranges) // aged out range access - value, err = r.GetValue(5) + value, err = r.GetValue(25) require.ErrorIs(t, err, errKeyTooOld) // access closed range before decrementing value