From 9ab8c1d52232ed4ea402421d28630dd8c6ea5dfd Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Mon, 30 Mar 2026 20:48:48 -0700 Subject: [PATCH] clear track notifier observers on subscription teardown (#4413) When a subscriber disconnects, observer closures registered on the publisher's TrackChangedNotifier and TrackRemovedNotifier were never removed. These closures capture the SubscriptionManager, which holds the ParticipantImpl, preventing the entire participant object graph (PCTransport, SDPs, RTP stats, DownTracks) from being garbage collected. In rooms with many participants that disconnect and reconnect frequently, this causes unbounded memory growth proportional to the number of disconnect events. The leaked memory is not recoverable while the room remains open. Clear notifiers in both handleSubscribedTrackClose (individual subscription teardown) and SubscriptionManager.Close (full participant teardown), matching the existing cleanup in handleSourceTrackRemoved. Co-authored-by: Claude Opus 4.6 (1M context) --- pkg/rtc/subscriptionmanager.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 8e2590397..1512fc0b3 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -19,12 +19,13 @@ package rtc import ( "context" "errors" + "maps" + "slices" "sync" "time" "github.com/pion/webrtc/v4/pkg/rtcerr" "go.uber.org/atomic" - "golang.org/x/exp/maps" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" @@ -114,6 +115,16 @@ func (m *SubscriptionManager) Close(isExpectedToResume bool) { prometheus.RecordTrackSubscribeCancels(int32(m.getNumCancellations())) + // Remove observer closures from track change/remove notifiers to allow + // this participant and its transports to be garbage collected. + m.lock.Lock() + subs := maps.Clone(m.subscriptions) + m.lock.Unlock() + for _, sub := range subs { + sub.setChangedNotifier(nil) + sub.setRemovedNotifier(nil) + } + subTracks := m.GetSubscribedTracks() downTracksToClose := make([]*sfu.DownTrack, 0, len(subTracks)) for _, st := range subTracks { @@ -332,7 +343,7 @@ func (m *SubscriptionManager) GetSubscribedParticipants() []livekit.ParticipantI m.lock.RLock() defer m.lock.RUnlock() - return maps.Keys(m.subscribedTo) + return slices.Collect(maps.Keys(m.subscribedTo)) } func (m *SubscriptionManager) IsSubscribedTo(participantID livekit.ParticipantID) bool { @@ -945,6 +956,8 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *mediaTrackSubscripti return } s.setSubscribedTrack(nil) + s.setChangedNotifier(nil) + s.setRemovedNotifier(nil) var relieveFromLimits bool switch subTrack.MediaTrack().Kind() {