mirror of
https://github.com/livekit/livekit.git
synced 2026-04-15 10:45:40 +00:00
A couple of other bits
1. Use request layer for sending PLI on bind and connected.
2. When adjusting for overshoot, do not adjust target unless current is
at max. If not, it could get stuck in a lower layer in the following
scenario
a. Overshoot to layer 2
b. Max layer is 1, start sending PLI
c. Get key frame for layer 0, adjust for overshoot as we have
something at a layer lower than max.
d. Adjust for overshoot.
e. Setting target to max means that current and target are equal
and no further adjustment happens.
1869 lines
49 KiB
Go
1869 lines
49 KiB
Go
package sfu
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pion/interceptor/pkg/cc"
|
|
"github.com/pion/rtcp"
|
|
"github.com/pion/webrtc/v3"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
|
|
"github.com/livekit/livekit-server/pkg/config"
|
|
)
|
|
|
|
const (
|
|
ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps
|
|
|
|
NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate
|
|
|
|
ProbeWaitBase = 5 * time.Second
|
|
ProbeBackoffFactor = 1.5
|
|
ProbeWaitMax = 30 * time.Second
|
|
ProbeSettleWait = 250
|
|
ProbeTrendWait = 2 * time.Second
|
|
|
|
ProbePct = 120
|
|
ProbeMinBps = 200 * 1000 // 200 kbps
|
|
ProbeMinDuration = 20 * time.Second
|
|
ProbeMaxDuration = 21 * time.Second
|
|
|
|
PriorityMin = uint8(1)
|
|
PriorityMax = uint8(255)
|
|
PriorityDefaultScreenshare = PriorityMax
|
|
PriorityDefaultVideo = PriorityMin
|
|
|
|
FlagAllowOvershootWhileOptimal = true
|
|
FlagAllowOvershootWhileDeficient = false
|
|
FlagAllowOvershootExemptTrackWhileDeficient = true
|
|
FlagAllowOvershootInProbe = true
|
|
FlagAllowOvershootInCatchup = true
|
|
)
|
|
|
|
var (
|
|
ChannelObserverParamsProbe = ChannelObserverParams{
|
|
Name: "probe",
|
|
EstimateRequiredSamples: 3,
|
|
EstimateDownwardTrendThreshold: 0.0,
|
|
EstimateCollapseValues: false,
|
|
NackWindowMinDuration: 500 * time.Millisecond,
|
|
NackWindowMaxDuration: 1 * time.Second,
|
|
NackRatioThreshold: 0.04,
|
|
}
|
|
|
|
ChannelObserverParamsNonProbe = ChannelObserverParams{
|
|
Name: "non-probe",
|
|
EstimateRequiredSamples: 8,
|
|
EstimateDownwardTrendThreshold: -0.5,
|
|
EstimateCollapseValues: true,
|
|
NackWindowMinDuration: 1 * time.Second,
|
|
NackWindowMaxDuration: 2 * time.Second,
|
|
NackRatioThreshold: 0.08,
|
|
}
|
|
)
|
|
|
|
type streamAllocatorState int
|
|
|
|
const (
|
|
streamAllocatorStateStable streamAllocatorState = iota
|
|
streamAllocatorStateDeficient
|
|
)
|
|
|
|
func (s streamAllocatorState) String() string {
|
|
switch s {
|
|
case streamAllocatorStateStable:
|
|
return "STABLE"
|
|
case streamAllocatorStateDeficient:
|
|
return "DEFICIENT"
|
|
default:
|
|
return fmt.Sprintf("%d", int(s))
|
|
}
|
|
}
|
|
|
|
type streamAllocatorSignal int
|
|
|
|
const (
|
|
streamAllocatorSignalAllocateTrack streamAllocatorSignal = iota
|
|
streamAllocatorSignalAllocateAllTracks
|
|
streamAllocatorSignalAdjustState
|
|
streamAllocatorSignalEstimate
|
|
streamAllocatorSignalPeriodicPing
|
|
streamAllocatorSignalSendProbe
|
|
streamAllocatorSignalProbeClusterDone
|
|
streamAllocatorSignalTargetLayerFound
|
|
)
|
|
|
|
func (s streamAllocatorSignal) String() string {
|
|
switch s {
|
|
case streamAllocatorSignalAllocateTrack:
|
|
return "ALLOCATE_TRACK"
|
|
case streamAllocatorSignalAllocateAllTracks:
|
|
return "ALLOCATE_ALL_TRACKS"
|
|
case streamAllocatorSignalAdjustState:
|
|
return "ADJUST_STATE"
|
|
case streamAllocatorSignalEstimate:
|
|
return "ESTIMATE"
|
|
case streamAllocatorSignalPeriodicPing:
|
|
return "PERIODIC_PING"
|
|
case streamAllocatorSignalSendProbe:
|
|
return "SEND_PROBE"
|
|
case streamAllocatorSignalProbeClusterDone:
|
|
return "PROBE_CLUSTER_DONE"
|
|
case streamAllocatorSignalTargetLayerFound:
|
|
return "TARGET_LAYER_FOUND"
|
|
default:
|
|
return fmt.Sprintf("%d", int(s))
|
|
}
|
|
}
|
|
|
|
type Event struct {
|
|
Signal streamAllocatorSignal
|
|
TrackID livekit.TrackID
|
|
Data interface{}
|
|
}
|
|
|
|
func (e Event) String() string {
|
|
return fmt.Sprintf("StreamAllocator:Event{signal: %s, trackID: %s, data: %+v}", e.Signal, e.TrackID, e.Data)
|
|
}
|
|
|
|
type StreamAllocatorParams struct {
|
|
Config config.CongestionControlConfig
|
|
Logger logger.Logger
|
|
}
|
|
|
|
type StreamAllocator struct {
|
|
params StreamAllocatorParams
|
|
|
|
onStreamStateChange func(update *StreamStateUpdate) error
|
|
|
|
bwe cc.BandwidthEstimator
|
|
|
|
lastReceivedEstimate int64
|
|
committedChannelCapacity int64
|
|
|
|
probeInterval time.Duration
|
|
lastProbeStartTime time.Time
|
|
probeGoalBps int64
|
|
probeClusterId ProbeClusterId
|
|
abortedProbeClusterId ProbeClusterId
|
|
probeTrendObserved bool
|
|
probeEndTime time.Time
|
|
|
|
prober *Prober
|
|
|
|
channelObserver *ChannelObserver
|
|
|
|
videoTracksMu sync.RWMutex
|
|
videoTracks map[livekit.TrackID]*Track
|
|
isAllocateAllPending bool
|
|
rembTrackingSSRC uint32
|
|
|
|
state streamAllocatorState
|
|
|
|
eventChMu sync.RWMutex
|
|
eventCh chan Event
|
|
|
|
isStopped atomic.Bool
|
|
}
|
|
|
|
func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
|
|
s := &StreamAllocator{
|
|
params: params,
|
|
prober: NewProber(ProberParams{
|
|
Logger: params.Logger,
|
|
}),
|
|
videoTracks: make(map[livekit.TrackID]*Track),
|
|
eventCh: make(chan Event, 200),
|
|
}
|
|
|
|
s.resetState()
|
|
|
|
s.prober.OnSendProbe(s.onSendProbe)
|
|
s.prober.OnProbeClusterDone(s.onProbeClusterDone)
|
|
|
|
return s
|
|
}
|
|
|
|
func (s *StreamAllocator) Start() {
|
|
go s.processEvents()
|
|
go s.ping()
|
|
}
|
|
|
|
func (s *StreamAllocator) Stop() {
|
|
s.eventChMu.Lock()
|
|
if s.isStopped.Swap(true) {
|
|
s.eventChMu.Unlock()
|
|
return
|
|
}
|
|
|
|
close(s.eventCh)
|
|
s.eventChMu.Unlock()
|
|
}
|
|
|
|
func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error) {
|
|
s.onStreamStateChange = f
|
|
}
|
|
|
|
func (s *StreamAllocator) SetBandwidthEstimator(bwe cc.BandwidthEstimator) {
|
|
if bwe != nil {
|
|
bwe.OnTargetBitrateChange(s.onTargetBitrateChange)
|
|
}
|
|
s.bwe = bwe
|
|
}
|
|
|
|
type AddTrackParams struct {
|
|
Source livekit.TrackSource
|
|
Priority uint8
|
|
IsSimulcast bool
|
|
PublisherID livekit.ParticipantID
|
|
}
|
|
|
|
func (s *StreamAllocator) AddTrack(downTrack *DownTrack, params AddTrackParams) {
|
|
if downTrack.Kind() != webrtc.RTPCodecTypeVideo {
|
|
return
|
|
}
|
|
|
|
track := newTrack(downTrack, params.Source, params.IsSimulcast, params.PublisherID, s.params.Logger)
|
|
track.SetPriority(params.Priority)
|
|
|
|
s.videoTracksMu.Lock()
|
|
s.videoTracks[livekit.TrackID(downTrack.ID())] = track
|
|
s.videoTracksMu.Unlock()
|
|
|
|
downTrack.OnREMB(s.onREMB)
|
|
downTrack.OnTransportCCFeedback(s.onTransportCCFeedback)
|
|
downTrack.OnAvailableLayersChanged(s.onAvailableLayersChanged)
|
|
downTrack.OnBitrateAvailabilityChanged(s.onBitrateAvailabilityChanged)
|
|
downTrack.OnMaxPublishedLayerChanged(s.onMaxPublishedLayerChanged)
|
|
downTrack.OnSubscriptionChanged(s.onSubscriptionChanged)
|
|
downTrack.OnSubscribedLayersChanged(s.onSubscribedLayersChanged)
|
|
downTrack.OnPacketSent(s.onPacketSent)
|
|
downTrack.OnTargetLayerFound(s.onTargetLayerFound)
|
|
|
|
s.maybePostEventAllocateTrack(downTrack)
|
|
}
|
|
|
|
func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack) {
|
|
s.videoTracksMu.Lock()
|
|
if existing := s.videoTracks[livekit.TrackID(downTrack.ID())]; existing != nil && existing.DownTrack() == downTrack {
|
|
delete(s.videoTracks, livekit.TrackID(downTrack.ID()))
|
|
}
|
|
s.videoTracksMu.Unlock()
|
|
|
|
// LK-TODO: use any saved bandwidth to re-distribute
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalAdjustState,
|
|
})
|
|
}
|
|
|
|
func (s *StreamAllocator) SetTrackPriority(downTrack *DownTrack, priority uint8) {
|
|
s.videoTracksMu.Lock()
|
|
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
|
|
changed := track.SetPriority(priority)
|
|
if changed && !s.isAllocateAllPending {
|
|
// do a full allocation on a track priority change to keep it simple
|
|
s.isAllocateAllPending = true
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalAllocateAllTracks,
|
|
})
|
|
}
|
|
}
|
|
s.videoTracksMu.Unlock()
|
|
}
|
|
|
|
func (s *StreamAllocator) resetState() {
|
|
s.channelObserver = s.newChannelObserverNonProbe()
|
|
s.resetProbe()
|
|
|
|
s.state = streamAllocatorStateStable
|
|
}
|
|
|
|
// called when a new REMB is received (receive side bandwidth estimation)
|
|
func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) {
|
|
//
|
|
// Channel capacity is estimated at a peer connection level. All down tracks
|
|
// in the peer connection will end up calling this for a REMB report with
|
|
// the same estimated channel capacity. Use a tracking SSRC to lock onto to
|
|
// one report. As SSRCs can be dropped over time, update tracking SSRC as needed
|
|
//
|
|
// A couple of things to keep in mind
|
|
// - REMB reports could be sent gratuitously as a way of providing
|
|
// periodic feedback, i.e. even if the estimated capacity does not
|
|
// change, there could be REMB packets on the wire. Those gratuitous
|
|
// REMBs should not trigger anything bad.
|
|
// - As each down track will issue this callback for the same REMB packet
|
|
// from the wire, theoretically it is possible that one down track's
|
|
// callback from previous REMB comes after another down track's callback
|
|
// from the new REMB. REMBs could fire very quickly especially when
|
|
// the network is entering congestion.
|
|
// LK-TODO-START
|
|
// Need to check if the same SSRC reports can somehow race, i.e. does pion send
|
|
// RTCP dispatch for same SSRC on different threads? If not, the tracking SSRC
|
|
// should prevent racing
|
|
// LK-TODO-END
|
|
//
|
|
|
|
// if there are no video tracks, ignore any straggler REMB
|
|
s.videoTracksMu.Lock()
|
|
if len(s.videoTracks) == 0 {
|
|
s.videoTracksMu.Unlock()
|
|
return
|
|
}
|
|
|
|
track := s.videoTracks[livekit.TrackID(downTrack.ID())]
|
|
downTrackSSRC := uint32(0)
|
|
if track != nil {
|
|
downTrackSSRC = track.DownTrack().SSRC()
|
|
}
|
|
|
|
found := false
|
|
for _, ssrc := range remb.SSRCs {
|
|
if ssrc == s.rembTrackingSSRC {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
if len(remb.SSRCs) == 0 {
|
|
s.params.Logger.Warnw("stream allocator: no SSRC to track REMB", nil)
|
|
s.videoTracksMu.Unlock()
|
|
return
|
|
}
|
|
|
|
// try to lock to track which is sending this update
|
|
if downTrackSSRC != 0 {
|
|
for _, ssrc := range remb.SSRCs {
|
|
if ssrc == downTrackSSRC {
|
|
s.rembTrackingSSRC = downTrackSSRC
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
s.rembTrackingSSRC = remb.SSRCs[0]
|
|
}
|
|
}
|
|
|
|
if s.rembTrackingSSRC == 0 || s.rembTrackingSSRC != downTrackSSRC {
|
|
s.videoTracksMu.Unlock()
|
|
return
|
|
}
|
|
s.videoTracksMu.Unlock()
|
|
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalEstimate,
|
|
Data: int64(remb.Bitrate),
|
|
})
|
|
}
|
|
|
|
// called when a new transport-cc feedback is received
|
|
func (s *StreamAllocator) onTransportCCFeedback(downTrack *DownTrack, fb *rtcp.TransportLayerCC) {
|
|
if s.bwe != nil {
|
|
s.bwe.WriteRTCP([]rtcp.Packet{fb}, nil)
|
|
}
|
|
}
|
|
|
|
// called when target bitrate changes (send side bandwidth estimation)
|
|
func (s *StreamAllocator) onTargetBitrateChange(bitrate int) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalEstimate,
|
|
Data: int64(bitrate),
|
|
})
|
|
}
|
|
|
|
// called when feeding track's layer availability changes
|
|
func (s *StreamAllocator) onAvailableLayersChanged(downTrack *DownTrack) {
|
|
s.maybePostEventAllocateTrack(downTrack)
|
|
}
|
|
|
|
// called when feeding track's bitrate measurement of any layer is available
|
|
func (s *StreamAllocator) onBitrateAvailabilityChanged(downTrack *DownTrack) {
|
|
s.maybePostEventAllocateTrack(downTrack)
|
|
}
|
|
|
|
// called when feeding track's max publisher layer changes
|
|
func (s *StreamAllocator) onMaxPublishedLayerChanged(downTrack *DownTrack) {
|
|
s.maybePostEventAllocateTrack(downTrack)
|
|
}
|
|
|
|
// called when subscription settings changes (muting/unmuting of track)
|
|
func (s *StreamAllocator) onSubscriptionChanged(downTrack *DownTrack) {
|
|
s.maybePostEventAllocateTrack(downTrack)
|
|
}
|
|
|
|
// called when subscribed layers changes (limiting max layers)
|
|
func (s *StreamAllocator) onSubscribedLayersChanged(downTrack *DownTrack, layers VideoLayers) {
|
|
shouldPost := false
|
|
s.videoTracksMu.Lock()
|
|
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
|
|
if track.SetMaxLayers(layers) && track.SetDirty(true) {
|
|
shouldPost = true
|
|
}
|
|
}
|
|
s.videoTracksMu.Unlock()
|
|
|
|
if shouldPost {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalAllocateTrack,
|
|
TrackID: livekit.TrackID(downTrack.ID()),
|
|
})
|
|
}
|
|
}
|
|
|
|
// called when a video DownTrack sends a packet
|
|
func (s *StreamAllocator) onPacketSent(downTrack *DownTrack, size int) {
|
|
s.prober.PacketSent(size)
|
|
}
|
|
|
|
// called when prober wants to send packet(s)
|
|
func (s *StreamAllocator) onSendProbe(bytesToSend int) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalSendProbe,
|
|
Data: bytesToSend,
|
|
})
|
|
}
|
|
|
|
// called when prober wants to send packet(s)
|
|
func (s *StreamAllocator) onProbeClusterDone(info ProbeClusterInfo) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalProbeClusterDone,
|
|
Data: info,
|
|
})
|
|
}
|
|
|
|
// called when forwarder finds a target layer
|
|
func (s *StreamAllocator) onTargetLayerFound(downTrack *DownTrack) {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalTargetLayerFound,
|
|
TrackID: livekit.TrackID(downTrack.ID()),
|
|
})
|
|
}
|
|
|
|
func (s *StreamAllocator) maybePostEventAllocateTrack(downTrack *DownTrack) {
|
|
shouldPost := false
|
|
s.videoTracksMu.Lock()
|
|
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
|
|
if track.SetDirty(true) {
|
|
shouldPost = true
|
|
}
|
|
}
|
|
s.videoTracksMu.Unlock()
|
|
|
|
if shouldPost {
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalAllocateTrack,
|
|
TrackID: livekit.TrackID(downTrack.ID()),
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) postEvent(event Event) {
|
|
s.eventChMu.RLock()
|
|
if s.isStopped.Load() {
|
|
s.eventChMu.RUnlock()
|
|
return
|
|
}
|
|
|
|
select {
|
|
case s.eventCh <- event:
|
|
default:
|
|
s.params.Logger.Warnw("stream allocator: event queue full", nil)
|
|
}
|
|
s.eventChMu.RUnlock()
|
|
}
|
|
|
|
func (s *StreamAllocator) processEvents() {
|
|
for event := range s.eventCh {
|
|
s.handleEvent(&event)
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) ping() {
|
|
ticker := time.NewTicker(time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
<-ticker.C
|
|
if s.isStopped.Load() {
|
|
return
|
|
}
|
|
|
|
s.postEvent(Event{
|
|
Signal: streamAllocatorSignalPeriodicPing,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleEvent(event *Event) {
|
|
switch event.Signal {
|
|
case streamAllocatorSignalAllocateTrack:
|
|
s.handleSignalAllocateTrack(event)
|
|
case streamAllocatorSignalAllocateAllTracks:
|
|
s.handleSignalAllocateAllTracks(event)
|
|
case streamAllocatorSignalAdjustState:
|
|
s.handleSignalAdjustState(event)
|
|
case streamAllocatorSignalEstimate:
|
|
s.handleSignalEstimate(event)
|
|
case streamAllocatorSignalPeriodicPing:
|
|
s.handleSignalPeriodicPing(event)
|
|
case streamAllocatorSignalSendProbe:
|
|
s.handleSignalSendProbe(event)
|
|
case streamAllocatorSignalProbeClusterDone:
|
|
s.handleSignalProbeClusterDone(event)
|
|
case streamAllocatorSignalTargetLayerFound:
|
|
s.handleSignalTargetLayerFound(event)
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalAllocateTrack(event *Event) {
|
|
s.videoTracksMu.Lock()
|
|
track := s.videoTracks[event.TrackID]
|
|
if track != nil {
|
|
track.SetDirty(false)
|
|
}
|
|
s.videoTracksMu.Unlock()
|
|
|
|
if track != nil {
|
|
s.allocateTrack(track)
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalAllocateAllTracks(event *Event) {
|
|
s.videoTracksMu.Lock()
|
|
s.isAllocateAllPending = false
|
|
s.videoTracksMu.Unlock()
|
|
|
|
if s.state == streamAllocatorStateDeficient {
|
|
s.allocateAllTracks()
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalAdjustState(event *Event) {
|
|
s.adjustState()
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalEstimate(event *Event) {
|
|
receivedEstimate, _ := event.Data.(int64)
|
|
s.lastReceivedEstimate = receivedEstimate
|
|
|
|
// while probing, maintain estimate separately to enable keeping current committed estimate if probe fails
|
|
if s.isInProbe() {
|
|
s.handleNewEstimateInProbe()
|
|
} else {
|
|
s.handleNewEstimateInNonProbe()
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalPeriodicPing(event *Event) {
|
|
// finalize probe if necessary
|
|
if s.isInProbe() && !s.probeEndTime.IsZero() && time.Now().After(s.probeEndTime) {
|
|
s.finalizeProbe()
|
|
}
|
|
|
|
// probe if necessary and timing is right
|
|
if s.state == streamAllocatorStateDeficient {
|
|
s.maybeProbe()
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalSendProbe(event *Event) {
|
|
bytesToSend := event.Data.(int)
|
|
if bytesToSend <= 0 {
|
|
return
|
|
}
|
|
|
|
bytesSent := 0
|
|
for _, track := range s.getTracks() {
|
|
sent := track.WritePaddingRTP(bytesToSend)
|
|
bytesSent += sent
|
|
bytesToSend -= sent
|
|
if bytesToSend <= 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
if bytesSent != 0 {
|
|
s.prober.ProbeSent(bytesSent)
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalProbeClusterDone(event *Event) {
|
|
info, _ := event.Data.(ProbeClusterInfo)
|
|
if s.probeClusterId != info.Id {
|
|
return
|
|
}
|
|
|
|
if s.abortedProbeClusterId == ProbeClusterIdInvalid {
|
|
// successful probe, finalize
|
|
s.finalizeProbe()
|
|
return
|
|
}
|
|
|
|
// ensure probe queue is flushed
|
|
// LK-TODO: ProbeSettleWait should actually be a certain number of RTTs.
|
|
lowestEstimate := int64(math.Min(float64(s.committedChannelCapacity), float64(s.channelObserver.GetLowestEstimate())))
|
|
expectedDuration := float64(info.BytesSent*8*1000) / float64(lowestEstimate)
|
|
queueTime := expectedDuration - float64(info.Duration.Milliseconds())
|
|
if queueTime < 0.0 {
|
|
queueTime = 0.0
|
|
}
|
|
queueWait := time.Duration(queueTime+float64(ProbeSettleWait)) * time.Millisecond
|
|
s.probeEndTime = s.lastProbeStartTime.Add(queueWait)
|
|
}
|
|
|
|
func (s *StreamAllocator) handleSignalTargetLayerFound(event *Event) {
|
|
s.videoTracksMu.Lock()
|
|
track := s.videoTracks[event.TrackID]
|
|
s.videoTracksMu.Unlock()
|
|
|
|
if track != nil {
|
|
update := NewStreamStateUpdate()
|
|
if track.SetPaused(false) {
|
|
update.HandleStreamingChange(false, track)
|
|
}
|
|
s.maybeSendUpdate(update)
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) setState(state streamAllocatorState) {
|
|
if s.state == state {
|
|
return
|
|
}
|
|
|
|
s.params.Logger.Infow("stream allocator: state change", "from", s.state, "to", state)
|
|
s.state = state
|
|
|
|
// reset probe to enforce a delay after state change before probing
|
|
s.lastProbeStartTime = time.Now()
|
|
}
|
|
|
|
func (s *StreamAllocator) adjustState() {
|
|
for _, track := range s.getTracks() {
|
|
if track.IsDeficient() {
|
|
s.setState(streamAllocatorStateDeficient)
|
|
return
|
|
}
|
|
}
|
|
|
|
s.setState(streamAllocatorStateStable)
|
|
}
|
|
|
|
func (s *StreamAllocator) handleNewEstimateInProbe() {
|
|
// always update NACKs, even if aborted
|
|
packetDelta, repeatedNackDelta := s.getNackDelta()
|
|
|
|
if s.abortedProbeClusterId != ProbeClusterIdInvalid {
|
|
// waiting for aborted probe to finalize
|
|
return
|
|
}
|
|
|
|
s.channelObserver.AddEstimate(s.lastReceivedEstimate)
|
|
s.channelObserver.AddNack(packetDelta, repeatedNackDelta)
|
|
|
|
trend, _ := s.channelObserver.GetTrend()
|
|
if trend != ChannelTrendNeutral {
|
|
s.probeTrendObserved = true
|
|
}
|
|
|
|
switch {
|
|
case !s.probeTrendObserved && time.Since(s.lastProbeStartTime) > ProbeTrendWait:
|
|
//
|
|
// More of a safety net.
|
|
// In rare cases, the estimate gets stuck. Prevent from probe running amok
|
|
// LK-TODO: Need more testing here to ensure that probe does not cause a lot of damage
|
|
//
|
|
s.params.Logger.Infow("stream allocator: probe: aborting, no trend", "cluster", s.probeClusterId)
|
|
s.abortProbe()
|
|
|
|
case trend == ChannelTrendCongesting:
|
|
// stop immediately if the probe is congesting channel more
|
|
s.params.Logger.Infow("stream allocator: probe: aborting, channel is congesting", "cluster", s.probeClusterId)
|
|
s.abortProbe()
|
|
|
|
case s.channelObserver.GetHighestEstimate() > s.probeGoalBps:
|
|
// reached goal, stop probing
|
|
s.params.Logger.Infow(
|
|
"stream allocator: probe: stopping, goal reached",
|
|
"cluster", s.probeClusterId,
|
|
"goal", s.probeGoalBps,
|
|
"highest", s.channelObserver.GetHighestEstimate(),
|
|
)
|
|
s.stopProbe()
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) handleNewEstimateInNonProbe() {
|
|
s.channelObserver.AddEstimate(s.lastReceivedEstimate)
|
|
|
|
packetDelta, repeatedNackDelta := s.getNackDelta()
|
|
s.channelObserver.AddNack(packetDelta, repeatedNackDelta)
|
|
|
|
trend, reason := s.channelObserver.GetTrend()
|
|
if trend != ChannelTrendCongesting {
|
|
return
|
|
}
|
|
|
|
var estimateToCommit int64
|
|
var packets, repeatedNacks uint32
|
|
var nackRatio float64
|
|
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
|
|
switch reason {
|
|
case ChannelCongestionReasonLoss:
|
|
packets, repeatedNacks, nackRatio = s.channelObserver.GetNackRatio()
|
|
estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - NackRatioAttenuator*nackRatio))
|
|
default:
|
|
estimateToCommit = s.lastReceivedEstimate
|
|
}
|
|
|
|
s.params.Logger.Infow(
|
|
"stream allocator: channel congestion detected, updating channel capacity",
|
|
"reason", reason,
|
|
"old(bps)", s.committedChannelCapacity,
|
|
"new(bps)", estimateToCommit,
|
|
"lastReceived(bps)", s.lastReceivedEstimate,
|
|
"expectedUsage(bps)", expectedBandwidthUsage,
|
|
"packets", packets,
|
|
"repeatedNacks", repeatedNacks,
|
|
"nackRatio", nackRatio,
|
|
)
|
|
s.committedChannelCapacity = estimateToCommit
|
|
|
|
// reset to get new set of samples for next trend
|
|
s.channelObserver = s.newChannelObserverNonProbe()
|
|
|
|
// reset probe to ensure it does not start too soon after a downward trend
|
|
s.resetProbe()
|
|
|
|
s.allocateAllTracks()
|
|
}
|
|
|
|
func (s *StreamAllocator) allocateTrack(track *Track) {
|
|
// abort any probe that may be running when a track specific change needs allocation
|
|
s.abortProbe()
|
|
|
|
// if not deficient, free pass allocate track
|
|
if !s.params.Config.Enabled || s.state == streamAllocatorStateStable || !track.IsManaged() {
|
|
update := NewStreamStateUpdate()
|
|
allocation := track.AllocateOptimal(FlagAllowOvershootWhileOptimal)
|
|
if allocation.pauseReason == VideoPauseReasonBandwidth && track.SetPaused(true) {
|
|
update.HandleStreamingChange(true, track)
|
|
}
|
|
s.maybeSendUpdate(update)
|
|
return
|
|
}
|
|
|
|
//
|
|
// In DEFICIENT state,
|
|
// 1. Find cooperative transition from track that needs allocation.
|
|
// 2. If track is currently streaming at minimum, do not do anything.
|
|
// 3. If that track is giving back bits, apply the transition.
|
|
// 4. If this track needs more, ask for best offer from others and try to use it.
|
|
//
|
|
track.ProvisionalAllocatePrepare()
|
|
transition := track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient)
|
|
|
|
// track is currently streaming at minimum
|
|
if transition.bandwidthDelta == 0 {
|
|
return
|
|
}
|
|
|
|
// downgrade, giving back bits
|
|
if transition.from.GreaterThan(transition.to) {
|
|
allocation := track.ProvisionalAllocateCommit()
|
|
|
|
update := NewStreamStateUpdate()
|
|
if allocation.pauseReason == VideoPauseReasonBandwidth && track.SetPaused(true) {
|
|
update.HandleStreamingChange(true, track)
|
|
}
|
|
s.maybeSendUpdate(update)
|
|
|
|
s.adjustState()
|
|
return
|
|
// LK-TODO-START
|
|
// Should use the bits given back to start any paused track.
|
|
// Note layer downgrade may actually have positive delta (i.e. consume more bits)
|
|
// because of when the measurement is done. Watch for that.
|
|
// LK-TODO-END
|
|
}
|
|
|
|
//
|
|
// This track is currently not streaming and needs bits to start.
|
|
// Try to redistribute starting with tracks that are closest to their desired.
|
|
//
|
|
bandwidthAcquired := int64(0)
|
|
var contributingTracks []*Track
|
|
|
|
minDistanceSorted := s.getMinDistanceSorted(track)
|
|
for _, t := range minDistanceSorted {
|
|
t.ProvisionalAllocatePrepare()
|
|
}
|
|
|
|
for _, t := range minDistanceSorted {
|
|
tx := t.ProvisionalAllocateGetBestWeightedTransition()
|
|
if tx.bandwidthDelta < 0 {
|
|
contributingTracks = append(contributingTracks, t)
|
|
|
|
bandwidthAcquired += -tx.bandwidthDelta
|
|
if bandwidthAcquired >= transition.bandwidthDelta {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
update := NewStreamStateUpdate()
|
|
if bandwidthAcquired >= transition.bandwidthDelta {
|
|
// commit the tracks that contributed
|
|
for _, t := range contributingTracks {
|
|
allocation := t.ProvisionalAllocateCommit()
|
|
if allocation.pauseReason == VideoPauseReasonBandwidth && track.SetPaused(true) {
|
|
update.HandleStreamingChange(true, t)
|
|
}
|
|
}
|
|
|
|
// LK-TODO if got too much extra, can potentially give it to some deficient track
|
|
}
|
|
|
|
// commit the track that needs change if enough could be acquired or pause not allowed
|
|
if !s.params.Config.AllowPause || bandwidthAcquired >= transition.bandwidthDelta {
|
|
allocation := track.ProvisionalAllocateCommit()
|
|
if allocation.pauseReason == VideoPauseReasonBandwidth && track.SetPaused(true) {
|
|
update.HandleStreamingChange(true, track)
|
|
}
|
|
}
|
|
|
|
s.maybeSendUpdate(update)
|
|
|
|
s.adjustState()
|
|
}
|
|
|
|
func (s *StreamAllocator) finalizeProbe() {
|
|
aborted := s.probeClusterId == s.abortedProbeClusterId
|
|
highestEstimateInProbe := s.channelObserver.GetHighestEstimate()
|
|
|
|
s.clearProbe()
|
|
|
|
//
|
|
// Reset estimator at the end of a probe irrespective of probe result to get fresh readings.
|
|
// With a failed probe, the latest estimate would be lower than committed estimate.
|
|
// As bandwidth estimator (remote in REMB case, local in TWCC case) holds state,
|
|
// subsequent estimates could start from the lower point. That should not trigger a
|
|
// downward trend and get latched to committed estimate as that would trigger a re-allocation.
|
|
// With fresh readings, as long as the trend is not going downward, it will not get latched.
|
|
//
|
|
// NOTE: With TWCC, it is possible to reset bandwidth estimation to clean state as
|
|
// the send side is in full control of bandwidth estimation.
|
|
//
|
|
s.channelObserver = s.newChannelObserverNonProbe()
|
|
|
|
if aborted {
|
|
// failed probe, backoff
|
|
s.backoffProbeInterval()
|
|
return
|
|
}
|
|
|
|
// reset probe interval on a successful probe
|
|
s.resetProbeInterval()
|
|
|
|
// probe estimate is same or higher, commit it and try to allocate deficient tracks
|
|
s.params.Logger.Infow(
|
|
"successful probe, updating channel capacity",
|
|
"old(bps)", s.committedChannelCapacity,
|
|
"new(bps)", highestEstimateInProbe,
|
|
)
|
|
s.committedChannelCapacity = highestEstimateInProbe
|
|
|
|
s.maybeBoostDeficientTracks()
|
|
}
|
|
|
|
func (s *StreamAllocator) maybeBoostDeficientTracks() {
|
|
committedChannelCapacity := s.committedChannelCapacity
|
|
if s.params.Config.MinChannelCapacity > committedChannelCapacity {
|
|
committedChannelCapacity = s.params.Config.MinChannelCapacity
|
|
s.params.Logger.Debugw(
|
|
"stream allocator: overriding channel capacity",
|
|
"actual", s.committedChannelCapacity,
|
|
"override", committedChannelCapacity,
|
|
)
|
|
}
|
|
availableChannelCapacity := committedChannelCapacity - s.getExpectedBandwidthUsage()
|
|
if availableChannelCapacity <= 0 {
|
|
return
|
|
}
|
|
|
|
update := NewStreamStateUpdate()
|
|
|
|
for _, track := range s.getMaxDistanceSortedDeficient() {
|
|
allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup)
|
|
if !boosted {
|
|
continue
|
|
}
|
|
|
|
if allocation.pauseReason == VideoPauseReasonBandwidth && track.SetPaused(true) {
|
|
update.HandleStreamingChange(true, track)
|
|
}
|
|
|
|
availableChannelCapacity -= allocation.bandwidthDelta
|
|
if availableChannelCapacity <= 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
s.maybeSendUpdate(update)
|
|
|
|
s.adjustState()
|
|
}
|
|
|
|
func (s *StreamAllocator) allocateAllTracks() {
|
|
if !s.params.Config.Enabled {
|
|
// nothing else to do when disabled
|
|
return
|
|
}
|
|
|
|
//
|
|
// Goals:
|
|
// 1. Stream as many tracks as possible, i.e. no pauses.
|
|
// 2. Try to give fair allocation to all track.
|
|
//
|
|
// Start with the lowest layers and give each track a chance at that layer and keep going up.
|
|
// As long as there is enough bandwidth for tracks to stream at the lowest layers, the first goal is achieved.
|
|
//
|
|
// Tracks that have higher subscribed layers can use any additional available bandwidth. This tried to achieve the second goal.
|
|
//
|
|
// If there is not enough bandwidth even for the lowest layers, tracks at lower priorities will be paused.
|
|
//
|
|
update := NewStreamStateUpdate()
|
|
|
|
availableChannelCapacity := s.committedChannelCapacity
|
|
if s.params.Config.MinChannelCapacity > availableChannelCapacity {
|
|
availableChannelCapacity = s.params.Config.MinChannelCapacity
|
|
s.params.Logger.Debugw(
|
|
"stream allocator: overriding channel capacity",
|
|
"actual", s.committedChannelCapacity,
|
|
"override", availableChannelCapacity,
|
|
)
|
|
}
|
|
|
|
//
|
|
// This pass is to find out if there is any leftover channel capacity after allocating exempt tracks.
|
|
// Exempt tracks are given optimal allocation (i. e. no bandwidth constraint) so that they do not fail allocation.
|
|
//
|
|
videoTracks := s.getTracks()
|
|
for _, track := range videoTracks {
|
|
if track.IsManaged() {
|
|
continue
|
|
}
|
|
|
|
allocation := track.AllocateOptimal(FlagAllowOvershootExemptTrackWhileDeficient)
|
|
if allocation.pauseReason == VideoPauseReasonBandwidth && track.SetPaused(true) {
|
|
update.HandleStreamingChange(true, track)
|
|
}
|
|
|
|
// LK-TODO: optimistic allocation before bitrate is available will return 0. How to account for that?
|
|
availableChannelCapacity -= allocation.bandwidthRequested
|
|
}
|
|
|
|
if availableChannelCapacity < 0 {
|
|
availableChannelCapacity = 0
|
|
}
|
|
if availableChannelCapacity == 0 && s.params.Config.AllowPause {
|
|
// nothing left for managed tracks, pause them all
|
|
for _, track := range videoTracks {
|
|
if !track.IsManaged() {
|
|
continue
|
|
}
|
|
|
|
allocation := track.Pause()
|
|
if allocation.pauseReason == VideoPauseReasonBandwidth && track.SetPaused(true) {
|
|
update.HandleStreamingChange(true, track)
|
|
}
|
|
}
|
|
} else {
|
|
sorted := s.getSorted()
|
|
for _, track := range sorted {
|
|
track.ProvisionalAllocatePrepare()
|
|
}
|
|
|
|
for spatial := int32(0); spatial <= DefaultMaxLayerSpatial; spatial++ {
|
|
for temporal := int32(0); temporal <= DefaultMaxLayerTemporal; temporal++ {
|
|
layers := VideoLayers{
|
|
Spatial: spatial,
|
|
Temporal: temporal,
|
|
}
|
|
|
|
for _, track := range sorted {
|
|
usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layers, s.params.Config.AllowPause, FlagAllowOvershootWhileDeficient)
|
|
availableChannelCapacity -= usedChannelCapacity
|
|
if availableChannelCapacity < 0 {
|
|
availableChannelCapacity = 0
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, track := range sorted {
|
|
allocation := track.ProvisionalAllocateCommit()
|
|
if allocation.pauseReason == VideoPauseReasonBandwidth && track.SetPaused(true) {
|
|
update.HandleStreamingChange(true, track)
|
|
}
|
|
}
|
|
}
|
|
|
|
s.maybeSendUpdate(update)
|
|
|
|
s.adjustState()
|
|
}
|
|
|
|
func (s *StreamAllocator) maybeSendUpdate(update *StreamStateUpdate) {
|
|
if update.Empty() {
|
|
return
|
|
}
|
|
|
|
// logging individual changes to make it easier for logging systems
|
|
for _, streamState := range update.StreamStates {
|
|
s.params.Logger.Debugw("streamed tracks changed",
|
|
"trackID", streamState.TrackID,
|
|
"state", streamState.State,
|
|
)
|
|
}
|
|
if s.onStreamStateChange != nil {
|
|
err := s.onStreamStateChange(update)
|
|
if err != nil {
|
|
s.params.Logger.Errorw("could not send streamed tracks update", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) getExpectedBandwidthUsage() int64 {
|
|
expected := int64(0)
|
|
for _, track := range s.getTracks() {
|
|
expected += track.BandwidthRequested()
|
|
}
|
|
|
|
return expected
|
|
}
|
|
|
|
func (s *StreamAllocator) getNackDelta() (uint32, uint32) {
|
|
aggPacketDelta := uint32(0)
|
|
aggRepeatedNackDelta := uint32(0)
|
|
for _, track := range s.getTracks() {
|
|
packetDelta, nackDelta := track.GetNackDelta()
|
|
aggPacketDelta += packetDelta
|
|
aggRepeatedNackDelta += nackDelta
|
|
}
|
|
|
|
return aggPacketDelta, aggRepeatedNackDelta
|
|
}
|
|
|
|
func (s *StreamAllocator) newChannelObserverProbe() *ChannelObserver {
|
|
return NewChannelObserver(ChannelObserverParamsProbe, s.params.Logger)
|
|
}
|
|
|
|
func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver {
|
|
return NewChannelObserver(ChannelObserverParamsNonProbe, s.params.Logger)
|
|
}
|
|
|
|
func (s *StreamAllocator) initProbe(probeRateBps int64) {
|
|
s.lastProbeStartTime = time.Now()
|
|
|
|
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
|
|
s.probeGoalBps = expectedBandwidthUsage + probeRateBps
|
|
|
|
s.abortedProbeClusterId = ProbeClusterIdInvalid
|
|
|
|
s.probeTrendObserved = false
|
|
|
|
s.probeEndTime = time.Time{}
|
|
|
|
s.channelObserver = s.newChannelObserverProbe()
|
|
s.channelObserver.SeedEstimate(s.lastReceivedEstimate)
|
|
|
|
desiredRateBps := int(probeRateBps) + int(math.Max(float64(s.committedChannelCapacity), float64(expectedBandwidthUsage)))
|
|
s.probeClusterId = s.prober.AddCluster(
|
|
desiredRateBps,
|
|
int(expectedBandwidthUsage),
|
|
ProbeMinDuration,
|
|
ProbeMaxDuration,
|
|
)
|
|
s.params.Logger.Infow(
|
|
"stream allocator: starting probe",
|
|
"probeClusterId", s.probeClusterId,
|
|
"current usage", expectedBandwidthUsage,
|
|
"committed", s.committedChannelCapacity,
|
|
"lastReceived", s.lastReceivedEstimate,
|
|
"probeRateBps", probeRateBps,
|
|
"goalBps", expectedBandwidthUsage+probeRateBps,
|
|
)
|
|
}
|
|
|
|
func (s *StreamAllocator) resetProbe() {
|
|
s.lastProbeStartTime = time.Now()
|
|
|
|
s.resetProbeInterval()
|
|
|
|
s.clearProbe()
|
|
}
|
|
|
|
func (s *StreamAllocator) clearProbe() {
|
|
s.probeClusterId = ProbeClusterIdInvalid
|
|
s.abortedProbeClusterId = ProbeClusterIdInvalid
|
|
}
|
|
|
|
func (s *StreamAllocator) backoffProbeInterval() {
|
|
s.probeInterval = time.Duration(s.probeInterval.Seconds()*ProbeBackoffFactor) * time.Second
|
|
if s.probeInterval > ProbeWaitMax {
|
|
s.probeInterval = ProbeWaitMax
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) resetProbeInterval() {
|
|
s.probeInterval = ProbeWaitBase
|
|
}
|
|
|
|
func (s *StreamAllocator) stopProbe() {
|
|
s.prober.Reset()
|
|
}
|
|
|
|
func (s *StreamAllocator) abortProbe() {
|
|
s.abortedProbeClusterId = s.probeClusterId
|
|
s.stopProbe()
|
|
}
|
|
|
|
func (s *StreamAllocator) isInProbe() bool {
|
|
return s.probeClusterId != ProbeClusterIdInvalid
|
|
}
|
|
|
|
func (s *StreamAllocator) maybeProbe() {
|
|
if time.Since(s.lastProbeStartTime) < s.probeInterval || s.probeClusterId != ProbeClusterIdInvalid {
|
|
return
|
|
}
|
|
|
|
switch s.params.Config.ProbeMode {
|
|
case config.CongestionControlProbeModeMedia:
|
|
s.maybeProbeWithMedia()
|
|
s.adjustState()
|
|
case config.CongestionControlProbeModePadding:
|
|
s.maybeProbeWithPadding()
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) maybeProbeWithMedia() {
|
|
// boost deficient track farthest from desired layers
|
|
for _, track := range s.getMaxDistanceSortedDeficient() {
|
|
allocation, boosted := track.AllocateNextHigher(ChannelCapacityInfinity, FlagAllowOvershootInCatchup)
|
|
if !boosted {
|
|
continue
|
|
}
|
|
|
|
update := NewStreamStateUpdate()
|
|
if allocation.pauseReason == VideoPauseReasonBandwidth && track.SetPaused(true) {
|
|
update.HandleStreamingChange(true, track)
|
|
}
|
|
s.maybeSendUpdate(update)
|
|
|
|
s.lastProbeStartTime = time.Now()
|
|
break
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) maybeProbeWithPadding() {
|
|
// use deficient track farthest from desired layers to find how much to probe
|
|
for _, track := range s.getMaxDistanceSortedDeficient() {
|
|
transition, available := track.GetNextHigherTransition(FlagAllowOvershootInProbe)
|
|
if !available || transition.bandwidthDelta < 0 {
|
|
continue
|
|
}
|
|
|
|
probeRateBps := (transition.bandwidthDelta * ProbePct) / 100
|
|
if probeRateBps < ProbeMinBps {
|
|
probeRateBps = ProbeMinBps
|
|
}
|
|
|
|
s.initProbe(probeRateBps)
|
|
break
|
|
}
|
|
}
|
|
|
|
func (s *StreamAllocator) getTracks() []*Track {
|
|
s.videoTracksMu.RLock()
|
|
var tracks []*Track
|
|
for _, track := range s.videoTracks {
|
|
tracks = append(tracks, track)
|
|
}
|
|
s.videoTracksMu.RUnlock()
|
|
|
|
return tracks
|
|
}
|
|
|
|
func (s *StreamAllocator) getSorted() TrackSorter {
|
|
s.videoTracksMu.RLock()
|
|
var trackSorter TrackSorter
|
|
for _, track := range s.videoTracks {
|
|
if !track.IsManaged() {
|
|
continue
|
|
}
|
|
|
|
trackSorter = append(trackSorter, track)
|
|
}
|
|
s.videoTracksMu.RUnlock()
|
|
|
|
sort.Sort(trackSorter)
|
|
|
|
return trackSorter
|
|
}
|
|
|
|
func (s *StreamAllocator) getMinDistanceSorted(exclude *Track) MinDistanceSorter {
|
|
s.videoTracksMu.RLock()
|
|
var minDistanceSorter MinDistanceSorter
|
|
for _, track := range s.videoTracks {
|
|
if !track.IsManaged() || track == exclude {
|
|
continue
|
|
}
|
|
|
|
minDistanceSorter = append(minDistanceSorter, track)
|
|
}
|
|
s.videoTracksMu.RUnlock()
|
|
|
|
sort.Sort(minDistanceSorter)
|
|
|
|
return minDistanceSorter
|
|
}
|
|
|
|
func (s *StreamAllocator) getMaxDistanceSortedDeficient() MaxDistanceSorter {
|
|
s.videoTracksMu.RLock()
|
|
var maxDistanceSorter MaxDistanceSorter
|
|
for _, track := range s.videoTracks {
|
|
if !track.IsManaged() || !track.IsDeficient() {
|
|
continue
|
|
}
|
|
|
|
maxDistanceSorter = append(maxDistanceSorter, track)
|
|
}
|
|
s.videoTracksMu.RUnlock()
|
|
|
|
sort.Sort(maxDistanceSorter)
|
|
|
|
return maxDistanceSorter
|
|
}
|
|
|
|
// ------------------------------------------------
|
|
|
|
type StreamState int
|
|
|
|
const (
|
|
StreamStateActive StreamState = iota
|
|
StreamStatePaused
|
|
)
|
|
|
|
func (s StreamState) String() string {
|
|
switch s {
|
|
case StreamStateActive:
|
|
return "active"
|
|
case StreamStatePaused:
|
|
return "paused"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|
|
|
|
type StreamStateInfo struct {
|
|
ParticipantID livekit.ParticipantID
|
|
TrackID livekit.TrackID
|
|
State StreamState
|
|
}
|
|
|
|
type StreamStateUpdate struct {
|
|
StreamStates []*StreamStateInfo
|
|
}
|
|
|
|
func NewStreamStateUpdate() *StreamStateUpdate {
|
|
return &StreamStateUpdate{}
|
|
}
|
|
|
|
func (s *StreamStateUpdate) HandleStreamingChange(isPaused bool, track *Track) {
|
|
if isPaused {
|
|
s.StreamStates = append(s.StreamStates, &StreamStateInfo{
|
|
ParticipantID: track.PublisherID(),
|
|
TrackID: track.ID(),
|
|
State: StreamStatePaused,
|
|
})
|
|
} else {
|
|
s.StreamStates = append(s.StreamStates, &StreamStateInfo{
|
|
ParticipantID: track.PublisherID(),
|
|
TrackID: track.ID(),
|
|
State: StreamStateActive,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *StreamStateUpdate) Empty() bool {
|
|
return len(s.StreamStates) == 0
|
|
}
|
|
|
|
// ------------------------------------------------
|
|
|
|
type Track struct {
|
|
downTrack *DownTrack
|
|
source livekit.TrackSource
|
|
isSimulcast bool
|
|
priority uint8
|
|
publisherID livekit.ParticipantID
|
|
logger logger.Logger
|
|
|
|
maxLayers VideoLayers
|
|
|
|
totalPackets uint32
|
|
totalRepeatedNacks uint32
|
|
|
|
isDirty bool
|
|
|
|
isPaused bool
|
|
}
|
|
|
|
func newTrack(
|
|
downTrack *DownTrack,
|
|
source livekit.TrackSource,
|
|
isSimulcast bool,
|
|
publisherID livekit.ParticipantID,
|
|
logger logger.Logger,
|
|
) *Track {
|
|
t := &Track{
|
|
downTrack: downTrack,
|
|
source: source,
|
|
isSimulcast: isSimulcast,
|
|
publisherID: publisherID,
|
|
logger: logger,
|
|
isPaused: true,
|
|
}
|
|
t.SetPriority(0)
|
|
t.SetMaxLayers(downTrack.MaxLayers())
|
|
|
|
return t
|
|
}
|
|
|
|
func (t *Track) SetDirty(isDirty bool) bool {
|
|
if t.isDirty == isDirty {
|
|
return false
|
|
}
|
|
|
|
t.isDirty = isDirty
|
|
return true
|
|
}
|
|
|
|
func (t *Track) SetPaused(isPaused bool) bool {
|
|
if t.isPaused == isPaused {
|
|
return false
|
|
}
|
|
|
|
t.isPaused = isPaused
|
|
return true
|
|
}
|
|
|
|
func (t *Track) SetPriority(priority uint8) bool {
|
|
if priority == 0 {
|
|
switch t.source {
|
|
case livekit.TrackSource_SCREEN_SHARE:
|
|
priority = PriorityDefaultScreenshare
|
|
default:
|
|
priority = PriorityDefaultVideo
|
|
}
|
|
}
|
|
|
|
if t.priority == priority {
|
|
return false
|
|
}
|
|
|
|
t.priority = priority
|
|
return true
|
|
}
|
|
|
|
func (t *Track) Priority() uint8 {
|
|
return t.priority
|
|
}
|
|
|
|
func (t *Track) DownTrack() *DownTrack {
|
|
return t.downTrack
|
|
}
|
|
|
|
func (t *Track) IsManaged() bool {
|
|
return t.source != livekit.TrackSource_SCREEN_SHARE || t.isSimulcast
|
|
}
|
|
|
|
func (t *Track) ID() livekit.TrackID {
|
|
return livekit.TrackID(t.downTrack.ID())
|
|
}
|
|
|
|
func (t *Track) PublisherID() livekit.ParticipantID {
|
|
return t.publisherID
|
|
}
|
|
|
|
func (t *Track) SetMaxLayers(layers VideoLayers) bool {
|
|
if t.maxLayers == layers {
|
|
return false
|
|
}
|
|
|
|
t.maxLayers = layers
|
|
return true
|
|
}
|
|
|
|
func (t *Track) WritePaddingRTP(bytesToSend int) int {
|
|
return t.downTrack.WritePaddingRTP(bytesToSend, false)
|
|
}
|
|
|
|
func (t *Track) AllocateOptimal(allowOvershoot bool) VideoAllocation {
|
|
return t.downTrack.AllocateOptimal(allowOvershoot)
|
|
}
|
|
|
|
func (t *Track) ProvisionalAllocatePrepare() {
|
|
t.downTrack.ProvisionalAllocatePrepare()
|
|
}
|
|
|
|
func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool, allowOvershoot bool) int64 {
|
|
return t.downTrack.ProvisionalAllocate(availableChannelCapacity, layers, allowPause, allowOvershoot)
|
|
}
|
|
|
|
func (t *Track) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition {
|
|
return t.downTrack.ProvisionalAllocateGetCooperativeTransition(allowOvershoot)
|
|
}
|
|
|
|
func (t *Track) ProvisionalAllocateGetBestWeightedTransition() VideoTransition {
|
|
return t.downTrack.ProvisionalAllocateGetBestWeightedTransition()
|
|
}
|
|
|
|
func (t *Track) ProvisionalAllocateCommit() VideoAllocation {
|
|
return t.downTrack.ProvisionalAllocateCommit()
|
|
}
|
|
|
|
func (t *Track) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (VideoAllocation, bool) {
|
|
return t.downTrack.AllocateNextHigher(availableChannelCapacity, allowOvershoot)
|
|
}
|
|
|
|
func (t *Track) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool) {
|
|
return t.downTrack.GetNextHigherTransition(allowOvershoot)
|
|
}
|
|
|
|
func (t *Track) Pause() VideoAllocation {
|
|
return t.downTrack.Pause()
|
|
}
|
|
|
|
func (t *Track) IsDeficient() bool {
|
|
return t.downTrack.IsDeficient()
|
|
}
|
|
|
|
func (t *Track) BandwidthRequested() int64 {
|
|
return t.downTrack.BandwidthRequested()
|
|
}
|
|
|
|
func (t *Track) DistanceToDesired() int32 {
|
|
return t.downTrack.DistanceToDesired()
|
|
}
|
|
|
|
func (t *Track) GetNackDelta() (uint32, uint32) {
|
|
totalPackets, totalRepeatedNacks := t.downTrack.GetNackStats()
|
|
|
|
packetDelta := totalPackets - t.totalPackets
|
|
t.totalPackets = totalPackets
|
|
|
|
nackDelta := totalRepeatedNacks - t.totalRepeatedNacks
|
|
t.totalRepeatedNacks = totalRepeatedNacks
|
|
|
|
return packetDelta, nackDelta
|
|
}
|
|
|
|
// ------------------------------------------------
|
|
|
|
type TrackSorter []*Track
|
|
|
|
func (t TrackSorter) Len() int {
|
|
return len(t)
|
|
}
|
|
|
|
func (t TrackSorter) Swap(i, j int) {
|
|
t[i], t[j] = t[j], t[i]
|
|
}
|
|
|
|
func (t TrackSorter) Less(i, j int) bool {
|
|
//
|
|
// TrackSorter is used to allocate layer-by-layer.
|
|
// So, higher priority track should come earlier so that it gets an earlier shot at each layer
|
|
//
|
|
if t[i].priority != t[j].priority {
|
|
return t[i].priority > t[j].priority
|
|
}
|
|
|
|
if t[i].maxLayers.Spatial != t[j].maxLayers.Spatial {
|
|
return t[i].maxLayers.Spatial > t[j].maxLayers.Spatial
|
|
}
|
|
|
|
return t[i].maxLayers.Temporal > t[j].maxLayers.Temporal
|
|
}
|
|
|
|
// ------------------------------------------------
|
|
|
|
type MaxDistanceSorter []*Track
|
|
|
|
func (m MaxDistanceSorter) Len() int {
|
|
return len(m)
|
|
}
|
|
|
|
func (m MaxDistanceSorter) Swap(i, j int) {
|
|
m[i], m[j] = m[j], m[i]
|
|
}
|
|
|
|
func (m MaxDistanceSorter) Less(i, j int) bool {
|
|
//
|
|
// MaxDistanceSorter is used to find a deficient track to use for probing during recovery from congestion.
|
|
// So, higher priority track should come earlier so that they have a chance to recover sooner.
|
|
//
|
|
if m[i].priority != m[j].priority {
|
|
return m[i].priority > m[j].priority
|
|
}
|
|
|
|
return m[i].DistanceToDesired() > m[j].DistanceToDesired()
|
|
}
|
|
|
|
// ------------------------------------------------
|
|
|
|
type MinDistanceSorter []*Track
|
|
|
|
func (m MinDistanceSorter) Len() int {
|
|
return len(m)
|
|
}
|
|
|
|
func (m MinDistanceSorter) Swap(i, j int) {
|
|
m[i], m[j] = m[j], m[i]
|
|
}
|
|
|
|
func (m MinDistanceSorter) Less(i, j int) bool {
|
|
//
|
|
// MinDistanceSorter is used to find excess bandwidth in cooperative allocation.
|
|
// So, lower priority track should come earlier so that they contribute bandwidth to higher priority tracks.
|
|
//
|
|
if m[i].priority != m[j].priority {
|
|
return m[i].priority < m[j].priority
|
|
}
|
|
|
|
return m[i].DistanceToDesired() < m[j].DistanceToDesired()
|
|
}
|
|
|
|
// ------------------------------------------------
|
|
|
|
type ChannelTrend int
|
|
|
|
const (
|
|
ChannelTrendNeutral ChannelTrend = iota
|
|
ChannelTrendClearing
|
|
ChannelTrendCongesting
|
|
)
|
|
|
|
func (c ChannelTrend) String() string {
|
|
switch c {
|
|
case ChannelTrendNeutral:
|
|
return "NEUTRAL"
|
|
case ChannelTrendClearing:
|
|
return "CLEARING"
|
|
case ChannelTrendCongesting:
|
|
return "CONGESTING"
|
|
default:
|
|
return fmt.Sprintf("%d", int(c))
|
|
}
|
|
}
|
|
|
|
type ChannelCongestionReason int
|
|
|
|
const (
|
|
ChannelCongestionReasonNone ChannelCongestionReason = iota
|
|
ChannelCongestionReasonEstimate
|
|
ChannelCongestionReasonLoss
|
|
)
|
|
|
|
func (c ChannelCongestionReason) String() string {
|
|
switch c {
|
|
case ChannelCongestionReasonNone:
|
|
return "NONE"
|
|
case ChannelCongestionReasonEstimate:
|
|
return "ESTIMATE"
|
|
case ChannelCongestionReasonLoss:
|
|
return "LOSS"
|
|
default:
|
|
return fmt.Sprintf("%d", int(c))
|
|
}
|
|
}
|
|
|
|
type ChannelObserverParams struct {
|
|
Name string
|
|
EstimateRequiredSamples int
|
|
EstimateDownwardTrendThreshold float64
|
|
EstimateCollapseValues bool
|
|
NackWindowMinDuration time.Duration
|
|
NackWindowMaxDuration time.Duration
|
|
NackRatioThreshold float64
|
|
}
|
|
|
|
type ChannelObserver struct {
|
|
params ChannelObserverParams
|
|
logger logger.Logger
|
|
|
|
estimateTrend *TrendDetector
|
|
|
|
nackWindowStartTime time.Time
|
|
packets uint32
|
|
repeatedNacks uint32
|
|
}
|
|
|
|
func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *ChannelObserver {
|
|
return &ChannelObserver{
|
|
params: params,
|
|
logger: logger,
|
|
estimateTrend: NewTrendDetector(TrendDetectorParams{
|
|
Name: params.Name + "-estimate",
|
|
Logger: logger,
|
|
RequiredSamples: params.EstimateRequiredSamples,
|
|
DownwardTrendThreshold: params.EstimateDownwardTrendThreshold,
|
|
CollapseValues: params.EstimateCollapseValues,
|
|
}),
|
|
}
|
|
}
|
|
|
|
func (c *ChannelObserver) SeedEstimate(estimate int64) {
|
|
c.estimateTrend.Seed(estimate)
|
|
}
|
|
|
|
func (c *ChannelObserver) SeedNack(packets uint32, repeatedNacks uint32) {
|
|
c.packets = packets
|
|
c.repeatedNacks = repeatedNacks
|
|
}
|
|
|
|
func (c *ChannelObserver) AddEstimate(estimate int64) {
|
|
c.estimateTrend.AddValue(estimate)
|
|
}
|
|
|
|
func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32) {
|
|
if c.params.NackWindowMaxDuration != 0 && !c.nackWindowStartTime.IsZero() && time.Since(c.nackWindowStartTime) > c.params.NackWindowMaxDuration {
|
|
c.nackWindowStartTime = time.Time{}
|
|
c.packets = 0
|
|
c.repeatedNacks = 0
|
|
}
|
|
|
|
//
|
|
// Start NACK monitoring window only when a repeated NACK happens.
|
|
// This allows locking tightly to when NACKs start happening and
|
|
// check if the NACKs keep adding up (potentially a sign of congestion)
|
|
// or isolated losses
|
|
//
|
|
if c.repeatedNacks == 0 && repeatedNacks != 0 {
|
|
c.nackWindowStartTime = time.Now()
|
|
}
|
|
|
|
if !c.nackWindowStartTime.IsZero() {
|
|
c.packets += packets
|
|
c.repeatedNacks += repeatedNacks
|
|
}
|
|
}
|
|
|
|
func (c *ChannelObserver) GetLowestEstimate() int64 {
|
|
return c.estimateTrend.GetLowest()
|
|
}
|
|
|
|
func (c *ChannelObserver) GetHighestEstimate() int64 {
|
|
return c.estimateTrend.GetHighest()
|
|
}
|
|
|
|
func (c *ChannelObserver) GetNackRatio() (uint32, uint32, float64) {
|
|
ratio := 0.0
|
|
if c.packets != 0 {
|
|
ratio = float64(c.repeatedNacks) / float64(c.packets)
|
|
if ratio > 1.0 {
|
|
ratio = 1.0
|
|
}
|
|
}
|
|
|
|
return c.packets, c.repeatedNacks, ratio
|
|
}
|
|
|
|
func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) {
|
|
estimateDirection := c.estimateTrend.GetDirection()
|
|
packets, repeatedNacks, nackRatio := c.GetNackRatio()
|
|
|
|
switch {
|
|
case estimateDirection == TrendDirectionDownward:
|
|
c.logger.Debugw(
|
|
"stream allocator: channel observer: estimate is trending downward",
|
|
"name", c.params.Name,
|
|
"estimate", c.estimateTrend.ToString(),
|
|
"packets", packets,
|
|
"repeatedNacks", repeatedNacks,
|
|
"ratio", nackRatio,
|
|
)
|
|
return ChannelTrendCongesting, ChannelCongestionReasonEstimate
|
|
case c.params.NackWindowMinDuration != 0 && !c.nackWindowStartTime.IsZero() && time.Since(c.nackWindowStartTime) > c.params.NackWindowMinDuration && nackRatio > c.params.NackRatioThreshold:
|
|
c.logger.Debugw(
|
|
"stream allocator: channel observer: high rate of repeated NACKs",
|
|
"name", c.params.Name,
|
|
"estimate", c.estimateTrend.ToString(),
|
|
"packets", packets,
|
|
"repeatedNacks", repeatedNacks,
|
|
"ratio", nackRatio,
|
|
)
|
|
return ChannelTrendCongesting, ChannelCongestionReasonLoss
|
|
case estimateDirection == TrendDirectionUpward:
|
|
return ChannelTrendClearing, ChannelCongestionReasonNone
|
|
}
|
|
|
|
return ChannelTrendNeutral, ChannelCongestionReasonNone
|
|
}
|
|
|
|
// ------------------------------------------------
|
|
|
|
type TrendDirection int
|
|
|
|
const (
|
|
TrendDirectionNeutral TrendDirection = iota
|
|
TrendDirectionUpward
|
|
TrendDirectionDownward
|
|
)
|
|
|
|
func (t TrendDirection) String() string {
|
|
switch t {
|
|
case TrendDirectionNeutral:
|
|
return "NEUTRAL"
|
|
case TrendDirectionUpward:
|
|
return "UPWARD"
|
|
case TrendDirectionDownward:
|
|
return "DOWNWARD"
|
|
default:
|
|
return fmt.Sprintf("%d", int(t))
|
|
}
|
|
}
|
|
|
|
type TrendDetectorParams struct {
|
|
Name string
|
|
Logger logger.Logger
|
|
RequiredSamples int
|
|
DownwardTrendThreshold float64
|
|
CollapseValues bool
|
|
}
|
|
|
|
type TrendDetector struct {
|
|
params TrendDetectorParams
|
|
|
|
startTime time.Time
|
|
numSamples int
|
|
values []int64
|
|
lowestValue int64
|
|
highestValue int64
|
|
|
|
direction TrendDirection
|
|
}
|
|
|
|
func NewTrendDetector(params TrendDetectorParams) *TrendDetector {
|
|
return &TrendDetector{
|
|
params: params,
|
|
startTime: time.Now(),
|
|
direction: TrendDirectionNeutral,
|
|
}
|
|
}
|
|
|
|
func (t *TrendDetector) Seed(value int64) {
|
|
if len(t.values) != 0 {
|
|
return
|
|
}
|
|
|
|
t.values = append(t.values, value)
|
|
}
|
|
|
|
func (t *TrendDetector) AddValue(value int64) {
|
|
t.numSamples++
|
|
if t.lowestValue == 0 || value < t.lowestValue {
|
|
t.lowestValue = value
|
|
}
|
|
if value > t.highestValue {
|
|
t.highestValue = value
|
|
}
|
|
|
|
// ignore duplicate values
|
|
if t.params.CollapseValues && len(t.values) != 0 && t.values[len(t.values)-1] == value {
|
|
return
|
|
}
|
|
|
|
if len(t.values) == t.params.RequiredSamples {
|
|
t.values = t.values[1:]
|
|
}
|
|
t.values = append(t.values, value)
|
|
|
|
t.updateDirection()
|
|
}
|
|
|
|
func (t *TrendDetector) GetLowest() int64 {
|
|
return t.lowestValue
|
|
}
|
|
|
|
func (t *TrendDetector) GetHighest() int64 {
|
|
return t.highestValue
|
|
}
|
|
|
|
func (t *TrendDetector) GetValues() []int64 {
|
|
return t.values
|
|
}
|
|
|
|
func (t *TrendDetector) GetDirection() TrendDirection {
|
|
return t.direction
|
|
}
|
|
|
|
func (t *TrendDetector) ToString() string {
|
|
now := time.Now()
|
|
elapsed := now.Sub(t.startTime).Seconds()
|
|
str := fmt.Sprintf("n: %s", t.params.Name)
|
|
str += fmt.Sprintf(", t: %+v|%+v|%.2fs", t.startTime.Format(time.UnixDate), now.Format(time.UnixDate), elapsed)
|
|
str += fmt.Sprintf(", v: %d|%d|%d|%+v|%.2f", t.numSamples, t.lowestValue, t.highestValue, t.values, kendallsTau(t.values))
|
|
return str
|
|
}
|
|
|
|
func (t *TrendDetector) updateDirection() {
|
|
if len(t.values) < t.params.RequiredSamples {
|
|
t.direction = TrendDirectionNeutral
|
|
return
|
|
}
|
|
|
|
// using Kendall's Tau to find trend
|
|
kt := kendallsTau(t.values)
|
|
|
|
t.direction = TrendDirectionNeutral
|
|
switch {
|
|
case kt > 0:
|
|
t.direction = TrendDirectionUpward
|
|
case kt < t.params.DownwardTrendThreshold:
|
|
t.direction = TrendDirectionDownward
|
|
}
|
|
}
|
|
|
|
// ------------------------------------------------
|
|
|
|
func kendallsTau(values []int64) float64 {
|
|
concordantPairs := 0
|
|
discordantPairs := 0
|
|
|
|
for i := 0; i < len(values)-1; i++ {
|
|
for j := i + 1; j < len(values); j++ {
|
|
if values[i] < values[j] {
|
|
concordantPairs++
|
|
} else if values[i] > values[j] {
|
|
discordantPairs++
|
|
}
|
|
}
|
|
}
|
|
|
|
if (concordantPairs + discordantPairs) == 0 {
|
|
return 0.0
|
|
}
|
|
|
|
return (float64(concordantPairs) - float64(discordantPairs)) / (float64(concordantPairs) + float64(discordantPairs))
|
|
}
|