mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 19:55:41 +00:00
Add a key frame seeder in up track. (#3524)
This commit is contained in:
@@ -306,6 +306,10 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
|
||||
}
|
||||
b.pPackets = nil
|
||||
b.bound = true
|
||||
|
||||
if mime.IsMimeTypeVideo(b.mime) {
|
||||
go b.seedKeyFrame()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Buffer) OnCodecChange(fn func(webrtc.RTPCodecParameters)) {
|
||||
@@ -499,14 +503,18 @@ func (b *Buffer) Close() error {
|
||||
b.closeOnce.Do(func() {
|
||||
b.closed.Store(true)
|
||||
|
||||
if b.rtpStats != nil {
|
||||
b.rtpStats.Stop()
|
||||
b.RLock()
|
||||
rtpStats := b.rtpStats
|
||||
b.RUnlock()
|
||||
|
||||
if rtpStats != nil {
|
||||
rtpStats.Stop()
|
||||
b.logger.Debugw("rtp stats",
|
||||
"direction", "upstream",
|
||||
"stats", b.rtpStats,
|
||||
"stats", rtpStats,
|
||||
)
|
||||
if cb := b.getOnFinalRtpStats(); cb != nil {
|
||||
cb(b.rtpStats.ToProto())
|
||||
cb(rtpStats.ToProto())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -800,6 +808,10 @@ func (b *Buffer) handleCodecChange(newPT uint8) {
|
||||
if f := b.onCodecChange; f != nil {
|
||||
go f(newCodec)
|
||||
}
|
||||
|
||||
if mime.IsMimeTypeVideo(b.mime) {
|
||||
go b.seedKeyFrame()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) rtpstats.RTPFlowState {
|
||||
@@ -1227,4 +1239,51 @@ func (b *Buffer) GetTemporalLayerFpsForSpatial(layer int32) []float32 {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Buffer) seedKeyFrame() {
|
||||
// a key frame is needed especially when using Dependency Descriptor
|
||||
// to get the DD structure which is used in parsing subsequent packets,
|
||||
// till then packets are dropped which results in stream tracker not
|
||||
// getting any data which means it does not declare layer start.
|
||||
//
|
||||
// send gratuitous PLIs for some time or until a key frame is seen to
|
||||
// get the engine rolling
|
||||
b.logger.Debugw("starting key frame seeder")
|
||||
timer := time.NewTimer(time.Minute)
|
||||
defer timer.Stop()
|
||||
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
if b.closed.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
b.logger.Infow("stopping key frame seeder: timeout")
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
b.RLock()
|
||||
rtpStats := b.rtpStats
|
||||
b.RUnlock()
|
||||
|
||||
if rtpStats != nil {
|
||||
cnt, last := rtpStats.KeyFrame()
|
||||
if cnt > 0 {
|
||||
b.logger.Debugw(
|
||||
"stopping key frame seeder: received key frame",
|
||||
"keyFrameCount", cnt,
|
||||
"lastKeyFrame", last,
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
b.SendPLI(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
@@ -1563,18 +1563,7 @@ func (f *Forwarder) CheckSync() (bool, int32) {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
|
||||
locked, layer := f.vls.CheckSync()
|
||||
if !locked {
|
||||
return locked, layer
|
||||
}
|
||||
|
||||
// max published layer (as seen by this forwarder) could be
|
||||
// lower than max subscribed, mark de-synced if not deficient
|
||||
if !f.isDeficientLocked() && f.vls.GetMax().Spatial > f.vls.GetTarget().Spatial {
|
||||
return false, layer
|
||||
}
|
||||
|
||||
return true, layer
|
||||
return f.vls.CheckSync()
|
||||
}
|
||||
|
||||
func (f *Forwarder) Restart() {
|
||||
|
||||
@@ -745,12 +745,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) {
|
||||
numPacketsDropped := 0
|
||||
defer func() {
|
||||
w.closeOnce.Do(func() {
|
||||
w.logger.Debugw(
|
||||
"closing forwarder",
|
||||
"layer", layer,
|
||||
"numPacketsForwarded", numPacketsForwarded,
|
||||
"numPacketsDropped", numPacketsDropped,
|
||||
)
|
||||
w.closed.Store(true)
|
||||
w.closeTracks()
|
||||
if rt := w.redTransformer.Load(); rt != nil {
|
||||
@@ -762,6 +756,13 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) {
|
||||
if w.isSVC {
|
||||
w.streamTrackerManager.RemoveAllTrackers()
|
||||
}
|
||||
|
||||
w.logger.Debugw(
|
||||
"closing forwarder",
|
||||
"layer", layer,
|
||||
"numPacketsForwarded", numPacketsForwarded,
|
||||
"numPacketsDropped", numPacketsDropped,
|
||||
)
|
||||
}()
|
||||
|
||||
var spatialTrackers [buffer.DefaultMaxLayerSpatial + 1]streamtracker.StreamTrackerWorker
|
||||
|
||||
@@ -346,6 +346,13 @@ func (r *rtpStatsBase) UpdateKeyFrame(kfCount uint32) {
|
||||
r.lastKeyFrame = time.Now()
|
||||
}
|
||||
|
||||
func (r *rtpStatsBase) KeyFrame() (uint32, time.Time) {
|
||||
r.lock.RLock()
|
||||
defer r.lock.RUnlock()
|
||||
|
||||
return r.keyFrames, r.lastKeyFrame
|
||||
}
|
||||
|
||||
func (r *rtpStatsBase) UpdateRtt(rtt uint32) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
Reference in New Issue
Block a user