diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index cccf82c83..6223dc714 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -67,7 +67,6 @@ type Snapshot struct { } type SnInfo struct { - sn uint16 pktSize uint16 isPaddingOnly bool marker bool @@ -120,11 +119,8 @@ type RTPStats struct { jitterOverridden float64 maxJitterOverridden float64 - snInfos [SnInfoSize]SnInfo - snInfoBaseOffset int - snInfoInitialized bool - snInfoBaseSn uint16 - snInfoHighestSn uint16 + snInfos [SnInfoSize]SnInfo + snInfoWritePtr int gapHistogram [GapHistogramNumBins]uint32 @@ -235,14 +231,15 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } // adjust start to account for out-of-order packets before a cycle completes - r.maybeAdjustStartSN(rtph, packetTime, pktSize, payloadSize) - - if !r.isSnInfoLost(rtph.SequenceNumber) { - r.bytesDuplicate += pktSize - r.packetsDuplicate++ - isDuplicate = true - } else { - r.packetsLost-- + if !r.maybeAdjustStartSN(rtph, packetTime, pktSize, payloadSize) { + if !r.isSnInfoLost(rtph.SequenceNumber) { + r.bytesDuplicate += pktSize + r.packetsDuplicate++ + isDuplicate = true + } else { + r.packetsLost-- + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker) + } } // in-order @@ -258,11 +255,11 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa r.updateGapHistogram(int(diff)) // update missing sequence numbers - for lost := r.highestSN + 1; lost != rtph.SequenceNumber; lost++ { - r.clearSnInfo(lost) - } + r.clearSnInfos(r.highestSN+1, rtph.SequenceNumber) r.packetsLost += uint32(diff - 1) + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker) + if rtph.SequenceNumber < r.highestSN && !first { r.cycles++ } @@ -276,8 +273,6 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } if !isDuplicate { - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker) - if payloadSize == 0 { r.packetsPadding++ r.bytesPadding += pktSize @@ -291,17 +286,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa return } -func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSize uint64, payloadSize int) { +func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSize uint64, payloadSize int) bool { if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) { - return + return false } if (rtph.SequenceNumber - uint16(r.extStartSN)) < (1 << 15) { - return + return false } - // NOTE: current sequence number is counted as loss as it will be deducted in the duplicate check - r.packetsLost += uint32(uint16(r.extStartSN) - rtph.SequenceNumber) + r.packetsLost += uint32(uint16(r.extStartSN)-rtph.SequenceNumber) - 1 beforeAdjust := r.extStartSN r.extStartSN = uint32(rtph.SequenceNumber) @@ -312,6 +306,8 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSiz s.extStartSN = r.extStartSN } } + + return true } func (r *RTPStats) GetTotalPacketsPrimary() uint32 { @@ -923,91 +919,54 @@ func (r *RTPStats) getPacketsLost() uint32 { return r.packetsLost } -func (r *RTPStats) updateAndGetSnInfoWritePtr(sn uint16) int { - if !r.snInfoInitialized { - r.snInfoInitialized = true - r.snInfoBaseSn = sn - r.snInfoHighestSn = sn - } - - // check for out-of-order with base - offset := sn - r.snInfoBaseSn - if offset > (1 << 15) { - offset = r.snInfoBaseSn - sn - availableSpace := len(r.snInfos) - int(r.snInfoHighestSn-r.snInfoBaseSn+1) - if availableSpace > int(offset) { - r.snInfoBaseSn = sn - r.snInfoBaseOffset = (r.snInfoBaseOffset - int(offset)) & SnInfoMask - } else { - // cannot move back +func (r *RTPStats) getSnInfoOutOfOrderPtr(sn uint16) int { + offset := sn - r.highestSN + if offset > 0 && offset < (1<<15) { + return -1 // in-order, not expected, maybe too new + } else { + offset = r.highestSN - sn + if int(offset) >= SnInfoSize { + // too old, ignore return -1 } } - if int(offset) >= len(r.snInfos) { - // slide buffer and throw away old sequence numbers - slide := int(offset) - len(r.snInfos) + 1 - r.snInfoBaseOffset = (r.snInfoBaseOffset + slide) & SnInfoMask - r.snInfoBaseSn = r.snInfos[r.snInfoBaseOffset].sn - offset -= uint16(slide) - } - - if (sn - r.snInfoHighestSn) < (1 << 15) { - r.snInfoHighestSn = sn - } - - return (r.snInfoBaseOffset + int(offset)) & SnInfoMask -} - -func (r *RTPStats) getSnInfoReadPtr(sn uint16) int { - if !r.snInfoInitialized { - return -1 - } - - offset := sn - r.snInfoBaseSn - if offset > (1 << 15) { - // out of order - return -1 - } - - if int(offset) >= len(r.snInfos) { - // asking for newer than what is in buffer - return -1 - } - - return (r.snInfoBaseOffset + int(offset)) & SnInfoMask + return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask } func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, payloadSize uint16, marker bool) { - writePtr := r.updateAndGetSnInfoWritePtr(sn) - if writePtr < 0 { - return + writePtr := 0 + ooo := (sn - r.highestSN) > (1 << 15) + if !ooo { + writePtr = r.snInfoWritePtr + r.snInfoWritePtr = (writePtr + 1) & SnInfoMask + } else { + writePtr = r.getSnInfoOutOfOrderPtr(sn) + if writePtr < 0 { + return + } } snInfo := &r.snInfos[writePtr] - snInfo.sn = sn snInfo.pktSize = pktSize snInfo.isPaddingOnly = payloadSize == 0 snInfo.marker = marker } -func (r *RTPStats) clearSnInfo(sn uint16) { - writePtr := r.updateAndGetSnInfoWritePtr(sn) - if writePtr < 0 { - return - } +func (r *RTPStats) clearSnInfos(startInclusive uint16, endExclusive uint16) { + for sn := startInclusive; sn != endExclusive; sn++ { + snInfo := &r.snInfos[r.snInfoWritePtr] + snInfo.pktSize = 0 + snInfo.isPaddingOnly = false + snInfo.marker = false - snInfo := &r.snInfos[writePtr] - snInfo.sn = sn - snInfo.pktSize = 0 - snInfo.isPaddingOnly = false - snInfo.marker = false + r.snInfoWritePtr = (r.snInfoWritePtr + 1) & SnInfoMask + } } func (r *RTPStats) isSnInfoLost(sn uint16) bool { - readPtr := r.getSnInfoReadPtr(sn) + readPtr := r.getSnInfoOutOfOrderPtr(sn) if readPtr < 0 { - // don't have information, declare not lost return false } @@ -1018,9 +977,8 @@ func (r *RTPStats) isSnInfoLost(sn uint16) bool { func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (packets uint32, bytes uint64, packetsPadding uint32, bytesPadding uint64, packetsLost uint32, frames uint32) { packetsNotFound := uint32(0) processSN := func(sn uint16) { - readPtr := r.getSnInfoReadPtr(sn) + readPtr := r.getSnInfoOutOfOrderPtr(sn) if readPtr < 0 { - packetsNotFound++ return } @@ -1059,8 +1017,6 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) "could not find some packets", nil, "start", startInclusive, "end", endExclusive, - "baseSN", r.snInfoBaseSn, - "baseOffset", r.snInfoBaseOffset, "count", packetsNotFound, ) }