Lock to receiver report for senders (#616)

This commit is contained in:
Raja Subramanian
2022-04-17 08:43:50 +05:30
committed by GitHub
parent a98d955284
commit ed2a0011d9
3 changed files with 175 additions and 128 deletions

View File

@@ -2,7 +2,6 @@ package buffer
import (
"fmt"
"math/bits"
"sync"
"time"
@@ -19,12 +18,9 @@ const (
SequenceNumberMin = uint16(0)
SequenceNumberMax = uint16(65535)
NumSequenceNumbers = 65536
FirstSnapshotId = 1
)
func getPos(sn uint16) (uint16, uint16) {
return sn >> 6, sn & 0x3f
}
type RTPFlowState struct {
IsHighestSN bool
HasLoss bool
@@ -56,25 +52,28 @@ type RTPDeltaInfo struct {
}
type Snapshot struct {
extStartSN uint32
bytes uint64
packetsDuplicate uint32
bytesDuplicate uint64
packetsPadding uint32
bytesPadding uint64
frames uint32
nacks uint32
plis uint32
firs uint32
maxJitter float64
isJitterOverridden bool
maxJitterOverridden float64
maxRtt uint32
extStartSN uint32
packetsDuplicate uint32
bytesDuplicate uint64
packetsLostOverridden uint32
nacks uint32
plis uint32
firs uint32
maxRtt uint32
maxJitter float64
maxJitterOverridden float64
}
type SnInfo struct {
pktSize uint16
payloadSize uint16
marker bool
}
type RTPStatsParams struct {
ClockRate uint32
Logger logger.Logger
ClockRate uint32
IsReceiverReportDriven bool
Logger logger.Logger
}
type RTPStats struct {
@@ -92,6 +91,9 @@ type RTPStats struct {
highestSN uint16
cycles uint16
isRRSeen bool
extHighestSNOverridden uint32
highestTS uint32
highestTime int64
@@ -105,19 +107,17 @@ type RTPStats struct {
packetsOutOfOrder uint32
packetsLost uint32
isPacketsLostOverridden bool
packetsLostOverridden uint32
packetsLost uint32
packetsLostOverridden uint32
frames uint32
jitter float64
maxJitter float64
isJitterOverridden bool
jitterOverridden float64
maxJitterOverridden float64
seenSNs [NumSequenceNumbers / 64]uint64
snInfos [NumSequenceNumbers]SnInfo
gapHistogram [GapHistogramNumBins]uint32
nacks uint32
@@ -150,7 +150,7 @@ func NewRTPStats(params RTPStatsParams) *RTPStats {
return &RTPStats{
params: params,
logger: params.Logger,
nextSnapshotId: 1,
nextSnapshotId: FirstSnapshotId,
snapshots: make(map[uint32]*Snapshot),
}
}
@@ -171,7 +171,12 @@ func (r *RTPStats) NewSnapshotId() uint32 {
defer r.lock.Unlock()
id := r.nextSnapshotId
if r.initialized {
r.snapshots[id] = &Snapshot{extStartSN: r.extStartSN}
}
r.nextSnapshotId++
return id
}
@@ -204,6 +209,11 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
r.cycles = 0
first = true
// initialize snapshots if any
for i := uint32(FirstSnapshotId); i < r.nextSnapshotId; i++ {
r.snapshots[i] = &Snapshot{extStartSN: r.extStartSN}
}
}
pktSize := uint64(rtph.MarshalSize() + payloadSize + paddingSize)
@@ -219,7 +229,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
// adjust start to account for out-of-order packets before a cycle completes
r.maybeAdjustStartSN(rtph, packetTime)
if r.isSeenSN(rtph.SequenceNumber) {
if !r.isSnInfoLost(rtph.SequenceNumber) {
r.bytesDuplicate += pktSize
r.packetsDuplicate++
isDuplicate = true
@@ -241,7 +251,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
// update missing sequence numbers
for lost := r.highestSN + 1; lost != rtph.SequenceNumber; lost++ {
r.clearSeenSN(lost)
r.clearSnInfo(lost)
}
r.packetsLost += uint32(diff - 1)
@@ -257,10 +267,9 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
}
}
// set current sequence number in seen list
r.setSeenSN(rtph.SequenceNumber)
if !isDuplicate {
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker)
if payloadSize == 0 {
r.packetsPadding++
r.bytesPadding += pktSize
@@ -285,7 +294,14 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64) {
// NOTE: current sequence number is counted as loss as it will be deducted in the duplicate check
r.packetsLost += uint32(uint16(r.extStartSN) - rtph.SequenceNumber)
beforeAdjust := r.extStartSN
r.extStartSN = uint32(rtph.SequenceNumber)
for _, s := range r.snapshots {
if s.extStartSN == beforeAdjust {
s.extStartSN = r.extStartSN
}
}
}
func (r *RTPStats) GetTotalPacketsPrimary() uint32 {
@@ -296,7 +312,13 @@ func (r *RTPStats) GetTotalPacketsPrimary() uint32 {
}
func (r *RTPStats) getTotalPacketsPrimary() uint32 {
packetsSeen := r.getNumPacketsSeen()
packetsExpected := r.getExtHighestSN() - r.extStartSN + 1
if r.packetsLost > packetsExpected {
// should not happen
return 0
}
packetsSeen := packetsExpected - r.packetsLost
if r.packetsPadding > packetsSeen {
return 0
}
@@ -304,34 +326,34 @@ func (r *RTPStats) getTotalPacketsPrimary() uint32 {
return packetsSeen - r.packetsPadding
}
func (r *RTPStats) UpdatePacketsLost(packetsLost uint32) {
func (r *RTPStats) UpdateFromReceiverReport(extHighestSN uint32, packetsLost uint32, rtt uint32, jitter float64) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if !r.endTime.IsZero() || !r.params.IsReceiverReportDriven {
return
}
r.isPacketsLostOverridden = true
r.isRRSeen = true
r.extHighestSNOverridden = extHighestSN
r.packetsLostOverridden = packetsLost
}
func (r *RTPStats) UpdateJitter(jitter float64) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
r.rtt = rtt
if rtt > r.maxRtt {
r.maxRtt = rtt
}
r.isJitterOverridden = true
r.jitterOverridden = jitter
if jitter > r.maxJitterOverridden {
r.maxJitterOverridden = jitter
}
// update snapshots
for _, s := range r.snapshots {
s.isJitterOverridden = true
if rtt > s.maxRtt {
s.maxRtt = rtt
}
if jitter > s.maxJitterOverridden {
s.maxJitterOverridden = jitter
}
@@ -551,7 +573,7 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
r.lock.RLock()
defer r.lock.RUnlock()
packetsExpected := now.extStartSN - then.extStartSN + 1
packetsExpected := now.extStartSN - then.extStartSN
if packetsExpected > NumSequenceNumbers {
logger.Warnw(
"too many packets expected in receiver report",
@@ -563,7 +585,13 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
return nil
}
packetsLost := r.numMissingSNs(uint16(then.extStartSN), uint16(now.extStartSN))
packetsLost := uint32(0)
if r.params.IsReceiverReportDriven {
// should not be set for streams that need to generate reception report
packetsLost = now.packetsLostOverridden - then.packetsLostOverridden
} else {
_, _, _, _, packetsLost, _ = r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
}
lossRate := float32(packetsLost) / float32(packetsExpected)
fracLost := uint8(lossRate * 256.0)
if proxyFracLost > fracLost {
@@ -578,7 +606,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
}
jitter := r.jitter
if r.isJitterOverridden {
if r.params.IsReceiverReportDriven {
// should not be set for streams that need to generate reception report
jitter = r.jitterOverridden
}
@@ -605,7 +634,7 @@ func (r *RTPStats) SnapshotInfo(snapshotId uint32) *RTPSnapshotInfo {
r.lock.RLock()
defer r.lock.RUnlock()
packetsExpected := now.extStartSN - then.extStartSN + 1
packetsExpected := now.extStartSN - then.extStartSN
if packetsExpected > NumSequenceNumbers {
logger.Warnw(
"too many packets expected in snapshot",
@@ -617,10 +646,15 @@ func (r *RTPStats) SnapshotInfo(snapshotId uint32) *RTPSnapshotInfo {
return nil
}
packetsLost := r.numMissingSNs(uint16(then.extStartSN), uint16(now.extStartSN))
packetsLost := uint32(0)
if r.params.IsReceiverReportDriven {
packetsLost = now.packetsLostOverridden - then.packetsLostOverridden
} else {
_, _, _, _, packetsLost, _ = r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
}
maxJitter := then.maxJitter
if then.isJitterOverridden {
if r.params.IsReceiverReportDriven {
maxJitter = then.maxJitterOverridden
}
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
@@ -645,7 +679,7 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
r.lock.RLock()
defer r.lock.RUnlock()
packetsExpected := now.extStartSN - then.extStartSN + 1
packetsExpected := now.extStartSN - then.extStartSN
if packetsExpected > NumSequenceNumbers {
logger.Warnw(
"too many packets expected in delta",
@@ -657,25 +691,26 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
return nil
}
packetsLost := r.numMissingSNs(uint16(then.extStartSN), uint16(now.extStartSN))
packetsPadding := now.packetsPadding - then.packetsPadding
bytesPadding := now.bytesPadding - then.bytesPadding
_, bytes, packetsPadding, bytesPadding, packetsLost, frames := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
if r.params.IsReceiverReportDriven {
packetsLost = now.packetsLostOverridden - then.packetsLostOverridden
}
maxJitter := then.maxJitter
if then.isJitterOverridden {
if r.params.IsReceiverReportDriven {
maxJitter = then.maxJitterOverridden
}
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
return &RTPDeltaInfo{
Packets: packetsExpected - packetsPadding,
Bytes: now.bytes - then.bytes - bytesPadding,
Bytes: bytes,
PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate,
BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate,
PacketsPadding: packetsPadding,
BytesPadding: bytesPadding,
PacketsLost: packetsLost,
Frames: now.frames - then.frames,
Frames: frames,
RttMax: then.maxRtt,
JitterMax: maxJitterTime,
Nacks: now.nacks - then.nacks,
@@ -716,7 +751,7 @@ func (r *RTPStats) ToString() string {
jitter := r.jitter
maxJitter := r.maxJitter
if r.isJitterOverridden {
if r.params.IsReceiverReportDriven {
jitter = r.jitterOverridden
maxJitter = r.maxJitterOverridden
}
@@ -770,17 +805,14 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
return nil
}
packetsExpected := r.getExtHighestSN() - r.extStartSN + 1
packets := r.getTotalPacketsPrimary()
packetRate := float64(packets) / elapsed
bitrate := float64(r.bytes) * 8.0 / elapsed
frameRate := float64(r.frames) / elapsed
packetsLost := r.packetsLost
if r.isPacketsLostOverridden {
packetsLost = r.packetsLostOverridden
}
packetsExpected := r.getExtHighestSN() - r.extStartSN + 1
packetsLost := r.getPacketsLost()
packetLostRate := float64(packetsLost) / elapsed
packetLostPercentage := float32(packetsLost) / float32(packetsExpected) * 100.0
@@ -792,7 +824,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
jitter := r.jitter
maxJitter := r.maxJitter
if r.isJitterOverridden {
if r.params.IsReceiverReportDriven {
jitter = r.jitterOverridden
maxJitter = r.maxJitterOverridden
}
@@ -865,53 +897,73 @@ func (r *RTPStats) getExtHighestSN() uint32 {
return (uint32(r.cycles) << 16) | uint32(r.highestSN)
}
func (r *RTPStats) getNumPacketsSeen() uint32 {
packetsExpected := r.getExtHighestSN() - r.extStartSN + 1
if r.packetsLost > packetsExpected {
// should not happen
return 0
func (r *RTPStats) getExtHighestSNAdjusted() uint32 {
if r.params.IsReceiverReportDriven && r.isRRSeen {
return r.extHighestSNOverridden
}
return packetsExpected - r.packetsLost
return r.getExtHighestSN()
}
func (r *RTPStats) setSeenSN(sn uint16) {
idx, rem := getPos(sn)
r.seenSNs[idx] |= 1 << rem
}
func (r *RTPStats) clearSeenSN(sn uint16) {
idx, rem := getPos(sn)
r.seenSNs[idx] &^= 1 << rem
}
func (r *RTPStats) isSeenSN(sn uint16) bool {
idx, rem := getPos(sn)
return (r.seenSNs[idx] & (1 << rem)) != 0
}
func (r *RTPStats) numMissingSNs(startInclusive uint16, endInclusive uint16) uint32 {
startIdx, startRem := getPos(startInclusive)
endIdx, endRem := getPos(endInclusive + 1)
seen := uint32(0)
idx := startIdx
loopEnd := (endIdx + 1) % uint16(len(r.seenSNs))
for idx != loopEnd {
mask := uint64((1 << 64) - 1)
if idx == startIdx {
mask &^= uint64((1 << startRem) - 1)
}
if idx == endIdx {
mask &= uint64((1 << endRem) - 1)
}
seen += uint32(bits.OnesCount64(r.seenSNs[idx] & mask))
idx = (idx + 1) % uint16(len(r.seenSNs))
func (r *RTPStats) getPacketsLost() uint32 {
if r.params.IsReceiverReportDriven && r.isRRSeen {
return r.packetsLostOverridden
}
return uint32(endInclusive-startInclusive+1) - seen
return r.packetsLost
}
func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, payloadSize uint16, marker bool) {
snInfo := &r.snInfos[sn]
snInfo.pktSize = pktSize
snInfo.payloadSize = payloadSize
snInfo.marker = marker
}
func (r *RTPStats) clearSnInfo(sn uint16) {
snInfo := &r.snInfos[sn]
snInfo.pktSize = 0
snInfo.payloadSize = 0
snInfo.marker = false
}
func (r *RTPStats) isSnInfoLost(sn uint16) bool {
snInfo := &r.snInfos[sn]
return snInfo.pktSize == 0 && snInfo.payloadSize == 0
}
func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (packets uint32, bytes uint64, packetsPadding uint32, bytesPadding uint64, packetsLost uint32, frames uint32) {
processSN := func(sn uint16) {
snInfo := &r.snInfos[sn]
switch {
case snInfo.pktSize == 0 && snInfo.payloadSize == 0:
packetsLost++
case snInfo.payloadSize == 0:
packetsPadding++
bytesPadding += uint64(snInfo.pktSize)
default:
packets++
bytes += uint64(snInfo.pktSize)
}
if snInfo.marker {
frames++
}
}
if startInclusive == endExclusive {
// do a full cycle
for sn := uint32(0); sn < NumSequenceNumbers; sn++ {
processSN(uint16(sn))
}
} else {
for sn := startInclusive; sn != endExclusive; sn++ {
processSN(sn)
}
}
return
}
func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime int64) {
@@ -952,7 +1004,7 @@ func (r *RTPStats) updateGapHistogram(gap int) {
}
func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) (*Snapshot, *Snapshot) {
if !r.initialized {
if !r.initialized || (r.params.IsReceiverReportDriven && !r.isRRSeen) {
return nil, nil
}
@@ -966,20 +1018,16 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) (*Snapshot, *Snapshot)
// snapshot now
r.snapshots[snapshotId] = &Snapshot{
extStartSN: r.getExtHighestSN(),
bytes: r.bytes,
packetsDuplicate: r.packetsDuplicate,
bytesDuplicate: r.bytesDuplicate,
packetsPadding: r.packetsPadding,
bytesPadding: r.bytesPadding,
frames: r.frames,
nacks: r.nacks,
plis: r.plis,
firs: r.firs,
maxJitter: 0.0,
isJitterOverridden: false,
maxJitterOverridden: 0.0,
maxRtt: 0,
extStartSN: r.getExtHighestSNAdjusted() + 1,
packetsDuplicate: r.packetsDuplicate,
bytesDuplicate: r.bytesDuplicate,
packetsLostOverridden: r.packetsLostOverridden,
nacks: r.nacks,
plis: r.plis,
firs: r.firs,
maxJitter: 0.0,
maxJitterOverridden: 0.0,
maxRtt: 0,
}
// make a copy so that it can be used independently
now := *r.snapshots[snapshotId]

View File

@@ -128,7 +128,8 @@ func TestRTPStats_Update(t *testing.T) {
require.Equal(t, uint32(3), r.packetsOutOfOrder)
require.Equal(t, uint32(1), r.packetsDuplicate)
require.Equal(t, uint32(16), r.packetsLost)
require.Equal(t, uint32(16), r.numMissingSNs(uint16(r.extStartSN), uint16(r.getExtHighestSN())))
_, _, _, _, packetsLost, _ := r.getIntervalStats(uint16(r.extStartSN), uint16(r.getExtHighestSN()+1))
require.Equal(t, uint32(16), packetsLost)
r.Stop()
}

View File

@@ -203,8 +203,9 @@ func NewDownTrack(
})
d.rtpStats = buffer.NewRTPStats(buffer.RTPStatsParams{
ClockRate: d.codec.ClockRate,
Logger: d.logger,
ClockRate: d.codec.ClockRate,
IsReceiverReportDriven: true,
Logger: d.logger,
})
d.connectionQualitySnapshotId = d.rtpStats.NewSnapshotId()
d.deltaStatsSnapshotId = d.rtpStats.NewSnapshotId()
@@ -994,15 +995,12 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
}
rr.Reports = append(rr.Reports, r)
d.rtpStats.UpdatePacketsLost(r.TotalLost)
rtt := getRttMs(&r)
if rtt != d.rtpStats.GetRtt() {
rttToReport = rtt
}
d.rtpStats.UpdateRtt(rtt)
d.rtpStats.UpdateJitter(float64(r.Jitter))
d.rtpStats.UpdateFromReceiverReport(r.LastSequenceNumber, r.TotalLost, rtt, float64(r.Jitter))
}
if len(rr.Reports) > 0 {
d.listenerLock.RLock()