diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index b3291b1ac..cccf82c83 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -19,6 +19,8 @@ const ( SequenceNumberMax = uint16(65535) NumSequenceNumbers = 65536 FirstSnapshotId = 1 + SnInfoSize = 2048 + SnInfoMask = SnInfoSize - 1 ) type RTPFlowState struct { @@ -65,9 +67,10 @@ type Snapshot struct { } type SnInfo struct { - pktSize uint16 - payloadSize uint16 - marker bool + sn uint16 + pktSize uint16 + isPaddingOnly bool + marker bool } type RTPStatsParams struct { @@ -117,7 +120,12 @@ type RTPStats struct { jitterOverridden float64 maxJitterOverridden float64 - snInfos [NumSequenceNumbers]SnInfo + snInfos [SnInfoSize]SnInfo + snInfoBaseOffset int + snInfoInitialized bool + snInfoBaseSn uint16 + snInfoHighestSn uint16 + gapHistogram [GapHistogramNumBins]uint32 nacks uint32 @@ -227,7 +235,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } // adjust start to account for out-of-order packets before a cycle completes - r.maybeAdjustStartSN(rtph, packetTime) + r.maybeAdjustStartSN(rtph, packetTime, pktSize, payloadSize) if !r.isSnInfoLost(rtph.SequenceNumber) { r.bytesDuplicate += pktSize @@ -283,7 +291,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa return } -func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64) { +func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSize uint64, payloadSize int) { if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) { return } @@ -297,6 +305,8 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64) { beforeAdjust := r.extStartSN r.extStartSN = uint32(rtph.SequenceNumber) + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker) + for _, s := range r.snapshots { if s.extStartSN == beforeAdjust { s.extStartSN = r.extStartSN @@ -913,33 +923,113 @@ func (r *RTPStats) getPacketsLost() uint32 { return r.packetsLost } +func (r *RTPStats) updateAndGetSnInfoWritePtr(sn uint16) int { + if !r.snInfoInitialized { + r.snInfoInitialized = true + r.snInfoBaseSn = sn + r.snInfoHighestSn = sn + } + + // check for out-of-order with base + offset := sn - r.snInfoBaseSn + if offset > (1 << 15) { + offset = r.snInfoBaseSn - sn + availableSpace := len(r.snInfos) - int(r.snInfoHighestSn-r.snInfoBaseSn+1) + if availableSpace > int(offset) { + r.snInfoBaseSn = sn + r.snInfoBaseOffset = (r.snInfoBaseOffset - int(offset)) & SnInfoMask + } else { + // cannot move back + return -1 + } + } + + if int(offset) >= len(r.snInfos) { + // slide buffer and throw away old sequence numbers + slide := int(offset) - len(r.snInfos) + 1 + r.snInfoBaseOffset = (r.snInfoBaseOffset + slide) & SnInfoMask + r.snInfoBaseSn = r.snInfos[r.snInfoBaseOffset].sn + offset -= uint16(slide) + } + + if (sn - r.snInfoHighestSn) < (1 << 15) { + r.snInfoHighestSn = sn + } + + return (r.snInfoBaseOffset + int(offset)) & SnInfoMask +} + +func (r *RTPStats) getSnInfoReadPtr(sn uint16) int { + if !r.snInfoInitialized { + return -1 + } + + offset := sn - r.snInfoBaseSn + if offset > (1 << 15) { + // out of order + return -1 + } + + if int(offset) >= len(r.snInfos) { + // asking for newer than what is in buffer + return -1 + } + + return (r.snInfoBaseOffset + int(offset)) & SnInfoMask +} + func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, payloadSize uint16, marker bool) { - snInfo := &r.snInfos[sn] + writePtr := r.updateAndGetSnInfoWritePtr(sn) + if writePtr < 0 { + return + } + + snInfo := &r.snInfos[writePtr] + snInfo.sn = sn snInfo.pktSize = pktSize - snInfo.payloadSize = payloadSize + snInfo.isPaddingOnly = payloadSize == 0 snInfo.marker = marker } func (r *RTPStats) clearSnInfo(sn uint16) { - snInfo := &r.snInfos[sn] + writePtr := r.updateAndGetSnInfoWritePtr(sn) + if writePtr < 0 { + return + } + + snInfo := &r.snInfos[writePtr] + snInfo.sn = sn snInfo.pktSize = 0 - snInfo.payloadSize = 0 + snInfo.isPaddingOnly = false snInfo.marker = false } func (r *RTPStats) isSnInfoLost(sn uint16) bool { - snInfo := &r.snInfos[sn] - return snInfo.pktSize == 0 && snInfo.payloadSize == 0 + readPtr := r.getSnInfoReadPtr(sn) + if readPtr < 0 { + // don't have information, declare not lost + return false + } + + snInfo := &r.snInfos[readPtr] + return snInfo.pktSize == 0 } func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (packets uint32, bytes uint64, packetsPadding uint32, bytesPadding uint64, packetsLost uint32, frames uint32) { + packetsNotFound := uint32(0) processSN := func(sn uint16) { - snInfo := &r.snInfos[sn] + readPtr := r.getSnInfoReadPtr(sn) + if readPtr < 0 { + packetsNotFound++ + return + } + + snInfo := &r.snInfos[readPtr] switch { - case snInfo.pktSize == 0 && snInfo.payloadSize == 0: + case snInfo.pktSize == 0: packetsLost++ - case snInfo.payloadSize == 0: + case snInfo.isPaddingOnly: packetsPadding++ bytesPadding += uint64(snInfo.pktSize) @@ -963,6 +1053,17 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) processSN(sn) } } + + if packetsNotFound != 0 { + r.params.Logger.Warnw( + "could not find some packets", nil, + "start", startInclusive, + "end", endExclusive, + "baseSN", r.snInfoBaseSn, + "baseOffset", r.snInfoBaseOffset, + "count", packetsNotFound, + ) + } return }