Split probe controller from StreamAllocator. (#1751)

* Split probe controller from StreamAllocator.

With TWCC, probe status needs to be checked
in a separate goroutine, so probe-specific state needs
locking. Split out the probe controller to make that cleaner.

* remove defer
This commit is contained in:
Raja Subramanian
2023-05-29 14:41:44 +05:30
committed by GitHub
parent 2edd257705
commit fdfd830394
2 changed files with 290 additions and 148 deletions

View File

@@ -0,0 +1,263 @@
package streamallocator
import (
"sync"
"time"
"github.com/livekit/protocol/logger"
)
// Tunables for the probe controller.
const (
// ProbeWaitBase is the probe interval installed on reset and after a
// successful probe; CanProbe requires at least this much time since the
// last probe start while the interval is at its base value.
ProbeWaitBase = 5 * time.Second
// ProbeBackoffFactor multiplies the probe interval after a failed (aborted) probe.
ProbeBackoffFactor = 1.5
// ProbeWaitMax caps the backed-off probe interval.
ProbeWaitMax = 30 * time.Second
// ProbeSettleWait is an untyped count of milliseconds added to the estimated
// queue drain time of an aborted probe before it is finalized.
ProbeSettleWait = 250
// ProbeTrendWait is how long after probe start a probe may go without any
// non-neutral channel trend before it is aborted.
ProbeTrendWait = 2 * time.Second
// ProbePct is the percentage applied to the requested goal delta when
// computing the probe goal (120 means overshoot the delta by 20%).
ProbePct = 120
// ProbeMinBps is not referenced in the visible portion of this file.
// NOTE(review): presumably a lower bound on the probe goal delta - confirm against callers.
ProbeMinBps = 200 * 1000 // 200 kbps
// ProbeMinDuration and ProbeMaxDuration are passed through to Prober.AddCluster
// when a probe is started. NOTE(review): exact semantics are defined by Prober - confirm there.
ProbeMinDuration = 20 * time.Second
ProbeMaxDuration = 21 * time.Second
)
// ---------------------------------------------------------------------------
// ProbeControllerParams carries the dependencies a ProbeController is built with.
type ProbeControllerParams struct {
// Prober is used to add probe clusters (InitProbe) and is reset to stop probing.
Prober *Prober
// Logger receives informational messages when a probe is aborted or stopped.
Logger logger.Logger
}
// ProbeController tracks the state of at most one probe cluster at a time:
// when a probe may start, what its goal is, whether it was aborted, and when
// an aborted probe may be finalized. All fields below lock are accessed with
// lock held by the methods of this type.
type ProbeController struct {
// params holds the dependencies supplied at construction.
params ProbeControllerParams
// lock guards every field declared after it.
lock sync.RWMutex
// probeInterval is the minimum time that must elapse since
// lastProbeStartTime before CanProbe reports true.
probeInterval time.Duration
// lastProbeStartTime is set to the current time by Reset and InitProbe.
lastProbeStartTime time.Time
// probeGoalBps is the estimate the current probe is trying to exceed.
probeGoalBps int64
// probeClusterId identifies the probe in flight; ProbeClusterIdInvalid when none.
probeClusterId ProbeClusterId
// abortedProbeClusterId is set to probeClusterId when the probe is aborted;
// ProbeClusterIdInvalid otherwise.
abortedProbeClusterId ProbeClusterId
// probeTrendObserved records that CheckProbe has seen a non-neutral trend
// since the probe started.
probeTrendObserved bool
// probeEndTime is the time after which an aborted probe may be finalized;
// zero until ProbeClusterDone sets it.
probeEndTime time.Time
// onProbeSuccess, if non-nil, is invoked (without lock held) after a probe
// finalizes successfully.
onProbeSuccess func()
}
// NewProbeController builds a ProbeController around the given params and
// puts it in its reset state (no probe tracked, base probe interval, probe
// wait timer started now).
func NewProbeController(params ProbeControllerParams) *ProbeController {
	controller := new(ProbeController)
	controller.params = params
	controller.Reset()
	return controller
}
// OnProbeSuccess registers f as the callback to run after a probe finalizes
// successfully. It replaces any previously registered callback; the callback
// itself is invoked without the controller lock held.
func (p *ProbeController) OnProbeSuccess(f func()) {
	p.lock.Lock()
	p.onProbeSuccess = f
	p.lock.Unlock()
}
// Reset forgets any tracked probe, restores the probe interval to
// ProbeWaitBase and restarts the wait-before-probing timer from now.
// It does not touch the Prober itself.
func (p *ProbeController) Reset() {
	p.lock.Lock()
	defer p.lock.Unlock()

	p.clearProbeLocked()
	p.resetProbeIntervalLocked()
	p.lastProbeStartTime = time.Now()
}
// ProbeClusterDone handles the completion report for a probe cluster.
//
// Reports for any cluster other than the one currently tracked are ignored.
// If the tracked probe was not aborted, it is finalized right away and, when
// finalization reports success, the registered success callback is invoked
// after the lock has been released. If the probe was aborted, finalization is
// postponed: probeEndTime is set so that MaybeFinalizeProbe finalizes it once
// the data queued by the probe has had time to drain.
//
// lowestEstimate is the bandwidth, in bits per second, used to estimate how
// long the bytes sent by the probe need to drain. A non-positive value cannot
// be used for that estimate, so only the settle wait is applied in that case.
func (p *ProbeController) ProbeClusterDone(info ProbeClusterInfo, lowestEstimate int64) {
	p.lock.Lock()
	if p.probeClusterId != info.Id {
		p.lock.Unlock()
		return
	}

	if p.abortedProbeClusterId == ProbeClusterIdInvalid {
		// successful probe, finalize
		isSuccessful := p.finalizeProbeLocked()

		var onProbeSuccess func()
		if isSuccessful {
			onProbeSuccess = p.onProbeSuccess
		}
		p.lock.Unlock()

		// run the callback outside the lock so it can call back into the controller
		if onProbeSuccess != nil {
			onProbeSuccess()
		}
		return
	}

	// ensure probe queue is flushed
	// STREAM-ALLOCATOR-TODO: ProbeSettleWait should actually be a certain number of RTTs.
	queueTime := 0.0
	if lowestEstimate > 0 {
		// time (ms) needed to send the probe bytes at the lowest estimate, minus
		// the time the probe actually ran, is how long the backlog needs to drain.
		// Guarding the divisor avoids +Inf/NaN, whose conversion to time.Duration
		// is implementation-defined.
		expectedDuration := float64(info.BytesSent*8*1000) / float64(lowestEstimate)
		queueTime = expectedDuration - float64(info.Duration.Milliseconds())
		if queueTime < 0.0 {
			queueTime = 0.0
		}
	}
	queueWait := time.Duration(queueTime+float64(ProbeSettleWait)) * time.Millisecond

	// The drain/settle wait starts when the probe ends, not when it started,
	// so the probe's own duration is part of the offset from the start time.
	p.probeEndTime = p.lastProbeStartTime.Add(info.Duration + queueWait)
	p.lock.Unlock()
}
// CheckProbe inspects the latest channel trend and highest estimate while a
// probe is in flight and decides whether the probe should end early.
//
// Checks are applied in priority order:
//  1. no non-neutral trend seen within ProbeTrendWait of probe start -> abort
//  2. channel is congesting -> abort
//  3. highestEstimate exceeds the probe goal -> stop (not an abort)
//
// It is a no-op when no probe is tracked.
func (p *ProbeController) CheckProbe(trend ChannelTrend, highestEstimate int64) {
	p.lock.Lock()
	defer p.lock.Unlock()

	if !p.isInProbeLocked() {
		return
	}

	// remember that the channel reacted at least once during this probe
	if trend != ChannelTrendNeutral {
		p.probeTrendObserved = true
	}

	if !p.probeTrendObserved && time.Since(p.lastProbeStartTime) > ProbeTrendWait {
		//
		// More of a safety net.
		// In rare cases, the estimate gets stuck. Prevent from probe running amok
		// STREAM-ALLOCATOR-TODO: Need more testing here to ensure that probe does not cause a lot of damage
		//
		p.params.Logger.Infow("stream allocator: probe: aborting, no trend", "cluster", p.probeClusterId)
		p.abortProbeLocked()
		return
	}

	if trend == ChannelTrendCongesting {
		// stop immediately if the probe is congesting channel more
		p.params.Logger.Infow("stream allocator: probe: aborting, channel is congesting", "cluster", p.probeClusterId)
		p.abortProbeLocked()
		return
	}

	if highestEstimate > p.probeGoalBps {
		// reached goal, stop probing
		p.params.Logger.Infow(
			"stream allocator: probe: stopping, goal reached",
			"cluster", p.probeClusterId,
			"goal", p.probeGoalBps,
			"highest", highestEstimate,
		)
		p.StopProbe()
	}
}
// MaybeFinalizeProbe finalizes the tracked probe if one is in flight, an end
// time has been set for it, and that end time has passed. When finalization
// reports success, the registered success callback is invoked after the lock
// has been released.
func (p *ProbeController) MaybeFinalizeProbe() {
	var callback func()

	p.lock.Lock()
	due := p.isInProbeLocked() && !p.probeEndTime.IsZero() && time.Now().After(p.probeEndTime)
	if due && p.finalizeProbeLocked() {
		callback = p.onProbeSuccess
	}
	p.lock.Unlock()

	// invoke outside the lock so the callback may call back into the controller
	if callback != nil {
		callback()
	}
}
// DoesProbeNeedFinalize reports whether an aborted probe is still recorded,
// i.e. a probe was aborted and its state has not been cleared yet.
func (p *ProbeController) DoesProbeNeedFinalize() bool {
	p.lock.RLock()
	pendingAbort := p.abortedProbeClusterId != ProbeClusterIdInvalid
	p.lock.RUnlock()

	return pendingAbort
}
// finalizeProbeLocked closes out the tracked probe and adjusts the probe
// interval based on the outcome. The caller must hold p.lock.
//
// It returns true when the probe was not aborted (interval reset to base) and
// false when it was aborted (interval backed off).
func (p *ProbeController) finalizeProbeLocked() bool {
	// capture the outcome before the ids are cleared
	wasAborted := p.abortedProbeClusterId == p.probeClusterId
	p.clearProbeLocked()

	if !wasAborted {
		// reset probe interval on a successful probe
		p.resetProbeIntervalLocked()
		return true
	}

	// failed probe, backoff
	p.backoffProbeIntervalLocked()
	return false
}
// InitProbe starts a new probe: it stamps the probe start time, clears the
// abort/trend/end-time state left over from any previous probe, computes the
// probe goal and registers a uniform probe cluster with the Prober.
//
// probeGoalDeltaBps is the desired increase over expectedBandwidthUsage; the
// delta is scaled by ProbePct before being added. It returns the id of the
// new probe cluster and the goal bitrate in bps.
func (p *ProbeController) InitProbe(probeGoalDeltaBps int64, expectedBandwidthUsage int64) (ProbeClusterId, int64) {
	p.lock.Lock()
	defer p.lock.Unlock()

	p.lastProbeStartTime = time.Now()

	// start from a clean per-probe state
	p.probeEndTime = time.Time{}
	p.probeTrendObserved = false
	p.abortedProbeClusterId = ProbeClusterIdInvalid

	// overshoot a bit to account for noise (in measurement/estimate etc)
	goalBps := expectedBandwidthUsage + ((probeGoalDeltaBps * ProbePct) / 100)
	p.probeGoalBps = goalBps

	clusterId := p.params.Prober.AddCluster(
		ProbeClusterModeUniform,
		int(goalBps),
		int(expectedBandwidthUsage),
		ProbeMinDuration,
		ProbeMaxDuration,
	)
	p.probeClusterId = clusterId

	return clusterId, goalBps
}
// clearProbeLocked marks the controller as having no probe in flight and no
// aborted probe pending. The caller must hold p.lock.
func (p *ProbeController) clearProbeLocked() {
	p.probeClusterId, p.abortedProbeClusterId = ProbeClusterIdInvalid, ProbeClusterIdInvalid
}
// backoffProbeIntervalLocked lengthens the probe interval by
// ProbeBackoffFactor, capped at ProbeWaitMax. The caller must hold p.lock.
//
// The scaled value is truncated to whole seconds before the cap is applied
// (e.g. 5s -> 7s, not 7.5s).
func (p *ProbeController) backoffProbeIntervalLocked() {
	scaledSeconds := p.probeInterval.Seconds() * ProbeBackoffFactor

	next := time.Duration(scaledSeconds) * time.Second
	if next > ProbeWaitMax {
		next = ProbeWaitMax
	}
	p.probeInterval = next
}
// resetProbeIntervalLocked restores the probe interval to ProbeWaitBase,
// discarding any backoff. The caller must hold p.lock.
func (p *ProbeController) resetProbeIntervalLocked() {
p.probeInterval = ProbeWaitBase
}
// StopProbe resets the Prober. It does not take p.lock and does not modify
// any controller state, so it is also called from methods that already hold
// the lock.
func (p *ProbeController) StopProbe() {
p.params.Prober.Reset()
}
// AbortProbe marks the currently tracked probe as aborted and resets the
// Prober. It is the lock-taking wrapper around abortProbeLocked.
func (p *ProbeController) AbortProbe() {
p.lock.Lock()
defer p.lock.Unlock()
p.abortProbeLocked()
}
// abortProbeLocked records the current probe cluster id as aborted and then
// stops probing. The caller must hold p.lock.
//
// If no probe is tracked, probeClusterId is ProbeClusterIdInvalid and the
// aborted id therefore stays invalid as well.
func (p *ProbeController) abortProbeLocked() {
p.abortedProbeClusterId = p.probeClusterId
p.StopProbe()
}
// IsInProbe reports whether a probe cluster is currently being tracked.
func (p *ProbeController) IsInProbe() bool {
	p.lock.RLock()
	inProbe := p.isInProbeLocked()
	p.lock.RUnlock()

	return inProbe
}
// isInProbeLocked reports whether a probe cluster is currently being tracked,
// i.e. probeClusterId is not ProbeClusterIdInvalid. The caller must hold
// p.lock (read or write).
func (p *ProbeController) isInProbeLocked() bool {
return p.probeClusterId != ProbeClusterIdInvalid
}
// CanProbe reports whether a new probe may be started: no probe is currently
// tracked and at least probeInterval has elapsed since the last probe start
// (or since the last Reset).
func (p *ProbeController) CanProbe() bool {
	p.lock.RLock()
	defer p.lock.RUnlock()

	if p.isInProbeLocked() {
		return false
	}
	return time.Since(p.lastProbeStartTime) >= p.probeInterval
}
// ------------------------------------------------

View File

@@ -25,17 +25,6 @@ const (
NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate
ProbeWaitBase = 5 * time.Second
ProbeBackoffFactor = 1.5
ProbeWaitMax = 30 * time.Second
ProbeSettleWait = 250
ProbeTrendWait = 2 * time.Second
ProbePct = 120
ProbeMinBps = 200 * 1000 // 200 kbps
ProbeMinDuration = 20 * time.Second
ProbeMaxDuration = 21 * time.Second
PriorityMin = uint8(1)
PriorityMax = uint8(255)
PriorityDefaultScreenshare = PriorityMax
@@ -175,13 +164,7 @@ type StreamAllocator struct {
committedChannelCapacity int64
overriddenChannelCapacity int64
probeInterval time.Duration
lastProbeStartTime time.Time
probeGoalBps int64
probeClusterId ProbeClusterId
abortedProbeClusterId ProbeClusterId
probeTrendObserved bool
probeEndTime time.Time
probeController *ProbeController
prober *Prober
@@ -213,6 +196,12 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
eventCh: make(chan Event, 1000),
}
s.probeController = NewProbeController(ProbeControllerParams{
Prober: s.prober,
Logger: params.Logger,
})
s.probeController.OnProbeSuccess(s.onProbeSuccess)
s.resetState()
s.prober.SetProberListener(s)
@@ -319,7 +308,7 @@ func (s *StreamAllocator) SetChannelCapacity(channelCapacity int64) {
func (s *StreamAllocator) resetState() {
s.channelObserver = s.newChannelObserverNonProbe()
s.resetProbe()
s.probeController.Reset()
s.state = streamAllocatorStateStable
}
@@ -561,7 +550,7 @@ func (s *StreamAllocator) processEvents() {
s.handleEvent(&event)
}
s.stopProbe()
s.probeController.StopProbe()
}
func (s *StreamAllocator) ping() {
@@ -642,7 +631,7 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) {
s.monitorRate(receivedEstimate)
// while probing, maintain estimate separately to enable keeping current committed estimate if probe fails
if s.isInProbe() {
if s.probeController.IsInProbe() {
s.handleNewEstimateInProbe()
} else {
s.handleNewEstimateInNonProbe()
@@ -651,9 +640,7 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) {
func (s *StreamAllocator) handleSignalPeriodicPing(event *Event) {
// finalize probe if necessary
if s.isInProbe() && !s.probeEndTime.IsZero() && time.Now().After(s.probeEndTime) {
s.finalizeProbe()
}
s.probeController.MaybeFinalizeProbe()
// probe if necessary and timing is right
if s.state == streamAllocatorStateDeficient {
@@ -686,26 +673,7 @@ func (s *StreamAllocator) handleSignalSendProbe(event *Event) {
func (s *StreamAllocator) handleSignalProbeClusterDone(event *Event) {
info, _ := event.Data.(ProbeClusterInfo)
if s.probeClusterId != info.Id {
return
}
if s.abortedProbeClusterId == ProbeClusterIdInvalid {
// successful probe, finalize
s.finalizeProbe()
return
}
// ensure probe queue is flushed
// STREAM-ALLOCATOR-TODO: ProbeSettleWait should actually be a certain number of RTTs.
lowestEstimate := int64(math.Min(float64(s.committedChannelCapacity), float64(s.channelObserver.GetLowestEstimate())))
expectedDuration := float64(info.BytesSent*8*1000) / float64(lowestEstimate)
queueTime := expectedDuration - float64(info.Duration.Milliseconds())
if queueTime < 0.0 {
queueTime = 0.0
}
queueWait := time.Duration(queueTime+float64(ProbeSettleWait)) * time.Millisecond
s.probeEndTime = s.lastProbeStartTime.Add(queueWait)
s.probeController.ProbeClusterDone(info, int64(math.Min(float64(s.committedChannelCapacity), float64(s.channelObserver.GetLowestEstimate()))))
}
func (s *StreamAllocator) handleSignalResume(event *Event) {
@@ -732,7 +700,7 @@ func (s *StreamAllocator) handleSignalSetChannelCapacity(event *Event) {
s.params.Logger.Infow("allocating on override channel capacity", "override", s.overriddenChannelCapacity)
s.allocateAllTracks()
} else {
s.params.Logger.Infow("clearing override channel capacity")
s.params.Logger.Infow("clearing override channel capacity")
}
}
@@ -769,7 +737,7 @@ func (s *StreamAllocator) setState(state streamAllocatorState) {
s.state = state
// reset probe to enforce a delay after state change before probing
s.lastProbeStartTime = time.Now()
s.probeController.Reset()
}
func (s *StreamAllocator) adjustState() {
@@ -787,7 +755,7 @@ func (s *StreamAllocator) handleNewEstimateInProbe() {
// always update NACKs, even if aborted
packetDelta, repeatedNackDelta := s.getNackDelta()
if s.abortedProbeClusterId != ProbeClusterIdInvalid {
if s.probeController.DoesProbeNeedFinalize() {
// waiting for aborted probe to finalize
return
}
@@ -796,35 +764,7 @@ func (s *StreamAllocator) handleNewEstimateInProbe() {
s.channelObserver.AddNack(packetDelta, repeatedNackDelta)
trend, _ := s.channelObserver.GetTrend()
if trend != ChannelTrendNeutral {
s.probeTrendObserved = true
}
switch {
case !s.probeTrendObserved && time.Since(s.lastProbeStartTime) > ProbeTrendWait:
//
// More of a safety net.
// In rare cases, the estimate gets stuck. Prevent from probe running amok
// STREAM-ALLOCATOR-TODO: Need more testing here to ensure that probe does not cause a lot of damage
//
s.params.Logger.Infow("stream allocator: probe: aborting, no trend", "cluster", s.probeClusterId)
s.abortProbe()
case trend == ChannelTrendCongesting:
// stop immediately if the probe is congesting channel more
s.params.Logger.Infow("stream allocator: probe: aborting, channel is congesting", "cluster", s.probeClusterId)
s.abortProbe()
case s.channelObserver.GetHighestEstimate() > s.probeGoalBps:
// reached goal, stop probing
s.params.Logger.Infow(
"stream allocator: probe: stopping, goal reached",
"cluster", s.probeClusterId,
"goal", s.probeGoalBps,
"highest", s.channelObserver.GetHighestEstimate(),
)
s.stopProbe()
}
s.probeController.CheckProbe(trend, s.channelObserver.GetHighestEstimate())
}
func (s *StreamAllocator) handleNewEstimateInNonProbe() {
@@ -872,14 +812,14 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() {
s.channelObserver = s.newChannelObserverNonProbe()
// reset probe to ensure it does not start too soon after a downward trend
s.resetProbe()
s.probeController.Reset()
s.allocateAllTracks()
}
func (s *StreamAllocator) allocateTrack(track *Track) {
// abort any probe that may be running when a track specific change needs allocation
s.abortProbe()
s.probeController.AbortProbe()
// if not deficient, free pass allocate track
if !s.params.Config.Enabled || s.state == streamAllocatorStateStable || !track.IsManaged() {
@@ -976,12 +916,9 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
s.adjustState()
}
func (s *StreamAllocator) finalizeProbe() {
aborted := s.probeClusterId == s.abortedProbeClusterId
func (s *StreamAllocator) onProbeSuccess() {
highestEstimateInProbe := s.channelObserver.GetHighestEstimate()
s.clearProbe()
//
// Reset estimator at the end of a probe irrespective of probe result to get fresh readings.
// With a failed probe, the latest estimate could be lower than committed estimate.
@@ -995,15 +932,6 @@ func (s *StreamAllocator) finalizeProbe() {
//
s.channelObserver = s.newChannelObserverNonProbe()
if aborted {
// failed probe, backoff
s.backoffProbeInterval()
return
}
// reset probe interval on a successful probe
s.resetProbeInterval()
// probe estimate is same or higher, commit it and try to allocate deficient tracks
s.params.Logger.Infow(
"successful probe, updating channel capacity",
@@ -1211,8 +1139,6 @@ func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver {
}
func (s *StreamAllocator) initProbe(probeGoalDeltaBps int64) {
s.lastProbeStartTime = time.Now()
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
if float64(expectedBandwidthUsage) > 1.5*float64(s.committedChannelCapacity) {
// STREAM-ALLOCATOR-TODO-START
@@ -1227,14 +1153,8 @@ func (s *StreamAllocator) initProbe(probeGoalDeltaBps int64) {
fmt.Errorf("expected too high, expected: %d, committed: %d", expectedBandwidthUsage, s.committedChannelCapacity),
)
}
// overshoot a bit to account for noise (in measurement/estimate etc)
s.probeGoalBps = expectedBandwidthUsage + ((probeGoalDeltaBps * ProbePct) / 100)
s.abortedProbeClusterId = ProbeClusterIdInvalid
s.probeTrendObserved = false
s.probeEndTime = time.Time{}
probeClusterId, probeGoalBps := s.probeController.InitProbe(probeGoalDeltaBps, expectedBandwidthUsage)
channelState := ""
if s.channelObserver != nil {
@@ -1243,67 +1163,26 @@ func (s *StreamAllocator) initProbe(probeGoalDeltaBps int64) {
s.channelObserver = s.newChannelObserverProbe()
s.channelObserver.SeedEstimate(s.lastReceivedEstimate)
s.probeClusterId = s.prober.AddCluster(
ProbeClusterModeUniform,
int(s.probeGoalBps),
int(expectedBandwidthUsage),
ProbeMinDuration,
ProbeMaxDuration,
)
s.params.Logger.Infow(
"stream allocator: starting probe",
"probeClusterId", s.probeClusterId,
"probeClusterId", probeClusterId,
"current usage", expectedBandwidthUsage,
"committed", s.committedChannelCapacity,
"lastReceived", s.lastReceivedEstimate,
"channel", channelState,
"probeGoalDeltaBps", probeGoalDeltaBps,
"goalBps", s.probeGoalBps,
"goalBps", probeGoalBps,
)
}
func (s *StreamAllocator) resetProbe() {
s.lastProbeStartTime = time.Now()
s.resetProbeInterval()
s.clearProbe()
}
func (s *StreamAllocator) clearProbe() {
s.probeClusterId = ProbeClusterIdInvalid
s.abortedProbeClusterId = ProbeClusterIdInvalid
}
func (s *StreamAllocator) backoffProbeInterval() {
s.probeInterval = time.Duration(s.probeInterval.Seconds()*ProbeBackoffFactor) * time.Second
if s.probeInterval > ProbeWaitMax {
s.probeInterval = ProbeWaitMax
}
}
func (s *StreamAllocator) resetProbeInterval() {
s.probeInterval = ProbeWaitBase
}
func (s *StreamAllocator) stopProbe() {
s.prober.Reset()
}
func (s *StreamAllocator) abortProbe() {
s.abortedProbeClusterId = s.probeClusterId
s.stopProbe()
}
func (s *StreamAllocator) isInProbe() bool {
return s.probeClusterId != ProbeClusterIdInvalid
}
func (s *StreamAllocator) maybeProbe() {
if time.Since(s.lastProbeStartTime) < s.probeInterval || s.probeClusterId != ProbeClusterIdInvalid || s.overriddenChannelCapacity > 0 {
if s.overriddenChannelCapacity > 0 {
// do not probe if channel capacity is overridden
return
}
if !s.probeController.CanProbe() {
return
}
switch s.params.Config.ProbeMode {
case config.CongestionControlProbeModeMedia:
@@ -1328,7 +1207,7 @@ func (s *StreamAllocator) maybeProbeWithMedia() {
}
s.maybeSendUpdate(update)
s.lastProbeStartTime = time.Now()
s.probeController.Reset()
break
}
}