From add99626559ea6e54cdcb3ddcbaeb48a5364e1dc Mon Sep 17 00:00:00 2001 From: David Zhao Date: Fri, 3 Feb 2023 01:06:04 -0800 Subject: [PATCH] Avoid triggering subscription failed handler unnecessarily. (#1379) Certain errors are not at fault of the subscriber. For these errors the reconciler should keep trying instead of giving up. --- pkg/rtc/mediatrackreceiver.go | 2 +- pkg/rtc/subscriptionmanager.go | 31 +++++++++++++++++++++++-------- pkg/rtc/transport.go | 4 ++-- 3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 0ab257ca6..5eb0a5a66 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -152,7 +152,7 @@ func (t *MediaTrackReceiver) SetupReceiver(receiver sfu.TrackReceiver, priority return } - // codec postion maybe taked by DumbReceiver, check and upgrade to WebRTCReceiver + // codec position maybe taken by DummyReceiver, check and upgrade to WebRTCReceiver var upgradeReceiver bool for _, r := range t.receivers { if strings.EqualFold(r.Codec().MimeType, receiver.Codec().MimeType) { diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 2ecfe366b..381a3458c 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -38,7 +38,7 @@ var ( // ensuring this is longer than iceFailedTimeout so we are certain the participant won't return notFoundTimeout = iceFailedTimeout // amount of time to try otherwise before flagging subscription as failed - subscriptionTimeout = 20 * time.Second + subscriptionTimeout = iceFailedTimeout ) type SubscriptionManagerParams struct { @@ -284,16 +284,21 @@ func (m *SubscriptionManager) reconcileSubscription(s *trackSubscription) { s.recordAttempt(false) switch err { - case ErrNoTrackPermission, ErrNoSubscribePermission: - // retry permission errors forever, since it's outside of our control and publisher could - // grant any time - // however, we'll still log an event to reflect this in telemetry + case ErrNoTrackPermission, ErrNoSubscribePermission, ErrNoReceiver, ErrNotOpen, ErrTrackNotAttached: + // these are errors that are outside of our control, so we'll keep trying + // - ErrNoTrackPermission: publisher did not grant subscriber permission, may change any moment + // - ErrNoSubscribePermission: participant was not granted canSubscribe, may change any moment + // - ErrNoReceiver: Track is in the process of closing (another local track published to the same instance) + // - ErrTrackNotAttached: Remote Track that is not attached, but may be attached later + // - ErrNotOpen: Track is closing or already closed + // We'll still log an event to reflect this in telemetry since it's been too long if s.durationSinceStart() > subscriptionTimeout { s.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, true) } case ErrPublisherNotConnected, ErrTrackNotFound: - // publisher left or track was unpublished, if after timeout, we'd unsubscribe - // from it. this is the *only* case we'd change desired state + // publisher left or source track was never published or closed + // if after timeout, we'd unsubscribe from it. + // this is the *only* case we'd change desired state if s.durationSinceStart() > notFoundTimeout { s.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, true) s.logger.Infow("unsubscribing track since track isn't available", "error", err) @@ -338,6 +343,8 @@ func (m *SubscriptionManager) reconcileSubscription(s *trackSubscription) { if s.needsBind() { // check bound status, notify error callback if it's not bound + // if a publisher leaves or closes the source track, SubscribedTrack will be closed as well and it will go + // back to needsSubscribe state if s.durationSinceStart() > subscriptionTimeout { s.logger.Errorw("track not bound after timeout", nil) s.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), ErrTrackNotBound, false) @@ -565,7 +572,10 @@ type trackSubscription struct { eventSent atomic.Bool numAttempts atomic.Int32 bound bool - subStartedAt atomic.Pointer[time.Time] + + // the later of when subscription was requested or when the first failure was encountered + // this timestamp determines when failures are reported + subStartedAt atomic.Pointer[time.Time] } func newTrackSubscription(subscriberID livekit.ParticipantID, trackID livekit.TrackID, l logger.Logger) *trackSubscription { @@ -714,6 +724,11 @@ func (s *trackSubscription) isBound() bool { func (s *trackSubscription) recordAttempt(success bool) { if !success { + if s.numAttempts.Load() == 0 { + // on first failure, we'd want to start the timer + t := time.Now() + s.subStartedAt.Store(&t) + } s.numAttempts.Add(1) } else { s.numAttempts.Store(0) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 6a714efee..75f4210e5 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -1117,7 +1117,7 @@ func (t *PCTransport) preparePC(previousAnswer webrtc.SessionDescription) error } // replace client's fingerprint into dump pc's answer, for pion's dtls process, it will - // keep the fingerprint at first call of SetRemoteDescription, if dumb pc and client pc use + // keep the fingerprint at first call of SetRemoteDescription, if dummy pc and client pc use // different fingerprint, that will cause pion denied dtls data after handshake with client // complete (can't pass fingerprint change). // in this step, we don't established connection with dump pc(no candidate swap), just use @@ -1161,7 +1161,7 @@ func (t *PCTransport) initPCWithPreviousAnswer(previousAnswer webrtc.SessionDesc // for pion generate unmatched sdp, it always appends data channel to last m-lines, // that not consistent with our previous answer that data channel might at middle-line // because sdp can negotiate multi times before migration.(it will sticky to the last m-line atfirst negotiate) - // so use a dumb pc to negotiate sdp to fixed the datachannel's mid at same position with previous answer + // so use a dummy pc to negotiate sdp to fixed the datachannel's mid at same position with previous answer if err := t.preparePC(previousAnswer); err != nil { t.params.Logger.Errorw("prepare pc for migration failed", err) return senders, err