Files
livekit/pkg/rtc/roomtrackmanager.go
Raja Subramanian 65d8aa2847 Handle subscribe race with track close better. (#3526)
There are two very rare edge case scenarios this is trying to address.

Scenario 1:
-----------
- both pA and pB migrating
- pA migrates first and subscribes to pB via remote track of pB
- while the above subscribe is happening, pB also migrates and
  closes the remote track
- by the time the subscribe set up completes, it realises that
  the remote track is not open any more and removes itself as
  subscriber
- but that removal is using the wrong `isExpectedToResume` as clearing
  all receivers has not run yet which is what caches the
  `isExpectedToResume`.
- That meant, the down track transceiver is not cached and hence not
  re-used when re-subscribing via pB's local track
- Fix it by caching the expected to resume when changing receiver state
  to `closing`.

Scenario 2:
-----------
- both pA and pB migrating
- pA migrates first and subscribes to pB via remote track of pB
- while the above subscribe is happening, pB also migrates and
  closes the remote track
- pB's local track is published before the remote track can be fully
  closed and all the subscribers removed. That local track gets added
  to track manager.
- While the remote track is cleaning up, subscription manager triggers
  again for pA to subscribe to pB's track. The track manager now
  resolves to the local track.
- Local track subscription progresses. As the remote track clean up is
  not finished, the transceiver is not cached. So, the local track based
  subscription creates a new transceiver and that ends up causing
  duplicate tracks in the SDP offer.
- Fix it by creating a FIFO in track manager and only resolve using the
  first one. So, in the above case, till the remote track is fully
  cleaned up, the track manager will resolve to that. Yes, the
  subscription itself will fail as the track is not in open state (i.e.
  it might be in `closing` state), but that is fine as subscription
  manager will eventually resolve to the local track and proper
  transceiver re-use can happen.
2025-03-14 14:37:37 +05:30

148 lines
3.9 KiB
Go

/*
* Copyright 2023 LiveKit, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rtc
import (
"sync"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/livekit"
"golang.org/x/exp/slices"
)
// RoomTrackManager holds tracks that are published to the room.
//
// More than one entry may be registered under a single track ID; entries are
// kept in the order they were added and lookups resolve to the earliest one.
type RoomTrackManager struct {
// lock guards tracks
lock sync.RWMutex
// changedNotifier holds the per-track-ID notifiers signalled by NotifyTrackChanged
changedNotifier *utils.ChangeNotifierManager
// removedNotifier holds the per-track-ID notifiers signalled by RemoveTrack
removedNotifier *utils.ChangeNotifierManager
// tracks maps a track ID to its entries, oldest first
tracks map[livekit.TrackID][]*TrackInfo
}
// TrackInfo is the record RoomTrackManager stores for each added track: the
// track itself together with the identity and ID of its publisher, as passed
// to AddTrack.
type TrackInfo struct {
Track types.MediaTrack
PublisherIdentity livekit.ParticipantIdentity
PublisherID livekit.ParticipantID
}
// NewRoomTrackManager returns a RoomTrackManager with an empty track map and
// freshly created change/remove notifier managers.
func NewRoomTrackManager() *RoomTrackManager {
	m := &RoomTrackManager{}
	m.tracks = make(map[livekit.TrackID][]*TrackInfo)
	m.changedNotifier = utils.NewChangeNotifierManager()
	m.removedNotifier = utils.NewChangeNotifierManager()
	return m
}
// AddTrack appends track, along with its publisher's identity and ID, to the
// end of the list registered under the track's ID, then signals the change
// notifier for that ID (if one exists).
func (r *RoomTrackManager) AddTrack(track types.MediaTrack, publisherIdentity livekit.ParticipantIdentity, publisherID livekit.ParticipantID) {
	id := track.ID()
	entry := &TrackInfo{
		Track:             track,
		PublisherIdentity: publisherIdentity,
		PublisherID:       publisherID,
	}

	r.lock.Lock()
	r.tracks[id] = append(r.tracks[id], entry)
	r.lock.Unlock()

	// the notification is sent after the lock has been released
	r.NotifyTrackChanged(id)
}
// RemoveTrack drops the first entry whose Track is exactly the given track
// (not merely one sharing its ID). If no such entry exists, nothing happens.
// Otherwise the remove notifier for the track ID is signalled (if present)
// and both the change and remove notifiers for that ID are removed.
func (r *RoomTrackManager) RemoveTrack(track types.MediaTrack) {
	id := track.ID()

	r.lock.Lock()
	// locate the entry holding this very track instance
	pos := -1
	for i, entry := range r.tracks[id] {
		if entry.Track == track {
			pos = i
			break
		}
	}
	if pos >= 0 {
		remaining := slices.Delete(r.tracks[id], pos, pos+1)
		if len(remaining) > 0 {
			r.tracks[id] = remaining
		} else {
			// last entry for this ID is gone, drop the key altogether
			delete(r.tracks, id)
		}
	}
	r.lock.Unlock()

	if pos < 0 {
		return
	}

	key := string(id)
	if notifier := r.removedNotifier.GetNotifier(key); notifier != nil {
		notifier.NotifyChanged()
	}

	r.changedNotifier.RemoveNotifier(key, true)
	r.removedNotifier.RemoveNotifier(key, true)
}
// GetTrackInfo resolves trackID to the earliest added entry that has not yet
// been removed. It returns nil when no entry is registered for the ID, or
// when that earliest entry's track is non-nil and reports that it is not open.
func (r *RoomTrackManager) GetTrackInfo(trackID livekit.TrackID) *TrackInfo {
	r.lock.RLock()
	defer r.lock.RUnlock()

	entries, ok := r.tracks[trackID]
	if !ok || len(entries) == 0 {
		return nil
	}

	// only the head of the list is ever considered, later entries must wait
	// until it is removed
	head := entries[0]
	if t := head.Track; t != nil && !t.IsOpen() {
		// a track that is no longer open is not resolved
		return nil
	}
	return head
}
// NotifyTrackChanged signals the change notifier registered for trackID.
// It is a no-op when no such notifier exists.
func (r *RoomTrackManager) NotifyTrackChanged(trackID livekit.TrackID) {
	if notifier := r.changedNotifier.GetNotifier(string(trackID)); notifier != nil {
		notifier.NotifyChanged()
	}
}
// HasObservers reports whether the given media track is currently being
// observed, which callers use as a signal of interest in the track.
// It is true only when the change notifier for the track's ID has observers
// AND the track manager currently resolves that ID to this exact track; if a
// different MediaTrack with the same trackID is resolved instead, this track
// is not considered observed.
func (r *RoomTrackManager) HasObservers(track types.MediaTrack) bool {
	notifier := r.changedNotifier.GetNotifier(string(track.ID()))
	if notifier != nil && notifier.HasObservers() {
		resolved := r.GetTrackInfo(track.ID())
		return resolved != nil && resolved.Track == track
	}
	return false
}
// GetOrCreateTrackChangeNotifier returns the result of GetOrCreateNotifier on
// the change notifier manager, keyed by the string form of trackID.
func (r *RoomTrackManager) GetOrCreateTrackChangeNotifier(trackID livekit.TrackID) *utils.ChangeNotifier {
	key := string(trackID)
	return r.changedNotifier.GetOrCreateNotifier(key)
}
// GetOrCreateTrackRemoveNotifier returns the result of GetOrCreateNotifier on
// the remove notifier manager, keyed by the string form of trackID.
func (r *RoomTrackManager) GetOrCreateTrackRemoveNotifier(trackID livekit.TrackID) *utils.ChangeNotifier {
	key := string(trackID)
	return r.removedNotifier.GetOrCreateNotifier(key)
}