diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index c674b1f8a..9006f6c6f 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -142,7 +142,7 @@ func (u *UpTrackManager) AddSubscriber(sub types.LocalParticipant, params types. } n += 1 - u.maybeRemovePendingSubscriptionLocked(trackID, sub, true) + u.maybeRemovePendingSubscriptionLocked(trackID, sub, true, true) } return n, nil } @@ -156,7 +156,7 @@ func (u *UpTrackManager) RemoveSubscriber(sub types.LocalParticipant, trackID li track.RemoveSubscriber(sub.ID(), willBeResumed) } - u.maybeRemovePendingSubscriptionLocked(trackID, sub, false) + u.maybeRemovePendingSubscriptionLocked(trackID, sub, false, false) } func (u *UpTrackManager) SetPublishedTrackMuted(trackID livekit.TrackID, muted bool) types.MediaTrack { @@ -253,7 +253,12 @@ func (u *UpTrackManager) UpdateSubscriptionPermission( "permissions", u.subscriptionPermission.String(), "version", u.subscriptionPermissionVersion.ToProto().String(), ) - if err := u.parseSubscriptionPermissions(subscriptionPermission, resolverBySid); err != nil { + if err := u.parseSubscriptionPermissionsLocked(subscriptionPermission, func(pID livekit.ParticipantID) types.LocalParticipant { + u.lock.Unlock() + p := resolverBySid(pID) + u.lock.Lock() + return p + }); err != nil { // when failed, do not override previous permissions u.params.Logger.Errorw("failed updating subscription permission", err) u.lock.Unlock() @@ -342,7 +347,7 @@ func (u *UpTrackManager) getPublishedTrackLocked(trackID livekit.TrackID) types. return u.publishedTracks[trackID] } -func (u *UpTrackManager) parseSubscriptionPermissions( +func (u *UpTrackManager) parseSubscriptionPermissionsLocked( subscriptionPermission *livekit.SubscriptionPermission, resolver func(participantID livekit.ParticipantID) types.LocalParticipant, ) error { @@ -470,7 +475,7 @@ func (u *UpTrackManager) maybeAddPendingSubscriptionLocked( }) } -func (u *UpTrackManager) maybeRemovePendingSubscriptionLocked(trackID livekit.TrackID, sub types.LocalParticipant, sendUpdate bool) { +func (u *UpTrackManager) maybeRemovePendingSubscriptionLocked(trackID livekit.TrackID, sub types.LocalParticipant, sendUpdate bool, forceUpdate bool) { subscriberIdentity := sub.Identity() found := false @@ -489,7 +494,7 @@ func (u *UpTrackManager) maybeRemovePendingSubscriptionLocked(trackID livekit.Tr delete(u.pendingSubscriptions, trackID) } - if found && sendUpdate { + if sendUpdate && (forceUpdate || found) { u.params.Logger.Debugw("removing pending subscription", "subscriberID", sub.ID(), "trackID", trackID) u.opsQueue.Enqueue(func() { sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true)