From ed2a0011d9e9812f5a96b2bb9d2bf20727b3702b Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 17 Apr 2022 08:43:50 +0530 Subject: [PATCH] Lock to receiver report for senders (#616) --- pkg/sfu/buffer/rtpstats.go | 290 +++++++++++++++++++------------- pkg/sfu/buffer/rtpstats_test.go | 3 +- pkg/sfu/downtrack.go | 10 +- 3 files changed, 175 insertions(+), 128 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index fe5959e56..b3291b1ac 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -2,7 +2,6 @@ package buffer import ( "fmt" - "math/bits" "sync" "time" @@ -19,12 +18,9 @@ const ( SequenceNumberMin = uint16(0) SequenceNumberMax = uint16(65535) NumSequenceNumbers = 65536 + FirstSnapshotId = 1 ) -func getPos(sn uint16) (uint16, uint16) { - return sn >> 6, sn & 0x3f -} - type RTPFlowState struct { IsHighestSN bool HasLoss bool @@ -56,25 +52,28 @@ type RTPDeltaInfo struct { } type Snapshot struct { - extStartSN uint32 - bytes uint64 - packetsDuplicate uint32 - bytesDuplicate uint64 - packetsPadding uint32 - bytesPadding uint64 - frames uint32 - nacks uint32 - plis uint32 - firs uint32 - maxJitter float64 - isJitterOverridden bool - maxJitterOverridden float64 - maxRtt uint32 + extStartSN uint32 + packetsDuplicate uint32 + bytesDuplicate uint64 + packetsLostOverridden uint32 + nacks uint32 + plis uint32 + firs uint32 + maxRtt uint32 + maxJitter float64 + maxJitterOverridden float64 +} + +type SnInfo struct { + pktSize uint16 + payloadSize uint16 + marker bool } type RTPStatsParams struct { - ClockRate uint32 - Logger logger.Logger + ClockRate uint32 + IsReceiverReportDriven bool + Logger logger.Logger } type RTPStats struct { @@ -92,6 +91,9 @@ type RTPStats struct { highestSN uint16 cycles uint16 + isRRSeen bool + extHighestSNOverridden uint32 + highestTS uint32 highestTime int64 @@ -105,19 +107,17 @@ type RTPStats struct { packetsOutOfOrder uint32 - packetsLost uint32 - isPacketsLostOverridden bool - packetsLostOverridden uint32 + packetsLost uint32 + packetsLostOverridden uint32 frames uint32 jitter float64 maxJitter float64 - isJitterOverridden bool jitterOverridden float64 maxJitterOverridden float64 - seenSNs [NumSequenceNumbers / 64]uint64 + snInfos [NumSequenceNumbers]SnInfo gapHistogram [GapHistogramNumBins]uint32 nacks uint32 @@ -150,7 +150,7 @@ func NewRTPStats(params RTPStatsParams) *RTPStats { return &RTPStats{ params: params, logger: params.Logger, - nextSnapshotId: 1, + nextSnapshotId: FirstSnapshotId, snapshots: make(map[uint32]*Snapshot), } } @@ -171,7 +171,12 @@ func (r *RTPStats) NewSnapshotId() uint32 { defer r.lock.Unlock() id := r.nextSnapshotId + if r.initialized { + r.snapshots[id] = &Snapshot{extStartSN: r.extStartSN} + } + r.nextSnapshotId++ + return id } @@ -204,6 +209,11 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa r.cycles = 0 first = true + + // initialize snapshots if any + for i := uint32(FirstSnapshotId); i < r.nextSnapshotId; i++ { + r.snapshots[i] = &Snapshot{extStartSN: r.extStartSN} + } } pktSize := uint64(rtph.MarshalSize() + payloadSize + paddingSize) @@ -219,7 +229,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa // adjust start to account for out-of-order packets before a cycle completes r.maybeAdjustStartSN(rtph, packetTime) - if r.isSeenSN(rtph.SequenceNumber) { + if !r.isSnInfoLost(rtph.SequenceNumber) { r.bytesDuplicate += pktSize r.packetsDuplicate++ isDuplicate = true @@ -241,7 +251,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa // update missing sequence numbers for lost := r.highestSN + 1; lost != rtph.SequenceNumber; lost++ { - r.clearSeenSN(lost) + r.clearSnInfo(lost) } r.packetsLost += uint32(diff - 1) @@ -257,10 +267,9 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } } - // set current sequence number in seen list - r.setSeenSN(rtph.SequenceNumber) - if !isDuplicate { + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker) + if payloadSize == 0 { r.packetsPadding++ r.bytesPadding += pktSize @@ -285,7 +294,14 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64) { // NOTE: current sequence number is counted as loss as it will be deducted in the duplicate check r.packetsLost += uint32(uint16(r.extStartSN) - rtph.SequenceNumber) + beforeAdjust := r.extStartSN r.extStartSN = uint32(rtph.SequenceNumber) + + for _, s := range r.snapshots { + if s.extStartSN == beforeAdjust { + s.extStartSN = r.extStartSN + } + } } func (r *RTPStats) GetTotalPacketsPrimary() uint32 { @@ -296,7 +312,13 @@ func (r *RTPStats) GetTotalPacketsPrimary() uint32 { } func (r *RTPStats) getTotalPacketsPrimary() uint32 { - packetsSeen := r.getNumPacketsSeen() + packetsExpected := r.getExtHighestSN() - r.extStartSN + 1 + if r.packetsLost > packetsExpected { + // should not happen + return 0 + } + + packetsSeen := packetsExpected - r.packetsLost if r.packetsPadding > packetsSeen { return 0 } @@ -304,34 +326,34 @@ func (r *RTPStats) getTotalPacketsPrimary() uint32 { return packetsSeen - r.packetsPadding } -func (r *RTPStats) UpdatePacketsLost(packetsLost uint32) { +func (r *RTPStats) UpdateFromReceiverReport(extHighestSN uint32, packetsLost uint32, rtt uint32, jitter float64) { r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if !r.endTime.IsZero() || !r.params.IsReceiverReportDriven { return } - r.isPacketsLostOverridden = true + r.isRRSeen = true + r.extHighestSNOverridden = extHighestSN r.packetsLostOverridden = packetsLost -} -func (r *RTPStats) UpdateJitter(jitter float64) { - r.lock.Lock() - defer r.lock.Unlock() - - if !r.endTime.IsZero() { - return + r.rtt = rtt + if rtt > r.maxRtt { + r.maxRtt = rtt } - r.isJitterOverridden = true r.jitterOverridden = jitter if jitter > r.maxJitterOverridden { r.maxJitterOverridden = jitter } + // update snapshots for _, s := range r.snapshots { - s.isJitterOverridden = true + if rtt > s.maxRtt { + s.maxRtt = rtt + } + if jitter > s.maxJitterOverridden { s.maxJitterOverridden = jitter } @@ -551,7 +573,7 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, r.lock.RLock() defer r.lock.RUnlock() - packetsExpected := now.extStartSN - then.extStartSN + 1 + packetsExpected := now.extStartSN - then.extStartSN if packetsExpected > NumSequenceNumbers { logger.Warnw( "too many packets expected in receiver report", @@ -563,7 +585,13 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, return nil } - packetsLost := r.numMissingSNs(uint16(then.extStartSN), uint16(now.extStartSN)) + packetsLost := uint32(0) + if r.params.IsReceiverReportDriven { + // should not be set for streams that need to generate reception report + packetsLost = now.packetsLostOverridden - then.packetsLostOverridden + } else { + _, _, _, _, packetsLost, _ = r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) + } lossRate := float32(packetsLost) / float32(packetsExpected) fracLost := uint8(lossRate * 256.0) if proxyFracLost > fracLost { @@ -578,7 +606,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, } jitter := r.jitter - if r.isJitterOverridden { + if r.params.IsReceiverReportDriven { + // should not be set for streams that need to generate reception report jitter = r.jitterOverridden } @@ -605,7 +634,7 @@ func (r *RTPStats) SnapshotInfo(snapshotId uint32) *RTPSnapshotInfo { r.lock.RLock() defer r.lock.RUnlock() - packetsExpected := now.extStartSN - then.extStartSN + 1 + packetsExpected := now.extStartSN - then.extStartSN if packetsExpected > NumSequenceNumbers { logger.Warnw( "too many packets expected in snapshot", @@ -617,10 +646,15 @@ func (r *RTPStats) SnapshotInfo(snapshotId uint32) *RTPSnapshotInfo { return nil } - packetsLost := r.numMissingSNs(uint16(then.extStartSN), uint16(now.extStartSN)) + packetsLost := uint32(0) + if r.params.IsReceiverReportDriven { + packetsLost = now.packetsLostOverridden - then.packetsLostOverridden + } else { + _, _, _, _, packetsLost, _ = r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) + } maxJitter := then.maxJitter - if then.isJitterOverridden { + if r.params.IsReceiverReportDriven { maxJitter = then.maxJitterOverridden } maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 @@ -645,7 +679,7 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { r.lock.RLock() defer r.lock.RUnlock() - packetsExpected := now.extStartSN - then.extStartSN + 1 + packetsExpected := now.extStartSN - then.extStartSN if packetsExpected > NumSequenceNumbers { logger.Warnw( "too many packets expected in delta", @@ -657,25 +691,26 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { return nil } - packetsLost := r.numMissingSNs(uint16(then.extStartSN), uint16(now.extStartSN)) - packetsPadding := now.packetsPadding - then.packetsPadding - bytesPadding := now.bytesPadding - then.bytesPadding + _, bytes, packetsPadding, bytesPadding, packetsLost, frames := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) + if r.params.IsReceiverReportDriven { + packetsLost = now.packetsLostOverridden - then.packetsLostOverridden + } maxJitter := then.maxJitter - if then.isJitterOverridden { + if r.params.IsReceiverReportDriven { maxJitter = then.maxJitterOverridden } maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 return &RTPDeltaInfo{ Packets: packetsExpected - packetsPadding, - Bytes: now.bytes - then.bytes - bytesPadding, + Bytes: bytes, PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate, BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate, PacketsPadding: packetsPadding, BytesPadding: bytesPadding, PacketsLost: packetsLost, - Frames: now.frames - then.frames, + Frames: frames, RttMax: then.maxRtt, JitterMax: maxJitterTime, Nacks: now.nacks - then.nacks, @@ -716,7 +751,7 @@ func (r *RTPStats) ToString() string { jitter := r.jitter maxJitter := r.maxJitter - if r.isJitterOverridden { + if r.params.IsReceiverReportDriven { jitter = r.jitterOverridden maxJitter = r.maxJitterOverridden } @@ -770,17 +805,14 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { return nil } - packetsExpected := r.getExtHighestSN() - r.extStartSN + 1 packets := r.getTotalPacketsPrimary() packetRate := float64(packets) / elapsed bitrate := float64(r.bytes) * 8.0 / elapsed frameRate := float64(r.frames) / elapsed - packetsLost := r.packetsLost - if r.isPacketsLostOverridden { - packetsLost = r.packetsLostOverridden - } + packetsExpected := r.getExtHighestSN() - r.extStartSN + 1 + packetsLost := r.getPacketsLost() packetLostRate := float64(packetsLost) / elapsed packetLostPercentage := float32(packetsLost) / float32(packetsExpected) * 100.0 @@ -792,7 +824,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { jitter := r.jitter maxJitter := r.maxJitter - if r.isJitterOverridden { + if r.params.IsReceiverReportDriven { jitter = r.jitterOverridden maxJitter = r.maxJitterOverridden } @@ -865,53 +897,73 @@ func (r *RTPStats) getExtHighestSN() uint32 { return (uint32(r.cycles) << 16) | uint32(r.highestSN) } -func (r *RTPStats) getNumPacketsSeen() uint32 { - packetsExpected := r.getExtHighestSN() - r.extStartSN + 1 - if r.packetsLost > packetsExpected { - // should not happen - return 0 +func (r *RTPStats) getExtHighestSNAdjusted() uint32 { + if r.params.IsReceiverReportDriven && r.isRRSeen { + return r.extHighestSNOverridden } - return packetsExpected - r.packetsLost + return r.getExtHighestSN() } -func (r *RTPStats) setSeenSN(sn uint16) { - idx, rem := getPos(sn) - r.seenSNs[idx] |= 1 << rem -} - -func (r *RTPStats) clearSeenSN(sn uint16) { - idx, rem := getPos(sn) - r.seenSNs[idx] &^= 1 << rem -} - -func (r *RTPStats) isSeenSN(sn uint16) bool { - idx, rem := getPos(sn) - return (r.seenSNs[idx] & (1 << rem)) != 0 -} - -func (r *RTPStats) numMissingSNs(startInclusive uint16, endInclusive uint16) uint32 { - startIdx, startRem := getPos(startInclusive) - endIdx, endRem := getPos(endInclusive + 1) - - seen := uint32(0) - idx := startIdx - loopEnd := (endIdx + 1) % uint16(len(r.seenSNs)) - for idx != loopEnd { - mask := uint64((1 << 64) - 1) - if idx == startIdx { - mask &^= uint64((1 << startRem) - 1) - } - if idx == endIdx { - mask &= uint64((1 << endRem) - 1) - } - - seen += uint32(bits.OnesCount64(r.seenSNs[idx] & mask)) - - idx = (idx + 1) % uint16(len(r.seenSNs)) +func (r *RTPStats) getPacketsLost() uint32 { + if r.params.IsReceiverReportDriven && r.isRRSeen { + return r.packetsLostOverridden } - return uint32(endInclusive-startInclusive+1) - seen + return r.packetsLost +} + +func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, payloadSize uint16, marker bool) { + snInfo := &r.snInfos[sn] + snInfo.pktSize = pktSize + snInfo.payloadSize = payloadSize + snInfo.marker = marker +} + +func (r *RTPStats) clearSnInfo(sn uint16) { + snInfo := &r.snInfos[sn] + snInfo.pktSize = 0 + snInfo.payloadSize = 0 + snInfo.marker = false +} + +func (r *RTPStats) isSnInfoLost(sn uint16) bool { + snInfo := &r.snInfos[sn] + return snInfo.pktSize == 0 && snInfo.payloadSize == 0 +} + +func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (packets uint32, bytes uint64, packetsPadding uint32, bytesPadding uint64, packetsLost uint32, frames uint32) { + processSN := func(sn uint16) { + snInfo := &r.snInfos[sn] + switch { + case snInfo.pktSize == 0 && snInfo.payloadSize == 0: + packetsLost++ + + case snInfo.payloadSize == 0: + packetsPadding++ + bytesPadding += uint64(snInfo.pktSize) + + default: + packets++ + bytes += uint64(snInfo.pktSize) + } + + if snInfo.marker { + frames++ + } + } + + if startInclusive == endExclusive { + // do a full cycle + for sn := uint32(0); sn < NumSequenceNumbers; sn++ { + processSN(uint16(sn)) + } + } else { + for sn := startInclusive; sn != endExclusive; sn++ { + processSN(sn) + } + } + return } func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime int64) { @@ -952,7 +1004,7 @@ func (r *RTPStats) updateGapHistogram(gap int) { } func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) (*Snapshot, *Snapshot) { - if !r.initialized { + if !r.initialized || (r.params.IsReceiverReportDriven && !r.isRRSeen) { return nil, nil } @@ -966,20 +1018,16 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) (*Snapshot, *Snapshot) // snapshot now r.snapshots[snapshotId] = &Snapshot{ - extStartSN: r.getExtHighestSN(), - bytes: r.bytes, - packetsDuplicate: r.packetsDuplicate, - bytesDuplicate: r.bytesDuplicate, - packetsPadding: r.packetsPadding, - bytesPadding: r.bytesPadding, - frames: r.frames, - nacks: r.nacks, - plis: r.plis, - firs: r.firs, - maxJitter: 0.0, - isJitterOverridden: false, - maxJitterOverridden: 0.0, - maxRtt: 0, + extStartSN: r.getExtHighestSNAdjusted() + 1, + packetsDuplicate: r.packetsDuplicate, + bytesDuplicate: r.bytesDuplicate, + packetsLostOverridden: r.packetsLostOverridden, + nacks: r.nacks, + plis: r.plis, + firs: r.firs, + maxJitter: 0.0, + maxJitterOverridden: 0.0, + maxRtt: 0, } // make a copy so that it can be used independently now := *r.snapshots[snapshotId] diff --git a/pkg/sfu/buffer/rtpstats_test.go b/pkg/sfu/buffer/rtpstats_test.go index 335d1d1cc..584818cbe 100644 --- a/pkg/sfu/buffer/rtpstats_test.go +++ b/pkg/sfu/buffer/rtpstats_test.go @@ -128,7 +128,8 @@ func TestRTPStats_Update(t *testing.T) { require.Equal(t, uint32(3), r.packetsOutOfOrder) require.Equal(t, uint32(1), r.packetsDuplicate) require.Equal(t, uint32(16), r.packetsLost) - require.Equal(t, uint32(16), r.numMissingSNs(uint16(r.extStartSN), uint16(r.getExtHighestSN()))) + _, _, _, _, packetsLost, _ := r.getIntervalStats(uint16(r.extStartSN), uint16(r.getExtHighestSN()+1)) + require.Equal(t, uint32(16), packetsLost) r.Stop() } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 9493bfcec..eae06e6d8 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -203,8 +203,9 @@ func NewDownTrack( }) d.rtpStats = buffer.NewRTPStats(buffer.RTPStatsParams{ - ClockRate: d.codec.ClockRate, - Logger: d.logger, + ClockRate: d.codec.ClockRate, + IsReceiverReportDriven: true, + Logger: d.logger, }) d.connectionQualitySnapshotId = d.rtpStats.NewSnapshotId() d.deltaStatsSnapshotId = d.rtpStats.NewSnapshotId() @@ -994,15 +995,12 @@ func (d *DownTrack) handleRTCP(bytes []byte) { } rr.Reports = append(rr.Reports, r) - d.rtpStats.UpdatePacketsLost(r.TotalLost) - rtt := getRttMs(&r) if rtt != d.rtpStats.GetRtt() { rttToReport = rtt } - d.rtpStats.UpdateRtt(rtt) - d.rtpStats.UpdateJitter(float64(r.Jitter)) + d.rtpStats.UpdateFromReceiverReport(r.LastSequenceNumber, r.TotalLost, rtt, float64(r.Jitter)) } if len(rr.Reports) > 0 { d.listenerLock.RLock()