Close participant on full reconnect. (#1818)

* Close participant on full reconnect.

A full reconnect == irrecoverable error, i.e. the participant cannot continue.
So, close the participant when issuing a full reconnect.
That should prevent the subscription manager from reconciling while a stale
participant is waiting to be finally closed down.

* format
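
In short, the change decouples two questions that were previously conflated as !sendLeave: should the client be told to leave, and is the client expected to resume this session. A toy model of the new decision (illustrative Go only, not LiveKit code; the scenario names are taken from the call sites in this diff):

package main

import "fmt"

// closeParams models the two flags now passed to Participant.Close.
type closeParams struct {
	sendLeave          bool
	isExpectedToResume bool
}

func paramsFor(scenario string) closeParams {
	switch scenario {
	case "room close", "server leave", "remove participant":
		// client is told to leave; nothing will be resumed
		return closeParams{sendLeave: true, isExpectedToResume: false}
	case "simulate migration", "simulate node failure":
		// no leave is sent and the client should resume the same session
		return closeParams{sendLeave: false, isExpectedToResume: true}
	case "full reconnect":
		// no leave is sent, but the client must start a NEW session;
		// deriving "resume" from !sendLeave would get this case wrong
		return closeParams{sendLeave: false, isExpectedToResume: false}
	}
	return closeParams{}
}

func main() {
	for _, s := range []string{"room close", "simulate migration", "full reconnect"} {
		fmt.Printf("%-18s -> %+v\n", s, paramsFor(s))
	}
}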
Author: Raja Subramanian
Date: 2023-06-22 10:09:10 +05:30
Committed by: GitHub
Parent: 2438058474
Commit: 00558dee5c
6 changed files with 79 additions and 44 deletions
+10 -10
@@ -650,13 +650,13 @@ func (p *ParticipantImpl) Start() {
})
}
-func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseReason) error {
+func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseReason, isExpectedToResume bool) error {
if p.isClosed.Swap(true) {
// already closed
return nil
}
-p.params.Logger.Infow("participant closing", "sendLeave", sendLeave, "reason", reason.String())
+p.params.Logger.Infow("participant closing", "sendLeave", sendLeave, "reason", reason.String(), "isExpectedToResume", isExpectedToResume)
p.clearDisconnectTimer()
p.clearMigrationTimer()
@@ -680,10 +680,10 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
p.pendingTracksLock.Unlock()
for _, t := range closeMutedTrack {
-t.Close(!sendLeave)
+t.Close(isExpectedToResume)
}
-p.UpTrackManager.Close(!sendLeave)
+p.UpTrackManager.Close(isExpectedToResume)
p.updateState(livekit.ParticipantInfo_DISCONNECTED)
@@ -699,7 +699,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
// Close peer connections without blocking participant Close. If peer connections are gathering candidates
// Close will block.
go func() {
-p.SubscriptionManager.Close(!sendLeave)
+p.SubscriptionManager.Close(isExpectedToResume)
p.TransportManager.Close()
}()
@@ -760,7 +760,7 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool {
p.migrationTimer = time.AfterFunc(migrationWaitDuration, func() {
p.clearMigrationTimer()
-if p.isClosed.Load() || p.IsDisconnected() {
+if p.IsClosed() || p.IsDisconnected() {
return
}
// TODO: change to debug once we are confident
@@ -1338,11 +1338,11 @@ func (p *ParticipantImpl) setupDisconnectTimer() {
p.disconnectTimer = time.AfterFunc(disconnectCleanupDuration, func() {
p.clearDisconnectTimer()
-if p.isClosed.Load() || p.IsDisconnected() {
+if p.IsClosed() || p.IsDisconnected() {
return
}
p.params.Logger.Infow("closing disconnected participant")
-_ = p.Close(true, types.ParticipantCloseReasonPeerConnectionDisconnected)
+_ = p.Close(true, types.ParticipantCloseReasonPeerConnectionDisconnected, false)
})
p.lock.Unlock()
}
@@ -2082,8 +2082,8 @@ func (p *ParticipantImpl) IssueFullReconnect(reason types.ParticipantCloseReason
}
p.CloseSignalConnection(scr)
-// on a full reconnect, no need to supervise this participant anymore
-p.supervisor.Stop()
+// a full reconnect == client should connect back with a new session, close current one
+p.Close(false, reason, false)
}
func (p *ParticipantImpl) onPublicationError(trackID livekit.TrackID) {
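
The p.isClosed.Swap(true) guard at the top of Close is what makes it safe for IssueFullReconnect, the disconnect timer, and RemoveParticipant to all race to close the same participant. A generic sketch of that idempotent-close idiom (illustrative Go, not LiveKit code):

package main

import (
	"fmt"
	"sync/atomic"
)

// closer sketches the pattern: Swap(true) returns the previous value, so
// exactly one caller wins the right to run teardown, and background timers
// can cheaply bail out via IsClosed().
type closer struct{ closed atomic.Bool }

func (c *closer) Close() error {
	if c.closed.Swap(true) {
		return nil // already closed by another caller; no-op
	}
	fmt.Println("tearing down")
	return nil
}

func (c *closer) IsClosed() bool { return c.closed.Load() }

func main() {
	var c closer
	_ = c.Close() // prints "tearing down"
	_ = c.Close() // no-op
	fmt.Println("closed:", c.IsClosed())
}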
+5 -5
@@ -487,7 +487,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek
// close participant as well
r.Logger.Debugw("closing participant for removal", "pID", p.ID(), "participant", p.Identity())
-_ = p.Close(true, reason)
+_ = p.Close(true, reason, false)
r.leftAt.Store(time.Now().Unix())
@@ -622,7 +622,7 @@ func (r *Room) Close() {
r.lock.Unlock()
r.Logger.Infow("closing room")
for _, p := range r.GetParticipants() {
-_ = p.Close(true, types.ParticipantCloseReasonRoomClose)
+_ = p.Close(true, types.ParticipantCloseReasonRoomClose, false)
}
r.protoProxy.Stop()
if r.onClose != nil {
@@ -705,18 +705,18 @@ func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScen
case *livekit.SimulateScenario_Migration:
r.Logger.Infow("simulating migration", "participant", participant.Identity())
// drop participant without necessarily cleaning up
-if err := participant.Close(false, types.ParticipantCloseReasonSimulateMigration); err != nil {
+if err := participant.Close(false, types.ParticipantCloseReasonSimulateMigration, true); err != nil {
return err
}
case *livekit.SimulateScenario_NodeFailure:
r.Logger.Infow("simulating node failure", "participant", participant.Identity())
// drop participant without necessarily cleaning up
-if err := participant.Close(false, types.ParticipantCloseReasonSimulateNodeFailure); err != nil {
+if err := participant.Close(false, types.ParticipantCloseReasonSimulateNodeFailure, true); err != nil {
return err
}
case *livekit.SimulateScenario_ServerLeave:
r.Logger.Infow("simulating server leave", "participant", participant.Identity())
-if err := participant.Close(true, types.ParticipantCloseReasonSimulateServerLeave); err != nil {
+if err := participant.Close(true, types.ParticipantCloseReasonSimulateServerLeave, false); err != nil {
return err
}
case *livekit.SimulateScenario_SwitchCandidateProtocol:
+1 -1
@@ -237,7 +237,7 @@ type Participant interface {
IsRecorder() bool
Start()
-Close(sendLeave bool, reason ParticipantCloseReason) error
+Close(sendLeave bool, reason ParticipantCloseReason, isExpectedToResume bool) error
SubscriptionPermission() (*livekit.SubscriptionPermission, utils.TimedVersion)
+10 -8
@@ -119,11 +119,12 @@ type FakeLocalParticipant struct {
claimGrantsReturnsOnCall map[int]struct {
result1 *auth.ClaimGrants
}
-CloseStub func(bool, types.ParticipantCloseReason) error
+CloseStub func(bool, types.ParticipantCloseReason, bool) error
closeMutex sync.RWMutex
closeArgsForCall []struct {
arg1 bool
arg2 types.ParticipantCloseReason
+arg3 bool
}
closeReturns struct {
result1 error
@@ -1368,19 +1369,20 @@ func (fake *FakeLocalParticipant) ClaimGrantsReturnsOnCall(i int, result1 *auth.
}{result1}
}
-func (fake *FakeLocalParticipant) Close(arg1 bool, arg2 types.ParticipantCloseReason) error {
+func (fake *FakeLocalParticipant) Close(arg1 bool, arg2 types.ParticipantCloseReason, arg3 bool) error {
fake.closeMutex.Lock()
ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)]
fake.closeArgsForCall = append(fake.closeArgsForCall, struct {
arg1 bool
arg2 types.ParticipantCloseReason
-}{arg1, arg2})
+arg3 bool
+}{arg1, arg2, arg3})
stub := fake.CloseStub
fakeReturns := fake.closeReturns
-fake.recordInvocation("Close", []interface{}{arg1, arg2})
+fake.recordInvocation("Close", []interface{}{arg1, arg2, arg3})
fake.closeMutex.Unlock()
if stub != nil {
-return stub(arg1, arg2)
+return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
@@ -1394,17 +1396,17 @@ func (fake *FakeLocalParticipant) CloseCallCount() int {
return len(fake.closeArgsForCall)
}
-func (fake *FakeLocalParticipant) CloseCalls(stub func(bool, types.ParticipantCloseReason) error) {
+func (fake *FakeLocalParticipant) CloseCalls(stub func(bool, types.ParticipantCloseReason, bool) error) {
fake.closeMutex.Lock()
defer fake.closeMutex.Unlock()
fake.CloseStub = stub
}
-func (fake *FakeLocalParticipant) CloseArgsForCall(i int) (bool, types.ParticipantCloseReason) {
+func (fake *FakeLocalParticipant) CloseArgsForCall(i int) (bool, types.ParticipantCloseReason, bool) {
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
argsForCall := fake.closeArgsForCall[i]
-return argsForCall.arg1, argsForCall.arg2
+return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeLocalParticipant) CloseReturns(result1 error) {
+10 -8
@@ -20,11 +20,12 @@ type FakeParticipant struct {
canSkipBroadcastReturnsOnCall map[int]struct {
result1 bool
}
-CloseStub func(bool, types.ParticipantCloseReason) error
+CloseStub func(bool, types.ParticipantCloseReason, bool) error
closeMutex sync.RWMutex
closeArgsForCall []struct {
arg1 bool
arg2 types.ParticipantCloseReason
+arg3 bool
}
closeReturns struct {
result1 error
@@ -260,19 +261,20 @@ func (fake *FakeParticipant) CanSkipBroadcastReturnsOnCall(i int, result1 bool)
}{result1}
}
-func (fake *FakeParticipant) Close(arg1 bool, arg2 types.ParticipantCloseReason) error {
+func (fake *FakeParticipant) Close(arg1 bool, arg2 types.ParticipantCloseReason, arg3 bool) error {
fake.closeMutex.Lock()
ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)]
fake.closeArgsForCall = append(fake.closeArgsForCall, struct {
arg1 bool
arg2 types.ParticipantCloseReason
-}{arg1, arg2})
+arg3 bool
+}{arg1, arg2, arg3})
stub := fake.CloseStub
fakeReturns := fake.closeReturns
-fake.recordInvocation("Close", []interface{}{arg1, arg2})
+fake.recordInvocation("Close", []interface{}{arg1, arg2, arg3})
fake.closeMutex.Unlock()
if stub != nil {
-return stub(arg1, arg2)
+return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
@@ -286,17 +288,17 @@ func (fake *FakeParticipant) CloseCallCount() int {
return len(fake.closeArgsForCall)
}
-func (fake *FakeParticipant) CloseCalls(stub func(bool, types.ParticipantCloseReason) error) {
+func (fake *FakeParticipant) CloseCalls(stub func(bool, types.ParticipantCloseReason, bool) error) {
fake.closeMutex.Lock()
defer fake.closeMutex.Unlock()
fake.CloseStub = stub
}
-func (fake *FakeParticipant) CloseArgsForCall(i int) (bool, types.ParticipantCloseReason) {
+func (fake *FakeParticipant) CloseArgsForCall(i int) (bool, types.ParticipantCloseReason, bool) {
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
argsForCall := fake.closeArgsForCall[i]
-return argsForCall.arg1, argsForCall.arg2
+return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeParticipant) CloseReturns(result1 error) {
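
The two fakes above are counterfeiter-generated, so their regeneration is mechanical. A sketch of how a test might assert on the new argument (the fake API is the one shown above; the import paths are assumptions):

package example

import (
	"testing"

	// assumed import paths for the close reasons and the generated fakes
	"github.com/livekit/livekit-server/pkg/rtc/types"
	"github.com/livekit/livekit-server/pkg/rtc/types/typesfakes"
)

func TestCloseRecordsResumeFlag(t *testing.T) {
	fake := &typesfakes.FakeLocalParticipant{}

	// a simulate-migration style close: no leave sent, resume expected
	_ = fake.Close(false, types.ParticipantCloseReasonSimulateMigration, true)

	sendLeave, _, isExpectedToResume := fake.CloseArgsForCall(0)
	if sendLeave || !isExpectedToResume {
		t.Fatalf("unexpected args: sendLeave=%v isExpectedToResume=%v", sendLeave, isExpectedToResume)
	}
}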
+43 -12
@@ -194,7 +194,7 @@ func (r *RoomManager) Stop() {
for _, room := range rooms {
for _, p := range room.GetParticipants() {
-_ = p.Close(true, types.ParticipantCloseReasonRoomManagerStop)
+_ = p.Close(true, types.ParticipantCloseReasonRoomManagerStop, false)
}
room.Close()
}
@@ -229,11 +229,35 @@ func (r *RoomManager) StartSession(
if pi.Identity == "" {
return nil
}
participant := room.GetParticipant(pi.Identity)
if participant != nil {
-// When reconnecting, it means WS has interrupted by underlying peer connection is still ok
-// in this mode, we'll keep the participant SID, and just swap the sink for the underlying connection
+// When reconnecting, it means WS has interrupted but underlying peer connection is still ok in this state,
+// we'll keep the participant SID, and just swap the sink for the underlying connection
if pi.Reconnect {
+if participant.IsClosed() {
+// Send leave request if participant is closed, i. e. handle the case of client trying to resume crossing wires with
+// server closing the participant due to some irrecoverable condition. Such a condition would have triggered
+// a full reconnect when that condition occurred.
+//
+// It is possible that the client did not get that send request. So, send it again.
+logger.Infow("cannot restart a closed participant",
+"room", roomName,
+"nodeID", r.currentNode.Id,
+"participant", pi.Identity,
+"reason", pi.ReconnectReason,
+)
+_ = responseSink.WriteMessage(&livekit.SignalResponse{
+Message: &livekit.SignalResponse_Leave{
+Leave: &livekit.LeaveRequest{
+CanReconnect: true,
+Reason: livekit.DisconnectReason_STATE_MISMATCH,
+},
+},
+})
+return errors.New("could not restart closed participant")
+}
logger.Infow("resuming RTC session",
"room", roomName,
"nodeID", r.currentNode.Id,
@@ -244,20 +268,27 @@ func (r *RoomManager) StartSession(
if iceConfig == nil {
iceConfig = &livekit.ICEConfig{}
}
-if err = room.ResumeParticipant(participant, requestSource, responseSink,
-r.iceServersForRoom(protoRoom, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS),
-pi.ReconnectReason); err != nil {
+if err = room.ResumeParticipant(
+participant,
+requestSource,
+responseSink,
+r.iceServersForRoom(
+protoRoom,
+iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS,
+),
+pi.ReconnectReason,
+); err != nil {
logger.Warnw("could not resume participant", err, "participant", pi.Identity)
return err
}
r.telemetry.ParticipantResumed(ctx, room.ToProto(), participant.ToProto(), livekit.NodeID(r.currentNode.Id), pi.ReconnectReason)
go r.rtcSessionWorker(room, participant, requestSource)
return nil
-} else {
-participant.GetLogger().Infow("removing duplicate participant")
-// we need to clean up the existing participant, so a new one can join
-room.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonDuplicateIdentity)
+}
+// we need to clean up the existing participant, so a new one can join
+participant.GetLogger().Infow("removing duplicate participant")
+room.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonDuplicateIdentity)
} else if pi.Reconnect {
// send leave request if participant is trying to reconnect without keep subscribe state
// but missing from the room
@@ -356,7 +387,7 @@ func (r *RoomManager) StartSession(
}
if err = room.Join(participant, requestSource, &opts, r.iceServersForRoom(protoRoom, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS)); err != nil {
pLogger.Errorw("could not join room", err)
-_ = participant.Close(true, types.ParticipantCloseReasonJoinFailed)
+_ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false)
return err
}
if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil {
@@ -598,7 +629,7 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.Roo
case *livekit.RTCNodeMessage_DeleteRoom:
room.Logger.Infow("deleting room")
for _, p := range room.GetParticipants() {
-_ = p.Close(true, types.ParticipantCloseReasonServiceRequestDeleteRoom)
+_ = p.Close(true, types.ParticipantCloseReasonServiceRequestDeleteRoom, false)
}
room.Close()
case *livekit.RTCNodeMessage_UpdateSubscriptions:
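
On the client side, the new STATE_MISMATCH leave means a resume attempt against an already-closed participant should fall back to a fresh session rather than retrying the resume. A sketch of handling it (the protocol types are the ones used above; the import path and the startFullReconnect helper are assumptions):

package client

// assumed import path for the LiveKit protocol types used above
import "github.com/livekit/protocol/livekit"

// startFullReconnect is a hypothetical helper standing in for whatever the
// client does to tear down its session and connect back with a new one.
func startFullReconnect() {}

func handleSignal(res *livekit.SignalResponse) {
	switch msg := res.Message.(type) {
	case *livekit.SignalResponse_Leave:
		leave := msg.Leave
		// CanReconnect + STATE_MISMATCH: the server already closed this
		// participant (e.g. after issuing a full reconnect), so resuming is
		// impossible but connecting back with a brand-new session is allowed.
		if leave.GetCanReconnect() && leave.GetReason() == livekit.DisconnectReason_STATE_MISMATCH {
			startFullReconnect()
		}
	}
}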