Attempt to reduce disruption due to probe. (#1839)

* Make congestion controller probe config

* Wait for enough estimate samples

* fixes

* format

* limit number of times a packet is ACKed

* ramp up probe duration

* go format

* correct comment

* restore default

* add float64 type to generated CLI
This commit is contained in:
Raja Subramanian
2023-06-30 11:09:46 +05:30
committed by GitHub
parent 3de1e49165
commit 69a1e572be
7 changed files with 192 additions and 102 deletions

View File

@@ -105,12 +105,31 @@ type PLIThrottleConfig struct {
HighQuality time.Duration `yaml:"high_quality,omitempty"`
}
type CongestionControlProbeConfig struct {
BaseInterval time.Duration `yaml:"base_interval,omitempty"`
BackoffFactor float64 `yaml:"backoff_factor,omitempty"`
MaxInterval time.Duration `yaml:"max_interval,omitempty"`
SettleWait time.Duration `yaml:"settle_wait,omitempty"`
SettleWaitMax time.Duration `yaml:"settle_wait_max,omitempty"`
TrendWait time.Duration `yaml:"trend_wait,omitempty"`
OveragePct int64 `yaml:"overage_pct,omitempty"`
MinBps int64 `yaml:"min_bps,omitempty"`
MinDuration time.Duration `yaml:"min_duration,omitempty"`
MaxDuration time.Duration `yaml:"max_duration,omitempty"`
DurationOverflowFactor float64 `yaml:"duration_overflow_factor,omitempty"`
DurationIncreaseFactor float64 `yaml:"duration_increase_factor,omitempty"`
}
type CongestionControlConfig struct {
Enabled bool `yaml:"enabled"`
AllowPause bool `yaml:"allow_pause"`
UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"`
ProbeMode CongestionControlProbeMode `yaml:"padding_mode,omitempty"`
MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"`
Enabled bool `yaml:"enabled"`
AllowPause bool `yaml:"allow_pause"`
UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"`
ProbeMode CongestionControlProbeMode `yaml:"padding_mode,omitempty"`
MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"`
ProbeConfig CongestionControlProbeConfig `yaml:"probe_config,omitempty"`
}
type AudioConfig struct {
@@ -269,6 +288,23 @@ func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []c
Enabled: true,
AllowPause: false,
ProbeMode: CongestionControlProbeModePadding,
ProbeConfig: CongestionControlProbeConfig{
BaseInterval: 3 * time.Second,
BackoffFactor: 1.5,
MaxInterval: 2 * time.Minute,
SettleWait: 250 * time.Millisecond,
SettleWaitMax: 10 * time.Second,
TrendWait: 2 * time.Second,
OveragePct: 120,
MinBps: 200_000,
MinDuration: 200 * time.Millisecond,
MaxDuration: 20 * time.Second,
DurationOverflowFactor: 1.25,
DurationIncreaseFactor: 1.5,
},
},
},
Audio: AudioConfig{
@@ -596,6 +632,13 @@ func GenerateCLIFlags(existingFlags []cli.Flag, hidden bool) ([]cli.Flag, error)
Usage: generatedCLIFlagUsage,
Hidden: hidden,
}
case reflect.Float64:
flag = &cli.Float64Flag{
Name: name,
EnvVars: []string{envVar},
Usage: generatedCLIFlagUsage,
Hidden: hidden,
}
case reflect.Slice:
// TODO
continue
@@ -647,6 +690,8 @@ func (conf *Config) updateFromCLI(c *cli.Context, baseFlags []cli.Flag) error {
configValue.SetUint(c.Uint64(flagName))
case reflect.Float32:
configValue.SetFloat(c.Float64(flagName))
case reflect.Float64:
configValue.SetFloat(c.Float64(flagName))
// case reflect.Slice:
// // TODO
// case reflect.Map:

View File

@@ -11,6 +11,7 @@ import (
const (
defaultRtt = 70
ignoreRetransmission = 100 // Ignore packet retransmission after ignoreRetransmission milliseconds
maxAck = 3
)
func btoi(b bool) int {
@@ -187,7 +188,7 @@ func (s *sequencer) getPacketsMeta(seqNo []uint16) []packetMeta {
continue
}
if seq.lastNack == 0 || refTime-seq.lastNack > uint32(math.Min(float64(ignoreRetransmission), float64(2*s.rtt))) {
if (seq.lastNack == 0 || refTime-seq.lastNack > uint32(math.Min(float64(ignoreRetransmission), float64(2*s.rtt)))) && seq.nacked < maxAck {
seq.nacked++
seq.lastNack = refTime

View File

@@ -120,6 +120,10 @@ func (c *ChannelObserver) GetHighestEstimate() int64 {
return c.estimateTrend.GetHighest()
}
// HasEnoughEstimateSamples reports whether the channel observer's estimate
// trend detector has accumulated enough samples for its trend to be meaningful.
func (c *ChannelObserver) HasEnoughEstimateSamples() bool {
return c.estimateTrend.HasEnoughSamples()
}
// GetNackRatio returns the current NACK ratio as reported by the
// observer's NACK tracker.
func (c *ChannelObserver) GetNackRatio() float64 {
return c.nackTracker.GetRatio()
}

View File

@@ -4,26 +4,14 @@ import (
"sync"
"time"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/protocol/logger"
)
const (
ProbeWaitBase = 5 * time.Second
ProbeBackoffFactor = 1.5
ProbeWaitMax = 2 * time.Minute
ProbeSettleWait = 250
ProbeSettleWaitMax = 10 * time.Second
ProbeTrendWait = 2 * time.Second
ProbePct = 120
ProbeMinBps = 200 * 1000 // 200 kbps
ProbeMinDuration = 20 * time.Second
ProbeMaxDuration = 21 * time.Second
)
// ---------------------------------------------------------------------------
type ProbeControllerParams struct {
Config config.CongestionControlProbeConfig
Prober *Prober
Logger logger.Logger
}
@@ -31,19 +19,23 @@ type ProbeControllerParams struct {
type ProbeController struct {
params ProbeControllerParams
lock sync.RWMutex
probeInterval time.Duration
lastProbeStartTime time.Time
probeGoalBps int64
probeClusterId ProbeClusterId
abortedProbeClusterId ProbeClusterId
probeTrendObserved bool
probeEndTime time.Time
lock sync.RWMutex
probeInterval time.Duration
lastProbeStartTime time.Time
probeGoalBps int64
probeClusterId ProbeClusterId
doneProbeClusterInfo ProbeClusterInfo
abortedProbeClusterId ProbeClusterId
goalReachedProbeClusterId ProbeClusterId
probeTrendObserved bool
probeEndTime time.Time
probeDuration time.Duration
}
func NewProbeController(params ProbeControllerParams) *ProbeController {
p := &ProbeController{
params: params,
params: params,
probeDuration: params.Config.MinDuration,
}
p.Reset()
@@ -57,45 +49,21 @@ func (p *ProbeController) Reset() {
p.lastProbeStartTime = time.Now()
p.resetProbeIntervalLocked()
p.resetProbeDurationLocked()
p.clearProbeLocked()
}
func (p *ProbeController) ProbeClusterDone(info ProbeClusterInfo, lowestEstimate int64) bool {
func (p *ProbeController) ProbeClusterDone(info ProbeClusterInfo) {
p.lock.Lock()
defer p.lock.Unlock()
if p.probeClusterId != info.Id {
p.params.Logger.Infow("not expected probe cluster", "probeClusterId", p.probeClusterId, "resetProbeClusterId", info.Id)
return false
return
}
if p.abortedProbeClusterId == ProbeClusterIdInvalid {
// successful probe, finalize
return p.finalizeProbeLocked()
}
// ensure probe queue is flushed
// STREAM-ALLOCATOR-TODO: ProbeSettleWait should actually be a certain number of RTTs.
expectedDuration := float64(info.BytesSent*8*1000) / float64(lowestEstimate)
queueTime := expectedDuration - float64(info.Duration.Milliseconds())
if queueTime < 0.0 {
queueTime = 0.0
}
queueWait := time.Duration(queueTime+float64(ProbeSettleWait)) * time.Millisecond
if queueWait > ProbeSettleWaitMax {
queueWait = ProbeSettleWaitMax
}
p.probeEndTime = p.lastProbeStartTime.Add(queueWait + info.Duration)
p.params.Logger.Infow(
"setting probe end time",
"probeClusterId", p.probeClusterId,
"expectedDuration", expectedDuration,
"queueTime", queueTime,
"queueWait", queueWait,
"probeEndTime", p.probeEndTime,
)
return false
p.doneProbeClusterInfo = info
}
func (p *ProbeController) CheckProbe(trend ChannelTrend, highestEstimate int64) {
@@ -111,7 +79,7 @@ func (p *ProbeController) CheckProbe(trend ChannelTrend, highestEstimate int64)
}
switch {
case !p.probeTrendObserved && time.Since(p.lastProbeStartTime) > ProbeTrendWait:
case !p.probeTrendObserved && time.Since(p.lastProbeStartTime) > p.params.Config.TrendWait:
//
// More of a safety net.
// In rare cases, the estimate gets stuck. Prevent the probe from running amok
@@ -133,41 +101,87 @@ func (p *ProbeController) CheckProbe(trend ChannelTrend, highestEstimate int64)
"goal", p.probeGoalBps,
"highest", highestEstimate,
)
p.goalReachedProbeClusterId = p.probeClusterId
p.StopProbe()
}
}
func (p *ProbeController) MaybeFinalizeProbe() (isHandled bool, isSuccessful bool) {
func (p *ProbeController) MaybeFinalizeProbe(
isComplete bool,
trend ChannelTrend,
lowestEstimate int64,
) (isHandled bool, isNotFailing bool, isGoalReached bool) {
p.lock.Lock()
defer p.lock.Unlock()
if p.isInProbeLocked() && !p.probeEndTime.IsZero() && time.Now().After(p.probeEndTime) {
return true, p.finalizeProbeLocked()
if !p.isInProbeLocked() {
return false, false, false
}
return false, false
if p.goalReachedProbeClusterId != ProbeClusterIdInvalid {
// finalize goal-reached probe cluster
p.finalizeProbeLocked(ChannelTrendNeutral)
return true, true, true
}
if (isComplete || p.abortedProbeClusterId != ProbeClusterIdInvalid) && p.probeEndTime.IsZero() && p.doneProbeClusterInfo.Id != ProbeClusterIdInvalid && p.doneProbeClusterInfo.Id == p.probeClusterId {
// ensure any queueing due to probing is flushed
// STREAM-ALLOCATOR-TODO: CongestionControlProbeConfig.SettleWait should actually be a certain number of RTTs.
expectedDuration := float64(9.0)
if lowestEstimate != 0 {
expectedDuration = float64(p.doneProbeClusterInfo.BytesSent*8*1000) / float64(lowestEstimate)
}
queueTime := expectedDuration - float64(p.doneProbeClusterInfo.Duration.Milliseconds())
if queueTime < 0.0 {
queueTime = 0.0
}
queueWait := (time.Duration(queueTime) * time.Millisecond) + p.params.Config.SettleWait
if queueWait > p.params.Config.SettleWaitMax {
queueWait = p.params.Config.SettleWaitMax
}
p.probeEndTime = p.lastProbeStartTime.Add(queueWait + p.doneProbeClusterInfo.Duration)
p.params.Logger.Infow(
"setting probe end time",
"probeClusterId", p.probeClusterId,
"expectedDuration", expectedDuration,
"queueTime", queueTime,
"queueWait", queueWait,
"probeEndTime", p.probeEndTime,
)
}
if !p.probeEndTime.IsZero() && time.Now().After(p.probeEndTime) {
// finalize an aborted, or a non-failing but non-goal-reached, probe cluster
return true, p.finalizeProbeLocked(trend), false
}
return false, false, false
}
func (p *ProbeController) DoesProbeNeedFinalize() bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.abortedProbeClusterId != ProbeClusterIdInvalid
return p.abortedProbeClusterId != ProbeClusterIdInvalid || p.goalReachedProbeClusterId != ProbeClusterIdInvalid
}
func (p *ProbeController) finalizeProbeLocked() bool {
func (p *ProbeController) finalizeProbeLocked(trend ChannelTrend) (isNotFailing bool) {
aborted := p.probeClusterId == p.abortedProbeClusterId
p.clearProbeLocked()
if aborted {
if aborted || trend == ChannelTrendCongesting {
// failed probe, backoff
p.backoffProbeIntervalLocked()
p.resetProbeDurationLocked()
return false
}
// reset probe interval on a successful probe
// reset probe interval and increase probe duration on an upward-trending probe
p.resetProbeIntervalLocked()
if trend == ChannelTrendClearing {
p.increaseProbeDurationLocked()
}
return true
}
@@ -178,13 +192,15 @@ func (p *ProbeController) InitProbe(probeGoalDeltaBps int64, expectedBandwidthUs
p.lastProbeStartTime = time.Now()
// overshoot a bit to account for noise (in measurement/estimate etc)
desiredIncreaseBps := (probeGoalDeltaBps * ProbePct) / 100
if desiredIncreaseBps < ProbeMinBps {
desiredIncreaseBps = ProbeMinBps
desiredIncreaseBps := (probeGoalDeltaBps * p.params.Config.OveragePct) / 100
if desiredIncreaseBps < p.params.Config.MinBps {
desiredIncreaseBps = p.params.Config.MinBps
}
p.probeGoalBps = expectedBandwidthUsage + desiredIncreaseBps
p.doneProbeClusterInfo = ProbeClusterInfo{Id: ProbeClusterIdInvalid}
p.abortedProbeClusterId = ProbeClusterIdInvalid
p.goalReachedProbeClusterId = ProbeClusterIdInvalid
p.probeTrendObserved = false
@@ -194,8 +210,8 @@ func (p *ProbeController) InitProbe(probeGoalDeltaBps int64, expectedBandwidthUs
ProbeClusterModeUniform,
int(p.probeGoalBps),
int(expectedBandwidthUsage),
ProbeMinDuration,
ProbeMaxDuration,
p.probeDuration,
time.Duration(float64(p.probeDuration.Milliseconds())*p.params.Config.DurationOverflowFactor)*time.Millisecond,
)
return p.probeClusterId, p.probeGoalBps
@@ -203,18 +219,31 @@ func (p *ProbeController) InitProbe(probeGoalDeltaBps int64, expectedBandwidthUs
// clearProbeLocked resets the per-probe tracking identifiers and the
// done-cluster info back to their invalid sentinel values.
// Caller must hold p.lock (per the *Locked naming convention).
func (p *ProbeController) clearProbeLocked() {
p.probeClusterId = ProbeClusterIdInvalid
p.doneProbeClusterInfo = ProbeClusterInfo{Id: ProbeClusterIdInvalid}
p.abortedProbeClusterId = ProbeClusterIdInvalid
p.goalReachedProbeClusterId = ProbeClusterIdInvalid
}
func (p *ProbeController) backoffProbeIntervalLocked() {
p.probeInterval = time.Duration(p.probeInterval.Seconds()*ProbeBackoffFactor) * time.Second
if p.probeInterval > ProbeWaitMax {
p.probeInterval = ProbeWaitMax
p.probeInterval = time.Duration(p.probeInterval.Seconds()*p.params.Config.BackoffFactor) * time.Second
if p.probeInterval > p.params.Config.MaxInterval {
p.probeInterval = p.params.Config.MaxInterval
}
}
// resetProbeIntervalLocked restores the wait between probes to the configured
// base interval (used after a non-failing probe; failing probes back off
// instead via backoffProbeIntervalLocked).
// Caller must hold p.lock (per the *Locked naming convention).
func (p *ProbeController) resetProbeIntervalLocked() {
// The stale assignment to the removed ProbeWaitBase constant was dead code:
// it was immediately overwritten by the config-driven value below.
p.probeInterval = p.params.Config.BaseInterval
}
// resetProbeDurationLocked restores the probe duration to the configured
// minimum (used on reset and after a failed probe, so the ramp-up starts over).
// Caller must hold p.lock (per the *Locked naming convention).
func (p *ProbeController) resetProbeDurationLocked() {
p.probeDuration = p.params.Config.MinDuration
}
// increaseProbeDurationLocked ramps the probe duration up by the configured
// DurationIncreaseFactor, clamping at the configured MaxDuration.
// NOTE(review): going through Milliseconds() truncates sub-millisecond
// precision — presumably acceptable for probe durations; confirm.
// Caller must hold p.lock (per the *Locked naming convention).
func (p *ProbeController) increaseProbeDurationLocked() {
p.probeDuration = time.Duration(float64(p.probeDuration.Milliseconds())*p.params.Config.DurationIncreaseFactor) * time.Millisecond
if p.probeDuration > p.params.Config.MaxDuration {
p.probeDuration = p.params.Config.MaxDuration
}
}
func (p *ProbeController) StopProbe() {

View File

@@ -354,7 +354,7 @@ type ProbeClusterId uint32
const (
ProbeClusterIdInvalid ProbeClusterId = 0
bucketDuration = time.Second
bucketDuration = 100 * time.Millisecond
bytesPerProbe = 1000
minProbeRateBps = 10000
)
@@ -423,14 +423,14 @@ func NewCluster(id ProbeClusterId, mode ProbeClusterMode, desiredRateBps int, ex
}
func (c *Cluster) initBuckets(desiredRateBps int, expectedRateBps int, minDuration time.Duration) {
// split into 1-second bucket
// split into granular buckets
// NOTE: splitting even if mode is uniform
numBuckets := int((minDuration.Milliseconds() + bucketDuration.Milliseconds() - 1) / bucketDuration.Milliseconds())
if numBuckets < 1 {
numBuckets = 1
}
expectedRateBytes := (expectedRateBps + 7) / 8
expectedRateBytesPerSec := (expectedRateBps + 7) / 8
baseProbeRateBps := (desiredRateBps - expectedRateBps + numBuckets - 1) / numBuckets
runningDesiredBytes := 0
@@ -447,13 +447,13 @@ func (c *Cluster) initBuckets(desiredRateBps int, expectedRateBps int, minDurati
if bucketProbeRateBps < minProbeRateBps {
bucketProbeRateBps = minProbeRateBps
}
bucketProbeRateBytes := (bucketProbeRateBps + 7) / 8
bucketProbeRateBytesPerSec := (bucketProbeRateBps + 7) / 8
// pace based on bytes per probe
numProbes := (bucketProbeRateBytes + bytesPerProbe - 1) / bytesPerProbe
sleepDurationMicroSeconds := int(float64(1_000_000)/float64(numProbes) + 0.5)
numProbesPerSec := (bucketProbeRateBytesPerSec + bytesPerProbe - 1) / bytesPerProbe
sleepDurationMicroSeconds := int(float64(1_000_000)/float64(numProbesPerSec) + 0.5)
runningDesiredBytes += bucketProbeRateBytes + expectedRateBytes
runningDesiredBytes += (((bucketProbeRateBytesPerSec + expectedRateBytesPerSec) * int(bucketDuration.Milliseconds())) + 999) / 1000
runningDesiredElapsedTime += bucketDuration
c.buckets = append(c.buckets, clusterBucket{
@@ -537,10 +537,10 @@ func (c *Cluster) Process(pl ProberListener) {
if bytesShortFall < 0 {
bytesShortFall = 0
}
// cap short fall to limit to 8 packets in an iteration
// cap short fall to limit to 5 packets in an iteration
// 275 bytes per packet (255 max RTP padding payload + 20 bytes RTP header)
if bytesShortFall > (275 * 8) {
bytesShortFall = 275 * 8
if bytesShortFall > (275 * 5) {
bytesShortFall = 275 * 5
}
// round up to packet size
bytesShortFall = ((bytesShortFall + 274) / 275) * 275

View File

@@ -2,7 +2,6 @@ package streamallocator
import (
"fmt"
"math"
"sort"
"sync"
"time"
@@ -199,6 +198,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
}
s.probeController = NewProbeController(ProbeControllerParams{
Config: s.params.Config.ProbeConfig,
Prober: s.prober,
Logger: params.Logger,
})
@@ -555,7 +555,7 @@ func (s *StreamAllocator) processEvents() {
}
func (s *StreamAllocator) ping() {
ticker := time.NewTicker(500 * time.Millisecond)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
@@ -641,9 +641,14 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) {
func (s *StreamAllocator) handleSignalPeriodicPing(event *Event) {
// finalize probe if necessary
isHandled, isSuccessful := s.probeController.MaybeFinalizeProbe()
trend, _ := s.channelObserver.GetTrend()
isHandled, isNotFailing, isGoalReached := s.probeController.MaybeFinalizeProbe(
s.channelObserver.HasEnoughEstimateSamples(),
trend,
s.channelObserver.GetLowestEstimate(),
)
if isHandled {
s.onProbeDone(isSuccessful)
s.onProbeDone(isNotFailing, isGoalReached)
}
// probe if necessary and timing is right
@@ -677,10 +682,7 @@ func (s *StreamAllocator) handleSignalSendProbe(event *Event) {
func (s *StreamAllocator) handleSignalProbeClusterDone(event *Event) {
info, _ := event.Data.(ProbeClusterInfo)
isHandled := s.probeController.ProbeClusterDone(info, int64(math.Min(float64(s.committedChannelCapacity), float64(s.channelObserver.GetLowestEstimate()))))
if isHandled {
s.onProbeDone(true)
}
s.probeController.ProbeClusterDone(info)
}
func (s *StreamAllocator) handleSignalResume(event *Event) {
@@ -925,7 +927,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
s.adjustState()
}
func (s *StreamAllocator) onProbeDone(isSuccessful bool) {
func (s *StreamAllocator) onProbeDone(isNotFailing bool, isGoalReached bool) {
highestEstimateInProbe := s.channelObserver.GetHighestEstimate()
//
@@ -939,18 +941,23 @@ func (s *StreamAllocator) onProbeDone(isSuccessful bool) {
// NOTE: With TWCC, it is possible to reset bandwidth estimation to clean state as
// the send side is in full control of bandwidth estimation.
//
channelObserverString := s.channelObserver.ToString()
s.channelObserver = s.newChannelObserverNonProbe()
if !isSuccessful {
s.params.Logger.Infow(
"probe done",
"isNotFailing", isNotFailing,
"isGoalReached", isGoalReached,
"committedEstimate", s.committedChannelCapacity,
"highestEstimate", highestEstimateInProbe,
"channel", channelObserverString,
)
if !isNotFailing {
return
}
// probe estimate is same or higher, commit it and try to allocate deficient tracks
s.params.Logger.Infow(
"successful probe, updating channel capacity",
"old(bps)", s.committedChannelCapacity,
"new(bps)", highestEstimateInProbe,
)
s.committedChannelCapacity = highestEstimateInProbe
if highestEstimateInProbe > s.committedChannelCapacity {
s.committedChannelCapacity = highestEstimateInProbe
}
s.maybeBoostDeficientTracks()
}

View File

@@ -122,6 +122,10 @@ func (t *TrendDetector) GetDirection() TrendDirection {
return t.direction
}
// HasEnoughSamples reports whether the detector has received at least the
// configured RequiredSamples number of values.
func (t *TrendDetector) HasEnoughSamples() bool {
return t.numSamples >= t.params.RequiredSamples
}
func (t *TrendDetector) ToString() string {
now := time.Now()
elapsed := now.Sub(t.startTime).Seconds()