From e461e9cd795f36e7659882df542e90ba235e6a26 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 19 Oct 2023 13:58:50 +0530 Subject: [PATCH 1/9] Log skew in clock rate. (#2158) * Log skew in clock rate. Remember seeing sender report time stamp moving backward across mute with replaceTrack(null). Not able to reproduce it in JS sample app, but have seen it elsewhere. Logging to understand it better. Wondering if the sender report should be reset on time stamp moving backward or if we should drop backwards moving reports. * set threshold at 20% --- pkg/sfu/buffer/rtpstats_base.go | 12 +++++++++ pkg/sfu/buffer/rtpstats_receiver.go | 39 ++++++++++++++++++----------- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 0c77299ae..11b89b9cc 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -105,6 +105,8 @@ type snapshot struct { maxJitter float64 } +// ------------------------------------------------------------------ + type RTCPSenderReportData struct { RTPTimestamp uint32 RTPTimestampExt uint64 @@ -112,6 +114,16 @@ type RTCPSenderReportData struct { At time.Time } +func (r *RTCPSenderReportData) ToString() string { + if r == nil { + return "" + } + + return fmt.Sprintf("ntp: %s, rtp: %d, extRtp: %d, at: %s", r.NTPTimestamp.Time().String(), r.RTPTimestamp, r.RTPTimestampExt, r.At.String()) +} + +// ------------------------------------------------------------------ + type RTPStatsParams struct { ClockRate uint32 Logger logger.Logger diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 5902f57e8..fe906eb98 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -16,6 +16,7 @@ package buffer import ( "fmt" + "math" "time" "github.com/pion/rtcp" @@ -287,12 +288,8 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) if r.srNewest != nil && 
r.srNewest.NTPTimestamp > srData.NTPTimestamp { r.logger.Infow( "received anachronous sender report", - "currentNTP", srData.NTPTimestamp.Time().String(), - "currentRTP", srData.RTPTimestamp, - "currentAt", srData.At.String(), - "lastNTP", r.srNewest.NTPTimestamp.Time().String(), - "lastRTP", r.srNewest.RTPTimestamp, - "lastAt", r.srNewest.At.String(), + "last", r.srNewest.ToString(), + "current", srData.ToString(), ) return } @@ -310,6 +307,26 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp, r.timestamp.GetStart()) + if r.srNewest != nil { + timeSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()).Seconds() + rtpDiffSinceLast := srDataCopy.RTPTimestampExt - r.srNewest.RTPTimestampExt + calculatedClockRateFromLast := float64(rtpDiffSinceLast) / timeSinceLast + + timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()).Seconds() + rtpDiffSinceFirst := srDataCopy.RTPTimestampExt - r.srFirst.RTPTimestampExt + calculatedClockRateFromFirst := float64(rtpDiffSinceFirst) / timeSinceFirst + if math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate) || math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate) { + r.logger.Infow( + "clock rate skew", + "first", r.srFirst.ToString(), + "last", r.srNewest.ToString(), + "current", srDataCopy.ToString(), + "calculatedFirst", calculatedClockRateFromFirst, + "calculatedLast", calculatedClockRateFromLast, + ) + } + } + if r.srNewest != nil && srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt { // This can happen when a track is replaced with a null and then restored - // i. e. muting replacing with null and unmute restoring the original track. 
@@ -317,14 +334,8 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) // Resetting will ensure sample rate calculations do not go haywire due to negative time. r.logger.Infow( "received sender report, out-of-order, resetting", - "prevTSExt", r.srNewest.RTPTimestampExt, - "prevRTP", r.srNewest.RTPTimestamp, - "prevNTP", r.srNewest.NTPTimestamp.Time().String(), - "prevAt", r.srNewest.At.String(), - "currTSExt", srDataCopy.RTPTimestampExt, - "currRTP", srDataCopy.RTPTimestamp, - "currNTP", srDataCopy.NTPTimestamp.Time().String(), - "currentAt", srDataCopy.At.String(), + "last", r.srNewest.ToString(), + "current", srDataCopy.ToString(), ) r.srFirst = nil } From 0d7477178e2b7d326b0337c8eca8298f6ea848e2 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 20 Oct 2023 00:44:39 +0530 Subject: [PATCH 2/9] More fine grained filtering NACKs after a key frame. (#2159) * More fine grained filtering NACKs after a key frame. There are applications with periodic key frame. So, a packet lost before a key frame will not be retransmitted. But, decoder could wait (jitter buffer, play out time) and cause a stutter. Idea behind disabling NACKs after key frame was another knob to throttle retransmission bit rate. But, with spaced out retransmissions and max retransmissions per sequence number, there are throttles. This would provide more throttling, but affects some applications. So, disabling filtering NACKs after a key frame. Introducing another flag to disallow layers. This would still be quite useful, i. e. under congestion the stream allocator would move the target lower. But, because of congestion, higher layer would have lost a bunch of packets. Client would NACK those. Retransmitting those higher layer packets would congest the channel more. The new flag (default enabled) would disallow higher layers retransmission. This was happening before this change also, just splitting out the flag for more control. 
* split flag --- pkg/sfu/buffer/rtpstats_receiver.go | 4 +++- pkg/sfu/forwarder.go | 27 ++++++++++++++------------- pkg/sfu/receiver.go | 5 ++--- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index fe906eb98..164c0928b 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -315,7 +315,9 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()).Seconds() rtpDiffSinceFirst := srDataCopy.RTPTimestampExt - r.srFirst.RTPTimestampExt calculatedClockRateFromFirst := float64(rtpDiffSinceFirst) / timeSinceFirst - if math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate) || math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate) { + + if (timeSinceLast > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate)) || + (timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) { r.logger.Infow( "clock rate skew", "first", r.srFirst.ToString(), diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index cebb5704b..ea6ea29c1 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -38,7 +38,8 @@ import ( // Forwarder const ( FlagPauseOnDowngrade = true - FlagFilterRTX = true + FlagFilterRTX = false + FlagFilterRTXLayers = true TransitionCostSpatial = 10 ResumeBehindThresholdSeconds = float64(0.2) // 200ms @@ -1399,15 +1400,14 @@ func (f *Forwarder) CheckSync() (locked bool, layer int32) { } func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [buffer.DefaultMaxLayerSpatial + 1]bool) { - if !FlagFilterRTX { - filtered = nacks - return - } - f.lock.RLock() defer f.lock.RUnlock() - filtered = 
f.rtpMunger.FilterRTX(nacks) + if !FlagFilterRTX { + filtered = nacks + } else { + filtered = f.rtpMunger.FilterRTX(nacks) + } // // Curb RTX when deficient for two cases @@ -1417,14 +1417,15 @@ func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLaye // // Without the curb, when congestion hits, RTX rate could be so high that it further congests the channel. // - currentLayer := f.vls.GetCurrent() - targetLayer := f.vls.GetTarget() - for layer := int32(0); layer < buffer.DefaultMaxLayerSpatial+1; layer++ { - if f.isDeficientLocked() && (targetLayer.Spatial < currentLayer.Spatial || layer > currentLayer.Spatial) { - disallowedLayers[layer] = true + if FlagFilterRTXLayers { + currentLayer := f.vls.GetCurrent() + targetLayer := f.vls.GetTarget() + for layer := int32(0); layer < buffer.DefaultMaxLayerSpatial+1; layer++ { + if f.isDeficientLocked() && (targetLayer.Spatial < currentLayer.Spatial || layer > currentLayer.Spatial) { + disallowedLayers[layer] = true + } } } - return } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index e7ba1fb93..560a9246d 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -210,9 +210,6 @@ func NewWebRTCReceiver( isRED: IsRedCodec(track.Codec().MimeType), } - w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig) - w.streamTrackerManager.SetListener(w) - for _, opt := range opts { w = opt(w) } @@ -235,6 +232,8 @@ func NewWebRTCReceiver( }) w.connectionStats.Start(w.trackInfo) + w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig) + w.streamTrackerManager.SetListener(w) // SVC-TODO: Handle DD for non-SVC cases??? 
if w.isSVC { for _, ext := range receiver.GetParameters().HeaderExtensions { From 43a0ca57b53a87df0f0fe11499054fa692b32b31 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 20 Oct 2023 12:13:29 +0530 Subject: [PATCH 3/9] Clear flags in packet metadata cache before setting them. (#2160) Not sure if this could have resulted in bad FPS calculation, but could have contributed to it. --- pkg/sfu/buffer/rtpstats_sender.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index b1c4c5df9..2f3bcd426 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -844,6 +844,7 @@ func (r *RTPStatsSender) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrS snInfo := &r.snInfos[slot] snInfo.pktSize = pktSize snInfo.hdrSize = hdrSize + snInfo.flags = 0 if marker { snInfo.flags |= snInfoFlagMarker } From 5bf2e5fd4a62909aa684c68659655d7c27f3e9c0 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 20 Oct 2023 23:06:34 +0530 Subject: [PATCH 4/9] Log clock deviations in sender report. (#2161) Seeing some unexplained jumps in sender report time stamp in canary. Wonder if the calculated clock rate is way off during some interval. Logging clock deviations to understand better. 
--- pkg/sfu/buffer/rtpstats_sender.go | 53 ++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 2f3bcd426..49892afdb 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -17,6 +17,7 @@ package buffer import ( "errors" "fmt" + "math" "time" "github.com/pion/rtcp" @@ -593,6 +594,37 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui } } + srData := &RTCPSenderReportData{ + NTPTimestamp: nowNTP, + RTPTimestamp: nowRTP, + RTPTimestampExt: nowRTPExt, + At: now, + } + if r.srNewest != nil { + timeSinceLastReport := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()).Seconds() + rtpDiffSinceLastReport := nowRTPExt - r.srNewest.RTPTimestampExt + windowClockRate := float64(rtpDiffSinceLastReport) / timeSinceLastReport + if timeSinceLastReport > 0.2 && math.Abs(float64(r.params.ClockRate)-windowClockRate) > 0.2*float64(r.params.ClockRate) { + r.logger.Infow( + "sending sender report, clock skew", + "last", r.srNewest.ToString(), + "curr", srData.ToString(), + "timeNow", time.Now().String(), + "extHighestTS", r.extHighestTS, + "highestTime", r.highestTime.String(), + "timeSinceHighest", timeSinceHighest.String(), + "firstTime", r.firstTime.String(), + "timeSinceFirst", timeSinceFirst.String(), + "nowRTPExtUsingTime", nowRTPExtUsingTime, + "calculatedClockRate", calculatedClockRate, + "nowRTPExtUsingRate", nowRTPExtUsingRate, + "timeSinceLastReport", timeSinceLastReport, + "rtpDiffSinceLastReport", rtpDiffSinceLastReport, + "windowClockRate", windowClockRate, + ) + } + } + if r.srNewest != nil && nowRTPExt < r.srNewest.RTPTimestampExt { // If report being generated is behind, use the time difference and // clock rate of codec to produce next report. 
@@ -607,18 +639,14 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui // That could end up behind the last report's timestamp in extreme cases r.logger.Infow( "sending sender report, out-of-order, repairing", - "prevTSExt", r.srNewest.RTPTimestampExt, - "prevRTP", r.srNewest.RTPTimestamp, - "prevNTP", r.srNewest.NTPTimestamp.Time().String(), - "extHighestTS", r.extHighestTS, - "currTSExt", nowRTPExt, - "currRTP", nowRTP, - "currNTP", nowNTP.Time().String(), + "last", r.srNewest.ToString(), + "curr", srData.ToString(), "timeNow", time.Now().String(), - "firstTime", r.firstTime.String(), - "timeSinceFirst", timeSinceFirst.String(), + "extHighestTS", r.extHighestTS, "highestTime", r.highestTime.String(), "timeSinceHighest", timeSinceHighest.String(), + "firstTime", r.firstTime.String(), + "timeSinceFirst", timeSinceFirst.String(), "nowRTPExtUsingTime", nowRTPExtUsingTime, "calculatedClockRate", calculatedClockRate, "nowRTPExtUsingRate", nowRTPExtUsingRate, @@ -628,12 +656,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui nowRTP = uint32(nowRTPExt) } - r.srNewest = &RTCPSenderReportData{ - NTPTimestamp: nowNTP, - RTPTimestamp: nowRTP, - RTPTimestampExt: nowRTPExt, - At: now, - } + r.srNewest = srData if r.srFirst == nil { r.srFirst = r.srNewest } From 0407eb4833f35ec77c1e272bb519dd6eff8a5e49 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 21 Oct 2023 01:37:30 +0530 Subject: [PATCH 5/9] Log audio packets in forwarding path. (#2162) Seeing a time stamp jump that I am not able to explain. Basically, it looks like the time stamp doubles at some point. There is no code which doubles the timestamp. Can understand an erroneous roll over/wrap around, but doubling is very strange. So, logging only audio packets. Will disable as soon as I have some samples from canary.
--- pkg/sfu/buffer/rtpstats_sender.go | 2 ++ pkg/sfu/downtrack.go | 8 ++++++++ pkg/sfu/forwarder.go | 2 +- pkg/sfu/rtpmunger.go | 3 +++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 49892afdb..47bf2411b 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -610,6 +610,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui "last", r.srNewest.ToString(), "curr", srData.ToString(), "timeNow", time.Now().String(), + "extStartTS", r.extStartTS, "extHighestTS", r.extHighestTS, "highestTime", r.highestTime.String(), "timeSinceHighest", timeSinceHighest.String(), @@ -642,6 +643,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui "last", r.srNewest.ToString(), "curr", srData.ToString(), "timeNow", time.Now().String(), + "extStartTS", r.extStartTS, "extHighestTS", r.extHighestTS, "highestTime", r.highestTime.String(), "timeSinceHighest", timeSinceHighest.String(), diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index b26080526..c37cb7b21 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -743,6 +743,14 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { Pool: PacketFactory, PoolEntity: poolEntity, }) + if d.kind == webrtc.RTPCodecTypeAudio { + d.params.Logger.Infow("forwarding debug", + "incomingSN", extPkt.ExtSequenceNumber, + "outgoingSN", tp.rtp.extSequenceNumber, + "incomingTS", extPkt.ExtTimestamp, + "outgoingTS", tp.rtp.extTimestamp, + ) // TODO-REMOVE-AFTER-DEBUG + } return nil } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index ea6ea29c1..b174a2e9a 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1479,7 +1479,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e } logTransition := func(message string, extExpectedTS, extRefTS, extLastTS uint64, diffSeconds float64) { - 
f.logger.Debugw( + f.logger.Infow( message, "layer", layer, "extExpectedTS", extExpectedTS, diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 31c415f12..636375d42 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -179,6 +179,9 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara extMungedSN := extPkt.ExtSequenceNumber - r.snOffset extMungedTS := extPkt.ExtTimestamp - r.tsOffset + if extMungedTS > (r.extLastTS + 48000) { + r.logger.Infow("large jump in ts", "lastTS", r.extLastTS, "incomingTS", extPkt.ExtTimestamp, "mungedTS", extMungedTS, "tsOffset", r.tsOffset) // TODO-REMOVE-AFTER-DEBUG + } r.extSecondLastSN = r.extLastSN r.extLastSN = extMungedSN From 4f8bbdbaaba0c3c10a3cffdef7a88255c4a1a0e1 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 21 Oct 2023 01:47:50 +0530 Subject: [PATCH 6/9] Keeping revert of debug logs ready (#2163) --- pkg/sfu/downtrack.go | 8 -------- pkg/sfu/forwarder.go | 2 +- pkg/sfu/rtpmunger.go | 3 --- 3 files changed, 1 insertion(+), 12 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index c37cb7b21..b26080526 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -743,14 +743,6 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { Pool: PacketFactory, PoolEntity: poolEntity, }) - if d.kind == webrtc.RTPCodecTypeAudio { - d.params.Logger.Infow("forwarding debug", - "incomingSN", extPkt.ExtSequenceNumber, - "outgoingSN", tp.rtp.extSequenceNumber, - "incomingTS", extPkt.ExtTimestamp, - "outgoingTS", tp.rtp.extTimestamp, - ) // TODO-REMOVE-AFTER-DEBUG - } return nil } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index b174a2e9a..ea6ea29c1 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1479,7 +1479,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e } logTransition := func(message string, extExpectedTS, extRefTS, extLastTS uint64, diffSeconds float64) { - 
f.logger.Infow( + f.logger.Debugw( message, "layer", layer, "extExpectedTS", extExpectedTS, diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 636375d42..31c415f12 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -179,9 +179,6 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara extMungedSN := extPkt.ExtSequenceNumber - r.snOffset extMungedTS := extPkt.ExtTimestamp - r.tsOffset - if extMungedTS > (r.extLastTS + 48000) { - r.logger.Infow("large jump in ts", "lastTS", r.extLastTS, "incomingTS", extPkt.ExtTimestamp, "mungedTS", extMungedTS, "tsOffset", r.tsOffset) // TODO-REMOVE-AFTER-DEBUG - } r.extSecondLastSN = r.extLastSN r.extLastSN = extMungedSN From 39edfab2b5c6471c7b132bd85f52e606fdab5f95 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 21 Oct 2023 02:25:03 +0530 Subject: [PATCH 7/9] Fix extended TS calculated during retransmit. (#2164) May have caused the large time stamp jump in sender reports. --- pkg/sfu/sequencer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index 7dad24aea..ee6ed8bdb 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -286,7 +286,7 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { meta.nacked++ meta.lastNack = refTime - extTS := uint64(meta.timestamp) + (s.extHighestTS & 0xFFFF_FFFF_FFFF_0000) + extTS := uint64(meta.timestamp) + (s.extHighestTS & 0xFFFF_FFFF_0000_0000) if meta.timestamp > highestTS { extTS -= (1 << 32) } From b591c56aa340f03684761c89a0c2fa13db2b1f8a Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 21 Oct 2023 10:26:30 +0530 Subject: [PATCH 8/9] Logging reduction. (#2165) Move some to Debugw and add sampling for a few. 
--- pkg/sfu/buffer/rtpstats_base.go | 4 +- pkg/sfu/buffer/rtpstats_receiver.go | 40 ++++++---- pkg/sfu/buffer/rtpstats_sender.go | 117 ++++++++++++++++------------ pkg/sfu/downtrack.go | 10 +-- pkg/sfu/forwarder.go | 9 +-- 5 files changed, 101 insertions(+), 79 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 11b89b9cc..3dac1c4af 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -539,8 +539,8 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes packetsExpected := now.extStartSN - then.extStartSN if packetsExpected > cNumSequenceNumbers { - r.logger.Errorw( - "too many packets expected in delta", nil, + r.logger.Infow( + "too many packets expected in delta", "startSN", then.extStartSN, "endSN", now.extStartSN, "packetsExpected", packetsExpected, diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 164c0928b..1272a7424 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -52,6 +52,9 @@ type RTPStatsReceiver struct { timestamp *utils.WrapAround[uint32, uint64] history *protoutils.Bitmap[uint64] + + clockSkewCount int + outOfOrderSsenderReportCount int } func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver { @@ -111,7 +114,7 @@ func (r *RTPStatsReceiver) Update( r.snapshots[i] = r.initSnapshot(r.startTime, r.sequenceNumber.GetExtendedStart()) } - r.logger.Infow( + r.logger.Debugw( "rtp receiver stream start", "startTime", r.startTime.String(), "firstTime", r.firstTime.String(), @@ -318,14 +321,18 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) if (timeSinceLast > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate)) || (timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) { - r.logger.Infow( - "clock 
rate skew", - "first", r.srFirst.ToString(), - "last", r.srNewest.ToString(), - "current", srDataCopy.ToString(), - "calculatedFirst", calculatedClockRateFromFirst, - "calculatedLast", calculatedClockRateFromLast, - ) + if r.clockSkewCount%10 == 0 { + r.logger.Infow( + "clock rate skew", + "first", r.srFirst.ToString(), + "last", r.srNewest.ToString(), + "current", srDataCopy.ToString(), + "calculatedFirst", calculatedClockRateFromFirst, + "calculatedLast", calculatedClockRateFromLast, + "count", r.clockSkewCount, + ) + } + r.clockSkewCount++ } } @@ -334,11 +341,16 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) // i. e. muting replacing with null and unmute restoring the original track. // Under such a condition reset the sender reports to start from this point. // Resetting will ensure sample rate calculations do not go haywire due to negative time. - r.logger.Infow( - "received sender report, out-of-order, resetting", - "last", r.srNewest.ToString(), - "current", srDataCopy.ToString(), - ) + if r.outOfOrderSsenderReportCount%10 == 0 { + r.logger.Infow( + "received sender report, out-of-order, resetting", + "last", r.srNewest.ToString(), + "current", srDataCopy.ToString(), + "count", r.outOfOrderSsenderReportCount, + ) + } + r.outOfOrderSsenderReportCount++ + r.srFirst = nil } diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 47bf2411b..269e448e2 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -156,6 +156,10 @@ type RTPStatsSender struct { nextSenderSnapshotID uint32 senderSnapshots []senderSnapshot + + clockSkewCount int + outOfOrderSenderReportCount int + metadataCacheOverflowCount int } func NewRTPStatsSender(params RTPStatsParams) *RTPStatsSender { @@ -265,7 +269,7 @@ func (r *RTPStatsSender) Update( r.senderSnapshots[i] = r.initSenderSnapshot(r.startTime, r.extStartSN) } - r.logger.Infow( + r.logger.Debugw( "rtp sender stream start", 
"startTime", r.startTime.String(), "firstTime", r.firstTime.String(), @@ -510,22 +514,26 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt eis := &s.intervalStats eis.aggregate(&is) if is.packetsNotFound != 0 { - r.logger.Warnw( - "potential sequence number de-sync", nil, - "lastRRTime", r.lastRRTime.String(), - "lastRR", r.lastRR, - "sinceLastRR", time.Since(r.lastRRTime).String(), - "receivedRR", rr, - "extStartSN", r.extStartSN, - "extHighestSN", r.extHighestSN, - "extLastRRSN", s.extLastRRSN, - "extReceivedRRSN", extReceivedRRSN, - "packetsInInterval", extReceivedRRSN-s.extLastRRSN, - "intervalStats", is.ToString(), - "aggregateIntervalStats", eis.ToString(), - "extHighestSNFromRR", r.extHighestSNFromRR, - "packetsLostFromRR", r.packetsLostFromRR, - ) + if r.metadataCacheOverflowCount%10 == 0 { + r.logger.Infow( + "metadata cache overflow", + "lastRRTime", r.lastRRTime.String(), + "lastRR", r.lastRR, + "sinceLastRR", time.Since(r.lastRRTime).String(), + "receivedRR", rr, + "extStartSN", r.extStartSN, + "extHighestSN", r.extHighestSN, + "extLastRRSN", s.extLastRRSN, + "extReceivedRRSN", extReceivedRRSN, + "packetsInInterval", extReceivedRRSN-s.extLastRRSN, + "intervalStats", is.ToString(), + "aggregateIntervalStats", eis.ToString(), + "extHighestSNFromRR", r.extHighestSNFromRR, + "packetsLostFromRR", r.packetsLostFromRR, + "count", r.metadataCacheOverflowCount, + ) + } + r.metadataCacheOverflowCount++ } s.extLastRRSN = extReceivedRRSN } @@ -605,24 +613,28 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui rtpDiffSinceLastReport := nowRTPExt - r.srNewest.RTPTimestampExt windowClockRate := float64(rtpDiffSinceLastReport) / timeSinceLastReport if timeSinceLastReport > 0.2 && math.Abs(float64(r.params.ClockRate)-windowClockRate) > 0.2*float64(r.params.ClockRate) { - r.logger.Infow( - "sending sender report, clock skew", - "last", r.srNewest.ToString(), - "curr", srData.ToString(), - "timeNow", 
time.Now().String(), - "extStartTS", r.extStartTS, - "extHighestTS", r.extHighestTS, - "highestTime", r.highestTime.String(), - "timeSinceHighest", timeSinceHighest.String(), - "firstTime", r.firstTime.String(), - "timeSinceFirst", timeSinceFirst.String(), - "nowRTPExtUsingTime", nowRTPExtUsingTime, - "calculatedClockRate", calculatedClockRate, - "nowRTPExtUsingRate", nowRTPExtUsingRate, - "timeSinceLastReport", timeSinceLastReport, - "rtpDiffSinceLastReport", rtpDiffSinceLastReport, - "windowClockRate", windowClockRate, - ) + if r.clockSkewCount%10 == 0 { + r.logger.Infow( + "sending sender report, clock skew", + "last", r.srNewest.ToString(), + "curr", srData.ToString(), + "timeNow", time.Now().String(), + "extStartTS", r.extStartTS, + "extHighestTS", r.extHighestTS, + "highestTime", r.highestTime.String(), + "timeSinceHighest", timeSinceHighest.String(), + "firstTime", r.firstTime.String(), + "timeSinceFirst", timeSinceFirst.String(), + "nowRTPExtUsingTime", nowRTPExtUsingTime, + "calculatedClockRate", calculatedClockRate, + "nowRTPExtUsingRate", nowRTPExtUsingRate, + "timeSinceLastReport", timeSinceLastReport, + "rtpDiffSinceLastReport", rtpDiffSinceLastReport, + "windowClockRate", windowClockRate, + "count", r.clockSkewCount, + ) + } + r.clockSkewCount++ } } @@ -638,21 +650,26 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui // result in this module not having calculated clock rate of publisher side. // - When the above happens, current will be generated using highestTS which could be behind. 
// That could end up behind the last report's timestamp in extreme cases - r.logger.Infow( - "sending sender report, out-of-order, repairing", - "last", r.srNewest.ToString(), - "curr", srData.ToString(), - "timeNow", time.Now().String(), - "extStartTS", r.extStartTS, - "extHighestTS", r.extHighestTS, - "highestTime", r.highestTime.String(), - "timeSinceHighest", timeSinceHighest.String(), - "firstTime", r.firstTime.String(), - "timeSinceFirst", timeSinceFirst.String(), - "nowRTPExtUsingTime", nowRTPExtUsingTime, - "calculatedClockRate", calculatedClockRate, - "nowRTPExtUsingRate", nowRTPExtUsingRate, - ) + if r.outOfOrderSenderReportCount%10 == 0 { + r.logger.Infow( + "sending sender report, out-of-order, repairing", + "last", r.srNewest.ToString(), + "curr", srData.ToString(), + "timeNow", time.Now().String(), + "extStartTS", r.extStartTS, + "extHighestTS", r.extHighestTS, + "highestTime", r.highestTime.String(), + "timeSinceHighest", timeSinceHighest.String(), + "firstTime", r.firstTime.String(), + "timeSinceFirst", timeSinceFirst.String(), + "nowRTPExtUsingTime", nowRTPExtUsingTime, + "calculatedClockRate", calculatedClockRate, + "nowRTPExtUsingRate", nowRTPExtUsingRate, + "count", r.outOfOrderSenderReportCount, + ) + } + r.outOfOrderSenderReportCount++ + ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate)) nowRTP = uint32(nowRTPExt) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index b26080526..880f047e8 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -672,7 +672,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { tp, err := d.forwarder.GetTranslationParams(extPkt, layer) if tp.shouldDrop { if err != nil { - d.params.Logger.Errorw("write rtp packet failed", err) + d.params.Logger.Errorw("could not get translation params", err) } return err } @@ -692,7 +692,7 @@ func (d *DownTrack) 
WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { hdr, err := d.getTranslatedRTPHeader(extPkt, tp) if err != nil { - d.params.Logger.Errorw("write rtp packet failed", err) + d.params.Logger.Errorw("could not get translated RTP header", err) if poolEntity != nil { PacketFactory.Put(poolEntity) } @@ -1447,7 +1447,7 @@ func (d *DownTrack) getH264BlankFrame(_frameEndNeeded bool) ([]byte, error) { func (d *DownTrack) handleRTCP(bytes []byte) { pkts, err := rtcp.Unmarshal(bytes) if err != nil { - d.params.Logger.Errorw("unmarshal rtcp receiver packets err", err) + d.params.Logger.Errorw("could not unmarshal rtcp receiver packets", err) return } @@ -1611,7 +1611,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { var pkt rtp.Packet if err = pkt.Unmarshal(pktBuff[:n]); err != nil { - d.params.Logger.Errorw("unmarshalling rtp packet failed in retransmit", err) + d.params.Logger.Errorw("could not unmarshal rtp packet in retransmit", err) continue } pkt.Header.Marker = epm.marker @@ -1625,7 +1625,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { if d.mime == "video/vp8" && len(pkt.Payload) > 0 && len(epm.codecBytes) != 0 { var incomingVP8 buffer.VP8 if err = incomingVP8.Unmarshal(pkt.Payload); err != nil { - d.params.Logger.Errorw("unmarshalling VP8 packet err", err) + d.params.Logger.Errorw("could not unmarshal VP8 packet", err) PacketFactory.Put(poolEntity) continue } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index ea6ea29c1..f37bea03f 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1581,14 +1581,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e extNextTS = extExpectedTS } else if diffSeconds > ResumeBehindHighTresholdSeconds { // could be due to incorrect reference calculation - f.logger.Infow( - "resume, reference very far behind", - "layer", layer, - "extExpectedTS", extExpectedTS, - "extRefTS", extRefTS, - "extLastTS", extLastTS, - "diffSeconds", diffSeconds, - ) + 
logTransition("resume, reference very far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) extNextTS = extExpectedTS } else { extNextTS = extRefTS From 3e9450c7747b8cab0067c8132ac7817c915aee5f Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 21 Oct 2023 11:02:34 +0530 Subject: [PATCH 9/9] Log more details in warns. (#2166) Logging more details in warns so that we do not have to enable Infow for some logs later. --- pkg/sfu/buffer/rtpstats_receiver.go | 12 ++++++++++++ pkg/sfu/buffer/rtpstats_sender.go | 12 ++++++++++++ pkg/sfu/rtpmunger.go | 2 +- 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 1272a7424..77300f7ab 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -157,6 +157,12 @@ func (r *RTPStatsReceiver) Update( if -gapSN >= cNumSequenceNumbers { r.logger.Warnw( "large sequence number gap negative", nil, + "extStartSN", r.sequenceNumber.GetExtendedStart(), + "extHighestSN", r.sequenceNumber.GetExtendedHighest(), + "extStartTS", r.timestamp.GetExtendedStart(), + "extHighestTS", r.timestamp.GetExtendedHighest(), + "firstTime", r.firstTime.String(), + "highestTime", r.highestTime.String(), "prev", resSN.PreExtendedHighest, "curr", resSN.ExtendedVal, "gap", gapSN, @@ -223,6 +229,12 @@ func (r *RTPStatsReceiver) Update( if gapSN >= cNumSequenceNumbers { r.logger.Warnw( "large sequence number gap", nil, + "extStartSN", r.sequenceNumber.GetExtendedStart(), + "extHighestSN", r.sequenceNumber.GetExtendedHighest(), + "extStartTS", r.timestamp.GetExtendedStart(), + "extHighestTS", r.timestamp.GetExtendedHighest(), + "firstTime", r.firstTime.String(), + "highestTime", r.highestTime.String(), "prev", resSN.PreExtendedHighest, "curr", resSN.ExtendedVal, "gap", gapSN, diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 269e448e2..59e01a831 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go 
+++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -289,6 +289,12 @@ func (r *RTPStatsSender) Update( if -gapSN >= cNumSequenceNumbers { r.logger.Warnw( "large sequence number gap negative", nil, + "extStartSN", r.extStartSN, + "extHighestSN", r.extHighestSN, + "extStartTS", r.extStartTS, + "extHighestTS", r.extHighestTS, + "firstTime", r.firstTime.String(), + "highestTime", r.highestTime.String(), "prev", r.extHighestSN, "curr", extSequenceNumber, "gap", gapSN, @@ -349,6 +355,12 @@ func (r *RTPStatsSender) Update( if gapSN >= cNumSequenceNumbers { r.logger.Warnw( "large sequence number gap", nil, + "extStartSN", r.extStartSN, + "extHighestSN", r.extHighestSN, + "extStartTS", r.extStartTS, + "extHighestTS", r.extHighestTS, + "firstTime", r.firstTime.String(), + "highestTime", r.highestTime.String(), "prev", r.extHighestSN, "curr", extSequenceNumber, "gap", gapSN, diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 31c415f12..1fcda126e 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -217,7 +217,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara r.logger.Errorw( "unexpected packet ordering", nil, "extIncomingSN", extPkt.ExtSequenceNumber, - "extHighestIncominSN", r.extHighestIncomingSN, + "extHighestIncomingSN", r.extHighestIncomingSN, "extLastSN", r.extLastSN, "snOffsetIncoming", snOffset, "snOffsetHighest", r.snOffset,