From 32cd0370c7b19eb4df1c02da97a4f4bd2bbe0c28 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 19 Dec 2025 20:25:22 +0530 Subject: [PATCH] Flush the ext packets on restart/close and release packets. (#4179) --- pkg/sfu/buffer/buffer.go | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 3c62d13ca..abbce1079 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -597,6 +597,8 @@ func (b *Buffer) Close() error { if cb := b.getOnClose(); cb != nil { cb() } + + go b.flushExtPackets() }) return nil } @@ -694,7 +696,7 @@ func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, i if b.nacker != nil { b.nacker = nack.NewNACKQueue(nack.NackQueueParamsDefault) } - b.extPackets.Clear() + b.flushExtPacketsLocked() flowState = b.updateStreamState(rtpPacket, arrivalTime) isRestart = true @@ -954,7 +956,13 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64, isRTX } } -func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime int64, isBuffered bool, isRestart bool, flowState rtpstats.RTPFlowState) *ExtPacket { +func (b *Buffer) getExtPacket( + rtpPacket *rtp.Packet, + arrivalTime int64, + isBuffered bool, + isRestart bool, + flowState rtpstats.RTPFlowState, +) *ExtPacket { ep := ExtPacketFactory.Get().(*ExtPacket) *ep = ExtPacket{ Arrival: arrivalTime, @@ -1107,6 +1115,20 @@ func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime int64, isBuffer return ep } +func (b *Buffer) flushExtPackets() { + b.Lock() + defer b.Unlock() + b.flushExtPacketsLocked() +} + +func (b *Buffer) flushExtPacketsLocked() { + for b.extPackets.Len() > 0 { + ep := b.extPackets.PopFront() + ReleaseExtPacket(ep) + } + b.extPackets.Clear() +} + func (b *Buffer) doNACKs() { if b.nacker == nil { return