Cleaning up some logs and standardising log frequency. (#4420)

Removing some logs which have not been useful in terms of insights other
than saying that there are a bunch of packets missing. Going to start
looking at gaps in terms of time if the inter-packet gap is too high.

Also, logging these events for the first 20 occurrences and then every 200th occurrence.
This commit is contained in:
Raja Subramanian
2026-04-01 21:17:43 +05:30
committed by GitHub
parent 7b92530461
commit 9674ac48ab
5 changed files with 44 additions and 190 deletions
+19 -60
View File
@@ -1795,23 +1795,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"rtpStats", f.rtpStats,
)
}
// TODO-REMOVE-AFTER-DATA-COLLECTION
logTransitionInfo := func(message string, extExpectedTS, extRefTS, extLastTS uint64, diffSeconds float64) {
f.logger.Infow(
message,
"layer", layer,
"referenceLayerSpatial", f.referenceLayerSpatial,
"extExpectedTS", extExpectedTS,
"incomingTS", extPkt.Packet.Timestamp,
"extIncomingTS", extPkt.ExtTimestamp,
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", math.Abs(diffSeconds),
"refInfos", logger.ObjectSlice(f.refInfos[:]),
"lastSwitchExtIncomingTS", f.lastSwitchExtIncomingTS,
"rtpStats", f.rtpStats,
)
}
// Compute how much time passed between the previous forwarded packet
// and the current incoming (to be forwarded) packet and calculate
@@ -1891,7 +1874,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
}
}
bigJump := false
var extNextTS uint64
if f.lastSSRC == 0 {
// If resuming (e.g. on unmute), keep next timestamp close to expected timestamp.
@@ -1917,14 +1899,12 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
diffSeconds := float64(int64(extExpectedTS-extRefTS)) / float64(f.clockRate)
if diffSeconds >= 0.0 {
if f.resumeBehindThreshold > 0 && diffSeconds > f.resumeBehindThreshold {
logTransitionInfo("resume, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
logTransition("resume, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
extNextTS = extExpectedTS
bigJump = true
} else if diffSeconds > ResumeBehindHighThresholdSeconds {
// could be due to incoming time stamp lagging a lot, like an unpause of the track
logTransitionInfo("resume, reference very far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
logTransition("resume, reference very far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
extNextTS = extExpectedTS
bigJump = true
} else {
extNextTS = extRefTS
}
@@ -1966,44 +1946,23 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
// nominal increase
extNextTS = extLastTS + 1
}
if bigJump { // TODO-REMOVE-AFTER-DATA-COLLECTION
f.logger.Infow(
"next timestamp on switch",
"switchingAt", switchingAt,
"layer", layer,
"extLastTS", extLastTS,
"lastMarker", rtpMungerState.LastMarker,
"extRefTS", extRefTS,
"dummyStartTSOffset", f.dummyStartTSOffset,
"referenceLayerSpatial", f.referenceLayerSpatial,
"extExpectedTS", extExpectedTS,
"extNextTS", extNextTS,
"tsJump", extNextTS-extLastTS,
"nextSN", rtpMungerState.ExtLastSequenceNumber+1,
"extIncomingSN", extPkt.ExtSequenceNumber,
"incomingTS", extPkt.Packet.Timestamp,
"extIncomingTS", extPkt.ExtTimestamp,
"rtpStats", f.rtpStats,
)
} else {
f.logger.Debugw(
"next timestamp on switch",
"switchingAt", switchingAt,
"layer", layer,
"extLastTS", extLastTS,
"lastMarker", rtpMungerState.LastMarker,
"extRefTS", extRefTS,
"dummyStartTSOffset", f.dummyStartTSOffset,
"referenceLayerSpatial", f.referenceLayerSpatial,
"extExpectedTS", extExpectedTS,
"extNextTS", extNextTS,
"tsJump", extNextTS-extLastTS,
"nextSN", rtpMungerState.ExtLastSequenceNumber+1,
"extIncomingSN", extPkt.ExtSequenceNumber,
"extIncomingTS", extPkt.ExtTimestamp,
"rtpStats", f.rtpStats,
)
}
f.logger.Debugw(
"next timestamp on switch",
"switchingAt", switchingAt,
"layer", layer,
"extLastTS", extLastTS,
"lastMarker", rtpMungerState.LastMarker,
"extRefTS", extRefTS,
"dummyStartTSOffset", f.dummyStartTSOffset,
"referenceLayerSpatial", f.referenceLayerSpatial,
"extExpectedTS", extExpectedTS,
"extNextTS", extNextTS,
"tsJump", extNextTS-extLastTS,
"nextSN", rtpMungerState.ExtLastSequenceNumber+1,
"extIncomingSN", extPkt.ExtSequenceNumber,
"extIncomingTS", extPkt.ExtTimestamp,
"rtpStats", f.rtpStats,
)
f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, extNextTS-extLastTS)
f.codecMunger.UpdateOffsets(extPkt)
-2
View File
@@ -31,8 +31,6 @@ import (
const (
// cFirstPacketTimeAdjustWindow is a two-minute duration.
// NOTE(review): judging by the name, it bounds how long after start the
// first packet time may still be adjusted — confirm against its users.
cFirstPacketTimeAdjustWindow = 2 * time.Minute
// cFirstPacketTimeAdjustThreshold is 15e9.
// NOTE(review): presumably expressed in nanoseconds (i.e. 15 seconds) — confirm.
cFirstPacketTimeAdjustThreshold = 15 * 1e9
// cSequenceNumberLargeJumpThreshold is the sequence number gap magnitude
// (in either direction) at or above which the stats Update paths count a
// packet as a large sequence number jump.
cSequenceNumberLargeJumpThreshold = 100
)
// -------------------------------------------------------
+4
View File
@@ -550,4 +550,8 @@ func getPacketsExpected(extStartSN, extHighestSN uint64) uint64 {
return extHighestSN - extStartSN + 1
}
// shouldLog reports whether the count-th occurrence of a recurring event
// should be logged. It throttles noisy logs to the first 20 occurrences and
// every 200th occurrence after that.
//
// count is treated as 1-based: callers increment their event counter before
// calling, so the first occurrence arrives here as 1. The comparison is
// therefore inclusive of 20; a strict "< 20" would log only 19 occurrences.
func shouldLog(count int) bool {
	const (
		initialBurst = 20  // occurrences that are always logged
		logInterval  = 200 // after the burst, log every logInterval-th occurrence
	)
	return count <= initialBurst || count%logInterval == 0
}
// ----------------------------------
+17 -74
View File
@@ -185,8 +185,6 @@ type RTPStatsReceiver struct {
clockSkewCount int
clockSkewMediaPathCount int
outOfOrderSenderReportCount int
largeJumpCount int
largeJumpNegativeCount int
timeReversedCount int
packetsDroppedPreStartTimestamp int
@@ -311,7 +309,6 @@ func (r *RTPStatsReceiver) Update(
r.undoUpdatesLocked(resSN, resTS)
r.packetsDroppedPreStartTimestamp++
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
@@ -330,7 +327,9 @@ func (r *RTPStatsReceiver) Update(
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw("dropping packet, pre-start timestamp", nil, zap.Inline(rulf))
if shouldLog(r.packetsDroppedPreStartTimestamp) {
r.logger.Warnw("dropping packet, pre-start timestamp", nil, zap.Inline(rulf))
}
if r.maybeRestart(sequenceNumber, timestamp, payloadSize) {
r.logger.Infow("potential restart", zap.Inline(rulf))
@@ -381,7 +380,6 @@ func (r *RTPStatsReceiver) Update(
r.undoUpdatesLocked(resSN, resTS)
r.packetsDroppedOldTimestamp++
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
@@ -400,7 +398,9 @@ func (r *RTPStatsReceiver) Update(
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw("dropping packet, old timestamp", nil, zap.Inline(rulf))
if shouldLog(r.packetsDroppedOldTimestamp) {
r.logger.Warnw("dropping packet, old timestamp", nil, zap.Inline(rulf))
}
if r.maybeRestart(sequenceNumber, timestamp, payloadSize) {
r.logger.Infow("potential restart", zap.Inline(rulf))
@@ -418,9 +418,9 @@ func (r *RTPStatsReceiver) Update(
if gapTS < 0 && gapSN > 0 {
r.undoUpdatesLocked(resSN, resTS)
r.packetsDroppedOldSequenceNumber++
expectedTSJump = int64(r.rtpConverter.ToRTPExt(time.Duration(timeSinceHighest)))
r.packetsDroppedOldSequenceNumber++
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
@@ -439,7 +439,9 @@ func (r *RTPStatsReceiver) Update(
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw("dropping packet, old sequence number", nil, zap.Inline(rulf))
if shouldLog(r.packetsDroppedOldSequenceNumber) {
r.logger.Warnw("dropping packet, old sequence number", nil, zap.Inline(rulf))
}
if r.maybeRestart(sequenceNumber, timestamp, payloadSize) {
r.logger.Infow("potential restart", zap.Inline(rulf))
@@ -488,7 +490,6 @@ func (r *RTPStatsReceiver) Update(
r.undoUpdatesLocked(resSN, resTS)
r.packetsDroppedPreStartSequenceNumber++
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
@@ -507,7 +508,9 @@ func (r *RTPStatsReceiver) Update(
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw("dropping packet, pre-start sequence number", nil, zap.Inline(rulf))
if shouldLog(r.packetsDroppedPreStartSequenceNumber) {
r.logger.Warnw("dropping packet, pre-start sequence number", nil, zap.Inline(rulf))
}
if r.maybeRestart(sequenceNumber, timestamp, payloadSize) {
r.logger.Infow("potential restart", zap.Inline(rulf))
@@ -539,68 +542,10 @@ func (r *RTPStatsReceiver) Update(
}
flowState.IsOutOfOrder = true
if !flowState.IsDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpNegativeCount++
if (r.largeJumpNegativeCount-1)%100 == 0 {
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
timestamp: timestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
resSN: resSN,
gapSN: gapSN,
resTS: resTS,
gapTS: gapTS,
snRolloverCount: snRolloverCount,
expectedTSJump: expectedTSJump,
tsRolloverCount: tsRolloverCount,
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw(
"large sequence number gap negative", nil,
zap.Inline(rulf),
"count", r.largeJumpNegativeCount,
)
}
}
} else { // in-order
if gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpCount++
if (r.largeJumpCount-1)%100 == 0 {
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
timestamp: timestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
resSN: resSN,
gapSN: gapSN,
resTS: resTS,
gapTS: gapTS,
snRolloverCount: snRolloverCount,
expectedTSJump: expectedTSJump,
tsRolloverCount: tsRolloverCount,
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw(
"large sequence number gap", nil,
zap.Inline(rulf),
"count", r.largeJumpCount,
)
}
}
if resTS.ExtendedVal < resTS.PreExtendedHighest && r.bytes > 0 {
r.timeReversedCount++
if (r.timeReversedCount-1)%100 == 0 {
if shouldLog(r.timeReversedCount) {
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
@@ -721,7 +666,7 @@ func (r *RTPStatsReceiver) checkOutOfOrderSenderReport(srData *livekit.RTCPSende
// Or it could be due to bad report generation.
// In any case, ignore out-of-order reports.
r.outOfOrderSenderReportCount++
if (r.outOfOrderSenderReportCount-1)%10 == 0 {
if shouldLog(r.outOfOrderSenderReportCount) {
r.logger.Infow(
"received sender report, out-of-order, skipping",
"current", WrappedRTCPSenderReportStateLogger{srData},
@@ -755,7 +700,7 @@ func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *livekit.RTCP
if (timeSinceLast > 0.2 && math.Abs(float64(r.clockRate)-calculatedClockRateFromLast) > 0.2*float64(r.clockRate)) ||
(timeSinceFirst > 0.2 && math.Abs(float64(r.clockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.clockRate)) {
r.clockSkewCount++
if (r.clockSkewCount-1)%100 == 0 {
if shouldLog(r.clockSkewCount) {
r.logger.Infow(
"received sender report, clock skew",
"current", WrappedRTCPSenderReportStateLogger{srData},
@@ -792,7 +737,7 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa
// is it more than 5 seconds off?
if uint32(math.Abs(float64(int64(diffHighest)))) > 5*r.clockRate || uint32(math.Abs(float64(int64(diffFirst)))) > 5*r.clockRate {
r.clockSkewMediaPathCount++
if (r.clockSkewMediaPathCount-1)%100 == 0 {
if shouldLog(r.clockSkewMediaPathCount) {
r.logger.Infow(
"received sender report, clock skew against media path",
"current", WrappedRTCPSenderReportStateLogger{srData},
@@ -1077,8 +1022,6 @@ func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncod
e.AddInt("clockSkewCount", r.clockSkewCount)
e.AddInt("clockSkewMediaPathCount", r.clockSkewMediaPathCount)
e.AddInt("outOfOrderSenderReportCount", r.outOfOrderSenderReportCount)
e.AddInt("largeJumpCount", r.largeJumpCount)
e.AddInt("largeJumpNegativeCount", r.largeJumpNegativeCount)
e.AddInt("timeReversedCount", r.timeReversedCount)
e.AddInt("packetsDroppedPreStartTimestamp", r.packetsDroppedPreStartTimestamp)
+4 -54
View File
@@ -375,10 +375,8 @@ type RTPStatsSender struct {
nextSenderSnapshotID uint32
senderSnapshots []senderSnapshot
clockSkewCount int
largeJumpNegativeCount int
largeJumpCount int
timeReversedCount int
clockSkewCount int
timeReversedCount int
}
func NewRTPStatsSender(params RTPStatsParams, cacheSize int) *RTPStatsSender {
@@ -566,58 +564,10 @@ func (r *RTPStatsSender) Update(
r.packetsLost--
r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint8(hdrSize), uint16(payloadSize), marker, true)
}
if !isDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpNegativeCount++
if (r.largeJumpNegativeCount-1)%100 == 0 {
sulf := &senderUpdateLoggingFields{
packetTime: packetTime,
extSequenceNumber: extSequenceNumber,
extTimestamp: extTimestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
gapSN: gapSN,
gapTS: int64(extTimestamp - r.extHighestTS),
timeSinceHighest: packetTime - r.highestTime,
rtpStats: r,
}
r.logger.Warnw(
"large sequence number gap negative", nil,
zap.Inline(sulf),
"count", r.largeJumpNegativeCount,
)
}
}
} else { // in-order
if gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpCount++
if (r.largeJumpCount-1)%100 == 0 {
sulf := &senderUpdateLoggingFields{
packetTime: packetTime,
extSequenceNumber: extSequenceNumber,
extTimestamp: extTimestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
gapSN: gapSN,
gapTS: int64(extTimestamp - r.extHighestTS),
timeSinceHighest: packetTime - r.highestTime,
rtpStats: r,
}
r.logger.Warnw(
"large sequence number gap", nil,
zap.Inline(sulf),
"count", r.largeJumpCount,
)
}
}
if extTimestamp < r.extHighestTS {
r.timeReversedCount++
if (r.timeReversedCount-1)%100 == 0 {
if shouldLog(r.timeReversedCount) {
sulf := &senderUpdateLoggingFields{
packetTime: packetTime,
extSequenceNumber: extSequenceNumber,
@@ -1008,7 +958,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek
windowClockRate := float64(rtpDiffSinceLastReport) / timeSinceLastReport.Seconds()
if timeSinceLastReport.Seconds() > 0.2 && math.Abs(float64(r.clockRate)-windowClockRate) > 0.2*float64(r.clockRate) {
r.clockSkewCount++
if (r.clockSkewCount-1)%100 == 0 {
if shouldLog(r.clockSkewCount) {
srlf := &senderReportLoggingFields{
srData: srData,
publisherSRData: publisherSRData,