diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 865d5563f..bd55e1128 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -39,7 +39,8 @@ type TrackSender interface { const ( RTPPaddingMaxPayloadSize = 255 RTPPaddingEstimatedHeaderSize = 20 - RTPBlankFramesSeconds = float32(0.2) + RTPBlankFramesMuteSeconds = float32(1.0) + RTPBlankFramesCloseSeconds = float32(0.2) FlagStopRTXOnPLI = true @@ -137,6 +138,8 @@ type DownTrack struct { keyFrameRequestGeneration atomic.Uint32 + blankFramesGeneration atomic.Uint32 + connectionStats *connectionquality.ConnectionStats connectionQualitySnapshotId uint32 deltaStatsSnapshotId uint32 @@ -582,8 +585,9 @@ func (d *DownTrack) Mute(muted bool) { // when muting, send a few silence frames to ensure residual noise does not // put the comfort noise generator on decoder side in a bad state where it // generates noise that is not so comfortable. + d.blankFramesGeneration.Inc() if d.kind == webrtc.RTPCodecTypeAudio && muted { - _ = d.writeBlankFrameRTP(RTPBlankFramesSeconds) + d.writeBlankFrameRTP(RTPBlankFramesMuteSeconds, d.blankFramesGeneration.Load()) } } @@ -601,22 +605,21 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.forwarder.Mute(true) // write blank frames after disabling so that other frames do not interfere. - // Idea here is to send blank 1x1 key frames to flush the decoder buffer at the remote end. + // Idea here is to send blank key frames to flush the decoder buffer at the remote end. // Otherwise, with transceiver re-use last frame from previous stream is held in the // display buffer and there could be a brief moment where the previous stream is displayed. d.logger.Infow("close down track", "peerID", d.peerID, "trackID", d.id, "flushBlankFrame", flush) if flush { - doneFlushing := make(chan struct{}) - go func() { - defer close(doneFlushing) - _ = d.writeBlankFrameRTP(RTPBlankFramesSeconds) - }() + doneFlushing := d.writeBlankFrameRTP(RTPBlankFramesCloseSeconds, d.blankFramesGeneration.Inc()) // wait a limited time to flush timer := time.NewTimer(flushTimeout) + defer timer.Stop() + select { case <-doneFlushing: case <-timer.C: + d.blankFramesGeneration.Inc() // in case flush is still running } } @@ -854,76 +857,96 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { return d.rtpStats.GetRtcpSenderReport(d.ssrc) } -func (d *DownTrack) writeBlankFrameRTP(duration float32) error { - // don't send if nothing has been sent - if !d.rtpStats.IsActive() { - return nil - } - - // LK-TODO: Support other audio/video codecs - if d.mime != "audio/opus" && d.mime != "video/vp8" && d.mime != "video/h264" { - return nil - } - - frameRate := uint32(30) - if d.mime == "audio/opus" { - frameRate = 50 - } - - // send a number of blank frames just in case there is loss. - // Intentionally ignoring check for mute or bandwidth constrained mute - // as this is used to clear client side buffer. - numFrames := int(float32(frameRate) * duration) - snts, frameEndNeeded, err := d.forwarder.GetSnTsForBlankFrames(frameRate, numFrames) - if err != nil { - d.logger.Warnw("could not get SN/TS to write blank frames", err) - return err - } - - for i := 0; i < len(snts); i++ { - hdr := rtp.Header{ - Version: 2, - Padding: false, - Marker: true, - PayloadType: d.payloadType, - SequenceNumber: snts[i].sequenceNumber, - Timestamp: snts[i].timestamp, - SSRC: d.ssrc, - CSRC: []uint32{}, +func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} { + done := make(chan struct{}) + go func() { + // don't send if nothing has been sent + if !d.rtpStats.IsActive() { + close(done) + return } - err = d.writeRTPHeaderExtensions(&hdr) - if err != nil { - d.logger.Warnw("could not write header extensions for blank frame", err) - return err - } - - var pktSize int + var writeBlankFrame func(*rtp.Header, bool) (int, error) switch d.mime { case "audio/opus": - pktSize, err = d.writeOpusBlankFrame(&hdr, frameEndNeeded) + writeBlankFrame = d.writeOpusBlankFrame case "video/vp8": - pktSize, err = d.writeVP8BlankFrame(&hdr, frameEndNeeded) + writeBlankFrame = d.writeVP8BlankFrame case "video/h264": - pktSize, err = d.writeH264BlankFrame(&hdr, frameEndNeeded) + writeBlankFrame = d.writeH264BlankFrame default: - return nil - } - if err != nil { - d.logger.Warnw("could not write blank frame", err) - return err + close(done) + return } - for _, f := range d.onPacketSent { - f(d, pktSize) + frameRate := uint32(30) + if d.mime == "audio/opus" { + frameRate = 50 } - // only the first frame will need frameEndNeeded to close out the - // previous picture, rest are small key frames - frameEndNeeded = false - } + // send a number of blank frames just in case there is loss. + // Intentionally ignoring check for mute or bandwidth constrained mute + // as this is used to clear client side buffer. + numFrames := int(float32(frameRate) * duration) + frameDuration := time.Duration(1000/frameRate) * time.Millisecond - return nil + ticker := time.NewTicker(frameDuration) + defer ticker.Stop() + + for { + if generation != d.blankFramesGeneration.Load() || numFrames <= 0 { + close(done) + return + } + + snts, frameEndNeeded, err := d.forwarder.GetSnTsForBlankFrames(frameRate, 1) + if err != nil { + d.logger.Warnw("could not get SN/TS for blank frame", err) + close(done) + return + } + + for i := 0; i < len(snts); i++ { + hdr := rtp.Header{ + Version: 2, + Padding: false, + Marker: true, + PayloadType: d.payloadType, + SequenceNumber: snts[i].sequenceNumber, + Timestamp: snts[i].timestamp, + SSRC: d.ssrc, + CSRC: []uint32{}, + } + + err = d.writeRTPHeaderExtensions(&hdr) + if err != nil { + d.logger.Warnw("could not write header extension for blank frame", err) + close(done) + return + } + + pktSize, err := writeBlankFrame(&hdr, frameEndNeeded) + if err != nil { + d.logger.Warnw("could not write blank frame", err) + close(done) + return + } + + for _, f := range d.onPacketSent { + f(d, pktSize) + } + + // only the first frame will need frameEndNeeded to close out the + // previous picture, rest are small key frames (for the video case) + frameEndNeeded = false + } + + numFrames-- + <-ticker.C + } + }() + + return done } func (d *DownTrack) writeOpusBlankFrame(hdr *rtp.Header, frameEndNeeded bool) (int, error) { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index deb4dede7..ae2704f5b 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -5,6 +5,7 @@ import ( "math" "strings" "sync" + "time" "github.com/pion/webrtc/v3" @@ -1300,6 +1301,12 @@ func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]S f.lock.Lock() defer f.lock.Unlock() + // NOTE: not using diff of current time and previous packet time (lTSCalc) as this + // driven by a timer, there might be slight differences compared to the frame rate. + // As the differences are going to be small (and also not to update RTP time stamp + // by those small differences), not doing the diff. + f.lTSCalc = time.Now().UnixNano() + frameEndNeeded := !f.rtpMunger.IsOnFrameBoundary() if frameEndNeeded { numPackets++ diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 729a6c6d4..2e5a60b1c 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -1382,12 +1382,13 @@ func TestForwardGetSnTsForBlankFrames(t *testing.T) { _, _ = f.GetTranslationParams(extPkt, 0) // should get back frame end needed as the last packet did not have RTP marker set - snts, frameEndNeeded, err := f.GetSnTsForBlankFrames(30, 6) + numBlankFrames := 6 + snts, frameEndNeeded, err := f.GetSnTsForBlankFrames(30, numBlankFrames) require.NoError(t, err) require.True(t, frameEndNeeded) // there should be one more than RTPBlankFramesMax as one would have been allocated to end previous frame - numPadding := 7 + numPadding := numBlankFrames + 1 clockRate := testutils.TestVP8Codec.ClockRate frameRate := uint32(30) var sntsExpected = make([]SnTs, numPadding) @@ -1401,15 +1402,15 @@ func TestForwardGetSnTsForBlankFrames(t *testing.T) { // now that there is a marker, timestamp should jump on first padding when asked again // also number of padding should be RTPBlankFramesMax - numPadding = 6 + numPadding = numBlankFrames sntsExpected = sntsExpected[:numPadding] for i := 0; i < numPadding; i++ { sntsExpected[i] = SnTs{ sequenceNumber: params.SequenceNumber + uint16(len(snts)) + uint16(i) + 1, - timestamp: params.Timestamp + (uint32(i+1)*clockRate)/frameRate, + timestamp: snts[len(snts)-1].timestamp + (uint32(i+1)*clockRate)/frameRate, } } - snts, frameEndNeeded, err = f.GetSnTsForBlankFrames(30, 6) + snts, frameEndNeeded, err = f.GetSnTsForBlankFrames(30, numBlankFrames) require.NoError(t, err) require.False(t, frameEndNeeded) require.Equal(t, sntsExpected, snts) diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index a24c66745..b99c0bb9e 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -228,6 +228,9 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate r.lastSN = vals[num-1].sequenceNumber r.snOffset -= uint16(num) + r.tsOffset -= vals[num-1].timestamp - r.lastTS + r.lastTS = vals[num-1].timestamp + if forceMarker { r.lastMarker = true } diff --git a/pkg/sfu/rtpmunger_test.go b/pkg/sfu/rtpmunger_test.go index 798e03ace..b01a93717 100644 --- a/pkg/sfu/rtpmunger_test.go +++ b/pkg/sfu/rtpmunger_test.go @@ -317,7 +317,7 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) { for i := 0; i < numPadding; i++ { sntsExpected[i] = SnTs{ sequenceNumber: params.SequenceNumber + uint16(len(snts)) + uint16(i) + 1, - timestamp: params.Timestamp + (uint32(i+1)*clockRate)/frameRate, + timestamp: snts[len(snts)-1].timestamp + (uint32(i+1)*clockRate)/frameRate, } } snts, err = r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, false)