From 32d8af6da0ac85c99da15bcbfda5f1cfbf7d8529 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 16 Feb 2022 19:57:55 +0800 Subject: [PATCH] repeat request pli until first keyframe received (#436) --- pkg/sfu/downtrack.go | 26 ++++++++++++++++++++++++++ pkg/sfu/forwarder.go | 7 +++++++ 2 files changed, 33 insertions(+) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 7a47e027b..12acb75f8 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -36,6 +36,8 @@ const ( RTPPaddingMaxPayloadSize = 255 RTPPaddingEstimatedHeaderSize = 20 RTPBlankFramesMax = 6 + + firstKeyFramePLIInterval = 500 * time.Millisecond ) var ( @@ -47,6 +49,7 @@ var ( ErrNotVP8 = errors.New("not VP8") ErrOutOfOrderVP8PictureIdCacheMiss = errors.New("out-of-order VP8 picture id not found in cache") ErrFilteredVP8TemporalLayer = errors.New("filtered VP8 temporal layer") + ErrTrackAlreadyBind = errors.New("already bind") ) var ( @@ -128,6 +131,8 @@ type DownTrack struct { // update rtt onRttUpdate func(dt *DownTrack, rtt uint32) + + closed chan struct{} } // NewDownTrack returns a DownTrack. @@ -160,6 +165,7 @@ func NewDownTrack( codec: c, kind: kind, forwarder: NewForwarder(c, kind, logger), + closed: make(chan struct{}), } d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ @@ -185,6 +191,9 @@ func NewDownTrack( // This asserts that the code requested is supported by the remote peer. // If so it sets up all the state (SSRC and PayloadType) to have a call func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) { + if d.bound.get() { + return webrtc.RTPCodecParameters{}, ErrTrackAlreadyBind + } parameters := webrtc.RTPCodecParameters{RTPCodecCapability: d.codec} if codec, err := codecParametersFuzzySearch(parameters, t.CodecParameters()); err == nil { d.ssrc = uint32(t.SSRC()) @@ -203,6 +212,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.onBind() } d.bound.set(true) + go d.requestFirstKeyframe() return codec, nil } return webrtc.RTPCodecParameters{}, webrtc.ErrUnsupportedCodec @@ -259,6 +269,21 @@ func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver) { d.transceiver = transceiver } +func (d *DownTrack) requestFirstKeyframe() { + ticker := time.NewTicker(firstKeyFramePLIInterval) + for !d.forwarder.ReceivedFirstKeyFrame() { + select { + case <-d.closed: + return + + case <-ticker.C: + if l := d.forwarder.TargetLayers(); l != InvalidLayers { + d.receiver.SendPLI(l.spatial) + } + } + } +} + // WriteRTP writes an RTP Packet to the DownTrack func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { var pool *[]byte @@ -492,6 +517,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { if d.onCloseHandler != nil { d.onCloseHandler() } + close(d.closed) }) } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 6afd88958..974d5efb0 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -186,6 +186,8 @@ type Forwarder struct { rtpMunger *RTPMunger vp8Munger *VP8Munger + + receivedFirstKeyFrame atomicBool } func NewForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType, logger logger.Logger) *Forwarder { @@ -1067,6 +1069,10 @@ func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) return nil, ErrUnknownKind } +func (f *Forwarder) ReceivedFirstKeyFrame() bool { + return f.receivedFirstKeyFrame.get() +} + // should be called with lock held func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket) (*TranslationParams, error) { if f.lastSSRC != extPkt.Packet.SSRC { @@ -1127,6 +1133,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in if f.currentLayers.spatial == f.maxLayers.spatial { tp.isSwitchingToMaxLayer = true } + f.receivedFirstKeyFrame.set(true) } else { tp.shouldSendPLI = true }