From 9674ac48ab4657796bb5b9bb0e421d5b7723d30e Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 1 Apr 2026 21:17:43 +0530 Subject: [PATCH] Cleaning up some logs and standardising log frequency. (#4420) Removing some logs which have not been useful in terms of insights other than saying that there are a bunch of packets missing. Going to start looking at gaps in terms of time if the inter-packet gap is too high. Also, using logging these events as first 20 and then every 200. --- pkg/sfu/forwarder.go | 79 ++++++---------------- pkg/sfu/rtpstats/rtpstats_base.go | 2 - pkg/sfu/rtpstats/rtpstats_base_lite.go | 4 ++ pkg/sfu/rtpstats/rtpstats_receiver.go | 91 +++++--------------------- pkg/sfu/rtpstats/rtpstats_sender.go | 58 ++-------------- 5 files changed, 44 insertions(+), 190 deletions(-) diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index f96413005..5779945ad 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1795,23 +1795,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "rtpStats", f.rtpStats, ) } - // TODO-REMOVE-AFTER-DATA-COLLECTION - logTransitionInfo := func(message string, extExpectedTS, extRefTS, extLastTS uint64, diffSeconds float64) { - f.logger.Infow( - message, - "layer", layer, - "referenceLayerSpatial", f.referenceLayerSpatial, - "extExpectedTS", extExpectedTS, - "incomingTS", extPkt.Packet.Timestamp, - "extIncomingTS", extPkt.ExtTimestamp, - "extRefTS", extRefTS, - "extLastTS", extLastTS, - "diffSeconds", math.Abs(diffSeconds), - "refInfos", logger.ObjectSlice(f.refInfos[:]), - "lastSwitchExtIncomingTS", f.lastSwitchExtIncomingTS, - "rtpStats", f.rtpStats, - ) - } // Compute how much time passed between the previous forwarded packet // and the current incoming (to be forwarded) packet and calculate @@ -1891,7 +1874,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e } } - bigJump := false var extNextTS uint64 if f.lastSSRC == 0 { // If resuming (e. g. on unmute), keep next timestamp close to expected timestamp. @@ -1917,14 +1899,12 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e diffSeconds := float64(int64(extExpectedTS-extRefTS)) / float64(f.clockRate) if diffSeconds >= 0.0 { if f.resumeBehindThreshold > 0 && diffSeconds > f.resumeBehindThreshold { - logTransitionInfo("resume, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) + logTransition("resume, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) extNextTS = extExpectedTS - bigJump = true } else if diffSeconds > ResumeBehindHighThresholdSeconds { // could be due to incoming time stamp lagging a lot, like an unpause of the track - logTransitionInfo("resume, reference very far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) + logTransition("resume, reference very far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) extNextTS = extExpectedTS - bigJump = true } else { extNextTS = extRefTS } @@ -1966,44 +1946,23 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // nominal increase extNextTS = extLastTS + 1 } - if bigJump { // TODO-REMOVE-AFTER-DATA-COLLECTION - f.logger.Infow( - "next timestamp on switch", - "switchingAt", switchingAt, - "layer", layer, - "extLastTS", extLastTS, - "lastMarker", rtpMungerState.LastMarker, - "extRefTS", extRefTS, - "dummyStartTSOffset", f.dummyStartTSOffset, - "referenceLayerSpatial", f.referenceLayerSpatial, - "extExpectedTS", extExpectedTS, - "extNextTS", extNextTS, - "tsJump", extNextTS-extLastTS, - "nextSN", rtpMungerState.ExtLastSequenceNumber+1, - "extIncomingSN", extPkt.ExtSequenceNumber, - "incomingTS", extPkt.Packet.Timestamp, - "extIncomingTS", extPkt.ExtTimestamp, - "rtpStats", f.rtpStats, - ) - } else { - f.logger.Debugw( - "next timestamp on switch", - "switchingAt", switchingAt, - "layer", layer, - "extLastTS", extLastTS, - "lastMarker", rtpMungerState.LastMarker, - "extRefTS", extRefTS, - "dummyStartTSOffset", f.dummyStartTSOffset, - "referenceLayerSpatial", f.referenceLayerSpatial, - "extExpectedTS", extExpectedTS, - "extNextTS", extNextTS, - "tsJump", extNextTS-extLastTS, - "nextSN", rtpMungerState.ExtLastSequenceNumber+1, - "extIncomingSN", extPkt.ExtSequenceNumber, - "extIncomingTS", extPkt.ExtTimestamp, - "rtpStats", f.rtpStats, - ) - } + f.logger.Debugw( + "next timestamp on switch", + "switchingAt", switchingAt, + "layer", layer, + "extLastTS", extLastTS, + "lastMarker", rtpMungerState.LastMarker, + "extRefTS", extRefTS, + "dummyStartTSOffset", f.dummyStartTSOffset, + "referenceLayerSpatial", f.referenceLayerSpatial, + "extExpectedTS", extExpectedTS, + "extNextTS", extNextTS, + "tsJump", extNextTS-extLastTS, + "nextSN", rtpMungerState.ExtLastSequenceNumber+1, + "extIncomingSN", extPkt.ExtSequenceNumber, + "extIncomingTS", extPkt.ExtTimestamp, + "rtpStats", f.rtpStats, + ) f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, extNextTS-extLastTS) f.codecMunger.UpdateOffsets(extPkt) diff --git a/pkg/sfu/rtpstats/rtpstats_base.go b/pkg/sfu/rtpstats/rtpstats_base.go index c06043559..9c85a9494 100644 --- a/pkg/sfu/rtpstats/rtpstats_base.go +++ b/pkg/sfu/rtpstats/rtpstats_base.go @@ -31,8 +31,6 @@ import ( const ( cFirstPacketTimeAdjustWindow = 2 * time.Minute cFirstPacketTimeAdjustThreshold = 15 * 1e9 - - cSequenceNumberLargeJumpThreshold = 100 ) // ------------------------------------------------------- diff --git a/pkg/sfu/rtpstats/rtpstats_base_lite.go b/pkg/sfu/rtpstats/rtpstats_base_lite.go index b67ab245b..7af64315a 100644 --- a/pkg/sfu/rtpstats/rtpstats_base_lite.go +++ b/pkg/sfu/rtpstats/rtpstats_base_lite.go @@ -550,4 +550,8 @@ func getPacketsExpected(extStartSN, extHighestSN uint64) uint64 { return extHighestSN - extStartSN + 1 } +func shouldLog(count int) bool { + return count < 20 || count%200 == 0 +} + // ---------------------------------- diff --git a/pkg/sfu/rtpstats/rtpstats_receiver.go b/pkg/sfu/rtpstats/rtpstats_receiver.go index 237d2b1f7..297224402 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver.go @@ -185,8 +185,6 @@ type RTPStatsReceiver struct { clockSkewCount int clockSkewMediaPathCount int outOfOrderSenderReportCount int - largeJumpCount int - largeJumpNegativeCount int timeReversedCount int packetsDroppedPreStartTimestamp int @@ -311,7 +309,6 @@ func (r *RTPStatsReceiver) Update( r.undoUpdatesLocked(resSN, resTS) r.packetsDroppedPreStartTimestamp++ - rulf := &receiverUpdateLoggingFields{ packetTime: packetTime, sequenceNumber: sequenceNumber, @@ -330,7 +327,9 @@ func (r *RTPStatsReceiver) Update( timeSinceHighest: timeSinceHighest, rtpStats: r, } - r.logger.Warnw("dropping packet, pre-start timestamp", nil, zap.Inline(rulf)) + if shouldLog(r.packetsDroppedPreStartTimestamp) { + r.logger.Warnw("dropping packet, pre-start timestamp", nil, zap.Inline(rulf)) + } if r.maybeRestart(sequenceNumber, timestamp, payloadSize) { r.logger.Infow("potential restart", zap.Inline(rulf)) @@ -381,7 +380,6 @@ func (r *RTPStatsReceiver) Update( r.undoUpdatesLocked(resSN, resTS) r.packetsDroppedOldTimestamp++ - rulf := &receiverUpdateLoggingFields{ packetTime: packetTime, sequenceNumber: sequenceNumber, @@ -400,7 +398,9 @@ func (r *RTPStatsReceiver) Update( timeSinceHighest: timeSinceHighest, rtpStats: r, } - r.logger.Warnw("dropping packet, old timestamp", nil, zap.Inline(rulf)) + if shouldLog(r.packetsDroppedOldTimestamp) { + r.logger.Warnw("dropping packet, old timestamp", nil, zap.Inline(rulf)) + } if r.maybeRestart(sequenceNumber, timestamp, payloadSize) { r.logger.Infow("potential restart", zap.Inline(rulf)) @@ -418,9 +418,9 @@ func (r *RTPStatsReceiver) Update( if gapTS < 0 && gapSN > 0 { r.undoUpdatesLocked(resSN, resTS) - r.packetsDroppedOldSequenceNumber++ expectedTSJump = int64(r.rtpConverter.ToRTPExt(time.Duration(timeSinceHighest))) + r.packetsDroppedOldSequenceNumber++ rulf := &receiverUpdateLoggingFields{ packetTime: packetTime, sequenceNumber: sequenceNumber, @@ -439,7 +439,9 @@ func (r *RTPStatsReceiver) Update( timeSinceHighest: timeSinceHighest, rtpStats: r, } - r.logger.Warnw("dropping packet, old sequence number", nil, zap.Inline(rulf)) + if shouldLog(r.packetsDroppedOldSequenceNumber) { + r.logger.Warnw("dropping packet, old sequence number", nil, zap.Inline(rulf)) + } if r.maybeRestart(sequenceNumber, timestamp, payloadSize) { r.logger.Infow("potential restart", zap.Inline(rulf)) @@ -488,7 +490,6 @@ func (r *RTPStatsReceiver) Update( r.undoUpdatesLocked(resSN, resTS) r.packetsDroppedPreStartSequenceNumber++ - rulf := &receiverUpdateLoggingFields{ packetTime: packetTime, sequenceNumber: sequenceNumber, @@ -507,7 +508,9 @@ func (r *RTPStatsReceiver) Update( timeSinceHighest: timeSinceHighest, rtpStats: r, } - r.logger.Warnw("dropping packet, pre-start sequence number", nil, zap.Inline(rulf)) + if shouldLog(r.packetsDroppedPreStartSequenceNumber) { + r.logger.Warnw("dropping packet, pre-start sequence number", nil, zap.Inline(rulf)) + } if r.maybeRestart(sequenceNumber, timestamp, payloadSize) { r.logger.Infow("potential restart", zap.Inline(rulf)) @@ -539,68 +542,10 @@ func (r *RTPStatsReceiver) Update( } flowState.IsOutOfOrder = true - - if !flowState.IsDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold { - r.largeJumpNegativeCount++ - if (r.largeJumpNegativeCount-1)%100 == 0 { - rulf := &receiverUpdateLoggingFields{ - packetTime: packetTime, - sequenceNumber: sequenceNumber, - timestamp: timestamp, - marker: marker, - hdrSize: hdrSize, - payloadSize: payloadSize, - paddingSize: paddingSize, - resSN: resSN, - gapSN: gapSN, - resTS: resTS, - gapTS: gapTS, - snRolloverCount: snRolloverCount, - expectedTSJump: expectedTSJump, - tsRolloverCount: tsRolloverCount, - timeSinceHighest: timeSinceHighest, - rtpStats: r, - } - r.logger.Warnw( - "large sequence number gap negative", nil, - zap.Inline(rulf), - "count", r.largeJumpNegativeCount, - ) - } - } } else { // in-order - if gapSN >= cSequenceNumberLargeJumpThreshold { - r.largeJumpCount++ - if (r.largeJumpCount-1)%100 == 0 { - rulf := &receiverUpdateLoggingFields{ - packetTime: packetTime, - sequenceNumber: sequenceNumber, - timestamp: timestamp, - marker: marker, - hdrSize: hdrSize, - payloadSize: payloadSize, - paddingSize: paddingSize, - resSN: resSN, - gapSN: gapSN, - resTS: resTS, - gapTS: gapTS, - snRolloverCount: snRolloverCount, - expectedTSJump: expectedTSJump, - tsRolloverCount: tsRolloverCount, - timeSinceHighest: timeSinceHighest, - rtpStats: r, - } - r.logger.Warnw( - "large sequence number gap", nil, - zap.Inline(rulf), - "count", r.largeJumpCount, - ) - } - } - if resTS.ExtendedVal < resTS.PreExtendedHighest && r.bytes > 0 { r.timeReversedCount++ - if (r.timeReversedCount-1)%100 == 0 { + if shouldLog(r.timeReversedCount) { rulf := &receiverUpdateLoggingFields{ packetTime: packetTime, sequenceNumber: sequenceNumber, @@ -721,7 +666,7 @@ func (r *RTPStatsReceiver) checkOutOfOrderSenderReport(srData *livekit.RTCPSende // Or it could be due bad report generation. // In any case, ignore out-of-order reports. r.outOfOrderSenderReportCount++ - if (r.outOfOrderSenderReportCount-1)%10 == 0 { + if shouldLog(r.outOfOrderSenderReportCount) { r.logger.Infow( "received sender report, out-of-order, skipping", "current", WrappedRTCPSenderReportStateLogger{srData}, @@ -755,7 +700,7 @@ func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *livekit.RTCP if (timeSinceLast > 0.2 && math.Abs(float64(r.clockRate)-calculatedClockRateFromLast) > 0.2*float64(r.clockRate)) || (timeSinceFirst > 0.2 && math.Abs(float64(r.clockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.clockRate)) { r.clockSkewCount++ - if (r.clockSkewCount-1)%100 == 0 { + if shouldLog(r.clockSkewCount) { r.logger.Infow( "received sender report, clock skew", "current", WrappedRTCPSenderReportStateLogger{srData}, @@ -792,7 +737,7 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa // is it more than 5 seconds off? if uint32(math.Abs(float64(int64(diffHighest)))) > 5*r.clockRate || uint32(math.Abs(float64(int64(diffFirst)))) > 5*r.clockRate { r.clockSkewMediaPathCount++ - if (r.clockSkewMediaPathCount-1)%100 == 0 { + if shouldLog(r.clockSkewMediaPathCount) { r.logger.Infow( "received sender report, clock skew against media path", "current", WrappedRTCPSenderReportStateLogger{srData}, @@ -1077,8 +1022,6 @@ func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncod e.AddInt("clockSkewCount", r.clockSkewCount) e.AddInt("clockSkewMediaPathCount", r.clockSkewMediaPathCount) e.AddInt("outOfOrderSenderReportCount", r.outOfOrderSenderReportCount) - e.AddInt("largeJumpCount", r.largeJumpCount) - e.AddInt("largeJumpNegativeCount", r.largeJumpNegativeCount) e.AddInt("timeReversedCount", r.timeReversedCount) e.AddInt("packetsDroppedPreStartTimestamp", r.packetsDroppedPreStartTimestamp) diff --git a/pkg/sfu/rtpstats/rtpstats_sender.go b/pkg/sfu/rtpstats/rtpstats_sender.go index d9ae74f77..c8551a9ee 100644 --- a/pkg/sfu/rtpstats/rtpstats_sender.go +++ b/pkg/sfu/rtpstats/rtpstats_sender.go @@ -375,10 +375,8 @@ type RTPStatsSender struct { nextSenderSnapshotID uint32 senderSnapshots []senderSnapshot - clockSkewCount int - largeJumpNegativeCount int - largeJumpCount int - timeReversedCount int + clockSkewCount int + timeReversedCount int } func NewRTPStatsSender(params RTPStatsParams, cacheSize int) *RTPStatsSender { @@ -566,58 +564,10 @@ func (r *RTPStatsSender) Update( r.packetsLost-- r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint8(hdrSize), uint16(payloadSize), marker, true) } - - if !isDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold { - r.largeJumpNegativeCount++ - if (r.largeJumpNegativeCount-1)%100 == 0 { - sulf := &senderUpdateLoggingFields{ - packetTime: packetTime, - extSequenceNumber: extSequenceNumber, - extTimestamp: extTimestamp, - marker: marker, - hdrSize: hdrSize, - payloadSize: payloadSize, - paddingSize: paddingSize, - gapSN: gapSN, - gapTS: int64(extTimestamp - r.extHighestTS), - timeSinceHighest: packetTime - r.highestTime, - rtpStats: r, - } - r.logger.Warnw( - "large sequence number gap negative", nil, - zap.Inline(sulf), - "count", r.largeJumpNegativeCount, - ) - } - } } else { // in-order - if gapSN >= cSequenceNumberLargeJumpThreshold { - r.largeJumpCount++ - if (r.largeJumpCount-1)%100 == 0 { - sulf := &senderUpdateLoggingFields{ - packetTime: packetTime, - extSequenceNumber: extSequenceNumber, - extTimestamp: extTimestamp, - marker: marker, - hdrSize: hdrSize, - payloadSize: payloadSize, - paddingSize: paddingSize, - gapSN: gapSN, - gapTS: int64(extTimestamp - r.extHighestTS), - timeSinceHighest: packetTime - r.highestTime, - rtpStats: r, - } - r.logger.Warnw( - "large sequence number gap", nil, - zap.Inline(sulf), - "count", r.largeJumpCount, - ) - } - } - if extTimestamp < r.extHighestTS { r.timeReversedCount++ - if (r.timeReversedCount-1)%100 == 0 { + if shouldLog(r.timeReversedCount) { sulf := &senderUpdateLoggingFields{ packetTime: packetTime, extSequenceNumber: extSequenceNumber, @@ -1008,7 +958,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek windowClockRate := float64(rtpDiffSinceLastReport) / timeSinceLastReport.Seconds() if timeSinceLastReport.Seconds() > 0.2 && math.Abs(float64(r.clockRate)-windowClockRate) > 0.2*float64(r.clockRate) { r.clockSkewCount++ - if (r.clockSkewCount-1)%100 == 0 { + if shouldLog(r.clockSkewCount) { srlf := &senderReportLoggingFields{ srData: srData, publisherSRData: publisherSRData,