mirror of
https://github.com/livekit/livekit.git
synced 2026-03-29 07:09:51 +00:00
* Some logging changes. Trying to chase a case of large sequence number gap on subscriber side where packets are sent after a long time. * return values instead of logging
264 lines
6.7 KiB
Go
264 lines
6.7 KiB
Go
/*
 * Copyright 2023 LiveKit, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
package rtc
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/livekit/livekit-server/pkg/rtc/types"
|
|
"github.com/livekit/livekit-server/pkg/utils"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
)
|
|
|
|
// RoomTrackManager holds tracks that are published to the room
|
|
type RoomTrackManager struct {
|
|
logger logger.Logger
|
|
|
|
lock sync.RWMutex
|
|
changedNotifier *utils.ChangeNotifierManager
|
|
removedNotifier *utils.ChangeNotifierManager
|
|
tracks map[livekit.TrackID][]*TrackInfo
|
|
dataTracks map[livekit.TrackID][]*DataTrackInfo
|
|
}
|
|
|
|
type TrackInfo struct {
|
|
Track types.MediaTrack
|
|
PublisherIdentity livekit.ParticipantIdentity
|
|
PublisherID livekit.ParticipantID
|
|
}
|
|
|
|
type DataTrackInfo struct {
|
|
DataTrack types.DataTrack
|
|
PublisherIdentity livekit.ParticipantIdentity
|
|
PublisherID livekit.ParticipantID
|
|
}
|
|
|
|
func NewRoomTrackManager(logger logger.Logger) *RoomTrackManager {
|
|
return &RoomTrackManager{
|
|
logger: logger,
|
|
tracks: make(map[livekit.TrackID][]*TrackInfo),
|
|
dataTracks: make(map[livekit.TrackID][]*DataTrackInfo),
|
|
changedNotifier: utils.NewChangeNotifierManager(),
|
|
removedNotifier: utils.NewChangeNotifierManager(),
|
|
}
|
|
}
|
|
|
|
func (r *RoomTrackManager) AddTrack(track types.MediaTrack, publisherIdentity livekit.ParticipantIdentity, publisherID livekit.ParticipantID) {
|
|
trackID := track.ID()
|
|
r.lock.Lock()
|
|
infos, ok := r.tracks[trackID]
|
|
if ok {
|
|
for _, info := range infos {
|
|
if info.Track == track {
|
|
r.lock.Unlock()
|
|
r.logger.Infow("not adding duplicate track", "trackID", trackID)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
r.tracks[trackID] = append(r.tracks[trackID], &TrackInfo{
|
|
Track: track,
|
|
PublisherIdentity: publisherIdentity,
|
|
PublisherID: publisherID,
|
|
})
|
|
r.lock.Unlock()
|
|
|
|
r.NotifyTrackChanged(trackID)
|
|
}
|
|
|
|
func (r *RoomTrackManager) RemoveTrack(track types.MediaTrack) {
|
|
trackID := track.ID()
|
|
r.lock.Lock()
|
|
// ensure we are removing the same track as added
|
|
infos, ok := r.tracks[trackID]
|
|
if !ok {
|
|
r.lock.Unlock()
|
|
return
|
|
}
|
|
|
|
numRemoved := 0
|
|
idx := 0
|
|
for _, info := range infos {
|
|
if info.Track == track {
|
|
numRemoved++
|
|
} else {
|
|
r.tracks[trackID][idx] = info
|
|
idx++
|
|
}
|
|
}
|
|
for j := idx; j < len(infos); j++ {
|
|
r.tracks[trackID][j] = nil
|
|
}
|
|
r.tracks[trackID] = r.tracks[trackID][:idx]
|
|
if len(r.tracks[trackID]) == 0 {
|
|
delete(r.tracks, trackID)
|
|
}
|
|
r.lock.Unlock()
|
|
if numRemoved == 0 {
|
|
return
|
|
}
|
|
if numRemoved > 1 {
|
|
r.logger.Warnw("removed multiple tracks", nil, "trackID", trackID, "numRemoved", numRemoved)
|
|
}
|
|
|
|
n := r.removedNotifier.GetNotifier(string(trackID))
|
|
if n != nil {
|
|
n.NotifyChanged()
|
|
}
|
|
|
|
r.changedNotifier.RemoveNotifier(string(trackID), true)
|
|
r.removedNotifier.RemoveNotifier(string(trackID), true)
|
|
}
|
|
|
|
func (r *RoomTrackManager) GetTrackInfo(trackID livekit.TrackID) *TrackInfo {
|
|
r.lock.RLock()
|
|
defer r.lock.RUnlock()
|
|
|
|
infos := r.tracks[trackID]
|
|
if len(infos) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// earliest added track is used till it is removed
|
|
info := infos[0]
|
|
|
|
// when track is about to close, do not resolve
|
|
if info.Track != nil && !info.Track.IsOpen() {
|
|
return nil
|
|
}
|
|
return info
|
|
}
|
|
|
|
func (r *RoomTrackManager) NotifyTrackChanged(trackID livekit.TrackID) {
|
|
n := r.changedNotifier.GetNotifier(string(trackID))
|
|
if n != nil {
|
|
n.NotifyChanged()
|
|
}
|
|
}
|
|
|
|
// HasObservers lets caller know if the current media track has any observers
|
|
// this is used to signal interest in the track. when another MediaTrack with the same
|
|
// trackID is being used, track is not considered to be observed.
|
|
func (r *RoomTrackManager) HasObservers(track types.MediaTrack) bool {
|
|
n := r.changedNotifier.GetNotifier(string(track.ID()))
|
|
if n == nil || !n.HasObservers() {
|
|
return false
|
|
}
|
|
|
|
info := r.GetTrackInfo(track.ID())
|
|
if info == nil || info.Track != track {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (r *RoomTrackManager) GetOrCreateTrackChangeNotifier(trackID livekit.TrackID) *utils.ChangeNotifier {
|
|
return r.changedNotifier.GetOrCreateNotifier(string(trackID))
|
|
}
|
|
|
|
func (r *RoomTrackManager) GetOrCreateTrackRemoveNotifier(trackID livekit.TrackID) *utils.ChangeNotifier {
|
|
return r.removedNotifier.GetOrCreateNotifier(string(trackID))
|
|
}
|
|
|
|
func (r *RoomTrackManager) AddDataTrack(dataTrack types.DataTrack, publisherIdentity livekit.ParticipantIdentity, publisherID livekit.ParticipantID) {
|
|
trackID := dataTrack.ID()
|
|
r.lock.Lock()
|
|
infos, ok := r.dataTracks[trackID]
|
|
if ok {
|
|
for _, info := range infos {
|
|
if info.DataTrack == dataTrack {
|
|
r.lock.Unlock()
|
|
r.logger.Infow("not adding duplicate data track", "trackID", trackID)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
r.dataTracks[trackID] = append(r.dataTracks[trackID], &DataTrackInfo{
|
|
DataTrack: dataTrack,
|
|
PublisherIdentity: publisherIdentity,
|
|
PublisherID: publisherID,
|
|
})
|
|
r.lock.Unlock()
|
|
|
|
r.NotifyTrackChanged(trackID)
|
|
}
|
|
|
|
func (r *RoomTrackManager) RemoveDataTrack(dataTrack types.DataTrack) {
|
|
trackID := dataTrack.ID()
|
|
r.lock.Lock()
|
|
// ensure we are removing the same track as added
|
|
infos, ok := r.dataTracks[trackID]
|
|
if !ok {
|
|
r.lock.Unlock()
|
|
return
|
|
}
|
|
|
|
numRemoved := 0
|
|
idx := 0
|
|
for _, info := range infos {
|
|
if info.DataTrack == dataTrack {
|
|
numRemoved++
|
|
} else {
|
|
r.dataTracks[trackID][idx] = info
|
|
idx++
|
|
}
|
|
}
|
|
for j := idx; j < len(infos); j++ {
|
|
r.dataTracks[trackID][j] = nil
|
|
}
|
|
r.dataTracks[trackID] = r.dataTracks[trackID][:idx]
|
|
if len(r.dataTracks[trackID]) == 0 {
|
|
delete(r.dataTracks, trackID)
|
|
}
|
|
r.lock.Unlock()
|
|
if numRemoved == 0 {
|
|
return
|
|
}
|
|
if numRemoved > 1 {
|
|
r.logger.Warnw("removed multiple data tracks", nil, "trackID", trackID, "numRemoved", numRemoved)
|
|
}
|
|
|
|
n := r.removedNotifier.GetNotifier(string(trackID))
|
|
if n != nil {
|
|
n.NotifyChanged()
|
|
}
|
|
|
|
r.changedNotifier.RemoveNotifier(string(trackID), true)
|
|
r.removedNotifier.RemoveNotifier(string(trackID), true)
|
|
}
|
|
|
|
func (r *RoomTrackManager) GetDataTrackInfo(trackID livekit.TrackID) *DataTrackInfo {
|
|
r.lock.RLock()
|
|
defer r.lock.RUnlock()
|
|
|
|
infos := r.dataTracks[trackID]
|
|
if len(infos) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// earliest added data track is used till it is removed
|
|
return infos[0]
|
|
}
|
|
|
|
func (r *RoomTrackManager) Report() (int, int) {
|
|
r.lock.RLock()
|
|
defer r.lock.RUnlock()
|
|
|
|
return len(r.tracks), len(r.dataTracks)
|
|
}
|