Optimise some bits in rtpstats_receiver (#4297)

* Optimise some bits in rtpstats_receiver

RTPStatsReceiver.Update is one of the high-CPU hot spots. Taking some
suggestions from Cursor. This makes the `Update` function more verbose though :-(

* zap.Inline logging fields

* rename
This commit is contained in:
Raja Subramanian
2026-02-06 21:26:30 +05:30
committed by GitHub
parent 52a4b848a4
commit cefd5da940
2 changed files with 264 additions and 58 deletions
+262 -56
View File
@@ -20,6 +20,7 @@ import (
"time"
"github.com/pion/rtcp"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/livekit/mediatransportutil"
@@ -126,6 +127,49 @@ func (p packet) MarshalLogObject(e zapcore.ObjectEncoder) error {
// ---------------------------------------------------------------------
// receiverUpdateLoggingFields bundles everything RTPStatsReceiver.Update
// wants to log into a single zapcore.ObjectMarshaler, so the field values
// are only encoded (and boxed) when a log line actually fires via
// zap.Inline, instead of on every Update call.
type receiverUpdateLoggingFields struct {
packetTime int64 // packet arrival time, unix nanoseconds (encoded via time.Unix(0, packetTime))
sequenceNumber uint16 // RTP sequence number of the packet
timestamp uint32 // RTP timestamp of the packet
marker bool // RTP marker bit
hdrSize int // RTP header size in bytes
payloadSize int // RTP payload size in bytes
paddingSize int // RTP padding size in bytes
resSN *utils.WrapAroundUpdateResult[uint64] // extended sequence number update result
gapSN int64 // sequence number gap; presumably relative to previous highest — confirm in Update
resTS *utils.WrapAroundUpdateResult[uint64] // extended timestamp update result
gapTS int64 // timestamp gap; presumably relative to previous highest — confirm in Update
snRolloverCount int // forced sequence number rollover count
expectedTSJump int64 // expected RTP timestamp advance derived from elapsed wall-clock time
tsRolloverCount int // detected timestamp rollover count
timeSinceHighest int64 // nanoseconds since the highest observed packet time (encoded as a Duration)
rtpStats *RTPStatsReceiver // stats snapshot, logged through lockedRTPStatsReceiverLogEncoder
}
// MarshalLogObject implements zapcore.ObjectMarshaler, emitting every
// captured field onto the encoder. A nil receiver is tolerated and emits
// nothing. Unlike the previous version, errors from nested AddObject
// calls are propagated instead of being silently discarded, per the
// zapcore.ObjectMarshaler contract.
func (rulf *receiverUpdateLoggingFields) MarshalLogObject(e zapcore.ObjectEncoder) error {
	if rulf == nil {
		return nil
	}
	if err := e.AddObject("resSN", rulf.resSN); err != nil {
		return err
	}
	e.AddInt64("gapSN", rulf.gapSN)
	if err := e.AddObject("resTS", rulf.resTS); err != nil {
		return err
	}
	e.AddInt64("gapTS", rulf.gapTS)
	e.AddInt("snRolloverCount", rulf.snRolloverCount)
	e.AddInt64("expectedTSJump", rulf.expectedTSJump)
	e.AddInt("tsRolloverCount", rulf.tsRolloverCount)
	e.AddTime("packetTime", time.Unix(0, rulf.packetTime))
	e.AddDuration("timeSinceHighest", time.Duration(rulf.timeSinceHighest))
	e.AddUint16("sequenceNumber", rulf.sequenceNumber)
	e.AddUint32("timestamp", rulf.timestamp)
	e.AddBool("marker", rulf.marker)
	e.AddInt("hdrSize", rulf.hdrSize)
	e.AddInt("payloadSize", rulf.payloadSize)
	e.AddInt("paddingSize", rulf.paddingSize)
	return e.AddObject("rtpStats", lockedRTPStatsReceiverLogEncoder{rulf.rtpStats})
}
// ---------------------------------------------------------------------
type RTPStatsReceiver struct {
*rtpStatsBase
@@ -150,7 +194,8 @@ type RTPStatsReceiver struct {
packetsDroppedPreStartSequenceNumber int
packetsDroppedOldSequenceNumber int
restartPackets []packet
restartPacketsBuf [restartThreshold]packet
restartPacketsN int
}
func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver {
@@ -192,6 +237,11 @@ func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64, ts uint32) int {
return int(roc)
}
// undoUpdatesLocked reverts the sequence number and timestamp wrap-around
// extensions applied earlier in Update, used when the packet is
// subsequently dropped (pre-start / old timestamp / old sequence number).
// NOTE(review): the "Locked" suffix implies the caller must already hold
// the stats lock — confirm against call sites in Update.
func (r *RTPStatsReceiver) undoUpdatesLocked(resSN utils.WrapAroundUpdateResult[uint64], resTS utils.WrapAroundUpdateResult[uint64]) {
r.sequenceNumber.UndoUpdate(resSN)
r.timestamp.UndoUpdate(resTS)
}
func (r *RTPStatsReceiver) Update(
packetTime int64,
sequenceNumber uint16,
@@ -222,32 +272,6 @@ func (r *RTPStatsReceiver) Update(
var tsRolloverCount int
var snRolloverCount int
logger := func() logger.UnlikelyLogger {
return r.logger.WithUnlikelyValues(
"resSN", resSN,
"gapSN", gapSN,
"resTS", resTS,
"gapTS", gapTS,
"snRolloverCount", snRolloverCount,
"expectedTSJump", expectedTSJump,
"tsRolloverCount", tsRolloverCount,
"packetTime", time.Unix(0, packetTime),
"timeSinceHighest", time.Duration(timeSinceHighest),
"sequenceNumber", sequenceNumber,
"timestamp", timestamp,
"marker", marker,
"hdrSize", hdrSize,
"payloadSize", payloadSize,
"paddingSize", paddingSize,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
}
undoUpdates := func() {
r.sequenceNumber.UndoUpdate(resSN)
r.timestamp.UndoUpdate(resTS)
}
if !r.initialized {
if payloadSize == 0 {
// do not start on a padding only packet
@@ -281,17 +305,54 @@ func (r *RTPStatsReceiver) Update(
timeSinceHighest = packetTime - r.highestTime
tsRolloverCount = r.getTSRolloverCount(timeSinceHighest, timestamp)
if tsRolloverCount >= 0 {
logger().Warnw("potential time stamp roll over", nil)
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
timestamp: timestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
resSN: &resSN,
gapSN: gapSN,
resTS: &resTS,
gapTS: gapTS,
snRolloverCount: snRolloverCount,
expectedTSJump: expectedTSJump,
tsRolloverCount: tsRolloverCount,
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw("potential time stamp roll over", nil, zap.Inline(rulf))
}
resTS = r.timestamp.Rollover(timestamp, tsRolloverCount)
if resTS.IsUnhandled {
undoUpdates()
r.undoUpdatesLocked(resSN, resTS)
r.packetsDroppedPreStartTimestamp++
logger().Warnw("dropping packet, pre-start timestamp", nil)
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
timestamp: timestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
resSN: &resSN,
gapSN: gapSN,
resTS: &resTS,
gapTS: gapTS,
snRolloverCount: snRolloverCount,
expectedTSJump: expectedTSJump,
tsRolloverCount: tsRolloverCount,
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw("dropping packet, pre-start timestamp", nil, zap.Inline(rulf))
if r.maybeRestart(sequenceNumber, timestamp, payloadSize) {
logger().Infow("potential restart")
r.logger.Infow("potential restart", zap.Inline(rulf))
r.resetRestart()
flowState.UnhandledReason = RTPFlowUnhandledReasonRestart
} else {
@@ -316,13 +377,32 @@ func (r *RTPStatsReceiver) Update(
if gapSN < 0 && gapTS > 0 {
expectedTSJump = int64(r.rtpConverter.ToRTPExt(time.Duration(timeSinceHighest)))
if gapTS > int64(float64(expectedTSJump)*cTSJumpTooHighFactor) {
undoUpdates()
r.undoUpdatesLocked(resSN, resTS)
r.packetsDroppedOldTimestamp++
logger().Warnw("dropping packet, old timestamp", nil)
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
timestamp: timestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
resSN: &resSN,
gapSN: gapSN,
resTS: &resTS,
gapTS: gapTS,
snRolloverCount: snRolloverCount,
expectedTSJump: expectedTSJump,
tsRolloverCount: tsRolloverCount,
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw("dropping packet, old timestamp", nil, zap.Inline(rulf))
if r.maybeRestart(sequenceNumber, timestamp, payloadSize) {
logger().Infow("potential restart")
r.logger.Infow("potential restart", zap.Inline(rulf))
r.resetRestart()
flowState.UnhandledReason = RTPFlowUnhandledReasonRestart
} else {
@@ -335,14 +415,33 @@ func (r *RTPStatsReceiver) Update(
// Case 2:
// Sequence number looks like it is moving forward, but it is actually a very old packet.
if gapTS < 0 && gapSN > 0 {
undoUpdates()
r.undoUpdatesLocked(resSN, resTS)
r.packetsDroppedOldSequenceNumber++
expectedTSJump = int64(r.rtpConverter.ToRTPExt(time.Duration(timeSinceHighest)))
logger().Warnw("dropping packet, old sequence number", nil)
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
timestamp: timestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
resSN: &resSN,
gapSN: gapSN,
resTS: &resTS,
gapTS: gapTS,
snRolloverCount: snRolloverCount,
expectedTSJump: expectedTSJump,
tsRolloverCount: tsRolloverCount,
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw("dropping packet, old sequence number", nil, zap.Inline(rulf))
if r.maybeRestart(sequenceNumber, timestamp, payloadSize) {
logger().Infow("potential restart")
r.logger.Infow("potential restart", zap.Inline(rulf))
r.resetRestart()
flowState.UnhandledReason = RTPFlowUnhandledReasonRestart
} else {
@@ -362,18 +461,55 @@ func (r *RTPStatsReceiver) Update(
}
resSN = r.sequenceNumber.Rollover(sequenceNumber, snRolloverCount)
if !resSN.IsUnhandled {
logger().Warnw("forcing sequence number rollover", nil)
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
timestamp: timestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
resSN: &resSN,
gapSN: gapSN,
resTS: &resTS,
gapTS: gapTS,
snRolloverCount: snRolloverCount,
expectedTSJump: expectedTSJump,
tsRolloverCount: tsRolloverCount,
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw("forcing sequence number rollover", nil, zap.Inline(rulf))
}
}
if resSN.IsUnhandled {
undoUpdates()
r.undoUpdatesLocked(resSN, resTS)
r.packetsDroppedPreStartSequenceNumber++
logger().Warnw("dropping packet, pre-start sequence number", nil)
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
timestamp: timestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
resSN: &resSN,
gapSN: gapSN,
resTS: &resTS,
gapTS: gapTS,
snRolloverCount: snRolloverCount,
expectedTSJump: expectedTSJump,
tsRolloverCount: tsRolloverCount,
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw("dropping packet, pre-start sequence number", nil, zap.Inline(rulf))
if r.maybeRestart(sequenceNumber, timestamp, payloadSize) {
logger().Infow("potential restart")
r.logger.Infow("potential restart", zap.Inline(rulf))
r.resetRestart()
flowState.UnhandledReason = RTPFlowUnhandledReasonRestart
} else {
@@ -406,8 +542,27 @@ func (r *RTPStatsReceiver) Update(
if !flowState.IsDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpNegativeCount++
if (r.largeJumpNegativeCount-1)%100 == 0 {
logger().Warnw(
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
timestamp: timestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
resSN: &resSN,
gapSN: gapSN,
resTS: &resTS,
gapTS: gapTS,
snRolloverCount: snRolloverCount,
expectedTSJump: expectedTSJump,
tsRolloverCount: tsRolloverCount,
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw(
"large sequence number gap negative", nil,
zap.Inline(rulf),
"count", r.largeJumpNegativeCount,
)
}
@@ -416,8 +571,27 @@ func (r *RTPStatsReceiver) Update(
if gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpCount++
if (r.largeJumpCount-1)%100 == 0 {
logger().Warnw(
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
timestamp: timestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
resSN: &resSN,
gapSN: gapSN,
resTS: &resTS,
gapTS: gapTS,
snRolloverCount: snRolloverCount,
expectedTSJump: expectedTSJump,
tsRolloverCount: tsRolloverCount,
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw(
"large sequence number gap", nil,
zap.Inline(rulf),
"count", r.largeJumpCount,
)
}
@@ -426,8 +600,27 @@ func (r *RTPStatsReceiver) Update(
if resTS.ExtendedVal < resTS.PreExtendedHighest {
r.timeReversedCount++
if (r.timeReversedCount-1)%100 == 0 {
logger().Warnw(
rulf := &receiverUpdateLoggingFields{
packetTime: packetTime,
sequenceNumber: sequenceNumber,
timestamp: timestamp,
marker: marker,
hdrSize: hdrSize,
payloadSize: payloadSize,
paddingSize: paddingSize,
resSN: &resSN,
gapSN: gapSN,
resTS: &resTS,
gapTS: gapTS,
snRolloverCount: snRolloverCount,
expectedTSJump: expectedTSJump,
tsRolloverCount: tsRolloverCount,
timeSinceHighest: timeSinceHighest,
rtpStats: r,
}
r.logger.Warnw(
"time reversed", nil,
zap.Inline(rulf),
"count", r.timeReversedCount,
)
}
@@ -479,7 +672,9 @@ func (r *RTPStatsReceiver) getExtendedSenderReport(srData *livekit.RTCPSenderRep
if r.srNewest != nil {
// use time since last sender report to ensure long gaps where the time stamp might
// jump more than half the range
timeSinceLastReport := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time())
srTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time()
srNewestTime := mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time()
timeSinceLastReport := srTime.Sub(srNewestTime)
expectedRTPTimestampExt := r.srNewest.RtpTimestampExt + r.rtpConverter.ToRTPExt(timeSinceLastReport)
lbound := expectedRTPTimestampExt - uint64(cReportSlack*float64(r.clockRate))
ubound := expectedRTPTimestampExt + uint64(cReportSlack*float64(r.clockRate))
@@ -544,11 +739,15 @@ func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *livekit.RTCP
return
}
timeSinceLast := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time()).Seconds()
srTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time()
srNewestTime := mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time()
srFirstTime := mediatransportutil.NtpTime(r.srFirst.NtpTimestamp).Time()
timeSinceLast := srTime.Sub(srNewestTime).Seconds()
rtpDiffSinceLast := srData.RtpTimestampExt - r.srNewest.RtpTimestampExt
calculatedClockRateFromLast := float64(rtpDiffSinceLast) / timeSinceLast
timeSinceFirst := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(r.srFirst.NtpTimestamp).Time()).Seconds()
timeSinceFirst := srTime.Sub(srFirstTime).Seconds()
rtpDiffSinceFirst := srData.RtpTimestampExt - r.srFirst.RtpTimestampExt
calculatedClockRateFromFirst := float64(rtpDiffSinceFirst) / timeSinceFirst
@@ -612,7 +811,8 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa
}
func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *livekit.RTCPSenderReportState) {
senderClockTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().UnixNano()
srTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time()
senderClockTime := srTime.UnixNano()
estimatedPropagationDelay, stepChange := r.propagationDelayEstimator.Update(senderClockTime, srData.At)
if stepChange {
r.logger.Debugw(
@@ -814,17 +1014,23 @@ func (r *RTPStatsReceiver) ExtendedHighestSequenceNumber() uint64 {
func (r *RTPStatsReceiver) maybeRestart(sn uint16, ts uint32, payloadSize int) bool {
if payloadSize > 0 {
r.restartPackets = append(r.restartPackets, packet{sn, ts})
if r.restartPacketsN < restartThreshold {
r.restartPacketsBuf[r.restartPacketsN] = packet{sn, ts}
r.restartPacketsN++
} else {
// keep last restartThreshold entries: shift left and append
copy(r.restartPacketsBuf[:], r.restartPacketsBuf[1:])
r.restartPacketsBuf[restartThreshold-1] = packet{sn, ts}
}
}
if len(r.restartPackets) < restartThreshold {
if r.restartPacketsN < restartThreshold {
return false
}
r.restartPackets = r.restartPackets[max(len(r.restartPackets)-restartThreshold, 0):]
// check for contiguous sequence numbers and equal or increasing timestamps
for i := 1; i < len(r.restartPackets); i++ {
p := &r.restartPackets[i]
prev := &r.restartPackets[i-1]
for i := 1; i < r.restartPacketsN; i++ {
p := &r.restartPacketsBuf[i]
prev := &r.restartPacketsBuf[i-1]
if p.sequenceNumber != prev.sequenceNumber+1 || (p.timestamp-prev.timestamp) > (1<<31) {
return false
}
@@ -834,7 +1040,7 @@ func (r *RTPStatsReceiver) maybeRestart(sn uint16, ts uint32, payloadSize int) b
}
func (r *RTPStatsReceiver) resetRestart() {
r.restartPackets = r.restartPackets[:0]
r.restartPacketsN = 0
}
// ----------------------------------
@@ -879,7 +1085,7 @@ func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncod
e.AddInt("packetsDroppedPreStartSequenceNumber", r.packetsDroppedPreStartSequenceNumber)
e.AddInt("packetsDroppedOldSequenceNumber", r.packetsDroppedOldSequenceNumber)
e.AddArray("restartPackets", logger.ObjectSlice(r.restartPackets))
e.AddArray("restartPackets", logger.ObjectSlice(r.restartPacketsBuf[:r.restartPacketsN]))
return nil
}
+2 -2
View File
@@ -243,10 +243,10 @@ func Test_RTPStatsReceiver_Restart(t *testing.T) {
require.False(t, r.maybeRestart(19, 21, 1000))
// can restart as there are enough packets with proper sequencing
require.True(t, r.maybeRestart(20, 21, 1000))
require.Equal(t, restartThreshold, len(r.restartPackets))
require.Equal(t, restartThreshold, r.restartPacketsN)
r.resetRestart()
require.Zero(t, len(r.restartPackets))
require.Zero(t, r.restartPacketsN)
r.Stop()
}