From 99601e6d41d26fdec36afb142db621e86f86112e Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 7 Mar 2023 14:32:43 +0530 Subject: [PATCH] Handle the case of no packets in down stream tracks better. (#1500) --- pkg/sfu/buffer/rtpstats.go | 68 +++++++++++++------- pkg/sfu/connectionquality/connectionstats.go | 40 ++++++------ pkg/sfu/connectionquality/scorer.go | 34 +++++----- 3 files changed, 77 insertions(+), 65 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 4f548182f..5c9efaf01 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -31,6 +31,8 @@ type RTPFlowState struct { } type IntervalStats struct { + earliestPktTime int64 + latestPktTime int64 packets uint32 bytes uint64 headerBytes uint64 @@ -54,6 +56,7 @@ type RTPDeltaInfo struct { BytesPadding uint64 HeaderBytesPadding uint64 PacketsLost uint32 + PacketsMissing uint32 Frames uint32 RttMax uint32 JitterMax float64 @@ -78,6 +81,7 @@ type Snapshot struct { } type SnInfo struct { + pktTime int64 hdrSize uint16 pktSize uint16 isPaddingOnly bool @@ -365,7 +369,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa isDuplicate = true } else { r.packetsLost-- - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) + r.setSnInfo(rtph.SequenceNumber, packetTime, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) } } @@ -384,7 +388,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa r.clearSnInfos(r.highestSN+1, rtph.SequenceNumber) r.packetsLost += uint32(diff - 1) - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) + r.setSnInfo(rtph.SequenceNumber, packetTime, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) if rtph.SequenceNumber < r.highestSN && !first { r.cycles++ @@ -434,7 +438,7 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSiz beforeAdjust := r.extStartSN r.extStartSN = uint32(rtph.SequenceNumber) - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) + r.setSnInfo(rtph.SequenceNumber, packetTime, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) for _, s := range r.snapshots { if s.extStartSN == beforeAdjust { @@ -855,6 +859,9 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { r.lock.RLock() defer r.lock.RUnlock() + startTime := then.startTime + endTime := now.startTime + packetsExpected := now.extStartSN - then.extStartSN if packetsExpected > NumSequenceNumbers { r.logger.Warnw( @@ -864,28 +871,27 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { return nil } if packetsExpected == 0 { - r.logger.Debugw( - "no expected packets", - "info", fmt.Sprintf("start: %d @ %+v, end: %d @ %+v", then.extStartSN, then.startTime, now.extStartSN, now.startTime), - ) - return nil - } - - packetsLost := uint32(0) - intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) - if r.params.IsReceiverReportDriven { - // by taking number of packets from interval report, packets not sent (because of missing packets in feed) will be accounted for - packetsExpected = intervalStats.packets + intervalStats.packetsPadding - if packetsExpected == 0 { - r.logger.Debugw( - "no expected packets in interval", - "info", fmt.Sprintf("start: %d @ %+v, end: %d @ %+v", then.extStartSN, then.startTime, now.extStartSN, now.startTime), - ) + if r.params.IsReceiverReportDriven { + // not received RTCP RR return nil } - // discount loss in the interval as those are packets not sent at all - packetsLost = now.packetsLostOverridden - then.packetsLostOverridden - intervalStats.packetsLost + return &RTPDeltaInfo{ + StartTime: startTime, + Duration: endTime.Sub(startTime), + } + } + + packetsLost := uint32(0) + packetsMissing := uint32(0) + intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) + if r.params.IsReceiverReportDriven { + startTime = time.Unix(0, intervalStats.earliestPktTime) + endTime = time.Unix(0, intervalStats.latestPktTime) + + packetsMissing = intervalStats.packetsLost + + packetsLost = now.packetsLostOverridden - then.packetsLostOverridden if int32(packetsLost) < 0 { packetsLost = 0 } @@ -915,8 +921,8 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 return &RTPDeltaInfo{ - StartTime: then.startTime, - Duration: now.startTime.Sub(then.startTime), + StartTime: startTime, + Duration: endTime.Sub(startTime), Packets: packetsExpected - intervalStats.packetsPadding, Bytes: intervalStats.bytes, HeaderBytes: intervalStats.headerBytes, @@ -927,6 +933,7 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { BytesPadding: intervalStats.bytesPadding, HeaderBytesPadding: intervalStats.headerBytesPadding, PacketsLost: packetsLost, + PacketsMissing: packetsMissing, Frames: intervalStats.frames, RttMax: then.maxRtt, JitterMax: maxJitterTime, @@ -1150,7 +1157,7 @@ func (r *RTPStats) getSnInfoOutOfOrderPtr(sn uint16) int { return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask } -func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool) { +func (r *RTPStats) setSnInfo(sn uint16, pktTime int64, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool) { writePtr := 0 ooo := (sn - r.highestSN) > (1 << 15) if !ooo { @@ -1164,6 +1171,7 @@ func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadS } snInfo := &r.snInfos[writePtr] + snInfo.pktTime = pktTime snInfo.pktSize = pktSize snInfo.hdrSize = hdrSize snInfo.isPaddingOnly = payloadSize == 0 @@ -1173,7 +1181,9 @@ func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadS func (r *RTPStats) clearSnInfos(startInclusive uint16, endExclusive uint16) { for sn := startInclusive; sn != endExclusive; sn++ { snInfo := &r.snInfos[r.snInfoWritePtr] + snInfo.pktTime = 0 snInfo.pktSize = 0 + snInfo.hdrSize = 0 snInfo.isPaddingOnly = false snInfo.marker = false @@ -1219,6 +1229,14 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) if snInfo.marker { intervalStats.frames++ } + + if intervalStats.earliestPktTime == 0 || snInfo.pktTime < intervalStats.earliestPktTime { + intervalStats.earliestPktTime = snInfo.pktTime + } + + if intervalStats.latestPktTime == 0 || snInfo.pktTime > intervalStats.latestPktTime { + intervalStats.latestPktTime = snInfo.pktTime + } } if startInclusive == endExclusive { diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 143aecd3b..3195b7a18 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -77,29 +77,26 @@ func (cs *ConnectionStats) GetScoreAndQuality() (float32, livekit.ConnectionQual } func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWithLayers, at time.Time) float32 { - if len(streams) == 0 { - cs.scorer.Update(nil, at) - } else { - var stat windowStat - for _, s := range streams { - if stat.startedAt.IsZero() || stat.startedAt.After(s.RTPStats.StartTime) { - stat.startedAt = s.RTPStats.StartTime - } - if stat.duration < s.RTPStats.Duration { - stat.duration = s.RTPStats.Duration - } - stat.packetsExpected += s.RTPStats.Packets + s.RTPStats.PacketsPadding - stat.packetsLost += s.RTPStats.PacketsLost - if stat.rttMax < s.RTPStats.RttMax { - stat.rttMax = s.RTPStats.RttMax - } - if stat.jitterMax < s.RTPStats.JitterMax { - stat.jitterMax = s.RTPStats.JitterMax - } - stat.bytes += s.RTPStats.Bytes - s.RTPStats.HeaderBytes // only use media payload size + var stat windowStat + for _, s := range streams { + if stat.startedAt.IsZero() || stat.startedAt.After(s.RTPStats.StartTime) { + stat.startedAt = s.RTPStats.StartTime } - cs.scorer.Update(&stat, at) + if stat.duration < s.RTPStats.Duration { + stat.duration = s.RTPStats.Duration + } + stat.packetsExpected += s.RTPStats.Packets + s.RTPStats.PacketsPadding + stat.packetsLost += s.RTPStats.PacketsLost + stat.packetsMissing += s.RTPStats.PacketsMissing + if stat.rttMax < s.RTPStats.RttMax { + stat.rttMax = s.RTPStats.RttMax + } + if stat.jitterMax < s.RTPStats.JitterMax { + stat.jitterMax = s.RTPStats.JitterMax + } + stat.bytes += s.RTPStats.Bytes - s.RTPStats.HeaderBytes // only use media payload size } + cs.scorer.Update(&stat, at) mos, _ := cs.scorer.GetMOSAndQuality() return mos @@ -112,7 +109,6 @@ func (cs *ConnectionStats) getStat(at time.Time) *livekit.AnalyticsStat { streams := cs.params.GetDeltaStats() if len(streams) == 0 { - cs.updateScore(streams, at) return nil } diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index 0085f0ac5..1b935be6f 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -30,6 +30,7 @@ type windowStat struct { duration time.Duration packetsExpected uint32 packetsLost uint32 + packetsMissing uint32 bytes uint64 rttMax uint32 jitterMax float64 @@ -44,9 +45,14 @@ func (w *windowStat) calculatePacketScore(plw float64) float64 { delayEffect = (effectiveDelay - 120.0) / 10.0 } + actualLost := w.packetsLost - w.packetsMissing + if int32(actualLost) < 0 { + actualLost = 0 + } + lossEffect := float64(0.0) if w.packetsExpected > 0 { - lossEffect = float64(w.packetsLost) * 100.0 / float64(w.packetsExpected) + lossEffect = float64(actualLost) * 100.0 / float64(w.packetsExpected) } lossEffect *= plw @@ -256,26 +262,18 @@ func (q *qualityScorer) Update(stat *windowStat, at time.Time) { reason := "none" var ws *windowScore - if stat == nil { + if stat.packetsExpected == 0 { reason = "dry" - ws = newWindowScoreWithScore(&windowStat{ - startedAt: q.lastUpdateAt, - duration: at.Sub(q.lastUpdateAt), - }, poorScore) + ws = newWindowScoreWithScore(stat, poorScore) } else { - if stat.packetsExpected == 0 { - reason = "dry" - ws = newWindowScoreWithScore(stat, poorScore) + wsPacket := newWindowScorePacket(stat, q.getPacketLossWeight(stat)) + wsByte := newWindowScoreByte(stat, expectedBitrate) + if wsPacket.getScore() < wsByte.getScore() { + reason = "packet" + ws = wsPacket } else { - wsPacket := newWindowScorePacket(stat, q.getPacketLossWeight(stat)) - wsByte := newWindowScoreByte(stat, expectedBitrate) - if wsPacket.getScore() < wsByte.getScore() { - reason = "packet" - ws = wsPacket - } else { - reason = "bitrate" - ws = wsByte - } + reason = "bitrate" + ws = wsByte } } score := ws.getScore()