From 2dcb5c928ac2c99503a0b96c2c665e793e87596b Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 2 Dec 2024 23:06:06 +0530 Subject: [PATCH] Freeze update on congested probe. (#3228) Reverting back to pre-refactor behaviour. Was trying to avoid doing special treatment when in probe, but REMB values are hard to predict and the NACKs as well. So, freeze updates when congesting in probe till the probe is done. Otherwise, further changes while probe is finalising sometimes causes an invalid signal and tracks are not up allocated. --- pkg/sfu/bwe/remotebwe/remote_bwe.go | 127 +++++--------------- pkg/sfu/streamallocator/probe_controller.go | 4 +- pkg/sfu/streamallocator/streamallocator.go | 8 +- 3 files changed, 39 insertions(+), 100 deletions(-) diff --git a/pkg/sfu/bwe/remotebwe/remote_bwe.go b/pkg/sfu/bwe/remotebwe/remote_bwe.go index e7e36ed46..df8d5e5e9 100644 --- a/pkg/sfu/bwe/remotebwe/remote_bwe.go +++ b/pkg/sfu/bwe/remotebwe/remote_bwe.go @@ -18,7 +18,6 @@ import ( "sync" "time" - "github.com/frostbyte73/core" "github.com/livekit/livekit-server/pkg/sfu/bwe" "github.com/livekit/livekit-server/pkg/sfu/ccutils" "github.com/livekit/protocol/logger" @@ -74,9 +73,6 @@ type RemoteBWE struct { congestionState bwe.CongestionState congestionStateSwitchedAt time.Time - wake chan struct{} - stop core.Fuse - bweListener bwe.BWEListener } @@ -85,8 +81,6 @@ func NewRemoteBWE(params RemoteBWEParams) *RemoteBWE { params: params, } - go r.worker() - r.Reset() return r } @@ -118,16 +112,6 @@ func (r *RemoteBWE) Reset() { r.congestionState = bwe.CongestionStateNone r.congestionStateSwitchedAt = mono.Now() - - // notify worker for ticker interval management based on state - select { - case r.wake <- struct{}{}: - default: - } -} - -func (r *RemoteBWE) Stop() { - r.stop.Break() } func (r *RemoteBWE) HandleREMB( @@ -140,6 +124,13 @@ func (r *RemoteBWE) HandleREMB( r.lastReceivedEstimate = receivedEstimate r.lastExpectedBandwidthUsage = expectedBandwidthUsage + // in probe, freeze channel observer state if probe causes congestion till the probe is done, + // this is to ensure that probe result is not a success and an unsuccessful probe will not up allocate any tracks + if r.isInProbe && r.congestionState != bwe.CongestionStateNone { + r.lock.Unlock() + return + } + r.channelObserver.AddEstimate(r.lastReceivedEstimate) r.channelObserver.AddNack(sentPackets, repeatedNacks) @@ -171,7 +162,9 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState switch r.congestionState { case bwe.CongestionStateNone: if trend == channelTrendCongesting { - if r.estimateAvailableChannelCapacity(reason) { + if r.isInProbe || r.estimateAvailableChannelCapacity(reason) { + // when in probe, if congested, stays there will probe is done, + // the estimate stays at pre-probe level newState = bwe.CongestionStateCongested } } @@ -257,39 +250,10 @@ func (r *RemoteBWE) updateCongestionState(state bwe.CongestionState, reason chan "committedChannelCapacity", r.committedChannelCapacity, ) - if state != r.congestionState { - // notify worker for ticker interval management based on state - select { - case r.wake <- struct{}{}: - default: - } - } - r.congestionState = state r.congestionStateSwitchedAt = mono.Now() } -func (r *RemoteBWE) newChannelObserver() { - if r.isInProbe { - r.channelObserver = newChannelObserver( - channelObserverParams{ - Name: "probe", - Config: r.params.Config.ChannelObserverProbe, - }, - r.params.Logger, - ) - r.channelObserver.SeedEstimate(r.committedChannelCapacity) - } else { - r.channelObserver = newChannelObserver( - channelObserverParams{ - Name: "non-probe", - Config: r.params.Config.ChannelObserverNonProbe, - }, - r.params.Logger, - ) - } -} - func (r *RemoteBWE) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) { r.lock.Lock() defer r.lock.Unlock() @@ -311,9 +275,11 @@ func (r *RemoteBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (bool, int64 r.lock.Lock() defer r.lock.Unlock() - // switch to a non-probe channel observer on probe end + // switch to a non-probe channel observer on probe end, + // reset congestion state to get a fresh trend pco := r.channelObserver r.isInProbe = false + r.congestionState = bwe.CongestionStateNone r.newChannelObserver() r.params.Logger.Debugw( @@ -331,56 +297,29 @@ func (r *RemoteBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (bool, int64 trend, _ := pco.GetTrend() highestEstimate := pco.GetHighestEstimate() - if trend == channelTrendClearing && highestEstimate > r.committedChannelCapacity { + if trend != channelTrendCongesting && highestEstimate > r.committedChannelCapacity { r.committedChannelCapacity = highestEstimate } - return trend == channelTrendClearing, r.committedChannelCapacity + return trend == channelTrendCongesting, r.committedChannelCapacity } -func (r *RemoteBWE) getCheckInterval() time.Duration { - r.lock.RLock() - state := r.congestionState - r.lock.RUnlock() - - switch state { - case bwe.CongestionStateCongested: - if r.params.Config.PeriodicCheckIntervalCongested != 0 { - return r.params.Config.PeriodicCheckIntervalCongested - } - - return DefaultRemoteBWEConfig.PeriodicCheckIntervalCongested - - default: - if r.params.Config.PeriodicCheckInterval != 0 { - return r.params.Config.PeriodicCheckInterval - } - - return DefaultRemoteBWEConfig.PeriodicCheckInterval - } -} - -func (r *RemoteBWE) worker() { - ticker := time.NewTicker(r.getCheckInterval()) - defer ticker.Stop() - - for { - select { - case <-r.wake: - ticker.Reset(r.getCheckInterval()) - - case <-ticker.C: - r.lock.Lock() - shouldNotify, state, committedChannelCapacity := r.congestionDetectionStateMachine() - r.lock.Unlock() - - if shouldNotify { - if bweListener := r.getBWEListener(); bweListener != nil { - bweListener.OnCongestionStateChange(state, committedChannelCapacity) - } - } - - case <-r.stop.Watch(): - return - } +func (r *RemoteBWE) newChannelObserver() { + if r.isInProbe { + r.channelObserver = newChannelObserver( + channelObserverParams{ + Name: "probe", + Config: r.params.Config.ChannelObserverProbe, + }, + r.params.Logger, + ) + r.channelObserver.SeedEstimate(r.committedChannelCapacity) + } else { + r.channelObserver = newChannelObserver( + channelObserverParams{ + Name: "non-probe", + Config: r.params.Config.ChannelObserverNonProbe, + }, + r.params.Logger, + ) } } diff --git a/pkg/sfu/streamallocator/probe_controller.go b/pkg/sfu/streamallocator/probe_controller.go index 40ed41e61..54c62fc4d 100644 --- a/pkg/sfu/streamallocator/probe_controller.go +++ b/pkg/sfu/streamallocator/probe_controller.go @@ -225,8 +225,8 @@ func (p *ProbeController) MaybeFinalizeProbe() (ccutils.ProbeClusterInfo, bool) return p.pci, true } -func (p *ProbeController) ProbeCongestionSignal(isCongestionClearing bool) { - if !isCongestionClearing { +func (p *ProbeController) ProbeCongestionSignal(isCongesting bool) { + if isCongesting { // wait longer till next probe p.probeInterval = time.Duration(p.probeInterval.Seconds()*p.params.Config.BackoffFactor) * time.Second if p.probeInterval > p.params.Config.MaxInterval { diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 0b0ce7905..48ca9ce6e 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -695,13 +695,13 @@ func (s *StreamAllocator) handleSignalEstimate(event Event) { func (s *StreamAllocator) handleSignalPeriodicPing(Event) { // finalize any probe that may have finished/aborted if pci, ok := s.probeController.MaybeFinalizeProbe(); ok { - isCongestionClearing, channelCapacity := s.params.BWE.ProbeClusterDone(pci) + isCongesting, channelCapacity := s.params.BWE.ProbeClusterDone(pci) s.params.Logger.Debugw( "stream allocator: probe result", - "isCongestionClearing", isCongestionClearing, + "isCongesting", isCongesting, "channelCapacity", channelCapacity, ) - if isCongestionClearing { + if !isCongesting { if channelCapacity > s.committedChannelCapacity { s.committedChannelCapacity = channelCapacity } @@ -709,7 +709,7 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) { s.maybeBoostDeficientTracks() } - s.probeController.ProbeCongestionSignal(isCongestionClearing) + s.probeController.ProbeCongestionSignal(isCongesting) } // probe if necessary and timing is right