diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 5474c2664..dd0916d82 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -570,7 +570,14 @@ func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime time.Tim if errors.Is(err, bucket.ErrPacketTooOld) { packetTooOldCount := b.packetTooOldCount.Inc() if (packetTooOldCount-1)%100 == 0 { - b.logger.Warnw("could not add packet to bucket", err, "count", packetTooOldCount) + b.logger.Warnw( + "could not add packet to bucket", err, + "count", packetTooOldCount, + "flowState", &flowState, + "snAdjustment", snAdjustment, + "incomingSequenceNumber", flowState.ExtSequenceNumber+snAdjustment, + "rtpStats", b.rtpStats, + ) } } else if err != bucket.ErrRTXPacket { b.logger.Warnw("could not add packet to bucket", err) diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index af8ecfe21..51b06c8d0 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -635,6 +635,81 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes } } +func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error { + if r == nil { + return nil + } + + e.AddTime("startTime", r.startTime) + e.AddTime("firstTime", r.firstTime) + e.AddTime("highestTime", r.highestTime) + + e.AddUint64("bytes", r.bytes) + e.AddUint64("headerBytes", r.headerBytes) + + e.AddUint64("packetsDuplicate", r.packetsDuplicate) + e.AddUint64("bytesDuplicate", r.bytesDuplicate) + e.AddUint64("headerBytesDuplicate", r.headerBytesDuplicate) + + e.AddUint64("packetsPadding", r.packetsPadding) + e.AddUint64("bytesPadding", r.bytesPadding) + e.AddUint64("headerBytesPadding", r.headerBytesPadding) + + e.AddUint64("packetsOutOfOrder", r.packetsOutOfOrder) + + e.AddUint64("packetsLost", r.packetsLost) + + e.AddUint32("frames", r.frames) + + e.AddFloat64("jitter", r.jitter) + e.AddFloat64("maxJitter", r.maxJitter) + + hasLoss := false + first := true + str := "[" + for burst, count := range r.gapHistogram { + if count == 0 { + continue + } + + hasLoss = true + + if !first { + str += ", " + } + first = false + str += fmt.Sprintf("%d:%d", burst+1, count) + } + str += "]" + if hasLoss { + e.AddString("gapHistogram", str) + } + + e.AddUint32("nacks", r.nacks) + e.AddUint32("nackAcks", r.nackAcks) + e.AddUint32("nackMisses", r.nackMisses) + e.AddUint32("nackRepeated", r.nackRepeated) + + e.AddUint32("plis", r.plis) + e.AddTime("lastPli", r.lastPli) + + e.AddUint32("layerLockPlis", r.layerLockPlis) + e.AddTime("lastLayerLockPli", r.lastLayerLockPli) + + e.AddUint32("firs", r.firs) + e.AddTime("lastFir", r.lastFir) + + e.AddUint32("keyFrames", r.keyFrames) + e.AddTime("lastKeyFrame", r.lastKeyFrame) + + e.AddUint32("rtt", r.rtt) + e.AddUint32("maxRtt", r.maxRtt) + + e.AddObject("srFirst", r.srFirst) + e.AddObject("srNewest", r.srNewest) + return nil +} + func (r *rtpStatsBase) toString( extStartSN, extHighestSN, extStartTS, extHighestTS uint64, packetsLost uint64, diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 9ffc2507e..a82ba4b4a 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pion/rtcp" + "go.uber.org/zap/zapcore" "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/protocol/livekit" @@ -61,6 +62,8 @@ const ( cReportSlack = float64(60.0) ) +// --------------------------------------------------------------------- + type RTPFlowState struct { IsNotHandled bool @@ -75,6 +78,24 @@ type RTPFlowState struct { ExtTimestamp uint64 } +func (r *RTPFlowState) MarshalLogObject(e zapcore.ObjectEncoder) error { + if r == nil { + return nil + } + + e.AddBool("IsNotHandled", r.IsNotHandled) + e.AddBool("HasLoss", r.HasLoss) + e.AddUint64("LossStartInclusive", r.LossStartInclusive) + e.AddUint64("LossEndExclusive", r.LossEndExclusive) + e.AddBool("IsDuplicate", r.IsDuplicate) + e.AddBool("IsOutOfOrder", r.IsOutOfOrder) + e.AddUint64("ExtSequenceNumber", r.ExtSequenceNumber) + e.AddUint64("ExtTimestamp", r.ExtTimestamp) + return nil +} + +// --------------------------------------------------------------------- + type RTPStatsReceiver struct { *rtpStatsBase @@ -92,6 +113,8 @@ type RTPStatsReceiver struct { clockSkewCount int outOfOrderSsenderReportCount int + largeJumpCount int + largeJumpNegativeCount int } func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver { @@ -175,6 +198,7 @@ func (r *RTPStatsReceiver) Update( "extHighestSN", r.sequenceNumber.GetExtendedHighest(), "extStartTS", r.timestamp.GetExtendedStart(), "extHighestTS", r.timestamp.GetExtendedHighest(), + "startTime", r.startTime.String(), "firstTime", r.firstTime.String(), "highestTime", r.highestTime.String(), "prevSN", resSN.PreExtendedHighest, @@ -194,7 +218,13 @@ func (r *RTPStatsReceiver) Update( } if gapSN <= 0 { // duplicate OR out-of-order if -gapSN >= cNumSequenceNumbers/2 { - r.logger.Warnw("large sequence number gap negative", nil, getLoggingFields()...) + if r.largeJumpNegativeCount%100 == 0 { + r.logger.Warnw( + "large sequence number gap negative", nil, + append(getLoggingFields(), "count", r.largeJumpNegativeCount)..., + ) + } + r.largeJumpNegativeCount++ } if gapSN != 0 { @@ -218,7 +248,13 @@ func (r *RTPStatsReceiver) Update( flowState.ExtTimestamp = resTS.ExtendedVal } else { // in-order if gapSN >= cNumSequenceNumbers/2 || resTS.ExtendedVal < resTS.PreExtendedHighest { - r.logger.Warnw("large sequence number gap OR time reversed", nil, getLoggingFields()...) + if r.largeJumpCount%100 == 0 { + r.logger.Warnw( + "large sequence number gap OR time reversed", nil, + append(getLoggingFields(), "count", r.largeJumpCount)..., + ) + } + r.largeJumpCount++ } // update gap histogram @@ -548,6 +584,22 @@ func (r *RTPStatsReceiver) DeltaInfo(snapshotID uint32) *RTPDeltaInfo { return r.deltaInfo(snapshotID, r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest()) } +func (r *RTPStatsReceiver) MarshalLogObject(e zapcore.ObjectEncoder) error { + if r == nil { + return nil + } + + r.lock.RLock() + defer r.lock.RUnlock() + + e.AddObject("base", r.rtpStatsBase) + e.AddUint64("extendedStartSN", r.sequenceNumber.GetExtendedStart()) + e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest()) + e.AddUint64("extStartTS", r.timestamp.GetExtendedStart()) + e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest()) + return nil +} + func (r *RTPStatsReceiver) String() string { r.lock.RLock() defer r.lock.RUnlock() diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index a46e4de07..64aca2fa8 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pion/rtcp" + "go.uber.org/zap/zapcore" "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" @@ -163,6 +164,8 @@ type RTPStatsSender struct { clockSkewCount int metadataCacheOverflowCount int + largeJumpNegativeCount int + largeJumpCount int } func NewRTPStatsSender(params RTPStatsParams) *RTPStatsSender { @@ -290,6 +293,7 @@ func (r *RTPStatsSender) Update( "extHighestSN", r.extHighestSN, "extStartTS", r.extStartTS, "extHighestTS", r.extHighestTS, + "startTime", r.startTime.String(), "firstTime", r.firstTime.String(), "highestTime", r.highestTime.String(), "prevSN", r.extHighestSN, @@ -315,7 +319,13 @@ func (r *RTPStatsSender) Update( return } if -gapSN >= cNumSequenceNumbers/2 { - r.logger.Warnw("large sequence number gap negative", nil, getLoggingFields()...) + if r.largeJumpNegativeCount%100 == 0 { + r.logger.Warnw( + "large sequence number gap negative", nil, + append(getLoggingFields(), "count", r.largeJumpNegativeCount)..., + ) + } + r.largeJumpNegativeCount++ } if extSequenceNumber < r.extStartSN { @@ -365,7 +375,13 @@ func (r *RTPStatsSender) Update( } } else { // in-order if gapSN >= cNumSequenceNumbers/2 || extTimestamp < r.extHighestTS { - r.logger.Warnw("large sequence number gap OR time reversed", nil, getLoggingFields()...) + if r.largeJumpCount%100 == 0 { + r.logger.Warnw( + "large sequence number gap OR time reversed", nil, + append(getLoggingFields(), "count", r.largeJumpCount)..., + ) + } + r.largeJumpCount++ } // update gap histogram @@ -812,6 +828,28 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo } } +func (r *RTPStatsSender) MarshalLogObject(e zapcore.ObjectEncoder) error { + if r == nil { + return nil + } + + r.lock.RLock() + defer r.lock.RUnlock() + + e.AddObject("base", r.rtpStatsBase) + e.AddUint64("extStartSN", r.extStartSN) + e.AddUint64("extHighestSN", r.extHighestSN) + e.AddUint64("extStartTS", r.extStartTS) + e.AddUint64("extHighestTS", r.extHighestTS) + e.AddTime("lastRRTime", r.lastRRTime) + e.AddReflected("lastRR", r.lastRR) + e.AddUint64("extHighestSNFromRR", r.extHighestSNFromRR) + e.AddUint64("packetsLostFromRR", r.packetsLostFromRR) + e.AddFloat64("jitterFromRR", r.jitterFromRR) + e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR) + return nil +} + func (r *RTPStatsSender) String() string { r.lock.RLock() defer r.lock.RUnlock()