From c3964ba2ebfd93972be72c43a45f6337969d7155 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 14 Nov 2025 16:13:23 +0530 Subject: [PATCH] Use sync.Pool for objects in packet path. (#4066) * Use sync.Pool for objects in packet path. Seeing cases of forwarding latency spikes that aling with GC. This might be a bit overkill, but using sync.Pool for small + short-lived objects in packet path. Before this, all these were increasing in alloc_space heap profile samples over time. With these, there is no increase (actually the lines corresponding to geting from pool does not even show up in heap accounting when doing `list` in `pprof`) * merge * Paul feedback --- pkg/sfu/buffer/buffer.go | 22 +++++++- pkg/sfu/buffer/buffer_test.go | 9 ++++ pkg/sfu/buffer/dependencydescriptorparser.go | 23 +++++++- pkg/sfu/downtrack.go | 56 +++++++++++++------- pkg/sfu/pacer/base.go | 15 +++++- pkg/sfu/pacer/pacer.go | 11 ++++ pkg/sfu/receiver.go | 2 + pkg/sfu/sfu.go | 17 +++--- 8 files changed, 125 insertions(+), 30 deletions(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 929ddabe9..02874b179 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -46,6 +46,16 @@ import ( "github.com/livekit/protocol/utils/mono" ) +var ( + ExtPacketFactory = &sync.Pool{ + New: func() any { + return &ExtPacket{} + }, + } +) + +// -------------------------------------- + const ( ReportDelta = 1e9 @@ -543,6 +553,15 @@ func (b *Buffer) ReadExtended(buf []byte) (*ExtPacket, error) { } } +func (b *Buffer) ReleaseExtPacket(extPkt *ExtPacket) { + if b.ddParser != nil { + b.ddParser.ReleaseExtDependencyDescriptor(extPkt.DependencyDescriptor) + } + + *extPkt = ExtPacket{} + ExtPacketFactory.Put(extPkt) +} + func (b *Buffer) Close() error { b.closeOnce.Do(func() { b.closed.Store(true) @@ -903,7 +922,8 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64, isRTX } func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime int64, isBuffered bool, flowState rtpstats.RTPFlowState) *ExtPacket { - ep := &ExtPacket{ + ep := ExtPacketFactory.Get().(*ExtPacket) + *ep = ExtPacket{ Arrival: arrivalTime, ExtSequenceNumber: flowState.ExtSequenceNumber, ExtTimestamp: flowState.ExtTimestamp, diff --git a/pkg/sfu/buffer/buffer_test.go b/pkg/sfu/buffer/buffer_test.go index 55aaebb7d..15036aa63 100644 --- a/pkg/sfu/buffer/buffer_test.go +++ b/pkg/sfu/buffer/buffer_test.go @@ -433,3 +433,12 @@ func BenchmarkMemcpu(b *testing.B) { copy(buf2, buf) } } + +func BenchmarkExtPacketFactory(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + extPkt := ExtPacketFactory.Get().(*ExtPacket) + *extPkt = ExtPacket{} + ExtPacketFactory.Put(extPkt) + } +} diff --git a/pkg/sfu/buffer/dependencydescriptorparser.go b/pkg/sfu/buffer/dependencydescriptorparser.go index 9926517cc..8641a4047 100644 --- a/pkg/sfu/buffer/dependencydescriptorparser.go +++ b/pkg/sfu/buffer/dependencydescriptorparser.go @@ -17,6 +17,7 @@ package buffer import ( "fmt" "sort" + "sync" "time" "github.com/pion/rtp" @@ -28,6 +29,16 @@ import ( "github.com/livekit/protocol/logger" ) +var ( + ExtDependencyDescriptorFactory = &sync.Pool{ + New: func() any { + return &ExtDependencyDescriptor{} + }, + } +) + +// -------------------------------------- + const ( ddRestartThreshold = 30 * time.Second @@ -156,7 +167,8 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr r.frameChecker.AddPacket(extSeq, extFN, &ddVal) - extDD := &ExtDependencyDescriptor{ + extDD := ExtDependencyDescriptorFactory.Get().(*ExtDependencyDescriptor) + *extDD = ExtDependencyDescriptor{ Descriptor: &ddVal, ExtFrameNum: extFN, Integrity: r.frameChecker.FrameIntegrity(extFN), @@ -235,6 +247,15 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr return extDD, videoLayer, nil } +func (r *DependencyDescriptorParser) ReleaseExtDependencyDescriptor(extDD *ExtDependencyDescriptor) { + if extDD == nil { + return + } + + *extDD = ExtDependencyDescriptor{} + ExtDependencyDescriptorFactory.Put(extDD) +} + func (r *DependencyDescriptorParser) restart() { r.frameChecker = NewFrameIntegrityChecker(integrityCheckFrame, integrityCheckPkt) r.structure = nil diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 4cd0042af..d93815db9 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1013,17 +1013,16 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 { payload = payload[:len(tp.codecBytes)+n] // translate RTP header - hdr := &rtp.Header{ + hdr := RTPHeaderFactory.Get().(*rtp.Header) + *hdr = rtp.Header{ Version: extPkt.Packet.Version, Padding: extPkt.Packet.Padding, + Marker: tp.marker, PayloadType: d.getTranslatedPayloadType(extPkt.Packet.PayloadType), SequenceNumber: uint16(tp.rtp.extSequenceNumber), Timestamp: uint32(tp.rtp.extTimestamp), SSRC: d.ssrc, } - if tp.marker { - hdr.Marker = tp.marker - } // add extensions if d.dependencyDescriptorExtID != 0 && tp.ddBytes != nil { @@ -1094,8 +1093,10 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 { 0, extPkt.IsOutOfOrder, ) - d.pacer.Enqueue(&pacer.Packet{ + pacerPacket := pacer.PacketFactory.Get().(*pacer.Packet) + *pacerPacket = pacer.Packet{ Header: hdr, + HeaderPool: RTPHeaderFactory, HeaderSize: headerSize, Payload: payload, ProbeClusterId: ccutils.ProbeClusterId(d.probeClusterId.Load()), @@ -1104,7 +1105,8 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 { WriteStream: d.writeStream, Pool: PacketFactory, PoolEntity: poolEntity, - }) + } + d.pacer.Enqueue(pacerPacket) if extPkt.KeyFrame { d.isNACKThrottled.Store(false) @@ -1197,7 +1199,8 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa bytesSent := 0 payloads := make([]byte, RTPPaddingMaxPayloadSize*len(snts)) for i := 0; i < len(snts); i++ { - hdr := &rtp.Header{ + hdr := RTPHeaderFactory.Get().(*rtp.Header) + *hdr = rtp.Header{ Version: 2, Padding: true, Marker: false, @@ -1225,8 +1228,10 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa false, ) - d.pacer.Enqueue(&pacer.Packet{ + pacerPacket := pacer.PacketFactory.Get().(*pacer.Packet) + *pacerPacket = pacer.Packet{ Header: hdr, + HeaderPool: RTPHeaderFactory, HeaderSize: hdrSize, Payload: payload, ProbeClusterId: ccutils.ProbeClusterId(d.probeClusterId.Load()), @@ -1234,7 +1239,8 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), WriteStream: d.writeStream, - }) + } + d.pacer.Enqueue(pacerPacket) bytesSent += hdrSize + payloadSize } @@ -1748,7 +1754,8 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan 0, false, ) - d.pacer.Enqueue(&pacer.Packet{ + pacerPacket := pacer.PacketFactory.Get().(*pacer.Packet) + *pacerPacket = pacer.Packet{ Header: hdr, HeaderSize: headerSize, Payload: payload, @@ -1756,7 +1763,8 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), WriteStream: d.writeStream, - }) + } + d.pacer.Enqueue(pacerPacket) // only the first frame will need frameEndNeeded to close out the // previous picture, rest are small key frames (for the video case) @@ -2045,7 +2053,8 @@ func (d *DownTrack) retransmitPacket(epm *extPacketMeta, sourcePkt []byte, isPro d.params.Logger.Errorw("could not unmarshal rtp packet to send via RTX", err) return 0, err } - hdr := &rtp.Header{ + hdr := RTPHeaderFactory.Get().(*rtp.Header) + *hdr = rtp.Header{ Version: pkt.Header.Version, Padding: pkt.Header.Padding, Marker: epm.marker, @@ -2132,8 +2141,10 @@ func (d *DownTrack) retransmitPacket(epm *extPacketMeta, sourcePkt []byte, isPro isOutOfOrder, ) } - d.pacer.Enqueue(&pacer.Packet{ + pacerPacket := pacer.PacketFactory.Get().(*pacer.Packet) + *pacerPacket = pacer.Packet{ Header: hdr, + HeaderPool: RTPHeaderFactory, HeaderSize: headerSize, Payload: payload, ProbeClusterId: ccutils.ProbeClusterId(d.probeClusterId.Load()), @@ -2144,7 +2155,8 @@ func (d *DownTrack) retransmitPacket(epm *extPacketMeta, sourcePkt []byte, isPro WriteStream: d.writeStream, Pool: PacketFactory, PoolEntity: poolEntity, - }) + } + d.pacer.Enqueue(pacerPacket) return headerSize + len(payload), nil } @@ -2224,7 +2236,8 @@ func (d *DownTrack) WriteProbePackets(bytesToSend int, usePadding bool) int { payloads := make([]byte, RTPPaddingMaxPayloadSize*num) for i := 0; i < num; i++ { rtxExtSequenceNumber := d.rtxSequenceNumber.Inc() - hdr := &rtp.Header{ + hdr := RTPHeaderFactory.Get().(*rtp.Header) + *hdr = rtp.Header{ Version: 2, Padding: true, Marker: false, @@ -2251,8 +2264,10 @@ func (d *DownTrack) WriteProbePackets(bytesToSend int, usePadding bool) int { payloadSize, false, ) - d.pacer.Enqueue(&pacer.Packet{ + pacerPacket := pacer.PacketFactory.Get().(*pacer.Packet) + *pacerPacket = pacer.Packet{ Header: hdr, + HeaderPool: RTPHeaderFactory, HeaderSize: hdrSize, Payload: payload, ProbeClusterId: ccutils.ProbeClusterId(d.probeClusterId.Load()), @@ -2260,7 +2275,8 @@ func (d *DownTrack) WriteProbePackets(bytesToSend int, usePadding bool) int { AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), WriteStream: d.writeStream, - }) + } + d.pacer.Enqueue(pacerPacket) bytesSent += hdrSize + payloadSize } @@ -2481,7 +2497,8 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { len(payload), // although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only false, ) - d.pacer.Enqueue(&pacer.Packet{ + pacerPacket := pacer.PacketFactory.Get().(*pacer.Packet) + *pacerPacket = pacer.Packet{ Header: hdr, HeaderSize: headerSize, Payload: payload, @@ -2489,7 +2506,8 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), WriteStream: d.writeStream, - }) + } + d.pacer.Enqueue(pacerPacket) } numFrames-- diff --git a/pkg/sfu/pacer/base.go b/pkg/sfu/pacer/base.go index b7097786f..96e3be250 100644 --- a/pkg/sfu/pacer/base.go +++ b/pkg/sfu/pacer/base.go @@ -20,6 +20,7 @@ import ( "time" "github.com/livekit/livekit-server/pkg/sfu/bwe" + "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils/mono" "github.com/pion/rtp" @@ -56,9 +57,17 @@ func (b *Base) TimeSinceLastSentPacket() time.Duration { func (b *Base) SendPacket(p *Packet) (int, error) { defer func() { + if p.HeaderPool != nil && p.Header != nil { + *p.Header = rtp.Header{} + p.HeaderPool.Put(p.Header) + } + if p.Pool != nil && p.PoolEntity != nil { p.Pool.Put(p.PoolEntity) } + + *p = Packet{} + PacketFactory.Put(p) }() err := b.patchRTPHeaderExtensions(p) @@ -83,8 +92,10 @@ func (b *Base) SendPacket(p *Packet) (int, error) { func (b *Base) patchRTPHeaderExtensions(p *Packet) error { sendingAt := mono.Now() if p.AbsSendTimeExtID != 0 { - absSendTime := rtp.NewAbsSendTimeExtension(sendingAt) - absSendTimeBytes, err := absSendTime.Marshal() + absSendTimeExt := rtp.AbsSendTimeExtension{ + Timestamp: uint64(mediatransportutil.ToNtpTime(sendingAt) >> 14), + } + absSendTimeBytes, err := absSendTimeExt.Marshal() if err != nil { return err } diff --git a/pkg/sfu/pacer/pacer.go b/pkg/sfu/pacer/pacer.go index 8532fe26e..f23212519 100644 --- a/pkg/sfu/pacer/pacer.go +++ b/pkg/sfu/pacer/pacer.go @@ -23,6 +23,16 @@ import ( "github.com/pion/webrtc/v4" ) +var ( + PacketFactory = &sync.Pool{ + New: func() any { + return &Packet{} + }, + } +) + +// -------------------------------------- + type PacerBehavior string const ( @@ -33,6 +43,7 @@ const ( type Packet struct { Header *rtp.Header + HeaderPool *sync.Pool HeaderSize int Payload []byte IsRTX bool diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 0b46f299f..7d6cb2085 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -867,6 +867,8 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) { } numPacketsForwarded++ + + buff.ReleaseExtPacket(pkt) } } diff --git a/pkg/sfu/sfu.go b/pkg/sfu/sfu.go index 648c1e563..c8760d069 100644 --- a/pkg/sfu/sfu.go +++ b/pkg/sfu/sfu.go @@ -16,18 +16,21 @@ package sfu import ( "sync" + + "github.com/pion/rtp" ) var ( - PacketFactory *sync.Pool -) - -func init() { - // Init packet factory PacketFactory = &sync.Pool{ - New: func() interface{} { + New: func() any { b := make([]byte, 1460) return &b }, } -} + + RTPHeaderFactory = &sync.Pool{ + New: func() any { + return &rtp.Header{} + }, + } +)