Handle subscribe race with track close better. (#3526)

There are two rare edge-case scenarios this is trying to address.

Scenario 1:
-----------
- both pA and pB migrating
- pA migrates first and subscribes to pB via remote track of pB
- while the above subscribe is happening, pB also migrates and
  closes the remote track
- by the time the subscribe set up completes, it realises that
  the remote track is not open any more and removes itself as
  subscriber
- but that removal uses the wrong `isExpectedToResume`, because clearing
  all receivers (which is what caches `isExpectedToResume`) has not run
  yet.
- That meant, the down track transceiver is not cached and hence not
  re-used when re-subscribing via pB's local track
- Fix it by caching the expected to resume when changing receiver state
  to `closing`.

Scenario 2:
-----------
- both pA and pB migrating
- pA migrates first and subscribes to pB via remote track of pB
- while the above subscribe is happening, pB also migrates and
  closes the remote track
- pB's local track is published before the remote track can be fully
  closed and all the subscribers removed. That local track gets added
  to track manager.
- While the remote track is cleaning up, subscription manager triggers
  again for pA to subscribe to pB's track. The track manager now
  resolves to the local track.
- Local track subscription progresses. As the remote track clean up is
  not finished, the transceiver is not cached. So, the local track based
  subscription creates a new transceiver and that ends up causing
  duplicate tracks in the SDP offer.
- Fix it by creating a FIFO in track manager and only resolving using the
  first entry. So, in the above case, till the remote track is fully
  cleaned up, the track manager will resolve to that. Yes, the
  subscriptions themselves will fail as the track is not in open state
  (i.e. it might be in `closing` state), but that is fine as subscription
  manager will eventually resolve to the local track and proper
  transceiver re-use can happen.
This commit is contained in:
Raja Subramanian
2025-03-14 14:37:37 +05:30
committed by GitHub
parent a6cb00b31e
commit 65d8aa2847
3 changed files with 44 additions and 18 deletions
+2 -2
View File
@@ -304,7 +304,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track sfu.TrackRe
sfu.WithForwardStats(t.params.ForwardStats),
)
newWR.OnCloseHandler(func() {
t.MediaTrackReceiver.SetClosing()
t.MediaTrackReceiver.SetClosing(false)
t.MediaTrackReceiver.ClearReceiver(mimeType, false)
if t.MediaTrackReceiver.TryClose() {
if t.dynacastManager != nil {
@@ -460,7 +460,7 @@ func (t *MediaTrack) Restart() {
}
func (t *MediaTrack) Close(isExpectedToResume bool) {
t.MediaTrackReceiver.SetClosing()
t.MediaTrackReceiver.SetClosing(isExpectedToResume)
if t.dynacastManager != nil {
t.dynacastManager.Close()
}
+7 -3
View File
@@ -376,7 +376,7 @@ func (t *MediaTrackReceiver) ClearReceiver(mime mime.MimeType, isExpectedToResum
}
func (t *MediaTrackReceiver) ClearAllReceivers(isExpectedToResume bool) {
t.params.Logger.Debugw("clearing all receivers")
t.params.Logger.Debugw("clearing all receivers", "isExpectedToResume", isExpectedToResume)
t.lock.Lock()
receivers := t.receivers
t.receivers = nil
@@ -408,12 +408,15 @@ func (t *MediaTrackReceiver) IsOpen() bool {
return true
}
func (t *MediaTrackReceiver) SetClosing() {
func (t *MediaTrackReceiver) SetClosing(isExpectedToResume bool) {
t.lock.Lock()
defer t.lock.Unlock()
if t.state == mediaTrackReceiverStateOpen {
t.state = mediaTrackReceiverStateClosing
}
t.isExpectedToResume = isExpectedToResume
}
func (t *MediaTrackReceiver) TryClose() bool {
@@ -603,6 +606,7 @@ func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) (types.Su
t.lock.RUnlock()
if remove {
t.params.Logger.Debugw("removing susbcriber on a not-open track", "subscriberID", sub.ID(), "isExpectedToResume", isExpectedToResume)
_ = t.MediaTrackSubscriptions.RemoveSubscriber(sub.ID(), isExpectedToResume)
return nil, ErrNotOpen
}
@@ -617,7 +621,7 @@ func (t *MediaTrackReceiver) RemoveSubscriber(subscriberID livekit.ParticipantID
}
func (t *MediaTrackReceiver) removeAllSubscribersForMime(mime mime.MimeType, isExpectedToResume bool) {
t.params.Logger.Debugw("removing all subscribers for mime", "mime", mime)
t.params.Logger.Debugw("removing all subscribers for mime", "mime", mime, "isExpectedToResume", isExpectedToResume)
for _, subscriberID := range t.MediaTrackSubscriptions.GetAllSubscribersForMime(mime) {
t.RemoveSubscriber(subscriberID, isExpectedToResume)
}
+35 -13
View File
@@ -22,6 +22,7 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/livekit"
"golang.org/x/exp/slices"
)
// RoomTrackManager holds tracks that are published to the room
@@ -29,7 +30,7 @@ type RoomTrackManager struct {
lock sync.RWMutex
changedNotifier *utils.ChangeNotifierManager
removedNotifier *utils.ChangeNotifierManager
tracks map[livekit.TrackID]*TrackInfo
tracks map[livekit.TrackID][]*TrackInfo
}
type TrackInfo struct {
@@ -40,52 +41,73 @@ type TrackInfo struct {
func NewRoomTrackManager() *RoomTrackManager {
return &RoomTrackManager{
tracks: make(map[livekit.TrackID]*TrackInfo),
tracks: make(map[livekit.TrackID][]*TrackInfo),
changedNotifier: utils.NewChangeNotifierManager(),
removedNotifier: utils.NewChangeNotifierManager(),
}
}
func (r *RoomTrackManager) AddTrack(track types.MediaTrack, publisherIdentity livekit.ParticipantIdentity, publisherID livekit.ParticipantID) {
trackID := track.ID()
r.lock.Lock()
r.tracks[track.ID()] = &TrackInfo{
r.tracks[trackID] = append(r.tracks[trackID], &TrackInfo{
Track: track,
PublisherIdentity: publisherIdentity,
PublisherID: publisherID,
}
})
r.lock.Unlock()
r.NotifyTrackChanged(track.ID())
r.NotifyTrackChanged(trackID)
}
func (r *RoomTrackManager) RemoveTrack(track types.MediaTrack) {
trackID := track.ID()
r.lock.Lock()
// ensure we are removing the same track as added
info, ok := r.tracks[track.ID()]
if !ok || info.Track != track {
infos, ok := r.tracks[trackID]
if !ok {
r.lock.Unlock()
return
}
delete(r.tracks, track.ID())
found := false
for idx, info := range infos {
if info.Track == track {
r.tracks[trackID] = slices.Delete(r.tracks[trackID], idx, idx+1)
if len(r.tracks[trackID]) == 0 {
delete(r.tracks, trackID)
}
found = true
break
}
}
r.lock.Unlock()
n := r.removedNotifier.GetNotifier(string(track.ID()))
if !found {
return
}
n := r.removedNotifier.GetNotifier(string(trackID))
if n != nil {
n.NotifyChanged()
}
r.changedNotifier.RemoveNotifier(string(track.ID()), true)
r.removedNotifier.RemoveNotifier(string(track.ID()), true)
r.changedNotifier.RemoveNotifier(string(trackID), true)
r.removedNotifier.RemoveNotifier(string(trackID), true)
}
func (r *RoomTrackManager) GetTrackInfo(trackID livekit.TrackID) *TrackInfo {
r.lock.RLock()
defer r.lock.RUnlock()
info := r.tracks[trackID]
if info == nil {
infos := r.tracks[trackID]
if len(infos) == 0 {
return nil
}
// earliest added track is used till it is removed
info := infos[0]
// when track is about to close, do not resolve
if info.Track != nil && !info.Track.IsOpen() {
return nil