Force 'allowed; even on subscription.' (#1187)

Also not holding lock while resolving by sid. Not pretty.
But, we should deprecate SID based permissions.

Also, wire changes, possibly due to redis cluster PR?
This commit is contained in:
Raja Subramanian
2022-11-23 14:13:20 +05:30
committed by GitHub
parent 6711060cdb
commit 3074cb862f

View File

@@ -142,7 +142,7 @@ func (u *UpTrackManager) AddSubscriber(sub types.LocalParticipant, params types.
}
n += 1
u.maybeRemovePendingSubscriptionLocked(trackID, sub, true)
u.maybeRemovePendingSubscriptionLocked(trackID, sub, true, true)
}
return n, nil
}
@@ -156,7 +156,7 @@ func (u *UpTrackManager) RemoveSubscriber(sub types.LocalParticipant, trackID li
track.RemoveSubscriber(sub.ID(), willBeResumed)
}
u.maybeRemovePendingSubscriptionLocked(trackID, sub, false)
u.maybeRemovePendingSubscriptionLocked(trackID, sub, false, false)
}
func (u *UpTrackManager) SetPublishedTrackMuted(trackID livekit.TrackID, muted bool) types.MediaTrack {
@@ -253,7 +253,12 @@ func (u *UpTrackManager) UpdateSubscriptionPermission(
"permissions", u.subscriptionPermission.String(),
"version", u.subscriptionPermissionVersion.ToProto().String(),
)
if err := u.parseSubscriptionPermissions(subscriptionPermission, resolverBySid); err != nil {
if err := u.parseSubscriptionPermissionsLocked(subscriptionPermission, func(pID livekit.ParticipantID) types.LocalParticipant {
u.lock.Unlock()
p := resolverBySid(pID)
u.lock.Lock()
return p
}); err != nil {
// when failed, do not override previous permissions
u.params.Logger.Errorw("failed updating subscription permission", err)
u.lock.Unlock()
@@ -342,7 +347,7 @@ func (u *UpTrackManager) getPublishedTrackLocked(trackID livekit.TrackID) types.
return u.publishedTracks[trackID]
}
func (u *UpTrackManager) parseSubscriptionPermissions(
func (u *UpTrackManager) parseSubscriptionPermissionsLocked(
subscriptionPermission *livekit.SubscriptionPermission,
resolver func(participantID livekit.ParticipantID) types.LocalParticipant,
) error {
@@ -470,7 +475,7 @@ func (u *UpTrackManager) maybeAddPendingSubscriptionLocked(
})
}
func (u *UpTrackManager) maybeRemovePendingSubscriptionLocked(trackID livekit.TrackID, sub types.LocalParticipant, sendUpdate bool) {
func (u *UpTrackManager) maybeRemovePendingSubscriptionLocked(trackID livekit.TrackID, sub types.LocalParticipant, sendUpdate bool, forceUpdate bool) {
subscriberIdentity := sub.Identity()
found := false
@@ -489,7 +494,7 @@ func (u *UpTrackManager) maybeRemovePendingSubscriptionLocked(trackID livekit.Tr
delete(u.pendingSubscriptions, trackID)
}
if found && sendUpdate {
if sendUpdate && (forceUpdate || found) {
u.params.Logger.Debugw("removing pending subscription", "subscriberID", sub.ID(), "trackID", trackID)
u.opsQueue.Enqueue(func() {
sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true)