From 04269c100c61ccd0cc1c0291ffcd2b7359490dbf Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 7 Mar 2023 09:08:19 +0530 Subject: [PATCH] Connection quality misc changes (#1496) * Connectino quality misc changes 1. Call scorer.Update() with nil stat when no data available so that scorer can synthesise window with proper window time. 2. Substract out loss in interval to account for packets not sent at all. 3. Fix `packetsNotFound` variable in `getIntervalStats`. I remember this working at some point. Not sure if I fat fingered in another PR and deleted the increment line. 4. Logging a bit more when no packets expected. Those can get noisy especially when track is muted. But, seeing some unexplained instances of no packets leading to quality drop. So, temporary logging to get a bit more information. * correct spelling * Limit packet score minimum to 0.0 --- pkg/sfu/buffer/rtpstats.go | 33 ++++++++++++----- pkg/sfu/connectionquality/connectionstats.go | 38 +++++++++++--------- pkg/sfu/connectionquality/scorer.go | 9 +++-- 3 files changed, 53 insertions(+), 27 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 0f093a2b4..4f548182f 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -864,6 +864,10 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { return nil } if packetsExpected == 0 { + r.logger.Debugw( + "no expected packets", + "info", fmt.Sprintf("start: %d @ %+v, end: %d @ %+v", then.extStartSN, then.startTime, now.extStartSN, now.startTime), + ) return nil } @@ -873,10 +877,15 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { // by taking number of packets from interval report, packets not sent (because of missing packets in feed) will be accounted for packetsExpected = intervalStats.packets + intervalStats.packetsPadding if packetsExpected == 0 { + r.logger.Debugw( + "no expected packets in interval", + "info", fmt.Sprintf("start: %d @ %+v, end: %d @ %+v", then.extStartSN, then.startTime, now.extStartSN, now.startTime), + ) return nil } - packetsLost = now.packetsLostOverridden - then.packetsLostOverridden + // discount loss in the interval as those are packets not sent at all + packetsLost = now.packetsLostOverridden - then.packetsLostOverridden - intervalStats.packetsLost if int32(packetsLost) < 0 { packetsLost = 0 } @@ -884,7 +893,14 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { if packetsLost > packetsExpected { r.logger.Warnw( "unexpected number of packets lost", - fmt.Errorf("start: %d, end: %d, expected: %d, lost: %d", then.extStartSN, now.extStartSN, packetsExpected, packetsLost), + fmt.Errorf( + "start: %d, end: %d, expected: %d, lost: report: %d, interval: %d", + then.extStartSN, + now.extStartSN, + packetsExpected, + now.packetsLostOverridden-then.packetsLostOverridden, + intervalStats.packetsLost, + ), ) packetsLost = packetsExpected } @@ -1123,12 +1139,12 @@ func (r *RTPStats) getSnInfoOutOfOrderPtr(sn uint16) int { offset := sn - r.highestSN if offset > 0 && offset < (1<<15) { return -1 // in-order, not expected, maybe too new - } else { - offset = r.highestSN - sn - if int(offset) >= SnInfoSize { - // too old, ignore - return -1 - } + } + + offset = r.highestSN - sn + if int(offset) >= SnInfoSize { + // too old, ignore + return -1 } return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask @@ -1180,6 +1196,7 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) processSN := func(sn uint16) { readPtr := r.getSnInfoOutOfOrderPtr(sn) if readPtr < 0 { + packetsNotFound++ return } diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index dd1c085b1..143aecd3b 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -77,25 +77,29 @@ func (cs *ConnectionStats) GetScoreAndQuality() (float32, livekit.ConnectionQual } func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWithLayers, at time.Time) float32 { - var stat windowStat - for _, s := range streams { - if stat.startedAt.IsZero() || stat.startedAt.After(s.RTPStats.StartTime) { - stat.startedAt = s.RTPStats.StartTime + if len(streams) == 0 { + cs.scorer.Update(nil, at) + } else { + var stat windowStat + for _, s := range streams { + if stat.startedAt.IsZero() || stat.startedAt.After(s.RTPStats.StartTime) { + stat.startedAt = s.RTPStats.StartTime + } + if stat.duration < s.RTPStats.Duration { + stat.duration = s.RTPStats.Duration + } + stat.packetsExpected += s.RTPStats.Packets + s.RTPStats.PacketsPadding + stat.packetsLost += s.RTPStats.PacketsLost + if stat.rttMax < s.RTPStats.RttMax { + stat.rttMax = s.RTPStats.RttMax + } + if stat.jitterMax < s.RTPStats.JitterMax { + stat.jitterMax = s.RTPStats.JitterMax + } + stat.bytes += s.RTPStats.Bytes - s.RTPStats.HeaderBytes // only use media payload size } - if stat.duration < s.RTPStats.Duration { - stat.duration = s.RTPStats.Duration - } - stat.packetsExpected += s.RTPStats.Packets + s.RTPStats.PacketsPadding - stat.packetsLost += s.RTPStats.PacketsLost - if stat.rttMax < s.RTPStats.RttMax { - stat.rttMax = s.RTPStats.RttMax - } - if stat.jitterMax < s.RTPStats.JitterMax { - stat.jitterMax = s.RTPStats.JitterMax - } - stat.bytes += s.RTPStats.Bytes - s.RTPStats.HeaderBytes // only use media payload size + cs.scorer.Update(&stat, at) } - cs.scorer.Update(&stat, at) mos, _ := cs.scorer.GetMOSAndQuality() return mos diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index bdd1b7ac7..0085f0ac5 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -50,7 +50,12 @@ func (w *windowStat) calculatePacketScore(plw float64) float64 { } lossEffect *= plw - return maxScore - delayEffect - lossEffect + score := maxScore - delayEffect - lossEffect + if score < 0.0 { + score = 0.0 + } + + return score } func (w *windowStat) calculateByteScore(expectedBitrate int64) float64 { @@ -286,7 +291,7 @@ func (q *qualityScorer) Update(stat *windowStat, at time.Time) { q.score = score q.params.Logger.Infow( "quality drop", - "reaason", reason, + "reason", reason, "score", q.score, "quality", scoreToConnectionQuality(q.score), "window", ws,