mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 19:55:41 +00:00
* Better naming/logging for send side bwe * Check BWE congestion state before doing optimal allocation. It is possible that BWE declares congestion, but the estimated bandwidth may still be enough to accommodate all tracks. So, stream allocator would still not in DEFICIENT state. On a new track allocation, it will get optimal allocation although BWE is in congested state. Take BWE congestion state into consideration before doing any track allocation. * get congestion state from BWE rather than caching it in stream allocator * get states from update * log contributing groups only when they are small in number * get oldest group always * notify only when estimate is dropping * require stronger CTR down trend * maintain min/max group * relax thresholds to declare congestion a bit
1450 lines
42 KiB
Go
1450 lines
42 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package streamallocator
|
|
|
|
import (
|
|
"fmt"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pion/interceptor/pkg/cc"
|
|
"github.com/pion/rtcp"
|
|
"github.com/pion/webrtc/v4"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
|
|
"github.com/livekit/livekit-server/pkg/sfu"
|
|
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
|
"github.com/livekit/livekit-server/pkg/sfu/bwe"
|
|
"github.com/livekit/livekit-server/pkg/sfu/ccutils"
|
|
"github.com/livekit/livekit-server/pkg/sfu/pacer"
|
|
"github.com/livekit/livekit-server/pkg/utils"
|
|
)
|
|
|
|
const (
|
|
ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps
|
|
|
|
PriorityMin = uint8(1)
|
|
PriorityMax = uint8(255)
|
|
PriorityDefaultScreenshare = PriorityMax
|
|
PriorityDefaultVideo = PriorityMin
|
|
|
|
FlagAllowOvershootWhileOptimal = true
|
|
FlagAllowOvershootWhileDeficient = false
|
|
FlagAllowOvershootExemptTrackWhileDeficient = true
|
|
FlagAllowOvershootInProbe = true
|
|
FlagAllowOvershootInCatchup = false
|
|
FlagAllowOvershootInBoost = true
|
|
|
|
cRTTPullInterval = 30 * time.Second
|
|
)
|
|
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type streamAllocatorState int
|
|
|
|
const (
|
|
streamAllocatorStateStable streamAllocatorState = iota
|
|
streamAllocatorStateDeficient
|
|
)
|
|
|
|
func (s streamAllocatorState) String() string {
|
|
switch s {
|
|
case streamAllocatorStateStable:
|
|
return "STABLE"
|
|
case streamAllocatorStateDeficient:
|
|
return "DEFICIENT"
|
|
default:
|
|
return fmt.Sprintf("UNKNOWN: %d", int(s))
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type streamAllocatorSignal int
|
|
|
|
const (
|
|
streamAllocatorSignalAllocateTrack streamAllocatorSignal = iota
|
|
streamAllocatorSignalAllocateAllTracks
|
|
streamAllocatorSignalAdjustState
|
|
streamAllocatorSignalEstimate
|
|
streamAllocatorSignalFeedback
|
|
streamAllocatorSignalPeriodicPing
|
|
streamAllocatorSignalProbeClusterSwitch
|
|
streamAllocatorSignalSendProbe
|
|
streamAllocatorSignalPacerProbeObserverClusterComplete
|
|
streamAllocatorSignalResume
|
|
streamAllocatorSignalSetAllowPause
|
|
streamAllocatorSignalSetChannelCapacity
|
|
streamAllocatorSignalCongestionStateChange
|
|
)
|
|
|
|
func (s streamAllocatorSignal) String() string {
|
|
switch s {
|
|
case streamAllocatorSignalAllocateTrack:
|
|
return "ALLOCATE_TRACK"
|
|
case streamAllocatorSignalAllocateAllTracks:
|
|
return "ALLOCATE_ALL_TRACKS"
|
|
case streamAllocatorSignalAdjustState:
|
|
return "ADJUST_STATE"
|
|
case streamAllocatorSignalEstimate:
|
|
return "ESTIMATE"
|
|
case streamAllocatorSignalFeedback:
|
|
return "FEEDBACK"
|
|
case streamAllocatorSignalPeriodicPing:
|
|
return "PERIODIC_PING"
|
|
case streamAllocatorSignalProbeClusterSwitch:
|
|
return "PROBE_CLUSTER_SWITCH"
|
|
case streamAllocatorSignalSendProbe:
|
|
return "SEND_PROBE"
|
|
case streamAllocatorSignalPacerProbeObserverClusterComplete:
|
|
return "PACER_PROBE_OBSERVER_CLUSTER_COMPLETE"
|
|
case streamAllocatorSignalResume:
|
|
return "RESUME"
|
|
case streamAllocatorSignalSetAllowPause:
|
|
return "SET_ALLOW_PAUSE"
|
|
case streamAllocatorSignalSetChannelCapacity:
|
|
return "SET_CHANNEL_CAPACITY"
|
|
case streamAllocatorSignalCongestionStateChange:
|
|
return "CONGESTION_STATE_CHANGE"
|
|
default:
|
|
return fmt.Sprintf("%d", int(s))
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type Event struct {
|
|
*StreamAllocator
|
|
Signal streamAllocatorSignal
|
|
TrackID livekit.TrackID
|
|
Data interface{}
|
|
}
|
|
|
|
func (e Event) String() string {
|
|
return fmt.Sprintf("StreamAllocator:Event{signal: %s, trackID: %s, data: %+v}", e.Signal, e.TrackID, e.Data)
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type (
|
|
ProbeMode string
|
|
)
|
|
|
|
const (
|
|
ProbeModePadding ProbeMode = "padding"
|
|
ProbeModeMedia ProbeMode = "media"
|
|
)
|
|
|
|
type StreamAllocatorConfig struct {
|
|
MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"`
|
|
DisableEstimationUnmanagedTracks bool `yaml:"disable_etimation_unmanaged_tracks,omitempty"`
|
|
|
|
ProbeMode ProbeMode `yaml:"probe_mode,omitempty"`
|
|
ProbeOveragePct int64 `yaml:"probe_overage_pct,omitempty"`
|
|
ProbeMinBps int64 `yaml:"probe_min_bps,omitempty"`
|
|
|
|
PausedMinWait time.Duration `yaml:"paused_min_wait,omitempty"`
|
|
}
|
|
|
|
var (
|
|
DefaultStreamAllocatorConfig = StreamAllocatorConfig{
|
|
MinChannelCapacity: 0,
|
|
DisableEstimationUnmanagedTracks: false,
|
|
|
|
ProbeMode: ProbeModePadding,
|
|
ProbeOveragePct: 120,
|
|
ProbeMinBps: 200_000,
|
|
|
|
PausedMinWait: 5 * time.Second,
|
|
}
|
|
)
|
|
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type StreamAllocatorParams struct {
|
|
Config StreamAllocatorConfig
|
|
BWE bwe.BWE
|
|
Pacer pacer.Pacer
|
|
RTTGetter func() (float64, bool)
|
|
Logger logger.Logger
|
|
}
|
|
|
|
type StreamAllocator struct {
|
|
params StreamAllocatorParams
|
|
|
|
onStreamStateChange func(update *StreamStateUpdate) error
|
|
|
|
sendSideBWEInterceptor cc.BandwidthEstimator
|
|
|
|
enabled bool
|
|
allowPause bool
|
|
|
|
committedChannelCapacity int64
|
|
overriddenChannelCapacity int64
|
|
|
|
prober *ccutils.Prober
|
|
|
|
videoTracksMu sync.RWMutex
|
|
videoTracks map[livekit.TrackID]*Track
|
|
isAllocateAllPending bool
|
|
rembTrackingSSRC uint32
|
|
|
|
state streamAllocatorState
|
|
|
|
activeProbeClusterId ccutils.ProbeClusterId
|
|
activeProbeGoalReached bool
|
|
activeProbeCongesting bool
|
|
|
|
eventsQueue *utils.TypedOpsQueue[Event]
|
|
|
|
lastRTTTime time.Time
|
|
|
|
isStopped atomic.Bool
|
|
}
|
|
|
|
func NewStreamAllocator(params StreamAllocatorParams, enabled bool, allowPause bool) *StreamAllocator {
|
|
s := &StreamAllocator{
|
|
params: params,
|
|
enabled: enabled,
|
|
allowPause: allowPause,
|
|
videoTracks: make(map[livekit.TrackID]*Track),
|
|
state: streamAllocatorStateStable,
|
|
activeProbeClusterId: ccutils.ProbeClusterIdInvalid,
|
|
eventsQueue: utils.NewTypedOpsQueue[Event](utils.OpsQueueParams{
|
|
Name: "stream-allocator",
|
|
MinSize: 64,
|
|
Logger: params.Logger,
|
|
}),
|
|
lastRTTTime: time.Now().Add(-cRTTPullInterval),
|
|
}
|
|
|
|
s.prober = ccutils.NewProber(ccutils.ProberParams{
|
|
Listener: s,
|
|
Logger: params.Logger,
|
|
})
|
|
|
|
s.params.BWE.SetBWEListener(s)
|
|
s.params.Pacer.SetPacerProbeObserverListener(s)
|
|
|
|
return s
|
|
}
|
|
|
|
func (s *StreamAllocator) Start() {
|
|
s.eventsQueue.Start()
|
|
go s.ping()
|
|
}
|
|
|
|
func (s *StreamAllocator) Stop() {
|
|
if s.isStopped.Swap(true) {
|
|
return
|
|
}
|
|
|
|
// wait for eventsQueue to be done
|
|
<-s.eventsQueue.Stop()
|
|
|
|
s.maybeStopProbe()
|
|
}
|
|
|
|
func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error) {
|
|
s.onStreamStateChange = f
|
|
}
|
|
|
|
func (s *StreamAllocator) SetSendSideBWEInterceptor(sendSideBWEInterceptor cc.BandwidthEstimator) {
|
|
if sendSideBWEInterceptor != nil {
|
|
sendSideBWEInterceptor.OnTargetBitrateChange(s.onTargetBitrateChange)
|
|
}
|
|
s.sendSideBWEInterceptor = sendSideBWEInterceptor
|
|
}
|
|
|
|
type AddTrackParams struct {
|
|
Source livekit.TrackSource
|
|
Priority uint8
|
|
IsSimulcast bool
|
|
PublisherID livekit.ParticipantID
|
|
}
|
|
|
|
func (s *StreamAllocator) AddTrack(downTrack *sfu.DownTrack, params AddTrackParams) {
|
|
if downTrack.Kind() != webrtc.RTPCodecTypeVideo {
|
|
return
|
|
}
|
|
|
|
track := NewTrack(downTrack, params.Source, params.IsSimulcast, params.PublisherID, s.params.Logger)
|
|
track.SetPriority(params.Priority)
|
|
|
|
trackID := livekit.TrackID(downTrack.ID())
|
|
s.videoTracksMu.Lock()
|
|
oldTrack := s.videoTracks[trackID]
|
|
s.videoTracks[trackID] = track
|
|
s.videoTracksMu.Unlock()
|
|
|
|
if oldTrack != nil {
|
|
oldTrack.DownTrack().SetStreamAllocatorListener(nil)
|
|
}
|
|
|
|
downTrack.SetStreamAllocatorListener(s)
|
|
downTrack.SetProbeClusterId(s.activeProbeClusterId)
|
|
|
|
s.maybePostEventAllocateTrack(downTrack)
|
|
}
|
|
|
|
func (s *StreamAllocator) RemoveTrack(downTrack *sfu.DownTrack) {
|
|
s.videoTracksMu.Lock()
|
|
if existing := s.videoTracks[livekit.TrackID(downTrack.ID())]; existing != nil && existing.DownTrack() == downTrack {
|
|
delete(s.videoTracks, livekit.TrackID(downTrack.ID()))
|
|
}
|
|
s.videoTracksMu.Unlock()
|
|
|
|
// STREAM-ALLOCATOR-TODO: use any saved bandwidth to re-distribute
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalAdjustState,
|
|
})
|
|
}
|
|
|
|
func (s *StreamAllocator) SetTrackPriority(downTrack *sfu.DownTrack, priority uint8) {
|
|
s.videoTracksMu.Lock()
|
|
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
|
|
changed := track.SetPriority(priority)
|
|
if changed && !s.isAllocateAllPending {
|
|
// do a full allocation on a track priority change to keep it simple
|
|
s.isAllocateAllPending = true
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalAllocateAllTracks,
|
|
})
|
|
}
|
|
}
|
|
s.videoTracksMu.Unlock()
|
|
}
|
|
|
|
func (s *StreamAllocator) SetAllowPause(allowPause bool) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalSetAllowPause,
|
|
Data: allowPause,
|
|
})
|
|
}
|
|
|
|
func (s *StreamAllocator) SetChannelCapacity(channelCapacity int64) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalSetChannelCapacity,
|
|
Data: channelCapacity,
|
|
})
|
|
}
|
|
|
|
// called when a new REMB is received (receive side bandwidth estimation)
|
|
func (s *StreamAllocator) OnREMB(downTrack *sfu.DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) {
|
|
//
|
|
// Channel capacity is estimated at a peer connection level. All down tracks
|
|
// in the peer connection will end up calling this for a REMB report with
|
|
// the same estimated channel capacity. Use a tracking SSRC to lock onto to
|
|
// one report. As SSRCs can be dropped over time, update tracking SSRC as needed
|
|
//
|
|
// A couple of things to keep in mind
|
|
// - REMB reports could be sent gratuitously as a way of providing
|
|
// periodic feedback, i.e. even if the estimated capacity does not
|
|
// change, there could be REMB packets on the wire. Those gratuitous
|
|
// REMBs should not trigger anything bad.
|
|
// - As each down track will issue this callback for the same REMB packet
|
|
// from the wire, theoretically it is possible that one down track's
|
|
// callback from previous REMB comes after another down track's callback
|
|
// from the new REMB. REMBs could fire very quickly especially when
|
|
// the network is entering congestion.
|
|
// STREAM-ALLOCATOR-TODO-START
|
|
// Need to check if the same SSRC reports can somehow race, i.e. does pion send
|
|
// RTCP dispatch for same SSRC on different threads? If not, the tracking SSRC
|
|
// should prevent racing
|
|
// STREAM-ALLOCATOR-TODO-END
|
|
//
|
|
|
|
// if there are no video tracks, ignore any straggler REMB
|
|
s.videoTracksMu.Lock()
|
|
if len(s.videoTracks) == 0 {
|
|
s.videoTracksMu.Unlock()
|
|
return
|
|
}
|
|
|
|
downTrackSSRC := uint32(0)
|
|
downTrackSSRCRTX := uint32(0)
|
|
track := s.videoTracks[livekit.TrackID(downTrack.ID())]
|
|
if track != nil {
|
|
downTrackSSRC = track.DownTrack().SSRC()
|
|
downTrackSSRCRTX = track.DownTrack().SSRCRTX()
|
|
}
|
|
|
|
found := false
|
|
for _, ssrc := range remb.SSRCs {
|
|
if ssrc == s.rembTrackingSSRC {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
if len(remb.SSRCs) == 0 {
|
|
s.params.Logger.Warnw("stream allocator: no SSRC to track REMB", nil)
|
|
s.videoTracksMu.Unlock()
|
|
return
|
|
}
|
|
|
|
// try to lock to track which is sending this update
|
|
downTrackSSRC := track.DownTrack().SSRC()
|
|
downTrackSSRCRTX := track.DownTrack().SSRCRTX()
|
|
for _, ssrc := range remb.SSRCs {
|
|
if ssrc == 0 {
|
|
continue
|
|
}
|
|
|
|
if ssrc == downTrackSSRC {
|
|
s.rembTrackingSSRC = downTrackSSRC
|
|
found = true
|
|
break
|
|
}
|
|
if ssrc == downTrackSSRCRTX {
|
|
s.rembTrackingSSRC = downTrackSSRCRTX
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
s.rembTrackingSSRC = remb.SSRCs[0]
|
|
}
|
|
}
|
|
|
|
if s.rembTrackingSSRC == 0 || (s.rembTrackingSSRC != downTrackSSRC && s.rembTrackingSSRC != downTrackSSRCRTX) {
|
|
s.videoTracksMu.Unlock()
|
|
return
|
|
}
|
|
s.videoTracksMu.Unlock()
|
|
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalEstimate,
|
|
Data: int64(remb.Bitrate),
|
|
})
|
|
}
|
|
|
|
// called when a new transport-cc feedback is received
|
|
func (s *StreamAllocator) OnTransportCCFeedback(downTrack *sfu.DownTrack, fb *rtcp.TransportLayerCC) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalFeedback,
|
|
Data: fb,
|
|
})
|
|
}
|
|
|
|
// called when target bitrate changes (send side bandwidth estimation)
|
|
func (s *StreamAllocator) onTargetBitrateChange(bitrate int) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalEstimate,
|
|
Data: int64(bitrate),
|
|
})
|
|
}
|
|
|
|
// called when congestion state changes (send side bandwidth estimation)
|
|
type congestionStateChangeData struct {
|
|
fromState bwe.CongestionState
|
|
toState bwe.CongestionState
|
|
estimatedAvailableChannelCapacity int64
|
|
}
|
|
|
|
// BWEListener implementation
|
|
func (s *StreamAllocator) OnCongestionStateChange(fromState bwe.CongestionState, toState bwe.CongestionState, estimatedAvailableChannelCapacity int64) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalCongestionStateChange,
|
|
Data: congestionStateChangeData{fromState, toState, estimatedAvailableChannelCapacity},
|
|
})
|
|
}
|
|
|
|
// called when feeding track's layer availability changes
|
|
func (s *StreamAllocator) OnAvailableLayersChanged(downTrack *sfu.DownTrack) {
|
|
s.maybePostEventAllocateTrack(downTrack)
|
|
}
|
|
|
|
// called when feeding track's bitrate measurement of any layer is available
|
|
func (s *StreamAllocator) OnBitrateAvailabilityChanged(downTrack *sfu.DownTrack) {
|
|
s.maybePostEventAllocateTrack(downTrack)
|
|
}
|
|
|
|
// called when feeding track's max published spatial layer changes
|
|
func (s *StreamAllocator) OnMaxPublishedSpatialChanged(downTrack *sfu.DownTrack) {
|
|
s.maybePostEventAllocateTrack(downTrack)
|
|
}
|
|
|
|
// called when feeding track's max published temporal layer changes
|
|
func (s *StreamAllocator) OnMaxPublishedTemporalChanged(downTrack *sfu.DownTrack) {
|
|
s.maybePostEventAllocateTrack(downTrack)
|
|
}
|
|
|
|
// called when subscription settings changes (muting/unmuting of track)
|
|
func (s *StreamAllocator) OnSubscriptionChanged(downTrack *sfu.DownTrack) {
|
|
s.maybePostEventAllocateTrack(downTrack)
|
|
}
|
|
|
|
// called when subscribed layer changes (limiting max layer)
|
|
func (s *StreamAllocator) OnSubscribedLayerChanged(downTrack *sfu.DownTrack, layer buffer.VideoLayer) {
|
|
shouldPost := false
|
|
s.videoTracksMu.Lock()
|
|
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
|
|
if track.SetMaxLayer(layer) && track.SetDirty(true) {
|
|
shouldPost = true
|
|
}
|
|
}
|
|
s.videoTracksMu.Unlock()
|
|
|
|
if shouldPost {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalAllocateTrack,
|
|
TrackID: livekit.TrackID(downTrack.ID()),
|
|
})
|
|
}
|
|
}
|
|
|
|
// called when forwarder resumes a track
|
|
func (s *StreamAllocator) OnResume(downTrack *sfu.DownTrack) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalResume,
|
|
TrackID: livekit.TrackID(downTrack.ID()),
|
|
})
|
|
}
|
|
|
|
// called when probe cluster changes
|
|
func (s *StreamAllocator) OnProbeClusterSwitch(pci ccutils.ProbeClusterInfo) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalProbeClusterSwitch,
|
|
Data: pci,
|
|
})
|
|
}
|
|
|
|
// called when prober wants to send packet(s)
|
|
func (s *StreamAllocator) OnSendProbe(bytesToSend int) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalSendProbe,
|
|
Data: bytesToSend,
|
|
})
|
|
}
|
|
|
|
// called when pacer probe observer observes a cluster completion
|
|
func (s *StreamAllocator) OnPacerProbeObserverClusterComplete(probeClusterId ccutils.ProbeClusterId) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalPacerProbeObserverClusterComplete,
|
|
Data: probeClusterId,
|
|
})
|
|
}
|
|
|
|
// called to check if track should participate in BWE
|
|
func (s *StreamAllocator) IsBWEEnabled(downTrack *sfu.DownTrack) bool {
|
|
if !s.params.Config.DisableEstimationUnmanagedTracks {
|
|
return true
|
|
}
|
|
|
|
s.videoTracksMu.Lock()
|
|
defer s.videoTracksMu.Unlock()
|
|
|
|
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
|
|
return track.IsManaged()
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// called to check if track subscription mute can be applied
|
|
func (s *StreamAllocator) IsSubscribeMutable(downTrack *sfu.DownTrack) bool {
|
|
s.videoTracksMu.Lock()
|
|
defer s.videoTracksMu.Unlock()
|
|
|
|
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
|
|
return track.IsSubscribeMutable()
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (s *StreamAllocator) maybePostEventAllocateTrack(downTrack *sfu.DownTrack) {
|
|
shouldPost := false
|
|
s.videoTracksMu.Lock()
|
|
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
|
|
shouldPost = track.SetDirty(true)
|
|
}
|
|
s.videoTracksMu.Unlock()
|
|
|
|
if shouldPost {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalAllocateTrack,
|
|
TrackID: livekit.TrackID(downTrack.ID()),
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) ping() {
|
|
ticker := time.NewTicker(100 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
<-ticker.C
|
|
if s.isStopped.Load() {
|
|
return
|
|
}
|
|
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalPeriodicPing,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) postEvent(event Event) {
|
|
event.StreamAllocator = s
|
|
s.eventsQueue.Enqueue(func(event Event) {
|
|
switch event.Signal {
|
|
case streamAllocatorSignalAllocateTrack:
|
|
event.handleSignalAllocateTrack(event)
|
|
case streamAllocatorSignalAllocateAllTracks:
|
|
event.handleSignalAllocateAllTracks(event)
|
|
case streamAllocatorSignalAdjustState:
|
|
event.handleSignalAdjustState(event)
|
|
case streamAllocatorSignalEstimate:
|
|
event.handleSignalEstimate(event)
|
|
case streamAllocatorSignalFeedback:
|
|
event.handleSignalFeedback(event)
|
|
case streamAllocatorSignalPeriodicPing:
|
|
event.handleSignalPeriodicPing(event)
|
|
case streamAllocatorSignalProbeClusterSwitch:
|
|
event.handleSignalProbeClusterSwitch(event)
|
|
case streamAllocatorSignalSendProbe:
|
|
event.handleSignalSendProbe(event)
|
|
case streamAllocatorSignalPacerProbeObserverClusterComplete:
|
|
event.handleSignalPacerProbeObserverClusterComplete(event)
|
|
case streamAllocatorSignalResume:
|
|
event.handleSignalResume(event)
|
|
case streamAllocatorSignalSetAllowPause:
|
|
event.handleSignalSetAllowPause(event)
|
|
case streamAllocatorSignalSetChannelCapacity:
|
|
event.handleSignalSetChannelCapacity(event)
|
|
case streamAllocatorSignalCongestionStateChange:
|
|
s.handleSignalCongestionStateChange(event)
|
|
}
|
|
}, event)
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalAllocateTrack(event Event) {
|
|
s.videoTracksMu.Lock()
|
|
track := s.videoTracks[event.TrackID]
|
|
if track != nil {
|
|
track.SetDirty(false)
|
|
}
|
|
s.videoTracksMu.Unlock()
|
|
|
|
if track != nil {
|
|
s.allocateTrack(track)
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalAllocateAllTracks(Event) {
|
|
s.videoTracksMu.Lock()
|
|
s.isAllocateAllPending = false
|
|
s.videoTracksMu.Unlock()
|
|
|
|
if s.state == streamAllocatorStateDeficient {
|
|
s.allocateAllTracks()
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalAdjustState(Event) {
|
|
s.adjustState()
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalEstimate(event Event) {
|
|
receivedEstimate := event.Data.(int64)
|
|
|
|
// always update NACKs
|
|
packetDelta, repeatedNackDelta := s.getNackDelta()
|
|
|
|
s.params.BWE.HandleREMB(
|
|
receivedEstimate,
|
|
s.getExpectedBandwidthUsage(),
|
|
packetDelta,
|
|
repeatedNackDelta,
|
|
)
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalFeedback(event Event) {
|
|
fb := event.Data.(*rtcp.TransportLayerCC)
|
|
if s.sendSideBWEInterceptor != nil {
|
|
s.sendSideBWEInterceptor.WriteRTCP([]rtcp.Packet{fb}, nil)
|
|
}
|
|
|
|
s.params.BWE.HandleTWCCFeedback(fb)
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalPeriodicPing(Event) {
|
|
// if pause is allowed, there may be no packets sent and BWE could be congested state,
|
|
// reset BWE if that persists for a while
|
|
if s.state == streamAllocatorStateDeficient && s.params.Pacer.TimeSinceLastSentPacket() > s.params.Config.PausedMinWait {
|
|
s.params.Logger.Infow("stream allocator: resetting bwe to enable probing")
|
|
s.maybeStopProbe()
|
|
s.params.BWE.Reset()
|
|
}
|
|
|
|
if s.activeProbeClusterId != ccutils.ProbeClusterIdInvalid {
|
|
if !s.activeProbeCongesting && !s.activeProbeGoalReached && s.params.BWE.ProbeClusterIsGoalReached() {
|
|
s.params.Logger.Debugw(
|
|
"stream allocator: probe goal reached",
|
|
"activeProbeClusterId", s.activeProbeClusterId,
|
|
)
|
|
s.activeProbeGoalReached = true
|
|
s.maybeStopProbe()
|
|
}
|
|
|
|
// finalize any probe that may have finished/aborted
|
|
if probeSignal, channelCapacity, isFinalized := s.params.BWE.ProbeClusterFinalize(); isFinalized {
|
|
s.params.Logger.Debugw(
|
|
"stream allocator: probe result",
|
|
"activeProbeClusterId", s.activeProbeClusterId,
|
|
"probeSignal", probeSignal,
|
|
"channelCapacity", channelCapacity,
|
|
)
|
|
|
|
s.activeProbeClusterId = ccutils.ProbeClusterIdInvalid
|
|
|
|
if probeSignal != ccutils.ProbeSignalCongesting {
|
|
if channelCapacity > s.committedChannelCapacity {
|
|
s.committedChannelCapacity = channelCapacity
|
|
}
|
|
|
|
s.maybeBoostDeficientTracks()
|
|
}
|
|
}
|
|
}
|
|
|
|
// try up allocations in case there is available headroom,
|
|
// it is possible that a previous up allocation is waiting to settle,
|
|
// so even if there was headroom available while doing previous up allocation
|
|
// it may not have used up all available headroom,
|
|
// check before probing again as this could use available headroom and
|
|
// up allocate all tracks to their desired layers, that would avoid
|
|
// an unnecessary probe cluster
|
|
if s.state == streamAllocatorStateDeficient {
|
|
s.maybeBoostDeficientTracks()
|
|
}
|
|
|
|
// probe if necessary and timing is right
|
|
if s.state == streamAllocatorStateDeficient {
|
|
s.maybeProbe()
|
|
}
|
|
|
|
if time.Since(s.lastRTTTime) > cRTTPullInterval {
|
|
s.lastRTTTime = time.Now()
|
|
|
|
if s.params.RTTGetter != nil {
|
|
if rtt, ok := s.params.RTTGetter(); ok {
|
|
s.params.BWE.UpdateRTT(rtt)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalProbeClusterSwitch(event Event) {
|
|
pci := event.Data.(ccutils.ProbeClusterInfo)
|
|
s.activeProbeClusterId = pci.Id
|
|
s.activeProbeGoalReached = false
|
|
s.activeProbeCongesting = false
|
|
|
|
s.params.BWE.ProbeClusterStarting(pci)
|
|
|
|
s.params.Pacer.StartProbeCluster(pci)
|
|
|
|
for _, t := range s.getTracks() {
|
|
t.DownTrack().SetProbeClusterId(pci.Id)
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalSendProbe(event Event) {
|
|
bytesToSend := event.Data.(int)
|
|
if bytesToSend <= 0 {
|
|
return
|
|
}
|
|
|
|
bytesSent := 0
|
|
for _, track := range s.getTracks() {
|
|
sent := track.WriteProbePackets(bytesToSend)
|
|
bytesSent += sent
|
|
bytesToSend -= sent
|
|
if bytesToSend <= 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
s.prober.ProbesSent(bytesSent)
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalPacerProbeObserverClusterComplete(event Event) {
|
|
probeClusterId, _ := event.Data.(ccutils.ProbeClusterId)
|
|
pci := s.params.Pacer.EndProbeCluster(probeClusterId)
|
|
|
|
for _, t := range s.getTracks() {
|
|
t.DownTrack().SwapProbeClusterId(pci.Id, ccutils.ProbeClusterIdInvalid)
|
|
}
|
|
|
|
s.params.BWE.ProbeClusterDone(pci)
|
|
s.prober.ClusterDone(pci)
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalResume(event Event) {
|
|
s.videoTracksMu.Lock()
|
|
track := s.videoTracks[event.TrackID]
|
|
updated := track != nil && track.SetStreamState(StreamStateActive)
|
|
s.videoTracksMu.Unlock()
|
|
|
|
if updated {
|
|
update := NewStreamStateUpdate()
|
|
update.HandleStreamingChange(track, StreamStateActive)
|
|
s.maybeSendUpdate(update)
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalSetAllowPause(event Event) {
|
|
s.allowPause = event.Data.(bool)
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalSetChannelCapacity(event Event) {
|
|
s.overriddenChannelCapacity = event.Data.(int64)
|
|
if s.overriddenChannelCapacity > 0 {
|
|
s.params.Logger.Infow("allocating on override channel capacity", "override", s.overriddenChannelCapacity)
|
|
s.allocateAllTracks()
|
|
} else {
|
|
s.params.Logger.Infow("clearing override channel capacity")
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalCongestionStateChange(event Event) {
|
|
cscd := event.Data.(congestionStateChangeData)
|
|
if cscd.toState != bwe.CongestionStateNone {
|
|
// end/abort any running probe if channel is not clear
|
|
s.maybeStopProbe()
|
|
}
|
|
|
|
// some tracks may have been held at sub-optimal allocation
|
|
// during early warning hold (if there was one)
|
|
if isHoldableCongestionState(cscd.fromState) && cscd.toState == bwe.CongestionStateNone && s.state == streamAllocatorStateStable {
|
|
update := NewStreamStateUpdate()
|
|
for _, track := range s.getTracks() {
|
|
allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal, false)
|
|
updateStreamStateChange(track, allocation, update)
|
|
}
|
|
s.maybeSendUpdate(update)
|
|
}
|
|
|
|
if cscd.toState == bwe.CongestionStateCongested {
|
|
if s.activeProbeClusterId != ccutils.ProbeClusterIdInvalid {
|
|
if !s.activeProbeCongesting {
|
|
s.activeProbeCongesting = true
|
|
s.params.Logger.Infow(
|
|
"stream allocator: channel congestion detected, not updating channel capacity in active probe",
|
|
"old(bps)", s.committedChannelCapacity,
|
|
"new(bps)", cscd.estimatedAvailableChannelCapacity,
|
|
"expectedUsage(bps)", s.getExpectedBandwidthUsage(),
|
|
)
|
|
}
|
|
} else {
|
|
s.params.Logger.Infow(
|
|
"stream allocator: channel congestion detected, updating channel capacity",
|
|
"old(bps)", s.committedChannelCapacity,
|
|
"new(bps)", cscd.estimatedAvailableChannelCapacity,
|
|
"expectedUsage(bps)", s.getExpectedBandwidthUsage(),
|
|
)
|
|
s.committedChannelCapacity = cscd.estimatedAvailableChannelCapacity
|
|
|
|
s.allocateAllTracks()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) setState(state streamAllocatorState) {
|
|
if s.state == state {
|
|
return
|
|
}
|
|
|
|
s.params.Logger.Infow("stream allocator: state change", "from", s.state, "to", state)
|
|
s.state = state
|
|
|
|
// restart everything when state is STABLE
|
|
if state == streamAllocatorStateStable {
|
|
s.maybeStopProbe()
|
|
|
|
s.params.BWE.Reset()
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) adjustState() {
|
|
for _, track := range s.getTracks() {
|
|
if track.IsDeficient() {
|
|
s.setState(streamAllocatorStateDeficient)
|
|
return
|
|
}
|
|
}
|
|
|
|
s.setState(streamAllocatorStateStable)
|
|
}
|
|
|
|
func (s *StreamAllocator) allocateTrack(track *Track) {
|
|
// end/abort any probe that may be running when a track specific change needs allocation
|
|
s.maybeStopProbe()
|
|
|
|
// if not deficient, free pass allocate track
|
|
bweCongestionState := s.params.BWE.CongestionState()
|
|
if !s.enabled || (s.state == streamAllocatorStateStable && !isDeficientCongestionState(bweCongestionState)) || !track.IsManaged() {
|
|
update := NewStreamStateUpdate()
|
|
allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal, isHoldableCongestionState(bweCongestionState))
|
|
updateStreamStateChange(track, allocation, update)
|
|
s.maybeSendUpdate(update)
|
|
return
|
|
}
|
|
|
|
//
|
|
// In DEFICIENT state,
|
|
// Two possibilities
|
|
// 1. Available headroom is enough to accommodate track that needs change.
|
|
// Note that the track could be muted, hence stopping.
|
|
// 2. Have to steal bits from other tracks currently streaming.
|
|
//
|
|
// For both cases, do
|
|
// a. Find cooperative transition from track that needs allocation.
|
|
// b. If track is giving back bits, apply the transition and use bits given
|
|
// back to boost any deficient track(s).
|
|
//
|
|
// If track needs more bits, i.e. upward transition (may need resume or higher layer subscription),
|
|
// a. Try to allocate using existing headroom. This can be tried to get the best
|
|
// possible fit for the available headroom.
|
|
// b. If there is not enough headroom to allocate anything, ask for best offer from
|
|
// other tracks that are currently streaming and try to use it. This is done only if the
|
|
// track needing change is not currently streaming, i. e. it has to be resumed.
|
|
//
|
|
track.ProvisionalAllocatePrepare()
|
|
transition := track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient)
|
|
|
|
// downgrade, giving back bits
|
|
if transition.From.GreaterThan(transition.To) {
|
|
allocation := track.ProvisionalAllocateCommit()
|
|
|
|
update := NewStreamStateUpdate()
|
|
updateStreamStateChange(track, allocation, update)
|
|
s.maybeSendUpdate(update)
|
|
|
|
s.adjustState()
|
|
|
|
// Use the bits given back to boost deficient track(s).
|
|
// Note layer downgrade may actually have positive delta (i.e. consume more bits)
|
|
// because of when the measurement is done. But, only available headroom after
|
|
// applying the transition will be used to boost deficient track(s).
|
|
s.maybeBoostDeficientTracks()
|
|
return
|
|
}
|
|
|
|
// a no-op transition
|
|
if transition.From == transition.To {
|
|
return
|
|
}
|
|
|
|
// this track is currently not streaming and needs bits to start OR streaming at some layer and wants more bits.
|
|
// NOTE: With co-operative transition, tracks should not be asking for more if already streaming, but handle that case any way.
|
|
// first try an allocation using available headroom, current consumption of this track is discounted to calculate headroom.
|
|
availableChannelCapacity := s.getAvailableHeadroomWithoutTracks(false, []*Track{track})
|
|
if availableChannelCapacity > 0 {
|
|
track.ProvisionalAllocateReset() // to reset allocation from co-operative transition above and try fresh
|
|
|
|
bestLayer := buffer.InvalidLayer
|
|
|
|
alloc_loop:
|
|
for spatial := int32(0); spatial <= buffer.DefaultMaxLayerSpatial; spatial++ {
|
|
for temporal := int32(0); temporal <= buffer.DefaultMaxLayerTemporal; temporal++ {
|
|
layer := buffer.VideoLayer{
|
|
Spatial: spatial,
|
|
Temporal: temporal,
|
|
}
|
|
|
|
isCandidate, usedChannelCapacity := track.ProvisionalAllocate(
|
|
availableChannelCapacity,
|
|
layer,
|
|
s.allowPause,
|
|
FlagAllowOvershootWhileDeficient,
|
|
)
|
|
if availableChannelCapacity < usedChannelCapacity {
|
|
break alloc_loop
|
|
}
|
|
|
|
if isCandidate {
|
|
bestLayer = layer
|
|
}
|
|
}
|
|
}
|
|
|
|
if bestLayer.IsValid() {
|
|
if bestLayer.GreaterThan(transition.From) {
|
|
// found layer that can fit in available headroom, take it if it is better than existing
|
|
update := NewStreamStateUpdate()
|
|
allocation := track.ProvisionalAllocateCommit()
|
|
updateStreamStateChange(track, allocation, update)
|
|
s.maybeSendUpdate(update)
|
|
}
|
|
|
|
s.adjustState()
|
|
return
|
|
}
|
|
|
|
track.ProvisionalAllocateReset()
|
|
transition = track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient) // get transition again to reset above allocation attempt using available headroom
|
|
}
|
|
|
|
// if there is not enough headroom, try to redistribute starting with tracks that are closest to their desired.
|
|
bandwidthAcquired := int64(0)
|
|
var contributingTracks []*Track
|
|
|
|
minDistanceSorted := s.getMinDistanceSorted(track)
|
|
for _, t := range minDistanceSorted {
|
|
t.ProvisionalAllocatePrepare()
|
|
}
|
|
|
|
for _, t := range minDistanceSorted {
|
|
tx := t.ProvisionalAllocateGetBestWeightedTransition()
|
|
if tx.BandwidthDelta < 0 {
|
|
contributingTracks = append(contributingTracks, t)
|
|
|
|
bandwidthAcquired += -tx.BandwidthDelta
|
|
if bandwidthAcquired >= transition.BandwidthDelta {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
update := NewStreamStateUpdate()
|
|
if bandwidthAcquired >= transition.BandwidthDelta {
|
|
// commit the tracks that contributed
|
|
for _, t := range contributingTracks {
|
|
allocation := t.ProvisionalAllocateCommit()
|
|
updateStreamStateChange(t, allocation, update)
|
|
}
|
|
|
|
// STREAM-ALLOCATOR-TODO if got too much extra, can potentially give it to some deficient track
|
|
}
|
|
|
|
// commit the track that needs change if enough could be acquired or pause not allowed
|
|
if !s.allowPause || bandwidthAcquired >= transition.BandwidthDelta {
|
|
allocation := track.ProvisionalAllocateCommit()
|
|
updateStreamStateChange(track, allocation, update)
|
|
} else {
|
|
// explicitly pause to ensure stream state update happens if a track coming out of mute cannot be allocated
|
|
allocation := track.Pause()
|
|
updateStreamStateChange(track, allocation, update)
|
|
}
|
|
|
|
s.maybeSendUpdate(update)
|
|
|
|
s.adjustState()
|
|
}
|
|
|
|
func (s *StreamAllocator) maybeStopProbe() {
|
|
if s.activeProbeClusterId == ccutils.ProbeClusterIdInvalid {
|
|
return
|
|
}
|
|
|
|
pci := s.params.Pacer.EndProbeCluster(s.activeProbeClusterId)
|
|
|
|
for _, t := range s.getTracks() {
|
|
t.DownTrack().SwapProbeClusterId(pci.Id, ccutils.ProbeClusterIdInvalid)
|
|
}
|
|
|
|
s.params.BWE.ProbeClusterDone(pci)
|
|
s.prober.Reset(pci)
|
|
}
|
|
|
|
func (s *StreamAllocator) maybeBoostDeficientTracks() {
|
|
availableChannelCapacity := s.getAvailableHeadroom(false)
|
|
if availableChannelCapacity <= 0 {
|
|
return
|
|
}
|
|
|
|
update := NewStreamStateUpdate()
|
|
|
|
sortedTracks := s.getMaxDistanceSortedDeficient()
|
|
boost_loop:
|
|
for {
|
|
for idx, track := range sortedTracks {
|
|
allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup)
|
|
if !boosted {
|
|
if idx == len(sortedTracks)-1 {
|
|
// all tracks tried
|
|
break boost_loop
|
|
}
|
|
continue
|
|
}
|
|
|
|
updateStreamStateChange(track, allocation, update)
|
|
|
|
availableChannelCapacity -= allocation.BandwidthDelta
|
|
if availableChannelCapacity <= 0 {
|
|
break boost_loop
|
|
}
|
|
|
|
break // sort again below as the track that was just boosted could still be farthest from its desired
|
|
}
|
|
sortedTracks = s.getMaxDistanceSortedDeficient()
|
|
if len(sortedTracks) == 0 {
|
|
break // nothing available to boost
|
|
}
|
|
}
|
|
|
|
s.maybeSendUpdate(update)
|
|
|
|
s.adjustState()
|
|
}
|
|
|
|
func (s *StreamAllocator) allocateAllTracks() {
|
|
if !s.enabled {
|
|
// nothing else to do when disabled
|
|
return
|
|
}
|
|
|
|
//
|
|
// Goals:
|
|
// 1. Stream as many tracks as possible, i.e. no pauses.
|
|
// 2. Try to give fair allocation to all track.
|
|
//
|
|
// Start with the lowest layer and give each track a chance at that layer and keep going up.
|
|
// As long as there is enough bandwidth for tracks to stream at the lowest layer, the first goal is achieved.
|
|
//
|
|
// Tracks that have higher subscribed layer can use any additional available bandwidth. This tried to achieve the second goal.
|
|
//
|
|
// If there is not enough bandwidth even for the lowest layer, tracks at lower priorities will be paused.
|
|
//
|
|
update := NewStreamStateUpdate()
|
|
|
|
availableChannelCapacity := s.getAvailableChannelCapacity(true)
|
|
|
|
//
|
|
// This pass is to find out if there is any leftover channel capacity after allocating exempt tracks.
|
|
// Exempt tracks are given optimal allocation (i. e. no bandwidth constraint) so that they do not fail allocation.
|
|
//
|
|
videoTracks := s.getTracks()
|
|
for _, track := range videoTracks {
|
|
if track.IsManaged() {
|
|
continue
|
|
}
|
|
|
|
allocation := track.AllocateOptimal(FlagAllowOvershootExemptTrackWhileDeficient, false)
|
|
updateStreamStateChange(track, allocation, update)
|
|
|
|
// STREAM-ALLOCATOR-TODO: optimistic allocation before bitrate is available will return 0. How to account for that?
|
|
if !s.params.Config.DisableEstimationUnmanagedTracks {
|
|
availableChannelCapacity -= allocation.BandwidthRequested
|
|
}
|
|
}
|
|
|
|
if availableChannelCapacity < 0 {
|
|
availableChannelCapacity = 0
|
|
}
|
|
if availableChannelCapacity == 0 && s.allowPause {
|
|
// nothing left for managed tracks, pause them all
|
|
for _, track := range videoTracks {
|
|
if !track.IsManaged() {
|
|
continue
|
|
}
|
|
|
|
allocation := track.Pause()
|
|
updateStreamStateChange(track, allocation, update)
|
|
}
|
|
} else {
|
|
sorted := s.getSorted()
|
|
for _, track := range sorted {
|
|
track.ProvisionalAllocatePrepare()
|
|
}
|
|
|
|
for spatial := int32(0); spatial <= buffer.DefaultMaxLayerSpatial; spatial++ {
|
|
for temporal := int32(0); temporal <= buffer.DefaultMaxLayerTemporal; temporal++ {
|
|
layer := buffer.VideoLayer{
|
|
Spatial: spatial,
|
|
Temporal: temporal,
|
|
}
|
|
|
|
for _, track := range sorted {
|
|
_, usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
|
|
availableChannelCapacity -= usedChannelCapacity
|
|
if availableChannelCapacity < 0 {
|
|
availableChannelCapacity = 0
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, track := range sorted {
|
|
allocation := track.ProvisionalAllocateCommit()
|
|
updateStreamStateChange(track, allocation, update)
|
|
}
|
|
}
|
|
|
|
s.maybeSendUpdate(update)
|
|
|
|
s.adjustState()
|
|
}
|
|
|
|
func (s *StreamAllocator) maybeSendUpdate(update *StreamStateUpdate) {
|
|
if update.Empty() {
|
|
return
|
|
}
|
|
|
|
// logging individual changes to make it easier for logging systems
|
|
for _, streamState := range update.StreamStates {
|
|
s.params.Logger.Debugw("streamed tracks changed",
|
|
"trackID", streamState.TrackID,
|
|
"state", streamState.State,
|
|
)
|
|
}
|
|
if s.onStreamStateChange != nil {
|
|
err := s.onStreamStateChange(update)
|
|
if err != nil {
|
|
s.params.Logger.Errorw("could not send streamed tracks update", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) getAvailableChannelCapacity(allowOverride bool) int64 {
|
|
availableChannelCapacity := s.committedChannelCapacity
|
|
if s.params.Config.MinChannelCapacity > availableChannelCapacity {
|
|
availableChannelCapacity = s.params.Config.MinChannelCapacity
|
|
s.params.Logger.Debugw(
|
|
"stream allocator: overriding channel capacity with min channel capacity",
|
|
"actual", s.committedChannelCapacity,
|
|
"override", availableChannelCapacity,
|
|
)
|
|
}
|
|
if allowOverride && s.overriddenChannelCapacity > 0 {
|
|
availableChannelCapacity = s.overriddenChannelCapacity
|
|
s.params.Logger.Debugw(
|
|
"stream allocator: overriding channel capacity",
|
|
"actual", s.committedChannelCapacity,
|
|
"override", availableChannelCapacity,
|
|
)
|
|
}
|
|
|
|
return availableChannelCapacity
|
|
}
|
|
|
|
func (s *StreamAllocator) getExpectedBandwidthUsage() int64 {
|
|
expected := int64(0)
|
|
for _, track := range s.getTracks() {
|
|
expected += track.BandwidthRequested()
|
|
}
|
|
|
|
return expected
|
|
}
|
|
|
|
func (s *StreamAllocator) getExpectedBandwidthUsageWithoutTracks(filteredTracks []*Track) int64 {
|
|
expected := int64(0)
|
|
for _, track := range s.getTracks() {
|
|
filtered := false
|
|
for _, ft := range filteredTracks {
|
|
if ft == track {
|
|
filtered = true
|
|
break
|
|
}
|
|
}
|
|
if !filtered {
|
|
expected += track.BandwidthRequested()
|
|
}
|
|
}
|
|
|
|
return expected
|
|
}
|
|
|
|
func (s *StreamAllocator) getAvailableHeadroom(allowOverride bool) int64 {
|
|
return s.getAvailableChannelCapacity(allowOverride) - s.getExpectedBandwidthUsage()
|
|
}
|
|
|
|
func (s *StreamAllocator) getAvailableHeadroomWithoutTracks(allowOverride bool, filteredTracks []*Track) int64 {
|
|
return s.getAvailableChannelCapacity(allowOverride) - s.getExpectedBandwidthUsageWithoutTracks(filteredTracks)
|
|
}
|
|
|
|
func (s *StreamAllocator) getNackDelta() (uint32, uint32) {
|
|
aggPacketDelta := uint32(0)
|
|
aggRepeatedNackDelta := uint32(0)
|
|
for _, track := range s.getTracks() {
|
|
packetDelta, nackDelta := track.GetNackDelta()
|
|
aggPacketDelta += packetDelta
|
|
aggRepeatedNackDelta += nackDelta
|
|
}
|
|
|
|
return aggPacketDelta, aggRepeatedNackDelta
|
|
}
|
|
|
|
func (s *StreamAllocator) maybeProbe() {
|
|
if s.overriddenChannelCapacity > 0 {
|
|
// do not probe if channel capacity is overridden
|
|
return
|
|
}
|
|
|
|
if !s.params.BWE.CanProbe() {
|
|
return
|
|
}
|
|
|
|
switch s.params.Config.ProbeMode {
|
|
case ProbeModeMedia:
|
|
s.maybeProbeWithMedia()
|
|
s.adjustState()
|
|
case ProbeModePadding:
|
|
s.maybeProbeWithPadding()
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) maybeProbeWithMedia() {
|
|
// boost deficient track farthest from desired layer
|
|
for _, track := range s.getMaxDistanceSortedDeficient() {
|
|
allocation, boosted := track.AllocateNextHigher(ChannelCapacityInfinity, FlagAllowOvershootInBoost)
|
|
if !boosted {
|
|
continue
|
|
}
|
|
|
|
update := NewStreamStateUpdate()
|
|
updateStreamStateChange(track, allocation, update)
|
|
s.maybeSendUpdate(update)
|
|
|
|
s.params.BWE.Reset()
|
|
break
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) maybeProbeWithPadding() {
|
|
// use deficient track farthest from desired layer to find how much to probe
|
|
for _, track := range s.getMaxDistanceSortedDeficient() {
|
|
transition, available := track.GetNextHigherTransition(FlagAllowOvershootInProbe)
|
|
if !available || transition.BandwidthDelta < 0 {
|
|
continue
|
|
}
|
|
|
|
// overshoot a bit to account for noise (in measurement/estimate etc)
|
|
desiredIncreaseBps := (transition.BandwidthDelta * s.params.Config.ProbeOveragePct) / 100
|
|
if desiredIncreaseBps < s.params.Config.ProbeMinBps {
|
|
desiredIncreaseBps = s.params.Config.ProbeMinBps
|
|
}
|
|
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
|
|
pci := s.prober.AddCluster(
|
|
ccutils.ProbeClusterModeUniform,
|
|
ccutils.ProbeClusterGoal{
|
|
AvailableBandwidthBps: int(s.committedChannelCapacity),
|
|
ExpectedUsageBps: int(expectedBandwidthUsage),
|
|
DesiredBps: int(expectedBandwidthUsage + desiredIncreaseBps),
|
|
Duration: s.params.BWE.ProbeDuration(),
|
|
},
|
|
)
|
|
s.params.Logger.Debugw(
|
|
"stream allocator: adding probe",
|
|
"probeClusterInfo", pci,
|
|
)
|
|
break
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) getTracks() []*Track {
|
|
s.videoTracksMu.RLock()
|
|
tracks := make([]*Track, 0, len(s.videoTracks))
|
|
for _, track := range s.videoTracks {
|
|
tracks = append(tracks, track)
|
|
}
|
|
s.videoTracksMu.RUnlock()
|
|
|
|
return tracks
|
|
}
|
|
|
|
func (s *StreamAllocator) getSorted() TrackSorter {
|
|
s.videoTracksMu.RLock()
|
|
var trackSorter TrackSorter
|
|
for _, track := range s.videoTracks {
|
|
if !track.IsManaged() {
|
|
continue
|
|
}
|
|
|
|
trackSorter = append(trackSorter, track)
|
|
}
|
|
s.videoTracksMu.RUnlock()
|
|
|
|
sort.Sort(trackSorter)
|
|
|
|
return trackSorter
|
|
}
|
|
|
|
func (s *StreamAllocator) getMinDistanceSorted(exclude *Track) MinDistanceSorter {
|
|
s.videoTracksMu.RLock()
|
|
var minDistanceSorter MinDistanceSorter
|
|
for _, track := range s.videoTracks {
|
|
if !track.IsManaged() || track == exclude {
|
|
continue
|
|
}
|
|
|
|
minDistanceSorter = append(minDistanceSorter, track)
|
|
}
|
|
s.videoTracksMu.RUnlock()
|
|
|
|
sort.Sort(minDistanceSorter)
|
|
|
|
return minDistanceSorter
|
|
}
|
|
|
|
func (s *StreamAllocator) getMaxDistanceSortedDeficient() MaxDistanceSorter {
|
|
s.videoTracksMu.RLock()
|
|
var maxDistanceSorter MaxDistanceSorter
|
|
for _, track := range s.videoTracks {
|
|
if !track.IsManaged() || !track.IsDeficient() {
|
|
continue
|
|
}
|
|
|
|
maxDistanceSorter = append(maxDistanceSorter, track)
|
|
}
|
|
s.videoTracksMu.RUnlock()
|
|
|
|
sort.Sort(maxDistanceSorter)
|
|
|
|
return maxDistanceSorter
|
|
}
|
|
|
|
// ------------------------------------------------
|
|
|
|
func updateStreamStateChange(track *Track, allocation sfu.VideoAllocation, update *StreamStateUpdate) {
|
|
updated := false
|
|
streamState := StreamStateInactive
|
|
switch allocation.PauseReason {
|
|
case sfu.VideoPauseReasonMuted:
|
|
fallthrough
|
|
|
|
case sfu.VideoPauseReasonPubMuted:
|
|
streamState = StreamStateInactive
|
|
updated = track.SetStreamState(streamState)
|
|
|
|
case sfu.VideoPauseReasonBandwidth:
|
|
streamState = StreamStatePaused
|
|
updated = track.SetStreamState(streamState)
|
|
}
|
|
|
|
if updated {
|
|
update.HandleStreamingChange(track, streamState)
|
|
}
|
|
}
|
|
|
|
func isHoldableCongestionState(bweCongestionState bwe.CongestionState) bool {
|
|
return bweCongestionState == bwe.CongestionStateEarlyWarning || bweCongestionState == bwe.CongestionStateEarlyWarningHangover
|
|
}
|
|
|
|
func isDeficientCongestionState(bweCongestionState bwe.CongestionState) bool {
|
|
return bweCongestionState == bwe.CongestionStateCongested || bweCongestionState == bwe.CongestionStateCongestedHangover
|
|
}
|
|
|
|
// ------------------------------------------------
|