From c8b7d486b9ca3435f99823234c46e4fb36a635e1 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 31 Jan 2024 11:36:50 +0530 Subject: [PATCH] Do not synthesise DISCONNECT on session change. (#2412) * Do not synthesise DISCONNECT on session change. v12 clients can handle session change based on identity. * change for testf * Squelch participant update if close reason is DUPLICATE_IDENTITY. * fix test * comment * Clean up participant close reason a bit * fix test * test --- pkg/rtc/participant.go | 16 +++- pkg/rtc/room.go | 88 ++++++++++++------ pkg/rtc/room_test.go | 93 ++++++++++--------- pkg/rtc/types/interfaces.go | 17 ++-- pkg/rtc/types/protocol_version.go | 4 + .../typesfakes/fake_local_participant.go | 65 +++++++++++++ pkg/rtc/types/typesfakes/fake_participant.go | 65 +++++++++++++ pkg/service/roommanager.go | 10 +- test/webhook_test.go | 3 +- 9 files changed, 268 insertions(+), 93 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 52da57e30..8c476d5e7 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -140,9 +140,13 @@ type ParticipantImpl struct { params ParticipantParams isClosed atomic.Bool - state atomic.Value // livekit.ParticipantInfo_State - resSinkMu sync.Mutex - resSink routing.MessageSink + closeReason atomic.Value // types.ParticipantCloseReason + + state atomic.Value // livekit.ParticipantInfo_State + + resSinkMu sync.Mutex + resSink routing.MessageSink + grants *auth.ClaimGrants hidden atomic.Bool isPublisher atomic.Bool @@ -253,6 +257,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { if !params.DisableSupervisor { p.supervisor = supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}) } + p.closeReason.Store(types.ParticipantCloseReasonNone) p.version.Store(params.InitialVersion) p.timedVersion.Update(params.VersionGenerator.New()) p.migrateState.Store(types.MigrateStateInit) @@ -762,6 +767,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea "reason", reason.String(), "isExpectedToResume", isExpectedToResume, ) + p.closeReason.Store(reason) p.clearDisconnectTimer() p.clearMigrationTimer() @@ -812,6 +818,10 @@ func (p *ParticipantImpl) IsClosed() bool { return p.isClosed.Load() } +func (p *ParticipantImpl) CloseReason() types.ParticipantCloseReason { + return p.closeReason.Load().(types.ParticipantCloseReason) +} + // Negotiate subscriber SDP with client, if force is true, will cancel pending // negotiate task and negotiate immediately func (p *ParticipantImpl) Negotiate(force bool) { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 1daf41cb6..4e848a7c5 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -68,6 +68,12 @@ type broadcastOptions struct { immediate bool } +type participantUpdate struct { + pi *livekit.ParticipantInfo + isSynthesizedDisconnect bool + closeReason types.ParticipantCloseReason +} + type disconnectSignalOnResumeNoMessages struct { expiry time.Time closedCount int @@ -100,7 +106,7 @@ type Room struct { bufferFactory *buffer.FactoryOfBufferFactory // batch update participant info for non-publishers - batchedUpdates map[livekit.ParticipantIdentity]*livekit.ParticipantInfo + batchedUpdates map[livekit.ParticipantIdentity]*participantUpdate batchedUpdatesMu sync.Mutex // time the first participant joined the room @@ -155,7 +161,7 @@ func NewRoom( participantRequestSources: make(map[livekit.ParticipantIdentity]routing.MessageSource), hasPublished: make(map[livekit.ParticipantIdentity]bool), bufferFactory: buffer.NewFactoryOfBufferFactory(config.Receiver.PacketBufferSize), - batchedUpdates: make(map[livekit.ParticipantIdentity]*livekit.ParticipantInfo), + batchedUpdates: make(map[livekit.ParticipantIdentity]*participantUpdate), closed: make(chan struct{}), trailer: []byte(utils.RandomSecret()), disconnectSignalOnResumeParticipants: make(map[livekit.ParticipantIdentity]time.Time), @@ -367,7 +373,8 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me p.GetLogger().Infow("participant active", connectionDetailsFields(cds)...) } else if state == livekit.ParticipantInfo_DISCONNECTED { // remove participant from room - go r.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonStateDisconnected) + // participant should already be closed and have a close reason, so NONE is fine here + go r.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonNone) } }) // it's important to set this before connection, we don't want to miss out on any published tracks @@ -764,11 +771,11 @@ func (r *Room) CloseIfEmpty() { r.lock.Unlock() if elapsed >= int64(timeout) { - r.Close() + r.Close(types.ParticipantCloseReasonNone) } } -func (r *Room) Close() { +func (r *Room) Close(reason types.ParticipantCloseReason) { r.lock.Lock() select { case <-r.closed: @@ -782,7 +789,7 @@ func (r *Room) Close() { r.Logger.Infow("closing room") for _, p := range r.GetParticipants() { - _ = p.Close(true, types.ParticipantCloseReasonRoomClose, false) + _ = p.Close(true, reason, false) } r.protoProxy.Stop() @@ -1104,27 +1111,49 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas // send update only to hidden participant err := p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi}) if err != nil { - r.Logger.Errorw("could not send update to participant", err, - "participant", p.Identity(), "pID", p.ID()) + p.GetLogger().Errorw("could not send update to participant", err) } } return } - updates := r.pushAndDequeueUpdates(pi, opts.immediate) + updates := r.pushAndDequeueUpdates(pi, p.CloseReason(), opts.immediate) r.sendParticipantUpdates(updates) } -func (r *Room) sendParticipantUpdates(updates []*livekit.ParticipantInfo) { +func (r *Room) sendParticipantUpdates(updates []*participantUpdate) { if len(updates) == 0 { return } + // For filtered updates, skip + // 1. synthesized DISCONNECT - this happens on SID change + // 2. close reasons of DUPLICATE_IDENTITY/STALE - A newer session for that identity exists. + // + // Filtered updates are used with clients that can handle identity based reconnect and hence those + // conditions can be skipped. + var filteredUpdates []*livekit.ParticipantInfo + for _, update := range updates { + if update.isSynthesizedDisconnect || IsCloseNotifySkippable(update.closeReason) { + continue + } + filteredUpdates = append(filteredUpdates, update.pi) + } + + var fullUpdates []*livekit.ParticipantInfo + for _, update := range updates { + fullUpdates = append(fullUpdates, update.pi) + } + for _, op := range r.GetParticipants() { - err := op.SendParticipantUpdate(updates) + var err error + if op.ProtocolVersion().SupportsIdentityBasedReconnection() { + err = op.SendParticipantUpdate(filteredUpdates) + } else { + err = op.SendParticipantUpdate(fullUpdates) + } if err != nil { - r.Logger.Errorw("could not send update to participant", err, - "participant", op.Identity(), "pID", op.ID()) + op.GetLogger().Errorw("could not send update to participant", err) } } } @@ -1170,29 +1199,34 @@ func (r *Room) sendSpeakerChanges(speakers []*livekit.SpeakerInfo) { // * subscriber-only updates will be queued for batch updates // * publisher & immediate updates will be returned without queuing // * when the SID changes, it will return both updates, with the earlier participant set to disconnected -func (r *Room) pushAndDequeueUpdates(pi *livekit.ParticipantInfo, isImmediate bool) []*livekit.ParticipantInfo { +func (r *Room) pushAndDequeueUpdates( + pi *livekit.ParticipantInfo, + closeReason types.ParticipantCloseReason, + isImmediate bool, +) []*participantUpdate { r.batchedUpdatesMu.Lock() defer r.batchedUpdatesMu.Unlock() - var updates []*livekit.ParticipantInfo + var updates []*participantUpdate identity := livekit.ParticipantIdentity(pi.Identity) existing := r.batchedUpdates[identity] shouldSend := isImmediate || pi.IsPublisher if existing != nil { - if pi.Sid == existing.Sid { + if pi.Sid == existing.pi.Sid { // same participant session - if pi.Version < existing.Version { + if pi.Version < existing.pi.Version { // out of order update return nil } } else { // different participant sessions - if existing.JoinedAt < pi.JoinedAt { + if existing.pi.JoinedAt < pi.JoinedAt { // existing is older, synthesize a DISCONNECT for older and // send immediately along with newer session to signal switch shouldSend = true - existing.State = livekit.ParticipantInfo_DISCONNECTED + existing.pi.State = livekit.ParticipantInfo_DISCONNECTED + existing.isSynthesizedDisconnect = true updates = append(updates, existing) } else { // older session update, newer session has already become active, so nothing to do @@ -1213,10 +1247,10 @@ func (r *Room) pushAndDequeueUpdates(pi *livekit.ParticipantInfo, isImmediate bo if shouldSend { // include any queued update, and return delete(r.batchedUpdates, identity) - updates = append(updates, pi) + updates = append(updates, &participantUpdate{pi: pi, closeReason: closeReason}) } else { // enqueue for batch - r.batchedUpdates[identity] = pi + r.batchedUpdates[identity] = &participantUpdate{pi: pi, closeReason: closeReason} } return updates @@ -1257,18 +1291,14 @@ func (r *Room) changeUpdateWorker() { case <-subTicker.C: r.batchedUpdatesMu.Lock() updatesMap := r.batchedUpdates - r.batchedUpdates = make(map[livekit.ParticipantIdentity]*livekit.ParticipantInfo) + r.batchedUpdates = make(map[livekit.ParticipantIdentity]*participantUpdate) r.batchedUpdatesMu.Unlock() if len(updatesMap) == 0 { continue } - updates := make([]*livekit.ParticipantInfo, 0, len(updatesMap)) - for _, pi := range updatesMap { - updates = append(updates, pi) - } - r.sendParticipantUpdates(updates) + r.sendParticipantUpdates(maps.Values(updatesMap)) } } } @@ -1509,6 +1539,10 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp }) } +func IsCloseNotifySkippable(closeReason types.ParticipantCloseReason) bool { + return closeReason == types.ParticipantCloseReasonDuplicateIdentity +} + func connectionDetailsFields(cds []*types.ICEConnectionDetails) []interface{} { var fields []interface{} connectionType := types.ICEConnectionTypeUnknown diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 4bf6ff6b6..0febf2989 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -135,7 +135,7 @@ func TestRoomJoin(t *testing.T) { disconnectedParticipant := participants[1].(*typesfakes.FakeLocalParticipant) disconnectedParticipant.StateReturns(livekit.ParticipantInfo_DISCONNECTED) - rm.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonStateDisconnected) + rm.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonClientRequestLeave) time.Sleep(defaultDelay) require.Equal(t, p, changedParticipant) @@ -265,17 +265,18 @@ func TestPushAndDequeueUpdates(t *testing.T) { require.Equal(t, a.Version, b.Version) } testCases := []struct { - name string - pi *livekit.ParticipantInfo - immediate bool - existing *livekit.ParticipantInfo - expected []*livekit.ParticipantInfo - validate func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) + name string + pi *livekit.ParticipantInfo + closeReason types.ParticipantCloseReason + immediate bool + existing *participantUpdate + expected []*participantUpdate + validate func(t *testing.T, rm *Room, updates []*participantUpdate) }{ { name: "publisher updates are immediate", pi: publisher1v1, - expected: []*livekit.ParticipantInfo{publisher1v1}, + expected: []*participantUpdate{{pi: publisher1v1}}, }, { name: "subscriber updates are queued", @@ -284,20 +285,20 @@ func TestPushAndDequeueUpdates(t *testing.T) { { name: "last version is enqueued", pi: subscriber1v2, - existing: subscriber1v1, - validate: func(t *testing.T, rm *Room, _ []*livekit.ParticipantInfo) { + existing: &participantUpdate{pi: proto.Clone(subscriber1v1).(*livekit.ParticipantInfo)}, // clone the existing value since it can be modified when setting to disconnected + validate: func(t *testing.T, rm *Room, _ []*participantUpdate) { queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] require.NotNil(t, queued) - requirePIEquals(t, subscriber1v2, queued) + requirePIEquals(t, subscriber1v2, queued.pi) }, }, { name: "latest version when immediate", pi: subscriber1v2, - existing: subscriber1v1, + existing: &participantUpdate{pi: proto.Clone(subscriber1v1).(*livekit.ParticipantInfo)}, immediate: true, - expected: []*livekit.ParticipantInfo{subscriber1v2}, - validate: func(t *testing.T, rm *Room, _ []*livekit.ParticipantInfo) { + expected: []*participantUpdate{{pi: subscriber1v2}}, + validate: func(t *testing.T, rm *Room, _ []*participantUpdate) { queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] require.Nil(t, queued) }, @@ -305,32 +306,37 @@ func TestPushAndDequeueUpdates(t *testing.T) { { name: "out of order updates are rejected", pi: subscriber1v1, - existing: subscriber1v2, - validate: func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) { + existing: &participantUpdate{pi: proto.Clone(subscriber1v2).(*livekit.ParticipantInfo)}, + validate: func(t *testing.T, rm *Room, updates []*participantUpdate) { queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] - requirePIEquals(t, subscriber1v2, queued) + requirePIEquals(t, subscriber1v2, queued.pi) }, }, { - name: "sid change is broadcasted immediately", - pi: publisher2, - existing: subscriber1v2, - expected: []*livekit.ParticipantInfo{ + name: "sid change is broadcasted immediately with synthsized disconnect", + pi: publisher2, + closeReason: types.ParticipantCloseReasonServiceRequestRemoveParticipant, // just to test if update contain the close reason + existing: &participantUpdate{pi: proto.Clone(subscriber1v2).(*livekit.ParticipantInfo), closeReason: types.ParticipantCloseReasonStale}, + expected: []*participantUpdate{ { - Identity: identity, - Sid: "1", - Version: 2, - State: livekit.ParticipantInfo_DISCONNECTED, + pi: &livekit.ParticipantInfo{ + Identity: identity, + Sid: "1", + Version: 2, + State: livekit.ParticipantInfo_DISCONNECTED, + }, + isSynthesizedDisconnect: true, + closeReason: types.ParticipantCloseReasonStale, }, - publisher2, + {pi: publisher2, closeReason: types.ParticipantCloseReasonServiceRequestRemoveParticipant}, }, }, { name: "when switching to publisher, queue is cleared", pi: publisher1v2, - existing: subscriber1v1, - expected: []*livekit.ParticipantInfo{publisher1v2}, - validate: func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) { + existing: &participantUpdate{pi: proto.Clone(subscriber1v1).(*livekit.ParticipantInfo)}, + expected: []*participantUpdate{{pi: publisher1v2}}, + validate: func(t *testing.T, rm *Room, updates []*participantUpdate) { require.Empty(t, rm.batchedUpdates) }, }, @@ -340,13 +346,14 @@ func TestPushAndDequeueUpdates(t *testing.T) { t.Run(tc.name, func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 1}) if tc.existing != nil { - // clone the existing value since it can be modified when setting to disconnected - rm.batchedUpdates[livekit.ParticipantIdentity(tc.existing.Identity)] = proto.Clone(tc.existing).(*livekit.ParticipantInfo) + rm.batchedUpdates[livekit.ParticipantIdentity(tc.existing.pi.Identity)] = tc.existing } - updates := rm.pushAndDequeueUpdates(tc.pi, tc.immediate) + updates := rm.pushAndDequeueUpdates(tc.pi, tc.closeReason, tc.immediate) require.Equal(t, len(tc.expected), len(updates)) for i, item := range tc.expected { - requirePIEquals(t, item, updates[i]) + requirePIEquals(t, item.pi, updates[i].pi) + require.Equal(t, item.isSynthesizedDisconnect, updates[i].isSynthesizedDisconnect) + require.Equal(t, item.closeReason, updates[i].closeReason) } if tc.validate != nil { @@ -443,7 +450,7 @@ func TestActiveSpeakers(t *testing.T) { audioUpdateDuration := (audioUpdateInterval + 10) * time.Millisecond t.Run("participant should not be getting audio updates (protocol 2)", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 1, protocol: 2}) - defer rm.Close() + defer rm.Close(types.ParticipantCloseReasonNone) p := rm.GetParticipants()[0].(*typesfakes.FakeLocalParticipant) require.Empty(t, rm.GetActiveSpeakers()) @@ -455,7 +462,7 @@ func TestActiveSpeakers(t *testing.T) { t.Run("speakers should be sorted by loudness", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 2}) - defer rm.Close() + defer rm.Close(types.ParticipantCloseReasonNone) participants := rm.GetParticipants() p := participants[0].(*typesfakes.FakeLocalParticipant) p2 := participants[1].(*typesfakes.FakeLocalParticipant) @@ -470,7 +477,7 @@ func TestActiveSpeakers(t *testing.T) { t.Run("participants are getting audio updates (protocol 3+)", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: 3}) - defer rm.Close() + defer rm.Close(types.ParticipantCloseReasonNone) participants := rm.GetParticipants() p := participants[0].(*typesfakes.FakeLocalParticipant) time.Sleep(time.Millisecond) // let the first update cycle run @@ -509,7 +516,7 @@ func TestActiveSpeakers(t *testing.T) { t.Run("audio level is smoothed", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: 3, audioSmoothIntervals: 3}) - defer rm.Close() + defer rm.Close(types.ParticipantCloseReasonNone) participants := rm.GetParticipants() p := participants[0].(*typesfakes.FakeLocalParticipant) op := participants[1].(*typesfakes.FakeLocalParticipant) @@ -566,7 +573,7 @@ func TestDataChannel(t *testing.T) { t.Run("participants should receive data", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 3}) - defer rm.Close() + defer rm.Close(types.ParticipantCloseReasonNone) participants := rm.GetParticipants() p := participants[0].(*typesfakes.FakeLocalParticipant) @@ -596,7 +603,7 @@ func TestDataChannel(t *testing.T) { t.Run("only one participant should receive the data", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 4}) - defer rm.Close() + defer rm.Close(types.ParticipantCloseReasonNone) participants := rm.GetParticipants() p := participants[0].(*typesfakes.FakeLocalParticipant) p1 := participants[1].(*typesfakes.FakeLocalParticipant) @@ -627,7 +634,7 @@ func TestDataChannel(t *testing.T) { t.Run("publishing disallowed", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 2}) - defer rm.Close() + defer rm.Close(types.ParticipantCloseReasonNone) participants := rm.GetParticipants() p := participants[0].(*typesfakes.FakeLocalParticipant) p.CanPublishDataReturns(false) @@ -655,7 +662,7 @@ func TestDataChannel(t *testing.T) { func TestHiddenParticipants(t *testing.T) { t.Run("other participants don't receive hidden updates", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1}) - defer rm.Close() + defer rm.Close(types.ParticipantCloseReasonNone) pNew := NewMockParticipant("new", types.CurrentProtocol, false, false) rm.Join(pNew, nil, nil, iceServersForRoom) @@ -687,7 +694,7 @@ func TestHiddenParticipants(t *testing.T) { func TestRoomUpdate(t *testing.T) { t.Run("updates are sent when participant joined", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 1}) - defer rm.Close() + defer rm.Close(types.ParticipantCloseReasonNone) p1 := rm.GetParticipants()[0].(*typesfakes.FakeLocalParticipant) require.Equal(t, 0, p1.SendRoomUpdateCallCount()) @@ -703,7 +710,7 @@ func TestRoomUpdate(t *testing.T) { t.Run("participants should receive metadata update", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 2}) - defer rm.Close() + defer rm.Close(types.ParticipantCloseReasonNone) rm.SetMetadata("test metadata...") diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index af18021d0..32eb1e35c 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -81,14 +81,13 @@ type SubscribedCodecQuality struct { type ParticipantCloseReason int const ( - ParticipantCloseReasonClientRequestLeave ParticipantCloseReason = iota + ParticipantCloseReasonNone ParticipantCloseReason = iota + ParticipantCloseReasonClientRequestLeave ParticipantCloseReasonRoomManagerStop - ParticipantCloseReasonRoomClose ParticipantCloseReasonVerifyFailed ParticipantCloseReasonJoinFailed ParticipantCloseReasonJoinTimeout ParticipantCloseReasonMessageBusFailed - ParticipantCloseReasonStateDisconnected ParticipantCloseReasonPeerConnectionDisconnected ParticipantCloseReasonDuplicateIdentity ParticipantCloseReasonMigrationComplete @@ -100,7 +99,6 @@ const ( ParticipantCloseReasonSimulateServerLeave ParticipantCloseReasonNegotiateFailed ParticipantCloseReasonMigrationRequested - ParticipantCloseReasonOvercommitted ParticipantCloseReasonPublicationError ParticipantCloseReasonSubscriptionError ParticipantCloseReasonDataChannelError @@ -110,12 +108,12 @@ const ( func (p ParticipantCloseReason) String() string { switch p { + case ParticipantCloseReasonNone: + return "NONE" case ParticipantCloseReasonClientRequestLeave: return "CLIENT_REQUEST_LEAVE" case ParticipantCloseReasonRoomManagerStop: return "ROOM_MANAGER_STOP" - case ParticipantCloseReasonRoomClose: - return "ROOM_CLOSE" case ParticipantCloseReasonVerifyFailed: return "VERIFY_FAILED" case ParticipantCloseReasonJoinFailed: @@ -124,8 +122,6 @@ func (p ParticipantCloseReason) String() string { return "JOIN_TIMEOUT" case ParticipantCloseReasonMessageBusFailed: return "MESSAGE_BUS_FAILED" - case ParticipantCloseReasonStateDisconnected: - return "STATE_DISCONNECTED" case ParticipantCloseReasonPeerConnectionDisconnected: return "PEER_CONNECTION_DISCONNECTED" case ParticipantCloseReasonDuplicateIdentity: @@ -148,8 +144,6 @@ func (p ParticipantCloseReason) String() string { return "NEGOTIATE_FAILED" case ParticipantCloseReasonMigrationRequested: return "MIGRATION_REQUESTED" - case ParticipantCloseReasonOvercommitted: - return "OVERCOMMITTED" case ParticipantCloseReasonPublicationError: return "PUBLICATION_ERROR" case ParticipantCloseReasonSubscriptionError: @@ -184,7 +178,7 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason { return livekit.DisconnectReason_PARTICIPANT_REMOVED case ParticipantCloseReasonServiceRequestDeleteRoom: return livekit.DisconnectReason_ROOM_DELETED - case ParticipantCloseReasonSimulateNodeFailure, ParticipantCloseReasonSimulateServerLeave, ParticipantCloseReasonOvercommitted: + case ParticipantCloseReasonSimulateNodeFailure, ParticipantCloseReasonSimulateServerLeave: return livekit.DisconnectReason_SERVER_SHUTDOWN case ParticipantCloseReasonNegotiateFailed, ParticipantCloseReasonPublicationError, ParticipantCloseReasonSubscriptionError, ParticipantCloseReasonDataChannelError, ParticipantCloseReasonMigrateCodecMismatch: return livekit.DisconnectReason_STATE_MISMATCH @@ -250,6 +244,7 @@ type Participant interface { ID() livekit.ParticipantID Identity() livekit.ParticipantIdentity State() livekit.ParticipantInfo_State + CloseReason() ParticipantCloseReason CanSkipBroadcast() bool ToProto() *livekit.ParticipantInfo diff --git a/pkg/rtc/types/protocol_version.go b/pkg/rtc/types/protocol_version.go index 08075184d..ae2282085 100644 --- a/pkg/rtc/types/protocol_version.go +++ b/pkg/rtc/types/protocol_version.go @@ -84,6 +84,10 @@ func (v ProtocolVersion) SupportsAsyncRoomID() bool { return v > 11 } +func (v ProtocolVersion) SupportsIdentityBasedReconnection() bool { + return v > 11 +} + func (v ProtocolVersion) SupportsRegionsInLeaveRequest() bool { return v > 12 } diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index f101e4398..c20a1a1d7 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -133,6 +133,16 @@ type FakeLocalParticipant struct { closeReturnsOnCall map[int]struct { result1 error } + CloseReasonStub func() types.ParticipantCloseReason + closeReasonMutex sync.RWMutex + closeReasonArgsForCall []struct { + } + closeReasonReturns struct { + result1 types.ParticipantCloseReason + } + closeReasonReturnsOnCall map[int]struct { + result1 types.ParticipantCloseReason + } CloseSignalConnectionStub func(types.SignallingCloseReason) closeSignalConnectionMutex sync.RWMutex closeSignalConnectionArgsForCall []struct { @@ -1545,6 +1555,59 @@ func (fake *FakeLocalParticipant) CloseReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *FakeLocalParticipant) CloseReason() types.ParticipantCloseReason { + fake.closeReasonMutex.Lock() + ret, specificReturn := fake.closeReasonReturnsOnCall[len(fake.closeReasonArgsForCall)] + fake.closeReasonArgsForCall = append(fake.closeReasonArgsForCall, struct { + }{}) + stub := fake.CloseReasonStub + fakeReturns := fake.closeReasonReturns + fake.recordInvocation("CloseReason", []interface{}{}) + fake.closeReasonMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) CloseReasonCallCount() int { + fake.closeReasonMutex.RLock() + defer fake.closeReasonMutex.RUnlock() + return len(fake.closeReasonArgsForCall) +} + +func (fake *FakeLocalParticipant) CloseReasonCalls(stub func() types.ParticipantCloseReason) { + fake.closeReasonMutex.Lock() + defer fake.closeReasonMutex.Unlock() + fake.CloseReasonStub = stub +} + +func (fake *FakeLocalParticipant) CloseReasonReturns(result1 types.ParticipantCloseReason) { + fake.closeReasonMutex.Lock() + defer fake.closeReasonMutex.Unlock() + fake.CloseReasonStub = nil + fake.closeReasonReturns = struct { + result1 types.ParticipantCloseReason + }{result1} +} + +func (fake *FakeLocalParticipant) CloseReasonReturnsOnCall(i int, result1 types.ParticipantCloseReason) { + fake.closeReasonMutex.Lock() + defer fake.closeReasonMutex.Unlock() + fake.CloseReasonStub = nil + if fake.closeReasonReturnsOnCall == nil { + fake.closeReasonReturnsOnCall = make(map[int]struct { + result1 types.ParticipantCloseReason + }) + } + fake.closeReasonReturnsOnCall[i] = struct { + result1 types.ParticipantCloseReason + }{result1} +} + func (fake *FakeLocalParticipant) CloseSignalConnection(arg1 types.SignallingCloseReason) { fake.closeSignalConnectionMutex.Lock() fake.closeSignalConnectionArgsForCall = append(fake.closeSignalConnectionArgsForCall, struct { @@ -6171,6 +6234,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.claimGrantsMutex.RUnlock() fake.closeMutex.RLock() defer fake.closeMutex.RUnlock() + fake.closeReasonMutex.RLock() + defer fake.closeReasonMutex.RUnlock() fake.closeSignalConnectionMutex.RLock() defer fake.closeSignalConnectionMutex.RUnlock() fake.connectedAtMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 0c5c3929a..98fff9c80 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -33,6 +33,16 @@ type FakeParticipant struct { closeReturnsOnCall map[int]struct { result1 error } + CloseReasonStub func() types.ParticipantCloseReason + closeReasonMutex sync.RWMutex + closeReasonArgsForCall []struct { + } + closeReasonReturns struct { + result1 types.ParticipantCloseReason + } + closeReasonReturnsOnCall map[int]struct { + result1 types.ParticipantCloseReason + } DebugInfoStub func() map[string]interface{} debugInfoMutex sync.RWMutex debugInfoArgsForCall []struct { @@ -342,6 +352,59 @@ func (fake *FakeParticipant) CloseReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *FakeParticipant) CloseReason() types.ParticipantCloseReason { + fake.closeReasonMutex.Lock() + ret, specificReturn := fake.closeReasonReturnsOnCall[len(fake.closeReasonArgsForCall)] + fake.closeReasonArgsForCall = append(fake.closeReasonArgsForCall, struct { + }{}) + stub := fake.CloseReasonStub + fakeReturns := fake.closeReasonReturns + fake.recordInvocation("CloseReason", []interface{}{}) + fake.closeReasonMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) CloseReasonCallCount() int { + fake.closeReasonMutex.RLock() + defer fake.closeReasonMutex.RUnlock() + return len(fake.closeReasonArgsForCall) +} + +func (fake *FakeParticipant) CloseReasonCalls(stub func() types.ParticipantCloseReason) { + fake.closeReasonMutex.Lock() + defer fake.closeReasonMutex.Unlock() + fake.CloseReasonStub = stub +} + +func (fake *FakeParticipant) CloseReasonReturns(result1 types.ParticipantCloseReason) { + fake.closeReasonMutex.Lock() + defer fake.closeReasonMutex.Unlock() + fake.CloseReasonStub = nil + fake.closeReasonReturns = struct { + result1 types.ParticipantCloseReason + }{result1} +} + +func (fake *FakeParticipant) CloseReasonReturnsOnCall(i int, result1 types.ParticipantCloseReason) { + fake.closeReasonMutex.Lock() + defer fake.closeReasonMutex.Unlock() + fake.CloseReasonStub = nil + if fake.closeReasonReturnsOnCall == nil { + fake.closeReasonReturnsOnCall = make(map[int]struct { + result1 types.ParticipantCloseReason + }) + } + fake.closeReasonReturnsOnCall[i] = struct { + result1 types.ParticipantCloseReason + }{result1} +} + func (fake *FakeParticipant) DebugInfo() map[string]interface{} { fake.debugInfoMutex.Lock() ret, specificReturn := fake.debugInfoReturnsOnCall[len(fake.debugInfoArgsForCall)] @@ -1337,6 +1400,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.canSkipBroadcastMutex.RUnlock() fake.closeMutex.RLock() defer fake.closeMutex.RUnlock() + fake.closeReasonMutex.RLock() + defer fake.closeReasonMutex.RUnlock() fake.debugInfoMutex.RLock() defer fake.debugInfoMutex.RUnlock() fake.getAudioLevelMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index abfd29331..b09ec48c2 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -213,10 +213,7 @@ func (r *RoomManager) Stop() { r.lock.RUnlock() for _, room := range rooms { - for _, p := range room.GetParticipants() { - _ = p.Close(true, types.ParticipantCloseReasonRoomManagerStop, false) - } - room.Close() + room.Close(types.ParticipantCloseReasonRoomManagerStop) } r.roomServers.Kill() @@ -727,10 +724,7 @@ func (r *RoomManager) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq } } else { room.Logger.Infow("deleting room") - for _, p := range room.GetParticipants() { - _ = p.Close(true, types.ParticipantCloseReasonServiceRequestDeleteRoom, false) - } - room.Close() + room.Close(types.ParticipantCloseReasonServiceRequestDeleteRoom) } return &livekit.DeleteRoomResponse{}, nil } diff --git a/test/webhook_test.go b/test/webhook_test.go index 678c48c80..0463eea1a 100644 --- a/test/webhook_test.go +++ b/test/webhook_test.go @@ -35,6 +35,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/service" "github.com/livekit/livekit-server/pkg/testutils" ) @@ -108,7 +109,7 @@ func TestWebhooks(t *testing.T) { // room closed rm := server.RoomManager().GetRoom(context.Background(), testRoom) - rm.Close() + rm.Close(types.ParticipantCloseReasonNone) testutils.WithTimeout(t, func() string { if ts.GetEvent(webhook.EventRoomFinished) == nil { return "did not receive RoomFinished"