Files
livekit/pkg/sfu/streamallocator/streamallocator.go
Raja Subramanian e32eaa451f Refactor video layer selector (#1588)
* WIP commit

* WIP commit

* fix test

* FPS for VP9

* WIP commit

* test changes

* WIP commit

* h264

* codec munger

* forwarder state

* clean up a bit

* dd interface

* WIP commit

* WIP commit

* WIP commit

* WIP commit

* more TODO notes

* overshoot interface

* clean up

* clean up isTemporalSupported

* wait for key frame to resume

* clean up VP8 payload descriptor stuff

* temporal layer selector

* comment out vp9 and av1

* space

* fix test compile

* append bytes

* fix tests

* fix test
2023-04-08 10:57:57 +05:30

1266 lines
34 KiB
Go

package streamallocator
import (
"fmt"
"math"
"sort"
"sync"
"time"
"github.com/pion/interceptor/pkg/cc"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"go.uber.org/atomic"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
const (
ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps
NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate
ProbeWaitBase = 5 * time.Second
ProbeBackoffFactor = 1.5
ProbeWaitMax = 30 * time.Second
ProbeSettleWait = 250
ProbeTrendWait = 2 * time.Second
ProbePct = 120
ProbeMinBps = 200 * 1000 // 200 kbps
ProbeMinDuration = 20 * time.Second
ProbeMaxDuration = 21 * time.Second
PriorityMin = uint8(1)
PriorityMax = uint8(255)
PriorityDefaultScreenshare = PriorityMax
PriorityDefaultVideo = PriorityMin
FlagAllowOvershootWhileOptimal = true
FlagAllowOvershootWhileDeficient = false
FlagAllowOvershootExemptTrackWhileDeficient = true
FlagAllowOvershootInProbe = true
FlagAllowOvershootInCatchup = true
)
var (
ChannelObserverParamsProbe = ChannelObserverParams{
Name: "probe",
EstimateRequiredSamples: 3,
EstimateDownwardTrendThreshold: 0.0,
EstimateCollapseValues: false,
NackWindowMinDuration: 500 * time.Millisecond,
NackWindowMaxDuration: 1 * time.Second,
NackRatioThreshold: 0.04,
}
ChannelObserverParamsNonProbe = ChannelObserverParams{
Name: "non-probe",
EstimateRequiredSamples: 8,
EstimateDownwardTrendThreshold: -0.5,
EstimateCollapseValues: true,
NackWindowMinDuration: 1 * time.Second,
NackWindowMaxDuration: 2 * time.Second,
NackRatioThreshold: 0.08,
}
)
type streamAllocatorState int
const (
streamAllocatorStateStable streamAllocatorState = iota
streamAllocatorStateDeficient
)
func (s streamAllocatorState) String() string {
switch s {
case streamAllocatorStateStable:
return "STABLE"
case streamAllocatorStateDeficient:
return "DEFICIENT"
default:
return fmt.Sprintf("%d", int(s))
}
}
type streamAllocatorSignal int
const (
streamAllocatorSignalAllocateTrack streamAllocatorSignal = iota
streamAllocatorSignalAllocateAllTracks
streamAllocatorSignalAdjustState
streamAllocatorSignalEstimate
streamAllocatorSignalPeriodicPing
streamAllocatorSignalSendProbe
streamAllocatorSignalProbeClusterDone
streamAllocatorSignalResume
)
func (s streamAllocatorSignal) String() string {
switch s {
case streamAllocatorSignalAllocateTrack:
return "ALLOCATE_TRACK"
case streamAllocatorSignalAllocateAllTracks:
return "ALLOCATE_ALL_TRACKS"
case streamAllocatorSignalAdjustState:
return "ADJUST_STATE"
case streamAllocatorSignalEstimate:
return "ESTIMATE"
case streamAllocatorSignalPeriodicPing:
return "PERIODIC_PING"
case streamAllocatorSignalSendProbe:
return "SEND_PROBE"
case streamAllocatorSignalProbeClusterDone:
return "PROBE_CLUSTER_DONE"
case streamAllocatorSignalResume:
return "RESUME"
default:
return fmt.Sprintf("%d", int(s))
}
}
type Event struct {
Signal streamAllocatorSignal
TrackID livekit.TrackID
Data interface{}
}
func (e Event) String() string {
return fmt.Sprintf("StreamAllocator:Event{signal: %s, trackID: %s, data: %+v}", e.Signal, e.TrackID, e.Data)
}
type StreamAllocatorParams struct {
Config config.CongestionControlConfig
Logger logger.Logger
}
type StreamAllocator struct {
params StreamAllocatorParams
onStreamStateChange func(update *StreamStateUpdate) error
bwe cc.BandwidthEstimator
lastReceivedEstimate int64
committedChannelCapacity int64
probeInterval time.Duration
lastProbeStartTime time.Time
probeGoalBps int64
probeClusterId ProbeClusterId
abortedProbeClusterId ProbeClusterId
probeTrendObserved bool
probeEndTime time.Time
prober *Prober
channelObserver *ChannelObserver
videoTracksMu sync.RWMutex
videoTracks map[livekit.TrackID]*Track
isAllocateAllPending bool
rembTrackingSSRC uint32
state streamAllocatorState
eventChMu sync.RWMutex
eventCh chan Event
isStopped atomic.Bool
}
func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
s := &StreamAllocator{
params: params,
prober: NewProber(ProberParams{
Logger: params.Logger,
}),
videoTracks: make(map[livekit.TrackID]*Track),
eventCh: make(chan Event, 200),
}
s.resetState()
s.prober.SetProberListener(s)
return s
}
func (s *StreamAllocator) Start() {
go s.processEvents()
go s.ping()
}
func (s *StreamAllocator) Stop() {
s.eventChMu.Lock()
if s.isStopped.Swap(true) {
s.eventChMu.Unlock()
return
}
close(s.eventCh)
s.eventChMu.Unlock()
}
func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error) {
s.onStreamStateChange = f
}
func (s *StreamAllocator) SetBandwidthEstimator(bwe cc.BandwidthEstimator) {
if bwe != nil {
bwe.OnTargetBitrateChange(s.onTargetBitrateChange)
}
s.bwe = bwe
}
type AddTrackParams struct {
Source livekit.TrackSource
Priority uint8
IsSimulcast bool
PublisherID livekit.ParticipantID
}
func (s *StreamAllocator) AddTrack(downTrack *sfu.DownTrack, params AddTrackParams) {
if downTrack.Kind() != webrtc.RTPCodecTypeVideo {
return
}
track := NewTrack(downTrack, params.Source, params.IsSimulcast, params.PublisherID, s.params.Logger)
track.SetPriority(params.Priority)
s.videoTracksMu.Lock()
s.videoTracks[livekit.TrackID(downTrack.ID())] = track
s.videoTracksMu.Unlock()
downTrack.SetStreamAllocatorListener(s)
if s.prober.IsRunning() {
// LK-TODO: this can be changed to adapt to probe rate
downTrack.SetStreamAllocatorReportInterval(50 * time.Millisecond)
}
s.maybePostEventAllocateTrack(downTrack)
}
func (s *StreamAllocator) RemoveTrack(downTrack *sfu.DownTrack) {
s.videoTracksMu.Lock()
if existing := s.videoTracks[livekit.TrackID(downTrack.ID())]; existing != nil && existing.DownTrack() == downTrack {
delete(s.videoTracks, livekit.TrackID(downTrack.ID()))
}
s.videoTracksMu.Unlock()
// LK-TODO: use any saved bandwidth to re-distribute
s.postEvent(Event{
Signal: streamAllocatorSignalAdjustState,
})
}
func (s *StreamAllocator) SetTrackPriority(downTrack *sfu.DownTrack, priority uint8) {
s.videoTracksMu.Lock()
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
changed := track.SetPriority(priority)
if changed && !s.isAllocateAllPending {
// do a full allocation on a track priority change to keep it simple
s.isAllocateAllPending = true
s.postEvent(Event{
Signal: streamAllocatorSignalAllocateAllTracks,
})
}
}
s.videoTracksMu.Unlock()
}
func (s *StreamAllocator) resetState() {
s.channelObserver = s.newChannelObserverNonProbe()
s.resetProbe()
s.state = streamAllocatorStateStable
}
// called when a new REMB is received (receive side bandwidth estimation)
func (s *StreamAllocator) OnREMB(downTrack *sfu.DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) {
//
// Channel capacity is estimated at a peer connection level. All down tracks
// in the peer connection will end up calling this for a REMB report with
// the same estimated channel capacity. Use a tracking SSRC to lock onto to
// one report. As SSRCs can be dropped over time, update tracking SSRC as needed
//
// A couple of things to keep in mind
// - REMB reports could be sent gratuitously as a way of providing
// periodic feedback, i.e. even if the estimated capacity does not
// change, there could be REMB packets on the wire. Those gratuitous
// REMBs should not trigger anything bad.
// - As each down track will issue this callback for the same REMB packet
// from the wire, theoretically it is possible that one down track's
// callback from previous REMB comes after another down track's callback
// from the new REMB. REMBs could fire very quickly especially when
// the network is entering congestion.
// LK-TODO-START
// Need to check if the same SSRC reports can somehow race, i.e. does pion send
// RTCP dispatch for same SSRC on different threads? If not, the tracking SSRC
// should prevent racing
// LK-TODO-END
//
// if there are no video tracks, ignore any straggler REMB
s.videoTracksMu.Lock()
if len(s.videoTracks) == 0 {
s.videoTracksMu.Unlock()
return
}
track := s.videoTracks[livekit.TrackID(downTrack.ID())]
downTrackSSRC := uint32(0)
if track != nil {
downTrackSSRC = track.DownTrack().SSRC()
}
found := false
for _, ssrc := range remb.SSRCs {
if ssrc == s.rembTrackingSSRC {
found = true
break
}
}
if !found {
if len(remb.SSRCs) == 0 {
s.params.Logger.Warnw("stream allocator: no SSRC to track REMB", nil)
s.videoTracksMu.Unlock()
return
}
// try to lock to track which is sending this update
if downTrackSSRC != 0 {
for _, ssrc := range remb.SSRCs {
if ssrc == downTrackSSRC {
s.rembTrackingSSRC = downTrackSSRC
found = true
break
}
}
}
if !found {
s.rembTrackingSSRC = remb.SSRCs[0]
}
}
if s.rembTrackingSSRC == 0 || s.rembTrackingSSRC != downTrackSSRC {
s.videoTracksMu.Unlock()
return
}
s.videoTracksMu.Unlock()
s.postEvent(Event{
Signal: streamAllocatorSignalEstimate,
Data: int64(remb.Bitrate),
})
}
// called when a new transport-cc feedback is received
func (s *StreamAllocator) OnTransportCCFeedback(downTrack *sfu.DownTrack, fb *rtcp.TransportLayerCC) {
if s.bwe != nil {
s.bwe.WriteRTCP([]rtcp.Packet{fb}, nil)
}
}
// called when target bitrate changes (send side bandwidth estimation)
func (s *StreamAllocator) onTargetBitrateChange(bitrate int) {
s.postEvent(Event{
Signal: streamAllocatorSignalEstimate,
Data: int64(bitrate),
})
}
// called when feeding track's layer availability changes
func (s *StreamAllocator) OnAvailableLayersChanged(downTrack *sfu.DownTrack) {
s.maybePostEventAllocateTrack(downTrack)
}
// called when feeding track's bitrate measurement of any layer is available
func (s *StreamAllocator) OnBitrateAvailabilityChanged(downTrack *sfu.DownTrack) {
s.maybePostEventAllocateTrack(downTrack)
}
// called when feeding track's max publisher layer changes
func (s *StreamAllocator) OnMaxPublishedLayerChanged(downTrack *sfu.DownTrack) {
s.maybePostEventAllocateTrack(downTrack)
}
// called when subscription settings changes (muting/unmuting of track)
func (s *StreamAllocator) OnSubscriptionChanged(downTrack *sfu.DownTrack) {
s.maybePostEventAllocateTrack(downTrack)
}
// called when subscribed layers changes (limiting max layers)
func (s *StreamAllocator) OnSubscribedLayersChanged(downTrack *sfu.DownTrack, layers buffer.VideoLayer) {
shouldPost := false
s.videoTracksMu.Lock()
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
if track.SetMaxLayers(layers) && track.SetDirty(true) {
shouldPost = true
}
}
s.videoTracksMu.Unlock()
if shouldPost {
s.postEvent(Event{
Signal: streamAllocatorSignalAllocateTrack,
TrackID: livekit.TrackID(downTrack.ID()),
})
}
}
// called when forwarder resumes a track
func (s *StreamAllocator) OnResume(downTrack *sfu.DownTrack) {
s.postEvent(Event{
Signal: streamAllocatorSignalResume,
TrackID: livekit.TrackID(downTrack.ID()),
})
}
// called when a video DownTrack sends a packet
func (s *StreamAllocator) OnPacketsSent(downTrack *sfu.DownTrack, size int) {
s.prober.PacketsSent(size)
}
// called when prober wants to send packet(s)
func (s *StreamAllocator) OnSendProbe(bytesToSend int) {
s.postEvent(Event{
Signal: streamAllocatorSignalSendProbe,
Data: bytesToSend,
})
}
// called when prober wants to send packet(s)
func (s *StreamAllocator) OnProbeClusterDone(info ProbeClusterInfo) {
s.postEvent(Event{
Signal: streamAllocatorSignalProbeClusterDone,
Data: info,
})
}
// called when prober active state changes
func (s *StreamAllocator) OnActiveChanged(isActive bool) {
for _, t := range s.getTracks() {
if isActive {
// LK-TODO: this can be changed to adapt to probe rate
t.DownTrack().SetStreamAllocatorReportInterval(50 * time.Millisecond)
} else {
t.DownTrack().ClearStreamAllocatorReportInterval()
}
}
}
func (s *StreamAllocator) maybePostEventAllocateTrack(downTrack *sfu.DownTrack) {
shouldPost := false
s.videoTracksMu.Lock()
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
if track.SetDirty(true) {
shouldPost = true
}
}
s.videoTracksMu.Unlock()
if shouldPost {
s.postEvent(Event{
Signal: streamAllocatorSignalAllocateTrack,
TrackID: livekit.TrackID(downTrack.ID()),
})
}
}
func (s *StreamAllocator) postEvent(event Event) {
s.eventChMu.RLock()
if s.isStopped.Load() {
s.eventChMu.RUnlock()
return
}
select {
case s.eventCh <- event:
default:
s.params.Logger.Warnw("stream allocator: event queue full", nil)
}
s.eventChMu.RUnlock()
}
func (s *StreamAllocator) processEvents() {
for event := range s.eventCh {
s.handleEvent(&event)
}
s.stopProbe()
}
func (s *StreamAllocator) ping() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
<-ticker.C
if s.isStopped.Load() {
return
}
s.postEvent(Event{
Signal: streamAllocatorSignalPeriodicPing,
})
}
}
func (s *StreamAllocator) handleEvent(event *Event) {
switch event.Signal {
case streamAllocatorSignalAllocateTrack:
s.handleSignalAllocateTrack(event)
case streamAllocatorSignalAllocateAllTracks:
s.handleSignalAllocateAllTracks(event)
case streamAllocatorSignalAdjustState:
s.handleSignalAdjustState(event)
case streamAllocatorSignalEstimate:
s.handleSignalEstimate(event)
case streamAllocatorSignalPeriodicPing:
s.handleSignalPeriodicPing(event)
case streamAllocatorSignalSendProbe:
s.handleSignalSendProbe(event)
case streamAllocatorSignalProbeClusterDone:
s.handleSignalProbeClusterDone(event)
case streamAllocatorSignalResume:
s.handleSignalResume(event)
}
}
func (s *StreamAllocator) handleSignalAllocateTrack(event *Event) {
s.videoTracksMu.Lock()
track := s.videoTracks[event.TrackID]
if track != nil {
track.SetDirty(false)
}
s.videoTracksMu.Unlock()
if track != nil {
s.allocateTrack(track)
}
}
func (s *StreamAllocator) handleSignalAllocateAllTracks(event *Event) {
s.videoTracksMu.Lock()
s.isAllocateAllPending = false
s.videoTracksMu.Unlock()
if s.state == streamAllocatorStateDeficient {
s.allocateAllTracks()
}
}
func (s *StreamAllocator) handleSignalAdjustState(event *Event) {
s.adjustState()
}
func (s *StreamAllocator) handleSignalEstimate(event *Event) {
receivedEstimate, _ := event.Data.(int64)
s.lastReceivedEstimate = receivedEstimate
// while probing, maintain estimate separately to enable keeping current committed estimate if probe fails
if s.isInProbe() {
s.handleNewEstimateInProbe()
} else {
s.handleNewEstimateInNonProbe()
}
}
func (s *StreamAllocator) handleSignalPeriodicPing(event *Event) {
// finalize probe if necessary
if s.isInProbe() && !s.probeEndTime.IsZero() && time.Now().After(s.probeEndTime) {
s.finalizeProbe()
}
// probe if necessary and timing is right
if s.state == streamAllocatorStateDeficient {
s.maybeProbe()
}
}
func (s *StreamAllocator) handleSignalSendProbe(event *Event) {
bytesToSend := event.Data.(int)
if bytesToSend <= 0 {
return
}
bytesSent := 0
for _, track := range s.getTracks() {
sent := track.WritePaddingRTP(bytesToSend)
bytesSent += sent
bytesToSend -= sent
if bytesToSend <= 0 {
break
}
}
if bytesSent != 0 {
s.prober.ProbeSent(bytesSent)
}
}
func (s *StreamAllocator) handleSignalProbeClusterDone(event *Event) {
info, _ := event.Data.(ProbeClusterInfo)
if s.probeClusterId != info.Id {
return
}
if s.abortedProbeClusterId == ProbeClusterIdInvalid {
// successful probe, finalize
s.finalizeProbe()
return
}
// ensure probe queue is flushed
// LK-TODO: ProbeSettleWait should actually be a certain number of RTTs.
lowestEstimate := int64(math.Min(float64(s.committedChannelCapacity), float64(s.channelObserver.GetLowestEstimate())))
expectedDuration := float64(info.BytesSent*8*1000) / float64(lowestEstimate)
queueTime := expectedDuration - float64(info.Duration.Milliseconds())
if queueTime < 0.0 {
queueTime = 0.0
}
queueWait := time.Duration(queueTime+float64(ProbeSettleWait)) * time.Millisecond
s.probeEndTime = s.lastProbeStartTime.Add(queueWait)
}
func (s *StreamAllocator) handleSignalResume(event *Event) {
s.videoTracksMu.Lock()
track := s.videoTracks[event.TrackID]
s.videoTracksMu.Unlock()
if track != nil {
update := NewStreamStateUpdate()
if track.SetPaused(false) {
update.HandleStreamingChange(false, track)
}
s.maybeSendUpdate(update)
}
}
func (s *StreamAllocator) setState(state streamAllocatorState) {
if s.state == state {
return
}
s.params.Logger.Infow("stream allocator: state change", "from", s.state, "to", state)
s.state = state
// reset probe to enforce a delay after state change before probing
s.lastProbeStartTime = time.Now()
}
func (s *StreamAllocator) adjustState() {
for _, track := range s.getTracks() {
if track.IsDeficient() {
s.setState(streamAllocatorStateDeficient)
return
}
}
s.setState(streamAllocatorStateStable)
}
func (s *StreamAllocator) handleNewEstimateInProbe() {
// always update NACKs, even if aborted
packetDelta, repeatedNackDelta := s.getNackDelta()
if s.abortedProbeClusterId != ProbeClusterIdInvalid {
// waiting for aborted probe to finalize
return
}
s.channelObserver.AddEstimate(s.lastReceivedEstimate)
s.channelObserver.AddNack(packetDelta, repeatedNackDelta)
trend, _ := s.channelObserver.GetTrend()
if trend != ChannelTrendNeutral {
s.probeTrendObserved = true
}
switch {
case !s.probeTrendObserved && time.Since(s.lastProbeStartTime) > ProbeTrendWait:
//
// More of a safety net.
// In rare cases, the estimate gets stuck. Prevent from probe running amok
// LK-TODO: Need more testing here to ensure that probe does not cause a lot of damage
//
s.params.Logger.Infow("stream allocator: probe: aborting, no trend", "cluster", s.probeClusterId)
s.abortProbe()
case trend == ChannelTrendCongesting:
// stop immediately if the probe is congesting channel more
s.params.Logger.Infow("stream allocator: probe: aborting, channel is congesting", "cluster", s.probeClusterId)
s.abortProbe()
case s.channelObserver.GetHighestEstimate() > s.probeGoalBps:
// reached goal, stop probing
s.params.Logger.Infow(
"stream allocator: probe: stopping, goal reached",
"cluster", s.probeClusterId,
"goal", s.probeGoalBps,
"highest", s.channelObserver.GetHighestEstimate(),
)
s.stopProbe()
}
}
func (s *StreamAllocator) handleNewEstimateInNonProbe() {
s.channelObserver.AddEstimate(s.lastReceivedEstimate)
packetDelta, repeatedNackDelta := s.getNackDelta()
s.channelObserver.AddNack(packetDelta, repeatedNackDelta)
trend, reason := s.channelObserver.GetTrend()
if trend != ChannelTrendCongesting {
return
}
var estimateToCommit int64
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
packets, repeatedNacks, nackRatio := s.channelObserver.GetNackRatio()
switch reason {
case ChannelCongestionReasonLoss:
estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - NackRatioAttenuator*nackRatio))
default:
estimateToCommit = s.lastReceivedEstimate
}
s.params.Logger.Infow(
"stream allocator: channel congestion detected, updating channel capacity",
"reason", reason,
"old(bps)", s.committedChannelCapacity,
"new(bps)", estimateToCommit,
"lastReceived(bps)", s.lastReceivedEstimate,
"expectedUsage(bps)", expectedBandwidthUsage,
"packets", packets,
"repeatedNacks", repeatedNacks,
"nackRatio", nackRatio,
)
s.committedChannelCapacity = estimateToCommit
// reset to get new set of samples for next trend
s.channelObserver = s.newChannelObserverNonProbe()
// reset probe to ensure it does not start too soon after a downward trend
s.resetProbe()
s.allocateAllTracks()
}
func (s *StreamAllocator) allocateTrack(track *Track) {
// abort any probe that may be running when a track specific change needs allocation
s.abortProbe()
// if not deficient, free pass allocate track
if !s.params.Config.Enabled || s.state == streamAllocatorStateStable || !track.IsManaged() {
update := NewStreamStateUpdate()
allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal)
if allocation.PauseReason == sfu.VideoPauseReasonBandwidth && track.SetPaused(true) {
update.HandleStreamingChange(true, track)
}
s.maybeSendUpdate(update)
return
}
//
// In DEFICIENT state,
// 1. Find cooperative transition from track that needs allocation.
// 2. If track is currently streaming at minimum, do not do anything.
// 3. If that track is giving back bits, apply the transition.
// 4. If this track needs more, ask for best offer from others and try to use it.
//
track.ProvisionalAllocatePrepare()
transition := track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient)
// track is currently streaming at minimum
if transition.BandwidthDelta == 0 {
return
}
// downgrade, giving back bits
if transition.From.GreaterThan(transition.To) {
allocation := track.ProvisionalAllocateCommit()
update := NewStreamStateUpdate()
if allocation.PauseReason == sfu.VideoPauseReasonBandwidth && track.SetPaused(true) {
update.HandleStreamingChange(true, track)
}
s.maybeSendUpdate(update)
s.adjustState()
return
// LK-TODO-START
// Should use the bits given back to start any paused track.
// Note layer downgrade may actually have positive delta (i.e. consume more bits)
// because of when the measurement is done. Watch for that.
// LK-TODO-END
}
//
// This track is currently not streaming and needs bits to start.
// Try to redistribute starting with tracks that are closest to their desired.
//
bandwidthAcquired := int64(0)
var contributingTracks []*Track
minDistanceSorted := s.getMinDistanceSorted(track)
for _, t := range minDistanceSorted {
t.ProvisionalAllocatePrepare()
}
for _, t := range minDistanceSorted {
tx := t.ProvisionalAllocateGetBestWeightedTransition()
if tx.BandwidthDelta < 0 {
contributingTracks = append(contributingTracks, t)
bandwidthAcquired += -tx.BandwidthDelta
if bandwidthAcquired >= transition.BandwidthDelta {
break
}
}
}
update := NewStreamStateUpdate()
if bandwidthAcquired >= transition.BandwidthDelta {
// commit the tracks that contributed
for _, t := range contributingTracks {
allocation := t.ProvisionalAllocateCommit()
if allocation.PauseReason == sfu.VideoPauseReasonBandwidth && track.SetPaused(true) {
update.HandleStreamingChange(true, t)
}
}
// LK-TODO if got too much extra, can potentially give it to some deficient track
}
// commit the track that needs change if enough could be acquired or pause not allowed
if !s.params.Config.AllowPause || bandwidthAcquired >= transition.BandwidthDelta {
allocation := track.ProvisionalAllocateCommit()
if allocation.PauseReason == sfu.VideoPauseReasonBandwidth && track.SetPaused(true) {
update.HandleStreamingChange(true, track)
}
}
s.maybeSendUpdate(update)
s.adjustState()
}
func (s *StreamAllocator) finalizeProbe() {
aborted := s.probeClusterId == s.abortedProbeClusterId
highestEstimateInProbe := s.channelObserver.GetHighestEstimate()
s.clearProbe()
//
// Reset estimator at the end of a probe irrespective of probe result to get fresh readings.
// With a failed probe, the latest estimate would be lower than committed estimate.
// As bandwidth estimator (remote in REMB case, local in TWCC case) holds state,
// subsequent estimates could start from the lower point. That should not trigger a
// downward trend and get latched to committed estimate as that would trigger a re-allocation.
// With fresh readings, as long as the trend is not going downward, it will not get latched.
//
// NOTE: With TWCC, it is possible to reset bandwidth estimation to clean state as
// the send side is in full control of bandwidth estimation.
//
s.channelObserver = s.newChannelObserverNonProbe()
if aborted {
// failed probe, backoff
s.backoffProbeInterval()
return
}
// reset probe interval on a successful probe
s.resetProbeInterval()
// probe estimate is same or higher, commit it and try to allocate deficient tracks
s.params.Logger.Infow(
"successful probe, updating channel capacity",
"old(bps)", s.committedChannelCapacity,
"new(bps)", highestEstimateInProbe,
)
s.committedChannelCapacity = highestEstimateInProbe
s.maybeBoostDeficientTracks()
}
func (s *StreamAllocator) maybeBoostDeficientTracks() {
committedChannelCapacity := s.committedChannelCapacity
if s.params.Config.MinChannelCapacity > committedChannelCapacity {
committedChannelCapacity = s.params.Config.MinChannelCapacity
s.params.Logger.Debugw(
"stream allocator: overriding channel capacity",
"actual", s.committedChannelCapacity,
"override", committedChannelCapacity,
)
}
availableChannelCapacity := committedChannelCapacity - s.getExpectedBandwidthUsage()
if availableChannelCapacity <= 0 {
return
}
update := NewStreamStateUpdate()
for _, track := range s.getMaxDistanceSortedDeficient() {
allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup)
if !boosted {
continue
}
if allocation.PauseReason == sfu.VideoPauseReasonBandwidth && track.SetPaused(true) {
update.HandleStreamingChange(true, track)
}
availableChannelCapacity -= allocation.BandwidthDelta
if availableChannelCapacity <= 0 {
break
}
}
s.maybeSendUpdate(update)
s.adjustState()
}
func (s *StreamAllocator) allocateAllTracks() {
if !s.params.Config.Enabled {
// nothing else to do when disabled
return
}
//
// Goals:
// 1. Stream as many tracks as possible, i.e. no pauses.
// 2. Try to give fair allocation to all track.
//
// Start with the lowest layers and give each track a chance at that layer and keep going up.
// As long as there is enough bandwidth for tracks to stream at the lowest layers, the first goal is achieved.
//
// Tracks that have higher subscribed layers can use any additional available bandwidth. This tried to achieve the second goal.
//
// If there is not enough bandwidth even for the lowest layers, tracks at lower priorities will be paused.
//
update := NewStreamStateUpdate()
availableChannelCapacity := s.committedChannelCapacity
if s.params.Config.MinChannelCapacity > availableChannelCapacity {
availableChannelCapacity = s.params.Config.MinChannelCapacity
s.params.Logger.Debugw(
"stream allocator: overriding channel capacity",
"actual", s.committedChannelCapacity,
"override", availableChannelCapacity,
)
}
//
// This pass is to find out if there is any leftover channel capacity after allocating exempt tracks.
// Exempt tracks are given optimal allocation (i. e. no bandwidth constraint) so that they do not fail allocation.
//
videoTracks := s.getTracks()
for _, track := range videoTracks {
if track.IsManaged() {
continue
}
allocation := track.AllocateOptimal(FlagAllowOvershootExemptTrackWhileDeficient)
if allocation.PauseReason == sfu.VideoPauseReasonBandwidth && track.SetPaused(true) {
update.HandleStreamingChange(true, track)
}
// LK-TODO: optimistic allocation before bitrate is available will return 0. How to account for that?
availableChannelCapacity -= allocation.BandwidthRequested
}
if availableChannelCapacity < 0 {
availableChannelCapacity = 0
}
if availableChannelCapacity == 0 && s.params.Config.AllowPause {
// nothing left for managed tracks, pause them all
for _, track := range videoTracks {
if !track.IsManaged() {
continue
}
allocation := track.Pause()
if allocation.PauseReason == sfu.VideoPauseReasonBandwidth && track.SetPaused(true) {
update.HandleStreamingChange(true, track)
}
}
} else {
sorted := s.getSorted()
for _, track := range sorted {
track.ProvisionalAllocatePrepare()
}
for spatial := int32(0); spatial <= buffer.DefaultMaxLayerSpatial; spatial++ {
for temporal := int32(0); temporal <= buffer.DefaultMaxLayerTemporal; temporal++ {
layers := buffer.VideoLayer{
Spatial: spatial,
Temporal: temporal,
}
for _, track := range sorted {
usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layers, s.params.Config.AllowPause, FlagAllowOvershootWhileDeficient)
availableChannelCapacity -= usedChannelCapacity
if availableChannelCapacity < 0 {
availableChannelCapacity = 0
}
}
}
}
for _, track := range sorted {
allocation := track.ProvisionalAllocateCommit()
if allocation.PauseReason == sfu.VideoPauseReasonBandwidth && track.SetPaused(true) {
update.HandleStreamingChange(true, track)
}
}
}
s.maybeSendUpdate(update)
s.adjustState()
}
func (s *StreamAllocator) maybeSendUpdate(update *StreamStateUpdate) {
if update.Empty() {
return
}
// logging individual changes to make it easier for logging systems
for _, streamState := range update.StreamStates {
s.params.Logger.Debugw("streamed tracks changed",
"trackID", streamState.TrackID,
"state", streamState.State,
)
}
if s.onStreamStateChange != nil {
err := s.onStreamStateChange(update)
if err != nil {
s.params.Logger.Errorw("could not send streamed tracks update", err)
}
}
}
func (s *StreamAllocator) getExpectedBandwidthUsage() int64 {
expected := int64(0)
for _, track := range s.getTracks() {
expected += track.BandwidthRequested()
}
return expected
}
func (s *StreamAllocator) getNackDelta() (uint32, uint32) {
aggPacketDelta := uint32(0)
aggRepeatedNackDelta := uint32(0)
for _, track := range s.getTracks() {
packetDelta, nackDelta := track.GetNackDelta()
aggPacketDelta += packetDelta
aggRepeatedNackDelta += nackDelta
}
return aggPacketDelta, aggRepeatedNackDelta
}
func (s *StreamAllocator) newChannelObserverProbe() *ChannelObserver {
return NewChannelObserver(ChannelObserverParamsProbe, s.params.Logger)
}
func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver {
return NewChannelObserver(ChannelObserverParamsNonProbe, s.params.Logger)
}
func (s *StreamAllocator) initProbe(probeRateBps int64) {
s.lastProbeStartTime = time.Now()
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
s.probeGoalBps = expectedBandwidthUsage + probeRateBps
s.abortedProbeClusterId = ProbeClusterIdInvalid
s.probeTrendObserved = false
s.probeEndTime = time.Time{}
s.channelObserver = s.newChannelObserverProbe()
s.channelObserver.SeedEstimate(s.lastReceivedEstimate)
desiredRateBps := int(probeRateBps) + int(math.Max(float64(s.committedChannelCapacity), float64(expectedBandwidthUsage)))
s.probeClusterId = s.prober.AddCluster(
desiredRateBps,
int(expectedBandwidthUsage),
ProbeMinDuration,
ProbeMaxDuration,
)
s.params.Logger.Infow(
"stream allocator: starting probe",
"probeClusterId", s.probeClusterId,
"current usage", expectedBandwidthUsage,
"committed", s.committedChannelCapacity,
"lastReceived", s.lastReceivedEstimate,
"probeRateBps", probeRateBps,
"goalBps", s.probeGoalBps,
)
}
func (s *StreamAllocator) resetProbe() {
s.lastProbeStartTime = time.Now()
s.resetProbeInterval()
s.clearProbe()
}
func (s *StreamAllocator) clearProbe() {
s.probeClusterId = ProbeClusterIdInvalid
s.abortedProbeClusterId = ProbeClusterIdInvalid
}
func (s *StreamAllocator) backoffProbeInterval() {
s.probeInterval = time.Duration(s.probeInterval.Seconds()*ProbeBackoffFactor) * time.Second
if s.probeInterval > ProbeWaitMax {
s.probeInterval = ProbeWaitMax
}
}
func (s *StreamAllocator) resetProbeInterval() {
s.probeInterval = ProbeWaitBase
}
func (s *StreamAllocator) stopProbe() {
s.prober.Reset()
}
func (s *StreamAllocator) abortProbe() {
s.abortedProbeClusterId = s.probeClusterId
s.stopProbe()
}
func (s *StreamAllocator) isInProbe() bool {
return s.probeClusterId != ProbeClusterIdInvalid
}
func (s *StreamAllocator) maybeProbe() {
if time.Since(s.lastProbeStartTime) < s.probeInterval || s.probeClusterId != ProbeClusterIdInvalid {
return
}
switch s.params.Config.ProbeMode {
case config.CongestionControlProbeModeMedia:
s.maybeProbeWithMedia()
s.adjustState()
case config.CongestionControlProbeModePadding:
s.maybeProbeWithPadding()
}
}
func (s *StreamAllocator) maybeProbeWithMedia() {
// boost deficient track farthest from desired layers
for _, track := range s.getMaxDistanceSortedDeficient() {
allocation, boosted := track.AllocateNextHigher(ChannelCapacityInfinity, FlagAllowOvershootInCatchup)
if !boosted {
continue
}
update := NewStreamStateUpdate()
if allocation.PauseReason == sfu.VideoPauseReasonBandwidth && track.SetPaused(true) {
update.HandleStreamingChange(true, track)
}
s.maybeSendUpdate(update)
s.lastProbeStartTime = time.Now()
break
}
}
func (s *StreamAllocator) maybeProbeWithPadding() {
// use deficient track farthest from desired layers to find how much to probe
for _, track := range s.getMaxDistanceSortedDeficient() {
transition, available := track.GetNextHigherTransition(FlagAllowOvershootInProbe)
if !available || transition.BandwidthDelta < 0 {
continue
}
probeRateBps := (transition.BandwidthDelta * ProbePct) / 100
if probeRateBps < ProbeMinBps {
probeRateBps = ProbeMinBps
}
s.initProbe(probeRateBps)
break
}
}
func (s *StreamAllocator) getTracks() []*Track {
s.videoTracksMu.RLock()
var tracks []*Track
for _, track := range s.videoTracks {
tracks = append(tracks, track)
}
s.videoTracksMu.RUnlock()
return tracks
}
func (s *StreamAllocator) getSorted() TrackSorter {
s.videoTracksMu.RLock()
var trackSorter TrackSorter
for _, track := range s.videoTracks {
if !track.IsManaged() {
continue
}
trackSorter = append(trackSorter, track)
}
s.videoTracksMu.RUnlock()
sort.Sort(trackSorter)
return trackSorter
}
func (s *StreamAllocator) getMinDistanceSorted(exclude *Track) MinDistanceSorter {
s.videoTracksMu.RLock()
var minDistanceSorter MinDistanceSorter
for _, track := range s.videoTracks {
if !track.IsManaged() || track == exclude {
continue
}
minDistanceSorter = append(minDistanceSorter, track)
}
s.videoTracksMu.RUnlock()
sort.Sort(minDistanceSorter)
return minDistanceSorter
}
func (s *StreamAllocator) getMaxDistanceSortedDeficient() MaxDistanceSorter {
s.videoTracksMu.RLock()
var maxDistanceSorter MaxDistanceSorter
for _, track := range s.videoTracks {
if !track.IsManaged() || !track.IsDeficient() {
continue
}
maxDistanceSorter = append(maxDistanceSorter, track)
}
s.videoTracksMu.RUnlock()
sort.Sort(maxDistanceSorter)
return maxDistanceSorter
}
// ------------------------------------------------