From ea57e4f2c1edf5f6dae1df48d63ea5ee4d991351 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 28 May 2023 10:05:35 +0530 Subject: [PATCH] Ignore receiver reports that have a sequence number before first packet. (#1745) --- pkg/sfu/buffer/rtpstats.go | 10 ++++++---- pkg/sfu/downtrack.go | 8 ++++---- pkg/sfu/forwarder.go | 3 +-- pkg/sfu/forwarder_test.go | 4 ++-- pkg/sfu/streamallocator/track.go | 2 +- 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index fda74a257..87a403bfa 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -539,7 +539,9 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32 r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() || !r.params.IsReceiverReportDriven { + if !r.endTime.IsZero() || !r.params.IsReceiverReportDriven || rr.LastSequenceNumber < r.extStartSN { + // it is possible that the `LastSequenceNumber` in the receiver report is before the starting + // sequence number when dummy packets are used to trigger Pion's OnTrack path. return } @@ -1107,7 +1109,7 @@ func (r *RTPStats) DeltaInfoOverridden(snapshotId uint32) *RTPDeltaInfo { packetsExpected := now.extStartSNOverridden - then.extStartSNOverridden if packetsExpected > NumSequenceNumbers { r.logger.Warnw( - "too many packets expected in delta", + "too many packets expected in delta (overridden)", fmt.Errorf("start: %d, end: %d, expected: %d", then.extStartSNOverridden, now.extStartSNOverridden, packetsExpected), ) return nil @@ -1558,7 +1560,7 @@ func (r *RTPStats) updateGapHistogram(gap int) { } func (r *RTPStats) getAndResetSnapshot(snapshotId uint32, override bool) (*Snapshot, *Snapshot) { - if !r.initialized || (r.params.IsReceiverReportDriven && r.lastRRTime.IsZero()) { + if !r.initialized || (override && r.lastRRTime.IsZero()) { return nil, nil } @@ -1573,7 +1575,7 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32, override bool) (*Snaps } var startTime time.Time - if override && r.params.IsReceiverReportDriven { + if override { startTime = r.lastRRTime } else { startTime = time.Now() diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 8fc30d542..0e8e12dbe 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -652,7 +652,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { // WritePaddingRTP tries to write as many padding only RTP packets as necessary // to satisfy given size to the DownTrack -func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool) int { +func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMarker bool) int { if !d.rtpStats.IsActive() && !paddingOnMute { return 0 } @@ -684,7 +684,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool) int { return 0 } - snts, err := d.forwarder.GetSnTsForPadding(num) + snts, err := d.forwarder.GetSnTsForPadding(num, forceMarker) if err != nil { return 0 } @@ -1699,10 +1699,10 @@ func (d *DownTrack) onBindAndConnected() { } func (d *DownTrack) sendPaddingOnMute() { - d.logger.Debugw("sending padding on mute") // let uptrack have chance to send packet before we send padding time.Sleep(waitBeforeSendPaddingOnMute) + d.logger.Debugw("sending padding on mute") if d.kind == webrtc.RTPCodecTypeVideo { d.sendPaddingOnMuteForVideo() } else if d.mime == "audio/opus" { @@ -1717,7 +1717,7 @@ func (d *DownTrack) sendPaddingOnMuteForVideo() { if d.rtpStats.IsActive() || d.IsClosed() { return } - d.WritePaddingRTP(20, true) + d.WritePaddingRTP(20, true, true) time.Sleep(paddingOnMuteInterval) } } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 0690d4084..b2c4f4f27 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1649,7 +1649,7 @@ func (f *Forwarder) maybeStart() { f.firstTS = extPkt.Packet.Timestamp } -func (f *Forwarder) GetSnTsForPadding(num int) ([]SnTs, error) { +func (f *Forwarder) GetSnTsForPadding(num int, forceMarker bool) ([]SnTs, error) { f.lock.Lock() defer f.lock.Unlock() @@ -1660,7 +1660,6 @@ func (f *Forwarder) GetSnTsForPadding(num int) ([]SnTs, error) { // not get out-of-sync. But, when a stream is paused, // force a frame marker as a restart of the stream will // start with a key frame which will reset the decoder. - forceMarker := false if !f.vls.GetTarget().IsValid() { forceMarker = true } diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 06c294f9d..df82fa86d 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -1770,7 +1770,7 @@ func TestForwardGetSnTsForPadding(t *testing.T) { disable(f) // should get back frame end needed as the last packet did not have RTP marker set - snts, err := f.GetSnTsForPadding(5) + snts, err := f.GetSnTsForPadding(5, false) require.NoError(t, err) numPadding := 5 @@ -1786,7 +1786,7 @@ func TestForwardGetSnTsForPadding(t *testing.T) { require.Equal(t, sntsExpected, snts) // now that there is a marker, timestamp should jump on first padding when asked again - snts, err = f.GetSnTsForPadding(numPadding) + snts, err = f.GetSnTsForPadding(numPadding, false) require.NoError(t, err) for i := 0; i < numPadding; i++ { diff --git a/pkg/sfu/streamallocator/track.go b/pkg/sfu/streamallocator/track.go index d3aedcef4..4c1a125e4 100644 --- a/pkg/sfu/streamallocator/track.go +++ b/pkg/sfu/streamallocator/track.go @@ -135,7 +135,7 @@ func (t *Track) SetMaxLayer(layer buffer.VideoLayer) bool { } func (t *Track) WritePaddingRTP(bytesToSend int) int { - return t.downTrack.WritePaddingRTP(bytesToSend, false) + return t.downTrack.WritePaddingRTP(bytesToSend, false, false) } func (t *Track) AllocateOptimal(allowOvershoot bool) sfu.VideoAllocation {