diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 2b2601b74..ce821b9ee 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -60,16 +60,27 @@ func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats { } } -func (cs *ConnectionStats) Start(trackInfo *livekit.TrackInfo, at time.Time) { +func (cs *ConnectionStats) start(trackInfo *livekit.TrackInfo) { + cs.isVideo.Store(trackInfo.Type == livekit.TrackType_VIDEO) + go cs.updateStatsWorker() +} + +func (cs *ConnectionStats) StartAt(trackInfo *livekit.TrackInfo, at time.Time) { if cs.isStarted.Swap(true) { return } - cs.isVideo.Store(trackInfo.Type == livekit.TrackType_VIDEO) + cs.scorer.StartAt(at) + cs.start(trackInfo) +} - cs.scorer.Start(at) +func (cs *ConnectionStats) Start(trackInfo *livekit.TrackInfo) { + if cs.isStarted.Swap(true) { + return + } - go cs.updateStatsWorker() + cs.scorer.Start() + cs.start(trackInfo) } func (cs *ConnectionStats) Close() { @@ -80,36 +91,68 @@ func (cs *ConnectionStats) OnStatsUpdate(fn func(cs *ConnectionStats, stat *live cs.onStatsUpdate = fn } -func (cs *ConnectionStats) UpdateMute(isMuted bool, at time.Time) { +func (cs *ConnectionStats) UpdateMuteAt(isMuted bool, at time.Time) { if cs.done.IsBroken() { return } - cs.scorer.UpdateMute(isMuted, at) + cs.scorer.UpdateMuteAt(isMuted, at) } -func (cs *ConnectionStats) AddBitrateTransition(bitrate int64, at time.Time) { +func (cs *ConnectionStats) UpdateMute(isMuted bool) { if cs.done.IsBroken() { return } - cs.scorer.AddBitrateTransition(bitrate, at) + cs.scorer.UpdateMute(isMuted) } -func (cs *ConnectionStats) UpdateLayerMute(isMuted bool, at time.Time) { +func (cs *ConnectionStats) AddBitrateTransitionAt(bitrate int64, at time.Time) { if cs.done.IsBroken() { return } - cs.scorer.UpdateLayerMute(isMuted, at) + cs.scorer.AddBitrateTransitionAt(bitrate, at) } -func (cs *ConnectionStats) AddLayerTransition(distance float64, at time.Time) { +func (cs *ConnectionStats) AddBitrateTransition(bitrate int64) { if cs.done.IsBroken() { return } - cs.scorer.AddLayerTransition(distance, at) + cs.scorer.AddBitrateTransition(bitrate) +} + +func (cs *ConnectionStats) UpdateLayerMuteAt(isMuted bool, at time.Time) { + if cs.done.IsBroken() { + return + } + + cs.scorer.UpdateLayerMuteAt(isMuted, at) +} + +func (cs *ConnectionStats) UpdateLayerMute(isMuted bool) { + if cs.done.IsBroken() { + return + } + + cs.scorer.UpdateLayerMute(isMuted) +} + +func (cs *ConnectionStats) AddLayerTransitionAt(distance float64, at time.Time) { + if cs.done.IsBroken() { + return + } + + cs.scorer.AddLayerTransitionAt(distance, at) +} + +func (cs *ConnectionStats) AddLayerTransition(distance float64) { + if cs.done.IsBroken() { + return + } + + cs.scorer.AddLayerTransition(distance) } func (cs *ConnectionStats) GetScoreAndQuality() (float32, livekit.ConnectionQuality) { @@ -129,7 +172,11 @@ func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, at stat.rttMax = agg.RttMax stat.jitterMax = agg.JitterMax } - cs.scorer.Update(&stat, at) + if at.IsZero() { + cs.scorer.Update(&stat) + } else { + cs.scorer.UpdateAt(&stat, at) + } mos, _ := cs.scorer.GetMOSAndQuality() return mos @@ -182,7 +229,7 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32, return cs.updateScoreWithAggregate(agg, at), streams } -func (cs *ConnectionStats) updateScore(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) { +func (cs *ConnectionStats) updateScoreAt(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) { if cs.params.GetDeltaStats == nil { return MinMOS, nil } @@ -227,8 +274,8 @@ func (cs *ConnectionStats) clearStreamingStart() { cs.lock.Unlock() } -func (cs *ConnectionStats) getStat(at time.Time) { - score, streams := cs.updateScore(at) +func (cs *ConnectionStats) getStat() { + score, streams := cs.updateScoreAt(time.Time{}) if cs.onStatsUpdate != nil && len(streams) != 0 { analyticsStreams := make([]*livekit.AnalyticsStream, 0, len(streams)) @@ -279,7 +326,7 @@ func (cs *ConnectionStats) updateStatsWorker() { return } - cs.getStat(time.Now()) + cs.getStat() } } } diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index dce11601d..86d1302de 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -39,11 +39,11 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) - cs.UpdateMute(false, now.Add(-1*time.Second)) + cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) + cs.UpdateMuteAt(false, now.Add(-1*time.Second)) // no data and not enough unmute time should return default state which is EXCELLENT quality - cs.updateScore(now) + cs.updateScoreAt(now) mos, quality := cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -58,7 +58,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -83,7 +83,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(2.1), mos) require.Equal(t, livekit.ConnectionQuality_POOR, quality) @@ -101,7 +101,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) @@ -117,7 +117,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) @@ -133,7 +133,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -150,7 +150,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) @@ -166,7 +166,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) @@ -182,7 +182,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -199,20 +199,20 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(2.1), mos) require.Equal(t, livekit.ConnectionQuality_POOR, quality) now = now.Add(duration) - cs.UpdateMute(true, now.Add(1*time.Second)) + cs.UpdateMuteAt(true, now.Add(1*time.Second)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) // unmute at time so that next window does not satisfy the unmute time threshold. // that means even if the next update has 0 packets, it should hold state and stay at EXCELLENT quality - cs.UpdateMute(false, now.Add(3*time.Second)) + cs.UpdateMuteAt(false, now.Add(3*time.Second)) streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: { @@ -223,7 +223,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -239,15 +239,15 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(2.1), mos) require.Equal(t, livekit.ConnectionQuality_POOR, quality) // mute/unmute to bring quality back up now = now.Add(duration) - cs.UpdateMute(true, now.Add(1*time.Second)) - cs.UpdateMute(false, now.Add(2*time.Second)) + cs.UpdateMuteAt(true, now.Add(1*time.Second)) + cs.UpdateMuteAt(false, now.Add(2*time.Second)) // with lesser number of packet (simulating DTX). // even higher loss (like 10%) should not knock down quality due to quadratic weighting of packet loss ratio @@ -261,15 +261,15 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) // mute/unmute to bring quality back up now = now.Add(duration) - cs.UpdateMute(true, now.Add(1*time.Second)) - cs.UpdateMute(false, now.Add(2*time.Second)) + cs.UpdateMuteAt(true, now.Add(1*time.Second)) + cs.UpdateMuteAt(false, now.Add(2*time.Second)) // RTT and jitter can knock quality down. // at 2% loss, quality should stay at EXCELLENT purely based on loss, but with added RTT/jitter, should drop to GOOD @@ -285,19 +285,19 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) // mute/unmute to bring quality back up now = now.Add(duration) - cs.UpdateMute(true, now.Add(1*time.Second)) - cs.UpdateMute(false, now.Add(2*time.Second)) + cs.UpdateMuteAt(true, now.Add(1*time.Second)) + cs.UpdateMuteAt(false, now.Add(2*time.Second)) // bitrate based calculation can drop quality even if there is no loss - cs.AddBitrateTransition(1_000_000, now) - cs.AddBitrateTransition(2_000_000, now.Add(2*time.Second)) + cs.AddBitrateTransitionAt(1_000_000, now) + cs.AddBitrateTransitionAt(2_000_000, now.Add(2*time.Second)) streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: { @@ -309,21 +309,21 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) // a transition to 0 (all layers stopped) should flip quality to EXCELLENT now = now.Add(duration) - cs.AddBitrateTransition(0, now) + cs.AddBitrateTransitionAt(0, now) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) // test layer mute via UpdateLayerMute API - cs.AddBitrateTransition(1_000_000, now) - cs.AddBitrateTransition(2_000_000, now.Add(2*time.Second)) + cs.AddBitrateTransitionAt(1_000_000, now) + cs.AddBitrateTransitionAt(2_000_000, now.Add(2*time.Second)) streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: { @@ -335,20 +335,20 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) now = now.Add(duration) - cs.UpdateLayerMute(true, now) + cs.UpdateLayerMuteAt(true, now) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) // setting bit rate after layer mute should layer unmute automatically - cs.AddBitrateTransition(1_000_000, now) - cs.AddBitrateTransition(2_000_000, now.Add(2*time.Second)) + cs.AddBitrateTransitionAt(1_000_000, now) + cs.AddBitrateTransitionAt(2_000_000, now.Add(2*time.Second)) streams = map[uint32]*buffer.StreamStatsWithLayers{ 1: { @@ -360,7 +360,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality = cs.GetScoreAndQuality() require.Greater(t, float32(4.1), mos) require.Equal(t, livekit.ConnectionQuality_GOOD, quality) @@ -375,8 +375,8 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) - cs.UpdateMute(false, now.Add(-1*time.Second)) + cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) + cs.UpdateMuteAt(false, now.Add(-1*time.Second)) // RTT does not knock quality down because it is dependent and hence not taken into account // at 2% loss, quality should stay at EXCELLENT purely based on loss. With high RTT (700 ms) @@ -392,7 +392,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality := cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -407,8 +407,8 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) - cs.UpdateMute(false, now.Add(-1*time.Second)) + cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) + cs.UpdateMuteAt(false, now.Add(-1*time.Second)) // Jitter does not knock quality down because it is dependent and hence not taken into account // at 2% loss, quality should stay at EXCELLENT purely based on loss. With high jitter (200 ms) @@ -424,7 +424,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality := cs.GetScoreAndQuality() require.Greater(t, float32(4.6), mos) require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality) @@ -576,7 +576,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) + cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) for _, eq := range tc.expectedQualities { streams = map[uint32]*buffer.StreamStatsWithLayers{ @@ -589,7 +589,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality := cs.GetScoreAndQuality() require.Greater(t, eq.expectedMOS, mos) require.Equal(t, eq.expectedQuality, quality) @@ -673,10 +673,10 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now) + cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now) for _, tr := range tc.transitions { - cs.AddBitrateTransition(tr.bitrate, now.Add(tr.offset)) + cs.AddBitrateTransitionAt(tr.bitrate, now.Add(tr.offset)) } streams = map[uint32]*buffer.StreamStatsWithLayers{ @@ -689,7 +689,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality := cs.GetScoreAndQuality() require.Greater(t, tc.expectedMOS, mos) require.Equal(t, tc.expectedQuality, quality) @@ -764,10 +764,10 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now) + cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now) for _, tr := range tc.transitions { - cs.AddLayerTransition(tr.distance, now.Add(tr.offset)) + cs.AddLayerTransitionAt(tr.distance, now.Add(tr.offset)) } streams = map[uint32]*buffer.StreamStatsWithLayers{ @@ -779,7 +779,7 @@ func TestConnectionQuality(t *testing.T) { }, }, } - cs.updateScore(now.Add(duration)) + cs.updateScoreAt(now.Add(duration)) mos, quality := cs.GetScoreAndQuality() require.Greater(t, tc.expectedMOS, mos) require.Equal(t, tc.expectedQuality, quality) diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index 5084d6ce8..a855246f2 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -181,17 +181,25 @@ func newQualityScorer(params qualityScorerParams) *qualityScorer { } } -func (q *qualityScorer) Start(at time.Time) { - q.lock.Lock() - defer q.lock.Unlock() - +func (q *qualityScorer) startAtLocked(at time.Time) { q.lastUpdateAt = at } -func (q *qualityScorer) UpdateMute(isMuted bool, at time.Time) { +func (q *qualityScorer) StartAt(at time.Time) { q.lock.Lock() defer q.lock.Unlock() + q.startAtLocked(at) +} + +func (q *qualityScorer) Start() { + q.lock.Lock() + defer q.lock.Unlock() + + q.startAtLocked(time.Now()) +} + +func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) { if isMuted { q.mutedAt = at q.score = maxScore @@ -200,10 +208,21 @@ func (q *qualityScorer) UpdateMute(isMuted bool, at time.Time) { } } -func (q *qualityScorer) AddBitrateTransition(bitrate int64, at time.Time) { +func (q *qualityScorer) UpdateMuteAt(isMuted bool, at time.Time) { q.lock.Lock() defer q.lock.Unlock() + q.updateMuteAtLocked(isMuted, at) +} + +func (q *qualityScorer) UpdateMute(isMuted bool) { + q.lock.Lock() + defer q.lock.Unlock() + + q.updateMuteAtLocked(isMuted, time.Now()) +} + +func (q *qualityScorer) addBitrateTransitionAtLocked(bitrate int64, at time.Time) { q.aggregateBitrate.AddSampleAt(bitrate, at) if bitrate == 0 { @@ -218,10 +237,21 @@ func (q *qualityScorer) AddBitrateTransition(bitrate int64, at time.Time) { } } -func (q *qualityScorer) UpdateLayerMute(isMuted bool, at time.Time) { +func (q *qualityScorer) AddBitrateTransitionAt(bitrate int64, at time.Time) { q.lock.Lock() defer q.lock.Unlock() + q.addBitrateTransitionAtLocked(bitrate, at) +} + +func (q *qualityScorer) AddBitrateTransition(bitrate int64) { + q.lock.Lock() + defer q.lock.Unlock() + + q.addBitrateTransitionAtLocked(bitrate, time.Now()) +} + +func (q *qualityScorer) updateLayerMuteAtLocked(isMuted bool, at time.Time) { if isMuted { if !q.isLayerMuted() { q.aggregateBitrate.AddSampleAt(0, at) @@ -236,17 +266,39 @@ func (q *qualityScorer) UpdateLayerMute(isMuted bool, at time.Time) { } } -func (q *qualityScorer) AddLayerTransition(distance float64, at time.Time) { +func (q *qualityScorer) UpdateLayerMuteAt(isMuted bool, at time.Time) { q.lock.Lock() defer q.lock.Unlock() + q.updateLayerMuteAtLocked(isMuted, at) +} + +func (q *qualityScorer) UpdateLayerMute(isMuted bool) { + q.lock.Lock() + defer q.lock.Unlock() + + q.updateLayerMuteAtLocked(isMuted, time.Now()) +} + +func (q *qualityScorer) addLayerTransitionAtLocked(distance float64, at time.Time) { q.layerDistance.AddSampleAt(distance, at) } -func (q *qualityScorer) Update(stat *windowStat, at time.Time) { +func (q *qualityScorer) AddLayerTransitionAt(distance float64, at time.Time) { q.lock.Lock() defer q.lock.Unlock() + q.addLayerTransitionAtLocked(distance, at) +} + +func (q *qualityScorer) AddLayerTransition(distance float64) { + q.lock.Lock() + defer q.lock.Unlock() + + q.addLayerTransitionAtLocked(distance, time.Now()) +} + +func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { // always update transitions expectedBitrate, _, err := q.aggregateBitrate.GetAggregateAndRestartAt(at) if err != nil { @@ -331,6 +383,20 @@ func (q *qualityScorer) Update(stat *windowStat, at time.Time) { q.lastUpdateAt = at } +func (q *qualityScorer) UpdateAt(stat *windowStat, at time.Time) { + q.lock.Lock() + defer q.lock.Unlock() + + q.updateAtLocked(stat, at) +} + +func (q *qualityScorer) Update(stat *windowStat) { + q.lock.Lock() + defer q.lock.Unlock() + + q.updateAtLocked(stat, time.Now()) +} + func (q *qualityScorer) isMuted() bool { return !q.mutedAt.IsZero() && (q.unmutedAt.IsZero() || q.mutedAt.After(q.unmutedAt)) } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 5e7644449..013dca537 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -396,7 +396,7 @@ func (d *DownTrack) TrackInfoAvailable() { if ti == nil { return } - d.connectionStats.Start(ti, time.Now()) + d.connectionStats.Start(ti) } func (d *DownTrack) SetStreamAllocatorListener(listener DownTrackStreamAllocatorListener) { @@ -740,7 +740,7 @@ func (d *DownTrack) handleMute(muted bool, isPub bool, changed bool, maxLayer bu return } - d.connectionStats.UpdateMute(d.forwarder.IsAnyMuted(), time.Now()) + d.connectionStats.UpdateMute(d.forwarder.IsAnyMuted()) // // Subscriber mute changes trigger a max layer notification. @@ -955,7 +955,7 @@ func (d *DownTrack) maybeAddTransition(_bitrate int64, distance float64) { return } - d.connectionStats.AddLayerTransition(distance, time.Now()) + d.connectionStats.AddLayerTransition(distance) } func (d *DownTrack) UpTrackBitrateReport(availableLayers []int32, bitrates Bitrates) { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 26ca98ad7..837b534ab 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -215,7 +215,7 @@ func NewWebRTCReceiver( w.onStatsUpdate(w, stat) } }) - w.connectionStats.Start(w.trackInfo, time.Now()) + w.connectionStats.Start(w.trackInfo) for _, ext := range receiver.GetParameters().HeaderExtensions { if ext.URI == dd.ExtensionUrl { @@ -375,7 +375,7 @@ func (w *WebRTCReceiver) SetUpTrackPaused(paused bool) { } w.bufferMu.RUnlock() - w.connectionStats.UpdateMute(paused, time.Now()) + w.connectionStats.UpdateMute(paused) } func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error { @@ -398,12 +398,11 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error { func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32) { w.streamTrackerManager.SetMaxExpectedSpatialLayer(layer) - now := time.Now() if layer == buffer.InvalidLayerSpatial { - w.connectionStats.UpdateLayerMute(true, now) + w.connectionStats.UpdateLayerMute(true) } else { - w.connectionStats.UpdateLayerMute(false, now) - w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired(), now) + w.connectionStats.UpdateLayerMute(false) + w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired()) } } @@ -413,7 +412,7 @@ func (w *WebRTCReceiver) OnAvailableLayersChanged() { dt.UpTrackLayersChange() } - w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired(), time.Now()) + w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired()) } // StreamTrackerManagerListener.OnBitrateAvailabilityChanged @@ -429,7 +428,7 @@ func (w *WebRTCReceiver) OnMaxPublishedLayerChanged(maxPublishedLayer int32) { dt.UpTrackMaxPublishedLayerChange(maxPublishedLayer) } - w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired(), time.Now()) + w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired()) } // StreamTrackerManagerListener.OnMaxTemporalLayerSeenChanged @@ -438,7 +437,7 @@ func (w *WebRTCReceiver) OnMaxTemporalLayerSeenChanged(maxTemporalLayerSeen int3 dt.UpTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen) } - w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired(), time.Now()) + w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired()) } // StreamTrackerManagerListener.OnMaxAvailableLayerChanged @@ -458,7 +457,7 @@ func (w *WebRTCReceiver) OnBitrateReport(availableLayers []int32, bitrates Bitra dt.UpTrackBitrateReport(availableLayers, bitrates) } - w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired(), time.Now()) + w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired()) } func (w *WebRTCReceiver) GetLayeredBitrate() ([]int32, Bitrates) {