mirror of
https://github.com/livekit/livekit.git
synced 2026-04-28 12:45:43 +00:00
stream allocator <-> down track misc changes/clean up (#1512)
This commit is contained in:
+142
-124
@@ -123,6 +123,33 @@ func (d DownTrackState) String() string {
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
type DownTrackStreamAllocatorListener interface {
|
||||
// RTCP received
|
||||
OnREMB(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate)
|
||||
OnTransportCCFeedback(dt *DownTrack, cc *rtcp.TransportLayerCC)
|
||||
|
||||
// video layer availability changed
|
||||
OnAvailableLayersChanged(dt *DownTrack)
|
||||
|
||||
// video layer bitrate availability changed
|
||||
OnBitrateAvailabilityChanged(dt *DownTrack)
|
||||
|
||||
// max published video layer changed
|
||||
OnMaxPublishedLayerChanged(dt *DownTrack)
|
||||
|
||||
// subscription changed - mute/unmute
|
||||
OnSubscriptionChanged(dt *DownTrack)
|
||||
|
||||
// subscribed max video layer changed
|
||||
OnSubscribedLayersChanged(dt *DownTrack, layers VideoLayers)
|
||||
|
||||
// target video layer reaached
|
||||
OnTargetLayerReached(dt *DownTrack)
|
||||
|
||||
// packet(s) sent
|
||||
OnPacketsSent(dt *DownTrack, size int)
|
||||
}
|
||||
|
||||
type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport)
|
||||
|
||||
// DownTrack implements TrackLocal, is the track used to write packets
|
||||
@@ -134,11 +161,9 @@ type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport)
|
||||
// - closed
|
||||
// once closed, a DownTrack cannot be re-used.
|
||||
type DownTrack struct {
|
||||
bindLock sync.Mutex
|
||||
logger logger.Logger
|
||||
id livekit.TrackID
|
||||
subscriberID livekit.ParticipantID
|
||||
bound atomic.Bool
|
||||
kind webrtc.RTPCodecType
|
||||
mime string
|
||||
ssrc uint32
|
||||
@@ -150,22 +175,27 @@ type DownTrack struct {
|
||||
|
||||
forwarder *Forwarder
|
||||
|
||||
upstreamCodecs []webrtc.RTPCodecParameters
|
||||
codec webrtc.RTPCodecCapability
|
||||
rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter
|
||||
absSendTimeID int
|
||||
dependencyDescriptorID int
|
||||
receiver TrackReceiver
|
||||
transceiver *webrtc.RTPTransceiver
|
||||
writeStream webrtc.TrackLocalWriter
|
||||
rtcpReader *buffer.RTCPReader
|
||||
onCloseHandler func(willBeResumed bool)
|
||||
onBinding func()
|
||||
receiverReportListeners []ReceiverReportListener
|
||||
upstreamCodecs []webrtc.RTPCodecParameters
|
||||
codec webrtc.RTPCodecCapability
|
||||
rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter
|
||||
absSendTimeID int
|
||||
dependencyDescriptorID int
|
||||
receiver TrackReceiver
|
||||
transceiver *webrtc.RTPTransceiver
|
||||
writeStream webrtc.TrackLocalWriter
|
||||
rtcpReader *buffer.RTCPReader
|
||||
onCloseHandler func(willBeResumed bool)
|
||||
onBinding func()
|
||||
|
||||
listenerLock sync.RWMutex
|
||||
isClosed atomic.Bool
|
||||
connected atomic.Bool
|
||||
bindAndConnectedOnce atomic.Bool
|
||||
receiverReportListeners []ReceiverReportListener
|
||||
|
||||
bindLock sync.Mutex
|
||||
bound atomic.Bool
|
||||
|
||||
isClosed atomic.Bool
|
||||
connected atomic.Bool
|
||||
bindAndConnectedOnce atomic.Bool
|
||||
|
||||
rtpStats *buffer.RTPStats
|
||||
|
||||
@@ -187,33 +217,10 @@ type DownTrack struct {
|
||||
|
||||
activePaddingOnMuteUpTrack atomic.Bool
|
||||
|
||||
// RTCP callbacks
|
||||
onREMB func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate)
|
||||
onTransportCCFeedback func(dt *DownTrack, cc *rtcp.TransportLayerCC)
|
||||
|
||||
// simulcast layer availability change callback
|
||||
onAvailableLayersChanged func(dt *DownTrack)
|
||||
|
||||
// layer bitrate availability change callback
|
||||
onBitrateAvailabilityChanged func(dt *DownTrack)
|
||||
|
||||
// max published layer change callback
|
||||
onMaxPublishedLayerChanged func(dt *DownTrack)
|
||||
|
||||
// subscription change callback
|
||||
onSubscriptionChanged func(dt *DownTrack)
|
||||
|
||||
// max layer change callback
|
||||
onSubscribedLayersChanged func(dt *DownTrack, layers VideoLayers)
|
||||
|
||||
// target layer found callback
|
||||
onTargetLayerFound func(dt *DownTrack)
|
||||
|
||||
// packet sent callback
|
||||
onPacketSent []func(dt *DownTrack, size int)
|
||||
|
||||
// padding packet sent callback
|
||||
onPaddingSent []func(dt *DownTrack, size int)
|
||||
streamAllocatorLock sync.RWMutex
|
||||
streamAllocatorListener DownTrackStreamAllocatorListener
|
||||
streamAllocatorReportGeneration int
|
||||
streamAllocatorBytesCounter atomic.Uint32
|
||||
|
||||
// update stats
|
||||
onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat)
|
||||
@@ -258,8 +265,8 @@ func NewDownTrack(
|
||||
}
|
||||
d.forwarder = NewForwarder(d.kind, d.logger, d.receiver.GetReferenceLayerRTPTimestamp)
|
||||
d.forwarder.OnParkedLayersExpired(func() {
|
||||
if d.onSubscriptionChanged != nil {
|
||||
d.onSubscriptionChanged(d)
|
||||
if sal := d.getStreamAllocatorListener(); sal != nil {
|
||||
sal.OnSubscriptionChanged(d)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -362,6 +369,68 @@ func (d *DownTrack) TrackInfoAvailable() {
|
||||
d.connectionStats.Start(ti, time.Now())
|
||||
}
|
||||
|
||||
func (d *DownTrack) SetStreamAllocatorListener(listener DownTrackStreamAllocatorListener) {
|
||||
d.streamAllocatorLock.Lock()
|
||||
d.streamAllocatorListener = listener
|
||||
d.streamAllocatorLock.Unlock()
|
||||
|
||||
// kick of a gratuitous allocation
|
||||
if listener != nil {
|
||||
listener.OnSubscriptionChanged(d)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DownTrack) getStreamAllocatorListener() DownTrackStreamAllocatorListener {
|
||||
d.streamAllocatorLock.RLock()
|
||||
defer d.streamAllocatorLock.RUnlock()
|
||||
|
||||
return d.streamAllocatorListener
|
||||
}
|
||||
|
||||
func (d *DownTrack) SetStreamAllocatorReportInterval(interval time.Duration) {
|
||||
d.ClearStreamAllocatorReportInterval()
|
||||
|
||||
if interval == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
d.streamAllocatorLock.Lock()
|
||||
d.streamAllocatorBytesCounter.Store(0)
|
||||
|
||||
d.streamAllocatorReportGeneration++
|
||||
gen := d.streamAllocatorReportGeneration
|
||||
d.streamAllocatorLock.Unlock()
|
||||
|
||||
go func(generation int) {
|
||||
timer := time.NewTimer(interval)
|
||||
for {
|
||||
<-timer.C
|
||||
|
||||
d.streamAllocatorLock.Lock()
|
||||
if generation != d.streamAllocatorReportGeneration {
|
||||
d.streamAllocatorLock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
sal := d.streamAllocatorListener
|
||||
bytes := d.streamAllocatorBytesCounter.Swap(0)
|
||||
d.streamAllocatorLock.Unlock()
|
||||
|
||||
if sal != nil {
|
||||
sal.OnPacketsSent(d, int(bytes))
|
||||
}
|
||||
|
||||
timer.Reset(interval)
|
||||
}
|
||||
}(gen)
|
||||
}
|
||||
|
||||
func (d *DownTrack) ClearStreamAllocatorReportInterval() {
|
||||
d.streamAllocatorLock.Lock()
|
||||
d.streamAllocatorReportGeneration++
|
||||
d.streamAllocatorLock.Unlock()
|
||||
}
|
||||
|
||||
// ID is the unique identifier for this Track. This should be unique for the
|
||||
// stream, but doesn't have to globally unique. A common example would be 'audio' or 'video'
|
||||
// and StreamID would be 'desktop' or 'webcam'
|
||||
@@ -535,10 +604,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
|
||||
return err
|
||||
}
|
||||
|
||||
pktSize := hdr.MarshalSize() + len(payload)
|
||||
for _, f := range d.onPacketSent {
|
||||
f(d, pktSize)
|
||||
}
|
||||
d.streamAllocatorBytesCounter.Add(uint32(hdr.MarshalSize() + len(payload)))
|
||||
|
||||
if tp.isSwitchingToMaxLayer && d.onMaxSubscribedLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo {
|
||||
d.onMaxSubscribedLayerChanged(d, layer)
|
||||
@@ -556,8 +622,10 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
|
||||
d.stopKeyFrameRequester()
|
||||
}
|
||||
|
||||
if tp.isSwitchingToTargetLayer && d.onTargetLayerFound != nil {
|
||||
d.onTargetLayerFound(d)
|
||||
if tp.isSwitchingToTargetLayer {
|
||||
if sal := d.getStreamAllocatorListener(); sal != nil {
|
||||
sal.OnTargetLayerReached(d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -639,11 +707,6 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool) int {
|
||||
return bytesSent
|
||||
}
|
||||
|
||||
size := hdr.MarshalSize() + len(payload)
|
||||
for _, f := range d.onPaddingSent {
|
||||
f(d, size)
|
||||
}
|
||||
|
||||
if !paddingOnMute {
|
||||
d.rtpStats.Update(&hdr, 0, len(payload), time.Now().UnixNano())
|
||||
}
|
||||
@@ -657,7 +720,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool) int {
|
||||
d.sequencer.pushPadding(hdr.SequenceNumber)
|
||||
}
|
||||
|
||||
bytesSent += size
|
||||
bytesSent += hdr.MarshalSize() + len(payload)
|
||||
}
|
||||
|
||||
return bytesSent
|
||||
@@ -715,8 +778,8 @@ func (d *DownTrack) handleMute(muted bool, isPub bool, changed bool, maxLayers V
|
||||
d.onMaxSubscribedLayerChanged(d, notifyLayer)
|
||||
}
|
||||
|
||||
if d.onSubscriptionChanged != nil {
|
||||
d.onSubscriptionChanged(d)
|
||||
if sal := d.getStreamAllocatorListener(); sal != nil {
|
||||
sal.OnSubscriptionChanged(d)
|
||||
}
|
||||
|
||||
// when muting, send a few silence frames to ensure residual noise does not
|
||||
@@ -805,6 +868,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
|
||||
}
|
||||
|
||||
d.stopKeyFrameRequester()
|
||||
d.ClearStreamAllocatorReportInterval()
|
||||
}
|
||||
|
||||
func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) {
|
||||
@@ -824,8 +888,8 @@ func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) {
|
||||
d.onMaxSubscribedLayerChanged(d, maxLayers.Spatial)
|
||||
}
|
||||
|
||||
if d.onSubscribedLayersChanged != nil {
|
||||
d.onSubscribedLayersChanged(d, maxLayers)
|
||||
if sal := d.getStreamAllocatorListener(); sal != nil {
|
||||
sal.OnSubscribedLayersChanged(d, maxLayers)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -835,8 +899,8 @@ func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32) {
|
||||
return
|
||||
}
|
||||
|
||||
if d.onSubscribedLayersChanged != nil {
|
||||
d.onSubscribedLayersChanged(d, maxLayers)
|
||||
if sal := d.getStreamAllocatorListener(); sal != nil {
|
||||
sal.OnSubscribedLayersChanged(d, maxLayers)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -859,22 +923,22 @@ func (d *DownTrack) SeedState(state DownTrackState) {
|
||||
}
|
||||
|
||||
func (d *DownTrack) UpTrackLayersChange() {
|
||||
if d.onAvailableLayersChanged != nil {
|
||||
d.onAvailableLayersChanged(d)
|
||||
if sal := d.getStreamAllocatorListener(); sal != nil {
|
||||
sal.OnAvailableLayersChanged(d)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DownTrack) UpTrackBitrateAvailabilityChange() {
|
||||
if d.onBitrateAvailabilityChanged != nil {
|
||||
d.onBitrateAvailabilityChanged(d)
|
||||
if sal := d.getStreamAllocatorListener(); sal != nil {
|
||||
sal.OnBitrateAvailabilityChanged(d)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DownTrack) UpTrackMaxPublishedLayerChange(maxPublishedLayer int32) {
|
||||
d.forwarder.SetMaxPublishedLayer(maxPublishedLayer)
|
||||
|
||||
if d.onMaxPublishedLayerChanged != nil {
|
||||
d.onMaxPublishedLayerChanged(d)
|
||||
if sal := d.getStreamAllocatorListener(); sal != nil {
|
||||
sal.OnMaxPublishedLayerChanged(d)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -912,14 +976,6 @@ func (d *DownTrack) OnBinding(fn func()) {
|
||||
d.onBinding = fn
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnREMB(fn func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate)) {
|
||||
d.onREMB = fn
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnTransportCCFeedback(fn func(dt *DownTrack, cc *rtcp.TransportLayerCC)) {
|
||||
d.onTransportCCFeedback = fn
|
||||
}
|
||||
|
||||
func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener) {
|
||||
d.listenerLock.Lock()
|
||||
defer d.listenerLock.Unlock()
|
||||
@@ -927,41 +983,6 @@ func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener) {
|
||||
d.receiverReportListeners = append(d.receiverReportListeners, listener)
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnAvailableLayersChanged(fn func(dt *DownTrack)) {
|
||||
d.onAvailableLayersChanged = fn
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnBitrateAvailabilityChanged(fn func(dt *DownTrack)) {
|
||||
d.onBitrateAvailabilityChanged = fn
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnMaxPublishedLayerChanged(fn func(dt *DownTrack)) {
|
||||
d.onMaxPublishedLayerChanged = fn
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnSubscriptionChanged(fn func(dt *DownTrack)) {
|
||||
d.onSubscriptionChanged = fn
|
||||
|
||||
// kick off an allocation just in case other events happened before callbacks were set up
|
||||
go fn(d)
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnSubscribedLayersChanged(fn func(dt *DownTrack, layers VideoLayers)) {
|
||||
d.onSubscribedLayersChanged = fn
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnTargetLayerFound(fn func(dt *DownTrack)) {
|
||||
d.onTargetLayerFound = fn
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnPacketSent(fn func(dt *DownTrack, size int)) {
|
||||
d.onPacketSent = append(d.onPacketSent, fn)
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnPaddingSent(fn func(dt *DownTrack, size int)) {
|
||||
d.onPaddingSent = append(d.onPaddingSent, fn)
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnStatsUpdate(fn func(dt *DownTrack, stat *livekit.AnalyticsStat)) {
|
||||
d.onStatsUpdate = fn
|
||||
}
|
||||
@@ -1157,9 +1178,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
|
||||
return
|
||||
}
|
||||
|
||||
for _, f := range d.onPacketSent {
|
||||
f(d, pktSize)
|
||||
}
|
||||
d.streamAllocatorBytesCounter.Add(uint32(pktSize))
|
||||
|
||||
// only the first frame will need frameEndNeeded to close out the
|
||||
// previous picture, rest are small key frames (for the video case)
|
||||
@@ -1290,8 +1309,8 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
|
||||
sendPliOnce()
|
||||
|
||||
case *rtcp.ReceiverEstimatedMaximumBitrate:
|
||||
if d.onREMB != nil {
|
||||
d.onREMB(d, p)
|
||||
if sal := d.getStreamAllocatorListener(); sal != nil {
|
||||
sal.OnREMB(d, p)
|
||||
}
|
||||
|
||||
case *rtcp.ReceiverReport:
|
||||
@@ -1331,8 +1350,10 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
|
||||
go d.retransmitPackets(nacks)
|
||||
|
||||
case *rtcp.TransportLayerCC:
|
||||
if p.MediaSSRC == d.ssrc && d.onTransportCCFeedback != nil {
|
||||
d.onTransportCCFeedback(d, p)
|
||||
if p.MediaSSRC == d.ssrc {
|
||||
if sal := d.getStreamAllocatorListener(); sal != nil {
|
||||
sal.OnTransportCCFeedback(d, p)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1461,10 +1482,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
|
||||
if _, err = d.writeStream.WriteRTP(&pkt.Header, payload); err != nil {
|
||||
d.logger.Errorw("writing rtx packet err", err)
|
||||
} else {
|
||||
pktSize := pkt.Header.MarshalSize() + len(payload)
|
||||
for _, f := range d.onPacketSent {
|
||||
f(d, pktSize)
|
||||
}
|
||||
d.streamAllocatorBytesCounter.Add(uint32(pkt.Header.MarshalSize() + len(payload)))
|
||||
|
||||
d.rtpStats.Update(&pkt.Header, len(payload), 0, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
+96
-40
@@ -116,6 +116,12 @@ import (
|
||||
"github.com/livekit/protocol/logger"
|
||||
)
|
||||
|
||||
type ProberListener interface {
|
||||
OnSendProbe(bytesToSend int)
|
||||
OnProbeClusterDone(info ProbeClusterInfo)
|
||||
OnActiveChanged(isActive bool)
|
||||
}
|
||||
|
||||
type ProberParams struct {
|
||||
Logger logger.Logger
|
||||
}
|
||||
@@ -125,12 +131,14 @@ type Prober struct {
|
||||
|
||||
clusterId atomic.Uint32
|
||||
|
||||
clustersMu sync.RWMutex
|
||||
clusters deque.Deque
|
||||
activeCluster *Cluster
|
||||
clustersMu sync.RWMutex
|
||||
clusters deque.Deque
|
||||
activeCluster *Cluster
|
||||
activeStateQueue []bool
|
||||
activeStateQueueInProcess atomic.Bool
|
||||
|
||||
onSendProbe func(bytesToSend int)
|
||||
onProbeClusterDone func(info ProbeClusterInfo)
|
||||
listenerMu sync.RWMutex
|
||||
listener ProberListener
|
||||
}
|
||||
|
||||
func NewProber(params ProberParams) *Prober {
|
||||
@@ -141,6 +149,19 @@ func NewProber(params ProberParams) *Prober {
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *Prober) SetProberListener(listener ProberListener) {
|
||||
p.listenerMu.Lock()
|
||||
p.listener = listener
|
||||
p.listenerMu.Unlock()
|
||||
}
|
||||
|
||||
func (p *Prober) getProberListener() ProberListener {
|
||||
p.listenerMu.RLock()
|
||||
defer p.listenerMu.RUnlock()
|
||||
|
||||
return p.listener
|
||||
}
|
||||
|
||||
func (p *Prober) IsRunning() bool {
|
||||
p.clustersMu.RLock()
|
||||
defer p.clustersMu.RUnlock()
|
||||
@@ -161,19 +182,17 @@ func (p *Prober) Reset() {
|
||||
|
||||
p.clusters.Clear()
|
||||
p.activeCluster = nil
|
||||
|
||||
p.activeStateQueue = append(p.activeStateQueue, false)
|
||||
p.clustersMu.Unlock()
|
||||
|
||||
if p.onProbeClusterDone != nil && reset {
|
||||
p.onProbeClusterDone(info)
|
||||
if reset {
|
||||
if pl := p.getProberListener(); pl != nil {
|
||||
pl.OnProbeClusterDone(info)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Prober) OnSendProbe(f func(bytesToSend int)) {
|
||||
p.onSendProbe = f
|
||||
}
|
||||
|
||||
func (p *Prober) OnProbeClusterDone(f func(info ProbeClusterInfo)) {
|
||||
p.onProbeClusterDone = f
|
||||
p.processActiveStateQueue()
|
||||
}
|
||||
|
||||
func (p *Prober) AddCluster(desiredRateBps int, expectedRateBps int, minDuration time.Duration, maxDuration time.Duration) ProbeClusterId {
|
||||
@@ -190,13 +209,13 @@ func (p *Prober) AddCluster(desiredRateBps int, expectedRateBps int, minDuration
|
||||
return clusterId
|
||||
}
|
||||
|
||||
func (p *Prober) PacketSent(size int) {
|
||||
func (p *Prober) PacketsSent(size int) {
|
||||
cluster := p.getFrontCluster()
|
||||
if cluster == nil {
|
||||
return
|
||||
}
|
||||
|
||||
cluster.PacketSent(size)
|
||||
cluster.PacketsSent(size)
|
||||
}
|
||||
|
||||
func (p *Prober) ProbeSent(size int) {
|
||||
@@ -227,10 +246,10 @@ func (p *Prober) getFrontCluster() *Cluster {
|
||||
|
||||
func (p *Prober) popFrontCluster(cluster *Cluster) {
|
||||
p.clustersMu.Lock()
|
||||
defer p.clustersMu.Unlock()
|
||||
|
||||
if p.clusters.Len() == 0 {
|
||||
p.activeCluster = nil
|
||||
p.clustersMu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -241,28 +260,64 @@ func (p *Prober) popFrontCluster(cluster *Cluster) {
|
||||
if cluster == p.activeCluster {
|
||||
p.activeCluster = nil
|
||||
}
|
||||
|
||||
if p.clusters.Len() == 0 {
|
||||
p.activeStateQueue = append(p.activeStateQueue, false)
|
||||
}
|
||||
p.clustersMu.Unlock()
|
||||
|
||||
p.processActiveStateQueue()
|
||||
}
|
||||
|
||||
func (p *Prober) pushBackClusterAndMaybeStart(cluster *Cluster) {
|
||||
p.clustersMu.Lock()
|
||||
defer p.clustersMu.Unlock()
|
||||
|
||||
p.clusters.PushBack(cluster)
|
||||
|
||||
if p.clusters.Len() == 1 {
|
||||
p.activeStateQueue = append(p.activeStateQueue, true)
|
||||
|
||||
go p.run()
|
||||
}
|
||||
p.clustersMu.Unlock()
|
||||
|
||||
p.processActiveStateQueue()
|
||||
}
|
||||
|
||||
func (p *Prober) processActiveStateQueue() {
|
||||
if p.activeStateQueueInProcess.Swap(true) {
|
||||
// processing queue
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
p.clustersMu.Lock()
|
||||
if len(p.activeStateQueue) == 0 {
|
||||
p.clustersMu.Unlock()
|
||||
break
|
||||
}
|
||||
|
||||
isActive := p.activeStateQueue[0]
|
||||
p.activeStateQueue = p.activeStateQueue[1:]
|
||||
p.clustersMu.Unlock()
|
||||
|
||||
if pl := p.getProberListener(); pl != nil {
|
||||
pl.OnActiveChanged(isActive)
|
||||
}
|
||||
}
|
||||
|
||||
p.activeStateQueueInProcess.Store(false)
|
||||
}
|
||||
|
||||
func (p *Prober) run() {
|
||||
for {
|
||||
// determine how long to sleep
|
||||
cluster := p.getFrontCluster()
|
||||
if cluster == nil {
|
||||
return
|
||||
}
|
||||
// determine how long to sleep
|
||||
cluster := p.getFrontCluster()
|
||||
if cluster == nil {
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(cluster.GetSleepDuration())
|
||||
timer := time.NewTimer(cluster.GetSleepDuration())
|
||||
for {
|
||||
<-timer.C
|
||||
|
||||
// wake up and check for probes to send
|
||||
cluster = p.getFrontCluster()
|
||||
@@ -270,18 +325,25 @@ func (p *Prober) run() {
|
||||
return
|
||||
}
|
||||
|
||||
cluster.Process(p.onSendProbe)
|
||||
cluster.Process(p.getProberListener())
|
||||
|
||||
if cluster.IsFinished() {
|
||||
p.logger.Debugw("cluster finished", "cluster", cluster.String())
|
||||
|
||||
if p.onProbeClusterDone != nil {
|
||||
p.onProbeClusterDone(cluster.GetInfo())
|
||||
if pl := p.getProberListener(); pl != nil {
|
||||
pl.OnProbeClusterDone(cluster.GetInfo())
|
||||
}
|
||||
|
||||
p.popFrontCluster(cluster)
|
||||
continue
|
||||
}
|
||||
|
||||
// determine how long to sleep
|
||||
cluster := p.getFrontCluster()
|
||||
if cluster == nil {
|
||||
return
|
||||
}
|
||||
|
||||
timer.Reset(cluster.GetSleepDuration())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -300,12 +362,6 @@ type ProbeClusterInfo struct {
|
||||
}
|
||||
|
||||
type Cluster struct {
|
||||
// LK-TODO-START
|
||||
// Check if we can operate at cluster level without a lock.
|
||||
// The quantities that are updated in a different thread are
|
||||
// bytesSentNonProbe - maybe make this an atomic value
|
||||
// Lock contention time should be very minimal though.
|
||||
// LK-TODO-END
|
||||
lock sync.RWMutex
|
||||
|
||||
id ProbeClusterId
|
||||
@@ -354,7 +410,7 @@ func (c *Cluster) GetSleepDuration() time.Duration {
|
||||
return c.sleepDuration
|
||||
}
|
||||
|
||||
func (c *Cluster) PacketSent(size int) {
|
||||
func (c *Cluster) PacketsSent(size int) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
@@ -398,7 +454,7 @@ func (c *Cluster) GetInfo() ProbeClusterInfo {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cluster) Process(onSendProbe func(bytesToSend int)) {
|
||||
func (c *Cluster) Process(pl ProberListener) {
|
||||
c.lock.RLock()
|
||||
|
||||
timeElapsed := time.Since(c.startTime)
|
||||
@@ -428,8 +484,8 @@ func (c *Cluster) Process(onSendProbe func(bytesToSend int)) {
|
||||
bytesShortFall = ((bytesShortFall + 274) / 275) * 275
|
||||
c.lock.RUnlock()
|
||||
|
||||
if bytesShortFall > 0 && onSendProbe != nil {
|
||||
onSendProbe(bytesShortFall)
|
||||
if bytesShortFall > 0 && pl != nil {
|
||||
pl.OnSendProbe(bytesShortFall)
|
||||
}
|
||||
|
||||
// LK-TODO look at adapting sleep time based on how many bytes and how much time is left
|
||||
|
||||
+37
-28
@@ -184,8 +184,7 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
|
||||
|
||||
s.resetState()
|
||||
|
||||
s.prober.OnSendProbe(s.onSendProbe)
|
||||
s.prober.OnProbeClusterDone(s.onProbeClusterDone)
|
||||
s.prober.SetProberListener(s)
|
||||
|
||||
return s
|
||||
}
|
||||
@@ -236,15 +235,11 @@ func (s *StreamAllocator) AddTrack(downTrack *DownTrack, params AddTrackParams)
|
||||
s.videoTracks[livekit.TrackID(downTrack.ID())] = track
|
||||
s.videoTracksMu.Unlock()
|
||||
|
||||
downTrack.OnREMB(s.onREMB)
|
||||
downTrack.OnTransportCCFeedback(s.onTransportCCFeedback)
|
||||
downTrack.OnAvailableLayersChanged(s.onAvailableLayersChanged)
|
||||
downTrack.OnBitrateAvailabilityChanged(s.onBitrateAvailabilityChanged)
|
||||
downTrack.OnMaxPublishedLayerChanged(s.onMaxPublishedLayerChanged)
|
||||
downTrack.OnSubscriptionChanged(s.onSubscriptionChanged)
|
||||
downTrack.OnSubscribedLayersChanged(s.onSubscribedLayersChanged)
|
||||
downTrack.OnPacketSent(s.onPacketSent)
|
||||
downTrack.OnTargetLayerFound(s.onTargetLayerFound)
|
||||
downTrack.SetStreamAllocatorListener(s)
|
||||
if s.prober.IsRunning() {
|
||||
// LK-TODO: this can be changed to adapt to probe rate
|
||||
downTrack.SetStreamAllocatorReportInterval(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
s.maybePostEventAllocateTrack(downTrack)
|
||||
}
|
||||
@@ -285,7 +280,7 @@ func (s *StreamAllocator) resetState() {
|
||||
}
|
||||
|
||||
// called when a new REMB is received (receive side bandwidth estimation)
|
||||
func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) {
|
||||
func (s *StreamAllocator) OnREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) {
|
||||
//
|
||||
// Channel capacity is estimated at a peer connection level. All down tracks
|
||||
// in the peer connection will end up calling this for a REMB report with
|
||||
@@ -365,7 +360,7 @@ func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstima
|
||||
}
|
||||
|
||||
// called when a new transport-cc feedback is received
|
||||
func (s *StreamAllocator) onTransportCCFeedback(downTrack *DownTrack, fb *rtcp.TransportLayerCC) {
|
||||
func (s *StreamAllocator) OnTransportCCFeedback(downTrack *DownTrack, fb *rtcp.TransportLayerCC) {
|
||||
if s.bwe != nil {
|
||||
s.bwe.WriteRTCP([]rtcp.Packet{fb}, nil)
|
||||
}
|
||||
@@ -380,27 +375,27 @@ func (s *StreamAllocator) onTargetBitrateChange(bitrate int) {
|
||||
}
|
||||
|
||||
// called when feeding track's layer availability changes
|
||||
func (s *StreamAllocator) onAvailableLayersChanged(downTrack *DownTrack) {
|
||||
func (s *StreamAllocator) OnAvailableLayersChanged(downTrack *DownTrack) {
|
||||
s.maybePostEventAllocateTrack(downTrack)
|
||||
}
|
||||
|
||||
// called when feeding track's bitrate measurement of any layer is available
|
||||
func (s *StreamAllocator) onBitrateAvailabilityChanged(downTrack *DownTrack) {
|
||||
func (s *StreamAllocator) OnBitrateAvailabilityChanged(downTrack *DownTrack) {
|
||||
s.maybePostEventAllocateTrack(downTrack)
|
||||
}
|
||||
|
||||
// called when feeding track's max publisher layer changes
|
||||
func (s *StreamAllocator) onMaxPublishedLayerChanged(downTrack *DownTrack) {
|
||||
func (s *StreamAllocator) OnMaxPublishedLayerChanged(downTrack *DownTrack) {
|
||||
s.maybePostEventAllocateTrack(downTrack)
|
||||
}
|
||||
|
||||
// called when subscription settings changes (muting/unmuting of track)
|
||||
func (s *StreamAllocator) onSubscriptionChanged(downTrack *DownTrack) {
|
||||
func (s *StreamAllocator) OnSubscriptionChanged(downTrack *DownTrack) {
|
||||
s.maybePostEventAllocateTrack(downTrack)
|
||||
}
|
||||
|
||||
// called when subscribed layers changes (limiting max layers)
|
||||
func (s *StreamAllocator) onSubscribedLayersChanged(downTrack *DownTrack, layers VideoLayers) {
|
||||
func (s *StreamAllocator) OnSubscribedLayersChanged(downTrack *DownTrack, layers VideoLayers) {
|
||||
shouldPost := false
|
||||
s.videoTracksMu.Lock()
|
||||
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
|
||||
@@ -418,13 +413,21 @@ func (s *StreamAllocator) onSubscribedLayersChanged(downTrack *DownTrack, layers
|
||||
}
|
||||
}
|
||||
|
||||
// called when forwarder finds a target layer
|
||||
func (s *StreamAllocator) OnTargetLayerReached(downTrack *DownTrack) {
|
||||
s.postEvent(Event{
|
||||
Signal: streamAllocatorSignalTargetLayerFound,
|
||||
TrackID: livekit.TrackID(downTrack.ID()),
|
||||
})
|
||||
}
|
||||
|
||||
// called when a video DownTrack sends a packet
|
||||
func (s *StreamAllocator) onPacketSent(downTrack *DownTrack, size int) {
|
||||
s.prober.PacketSent(size)
|
||||
func (s *StreamAllocator) OnPacketsSent(downTrack *DownTrack, size int) {
|
||||
s.prober.PacketsSent(size)
|
||||
}
|
||||
|
||||
// called when prober wants to send packet(s)
|
||||
func (s *StreamAllocator) onSendProbe(bytesToSend int) {
|
||||
func (s *StreamAllocator) OnSendProbe(bytesToSend int) {
|
||||
s.postEvent(Event{
|
||||
Signal: streamAllocatorSignalSendProbe,
|
||||
Data: bytesToSend,
|
||||
@@ -432,19 +435,23 @@ func (s *StreamAllocator) onSendProbe(bytesToSend int) {
|
||||
}
|
||||
|
||||
// called when prober wants to send packet(s)
|
||||
func (s *StreamAllocator) onProbeClusterDone(info ProbeClusterInfo) {
|
||||
func (s *StreamAllocator) OnProbeClusterDone(info ProbeClusterInfo) {
|
||||
s.postEvent(Event{
|
||||
Signal: streamAllocatorSignalProbeClusterDone,
|
||||
Data: info,
|
||||
})
|
||||
}
|
||||
|
||||
// called when forwarder finds a target layer
|
||||
func (s *StreamAllocator) onTargetLayerFound(downTrack *DownTrack) {
|
||||
s.postEvent(Event{
|
||||
Signal: streamAllocatorSignalTargetLayerFound,
|
||||
TrackID: livekit.TrackID(downTrack.ID()),
|
||||
})
|
||||
// called when prober active state changes
|
||||
func (s *StreamAllocator) OnActiveChanged(isActive bool) {
|
||||
for _, t := range s.getTracks() {
|
||||
if isActive {
|
||||
// LK-TODO: this can be changed to adapt to probe rate
|
||||
t.DownTrack().SetStreamAllocatorReportInterval(50 * time.Millisecond)
|
||||
} else {
|
||||
t.DownTrack().ClearStreamAllocatorReportInterval()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) maybePostEventAllocateTrack(downTrack *DownTrack) {
|
||||
@@ -484,6 +491,8 @@ func (s *StreamAllocator) processEvents() {
|
||||
for event := range s.eventCh {
|
||||
s.handleEvent(&event)
|
||||
}
|
||||
|
||||
s.stopProbe()
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) ping() {
|
||||
|
||||
Reference in New Issue
Block a user