Remove alloc in packet forwarding path. (#3305)

* Remove alloc in packet forwarding path.

Unlikely logger creation was doing allocs. Replace it with a function
like in rtpstats_receiver.go so that allocations do not happen
unnecessarily.

* variable rename

* one more place
This commit is contained in:
Raja Subramanian
2025-01-06 11:48:27 +05:30
committed by GitHub
parent c792d15244
commit bfbc4fa81f
2 changed files with 38 additions and 34 deletions

View File

@@ -322,5 +322,4 @@ func BenchmarkMemcpu(b *testing.B) {
for i := 0; i < b.N; i++ {
copy(buf2, buf)
}
}

View File

@@ -26,6 +26,7 @@ import (
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
)
@@ -363,18 +364,20 @@ func (r *RTPStatsSender) Update(
pktSize := uint64(hdrSize + payloadSize + paddingSize)
isDuplicate := false
gapSN := int64(extSequenceNumber - r.extHighestSN)
logger := r.logger.WithUnlikelyValues(
"currSN", extSequenceNumber,
"gapSN", gapSN,
"currTS", extTimestamp,
"gapTS", int64(extTimestamp-r.extHighestTS),
"packetTime", packetTime,
"marker", marker,
"hdrSize", hdrSize,
"payloadSize", payloadSize,
"paddingSize", paddingSize,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
ulgr := func() logger.UnlikelyLogger {
return r.logger.WithUnlikelyValues(
"currSN", extSequenceNumber,
"gapSN", gapSN,
"currTS", extTimestamp,
"gapTS", int64(extTimestamp-r.extHighestTS),
"packetTime", packetTime,
"marker", marker,
"hdrSize", hdrSize,
"payloadSize", payloadSize,
"paddingSize", paddingSize,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
}
if gapSN <= 0 { // duplicate OR out-of-order
if payloadSize == 0 && extSequenceNumber < r.extStartSN {
// do not start on a padding only packet
@@ -401,7 +404,7 @@ func (r *RTPStatsSender) Update(
}
}
logger.Infow(
ulgr().Infow(
"adjusting start sequence number",
"snAfter", extSequenceNumber,
"tsAfter", extTimestamp,
@@ -426,7 +429,7 @@ func (r *RTPStatsSender) Update(
if !isDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpNegativeCount++
if (r.largeJumpNegativeCount-1)%100 == 0 {
logger.Warnw(
ulgr().Warnw(
"large sequence number gap negative", nil,
"count", r.largeJumpNegativeCount,
)
@@ -436,7 +439,7 @@ func (r *RTPStatsSender) Update(
if gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpCount++
if (r.largeJumpCount-1)%100 == 0 {
logger.Warnw(
ulgr().Warnw(
"large sequence number gap", nil,
"count", r.largeJumpCount,
)
@@ -446,7 +449,7 @@ func (r *RTPStatsSender) Update(
if extTimestamp < r.extHighestTS {
r.timeReversedCount++
if (r.timeReversedCount-1)%100 == 0 {
logger.Warnw(
ulgr().Warnw(
"time reversed", nil,
"count", r.timeReversedCount,
)
@@ -466,7 +469,7 @@ func (r *RTPStatsSender) Update(
}
if extTimestamp < r.extStartTS {
logger.Infow(
ulgr().Infow(
"adjusting start timestamp",
"snAfter", extSequenceNumber,
"tsAfter", extTimestamp,
@@ -732,20 +735,22 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek
Octets: octetCount,
}
logger := r.logger.WithUnlikelyValues(
"curr", WrappedRTCPSenderReportStateLogger{srData},
"feed", WrappedRTCPSenderReportStateLogger{publisherSRData},
"tsOffset", tsOffset,
"timeNow", time.Now(),
"reportTime", time.Unix(0, reportTime),
"reportTimeAdjusted", time.Unix(0, reportTimeAdjusted),
"timeSinceHighest", time.Since(time.Unix(0, r.highestTime)),
"timeSinceFirst", time.Since(time.Unix(0, r.firstTime)),
"timeSincePublisherSRAdjusted", time.Since(time.Unix(0, publisherSRData.AtAdjusted)),
"timeSincePublisherSR", time.Since(time.Unix(0, publisherSRData.At)),
"nowRTPExt", nowRTPExt,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
ulgr := func() logger.UnlikelyLogger {
return r.logger.WithUnlikelyValues(
"curr", WrappedRTCPSenderReportStateLogger{srData},
"feed", WrappedRTCPSenderReportStateLogger{publisherSRData},
"tsOffset", tsOffset,
"timeNow", time.Now(),
"reportTime", time.Unix(0, reportTime),
"reportTimeAdjusted", time.Unix(0, reportTimeAdjusted),
"timeSinceHighest", time.Since(time.Unix(0, r.highestTime)),
"timeSinceFirst", time.Since(time.Unix(0, r.firstTime)),
"timeSincePublisherSRAdjusted", time.Since(time.Unix(0, publisherSRData.AtAdjusted)),
"timeSincePublisherSR", time.Since(time.Unix(0, publisherSRData.At)),
"nowRTPExt", nowRTPExt,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
}
if r.srNewest != nil && nowRTPExt >= r.srNewest.RtpTimestampExt {
timeSinceLastReport := nowNTP.Time().Sub(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time())
@@ -754,7 +759,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek
if timeSinceLastReport.Seconds() > 0.2 && math.Abs(float64(r.params.ClockRate)-windowClockRate) > 0.2*float64(r.params.ClockRate) {
r.clockSkewCount++
if (r.clockSkewCount-1)%100 == 0 {
logger.Infow(
ulgr().Infow(
"sending sender report, clock skew",
"timeSinceLastReport", timeSinceLastReport,
"rtpDiffSinceLastReport", rtpDiffSinceLastReport,
@@ -768,7 +773,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek
if r.srNewest != nil && nowRTPExt < r.srNewest.RtpTimestampExt {
// If report being generated is behind the last report, skip it.
// Should not happen.
logger.Infow("sending sender report, out-of-order, skipping")
ulgr().Infow("sending sender report, out-of-order, skipping")
return nil
}