Use range map in RTPMunger. (#2000)

* WIP commit

* Make lastSN 32-bit

* Remove unused TSCycles
This commit is contained in:
Raja Subramanian
2023-08-27 10:49:17 +05:30
committed by GitHub
parent 3c31ae5dc0
commit 55d5edcf73
9 changed files with 229 additions and 196 deletions

View File

@@ -54,6 +54,7 @@ type pendingPacket struct {
type ExtPacket struct {
VideoLayer
Arrival time.Time
ExtSequenceNumber uint32
Packet *rtp.Packet
Payload interface{}
KeyFrame bool
@@ -83,7 +84,6 @@ type Buffer struct {
snRangeMap *utils.RangeMap[uint32, uint32]
// supported feedbacks
latestTSForAudioLevelInitialized bool
latestTSForAudioLevel uint32
@@ -441,7 +441,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
b.doReports(arrivalTime)
ep := b.getExtPacket(&rtpPacket, arrivalTime)
ep := b.getExtPacket(&rtpPacket, extSeqNumber, arrivalTime)
if ep == nil {
return
}
@@ -546,10 +546,11 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) {
}
}
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time) *ExtPacket {
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, extSeqNumber uint32, arrivalTime time.Time) *ExtPacket {
ep := &ExtPacket{
Packet: rtpPacket,
Arrival: arrivalTime,
Arrival: arrivalTime,
ExtSequenceNumber: extSeqNumber,
Packet: rtpPacket,
VideoLayer: VideoLayer{
Spatial: InvalidLayerSpatial,
Temporal: InvalidLayerTemporal,

View File

@@ -84,6 +84,7 @@ var (
ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache")
ErrPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded")
ErrDuplicatePacket = errors.New("duplicate packet")
ErrSequenceNumberOffsetNotFound = errors.New("sequence number offset not found")
ErrPaddingNotOnFrameBoundary = errors.New("padding cannot send on non-frame boundary")
ErrDownTrackAlreadyBound = errors.New("already bound")
)
@@ -1634,16 +1635,10 @@ func (d *DownTrack) translateVP8PacketTo(pkt *rtp.Packet, incomingVP8 *buffer.VP
}
func (d *DownTrack) DebugInfo() map[string]interface{} {
rtpMungerParams := d.forwarder.GetRTPMungerParams()
stats := map[string]interface{}{
"HighestIncomingSN": rtpMungerParams.highestIncomingSN,
"LastSN": rtpMungerParams.lastSN,
"SNOffset": rtpMungerParams.snOffset,
"LastTS": rtpMungerParams.lastTS,
"TSOffset": rtpMungerParams.tsOffset,
"LastMarker": rtpMungerParams.lastMarker,
"LastPli": d.rtpStats.LastPli(),
"LastPli": d.rtpStats.LastPli(),
}
stats["RTPMunger"] = d.forwarder.RTPMungerDebugInfo()
senderReport := d.CreateSenderReport()
if senderReport != nil {

View File

@@ -1585,7 +1585,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"expectedTS", expectedTS,
"nextTS", nextTS,
"tsJump", nextTS-lastTS,
"nextSN", rtpMungerState.LastSN+1,
"nextSN", rtpMungerState.ExtLastSN+1,
)
f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, nextTS-lastTS)
@@ -1792,11 +1792,11 @@ func (f *Forwarder) GetPadding(frameEndNeeded bool) ([]byte, error) {
return f.codecMunger.UpdateAndGetPadding(!frameEndNeeded)
}
func (f *Forwarder) GetRTPMungerParams() RTPMungerParams {
func (f *Forwarder) RTPMungerDebugInfo() map[string]interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.rtpMunger.GetParams()
return f.rtpMunger.DebugInfo()
}
// -----------------------------------------------------------------------------

View File

@@ -1196,9 +1196,13 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expectedTP, *actualTP)
// add a missing sequence number to the cache
f.rtpMunger.snRangeMap.IncValue(10)
f.rtpMunger.snRangeMap.AddRange(23332, 23333)
// out-of-order packet not in cache should be dropped
params = &testutils.TestExtPacketParams{
SequenceNumber: 23332,
SequenceNumber: 23331,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 20,
@@ -1239,7 +1243,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23334,
sequenceNumber: 23324,
timestamp: 0xabcdef,
},
}
@@ -1247,7 +1251,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expectedTP, *actualTP)
// padding only packet after a gap should be forwarded
// padding only packet after a gap should not be dropped
params = &testutils.TestExtPacketParams{
SequenceNumber: 23337,
Timestamp: 0xabcdef,
@@ -1258,7 +1262,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingGap,
sequenceNumber: 23336,
sequenceNumber: 23326,
timestamp: 0xabcdef,
},
}
@@ -1278,7 +1282,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: 23335,
sequenceNumber: 23325,
timestamp: 0xabcdef,
},
}
@@ -1298,7 +1302,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23337,
sequenceNumber: 23327,
timestamp: 0xabcdf0,
},
}
@@ -1716,7 +1720,7 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
require.Equal(t, f.lastSSRC, params.SSRC)
}
func TestForwardGetSnTsForPadding(t *testing.T) {
func TestForwarderGetSnTsForPadding(t *testing.T) {
f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo)
params := &testutils.TestExtPacketParams{
@@ -1783,7 +1787,7 @@ func TestForwardGetSnTsForPadding(t *testing.T) {
require.Equal(t, sntsExpected, snts)
}
func TestForwardGetSnTsForBlankFrames(t *testing.T) {
func TestForwarderGetSnTsForBlankFrames(t *testing.T) {
f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo)
params := &testutils.TestExtPacketParams{
@@ -1860,7 +1864,7 @@ func TestForwardGetSnTsForBlankFrames(t *testing.T) {
require.Equal(t, sntsExpected, snts)
}
func TestForwardGetPaddingVP8(t *testing.T) {
func TestForwarderGetPaddingVP8(t *testing.T) {
f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo)
params := &testutils.TestExtPacketParams{

View File

@@ -20,6 +20,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/utils"
)
//
@@ -36,9 +37,6 @@ const (
const (
RtxGateWindow = 2000
SnOffsetCacheSize = 4096
SnOffsetCacheMask = SnOffsetCacheSize - 1
)
type TranslationParamsRTP struct {
@@ -55,97 +53,96 @@ type SnTs struct {
// ----------------------------------------------------------------------
type RTPMungerState struct {
LastSN uint16
LastTS uint32
ExtLastSN uint32
LastTS uint32
}
func (r RTPMungerState) String() string {
return fmt.Sprintf("RTPMungerState{lastSN: %d, lastTS: %d)", r.LastSN, r.LastTS)
return fmt.Sprintf("RTPMungerState{extLastSN: %d, lastTS: %d)", r.ExtLastSN, r.LastTS)
}
// ----------------------------------------------------------------------
type RTPMungerParams struct {
highestIncomingSN uint16
lastSN uint16
snOffset uint16
lastTS uint32
tsOffset uint32
lastMarker bool
snOffsets [SnOffsetCacheSize]uint16
snOffsetsWritePtr int
snOffsetsOccupancy int
rtxGateSn uint16
isInRtxGateRegion bool
}
type RTPMunger struct {
logger logger.Logger
RTPMungerParams
extHighestIncomingSN uint32
snRangeMap *utils.RangeMap[uint32, uint32]
extLastSN uint32
lastTS uint32
tsOffset uint32
lastMarker bool
extRtxGateSn uint32
isInRtxGateRegion bool
}
func NewRTPMunger(logger logger.Logger) *RTPMunger {
return &RTPMunger{
logger: logger,
logger: logger,
snRangeMap: utils.NewRangeMap[uint32, uint32](100),
}
}
func (r *RTPMunger) GetParams() RTPMungerParams {
return RTPMungerParams{
highestIncomingSN: r.highestIncomingSN,
lastSN: r.lastSN,
snOffset: r.snOffset,
lastTS: r.lastTS,
tsOffset: r.tsOffset,
lastMarker: r.lastMarker,
func (r *RTPMunger) DebugInfo() map[string]interface{} {
snOffset, _ := r.snRangeMap.GetValue(r.extHighestIncomingSN)
return map[string]interface{}{
"ExtHighestIncomingSN": r.extHighestIncomingSN,
"ExtLastSN": r.extLastSN,
"SNOffset": snOffset,
"LastTS": r.lastTS,
"TSOffset": r.tsOffset,
"LastMarker": r.lastMarker,
}
}
func (r *RTPMunger) GetLast() RTPMungerState {
return RTPMungerState{
LastSN: r.lastSN,
LastTS: r.lastTS,
ExtLastSN: r.extLastSN,
LastTS: r.lastTS,
}
}
func (r *RTPMunger) SeedLast(state RTPMungerState) {
r.lastSN = state.LastSN
r.extLastSN = state.ExtLastSN
r.lastTS = state.LastTS
}
func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1
r.lastSN = extPkt.Packet.SequenceNumber
r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1
r.extLastSN = extPkt.ExtSequenceNumber
r.lastTS = extPkt.Packet.Timestamp
}
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32) {
r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1
r.snOffset = extPkt.Packet.SequenceNumber - r.lastSN - snAdjust
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint32, tsAdjust uint32) {
r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1
r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust)
r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust
// clear offsets cache layer/source switch
r.snOffsetsWritePtr = 0
r.snOffsetsOccupancy = 0
}
func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) {
if r.highestIncomingSN != extPkt.Packet.SequenceNumber {
if r.extHighestIncomingSN != extPkt.ExtSequenceNumber {
return
}
r.snOffset++
r.lastSN = extPkt.Packet.SequenceNumber - r.snOffset
r.snRangeMap.IncValue(1)
snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber)
if err != nil {
r.logger.Errorw("could not get sequence number offset", err)
return
}
r.extLastSN = extPkt.ExtSequenceNumber - snOffset
}
func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) {
// if out-of-order, look up sequence number offset cache
diff := extPkt.Packet.SequenceNumber - r.highestIncomingSN
if diff > (1 << 15) {
snOffset, isValid := r.getSnOffset(extPkt.Packet.SequenceNumber)
if !isValid {
diff := extPkt.ExtSequenceNumber - r.extHighestIncomingSN
if diff > (1 << 31) {
// out-of-order, look up sequence number offset cache
snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber)
if err != nil {
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
}, ErrOutOfOrderSequenceNumberCacheMiss
@@ -153,72 +150,60 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: extPkt.Packet.SequenceNumber - snOffset,
sequenceNumber: uint16(extPkt.ExtSequenceNumber - snOffset),
timestamp: extPkt.Packet.Timestamp - r.tsOffset,
}, nil
}
// record sn offset
for i := r.highestIncomingSN + 1; i != extPkt.Packet.SequenceNumber+1; i++ {
r.snOffsets[r.snOffsetsWritePtr] = r.snOffset
r.snOffsetsWritePtr = (r.snOffsetsWritePtr + 1) & SnOffsetCacheMask
r.snOffsetsOccupancy++
}
if r.snOffsetsOccupancy > SnOffsetCacheSize {
r.snOffsetsOccupancy = SnOffsetCacheSize
// can get duplicate packet due to FEC
if diff == 0 {
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingDuplicate,
}, ErrDuplicatePacket
}
ordering := SequenceNumberOrderingContiguous
if diff > 1 {
ordering = SequenceNumberOrderingGap
} else {
// can get duplicate packet due to FEC
if diff == 0 {
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingDuplicate,
}, ErrDuplicatePacket
}
// if padding only packet, can be dropped and sequence number adjusted
// as it is contiguous and in order. That means this is the highest
// incoming sequence number, and it is a good point to adjust
// sequence number offset.
if len(extPkt.Packet.Payload) == 0 {
r.highestIncomingSN = extPkt.Packet.SequenceNumber
r.snOffset++
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
}, ErrPaddingOnlyPacket
}
r.snRangeMap.AddRange(r.extHighestIncomingSN+1, extPkt.ExtSequenceNumber)
}
// in-order incoming packet, may or may not be contiguous.
// In the case of loss (i.e. incoming sequence number is not contiguous),
// forward even if it is a padding only packet. With temporal scalability,
// it is unclear if the current packet should be dropped if it is not
// contiguous. Hence, forward anything that is not contiguous.
// Reference: http://www.rtcbits.com/2017/04/howto-implement-temporal-scalability.html
mungedSN := extPkt.Packet.SequenceNumber - r.snOffset
r.extHighestIncomingSN = extPkt.ExtSequenceNumber
// if padding only packet, can be dropped and sequence number adjusted, if contiguous
if diff == 1 && len(extPkt.Packet.Payload) == 0 {
r.snRangeMap.IncValue(1)
return &TranslationParamsRTP{
snOrdering: ordering,
}, ErrPaddingOnlyPacket
}
snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber)
if err != nil {
return &TranslationParamsRTP{
snOrdering: ordering,
}, ErrSequenceNumberOffsetNotFound
}
extMungedSN := extPkt.ExtSequenceNumber - snOffset
mungedTS := extPkt.Packet.Timestamp - r.tsOffset
r.highestIncomingSN = extPkt.Packet.SequenceNumber
r.lastSN = mungedSN
r.extLastSN = extMungedSN
r.lastTS = mungedTS
r.lastMarker = extPkt.Packet.Marker
if extPkt.KeyFrame {
r.rtxGateSn = mungedSN
r.extRtxGateSn = extMungedSN
r.isInRtxGateRegion = true
}
if r.isInRtxGateRegion && (mungedSN-r.rtxGateSn) < (1<<15) && (mungedSN-r.rtxGateSn) > RtxGateWindow {
if r.isInRtxGateRegion && (extMungedSN-r.extRtxGateSn) > RtxGateWindow {
r.isInRtxGateRegion = false
}
return &TranslationParamsRTP{
snOrdering: ordering,
sequenceNumber: mungedSN,
sequenceNumber: uint16(extMungedSN),
timestamp: mungedTS,
}, nil
}
@@ -230,7 +215,7 @@ func (r *RTPMunger) FilterRTX(nacks []uint16) []uint16 {
filtered := make([]uint16, 0, len(nacks))
for _, sn := range nacks {
if (sn - r.rtxGateSn) < (1 << 15) {
if (sn - uint16(r.extRtxGateSn)) < (1 << 15) {
filtered = append(filtered, sn)
}
}
@@ -251,10 +236,12 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
tsOffset = 1
}
extLastSN := r.extLastSN
lastTS := r.lastTS
vals := make([]SnTs, num)
for i := 0; i < num; i++ {
vals[i].sequenceNumber = r.lastSN + uint16(i) + 1
extLastSN++
vals[i].sequenceNumber = uint16(extLastSN)
if frameRate != 0 {
if useLastTSForFirst && i == 0 {
vals[i].timestamp = r.lastTS
@@ -271,8 +258,8 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
}
}
r.lastSN = vals[num-1].sequenceNumber
r.snOffset -= uint16(num)
r.extLastSN = extLastSN
r.snRangeMap.DecValue(uint32(num))
r.tsOffset -= vals[num-1].timestamp - r.lastTS
r.lastTS = vals[num-1].timestamp
@@ -287,13 +274,3 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
func (r *RTPMunger) IsOnFrameBoundary() bool {
return r.lastMarker
}
func (r *RTPMunger) getSnOffset(sn uint16) (uint16, bool) {
diff := r.highestIncomingSN - sn
if int(diff) >= r.snOffsetsOccupancy {
return 0, false
}
readPtr := (r.snOffsetsWritePtr - int(diff) - 1) & SnOffsetCacheMask
return r.snOffsets[readPtr], true
}

View File

@@ -41,10 +41,12 @@ func TestSetLastSnTs(t *testing.T) {
require.NotNil(t, extPkt)
r.SetLastSnTs(extPkt)
require.Equal(t, uint16(23332), r.highestIncomingSN)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint32(23332), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint16(0), r.snOffset)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
require.Equal(t, uint32(0), r.tsOffset)
}
@@ -66,10 +68,12 @@ func TestUpdateSnTsOffsets(t *testing.T) {
}
extPkt, _ = testutils.GetTestExtPacket(params)
r.UpdateSnTsOffsets(extPkt, 1, 1)
require.Equal(t, uint16(33332), r.highestIncomingSN)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint32(33332), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint16(9999), r.snOffset)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(9999), snOffset)
require.Equal(t, uint32(0xffffffff), r.tsOffset)
}
@@ -84,14 +88,15 @@ func TestPacketDropped(t *testing.T) {
}
extPkt, _ := testutils.GetTestExtPacket(params)
r.SetLastSnTs(extPkt)
require.Equal(t, uint16(23332), r.highestIncomingSN)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint32(23332), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint16(0), r.snOffset)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
require.Equal(t, uint32(0), r.tsOffset)
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
require.Equal(t, 1, r.snOffsetsWritePtr)
// drop a non-head packet, should cause no change in internals
params = &testutils.TestExtPacketParams{
@@ -101,9 +106,11 @@ func TestPacketDropped(t *testing.T) {
}
extPkt, _ = testutils.GetTestExtPacket(params)
r.PacketDropped(extPkt)
require.Equal(t, uint16(23333), r.highestIncomingSN)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint16(0), r.snOffset)
require.Equal(t, uint32(23333), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
// drop a head packet and check offset increases
params = &testutils.TestExtPacketParams{
@@ -115,13 +122,12 @@ func TestPacketDropped(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
snOffsetWritePtr := (44444 - 23333 + 1) & SnOffsetCacheMask
require.Equal(t, snOffsetWritePtr, r.snOffsetsWritePtr)
require.Equal(t, SnOffsetCacheSize, r.snOffsetsOccupancy)
r.PacketDropped(extPkt)
require.Equal(t, r.lastSN, uint16(44443))
require.Equal(t, uint16(1), r.snOffset)
require.Equal(t, uint32(44443), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
params = &testutils.TestExtPacketParams{
SequenceNumber: 44445,
@@ -132,11 +138,10 @@ func TestPacketDropped(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
require.Equal(t, uint16(1), r.snOffsets[snOffsetWritePtr])
snOffsetWritePtr = (snOffsetWritePtr + 1) & SnOffsetCacheMask
require.Equal(t, snOffsetWritePtr, r.snOffsetsWritePtr)
require.Equal(t, uint16(44444), r.lastSN)
require.Equal(t, uint16(1), r.snOffset)
require.Equal(t, r.extLastSN, uint32(44444))
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
}
func TestOutOfOrderSequenceNumber(t *testing.T) {
@@ -152,9 +157,13 @@ func TestOutOfOrderSequenceNumber(t *testing.T) {
r.SetLastSnTs(extPkt)
r.UpdateAndGetSnTs(extPkt)
// add a missing sequence number to the cache
r.snRangeMap.IncValue(10)
r.snRangeMap.AddRange(23332, 23333)
// out-of-order sequence number not in the missing sequence number cache
params = &testutils.TestExtPacketParams{
SequenceNumber: 23332,
SequenceNumber: 23331,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 10,
@@ -170,9 +179,13 @@ func TestOutOfOrderSequenceNumber(t *testing.T) {
require.ErrorIs(t, err, ErrOutOfOrderSequenceNumberCacheMiss)
require.Equal(t, tpExpected, *tp)
// add missing sequence number to the cache and try again
r.snOffsets[SnOffsetCacheSize-1] = 10
r.snOffsetsOccupancy++
params = &testutils.TestExtPacketParams{
SequenceNumber: 23332,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 10,
}
extPkt, _ = testutils.GetTestExtPacket(params)
tpExpected = TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
@@ -230,9 +243,11 @@ func TestPaddingOnlyPacket(t *testing.T) {
require.Error(t, err)
require.ErrorIs(t, err, ErrPaddingOnlyPacket)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(23333), r.highestIncomingSN)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint16(1), r.snOffset)
require.Equal(t, uint32(23333), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
// padding only packet with a gap should not report an error
params = &testutils.TestExtPacketParams{
@@ -251,9 +266,11 @@ func TestPaddingOnlyPacket(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(23335), r.highestIncomingSN)
require.Equal(t, uint16(23334), r.lastSN)
require.Equal(t, uint16(1), r.snOffset)
require.Equal(t, uint32(23335), r.extHighestIncomingSN)
require.Equal(t, uint32(23334), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
}
func TestGapInSequenceNumber(t *testing.T) {
@@ -274,6 +291,7 @@ func TestGapInSequenceNumber(t *testing.T) {
// three lost packets
params = &testutils.TestExtPacketParams{
SequenceNumber: 1,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 33,
@@ -289,22 +307,25 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err := r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(1), r.highestIncomingSN)
require.Equal(t, uint16(1), r.lastSN)
require.Equal(t, uint16(0), r.snOffset)
require.Equal(t, uint32(65536+1), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+1), r.extLastSN)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
// ensure missing sequence numbers got recorded in cache
// last received, three missing in between and current received should all be in cache
for i := uint16(65533); i != 2; i++ {
offset, ok := r.getSnOffset(i)
require.True(t, ok)
require.Equal(t, uint16(0), offset)
for i := uint32(65534); i != 65536+1; i++ {
offset, err := r.snRangeMap.GetValue(i)
require.NoError(t, err)
require.Equal(t, uint32(0), offset)
}
// a padding only packet should be dropped
params = &testutils.TestExtPacketParams{
SequenceNumber: 2,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
}
@@ -317,13 +338,16 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.ErrorIs(t, err, ErrPaddingOnlyPacket)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(2), r.highestIncomingSN)
require.Equal(t, uint16(1), r.lastSN)
require.Equal(t, uint16(1), r.snOffset)
require.Equal(t, uint32(65536+2), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+1), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
// a packet with a gap should be adding to missing cache
params = &testutils.TestExtPacketParams{
SequenceNumber: 4,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 22,
@@ -339,13 +363,16 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(4), r.highestIncomingSN)
require.Equal(t, uint16(3), r.lastSN)
require.Equal(t, uint16(1), r.snOffset)
require.Equal(t, uint32(65536+4), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+3), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
// another contiguous padding only packet should be dropped
params = &testutils.TestExtPacketParams{
SequenceNumber: 5,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
}
@@ -358,13 +385,16 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.ErrorIs(t, err, ErrPaddingOnlyPacket)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(5), r.highestIncomingSN)
require.Equal(t, uint16(3), r.lastSN)
require.Equal(t, uint16(2), r.snOffset)
require.Equal(t, uint32(65536+5), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+3), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(2), snOffset)
// a packet with a gap should be adding to missing cache
params = &testutils.TestExtPacketParams{
SequenceNumber: 7,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 22,
@@ -380,13 +410,16 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(7), r.highestIncomingSN)
require.Equal(t, uint16(5), r.lastSN)
require.Equal(t, uint16(2), r.snOffset)
require.Equal(t, uint32(65536+7), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+5), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(2), snOffset)
// check the missing packets
params = &testutils.TestExtPacketParams{
SequenceNumber: 6,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
}
@@ -401,12 +434,15 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(7), r.highestIncomingSN)
require.Equal(t, uint16(5), r.lastSN)
require.Equal(t, uint16(2), r.snOffset)
require.Equal(t, uint32(65536+7), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+5), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(2), snOffset)
params = &testutils.TestExtPacketParams{
SequenceNumber: 3,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
}
@@ -421,9 +457,11 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(7), r.highestIncomingSN)
require.Equal(t, uint16(5), r.lastSN)
require.Equal(t, uint16(2), r.snOffset)
require.Equal(t, uint32(65536+7), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+5), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(2), snOffset)
}
func TestUpdateAndGetPaddingSnTs(t *testing.T) {

View File

@@ -30,6 +30,7 @@ type TestExtPacketParams struct {
IsKeyFrame bool
PayloadType uint8
SequenceNumber uint16
SNCycles int
Timestamp uint32
SSRC uint32
PayloadSize int
@@ -61,11 +62,12 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) {
}
ep := &buffer.ExtPacket{
VideoLayer: params.VideoLayer,
Arrival: params.ArrivalTime,
Packet: &packet,
KeyFrame: params.IsKeyFrame,
RawPacket: raw,
VideoLayer: params.VideoLayer,
ExtSequenceNumber: uint32(params.SNCycles<<16) + uint32(params.SequenceNumber),
Arrival: params.ArrivalTime,
Packet: &packet,
KeyFrame: params.IsKeyFrame,
RawPacket: raw,
}
return ep, nil

View File

@@ -59,12 +59,21 @@ func NewRangeMap[RT rangeType, VT valueType](size int) *RangeMap[RT, VT] {
}
}
func (r *RangeMap[RT, VT]) ClearAndResetValue(val VT) {
r.ranges = r.ranges[:0]
r.runningValue = val
}
func (r *RangeMap[RT, VT]) IncValue(inc VT) {
r.runningValue += inc
}
func (r *RangeMap[RT, VT]) DecValue(dec VT) {
r.runningValue -= dec
}
func (r *RangeMap[RT, VT]) AddRange(startInclusive RT, endExclusive RT) error {
if endExclusive-startInclusive > r.halfRange {
if endExclusive == startInclusive || endExclusive-startInclusive > r.halfRange {
return errReversedOrder
}

View File

@@ -56,6 +56,7 @@ func TestRangeMapUint32(t *testing.T) {
value, err = r.GetValue(22)
require.NoError(t, err)
require.Equal(t, uint32(2), value)
// outside range should return 3
value, err = r.GetValue(662)
require.NoError(t, err)
@@ -119,4 +120,10 @@ func TestRangeMapUint32(t *testing.T) {
value, err = r.GetValue(3000)
require.NoError(t, err)
require.Equal(t, uint32(13), value)
// decrement running value
r.DecValue(23)
value, err = r.GetValue(3000)
require.NoError(t, err)
require.Equal(t, uint32((1<<32)-10), value)
}