Send side BWE: tighter contributing groups (#3245)

* WIP

* clean up

* debug

* epm log

* debug

* fmt

* clean up

* default no SSBWE

* clean up
This commit is contained in:
Raja Subramanian
2024-12-12 14:22:31 +05:30
committed by GitHub
parent 4b16017d09
commit 79eda6b72b
2 changed files with 80 additions and 44 deletions
+66 -44
View File
@@ -119,11 +119,17 @@ type qdMeasurement struct {
jqrMin int64
dqrMax int64
numGroups int
smallestGroupIdx int
minSendTime int64
maxSendTime int64
isSealed bool
numGroups int
minSendTime int64
maxSendTime int64
isSealed bool
isEarlyWarningTriggered bool
earlyWarningGroupIdx int
isCongestedTriggered bool
congestedGroupIdx int
}
func newQdMeasurement(
@@ -145,10 +151,6 @@ func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) {
return
}
if q.smallestGroupIdx == 0 || q.smallestGroupIdx > groupIdx {
q.smallestGroupIdx = groupIdx
}
pqd, pqdOk := pg.FinalizedPropagatedQueuingDelay()
if !pqdOk {
return
@@ -166,13 +168,19 @@ func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) {
if q.minSendTime == 0 || minSendTime < q.minSendTime {
q.minSendTime = minSendTime
}
if maxSendTime > q.maxSendTime {
q.maxSendTime = maxSendTime
}
q.maxSendTime = max(q.maxSendTime, maxSendTime)
}
// can seal if congested config thresholds are met as they are longer
if q.congestedConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) {
if !q.isEarlyWarningTriggered && q.earlyWarningConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) {
q.isEarlyWarningTriggered = true
q.earlyWarningGroupIdx = groupIdx
}
if !q.isCongestedTriggered && q.congestedConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) {
q.isCongestedTriggered = true
q.congestedGroupIdx = groupIdx
// can seal if congested config thresholds are met as they are longer
q.isSealed = true
}
}
@@ -181,16 +189,20 @@ func (q *qdMeasurement) IsSealed() bool {
return q.isSealed
}
func (q *qdMeasurement) SmallestGroupIdx() int {
return q.smallestGroupIdx
func (q *qdMeasurement) IsEarlyWarningTriggered() bool {
return q.isEarlyWarningTriggered
}
func (q *qdMeasurement) IsEarlyWarningTriggered() bool {
return q.earlyWarningConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime)
func (q *qdMeasurement) EarlyWarningGroupIdx() int {
return q.earlyWarningGroupIdx
}
func (q *qdMeasurement) IsCongestedTriggered() bool {
return q.congestedConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime)
return q.isCongestedTriggered
}
func (q *qdMeasurement) CongestedGroupIdx() int {
return q.congestedGroupIdx
}
func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
@@ -199,13 +211,14 @@ func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
}
e.AddInt("numGroups", q.numGroups)
e.AddInt("smallestGroupIdx", q.smallestGroupIdx)
e.AddInt64("minSendTime", q.minSendTime)
e.AddInt64("maxSendTime", q.maxSendTime)
e.AddDuration("duration", time.Duration((q.maxSendTime-q.minSendTime)*1000))
e.AddBool("isSealed", q.isSealed)
e.AddBool("isEarlyWarningTriggered", q.IsEarlyWarningTriggered())
e.AddBool("isCongestedTriggered", q.IsCongestedTriggered())
e.AddBool("isEarlyWarningTriggered", q.isEarlyWarningTriggered)
e.AddInt("earlyWarningGroupIdx", q.earlyWarningGroupIdx)
e.AddBool("isCongestedTriggered", q.isCongestedTriggered)
e.AddInt("congestedGroupIdx", q.congestedGroupIdx)
return nil
}
@@ -216,13 +229,16 @@ type lossMeasurement struct {
congestedConfig CongestionSignalConfig
congestionMinLoss float64
numGroups int
smallestGroupIdx int
ts *trafficStats
numGroups int
ts *trafficStats
earlyWarningWeightedLoss float64
earlyWarningWeightedLossDone bool
congestedWeightedLoss float64
isEarlyWarningGrouped bool
earlyWarningGroupIdx int
earlyWarningWeightedLoss float64
isCongestedGrouped bool
congestedGroupIdx int
congestedWeightedLoss float64
isSealed bool
}
@@ -250,19 +266,18 @@ func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) {
return
}
if l.smallestGroupIdx == 0 || l.smallestGroupIdx > groupIdx {
l.smallestGroupIdx = groupIdx
}
l.numGroups++
l.ts.Merge(pg.Traffic())
duration := l.ts.Duration()
if l.earlyWarningConfig.IsTriggered(l.numGroups, duration) && !l.earlyWarningWeightedLossDone {
if !l.isEarlyWarningGrouped && l.earlyWarningConfig.IsTriggered(l.numGroups, duration) {
l.isEarlyWarningGrouped = true
l.earlyWarningGroupIdx = groupIdx
l.earlyWarningWeightedLoss = l.ts.WeightedLoss()
l.earlyWarningWeightedLossDone = true
}
if l.congestedConfig.IsTriggered(l.numGroups, duration) {
if !l.isCongestedGrouped && l.congestedConfig.IsTriggered(l.numGroups, duration) {
l.isCongestedGrouped = true
l.congestedGroupIdx = groupIdx
l.congestedWeightedLoss = l.ts.WeightedLoss()
l.isSealed = true // can seal if congested thresholds are satisfied as those should be higher
}
@@ -272,27 +287,34 @@ func (l *lossMeasurement) IsSealed() bool {
return l.isSealed
}
func (l *lossMeasurement) SmallestGroupIdx() int {
return l.smallestGroupIdx
}
func (l *lossMeasurement) IsEarlyWarningTriggered() bool {
return l.earlyWarningWeightedLoss > l.congestionMinLoss
}
func (l *lossMeasurement) EarlyWarningGroupIdx() int {
return l.earlyWarningGroupIdx
}
func (l *lossMeasurement) IsCongestedTriggered() bool {
return l.congestedWeightedLoss > l.congestionMinLoss
}
func (l *lossMeasurement) CongestedGroupIdx() int {
return l.congestedGroupIdx
}
func (l *lossMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
if l == nil {
return nil
}
e.AddInt("numGroups", l.numGroups)
e.AddInt("smallestGroupIdx", l.smallestGroupIdx)
e.AddObject("ts", l.ts)
e.AddBool("isEarlyWarningGrouped", l.isEarlyWarningGrouped)
e.AddInt("earlyWarningGroupIdx", l.earlyWarningGroupIdx)
e.AddFloat64("earlyWarningWeightedLoss", l.earlyWarningWeightedLoss)
e.AddBool("isCongestedGrouped", l.isCongestedGrouped)
e.AddInt("congestedGroupIdx", l.congestedGroupIdx)
e.AddFloat64("congestedWeightedLoss", l.congestedWeightedLoss)
e.AddBool("isSealed", l.isSealed)
e.AddBool("isEarlyWarningTriggered", l.IsEarlyWarningTriggered())
@@ -737,12 +759,12 @@ func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool,
earlyWarningTriggered := qdMeasurement.IsEarlyWarningTriggered()
if earlyWarningTriggered {
earlyWarningReason = "queuing-delay"
oldestContributingGroup = qdMeasurement.SmallestGroupIdx()
oldestContributingGroup = qdMeasurement.EarlyWarningGroupIdx()
} else {
earlyWarningTriggered = lossMeasurement.IsEarlyWarningTriggered()
if earlyWarningTriggered {
earlyWarningReason = "loss"
oldestContributingGroup = lossMeasurement.SmallestGroupIdx()
oldestContributingGroup = lossMeasurement.EarlyWarningGroupIdx()
}
}
@@ -750,12 +772,12 @@ func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool,
congestedTriggered := qdMeasurement.IsCongestedTriggered()
if congestedTriggered {
congestedReason = "queuing-delay"
oldestContributingGroup = qdMeasurement.SmallestGroupIdx()
oldestContributingGroup = qdMeasurement.CongestedGroupIdx()
} else {
congestedTriggered = lossMeasurement.IsCongestedTriggered()
if congestedTriggered {
congestedReason = "loss"
oldestContributingGroup = lossMeasurement.SmallestGroupIdx()
oldestContributingGroup = lossMeasurement.CongestedGroupIdx()
}
}
+14
View File
@@ -21,6 +21,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/protocol/logger"
"go.uber.org/zap/zapcore"
)
const (
@@ -85,6 +86,19 @@ type extPacketMeta struct {
extTimestamp uint64
}
func (epm *extPacketMeta) MarshalLogObject(e zapcore.ObjectEncoder) error {
if epm == nil {
return nil
}
e.AddUint64("sourceSeqNo", epm.sourceSeqNo)
e.AddUint16("targetSeqNo", epm.targetSeqNo)
e.AddUint64("extSequenceNumber", epm.extSequenceNumber)
e.AddInt8("layer", epm.layer)
e.AddUint8("nacked", epm.nacked)
return nil
}
// Sequencer stores the packet sequence received by the down track
type sequencer struct {
sync.Mutex