Dampen oscillations in loss based congestion detection when using TWCC. (#3256)

* Rework congestion detection state machine

* WIP

* fmt

* clean up

* revert config
This commit is contained in:
Raja Subramanian
2024-12-16 15:26:23 +05:30
committed by GitHub
parent 699cd9c26c
commit 192ecbfc88
8 changed files with 222 additions and 193 deletions

View File

@@ -15,6 +15,7 @@
package sendsidebwe
import (
"fmt"
"sync"
"time"
@@ -45,7 +46,7 @@ var (
defaultLossEarlyWarningCongestionSignalConfig = CongestionSignalConfig{
MinNumberOfGroups: 2,
MinDuration: 200 * time.Millisecond,
MinDuration: 300 * time.Millisecond,
}
defaultQueuingDelayCongestedCongestionSignalConfig = CongestionSignalConfig{
@@ -54,8 +55,8 @@ var (
}
defaultLossCongestedCongestionSignalConfig = CongestionSignalConfig{
MinNumberOfGroups: 5,
MinDuration: 600 * time.Millisecond,
MinNumberOfGroups: 6,
MinDuration: 900 * time.Millisecond,
}
)
@@ -68,8 +69,9 @@ type ProbeSignalConfig struct {
JQRMinDelay time.Duration `yaml:"jqr_min_delay,omitempty"`
DQRMaxDelay time.Duration `yaml:"dqr_max_delay,omitempty"`
WeightedLoss WeightedLossConfig `yaml:"weighted_loss,omitempty"`
CongestionMinWeightedLoss float64 `yaml:"congestion_min_weighted_loss,omitempty"`
WeightedLoss WeightedLossConfig `yaml:"weighted_loss,omitempty"`
JQRMinWeightedLoss float64 `yaml:"jqr_min_weighted_loss,omitempty"`
DQRMaxWeightedLoss float64 `yaml:"dqr_max_weighted_loss,omitempty"`
}
func (p ProbeSignalConfig) IsValid(pci ccutils.ProbeClusterInfo) bool {
@@ -83,15 +85,11 @@ func (p ProbeSignalConfig) ProbeSignal(ppg *probePacketGroup) (ccutils.ProbeSign
ts.Merge(ppg.Traffic())
pqd := ppg.PropagatedQueuingDelay()
if pqd > p.JQRMinDelay.Microseconds() {
if pqd > p.JQRMinDelay.Microseconds() || ts.WeightedLoss() > p.JQRMinWeightedLoss {
return ccutils.ProbeSignalCongesting, ts.AcknowledgedBitrate()
}
if ts.WeightedLoss() > p.CongestionMinWeightedLoss {
return ccutils.ProbeSignalCongesting, ts.AcknowledgedBitrate()
}
if pqd < p.DQRMaxDelay.Microseconds() {
if pqd < p.DQRMaxDelay.Microseconds() && ts.WeightedLoss() < p.DQRMaxWeightedLoss {
return ccutils.ProbeSignalNotCongesting, ts.AcknowledgedBitrate()
}
@@ -106,43 +104,58 @@ var (
JQRMinDelay: 15 * time.Millisecond,
DQRMaxDelay: 5 * time.Millisecond,
WeightedLoss: defaultWeightedLossConfig,
CongestionMinWeightedLoss: 0.25,
WeightedLoss: defaultWeightedLossConfig,
JQRMinWeightedLoss: 0.25,
DQRMaxWeightedLoss: 0.1,
}
)
// -------------------------------------------------------------------------------
type queuingRegion int
const (
queuingRegionDQR queuingRegion = iota
queuingRegionIndeterminate
queuingRegionJQR
)
func (q queuingRegion) String() string {
switch q {
case queuingRegionDQR:
return "DQR"
case queuingRegionIndeterminate:
return "INDETERMINATE"
case queuingRegionJQR:
return "JQR"
default:
return fmt.Sprintf("%d", int(q))
}
}
// -------------------------------------------------------------------------------
type qdMeasurement struct {
earlyWarningConfig CongestionSignalConfig
congestedConfig CongestionSignalConfig
jqrMin int64
dqrMax int64
config CongestionSignalConfig
jqrMin int64
dqrMax int64
numGroups int
minSendTime int64
maxSendTime int64
isSealed bool
isSealed bool
sealedGroupIdx int
isEarlyWarningTriggered bool
earlyWarningGroupIdx int
isCongestedTriggered bool
congestedGroupIdx int
queuingRegion queuingRegion
}
func newQdMeasurement(
earlyWarningConfig CongestionSignalConfig,
congestedConfig CongestionSignalConfig,
jqrMin int64,
dqrMax int64,
) *qdMeasurement {
func newQdMeasurement(config CongestionSignalConfig, jqrMin int64, dqrMax int64) *qdMeasurement {
return &qdMeasurement{
earlyWarningConfig: earlyWarningConfig,
congestedConfig: congestedConfig,
jqrMin: jqrMin,
dqrMax: dqrMax,
config: config,
jqrMin: jqrMin,
dqrMax: dqrMax,
queuingRegion: queuingRegionIndeterminate,
}
}
@@ -159,6 +172,8 @@ func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) {
if pqd < q.dqrMax {
// a DQR breaks continuity
q.isSealed = true
q.sealedGroupIdx = groupIdx
q.queuingRegion = queuingRegionDQR
return
}
@@ -171,17 +186,10 @@ func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) {
q.maxSendTime = max(q.maxSendTime, maxSendTime)
}
if !q.isEarlyWarningTriggered && q.earlyWarningConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) {
q.isEarlyWarningTriggered = true
q.earlyWarningGroupIdx = groupIdx
}
if !q.isCongestedTriggered && q.congestedConfig.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) {
q.isCongestedTriggered = true
q.congestedGroupIdx = groupIdx
// can seal if congested config thresholds are met as they are longer
if q.config.IsTriggered(q.numGroups, q.maxSendTime-q.minSendTime) {
q.isSealed = true
q.sealedGroupIdx = groupIdx
q.queuingRegion = queuingRegionJQR
}
}
@@ -189,20 +197,8 @@ func (q *qdMeasurement) IsSealed() bool {
return q.isSealed
}
func (q *qdMeasurement) IsEarlyWarningTriggered() bool {
return q.isEarlyWarningTriggered
}
func (q *qdMeasurement) EarlyWarningGroupIdx() int {
return q.earlyWarningGroupIdx
}
func (q *qdMeasurement) IsCongestedTriggered() bool {
return q.isCongestedTriggered
}
func (q *qdMeasurement) CongestedGroupIdx() int {
return q.congestedGroupIdx
func (q *qdMeasurement) Result() (queuingRegion, int) {
return q.queuingRegion, q.sealedGroupIdx
}
func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
@@ -215,49 +211,45 @@ func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddInt64("maxSendTime", q.maxSendTime)
e.AddDuration("duration", time.Duration((q.maxSendTime-q.minSendTime)*1000))
e.AddBool("isSealed", q.isSealed)
e.AddBool("isEarlyWarningTriggered", q.isEarlyWarningTriggered)
e.AddInt("earlyWarningGroupIdx", q.earlyWarningGroupIdx)
e.AddBool("isCongestedTriggered", q.isCongestedTriggered)
e.AddInt("congestedGroupIdx", q.congestedGroupIdx)
e.AddInt("sealedGroupIdx", q.sealedGroupIdx)
e.AddString("queuingRegion", q.queuingRegion.String())
return nil
}
// -------------------------------------------------------------------------------
type lossMeasurement struct {
earlyWarningConfig CongestionSignalConfig
congestedConfig CongestionSignalConfig
congestionMinLoss float64
config CongestionSignalConfig
jqrMinLoss float64
dqrMaxLoss float64
numGroups int
ts *trafficStats
isEarlyWarningGrouped bool
earlyWarningGroupIdx int
earlyWarningWeightedLoss float64
isSealed bool
sealedGroupIdx int
isCongestedGrouped bool
congestedGroupIdx int
congestedWeightedLoss float64
weightedLoss float64
isSealed bool
queuingRegion queuingRegion
}
func newLossMeasurement(
earlyWarningConfig CongestionSignalConfig,
congestedConfig CongestionSignalConfig,
config CongestionSignalConfig,
weightedLossConfig WeightedLossConfig,
congestionMinLoss float64,
jqrMinLoss float64,
dqrMaxLoss float64,
logger logger.Logger,
) *lossMeasurement {
return &lossMeasurement{
earlyWarningConfig: earlyWarningConfig,
congestedConfig: congestedConfig,
congestionMinLoss: congestionMinLoss,
config: config,
jqrMinLoss: jqrMinLoss,
dqrMaxLoss: dqrMaxLoss,
ts: newTrafficStats(trafficStatsParams{
Config: weightedLossConfig,
Logger: logger,
}),
queuingRegion: queuingRegionIndeterminate,
}
}
@@ -270,16 +262,16 @@ func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) {
l.ts.Merge(pg.Traffic())
duration := l.ts.Duration()
if !l.isEarlyWarningGrouped && l.earlyWarningConfig.IsTriggered(l.numGroups, duration) {
l.isEarlyWarningGrouped = true
l.earlyWarningGroupIdx = groupIdx
l.earlyWarningWeightedLoss = l.ts.WeightedLoss()
}
if !l.isCongestedGrouped && l.congestedConfig.IsTriggered(l.numGroups, duration) {
l.isCongestedGrouped = true
l.congestedGroupIdx = groupIdx
l.congestedWeightedLoss = l.ts.WeightedLoss()
l.isSealed = true // can seal if congested thresholds are satisfied as those should be higher
if l.config.IsTriggered(l.numGroups, duration) {
l.isSealed = true
l.sealedGroupIdx = groupIdx
l.weightedLoss = l.ts.WeightedLoss()
if l.weightedLoss < l.dqrMaxLoss {
l.queuingRegion = queuingRegionDQR
} else if l.weightedLoss > l.jqrMinLoss {
l.queuingRegion = queuingRegionJQR
}
}
}
@@ -287,20 +279,8 @@ func (l *lossMeasurement) IsSealed() bool {
return l.isSealed
}
func (l *lossMeasurement) IsEarlyWarningTriggered() bool {
return l.earlyWarningWeightedLoss > l.congestionMinLoss
}
func (l *lossMeasurement) EarlyWarningGroupIdx() int {
return l.earlyWarningGroupIdx
}
func (l *lossMeasurement) IsCongestedTriggered() bool {
return l.congestedWeightedLoss > l.congestionMinLoss
}
func (l *lossMeasurement) CongestedGroupIdx() int {
return l.congestedGroupIdx
func (l *lossMeasurement) Result() (queuingRegion, int) {
return l.queuingRegion, l.sealedGroupIdx
}
func (l *lossMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
@@ -310,15 +290,10 @@ func (l *lossMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddInt("numGroups", l.numGroups)
e.AddObject("ts", l.ts)
e.AddBool("isEarlyWarningGrouped", l.isEarlyWarningGrouped)
e.AddInt("earlyWarningGroupIdx", l.earlyWarningGroupIdx)
e.AddFloat64("earlyWarningWeightedLoss", l.earlyWarningWeightedLoss)
e.AddBool("isCongestedGrouped", l.isCongestedGrouped)
e.AddInt("congestedGroupIdx", l.congestedGroupIdx)
e.AddFloat64("congestedWeightedLoss", l.congestedWeightedLoss)
e.AddBool("isSealed", l.isSealed)
e.AddBool("isEarlyWarningTriggered", l.IsEarlyWarningTriggered())
e.AddBool("isCongestedTriggered", l.IsCongestedTriggered())
e.AddInt("sealedGroupIdx", l.sealedGroupIdx)
e.AddFloat64("weightedLoss", l.weightedLoss)
e.AddString("queuingRegion", l.queuingRegion.String())
return nil
}
@@ -335,8 +310,9 @@ type CongestionDetectorConfig struct {
JQRMinDelay time.Duration `yaml:"jqr_min_delay,omitempty"`
DQRMaxDelay time.Duration `yaml:"dqr_max_delay,omitempty"`
WeightedLoss WeightedLossConfig `yaml:"weighted_loss,omitempty"`
CongestionMinWeightedLoss float64 `yaml:"congestion_min_weighted_loss,omitempty"`
WeightedLoss WeightedLossConfig `yaml:"weighted_loss,omitempty"`
JQRMinWeightedLoss float64 `yaml:"jqr_min_weighted_loss,omitempty"`
DQRMaxWeightedLoss float64 `yaml:"dqr_max_weighted_loss,omitempty"`
QueuingDelayEarlyWarning CongestionSignalConfig `yaml:"queuing_delay_early_warning,omitempty"`
LossEarlyWarning CongestionSignalConfig `yaml:"loss_early_warning,omitempty"`
@@ -368,7 +344,7 @@ var (
defaultCongestionDetectorConfig = CongestionDetectorConfig{
PacketGroup: defaultPacketGroupConfig,
PacketGroupMaxAge: 10 * time.Second,
PacketGroupMaxAge: 5 * time.Second,
ProbePacketGroup: defaultProbePacketGroupConfig,
ProbeRegulator: ccutils.DefaultProbeRegulatorConfig,
@@ -377,8 +353,9 @@ var (
JQRMinDelay: 15 * time.Millisecond,
DQRMaxDelay: 5 * time.Millisecond,
WeightedLoss: defaultWeightedLossConfig,
CongestionMinWeightedLoss: 0.25,
WeightedLoss: defaultWeightedLossConfig,
JQRMinWeightedLoss: 0.25,
DQRMaxWeightedLoss: 0.1,
QueuingDelayEarlyWarning: defaultQueuingDelayEarlyWarningCongestionSignalConfig,
LossEarlyWarning: defaultLossEarlyWarningCongestionSignalConfig,
@@ -671,6 +648,23 @@ func (c *congestionDetector) ProbeClusterDone(pci ccutils.ProbeClusterInfo) {
}
}
func (c *congestionDetector) ProbeClusterIsGoalReached() bool {
c.lock.Lock()
defer c.lock.Unlock()
if c.probePacketGroup == nil || c.congestionState != bwe.CongestionStateNone {
return false
}
pci := c.probePacketGroup.ProbeClusterInfo()
if !c.params.Config.ProbeSignal.IsValid(pci) {
return false
}
probeSignal, estimatedAvailableChannelCapacity := c.params.Config.ProbeSignal.ProbeSignal(c.probePacketGroup)
return probeSignal != ccutils.ProbeSignalNotCongesting && estimatedAvailableChannelCapacity > int64(pci.Goal.DesiredBps)
}
func (c *congestionDetector) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) {
c.lock.Lock()
defer c.lock.Unlock()
@@ -720,18 +714,21 @@ func (c *congestionDetector) prunePacketGroups() {
}
}
func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool, string, int) {
func (c *congestionDetector) getCongestionSignal(
stage string,
qdConfig CongestionSignalConfig,
lossConfig CongestionSignalConfig,
) (queuingRegion, string, int) {
qdMeasurement := newQdMeasurement(
c.params.Config.QueuingDelayEarlyWarning,
c.params.Config.QueuingDelayCongested,
qdConfig,
c.params.Config.JQRMinDelay.Microseconds(),
c.params.Config.DQRMaxDelay.Microseconds(),
)
lossMeasurement := newLossMeasurement(
c.params.Config.LossEarlyWarning,
c.params.Config.LossCongested,
lossConfig,
c.params.Config.WeightedLoss,
c.params.Config.CongestionMinWeightedLoss,
c.params.Config.JQRMinWeightedLoss,
c.params.Config.DQRMaxWeightedLoss,
c.params.Logger,
)
@@ -745,97 +742,88 @@ func (c *congestionDetector) isCongestionSignalTriggered() (bool, string, bool,
if qdMeasurement.IsSealed() && lossMeasurement.IsSealed() {
break
}
// if "congested" triggered, can stop as that is the longer duration check and also
// the worst case check, i. e. if "congested" is triggered due to any condition,
// there can be nothing else that can trigger
if qdMeasurement.IsCongestedTriggered() || lossMeasurement.IsCongestedTriggered() {
break
}
}
oldestContributingGroup := max(0, idx)
earlyWarningReason := ""
earlyWarningTriggered := qdMeasurement.IsEarlyWarningTriggered()
if earlyWarningTriggered {
earlyWarningReason = "queuing-delay"
oldestContributingGroup = qdMeasurement.EarlyWarningGroupIdx()
c.params.Logger.Debugw("send side bwe: early warning queuing-delay", "qd", qdMeasurement)
reason := ""
qr, groupIdx := qdMeasurement.Result()
if qr == queuingRegionJQR {
reason = "queuing-delay"
oldestContributingGroup = groupIdx
c.params.Logger.Debugw("send side bwe: queuing-delay in JQR", "stage", stage, "qd", qdMeasurement)
} else {
earlyWarningTriggered = lossMeasurement.IsEarlyWarningTriggered()
if earlyWarningTriggered {
earlyWarningReason = "loss"
oldestContributingGroup = lossMeasurement.EarlyWarningGroupIdx()
c.params.Logger.Debugw("send side bwe: early warning loss", "loss", lossMeasurement)
qr, groupIdx = lossMeasurement.Result()
if qr == queuingRegionJQR {
reason = "loss"
oldestContributingGroup = groupIdx
c.params.Logger.Debugw("send side bwe: loss in JQR", "stage", stage, "loss", lossMeasurement)
}
}
congestedReason := ""
congestedTriggered := qdMeasurement.IsCongestedTriggered()
if congestedTriggered {
congestedReason = "queuing-delay"
oldestContributingGroup = qdMeasurement.CongestedGroupIdx()
c.params.Logger.Debugw("send side bwe: congested queuing-delay", "qd", qdMeasurement)
} else {
congestedTriggered = lossMeasurement.IsCongestedTriggered()
if congestedTriggered {
congestedReason = "loss"
oldestContributingGroup = lossMeasurement.CongestedGroupIdx()
c.params.Logger.Debugw("send side bwe: congested loss", "loss", lossMeasurement)
}
}
return qr, reason, oldestContributingGroup
}
return earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, oldestContributingGroup
func (c *congestionDetector) getEarlyWarningSignal() (queuingRegion, string, int) {
return c.getCongestionSignal(
"early-warning",
c.params.Config.QueuingDelayEarlyWarning,
c.params.Config.LossEarlyWarning,
)
}
func (c *congestionDetector) getCongestedSignal() (queuingRegion, string, int) {
return c.getCongestionSignal(
"congested",
c.params.Config.QueuingDelayCongested,
c.params.Config.LossCongested,
)
}
func (c *congestionDetector) congestionDetectionStateMachine() (bool, bwe.CongestionState, int64) {
state := c.congestionState
newState := c.congestionState
reason := ""
earlyWarningTriggered, earlyWarningReason, congestedTriggered, congestedReason, oldestContributingGroup := c.isCongestionSignalTriggered()
var (
qr queuingRegion
reason string
oldestContributingGroup int
)
switch state {
case bwe.CongestionStateNone:
if congestedTriggered && !earlyWarningTriggered {
c.params.Logger.Warnw("send side bwe: invalid congested state transition", nil, "from", state, "reason", congestedReason)
}
if earlyWarningTriggered {
qr, reason, oldestContributingGroup = c.getEarlyWarningSignal()
if qr == queuingRegionJQR {
newState = bwe.CongestionStateEarlyWarning
reason = earlyWarningReason
}
case bwe.CongestionStateEarlyWarning:
if congestedTriggered {
qr, reason, oldestContributingGroup = c.getCongestedSignal()
if qr == queuingRegionJQR {
newState = bwe.CongestionStateCongested
reason = congestedReason
} else if !earlyWarningTriggered {
newState = bwe.CongestionStateEarlyWarningHangover
} else {
qr, _, _ := c.getEarlyWarningSignal()
if qr == queuingRegionDQR {
newState = bwe.CongestionStateEarlyWarningHangover
}
}
case bwe.CongestionStateEarlyWarningHangover:
if congestedTriggered && !earlyWarningTriggered {
c.params.Logger.Warnw("send side bwe: invalid congested state transition", nil, "from", state, "reason", congestedReason)
}
if earlyWarningTriggered {
qr, reason, oldestContributingGroup = c.getEarlyWarningSignal()
if qr == queuingRegionJQR {
newState = bwe.CongestionStateEarlyWarning
reason = earlyWarningReason
} else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.EarlyWarningHangover {
newState = bwe.CongestionStateNone
}
case bwe.CongestionStateCongested:
if !congestedTriggered {
qr, _, _ = c.getCongestedSignal()
if qr == queuingRegionDQR {
newState = bwe.CongestionStateCongestedHangover
}
case bwe.CongestionStateCongestedHangover:
if congestedTriggered && !earlyWarningTriggered {
c.params.Logger.Warnw("send side bwe: invalid congested state transition", nil, "from", state, "reason", congestedReason)
}
if earlyWarningTriggered {
qr, reason, oldestContributingGroup = c.getEarlyWarningSignal()
if qr == queuingRegionJQR {
newState = bwe.CongestionStateEarlyWarning
reason = earlyWarningReason
} else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.CongestedHangover {
newState = bwe.CongestionStateNone
}

View File

@@ -87,6 +87,10 @@ func (p *probePacketGroup) ProbeClusterDone(pci ccutils.ProbeClusterInfo) {
p.doneAt = mono.Now()
}
func (p *probePacketGroup) ProbeClusterInfo() ccutils.ProbeClusterInfo {
return p.pci
}
func (p *probePacketGroup) MaybeFinalizeProbe(maxSequenceNumber uint64, rtt float64) (ccutils.ProbeClusterInfo, bool) {
if p.doneAt.IsZero() {
return ccutils.ProbeClusterInfoInvalid, false
@@ -111,7 +115,7 @@ func (p *probePacketGroup) MaybeFinalizeProbe(maxSequenceNumber uint64, rtt floa
}
func (p *probePacketGroup) Add(pi *packetInfo, sendDelta, recvDelta int64, isLost bool) error {
if !p.doneAt.IsZero() || pi.probeClusterId != p.pci.Id {
if pi.probeClusterId != p.pci.Id {
return nil
}

View File

@@ -130,6 +130,10 @@ func (s *SendSideBWE) ProbeClusterDone(pci ccutils.ProbeClusterInfo) {
s.congestionDetector.ProbeClusterDone(pci)
}
func (s *SendSideBWE) ProbeClusterIsGoalReached() bool {
return s.congestionDetector.ProbeClusterIsGoalReached()
}
func (s *SendSideBWE) ProbeClusterFinalize() (ccutils.ProbeSignal, int64, bool) {
return s.congestionDetector.ProbeClusterFinalize()
}

View File

@@ -165,10 +165,14 @@ func (ts *trafficStats) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddInt64("recvDelta", ts.recvDelta)
e.AddInt64("groupDelay", ts.recvDelta-ts.sendDelta)
e.AddFloat64("weightedLoss", ts.WeightedLoss())
if (ts.ackedPackets + ts.lostPackets) != 0 {
e.AddFloat64("rawLoss", float64(ts.lostPackets)/float64(ts.ackedPackets+ts.lostPackets))
totalPackets := ts.lostPackets + ts.ackedPackets
if duration != 0 {
e.AddFloat64("pps", float64(totalPackets)/duration.Seconds())
}
if (totalPackets) != 0 {
e.AddFloat64("rawLoss", float64(ts.lostPackets)/float64(totalPackets))
}
e.AddFloat64("weightedLoss", ts.WeightedLoss())
e.AddInt64("lossPenalty", ts.lossPenalty())
capturedTrafficRatio := ts.CapturedTrafficRatio()

View File

@@ -349,12 +349,15 @@ func (p ProbeClusterGoal) MarshalLogObject(e zapcore.ObjectEncoder) error {
}
type ProbeClusterResult struct {
StartTime int64
EndTime int64
BytesProbe int
BytesNonProbePrimary int
BytesNonProbeRTX int
IsCompleted bool
StartTime int64
EndTime int64
PacketsProbe int
BytesProbe int
PacketsNonProbePrimary int
BytesNonProbePrimary int
PacketsNonProbeRTX int
BytesNonProbeRTX int
IsCompleted bool
}
func (p ProbeClusterResult) Bytes() int {
@@ -378,8 +381,11 @@ func (p ProbeClusterResult) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddTime("StartTime", time.Unix(0, p.StartTime))
e.AddTime("EndTime", time.Unix(0, p.EndTime))
e.AddDuration("Duration", p.Duration())
e.AddInt("PacketsProbe", p.PacketsProbe)
e.AddInt("BytesProbe", p.BytesProbe)
e.AddInt("PacketsNonProbePrimary", p.PacketsNonProbePrimary)
e.AddInt("BytesNonProbePrimary", p.BytesNonProbePrimary)
e.AddInt("PacketsNonProbeRTX", p.PacketsNonProbeRTX)
e.AddInt("BytesNonProbeRTX", p.BytesNonProbeRTX)
e.AddInt("Bytes", p.Bytes())
e.AddFloat64("Bitrate", p.Bitrate())

View File

@@ -1817,7 +1817,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
}
case *rtcp.TransportLayerCC:
if p.MediaSSRC == d.ssrc || (d.ssrcRTX != 0 && p.MediaSSRC == d.ssrcRTX) {
if p.MediaSSRC == d.ssrc {
if sal := d.getStreamAllocatorListener(); sal != nil {
sal.OnTransportCCFeedback(d, p)
}
@@ -1885,6 +1885,13 @@ func (d *DownTrack) handleRTCPRTX(bytes []byte) {
d.rtpStatsRTX.UpdateFromReceiverReport(r)
}
case *rtcp.TransportLayerCC:
if p.MediaSSRC == d.ssrcRTX {
if sal := d.getStreamAllocatorListener(); sal != nil {
sal.OnTransportCCFeedback(d, p)
}
}
}
}
}

View File

@@ -112,11 +112,14 @@ func (po *ProbeObserver) RecordPacket(size int, isRTX bool, probeClusterId ccuti
}
if isProbe {
po.pci.Result.PacketsProbe++
po.pci.Result.BytesProbe += size
} else {
if isRTX {
po.pci.Result.PacketsNonProbeRTX++
po.pci.Result.BytesNonProbeRTX += size
} else {
po.pci.Result.PacketsNonProbePrimary++
po.pci.Result.BytesNonProbePrimary += size
}
}

View File

@@ -83,6 +83,7 @@ const (
streamAllocatorSignalAllocateAllTracks
streamAllocatorSignalAdjustState
streamAllocatorSignalEstimate
streamAllocatorSignalFeedback
streamAllocatorSignalPeriodicPing
streamAllocatorSignalProbeClusterSwitch
streamAllocatorSignalSendProbe
@@ -103,6 +104,8 @@ func (s streamAllocatorSignal) String() string {
return "ADJUST_STATE"
case streamAllocatorSignalEstimate:
return "ESTIMATE"
case streamAllocatorSignalFeedback:
return "FEEDBACK"
case streamAllocatorSignalPeriodicPing:
return "PERIODIC_PING"
case streamAllocatorSignalProbeClusterSwitch:
@@ -435,11 +438,10 @@ func (s *StreamAllocator) OnREMB(downTrack *sfu.DownTrack, remb *rtcp.ReceiverEs
// called when a new transport-cc feedback is received
func (s *StreamAllocator) OnTransportCCFeedback(downTrack *sfu.DownTrack, fb *rtcp.TransportLayerCC) {
if s.sendSideBWEInterceptor != nil {
s.sendSideBWEInterceptor.WriteRTCP([]rtcp.Packet{fb}, nil)
}
s.params.BWE.HandleTWCCFeedback(fb)
s.postEvent(Event{
Signal: streamAllocatorSignalFeedback,
Data: fb,
})
}
// called when target bitrate changes (send side bandwidth estimation)
@@ -612,6 +614,8 @@ func (s *StreamAllocator) postEvent(event Event) {
event.handleSignalAdjustState(event)
case streamAllocatorSignalEstimate:
event.handleSignalEstimate(event)
case streamAllocatorSignalFeedback:
event.handleSignalFeedback(event)
case streamAllocatorSignalPeriodicPing:
event.handleSignalPeriodicPing(event)
case streamAllocatorSignalProbeClusterSwitch:
@@ -660,7 +664,7 @@ func (s *StreamAllocator) handleSignalAdjustState(Event) {
}
func (s *StreamAllocator) handleSignalEstimate(event Event) {
receivedEstimate, _ := event.Data.(int64)
receivedEstimate := event.Data.(int64)
// always update NACKs
packetDelta, repeatedNackDelta := s.getNackDelta()
@@ -673,6 +677,15 @@ func (s *StreamAllocator) handleSignalEstimate(event Event) {
)
}
func (s *StreamAllocator) handleSignalFeedback(event Event) {
fb := event.Data.(*rtcp.TransportLayerCC)
if s.sendSideBWEInterceptor != nil {
s.sendSideBWEInterceptor.WriteRTCP([]rtcp.Packet{fb}, nil)
}
s.params.BWE.HandleTWCCFeedback(fb)
}
func (s *StreamAllocator) handleSignalPeriodicPing(Event) {
// if pause is allowed, there may be no packets sent and BWE could be congested state,
// reset BWE if that persists for a while