diff --git a/pkg/config/config.go b/pkg/config/config.go index a966ab288..d60c03c82 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -29,7 +29,8 @@ import ( "github.com/livekit/livekit-server/pkg/metric" "github.com/livekit/livekit-server/pkg/sfu" - "github.com/livekit/livekit-server/pkg/sfu/sendsidebwe" + "github.com/livekit/livekit-server/pkg/sfu/bwe/remotebwe" + "github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe" "github.com/livekit/livekit-server/pkg/sfu/streamallocator" "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/livekit" @@ -129,6 +130,8 @@ type CongestionControlConfig struct { StreamAllocator streamallocator.StreamAllocatorConfig `yaml:"stream_allocator,omitempty"` + RemoteBWE remotebwe.RemoteBWEConfig `yaml:"remote_bwe,omitempty"` + UseSendSideBWEInterceptor bool `yaml:"use_send_side_bwe_interceptor,omitempty"` UseSendSideBWE bool `yaml:"use_send_side_bwe,omitempty"` @@ -314,6 +317,7 @@ var DefaultConfig = Config{ Enabled: true, AllowPause: false, StreamAllocator: streamallocator.DefaultStreamAllocatorConfig, + RemoteBWE: remotebwe.DefaultRemoteBWEConfig, UseSendSideBWEInterceptor: false, UseSendSideBWE: false, SendSideBWE: sendsidebwe.DefaultSendSideBWEConfig, diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index c4eb31a4f..8982dd4c1 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -39,10 +39,12 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc/transport" "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/sfu/bwe" + "github.com/livekit/livekit-server/pkg/sfu/bwe/remotebwe" + "github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe" sfuinterceptor "github.com/livekit/livekit-server/pkg/sfu/interceptor" "github.com/livekit/livekit-server/pkg/sfu/pacer" pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay" - "github.com/livekit/livekit-server/pkg/sfu/sendsidebwe" 
"github.com/livekit/livekit-server/pkg/sfu/streamallocator" sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" @@ -208,8 +210,8 @@ type PCTransport struct { streamAllocator *streamallocator.StreamAllocator // only for subscriber PC - sendSideBWE *sendsidebwe.SendSideBWE - pacer pacer.Pacer + bwe bwe.BWE + pacer pacer.Pacer previousAnswer *webrtc.SessionDescription // track id -> description map in previous offer sdp @@ -350,7 +352,7 @@ func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimat ir := &interceptor.Registry{} if params.IsSendSide { se.DetachDataChannels() - if params.CongestionControlConfig.UseSendSideBWEInterceptor || params.UseSendSideBWEInterceptor && (!params.CongestionControlConfig.UseSendSideBWE && !params.UseSendSideBWE) { + if (params.CongestionControlConfig.UseSendSideBWEInterceptor || params.UseSendSideBWEInterceptor) && (!params.CongestionControlConfig.UseSendSideBWE && !params.UseSendSideBWE) { params.Logger.Infow("using send side BWE - interceptor") gf, err := cc.NewInterceptor(func() (cc.BandwidthEstimator, error) { return gcc.NewSendSideBWE( @@ -456,16 +458,20 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) { if params.CongestionControlConfig.UseSendSideBWE || params.UseSendSideBWE { params.Logger.Infow("using send side BWE") - t.sendSideBWE = sendsidebwe.NewSendSideBWE(sendsidebwe.SendSideBWEParams{ + ssbwe := sendsidebwe.NewSendSideBWE(sendsidebwe.SendSideBWEParams{ Config: params.CongestionControlConfig.SendSideBWE, Logger: params.Logger, }) - t.pacer = pacer.NewNoQueue(params.Logger, t.sendSideBWE) - - t.streamAllocator.SetSendSideBWE(t.sendSideBWE) + t.pacer = pacer.NewNoQueue(params.Logger, ssbwe) + t.bwe = ssbwe } else { + t.bwe = remotebwe.NewRemoteBWE(remotebwe.RemoteBWEParams{ + Config: params.CongestionControlConfig.RemoteBWE, + Logger: params.Logger, + }) t.pacer = pacer.NewPassThrough(params.Logger, nil) } + 
t.streamAllocator.SetBWE(t.bwe) } if err := t.createPeerConnection(); err != nil { @@ -991,8 +997,8 @@ func (t *PCTransport) Close() { if t.pacer != nil { t.pacer.Stop() } - if t.sendSideBWE != nil { - t.sendSideBWE.Stop() + if t.bwe != nil { + t.bwe.Stop() } _ = t.pc.Close() diff --git a/pkg/sfu/bwe/bwe.go b/pkg/sfu/bwe/bwe.go new file mode 100644 index 000000000..372455228 --- /dev/null +++ b/pkg/sfu/bwe/bwe.go @@ -0,0 +1,105 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package bwe + +import ( + "fmt" + + "github.com/pion/rtcp" +) + +// ------------------------------------------------ + +type CongestionState int + +const ( + CongestionStateNone CongestionState = iota + CongestionStateEarlyWarning + CongestionStateEarlyWarningHangover + CongestionStateCongested + CongestionStateCongestedHangover +) + +func (c CongestionState) String() string { + switch c { + case CongestionStateNone: + return "NONE" + case CongestionStateEarlyWarning: + return "EARLY_WARNING" + case CongestionStateEarlyWarningHangover: + return "EARLY_WARNING_HANGOVER" + case CongestionStateCongested: + return "CONGESTED" + case CongestionStateCongestedHangover: + return "CONGESTED_HANGOVER" + default: + return fmt.Sprintf("%d", int(c)) + } +} + +// ------------------------------------------------ + +type ChannelTrend int + +const ( + ChannelTrendNeutral ChannelTrend = iota + ChannelTrendClearing + ChannelTrendCongesting +) + +func (c ChannelTrend) String() string { + switch c { + case ChannelTrendNeutral: + return "NEUTRAL" + case ChannelTrendClearing: + return "CLEARING" + case ChannelTrendCongesting: + return "CONGESTING" + default: + return fmt.Sprintf("%d", int(c)) + } +} + +// ------------------------------------------------ + +type BWE interface { + SetBWEListener(bweListner BWEListener) + + Reset() + + Stop() + + HandleREMB( + receivedEstimate int64, + isProbeFinalizing bool, + expectedBandwidthUsage int64, + sentPackets uint32, + repeatedNacks uint32, + ) + + HandleTWCCFeedback(report *rtcp.TransportLayerCC) + + ProbingStart(expectedBandwidthUsage int64) + ProbingEnd(isNotFailing bool, isGoalReached bool) + GetProbeStatus() (isValidSignal bool, trend ChannelTrend, lowestEstimate int64, highestEstimate int64) +} + +// ------------------------------------------------ + +type BWEListener interface { + OnCongestionStateChange(congestionState CongestionState, estimatedAvailableChannelCapacity int64) +} + +// ------------------------------------------------ 
diff --git a/pkg/sfu/bwe/null_bwe.go b/pkg/sfu/bwe/null_bwe.go new file mode 100644 index 000000000..b89081dd6 --- /dev/null +++ b/pkg/sfu/bwe/null_bwe.go @@ -0,0 +1,47 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bwe + +import "github.com/pion/rtcp" + +type NullBWE struct { +} + +func (n *NullBWE) SetBWEListener(_bweListener BWEListener) {} + +func (n *NullBWE) Reset() {} + +func (n *NullBWE) Stop() {} + +func (n *NullBWE) HandleREMB( + _receivedEstimate int64, + _isProbeFinalizing bool, + _expectedBandwidthUsage int64, + _sentPackets uint32, + _repeatedNacks uint32, +) { +} + +func (n *NullBWE) HandleTWCCFeedback(_report *rtcp.TransportLayerCC) {} + +func (n *NullBWE) ProbingStart(_expectedBandwidthUsage int64) {} + +func (n *NullBWE) ProbingEnd(_isNotFailing bool, _isGoalReached bool) {} + +func (n *NullBWE) GetProbeStatus() (bool, ChannelTrend, int64, int64) { + return false, ChannelTrendNeutral, 0, 0 +} + +// ------------------------------------------------ diff --git a/pkg/sfu/streamallocator/channelobserver.go b/pkg/sfu/bwe/remotebwe/channel_observer.go similarity index 56% rename from pkg/sfu/streamallocator/channelobserver.go rename to pkg/sfu/bwe/remotebwe/channel_observer.go index a9a0d2221..50f01a25f 100644 --- a/pkg/sfu/streamallocator/channelobserver.go +++ b/pkg/sfu/bwe/remotebwe/channel_observer.go @@ -12,56 +12,34 @@ // See the License for the specific language governing permissions and // 
limitations under the License. -package streamallocator +package remotebwe import ( "fmt" "time" + "github.com/livekit/livekit-server/pkg/sfu/bwe" "github.com/livekit/livekit-server/pkg/sfu/ccutils" "github.com/livekit/protocol/logger" ) // ------------------------------------------------ -type ChannelTrend int +type channelCongestionReason int const ( - ChannelTrendNeutral ChannelTrend = iota - ChannelTrendClearing - ChannelTrendCongesting + channelCongestionReasonNone channelCongestionReason = iota + channelCongestionReasonEstimate + channelCongestionReasonLoss ) -func (c ChannelTrend) String() string { +func (c channelCongestionReason) String() string { switch c { - case ChannelTrendNeutral: - return "NEUTRAL" - case ChannelTrendClearing: - return "CLEARING" - case ChannelTrendCongesting: - return "CONGESTING" - default: - return fmt.Sprintf("%d", int(c)) - } -} - -// ------------------------------------------------ - -type ChannelCongestionReason int - -const ( - ChannelCongestionReasonNone ChannelCongestionReason = iota - ChannelCongestionReasonEstimate - ChannelCongestionReasonLoss -) - -func (c ChannelCongestionReason) String() string { - switch c { - case ChannelCongestionReasonNone: + case channelCongestionReasonNone: return "NONE" - case ChannelCongestionReasonEstimate: + case channelCongestionReasonEstimate: return "ESTIMATE" - case ChannelCongestionReasonLoss: + case channelCongestionReasonLoss: return "LOSS" default: return fmt.Sprintf("%d", int(c)) @@ -85,9 +63,9 @@ var ( ValidityWindow: 10 * time.Second, } - DefaultChannelObserverConfigProbe = ChannelObserverConfig{ + defaultChannelObserverConfigProbe = ChannelObserverConfig{ Estimate: defaultTrendDetectorConfigProbe, - Nack: DefaultNackTrackerConfigProbe, + Nack: defaultNackTrackerConfigProbe, } defaultTrendDetectorConfigNonProbe = ccutils.TrendDetectorConfig{ @@ -99,29 +77,29 @@ var ( ValidityWindow: 10 * time.Second, } - DefaultChannelObserverConfigNonProbe = ChannelObserverConfig{ + 
defaultChannelObserverConfigNonProbe = ChannelObserverConfig{ Estimate: defaultTrendDetectorConfigNonProbe, - Nack: DefaultNackTrackerConfigNonProbe, + Nack: defaultNackTrackerConfigNonProbe, } ) // ------------------------------------------------ -type ChannelObserverParams struct { +type channelObserverParams struct { Name string Config ChannelObserverConfig } -type ChannelObserver struct { - params ChannelObserverParams +type channelObserver struct { + params channelObserverParams logger logger.Logger estimateTrend *ccutils.TrendDetector[int64] - nackTracker *NackTracker + nackTracker *nackTracker } -func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *ChannelObserver { - return &ChannelObserver{ +func newChannelObserver(params channelObserverParams, logger logger.Logger) *channelObserver { + return &channelObserver{ params: params, logger: logger, estimateTrend: ccutils.NewTrendDetector[int64](ccutils.TrendDetectorParams{ @@ -129,7 +107,7 @@ func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *Cha Logger: logger, Config: params.Config.Estimate, }), - nackTracker: NewNackTracker(NackTrackerParams{ + nackTracker: newNackTracker(nackTrackerParams{ Name: params.Name + "-nack", Logger: logger, Config: params.Config.Nack, @@ -137,61 +115,61 @@ func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *Cha } } -func (c *ChannelObserver) SeedEstimate(estimate int64) { +func (c *channelObserver) SeedEstimate(estimate int64) { c.estimateTrend.Seed(estimate) } -func (c *ChannelObserver) AddEstimate(estimate int64) { +func (c *channelObserver) AddEstimate(estimate int64) { c.estimateTrend.AddValue(estimate) } -func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32) { +func (c *channelObserver) AddNack(packets uint32, repeatedNacks uint32) { c.nackTracker.Add(packets, repeatedNacks) } -func (c *ChannelObserver) GetLowestEstimate() int64 { +func (c *channelObserver) GetLowestEstimate() int64 { 
return c.estimateTrend.GetLowest() } -func (c *ChannelObserver) GetHighestEstimate() int64 { +func (c *channelObserver) GetHighestEstimate() int64 { return c.estimateTrend.GetHighest() } -func (c *ChannelObserver) HasEnoughEstimateSamples() bool { +func (c *channelObserver) HasEnoughEstimateSamples() bool { return c.estimateTrend.HasEnoughSamples() } -func (c *ChannelObserver) GetNackRatio() float64 { +func (c *channelObserver) GetNackRatio() float64 { return c.nackTracker.GetRatio() } -/* STREAM-ALLOCATOR-DATA -func (c *ChannelObserver) GetNackHistory() []string { +/* REMOTE-BWE-DATA +func (c *channelObserver) GetNackHistory() []string { return c.nackTracker.GetHistory() } */ -func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) { +func (c *channelObserver) GetTrend() (bwe.ChannelTrend, channelCongestionReason) { estimateDirection := c.estimateTrend.GetDirection() switch { case estimateDirection == ccutils.TrendDirectionDownward: - c.logger.Debugw("stream allocator: channel observer: estimate is trending downward", "channel", c.ToString()) - return ChannelTrendCongesting, ChannelCongestionReasonEstimate + c.logger.Debugw("stream allocator: channel observer: estimate is trending downward", "channel", c) + return bwe.ChannelTrendCongesting, channelCongestionReasonEstimate case c.nackTracker.IsTriggered(): - c.logger.Debugw("stream allocator: channel observer: high rate of repeated NACKs", "channel", c.ToString()) - return ChannelTrendCongesting, ChannelCongestionReasonLoss + c.logger.Debugw("stream allocator: channel observer: high rate of repeated NACKs", "channel", c) + return bwe.ChannelTrendCongesting, channelCongestionReasonLoss case estimateDirection == ccutils.TrendDirectionUpward: - return ChannelTrendClearing, ChannelCongestionReasonNone + return bwe.ChannelTrendClearing, channelCongestionReasonNone } - return ChannelTrendNeutral, ChannelCongestionReasonNone + return bwe.ChannelTrendNeutral, channelCongestionReasonNone } -func (c 
*ChannelObserver) ToString() string { - return fmt.Sprintf("name: %s, estimate: {%s}, nack {%s}", c.params.Name, c.estimateTrend.ToString(), c.nackTracker.ToString()) +func (c *channelObserver) String() string { + return fmt.Sprintf("name: %s, estimate: {%v}, nack {%v}", c.params.Name, c.estimateTrend, c.nackTracker) } // ------------------------------------------------ diff --git a/pkg/sfu/streamallocator/nacktracker.go b/pkg/sfu/bwe/remotebwe/nack_tracker.go similarity index 78% rename from pkg/sfu/streamallocator/nacktracker.go rename to pkg/sfu/bwe/remotebwe/nack_tracker.go index 5b678a243..750788e0b 100644 --- a/pkg/sfu/streamallocator/nacktracker.go +++ b/pkg/sfu/bwe/remotebwe/nack_tracker.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package streamallocator +package remotebwe import ( "fmt" @@ -30,13 +30,13 @@ type NackTrackerConfig struct { } var ( - DefaultNackTrackerConfigProbe = NackTrackerConfig{ + defaultNackTrackerConfigProbe = NackTrackerConfig{ WindowMinDuration: 500 * time.Millisecond, WindowMaxDuration: 1 * time.Second, RatioThreshold: 0.04, } - DefaultNackTrackerConfigNonProbe = NackTrackerConfig{ + defaultNackTrackerConfigNonProbe = NackTrackerConfig{ WindowMinDuration: 2 * time.Second, WindowMaxDuration: 3 * time.Second, RatioThreshold: 0.08, @@ -45,35 +45,35 @@ var ( // ------------------------------------------------ -type NackTrackerParams struct { +type nackTrackerParams struct { Name string Logger logger.Logger Config NackTrackerConfig } -type NackTracker struct { - params NackTrackerParams +type nackTracker struct { + params nackTrackerParams windowStartTime time.Time packets uint32 repeatedNacks uint32 - /* STREAM-ALLOCATOR-DATA - // STREAM-ALLOCATOR-EXPERIMENTAL-TODO: remove when cleaning up experimental stuff + /* REMOTE-BWE-DATA + // REMOTE-BWE-EXPERIMENTAL-TODO: remove when cleaning up experimental stuff history []string */ } -func NewNackTracker(params 
NackTrackerParams) *NackTracker { - return &NackTracker{ +func newNackTracker(params nackTrackerParams) *nackTracker { + return &nackTracker{ params: params, - // STREAM-ALLOCATOR-DATA history: make([]string, 0, 10), + // REMOTE-BWE-DATA history: make([]string, 0, 10), } } -func (n *NackTracker) Add(packets uint32, repeatedNacks uint32) { +func (n *nackTracker) Add(packets uint32, repeatedNacks uint32) { if n.params.Config.WindowMaxDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.Config.WindowMaxDuration { - // STREAM-ALLOCATOR-DATA n.updateHistory() + // REMOTE-BWE-DATA n.updateHistory() n.windowStartTime = time.Time{} n.packets = 0 @@ -96,7 +96,7 @@ func (n *NackTracker) Add(packets uint32, repeatedNacks uint32) { } } -func (n *NackTracker) GetRatio() float64 { +func (n *nackTracker) GetRatio() float64 { ratio := 0.0 if n.packets != 0 { ratio = float64(n.repeatedNacks) / float64(n.packets) @@ -108,7 +108,7 @@ func (n *NackTracker) GetRatio() float64 { return ratio } -func (n *NackTracker) IsTriggered() bool { +func (n *nackTracker) IsTriggered() bool { if n.params.Config.WindowMinDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.Config.WindowMinDuration { return n.GetRatio() > n.params.Config.RatioThreshold } @@ -116,7 +116,7 @@ func (n *NackTracker) IsTriggered() bool { return false } -func (n *NackTracker) ToString() string { +func (n *nackTracker) ToString() string { window := "" if !n.windowStartTime.IsZero() { now := time.Now() @@ -126,12 +126,12 @@ func (n *NackTracker) ToString() string { return fmt.Sprintf("n: %s, %s, p: %d, rn: %d, rn/p: %.2f", n.params.Name, window, n.packets, n.repeatedNacks, n.GetRatio()) } -/* STREAM-ALLOCATOR-DATA -func (n *NackTracker) GetHistory() []string { +/* REMOTE-BWE-DATA +func (n *nackTracker) GetHistory() []string { return n.history } -func (n *NackTracker) updateHistory() { +func (n *nackTracker) updateHistory() { if len(n.history) >= 10 { 
n.history = n.history[1:] } diff --git a/pkg/sfu/bwe/remotebwe/remote_bwe.go b/pkg/sfu/bwe/remotebwe/remote_bwe.go new file mode 100644 index 000000000..0edfdaffe --- /dev/null +++ b/pkg/sfu/bwe/remotebwe/remote_bwe.go @@ -0,0 +1,369 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remotebwe + +import ( + "sync" + "time" + + "github.com/frostbyte73/core" + "github.com/livekit/livekit-server/pkg/sfu/bwe" + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils/mono" +) + +const ( + ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps +) + +// --------------------------------------------------------------------------- + +type RemoteBWEConfig struct { + NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"` + ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"` + ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"` + ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"` + CongestedMinDuration time.Duration `yaml:"congested_min_duration,omitempty"` + + PeriodicCheckInterval time.Duration `yaml:"periodic_check_interval,omitempty"` + PeriodicCheckIntervalCongested time.Duration `yaml:"periodic_check_interval_congested,omitempty"` +} + +var ( + DefaultRemoteBWEConfig = RemoteBWEConfig{ + NackRatioAttenuator: 0.4, + ExpectedUsageThreshold: 0.95, + ChannelObserverProbe: 
defaultChannelObserverConfigProbe, + ChannelObserverNonProbe: defaultChannelObserverConfigNonProbe, + CongestedMinDuration: 3 * time.Second, + PeriodicCheckInterval: 2 * time.Second, + PeriodicCheckIntervalCongested: 200 * time.Millisecond, + } +) + +// --------------------------------------------------------------------------- + +type RemoteBWEParams struct { + Config RemoteBWEConfig + Logger logger.Logger +} + +type RemoteBWE struct { + bwe.NullBWE + + params RemoteBWEParams + + lock sync.RWMutex + + lastReceivedEstimate int64 + lastExpectedBandwidthUsage int64 + isInProbe bool + committedChannelCapacity int64 + + channelObserver *channelObserver + + congestionState bwe.CongestionState + congestionStateSwitchedAt time.Time + + wake chan struct{} + stop core.Fuse + + bweListener bwe.BWEListener +} + +func NewRemoteBWE(params RemoteBWEParams) *RemoteBWE { + r := &RemoteBWE{ + params: params, + } + r.channelObserver = r.newChannelObserverNonProbe() + return r +} + +func (r *RemoteBWE) SetBWEListener(bweListener bwe.BWEListener) { + r.lock.Lock() + defer r.lock.Unlock() + + r.bweListener = bweListener +} + +func (r *RemoteBWE) getBWEListener() bwe.BWEListener { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.bweListener +} + +func (r *RemoteBWE) Reset() { + r.lock.Lock() + defer r.lock.Unlock() + + r.channelObserver = r.newChannelObserverNonProbe() +} + +func (r *RemoteBWE) Stop() { + r.stop.Break() +} + +func (r *RemoteBWE) HandleREMB( + receivedEstimate int64, + isProbeFinalizing bool, + expectedBandwidthUsage int64, + sentPackets uint32, + repeatedNacks uint32, +) { + r.lock.Lock() + r.lastReceivedEstimate = receivedEstimate + r.lastExpectedBandwidthUsage = expectedBandwidthUsage + + if !isProbeFinalizing { + r.channelObserver.AddEstimate(r.lastReceivedEstimate) + r.channelObserver.AddNack(sentPackets, repeatedNacks) + } + + var ( + shouldNotify bool + state bwe.CongestionState + committedChannelCapacity int64 + ) + if !r.isInProbe { + shouldNotify, state, 
committedChannelCapacity = r.congestionDetectionStateMachine() + } + r.lock.Unlock() + + if shouldNotify { + if bweListener := r.getBWEListener(); bweListener != nil { + bweListener.OnCongestionStateChange(state, committedChannelCapacity) + } + } +} + +func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState, int64) { + newState := r.congestionState + update := false + trend, reason := r.channelObserver.GetTrend() + switch r.congestionState { + case bwe.CongestionStateNone: + if trend == bwe.ChannelTrendCongesting { + if r.estimateAvailableChannelCapacity(reason) { + newState = bwe.CongestionStateCongested + } + } + + case bwe.CongestionStateCongested: + if trend == bwe.ChannelTrendCongesting { + if r.estimateAvailableChannelCapacity(reason) { + // update state as this needs to reset switch time to wait for congestion min duration again + update = true + } + } else if time.Since(r.congestionStateSwitchedAt) >= r.params.Config.CongestedMinDuration { + newState = bwe.CongestionStateNone + } + } + + shouldNotify := false + if newState != r.congestionState || update { + r.updateCongestionState(newState, reason) + shouldNotify = true + } + + return shouldNotify, r.congestionState, r.committedChannelCapacity +} + +func (r *RemoteBWE) estimateAvailableChannelCapacity(reason channelCongestionReason) bool { + var estimateToCommit int64 + switch reason { + case channelCongestionReasonLoss: + estimateToCommit = int64(float64(r.lastExpectedBandwidthUsage) * (1.0 - r.params.Config.NackRatioAttenuator*r.channelObserver.GetNackRatio())) + default: + estimateToCommit = r.lastReceivedEstimate + } + if estimateToCommit > r.lastReceivedEstimate { + estimateToCommit = r.lastReceivedEstimate + } + + commitThreshold := int64(r.params.Config.ExpectedUsageThreshold * float64(r.lastExpectedBandwidthUsage)) + + ulgr := r.params.Logger.WithUnlikelyValues( + "reason", reason, + "old(bps)", r.committedChannelCapacity, + "new(bps)", estimateToCommit, + 
"lastReceived(bps)", r.lastReceivedEstimate, + "expectedUsage(bps)", r.lastExpectedBandwidthUsage, + "commitThreshold(bps)", commitThreshold, + "channel", r.channelObserver, + ) + if estimateToCommit > commitThreshold { + ulgr.Debugw("remote bwe: channel congestion detected, skipping above commit threshold channel capacity update") + return false + } + + ulgr.Infow("remote bwe: channel congestion detected, applying channel capacity update") + /* REMOTE-BWE-DATA + r.params.Logger.Debugw( + fmt.Sprintf("remote bwe: channel congestion detected, %s channel capacity: experimental", action), + "nackHistory", r.channelObserver.GetNackHistory(), + ) + */ + + r.committedChannelCapacity = estimateToCommit + + // reset to get new set of samples for next trend + r.channelObserver = r.newChannelObserverNonProbe() + return true +} + +func (r *RemoteBWE) updateCongestionState(state bwe.CongestionState, reason channelCongestionReason) { + r.params.Logger.Infow( + "remote bwe: congestion state change", + "from", r.congestionState, + "to", state, + "reason", reason, + "committedChannelCapacity", r.committedChannelCapacity, + ) + + if state != r.congestionState { + // notify worker for ticker interval management based on state + select { + case r.wake <- struct{}{}: + default: + } + } + + r.congestionState = state + r.congestionStateSwitchedAt = mono.Now() +} + +func (r *RemoteBWE) newChannelObserverNonProbe() *channelObserver { + return newChannelObserver( + channelObserverParams{ + Name: "non-probe", + Config: r.params.Config.ChannelObserverNonProbe, + }, + r.params.Logger, + ) +} + +func (r *RemoteBWE) ProbingStart(expectedBandwidthUsage int64) { + r.lock.Lock() + defer r.lock.Unlock() + + r.isInProbe = true + r.lastExpectedBandwidthUsage = expectedBandwidthUsage + + r.params.Logger.Debugw( + "stream allocator: starting probe", + "lastReceived", r.lastReceivedEstimate, + "expectedBandwidthUsage", expectedBandwidthUsage, + "channel", r.channelObserver, + ) + + r.channelObserver = 
newChannelObserver( + channelObserverParams{ + Name: "probe", + Config: r.params.Config.ChannelObserverProbe, + }, + r.params.Logger, + ) + r.channelObserver.SeedEstimate(r.lastReceivedEstimate) +} + +func (r *RemoteBWE) ProbingEnd(isNotFailing bool, isGoalReached bool) { + r.lock.Lock() + defer r.lock.Unlock() + + highestEstimateInProbe := r.channelObserver.GetHighestEstimate() + + // + // Reset estimator at the end of a probe irrespective of probe result to get fresh readings. + // With a failed probe, the latest estimate could be lower than committed estimate. + // As bandwidth estimator (remote in REMB case, local in TWCC case) holds state, + // subsequent estimates could start from the lower point. That should not trigger a + // downward trend and get latched to committed estimate as that would trigger a re-allocation. + // With fresh readings, as long as the trend is not going downward, it will not get latched. + // + // BWE-TODO: clean up this comment after implementing probing in TWCC case + // NOTE: With TWCC, it is possible to reset bandwidth estimation to clean state as + // the send side is in full control of bandwidth estimation. 
+ // + r.params.Logger.Debugw( + "probe done", + "isNotFailing", isNotFailing, + "isGoalReached", isGoalReached, + "committedEstimate", r.committedChannelCapacity, + "highestEstimate", highestEstimateInProbe, + "channel", r.channelObserver, + ) + r.channelObserver = r.newChannelObserverNonProbe() + if !isNotFailing { + return + } + + if highestEstimateInProbe > r.committedChannelCapacity { + r.committedChannelCapacity = highestEstimateInProbe + } +} + +func (r *RemoteBWE) GetProbeStatus() (bool, bwe.ChannelTrend, int64, int64) { + r.lock.RLock() + defer r.lock.RUnlock() + + if !r.isInProbe { + return false, bwe.ChannelTrendNeutral, 0, 0 + } + + trend, _ := r.channelObserver.GetTrend() + return r.channelObserver.HasEnoughEstimateSamples(), + trend, + r.channelObserver.GetLowestEstimate(), + r.channelObserver.GetHighestEstimate() +} + +func (r *RemoteBWE) worker() { + ticker := time.NewTicker(r.params.Config.PeriodicCheckInterval) + defer ticker.Stop() + + for { + select { + case <-r.wake: + r.lock.RLock() + state := r.congestionState + r.lock.RUnlock() + if state == bwe.CongestionStateCongested { + ticker.Reset(r.params.Config.PeriodicCheckIntervalCongested) + } else { + ticker.Reset(r.params.Config.PeriodicCheckInterval) + } + + case <-ticker.C: + var ( + shouldNotify bool + state bwe.CongestionState + committedChannelCapacity int64 + ) + r.lock.Lock() + shouldNotify, state, committedChannelCapacity = r.congestionDetectionStateMachine() + r.lock.Unlock() + + if shouldNotify { + if bweListener := r.getBWEListener(); bweListener != nil { + bweListener.OnCongestionStateChange(state, committedChannelCapacity) + } + } + + case <-r.stop.Watch(): + return + } + } +} diff --git a/pkg/sfu/sendsidebwe/congestion_detector.go b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go similarity index 91% rename from pkg/sfu/sendsidebwe/congestion_detector.go rename to pkg/sfu/bwe/sendsidebwe/congestion_detector.go index 3f8aba7d3..c525c9588 100644 --- 
a/pkg/sfu/sendsidebwe/congestion_detector.go +++ b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go @@ -20,6 +20,7 @@ import ( "github.com/frostbyte73/core" "github.com/gammazero/deque" + "github.com/livekit/livekit-server/pkg/sfu/bwe" "github.com/livekit/livekit-server/pkg/sfu/ccutils" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils/mono" @@ -315,12 +316,12 @@ type congestionDetector struct { stop core.Fuse estimatedAvailableChannelCapacity int64 - congestionState CongestionState + congestionState bwe.CongestionState congestionStateSwitchedAt time.Time congestedCTRTrend *ccutils.TrendDetector[float64] congestedTrafficStats *trafficStats - onCongestionStateChange func(congestionState CongestionState, estimatedAvailableChannelCapacity int64) + bweListener bwe.BWEListener } func newCongestionDetector(params congestionDetectorParams) *congestionDetector { @@ -338,69 +339,33 @@ func newCongestionDetector(params congestionDetectorParams) *congestionDetector return c } +func (c *congestionDetector) Reset() { + // SSBWE-TODO + // 1. may be clear all packet groups? + // 2. reset congestion state to none + // 3. reset estimate to 100 Mbps + // 4. reset packet_tracker?? maybe only the probe state?? 
+} + func (c *congestionDetector) Stop() { c.stop.Break() } -func (c *congestionDetector) OnCongestionStateChange(f func(congestionState CongestionState, estimatedAvailableChannelCapacity int64)) { +func (c *congestionDetector) SetBWEListener(bweListener bwe.BWEListener) { c.lock.Lock() defer c.lock.Unlock() - c.onCongestionStateChange = f + c.bweListener = bweListener } -func (c *congestionDetector) GetCongestionState() CongestionState { +func (c *congestionDetector) getBWEListener() bwe.BWEListener { c.lock.RLock() defer c.lock.RUnlock() - return c.congestionState + return c.bweListener } -func (c *congestionDetector) updateCongestionState(state CongestionState, reason string, oldestContributingGroup int) { - c.lock.Lock() - c.params.Logger.Infow( - "congestion state change", - "from", c.congestionState, - "to", state, - "reason", reason, - "numPacketGroups", len(c.packetGroups), - "numContributingGroups", len(c.packetGroups[oldestContributingGroup:]), - "contributingGroups", logger.ObjectSlice(c.packetGroups[oldestContributingGroup:]), - "estimatedAvailableChannelCapacity", c.estimatedAvailableChannelCapacity, - ) - - prevState := c.congestionState - c.congestionState = state - - onCongestionStateChange := c.onCongestionStateChange - estimatedAvailableChannelCapacity := c.estimatedAvailableChannelCapacity - c.lock.Unlock() - - if onCongestionStateChange != nil { - onCongestionStateChange(state, estimatedAvailableChannelCapacity) - } - - // when in congested state, monitor changes in captured traffic ratio (CTR) - // to ensure allocations are in line with latest estimates, it is possible that - // the estimate is incorrect when congestion starts and the allocation may be - // sub-optimal and not enough to reduce/relieve congestion, by monitoing CTR - // on a continuous basis allocations can be adjusted in the direction of - // reducing/relieving congestion - if state == CongestionStateCongested && prevState != CongestionStateCongested { - c.resetCTRTrend() - } 
else if state != CongestionStateCongested { - c.clearCTRTrend() - } -} - -func (c *congestionDetector) GetEstimatedAvailableChannelCapacity() int64 { - c.lock.RLock() - defer c.lock.RUnlock() - - return c.estimatedAvailableChannelCapacity -} - -func (c *congestionDetector) HandleRTCP(report *rtcp.TransportLayerCC) { +func (c *congestionDetector) HandleTWCCFeedback(report *rtcp.TransportLayerCC) { c.lock.Lock() c.feedbackReports.PushBack(feedbackReport{mono.Now(), report}) c.lock.Unlock() @@ -483,55 +448,55 @@ func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool, } func (c *congestionDetector) congestionDetectionStateMachine() { - state := c.GetCongestionState() - newState := state + state := c.congestionState + newState := c.congestionState reason := "" earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, oldestContributingGroup := c.isCongestionSignalTriggered() switch state { - case CongestionStateNone: + case bwe.CongestionStateNone: if congestedTriggered { c.params.Logger.Warnw("invalid congested state transition", nil, "from", state, "reason", congestedReason) } if earlyWarningTriggered { - newState = CongestionStateEarlyWarning + newState = bwe.CongestionStateEarlyWarning reason = earlyWarningReason } - case CongestionStateEarlyWarning: + case bwe.CongestionStateEarlyWarning: if congestedTriggered { - newState = CongestionStateCongested + newState = bwe.CongestionStateCongested reason = congestedReason } else if !earlyWarningTriggered { - newState = CongestionStateEarlyWarningHangover + newState = bwe.CongestionStateEarlyWarningHangover } - case CongestionStateEarlyWarningHangover: + case bwe.CongestionStateEarlyWarningHangover: if congestedTriggered { c.params.Logger.Warnw("invalid congested state transition", nil, "from", state, "reason", congestedReason) } if earlyWarningTriggered { - newState = CongestionStateEarlyWarning + newState = bwe.CongestionStateEarlyWarning reason = earlyWarningReason } else if 
time.Since(c.congestionStateSwitchedAt) >= c.params.Config.EarlyWarningHangover { - newState = CongestionStateNone + newState = bwe.CongestionStateNone } - case CongestionStateCongested: + case bwe.CongestionStateCongested: if !congestedTriggered { - newState = CongestionStateCongestedHangover + newState = bwe.CongestionStateCongestedHangover } - case CongestionStateCongestedHangover: + case bwe.CongestionStateCongestedHangover: if congestedTriggered { c.params.Logger.Warnw("invalid congested state transition", nil, "from", state, "reason", congestedReason) } if earlyWarningTriggered { - newState = CongestionStateEarlyWarning + newState = bwe.CongestionStateEarlyWarning reason = earlyWarningReason } else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.CongestedHangover { - newState = CongestionStateNone + newState = bwe.CongestionStateNone } } @@ -539,7 +504,6 @@ func (c *congestionDetector) congestionDetectionStateMachine() { // update after running the above estimate as state change callback includes the estimated available channel capacity if newState != state { - c.congestionStateSwitchedAt = mono.Now() c.updateCongestionState(newState, reason, oldestContributingGroup) } } @@ -578,16 +542,10 @@ func (c *congestionDetector) updateCTRTrend(pg *packetGroup) { return } - c.params.Logger.Infow("captured traffic ratio is trending downward", "channel", c.congestedCTRTrend.ToString()) + c.params.Logger.Infow("captured traffic ratio is trending downward", "channel", c.congestedCTRTrend) - c.lock.RLock() - state := c.congestionState - estimatedAvailableChannelCapacity := c.estimatedAvailableChannelCapacity - onCongestionStateChange := c.onCongestionStateChange - c.lock.RUnlock() - - if onCongestionStateChange != nil { - onCongestionStateChange(state, estimatedAvailableChannelCapacity) + if bweListener := c.getBWEListener(); bweListener != nil { + bweListener.OnCongestionStateChange(c.congestionState, c.estimatedAvailableChannelCapacity) } // reset to get 
new set of samples for next trend @@ -628,9 +586,43 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() { return } - c.lock.Lock() c.estimatedAvailableChannelCapacity = agg.AcknowledgedBitrate() - c.lock.Unlock() +} + +func (c *congestionDetector) updateCongestionState(state bwe.CongestionState, reason string, oldestContributingGroup int) { + c.params.Logger.Infow( + "congestion state change", + "from", c.congestionState, + "to", state, + "reason", reason, + "numPacketGroups", len(c.packetGroups), + "numContributingGroups", len(c.packetGroups[oldestContributingGroup:]), + "contributingGroups", logger.ObjectSlice(c.packetGroups[oldestContributingGroup:]), + "estimatedAvailableChannelCapacity", c.estimatedAvailableChannelCapacity, + ) + + if state != c.congestionState { + c.congestionStateSwitchedAt = mono.Now() + } + + prevState := c.congestionState + c.congestionState = state + + if bweListener := c.getBWEListener(); bweListener != nil { + bweListener.OnCongestionStateChange(state, c.estimatedAvailableChannelCapacity) + } + + // when in congested state, monitor changes in captured traffic ratio (CTR) + // to ensure allocations are in line with latest estimates, it is possible that + // the estimate is incorrect when congestion starts and the allocation may be + // sub-optimal and not enough to reduce/relieve congestion, by monitoring CTR + // on a continuous basis allocations can be adjusted in the direction of + // reducing/relieving congestion + if state == bwe.CongestionStateCongested && prevState != bwe.CongestionStateCongested { + c.resetCTRTrend() + } else if state != bwe.CongestionStateCongested { + c.clearCTRTrend() + } } func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) { @@ -787,7 +779,7 @@ func (c *congestionDetector) worker() { c.processFeedbackReport(fbReport) } - if c.GetCongestionState() == CongestionStateCongested { + if c.congestionState == bwe.CongestionStateCongested { 
ticker.Reset(c.params.Config.PeriodicCheckIntervalCongested) } else { ticker.Reset(c.params.Config.PeriodicCheckInterval) diff --git a/pkg/sfu/sendsidebwe/packet_group.go b/pkg/sfu/bwe/sendsidebwe/packet_group.go similarity index 100% rename from pkg/sfu/sendsidebwe/packet_group.go rename to pkg/sfu/bwe/sendsidebwe/packet_group.go diff --git a/pkg/sfu/sendsidebwe/packet_info.go b/pkg/sfu/bwe/sendsidebwe/packet_info.go similarity index 100% rename from pkg/sfu/sendsidebwe/packet_info.go rename to pkg/sfu/bwe/sendsidebwe/packet_info.go diff --git a/pkg/sfu/sendsidebwe/packet_tracker.go b/pkg/sfu/bwe/sendsidebwe/packet_tracker.go similarity index 100% rename from pkg/sfu/sendsidebwe/packet_tracker.go rename to pkg/sfu/bwe/sendsidebwe/packet_tracker.go diff --git a/pkg/sfu/sendsidebwe/send_side_bwe.go b/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go similarity index 79% rename from pkg/sfu/sendsidebwe/send_side_bwe.go rename to pkg/sfu/bwe/sendsidebwe/send_side_bwe.go index 5a0207863..653b2e3e5 100644 --- a/pkg/sfu/sendsidebwe/send_side_bwe.go +++ b/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go @@ -15,9 +15,9 @@ package sendsidebwe import ( - "fmt" - + "github.com/livekit/livekit-server/pkg/sfu/bwe" "github.com/livekit/protocol/logger" + "github.com/pion/rtcp" ) // @@ -50,35 +50,6 @@ import ( // --------------------------------------------------------------------------- -type CongestionState int - -const ( - CongestionStateNone CongestionState = iota - CongestionStateEarlyWarning - CongestionStateEarlyWarningHangover - CongestionStateCongested - CongestionStateCongestedHangover -) - -func (c CongestionState) String() string { - switch c { - case CongestionStateNone: - return "NONE" - case CongestionStateEarlyWarning: - return "EARLY_WARNING" - case CongestionStateEarlyWarningHangover: - return "EARLY_WARNING_HANGOVER" - case CongestionStateCongested: - return "CONGESTED" - case CongestionStateCongestedHangover: - return "CONGESTED_HANGOVER" - default: - return fmt.Sprintf("%d", 
int(c)) - } -} - -// --------------------------------------------------------------------------- - type SendSideBWEConfig struct { CongestionDetector CongestionDetectorConfig `yaml:"congestion_detector,omitempty"` } @@ -97,6 +68,8 @@ type SendSideBWEParams struct { } type SendSideBWE struct { + bwe.NullBWE + params SendSideBWEParams *congestionDetector @@ -112,8 +85,20 @@ func NewSendSideBWE(params SendSideBWEParams) *SendSideBWE { } } +func (s *SendSideBWE) SetBWEListener(bweListener bwe.BWEListener) { + s.congestionDetector.SetBWEListener(bweListener) +} + +func (s *SendSideBWE) Reset() { + s.congestionDetector.Reset() +} + func (s *SendSideBWE) Stop() { s.congestionDetector.Stop() } +func (s *SendSideBWE) HandleTWCCFeedback(report *rtcp.TransportLayerCC) { + s.congestionDetector.HandleTWCCFeedback(report) +} + // ------------------------------------------------ diff --git a/pkg/sfu/sendsidebwe/traffic_stats.go b/pkg/sfu/bwe/sendsidebwe/traffic_stats.go similarity index 100% rename from pkg/sfu/sendsidebwe/traffic_stats.go rename to pkg/sfu/bwe/sendsidebwe/traffic_stats.go diff --git a/pkg/sfu/sendsidebwe/twcc_feedback.go b/pkg/sfu/bwe/sendsidebwe/twcc_feedback.go similarity index 100% rename from pkg/sfu/sendsidebwe/twcc_feedback.go rename to pkg/sfu/bwe/sendsidebwe/twcc_feedback.go diff --git a/pkg/sfu/ccutils/trenddetector.go b/pkg/sfu/ccutils/trenddetector.go index 4806387bc..3f76fcb97 100644 --- a/pkg/sfu/ccutils/trenddetector.go +++ b/pkg/sfu/ccutils/trenddetector.go @@ -154,7 +154,7 @@ func (t *TrendDetector[T]) HasEnoughSamples() bool { return t.numSamples >= t.params.Config.RequiredSamples } -func (t *TrendDetector[T]) ToString() string { +func (t *TrendDetector[T]) String() string { samplesStr := "" if len(t.samples) > 0 { firstTime := t.samples[0].at diff --git a/pkg/sfu/pacer/base.go b/pkg/sfu/pacer/base.go index 5ced2b987..9d6344cd0 100644 --- a/pkg/sfu/pacer/base.go +++ b/pkg/sfu/pacer/base.go @@ -19,7 +19,7 @@ import ( "io" "time" - 
"github.com/livekit/livekit-server/pkg/sfu/sendsidebwe" + "github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils/mono" "github.com/pion/rtp" diff --git a/pkg/sfu/pacer/leaky_bucket.go b/pkg/sfu/pacer/leaky_bucket.go index add76fd3a..325779f94 100644 --- a/pkg/sfu/pacer/leaky_bucket.go +++ b/pkg/sfu/pacer/leaky_bucket.go @@ -20,7 +20,7 @@ import ( "github.com/frostbyte73/core" "github.com/gammazero/deque" - "github.com/livekit/livekit-server/pkg/sfu/sendsidebwe" + "github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe" "github.com/livekit/protocol/logger" ) diff --git a/pkg/sfu/pacer/no_queue.go b/pkg/sfu/pacer/no_queue.go index 86eadebb7..5e0037fa9 100644 --- a/pkg/sfu/pacer/no_queue.go +++ b/pkg/sfu/pacer/no_queue.go @@ -19,7 +19,7 @@ import ( "github.com/frostbyte73/core" "github.com/gammazero/deque" - "github.com/livekit/livekit-server/pkg/sfu/sendsidebwe" + "github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe" "github.com/livekit/protocol/logger" ) diff --git a/pkg/sfu/pacer/pass_through.go b/pkg/sfu/pacer/pass_through.go index 62a4867c6..1bc33026d 100644 --- a/pkg/sfu/pacer/pass_through.go +++ b/pkg/sfu/pacer/pass_through.go @@ -15,7 +15,7 @@ package pacer import ( - "github.com/livekit/livekit-server/pkg/sfu/sendsidebwe" + "github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe" "github.com/livekit/protocol/logger" ) diff --git a/pkg/sfu/streamallocator/probe_controller.go b/pkg/sfu/streamallocator/probe_controller.go index 8c8b46178..411e46d7b 100644 --- a/pkg/sfu/streamallocator/probe_controller.go +++ b/pkg/sfu/streamallocator/probe_controller.go @@ -18,8 +18,8 @@ import ( "sync" "time" + "github.com/livekit/livekit-server/pkg/sfu/bwe" "github.com/livekit/livekit-server/pkg/sfu/ccutils" - "github.com/livekit/livekit-server/pkg/sfu/sendsidebwe" "github.com/livekit/protocol/logger" ) @@ -75,7 +75,7 @@ type ProbeController struct { params ProbeControllerParams lock 
sync.RWMutex - sendSideBWE *sendsidebwe.SendSideBWE + bwe bwe.BWE probeInterval time.Duration lastProbeStartTime time.Time probeGoalBps int64 @@ -98,11 +98,11 @@ func NewProbeController(params ProbeControllerParams) *ProbeController { return p } -func (p *ProbeController) SetSendSideBWE(sendSideBWE *sendsidebwe.SendSideBWE) { +func (p *ProbeController) SetBWE(bwe bwe.BWE) { p.lock.Lock() defer p.lock.Unlock() - p.sendSideBWE = sendSideBWE + p.bwe = bwe } func (p *ProbeController) Reset() { @@ -131,49 +131,9 @@ func (p *ProbeController) ProbeClusterDone(info ccutils.ProbeClusterInfo) { p.lock.Unlock() } -func (p *ProbeController) CheckProbe(trend ChannelTrend, highestEstimate int64) { - p.lock.Lock() - defer p.lock.Unlock() - - if p.probeClusterId == ccutils.ProbeClusterIdInvalid { - return - } - - if trend != ChannelTrendNeutral { - p.probeTrendObserved = true - } - - switch { - case !p.probeTrendObserved && time.Since(p.lastProbeStartTime) > p.params.Config.TrendWait: - // - // More of a safety net. - // In rare cases, the estimate gets stuck. 
Prevent from probe running amok - // STREAM-ALLOCATOR-TODO: Need more testing here to ensure that probe does not cause a lot of damage - // - p.params.Logger.Debugw("stream allocator: probe: aborting, no trend", "cluster", p.probeClusterId) - p.abortProbeLocked() - - case trend == ChannelTrendCongesting: - // stop immediately if the probe is congesting channel more - p.params.Logger.Debugw("stream allocator: probe: aborting, channel is congesting", "cluster", p.probeClusterId) - p.abortProbeLocked() - - case highestEstimate > p.probeGoalBps: - // reached goal, stop probing - p.params.Logger.Infow( - "stream allocator: probe: stopping, goal reached", - "cluster", p.probeClusterId, - "goal", p.probeGoalBps, - "highest", highestEstimate, - ) - p.goalReachedProbeClusterId = p.probeClusterId - p.StopProbe() - } -} - func (p *ProbeController) MaybeFinalizeProbe( isComplete bool, - trend ChannelTrend, + trend bwe.ChannelTrend, lowestEstimate int64, ) (isHandled bool, isNotFailing bool, isGoalReached bool) { p.lock.Lock() @@ -185,7 +145,7 @@ func (p *ProbeController) MaybeFinalizeProbe( if p.goalReachedProbeClusterId != ccutils.ProbeClusterIdInvalid { // finalise goal reached probe cluster - p.finalizeProbeLocked(ChannelTrendNeutral) + p.finalizeProbeLocked(bwe.ChannelTrendNeutral) return true, true, true } @@ -193,8 +153,8 @@ func (p *ProbeController) MaybeFinalizeProbe( p.probeEndTime.IsZero() && p.doneProbeClusterInfo.Id != ccutils.ProbeClusterIdInvalid && p.doneProbeClusterInfo.Id == p.probeClusterId { // ensure any queueing due to probing is flushed - // STREAM-ALLOCATOR-TODO: CongestionControlProbeConfig.SettleWait should actually be a certain number of RTTs. - expectedDuration := float64(9.0) + // STREAM-ALLOCATOR-TODO: ProbeControllerConfig.SettleWait should actually be a certain number of RTTs. 
+ expectedDuration := float64(0.0) if lowestEstimate != 0 { expectedDuration = float64(p.doneProbeClusterInfo.BytesSent*8*1000) / float64(lowestEstimate) } @@ -232,12 +192,12 @@ func (p *ProbeController) DoesProbeNeedFinalize() bool { return p.abortedProbeClusterId != ccutils.ProbeClusterIdInvalid || p.goalReachedProbeClusterId != ccutils.ProbeClusterIdInvalid } -func (p *ProbeController) finalizeProbeLocked(trend ChannelTrend) (isNotFailing bool) { +func (p *ProbeController) finalizeProbeLocked(trend bwe.ChannelTrend) (isNotFailing bool) { aborted := p.probeClusterId == p.abortedProbeClusterId p.clearProbeLocked() - if aborted || trend == ChannelTrendCongesting { + if aborted || trend == bwe.ChannelTrendCongesting { // failed probe, backoff p.backoffProbeIntervalLocked() p.resetProbeDurationLocked() @@ -246,7 +206,7 @@ func (p *ProbeController) finalizeProbeLocked(trend ChannelTrend) (isNotFailing // reset probe interval and increase probe duration on a upward trending probe p.resetProbeIntervalLocked() - if trend == ChannelTrendClearing { + if trend == bwe.ChannelTrendClearing { p.increaseProbeDurationLocked() } return true @@ -281,18 +241,18 @@ func (p *ProbeController) InitProbe(probeGoalDeltaBps int64, expectedBandwidthUs time.Duration(float64(p.probeDuration.Milliseconds())*p.params.Config.DurationOverflowFactor)*time.Millisecond, ) - p.pollProbe(p.probeClusterId) + p.pollProbe(p.probeClusterId, expectedBandwidthUsage) return p.probeClusterId, p.probeGoalBps } -// SSBWE-TODO: try to do same path for both SSBWE and regular, the congesting part might be different though -func (p *ProbeController) pollProbe(probeClusterId ccutils.ProbeClusterId) { - if p.sendSideBWE == nil { +func (p *ProbeController) pollProbe(probeClusterId ccutils.ProbeClusterId, expectedBandwidthUsage int64) { + if p.bwe == nil { return } - startingEstimate := p.sendSideBWE.GetEstimatedAvailableChannelCapacity() + p.bwe.ProbingStart(expectedBandwidthUsage) + go func() { for { p.lock.Lock() 
@@ -302,10 +262,37 @@ func (p *ProbeController) pollProbe(probeClusterId ccutils.ProbeClusterId) { } done := false - congestionState := p.sendSideBWE.GetCongestionState() - currentEstimate := p.sendSideBWE.GetEstimatedAvailableChannelCapacity() + + _, trend, _, highestEstimate := p.bwe.GetProbeStatus() + if !p.probeTrendObserved && trend != bwe.ChannelTrendNeutral { + p.probeTrendObserved = true + } + switch { - case currentEstimate <= startingEstimate && time.Since(p.lastProbeStartTime) > p.params.Config.TrendWait: + case trend == bwe.ChannelTrendCongesting: + // stop immediately if the probe is congesting channel more + p.params.Logger.Infow( + "stream allocator: probe: aborting, channel is congesting", + "cluster", probeClusterId, + ) + p.abortProbeLocked() + done = true + break + + case highestEstimate > p.probeGoalBps: + // reached goal, stop probing + p.params.Logger.Infow( + "stream allocator: probe: stopping, goal reached", + "cluster", probeClusterId, + "goal", p.probeGoalBps, + "highestEstimate", highestEstimate, + ) + p.goalReachedProbeClusterId = p.probeClusterId + p.StopProbe() + done = true + break + + case !p.probeTrendObserved && time.Since(p.lastProbeStartTime) > p.params.Config.TrendWait: // // More of a safety net. // In rare cases, the estimate gets stuck. 
Prevent from probe running amok @@ -315,30 +302,6 @@ func (p *ProbeController) pollProbe(probeClusterId ccutils.ProbeClusterId) { p.abortProbeLocked() done = true break - - case congestionState == sendsidebwe.CongestionStateCongested || congestionState == sendsidebwe.CongestionStateEarlyWarning: - // stop immediately if the probe is congesting channel more - p.params.Logger.Infow( - "stream allocator: probe: aborting, channel is congesting", - "cluster", probeClusterId, - "congestionState", congestionState, - ) - p.abortProbeLocked() - done = true - break - - case currentEstimate > p.probeGoalBps: - // reached goal, stop probing - p.params.Logger.Infow( - "stream allocator: probe: stopping, goal reached", - "cluster", probeClusterId, - "goal", p.probeGoalBps, - "current", currentEstimate, - ) - p.goalReachedProbeClusterId = p.probeClusterId - p.StopProbe() - done = true - break } p.lock.Unlock() @@ -346,7 +309,7 @@ func (p *ProbeController) pollProbe(probeClusterId ccutils.ProbeClusterId) { return } - // SSBWE-TODO: do not hard code sleep time + // BWE-TODO: do not hard code sleep time time.Sleep(50 * time.Millisecond) } }() @@ -397,13 +360,6 @@ func (p *ProbeController) abortProbeLocked() { p.StopProbe() } -func (p *ProbeController) IsInProbe() bool { - p.lock.RLock() - defer p.lock.RUnlock() - - return p.isInProbeLocked() -} - func (p *ProbeController) isInProbeLocked() bool { return p.probeClusterId != ccutils.ProbeClusterIdInvalid } diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 7c244831b..3c004b620 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -30,8 +30,8 @@ import ( "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/sfu/bwe" "github.com/livekit/livekit-server/pkg/sfu/ccutils" - "github.com/livekit/livekit-server/pkg/sfu/sendsidebwe" 
"github.com/livekit/livekit-server/pkg/utils" ) @@ -151,24 +151,16 @@ const ( ) type StreamAllocatorConfig struct { - NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"` - ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"` ProbeMode ProbeMode `yaml:"probe_mode,omitempty"` MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"` ProbeController ProbeControllerConfig `yaml:"probe_controller,omitempty"` - ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"` - ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"` DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"` } var ( DefaultStreamAllocatorConfig = StreamAllocatorConfig{ - NackRatioAttenuator: 0.4, - ExpectedUsageThreshold: 0.95, - ProbeMode: ProbeModePadding, - ProbeController: DefaultProbeControllerConfig, - ChannelObserverProbe: DefaultChannelObserverConfigProbe, - ChannelObserverNonProbe: DefaultChannelObserverConfigNonProbe, + ProbeMode: ProbeModePadding, + ProbeController: DefaultProbeControllerConfig, } ) @@ -184,13 +176,12 @@ type StreamAllocator struct { onStreamStateChange func(update *StreamStateUpdate) error + bwe bwe.BWE sendSideBWEInterceptor cc.BandwidthEstimator - sendSideBWE *sendsidebwe.SendSideBWE enabled bool allowPause bool - lastReceivedEstimate int64 committedChannelCapacity int64 overriddenChannelCapacity int64 @@ -198,7 +189,6 @@ type StreamAllocator struct { prober *ccutils.Prober - channelObserver *ChannelObserver // STREAM-ALLOCATOR-DATA rateMonitor *RateMonitor videoTracksMu sync.RWMutex @@ -206,8 +196,9 @@ type StreamAllocator struct { isAllocateAllPending bool rembTrackingSSRC uint32 - state streamAllocatorState - isHolding bool + state streamAllocatorState + congestionState bwe.CongestionState + isHolding bool eventsQueue *utils.TypedOpsQueue[Event] @@ -263,6 +254,14 @@ func (s *StreamAllocator) OnStreamStateChange(f 
func(update *StreamStateUpdate) s.onStreamStateChange = f } +func (s *StreamAllocator) SetBWE(bwe bwe.BWE) { + if bwe != nil { + bwe.SetBWEListener(s) + } + s.bwe = bwe + s.probeController.SetBWE(bwe) +} + func (s *StreamAllocator) SetSendSideBWEInterceptor(sendSideBWEInterceptor cc.BandwidthEstimator) { if sendSideBWEInterceptor != nil { sendSideBWEInterceptor.OnTargetBitrateChange(s.onTargetBitrateChange) @@ -270,14 +269,6 @@ func (s *StreamAllocator) SetSendSideBWEInterceptor(sendSideBWEInterceptor cc.Ba s.sendSideBWEInterceptor = sendSideBWEInterceptor } -func (s *StreamAllocator) SetSendSideBWE(sendSideBWE *sendsidebwe.SendSideBWE) { - if sendSideBWE != nil { - sendSideBWE.OnCongestionStateChange(s.onCongestionStateChange) - } - s.sendSideBWE = sendSideBWE - s.probeController.SetSendSideBWE(sendSideBWE) -} - type AddTrackParams struct { Source livekit.TrackSource Priority uint8 @@ -355,7 +346,9 @@ func (s *StreamAllocator) SetChannelCapacity(channelCapacity int64) { } func (s *StreamAllocator) resetState() { - s.channelObserver = s.newChannelObserverNonProbe() + if s.bwe != nil { + s.bwe.Reset() + } s.probeController.Reset() s.state = streamAllocatorStateStable @@ -447,8 +440,8 @@ func (s *StreamAllocator) OnTransportCCFeedback(downTrack *sfu.DownTrack, fb *rt s.sendSideBWEInterceptor.WriteRTCP([]rtcp.Packet{fb}, nil) } - if s.sendSideBWE != nil { - s.sendSideBWE.HandleRTCP(fb) + if s.bwe != nil { + s.bwe.HandleTWCCFeedback(fb) } } @@ -462,11 +455,12 @@ func (s *StreamAllocator) onTargetBitrateChange(bitrate int) { // called when congestion state changes (send side bandwidth estimation) type congestionStateChangeData struct { - congestionState sendsidebwe.CongestionState + congestionState bwe.CongestionState estimatedAvailableChannelCapacity int64 } -func (s *StreamAllocator) onCongestionStateChange(congestionState sendsidebwe.CongestionState, estimatedAvailableChannelCapacity int64) { +// BWEListener implementation +func (s *StreamAllocator) 
OnCongestionStateChange(congestionState bwe.CongestionState, estimatedAvailableChannelCapacity int64) { s.postEvent(Event{ Signal: streamAllocatorSignalCongestionStateChange, Data: congestionStateChangeData{ @@ -707,27 +701,33 @@ func (s *StreamAllocator) handleSignalAdjustState(Event) { func (s *StreamAllocator) handleSignalEstimate(event Event) { receivedEstimate, _ := event.Data.(int64) - s.lastReceivedEstimate = receivedEstimate - // s.monitorRate(receivedEstimate) - // while probing, maintain estimate separately to enable keeping current committed estimate if probe fails - if s.probeController.IsInProbe() { - s.handleNewEstimateInProbe() - } else { - s.handleNewEstimateInNonProbe() + // always update NACKs + packetDelta, repeatedNackDelta := s.getNackDelta() + + if s.bwe != nil { + s.bwe.HandleREMB( + receivedEstimate, + s.probeController.DoesProbeNeedFinalize(), // waiting for goal reached OR aborted probe to finalize + s.getExpectedBandwidthUsage(), + packetDelta, + repeatedNackDelta, + ) } } func (s *StreamAllocator) handleSignalPeriodicPing(Event) { // finalize probe if necessary - trend, _ := s.channelObserver.GetTrend() - isHandled, isNotFailing, isGoalReached := s.probeController.MaybeFinalizeProbe( - s.channelObserver.HasEnoughEstimateSamples(), - trend, - s.channelObserver.GetLowestEstimate(), - ) - if isHandled { - s.onProbeDone(isNotFailing, isGoalReached) + if s.bwe != nil { + isValidSignal, trend, lowestEstimate, highestEstimate := s.bwe.GetProbeStatus() + isHandled, isNotFailing, isGoalReached := s.probeController.MaybeFinalizeProbe( + isValidSignal, + trend, + lowestEstimate, + ) + if isHandled { + s.onProbeDone(isNotFailing, isGoalReached, highestEstimate) + } } // probe if necessary and timing is right @@ -735,7 +735,10 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) { s.maybeProbe() } - // s.updateTracksHistory() + /* STREAM-ALLOCATOR-DATA + s.monitorRate(s.committedChannelCapacity) + s.updateTracksHistory() + */ } func (s 
*StreamAllocator) handleSignalSendProbe(event Event) { @@ -819,21 +822,19 @@ func (s *StreamAllocator) handleSignalRTCPReceiverReport(event Event) { func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) { cscd := event.Data.(congestionStateChangeData) - if cscd.congestionState != sendsidebwe.CongestionStateNone { + if cscd.congestionState != bwe.CongestionStateNone { s.probeController.AbortProbe() } - if cscd.congestionState == sendsidebwe.CongestionStateEarlyWarning || - cscd.congestionState == sendsidebwe.CongestionStateEarlyWarningHangover { + if cscd.congestionState == bwe.CongestionStateEarlyWarning || + cscd.congestionState == bwe.CongestionStateEarlyWarningHangover { s.isHolding = true } else { - s.isHolding = false - // early warning is done and hold has been released, // if there is no congestion, allocate all tracks optimally as // some tracks may have been held at sub-optimal allocation // during early warning hold - if cscd.congestionState == sendsidebwe.CongestionStateNone && s.state == streamAllocatorStateStable { + if s.isHolding && cscd.congestionState == bwe.CongestionStateNone && s.state == streamAllocatorStateStable { update := NewStreamStateUpdate() for _, track := range s.getTracks() { allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal, s.isHolding) @@ -841,19 +842,39 @@ func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) { } s.maybeSendUpdate(update) } + + s.isHolding = false } - if cscd.congestionState == sendsidebwe.CongestionStateCongested { + if cscd.congestionState == bwe.CongestionStateCongested { s.params.Logger.Infow( "stream allocator: channel congestion detected, updating channel capacity", "old(bps)", s.committedChannelCapacity, "new(bps)", cscd.estimatedAvailableChannelCapacity, "expectedUsage(bps)", s.getExpectedBandwidthUsage(), ) + /* STREAM-ALLOCATOR-DATA + s.params.Logger.Debugw( + fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity: 
experimental", action), + "rateHistory", s.rateMonitor.GetHistory(), + "expectedQueuing", s.rateMonitor.GetQueuingGuess(), + "trackHistory", s.getTracksHistory(), + ) + */ s.committedChannelCapacity = cscd.estimatedAvailableChannelCapacity + // reset probe to ensure it does not start too soon after a downward trend + // BWE-TODO: maybe probe controller setting should be algorithm specific + // BWE-TODO: for e. g., the reset could be waiting shorter in SSBWE case + // BWE-TODO: a couple of things to consider + // BWE-TODO: 1. Make ProbeController be owned by BWE modules? + // BWE-TODO: 2. Add an interface method to BWE to check if probe controller should be reset? + s.probeController.Reset() + s.allocateAllTracks() } + + s.congestionState = cscd.congestionState } func (s *StreamAllocator) setState(state streamAllocatorState) { @@ -866,8 +887,16 @@ func (s *StreamAllocator) setState(state streamAllocatorState) { // reset probe to enforce a delay after state change before probing s.probeController.Reset() - // a fresh channel observer after state transition to get clean data - s.channelObserver = s.newChannelObserverNonProbe() + + // a fresh start after state transition to get clean data + if s.bwe != nil { + // BWE-TODO: ssbwe maybe should not reset like this as it might have useful state across + // BWE-TODO: state changes in this module, actually even remotebwe should also manage it + // BWE-TODO: internally, Reset should probably only be used if all managed tracks go away + // BWE-TODO: and we can get a clean start, mimicking existing behaviour till this can be + // BWE-TODO: evaluated more. 
+ s.bwe.Reset() + } } func (s *StreamAllocator) adjustState() { @@ -881,99 +910,6 @@ func (s *StreamAllocator) adjustState() { s.setState(streamAllocatorStateStable) } -func (s *StreamAllocator) handleNewEstimateInProbe() { - // always update NACKs, even if aborted - packetDelta, repeatedNackDelta := s.getNackDelta() - - if s.probeController.DoesProbeNeedFinalize() { - // waiting for aborted probe to finalize - return - } - - s.channelObserver.AddEstimate(s.lastReceivedEstimate) - s.channelObserver.AddNack(packetDelta, repeatedNackDelta) - - trend, _ := s.channelObserver.GetTrend() - s.probeController.CheckProbe(trend, s.channelObserver.GetHighestEstimate()) -} - -func (s *StreamAllocator) handleNewEstimateInNonProbe() { - s.channelObserver.AddEstimate(s.lastReceivedEstimate) - - packetDelta, repeatedNackDelta := s.getNackDelta() - s.channelObserver.AddNack(packetDelta, repeatedNackDelta) - - trend, reason := s.channelObserver.GetTrend() - if trend != ChannelTrendCongesting { - return - } - - var estimateToCommit int64 - expectedBandwidthUsage := s.getExpectedBandwidthUsage() - switch reason { - case ChannelCongestionReasonLoss: - estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - s.params.Config.NackRatioAttenuator*s.channelObserver.GetNackRatio())) - default: - estimateToCommit = s.lastReceivedEstimate - } - if estimateToCommit > s.lastReceivedEstimate { - estimateToCommit = s.lastReceivedEstimate - } - - commitThreshold := int64(s.params.Config.ExpectedUsageThreshold * float64(expectedBandwidthUsage)) - action := "applying" - if estimateToCommit > commitThreshold { - action = "skipping" - } - - if action == "applying" { - s.params.Logger.Infow( - fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity update", action), - "reason", reason, - "old(bps)", s.committedChannelCapacity, - "new(bps)", estimateToCommit, - "lastReceived(bps)", s.lastReceivedEstimate, - "expectedUsage(bps)", expectedBandwidthUsage, - 
"commitThreshold(bps)", commitThreshold, - "channel", s.channelObserver.ToString(), - ) - } else { - s.params.Logger.Debugw( - fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity update", action), - "reason", reason, - "old(bps)", s.committedChannelCapacity, - "new(bps)", estimateToCommit, - "lastReceived(bps)", s.lastReceivedEstimate, - "expectedUsage(bps)", expectedBandwidthUsage, - "commitThreshold(bps)", commitThreshold, - "channel", s.channelObserver.ToString(), - ) - } - /* STREAM-ALLOCATOR-DATA - s.params.Logger.Debugw( - fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity: experimental", action), - "rateHistory", s.rateMonitor.GetHistory(), - "expectedQueuing", s.rateMonitor.GetQueuingGuess(), - "nackHistory", s.channelObserver.GetNackHistory(), - "trackHistory", s.getTracksHistory(), - ) - */ - if estimateToCommit > commitThreshold { - // estimate to commit is either higher or within tolerance of expected uage, skip committing and re-allocating - return - } - - s.committedChannelCapacity = estimateToCommit - - // reset to get new set of samples for next trend - s.channelObserver = s.newChannelObserverNonProbe() - - // reset probe to ensure it does not start too soon after a downward trend - s.probeController.Reset() - - s.allocateAllTracks() -} - func (s *StreamAllocator) allocateTrack(track *Track) { // abort any probe that may be running when a track specific change needs allocation s.probeController.AbortProbe() @@ -1129,39 +1065,17 @@ func (s *StreamAllocator) allocateTrack(track *Track) { s.adjustState() } -func (s *StreamAllocator) onProbeDone(isNotFailing bool, isGoalReached bool) { - highestEstimateInProbe := s.channelObserver.GetHighestEstimate() - if s.sendSideBWE != nil { - highestEstimateInProbe = s.sendSideBWE.GetEstimatedAvailableChannelCapacity() +func (s *StreamAllocator) onProbeDone(isNotFailing bool, isGoalReached bool, highestEstimate int64) { + if s.bwe != nil { + 
s.bwe.ProbingEnd(isNotFailing, isGoalReached) } - // - // Reset estimator at the end of a probe irrespective of probe result to get fresh readings. - // With a failed probe, the latest estimate could be lower than committed estimate. - // As bandwidth estimator (remote in REMB case, local in TWCC case) holds state, - // subsequent estimates could start from the lower point. That should not trigger a - // downward trend and get latched to committed estimate as that would trigger a re-allocation. - // With fresh readings, as long as the trend is not going downward, it will not get latched. - // - // NOTE: With TWCC, it is possible to reset bandwidth estimation to clean state as - // the send side is in full control of bandwidth estimation. - // - channelObserverString := s.channelObserver.ToString() - s.channelObserver = s.newChannelObserverNonProbe() - s.params.Logger.Debugw( - "probe done", - "isNotFailing", isNotFailing, - "isGoalReached", isGoalReached, - "committedEstimate", s.committedChannelCapacity, - "highestEstimate", highestEstimateInProbe, - "channel", channelObserverString, - ) if !isNotFailing { return } - if highestEstimateInProbe > s.committedChannelCapacity { - s.committedChannelCapacity = highestEstimateInProbe + if highestEstimate > s.committedChannelCapacity { + s.committedChannelCapacity = highestEstimate } s.maybeBoostDeficientTracks() @@ -1277,7 +1191,6 @@ func (s *StreamAllocator) allocateAllTracks() { for _, track := range sorted { _, usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient) - s.params.Logger.Infow("debug allocated", "trackID", track.ID(), "usedChannelCapacity", usedChannelCapacity, "availableChannelCapacity", availableChannelCapacity) // REMOVE availableChannelCapacity -= usedChannelCapacity if availableChannelCapacity < 0 { availableChannelCapacity = 0 @@ -1386,52 +1299,17 @@ func (s *StreamAllocator) getNackDelta() (uint32, uint32) { return 
aggPacketDelta, aggRepeatedNackDelta } -func (s *StreamAllocator) newChannelObserverProbe() *ChannelObserver { - return NewChannelObserver( - ChannelObserverParams{ - Name: "probe", - Config: s.params.Config.ChannelObserverProbe, - }, - s.params.Logger, - ) -} - -func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver { - return NewChannelObserver( - ChannelObserverParams{ - Name: "non-probe", - Config: s.params.Config.ChannelObserverNonProbe, - }, - s.params.Logger, - ) -} - func (s *StreamAllocator) initProbe(probeGoalDeltaBps int64) { expectedBandwidthUsage := s.getExpectedBandwidthUsage() probeClusterId, probeGoalBps := s.probeController.InitProbe(probeGoalDeltaBps, expectedBandwidthUsage) - - logFields := []any{ + s.params.Logger.Debugw( + "stream allocator: starting probe", "probeClusterId", probeClusterId, "current usage", expectedBandwidthUsage, "committed", s.committedChannelCapacity, "probeGoalDeltaBps", probeGoalDeltaBps, "goalBps", probeGoalBps, - } - if s.sendSideBWE != nil { - channelState := "" - if s.channelObserver != nil { - channelState = s.channelObserver.ToString() - } - s.channelObserver = s.newChannelObserverProbe() - s.channelObserver.SeedEstimate(s.lastReceivedEstimate) - logFields = append( - logFields, - "lastReceived", s.lastReceivedEstimate, - "channel", channelState, - ) - } - - s.params.Logger.Debugw("stream allocator: starting probe", logFields...) + ) } func (s *StreamAllocator) maybeProbe() { @@ -1439,10 +1317,8 @@ func (s *StreamAllocator) maybeProbe() { // do not probe if channel capacity is overridden return } - if !s.probeController.CanProbe() { - return - } - if s.sendSideBWE != nil && s.sendSideBWE.GetCongestionState() != sendsidebwe.CongestionStateNone { + + if s.congestionState != bwe.CongestionStateNone || !s.probeController.CanProbe() { return }