From bfae13eaa6280dd613e461d412947b5e6aef43e5 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 1 Mar 2022 23:30:13 +0530 Subject: [PATCH] Keep track of pending subscription close. (#478) Introduce a pending close map and fire onNoSubscribers only when there are not subscribed and no pending close. There are a couple of paths for down track close - RemoveSubscriber - RemoveAllSubscriber We remove the subscriber from `subscribedTracks` in these. This is because `AddSubscriber` checks for existing subscription. If there is a remove followed by an add, the add should not think there is an existing susbcription if there is a delay in down track close callback. But, down track close is also called directly from places like participant close. So, have to clean up both subscribedTrack and pendingClose when the down track close fires. Call onNoSubscribers only when both are empty. This will allow relay up track to stop properly when all susbcribers have left. --- pkg/rtc/mediatracksubscriptions.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index ccf4298c2..3fd0dfde0 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -27,7 +27,8 @@ type MediaTrackSubscriptions struct { params MediaTrackSubscriptionsParams subscribedTracksMu sync.RWMutex - subscribedTracks map[livekit.ParticipantID]types.SubscribedTrack // participantID => types.SubscribedTrack + subscribedTracks map[livekit.ParticipantID]types.SubscribedTrack + pendingClose map[livekit.ParticipantID]types.SubscribedTrack onNoSubscribers func() @@ -56,6 +57,7 @@ func NewMediaTrackSubscriptions(params MediaTrackSubscriptionsParams) *MediaTrac t := &MediaTrackSubscriptions{ params: params, subscribedTracks: make(map[livekit.ParticipantID]types.SubscribedTrack), + pendingClose: make(map[livekit.ParticipantID]types.SubscribedTrack), maxSubscriberQuality: make(map[livekit.ParticipantID]livekit.VideoQuality), maxSubscriberNodeQuality: make(map[livekit.NodeID]livekit.VideoQuality), maxSubscribedQuality: livekit.VideoQuality_LOW, @@ -207,6 +209,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, code downTrack.OnCloseHandler(func() { t.subscribedTracksMu.Lock() delete(t.subscribedTracks, subscriberID) + delete(t.pendingClose, subscriberID) t.subscribedTracksMu.Unlock() t.maybeNotifyNoSubscribers() @@ -271,6 +274,9 @@ func (t *MediaTrackSubscriptions) RemoveSubscriber(participantID livekit.Partici t.subscribedTracksMu.Lock() delete(t.subscribedTracks, participantID) + if subTrack != nil { + t.pendingClose[participantID] = subTrack + } t.subscribedTracksMu.Unlock() if subTrack != nil { @@ -284,6 +290,10 @@ func (t *MediaTrackSubscriptions) RemoveAllSubscribers() { t.subscribedTracksMu.Lock() subscribedTracks := t.getAllSubscribedTracksLocked() t.subscribedTracks = make(map[livekit.ParticipantID]types.SubscribedTrack) + + for _, subTrack := range subscribedTracks { + t.pendingClose[subTrack.SubscriberID()] = subTrack + } t.subscribedTracksMu.Unlock() for _, subTrack := range subscribedTracks { @@ -539,7 +549,7 @@ func (t *MediaTrackSubscriptions) maybeNotifyNoSubscribers() { } t.subscribedTracksMu.RLock() - empty := len(t.subscribedTracks) == 0 + empty := len(t.subscribedTracks) == 0 && len(t.pendingClose) == 0 t.subscribedTracksMu.RUnlock() if empty {