Prevent old packets resolution. (#2134)

* Prevent old packets resolution.

With range map, we are just looking up ranges and not exactly
which packets were missing. This caused the case of old packets
being resolved after layer switch.

For example,
- Packet 10 is layer switch, range map gets reset
- Packet 11, 12, 13 are forwarded
- Packet 9 comes, it should ideally be dropped as pre-layer switch old
  packet. But, when looking up range map, it gets an offset and hence
  gets re-mapped to something before layer switch. This was probably
  okay as decoders would have had a key frame at the switch point and
  moved ahead, but incorrect technically.

Fix is to reset the start point in the range map to the switch point
and not 0. So, when packet 9 comes, range map will return "key too old"
error and that packet will be dropped as missing from cache.

* fix tests
This commit is contained in:
Raja Subramanian
2023-10-07 10:56:34 +05:30
committed by GitHub
parent f40a97fd3e
commit 2ff8fe3b78
5 changed files with 82 additions and 33 deletions

View File

@@ -1237,11 +1237,23 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
require.Equal(t, expectedTP, *actualTP)
// add a missing sequence number to the cache
f.rtpMunger.snRangeMap.ExcludeRange(23332, 23333)
err = f.rtpMunger.snRangeMap.ExcludeRange(23334, 23335)
require.NoError(t, err)
params = &testutils.TestExtPacketParams{
SequenceNumber: 23336,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 20,
}
extPkt, _ = testutils.GetTestExtPacket(params)
_, err = f.GetTranslationParams(extPkt, 0)
require.NoError(t, err)
// out-of-order packet should get offset from cache
params = &testutils.TestExtPacketParams{
SequenceNumber: 23331,
SequenceNumber: 23335,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 20,
@@ -1251,7 +1263,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
extSequenceNumber: 23331,
extSequenceNumber: 23334,
extTimestamp: 0xabcdef,
},
}
@@ -1261,7 +1273,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
// padding only packet in order should be dropped
params = &testutils.TestExtPacketParams{
SequenceNumber: 23334,
SequenceNumber: 23337,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
}
@@ -1276,7 +1288,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
// in order packet should be forwarded
params = &testutils.TestExtPacketParams{
SequenceNumber: 23335,
SequenceNumber: 23338,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 20,
@@ -1286,7 +1298,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
extSequenceNumber: 23333,
extSequenceNumber: 23336,
extTimestamp: 0xabcdef,
},
}
@@ -1296,7 +1308,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
// padding only packet after a gap should not be dropped
params = &testutils.TestExtPacketParams{
SequenceNumber: 23337,
SequenceNumber: 23340,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
}
@@ -1305,7 +1317,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingGap,
extSequenceNumber: 23335,
extSequenceNumber: 23338,
extTimestamp: 0xabcdef,
},
}
@@ -1325,7 +1337,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
extSequenceNumber: 23334,
extSequenceNumber: 23335,
extTimestamp: 0xabcdef,
},
}
@@ -1345,7 +1357,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
extSequenceNumber: 23336,
extSequenceNumber: 23339,
extTimestamp: 0xabcdf0,
},
}

View File

@@ -23,9 +23,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/utils"
)
//
// RTPMunger
//
type SequenceNumberOrdering int
const (
@@ -121,6 +119,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
r.extLastSN = extPkt.ExtSequenceNumber
r.extSecondLastSN = r.extLastSN - 1
r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber, 0)
r.updateSnOffset()
r.extLastTS = extPkt.ExtTimestamp
@@ -129,7 +128,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint64, tsAdjust uint64) {
r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1
r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust)
r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber, extPkt.ExtSequenceNumber-r.extLastSN-snAdjust)
r.updateSnOffset()
r.tsOffset = extPkt.ExtTimestamp - r.extLastTS - tsAdjust

View File

