RemoveSubscriber while revoking. (#796)

Else, a quick turn around of permissions tries to subscribe
again and the subscriber is already in the subscribed list.
This commit is contained in:
Raja Subramanian
2022-07-01 11:31:47 +05:30
committed by GitHub
parent c15eeeff2b
commit 856cc1798f
2 changed files with 14 additions and 3 deletions

View File

@@ -385,7 +385,7 @@ func (t *MediaTrackSubscriptions) RevokeDisallowedSubscribers(allowedSubscriberI
"subscriber", subTrack.SubscriberIdentity(),
"subscriberID", subTrack.SubscriberID(),
)
go subTrack.DownTrack().Close()
t.RemoveSubscriber(subTrack.SubscriberID(), false)
revokedSubscriberIdentities = append(revokedSubscriberIdentities, subTrack.SubscriberIdentity())
}
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/utils"
)
var (
@@ -32,6 +33,8 @@ type UpTrackManager struct {
// keeps tracks of track specific subscribers who are awaiting permission
pendingSubscriptions map[livekit.TrackID][]livekit.ParticipantIdentity // trackID => []subscriberIdentity
opsQueue *utils.OpsQueue
lock sync.RWMutex
// callbacks & handlers
@@ -44,10 +47,12 @@ func NewUpTrackManager(params UpTrackManagerParams) *UpTrackManager {
params: params,
publishedTracks: make(map[livekit.TrackID]types.MediaTrack),
pendingSubscriptions: make(map[livekit.TrackID][]livekit.ParticipantIdentity),
opsQueue: utils.NewOpsQueue(params.Logger, "utm", 20),
}
}
func (u *UpTrackManager) Start() {
u.opsQueue.Start()
}
func (u *UpTrackManager) Restart() {
@@ -57,6 +62,8 @@ func (u *UpTrackManager) Restart() {
}
func (u *UpTrackManager) Close(willBeResumed bool) {
u.opsQueue.Stop()
u.lock.Lock()
u.closed = true
notify := len(u.publishedTracks) == 0
@@ -415,7 +422,9 @@ func (u *UpTrackManager) maybeAddPendingSubscription(trackID livekit.TrackID, su
}
u.pendingSubscriptions[trackID] = append(u.pendingSubscriptions[trackID], subscriberIdentity)
go sub.SubscriptionPermissionUpdate(u.params.SID, trackID, false)
u.opsQueue.Enqueue(func() {
sub.SubscriptionPermissionUpdate(u.params.SID, trackID, false)
})
}
func (u *UpTrackManager) maybeRemovePendingSubscription(trackID livekit.TrackID, sub types.LocalParticipant) {
@@ -468,7 +477,9 @@ func (u *UpTrackManager) processPendingSubscriptions(resolver func(participantId
continue
}
go sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true)
u.opsQueue.Enqueue(func() {
sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true)
})
}
updatedPendingSubscriptions[trackID] = updatedPending