From 856cc1798fec4bdd08857d2b0dc26650a6215530 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 1 Jul 2022 11:31:47 +0530 Subject: [PATCH] RemoveSubscriber while revoking. (#796) Else, a quick turn around of permissions tries to subscribe again and the subscriber is already in the subscribed list. --- pkg/rtc/mediatracksubscriptions.go | 2 +- pkg/rtc/uptrackmanager.go | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 92a88e78f..865d0362a 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -385,7 +385,7 @@ func (t *MediaTrackSubscriptions) RevokeDisallowedSubscribers(allowedSubscriberI "subscriber", subTrack.SubscriberIdentity(), "subscriberID", subTrack.SubscriberID(), ) - go subTrack.DownTrack().Close() + t.RemoveSubscriber(subTrack.SubscriberID(), false) revokedSubscriberIdentities = append(revokedSubscriberIdentities, subTrack.SubscriberIdentity()) } } diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index 215edf37a..1b81c3bff 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -8,6 +8,7 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/utils" ) var ( @@ -32,6 +33,8 @@ type UpTrackManager struct { // keeps tracks of track specific subscribers who are awaiting permission pendingSubscriptions map[livekit.TrackID][]livekit.ParticipantIdentity // trackID => []subscriberIdentity + opsQueue *utils.OpsQueue + lock sync.RWMutex // callbacks & handlers @@ -44,10 +47,12 @@ func NewUpTrackManager(params UpTrackManagerParams) *UpTrackManager { params: params, publishedTracks: make(map[livekit.TrackID]types.MediaTrack), pendingSubscriptions: make(map[livekit.TrackID][]livekit.ParticipantIdentity), + opsQueue: utils.NewOpsQueue(params.Logger, "utm", 20), } } func (u *UpTrackManager) Start() { + u.opsQueue.Start() } func (u *UpTrackManager) Restart() { @@ -57,6 +62,8 @@ func (u *UpTrackManager) Restart() { } func (u *UpTrackManager) Close(willBeResumed bool) { + u.opsQueue.Stop() + u.lock.Lock() u.closed = true notify := len(u.publishedTracks) == 0 @@ -415,7 +422,9 @@ func (u *UpTrackManager) maybeAddPendingSubscription(trackID livekit.TrackID, su } u.pendingSubscriptions[trackID] = append(u.pendingSubscriptions[trackID], subscriberIdentity) - go sub.SubscriptionPermissionUpdate(u.params.SID, trackID, false) + u.opsQueue.Enqueue(func() { + sub.SubscriptionPermissionUpdate(u.params.SID, trackID, false) + }) } func (u *UpTrackManager) maybeRemovePendingSubscription(trackID livekit.TrackID, sub types.LocalParticipant) { @@ -468,7 +477,9 @@ func (u *UpTrackManager) processPendingSubscriptions(resolver func(participantId continue } - go sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true) + u.opsQueue.Enqueue(func() { + sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true) + }) } updatedPendingSubscriptions[trackID] = updatedPending