Misc optimisations. (#4490)

- prevent some escape to heap
- avoid copying by using a ring buffer for receiver reports (probably
  should remove this as this is for debugging only and data so far has
  shown clients sending bad data and nothing more.)
This commit is contained in:
Raja Subramanian
2026-04-28 20:51:04 +05:30
committed by GitHub
parent 19b9e8c00a
commit c1ad2b22e6
4 changed files with 47 additions and 38 deletions
+5 -5
View File
@@ -481,8 +481,8 @@ func (r *rtpStatsBase) deltaInfo(
return
}
then, now := r.getAndResetSnapshot(snapshotID, extStartSN, extHighestSN)
if now == nil || then == nil {
then, now, ok := r.getAndResetSnapshot(snapshotID, extStartSN, extHighestSN)
if !ok {
return
}
@@ -694,9 +694,9 @@ func (r *rtpStatsBase) updateJitter(ets uint64, packetTime int64) float64 {
return r.jitter
}
func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64, extHighestSN uint64) (*snapshot, *snapshot) {
func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64, extHighestSN uint64) (snapshot, snapshot, bool) {
if !r.initialized || snapshotID < cFirstSnapshotID {
return nil, nil
return snapshot{}, snapshot{}, false
}
idx := snapshotID - cFirstSnapshotID
@@ -709,7 +709,7 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64,
// snapshot now
now := r.getSnapshot(mono.UnixNano(), extHighestSN+1)
r.snapshots[idx] = now
return &then, &now
return then, now, true
}
func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (
+5 -5
View File
@@ -320,8 +320,8 @@ func (r *rtpStatsBaseLite) deltaInfoLite(
extStartSN uint64,
extHighestSN uint64,
) (deltaInfoLite *RTPDeltaInfoLite, loggingFields []any, err error) {
then, now := r.getAndResetSnapshotLite(snapshotLiteID, extStartSN, extHighestSN)
if now == nil || then == nil {
then, now, ok := r.getAndResetSnapshotLite(snapshotLiteID, extStartSN, extHighestSN)
if !ok {
return
}
@@ -493,9 +493,9 @@ func (r *rtpStatsBaseLite) toProto(packetsExpected, packetsSeenMinusPadding, pac
return p
}
func (r *rtpStatsBaseLite) getAndResetSnapshotLite(snapshotLiteID uint32, extStartSN uint64, extHighestSN uint64) (*snapshotLite, *snapshotLite) {
func (r *rtpStatsBaseLite) getAndResetSnapshotLite(snapshotLiteID uint32, extStartSN uint64, extHighestSN uint64) (snapshotLite, snapshotLite, bool) {
if !r.initialized {
return nil, nil
return snapshotLite{}, snapshotLite{}, false
}
idx := snapshotLiteID - cFirstSnapshotID
@@ -508,7 +508,7 @@ func (r *rtpStatsBaseLite) getAndResetSnapshotLite(snapshotLiteID uint32, extSta
// snapshot now
now := r.getSnapshotLite(mono.UnixNano(), extHighestSN+1)
r.snapshotLites[idx] = now
return &then, &now
return then, now, true
}
func (r *rtpStatsBaseLite) updateGapHistogram(gap int) {
+2 -2
View File
@@ -835,8 +835,8 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin
defer r.lock.Unlock()
extHighestSN := r.sequenceNumber.GetExtendedHighest()
then, now := r.getAndResetSnapshot(snapshotID, r.sequenceNumber.GetExtendedStart(), extHighestSN)
if now == nil || then == nil {
then, now, ok := r.getAndResetSnapshot(snapshotID, r.sequenceNumber.GetExtendedStart(), extHighestSN)
if !ok {
return nil
}
+35 -26
View File
@@ -106,7 +106,8 @@ type wrappedReceptionReportsLogger struct {
}
func (w wrappedReceptionReportsLogger) MarshalLogObject(e zapcore.ObjectEncoder) error {
for i, rr := range w.senderSnapshotReceiverView.processedReceptionReports {
for i := 0; i < w.senderSnapshotReceiverView.processedReceptionReportsSize; i++ {
rr := w.senderSnapshotReceiverView.processedReceptionReports[(w.senderSnapshotReceiverView.processedReceptionReportsHead+i)%cMaxProcessedReceptionReports]
e.AddReflected(fmt.Sprintf("%d", i), rr)
}
@@ -195,9 +196,13 @@ type senderSnapshotReceiverView struct {
maxRtt uint32
maxJitter float64
extLastRRSN uint64
intervalStats intervalStats
processedReceptionReports []rtcp.ReceptionReport
extLastRRSN uint64
intervalStats intervalStats
processedReceptionReports [cMaxProcessedReceptionReports]rtcp.ReceptionReport
processedReceptionReportsHead int
processedReceptionReportsSize int
metadataCacheOverflowCount int
}
@@ -861,12 +866,15 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
}
}
s.receiverView.extLastRRSN = extReceivedRRSN
reports := &s.receiverView.processedReceptionReports
if len(*reports) >= cMaxProcessedReceptionReports {
copy(*reports, (*reports)[1:])
(*reports)[len(*reports)-1] = rr
} else {
*reports = append(*reports, rr)
s.receiverView.processedReceptionReports[s.receiverView.processedReceptionReportsHead] = rr
s.receiverView.processedReceptionReportsHead++
if s.receiverView.processedReceptionReportsHead >= cMaxProcessedReceptionReports {
s.receiverView.processedReceptionReportsHead = 0
}
s.receiverView.processedReceptionReportsSize++
if s.receiverView.processedReceptionReportsSize > cMaxProcessedReceptionReports {
s.receiverView.processedReceptionReportsSize = cMaxProcessedReceptionReports
}
}
@@ -1045,8 +1053,8 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) (*RTPDeltaInfo
}
var deltaStatsSenderView *RTPDeltaInfo
thenSenderView, nowSenderView := r.getAndResetSenderSnapshotWindow(senderSnapshotID)
if thenSenderView != nil && nowSenderView != nil {
thenSenderView, nowSenderView, ok := r.getAndResetSenderSnapshotWindow(senderSnapshotID)
if !ok {
startTime := thenSenderView.startTime
endTime := nowSenderView.startTime
@@ -1108,8 +1116,8 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) (*RTPDeltaInfo
var deltaStatsReceiverView *RTPDeltaInfo
if r.lastRRTime != 0 {
thenReceiverView, nowReceiverView := r.getAndResetSenderSnapshotReceiverView(senderSnapshotID)
if thenReceiverView != nil && nowReceiverView != nil {
thenReceiverView, nowReceiverView, ok := r.getAndResetSenderSnapshotReceiverView(senderSnapshotID)
if !ok {
startTime := thenReceiverView.startTime
endTime := nowReceiverView.startTime
@@ -1213,9 +1221,9 @@ func (r *RTPStatsSender) ToProto() *livekit.RTPStats {
return p
}
func (r *RTPStatsSender) getAndResetSenderSnapshotWindow(senderSnapshotID uint32) (*senderSnapshotWindow, *senderSnapshotWindow) {
func (r *RTPStatsSender) getAndResetSenderSnapshotWindow(senderSnapshotID uint32) (senderSnapshotWindow, senderSnapshotWindow, bool) {
if !r.initialized || senderSnapshotID < cFirstSnapshotID {
return nil, nil
return senderSnapshotWindow{}, senderSnapshotWindow{}, false
}
idx := senderSnapshotID - cFirstSnapshotID
@@ -1227,7 +1235,7 @@ func (r *RTPStatsSender) getAndResetSenderSnapshotWindow(senderSnapshotID uint32
// snapshot now
r.senderSnapshots[idx].senderView = r.getSenderSnapshotWindow(mono.UnixNano())
return &then.senderView, &r.senderSnapshots[idx].senderView
return then.senderView, r.senderSnapshots[idx].senderView, true
}
func (r *RTPStatsSender) getSenderSnapshotWindow(startTime int64) senderSnapshotWindow {
@@ -1254,9 +1262,9 @@ func (r *RTPStatsSender) getSenderSnapshotWindow(startTime int64) senderSnapshot
}
}
func (r *RTPStatsSender) getAndResetSenderSnapshotReceiverView(senderSnapshotID uint32) (*senderSnapshotReceiverView, *senderSnapshotReceiverView) {
func (r *RTPStatsSender) getAndResetSenderSnapshotReceiverView(senderSnapshotID uint32) (senderSnapshotReceiverView, senderSnapshotReceiverView, bool) {
if !r.initialized || r.lastRRTime == 0 || senderSnapshotID < cFirstSnapshotID {
return nil, nil
return senderSnapshotReceiverView{}, senderSnapshotReceiverView{}, false
}
idx := senderSnapshotID - cFirstSnapshotID
@@ -1268,7 +1276,7 @@ func (r *RTPStatsSender) getAndResetSenderSnapshotReceiverView(senderSnapshotID
// snapshot now
r.senderSnapshots[idx].receiverView = r.getSenderSnapshotReceiverView(r.lastRRTime, &then.receiverView)
return &then.receiverView, &r.senderSnapshots[idx].receiverView
return then.receiverView, r.senderSnapshots[idx].receiverView, true
}
func (r *RTPStatsSender) getSenderSnapshotReceiverView(startTime int64, s *senderSnapshotReceiverView) senderSnapshotReceiverView {
@@ -1347,11 +1355,13 @@ func (r *RTPStatsSender) clearSnInfos(extStartInclusive uint64, extEndExclusive
return
}
if extEndExclusive-extStartInclusive > uint64(len(r.snInfos)) {
clear(r.snInfos)
return
}
for esn := extStartInclusive; esn != extEndExclusive; esn++ {
snInfo := &r.snInfos[int(esn)%len(r.snInfos)]
snInfo.pktSize = 0
snInfo.hdrSize = 0
snInfo.flags = 0
r.snInfos[int(esn)%len(r.snInfos)] = snInfo{}
}
}
@@ -1380,8 +1390,7 @@ func (r *RTPStatsSender) getIntervalStats(
intervalStats.packetsNotFoundMetadata = (extEndExclusive - extStartInclusive) - (extEndExclusiveClamped - extStartInclusiveClamped)
for esn := extStartInclusiveClamped; esn != extEndExclusiveClamped; esn++ {
slot := r.getSnInfoOutOfOrderSlot(esn, ehsn)
snInfo := &r.snInfos[slot]
snInfo := &r.snInfos[int(esn)%len(r.snInfos)]
switch {
case snInfo.pktSize == 0:
intervalStats.packetsLostFeed++