From 480edff4ac6002201cbd3c4b38b8d7ae8e9ad644 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 16 Aug 2023 13:52:01 +0530 Subject: [PATCH] Adjust TS and cycles when adjusting start. (#1971) * Adjust TS and cycles when adjusting start. Chasing some AddPacket errors across relay. Noticed that in one case the start/end sequence was flipped. There is a known issue of it happening with resync. Unclear if this instance was due to resync or not. The start was close to the edge (64513). So, thought maybe adjust at start and noticed that it needs to maybe increase cycle count if start is wrapping back. In this case, the start is 1000 before wrap point. So, may not be a wrap back issue, but addressing what I found anyway. * fix test --- pkg/sfu/buffer/rtpstats.go | 28 ++++++++++++++++++++++++---- pkg/sfu/buffer/rtpstats_test.go | 3 +++ pkg/sfu/utils/wraparound.go | 2 +- 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 1a556c3d0..8f2b54f7d 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -424,7 +424,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } // adjust start to account for out-of-order packets before a cycle completes - if !r.maybeAdjustStartSN(rtph, pktSize, hdrSize, payloadSize) { + if !r.maybeAdjustStart(rtph, pktSize, hdrSize, payloadSize) { if !r.isSnInfoLost(rtph.SequenceNumber) { r.bytesDuplicate += pktSize r.headerBytesDuplicate += hdrSize @@ -497,7 +497,7 @@ func (r *RTPStats) ResyncOnNextPacket() { r.resyncOnNextPacket = true } -func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, pktSize uint64, hdrSize uint64, payloadSize int) bool { +func (r *RTPStats) maybeAdjustStart(rtph *rtp.Header, pktSize uint64, hdrSize uint64, payloadSize int) bool { if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) { return false } @@ -507,17 +507,37 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, pktSize uint64, hdrSize } r.packetsLost += uint32(uint16(r.extStartSN)-rtph.SequenceNumber) - 1 - beforeAdjust := r.extStartSN + snBeforeAdjust := r.extStartSN r.extStartSN = uint32(rtph.SequenceNumber) + if r.extStartSN > snBeforeAdjust { + // wrapping back + r.cycles++ + } r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, true) for _, s := range r.snapshots { - if s.extStartSN == beforeAdjust { + if s.extStartSN == snBeforeAdjust { s.extStartSN = r.extStartSN } } + tsBeforeAdjust := r.extStartTS + r.extStartTS = uint64(rtph.Timestamp) + if r.extStartTS > tsBeforeAdjust { + // wrapping back + r.tsCycles++ + } + r.logger.Infow( + "adjusting starting sequence number", + "snBefore", snBeforeAdjust, + "snAfter", r.extStartSN, + "snCyles", r.cycles, + "tsBefore", tsBeforeAdjust, + "tsAfter", r.extStartTS, + "tsCyles", r.tsCycles, + ) + return true } diff --git a/pkg/sfu/buffer/rtpstats_test.go b/pkg/sfu/buffer/rtpstats_test.go index c5c4138ba..389d330c3 100644 --- a/pkg/sfu/buffer/rtpstats_test.go +++ b/pkg/sfu/buffer/rtpstats_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/livekit/protocol/logger" "github.com/pion/rtp" "github.com/stretchr/testify/require" ) @@ -38,6 +39,7 @@ func TestRTPStats(t *testing.T) { clockRate := uint32(90000) r := NewRTPStats(RTPStatsParams{ ClockRate: clockRate, + Logger: logger.GetLogger(), }) totalDuration := 5 * time.Second @@ -79,6 +81,7 @@ func TestRTPStats_Update(t *testing.T) { clockRate := uint32(90000) r := NewRTPStats(RTPStatsParams{ ClockRate: clockRate, + Logger: logger.GetLogger(), }) sequenceNumber := uint16(rand.Float64() * float64(1<<16)) diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go index 299002fbf..f9f102e1c 100644 --- a/pkg/sfu/utils/wraparound.go +++ b/pkg/sfu/utils/wraparound.go @@ -113,7 +113,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended // re-adjust start if necessary. The conditions are // 1. Not seen more than half the range yet - // 1. wrap around compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space + // 1. wrap back compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space // 2. no wrap around, but out-of-order compared to start and not completed a half cycle , sequences like (10, 9), (65530, 65528) in uint16 space cycles := w.cycles totalNum := w.GetExtendedHighest() - w.GetExtendedStart() + 1