mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 22:05:39 +00:00
Fix (hopefully) state transition. (#3375)
My previous PR tried to fix a premature transition from the CONGESTED state to the NONE state, but it introduced a bug that accelerated the transition from EARLY_WARNING to CONGESTED because the `queuingRegion` value was held as a state variable. Fix this by no longer holding `queuingRegion` in state and instead using the result returned by each processing run. NOTE: The state machine could still use some work to make it cleaner. Initially, I was passing around a bunch of variables, which was uncouth. I then moved things into state, which made the code easier to read and made it easier to log different things, but it causes problems like the ones above. I will take another look and think more about making it more robust.
This commit is contained in:
@@ -509,7 +509,6 @@ type congestionDetector struct {
|
||||
congestedTrafficStats *trafficStats
|
||||
congestedPacketGroup *packetGroup
|
||||
|
||||
queuingRegion queuingRegion
|
||||
congestionReason congestionReason
|
||||
qdMeasurement *qdMeasurement
|
||||
lossMeasurement *lossMeasurement
|
||||
@@ -550,7 +549,6 @@ func (c *congestionDetector) Reset() {
|
||||
|
||||
c.clearCTRTrend()
|
||||
|
||||
c.queuingRegion = queuingRegionIndeterminate
|
||||
c.congestionReason = congestionReasonNone
|
||||
c.qdMeasurement = nil
|
||||
c.lossMeasurement = nil
|
||||
@@ -849,7 +847,7 @@ func (c *congestionDetector) updateCongestionSignal(
|
||||
qdDQRConfig CongestionSignalConfig,
|
||||
lossJQRConfig CongestionSignalConfig,
|
||||
lossDQRConfig CongestionSignalConfig,
|
||||
) {
|
||||
) queuingRegion {
|
||||
c.qdMeasurement = newQDMeasurement(
|
||||
qdJQRConfig,
|
||||
qdDQRConfig,
|
||||
@@ -877,23 +875,26 @@ func (c *congestionDetector) updateCongestionSignal(
|
||||
}
|
||||
}
|
||||
|
||||
qr := queuingRegionIndeterminate
|
||||
qdQueuingRegion := c.qdMeasurement.QueuingRegion()
|
||||
lossQueuingRegion := c.lossMeasurement.QueuingRegion()
|
||||
switch {
|
||||
case qdQueuingRegion == queuingRegionJQR:
|
||||
c.queuingRegion = queuingRegionJQR
|
||||
qr = queuingRegionJQR
|
||||
c.congestionReason = congestionReasonQueuingDelay
|
||||
case lossQueuingRegion == queuingRegionJQR:
|
||||
c.queuingRegion = queuingRegionJQR
|
||||
qr = queuingRegionJQR
|
||||
c.congestionReason = congestionReasonLoss
|
||||
case qdQueuingRegion == queuingRegionDQR && lossQueuingRegion == queuingRegionDQR:
|
||||
c.queuingRegion = queuingRegionDQR
|
||||
qr = queuingRegionDQR
|
||||
c.congestionReason = congestionReasonNone
|
||||
}
|
||||
|
||||
return qr
|
||||
}
|
||||
|
||||
func (c *congestionDetector) updateEarlyWarningSignal() {
|
||||
c.updateCongestionSignal(
|
||||
func (c *congestionDetector) updateEarlyWarningSignal() queuingRegion {
|
||||
return c.updateCongestionSignal(
|
||||
c.params.Config.QueuingDelayEarlyWarningJQR,
|
||||
c.params.Config.QueuingDelayEarlyWarningDQR,
|
||||
c.params.Config.LossEarlyWarningJQR,
|
||||
@@ -901,8 +902,8 @@ func (c *congestionDetector) updateEarlyWarningSignal() {
|
||||
)
|
||||
}
|
||||
|
||||
func (c *congestionDetector) updateCongestedSignal() {
|
||||
c.updateCongestionSignal(
|
||||
func (c *congestionDetector) updateCongestedSignal() queuingRegion {
|
||||
return c.updateCongestionSignal(
|
||||
c.params.Config.QueuingDelayCongestedJQR,
|
||||
c.params.Config.QueuingDelayCongestedDQR,
|
||||
c.params.Config.LossCongestedJQR,
|
||||
@@ -916,34 +917,26 @@ func (c *congestionDetector) congestionDetectionStateMachine() (bool, bwe.Conges
|
||||
|
||||
switch fromState {
|
||||
case bwe.CongestionStateNone:
|
||||
c.updateEarlyWarningSignal()
|
||||
if c.queuingRegion == queuingRegionJQR {
|
||||
if c.updateEarlyWarningSignal() == queuingRegionJQR {
|
||||
toState = bwe.CongestionStateEarlyWarning
|
||||
}
|
||||
|
||||
case bwe.CongestionStateEarlyWarning:
|
||||
c.updateCongestedSignal()
|
||||
if c.queuingRegion == queuingRegionJQR {
|
||||
if c.updateCongestedSignal() == queuingRegionJQR {
|
||||
toState = bwe.CongestionStateCongested
|
||||
} else {
|
||||
c.updateEarlyWarningSignal()
|
||||
if c.queuingRegion == queuingRegionDQR {
|
||||
toState = bwe.CongestionStateNone
|
||||
}
|
||||
} else if c.updateEarlyWarningSignal() == queuingRegionDQR {
|
||||
toState = bwe.CongestionStateNone
|
||||
}
|
||||
|
||||
case bwe.CongestionStateCongested:
|
||||
c.updateCongestedSignal()
|
||||
if c.queuingRegion == queuingRegionDQR {
|
||||
if c.updateCongestedSignal() == queuingRegionDQR {
|
||||
toState = bwe.CongestionStateNone
|
||||
}
|
||||
}
|
||||
|
||||
c.estimateAvailableChannelCapacity()
|
||||
|
||||
// update after running the above estimate as state change callback includes the estimated available channel capacity
|
||||
shouldNotify := false
|
||||
if toState != fromState {
|
||||
c.estimateAvailableChannelCapacity()
|
||||
fromState, toState = c.updateCongestionState(toState)
|
||||
shouldNotify = true
|
||||
}
|
||||
@@ -1040,20 +1033,18 @@ func (c *congestionDetector) estimateAvailableChannelCapacity() {
|
||||
return
|
||||
}
|
||||
|
||||
// when in JQR, use contributing groups,
|
||||
// when congested, use contributing groups,
|
||||
// else use a time windowed measurement
|
||||
useWindow := false
|
||||
isAggValid := true
|
||||
minGroupIdx := 0
|
||||
maxGroupIdx := len(c.packetGroups) - 1
|
||||
if c.queuingRegion == queuingRegionJQR {
|
||||
switch c.congestionReason {
|
||||
case congestionReasonQueuingDelay:
|
||||
minGroupIdx, maxGroupIdx = c.qdMeasurement.GroupRange()
|
||||
case congestionReasonLoss:
|
||||
minGroupIdx, maxGroupIdx = c.lossMeasurement.GroupRange()
|
||||
}
|
||||
} else {
|
||||
switch c.congestionReason {
|
||||
case congestionReasonQueuingDelay:
|
||||
minGroupIdx, maxGroupIdx = c.qdMeasurement.GroupRange()
|
||||
case congestionReasonLoss:
|
||||
minGroupIdx, maxGroupIdx = c.lossMeasurement.GroupRange()
|
||||
default:
|
||||
useWindow = true
|
||||
isAggValid = false
|
||||
}
|
||||
@@ -1085,7 +1076,6 @@ func (c *congestionDetector) updateCongestionState(state bwe.CongestionState) (b
|
||||
loggingFields := []any{
|
||||
"from", c.congestionState,
|
||||
"to", state,
|
||||
"queuingRegion", c.queuingRegion,
|
||||
"congestionReason", c.congestionReason,
|
||||
"qdMeasurement", c.qdMeasurement,
|
||||
"lossMeasurement", c.lossMeasurement,
|
||||
@@ -1093,7 +1083,7 @@ func (c *congestionDetector) updateCongestionState(state bwe.CongestionState) (b
|
||||
"estimatedAvailableChannelCapacity", c.estimatedAvailableChannelCapacity,
|
||||
"estimateTrafficStats", c.estimateTrafficStats,
|
||||
}
|
||||
if c.queuingRegion == queuingRegionJQR {
|
||||
if c.congestionReason != congestionReasonNone {
|
||||
var minGroupIdx, maxGroupIdx int
|
||||
switch c.congestionReason {
|
||||
case congestionReasonQueuingDelay:
|
||||
|
||||
Reference in New Issue
Block a user