Use sync.Pool for objects in packet path. (#4066)

* Use sync.Pool for objects in packet path.

Seeing cases of forwarding latency spikes that aling with GC.

This might be a bit overkill, but using sync.Pool for small +
short-lived objects in packet path.

Before this, all these were increasing in alloc_space heap profile
samples over time. With these, there is no increase (actually the lines
corresponding to geting from pool does not even show up in heap
accounting when doing `list` in `pprof`)

* merge

* Paul feedback
This commit is contained in:
Raja Subramanian
2025-11-14 16:13:23 +05:30
committed by GitHub
parent f8b994d491
commit c3964ba2eb
8 changed files with 125 additions and 30 deletions
+21 -1
View File
@@ -46,6 +46,16 @@ import (
"github.com/livekit/protocol/utils/mono"
)
var (
ExtPacketFactory = &sync.Pool{
New: func() any {
return &ExtPacket{}
},
}
)
// --------------------------------------
const (
ReportDelta = 1e9
@@ -543,6 +553,15 @@ func (b *Buffer) ReadExtended(buf []byte) (*ExtPacket, error) {
}
}
func (b *Buffer) ReleaseExtPacket(extPkt *ExtPacket) {
if b.ddParser != nil {
b.ddParser.ReleaseExtDependencyDescriptor(extPkt.DependencyDescriptor)
}
*extPkt = ExtPacket{}
ExtPacketFactory.Put(extPkt)
}
func (b *Buffer) Close() error {
b.closeOnce.Do(func() {
b.closed.Store(true)
@@ -903,7 +922,8 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64, isRTX
}
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime int64, isBuffered bool, flowState rtpstats.RTPFlowState) *ExtPacket {
ep := &ExtPacket{
ep := ExtPacketFactory.Get().(*ExtPacket)
*ep = ExtPacket{
Arrival: arrivalTime,
ExtSequenceNumber: flowState.ExtSequenceNumber,
ExtTimestamp: flowState.ExtTimestamp,
+9
View File
@@ -433,3 +433,12 @@ func BenchmarkMemcpu(b *testing.B) {
copy(buf2, buf)
}
}
func BenchmarkExtPacketFactory(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
extPkt := ExtPacketFactory.Get().(*ExtPacket)
*extPkt = ExtPacket{}
ExtPacketFactory.Put(extPkt)
}
}
+22 -1
View File
@@ -17,6 +17,7 @@ package buffer
import (
"fmt"
"sort"
"sync"
"time"
"github.com/pion/rtp"
@@ -28,6 +29,16 @@ import (
"github.com/livekit/protocol/logger"
)
var (
ExtDependencyDescriptorFactory = &sync.Pool{
New: func() any {
return &ExtDependencyDescriptor{}
},
}
)
// --------------------------------------
const (
ddRestartThreshold = 30 * time.Second
@@ -156,7 +167,8 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr
r.frameChecker.AddPacket(extSeq, extFN, &ddVal)
extDD := &ExtDependencyDescriptor{
extDD := ExtDependencyDescriptorFactory.Get().(*ExtDependencyDescriptor)
*extDD = ExtDependencyDescriptor{
Descriptor: &ddVal,
ExtFrameNum: extFN,
Integrity: r.frameChecker.FrameIntegrity(extFN),
@@ -235,6 +247,15 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr
return extDD, videoLayer, nil
}
func (r *DependencyDescriptorParser) ReleaseExtDependencyDescriptor(extDD *ExtDependencyDescriptor) {
if extDD == nil {
return
}
*extDD = ExtDependencyDescriptor{}
ExtDependencyDescriptorFactory.Put(extDD)
}
func (r *DependencyDescriptorParser) restart() {
r.frameChecker = NewFrameIntegrityChecker(integrityCheckFrame, integrityCheckPkt)
r.structure = nil
+37 -19
View File
@@ -1013,17 +1013,16 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 {
payload = payload[:len(tp.codecBytes)+n]
// translate RTP header
hdr := &rtp.Header{
hdr := RTPHeaderFactory.Get().(*rtp.Header)
*hdr = rtp.Header{
Version: extPkt.Packet.Version,
Padding: extPkt.Packet.Padding,
Marker: tp.marker,
PayloadType: d.getTranslatedPayloadType(extPkt.Packet.PayloadType),
SequenceNumber: uint16(tp.rtp.extSequenceNumber),
Timestamp: uint32(tp.rtp.extTimestamp),
SSRC: d.ssrc,
}
if tp.marker {
hdr.Marker = tp.marker
}
// add extensions
if d.dependencyDescriptorExtID != 0 && tp.ddBytes != nil {
@@ -1094,8 +1093,10 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 {
0,
extPkt.IsOutOfOrder,
)
d.pacer.Enqueue(&pacer.Packet{
pacerPacket := pacer.PacketFactory.Get().(*pacer.Packet)
*pacerPacket = pacer.Packet{
Header: hdr,
HeaderPool: RTPHeaderFactory,
HeaderSize: headerSize,
Payload: payload,
ProbeClusterId: ccutils.ProbeClusterId(d.probeClusterId.Load()),
@@ -1104,7 +1105,8 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 {
WriteStream: d.writeStream,
Pool: PacketFactory,
PoolEntity: poolEntity,
})
}
d.pacer.Enqueue(pacerPacket)
if extPkt.KeyFrame {
d.isNACKThrottled.Store(false)
@@ -1197,7 +1199,8 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
bytesSent := 0
payloads := make([]byte, RTPPaddingMaxPayloadSize*len(snts))
for i := 0; i < len(snts); i++ {
hdr := &rtp.Header{
hdr := RTPHeaderFactory.Get().(*rtp.Header)
*hdr = rtp.Header{
Version: 2,
Padding: true,
Marker: false,
@@ -1225,8 +1228,10 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
false,
)
d.pacer.Enqueue(&pacer.Packet{
pacerPacket := pacer.PacketFactory.Get().(*pacer.Packet)
*pacerPacket = pacer.Packet{
Header: hdr,
HeaderPool: RTPHeaderFactory,
HeaderSize: hdrSize,
Payload: payload,
ProbeClusterId: ccutils.ProbeClusterId(d.probeClusterId.Load()),
@@ -1234,7 +1239,8 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
TransportWideExtID: uint8(d.transportWideExtID),
WriteStream: d.writeStream,
})
}
d.pacer.Enqueue(pacerPacket)
bytesSent += hdrSize + payloadSize
}
@@ -1748,7 +1754,8 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
0,
false,
)
d.pacer.Enqueue(&pacer.Packet{
pacerPacket := pacer.PacketFactory.Get().(*pacer.Packet)
*pacerPacket = pacer.Packet{
Header: hdr,
HeaderSize: headerSize,
Payload: payload,
@@ -1756,7 +1763,8 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
TransportWideExtID: uint8(d.transportWideExtID),
WriteStream: d.writeStream,
})
}
d.pacer.Enqueue(pacerPacket)
// only the first frame will need frameEndNeeded to close out the
// previous picture, rest are small key frames (for the video case)
@@ -2045,7 +2053,8 @@ func (d *DownTrack) retransmitPacket(epm *extPacketMeta, sourcePkt []byte, isPro
d.params.Logger.Errorw("could not unmarshal rtp packet to send via RTX", err)
return 0, err
}
hdr := &rtp.Header{
hdr := RTPHeaderFactory.Get().(*rtp.Header)
*hdr = rtp.Header{
Version: pkt.Header.Version,
Padding: pkt.Header.Padding,
Marker: epm.marker,
@@ -2132,8 +2141,10 @@ func (d *DownTrack) retransmitPacket(epm *extPacketMeta, sourcePkt []byte, isPro
isOutOfOrder,
)
}
d.pacer.Enqueue(&pacer.Packet{
pacerPacket := pacer.PacketFactory.Get().(*pacer.Packet)
*pacerPacket = pacer.Packet{
Header: hdr,
HeaderPool: RTPHeaderFactory,
HeaderSize: headerSize,
Payload: payload,
ProbeClusterId: ccutils.ProbeClusterId(d.probeClusterId.Load()),
@@ -2144,7 +2155,8 @@ func (d *DownTrack) retransmitPacket(epm *extPacketMeta, sourcePkt []byte, isPro
WriteStream: d.writeStream,
Pool: PacketFactory,
PoolEntity: poolEntity,
})
}
d.pacer.Enqueue(pacerPacket)
return headerSize + len(payload), nil
}
@@ -2224,7 +2236,8 @@ func (d *DownTrack) WriteProbePackets(bytesToSend int, usePadding bool) int {
payloads := make([]byte, RTPPaddingMaxPayloadSize*num)
for i := 0; i < num; i++ {
rtxExtSequenceNumber := d.rtxSequenceNumber.Inc()
hdr := &rtp.Header{
hdr := RTPHeaderFactory.Get().(*rtp.Header)
*hdr = rtp.Header{
Version: 2,
Padding: true,
Marker: false,
@@ -2251,8 +2264,10 @@ func (d *DownTrack) WriteProbePackets(bytesToSend int, usePadding bool) int {
payloadSize,
false,
)
d.pacer.Enqueue(&pacer.Packet{
pacerPacket := pacer.PacketFactory.Get().(*pacer.Packet)
*pacerPacket = pacer.Packet{
Header: hdr,
HeaderPool: RTPHeaderFactory,
HeaderSize: hdrSize,
Payload: payload,
ProbeClusterId: ccutils.ProbeClusterId(d.probeClusterId.Load()),
@@ -2260,7 +2275,8 @@ func (d *DownTrack) WriteProbePackets(bytesToSend int, usePadding bool) int {
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
TransportWideExtID: uint8(d.transportWideExtID),
WriteStream: d.writeStream,
})
}
d.pacer.Enqueue(pacerPacket)
bytesSent += hdrSize + payloadSize
}
@@ -2481,7 +2497,8 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() {
len(payload), // although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only
false,
)
d.pacer.Enqueue(&pacer.Packet{
pacerPacket := pacer.PacketFactory.Get().(*pacer.Packet)
*pacerPacket = pacer.Packet{
Header: hdr,
HeaderSize: headerSize,
Payload: payload,
@@ -2489,7 +2506,8 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() {
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
TransportWideExtID: uint8(d.transportWideExtID),
WriteStream: d.writeStream,
})
}
d.pacer.Enqueue(pacerPacket)
}
numFrames--
+13 -2
View File
@@ -20,6 +20,7 @@ import (
"time"
"github.com/livekit/livekit-server/pkg/sfu/bwe"
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
"github.com/pion/rtp"
@@ -56,9 +57,17 @@ func (b *Base) TimeSinceLastSentPacket() time.Duration {
func (b *Base) SendPacket(p *Packet) (int, error) {
defer func() {
if p.HeaderPool != nil && p.Header != nil {
*p.Header = rtp.Header{}
p.HeaderPool.Put(p.Header)
}
if p.Pool != nil && p.PoolEntity != nil {
p.Pool.Put(p.PoolEntity)
}
*p = Packet{}
PacketFactory.Put(p)
}()
err := b.patchRTPHeaderExtensions(p)
@@ -83,8 +92,10 @@ func (b *Base) SendPacket(p *Packet) (int, error) {
func (b *Base) patchRTPHeaderExtensions(p *Packet) error {
sendingAt := mono.Now()
if p.AbsSendTimeExtID != 0 {
absSendTime := rtp.NewAbsSendTimeExtension(sendingAt)
absSendTimeBytes, err := absSendTime.Marshal()
absSendTimeExt := rtp.AbsSendTimeExtension{
Timestamp: uint64(mediatransportutil.ToNtpTime(sendingAt) >> 14),
}
absSendTimeBytes, err := absSendTimeExt.Marshal()
if err != nil {
return err
}
+11
View File
@@ -23,6 +23,16 @@ import (
"github.com/pion/webrtc/v4"
)
var (
PacketFactory = &sync.Pool{
New: func() any {
return &Packet{}
},
}
)
// --------------------------------------
type PacerBehavior string
const (
@@ -33,6 +43,7 @@ const (
type Packet struct {
Header *rtp.Header
HeaderPool *sync.Pool
HeaderSize int
Payload []byte
IsRTX bool
+2
View File
@@ -867,6 +867,8 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) {
}
numPacketsForwarded++
buff.ReleaseExtPacket(pkt)
}
}
+10 -7
View File
@@ -16,18 +16,21 @@ package sfu
import (
"sync"
"github.com/pion/rtp"
)
var (
PacketFactory *sync.Pool
)
func init() {
// Init packet factory
PacketFactory = &sync.Pool{
New: func() interface{} {
New: func() any {
b := make([]byte, 1460)
return &b
},
}
}
RTPHeaderFactory = &sync.Pool{
New: func() any {
return &rtp.Header{}
},
}
)