diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 1a556c3d0..8f2b54f7d 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -424,7 +424,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } // adjust start to account for out-of-order packets before a cycle completes - if !r.maybeAdjustStartSN(rtph, pktSize, hdrSize, payloadSize) { + if !r.maybeAdjustStart(rtph, pktSize, hdrSize, payloadSize) { if !r.isSnInfoLost(rtph.SequenceNumber) { r.bytesDuplicate += pktSize r.headerBytesDuplicate += hdrSize @@ -497,7 +497,7 @@ func (r *RTPStats) ResyncOnNextPacket() { r.resyncOnNextPacket = true } -func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, pktSize uint64, hdrSize uint64, payloadSize int) bool { +func (r *RTPStats) maybeAdjustStart(rtph *rtp.Header, pktSize uint64, hdrSize uint64, payloadSize int) bool { if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) { return false } @@ -507,17 +507,37 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, pktSize uint64, hdrSize } r.packetsLost += uint32(uint16(r.extStartSN)-rtph.SequenceNumber) - 1 - beforeAdjust := r.extStartSN + snBeforeAdjust := r.extStartSN r.extStartSN = uint32(rtph.SequenceNumber) + if r.extStartSN > snBeforeAdjust { + // wrapping back + r.cycles++ + } r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, true) for _, s := range r.snapshots { - if s.extStartSN == beforeAdjust { + if s.extStartSN == snBeforeAdjust { s.extStartSN = r.extStartSN } } + tsBeforeAdjust := r.extStartTS + r.extStartTS = uint64(rtph.Timestamp) + if r.extStartTS > tsBeforeAdjust { + // wrapping back + r.tsCycles++ + } + r.logger.Infow( + "adjusting starting sequence number", + "snBefore", snBeforeAdjust, + "snAfter", r.extStartSN, + "snCyles", r.cycles, + "tsBefore", tsBeforeAdjust, + "tsAfter", r.extStartTS, + "tsCyles", r.tsCycles, + ) + return true } diff --git a/pkg/sfu/buffer/rtpstats_test.go b/pkg/sfu/buffer/rtpstats_test.go index c5c4138ba..389d330c3 100644 --- a/pkg/sfu/buffer/rtpstats_test.go +++ b/pkg/sfu/buffer/rtpstats_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/livekit/protocol/logger" "github.com/pion/rtp" "github.com/stretchr/testify/require" ) @@ -38,6 +39,7 @@ func TestRTPStats(t *testing.T) { clockRate := uint32(90000) r := NewRTPStats(RTPStatsParams{ ClockRate: clockRate, + Logger: logger.GetLogger(), }) totalDuration := 5 * time.Second @@ -79,6 +81,7 @@ func TestRTPStats_Update(t *testing.T) { clockRate := uint32(90000) r := NewRTPStats(RTPStatsParams{ ClockRate: clockRate, + Logger: logger.GetLogger(), }) sequenceNumber := uint16(rand.Float64() * float64(1<<16)) diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go index 299002fbf..f9f102e1c 100644 --- a/pkg/sfu/utils/wraparound.go +++ b/pkg/sfu/utils/wraparound.go @@ -113,7 +113,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended // re-adjust start if necessary. The conditions are // 1. Not seen more than half the range yet - // 1. wrap around compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space + // 1. wrap back compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space // 2. no wrap around, but out-of-order compared to start and not completed a half cycle , sequences like (10, 9), (65530, 65528) in uint16 space cycles := w.cycles totalNum := w.GetExtendedHighest() - w.GetExtendedStart() + 1