From fd27a70fe288442b07dfe0d7eddb01ad8a71f279 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 13 Mar 2023 07:45:59 +0530 Subject: [PATCH] stream allocator <-> down track misc changes/clean up (#1512) --- pkg/sfu/downtrack.go | 266 ++++++++++++++++++++----------------- pkg/sfu/prober.go | 136 +++++++++++++------ pkg/sfu/streamallocator.go | 65 +++++---- 3 files changed, 275 insertions(+), 192 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 2b6ae507e..7b2384d2e 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -123,6 +123,33 @@ func (d DownTrackState) String() string { // ------------------------------------------------------------------- +type DownTrackStreamAllocatorListener interface { + // RTCP received + OnREMB(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) + OnTransportCCFeedback(dt *DownTrack, cc *rtcp.TransportLayerCC) + + // video layer availability changed + OnAvailableLayersChanged(dt *DownTrack) + + // video layer bitrate availability changed + OnBitrateAvailabilityChanged(dt *DownTrack) + + // max published video layer changed + OnMaxPublishedLayerChanged(dt *DownTrack) + + // subscription changed - mute/unmute + OnSubscriptionChanged(dt *DownTrack) + + // subscribed max video layer changed + OnSubscribedLayersChanged(dt *DownTrack, layers VideoLayers) + + // target video layer reaached + OnTargetLayerReached(dt *DownTrack) + + // packet(s) sent + OnPacketsSent(dt *DownTrack, size int) +} + type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport) // DownTrack implements TrackLocal, is the track used to write packets @@ -134,11 +161,9 @@ type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport) // - closed // once closed, a DownTrack cannot be re-used. type DownTrack struct { - bindLock sync.Mutex logger logger.Logger id livekit.TrackID subscriberID livekit.ParticipantID - bound atomic.Bool kind webrtc.RTPCodecType mime string ssrc uint32 @@ -150,22 +175,27 @@ type DownTrack struct { forwarder *Forwarder - upstreamCodecs []webrtc.RTPCodecParameters - codec webrtc.RTPCodecCapability - rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter - absSendTimeID int - dependencyDescriptorID int - receiver TrackReceiver - transceiver *webrtc.RTPTransceiver - writeStream webrtc.TrackLocalWriter - rtcpReader *buffer.RTCPReader - onCloseHandler func(willBeResumed bool) - onBinding func() - receiverReportListeners []ReceiverReportListener + upstreamCodecs []webrtc.RTPCodecParameters + codec webrtc.RTPCodecCapability + rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter + absSendTimeID int + dependencyDescriptorID int + receiver TrackReceiver + transceiver *webrtc.RTPTransceiver + writeStream webrtc.TrackLocalWriter + rtcpReader *buffer.RTCPReader + onCloseHandler func(willBeResumed bool) + onBinding func() + listenerLock sync.RWMutex - isClosed atomic.Bool - connected atomic.Bool - bindAndConnectedOnce atomic.Bool + receiverReportListeners []ReceiverReportListener + + bindLock sync.Mutex + bound atomic.Bool + + isClosed atomic.Bool + connected atomic.Bool + bindAndConnectedOnce atomic.Bool rtpStats *buffer.RTPStats @@ -187,33 +217,10 @@ type DownTrack struct { activePaddingOnMuteUpTrack atomic.Bool - // RTCP callbacks - onREMB func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) - onTransportCCFeedback func(dt *DownTrack, cc *rtcp.TransportLayerCC) - - // simulcast layer availability change callback - onAvailableLayersChanged func(dt *DownTrack) - - // layer bitrate availability change callback - onBitrateAvailabilityChanged func(dt *DownTrack) - - // max published layer change callback - onMaxPublishedLayerChanged func(dt *DownTrack) - - // subscription change callback - onSubscriptionChanged func(dt *DownTrack) - - // max layer change callback - onSubscribedLayersChanged func(dt *DownTrack, layers VideoLayers) - - // target layer found callback - onTargetLayerFound func(dt *DownTrack) - - // packet sent callback - onPacketSent []func(dt *DownTrack, size int) - - // padding packet sent callback - onPaddingSent []func(dt *DownTrack, size int) + streamAllocatorLock sync.RWMutex + streamAllocatorListener DownTrackStreamAllocatorListener + streamAllocatorReportGeneration int + streamAllocatorBytesCounter atomic.Uint32 // update stats onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat) @@ -258,8 +265,8 @@ func NewDownTrack( } d.forwarder = NewForwarder(d.kind, d.logger, d.receiver.GetReferenceLayerRTPTimestamp) d.forwarder.OnParkedLayersExpired(func() { - if d.onSubscriptionChanged != nil { - d.onSubscriptionChanged(d) + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnSubscriptionChanged(d) } }) @@ -362,6 +369,68 @@ func (d *DownTrack) TrackInfoAvailable() { d.connectionStats.Start(ti, time.Now()) } +func (d *DownTrack) SetStreamAllocatorListener(listener DownTrackStreamAllocatorListener) { + d.streamAllocatorLock.Lock() + d.streamAllocatorListener = listener + d.streamAllocatorLock.Unlock() + + // kick of a gratuitous allocation + if listener != nil { + listener.OnSubscriptionChanged(d) + } +} + +func (d *DownTrack) getStreamAllocatorListener() DownTrackStreamAllocatorListener { + d.streamAllocatorLock.RLock() + defer d.streamAllocatorLock.RUnlock() + + return d.streamAllocatorListener +} + +func (d *DownTrack) SetStreamAllocatorReportInterval(interval time.Duration) { + d.ClearStreamAllocatorReportInterval() + + if interval == 0 { + return + } + + d.streamAllocatorLock.Lock() + d.streamAllocatorBytesCounter.Store(0) + + d.streamAllocatorReportGeneration++ + gen := d.streamAllocatorReportGeneration + d.streamAllocatorLock.Unlock() + + go func(generation int) { + timer := time.NewTimer(interval) + for { + <-timer.C + + d.streamAllocatorLock.Lock() + if generation != d.streamAllocatorReportGeneration { + d.streamAllocatorLock.Unlock() + return + } + + sal := d.streamAllocatorListener + bytes := d.streamAllocatorBytesCounter.Swap(0) + d.streamAllocatorLock.Unlock() + + if sal != nil { + sal.OnPacketsSent(d, int(bytes)) + } + + timer.Reset(interval) + } + }(gen) +} + +func (d *DownTrack) ClearStreamAllocatorReportInterval() { + d.streamAllocatorLock.Lock() + d.streamAllocatorReportGeneration++ + d.streamAllocatorLock.Unlock() +} + // ID is the unique identifier for this Track. This should be unique for the // stream, but doesn't have to globally unique. A common example would be 'audio' or 'video' // and StreamID would be 'desktop' or 'webcam' @@ -535,10 +604,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { return err } - pktSize := hdr.MarshalSize() + len(payload) - for _, f := range d.onPacketSent { - f(d, pktSize) - } + d.streamAllocatorBytesCounter.Add(uint32(hdr.MarshalSize() + len(payload))) if tp.isSwitchingToMaxLayer && d.onMaxSubscribedLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo { d.onMaxSubscribedLayerChanged(d, layer) @@ -556,8 +622,10 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { d.stopKeyFrameRequester() } - if tp.isSwitchingToTargetLayer && d.onTargetLayerFound != nil { - d.onTargetLayerFound(d) + if tp.isSwitchingToTargetLayer { + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnTargetLayerReached(d) + } } } @@ -639,11 +707,6 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool) int { return bytesSent } - size := hdr.MarshalSize() + len(payload) - for _, f := range d.onPaddingSent { - f(d, size) - } - if !paddingOnMute { d.rtpStats.Update(&hdr, 0, len(payload), time.Now().UnixNano()) } @@ -657,7 +720,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool) int { d.sequencer.pushPadding(hdr.SequenceNumber) } - bytesSent += size + bytesSent += hdr.MarshalSize() + len(payload) } return bytesSent @@ -715,8 +778,8 @@ func (d *DownTrack) handleMute(muted bool, isPub bool, changed bool, maxLayers V d.onMaxSubscribedLayerChanged(d, notifyLayer) } - if d.onSubscriptionChanged != nil { - d.onSubscriptionChanged(d) + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnSubscriptionChanged(d) } // when muting, send a few silence frames to ensure residual noise does not @@ -805,6 +868,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { } d.stopKeyFrameRequester() + d.ClearStreamAllocatorReportInterval() } func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) { @@ -824,8 +888,8 @@ func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) { d.onMaxSubscribedLayerChanged(d, maxLayers.Spatial) } - if d.onSubscribedLayersChanged != nil { - d.onSubscribedLayersChanged(d, maxLayers) + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnSubscribedLayersChanged(d, maxLayers) } } @@ -835,8 +899,8 @@ func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32) { return } - if d.onSubscribedLayersChanged != nil { - d.onSubscribedLayersChanged(d, maxLayers) + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnSubscribedLayersChanged(d, maxLayers) } } @@ -859,22 +923,22 @@ func (d *DownTrack) SeedState(state DownTrackState) { } func (d *DownTrack) UpTrackLayersChange() { - if d.onAvailableLayersChanged != nil { - d.onAvailableLayersChanged(d) + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnAvailableLayersChanged(d) } } func (d *DownTrack) UpTrackBitrateAvailabilityChange() { - if d.onBitrateAvailabilityChanged != nil { - d.onBitrateAvailabilityChanged(d) + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnBitrateAvailabilityChanged(d) } } func (d *DownTrack) UpTrackMaxPublishedLayerChange(maxPublishedLayer int32) { d.forwarder.SetMaxPublishedLayer(maxPublishedLayer) - if d.onMaxPublishedLayerChanged != nil { - d.onMaxPublishedLayerChanged(d) + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnMaxPublishedLayerChanged(d) } } @@ -912,14 +976,6 @@ func (d *DownTrack) OnBinding(fn func()) { d.onBinding = fn } -func (d *DownTrack) OnREMB(fn func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate)) { - d.onREMB = fn -} - -func (d *DownTrack) OnTransportCCFeedback(fn func(dt *DownTrack, cc *rtcp.TransportLayerCC)) { - d.onTransportCCFeedback = fn -} - func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener) { d.listenerLock.Lock() defer d.listenerLock.Unlock() @@ -927,41 +983,6 @@ func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener) { d.receiverReportListeners = append(d.receiverReportListeners, listener) } -func (d *DownTrack) OnAvailableLayersChanged(fn func(dt *DownTrack)) { - d.onAvailableLayersChanged = fn -} - -func (d *DownTrack) OnBitrateAvailabilityChanged(fn func(dt *DownTrack)) { - d.onBitrateAvailabilityChanged = fn -} - -func (d *DownTrack) OnMaxPublishedLayerChanged(fn func(dt *DownTrack)) { - d.onMaxPublishedLayerChanged = fn -} - -func (d *DownTrack) OnSubscriptionChanged(fn func(dt *DownTrack)) { - d.onSubscriptionChanged = fn - - // kick off an allocation just in case other events happened before callbacks were set up - go fn(d) -} - -func (d *DownTrack) OnSubscribedLayersChanged(fn func(dt *DownTrack, layers VideoLayers)) { - d.onSubscribedLayersChanged = fn -} - -func (d *DownTrack) OnTargetLayerFound(fn func(dt *DownTrack)) { - d.onTargetLayerFound = fn -} - -func (d *DownTrack) OnPacketSent(fn func(dt *DownTrack, size int)) { - d.onPacketSent = append(d.onPacketSent, fn) -} - -func (d *DownTrack) OnPaddingSent(fn func(dt *DownTrack, size int)) { - d.onPaddingSent = append(d.onPaddingSent, fn) -} - func (d *DownTrack) OnStatsUpdate(fn func(dt *DownTrack, stat *livekit.AnalyticsStat)) { d.onStatsUpdate = fn } @@ -1157,9 +1178,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan return } - for _, f := range d.onPacketSent { - f(d, pktSize) - } + d.streamAllocatorBytesCounter.Add(uint32(pktSize)) // only the first frame will need frameEndNeeded to close out the // previous picture, rest are small key frames (for the video case) @@ -1290,8 +1309,8 @@ func (d *DownTrack) handleRTCP(bytes []byte) { sendPliOnce() case *rtcp.ReceiverEstimatedMaximumBitrate: - if d.onREMB != nil { - d.onREMB(d, p) + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnREMB(d, p) } case *rtcp.ReceiverReport: @@ -1331,8 +1350,10 @@ func (d *DownTrack) handleRTCP(bytes []byte) { go d.retransmitPackets(nacks) case *rtcp.TransportLayerCC: - if p.MediaSSRC == d.ssrc && d.onTransportCCFeedback != nil { - d.onTransportCCFeedback(d, p) + if p.MediaSSRC == d.ssrc { + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnTransportCCFeedback(d, p) + } } } } @@ -1461,10 +1482,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { if _, err = d.writeStream.WriteRTP(&pkt.Header, payload); err != nil { d.logger.Errorw("writing rtx packet err", err) } else { - pktSize := pkt.Header.MarshalSize() + len(payload) - for _, f := range d.onPacketSent { - f(d, pktSize) - } + d.streamAllocatorBytesCounter.Add(uint32(pkt.Header.MarshalSize() + len(payload))) d.rtpStats.Update(&pkt.Header, len(payload), 0, time.Now().UnixNano()) } diff --git a/pkg/sfu/prober.go b/pkg/sfu/prober.go index cde3dc49a..42dd61d1e 100644 --- a/pkg/sfu/prober.go +++ b/pkg/sfu/prober.go @@ -116,6 +116,12 @@ import ( "github.com/livekit/protocol/logger" ) +type ProberListener interface { + OnSendProbe(bytesToSend int) + OnProbeClusterDone(info ProbeClusterInfo) + OnActiveChanged(isActive bool) +} + type ProberParams struct { Logger logger.Logger } @@ -125,12 +131,14 @@ type Prober struct { clusterId atomic.Uint32 - clustersMu sync.RWMutex - clusters deque.Deque - activeCluster *Cluster + clustersMu sync.RWMutex + clusters deque.Deque + activeCluster *Cluster + activeStateQueue []bool + activeStateQueueInProcess atomic.Bool - onSendProbe func(bytesToSend int) - onProbeClusterDone func(info ProbeClusterInfo) + listenerMu sync.RWMutex + listener ProberListener } func NewProber(params ProberParams) *Prober { @@ -141,6 +149,19 @@ func NewProber(params ProberParams) *Prober { return p } +func (p *Prober) SetProberListener(listener ProberListener) { + p.listenerMu.Lock() + p.listener = listener + p.listenerMu.Unlock() +} + +func (p *Prober) getProberListener() ProberListener { + p.listenerMu.RLock() + defer p.listenerMu.RUnlock() + + return p.listener +} + func (p *Prober) IsRunning() bool { p.clustersMu.RLock() defer p.clustersMu.RUnlock() @@ -161,19 +182,17 @@ func (p *Prober) Reset() { p.clusters.Clear() p.activeCluster = nil + + p.activeStateQueue = append(p.activeStateQueue, false) p.clustersMu.Unlock() - if p.onProbeClusterDone != nil && reset { - p.onProbeClusterDone(info) + if reset { + if pl := p.getProberListener(); pl != nil { + pl.OnProbeClusterDone(info) + } } -} -func (p *Prober) OnSendProbe(f func(bytesToSend int)) { - p.onSendProbe = f -} - -func (p *Prober) OnProbeClusterDone(f func(info ProbeClusterInfo)) { - p.onProbeClusterDone = f + p.processActiveStateQueue() } func (p *Prober) AddCluster(desiredRateBps int, expectedRateBps int, minDuration time.Duration, maxDuration time.Duration) ProbeClusterId { @@ -190,13 +209,13 @@ func (p *Prober) AddCluster(desiredRateBps int, expectedRateBps int, minDuration return clusterId } -func (p *Prober) PacketSent(size int) { +func (p *Prober) PacketsSent(size int) { cluster := p.getFrontCluster() if cluster == nil { return } - cluster.PacketSent(size) + cluster.PacketsSent(size) } func (p *Prober) ProbeSent(size int) { @@ -227,10 +246,10 @@ func (p *Prober) getFrontCluster() *Cluster { func (p *Prober) popFrontCluster(cluster *Cluster) { p.clustersMu.Lock() - defer p.clustersMu.Unlock() if p.clusters.Len() == 0 { p.activeCluster = nil + p.clustersMu.Unlock() return } @@ -241,28 +260,64 @@ func (p *Prober) popFrontCluster(cluster *Cluster) { if cluster == p.activeCluster { p.activeCluster = nil } + + if p.clusters.Len() == 0 { + p.activeStateQueue = append(p.activeStateQueue, false) + } + p.clustersMu.Unlock() + + p.processActiveStateQueue() } func (p *Prober) pushBackClusterAndMaybeStart(cluster *Cluster) { p.clustersMu.Lock() - defer p.clustersMu.Unlock() - p.clusters.PushBack(cluster) if p.clusters.Len() == 1 { + p.activeStateQueue = append(p.activeStateQueue, true) + go p.run() } + p.clustersMu.Unlock() + + p.processActiveStateQueue() +} + +func (p *Prober) processActiveStateQueue() { + if p.activeStateQueueInProcess.Swap(true) { + // processing queue + return + } + + for { + p.clustersMu.Lock() + if len(p.activeStateQueue) == 0 { + p.clustersMu.Unlock() + break + } + + isActive := p.activeStateQueue[0] + p.activeStateQueue = p.activeStateQueue[1:] + p.clustersMu.Unlock() + + if pl := p.getProberListener(); pl != nil { + pl.OnActiveChanged(isActive) + } + } + + p.activeStateQueueInProcess.Store(false) } func (p *Prober) run() { - for { - // determine how long to sleep - cluster := p.getFrontCluster() - if cluster == nil { - return - } + // determine how long to sleep + cluster := p.getFrontCluster() + if cluster == nil { + return + } - time.Sleep(cluster.GetSleepDuration()) + timer := time.NewTimer(cluster.GetSleepDuration()) + for { + <-timer.C // wake up and check for probes to send cluster = p.getFrontCluster() @@ -270,18 +325,25 @@ func (p *Prober) run() { return } - cluster.Process(p.onSendProbe) + cluster.Process(p.getProberListener()) if cluster.IsFinished() { p.logger.Debugw("cluster finished", "cluster", cluster.String()) - if p.onProbeClusterDone != nil { - p.onProbeClusterDone(cluster.GetInfo()) + if pl := p.getProberListener(); pl != nil { + pl.OnProbeClusterDone(cluster.GetInfo()) } p.popFrontCluster(cluster) - continue } + + // determine how long to sleep + cluster := p.getFrontCluster() + if cluster == nil { + return + } + + timer.Reset(cluster.GetSleepDuration()) } } @@ -300,12 +362,6 @@ type ProbeClusterInfo struct { } type Cluster struct { - // LK-TODO-START - // Check if we can operate at cluster level without a lock. - // The quantities that are updated in a different thread are - // bytesSentNonProbe - maybe make this an atomic value - // Lock contention time should be very minimal though. - // LK-TODO-END lock sync.RWMutex id ProbeClusterId @@ -354,7 +410,7 @@ func (c *Cluster) GetSleepDuration() time.Duration { return c.sleepDuration } -func (c *Cluster) PacketSent(size int) { +func (c *Cluster) PacketsSent(size int) { c.lock.Lock() defer c.lock.Unlock() @@ -398,7 +454,7 @@ func (c *Cluster) GetInfo() ProbeClusterInfo { } } -func (c *Cluster) Process(onSendProbe func(bytesToSend int)) { +func (c *Cluster) Process(pl ProberListener) { c.lock.RLock() timeElapsed := time.Since(c.startTime) @@ -428,8 +484,8 @@ func (c *Cluster) Process(onSendProbe func(bytesToSend int)) { bytesShortFall = ((bytesShortFall + 274) / 275) * 275 c.lock.RUnlock() - if bytesShortFall > 0 && onSendProbe != nil { - onSendProbe(bytesShortFall) + if bytesShortFall > 0 && pl != nil { + pl.OnSendProbe(bytesShortFall) } // LK-TODO look at adapting sleep time based on how many bytes and how much time is left diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index d8f2c2136..bf5ba725f 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -184,8 +184,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { s.resetState() - s.prober.OnSendProbe(s.onSendProbe) - s.prober.OnProbeClusterDone(s.onProbeClusterDone) + s.prober.SetProberListener(s) return s } @@ -236,15 +235,11 @@ func (s *StreamAllocator) AddTrack(downTrack *DownTrack, params AddTrackParams) s.videoTracks[livekit.TrackID(downTrack.ID())] = track s.videoTracksMu.Unlock() - downTrack.OnREMB(s.onREMB) - downTrack.OnTransportCCFeedback(s.onTransportCCFeedback) - downTrack.OnAvailableLayersChanged(s.onAvailableLayersChanged) - downTrack.OnBitrateAvailabilityChanged(s.onBitrateAvailabilityChanged) - downTrack.OnMaxPublishedLayerChanged(s.onMaxPublishedLayerChanged) - downTrack.OnSubscriptionChanged(s.onSubscriptionChanged) - downTrack.OnSubscribedLayersChanged(s.onSubscribedLayersChanged) - downTrack.OnPacketSent(s.onPacketSent) - downTrack.OnTargetLayerFound(s.onTargetLayerFound) + downTrack.SetStreamAllocatorListener(s) + if s.prober.IsRunning() { + // LK-TODO: this can be changed to adapt to probe rate + downTrack.SetStreamAllocatorReportInterval(50 * time.Millisecond) + } s.maybePostEventAllocateTrack(downTrack) } @@ -285,7 +280,7 @@ func (s *StreamAllocator) resetState() { } // called when a new REMB is received (receive side bandwidth estimation) -func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) { +func (s *StreamAllocator) OnREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) { // // Channel capacity is estimated at a peer connection level. All down tracks // in the peer connection will end up calling this for a REMB report with @@ -365,7 +360,7 @@ func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstima } // called when a new transport-cc feedback is received -func (s *StreamAllocator) onTransportCCFeedback(downTrack *DownTrack, fb *rtcp.TransportLayerCC) { +func (s *StreamAllocator) OnTransportCCFeedback(downTrack *DownTrack, fb *rtcp.TransportLayerCC) { if s.bwe != nil { s.bwe.WriteRTCP([]rtcp.Packet{fb}, nil) } @@ -380,27 +375,27 @@ func (s *StreamAllocator) onTargetBitrateChange(bitrate int) { } // called when feeding track's layer availability changes -func (s *StreamAllocator) onAvailableLayersChanged(downTrack *DownTrack) { +func (s *StreamAllocator) OnAvailableLayersChanged(downTrack *DownTrack) { s.maybePostEventAllocateTrack(downTrack) } // called when feeding track's bitrate measurement of any layer is available -func (s *StreamAllocator) onBitrateAvailabilityChanged(downTrack *DownTrack) { +func (s *StreamAllocator) OnBitrateAvailabilityChanged(downTrack *DownTrack) { s.maybePostEventAllocateTrack(downTrack) } // called when feeding track's max publisher layer changes -func (s *StreamAllocator) onMaxPublishedLayerChanged(downTrack *DownTrack) { +func (s *StreamAllocator) OnMaxPublishedLayerChanged(downTrack *DownTrack) { s.maybePostEventAllocateTrack(downTrack) } // called when subscription settings changes (muting/unmuting of track) -func (s *StreamAllocator) onSubscriptionChanged(downTrack *DownTrack) { +func (s *StreamAllocator) OnSubscriptionChanged(downTrack *DownTrack) { s.maybePostEventAllocateTrack(downTrack) } // called when subscribed layers changes (limiting max layers) -func (s *StreamAllocator) onSubscribedLayersChanged(downTrack *DownTrack, layers VideoLayers) { +func (s *StreamAllocator) OnSubscribedLayersChanged(downTrack *DownTrack, layers VideoLayers) { shouldPost := false s.videoTracksMu.Lock() if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil { @@ -418,13 +413,21 @@ func (s *StreamAllocator) onSubscribedLayersChanged(downTrack *DownTrack, layers } } +// called when forwarder finds a target layer +func (s *StreamAllocator) OnTargetLayerReached(downTrack *DownTrack) { + s.postEvent(Event{ + Signal: streamAllocatorSignalTargetLayerFound, + TrackID: livekit.TrackID(downTrack.ID()), + }) +} + // called when a video DownTrack sends a packet -func (s *StreamAllocator) onPacketSent(downTrack *DownTrack, size int) { - s.prober.PacketSent(size) +func (s *StreamAllocator) OnPacketsSent(downTrack *DownTrack, size int) { + s.prober.PacketsSent(size) } // called when prober wants to send packet(s) -func (s *StreamAllocator) onSendProbe(bytesToSend int) { +func (s *StreamAllocator) OnSendProbe(bytesToSend int) { s.postEvent(Event{ Signal: streamAllocatorSignalSendProbe, Data: bytesToSend, @@ -432,19 +435,23 @@ func (s *StreamAllocator) onSendProbe(bytesToSend int) { } // called when prober wants to send packet(s) -func (s *StreamAllocator) onProbeClusterDone(info ProbeClusterInfo) { +func (s *StreamAllocator) OnProbeClusterDone(info ProbeClusterInfo) { s.postEvent(Event{ Signal: streamAllocatorSignalProbeClusterDone, Data: info, }) } -// called when forwarder finds a target layer -func (s *StreamAllocator) onTargetLayerFound(downTrack *DownTrack) { - s.postEvent(Event{ - Signal: streamAllocatorSignalTargetLayerFound, - TrackID: livekit.TrackID(downTrack.ID()), - }) +// called when prober active state changes +func (s *StreamAllocator) OnActiveChanged(isActive bool) { + for _, t := range s.getTracks() { + if isActive { + // LK-TODO: this can be changed to adapt to probe rate + t.DownTrack().SetStreamAllocatorReportInterval(50 * time.Millisecond) + } else { + t.DownTrack().ClearStreamAllocatorReportInterval() + } + } } func (s *StreamAllocator) maybePostEventAllocateTrack(downTrack *DownTrack) { @@ -484,6 +491,8 @@ func (s *StreamAllocator) processEvents() { for event := range s.eventCh { s.handleEvent(&event) } + + s.stopProbe() } func (s *StreamAllocator) ping() {