From 492eb3bf18d076b2537975d1fd54ca52434830fe Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 8 Sep 2023 23:48:54 +0530 Subject: [PATCH] Sequencer small optimisations (#2049) * Sequencer small optimisations 1. Use range map to exclude padding only packets. Should take lesser space as we are not using slice to hold pointer to actual data. 2. Avoid `time.Now()` when adding each packet. Just use the arrival time as it should be close enough. `time.Now()` was showing up in profile. * remove debug * correct comment --- pkg/sfu/buffer/rtpstats.go | 14 +-- pkg/sfu/downtrack.go | 49 ++++---- pkg/sfu/forwarder_test.go | 94 +++++++-------- pkg/sfu/rtpmunger.go | 30 ++--- pkg/sfu/rtpmunger_test.go | 58 ++++----- pkg/sfu/sequencer.go | 234 ++++++++++++++++++++++--------------- pkg/sfu/sequencer_test.go | 156 +++++++++++++++++++++---- 7 files changed, 391 insertions(+), 244 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 1f0f2694e..d8fb6c110 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -1133,17 +1133,17 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, fracLost = proxyFracLost } - var dlsr uint32 - if r.srNewest != nil && !r.srNewest.At.IsZero() { - delayMS := uint32(time.Since(r.srNewest.At).Milliseconds()) - dlsr = (delayMS / 1e3) << 16 - dlsr |= (delayMS % 1e3) * 65536 / 1000 - } - lastSR := uint32(0) + dlsr := uint32(0) if r.srNewest != nil { lastSR = uint32(r.srNewest.NTPTimestamp >> 16) + if !r.srNewest.At.IsZero() { + delayMS := uint32(time.Since(r.srNewest.At).Milliseconds()) + dlsr = (delayMS / 1e3) << 16 + dlsr |= (delayMS % 1e3) * 65536 / 1000 + } } + return &rtcp.ReceptionReport{ SSRC: ssrc, FractionLost: fracLost, diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 42f32887a..39c961420 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -71,8 +71,6 @@ const ( keyFrameIntervalMax = 1000 flushTimeout = 1 * time.Second - maxPadding = 2000 - waitBeforeSendPaddingOnMute = 100 * time.Millisecond maxPaddingOnMuteDuration = 5 * time.Second ) @@ -393,11 +391,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.rtcpReader = rr } - if d.kind == webrtc.RTPCodecTypeAudio { - d.sequencer = newSequencer(d.params.MaxTrack, 0, d.params.Logger) - } else { - d.sequencer = newSequencer(d.params.MaxTrack, maxPadding, d.params.Logger) - } + d.sequencer = newSequencer(d.params.MaxTrack, d.kind == webrtc.RTPCodecTypeVideo, d.params.Logger) d.codec = codec.RTPCodecCapability if d.onBinding != nil { @@ -710,9 +704,10 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { } if d.sequencer != nil { d.sequencer.push( - extPkt.Packet.SequenceNumber, - tp.rtp.sequenceNumber, - tp.rtp.timestamp, + extPkt.Arrival, + extPkt.ExtSequenceNumber, + tp.rtp.extSequenceNumber, + tp.rtp.extTimestamp, hdr.Marker, int8(layer), tp.codecBytes, @@ -786,6 +781,15 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa return 0 } + // + // Register with sequencer as padding only so that NACKs for these can be filtered out. + // Retransmission is probably a sign of network congestion/badness. + // So, retransmitting padding only packets is only going to make matters worse. + // + if d.sequencer != nil { + d.sequencer.pushPadding(snts[0].extSequenceNumber, snts[len(snts)-1].extSequenceNumber) + } + bytesSent := 0 for i := 0; i < len(snts); i++ { hdr := rtp.Header{ @@ -793,8 +797,8 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa Padding: true, Marker: false, PayloadType: d.payloadType, - SequenceNumber: snts[i].sequenceNumber, - Timestamp: snts[i].timestamp, + SequenceNumber: uint16(snts[i].extSequenceNumber), + Timestamp: uint32(snts[i].extTimestamp), SSRC: d.ssrc, CSRC: []uint32{}, } @@ -816,15 +820,6 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa OnSent: d.packetSent, }) - // - // Register with sequencer with invalid layer so that NACKs for these can be filtered out. - // Retransmission is probably a sign of network congestion/badness. - // So, retransmitting padding packets is only going to make matters worse. - // - if d.sequencer != nil { - d.sequencer.pushPadding(hdr.SequenceNumber) - } - bytesSent += hdr.MarshalSize() + len(payload) } @@ -1298,8 +1293,8 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan Padding: false, Marker: true, PayloadType: d.payloadType, - SequenceNumber: snts[i].sequenceNumber, - Timestamp: snts[i].timestamp, + SequenceNumber: uint16(snts[i].extSequenceNumber), + Timestamp: uint32(snts[i].extTimestamp), SSRC: d.ssrc, CSRC: []uint32{}, } @@ -1637,8 +1632,8 @@ func (d *DownTrack) getTranslatedRTPHeader(extPkt *buffer.ExtPacket, tp *Transla tpRTP := tp.rtp hdr := extPkt.Packet.Header hdr.PayloadType = d.payloadType - hdr.Timestamp = tpRTP.timestamp - hdr.SequenceNumber = tpRTP.sequenceNumber + hdr.Timestamp = uint32(tpRTP.extTimestamp) + hdr.SequenceNumber = uint16(tpRTP.extSequenceNumber) hdr.SSRC = d.ssrc if tp.marker { hdr.Marker = tp.marker @@ -1785,8 +1780,8 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { Padding: false, Marker: true, PayloadType: d.payloadType, - SequenceNumber: snts[i].sequenceNumber, - Timestamp: snts[i].timestamp, + SequenceNumber: uint16(snts[i].extSequenceNumber), + Timestamp: uint32(snts[i].extTimestamp), SSRC: d.ssrc, CSRC: []uint32{}, } diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 7f1e936e5..abd58144a 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -1194,9 +1194,9 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { // should lock onto the first packet expectedTP := TranslationParams{ rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingContiguous, - sequenceNumber: 23333, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingContiguous, + extSequenceNumber: 23333, + extTimestamp: 0xabcdef, }, } actualTP, err := f.GetTranslationParams(extPkt, 0) @@ -1227,9 +1227,9 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingOutOfOrder, - sequenceNumber: 23331, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingOutOfOrder, + extSequenceNumber: 23331, + extTimestamp: 0xabcdef, }, } actualTP, err = f.GetTranslationParams(extPkt, 0) @@ -1262,9 +1262,9 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingContiguous, - sequenceNumber: 23333, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingContiguous, + extSequenceNumber: 23333, + extTimestamp: 0xabcdef, }, } actualTP, err = f.GetTranslationParams(extPkt, 0) @@ -1281,9 +1281,9 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingGap, - sequenceNumber: 23335, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingGap, + extSequenceNumber: 23335, + extTimestamp: 0xabcdef, }, } actualTP, err = f.GetTranslationParams(extPkt, 0) @@ -1301,9 +1301,9 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingOutOfOrder, - sequenceNumber: 23334, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingOutOfOrder, + extSequenceNumber: 23334, + extTimestamp: 0xabcdef, }, } actualTP, err = f.GetTranslationParams(extPkt, 0) @@ -1321,9 +1321,9 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingContiguous, - sequenceNumber: 23336, - timestamp: 0xabcdf0, + snOrdering: SequenceNumberOrderingContiguous, + extSequenceNumber: 23336, + extTimestamp: 0xabcdf0, }, } actualTP, err = f.GetTranslationParams(extPkt, 0) @@ -1417,9 +1417,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { isSwitching: true, isResuming: true, rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingContiguous, - sequenceNumber: 23333, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingContiguous, + extSequenceNumber: 23333, + extTimestamp: 0xabcdef, }, codecBytes: marshalledVP8, marker: true, @@ -1495,9 +1495,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { require.NoError(t, err) expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingContiguous, - sequenceNumber: 23334, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingContiguous, + extSequenceNumber: 23334, + extTimestamp: 0xabcdef, }, codecBytes: marshalledVP8, } @@ -1548,9 +1548,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { require.NoError(t, err) expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingContiguous, - sequenceNumber: 23335, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingContiguous, + extSequenceNumber: 23335, + extTimestamp: 0xabcdef, }, codecBytes: marshalledVP8, } @@ -1630,9 +1630,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { require.NoError(t, err) expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingContiguous, - sequenceNumber: 23336, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingContiguous, + extSequenceNumber: 23336, + extTimestamp: 0xabcdef, }, codecBytes: marshalledVP8, } @@ -1650,9 +1650,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingGap, - sequenceNumber: 23338, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingGap, + extSequenceNumber: 23338, + extTimestamp: 0xabcdef, }, } actualTP, err = f.GetTranslationParams(extPkt, 0) @@ -1669,9 +1669,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { expectedTP = TranslationParams{ rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingOutOfOrder, - sequenceNumber: 23337, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingOutOfOrder, + extSequenceNumber: 23337, + extTimestamp: 0xabcdef, }, } actualTP, err = f.GetTranslationParams(extPkt, 0) @@ -1728,9 +1728,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { expectedTP = TranslationParams{ isSwitching: true, rtp: &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingContiguous, - sequenceNumber: 23339, - timestamp: 0xabcdf0, + snOrdering: SequenceNumberOrderingContiguous, + extSequenceNumber: 23339, + extTimestamp: 0xabcdf0, }, codecBytes: marshalledVP8, } @@ -1788,8 +1788,8 @@ func TestForwarderGetSnTsForPadding(t *testing.T) { var sntsExpected = make([]SnTs, numPadding) for i := 0; i < numPadding; i++ { sntsExpected[i] = SnTs{ - sequenceNumber: 23333 + uint16(i) + 1, - timestamp: 0xabcdef + (uint32(i)*clockRate)/frameRate, + extSequenceNumber: 23333 + uint64(i) + 1, + extTimestamp: 0xabcdef + (uint64(i)*uint64(clockRate))/uint64(frameRate), } } require.Equal(t, sntsExpected, snts) @@ -1800,8 +1800,8 @@ func TestForwarderGetSnTsForPadding(t *testing.T) { for i := 0; i < numPadding; i++ { sntsExpected[i] = SnTs{ - sequenceNumber: 23338 + uint16(i) + 1, - timestamp: 0xabcdef + (uint32(i+1)*clockRate)/frameRate, + extSequenceNumber: 23338 + uint64(i) + 1, + extTimestamp: 0xabcdef + (uint64(i+1)*uint64(clockRate))/uint64(frameRate), } } require.Equal(t, sntsExpected, snts) @@ -1861,8 +1861,8 @@ func TestForwarderGetSnTsForBlankFrames(t *testing.T) { ts = params.Timestamp + 1 + ((uint32(i)*clockRate)+frameRate-1)/frameRate } sntsExpected[i] = SnTs{ - sequenceNumber: params.SequenceNumber + uint16(i) + 1, - timestamp: ts, + extSequenceNumber: uint64(params.SequenceNumber) + uint64(i) + 1, + extTimestamp: uint64(ts), } } require.Equal(t, sntsExpected, snts) @@ -1873,9 +1873,9 @@ func TestForwarderGetSnTsForBlankFrames(t *testing.T) { sntsExpected = sntsExpected[:numPadding] for i := 0; i < numPadding; i++ { sntsExpected[i] = SnTs{ - sequenceNumber: params.SequenceNumber + uint16(len(snts)) + uint16(i) + 1, + extSequenceNumber: uint64(params.SequenceNumber) + uint64(len(snts)) + uint64(i) + 1, // +1 here due to expected time stamp bumpint by at least one so that time stamp is always moving ahead - timestamp: snts[len(snts)-1].timestamp + 1 + ((uint32(i+1)*clockRate)+frameRate-1)/frameRate, + extTimestamp: snts[len(snts)-1].extTimestamp + 1 + ((uint64(i+1)*uint64(clockRate))+uint64(frameRate)-1)/uint64(frameRate), } } snts, frameEndNeeded, err = f.GetSnTsForBlankFrames(30, numBlankFrames) diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 1d902aaf3..4ac711600 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -40,14 +40,14 @@ const ( ) type TranslationParamsRTP struct { - snOrdering SequenceNumberOrdering - sequenceNumber uint16 - timestamp uint32 + snOrdering SequenceNumberOrdering + extSequenceNumber uint64 + extTimestamp uint64 } type SnTs struct { - sequenceNumber uint16 - timestamp uint32 + extSequenceNumber uint64 + extTimestamp uint64 } // ---------------------------------------------------------------------- @@ -188,9 +188,9 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara } return &TranslationParamsRTP{ - snOrdering: ordering, - sequenceNumber: uint16(extMungedSN), - timestamp: uint32(extMungedTS), + snOrdering: ordering, + extSequenceNumber: extMungedSN, + extTimestamp: extMungedTS, }, nil } @@ -204,9 +204,9 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara } return &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingOutOfOrder, - sequenceNumber: uint16(extPkt.ExtSequenceNumber - snOffset), - timestamp: uint32(extPkt.ExtTimestamp - r.tsOffset), + snOrdering: SequenceNumberOrderingOutOfOrder, + extSequenceNumber: extPkt.ExtSequenceNumber - snOffset, + extTimestamp: extPkt.ExtTimestamp - r.tsOffset, }, nil } @@ -268,21 +268,21 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate vals := make([]SnTs, num) for i := 0; i < num; i++ { extLastSN++ - vals[i].sequenceNumber = uint16(extLastSN) + vals[i].extSequenceNumber = extLastSN if frameRate != 0 { if useLastTSForFirst && i == 0 { - vals[i].timestamp = uint32(r.extLastTS) + vals[i].extTimestamp = r.extLastTS } else { ets := extRtpTimestamp + uint64(((uint32(i+1-tsOffset)*clockRate)+frameRate-1)/frameRate) if int64(ets-extLastTS) <= 0 { ets = extLastTS + 1 } extLastTS = ets - vals[i].timestamp = uint32(ets) + vals[i].extTimestamp = ets } } else { - vals[i].timestamp = uint32(r.extLastTS) + vals[i].extTimestamp = r.extLastTS } } diff --git a/pkg/sfu/rtpmunger_test.go b/pkg/sfu/rtpmunger_test.go index 8e5573100..ea8842457 100644 --- a/pkg/sfu/rtpmunger_test.go +++ b/pkg/sfu/rtpmunger_test.go @@ -173,9 +173,9 @@ func TestOutOfOrderSequenceNumber(t *testing.T) { extPkt, _ = testutils.GetTestExtPacket(params) tpExpected := TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingOutOfOrder, - sequenceNumber: 23331, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingOutOfOrder, + extSequenceNumber: 23331, + extTimestamp: 0xabcdef, } tp, err := r.UpdateAndGetSnTs(extPkt) @@ -258,9 +258,9 @@ func TestPaddingOnlyPacket(t *testing.T) { extPkt, _ = testutils.GetTestExtPacket(params) tpExpected = TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingGap, - sequenceNumber: 23334, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingGap, + extSequenceNumber: 23334, + extTimestamp: 0xabcdef, } tp, err = r.UpdateAndGetSnTs(extPkt) @@ -299,9 +299,9 @@ func TestGapInSequenceNumber(t *testing.T) { extPkt, _ = testutils.GetTestExtPacket(params) tpExpected := TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingGap, - sequenceNumber: 1, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingGap, + extSequenceNumber: 65536 + 1, + extTimestamp: 0xabcdef, } tp, err := r.UpdateAndGetSnTs(extPkt) @@ -352,9 +352,9 @@ func TestGapInSequenceNumber(t *testing.T) { extPkt, _ = testutils.GetTestExtPacket(params) tpExpected = TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingGap, - sequenceNumber: 3, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingGap, + extSequenceNumber: 65536 + 3, + extTimestamp: 0xabcdef, } tp, err = r.UpdateAndGetSnTs(extPkt) @@ -403,9 +403,9 @@ func TestGapInSequenceNumber(t *testing.T) { extPkt, _ = testutils.GetTestExtPacket(params) tpExpected = TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingGap, - sequenceNumber: 5, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingGap, + extSequenceNumber: 65536 + 5, + extTimestamp: 0xabcdef, } tp, err = r.UpdateAndGetSnTs(extPkt) @@ -436,9 +436,9 @@ func TestGapInSequenceNumber(t *testing.T) { extPkt, _ = testutils.GetTestExtPacket(params) tpExpected = TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingOutOfOrder, - sequenceNumber: 4, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingOutOfOrder, + extSequenceNumber: 65536 + 4, + extTimestamp: 0xabcdef, } tp, err = r.UpdateAndGetSnTs(extPkt) @@ -459,9 +459,9 @@ func TestGapInSequenceNumber(t *testing.T) { extPkt, _ = testutils.GetTestExtPacket(params) tpExpected = TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingOutOfOrder, - sequenceNumber: 2, - timestamp: 0xabcdef, + snOrdering: SequenceNumberOrderingOutOfOrder, + extSequenceNumber: 65536 + 2, + extTimestamp: 0xabcdef, } tp, err = r.UpdateAndGetSnTs(extPkt) @@ -494,27 +494,27 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) { // forcing a marker should not error out. // And timestamp on first padding should be the same as the last one. numPadding := 10 - clockRate := uint32(10) - frameRate := uint32(5) + clockRate := uint64(10) + frameRate := uint64(5) var sntsExpected = make([]SnTs, numPadding) for i := 0; i < numPadding; i++ { sntsExpected[i] = SnTs{ - sequenceNumber: params.SequenceNumber + uint16(i) + 1, - timestamp: params.Timestamp + ((uint32(i)*clockRate)+frameRate-1)/frameRate, + extSequenceNumber: uint64(params.SequenceNumber) + uint64(i) + 1, + extTimestamp: uint64(params.Timestamp) + ((uint64(i)*clockRate)+frameRate-1)/frameRate, } } - snts, err := r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, true, extPkt.ExtTimestamp) + snts, err := r.UpdateAndGetPaddingSnTs(numPadding, uint32(clockRate), uint32(frameRate), true, extPkt.ExtTimestamp) require.NoError(t, err) require.Equal(t, sntsExpected, snts) // now that there is a marker, timestamp should jump on first padding when asked again for i := 0; i < numPadding; i++ { sntsExpected[i] = SnTs{ - sequenceNumber: params.SequenceNumber + uint16(len(snts)) + uint16(i) + 1, - timestamp: snts[len(snts)-1].timestamp + ((uint32(i+1)*clockRate)+frameRate-1)/frameRate, + extSequenceNumber: uint64(params.SequenceNumber) + uint64(len(snts)) + uint64(i) + 1, + extTimestamp: snts[len(snts)-1].extTimestamp + ((uint64(i+1)*clockRate)+frameRate-1)/frameRate, } } - snts, err = r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, false, uint64(snts[len(snts)-1].timestamp)) + snts, err = r.UpdateAndGetPaddingSnTs(numPadding, uint32(clockRate), uint32(frameRate), false, snts[len(snts)-1].extTimestamp) require.NoError(t, err) require.Equal(t, sntsExpected, snts) } diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index a2699427b..74dbd38f8 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/protocol/logger" ) @@ -74,28 +75,30 @@ type packetMeta struct { // Sequencer stores the packet sequence received by the down track type sequencer struct { sync.Mutex - init bool - max int - seq []*packetMeta - meta []packetMeta - metaWritePtr int - step int - headSN uint16 + size int startTime int64 + initialized bool + extHighestSN uint64 + snOffset uint64 + meta []packetMeta + snRangeMap *utils.RangeMap[uint64, uint64] rtt uint32 logger logger.Logger } -func newSequencer(maxTrack int, maxPadding int, logger logger.Logger) *sequencer { - return &sequencer{ - startTime: time.Now().UnixNano() / 1e6, - max: maxTrack + maxPadding, - seq: make([]*packetMeta, maxTrack+maxPadding), - meta: make([]packetMeta, maxTrack), - metaWritePtr: 0, - rtt: defaultRtt, - logger: logger, +func newSequencer(size int, maybeSparse bool, logger logger.Logger) *sequencer { + s := &sequencer{ + size: size, + startTime: time.Now().UnixMilli(), + meta: make([]packetMeta, size), + rtt: defaultRtt, + logger: logger, } + + if maybeSparse { + s.snRangeMap = utils.NewRangeMap[uint64, uint64]((size + 1) / 2) // assume run lengths of at least 2 in between padding bursts + } + return s } func (s *sequencer) setRTT(rtt uint32) { @@ -110,8 +113,9 @@ func (s *sequencer) setRTT(rtt uint32) { } func (s *sequencer) push( - sn, offSn uint16, - timeStamp uint32, + packetTime time.Time, + extIncomingSN, extModifiedSN uint64, + extModifiedTS uint64, marker bool, layer int8, codecBytes []byte, @@ -120,125 +124,163 @@ func (s *sequencer) push( s.Lock() defer s.Unlock() - slot, isValid := s.getSlot(offSn) - if !isValid { - return + if !s.initialized { + s.extHighestSN = extModifiedSN - 1 + s.updateSNOffset() } - s.meta[s.metaWritePtr] = packetMeta{ - sourceSeqNo: sn, - targetSeqNo: offSn, - timestamp: timeStamp, + snOffset := s.snOffset + diff := int64(extModifiedSN - s.extHighestSN) + if diff >= 0 { + s.extHighestSN = extModifiedSN + } else { + if diff < -int64(s.size) { + s.logger.Debugw("old packet, can not be sequenced", "extHighestSN", s.extHighestSN, "extModifiedSN", extModifiedSN) + return + } + + if s.snRangeMap != nil { + var err error + snOffset, err = s.snRangeMap.GetValue(extModifiedSN) + if err != nil { + s.logger.Errorw("could not get sequence number offset", err, "extIncomingSN", extIncomingSN, "extModifiedSn", extModifiedSN) + return + } + } + } + + slot := (extModifiedSN - snOffset) % uint64(s.size) + s.meta[slot] = packetMeta{ + sourceSeqNo: uint16(extIncomingSN), + targetSeqNo: uint16(extModifiedSN), + timestamp: uint32(extModifiedTS), marker: marker, layer: layer, codecBytes: append([]byte{}, codecBytes...), ddBytes: append([]byte{}, ddBytes...), - lastNack: s.getRefTime(), // delay retransmissions after the original transmission - } - - s.seq[slot] = &s.meta[s.metaWritePtr] - - s.metaWritePtr++ - if s.metaWritePtr >= len(s.meta) { - s.metaWritePtr -= len(s.meta) + lastNack: s.getRefTime(packetTime), // delay retransmissions after the original transmission } } -func (s *sequencer) pushPadding(offSn uint16) { +func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive uint64) { s.Lock() defer s.Unlock() - slot, isValid := s.getSlot(offSn) - if !isValid { + if s.snRangeMap == nil { return } - s.seq[slot] = nil -} + if extStartSNInclusive <= s.extHighestSN { + // a higher sequence number has already been recorded with an offset, + // adding an exclusion range before the highest means the offset of sequence numbers + // after the exclusion range will be affected and all those higher sequence numbers + // need to be patched. + // + // Not recording exclusion range means a few slots (of the size of exclusion range) + // are wasted in this cycle. That should be fine as the exclusion ranges should be + // a few packets at a time. + s.logger.Warnw("cannot exclude old range", nil, "extHighestSN", s.extHighestSN, "startSN", extStartSNInclusive, "endSN", extEndSNInclusive) -func (s *sequencer) getSlot(offSn uint16) (int, bool) { - if !s.init { - s.headSN = offSn - 1 - s.init = true - } + // if exclusion range is before what has already been sequenced, invalidate exclusion range slots + for sn := extStartSNInclusive; sn != extEndSNInclusive+1; sn++ { + diff := int64(sn - s.extHighestSN) + if diff >= 0 || diff < -int64(s.size) { + // too old OR too new (too new should not happen, just be safe) + continue + } - diff := offSn - s.headSN - if diff == 0 { - // duplicate - return 0, false - } + snOffset, err := s.snRangeMap.GetValue(sn) + if err != nil { + s.logger.Errorw("could not get sequence number offset", err, "sn", sn) + continue + } - slot := 0 - if diff > (1 << 15) { - // out-of-order - back := int(s.headSN - offSn) - if back >= s.max { - s.logger.Debugw("old packet, can not be sequenced", "head", s.headSN, "received", offSn) - return 0, false + slot := (sn - snOffset) % uint64(s.size) + s.meta[slot] = packetMeta{ + sourceSeqNo: 0, + targetSeqNo: 0, + } } - slot = s.step - back - 1 - } else { - s.headSN = offSn - - // invalidate intervening slots - for idx := 0; idx < int(diff)-1; idx++ { - s.seq[s.wrap(s.step+idx)] = nil - } - - slot = s.step + int(diff) - 1 - - // for next packet - s.step = s.wrap(s.step + int(diff)) + return } - return s.wrap(slot), true + if err := s.snRangeMap.ExcludeRange(extStartSNInclusive, extEndSNInclusive+1); err != nil { + s.logger.Errorw("could not exclude range", err, "startSN", extStartSNInclusive, "endSN", extEndSNInclusive) + return + } + + s.extHighestSN = extEndSNInclusive + s.updateSNOffset() } func (s *sequencer) getPacketsMeta(seqNo []uint16) []packetMeta { s.Lock() defer s.Unlock() - meta := make([]packetMeta, 0, len(seqNo)) - refTime := s.getRefTime() + snOffset := uint64(0) + var err error + packetsMeta := make([]packetMeta, 0, len(seqNo)) + refTime := s.getRefTime(time.Now()) + highestSN := uint16(s.extHighestSN) for _, sn := range seqNo { - diff := s.headSN - sn - if diff > (1<<15) || int(diff) >= s.max { - // out-of-order from head (should not happen) or too old + diff := highestSN - sn + if diff > (1 << 15) { + // out-of-order from head (should not happen, just be safe) continue } - slot := s.wrap(s.step - int(diff) - 1) - seq := s.seq[slot] - if seq == nil || seq.targetSeqNo != sn { + // find slot by adjusting for padding only packets that were not recorded in sequencer + extSN := uint64(sn) + (s.extHighestSN & 0xFFFF_FFFF_FFFF_0000) + if sn > highestSN { + extSN -= (1 << 16) + } + + if s.extHighestSN-extSN >= uint64(s.size) { + // too old continue } - if refTime-seq.lastNack > uint32(math.Min(float64(ignoreRetransmission), float64(2*s.rtt))) && seq.nacked < maxAck { - seq.nacked++ - seq.lastNack = refTime + if s.snRangeMap != nil { + snOffset, err = s.snRangeMap.GetValue(extSN) + if err != nil { + // could be padding packet which is excluded and will not have value + continue + } + } - pm := *seq - pm.codecBytes = append([]byte{}, seq.codecBytes...) - pm.ddBytes = append([]byte{}, seq.ddBytes...) - meta = append(meta, pm) + slot := (extSN - snOffset) % uint64(s.size) + meta := &s.meta[slot] + if meta.targetSeqNo != sn { + continue + } + + if meta.nacked < maxAck && refTime-meta.lastNack > uint32(math.Min(float64(ignoreRetransmission), float64(2*s.rtt))) { + meta.nacked++ + meta.lastNack = refTime + + pm := *meta + pm.codecBytes = append([]byte{}, meta.codecBytes...) + pm.ddBytes = append([]byte{}, meta.ddBytes...) + packetsMeta = append(packetsMeta, pm) } } - return meta + return packetsMeta } -func (s *sequencer) wrap(slot int) int { - for slot < 0 { - slot += s.max +func (s *sequencer) getRefTime(at time.Time) uint32 { + return uint32(at.UnixMilli() - s.startTime) +} + +func (s *sequencer) updateSNOffset() { + if s.snRangeMap == nil { + return } - for slot >= s.max { - slot -= s.max + snOffset, err := s.snRangeMap.GetValue(s.extHighestSN + 1) + if err != nil { + s.logger.Errorw("could not update sequence number offset", err, "extHighestSN", s.extHighestSN) + return } - - return slot -} - -func (s *sequencer) getRefTime() uint32 { - return uint32(time.Now().UnixNano()/1e6 - s.startTime) + s.snOffset = snOffset } diff --git a/pkg/sfu/sequencer_test.go b/pkg/sfu/sequencer_test.go index 8fde0d9e6..900248992 100644 --- a/pkg/sfu/sequencer_test.go +++ b/pkg/sfu/sequencer_test.go @@ -25,15 +25,15 @@ import ( ) func Test_sequencer(t *testing.T) { - seq := newSequencer(500, 0, logger.GetLogger()) + seq := newSequencer(500, false, logger.GetLogger()) off := uint16(15) - for i := uint16(1); i < 518; i++ { - seq.push(i, i+off, 123, true, 2, nil, nil) + for i := uint64(1); i < 518; i++ { + seq.push(time.Now(), i, i+uint64(off), 123, true, 2, nil, nil) } // send the last two out-of-order - seq.push(519, 519+off, 123, false, 2, nil, nil) - seq.push(518, 518+off, 123, true, 2, nil, nil) + seq.push(time.Now(), 519, 519+uint64(off), 123, false, 2, nil, nil) + seq.push(time.Now(), 518, 518+uint64(off), 123, true, 2, nil, nil) req := []uint16{57, 58, 62, 63, 513, 514, 515, 516, 517} res := seq.getPacketsMeta(req) @@ -59,14 +59,14 @@ func Test_sequencer(t *testing.T) { require.Equal(t, val.layer, int8(2)) } - seq.push(521, 521+off, 123, true, 1, nil, nil) + seq.push(time.Now(), 521, 521+uint64(off), 123, true, 1, nil, nil) m := seq.getPacketsMeta([]uint16{521 + off}) require.Equal(t, 0, len(m)) time.Sleep((ignoreRetransmission + 10) * time.Millisecond) m = seq.getPacketsMeta([]uint16{521 + off}) require.Equal(t, 1, len(m)) - seq.push(505, 505+off, 123, false, 1, nil, nil) + seq.push(time.Now(), 505, 505+uint64(off), 123, false, 1, nil, nil) m = seq.getPacketsMeta([]uint16{505 + off}) require.Equal(t, 0, len(m)) time.Sleep((ignoreRetransmission + 10) * time.Millisecond) @@ -74,14 +74,17 @@ func Test_sequencer(t *testing.T) { require.Equal(t, 1, len(m)) } -func Test_sequencer_getNACKSeqNo(t *testing.T) { +func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { type args struct { seqNo []uint16 } + type input struct { + seqNo uint64 + isPadding bool + } type fields struct { - input []uint16 - padding []uint16 - offset uint16 + inputs []input + offset uint64 markerOdd bool markerEven bool codecBytesOdd []byte @@ -99,8 +102,17 @@ func Test_sequencer_getNACKSeqNo(t *testing.T) { { name: "Should get correct seq numbers", fields: fields{ - input: []uint16{2, 3, 4, 7, 8, 11}, - padding: []uint16{9, 10}, + inputs: []input{ + {65526, false}, + {65524, false}, + {65525, false}, + {65529, false}, + {65530, false}, + {65531, true}, + {65533, false}, + {65532, true}, + {65534, false}, + }, offset: 5, markerOdd: true, markerEven: false, @@ -110,26 +122,124 @@ func Test_sequencer_getNACKSeqNo(t *testing.T) { ddBytesEven: []byte{11, 12}, }, args: args{ - seqNo: []uint16{4 + 5, 5 + 5, 8 + 5, 9 + 5, 10 + 5, 11 + 5}, + seqNo: []uint16{65526 + 5, 65527 + 5, 65530 + 5, 0 /* 65531 input */, 1 /* 65532 input */, 2 /* 65533 input */, 3 /* 65534 input */}, }, - want: []uint16{4, 8, 11}, + // although 65526 is originally pushed, that would have been reset by 65532 (padding only packet) + // because of trying to add an exclusion range before highest sequence number which will fail + // and the resulting fix up of the exclusion range slots + want: []uint16{65530, 65533, 65534}, }, } for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - n := newSequencer(5, 10, logger.GetLogger()) + n := newSequencer(5, true, logger.GetLogger()) - for _, i := range tt.fields.input { - if i%2 == 0 { - n.push(i, i+tt.fields.offset, 123, tt.fields.markerEven, 3, tt.fields.codecBytesEven, tt.fields.ddBytesEven) + for _, i := range tt.fields.inputs { + if i.isPadding { + n.pushPadding(i.seqNo+tt.fields.offset, i.seqNo+tt.fields.offset) } else { - n.push(i, i+tt.fields.offset, 123, tt.fields.markerOdd, 3, tt.fields.codecBytesOdd, tt.fields.ddBytesOdd) + if i.seqNo%2 == 0 { + n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerEven, 3, tt.fields.codecBytesEven, tt.fields.ddBytesEven) + } else { + n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerOdd, 3, tt.fields.codecBytesOdd, tt.fields.ddBytesOdd) + } + } + } + + time.Sleep((ignoreRetransmission + 10) * time.Millisecond) + g := n.getPacketsMeta(tt.args.seqNo) + var got []uint16 + for _, sn := range g { + got = append(got, sn.sourceSeqNo) + if sn.sourceSeqNo%2 == 0 { + require.Equal(t, tt.fields.markerEven, sn.marker) + require.Equal(t, tt.fields.codecBytesEven, sn.codecBytes) + require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes) + } else { + require.Equal(t, tt.fields.markerOdd, sn.marker) + require.Equal(t, tt.fields.codecBytesOdd, sn.codecBytes) + require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes) + } + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getPacketsMeta() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { + type args struct { + seqNo []uint16 + } + type input struct { + seqNo uint64 + isPadding bool + } + type fields struct { + inputs []input + offset uint64 + markerOdd bool + markerEven bool + codecBytesOdd []byte + codecBytesEven []byte + ddBytesOdd []byte + ddBytesEven []byte + } + + tests := []struct { + name string + fields fields + args args + want []uint16 + }{ + { + name: "Should get correct seq numbers", + fields: fields{ + inputs: []input{ + {2, false}, + {3, false}, + {4, false}, + {7, false}, + {8, false}, + {9, true}, + {11, false}, + {10, true}, + {12, false}, + {13, false}, + }, + offset: 5, + markerOdd: true, + markerEven: false, + codecBytesOdd: []byte{1, 2, 3, 4}, + codecBytesEven: []byte{5, 6, 7}, + ddBytesOdd: []byte{8, 9, 10}, + ddBytesEven: []byte{11, 12}, + }, + args: args{ + seqNo: []uint16{4 + 5, 5 + 5, 8 + 5, 9 + 5, 10 + 5, 11 + 5, 12 + 5}, + }, + // although 4 and 8 were originally added, they would be too old after a cycle of sequencer buffer + want: []uint16{11, 12}, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + n := newSequencer(5, false, logger.GetLogger()) + + for _, i := range tt.fields.inputs { + if i.isPadding { + n.pushPadding(i.seqNo+tt.fields.offset, i.seqNo+tt.fields.offset) + } else { + if i.seqNo%2 == 0 { + n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerEven, 3, tt.fields.codecBytesEven, tt.fields.ddBytesEven) + } else { + n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerOdd, 3, tt.fields.codecBytesOdd, tt.fields.ddBytesOdd) + } } } - for _, i := range tt.fields.padding { - n.pushPadding(i + tt.fields.offset) - } time.Sleep((ignoreRetransmission + 10) * time.Millisecond) g := n.getPacketsMeta(tt.args.seqNo)