TWCC tweaks (#3282)

* SSBWE experimentation

* dqr hysteresis

* fixes

* fmt

* more relaxed DQR

* pps proportional to duration

* clean up

* clean up

* don't need gratuitous up allocation
This commit is contained in:
Raja Subramanian
2024-12-21 23:08:38 +05:30
committed by GitHub
parent 8fa1127724
commit 6e9964e80b
5 changed files with 185 additions and 145 deletions
-6
View File
@@ -36,9 +36,7 @@ type CongestionState int
const (
CongestionStateNone CongestionState = iota
CongestionStateEarlyWarning
CongestionStateEarlyWarningHangover
CongestionStateCongested
CongestionStateCongestedHangover
)
func (c CongestionState) String() string {
@@ -47,12 +45,8 @@ func (c CongestionState) String() string {
return "NONE"
case CongestionStateEarlyWarning:
return "EARLY_WARNING"
case CongestionStateEarlyWarningHangover:
return "EARLY_WARNING_HANGOVER"
case CongestionStateCongested:
return "CONGESTED"
case CongestionStateCongestedHangover:
return "CONGESTED_HANGOVER"
default:
return fmt.Sprintf("%d", int(c))
}
+10 -21
View File
@@ -27,22 +27,20 @@ import (
// ---------------------------------------------------------------------------
type RemoteBWEConfig struct {
NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"`
ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"`
ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"`
ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"`
CongestedHangoverDuration time.Duration `yaml:"congested_hangover_duration,omitempty"`
ProbeController ProbeControllerConfig `yaml:"probe_controller,omitempty"`
NackRatioAttenuator float64 `yaml:"nack_ratio_attenuator,omitempty"`
ExpectedUsageThreshold float64 `yaml:"expected_usage_threshold,omitempty"`
ChannelObserverProbe ChannelObserverConfig `yaml:"channel_observer_probe,omitempty"`
ChannelObserverNonProbe ChannelObserverConfig `yaml:"channel_observer_non_probe,omitempty"`
ProbeController ProbeControllerConfig `yaml:"probe_controller,omitempty"`
}
var (
DefaultRemoteBWEConfig = RemoteBWEConfig{
NackRatioAttenuator: 0.4,
ExpectedUsageThreshold: 0.95,
ChannelObserverProbe: defaultChannelObserverConfigProbe,
ChannelObserverNonProbe: defaultChannelObserverConfigNonProbe,
CongestedHangoverDuration: 3 * time.Second,
ProbeController: DefaultProbeControllerConfig,
NackRatioAttenuator: 0.4,
ExpectedUsageThreshold: 0.95,
ChannelObserverProbe: defaultChannelObserverConfigProbe,
ChannelObserverNonProbe: defaultChannelObserverConfigNonProbe,
ProbeController: DefaultProbeControllerConfig,
}
)
@@ -177,15 +175,6 @@ func (r *RemoteBWE) congestionDetectionStateMachine() (bool, bwe.CongestionState
update = true
}
} else {
toState = bwe.CongestionStateCongestedHangover
}
case bwe.CongestionStateCongestedHangover:
if trend == channelTrendCongesting {
if r.estimateAvailableChannelCapacity(reason) {
toState = bwe.CongestionStateCongested
}
} else if time.Since(r.congestionStateSwitchedAt) >= r.params.Config.CongestedHangoverDuration {
toState = bwe.CongestionStateNone
}
}
+160 -101
View File
@@ -39,22 +39,42 @@ func (c CongestionSignalConfig) IsTriggered(numGroups int, duration int64) bool
}
var (
defaultQueuingDelayEarlyWarningCongestionSignalConfig = CongestionSignalConfig{
defaultQueuingDelayEarlyWarningJQRConfig = CongestionSignalConfig{
MinNumberOfGroups: 2,
MinDuration: 200 * time.Millisecond,
}
defaultLossEarlyWarningCongestionSignalConfig = CongestionSignalConfig{
defaultQueuingDelayEarlyWarningDQRConfig = CongestionSignalConfig{
MinNumberOfGroups: 3,
MinDuration: 300 * time.Millisecond,
}
defaultQueuingDelayCongestedCongestionSignalConfig = CongestionSignalConfig{
defaultLossEarlyWarningJQRConfig = CongestionSignalConfig{
MinNumberOfGroups: 3,
MinDuration: 300 * time.Millisecond,
}
defaultLossEarlyWarningDQRConfig = CongestionSignalConfig{
MinNumberOfGroups: 4,
MinDuration: 400 * time.Millisecond,
}
defaultLossCongestedCongestionSignalConfig = CongestionSignalConfig{
defaultQueuingDelayCongestedJQRConfig = CongestionSignalConfig{
MinNumberOfGroups: 4,
MinDuration: 400 * time.Millisecond,
}
defaultQueuingDelayCongestedDQRConfig = CongestionSignalConfig{
MinNumberOfGroups: 5,
MinDuration: 500 * time.Millisecond,
}
defaultLossCongestedJQRConfig = CongestionSignalConfig{
MinNumberOfGroups: 6,
MinDuration: 600 * time.Millisecond,
}
defaultLossCongestedDQRConfig = CongestionSignalConfig{
MinNumberOfGroups: 6,
MinDuration: 600 * time.Millisecond,
}
@@ -158,12 +178,14 @@ func (c congestionReason) String() string {
// -------------------------------------------------------------------------------
type qdMeasurement struct {
config CongestionSignalConfig
jqrMin int64
dqrMax int64
jqrConfig CongestionSignalConfig
dqrConfig CongestionSignalConfig
jqrMin int64
dqrMax int64
numGroups int
numJQRGroups int
numDQRGroups int
minSendTime int64
maxSendTime int64
@@ -174,9 +196,10 @@ type qdMeasurement struct {
queuingRegion queuingRegion
}
func newQdMeasurement(config CongestionSignalConfig, jqrMin int64, dqrMax int64) *qdMeasurement {
func newQDMeasurement(jqrConfig CongestionSignalConfig, dqrConfig CongestionSignalConfig, jqrMin int64, dqrMax int64) *qdMeasurement {
return &qdMeasurement{
config: config,
jqrConfig: jqrConfig,
dqrConfig: dqrConfig,
jqrMin: jqrMin,
dqrMax: dqrMax,
queuingRegion: queuingRegionIndeterminate,
@@ -199,25 +222,40 @@ func (q *qdMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) {
}
q.maxGroupIdx = max(q.maxGroupIdx, groupIdx)
minSendTime, maxSendTime := pg.SendWindow()
if q.minSendTime == 0 || minSendTime < q.minSendTime {
q.minSendTime = minSendTime
}
q.maxSendTime = max(q.maxSendTime, maxSendTime)
if pqd < q.dqrMax {
// a DQR breaks continuity
q.isSealed = true
q.queuingRegion = queuingRegionDQR
return
q.numDQRGroups++
if q.numJQRGroups > 0 {
// JQR continuity is broken
q.isSealed = true
return
}
if q.dqrConfig.IsTriggered(q.numDQRGroups, q.maxSendTime-q.minSendTime) {
q.isSealed = true
q.queuingRegion = queuingRegionDQR
return
}
}
if pqd > q.jqrMin {
q.numJQRGroups++
minSendTime, maxSendTime := pg.SendWindow()
if q.minSendTime == 0 || minSendTime < q.minSendTime {
q.minSendTime = minSendTime
if q.numDQRGroups > 0 {
// DQR continuity is broken
q.isSealed = true
return
}
q.maxSendTime = max(q.maxSendTime, maxSendTime)
}
if q.config.IsTriggered(q.numJQRGroups, q.maxSendTime-q.minSendTime) {
q.isSealed = true
q.queuingRegion = queuingRegionJQR
if q.jqrConfig.IsTriggered(q.numJQRGroups, q.maxSendTime-q.minSendTime) {
q.isSealed = true
q.queuingRegion = queuingRegionJQR
return
}
}
}
@@ -240,6 +278,7 @@ func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddInt("numGroups", q.numGroups)
e.AddInt("numJQRGroups", q.numJQRGroups)
e.AddInt("numDQRGroups", q.numDQRGroups)
e.AddInt64("minSendTime", q.minSendTime)
e.AddInt64("maxSendTime", q.maxSendTime)
e.AddDuration("duration", time.Duration((q.maxSendTime-q.minSendTime)*1000))
@@ -253,14 +292,16 @@ func (q *qdMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
// -------------------------------------------------------------------------------
type lossMeasurement struct {
config CongestionSignalConfig
jqrConfig CongestionSignalConfig
dqrConfig CongestionSignalConfig
jqrMinLoss float64
dqrMaxLoss float64
numGroups int
ts *trafficStats
isSealed bool
isJQRSealed bool
isDQRSealed bool
minGroupIdx int
maxGroupIdx int
@@ -270,14 +311,16 @@ type lossMeasurement struct {
}
func newLossMeasurement(
config CongestionSignalConfig,
jqrConfig CongestionSignalConfig,
dqrConfig CongestionSignalConfig,
weightedLossConfig WeightedLossConfig,
jqrMinLoss float64,
dqrMaxLoss float64,
logger logger.Logger,
) *lossMeasurement {
return &lossMeasurement{
config: config,
jqrConfig: jqrConfig,
dqrConfig: dqrConfig,
jqrMinLoss: jqrMinLoss,
dqrMaxLoss: dqrMaxLoss,
ts: newTrafficStats(trafficStatsParams{
@@ -289,7 +332,7 @@ func newLossMeasurement(
}
func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) {
if l.isSealed || !pg.IsFinalized() {
if (l.isJQRSealed && l.isDQRSealed) || !pg.IsFinalized() {
return
}
@@ -301,21 +344,31 @@ func (l *lossMeasurement) ProcessPacketGroup(pg *packetGroup, groupIdx int) {
l.ts.Merge(pg.Traffic())
duration := l.ts.Duration()
if l.config.IsTriggered(l.numGroups, duration) {
l.isSealed = true
l.weightedLoss = l.ts.WeightedLoss()
if !l.isJQRSealed && l.jqrConfig.IsTriggered(l.numGroups, l.ts.Duration()) {
l.isJQRSealed = true
weightedLoss := l.ts.WeightedLoss()
if weightedLoss > l.jqrMinLoss {
l.weightedLoss = weightedLoss
l.queuingRegion = queuingRegionJQR
l.isDQRSealed = true // seal DQR also as JQR is already hit
return
}
}
if l.dqrConfig.IsTriggered(l.numGroups, l.ts.Duration()) {
l.isDQRSealed = true
l.weightedLoss = l.ts.WeightedLoss()
if l.weightedLoss < l.dqrMaxLoss {
l.queuingRegion = queuingRegionDQR
} else if l.weightedLoss > l.jqrMinLoss {
l.queuingRegion = queuingRegionJQR
return
}
}
}
func (l *lossMeasurement) IsSealed() bool {
return l.isSealed
return l.isJQRSealed && l.isDQRSealed
}
func (l *lossMeasurement) QueuingRegion() queuingRegion {
@@ -333,7 +386,8 @@ func (l *lossMeasurement) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddInt("numGroups", l.numGroups)
e.AddObject("ts", l.ts)
e.AddBool("isSealed", l.isSealed)
e.AddBool("isJQRSealed", l.isJQRSealed)
e.AddBool("isDQRSealed", l.isDQRSealed)
e.AddInt("minGroupIdx", l.minGroupIdx)
e.AddInt("maxGroupIdx", l.maxGroupIdx)
e.AddFloat64("weightedLoss", l.weightedLoss)
@@ -358,13 +412,15 @@ type CongestionDetectorConfig struct {
JQRMinWeightedLoss float64 `yaml:"jqr_min_weighted_loss,omitempty"`
DQRMaxWeightedLoss float64 `yaml:"dqr_max_weighted_loss,omitempty"`
QueuingDelayEarlyWarning CongestionSignalConfig `yaml:"queuing_delay_early_warning,omitempty"`
LossEarlyWarning CongestionSignalConfig `yaml:"loss_early_warning,omitempty"`
EarlyWarningHangover time.Duration `yaml:"early_warning_hangover,omitempty"`
QueuingDelayEarlyWarningJQR CongestionSignalConfig `yaml:"queuing_delay_early_warning_jqr,omitempty"`
QueuingDelayEarlyWarningDQR CongestionSignalConfig `yaml:"queuing_delay_early_warning_dqr,omitempty"`
LossEarlyWarningJQR CongestionSignalConfig `yaml:"loss_early_warning_jqr,omitempty"`
LossEarlyWarningDQR CongestionSignalConfig `yaml:"loss_early_warning_dqr,omitempty"`
QueuingDelayCongested CongestionSignalConfig `yaml:"queuing_delay_congested,omitempty"`
LossCongested CongestionSignalConfig `yaml:"loss_congested,omitempty"`
CongestedHangover time.Duration `yaml:"congested_hangover,omitempty"`
QueuingDelayCongestedJQR CongestionSignalConfig `yaml:"queuing_delay_congested_jqr,omitempty"`
QueuingDelayCongestedDQR CongestionSignalConfig `yaml:"queuing_delay_congested_dqr,omitempty"`
LossCongestedJQR CongestionSignalConfig `yaml:"loss_congested_jqr,omitempty"`
LossCongestedDQR CongestionSignalConfig `yaml:"loss_congested_dqr,omitempty"`
CongestedCTRTrend ccutils.TrendDetectorConfig `yaml:"congested_ctr_trend,omitempty"`
CongestedCTREpsilon float64 `yaml:"congested_ctr_epsilon,omitempty"`
@@ -390,7 +446,7 @@ var (
defaultCongestionDetectorConfig = CongestionDetectorConfig{
PacketGroup: defaultPacketGroupConfig,
PacketGroupMaxAge: 5 * time.Second,
PacketGroupMaxAge: 10 * time.Second,
ProbePacketGroup: defaultProbePacketGroupConfig,
ProbeRegulator: ccutils.DefaultProbeRegulatorConfig,
@@ -403,13 +459,15 @@ var (
JQRMinWeightedLoss: 0.25,
DQRMaxWeightedLoss: 0.1,
QueuingDelayEarlyWarning: defaultQueuingDelayEarlyWarningCongestionSignalConfig,
LossEarlyWarning: defaultLossEarlyWarningCongestionSignalConfig,
EarlyWarningHangover: 500 * time.Millisecond,
QueuingDelayEarlyWarningJQR: defaultQueuingDelayEarlyWarningJQRConfig,
QueuingDelayEarlyWarningDQR: defaultQueuingDelayEarlyWarningDQRConfig,
LossEarlyWarningJQR: defaultLossEarlyWarningJQRConfig,
LossEarlyWarningDQR: defaultLossEarlyWarningDQRConfig,
QueuingDelayCongested: defaultQueuingDelayCongestedCongestionSignalConfig,
LossCongested: defaultLossCongestedCongestionSignalConfig,
CongestedHangover: 3 * time.Second,
QueuingDelayCongestedJQR: defaultQueuingDelayCongestedJQRConfig,
QueuingDelayCongestedDQR: defaultQueuingDelayCongestedDQRConfig,
LossCongestedJQR: defaultLossCongestedJQRConfig,
LossCongestedDQR: defaultLossCongestedDQRConfig,
CongestedCTRTrend: defaultTrendDetectorConfigCongestedCTR,
CongestedCTREpsilon: 0.05,
@@ -451,10 +509,10 @@ type congestionDetector struct {
congestedTrafficStats *trafficStats
congestedPacketGroup *packetGroup
queuingRegion queuingRegion
congestionReason congestionReason
jqrQDMeasurement *qdMeasurement
jqrLossMeasurement *lossMeasurement
queuingRegion queuingRegion
congestionReason congestionReason
qdMeasurement *qdMeasurement
lossMeasurement *lossMeasurement
bweListener bwe.BWEListener
}
@@ -494,8 +552,8 @@ func (c *congestionDetector) Reset() {
c.queuingRegion = queuingRegionIndeterminate
c.congestionReason = congestionReasonNone
c.jqrQDMeasurement = nil
c.jqrLossMeasurement = nil
c.qdMeasurement = nil
c.lossMeasurement = nil
}
func (c *congestionDetector) SetBWEListener(bweListener bwe.BWEListener) {
@@ -757,8 +815,17 @@ func (c *congestionDetector) ProbeClusterFinalize() (ccutils.ProbeSignal, int64,
"isSignalValid", isSignalValid,
"probeClusterInfo", pci,
"probePacketGroup", c.probePacketGroup,
"congestionState", c.congestionState,
)
// if congestion signal changed during probe, defer to that signal
if c.congestionState != bwe.CongestionStateNone {
probeSignal := ccutils.ProbeSignalCongesting
c.probeRegulator.ProbeSignal(probeSignal, pci.CreatedAt)
c.probePacketGroup = nil
return probeSignal, c.estimatedAvailableChannelCapacity, true
}
probeSignal, estimatedAvailableChannelCapacity := c.params.Config.ProbeSignal.ProbeSignal(c.probePacketGroup)
if probeSignal == ccutils.ProbeSignalNotCongesting && estimatedAvailableChannelCapacity > c.estimatedAvailableChannelCapacity {
c.estimatedAvailableChannelCapacity = estimatedAvailableChannelCapacity
@@ -789,16 +856,20 @@ func (c *congestionDetector) prunePacketGroups() {
func (c *congestionDetector) updateCongestionSignal(
stage string,
qdConfig CongestionSignalConfig,
lossConfig CongestionSignalConfig,
qdJQRConfig CongestionSignalConfig,
qdDQRConfig CongestionSignalConfig,
lossJQRConfig CongestionSignalConfig,
lossDQRConfig CongestionSignalConfig,
) {
qdMeasurement := newQdMeasurement(
qdConfig,
c.qdMeasurement = newQDMeasurement(
qdJQRConfig,
qdDQRConfig,
c.params.Config.JQRMinDelay.Microseconds(),
c.params.Config.DQRMaxDelay.Microseconds(),
)
lossMeasurement := newLossMeasurement(
lossConfig,
c.lossMeasurement = newLossMeasurement(
lossJQRConfig,
lossDQRConfig,
c.params.Config.WeightedLoss,
c.params.Config.JQRMinWeightedLoss,
c.params.Config.DQRMaxWeightedLoss,
@@ -808,28 +879,23 @@ func (c *congestionDetector) updateCongestionSignal(
var idx int
for idx = len(c.packetGroups) - 1; idx >= 0; idx-- {
pg := c.packetGroups[idx]
qdMeasurement.ProcessPacketGroup(pg, idx)
lossMeasurement.ProcessPacketGroup(pg, idx)
c.qdMeasurement.ProcessPacketGroup(pg, idx)
c.lossMeasurement.ProcessPacketGroup(pg, idx)
// if both measurements have enough data to make a decision, stop processing groups
if qdMeasurement.IsSealed() && lossMeasurement.IsSealed() {
if c.qdMeasurement.IsSealed() && c.lossMeasurement.IsSealed() {
break
}
}
c.congestionReason = congestionReasonNone
c.queuingRegion = qdMeasurement.QueuingRegion()
c.queuingRegion = c.qdMeasurement.QueuingRegion()
if c.queuingRegion == queuingRegionJQR {
c.congestionReason = congestionReasonQueuingDelay
c.jqrQDMeasurement = qdMeasurement
} else {
c.jqrQDMeasurement = nil
c.queuingRegion = lossMeasurement.QueuingRegion()
c.queuingRegion = c.lossMeasurement.QueuingRegion()
if c.queuingRegion == queuingRegionJQR {
c.congestionReason = congestionReasonLoss
c.jqrLossMeasurement = lossMeasurement
} else {
c.jqrLossMeasurement = nil
}
}
}
@@ -837,16 +903,20 @@ func (c *congestionDetector) updateCongestionSignal(
func (c *congestionDetector) updateEarlyWarningSignal() {
c.updateCongestionSignal(
"early-warning",
c.params.Config.QueuingDelayEarlyWarning,
c.params.Config.LossEarlyWarning,
c.params.Config.QueuingDelayEarlyWarningJQR,
c.params.Config.QueuingDelayEarlyWarningDQR,
c.params.Config.LossEarlyWarningJQR,
c.params.Config.LossEarlyWarningDQR,
)
}
func (c *congestionDetector) updateCongestedSignal() {
c.updateCongestionSignal(
"congested",
c.params.Config.QueuingDelayCongested,
c.params.Config.LossCongested,
c.params.Config.QueuingDelayCongestedJQR,
c.params.Config.QueuingDelayCongestedDQR,
c.params.Config.LossCongestedJQR,
c.params.Config.LossCongestedDQR,
)
}
@@ -868,29 +938,13 @@ func (c *congestionDetector) congestionDetectionStateMachine() (bool, bwe.Conges
} else {
c.updateEarlyWarningSignal()
if c.queuingRegion == queuingRegionDQR {
toState = bwe.CongestionStateEarlyWarningHangover
toState = bwe.CongestionStateNone
}
}
case bwe.CongestionStateEarlyWarningHangover:
c.updateEarlyWarningSignal()
if c.queuingRegion == queuingRegionJQR {
toState = bwe.CongestionStateEarlyWarning
} else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.EarlyWarningHangover {
toState = bwe.CongestionStateNone
}
case bwe.CongestionStateCongested:
c.updateCongestedSignal()
if c.queuingRegion == queuingRegionDQR {
toState = bwe.CongestionStateCongestedHangover
}
case bwe.CongestionStateCongestedHangover:
c.updateEarlyWarningSignal()
if c.queuingRegion == queuingRegionJQR {
toState = bwe.CongestionStateEarlyWarning
} else if time.Since(c.congestionStateSwitchedAt) >= c.params.Config.CongestedHangover {
toState = bwe.CongestionStateNone
}
}
@@ -1001,10 +1055,14 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() {
useWindow := false
isAggValid := true
minGroupIdx := 0
if c.jqrQDMeasurement != nil {
minGroupIdx, _ = c.jqrQDMeasurement.GroupRange()
} else if c.jqrLossMeasurement != nil {
minGroupIdx, _ = c.jqrLossMeasurement.GroupRange()
maxGroupIdx := len(c.packetGroups) - 1
if c.queuingRegion == queuingRegionJQR {
switch c.congestionReason {
case congestionReasonQueuingDelay:
minGroupIdx, maxGroupIdx = c.qdMeasurement.GroupRange()
case congestionReasonLoss:
minGroupIdx, maxGroupIdx = c.lossMeasurement.GroupRange()
}
} else {
useWindow = true
isAggValid = false
@@ -1014,7 +1072,7 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() {
Config: c.params.Config.WeightedLoss,
Logger: c.params.Logger,
})
for idx := len(c.packetGroups) - 1; idx >= minGroupIdx; idx-- {
for idx := maxGroupIdx; idx >= minGroupIdx; idx-- {
pg := c.packetGroups[idx]
if !pg.IsFinalized() {
continue
@@ -1039,21 +1097,22 @@ func (c *congestionDetector) updateCongestionState(state bwe.CongestionState) (b
"to", state,
"queuingRegion", c.queuingRegion,
"congestionReason", c.congestionReason,
"qdMeasurement", c.qdMeasurement,
"lossMeasurement", c.lossMeasurement,
"numPacketGroups", len(c.packetGroups),
"estimatedAvailableChannelCapacity", c.estimatedAvailableChannelCapacity,
"estimateTrafficStats", c.estimateTrafficStats,
}
if c.queuingRegion == queuingRegionJQR {
var minGroupIdx, maxGroupIdx int
if c.jqrQDMeasurement != nil {
minGroupIdx, maxGroupIdx = c.jqrQDMeasurement.GroupRange()
} else if c.jqrLossMeasurement != nil {
minGroupIdx, maxGroupIdx = c.jqrLossMeasurement.GroupRange()
switch c.congestionReason {
case congestionReasonQueuingDelay:
minGroupIdx, maxGroupIdx = c.qdMeasurement.GroupRange()
case congestionReasonLoss:
minGroupIdx, maxGroupIdx = c.lossMeasurement.GroupRange()
}
loggingFields = append(
loggingFields,
"jqrQDMeasurement", c.jqrQDMeasurement,
"jqrLossMeasurement", c.jqrLossMeasurement,
"contributingGroups", logger.ObjectSlice(c.packetGroups[minGroupIdx:maxGroupIdx+1]),
)
}
+13 -4
View File
@@ -26,14 +26,16 @@ import (
type WeightedLossConfig struct {
MinDurationForLossValidity time.Duration `yaml:"min_duration_for_loss_validity,omitempty"`
MinPPSForLossValidity int `yaml:"min_pps_for_loss_validity,omitempty"`
BaseDuration time.Duration `yaml:"base_duration,omitempty"`
BasePPS int `yaml:"base_pps,omitempty"`
LossPenaltyFactor float64 `yaml:"loss_penalty_factor,omitempty"`
}
var (
defaultWeightedLossConfig = WeightedLossConfig{
MinDurationForLossValidity: 250 * time.Millisecond,
MinPPSForLossValidity: 30,
MinDurationForLossValidity: 100 * time.Millisecond,
BaseDuration: 500 * time.Millisecond,
BasePPS: 30,
LossPenaltyFactor: 0.25,
}
)
@@ -119,7 +121,14 @@ func (ts *trafficStats) WeightedLoss() float64 {
totalPackets := float64(ts.lostPackets + ts.ackedPackets)
pps := totalPackets * 1e6 / float64(durationMicro)
if int(pps) < ts.params.Config.MinPPSForLossValidity {
// longer duration, i. e. more time resolution, lower pps is acceptable as the measurement is more stable
deltaDuration := time.Duration(durationMicro*1000) - ts.params.Config.BaseDuration
if deltaDuration < 0 {
deltaDuration = 0
}
threshold := math.Exp(-deltaDuration.Seconds()) * float64(ts.params.Config.BasePPS)
if pps < threshold {
return 0.0
}
+2 -13
View File
@@ -727,17 +727,6 @@ func (s *StreamAllocator) handleSignalPeriodicPing(Event) {
}
}
// try up allocations in case there is available headroom,
// it is possible that a previous up allocation is waiting to settle,
// so even if there was headroom available while doing previous up allocation
// it may not have used up all available headroom,
// check before probing again as this could use available headroom and
// up allocate all tracks to their desired layers, that would avoid
// an unnecessary probe cluster
if s.state == streamAllocatorStateDeficient {
s.maybeBoostDeficientTracks()
}
// probe if necessary and timing is right
if s.state == streamAllocatorStateDeficient {
s.maybeProbe()
@@ -1439,11 +1428,11 @@ func updateStreamStateChange(track *Track, allocation sfu.VideoAllocation, updat
}
func isHoldableCongestionState(bweCongestionState bwe.CongestionState) bool {
return bweCongestionState == bwe.CongestionStateEarlyWarning || bweCongestionState == bwe.CongestionStateEarlyWarningHangover
return bweCongestionState == bwe.CongestionStateEarlyWarning
}
func isDeficientCongestionState(bweCongestionState bwe.CongestionState) bool {
return bweCongestionState == bwe.CongestionStateCongested || bweCongestionState == bwe.CongestionStateCongestedHangover
return bweCongestionState == bwe.CongestionStateCongested
}
// ------------------------------------------------