Keep track of pending subscription close. (#478)

Introduce a pending close map and fire onNoSubscribers only
when there are not subscribed and no pending close.

There are a couple of paths for down track close
- RemoveSubscriber
- RemoveAllSubscriber
We remove the subscriber from `subscribedTracks` in these.
This is because `AddSubscriber` checks for existing subscription.
If there is a remove followed by an add, the add should not think
there is an existing susbcription if there is a delay in down track
close callback.

But, down track close is also called directly from places like
participant close. So, have to clean up both subscribedTrack
and pendingClose when the down track close fires.

Call onNoSubscribers only when both are empty. This will allow
relay up track to stop properly when all susbcribers have left.
This commit is contained in:
Raja Subramanian
2022-03-01 23:30:13 +05:30
committed by GitHub
parent 7449175c8e
commit bfae13eaa6

View File

@@ -27,7 +27,8 @@ type MediaTrackSubscriptions struct {
params MediaTrackSubscriptionsParams
subscribedTracksMu sync.RWMutex
subscribedTracks map[livekit.ParticipantID]types.SubscribedTrack // participantID => types.SubscribedTrack
subscribedTracks map[livekit.ParticipantID]types.SubscribedTrack
pendingClose map[livekit.ParticipantID]types.SubscribedTrack
onNoSubscribers func()
@@ -56,6 +57,7 @@ func NewMediaTrackSubscriptions(params MediaTrackSubscriptionsParams) *MediaTrac
t := &MediaTrackSubscriptions{
params: params,
subscribedTracks: make(map[livekit.ParticipantID]types.SubscribedTrack),
pendingClose: make(map[livekit.ParticipantID]types.SubscribedTrack),
maxSubscriberQuality: make(map[livekit.ParticipantID]livekit.VideoQuality),
maxSubscriberNodeQuality: make(map[livekit.NodeID]livekit.VideoQuality),
maxSubscribedQuality: livekit.VideoQuality_LOW,
@@ -207,6 +209,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, code
downTrack.OnCloseHandler(func() {
t.subscribedTracksMu.Lock()
delete(t.subscribedTracks, subscriberID)
delete(t.pendingClose, subscriberID)
t.subscribedTracksMu.Unlock()
t.maybeNotifyNoSubscribers()
@@ -271,6 +274,9 @@ func (t *MediaTrackSubscriptions) RemoveSubscriber(participantID livekit.Partici
t.subscribedTracksMu.Lock()
delete(t.subscribedTracks, participantID)
if subTrack != nil {
t.pendingClose[participantID] = subTrack
}
t.subscribedTracksMu.Unlock()
if subTrack != nil {
@@ -284,6 +290,10 @@ func (t *MediaTrackSubscriptions) RemoveAllSubscribers() {
t.subscribedTracksMu.Lock()
subscribedTracks := t.getAllSubscribedTracksLocked()
t.subscribedTracks = make(map[livekit.ParticipantID]types.SubscribedTrack)
for _, subTrack := range subscribedTracks {
t.pendingClose[subTrack.SubscriberID()] = subTrack
}
t.subscribedTracksMu.Unlock()
for _, subTrack := range subscribedTracks {
@@ -539,7 +549,7 @@ func (t *MediaTrackSubscriptions) maybeNotifyNoSubscribers() {
}
t.subscribedTracksMu.RLock()
empty := len(t.subscribedTracks) == 0
empty := len(t.subscribedTracks) == 0 && len(t.pendingClose) == 0
t.subscribedTracksMu.RUnlock()
if empty {