From c1ad2b22e67fe3e5248cd5b77e6919a73428f67e Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 28 Apr 2026 20:51:04 +0530 Subject: [PATCH] Misc optimisations. (#4490) - prevent some escape to heap - avoid copying by using a ring buffer for receiver reports (probably should remove this as this is for debugging only and data so far has shown clients sending bad data and nothing more.) --- pkg/sfu/rtpstats/rtpstats_base.go | 10 ++--- pkg/sfu/rtpstats/rtpstats_base_lite.go | 10 ++--- pkg/sfu/rtpstats/rtpstats_receiver.go | 4 +- pkg/sfu/rtpstats/rtpstats_sender.go | 61 +++++++++++++++----------- 4 files changed, 47 insertions(+), 38 deletions(-) diff --git a/pkg/sfu/rtpstats/rtpstats_base.go b/pkg/sfu/rtpstats/rtpstats_base.go index 1dfcef3d6..b14a92b8b 100644 --- a/pkg/sfu/rtpstats/rtpstats_base.go +++ b/pkg/sfu/rtpstats/rtpstats_base.go @@ -481,8 +481,8 @@ func (r *rtpStatsBase) deltaInfo( return } - then, now := r.getAndResetSnapshot(snapshotID, extStartSN, extHighestSN) - if now == nil || then == nil { + then, now, ok := r.getAndResetSnapshot(snapshotID, extStartSN, extHighestSN) + if !ok { return } @@ -694,9 +694,9 @@ func (r *rtpStatsBase) updateJitter(ets uint64, packetTime int64) float64 { return r.jitter } -func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64, extHighestSN uint64) (*snapshot, *snapshot) { +func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64, extHighestSN uint64) (snapshot, snapshot, bool) { if !r.initialized || snapshotID < cFirstSnapshotID { - return nil, nil + return snapshot{}, snapshot{}, false } idx := snapshotID - cFirstSnapshotID @@ -709,7 +709,7 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64, // snapshot now now := r.getSnapshot(mono.UnixNano(), extHighestSN+1) r.snapshots[idx] = now - return &then, &now + return then, now, true } func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) ( diff --git a/pkg/sfu/rtpstats/rtpstats_base_lite.go b/pkg/sfu/rtpstats/rtpstats_base_lite.go index c88b45c79..9180dfd48 100644 --- a/pkg/sfu/rtpstats/rtpstats_base_lite.go +++ b/pkg/sfu/rtpstats/rtpstats_base_lite.go @@ -320,8 +320,8 @@ func (r *rtpStatsBaseLite) deltaInfoLite( extStartSN uint64, extHighestSN uint64, ) (deltaInfoLite *RTPDeltaInfoLite, loggingFields []any, err error) { - then, now := r.getAndResetSnapshotLite(snapshotLiteID, extStartSN, extHighestSN) - if now == nil || then == nil { + then, now, ok := r.getAndResetSnapshotLite(snapshotLiteID, extStartSN, extHighestSN) + if !ok { return } @@ -493,9 +493,9 @@ func (r *rtpStatsBaseLite) toProto(packetsExpected, packetsSeenMinusPadding, pac return p } -func (r *rtpStatsBaseLite) getAndResetSnapshotLite(snapshotLiteID uint32, extStartSN uint64, extHighestSN uint64) (*snapshotLite, *snapshotLite) { +func (r *rtpStatsBaseLite) getAndResetSnapshotLite(snapshotLiteID uint32, extStartSN uint64, extHighestSN uint64) (snapshotLite, snapshotLite, bool) { if !r.initialized { - return nil, nil + return snapshotLite{}, snapshotLite{}, false } idx := snapshotLiteID - cFirstSnapshotID @@ -508,7 +508,7 @@ func (r *rtpStatsBaseLite) getAndResetSnapshotLite(snapshotLiteID uint32, extSta // snapshot now now := r.getSnapshotLite(mono.UnixNano(), extHighestSN+1) r.snapshotLites[idx] = now - return &then, &now + return then, now, true } func (r *rtpStatsBaseLite) updateGapHistogram(gap int) { diff --git a/pkg/sfu/rtpstats/rtpstats_receiver.go b/pkg/sfu/rtpstats/rtpstats_receiver.go index fdc7c0d7c..1fc1fc8ed 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver.go @@ -835,8 +835,8 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin defer r.lock.Unlock() extHighestSN := r.sequenceNumber.GetExtendedHighest() - then, now := r.getAndResetSnapshot(snapshotID, r.sequenceNumber.GetExtendedStart(), extHighestSN) - if now == nil || then == nil { + then, now, ok := r.getAndResetSnapshot(snapshotID, r.sequenceNumber.GetExtendedStart(), extHighestSN) + if !ok { return nil } diff --git a/pkg/sfu/rtpstats/rtpstats_sender.go b/pkg/sfu/rtpstats/rtpstats_sender.go index 1c155f6d3..99e4dfd63 100644 --- a/pkg/sfu/rtpstats/rtpstats_sender.go +++ b/pkg/sfu/rtpstats/rtpstats_sender.go @@ -106,7 +106,8 @@ type wrappedReceptionReportsLogger struct { } func (w wrappedReceptionReportsLogger) MarshalLogObject(e zapcore.ObjectEncoder) error { - for i, rr := range w.senderSnapshotReceiverView.processedReceptionReports { + for i := 0; i < w.senderSnapshotReceiverView.processedReceptionReportsSize; i++ { + rr := w.senderSnapshotReceiverView.processedReceptionReports[(w.senderSnapshotReceiverView.processedReceptionReportsHead+i)%cMaxProcessedReceptionReports] e.AddReflected(fmt.Sprintf("%d", i), rr) } @@ -195,9 +196,13 @@ type senderSnapshotReceiverView struct { maxRtt uint32 maxJitter float64 - extLastRRSN uint64 - intervalStats intervalStats - processedReceptionReports []rtcp.ReceptionReport + extLastRRSN uint64 + intervalStats intervalStats + + processedReceptionReports [cMaxProcessedReceptionReports]rtcp.ReceptionReport + processedReceptionReportsHead int + processedReceptionReportsSize int + metadataCacheOverflowCount int } @@ -861,12 +866,15 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt } } s.receiverView.extLastRRSN = extReceivedRRSN - reports := &s.receiverView.processedReceptionReports - if len(*reports) >= cMaxProcessedReceptionReports { - copy(*reports, (*reports)[1:]) - (*reports)[len(*reports)-1] = rr - } else { - *reports = append(*reports, rr) + + s.receiverView.processedReceptionReports[s.receiverView.processedReceptionReportsHead] = rr + s.receiverView.processedReceptionReportsHead++ + if s.receiverView.processedReceptionReportsHead >= cMaxProcessedReceptionReports { + s.receiverView.processedReceptionReportsHead = 0 + } + s.receiverView.processedReceptionReportsSize++ + if s.receiverView.processedReceptionReportsSize > cMaxProcessedReceptionReports { + s.receiverView.processedReceptionReportsSize = cMaxProcessedReceptionReports } } @@ -1045,8 +1053,8 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) (*RTPDeltaInfo } var deltaStatsSenderView *RTPDeltaInfo - thenSenderView, nowSenderView := r.getAndResetSenderSnapshotWindow(senderSnapshotID) - if thenSenderView != nil && nowSenderView != nil { + thenSenderView, nowSenderView, ok := r.getAndResetSenderSnapshotWindow(senderSnapshotID) + if !ok { startTime := thenSenderView.startTime endTime := nowSenderView.startTime @@ -1108,8 +1116,8 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) (*RTPDeltaInfo var deltaStatsReceiverView *RTPDeltaInfo if r.lastRRTime != 0 { - thenReceiverView, nowReceiverView := r.getAndResetSenderSnapshotReceiverView(senderSnapshotID) - if thenReceiverView != nil && nowReceiverView != nil { + thenReceiverView, nowReceiverView, ok := r.getAndResetSenderSnapshotReceiverView(senderSnapshotID) + if !ok { startTime := thenReceiverView.startTime endTime := nowReceiverView.startTime @@ -1213,9 +1221,9 @@ func (r *RTPStatsSender) ToProto() *livekit.RTPStats { return p } -func (r *RTPStatsSender) getAndResetSenderSnapshotWindow(senderSnapshotID uint32) (*senderSnapshotWindow, *senderSnapshotWindow) { +func (r *RTPStatsSender) getAndResetSenderSnapshotWindow(senderSnapshotID uint32) (senderSnapshotWindow, senderSnapshotWindow, bool) { if !r.initialized || senderSnapshotID < cFirstSnapshotID { - return nil, nil + return senderSnapshotWindow{}, senderSnapshotWindow{}, false } idx := senderSnapshotID - cFirstSnapshotID @@ -1227,7 +1235,7 @@ func (r *RTPStatsSender) getAndResetSenderSnapshotWindow(senderSnapshotID uint32 // snapshot now r.senderSnapshots[idx].senderView = r.getSenderSnapshotWindow(mono.UnixNano()) - return &then.senderView, &r.senderSnapshots[idx].senderView + return then.senderView, r.senderSnapshots[idx].senderView, true } func (r *RTPStatsSender) getSenderSnapshotWindow(startTime int64) senderSnapshotWindow { @@ -1254,9 +1262,9 @@ func (r *RTPStatsSender) getSenderSnapshotWindow(startTime int64) senderSnapshot } } -func (r *RTPStatsSender) getAndResetSenderSnapshotReceiverView(senderSnapshotID uint32) (*senderSnapshotReceiverView, *senderSnapshotReceiverView) { +func (r *RTPStatsSender) getAndResetSenderSnapshotReceiverView(senderSnapshotID uint32) (senderSnapshotReceiverView, senderSnapshotReceiverView, bool) { if !r.initialized || r.lastRRTime == 0 || senderSnapshotID < cFirstSnapshotID { - return nil, nil + return senderSnapshotReceiverView{}, senderSnapshotReceiverView{}, false } idx := senderSnapshotID - cFirstSnapshotID @@ -1268,7 +1276,7 @@ func (r *RTPStatsSender) getAndResetSenderSnapshotReceiverView(senderSnapshotID // snapshot now r.senderSnapshots[idx].receiverView = r.getSenderSnapshotReceiverView(r.lastRRTime, &then.receiverView) - return &then.receiverView, &r.senderSnapshots[idx].receiverView + return then.receiverView, r.senderSnapshots[idx].receiverView, true } func (r *RTPStatsSender) getSenderSnapshotReceiverView(startTime int64, s *senderSnapshotReceiverView) senderSnapshotReceiverView { @@ -1347,11 +1355,13 @@ func (r *RTPStatsSender) clearSnInfos(extStartInclusive uint64, extEndExclusive return } + if extEndExclusive-extStartInclusive > uint64(len(r.snInfos)) { + clear(r.snInfos) + return + } + for esn := extStartInclusive; esn != extEndExclusive; esn++ { - snInfo := &r.snInfos[int(esn)%len(r.snInfos)] - snInfo.pktSize = 0 - snInfo.hdrSize = 0 - snInfo.flags = 0 + r.snInfos[int(esn)%len(r.snInfos)] = snInfo{} } } @@ -1380,8 +1390,7 @@ func (r *RTPStatsSender) getIntervalStats( intervalStats.packetsNotFoundMetadata = (extEndExclusive - extStartInclusive) - (extEndExclusiveClamped - extStartInclusiveClamped) for esn := extStartInclusiveClamped; esn != extEndExclusiveClamped; esn++ { - slot := r.getSnInfoOutOfOrderSlot(esn, ehsn) - snInfo := &r.snInfos[slot] + snInfo := &r.snInfos[int(esn)%len(r.snInfos)] switch { case snInfo.pktSize == 0: intervalStats.packetsLostFeed++