From 4b7e5dc1cc5dc417f26420c854e9019a15cfe62f Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Tue, 9 Apr 2024 23:14:15 -0700 Subject: [PATCH] reduce gc from stream allocator rate monitor (#2638) * reduce gc from stream allocator rate monitor * deps * comment out rate monitor --- go.mod | 3 +- go.sum | 6 +- pkg/rtc/signalhandler.go | 7 -- pkg/sfu/streamallocator/ratemonitor.go | 88 +++++++++++----------- pkg/sfu/streamallocator/streamallocator.go | 10 +-- 5 files changed, 55 insertions(+), 59 deletions(-) diff --git a/go.mod b/go.mod index 0cb8c9a77..2b54b57af 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df - github.com/livekit/protocol v1.12.1-0.20240403204952-bc6c7ffd71f0 + github.com/livekit/protocol v1.12.1-0.20240410060226-b6a979d8cfce github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -55,6 +55,7 @@ require ( ) require ( + github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect diff --git a/go.sum b/go.sum index d616ba9e6..c105929f9 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o= github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc= +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY= @@ -132,8 +134,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df h1:DVhJRlF6/CtiyxJVy3QsbS9bf7GyUMuRZONMwZxIWpY= github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.12.1-0.20240403204952-bc6c7ffd71f0 h1:BfQN4k6YG+XTdfbXnA2XZLAXinRNlJrGdTLAjyOW2Rg= -github.com/livekit/protocol v1.12.1-0.20240403204952-bc6c7ffd71f0/go.mod h1:mcv7L2DWB6iRckI++egmwFU7YEy3W+aLHnusNhioqi0= +github.com/livekit/protocol v1.12.1-0.20240410060226-b6a979d8cfce h1:cbuw8FQ5S1vX6avOmlj6f8IwliXsOGjOa3UR0YDucX8= +github.com/livekit/protocol v1.12.1-0.20240410060226-b6a979d8cfce/go.mod h1:jB6PWwf4tdMAwy+jxexqaVWuQiiklAtO4F5zZzWkTII= github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be h1:W1nCFZ19rYAORMBNX82NeVPHjADN0UyORr6refbUXpU= github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index e835cf6ba..cab96e23e 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -57,13 +57,6 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant case *livekit.SignalRequest_Leave: pLogger.Debugw("client leaving room") room.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonClientRequestLeave) - case *livekit.SignalRequest_UpdateLayers: - err := room.UpdateVideoLayers(participant, msg.UpdateLayers) - if err != nil { - pLogger.Warnw("could not update video layers", err, - "update", msg.UpdateLayers) - return nil - } case *livekit.SignalRequest_SubscriptionPermission: err := room.UpdateSubscriptionPermission(participant, msg.SubscriptionPermission) if err != nil { diff --git a/pkg/sfu/streamallocator/ratemonitor.go b/pkg/sfu/streamallocator/ratemonitor.go index 9c2ba243e..f2275822f 100644 --- a/pkg/sfu/streamallocator/ratemonitor.go +++ b/pkg/sfu/streamallocator/ratemonitor.go @@ -16,6 +16,7 @@ package streamallocator import ( "fmt" + "sync" "time" "github.com/livekit/protocol/utils/timeseries" @@ -31,6 +32,7 @@ const ( // ------------------------------------------------ type RateMonitor struct { + mu sync.Mutex bitrateEstimate *timeseries.TimeSeries[int64] managedBytesSent *timeseries.TimeSeries[uint32] managedBytesRetransmitted *timeseries.TimeSeries[uint32] @@ -67,6 +69,9 @@ func NewRateMonitor() *RateMonitor { } func (r *RateMonitor) Update(estimate int64, managedBytesSent uint32, managedBytesRetransmitted uint32, unmanagedBytesSent uint32, unmanagedBytesRetransmitted uint32) { + r.mu.Lock() + defer r.mu.Unlock() + now := time.Now() r.bitrateEstimate.AddSampleAt(estimate, now) r.managedBytesSent.AddSampleAt(managedBytesSent, now) @@ -82,36 +87,36 @@ func (r *RateMonitor) Update(estimate int64, managedBytesSent uint32, managedByt // Reason is that the estimate could be higher than the actual rate by a significant amount. // So, updating periodically to flush out samples that will not contribute to queueing would be good. func (r *RateMonitor) GetQueuingGuess() float64 { - _, _, _, _, _, qd := r.getRates(queueMonitorWindow) - return qd + _, _, _, _, _, queuingDelay := r.getRates(queueMonitorWindow) + return queuingDelay } -func (r *RateMonitor) getRates(monitorDuration time.Duration) (float64, float64, float64, float64, float64, float64) { - threshold := time.Now().Add(-monitorDuration) - bitrateEstimateSamples := r.bitrateEstimate.GetSamplesAfter(threshold) - managedBytesSentSamples := r.managedBytesSent.GetSamplesAfter(threshold) - managedBytesRetransmittedSamples := r.managedBytesRetransmitted.GetSamplesAfter(threshold) - unmanagedBytesSentSamples := r.unmanagedBytesSent.GetSamplesAfter(threshold) - unmanagedBytesRetransmittedSamples := r.unmanagedBytesRetransmitted.GetSamplesAfter(threshold) +func (r *RateMonitor) getRates(monitorDuration time.Duration) (totalBitrateEstimate, totalManagedSent, totalManagedRetransmitted, totalUnmanagedSent, totalUnmanagedRetransmitted, queuingDelay float64) { + r.mu.Lock() + defer r.mu.Unlock() - if len(bitrateEstimateSamples) == 0 || (len(managedBytesSentSamples)+len(managedBytesRetransmittedSamples)+len(unmanagedBytesSentSamples)+len(unmanagedBytesRetransmittedSamples)) == 0 { - return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 + threshold := time.Now().Add(-monitorDuration) + if !r.bitrateEstimate.HasSamplesAfter(threshold) || + !(r.managedBytesSent.HasSamplesAfter(threshold) || + r.managedBytesRetransmitted.HasSamplesAfter(threshold) || + r.unmanagedBytesSent.HasSamplesAfter(threshold) || + r.unmanagedBytesRetransmitted.HasSamplesAfter(threshold)) { + return } - totalBitrateEstimate := getTimeWeightedSum(bitrateEstimateSamples) - totalManagedSent := getRate(managedBytesSentSamples) * 8 - totalManagedRetransmitted := getRate(managedBytesRetransmittedSamples) * 8 - totalUnmanagedSent := getRate(unmanagedBytesSentSamples) * 8 - totalUnmanagedRetransmitted := getRate(unmanagedBytesRetransmittedSamples) * 8 + totalBitrateEstimate = getTimeWeightedSum(r.bitrateEstimate.ReverseIterateSamplesAfter(threshold)) + totalManagedSent = getRate(r.managedBytesSent.ReverseIterateSamplesAfter(threshold)) * 8 + totalManagedRetransmitted = getRate(r.managedBytesRetransmitted.ReverseIterateSamplesAfter(threshold)) * 8 + totalUnmanagedSent = getRate(r.unmanagedBytesSent.ReverseIterateSamplesAfter(threshold)) * 8 + totalUnmanagedRetransmitted = getRate(r.unmanagedBytesRetransmitted.ReverseIterateSamplesAfter(threshold)) * 8 totalBits := totalManagedSent + totalManagedRetransmitted + totalUnmanagedSent + totalUnmanagedRetransmitted - queuingDelay := float64(0.0) if totalBits > totalBitrateEstimate { - latestBitrateEstimate := bitrateEstimateSamples[len(bitrateEstimateSamples)-1].Value + latestBitrateEstimate := r.bitrateEstimate.Back().Value excessBits := totalBits - totalBitrateEstimate queuingDelay = excessBits / float64(latestBitrateEstimate) } - return totalBitrateEstimate, totalManagedSent, totalManagedRetransmitted, totalUnmanagedSent, totalUnmanagedRetransmitted, queuingDelay + return } func (r *RateMonitor) updateHistory() { @@ -124,10 +129,12 @@ func (r *RateMonitor) updateHistory() { return } + r.mu.Lock() r.history = append( r.history, fmt.Sprintf("t: %+v, e: %.2f, m: %.2f/%.2f, um: %.2f/%.2f, qd: %.2f", time.Now().UnixMilli(), e, m, mr, um, umr, qd), ) + r.mu.Unlock() } func (r *RateMonitor) GetHistory() []string { @@ -136,37 +143,30 @@ func (r *RateMonitor) GetHistory() []string { // ------------------------------------------------ -func getTimeWeightedSum[T int64 | uint32](samples []timeseries.TimeSeriesSample[T]) float64 { - if len(samples) < 2 { - return 0.0 - } - +func getTimeWeightedSum[T int64 | uint32](it timeseries.ReverseIterator[T]) float64 { sum := 0.0 - for i := 1; i < len(samples); i++ { - diff := samples[i].At.Sub(samples[i-1].At).Seconds() - sum += diff * float64(samples[i-1].Value) + next := time.Now() + for it.Next() { + diff := next.Sub(it.Value().At).Seconds() + sum += diff * float64(it.Value().Value) + next = it.Value().At } - - diff := time.Now().Sub(samples[len(samples)-1].At).Seconds() - sum += diff * float64(samples[len(samples)-1].Value) return sum } -func getRate[T int64 | uint32](samples []timeseries.TimeSeriesSample[T]) float64 { - if len(samples) < 2 { - return 0.0 +func getRate[T int64 | uint32](it timeseries.ReverseIterator[T]) float64 { + var sum float64 + var first, last time.Time + for it.Next() { + if last.IsZero() { + last = it.Value().At + } + first = it.Value().At + sum += float64(it.Value().Value) } - sum := 0.0 - // start at 1 as the first sample duration is not available - for i := 1; i < len(samples); i++ { - sum += float64(samples[i].Value) + if duration := last.Sub(first); duration > 0 { + return sum / duration.Seconds() } - - duration := samples[len(samples)-1].At.Sub(samples[0].At) - if duration == 0 { - return 0.0 - } - - return sum / duration.Seconds() + return 0 } diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 3abc28a6b..08ecd3421 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -157,7 +157,7 @@ type StreamAllocator struct { prober *Prober channelObserver *ChannelObserver - rateMonitor *RateMonitor + // rateMonitor *RateMonitor videoTracksMu sync.RWMutex videoTracks map[livekit.TrackID]*Track @@ -178,7 +178,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { prober: NewProber(ProberParams{ Logger: params.Logger, }), - rateMonitor: NewRateMonitor(), + // rateMonitor: NewRateMonitor(), videoTracks: make(map[livekit.TrackID]*Track), eventsQueue: utils.NewOpsQueue(utils.OpsQueueParams{ Name: "stream-allocator", @@ -825,8 +825,8 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() { ) s.params.Logger.Debugw( fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity: experimental", action), - "rateHistory", s.rateMonitor.GetHistory(), - "expectedQueuing", s.rateMonitor.GetQueuingGuess(), + // "rateHistory", s.rateMonitor.GetHistory(), + // "expectedQueuing", s.rateMonitor.GetQueuingGuess(), "nackHistory", s.channelObserver.GetNackHistory(), "trackHistory", s.getTracksHistory(), ) @@ -1431,7 +1431,7 @@ func (s *StreamAllocator) monitorRate(estimate int64) { } } - s.rateMonitor.Update(estimate, managedBytesSent, managedBytesRetransmitted, unmanagedBytesSent, unmanagedBytesRetransmitted) + // s.rateMonitor.Update(estimate, managedBytesSent, managedBytesRetransmitted, unmanagedBytesSent, unmanagedBytesRetransmitted) } func (s *StreamAllocator) updateTracksHistory() {