Do server PLI when sync is required. (#2197)

* Do server PLI when sync is required.

A few changes
- Run key frame requester goroutine always. Runs every 200 ms which is
  not bad.
- Post a key frame request when server knows it needs one, like after an
  allocation. This ensures that the initial request is not delayed.
- Periodic check will ensure PLI for cases like all frame chains of a
  dependency descriptor being broken.

* simplify
This commit is contained in:
Raja Subramanian
2023-10-27 15:16:39 +05:30
committed by GitHub
parent 3633dfe39e
commit a64bd23b6d
3 changed files with 77 additions and 59 deletions

View File

@@ -246,8 +246,6 @@ type DownTrack struct {
totalRepeatedNACKs atomic.Uint32
keyFrameRequestGeneration atomic.Uint32
blankFramesGeneration atomic.Uint32
connectionStats *connectionquality.ConnectionStats
@@ -273,6 +271,10 @@ type DownTrack struct {
maxLayerNotifierCh chan struct{}
maxLayerNotifierChClosed bool
keyFrameRequesterChMu sync.RWMutex
keyFrameRequesterCh chan struct{}
keyFrameRequesterChClosed bool
cbMu sync.RWMutex
onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat)
onMaxSubscribedLayerChanged func(dt *DownTrack, layer int32)
@@ -294,13 +296,14 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) {
}
d := &DownTrack{
params: params,
id: params.Receiver.TrackID(),
upstreamCodecs: codecs,
kind: kind,
codec: codecs[0].RTPCodecCapability,
pacer: params.Pacer,
maxLayerNotifierCh: make(chan struct{}, 1),
params: params,
id: params.Receiver.TrackID(),
upstreamCodecs: codecs,
kind: kind,
codec: codecs[0].RTPCodecCapability,
pacer: params.Pacer,
maxLayerNotifierCh: make(chan struct{}, 1),
keyFrameRequesterCh: make(chan struct{}, 1),
}
d.forwarder = NewForwarder(
d.kind,
@@ -346,6 +349,7 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) {
}
if d.kind == webrtc.RTPCodecTypeVideo {
go d.maxLayerNotifierWorker()
go d.keyFrameRequester()
}
return d, nil
@@ -584,57 +588,58 @@ func (d *DownTrack) GetTransceiver() *webrtc.RTPTransceiver {
return d.transceiver.Load()
}
func (d *DownTrack) maybeStartKeyFrameRequester() {
//
// Always move to next generation to abandon any running key frame requester
// This ensures that it is stopped if forwarding is disabled due to mute
// or paused due to bandwidth constraints. A new key frame requester is
// started if a layer lock is required.
//
d.stopKeyFrameRequester()
locked, layer := d.forwarder.CheckSync()
if !locked {
go d.keyFrameRequester(d.keyFrameRequestGeneration.Load(), layer)
}
}
func (d *DownTrack) stopKeyFrameRequester() {
d.keyFrameRequestGeneration.Inc()
}
func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) {
if d.IsClosed() || layer == buffer.InvalidLayerSpatial {
func (d *DownTrack) postKeyFrameRequestEvent() {
if d.kind != webrtc.RTPCodecTypeVideo {
return
}
interval := 2 * d.rtpStats.GetRtt()
if interval < keyFrameIntervalMin {
interval = keyFrameIntervalMin
d.keyFrameRequesterChMu.RLock()
if !d.keyFrameRequesterChClosed {
select {
case d.keyFrameRequesterCh <- struct{}{}:
default:
}
}
if interval > keyFrameIntervalMax {
interval = keyFrameIntervalMax
d.keyFrameRequesterChMu.RUnlock()
}
func (d *DownTrack) keyFrameRequester() {
getInterval := func() time.Duration {
interval := 2 * d.rtpStats.GetRtt()
if interval < keyFrameIntervalMin {
interval = keyFrameIntervalMin
}
if interval > keyFrameIntervalMax {
interval = keyFrameIntervalMax
}
return time.Duration(interval) * time.Millisecond
}
ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
interval := getInterval()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
locked, _ := d.forwarder.CheckSync()
if locked {
if d.IsClosed() {
return
}
if d.writable.Load() {
d.params.Logger.Debugw("sending PLI for layer lock", "generation", generation, "layer", layer)
select {
case _, more := <-d.keyFrameRequesterCh:
if !more {
return
}
case <-ticker.C:
}
locked, layer := d.forwarder.CheckSync()
if !locked && layer != buffer.InvalidLayerSpatial && d.writable.Load() {
d.params.Logger.Debugw("sending PLI for layer lock", "layer", layer)
d.params.Receiver.SendPLI(layer, false)
d.rtpStats.UpdateLayerLockPliAndTime(1)
}
<-ticker.C
if generation != d.keyFrameRequestGeneration.Load() || !d.writable.Load() {
return
}
ticker.Reset(getInterval())
}
}
@@ -986,11 +991,15 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
close(d.maxLayerNotifierCh)
d.maxLayerNotifierChMu.Unlock()
d.keyFrameRequesterChMu.Lock()
d.keyFrameRequesterChClosed = true
close(d.keyFrameRequesterCh)
d.keyFrameRequesterChMu.Unlock()
if onCloseHandler := d.getOnCloseHandler(); onCloseHandler != nil {
onCloseHandler(!flush)
}
d.stopKeyFrameRequester()
d.ClearStreamAllocatorReportInterval()
}
@@ -1174,7 +1183,7 @@ func (d *DownTrack) DistanceToDesired() float64 {
func (d *DownTrack) AllocateOptimal(allowOvershoot bool) VideoAllocation {
al, brs := d.params.Receiver.GetLayeredBitrate()
allocation := d.forwarder.AllocateOptimal(al, brs, allowOvershoot)
d.maybeStartKeyFrameRequester()
d.postKeyFrameRequestEvent()
d.maybeAddTransition(allocation.BandwidthNeeded, allocation.DistanceToDesired, allocation.PauseReason)
return allocation
}
@@ -1216,7 +1225,7 @@ func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti
func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation {
allocation := d.forwarder.ProvisionalAllocateCommit()
d.maybeStartKeyFrameRequester()
d.postKeyFrameRequestEvent()
d.maybeAddTransition(allocation.BandwidthNeeded, allocation.DistanceToDesired, allocation.PauseReason)
return allocation
}
@@ -1224,7 +1233,7 @@ func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation {
func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (VideoAllocation, bool) {
al, brs := d.params.Receiver.GetLayeredBitrate()
allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, al, brs, allowOvershoot)
d.maybeStartKeyFrameRequester()
d.postKeyFrameRequestEvent()
d.maybeAddTransition(allocation.BandwidthNeeded, allocation.DistanceToDesired, allocation.PauseReason)
return allocation, available
}
@@ -1245,7 +1254,6 @@ func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransitio
func (d *DownTrack) Pause() VideoAllocation {
al, brs := d.params.Receiver.GetLayeredBitrate()
allocation := d.forwarder.Pause(al, brs)
d.maybeStartKeyFrameRequester()
d.maybeAddTransition(allocation.BandwidthNeeded, allocation.DistanceToDesired, allocation.PauseReason)
return allocation
}
@@ -1464,10 +1472,9 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
pliOnce := true
sendPliOnce := func() {
_, layer := d.forwarder.CheckSync()
isAnyMuted := d.forwarder.IsAnyMuted()
d.params.Logger.Debugw("received PLI/FIR RTCP", "layer", layer, "isAnyMuted", isAnyMuted)
d.params.Logger.Debugw("received PLI/FIR RTCP", "layer", layer)
if pliOnce {
if layer != buffer.InvalidLayerSpatial && !isAnyMuted {
if layer != buffer.InvalidLayerSpatial {
d.params.Logger.Debugw("sending PLI RTCP", "layer", layer)
d.params.Receiver.SendPLI(layer, false)
d.isNACKThrottled.Store(true)
@@ -1796,10 +1803,6 @@ func (d *DownTrack) GetAndResetBytesSent() (uint32, uint32) {
func (d *DownTrack) onBindAndConnectedChange() {
d.writable.Store(d.connected.Load() && d.bound.Load())
if d.connected.Load() && d.bound.Load() && !d.bindAndConnectedOnce.Swap(true) {
if d.kind == webrtc.RTPCodecTypeVideo {
d.maybeStartKeyFrameRequester()
}
if d.activePaddingOnMuteUpTrack.Load() {
go d.sendPaddingOnMute()
}

View File

@@ -1374,7 +1374,11 @@ func (f *Forwarder) updateAllocation(alloc VideoAllocation, reason string) Video
func (f *Forwarder) setTargetLayer(targetLayer buffer.VideoLayer, requestLayerSpatial int32) {
f.vls.SetTarget(targetLayer)
f.vls.SetRequestSpatial(requestLayerSpatial)
if targetLayer.IsValid() {
f.vls.SetRequestSpatial(requestLayerSpatial)
} else {
f.vls.SetRequestSpatial(buffer.InvalidLayerSpatial)
}
}
func (f *Forwarder) Resync() {
@@ -1392,7 +1396,7 @@ func (f *Forwarder) resyncLocked() {
}
}
func (f *Forwarder) CheckSync() (locked bool, layer int32) {
func (f *Forwarder) CheckSync() (bool, int32) {
f.lock.RLock()
defer f.lock.RUnlock()

View File

@@ -332,6 +332,17 @@ func (d *DependencyDescriptor) CheckSync() (locked bool, layer int32) {
return false, layer
}
allBroken := true
for _, chain := range d.chains {
if !chain.Broken() {
allBroken = false
break
}
}
if allBroken {
return false, layer
}
d.decodeTargetsLock.RLock()
defer d.decodeTargetsLock.RUnlock()
for _, dt := range d.decodeTargets {