diff --git a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go
index 36c00b863..c4f49ee8f 100644
--- a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go
+++ b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go
@@ -135,6 +135,31 @@ func (q queuingRegion) String() string {
 
 // -------------------------------------------------------------------------------
 
+// congestionReason identifies which measurement, if any, reported JQR in
+// the most recent congestion signal evaluation.
+type congestionReason int
+
+const (
+	congestionReasonNone congestionReason = iota
+	congestionReasonQueuingDelay
+	congestionReasonLoss
+)
+
+// String returns the reason name, or the numeric value for unknown reasons.
+func (c congestionReason) String() string {
+	switch c {
+	case congestionReasonNone:
+		return "NONE"
+	case congestionReasonQueuingDelay:
+		return "QUEUING_DELAY"
+	case congestionReasonLoss:
+		return "LOSS"
+	default:
+		return fmt.Sprintf("%d", int(c))
+	}
+}
+
+// -------------------------------------------------------------------------------
 type qdMeasurement struct {
 	config CongestionSignalConfig
 	jqrMin int64
@@ -203,8 +228,15 @@ func (q *qdMeasurement) IsSealed() bool {
 	return q.isSealed
 }
 
-func (q *qdMeasurement) Result() (queuingRegion, int, int) {
-	return q.queuingRegion, q.minGroupIdx, q.maxGroupIdx
+// QueuingRegion returns the queuing region recorded by the measurement.
+func (q *qdMeasurement) QueuingRegion() queuingRegion {
+	return q.queuingRegion
+}
+
+// GroupRange returns the min and max group indices recorded by the
+// measurement, each clamped to a minimum of 0.
+func (q *qdMeasurement) GroupRange() (int, int) {
+	return max(0, q.minGroupIdx), max(0, q.maxGroupIdx)
 }
 
 func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
@@ -292,8 +324,15 @@ func (l *lossMeasurement) IsSealed() bool {
 	return l.isSealed
 }
 
-func (l *lossMeasurement) Result() (queuingRegion, int, int) {
-	return l.queuingRegion, l.minGroupIdx, l.maxGroupIdx
+// QueuingRegion returns the queuing region recorded by the measurement.
+func (l *lossMeasurement) QueuingRegion() queuingRegion {
+	return l.queuingRegion
+}
+
+// GroupRange returns the min and max group indices recorded by the
+// measurement, each clamped to a minimum of 0.
+func (l *lossMeasurement) GroupRange() (int, int) {
+	return max(0, l.minGroupIdx), max(0, l.maxGroupIdx)
 }
 
 func (l *lossMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
@@ -339,6 +378,10 @@ type CongestionDetectorConfig struct {
 	CongestedCTRTrend    ccutils.TrendDetectorConfig `yaml:"congested_ctr_trend,omitempty"`
 	CongestedCTREpsilon  float64                     `yaml:"congested_ctr_epsilon,omitempty"`
 	CongestedPacketGroup PacketGroupConfig           `yaml:"congested_packet_group,omitempty"`
+
+	// EstimationWindowDuration is the span of finalized packet groups that must
+	// be aggregated before the capacity estimate is updated when not in JQR.
+	EstimationWindowDuration time.Duration `yaml:"estimation_window_duration,omitempty"`
 }
 
 var (
@@ -382,6 +425,8 @@ var (
 		CongestedCTRTrend:    defaultTrendDetectorConfigCongestedCTR,
 		CongestedCTREpsilon:  0.05,
 		CongestedPacketGroup: defaultCongestedPacketGroupConfig,
+
+		EstimationWindowDuration: time.Second,
 	}
 )
 
@@ -408,13 +453,20 @@ type congestionDetector struct {
 	probeRegulator   *ccutils.ProbeRegulator
 
 	estimatedAvailableChannelCapacity int64
-	congestionState                   bwe.CongestionState
-	congestionStateSwitchedAt         time.Time
+	estimateTrafficStats              *trafficStats
+
+	congestionState           bwe.CongestionState
+	congestionStateSwitchedAt time.Time
 
 	congestedCTRTrend     *ccutils.TrendDetector[float64]
 	congestedTrafficStats *trafficStats
 	congestedPacketGroup  *packetGroup
 
+	queuingRegion      queuingRegion
+	congestionReason   congestionReason
+	jqrQDMeasurement   *qdMeasurement
+	jqrLossMeasurement *lossMeasurement
+
 	bweListener bwe.BWEListener
 }
 
@@ -434,16 +486,27 @@ func (c *congestionDetector) Reset() {
 	defer c.lock.Unlock()
 
 	c.rtt = bwe.DefaultRTT
+
 	c.packetGroups = nil
+
 	c.probePacketGroup = nil
 	c.probeRegulator = ccutils.NewProbeRegulator(ccutils.ProbeRegulatorParams{
 		Config: c.params.Config.ProbeRegulator,
 		Logger: c.params.Logger,
 	})
+
 	c.estimatedAvailableChannelCapacity = 100_000_000
+	c.estimateTrafficStats = nil
+
 	c.congestionState = bwe.CongestionStateNone
 	c.congestionStateSwitchedAt = mono.Now()
+
 	c.clearCTRTrend()
+
+	c.queuingRegion = queuingRegionIndeterminate
+	c.congestionReason = congestionReasonNone
+	c.jqrQDMeasurement = nil
+	c.jqrLossMeasurement = nil
 }
 
 func (c *congestionDetector) SetBWEListener(bweListener bwe.BWEListener) {
@@ -735,11 +798,14 @@ func (c *congestionDetector) prunePacketGroups() {
 	}
 }
 
-func (c *congestionDetector) getCongestionSignal(
+// updateCongestionSignal stores the outcome of the queuing delay and loss
+// measurements on the detector: the queuing region, the congestion reason
+// and the measurement that reported JQR (if any).
+func (c *congestionDetector) updateCongestionSignal(
 	stage string,
 	qdConfig CongestionSignalConfig,
 	lossConfig CongestionSignalConfig,
-) (queuingRegion, string, int, int) {
+) {
 	qdMeasurement := newQdMeasurement(
 		qdConfig,
 		c.params.Config.JQRMinDelay.Microseconds(),
@@ -765,30 +831,35 @@ func (c *congestionDetector) getCongestionSignal(
 		}
 	}
 
-	reason := ""
-	qr, minGroupIdx, maxGroupIdx := qdMeasurement.Result()
-	if qr == queuingRegionJQR {
-		reason = "queuing-delay"
+	// start from a clean slate so that a measurement cached by an earlier
+	// evaluation is never reported alongside the current signal
+	c.congestionReason = congestionReasonNone
+	c.jqrQDMeasurement = nil
+	c.jqrLossMeasurement = nil
+
+	c.queuingRegion = qdMeasurement.QueuingRegion()
+	if c.queuingRegion == queuingRegionJQR {
+		c.congestionReason = congestionReasonQueuingDelay
+		c.jqrQDMeasurement = qdMeasurement
 	} else {
-		qr, minGroupIdx, maxGroupIdx = lossMeasurement.Result()
-		if qr == queuingRegionJQR {
-			reason = "loss"
+		c.queuingRegion = lossMeasurement.QueuingRegion()
+		if c.queuingRegion == queuingRegionJQR {
+			c.congestionReason = congestionReasonLoss
+			c.jqrLossMeasurement = lossMeasurement
 		}
 	}
-
-	return qr, reason, max(0, minGroupIdx), max(0, maxGroupIdx)
 }
 
-func (c *congestionDetector) getEarlyWarningSignal() (queuingRegion, string, int, int) {
-	return c.getCongestionSignal(
+func (c *congestionDetector) updateEarlyWarningSignal() {
+	c.updateCongestionSignal(
 		"early-warning",
 		c.params.Config.QueuingDelayEarlyWarning,
 		c.params.Config.LossEarlyWarning,
 	)
 }
 
-func (c *congestionDetector) getCongestedSignal() (queuingRegion, string, int, int) {
-	return c.getCongestionSignal(
+func (c *congestionDetector) updateCongestedSignal() {
+	c.updateCongestionSignal(
 		"congested",
 		c.params.Config.QueuingDelayCongested,
 		c.params.Config.LossCongested,
@@ -799,58 +870,53 @@ func (c *congestionDetector) congestionDetectionStateMachine() (bool, bwe.Conges
 	fromState := c.congestionState
 	toState := c.congestionState
 
-	var (
-		qr                       queuingRegion
-		reason                   string
-		minGroupIdx, maxGroupIdx int
-	)
 	switch fromState {
 	case bwe.CongestionStateNone:
-		qr, reason, minGroupIdx, maxGroupIdx = c.getEarlyWarningSignal()
-		if qr == queuingRegionJQR {
+		c.updateEarlyWarningSignal()
+		if c.queuingRegion == queuingRegionJQR {
 			toState = bwe.CongestionStateEarlyWarning
 		}
 
 	case bwe.CongestionStateEarlyWarning:
-		qr, reason, minGroupIdx, maxGroupIdx = c.getCongestedSignal()
-		if qr == queuingRegionJQR {
+		c.updateCongestedSignal()
+		if c.queuingRegion == queuingRegionJQR {
 			toState = bwe.CongestionStateCongested
 		} else {
-			qr, reason, minGroupIdx, maxGroupIdx = c.getEarlyWarningSignal()
-			if qr == queuingRegionDQR {
+			c.updateEarlyWarningSignal()
+			if c.queuingRegion == queuingRegionDQR {
 				toState = bwe.CongestionStateEarlyWarningHangover
 			}
 		}
 
 	case bwe.CongestionStateEarlyWarningHangover:
-		qr, reason, minGroupIdx, maxGroupIdx = c.getEarlyWarningSignal()
-		if qr == queuingRegionJQR {
+		c.updateEarlyWarningSignal()
+		if c.queuingRegion == queuingRegionJQR {
 			toState = bwe.CongestionStateEarlyWarning
 		} else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.EarlyWarningHangover {
 			toState = bwe.CongestionStateNone
 		}
 
 	case bwe.CongestionStateCongested:
-		qr, reason, minGroupIdx, maxGroupIdx = c.getCongestedSignal()
-		if qr == queuingRegionDQR {
+		c.updateCongestedSignal()
+		if c.queuingRegion == queuingRegionDQR {
 			toState = bwe.CongestionStateCongestedHangover
 		}
 
 	case bwe.CongestionStateCongestedHangover:
-		qr, reason, minGroupIdx, maxGroupIdx = c.getEarlyWarningSignal()
-		if qr == queuingRegionJQR {
+		c.updateEarlyWarningSignal()
+		if c.queuingRegion == queuingRegionJQR {
 			toState = bwe.CongestionStateEarlyWarning
 		} else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.CongestedHangover {
 			toState = bwe.CongestionStateNone
 		}
 	}
 
-	c.estimateAvailableChannelCapacity(minGroupIdx)
+	c.estimateAvailableChannelCapacity()
 
 	// update after running the above estimate as state change callback includes the estimated available channel capacity
 	shouldNotify := false
 	if toState != fromState {
-		fromState, toState = c.updateCongestionState(toState, reason, minGroupIdx, maxGroupIdx)
+		fromState, toState = c.updateCongestionState(toState)
 		shouldNotify = true
 	}
 
@@ -941,39 +1007,80 @@ func (c *congestionDetector) updateCTRTrend(pi *packetInfo, sendDelta, recvDelta
 	}
 }
 
-func (c *congestionDetector) estimateAvailableChannelCapacity(oldestContributingGroup int) {
+// estimateAvailableChannelCapacity refreshes the channel capacity estimate
+// from the acknowledged bitrate aggregated over finalized packet groups.
+func (c *congestionDetector) estimateAvailableChannelCapacity() {
 	if len(c.packetGroups) == 0 || c.congestedCTRTrend != nil || c.probePacketGroup != nil {
 		return
 	}
 
+	// when in JQR, use contributing groups,
+	// else use a time windowed measurement
+	useWindow := false
+	isAggValid := true
+	minGroupIdx := 0
+	if c.jqrQDMeasurement != nil {
+		minGroupIdx, _ = c.jqrQDMeasurement.GroupRange()
+	} else if c.jqrLossMeasurement != nil {
+		minGroupIdx, _ = c.jqrLossMeasurement.GroupRange()
+	} else {
+		useWindow = true
+		isAggValid = false
+	}
+
 	agg := newTrafficStats(trafficStatsParams{
 		Config: c.params.Config.WeightedLoss,
 		Logger: c.params.Logger,
 	})
-	for idx := len(c.packetGroups) - 1; idx >= oldestContributingGroup; idx-- {
+	for idx := len(c.packetGroups) - 1; idx >= minGroupIdx; idx-- {
 		pg := c.packetGroups[idx]
 		if !pg.IsFinalized() {
 			continue
 		}
 
 		agg.Merge(pg.Traffic())
+		if useWindow && agg.Duration() > c.params.Config.EstimationWindowDuration.Microseconds() {
+			isAggValid = true
+			break
+		}
 	}
 
-	c.estimatedAvailableChannelCapacity = agg.AcknowledgedBitrate()
+	if isAggValid {
+		c.estimatedAvailableChannelCapacity = agg.AcknowledgedBitrate()
+		c.estimateTrafficStats = agg
+	}
 }
 
-func (c *congestionDetector) updateCongestionState(state bwe.CongestionState, reason string, minGroupIdx int, maxGroupIdx int) (bwe.CongestionState, bwe.CongestionState) {
+func (c *congestionDetector) updateCongestionState(state bwe.CongestionState) (bwe.CongestionState, bwe.CongestionState) {
 	loggingFields := []any{
 		"from", c.congestionState,
 		"to", state,
-		"reason", reason,
+		"queuingRegion", c.queuingRegion,
+		"congestionReason", c.congestionReason,
 		"numPacketGroups", len(c.packetGroups),
-		"minGroupIdx", minGroupIdx,
-		"maxGroupIdx", maxGroupIdx,
 		"estimatedAvailableChannelCapacity", c.estimatedAvailableChannelCapacity,
+		"estimateTrafficStats", c.estimateTrafficStats,
 	}
-	if state == bwe.CongestionStateEarlyWarning || state == bwe.CongestionStateCongested {
-		loggingFields = append(loggingFields, "contributingGroups", logger.ObjectSlice(c.packetGroups[minGroupIdx:maxGroupIdx+1]))
+	if c.queuingRegion == queuingRegionJQR {
+		var minGroupIdx, maxGroupIdx int
+		if c.jqrQDMeasurement != nil {
+			minGroupIdx, maxGroupIdx = c.jqrQDMeasurement.GroupRange()
+		} else if c.jqrLossMeasurement != nil {
+			minGroupIdx, maxGroupIdx = c.jqrLossMeasurement.GroupRange()
+		}
+		loggingFields = append(
+			loggingFields,
+			"jqrQDMeasurement", c.jqrQDMeasurement,
+			"jqrLossMeasurement", c.jqrLossMeasurement,
+		)
+		// logging must not panic, include the contributing groups only
+		// when the range lies inside the current packet groups
+		if minGroupIdx <= maxGroupIdx && maxGroupIdx < len(c.packetGroups) {
+			loggingFields = append(
+				loggingFields,
+				"contributingGroups", logger.ObjectSlice(c.packetGroups[minGroupIdx:maxGroupIdx+1]),
+			)
+		}
 	}
 
 	c.params.Logger.Infow("send side bwe: congestion state change", loggingFields...)