From bfbc4fa81fa4972e9c668e44306811fdec8143eb Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 6 Jan 2025 11:48:27 +0530 Subject: [PATCH] Remove alloc in packet forwarding path. (#3305) * Remove alloc in packet forwarding path. Unlikely logger creation was doing allocs. Replace it with a function like in rtpstats_receiver.go so that allocations do not happen unnecessarily. * variable rename * one more place --- pkg/sfu/buffer/buffer_test.go | 1 - pkg/sfu/rtpstats/rtpstats_sender.go | 71 +++++++++++++++-------------- 2 files changed, 38 insertions(+), 34 deletions(-) diff --git a/pkg/sfu/buffer/buffer_test.go b/pkg/sfu/buffer/buffer_test.go index de8f33022..ad8a553b8 100644 --- a/pkg/sfu/buffer/buffer_test.go +++ b/pkg/sfu/buffer/buffer_test.go @@ -322,5 +322,4 @@ func BenchmarkMemcpu(b *testing.B) { for i := 0; i < b.N; i++ { copy(buf2, buf) } - } diff --git a/pkg/sfu/rtpstats/rtpstats_sender.go b/pkg/sfu/rtpstats/rtpstats_sender.go index e3ee972a8..c47cad3fb 100644 --- a/pkg/sfu/rtpstats/rtpstats_sender.go +++ b/pkg/sfu/rtpstats/rtpstats_sender.go @@ -26,6 +26,7 @@ import ( "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils/mono" ) @@ -363,18 +364,20 @@ func (r *RTPStatsSender) Update( pktSize := uint64(hdrSize + payloadSize + paddingSize) isDuplicate := false gapSN := int64(extSequenceNumber - r.extHighestSN) - logger := r.logger.WithUnlikelyValues( - "currSN", extSequenceNumber, - "gapSN", gapSN, - "currTS", extTimestamp, - "gapTS", int64(extTimestamp-r.extHighestTS), - "packetTime", packetTime, - "marker", marker, - "hdrSize", hdrSize, - "payloadSize", payloadSize, - "paddingSize", paddingSize, - "rtpStats", lockedRTPStatsSenderLogEncoder{r}, - ) + ulgr := func() logger.UnlikelyLogger { + return r.logger.WithUnlikelyValues( + "currSN", extSequenceNumber, + "gapSN", gapSN, + "currTS", extTimestamp, + "gapTS", int64(extTimestamp-r.extHighestTS), + "packetTime", packetTime, + "marker", marker, + "hdrSize", hdrSize, + "payloadSize", payloadSize, + "paddingSize", paddingSize, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, + ) + } if gapSN <= 0 { // duplicate OR out-of-order if payloadSize == 0 && extSequenceNumber < r.extStartSN { // do not start on a padding only packet @@ -401,7 +404,7 @@ func (r *RTPStatsSender) Update( } } - logger.Infow( + ulgr().Infow( "adjusting start sequence number", "snAfter", extSequenceNumber, "tsAfter", extTimestamp, @@ -426,7 +429,7 @@ func (r *RTPStatsSender) Update( if !isDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold { r.largeJumpNegativeCount++ if (r.largeJumpNegativeCount-1)%100 == 0 { - logger.Warnw( + ulgr().Warnw( "large sequence number gap negative", nil, "count", r.largeJumpNegativeCount, ) @@ -436,7 +439,7 @@ func (r *RTPStatsSender) Update( if gapSN >= cSequenceNumberLargeJumpThreshold { r.largeJumpCount++ if (r.largeJumpCount-1)%100 == 0 { - logger.Warnw( + ulgr().Warnw( "large sequence number gap", nil, "count", r.largeJumpCount, ) @@ -446,7 +449,7 @@ func (r *RTPStatsSender) Update( if extTimestamp < r.extHighestTS { r.timeReversedCount++ if (r.timeReversedCount-1)%100 == 0 { - logger.Warnw( + ulgr().Warnw( "time reversed", nil, "count", r.timeReversedCount, ) @@ -466,7 +469,7 @@ func (r *RTPStatsSender) Update( } if extTimestamp < r.extStartTS { - logger.Infow( + ulgr().Infow( "adjusting start timestamp", "snAfter", extSequenceNumber, "tsAfter", extTimestamp, @@ -732,20 +735,22 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek Octets: octetCount, } - logger := r.logger.WithUnlikelyValues( - "curr", WrappedRTCPSenderReportStateLogger{srData}, - "feed", WrappedRTCPSenderReportStateLogger{publisherSRData}, - "tsOffset", tsOffset, - "timeNow", time.Now(), - "reportTime", time.Unix(0, reportTime), - "reportTimeAdjusted", time.Unix(0, reportTimeAdjusted), - "timeSinceHighest", time.Since(time.Unix(0, r.highestTime)), - "timeSinceFirst", time.Since(time.Unix(0, r.firstTime)), - "timeSincePublisherSRAdjusted", time.Since(time.Unix(0, publisherSRData.AtAdjusted)), - "timeSincePublisherSR", time.Since(time.Unix(0, publisherSRData.At)), - "nowRTPExt", nowRTPExt, - "rtpStats", lockedRTPStatsSenderLogEncoder{r}, - ) + ulgr := func() logger.UnlikelyLogger { + return r.logger.WithUnlikelyValues( + "curr", WrappedRTCPSenderReportStateLogger{srData}, + "feed", WrappedRTCPSenderReportStateLogger{publisherSRData}, + "tsOffset", tsOffset, + "timeNow", time.Now(), + "reportTime", time.Unix(0, reportTime), + "reportTimeAdjusted", time.Unix(0, reportTimeAdjusted), + "timeSinceHighest", time.Since(time.Unix(0, r.highestTime)), + "timeSinceFirst", time.Since(time.Unix(0, r.firstTime)), + "timeSincePublisherSRAdjusted", time.Since(time.Unix(0, publisherSRData.AtAdjusted)), + "timeSincePublisherSR", time.Since(time.Unix(0, publisherSRData.At)), + "nowRTPExt", nowRTPExt, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, + ) + } if r.srNewest != nil && nowRTPExt >= r.srNewest.RtpTimestampExt { timeSinceLastReport := nowNTP.Time().Sub(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time()) @@ -754,7 +759,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek if timeSinceLastReport.Seconds() > 0.2 && math.Abs(float64(r.params.ClockRate)-windowClockRate) > 0.2*float64(r.params.ClockRate) { r.clockSkewCount++ if (r.clockSkewCount-1)%100 == 0 { - logger.Infow( + ulgr().Infow( "sending sender report, clock skew", "timeSinceLastReport", timeSinceLastReport, "rtpDiffSinceLastReport", rtpDiffSinceLastReport, @@ -768,7 +773,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek if r.srNewest != nil && nowRTPExt < r.srNewest.RtpTimestampExt { // If report being generated is behind the last report, skip it. // Should not happen. - logger.Infow("sending sender report, out-of-order, skipping") + ulgr().Infow("sending sender report, out-of-order, skipping") return nil }