Indicate if track is expectd to be resumed in onClose callback. (#2800)

That is the main change. Changed variable name to `isExpectedToResume`
everywhere to be consistent.

Planning to use the callback value in relays to determine if the down
track should be closed or switched to a different up track.
This commit is contained in:
Raja Subramanian
2024-06-17 23:51:00 +05:30
committed by GitHub
parent aa72466ac8
commit ef838e4fa2
13 changed files with 99 additions and 97 deletions

View File

@@ -411,13 +411,13 @@ func (t *MediaTrack) Restart() {
}
}
func (t *MediaTrack) Close(willBeResumed bool) {
func (t *MediaTrack) Close(isExpectedToResume bool) {
t.MediaTrackReceiver.SetClosing()
if t.dynacastManager != nil {
t.dynacastManager.Close()
}
t.MediaTrackReceiver.ClearAllReceivers(willBeResumed)
t.MediaTrackReceiver.Close()
t.MediaTrackReceiver.ClearAllReceivers(isExpectedToResume)
t.MediaTrackReceiver.Close(isExpectedToResume)
}
func (t *MediaTrack) SetMuted(muted bool) {

View File

@@ -97,16 +97,16 @@ type MediaTrackReceiverParams struct {
type MediaTrackReceiver struct {
params MediaTrackReceiverParams
lock sync.RWMutex
receivers []*simulcastReceiver
trackInfo *livekit.TrackInfo
potentialCodecs []webrtc.RTPCodecParameters
state mediaTrackReceiverState
willBeResumed bool
lock sync.RWMutex
receivers []*simulcastReceiver
trackInfo *livekit.TrackInfo
potentialCodecs []webrtc.RTPCodecParameters
state mediaTrackReceiverState
isExpectedToResume bool
onSetupReceiver func(mime string)
onMediaLossFeedback func(dt *sfu.DownTrack, report *rtcp.ReceiverReport)
onClose []func()
onClose []func(isExpectedToResume bool)
*MediaTrackSubscriptions
}
@@ -258,7 +258,7 @@ func (t *MediaTrackReceiver) SetPotentialCodecs(codecs []webrtc.RTPCodecParamete
t.lock.Unlock()
}
func (t *MediaTrackReceiver) ClearReceiver(mime string, willBeResumed bool) {
func (t *MediaTrackReceiver) ClearReceiver(mime string, isExpectedToResume bool) {
t.lock.Lock()
receivers := slices.Clone(t.receivers)
for idx, receiver := range receivers {
@@ -272,20 +272,20 @@ func (t *MediaTrackReceiver) ClearReceiver(mime string, willBeResumed bool) {
t.receivers = receivers
t.lock.Unlock()
t.removeAllSubscribersForMime(mime, willBeResumed)
t.removeAllSubscribersForMime(mime, isExpectedToResume)
}
func (t *MediaTrackReceiver) ClearAllReceivers(willBeResumed bool) {
func (t *MediaTrackReceiver) ClearAllReceivers(isExpectedToResume bool) {
t.params.Logger.Debugw("clearing all receivers")
t.lock.Lock()
receivers := t.receivers
t.receivers = nil
t.willBeResumed = willBeResumed
t.isExpectedToResume = isExpectedToResume
t.lock.Unlock()
for _, r := range receivers {
t.removeAllSubscribersForMime(r.Codec().MimeType, willBeResumed)
t.removeAllSubscribersForMime(r.Codec().MimeType, isExpectedToResume)
}
}
@@ -332,16 +332,18 @@ func (t *MediaTrackReceiver) TryClose() bool {
numActiveReceivers++
}
}
isExpectedToResume := t.isExpectedToResume
t.lock.RUnlock()
if numActiveReceivers != 0 {
return false
}
t.Close()
t.Close(isExpectedToResume)
return true
}
func (t *MediaTrackReceiver) Close() {
func (t *MediaTrackReceiver) Close(isExpectedToResume bool) {
t.lock.Lock()
if t.state == mediaTrackReceiverStateClosed {
t.lock.Unlock()
@@ -353,7 +355,7 @@ func (t *MediaTrackReceiver) Close() {
t.lock.Unlock()
for _, f := range onclose {
f()
f(isExpectedToResume)
}
}
@@ -437,7 +439,7 @@ func (t *MediaTrackReceiver) SetMuted(muted bool) {
t.MediaTrackSubscriptions.SetMuted(muted)
}
func (t *MediaTrackReceiver) AddOnClose(f func()) {
func (t *MediaTrackReceiver) AddOnClose(f func(isExpectedToResume bool)) {
if f == nil {
return
}
@@ -499,16 +501,16 @@ func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) (types.Su
// media track could have been closed while adding subscription
remove := false
willBeResumed := false
isExpectedToResume := false
t.lock.RLock()
if t.state != mediaTrackReceiverStateOpen {
willBeResumed = t.willBeResumed
isExpectedToResume = t.isExpectedToResume
remove = true
}
t.lock.RUnlock()
if remove {
_ = t.MediaTrackSubscriptions.RemoveSubscriber(sub.ID(), willBeResumed)
_ = t.MediaTrackSubscriptions.RemoveSubscriber(sub.ID(), isExpectedToResume)
return nil, ErrNotOpen
}
@@ -517,14 +519,14 @@ func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) (types.Su
// RemoveSubscriber removes participant from subscription
// stop all forwarders to the client
func (t *MediaTrackReceiver) RemoveSubscriber(subscriberID livekit.ParticipantID, willBeResumed bool) {
_ = t.MediaTrackSubscriptions.RemoveSubscriber(subscriberID, willBeResumed)
func (t *MediaTrackReceiver) RemoveSubscriber(subscriberID livekit.ParticipantID, isExpectedToResume bool) {
_ = t.MediaTrackSubscriptions.RemoveSubscriber(subscriberID, isExpectedToResume)
}
func (t *MediaTrackReceiver) removeAllSubscribersForMime(mime string, willBeResumed bool) {
func (t *MediaTrackReceiver) removeAllSubscribersForMime(mime string, isExpectedToResume bool) {
t.params.Logger.Debugw("removing all subscribers for mime", "mime", mime)
for _, subscriberID := range t.MediaTrackSubscriptions.GetAllSubscribersForMime(mime) {
t.RemoveSubscriber(subscriberID, willBeResumed)
t.RemoveSubscriber(subscriberID, isExpectedToResume)
}
}

View File

@@ -296,8 +296,8 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
// But, the subscription could be removed early if the published track is closed
// while adding subscription. In those cases, subscription manager would not have set
// the `OnClose` callback. So, set it here to handle cases of early close.
subTrack.OnClose(func(willBeResumed bool) {
if !willBeResumed {
subTrack.OnClose(func(isExpectedToResume bool) {
if !isExpectedToResume {
if err := sub.RemoveTrackFromSubscriber(sender); err != nil {
t.params.Logger.Warnw("could not remove track from peer connection", err)
}
@@ -306,8 +306,8 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
downTrack.SetTransceiver(transceiver)
downTrack.OnCloseHandler(func(willBeResumed bool) {
go t.downTrackClosed(sub, willBeResumed)
downTrack.OnCloseHandler(func(isExpectedToResume bool) {
go t.downTrackClosed(sub, isExpectedToResume)
})
t.subscribedTracksMu.Lock()
@@ -319,24 +319,24 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
// RemoveSubscriber removes participant from subscription
// stop all forwarders to the client
func (t *MediaTrackSubscriptions) RemoveSubscriber(subscriberID livekit.ParticipantID, willBeResumed bool) error {
func (t *MediaTrackSubscriptions) RemoveSubscriber(subscriberID livekit.ParticipantID, isExpectedToResume bool) error {
subTrack := t.getSubscribedTrack(subscriberID)
if subTrack == nil {
return errNotFound
}
t.params.Logger.Debugw("removing subscriber", "subscriberID", subscriberID, "willBeResumed", willBeResumed)
t.closeSubscribedTrack(subTrack, willBeResumed)
t.params.Logger.Debugw("removing subscriber", "subscriberID", subscriberID, "isExpectedToResume", isExpectedToResume)
t.closeSubscribedTrack(subTrack, isExpectedToResume)
return nil
}
func (t *MediaTrackSubscriptions) closeSubscribedTrack(subTrack types.SubscribedTrack, willBeResumed bool) {
func (t *MediaTrackSubscriptions) closeSubscribedTrack(subTrack types.SubscribedTrack, isExpectedToResume bool) {
dt := subTrack.DownTrack()
if dt == nil {
return
}
if willBeResumed {
if isExpectedToResume {
dt.CloseWithFlush(false)
} else {
// flushing blocks, avoid blocking when publisher removes all its subscribers
@@ -418,7 +418,7 @@ func (t *MediaTrackSubscriptions) DebugInfo() []map[string]interface{} {
func (t *MediaTrackSubscriptions) downTrackClosed(
sub types.LocalParticipant,
willBeResumed bool,
isExpectedToResume bool,
) {
subscriberID := sub.ID()
t.subscribedTracksMu.RLock()
@@ -429,7 +429,7 @@ func (t *MediaTrackSubscriptions) downTrackClosed(
// Cache transceiver for potential re-use on resume.
// To ensure subscription manager does not re-subscribe before caching,
// delete the subscribed track only after caching.
if willBeResumed {
if isExpectedToResume {
dt := subTrack.DownTrack()
tr := dt.GetTransceiver()
if tr != nil {
@@ -442,6 +442,6 @@ func (t *MediaTrackSubscriptions) downTrackClosed(
delete(t.subscribedTracks, subscriberID)
t.subscribedTracksMu.Unlock()
subTrack.Close(willBeResumed)
subTrack.Close(isExpectedToResume)
}
}

View File

@@ -2125,7 +2125,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
}
trackID := livekit.TrackID(ti.Sid)
mt.AddOnClose(func() {
mt.AddOnClose(func(_isExpectedToRsume bool) {
if p.supervisor != nil {
p.supervisor.ClearPublishedTrack(trackID, mt)
}

View File

@@ -139,9 +139,9 @@ func (t *SubscribedTrack) Bound(err error) {
}
// for DownTrack callback to notify us that it's closed
func (t *SubscribedTrack) Close(willBeResumed bool) {
func (t *SubscribedTrack) Close(isExpectedToResume bool) {
if onClose := t.onClose.Load(); onClose != nil {
go onClose.(func(bool))(willBeResumed)
go onClose.(func(bool))(isExpectedToResume)
}
}

View File

@@ -91,7 +91,7 @@ func NewSubscriptionManager(params SubscriptionManagerParams) *SubscriptionManag
return m
}
func (m *SubscriptionManager) Close(willBeResumed bool) {
func (m *SubscriptionManager) Close(isExpectedToResume bool) {
m.lock.Lock()
if m.isClosed() {
m.lock.Unlock()
@@ -113,7 +113,7 @@ func (m *SubscriptionManager) Close(willBeResumed bool) {
}
}
if willBeResumed {
if isExpectedToResume {
for _, dt := range downTracksToClose {
dt.CloseWithFlush(false)
}
@@ -523,8 +523,8 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
)
}
if err == nil && subTrack != nil { // subTrack could be nil if already subscribed
subTrack.OnClose(func(willBeResumed bool) {
m.handleSubscribedTrackClose(s, willBeResumed)
subTrack.OnClose(func(isExpectedToResume bool) {
m.handleSubscribedTrackClose(s, isExpectedToResume)
})
subTrack.AddOnBind(func(err error) {
if err != nil {
@@ -615,10 +615,10 @@ func (m *SubscriptionManager) handleSourceTrackRemoved(trackID livekit.TrackID)
// - subscriber-initiated unsubscribe
// - UpTrack was closed
// - publisher revoked permissions for the participant
func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, willBeResumed bool) {
func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, isExpectedToResume bool) {
s.logger.Debugw(
"subscribed track closed",
"willBeResumed", willBeResumed,
"isExpectedToResume", isExpectedToResume,
)
wasBound := s.isBound()
subTrack := s.getSubscribedTrack()
@@ -666,7 +666,7 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, w
context.Background(),
m.params.Participant.ID(),
&livekit.TrackInfo{Sid: string(s.trackID), Type: subTrack.MediaTrack().Kind()},
!willBeResumed,
!isExpectedToResume,
)
dt := subTrack.DownTrack()
@@ -684,7 +684,7 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, w
}
}
if !willBeResumed {
if !isExpectedToResume {
sender := subTrack.RTPSender()
if sender != nil {
s.logger.Debugw("removing PeerConnection track",

View File

@@ -214,11 +214,11 @@ func TestUnsubscribe(t *testing.T) {
st, err := res.Track.AddSubscriber(sm.params.Participant)
require.NoError(t, err)
s.subscribedTrack = st
st.OnClose(func(willBeResumed bool) {
sm.handleSubscribedTrackClose(s, willBeResumed)
st.OnClose(func(isExpectedToResume bool) {
sm.handleSubscribedTrackClose(s, isExpectedToResume)
})
res.Track.(*typesfakes.FakeMediaTrack).RemoveSubscriberCalls(func(pID livekit.ParticipantID, willBeResumed bool) {
setTestSubscribedTrackClosed(t, st, willBeResumed)
res.Track.(*typesfakes.FakeMediaTrack).RemoveSubscriberCalls(func(pID livekit.ParticipantID, isExpectedToResume bool) {
setTestSubscribedTrackClosed(t, st, isExpectedToResume)
})
sm.lock.Lock()
@@ -279,18 +279,18 @@ func TestSubscribeStatusChanged(t *testing.T) {
return !s1.needsSubscribe() && !s2.needsSubscribe()
}, subSettleTimeout, subCheckInterval, "track1 and track2 should be subscribed")
st1 := s1.getSubscribedTrack()
st1.OnClose(func(willBeResumed bool) {
sm.handleSubscribedTrackClose(s1, willBeResumed)
st1.OnClose(func(isExpectedToResume bool) {
sm.handleSubscribedTrackClose(s1, isExpectedToResume)
})
st2 := s2.getSubscribedTrack()
st2.OnClose(func(willBeResumed bool) {
sm.handleSubscribedTrackClose(s2, willBeResumed)
st2.OnClose(func(isExpectedToResume bool) {
sm.handleSubscribedTrackClose(s2, isExpectedToResume)
})
st1.MediaTrack().(*typesfakes.FakeMediaTrack).RemoveSubscriberCalls(func(pID livekit.ParticipantID, willBeResumed bool) {
setTestSubscribedTrackClosed(t, st1, willBeResumed)
st1.MediaTrack().(*typesfakes.FakeMediaTrack).RemoveSubscriberCalls(func(pID livekit.ParticipantID, isExpectedToResume bool) {
setTestSubscribedTrackClosed(t, st1, isExpectedToResume)
})
st2.MediaTrack().(*typesfakes.FakeMediaTrack).RemoveSubscriberCalls(func(pID livekit.ParticipantID, willBeResumed bool) {
setTestSubscribedTrackClosed(t, st2, willBeResumed)
st2.MediaTrack().(*typesfakes.FakeMediaTrack).RemoveSubscriberCalls(func(pID livekit.ParticipantID, isExpectedToResume bool) {
setTestSubscribedTrackClosed(t, st2, isExpectedToResume)
})
require.Eventually(t, func() bool {
@@ -533,9 +533,9 @@ func setTestSubscribedTrackBound(t *testing.T, st types.SubscribedTrack) {
}
}
func setTestSubscribedTrackClosed(t *testing.T, st types.SubscribedTrack, willBeResumed bool) {
func setTestSubscribedTrackClosed(t *testing.T, st types.SubscribedTrack, isExpectedToResume bool) {
fst, ok := st.(*typesfakes.FakeSubscribedTrack)
require.True(t, ok)
fst.OnCloseArgsForCall(0)(willBeResumed)
fst.OnCloseArgsForCall(0)(isExpectedToResume)
}

View File

@@ -261,7 +261,7 @@ type Participant interface {
IsPublisher() bool
GetPublishedTrack(trackID livekit.TrackID) MediaTrack
GetPublishedTracks() []MediaTrack
RemovePublishedTrack(track MediaTrack, willBeResumed bool, shouldClose bool)
RemovePublishedTrack(track MediaTrack, isExpectedToResume bool, shouldClose bool)
GetAudioLevel() (smoothedLevel float64, active bool)
@@ -466,15 +466,15 @@ type MediaTrack interface {
GetAudioLevel() (level float64, active bool)
Close(willBeResumed bool)
Close(isExpectedToResume bool)
IsOpen() bool
// callbacks
AddOnClose(func())
AddOnClose(func(isExpectedToResume bool))
// subscribers
AddSubscriber(participant LocalParticipant) (SubscribedTrack, error)
RemoveSubscriber(participantID livekit.ParticipantID, willBeResumed bool)
RemoveSubscriber(participantID livekit.ParticipantID, isExpectedToResume bool)
IsSubscriber(subID livekit.ParticipantID) bool
RevokeDisallowedSubscribers(allowedSubscriberIdentities []livekit.ParticipantIdentity) []livekit.ParticipantIdentity
GetAllSubscribers() []livekit.ParticipantID
@@ -487,7 +487,7 @@ type MediaTrack interface {
GetTemporalLayerForSpatialFps(spatial int32, fps uint32, mime string) int32
Receivers() []sfu.TrackReceiver
ClearAllReceivers(willBeResumed bool)
ClearAllReceivers(isExpectedToResume bool)
IsEncrypted() bool
}
@@ -514,8 +514,8 @@ type LocalMediaTrack interface {
type SubscribedTrack interface {
AddOnBind(f func(error))
IsBound() bool
Close(willBeResumed bool)
OnClose(f func(willBeResumed bool))
Close(isExpectedToResume bool)
OnClose(f func(isExpectedToResume bool))
ID() livekit.TrackID
PublisherID() livekit.ParticipantID
PublisherIdentity() livekit.ParticipantIdentity

View File

@@ -10,10 +10,10 @@ import (
)
type FakeLocalMediaTrack struct {
AddOnCloseStub func(func())
AddOnCloseStub func(func(isExpectedToResume bool))
addOnCloseMutex sync.RWMutex
addOnCloseArgsForCall []struct {
arg1 func()
arg1 func(isExpectedToResume bool)
}
AddSubscriberStub func(types.LocalParticipant) (types.SubscribedTrack, error)
addSubscriberMutex sync.RWMutex
@@ -351,10 +351,10 @@ type FakeLocalMediaTrack struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeLocalMediaTrack) AddOnClose(arg1 func()) {
func (fake *FakeLocalMediaTrack) AddOnClose(arg1 func(isExpectedToResume bool)) {
fake.addOnCloseMutex.Lock()
fake.addOnCloseArgsForCall = append(fake.addOnCloseArgsForCall, struct {
arg1 func()
arg1 func(isExpectedToResume bool)
}{arg1})
stub := fake.AddOnCloseStub
fake.recordInvocation("AddOnClose", []interface{}{arg1})
@@ -370,13 +370,13 @@ func (fake *FakeLocalMediaTrack) AddOnCloseCallCount() int {
return len(fake.addOnCloseArgsForCall)
}
func (fake *FakeLocalMediaTrack) AddOnCloseCalls(stub func(func())) {
func (fake *FakeLocalMediaTrack) AddOnCloseCalls(stub func(func(isExpectedToResume bool))) {
fake.addOnCloseMutex.Lock()
defer fake.addOnCloseMutex.Unlock()
fake.AddOnCloseStub = stub
}
func (fake *FakeLocalMediaTrack) AddOnCloseArgsForCall(i int) func() {
func (fake *FakeLocalMediaTrack) AddOnCloseArgsForCall(i int) func(isExpectedToResume bool) {
fake.addOnCloseMutex.RLock()
defer fake.addOnCloseMutex.RUnlock()
argsForCall := fake.addOnCloseArgsForCall[i]

View File

@@ -10,10 +10,10 @@ import (
)
type FakeMediaTrack struct {
AddOnCloseStub func(func())
AddOnCloseStub func(func(isExpectedToResume bool))
addOnCloseMutex sync.RWMutex
addOnCloseArgsForCall []struct {
arg1 func()
arg1 func(isExpectedToResume bool)
}
AddSubscriberStub func(types.LocalParticipant) (types.SubscribedTrack, error)
addSubscriberMutex sync.RWMutex
@@ -287,10 +287,10 @@ type FakeMediaTrack struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeMediaTrack) AddOnClose(arg1 func()) {
func (fake *FakeMediaTrack) AddOnClose(arg1 func(isExpectedToResume bool)) {
fake.addOnCloseMutex.Lock()
fake.addOnCloseArgsForCall = append(fake.addOnCloseArgsForCall, struct {
arg1 func()
arg1 func(isExpectedToResume bool)
}{arg1})
stub := fake.AddOnCloseStub
fake.recordInvocation("AddOnClose", []interface{}{arg1})
@@ -306,13 +306,13 @@ func (fake *FakeMediaTrack) AddOnCloseCallCount() int {
return len(fake.addOnCloseArgsForCall)
}
func (fake *FakeMediaTrack) AddOnCloseCalls(stub func(func())) {
func (fake *FakeMediaTrack) AddOnCloseCalls(stub func(func(isExpectedToResume bool))) {
fake.addOnCloseMutex.Lock()
defer fake.addOnCloseMutex.Unlock()
fake.AddOnCloseStub = stub
}
func (fake *FakeMediaTrack) AddOnCloseArgsForCall(i int) func() {
func (fake *FakeMediaTrack) AddOnCloseArgsForCall(i int) func(isExpectedToResume bool) {
fake.addOnCloseMutex.RLock()
defer fake.addOnCloseMutex.RUnlock()
argsForCall := fake.addOnCloseArgsForCall[i]

View File

@@ -81,10 +81,10 @@ type FakeSubscribedTrack struct {
needsNegotiationReturnsOnCall map[int]struct {
result1 bool
}
OnCloseStub func(func(willBeResumed bool))
OnCloseStub func(func(isExpectedToResume bool))
onCloseMutex sync.RWMutex
onCloseArgsForCall []struct {
arg1 func(willBeResumed bool)
arg1 func(isExpectedToResume bool)
}
PublisherIDStub func() livekit.ParticipantID
publisherIDMutex sync.RWMutex
@@ -557,10 +557,10 @@ func (fake *FakeSubscribedTrack) NeedsNegotiationReturnsOnCall(i int, result1 bo
}{result1}
}
func (fake *FakeSubscribedTrack) OnClose(arg1 func(willBeResumed bool)) {
func (fake *FakeSubscribedTrack) OnClose(arg1 func(isExpectedToResume bool)) {
fake.onCloseMutex.Lock()
fake.onCloseArgsForCall = append(fake.onCloseArgsForCall, struct {
arg1 func(willBeResumed bool)
arg1 func(isExpectedToResume bool)
}{arg1})
stub := fake.OnCloseStub
fake.recordInvocation("OnClose", []interface{}{arg1})
@@ -576,13 +576,13 @@ func (fake *FakeSubscribedTrack) OnCloseCallCount() int {
return len(fake.onCloseArgsForCall)
}
func (fake *FakeSubscribedTrack) OnCloseCalls(stub func(func(willBeResumed bool))) {
func (fake *FakeSubscribedTrack) OnCloseCalls(stub func(func(isExpectedToResume bool))) {
fake.onCloseMutex.Lock()
defer fake.onCloseMutex.Unlock()
fake.OnCloseStub = stub
}
func (fake *FakeSubscribedTrack) OnCloseArgsForCall(i int) func(willBeResumed bool) {
func (fake *FakeSubscribedTrack) OnCloseArgsForCall(i int) func(isExpectedToResume bool) {
fake.onCloseMutex.RLock()
defer fake.onCloseMutex.RUnlock()
argsForCall := fake.onCloseArgsForCall[i]

View File

@@ -66,7 +66,7 @@ func NewUpTrackManager(params UpTrackManagerParams) *UpTrackManager {
}
}
func (u *UpTrackManager) Close(willBeResumed bool) {
func (u *UpTrackManager) Close(isExpectedToResume bool) {
u.lock.Lock()
if u.closed {
u.lock.Unlock()
@@ -80,7 +80,7 @@ func (u *UpTrackManager) Close(willBeResumed bool) {
u.lock.Unlock()
for _, t := range publishedTracks {
t.Close(willBeResumed)
t.Close(isExpectedToResume)
}
if onClose := u.getOnUpTrackManagerClose(); onClose != nil {
@@ -274,7 +274,7 @@ func (u *UpTrackManager) AddPublishedTrack(track types.MediaTrack) {
u.lock.Unlock()
u.params.Logger.Debugw("added published track", "trackID", track.ID(), "trackInfo", logger.Proto(track.ToProto()))
track.AddOnClose(func() {
track.AddOnClose(func(_isExpectedToResume bool) {
u.lock.Lock()
delete(u.publishedTracks, track.ID())
// not modifying subscription permissions, will get reset on next update from participant
@@ -282,11 +282,11 @@ func (u *UpTrackManager) AddPublishedTrack(track types.MediaTrack) {
})
}
func (u *UpTrackManager) RemovePublishedTrack(track types.MediaTrack, willBeResumed bool, shouldClose bool) {
func (u *UpTrackManager) RemovePublishedTrack(track types.MediaTrack, isExpectedToResume bool, shouldClose bool) {
if shouldClose {
track.Close(willBeResumed)
track.Close(isExpectedToResume)
} else {
track.ClearAllReceivers(willBeResumed)
track.ClearAllReceivers(isExpectedToResume)
}
u.lock.Lock()
delete(u.publishedTracks, track.ID())

View File

@@ -296,7 +296,7 @@ type DownTrack struct {
onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat)
onMaxSubscribedLayerChanged func(dt *DownTrack, layer int32)
onRttUpdate func(dt *DownTrack, rtt uint32)
onCloseHandler func(willBeResumed bool)
onCloseHandler func(isExpectedToResume bool)
createdAt int64
}
@@ -1169,14 +1169,14 @@ func (d *DownTrack) UpTrackBitrateReport(availableLayers []int32, bitrates Bitra
}
// OnCloseHandler method to be called on remote tracked removed
func (d *DownTrack) OnCloseHandler(fn func(willBeResumed bool)) {
func (d *DownTrack) OnCloseHandler(fn func(isExpectedToResume bool)) {
d.cbMu.Lock()
defer d.cbMu.Unlock()
d.onCloseHandler = fn
}
func (d *DownTrack) getOnCloseHandler() func(willBeResumed bool) {
func (d *DownTrack) getOnCloseHandler() func(isExpectedToResume bool) {
d.cbMu.RLock()
defer d.cbMu.RUnlock()