Freeze update on congested probe. (#3228)

Reverting back to pre-refactor behaviour. Was trying to avoid doing
special treatment when in probe, but REMB values are hard to predict
and the NACKs as well.

So, freeze updates when congesting in probe till the probe is done.
Otherwise, further changes while probe is finalising sometimes causes an
invalid signal and tracks are not up allocated.
This commit is contained in:
Raja Subramanian
2024-12-02 23:06:06 +05:30
committed by GitHub
parent 12b3da0a40
commit 2dcb5c928a
3 changed files with 39 additions and 100 deletions
+33 -94
View File
@@ -18,7 +18,6 @@ import (
"sync"
"time"
"github.com/frostbyte73/core"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/protocol/logger"
@@ -74,9 +73,6 @@ type RemoteBWE struct {
congestionState bwe.CongestionState
congestionStateSwitchedAt time.Time
wake chan struct{}
stop core.Fuse
bweListener bwe.BWEListener
}
@@ -85,8 +81,6 @@ func NewRemoteBWE(params RemoteBWEParams) *RemoteBWE {
params: params,
}
go r.worker()
r.Reset()
return r
}
@@ -118,16 +112,6 @@ func (r *RemoteBWE) Reset() {
r.congestionState = bwe.CongestionStateNone
r.congestionStateSwitchedAt = mono.Now()
// notify worker for ticker interval management based on state
select {
case r.wake <- struct{}{}:
default:
}
}
func (r *RemoteBWE) Stop() {
r.stop.Break()
}
func (r *RemoteBWE) HandleREMB(
@@ -140,6 +124,13 @@ func (r *RemoteBWE) HandleREMB(
r.lastReceivedEstimate = receivedEstimate
r.lastExpectedBandwidthUsage = expectedBandwidthUsage
// in probe, freeze channel observer state if probe causes congestion till the probe is done,
// this is to ensure that probe result is not a success and an unsuccessful probe will not up allocate any tracks
if r.isInProbe && r.congestionState != bwe.CongestionStateNone {
r.lock.Unlock()
return
}
r.channelObserver.AddEstimate(r.lastReceivedEstimate)
r.channelObserver.AddNack(sentPackets, repeatedNacks)
@@ -171,7 +162,9 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState
switch r.congestionState {
case bwe.CongestionStateNone:
if trend == channelTrendCongesting {
if r.estimateAvailableChannelCapacity(reason) {
if r.isInProbe || r.estimateAvailableChannelCapacity(reason) {
// when in probe, if congested, stays there will probe is done,
// the estimate stays at pre-probe level
newState = bwe.CongestionStateCongested
}
}
@@ -257,39 +250,10 @@ func (r *RemoteBWE) updateCongestionState(state bwe.CongestionState, reason chan
"committedChannelCapacity", r.committedChannelCapacity,
)
if state != r.congestionState {
// notify worker for ticker interval management based on state
select {
case r.wake <- struct{}{}:
default:
}
}
r.congestionState = state
r.congestionStateSwitchedAt = mono.Now()
}
func (r *RemoteBWE) newChannelObserver() {
if r.isInProbe {
r.channelObserver = newChannelObserver(
channelObserverParams{
Name: "probe",
Config: r.params.Config.ChannelObserverProbe,
},
r.params.Logger,
)
r.channelObserver.SeedEstimate(r.committedChannelCapacity)
} else {
r.channelObserver = newChannelObserver(
channelObserverParams{
Name: "non-probe",
Config: r.params.Config.ChannelObserverNonProbe,
},
r.params.Logger,
)
}
}
func (r *RemoteBWE) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -311,9 +275,11 @@ func (r *RemoteBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (bool, int64
r.lock.Lock()
defer r.lock.Unlock()
// switch to a non-probe channel observer on probe end
// switch to a non-probe channel observer on probe end,
// reset congestion state to get a fresh trend
pco := r.channelObserver
r.isInProbe = false
r.congestionState = bwe.CongestionStateNone
r.newChannelObserver()
r.params.Logger.Debugw(
@@ -331,56 +297,29 @@ func (r *RemoteBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (bool, int64
trend, _ := pco.GetTrend()
highestEstimate := pco.GetHighestEstimate()
if trend == channelTrendClearing && highestEstimate > r.committedChannelCapacity {
if trend != channelTrendCongesting && highestEstimate > r.committedChannelCapacity {
r.committedChannelCapacity = highestEstimate
}
return trend == channelTrendClearing, r.committedChannelCapacity
return trend == channelTrendCongesting, r.committedChannelCapacity
}
func (r *RemoteBWE) getCheckInterval() time.Duration {
r.lock.RLock()
state := r.congestionState
r.lock.RUnlock()
switch state {
case bwe.CongestionStateCongested:
if r.params.Config.PeriodicCheckIntervalCongested != 0 {
return r.params.Config.PeriodicCheckIntervalCongested
}
return DefaultRemoteBWEConfig.PeriodicCheckIntervalCongested
default:
if r.params.Config.PeriodicCheckInterval != 0 {
return r.params.Config.PeriodicCheckInterval
}
return DefaultRemoteBWEConfig.PeriodicCheckInterval
}
}
func (r *RemoteBWE) worker() {
ticker := time.NewTicker(r.getCheckInterval())
defer ticker.Stop()
for {
select {
case <-r.wake:
ticker.Reset(r.getCheckInterval())
case <-ticker.C:
r.lock.Lock()
shouldNotify, state, committedChannelCapacity := r.congestionDetectionStateMachine()
r.lock.Unlock()
if shouldNotify {
if bweListener := r.getBWEListener(); bweListener != nil {
bweListener.OnCongestionStateChange(state, committedChannelCapacity)
}
}
case <-r.stop.Watch():
return
}
func (r *RemoteBWE) newChannelObserver() {
if r.isInProbe {
r.channelObserver = newChannelObserver(
channelObserverParams{
Name: "probe",
Config: r.params.Config.ChannelObserverProbe,
},
r.params.Logger,
)
r.channelObserver.SeedEstimate(r.committedChannelCapacity)
} else {
r.channelObserver = newChannelObserver(
channelObserverParams{
Name: "non-probe",
Config: r.params.Config.ChannelObserverNonProbe,
},
r.params.Logger,
)
}
}
+2 -2
View File
@@ -225,8 +225,8 @@ func (p *ProbeController) MaybeFinalizeProbe() (ccutils.ProbeClusterInfo, bool)
return p.pci, true
}
func (p *ProbeController) ProbeCongestionSignal(isCongestionClearing bool) {
if !isCongestionClearing {
func (p *ProbeController) ProbeCongestionSignal(isCongesting bool) {
if isCongesting {
// wait longer till next probe
p.probeInterval = time.Duration(p.probeInterval.Seconds()*p.params.Config.BackoffFactor) * time.Second
if p.probeInterval > p.params.Config.MaxInterval {
+4 -4
View File
@@ -695,13 +695,13 @@ func (s *StreamAllocator) handleSignalEstimate(event Event) {
func (s *StreamAllocator) handleSignalPeriodicPing(Event) {
// finalize any probe that may have finished/aborted
if pci, ok := s.probeController.MaybeFinalizeProbe(); ok {
isCongestionClearing, channelCapacity := s.params.BWE.ProbeClusterDone(pci)
isCongesting, channelCapacity := s.params.BWE.ProbeClusterDone(pci)
s.params.Logger.Debugw(
"stream allocator: probe result",
"isCongestionClearing", isCongestionClearing,
"isCongesting", isCongesting,
"channelCapacity", channelCapacity,
)
if isCongestionClearing {
if !isCongesting {
if channelCapacity > s.committedChannelCapacity {
s.committedChannelCapacity = channelCapacity
}
@@ -709,7 +709,7 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) {
s.maybeBoostDeficientTracks()
}
s.probeController.ProbeCongestionSignal(isCongestionClearing)
s.probeController.ProbeCongestionSignal(isCongesting)
}
// probe if necessary and timing is right