From c823320528ca26dcd5c8e3c74ef0f52ddf69a2f3 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 12 Mar 2025 22:11:27 +0530 Subject: [PATCH] Add a key frame seeder in up track. (#3524) --- pkg/sfu/buffer/buffer.go | 67 +++++++++++++++++++++++++++++-- pkg/sfu/forwarder.go | 13 +----- pkg/sfu/receiver.go | 13 +++--- pkg/sfu/rtpstats/rtpstats_base.go | 7 ++++ 4 files changed, 78 insertions(+), 22 deletions(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index f0a94d202..e9bc9455b 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -306,6 +306,10 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili } b.pPackets = nil b.bound = true + + if mime.IsMimeTypeVideo(b.mime) { + go b.seedKeyFrame() + } } func (b *Buffer) OnCodecChange(fn func(webrtc.RTPCodecParameters)) { @@ -499,14 +503,18 @@ func (b *Buffer) Close() error { b.closeOnce.Do(func() { b.closed.Store(true) - if b.rtpStats != nil { - b.rtpStats.Stop() + b.RLock() + rtpStats := b.rtpStats + b.RUnlock() + + if rtpStats != nil { + rtpStats.Stop() b.logger.Debugw("rtp stats", "direction", "upstream", - "stats", b.rtpStats, + "stats", rtpStats, ) if cb := b.getOnFinalRtpStats(); cb != nil { - cb(b.rtpStats.ToProto()) + cb(rtpStats.ToProto()) } } @@ -800,6 +808,10 @@ func (b *Buffer) handleCodecChange(newPT uint8) { if f := b.onCodecChange; f != nil { go f(newCodec) } + + if mime.IsMimeTypeVideo(b.mime) { + go b.seedKeyFrame() + } } func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) rtpstats.RTPFlowState { @@ -1227,4 +1239,51 @@ func (b *Buffer) GetTemporalLayerFpsForSpatial(layer int32) []float32 { return nil } +func (b *Buffer) seedKeyFrame() { + // a key frame is needed especially when using Dependency Descriptor + // to get the DD structure which is used in parsing subsequent packets, + // till then packets are dropped which results in stream tracker not + // getting any data which means it does not declare layer start. + // + // send gratuitous PLIs for some time or until a key frame is seen to + // get the engine rolling + b.logger.Debugw("starting key frame seeder") + timer := time.NewTimer(time.Minute) + defer timer.Stop() + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + if b.closed.Load() { + return + } + + select { + case <-timer.C: + b.logger.Infow("stopping key frame seeder: timeout") + return + + case <-ticker.C: + b.RLock() + rtpStats := b.rtpStats + b.RUnlock() + + if rtpStats != nil { + cnt, last := rtpStats.KeyFrame() + if cnt > 0 { + b.logger.Debugw( + "stopping key frame seeder: received key frame", + "keyFrameCount", cnt, + "lastKeyFrame", last, + ) + return + } + + b.SendPLI(true) + } + } + } +} + // --------------------------------------------------------------- diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 9aa39fece..5251e7e90 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1563,18 +1563,7 @@ func (f *Forwarder) CheckSync() (bool, int32) { f.lock.RLock() defer f.lock.RUnlock() - locked, layer := f.vls.CheckSync() - if !locked { - return locked, layer - } - - // max published layer (as seen by this forwarder) could be - // lower than max subscribed, mark de-synced if not deficient - if !f.isDeficientLocked() && f.vls.GetMax().Spatial > f.vls.GetTarget().Spatial { - return false, layer - } - - return true, layer + return f.vls.CheckSync() } func (f *Forwarder) Restart() { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 44312454a..3f4fd9eff 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -745,12 +745,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) { numPacketsDropped := 0 defer func() { w.closeOnce.Do(func() { - w.logger.Debugw( - "closing forwarder", - "layer", layer, - "numPacketsForwarded", numPacketsForwarded, - "numPacketsDropped", numPacketsDropped, - ) w.closed.Store(true) w.closeTracks() if rt := w.redTransformer.Load(); rt != nil { @@ -762,6 +756,13 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) { if w.isSVC { w.streamTrackerManager.RemoveAllTrackers() } + + w.logger.Debugw( + "closing forwarder", + "layer", layer, + "numPacketsForwarded", numPacketsForwarded, + "numPacketsDropped", numPacketsDropped, + ) }() var spatialTrackers [buffer.DefaultMaxLayerSpatial + 1]streamtracker.StreamTrackerWorker diff --git a/pkg/sfu/rtpstats/rtpstats_base.go b/pkg/sfu/rtpstats/rtpstats_base.go index d8f5c8eed..8844581e6 100644 --- a/pkg/sfu/rtpstats/rtpstats_base.go +++ b/pkg/sfu/rtpstats/rtpstats_base.go @@ -346,6 +346,13 @@ func (r *rtpStatsBase) UpdateKeyFrame(kfCount uint32) { r.lastKeyFrame = time.Now() } +func (r *rtpStatsBase) KeyFrame() (uint32, time.Time) { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.keyFrames, r.lastKeyFrame +} + func (r *rtpStatsBase) UpdateRtt(rtt uint32) { r.lock.Lock() defer r.lock.Unlock()