diff --git a/pkg/config/config.go b/pkg/config/config.go index 6a7a57943..368282294 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -149,8 +149,10 @@ type CongestionControlChannelObserverConfig struct { } type CongestionControlConfig struct { - Enabled bool `yaml:"enabled"` - AllowPause bool `yaml:"allow_pause"` + Enabled bool `yaml:"enabled,omitempty"` + AllowPause bool `yaml:"allow_pause,omitempty"` + NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"` + ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"` UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"` ProbeMode CongestionControlProbeMode `yaml:"padding_mode,omitempty"` MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"` @@ -316,9 +318,11 @@ var DefaultConfig = Config{ HighQuality: time.Second, }, CongestionControl: CongestionControlConfig{ - Enabled: true, - AllowPause: false, - ProbeMode: CongestionControlProbeModePadding, + Enabled: true, + AllowPause: false, + NackRatioAttenuator: 0.4, + ExpectedUsageThreshold: 0.95, + ProbeMode: CongestionControlProbeModePadding, ProbeConfig: CongestionControlProbeConfig{ BaseInterval: 3 * time.Second, BackoffFactor: 1.5, diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index aa274be3e..48d7eb14c 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -36,8 +36,6 @@ import ( const ( ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps - NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate - PriorityMin = uint8(1) PriorityMax = uint8(255) PriorityDefaultScreenshare = PriorityMax @@ -785,30 +783,42 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() { expectedBandwidthUsage := s.getExpectedBandwidthUsage() switch reason { case ChannelCongestionReasonLoss: - estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - NackRatioAttenuator*s.channelObserver.GetNackRatio())) - if estimateToCommit > s.lastReceivedEstimate { - estimateToCommit = s.lastReceivedEstimate - } + estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - s.params.Config.NackRatioAttenuator*s.channelObserver.GetNackRatio())) default: estimateToCommit = s.lastReceivedEstimate } + if estimateToCommit > s.lastReceivedEstimate { + estimateToCommit = s.lastReceivedEstimate + } + + commitThreshold := int64(s.params.Config.ExpectedUsageThreshold * float64(expectedBandwidthUsage)) + action := "applying" + if estimateToCommit > commitThreshold { + action = "skipping" + } s.params.Logger.Infow( - "stream allocator: channel congestion detected, updating channel capacity", + fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity update", action), "reason", reason, "old(bps)", s.committedChannelCapacity, "new(bps)", estimateToCommit, "lastReceived(bps)", s.lastReceivedEstimate, "expectedUsage(bps)", expectedBandwidthUsage, + "commitThreshold(bps)", commitThreshold, "channel", s.channelObserver.ToString(), ) s.params.Logger.Infow( - "stream allocator: channel congestion detected, updating channel capacity: experimental", + fmt.Sprintf("stream allocator: channel congestion detected, %s channel capacity: experimental", action), "rateHistory", s.rateMonitor.GetHistory(), "expectedQueuing", s.rateMonitor.GetQueuingGuess(), "nackHistory", s.channelObserver.GetNackHistory(), "trackHistory", s.getTracksHistory(), ) + if estimateToCommit > commitThreshold { + // estimate to commit is either higher or within tolerance of expected uage, skip committing and re-allocating + return + } + s.committedChannelCapacity = estimateToCommit // reset to get new set of samples for next trend diff --git a/pkg/sfu/streamallocator/trenddetector.go b/pkg/sfu/streamallocator/trenddetector.go index 019716f27..74b96230a 100644 --- a/pkg/sfu/streamallocator/trenddetector.go +++ b/pkg/sfu/streamallocator/trenddetector.go @@ -167,6 +167,7 @@ func (t *TrendDetector) ToString() string { func (t *TrendDetector) prune() { // prune based on a few rules + // 1. If there are more than required samples if len(t.samples) > t.params.RequiredSamples { t.samples = t.samples[len(t.samples)-t.params.RequiredSamples:] @@ -187,18 +188,21 @@ func (t *TrendDetector) prune() { } } - // 3. If all sample values are same, collapse to just the last one + // 3. collapse same values at the front to just the last of those samples if len(t.samples) != 0 { - sameValue := true + cutoffIndex := -1 firstValue := t.samples[0].value - for i := 0; i < len(t.samples); i++ { + for i := 1; i < len(t.samples); i++ { if t.samples[i].value != firstValue { - sameValue = false + cutoffIndex = i - 1 break } } - if sameValue { + if cutoffIndex >= 0 { + t.samples = t.samples[cutoffIndex:] + } else { + // all values are the same, just keep the last one t.samples = t.samples[len(t.samples)-1:] } }