Adjust stream allocator ping interval based on state. (#3951)

* Adjust stream allocator ping interval based on state.

In steady state, does a 15 second ping.
While deficient, to be able to react to probes faster, it pings at 100ms
interval.

* clean up

* log ops queue not able to wake up
This commit is contained in:
Raja Subramanian
2025-09-24 14:45:57 +05:30
committed by GitHub
parent 3837006b39
commit bfba6feed4
3 changed files with 36 additions and 27 deletions
+33 -25
View File
@@ -37,21 +37,24 @@ import (
)
const (
ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps
cChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps
PriorityMin = uint8(1)
PriorityMax = uint8(255)
PriorityDefaultScreenshare = PriorityMax
PriorityDefaultVideo = PriorityMin
cPriorityMin = uint8(1)
cPriorityMax = uint8(255)
cPriorityDefaultScreenshare = cPriorityMax
cPriorityDefaultVideo = cPriorityMin
FlagAllowOvershootWhileOptimal = true
FlagAllowOvershootWhileDeficient = false
FlagAllowOvershootExemptTrackWhileDeficient = true
FlagAllowOvershootInProbe = true
FlagAllowOvershootInCatchup = false
FlagAllowOvershootInBoost = true
cFlagAllowOvershootWhileOptimal = true
cFlagAllowOvershootWhileDeficient = false
cFlagAllowOvershootExemptTrackWhileDeficient = true
cFlagAllowOvershootInProbe = true
cFlagAllowOvershootInCatchup = false
cFlagAllowOvershootInBoost = true
cRTTPullInterval = 30 * time.Second
cPingLong = cRTTPullInterval / 2
cPingShort = 100 * time.Millisecond
)
// ---------------------------------------------------------------------------
@@ -215,6 +218,8 @@ type StreamAllocator struct {
lastRTTTime time.Time
pingGeneration atomic.Uint32
isStopped atomic.Bool
}
@@ -247,7 +252,7 @@ func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause b
func (s *StreamAllocator) Start() {
s.eventsQueue.Start()
go s.ping()
go s.ping(s.pingGeneration.Inc(), cPingLong)
}
func (s *StreamAllocator) Stop() {
@@ -589,13 +594,13 @@ func (s *StreamAllocator) maybePostEventAllocateTrack(downTrack *sfu.DownTrack)
}
}
func (s *StreamAllocator) ping() {
ticker := time.NewTicker(100 * time.Millisecond)
func (s *StreamAllocator) ping(pingGeneration uint32, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
<-ticker.C
if s.isStopped.Load() {
if s.isStopped.Load() || (pingGeneration != s.pingGeneration.Load()) {
return
}
@@ -833,7 +838,7 @@ func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) {
if isHoldableCongestionState(cscd.fromState) && cscd.toState == bwe.CongestionStateNone && s.state == streamAllocatorStateStable {
update := NewStreamStateUpdate()
for _, track := range s.getTracks() {
allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal, false)
allocation := track.AllocateOptimal(cFlagAllowOvershootWhileOptimal, false)
updateStreamStateChange(track, allocation, update)
}
s.maybeSendUpdate(update)
@@ -879,6 +884,9 @@ func (s *StreamAllocator) setState(state streamAllocatorState) {
s.params.BWE.Reset()
s.activeProbeClusterId = ccutils.ProbeClusterIdInvalid
go s.ping(s.pingGeneration.Inc(), cPingLong)
} else {
go s.ping(s.pingGeneration.Inc(), cPingShort)
}
}
@@ -901,7 +909,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
bweCongestionState := s.params.BWE.CongestionState()
if !s.enabled || (s.state == streamAllocatorStateStable && !isDeficientCongestionState(bweCongestionState)) || !track.IsManaged() {
update := NewStreamStateUpdate()
allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal, isHoldableCongestionState(bweCongestionState))
allocation := track.AllocateOptimal(cFlagAllowOvershootWhileOptimal, isHoldableCongestionState(bweCongestionState))
updateStreamStateChange(track, allocation, update)
s.maybeSendUpdate(update)
return
@@ -927,7 +935,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
// track needing change is not currently streaming, i. e. it has to be resumed.
//
track.ProvisionalAllocatePrepare()
transition := track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient)
transition := track.ProvisionalAllocateGetCooperativeTransition(cFlagAllowOvershootWhileDeficient)
// downgrade, giving back bits
if transition.From.GreaterThan(transition.To) {
@@ -973,7 +981,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
availableChannelCapacity,
layer,
s.allowPause,
FlagAllowOvershootWhileDeficient,
cFlagAllowOvershootWhileDeficient,
)
if availableChannelCapacity < usedChannelCapacity {
break alloc_loop
@@ -999,7 +1007,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
}
track.ProvisionalAllocateReset()
transition = track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient) // get transition again to reset above allocation attempt using available headroom
transition = track.ProvisionalAllocateGetCooperativeTransition(cFlagAllowOvershootWhileDeficient) // get transition again to reset above allocation attempt using available headroom
}
// if there is not enough headroom, try to redistribute starting with tracks that are closest to their desired.
@@ -1082,7 +1090,7 @@ func (s *StreamAllocator) maybeBoostDeficientTracks() {
boost_loop:
for {
for idx, track := range sortedTracks {
allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup)
allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, cFlagAllowOvershootInCatchup)
if !boosted {
if idx == len(sortedTracks)-1 {
// all tracks tried
@@ -1143,7 +1151,7 @@ func (s *StreamAllocator) allocateAllTracks() {
continue
}
allocation := track.AllocateOptimal(FlagAllowOvershootExemptTrackWhileDeficient, false)
allocation := track.AllocateOptimal(cFlagAllowOvershootExemptTrackWhileDeficient, false)
updateStreamStateChange(track, allocation, update)
// STREAM-ALLOCATOR-TODO: optimistic allocation before bitrate is available will return 0. How to account for that?
@@ -1179,7 +1187,7 @@ func (s *StreamAllocator) allocateAllTracks() {
}
for _, track := range sorted {
_, usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
_, usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, cFlagAllowOvershootWhileDeficient)
availableChannelCapacity -= usedChannelCapacity
if availableChannelCapacity < 0 {
availableChannelCapacity = 0
@@ -1310,7 +1318,7 @@ func (s *StreamAllocator) maybeProbe() {
func (s *StreamAllocator) maybeProbeWithMedia() {
// boost deficient track farthest from desired layer
for _, track := range s.getMaxDistanceSortedDeficient() {
allocation, boosted := track.AllocateNextHigher(ChannelCapacityInfinity, FlagAllowOvershootInBoost)
allocation, boosted := track.AllocateNextHigher(cChannelCapacityInfinity, cFlagAllowOvershootInBoost)
if !boosted {
continue
}
@@ -1327,7 +1335,7 @@ func (s *StreamAllocator) maybeProbeWithMedia() {
func (s *StreamAllocator) maybeProbeWithPadding() {
// use deficient track farthest from desired layer to find how much to probe
for _, track := range s.getMaxDistanceSortedDeficient() {
transition, available := track.GetNextHigherTransition(FlagAllowOvershootInProbe)
transition, available := track.GetNextHigherTransition(cFlagAllowOvershootInProbe)
if !available || transition.BandwidthDelta < 0 {
continue
}
+2 -2
View File
@@ -87,9 +87,9 @@ func (t *Track) SetPriority(priority uint8) bool {
if priority == 0 {
switch t.source {
case livekit.TrackSource_SCREEN_SHARE:
priority = PriorityDefaultScreenshare
priority = cPriorityDefaultScreenshare
default:
priority = PriorityDefaultVideo
priority = cPriorityDefaultVideo
}
}
+1
View File
@@ -128,6 +128,7 @@ func (oq *opsQueueBase[T]) Enqueue(op T) {
select {
case oq.wake <- struct{}{}:
default:
oq.params.Logger.Infow("could not wake ops queue", "name", oq.params.Name)
}
}
}