From 09c0b25787df9a4f0f7366515c201b35a85f6228 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 24 Apr 2023 23:39:30 +0530 Subject: [PATCH] Ensure that RR is not received for a while before running scorer on nil (#1653) data. Without the check, it was getting tripped by publisher not publishing any data. Both conditions returned nil, but in one case, the receiver report should have been received, but no movement in number of packets. --- pkg/sfu/buffer/rtpstats.go | 2 +- pkg/sfu/connectionquality/connectionstats.go | 31 ++++++++++++++------ 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index f1176f5d2..3b9ec4a8d 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -869,7 +869,7 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { } if packetsExpected == 0 { if r.params.IsReceiverReportDriven { - // not received RTCP RR + // not received RTCP RR (OR) publisher is not producing any data return nil } diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index fb6caa91f..7eca5d515 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -15,9 +15,10 @@ import ( ) const ( - UpdateInterval = 5 * time.Second - processThreshold = 0.95 - noStatsTooLongMultiplier = 2 + UpdateInterval = 5 * time.Second + processThreshold = 0.95 + noStatsTooLongMultiplier = 2 + noReceiverReportTooLongThreshold = 10 * time.Second ) type ConnectionStatsParams struct { @@ -38,9 +39,10 @@ type ConnectionStats struct { onStatsUpdate func(cs *ConnectionStats, stat *livekit.AnalyticsStat) - lock sync.RWMutex - lastStatsAt time.Time - statsInProcess bool + lock sync.RWMutex + lastStatsAt time.Time + statsInProcess bool + lastReceiverReportAt time.Time scorer *qualityScorer @@ -103,6 +105,10 @@ func (cs *ConnectionStats) GetScoreAndQuality() (float32, livekit.ConnectionQual } func (cs *ConnectionStats) ReceiverReportReceived(at time.Time) { + cs.lock.Lock() + cs.lastReceiverReportAt = time.Now() + cs.lock.Unlock() + cs.getStat(at) } @@ -160,8 +166,8 @@ func (cs *ConnectionStats) updateLastStatsAt(at time.Time) { } func (cs *ConnectionStats) isTooLongSinceLastStats() bool { - cs.lock.Lock() - defer cs.lock.Unlock() + cs.lock.RLock() + defer cs.lock.RUnlock() interval := cs.params.UpdateInterval if interval == 0 { @@ -170,6 +176,13 @@ func (cs *ConnectionStats) isTooLongSinceLastStats() bool { return !cs.lastStatsAt.IsZero() && time.Since(cs.lastStatsAt) > interval*noStatsTooLongMultiplier } +func (cs *ConnectionStats) isTooLongSinceLastReceiverReport() bool { + cs.lock.RLock() + defer cs.lock.RUnlock() + + return !cs.lastReceiverReportAt.IsZero() && time.Since(cs.lastReceiverReportAt) > noReceiverReportTooLongThreshold +} + func (cs *ConnectionStats) clearInProcess() { cs.lock.Lock() defer cs.lock.Unlock() @@ -189,7 +202,7 @@ func (cs *ConnectionStats) getStat(at time.Time) { streams := cs.params.GetDeltaStats() if len(streams) == 0 { - if cs.isTooLongSinceLastStats() { + if cs.isTooLongSinceLastStats() && cs.isTooLongSinceLastReceiverReport() { cs.updateLastStatsAt(at) cs.updateScore(streams, at) }