From ef838e4fa2d9d8a81a54a32a14b3d138e61339c8 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 17 Jun 2024 23:51:00 +0530 Subject: [PATCH] Indicate if track is expected to be resumed in `onClose` callback. (#2800) That is the main change. Changed variable name to `isExpectedToResume` everywhere to be consistent. Planning to use the callback value in relays to determine if the down track should be closed or switched to a different up track. --- pkg/rtc/mediatrack.go | 6 +-- pkg/rtc/mediatrackreceiver.go | 48 ++++++++++--------- pkg/rtc/mediatracksubscriptions.go | 24 +++++----- pkg/rtc/participant.go | 2 +- pkg/rtc/subscribedtrack.go | 4 +- pkg/rtc/subscriptionmanager.go | 16 +++---- pkg/rtc/subscriptionmanager_test.go | 28 +++++------ pkg/rtc/types/interfaces.go | 14 +++--- .../typesfakes/fake_local_media_track.go | 12 ++--- pkg/rtc/types/typesfakes/fake_media_track.go | 12 ++--- .../types/typesfakes/fake_subscribed_track.go | 12 ++--- pkg/rtc/uptrackmanager.go | 12 ++--- pkg/sfu/downtrack.go | 6 +-- 13 files changed, 99 insertions(+), 97 deletions(-) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 4531f0dcd..85368df95 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -411,13 +411,13 @@ func (t *MediaTrack) Restart() { } } -func (t *MediaTrack) Close(willBeResumed bool) { +func (t *MediaTrack) Close(isExpectedToResume bool) { t.MediaTrackReceiver.SetClosing() if t.dynacastManager != nil { t.dynacastManager.Close() } - t.MediaTrackReceiver.ClearAllReceivers(willBeResumed) - t.MediaTrackReceiver.Close() + t.MediaTrackReceiver.ClearAllReceivers(isExpectedToResume) + t.MediaTrackReceiver.Close(isExpectedToResume) } func (t *MediaTrack) SetMuted(muted bool) { diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index b346eb37f..e4c5b4448 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -97,16 +97,16 @@ type MediaTrackReceiverParams struct { type MediaTrackReceiver struct { 
params MediaTrackReceiverParams - lock sync.RWMutex - receivers []*simulcastReceiver - trackInfo *livekit.TrackInfo - potentialCodecs []webrtc.RTPCodecParameters - state mediaTrackReceiverState - willBeResumed bool + lock sync.RWMutex + receivers []*simulcastReceiver + trackInfo *livekit.TrackInfo + potentialCodecs []webrtc.RTPCodecParameters + state mediaTrackReceiverState + isExpectedToResume bool onSetupReceiver func(mime string) onMediaLossFeedback func(dt *sfu.DownTrack, report *rtcp.ReceiverReport) - onClose []func() + onClose []func(isExpectedToResume bool) *MediaTrackSubscriptions } @@ -258,7 +258,7 @@ func (t *MediaTrackReceiver) SetPotentialCodecs(codecs []webrtc.RTPCodecParamete t.lock.Unlock() } -func (t *MediaTrackReceiver) ClearReceiver(mime string, willBeResumed bool) { +func (t *MediaTrackReceiver) ClearReceiver(mime string, isExpectedToResume bool) { t.lock.Lock() receivers := slices.Clone(t.receivers) for idx, receiver := range receivers { @@ -272,20 +272,20 @@ func (t *MediaTrackReceiver) ClearReceiver(mime string, willBeResumed bool) { t.receivers = receivers t.lock.Unlock() - t.removeAllSubscribersForMime(mime, willBeResumed) + t.removeAllSubscribersForMime(mime, isExpectedToResume) } -func (t *MediaTrackReceiver) ClearAllReceivers(willBeResumed bool) { +func (t *MediaTrackReceiver) ClearAllReceivers(isExpectedToResume bool) { t.params.Logger.Debugw("clearing all receivers") t.lock.Lock() receivers := t.receivers t.receivers = nil - t.willBeResumed = willBeResumed + t.isExpectedToResume = isExpectedToResume t.lock.Unlock() for _, r := range receivers { - t.removeAllSubscribersForMime(r.Codec().MimeType, willBeResumed) + t.removeAllSubscribersForMime(r.Codec().MimeType, isExpectedToResume) } } @@ -332,16 +332,18 @@ func (t *MediaTrackReceiver) TryClose() bool { numActiveReceivers++ } } + + isExpectedToResume := t.isExpectedToResume t.lock.RUnlock() if numActiveReceivers != 0 { return false } - t.Close() + t.Close(isExpectedToResume) return true 
} -func (t *MediaTrackReceiver) Close() { +func (t *MediaTrackReceiver) Close(isExpectedToResume bool) { t.lock.Lock() if t.state == mediaTrackReceiverStateClosed { t.lock.Unlock() @@ -353,7 +355,7 @@ func (t *MediaTrackReceiver) Close() { t.lock.Unlock() for _, f := range onclose { - f() + f(isExpectedToResume) } } @@ -437,7 +439,7 @@ func (t *MediaTrackReceiver) SetMuted(muted bool) { t.MediaTrackSubscriptions.SetMuted(muted) } -func (t *MediaTrackReceiver) AddOnClose(f func()) { +func (t *MediaTrackReceiver) AddOnClose(f func(isExpectedToResume bool)) { if f == nil { return } @@ -499,16 +501,16 @@ func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) (types.Su // media track could have been closed while adding subscription remove := false - willBeResumed := false + isExpectedToResume := false t.lock.RLock() if t.state != mediaTrackReceiverStateOpen { - willBeResumed = t.willBeResumed + isExpectedToResume = t.isExpectedToResume remove = true } t.lock.RUnlock() if remove { - _ = t.MediaTrackSubscriptions.RemoveSubscriber(sub.ID(), willBeResumed) + _ = t.MediaTrackSubscriptions.RemoveSubscriber(sub.ID(), isExpectedToResume) return nil, ErrNotOpen } @@ -517,14 +519,14 @@ func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) (types.Su // RemoveSubscriber removes participant from subscription // stop all forwarders to the client -func (t *MediaTrackReceiver) RemoveSubscriber(subscriberID livekit.ParticipantID, willBeResumed bool) { - _ = t.MediaTrackSubscriptions.RemoveSubscriber(subscriberID, willBeResumed) +func (t *MediaTrackReceiver) RemoveSubscriber(subscriberID livekit.ParticipantID, isExpectedToResume bool) { + _ = t.MediaTrackSubscriptions.RemoveSubscriber(subscriberID, isExpectedToResume) } -func (t *MediaTrackReceiver) removeAllSubscribersForMime(mime string, willBeResumed bool) { +func (t *MediaTrackReceiver) removeAllSubscribersForMime(mime string, isExpectedToResume bool) { t.params.Logger.Debugw("removing all subscribers 
for mime", "mime", mime) for _, subscriberID := range t.MediaTrackSubscriptions.GetAllSubscribersForMime(mime) { - t.RemoveSubscriber(subscriberID, willBeResumed) + t.RemoveSubscriber(subscriberID, isExpectedToResume) } } diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index e2f4d5b00..f49faeb68 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -296,8 +296,8 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * // But, the subscription could be removed early if the published track is closed // while adding subscription. In those cases, subscription manager would not have set // the `OnClose` callback. So, set it here to handle cases of early close. - subTrack.OnClose(func(willBeResumed bool) { - if !willBeResumed { + subTrack.OnClose(func(isExpectedToResume bool) { + if !isExpectedToResume { if err := sub.RemoveTrackFromSubscriber(sender); err != nil { t.params.Logger.Warnw("could not remove track from peer connection", err) } @@ -306,8 +306,8 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * downTrack.SetTransceiver(transceiver) - downTrack.OnCloseHandler(func(willBeResumed bool) { - go t.downTrackClosed(sub, willBeResumed) + downTrack.OnCloseHandler(func(isExpectedToResume bool) { + go t.downTrackClosed(sub, isExpectedToResume) }) t.subscribedTracksMu.Lock() @@ -319,24 +319,24 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * // RemoveSubscriber removes participant from subscription // stop all forwarders to the client -func (t *MediaTrackSubscriptions) RemoveSubscriber(subscriberID livekit.ParticipantID, willBeResumed bool) error { +func (t *MediaTrackSubscriptions) RemoveSubscriber(subscriberID livekit.ParticipantID, isExpectedToResume bool) error { subTrack := t.getSubscribedTrack(subscriberID) if subTrack == nil { return errNotFound } - t.params.Logger.Debugw("removing subscriber", 
"subscriberID", subscriberID, "willBeResumed", willBeResumed) - t.closeSubscribedTrack(subTrack, willBeResumed) + t.params.Logger.Debugw("removing subscriber", "subscriberID", subscriberID, "isExpectedToResume", isExpectedToResume) + t.closeSubscribedTrack(subTrack, isExpectedToResume) return nil } -func (t *MediaTrackSubscriptions) closeSubscribedTrack(subTrack types.SubscribedTrack, willBeResumed bool) { +func (t *MediaTrackSubscriptions) closeSubscribedTrack(subTrack types.SubscribedTrack, isExpectedToResume bool) { dt := subTrack.DownTrack() if dt == nil { return } - if willBeResumed { + if isExpectedToResume { dt.CloseWithFlush(false) } else { // flushing blocks, avoid blocking when publisher removes all its subscribers @@ -418,7 +418,7 @@ func (t *MediaTrackSubscriptions) DebugInfo() []map[string]interface{} { func (t *MediaTrackSubscriptions) downTrackClosed( sub types.LocalParticipant, - willBeResumed bool, + isExpectedToResume bool, ) { subscriberID := sub.ID() t.subscribedTracksMu.RLock() @@ -429,7 +429,7 @@ func (t *MediaTrackSubscriptions) downTrackClosed( // Cache transceiver for potential re-use on resume. // To ensure subscription manager does not re-subscribe before caching, // delete the subscribed track only after caching. 
- if willBeResumed { + if isExpectedToResume { dt := subTrack.DownTrack() tr := dt.GetTransceiver() if tr != nil { @@ -442,6 +442,6 @@ func (t *MediaTrackSubscriptions) downTrackClosed( delete(t.subscribedTracks, subscriberID) t.subscribedTracksMu.Unlock() - subTrack.Close(willBeResumed) + subTrack.Close(isExpectedToResume) } } diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 504b7bc24..e7e847291 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -2125,7 +2125,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv } trackID := livekit.TrackID(ti.Sid) - mt.AddOnClose(func() { + mt.AddOnClose(func(_isExpectedToResume bool) { if p.supervisor != nil { p.supervisor.ClearPublishedTrack(trackID, mt) } diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index eff9b8358..f5dd9e995 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -139,9 +139,9 @@ func (t *SubscribedTrack) Bound(err error) { } // for DownTrack callback to notify us that it's closed -func (t *SubscribedTrack) Close(willBeResumed bool) { +func (t *SubscribedTrack) Close(isExpectedToResume bool) { if onClose := t.onClose.Load(); onClose != nil { - go onClose.(func(bool))(willBeResumed) + go onClose.(func(bool))(isExpectedToResume) } } diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index a7dfa6608..4a49b11db 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -91,7 +91,7 @@ func NewSubscriptionManager(params SubscriptionManagerParams) *SubscriptionManag return m } -func (m *SubscriptionManager) Close(willBeResumed bool) { +func (m *SubscriptionManager) Close(isExpectedToResume bool) { m.lock.Lock() if m.isClosed() { m.lock.Unlock() @@ -113,7 +113,7 @@ func (m *SubscriptionManager) Close(willBeResumed bool) { } } - if willBeResumed { + if isExpectedToResume { for _, dt := range downTracksToClose { dt.CloseWithFlush(false) } @@ -523,8 +523,8 @@ 
func (m *SubscriptionManager) subscribe(s *trackSubscription) error { ) } if err == nil && subTrack != nil { // subTrack could be nil if already subscribed - subTrack.OnClose(func(willBeResumed bool) { - m.handleSubscribedTrackClose(s, willBeResumed) + subTrack.OnClose(func(isExpectedToResume bool) { + m.handleSubscribedTrackClose(s, isExpectedToResume) }) subTrack.AddOnBind(func(err error) { if err != nil { @@ -615,10 +615,10 @@ func (m *SubscriptionManager) handleSourceTrackRemoved(trackID livekit.TrackID) // - subscriber-initiated unsubscribe // - UpTrack was closed // - publisher revoked permissions for the participant -func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, willBeResumed bool) { +func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, isExpectedToResume bool) { s.logger.Debugw( "subscribed track closed", - "willBeResumed", willBeResumed, + "isExpectedToResume", isExpectedToResume, ) wasBound := s.isBound() subTrack := s.getSubscribedTrack() @@ -666,7 +666,7 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, w context.Background(), m.params.Participant.ID(), &livekit.TrackInfo{Sid: string(s.trackID), Type: subTrack.MediaTrack().Kind()}, - !willBeResumed, + !isExpectedToResume, ) dt := subTrack.DownTrack() @@ -684,7 +684,7 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, w } } - if !willBeResumed { + if !isExpectedToResume { sender := subTrack.RTPSender() if sender != nil { s.logger.Debugw("removing PeerConnection track", diff --git a/pkg/rtc/subscriptionmanager_test.go b/pkg/rtc/subscriptionmanager_test.go index a666f47f6..c6410391c 100644 --- a/pkg/rtc/subscriptionmanager_test.go +++ b/pkg/rtc/subscriptionmanager_test.go @@ -214,11 +214,11 @@ func TestUnsubscribe(t *testing.T) { st, err := res.Track.AddSubscriber(sm.params.Participant) require.NoError(t, err) s.subscribedTrack = st - st.OnClose(func(willBeResumed bool) { - 
sm.handleSubscribedTrackClose(s, willBeResumed) + st.OnClose(func(isExpectedToResume bool) { + sm.handleSubscribedTrackClose(s, isExpectedToResume) }) - res.Track.(*typesfakes.FakeMediaTrack).RemoveSubscriberCalls(func(pID livekit.ParticipantID, willBeResumed bool) { - setTestSubscribedTrackClosed(t, st, willBeResumed) + res.Track.(*typesfakes.FakeMediaTrack).RemoveSubscriberCalls(func(pID livekit.ParticipantID, isExpectedToResume bool) { + setTestSubscribedTrackClosed(t, st, isExpectedToResume) }) sm.lock.Lock() @@ -279,18 +279,18 @@ func TestSubscribeStatusChanged(t *testing.T) { return !s1.needsSubscribe() && !s2.needsSubscribe() }, subSettleTimeout, subCheckInterval, "track1 and track2 should be subscribed") st1 := s1.getSubscribedTrack() - st1.OnClose(func(willBeResumed bool) { - sm.handleSubscribedTrackClose(s1, willBeResumed) + st1.OnClose(func(isExpectedToResume bool) { + sm.handleSubscribedTrackClose(s1, isExpectedToResume) }) st2 := s2.getSubscribedTrack() - st2.OnClose(func(willBeResumed bool) { - sm.handleSubscribedTrackClose(s2, willBeResumed) + st2.OnClose(func(isExpectedToResume bool) { + sm.handleSubscribedTrackClose(s2, isExpectedToResume) }) - st1.MediaTrack().(*typesfakes.FakeMediaTrack).RemoveSubscriberCalls(func(pID livekit.ParticipantID, willBeResumed bool) { - setTestSubscribedTrackClosed(t, st1, willBeResumed) + st1.MediaTrack().(*typesfakes.FakeMediaTrack).RemoveSubscriberCalls(func(pID livekit.ParticipantID, isExpectedToResume bool) { + setTestSubscribedTrackClosed(t, st1, isExpectedToResume) }) - st2.MediaTrack().(*typesfakes.FakeMediaTrack).RemoveSubscriberCalls(func(pID livekit.ParticipantID, willBeResumed bool) { - setTestSubscribedTrackClosed(t, st2, willBeResumed) + st2.MediaTrack().(*typesfakes.FakeMediaTrack).RemoveSubscriberCalls(func(pID livekit.ParticipantID, isExpectedToResume bool) { + setTestSubscribedTrackClosed(t, st2, isExpectedToResume) }) require.Eventually(t, func() bool { @@ -533,9 +533,9 @@ func 
setTestSubscribedTrackBound(t *testing.T, st types.SubscribedTrack) { } } -func setTestSubscribedTrackClosed(t *testing.T, st types.SubscribedTrack, willBeResumed bool) { +func setTestSubscribedTrackClosed(t *testing.T, st types.SubscribedTrack, isExpectedToResume bool) { fst, ok := st.(*typesfakes.FakeSubscribedTrack) require.True(t, ok) - fst.OnCloseArgsForCall(0)(willBeResumed) + fst.OnCloseArgsForCall(0)(isExpectedToResume) } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index b51aaa784..93bb32206 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -261,7 +261,7 @@ type Participant interface { IsPublisher() bool GetPublishedTrack(trackID livekit.TrackID) MediaTrack GetPublishedTracks() []MediaTrack - RemovePublishedTrack(track MediaTrack, willBeResumed bool, shouldClose bool) + RemovePublishedTrack(track MediaTrack, isExpectedToResume bool, shouldClose bool) GetAudioLevel() (smoothedLevel float64, active bool) @@ -466,15 +466,15 @@ type MediaTrack interface { GetAudioLevel() (level float64, active bool) - Close(willBeResumed bool) + Close(isExpectedToResume bool) IsOpen() bool // callbacks - AddOnClose(func()) + AddOnClose(func(isExpectedToResume bool)) // subscribers AddSubscriber(participant LocalParticipant) (SubscribedTrack, error) - RemoveSubscriber(participantID livekit.ParticipantID, willBeResumed bool) + RemoveSubscriber(participantID livekit.ParticipantID, isExpectedToResume bool) IsSubscriber(subID livekit.ParticipantID) bool RevokeDisallowedSubscribers(allowedSubscriberIdentities []livekit.ParticipantIdentity) []livekit.ParticipantIdentity GetAllSubscribers() []livekit.ParticipantID @@ -487,7 +487,7 @@ type MediaTrack interface { GetTemporalLayerForSpatialFps(spatial int32, fps uint32, mime string) int32 Receivers() []sfu.TrackReceiver - ClearAllReceivers(willBeResumed bool) + ClearAllReceivers(isExpectedToResume bool) IsEncrypted() bool } @@ -514,8 +514,8 @@ type LocalMediaTrack interface { type 
SubscribedTrack interface { AddOnBind(f func(error)) IsBound() bool - Close(willBeResumed bool) - OnClose(f func(willBeResumed bool)) + Close(isExpectedToResume bool) + OnClose(f func(isExpectedToResume bool)) ID() livekit.TrackID PublisherID() livekit.ParticipantID PublisherIdentity() livekit.ParticipantIdentity diff --git a/pkg/rtc/types/typesfakes/fake_local_media_track.go b/pkg/rtc/types/typesfakes/fake_local_media_track.go index 3545f9187..1bf6297b4 100644 --- a/pkg/rtc/types/typesfakes/fake_local_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_local_media_track.go @@ -10,10 +10,10 @@ import ( ) type FakeLocalMediaTrack struct { - AddOnCloseStub func(func()) + AddOnCloseStub func(func(isExpectedToResume bool)) addOnCloseMutex sync.RWMutex addOnCloseArgsForCall []struct { - arg1 func() + arg1 func(isExpectedToResume bool) } AddSubscriberStub func(types.LocalParticipant) (types.SubscribedTrack, error) addSubscriberMutex sync.RWMutex @@ -351,10 +351,10 @@ type FakeLocalMediaTrack struct { invocationsMutex sync.RWMutex } -func (fake *FakeLocalMediaTrack) AddOnClose(arg1 func()) { +func (fake *FakeLocalMediaTrack) AddOnClose(arg1 func(isExpectedToResume bool)) { fake.addOnCloseMutex.Lock() fake.addOnCloseArgsForCall = append(fake.addOnCloseArgsForCall, struct { - arg1 func() + arg1 func(isExpectedToResume bool) }{arg1}) stub := fake.AddOnCloseStub fake.recordInvocation("AddOnClose", []interface{}{arg1}) @@ -370,13 +370,13 @@ func (fake *FakeLocalMediaTrack) AddOnCloseCallCount() int { return len(fake.addOnCloseArgsForCall) } -func (fake *FakeLocalMediaTrack) AddOnCloseCalls(stub func(func())) { +func (fake *FakeLocalMediaTrack) AddOnCloseCalls(stub func(func(isExpectedToResume bool))) { fake.addOnCloseMutex.Lock() defer fake.addOnCloseMutex.Unlock() fake.AddOnCloseStub = stub } -func (fake *FakeLocalMediaTrack) AddOnCloseArgsForCall(i int) func() { +func (fake *FakeLocalMediaTrack) AddOnCloseArgsForCall(i int) func(isExpectedToResume bool) { 
fake.addOnCloseMutex.RLock() defer fake.addOnCloseMutex.RUnlock() argsForCall := fake.addOnCloseArgsForCall[i] diff --git a/pkg/rtc/types/typesfakes/fake_media_track.go b/pkg/rtc/types/typesfakes/fake_media_track.go index 174d49b82..887646a18 100644 --- a/pkg/rtc/types/typesfakes/fake_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_media_track.go @@ -10,10 +10,10 @@ import ( ) type FakeMediaTrack struct { - AddOnCloseStub func(func()) + AddOnCloseStub func(func(isExpectedToResume bool)) addOnCloseMutex sync.RWMutex addOnCloseArgsForCall []struct { - arg1 func() + arg1 func(isExpectedToResume bool) } AddSubscriberStub func(types.LocalParticipant) (types.SubscribedTrack, error) addSubscriberMutex sync.RWMutex @@ -287,10 +287,10 @@ type FakeMediaTrack struct { invocationsMutex sync.RWMutex } -func (fake *FakeMediaTrack) AddOnClose(arg1 func()) { +func (fake *FakeMediaTrack) AddOnClose(arg1 func(isExpectedToResume bool)) { fake.addOnCloseMutex.Lock() fake.addOnCloseArgsForCall = append(fake.addOnCloseArgsForCall, struct { - arg1 func() + arg1 func(isExpectedToResume bool) }{arg1}) stub := fake.AddOnCloseStub fake.recordInvocation("AddOnClose", []interface{}{arg1}) @@ -306,13 +306,13 @@ func (fake *FakeMediaTrack) AddOnCloseCallCount() int { return len(fake.addOnCloseArgsForCall) } -func (fake *FakeMediaTrack) AddOnCloseCalls(stub func(func())) { +func (fake *FakeMediaTrack) AddOnCloseCalls(stub func(func(isExpectedToResume bool))) { fake.addOnCloseMutex.Lock() defer fake.addOnCloseMutex.Unlock() fake.AddOnCloseStub = stub } -func (fake *FakeMediaTrack) AddOnCloseArgsForCall(i int) func() { +func (fake *FakeMediaTrack) AddOnCloseArgsForCall(i int) func(isExpectedToResume bool) { fake.addOnCloseMutex.RLock() defer fake.addOnCloseMutex.RUnlock() argsForCall := fake.addOnCloseArgsForCall[i] diff --git a/pkg/rtc/types/typesfakes/fake_subscribed_track.go b/pkg/rtc/types/typesfakes/fake_subscribed_track.go index 375e2cb44..42c9a011c 100644 --- 
a/pkg/rtc/types/typesfakes/fake_subscribed_track.go +++ b/pkg/rtc/types/typesfakes/fake_subscribed_track.go @@ -81,10 +81,10 @@ type FakeSubscribedTrack struct { needsNegotiationReturnsOnCall map[int]struct { result1 bool } - OnCloseStub func(func(willBeResumed bool)) + OnCloseStub func(func(isExpectedToResume bool)) onCloseMutex sync.RWMutex onCloseArgsForCall []struct { - arg1 func(willBeResumed bool) + arg1 func(isExpectedToResume bool) } PublisherIDStub func() livekit.ParticipantID publisherIDMutex sync.RWMutex @@ -557,10 +557,10 @@ func (fake *FakeSubscribedTrack) NeedsNegotiationReturnsOnCall(i int, result1 bo }{result1} } -func (fake *FakeSubscribedTrack) OnClose(arg1 func(willBeResumed bool)) { +func (fake *FakeSubscribedTrack) OnClose(arg1 func(isExpectedToResume bool)) { fake.onCloseMutex.Lock() fake.onCloseArgsForCall = append(fake.onCloseArgsForCall, struct { - arg1 func(willBeResumed bool) + arg1 func(isExpectedToResume bool) }{arg1}) stub := fake.OnCloseStub fake.recordInvocation("OnClose", []interface{}{arg1}) @@ -576,13 +576,13 @@ func (fake *FakeSubscribedTrack) OnCloseCallCount() int { return len(fake.onCloseArgsForCall) } -func (fake *FakeSubscribedTrack) OnCloseCalls(stub func(func(willBeResumed bool))) { +func (fake *FakeSubscribedTrack) OnCloseCalls(stub func(func(isExpectedToResume bool))) { fake.onCloseMutex.Lock() defer fake.onCloseMutex.Unlock() fake.OnCloseStub = stub } -func (fake *FakeSubscribedTrack) OnCloseArgsForCall(i int) func(willBeResumed bool) { +func (fake *FakeSubscribedTrack) OnCloseArgsForCall(i int) func(isExpectedToResume bool) { fake.onCloseMutex.RLock() defer fake.onCloseMutex.RUnlock() argsForCall := fake.onCloseArgsForCall[i] diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index b0fabdefa..22ea6b1ec 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -66,7 +66,7 @@ func NewUpTrackManager(params UpTrackManagerParams) *UpTrackManager { } } -func (u *UpTrackManager) 
Close(willBeResumed bool) { +func (u *UpTrackManager) Close(isExpectedToResume bool) { u.lock.Lock() if u.closed { u.lock.Unlock() @@ -80,7 +80,7 @@ func (u *UpTrackManager) Close(willBeResumed bool) { u.lock.Unlock() for _, t := range publishedTracks { - t.Close(willBeResumed) + t.Close(isExpectedToResume) } if onClose := u.getOnUpTrackManagerClose(); onClose != nil { @@ -274,7 +274,7 @@ func (u *UpTrackManager) AddPublishedTrack(track types.MediaTrack) { u.lock.Unlock() u.params.Logger.Debugw("added published track", "trackID", track.ID(), "trackInfo", logger.Proto(track.ToProto())) - track.AddOnClose(func() { + track.AddOnClose(func(_isExpectedToResume bool) { u.lock.Lock() delete(u.publishedTracks, track.ID()) // not modifying subscription permissions, will get reset on next update from participant @@ -282,11 +282,11 @@ func (u *UpTrackManager) AddPublishedTrack(track types.MediaTrack) { }) } -func (u *UpTrackManager) RemovePublishedTrack(track types.MediaTrack, willBeResumed bool, shouldClose bool) { +func (u *UpTrackManager) RemovePublishedTrack(track types.MediaTrack, isExpectedToResume bool, shouldClose bool) { if shouldClose { - track.Close(willBeResumed) + track.Close(isExpectedToResume) } else { - track.ClearAllReceivers(willBeResumed) + track.ClearAllReceivers(isExpectedToResume) } u.lock.Lock() delete(u.publishedTracks, track.ID()) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index f9d1f1358..170abadad 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -296,7 +296,7 @@ type DownTrack struct { onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat) onMaxSubscribedLayerChanged func(dt *DownTrack, layer int32) onRttUpdate func(dt *DownTrack, rtt uint32) - onCloseHandler func(willBeResumed bool) + onCloseHandler func(isExpectedToResume bool) createdAt int64 } @@ -1169,14 +1169,14 @@ func (d *DownTrack) UpTrackBitrateReport(availableLayers []int32, bitrates Bitra } // OnCloseHandler method to be called on remote tracked 
removed -func (d *DownTrack) OnCloseHandler(fn func(willBeResumed bool)) { +func (d *DownTrack) OnCloseHandler(fn func(isExpectedToResume bool)) { d.cbMu.Lock() defer d.cbMu.Unlock() d.onCloseHandler = fn } -func (d *DownTrack) getOnCloseHandler() func(willBeResumed bool) { +func (d *DownTrack) getOnCloseHandler() func(isExpectedToResume bool) { d.cbMu.RLock() defer d.cbMu.RUnlock()