Log rtp stats more consistently. (#2816)

* Log rtp stats more consistently.

Thank you Paul for the logging tip.
Also update deps.

* remove duplicate logging field

* nil check
This commit is contained in:
Raja Subramanian
2024-06-25 14:55:42 +05:30
committed by GitHub
parent ddee5d012f
commit fa490dd510
5 changed files with 127 additions and 144 deletions
+15 -13
View File
@@ -15,6 +15,7 @@
package buffer
import (
"errors"
"fmt"
"sync"
"time"
@@ -573,10 +574,10 @@ func (r *rtpStatsBase) getTotalPacketsPrimary(extStartSN, extHighestSN uint64) u
return packetsSeen - r.packetsPadding
}
func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighestSN uint64) *RTPDeltaInfo {
func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighestSN uint64) (deltaInfo *RTPDeltaInfo, err error, loggingFields []interface{}) {
then, now := r.getAndResetSnapshot(snapshotID, extStartSN, extHighestSN)
if now == nil || then == nil {
return nil
return
}
startTime := then.startTime
@@ -587,22 +588,23 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes
packetsExpected = 0
}
if packetsExpected > cNumSequenceNumbers {
r.logger.Infow(
"too many packets expected in delta",
loggingFields = []interface{}{
"startSN", then.extStartSN,
"endSN", now.extStartSN,
"packetsExpected", packetsExpected,
"startTime", startTime,
"endTime", endTime,
"duration", endTime.Sub(startTime).String(),
)
return nil
}
err = errors.New("too many packets expected in delta")
return
}
if packetsExpected == 0 {
return &RTPDeltaInfo{
deltaInfo = &RTPDeltaInfo{
StartTime: startTime,
EndTime: endTime,
}
return
}
packetsLost := uint32(now.packetsLost - then.packetsLost)
@@ -613,21 +615,20 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes
// padding packets delta could be higher than expected due to out-of-order padding packets
packetsPadding := now.packetsPadding - then.packetsPadding
if packetsExpected < packetsPadding {
r.logger.Infow(
"padding packets more than expected",
loggingFields = []interface{}{
"packetsExpected", packetsExpected,
"packetsPadding", packetsPadding,
"packetsLost", packetsLost,
"startSequenceNumber", then.extStartSN,
"endSequenceNumber", now.extStartSN-1,
"rtpStats", r,
)
"endSequenceNumber", now.extStartSN - 1,
}
err = errors.New("padding packets more than expected")
packetsExpected = 0
} else {
packetsExpected -= packetsPadding
}
return &RTPDeltaInfo{
deltaInfo = &RTPDeltaInfo{
StartTime: startTime,
EndTime: endTime,
Packets: uint32(packetsExpected),
@@ -648,6 +649,7 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes
Plis: now.plis - then.plis,
Firs: now.firs - then.firs,
}
return
}
func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error {
+40 -34
View File
@@ -176,10 +176,7 @@ func (r *RTPStatsReceiver) Update(
r.logger.Debugw(
"rtp receiver stream start",
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"startSN", r.sequenceNumber.GetExtendedStart(),
"startTS", r.timestamp.GetExtendedStart(),
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
} else {
resSN = r.sequenceNumber.Update(sequenceNumber)
@@ -194,13 +191,6 @@ func (r *RTPStatsReceiver) Update(
gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
getLoggingFields := func() []interface{} {
return []interface{}{
"extStartSN", r.sequenceNumber.GetExtendedStart(),
"extHighestSN", r.sequenceNumber.GetExtendedHighest(),
"extStartTS", r.timestamp.GetExtendedStart(),
"extHighestTS", r.timestamp.GetExtendedHighest(),
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"highestTime", r.highestTime.String(),
"prevSN", resSN.PreExtendedHighest,
"currSN", resSN.ExtendedVal,
"gapSN", gapSN,
@@ -214,8 +204,6 @@ func (r *RTPStatsReceiver) Update(
"hdrSize", hdrSize,
"payloadSize", payloadSize,
"paddingSize", paddingSize,
"first", r.srFirst,
"last", r.srNewest,
}
}
if gapSN <= 0 { // duplicate OR out-of-order
@@ -314,9 +302,8 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp {
r.logger.Infow(
"received sender report, anachronous, dropping",
"first", r.srFirst,
"last", r.srNewest,
"current", srData,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
return false
}
@@ -370,10 +357,9 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
if r.outOfOrderSenderReportCount%10 == 0 {
r.logger.Infow(
"received sender report, out-of-order, skipping",
"first", r.srFirst,
"last", r.srNewest,
"current", &srDataCopy,
"count", r.outOfOrderSenderReportCount,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
}
r.outOfOrderSenderReportCount++
@@ -394,8 +380,6 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
if r.clockSkewCount%100 == 0 {
r.logger.Infow(
"received sender report, clock skew",
"first", r.srFirst,
"last", r.srNewest,
"current", &srDataCopy,
"timeSinceFirst", timeSinceFirst,
"rtpDiffSinceFirst", rtpDiffSinceFirst,
@@ -404,6 +388,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
"rtpDiffSinceLast", rtpDiffSinceLast,
"calculatedLast", calculatedClockRateFromLast,
"count", r.clockSkewCount,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
}
r.clockSkewCount++
@@ -414,16 +399,13 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
var deltaPropagationDelay time.Duration
getPropagationFields := func() []interface{} {
return []interface{}{
"propagationDelay", r.propagationDelay.String(),
"receivedPropagationDelay", propagationDelay.String(),
"longTermDeltaPropagationDelay", r.longTermDeltaPropagationDelay.String(),
"receivedDeltaPropagationDelay", deltaPropagationDelay.String(),
"deltaHighCount", r.propagationDelayDeltaHighCount,
"sinceDeltaHighStart", time.Since(r.propagationDelayDeltaHighStartTime).String(),
"propagationDelaySpike", r.propagationDelaySpike.String(),
"first", r.srFirst,
"last", r.srNewest,
"current", &srDataCopy,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
}
}
resetDelta := func() {
@@ -537,6 +519,7 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin
r.logger.Warnw(
"too many packets expected in receiver report",
fmt.Errorf("start: %d, end: %d, expected: %d", then.extStartSN, now.extStartSN, packetsExpected),
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
return nil
}
@@ -584,7 +567,16 @@ func (r *RTPStatsReceiver) DeltaInfo(snapshotID uint32) *RTPDeltaInfo {
r.lock.Lock()
defer r.lock.Unlock()
return r.deltaInfo(snapshotID, r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest())
deltaInfo, err, loggingFields := r.deltaInfo(
snapshotID,
r.sequenceNumber.GetExtendedStart(),
r.sequenceNumber.GetExtendedHighest(),
)
if err != nil {
r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLogEncoder{r}))
}
return deltaInfo
}
func (r *RTPStatsReceiver) MarshalLogObject(e zapcore.ObjectEncoder) error {
@@ -595,16 +587,7 @@ func (r *RTPStatsReceiver) MarshalLogObject(e zapcore.ObjectEncoder) error {
r.lock.RLock()
defer r.lock.RUnlock()
e.AddObject("base", r.rtpStatsBase)
e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart())
e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest())
e.AddUint64("extStartTS", r.timestamp.GetExtendedStart())
e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest())
e.AddDuration("propagationDelay", r.propagationDelay)
e.AddDuration("longTermDeltaPropagationDelay", r.longTermDeltaPropagationDelay)
return nil
return lockedRTPStatsReceiverLogEncoder{r}.MarshalLogObject(e)
}
func (r *RTPStatsReceiver) String() string {
@@ -635,3 +618,26 @@ func (r *RTPStatsReceiver) isInRange(esn uint64, ehsn uint64) bool {
}
// ----------------------------------
type lockedRTPStatsReceiverLogEncoder struct {
*RTPStatsReceiver
}
func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r.RTPStatsReceiver == nil {
return nil
}
e.AddObject("base", r.rtpStatsBase)
e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart())
e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest())
e.AddUint64("extStartTS", r.timestamp.GetExtendedStart())
e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest())
e.AddDuration("propagationDelay", r.propagationDelay)
e.AddDuration("longTermDeltaPropagationDelay", r.longTermDeltaPropagationDelay)
return nil
}
// ----------------------------------
+43 -73
View File
@@ -277,10 +277,7 @@ func (r *RTPStatsSender) Update(
r.logger.Debugw(
"rtp sender stream start",
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"startSN", r.extStartSN,
"startTS", r.extStartTS,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
}
@@ -289,26 +286,16 @@ func (r *RTPStatsSender) Update(
gapSN := int64(extSequenceNumber - r.extHighestSN)
getLoggingFields := func() []interface{} {
return []interface{}{
"extStartSN", r.extStartSN,
"extHighestSN", r.extHighestSN,
"extStartTS", r.extStartTS,
"extHighestTS", r.extHighestTS,
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"highestTime", r.highestTime.String(),
"highestSN", r.extHighestSN,
"currSN", extSequenceNumber,
"gapSN", gapSN,
"highestTS", r.extHighestTS,
"currTS", extTimestamp,
"gapTS", int64(extTimestamp - r.extHighestTS),
"packetTime", packetTime.String(),
"packetTime", packetTime,
"marker", marker,
"hdrSize", hdrSize,
"payloadSize", payloadSize,
"paddingSize", paddingSize,
"firstSR", r.srFirst,
"lastSR", r.srNewest,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
}
}
if gapSN <= 0 { // duplicate OR out-of-order
@@ -340,9 +327,7 @@ func (r *RTPStatsSender) Update(
r.logger.Infow(
"adjusting start sequence number",
append(getLoggingFields(),
"snBefore", r.extStartSN,
"snAfter", extSequenceNumber,
"tsBefore", r.extStartTS,
"tsAfter", extTimestamp,
)...,
)
@@ -399,9 +384,7 @@ func (r *RTPStatsSender) Update(
r.logger.Infow(
"adjusting start timestamp",
append(getLoggingFields(),
"snBefore", r.extStartSN,
"snAfter", extSequenceNumber,
"tsBefore", r.extStartTS,
"tsAfter", extTimestamp,
)...,
)
@@ -472,12 +455,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
if !r.lastRRTime.IsZero() && r.extHighestSNFromRR > extHighestSNFromRR {
r.logger.Debugw(
fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, extHighestSNFromRR),
"lastRRTime", r.lastRRTime.String(),
"lastRR", r.lastRR,
"sinceLastRR", time.Since(r.lastRRTime).String(),
"receivedRR", rr,
"firstSR", r.srFirst,
"lastSR", r.srNewest,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
return
}
@@ -539,24 +519,11 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
}
r.logger.Infow(
"rr interval too big, skipping",
"lastRRTime", r.lastRRTime.String(),
"lastRR", r.lastRR,
"timeSinceLastRR", timeSinceLastRR.String(),
"receivedRR", rr,
"extStartSN", r.extStartSN,
"extHighestSN", r.extHighestSN,
"extStartTS", r.extStartTS,
"extHighestTS", r.extHighestTS,
"extLastRRSN", s.extLastRRSN,
"firstTime", r.firstTime.String(),
"startTime", r.startTime.String(),
"highestTime", r.highestTime.String(),
"extReceivedRRSN", extReceivedRRSN,
"packetsInInterval", extReceivedRRSN-s.extLastRRSN,
"extHighestSNFromRR", r.extHighestSNFromRR,
"packetsLostFromRR", r.packetsLostFromRR,
"firstSR", r.srFirst,
"lastSR", r.srNewest,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
continue
}
@@ -573,27 +540,14 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
if r.metadataCacheOverflowCount%10 == 0 {
r.logger.Infow(
"metadata cache overflow",
"lastRRTime", r.lastRRTime.String(),
"lastRR", r.lastRR,
"timeSinceLastRR", timeSinceLastRR.String(),
"receivedRR", rr,
"extStartSN", r.extStartSN,
"extHighestSN", r.extHighestSN,
"extStartTS", r.extStartTS,
"extHighestTS", r.extHighestTS,
"extLastRRSN", s.extLastRRSN,
"firstTime", r.firstTime.String(),
"startTime", r.startTime.String(),
"highestTime", r.highestTime.String(),
"extReceivedRRSN", extReceivedRRSN,
"packetsInInterval", extReceivedRRSN-s.extLastRRSN,
"intervalStats", is.ToString(),
"aggregateIntervalStats", eis.ToString(),
"extHighestSNFromRR", r.extHighestSNFromRR,
"packetsLostFromRR", r.packetsLostFromRR,
"count", r.metadataCacheOverflowCount,
"firstSR", r.srFirst,
"lastSR", r.srNewest,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
}
r.metadataCacheOverflowCount++
@@ -671,22 +625,17 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS
getFields := func() []interface{} {
return []interface{}{
"first", r.srFirst,
"last", r.srNewest,
"curr", srData,
"feed", publisherSRData,
"tsOffset", tsOffset,
"timeNow", time.Now().String(),
"now", now.String(),
"extStartTS", r.extStartTS,
"extHighestTS", r.extHighestTS,
"highestTime", r.highestTime.String(),
"timeSinceHighest", now.Sub(r.highestTime).String(),
"firstTime", r.firstTime.String(),
"timeSinceFirst", now.Sub(r.firstTime).String(),
"timeSincePublisherSRAdjusted", timeSincePublisherSRAdjusted.String(),
"timeSincePublisherSR", time.Since(publisherSRData.At).String(),
"nowRTPExt", nowRTPExt,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
}
}
if r.srNewest != nil && nowRTPExt >= r.srNewest.RTPTimestampExt {
@@ -733,7 +682,16 @@ func (r *RTPStatsSender) DeltaInfo(snapshotID uint32) *RTPDeltaInfo {
r.lock.Lock()
defer r.lock.Unlock()
return r.deltaInfo(snapshotID, r.extStartSN, r.extHighestSN)
deltaInfo, err, loggingFields := r.deltaInfo(
snapshotID,
r.extStartSN,
r.extHighestSN,
)
if err != nil {
r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsSenderLogEncoder{r}))
}
return deltaInfo
}
func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo {
@@ -761,8 +719,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo
"startTime", startTime.String(),
"endTime", endTime.String(),
"duration", endTime.Sub(startTime).String(),
"firstSR", r.srFirst,
"lastSR", r.srNewest,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
return nil
}
@@ -790,6 +747,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo
packetsLost,
packetsLostFeed,
),
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
packetsLost = packetsExpected
}
@@ -833,18 +791,7 @@ func (r *RTPStatsSender) MarshalLogObject(e zapcore.ObjectEncoder) error {
r.lock.RLock()
defer r.lock.RUnlock()
e.AddObject("base", r.rtpStatsBase)
e.AddUint64("extStartSN", r.extStartSN)
e.AddUint64("extHighestSN", r.extHighestSN)
e.AddUint64("extStartTS", r.extStartTS)
e.AddUint64("extHighestTS", r.extHighestTS)
e.AddTime("lastRRTime", r.lastRRTime)
e.AddReflected("lastRR", r.lastRR)
e.AddUint64("extHighestSNFromRR", r.extHighestSNFromRR)
e.AddUint64("packetsLostFromRR", r.packetsLostFromRR)
e.AddFloat64("jitterFromRR", r.jitterFromRR)
e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR)
return nil
return lockedRTPStatsSenderLogEncoder{r}.MarshalLogObject(e)
}
func (r *RTPStatsSender) String() string {
@@ -1027,3 +974,26 @@ func (r *RTPStatsSender) getIntervalStats(
}
// -------------------------------------------------------------------
type lockedRTPStatsSenderLogEncoder struct {
*RTPStatsSender
}
func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r.RTPStatsSender == nil {
return nil
}
e.AddObject("base", r.rtpStatsBase)
e.AddUint64("extStartSN", r.extStartSN)
e.AddUint64("extHighestSN", r.extHighestSN)
e.AddUint64("extStartTS", r.extStartTS)
e.AddUint64("extHighestTS", r.extHighestTS)
e.AddTime("lastRRTime", r.lastRRTime)
e.AddReflected("lastRR", r.lastRR)
e.AddUint64("extHighestSNFromRR", r.extHighestSNFromRR)
e.AddUint64("packetsLostFromRR", r.packetsLostFromRR)
e.AddFloat64("jitterFromRR", r.jitterFromRR)
e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR)
return nil
}