diff --git a/pkg/sfu/rtpstats/rtpstats_receiver.go b/pkg/sfu/rtpstats/rtpstats_receiver.go index aae1ee2c2..40e60ac4a 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pion/rtcp" + "go.uber.org/zap" "go.uber.org/zap/zapcore" "github.com/livekit/mediatransportutil" @@ -126,6 +127,49 @@ func (p packet) MarshalLogObject(e zapcore.ObjectEncoder) error { // --------------------------------------------------------------------- +type receiverUpdateLoggingFields struct { + packetTime int64 + sequenceNumber uint16 + timestamp uint32 + marker bool + hdrSize int + payloadSize int + paddingSize int + resSN *utils.WrapAroundUpdateResult[uint64] + gapSN int64 + resTS *utils.WrapAroundUpdateResult[uint64] + gapTS int64 + snRolloverCount int + expectedTSJump int64 + tsRolloverCount int + timeSinceHighest int64 + rtpStats *RTPStatsReceiver +} + +func (rulf *receiverUpdateLoggingFields) MarshalLogObject(e zapcore.ObjectEncoder) error { + if rulf != nil { + e.AddObject("resSN", rulf.resSN) + e.AddInt64("gapSN", rulf.gapSN) + e.AddObject("resTS", rulf.resTS) + e.AddInt64("gapTS", rulf.gapTS) + e.AddInt("snRolloverCount", rulf.snRolloverCount) + e.AddInt64("expectedTSJump", rulf.expectedTSJump) + e.AddInt("tsRolloverCount", rulf.tsRolloverCount) + e.AddTime("packetTime", time.Unix(0, rulf.packetTime)) + e.AddDuration("timeSinceHighest", time.Duration(rulf.timeSinceHighest)) + e.AddUint16("sequenceNumber", rulf.sequenceNumber) + e.AddUint32("timestamp", rulf.timestamp) + e.AddBool("marker", rulf.marker) + e.AddInt("hdrSize", rulf.hdrSize) + e.AddInt("payloadSize", rulf.payloadSize) + e.AddInt("paddingSize", rulf.paddingSize) + e.AddObject("rtpStats", lockedRTPStatsReceiverLogEncoder{rulf.rtpStats}) + } + return nil +} + +// --------------------------------------------------------------------- + type RTPStatsReceiver struct { *rtpStatsBase @@ -150,7 +194,8 @@ type RTPStatsReceiver struct { packetsDroppedPreStartSequenceNumber int packetsDroppedOldSequenceNumber int - restartPackets []packet + restartPacketsBuf [restartThreshold]packet + restartPacketsN int } func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver { @@ -192,6 +237,11 @@ func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64, ts uint32) int { return int(roc) } +func (r *RTPStatsReceiver) undoUpdatesLocked(resSN utils.WrapAroundUpdateResult[uint64], resTS utils.WrapAroundUpdateResult[uint64]) { + r.sequenceNumber.UndoUpdate(resSN) + r.timestamp.UndoUpdate(resTS) +} + func (r *RTPStatsReceiver) Update( packetTime int64, sequenceNumber uint16, @@ -222,32 +272,6 @@ func (r *RTPStatsReceiver) Update( var tsRolloverCount int var snRolloverCount int - logger := func() logger.UnlikelyLogger { - return r.logger.WithUnlikelyValues( - "resSN", resSN, - "gapSN", gapSN, - "resTS", resTS, - "gapTS", gapTS, - "snRolloverCount", snRolloverCount, - "expectedTSJump", expectedTSJump, - "tsRolloverCount", tsRolloverCount, - "packetTime", time.Unix(0, packetTime), - "timeSinceHighest", time.Duration(timeSinceHighest), - "sequenceNumber", sequenceNumber, - "timestamp", timestamp, - "marker", marker, - "hdrSize", hdrSize, - "payloadSize", payloadSize, - "paddingSize", paddingSize, - "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, - ) - } - - undoUpdates := func() { - r.sequenceNumber.UndoUpdate(resSN) - r.timestamp.UndoUpdate(resTS) - } - if !r.initialized { if payloadSize == 0 { // do not start on a padding only packet @@ -281,17 +305,54 @@ func (r *RTPStatsReceiver) Update( timeSinceHighest = packetTime - r.highestTime tsRolloverCount = r.getTSRolloverCount(timeSinceHighest, timestamp) if tsRolloverCount >= 0 { - logger().Warnw("potential time stamp roll over", nil) + rulf := &receiverUpdateLoggingFields{ + packetTime: packetTime, + sequenceNumber: sequenceNumber, + timestamp: timestamp, + marker: marker, + hdrSize: hdrSize, + payloadSize: payloadSize, + paddingSize: paddingSize, + resSN: &resSN, + gapSN: gapSN, + resTS: &resTS, + gapTS: gapTS, + snRolloverCount: snRolloverCount, + expectedTSJump: expectedTSJump, + tsRolloverCount: tsRolloverCount, + timeSinceHighest: timeSinceHighest, + rtpStats: r, + } + r.logger.Warnw("potential time stamp roll over", nil, zap.Inline(rulf)) } resTS = r.timestamp.Rollover(timestamp, tsRolloverCount) if resTS.IsUnhandled { - undoUpdates() + r.undoUpdatesLocked(resSN, resTS) r.packetsDroppedPreStartTimestamp++ - logger().Warnw("dropping packet, pre-start timestamp", nil) + + rulf := &receiverUpdateLoggingFields{ + packetTime: packetTime, + sequenceNumber: sequenceNumber, + timestamp: timestamp, + marker: marker, + hdrSize: hdrSize, + payloadSize: payloadSize, + paddingSize: paddingSize, + resSN: &resSN, + gapSN: gapSN, + resTS: &resTS, + gapTS: gapTS, + snRolloverCount: snRolloverCount, + expectedTSJump: expectedTSJump, + tsRolloverCount: tsRolloverCount, + timeSinceHighest: timeSinceHighest, + rtpStats: r, + } + r.logger.Warnw("dropping packet, pre-start timestamp", nil, zap.Inline(rulf)) if r.maybeRestart(sequenceNumber, timestamp, payloadSize) { - logger().Infow("potential restart") + r.logger.Infow("potential restart", zap.Inline(rulf)) r.resetRestart() flowState.UnhandledReason = RTPFlowUnhandledReasonRestart } else { @@ -316,13 +377,32 @@ func (r *RTPStatsReceiver) Update( if gapSN < 0 && gapTS > 0 { expectedTSJump = int64(r.rtpConverter.ToRTPExt(time.Duration(timeSinceHighest))) if gapTS > int64(float64(expectedTSJump)*cTSJumpTooHighFactor) { - undoUpdates() + r.undoUpdatesLocked(resSN, resTS) r.packetsDroppedOldTimestamp++ - logger().Warnw("dropping packet, old timestamp", nil) + + rulf := &receiverUpdateLoggingFields{ + packetTime: packetTime, + sequenceNumber: sequenceNumber, + timestamp: timestamp, + marker: marker, + hdrSize: hdrSize, + payloadSize: payloadSize, + paddingSize: paddingSize, + resSN: &resSN, + gapSN: gapSN, + resTS: &resTS, + gapTS: gapTS, + snRolloverCount: snRolloverCount, + expectedTSJump: expectedTSJump, + tsRolloverCount: tsRolloverCount, + timeSinceHighest: timeSinceHighest, + rtpStats: r, + } + r.logger.Warnw("dropping packet, old timestamp", nil, zap.Inline(rulf)) if r.maybeRestart(sequenceNumber, timestamp, payloadSize) { - logger().Infow("potential restart") + r.logger.Infow("potential restart", zap.Inline(rulf)) r.resetRestart() flowState.UnhandledReason = RTPFlowUnhandledReasonRestart } else { @@ -335,14 +415,33 @@ func (r *RTPStatsReceiver) Update( // Case 2: // Sequence number looks like it is moving forward, but it is actually a very old packet. if gapTS < 0 && gapSN > 0 { - undoUpdates() + r.undoUpdatesLocked(resSN, resTS) r.packetsDroppedOldSequenceNumber++ expectedTSJump = int64(r.rtpConverter.ToRTPExt(time.Duration(timeSinceHighest))) - logger().Warnw("dropping packet, old sequence number", nil) + + rulf := &receiverUpdateLoggingFields{ + packetTime: packetTime, + sequenceNumber: sequenceNumber, + timestamp: timestamp, + marker: marker, + hdrSize: hdrSize, + payloadSize: payloadSize, + paddingSize: paddingSize, + resSN: &resSN, + gapSN: gapSN, + resTS: &resTS, + gapTS: gapTS, + snRolloverCount: snRolloverCount, + expectedTSJump: expectedTSJump, + tsRolloverCount: tsRolloverCount, + timeSinceHighest: timeSinceHighest, + rtpStats: r, + } + r.logger.Warnw("dropping packet, old sequence number", nil, zap.Inline(rulf)) if r.maybeRestart(sequenceNumber, timestamp, payloadSize) { - logger().Infow("potential restart") + r.logger.Infow("potential restart", zap.Inline(rulf)) r.resetRestart() flowState.UnhandledReason = RTPFlowUnhandledReasonRestart } else { @@ -362,18 +461,55 @@ func (r *RTPStatsReceiver) Update( } resSN = r.sequenceNumber.Rollover(sequenceNumber, snRolloverCount) if !resSN.IsUnhandled { - logger().Warnw("forcing sequence number rollover", nil) + rulf := &receiverUpdateLoggingFields{ + packetTime: packetTime, + sequenceNumber: sequenceNumber, + timestamp: timestamp, + marker: marker, + hdrSize: hdrSize, + payloadSize: payloadSize, + paddingSize: paddingSize, + resSN: &resSN, + gapSN: gapSN, + resTS: &resTS, + gapTS: gapTS, + snRolloverCount: snRolloverCount, + expectedTSJump: expectedTSJump, + tsRolloverCount: tsRolloverCount, + timeSinceHighest: timeSinceHighest, + rtpStats: r, + } + r.logger.Warnw("forcing sequence number rollover", nil, zap.Inline(rulf)) } } if resSN.IsUnhandled { - undoUpdates() + r.undoUpdatesLocked(resSN, resTS) r.packetsDroppedPreStartSequenceNumber++ - logger().Warnw("dropping packet, pre-start sequence number", nil) + + rulf := &receiverUpdateLoggingFields{ + packetTime: packetTime, + sequenceNumber: sequenceNumber, + timestamp: timestamp, + marker: marker, + hdrSize: hdrSize, + payloadSize: payloadSize, + paddingSize: paddingSize, + resSN: &resSN, + gapSN: gapSN, + resTS: &resTS, + gapTS: gapTS, + snRolloverCount: snRolloverCount, + expectedTSJump: expectedTSJump, + tsRolloverCount: tsRolloverCount, + timeSinceHighest: timeSinceHighest, + rtpStats: r, + } + r.logger.Warnw("dropping packet, pre-start sequence number", nil, zap.Inline(rulf)) if r.maybeRestart(sequenceNumber, timestamp, payloadSize) { - logger().Infow("potential restart") + r.logger.Infow("potential restart", zap.Inline(rulf)) r.resetRestart() flowState.UnhandledReason = RTPFlowUnhandledReasonRestart } else { @@ -406,8 +542,27 @@ func (r *RTPStatsReceiver) Update( if !flowState.IsDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold { r.largeJumpNegativeCount++ if (r.largeJumpNegativeCount-1)%100 == 0 { - logger().Warnw( + rulf := &receiverUpdateLoggingFields{ + packetTime: packetTime, + sequenceNumber: sequenceNumber, + timestamp: timestamp, + marker: marker, + hdrSize: hdrSize, + payloadSize: payloadSize, + paddingSize: paddingSize, + resSN: &resSN, + gapSN: gapSN, + resTS: &resTS, + gapTS: gapTS, + snRolloverCount: snRolloverCount, + expectedTSJump: expectedTSJump, + tsRolloverCount: tsRolloverCount, + timeSinceHighest: timeSinceHighest, + rtpStats: r, + } + r.logger.Warnw( "large sequence number gap negative", nil, + zap.Inline(rulf), "count", r.largeJumpNegativeCount, ) } @@ -416,8 +571,27 @@ func (r *RTPStatsReceiver) Update( if gapSN >= cSequenceNumberLargeJumpThreshold { r.largeJumpCount++ if (r.largeJumpCount-1)%100 == 0 { - logger().Warnw( + rulf := &receiverUpdateLoggingFields{ + packetTime: packetTime, + sequenceNumber: sequenceNumber, + timestamp: timestamp, + marker: marker, + hdrSize: hdrSize, + payloadSize: payloadSize, + paddingSize: paddingSize, + resSN: &resSN, + gapSN: gapSN, + resTS: &resTS, + gapTS: gapTS, + snRolloverCount: snRolloverCount, + expectedTSJump: expectedTSJump, + tsRolloverCount: tsRolloverCount, + timeSinceHighest: timeSinceHighest, + rtpStats: r, + } + r.logger.Warnw( "large sequence number gap", nil, + zap.Inline(rulf), "count", r.largeJumpCount, ) } @@ -426,8 +600,27 @@ func (r *RTPStatsReceiver) Update( if resTS.ExtendedVal < resTS.PreExtendedHighest { r.timeReversedCount++ if (r.timeReversedCount-1)%100 == 0 { - logger().Warnw( + rulf := &receiverUpdateLoggingFields{ + packetTime: packetTime, + sequenceNumber: sequenceNumber, + timestamp: timestamp, + marker: marker, + hdrSize: hdrSize, + payloadSize: payloadSize, + paddingSize: paddingSize, + resSN: &resSN, + gapSN: gapSN, + resTS: &resTS, + gapTS: gapTS, + snRolloverCount: snRolloverCount, + expectedTSJump: expectedTSJump, + tsRolloverCount: tsRolloverCount, + timeSinceHighest: timeSinceHighest, + rtpStats: r, + } + r.logger.Warnw( "time reversed", nil, + zap.Inline(rulf), "count", r.timeReversedCount, ) } @@ -479,7 +672,9 @@ func (r *RTPStatsReceiver) getExtendedSenderReport(srData *livekit.RTCPSenderRep if r.srNewest != nil { // use time since last sender report to ensure long gaps where the time stamp might // jump more than half the range - timeSinceLastReport := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time()) + srTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time() + srNewestTime := mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time() + timeSinceLastReport := srTime.Sub(srNewestTime) expectedRTPTimestampExt := r.srNewest.RtpTimestampExt + r.rtpConverter.ToRTPExt(timeSinceLastReport) lbound := expectedRTPTimestampExt - uint64(cReportSlack*float64(r.clockRate)) ubound := expectedRTPTimestampExt + uint64(cReportSlack*float64(r.clockRate)) @@ -544,11 +739,15 @@ func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *livekit.RTCP return } - timeSinceLast := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time()).Seconds() + srTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time() + srNewestTime := mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time() + srFirstTime := mediatransportutil.NtpTime(r.srFirst.NtpTimestamp).Time() + + timeSinceLast := srTime.Sub(srNewestTime).Seconds() rtpDiffSinceLast := srData.RtpTimestampExt - r.srNewest.RtpTimestampExt calculatedClockRateFromLast := float64(rtpDiffSinceLast) / timeSinceLast - timeSinceFirst := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(r.srFirst.NtpTimestamp).Time()).Seconds() + timeSinceFirst := srTime.Sub(srFirstTime).Seconds() rtpDiffSinceFirst := srData.RtpTimestampExt - r.srFirst.RtpTimestampExt calculatedClockRateFromFirst := float64(rtpDiffSinceFirst) / timeSinceFirst @@ -612,7 +811,8 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa } func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *livekit.RTCPSenderReportState) { - senderClockTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().UnixNano() + srTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time() + senderClockTime := srTime.UnixNano() estimatedPropagationDelay, stepChange := r.propagationDelayEstimator.Update(senderClockTime, srData.At) if stepChange { r.logger.Debugw( @@ -814,17 +1014,23 @@ func (r *RTPStatsReceiver) ExtendedHighestSequenceNumber() uint64 { func (r *RTPStatsReceiver) maybeRestart(sn uint16, ts uint32, payloadSize int) bool { if payloadSize > 0 { - r.restartPackets = append(r.restartPackets, packet{sn, ts}) + if r.restartPacketsN < restartThreshold { + r.restartPacketsBuf[r.restartPacketsN] = packet{sn, ts} + r.restartPacketsN++ + } else { + // keep last restartThreshold entries: shift left and append + copy(r.restartPacketsBuf[:], r.restartPacketsBuf[1:]) + r.restartPacketsBuf[restartThreshold-1] = packet{sn, ts} + } } - if len(r.restartPackets) < restartThreshold { + if r.restartPacketsN < restartThreshold { return false } - r.restartPackets = r.restartPackets[max(len(r.restartPackets)-restartThreshold, 0):] // check for contiguous sequence numbers and equal or increasing timestamps - for i := 1; i < len(r.restartPackets); i++ { - p := &r.restartPackets[i] - prev := &r.restartPackets[i-1] + for i := 1; i < r.restartPacketsN; i++ { + p := &r.restartPacketsBuf[i] + prev := &r.restartPacketsBuf[i-1] if p.sequenceNumber != prev.sequenceNumber+1 || (p.timestamp-prev.timestamp) > (1<<31) { return false } @@ -834,7 +1040,7 @@ func (r *RTPStatsReceiver) maybeRestart(sn uint16, ts uint32, payloadSize int) b } func (r *RTPStatsReceiver) resetRestart() { - r.restartPackets = r.restartPackets[:0] + r.restartPacketsN = 0 } // ---------------------------------- @@ -879,7 +1085,7 @@ func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncod e.AddInt("packetsDroppedPreStartSequenceNumber", r.packetsDroppedPreStartSequenceNumber) e.AddInt("packetsDroppedOldSequenceNumber", r.packetsDroppedOldSequenceNumber) - e.AddArray("restartPackets", logger.ObjectSlice(r.restartPackets)) + e.AddArray("restartPackets", logger.ObjectSlice(r.restartPacketsBuf[:r.restartPacketsN])) return nil } diff --git a/pkg/sfu/rtpstats/rtpstats_test.go b/pkg/sfu/rtpstats/rtpstats_test.go index fafbfe50b..8b0ab240e 100644 --- a/pkg/sfu/rtpstats/rtpstats_test.go +++ b/pkg/sfu/rtpstats/rtpstats_test.go @@ -243,10 +243,10 @@ func Test_RTPStatsReceiver_Restart(t *testing.T) { require.False(t, r.maybeRestart(19, 21, 1000)) // can restart as there are enough packets with proper sequencing require.True(t, r.maybeRestart(20, 21, 1000)) - require.Equal(t, restartThreshold, len(r.restartPackets)) + require.Equal(t, restartThreshold, r.restartPacketsN) r.resetRestart() - require.Zero(t, len(r.restartPackets)) + require.Zero(t, r.restartPacketsN) r.Stop() }