StreamAllocator (congestion controller) refactor (#3180)

* refactor WIP

* WIP

* compiling

* runlock

* fixes

* fmt

* stringer and unlikely logger

* clean up
This commit is contained in:
Raja Subramanian
2024-11-16 03:06:37 +05:30
committed by GitHub
parent eceada8b31
commit 6509cdb5ea
21 changed files with 832 additions and 514 deletions
+5 -1
View File
@@ -29,7 +29,8 @@ import (
"github.com/livekit/livekit-server/pkg/metric"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/sendsidebwe"
"github.com/livekit/livekit-server/pkg/sfu/bwe/remotebwe"
"github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe"
"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
"github.com/livekit/protocol/livekit"
@@ -129,6 +130,8 @@ type CongestionControlConfig struct {
StreamAllocator streamallocator.StreamAllocatorConfig `yaml:"stream_allocator,omitempty"`
RemoteBWE remotebwe.RemoteBWEConfig `yaml:"remote_bwe,omitempty"`
UseSendSideBWEInterceptor bool `yaml:"use_send_side_bwe_interceptor,omitempty"`
UseSendSideBWE bool `yaml:"use_send_side_bwe,omitempty"`
@@ -314,6 +317,7 @@ var DefaultConfig = Config{
Enabled: true,
AllowPause: false,
StreamAllocator: streamallocator.DefaultStreamAllocatorConfig,
RemoteBWE: remotebwe.DefaultRemoteBWEConfig,
UseSendSideBWEInterceptor: false,
UseSendSideBWE: false,
SendSideBWE: sendsidebwe.DefaultSendSideBWEConfig,
+16 -10
View File
@@ -39,10 +39,12 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc/transport"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/livekit-server/pkg/sfu/bwe/remotebwe"
"github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe"
sfuinterceptor "github.com/livekit/livekit-server/pkg/sfu/interceptor"
"github.com/livekit/livekit-server/pkg/sfu/pacer"
pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
"github.com/livekit/livekit-server/pkg/sfu/sendsidebwe"
"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
@@ -208,8 +210,8 @@ type PCTransport struct {
streamAllocator *streamallocator.StreamAllocator
// only for subscriber PC
sendSideBWE *sendsidebwe.SendSideBWE
pacer pacer.Pacer
bwe bwe.BWE
pacer pacer.Pacer
previousAnswer *webrtc.SessionDescription
// track id -> description map in previous offer sdp
@@ -350,7 +352,7 @@ func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimat
ir := &interceptor.Registry{}
if params.IsSendSide {
se.DetachDataChannels()
if params.CongestionControlConfig.UseSendSideBWEInterceptor || params.UseSendSideBWEInterceptor && (!params.CongestionControlConfig.UseSendSideBWE && !params.UseSendSideBWE) {
if (params.CongestionControlConfig.UseSendSideBWEInterceptor || params.UseSendSideBWEInterceptor) && (!params.CongestionControlConfig.UseSendSideBWE && !params.UseSendSideBWE) {
params.Logger.Infow("using send side BWE - interceptor")
gf, err := cc.NewInterceptor(func() (cc.BandwidthEstimator, error) {
return gcc.NewSendSideBWE(
@@ -456,16 +458,20 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
if params.CongestionControlConfig.UseSendSideBWE || params.UseSendSideBWE {
params.Logger.Infow("using send side BWE")
t.sendSideBWE = sendsidebwe.NewSendSideBWE(sendsidebwe.SendSideBWEParams{
ssbwe := sendsidebwe.NewSendSideBWE(sendsidebwe.SendSideBWEParams{
Config: params.CongestionControlConfig.SendSideBWE,
Logger: params.Logger,
})
t.pacer = pacer.NewNoQueue(params.Logger, t.sendSideBWE)
t.streamAllocator.SetSendSideBWE(t.sendSideBWE)
t.pacer = pacer.NewNoQueue(params.Logger, ssbwe)
t.bwe = ssbwe
} else {
t.bwe = remotebwe.NewRemoteBWE(remotebwe.RemoteBWEParams{
Config: params.CongestionControlConfig.RemoteBWE,
Logger: params.Logger,
})
t.pacer = pacer.NewPassThrough(params.Logger, nil)
}
t.streamAllocator.SetBWE(t.bwe)
}
if err := t.createPeerConnection(); err != nil {
@@ -991,8 +997,8 @@ func (t *PCTransport) Close() {
if t.pacer != nil {
t.pacer.Stop()
}
if t.sendSideBWE != nil {
t.sendSideBWE.Stop()
if t.bwe != nil {
t.bwe.Stop()
}
_ = t.pc.Close()
+105
View File
@@ -0,0 +1,105 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bwe
import (
"fmt"
"github.com/pion/rtcp"
)
// ------------------------------------------------
type CongestionState int
const (
CongestionStateNone CongestionState = iota
CongestionStateEarlyWarning
CongestionStateEarlyWarningHangover
CongestionStateCongested
CongestionStateCongestedHangover
)
func (c CongestionState) String() string {
switch c {
case CongestionStateNone:
return "NONE"
case CongestionStateEarlyWarning:
return "EARLY_WARNING"
case CongestionStateEarlyWarningHangover:
return "EARLY_WARNING_HANGOVER"
case CongestionStateCongested:
return "CONGESTED"
case CongestionStateCongestedHangover:
return "CONGESTED_HANGOVER"
default:
return fmt.Sprintf("%d", int(c))
}
}
// ------------------------------------------------
type ChannelTrend int
const (
ChannelTrendNeutral ChannelTrend = iota
ChannelTrendClearing
ChannelTrendCongesting
)
func (c ChannelTrend) String() string {
switch c {
case ChannelTrendNeutral:
return "NEUTRAL"
case ChannelTrendClearing:
return "CLEARING"
case ChannelTrendCongesting:
return "CONGESTING"
default:
return fmt.Sprintf("%d", int(c))
}
}
// ------------------------------------------------
type BWE interface {
SetBWEListener(bweListner BWEListener)
Reset()
Stop()
HandleREMB(
receivedEstimate int64,
isProbeFinalizing bool,
expectedBandwidthUsage int64,
sentPackets uint32,
repeatedNacks uint32,
)
HandleTWCCFeedback(report *rtcp.TransportLayerCC)
ProbingStart(expectedBandwidthUsage int64)
ProbingEnd(isNotFailing bool, isGoalReached bool)
GetProbeStatus() (isValidSignal bool, trend ChannelTrend, lowestEstimate int64, highestEstimate int64)
}
// ------------------------------------------------
type BWEListener interface {
OnCongestionStateChange(congestionState CongestionState, estimatedAvailableChannelCapacity int64)
}
// ------------------------------------------------
+47
View File
@@ -0,0 +1,47 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bwe
import "github.com/pion/rtcp"
type NullBWE struct {
}
func (n *NullBWE) SetBWEListener(_bweListener BWEListener) {}
func (n *NullBWE) Reset() {}
func (n *NullBWE) Stop() {}
func (n *NullBWE) HandleREMB(
_receivedEstimate int64,
_isProbeFinalizing bool,
_expectedBandwidthUsage int64,
_sentPackets uint32,
_repeatedNacks uint32,
) {
}
func (n *NullBWE) HandleTWCCFeedback(_report *rtcp.TransportLayerCC) {}
func (n *NullBWE) ProbingStart(_expectedBandwidthUsage int64) {}
func (n *NullBWE) ProbingEnd(_isNotFailing bool, _isGoalReached bool) {}
func (n *NullBWE) GetProbeStatus() (bool, ChannelTrend, int64, int64) {
return false, ChannelTrendNeutral, 0, 0
}
// ------------------------------------------------
@@ -12,56 +12,34 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package streamallocator
package remotebwe
import (
"fmt"
"time"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/protocol/logger"
)
// ------------------------------------------------
type ChannelTrend int
type channelCongestionReason int
const (
ChannelTrendNeutral ChannelTrend = iota
ChannelTrendClearing
ChannelTrendCongesting
channelCongestionReasonNone channelCongestionReason = iota
channelCongestionReasonEstimate
channelCongestionReasonLoss
)
func (c ChannelTrend) String() string {
func (c channelCongestionReason) String() string {
switch c {
case ChannelTrendNeutral:
return "NEUTRAL"
case ChannelTrendClearing:
return "CLEARING"
case ChannelTrendCongesting:
return "CONGESTING"
default:
return fmt.Sprintf("%d", int(c))
}
}
// ------------------------------------------------
type ChannelCongestionReason int
const (
ChannelCongestionReasonNone ChannelCongestionReason = iota
ChannelCongestionReasonEstimate
ChannelCongestionReasonLoss
)
func (c ChannelCongestionReason) String() string {
switch c {
case ChannelCongestionReasonNone:
case channelCongestionReasonNone:
return "NONE"
case ChannelCongestionReasonEstimate:
case channelCongestionReasonEstimate:
return "ESTIMATE"
case ChannelCongestionReasonLoss:
case channelCongestionReasonLoss:
return "LOSS"
default:
return fmt.Sprintf("%d", int(c))
@@ -85,9 +63,9 @@ var (
ValidityWindow: 10 * time.Second,
}
DefaultChannelObserverConfigProbe = ChannelObserverConfig{
defaultChannelObserverConfigProbe = ChannelObserverConfig{
Estimate: defaultTrendDetectorConfigProbe,
Nack: DefaultNackTrackerConfigProbe,
Nack: defaultNackTrackerConfigProbe,
}
defaultTrendDetectorConfigNonProbe = ccutils.TrendDetectorConfig{
@@ -99,29 +77,29 @@ var (
ValidityWindow: 10 * time.Second,
}
DefaultChannelObserverConfigNonProbe = ChannelObserverConfig{
defaultChannelObserverConfigNonProbe = ChannelObserverConfig{
Estimate: defaultTrendDetectorConfigNonProbe,
Nack: DefaultNackTrackerConfigNonProbe,
Nack: defaultNackTrackerConfigNonProbe,
}
)
// ------------------------------------------------
type ChannelObserverParams struct {
type channelObserverParams struct {
Name string
Config ChannelObserverConfig
}
type ChannelObserver struct {
params ChannelObserverParams
type channelObserver struct {
params channelObserverParams
logger logger.Logger
estimateTrend *ccutils.TrendDetector[int64]
nackTracker *NackTracker
nackTracker *nackTracker
}
func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *ChannelObserver {
return &ChannelObserver{
func newChannelObserver(params channelObserverParams, logger logger.Logger) *channelObserver {
return &channelObserver{
params: params,
logger: logger,
estimateTrend: ccutils.NewTrendDetector[int64](ccutils.TrendDetectorParams{
@@ -129,7 +107,7 @@ func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *Cha
Logger: logger,
Config: params.Config.Estimate,
}),
nackTracker: NewNackTracker(NackTrackerParams{
nackTracker: newNackTracker(nackTrackerParams{
Name: params.Name + "-nack",
Logger: logger,
Config: params.Config.Nack,
@@ -137,61 +115,61 @@ func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *Cha
}
}
func (c *ChannelObserver) SeedEstimate(estimate int64) {
func (c *channelObserver) SeedEstimate(estimate int64) {
c.estimateTrend.Seed(estimate)
}
func (c *ChannelObserver) AddEstimate(estimate int64) {
func (c *channelObserver) AddEstimate(estimate int64) {
c.estimateTrend.AddValue(estimate)
}
func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32) {
func (c *channelObserver) AddNack(packets uint32, repeatedNacks uint32) {
c.nackTracker.Add(packets, repeatedNacks)
}
func (c *ChannelObserver) GetLowestEstimate() int64 {
func (c *channelObserver) GetLowestEstimate() int64 {
return c.estimateTrend.GetLowest()
}
func (c *ChannelObserver) GetHighestEstimate() int64 {
func (c *channelObserver) GetHighestEstimate() int64 {
return c.estimateTrend.GetHighest()
}
func (c *ChannelObserver) HasEnoughEstimateSamples() bool {
func (c *channelObserver) HasEnoughEstimateSamples() bool {
return c.estimateTrend.HasEnoughSamples()
}
func (c *ChannelObserver) GetNackRatio() float64 {
func (c *channelObserver) GetNackRatio() float64 {
return c.nackTracker.GetRatio()
}
/* STREAM-ALLOCATOR-DATA
func (c *ChannelObserver) GetNackHistory() []string {
/* REMOTE-BWE-DATA
func (c *channelObserver) GetNackHistory() []string {
return c.nackTracker.GetHistory()
}
*/
func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) {
func (c *channelObserver) GetTrend() (bwe.ChannelTrend, channelCongestionReason) {
estimateDirection := c.estimateTrend.GetDirection()
switch {
case estimateDirection == ccutils.TrendDirectionDownward:
c.logger.Debugw("stream allocator: channel observer: estimate is trending downward", "channel", c.ToString())
return ChannelTrendCongesting, ChannelCongestionReasonEstimate
c.logger.Debugw("stream allocator: channel observer: estimate is trending downward", "channel", c)
return bwe.ChannelTrendCongesting, channelCongestionReasonEstimate
case c.nackTracker.IsTriggered():
c.logger.Debugw("stream allocator: channel observer: high rate of repeated NACKs", "channel", c.ToString())
return ChannelTrendCongesting, ChannelCongestionReasonLoss
c.logger.Debugw("stream allocator: channel observer: high rate of repeated NACKs", "channel", c)
return bwe.ChannelTrendCongesting, channelCongestionReasonLoss
case estimateDirection == ccutils.TrendDirectionUpward:
return ChannelTrendClearing, ChannelCongestionReasonNone
return bwe.ChannelTrendClearing, channelCongestionReasonNone
}
return ChannelTrendNeutral, ChannelCongestionReasonNone
return bwe.ChannelTrendNeutral, channelCongestionReasonNone
}
func (c *ChannelObserver) ToString() string {
return fmt.Sprintf("name: %s, estimate: {%s}, nack {%s}", c.params.Name, c.estimateTrend.ToString(), c.nackTracker.ToString())
func (c *channelObserver) String() string {
return fmt.Sprintf("name: %s, estimate: {%v}, nack {%v}", c.params.Name, c.estimateTrend, c.nackTracker)
}
// ------------------------------------------------
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package streamallocator
package remotebwe
import (
"fmt"
@@ -30,13 +30,13 @@ type NackTrackerConfig struct {
}
var (
DefaultNackTrackerConfigProbe = NackTrackerConfig{
defaultNackTrackerConfigProbe = NackTrackerConfig{
WindowMinDuration: 500 * time.Millisecond,
WindowMaxDuration: 1 * time.Second,
RatioThreshold: 0.04,
}
DefaultNackTrackerConfigNonProbe = NackTrackerConfig{
defaultNackTrackerConfigNonProbe = NackTrackerConfig{
WindowMinDuration: 2 * time.Second,
WindowMaxDuration: 3 * time.Second,
RatioThreshold: 0.08,
@@ -45,35 +45,35 @@ var (
// ------------------------------------------------
type NackTrackerParams struct {
type nackTrackerParams struct {
Name string
Logger logger.Logger
Config NackTrackerConfig
}
type NackTracker struct {
params NackTrackerParams
type nackTracker struct {
params nackTrackerParams
windowStartTime time.Time
packets uint32
repeatedNacks uint32
/* STREAM-ALLOCATOR-DATA
// STREAM-ALLOCATOR-EXPERIMENTAL-TODO: remove when cleaning up experimental stuff
/* REMOTE-BWE-DATA
// REMOTE-BWE-EXPERIMENTAL-TODO: remove when cleaning up experimental stuff
history []string
*/
}
func NewNackTracker(params NackTrackerParams) *NackTracker {
return &NackTracker{
func newNackTracker(params nackTrackerParams) *nackTracker {
return &nackTracker{
params: params,
// STREAM-ALLOCATOR-DATA history: make([]string, 0, 10),
// REMOTE-BWE-DATA history: make([]string, 0, 10),
}
}
func (n *NackTracker) Add(packets uint32, repeatedNacks uint32) {
func (n *nackTracker) Add(packets uint32, repeatedNacks uint32) {
if n.params.Config.WindowMaxDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.Config.WindowMaxDuration {
// STREAM-ALLOCATOR-DATA n.updateHistory()
// REMOTE-BWE-DATA n.updateHistory()
n.windowStartTime = time.Time{}
n.packets = 0
@@ -96,7 +96,7 @@ func (n *NackTracker) Add(packets uint32, repeatedNacks uint32) {
}
}
func (n *NackTracker) GetRatio() float64 {
func (n *nackTracker) GetRatio() float64 {
ratio := 0.0
if n.packets != 0 {
ratio = float64(n.repeatedNacks) / float64(n.packets)
@@ -108,7 +108,7 @@ func (n *NackTracker) GetRatio() float64 {
return ratio
}
func (n *NackTracker) IsTriggered() bool {
func (n *nackTracker) IsTriggered() bool {
if n.params.Config.WindowMinDuration != 0 && !n.windowStartTime.IsZero() && time.Since(n.windowStartTime) > n.params.Config.WindowMinDuration {
return n.GetRatio() > n.params.Config.RatioThreshold
}
@@ -116,7 +116,7 @@ func (n *NackTracker) IsTriggered() bool {
return false
}
func (n *NackTracker) ToString() string {
func (n *nackTracker) ToString() string {
window := ""
if !n.windowStartTime.IsZero() {
now := time.Now()
@@ -126,12 +126,12 @@ func (n *NackTracker) ToString() string {
return fmt.Sprintf("n: %s, %s, p: %d, rn: %d, rn/p: %.2f", n.params.Name, window, n.packets, n.repeatedNacks, n.GetRatio())
}
/* STREAM-ALLOCATOR-DATA
func (n *NackTracker) GetHistory() []string {
/* REMOTE-BWE-DATA
func (n *nackTracker) GetHistory() []string {
return n.history
}
func (n *NackTracker) updateHistory() {
func (n *nackTracker) updateHistory() {
if len(n.history) >= 10 {
n.history = n.history[1:]
}
+369
View File
@@ -0,0 +1,369 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remotebwe
import (
"sync"
"time"
"github.com/frostbyte73/core"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
)
const (
ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps
)
// ---------------------------------------------------------------------------
type RemoteBWEConfig struct {
NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"`
ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"`
ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"`
ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"`
CongestedMinDuration time.Duration `yaml:"congested_min_duration,omitempty"`
PeriodicCheckInterval time.Duration `yaml:"periodic_check_interval,omitempty"`
PeriodicCheckIntervalCongested time.Duration `yaml:"periodic_check_interval_congested,omitempty"`
}
var (
DefaultRemoteBWEConfig = RemoteBWEConfig{
NackRatioAttenuator: 0.4,
ExpectedUsageThreshold: 0.95,
ChannelObserverProbe: defaultChannelObserverConfigProbe,
ChannelObserverNonProbe: defaultChannelObserverConfigNonProbe,
CongestedMinDuration: 3 * time.Second,
PeriodicCheckInterval: 2 * time.Second,
PeriodicCheckIntervalCongested: 200 * time.Millisecond,
}
)
// ---------------------------------------------------------------------------
type RemoteBWEParams struct {
Config RemoteBWEConfig
Logger logger.Logger
}
type RemoteBWE struct {
bwe.NullBWE
params RemoteBWEParams
lock sync.RWMutex
lastReceivedEstimate int64
lastExpectedBandwidthUsage int64
isInProbe bool
committedChannelCapacity int64
channelObserver *channelObserver
congestionState bwe.CongestionState
congestionStateSwitchedAt time.Time
wake chan struct{}
stop core.Fuse
bweListener bwe.BWEListener
}
func NewRemoteBWE(params RemoteBWEParams) *RemoteBWE {
r := &RemoteBWE{
params: params,
}
r.channelObserver = r.newChannelObserverNonProbe()
return r
}
func (r *RemoteBWE) SetBWEListener(bweListener bwe.BWEListener) {
r.lock.Lock()
defer r.lock.Unlock()
r.bweListener = bweListener
}
func (r *RemoteBWE) getBWEListener() bwe.BWEListener {
r.lock.RLock()
defer r.lock.RUnlock()
return r.bweListener
}
func (r *RemoteBWE) Reset() {
r.lock.Lock()
defer r.lock.Unlock()
r.channelObserver = r.newChannelObserverNonProbe()
}
func (r *RemoteBWE) Stop() {
r.stop.Break()
}
func (r *RemoteBWE) HandleREMB(
receivedEstimate int64,
isProbeFinalizing bool,
expectedBandwidthUsage int64,
sentPackets uint32,
repeatedNacks uint32,
) {
r.lock.Lock()
r.lastReceivedEstimate = receivedEstimate
r.lastExpectedBandwidthUsage = expectedBandwidthUsage
if !isProbeFinalizing {
r.channelObserver.AddEstimate(r.lastReceivedEstimate)
r.channelObserver.AddNack(sentPackets, repeatedNacks)
}
var (
shouldNotify bool
state bwe.CongestionState
committedChannelCapacity int64
)
if !r.isInProbe {
shouldNotify, state, committedChannelCapacity = r.congestionDetectionStateMachine()
}
r.lock.Unlock()
if shouldNotify {
if bweListener := r.getBWEListener(); bweListener != nil {
bweListener.OnCongestionStateChange(state, committedChannelCapacity)
}
}
}
func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState, int64) {
newState := r.congestionState
update := false
trend, reason := r.channelObserver.GetTrend()
switch r.congestionState {
case bwe.CongestionStateNone:
if trend == bwe.ChannelTrendCongesting {
if r.estimateAvailableChannelCapacity(reason) {
newState = bwe.CongestionStateCongested
}
}
case bwe.CongestionStateCongested:
if trend == bwe.ChannelTrendCongesting {
if r.estimateAvailableChannelCapacity(reason) {
// update state sa this needs to reset switch time to wait for congestion min duration again
update = true
}
} else if time.Since(r.congestionStateSwitchedAt) >= r.params.Config.CongestedMinDuration {
newState = bwe.CongestionStateNone
}
}
shouldNotify := false
if newState != r.congestionState || update {
r.updateCongestionState(newState, reason)
shouldNotify = true
}
return shouldNotify, r.congestionState, r.committedChannelCapacity
}
func (r *RemoteBWE) estimateAvailableChannelCapacity(reason channelCongestionReason) bool {
var estimateToCommit int64
switch reason {
case channelCongestionReasonLoss:
estimateToCommit = int64(float64(r.lastExpectedBandwidthUsage) * (1.0 - r.params.Config.NackRatioAttenuator*r.channelObserver.GetNackRatio()))
default:
estimateToCommit = r.lastReceivedEstimate
}
if estimateToCommit > r.lastReceivedEstimate {
estimateToCommit = r.lastReceivedEstimate
}
commitThreshold := int64(r.params.Config.ExpectedUsageThreshold * float64(r.lastExpectedBandwidthUsage))
ulgr := r.params.Logger.WithUnlikelyValues(
"reason", reason,
"old(bps)", r.committedChannelCapacity,
"new(bps)", estimateToCommit,
"lastReceived(bps)", r.lastReceivedEstimate,
"expectedUsage(bps)", r.lastExpectedBandwidthUsage,
"commitThreshold(bps)", commitThreshold,
"channel", r.channelObserver,
)
if estimateToCommit > commitThreshold {
ulgr.Debugw("remote bwe: channel congestion detected, skipping above commit threshold channel capacity update")
return false
}
ulgr.Infow("remote bwe: channel congestion detected, applying channel capacity update")
/* REMOTE-BWE-DATA
r.params.Logger.Debugw(
fmt.Sprintf("remote bwe: channel congestion detected, %s channel capacity: experimental", action),
"nackHistory", r.channelObserver.GetNackHistory(),
)
*/
r.committedChannelCapacity = estimateToCommit
// reset to get new set of samples for next trend
r.channelObserver = r.newChannelObserverNonProbe()
return true
}
func (r *RemoteBWE) updateCongestionState(state bwe.CongestionState, reason channelCongestionReason) {
r.params.Logger.Infow(
"remote bwe: congestion state change",
"from", r.congestionState,
"to", state,
"reason", reason,
"committedChannelCapacity", r.committedChannelCapacity,
)
if state != r.congestionState {
// notify worker for ticker interval management based on state
select {
case r.wake <- struct{}{}:
default:
}
}
r.congestionState = state
r.congestionStateSwitchedAt = mono.Now()
}
func (r *RemoteBWE) newChannelObserverNonProbe() *channelObserver {
return newChannelObserver(
channelObserverParams{
Name: "non-probe",
Config: r.params.Config.ChannelObserverNonProbe,
},
r.params.Logger,
)
}
func (r *RemoteBWE) ProbingStart(expectedBandwidthUsage int64) {
r.lock.Lock()
defer r.lock.Unlock()
r.isInProbe = true
r.lastExpectedBandwidthUsage = expectedBandwidthUsage
r.params.Logger.Debugw(
"stream allocator: starting probe",
"lastReceived", r.lastReceivedEstimate,
"expectedBandwidthUsage", expectedBandwidthUsage,
"channel", r.channelObserver,
)
r.channelObserver = newChannelObserver(
channelObserverParams{
Name: "probe",
Config: r.params.Config.ChannelObserverProbe,
},
r.params.Logger,
)
r.channelObserver.SeedEstimate(r.lastReceivedEstimate)
}
func (r *RemoteBWE) ProbingEnd(isNotFailing bool, isGoalReached bool) {
r.lock.Lock()
defer r.lock.Unlock()
highestEstimateInProbe := r.channelObserver.GetHighestEstimate()
//
// Reset estimator at the end of a probe irrespective of probe result to get fresh readings.
// With a failed probe, the latest estimate could be lower than committed estimate.
// As bandwidth estimator (remote in REMB case, local in TWCC case) holds state,
// subsequent estimates could start from the lower point. That should not trigger a
// downward trend and get latched to committed estimate as that would trigger a re-allocation.
// With fresh readings, as long as the trend is not going downward, it will not get latched.
//
// BWE-TODO: clean up this comment after implementing probing in TWCC case
// NOTE: With TWCC, it is possible to reset bandwidth estimation to clean state as
// the send side is in full control of bandwidth estimation.
//
r.params.Logger.Debugw(
"probe done",
"isNotFailing", isNotFailing,
"isGoalReached", isGoalReached,
"committedEstimate", r.committedChannelCapacity,
"highestEstimate", highestEstimateInProbe,
"channel", r.channelObserver,
)
r.channelObserver = r.newChannelObserverNonProbe()
if !isNotFailing {
return
}
if highestEstimateInProbe > r.committedChannelCapacity {
r.committedChannelCapacity = highestEstimateInProbe
}
}
func (r *RemoteBWE) GetProbeStatus() (bool, bwe.ChannelTrend, int64, int64) {
r.lock.RLock()
defer r.lock.RUnlock()
if !r.isInProbe {
return false, bwe.ChannelTrendNeutral, 0, 0
}
trend, _ := r.channelObserver.GetTrend()
return r.channelObserver.HasEnoughEstimateSamples(),
trend,
r.channelObserver.GetLowestEstimate(),
r.channelObserver.GetHighestEstimate()
}
func (r *RemoteBWE) worker() {
ticker := time.NewTicker(r.params.Config.PeriodicCheckInterval)
defer ticker.Stop()
for {
select {
case <-r.wake:
r.lock.RLock()
state := r.congestionState
r.lock.RUnlock()
if state == bwe.CongestionStateCongested {
ticker.Reset(r.params.Config.PeriodicCheckIntervalCongested)
} else {
ticker.Reset(r.params.Config.PeriodicCheckInterval)
}
case <-ticker.C:
var (
shouldNotify bool
state bwe.CongestionState
committedChannelCapacity int64
)
r.lock.Lock()
shouldNotify, state, committedChannelCapacity = r.congestionDetectionStateMachine()
r.lock.Unlock()
if shouldNotify {
if bweListener := r.getBWEListener(); bweListener != nil {
bweListener.OnCongestionStateChange(state, committedChannelCapacity)
}
}
case <-r.stop.Watch():
return
}
}
}
@@ -20,6 +20,7 @@ import (
"github.com/frostbyte73/core"
"github.com/gammazero/deque"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
@@ -315,12 +316,12 @@ type congestionDetector struct {
stop core.Fuse
estimatedAvailableChannelCapacity int64
congestionState CongestionState
congestionState bwe.CongestionState
congestionStateSwitchedAt time.Time
congestedCTRTrend *ccutils.TrendDetector[float64]
congestedTrafficStats *trafficStats
onCongestionStateChange func(congestionState CongestionState, estimatedAvailableChannelCapacity int64)
bweListener bwe.BWEListener
}
func newCongestionDetector(params congestionDetectorParams) *congestionDetector {
@@ -338,69 +339,33 @@ func newCongestionDetector(params congestionDetectorParams) *congestionDetector
return c
}
func (c *congestionDetector) Reset() {
// SSBWE-TODO
// 1. may be clear all packet groups?
// 2. reset congestion state to none
// 3. reset estimate to 100 Mbps
// 4. reset packet_tracker?? maybe only the probe state??
}
func (c *congestionDetector) Stop() {
c.stop.Break()
}
func (c *congestionDetector) OnCongestionStateChange(f func(congestionState CongestionState, estimatedAvailableChannelCapacity int64)) {
func (c *congestionDetector) SetBWEListener(bweListener bwe.BWEListener) {
c.lock.Lock()
defer c.lock.Unlock()
c.onCongestionStateChange = f
c.bweListener = bweListener
}
func (c *congestionDetector) GetCongestionState() CongestionState {
func (c *congestionDetector) getBWEListener() bwe.BWEListener {
c.lock.RLock()
defer c.lock.RUnlock()
return c.congestionState
return c.bweListener
}
func (c *congestionDetector) updateCongestionState(state CongestionState, reason string, oldestContributingGroup int) {
c.lock.Lock()
c.params.Logger.Infow(
"congestion state change",
"from", c.congestionState,
"to", state,
"reason", reason,
"numPacketGroups", len(c.packetGroups),
"numContributingGroups", len(c.packetGroups[oldestContributingGroup:]),
"contributingGroups", logger.ObjectSlice(c.packetGroups[oldestContributingGroup:]),
"estimatedAvailableChannelCapacity", c.estimatedAvailableChannelCapacity,
)
prevState := c.congestionState
c.congestionState = state
onCongestionStateChange := c.onCongestionStateChange
estimatedAvailableChannelCapacity := c.estimatedAvailableChannelCapacity
c.lock.Unlock()
if onCongestionStateChange != nil {
onCongestionStateChange(state, estimatedAvailableChannelCapacity)
}
// when in congested state, monitor changes in captured traffic ratio (CTR)
// to ensure allocations are in line with latest estimates, it is possible that
// the estimate is incorrect when congestion starts and the allocation may be
// sub-optimal and not enough to reduce/relieve congestion, by monitoing CTR
// on a continuous basis allocations can be adjusted in the direction of
// reducing/relieving congestion
if state == CongestionStateCongested && prevState != CongestionStateCongested {
c.resetCTRTrend()
} else if state != CongestionStateCongested {
c.clearCTRTrend()
}
}
func (c *congestionDetector) GetEstimatedAvailableChannelCapacity() int64 {
c.lock.RLock()
defer c.lock.RUnlock()
return c.estimatedAvailableChannelCapacity
}
func (c *congestionDetector) HandleRTCP(report *rtcp.TransportLayerCC) {
func (c *congestionDetector) HandleTWCCFeedback(report *rtcp.TransportLayerCC) {
c.lock.Lock()
c.feedbackReports.PushBack(feedbackReport{mono.Now(), report})
c.lock.Unlock()
@@ -483,55 +448,55 @@ func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool,
}
func (c *congestionDetector) congestionDetectionStateMachine() {
state := c.GetCongestionState()
newState := state
state := c.congestionState
newState := c.congestionState
reason := ""
earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, oldestContributingGroup := c.isCongestionSignalTriggered()
switch state {
case CongestionStateNone:
case bwe.CongestionStateNone:
if congestedTriggered {
c.params.Logger.Warnw("invalid congested state transition", nil, "from", state, "reason", congestedReason)
}
if earlyWarningTriggered {
newState = CongestionStateEarlyWarning
newState = bwe.CongestionStateEarlyWarning
reason = earlyWarningReason
}
case CongestionStateEarlyWarning:
case bwe.CongestionStateEarlyWarning:
if congestedTriggered {
newState = CongestionStateCongested
newState = bwe.CongestionStateCongested
reason = congestedReason
} else if !earlyWarningTriggered {
newState = CongestionStateEarlyWarningHangover
newState = bwe.CongestionStateEarlyWarningHangover
}
case CongestionStateEarlyWarningHangover:
case bwe.CongestionStateEarlyWarningHangover:
if congestedTriggered {
c.params.Logger.Warnw("invalid congested state transition", nil, "from", state, "reason", congestedReason)
}
if earlyWarningTriggered {
newState = CongestionStateEarlyWarning
newState = bwe.CongestionStateEarlyWarning
reason = earlyWarningReason
} else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.EarlyWarningHangover {
newState = CongestionStateNone
newState = bwe.CongestionStateNone
}
case CongestionStateCongested:
case bwe.CongestionStateCongested:
if !congestedTriggered {
newState = CongestionStateCongestedHangover
newState = bwe.CongestionStateCongestedHangover
}
case CongestionStateCongestedHangover:
case bwe.CongestionStateCongestedHangover:
if congestedTriggered {
c.params.Logger.Warnw("invalid congested state transition", nil, "from", state, "reason", congestedReason)
}
if earlyWarningTriggered {
newState = CongestionStateEarlyWarning
newState = bwe.CongestionStateEarlyWarning
reason = earlyWarningReason
} else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.CongestedHangover {
newState = CongestionStateNone
newState = bwe.CongestionStateNone
}
}
@@ -539,7 +504,6 @@ func (c *congestionDetector) congestionDetectionStateMachine() {
// update after running the above estimate as state change callback includes the estimated available channel capacity
if newState != state {
c.congestionStateSwitchedAt = mono.Now()
c.updateCongestionState(newState, reason, oldestContributingGroup)
}
}
@@ -578,16 +542,10 @@ func (c *congestionDetector) updateCTRTrend(pg *packetGroup) {
return
}
c.params.Logger.Infow("captured traffic ratio is trending downward", "channel", c.congestedCTRTrend.ToString())
c.params.Logger.Infow("captured traffic ratio is trending downward", "channel", c.congestedCTRTrend)
c.lock.RLock()
state := c.congestionState
estimatedAvailableChannelCapacity := c.estimatedAvailableChannelCapacity
onCongestionStateChange := c.onCongestionStateChange
c.lock.RUnlock()
if onCongestionStateChange != nil {
onCongestionStateChange(state, estimatedAvailableChannelCapacity)
if bweListener := c.getBWEListener(); bweListener != nil {
bweListener.OnCongestionStateChange(c.congestionState, c.estimatedAvailableChannelCapacity)
}
// reset to get new set of samples for next trend
@@ -628,9 +586,43 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() {
return
}
c.lock.Lock()
c.estimatedAvailableChannelCapacity = agg.AcknowledgedBitrate()
c.lock.Unlock()
}
func (c *congestionDetector) updateCongestionState(state bwe.CongestionState, reason string, oldestContributingGroup int) {
c.params.Logger.Infow(
"congestion state change",
"from", c.congestionState,
"to", state,
"reason", reason,
"numPacketGroups", len(c.packetGroups),
"numContributingGroups", len(c.packetGroups[oldestContributingGroup:]),
"contributingGroups", logger.ObjectSlice(c.packetGroups[oldestContributingGroup:]),
"estimatedAvailableChannelCapacity", c.estimatedAvailableChannelCapacity,
)
if state != c.congestionState {
c.congestionStateSwitchedAt = mono.Now()
}
prevState := c.congestionState
c.congestionState = state
if bweListener := c.getBWEListener(); bweListener != nil {
bweListener.OnCongestionStateChange(state, c.estimatedAvailableChannelCapacity)
}
// when in congested state, monitor changes in captured traffic ratio (CTR)
// to ensure allocations are in line with latest estimates, it is possible that
// the estimate is incorrect when congestion starts and the allocation may be
// sub-optimal and not enough to reduce/relieve congestion, by monitoing CTR
// on a continuous basis allocations can be adjusted in the direction of
// reducing/relieving congestion
if state == bwe.CongestionStateCongested && prevState != bwe.CongestionStateCongested {
c.resetCTRTrend()
} else if state != bwe.CongestionStateCongested {
c.clearCTRTrend()
}
}
func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) {
@@ -787,7 +779,7 @@ func (c *congestionDetector) worker() {
c.processFeedbackReport(fbReport)
}
if c.GetCongestionState() == CongestionStateCongested {
if c.congestionState == bwe.CongestionStateCongested {
ticker.Reset(c.params.Config.PeriodicCheckIntervalCongested)
} else {
ticker.Reset(c.params.Config.PeriodicCheckInterval)
@@ -15,9 +15,9 @@
package sendsidebwe
import (
"fmt"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/protocol/logger"
"github.com/pion/rtcp"
)
//
@@ -50,35 +50,6 @@ import (
// ---------------------------------------------------------------------------
type CongestionState int
const (
CongestionStateNone CongestionState = iota
CongestionStateEarlyWarning
CongestionStateEarlyWarningHangover
CongestionStateCongested
CongestionStateCongestedHangover
)
func (c CongestionState) String() string {
switch c {
case CongestionStateNone:
return "NONE"
case CongestionStateEarlyWarning:
return "EARLY_WARNING"
case CongestionStateEarlyWarningHangover:
return "EARLY_WARNING_HANGOVER"
case CongestionStateCongested:
return "CONGESTED"
case CongestionStateCongestedHangover:
return "CONGESTED_HANGOVER"
default:
return fmt.Sprintf("%d", int(c))
}
}
// ---------------------------------------------------------------------------
type SendSideBWEConfig struct {
CongestionDetector CongestionDetectorConfig `yaml:"congestion_detector,omitempty"`
}
@@ -97,6 +68,8 @@ type SendSideBWEParams struct {
}
type SendSideBWE struct {
bwe.NullBWE
params SendSideBWEParams
*congestionDetector
@@ -112,8 +85,20 @@ func NewSendSideBWE(params SendSideBWEParams) *SendSideBWE {
}
}
func (s *SendSideBWE) SetBWEListener(bweListener bwe.BWEListener) {
s.congestionDetector.SetBWEListener(bweListener)
}
func (s *SendSideBWE) Reset() {
s.congestionDetector.Reset()
}
func (s *SendSideBWE) Stop() {
s.congestionDetector.Stop()
}
func (s *SendSideBWE) HandleTWCCFeedback(report *rtcp.TransportLayerCC) {
s.congestionDetector.HandleTWCCFeedback(report)
}
// ------------------------------------------------
+1 -1
View File
@@ -154,7 +154,7 @@ func (t *TrendDetector[T]) HasEnoughSamples() bool {
return t.numSamples >= t.params.Config.RequiredSamples
}
func (t *TrendDetector[T]) ToString() string {
func (t *TrendDetector[T]) String() string {
samplesStr := ""
if len(t.samples) > 0 {
firstTime := t.samples[0].at
+1 -1
View File
@@ -19,7 +19,7 @@ import (
"io"
"time"
"github.com/livekit/livekit-server/pkg/sfu/sendsidebwe"
"github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
"github.com/pion/rtp"
+1 -1
View File
@@ -20,7 +20,7 @@ import (
"github.com/frostbyte73/core"
"github.com/gammazero/deque"
"github.com/livekit/livekit-server/pkg/sfu/sendsidebwe"
"github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe"
"github.com/livekit/protocol/logger"
)
+1 -1
View File
@@ -19,7 +19,7 @@ import (
"github.com/frostbyte73/core"
"github.com/gammazero/deque"
"github.com/livekit/livekit-server/pkg/sfu/sendsidebwe"
"github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe"
"github.com/livekit/protocol/logger"
)
+1 -1
View File
@@ -15,7 +15,7 @@
package pacer
import (
"github.com/livekit/livekit-server/pkg/sfu/sendsidebwe"
"github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe"
"github.com/livekit/protocol/logger"
)
+47 -91
View File
@@ -18,8 +18,8 @@ import (
"sync"
"time"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/livekit-server/pkg/sfu/sendsidebwe"
"github.com/livekit/protocol/logger"
)
@@ -75,7 +75,7 @@ type ProbeController struct {
params ProbeControllerParams
lock sync.RWMutex
sendSideBWE *sendsidebwe.SendSideBWE
bwe bwe.BWE
probeInterval time.Duration
lastProbeStartTime time.Time
probeGoalBps int64
@@ -98,11 +98,11 @@ func NewProbeController(params ProbeControllerParams) *ProbeController {
return p
}
func (p *ProbeController) SetSendSideBWE(sendSideBWE *sendsidebwe.SendSideBWE) {
func (p *ProbeController) SetBWE(bwe bwe.BWE) {
p.lock.Lock()
defer p.lock.Unlock()
p.sendSideBWE = sendSideBWE
p.bwe = bwe
}
func (p *ProbeController) Reset() {
@@ -131,49 +131,9 @@ func (p *ProbeController) ProbeClusterDone(info ccutils.ProbeClusterInfo) {
p.lock.Unlock()
}
func (p *ProbeController) CheckProbe(trend ChannelTrend, highestEstimate int64) {
p.lock.Lock()
defer p.lock.Unlock()
if p.probeClusterId == ccutils.ProbeClusterIdInvalid {
return
}
if trend != ChannelTrendNeutral {
p.probeTrendObserved = true
}
switch {
case !p.probeTrendObserved && time.Since(p.lastProbeStartTime) > p.params.Config.TrendWait:
//
// More of a safety net.
// In rare cases, the estimate gets stuck. Prevent from probe running amok
// STREAM-ALLOCATOR-TODO: Need more testing here to ensure that probe does not cause a lot of damage
//
p.params.Logger.Debugw("stream allocator: probe: aborting, no trend", "cluster", p.probeClusterId)
p.abortProbeLocked()
case trend == ChannelTrendCongesting:
// stop immediately if the probe is congesting channel more
p.params.Logger.Debugw("stream allocator: probe: aborting, channel is congesting", "cluster", p.probeClusterId)
p.abortProbeLocked()
case highestEstimate > p.probeGoalBps:
// reached goal, stop probing
p.params.Logger.Infow(
"stream allocator: probe: stopping, goal reached",
"cluster", p.probeClusterId,
"goal", p.probeGoalBps,
"highest", highestEstimate,
)
p.goalReachedProbeClusterId = p.probeClusterId
p.StopProbe()
}
}
func (p *ProbeController) MaybeFinalizeProbe(
isComplete bool,
trend ChannelTrend,
trend bwe.ChannelTrend,
lowestEstimate int64,
) (isHandled bool, isNotFailing bool, isGoalReached bool) {
p.lock.Lock()
@@ -185,7 +145,7 @@ func (p *ProbeController) MaybeFinalizeProbe(
if p.goalReachedProbeClusterId != ccutils.ProbeClusterIdInvalid {
// finalise goal reached probe cluster
p.finalizeProbeLocked(ChannelTrendNeutral)
p.finalizeProbeLocked(bwe.ChannelTrendNeutral)
return true, true, true
}
@@ -193,8 +153,8 @@ func (p *ProbeController) MaybeFinalizeProbe(
p.probeEndTime.IsZero() &&
p.doneProbeClusterInfo.Id != ccutils.ProbeClusterIdInvalid && p.doneProbeClusterInfo.Id == p.probeClusterId {
// ensure any queueing due to probing is flushed
// STREAM-ALLOCATOR-TODO: CongestionControlProbeConfig.SettleWait should actually be a certain number of RTTs.
expectedDuration := float64(9.0)
// STREAM-ALLOCATOR-TODO: ProbeControllerConfig.SettleWait should actually be a certain number of RTTs.
expectedDuration := float64(0.0)
if lowestEstimate != 0 {
expectedDuration = float64(p.doneProbeClusterInfo.BytesSent*8*1000) / float64(lowestEstimate)
}
@@ -232,12 +192,12 @@ func (p *ProbeController) DoesProbeNeedFinalize() bool {
return p.abortedProbeClusterId != ccutils.ProbeClusterIdInvalid || p.goalReachedProbeClusterId != ccutils.ProbeClusterIdInvalid
}
func (p *ProbeController) finalizeProbeLocked(trend ChannelTrend) (isNotFailing bool) {
func (p *ProbeController) finalizeProbeLocked(trend bwe.ChannelTrend) (isNotFailing bool) {
aborted := p.probeClusterId == p.abortedProbeClusterId
p.clearProbeLocked()
if aborted || trend == ChannelTrendCongesting {
if aborted || trend == bwe.ChannelTrendCongesting {
// failed probe, backoff
p.backoffProbeIntervalLocked()
p.resetProbeDurationLocked()
@@ -246,7 +206,7 @@ func (p *ProbeController) finalizeProbeLocked(trend ChannelTrend) (isNotFailing
// reset probe interval and increase probe duration on a upward trending probe
p.resetProbeIntervalLocked()
if trend == ChannelTrendClearing {
if trend == bwe.ChannelTrendClearing {
p.increaseProbeDurationLocked()
}
return true
@@ -281,18 +241,18 @@ func (p *ProbeController) InitProbe(probeGoalDeltaBps int64, expectedBandwidthUs
time.Duration(float64(p.probeDuration.Milliseconds())*p.params.Config.DurationOverflowFactor)*time.Millisecond,
)
p.pollProbe(p.probeClusterId)
p.pollProbe(p.probeClusterId, expectedBandwidthUsage)
return p.probeClusterId, p.probeGoalBps
}
// SSBWE-TODO: try to do same path for both SSBWE and regular, the congesting part might be different though
func (p *ProbeController) pollProbe(probeClusterId ccutils.ProbeClusterId) {
if p.sendSideBWE == nil {
func (p *ProbeController) pollProbe(probeClusterId ccutils.ProbeClusterId, expectedBandwidthUsage int64) {
if p.bwe == nil {
return
}
startingEstimate := p.sendSideBWE.GetEstimatedAvailableChannelCapacity()
p.bwe.ProbingStart(expectedBandwidthUsage)
go func() {
for {
p.lock.Lock()
@@ -302,10 +262,37 @@ func (p *ProbeController) pollProbe(probeClusterId ccutils.ProbeClusterId) {
}
done := false
congestionState := p.sendSideBWE.GetCongestionState()
currentEstimate := p.sendSideBWE.GetEstimatedAvailableChannelCapacity()
_, trend, _, highestEstimate := p.bwe.GetProbeStatus()
if !p.probeTrendObserved && trend != bwe.ChannelTrendNeutral {
p.probeTrendObserved = true
}
switch {
case currentEstimate <= startingEstimate && time.Since(p.lastProbeStartTime) > p.params.Config.TrendWait:
case trend == bwe.ChannelTrendCongesting:
// stop immediately if the probe is congesting channel more
p.params.Logger.Infow(
"stream allocator: probe: aborting, channel is congesting",
"cluster", probeClusterId,
)
p.abortProbeLocked()
done = true
break
case highestEstimate > p.probeGoalBps:
// reached goal, stop probing
p.params.Logger.Infow(
"stream allocator: probe: stopping, goal reached",
"cluster", probeClusterId,
"goal", p.probeGoalBps,
"highestEstimate", highestEstimate,
)
p.goalReachedProbeClusterId = p.probeClusterId
p.StopProbe()
done = true
break
case !p.probeTrendObserved && time.Since(p.lastProbeStartTime) > p.params.Config.TrendWait:
//
// More of a safety net.
// In rare cases, the estimate gets stuck. Prevent from probe running amok
@@ -315,30 +302,6 @@ func (p *ProbeController) pollProbe(probeClusterId ccutils.ProbeClusterId) {
p.abortProbeLocked()
done = true
break
case congestionState == sendsidebwe.CongestionStateCongested || congestionState == sendsidebwe.CongestionStateEarlyWarning:
// stop immediately if the probe is congesting channel more
p.params.Logger.Infow(
"stream allocator: probe: aborting, channel is congesting",
"cluster", probeClusterId,
"congestionState", congestionState,
)
p.abortProbeLocked()
done = true
break
case currentEstimate > p.probeGoalBps:
// reached goal, stop probing
p.params.Logger.Infow(
"stream allocator: probe: stopping, goal reached",
"cluster", probeClusterId,
"goal", p.probeGoalBps,
"current", currentEstimate,
)
p.goalReachedProbeClusterId = p.probeClusterId
p.StopProbe()
done = true
break
}
p.lock.Unlock()
@@ -346,7 +309,7 @@ func (p *ProbeController) pollProbe(probeClusterId ccutils.ProbeClusterId) {
return
}
// SSBWE-TODO: do not hard code sleep time
// BWE-TODO: do not hard code sleep time
time.Sleep(50 * time.Millisecond)
}
}()
@@ -397,13 +360,6 @@ func (p *ProbeController) abortProbeLocked() {
p.StopProbe()
}
func (p *ProbeController) IsInProbe() bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.isInProbeLocked()
}
func (p *ProbeController) isInProbeLocked() bool {
return p.probeClusterId != ccutils.ProbeClusterIdInvalid
}
+93 -217
View File
@@ -30,8 +30,8 @@ import (
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
"github.com/livekit/livekit-server/pkg/sfu/sendsidebwe"
"github.com/livekit/livekit-server/pkg/utils"
)
@@ -151,24 +151,16 @@ const (
)
type StreamAllocatorConfig struct {
NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"`
ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"`
ProbeMode ProbeMode `yaml:"probe_mode,omitempty"`
MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"`
ProbeController ProbeControllerConfig `yaml:"probe_controller,omitempty"`
ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"`
ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"`
DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"`
}
var (
DefaultStreamAllocatorConfig = StreamAllocatorConfig{
NackRatioAttenuator: 0.4,
ExpectedUsageThreshold: 0.95,
ProbeMode: ProbeModePadding,
ProbeController: DefaultProbeControllerConfig,
ChannelObserverProbe: DefaultChannelObserverConfigProbe,
ChannelObserverNonProbe: DefaultChannelObserverConfigNonProbe,
ProbeMode: ProbeModePadding,
ProbeController: DefaultProbeControllerConfig,
}
)
@@ -184,13 +176,12 @@ type StreamAllocator struct {
onStreamStateChange func(update *StreamStateUpdate) error
bwe bwe.BWE
sendSideBWEInterceptor cc.BandwidthEstimator
sendSideBWE *sendsidebwe.SendSideBWE
enabled bool
allowPause bool
lastReceivedEstimate int64
committedChannelCapacity int64
overriddenChannelCapacity int64
@@ -198,7 +189,6 @@ type StreamAllocator struct {
prober *ccutils.Prober
channelObserver *ChannelObserver
// STREAM-ALLOCATOR-DATA rateMonitor *RateMonitor
videoTracksMu sync.RWMutex
@@ -206,8 +196,9 @@ type StreamAllocator struct {
isAllocateAllPending bool
rembTrackingSSRC uint32
state streamAllocatorState
isHolding bool
state streamAllocatorState
congestionState bwe.CongestionState
isHolding bool
eventsQueue *utils.TypedOpsQueue[Event]
@@ -263,6 +254,14 @@ func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate)
s.onStreamStateChange = f
}
func (s *StreamAllocator) SetBWE(bwe bwe.BWE) {
if bwe != nil {
bwe.SetBWEListener(s)
}
s.bwe = bwe
s.probeController.SetBWE(bwe)
}
func (s *StreamAllocator) SetSendSideBWEInterceptor(sendSideBWEInterceptor cc.BandwidthEstimator) {
if sendSideBWEInterceptor != nil {
sendSideBWEInterceptor.OnTargetBitrateChange(s.onTargetBitrateChange)
@@ -270,14 +269,6 @@ func (s *StreamAllocator) SetSendSideBWEInterceptor(sendSideBWEInterceptor cc.Ba
s.sendSideBWEInterceptor = sendSideBWEInterceptor
}
func (s *StreamAllocator) SetSendSideBWE(sendSideBWE *sendsidebwe.SendSideBWE) {
if sendSideBWE != nil {
sendSideBWE.OnCongestionStateChange(s.onCongestionStateChange)
}
s.sendSideBWE = sendSideBWE
s.probeController.SetSendSideBWE(sendSideBWE)
}
type AddTrackParams struct {
Source livekit.TrackSource
Priority uint8
@@ -355,7 +346,9 @@ func (s *StreamAllocator) SetChannelCapacity(channelCapacity int64) {
}
func (s *StreamAllocator) resetState() {
s.channelObserver = s.newChannelObserverNonProbe()
if s.bwe != nil {
s.bwe.Reset()
}
s.probeController.Reset()
s.state = streamAllocatorStateStable
@@ -447,8 +440,8 @@ func (s *StreamAllocator) OnTransportCCFeedback(downTrack *sfu.DownTrack, fb *rt
s.sendSideBWEInterceptor.WriteRTCP([]rtcp.Packet{fb}, nil)
}
if s.sendSideBWE != nil {
s.sendSideBWE.HandleRTCP(fb)
if s.bwe != nil {
s.bwe.HandleTWCCFeedback(fb)
}
}
@@ -462,11 +455,12 @@ func (s *StreamAllocator) onTargetBitrateChange(bitrate int) {
// called when congestion state changes (send side bandwidth estimation)
type congestionStateChangeData struct {
congestionState sendsidebwe.CongestionState
congestionState bwe.CongestionState
estimatedAvailableChannelCapacity int64
}
func (s *StreamAllocator) onCongestionStateChange(congestionState sendsidebwe.CongestionState, estimatedAvailableChannelCapacity int64) {
// BWEListener implementation
func (s *StreamAllocator) OnCongestionStateChange(congestionState bwe.CongestionState, estimatedAvailableChannelCapacity int64) {
s.postEvent(Event{
Signal: streamAllocatorSignalCongestionStateChange,
Data: congestionStateChangeData{
@@ -707,27 +701,33 @@ func (s *StreamAllocator) handleSignalAdjustState(Event) {
func (s *StreamAllocator) handleSignalEstimate(event Event) {
receivedEstimate, _ := event.Data.(int64)
s.lastReceivedEstimate = receivedEstimate
// s.monitorRate(receivedEstimate)
// while probing, maintain estimate separately to enable keeping current committed estimate if probe fails
if s.probeController.IsInProbe() {
s.handleNewEstimateInProbe()
} else {
s.handleNewEstimateInNonProbe()
// always update NACKs
packetDelta, repeatedNackDelta := s.getNackDelta()
if s.bwe != nil {
s.bwe.HandleREMB(
receivedEstimate,
s.probeController.DoesProbeNeedFinalize(), // waiting for goal reached OR aborted probe to finalize
s.getExpectedBandwidthUsage(),
packetDelta,
repeatedNackDelta,
)
}
}
func (s *StreamAllocator) handleSignalPeriodicPing(Event) {
// finalize probe if necessary
trend, _ := s.channelObserver.GetTrend()
isHandled, isNotFailing, isGoalReached := s.probeController.MaybeFinalizeProbe(
s.channelObserver.HasEnoughEstimateSamples(),
trend,
s.channelObserver.GetLowestEstimate(),
)
if isHandled {
s.onProbeDone(isNotFailing, isGoalReached)
if s.bwe != nil {
isValidSignal, trend, lowestEstimate, highestEstimate := s.bwe.GetProbeStatus()
isHandled, isNotFailing, isGoalReached := s.probeController.MaybeFinalizeProbe(
isValidSignal,
trend,
lowestEstimate,
)
if isHandled {
s.onProbeDone(isNotFailing, isGoalReached, highestEstimate)
}
}
// probe if necessary and timing is right
@@ -735,7 +735,10 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) {
s.maybeProbe()
}
// s.updateTracksHistory()
/* STREAM-ALLOCATOR-DATA
s.monitorRate(s.committedChannelCapacity)
s.updateTracksHistory()
*/
}
func (s *StreamAllocator) handleSignalSendProbe(event Event) {
@@ -819,21 +822,19 @@ func (s *StreamAllocator) handleSignalRTCPReceiverReport(event Event) {
func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) {
cscd := event.Data.(congestionStateChangeData)
if cscd.congestionState != sendsidebwe.CongestionStateNone {
if cscd.congestionState != bwe.CongestionStateNone {
s.probeController.AbortProbe()
}
if cscd.congestionState == sendsidebwe.CongestionStateEarlyWarning ||
cscd.congestionState == sendsidebwe.CongestionStateEarlyWarningHangover {
if cscd.congestionState == bwe.CongestionStateEarlyWarning ||
cscd.congestionState == bwe.CongestionStateEarlyWarningHangover {
s.isHolding = true
} else {
s.isHolding = false
// early warning is done and hold has been released,
// if there is no congestion, allocate all tracks optimally as
// some tracks may have been held at sub-optimal allocation
// during early warning hold
if cscd.congestionState == sendsidebwe.CongestionStateNone && s.state == streamAllocatorStateStable {
if s.isHolding && cscd.congestionState == bwe.CongestionStateNone && s.state == streamAllocatorStateStable {
update := NewStreamStateUpdate()
for _, track := range s.getTracks() {
allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal, s.isHolding)
@@ -841,19 +842,39 @@ func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) {
}
s.maybeSendUpdate(update)
}
s.isHolding = false
}
if cscd.congestionState == sendsidebwe.CongestionStateCongested {
if cscd.congestionState == bwe.CongestionStateCongested {
s.params.Logger.Infow(
"stream allocator: channel congestion detected, updating channel capacity",
"old(bps)", s.committedChannelCapacity,
"new(bps)", cscd.estimatedAvailableChannelCapacity,
"expectedUsage(bps)", s.getExpectedBandwidthUsage(),
)
/* STREAM-ALLOCATOR-DATA
s.params.Logger.Debugw(
fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity: experimental", action),
"rateHistory", s.rateMonitor.GetHistory(),
"expectedQueuing", s.rateMonitor.GetQueuingGuess(),
"trackHistory", s.getTracksHistory(),
)
*/
s.committedChannelCapacity = cscd.estimatedAvailableChannelCapacity
// reset probe to ensure it does not start too soon after a downward trend
// BWE-TODO: maybe probe controller setting should be algorithm specific
// BWE-TODO: for e. g., the reset could be waiting shorter in SSBWE case
// BWE-TODO: a couple of things to consider
// BWE-TODO: 1. Make ProbeController be owned by BWE modules?
// BWE-TODO: 2. Add an interface method to BWE to check if probe controller should be reset?
s.probeController.Reset()
s.allocateAllTracks()
}
s.congestionState = cscd.congestionState
}
func (s *StreamAllocator) setState(state streamAllocatorState) {
@@ -866,8 +887,16 @@ func (s *StreamAllocator) setState(state streamAllocatorState) {
// reset probe to enforce a delay after state change before probing
s.probeController.Reset()
// a fresh channel observer after state transition to get clean data
s.channelObserver = s.newChannelObserverNonProbe()
// a fresh start after state transition to get clean data
if s.bwe != nil {
// BWE-TODO: ssbwe maybe should not reset like this as it might have useful state across
// BWE-TODO: state changes in this module, actually even remotebwe should also manage it
// BWE-TODO: internally, Reset should probably only be used if all managed tracks go away
// BWE-TODO: and we can get a clean start, mimicking existing behaviour till this can be
// BWE-TODO: evaluated more.
s.bwe.Reset()
}
}
func (s *StreamAllocator) adjustState() {
@@ -881,99 +910,6 @@ func (s *StreamAllocator) adjustState() {
s.setState(streamAllocatorStateStable)
}
func (s *StreamAllocator) handleNewEstimateInProbe() {
// always update NACKs, even if aborted
packetDelta, repeatedNackDelta := s.getNackDelta()
if s.probeController.DoesProbeNeedFinalize() {
// waiting for aborted probe to finalize
return
}
s.channelObserver.AddEstimate(s.lastReceivedEstimate)
s.channelObserver.AddNack(packetDelta, repeatedNackDelta)
trend, _ := s.channelObserver.GetTrend()
s.probeController.CheckProbe(trend, s.channelObserver.GetHighestEstimate())
}
func (s *StreamAllocator) handleNewEstimateInNonProbe() {
s.channelObserver.AddEstimate(s.lastReceivedEstimate)
packetDelta, repeatedNackDelta := s.getNackDelta()
s.channelObserver.AddNack(packetDelta, repeatedNackDelta)
trend, reason := s.channelObserver.GetTrend()
if trend != ChannelTrendCongesting {
return
}
var estimateToCommit int64
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
switch reason {
case ChannelCongestionReasonLoss:
estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - s.params.Config.NackRatioAttenuator*s.channelObserver.GetNackRatio()))
default:
estimateToCommit = s.lastReceivedEstimate
}
if estimateToCommit > s.lastReceivedEstimate {
estimateToCommit = s.lastReceivedEstimate
}
commitThreshold := int64(s.params.Config.ExpectedUsageThreshold * float64(expectedBandwidthUsage))
action := "applying"
if estimateToCommit > commitThreshold {
action = "skipping"
}
if action == "applying" {
s.params.Logger.Infow(
fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity update", action),
"reason", reason,
"old(bps)", s.committedChannelCapacity,
"new(bps)", estimateToCommit,
"lastReceived(bps)", s.lastReceivedEstimate,
"expectedUsage(bps)", expectedBandwidthUsage,
"commitThreshold(bps)", commitThreshold,
"channel", s.channelObserver.ToString(),
)
} else {
s.params.Logger.Debugw(
fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity update", action),
"reason", reason,
"old(bps)", s.committedChannelCapacity,
"new(bps)", estimateToCommit,
"lastReceived(bps)", s.lastReceivedEstimate,
"expectedUsage(bps)", expectedBandwidthUsage,
"commitThreshold(bps)", commitThreshold,
"channel", s.channelObserver.ToString(),
)
}
/* STREAM-ALLOCATOR-DATA
s.params.Logger.Debugw(
fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity: experimental", action),
"rateHistory", s.rateMonitor.GetHistory(),
"expectedQueuing", s.rateMonitor.GetQueuingGuess(),
"nackHistory", s.channelObserver.GetNackHistory(),
"trackHistory", s.getTracksHistory(),
)
*/
if estimateToCommit > commitThreshold {
// estimate to commit is either higher or within tolerance of expected uage, skip committing and re-allocating
return
}
s.committedChannelCapacity = estimateToCommit
// reset to get new set of samples for next trend
s.channelObserver = s.newChannelObserverNonProbe()
// reset probe to ensure it does not start too soon after a downward trend
s.probeController.Reset()
s.allocateAllTracks()
}
func (s *StreamAllocator) allocateTrack(track *Track) {
// abort any probe that may be running when a track specific change needs allocation
s.probeController.AbortProbe()
@@ -1129,39 +1065,17 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
s.adjustState()
}
func (s *StreamAllocator) onProbeDone(isNotFailing bool, isGoalReached bool) {
highestEstimateInProbe := s.channelObserver.GetHighestEstimate()
if s.sendSideBWE != nil {
highestEstimateInProbe = s.sendSideBWE.GetEstimatedAvailableChannelCapacity()
func (s *StreamAllocator) onProbeDone(isNotFailing bool, isGoalReached bool, highestEstimate int64) {
if s.bwe != nil {
s.bwe.ProbingEnd(isNotFailing, isGoalReached)
}
//
// Reset estimator at the end of a probe irrespective of probe result to get fresh readings.
// With a failed probe, the latest estimate could be lower than committed estimate.
// As bandwidth estimator (remote in REMB case, local in TWCC case) holds state,
// subsequent estimates could start from the lower point. That should not trigger a
// downward trend and get latched to committed estimate as that would trigger a re-allocation.
// With fresh readings, as long as the trend is not going downward, it will not get latched.
//
// NOTE: With TWCC, it is possible to reset bandwidth estimation to clean state as
// the send side is in full control of bandwidth estimation.
//
channelObserverString := s.channelObserver.ToString()
s.channelObserver = s.newChannelObserverNonProbe()
s.params.Logger.Debugw(
"probe done",
"isNotFailing", isNotFailing,
"isGoalReached", isGoalReached,
"committedEstimate", s.committedChannelCapacity,
"highestEstimate", highestEstimateInProbe,
"channel", channelObserverString,
)
if !isNotFailing {
return
}
if highestEstimateInProbe > s.committedChannelCapacity {
s.committedChannelCapacity = highestEstimateInProbe
if highestEstimate > s.committedChannelCapacity {
s.committedChannelCapacity = highestEstimate
}
s.maybeBoostDeficientTracks()
@@ -1277,7 +1191,6 @@ func (s *StreamAllocator) allocateAllTracks() {
for _, track := range sorted {
_, usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
s.params.Logger.Infow("debug allocated", "trackID", track.ID(), "usedChannelCapacity", usedChannelCapacity, "availableChannelCapacity", availableChannelCapacity) // REMOVE
availableChannelCapacity -= usedChannelCapacity
if availableChannelCapacity < 0 {
availableChannelCapacity = 0
@@ -1386,52 +1299,17 @@ func (s *StreamAllocator) getNackDelta() (uint32, uint32) {
return aggPacketDelta, aggRepeatedNackDelta
}
func (s *StreamAllocator) newChannelObserverProbe() *ChannelObserver {
return NewChannelObserver(
ChannelObserverParams{
Name: "probe",
Config: s.params.Config.ChannelObserverProbe,
},
s.params.Logger,
)
}
func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver {
return NewChannelObserver(
ChannelObserverParams{
Name: "non-probe",
Config: s.params.Config.ChannelObserverNonProbe,
},
s.params.Logger,
)
}
func (s *StreamAllocator) initProbe(probeGoalDeltaBps int64) {
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
probeClusterId, probeGoalBps := s.probeController.InitProbe(probeGoalDeltaBps, expectedBandwidthUsage)
logFields := []any{
s.params.Logger.Debugw(
"stream allocator: starting probe",
"probeClusterId", probeClusterId,
"current usage", expectedBandwidthUsage,
"committed", s.committedChannelCapacity,
"probeGoalDeltaBps", probeGoalDeltaBps,
"goalBps", probeGoalBps,
}
if s.sendSideBWE != nil {
channelState := ""
if s.channelObserver != nil {
channelState = s.channelObserver.ToString()
}
s.channelObserver = s.newChannelObserverProbe()
s.channelObserver.SeedEstimate(s.lastReceivedEstimate)
logFields = append(
logFields,
"lastReceived", s.lastReceivedEstimate,
"channel", channelState,
)
}
s.params.Logger.Debugw("stream allocator: starting probe", logFields...)
)
}
func (s *StreamAllocator) maybeProbe() {
@@ -1439,10 +1317,8 @@ func (s *StreamAllocator) maybeProbe() {
// do not probe if channel capacity is overridden
return
}
if !s.probeController.CanProbe() {
return
}
if s.sendSideBWE != nil && s.sendSideBWE.GetCongestionState() != sendsidebwe.CongestionStateNone {
if s.congestionState != bwe.CongestionStateNone || !s.probeController.CanProbe() {
return
}