Use timed aggregator. (#1843)

* Use timed aggregator.

For aggregate bitrate and average distance from desired.

Also, clean up debug added to track leak.

* update deps
This commit is contained in:
Raja Subramanian
2023-07-01 10:21:15 +05:30
committed by GitHub
parent 06f9b574cb
commit e3954d1d64
4 changed files with 24 additions and 140 deletions
+1 -1
View File
@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135
github.com/livekit/protocol v1.5.8
github.com/livekit/protocol v1.5.9-0.20230701042848-e5323cdb4ab3
github.com/livekit/psrpc v0.3.1
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
+2 -2
View File
@@ -124,8 +124,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135 h1:lWYbsondvqG69czxoACDwaJ/BoyD57BahCo70ZH+m4U=
github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135/go.mod h1:MRc0zSOSzXuFt0X218SgabzlaKevkvCckPgBEoHYc34=
github.com/livekit/protocol v1.5.8 h1:Q8Ctyq3rhv+trg0Hnd2rKncKAa9QAMlQO7zAUxvz4TU=
github.com/livekit/protocol v1.5.8/go.mod h1:B6hJiuXT84dHsUgaKHBo+ZLPX4XhklptYA2UbANSiNg=
github.com/livekit/protocol v1.5.9-0.20230701042848-e5323cdb4ab3 h1:OUWOLcsgEJ3o5p5NAlebqHzN0xJHBWl1HBUwMe/PZv4=
github.com/livekit/protocol v1.5.9-0.20230701042848-e5323cdb4ab3/go.mod h1:B6hJiuXT84dHsUgaKHBo+ZLPX4XhklptYA2UbANSiNg=
github.com/livekit/psrpc v0.3.1 h1:KfylgJHvoLQcc22t/oflwMOeSnx0c14G7cWsS+9MYS4=
github.com/livekit/psrpc v0.3.1/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
@@ -81,41 +81,33 @@ func (cs *ConnectionStats) OnStatsUpdate(fn func(cs *ConnectionStats, stat *live
}
func (cs *ConnectionStats) UpdateMute(isMuted bool, at time.Time) {
/* TODO-RESTORE
if cs.done.IsBroken() {
return
}
*/
cs.scorer.UpdateMute(isMuted, at)
}
func (cs *ConnectionStats) AddBitrateTransition(bitrate int64, at time.Time) {
/* TODO-RESTORE
if cs.done.IsBroken() {
return
}
*/
cs.scorer.AddBitrateTransition(bitrate, at)
}
func (cs *ConnectionStats) UpdateLayerMute(isMuted bool, at time.Time) {
/* TODO-RESTORE
if cs.done.IsBroken() {
return
}
*/
cs.scorer.UpdateLayerMute(isMuted, at)
}
func (cs *ConnectionStats) AddLayerTransition(distance float64, at time.Time) {
/* TODO-RESTORE
if cs.done.IsBroken() {
return
}
*/
cs.scorer.AddLayerTransition(distance, at)
}
+21 -129
View File
@@ -8,6 +8,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
const (
@@ -139,16 +140,6 @@ func (w *windowStat) String() string {
// ------------------------------------------
type bitrateTransition struct {
startedAt time.Time
bitrate int64
}
type layerTransition struct {
startedAt time.Time
distance float64
}
type qualityScorerParams struct {
PacketLossWeight float64
IncludeRTT bool
@@ -173,14 +164,20 @@ type qualityScorer struct {
maxPPS float64
bitrateTransitions []bitrateTransition
layerTransitions []layerTransition
aggregateBitrate *utils.TimedAggregator[int64]
layerDistance *utils.TimedAggregator[float64]
}
func newQualityScorer(params qualityScorerParams) *qualityScorer {
return &qualityScorer{
params: params,
score: maxScore,
aggregateBitrate: utils.NewTimedAggregator[int64](utils.TimedAggregatorParams{
CapNegativeValues: true,
}),
layerDistance: utils.NewTimedAggregator[float64](utils.TimedAggregatorParams{
CapNegativeValues: true,
}),
}
}
@@ -207,10 +204,7 @@ func (q *qualityScorer) AddBitrateTransition(bitrate int64, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.bitrateTransitions = append(q.bitrateTransitions, bitrateTransition{
startedAt: at,
bitrate: bitrate,
})
q.aggregateBitrate.AddSampleAt(bitrate, at)
if bitrate == 0 {
if !q.isLayerMuted() {
@@ -230,14 +224,8 @@ func (q *qualityScorer) UpdateLayerMute(isMuted bool, at time.Time) {
if isMuted {
if !q.isLayerMuted() {
q.bitrateTransitions = append(q.bitrateTransitions, bitrateTransition{
startedAt: at,
bitrate: 0,
})
q.layerTransitions = append(q.layerTransitions, layerTransition{
startedAt: at,
distance: 0.0,
})
q.aggregateBitrate.AddSampleAt(0, at)
q.layerDistance.AddSampleAt(0, at)
q.layerMutedAt = at
q.score = maxScore
}
@@ -252,24 +240,22 @@ func (q *qualityScorer) AddLayerTransition(distance float64, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
// TODO-REMOVE-AFTER-DEBUG
q.params.Logger.Debugw("adding layer transition", "at", at, "distance", distance)
q.layerTransitions = append(q.layerTransitions, layerTransition{
startedAt: at,
distance: distance,
})
q.layerDistance.AddSampleAt(distance, at)
}
func (q *qualityScorer) Update(stat *windowStat, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
// TODO-REMOVE-AFTER-DEBUG
q.params.Logger.Debugw("running update", "at", at, "stat", stat)
// always update transitions
expectedBitrate := q.getExpectedBitsAndUpdateTransitions(at)
expectedDistance := q.getExpectedDistanceAndUpdateTransitions(at)
expectedBitrate, _, err := q.aggregateBitrate.GetAggregateAndRestartAt(at)
if err != nil {
q.params.Logger.Warnw("error getting expected bitrate", err)
}
expectedDistance, err := q.layerDistance.GetAverageAndRestartAt(at)
if err != nil {
q.params.Logger.Warnw("error getting expected distance", err)
}
// nothing to do when muted or not unmuted for long enough
// NOTE: it is possible that unmute -> mute -> unmute transition happens in the
@@ -402,100 +388,6 @@ func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 {
return packetRatio * packetRatio * q.params.PacketLossWeight
}
func (q *qualityScorer) getExpectedBitsAndUpdateTransitions(at time.Time) int64 {
if len(q.bitrateTransitions) == 0 {
return 0
}
var startedAt time.Time
var totalBits float64
for idx := 0; idx < len(q.bitrateTransitions)-1; idx++ {
bt := &q.bitrateTransitions[idx]
btNext := &q.bitrateTransitions[idx+1]
if bt.startedAt.After(q.lastUpdateAt) {
startedAt = bt.startedAt
} else {
startedAt = q.lastUpdateAt
}
totalBits += btNext.startedAt.Sub(startedAt).Seconds() * float64(bt.bitrate)
}
// last transition
bt := &q.bitrateTransitions[len(q.bitrateTransitions)-1]
if bt.startedAt.After(q.lastUpdateAt) {
startedAt = bt.startedAt
} else {
startedAt = q.lastUpdateAt
}
totalBits += at.Sub(startedAt).Seconds() * float64(bt.bitrate)
// set up last bit rate as the starting bit rate for next analysis window
q.bitrateTransitions = []bitrateTransition{{
startedAt: at,
bitrate: bt.bitrate,
}}
return int64(totalBits)
}
func (q *qualityScorer) getExpectedDistanceAndUpdateTransitions(at time.Time) float64 {
if len(q.layerTransitions) == 0 {
return 0
}
var startedAt time.Time
var totalDistance float64
totalDuration := time.Duration(0)
for idx := 0; idx < len(q.layerTransitions)-1; idx++ {
lt := &q.layerTransitions[idx]
ltNext := &q.layerTransitions[idx+1]
if lt.startedAt.After(q.lastUpdateAt) {
startedAt = lt.startedAt
} else {
startedAt = q.lastUpdateAt
}
dur := ltNext.startedAt.Sub(startedAt)
totalDuration += dur
dist := lt.distance
if dist < 0.0 {
// negative distances are overshoot, that does not compensate for shortfalls, so use optimal, i. e. 0 distance when overshooting
dist = 0.0
}
totalDistance += dur.Seconds() * dist
}
// last transition
lt := &q.layerTransitions[len(q.layerTransitions)-1]
if lt.startedAt.After(q.lastUpdateAt) {
startedAt = lt.startedAt
} else {
startedAt = q.lastUpdateAt
}
dur := at.Sub(startedAt)
totalDuration += dur
dist := lt.distance
if dist < 0.0 {
dist = 0.0
}
totalDistance += dur.Seconds() * dist
// set up last distance as the starting distance for next analysis window
q.layerTransitions = []layerTransition{{
startedAt: at,
distance: lt.distance,
}}
if totalDuration == 0 {
return 0
}
return totalDistance / totalDuration.Seconds()
}
func (q *qualityScorer) GetScoreAndQuality() (float32, livekit.ConnectionQuality) {
q.lock.RLock()
defer q.lock.RUnlock()