mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 15:35:41 +00:00
Simplifying SN info cache in RTPStats module (#660)
* Simplifying SN info cache in RTPStats module * Remove unnecessary field
This commit is contained in:
@@ -67,7 +67,6 @@ type Snapshot struct {
|
||||
}
|
||||
|
||||
type SnInfo struct {
|
||||
sn uint16
|
||||
pktSize uint16
|
||||
isPaddingOnly bool
|
||||
marker bool
|
||||
@@ -120,11 +119,8 @@ type RTPStats struct {
|
||||
jitterOverridden float64
|
||||
maxJitterOverridden float64
|
||||
|
||||
snInfos [SnInfoSize]SnInfo
|
||||
snInfoBaseOffset int
|
||||
snInfoInitialized bool
|
||||
snInfoBaseSn uint16
|
||||
snInfoHighestSn uint16
|
||||
snInfos [SnInfoSize]SnInfo
|
||||
snInfoWritePtr int
|
||||
|
||||
gapHistogram [GapHistogramNumBins]uint32
|
||||
|
||||
@@ -235,14 +231,15 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
|
||||
}
|
||||
|
||||
// adjust start to account for out-of-order packets before a cycle completes
|
||||
r.maybeAdjustStartSN(rtph, packetTime, pktSize, payloadSize)
|
||||
|
||||
if !r.isSnInfoLost(rtph.SequenceNumber) {
|
||||
r.bytesDuplicate += pktSize
|
||||
r.packetsDuplicate++
|
||||
isDuplicate = true
|
||||
} else {
|
||||
r.packetsLost--
|
||||
if !r.maybeAdjustStartSN(rtph, packetTime, pktSize, payloadSize) {
|
||||
if !r.isSnInfoLost(rtph.SequenceNumber) {
|
||||
r.bytesDuplicate += pktSize
|
||||
r.packetsDuplicate++
|
||||
isDuplicate = true
|
||||
} else {
|
||||
r.packetsLost--
|
||||
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker)
|
||||
}
|
||||
}
|
||||
|
||||
// in-order
|
||||
@@ -258,11 +255,11 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
|
||||
r.updateGapHistogram(int(diff))
|
||||
|
||||
// update missing sequence numbers
|
||||
for lost := r.highestSN + 1; lost != rtph.SequenceNumber; lost++ {
|
||||
r.clearSnInfo(lost)
|
||||
}
|
||||
r.clearSnInfos(r.highestSN+1, rtph.SequenceNumber)
|
||||
r.packetsLost += uint32(diff - 1)
|
||||
|
||||
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker)
|
||||
|
||||
if rtph.SequenceNumber < r.highestSN && !first {
|
||||
r.cycles++
|
||||
}
|
||||
@@ -276,8 +273,6 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
|
||||
}
|
||||
|
||||
if !isDuplicate {
|
||||
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker)
|
||||
|
||||
if payloadSize == 0 {
|
||||
r.packetsPadding++
|
||||
r.bytesPadding += pktSize
|
||||
@@ -291,17 +286,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
|
||||
return
|
||||
}
|
||||
|
||||
func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSize uint64, payloadSize int) {
|
||||
func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSize uint64, payloadSize int) bool {
|
||||
if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) {
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
if (rtph.SequenceNumber - uint16(r.extStartSN)) < (1 << 15) {
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
// NOTE: current sequence number is counted as loss as it will be deducted in the duplicate check
|
||||
r.packetsLost += uint32(uint16(r.extStartSN) - rtph.SequenceNumber)
|
||||
r.packetsLost += uint32(uint16(r.extStartSN)-rtph.SequenceNumber) - 1
|
||||
beforeAdjust := r.extStartSN
|
||||
r.extStartSN = uint32(rtph.SequenceNumber)
|
||||
|
||||
@@ -312,6 +306,8 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSiz
|
||||
s.extStartSN = r.extStartSN
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *RTPStats) GetTotalPacketsPrimary() uint32 {
|
||||
@@ -923,91 +919,54 @@ func (r *RTPStats) getPacketsLost() uint32 {
|
||||
return r.packetsLost
|
||||
}
|
||||
|
||||
func (r *RTPStats) updateAndGetSnInfoWritePtr(sn uint16) int {
|
||||
if !r.snInfoInitialized {
|
||||
r.snInfoInitialized = true
|
||||
r.snInfoBaseSn = sn
|
||||
r.snInfoHighestSn = sn
|
||||
}
|
||||
|
||||
// check for out-of-order with base
|
||||
offset := sn - r.snInfoBaseSn
|
||||
if offset > (1 << 15) {
|
||||
offset = r.snInfoBaseSn - sn
|
||||
availableSpace := len(r.snInfos) - int(r.snInfoHighestSn-r.snInfoBaseSn+1)
|
||||
if availableSpace > int(offset) {
|
||||
r.snInfoBaseSn = sn
|
||||
r.snInfoBaseOffset = (r.snInfoBaseOffset - int(offset)) & SnInfoMask
|
||||
} else {
|
||||
// cannot move back
|
||||
func (r *RTPStats) getSnInfoOutOfOrderPtr(sn uint16) int {
|
||||
offset := sn - r.highestSN
|
||||
if offset > 0 && offset < (1<<15) {
|
||||
return -1 // in-order, not expected, maybe too new
|
||||
} else {
|
||||
offset = r.highestSN - sn
|
||||
if int(offset) >= SnInfoSize {
|
||||
// too old, ignore
|
||||
return -1
|
||||
}
|
||||
}
|
||||
|
||||
if int(offset) >= len(r.snInfos) {
|
||||
// slide buffer and throw away old sequence numbers
|
||||
slide := int(offset) - len(r.snInfos) + 1
|
||||
r.snInfoBaseOffset = (r.snInfoBaseOffset + slide) & SnInfoMask
|
||||
r.snInfoBaseSn = r.snInfos[r.snInfoBaseOffset].sn
|
||||
offset -= uint16(slide)
|
||||
}
|
||||
|
||||
if (sn - r.snInfoHighestSn) < (1 << 15) {
|
||||
r.snInfoHighestSn = sn
|
||||
}
|
||||
|
||||
return (r.snInfoBaseOffset + int(offset)) & SnInfoMask
|
||||
}
|
||||
|
||||
func (r *RTPStats) getSnInfoReadPtr(sn uint16) int {
|
||||
if !r.snInfoInitialized {
|
||||
return -1
|
||||
}
|
||||
|
||||
offset := sn - r.snInfoBaseSn
|
||||
if offset > (1 << 15) {
|
||||
// out of order
|
||||
return -1
|
||||
}
|
||||
|
||||
if int(offset) >= len(r.snInfos) {
|
||||
// asking for newer than what is in buffer
|
||||
return -1
|
||||
}
|
||||
|
||||
return (r.snInfoBaseOffset + int(offset)) & SnInfoMask
|
||||
return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask
|
||||
}
|
||||
|
||||
func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, payloadSize uint16, marker bool) {
|
||||
writePtr := r.updateAndGetSnInfoWritePtr(sn)
|
||||
if writePtr < 0 {
|
||||
return
|
||||
writePtr := 0
|
||||
ooo := (sn - r.highestSN) > (1 << 15)
|
||||
if !ooo {
|
||||
writePtr = r.snInfoWritePtr
|
||||
r.snInfoWritePtr = (writePtr + 1) & SnInfoMask
|
||||
} else {
|
||||
writePtr = r.getSnInfoOutOfOrderPtr(sn)
|
||||
if writePtr < 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
snInfo := &r.snInfos[writePtr]
|
||||
snInfo.sn = sn
|
||||
snInfo.pktSize = pktSize
|
||||
snInfo.isPaddingOnly = payloadSize == 0
|
||||
snInfo.marker = marker
|
||||
}
|
||||
|
||||
func (r *RTPStats) clearSnInfo(sn uint16) {
|
||||
writePtr := r.updateAndGetSnInfoWritePtr(sn)
|
||||
if writePtr < 0 {
|
||||
return
|
||||
}
|
||||
func (r *RTPStats) clearSnInfos(startInclusive uint16, endExclusive uint16) {
|
||||
for sn := startInclusive; sn != endExclusive; sn++ {
|
||||
snInfo := &r.snInfos[r.snInfoWritePtr]
|
||||
snInfo.pktSize = 0
|
||||
snInfo.isPaddingOnly = false
|
||||
snInfo.marker = false
|
||||
|
||||
snInfo := &r.snInfos[writePtr]
|
||||
snInfo.sn = sn
|
||||
snInfo.pktSize = 0
|
||||
snInfo.isPaddingOnly = false
|
||||
snInfo.marker = false
|
||||
r.snInfoWritePtr = (r.snInfoWritePtr + 1) & SnInfoMask
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RTPStats) isSnInfoLost(sn uint16) bool {
|
||||
readPtr := r.getSnInfoReadPtr(sn)
|
||||
readPtr := r.getSnInfoOutOfOrderPtr(sn)
|
||||
if readPtr < 0 {
|
||||
// don't have information, declare not lost
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -1018,9 +977,8 @@ func (r *RTPStats) isSnInfoLost(sn uint16) bool {
|
||||
func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (packets uint32, bytes uint64, packetsPadding uint32, bytesPadding uint64, packetsLost uint32, frames uint32) {
|
||||
packetsNotFound := uint32(0)
|
||||
processSN := func(sn uint16) {
|
||||
readPtr := r.getSnInfoReadPtr(sn)
|
||||
readPtr := r.getSnInfoOutOfOrderPtr(sn)
|
||||
if readPtr < 0 {
|
||||
packetsNotFound++
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1059,8 +1017,6 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16)
|
||||
"could not find some packets", nil,
|
||||
"start", startInclusive,
|
||||
"end", endExclusive,
|
||||
"baseSN", r.snInfoBaseSn,
|
||||
"baseOffset", r.snInfoBaseOffset,
|
||||
"count", packetsNotFound,
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user