Make an IsDisconnected interface and use it (#1278)

This commit is contained in:
Raja Subramanian
2022-12-31 12:53:02 +05:30
committed by GitHub
parent 9b8311ffcb
commit 4ba7e57683
7 changed files with 86 additions and 81 deletions
+11 -7
View File
@@ -265,6 +265,10 @@ func (p *ParticipantImpl) IsReady() bool {
return state == livekit.ParticipantInfo_JOINED || state == livekit.ParticipantInfo_ACTIVE
}
func (p *ParticipantImpl) IsDisconnected() bool {
return p.State() == livekit.ParticipantInfo_DISCONNECTED
}
func (p *ParticipantImpl) ConnectedAt() time.Time {
return p.connectedAt
}
@@ -719,7 +723,7 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool {
p.migrationTimer = time.AfterFunc(migrationWaitDuration, func() {
p.clearMigrationTimer()
if p.isClosed.Load() || p.State() == livekit.ParticipantInfo_DISCONNECTED {
if p.isClosed.Load() || p.IsDisconnected() {
return
}
// TODO: change to debug once we are confident
@@ -1216,7 +1220,7 @@ func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription) err
// when a new remoteTrack is created, creates a Track and adds it to room
func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
if p.IsDisconnected() {
return
}
@@ -1254,7 +1258,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
}
func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byte) {
if p.State() == livekit.ParticipantInfo_DISCONNECTED || !p.CanPublishData() {
if p.IsDisconnected() || !p.CanPublishData() {
return
}
@@ -1285,7 +1289,7 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt
}
func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) error {
if c == nil || p.State() == livekit.ParticipantInfo_DISCONNECTED {
if c == nil || p.IsDisconnected() {
return nil
}
@@ -1333,7 +1337,7 @@ func (p *ParticipantImpl) setupDisconnectTimer() {
p.disconnectTimer = time.AfterFunc(disconnectCleanupDuration, func() {
p.clearDisconnectTimer()
if p.isClosed.Load() || p.State() == livekit.ParticipantInfo_DISCONNECTED {
if p.isClosed.Load() || p.IsDisconnected() {
return
}
p.params.Logger.Infow("closing disconnected participant")
@@ -1355,7 +1359,7 @@ func (p *ParticipantImpl) onAnyTransportFailed() {
func (p *ParticipantImpl) subscriberRTCPWorker() {
defer Recover()
for {
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
if p.IsDisconnected() {
return
}
@@ -2050,7 +2054,7 @@ func (p *ParticipantImpl) onAnyTransportNegotiationFailed() {
func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, sourceTrack types.MediaTrack, isRelayed bool, f func(sub types.LocalParticipant) error) bool {
// do not queue subscription is participant is already closed/disconnected
if p.isClosed.Load() || p.State() == livekit.ParticipantInfo_DISCONNECTED {
if p.isClosed.Load() || p.IsDisconnected() {
return false
}
+6 -1
View File
@@ -66,6 +66,11 @@ func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) e
func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.ParticipantInfo) error {
p.updateLock.Lock()
if p.IsDisconnected() {
p.updateLock.Unlock()
return nil
}
if !p.IsReady() {
// queue up updates
p.queuedUpdates = append(p.queuedUpdates, participantsToUpdate...)
@@ -201,7 +206,7 @@ func (p *ParticipantImpl) sendTrackUnpublished(trackID livekit.TrackID) {
}
func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
if p.State() == livekit.ParticipantInfo_DISCONNECTED || (!p.IsReady() && msg.GetJoin() == nil) {
if p.IsDisconnected() || (!p.IsReady() && msg.GetJoin() == nil) {
return nil
}
+1 -6
View File
@@ -398,7 +398,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek
}
// send broadcast only if it's not already closed
sendUpdates := p.State() != livekit.ParticipantInfo_DISCONNECTED
sendUpdates := !p.IsDisconnected()
p.OnTrackUpdated(nil)
p.OnTrackPublished(nil)
@@ -873,11 +873,6 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas
func (r *Room) sendParticipantUpdates(updates []*livekit.ParticipantInfo) {
for _, op := range r.GetParticipants() {
// skip closed participants
if op.State() == livekit.ParticipantInfo_DISCONNECTED {
continue
}
err := op.SendParticipantUpdate(updates)
if err != nil {
r.Logger.Errorw("could not send update to participant", err,
+1
View File
@@ -233,6 +233,7 @@ type LocalParticipant interface {
ConnectedAt() time.Time
State() livekit.ParticipantInfo_State
IsReady() bool
IsDisconnected() bool
SubscriberAsPrimary() bool
GetClientConfiguration() *livekit.ClientConfiguration
GetICEConnectionType() ICEConnectionType
@@ -368,6 +368,16 @@ type FakeLocalParticipant struct {
identityReturnsOnCall map[int]struct {
result1 livekit.ParticipantIdentity
}
IsDisconnectedStub func() bool
isDisconnectedMutex sync.RWMutex
isDisconnectedArgsForCall []struct {
}
isDisconnectedReturns struct {
result1 bool
}
isDisconnectedReturnsOnCall map[int]struct {
result1 bool
}
IsPublisherStub func() bool
isPublisherMutex sync.RWMutex
isPublisherArgsForCall []struct {
@@ -2609,6 +2619,59 @@ func (fake *FakeLocalParticipant) IdentityReturnsOnCall(i int, result1 livekit.P
}{result1}
}
func (fake *FakeLocalParticipant) IsDisconnected() bool {
fake.isDisconnectedMutex.Lock()
ret, specificReturn := fake.isDisconnectedReturnsOnCall[len(fake.isDisconnectedArgsForCall)]
fake.isDisconnectedArgsForCall = append(fake.isDisconnectedArgsForCall, struct {
}{})
stub := fake.IsDisconnectedStub
fakeReturns := fake.isDisconnectedReturns
fake.recordInvocation("IsDisconnected", []interface{}{})
fake.isDisconnectedMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) IsDisconnectedCallCount() int {
fake.isDisconnectedMutex.RLock()
defer fake.isDisconnectedMutex.RUnlock()
return len(fake.isDisconnectedArgsForCall)
}
func (fake *FakeLocalParticipant) IsDisconnectedCalls(stub func() bool) {
fake.isDisconnectedMutex.Lock()
defer fake.isDisconnectedMutex.Unlock()
fake.IsDisconnectedStub = stub
}
func (fake *FakeLocalParticipant) IsDisconnectedReturns(result1 bool) {
fake.isDisconnectedMutex.Lock()
defer fake.isDisconnectedMutex.Unlock()
fake.IsDisconnectedStub = nil
fake.isDisconnectedReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) IsDisconnectedReturnsOnCall(i int, result1 bool) {
fake.isDisconnectedMutex.Lock()
defer fake.isDisconnectedMutex.Unlock()
fake.IsDisconnectedStub = nil
if fake.isDisconnectedReturnsOnCall == nil {
fake.isDisconnectedReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.isDisconnectedReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) IsPublisher() bool {
fake.isPublisherMutex.Lock()
ret, specificReturn := fake.isPublisherReturnsOnCall[len(fake.isPublisherArgsForCall)]
@@ -5050,6 +5113,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.iDMutex.RUnlock()
fake.identityMutex.RLock()
defer fake.identityMutex.RUnlock()
fake.isDisconnectedMutex.RLock()
defer fake.isDisconnectedMutex.RUnlock()
fake.isPublisherMutex.RLock()
defer fake.isPublisherMutex.RUnlock()
fake.isReadyMutex.RLock()
+2 -2
View File
@@ -434,7 +434,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
})
newRoom.OnParticipantChanged(func(p types.LocalParticipant) {
if p.State() != livekit.ParticipantInfo_DISCONNECTED {
if !p.IsDisconnected() {
if err := r.roomStore.StoreParticipant(ctx, roomName, p.ToProto()); err != nil {
newRoom.Logger.Errorw("could not handle participant change", err)
}
@@ -483,7 +483,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
select {
case <-stateCheckTicker.C:
// periodic check to ensure participant didn't become disconnected
if participant.State() == livekit.ParticipantInfo_DISCONNECTED {
if participant.IsDisconnected() {
return
}
case <-tokenTicker.C:
@@ -62,16 +62,6 @@ type FakeEgressStore struct {
updateEgressReturnsOnCall map[int]struct {
result1 error
}
UsePSRPCStub func() bool
usePSRPCMutex sync.RWMutex
usePSRPCArgsForCall []struct {
}
usePSRPCReturns struct {
result1 bool
}
usePSRPCReturnsOnCall map[int]struct {
result1 bool
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
@@ -330,59 +320,6 @@ func (fake *FakeEgressStore) UpdateEgressReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeEgressStore) UsePSRPC() bool {
fake.usePSRPCMutex.Lock()
ret, specificReturn := fake.usePSRPCReturnsOnCall[len(fake.usePSRPCArgsForCall)]
fake.usePSRPCArgsForCall = append(fake.usePSRPCArgsForCall, struct {
}{})
stub := fake.UsePSRPCStub
fakeReturns := fake.usePSRPCReturns
fake.recordInvocation("UsePSRPC", []interface{}{})
fake.usePSRPCMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeEgressStore) UsePSRPCCallCount() int {
fake.usePSRPCMutex.RLock()
defer fake.usePSRPCMutex.RUnlock()
return len(fake.usePSRPCArgsForCall)
}
func (fake *FakeEgressStore) UsePSRPCCalls(stub func() bool) {
fake.usePSRPCMutex.Lock()
defer fake.usePSRPCMutex.Unlock()
fake.UsePSRPCStub = stub
}
func (fake *FakeEgressStore) UsePSRPCReturns(result1 bool) {
fake.usePSRPCMutex.Lock()
defer fake.usePSRPCMutex.Unlock()
fake.UsePSRPCStub = nil
fake.usePSRPCReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeEgressStore) UsePSRPCReturnsOnCall(i int, result1 bool) {
fake.usePSRPCMutex.Lock()
defer fake.usePSRPCMutex.Unlock()
fake.UsePSRPCStub = nil
if fake.usePSRPCReturnsOnCall == nil {
fake.usePSRPCReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.usePSRPCReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeEgressStore) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
@@ -394,8 +331,6 @@ func (fake *FakeEgressStore) Invocations() map[string][][]interface{} {
defer fake.storeEgressMutex.RUnlock()
fake.updateEgressMutex.RLock()
defer fake.updateEgressMutex.RUnlock()
fake.usePSRPCMutex.RLock()
defer fake.usePSRPCMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value