From 8c241ecf1237d3f771eeab34f8e5dce1f706b05c Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 6 Dec 2025 17:49:23 +0530 Subject: [PATCH] Fix RTCP reader leak in DownTrack. (#4131) When a participant is closing, RTCP readers should be cleaned up from factory even if the participant is expected to resume. The resumed participant will be a new participant session and peer connection(s) and everything will be set up again. --- pkg/rtc/mediatracksubscriptions.go | 4 ++-- pkg/rtc/subscriptionmanager.go | 4 ++-- pkg/sfu/downtrack.go | 11 ++++++----- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index f350596c2..855f8d78f 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -287,10 +287,10 @@ func (t *MediaTrackSubscriptions) closeSubscribedTrack(subTrack types.Subscribed } if isExpectedToResume { - dt.CloseWithFlush(false) + dt.CloseWithFlush(false, false) } else { // flushing blocks, avoid blocking when publisher removes all its subscribers - go dt.CloseWithFlush(true) + go dt.CloseWithFlush(true, true) } } diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index e7a1786be..02871a5ea 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -128,12 +128,12 @@ func (m *SubscriptionManager) Close(isExpectedToResume bool) { if isExpectedToResume { for _, dt := range downTracksToClose { - dt.CloseWithFlush(false) + dt.CloseWithFlush(false, true) } } else { // flush blocks, so execute in parallel for _, dt := range downTracksToClose { - go dt.CloseWithFlush(true) + go dt.CloseWithFlush(true, true) } } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index d93815db9..774222099 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1319,7 +1319,7 @@ func (d *DownTrack) IsClosed() bool { } func (d *DownTrack) Close() { - d.CloseWithFlush(true) + d.CloseWithFlush(true, true) } // CloseWithFlush - flush used to indicate whether send blank frame to flush @@ -1328,7 +1328,7 @@ func (d *DownTrack) Close() { // set flush=true to avoid previous video shows before new stream is displayed. // 2. in case of session migration, participant migrate from other node, video track should // be resumed with same participant, set flush=false since we don't need to flush decoder. -func (d *DownTrack) CloseWithFlush(flush bool) { +func (d *DownTrack) CloseWithFlush(flush bool, isEnding bool) { d.bindLock.Lock() if d.isClosed.Swap(true) { // already closed @@ -1364,12 +1364,12 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.setBindStateLocked(bindStateUnbound) d.Receiver().DeleteDownTrack(d.SubscriberID()) - if d.rtcpReader != nil && flush { + if d.rtcpReader != nil && isEnding { d.params.Logger.Debugw("downtrack close rtcp reader") d.rtcpReader.Close() d.rtcpReader.OnPacket(nil) } - if d.rtcpReaderRTX != nil && flush { + if d.rtcpReaderRTX != nil && isEnding { d.params.Logger.Debugw("downtrack close rtcp rtx reader") d.rtcpReaderRTX.Close() d.rtcpReaderRTX.OnPacket(nil) @@ -1380,7 +1380,8 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.rtpStats.Stop() d.rtpStatsRTX.Stop() - d.params.Logger.Debugw("rtp stats", + d.params.Logger.Debugw( + "rtp stats", "direction", "downstream", "mime", d.Mime().String(), "ssrc", d.ssrc,