mirror of
https://github.com/livekit/livekit.git
synced 2026-05-24 10:15:28 +00:00
Fix race condition with unsubscribing from a republished track (#1429)
This commit is contained in:
@@ -38,7 +38,8 @@ var (
|
||||
// ensuring this is longer than iceFailedTimeout so we are certain the participant won't return
|
||||
notFoundTimeout = iceFailedTimeout
|
||||
// amount of time to try otherwise before flagging subscription as failed
|
||||
subscriptionTimeout = iceFailedTimeout
|
||||
subscriptionTimeout = iceFailedTimeout
|
||||
trackRemoveGracePeriod = time.Second
|
||||
)
|
||||
|
||||
type SubscriptionManagerParams struct {
|
||||
@@ -405,9 +406,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
|
||||
// do not unsubscribe, track is still available
|
||||
return
|
||||
}
|
||||
// source track removed, we would unsubscribe
|
||||
s.logger.Debugw("unsubscribing track since source track was removed")
|
||||
s.setDesired(false)
|
||||
s.handleSourceTrackRemoved()
|
||||
})
|
||||
}
|
||||
|
||||
@@ -610,6 +609,8 @@ func (s *trackSubscription) getPublisherID() livekit.ParticipantID {
|
||||
|
||||
func (s *trackSubscription) setDesired(desired bool) bool {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
if desired {
|
||||
// as long as user explicitly set it to desired
|
||||
// we'll reset the timer so it has sufficient time to reconcile
|
||||
@@ -618,19 +619,17 @@ func (s *trackSubscription) setDesired(desired bool) bool {
|
||||
}
|
||||
|
||||
if s.desired == desired {
|
||||
s.lock.Unlock()
|
||||
return false
|
||||
}
|
||||
s.desired = desired
|
||||
s.lock.Unlock()
|
||||
|
||||
// when no longer desired, we no longer care about change notifications
|
||||
if desired {
|
||||
// reset attempts
|
||||
s.numAttempts.Store(0)
|
||||
} else {
|
||||
s.setChangedNotifier(nil)
|
||||
s.setRemovedNotifier(nil)
|
||||
s.setChangedNotifierLocked(nil)
|
||||
s.setRemovedNotifierLocked(nil)
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -683,34 +682,41 @@ func (s *trackSubscription) getSubscribedTrack() types.SubscribedTrack {
|
||||
|
||||
func (s *trackSubscription) setChangedNotifier(notifier types.ChangeNotifier) bool {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
return s.setChangedNotifierLocked(notifier)
|
||||
}
|
||||
|
||||
func (s *trackSubscription) setChangedNotifierLocked(notifier types.ChangeNotifier) bool {
|
||||
if s.changedNotifier == notifier {
|
||||
s.lock.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
existing := s.changedNotifier
|
||||
s.changedNotifier = notifier
|
||||
s.lock.Unlock()
|
||||
|
||||
if existing != nil {
|
||||
existing.RemoveObserver(string(s.subscriberID))
|
||||
go existing.RemoveObserver(string(s.subscriberID))
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *trackSubscription) setRemovedNotifier(notifier types.ChangeNotifier) bool {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
return s.setRemovedNotifierLocked(notifier)
|
||||
}
|
||||
|
||||
func (s *trackSubscription) setRemovedNotifierLocked(notifier types.ChangeNotifier) bool {
|
||||
if s.removedNotifier == notifier {
|
||||
s.lock.Unlock()
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
existing := s.removedNotifier
|
||||
s.removedNotifier = notifier
|
||||
s.lock.Unlock()
|
||||
|
||||
if existing != nil {
|
||||
existing.RemoveObserver(string(s.subscriberID))
|
||||
go existing.RemoveObserver(string(s.subscriberID))
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -751,6 +757,27 @@ func (s *trackSubscription) recordAttempt(success bool) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *trackSubscription) handleSourceTrackRemoved() {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
startedAt := s.subStartedAt.Load()
|
||||
if startedAt == nil || time.Since(*startedAt) < trackRemoveGracePeriod {
|
||||
// to prevent race conditions, if we've recently been asked to subscribe to a track
|
||||
// ignore when source was removed. reconciler will take care of it eventually
|
||||
// this would address the case when a track was unpublished and republished immediately
|
||||
// it's possible for another caller to call setDesired(true) for the republished track before
|
||||
// handleSourceTrackRemoved is called on the previously unpublished track
|
||||
return
|
||||
}
|
||||
|
||||
// source track removed, we would unsubscribe
|
||||
s.logger.Debugw("unsubscribing track since source track was removed")
|
||||
s.desired = false
|
||||
|
||||
s.setChangedNotifierLocked(nil)
|
||||
s.setRemovedNotifierLocked(nil)
|
||||
}
|
||||
|
||||
func (s *trackSubscription) maybeRecordError(ts telemetry.TelemetryService, pID livekit.ParticipantID, err error, isUserError bool) {
|
||||
if s.eventSent.Swap(true) {
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user