diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 8e2590397..1512fc0b3 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -19,12 +19,13 @@ package rtc import ( "context" "errors" + "maps" + "slices" "sync" "time" "github.com/pion/webrtc/v4/pkg/rtcerr" "go.uber.org/atomic" - "golang.org/x/exp/maps" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" @@ -114,6 +115,16 @@ func (m *SubscriptionManager) Close(isExpectedToResume bool) { prometheus.RecordTrackSubscribeCancels(int32(m.getNumCancellations())) + // Remove observer closures from track change/remove notifiers to allow + // this participant and its transports to be garbage collected. + m.lock.Lock() + subs := maps.Clone(m.subscriptions) + m.lock.Unlock() + for _, sub := range subs { + sub.setChangedNotifier(nil) + sub.setRemovedNotifier(nil) + } + subTracks := m.GetSubscribedTracks() downTracksToClose := make([]*sfu.DownTrack, 0, len(subTracks)) for _, st := range subTracks { @@ -332,7 +343,7 @@ func (m *SubscriptionManager) GetSubscribedParticipants() []livekit.ParticipantI m.lock.RLock() defer m.lock.RUnlock() - return maps.Keys(m.subscribedTo) + return slices.Collect(maps.Keys(m.subscribedTo)) } func (m *SubscriptionManager) IsSubscribedTo(participantID livekit.ParticipantID) bool { @@ -945,6 +956,8 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *mediaTrackSubscripti return } s.setSubscribedTrack(nil) + s.setChangedNotifier(nil) + s.setRemovedNotifier(nil) var relieveFromLimits bool switch subTrack.MediaTrack().Kind() {