Reduce memory used by RTPStats. (#645)

Keep track of last 2048 sequence numbers instead of full range
to reduce memory usage.
This commit is contained in:
Raja Subramanian
2022-04-22 17:17:37 +05:30
committed by GitHub
parent 57e2321a18
commit 2e182afb61
+116 -15
View File
@@ -19,6 +19,8 @@ const (
SequenceNumberMax = uint16(65535)
NumSequenceNumbers = 65536
FirstSnapshotId = 1
SnInfoSize = 2048
SnInfoMask = SnInfoSize - 1
)
type RTPFlowState struct {
@@ -65,9 +67,10 @@ type Snapshot struct {
}
type SnInfo struct {
pktSize uint16
payloadSize uint16
marker bool
sn uint16
pktSize uint16
isPaddingOnly bool
marker bool
}
type RTPStatsParams struct {
@@ -117,7 +120,12 @@ type RTPStats struct {
jitterOverridden float64
maxJitterOverridden float64
snInfos [NumSequenceNumbers]SnInfo
snInfos [SnInfoSize]SnInfo
snInfoBaseOffset int
snInfoInitialized bool
snInfoBaseSn uint16
snInfoHighestSn uint16
gapHistogram [GapHistogramNumBins]uint32
nacks uint32
@@ -227,7 +235,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
}
// adjust start to account for out-of-order packets before a cycle completes
r.maybeAdjustStartSN(rtph, packetTime)
r.maybeAdjustStartSN(rtph, packetTime, pktSize, payloadSize)
if !r.isSnInfoLost(rtph.SequenceNumber) {
r.bytesDuplicate += pktSize
@@ -283,7 +291,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
return
}
func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64) {
func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSize uint64, payloadSize int) {
if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) {
return
}
@@ -297,6 +305,8 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64) {
beforeAdjust := r.extStartSN
r.extStartSN = uint32(rtph.SequenceNumber)
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker)
for _, s := range r.snapshots {
if s.extStartSN == beforeAdjust {
s.extStartSN = r.extStartSN
@@ -913,33 +923,113 @@ func (r *RTPStats) getPacketsLost() uint32 {
return r.packetsLost
}
func (r *RTPStats) updateAndGetSnInfoWritePtr(sn uint16) int {
if !r.snInfoInitialized {
r.snInfoInitialized = true
r.snInfoBaseSn = sn
r.snInfoHighestSn = sn
}
// check for out-of-order with base
offset := sn - r.snInfoBaseSn
if offset > (1 << 15) {
offset = r.snInfoBaseSn - sn
availableSpace := len(r.snInfos) - int(r.snInfoHighestSn-r.snInfoBaseSn+1)
if availableSpace > int(offset) {
r.snInfoBaseSn = sn
r.snInfoBaseOffset = (r.snInfoBaseOffset - int(offset)) & SnInfoMask
} else {
// cannot move back
return -1
}
}
if int(offset) >= len(r.snInfos) {
// slide buffer and throw away old sequence numbers
slide := int(offset) - len(r.snInfos) + 1
r.snInfoBaseOffset = (r.snInfoBaseOffset + slide) & SnInfoMask
r.snInfoBaseSn = r.snInfos[r.snInfoBaseOffset].sn
offset -= uint16(slide)
}
if (sn - r.snInfoHighestSn) < (1 << 15) {
r.snInfoHighestSn = sn
}
return (r.snInfoBaseOffset + int(offset)) & SnInfoMask
}
func (r *RTPStats) getSnInfoReadPtr(sn uint16) int {
if !r.snInfoInitialized {
return -1
}
offset := sn - r.snInfoBaseSn
if offset > (1 << 15) {
// out of order
return -1
}
if int(offset) >= len(r.snInfos) {
// asking for newer than what is in buffer
return -1
}
return (r.snInfoBaseOffset + int(offset)) & SnInfoMask
}
func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, payloadSize uint16, marker bool) {
snInfo := &r.snInfos[sn]
writePtr := r.updateAndGetSnInfoWritePtr(sn)
if writePtr < 0 {
return
}
snInfo := &r.snInfos[writePtr]
snInfo.sn = sn
snInfo.pktSize = pktSize
snInfo.payloadSize = payloadSize
snInfo.isPaddingOnly = payloadSize == 0
snInfo.marker = marker
}
func (r *RTPStats) clearSnInfo(sn uint16) {
snInfo := &r.snInfos[sn]
writePtr := r.updateAndGetSnInfoWritePtr(sn)
if writePtr < 0 {
return
}
snInfo := &r.snInfos[writePtr]
snInfo.sn = sn
snInfo.pktSize = 0
snInfo.payloadSize = 0
snInfo.isPaddingOnly = false
snInfo.marker = false
}
func (r *RTPStats) isSnInfoLost(sn uint16) bool {
snInfo := &r.snInfos[sn]
return snInfo.pktSize == 0 && snInfo.payloadSize == 0
readPtr := r.getSnInfoReadPtr(sn)
if readPtr < 0 {
// don't have information, declare not lost
return false
}
snInfo := &r.snInfos[readPtr]
return snInfo.pktSize == 0
}
func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (packets uint32, bytes uint64, packetsPadding uint32, bytesPadding uint64, packetsLost uint32, frames uint32) {
packetsNotFound := uint32(0)
processSN := func(sn uint16) {
snInfo := &r.snInfos[sn]
readPtr := r.getSnInfoReadPtr(sn)
if readPtr < 0 {
packetsNotFound++
return
}
snInfo := &r.snInfos[readPtr]
switch {
case snInfo.pktSize == 0 && snInfo.payloadSize == 0:
case snInfo.pktSize == 0:
packetsLost++
case snInfo.payloadSize == 0:
case snInfo.isPaddingOnly:
packetsPadding++
bytesPadding += uint64(snInfo.pktSize)
@@ -963,6 +1053,17 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16)
processSN(sn)
}
}
if packetsNotFound != 0 {
r.params.Logger.Warnw(
"could not find some packets", nil,
"start", startInclusive,
"end", endExclusive,
"baseSN", r.snInfoBaseSn,
"baseOffset", r.snInfoBaseOffset,
"count", packetsNotFound,
)
}
return
}