diff --git a/pkg/sfu/bwe/remotebwe/remote_bwe.go b/pkg/sfu/bwe/remotebwe/remote_bwe.go index e7e36ed46..df8d5e5e9 100644 --- a/pkg/sfu/bwe/remotebwe/remote_bwe.go +++ b/pkg/sfu/bwe/remotebwe/remote_bwe.go @@ -18,7 +18,6 @@ import ( "sync" "time" - "github.com/frostbyte73/core" "github.com/livekit/livekit-server/pkg/sfu/bwe" "github.com/livekit/livekit-server/pkg/sfu/ccutils" "github.com/livekit/protocol/logger" @@ -74,9 +73,6 @@ type RemoteBWE struct { congestionState bwe.CongestionState congestionStateSwitchedAt time.Time - wake chan struct{} - stop core.Fuse - bweListener bwe.BWEListener } @@ -85,8 +81,6 @@ func NewRemoteBWE(params RemoteBWEParams) *RemoteBWE { params: params, } - go r.worker() - r.Reset() return r } @@ -118,16 +112,6 @@ func (r *RemoteBWE) Reset() { r.congestionState = bwe.CongestionStateNone r.congestionStateSwitchedAt = mono.Now() - - // notify worker for ticker interval management based on state - select { - case r.wake <- struct{}{}: - default: - } -} - -func (r *RemoteBWE) Stop() { - r.stop.Break() } func (r *RemoteBWE) HandleREMB( @@ -140,6 +124,13 @@ func (r *RemoteBWE) HandleREMB( r.lastReceivedEstimate = receivedEstimate r.lastExpectedBandwidthUsage = expectedBandwidthUsage + // in probe, freeze channel observer state if probe causes congestion till the probe is done, + // this is to ensure that probe result is not a success and an unsuccessful probe will not up allocate any tracks + if r.isInProbe && r.congestionState != bwe.CongestionStateNone { + r.lock.Unlock() + return + } + r.channelObserver.AddEstimate(r.lastReceivedEstimate) r.channelObserver.AddNack(sentPackets, repeatedNacks) @@ -171,7 +162,9 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState switch r.congestionState { case bwe.CongestionStateNone: if trend == channelTrendCongesting { - if r.estimateAvailableChannelCapacity(reason) { + if r.isInProbe || r.estimateAvailableChannelCapacity(reason) { + // when in probe, if congested, stays there 
till probe is done, + // the estimate stays at pre-probe level newState = bwe.CongestionStateCongested } } @@ -257,39 +250,10 @@ func (r *RemoteBWE) updateCongestionState(state bwe.CongestionState, reason chan "committedChannelCapacity", r.committedChannelCapacity, ) - if state != r.congestionState { - // notify worker for ticker interval management based on state - select { - case r.wake <- struct{}{}: - default: - } - } - r.congestionState = state r.congestionStateSwitchedAt = mono.Now() } -func (r *RemoteBWE) newChannelObserver() { - if r.isInProbe { - r.channelObserver = newChannelObserver( - channelObserverParams{ - Name: "probe", - Config: r.params.Config.ChannelObserverProbe, - }, - r.params.Logger, - ) - r.channelObserver.SeedEstimate(r.committedChannelCapacity) - } else { - r.channelObserver = newChannelObserver( - channelObserverParams{ - Name: "non-probe", - Config: r.params.Config.ChannelObserverNonProbe, - }, - r.params.Logger, - ) - } -} - func (r *RemoteBWE) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) { r.lock.Lock() defer r.lock.Unlock() @@ -311,9 +275,11 @@ func (r *RemoteBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (bool, int64 r.lock.Lock() defer r.lock.Unlock() - // switch to a non-probe channel observer on probe end + // switch to a non-probe channel observer on probe end, + // reset congestion state to get a fresh trend pco := r.channelObserver r.isInProbe = false + r.congestionState = bwe.CongestionStateNone r.newChannelObserver() r.params.Logger.Debugw( @@ -331,56 +297,29 @@ func (r *RemoteBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (bool, int64 trend, _ := pco.GetTrend() highestEstimate := pco.GetHighestEstimate() - if trend == channelTrendClearing && highestEstimate > r.committedChannelCapacity { + if trend != channelTrendCongesting && highestEstimate > r.committedChannelCapacity { r.committedChannelCapacity = highestEstimate } - return trend == channelTrendClearing, r.committedChannelCapacity + return trend ==
channelTrendCongesting, r.committedChannelCapacity } -func (r *RemoteBWE) getCheckInterval() time.Duration { - r.lock.RLock() - state := r.congestionState - r.lock.RUnlock() - - switch state { - case bwe.CongestionStateCongested: - if r.params.Config.PeriodicCheckIntervalCongested != 0 { - return r.params.Config.PeriodicCheckIntervalCongested - } - - return DefaultRemoteBWEConfig.PeriodicCheckIntervalCongested - - default: - if r.params.Config.PeriodicCheckInterval != 0 { - return r.params.Config.PeriodicCheckInterval - } - - return DefaultRemoteBWEConfig.PeriodicCheckInterval - } -} - -func (r *RemoteBWE) worker() { - ticker := time.NewTicker(r.getCheckInterval()) - defer ticker.Stop() - - for { - select { - case <-r.wake: - ticker.Reset(r.getCheckInterval()) - - case <-ticker.C: - r.lock.Lock() - shouldNotify, state, committedChannelCapacity := r.congestionDetectionStateMachine() - r.lock.Unlock() - - if shouldNotify { - if bweListener := r.getBWEListener(); bweListener != nil { - bweListener.OnCongestionStateChange(state, committedChannelCapacity) - } - } - - case <-r.stop.Watch(): - return - } +func (r *RemoteBWE) newChannelObserver() { + if r.isInProbe { + r.channelObserver = newChannelObserver( + channelObserverParams{ + Name: "probe", + Config: r.params.Config.ChannelObserverProbe, + }, + r.params.Logger, + ) + r.channelObserver.SeedEstimate(r.committedChannelCapacity) + } else { + r.channelObserver = newChannelObserver( + channelObserverParams{ + Name: "non-probe", + Config: r.params.Config.ChannelObserverNonProbe, + }, + r.params.Logger, + ) } } diff --git a/pkg/sfu/streamallocator/probe_controller.go b/pkg/sfu/streamallocator/probe_controller.go index 40ed41e61..54c62fc4d 100644 --- a/pkg/sfu/streamallocator/probe_controller.go +++ b/pkg/sfu/streamallocator/probe_controller.go @@ -225,8 +225,8 @@ func (p *ProbeController) MaybeFinalizeProbe() (ccutils.ProbeClusterInfo, bool) return p.pci, true } -func (p *ProbeController) 
ProbeCongestionSignal(isCongestionClearing bool) { - if !isCongestionClearing { +func (p *ProbeController) ProbeCongestionSignal(isCongesting bool) { + if isCongesting { // wait longer till next probe p.probeInterval = time.Duration(p.probeInterval.Seconds()*p.params.Config.BackoffFactor) * time.Second if p.probeInterval > p.params.Config.MaxInterval { diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 0b0ce7905..48ca9ce6e 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -695,13 +695,13 @@ func (s *StreamAllocator) handleSignalEstimate(event Event) { func (s *StreamAllocator) handleSignalPeriodicPing(Event) { // finalize any probe that may have finished/aborted if pci, ok := s.probeController.MaybeFinalizeProbe(); ok { - isCongestionClearing, channelCapacity := s.params.BWE.ProbeClusterDone(pci) + isCongesting, channelCapacity := s.params.BWE.ProbeClusterDone(pci) s.params.Logger.Debugw( "stream allocator: probe result", - "isCongestionClearing", isCongestionClearing, + "isCongesting", isCongesting, "channelCapacity", channelCapacity, ) - if isCongestionClearing { + if !isCongesting { if channelCapacity > s.committedChannelCapacity { s.committedChannelCapacity = channelCapacity } @@ -709,7 +709,7 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) { s.maybeBoostDeficientTracks() } - s.probeController.ProbeCongestionSignal(isCongestionClearing) + s.probeController.ProbeCongestionSignal(isCongesting) } // probe if necessary and timing is right