diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index a6498dcd4..07d7f45fa 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -246,8 +246,6 @@ type DownTrack struct { totalRepeatedNACKs atomic.Uint32 - keyFrameRequestGeneration atomic.Uint32 - blankFramesGeneration atomic.Uint32 connectionStats *connectionquality.ConnectionStats @@ -273,6 +271,10 @@ type DownTrack struct { maxLayerNotifierCh chan struct{} maxLayerNotifierChClosed bool + keyFrameRequesterChMu sync.RWMutex + keyFrameRequesterCh chan struct{} + keyFrameRequesterChClosed bool + cbMu sync.RWMutex onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat) onMaxSubscribedLayerChanged func(dt *DownTrack, layer int32) @@ -294,13 +296,14 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { } d := &DownTrack{ - params: params, - id: params.Receiver.TrackID(), - upstreamCodecs: codecs, - kind: kind, - codec: codecs[0].RTPCodecCapability, - pacer: params.Pacer, - maxLayerNotifierCh: make(chan struct{}, 1), + params: params, + id: params.Receiver.TrackID(), + upstreamCodecs: codecs, + kind: kind, + codec: codecs[0].RTPCodecCapability, + pacer: params.Pacer, + maxLayerNotifierCh: make(chan struct{}, 1), + keyFrameRequesterCh: make(chan struct{}, 1), } d.forwarder = NewForwarder( d.kind, @@ -346,6 +349,7 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { } if d.kind == webrtc.RTPCodecTypeVideo { go d.maxLayerNotifierWorker() + go d.keyFrameRequester() } return d, nil @@ -584,57 +588,58 @@ func (d *DownTrack) GetTransceiver() *webrtc.RTPTransceiver { return d.transceiver.Load() } -func (d *DownTrack) maybeStartKeyFrameRequester() { - // - // Always move to next generation to abandon any running key frame requester - // This ensures that it is stopped if forwarding is disabled due to mute - // or paused due to bandwidth constraints. A new key frame requester is - // started if a layer lock is required. - // - d.stopKeyFrameRequester() - - locked, layer := d.forwarder.CheckSync() - if !locked { - go d.keyFrameRequester(d.keyFrameRequestGeneration.Load(), layer) - } -} - -func (d *DownTrack) stopKeyFrameRequester() { - d.keyFrameRequestGeneration.Inc() -} - -func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { - if d.IsClosed() || layer == buffer.InvalidLayerSpatial { +func (d *DownTrack) postKeyFrameRequestEvent() { + if d.kind != webrtc.RTPCodecTypeVideo { return } - interval := 2 * d.rtpStats.GetRtt() - if interval < keyFrameIntervalMin { - interval = keyFrameIntervalMin + d.keyFrameRequesterChMu.RLock() + if !d.keyFrameRequesterChClosed { + select { + case d.keyFrameRequesterCh <- struct{}{}: + default: + } } - if interval > keyFrameIntervalMax { - interval = keyFrameIntervalMax + d.keyFrameRequesterChMu.RUnlock() +} + +func (d *DownTrack) keyFrameRequester() { + getInterval := func() time.Duration { + interval := 2 * d.rtpStats.GetRtt() + if interval < keyFrameIntervalMin { + interval = keyFrameIntervalMin + } + if interval > keyFrameIntervalMax { + interval = keyFrameIntervalMax + } + return time.Duration(interval) * time.Millisecond } - ticker := time.NewTicker(time.Duration(interval) * time.Millisecond) + + interval := getInterval() + ticker := time.NewTicker(interval) defer ticker.Stop() for { - locked, _ := d.forwarder.CheckSync() - if locked { + if d.IsClosed() { return } - if d.writable.Load() { - d.params.Logger.Debugw("sending PLI for layer lock", "generation", generation, "layer", layer) + select { + case _, more := <-d.keyFrameRequesterCh: + if !more { + return + } + case <-ticker.C: + } + + locked, layer := d.forwarder.CheckSync() + if !locked && layer != buffer.InvalidLayerSpatial && d.writable.Load() { + d.params.Logger.Debugw("sending PLI for layer lock", "layer", layer) d.params.Receiver.SendPLI(layer, false) d.rtpStats.UpdateLayerLockPliAndTime(1) } - <-ticker.C - - if generation != d.keyFrameRequestGeneration.Load() || !d.writable.Load() { - return - } + ticker.Reset(getInterval()) } } @@ -986,11 +991,15 @@ func (d *DownTrack) CloseWithFlush(flush bool) { close(d.maxLayerNotifierCh) d.maxLayerNotifierChMu.Unlock() + d.keyFrameRequesterChMu.Lock() + d.keyFrameRequesterChClosed = true + close(d.keyFrameRequesterCh) + d.keyFrameRequesterChMu.Unlock() + if onCloseHandler := d.getOnCloseHandler(); onCloseHandler != nil { onCloseHandler(!flush) } - d.stopKeyFrameRequester() d.ClearStreamAllocatorReportInterval() } @@ -1174,7 +1183,7 @@ func (d *DownTrack) DistanceToDesired() float64 { func (d *DownTrack) AllocateOptimal(allowOvershoot bool) VideoAllocation { al, brs := d.params.Receiver.GetLayeredBitrate() allocation := d.forwarder.AllocateOptimal(al, brs, allowOvershoot) - d.maybeStartKeyFrameRequester() + d.postKeyFrameRequestEvent() d.maybeAddTransition(allocation.BandwidthNeeded, allocation.DistanceToDesired, allocation.PauseReason) return allocation } @@ -1216,7 +1225,7 @@ func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation { allocation := d.forwarder.ProvisionalAllocateCommit() - d.maybeStartKeyFrameRequester() + d.postKeyFrameRequestEvent() d.maybeAddTransition(allocation.BandwidthNeeded, allocation.DistanceToDesired, allocation.PauseReason) return allocation } @@ -1224,7 +1233,7 @@ func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation { func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (VideoAllocation, bool) { al, brs := d.params.Receiver.GetLayeredBitrate() allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, al, brs, allowOvershoot) - d.maybeStartKeyFrameRequester() + d.postKeyFrameRequestEvent() d.maybeAddTransition(allocation.BandwidthNeeded, allocation.DistanceToDesired, allocation.PauseReason) return allocation, available } @@ -1245,7 +1254,6 @@ func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransitio func (d *DownTrack) Pause() VideoAllocation { al, brs := d.params.Receiver.GetLayeredBitrate() allocation := d.forwarder.Pause(al, brs) - d.maybeStartKeyFrameRequester() d.maybeAddTransition(allocation.BandwidthNeeded, allocation.DistanceToDesired, allocation.PauseReason) return allocation } @@ -1464,10 +1472,9 @@ func (d *DownTrack) handleRTCP(bytes []byte) { pliOnce := true sendPliOnce := func() { _, layer := d.forwarder.CheckSync() - isAnyMuted := d.forwarder.IsAnyMuted() - d.params.Logger.Debugw("received PLI/FIR RTCP", "layer", layer, "isAnyMuted", isAnyMuted) + d.params.Logger.Debugw("received PLI/FIR RTCP", "layer", layer) if pliOnce { - if layer != buffer.InvalidLayerSpatial && !isAnyMuted { + if layer != buffer.InvalidLayerSpatial { d.params.Logger.Debugw("sending PLI RTCP", "layer", layer) d.params.Receiver.SendPLI(layer, false) d.isNACKThrottled.Store(true) @@ -1796,10 +1803,6 @@ func (d *DownTrack) GetAndResetBytesSent() (uint32, uint32) { func (d *DownTrack) onBindAndConnectedChange() { d.writable.Store(d.connected.Load() && d.bound.Load()) if d.connected.Load() && d.bound.Load() && !d.bindAndConnectedOnce.Swap(true) { - if d.kind == webrtc.RTPCodecTypeVideo { - d.maybeStartKeyFrameRequester() - } - if d.activePaddingOnMuteUpTrack.Load() { go d.sendPaddingOnMute() } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 49c955291..e0ac55742 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1374,7 +1374,11 @@ func (f *Forwarder) updateAllocation(alloc VideoAllocation, reason string) Video func (f *Forwarder) setTargetLayer(targetLayer buffer.VideoLayer, requestLayerSpatial int32) { f.vls.SetTarget(targetLayer) - f.vls.SetRequestSpatial(requestLayerSpatial) + if targetLayer.IsValid() { + f.vls.SetRequestSpatial(requestLayerSpatial) + } else { + f.vls.SetRequestSpatial(buffer.InvalidLayerSpatial) + } } func (f *Forwarder) Resync() { @@ -1392,7 +1396,7 @@ func (f *Forwarder) resyncLocked() { } } -func (f *Forwarder) CheckSync() (locked bool, layer int32) { +func (f *Forwarder) CheckSync() (bool, int32) { f.lock.RLock() defer f.lock.RUnlock() diff --git a/pkg/sfu/videolayerselector/dependencydescriptor.go b/pkg/sfu/videolayerselector/dependencydescriptor.go index 1f8a9390b..1fc8e5f31 100644 --- a/pkg/sfu/videolayerselector/dependencydescriptor.go +++ b/pkg/sfu/videolayerselector/dependencydescriptor.go @@ -332,6 +332,17 @@ func (d *DependencyDescriptor) CheckSync() (locked bool, layer int32) { return false, layer } + allBroken := true + for _, chain := range d.chains { + if !chain.Broken() { + allBroken = false + break + } + } + if allBroken { + return false, layer + } + d.decodeTargetsLock.RLock() defer d.decodeTargetsLock.RUnlock() for _, dt := range d.decodeTargets {