From c2335968deb61615655af00aeeb762161cddd1d0 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 14 Mar 2023 16:27:39 +0530 Subject: [PATCH] Prevent evaluation over small wkndow. (#1516) With push model (i. e. connection quality evaluation triggered by reception of RTCP receiver report), it is possible that a report is received quickly after a track is started (especially with video). Those should not trigger a quality evaluation. Set `lastStatsAt` in `Start` routine and ensure that start has been called and enough time has passed since last stats time to avoid small windows. --- pkg/sfu/connectionquality/connectionstats.go | 32 ++++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index ac6506bd8..a1a320249 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -29,8 +29,10 @@ type ConnectionStatsParams struct { } type ConnectionStats struct { - params ConnectionStatsParams - isVideo atomic.Bool + params ConnectionStatsParams + + isStarted atomic.Bool + isVideo atomic.Bool onStatsUpdate func(cs *ConnectionStats, stat *livekit.AnalyticsStat) @@ -55,8 +57,14 @@ func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats { } func (cs *ConnectionStats) Start(trackInfo *livekit.TrackInfo, at time.Time) { + if cs.isStarted.Swap(true) { + return + } + cs.isVideo.Store(trackInfo.Type == livekit.TrackType_VIDEO) + cs.updateLastStatsAt(time.Now()) // force an initial wait + cs.scorer.Start(at) go cs.updateStatsWorker() @@ -130,7 +138,7 @@ func (cs *ConnectionStats) maybeMarkInProcess() bool { interval = UpdateInterval } - if cs.lastStatsAt.IsZero() || time.Since(cs.lastStatsAt) > time.Duration(processThreshold*float64(interval)) { + if cs.isStarted.Load() && time.Since(cs.lastStatsAt) > time.Duration(processThreshold*float64(interval)) { cs.statsInProcess = true return true } @@ -138,14 +146,18 @@ func (cs *ConnectionStats) maybeMarkInProcess() bool { return false } -func (cs *ConnectionStats) updateInProcess(isAvailable bool, at time.Time) { +func (cs *ConnectionStats) updateLastStatsAt(at time.Time) { + cs.lock.Lock() + defer cs.lock.Unlock() + + cs.lastStatsAt = at +} + +func (cs *ConnectionStats) clearInProcess() { cs.lock.Lock() defer cs.lock.Unlock() cs.statsInProcess = false - if isAvailable { - cs.lastStatsAt = at - } } func (cs *ConnectionStats) getStat(at time.Time) { @@ -160,12 +172,12 @@ func (cs *ConnectionStats) getStat(at time.Time) { streams := cs.params.GetDeltaStats() if len(streams) == 0 { - cs.updateInProcess(false, at) + cs.clearInProcess() return } // stats available, update last stats time - cs.updateInProcess(true, at) + cs.updateLastStatsAt(at) score := cs.updateScore(streams, at) @@ -194,6 +206,8 @@ func (cs *ConnectionStats) getStat(at time.Time) { Mime: cs.params.MimeType, }) } + + cs.clearInProcess() } func (cs *ConnectionStats) updateStatsWorker() {