From 32af15dc8036d583a98a3875cec66a48d1ef66d0 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 21 Aug 2023 13:20:40 +0530 Subject: [PATCH 1/3] Log RTP stream start time and more details when adjusting first packet time. (#1983) Trying to understand first packet time jumps on migration. --- pkg/sfu/buffer/rtpstats.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index d99954bb0..120285129 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -402,6 +402,14 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa extStartSNOverridden: r.extStartSN, } } + + r.logger.Debugw( + "rtp stream start", + "startTime", r.startTime.String(), + "firstTime", r.firstTime.String(), + "startSN", r.extStartSN, + "startTS", r.extStartTS, + ) } if r.resyncOnNextPacket { @@ -529,7 +537,7 @@ func (r *RTPStats) maybeAdjustStart(rtph *rtp.Header, pktSize uint64, hdrSize ui r.tsCycles++ } r.logger.Infow( - "adjusting starting sequence number", + "adjusting start", "snBefore", snBeforeAdjust, "snAfter", r.extStartSN, "snCyles", r.cycles, @@ -817,8 +825,13 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) { if firstTime.Before(r.firstTime) { r.logger.Debugw( "adjusting first packet time", + "startTime", r.startTime.String(), + "nowTime", now.String(), "before", r.firstTime.String(), "after", firstTime.String(), + "adjustment", r.firstTime.Sub(firstTime), + "nowTS", ts, + "extStartTS", r.extStartTS, ) if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold { r.logger.Infow("first packet time adjustment too big, ignoring", From b1098cda41e65cbf839f605feb92a6074bd68ba2 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 21 Aug 2023 18:04:03 +0530 Subject: [PATCH 2/3] Do not start RTPStats on a padding packet. (#1984) --- pkg/sfu/buffer/buffer_test.go | 4 +-- pkg/sfu/buffer/rtpstats.go | 11 +++++++ pkg/sfu/downtrack.go | 56 ++++++++++++++--------------------- 3 files changed, 35 insertions(+), 36 deletions(-) diff --git a/pkg/sfu/buffer/buffer_test.go b/pkg/sfu/buffer/buffer_test.go index 7f1e97357..eb6582dbb 100644 --- a/pkg/sfu/buffer/buffer_test.go +++ b/pkg/sfu/buffer/buffer_test.go @@ -180,6 +180,7 @@ func TestNewBuffer(t *testing.T) { Header: rtp.Header{ SequenceNumber: 65534, }, + Payload: []byte{1}, }, { Header: rtp.Header{ @@ -201,8 +202,7 @@ func TestNewBuffer(t *testing.T) { buff := NewBuffer(123, pool, pool) buff.codecType = webrtc.RTPCodecTypeVideo require.NotNil(t, buff) - buff.OnRtcpFeedback(func(_ []rtcp.Packet) { - }) + buff.OnRtcpFeedback(func(_ []rtcp.Packet) {}) buff.Bind(webrtc.RTPParameters{ HeaderExtensions: nil, Codecs: []webrtc.RTPCodecParameters{vp8Codec}, diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 120285129..24a0c34a7 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -377,6 +377,11 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa first := false if !r.initialized { + if payloadSize == 0 { + // do not start on a padding only packet + return + } + r.initialized = true r.startTime = time.Now() @@ -514,6 +519,12 @@ func (r *RTPStats) maybeAdjustStart(rtph *rtp.Header, pktSize uint64, hdrSize ui return false } + if payloadSize == 0 { + // do not start on a padding only packet + r.logger.Infow("adjusting start, skipping on padding only packet") + return true + } + r.packetsLost += uint32(uint16(r.extStartSN)-rtph.SequenceNumber) - 1 snBeforeAdjust := r.extStartSN r.extStartSN = uint32(rtph.SequenceNumber) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index eceb854fa..cf18f1c38 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -784,9 +784,8 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa TransportWideExtID: uint8(d.transportWideExtID), WriteStream: d.writeStream, Metadata: sendPacketMetadata{ - isPadding: true, - disableCounter: true, - disableRTPStats: paddingOnMute, + isPadding: true, + disableCounter: true, }, OnSent: d.packetSent, }) @@ -1294,10 +1293,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), WriteStream: d.writeStream, - Metadata: sendPacketMetadata{ - isBlankFrame: true, - }, - OnSent: d.packetSent, + OnSent: d.packetSent, }) // only the first frame will need frameEndNeeded to close out the @@ -1791,11 +1787,7 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), WriteStream: d.writeStream, - Metadata: sendPacketMetadata{ - isBlankFrame: true, - disableRTPStats: true, - }, - OnSent: d.packetSent, + OnSent: d.packetSent, }) } @@ -1812,16 +1804,14 @@ func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, } type sendPacketMetadata struct { - layer int32 - arrival time.Time - isKeyFrame bool - isRTX bool - isPadding bool - isBlankFrame bool - disableCounter bool - disableRTPStats bool - tp *TranslationParams - pool *[]byte + layer int32 + arrival time.Time + isKeyFrame bool + isRTX bool + isPadding bool + disableCounter bool + tp *TranslationParams + pool *[]byte } func (d *DownTrack) packetSent(md interface{}, hdr *rtp.Header, payloadSize int, sendTime time.Time, sendError error) { @@ -1839,10 +1829,9 @@ func (d *DownTrack) packetSent(md interface{}, hdr *rtp.Header, payloadSize int, return } - headerSize := hdr.MarshalSize() if !spmd.disableCounter { // STREAM-ALLOCATOR-TODO: remove this stream allocator bytes counter once stream allocator changes fully to pull bytes counter - size := uint32(headerSize + payloadSize) + size := uint32(hdr.MarshalSize() + payloadSize) d.streamAllocatorBytesCounter.Add(size) if spmd.isRTX { d.bytesRetransmitted.Add(size) @@ -1851,16 +1840,15 @@ func (d *DownTrack) packetSent(md interface{}, hdr *rtp.Header, payloadSize int, } } - if !spmd.disableRTPStats { - packetTime := spmd.arrival - if packetTime.IsZero() { - packetTime = sendTime - } - if spmd.isPadding { - d.rtpStats.Update(hdr, 0, payloadSize, packetTime) - } else { - d.rtpStats.Update(hdr, payloadSize, 0, packetTime) - } + // update RTPStats + packetTime := spmd.arrival + if packetTime.IsZero() { + packetTime = sendTime + } + if spmd.isPadding { + d.rtpStats.Update(hdr, 0, payloadSize, packetTime) + } else { + d.rtpStats.Update(hdr, payloadSize, 0, packetTime) } if spmd.isKeyFrame { From 28a60e1808bf9fcf62408c434cc70c39233a0d6e Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 21 Aug 2023 19:47:18 +0530 Subject: [PATCH 3/3] Need empty metadata object (#1985) --- pkg/sfu/downtrack.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index cf18f1c38..cc690576b 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1293,6 +1293,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), WriteStream: d.writeStream, + Metadata: sendPacketMetadata{}, OnSent: d.packetSent, }) @@ -1787,6 +1788,7 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), WriteStream: d.writeStream, + Metadata: sendPacketMetadata{}, OnSent: d.packetSent, }) }