From bfba6feed44c534459003aab2a1c1e583630d2c6 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 24 Sep 2025 14:45:57 +0530 Subject: [PATCH] Adjust stream allocator ping interval based on state. (#3951) * Adjust stream allocator ping interval based on state. In steady state, does a 15 second ping. While deficient, to be able to react to probes faster, it pings at 100ms interval. * clean up * log ops queue not able to wake up --- pkg/sfu/streamallocator/streamallocator.go | 58 ++++++++++++---------- pkg/sfu/streamallocator/track.go | 4 +- pkg/utils/opsqueue.go | 1 + 3 files changed, 36 insertions(+), 27 deletions(-) diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 3cc67a478..19c9aca22 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -37,21 +37,24 @@ import ( ) const ( - ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps + cChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps - PriorityMin = uint8(1) - PriorityMax = uint8(255) - PriorityDefaultScreenshare = PriorityMax - PriorityDefaultVideo = PriorityMin + cPriorityMin = uint8(1) + cPriorityMax = uint8(255) + cPriorityDefaultScreenshare = cPriorityMax + cPriorityDefaultVideo = cPriorityMin - FlagAllowOvershootWhileOptimal = true - FlagAllowOvershootWhileDeficient = false - FlagAllowOvershootExemptTrackWhileDeficient = true - FlagAllowOvershootInProbe = true - FlagAllowOvershootInCatchup = false - FlagAllowOvershootInBoost = true + cFlagAllowOvershootWhileOptimal = true + cFlagAllowOvershootWhileDeficient = false + cFlagAllowOvershootExemptTrackWhileDeficient = true + cFlagAllowOvershootInProbe = true + cFlagAllowOvershootInCatchup = false + cFlagAllowOvershootInBoost = true cRTTPullInterval = 30 * time.Second + + cPingLong = cRTTPullInterval / 2 + cPingShort = 100 * time.Millisecond ) // --------------------------------------------------------------------------- @@ -215,6 +218,8 @@ type StreamAllocator struct { lastRTTTime time.Time + pingGeneration atomic.Uint32 + isStopped atomic.Bool } @@ -247,7 +252,7 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b func (s *StreamAllocator) Start() { s.eventsQueue.Start() - go s.ping() + go s.ping(s.pingGeneration.Inc(), cPingLong) } func (s *StreamAllocator) Stop() { @@ -589,13 +594,13 @@ func (s *StreamAllocator) maybePostEventAllocateTrack(downTrack *sfu.DownTrack) } } -func (s *StreamAllocator) ping() { - ticker := time.NewTicker(100 * time.Millisecond) +func (s *StreamAllocator) ping(pingGeneration uint32, interval time.Duration) { + ticker := time.NewTicker(interval) defer ticker.Stop() for { <-ticker.C - if s.isStopped.Load() { + if s.isStopped.Load() || (pingGeneration != s.pingGeneration.Load()) { return } @@ -833,7 +838,7 @@ func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) { if isHoldableCongestionState(cscd.fromState) && cscd.toState == bwe.CongestionStateNone && s.state == streamAllocatorStateStable { update := NewStreamStateUpdate() for _, track := range s.getTracks() { - allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal, false) + allocation := track.AllocateOptimal(cFlagAllowOvershootWhileOptimal, false) updateStreamStateChange(track, allocation, update) } s.maybeSendUpdate(update) @@ -879,6 +884,9 @@ func (s *StreamAllocator) setState(state streamAllocatorState) { s.params.BWE.Reset() s.activeProbeClusterId = ccutils.ProbeClusterIdInvalid + go s.ping(s.pingGeneration.Inc(), cPingLong) + } else { + go s.ping(s.pingGeneration.Inc(), cPingShort) } } @@ -901,7 +909,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) { bweCongestionState := s.params.BWE.CongestionState() if !s.enabled || (s.state == streamAllocatorStateStable && !isDeficientCongestionState(bweCongestionState)) || !track.IsManaged() { update := NewStreamStateUpdate() - allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal, isHoldableCongestionState(bweCongestionState)) + allocation := track.AllocateOptimal(cFlagAllowOvershootWhileOptimal, isHoldableCongestionState(bweCongestionState)) updateStreamStateChange(track, allocation, update) s.maybeSendUpdate(update) return @@ -927,7 +935,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) { // track needing change is not currently streaming, i. e. it has to be resumed. // track.ProvisionalAllocatePrepare() - transition := track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient) + transition := track.ProvisionalAllocateGetCooperativeTransition(cFlagAllowOvershootWhileDeficient) // downgrade, giving back bits if transition.From.GreaterThan(transition.To) { @@ -973,7 +981,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) { availableChannelCapacity, layer, s.allowPause, - FlagAllowOvershootWhileDeficient, + cFlagAllowOvershootWhileDeficient, ) if availableChannelCapacity < usedChannelCapacity { break alloc_loop @@ -999,7 +1007,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) { } track.ProvisionalAllocateReset() - transition = track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient) // get transition again to reset above allocation attempt using available headroom + transition = track.ProvisionalAllocateGetCooperativeTransition(cFlagAllowOvershootWhileDeficient) // get transition again to reset above allocation attempt using available headroom } // if there is not enough headroom, try to redistribute starting with tracks that are closest to their desired. @@ -1082,7 +1090,7 @@ func (s *StreamAllocator) maybeBoostDeficientTracks() { boost_loop: for { for idx, track := range sortedTracks { - allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup) + allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, cFlagAllowOvershootInCatchup) if !boosted { if idx == len(sortedTracks)-1 { // all tracks tried @@ -1143,7 +1151,7 @@ func (s *StreamAllocator) allocateAllTracks() { continue } - allocation := track.AllocateOptimal(FlagAllowOvershootExemptTrackWhileDeficient, false) + allocation := track.AllocateOptimal(cFlagAllowOvershootExemptTrackWhileDeficient, false) updateStreamStateChange(track, allocation, update) // STREAM-ALLOCATOR-TODO: optimistic allocation before bitrate is available will return 0. How to account for that? @@ -1179,7 +1187,7 @@ func (s *StreamAllocator) allocateAllTracks() { } for _, track := range sorted { - _, usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient) + _, usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, cFlagAllowOvershootWhileDeficient) availableChannelCapacity -= usedChannelCapacity if availableChannelCapacity < 0 { availableChannelCapacity = 0 @@ -1310,7 +1318,7 @@ func (s *StreamAllocator) maybeProbe() { func (s *StreamAllocator) maybeProbeWithMedia() { // boost deficient track farthest from desired layer for _, track := range s.getMaxDistanceSortedDeficient() { - allocation, boosted := track.AllocateNextHigher(ChannelCapacityInfinity, FlagAllowOvershootInBoost) + allocation, boosted := track.AllocateNextHigher(cChannelCapacityInfinity, cFlagAllowOvershootInBoost) if !boosted { continue } @@ -1327,7 +1335,7 @@ func (s *StreamAllocator) maybeProbeWithMedia() { func (s *StreamAllocator) maybeProbeWithPadding() { // use deficient track farthest from desired layer to find how much to probe for _, track := range s.getMaxDistanceSortedDeficient() { - transition, available := track.GetNextHigherTransition(FlagAllowOvershootInProbe) + transition, available := track.GetNextHigherTransition(cFlagAllowOvershootInProbe) if !available || transition.BandwidthDelta < 0 { continue } diff --git a/pkg/sfu/streamallocator/track.go b/pkg/sfu/streamallocator/track.go index 20a76b44b..a094aa50e 100644 --- a/pkg/sfu/streamallocator/track.go +++ b/pkg/sfu/streamallocator/track.go @@ -87,9 +87,9 @@ func (t *Track) SetPriority(priority uint8) bool { if priority == 0 { switch t.source { case livekit.TrackSource_SCREEN_SHARE: - priority = PriorityDefaultScreenshare + priority = cPriorityDefaultScreenshare default: - priority = PriorityDefaultVideo + priority = cPriorityDefaultVideo } } diff --git a/pkg/utils/opsqueue.go b/pkg/utils/opsqueue.go index 65e0dafe2..5e88b959c 100644 --- a/pkg/utils/opsqueue.go +++ b/pkg/utils/opsqueue.go @@ -128,6 +128,7 @@ func (oq *opsQueueBase[T]) Enqueue(op T) { select { case oq.wake <- struct{}{}: default: + oq.params.Logger.Infow("could not wake ops queue", "name", oq.params.Name) } } }