More detailed logging to understand old packets. (#2730)

This commit is contained in:
Raja Subramanian
2024-05-25 18:34:55 +05:30
committed by GitHub
parent cf8c462805
commit 8be2005e0f
4 changed files with 177 additions and 5 deletions
+8 -1
View File
@@ -570,7 +570,14 @@ func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime time.Tim
if errors.Is(err, bucket.ErrPacketTooOld) {
packetTooOldCount := b.packetTooOldCount.Inc()
if (packetTooOldCount-1)%100 == 0 {
b.logger.Warnw("could not add packet to bucket", err, "count", packetTooOldCount)
b.logger.Warnw(
"could not add packet to bucket", err,
"count", packetTooOldCount,
"flowState", &flowState,
"snAdjustment", snAdjustment,
"incomingSequenceNumber", flowState.ExtSequenceNumber+snAdjustment,
"rtpStats", b.rtpStats,
)
}
} else if err != bucket.ErrRTXPacket {
b.logger.Warnw("could not add packet to bucket", err)
+75
View File
@@ -635,6 +635,81 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes
}
}
func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r == nil {
return nil
}
e.AddTime("startTime", r.startTime)
e.AddTime("firstTime", r.firstTime)
e.AddTime("highestTime", r.highestTime)
e.AddUint64("bytes", r.bytes)
e.AddUint64("headerBytes", r.headerBytes)
e.AddUint64("packetsDuplicate", r.packetsDuplicate)
e.AddUint64("bytesDuplicate", r.bytesDuplicate)
e.AddUint64("headerBytesDuplicate", r.headerBytesDuplicate)
e.AddUint64("packetsPadding", r.packetsPadding)
e.AddUint64("bytesPadding", r.bytesPadding)
e.AddUint64("headerBytesPadding", r.headerBytesPadding)
e.AddUint64("packetsOutOfOrder", r.packetsOutOfOrder)
e.AddUint64("packetsLost", r.packetsLost)
e.AddUint32("frames", r.frames)
e.AddFloat64("jitter", r.jitter)
e.AddFloat64("maxJitter", r.maxJitter)
hasLoss := false
first := true
str := "["
for burst, count := range r.gapHistogram {
if count == 0 {
continue
}
hasLoss = true
if !first {
str += ", "
}
first = false
str += fmt.Sprintf("%d:%d", burst+1, count)
}
str += "]"
if hasLoss {
e.AddString("gapHistogram", str)
}
e.AddUint32("nacks", r.nacks)
e.AddUint32("nackAcks", r.nackAcks)
e.AddUint32("nackMisses", r.nackMisses)
e.AddUint32("nackRepeated", r.nackRepeated)
e.AddUint32("plis", r.plis)
e.AddTime("lastPli", r.lastPli)
e.AddUint32("layerLockPlis", r.layerLockPlis)
e.AddTime("lastLayerLockPli", r.lastLayerLockPli)
e.AddUint32("firs", r.firs)
e.AddTime("lastFir", r.lastFir)
e.AddUint32("keyFrames", r.keyFrames)
e.AddTime("lastKeyFrame", r.lastKeyFrame)
e.AddUint32("rtt", r.rtt)
e.AddUint32("maxRtt", r.maxRtt)
e.AddObject("srFirst", r.srFirst)
e.AddObject("srNewest", r.srNewest)
return nil
}
func (r *rtpStatsBase) toString(
extStartSN, extHighestSN, extStartTS, extHighestTS uint64,
packetsLost uint64,
+54 -2
View File
@@ -20,6 +20,7 @@ import (
"time"
"github.com/pion/rtcp"
"go.uber.org/zap/zapcore"
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/protocol/livekit"
@@ -61,6 +62,8 @@ const (
cReportSlack = float64(60.0)
)
// ---------------------------------------------------------------------
type RTPFlowState struct {
IsNotHandled bool
@@ -75,6 +78,24 @@ type RTPFlowState struct {
ExtTimestamp uint64
}
func (r *RTPFlowState) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r == nil {
return nil
}
e.AddBool("IsNotHandled", r.IsNotHandled)
e.AddBool("HasLoss", r.HasLoss)
e.AddUint64("LossStartInclusive", r.LossStartInclusive)
e.AddUint64("LossEndExclusive", r.LossEndExclusive)
e.AddBool("IsDuplicate", r.IsDuplicate)
e.AddBool("IsOutOfOrder", r.IsOutOfOrder)
e.AddUint64("ExtSequenceNumber", r.ExtSequenceNumber)
e.AddUint64("ExtTimestamp", r.ExtTimestamp)
return nil
}
// ---------------------------------------------------------------------
type RTPStatsReceiver struct {
*rtpStatsBase
@@ -92,6 +113,8 @@ type RTPStatsReceiver struct {
clockSkewCount int
outOfOrderSsenderReportCount int
largeJumpCount int
largeJumpNegativeCount int
}
func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver {
@@ -175,6 +198,7 @@ func (r *RTPStatsReceiver) Update(
"extHighestSN", r.sequenceNumber.GetExtendedHighest(),
"extStartTS", r.timestamp.GetExtendedStart(),
"extHighestTS", r.timestamp.GetExtendedHighest(),
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"highestTime", r.highestTime.String(),
"prevSN", resSN.PreExtendedHighest,
@@ -194,7 +218,13 @@ func (r *RTPStatsReceiver) Update(
}
if gapSN <= 0 { // duplicate OR out-of-order
if -gapSN >= cNumSequenceNumbers/2 {
r.logger.Warnw("large sequence number gap negative", nil, getLoggingFields()...)
if r.largeJumpNegativeCount%100 == 0 {
r.logger.Warnw(
"large sequence number gap negative", nil,
append(getLoggingFields(), "count", r.largeJumpNegativeCount)...,
)
}
r.largeJumpNegativeCount++
}
if gapSN != 0 {
@@ -218,7 +248,13 @@ func (r *RTPStatsReceiver) Update(
flowState.ExtTimestamp = resTS.ExtendedVal
} else { // in-order
if gapSN >= cNumSequenceNumbers/2 || resTS.ExtendedVal < resTS.PreExtendedHighest {
r.logger.Warnw("large sequence number gap OR time reversed", nil, getLoggingFields()...)
if r.largeJumpCount%100 == 0 {
r.logger.Warnw(
"large sequence number gap OR time reversed", nil,
append(getLoggingFields(), "count", r.largeJumpCount)...,
)
}
r.largeJumpCount++
}
// update gap histogram
@@ -548,6 +584,22 @@ func (r *RTPStatsReceiver) DeltaInfo(snapshotID uint32) *RTPDeltaInfo {
return r.deltaInfo(snapshotID, r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest())
}
func (r *RTPStatsReceiver) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r == nil {
return nil
}
r.lock.RLock()
defer r.lock.RUnlock()
e.AddObject("base", r.rtpStatsBase)
e.AddUint64("extendedStartSN", r.sequenceNumber.GetExtendedStart())
e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest())
e.AddUint64("extStartTS", r.timestamp.GetExtendedStart())
e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest())
return nil
}
func (r *RTPStatsReceiver) String() string {
r.lock.RLock()
defer r.lock.RUnlock()
+40 -2
View File
@@ -21,6 +21,7 @@ import (
"time"
"github.com/pion/rtcp"
"go.uber.org/zap/zapcore"
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
@@ -163,6 +164,8 @@ type RTPStatsSender struct {
clockSkewCount int
metadataCacheOverflowCount int
largeJumpNegativeCount int
largeJumpCount int
}
func NewRTPStatsSender(params RTPStatsParams) *RTPStatsSender {
@@ -290,6 +293,7 @@ func (r *RTPStatsSender) Update(
"extHighestSN", r.extHighestSN,
"extStartTS", r.extStartTS,
"extHighestTS", r.extHighestTS,
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"highestTime", r.highestTime.String(),
"prevSN", r.extHighestSN,
@@ -315,7 +319,13 @@ func (r *RTPStatsSender) Update(
return
}
if -gapSN >= cNumSequenceNumbers/2 {
r.logger.Warnw("large sequence number gap negative", nil, getLoggingFields()...)
if r.largeJumpNegativeCount%100 == 0 {
r.logger.Warnw(
"large sequence number gap negative", nil,
append(getLoggingFields(), "count", r.largeJumpNegativeCount)...,
)
}
r.largeJumpNegativeCount++
}
if extSequenceNumber < r.extStartSN {
@@ -365,7 +375,13 @@ func (r *RTPStatsSender) Update(
}
} else { // in-order
if gapSN >= cNumSequenceNumbers/2 || extTimestamp < r.extHighestTS {
r.logger.Warnw("large sequence number gap OR time reversed", nil, getLoggingFields()...)
if r.largeJumpCount%100 == 0 {
r.logger.Warnw(
"large sequence number gap OR time reversed", nil,
append(getLoggingFields(), "count", r.largeJumpCount)...,
)
}
r.largeJumpCount++
}
// update gap histogram
@@ -812,6 +828,28 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo
}
}
func (r *RTPStatsSender) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r == nil {
return nil
}
r.lock.RLock()
defer r.lock.RUnlock()
e.AddObject("base", r.rtpStatsBase)
e.AddUint64("extStartSN", r.extStartSN)
e.AddUint64("extHighestSN", r.extHighestSN)
e.AddUint64("extStartTS", r.extStartTS)
e.AddUint64("extHighestTS", r.extHighestTS)
e.AddTime("lastRRTime", r.lastRRTime)
e.AddReflected("lastRR", r.lastRR)
e.AddUint64("extHighestSNFromRR", r.extHighestSNFromRR)
e.AddUint64("packetsLostFromRR", r.packetsLostFromRR)
e.AddFloat64("jitterFromRR", r.jitterFromRR)
e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR)
return nil
}
func (r *RTPStatsSender) String() string {
r.lock.RLock()
defer r.lock.RUnlock()