From 6e9964e80b05d55286b811e034e4f21afe670df4 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 21 Dec 2024 23:08:38 +0530 Subject: [PATCH] TWCC tweaks (#3282) * SSBWE experimentation * dqr hysteresis * fixes * fmt * more relaxed DQR * pps proportional to duration * clean up * clean up * don't need gratuitous up allocation --- pkg/sfu/bwe/bwe.go | 6 - pkg/sfu/bwe/remotebwe/remote_bwe.go | 31 +-- .../bwe/sendsidebwe/congestion_detector.go | 261 +++++++++++------- pkg/sfu/bwe/sendsidebwe/traffic_stats.go | 17 +- pkg/sfu/streamallocator/streamallocator.go | 15 +- 5 files changed, 185 insertions(+), 145 deletions(-) diff --git a/pkg/sfu/bwe/bwe.go b/pkg/sfu/bwe/bwe.go index 3185c7573..5c806df04 100644 --- a/pkg/sfu/bwe/bwe.go +++ b/pkg/sfu/bwe/bwe.go @@ -36,9 +36,7 @@ type CongestionState int const ( CongestionStateNone CongestionState = iota CongestionStateEarlyWarning - CongestionStateEarlyWarningHangover CongestionStateCongested - CongestionStateCongestedHangover ) func (c CongestionState) String() string { @@ -47,12 +45,8 @@ func (c CongestionState) String() string { return "NONE" case CongestionStateEarlyWarning: return "EARLY_WARNING" - case CongestionStateEarlyWarningHangover: - return "EARLY_WARNING_HANGOVER" case CongestionStateCongested: return "CONGESTED" - case CongestionStateCongestedHangover: - return "CONGESTED_HANGOVER" default: return fmt.Sprintf("%d", int(c)) } diff --git a/pkg/sfu/bwe/remotebwe/remote_bwe.go b/pkg/sfu/bwe/remotebwe/remote_bwe.go index bfe90f87c..f7aaaabaf 100644 --- a/pkg/sfu/bwe/remotebwe/remote_bwe.go +++ b/pkg/sfu/bwe/remotebwe/remote_bwe.go @@ -27,22 +27,20 @@ import ( // --------------------------------------------------------------------------- type RemoteBWEConfig struct { - NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"` - ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"` - ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"` - ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"` - CongestedHangoverDuration time.Duration `yaml:"congested_hangover_duration,omitempty"` - ProbeController ProbeControllerConfig `yaml:"probe_controller,omitempty"` + NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"` + ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"` + ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"` + ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"` + ProbeController ProbeControllerConfig `yaml:"probe_controller,omitempty"` } var ( DefaultRemoteBWEConfig = RemoteBWEConfig{ - NackRatioAttenuator: 0.4, - ExpectedUsageThreshold: 0.95, - ChannelObserverProbe: defaultChannelObserverConfigProbe, - ChannelObserverNonProbe: defaultChannelObserverConfigNonProbe, - CongestedHangoverDuration: 3 * time.Second, - ProbeController: DefaultProbeControllerConfig, + NackRatioAttenuator: 0.4, + ExpectedUsageThreshold: 0.95, + ChannelObserverProbe: defaultChannelObserverConfigProbe, + ChannelObserverNonProbe: defaultChannelObserverConfigNonProbe, + ProbeController: DefaultProbeControllerConfig, } ) @@ -177,15 +175,6 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState update = true } } else { - toState = bwe.CongestionStateCongestedHangover - } - - case bwe.CongestionStateCongestedHangover: - if trend == channelTrendCongesting { - if r.estimateAvailableChannelCapacity(reason) { - toState = bwe.CongestionStateCongested - } - } else if time.Since(r.congestionStateSwitchedAt) >= r.params.Config.CongestedHangoverDuration { toState = bwe.CongestionStateNone } } diff --git a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go index c4f49ee8f..13ca8ce93 100644 --- a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go +++ b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go @@ -39,22 +39,42 @@ func (c CongestionSignalConfig) IsTriggered(numGroups int, duration int64) bool } var ( - defaultQueuingDelayEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ + defaultQueuingDelayEarlyWarningJQRConfig = CongestionSignalConfig{ MinNumberOfGroups: 2, MinDuration: 200 * time.Millisecond, } - defaultLossEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ + defaultQueuingDelayEarlyWarningDQRConfig = CongestionSignalConfig{ MinNumberOfGroups: 3, MinDuration: 300 * time.Millisecond, } - defaultQueuingDelayCongestedCongestionSignalConfig = CongestionSignalConfig{ + defaultLossEarlyWarningJQRConfig = CongestionSignalConfig{ + MinNumberOfGroups: 3, + MinDuration: 300 * time.Millisecond, + } + + defaultLossEarlyWarningDQRConfig = CongestionSignalConfig{ MinNumberOfGroups: 4, MinDuration: 400 * time.Millisecond, } - defaultLossCongestedCongestionSignalConfig = CongestionSignalConfig{ + defaultQueuingDelayCongestedJQRConfig = CongestionSignalConfig{ + MinNumberOfGroups: 4, + MinDuration: 400 * time.Millisecond, + } + + defaultQueuingDelayCongestedDQRConfig = CongestionSignalConfig{ + MinNumberOfGroups: 5, + MinDuration: 500 * time.Millisecond, + } + + defaultLossCongestedJQRConfig = CongestionSignalConfig{ + MinNumberOfGroups: 6, + MinDuration: 600 * time.Millisecond, + } + + defaultLossCongestedDQRConfig = CongestionSignalConfig{ MinNumberOfGroups: 6, MinDuration: 600 * time.Millisecond, } @@ -158,12 +178,14 @@ func (c congestionReason) String() string { // ------------------------------------------------------------------------------- type qdMeasurement struct { - config CongestionSignalConfig - jqrMin int64 - dqrMax int64 + jqrConfig CongestionSignalConfig + dqrConfig CongestionSignalConfig + jqrMin int64 + dqrMax int64 numGroups int numJQRGroups int + numDQRGroups int minSendTime int64 maxSendTime int64 @@ -174,9 +196,10 @@ type qdMeasurement struct { queuingRegion queuingRegion } -func newQdMeasurement(config CongestionSignalConfig, jqrMin int64, dqrMax int64) *qdMeasurement { +func newQDMeasurement(jqrConfig CongestionSignalConfig, dqrConfig CongestionSignalConfig, jqrMin int64, dqrMax int64) *qdMeasurement { return &qdMeasurement{ - config: config, + jqrConfig: jqrConfig, + dqrConfig: dqrConfig, jqrMin: jqrMin, dqrMax: dqrMax, queuingRegion: queuingRegionIndeterminate, @@ -199,25 +222,40 @@ func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) { } q.maxGroupIdx = max(q.maxGroupIdx, groupIdx) + minSendTime, maxSendTime := pg.SendWindow() + if q.minSendTime == 0 || minSendTime < q.minSendTime { + q.minSendTime = minSendTime + } + q.maxSendTime = max(q.maxSendTime, maxSendTime) + if pqd < q.dqrMax { - // a DQR breaks continuity - q.isSealed = true - q.queuingRegion = queuingRegionDQR - return + q.numDQRGroups++ + if q.numJQRGroups > 0 { + // JQR continuity is broken + q.isSealed = true + return + } + + if q.dqrConfig.IsTriggered(q.numDQRGroups, q.maxSendTime-q.minSendTime) { + q.isSealed = true + q.queuingRegion = queuingRegionDQR + return + } } if pqd > q.jqrMin { q.numJQRGroups++ - minSendTime, maxSendTime := pg.SendWindow() - if q.minSendTime == 0 || minSendTime < q.minSendTime { - q.minSendTime = minSendTime + if q.numDQRGroups > 0 { + // DQR continuity is broken + q.isSealed = true + return } - q.maxSendTime = max(q.maxSendTime, maxSendTime) - } - if q.config.IsTriggered(q.numJQRGroups, q.maxSendTime-q.minSendTime) { - q.isSealed = true - q.queuingRegion = queuingRegionJQR + if q.jqrConfig.IsTriggered(q.numJQRGroups, q.maxSendTime-q.minSendTime) { + q.isSealed = true + q.queuingRegion = queuingRegionJQR + return + } } } @@ -240,6 +278,7 @@ func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddInt("numGroups", q.numGroups) e.AddInt("numJQRGroups", q.numJQRGroups) + e.AddInt("numDQRGroups", q.numDQRGroups) e.AddInt64("minSendTime", q.minSendTime) e.AddInt64("maxSendTime", q.maxSendTime) e.AddDuration("duration", time.Duration((q.maxSendTime-q.minSendTime)*1000)) @@ -253,14 +292,16 @@ func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error { // ------------------------------------------------------------------------------- type lossMeasurement struct { - config CongestionSignalConfig + jqrConfig CongestionSignalConfig + dqrConfig CongestionSignalConfig jqrMinLoss float64 dqrMaxLoss float64 numGroups int ts *trafficStats - isSealed bool + isJQRSealed bool + isDQRSealed bool minGroupIdx int maxGroupIdx int @@ -270,14 +311,16 @@ type lossMeasurement struct { } func newLossMeasurement( - config CongestionSignalConfig, + jqrConfig CongestionSignalConfig, + dqrConfig CongestionSignalConfig, weightedLossConfig WeightedLossConfig, jqrMinLoss float64, dqrMaxLoss float64, logger logger.Logger, ) *lossMeasurement { return &lossMeasurement{ - config: config, + jqrConfig: jqrConfig, + dqrConfig: dqrConfig, jqrMinLoss: jqrMinLoss, dqrMaxLoss: dqrMaxLoss, ts: newTrafficStats(trafficStatsParams{ @@ -289,7 +332,7 @@ func newLossMeasurement( } func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) { - if l.isSealed || !pg.IsFinalized() { + if (l.isJQRSealed && l.isDQRSealed) || !pg.IsFinalized() { return } @@ -301,21 +344,31 @@ func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) { l.ts.Merge(pg.Traffic()) - duration := l.ts.Duration() - if l.config.IsTriggered(l.numGroups, duration) { - l.isSealed = true - l.weightedLoss = l.ts.WeightedLoss() + if !l.isJQRSealed && l.jqrConfig.IsTriggered(l.numGroups, l.ts.Duration()) { + l.isJQRSealed = true + weightedLoss := l.ts.WeightedLoss() + if weightedLoss > l.jqrMinLoss { + l.weightedLoss = weightedLoss + l.queuingRegion = queuingRegionJQR + l.isDQRSealed = true // seal DQR also as JQR is already hit + return + } + } + + if l.dqrConfig.IsTriggered(l.numGroups, l.ts.Duration()) { + l.isDQRSealed = true + + l.weightedLoss = l.ts.WeightedLoss() if l.weightedLoss < l.dqrMaxLoss { l.queuingRegion = queuingRegionDQR - } else if l.weightedLoss > l.jqrMinLoss { - l.queuingRegion = queuingRegionJQR + return } } } func (l *lossMeasurement) IsSealed() bool { - return l.isSealed + return l.isJQRSealed && l.isDQRSealed } func (l *lossMeasurement) QueuingRegion() queuingRegion { @@ -333,7 +386,8 @@ func (l *lossMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddInt("numGroups", l.numGroups) e.AddObject("ts", l.ts) - e.AddBool("isSealed", l.isSealed) + e.AddBool("isJQRSealed", l.isJQRSealed) + e.AddBool("isDQRSealed", l.isDQRSealed) e.AddInt("minGroupIdx", l.minGroupIdx) e.AddInt("maxGroupIdx", l.maxGroupIdx) e.AddFloat64("weightedLoss", l.weightedLoss) @@ -358,13 +412,15 @@ type CongestionDetectorConfig struct { JQRMinWeightedLoss float64 `yaml:"jqr_min_weighted_loss,omitempty"` DQRMaxWeightedLoss float64 `yaml:"dqr_max_weighted_loss,omitempty"` - QueuingDelayEarlyWarning CongestionSignalConfig `yaml:"queuing_delay_early_warning,omitempty"` - LossEarlyWarning CongestionSignalConfig `yaml:"loss_early_warning,omitempty"` - EarlyWarningHangover time.Duration `yaml:"early_warning_hangover,omitempty"` + QueuingDelayEarlyWarningJQR CongestionSignalConfig `yaml:"queuing_delay_early_warning_jqr,omitempty"` + QueuingDelayEarlyWarningDQR CongestionSignalConfig `yaml:"queuing_delay_early_warning_dqr,omitempty"` + LossEarlyWarningJQR CongestionSignalConfig `yaml:"loss_early_warning_jqr,omitempty"` + LossEarlyWarningDQR CongestionSignalConfig `yaml:"loss_early_warning_dqr,omitempty"` - QueuingDelayCongested CongestionSignalConfig `yaml:"queuing_delay_congested,omitempty"` - LossCongested CongestionSignalConfig `yaml:"loss_congested,omitempty"` - CongestedHangover time.Duration `yaml:"congested_hangover,omitempty"` + QueuingDelayCongestedJQR CongestionSignalConfig `yaml:"queuing_delay_congested_jqr,omitempty"` + QueuingDelayCongestedDQR CongestionSignalConfig `yaml:"queuing_delay_congested_dqr,omitempty"` + LossCongestedJQR CongestionSignalConfig `yaml:"loss_congested_jqr,omitempty"` + LossCongestedDQR CongestionSignalConfig `yaml:"loss_congested_dqr,omitempty"` CongestedCTRTrend ccutils.TrendDetectorConfig `yaml:"congested_ctr_trend,omitempty"` CongestedCTREpsilon float64 `yaml:"congested_ctr_epsilon,omitempty"` @@ -390,7 +446,7 @@ var ( defaultCongestionDetectorConfig = CongestionDetectorConfig{ PacketGroup: defaultPacketGroupConfig, - PacketGroupMaxAge: 5 * time.Second, + PacketGroupMaxAge: 10 * time.Second, ProbePacketGroup: defaultProbePacketGroupConfig, ProbeRegulator: ccutils.DefaultProbeRegulatorConfig, @@ -403,13 +459,15 @@ var ( JQRMinWeightedLoss: 0.25, DQRMaxWeightedLoss: 0.1, - QueuingDelayEarlyWarning: defaultQueuingDelayEarlyWarningCongestionSignalConfig, - LossEarlyWarning: defaultLossEarlyWarningCongestionSignalConfig, - EarlyWarningHangover: 500 * time.Millisecond, + QueuingDelayEarlyWarningJQR: defaultQueuingDelayEarlyWarningJQRConfig, + QueuingDelayEarlyWarningDQR: defaultQueuingDelayEarlyWarningDQRConfig, + LossEarlyWarningJQR: defaultLossEarlyWarningJQRConfig, + LossEarlyWarningDQR: defaultLossEarlyWarningDQRConfig, - QueuingDelayCongested: defaultQueuingDelayCongestedCongestionSignalConfig, - LossCongested: defaultLossCongestedCongestionSignalConfig, - CongestedHangover: 3 * time.Second, + QueuingDelayCongestedJQR: defaultQueuingDelayCongestedJQRConfig, + QueuingDelayCongestedDQR: defaultQueuingDelayCongestedDQRConfig, + LossCongestedJQR: defaultLossCongestedJQRConfig, + LossCongestedDQR: defaultLossCongestedDQRConfig, CongestedCTRTrend: defaultTrendDetectorConfigCongestedCTR, CongestedCTREpsilon: 0.05, @@ -451,10 +509,10 @@ type congestionDetector struct { congestedTrafficStats *trafficStats congestedPacketGroup *packetGroup - queuingRegion queuingRegion - congestionReason congestionReason - jqrQDMeasurement *qdMeasurement - jqrLossMeasurement *lossMeasurement + queuingRegion queuingRegion + congestionReason congestionReason + qdMeasurement *qdMeasurement + lossMeasurement *lossMeasurement bweListener bwe.BWEListener } @@ -494,8 +552,8 @@ func (c *congestionDetector) Reset() { c.queuingRegion = queuingRegionIndeterminate c.congestionReason = congestionReasonNone - c.jqrQDMeasurement = nil - c.jqrLossMeasurement = nil + c.qdMeasurement = nil + c.lossMeasurement = nil } func (c *congestionDetector) SetBWEListener(bweListener bwe.BWEListener) { @@ -757,8 +815,17 @@ func (c *congestionDetector) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, "isSignalValid", isSignalValid, "probeClusterInfo", pci, "probePacketGroup", c.probePacketGroup, + "congestionState", c.congestionState, ) + // if congestion signal changed during probe, defer to that signal + if c.congestionState != bwe.CongestionStateNone { + probeSignal := ccutils.ProbeSignalCongesting + c.probeRegulator.ProbeSignal(probeSignal, pci.CreatedAt) + c.probePacketGroup = nil + return probeSignal, c.estimatedAvailableChannelCapacity, true + } + probeSignal, estimatedAvailableChannelCapacity := c.params.Config.ProbeSignal.ProbeSignal(c.probePacketGroup) if probeSignal == ccutils.ProbeSignalNotCongesting && estimatedAvailableChannelCapacity > c.estimatedAvailableChannelCapacity { c.estimatedAvailableChannelCapacity = estimatedAvailableChannelCapacity @@ -789,16 +856,20 @@ func (c *congestionDetector) prunePacketGroups() { func (c *congestionDetector) updateCongestionSignal( stage string, - qdConfig CongestionSignalConfig, - lossConfig CongestionSignalConfig, + qdJQRConfig CongestionSignalConfig, + qdDQRConfig CongestionSignalConfig, + lossJQRConfig CongestionSignalConfig, + lossDQRConfig CongestionSignalConfig, ) { - qdMeasurement := newQdMeasurement( - qdConfig, + c.qdMeasurement = newQDMeasurement( + qdJQRConfig, + qdDQRConfig, c.params.Config.JQRMinDelay.Microseconds(), c.params.Config.DQRMaxDelay.Microseconds(), ) - lossMeasurement := newLossMeasurement( - lossConfig, + c.lossMeasurement = newLossMeasurement( + lossJQRConfig, + lossDQRConfig, c.params.Config.WeightedLoss, c.params.Config.JQRMinWeightedLoss, c.params.Config.DQRMaxWeightedLoss, @@ -808,28 +879,23 @@ func (c *congestionDetector) updateCongestionSignal( var idx int for idx = len(c.packetGroups) - 1; idx >= 0; idx-- { pg := c.packetGroups[idx] - qdMeasurement.ProcessPacketGroup(pg, idx) - lossMeasurement.ProcessPacketGroup(pg, idx) + c.qdMeasurement.ProcessPacketGroup(pg, idx) + c.lossMeasurement.ProcessPacketGroup(pg, idx) // if both measurements have enough data to make a decision, stop processing groups - if qdMeasurement.IsSealed() && lossMeasurement.IsSealed() { + if c.qdMeasurement.IsSealed() && c.lossMeasurement.IsSealed() { break } } c.congestionReason = congestionReasonNone - c.queuingRegion = qdMeasurement.QueuingRegion() + c.queuingRegion = c.qdMeasurement.QueuingRegion() if c.queuingRegion == queuingRegionJQR { c.congestionReason = congestionReasonQueuingDelay - c.jqrQDMeasurement = qdMeasurement } else { - c.jqrQDMeasurement = nil - c.queuingRegion = lossMeasurement.QueuingRegion() + c.queuingRegion = c.lossMeasurement.QueuingRegion() if c.queuingRegion == queuingRegionJQR { c.congestionReason = congestionReasonLoss - c.jqrLossMeasurement = lossMeasurement - } else { - c.jqrLossMeasurement = nil } } } @@ -837,16 +903,20 @@ func (c *congestionDetector) updateCongestionSignal( func (c *congestionDetector) updateEarlyWarningSignal() { c.updateCongestionSignal( "early-warning", - c.params.Config.QueuingDelayEarlyWarning, - c.params.Config.LossEarlyWarning, + c.params.Config.QueuingDelayEarlyWarningJQR, + c.params.Config.QueuingDelayEarlyWarningDQR, + c.params.Config.LossEarlyWarningJQR, + c.params.Config.LossEarlyWarningDQR, ) } func (c *congestionDetector) updateCongestedSignal() { c.updateCongestionSignal( "congested", - c.params.Config.QueuingDelayCongested, - c.params.Config.LossCongested, + c.params.Config.QueuingDelayCongestedJQR, + c.params.Config.QueuingDelayCongestedDQR, + c.params.Config.LossCongestedJQR, + c.params.Config.LossCongestedDQR, ) } @@ -868,29 +938,13 @@ func (c *congestionDetector) congestionDetectionStateMachine() (bool, bwe.Conges } else { c.updateEarlyWarningSignal() if c.queuingRegion == queuingRegionDQR { - toState = bwe.CongestionStateEarlyWarningHangover + toState = bwe.CongestionStateNone } } - case bwe.CongestionStateEarlyWarningHangover: - c.updateEarlyWarningSignal() - if c.queuingRegion == queuingRegionJQR { - toState = bwe.CongestionStateEarlyWarning - } else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.EarlyWarningHangover { - toState = bwe.CongestionStateNone - } - case bwe.CongestionStateCongested: c.updateCongestedSignal() if c.queuingRegion == queuingRegionDQR { - toState = bwe.CongestionStateCongestedHangover - } - - case bwe.CongestionStateCongestedHangover: - c.updateEarlyWarningSignal() - if c.queuingRegion == queuingRegionJQR { - toState = bwe.CongestionStateEarlyWarning - } else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.CongestedHangover { toState = bwe.CongestionStateNone } } @@ -1001,10 +1055,14 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() { useWindow := false isAggValid := true minGroupIdx := 0 - if c.jqrQDMeasurement != nil { - minGroupIdx, _ = c.jqrQDMeasurement.GroupRange() - } else if c.jqrLossMeasurement != nil { - minGroupIdx, _ = c.jqrLossMeasurement.GroupRange() + maxGroupIdx := len(c.packetGroups) - 1 + if c.queuingRegion == queuingRegionJQR { + switch c.congestionReason { + case congestionReasonQueuingDelay: + minGroupIdx, maxGroupIdx = c.qdMeasurement.GroupRange() + case congestionReasonLoss: + minGroupIdx, maxGroupIdx = c.lossMeasurement.GroupRange() + } } else { useWindow = true isAggValid = false @@ -1014,7 +1072,7 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() { Config: c.params.Config.WeightedLoss, Logger: c.params.Logger, }) - for idx := len(c.packetGroups) - 1; idx >= minGroupIdx; idx-- { + for idx := maxGroupIdx; idx >= minGroupIdx; idx-- { pg := c.packetGroups[idx] if !pg.IsFinalized() { continue @@ -1039,21 +1097,22 @@ func (c *congestionDetector) updateCongestionState(state bwe.CongestionState) (b "to", state, "queuingRegion", c.queuingRegion, "congestionReason", c.congestionReason, + "qdMeasurement", c.qdMeasurement, + "lossMeasurement", c.lossMeasurement, "numPacketGroups", len(c.packetGroups), "estimatedAvailableChannelCapacity", c.estimatedAvailableChannelCapacity, "estimateTrafficStats", c.estimateTrafficStats, } if c.queuingRegion == queuingRegionJQR { var minGroupIdx, maxGroupIdx int - if c.jqrQDMeasurement != nil { - minGroupIdx, maxGroupIdx = c.jqrQDMeasurement.GroupRange() - } else if c.jqrLossMeasurement != nil { - minGroupIdx, maxGroupIdx = c.jqrLossMeasurement.GroupRange() + switch c.congestionReason { + case congestionReasonQueuingDelay: + minGroupIdx, maxGroupIdx = c.qdMeasurement.GroupRange() + case congestionReasonLoss: + minGroupIdx, maxGroupIdx = c.lossMeasurement.GroupRange() } loggingFields = append( loggingFields, - "jqrQDMeasurement", c.jqrQDMeasurement, - "jqrLossMeasurement", c.jqrLossMeasurement, "contributingGroups", logger.ObjectSlice(c.packetGroups[minGroupIdx:maxGroupIdx+1]), ) } diff --git a/pkg/sfu/bwe/sendsidebwe/traffic_stats.go b/pkg/sfu/bwe/sendsidebwe/traffic_stats.go index 9158b22d6..480878ac6 100644 --- a/pkg/sfu/bwe/sendsidebwe/traffic_stats.go +++ b/pkg/sfu/bwe/sendsidebwe/traffic_stats.go @@ -26,14 +26,16 @@ import ( type WeightedLossConfig struct { MinDurationForLossValidity time.Duration `yaml:"min_duration_for_loss_validity,omitempty"` - MinPPSForLossValidity int `yaml:"min_pps_for_loss_validity,omitempty"` + BaseDuration time.Duration `yaml:"base_duration,omitempty"` + BasePPS int `yaml:"base_pps,omitempty"` LossPenaltyFactor float64 `yaml:"loss_penalty_factor,omitempty"` } var ( defaultWeightedLossConfig = WeightedLossConfig{ - MinDurationForLossValidity: 250 * time.Millisecond, - MinPPSForLossValidity: 30, + MinDurationForLossValidity: 100 * time.Millisecond, + BaseDuration: 500 * time.Millisecond, + BasePPS: 30, LossPenaltyFactor: 0.25, } ) @@ -119,7 +121,14 @@ func (ts *trafficStats) WeightedLoss() float64 { totalPackets := float64(ts.lostPackets + ts.ackedPackets) pps := totalPackets * 1e6 / float64(durationMicro) - if int(pps) < ts.params.Config.MinPPSForLossValidity { + + // longer duration, i. e. more time resolution, lower pps is acceptable as the measurement is more stable + deltaDuration := time.Duration(durationMicro*1000) - ts.params.Config.BaseDuration + if deltaDuration < 0 { + deltaDuration = 0 + } + threshold := math.Exp(-deltaDuration.Seconds()) * float64(ts.params.Config.BasePPS) + if pps < threshold { return 0.0 } diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 2abd9f211..62d922877 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -727,17 +727,6 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) { } } - // try up allocations in case there is available headroom, - // it is possible that a previous up allocation is waiting to settle, - // so even if there was headroom available while doing previous up allocation - // it may not have used up all available headroom, - // check before probing again as this could use available headroom and - // up allocate all tracks to their desired layers, that would avoid - // an unnecessary probe cluster - if s.state == streamAllocatorStateDeficient { - s.maybeBoostDeficientTracks() - } - // probe if necessary and timing is right if s.state == streamAllocatorStateDeficient { s.maybeProbe() @@ -1439,11 +1428,11 @@ func updateStreamStateChange(track *Track, allocation sfu.VideoAllocation, updat } func isHoldableCongestionState(bweCongestionState bwe.CongestionState) bool { - return bweCongestionState == bwe.CongestionStateEarlyWarning || bweCongestionState == bwe.CongestionStateEarlyWarningHangover + return bweCongestionState == bwe.CongestionStateEarlyWarning } func isDeficientCongestionState(bweCongestionState bwe.CongestionState) bool { - return bweCongestionState == bwe.CongestionStateCongested || bweCongestionState == bwe.CongestionStateCongestedHangover + return bweCongestionState == bwe.CongestionStateCongested } // ------------------------------------------------