From 00558dee5c3d403c731a5db373477e05d5a6fcfb Mon Sep 17 00:00:00 2001
From: Raja Subramanian
Date: Thu, 22 Jun 2023 10:09:10 +0530
Subject: [PATCH] Close participant on full reconnect. (#1818)

* Close participant on full reconnect.

A full reconnect == irrecoverable error. The participant cannot continue.
So, close the participant when issuing a full reconnect. That should
prevent the subscription manager from reconciling until the participant
is finally closed down once it goes stale.

* format
---
 pkg/rtc/participant.go                       | 20 +++----
 pkg/rtc/room.go                              | 10 ++--
 pkg/rtc/types/interfaces.go                  |  2 +-
 .../typesfakes/fake_local_participant.go     | 18 +++---
 pkg/rtc/types/typesfakes/fake_participant.go | 18 +++---
 pkg/service/roommanager.go                   | 55 +++++++++++++++----
 6 files changed, 79 insertions(+), 44 deletions(-)

diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go
index e9bf6bbc8..889619c76 100644
--- a/pkg/rtc/participant.go
+++ b/pkg/rtc/participant.go
@@ -650,13 +650,13 @@ func (p *ParticipantImpl) Start() {
 	})
 }
 
-func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseReason) error {
+func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseReason, isExpectedToResume bool) error {
 	if p.isClosed.Swap(true) {
 		// already closed
 		return nil
 	}
 
-	p.params.Logger.Infow("participant closing", "sendLeave", sendLeave, "reason", reason.String())
+	p.params.Logger.Infow("participant closing", "sendLeave", sendLeave, "reason", reason.String(), "isExpectedToResume", isExpectedToResume)
 	p.clearDisconnectTimer()
 	p.clearMigrationTimer()
 
@@ -680,10 +680,10 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
 	p.pendingTracksLock.Unlock()
 
 	for _, t := range closeMutedTrack {
-		t.Close(!sendLeave)
+		t.Close(isExpectedToResume)
 	}
 
-	p.UpTrackManager.Close(!sendLeave)
+	p.UpTrackManager.Close(isExpectedToResume)
 
 	p.updateState(livekit.ParticipantInfo_DISCONNECTED)
 
@@ -699,7 +699,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
 	// Close peer connections without blocking participant Close. If peer connections are gathering candidates
 	// Close will block.
 	go func() {
-		p.SubscriptionManager.Close(!sendLeave)
+		p.SubscriptionManager.Close(isExpectedToResume)
 		p.TransportManager.Close()
 	}()
 
@@ -760,7 +760,7 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool {
 	p.migrationTimer = time.AfterFunc(migrationWaitDuration, func() {
 		p.clearMigrationTimer()
 
-		if p.isClosed.Load() || p.IsDisconnected() {
+		if p.IsClosed() || p.IsDisconnected() {
 			return
 		}
 		// TODO: change to debug once we are confident
@@ -1338,11 +1338,11 @@ func (p *ParticipantImpl) setupDisconnectTimer() {
 	p.disconnectTimer = time.AfterFunc(disconnectCleanupDuration, func() {
 		p.clearDisconnectTimer()
 
-		if p.isClosed.Load() || p.IsDisconnected() {
+		if p.IsClosed() || p.IsDisconnected() {
 			return
 		}
 		p.params.Logger.Infow("closing disconnected participant")
-		_ = p.Close(true, types.ParticipantCloseReasonPeerConnectionDisconnected)
+		_ = p.Close(true, types.ParticipantCloseReasonPeerConnectionDisconnected, false)
 	})
 	p.lock.Unlock()
 }
@@ -2082,8 +2082,8 @@ func (p *ParticipantImpl) IssueFullReconnect(reason types.ParticipantCloseReason
 	}
 	p.CloseSignalConnection(scr)
 
-	// on a full reconnect, no need to supervise this participant anymore
-	p.supervisor.Stop()
+	// a full reconnect == client should connect back with a new session, close current one
+	p.Close(false, reason, false)
 }
 
 func (p *ParticipantImpl) onPublicationError(trackID livekit.TrackID) {
diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go
index 8d343a98f..df9db3764 100644
--- a/pkg/rtc/room.go
+++ b/pkg/rtc/room.go
@@ -487,7 +487,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek
 
 	// close participant as well
 	r.Logger.Debugw("closing participant for removal", "pID", p.ID(), "participant", p.Identity())
-	_ = p.Close(true, reason)
+	_ = p.Close(true, reason, false)
 
 	r.leftAt.Store(time.Now().Unix())
 
@@ -622,7 +622,7 @@ func (r *Room) Close() {
 	r.lock.Unlock()
 	r.Logger.Infow("closing room")
 	for _, p := range r.GetParticipants() {
-		_ = p.Close(true, types.ParticipantCloseReasonRoomClose)
+		_ = p.Close(true, types.ParticipantCloseReasonRoomClose, false)
 	}
 	r.protoProxy.Stop()
 	if r.onClose != nil {
@@ -705,18 +705,18 @@ func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScen
 	case *livekit.SimulateScenario_Migration:
 		r.Logger.Infow("simulating migration", "participant", participant.Identity())
 		// drop participant without necessarily cleaning up
-		if err := participant.Close(false, types.ParticipantCloseReasonSimulateMigration); err != nil {
+		if err := participant.Close(false, types.ParticipantCloseReasonSimulateMigration, true); err != nil {
 			return err
 		}
 	case *livekit.SimulateScenario_NodeFailure:
 		r.Logger.Infow("simulating node failure", "participant", participant.Identity())
 		// drop participant without necessarily cleaning up
-		if err := participant.Close(false, types.ParticipantCloseReasonSimulateNodeFailure); err != nil {
+		if err := participant.Close(false, types.ParticipantCloseReasonSimulateNodeFailure, true); err != nil {
 			return err
 		}
 	case *livekit.SimulateScenario_ServerLeave:
 		r.Logger.Infow("simulating server leave", "participant", participant.Identity())
-		if err := participant.Close(true, types.ParticipantCloseReasonSimulateServerLeave); err != nil {
+		if err := participant.Close(true, types.ParticipantCloseReasonSimulateServerLeave, false); err != nil {
 			return err
 		}
 	case *livekit.SimulateScenario_SwitchCandidateProtocol:
diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go
index d9b887041..7370bfa1b 100644
---
a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -237,7 +237,7 @@ type Participant interface { IsRecorder() bool Start() - Close(sendLeave bool, reason ParticipantCloseReason) error + Close(sendLeave bool, reason ParticipantCloseReason, isExpectedToResume bool) error SubscriptionPermission() (*livekit.SubscriptionPermission, utils.TimedVersion) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 235994fa9..f6882fe93 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -119,11 +119,12 @@ type FakeLocalParticipant struct { claimGrantsReturnsOnCall map[int]struct { result1 *auth.ClaimGrants } - CloseStub func(bool, types.ParticipantCloseReason) error + CloseStub func(bool, types.ParticipantCloseReason, bool) error closeMutex sync.RWMutex closeArgsForCall []struct { arg1 bool arg2 types.ParticipantCloseReason + arg3 bool } closeReturns struct { result1 error @@ -1368,19 +1369,20 @@ func (fake *FakeLocalParticipant) ClaimGrantsReturnsOnCall(i int, result1 *auth. }{result1} } -func (fake *FakeLocalParticipant) Close(arg1 bool, arg2 types.ParticipantCloseReason) error { +func (fake *FakeLocalParticipant) Close(arg1 bool, arg2 types.ParticipantCloseReason, arg3 bool) error { fake.closeMutex.Lock() ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)] fake.closeArgsForCall = append(fake.closeArgsForCall, struct { arg1 bool arg2 types.ParticipantCloseReason - }{arg1, arg2}) + arg3 bool + }{arg1, arg2, arg3}) stub := fake.CloseStub fakeReturns := fake.closeReturns - fake.recordInvocation("Close", []interface{}{arg1, arg2}) + fake.recordInvocation("Close", []interface{}{arg1, arg2, arg3}) fake.closeMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1 @@ -1394,17 +1396,17 @@ func (fake *FakeLocalParticipant) CloseCallCount() int { return len(fake.closeArgsForCall) } -func (fake *FakeLocalParticipant) CloseCalls(stub func(bool, types.ParticipantCloseReason) error) { +func (fake *FakeLocalParticipant) CloseCalls(stub func(bool, types.ParticipantCloseReason, bool) error) { fake.closeMutex.Lock() defer fake.closeMutex.Unlock() fake.CloseStub = stub } -func (fake *FakeLocalParticipant) CloseArgsForCall(i int) (bool, types.ParticipantCloseReason) { +func (fake *FakeLocalParticipant) CloseArgsForCall(i int) (bool, types.ParticipantCloseReason, bool) { fake.closeMutex.RLock() defer fake.closeMutex.RUnlock() argsForCall := fake.closeArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeLocalParticipant) CloseReturns(result1 error) { diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index c32b8a6ed..a660644cb 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -20,11 +20,12 @@ type FakeParticipant struct { canSkipBroadcastReturnsOnCall map[int]struct { result1 bool } - CloseStub func(bool, types.ParticipantCloseReason) error + CloseStub func(bool, types.ParticipantCloseReason, bool) error closeMutex sync.RWMutex closeArgsForCall []struct { arg1 bool arg2 types.ParticipantCloseReason + arg3 bool } closeReturns struct { result1 error @@ -260,19 +261,20 @@ func (fake *FakeParticipant) CanSkipBroadcastReturnsOnCall(i int, result1 bool) }{result1} } -func (fake 
*FakeParticipant) Close(arg1 bool, arg2 types.ParticipantCloseReason) error {
+func (fake *FakeParticipant) Close(arg1 bool, arg2 types.ParticipantCloseReason, arg3 bool) error {
 	fake.closeMutex.Lock()
 	ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)]
 	fake.closeArgsForCall = append(fake.closeArgsForCall, struct {
 		arg1 bool
 		arg2 types.ParticipantCloseReason
-	}{arg1, arg2})
+		arg3 bool
+	}{arg1, arg2, arg3})
 	stub := fake.CloseStub
 	fakeReturns := fake.closeReturns
-	fake.recordInvocation("Close", []interface{}{arg1, arg2})
+	fake.recordInvocation("Close", []interface{}{arg1, arg2, arg3})
 	fake.closeMutex.Unlock()
 	if stub != nil {
-		return stub(arg1, arg2)
+		return stub(arg1, arg2, arg3)
 	}
 	if specificReturn {
 		return ret.result1
@@ -286,17 +288,17 @@ func (fake *FakeParticipant) CloseCallCount() int {
 	return len(fake.closeArgsForCall)
 }
 
-func (fake *FakeParticipant) CloseCalls(stub func(bool, types.ParticipantCloseReason) error) {
+func (fake *FakeParticipant) CloseCalls(stub func(bool, types.ParticipantCloseReason, bool) error) {
 	fake.closeMutex.Lock()
 	defer fake.closeMutex.Unlock()
 	fake.CloseStub = stub
 }
 
-func (fake *FakeParticipant) CloseArgsForCall(i int) (bool, types.ParticipantCloseReason) {
+func (fake *FakeParticipant) CloseArgsForCall(i int) (bool, types.ParticipantCloseReason, bool) {
 	fake.closeMutex.RLock()
 	defer fake.closeMutex.RUnlock()
 	argsForCall := fake.closeArgsForCall[i]
-	return argsForCall.arg1, argsForCall.arg2
+	return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
 }
 
 func (fake *FakeParticipant) CloseReturns(result1 error) {
diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go
index 4e3997d15..bd09448dd 100644
--- a/pkg/service/roommanager.go
+++ b/pkg/service/roommanager.go
@@ -194,7 +194,7 @@ func (r *RoomManager) Stop() {
 
 	for _, room := range rooms {
 		for _, p := range room.GetParticipants() {
-			_ = p.Close(true, types.ParticipantCloseReasonRoomManagerStop)
+			_ = p.Close(true, types.ParticipantCloseReasonRoomManagerStop, false)
 		}
 		room.Close()
 	}
@@ -229,11 +229,35 @@ func (r *RoomManager) StartSession(
 	if pi.Identity == "" {
 		return nil
 	}
+
 	participant := room.GetParticipant(pi.Identity)
 	if participant != nil {
-		// When reconnecting, it means WS has interrupted by underlying peer connection is still ok
-		// in this mode, we'll keep the participant SID, and just swap the sink for the underlying connection
+		// When reconnecting, it means the WS was interrupted but the underlying peer connection is still ok. In this state,
+		// we'll keep the participant SID and just swap the sink for the underlying connection.
 		if pi.Reconnect {
+			if participant.IsClosed() {
+				// Send a leave request if the participant is closed, i.e. handle the case of a client trying to resume crossing wires with
+				// the server closing the participant due to some irrecoverable condition. Such a condition would have triggered
+				// a full reconnect when it occurred.
+				//
+				// It is possible that the client did not get that request. So, send it again.
+ logger.Infow("cannot restart a closed participant", + "room", roomName, + "nodeID", r.currentNode.Id, + "participant", pi.Identity, + "reason", pi.ReconnectReason, + ) + _ = responseSink.WriteMessage(&livekit.SignalResponse{ + Message: &livekit.SignalResponse_Leave{ + Leave: &livekit.LeaveRequest{ + CanReconnect: true, + Reason: livekit.DisconnectReason_STATE_MISMATCH, + }, + }, + }) + return errors.New("could not restart closed participant") + } + logger.Infow("resuming RTC session", "room", roomName, "nodeID", r.currentNode.Id, @@ -244,20 +268,27 @@ func (r *RoomManager) StartSession( if iceConfig == nil { iceConfig = &livekit.ICEConfig{} } - if err = room.ResumeParticipant(participant, requestSource, responseSink, - r.iceServersForRoom(protoRoom, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS), - pi.ReconnectReason); err != nil { + if err = room.ResumeParticipant( + participant, + requestSource, + responseSink, + r.iceServersForRoom( + protoRoom, + iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS, + ), + pi.ReconnectReason, + ); err != nil { logger.Warnw("could not resume participant", err, "participant", pi.Identity) return err } r.telemetry.ParticipantResumed(ctx, room.ToProto(), participant.ToProto(), livekit.NodeID(r.currentNode.Id), pi.ReconnectReason) go r.rtcSessionWorker(room, participant, requestSource) return nil - } else { - participant.GetLogger().Infow("removing duplicate participant") - // we need to clean up the existing participant, so a new one can join - room.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonDuplicateIdentity) } + + // we need to clean up the existing participant, so a new one can join + participant.GetLogger().Infow("removing duplicate participant") + room.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonDuplicateIdentity) } else if pi.Reconnect { // send leave request if participant is trying to reconnect without keep subscribe state // but missing from the room @@ -356,7 +387,7 @@ func (r *RoomManager) StartSession( } if err = room.Join(participant, requestSource, &opts, r.iceServersForRoom(protoRoom, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS)); err != nil { pLogger.Errorw("could not join room", err) - _ = participant.Close(true, types.ParticipantCloseReasonJoinFailed) + _ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false) return err } if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil { @@ -598,7 +629,7 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.Roo case *livekit.RTCNodeMessage_DeleteRoom: room.Logger.Infow("deleting room") for _, p := range room.GetParticipants() { - _ = p.Close(true, types.ParticipantCloseReasonServiceRequestDeleteRoom) + _ = p.Close(true, types.ParticipantCloseReasonServiceRequestDeleteRoom, false) } room.Close() case *livekit.RTCNodeMessage_UpdateSubscriptions:
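
Reviewer note on the API shape: before this patch, the downstream Close calls derived their flag as !sendLeave, which conflated "no leave request was sent" with "the client is expected to resume". A full reconnect breaks that equivalence: no leave goes out over the old signal path, yet resumable state must not be left behind. The standalone sketch below tabulates the flag combinations at the touched call sites; the closeParams struct and the table are illustrative only, not part of the codebase.

package main

import "fmt"

// closeParams models the two flags that ParticipantImpl.Close now takes
// explicitly (illustrative; the patch threads real arguments through the
// call sites shown in the diff above).
type closeParams struct {
	sendLeave          bool // tell the client to leave the session
	isExpectedToResume bool // downstream managers keep resumable state
}

func main() {
	callSites := []struct {
		name string
		p    closeParams
	}{
		{"room close / removal / manager stop", closeParams{sendLeave: true, isExpectedToResume: false}},
		{"simulated migration / node failure", closeParams{sendLeave: false, isExpectedToResume: true}},
		{"full reconnect (new in this patch)", closeParams{sendLeave: false, isExpectedToResume: false}},
	}
	for _, cs := range callSites {
		// What the pre-patch code would have passed downstream:
		derived := !cs.p.sendLeave
		fmt.Printf("%-36s sendLeave=%-5v isExpectedToResume=%-5v (old derived value: %v)\n",
			cs.name, cs.p.sendLeave, cs.p.isExpectedToResume, derived)
	}
}

Only the full-reconnect row disagrees with the old derived value, which is exactly why IssueFullReconnect could not simply call Close(false, reason) under the previous signature.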