diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 81994bc54..953071f34 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -265,6 +265,10 @@ func (p *ParticipantImpl) IsReady() bool { return state == livekit.ParticipantInfo_JOINED || state == livekit.ParticipantInfo_ACTIVE } +func (p *ParticipantImpl) IsDisconnected() bool { + return p.State() == livekit.ParticipantInfo_DISCONNECTED +} + func (p *ParticipantImpl) ConnectedAt() time.Time { return p.connectedAt } @@ -719,7 +723,7 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool { p.migrationTimer = time.AfterFunc(migrationWaitDuration, func() { p.clearMigrationTimer() - if p.isClosed.Load() || p.State() == livekit.ParticipantInfo_DISCONNECTED { + if p.isClosed.Load() || p.IsDisconnected() { return } // TODO: change to debug once we are confident @@ -1216,7 +1220,7 @@ func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription) err // when a new remoteTrack is created, creates a Track and adds it to room func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) { - if p.State() == livekit.ParticipantInfo_DISCONNECTED { + if p.IsDisconnected() { return } @@ -1254,7 +1258,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w } func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byte) { - if p.State() == livekit.ParticipantInfo_DISCONNECTED || !p.CanPublishData() { + if p.IsDisconnected() || !p.CanPublishData() { return } @@ -1285,7 +1289,7 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt } func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) error { - if c == nil || p.State() == livekit.ParticipantInfo_DISCONNECTED { + if c == nil || p.IsDisconnected() { return nil } @@ -1333,7 +1337,7 @@ func (p *ParticipantImpl) setupDisconnectTimer() { p.disconnectTimer = time.AfterFunc(disconnectCleanupDuration, func() { p.clearDisconnectTimer() - if p.isClosed.Load() || p.State() == livekit.ParticipantInfo_DISCONNECTED { + if p.isClosed.Load() || p.IsDisconnected() { return } p.params.Logger.Infow("closing disconnected participant") @@ -1355,7 +1359,7 @@ func (p *ParticipantImpl) onAnyTransportFailed() { func (p *ParticipantImpl) subscriberRTCPWorker() { defer Recover() for { - if p.State() == livekit.ParticipantInfo_DISCONNECTED { + if p.IsDisconnected() { return } @@ -2050,7 +2054,7 @@ func (p *ParticipantImpl) onAnyTransportNegotiationFailed() { func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, sourceTrack types.MediaTrack, isRelayed bool, f func(sub types.LocalParticipant) error) bool { // do not queue subscription is participant is already closed/disconnected - if p.isClosed.Load() || p.State() == livekit.ParticipantInfo_DISCONNECTED { + if p.isClosed.Load() || p.IsDisconnected() { return false } diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index c94733785..136d38c11 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -66,6 +66,11 @@ func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) e func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.ParticipantInfo) error { p.updateLock.Lock() + if p.IsDisconnected() { + p.updateLock.Unlock() + return nil + } + if !p.IsReady() { // queue up updates p.queuedUpdates = append(p.queuedUpdates, participantsToUpdate...) @@ -201,7 +206,7 @@ func (p *ParticipantImpl) sendTrackUnpublished(trackID livekit.TrackID) { } func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error { - if p.State() == livekit.ParticipantInfo_DISCONNECTED || (!p.IsReady() && msg.GetJoin() == nil) { + if p.IsDisconnected() || (!p.IsReady() && msg.GetJoin() == nil) { return nil } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 8a1ffa980..bd2635e5d 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -398,7 +398,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek } // send broadcast only if it's not already closed - sendUpdates := p.State() != livekit.ParticipantInfo_DISCONNECTED + sendUpdates := !p.IsDisconnected() p.OnTrackUpdated(nil) p.OnTrackPublished(nil) @@ -873,11 +873,6 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas func (r *Room) sendParticipantUpdates(updates []*livekit.ParticipantInfo) { for _, op := range r.GetParticipants() { - // skip closed participants - if op.State() == livekit.ParticipantInfo_DISCONNECTED { - continue - } - err := op.SendParticipantUpdate(updates) if err != nil { r.Logger.Errorw("could not send update to participant", err, diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 499293a1f..7cd300e44 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -233,6 +233,7 @@ type LocalParticipant interface { ConnectedAt() time.Time State() livekit.ParticipantInfo_State IsReady() bool + IsDisconnected() bool SubscriberAsPrimary() bool GetClientConfiguration() *livekit.ClientConfiguration GetICEConnectionType() ICEConnectionType diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 790c3f93c..b042940bd 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -368,6 +368,16 @@ type FakeLocalParticipant struct { identityReturnsOnCall map[int]struct { result1 livekit.ParticipantIdentity } + IsDisconnectedStub func() bool + isDisconnectedMutex sync.RWMutex + isDisconnectedArgsForCall []struct { + } + isDisconnectedReturns struct { + result1 bool + } + isDisconnectedReturnsOnCall map[int]struct { + result1 bool + } IsPublisherStub func() bool isPublisherMutex sync.RWMutex isPublisherArgsForCall []struct { @@ -2609,6 +2619,59 @@ func (fake *FakeLocalParticipant) IdentityReturnsOnCall(i int, result1 livekit.P }{result1} } +func (fake *FakeLocalParticipant) IsDisconnected() bool { + fake.isDisconnectedMutex.Lock() + ret, specificReturn := fake.isDisconnectedReturnsOnCall[len(fake.isDisconnectedArgsForCall)] + fake.isDisconnectedArgsForCall = append(fake.isDisconnectedArgsForCall, struct { + }{}) + stub := fake.IsDisconnectedStub + fakeReturns := fake.isDisconnectedReturns + fake.recordInvocation("IsDisconnected", []interface{}{}) + fake.isDisconnectedMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) IsDisconnectedCallCount() int { + fake.isDisconnectedMutex.RLock() + defer fake.isDisconnectedMutex.RUnlock() + return len(fake.isDisconnectedArgsForCall) +} + +func (fake *FakeLocalParticipant) IsDisconnectedCalls(stub func() bool) { + fake.isDisconnectedMutex.Lock() + defer fake.isDisconnectedMutex.Unlock() + fake.IsDisconnectedStub = stub +} + +func (fake *FakeLocalParticipant) IsDisconnectedReturns(result1 bool) { + fake.isDisconnectedMutex.Lock() + defer fake.isDisconnectedMutex.Unlock() + fake.IsDisconnectedStub = nil + fake.isDisconnectedReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalParticipant) IsDisconnectedReturnsOnCall(i int, result1 bool) { + fake.isDisconnectedMutex.Lock() + defer fake.isDisconnectedMutex.Unlock() + fake.IsDisconnectedStub = nil + if fake.isDisconnectedReturnsOnCall == nil { + fake.isDisconnectedReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.isDisconnectedReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeLocalParticipant) IsPublisher() bool { fake.isPublisherMutex.Lock() ret, specificReturn := fake.isPublisherReturnsOnCall[len(fake.isPublisherArgsForCall)] @@ -5050,6 +5113,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.iDMutex.RUnlock() fake.identityMutex.RLock() defer fake.identityMutex.RUnlock() + fake.isDisconnectedMutex.RLock() + defer fake.isDisconnectedMutex.RUnlock() fake.isPublisherMutex.RLock() defer fake.isPublisherMutex.RUnlock() fake.isReadyMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index e55c10a0a..c2968c90d 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -434,7 +434,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room }) newRoom.OnParticipantChanged(func(p types.LocalParticipant) { - if p.State() != livekit.ParticipantInfo_DISCONNECTED { + if !p.IsDisconnected() { if err := r.roomStore.StoreParticipant(ctx, roomName, p.ToProto()); err != nil { newRoom.Logger.Errorw("could not handle participant change", err) } @@ -483,7 +483,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa select { case <-stateCheckTicker.C: // periodic check to ensure participant didn't become disconnected - if participant.State() == livekit.ParticipantInfo_DISCONNECTED { + if participant.IsDisconnected() { return } case <-tokenTicker.C: diff --git a/pkg/service/servicefakes/fake_egress_store.go b/pkg/service/servicefakes/fake_egress_store.go index abf809abb..06ee8d9cc 100644 --- a/pkg/service/servicefakes/fake_egress_store.go +++ b/pkg/service/servicefakes/fake_egress_store.go @@ -62,16 +62,6 @@ type FakeEgressStore struct { updateEgressReturnsOnCall map[int]struct { result1 error } - UsePSRPCStub func() bool - usePSRPCMutex sync.RWMutex - usePSRPCArgsForCall []struct { - } - usePSRPCReturns struct { - result1 bool - } - usePSRPCReturnsOnCall map[int]struct { - result1 bool - } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -330,59 +320,6 @@ func (fake *FakeEgressStore) UpdateEgressReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeEgressStore) UsePSRPC() bool { - fake.usePSRPCMutex.Lock() - ret, specificReturn := fake.usePSRPCReturnsOnCall[len(fake.usePSRPCArgsForCall)] - fake.usePSRPCArgsForCall = append(fake.usePSRPCArgsForCall, struct { - }{}) - stub := fake.UsePSRPCStub - fakeReturns := fake.usePSRPCReturns - fake.recordInvocation("UsePSRPC", []interface{}{}) - fake.usePSRPCMutex.Unlock() - if stub != nil { - return stub() - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeEgressStore) UsePSRPCCallCount() int { - fake.usePSRPCMutex.RLock() - defer fake.usePSRPCMutex.RUnlock() - return len(fake.usePSRPCArgsForCall) -} - -func (fake *FakeEgressStore) UsePSRPCCalls(stub func() bool) { - fake.usePSRPCMutex.Lock() - defer fake.usePSRPCMutex.Unlock() - fake.UsePSRPCStub = stub -} - -func (fake *FakeEgressStore) UsePSRPCReturns(result1 bool) { - fake.usePSRPCMutex.Lock() - defer fake.usePSRPCMutex.Unlock() - fake.UsePSRPCStub = nil - fake.usePSRPCReturns = struct { - result1 bool - }{result1} -} - -func (fake *FakeEgressStore) UsePSRPCReturnsOnCall(i int, result1 bool) { - fake.usePSRPCMutex.Lock() - defer fake.usePSRPCMutex.Unlock() - fake.UsePSRPCStub = nil - if fake.usePSRPCReturnsOnCall == nil { - fake.usePSRPCReturnsOnCall = make(map[int]struct { - result1 bool - }) - } - fake.usePSRPCReturnsOnCall[i] = struct { - result1 bool - }{result1} -} - func (fake *FakeEgressStore) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -394,8 +331,6 @@ func (fake *FakeEgressStore) Invocations() map[string][][]interface{} { defer fake.storeEgressMutex.RUnlock() fake.updateEgressMutex.RLock() defer fake.updateEgressMutex.RUnlock() - fake.usePSRPCMutex.RLock() - defer fake.usePSRPCMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value