diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 149b1e496..bdb515e67 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -569,6 +569,14 @@ func (p *ParticipantImpl) ConnectedAt() time.Time { return p.connectedAt } +func (p *ParticipantImpl) ActiveAt() time.Time { + if activeAt := p.lastActiveAt.Load(); activeAt != nil { + return *activeAt + } + + return time.Time{} +} + func (p *ParticipantImpl) GetClientInfo() *livekit.ClientInfo { p.lock.RLock() defer p.lock.RUnlock() diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index d755e5f1f..8a5d07076 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -548,10 +548,13 @@ func (m *SubscriptionManager) reconcileSubscription(s *mediaTrackSubscription) { // check bound status, notify error callback if it's not bound // if a publisher leaves or closes the source track, SubscribedTrack will be closed as well and it will go // back to needsSubscribe state - if m.params.Participant.IsReady() && s.durationSinceStart() > subscriptionTimeout { - s.logger.Warnw("track not bound after timeout", nil) - s.maybeRecordError(m.params.Telemetry, s.subscriberID, ErrTrackNotBound, false) - m.params.OnSubscriptionError(s.trackID, true, ErrTrackNotBound) + if activeAt := m.params.Participant.ActiveAt(); !activeAt.IsZero() { + wait := min(time.Since(activeAt), s.durationSinceStart()) + if wait > subscriptionTimeout { + s.logger.Warnw("track not bound after timeout", nil) + s.maybeRecordError(m.params.Telemetry, s.subscriberID, ErrTrackNotBound, false) + m.params.OnSubscriptionError(s.trackID, true, ErrTrackNotBound) + } } } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 3e7cce68a..5280b7498 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -384,6 +384,7 @@ type LocalParticipant interface { SupportsTransceiverReuse() bool IsUsingSinglePeerConnection() bool IsReady() bool + ActiveAt() time.Time Disconnected() <-chan struct{} IsIdle() bool SubscriberAsPrimary() bool diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 48392b454..511b7f444 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -23,6 +23,16 @@ import ( ) type FakeLocalParticipant struct { + ActiveAtStub func() time.Time + activeAtMutex sync.RWMutex + activeAtArgsForCall []struct { + } + activeAtReturns struct { + result1 time.Time + } + activeAtReturnsOnCall map[int]struct { + result1 time.Time + } AddOnCloseStub func(string, func(types.LocalParticipant)) addOnCloseMutex sync.RWMutex addOnCloseArgsForCall []struct { @@ -1489,6 +1499,59 @@ type FakeLocalParticipant struct { invocationsMutex sync.RWMutex } +func (fake *FakeLocalParticipant) ActiveAt() time.Time { + fake.activeAtMutex.Lock() + ret, specificReturn := fake.activeAtReturnsOnCall[len(fake.activeAtArgsForCall)] + fake.activeAtArgsForCall = append(fake.activeAtArgsForCall, struct { + }{}) + stub := fake.ActiveAtStub + fakeReturns := fake.activeAtReturns + fake.recordInvocation("ActiveAt", []interface{}{}) + fake.activeAtMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) ActiveAtCallCount() int { + fake.activeAtMutex.RLock() + defer fake.activeAtMutex.RUnlock() + return len(fake.activeAtArgsForCall) +} + +func (fake *FakeLocalParticipant) ActiveAtCalls(stub func() time.Time) { + fake.activeAtMutex.Lock() + defer fake.activeAtMutex.Unlock() + fake.ActiveAtStub = stub +} + +func (fake *FakeLocalParticipant) ActiveAtReturns(result1 time.Time) { + fake.activeAtMutex.Lock() + defer fake.activeAtMutex.Unlock() + fake.ActiveAtStub = nil + fake.activeAtReturns = struct { + result1 time.Time + }{result1} +} + +func (fake *FakeLocalParticipant) ActiveAtReturnsOnCall(i int, result1 time.Time) { + fake.activeAtMutex.Lock() + defer fake.activeAtMutex.Unlock() + fake.ActiveAtStub = nil + if fake.activeAtReturnsOnCall == nil { + fake.activeAtReturnsOnCall = make(map[int]struct { + result1 time.Time + }) + } + fake.activeAtReturnsOnCall[i] = struct { + result1 time.Time + }{result1} +} + func (fake *FakeLocalParticipant) AddOnClose(arg1 string, arg2 func(types.LocalParticipant)) { fake.addOnCloseMutex.Lock() fake.addOnCloseArgsForCall = append(fake.addOnCloseArgsForCall, struct {