From a825661affd63f6137d081fb5e0aaa64253d1b67 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 12 Nov 2024 09:21:28 +0530 Subject: [PATCH] Use weighted loss to detect loss based congesiton signal. (#3169) * Use weighted loss to detect loss based congesiton signal. - Increase JQR min loss to 0.25. - Use weighted loss ratio so that more packet rate gets higher weightage. At default config, 10 packets in 1/2 second will form a valid packet group for loss based congestion signal consideration. Two packets lost in that group may not be bad. So, bumped up the JQR min loss to 0.25. However, 20% loss (or even much lesser loss) could be problematic if the packet rate is higher (potentially multiple streams affected and there could be a lot of NACKs as a result). So, weight it by packet rate so that higher packet rates enter JQR at lower losses. * WIP * use aggregated loss --- pkg/sfu/sendsidebwe/congestion_detector.go | 328 ++++++++++++++------- pkg/sfu/sendsidebwe/packet_group.go | 147 +++------ pkg/sfu/sendsidebwe/packet_info.go | 14 + pkg/sfu/sendsidebwe/packet_tracker.go | 21 ++ pkg/sfu/sendsidebwe/send_side_bwe.go | 14 + pkg/sfu/sendsidebwe/traffic_stats.go | 156 ++++++++++ pkg/sfu/sendsidebwe/twcc_feedback.go | 14 + 7 files changed, 488 insertions(+), 206 deletions(-) create mode 100644 pkg/sfu/sendsidebwe/traffic_stats.go diff --git a/pkg/sfu/sendsidebwe/congestion_detector.go b/pkg/sfu/sendsidebwe/congestion_detector.go index a0440be2a..1dcd01be5 100644 --- a/pkg/sfu/sendsidebwe/congestion_detector.go +++ b/pkg/sfu/sendsidebwe/congestion_detector.go @@ -1,3 +1,17 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package sendsidebwe import ( @@ -43,36 +57,138 @@ var ( // ------------------------------------------------------------------------------- -type congestionSignalCalculator[T int64 | float64] struct { - thresholdMin T - thresholdMax T - isSealed bool - numGroups int - duration int64 +type qdMeasurement struct { + earlyWarningConfig CongestionSignalConfig + congestedConfig CongestionSignalConfig + jqrMin int64 + dqrMax int64 + + numGroups int + minSendTime int64 + maxSendTime int64 + isSealed bool } -func (c *congestionSignalCalculator[T]) processSample(val T, duration int64) { - if c.isSealed { - return - } - - if val < c.thresholdMin { - // any DQR group breaks the continuity - c.isSealed = true - return - } - - // INDETERMINATE group is treated as a no-op - - // JQR group builds up congestion signal - if val > c.thresholdMax { - c.numGroups++ - c.duration += duration +func newQdMeasurement( + earlyWarningConfig CongestionSignalConfig, + congestedConfig CongestionSignalConfig, + jqrMin int64, + dqrMax int64, +) *qdMeasurement { + return &qdMeasurement{ + earlyWarningConfig: earlyWarningConfig, + congestedConfig: congestedConfig, + jqrMin: jqrMin, + dqrMax: dqrMax, } } -func (c *congestionSignalCalculator[T]) isTriggered(config CongestionSignalConfig) bool { - return c.numGroups >= config.MinNumberOfGroups && c.duration >= config.MinDuration.Microseconds() +func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup) { + if q.isSealed { + return + } + + pqd, pqdOk := pg.PropagatedQueuingDelay() + if !pqdOk { + return + } + + if pqd < q.dqrMax { + 
// a DQR breaks continuity + q.isSealed = true + return + } + + if pqd > q.jqrMin { + q.numGroups++ + minSendTime, maxSendTime := pg.SendWindow() + if q.minSendTime == 0 || minSendTime < q.minSendTime { + q.minSendTime = minSendTime + } + if maxSendTime > q.maxSendTime { + q.maxSendTime = maxSendTime + } + } + + // can seal if congested config thresholds are met as they are longer + if q.numGroups >= q.congestedConfig.MinNumberOfGroups && (q.maxSendTime-q.minSendTime) >= q.congestedConfig.MinDuration.Microseconds() { + q.isSealed = true + } +} + +func (q *qdMeasurement) IsSealed() bool { + return q.isSealed +} + +func (q *qdMeasurement) IsEarlyWarningTriggered() bool { + return q.numGroups >= q.earlyWarningConfig.MinNumberOfGroups && (q.maxSendTime-q.minSendTime) >= q.earlyWarningConfig.MinDuration.Microseconds() +} + +func (q *qdMeasurement) IsCongestedTriggered() bool { + return q.numGroups >= q.congestedConfig.MinNumberOfGroups && (q.maxSendTime-q.minSendTime) >= q.congestedConfig.MinDuration.Microseconds() +} + +// ------------------------------------------------------------------------------- + +type lossMeasurement struct { + earlyWarningConfig CongestionSignalConfig + congestedConfig CongestionSignalConfig + congestionMinLoss float64 + + numGroups int + ts *trafficStats + + earlyWarningWeightedLoss float64 + congestedWeightedLoss float64 + + isSealed bool +} + +func newLossMeasurement( + earlyWarningConfig CongestionSignalConfig, + congestedConfig CongestionSignalConfig, + weightedLossConfig WeightedLossConfig, + congestionMinLoss float64, + logger logger.Logger, +) *lossMeasurement { + return &lossMeasurement{ + earlyWarningConfig: earlyWarningConfig, + congestedConfig: congestedConfig, + congestionMinLoss: congestionMinLoss, + ts: newTrafficStats(trafficStatsParams{ + Config: weightedLossConfig, + Logger: logger, + }), + } +} + +func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup) { + if l.isSealed { + return + } + l.numGroups++ // every merged group counts toward the MinNumberOfGroups thresholds checked below + l.ts.Merge(pg.Traffic()) +
duration := l.ts.Duration() + if l.numGroups >= l.earlyWarningConfig.MinNumberOfGroups && duration >= l.earlyWarningConfig.MinDuration.Microseconds() { + l.earlyWarningWeightedLoss = l.ts.WeightedLoss() + } + if l.numGroups >= l.congestedConfig.MinNumberOfGroups && duration >= l.congestedConfig.MinDuration.Microseconds() { + l.congestedWeightedLoss = l.ts.WeightedLoss() + l.isSealed = true // can seal if congested thresholds are satisfied as those should be higher + } +} + +func (l *lossMeasurement) IsSealed() bool { + return l.isSealed +} + +func (l *lossMeasurement) IsEarlyWarningTriggered() bool { + return l.earlyWarningWeightedLoss > l.congestionMinLoss +} + +func (l *lossMeasurement) IsCongestedTriggered() bool { + return l.congestedWeightedLoss > l.congestionMinLoss } // ------------------------------------------------------------------------------- @@ -84,8 +200,8 @@ type CongestionDetectorConfig struct { JQRMinDelay time.Duration `yaml:"jqr_min_delay,omitempty"` DQRMaxDelay time.Duration `yaml:"dqr_max_delay,omitempty"` - JQRMinLoss float64 `yaml:"jqr_min_loss,omitempty"` - DQRMaxLoss float64 `yaml:"dqr_max_loss,omitempty"` + WeightedLoss WeightedLossConfig `yaml:"weighted_loss,omitempty"` + CongestionMinLoss float64 `yaml:"congestion_min_loss,omitempty"` QueuingDelayEarlyWarning CongestionSignalConfig `yaml:"queuing_delay_early_warning,omitempty"` LossEarlyWarning CongestionSignalConfig `yaml:"loss_early_warning,omitempty"` @@ -95,7 +211,6 @@ type CongestionDetectorConfig struct { LossCongested CongestionSignalConfig `yaml:"loss_congested,omitempty"` CongestedHangover time.Duration `yaml:"congested_hangover,omitempty"` - RateMeasurementWindowFullnessMin float64 `yaml:"rate_measurement_window_fullness_min,omitempty"` RateMeasurementWindowDurationMin time.Duration `yaml:"rate_measurement_window_duration_min,omitempty"` RateMeasurementWindowDurationMax time.Duration `yaml:"rate_measurement_window_duration_max,omitempty"` @@ -109,7 +224,7 @@ var ( 
defaultTrendDetectorConfigCongestedCTR = ccutils.TrendDetectorConfig{ RequiredSamples: 5, RequiredSamplesMin: 2, - DownwardTrendThreshold: -0.6, + DownwardTrendThreshold: -0.5, DownwardTrendMaxWait: 5 * time.Second, CollapseThreshold: 500 * time.Millisecond, ValidityWindow: 10 * time.Second, @@ -120,15 +235,14 @@ var ( PacketGroupMaxAge: 30 * time.Second, JQRMinDelay: 15 * time.Millisecond, DQRMaxDelay: 5 * time.Millisecond, - JQRMinLoss: 0.15, - DQRMaxLoss: 0.05, + WeightedLoss: defaultWeightedLossConfig, + CongestionMinLoss: 0.25, QueuingDelayEarlyWarning: DefaultQueuingDelayEarlyWarningCongestionSignalConfig, LossEarlyWarning: DefaultLossEarlyWarningCongestionSignalConfig, EarlyWarningHangover: 500 * time.Millisecond, QueuingDelayCongested: DefaultQueuingDelayCongestedCongestionSignalConfig, LossCongested: DefaultLossCongestedCongestionSignalConfig, CongestedHangover: 3 * time.Second, - RateMeasurementWindowFullnessMin: 0.8, RateMeasurementWindowDurationMin: 800 * time.Millisecond, RateMeasurementWindowDurationMax: 2 * time.Second, PeriodicCheckInterval: 2 * time.Second, @@ -168,6 +282,7 @@ type congestionDetector struct { congestionState CongestionState congestionStateSwitchedAt time.Time congestedCTRTrend *ccutils.TrendDetector[float64] + congestedTrafficStats *trafficStats onCongestionStateChange func(congestionState CongestionState, estimatedAvailableChannelCapacity int64) } @@ -235,12 +350,17 @@ func (c *congestionDetector) updateCongestionState(state CongestionState, reason // reducing/relieving congestion if state == CongestionStateCongested && prevState != CongestionStateCongested { c.congestedCTRTrend = ccutils.NewTrendDetector[float64](ccutils.TrendDetectorParams{ - Name: "ssbwe-estimate", + Name: "ssbwe-ctr", Logger: c.params.Logger, Config: c.params.Config.CongestedCTRTrend, }) + c.congestedTrafficStats = newTrafficStats(trafficStatsParams{ + Config: c.params.Config.WeightedLoss, + Logger: c.params.Logger, + }) } else if state != 
CongestionStateCongested { c.congestedCTRTrend = nil + c.congestedTrafficStats = nil } } @@ -268,72 +388,67 @@ func (c *congestionDetector) prunePacketGroups() { return } - threshold := c.packetGroups[len(c.packetGroups)-1].MinSendTime() - c.params.Config.PacketGroupMaxAge.Microseconds() + bst := c.packetTracker.BaseSendTime() + if bst == 0 { + return + } + threshold := mono.UnixMicro() - bst - c.params.Config.PacketGroupMaxAge.Microseconds() + for idx, pg := range c.packetGroups { - if mst := pg.MinSendTime(); mst < threshold { - c.packetGroups = c.packetGroups[idx+1:] + if mst := pg.MinSendTime(); mst > threshold { + c.packetGroups = c.packetGroups[idx:] return } } } func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool, string, int) { - earlyWarningTriggered := false - earlyWarningReason := "" - - congestedTriggered := false - congestedReason := "" - - qd := &congestionSignalCalculator[int64]{ - thresholdMin: c.params.Config.JQRMinDelay.Microseconds(), - thresholdMax: c.params.Config.DQRMaxDelay.Microseconds(), - } - loss := &congestionSignalCalculator[float64]{ - thresholdMin: c.params.Config.JQRMinLoss, - thresholdMax: c.params.Config.DQRMaxLoss, - } + qdMeasurement := newQdMeasurement( + c.params.Config.QueuingDelayEarlyWarning, + c.params.Config.QueuingDelayCongested, + c.params.Config.JQRMinDelay.Microseconds(), + c.params.Config.DQRMaxDelay.Microseconds(), + ) + lossMeasurement := newLossMeasurement( + c.params.Config.LossEarlyWarning, + c.params.Config.LossCongested, + c.params.Config.WeightedLoss, + c.params.Config.CongestionMinLoss, + c.params.Logger, + ) var idx int for idx = len(c.packetGroups) - 1; idx >= 0; idx-- { pg := c.packetGroups[idx] - pqd, pqdOk := pg.PropagatedQueuingDelay() - lr, lrOk := pg.LossRatio() - if !pqdOk && !lrOk { - continue - } + qdMeasurement.ProcessPacketGroup(pg) + lossMeasurement.ProcessPacketGroup(pg) - // `queueing delay` and `loss` based congestion signals are determined independently, - // i. e. 
one packet group triggering `queueing delay` and another group triggering - // `loss` will not combine to trigger the aggregate congestion signal - sendDuration := pg.SendDuration() - if pqdOk { - qd.processSample(pqd, sendDuration) - } - if lrOk { - loss.processSample(lr, sendDuration) + // if both measurements have enough data to make a decision, stop processing groups + if qdMeasurement.IsSealed() && lossMeasurement.IsSealed() { + break } + } - if !earlyWarningTriggered && qd.isTriggered(c.params.Config.QueuingDelayEarlyWarning) { - earlyWarningTriggered = true - earlyWarningReason = "queuing-delay" - } - if !earlyWarningTriggered && loss.isTriggered(c.params.Config.LossEarlyWarning) { - earlyWarningTriggered = true + earlyWarningReason := "" + earlyWarningTriggered := qdMeasurement.IsEarlyWarningTriggered() + if earlyWarningTriggered { + earlyWarningReason = "queuing-delay" + } else { + earlyWarningTriggered = lossMeasurement.IsEarlyWarningTriggered() + if earlyWarningTriggered { earlyWarningReason = "loss" } + } - if !congestedTriggered && qd.isTriggered(c.params.Config.QueuingDelayCongested) { - congestedTriggered = true - congestedReason = "queuing-delay" - } - if !congestedTriggered && loss.isTriggered(c.params.Config.LossCongested) { - congestedTriggered = true + congestedReason := "" + congestedTriggered := qdMeasurement.IsCongestedTriggered() + if congestedTriggered { + congestedReason = "queuing-delay" + } else { + congestedTriggered = lossMeasurement.IsCongestedTriggered() + if congestedTriggered { congestedReason = "loss" } - - if earlyWarningTriggered && congestedTriggered { - break - } } return earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, idx @@ -401,12 +516,17 @@ func (c *congestionDetector) congestionDetectionStateMachine() { } } -func (c *congestionDetector) updateTrend(ctr float64) { +func (c *congestionDetector) updateTrend(pg *packetGroup) { if c.congestedCTRTrend == nil { return } - // quantise the CTR to 
filter out small changes + // progressively keep increasing the window and make measurements over longer windows, + // if congestion is not relieving, CTR will trend down + c.congestedTrafficStats.Merge(pg.Traffic()) + ctr := c.congestedTrafficStats.CapturedTrafficRatio() + + // quantise CTR to filter out small changes c.congestedCTRTrend.AddValue(float64(int((ctr+(c.params.Config.CongestedCTREpsilon/2))/c.params.Config.CongestedCTREpsilon)) * c.params.Config.CongestedCTREpsilon) if c.congestedCTRTrend.GetDirection() == ccutils.TrendDirectionDownward { @@ -424,10 +544,14 @@ func (c *congestionDetector) updateTrend(ctr float64) { // reset to get new set of samples for next trend c.congestedCTRTrend = ccutils.NewTrendDetector[float64](ccutils.TrendDetectorParams{ - Name: "ssbwe-estimate", + Name: "ssbwe-ctr", Logger: c.params.Logger, Config: c.params.Config.CongestedCTRTrend, }) + c.congestedTrafficStats = newTrafficStats(trafficStatsParams{ + Config: c.params.Config.WeightedLoss, + Logger: c.params.Logger, + }) } } @@ -436,31 +560,33 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() { return } - totalDuration := int64(0) - totalBytes := 0 - threshold := c.packetGroups[len(c.packetGroups)-1].MinSendTime() - c.params.Config.RateMeasurementWindowDurationMax.Microseconds() + bst := c.packetTracker.BaseSendTime() + if bst == 0 { + return + } + threshold := mono.UnixMicro() - bst - c.params.Config.RateMeasurementWindowDurationMax.Microseconds() + + agg := newTrafficStats(trafficStatsParams{ + Config: c.params.Config.WeightedLoss, + Logger: c.params.Logger, + }) for idx := len(c.packetGroups) - 1; idx >= 0; idx-- { pg := c.packetGroups[idx] - mst, dur, nbytes, fullness := pg.Traffic() - if mst < threshold { + if pg.MinSendTime() < threshold { break } - if fullness < c.params.Config.RateMeasurementWindowFullnessMin { - continue - } - - totalDuration += dur - totalBytes += nbytes + agg.Merge(pg.Traffic()) } - if totalDuration >= 
c.params.Config.RateMeasurementWindowDurationMin.Microseconds() { - c.lock.Lock() - c.estimatedAvailableChannelCapacity = int64(totalBytes) * 8 * 1e6 / totalDuration - c.lock.Unlock() - } else { - c.params.Logger.Infow("not enough data to estimate available channel capacity", "totalDuration", totalDuration) + if agg.Duration() < c.params.Config.RateMeasurementWindowDurationMin.Microseconds() { + c.params.Logger.Infow("not enough data to estimate available channel capacity", "duration", agg.Duration()) + return } + + c.lock.Lock() + c.estimatedAvailableChannelCapacity = agg.AcknowledgedBitrate() + c.lock.Unlock() } func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) { @@ -495,7 +621,7 @@ func (c *congestionDetector) processFeedbackReport(fbr feedbackReport) { if err == errGroupFinalized { // previous group ended, start a new group - c.updateTrend(pg.CapturedTrafficRatio()) + c.updateTrend(pg) // SSBWE-REMOVE c.params.Logger.Infow("packet group done", "group", pg, "numGroups", len(c.packetGroups)) // SSBWE-REMOVE pqd, _ := pg.PropagatedQueuingDelay() diff --git a/pkg/sfu/sendsidebwe/packet_group.go b/pkg/sfu/sendsidebwe/packet_group.go index dcee61017..ccf0b2ba6 100644 --- a/pkg/sfu/sendsidebwe/packet_group.go +++ b/pkg/sfu/sendsidebwe/packet_group.go @@ -1,8 +1,21 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ package sendsidebwe import ( "errors" - "math" "time" "github.com/livekit/protocol/logger" @@ -22,18 +35,12 @@ var ( type PacketGroupConfig struct { MinPackets int `yaml:"min_packets,omitempty"` MaxWindowDuration time.Duration `yaml:"max_window_duration,omitempty"` - - // should have at least this fraction of `MinPackets` for loss penalty consideration - LossPenaltyMinPacketsRatio float64 `yaml:"loss_penalty_min_packet_ratio,omitempty"` - LossPenaltyFactor float64 `yaml:"loss_penalty_factor,omitempty"` } var ( DefaultPacketGroupConfig = PacketGroupConfig{ - MinPackets: 20, - MaxWindowDuration: 500 * time.Millisecond, - LossPenaltyMinPacketsRatio: 0.5, - LossPenaltyFactor: 0.25, + MinPackets: 20, + MaxWindowDuration: 500 * time.Millisecond, } ) @@ -108,8 +115,9 @@ func (c classStat) MarshalLogObject(e zapcore.ObjectEncoder) error { // ------------------------------------------------------------- type packetGroupParams struct { - Config PacketGroupConfig - Logger logger.Logger + Config PacketGroupConfig + WeightedLoss WeightedLossConfig + Logger logger.Logger } type packetGroup struct { @@ -212,6 +220,10 @@ func (p *packetGroup) MinSendTime() int64 { return p.minSendTime } +func (p *packetGroup) SendWindow() (int64, int64) { + return p.minSendTime, p.maxSendTime // (min, max): the order in which callers destructure the result +} + func (p *packetGroup) PropagatedQueuingDelay() (int64, bool) { if !p.isFinalized { return 0, false @@ -224,57 +236,16 @@ func (p *packetGroup) PropagatedQueuingDelay() (int64, bool) { return max(0, p.aggregateRecvDelta-p.aggregateSendDelta), true } -func (p *packetGroup) SendDuration() int64 { - if !p.isFinalized { - return 0 +func (p *packetGroup) Traffic() *trafficStats { + return &trafficStats{ + minSendTime: p.minSendTime, + maxSendTime: p.maxSendTime, + sendDelta: p.aggregateSendDelta, + recvDelta: p.aggregateRecvDelta, + ackedPackets: p.acked.numPackets(), + ackedBytes: p.acked.numBytes(), + lostPackets: p.lost.numPackets(), } - - return p.maxSendTime - p.minSendTime -} - -func (p *packetGroup)
CapturedTrafficRatio() float64 { - capturedTrafficRatio := float64(0.0) - if p.aggregateRecvDelta != 0 { - // apply a penalty for lost packets, - // tha rationale being packet dropping is a strategy to relieve congestion - // and if they were not dropped, they would have increased queuing delay, - // as it is not possible to know the reason for the losses, - // apply a small penalty to receive delta aggregate to simulate those packets - // build up queuing delay. - // - // note that it is applied only for determining rate and - // not while determining queuing region, adding synthetic delays - // like this could cause queuing region to be stuck in JQR - capturedTrafficRatio = float64(p.aggregateSendDelta) / float64(p.aggregateRecvDelta+p.getLossPenalty()) - } - return min(1.0, capturedTrafficRatio) -} - -func (p *packetGroup) Traffic() (int64, int64, int, float64) { - numBytes := int(float64(p.acked.numBytes()) * p.CapturedTrafficRatio()) - - fullness := max( - float64(p.acked.numPackets())/float64(p.params.Config.MinPackets), - float64(p.maxSendTime-p.minSendTime)/float64(p.params.Config.MaxWindowDuration.Microseconds()), - ) - - return p.minSendTime, p.maxSendTime - p.minSendTime, numBytes, fullness -} - -func (p *packetGroup) LossRatio() (float64, bool) { - if !p.isFinalized { - return 0.0, false - } - - return p.lossRatio() -} - -func (p *packetGroup) lossRatio() (float64, bool) { - lostPackets := p.lost.numPackets() - totalPackets := float64(lostPackets + p.acked.numPackets()) - lossRatio := float64(lostPackets) / totalPackets - // indeterminate if not enough packets, the second return value will be false if indeterminate - return lossRatio, totalPackets >= float64(p.params.Config.MinPackets)*p.params.Config.LossPenaltyMinPacketsRatio } func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error { @@ -282,26 +253,15 @@ func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error { return nil } - e.AddInt64("minSendTime", p.minSendTime) - 
e.AddInt64("maxSendTime", p.maxSendTime) - sendDuration := time.Duration((p.maxSendTime - p.minSendTime) * 1000) - e.AddDuration("sendDuration", sendDuration) - - e.AddInt64("minRecvTime", p.minRecvTime) - e.AddInt64("maxRecvTime", p.maxRecvTime) - recvDuration := time.Duration((p.maxRecvTime - p.minRecvTime) * 1000) - e.AddDuration("recvDuration", recvDuration) - e.AddUint64("minSequenceNumber", p.minSequenceNumber) e.AddUint64("maxSequenceNumber", p.maxSequenceNumber) e.AddObject("acked", p.acked) e.AddObject("lost", p.lost) - sendBitrate := float64(0) - if sendDuration != 0 { - sendBitrate = float64(p.acked.numBytes()*8) / sendDuration.Seconds() - e.AddFloat64("sendBitrate", sendBitrate) - } + e.AddInt64("minRecvTime", p.minRecvTime) + e.AddInt64("maxRecvTime", p.maxRecvTime) + recvDuration := time.Duration((p.maxRecvTime - p.minRecvTime) * 1000) + e.AddDuration("recvDuration", recvDuration) recvBitrate := float64(0) if recvDuration != 0 { @@ -309,20 +269,12 @@ func (p *packetGroup) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddFloat64("recvBitrate", recvBitrate) } - e.AddInt64("aggregateSendDelta", p.aggregateSendDelta) - e.AddInt64("aggregateRecvDelta", p.aggregateRecvDelta) - e.AddInt64("queuingDelay", p.queuingDelay) - e.AddInt64("groupDelay", p.aggregateRecvDelta-p.aggregateSendDelta) - - lossRatio, lossRatioValid := p.lossRatio() - e.AddFloat64("lossRatio", lossRatio) - e.AddBool("lossRatioValid", lossRatioValid) - - e.AddInt64("lossPenalty", p.getLossPenalty()) - - capturedTrafficRatio := p.CapturedTrafficRatio() - e.AddFloat64("capturedTrafficRatio", capturedTrafficRatio) - e.AddFloat64("estimatedAvailableChannelCapacity", sendBitrate*capturedTrafficRatio) + ts := newTrafficStats(trafficStatsParams{ + Config: p.params.WeightedLoss, + Logger: p.params.Logger, + }) + ts.Merge(p.Traffic()) + e.AddObject("trafficStats", ts) e.AddBool("isFinalized", p.isFinalized) return nil @@ -339,18 +291,3 @@ func (p *packetGroup) inGroup(sequenceNumber uint64) 
error { return nil } - -func (p *packetGroup) getLossPenalty() int64 { - // Log10 is used to give higher weight for the same loss ratio at higher packet rates, - // for e.g. with a penalty factor of 0.25 - // - 10% loss at 20 total packets = 0.1 * log10(20) * 0.25 = 0.032 - // - 10% loss at 100 total packets = 0.1 * log10(100) * 0.25 = 0.05 - // - 10% loss at 1000 total packets = 0.1 * log10(100) * 0.25 = 0.075 - lossRatio, _ := p.lossRatio() - return int64( - float64(p.aggregateRecvDelta) * - lossRatio * - math.Log10(float64(p.acked.numPackets()+p.lost.numPackets())) * - p.params.Config.LossPenaltyFactor, - ) -} diff --git a/pkg/sfu/sendsidebwe/packet_info.go b/pkg/sfu/sendsidebwe/packet_info.go index 58bff7a58..c753c1c86 100644 --- a/pkg/sfu/sendsidebwe/packet_info.go +++ b/pkg/sfu/sendsidebwe/packet_info.go @@ -1,3 +1,17 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package sendsidebwe import ( diff --git a/pkg/sfu/sendsidebwe/packet_tracker.go b/pkg/sfu/sendsidebwe/packet_tracker.go index 183181265..8716b7729 100644 --- a/pkg/sfu/sendsidebwe/packet_tracker.go +++ b/pkg/sfu/sendsidebwe/packet_tracker.go @@ -1,3 +1,17 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package sendsidebwe import ( @@ -70,6 +84,13 @@ func (p *packetTracker) RecordPacketSendAndGetSequenceNumber(at time.Time, size return uint16(pi.sequenceNumber) } +func (p *packetTracker) BaseSendTime() int64 { + p.lock.Lock() + defer p.lock.Unlock() + + return p.baseSendTime +} + func (p *packetTracker) RecordPacketIndicationFromRemote(sn uint16, recvTime int64) (piRecv packetInfo, sendDelta, recvDelta int64) { p.lock.Lock() defer p.lock.Unlock() diff --git a/pkg/sfu/sendsidebwe/send_side_bwe.go b/pkg/sfu/sendsidebwe/send_side_bwe.go index f48f9af56..5a0207863 100644 --- a/pkg/sfu/sendsidebwe/send_side_bwe.go +++ b/pkg/sfu/sendsidebwe/send_side_bwe.go @@ -1,3 +1,17 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package sendsidebwe import ( diff --git a/pkg/sfu/sendsidebwe/traffic_stats.go b/pkg/sfu/sendsidebwe/traffic_stats.go new file mode 100644 index 000000000..9e1cfa9f0 --- /dev/null +++ b/pkg/sfu/sendsidebwe/traffic_stats.go @@ -0,0 +1,156 @@ +// Copyright 2023 LiveKit, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sendsidebwe + +import ( + "math" + "time" + + "github.com/livekit/protocol/logger" + "go.uber.org/zap/zapcore" +) + +// ----------------------------------------------------------- + +type WeightedLossConfig struct { + MinPacketsForLossValidity int `yaml:"min_packets_for_loss_validity,omitempty"` + LossPenaltyFactor float64 `yaml:"loss_penalty_factor,omitempty"` +} + +var ( + defaultWeightedLossConfig = WeightedLossConfig{ + MinPacketsForLossValidity: 20, + LossPenaltyFactor: 0.25, + } +) + +// ----------------------------------------------------------- + +type trafficStatsParams struct { + Config WeightedLossConfig + Logger logger.Logger +} + +type trafficStats struct { + params trafficStatsParams + + minSendTime int64 + maxSendTime int64 + sendDelta int64 + recvDelta int64 + ackedPackets int + ackedBytes int + lostPackets int +} + +func newTrafficStats(params trafficStatsParams) *trafficStats { + return &trafficStats{ + params: params, + } +} + +func (ts *trafficStats) Merge(rhs *trafficStats) { + if ts.minSendTime == 0 || rhs.minSendTime < ts.minSendTime { // 0 means the accumulator has no min yet + ts.minSendTime = rhs.minSendTime + } + if rhs.maxSendTime > ts.maxSendTime { + ts.maxSendTime = rhs.maxSendTime + } + ts.sendDelta += rhs.sendDelta + ts.recvDelta += rhs.recvDelta + ts.ackedPackets += rhs.ackedPackets + ts.ackedBytes += rhs.ackedBytes + ts.lostPackets += rhs.lostPackets +} + +func (ts *trafficStats) Duration() int64 { +
return ts.maxSendTime - ts.minSendTime +} + +func (ts *trafficStats) AcknowledgedBitrate() int64 { + ackedBitrate := int64(ts.ackedBytes) * 8 * 1e6 / ts.Duration() // NOTE(review): integer division, panics if Duration() is 0 - callers must check the duration first + return int64(float64(ackedBitrate) * ts.CapturedTrafficRatio()) +} + +func (ts *trafficStats) CapturedTrafficRatio() float64 { + if ts.recvDelta == 0 { + return 0.0 + } + + // apply a penalty for lost packets, + // the rationale being packet dropping is a strategy to relieve congestion + // and if they were not dropped, they would have increased queuing delay, + // as it is not possible to know the reason for the losses, + // apply a small penalty to receive delta aggregate to simulate those packets + // building up queuing delay. + return min(1.0, float64(ts.sendDelta)/float64(ts.recvDelta+ts.lossPenalty())) +} + +func (ts *trafficStats) WeightedLoss() float64 { + totalPackets := float64(ts.lostPackets + ts.ackedPackets) + if int(totalPackets) < ts.params.Config.MinPacketsForLossValidity { + return 0.0 + } + + lossRatio := float64(0.0) + if totalPackets != 0 { + lossRatio = float64(ts.lostPackets) / totalPackets + } + + pps := totalPackets * 1e6 / float64(ts.Duration()) + + // Log10 is used to give higher weight for the same loss ratio at higher packet rates, + // for e.g.
with a penalty factor of 0.25 + // - 10% loss at 20 pps = 0.1 * log10(20) * 0.25 = 0.032 + // - 10% loss at 100 pps = 0.1 * log10(100) * 0.25 = 0.05 + // - 10% loss at 1000 pps = 0.1 * log10(1000) * 0.25 = 0.075 + return lossRatio * math.Log10(pps) +} + +func (ts *trafficStats) lossPenalty() int64 { + return int64(float64(ts.recvDelta) * ts.WeightedLoss() * ts.params.Config.LossPenaltyFactor) +} + +func (ts *trafficStats) MarshalLogObject(e zapcore.ObjectEncoder) error { + if ts == nil { + return nil + } + + e.AddInt64("minSendTime", ts.minSendTime) + e.AddInt64("maxSendTime", ts.maxSendTime) + duration := time.Duration(ts.Duration() * 1000) + e.AddDuration("duration", duration) + + bitrate := float64(0) + if duration != 0 { + bitrate = float64(ts.ackedBytes*8) / duration.Seconds() + e.AddFloat64("bitrate", bitrate) + } + + e.AddInt64("sendDelta", ts.sendDelta) + e.AddInt64("recvDelta", ts.recvDelta) + e.AddInt64("groupDelay", ts.recvDelta-ts.sendDelta) + + e.AddFloat64("weightedLoss", ts.WeightedLoss()) + if (ts.ackedPackets + ts.lostPackets) != 0 { + e.AddFloat64("rawLoss", float64(ts.lostPackets)/float64(ts.ackedPackets+ts.lostPackets)) + } + e.AddInt64("lossPenalty", ts.lossPenalty()) + + capturedTrafficRatio := ts.CapturedTrafficRatio() + e.AddFloat64("capturedTrafficRatio", capturedTrafficRatio) + e.AddFloat64("estimatedAvailableChannelCapacity", bitrate*capturedTrafficRatio) + return nil +} diff --git a/pkg/sfu/sendsidebwe/twcc_feedback.go b/pkg/sfu/sendsidebwe/twcc_feedback.go index 23eca2637..1e1251042 100644 --- a/pkg/sfu/sendsidebwe/twcc_feedback.go +++ b/pkg/sfu/sendsidebwe/twcc_feedback.go @@ -1,3 +1,17 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package sendsidebwe import (