diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 6142f10c7..08b1389d8 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -113,50 +113,6 @@ type snapshot struct { // ------------------------------------------------------------------ -type wrappedRTPDriftLogger struct { - *livekit.RTPDrift -} - -func (w wrappedRTPDriftLogger) MarshalLogObject(e zapcore.ObjectEncoder) error { - rd := w.RTPDrift - if rd == nil { - return nil - } - - e.AddTime("StartTime", rd.StartTime.AsTime()) - e.AddTime("EndTime", rd.EndTime.AsTime()) - e.AddFloat64("Duration", rd.Duration) - e.AddUint64("StartTimestamp", rd.StartTimestamp) - e.AddUint64("EndTimestamp", rd.EndTimestamp) - e.AddUint64("RtpClockTicks", rd.RtpClockTicks) - e.AddInt64("DriftSamples", rd.DriftSamples) - e.AddFloat64("DriftMs", rd.DriftMs) - e.AddFloat64("ClockRate", rd.ClockRate) - return nil -} - -// ------------------------------------------------------------------ - -type WrappedRTCPSenderReportStateLogger struct { - *livekit.RTCPSenderReportState -} - -func (w WrappedRTCPSenderReportStateLogger) MarshalLogObject(e zapcore.ObjectEncoder) error { - rsrs := w.RTCPSenderReportState - if rsrs == nil { - return nil - } - - e.AddUint32("RtpTimestamp", rsrs.RtpTimestamp) - e.AddUint64("RtpTimestampExt", rsrs.RtpTimestampExt) - e.AddTime("NtpTimestamp", mediatransportutil.NtpTime(rsrs.NtpTimestamp).Time()) - e.AddTime("At", time.Unix(0, rsrs.At)) - e.AddTime("AtAdjusted", time.Unix(0, rsrs.AtAdjusted)) - e.AddUint32("Packets", rsrs.Packets) - e.AddUint64("Octets", rsrs.Octets) - return nil -} - func RTCPSenderReportPropagationDelay(rsrs *livekit.RTCPSenderReportState, passThrough bool) time.Duration { if passThrough { return 0 @@ -537,7 +493,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo "adjustment", time.Duration(r.firstTime - firstTime).String(), "extNowTS", extNowTS, "extStartTS", extStartTS, - "srData", WrappedRTCPSenderReportStateLogger{srData}, + "srData", logger.Proto(srData), "tsOffset", tsOffset, "timeSinceReceive", timeSinceReceive.String(), "timeSinceFirst", timeSinceFirst.String(), @@ -724,8 +680,8 @@ func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddUint32("rtt", r.rtt) e.AddUint32("maxRtt", r.maxRtt) - e.AddObject("srFirst", WrappedRTCPSenderReportStateLogger{r.srFirst}) - e.AddObject("srNewest", WrappedRTCPSenderReportStateLogger{r.srNewest}) + e.AddObject("srFirst", logger.Proto(r.srFirst)) + e.AddObject("srNewest", logger.Proto(r.srNewest)) return nil } diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 81d3f1178..e5d889320 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -26,6 +26,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" protoutils "github.com/livekit/protocol/utils" ) @@ -428,7 +429,7 @@ func (r *RTPStatsReceiver) checkOutOfOrderSenderReport(srData *livekit.RTCPSende if (r.outOfOrderSenderReportCount-1)%10 == 0 { r.logger.Infow( "received sender report, out-of-order, skipping", - "current", WrappedRTCPSenderReportStateLogger{srData}, + "current", logger.Proto(srData), "count", r.outOfOrderSenderReportCount, "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, ) @@ -458,7 +459,7 @@ func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *livekit.RTCP if (r.clockSkewCount-1)%100 == 0 { r.logger.Infow( "received sender report, clock skew", - "current", WrappedRTCPSenderReportStateLogger{srData}, + "current", logger.Proto(srData), "timeSinceFirst", timeSinceFirst, "rtpDiffSinceFirst", rtpDiffSinceFirst, "calculatedFirst", calculatedClockRateFromFirst, @@ -494,7 +495,7 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa if (r.clockSkewMediaPathCount-1)%100 == 0 { r.logger.Infow( "received sender report, clock skew against media path", - "current", WrappedRTCPSenderReportStateLogger{srData}, + "current", logger.Proto(srData), "timeSinceSR", timeSinceSR, "extNowTSSR", extNowTSSR, "timeSinceHighest", timeSinceHighest, @@ -723,10 +724,10 @@ func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncod e.AddObject("propagationDelayEstimator", r.propagationDelayEstimator) packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS) - e.AddObject("packetDrift", wrappedRTPDriftLogger{packetDrift}) - e.AddObject("ntpReportDrift", wrappedRTPDriftLogger{ntpReportDrift}) - e.AddObject("receivedReportDrift", wrappedRTPDriftLogger{receivedReportDrift}) - e.AddObject("rebasedReportDrift", wrappedRTPDriftLogger{rebasedReportDrift}) + e.AddObject("packetDrift", logger.Proto(packetDrift)) + e.AddObject("ntpReportDrift", logger.Proto(ntpReportDrift)) + e.AddObject("receivedReportDrift", logger.Proto(receivedReportDrift)) + e.AddObject("rebasedReportDrift", logger.Proto(rebasedReportDrift)) return nil } diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index b46e976e9..18a9b927d 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -25,6 +25,7 @@ import ( "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" ) const ( @@ -1027,9 +1028,9 @@ func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR) packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(r.extStartTS, r.extHighestTS) - e.AddObject("packetDrift", wrappedRTPDriftLogger{packetDrift}) - e.AddObject("ntpReportDrift", wrappedRTPDriftLogger{ntpReportDrift}) - e.AddObject("receivedReportDrift", wrappedRTPDriftLogger{receivedReportDrift}) - e.AddObject("rebasedReportDrift", wrappedRTPDriftLogger{rebasedReportDrift}) + e.AddObject("packetDrift", logger.Proto(packetDrift)) + e.AddObject("ntpReportDrift", logger.Proto(ntpReportDrift)) + e.AddObject("receivedReportDrift", logger.Proto(receivedReportDrift)) + e.AddObject("rebasedReportDrift", logger.Proto(rebasedReportDrift)) return nil } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 08edc1e5b..87eb6f81c 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -30,6 +30,7 @@ import ( "github.com/pion/transport/v2/packetio" "github.com/pion/webrtc/v3" "go.uber.org/atomic" + "go.uber.org/zap/zapcore" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -145,6 +146,13 @@ func (d DownTrackState) String() string { d.RTPStats, d.DeltaStatsSenderSnapshotId, d.ForwarderState.String()) } +func (d DownTrackState) MarshalLogObject(e zapcore.ObjectEncoder) error { + e.AddObject("RTPStats", d.RTPStats) + e.AddUint32("DeltaStatsSenderSnapshotId", d.DeltaStatsSenderSnapshotId) + e.AddObject("ForwarderState", logger.Proto(d.ForwarderState)) + return nil +} + // ------------------------------------------------------------------- /* STREAM-ALLOCATOR-DATA @@ -1245,6 +1253,7 @@ func (d *DownTrack) GetState() DownTrackState { } func (d *DownTrack) SeedState(state DownTrackState) { + d.params.Logger.Debugw("seeding down track state", "state", state) if state.RTPStats != nil { d.rtpStats.Seed(state.RTPStats) d.deltaStatsSenderSnapshotId = state.DeltaStatsSenderSnapshotId diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index e14509541..2d586f11a 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -196,9 +196,7 @@ type refInfo struct { } func (r refInfo) MarshalLogObject(e zapcore.ObjectEncoder) error { - e.AddObject("senderReport", buffer.WrappedRTCPSenderReportStateLogger{ - RTCPSenderReportState: r.senderReport, - }) + e.AddObject("senderReport", logger.Proto(r.senderReport)) e.AddUint64("tsOffset", r.tsOffset) e.AddBool("isTSOffsetValid", r.isTSOffsetValid) return nil @@ -625,6 +623,9 @@ func (f *Forwarder) SetRefSenderReport(isSVC bool, layer int32, srData *livekit. f.refIsSVC = isSVC if layer >= 0 && int(layer) < len(f.refInfos) { + if layer == f.referenceLayerSpatial && f.refInfos[layer].senderReport == nil { + f.logger.Debugw("received RTCP sender report for reference layer spatial", "layer", layer) + } f.refInfos[layer] = refInfo{srData, 0, false} // Mark validity of time stamp offset.