From c16eb6692521c39d2fcfebc329dcaba4f93419c0 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Thu, 16 Feb 2023 15:11:23 -0800 Subject: [PATCH] Fix race condition with unsubscribing from a republished track (#1429) --- pkg/rtc/subscriptionmanager.go | 55 +++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index f3dd627fa..ba00eb1d8 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -38,7 +38,8 @@ var ( // ensuring this is longer than iceFailedTimeout so we are certain the participant won't return notFoundTimeout = iceFailedTimeout // amount of time to try otherwise before flagging subscription as failed - subscriptionTimeout = iceFailedTimeout + subscriptionTimeout = iceFailedTimeout + trackRemoveGracePeriod = time.Second ) type SubscriptionManagerParams struct { @@ -405,9 +406,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error { // do not unsubscribe, track is still available return } - // source track removed, we would unsubscribe - s.logger.Debugw("unsubscribing track since source track was removed") - s.setDesired(false) + s.handleSourceTrackRemoved() }) } @@ -610,6 +609,8 @@ func (s *trackSubscription) getPublisherID() livekit.ParticipantID { func (s *trackSubscription) setDesired(desired bool) bool { s.lock.Lock() + defer s.lock.Unlock() + if desired { // as long as user explicitly set it to desired // we'll reset the timer so it has sufficient time to reconcile @@ -618,19 +619,17 @@ func (s *trackSubscription) setDesired(desired bool) bool { } if s.desired == desired { - s.lock.Unlock() return false } s.desired = desired - s.lock.Unlock() // when no longer desired, we no longer care about change notifications if desired { // reset attempts s.numAttempts.Store(0) } else { - s.setChangedNotifier(nil) - s.setRemovedNotifier(nil) + s.setChangedNotifierLocked(nil) + s.setRemovedNotifierLocked(nil) } return true } @@ -683,34 +682,41 @@ func (s *trackSubscription) getSubscribedTrack() types.SubscribedTrack { func (s *trackSubscription) setChangedNotifier(notifier types.ChangeNotifier) bool { s.lock.Lock() + defer s.lock.Unlock() + return s.setChangedNotifierLocked(notifier) +} + +func (s *trackSubscription) setChangedNotifierLocked(notifier types.ChangeNotifier) bool { if s.changedNotifier == notifier { - s.lock.Unlock() return false } existing := s.changedNotifier s.changedNotifier = notifier - s.lock.Unlock() if existing != nil { - existing.RemoveObserver(string(s.subscriberID)) + go existing.RemoveObserver(string(s.subscriberID)) } return true } func (s *trackSubscription) setRemovedNotifier(notifier types.ChangeNotifier) bool { s.lock.Lock() + defer s.lock.Unlock() + return s.setRemovedNotifierLocked(notifier) +} + +func (s *trackSubscription) setRemovedNotifierLocked(notifier types.ChangeNotifier) bool { if s.removedNotifier == notifier { - s.lock.Unlock() + return false } existing := s.removedNotifier s.removedNotifier = notifier - s.lock.Unlock() if existing != nil { - existing.RemoveObserver(string(s.subscriberID)) + go existing.RemoveObserver(string(s.subscriberID)) } return true } @@ -751,6 +757,27 @@ func (s *trackSubscription) recordAttempt(success bool) { } } +func (s *trackSubscription) handleSourceTrackRemoved() { + s.lock.Lock() + defer s.lock.Unlock() + startedAt := s.subStartedAt.Load() + if startedAt == nil || time.Since(*startedAt) < trackRemoveGracePeriod { + // to prevent race conditions, if we've recently been asked to subscribe to a track + // ignore when source was removed. reconciler will take care of it eventually + // this would address the case when a track was unpublished and republished immediately + // it's possible for another caller to call setDesired(true) for the republished track before + // handleSourceTrackRemoved is called on the previously unpublished track + return + } + + // source track removed, we would unsubscribe + s.logger.Debugw("unsubscribing track since source track was removed") + s.desired = false + + s.setChangedNotifierLocked(nil) + s.setRemovedNotifierLocked(nil) +} + func (s *trackSubscription) maybeRecordError(ts telemetry.TelemetryService, pID livekit.ParticipantID, err error, isUserError bool) { if s.eventSent.Swap(true) { return