From cea41e4189c6f26d7c61692e2a2c0dea9b87f5d0 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 27 Jun 2023 17:44:53 +0530 Subject: [PATCH] Discount out-of-order packets in downstream score. (#1831) * Discount out-of-order packets in downstream score. More notes inline. * correct comment * clean up comment --- pkg/sfu/buffer/rtpstats.go | 22 +++++++--- pkg/sfu/connectionquality/connectionstats.go | 1 + pkg/sfu/connectionquality/scorer.go | 43 +++++++++++++++----- 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 0bb269ed6..ce702409b 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -60,6 +60,7 @@ type IntervalStats struct { bytesPadding uint64 headerBytesPadding uint64 packetsLost uint32 + packetsOutOfOrder uint32 frames uint32 } @@ -77,6 +78,7 @@ type RTPDeltaInfo struct { HeaderBytesPadding uint64 PacketsLost uint32 PacketsMissing uint32 + PacketsOutOfOrder uint32 Frames uint32 RttMax uint32 JitterMax float64 @@ -106,6 +108,7 @@ type SnInfo struct { pktSize uint16 isPaddingOnly bool marker bool + isOutOfOrder bool } type RTCPSenderReportData struct { @@ -412,7 +415,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa isDuplicate = true } else { r.packetsLost-- - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, true) } } @@ -431,7 +434,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa r.clearSnInfos(r.highestSN+1, rtph.SequenceNumber) r.packetsLost += uint32(diff - 1) - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, false) if rtph.SequenceNumber < r.highestSN && !first { r.cycles++ @@ -490,7 +493,7 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, pktSize uint64, hdrSize beforeAdjust := r.extStartSN r.extStartSN = uint32(rtph.SequenceNumber) - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, true) for _, s := range r.snapshots { if s.extStartSN == beforeAdjust { @@ -1132,7 +1135,6 @@ func (r *RTPStats) DeltaInfoOverridden(snapshotId uint32) *RTPDeltaInfo { } intervalStats := r.getIntervalStats(uint16(then.extStartSNOverridden), uint16(now.extStartSNOverridden)) - packetsMissing := intervalStats.packetsLost packetsLost := now.packetsLostOverridden - then.packetsLostOverridden if int32(packetsLost) < 0 { packetsLost = 0 @@ -1173,7 +1175,8 @@ func (r *RTPStats) DeltaInfoOverridden(snapshotId uint32) *RTPDeltaInfo { BytesPadding: intervalStats.bytesPadding, HeaderBytesPadding: intervalStats.headerBytesPadding, PacketsLost: packetsLost, - PacketsMissing: packetsMissing, + PacketsMissing: intervalStats.packetsLost, + PacketsOutOfOrder: intervalStats.packetsOutOfOrder, Frames: intervalStats.frames, RttMax: then.maxRtt, JitterMax: maxJitterTime, @@ -1409,7 +1412,7 @@ func (r *RTPStats) getSnInfoOutOfOrderPtr(sn uint16) int { return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask } -func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool) { +func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool, isOutOfOrder bool) { writePtr := 0 ooo := (sn - r.highestSN) > (1 << 15) if !ooo { @@ -1427,6 +1430,7 @@ func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadS snInfo.hdrSize = hdrSize snInfo.isPaddingOnly = payloadSize == 0 snInfo.marker = marker + snInfo.isOutOfOrder = isOutOfOrder } func (r *RTPStats) clearSnInfos(startInclusive uint16, endExclusive uint16) { @@ -1474,6 +1478,9 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) intervalStats.packets++ intervalStats.bytes += uint64(snInfo.pktSize) intervalStats.headerBytes += uint64(snInfo.hdrSize) + if snInfo.isOutOfOrder { + intervalStats.packetsOutOfOrder++ + } } if snInfo.marker { @@ -1822,6 +1829,7 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { packetsLost := uint32(0) packetsMissing := uint32(0) + packetsOutOfOrder := uint32(0) frames := uint32(0) @@ -1860,6 +1868,7 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { packetsLost += deltaInfo.PacketsLost packetsMissing += deltaInfo.PacketsMissing + packetsOutOfOrder += deltaInfo.PacketsOutOfOrder frames += deltaInfo.Frames @@ -1893,6 +1902,7 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { HeaderBytesPadding: headerBytesPadding, PacketsLost: packetsLost, PacketsMissing: packetsMissing, + PacketsOutOfOrder: packetsOutOfOrder, Frames: frames, RttMax: maxRtt, JitterMax: maxJitter, diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index fb5c7e710..5acc6c0da 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -108,6 +108,7 @@ func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, at stat.packetsExpected = agg.Packets + agg.PacketsPadding stat.packetsLost = agg.PacketsLost stat.packetsMissing = agg.PacketsMissing + stat.packetsOutOfOrder = agg.PacketsOutOfOrder stat.bytes = agg.Bytes - agg.HeaderBytes // only use media payload size stat.rttMax = agg.RttMax stat.jitterMax = agg.JitterMax diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index 56e008a10..4d333e005 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -29,14 +29,15 @@ const ( // ------------------------------------------ type windowStat struct { - startedAt time.Time - duration time.Duration - packetsExpected uint32 - packetsLost uint32 - packetsMissing uint32 - bytes uint64 - rttMax uint32 - jitterMax float64 + startedAt time.Time + duration time.Duration + packetsExpected uint32 + packetsLost uint32 + packetsMissing uint32 + packetsOutOfOrder uint32 + bytes uint64 + rttMax uint32 + jitterMax float64 } func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJitter bool) float64 { @@ -59,7 +60,28 @@ func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJ delayEffect = (effectiveDelay - 120.0) / 10.0 } - actualLost := w.packetsLost - w.packetsMissing + // discount out-of-order packets from loss to deal with a scenario like + // 1. up stream has loss + // 2. down stream forwards with loss/hole in sequence number + // 3. down stream client reports a certain number of loss + // 4. while processing that, up stream could have retransmitted missing packets + // 5. those retransmitted packets are forwarded, + // - server's view: it has forwarded those packets + // - client's view: it had not seen those packets when sending RTCP RR + // so those retransmitted packets appear like down stream loss to server. + // + // retransmitted packets would have arrived out-of-order. So, discounting them + // will account for it. + // + // Note that packets can arrive out-of-order in the upstream during regular + // streaming as well, i. e. without loss + NACK + retransmit. Those will be + // discounted too. And that will skew the real loss. For example, let + // us say that 40 out of 100 packets were reported lost by down stream. + // These could be real losses. In the same window, 40 packets could have been + // delivered out-of-order by the up stream, thus cancelling out the real loss. + // But, those situations should be rare and is a compromise for not letting + // up stream loss penalise down stream. + actualLost := w.packetsLost - w.packetsMissing - w.packetsOutOfOrder if int32(actualLost) < 0 { actualLost = 0 } @@ -102,12 +124,13 @@ func (w *windowStat) calculateBitrateScore(expectedBitrate int64) float64 { } func (w *windowStat) String() string { - return fmt.Sprintf("start: %+v, dur: %+v, pe: %d, pl: %d, pm: %d, b: %d, rtt: %d, jitter: %0.2f", + return fmt.Sprintf("start: %+v, dur: %+v, pe: %d, pl: %d, pm: %d, pooo: %d, b: %d, rtt: %d, jitter: %0.2f", w.startedAt, w.duration, w.packetsExpected, w.packetsLost, w.packetsMissing, + w.packetsOutOfOrder, w.bytes, w.rttMax, w.jitterMax,