Catch a few edge cases in stream allocator. (#532)

* Catch a few edge cases in stream allocator.

- More useful logging

* fire probe cluster done callback only when there is an active cluster
This commit is contained in:
Raja Subramanian
2022-03-18 20:42:15 +05:30
committed by GitHub
parent 3ce4010e89
commit d738424173
3 changed files with 48 additions and 23 deletions

View File

@@ -149,11 +149,13 @@ func (p *Prober) IsRunning() bool {
}
func (p *Prober) Reset() {
reset := false
var info ProbeClusterInfo
p.clustersMu.Lock()
if p.activeCluster != nil {
p.logger.Debugw("resetting active cluster", "cluster", p.activeCluster.String())
reset = true
info = p.activeCluster.GetInfo()
}
@@ -161,7 +163,7 @@ func (p *Prober) Reset() {
p.activeCluster = nil
p.clustersMu.Unlock()
if p.onProbeClusterDone != nil && info.BytesSent != 0 {
if p.onProbeClusterDone != nil && reset {
p.onProbeClusterDone(info)
}
}

View File

@@ -644,15 +644,20 @@ func (s *StreamAllocator) handleNewEstimateInProbe() {
// In rare cases, the estimate gets stuck. Prevent from probe running amok
// LK-TODO: Need more testing here to ensure that probe does not cause a lot of damage
//
s.params.Logger.Debugw("probe: aborting, no trend")
s.params.Logger.Debugw("probe: aborting, no trend", "cluster", s.probeClusterId)
s.abortProbe()
case trend == ChannelTrendCongesting:
// stop immediately if the probe is congesting channel more
s.params.Logger.Debugw("probe: aborting, channel is congesting")
s.params.Logger.Debugw("probe: aborting, channel is congesting", "cluster", s.probeClusterId)
s.abortProbe()
case s.probeChannelObserver.GetHighestEstimate() > s.probeGoalBps:
// reached goal, stop probing
s.params.Logger.Debugw("probe: stopping, goal reached")
s.params.Logger.Debugw(
"probe: stopping, goal reached",
"cluster", s.probeClusterId,
"goal", s.probeGoalBps,
"highest", s.probeChannelObserver.GetHighestEstimate(),
)
s.stopProbe()
}
}
@@ -673,9 +678,6 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() {
if nackRatio > NackRatioThresholdNonProbe {
lossAdjustedEstimate = int64(float64(lossAdjustedEstimate) * (1.0 - NackRatioAttenuator*nackRatio))
}
if s.committedChannelCapacity == lossAdjustedEstimate {
return
}
s.params.Logger.Infow(
"channel congestion detected, updating channel capacity",
@@ -814,12 +816,7 @@ func (s *StreamAllocator) finalizeProbe() {
// reset probe interval on a successful probe
s.resetProbeInterval()
if s.committedChannelCapacity == highestEstimateInProbe {
// no increase for whatever reason, don't backoff though
return
}
// probe estimate is higher, commit it and try allocate deficient tracks
// probe estimate is same or higher, commit it and try allocate deficient tracks
s.params.Logger.Infow(
"successful probe, updating channel capacity",
"old(bps)", s.committedChannelCapacity,
@@ -827,6 +824,10 @@ func (s *StreamAllocator) finalizeProbe() {
)
s.committedChannelCapacity = highestEstimateInProbe
s.maybeBoostDeficientTracks()
}
func (s *StreamAllocator) maybeBoostDeficientTracks() {
availableChannelCapacity := s.committedChannelCapacity - s.getExpectedBandwidthUsage()
if availableChannelCapacity <= 0 {
return
@@ -981,10 +982,11 @@ func (s *StreamAllocator) getNackDelta() (uint32, uint32) {
return aggPacketDelta, aggRepeatedNackDelta
}
func (s *StreamAllocator) initProbe(goalBps int64) {
func (s *StreamAllocator) initProbe(probeRateBps int64) {
s.lastProbeStartTime = time.Now()
s.probeGoalBps = goalBps
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
s.probeGoalBps = expectedBandwidthUsage + probeRateBps
s.abortedProbeClusterId = ProbeClusterIdInvalid
@@ -994,6 +996,22 @@ func (s *StreamAllocator) initProbe(goalBps int64) {
s.probeChannelObserver = NewChannelObserver("probe", s.params.Logger, NumRequiredEstimatesProbe, NackRatioThresholdProbe)
s.probeChannelObserver.SeedEstimate(s.lastReceivedEstimate)
s.probeClusterId = s.prober.AddCluster(
int(s.committedChannelCapacity+probeRateBps),
int(expectedBandwidthUsage),
ProbeMinDuration,
ProbeMaxDuration,
)
s.params.Logger.Debugw(
"starting probe",
"probeClusterId", s.probeClusterId,
"current usage", expectedBandwidthUsage,
"committed", s.committedChannelCapacity,
"lastReceived", s.lastReceivedEstimate,
"probeRateBps", probeRateBps,
"goalBps", expectedBandwidthUsage+probeRateBps,
)
}
func (s *StreamAllocator) resetProbe() {
@@ -1083,13 +1101,7 @@ func (s *StreamAllocator) maybeProbeWithPadding() {
probeRateBps = ProbeMinBps
}
s.initProbe(s.getExpectedBandwidthUsage() + probeRateBps)
s.probeClusterId = s.prober.AddCluster(
int(s.committedChannelCapacity+probeRateBps),
int(s.getExpectedBandwidthUsage()),
ProbeMinDuration,
ProbeMaxDuration,
)
s.initProbe(probeRateBps)
break
}
}
@@ -1504,7 +1516,12 @@ func (c *ChannelObserver) GetTrend() ChannelTrend {
switch {
case estimateDirection == TrendDirectionDownward:
c.logger.Debugw("channel observer: estimate is trending downward")
c.logger.Debugw(
"channel observer: estimate is trending downward",
"lowest", c.estimateTrend.GetLowest(),
"highest", c.estimateTrend.GetHighest(),
"estimates", c.estimateTrend.GetValues(),
)
return ChannelTrendCongesting
case nackRatio > c.nackRatioThreshold:
c.logger.Debugw("channel observer: high rate of repeated NACKs", "ratio", nackRatio)
@@ -1598,6 +1615,10 @@ func (t *TrendDetector) GetHighest() int64 {
return t.highestvalue
}
func (t *TrendDetector) GetValues() []int64 {
return t.values
}
func (t *TrendDetector) GetDirection() TrendDirection {
return t.direction
}

View File

@@ -254,6 +254,8 @@ func (s *StreamTrackerManager) addAvailableLayer(layer int32) {
layers := s.availableLayers
s.lock.Unlock()
s.logger.Debugw("available layers changed", "layers", layers)
if s.onAvailableLayersChanged != nil {
s.onAvailableLayersChanged(layers)
}