From fdfd830394b64de6671f230f4c25810beb669142 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 29 May 2023 14:41:44 +0530 Subject: [PATCH] Split probe controller from StreamAllocator. (#1751) * Split probe controller from StreamAllocator. With TWCC, there is a need to check for probe status in a separate goroutine. So, probe specific stuff need locking. Split out the probe controller to make that cleaner. * remove defer --- pkg/sfu/streamallocator/probe_controller.go | 263 ++++++++++++++++++++ pkg/sfu/streamallocator/streamallocator.go | 175 ++----------- 2 files changed, 290 insertions(+), 148 deletions(-) create mode 100644 pkg/sfu/streamallocator/probe_controller.go diff --git a/pkg/sfu/streamallocator/probe_controller.go b/pkg/sfu/streamallocator/probe_controller.go new file mode 100644 index 000000000..f32c38b14 --- /dev/null +++ b/pkg/sfu/streamallocator/probe_controller.go @@ -0,0 +1,263 @@ +package streamallocator + +import ( + "sync" + "time" + + "github.com/livekit/protocol/logger" +) + +const ( + ProbeWaitBase = 5 * time.Second + ProbeBackoffFactor = 1.5 + ProbeWaitMax = 30 * time.Second + ProbeSettleWait = 250 + ProbeTrendWait = 2 * time.Second + + ProbePct = 120 + ProbeMinBps = 200 * 1000 // 200 kbps + ProbeMinDuration = 20 * time.Second + ProbeMaxDuration = 21 * time.Second +) + +// --------------------------------------------------------------------------- + +type ProbeControllerParams struct { + Prober *Prober + Logger logger.Logger +} + +type ProbeController struct { + params ProbeControllerParams + + lock sync.RWMutex + probeInterval time.Duration + lastProbeStartTime time.Time + probeGoalBps int64 + probeClusterId ProbeClusterId + abortedProbeClusterId ProbeClusterId + probeTrendObserved bool + probeEndTime time.Time + + onProbeSuccess func() +} + +func NewProbeController(params ProbeControllerParams) *ProbeController { + p := &ProbeController{ + params: params, + } + + p.Reset() + return p +} + +func (p *ProbeController) OnProbeSuccess(f func()) { + p.lock.Lock() + defer p.lock.Unlock() + + p.onProbeSuccess = f +} + +func (p *ProbeController) Reset() { + p.lock.Lock() + defer p.lock.Unlock() + + p.lastProbeStartTime = time.Now() + + p.resetProbeIntervalLocked() + + p.clearProbeLocked() +} + +func (p *ProbeController) ProbeClusterDone(info ProbeClusterInfo, lowestEstimate int64) { + p.lock.Lock() + if p.probeClusterId != info.Id { + p.lock.Unlock() + return + } + + if p.abortedProbeClusterId == ProbeClusterIdInvalid { + // successful probe, finalize + isSuccessful := p.finalizeProbeLocked() + + var onProbeSuccess func() + if isSuccessful { + onProbeSuccess = p.onProbeSuccess + } + p.lock.Unlock() + + if onProbeSuccess != nil { + onProbeSuccess() + } + return + } + + // ensure probe queue is flushed + // STREAM-ALLOCATOR-TODO: ProbeSettleWait should actually be a certain number of RTTs. + expectedDuration := float64(info.BytesSent*8*1000) / float64(lowestEstimate) + queueTime := expectedDuration - float64(info.Duration.Milliseconds()) + if queueTime < 0.0 { + queueTime = 0.0 + } + queueWait := time.Duration(queueTime+float64(ProbeSettleWait)) * time.Millisecond + p.probeEndTime = p.lastProbeStartTime.Add(queueWait) + p.lock.Unlock() +} + +func (p *ProbeController) CheckProbe(trend ChannelTrend, highestEstimate int64) { + p.lock.Lock() + defer p.lock.Unlock() + + if p.probeClusterId == ProbeClusterIdInvalid { + return + } + + if trend != ChannelTrendNeutral { + p.probeTrendObserved = true + } + + switch { + case !p.probeTrendObserved && time.Since(p.lastProbeStartTime) > ProbeTrendWait: + // + // More of a safety net. + // In rare cases, the estimate gets stuck. Prevent from probe running amok + // STREAM-ALLOCATOR-TODO: Need more testing here to ensure that probe does not cause a lot of damage + // + p.params.Logger.Infow("stream allocator: probe: aborting, no trend", "cluster", p.probeClusterId) + p.abortProbeLocked() + + case trend == ChannelTrendCongesting: + // stop immediately if the probe is congesting channel more + p.params.Logger.Infow("stream allocator: probe: aborting, channel is congesting", "cluster", p.probeClusterId) + p.abortProbeLocked() + + case highestEstimate > p.probeGoalBps: + // reached goal, stop probing + p.params.Logger.Infow( + "stream allocator: probe: stopping, goal reached", + "cluster", p.probeClusterId, + "goal", p.probeGoalBps, + "highest", highestEstimate, + ) + p.StopProbe() + } +} + +func (p *ProbeController) MaybeFinalizeProbe() { + p.lock.Lock() + isSuccessful := false + if p.isInProbeLocked() && !p.probeEndTime.IsZero() && time.Now().After(p.probeEndTime) { + isSuccessful = p.finalizeProbeLocked() + } + + var onProbeSuccess func() + if isSuccessful { + onProbeSuccess = p.onProbeSuccess + } + p.lock.Unlock() + + if onProbeSuccess != nil { + onProbeSuccess() + } +} + +func (p *ProbeController) DoesProbeNeedFinalize() bool { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.abortedProbeClusterId != ProbeClusterIdInvalid +} + +func (p *ProbeController) finalizeProbeLocked() bool { + aborted := p.probeClusterId == p.abortedProbeClusterId + + p.clearProbeLocked() + + if aborted { + // failed probe, backoff + p.backoffProbeIntervalLocked() + return false + } + + // reset probe interval on a successful probe + p.resetProbeIntervalLocked() + return true +} + +func (p *ProbeController) InitProbe(probeGoalDeltaBps int64, expectedBandwidthUsage int64) (ProbeClusterId, int64) { + p.lock.Lock() + defer p.lock.Unlock() + + p.lastProbeStartTime = time.Now() + + // overshoot a bit to account for noise (in measurement/estimate etc) + p.probeGoalBps = expectedBandwidthUsage + ((probeGoalDeltaBps * ProbePct) / 100) + + p.abortedProbeClusterId = ProbeClusterIdInvalid + + p.probeTrendObserved = false + + p.probeEndTime = time.Time{} + + p.probeClusterId = p.params.Prober.AddCluster( + ProbeClusterModeUniform, + int(p.probeGoalBps), + int(expectedBandwidthUsage), + ProbeMinDuration, + ProbeMaxDuration, + ) + + return p.probeClusterId, p.probeGoalBps +} + +func (p *ProbeController) clearProbeLocked() { + p.probeClusterId = ProbeClusterIdInvalid + p.abortedProbeClusterId = ProbeClusterIdInvalid +} + +func (p *ProbeController) backoffProbeIntervalLocked() { + p.probeInterval = time.Duration(p.probeInterval.Seconds()*ProbeBackoffFactor) * time.Second + if p.probeInterval > ProbeWaitMax { + p.probeInterval = ProbeWaitMax + } +} + +func (p *ProbeController) resetProbeIntervalLocked() { + p.probeInterval = ProbeWaitBase +} + +func (p *ProbeController) StopProbe() { + p.params.Prober.Reset() +} + +func (p *ProbeController) AbortProbe() { + p.lock.Lock() + defer p.lock.Unlock() + + p.abortProbeLocked() +} + +func (p *ProbeController) abortProbeLocked() { + p.abortedProbeClusterId = p.probeClusterId + p.StopProbe() +} + +func (p *ProbeController) IsInProbe() bool { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.isInProbeLocked() +} + +func (p *ProbeController) isInProbeLocked() bool { + return p.probeClusterId != ProbeClusterIdInvalid +} + +func (p *ProbeController) CanProbe() bool { + p.lock.RLock() + defer p.lock.RUnlock() + + return time.Since(p.lastProbeStartTime) >= p.probeInterval && p.probeClusterId == ProbeClusterIdInvalid +} + +// ------------------------------------------------ diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index b680ae817..7cc35f84a 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -25,17 +25,6 @@ const ( NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate - ProbeWaitBase = 5 * time.Second - ProbeBackoffFactor = 1.5 - ProbeWaitMax = 30 * time.Second - ProbeSettleWait = 250 - ProbeTrendWait = 2 * time.Second - - ProbePct = 120 - ProbeMinBps = 200 * 1000 // 200 kbps - ProbeMinDuration = 20 * time.Second - ProbeMaxDuration = 21 * time.Second - PriorityMin = uint8(1) PriorityMax = uint8(255) PriorityDefaultScreenshare = PriorityMax @@ -175,13 +164,7 @@ type StreamAllocator struct { committedChannelCapacity int64 overriddenChannelCapacity int64 - probeInterval time.Duration - lastProbeStartTime time.Time - probeGoalBps int64 - probeClusterId ProbeClusterId - abortedProbeClusterId ProbeClusterId - probeTrendObserved bool - probeEndTime time.Time + probeController *ProbeController prober *Prober @@ -213,6 +196,12 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { eventCh: make(chan Event, 1000), } + s.probeController = NewProbeController(ProbeControllerParams{ + Prober: s.prober, + Logger: params.Logger, + }) + s.probeController.OnProbeSuccess(s.onProbeSuccess) + s.resetState() s.prober.SetProberListener(s) @@ -319,7 +308,7 @@ func (s *StreamAllocator) SetChannelCapacity(channelCapacity int64) { func (s *StreamAllocator) resetState() { s.channelObserver = s.newChannelObserverNonProbe() - s.resetProbe() + s.probeController.Reset() s.state = streamAllocatorStateStable } @@ -561,7 +550,7 @@ func (s *StreamAllocator) processEvents() { s.handleEvent(&event) } - s.stopProbe() + s.probeController.StopProbe() } func (s *StreamAllocator) ping() { @@ -642,7 +631,7 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) { s.monitorRate(receivedEstimate) // while probing, maintain estimate separately to enable keeping current committed estimate if probe fails - if s.isInProbe() { + if s.probeController.IsInProbe() { s.handleNewEstimateInProbe() } else { s.handleNewEstimateInNonProbe() @@ -651,9 +640,7 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) { func (s *StreamAllocator) handleSignalPeriodicPing(event *Event) { // finalize probe if necessary - if s.isInProbe() && !s.probeEndTime.IsZero() && time.Now().After(s.probeEndTime) { - s.finalizeProbe() - } + s.probeController.MaybeFinalizeProbe() // probe if necessary and timing is right if s.state == streamAllocatorStateDeficient { @@ -686,26 +673,7 @@ func (s *StreamAllocator) handleSignalSendProbe(event *Event) { func (s *StreamAllocator) handleSignalProbeClusterDone(event *Event) { info, _ := event.Data.(ProbeClusterInfo) - if s.probeClusterId != info.Id { - return - } - - if s.abortedProbeClusterId == ProbeClusterIdInvalid { - // successful probe, finalize - s.finalizeProbe() - return - } - - // ensure probe queue is flushed - // STREAM-ALLOCATOR-TODO: ProbeSettleWait should actually be a certain number of RTTs. - lowestEstimate := int64(math.Min(float64(s.committedChannelCapacity), float64(s.channelObserver.GetLowestEstimate()))) - expectedDuration := float64(info.BytesSent*8*1000) / float64(lowestEstimate) - queueTime := expectedDuration - float64(info.Duration.Milliseconds()) - if queueTime < 0.0 { - queueTime = 0.0 - } - queueWait := time.Duration(queueTime+float64(ProbeSettleWait)) * time.Millisecond - s.probeEndTime = s.lastProbeStartTime.Add(queueWait) + s.probeController.ProbeClusterDone(info, int64(math.Min(float64(s.committedChannelCapacity), float64(s.channelObserver.GetLowestEstimate())))) } func (s *StreamAllocator) handleSignalResume(event *Event) { @@ -732,7 +700,7 @@ func (s *StreamAllocator) handleSignalSetChannelCapacity(event *Event) { s.params.Logger.Infow("allocating on override channel capacity", "override", s.overriddenChannelCapacity) s.allocateAllTracks() } else { - s.params.Logger.Infow("clearing override channel capacity") + s.params.Logger.Infow("clearing override channel capacity") } } @@ -769,7 +737,7 @@ func (s *StreamAllocator) setState(state streamAllocatorState) { s.state = state // reset probe to enforce a delay after state change before probing - s.lastProbeStartTime = time.Now() + s.probeController.Reset() } func (s *StreamAllocator) adjustState() { @@ -787,7 +755,7 @@ func (s *StreamAllocator) handleNewEstimateInProbe() { // always update NACKs, even if aborted packetDelta, repeatedNackDelta := s.getNackDelta() - if s.abortedProbeClusterId != ProbeClusterIdInvalid { + if s.probeController.DoesProbeNeedFinalize() { // waiting for aborted probe to finalize return } @@ -796,35 +764,7 @@ func (s *StreamAllocator) handleNewEstimateInProbe() { s.channelObserver.AddNack(packetDelta, repeatedNackDelta) trend, _ := s.channelObserver.GetTrend() - if trend != ChannelTrendNeutral { - s.probeTrendObserved = true - } - - switch { - case !s.probeTrendObserved && time.Since(s.lastProbeStartTime) > ProbeTrendWait: - // - // More of a safety net. - // In rare cases, the estimate gets stuck. Prevent from probe running amok - // STREAM-ALLOCATOR-TODO: Need more testing here to ensure that probe does not cause a lot of damage - // - s.params.Logger.Infow("stream allocator: probe: aborting, no trend", "cluster", s.probeClusterId) - s.abortProbe() - - case trend == ChannelTrendCongesting: - // stop immediately if the probe is congesting channel more - s.params.Logger.Infow("stream allocator: probe: aborting, channel is congesting", "cluster", s.probeClusterId) - s.abortProbe() - - case s.channelObserver.GetHighestEstimate() > s.probeGoalBps: - // reached goal, stop probing - s.params.Logger.Infow( - "stream allocator: probe: stopping, goal reached", - "cluster", s.probeClusterId, - "goal", s.probeGoalBps, - "highest", s.channelObserver.GetHighestEstimate(), - ) - s.stopProbe() - } + s.probeController.CheckProbe(trend, s.channelObserver.GetHighestEstimate()) } func (s *StreamAllocator) handleNewEstimateInNonProbe() { @@ -872,14 +812,14 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() { s.channelObserver = s.newChannelObserverNonProbe() // reset probe to ensure it does not start too soon after a downward trend - s.resetProbe() + s.probeController.Reset() s.allocateAllTracks() } func (s *StreamAllocator) allocateTrack(track *Track) { // abort any probe that may be running when a track specific change needs allocation - s.abortProbe() + s.probeController.AbortProbe() // if not deficient, free pass allocate track if !s.params.Config.Enabled || s.state == streamAllocatorStateStable || !track.IsManaged() { @@ -976,12 +916,9 @@ func (s *StreamAllocator) allocateTrack(track *Track) { s.adjustState() } -func (s *StreamAllocator) finalizeProbe() { - aborted := s.probeClusterId == s.abortedProbeClusterId +func (s *StreamAllocator) onProbeSuccess() { highestEstimateInProbe := s.channelObserver.GetHighestEstimate() - s.clearProbe() - // // Reset estimator at the end of a probe irrespective of probe result to get fresh readings. // With a failed probe, the latest estimate could be lower than committed estimate. @@ -995,15 +932,6 @@ func (s *StreamAllocator) finalizeProbe() { // s.channelObserver = s.newChannelObserverNonProbe() - if aborted { - // failed probe, backoff - s.backoffProbeInterval() - return - } - - // reset probe interval on a successful probe - s.resetProbeInterval() - // probe estimate is same or higher, commit it and try to allocate deficient tracks s.params.Logger.Infow( "successful probe, updating channel capacity", @@ -1211,8 +1139,6 @@ func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver { } func (s *StreamAllocator) initProbe(probeGoalDeltaBps int64) { - s.lastProbeStartTime = time.Now() - expectedBandwidthUsage := s.getExpectedBandwidthUsage() if float64(expectedBandwidthUsage) > 1.5*float64(s.committedChannelCapacity) { // STREAM-ALLOCATOR-TODO-START @@ -1227,14 +1153,8 @@ func (s *StreamAllocator) initProbe(probeGoalDeltaBps int64) { fmt.Errorf("expected too high, expected: %d, committed: %d", expectedBandwidthUsage, s.committedChannelCapacity), ) } - // overshoot a bit to account for noise (in measurement/estimate etc) - s.probeGoalBps = expectedBandwidthUsage + ((probeGoalDeltaBps * ProbePct) / 100) - s.abortedProbeClusterId = ProbeClusterIdInvalid - - s.probeTrendObserved = false - - s.probeEndTime = time.Time{} + probeClusterId, probeGoalBps := s.probeController.InitProbe(probeGoalDeltaBps, expectedBandwidthUsage) channelState := "" if s.channelObserver != nil { @@ -1243,67 +1163,26 @@ func (s *StreamAllocator) initProbe(probeGoalDeltaBps int64) { s.channelObserver = s.newChannelObserverProbe() s.channelObserver.SeedEstimate(s.lastReceivedEstimate) - s.probeClusterId = s.prober.AddCluster( - ProbeClusterModeUniform, - int(s.probeGoalBps), - int(expectedBandwidthUsage), - ProbeMinDuration, - ProbeMaxDuration, - ) s.params.Logger.Infow( "stream allocator: starting probe", - "probeClusterId", s.probeClusterId, + "probeClusterId", probeClusterId, "current usage", expectedBandwidthUsage, "committed", s.committedChannelCapacity, "lastReceived", s.lastReceivedEstimate, "channel", channelState, "probeGoalDeltaBps", probeGoalDeltaBps, - "goalBps", s.probeGoalBps, + "goalBps", probeGoalBps, ) } -func (s *StreamAllocator) resetProbe() { - s.lastProbeStartTime = time.Now() - - s.resetProbeInterval() - - s.clearProbe() -} - -func (s *StreamAllocator) clearProbe() { - s.probeClusterId = ProbeClusterIdInvalid - s.abortedProbeClusterId = ProbeClusterIdInvalid -} - -func (s *StreamAllocator) backoffProbeInterval() { - s.probeInterval = time.Duration(s.probeInterval.Seconds()*ProbeBackoffFactor) * time.Second - if s.probeInterval > ProbeWaitMax { - s.probeInterval = ProbeWaitMax - } -} - -func (s *StreamAllocator) resetProbeInterval() { - s.probeInterval = ProbeWaitBase -} - -func (s *StreamAllocator) stopProbe() { - s.prober.Reset() -} - -func (s *StreamAllocator) abortProbe() { - s.abortedProbeClusterId = s.probeClusterId - s.stopProbe() -} - -func (s *StreamAllocator) isInProbe() bool { - return s.probeClusterId != ProbeClusterIdInvalid -} - func (s *StreamAllocator) maybeProbe() { - if time.Since(s.lastProbeStartTime) < s.probeInterval || s.probeClusterId != ProbeClusterIdInvalid || s.overriddenChannelCapacity > 0 { + if s.overriddenChannelCapacity > 0 { // do not probe if channel capacity is overridden return } + if !s.probeController.CanProbe() { + return + } switch s.params.Config.ProbeMode { case config.CongestionControlProbeModeMedia: @@ -1328,7 +1207,7 @@ func (s *StreamAllocator) maybeProbeWithMedia() { } s.maybeSendUpdate(update) - s.lastProbeStartTime = time.Now() + s.probeController.Reset() break } }