mirror of
https://github.com/livekit/livekit.git
synced 2026-04-26 23:55:25 +00:00
clear track notifier observers on subscription teardown (#4413)
When a subscriber disconnects, observer closures registered on the publisher's TrackChangedNotifier and TrackRemovedNotifier were never removed. These closures capture the SubscriptionManager, which holds the ParticipantImpl, preventing the entire participant object graph (PCTransport, SDPs, RTP stats, DownTracks) from being garbage collected. In rooms with many participants that disconnect and reconnect frequently, this causes unbounded memory growth proportional to the number of disconnect events. The leaked memory is not recoverable while the room remains open. Clear notifiers in both handleSubscribedTrackClose (individual subscription teardown) and SubscriptionManager.Close (full participant teardown), matching the existing cleanup in handleSourceTrackRemoved. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -19,12 +19,13 @@ package rtc
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"maps"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pion/webrtc/v4/pkg/rtcerr"
|
||||
"go.uber.org/atomic"
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
@@ -114,6 +115,16 @@ func (m *SubscriptionManager) Close(isExpectedToResume bool) {
|
||||
|
||||
prometheus.RecordTrackSubscribeCancels(int32(m.getNumCancellations()))
|
||||
|
||||
// Remove observer closures from track change/remove notifiers to allow
|
||||
// this participant and its transports to be garbage collected.
|
||||
m.lock.Lock()
|
||||
subs := maps.Clone(m.subscriptions)
|
||||
m.lock.Unlock()
|
||||
for _, sub := range subs {
|
||||
sub.setChangedNotifier(nil)
|
||||
sub.setRemovedNotifier(nil)
|
||||
}
|
||||
|
||||
subTracks := m.GetSubscribedTracks()
|
||||
downTracksToClose := make([]*sfu.DownTrack, 0, len(subTracks))
|
||||
for _, st := range subTracks {
|
||||
@@ -332,7 +343,7 @@ func (m *SubscriptionManager) GetSubscribedParticipants() []livekit.ParticipantI
|
||||
m.lock.RLock()
|
||||
defer m.lock.RUnlock()
|
||||
|
||||
return maps.Keys(m.subscribedTo)
|
||||
return slices.Collect(maps.Keys(m.subscribedTo))
|
||||
}
|
||||
|
||||
func (m *SubscriptionManager) IsSubscribedTo(participantID livekit.ParticipantID) bool {
|
||||
@@ -945,6 +956,8 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *mediaTrackSubscripti
|
||||
return
|
||||
}
|
||||
s.setSubscribedTrack(nil)
|
||||
s.setChangedNotifier(nil)
|
||||
s.setRemovedNotifier(nil)
|
||||
|
||||
var relieveFromLimits bool
|
||||
switch subTrack.MediaTrack().Kind() {
|
||||
|
||||
Reference in New Issue
Block a user