diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index dcf79f21b..61d32ada7 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -269,7 +269,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions }) } else if state == livekit.ParticipantInfo_DISCONNECTED { // remove participant from room - go r.RemoveParticipant(p.Identity(), types.ParticipantCloseReasonStateDisconnected) + go r.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonStateDisconnected) } }) participant.OnTrackUpdated(r.onTrackUpdated) @@ -317,7 +317,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions time.AfterFunc(time.Minute, func() { state := participant.State() if state == livekit.ParticipantInfo_JOINING || state == livekit.ParticipantInfo_JOINED { - r.RemoveParticipant(participant.Identity(), types.ParticipantCloseReasonJoinTimeout) + r.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonJoinTimeout) } }) @@ -360,10 +360,16 @@ func (r *Room) ResumeParticipant(p types.LocalParticipant, responseSink routing. return nil } -func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, reason types.ParticipantCloseReason) { +func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livekit.ParticipantID, reason types.ParticipantCloseReason) { r.lock.Lock() p, ok := r.participants[identity] if ok { + if pID != "" && p.ID() != pID { + // participant session has been replaced + r.lock.Unlock() + return + } + delete(r.participants, identity) delete(r.participantOpts, identity) if !p.Hidden() { diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 7be24d82c..1cbce27e9 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -55,14 +55,14 @@ func TestJoinedState(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 1}) p0 := rm.GetParticipants()[0] s := time.Now().Unix() - rm.RemoveParticipant(p0.Identity(), types.ParticipantCloseReasonClientRequestLeave) + rm.RemoveParticipant(p0.Identity(), p0.ID(), types.ParticipantCloseReasonClientRequestLeave) require.LessOrEqual(t, s, rm.LastLeftAt()) }) t.Run("LastLeftAt should not be set when there are still participants in the room", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 2}) p0 := rm.GetParticipants()[0] - rm.RemoveParticipant(p0.Identity(), types.ParticipantCloseReasonClientRequestLeave) + rm.RemoveParticipant(p0.Identity(), p0.ID(), types.ParticipantCloseReasonClientRequestLeave) require.EqualValues(t, 0, rm.LastLeftAt()) }) } @@ -120,7 +120,7 @@ func TestRoomJoin(t *testing.T) { disconnectedParticipant := participants[1].(*typesfakes.FakeLocalParticipant) disconnectedParticipant.StateReturns(livekit.ParticipantInfo_DISCONNECTED) - rm.RemoveParticipant(p.Identity(), types.ParticipantCloseReasonStateDisconnected) + rm.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonStateDisconnected) time.Sleep(defaultDelay) require.Equal(t, p, changedParticipant) @@ -333,7 +333,7 @@ func TestRoomClosure(t *testing.T) { p := rm.GetParticipants()[0] // allows immediate close after rm.protoRoom.EmptyTimeout = 0 - rm.RemoveParticipant(p.Identity(), types.ParticipantCloseReasonClientRequestLeave) + rm.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonClientRequestLeave) time.Sleep(defaultDelay) diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index 9a59ae857..c4dad5661 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -62,7 +62,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant } case *livekit.SignalRequest_Leave: pLogger.Infow("client leaving room") - room.RemoveParticipant(participant.Identity(), types.ParticipantCloseReasonClientRequestLeave) + room.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonClientRequestLeave) case *livekit.SignalRequest_UpdateLayers: err := room.UpdateVideoLayers(participant, msg.UpdateLayers) if err != nil { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 54e699bd8..2f7ccf323 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -335,7 +335,7 @@ type LocalParticipant interface { type Room interface { Name() livekit.RoomName ID() livekit.RoomID - RemoveParticipant(identity livekit.ParticipantIdentity, reason ParticipantCloseReason) + RemoveParticipant(identity livekit.ParticipantIdentity, pID livekit.ParticipantID, reason ParticipantCloseReason) UpdateSubscriptions(participant LocalParticipant, trackIDs []livekit.TrackID, participantTracks []*livekit.ParticipantTracks, subscribe bool) error UpdateSubscriptionPermission(participant LocalParticipant, permissions *livekit.SubscriptionPermission) error SyncState(participant LocalParticipant, state *livekit.SyncState) error diff --git a/pkg/rtc/types/typesfakes/fake_room.go b/pkg/rtc/types/typesfakes/fake_room.go index 5c0985c91..6704c60bd 100644 --- a/pkg/rtc/types/typesfakes/fake_room.go +++ b/pkg/rtc/types/typesfakes/fake_room.go @@ -29,11 +29,12 @@ type FakeRoom struct { nameReturnsOnCall map[int]struct { result1 livekit.RoomName } - RemoveParticipantStub func(livekit.ParticipantIdentity, types.ParticipantCloseReason) + RemoveParticipantStub func(livekit.ParticipantIdentity, livekit.ParticipantID, types.ParticipantCloseReason) removeParticipantMutex sync.RWMutex removeParticipantArgsForCall []struct { arg1 livekit.ParticipantIdentity - arg2 types.ParticipantCloseReason + arg2 livekit.ParticipantID + arg3 types.ParticipantCloseReason } SetParticipantPermissionStub func(types.LocalParticipant, *livekit.ParticipantPermission) error setParticipantPermissionMutex sync.RWMutex @@ -219,17 +220,18 @@ func (fake *FakeRoom) NameReturnsOnCall(i int, result1 livekit.RoomName) { }{result1} } -func (fake *FakeRoom) RemoveParticipant(arg1 livekit.ParticipantIdentity, arg2 types.ParticipantCloseReason) { +func (fake *FakeRoom) RemoveParticipant(arg1 livekit.ParticipantIdentity, arg2 livekit.ParticipantID, arg3 types.ParticipantCloseReason) { fake.removeParticipantMutex.Lock() fake.removeParticipantArgsForCall = append(fake.removeParticipantArgsForCall, struct { arg1 livekit.ParticipantIdentity - arg2 types.ParticipantCloseReason - }{arg1, arg2}) + arg2 livekit.ParticipantID + arg3 types.ParticipantCloseReason + }{arg1, arg2, arg3}) stub := fake.RemoveParticipantStub - fake.recordInvocation("RemoveParticipant", []interface{}{arg1, arg2}) + fake.recordInvocation("RemoveParticipant", []interface{}{arg1, arg2, arg3}) fake.removeParticipantMutex.Unlock() if stub != nil { - fake.RemoveParticipantStub(arg1, arg2) + fake.RemoveParticipantStub(arg1, arg2, arg3) } } @@ -239,17 +241,17 @@ func (fake *FakeRoom) RemoveParticipantCallCount() int { return len(fake.removeParticipantArgsForCall) } -func (fake *FakeRoom) RemoveParticipantCalls(stub func(livekit.ParticipantIdentity, types.ParticipantCloseReason)) { +func (fake *FakeRoom) RemoveParticipantCalls(stub func(livekit.ParticipantIdentity, livekit.ParticipantID, types.ParticipantCloseReason)) { fake.removeParticipantMutex.Lock() defer fake.removeParticipantMutex.Unlock() fake.RemoveParticipantStub = stub } -func (fake *FakeRoom) RemoveParticipantArgsForCall(i int) (livekit.ParticipantIdentity, types.ParticipantCloseReason) { +func (fake *FakeRoom) RemoveParticipantArgsForCall(i int) (livekit.ParticipantIdentity, livekit.ParticipantID, types.ParticipantCloseReason) { fake.removeParticipantMutex.RLock() defer fake.removeParticipantMutex.RUnlock() argsForCall := fake.removeParticipantArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeRoom) SetParticipantPermission(arg1 types.LocalParticipant, arg2 *livekit.ParticipantPermission) error { diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 3286b6db7..e3ec703ec 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -242,7 +242,7 @@ func (r *RoomManager) StartSession( } else { participant.GetLogger().Infow("removing duplicate participant") // we need to clean up the existing participant, so a new one can join - room.RemoveParticipant(participant.Identity(), types.ParticipantCloseReasonDuplicateIdentity) + room.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonDuplicateIdentity) } } else if pi.Reconnect { // send leave request if participant is trying to reconnect without keep subscribe state @@ -541,7 +541,8 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.Roo return } pLogger.Infow("removing participant") - room.RemoveParticipant(identity, types.ParticipantCloseReasonServiceRequestRemoveParticipant) + // remove participant by identity, any SID + room.RemoveParticipant(identity, "", types.ParticipantCloseReasonServiceRequestRemoveParticipant) case *livekit.RTCNodeMessage_MuteTrack: if participant == nil { return