diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index f0c37a3c2..6324c3ecb 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1619,6 +1619,8 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "extNextTS", extNextTS, "tsJump", extNextTS-extLastTS, "nextSN", rtpMungerState.ExtLastSN+1, + "extIncomingSN", extPkt.ExtSequenceNumber, + "extIncomingTS", extPkt.ExtTimestamp, ) f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, extNextTS-extLastTS) diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 4ac711600..ef3b5434b 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -121,7 +121,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { r.extLastSN = extPkt.ExtSequenceNumber r.extSecondLastSN = r.extLastSN - 1 - r.updateSnOffset() + r.updateSnOffset("init") r.extLastTS = extPkt.ExtTimestamp } @@ -130,7 +130,7 @@ func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint64, r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1 r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust) - r.updateSnOffset() + r.updateSnOffset("switch") r.tsOffset = extPkt.ExtTimestamp - r.extLastTS - tsAdjust } @@ -156,7 +156,7 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) { } r.extLastSN = r.extSecondLastSN - r.updateSnOffset() + r.updateSnOffset("drop") } func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) { @@ -197,6 +197,15 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara if diff < 0 { // out-of-order, look up sequence number offset cache snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber) + r.logger.Debugw( + "out-of-order packet", + "extHighestIncomingSN", r.extHighestIncomingSN, + "extLastSN", r.extLastSN, + "extSequenceNumber", extPkt.ExtSequenceNumber, + "snOffset", snOffset, + "error", err, + "outgoingSN", extPkt.ExtSequenceNumber-snOffset, + ) if err != nil { return &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, @@ -218,7 +227,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara r.logger.Errorw("could not exclude range", err, "sn", r.extHighestIncomingSN) } - r.updateSnOffset() + r.updateSnOffset("pad-drop") return &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingContiguous, @@ -288,8 +297,10 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate r.extSecondLastSN = extLastSN - 1 r.extLastSN = extLastSN - r.snRangeMap.DecValue(uint64(num)) - r.updateSnOffset() + if err := r.snRangeMap.CloseRangeAndDecValue(r.extHighestIncomingSN, uint64(num)); err != nil { + r.logger.Errorw("could not close range", err, "sn", r.extHighestIncomingSN) + } + r.updateSnOffset("pad") r.tsOffset -= extLastTS - r.extLastTS r.extLastTS = extLastTS @@ -305,10 +316,16 @@ func (r *RTPMunger) IsOnFrameBoundary() bool { return r.lastMarker } -func (r *RTPMunger) updateSnOffset() { +func (r *RTPMunger) updateSnOffset(cause string) { snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN + 1) if err != nil { - r.logger.Errorw("could not get SN offset", err) + r.logger.Errorw("could not get sequence number offset", err) } r.snOffset = snOffset + r.logger.Debugw( + "updating sequence number offset", + "cause", cause, + "extHighestIncomingSN", r.extHighestIncomingSN, + "snOffset", r.snOffset, + ) } diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index 7cd56395f..14280d0a0 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -144,7 +144,12 @@ func (s *sequencer) push( s.extHighestSN = extModifiedSN } else { if diff < -int64(s.size) { - s.logger.Debugw("old packet, can not be sequenced", "extHighestSN", s.extHighestSN, "extModifiedSN", extModifiedSN) + s.logger.Debugw( + "old packet, cannot be sequenced", + "extHighestSN", s.extHighestSN, + "extIncomingSN", extIncomingSN, + "extModifiedSN", extModifiedSN, + ) return } @@ -152,7 +157,12 @@ func (s *sequencer) push( var err error snOffset, err = s.snRangeMap.GetValue(extModifiedSN) if err != nil { - s.logger.Errorw("could not get sequence number offset", err, "extIncomingSN", extIncomingSN, "extModifiedSn", extModifiedSN) + s.logger.Errorw( + "could not get sequence number offset", err, + "extHighestSN", s.extHighestSN, + "extIncomingSN", extIncomingSN, + "extModifiedSN", extModifiedSN, + ) return } } diff --git a/pkg/sfu/utils/rangemap.go b/pkg/sfu/utils/rangemap.go index ecf36ec93..22a9b03d1 100644 --- a/pkg/sfu/utils/rangemap.go +++ b/pkg/sfu/utils/rangemap.go @@ -66,8 +66,23 @@ func (r *RangeMap[RT, VT]) ClearAndResetValue(val VT) { r.initRanges(val) } -func (r *RangeMap[RT, VT]) DecValue(dec VT) { - r.ranges[len(r.ranges)-1].value -= dec +func (r *RangeMap[RT, VT]) CloseRangeAndDecValue(end RT, dec VT) error { + // close open range + lr := &r.ranges[len(r.ranges)-1] + if lr.start > end { + // start of open range is after given end + return errReversedOrder + } + lr.end = end + + // start a new open one with decremented value + r.ranges = append(r.ranges, rangeVal[RT, VT]{ + start: end + 1, + end: 0, + value: lr.value - dec, + }) + r.prune() + return nil } func (r *RangeMap[RT, VT]) initRanges(val VT) { diff --git a/pkg/sfu/utils/rangemap_test.go b/pkg/sfu/utils/rangemap_test.go index 2c1d38f1d..95a61132b 100644 --- a/pkg/sfu/utils/rangemap_test.go +++ b/pkg/sfu/utils/rangemap_test.go @@ -28,30 +28,32 @@ func TestRangeMapUint32(t *testing.T) { require.NoError(t, err) require.Equal(t, uint32(0), value) - expectedRangeVal := rangeVal[uint32, uint32]{ - start: 0, - end: 0, - value: 0, + expectedRanges := []rangeVal[uint32, uint32]{ + { + start: 0, + end: 0, + value: 0, + }, } - require.Equal(t, expectedRangeVal, r.ranges[0]) + require.Equal(t, expectedRanges, r.ranges) // add an exclusion, should create a new range err = r.ExcludeRange(10, 11) require.NoError(t, err) - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 0, - end: 9, - value: 0, + expectedRanges = []rangeVal[uint32, uint32]{ + { + start: 0, + end: 9, + value: 0, + }, + { + start: 11, + end: 0, + value: 1, + }, } - require.Equal(t, expectedRangeVal, r.ranges[0]) - - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 11, - end: 0, - value: 1, - } - require.Equal(t, expectedRangeVal, r.ranges[1]) + require.Equal(t, expectedRanges, r.ranges) // getting value in old range should return 0 value, err = r.GetValue(6) @@ -81,19 +83,19 @@ func TestRangeMapUint32(t *testing.T) { err = r.ExcludeRange(11, 12) require.NoError(t, err) - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 0, - end: 9, - value: 0, + expectedRanges = []rangeVal[uint32, uint32]{ + { + start: 0, + end: 9, + value: 0, + }, + { + start: 12, + end: 0, + value: 2, + }, } - require.Equal(t, expectedRangeVal, r.ranges[0]) - - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 12, - end: 0, - value: 2, - } - require.Equal(t, expectedRangeVal, r.ranges[1]) + require.Equal(t, expectedRanges, r.ranges) // excluded range should return error, now is excluded because exclusion range could be extended value, err = r.GetValue(11) @@ -112,19 +114,19 @@ func TestRangeMapUint32(t *testing.T) { err = r.ExcludeRange(12, 22) require.NoError(t, err) - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 0, - end: 9, - value: 0, + expectedRanges = []rangeVal[uint32, uint32]{ + { + start: 0, + end: 9, + value: 0, + }, + { + start: 22, + end: 0, + value: 12, + }, } - require.Equal(t, expectedRangeVal, r.ranges[0]) - - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 22, - end: 0, - value: 12, - } - require.Equal(t, expectedRangeVal, r.ranges[1]) + require.Equal(t, expectedRanges, r.ranges) // excluded range should return error, now is excluded because exclusion range could be extended value, err = r.GetValue(15) @@ -139,26 +141,24 @@ func TestRangeMapUint32(t *testing.T) { err = r.ExcludeRange(26, 30) require.NoError(t, err) - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 0, - end: 9, - value: 0, + expectedRanges = []rangeVal[uint32, uint32]{ + { + start: 0, + end: 9, + value: 0, + }, + { + start: 22, + end: 25, + value: 12, + }, + { + start: 30, + end: 0, + value: 16, + }, } - require.Equal(t, expectedRangeVal, r.ranges[0]) - - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 22, - end: 25, - value: 12, - } - require.Equal(t, expectedRangeVal, r.ranges[1]) - - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 30, - end: 0, - value: 16, - } - require.Equal(t, expectedRangeVal, r.ranges[2]) + require.Equal(t, expectedRanges, r.ranges) // get a value from newly closed range [22, 25] value, err = r.GetValue(23) @@ -170,26 +170,24 @@ func TestRangeMapUint32(t *testing.T) { require.NoError(t, err) // previously first range would have been pruned due to size limitations - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 22, - end: 25, - value: 12, + expectedRanges = []rangeVal[uint32, uint32]{ + { + start: 22, + end: 25, + value: 12, + }, + { + start: 30, + end: 49, + value: 16, + }, + { + start: 51, + end: 0, + value: 17, + }, } - require.Equal(t, expectedRangeVal, r.ranges[0]) - - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 30, - end: 49, - value: 16, - } - require.Equal(t, expectedRangeVal, r.ranges[1]) - - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 51, - end: 0, - value: 17, - } - require.Equal(t, expectedRangeVal, r.ranges[2]) + require.Equal(t, expectedRanges, r.ranges) // excluded range should return error value, err = r.GetValue(50) @@ -222,52 +220,72 @@ func TestRangeMapUint32(t *testing.T) { // reset r.ClearAndResetValue(23) - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 0, - end: 0, - value: 23, + expectedRanges = []rangeVal[uint32, uint32]{ + { + start: 0, + end: 0, + value: 23, + }, } - require.Equal(t, expectedRangeVal, r.ranges[0]) + require.Equal(t, expectedRanges, r.ranges) value, err = r.GetValue(55555555) require.NoError(t, err) require.Equal(t, uint32(23), value) // decrement value and ensure that any key returns that value - r.DecValue(12) + err = r.CloseRangeAndDecValue(34, 12) + require.NoError(t, err) - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 0, - end: 0, - value: 11, + expectedRanges = []rangeVal[uint32, uint32]{ + { + start: 0, + end: 34, + value: 23, + }, + { + start: 35, + end: 0, + value: 11, + }, } - require.Equal(t, expectedRangeVal, r.ranges[0]) + require.Equal(t, expectedRanges, r.ranges) value, err = r.GetValue(55555555) require.NoError(t, err) require.Equal(t, uint32(11), value) // add an exclusion and then decrement value - err = r.ExcludeRange(10, 15) + err = r.ExcludeRange(40, 45) require.NoError(t, err) - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 0, - end: 9, - value: 11, + expectedRanges = []rangeVal[uint32, uint32]{ + { + start: 0, + end: 34, + value: 23, + }, + { + start: 35, + end: 39, + value: 11, + }, + { + start: 45, + end: 0, + value: 16, + }, } - require.Equal(t, expectedRangeVal, r.ranges[0]) - - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 15, - end: 0, - value: 16, - } - require.Equal(t, expectedRangeVal, r.ranges[1]) + require.Equal(t, expectedRanges, r.ranges) // first range access value, err = r.GetValue(5) require.NoError(t, err) + require.Equal(t, uint32(23), value) + + // second range access + value, err = r.GetValue(35) + require.NoError(t, err) require.Equal(t, uint32(11), value) // open range access @@ -275,29 +293,42 @@ func TestRangeMapUint32(t *testing.T) { require.NoError(t, err) require.Equal(t, uint32(16), value) - r.DecValue(6) - - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 0, - end: 9, - value: 11, - } - require.Equal(t, expectedRangeVal, r.ranges[0]) - - expectedRangeVal = rangeVal[uint32, uint32]{ - start: 15, - end: 0, - value: 10, - } - require.Equal(t, expectedRangeVal, r.ranges[1]) - - // first range access - value, err = r.GetValue(5) + err = r.CloseRangeAndDecValue(66, 6) require.NoError(t, err) - require.Equal(t, uint32(11), value) + + expectedRanges = []rangeVal[uint32, uint32]{ + { + start: 35, + end: 39, + value: 11, + }, + { + start: 45, + end: 66, + value: 16, + }, + { + start: 67, + end: 0, + value: 10, + }, + } + require.Equal(t, expectedRanges, r.ranges) + + err = r.CloseRangeAndDecValue(66, 6) + require.ErrorIs(t, err, errReversedOrder) + + // aged out range access + value, err = r.GetValue(5) + require.ErrorIs(t, err, errKeyTooOld) + + // access closed range before decrementing value + value, err = r.GetValue(66) + require.NoError(t, err) + require.Equal(t, uint32(16), value) // open range access - value, err = r.GetValue(55555555) + value, err = r.GetValue(67) require.NoError(t, err) require.Equal(t, uint32(10), value) }