mirror of
https://github.com/livekit/livekit.git
synced 2026-03-29 07:09:51 +00:00
439 lines
12 KiB
Go
439 lines
12 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rtc
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"golang.org/x/exp/maps"
|
|
|
|
"github.com/livekit/livekit-server/pkg/rtc/types"
|
|
"github.com/livekit/protocol/utils"
|
|
)
|
|
|
|
var (
|
|
ErrSubscriptionPermissionNeedsId = errors.New("either participant identity or SID needed")
|
|
)
|
|
|
|
type UpTrackManagerParams struct {
|
|
Logger logger.Logger
|
|
VersionGenerator utils.TimedVersionGenerator
|
|
}
|
|
|
|
// UpTrackManager manages all uptracks from a participant
|
|
type UpTrackManager struct {
|
|
// utils.TimedVersion is a atomic. To be correctly aligned also on 32bit archs
|
|
// 64it atomics need to be at the front of a struct
|
|
subscriptionPermissionVersion utils.TimedVersion
|
|
|
|
params UpTrackManagerParams
|
|
|
|
closed bool
|
|
|
|
// publishedTracks that participant is publishing
|
|
publishedTracks map[livekit.TrackID]types.MediaTrack
|
|
subscriptionPermission *livekit.SubscriptionPermission
|
|
// subscriber permission for published tracks
|
|
subscriberPermissions map[livekit.ParticipantIdentity]*livekit.TrackPermission // subscriberIdentity => *livekit.TrackPermission
|
|
|
|
lock sync.RWMutex
|
|
|
|
// callbacks & handlers
|
|
onClose func()
|
|
onTrackUpdated func(track types.MediaTrack)
|
|
}
|
|
|
|
func NewUpTrackManager(params UpTrackManagerParams) *UpTrackManager {
|
|
return &UpTrackManager{
|
|
params: params,
|
|
publishedTracks: make(map[livekit.TrackID]types.MediaTrack),
|
|
}
|
|
}
|
|
|
|
func (u *UpTrackManager) Close(isExpectedToResume bool) {
|
|
u.lock.Lock()
|
|
if u.closed {
|
|
u.lock.Unlock()
|
|
return
|
|
}
|
|
|
|
u.closed = true
|
|
|
|
publishedTracks := u.publishedTracks
|
|
u.publishedTracks = make(map[livekit.TrackID]types.MediaTrack)
|
|
u.lock.Unlock()
|
|
|
|
for _, t := range publishedTracks {
|
|
t.Close(isExpectedToResume)
|
|
}
|
|
|
|
if onClose := u.getOnUpTrackManagerClose(); onClose != nil {
|
|
onClose()
|
|
}
|
|
}
|
|
|
|
func (u *UpTrackManager) OnUpTrackManagerClose(f func()) {
|
|
u.lock.Lock()
|
|
u.onClose = f
|
|
u.lock.Unlock()
|
|
}
|
|
|
|
func (u *UpTrackManager) getOnUpTrackManagerClose() func() {
|
|
u.lock.RLock()
|
|
defer u.lock.RUnlock()
|
|
|
|
return u.onClose
|
|
}
|
|
|
|
func (u *UpTrackManager) ToProto() []*livekit.TrackInfo {
|
|
u.lock.RLock()
|
|
defer u.lock.RUnlock()
|
|
|
|
var trackInfos []*livekit.TrackInfo
|
|
for _, t := range u.publishedTracks {
|
|
trackInfos = append(trackInfos, t.ToProto())
|
|
}
|
|
|
|
return trackInfos
|
|
}
|
|
|
|
func (u *UpTrackManager) OnPublishedTrackUpdated(f func(track types.MediaTrack)) {
|
|
u.onTrackUpdated = f
|
|
}
|
|
|
|
func (u *UpTrackManager) SetPublishedTrackMuted(trackID livekit.TrackID, muted bool) (types.MediaTrack, bool) {
|
|
changed := false
|
|
track := u.GetPublishedTrack(trackID)
|
|
if track != nil {
|
|
currentMuted := track.IsMuted()
|
|
track.SetMuted(muted)
|
|
|
|
if currentMuted != track.IsMuted() {
|
|
changed = true
|
|
u.params.Logger.Debugw("publisher mute status changed", "trackID", trackID, "muted", track.IsMuted())
|
|
if u.onTrackUpdated != nil {
|
|
u.onTrackUpdated(track)
|
|
}
|
|
}
|
|
}
|
|
|
|
return track, changed
|
|
}
|
|
|
|
func (u *UpTrackManager) GetPublishedTrack(trackID livekit.TrackID) types.MediaTrack {
|
|
u.lock.RLock()
|
|
defer u.lock.RUnlock()
|
|
|
|
return u.getPublishedTrackLocked(trackID)
|
|
}
|
|
|
|
func (u *UpTrackManager) GetPublishedTracks() []types.MediaTrack {
|
|
u.lock.RLock()
|
|
defer u.lock.RUnlock()
|
|
|
|
return maps.Values(u.publishedTracks)
|
|
}
|
|
|
|
func (u *UpTrackManager) UpdateSubscriptionPermission(
|
|
subscriptionPermission *livekit.SubscriptionPermission,
|
|
timedVersion utils.TimedVersion,
|
|
resolverBySid func(participantID livekit.ParticipantID) types.LocalParticipant,
|
|
) error {
|
|
u.lock.Lock()
|
|
if !timedVersion.IsZero() {
|
|
// it's possible for permission updates to come from another node. In that case
|
|
// they would be the authority for this participant's permissions
|
|
// we do not want to initialize subscriptionPermissionVersion too early since if another machine is the
|
|
// owner for the data, we'd prefer to use their TimedVersion
|
|
// ignore older version
|
|
if !timedVersion.After(u.subscriptionPermissionVersion) {
|
|
u.params.Logger.Debugw(
|
|
"skipping older subscription permission version",
|
|
"existingValue", logger.Proto(u.subscriptionPermission),
|
|
"existingVersion", &u.subscriptionPermissionVersion,
|
|
"requestingValue", logger.Proto(subscriptionPermission),
|
|
"requestingVersion", &timedVersion,
|
|
)
|
|
u.lock.Unlock()
|
|
return nil
|
|
}
|
|
u.subscriptionPermissionVersion.Update(timedVersion)
|
|
} else {
|
|
// for requests coming from the current node, use local versions
|
|
u.subscriptionPermissionVersion.Update(u.params.VersionGenerator.Next())
|
|
}
|
|
|
|
// store as is for use when migrating
|
|
u.subscriptionPermission = subscriptionPermission
|
|
if subscriptionPermission == nil {
|
|
u.params.Logger.Debugw(
|
|
"updating subscription permission, setting to nil",
|
|
"version", u.subscriptionPermissionVersion,
|
|
)
|
|
// possible to get a nil when migrating
|
|
u.lock.Unlock()
|
|
return nil
|
|
}
|
|
|
|
u.params.Logger.Debugw(
|
|
"updating subscription permission",
|
|
"permissions", logger.Proto(u.subscriptionPermission),
|
|
"version", u.subscriptionPermissionVersion,
|
|
)
|
|
if err := u.parseSubscriptionPermissionsLocked(subscriptionPermission, func(pID livekit.ParticipantID) types.LocalParticipant {
|
|
u.lock.Unlock()
|
|
var p types.LocalParticipant
|
|
if resolverBySid != nil {
|
|
p = resolverBySid(pID)
|
|
}
|
|
u.lock.Lock()
|
|
return p
|
|
}); err != nil {
|
|
// when failed, do not override previous permissions
|
|
u.params.Logger.Errorw("failed updating subscription permission", err)
|
|
u.lock.Unlock()
|
|
return err
|
|
}
|
|
u.lock.Unlock()
|
|
|
|
u.maybeRevokeSubscriptions()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (u *UpTrackManager) SubscriptionPermission() (*livekit.SubscriptionPermission, utils.TimedVersion) {
|
|
u.lock.RLock()
|
|
defer u.lock.RUnlock()
|
|
|
|
if u.subscriptionPermissionVersion.IsZero() {
|
|
return nil, u.subscriptionPermissionVersion.Load()
|
|
}
|
|
|
|
return u.subscriptionPermission, u.subscriptionPermissionVersion.Load()
|
|
}
|
|
|
|
func (u *UpTrackManager) HasPermission(trackID livekit.TrackID, subIdentity livekit.ParticipantIdentity) bool {
|
|
u.lock.RLock()
|
|
defer u.lock.RUnlock()
|
|
|
|
return u.hasPermissionLocked(trackID, subIdentity)
|
|
}
|
|
|
|
func (u *UpTrackManager) UpdatePublishedAudioTrack(update *livekit.UpdateLocalAudioTrack) types.MediaTrack {
|
|
track := u.GetPublishedTrack(livekit.TrackID(update.TrackSid))
|
|
if track != nil {
|
|
track.UpdateAudioTrack(update)
|
|
if u.onTrackUpdated != nil {
|
|
u.onTrackUpdated(track)
|
|
}
|
|
}
|
|
|
|
return track
|
|
}
|
|
|
|
func (u *UpTrackManager) UpdatePublishedVideoTrack(update *livekit.UpdateLocalVideoTrack) types.MediaTrack {
|
|
track := u.GetPublishedTrack(livekit.TrackID(update.TrackSid))
|
|
if track != nil {
|
|
track.UpdateVideoTrack(update)
|
|
if u.onTrackUpdated != nil {
|
|
u.onTrackUpdated(track)
|
|
}
|
|
}
|
|
|
|
return track
|
|
}
|
|
|
|
func (u *UpTrackManager) AddPublishedTrack(track types.MediaTrack) {
|
|
u.lock.Lock()
|
|
if _, ok := u.publishedTracks[track.ID()]; !ok {
|
|
u.publishedTracks[track.ID()] = track
|
|
}
|
|
u.lock.Unlock()
|
|
u.params.Logger.Debugw("added published track", "trackID", track.ID(), "trackInfo", logger.Proto(track.ToProto()))
|
|
|
|
track.AddOnClose(func(_isExpectedToResume bool) {
|
|
u.lock.Lock()
|
|
delete(u.publishedTracks, track.ID())
|
|
// not modifying subscription permissions, will get reset on next update from participant
|
|
u.lock.Unlock()
|
|
})
|
|
}
|
|
|
|
func (u *UpTrackManager) RemovePublishedTrack(track types.MediaTrack, isExpectedToResume bool) {
|
|
track.Close(isExpectedToResume)
|
|
|
|
u.lock.Lock()
|
|
delete(u.publishedTracks, track.ID())
|
|
u.lock.Unlock()
|
|
}
|
|
|
|
func (u *UpTrackManager) getPublishedTrackLocked(trackID livekit.TrackID) types.MediaTrack {
|
|
return u.publishedTracks[trackID]
|
|
}
|
|
|
|
func (u *UpTrackManager) parseSubscriptionPermissionsLocked(
|
|
subscriptionPermission *livekit.SubscriptionPermission,
|
|
resolver func(participantID livekit.ParticipantID) types.LocalParticipant,
|
|
) error {
|
|
// every update overrides the existing
|
|
|
|
// all_participants takes precedence
|
|
if subscriptionPermission.AllParticipants {
|
|
// everything is allowed, nothing else to do
|
|
u.subscriberPermissions = nil
|
|
return nil
|
|
}
|
|
|
|
// per participant permissions
|
|
subscriberPermissions := make(map[livekit.ParticipantIdentity]*livekit.TrackPermission)
|
|
for _, trackPerms := range subscriptionPermission.TrackPermissions {
|
|
subscriberIdentity := livekit.ParticipantIdentity(trackPerms.ParticipantIdentity)
|
|
if subscriberIdentity == "" {
|
|
if trackPerms.ParticipantSid == "" {
|
|
return ErrSubscriptionPermissionNeedsId
|
|
}
|
|
|
|
sub := resolver(livekit.ParticipantID(trackPerms.ParticipantSid))
|
|
if sub == nil {
|
|
u.params.Logger.Warnw("could not find subscriber for permissions update", nil, "subscriberID", trackPerms.ParticipantSid)
|
|
continue
|
|
}
|
|
|
|
subscriberIdentity = sub.Identity()
|
|
} else {
|
|
if trackPerms.ParticipantSid != "" {
|
|
sub := resolver(livekit.ParticipantID(trackPerms.ParticipantSid))
|
|
if sub != nil && sub.Identity() != subscriberIdentity {
|
|
u.params.Logger.Errorw("participant identity mismatch", nil, "expected", subscriberIdentity, "got", sub.Identity())
|
|
}
|
|
if sub == nil {
|
|
u.params.Logger.Warnw("could not find subscriber for permissions update", nil, "subscriberID", trackPerms.ParticipantSid)
|
|
}
|
|
}
|
|
}
|
|
|
|
subscriberPermissions[subscriberIdentity] = trackPerms
|
|
}
|
|
|
|
u.subscriberPermissions = subscriberPermissions
|
|
|
|
return nil
|
|
}
|
|
|
|
func (u *UpTrackManager) hasPermissionLocked(trackID livekit.TrackID, subscriberIdentity livekit.ParticipantIdentity) bool {
|
|
if u.subscriberPermissions == nil {
|
|
return true
|
|
}
|
|
|
|
perms, ok := u.subscriberPermissions[subscriberIdentity]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
if perms.AllTracks {
|
|
return true
|
|
}
|
|
|
|
for _, sid := range perms.TrackSids {
|
|
if livekit.TrackID(sid) == trackID {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// returns a list of participants that are allowed to subscribe to the track. if nil is returned, it means everyone is
|
|
// allowed to subscribe to this track
|
|
func (u *UpTrackManager) getAllowedSubscribersLocked(trackID livekit.TrackID) []livekit.ParticipantIdentity {
|
|
if u.subscriberPermissions == nil {
|
|
return nil
|
|
}
|
|
|
|
allowed := make([]livekit.ParticipantIdentity, 0)
|
|
for subscriberIdentity, perms := range u.subscriberPermissions {
|
|
if perms.AllTracks {
|
|
allowed = append(allowed, subscriberIdentity)
|
|
continue
|
|
}
|
|
|
|
for _, sid := range perms.TrackSids {
|
|
if livekit.TrackID(sid) == trackID {
|
|
allowed = append(allowed, subscriberIdentity)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return allowed
|
|
}
|
|
|
|
func (u *UpTrackManager) maybeRevokeSubscriptions() {
|
|
u.lock.Lock()
|
|
defer u.lock.Unlock()
|
|
|
|
for trackID, track := range u.publishedTracks {
|
|
allowed := u.getAllowedSubscribersLocked(trackID)
|
|
if allowed == nil {
|
|
// no restrictions
|
|
continue
|
|
}
|
|
|
|
track.RevokeDisallowedSubscribers(allowed)
|
|
}
|
|
}
|
|
|
|
func (u *UpTrackManager) DebugInfo() map[string]any {
|
|
info := map[string]any{}
|
|
publishedTrackInfo := make(map[livekit.TrackID]any)
|
|
|
|
u.lock.RLock()
|
|
for trackID, track := range u.publishedTracks {
|
|
if mt, ok := track.(*MediaTrack); ok {
|
|
publishedTrackInfo[trackID] = mt.DebugInfo()
|
|
} else {
|
|
publishedTrackInfo[trackID] = map[string]any{
|
|
"ID": track.ID(),
|
|
"Kind": track.Kind().String(),
|
|
"PubMuted": track.IsMuted(),
|
|
}
|
|
}
|
|
}
|
|
u.lock.RUnlock()
|
|
|
|
info["PublishedTracks"] = publishedTrackInfo
|
|
|
|
return info
|
|
}
|
|
|
|
func (u *UpTrackManager) GetAudioLevel() (level float64, active bool) {
|
|
level = 0
|
|
for _, pt := range u.GetPublishedTracks() {
|
|
if pt.Source() == livekit.TrackSource_MICROPHONE {
|
|
tl, ta := pt.GetAudioLevel()
|
|
if ta {
|
|
active = true
|
|
if tl > level {
|
|
level = tl
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|