reduce gc from stream allocator rate monitor (#2638)

* reduce gc from stream allocator rate monitor

* deps

* comment out rate monitor
This commit is contained in:
Paul Wells
2024-04-09 23:14:15 -07:00
committed by GitHub
parent 6b0f7403ef
commit 4b7e5dc1cc
5 changed files with 55 additions and 59 deletions
+2 -1
View File
@@ -19,7 +19,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df
github.com/livekit/protocol v1.12.1-0.20240403204952-bc6c7ffd71f0
github.com/livekit/protocol v1.12.1-0.20240410060226-b6a979d8cfce
github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
@@ -55,6 +55,7 @@ require (
)
require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
+4 -2
View File
@@ -1,5 +1,7 @@
github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o=
github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY=
@@ -132,8 +134,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df h1:DVhJRlF6/CtiyxJVy3QsbS9bf7GyUMuRZONMwZxIWpY=
github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.12.1-0.20240403204952-bc6c7ffd71f0 h1:BfQN4k6YG+XTdfbXnA2XZLAXinRNlJrGdTLAjyOW2Rg=
github.com/livekit/protocol v1.12.1-0.20240403204952-bc6c7ffd71f0/go.mod h1:mcv7L2DWB6iRckI++egmwFU7YEy3W+aLHnusNhioqi0=
github.com/livekit/protocol v1.12.1-0.20240410060226-b6a979d8cfce h1:cbuw8FQ5S1vX6avOmlj6f8IwliXsOGjOa3UR0YDucX8=
github.com/livekit/protocol v1.12.1-0.20240410060226-b6a979d8cfce/go.mod h1:jB6PWwf4tdMAwy+jxexqaVWuQiiklAtO4F5zZzWkTII=
github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be h1:W1nCFZ19rYAORMBNX82NeVPHjADN0UyORr6refbUXpU=
github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
-7
View File
@@ -57,13 +57,6 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
case *livekit.SignalRequest_Leave:
pLogger.Debugw("client leaving room")
room.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonClientRequestLeave)
case *livekit.SignalRequest_UpdateLayers:
err := room.UpdateVideoLayers(participant, msg.UpdateLayers)
if err != nil {
pLogger.Warnw("could not update video layers", err,
"update", msg.UpdateLayers)
return nil
}
case *livekit.SignalRequest_SubscriptionPermission:
err := room.UpdateSubscriptionPermission(participant, msg.SubscriptionPermission)
if err != nil {
+44 -44
View File
@@ -16,6 +16,7 @@ package streamallocator
import (
"fmt"
"sync"
"time"
"github.com/livekit/protocol/utils/timeseries"
@@ -31,6 +32,7 @@ const (
// ------------------------------------------------
type RateMonitor struct {
mu sync.Mutex
bitrateEstimate *timeseries.TimeSeries[int64]
managedBytesSent *timeseries.TimeSeries[uint32]
managedBytesRetransmitted *timeseries.TimeSeries[uint32]
@@ -67,6 +69,9 @@ func NewRateMonitor() *RateMonitor {
}
func (r *RateMonitor) Update(estimate int64, managedBytesSent uint32, managedBytesRetransmitted uint32, unmanagedBytesSent uint32, unmanagedBytesRetransmitted uint32) {
r.mu.Lock()
defer r.mu.Unlock()
now := time.Now()
r.bitrateEstimate.AddSampleAt(estimate, now)
r.managedBytesSent.AddSampleAt(managedBytesSent, now)
@@ -82,36 +87,36 @@ func (r *RateMonitor) Update(estimate int64, managedBytesSent uint32, managedByt
// Reason is that the estimate could be higher than the actual rate by a significant amount.
// So, updating periodically to flush out samples that will not contribute to queueing would be good.
func (r *RateMonitor) GetQueuingGuess() float64 {
_, _, _, _, _, qd := r.getRates(queueMonitorWindow)
return qd
_, _, _, _, _, queuingDelay := r.getRates(queueMonitorWindow)
return queuingDelay
}
func (r *RateMonitor) getRates(monitorDuration time.Duration) (float64, float64, float64, float64, float64, float64) {
threshold := time.Now().Add(-monitorDuration)
bitrateEstimateSamples := r.bitrateEstimate.GetSamplesAfter(threshold)
managedBytesSentSamples := r.managedBytesSent.GetSamplesAfter(threshold)
managedBytesRetransmittedSamples := r.managedBytesRetransmitted.GetSamplesAfter(threshold)
unmanagedBytesSentSamples := r.unmanagedBytesSent.GetSamplesAfter(threshold)
unmanagedBytesRetransmittedSamples := r.unmanagedBytesRetransmitted.GetSamplesAfter(threshold)
func (r *RateMonitor) getRates(monitorDuration time.Duration) (totalBitrateEstimate, totalManagedSent, totalManagedRetransmitted, totalUnmanagedSent, totalUnmanagedRetransmitted, queuingDelay float64) {
r.mu.Lock()
defer r.mu.Unlock()
if len(bitrateEstimateSamples) == 0 || (len(managedBytesSentSamples)+len(managedBytesRetransmittedSamples)+len(unmanagedBytesSentSamples)+len(unmanagedBytesRetransmittedSamples)) == 0 {
return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
threshold := time.Now().Add(-monitorDuration)
if !r.bitrateEstimate.HasSamplesAfter(threshold) ||
!(r.managedBytesSent.HasSamplesAfter(threshold) ||
r.managedBytesRetransmitted.HasSamplesAfter(threshold) ||
r.unmanagedBytesSent.HasSamplesAfter(threshold) ||
r.unmanagedBytesRetransmitted.HasSamplesAfter(threshold)) {
return
}
totalBitrateEstimate := getTimeWeightedSum(bitrateEstimateSamples)
totalManagedSent := getRate(managedBytesSentSamples) * 8
totalManagedRetransmitted := getRate(managedBytesRetransmittedSamples) * 8
totalUnmanagedSent := getRate(unmanagedBytesSentSamples) * 8
totalUnmanagedRetransmitted := getRate(unmanagedBytesRetransmittedSamples) * 8
totalBitrateEstimate = getTimeWeightedSum(r.bitrateEstimate.ReverseIterateSamplesAfter(threshold))
totalManagedSent = getRate(r.managedBytesSent.ReverseIterateSamplesAfter(threshold)) * 8
totalManagedRetransmitted = getRate(r.managedBytesRetransmitted.ReverseIterateSamplesAfter(threshold)) * 8
totalUnmanagedSent = getRate(r.unmanagedBytesSent.ReverseIterateSamplesAfter(threshold)) * 8
totalUnmanagedRetransmitted = getRate(r.unmanagedBytesRetransmitted.ReverseIterateSamplesAfter(threshold)) * 8
totalBits := totalManagedSent + totalManagedRetransmitted + totalUnmanagedSent + totalUnmanagedRetransmitted
queuingDelay := float64(0.0)
if totalBits > totalBitrateEstimate {
latestBitrateEstimate := bitrateEstimateSamples[len(bitrateEstimateSamples)-1].Value
latestBitrateEstimate := r.bitrateEstimate.Back().Value
excessBits := totalBits - totalBitrateEstimate
queuingDelay = excessBits / float64(latestBitrateEstimate)
}
return totalBitrateEstimate, totalManagedSent, totalManagedRetransmitted, totalUnmanagedSent, totalUnmanagedRetransmitted, queuingDelay
return
}
func (r *RateMonitor) updateHistory() {
@@ -124,10 +129,12 @@ func (r *RateMonitor) updateHistory() {
return
}
r.mu.Lock()
r.history = append(
r.history,
fmt.Sprintf("t: %+v, e: %.2f, m: %.2f/%.2f, um: %.2f/%.2f, qd: %.2f", time.Now().UnixMilli(), e, m, mr, um, umr, qd),
)
r.mu.Unlock()
}
func (r *RateMonitor) GetHistory() []string {
@@ -136,37 +143,30 @@ func (r *RateMonitor) GetHistory() []string {
// ------------------------------------------------
func getTimeWeightedSum[T int64 | uint32](samples []timeseries.TimeSeriesSample[T]) float64 {
if len(samples) < 2 {
return 0.0
}
func getTimeWeightedSum[T int64 | uint32](it timeseries.ReverseIterator[T]) float64 {
sum := 0.0
for i := 1; i < len(samples); i++ {
diff := samples[i].At.Sub(samples[i-1].At).Seconds()
sum += diff * float64(samples[i-1].Value)
next := time.Now()
for it.Next() {
diff := next.Sub(it.Value().At).Seconds()
sum += diff * float64(it.Value().Value)
next = it.Value().At
}
diff := time.Now().Sub(samples[len(samples)-1].At).Seconds()
sum += diff * float64(samples[len(samples)-1].Value)
return sum
}
func getRate[T int64 | uint32](samples []timeseries.TimeSeriesSample[T]) float64 {
if len(samples) < 2 {
return 0.0
func getRate[T int64 | uint32](it timeseries.ReverseIterator[T]) float64 {
var sum float64
var first, last time.Time
for it.Next() {
if last.IsZero() {
last = it.Value().At
}
first = it.Value().At
sum += float64(it.Value().Value)
}
sum := 0.0
// start at 1 as the first sample duration is not available
for i := 1; i < len(samples); i++ {
sum += float64(samples[i].Value)
if duration := last.Sub(first); duration > 0 {
return sum / duration.Seconds()
}
duration := samples[len(samples)-1].At.Sub(samples[0].At)
if duration == 0 {
return 0.0
}
return sum / duration.Seconds()
return 0
}
+5 -5
View File
@@ -157,7 +157,7 @@ type StreamAllocator struct {
prober *Prober
channelObserver *ChannelObserver
rateMonitor *RateMonitor
// rateMonitor *RateMonitor
videoTracksMu sync.RWMutex
videoTracks map[livekit.TrackID]*Track
@@ -178,7 +178,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
prober: NewProber(ProberParams{
Logger: params.Logger,
}),
rateMonitor: NewRateMonitor(),
// rateMonitor: NewRateMonitor(),
videoTracks: make(map[livekit.TrackID]*Track),
eventsQueue: utils.NewOpsQueue(utils.OpsQueueParams{
Name: "stream-allocator",
@@ -825,8 +825,8 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() {
)
s.params.Logger.Debugw(
fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity: experimental", action),
"rateHistory", s.rateMonitor.GetHistory(),
"expectedQueuing", s.rateMonitor.GetQueuingGuess(),
// "rateHistory", s.rateMonitor.GetHistory(),
// "expectedQueuing", s.rateMonitor.GetQueuingGuess(),
"nackHistory", s.channelObserver.GetNackHistory(),
"trackHistory", s.getTracksHistory(),
)
@@ -1431,7 +1431,7 @@ func (s *StreamAllocator) monitorRate(estimate int64) {
}
}
s.rateMonitor.Update(estimate, managedBytesSent, managedBytesRetransmitted, unmanagedBytesSent, unmanagedBytesRetransmitted)
// s.rateMonitor.Update(estimate, managedBytesSent, managedBytesRetransmitted, unmanagedBytesSent, unmanagedBytesRetransmitted)
}
func (s *StreamAllocator) updateTracksHistory() {