diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index f1176f5d2..3b9ec4a8d 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -869,7 +869,7 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { } if packetsExpected == 0 { if r.params.IsReceiverReportDriven { - // not received RTCP RR + // not received RTCP RR (OR) publisher is not producing any data return nil } diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index fb6caa91f..7eca5d515 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -15,9 +15,10 @@ import ( ) const ( - UpdateInterval = 5 * time.Second - processThreshold = 0.95 - noStatsTooLongMultiplier = 2 + UpdateInterval = 5 * time.Second + processThreshold = 0.95 + noStatsTooLongMultiplier = 2 + noReceiverReportTooLongThreshold = 10 * time.Second ) type ConnectionStatsParams struct { @@ -38,9 +39,10 @@ type ConnectionStats struct { onStatsUpdate func(cs *ConnectionStats, stat *livekit.AnalyticsStat) - lock sync.RWMutex - lastStatsAt time.Time - statsInProcess bool + lock sync.RWMutex + lastStatsAt time.Time + statsInProcess bool + lastReceiverReportAt time.Time scorer *qualityScorer @@ -103,6 +105,10 @@ func (cs *ConnectionStats) GetScoreAndQuality() (float32, livekit.ConnectionQual } func (cs *ConnectionStats) ReceiverReportReceived(at time.Time) { + cs.lock.Lock() + cs.lastReceiverReportAt = time.Now() + cs.lock.Unlock() + cs.getStat(at) } @@ -160,8 +166,8 @@ func (cs *ConnectionStats) updateLastStatsAt(at time.Time) { } func (cs *ConnectionStats) isTooLongSinceLastStats() bool { - cs.lock.Lock() - defer cs.lock.Unlock() + cs.lock.RLock() + defer cs.lock.RUnlock() interval := cs.params.UpdateInterval if interval == 0 { @@ -170,6 +176,13 @@ func (cs *ConnectionStats) isTooLongSinceLastStats() bool { return !cs.lastStatsAt.IsZero() && time.Since(cs.lastStatsAt) > interval*noStatsTooLongMultiplier } +func (cs *ConnectionStats) isTooLongSinceLastReceiverReport() bool { + cs.lock.RLock() + defer cs.lock.RUnlock() + + return !cs.lastReceiverReportAt.IsZero() && time.Since(cs.lastReceiverReportAt) > noReceiverReportTooLongThreshold +} + func (cs *ConnectionStats) clearInProcess() { cs.lock.Lock() defer cs.lock.Unlock() @@ -189,7 +202,7 @@ func (cs *ConnectionStats) getStat(at time.Time) { streams := cs.params.GetDeltaStats() if len(streams) == 0 { - if cs.isTooLongSinceLastStats() { + if cs.isTooLongSinceLastStats() && cs.isTooLongSinceLastReceiverReport() { cs.updateLastStatsAt(at) cs.updateScore(streams, at) }