diff --git a/go.mod b/go.mod index 9153ffb53..2a5b7b9e9 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135 - github.com/livekit/protocol v1.5.8 + github.com/livekit/protocol v1.5.9-0.20230701042848-e5323cdb4ab3 github.com/livekit/psrpc v0.3.1 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 89c61e2f4..1c1db1c6c 100644 --- a/go.sum +++ b/go.sum @@ -124,8 +124,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135 h1:lWYbsondvqG69czxoACDwaJ/BoyD57BahCo70ZH+m4U= github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135/go.mod h1:MRc0zSOSzXuFt0X218SgabzlaKevkvCckPgBEoHYc34= -github.com/livekit/protocol v1.5.8 h1:Q8Ctyq3rhv+trg0Hnd2rKncKAa9QAMlQO7zAUxvz4TU= -github.com/livekit/protocol v1.5.8/go.mod h1:B6hJiuXT84dHsUgaKHBo+ZLPX4XhklptYA2UbANSiNg= +github.com/livekit/protocol v1.5.9-0.20230701042848-e5323cdb4ab3 h1:OUWOLcsgEJ3o5p5NAlebqHzN0xJHBWl1HBUwMe/PZv4= +github.com/livekit/protocol v1.5.9-0.20230701042848-e5323cdb4ab3/go.mod h1:B6hJiuXT84dHsUgaKHBo+ZLPX4XhklptYA2UbANSiNg= github.com/livekit/psrpc v0.3.1 h1:KfylgJHvoLQcc22t/oflwMOeSnx0c14G7cWsS+9MYS4= github.com/livekit/psrpc v0.3.1/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 107be9678..2b2601b74 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -81,41 +81,33 @@ func (cs *ConnectionStats) OnStatsUpdate(fn func(cs *ConnectionStats, stat *live } func (cs *ConnectionStats) UpdateMute(isMuted bool, at time.Time) { - /* TODO-RESTORE if cs.done.IsBroken() { return } - */ cs.scorer.UpdateMute(isMuted, at) } func (cs *ConnectionStats) AddBitrateTransition(bitrate int64, at time.Time) { - /* TODO-RESTORE if cs.done.IsBroken() { return } - */ cs.scorer.AddBitrateTransition(bitrate, at) } func (cs *ConnectionStats) UpdateLayerMute(isMuted bool, at time.Time) { - /* TODO-RESTORE if cs.done.IsBroken() { return } - */ cs.scorer.UpdateLayerMute(isMuted, at) } func (cs *ConnectionStats) AddLayerTransition(distance float64, at time.Time) { - /* TODO-RESTORE if cs.done.IsBroken() { return } - */ cs.scorer.AddLayerTransition(distance, at) } diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index b84d27777..5084d6ce8 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -8,6 +8,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" ) const ( @@ -139,16 +140,6 @@ func (w *windowStat) String() string { // ------------------------------------------ -type bitrateTransition struct { - startedAt time.Time - bitrate int64 -} - -type layerTransition struct { - startedAt time.Time - distance float64 -} - type qualityScorerParams struct { PacketLossWeight float64 IncludeRTT bool @@ -173,14 +164,20 @@ type qualityScorer struct { maxPPS float64 - bitrateTransitions []bitrateTransition - layerTransitions []layerTransition + aggregateBitrate *utils.TimedAggregator[int64] + layerDistance *utils.TimedAggregator[float64] } func newQualityScorer(params qualityScorerParams) *qualityScorer { return &qualityScorer{ params: params, score: maxScore, + aggregateBitrate: utils.NewTimedAggregator[int64](utils.TimedAggregatorParams{ + CapNegativeValues: true, + }), + layerDistance: utils.NewTimedAggregator[float64](utils.TimedAggregatorParams{ + CapNegativeValues: true, + }), } } @@ -207,10 +204,7 @@ func (q *qualityScorer) AddBitrateTransition(bitrate int64, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.bitrateTransitions = append(q.bitrateTransitions, bitrateTransition{ - startedAt: at, - bitrate: bitrate, - }) + q.aggregateBitrate.AddSampleAt(bitrate, at) if bitrate == 0 { if !q.isLayerMuted() { @@ -230,14 +224,8 @@ func (q *qualityScorer) UpdateLayerMute(isMuted bool, at time.Time) { if isMuted { if !q.isLayerMuted() { - q.bitrateTransitions = append(q.bitrateTransitions, bitrateTransition{ - startedAt: at, - bitrate: 0, - }) - q.layerTransitions = append(q.layerTransitions, layerTransition{ - startedAt: at, - distance: 0.0, - }) + q.aggregateBitrate.AddSampleAt(0, at) + q.layerDistance.AddSampleAt(0, at) q.layerMutedAt = at q.score = maxScore } @@ -252,24 +240,22 @@ func (q *qualityScorer) AddLayerTransition(distance float64, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - // TODO-REMOVE-AFTER-DEBUG - q.params.Logger.Debugw("adding layer transition", "at", at, "distance", distance) - q.layerTransitions = append(q.layerTransitions, layerTransition{ - startedAt: at, - distance: distance, - }) + q.layerDistance.AddSampleAt(distance, at) } func (q *qualityScorer) Update(stat *windowStat, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - // TODO-REMOVE-AFTER-DEBUG - q.params.Logger.Debugw("running update", "at", at, "stat", stat) - // always update transitions - expectedBitrate := q.getExpectedBitsAndUpdateTransitions(at) - expectedDistance := q.getExpectedDistanceAndUpdateTransitions(at) + expectedBitrate, _, err := q.aggregateBitrate.GetAggregateAndRestartAt(at) + if err != nil { + q.params.Logger.Warnw("error getting expected bitrate", err) + } + expectedDistance, err := q.layerDistance.GetAverageAndRestartAt(at) + if err != nil { + q.params.Logger.Warnw("error getting expected distance", err) + } // nothing to do when muted or not unmuted for long enough // NOTE: it is possible that unmute -> mute -> unmute transition happens in the @@ -402,100 +388,6 @@ func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 { return packetRatio * packetRatio * q.params.PacketLossWeight } -func (q *qualityScorer) getExpectedBitsAndUpdateTransitions(at time.Time) int64 { - if len(q.bitrateTransitions) == 0 { - return 0 - } - - var startedAt time.Time - var totalBits float64 - for idx := 0; idx < len(q.bitrateTransitions)-1; idx++ { - bt := &q.bitrateTransitions[idx] - btNext := &q.bitrateTransitions[idx+1] - - if bt.startedAt.After(q.lastUpdateAt) { - startedAt = bt.startedAt - } else { - startedAt = q.lastUpdateAt - } - totalBits += btNext.startedAt.Sub(startedAt).Seconds() * float64(bt.bitrate) - } - - // last transition - bt := &q.bitrateTransitions[len(q.bitrateTransitions)-1] - if bt.startedAt.After(q.lastUpdateAt) { - startedAt = bt.startedAt - } else { - startedAt = q.lastUpdateAt - } - totalBits += at.Sub(startedAt).Seconds() * float64(bt.bitrate) - - // set up last bit rate as the starting bit rate for next analysis window - q.bitrateTransitions = []bitrateTransition{{ - startedAt: at, - bitrate: bt.bitrate, - }} - - return int64(totalBits) -} - -func (q *qualityScorer) getExpectedDistanceAndUpdateTransitions(at time.Time) float64 { - if len(q.layerTransitions) == 0 { - return 0 - } - - var startedAt time.Time - var totalDistance float64 - totalDuration := time.Duration(0) - for idx := 0; idx < len(q.layerTransitions)-1; idx++ { - lt := &q.layerTransitions[idx] - ltNext := &q.layerTransitions[idx+1] - - if lt.startedAt.After(q.lastUpdateAt) { - startedAt = lt.startedAt - } else { - startedAt = q.lastUpdateAt - } - dur := ltNext.startedAt.Sub(startedAt) - totalDuration += dur - - dist := lt.distance - if dist < 0.0 { - // negative distances are overshoot, that does not compensate for shortfalls, so use optimal, i. e. 0 distance when overshooting - dist = 0.0 - } - totalDistance += dur.Seconds() * dist - } - - // last transition - lt := &q.layerTransitions[len(q.layerTransitions)-1] - if lt.startedAt.After(q.lastUpdateAt) { - startedAt = lt.startedAt - } else { - startedAt = q.lastUpdateAt - } - dur := at.Sub(startedAt) - totalDuration += dur - - dist := lt.distance - if dist < 0.0 { - dist = 0.0 - } - totalDistance += dur.Seconds() * dist - - // set up last distance as the starting distance for next analysis window - q.layerTransitions = []layerTransition{{ - startedAt: at, - distance: lt.distance, - }} - - if totalDuration == 0 { - return 0 - } - - return totalDistance / totalDuration.Seconds() -} - func (q *qualityScorer) GetScoreAndQuality() (float32, livekit.ConnectionQuality) { q.lock.RLock() defer q.lock.RUnlock()