Files
livekit/pkg/rtc/uptrackmanager.go
Raja Subramanian 177061712c Do not hold lock while invoking resolver. (#816)
* Do not hold lock while invoking resolver.

Resolver is in room and it will grab its lock.
It is called from partcipant when checking permissions.
Permissions processing uses the participant lock.
So, not a good idea to call a room function with
participant lock held. Avoid that.

Also, use a full lock in the add/remove subscription
path. This is to ensure that permissions path and
subscription path (like subscribing to new participant's
tracks) do not race. As subscriptions are queued now
on the subscriber side, this should be fine.

* Revoke path resolution in ops queue goroutine

* fix test
2022-07-07 13:24:26 +05:30

598 lines
16 KiB
Go

package rtc
import (
"errors"
"sync"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/utils"
)
var (
ErrSubscriptionPermissionNeedsId = errors.New("either participant identity or SID needed")
)
type UpTrackManagerParams struct {
SID livekit.ParticipantID
Logger logger.Logger
}
type UpTrackManager struct {
params UpTrackManagerParams
closed bool
// publishedTracks that participant is publishing
publishedTracks map[livekit.TrackID]types.MediaTrack
subscriptionPermission *livekit.SubscriptionPermission
// subscriber permission for published tracks
subscriberPermissions map[livekit.ParticipantIdentity]*livekit.TrackPermission // subscriberIdentity => *livekit.TrackPermission
// keeps tracks of track specific subscribers who are awaiting permission
pendingSubscriptions map[livekit.TrackID][]livekit.ParticipantIdentity // trackID => []subscriberIdentity
opsQueue *utils.OpsQueue
lock sync.RWMutex
// callbacks & handlers
onClose func()
onTrackUpdated func(track types.MediaTrack, onlyIfReady bool)
}
func NewUpTrackManager(params UpTrackManagerParams) *UpTrackManager {
return &UpTrackManager{
params: params,
publishedTracks: make(map[livekit.TrackID]types.MediaTrack),
pendingSubscriptions: make(map[livekit.TrackID][]livekit.ParticipantIdentity),
opsQueue: utils.NewOpsQueue(params.Logger, "utm", 20),
}
}
func (u *UpTrackManager) Start() {
u.opsQueue.Start()
}
func (u *UpTrackManager) Restart() {
for _, t := range u.GetPublishedTracks() {
t.Restart()
}
}
func (u *UpTrackManager) Close(willBeResumed bool) {
u.opsQueue.Stop()
u.lock.Lock()
u.closed = true
notify := len(u.publishedTracks) == 0
u.lock.Unlock()
// remove all subscribers
for _, t := range u.GetPublishedTracks() {
t.RemoveAllSubscribers(willBeResumed)
}
if notify && u.onClose != nil {
u.onClose()
}
}
func (u *UpTrackManager) OnUpTrackManagerClose(f func()) {
u.onClose = f
}
func (u *UpTrackManager) ToProto() []*livekit.TrackInfo {
u.lock.RLock()
defer u.lock.RUnlock()
var trackInfos []*livekit.TrackInfo
for _, t := range u.publishedTracks {
trackInfos = append(trackInfos, t.ToProto())
}
return trackInfos
}
func (u *UpTrackManager) OnPublishedTrackUpdated(f func(track types.MediaTrack, onlyIfReady bool)) {
u.onTrackUpdated = f
}
// AddSubscriber subscribes op to all publishedTracks
func (u *UpTrackManager) AddSubscriber(sub types.LocalParticipant, params types.AddSubscriberParams) (int, error) {
u.lock.Lock()
defer u.lock.Unlock()
var tracks []types.MediaTrack
if params.AllTracks {
for _, t := range u.publishedTracks {
tracks = append(tracks, t)
}
} else {
for _, trackID := range params.TrackIDs {
track := u.getPublishedTrackLocked(trackID)
if track == nil {
continue
}
tracks = append(tracks, track)
}
}
if len(tracks) == 0 {
return 0, nil
}
var trackIDs []livekit.TrackID
for _, track := range tracks {
trackIDs = append(trackIDs, track.ID())
}
u.params.Logger.Debugw("subscribing participant to tracks",
"subscriber", sub.Identity(),
"subscriberID", sub.ID(),
"trackIDs", trackIDs)
n := 0
for _, track := range tracks {
trackID := track.ID()
subscriberIdentity := sub.Identity()
if !u.hasPermissionLocked(trackID, subscriberIdentity) {
u.maybeAddPendingSubscriptionLocked(trackID, subscriberIdentity, sub, nil)
continue
}
if err := track.AddSubscriber(sub); err != nil {
return n, err
}
n += 1
u.maybeRemovePendingSubscriptionLocked(trackID, sub, true)
}
return n, nil
}
func (u *UpTrackManager) RemoveSubscriber(sub types.LocalParticipant, trackID livekit.TrackID, willBeResumed bool) {
u.lock.Lock()
defer u.lock.Unlock()
track := u.getPublishedTrackLocked(trackID)
if track != nil {
track.RemoveSubscriber(sub.ID(), willBeResumed)
}
u.maybeRemovePendingSubscriptionLocked(trackID, sub, false)
}
func (u *UpTrackManager) SetPublishedTrackMuted(trackID livekit.TrackID, muted bool) types.MediaTrack {
u.lock.RLock()
track := u.publishedTracks[trackID]
u.lock.RUnlock()
if track != nil {
currentMuted := track.IsMuted()
track.SetMuted(muted)
if currentMuted != track.IsMuted() {
u.params.Logger.Debugw("mute status changed", "trackID", trackID, "muted", track.IsMuted())
if u.onTrackUpdated != nil {
u.onTrackUpdated(track, false)
}
}
}
return track
}
func (u *UpTrackManager) GetPublishedTrack(trackID livekit.TrackID) types.MediaTrack {
u.lock.RLock()
defer u.lock.RUnlock()
return u.getPublishedTrackLocked(trackID)
}
func (u *UpTrackManager) GetPublishedTracks() []types.MediaTrack {
u.lock.RLock()
defer u.lock.RUnlock()
tracks := make([]types.MediaTrack, 0, len(u.publishedTracks))
for _, t := range u.publishedTracks {
tracks = append(tracks, t)
}
return tracks
}
func (u *UpTrackManager) UpdateSubscriptionPermission(
subscriptionPermission *livekit.SubscriptionPermission,
resolverByIdentity func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant,
resolverBySid func(participantID livekit.ParticipantID) types.LocalParticipant,
) error {
u.lock.Lock()
if subscriptionPermission == nil {
u.params.Logger.Debugw("updating subscription permission, setting to nil")
// store as is for use when migrating
u.subscriptionPermission = subscriptionPermission
// possible to get a nil when migrating
u.lock.Unlock()
return nil
}
u.params.Logger.Debugw("updating subscription permission", "permissions", subscriptionPermission.String())
if err := u.parseSubscriptionPermissions(subscriptionPermission, resolverBySid); err != nil {
// when failed, do not override previous permissions
u.params.Logger.Errorw("failed updating subscription permission", err)
u.lock.Unlock()
return err
}
// store as is for use when migrating
u.subscriptionPermission = subscriptionPermission
u.lock.Unlock()
u.processPendingSubscriptions(resolverByIdentity)
u.maybeRevokeSubscriptions(resolverByIdentity)
return nil
}
func (u *UpTrackManager) SubscriptionPermission() *livekit.SubscriptionPermission {
u.lock.RLock()
defer u.lock.RUnlock()
return u.subscriptionPermission
}
func (u *UpTrackManager) UpdateVideoLayers(updateVideoLayers *livekit.UpdateVideoLayers) error {
track := u.GetPublishedTrack(livekit.TrackID(updateVideoLayers.TrackSid))
if track == nil {
u.params.Logger.Warnw("could not find track", nil, "trackID", livekit.TrackID(updateVideoLayers.TrackSid))
return errors.New("could not find published track")
}
track.UpdateVideoLayers(updateVideoLayers.Layers)
if u.onTrackUpdated != nil {
u.onTrackUpdated(track, false)
}
return nil
}
func (u *UpTrackManager) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []types.SubscribedCodecQuality) error {
track := u.GetPublishedTrack(trackID)
if track == nil {
u.params.Logger.Warnw("could not find track", nil, "trackID", trackID)
return errors.New("could not find published track")
}
track.NotifySubscriberNodeMaxQuality(nodeID, maxQualities)
return nil
}
func (u *UpTrackManager) UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error {
track := u.GetPublishedTrack(trackID)
if track == nil {
u.params.Logger.Warnw("could not find track", nil, "trackID", trackID)
return errors.New("could not find published track")
}
track.NotifySubscriberNodeMediaLoss(nodeID, uint8(fractionalLoss))
return nil
}
func (u *UpTrackManager) AddPublishedTrack(track types.MediaTrack) {
u.lock.Lock()
if _, ok := u.publishedTracks[track.ID()]; !ok {
u.publishedTracks[track.ID()] = track
}
u.lock.Unlock()
u.params.Logger.Debugw("added published track", "trackID", track.ID(), "trackInfo", track.ToProto().String())
track.AddOnClose(func() {
notifyClose := false
// cleanup
u.lock.Lock()
trackID := track.ID()
delete(u.publishedTracks, trackID)
delete(u.pendingSubscriptions, trackID)
// not modifying subscription permissions, will get reset on next update from participant
if u.closed && len(u.publishedTracks) == 0 {
notifyClose = true
}
u.lock.Unlock()
// only send this when client is in a ready state
if u.onTrackUpdated != nil {
u.onTrackUpdated(track, true)
}
if notifyClose && u.onClose != nil {
u.onClose()
}
})
}
func (u *UpTrackManager) RemovePublishedTrack(track types.MediaTrack, willBeResumed bool) {
track.RemoveAllSubscribers(willBeResumed)
u.lock.Lock()
delete(u.publishedTracks, track.ID())
u.lock.Unlock()
}
func (u *UpTrackManager) getPublishedTrackLocked(trackID livekit.TrackID) types.MediaTrack {
return u.publishedTracks[trackID]
}
func (u *UpTrackManager) parseSubscriptionPermissions(
subscriptionPermission *livekit.SubscriptionPermission,
resolver func(participantID livekit.ParticipantID) types.LocalParticipant,
) error {
// every update overrides the existing
// all_participants takes precedence
if subscriptionPermission.AllParticipants {
// everything is allowed, nothing else to do
u.subscriberPermissions = nil
return nil
}
// per participant permissions
subscriberPermissions := make(map[livekit.ParticipantIdentity]*livekit.TrackPermission)
for _, trackPerms := range subscriptionPermission.TrackPermissions {
subscriberIdentity := livekit.ParticipantIdentity(trackPerms.ParticipantIdentity)
if subscriberIdentity == "" {
if trackPerms.ParticipantSid == "" {
return ErrSubscriptionPermissionNeedsId
}
sub := resolver(livekit.ParticipantID(trackPerms.ParticipantSid))
if sub == nil {
u.params.Logger.Warnw("could not find subscriber for permissions update", nil, "subscriberID", trackPerms.ParticipantSid)
continue
}
subscriberIdentity = sub.Identity()
} else {
if trackPerms.ParticipantSid != "" {
sub := resolver(livekit.ParticipantID(trackPerms.ParticipantSid))
if sub != nil && sub.Identity() != subscriberIdentity {
u.params.Logger.Errorw("participant identity mismatch", nil, "expected", subscriberIdentity, "got", sub.Identity())
}
if sub == nil {
u.params.Logger.Warnw("could not find subscriber for permissions update", nil, "subscriberID", trackPerms.ParticipantSid)
}
}
}
subscriberPermissions[subscriberIdentity] = trackPerms
}
u.subscriberPermissions = subscriberPermissions
return nil
}
func (u *UpTrackManager) hasPermissionLocked(trackID livekit.TrackID, subscriberIdentity livekit.ParticipantIdentity) bool {
if u.subscriberPermissions == nil {
return true
}
perms, ok := u.subscriberPermissions[subscriberIdentity]
if !ok {
return false
}
if perms.AllTracks {
return true
}
for _, sid := range perms.TrackSids {
if livekit.TrackID(sid) == trackID {
return true
}
}
return false
}
func (u *UpTrackManager) getAllowedSubscribersLocked(trackID livekit.TrackID) []livekit.ParticipantIdentity {
if u.subscriberPermissions == nil {
return nil
}
allowed := make([]livekit.ParticipantIdentity, 0)
for subscriberIdentity, perms := range u.subscriberPermissions {
if perms.AllTracks {
allowed = append(allowed, subscriberIdentity)
continue
}
for _, sid := range perms.TrackSids {
if livekit.TrackID(sid) == trackID {
allowed = append(allowed, subscriberIdentity)
break
}
}
}
return allowed
}
func (u *UpTrackManager) maybeAddPendingSubscriptionLocked(
trackID livekit.TrackID,
subscriberIdentity livekit.ParticipantIdentity,
sub types.LocalParticipant,
resolver func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant,
) {
pending := u.pendingSubscriptions[trackID]
for _, identity := range pending {
if identity == subscriberIdentity {
// already pending
return
}
}
u.pendingSubscriptions[trackID] = append(u.pendingSubscriptions[trackID], subscriberIdentity)
u.params.Logger.Debugw("adding pending subscription", "subscriberIdentity", subscriberIdentity, "trackID", trackID)
u.opsQueue.Enqueue(func() {
if sub == nil {
if resolver == nil {
u.params.Logger.Warnw("no resolver", nil)
} else {
sub = resolver(subscriberIdentity)
}
}
if sub != nil {
sub.SubscriptionPermissionUpdate(u.params.SID, trackID, false)
} else {
u.params.Logger.Warnw("could not send subscription permission update, no subscriber", nil, "subscriberIdentity", subscriberIdentity)
}
})
}
func (u *UpTrackManager) maybeRemovePendingSubscriptionLocked(trackID livekit.TrackID, sub types.LocalParticipant, sendUpdate bool) {
subscriberIdentity := sub.Identity()
found := false
pending := u.pendingSubscriptions[trackID]
n := len(pending)
for idx, identity := range pending {
if identity == subscriberIdentity {
found = true
u.pendingSubscriptions[trackID][idx] = u.pendingSubscriptions[trackID][n-1]
u.pendingSubscriptions[trackID] = u.pendingSubscriptions[trackID][:n-1]
break
}
}
if len(u.pendingSubscriptions[trackID]) == 0 {
delete(u.pendingSubscriptions, trackID)
}
if found && sendUpdate {
u.params.Logger.Debugw("removing pending subscription", "subscriberID", sub.ID(), "trackID", trackID)
u.opsQueue.Enqueue(func() {
sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true)
})
}
}
// creates subscriptions for tracks if permissions have been granted
func (u *UpTrackManager) processPendingSubscriptions(resolver func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant) {
type ResolvedInfo struct {
sub types.LocalParticipant
state livekit.ParticipantInfo_State
}
// gather all identites that need resolving
resolvedInfos := make(map[livekit.ParticipantIdentity]*ResolvedInfo)
u.lock.RLock()
for trackID, pending := range u.pendingSubscriptions {
track := u.getPublishedTrackLocked(trackID)
if track == nil {
// published track is gone
continue
}
for _, identity := range pending {
resolvedInfos[identity] = nil
}
}
u.lock.RUnlock()
for identity := range resolvedInfos {
sub := resolver(identity)
if sub != nil {
resolvedInfos[identity] = &ResolvedInfo{
sub: sub,
state: sub.State(),
}
}
}
// check for subscriptions that can be reinstated
u.lock.Lock()
updatedPendingSubscriptions := make(map[livekit.TrackID][]livekit.ParticipantIdentity)
for trackID, pending := range u.pendingSubscriptions {
track := u.getPublishedTrackLocked(trackID)
if track == nil {
// published track is gone
continue
}
var updatedPending []livekit.ParticipantIdentity
for _, identity := range pending {
resolvedInfo := resolvedInfos[identity]
if resolvedInfo == nil || resolvedInfo.sub == nil || resolvedInfo.state == livekit.ParticipantInfo_DISCONNECTED {
// do not keep this pending subscription as subscriber may be gone
continue
}
if !u.hasPermissionLocked(trackID, identity) {
updatedPending = append(updatedPending, identity)
continue
}
sub := resolvedInfo.sub
if err := track.AddSubscriber(sub); err != nil {
u.params.Logger.Errorw("error reinstating subscription", err, "subscirberID", sub.ID(), "trackID", trackID)
// keep it in pending on error in case the error is transient
updatedPending = append(updatedPending, identity)
continue
}
u.params.Logger.Debugw("reinstating subscription", "subscriberID", sub.ID(), "trackID", trackID)
u.opsQueue.Enqueue(func() {
sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true)
})
}
updatedPendingSubscriptions[trackID] = updatedPending
}
u.pendingSubscriptions = updatedPendingSubscriptions
u.lock.Unlock()
}
func (u *UpTrackManager) maybeRevokeSubscriptions(resolver func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant) {
u.lock.Lock()
defer u.lock.Unlock()
for trackID, track := range u.publishedTracks {
allowed := u.getAllowedSubscribersLocked(trackID)
if allowed == nil {
// no restrictions
continue
}
revoked := track.RevokeDisallowedSubscribers(allowed)
for _, subIdentity := range revoked {
u.maybeAddPendingSubscriptionLocked(trackID, subIdentity, nil, resolver)
}
}
}
func (u *UpTrackManager) DebugInfo() map[string]interface{} {
info := map[string]interface{}{}
publishedTrackInfo := make(map[livekit.TrackID]interface{})
u.lock.RLock()
for trackID, track := range u.publishedTracks {
if mt, ok := track.(*MediaTrack); ok {
publishedTrackInfo[trackID] = mt.DebugInfo()
} else {
publishedTrackInfo[trackID] = map[string]interface{}{
"ID": track.ID(),
"Kind": track.Kind().String(),
"PubMuted": track.IsMuted(),
}
}
}
u.lock.RUnlock()
info["PublishedTracks"] = publishedTrackInfo
return info
}