diff --git a/pkg/sfu/sendsidebwe/congestion_detector.go b/pkg/sfu/sendsidebwe/congestion_detector.go index d82f2b9d6..a0440be2a 100644 --- a/pkg/sfu/sendsidebwe/congestion_detector.go +++ b/pkg/sfu/sendsidebwe/congestion_detector.go @@ -20,19 +20,63 @@ type CongestionSignalConfig struct { } var ( - DefaultEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ + DefaultQueuingDelayEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ MinNumberOfGroups: 1, MinDuration: 100 * time.Millisecond, } - DefaultCongestedCongestionSignalConfig = CongestionSignalConfig{ + DefaultLossEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ + MinNumberOfGroups: 2, + MinDuration: 200 * time.Millisecond, + } + + DefaultQueuingDelayCongestedCongestionSignalConfig = CongestionSignalConfig{ MinNumberOfGroups: 3, MinDuration: 300 * time.Millisecond, } + + DefaultLossCongestedCongestionSignalConfig = CongestionSignalConfig{ + MinNumberOfGroups: 5, + MinDuration: 600 * time.Millisecond, + } ) // ------------------------------------------------------------------------------- +type congestionSignalCalculator[T int64 | float64] struct { + thresholdMin T + thresholdMax T + isSealed bool + numGroups int + duration int64 +} + +func (c *congestionSignalCalculator[T]) processSample(val T, duration int64) { + if c.isSealed { + return + } + + if val < c.thresholdMin { + // any DQR group breaks the continuity + c.isSealed = true + return + } + + // INDETERMINATE group is treated as a no-op + + // JQR group builds up congestion signal + if val > c.thresholdMax { + c.numGroups++ + c.duration += duration + } +} + +func (c *congestionSignalCalculator[T]) isTriggered(config CongestionSignalConfig) bool { + return c.numGroups >= config.MinNumberOfGroups && c.duration >= config.MinDuration.Microseconds() +} + +// ------------------------------------------------------------------------------- + type CongestionDetectorConfig struct { PacketGroup PacketGroupConfig `yaml:"packet_group,omitempty"` PacketGroupMaxAge time.Duration `yaml:"packet_group_max_age,omitempty"` @@ -40,10 +84,16 @@ type CongestionDetectorConfig struct { JQRMinDelay time.Duration `yaml:"jqr_min_delay,omitempty"` DQRMaxDelay time.Duration `yaml:"dqr_max_delay,omitempty"` - EarlyWarning CongestionSignalConfig `yaml:"early_warning,omitempty"` - EarlyWarningHangover time.Duration `yaml:"early_warning_hangover,omitempty"` - Congested CongestionSignalConfig `yaml:"congested,omitempty"` - CongestedHangover time.Duration `yaml:"congested_hangover,omitempty"` + JQRMinLoss float64 `yaml:"jqr_min_loss,omitempty"` + DQRMaxLoss float64 `yaml:"dqr_max_loss,omitempty"` + + QueuingDelayEarlyWarning CongestionSignalConfig `yaml:"queuing_delay_early_warning,omitempty"` + LossEarlyWarning CongestionSignalConfig `yaml:"loss_early_warning,omitempty"` + EarlyWarningHangover time.Duration `yaml:"early_warning_hangover,omitempty"` + + QueuingDelayCongested CongestionSignalConfig `yaml:"queuing_delay_congested,omitempty"` + LossCongested CongestionSignalConfig `yaml:"loss_congested,omitempty"` + CongestedHangover time.Duration `yaml:"congested_hangover,omitempty"` RateMeasurementWindowFullnessMin float64 `yaml:"rate_measurement_window_fullness_min,omitempty"` RateMeasurementWindowDurationMin time.Duration `yaml:"rate_measurement_window_duration_min,omitempty"` @@ -70,9 +120,13 @@ var ( PacketGroupMaxAge: 30 * time.Second, JQRMinDelay: 15 * time.Millisecond, DQRMaxDelay: 5 * time.Millisecond, - EarlyWarning: DefaultEarlyWarningCongestionSignalConfig, + JQRMinLoss: 0.15, + DQRMaxLoss: 0.05, + QueuingDelayEarlyWarning: DefaultQueuingDelayEarlyWarningCongestionSignalConfig, + LossEarlyWarning: DefaultLossEarlyWarningCongestionSignalConfig, EarlyWarningHangover: 500 * time.Millisecond, - Congested: DefaultCongestedCongestionSignalConfig, + QueuingDelayCongested: DefaultQueuingDelayCongestedCongestionSignalConfig, + LossCongested: DefaultLossCongestedCongestionSignalConfig, CongestedHangover: 3 * time.Second, RateMeasurementWindowFullnessMin: 0.8, RateMeasurementWindowDurationMin: 800 * time.Millisecond, @@ -151,12 +205,14 @@ func (c *congestionDetector) GetCongestionState() CongestionState { return c.congestionState } -func (c *congestionDetector) updateCongestionState(state CongestionState) { +func (c *congestionDetector) updateCongestionState(state CongestionState, reason string, oldestContributingGroup int) { c.lock.Lock() c.params.Logger.Infow( "congestion state change", "from", c.congestionState, "to", state, + "reason", reason, + "contributingGroups", logger.ObjectSlice(c.packetGroups[oldestContributingGroup:]), "estimatedAvailableChannelCapacity", c.estimatedAvailableChannelCapacity, ) @@ -221,51 +277,74 @@ func (c *congestionDetector) prunePacketGroups() { } } -func (c *congestionDetector) isCongestionSignalTriggered() (bool, bool) { +func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool, string, int) { earlyWarningTriggered := false - congestedTriggered := false + earlyWarningReason := "" - numGroups := 0 - duration := int64(0) - for idx := len(c.packetGroups) - 1; idx >= 0; idx-- { + congestedTriggered := false + congestedReason := "" + + qd := &congestionSignalCalculator[int64]{ + thresholdMin: c.params.Config.JQRMinDelay.Microseconds(), + thresholdMax: c.params.Config.DQRMaxDelay.Microseconds(), + } + loss := &congestionSignalCalculator[float64]{ + thresholdMin: c.params.Config.JQRMinLoss, + thresholdMax: c.params.Config.DQRMaxLoss, + } + + var idx int + for idx = len(c.packetGroups) - 1; idx >= 0; idx-- { pg := c.packetGroups[idx] - pqd, ok := pg.PropagatedQueuingDelay() - if !ok { + pqd, pqdOk := pg.PropagatedQueuingDelay() + lr, lrOk := pg.LossRatio() + if !pqdOk && !lrOk { continue } - if pqd > c.params.Config.JQRMinDelay.Microseconds() { - // JQR group builds up congestion signal - numGroups++ - duration += pg.SendDuration() + // `queueing delay` and `loss` based congestion signals are determined independently, + // i. e. one packet group triggering `queueing delay` and another group triggering + // `loss` will not combine to trigger the aggregate congestion signal + sendDuration := pg.SendDuration() + if pqdOk { + qd.processSample(pqd, sendDuration) + } + if lrOk { + loss.processSample(lr, sendDuration) } - // INDETERMINATE group is treated as a no-op - - if pqd < c.params.Config.DQRMaxDelay.Microseconds() { - // any DQR group breaks the continuity - return earlyWarningTriggered, congestedTriggered - } - - if numGroups >= c.params.Config.EarlyWarning.MinNumberOfGroups && duration >= c.params.Config.EarlyWarning.MinDuration.Microseconds() { + if !earlyWarningTriggered && qd.isTriggered(c.params.Config.QueuingDelayEarlyWarning) { earlyWarningTriggered = true + earlyWarningReason = "queuing-delay" } - if numGroups >= c.params.Config.Congested.MinNumberOfGroups && duration >= c.params.Config.Congested.MinDuration.Microseconds() { + if !earlyWarningTriggered && loss.isTriggered(c.params.Config.LossEarlyWarning) { + earlyWarningTriggered = true + earlyWarningReason = "loss" + } + + if !congestedTriggered && qd.isTriggered(c.params.Config.QueuingDelayCongested) { congestedTriggered = true + congestedReason = "queuing-delay" } + if !congestedTriggered && loss.isTriggered(c.params.Config.LossCongested) { + congestedTriggered = true + congestedReason = "loss" + } + if earlyWarningTriggered && congestedTriggered { break } } - return earlyWarningTriggered, congestedTriggered + return earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, idx } func (c *congestionDetector) congestionDetectionStateMachine() { state := c.GetCongestionState() newState := state + reason := "" - earlyWarningTriggered, congestedTriggered := c.isCongestionSignalTriggered() + earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, oldestContributingGroup := c.isCongestionSignalTriggered() switch state { case CongestionStateNone: @@ -274,11 +353,13 @@ func (c *congestionDetector) congestionDetectionStateMachine() { } if earlyWarningTriggered { newState = CongestionStateEarlyWarning + reason = earlyWarningReason } case CongestionStateEarlyWarning: if congestedTriggered { newState = CongestionStateCongested + reason = congestedReason } else if !earlyWarningTriggered { newState = CongestionStateEarlyWarningHangover } @@ -289,6 +370,7 @@ func (c *congestionDetector) congestionDetectionStateMachine() { } if earlyWarningTriggered { newState = CongestionStateEarlyWarning + reason = earlyWarningReason } else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.EarlyWarningHangover { newState = CongestionStateNone } @@ -304,6 +386,7 @@ func (c *congestionDetector) congestionDetectionStateMachine() { } if earlyWarningTriggered { newState = CongestionStateEarlyWarning + reason = earlyWarningReason } else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.CongestedHangover { newState = CongestionStateNone } @@ -314,7 +397,7 @@ func (c *congestionDetector) congestionDetectionStateMachine() { // update after running the above estimate as state change callback includes the estimated available channel capacity if newState != state { c.congestionStateSwitchedAt = mono.Now() - c.updateCongestionState(newState) + c.updateCongestionState(newState, reason, oldestContributingGroup) } } diff --git a/pkg/sfu/sendsidebwe/packet_group.go b/pkg/sfu/sendsidebwe/packet_group.go index 636eef189..dcee61017 100644 --- a/pkg/sfu/sendsidebwe/packet_group.go +++ b/pkg/sfu/sendsidebwe/packet_group.go @@ -261,6 +261,22 @@ func (p *packetGroup) Traffic() (int64, int64, int, float64) { return p.minSendTime, p.maxSendTime - p.minSendTime, numBytes, fullness } +func (p *packetGroup) LossRatio() (float64, bool) { + if !p.isFinalized { + return 0.0, false + } + + return p.lossRatio() +} + +func (p *packetGroup) lossRatio() (float64, bool) { + lostPackets := p.lost.numPackets() + totalPackets := float64(lostPackets + p.acked.numPackets()) + lossRatio := float64(lostPackets) / totalPackets + // indeterminate if not enough packets, the second return value will be false if indeterminate + return lossRatio, totalPackets >= float64(p.params.Config.MinPackets)*p.params.Config.LossPenaltyMinPacketsRatio +} + func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error { if p == nil { return nil @@ -276,6 +292,8 @@ func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error { recvDuration := time.Duration((p.maxRecvTime - p.minRecvTime) * 1000) e.AddDuration("recvDuration", recvDuration) + e.AddUint64("minSequenceNumber", p.minSequenceNumber) + e.AddUint64("maxSequenceNumber", p.maxSequenceNumber) e.AddObject("acked", p.acked) e.AddObject("lost", p.lost) @@ -295,8 +313,13 @@ func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddInt64("aggregateRecvDelta", p.aggregateRecvDelta) e.AddInt64("queuingDelay", p.queuingDelay) e.AddInt64("groupDelay", p.aggregateRecvDelta-p.aggregateSendDelta) - e.AddFloat64("lossRatio", float64(p.lost.numPackets())/float64(p.acked.numPackets()+p.lost.numPackets())) + + lossRatio, lossRatioValid := p.lossRatio() + e.AddFloat64("lossRatio", lossRatio) + e.AddBool("lossRatioValid", lossRatioValid) + e.AddInt64("lossPenalty", p.getLossPenalty()) + capturedTrafficRatio := p.CapturedTrafficRatio() e.AddFloat64("capturedTrafficRatio", capturedTrafficRatio) e.AddFloat64("estimatedAvailableChannelCapacity", sendBitrate*capturedTrafficRatio) @@ -318,21 +341,16 @@ func (p *packetGroup) inGroup(sequenceNumber uint64) error { } func (p *packetGroup) getLossPenalty() int64 { - if p.aggregateRecvDelta == 0 { - return 0 - } - - lostPackets := p.lost.numPackets() - totalPackets := float64(lostPackets + p.acked.numPackets()) - if totalPackets < float64(p.params.Config.MinPackets)*p.params.Config.LossPenaltyMinPacketsRatio { - return 0 - } - // Log10 is used to give higher weight for the same loss ratio at higher packet rates, // for e.g. with a penalty factor of 0.25 // - 10% loss at 20 total packets = 0.1 * log10(20) * 0.25 = 0.032 // - 10% loss at 100 total packets = 0.1 * log10(100) * 0.25 = 0.05 // - 10% loss at 1000 total packets = 0.1 * log10(100) * 0.25 = 0.075 - lossRatio := float64(lostPackets) / totalPackets - return int64(float64(p.aggregateRecvDelta) * lossRatio * math.Log10(totalPackets) * p.params.Config.LossPenaltyFactor) + lossRatio, _ := p.lossRatio() + return int64( + float64(p.aggregateRecvDelta) * + lossRatio * + math.Log10(float64(p.acked.numPackets()+p.lost.numPackets())) * + p.params.Config.LossPenaltyFactor, + ) }