TWCC probing (#3234)

* WIP

* WIP

* WIP

* make it compile

* typo

* clean up

* fmt

* fixes
This commit is contained in:
Raja Subramanian
2024-12-06 00:13:36 +05:30
committed by GitHub
parent d862917249
commit 94488d434d
16 changed files with 911 additions and 454 deletions
+13 -25
View File
@@ -16,6 +16,7 @@ package bwe
import (
"fmt"
"time"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/pion/rtcp"
@@ -23,6 +24,13 @@ import (
// ------------------------------------------------
const (
	// DefaultRTT is the round trip time, in seconds, used as the initial
	// value and whenever a zero RTT is reported.
	DefaultRTT = float64(0.070) // 70 ms
	// RTTSmoothingFactor is the weight given to a new RTT sample when it is
	// blended with the running RTT value.
	RTTSmoothingFactor = float64(0.5)
)
// ------------------------------------------------
type CongestionState int
const (
@@ -52,29 +60,6 @@ func (c CongestionState) String() string {
// ------------------------------------------------
type ProbeSignal int
const (
ProbeSignalInconclusive ProbeSignal = iota
ProbeSignalCongesting
ProbeSignalClearing
)
func (p ProbeSignal) String() string {
switch p {
case ProbeSignalInconclusive:
return "INCONCLUSIVE"
case ProbeSignalCongesting:
return "CONGESTING"
case ProbeSignalClearing:
return "CLEARING"
default:
return fmt.Sprintf("%d", int(p))
}
}
// ------------------------------------------------
type BWE interface {
SetBWEListener(bweListner BWEListener)
@@ -100,10 +85,13 @@ type BWE interface {
HandleTWCCFeedback(report *rtcp.TransportLayerCC)
CongestionState() CongestionState
UpdateRTT(rtt float64)
CanProbe() bool
ProbeDuration() time.Duration
ProbeClusterStarting(pci ccutils.ProbeClusterInfo)
ProbeClusterDone(pci ccutils.ProbeClusterInfo) (ProbeSignal, int64)
ProbeClusterDone(pci ccutils.ProbeClusterInfo)
ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool)
}
// ------------------------------------------------
+14 -4
View File
@@ -15,6 +15,8 @@
package bwe
import (
"time"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/pion/rtcp"
)
@@ -48,14 +50,22 @@ func (n *NullBWE) HandleREMB(
func (n *NullBWE) HandleTWCCFeedback(_report *rtcp.TransportLayerCC) {}
func (n *NullBWE) CongestionState() CongestionState {
return CongestionStateNone
func (n *NullBWE) UpdateRTT(rtt float64) {}
func (n *NullBWE) CanProbe() bool {
return false
}
func (n *NullBWE) ProbeDuration() time.Duration {
return 0
}
func (n *NullBWE) ProbeClusterStarting(_pci ccutils.ProbeClusterInfo) {}
func (n *NullBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (ProbeSignal, int64) {
return ProbeSignalInconclusive, 0
func (n *NullBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) {}
func (n *NullBWE) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) {
return ccutils.ProbeSignalInconclusive, 0, false
}
// ------------------------------------------------
+172
View File
@@ -0,0 +1,172 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remotebwe
import (
"fmt"
"time"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
)
// ---------------------------------------------------------------------------
// probeControllerState identifies the phase the probe controller is in.
type probeControllerState int

const (
	probeControllerStateNone probeControllerState = iota
	probeControllerStateProbing
	probeControllerStateHangover
)

// String returns the upper-case label of the state. Values outside the
// declared set are rendered as their decimal integer value.
func (p probeControllerState) String() string {
	labels := [...]string{
		probeControllerStateNone:     "NONE",
		probeControllerStateProbing:  "PROBING",
		probeControllerStateHangover: "HANGOVER",
	}

	if p < 0 || int(p) >= len(labels) {
		return fmt.Sprintf("%d", int(p))
	}
	return labels[p]
}
// ------------------------------------------------
// ProbeControllerConfig holds the tunables of the probe controller.
type ProbeControllerConfig struct {
	// ProbeRegulator is handed to the ccutils.ProbeRegulator created by the controller.
	ProbeRegulator ccutils.ProbeRegulatorConfig `yaml:"probe_regulator,omitempty"`
	// SettleWaitNumRTT is the number of round trip times to wait in hangover
	// before a finished probe is finalized.
	SettleWaitNumRTT uint32 `yaml:"settle_wait_num_rtt,omitempty"`
	// SettleWaitMin is the lower bound applied to the computed settle wait.
	SettleWaitMin time.Duration `yaml:"settle_wait_min,omitempty"`
	// SettleWaitMax is the upper bound applied to the computed settle wait.
	SettleWaitMax time.Duration `yaml:"settle_wait_max,omitempty"`
}
var (
	// DefaultProbeControllerConfig is the stock probe controller configuration:
	// settle for 10 RTTs, bounded to the range 500 ms to 10 s.
	DefaultProbeControllerConfig = ProbeControllerConfig{
		ProbeRegulator: ccutils.DefaultProbeRegulatorConfig,
		SettleWaitNumRTT: 10,
		SettleWaitMin: 500 * time.Millisecond,
		SettleWaitMax: 10 * time.Second,
	}
)
// ---------------------------------------------------------------------------
// probeControllerParams bundles the inputs needed to build a probeController.
type probeControllerParams struct {
	// Config supplies the settle wait bounds and the probe regulator configuration.
	Config ProbeControllerConfig
	// Logger is used by the controller and is also passed to its probe regulator.
	Logger logger.Logger
}
// probeController tracks a probe cluster through its probing and hangover
// phases and decides when the probe can be finalized.
type probeController struct {
	params probeControllerParams
	// state is the current phase; stateSwitchedAt is when that phase was entered.
	state probeControllerState
	stateSwitchedAt time.Time
	// pci is the probe cluster being tracked, its Result is filled in when the cluster is done.
	pci ccutils.ProbeClusterInfo
	// rtt is the smoothed round trip time in seconds (see UpdateRTT).
	rtt float64
	// embedded regulator, its methods are promoted onto the controller
	*ccutils.ProbeRegulator
}
// newProbeController builds a controller in the idle state, seeded with the
// default RTT and an invalid probe cluster, along with its own probe regulator.
func newProbeController(params probeControllerParams) *probeController {
	createdAt := mono.Now()
	regulator := ccutils.NewProbeRegulator(ccutils.ProbeRegulatorParams{
		Config: params.Config.ProbeRegulator,
		Logger: params.Logger,
	})

	pc := &probeController{params: params}
	pc.state = probeControllerStateNone
	pc.stateSwitchedAt = createdAt
	pc.pci = ccutils.ProbeClusterInfoInvalid
	pc.rtt = bwe.DefaultRTT
	pc.ProbeRegulator = regulator
	return pc
}
// UpdateRTT folds a new RTT sample (in seconds) into the running value.
// A zero sample resets to the default, the first non-zero sample after a
// zero running value is taken as is, and every other sample is blended in
// using bwe.RTTSmoothingFactor.
func (p *probeController) UpdateRTT(rtt float64) {
	switch {
	case rtt == 0:
		p.rtt = bwe.DefaultRTT
	case p.rtt == 0:
		p.rtt = rtt
	default:
		p.rtt = bwe.RTTSmoothingFactor*rtt + (1.0-bwe.RTTSmoothingFactor)*p.rtt
	}
}
// CanProbe reports whether a new probe may start: the controller must be
// idle and the embedded probe regulator must allow it.
func (p *probeController) CanProbe() bool {
	if p.state != probeControllerStateNone {
		return false
	}
	return p.ProbeRegulator.CanProbe()
}
// IsInProbe reports whether the controller is in any state other than idle,
// i.e. probing or in hangover.
func (p *probeController) IsInProbe() bool {
	return p.state != probeControllerStateNone
}
// ProbeClusterStarting records the given probe cluster and moves the
// controller to the probing state. Starting while not idle is logged as a
// warning, but the new cluster is still accepted.
func (p *probeController) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) {
	if current := p.state; current != probeControllerStateNone {
		p.params.Logger.Warnw("unexpected probe controller state", nil, "state", current)
	}

	p.pci = pci
	p.setState(probeControllerStateProbing)
}
// ProbeClusterDone stores the result of the tracked probe cluster and enters
// hangover. A cluster whose id does not match the tracked one is ignored.
func (p *probeController) ProbeClusterDone(pci ccutils.ProbeClusterInfo) {
	if p.pci.Id == pci.Id {
		p.pci.Result = pci.Result
		p.setState(probeControllerStateHangover)
	}
}
// MaybeFinalizeProbe finalizes the probe once the hangover has lasted for the
// settle wait. The settle wait is SettleWaitNumRTT round trip times, bounded
// below by SettleWaitMin and then above by SettleWaitMax.
//
// It returns the tracked probe cluster info and true when finalized, in which
// case the controller goes back to idle. Otherwise it returns the invalid
// probe cluster info and false.
func (p *probeController) MaybeFinalizeProbe() (ccutils.ProbeClusterInfo, bool) {
	if p.state == probeControllerStateHangover {
		cfg := p.params.Config

		// rtt is in seconds, scale to a duration before bounding
		wait := time.Duration(float64(cfg.SettleWaitNumRTT) * p.rtt * float64(time.Second))
		wait = min(max(wait, cfg.SettleWaitMin), cfg.SettleWaitMax)

		if time.Since(p.stateSwitchedAt) >= wait {
			p.setState(probeControllerStateNone)
			return p.pci, true
		}
	}

	return ccutils.ProbeClusterInfoInvalid, false
}
// setState switches to the given state and stamps the switch time.
// Setting the state the controller is already in is a no-op, so the
// switch time is not refreshed in that case.
func (p *probeController) setState(state probeControllerState) {
	if p.state != state {
		p.state = state
		p.stateSwitchedAt = mono.Now()
	}
}
// ------------------------------------------------
+70 -42
View File
@@ -27,25 +27,22 @@ import (
// ---------------------------------------------------------------------------
type RemoteBWEConfig struct {
NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"`
ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"`
ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"`
ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"`
CongestedMinDuration time.Duration `yaml:"congested_min_duration,omitempty"`
PeriodicCheckInterval time.Duration `yaml:"periodic_check_interval,omitempty"`
PeriodicCheckIntervalCongested time.Duration `yaml:"periodic_check_interval_congested,omitempty"`
NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"`
ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"`
ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"`
ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"`
CongestedHangoverDuration time.Duration `yaml:"congested_hangover_duration,omitempty"`
ProbeController ProbeControllerConfig `yaml:"probe_controller,omitempty"`
}
var (
DefaultRemoteBWEConfig = RemoteBWEConfig{
NackRatioAttenuator: 0.4,
ExpectedUsageThreshold: 0.95,
ChannelObserverProbe: defaultChannelObserverConfigProbe,
ChannelObserverNonProbe: defaultChannelObserverConfigNonProbe,
CongestedMinDuration: 3 * time.Second,
PeriodicCheckInterval: 2 * time.Second,
PeriodicCheckIntervalCongested: 200 * time.Millisecond,
NackRatioAttenuator: 0.4,
ExpectedUsageThreshold: 0.95,
ChannelObserverProbe: defaultChannelObserverConfigProbe,
ChannelObserverNonProbe: defaultChannelObserverConfigNonProbe,
CongestedHangoverDuration: 3 * time.Second,
ProbeController: DefaultProbeControllerConfig,
}
)
@@ -67,7 +64,8 @@ type RemoteBWE struct {
lastExpectedBandwidthUsage int64
committedChannelCapacity int64
isInProbe bool
probeController *probeController
channelObserver *channelObserver
congestionState bwe.CongestionState
@@ -107,11 +105,15 @@ func (r *RemoteBWE) Reset() {
r.lastExpectedBandwidthUsage = 0
r.committedChannelCapacity = 100_000_000
r.isInProbe = false
r.newChannelObserver()
r.congestionState = bwe.CongestionStateNone
r.congestionStateSwitchedAt = mono.Now()
r.probeController = newProbeController(probeControllerParams{
Config: r.params.Config.ProbeController,
Logger: r.params.Logger,
})
r.newChannelObserver()
}
func (r *RemoteBWE) HandleREMB(
@@ -126,7 +128,7 @@ func (r *RemoteBWE) HandleREMB(
// in probe, freeze channel observer state if probe causes congestion till the probe is done,
// this is to ensure that probe result is not a success and an unsuccessful probe will not up allocate any tracks
if r.isInProbe && r.congestionState != bwe.CongestionStateNone {
if r.congestionState != bwe.CongestionStateNone && r.probeController.IsInProbe() {
r.lock.Unlock()
return
}
@@ -144,11 +146,11 @@ func (r *RemoteBWE) HandleREMB(
}
}
func (r *RemoteBWE) CongestionState() bwe.CongestionState {
r.lock.RLock()
defer r.lock.RUnlock()
func (r *RemoteBWE) UpdateRTT(rtt float64) {
r.lock.Lock()
defer r.lock.Unlock()
return r.congestionState
r.probeController.UpdateRTT(rtt)
}
func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState, int64) {
@@ -162,7 +164,7 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState
switch r.congestionState {
case bwe.CongestionStateNone:
if trend == channelTrendCongesting {
if r.isInProbe || r.estimateAvailableChannelCapacity(reason) {
if r.probeController.IsInProbe() || r.estimateAvailableChannelCapacity(reason) {
// when in probe, if congested, stays there till probe is done,
// the estimate stays at pre-probe level
newState = bwe.CongestionStateCongested
@@ -184,7 +186,7 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState
if r.estimateAvailableChannelCapacity(reason) {
newState = bwe.CongestionStateCongested
}
} else if time.Since(r.congestionStateSwitchedAt) >= r.params.Config.CongestedMinDuration {
} else if time.Since(r.congestionStateSwitchedAt) >= r.params.Config.CongestedHangoverDuration {
newState = bwe.CongestionStateNone
}
}
@@ -254,6 +256,20 @@ func (r *RemoteBWE) updateCongestionState(state bwe.CongestionState, reason chan
r.congestionStateSwitchedAt = mono.Now()
}
// CanProbe reports whether a probe may start now: there must be no
// congestion and the probe controller must allow it.
func (r *RemoteBWE) CanProbe() bool {
	r.lock.Lock()
	defer r.lock.Unlock()

	if r.congestionState != bwe.CongestionStateNone {
		return false
	}
	return r.probeController.CanProbe()
}
// ProbeDuration returns the probe duration reported by the probe controller,
// read while holding the lock.
func (r *RemoteBWE) ProbeDuration() time.Duration {
	r.lock.Lock()
	defer r.lock.Unlock()
	return r.probeController.ProbeDuration()
}
func (r *RemoteBWE) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -267,49 +283,61 @@ func (r *RemoteBWE) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) {
"channel", r.channelObserver,
)
r.isInProbe = true
r.probeController.ProbeClusterStarting(pci)
r.newChannelObserver()
}
func (r *RemoteBWE) ProbeClusterDone(_pci ccutils.ProbeClusterInfo) (bwe.ProbeSignal, int64) {
// ProbeClusterDone forwards the completed probe cluster to the probe
// controller while holding the lock. The probe outcome is produced later
// by ProbeClusterFinalize.
func (r *RemoteBWE) ProbeClusterDone(pci ccutils.ProbeClusterInfo) {
	r.lock.Lock()
	defer r.lock.Unlock()
	r.probeController.ProbeClusterDone(pci)
}
func (r *RemoteBWE) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) {
r.lock.Lock()
defer r.lock.Unlock()
pci, isFinalized := r.probeController.MaybeFinalizeProbe()
if !isFinalized {
return ccutils.ProbeSignalInconclusive, 0, isFinalized
}
// switch to a non-probe channel observer on probe end,
// reset congestion state to get a fresh trend
pco := r.channelObserver
probeCongestionState := r.congestionState
r.isInProbe = false
r.congestionState = bwe.CongestionStateNone
r.newChannelObserver()
r.params.Logger.Debugw(
"remote bwe: probe done",
"remote bwe: probe finalized",
"lastReceived", r.lastReceivedEstimate,
"expectedBandwidthUsage", r.lastExpectedBandwidthUsage,
"channel", pco,
"isSignalValid", pco.HasEnoughEstimateSamples(),
"probeClusterInfo", pci,
)
probeSignal := ccutils.ProbeSignalClearing
if probeCongestionState != bwe.CongestionStateNone {
return bwe.ProbeSignalCongesting, r.committedChannelCapacity
probeSignal = ccutils.ProbeSignalCongesting
} else if trend, _ := pco.GetTrend(); !pco.HasEnoughEstimateSamples() || trend == channelTrendNeutral {
probeSignal = ccutils.ProbeSignalInconclusive
} else {
highestEstimate := pco.GetHighestEstimate()
if highestEstimate > r.committedChannelCapacity {
r.committedChannelCapacity = highestEstimate
}
}
trend, _ := pco.GetTrend()
if !pco.HasEnoughEstimateSamples() || trend == channelTrendNeutral {
return bwe.ProbeSignalInconclusive, r.committedChannelCapacity
}
highestEstimate := pco.GetHighestEstimate()
if highestEstimate > r.committedChannelCapacity {
r.committedChannelCapacity = highestEstimate
}
return bwe.ProbeSignalClearing, r.committedChannelCapacity
r.probeController.ProbeSignal(probeSignal, pci.CreatedAt)
return probeSignal, r.committedChannelCapacity, true
}
func (r *RemoteBWE) newChannelObserver() {
if r.isInProbe {
if r.probeController.IsInProbe() {
r.channelObserver = newChannelObserver(
channelObserverParams{
Name: "probe",
+193 -36
View File
@@ -63,6 +63,58 @@ var (
// -------------------------------------------------------------------------------
// ProbeSignalConfig holds the thresholds used to judge whether a probe was
// valid and what it signalled.
type ProbeSignalConfig struct {
	// MinBytesRatio is the fraction of the probe goal's desired bytes that the
	// probe result must exceed for the probe to be valid.
	MinBytesRatio float64 `yaml:"min_bytes_ratio,omitempty"`
	// MinDurationRatio is the fraction of the probe goal's duration that the
	// probe result must exceed for the probe to be valid.
	MinDurationRatio float64 `yaml:"min_duration_ratio,omitempty"`
	// JQRMinDelay is the propagated queuing delay above which a probe is deemed congesting.
	JQRMinDelay time.Duration `yaml:"jqr_min_delay,omitempty"`
	// DQRMaxDelay is the propagated queuing delay below which a probe is deemed clearing.
	DQRMaxDelay time.Duration `yaml:"dqr_max_delay,omitempty"`
	// WeightedLoss configures the traffic stats used to evaluate the probe.
	WeightedLoss WeightedLossConfig `yaml:"weighted_loss,omitempty"`
	// CongestionMinLoss is the weighted loss above which a probe is deemed congesting.
	CongestionMinLoss float64 `yaml:"congestion_min_loss,omitempty"`
}
// IsValid reports whether the probe sent enough to be trusted: the result
// must exceed both MinBytesRatio of the desired bytes and MinDurationRatio
// of the desired duration of the probe goal.
func (p ProbeSignalConfig) IsValid(pci ccutils.ProbeClusterInfo) bool {
	if pci.Result.Bytes() <= int(p.MinBytesRatio*float64(pci.Goal.DesiredBytes)) {
		return false
	}

	minDuration := time.Duration(p.MinDurationRatio * float64(pci.Goal.Duration))
	return pci.Result.Duration() > minDuration
}
// ProbeSignal classifies the outcome of a probe from its packet group and
// returns the signal along with the acknowledged bitrate of the probe traffic.
//
// Checks are applied in order: queuing delay above JQRMinDelay or weighted
// loss above CongestionMinLoss is congesting, queuing delay below DQRMaxDelay
// is clearing, anything else is inconclusive.
func (p ProbeSignalConfig) ProbeSignal(ppg *probePacketGroup) (ccutils.ProbeSignal, int64) {
	stats := newTrafficStats(trafficStatsParams{
		Config: p.WeightedLoss,
	})
	stats.Merge(ppg.Traffic())

	// delays are compared in microseconds
	queuingDelay := ppg.PropagatedQueuingDelay()

	var signal ccutils.ProbeSignal
	switch {
	case queuingDelay > p.JQRMinDelay.Microseconds():
		signal = ccutils.ProbeSignalCongesting
	case stats.WeightedLoss() > p.CongestionMinLoss:
		signal = ccutils.ProbeSignalCongesting
	case queuingDelay < p.DQRMaxDelay.Microseconds():
		signal = ccutils.ProbeSignalClearing
	default:
		signal = ccutils.ProbeSignalInconclusive
	}

	return signal, stats.AcknowledgedBitrate()
}
var (
	// DefaultProbeSignalConfig is the stock probe signal configuration.
	DefaultProbeSignalConfig = ProbeSignalConfig{
		MinBytesRatio: 0.5,
		MinDurationRatio: 0.5,
		JQRMinDelay: 15 * time.Millisecond,
		DQRMaxDelay: 5 * time.Millisecond,
		WeightedLoss: defaultWeightedLossConfig,
		CongestionMinLoss: 0.25,
	}
)
// -------------------------------------------------------------------------------
type qdMeasurement struct {
earlyWarningConfig CongestionSignalConfig
congestedConfig CongestionSignalConfig
@@ -94,7 +146,7 @@ func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup) {
return
}
pqd, pqdOk := pg.PropagatedQueuingDelay()
pqd, pqdOk := pg.FinalizedPropagatedQueuingDelay()
if !pqdOk {
return
}
@@ -234,6 +286,10 @@ type CongestionDetectorConfig struct {
PacketGroup PacketGroupConfig `yaml:"packet_group,omitempty"`
PacketGroupMaxAge time.Duration `yaml:"packet_group_max_age,omitempty"`
ProbePacketGroup ProbePacketGroupConfig `yaml:"probe_packet_group,omitempty"`
ProbeRegulator ccutils.ProbeRegulatorConfig `yaml:"probe_regulator,omitempty"`
ProbeSignal ProbeSignalConfig `yaml:"probe_signal,omitempty"`
JQRMinDelay time.Duration `yaml:"jqr_min_delay,omitempty"`
DQRMaxDelay time.Duration `yaml:"dqr_max_delay,omitempty"`
@@ -268,24 +324,34 @@ var (
}
DefaultCongestionDetectorConfig = CongestionDetectorConfig{
PacketGroup: DefaultPacketGroupConfig,
PacketGroupMaxAge: 15 * time.Second,
JQRMinDelay: 15 * time.Millisecond,
DQRMaxDelay: 5 * time.Millisecond,
WeightedLoss: defaultWeightedLossConfig,
CongestionMinLoss: 0.25,
QueuingDelayEarlyWarning: DefaultQueuingDelayEarlyWarningCongestionSignalConfig,
LossEarlyWarning: DefaultLossEarlyWarningCongestionSignalConfig,
EarlyWarningHangover: 500 * time.Millisecond,
QueuingDelayCongested: DefaultQueuingDelayCongestedCongestionSignalConfig,
LossCongested: DefaultLossCongestedCongestionSignalConfig,
CongestedHangover: 3 * time.Second,
PacketGroup: DefaultPacketGroupConfig,
PacketGroupMaxAge: 15 * time.Second,
ProbePacketGroup: DefaultPacketGroupConfigProbe,
ProbeRegulator: ccutils.DefaultProbeRegulatorConfig,
ProbeSignal: DefaultProbeSignalConfig,
JQRMinDelay: 15 * time.Millisecond,
DQRMaxDelay: 5 * time.Millisecond,
WeightedLoss: defaultWeightedLossConfig,
CongestionMinLoss: 0.25,
QueuingDelayEarlyWarning: DefaultQueuingDelayEarlyWarningCongestionSignalConfig,
LossEarlyWarning: DefaultLossEarlyWarningCongestionSignalConfig,
EarlyWarningHangover: 500 * time.Millisecond,
QueuingDelayCongested: DefaultQueuingDelayCongestedCongestionSignalConfig,
LossCongested: DefaultLossCongestedCongestionSignalConfig,
CongestedHangover: 3 * time.Second,
RateMeasurementWindowDurationMin: 800 * time.Millisecond,
RateMeasurementWindowDurationMax: 2 * time.Second,
PeriodicCheckInterval: 2 * time.Second,
PeriodicCheckIntervalCongested: 200 * time.Millisecond,
CongestedCTRTrend: defaultTrendDetectorConfigCongestedCTR,
CongestedCTREpsilon: 0.05,
PeriodicCheckInterval: 2 * time.Second,
PeriodicCheckIntervalCongested: 200 * time.Millisecond,
CongestedCTRTrend: defaultTrendDetectorConfigCongestedCTR,
CongestedCTREpsilon: 0.05,
}
)
@@ -307,11 +373,16 @@ type congestionDetector struct {
lock sync.RWMutex
feedbackReports deque.Deque[feedbackReport]
rtt float64
*packetTracker
twccFeedback *twccFeedback
packetGroups []*packetGroup
probePacketGroup *probePacketGroup
probeRegulator *ccutils.ProbeRegulator
wake chan struct{}
stop core.Fuse
@@ -326,9 +397,14 @@ type congestionDetector struct {
func newCongestionDetector(params congestionDetectorParams) *congestionDetector {
c := &congestionDetector{
params: params,
packetTracker: newPacketTracker(packetTrackerParams{Logger: params.Logger}),
twccFeedback: newTWCCFeedback(twccFeedbackParams{Logger: params.Logger}),
params: params,
rtt: bwe.DefaultRTT,
packetTracker: newPacketTracker(packetTrackerParams{Logger: params.Logger}),
twccFeedback: newTWCCFeedback(twccFeedbackParams{Logger: params.Logger}),
probeRegulator: ccutils.NewProbeRegulator(ccutils.ProbeRegulatorParams{
Config: params.Config.ProbeRegulator,
Logger: params.Logger,
}),
wake: make(chan struct{}, 1),
estimatedAvailableChannelCapacity: 100_000_000,
congestionState: bwe.CongestionStateNone,
@@ -371,11 +447,87 @@ func (c *congestionDetector) HandleTWCCFeedback(report *rtcp.TransportLayerCC) {
}
}
func (c *congestionDetector) CongestionState() bwe.CongestionState {
c.lock.RLock()
defer c.lock.RUnlock()
// UpdateRTT folds a new RTT sample (in seconds) into the running value.
// A zero sample resets to the default, the first non-zero sample after a
// zero running value is taken as is, and every other sample is blended in
// using bwe.RTTSmoothingFactor.
//
// The lock is held because the RTT is read under the same lock when a probe
// is finalized; updating it unguarded would be a data race.
func (c *congestionDetector) UpdateRTT(rtt float64) {
	c.lock.Lock()
	defer c.lock.Unlock()

	switch {
	case rtt == 0:
		c.rtt = bwe.DefaultRTT
	case c.rtt == 0:
		c.rtt = rtt
	default:
		c.rtt = bwe.RTTSmoothingFactor*rtt + (1.0-bwe.RTTSmoothingFactor)*c.rtt
	}
}
return c.congestionState
// CanProbe reports whether a probe may start now: there must be no
// congestion, no probe packet group in flight, and the probe regulator
// must allow it.
func (c *congestionDetector) CanProbe() bool {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.congestionState != bwe.CongestionStateNone || c.probePacketGroup != nil {
		return false
	}
	return c.probeRegulator.CanProbe()
}
// ProbeDuration returns the probe duration reported by the probe regulator,
// read while holding the lock.
func (c *congestionDetector) ProbeDuration() time.Duration {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.probeRegulator.ProbeDuration()
}
// ProbeClusterStarting creates a fresh probe packet group for the given
// probe cluster and tells the packet tracker which cluster is starting.
func (c *congestionDetector) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) {
	c.lock.Lock()
	defer c.lock.Unlock()

	groupParams := probePacketGroupParams{
		Config:       c.params.Config.ProbePacketGroup,
		WeightedLoss: c.params.Config.WeightedLoss,
		Logger:       c.params.Logger,
	}
	c.probePacketGroup = newProbePacketGroup(groupParams, pci)

	c.packetTracker.ProbeClusterStarting(pci.Id)
}
// ProbeClusterDone tells the packet tracker that the probe cluster has
// finished and, when a probe packet group exists, passes it the completed
// cluster info.
func (c *congestionDetector) ProbeClusterDone(pci ccutils.ProbeClusterInfo) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.packetTracker.ProbeClusterDone(pci.Id)

	if ppg := c.probePacketGroup; ppg != nil {
		ppg.ProbeClusterDone(pci)
	}
}
// ProbeClusterFinalize tries to conclude the probe in flight.
//
// It returns the probe signal, the estimated available channel capacity and
// whether the probe was finalized. When there is no probe in flight, or the
// probe packet group is not ready to be finalized, it returns
// (ProbeSignalInconclusive, 0, false).
//
// On finalization, the estimate is raised only if the probe signalled
// clearing with a higher capacity, the probe regulator is informed of the
// signal and the probe packet group is dropped.
func (c *congestionDetector) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) {
	c.lock.Lock()
	defer c.lock.Unlock()
	// no probe in flight
	if c.probePacketGroup == nil {
		return ccutils.ProbeSignalInconclusive, 0, false
	}
	pci, isFinalized := c.probePacketGroup.MaybeFinalizeProbe(c.packetTracker.ProbeMaxSequenceNumber(), c.rtt)
	if !isFinalized {
		return ccutils.ProbeSignalInconclusive, 0, isFinalized
	}
	// NOTE(review): isSignalValid is only logged, the probe signal below is
	// used even when the probe is not valid - confirm that is intended.
	isSignalValid := c.params.Config.ProbeSignal.IsValid(pci)
	c.params.Logger.Debugw(
		"send side bwe: probe finalized",
		"isSignalValid", isSignalValid,
		"probeClusterInfo", pci,
		"probePacketGroup", c.probePacketGroup,
	)
	probeSignal, estimatedAvailableChannelCapacity := c.params.Config.ProbeSignal.ProbeSignal(c.probePacketGroup)
	// a probe can only raise the estimate, and only when it signals clearing
	if probeSignal == ccutils.ProbeSignalClearing && estimatedAvailableChannelCapacity > c.estimatedAvailableChannelCapacity {
		c.estimatedAvailableChannelCapacity = estimatedAvailableChannelCapacity
	}
	c.probeRegulator.ProbeSignal(probeSignal, pci.CreatedAt)
	// clearing the group marks the probe as no longer in flight (see CanProbe)
	c.probePacketGroup = nil
	return probeSignal, c.estimatedAvailableChannelCapacity, true
}
func (c *congestionDetector) prunePacketGroups() {
@@ -458,7 +610,7 @@ func (c *congestionDetector) congestionDetectionStateMachine() {
switch state {
case bwe.CongestionStateNone:
if congestedTriggered {
c.params.Logger.Warnw("invalid congested state transition", nil, "from", state, "reason", congestedReason)
c.params.Logger.Warnw("send side bwe: invalid congested state transition", nil, "from", state, "reason", congestedReason)
}
if earlyWarningTriggered {
newState = bwe.CongestionStateEarlyWarning
@@ -475,7 +627,7 @@ func (c *congestionDetector) congestionDetectionStateMachine() {
case bwe.CongestionStateEarlyWarningHangover:
if congestedTriggered {
c.params.Logger.Warnw("invalid congested state transition", nil, "from", state, "reason", congestedReason)
c.params.Logger.Warnw("send side bwe: invalid congested state transition", nil, "from", state, "reason", congestedReason)
}
if earlyWarningTriggered {
newState = bwe.CongestionStateEarlyWarning
@@ -491,7 +643,7 @@ func (c *congestionDetector) congestionDetectionStateMachine() {
case bwe.CongestionStateCongestedHangover:
if congestedTriggered {
c.params.Logger.Warnw("invalid congested state transition", nil, "from", state, "reason", congestedReason)
c.params.Logger.Warnw("send side bwe: invalid congested state transition", nil, "from", state, "reason", congestedReason)
}
if earlyWarningTriggered {
newState = bwe.CongestionStateEarlyWarning
@@ -543,7 +695,7 @@ func (c *congestionDetector) updateCTRTrend(pg *packetGroup) {
return
}
c.params.Logger.Infow("captured traffic ratio is trending downward", "channel", c.congestedCTRTrend)
c.params.Logger.Infow("send side bwe: captured traffic ratio is trending downward", "channel", c.congestedCTRTrend)
if bweListener := c.getBWEListener(); bweListener != nil {
bweListener.OnCongestionStateChange(c.congestionState, c.estimatedAvailableChannelCapacity)
@@ -554,7 +706,7 @@ func (c *congestionDetector) updateCTRTrend(pg *packetGroup) {
}
func (c *congestionDetector) estimateAvailableChannelCapacity() {
if len(c.packetGroups) == 0 {
if len(c.packetGroups) == 0 || c.probePacketGroup != nil {
return
}
@@ -579,7 +731,7 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() {
if agg.Duration() < c.params.Config.RateMeasurementWindowDurationMin.Microseconds() {
c.params.Logger.Infow(
"not enough data to estimate available channel capacity",
"send side bwe: not enough data to estimate available channel capacity",
"duration", agg.Duration(),
"numGroups", len(c.packetGroups),
"oldestUsed", max(0, idx),
@@ -592,7 +744,7 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() {
func (c *congestionDetector) updateCongestionState(state bwe.CongestionState, reason string, oldestContributingGroup int) {
c.params.Logger.Infow(
"congestion state change",
"send side bwe: congestion state change",
"from", c.congestionState,
"to", state,
"reason", reason,
@@ -629,7 +781,7 @@ func (c *congestionDetector) updateCongestionState(state bwe.CongestionState, re
func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) {
recvRefTime, isOutOfOrder := c.twccFeedback.ProcessReport(fbr.report, fbr.at)
if isOutOfOrder {
c.params.Logger.Infow("received out-of-order feedback report")
c.params.Logger.Infow("send side bwe: received out-of-order feedback report")
}
if len(c.packetGroups) == 0 {
@@ -652,6 +804,10 @@ func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) {
return
}
if c.probePacketGroup != nil {
c.probePacketGroup.Add(pi, sendDelta, recvDelta, isLost)
}
err := pg.Add(pi, sendDelta, recvDelta, isLost)
if err == nil {
return
@@ -662,18 +818,19 @@ func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) {
c.updateCTRTrend(pg)
// SSBWE-REMOVE c.params.Logger.Infow("packet group done", "group", pg, "numGroups", len(c.packetGroups)) // SSBWE-REMOVE
pqd, _ := pg.PropagatedQueuingDelay()
pg = newPacketGroup(
packetGroupParams{
Config: c.params.Config.PacketGroup,
WeightedLoss: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
},
pqd,
pg.PropagatedQueuingDelay(),
)
c.packetGroups = append(c.packetGroups, pg)
pg.Add(pi, sendDelta, recvDelta, isLost)
if err = pg.Add(pi, sendDelta, recvDelta, isLost); err != nil {
c.params.Logger.Warnw("send side bwe: could not add packet to new packet group", err, "packetInfo", pi, "packetGroup", pg)
}
return
}
@@ -683,7 +840,7 @@ func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) {
if err := opg.Add(pi, sendDelta, recvDelta, isLost); err == nil {
return
} else if err == errGroupFinalized {
c.params.Logger.Infow("unpected finalized group", "packetInfo", pi, "packetGroup", opg)
c.params.Logger.Infow("send side bwe: unexpected finalized group", "packetInfo", pi, "packetGroup", opg)
}
}
}
+12 -8
View File
@@ -230,16 +230,20 @@ func (p *packetGroup) SendWindow() (int64, int64) {
return p.minSendTime, p.maxSendTime
}
func (p *packetGroup) PropagatedQueuingDelay() (int64, bool) {
// PropagatedQueuingDelay returns the queuing delay at the end of this group.
// When the inherited queuing delay plus this group's receive/send delta
// difference is positive, that sum is returned. Otherwise the group's own
// delta difference is returned, floored at zero.
func (p *packetGroup) PropagatedQueuingDelay() int64 {
	groupDelta := p.aggregateRecvDelta - p.aggregateSendDelta

	if propagated := p.queuingDelay + groupDelta; propagated > 0 {
		return propagated
	}
	return max(0, groupDelta)
}
func (p *packetGroup) FinalizedPropagatedQueuingDelay() (int64, bool) {
if !p.isFinalized {
return 0, false
}
if p.queuingDelay+p.aggregateRecvDelta-p.aggregateSendDelta > 0 {
return p.queuingDelay + p.aggregateRecvDelta - p.aggregateSendDelta, true
}
return max(0, p.aggregateRecvDelta-p.aggregateSendDelta), true
return p.PropagatedQueuingDelay(), true
}
func (p *packetGroup) Traffic() *trafficStats {
@@ -251,6 +255,7 @@ func (p *packetGroup) Traffic() *trafficStats {
ackedPackets: p.acked.numPackets(),
ackedBytes: p.acked.numBytes(),
lostPackets: p.lost.numPackets(),
lostBytes: p.lost.numBytes(),
}
}
@@ -282,8 +287,7 @@ func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error {
ts.Merge(p.Traffic())
e.AddObject("trafficStats", ts)
e.AddInt64("queuingDelay", p.queuingDelay)
pqd, _ := p.PropagatedQueuingDelay()
e.AddInt64("propagatedQueuingDelay", pqd)
e.AddInt64("propagatedQueuingDelay", p.PropagatedQueuingDelay())
e.AddBool("isFinalized", p.isFinalized)
return nil
+31 -1
View File
@@ -41,6 +41,9 @@ type packetTracker struct {
baseRecvTime int64
piLastRecv *packetInfo
probeClusterId ccutils.ProbeClusterId
probeMaxSequenceNumber uint64
}
func newPacketTracker(params packetTrackerParams) *packetTracker {
@@ -73,7 +76,7 @@ func (p *packetTracker) RecordPacketSendAndGetSequenceNumber(
probeClusterId: probeClusterId,
isProbe: isProbe,
}
// SSBWE-REMOVE p.params.Logger.Infow("packet sent", "packetInfo", pi) // SSBWE-REMOVE
//p.params.Logger.Infow("send side bwe: packet sent", "packetInfo", pi) // SSBWE-REMOVE
p.sequenceNumber++
@@ -82,6 +85,10 @@ func (p *packetTracker) RecordPacketSendAndGetSequenceNumber(
p.piLastRecv = nil
}
if p.probeClusterId != ccutils.ProbeClusterIdInvalid && p.probeClusterId == pi.probeClusterId && pi.sequenceNumber > p.probeMaxSequenceNumber {
p.probeMaxSequenceNumber = pi.sequenceNumber
}
return uint16(pi.sequenceNumber)
}
@@ -137,3 +144,26 @@ func (p *packetTracker) getPacketInfoExisting(sn uint16) *packetInfo {
return nil
}
// ProbeClusterStarting records the id of the probe cluster that is starting.
// While it is set, packets sent with that probe cluster id advance the
// tracked maximum probe sequence number.
func (p *packetTracker) ProbeClusterStarting(probeClusterId ccutils.ProbeClusterId) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.probeClusterId = probeClusterId
}
// ProbeClusterDone clears the tracked probe cluster id if it matches the
// given one. A non-matching id leaves the tracker untouched.
func (p *packetTracker) ProbeClusterDone(probeClusterId ccutils.ProbeClusterId) {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.probeClusterId != probeClusterId {
		return
	}
	p.probeClusterId = ccutils.ProbeClusterIdInvalid
}
// ProbeMaxSequenceNumber returns the highest sequence number recorded for a
// packet of a tracked probe cluster, read while holding the lock.
func (p *packetTracker) ProbeMaxSequenceNumber() uint64 {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.probeMaxSequenceNumber
}
@@ -0,0 +1,132 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sendsidebwe
import (
"time"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
"go.uber.org/zap/zapcore"
)
// -------------------------------------------------------------
// ProbePacketGroupConfig holds the tunables of a probe packet group.
type ProbePacketGroupConfig struct {
	// PacketGroup configures the packet group that collects the probe packets.
	PacketGroup PacketGroupConfig `yaml:"packet_group,omitempty"`
	// SettleWaitNumRTT is the number of round trip times to wait after the
	// probe cluster is done before finalizing the probe.
	SettleWaitNumRTT uint32 `yaml:"settle_wait_num_rtt,omitempty"`
	// SettleWaitMin is the lower bound applied to the computed settle wait.
	SettleWaitMin time.Duration `yaml:"settle_wait_min,omitempty"`
	// SettleWaitMax is the upper bound applied to the computed settle wait.
	SettleWaitMax time.Duration `yaml:"settle_wait_max,omitempty"`
}
var (
	// DefaultPacketGroupConfigProbe is the stock probe packet group configuration.
	// The packet group limits are large numbers so that all packets of a probe
	// are treated as one group.
	DefaultPacketGroupConfigProbe = ProbePacketGroupConfig{
		PacketGroup: PacketGroupConfig{
			MinPackets: 16384,
			MaxWindowDuration: time.Minute,
		},
		SettleWaitNumRTT: 10,
		SettleWaitMin: 500 * time.Millisecond,
		SettleWaitMax: 10 * time.Second,
	}
)
// -------------------------------------------------------------
// probePacketGroupParams bundles the inputs needed to build a probePacketGroup.
type probePacketGroupParams struct {
	// Config supplies the packet group configuration and the settle wait bounds.
	Config ProbePacketGroupConfig
	// WeightedLoss and Logger are passed on to the underlying packet group.
	WeightedLoss WeightedLossConfig
	Logger logger.Logger
}
// probePacketGroup collects the packets of one probe cluster into a single
// packet group and decides when the probe can be finalized.
type probePacketGroup struct {
	params probePacketGroupParams
	// pci is the probe cluster being collected, its Result is filled in when the cluster is done.
	pci ccutils.ProbeClusterInfo
	// embedded packet group, its methods are promoted onto the probe packet group
	*packetGroup
	// maxSequenceNumber is the highest sequence number added to this group.
	maxSequenceNumber uint64
	// doneAt is when the probe cluster was reported done, zero until then.
	doneAt time.Time
}
// newProbePacketGroup builds a probe packet group for the given probe
// cluster, backed by a new packet group that starts with zero queuing delay.
func newProbePacketGroup(params probePacketGroupParams, pci ccutils.ProbeClusterInfo) *probePacketGroup {
	groupParams := packetGroupParams{
		Config:       params.Config.PacketGroup,
		WeightedLoss: params.WeightedLoss,
		Logger:       params.Logger,
	}

	ppg := &probePacketGroup{params: params, pci: pci}
	ppg.packetGroup = newPacketGroup(groupParams, 0)
	return ppg
}
// ProbeClusterDone stores the result of the tracked probe cluster and stamps
// the completion time. A cluster whose id does not match the tracked one is
// ignored.
func (p *probePacketGroup) ProbeClusterDone(pci ccutils.ProbeClusterInfo) {
	if p.pci.Id == pci.Id {
		p.pci.Result = pci.Result
		p.doneAt = mono.Now()
	}
}
// MaybeFinalizeProbe reports whether the probe can be finalized and, if so,
// returns the probe cluster info.
//
// Nothing is finalized until ProbeClusterDone has been called. After that the
// probe is finalized as soon as either
//   - maxSequenceNumber is non-zero and this group has already seen a
//     sequence number at least that high, or
//   - the settle wait (SettleWaitNumRTT * rtt, clamped to
//     [SettleWaitMin, SettleWaitMax]) has elapsed since the cluster was done.
//
// rtt is interpreted in seconds.
func (p *probePacketGroup) MaybeFinalizeProbe(maxSequenceNumber uint64, rtt float64) (ccutils.ProbeClusterInfo, bool) {
	if p.doneAt.IsZero() {
		// probe cluster is still running
		return ccutils.ProbeClusterInfoInvalid, false
	}

	caughtUp := maxSequenceNumber != 0 && p.maxSequenceNumber >= maxSequenceNumber
	if !caughtUp {
		cfg := p.params.Config
		rttBasedWait := time.Duration(float64(cfg.SettleWaitNumRTT) * rtt * float64(time.Second))
		// lower bound first, then upper bound, so the upper bound wins if the
		// two are misconfigured
		settleWait := min(max(rttBasedWait, cfg.SettleWaitMin), cfg.SettleWaitMax)
		if time.Since(p.doneAt) < settleWait {
			return ccutils.ProbeClusterInfoInvalid, false
		}
	}

	return p.pci, true
}
// Add adds a packet to the group. Packets are silently dropped (nil error)
// once the probe cluster is done or when they belong to a different probe
// cluster. For accepted packets the highest sequence number seen is tracked
// and the packet is forwarded to the embedded packet group.
func (p *probePacketGroup) Add(pi *packetInfo, sendDelta, recvDelta int64, isLost bool) error {
	accepting := p.doneAt.IsZero() && pi.probeClusterId == p.pci.Id
	if !accepting {
		return nil
	}

	if pi.sequenceNumber > p.maxSequenceNumber {
		p.maxSequenceNumber = pi.sequenceNumber
	}
	return p.packetGroup.Add(pi, sendDelta, recvDelta, isLost)
}
// MarshalLogObject encodes the probe packet group for structured logging.
// A nil receiver encodes nothing.
func (p *probePacketGroup) MarshalLogObject(e zapcore.ObjectEncoder) error {
	if p != nil {
		e.AddObject("pci", p.pci)
		e.AddObject("packetGroup", p.packetGroup)
		e.AddUint64("maxSequenceNumber", p.maxSequenceNumber)
		e.AddTime("doneAt", p.doneAt)
	}
	return nil
}
+35 -2
View File
@@ -15,7 +15,10 @@
package sendsidebwe
import (
"time"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/protocol/logger"
"github.com/pion/rtcp"
)
@@ -100,12 +103,42 @@ func (s *SendSideBWE) Stop() {
s.congestionDetector.Stop()
}
// RecordPacketSendAndGetSequenceNumber forwards the packet send record to the
// congestion detector and returns the sequence number it hands back.
func (s *SendSideBWE) RecordPacketSendAndGetSequenceNumber(
	atMicro int64,
	size int,
	isRTX bool,
	probeClusterId ccutils.ProbeClusterId,
	isProbe bool,
) uint16 {
	detector := s.congestionDetector
	sequenceNumber := detector.RecordPacketSendAndGetSequenceNumber(atMicro, size, isRTX, probeClusterId, isProbe)
	return sequenceNumber
}
// HandleTWCCFeedback hands a transport-wide congestion control feedback
// report to the congestion detector.
func (s *SendSideBWE) HandleTWCCFeedback(report *rtcp.TransportLayerCC) {
	detector := s.congestionDetector
	detector.HandleTWCCFeedback(report)
}
func (s *SendSideBWE) CongestionState() bwe.CongestionState {
return s.congestionDetector.CongestionState()
// UpdateRTT passes the latest round trip time to the congestion detector.
func (s *SendSideBWE) UpdateRTT(rtt float64) {
	detector := s.congestionDetector
	detector.UpdateRTT(rtt)
}
// CanProbe reports whether the congestion detector allows a probe to be
// started now.
func (s *SendSideBWE) CanProbe() bool {
	canProbe := s.congestionDetector.CanProbe()
	return canProbe
}
// ProbeDuration returns the probe duration chosen by the congestion detector.
func (s *SendSideBWE) ProbeDuration() time.Duration {
	duration := s.congestionDetector.ProbeDuration()
	return duration
}
// ProbeClusterStarting notifies the congestion detector that the given probe
// cluster is starting.
func (s *SendSideBWE) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) {
	detector := s.congestionDetector
	detector.ProbeClusterStarting(pci)
}
// ProbeClusterDone notifies the congestion detector that the given probe
// cluster is done.
func (s *SendSideBWE) ProbeClusterDone(pci ccutils.ProbeClusterInfo) {
	detector := s.congestionDetector
	detector.ProbeClusterDone(pci)
}
// ProbeClusterFinalize asks the congestion detector to finalize the probe and
// returns its three results unchanged: a probe signal, an int64 and a bool.
// The stream allocator call site in this change names them probeSignal,
// channelCapacity and isFinalized.
func (s *SendSideBWE) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) {
	probeSignal, channelCapacity, isFinalized := s.congestionDetector.ProbeClusterFinalize()
	return probeSignal, channelCapacity, isFinalized
}
// ------------------------------------------------
+15 -4
View File
@@ -53,6 +53,7 @@ type trafficStats struct {
ackedPackets int
ackedBytes int
lostPackets int
lostBytes int
}
func newTrafficStats(params trafficStatsParams) *trafficStats {
@@ -73,6 +74,11 @@ func (ts *trafficStats) Merge(rhs *trafficStats) {
ts.ackedPackets += rhs.ackedPackets
ts.ackedBytes += rhs.ackedBytes
ts.lostPackets += rhs.lostPackets
ts.lostBytes += rhs.lostBytes
}
// NumBytes returns the total number of bytes accounted for, i.e. acknowledged
// bytes plus lost bytes.
func (ts *trafficStats) NumBytes() int {
	total := ts.ackedBytes
	total += ts.lostBytes
	return total
}
func (ts *trafficStats) Duration() int64 {
@@ -80,6 +86,11 @@ func (ts *trafficStats) Duration() int64 {
}
// AcknowledgedBitrate returns the bitrate of acknowledged bytes over the
// duration of these stats, scaled by the captured traffic ratio. It returns 0
// when the duration is zero.
//
// NOTE(review): the 1e6 scaling implies Duration() is in microseconds and the
// result is in bits per second — confirm against Duration().
func (ts *trafficStats) AcknowledgedBitrate() int64 {
	duration := ts.Duration()
	if duration == 0 {
		return 0
	}

	// divide by the same value that was checked above rather than calling
	// Duration() a second time
	ackedBitrate := int64(ts.ackedBytes) * 8 * 1e6 / duration
	return int64(float64(ackedBitrate) * ts.CapturedTrafficRatio())
}
@@ -112,10 +123,10 @@ func (ts *trafficStats) WeightedLoss() float64 {
pps := totalPackets * 1e6 / float64(ts.Duration())
// Log10 is used to give higher weight for the same loss ratio at higher packet rates,
// for e.g. with a penalty factor of 0.25
// - 10% loss at 20 pps = 0.1 * log10(20) * 0.25 = 0.032
// - 10% loss at 100 pps = 0.1 * log10(100) * 0.25 = 0.05
// - 10% loss at 1000 pps = 0.1 * log10(1000) * 0.25 = 0.075
// for e.g.
// - 10% loss at 20 pps = 0.1 * log10(20) = 0.130
// - 10% loss at 100 pps = 0.1 * log10(100) = 0.2
// - 10% loss at 1000 pps = 0.1 * log10(1000) = 0.3
return lossRatio * math.Log10(pps)
}
+2 -2
View File
@@ -66,7 +66,7 @@ func newTWCCFeedback(params twccFeedbackParams) *twccFeedback {
}
func (t *twccFeedback) ProcessReport(report *rtcp.TransportLayerCC, at time.Time) (int64, bool) {
// SSBWE-REMOVE t.params.Logger.Infow("TWCC feedback", "report", report.String()) // SSBWE-REMOVE
// t.params.Logger.Infow("send side bwe: TWCC feedback", "report", report.String()) // SSBWE-REMOVE
t.numReports++
if t.lastFeedbackTime.IsZero() {
t.lastFeedbackTime = at
@@ -99,7 +99,7 @@ func (t *twccFeedback) ProcessReport(report *rtcp.TransportLayerCC, at time.Time
if !isOutOfOrder {
sinceLast := at.Sub(t.lastFeedbackTime)
// SSBWE-REMOVE t.params.Logger.Infow("report received", "at", at, "sinceLast", sinceLast, "pktCount", report.FbPktCount) // SSBWE-REMOVE
// t.params.Logger.Infow("send side bwe: report received", "at", at, "sinceLast", sinceLast, "pktCount", report.FbPktCount) // SSBWE-REMOVE
if t.estimatedFeedbackInterval == 0 {
t.estimatedFeedbackInterval = sinceLast
} else {
+106
View File
@@ -0,0 +1,106 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ccutils
import (
"time"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
)
// ------------------------------------------------
// ProbeRegulatorConfig configures how often probes may be started and how
// long each probe may run.
type ProbeRegulatorConfig struct {
// BaseInterval is the initial probe interval and the interval restored
// after any probe signal other than congesting.
BaseInterval time.Duration `yaml:"base_interval,omitempty"`
// BackoffFactor multiplies the probe interval after a congesting signal.
BackoffFactor float64 `yaml:"backoff_factor,omitempty"`
// MaxInterval caps the backed off probe interval.
MaxInterval time.Duration `yaml:"max_interval,omitempty"`

// MinDuration is the initial probe duration and the duration restored
// after a congesting signal.
MinDuration time.Duration `yaml:"min_duration,omitempty"`
// MaxDuration caps the increased probe duration.
MaxDuration time.Duration `yaml:"max_duration,omitempty"`
// DurationIncreaseFactor multiplies the probe duration after any probe
// signal other than congesting.
DurationIncreaseFactor float64 `yaml:"duration_increase_factor,omitempty"`
}
var (
// DefaultProbeRegulatorConfig is the default probe regulator
// configuration: probe interval starts at 3 seconds and backs off by 1.5x
// up to 2 minutes; probe duration starts at 200 ms and grows by 1.5x up
// to 20 seconds.
DefaultProbeRegulatorConfig = ProbeRegulatorConfig{
BaseInterval: 3 * time.Second,
BackoffFactor: 1.5,
MaxInterval: 2 * time.Minute,
MinDuration: 200 * time.Millisecond,
MaxDuration: 20 * time.Second,
DurationIncreaseFactor: 1.5,
}
)
// ---------------------------------------------------------------------------
// ProbeRegulatorParams holds the inputs for constructing a ProbeRegulator.
type ProbeRegulatorParams struct {
// Config supplies the interval and duration settings.
Config ProbeRegulatorConfig
Logger logger.Logger
}
// ProbeRegulator tracks when the next probe may start and how long it should
// run, adapting both based on the signal of the previous probe.
type ProbeRegulator struct {
params ProbeRegulatorParams
// probeInterval is the current spacing between probes.
probeInterval time.Duration
// probeDuration is the duration to use for the next probe.
probeDuration time.Duration
// nextProbeEarliestAt is the time after which CanProbe reports true.
nextProbeEarliestAt time.Time
}
// NewProbeRegulator creates a probe regulator that starts with the base
// interval and the minimum duration from the config, with the earliest next
// probe time set to the moment of creation.
func NewProbeRegulator(params ProbeRegulatorParams) *ProbeRegulator {
	cfg := params.Config

	regulator := &ProbeRegulator{params: params}
	regulator.probeInterval = cfg.BaseInterval
	regulator.probeDuration = cfg.MinDuration
	regulator.nextProbeEarliestAt = mono.Now()
	return regulator
}
// CanProbe reports whether the current time is past the earliest time the
// next probe is allowed to start.
func (p *ProbeRegulator) CanProbe() bool {
	now := mono.Now()
	return now.After(p.nextProbeEarliestAt)
}
// ProbeDuration returns the duration to use for the next probe.
func (p *ProbeRegulator) ProbeDuration() time.Duration {
	duration := p.probeDuration
	return duration
}
// ProbeSignal adapts the probe interval and probe duration to the outcome of
// the previous probe and schedules the earliest time of the next probe.
//
//   - ProbeSignalCongesting: the interval is multiplied by BackoffFactor
//     (capped at MaxInterval) and the duration is reset to MinDuration.
//   - any other signal: the interval is reset to BaseInterval and the
//     duration is multiplied by DurationIncreaseFactor (capped at MaxDuration).
//
// The next probe may start one interval after baseTime, or one interval from
// now when baseTime is the zero time.
//
// Scaling is done on the full nanosecond value. Scaling whole seconds or whole
// milliseconds and converting the float product back to an integer drops the
// fraction: 3s * 1.5 would become 4s instead of 4.5s, and any product below
// one unit (e.g. 500ms * 1.5 counted in seconds) would collapse to zero.
func (p *ProbeRegulator) ProbeSignal(probeSignal ProbeSignal, baseTime time.Time) {
	cfg := p.params.Config

	if probeSignal == ProbeSignalCongesting {
		// wait longer till next probe
		p.probeInterval = time.Duration(float64(p.probeInterval) * cfg.BackoffFactor)
		if p.probeInterval > cfg.MaxInterval {
			p.probeInterval = cfg.MaxInterval
		}

		// revert back to starting with shortest probe
		p.probeDuration = cfg.MinDuration
	} else {
		// probe can be started again after the minimal interval as the
		// previous probe did not indicate congestion
		p.probeInterval = cfg.BaseInterval

		// can do longer probe after a good probe
		p.probeDuration = time.Duration(float64(p.probeDuration) * cfg.DurationIncreaseFactor)
		if p.probeDuration > cfg.MaxDuration {
			p.probeDuration = cfg.MaxDuration
		}
	}

	if baseTime.IsZero() {
		p.nextProbeEarliestAt = mono.Now().Add(p.probeInterval)
	} else {
		p.nextProbeEarliestAt = baseTime.Add(p.probeInterval)
	}
}
+40
View File
@@ -0,0 +1,40 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ccutils
import "fmt"
// ------------------------------------------------
// ProbeSignal is the outcome of a probe.
type ProbeSignal int

const (
	// ProbeSignalInconclusive is the zero value.
	ProbeSignalInconclusive ProbeSignal = iota
	ProbeSignalCongesting
	ProbeSignalClearing
)

// String returns the upper case name of the probe signal, or its decimal
// value for signals without a name.
func (p ProbeSignal) String() string {
	var name string
	switch p {
	case ProbeSignalClearing:
		name = "CLEARING"
	case ProbeSignalCongesting:
		name = "CONGESTING"
	case ProbeSignalInconclusive:
		name = "INCONCLUSIVE"
	default:
		name = fmt.Sprintf("%d", int(p))
	}
	return name
}
+4
View File
@@ -628,6 +628,10 @@ func (d *DownTrack) SetProbeClusterId(probeClusterId ccutils.ProbeClusterId) {
d.probeClusterId.Store(uint32(probeClusterId))
}
// SwapProbeClusterId replaces the track's probe cluster id with swap, but
// only if the current id equals match. The outcome of the compare-and-swap is
// not reported to the caller.
func (d *DownTrack) SwapProbeClusterId(match ccutils.ProbeClusterId, swap ccutils.ProbeClusterId) {
	expected, replacement := uint32(match), uint32(swap)
	d.probeClusterId.CompareAndSwap(expected, replacement)
}
// ID is the unique identifier for this Track. This should be unique for the
// stream, but doesn't have to globally unique. A common example would be 'audio' or 'video'
// and StreamID would be 'desktop' or 'webcam'
-277
View File
@@ -1,277 +0,0 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package streamallocator
import (
"fmt"
"sync"
"time"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
)
const (
// cDefaultRTT is the round trip time, in seconds, assumed until a
// measurement is available and whenever a zero RTT is reported.
cDefaultRTT = float64(0.070) // 70 ms
// cRTTSmoothingFactor is the weight given to a new RTT sample when it is
// blended with the previous smoothed value.
cRTTSmoothingFactor = float64(0.5)
)
// ---------------------------------------------------------------------------
// ProbeControllerState is the phase of the probe controller.
type ProbeControllerState int

const (
	// ProbeControllerStateNone is the zero value: no probe is in flight.
	ProbeControllerStateNone ProbeControllerState = iota
	// ProbeControllerStateProbing is entered when a probe cluster starts.
	ProbeControllerStateProbing
	// ProbeControllerStateHangover is entered when the probe cluster is done
	// and lasts until the probe is finalized.
	ProbeControllerStateHangover
)

// String returns the upper case name of the state, or its decimal value for
// states without a name.
func (p ProbeControllerState) String() string {
	var name string
	switch p {
	case ProbeControllerStateHangover:
		name = "HANGOVER"
	case ProbeControllerStateProbing:
		name = "PROBING"
	case ProbeControllerStateNone:
		name = "NONE"
	default:
		name = fmt.Sprintf("%d", int(p))
	}
	return name
}
// ------------------------------------------------
// ProbeControllerConfig configures probe pacing, the settle wait after a
// probe, and the size and length of probes.
type ProbeControllerConfig struct {
// BaseInterval is the initial probe interval and the interval restored
// after any probe signal other than congesting.
BaseInterval time.Duration `yaml:"base_interval,omitempty"`
// BackoffFactor multiplies the probe interval after a congesting signal.
BackoffFactor float64 `yaml:"backoff_factor,omitempty"`
// MaxInterval caps the backed off probe interval.
MaxInterval time.Duration `yaml:"max_interval,omitempty"`

// SettleWaitNumRTT is the settle wait expressed as a multiple of the RTT.
SettleWaitNumRTT uint32 `yaml:"settle_wait_num_rtt,omitempty"`
// SettleWaitMin is the lower bound applied to the RTT based settle wait.
SettleWaitMin time.Duration `yaml:"settle_wait_min,omitempty"`
// SettleWaitMax is the upper bound applied to the RTT based settle wait.
SettleWaitMax time.Duration `yaml:"settle_wait_max,omitempty"`

// OveragePct is the percentage applied to the probe goal delta to get the
// desired bandwidth increase (120 means 20% overshoot).
OveragePct int64 `yaml:"overage_pct,omitempty"`
// MinBps is the smallest bandwidth increase a probe will aim for.
MinBps int64 `yaml:"min_bps,omitempty"`

// MinDuration is the initial probe duration and the duration restored
// after a congesting signal.
MinDuration time.Duration `yaml:"min_duration,omitempty"`
// MaxDuration caps the increased probe duration.
MaxDuration time.Duration `yaml:"max_duration,omitempty"`
// DurationIncreaseFactor multiplies the probe duration after any probe
// signal other than congesting.
DurationIncreaseFactor float64 `yaml:"duration_increase_factor,omitempty"`
}
var (
// DefaultProbeControllerConfig is the default probe controller
// configuration.
DefaultProbeControllerConfig = ProbeControllerConfig{
BaseInterval: 3 * time.Second,
BackoffFactor: 1.5,
MaxInterval: 2 * time.Minute,
SettleWaitNumRTT: 10,
SettleWaitMin: 500 * time.Millisecond,
SettleWaitMax: 10 * time.Second,
OveragePct: 120,
MinBps: 200_000,
MinDuration: 200 * time.Millisecond,
MaxDuration: 20 * time.Second,
DurationIncreaseFactor: 1.5,
}
)
// ---------------------------------------------------------------------------
// ProbeControllerParams holds the inputs for constructing a ProbeController.
type ProbeControllerParams struct {
// Config supplies the probe pacing, settle wait and sizing settings.
Config ProbeControllerConfig
// Logger is used to warn about unexpected state transitions.
Logger logger.Logger
}
// ProbeController tracks the life cycle of a probe (none -> probing ->
// hangover -> none) and regulates when the next probe may start and how long
// it should run.
type ProbeController struct {
params ProbeControllerParams
// lock is taken by the state related methods; UpdateRTT and ProbeSignal
// do not take it.
lock sync.RWMutex
state ProbeControllerState
// stateSwitchedAt is the time of the last state change.
stateSwitchedAt time.Time
// pci is the probe cluster most recently started.
pci ccutils.ProbeClusterInfo
// rtt is the smoothed round trip time in seconds.
rtt float64
// probeInterval is the current spacing between probes.
probeInterval time.Duration
// probeDuration is the duration to use for the next probe.
probeDuration time.Duration
// nextProbeEarliestAt is the time after which a new probe may start.
nextProbeEarliestAt time.Time
}
// NewProbeController creates a probe controller with the default RTT and all
// other state initialized via Reset.
func NewProbeController(params ProbeControllerParams) *ProbeController {
	controller := new(ProbeController)
	controller.params = params
	controller.rtt = cDefaultRTT

	controller.Reset()
	return controller
}
// Reset returns the controller to its initial condition: no probe in flight,
// no probe cluster remembered, base probe interval, minimum probe duration,
// and the next probe allowed immediately. The RTT is left untouched.
func (p *ProbeController) Reset() {
	p.lock.Lock()
	defer p.lock.Unlock()

	cfg := p.params.Config

	// probe life cycle
	p.state = ProbeControllerStateNone
	p.stateSwitchedAt = mono.Now()
	p.pci = ccutils.ProbeClusterInfoInvalid

	// probe pacing
	p.probeInterval = cfg.BaseInterval
	p.probeDuration = cfg.MinDuration
	p.nextProbeEarliestAt = mono.Now()
}
// UpdateRTT folds a new RTT sample (seconds) into the smoothed RTT.
// A zero sample resets the RTT to the default, the first non-zero sample is
// taken as is, and later samples are blended using cRTTSmoothingFactor.
func (p *ProbeController) UpdateRTT(rtt float64) {
	switch {
	case rtt == 0:
		p.rtt = cDefaultRTT
	case p.rtt == 0:
		p.rtt = rtt
	default:
		p.rtt = cRTTSmoothingFactor*rtt + (1.0-cRTTSmoothingFactor)*p.rtt
	}
}
// CanProbe reports whether a new probe may start: no probe is in flight and
// the earliest next probe time has passed.
func (p *ProbeController) CanProbe() bool {
	p.lock.RLock()
	defer p.lock.RUnlock()

	if p.state != ProbeControllerStateNone {
		return false
	}
	return mono.Now().After(p.nextProbeEarliestAt)
}
// MaybeInitiateProbe builds the goal for a new probe cluster. It returns
// false when a probe is already in flight (probing or hangover) or the
// earliest next probe time has not been reached yet.
//
// The desired bandwidth is the expected usage plus the probe goal delta
// scaled by OveragePct, with the increase never smaller than MinBps.
func (p *ProbeController) MaybeInitiateProbe(availableBandwidthBps int64, probeGoalDeltaBps int64, expectedBandwidthUsage int64) (ccutils.ProbeClusterGoal, bool) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	// a probe may start only when idle and once the probe interval has elapsed
	if p.state != ProbeControllerStateNone || mono.Now().Before(p.nextProbeEarliestAt) {
		return ccutils.ProbeClusterGoal{}, false
	}

	cfg := p.params.Config

	// overshoot a bit to account for noise (in measurement/estimate etc)
	increaseBps := probeGoalDeltaBps * cfg.OveragePct / 100
	if increaseBps < cfg.MinBps {
		increaseBps = cfg.MinBps
	}

	goal := ccutils.ProbeClusterGoal{
		AvailableBandwidthBps: int(availableBandwidthBps),
		ExpectedUsageBps:      int(expectedBandwidthUsage),
		DesiredBps:            int(expectedBandwidthUsage + increaseBps),
		Duration:              p.probeDuration,
	}
	return goal, true
}
// ProbeClusterStarting moves the controller into the probing state and
// remembers the probe cluster. Starting while not idle is logged as a warning
// but still takes effect.
func (p *ProbeController) ProbeClusterStarting(pci ccutils.ProbeClusterInfo) {
	p.lock.Lock()
	defer p.lock.Unlock()

	if current := p.state; current != ProbeControllerStateNone {
		p.params.Logger.Warnw("unexpected probe controller state", nil, "state", current)
	}

	p.setState(ProbeControllerStateProbing)
	p.pci = pci
}
// ProbeClusterDone records the result of the remembered probe cluster and
// moves the controller into the hangover state. Reports for any other
// cluster id are ignored.
func (p *ProbeController) ProbeClusterDone(pci ccutils.ProbeClusterInfo) {
	p.lock.Lock()
	defer p.lock.Unlock()

	if pci.Id != p.pci.Id {
		return
	}

	p.pci.Result = pci.Result
	p.setState(ProbeControllerStateHangover)
}
// MaybeFinalizeProbe ends the hangover once the settle wait has elapsed. The
// settle wait is SettleWaitNumRTT times the smoothed RTT, bounded below by
// SettleWaitMin and above by SettleWaitMax. On success the controller returns
// to the idle state and the remembered probe cluster info is returned;
// otherwise the invalid probe cluster info and false are returned.
func (p *ProbeController) MaybeFinalizeProbe() (ccutils.ProbeClusterInfo, bool) {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.state != ProbeControllerStateHangover {
		return ccutils.ProbeClusterInfoInvalid, false
	}

	cfg := p.params.Config
	wait := time.Duration(float64(cfg.SettleWaitNumRTT) * p.rtt * float64(time.Second))
	// lower bound first, then upper bound
	if wait < cfg.SettleWaitMin {
		wait = cfg.SettleWaitMin
	}
	if wait > cfg.SettleWaitMax {
		wait = cfg.SettleWaitMax
	}

	if elapsed := time.Since(p.stateSwitchedAt); elapsed < wait {
		return ccutils.ProbeClusterInfoInvalid, false
	}

	p.setState(ProbeControllerStateNone)
	return p.pci, true
}
// ProbeSignal adapts the probe interval and probe duration to the outcome of
// the previous probe and schedules the earliest time of the next probe.
//
//   - bwe.ProbeSignalCongesting: the interval is multiplied by BackoffFactor
//     (capped at MaxInterval) and the duration is reset to MinDuration.
//   - any other signal: the interval is reset to BaseInterval and the
//     duration is multiplied by DurationIncreaseFactor (capped at MaxDuration).
//
// The next probe may start one interval after the remembered probe cluster
// was created, or one interval from now when no creation time is recorded.
//
// Scaling is done on the full nanosecond value. Scaling whole seconds or whole
// milliseconds and converting the float product back to an integer drops the
// fraction: 3s * 1.5 would become 4s instead of 4.5s, and any product below
// one unit would collapse to zero.
func (p *ProbeController) ProbeSignal(probeSignal bwe.ProbeSignal) {
	cfg := p.params.Config

	if probeSignal == bwe.ProbeSignalCongesting {
		// wait longer till next probe
		p.probeInterval = time.Duration(float64(p.probeInterval) * cfg.BackoffFactor)
		if p.probeInterval > cfg.MaxInterval {
			p.probeInterval = cfg.MaxInterval
		}

		// revert back to starting with shortest probe
		p.probeDuration = cfg.MinDuration
	} else {
		// probe can be started again after the minimal interval as the
		// previous probe did not indicate congestion
		p.probeInterval = cfg.BaseInterval

		// can do longer probe after a good probe
		p.probeDuration = time.Duration(float64(p.probeDuration) * cfg.DurationIncreaseFactor)
		if p.probeDuration > cfg.MaxDuration {
			p.probeDuration = cfg.MaxDuration
		}
	}

	if p.pci.CreatedAt.IsZero() {
		p.nextProbeEarliestAt = mono.Now().Add(p.probeInterval)
	} else {
		p.nextProbeEarliestAt = p.pci.CreatedAt.Add(p.probeInterval)
	}
}
// GetActiveProbeClusterId returns the id of the remembered probe cluster
// while a probe is in flight (probing or hangover), and the invalid probe
// cluster id when the controller is idle.
func (p *ProbeController) GetActiveProbeClusterId() ccutils.ProbeClusterId {
	p.lock.RLock()
	defer p.lock.RUnlock()

	if p.state != ProbeControllerStateNone {
		return p.pci.Id
	}
	return ccutils.ProbeClusterIdInvalid
}
// setState switches to the given state and stamps the switch time. Setting
// the current state again is a no-op, so the stamp is not refreshed.
func (p *ProbeController) setState(state ProbeControllerState) {
	if p.state != state {
		p.state = state
		p.stateSwitchedAt = mono.Now()
	}
}
// ------------------------------------------------
+72 -53
View File
@@ -157,16 +157,20 @@ const (
)
type StreamAllocatorConfig struct {
ProbeMode ProbeMode `yaml:"probe_mode,omitempty"`
MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"`
ProbeController ProbeControllerConfig `yaml:"probe_controller,omitempty"`
DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"`
ProbeMode ProbeMode `yaml:"probe_mode,omitempty"`
MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"`
DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"`
ProbeOveragePct int64 `yaml:"probe_overage_pct,omitempty"`
ProbeMinBps int64 `yaml:"probe_min_bps,omitempty"`
}
var (
DefaultStreamAllocatorConfig = StreamAllocatorConfig{
ProbeMode: ProbeModePadding,
ProbeController: DefaultProbeControllerConfig,
ProbeMode: ProbeModePadding,
ProbeOveragePct: 120,
ProbeMinBps: 200_000,
}
)
@@ -193,8 +197,7 @@ type StreamAllocator struct {
committedChannelCapacity int64
overriddenChannelCapacity int64
probeController *ProbeController
prober *ccutils.Prober
prober *ccutils.Prober
// STREAM-ALLOCATOR-DATA rateMonitor *RateMonitor
@@ -203,8 +206,9 @@ type StreamAllocator struct {
isAllocateAllPending bool
rembTrackingSSRC uint32
state streamAllocatorState
isHolding bool
state streamAllocatorState
isHolding bool
activeProbeClusterId ccutils.ProbeClusterId
eventsQueue *utils.TypedOpsQueue[Event]
@@ -219,8 +223,9 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b
enabled: enabled,
allowPause: allowPause,
// STREAM-ALLOCATOR-DATA rateMonitor: NewRateMonitor(),
videoTracks: make(map[livekit.TrackID]*Track),
state: streamAllocatorStateStable,
videoTracks: make(map[livekit.TrackID]*Track),
state: streamAllocatorStateStable,
activeProbeClusterId: ccutils.ProbeClusterIdInvalid,
eventsQueue: utils.NewTypedOpsQueue[Event](utils.OpsQueueParams{
Name: "stream-allocator",
MinSize: 64,
@@ -234,11 +239,6 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b
Logger: params.Logger,
})
s.probeController = NewProbeController(ProbeControllerParams{
Config: s.params.Config.ProbeController,
Logger: params.Logger,
})
s.params.BWE.SetBWEListener(s)
s.params.Pacer.SetPacerProbeObserverListener(s)
@@ -298,7 +298,7 @@ func (s *StreamAllocator) AddTrack(downTrack *sfu.DownTrack, params AddTrackPara
}
downTrack.SetStreamAllocatorListener(s)
downTrack.SetProbeClusterId(s.prober.GetActiveClusterId())
downTrack.SetProbeClusterId(s.activeProbeClusterId)
s.maybePostEventAllocateTrack(downTrack)
}
@@ -694,22 +694,25 @@ func (s *StreamAllocator) handleSignalEstimate(event Event) {
func (s *StreamAllocator) handleSignalPeriodicPing(Event) {
// finalize any probe that may have finished/aborted
if pci, ok := s.probeController.MaybeFinalizeProbe(); ok {
probeSignal, channelCapacity := s.params.BWE.ProbeClusterDone(pci)
s.params.Logger.Debugw(
"stream allocator: probe result",
"probeSignal", probeSignal,
"channelCapacity", channelCapacity,
)
if probeSignal != bwe.ProbeSignalCongesting {
if channelCapacity > s.committedChannelCapacity {
s.committedChannelCapacity = channelCapacity
if s.activeProbeClusterId != ccutils.ProbeClusterIdInvalid {
if probeSignal, channelCapacity, isFinalized := s.params.BWE.ProbeClusterFinalize(); isFinalized {
s.params.Logger.Debugw(
"stream allocator: probe result",
"probeClusterId", s.activeProbeClusterId,
"probeSignal", probeSignal,
"channelCapacity", channelCapacity,
)
s.activeProbeClusterId = ccutils.ProbeClusterIdInvalid
if probeSignal != ccutils.ProbeSignalCongesting {
if channelCapacity > s.committedChannelCapacity {
s.committedChannelCapacity = channelCapacity
}
s.maybeBoostDeficientTracks()
}
s.maybeBoostDeficientTracks()
}
s.probeController.ProbeSignal(probeSignal)
}
// probe if necessary and timing is right
@@ -722,7 +725,7 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) {
if s.params.RTTGetter != nil {
if rtt, ok := s.params.RTTGetter(); ok {
s.probeController.UpdateRTT(rtt)
s.params.BWE.UpdateRTT(rtt)
}
}
}
@@ -735,7 +738,8 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) {
func (s *StreamAllocator) handleSignalProbeClusterSwitch(event Event) {
pci := event.Data.(ccutils.ProbeClusterInfo)
s.probeController.ProbeClusterStarting(pci)
s.activeProbeClusterId = pci.Id
s.params.BWE.ProbeClusterStarting(pci)
s.params.Pacer.StartProbeCluster(pci)
@@ -765,7 +769,12 @@ func (s *StreamAllocator) handleSignalSendProbe(event Event) {
func (s *StreamAllocator) handleSignalPacerProbeObserverClusterComplete(event Event) {
probeClusterId, _ := event.Data.(ccutils.ProbeClusterId)
pci := s.params.Pacer.EndProbeCluster(probeClusterId)
s.probeController.ProbeClusterDone(pci)
for _, t := range s.getTracks() {
t.DownTrack().SwapProbeClusterId(pci.Id, ccutils.ProbeClusterIdInvalid)
}
s.params.BWE.ProbeClusterDone(pci)
s.prober.ClusterDone(pci)
}
@@ -850,7 +859,7 @@ func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) {
}
if cscd.congestionState == bwe.CongestionStateCongested {
if s.probeController.GetActiveProbeClusterId() != ccutils.ProbeClusterIdInvalid {
if s.activeProbeClusterId != ccutils.ProbeClusterIdInvalid {
s.params.Logger.Infow(
"stream allocator: channel congestion detected, not updating channel capacity in active probe",
"old(bps)", s.committedChannelCapacity,
@@ -887,12 +896,10 @@ func (s *StreamAllocator) setState(state streamAllocatorState) {
s.params.Logger.Infow("stream allocator: state change", "from", s.state, "to", state)
s.state = state
// restart everything when when state is stable
// restart everything when state is STABLE
if state == streamAllocatorStateStable {
s.maybeStopProbe()
s.probeController.Reset()
s.params.BWE.Reset()
}
}
@@ -1064,12 +1071,13 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
}
func (s *StreamAllocator) maybeStopProbe() {
activeProbeClusterId := s.probeController.GetActiveProbeClusterId()
if activeProbeClusterId != ccutils.ProbeClusterIdInvalid {
pci := s.params.Pacer.EndProbeCluster(activeProbeClusterId)
s.probeController.ProbeClusterDone(pci)
s.prober.Reset(pci)
if s.activeProbeClusterId == ccutils.ProbeClusterIdInvalid {
return
}
pci := s.params.Pacer.EndProbeCluster(s.activeProbeClusterId)
s.params.BWE.ProbeClusterDone(pci)
s.prober.Reset(pci)
}
func (s *StreamAllocator) maybeBoostDeficientTracks() {
@@ -1296,7 +1304,7 @@ func (s *StreamAllocator) maybeProbe() {
return
}
if s.params.BWE.CongestionState() != bwe.CongestionStateNone || !s.probeController.CanProbe() {
if !s.params.BWE.CanProbe() {
return
}
@@ -1321,7 +1329,7 @@ func (s *StreamAllocator) maybeProbeWithMedia() {
updateStreamStateChange(track, allocation, update)
s.maybeSendUpdate(update)
s.probeController.Reset()
s.params.BWE.Reset()
break
}
}
@@ -1334,14 +1342,25 @@ func (s *StreamAllocator) maybeProbeWithPadding() {
continue
}
pcg, ok := s.probeController.MaybeInitiateProbe(s.committedChannelCapacity, transition.BandwidthDelta, s.getExpectedBandwidthUsage())
if ok {
pci := s.prober.AddCluster(ccutils.ProbeClusterModeUniform, pcg)
s.params.Logger.Debugw(
"stream allocator: starting probe",
"probeClusterInfo", pci,
)
// overshoot a bit to account for noise (in measurement/estimate etc)
desiredIncreaseBps := (transition.BandwidthDelta * s.params.Config.ProbeOveragePct) / 100
if desiredIncreaseBps < s.params.Config.ProbeMinBps {
desiredIncreaseBps = s.params.Config.ProbeMinBps
}
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
pci := s.prober.AddCluster(
ccutils.ProbeClusterModeUniform,
ccutils.ProbeClusterGoal{
AvailableBandwidthBps: int(s.committedChannelCapacity),
ExpectedUsageBps: int(expectedBandwidthUsage),
DesiredBps: int(expectedBandwidthUsage + desiredIncreaseBps),
Duration: s.params.BWE.ProbeDuration(),
},
)
s.params.Logger.Debugw(
"stream allocator: adding probe",
"probeClusterInfo", pci,
)
break
}
}