mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 17:45:40 +00:00
617 lines
17 KiB
Go
617 lines
17 KiB
Go
package rtc
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
|
|
"github.com/livekit/livekit-server/pkg/rtc/types"
|
|
"github.com/livekit/livekit-server/pkg/utils"
|
|
)
|
|
|
|
var (
|
|
ErrSubscriptionPermissionNeedsId = errors.New("either participant identity or SID needed")
|
|
)
|
|
|
|
type UpTrackManagerParams struct {
|
|
SID livekit.ParticipantID
|
|
Logger logger.Logger
|
|
}
|
|
|
|
type UpTrackManager struct {
|
|
params UpTrackManagerParams
|
|
|
|
closed bool
|
|
|
|
// publishedTracks that participant is publishing
|
|
publishedTracks map[livekit.TrackID]types.MediaTrack
|
|
subscriptionPermission *livekit.SubscriptionPermission
|
|
subscriptionPermissionVersion *utils.TimedVersion
|
|
// subscriber permission for published tracks
|
|
subscriberPermissions map[livekit.ParticipantIdentity]*livekit.TrackPermission // subscriberIdentity => *livekit.TrackPermission
|
|
// keeps tracks of track specific subscribers who are awaiting permission
|
|
pendingSubscriptions map[livekit.TrackID][]livekit.ParticipantIdentity // trackID => []subscriberIdentity
|
|
|
|
opsQueue *utils.OpsQueue
|
|
|
|
lock sync.RWMutex
|
|
|
|
// callbacks & handlers
|
|
onClose func()
|
|
onTrackUpdated func(track types.MediaTrack, onlyIfReady bool)
|
|
}
|
|
|
|
func NewUpTrackManager(params UpTrackManagerParams) *UpTrackManager {
|
|
return &UpTrackManager{
|
|
params: params,
|
|
publishedTracks: make(map[livekit.TrackID]types.MediaTrack),
|
|
pendingSubscriptions: make(map[livekit.TrackID][]livekit.ParticipantIdentity),
|
|
opsQueue: utils.NewOpsQueue(params.Logger, "utm", 20),
|
|
}
|
|
}
|
|
|
|
func (u *UpTrackManager) Start() {
|
|
u.opsQueue.Start()
|
|
}
|
|
|
|
func (u *UpTrackManager) Close(willBeResumed bool) {
|
|
u.opsQueue.Stop()
|
|
|
|
u.lock.Lock()
|
|
u.closed = true
|
|
notify := len(u.publishedTracks) == 0
|
|
u.lock.Unlock()
|
|
|
|
// remove all subscribers
|
|
for _, t := range u.GetPublishedTracks() {
|
|
t.ClearAllReceivers(willBeResumed)
|
|
}
|
|
|
|
if notify && u.onClose != nil {
|
|
u.onClose()
|
|
}
|
|
}
|
|
|
|
func (u *UpTrackManager) OnUpTrackManagerClose(f func()) {
|
|
u.onClose = f
|
|
}
|
|
|
|
func (u *UpTrackManager) ToProto() []*livekit.TrackInfo {
|
|
u.lock.RLock()
|
|
defer u.lock.RUnlock()
|
|
|
|
var trackInfos []*livekit.TrackInfo
|
|
for _, t := range u.publishedTracks {
|
|
trackInfos = append(trackInfos, t.ToProto())
|
|
}
|
|
|
|
return trackInfos
|
|
}
|
|
|
|
func (u *UpTrackManager) OnPublishedTrackUpdated(f func(track types.MediaTrack, onlyIfReady bool)) {
|
|
u.onTrackUpdated = f
|
|
}
|
|
|
|
// AddSubscriber subscribes op to all publishedTracks
|
|
func (u *UpTrackManager) AddSubscriber(sub types.LocalParticipant, params types.AddSubscriberParams) (int, error) {
|
|
u.lock.Lock()
|
|
defer u.lock.Unlock()
|
|
|
|
var tracks []types.MediaTrack
|
|
if params.AllTracks {
|
|
for _, t := range u.publishedTracks {
|
|
tracks = append(tracks, t)
|
|
}
|
|
} else {
|
|
for _, trackID := range params.TrackIDs {
|
|
track := u.getPublishedTrackLocked(trackID)
|
|
if track == nil {
|
|
continue
|
|
}
|
|
|
|
tracks = append(tracks, track)
|
|
}
|
|
}
|
|
if len(tracks) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
var trackIDs []livekit.TrackID
|
|
for _, track := range tracks {
|
|
trackIDs = append(trackIDs, track.ID())
|
|
}
|
|
u.params.Logger.Debugw("subscribing participant to tracks",
|
|
"subscriber", sub.Identity(),
|
|
"subscriberID", sub.ID(),
|
|
"trackIDs", trackIDs)
|
|
|
|
n := 0
|
|
for _, track := range tracks {
|
|
trackID := track.ID()
|
|
subscriberIdentity := sub.Identity()
|
|
if !u.hasPermissionLocked(trackID, subscriberIdentity) {
|
|
u.maybeAddPendingSubscriptionLocked(trackID, subscriberIdentity, sub, nil)
|
|
continue
|
|
}
|
|
|
|
if err := track.AddSubscriber(sub); err != nil {
|
|
return n, err
|
|
}
|
|
n += 1
|
|
|
|
u.maybeRemovePendingSubscriptionLocked(trackID, sub, true)
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
func (u *UpTrackManager) RemoveSubscriber(sub types.LocalParticipant, trackID livekit.TrackID, willBeResumed bool) {
|
|
u.lock.Lock()
|
|
defer u.lock.Unlock()
|
|
|
|
track := u.getPublishedTrackLocked(trackID)
|
|
if track != nil {
|
|
track.RemoveSubscriber(sub.ID(), willBeResumed)
|
|
}
|
|
|
|
u.maybeRemovePendingSubscriptionLocked(trackID, sub, false)
|
|
}
|
|
|
|
func (u *UpTrackManager) SetPublishedTrackMuted(trackID livekit.TrackID, muted bool) types.MediaTrack {
|
|
u.lock.RLock()
|
|
track := u.publishedTracks[trackID]
|
|
u.lock.RUnlock()
|
|
|
|
if track != nil {
|
|
currentMuted := track.IsMuted()
|
|
track.SetMuted(muted)
|
|
|
|
if currentMuted != track.IsMuted() {
|
|
u.params.Logger.Infow("publisher mute status changed", "trackID", trackID, "muted", track.IsMuted())
|
|
if u.onTrackUpdated != nil {
|
|
u.onTrackUpdated(track, false)
|
|
}
|
|
}
|
|
}
|
|
|
|
return track
|
|
}
|
|
|
|
func (u *UpTrackManager) GetPublishedTrack(trackID livekit.TrackID) types.MediaTrack {
|
|
u.lock.RLock()
|
|
defer u.lock.RUnlock()
|
|
|
|
return u.getPublishedTrackLocked(trackID)
|
|
}
|
|
|
|
func (u *UpTrackManager) GetPublishedTracks() []types.MediaTrack {
|
|
u.lock.RLock()
|
|
defer u.lock.RUnlock()
|
|
|
|
tracks := make([]types.MediaTrack, 0, len(u.publishedTracks))
|
|
for _, t := range u.publishedTracks {
|
|
tracks = append(tracks, t)
|
|
}
|
|
return tracks
|
|
}
|
|
|
|
func (u *UpTrackManager) UpdateSubscriptionPermission(
|
|
subscriptionPermission *livekit.SubscriptionPermission,
|
|
timedVersion *livekit.TimedVersion,
|
|
resolverByIdentity func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant,
|
|
resolverBySid func(participantID livekit.ParticipantID) types.LocalParticipant,
|
|
) error {
|
|
u.lock.Lock()
|
|
if timedVersion != nil {
|
|
if u.subscriptionPermissionVersion != nil {
|
|
tv := utils.NewTimedVersionFromProto(timedVersion)
|
|
// ignore older version
|
|
if !tv.After(u.subscriptionPermissionVersion) {
|
|
perms := ""
|
|
if u.subscriptionPermission != nil {
|
|
perms = u.subscriptionPermission.String()
|
|
}
|
|
u.params.Logger.Infow(
|
|
"skipping older subscription permission version",
|
|
"existingValue", perms,
|
|
"existingVersion", u.subscriptionPermissionVersion.ToProto().String(),
|
|
"requestingValue", subscriptionPermission.String(),
|
|
"requestingVersion", timedVersion.String(),
|
|
)
|
|
u.lock.Unlock()
|
|
return nil
|
|
}
|
|
u.subscriptionPermissionVersion.Update(time.UnixMicro(timedVersion.UnixMicro))
|
|
} else {
|
|
u.subscriptionPermissionVersion = utils.NewTimedVersionFromProto(timedVersion)
|
|
}
|
|
} else {
|
|
// use current time as the new/updated version
|
|
if u.subscriptionPermissionVersion == nil {
|
|
u.subscriptionPermissionVersion = utils.NewTimedVersion(time.Now(), 0)
|
|
} else {
|
|
u.subscriptionPermissionVersion.Update(time.Now())
|
|
}
|
|
}
|
|
|
|
// store as is for use when migrating
|
|
u.subscriptionPermission = subscriptionPermission
|
|
if subscriptionPermission == nil {
|
|
u.params.Logger.Debugw(
|
|
"updating subscription permission, setting to nil",
|
|
"version", u.subscriptionPermissionVersion.ToProto().String(),
|
|
)
|
|
// possible to get a nil when migrating
|
|
u.lock.Unlock()
|
|
return nil
|
|
}
|
|
|
|
u.params.Logger.Debugw(
|
|
"updating subscription permission",
|
|
"permissions", u.subscriptionPermission.String(),
|
|
"version", u.subscriptionPermissionVersion.ToProto().String(),
|
|
)
|
|
if err := u.parseSubscriptionPermissions(subscriptionPermission, resolverBySid); err != nil {
|
|
// when failed, do not override previous permissions
|
|
u.params.Logger.Errorw("failed updating subscription permission", err)
|
|
u.lock.Unlock()
|
|
return err
|
|
}
|
|
u.lock.Unlock()
|
|
|
|
u.processPendingSubscriptions(resolverByIdentity)
|
|
u.maybeRevokeSubscriptions(resolverByIdentity)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (u *UpTrackManager) SubscriptionPermission() (*livekit.SubscriptionPermission, *livekit.TimedVersion) {
|
|
u.lock.RLock()
|
|
defer u.lock.RUnlock()
|
|
|
|
if u.subscriptionPermissionVersion == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
return u.subscriptionPermission, u.subscriptionPermissionVersion.ToProto()
|
|
}
|
|
|
|
func (u *UpTrackManager) UpdateVideoLayers(updateVideoLayers *livekit.UpdateVideoLayers) error {
|
|
track := u.GetPublishedTrack(livekit.TrackID(updateVideoLayers.TrackSid))
|
|
if track == nil {
|
|
u.params.Logger.Warnw("could not find track", nil, "trackID", livekit.TrackID(updateVideoLayers.TrackSid))
|
|
return errors.New("could not find published track")
|
|
}
|
|
|
|
track.UpdateVideoLayers(updateVideoLayers.Layers)
|
|
if u.onTrackUpdated != nil {
|
|
u.onTrackUpdated(track, false)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (u *UpTrackManager) AddPublishedTrack(track types.MediaTrack) {
|
|
u.lock.Lock()
|
|
if _, ok := u.publishedTracks[track.ID()]; !ok {
|
|
u.publishedTracks[track.ID()] = track
|
|
}
|
|
u.lock.Unlock()
|
|
u.params.Logger.Debugw("added published track", "trackID", track.ID(), "trackInfo", track.ToProto().String())
|
|
|
|
track.AddOnClose(func() {
|
|
notifyClose := false
|
|
|
|
// cleanup
|
|
u.lock.Lock()
|
|
trackID := track.ID()
|
|
delete(u.publishedTracks, trackID)
|
|
delete(u.pendingSubscriptions, trackID)
|
|
// not modifying subscription permissions, will get reset on next update from participant
|
|
|
|
if u.closed && len(u.publishedTracks) == 0 {
|
|
notifyClose = true
|
|
}
|
|
u.lock.Unlock()
|
|
|
|
// only send this when client is in a ready state
|
|
if u.onTrackUpdated != nil {
|
|
u.onTrackUpdated(track, true)
|
|
}
|
|
|
|
if notifyClose && u.onClose != nil {
|
|
u.onClose()
|
|
}
|
|
})
|
|
}
|
|
|
|
func (u *UpTrackManager) RemovePublishedTrack(track types.MediaTrack, willBeResumed bool, shouldClose bool) {
|
|
if shouldClose {
|
|
track.Close(willBeResumed)
|
|
} else {
|
|
track.ClearAllReceivers(willBeResumed)
|
|
}
|
|
u.lock.Lock()
|
|
delete(u.publishedTracks, track.ID())
|
|
u.lock.Unlock()
|
|
}
|
|
|
|
func (u *UpTrackManager) getPublishedTrackLocked(trackID livekit.TrackID) types.MediaTrack {
|
|
return u.publishedTracks[trackID]
|
|
}
|
|
|
|
func (u *UpTrackManager) parseSubscriptionPermissions(
|
|
subscriptionPermission *livekit.SubscriptionPermission,
|
|
resolver func(participantID livekit.ParticipantID) types.LocalParticipant,
|
|
) error {
|
|
// every update overrides the existing
|
|
|
|
// all_participants takes precedence
|
|
if subscriptionPermission.AllParticipants {
|
|
// everything is allowed, nothing else to do
|
|
u.subscriberPermissions = nil
|
|
return nil
|
|
}
|
|
|
|
// per participant permissions
|
|
subscriberPermissions := make(map[livekit.ParticipantIdentity]*livekit.TrackPermission)
|
|
for _, trackPerms := range subscriptionPermission.TrackPermissions {
|
|
subscriberIdentity := livekit.ParticipantIdentity(trackPerms.ParticipantIdentity)
|
|
if subscriberIdentity == "" {
|
|
if trackPerms.ParticipantSid == "" {
|
|
return ErrSubscriptionPermissionNeedsId
|
|
}
|
|
|
|
sub := resolver(livekit.ParticipantID(trackPerms.ParticipantSid))
|
|
if sub == nil {
|
|
u.params.Logger.Warnw("could not find subscriber for permissions update", nil, "subscriberID", trackPerms.ParticipantSid)
|
|
continue
|
|
}
|
|
|
|
subscriberIdentity = sub.Identity()
|
|
} else {
|
|
if trackPerms.ParticipantSid != "" {
|
|
sub := resolver(livekit.ParticipantID(trackPerms.ParticipantSid))
|
|
if sub != nil && sub.Identity() != subscriberIdentity {
|
|
u.params.Logger.Errorw("participant identity mismatch", nil, "expected", subscriberIdentity, "got", sub.Identity())
|
|
}
|
|
if sub == nil {
|
|
u.params.Logger.Warnw("could not find subscriber for permissions update", nil, "subscriberID", trackPerms.ParticipantSid)
|
|
}
|
|
}
|
|
}
|
|
|
|
subscriberPermissions[subscriberIdentity] = trackPerms
|
|
}
|
|
|
|
u.subscriberPermissions = subscriberPermissions
|
|
|
|
return nil
|
|
}
|
|
|
|
func (u *UpTrackManager) hasPermissionLocked(trackID livekit.TrackID, subscriberIdentity livekit.ParticipantIdentity) bool {
|
|
if u.subscriberPermissions == nil {
|
|
return true
|
|
}
|
|
|
|
perms, ok := u.subscriberPermissions[subscriberIdentity]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
if perms.AllTracks {
|
|
return true
|
|
}
|
|
|
|
for _, sid := range perms.TrackSids {
|
|
if livekit.TrackID(sid) == trackID {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (u *UpTrackManager) getAllowedSubscribersLocked(trackID livekit.TrackID) []livekit.ParticipantIdentity {
|
|
if u.subscriberPermissions == nil {
|
|
return nil
|
|
}
|
|
|
|
allowed := make([]livekit.ParticipantIdentity, 0)
|
|
for subscriberIdentity, perms := range u.subscriberPermissions {
|
|
if perms.AllTracks {
|
|
allowed = append(allowed, subscriberIdentity)
|
|
continue
|
|
}
|
|
|
|
for _, sid := range perms.TrackSids {
|
|
if livekit.TrackID(sid) == trackID {
|
|
allowed = append(allowed, subscriberIdentity)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return allowed
|
|
}
|
|
|
|
func (u *UpTrackManager) maybeAddPendingSubscriptionLocked(
|
|
trackID livekit.TrackID,
|
|
subscriberIdentity livekit.ParticipantIdentity,
|
|
sub types.LocalParticipant,
|
|
resolver func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant,
|
|
) {
|
|
pending := u.pendingSubscriptions[trackID]
|
|
for _, identity := range pending {
|
|
if identity == subscriberIdentity {
|
|
// already pending
|
|
return
|
|
}
|
|
}
|
|
|
|
u.pendingSubscriptions[trackID] = append(u.pendingSubscriptions[trackID], subscriberIdentity)
|
|
u.params.Logger.Debugw("adding pending subscription", "subscriberIdentity", subscriberIdentity, "trackID", trackID)
|
|
u.opsQueue.Enqueue(func() {
|
|
if sub == nil {
|
|
if resolver == nil {
|
|
u.params.Logger.Warnw("no resolver", nil)
|
|
} else {
|
|
sub = resolver(subscriberIdentity)
|
|
}
|
|
}
|
|
|
|
if sub != nil {
|
|
sub.SubscriptionPermissionUpdate(u.params.SID, trackID, false)
|
|
} else {
|
|
u.params.Logger.Warnw("could not send subscription permission update, no subscriber", nil, "subscriberIdentity", subscriberIdentity)
|
|
}
|
|
})
|
|
}
|
|
|
|
func (u *UpTrackManager) maybeRemovePendingSubscriptionLocked(trackID livekit.TrackID, sub types.LocalParticipant, sendUpdate bool) {
|
|
subscriberIdentity := sub.Identity()
|
|
|
|
found := false
|
|
|
|
pending := u.pendingSubscriptions[trackID]
|
|
n := len(pending)
|
|
for idx, identity := range pending {
|
|
if identity == subscriberIdentity {
|
|
found = true
|
|
u.pendingSubscriptions[trackID][idx] = u.pendingSubscriptions[trackID][n-1]
|
|
u.pendingSubscriptions[trackID] = u.pendingSubscriptions[trackID][:n-1]
|
|
break
|
|
}
|
|
}
|
|
if len(u.pendingSubscriptions[trackID]) == 0 {
|
|
delete(u.pendingSubscriptions, trackID)
|
|
}
|
|
|
|
if found && sendUpdate {
|
|
u.params.Logger.Debugw("removing pending subscription", "subscriberID", sub.ID(), "trackID", trackID)
|
|
u.opsQueue.Enqueue(func() {
|
|
sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true)
|
|
})
|
|
}
|
|
}
|
|
|
|
// creates subscriptions for tracks if permissions have been granted
|
|
func (u *UpTrackManager) processPendingSubscriptions(resolver func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant) {
|
|
type ResolvedInfo struct {
|
|
sub types.LocalParticipant
|
|
state livekit.ParticipantInfo_State
|
|
}
|
|
|
|
// gather all identites that need resolving
|
|
resolvedInfos := make(map[livekit.ParticipantIdentity]*ResolvedInfo)
|
|
u.lock.RLock()
|
|
for trackID, pending := range u.pendingSubscriptions {
|
|
track := u.getPublishedTrackLocked(trackID)
|
|
if track == nil {
|
|
// published track is gone
|
|
continue
|
|
}
|
|
|
|
for _, identity := range pending {
|
|
resolvedInfos[identity] = nil
|
|
}
|
|
}
|
|
u.lock.RUnlock()
|
|
|
|
for identity := range resolvedInfos {
|
|
sub := resolver(identity)
|
|
if sub != nil {
|
|
resolvedInfos[identity] = &ResolvedInfo{
|
|
sub: sub,
|
|
state: sub.State(),
|
|
}
|
|
}
|
|
}
|
|
|
|
// check for subscriptions that can be reinstated
|
|
u.lock.Lock()
|
|
updatedPendingSubscriptions := make(map[livekit.TrackID][]livekit.ParticipantIdentity)
|
|
for trackID, pending := range u.pendingSubscriptions {
|
|
track := u.getPublishedTrackLocked(trackID)
|
|
if track == nil {
|
|
// published track is gone
|
|
continue
|
|
}
|
|
|
|
var updatedPending []livekit.ParticipantIdentity
|
|
for _, identity := range pending {
|
|
resolvedInfo := resolvedInfos[identity]
|
|
if resolvedInfo == nil || resolvedInfo.sub == nil || resolvedInfo.state == livekit.ParticipantInfo_DISCONNECTED {
|
|
// do not keep this pending subscription as subscriber may be gone
|
|
continue
|
|
}
|
|
|
|
if !u.hasPermissionLocked(trackID, identity) {
|
|
updatedPending = append(updatedPending, identity)
|
|
continue
|
|
}
|
|
|
|
sub := resolvedInfo.sub
|
|
if err := track.AddSubscriber(sub); err != nil {
|
|
u.params.Logger.Errorw("error reinstating subscription", err, "subscirberID", sub.ID(), "trackID", trackID)
|
|
// keep it in pending on error in case the error is transient
|
|
updatedPending = append(updatedPending, identity)
|
|
continue
|
|
}
|
|
|
|
u.params.Logger.Debugw("reinstating subscription", "subscriberID", sub.ID(), "trackID", trackID)
|
|
u.opsQueue.Enqueue(func() {
|
|
sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true)
|
|
})
|
|
}
|
|
|
|
updatedPendingSubscriptions[trackID] = updatedPending
|
|
}
|
|
|
|
u.pendingSubscriptions = updatedPendingSubscriptions
|
|
u.lock.Unlock()
|
|
}
|
|
|
|
func (u *UpTrackManager) maybeRevokeSubscriptions(resolver func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant) {
|
|
u.lock.Lock()
|
|
defer u.lock.Unlock()
|
|
|
|
for trackID, track := range u.publishedTracks {
|
|
allowed := u.getAllowedSubscribersLocked(trackID)
|
|
if allowed == nil {
|
|
// no restrictions
|
|
continue
|
|
}
|
|
|
|
revoked := track.RevokeDisallowedSubscribers(allowed)
|
|
for _, subIdentity := range revoked {
|
|
u.maybeAddPendingSubscriptionLocked(trackID, subIdentity, nil, resolver)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (u *UpTrackManager) DebugInfo() map[string]interface{} {
|
|
info := map[string]interface{}{}
|
|
publishedTrackInfo := make(map[livekit.TrackID]interface{})
|
|
|
|
u.lock.RLock()
|
|
for trackID, track := range u.publishedTracks {
|
|
if mt, ok := track.(*MediaTrack); ok {
|
|
publishedTrackInfo[trackID] = mt.DebugInfo()
|
|
} else {
|
|
publishedTrackInfo[trackID] = map[string]interface{}{
|
|
"ID": track.ID(),
|
|
"Kind": track.Kind().String(),
|
|
"PubMuted": track.IsMuted(),
|
|
}
|
|
}
|
|
}
|
|
u.lock.RUnlock()
|
|
|
|
info["PublishedTracks"] = publishedTrackInfo
|
|
|
|
return info
|
|
}
|