From 94488d434d9dcdc056602919844ebf121ea11b7a Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 6 Dec 2024 00:13:36 +0530 Subject: [PATCH] TWCC probing (#3234) * WIP * WIP * WIP * make it compile * typo * clean up * fmt * fixes --- pkg/sfu/bwe/bwe.go | 38 +-- pkg/sfu/bwe/null_bwe.go | 18 +- pkg/sfu/bwe/remotebwe/probe_controller.go | 172 +++++++++++ pkg/sfu/bwe/remotebwe/remote_bwe.go | 112 ++++--- .../bwe/sendsidebwe/congestion_detector.go | 229 ++++++++++++--- pkg/sfu/bwe/sendsidebwe/packet_group.go | 20 +- pkg/sfu/bwe/sendsidebwe/packet_tracker.go | 32 +- pkg/sfu/bwe/sendsidebwe/probe_packet_group.go | 132 +++++++++ pkg/sfu/bwe/sendsidebwe/send_side_bwe.go | 37 ++- pkg/sfu/bwe/sendsidebwe/traffic_stats.go | 19 +- pkg/sfu/bwe/sendsidebwe/twcc_feedback.go | 4 +- pkg/sfu/ccutils/probe_regulator.go | 106 +++++++ pkg/sfu/ccutils/probe_signal.go | 40 +++ pkg/sfu/downtrack.go | 4 + pkg/sfu/streamallocator/probe_controller.go | 277 ------------------ pkg/sfu/streamallocator/streamallocator.go | 125 ++++---- 16 files changed, 911 insertions(+), 454 deletions(-) create mode 100644 pkg/sfu/bwe/remotebwe/probe_controller.go create mode 100644 pkg/sfu/bwe/sendsidebwe/probe_packet_group.go create mode 100644 pkg/sfu/ccutils/probe_regulator.go create mode 100644 pkg/sfu/ccutils/probe_signal.go delete mode 100644 pkg/sfu/streamallocator/probe_controller.go diff --git a/pkg/sfu/bwe/bwe.go b/pkg/sfu/bwe/bwe.go index f9867ec6b..e0b5addd4 100644 --- a/pkg/sfu/bwe/bwe.go +++ b/pkg/sfu/bwe/bwe.go @@ -16,6 +16,7 @@ package bwe import ( "fmt" + "time" "github.com/livekit/livekit-server/pkg/sfu/ccutils" "github.com/pion/rtcp" @@ -23,6 +24,13 @@ import ( // ------------------------------------------------ +const ( + DefaultRTT = float64(0.070) // 70 ms + RTTSmoothingFactor = float64(0.5) +) + +// ------------------------------------------------ + type CongestionState int const ( @@ -52,29 +60,6 @@ func (c CongestionState) String() string { // ------------------------------------------------ -type ProbeSignal int - -const ( - ProbeSignalInconclusive ProbeSignal = iota - ProbeSignalCongesting - ProbeSignalClearing -) - -func (p ProbeSignal) String() string { - switch p { - case ProbeSignalInconclusive: - return "INCONCLUSIVE" - case ProbeSignalCongesting: - return "CONGESTING" - case ProbeSignalClearing: - return "CLEARING" - default: - return fmt.Sprintf("%d", int(p)) - } -} - -// ------------------------------------------------ - type BWE interface { SetBWEListener(bweListner BWEListener) @@ -100,10 +85,13 @@ type BWE interface { HandleTWCCFeedback(report *rtcp.TransportLayerCC) - CongestionState() CongestionState + UpdateRTT(rtt float64) + CanProbe() bool + ProbeDuration() time.Duration ProbeClusterStarting(pci ccutils.ProbeClusterInfo) - ProbeClusterDone(pci ccutils.ProbeClusterInfo) (ProbeSignal, int64) + ProbeClusterDone(pci ccutils.ProbeClusterInfo) + ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) } // ------------------------------------------------ diff --git a/pkg/sfu/bwe/null_bwe.go b/pkg/sfu/bwe/null_bwe.go index 1a714e911..3ef5f4787 100644 --- a/pkg/sfu/bwe/null_bwe.go +++ b/pkg/sfu/bwe/null_bwe.go @@ -15,6 +15,8 @@ package bwe import ( + "time" + "github.com/livekit/livekit-server/pkg/sfu/ccutils" "github.com/pion/rtcp" ) @@ -48,14 +50,22 @@ func (n *NullBWE) HandleREMB( func (n *NullBWE) HandleTWCCFeedback(_report *rtcp.TransportLayerCC) {} -func (n *NullBWE) CongestionState() CongestionState { - return CongestionStateNone +func (n *NullBWE) UpdateRTT(rtt float64) {} + +func (n *NullBWE) CanProbe() bool { + return false +} + +func (n *NullBWE) ProbeDuration() time.Duration { + return 0 } func (n *NullBWE) ProbeClusterStarting(_pci ccutils.ProbeClusterInfo) {} -func (n *NullBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (ProbeSignal, int64) { - return ProbeSignalInconclusive, 0 +func (n *NullBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) {} + +func (n *NullBWE) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) { + return ccutils.ProbeSignalInconclusive, 0, false } // ------------------------------------------------ diff --git a/pkg/sfu/bwe/remotebwe/probe_controller.go b/pkg/sfu/bwe/remotebwe/probe_controller.go new file mode 100644 index 000000000..0bf94fa8e --- /dev/null +++ b/pkg/sfu/bwe/remotebwe/probe_controller.go @@ -0,0 +1,172 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remotebwe + +import ( + "fmt" + "time" + + "github.com/livekit/livekit-server/pkg/sfu/bwe" + "github.com/livekit/livekit-server/pkg/sfu/ccutils" + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils/mono" +) + +// --------------------------------------------------------------------------- + +type probeControllerState int + +const ( + probeControllerStateNone probeControllerState = iota + probeControllerStateProbing + probeControllerStateHangover +) + +func (p probeControllerState) String() string { + switch p { + case probeControllerStateNone: + return "NONE" + case probeControllerStateProbing: + return "PROBING" + case probeControllerStateHangover: + return "HANGOVER" + default: + return fmt.Sprintf("%d", int(p)) + } +} + +// ------------------------------------------------ + +type ProbeControllerConfig struct { + ProbeRegulator ccutils.ProbeRegulatorConfig `yaml:"probe_regulator,omitempty"` + + SettleWaitNumRTT uint32 `yaml:"settle_wait_num_rtt,omitempty"` + SettleWaitMin time.Duration `yaml:"settle_wait_min,omitempty"` + SettleWaitMax time.Duration `yaml:"settle_wait_max,omitempty"` +} + +var ( + DefaultProbeControllerConfig = ProbeControllerConfig{ + ProbeRegulator: ccutils.DefaultProbeRegulatorConfig, + + SettleWaitNumRTT: 10, + SettleWaitMin: 500 * time.Millisecond, + SettleWaitMax: 10 * time.Second, + } +) + +// --------------------------------------------------------------------------- + +type probeControllerParams struct { + Config ProbeControllerConfig + Logger logger.Logger +} + +type probeController struct { + params probeControllerParams + + state probeControllerState + stateSwitchedAt time.Time + + pci ccutils.ProbeClusterInfo + rtt float64 + + *ccutils.ProbeRegulator +} + +func newProbeController(params probeControllerParams) *probeController { + return &probeController{ + params: params, + state: probeControllerStateNone, + stateSwitchedAt: mono.Now(), + pci: ccutils.ProbeClusterInfoInvalid, + rtt: bwe.DefaultRTT, + ProbeRegulator: ccutils.NewProbeRegulator( + ccutils.ProbeRegulatorParams{ + Config: params.Config.ProbeRegulator, + Logger: params.Logger, + }, + ), + } +} + +func (p *probeController) UpdateRTT(rtt float64) { + if rtt == 0 { + p.rtt = bwe.DefaultRTT + } else { + if p.rtt == 0 { + p.rtt = rtt + } else { + p.rtt = bwe.RTTSmoothingFactor*rtt + (1.0-bwe.RTTSmoothingFactor)*p.rtt + } + } +} + +func (p *probeController) CanProbe() bool { + return p.state == probeControllerStateNone && p.ProbeRegulator.CanProbe() +} + +func (p *probeController) IsInProbe() bool { + return p.state != probeControllerStateNone +} + +func (p *probeController) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) { + if p.state != probeControllerStateNone { + p.params.Logger.Warnw("unexpected probe controller state", nil, "state", p.state) + } + + p.setState(probeControllerStateProbing) + p.pci = pci +} + +func (p *probeController) ProbeClusterDone(pci ccutils.ProbeClusterInfo) { + if p.pci.Id != pci.Id { + return + } + + p.pci.Result = pci.Result + p.setState(probeControllerStateHangover) +} + +func (p *probeController) MaybeFinalizeProbe() (ccutils.ProbeClusterInfo, bool) { + if p.state != probeControllerStateHangover { + return ccutils.ProbeClusterInfoInvalid, false + } + + settleWait := time.Duration(float64(p.params.Config.SettleWaitNumRTT) * p.rtt * float64(time.Second)) + if settleWait < p.params.Config.SettleWaitMin { + settleWait = p.params.Config.SettleWaitMin + } + if settleWait > p.params.Config.SettleWaitMax { + settleWait = p.params.Config.SettleWaitMax + } + if time.Since(p.stateSwitchedAt) < settleWait { + return ccutils.ProbeClusterInfoInvalid, false + } + + p.setState(probeControllerStateNone) + return p.pci, true +} + +func (p *probeController) setState(state probeControllerState) { + if state == p.state { + return + } + + p.state = state + p.stateSwitchedAt = mono.Now() +} + +// ------------------------------------------------ diff --git a/pkg/sfu/bwe/remotebwe/remote_bwe.go b/pkg/sfu/bwe/remotebwe/remote_bwe.go index 94bcec264..156338714 100644 --- a/pkg/sfu/bwe/remotebwe/remote_bwe.go +++ b/pkg/sfu/bwe/remotebwe/remote_bwe.go @@ -27,25 +27,22 @@ import ( // --------------------------------------------------------------------------- type RemoteBWEConfig struct { - NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"` - ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"` - ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"` - ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"` - CongestedMinDuration time.Duration `yaml:"congested_min_duration,omitempty"` - - PeriodicCheckInterval time.Duration `yaml:"periodic_check_interval,omitempty"` - PeriodicCheckIntervalCongested time.Duration `yaml:"periodic_check_interval_congested,omitempty"` + NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"` + ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"` + ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"` + ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"` + CongestedHangoverDuration time.Duration `yaml:"congested_hangover_duration,omitempty"` + ProbeController ProbeControllerConfig `yaml:"probe_controller,omitempty"` } var ( DefaultRemoteBWEConfig = RemoteBWEConfig{ - NackRatioAttenuator: 0.4, - ExpectedUsageThreshold: 0.95, - ChannelObserverProbe: defaultChannelObserverConfigProbe, - ChannelObserverNonProbe: defaultChannelObserverConfigNonProbe, - CongestedMinDuration: 3 * time.Second, - PeriodicCheckInterval: 2 * time.Second, - PeriodicCheckIntervalCongested: 200 * time.Millisecond, + NackRatioAttenuator: 0.4, + ExpectedUsageThreshold: 0.95, + ChannelObserverProbe: defaultChannelObserverConfigProbe, + ChannelObserverNonProbe: defaultChannelObserverConfigNonProbe, + CongestedHangoverDuration: 3 * time.Second, + ProbeController: DefaultProbeControllerConfig, } ) @@ -67,7 +64,8 @@ type RemoteBWE struct { lastExpectedBandwidthUsage int64 committedChannelCapacity int64 - isInProbe bool + probeController *probeController + channelObserver *channelObserver congestionState bwe.CongestionState @@ -107,11 +105,15 @@ func (r *RemoteBWE) Reset() { r.lastExpectedBandwidthUsage = 0 r.committedChannelCapacity = 100_000_000 - r.isInProbe = false - r.newChannelObserver() - r.congestionState = bwe.CongestionStateNone r.congestionStateSwitchedAt = mono.Now() + + r.probeController = newProbeController(probeControllerParams{ + Config: r.params.Config.ProbeController, + Logger: r.params.Logger, + }) + + r.newChannelObserver() } func (r *RemoteBWE) HandleREMB( @@ -126,7 +128,7 @@ func (r *RemoteBWE) HandleREMB( // in probe, freeze channel observer state if probe causes congestion till the probe is done, // this is to ensure that probe result is not a success and an unsuccessful probe will not up allocate any tracks - if r.isInProbe && r.congestionState != bwe.CongestionStateNone { + if r.congestionState != bwe.CongestionStateNone && r.probeController.IsInProbe() { r.lock.Unlock() return } @@ -144,11 +146,11 @@ func (r *RemoteBWE) HandleREMB( } } -func (r *RemoteBWE) CongestionState() bwe.CongestionState { - r.lock.RLock() - defer r.lock.RUnlock() +func (r *RemoteBWE) UpdateRTT(rtt float64) { + r.lock.Lock() + defer r.lock.Unlock() - return r.congestionState + r.probeController.UpdateRTT(rtt) } func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState, int64) { @@ -162,7 +164,7 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState switch r.congestionState { case bwe.CongestionStateNone: if trend == channelTrendCongesting { - if r.isInProbe || r.estimateAvailableChannelCapacity(reason) { + if r.probeController.IsInProbe() || r.estimateAvailableChannelCapacity(reason) { // when in probe, if congested, stays there will probe is done, // the estimate stays at pre-probe level newState = bwe.CongestionStateCongested @@ -184,7 +186,7 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState if r.estimateAvailableChannelCapacity(reason) { newState = bwe.CongestionStateCongested } - } else if time.Since(r.congestionStateSwitchedAt) >= r.params.Config.CongestedMinDuration { + } else if time.Since(r.congestionStateSwitchedAt) >= r.params.Config.CongestedHangoverDuration { newState = bwe.CongestionStateNone } } @@ -254,6 +256,20 @@ func (r *RemoteBWE) updateCongestionState(state bwe.CongestionState, reason chan r.congestionStateSwitchedAt = mono.Now() } +func (r *RemoteBWE) CanProbe() bool { + r.lock.Lock() + defer r.lock.Unlock() + + return r.congestionState == bwe.CongestionStateNone && r.probeController.CanProbe() +} + +func (r *RemoteBWE) ProbeDuration() time.Duration { + r.lock.Lock() + defer r.lock.Unlock() + + return r.probeController.ProbeDuration() +} + func (r *RemoteBWE) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) { r.lock.Lock() defer r.lock.Unlock() @@ -267,49 +283,61 @@ func (r *RemoteBWE) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) { "channel", r.channelObserver, ) - r.isInProbe = true + r.probeController.ProbeClusterStarting(pci) r.newChannelObserver() } -func (r *RemoteBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (bwe.ProbeSignal, int64) { +func (r *RemoteBWE) ProbeClusterDone(pci ccutils.ProbeClusterInfo) { r.lock.Lock() defer r.lock.Unlock() + r.probeController.ProbeClusterDone(pci) +} + +func (r *RemoteBWE) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) { + r.lock.Lock() + defer r.lock.Unlock() + + pci, isFinalized := r.probeController.MaybeFinalizeProbe() + if !isFinalized { + return ccutils.ProbeSignalInconclusive, 0, isFinalized + } + // switch to a non-probe channel observer on probe end, // reset congestion state to get a fresh trend pco := r.channelObserver probeCongestionState := r.congestionState - r.isInProbe = false r.congestionState = bwe.CongestionStateNone r.newChannelObserver() r.params.Logger.Debugw( - "remote bwe: probe done", + "remote bwe: probe finalized", "lastReceived", r.lastReceivedEstimate, "expectedBandwidthUsage", r.lastExpectedBandwidthUsage, "channel", pco, "isSignalValid", pco.HasEnoughEstimateSamples(), + "probeClusterInfo", pci, ) + probeSignal := ccutils.ProbeSignalClearing if probeCongestionState != bwe.CongestionStateNone { - return bwe.ProbeSignalCongesting, r.committedChannelCapacity + probeSignal = ccutils.ProbeSignalCongesting + } else if trend, _ := pco.GetTrend(); !pco.HasEnoughEstimateSamples() || trend == channelTrendNeutral { + probeSignal = ccutils.ProbeSignalInconclusive + } else { + highestEstimate := pco.GetHighestEstimate() + if highestEstimate > r.committedChannelCapacity { + r.committedChannelCapacity = highestEstimate + } } - trend, _ := pco.GetTrend() - if !pco.HasEnoughEstimateSamples() || trend == channelTrendNeutral { - return bwe.ProbeSignalInconclusive, r.committedChannelCapacity - } - - highestEstimate := pco.GetHighestEstimate() - if highestEstimate > r.committedChannelCapacity { - r.committedChannelCapacity = highestEstimate - } - return bwe.ProbeSignalClearing, r.committedChannelCapacity + r.probeController.ProbeSignal(probeSignal, pci.CreatedAt) + return probeSignal, r.committedChannelCapacity, true } func (r *RemoteBWE) newChannelObserver() { - if r.isInProbe { + if r.probeController.IsInProbe() { r.channelObserver = newChannelObserver( channelObserverParams{ Name: "probe", diff --git a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go index 23704448e..da01845ac 100644 --- a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go +++ b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go @@ -63,6 +63,58 @@ var ( // ------------------------------------------------------------------------------- +type ProbeSignalConfig struct { + MinBytesRatio float64 `yaml:"min_bytes_ratio,omitempty"` + MinDurationRatio float64 `yaml:"min_duration_ratio,omitempty"` + + JQRMinDelay time.Duration `yaml:"jqr_min_delay,omitempty"` + DQRMaxDelay time.Duration `yaml:"dqr_max_delay,omitempty"` + + WeightedLoss WeightedLossConfig `yaml:"weighted_loss,omitempty"` + CongestionMinLoss float64 `yaml:"congestion_min_loss,omitempty"` +} + +func (p ProbeSignalConfig) IsValid(pci ccutils.ProbeClusterInfo) bool { + return pci.Result.Bytes() > int(p.MinBytesRatio*float64(pci.Goal.DesiredBytes)) && pci.Result.Duration() > time.Duration(p.MinDurationRatio*float64(pci.Goal.Duration)) +} + +func (p ProbeSignalConfig) ProbeSignal(ppg *probePacketGroup) (ccutils.ProbeSignal, int64) { + ts := newTrafficStats(trafficStatsParams{ + Config: p.WeightedLoss, + }) + ts.Merge(ppg.Traffic()) + + pqd := ppg.PropagatedQueuingDelay() + if pqd > p.JQRMinDelay.Microseconds() { + return ccutils.ProbeSignalCongesting, ts.AcknowledgedBitrate() + } + + if ts.WeightedLoss() > p.CongestionMinLoss { + return ccutils.ProbeSignalCongesting, ts.AcknowledgedBitrate() + } + + if pqd < p.DQRMaxDelay.Microseconds() { + return ccutils.ProbeSignalClearing, ts.AcknowledgedBitrate() + } + + return ccutils.ProbeSignalInconclusive, ts.AcknowledgedBitrate() +} + +var ( + DefaultProbeSignalConfig = ProbeSignalConfig{ + MinBytesRatio: 0.5, + MinDurationRatio: 0.5, + + JQRMinDelay: 15 * time.Millisecond, + DQRMaxDelay: 5 * time.Millisecond, + + WeightedLoss: defaultWeightedLossConfig, + CongestionMinLoss: 0.25, + } +) + +// ------------------------------------------------------------------------------- + type qdMeasurement struct { earlyWarningConfig CongestionSignalConfig congestedConfig CongestionSignalConfig @@ -94,7 +146,7 @@ func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup) { return } - pqd, pqdOk := pg.PropagatedQueuingDelay() + pqd, pqdOk := pg.FinalizedPropagatedQueuingDelay() if !pqdOk { return } @@ -234,6 +286,10 @@ type CongestionDetectorConfig struct { PacketGroup PacketGroupConfig `yaml:"packet_group,omitempty"` PacketGroupMaxAge time.Duration `yaml:"packet_group_max_age,omitempty"` + ProbePacketGroup ProbePacketGroupConfig `yaml:"probe_packet_group,omitempty"` + ProbeRegulator ccutils.ProbeRegulatorConfig `yaml:"probe_regulator,omitempty"` + ProbeSignal ProbeSignalConfig `yaml:"probe_signal,omitempty"` + JQRMinDelay time.Duration `yaml:"jqr_min_delay,omitempty"` DQRMaxDelay time.Duration `yaml:"dqr_max_delay,omitempty"` @@ -268,24 +324,34 @@ var ( } DefaultCongestionDetectorConfig = CongestionDetectorConfig{ - PacketGroup: DefaultPacketGroupConfig, - PacketGroupMaxAge: 15 * time.Second, - JQRMinDelay: 15 * time.Millisecond, - DQRMaxDelay: 5 * time.Millisecond, - WeightedLoss: defaultWeightedLossConfig, - CongestionMinLoss: 0.25, - QueuingDelayEarlyWarning: DefaultQueuingDelayEarlyWarningCongestionSignalConfig, - LossEarlyWarning: DefaultLossEarlyWarningCongestionSignalConfig, - EarlyWarningHangover: 500 * time.Millisecond, - QueuingDelayCongested: DefaultQueuingDelayCongestedCongestionSignalConfig, - LossCongested: DefaultLossCongestedCongestionSignalConfig, - CongestedHangover: 3 * time.Second, + PacketGroup: DefaultPacketGroupConfig, + PacketGroupMaxAge: 15 * time.Second, + + ProbePacketGroup: DefaultPacketGroupConfigProbe, + ProbeRegulator: ccutils.DefaultProbeRegulatorConfig, + ProbeSignal: DefaultProbeSignalConfig, + + JQRMinDelay: 15 * time.Millisecond, + DQRMaxDelay: 5 * time.Millisecond, + + WeightedLoss: defaultWeightedLossConfig, + CongestionMinLoss: 0.25, + + QueuingDelayEarlyWarning: DefaultQueuingDelayEarlyWarningCongestionSignalConfig, + LossEarlyWarning: DefaultLossEarlyWarningCongestionSignalConfig, + EarlyWarningHangover: 500 * time.Millisecond, + + QueuingDelayCongested: DefaultQueuingDelayCongestedCongestionSignalConfig, + LossCongested: DefaultLossCongestedCongestionSignalConfig, + CongestedHangover: 3 * time.Second, + RateMeasurementWindowDurationMin: 800 * time.Millisecond, RateMeasurementWindowDurationMax: 2 * time.Second, - PeriodicCheckInterval: 2 * time.Second, - PeriodicCheckIntervalCongested: 200 * time.Millisecond, - CongestedCTRTrend: defaultTrendDetectorConfigCongestedCTR, - CongestedCTREpsilon: 0.05, + + PeriodicCheckInterval: 2 * time.Second, + PeriodicCheckIntervalCongested: 200 * time.Millisecond, + CongestedCTRTrend: defaultTrendDetectorConfigCongestedCTR, + CongestedCTREpsilon: 0.05, } ) @@ -307,11 +373,16 @@ type congestionDetector struct { lock sync.RWMutex feedbackReports deque.Deque[feedbackReport] + rtt float64 + *packetTracker twccFeedback *twccFeedback packetGroups []*packetGroup + probePacketGroup *probePacketGroup + probeRegulator *ccutils.ProbeRegulator + wake chan struct{} stop core.Fuse @@ -326,9 +397,14 @@ type congestionDetector struct { func newCongestionDetector(params congestionDetectorParams) *congestionDetector { c := &congestionDetector{ - params: params, - packetTracker: newPacketTracker(packetTrackerParams{Logger: params.Logger}), - twccFeedback: newTWCCFeedback(twccFeedbackParams{Logger: params.Logger}), + params: params, + rtt: bwe.DefaultRTT, + packetTracker: newPacketTracker(packetTrackerParams{Logger: params.Logger}), + twccFeedback: newTWCCFeedback(twccFeedbackParams{Logger: params.Logger}), + probeRegulator: ccutils.NewProbeRegulator(ccutils.ProbeRegulatorParams{ + Config: params.Config.ProbeRegulator, + Logger: params.Logger, + }), wake: make(chan struct{}, 1), estimatedAvailableChannelCapacity: 100_000_000, congestionState: bwe.CongestionStateNone, @@ -371,11 +447,87 @@ func (c *congestionDetector) HandleTWCCFeedback(report *rtcp.TransportLayerCC) { } } -func (c *congestionDetector) CongestionState() bwe.CongestionState { - c.lock.RLock() - defer c.lock.RUnlock() +func (c *congestionDetector) UpdateRTT(rtt float64) { + if rtt == 0 { + c.rtt = bwe.DefaultRTT + } else { + if c.rtt == 0 { + c.rtt = rtt + } else { + c.rtt = bwe.RTTSmoothingFactor*rtt + (1.0-bwe.RTTSmoothingFactor)*c.rtt + } + } +} - return c.congestionState +func (c *congestionDetector) CanProbe() bool { + c.lock.Lock() + defer c.lock.Unlock() + + return c.congestionState == bwe.CongestionStateNone && c.probePacketGroup == nil && c.probeRegulator.CanProbe() +} + +func (c *congestionDetector) ProbeDuration() time.Duration { + c.lock.Lock() + defer c.lock.Unlock() + + return c.probeRegulator.ProbeDuration() +} + +func (c *congestionDetector) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) { + c.lock.Lock() + defer c.lock.Unlock() + + c.probePacketGroup = newProbePacketGroup( + probePacketGroupParams{ + Config: c.params.Config.ProbePacketGroup, + WeightedLoss: c.params.Config.WeightedLoss, + Logger: c.params.Logger, + }, + pci, + ) + + c.packetTracker.ProbeClusterStarting(pci.Id) +} + +func (c *congestionDetector) ProbeClusterDone(pci ccutils.ProbeClusterInfo) { + c.lock.Lock() + defer c.lock.Unlock() + + c.packetTracker.ProbeClusterDone(pci.Id) + if c.probePacketGroup != nil { + c.probePacketGroup.ProbeClusterDone(pci) + } +} + +func (c *congestionDetector) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) { + c.lock.Lock() + defer c.lock.Unlock() + + if c.probePacketGroup == nil { + return ccutils.ProbeSignalInconclusive, 0, false + } + + pci, isFinalized := c.probePacketGroup.MaybeFinalizeProbe(c.packetTracker.ProbeMaxSequenceNumber(), c.rtt) + if !isFinalized { + return ccutils.ProbeSignalInconclusive, 0, isFinalized + } + + isSignalValid := c.params.Config.ProbeSignal.IsValid(pci) + c.params.Logger.Debugw( + "send side bwe: probe finalized", + "isSignalValid", isSignalValid, + "probeClusterInfo", pci, + "probePacketGroup", c.probePacketGroup, + ) + + probeSignal, estimatedAvailableChannelCapacity := c.params.Config.ProbeSignal.ProbeSignal(c.probePacketGroup) + if probeSignal == ccutils.ProbeSignalClearing && estimatedAvailableChannelCapacity > c.estimatedAvailableChannelCapacity { + c.estimatedAvailableChannelCapacity = estimatedAvailableChannelCapacity + } + + c.probeRegulator.ProbeSignal(probeSignal, pci.CreatedAt) + c.probePacketGroup = nil + return probeSignal, c.estimatedAvailableChannelCapacity, true } func (c *congestionDetector) prunePacketGroups() { @@ -458,7 +610,7 @@ func (c *congestionDetector) congestionDetectionStateMachine() { switch state { case bwe.CongestionStateNone: if congestedTriggered { - c.params.Logger.Warnw("invalid congested state transition", nil, "from", state, "reason", congestedReason) + c.params.Logger.Warnw("send side bwe: invalid congested state transition", nil, "from", state, "reason", congestedReason) } if earlyWarningTriggered { newState = bwe.CongestionStateEarlyWarning @@ -475,7 +627,7 @@ func (c *congestionDetector) congestionDetectionStateMachine() { case bwe.CongestionStateEarlyWarningHangover: if congestedTriggered { - c.params.Logger.Warnw("invalid congested state transition", nil, "from", state, "reason", congestedReason) + c.params.Logger.Warnw("send side bwe: invalid congested state transition", nil, "from", state, "reason", congestedReason) } if earlyWarningTriggered { newState = bwe.CongestionStateEarlyWarning @@ -491,7 +643,7 @@ func (c *congestionDetector) congestionDetectionStateMachine() { case bwe.CongestionStateCongestedHangover: if congestedTriggered { - c.params.Logger.Warnw("invalid congested state transition", nil, "from", state, "reason", congestedReason) + c.params.Logger.Warnw("send side bwe: invalid congested state transition", nil, "from", state, "reason", congestedReason) } if earlyWarningTriggered { newState = bwe.CongestionStateEarlyWarning @@ -543,7 +695,7 @@ func (c *congestionDetector) updateCTRTrend(pg *packetGroup) { return } - c.params.Logger.Infow("captured traffic ratio is trending downward", "channel", c.congestedCTRTrend) + c.params.Logger.Infow("send side bwe: captured traffic ratio is trending downward", "channel", c.congestedCTRTrend) if bweListener := c.getBWEListener(); bweListener != nil { bweListener.OnCongestionStateChange(c.congestionState, c.estimatedAvailableChannelCapacity) @@ -554,7 +706,7 @@ func (c *congestionDetector) updateCTRTrend(pg *packetGroup) { } func (c *congestionDetector) estimateAvailableChannelCapacity() { - if len(c.packetGroups) == 0 { + if len(c.packetGroups) == 0 || c.probePacketGroup != nil { return } @@ -579,7 +731,7 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() { if agg.Duration() < c.params.Config.RateMeasurementWindowDurationMin.Microseconds() { c.params.Logger.Infow( - "not enough data to estimate available channel capacity", + "send side bwe: not enough data to estimate available channel capacity", "duration", agg.Duration(), "numGroups", len(c.packetGroups), "oldestUsed", max(0, idx), @@ -592,7 +744,7 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() { func (c *congestionDetector) updateCongestionState(state bwe.CongestionState, reason string, oldestContributingGroup int) { c.params.Logger.Infow( - "congestion state change", + "send side bwe: congestion state change", "from", c.congestionState, "to", state, "reason", reason, @@ -629,7 +781,7 @@ func (c *congestionDetector) updateCongestionState(state bwe.CongestionState, re func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) { recvRefTime, isOutOfOrder := c.twccFeedback.ProcessReport(fbr.report, fbr.at) if isOutOfOrder { - c.params.Logger.Infow("received out-of-order feedback report") + c.params.Logger.Infow("send side bwe: received out-of-order feedback report") } if len(c.packetGroups) == 0 { @@ -652,6 +804,10 @@ func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) { return } + if c.probePacketGroup != nil { + c.probePacketGroup.Add(pi, sendDelta, recvDelta, isLost) + } + err := pg.Add(pi, sendDelta, recvDelta, isLost) if err == nil { return @@ -662,18 +818,19 @@ func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) { c.updateCTRTrend(pg) // SSBWE-REMOVE c.params.Logger.Infow("packet group done", "group", pg, "numGroups", len(c.packetGroups)) // SSBWE-REMOVE - pqd, _ := pg.PropagatedQueuingDelay() pg = newPacketGroup( packetGroupParams{ Config: c.params.Config.PacketGroup, WeightedLoss: c.params.Config.WeightedLoss, Logger: c.params.Logger, }, - pqd, + pg.PropagatedQueuingDelay(), ) c.packetGroups = append(c.packetGroups, pg) - pg.Add(pi, sendDelta, recvDelta, isLost) + if err = pg.Add(pi, sendDelta, recvDelta, isLost); err != nil { + c.params.Logger.Warnw("send side bwe: could not add packet to new packet group", err, "packetInfo", pi, "packetGroup", pg) + } return } @@ -683,7 +840,7 @@ func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) { if err := opg.Add(pi, sendDelta, recvDelta, isLost); err == nil { return } else if err == errGroupFinalized { - c.params.Logger.Infow("unpected finalized group", "packetInfo", pi, "packetGroup", opg) + c.params.Logger.Infow("send side bwe: unexpected finalized group", "packetInfo", pi, "packetGroup", opg) } } } diff --git a/pkg/sfu/bwe/sendsidebwe/packet_group.go b/pkg/sfu/bwe/sendsidebwe/packet_group.go index 2ad9ec00c..b3e0f8021 100644 --- a/pkg/sfu/bwe/sendsidebwe/packet_group.go +++ b/pkg/sfu/bwe/sendsidebwe/packet_group.go @@ -230,16 +230,20 @@ func (p *packetGroup) SendWindow() (int64, int64) { return p.minSendTime, p.maxSendTime } -func (p *packetGroup) PropagatedQueuingDelay() (int64, bool) { +func (p *packetGroup) PropagatedQueuingDelay() int64 { + if p.queuingDelay+p.aggregateRecvDelta-p.aggregateSendDelta > 0 { + return p.queuingDelay + p.aggregateRecvDelta - p.aggregateSendDelta + } + + return max(0, p.aggregateRecvDelta-p.aggregateSendDelta) +} + +func (p *packetGroup) FinalizedPropagatedQueuingDelay() (int64, bool) { if !p.isFinalized { return 0, false } - if p.queuingDelay+p.aggregateRecvDelta-p.aggregateSendDelta > 0 { - return p.queuingDelay + p.aggregateRecvDelta - p.aggregateSendDelta, true - } - - return max(0, p.aggregateRecvDelta-p.aggregateSendDelta), true + return p.PropagatedQueuingDelay(), true } func (p *packetGroup) Traffic() *trafficStats { @@ -251,6 +255,7 @@ func (p *packetGroup) Traffic() *trafficStats { ackedPackets: p.acked.numPackets(), ackedBytes: p.acked.numBytes(), lostPackets: p.lost.numPackets(), + lostBytes: p.lost.numBytes(), } } @@ -282,8 +287,7 @@ func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error { ts.Merge(p.Traffic()) e.AddObject("trafficStats", ts) e.AddInt64("queuingDelay", p.queuingDelay) - pqd, _ := p.PropagatedQueuingDelay() - e.AddInt64("propagatedQueuingDelay", pqd) + e.AddInt64("propagatedQueuingDelay", p.PropagatedQueuingDelay()) e.AddBool("isFinalized", p.isFinalized) return nil diff --git a/pkg/sfu/bwe/sendsidebwe/packet_tracker.go b/pkg/sfu/bwe/sendsidebwe/packet_tracker.go index 29526900c..76c0e5503 100644 --- a/pkg/sfu/bwe/sendsidebwe/packet_tracker.go +++ b/pkg/sfu/bwe/sendsidebwe/packet_tracker.go @@ -41,6 +41,9 @@ type packetTracker struct { baseRecvTime int64 piLastRecv *packetInfo + + probeClusterId ccutils.ProbeClusterId + probeMaxSequenceNumber uint64 } func newPacketTracker(params packetTrackerParams) *packetTracker { @@ -73,7 +76,7 @@ func (p *packetTracker) RecordPacketSendAndGetSequenceNumber( probeClusterId: probeClusterId, isProbe: isProbe, } - // SSBWE-REMOVE p.params.Logger.Infow("packet sent", "packetInfo", pi) // SSBWE-REMOVE + //p.params.Logger.Infow("send side bwe: packet sent", "packetInfo", pi) // SSBWE-REMOVE p.sequenceNumber++ @@ -82,6 +85,10 @@ func (p *packetTracker) RecordPacketSendAndGetSequenceNumber( p.piLastRecv = nil } + if p.probeClusterId != ccutils.ProbeClusterIdInvalid && p.probeClusterId == pi.probeClusterId && pi.sequenceNumber > p.probeMaxSequenceNumber { + p.probeMaxSequenceNumber = pi.sequenceNumber + } + return uint16(pi.sequenceNumber) } @@ -137,3 +144,26 @@ func (p *packetTracker) getPacketInfoExisting(sn uint16) *packetInfo { return nil } + +func (p *packetTracker) ProbeClusterStarting(probeClusterId ccutils.ProbeClusterId) { + p.lock.Lock() + defer p.lock.Unlock() + + p.probeClusterId = probeClusterId +} + +func (p *packetTracker) ProbeClusterDone(probeClusterId ccutils.ProbeClusterId) { + p.lock.Lock() + defer p.lock.Unlock() + + if p.probeClusterId == probeClusterId { + p.probeClusterId = ccutils.ProbeClusterIdInvalid + } +} + +func (p *packetTracker) ProbeMaxSequenceNumber() uint64 { + p.lock.Lock() + defer p.lock.Unlock() + + return p.probeMaxSequenceNumber +} diff --git a/pkg/sfu/bwe/sendsidebwe/probe_packet_group.go b/pkg/sfu/bwe/sendsidebwe/probe_packet_group.go new file mode 100644 index 000000000..22dd2d096 --- /dev/null +++ b/pkg/sfu/bwe/sendsidebwe/probe_packet_group.go @@ -0,0 +1,132 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sendsidebwe + +import ( + "time" + + "github.com/livekit/livekit-server/pkg/sfu/ccutils" + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils/mono" + "go.uber.org/zap/zapcore" +) + +// ------------------------------------------------------------- + +type ProbePacketGroupConfig struct { + PacketGroup PacketGroupConfig `yaml:"packet_group,omitempty"` + + SettleWaitNumRTT uint32 `yaml:"settle_wait_num_rtt,omitempty"` + SettleWaitMin time.Duration `yaml:"settle_wait_min,omitempty"` + SettleWaitMax time.Duration `yaml:"settle_wait_max,omitempty"` +} + +var ( + // large numbers to treat a probe packet group as one + DefaultPacketGroupConfigProbe = ProbePacketGroupConfig{ + PacketGroup: PacketGroupConfig{ + MinPackets: 16384, + MaxWindowDuration: time.Minute, + }, + SettleWaitNumRTT: 10, + SettleWaitMin: 500 * time.Millisecond, + SettleWaitMax: 10 * time.Second, + } +) + +// ------------------------------------------------------------- + +type probePacketGroupParams struct { + Config ProbePacketGroupConfig + WeightedLoss WeightedLossConfig + Logger logger.Logger +} + +type probePacketGroup struct { + params probePacketGroupParams + pci ccutils.ProbeClusterInfo + *packetGroup + maxSequenceNumber uint64 + doneAt time.Time +} + +func newProbePacketGroup(params probePacketGroupParams, pci ccutils.ProbeClusterInfo) *probePacketGroup { + return &probePacketGroup{ + params: params, + pci: pci, + packetGroup: newPacketGroup( + packetGroupParams{ + Config: params.Config.PacketGroup, + WeightedLoss: params.WeightedLoss, + Logger: params.Logger, + }, + 0, + ), + } +} + +func (p *probePacketGroup) ProbeClusterDone(pci ccutils.ProbeClusterInfo) { + if p.pci.Id != pci.Id { + return + } + + p.pci.Result = pci.Result + p.doneAt = mono.Now() +} + +func (p *probePacketGroup) MaybeFinalizeProbe(maxSequenceNumber uint64, rtt float64) (ccutils.ProbeClusterInfo, bool) { + if p.doneAt.IsZero() { + return ccutils.ProbeClusterInfoInvalid, false + } + + if maxSequenceNumber != 0 && p.maxSequenceNumber >= maxSequenceNumber { + return p.pci, true + } + + settleWait := time.Duration(float64(p.params.Config.SettleWaitNumRTT) * rtt * float64(time.Second)) + if settleWait < p.params.Config.SettleWaitMin { + settleWait = p.params.Config.SettleWaitMin + } + if settleWait > p.params.Config.SettleWaitMax { + settleWait = p.params.Config.SettleWaitMax + } + if time.Since(p.doneAt) < settleWait { + return ccutils.ProbeClusterInfoInvalid, false + } + + return p.pci, true +} + +func (p *probePacketGroup) Add(pi *packetInfo, sendDelta, recvDelta int64, isLost bool) error { + if !p.doneAt.IsZero() || pi.probeClusterId != p.pci.Id { + return nil + } + + p.maxSequenceNumber = max(p.maxSequenceNumber, pi.sequenceNumber) + + return p.packetGroup.Add(pi, sendDelta, recvDelta, isLost) +} + +func (p *probePacketGroup) MarshalLogObject(e zapcore.ObjectEncoder) error { + if p == nil { + return nil + } + + e.AddObject("pci", p.pci) + e.AddObject("packetGroup", p.packetGroup) + e.AddUint64("maxSequenceNumber", p.maxSequenceNumber) + e.AddTime("doneAt", p.doneAt) + return nil +} diff --git a/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go b/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go index e0ab0976a..029a5dcbf 100644 --- a/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go +++ b/pkg/sfu/bwe/sendsidebwe/send_side_bwe.go @@ -15,7 +15,10 @@ package sendsidebwe import ( + "time" + "github.com/livekit/livekit-server/pkg/sfu/bwe" + "github.com/livekit/livekit-server/pkg/sfu/ccutils" "github.com/livekit/protocol/logger" "github.com/pion/rtcp" ) @@ -100,12 +103,42 @@ func (s *SendSideBWE) Stop() { s.congestionDetector.Stop() } +func (s *SendSideBWE) RecordPacketSendAndGetSequenceNumber( + atMicro int64, + size int, + isRTX bool, + probeClusterId ccutils.ProbeClusterId, + isProbe bool, +) uint16 { + return s.congestionDetector.RecordPacketSendAndGetSequenceNumber(atMicro, size, isRTX, probeClusterId, isProbe) +} + func (s *SendSideBWE) HandleTWCCFeedback(report *rtcp.TransportLayerCC) { s.congestionDetector.HandleTWCCFeedback(report) } -func (s *SendSideBWE) CongestionState() bwe.CongestionState { - return s.congestionDetector.CongestionState() +func (s *SendSideBWE) UpdateRTT(rtt float64) { + s.congestionDetector.UpdateRTT(rtt) +} + +func (s *SendSideBWE) CanProbe() bool { + return s.congestionDetector.CanProbe() +} + +func (s *SendSideBWE) ProbeDuration() time.Duration { + return s.congestionDetector.ProbeDuration() +} + +func (s *SendSideBWE) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) { + s.congestionDetector.ProbeClusterStarting(pci) +} + +func (s *SendSideBWE) ProbeClusterDone(pci ccutils.ProbeClusterInfo) { + s.congestionDetector.ProbeClusterDone(pci) +} + +func (s *SendSideBWE) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) { + return s.congestionDetector.ProbeClusterFinalize() } // ------------------------------------------------ diff --git a/pkg/sfu/bwe/sendsidebwe/traffic_stats.go b/pkg/sfu/bwe/sendsidebwe/traffic_stats.go index c29191529..efe6ac634 100644 --- a/pkg/sfu/bwe/sendsidebwe/traffic_stats.go +++ b/pkg/sfu/bwe/sendsidebwe/traffic_stats.go @@ -53,6 +53,7 @@ type trafficStats struct { ackedPackets int ackedBytes int lostPackets int + lostBytes int } func newTrafficStats(params trafficStatsParams) *trafficStats { @@ -73,6 +74,11 @@ func (ts *trafficStats) Merge(rhs *trafficStats) { ts.ackedPackets += rhs.ackedPackets ts.ackedBytes += rhs.ackedBytes ts.lostPackets += rhs.lostPackets + ts.lostBytes += rhs.lostBytes +} + +func (ts *trafficStats) NumBytes() int { + return ts.ackedBytes + ts.lostBytes } func (ts *trafficStats) Duration() int64 { @@ -80,6 +86,11 @@ func (ts *trafficStats) Duration() int64 { } func (ts *trafficStats) AcknowledgedBitrate() int64 { + duration := ts.Duration() + if duration == 0 { + return 0 + } + ackedBitrate := int64(ts.ackedBytes) * 8 * 1e6 / ts.Duration() return int64(float64(ackedBitrate) * ts.CapturedTrafficRatio()) } @@ -112,10 +123,10 @@ func (ts *trafficStats) WeightedLoss() float64 { pps := totalPackets * 1e6 / float64(ts.Duration()) // Log10 is used to give higher weight for the same loss ratio at higher packet rates, - // for e.g. with a penalty factor of 0.25 - // - 10% loss at 20 pps = 0.1 * log10(20) * 0.25 = 0.032 - // - 10% loss at 100 pps = 0.1 * log10(100) * 0.25 = 0.05 - // - 10% loss at 1000 pps = 0.1 * log10(1000) * 0.25 = 0.075 + // for e.g. + // - 10% loss at 20 pps = 0.1 * log10(20) = 0.130 + // - 10% loss at 100 pps = 0.1 * log10(100) = 0.2 + // - 10% loss at 1000 pps = 0.1 * log10(1000) = 0.3 return lossRatio * math.Log10(pps) } diff --git a/pkg/sfu/bwe/sendsidebwe/twcc_feedback.go b/pkg/sfu/bwe/sendsidebwe/twcc_feedback.go index 1e1251042..4f8761f07 100644 --- a/pkg/sfu/bwe/sendsidebwe/twcc_feedback.go +++ b/pkg/sfu/bwe/sendsidebwe/twcc_feedback.go @@ -66,7 +66,7 @@ func newTWCCFeedback(params twccFeedbackParams) *twccFeedback { } func (t *twccFeedback) ProcessReport(report *rtcp.TransportLayerCC, at time.Time) (int64, bool) { - // SSBWE-REMOVE t.params.Logger.Infow("TWCC feedback", "report", report.String()) // SSBWE-REMOVE + // t.params.Logger.Infow("send side bwe: TWCC feedback", "report", report.String()) // SSBWE-REMOVE t.numReports++ if t.lastFeedbackTime.IsZero() { t.lastFeedbackTime = at @@ -99,7 +99,7 @@ func (t *twccFeedback) ProcessReport(report *rtcp.TransportLayerCC, at time.Time if !isOutOfOrder { sinceLast := at.Sub(t.lastFeedbackTime) - // SSBWE-REMOVE t.params.Logger.Infow("report received", "at", at, "sinceLast", sinceLast, "pktCount", report.FbPktCount) // SSBWE-REMOVE + // t.params.Logger.Infow("send side bwe: report received", "at", at, "sinceLast", sinceLast, "pktCount", report.FbPktCount) // SSBWE-REMOVE if t.estimatedFeedbackInterval == 0 { t.estimatedFeedbackInterval = sinceLast } else { diff --git a/pkg/sfu/ccutils/probe_regulator.go b/pkg/sfu/ccutils/probe_regulator.go new file mode 100644 index 000000000..ed0940bd9 --- /dev/null +++ b/pkg/sfu/ccutils/probe_regulator.go @@ -0,0 +1,106 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ccutils + +import ( + "time" + + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils/mono" +) + +// ------------------------------------------------ + +type ProbeRegulatorConfig struct { + BaseInterval time.Duration `yaml:"base_interval,omitempty"` + BackoffFactor float64 `yaml:"backoff_factor,omitempty"` + MaxInterval time.Duration `yaml:"max_interval,omitempty"` + + MinDuration time.Duration `yaml:"min_duration,omitempty"` + MaxDuration time.Duration `yaml:"max_duration,omitempty"` + DurationIncreaseFactor float64 `yaml:"duration_increase_factor,omitempty"` +} + +var ( + DefaultProbeRegulatorConfig = ProbeRegulatorConfig{ + BaseInterval: 3 * time.Second, + BackoffFactor: 1.5, + MaxInterval: 2 * time.Minute, + + MinDuration: 200 * time.Millisecond, + MaxDuration: 20 * time.Second, + DurationIncreaseFactor: 1.5, + } +) + +// --------------------------------------------------------------------------- + +type ProbeRegulatorParams struct { + Config ProbeRegulatorConfig + Logger logger.Logger +} + +type ProbeRegulator struct { + params ProbeRegulatorParams + + probeInterval time.Duration + probeDuration time.Duration + nextProbeEarliestAt time.Time +} + +func NewProbeRegulator(params ProbeRegulatorParams) *ProbeRegulator { + return &ProbeRegulator{ + params: params, + probeInterval: params.Config.BaseInterval, + probeDuration: params.Config.MinDuration, + nextProbeEarliestAt: mono.Now(), + } +} + +func (p *ProbeRegulator) CanProbe() bool { + return mono.Now().After(p.nextProbeEarliestAt) +} + +func (p *ProbeRegulator) ProbeDuration() time.Duration { + return p.probeDuration +} + +func (p *ProbeRegulator) ProbeSignal(probeSignal ProbeSignal, baseTime time.Time) { + if probeSignal == ProbeSignalCongesting { + // wait longer till next probe + p.probeInterval = time.Duration(p.probeInterval.Seconds()*p.params.Config.BackoffFactor) * time.Second + if p.probeInterval > p.params.Config.MaxInterval { + p.probeInterval = p.params.Config.MaxInterval + } + + // revert back to starting with shortest probe + p.probeDuration = p.params.Config.MinDuration + } else { + // probe can be started again after minimal interval as previous congestion signal indicated congestion clearing + p.probeInterval = p.params.Config.BaseInterval + + // can do longer probe after a good probe + p.probeDuration = time.Duration(float64(p.probeDuration.Milliseconds())*p.params.Config.DurationIncreaseFactor) * time.Millisecond + if p.probeDuration > p.params.Config.MaxDuration { + p.probeDuration = p.params.Config.MaxDuration + } + } + + if baseTime.IsZero() { + p.nextProbeEarliestAt = mono.Now().Add(p.probeInterval) + } else { + p.nextProbeEarliestAt = baseTime.Add(p.probeInterval) + } +} diff --git a/pkg/sfu/ccutils/probe_signal.go b/pkg/sfu/ccutils/probe_signal.go new file mode 100644 index 000000000..c71b7616e --- /dev/null +++ b/pkg/sfu/ccutils/probe_signal.go @@ -0,0 +1,40 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ccutils + +import "fmt" + +// ------------------------------------------------ + +type ProbeSignal int + +const ( + ProbeSignalInconclusive ProbeSignal = iota + ProbeSignalCongesting + ProbeSignalClearing +) + +func (p ProbeSignal) String() string { + switch p { + case ProbeSignalInconclusive: + return "INCONCLUSIVE" + case ProbeSignalCongesting: + return "CONGESTING" + case ProbeSignalClearing: + return "CLEARING" + default: + return fmt.Sprintf("%d", int(p)) + } +} diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 0b3dcb867..fb8c7212c 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -628,6 +628,10 @@ func (d *DownTrack) SetProbeClusterId(probeClusterId ccutils.ProbeClusterId) { d.probeClusterId.Store(uint32(probeClusterId)) } +func (d *DownTrack) SwapProbeClusterId(match ccutils.ProbeClusterId, swap ccutils.ProbeClusterId) { + d.probeClusterId.CompareAndSwap(uint32(match), uint32(swap)) +} + // ID is the unique identifier for this Track. This should be unique for the // stream, but doesn't have to globally unique. A common example would be 'audio' or 'video' // and StreamID would be 'desktop' or 'webcam' diff --git a/pkg/sfu/streamallocator/probe_controller.go b/pkg/sfu/streamallocator/probe_controller.go deleted file mode 100644 index d342e9615..000000000 --- a/pkg/sfu/streamallocator/probe_controller.go +++ /dev/null @@ -1,277 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package streamallocator - -import ( - "fmt" - "sync" - "time" - - "github.com/livekit/livekit-server/pkg/sfu/bwe" - "github.com/livekit/livekit-server/pkg/sfu/ccutils" - "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/utils/mono" -) - -const ( - cDefaultRTT = float64(0.070) // 70 ms - cRTTSmoothingFactor = float64(0.5) -) - -// --------------------------------------------------------------------------- - -type ProbeControllerState int - -const ( - ProbeControllerStateNone ProbeControllerState = iota - ProbeControllerStateProbing - ProbeControllerStateHangover -) - -func (p ProbeControllerState) String() string { - switch p { - case ProbeControllerStateNone: - return "NONE" - case ProbeControllerStateProbing: - return "PROBING" - case ProbeControllerStateHangover: - return "HANGOVER" - default: - return fmt.Sprintf("%d", int(p)) - } -} - -// ------------------------------------------------ - -type ProbeControllerConfig struct { - BaseInterval time.Duration `yaml:"base_interval,omitempty"` - BackoffFactor float64 `yaml:"backoff_factor,omitempty"` - MaxInterval time.Duration `yaml:"max_interval,omitempty"` - - SettleWaitNumRTT uint32 `yaml:"settle_wait_num_rtt,omitempty"` - SettleWaitMin time.Duration `yaml:"settle_wait_min,omitempty"` - SettleWaitMax time.Duration `yaml:"settle_wait_max,omitempty"` - - OveragePct int64 `yaml:"overage_pct,omitempty"` - MinBps int64 `yaml:"min_bps,omitempty"` - - MinDuration time.Duration `yaml:"min_duration,omitempty"` - MaxDuration time.Duration `yaml:"max_duration,omitempty"` - DurationIncreaseFactor float64 `yaml:"duration_increase_factor,omitempty"` -} - -var ( - DefaultProbeControllerConfig = ProbeControllerConfig{ - BaseInterval: 3 * time.Second, - BackoffFactor: 1.5, - MaxInterval: 2 * time.Minute, - - SettleWaitNumRTT: 10, - SettleWaitMin: 500 * time.Millisecond, - SettleWaitMax: 10 * time.Second, - - OveragePct: 120, - MinBps: 200_000, - - MinDuration: 200 * time.Millisecond, - MaxDuration: 20 * time.Second, - DurationIncreaseFactor: 1.5, - } -) - -// --------------------------------------------------------------------------- - -type ProbeControllerParams struct { - Config ProbeControllerConfig - Logger logger.Logger -} - -type ProbeController struct { - params ProbeControllerParams - - lock sync.RWMutex - - state ProbeControllerState - stateSwitchedAt time.Time - - pci ccutils.ProbeClusterInfo - rtt float64 - - probeInterval time.Duration - probeDuration time.Duration - nextProbeEarliestAt time.Time -} - -func NewProbeController(params ProbeControllerParams) *ProbeController { - p := &ProbeController{ - params: params, - rtt: cDefaultRTT, - } - - p.Reset() - return p -} - -func (p *ProbeController) Reset() { - p.lock.Lock() - defer p.lock.Unlock() - - p.state = ProbeControllerStateNone - p.stateSwitchedAt = mono.Now() - p.pci = ccutils.ProbeClusterInfoInvalid - p.probeInterval = p.params.Config.BaseInterval - p.probeDuration = p.params.Config.MinDuration - p.nextProbeEarliestAt = mono.Now() -} - -func (p *ProbeController) UpdateRTT(rtt float64) { - if rtt == 0 { - p.rtt = cDefaultRTT - } else { - if p.rtt == 0 { - p.rtt = rtt - } else { - p.rtt = cRTTSmoothingFactor*rtt + (1.0-cRTTSmoothingFactor)*p.rtt - } - } -} - -func (p *ProbeController) CanProbe() bool { - p.lock.RLock() - defer p.lock.RUnlock() - - return p.state == ProbeControllerStateNone && mono.Now().After(p.nextProbeEarliestAt) -} - -func (p *ProbeController) MaybeInitiateProbe(availableBandwidthBps int64, probeGoalDeltaBps int64, expectedBandwidthUsage int64) (ccutils.ProbeClusterGoal, bool) { - p.lock.RLock() - defer p.lock.RUnlock() - - if p.state != ProbeControllerStateNone { - // already probing or in probe hangover, don't start a new one - return ccutils.ProbeClusterGoal{}, false - } - - if mono.Now().Before(p.nextProbeEarliestAt) { - return ccutils.ProbeClusterGoal{}, false - } - - // overshoot a bit to account for noise (in measurement/estimate etc) - desiredIncreaseBps := (probeGoalDeltaBps * p.params.Config.OveragePct) / 100 - if desiredIncreaseBps < p.params.Config.MinBps { - desiredIncreaseBps = p.params.Config.MinBps - } - return ccutils.ProbeClusterGoal{ - AvailableBandwidthBps: int(availableBandwidthBps), - ExpectedUsageBps: int(expectedBandwidthUsage), - DesiredBps: int(expectedBandwidthUsage + desiredIncreaseBps), - Duration: p.probeDuration, - }, true -} - -func (p *ProbeController) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) { - p.lock.Lock() - defer p.lock.Unlock() - - if p.state != ProbeControllerStateNone { - p.params.Logger.Warnw("unexpected probe controller state", nil, "state", p.state) - } - - p.setState(ProbeControllerStateProbing) - p.pci = pci -} - -func (p *ProbeController) ProbeClusterDone(pci ccutils.ProbeClusterInfo) { - p.lock.Lock() - defer p.lock.Unlock() - - if p.pci.Id == pci.Id { - p.pci.Result = pci.Result - p.setState(ProbeControllerStateHangover) - } -} - -func (p *ProbeController) MaybeFinalizeProbe() (ccutils.ProbeClusterInfo, bool) { - p.lock.Lock() - defer p.lock.Unlock() - - if p.state != ProbeControllerStateHangover { - return ccutils.ProbeClusterInfoInvalid, false - } - - settleWait := time.Duration(float64(p.params.Config.SettleWaitNumRTT) * p.rtt * float64(time.Second)) - if settleWait < p.params.Config.SettleWaitMin { - settleWait = p.params.Config.SettleWaitMin - } - if settleWait > p.params.Config.SettleWaitMax { - settleWait = p.params.Config.SettleWaitMax - } - if time.Since(p.stateSwitchedAt) < settleWait { - return ccutils.ProbeClusterInfoInvalid, false - } - - p.setState(ProbeControllerStateNone) - return p.pci, true -} - -func (p *ProbeController) ProbeSignal(probeSignal bwe.ProbeSignal) { - if probeSignal == bwe.ProbeSignalCongesting { - // wait longer till next probe - p.probeInterval = time.Duration(p.probeInterval.Seconds()*p.params.Config.BackoffFactor) * time.Second - if p.probeInterval > p.params.Config.MaxInterval { - p.probeInterval = p.params.Config.MaxInterval - } - - // revert back to starting with shortest probe - p.probeDuration = p.params.Config.MinDuration - } else { - // probe can be started again after minimal interval as previous congestion signal indicated congestion clearing - p.probeInterval = p.params.Config.BaseInterval - - // can do longer probe after a good probe - p.probeDuration = time.Duration(float64(p.probeDuration.Milliseconds())*p.params.Config.DurationIncreaseFactor) * time.Millisecond - if p.probeDuration > p.params.Config.MaxDuration { - p.probeDuration = p.params.Config.MaxDuration - } - } - - if p.pci.CreatedAt.IsZero() { - p.nextProbeEarliestAt = mono.Now().Add(p.probeInterval) - } else { - p.nextProbeEarliestAt = p.pci.CreatedAt.Add(p.probeInterval) - } -} - -func (p *ProbeController) GetActiveProbeClusterId() ccutils.ProbeClusterId { - p.lock.RLock() - defer p.lock.RUnlock() - - if p.state == ProbeControllerStateNone { - return ccutils.ProbeClusterIdInvalid - } - - return p.pci.Id -} - -func (p *ProbeController) setState(state ProbeControllerState) { - if state == p.state { - return - } - - p.state = state - p.stateSwitchedAt = mono.Now() -} - -// ------------------------------------------------ diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 1a3f2ec0c..0c8fc0aa3 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -157,16 +157,20 @@ const ( ) type StreamAllocatorConfig struct { - ProbeMode ProbeMode `yaml:"probe_mode,omitempty"` - MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"` - ProbeController ProbeControllerConfig `yaml:"probe_controller,omitempty"` - DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"` + ProbeMode ProbeMode `yaml:"probe_mode,omitempty"` + MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"` + DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"` + + ProbeOveragePct int64 `yaml:"probe_overage_pct,omitempty"` + ProbeMinBps int64 `yaml:"probe_min_bps,omitempty"` } var ( DefaultStreamAllocatorConfig = StreamAllocatorConfig{ - ProbeMode: ProbeModePadding, - ProbeController: DefaultProbeControllerConfig, + ProbeMode: ProbeModePadding, + + ProbeOveragePct: 120, + ProbeMinBps: 200_000, } ) @@ -193,8 +197,7 @@ type StreamAllocator struct { committedChannelCapacity int64 overriddenChannelCapacity int64 - probeController *ProbeController - prober *ccutils.Prober + prober *ccutils.Prober // STREAM-ALLOCATOR-DATA rateMonitor *RateMonitor @@ -203,8 +206,9 @@ type StreamAllocator struct { isAllocateAllPending bool rembTrackingSSRC uint32 - state streamAllocatorState - isHolding bool + state streamAllocatorState + isHolding bool + activeProbeClusterId ccutils.ProbeClusterId eventsQueue *utils.TypedOpsQueue[Event] @@ -219,8 +223,9 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b enabled: enabled, allowPause: allowPause, // STREAM-ALLOCATOR-DATA rateMonitor: NewRateMonitor(), - videoTracks: make(map[livekit.TrackID]*Track), - state: streamAllocatorStateStable, + videoTracks: make(map[livekit.TrackID]*Track), + state: streamAllocatorStateStable, + activeProbeClusterId: ccutils.ProbeClusterIdInvalid, eventsQueue: utils.NewTypedOpsQueue[Event](utils.OpsQueueParams{ Name: "stream-allocator", MinSize: 64, @@ -234,11 +239,6 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b Logger: params.Logger, }) - s.probeController = NewProbeController(ProbeControllerParams{ - Config: s.params.Config.ProbeController, - Logger: params.Logger, - }) - s.params.BWE.SetBWEListener(s) s.params.Pacer.SetPacerProbeObserverListener(s) @@ -298,7 +298,7 @@ func (s *StreamAllocator) AddTrack(downTrack *sfu.DownTrack, params AddTrackPara } downTrack.SetStreamAllocatorListener(s) - downTrack.SetProbeClusterId(s.prober.GetActiveClusterId()) + downTrack.SetProbeClusterId(s.activeProbeClusterId) s.maybePostEventAllocateTrack(downTrack) } @@ -694,22 +694,25 @@ func (s *StreamAllocator) handleSignalEstimate(event Event) { func (s *StreamAllocator) handleSignalPeriodicPing(Event) { // finalize any probe that may have finished/aborted - if pci, ok := s.probeController.MaybeFinalizeProbe(); ok { - probeSignal, channelCapacity := s.params.BWE.ProbeClusterDone(pci) - s.params.Logger.Debugw( - "stream allocator: probe result", - "probeSignal", probeSignal, - "channelCapacity", channelCapacity, - ) - if probeSignal != bwe.ProbeSignalCongesting { - if channelCapacity > s.committedChannelCapacity { - s.committedChannelCapacity = channelCapacity + if s.activeProbeClusterId != ccutils.ProbeClusterIdInvalid { + if probeSignal, channelCapacity, isFinalized := s.params.BWE.ProbeClusterFinalize(); isFinalized { + s.params.Logger.Debugw( + "stream allocator: probe result", + "probeClusterId", s.activeProbeClusterId, + "probeSignal", probeSignal, + "channelCapacity", channelCapacity, + ) + + s.activeProbeClusterId = ccutils.ProbeClusterIdInvalid + + if probeSignal != ccutils.ProbeSignalCongesting { + if channelCapacity > s.committedChannelCapacity { + s.committedChannelCapacity = channelCapacity + } + + s.maybeBoostDeficientTracks() } - - s.maybeBoostDeficientTracks() } - - s.probeController.ProbeSignal(probeSignal) } // probe if necessary and timing is right @@ -722,7 +725,7 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) { if s.params.RTTGetter != nil { if rtt, ok := s.params.RTTGetter(); ok { - s.probeController.UpdateRTT(rtt) + s.params.BWE.UpdateRTT(rtt) } } } @@ -735,7 +738,8 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) { func (s *StreamAllocator) handleSignalProbeClusterSwitch(event Event) { pci := event.Data.(ccutils.ProbeClusterInfo) - s.probeController.ProbeClusterStarting(pci) + s.activeProbeClusterId = pci.Id + s.params.BWE.ProbeClusterStarting(pci) s.params.Pacer.StartProbeCluster(pci) @@ -765,7 +769,12 @@ func (s *StreamAllocator) handleSignalSendProbe(event Event) { func (s *StreamAllocator) handleSignalPacerProbeObserverClusterComplete(event Event) { probeClusterId, _ := event.Data.(ccutils.ProbeClusterId) pci := s.params.Pacer.EndProbeCluster(probeClusterId) - s.probeController.ProbeClusterDone(pci) + + for _, t := range s.getTracks() { + t.DownTrack().SwapProbeClusterId(pci.Id, ccutils.ProbeClusterIdInvalid) + } + + s.params.BWE.ProbeClusterDone(pci) s.prober.ClusterDone(pci) } @@ -850,7 +859,7 @@ func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) { } if cscd.congestionState == bwe.CongestionStateCongested { - if s.probeController.GetActiveProbeClusterId() != ccutils.ProbeClusterIdInvalid { + if s.activeProbeClusterId != ccutils.ProbeClusterIdInvalid { s.params.Logger.Infow( "stream allocator: channel congestion detected, not updating channel capacity in active probe", "old(bps)", s.committedChannelCapacity, @@ -887,12 +896,10 @@ func (s *StreamAllocator) setState(state streamAllocatorState) { s.params.Logger.Infow("stream allocator: state change", "from", s.state, "to", state) s.state = state - // restart everything when when state is stable + // restart everything when state is STABLE if state == streamAllocatorStateStable { s.maybeStopProbe() - s.probeController.Reset() - s.params.BWE.Reset() } } @@ -1064,12 +1071,13 @@ func (s *StreamAllocator) allocateTrack(track *Track) { } func (s *StreamAllocator) maybeStopProbe() { - activeProbeClusterId := s.probeController.GetActiveProbeClusterId() - if activeProbeClusterId != ccutils.ProbeClusterIdInvalid { - pci := s.params.Pacer.EndProbeCluster(activeProbeClusterId) - s.probeController.ProbeClusterDone(pci) - s.prober.Reset(pci) + if s.activeProbeClusterId == ccutils.ProbeClusterIdInvalid { + return } + + pci := s.params.Pacer.EndProbeCluster(s.activeProbeClusterId) + s.params.BWE.ProbeClusterDone(pci) + s.prober.Reset(pci) } func (s *StreamAllocator) maybeBoostDeficientTracks() { @@ -1296,7 +1304,7 @@ func (s *StreamAllocator) maybeProbe() { return } - if s.params.BWE.CongestionState() != bwe.CongestionStateNone || !s.probeController.CanProbe() { + if !s.params.BWE.CanProbe() { return } @@ -1321,7 +1329,7 @@ func (s *StreamAllocator) maybeProbeWithMedia() { updateStreamStateChange(track, allocation, update) s.maybeSendUpdate(update) - s.probeController.Reset() + s.params.BWE.Reset() break } } @@ -1334,14 +1342,25 @@ func (s *StreamAllocator) maybeProbeWithPadding() { continue } - pcg, ok := s.probeController.MaybeInitiateProbe(s.committedChannelCapacity, transition.BandwidthDelta, s.getExpectedBandwidthUsage()) - if ok { - pci := s.prober.AddCluster(ccutils.ProbeClusterModeUniform, pcg) - s.params.Logger.Debugw( - "stream allocator: starting probe", - "probeClusterInfo", pci, - ) + // overshoot a bit to account for noise (in measurement/estimate etc) + desiredIncreaseBps := (transition.BandwidthDelta * s.params.Config.ProbeOveragePct) / 100 + if desiredIncreaseBps < s.params.Config.ProbeMinBps { + desiredIncreaseBps = s.params.Config.ProbeMinBps } + expectedBandwidthUsage := s.getExpectedBandwidthUsage() + pci := s.prober.AddCluster( + ccutils.ProbeClusterModeUniform, + ccutils.ProbeClusterGoal{ + AvailableBandwidthBps: int(s.committedChannelCapacity), + ExpectedUsageBps: int(expectedBandwidthUsage), + DesiredBps: int(expectedBandwidthUsage + desiredIncreaseBps), + Duration: s.params.BWE.ProbeDuration(), + }, + ) + s.params.Logger.Debugw( + "stream allocator: adding probe", + "probeClusterInfo", pci, + ) break } }