diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index cd1bd727c..186e37b1f 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -304,7 +304,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track sfu.TrackRe sfu.WithForwardStats(t.params.ForwardStats), ) newWR.OnCloseHandler(func() { - t.MediaTrackReceiver.SetClosing() + t.MediaTrackReceiver.SetClosing(false) t.MediaTrackReceiver.ClearReceiver(mimeType, false) if t.MediaTrackReceiver.TryClose() { if t.dynacastManager != nil { @@ -460,7 +460,7 @@ func (t *MediaTrack) Restart() { } func (t *MediaTrack) Close(isExpectedToResume bool) { - t.MediaTrackReceiver.SetClosing() + t.MediaTrackReceiver.SetClosing(isExpectedToResume) if t.dynacastManager != nil { t.dynacastManager.Close() } diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 8b6290071..2eebea050 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -376,7 +376,7 @@ func (t *MediaTrackReceiver) ClearReceiver(mime mime.MimeType, isExpectedToResum } func (t *MediaTrackReceiver) ClearAllReceivers(isExpectedToResume bool) { - t.params.Logger.Debugw("clearing all receivers") + t.params.Logger.Debugw("clearing all receivers", "isExpectedToResume", isExpectedToResume) t.lock.Lock() receivers := t.receivers t.receivers = nil @@ -408,12 +408,15 @@ func (t *MediaTrackReceiver) IsOpen() bool { return true } -func (t *MediaTrackReceiver) SetClosing() { +func (t *MediaTrackReceiver) SetClosing(isExpectedToResume bool) { t.lock.Lock() defer t.lock.Unlock() + if t.state == mediaTrackReceiverStateOpen { t.state = mediaTrackReceiverStateClosing } + + t.isExpectedToResume = isExpectedToResume } func (t *MediaTrackReceiver) TryClose() bool { @@ -603,6 +606,7 @@ func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) (types.Su t.lock.RUnlock() if remove { + t.params.Logger.Debugw("removing susbcriber on a not-open track", "subscriberID", sub.ID(), "isExpectedToResume", isExpectedToResume) _ = t.MediaTrackSubscriptions.RemoveSubscriber(sub.ID(), isExpectedToResume) return nil, ErrNotOpen } @@ -617,7 +621,7 @@ func (t *MediaTrackReceiver) RemoveSubscriber(subscriberID livekit.ParticipantID } func (t *MediaTrackReceiver) removeAllSubscribersForMime(mime mime.MimeType, isExpectedToResume bool) { - t.params.Logger.Debugw("removing all subscribers for mime", "mime", mime) + t.params.Logger.Debugw("removing all subscribers for mime", "mime", mime, "isExpectedToResume", isExpectedToResume) for _, subscriberID := range t.MediaTrackSubscriptions.GetAllSubscribersForMime(mime) { t.RemoveSubscriber(subscriberID, isExpectedToResume) } diff --git a/pkg/rtc/roomtrackmanager.go b/pkg/rtc/roomtrackmanager.go index a2813a04d..cd66f6d79 100644 --- a/pkg/rtc/roomtrackmanager.go +++ b/pkg/rtc/roomtrackmanager.go @@ -22,6 +22,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" + "golang.org/x/exp/slices" ) // RoomTrackManager holds tracks that are published to the room @@ -29,7 +30,7 @@ type RoomTrackManager struct { lock sync.RWMutex changedNotifier *utils.ChangeNotifierManager removedNotifier *utils.ChangeNotifierManager - tracks map[livekit.TrackID]*TrackInfo + tracks map[livekit.TrackID][]*TrackInfo } type TrackInfo struct { @@ -40,52 +41,73 @@ type TrackInfo struct { func NewRoomTrackManager() *RoomTrackManager { return &RoomTrackManager{ - tracks: make(map[livekit.TrackID]*TrackInfo), + tracks: make(map[livekit.TrackID][]*TrackInfo), changedNotifier: utils.NewChangeNotifierManager(), removedNotifier: utils.NewChangeNotifierManager(), } } func (r *RoomTrackManager) AddTrack(track types.MediaTrack, publisherIdentity livekit.ParticipantIdentity, publisherID livekit.ParticipantID) { + trackID := track.ID() r.lock.Lock() - r.tracks[track.ID()] = &TrackInfo{ + r.tracks[trackID] = append(r.tracks[trackID], &TrackInfo{ Track: track, PublisherIdentity: publisherIdentity, PublisherID: publisherID, - } + }) r.lock.Unlock() - r.NotifyTrackChanged(track.ID()) + r.NotifyTrackChanged(trackID) } func (r *RoomTrackManager) RemoveTrack(track types.MediaTrack) { + trackID := track.ID() r.lock.Lock() // ensure we are removing the same track as added - info, ok := r.tracks[track.ID()] - if !ok || info.Track != track { + infos, ok := r.tracks[trackID] + if !ok { r.lock.Unlock() return } - delete(r.tracks, track.ID()) + + found := false + for idx, info := range infos { + if info.Track == track { + r.tracks[trackID] = slices.Delete(r.tracks[trackID], idx, idx+1) + if len(r.tracks[trackID]) == 0 { + delete(r.tracks, trackID) + } + found = true + break + } + } r.lock.Unlock() - n := r.removedNotifier.GetNotifier(string(track.ID())) + if !found { + return + } + + n := r.removedNotifier.GetNotifier(string(trackID)) if n != nil { n.NotifyChanged() } - r.changedNotifier.RemoveNotifier(string(track.ID()), true) - r.removedNotifier.RemoveNotifier(string(track.ID()), true) + r.changedNotifier.RemoveNotifier(string(trackID), true) + r.removedNotifier.RemoveNotifier(string(trackID), true) } func (r *RoomTrackManager) GetTrackInfo(trackID livekit.TrackID) *TrackInfo { r.lock.RLock() defer r.lock.RUnlock() - info := r.tracks[trackID] - if info == nil { + infos := r.tracks[trackID] + if len(infos) == 0 { return nil } + + // earliest added track is used till it is removed + info := infos[0] + // when track is about to close, do not resolve if info.Track != nil && !info.Track.IsOpen() { return nil