Use active at time to check for track not bound timeout. (#4206)

This commit is contained in:
Raja Subramanian
2025-12-29 20:32:08 +05:30
committed by GitHub
parent 3606ce542f
commit 08793bea89
4 changed files with 79 additions and 4 deletions
+8
View File
@@ -569,6 +569,14 @@ func (p *ParticipantImpl) ConnectedAt() time.Time {
return p.connectedAt
}
func (p *ParticipantImpl) ActiveAt() time.Time {
if activeAt := p.lastActiveAt.Load(); activeAt != nil {
return *activeAt
}
return time.Time{}
}
func (p *ParticipantImpl) GetClientInfo() *livekit.ClientInfo {
p.lock.RLock()
defer p.lock.RUnlock()
+7 -4
View File
@@ -548,10 +548,13 @@ func (m *SubscriptionManager) reconcileSubscription(s *mediaTrackSubscription) {
// check bound status, notify error callback if it's not bound
// if a publisher leaves or closes the source track, SubscribedTrack will be closed as well and it will go
// back to needsSubscribe state
if m.params.Participant.IsReady() && s.durationSinceStart() > subscriptionTimeout {
s.logger.Warnw("track not bound after timeout", nil)
s.maybeRecordError(m.params.Telemetry, s.subscriberID, ErrTrackNotBound, false)
m.params.OnSubscriptionError(s.trackID, true, ErrTrackNotBound)
if activeAt := m.params.Participant.ActiveAt(); !activeAt.IsZero() {
wait := min(time.Since(activeAt), s.durationSinceStart())
if wait > subscriptionTimeout {
s.logger.Warnw("track not bound after timeout", nil)
s.maybeRecordError(m.params.Telemetry, s.subscriberID, ErrTrackNotBound, false)
m.params.OnSubscriptionError(s.trackID, true, ErrTrackNotBound)
}
}
}
+1
View File
@@ -384,6 +384,7 @@ type LocalParticipant interface {
SupportsTransceiverReuse() bool
IsUsingSinglePeerConnection() bool
IsReady() bool
ActiveAt() time.Time
Disconnected() <-chan struct{}
IsIdle() bool
SubscriberAsPrimary() bool
@@ -23,6 +23,16 @@ import (
)
type FakeLocalParticipant struct {
ActiveAtStub func() time.Time
activeAtMutex sync.RWMutex
activeAtArgsForCall []struct {
}
activeAtReturns struct {
result1 time.Time
}
activeAtReturnsOnCall map[int]struct {
result1 time.Time
}
AddOnCloseStub func(string, func(types.LocalParticipant))
addOnCloseMutex sync.RWMutex
addOnCloseArgsForCall []struct {
@@ -1489,6 +1499,59 @@ type FakeLocalParticipant struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeLocalParticipant) ActiveAt() time.Time {
fake.activeAtMutex.Lock()
ret, specificReturn := fake.activeAtReturnsOnCall[len(fake.activeAtArgsForCall)]
fake.activeAtArgsForCall = append(fake.activeAtArgsForCall, struct {
}{})
stub := fake.ActiveAtStub
fakeReturns := fake.activeAtReturns
fake.recordInvocation("ActiveAt", []interface{}{})
fake.activeAtMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) ActiveAtCallCount() int {
fake.activeAtMutex.RLock()
defer fake.activeAtMutex.RUnlock()
return len(fake.activeAtArgsForCall)
}
func (fake *FakeLocalParticipant) ActiveAtCalls(stub func() time.Time) {
fake.activeAtMutex.Lock()
defer fake.activeAtMutex.Unlock()
fake.ActiveAtStub = stub
}
func (fake *FakeLocalParticipant) ActiveAtReturns(result1 time.Time) {
fake.activeAtMutex.Lock()
defer fake.activeAtMutex.Unlock()
fake.ActiveAtStub = nil
fake.activeAtReturns = struct {
result1 time.Time
}{result1}
}
func (fake *FakeLocalParticipant) ActiveAtReturnsOnCall(i int, result1 time.Time) {
fake.activeAtMutex.Lock()
defer fake.activeAtMutex.Unlock()
fake.ActiveAtStub = nil
if fake.activeAtReturnsOnCall == nil {
fake.activeAtReturnsOnCall = make(map[int]struct {
result1 time.Time
})
}
fake.activeAtReturnsOnCall[i] = struct {
result1 time.Time
}{result1}
}
func (fake *FakeLocalParticipant) AddOnClose(arg1 string, arg2 func(types.LocalParticipant)) {
fake.addOnCloseMutex.Lock()
fake.addOnCloseArgsForCall = append(fake.addOnCloseArgsForCall, struct {