mirror of
https://github.com/livekit/livekit.git
synced 2026-04-01 04:25:39 +00:00
There are two very rare edge-case scenarios this is trying to address. Scenario 1: ----------- - both pA and pB migrating - pA migrates first and subscribes to pB via remote track of pB - while the above subscribe is happening, pB also migrates and closes the remote track - by the time the subscribe set up completes, it realises that the remote track is not open any more and removes itself as subscriber - but that removal is using the wrong `isExpectedToResume` as clearing all receivers has not run yet, which is what caches the `isExpectedToResume`. - That meant the down track transceiver is not cached and hence not re-used when re-subscribing via pB's local track - Fix it by caching the expected to resume when changing receiver state to `closing`. Scenario 2: ----------- - both pA and pB migrating - pA migrates first and subscribes to pB via remote track of pB - while the above subscribe is happening, pB also migrates and closes the remote track - pB's local track is published before the remote track can be fully closed and all the subscribers removed. That local track gets added to track manager. - While the remote track is cleaning up, subscription manager triggers again for pA to subscribe to pB's track. The track manager now resolves to the local track. - Local track subscription progresses. As the remote track clean up is not finished, the transceiver is not cached. So, the local track based subscription creates a new transceiver and that ends up causing duplicate tracks in the SDP offer. - Fix it by creating a FIFO in track manager and only resolve using the first one. So, in the above case, till the remote track is fully cleaned up, the track manager will resolve to that. Yes, the subscription itself will fail as the track is not in open state (i.e. it might be in `closing` state), but that is fine as subscription manager will eventually resolve to the local track and proper transceiver re-use can happen.
148 lines
3.9 KiB
Go
148 lines
3.9 KiB
Go
/*
|
|
* Copyright 2023 LiveKit, Inc
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package rtc
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/livekit/livekit-server/pkg/rtc/types"
|
|
"github.com/livekit/livekit-server/pkg/utils"
|
|
"github.com/livekit/protocol/livekit"
|
|
"golang.org/x/exp/slices"
|
|
)
|
|
|
|
// RoomTrackManager holds tracks that are published to the room
|
|
type RoomTrackManager struct {
|
|
lock sync.RWMutex
|
|
changedNotifier *utils.ChangeNotifierManager
|
|
removedNotifier *utils.ChangeNotifierManager
|
|
tracks map[livekit.TrackID][]*TrackInfo
|
|
}
|
|
|
|
type TrackInfo struct {
|
|
Track types.MediaTrack
|
|
PublisherIdentity livekit.ParticipantIdentity
|
|
PublisherID livekit.ParticipantID
|
|
}
|
|
|
|
func NewRoomTrackManager() *RoomTrackManager {
|
|
return &RoomTrackManager{
|
|
tracks: make(map[livekit.TrackID][]*TrackInfo),
|
|
changedNotifier: utils.NewChangeNotifierManager(),
|
|
removedNotifier: utils.NewChangeNotifierManager(),
|
|
}
|
|
}
|
|
|
|
func (r *RoomTrackManager) AddTrack(track types.MediaTrack, publisherIdentity livekit.ParticipantIdentity, publisherID livekit.ParticipantID) {
|
|
trackID := track.ID()
|
|
r.lock.Lock()
|
|
r.tracks[trackID] = append(r.tracks[trackID], &TrackInfo{
|
|
Track: track,
|
|
PublisherIdentity: publisherIdentity,
|
|
PublisherID: publisherID,
|
|
})
|
|
r.lock.Unlock()
|
|
|
|
r.NotifyTrackChanged(trackID)
|
|
}
|
|
|
|
func (r *RoomTrackManager) RemoveTrack(track types.MediaTrack) {
|
|
trackID := track.ID()
|
|
r.lock.Lock()
|
|
// ensure we are removing the same track as added
|
|
infos, ok := r.tracks[trackID]
|
|
if !ok {
|
|
r.lock.Unlock()
|
|
return
|
|
}
|
|
|
|
found := false
|
|
for idx, info := range infos {
|
|
if info.Track == track {
|
|
r.tracks[trackID] = slices.Delete(r.tracks[trackID], idx, idx+1)
|
|
if len(r.tracks[trackID]) == 0 {
|
|
delete(r.tracks, trackID)
|
|
}
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
r.lock.Unlock()
|
|
|
|
if !found {
|
|
return
|
|
}
|
|
|
|
n := r.removedNotifier.GetNotifier(string(trackID))
|
|
if n != nil {
|
|
n.NotifyChanged()
|
|
}
|
|
|
|
r.changedNotifier.RemoveNotifier(string(trackID), true)
|
|
r.removedNotifier.RemoveNotifier(string(trackID), true)
|
|
}
|
|
|
|
func (r *RoomTrackManager) GetTrackInfo(trackID livekit.TrackID) *TrackInfo {
|
|
r.lock.RLock()
|
|
defer r.lock.RUnlock()
|
|
|
|
infos := r.tracks[trackID]
|
|
if len(infos) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// earliest added track is used till it is removed
|
|
info := infos[0]
|
|
|
|
// when track is about to close, do not resolve
|
|
if info.Track != nil && !info.Track.IsOpen() {
|
|
return nil
|
|
}
|
|
return info
|
|
}
|
|
|
|
func (r *RoomTrackManager) NotifyTrackChanged(trackID livekit.TrackID) {
|
|
n := r.changedNotifier.GetNotifier(string(trackID))
|
|
if n != nil {
|
|
n.NotifyChanged()
|
|
}
|
|
}
|
|
|
|
// HasObservers lets caller know if the current media track has any observers
|
|
// this is used to signal interest in the track. when another MediaTrack with the same
|
|
// trackID is being used, track is not considered to be observed.
|
|
func (r *RoomTrackManager) HasObservers(track types.MediaTrack) bool {
|
|
n := r.changedNotifier.GetNotifier(string(track.ID()))
|
|
if n == nil || !n.HasObservers() {
|
|
return false
|
|
}
|
|
|
|
info := r.GetTrackInfo(track.ID())
|
|
if info == nil || info.Track != track {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (r *RoomTrackManager) GetOrCreateTrackChangeNotifier(trackID livekit.TrackID) *utils.ChangeNotifier {
|
|
return r.changedNotifier.GetOrCreateNotifier(string(trackID))
|
|
}
|
|
|
|
func (r *RoomTrackManager) GetOrCreateTrackRemoveNotifier(trackID livekit.TrackID) *utils.ChangeNotifier {
|
|
return r.removedNotifier.GetOrCreateNotifier(string(trackID))
|
|
}
|