mirror of
https://github.com/livekit/livekit.git
synced 2026-05-14 14:05:18 +00:00
Stream allocator tweaks (#1936)
* Prevent re-allocation if possible * log commitThreshold * Collapse same values in the front
This commit is contained in:
@@ -149,8 +149,10 @@ type CongestionControlChannelObserverConfig struct {
|
||||
}
|
||||
|
||||
type CongestionControlConfig struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
AllowPause bool `yaml:"allow_pause"`
|
||||
Enabled bool `yaml:"enabled,omitempty"`
|
||||
AllowPause bool `yaml:"allow_pause,omitempty"`
|
||||
NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"`
|
||||
ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"`
|
||||
UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"`
|
||||
ProbeMode CongestionControlProbeMode `yaml:"padding_mode,omitempty"`
|
||||
MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"`
|
||||
@@ -316,9 +318,11 @@ var DefaultConfig = Config{
|
||||
HighQuality: time.Second,
|
||||
},
|
||||
CongestionControl: CongestionControlConfig{
|
||||
Enabled: true,
|
||||
AllowPause: false,
|
||||
ProbeMode: CongestionControlProbeModePadding,
|
||||
Enabled: true,
|
||||
AllowPause: false,
|
||||
NackRatioAttenuator: 0.4,
|
||||
ExpectedUsageThreshold: 0.95,
|
||||
ProbeMode: CongestionControlProbeModePadding,
|
||||
ProbeConfig: CongestionControlProbeConfig{
|
||||
BaseInterval: 3 * time.Second,
|
||||
BackoffFactor: 1.5,
|
||||
|
||||
@@ -36,8 +36,6 @@ import (
|
||||
const (
|
||||
ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps
|
||||
|
||||
NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate
|
||||
|
||||
PriorityMin = uint8(1)
|
||||
PriorityMax = uint8(255)
|
||||
PriorityDefaultScreenshare = PriorityMax
|
||||
@@ -785,30 +783,42 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() {
|
||||
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
|
||||
switch reason {
|
||||
case ChannelCongestionReasonLoss:
|
||||
estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - NackRatioAttenuator*s.channelObserver.GetNackRatio()))
|
||||
if estimateToCommit > s.lastReceivedEstimate {
|
||||
estimateToCommit = s.lastReceivedEstimate
|
||||
}
|
||||
estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - s.params.Config.NackRatioAttenuator*s.channelObserver.GetNackRatio()))
|
||||
default:
|
||||
estimateToCommit = s.lastReceivedEstimate
|
||||
}
|
||||
if estimateToCommit > s.lastReceivedEstimate {
|
||||
estimateToCommit = s.lastReceivedEstimate
|
||||
}
|
||||
|
||||
commitThreshold := int64(s.params.Config.ExpectedUsageThreshold * float64(expectedBandwidthUsage))
|
||||
action := "applying"
|
||||
if estimateToCommit > commitThreshold {
|
||||
action = "skipping"
|
||||
}
|
||||
|
||||
s.params.Logger.Infow(
|
||||
"stream allocator: channel congestion detected, updating channel capacity",
|
||||
fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity update", action),
|
||||
"reason", reason,
|
||||
"old(bps)", s.committedChannelCapacity,
|
||||
"new(bps)", estimateToCommit,
|
||||
"lastReceived(bps)", s.lastReceivedEstimate,
|
||||
"expectedUsage(bps)", expectedBandwidthUsage,
|
||||
"commitThreshold(bps)", commitThreshold,
|
||||
"channel", s.channelObserver.ToString(),
|
||||
)
|
||||
s.params.Logger.Infow(
|
||||
"stream allocator: channel congestion detected, updating channel capacity: experimental",
|
||||
fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity: experimental", action),
|
||||
"rateHistory", s.rateMonitor.GetHistory(),
|
||||
"expectedQueuing", s.rateMonitor.GetQueuingGuess(),
|
||||
"nackHistory", s.channelObserver.GetNackHistory(),
|
||||
"trackHistory", s.getTracksHistory(),
|
||||
)
|
||||
if estimateToCommit > commitThreshold {
|
||||
// estimate to commit is either higher or within tolerance of expected uage, skip committing and re-allocating
|
||||
return
|
||||
}
|
||||
|
||||
s.committedChannelCapacity = estimateToCommit
|
||||
|
||||
// reset to get new set of samples for next trend
|
||||
|
||||
@@ -167,6 +167,7 @@ func (t *TrendDetector) ToString() string {
|
||||
|
||||
func (t *TrendDetector) prune() {
|
||||
// prune based on a few rules
|
||||
|
||||
// 1. If there are more than required samples
|
||||
if len(t.samples) > t.params.RequiredSamples {
|
||||
t.samples = t.samples[len(t.samples)-t.params.RequiredSamples:]
|
||||
@@ -187,18 +188,21 @@ func (t *TrendDetector) prune() {
|
||||
}
|
||||
}
|
||||
|
||||
// 3. If all sample values are same, collapse to just the last one
|
||||
// 3. collapse same values at the front to just the last of those samples
|
||||
if len(t.samples) != 0 {
|
||||
sameValue := true
|
||||
cutoffIndex := -1
|
||||
firstValue := t.samples[0].value
|
||||
for i := 0; i < len(t.samples); i++ {
|
||||
for i := 1; i < len(t.samples); i++ {
|
||||
if t.samples[i].value != firstValue {
|
||||
sameValue = false
|
||||
cutoffIndex = i - 1
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if sameValue {
|
||||
if cutoffIndex >= 0 {
|
||||
t.samples = t.samples[cutoffIndex:]
|
||||
} else {
|
||||
// all values are the same, just keep the last one
|
||||
t.samples = t.samples[len(t.samples)-1:]
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user