From d7384241739e498ea3d2fb760fe2f10621037ffa Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 18 Mar 2022 20:42:15 +0530 Subject: [PATCH] Catch a few edge cases in stream allocator. (#532) * Catch a few edge cases in stream allocator. - More useful logging * fire probe cluster done callback only when there is an active cluster --- pkg/sfu/prober.go | 4 +- pkg/sfu/streamallocator.go | 65 ++++++++++++++++++++++----------- pkg/sfu/streamtrackermanager.go | 2 + 3 files changed, 48 insertions(+), 23 deletions(-) diff --git a/pkg/sfu/prober.go b/pkg/sfu/prober.go index 0d711bfba..6627991ce 100644 --- a/pkg/sfu/prober.go +++ b/pkg/sfu/prober.go @@ -149,11 +149,13 @@ func (p *Prober) IsRunning() bool { } func (p *Prober) Reset() { + reset := false var info ProbeClusterInfo p.clustersMu.Lock() if p.activeCluster != nil { p.logger.Debugw("resetting active cluster", "cluster", p.activeCluster.String()) + reset = true info = p.activeCluster.GetInfo() } @@ -161,7 +163,7 @@ func (p *Prober) Reset() { p.activeCluster = nil p.clustersMu.Unlock() - if p.onProbeClusterDone != nil && info.BytesSent != 0 { + if p.onProbeClusterDone != nil && reset { p.onProbeClusterDone(info) } } diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 410d47523..1a1df527e 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -644,15 +644,20 @@ func (s *StreamAllocator) handleNewEstimateInProbe() { // In rare cases, the estimate gets stuck. Prevent from probe running amok // LK-TODO: Need more testing here to ensure that probe does not cause a lot of damage // - s.params.Logger.Debugw("probe: aborting, no trend") + s.params.Logger.Debugw("probe: aborting, no trend", "cluster", s.probeClusterId) s.abortProbe() case trend == ChannelTrendCongesting: // stop immediately if the probe is congesting channel more - s.params.Logger.Debugw("probe: aborting, channel is congesting") + s.params.Logger.Debugw("probe: aborting, channel is congesting", "cluster", s.probeClusterId) s.abortProbe() case s.probeChannelObserver.GetHighestEstimate() > s.probeGoalBps: // reached goal, stop probing - s.params.Logger.Debugw("probe: stopping, goal reached") + s.params.Logger.Debugw( + "probe: stopping, goal reached", + "cluster", s.probeClusterId, + "goal", s.probeGoalBps, + "highest", s.probeChannelObserver.GetHighestEstimate(), + ) s.stopProbe() } } @@ -673,9 +678,6 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() { if nackRatio > NackRatioThresholdNonProbe { lossAdjustedEstimate = int64(float64(lossAdjustedEstimate) * (1.0 - NackRatioAttenuator*nackRatio)) } - if s.committedChannelCapacity == lossAdjustedEstimate { - return - } s.params.Logger.Infow( "channel congestion detected, updating channel capacity", @@ -814,12 +816,7 @@ func (s *StreamAllocator) finalizeProbe() { // reset probe interval on a successful probe s.resetProbeInterval() - if s.committedChannelCapacity == highestEstimateInProbe { - // no increase for whatever reason, don't backoff though - return - } - - // probe estimate is higher, commit it and try allocate deficient tracks + // probe estimate is same or higher, commit it and try allocate deficient tracks s.params.Logger.Infow( "successful probe, updating channel capacity", "old(bps)", s.committedChannelCapacity, @@ -827,6 +824,10 @@ func (s *StreamAllocator) finalizeProbe() { ) s.committedChannelCapacity = highestEstimateInProbe + s.maybeBoostDeficientTracks() +} + +func (s *StreamAllocator) maybeBoostDeficientTracks() { availableChannelCapacity := s.committedChannelCapacity - s.getExpectedBandwidthUsage() if availableChannelCapacity <= 0 { return @@ -981,10 +982,11 @@ func (s *StreamAllocator) getNackDelta() (uint32, uint32) { return aggPacketDelta, aggRepeatedNackDelta } -func (s *StreamAllocator) initProbe(goalBps int64) { +func (s *StreamAllocator) initProbe(probeRateBps int64) { s.lastProbeStartTime = time.Now() - s.probeGoalBps = goalBps + expectedBandwidthUsage := s.getExpectedBandwidthUsage() + s.probeGoalBps = expectedBandwidthUsage + probeRateBps s.abortedProbeClusterId = ProbeClusterIdInvalid @@ -994,6 +996,22 @@ func (s *StreamAllocator) initProbe(goalBps int64) { s.probeChannelObserver = NewChannelObserver("probe", s.params.Logger, NumRequiredEstimatesProbe, NackRatioThresholdProbe) s.probeChannelObserver.SeedEstimate(s.lastReceivedEstimate) + + s.probeClusterId = s.prober.AddCluster( + int(s.committedChannelCapacity+probeRateBps), + int(expectedBandwidthUsage), + ProbeMinDuration, + ProbeMaxDuration, + ) + s.params.Logger.Debugw( + "starting probe", + "probeClusterId", s.probeClusterId, + "current usage", expectedBandwidthUsage, + "committed", s.committedChannelCapacity, + "lastReceived", s.lastReceivedEstimate, + "probeRateBps", probeRateBps, + "goalBps", expectedBandwidthUsage+probeRateBps, + ) } func (s *StreamAllocator) resetProbe() { @@ -1083,13 +1101,7 @@ func (s *StreamAllocator) maybeProbeWithPadding() { probeRateBps = ProbeMinBps } - s.initProbe(s.getExpectedBandwidthUsage() + probeRateBps) - s.probeClusterId = s.prober.AddCluster( - int(s.committedChannelCapacity+probeRateBps), - int(s.getExpectedBandwidthUsage()), - ProbeMinDuration, - ProbeMaxDuration, - ) + s.initProbe(probeRateBps) break } } @@ -1504,7 +1516,12 @@ func (c *ChannelObserver) GetTrend() ChannelTrend { switch { case estimateDirection == TrendDirectionDownward: - c.logger.Debugw("channel observer: estimate is trending downward") + c.logger.Debugw( + "channel observer: estimate is trending downward", + "lowest", c.estimateTrend.GetLowest(), + "highest", c.estimateTrend.GetHighest(), + "estimates", c.estimateTrend.GetValues(), + ) return ChannelTrendCongesting case nackRatio > c.nackRatioThreshold: c.logger.Debugw("channel observer: high rate of repeated NACKs", "ratio", nackRatio) @@ -1598,6 +1615,10 @@ func (t *TrendDetector) GetHighest() int64 { return t.highestvalue } +func (t *TrendDetector) GetValues() []int64 { + return t.values +} + func (t *TrendDetector) GetDirection() TrendDirection { return t.direction } diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 65022691a..b13d02679 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -254,6 +254,8 @@ func (s *StreamTrackerManager) addAvailableLayer(layer int32) { layers := s.availableLayers s.lock.Unlock() + s.logger.Debugw("available layers changed", "layers", layers) + if s.onAvailableLayersChanged != nil { s.onAvailableLayersChanged(layers) }