@@ -45,8 +45,11 @@ func TestSetLastSnTs(t *testing.T) {
require.Equal(t, uint64(23333), r.extLastSN)
require.Equal(t, uint64(0xabcdef), r.extLastTS)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.Error(t, err)
snOffset, err = r.snRangeMap.GetValue(r.extLastSN)
require.NoError(t, err)
require.Equal(t, uint64(0), snOffset)
require.Equal(t, uint64(0), r.snOffset)
require.Equal(t, uint64(0), r.tsOffset)
}
@@ -71,9 +74,11 @@ func TestUpdateSnTsOffsets(t *testing.T) {
require.Equal(t, uint64(33332), r.extHighestIncomingSN)
require.Equal(t, uint64(23333), r.extLastSN)
require.Equal(t, uint64(0xabcdef), r.extLastTS)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint64(9999), snOffset)
_, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.Error(t, err)
_, err = r.snRangeMap.GetValue(r.extLastSN)
require.Error(t, err)
require.Equal(t, uint64(9999), r.snOffset)
require.Equal(t, uint64(0xffff_ffff_ffff_ffff), r.tsOffset)
}
@@ -92,6 +97,8 @@ func TestPacketDropped(t *testing.T) {
require.Equal(t, uint64(23333), r.extLastSN)
require.Equal(t, uint64(0xabcdef), r.extLastTS)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.Error(t, err)
snOffset, err = r.snRangeMap.GetValue(r.extLastSN)
require.NoError(t, err)
require.Equal(t, uint64(0), snOffset)
require.Equal(t, uint64(0), r.tsOffset)
@@ -160,10 +167,11 @@ func TestOutOfOrderSequenceNumber(t *testing.T) {
r.SetLastSnTs(extPkt)
r.UpdateAndGetSnTs(extPkt)
// add a missing sequence number to the cache
r.snRangeMap.ExcludeRange(23332, 23333)
// should not be able to add a missing sequence number to the cache that is before start
err := r.snRangeMap.ExcludeRange(23332, 23333)
require.Error(t, err)
// out-of-order sequence number should be munged using cache
// out-of-order sequence number before start should miss
params = &testutils.TestExtPacketParams{
SequenceNumber: 23331,
Timestamp: 0xabcdef,
@@ -172,13 +180,38 @@ func TestOutOfOrderSequenceNumber(t *testing.T) {
}
extPkt, _ = testutils.GetTestExtPacket(params)
tp, err := r.UpdateAndGetSnTs(extPkt)
require.Error(t, err)
// add a missing sequence number to the cache
err = r.snRangeMap.ExcludeRange(23334, 23335)
require.NoError(t, err)
params = &testutils.TestExtPacketParams{
SequenceNumber: 23336,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 10,
}
extPkt, _ = testutils.GetTestExtPacket(params)
r.UpdateAndGetSnTs(extPkt)
// out-of-order sequence number should be munged from cache
params = &testutils.TestExtPacketParams{
SequenceNumber: 23335,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 10,
}
extPkt, _ = testutils.GetTestExtPacket(params)
tpExpected := TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
extSequenceNumber: 23331,
extSequenceNumber: 23334,
extTimestamp: 0xabcdef,
}
tp, err := r.UpdateAndGetSnTs(extPkt)
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)

View File

@@ -59,12 +59,12 @@ func NewRangeMap[RT rangeType, VT valueType](size int) *RangeMap[RT, VT] {
halfRange: 1 << ((unsafe.Sizeof(t) * 8) - 1),
size: int(math.Max(float64(size), float64(minRanges))),
}
r.initRanges(0)
r.initRanges(0, 0)
return r
}
func (r *RangeMap[RT, VT]) ClearAndResetValue(val VT) {
r.initRanges(val)
func (r *RangeMap[RT, VT]) ClearAndResetValue(start RT, val VT) {
r.initRanges(start, val)
}
func (r *RangeMap[RT, VT]) DecValue(end RT, dec VT) {
@@ -87,10 +87,10 @@ func (r *RangeMap[RT, VT]) DecValue(end RT, dec VT) {
r.prune()
}
func (r *RangeMap[RT, VT]) initRanges(val VT) {
func (r *RangeMap[RT, VT]) initRanges(start RT, val VT) {
r.ranges = []rangeVal[RT, VT]{
{
start: 0,
start: start,
end: 0,
value: val,
},

View File

@@ -219,10 +219,10 @@ func TestRangeMapUint32(t *testing.T) {
require.Equal(t, uint32(17), value)
// reset
r.ClearAndResetValue(23)
r.ClearAndResetValue(24, 23)
expectedRanges = []rangeVal[uint32, uint32]{
{
start: 0,
start: 24,
end: 0,
value: 23,
},
@@ -233,12 +233,13 @@ func TestRangeMapUint32(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint32(23), value)
// decrement value and ensure that any key returns that value
// decrement value and ensure that any key after start in ClearAndResetValue above returns that value
// (as given end is higher than open range start, open range should be closed and a new range added)
r.DecValue(34, 12)
expectedRanges = []rangeVal[uint32, uint32]{
{
start: 0,
start: 24,
end: 34,
value: 23,
},
@@ -260,7 +261,7 @@ func TestRangeMapUint32(t *testing.T) {
expectedRanges = []rangeVal[uint32, uint32]{
{
start: 0,
start: 24,
end: 34,
value: 23,
},
@@ -277,8 +278,12 @@ func TestRangeMapUint32(t *testing.T) {
}
require.Equal(t, expectedRanges, r.ranges)
// first range access
// before first range access
value, err = r.GetValue(5)
require.ErrorIs(t, err, errKeyTooOld)
// first range access
value, err = r.GetValue(25)
require.NoError(t, err)
require.Equal(t, uint32(23), value)
@@ -314,7 +319,7 @@ func TestRangeMapUint32(t *testing.T) {
require.Equal(t, expectedRanges, r.ranges)
// aged out range access
value, err = r.GetValue(5)
value, err = r.GetValue(25)
require.ErrorIs(t, err, errKeyTooOld)
// access closed range before decrementing value