From 2ff8fe3b781ceec6d2d8f4edc38c4b5bde7cb2ad Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 7 Oct 2023 10:56:34 +0530 Subject: [PATCH] Prevent old packets resolution. (#2134) * Prevent old packets resolution. With range map, we are just looking up ranges and not exactly which packets were missing. This caused the case of old packets being resolved after layer switch. For example, - Packet 10 is layer switch, range map gets reset - Packet 11, 12, 13 are forwarded - Packet 9 comes, it should ideally be dropped as pre-layer switch old packet. But, when looking up range map, it gets an offset and hence gets re-mapped to something before layer switch. This was probably okay as decoders would have had a key frame at the switch point and moved ahead, but incorrect technically. Fix is to reset the start point in the range map to the switch point and not 0. So, when packet 9 comes, range map will return "key too old" error and that packet will be dropped as missing from cache. * fix tests --- pkg/sfu/forwarder_test.go | 32 +++++++++++++++------- pkg/sfu/rtpmunger.go | 5 ++-- pkg/sfu/rtpmunger_test.go | 49 ++++++++++++++++++++++++++++------ pkg/sfu/utils/rangemap.go | 10 +++---- pkg/sfu/utils/rangemap_test.go | 19 ++++++++----- 5 files changed, 82 insertions(+), 33 deletions(-) diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index f621fd587..5e58e3e41 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -1237,11 +1237,23 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { require.Equal(t, expectedTP, *actualTP) // add a missing sequence number to the cache - f.rtpMunger.snRangeMap.ExcludeRange(23332, 23333) + err = f.rtpMunger.snRangeMap.ExcludeRange(23334, 23335) + require.NoError(t, err) + + params = &testutils.TestExtPacketParams{ + SequenceNumber: 23336, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + extPkt, _ = testutils.GetTestExtPacket(params) + + _, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) // out-of-order packet should get offset from cache params = &testutils.TestExtPacketParams{ - SequenceNumber: 23331, + SequenceNumber: 23335, Timestamp: 0xabcdef, SSRC: 0x12345678, PayloadSize: 20, @@ -1251,7 +1263,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, - extSequenceNumber: 23331, + extSequenceNumber: 23334, extTimestamp: 0xabcdef, }, } @@ -1261,7 +1273,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { // padding only packet in order should be dropped params = &testutils.TestExtPacketParams{ - SequenceNumber: 23334, + SequenceNumber: 23337, Timestamp: 0xabcdef, SSRC: 0x12345678, } @@ -1276,7 +1288,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { // in order packet should be forwarded params = &testutils.TestExtPacketParams{ - SequenceNumber: 23335, + SequenceNumber: 23338, Timestamp: 0xabcdef, SSRC: 0x12345678, PayloadSize: 20, @@ -1286,7 +1298,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingContiguous, - extSequenceNumber: 23333, + extSequenceNumber: 23336, extTimestamp: 0xabcdef, }, } @@ -1296,7 +1308,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { // padding only packet after a gap should not be dropped params = &testutils.TestExtPacketParams{ - SequenceNumber: 23337, + SequenceNumber: 23340, Timestamp: 0xabcdef, SSRC: 0x12345678, } @@ -1305,7 +1317,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingGap, - extSequenceNumber: 23335, + extSequenceNumber: 23338, extTimestamp: 0xabcdef, }, } @@ -1325,7 +1337,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, - extSequenceNumber: 23334, + extSequenceNumber: 23335, extTimestamp: 0xabcdef, }, } @@ -1345,7 +1357,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingContiguous, - extSequenceNumber: 23336, + extSequenceNumber: 23339, extTimestamp: 0xabcdf0, }, } diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index c30b8a53b..b162158fe 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -23,9 +23,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/utils" ) -// // RTPMunger -// type SequenceNumberOrdering int const ( @@ -121,6 +119,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { r.extLastSN = extPkt.ExtSequenceNumber r.extSecondLastSN = r.extLastSN - 1 + r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber, 0) r.updateSnOffset() r.extLastTS = extPkt.ExtTimestamp @@ -129,7 +128,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint64, tsAdjust uint64) { r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1 - r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust) + r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber, extPkt.ExtSequenceNumber-r.extLastSN-snAdjust) r.updateSnOffset() r.tsOffset = extPkt.ExtTimestamp - r.extLastTS - tsAdjust diff --git a/pkg/sfu/rtpmunger_test.go b/pkg/sfu/rtpmunger_test.go index ea8842457..7c202ce1e 100644 --- a/pkg/sfu/rtpmunger_test.go +++ b/pkg/sfu/rtpmunger_test.go @@ -45,8 +45,11 @@ func TestSetLastSnTs(t *testing.T) { require.Equal(t, uint64(23333), r.extLastSN) require.Equal(t, uint64(0xabcdef), r.extLastTS) snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.Error(t, err) + snOffset, err = r.snRangeMap.GetValue(r.extLastSN) require.NoError(t, err) require.Equal(t, uint64(0), snOffset) + require.Equal(t, uint64(0), r.snOffset) require.Equal(t, uint64(0), r.tsOffset) } @@ -71,9 +74,11 @@ func TestUpdateSnTsOffsets(t *testing.T) { require.Equal(t, uint64(33332), r.extHighestIncomingSN) require.Equal(t, uint64(23333), r.extLastSN) require.Equal(t, uint64(0xabcdef), r.extLastTS) - snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) - require.NoError(t, err) - require.Equal(t, uint64(9999), snOffset) + _, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.Error(t, err) + _, err = r.snRangeMap.GetValue(r.extLastSN) + require.Error(t, err) + require.Equal(t, uint64(9999), r.snOffset) require.Equal(t, uint64(0xffff_ffff_ffff_ffff), r.tsOffset) } @@ -92,6 +97,8 @@ func TestPacketDropped(t *testing.T) { require.Equal(t, uint64(23333), r.extLastSN) require.Equal(t, uint64(0xabcdef), r.extLastTS) snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) + require.Error(t, err) + snOffset, err = r.snRangeMap.GetValue(r.extLastSN) require.NoError(t, err) require.Equal(t, uint64(0), snOffset) require.Equal(t, uint64(0), r.tsOffset) @@ -160,10 +167,11 @@ func TestOutOfOrderSequenceNumber(t *testing.T) { r.SetLastSnTs(extPkt) r.UpdateAndGetSnTs(extPkt) - // add a missing sequence number to the cache - r.snRangeMap.ExcludeRange(23332, 23333) + // should not be able to add a missing sequence number to the cache that is before start + err := r.snRangeMap.ExcludeRange(23332, 23333) + require.Error(t, err) - // out-of-order sequence number should be munged using cache + // out-of-order sequence number before start should miss params = &testutils.TestExtPacketParams{ SequenceNumber: 23331, Timestamp: 0xabcdef, @@ -172,13 +180,38 @@ func TestOutOfOrderSequenceNumber(t *testing.T) { } extPkt, _ = testutils.GetTestExtPacket(params) + tp, err := r.UpdateAndGetSnTs(extPkt) + require.Error(t, err) + + // add a missing sequence number to the cache + err = r.snRangeMap.ExcludeRange(23334, 23335) + require.NoError(t, err) + + params = &testutils.TestExtPacketParams{ + SequenceNumber: 23336, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 10, + } + extPkt, _ = testutils.GetTestExtPacket(params) + r.UpdateAndGetSnTs(extPkt) + + // out-of-order sequence number should be munged from cache + params = &testutils.TestExtPacketParams{ + SequenceNumber: 23335, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 10, + } + extPkt, _ = testutils.GetTestExtPacket(params) + tpExpected := TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, - extSequenceNumber: 23331, + extSequenceNumber: 23334, extTimestamp: 0xabcdef, } - tp, err := r.UpdateAndGetSnTs(extPkt) + tp, err = r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) diff --git a/pkg/sfu/utils/rangemap.go b/pkg/sfu/utils/rangemap.go index 036c24494..fe5480832 100644 --- a/pkg/sfu/utils/rangemap.go +++ b/pkg/sfu/utils/rangemap.go @@ -59,12 +59,12 @@ func NewRangeMap[RT rangeType, VT valueType](size int) *RangeMap[RT, VT] { halfRange: 1 << ((unsafe.Sizeof(t) * 8) - 1), size: int(math.Max(float64(size), float64(minRanges))), } - r.initRanges(0) + r.initRanges(0, 0) return r } -func (r *RangeMap[RT, VT]) ClearAndResetValue(val VT) { - r.initRanges(val) +func (r *RangeMap[RT, VT]) ClearAndResetValue(start RT, val VT) { + r.initRanges(start, val) } func (r *RangeMap[RT, VT]) DecValue(end RT, dec VT) { @@ -87,10 +87,10 @@ func (r *RangeMap[RT, VT]) DecValue(end RT, dec VT) { r.prune() } -func (r *RangeMap[RT, VT]) initRanges(val VT) { +func (r *RangeMap[RT, VT]) initRanges(start RT, val VT) { r.ranges = []rangeVal[RT, VT]{ { - start: 0, + start: start, end: 0, value: val, }, diff --git a/pkg/sfu/utils/rangemap_test.go b/pkg/sfu/utils/rangemap_test.go index 36a9daa97..657481fc7 100644 --- a/pkg/sfu/utils/rangemap_test.go +++ b/pkg/sfu/utils/rangemap_test.go @@ -219,10 +219,10 @@ func TestRangeMapUint32(t *testing.T) { require.Equal(t, uint32(17), value) // reset - r.ClearAndResetValue(23) + r.ClearAndResetValue(24, 23) expectedRanges = []rangeVal[uint32, uint32]{ { - start: 0, + start: 24, end: 0, value: 23, }, @@ -233,12 +233,13 @@ func TestRangeMapUint32(t *testing.T) { require.NoError(t, err) require.Equal(t, uint32(23), value) - // decrement value and ensure that any key returns that value + // decrement value and ensure that any key after start in ClearAndResetValue above returns that value + // (as given end is higher than open range start, open range should be closed and a new range added) r.DecValue(34, 12) expectedRanges = []rangeVal[uint32, uint32]{ { - start: 0, + start: 24, end: 34, value: 23, }, @@ -260,7 +261,7 @@ func TestRangeMapUint32(t *testing.T) { expectedRanges = []rangeVal[uint32, uint32]{ { - start: 0, + start: 24, end: 34, value: 23, }, @@ -277,8 +278,12 @@ func TestRangeMapUint32(t *testing.T) { } require.Equal(t, expectedRanges, r.ranges) - // first range access + // before first range access value, err = r.GetValue(5) + require.ErrorIs(t, err, errKeyTooOld) + + // first range access + value, err = r.GetValue(25) require.NoError(t, err) require.Equal(t, uint32(23), value) @@ -314,7 +319,7 @@ func TestRangeMapUint32(t *testing.T) { require.Equal(t, expectedRanges, r.ranges) // aged out range access - value, err = r.GetValue(5) + value, err = r.GetValue(25) require.ErrorIs(t, err, errKeyTooOld) // access closed range before decrementing value