Prevent anachronous sample reading. (#1863)

* Prevenet anachronous sample reading.

Not so pretty way of solving this. Please let me know if you have
thoughts.

Passing in time allows testing easier. But, that also leads to
time reversal problems. Example scenario
1. Connection stats worker gets a time and initiates quality
   calculation.
2. A layer transition is recorded after that.
3. By the time, scorer is called to calculate score with time from Step
   1, there is time reversal and results in anachronous sample.

One option is to use a scorer lock in connection stats module and wrap
all calls to scorer in that lock, but that does not prevent the passed
in time stamps themselves getting out of order. Also, stand alond use
of scorer in some other context will be problematic.

Doing the hybrid thing of taking current time in scorer if passed in
time is zero so that scorer lock domain controls it.

* use zero time everywhere in normal flow

* make APIs with and without time passed in as Paul suggested
This commit is contained in:
Raja Subramanian
2023-07-10 08:39:52 +05:30
committed by GitHub
parent 8a1fc223da
commit e6f5f2f344
5 changed files with 200 additions and 88 deletions
+64 -17
View File
@@ -60,16 +60,27 @@ func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats {
}
}
func (cs *ConnectionStats) Start(trackInfo *livekit.TrackInfo, at time.Time) {
func (cs *ConnectionStats) start(trackInfo *livekit.TrackInfo) {
cs.isVideo.Store(trackInfo.Type == livekit.TrackType_VIDEO)
go cs.updateStatsWorker()
}
func (cs *ConnectionStats) StartAt(trackInfo *livekit.TrackInfo, at time.Time) {
if cs.isStarted.Swap(true) {
return
}
cs.isVideo.Store(trackInfo.Type == livekit.TrackType_VIDEO)
cs.scorer.StartAt(at)
cs.start(trackInfo)
}
cs.scorer.Start(at)
func (cs *ConnectionStats) Start(trackInfo *livekit.TrackInfo) {
if cs.isStarted.Swap(true) {
return
}
go cs.updateStatsWorker()
cs.scorer.Start()
cs.start(trackInfo)
}
func (cs *ConnectionStats) Close() {
@@ -80,36 +91,68 @@ func (cs *ConnectionStats) OnStatsUpdate(fn func(cs *ConnectionStats, stat *live
cs.onStatsUpdate = fn
}
func (cs *ConnectionStats) UpdateMute(isMuted bool, at time.Time) {
func (cs *ConnectionStats) UpdateMuteAt(isMuted bool, at time.Time) {
if cs.done.IsBroken() {
return
}
cs.scorer.UpdateMute(isMuted, at)
cs.scorer.UpdateMuteAt(isMuted, at)
}
func (cs *ConnectionStats) AddBitrateTransition(bitrate int64, at time.Time) {
func (cs *ConnectionStats) UpdateMute(isMuted bool) {
if cs.done.IsBroken() {
return
}
cs.scorer.AddBitrateTransition(bitrate, at)
cs.scorer.UpdateMute(isMuted)
}
func (cs *ConnectionStats) UpdateLayerMute(isMuted bool, at time.Time) {
func (cs *ConnectionStats) AddBitrateTransitionAt(bitrate int64, at time.Time) {
if cs.done.IsBroken() {
return
}
cs.scorer.UpdateLayerMute(isMuted, at)
cs.scorer.AddBitrateTransitionAt(bitrate, at)
}
func (cs *ConnectionStats) AddLayerTransition(distance float64, at time.Time) {
func (cs *ConnectionStats) AddBitrateTransition(bitrate int64) {
if cs.done.IsBroken() {
return
}
cs.scorer.AddLayerTransition(distance, at)
cs.scorer.AddBitrateTransition(bitrate)
}
func (cs *ConnectionStats) UpdateLayerMuteAt(isMuted bool, at time.Time) {
if cs.done.IsBroken() {
return
}
cs.scorer.UpdateLayerMuteAt(isMuted, at)
}
func (cs *ConnectionStats) UpdateLayerMute(isMuted bool) {
if cs.done.IsBroken() {
return
}
cs.scorer.UpdateLayerMute(isMuted)
}
func (cs *ConnectionStats) AddLayerTransitionAt(distance float64, at time.Time) {
if cs.done.IsBroken() {
return
}
cs.scorer.AddLayerTransitionAt(distance, at)
}
func (cs *ConnectionStats) AddLayerTransition(distance float64) {
if cs.done.IsBroken() {
return
}
cs.scorer.AddLayerTransition(distance)
}
func (cs *ConnectionStats) GetScoreAndQuality() (float32, livekit.ConnectionQuality) {
@@ -129,7 +172,11 @@ func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, at
stat.rttMax = agg.RttMax
stat.jitterMax = agg.JitterMax
}
cs.scorer.Update(&stat, at)
if at.IsZero() {
cs.scorer.Update(&stat)
} else {
cs.scorer.UpdateAt(&stat, at)
}
mos, _ := cs.scorer.GetMOSAndQuality()
return mos
@@ -182,7 +229,7 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32,
return cs.updateScoreWithAggregate(agg, at), streams
}
func (cs *ConnectionStats) updateScore(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) {
func (cs *ConnectionStats) updateScoreAt(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) {
if cs.params.GetDeltaStats == nil {
return MinMOS, nil
}
@@ -227,8 +274,8 @@ func (cs *ConnectionStats) clearStreamingStart() {
cs.lock.Unlock()
}
func (cs *ConnectionStats) getStat(at time.Time) {
score, streams := cs.updateScore(at)
func (cs *ConnectionStats) getStat() {
score, streams := cs.updateScoreAt(time.Time{})
if cs.onStatsUpdate != nil && len(streams) != 0 {
analyticsStreams := make([]*livekit.AnalyticsStream, 0, len(streams))
@@ -279,7 +326,7 @@ func (cs *ConnectionStats) updateStatsWorker() {
return
}
cs.getStat(time.Now())
cs.getStat()
}
}
}
@@ -39,11 +39,11 @@ func TestConnectionQuality(t *testing.T) {
duration := 5 * time.Second
now := time.Now()
cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
cs.UpdateMute(false, now.Add(-1*time.Second))
cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
cs.UpdateMuteAt(false, now.Add(-1*time.Second))
// no data and not enough unmute time should return default state which is EXCELLENT quality
cs.updateScore(now)
cs.updateScoreAt(now)
mos, quality := cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -58,7 +58,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -83,7 +83,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(2.1), mos)
require.Equal(t, livekit.ConnectionQuality_POOR, quality)
@@ -101,7 +101,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
@@ -117,7 +117,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
@@ -133,7 +133,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -150,7 +150,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
@@ -166,7 +166,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
@@ -182,7 +182,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -199,20 +199,20 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(2.1), mos)
require.Equal(t, livekit.ConnectionQuality_POOR, quality)
now = now.Add(duration)
cs.UpdateMute(true, now.Add(1*time.Second))
cs.UpdateMuteAt(true, now.Add(1*time.Second))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
// unmute at time so that next window does not satisfy the unmute time threshold.
// that means even if the next update has 0 packets, it should hold state and stay at EXCELLENT quality
cs.UpdateMute(false, now.Add(3*time.Second))
cs.UpdateMuteAt(false, now.Add(3*time.Second))
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: {
@@ -223,7 +223,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -239,15 +239,15 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(2.1), mos)
require.Equal(t, livekit.ConnectionQuality_POOR, quality)
// mute/unmute to bring quality back up
now = now.Add(duration)
cs.UpdateMute(true, now.Add(1*time.Second))
cs.UpdateMute(false, now.Add(2*time.Second))
cs.UpdateMuteAt(true, now.Add(1*time.Second))
cs.UpdateMuteAt(false, now.Add(2*time.Second))
// with lesser number of packet (simulating DTX).
// even higher loss (like 10%) should not knock down quality due to quadratic weighting of packet loss ratio
@@ -261,15 +261,15 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
// mute/unmute to bring quality back up
now = now.Add(duration)
cs.UpdateMute(true, now.Add(1*time.Second))
cs.UpdateMute(false, now.Add(2*time.Second))
cs.UpdateMuteAt(true, now.Add(1*time.Second))
cs.UpdateMuteAt(false, now.Add(2*time.Second))
// RTT and jitter can knock quality down.
// at 2% loss, quality should stay at EXCELLENT purely based on loss, but with added RTT/jitter, should drop to GOOD
@@ -285,19 +285,19 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
// mute/unmute to bring quality back up
now = now.Add(duration)
cs.UpdateMute(true, now.Add(1*time.Second))
cs.UpdateMute(false, now.Add(2*time.Second))
cs.UpdateMuteAt(true, now.Add(1*time.Second))
cs.UpdateMuteAt(false, now.Add(2*time.Second))
// bitrate based calculation can drop quality even if there is no loss
cs.AddBitrateTransition(1_000_000, now)
cs.AddBitrateTransition(2_000_000, now.Add(2*time.Second))
cs.AddBitrateTransitionAt(1_000_000, now)
cs.AddBitrateTransitionAt(2_000_000, now.Add(2*time.Second))
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: {
@@ -309,21 +309,21 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
// a transition to 0 (all layers stopped) should flip quality to EXCELLENT
now = now.Add(duration)
cs.AddBitrateTransition(0, now)
cs.AddBitrateTransitionAt(0, now)
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
// test layer mute via UpdateLayerMute API
cs.AddBitrateTransition(1_000_000, now)
cs.AddBitrateTransition(2_000_000, now.Add(2*time.Second))
cs.AddBitrateTransitionAt(1_000_000, now)
cs.AddBitrateTransitionAt(2_000_000, now.Add(2*time.Second))
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: {
@@ -335,20 +335,20 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
now = now.Add(duration)
cs.UpdateLayerMute(true, now)
cs.UpdateLayerMuteAt(true, now)
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
// setting bit rate after layer mute should layer unmute automatically
cs.AddBitrateTransition(1_000_000, now)
cs.AddBitrateTransition(2_000_000, now.Add(2*time.Second))
cs.AddBitrateTransitionAt(1_000_000, now)
cs.AddBitrateTransitionAt(2_000_000, now.Add(2*time.Second))
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: {
@@ -360,7 +360,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
@@ -375,8 +375,8 @@ func TestConnectionQuality(t *testing.T) {
duration := 5 * time.Second
now := time.Now()
cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
cs.UpdateMute(false, now.Add(-1*time.Second))
cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
cs.UpdateMuteAt(false, now.Add(-1*time.Second))
// RTT does not knock quality down because it is dependent and hence not taken into account
// at 2% loss, quality should stay at EXCELLENT purely based on loss. With high RTT (700 ms)
@@ -392,7 +392,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality := cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -407,8 +407,8 @@ func TestConnectionQuality(t *testing.T) {
duration := 5 * time.Second
now := time.Now()
cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
cs.UpdateMute(false, now.Add(-1*time.Second))
cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
cs.UpdateMuteAt(false, now.Add(-1*time.Second))
// Jitter does not knock quality down because it is dependent and hence not taken into account
// at 2% loss, quality should stay at EXCELLENT purely based on loss. With high jitter (200 ms)
@@ -424,7 +424,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality := cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -576,7 +576,7 @@ func TestConnectionQuality(t *testing.T) {
duration := 5 * time.Second
now := time.Now()
cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
for _, eq := range tc.expectedQualities {
streams = map[uint32]*buffer.StreamStatsWithLayers{
@@ -589,7 +589,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality := cs.GetScoreAndQuality()
require.Greater(t, eq.expectedMOS, mos)
require.Equal(t, eq.expectedQuality, quality)
@@ -673,10 +673,10 @@ func TestConnectionQuality(t *testing.T) {
duration := 5 * time.Second
now := time.Now()
cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now)
cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now)
for _, tr := range tc.transitions {
cs.AddBitrateTransition(tr.bitrate, now.Add(tr.offset))
cs.AddBitrateTransitionAt(tr.bitrate, now.Add(tr.offset))
}
streams = map[uint32]*buffer.StreamStatsWithLayers{
@@ -689,7 +689,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality := cs.GetScoreAndQuality()
require.Greater(t, tc.expectedMOS, mos)
require.Equal(t, tc.expectedQuality, quality)
@@ -764,10 +764,10 @@ func TestConnectionQuality(t *testing.T) {
duration := 5 * time.Second
now := time.Now()
cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now)
cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now)
for _, tr := range tc.transitions {
cs.AddLayerTransition(tr.distance, now.Add(tr.offset))
cs.AddLayerTransitionAt(tr.distance, now.Add(tr.offset))
}
streams = map[uint32]*buffer.StreamStatsWithLayers{
@@ -779,7 +779,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(now.Add(duration))
cs.updateScoreAt(now.Add(duration))
mos, quality := cs.GetScoreAndQuality()
require.Greater(t, tc.expectedMOS, mos)
require.Equal(t, tc.expectedQuality, quality)
+75 -9
View File
@@ -181,17 +181,25 @@ func newQualityScorer(params qualityScorerParams) *qualityScorer {
}
}
func (q *qualityScorer) Start(at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
func (q *qualityScorer) startAtLocked(at time.Time) {
q.lastUpdateAt = at
}
func (q *qualityScorer) UpdateMute(isMuted bool, at time.Time) {
func (q *qualityScorer) StartAt(at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.startAtLocked(at)
}
func (q *qualityScorer) Start() {
q.lock.Lock()
defer q.lock.Unlock()
q.startAtLocked(time.Now())
}
func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) {
if isMuted {
q.mutedAt = at
q.score = maxScore
@@ -200,10 +208,21 @@ func (q *qualityScorer) UpdateMute(isMuted bool, at time.Time) {
}
}
func (q *qualityScorer) AddBitrateTransition(bitrate int64, at time.Time) {
func (q *qualityScorer) UpdateMuteAt(isMuted bool, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.updateMuteAtLocked(isMuted, at)
}
func (q *qualityScorer) UpdateMute(isMuted bool) {
q.lock.Lock()
defer q.lock.Unlock()
q.updateMuteAtLocked(isMuted, time.Now())
}
func (q *qualityScorer) addBitrateTransitionAtLocked(bitrate int64, at time.Time) {
q.aggregateBitrate.AddSampleAt(bitrate, at)
if bitrate == 0 {
@@ -218,10 +237,21 @@ func (q *qualityScorer) AddBitrateTransition(bitrate int64, at time.Time) {
}
}
func (q *qualityScorer) UpdateLayerMute(isMuted bool, at time.Time) {
func (q *qualityScorer) AddBitrateTransitionAt(bitrate int64, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.addBitrateTransitionAtLocked(bitrate, at)
}
func (q *qualityScorer) AddBitrateTransition(bitrate int64) {
q.lock.Lock()
defer q.lock.Unlock()
q.addBitrateTransitionAtLocked(bitrate, time.Now())
}
func (q *qualityScorer) updateLayerMuteAtLocked(isMuted bool, at time.Time) {
if isMuted {
if !q.isLayerMuted() {
q.aggregateBitrate.AddSampleAt(0, at)
@@ -236,17 +266,39 @@ func (q *qualityScorer) UpdateLayerMute(isMuted bool, at time.Time) {
}
}
func (q *qualityScorer) AddLayerTransition(distance float64, at time.Time) {
func (q *qualityScorer) UpdateLayerMuteAt(isMuted bool, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.updateLayerMuteAtLocked(isMuted, at)
}
func (q *qualityScorer) UpdateLayerMute(isMuted bool) {
q.lock.Lock()
defer q.lock.Unlock()
q.updateLayerMuteAtLocked(isMuted, time.Now())
}
func (q *qualityScorer) addLayerTransitionAtLocked(distance float64, at time.Time) {
q.layerDistance.AddSampleAt(distance, at)
}
func (q *qualityScorer) Update(stat *windowStat, at time.Time) {
func (q *qualityScorer) AddLayerTransitionAt(distance float64, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.addLayerTransitionAtLocked(distance, at)
}
func (q *qualityScorer) AddLayerTransition(distance float64) {
q.lock.Lock()
defer q.lock.Unlock()
q.addLayerTransitionAtLocked(distance, time.Now())
}
func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) {
// always update transitions
expectedBitrate, _, err := q.aggregateBitrate.GetAggregateAndRestartAt(at)
if err != nil {
@@ -331,6 +383,20 @@ func (q *qualityScorer) Update(stat *windowStat, at time.Time) {
q.lastUpdateAt = at
}
func (q *qualityScorer) UpdateAt(stat *windowStat, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.updateAtLocked(stat, at)
}
func (q *qualityScorer) Update(stat *windowStat) {
q.lock.Lock()
defer q.lock.Unlock()
q.updateAtLocked(stat, time.Now())
}
func (q *qualityScorer) isMuted() bool {
return !q.mutedAt.IsZero() && (q.unmutedAt.IsZero() || q.mutedAt.After(q.unmutedAt))
}
+3 -3
View File
@@ -396,7 +396,7 @@ func (d *DownTrack) TrackInfoAvailable() {
if ti == nil {
return
}
d.connectionStats.Start(ti, time.Now())
d.connectionStats.Start(ti)
}
func (d *DownTrack) SetStreamAllocatorListener(listener DownTrackStreamAllocatorListener) {
@@ -740,7 +740,7 @@ func (d *DownTrack) handleMute(muted bool, isPub bool, changed bool, maxLayer bu
return
}
d.connectionStats.UpdateMute(d.forwarder.IsAnyMuted(), time.Now())
d.connectionStats.UpdateMute(d.forwarder.IsAnyMuted())
//
// Subscriber mute changes trigger a max layer notification.
@@ -955,7 +955,7 @@ func (d *DownTrack) maybeAddTransition(_bitrate int64, distance float64) {
return
}
d.connectionStats.AddLayerTransition(distance, time.Now())
d.connectionStats.AddLayerTransition(distance)
}
func (d *DownTrack) UpTrackBitrateReport(availableLayers []int32, bitrates Bitrates) {
+9 -10
View File
@@ -215,7 +215,7 @@ func NewWebRTCReceiver(
w.onStatsUpdate(w, stat)
}
})
w.connectionStats.Start(w.trackInfo, time.Now())
w.connectionStats.Start(w.trackInfo)
for _, ext := range receiver.GetParameters().HeaderExtensions {
if ext.URI == dd.ExtensionUrl {
@@ -375,7 +375,7 @@ func (w *WebRTCReceiver) SetUpTrackPaused(paused bool) {
}
w.bufferMu.RUnlock()
w.connectionStats.UpdateMute(paused, time.Now())
w.connectionStats.UpdateMute(paused)
}
func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error {
@@ -398,12 +398,11 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error {
func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32) {
w.streamTrackerManager.SetMaxExpectedSpatialLayer(layer)
now := time.Now()
if layer == buffer.InvalidLayerSpatial {
w.connectionStats.UpdateLayerMute(true, now)
w.connectionStats.UpdateLayerMute(true)
} else {
w.connectionStats.UpdateLayerMute(false, now)
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired(), now)
w.connectionStats.UpdateLayerMute(false)
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
}
}
@@ -413,7 +412,7 @@ func (w *WebRTCReceiver) OnAvailableLayersChanged() {
dt.UpTrackLayersChange()
}
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired(), time.Now())
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
}
// StreamTrackerManagerListener.OnBitrateAvailabilityChanged
@@ -429,7 +428,7 @@ func (w *WebRTCReceiver) OnMaxPublishedLayerChanged(maxPublishedLayer int32) {
dt.UpTrackMaxPublishedLayerChange(maxPublishedLayer)
}
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired(), time.Now())
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
}
// StreamTrackerManagerListener.OnMaxTemporalLayerSeenChanged
@@ -438,7 +437,7 @@ func (w *WebRTCReceiver) OnMaxTemporalLayerSeenChanged(maxTemporalLayerSeen int3
dt.UpTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen)
}
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired(), time.Now())
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
}
// StreamTrackerManagerListener.OnMaxAvailableLayerChanged
@@ -458,7 +457,7 @@ func (w *WebRTCReceiver) OnBitrateReport(availableLayers []int32, bitrates Bitra
dt.UpTrackBitrateReport(availableLayers, bitrates)
}
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired(), time.Now())
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
}
func (w *WebRTCReceiver) GetLayeredBitrate() ([]int32, Bitrates) {