From 79eda6b72bea5cf52b2537cfd1db21a612ab9e6b Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 12 Dec 2024 14:22:31 +0530 Subject: [PATCH] Send side BWE: tighter contributing groups (#3245) * WIP * clean up * debug * epm log * debug * fmt * clean up * default no SSBWE * clean up --- .../bwe/sendsidebwe/congestion_detector.go | 110 +++++++++++------- pkg/sfu/sequencer.go | 14 +++ 2 files changed, 80 insertions(+), 44 deletions(-) diff --git a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go index 0f16515c5..0151a2958 100644 --- a/pkg/sfu/bwe/sendsidebwe/congestion_detector.go +++ b/pkg/sfu/bwe/sendsidebwe/congestion_detector.go @@ -119,11 +119,17 @@ type qdMeasurement struct { jqrMin int64 dqrMax int64 - numGroups int - smallestGroupIdx int - minSendTime int64 - maxSendTime int64 - isSealed bool + numGroups int + minSendTime int64 + maxSendTime int64 + + isSealed bool + + isEarlyWarningTriggered bool + earlyWarningGroupIdx int + + isCongestedTriggered bool + congestedGroupIdx int } func newQdMeasurement( @@ -145,10 +151,6 @@ func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) { return } - if q.smallestGroupIdx == 0 || q.smallestGroupIdx > groupIdx { - q.smallestGroupIdx = groupIdx - } - pqd, pqdOk := pg.FinalizedPropagatedQueuingDelay() if !pqdOk { return @@ -166,13 +168,19 @@ func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) { if q.minSendTime == 0 || minSendTime < q.minSendTime { q.minSendTime = minSendTime } - if maxSendTime > q.maxSendTime { - q.maxSendTime = maxSendTime - } + q.maxSendTime = max(q.maxSendTime, maxSendTime) } - // can seal if congested config thresholds are met as they are longer - if q.congestedConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) { + if !q.isEarlyWarningTriggered && q.earlyWarningConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) { + q.isEarlyWarningTriggered = true + q.earlyWarningGroupIdx = groupIdx + } + + if !q.isCongestedTriggered && q.congestedConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) { + q.isCongestedTriggered = true + q.congestedGroupIdx = groupIdx + + // can seal if congested config thresholds are met as they are longer q.isSealed = true } } @@ -181,16 +189,20 @@ func (q *qdMeasurement) IsSealed() bool { return q.isSealed } -func (q *qdMeasurement) SmallestGroupIdx() int { - return q.smallestGroupIdx +func (q *qdMeasurement) IsEarlyWarningTriggered() bool { + return q.isEarlyWarningTriggered } -func (q *qdMeasurement) IsEarlyWarningTriggered() bool { - return q.earlyWarningConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) +func (q *qdMeasurement) EarlyWarningGroupIdx() int { + return q.earlyWarningGroupIdx } func (q *qdMeasurement) IsCongestedTriggered() bool { - return q.congestedConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) + return q.isCongestedTriggered +} + +func (q *qdMeasurement) CongestedGroupIdx() int { + return q.congestedGroupIdx } func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error { @@ -199,13 +211,14 @@ func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error { } e.AddInt("numGroups", q.numGroups) - e.AddInt("smallestGroupIdx", q.smallestGroupIdx) e.AddInt64("minSendTime", q.minSendTime) e.AddInt64("maxSendTime", q.maxSendTime) e.AddDuration("duration", time.Duration((q.maxSendTime-q.minSendTime)*1000)) e.AddBool("isSealed", q.isSealed) - e.AddBool("isEarlyWarningTriggered", q.IsEarlyWarningTriggered()) - e.AddBool("isCongestedTriggered", q.IsCongestedTriggered()) + e.AddBool("isEarlyWarningTriggered", q.isEarlyWarningTriggered) + e.AddInt("earlyWarningGroupIdx", q.earlyWarningGroupIdx) + e.AddBool("isCongestedTriggered", q.isCongestedTriggered) + e.AddInt("congestedGroupIdx", q.congestedGroupIdx) return nil } @@ -216,13 +229,16 @@ type lossMeasurement struct { congestedConfig CongestionSignalConfig congestionMinLoss float64 - numGroups int - smallestGroupIdx int - ts *trafficStats + numGroups int + ts *trafficStats - earlyWarningWeightedLoss float64 - earlyWarningWeightedLossDone bool - congestedWeightedLoss float64 + isEarlyWarningGrouped bool + earlyWarningGroupIdx int + earlyWarningWeightedLoss float64 + + isCongestedGrouped bool + congestedGroupIdx int + congestedWeightedLoss float64 isSealed bool } @@ -250,19 +266,18 @@ func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) { return } - if l.smallestGroupIdx == 0 || l.smallestGroupIdx > groupIdx { - l.smallestGroupIdx = groupIdx - } - l.numGroups++ l.ts.Merge(pg.Traffic()) duration := l.ts.Duration() - if l.earlyWarningConfig.IsTriggered(l.numGroups, duration) && !l.earlyWarningWeightedLossDone { + if !l.isEarlyWarningGrouped && l.earlyWarningConfig.IsTriggered(l.numGroups, duration) { + l.isEarlyWarningGrouped = true + l.earlyWarningGroupIdx = groupIdx l.earlyWarningWeightedLoss = l.ts.WeightedLoss() - l.earlyWarningWeightedLossDone = true } - if l.congestedConfig.IsTriggered(l.numGroups, duration) { + if !l.isCongestedGrouped && l.congestedConfig.IsTriggered(l.numGroups, duration) { + l.isCongestedGrouped = true + l.congestedGroupIdx = groupIdx l.congestedWeightedLoss = l.ts.WeightedLoss() l.isSealed = true // can seal if congested thresholds are satisfied as those should be higher } @@ -272,27 +287,34 @@ func (l *lossMeasurement) IsSealed() bool { return l.isSealed } -func (l *lossMeasurement) SmallestGroupIdx() int { - return l.smallestGroupIdx -} - func (l *lossMeasurement) IsEarlyWarningTriggered() bool { return l.earlyWarningWeightedLoss > l.congestionMinLoss } +func (l *lossMeasurement) EarlyWarningGroupIdx() int { + return l.earlyWarningGroupIdx +} + func (l *lossMeasurement) IsCongestedTriggered() bool { return l.congestedWeightedLoss > l.congestionMinLoss } +func (l *lossMeasurement) CongestedGroupIdx() int { + return l.congestedGroupIdx +} + func (l *lossMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error { if l == nil { return nil } e.AddInt("numGroups", l.numGroups) - e.AddInt("smallestGroupIdx", l.smallestGroupIdx) e.AddObject("ts", l.ts) + e.AddBool("isEarlyWarningGrouped", l.isEarlyWarningGrouped) + e.AddInt("earlyWarningGroupIdx", l.earlyWarningGroupIdx) e.AddFloat64("earlyWarningWeightedLoss", l.earlyWarningWeightedLoss) + e.AddBool("isCongestedGrouped", l.isCongestedGrouped) + e.AddInt("congestedGroupIdx", l.congestedGroupIdx) e.AddFloat64("congestedWeightedLoss", l.congestedWeightedLoss) e.AddBool("isSealed", l.isSealed) e.AddBool("isEarlyWarningTriggered", l.IsEarlyWarningTriggered()) @@ -737,12 +759,12 @@ func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool, earlyWarningTriggered := qdMeasurement.IsEarlyWarningTriggered() if earlyWarningTriggered { earlyWarningReason = "queuing-delay" - oldestContributingGroup = qdMeasurement.SmallestGroupIdx() + oldestContributingGroup = qdMeasurement.EarlyWarningGroupIdx() } else { earlyWarningTriggered = lossMeasurement.IsEarlyWarningTriggered() if earlyWarningTriggered { earlyWarningReason = "loss" - oldestContributingGroup = lossMeasurement.SmallestGroupIdx() + oldestContributingGroup = lossMeasurement.EarlyWarningGroupIdx() } } @@ -750,12 +772,12 @@ func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool, congestedTriggered := qdMeasurement.IsCongestedTriggered() if congestedTriggered { congestedReason = "queuing-delay" - oldestContributingGroup = qdMeasurement.SmallestGroupIdx() + oldestContributingGroup = qdMeasurement.CongestedGroupIdx() } else { congestedTriggered = lossMeasurement.IsCongestedTriggered() if congestedTriggered { congestedReason = "loss" - oldestContributingGroup = lossMeasurement.SmallestGroupIdx() + oldestContributingGroup = lossMeasurement.CongestedGroupIdx() } } diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index 294029cc5..5222741fe 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -21,6 +21,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/protocol/logger" + "go.uber.org/zap/zapcore" ) const ( @@ -85,6 +86,19 @@ type extPacketMeta struct { extTimestamp uint64 } +func (epm *extPacketMeta) MarshalLogObject(e zapcore.ObjectEncoder) error { + if epm == nil { + return nil + } + + e.AddUint64("sourceSeqNo", epm.sourceSeqNo) + e.AddUint16("targetSeqNo", epm.targetSeqNo) + e.AddUint64("extSequenceNumber", epm.extSequenceNumber) + e.AddInt8("layer", epm.layer) + e.AddUint8("nacked", epm.nacked) + return nil +} + // Sequencer stores the packet sequence received by the down track type sequencer struct { sync.Mutex