From 57b3dfdcf41533d4ced4860a464f7e968d65373e Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 11 Nov 2024 13:27:49 +0530 Subject: [PATCH] Loss based congestion signal detector. (#3168) * Loss based congestion signal detector. It uses the same approach of thresholding + duration to detect region of operation and further derive early warning/congested states. A gutter is used for indeterminate region just like the queuing delay based case. The two approaches (queuing delay and loss) are treated independently, i.e. packet groups have to satisfy the same type of condition (queuing delay OR loss) to build up congestion. The aggregate congestion signal is triggered if either one triggers. Maybe, there is a way to accept hybrid signalling (i.e. each group satisfying either threshold adds up to congestion signal detection), but needs more experimentation. For now, keeping them separate. * apply max threshold * clean up * spelling --- pkg/sfu/sendsidebwe/congestion_detector.go | 147 ++++++++++++++++----- pkg/sfu/sendsidebwe/packet_group.go | 44 ++++-- 2 files changed, 146 insertions(+), 45 deletions(-) diff --git a/pkg/sfu/sendsidebwe/congestion_detector.go b/pkg/sfu/sendsidebwe/congestion_detector.go index d82f2b9d6..a0440be2a 100644 --- a/pkg/sfu/sendsidebwe/congestion_detector.go +++ b/pkg/sfu/sendsidebwe/congestion_detector.go @@ -20,19 +20,63 @@ type CongestionSignalConfig struct { } var ( - DefaultEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ + DefaultQueuingDelayEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ MinNumberOfGroups: 1, MinDuration: 100 * time.Millisecond, } - DefaultCongestedCongestionSignalConfig = CongestionSignalConfig{ + DefaultLossEarlyWarningCongestionSignalConfig = CongestionSignalConfig{ + MinNumberOfGroups: 2, + MinDuration: 200 * time.Millisecond, + } + + DefaultQueuingDelayCongestedCongestionSignalConfig = CongestionSignalConfig{ MinNumberOfGroups: 3, MinDuration: 300 * time.Millisecond, } + + 
DefaultLossCongestedCongestionSignalConfig = CongestionSignalConfig{ + MinNumberOfGroups: 5, + MinDuration: 600 * time.Millisecond, + } ) // ------------------------------------------------------------------------------- +type congestionSignalCalculator[T int64 | float64] struct { + thresholdMin T + thresholdMax T + isSealed bool + numGroups int + duration int64 +} + +func (c *congestionSignalCalculator[T]) processSample(val T, duration int64) { + if c.isSealed { + return + } + + if val < c.thresholdMin { + // any DQR group breaks the continuity + c.isSealed = true + return + } + + // INDETERMINATE group is treated as a no-op + + // JQR group builds up congestion signal + if val > c.thresholdMax { + c.numGroups++ + c.duration += duration + } +} + +func (c *congestionSignalCalculator[T]) isTriggered(config CongestionSignalConfig) bool { + return c.numGroups >= config.MinNumberOfGroups && c.duration >= config.MinDuration.Microseconds() +} + +// ------------------------------------------------------------------------------- + type CongestionDetectorConfig struct { PacketGroup PacketGroupConfig `yaml:"packet_group,omitempty"` PacketGroupMaxAge time.Duration `yaml:"packet_group_max_age,omitempty"` @@ -40,10 +84,16 @@ type CongestionDetectorConfig struct { JQRMinDelay time.Duration `yaml:"jqr_min_delay,omitempty"` DQRMaxDelay time.Duration `yaml:"dqr_max_delay,omitempty"` - EarlyWarning CongestionSignalConfig `yaml:"early_warning,omitempty"` - EarlyWarningHangover time.Duration `yaml:"early_warning_hangover,omitempty"` - Congested CongestionSignalConfig `yaml:"congested,omitempty"` - CongestedHangover time.Duration `yaml:"congested_hangover,omitempty"` + JQRMinLoss float64 `yaml:"jqr_min_loss,omitempty"` + DQRMaxLoss float64 `yaml:"dqr_max_loss,omitempty"` + + QueuingDelayEarlyWarning CongestionSignalConfig `yaml:"queuing_delay_early_warning,omitempty"` + LossEarlyWarning CongestionSignalConfig `yaml:"loss_early_warning,omitempty"` + EarlyWarningHangover time.Duration 
`yaml:"early_warning_hangover,omitempty"` + + QueuingDelayCongested CongestionSignalConfig `yaml:"queuing_delay_congested,omitempty"` + LossCongested CongestionSignalConfig `yaml:"loss_congested,omitempty"` + CongestedHangover time.Duration `yaml:"congested_hangover,omitempty"` RateMeasurementWindowFullnessMin float64 `yaml:"rate_measurement_window_fullness_min,omitempty"` RateMeasurementWindowDurationMin time.Duration `yaml:"rate_measurement_window_duration_min,omitempty"` @@ -70,9 +120,13 @@ var ( PacketGroupMaxAge: 30 * time.Second, JQRMinDelay: 15 * time.Millisecond, DQRMaxDelay: 5 * time.Millisecond, - EarlyWarning: DefaultEarlyWarningCongestionSignalConfig, + JQRMinLoss: 0.15, + DQRMaxLoss: 0.05, + QueuingDelayEarlyWarning: DefaultQueuingDelayEarlyWarningCongestionSignalConfig, + LossEarlyWarning: DefaultLossEarlyWarningCongestionSignalConfig, EarlyWarningHangover: 500 * time.Millisecond, - Congested: DefaultCongestedCongestionSignalConfig, + QueuingDelayCongested: DefaultQueuingDelayCongestedCongestionSignalConfig, + LossCongested: DefaultLossCongestedCongestionSignalConfig, CongestedHangover: 3 * time.Second, RateMeasurementWindowFullnessMin: 0.8, RateMeasurementWindowDurationMin: 800 * time.Millisecond, @@ -151,12 +205,14 @@ func (c *congestionDetector) GetCongestionState() CongestionState { return c.congestionState } -func (c *congestionDetector) updateCongestionState(state CongestionState) { +func (c *congestionDetector) updateCongestionState(state CongestionState, reason string, oldestContributingGroup int) { c.lock.Lock() c.params.Logger.Infow( "congestion state change", "from", c.congestionState, "to", state, + "reason", reason, + "contributingGroups", logger.ObjectSlice(c.packetGroups[oldestContributingGroup:]), "estimatedAvailableChannelCapacity", c.estimatedAvailableChannelCapacity, ) @@ -221,51 +277,74 @@ func (c *congestionDetector) prunePacketGroups() { } } -func (c *congestionDetector) isCongestionSignalTriggered() (bool, bool) { +func (c 
*congestionDetector) isCongestionSignalTriggered() (bool, string, bool, string, int) { earlyWarningTriggered := false - congestedTriggered := false + earlyWarningReason := "" - numGroups := 0 - duration := int64(0) - for idx := len(c.packetGroups) - 1; idx >= 0; idx-- { + congestedTriggered := false + congestedReason := "" + + qd := &congestionSignalCalculator[int64]{ + thresholdMin: c.params.Config.JQRMinDelay.Microseconds(), + thresholdMax: c.params.Config.DQRMaxDelay.Microseconds(), + } + loss := &congestionSignalCalculator[float64]{ + thresholdMin: c.params.Config.JQRMinLoss, + thresholdMax: c.params.Config.DQRMaxLoss, + } + + var idx int + for idx = len(c.packetGroups) - 1; idx >= 0; idx-- { pg := c.packetGroups[idx] - pqd, ok := pg.PropagatedQueuingDelay() - if !ok { + pqd, pqdOk := pg.PropagatedQueuingDelay() + lr, lrOk := pg.LossRatio() + if !pqdOk && !lrOk { continue } - if pqd > c.params.Config.JQRMinDelay.Microseconds() { - // JQR group builds up congestion signal - numGroups++ - duration += pg.SendDuration() + // `queueing delay` and `loss` based congestion signals are determined independently, + // i. e. 
one packet group triggering `queueing delay` and another group triggering + // `loss` will not combine to trigger the aggregate congestion signal + sendDuration := pg.SendDuration() + if pqdOk { + qd.processSample(pqd, sendDuration) + } + if lrOk { + loss.processSample(lr, sendDuration) } - // INDETERMINATE group is treated as a no-op - - if pqd < c.params.Config.DQRMaxDelay.Microseconds() { - // any DQR group breaks the continuity - return earlyWarningTriggered, congestedTriggered - } - - if numGroups >= c.params.Config.EarlyWarning.MinNumberOfGroups && duration >= c.params.Config.EarlyWarning.MinDuration.Microseconds() { + if !earlyWarningTriggered && qd.isTriggered(c.params.Config.QueuingDelayEarlyWarning) { earlyWarningTriggered = true + earlyWarningReason = "queuing-delay" } - if numGroups >= c.params.Config.Congested.MinNumberOfGroups && duration >= c.params.Config.Congested.MinDuration.Microseconds() { + if !earlyWarningTriggered && loss.isTriggered(c.params.Config.LossEarlyWarning) { + earlyWarningTriggered = true + earlyWarningReason = "loss" + } + + if !congestedTriggered && qd.isTriggered(c.params.Config.QueuingDelayCongested) { congestedTriggered = true + congestedReason = "queuing-delay" } + if !congestedTriggered && loss.isTriggered(c.params.Config.LossCongested) { + congestedTriggered = true + congestedReason = "loss" + } + if earlyWarningTriggered && congestedTriggered { break } } - return earlyWarningTriggered, congestedTriggered + return earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, idx } func (c *congestionDetector) congestionDetectionStateMachine() { state := c.GetCongestionState() newState := state + reason := "" - earlyWarningTriggered, congestedTriggered := c.isCongestionSignalTriggered() + earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, oldestContributingGroup := c.isCongestionSignalTriggered() switch state { case CongestionStateNone: @@ -274,11 +353,13 @@ func (c 
*congestionDetector) congestionDetectionStateMachine() { } if earlyWarningTriggered { newState = CongestionStateEarlyWarning + reason = earlyWarningReason } case CongestionStateEarlyWarning: if congestedTriggered { newState = CongestionStateCongested + reason = congestedReason } else if !earlyWarningTriggered { newState = CongestionStateEarlyWarningHangover } @@ -289,6 +370,7 @@ func (c *congestionDetector) congestionDetectionStateMachine() { } if earlyWarningTriggered { newState = CongestionStateEarlyWarning + reason = earlyWarningReason } else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.EarlyWarningHangover { newState = CongestionStateNone } @@ -304,6 +386,7 @@ func (c *congestionDetector) congestionDetectionStateMachine() { } if earlyWarningTriggered { newState = CongestionStateEarlyWarning + reason = earlyWarningReason } else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.CongestedHangover { newState = CongestionStateNone } @@ -314,7 +397,7 @@ func (c *congestionDetector) congestionDetectionStateMachine() { // update after running the above estimate as state change callback includes the estimated available channel capacity if newState != state { c.congestionStateSwitchedAt = mono.Now() - c.updateCongestionState(newState) + c.updateCongestionState(newState, reason, oldestContributingGroup) } } diff --git a/pkg/sfu/sendsidebwe/packet_group.go b/pkg/sfu/sendsidebwe/packet_group.go index 636eef189..dcee61017 100644 --- a/pkg/sfu/sendsidebwe/packet_group.go +++ b/pkg/sfu/sendsidebwe/packet_group.go @@ -261,6 +261,22 @@ func (p *packetGroup) Traffic() (int64, int64, int, float64) { return p.minSendTime, p.maxSendTime - p.minSendTime, numBytes, fullness } +func (p *packetGroup) LossRatio() (float64, bool) { + if !p.isFinalized { + return 0.0, false + } + + return p.lossRatio() +} + +func (p *packetGroup) lossRatio() (float64, bool) { + lostPackets := p.lost.numPackets() + totalPackets := float64(lostPackets + p.acked.numPackets()) + 
lossRatio := float64(lostPackets) / totalPackets + // indeterminate if not enough packets, the second return value will be false if indeterminate + return lossRatio, totalPackets >= float64(p.params.Config.MinPackets)*p.params.Config.LossPenaltyMinPacketsRatio +} + func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error { if p == nil { return nil @@ -276,6 +292,8 @@ func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error { recvDuration := time.Duration((p.maxRecvTime - p.minRecvTime) * 1000) e.AddDuration("recvDuration", recvDuration) + e.AddUint64("minSequenceNumber", p.minSequenceNumber) + e.AddUint64("maxSequenceNumber", p.maxSequenceNumber) e.AddObject("acked", p.acked) e.AddObject("lost", p.lost) @@ -295,8 +313,13 @@ func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddInt64("aggregateRecvDelta", p.aggregateRecvDelta) e.AddInt64("queuingDelay", p.queuingDelay) e.AddInt64("groupDelay", p.aggregateRecvDelta-p.aggregateSendDelta) - e.AddFloat64("lossRatio", float64(p.lost.numPackets())/float64(p.acked.numPackets()+p.lost.numPackets())) + + lossRatio, lossRatioValid := p.lossRatio() + e.AddFloat64("lossRatio", lossRatio) + e.AddBool("lossRatioValid", lossRatioValid) + e.AddInt64("lossPenalty", p.getLossPenalty()) + capturedTrafficRatio := p.CapturedTrafficRatio() e.AddFloat64("capturedTrafficRatio", capturedTrafficRatio) e.AddFloat64("estimatedAvailableChannelCapacity", sendBitrate*capturedTrafficRatio) @@ -318,21 +341,16 @@ func (p *packetGroup) inGroup(sequenceNumber uint64) error { } func (p *packetGroup) getLossPenalty() int64 { - if p.aggregateRecvDelta == 0 { - return 0 - } - - lostPackets := p.lost.numPackets() - totalPackets := float64(lostPackets + p.acked.numPackets()) - if totalPackets < float64(p.params.Config.MinPackets)*p.params.Config.LossPenaltyMinPacketsRatio { - return 0 - } - // Log10 is used to give higher weight for the same loss ratio at higher packet rates, // for e.g. 
with a penalty factor of 0.25 // - 10% loss at 20 total packets = 0.1 * log10(20) * 0.25 = 0.032 // - 10% loss at 100 total packets = 0.1 * log10(100) * 0.25 = 0.05 // - 10% loss at 1000 total packets = 0.1 * log10(100) * 0.25 = 0.075 - lossRatio := float64(lostPackets) / totalPackets - return int64(float64(p.aggregateRecvDelta) * lossRatio * math.Log10(totalPackets) * p.params.Config.LossPenaltyFactor) + lossRatio, _ := p.lossRatio() + return int64( + float64(p.aggregateRecvDelta) * + lossRatio * + math.Log10(float64(p.acked.numPackets()+p.lost.numPackets())) * + p.params.Config.LossPenaltyFactor, + ) }