From d24f4093542eb34622cc930ef5393ddd960a857f Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 9 Sep 2023 13:00:59 +0530 Subject: [PATCH] Simplify SN cache a bit. (#2052) --- pkg/sfu/buffer/rtpstats.go | 49 +++++++++------------ pkg/sfu/buffer/rtpstats_test.go | 76 +++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 30 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index d8fb6c110..407e5b341 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -194,8 +194,7 @@ type RTPStats struct { jitterOverridden float64 maxJitterOverridden float64 - snInfos [SnInfoSize]SnInfo - snInfoWritePtr int + snInfos [SnInfoSize]SnInfo gapHistogram [GapHistogramNumBins]uint32 @@ -288,7 +287,6 @@ func (r *RTPStats) Seed(from *RTPStats) { r.maxJitterOverridden = from.maxJitterOverridden r.snInfos = from.snInfos - r.snInfoWritePtr = from.snInfoWritePtr r.gapHistogram = from.gapHistogram @@ -1495,33 +1493,27 @@ func (r *RTPStats) getPacketsLost() uint64 { } func (r *RTPStats) getSnInfoOutOfOrderPtr(esn uint64, ehsn uint64) int { - if int64(esn-ehsn) > 0 { - return -1 // in-order, not expected, maybe too new - } - - offset := ehsn - esn - if int(offset) >= SnInfoSize { - // too old, ignore + offset := int64(ehsn - esn) + if offset >= SnInfoSize || offset < 0 { + // too old OR too new (i. e. ahead of highest) return -1 } - return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask + return int(esn & SnInfoMask) } func (r *RTPStats) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool, isOutOfOrder bool) { - writePtr := 0 - ooo := int64(esn-ehsn) < 0 - if !ooo { - writePtr = r.snInfoWritePtr - r.snInfoWritePtr = (writePtr + 1) & SnInfoMask - } else { - writePtr = r.getSnInfoOutOfOrderPtr(esn, ehsn) - if writePtr < 0 { + slot := 0 + if int64(esn-ehsn) < 0 { + slot = r.getSnInfoOutOfOrderPtr(esn, ehsn) + if slot < 0 { return } + } else { + slot = int(esn & SnInfoMask) } - snInfo := &r.snInfos[writePtr] + snInfo := &r.snInfos[slot] snInfo.pktSize = pktSize snInfo.hdrSize = hdrSize snInfo.isPaddingOnly = payloadSize == 0 @@ -1531,36 +1523,33 @@ func (r *RTPStats) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrSize ui func (r *RTPStats) clearSnInfos(extStartInclusive uint64, extEndExclusive uint64) { for esn := extStartInclusive; esn != extEndExclusive; esn++ { - snInfo := &r.snInfos[r.snInfoWritePtr] + snInfo := &r.snInfos[esn&SnInfoMask] snInfo.pktSize = 0 snInfo.hdrSize = 0 snInfo.isPaddingOnly = false snInfo.marker = false - - r.snInfoWritePtr = (r.snInfoWritePtr + 1) & SnInfoMask } } func (r *RTPStats) isSnInfoLost(esn uint64, ehsn uint64) bool { - readPtr := r.getSnInfoOutOfOrderPtr(esn, ehsn) - if readPtr < 0 { + slot := r.getSnInfoOutOfOrderPtr(esn, ehsn) + if slot < 0 { return false } - snInfo := &r.snInfos[readPtr] - return snInfo.pktSize == 0 + return r.snInfos[slot].pktSize == 0 } func (r *RTPStats) getIntervalStats(extStartInclusive uint64, extEndExclusive uint64) (intervalStats IntervalStats) { packetsNotFound := uint32(0) processESN := func(esn uint64, ehsn uint64) { - readPtr := r.getSnInfoOutOfOrderPtr(esn, ehsn) - if readPtr < 0 { + slot := r.getSnInfoOutOfOrderPtr(esn, ehsn) + if slot < 0 { packetsNotFound++ return } - snInfo := &r.snInfos[readPtr] + snInfo := &r.snInfos[slot] switch { case snInfo.pktSize == 0: intervalStats.packetsLost++ diff --git a/pkg/sfu/buffer/rtpstats_test.go b/pkg/sfu/buffer/rtpstats_test.go index e59a21de2..f5a4e004c 100644 --- a/pkg/sfu/buffer/rtpstats_test.go +++ b/pkg/sfu/buffer/rtpstats_test.go @@ -152,5 +152,81 @@ func TestRTPStats_Update(t *testing.T) { intervalStats := r.getIntervalStats(r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest()+1) require.Equal(t, uint64(16), intervalStats.packetsLost) + // test sequence number cache + // with a gap + sequenceNumber += 2 + timestamp += 6000 + packet = getPacket(sequenceNumber, timestamp, 1000) + flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now()) + require.True(t, flowState.HasLoss) + require.Equal(t, uint64(sequenceNumber-1), flowState.LossStartInclusive) + require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive) + require.Equal(t, uint64(17), r.packetsLost) + expectedSnInfo := SnInfo{ + hdrSize: 12, + pktSize: 1012, + isPaddingOnly: false, + marker: false, + isOutOfOrder: false, + } + require.Equal(t, expectedSnInfo, r.snInfos[sequenceNumber&SnInfoMask]) + + // out-of-order + sequenceNumber-- + timestamp -= 3000 + packet = getPacket(sequenceNumber, timestamp, 999) + flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now()) + require.False(t, flowState.HasLoss) + require.Equal(t, uint64(16), r.packetsLost) + expectedSnInfo = SnInfo{ + hdrSize: 12, + pktSize: 1011, + isPaddingOnly: false, + marker: false, + isOutOfOrder: true, + } + require.Equal(t, expectedSnInfo, r.snInfos[sequenceNumber&SnInfoMask]) + // check that last one is still fine + expectedSnInfo = SnInfo{ + hdrSize: 12, + pktSize: 1012, + isPaddingOnly: false, + marker: false, + isOutOfOrder: false, + } + require.Equal(t, expectedSnInfo, r.snInfos[(sequenceNumber+1)&SnInfoMask]) + + // padding only + sequenceNumber += 2 + packet = getPacket(sequenceNumber, timestamp, 0) + flowState = r.Update(&packet.Header, len(packet.Payload), 25, time.Now()) + require.False(t, flowState.HasLoss) + require.Equal(t, uint64(16), r.packetsLost) + expectedSnInfo = SnInfo{ + hdrSize: 12, + pktSize: 37, + isPaddingOnly: true, + marker: false, + isOutOfOrder: false, + } + require.Equal(t, expectedSnInfo, r.snInfos[sequenceNumber&SnInfoMask]) + // check that last two are still fine + expectedSnInfo = SnInfo{ + hdrSize: 12, + pktSize: 1011, + isPaddingOnly: false, + marker: false, + isOutOfOrder: true, + } + require.Equal(t, expectedSnInfo, r.snInfos[(sequenceNumber-2)&SnInfoMask]) + expectedSnInfo = SnInfo{ + hdrSize: 12, + pktSize: 1012, + isPaddingOnly: false, + marker: false, + isOutOfOrder: false, + } + require.Equal(t, expectedSnInfo, r.snInfos[(sequenceNumber-1)&SnInfoMask]) + r.Stop() }