Adjust TS and cycles when adjusting start. (#1971)

* Adjust TS and cycles when adjusting start.

Chasing some AddPacket errors across relay.
Noticed that in one case the start/end sequence was flipped.
There is a known issue of it happening with resync.
Unclear if this instance was due to resync or not.
The start was close to the edge (64513). So, thought maybe
adjust at start and noticed that it needs to maybe increase
cycle count if start is wrapping back. In this case, the
start is 1000 before wrap point. So, may not be a wrap back
issue, but addressing what I found anyway.

* fix test
This commit is contained in:
Raja Subramanian
2023-08-16 13:52:01 +05:30
committed by GitHub
parent c0ea1b9ced
commit 480edff4ac
3 changed files with 28 additions and 5 deletions
+24 -4
View File
@@ -424,7 +424,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
}
// adjust start to account for out-of-order packets before a cycle completes
if !r.maybeAdjustStartSN(rtph, pktSize, hdrSize, payloadSize) {
if !r.maybeAdjustStart(rtph, pktSize, hdrSize, payloadSize) {
if !r.isSnInfoLost(rtph.SequenceNumber) {
r.bytesDuplicate += pktSize
r.headerBytesDuplicate += hdrSize
@@ -497,7 +497,7 @@ func (r *RTPStats) ResyncOnNextPacket() {
r.resyncOnNextPacket = true
}
func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, pktSize uint64, hdrSize uint64, payloadSize int) bool {
func (r *RTPStats) maybeAdjustStart(rtph *rtp.Header, pktSize uint64, hdrSize uint64, payloadSize int) bool {
if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) {
return false
}
@@ -507,17 +507,37 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, pktSize uint64, hdrSize
}
r.packetsLost += uint32(uint16(r.extStartSN)-rtph.SequenceNumber) - 1
beforeAdjust := r.extStartSN
snBeforeAdjust := r.extStartSN
r.extStartSN = uint32(rtph.SequenceNumber)
if r.extStartSN > snBeforeAdjust {
// wrapping back
r.cycles++
}
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, true)
for _, s := range r.snapshots {
if s.extStartSN == beforeAdjust {
if s.extStartSN == snBeforeAdjust {
s.extStartSN = r.extStartSN
}
}
tsBeforeAdjust := r.extStartTS
r.extStartTS = uint64(rtph.Timestamp)
if r.extStartTS > tsBeforeAdjust {
// wrapping back
r.tsCycles++
}
r.logger.Infow(
"adjusting starting sequence number",
"snBefore", snBeforeAdjust,
"snAfter", r.extStartSN,
"snCyles", r.cycles,
"tsBefore", tsBeforeAdjust,
"tsAfter", r.extStartTS,
"tsCyles", r.tsCycles,
)
return true
}
+3
View File
@@ -20,6 +20,7 @@ import (
"testing"
"time"
"github.com/livekit/protocol/logger"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
@@ -38,6 +39,7 @@ func TestRTPStats(t *testing.T) {
clockRate := uint32(90000)
r := NewRTPStats(RTPStatsParams{
ClockRate: clockRate,
Logger: logger.GetLogger(),
})
totalDuration := 5 * time.Second
@@ -79,6 +81,7 @@ func TestRTPStats_Update(t *testing.T) {
clockRate := uint32(90000)
r := NewRTPStats(RTPStatsParams{
ClockRate: clockRate,
Logger: logger.GetLogger(),
})
sequenceNumber := uint16(rand.Float64() * float64(1<<16))
+1 -1
View File
@@ -113,7 +113,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
// re-adjust start if necessary. The conditions are
// 1. Not seen more than half the range yet
// 1. wrap around compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space
// 1. wrap back compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space
// 2. no wrap around, but out-of-order compared to start and not completed a half cycle , sequences like (10, 9), (65530, 65528) in uint16 space
cycles := w.cycles
totalNum := w.GetExtendedHighest() - w.GetExtendedStart() + 1