Do not synthesise DISCONNECT on session change. (#2412)

* Do not synthesise DISCONNECT on session change.

v12 clients can handle session change based on identity.

* change for testf

* Squelch participant update if close reason is DUPLICATE_IDENTITY.

* fix test

* comment

* Clean up participant close reason a bit

* fix test

* test
This commit is contained in:
Raja Subramanian
2024-01-31 11:36:50 +05:30
committed by GitHub
parent f960a4f9fb
commit c8b7d486b9
9 changed files with 268 additions and 93 deletions
+13 -3
View File
@@ -140,9 +140,13 @@ type ParticipantImpl struct {
params ParticipantParams
isClosed atomic.Bool
state atomic.Value // livekit.ParticipantInfo_State
resSinkMu sync.Mutex
resSink routing.MessageSink
closeReason atomic.Value // types.ParticipantCloseReason
state atomic.Value // livekit.ParticipantInfo_State
resSinkMu sync.Mutex
resSink routing.MessageSink
grants *auth.ClaimGrants
hidden atomic.Bool
isPublisher atomic.Bool
@@ -253,6 +257,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
if !params.DisableSupervisor {
p.supervisor = supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger})
}
p.closeReason.Store(types.ParticipantCloseReasonNone)
p.version.Store(params.InitialVersion)
p.timedVersion.Update(params.VersionGenerator.New())
p.migrateState.Store(types.MigrateStateInit)
@@ -762,6 +767,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
"reason", reason.String(),
"isExpectedToResume", isExpectedToResume,
)
p.closeReason.Store(reason)
p.clearDisconnectTimer()
p.clearMigrationTimer()
@@ -812,6 +818,10 @@ func (p *ParticipantImpl) IsClosed() bool {
return p.isClosed.Load()
}
func (p *ParticipantImpl) CloseReason() types.ParticipantCloseReason {
return p.closeReason.Load().(types.ParticipantCloseReason)
}
// Negotiate subscriber SDP with client, if force is true, will cancel pending
// negotiate task and negotiate immediately
func (p *ParticipantImpl) Negotiate(force bool) {
+61 -27
View File
@@ -68,6 +68,12 @@ type broadcastOptions struct {
immediate bool
}
type participantUpdate struct {
pi *livekit.ParticipantInfo
isSynthesizedDisconnect bool
closeReason types.ParticipantCloseReason
}
type disconnectSignalOnResumeNoMessages struct {
expiry time.Time
closedCount int
@@ -100,7 +106,7 @@ type Room struct {
bufferFactory *buffer.FactoryOfBufferFactory
// batch update participant info for non-publishers
batchedUpdates map[livekit.ParticipantIdentity]*livekit.ParticipantInfo
batchedUpdates map[livekit.ParticipantIdentity]*participantUpdate
batchedUpdatesMu sync.Mutex
// time the first participant joined the room
@@ -155,7 +161,7 @@ func NewRoom(
participantRequestSources: make(map[livekit.ParticipantIdentity]routing.MessageSource),
hasPublished: make(map[livekit.ParticipantIdentity]bool),
bufferFactory: buffer.NewFactoryOfBufferFactory(config.Receiver.PacketBufferSize),
batchedUpdates: make(map[livekit.ParticipantIdentity]*livekit.ParticipantInfo),
batchedUpdates: make(map[livekit.ParticipantIdentity]*participantUpdate),
closed: make(chan struct{}),
trailer: []byte(utils.RandomSecret()),
disconnectSignalOnResumeParticipants: make(map[livekit.ParticipantIdentity]time.Time),
@@ -367,7 +373,8 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me
p.GetLogger().Infow("participant active", connectionDetailsFields(cds)...)
} else if state == livekit.ParticipantInfo_DISCONNECTED {
// remove participant from room
go r.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonStateDisconnected)
// participant should already be closed and have a close reason, so NONE is fine here
go r.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonNone)
}
})
// it's important to set this before connection, we don't want to miss out on any published tracks
@@ -764,11 +771,11 @@ func (r *Room) CloseIfEmpty() {
r.lock.Unlock()
if elapsed >= int64(timeout) {
r.Close()
r.Close(types.ParticipantCloseReasonNone)
}
}
func (r *Room) Close() {
func (r *Room) Close(reason types.ParticipantCloseReason) {
r.lock.Lock()
select {
case <-r.closed:
@@ -782,7 +789,7 @@ func (r *Room) Close() {
r.Logger.Infow("closing room")
for _, p := range r.GetParticipants() {
_ = p.Close(true, types.ParticipantCloseReasonRoomClose, false)
_ = p.Close(true, reason, false)
}
r.protoProxy.Stop()
@@ -1104,27 +1111,49 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas
// send update only to hidden participant
err := p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi})
if err != nil {
r.Logger.Errorw("could not send update to participant", err,
"participant", p.Identity(), "pID", p.ID())
p.GetLogger().Errorw("could not send update to participant", err)
}
}
return
}
updates := r.pushAndDequeueUpdates(pi, opts.immediate)
updates := r.pushAndDequeueUpdates(pi, p.CloseReason(), opts.immediate)
r.sendParticipantUpdates(updates)
}
func (r *Room) sendParticipantUpdates(updates []*livekit.ParticipantInfo) {
func (r *Room) sendParticipantUpdates(updates []*participantUpdate) {
if len(updates) == 0 {
return
}
// For filtered updates, skip
// 1. synthesized DISCONNECT - this happens on SID change
// 2. close reasons of DUPLICATE_IDENTITY/STALE - A newer session for that identity exists.
//
// Filtered updates are used with clients that can handle identity based reconnect and hence those
// conditions can be skipped.
var filteredUpdates []*livekit.ParticipantInfo
for _, update := range updates {
if update.isSynthesizedDisconnect || IsCloseNotifySkippable(update.closeReason) {
continue
}
filteredUpdates = append(filteredUpdates, update.pi)
}
var fullUpdates []*livekit.ParticipantInfo
for _, update := range updates {
fullUpdates = append(fullUpdates, update.pi)
}
for _, op := range r.GetParticipants() {
err := op.SendParticipantUpdate(updates)
var err error
if op.ProtocolVersion().SupportsIdentityBasedReconnection() {
err = op.SendParticipantUpdate(filteredUpdates)
} else {
err = op.SendParticipantUpdate(fullUpdates)
}
if err != nil {
r.Logger.Errorw("could not send update to participant", err,
"participant", op.Identity(), "pID", op.ID())
op.GetLogger().Errorw("could not send update to participant", err)
}
}
}
@@ -1170,29 +1199,34 @@ func (r *Room) sendSpeakerChanges(speakers []*livekit.SpeakerInfo) {
// * subscriber-only updates will be queued for batch updates
// * publisher & immediate updates will be returned without queuing
// * when the SID changes, it will return both updates, with the earlier participant set to disconnected
func (r *Room) pushAndDequeueUpdates(pi *livekit.ParticipantInfo, isImmediate bool) []*livekit.ParticipantInfo {
func (r *Room) pushAndDequeueUpdates(
pi *livekit.ParticipantInfo,
closeReason types.ParticipantCloseReason,
isImmediate bool,
) []*participantUpdate {
r.batchedUpdatesMu.Lock()
defer r.batchedUpdatesMu.Unlock()
var updates []*livekit.ParticipantInfo
var updates []*participantUpdate
identity := livekit.ParticipantIdentity(pi.Identity)
existing := r.batchedUpdates[identity]
shouldSend := isImmediate || pi.IsPublisher
if existing != nil {
if pi.Sid == existing.Sid {
if pi.Sid == existing.pi.Sid {
// same participant session
if pi.Version < existing.Version {
if pi.Version < existing.pi.Version {
// out of order update
return nil
}
} else {
// different participant sessions
if existing.JoinedAt < pi.JoinedAt {
if existing.pi.JoinedAt < pi.JoinedAt {
// existing is older, synthesize a DISCONNECT for older and
// send immediately along with newer session to signal switch
shouldSend = true
existing.State = livekit.ParticipantInfo_DISCONNECTED
existing.pi.State = livekit.ParticipantInfo_DISCONNECTED
existing.isSynthesizedDisconnect = true
updates = append(updates, existing)
} else {
// older session update, newer session has already become active, so nothing to do
@@ -1213,10 +1247,10 @@ func (r *Room) pushAndDequeueUpdates(pi *livekit.ParticipantInfo, isImmediate bo
if shouldSend {
// include any queued update, and return
delete(r.batchedUpdates, identity)
updates = append(updates, pi)
updates = append(updates, &participantUpdate{pi: pi, closeReason: closeReason})
} else {
// enqueue for batch
r.batchedUpdates[identity] = pi
r.batchedUpdates[identity] = &participantUpdate{pi: pi, closeReason: closeReason}
}
return updates
@@ -1257,18 +1291,14 @@ func (r *Room) changeUpdateWorker() {
case <-subTicker.C:
r.batchedUpdatesMu.Lock()
updatesMap := r.batchedUpdates
r.batchedUpdates = make(map[livekit.ParticipantIdentity]*livekit.ParticipantInfo)
r.batchedUpdates = make(map[livekit.ParticipantIdentity]*participantUpdate)
r.batchedUpdatesMu.Unlock()
if len(updatesMap) == 0 {
continue
}
updates := make([]*livekit.ParticipantInfo, 0, len(updatesMap))
for _, pi := range updatesMap {
updates = append(updates, pi)
}
r.sendParticipantUpdates(updates)
r.sendParticipantUpdates(maps.Values(updatesMap))
}
}
}
@@ -1509,6 +1539,10 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp
})
}
func IsCloseNotifySkippable(closeReason types.ParticipantCloseReason) bool {
return closeReason == types.ParticipantCloseReasonDuplicateIdentity
}
func connectionDetailsFields(cds []*types.ICEConnectionDetails) []interface{} {
var fields []interface{}
connectionType := types.ICEConnectionTypeUnknown
+50 -43
View File
@@ -135,7 +135,7 @@ func TestRoomJoin(t *testing.T) {
disconnectedParticipant := participants[1].(*typesfakes.FakeLocalParticipant)
disconnectedParticipant.StateReturns(livekit.ParticipantInfo_DISCONNECTED)
rm.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonStateDisconnected)
rm.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonClientRequestLeave)
time.Sleep(defaultDelay)
require.Equal(t, p, changedParticipant)
@@ -265,17 +265,18 @@ func TestPushAndDequeueUpdates(t *testing.T) {
require.Equal(t, a.Version, b.Version)
}
testCases := []struct {
name string
pi *livekit.ParticipantInfo
immediate bool
existing *livekit.ParticipantInfo
expected []*livekit.ParticipantInfo
validate func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo)
name string
pi *livekit.ParticipantInfo
closeReason types.ParticipantCloseReason
immediate bool
existing *participantUpdate
expected []*participantUpdate
validate func(t *testing.T, rm *Room, updates []*participantUpdate)
}{
{
name: "publisher updates are immediate",
pi: publisher1v1,
expected: []*livekit.ParticipantInfo{publisher1v1},
expected: []*participantUpdate{{pi: publisher1v1}},
},
{
name: "subscriber updates are queued",
@@ -284,20 +285,20 @@ func TestPushAndDequeueUpdates(t *testing.T) {
{
name: "last version is enqueued",
pi: subscriber1v2,
existing: subscriber1v1,
validate: func(t *testing.T, rm *Room, _ []*livekit.ParticipantInfo) {
existing: &participantUpdate{pi: proto.Clone(subscriber1v1).(*livekit.ParticipantInfo)}, // clone the existing value since it can be modified when setting to disconnected
validate: func(t *testing.T, rm *Room, _ []*participantUpdate) {
queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)]
require.NotNil(t, queued)
requirePIEquals(t, subscriber1v2, queued)
requirePIEquals(t, subscriber1v2, queued.pi)
},
},
{
name: "latest version when immediate",
pi: subscriber1v2,
existing: subscriber1v1,
existing: &participantUpdate{pi: proto.Clone(subscriber1v1).(*livekit.ParticipantInfo)},
immediate: true,
expected: []*livekit.ParticipantInfo{subscriber1v2},
validate: func(t *testing.T, rm *Room, _ []*livekit.ParticipantInfo) {
expected: []*participantUpdate{{pi: subscriber1v2}},
validate: func(t *testing.T, rm *Room, _ []*participantUpdate) {
queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)]
require.Nil(t, queued)
},
@@ -305,32 +306,37 @@ func TestPushAndDequeueUpdates(t *testing.T) {
{
name: "out of order updates are rejected",
pi: subscriber1v1,
existing: subscriber1v2,
validate: func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) {
existing: &participantUpdate{pi: proto.Clone(subscriber1v2).(*livekit.ParticipantInfo)},
validate: func(t *testing.T, rm *Room, updates []*participantUpdate) {
queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)]
requirePIEquals(t, subscriber1v2, queued)
requirePIEquals(t, subscriber1v2, queued.pi)
},
},
{
name: "sid change is broadcasted immediately",
pi: publisher2,
existing: subscriber1v2,
expected: []*livekit.ParticipantInfo{
name: "sid change is broadcasted immediately with synthsized disconnect",
pi: publisher2,
closeReason: types.ParticipantCloseReasonServiceRequestRemoveParticipant, // just to test if update contain the close reason
existing: &participantUpdate{pi: proto.Clone(subscriber1v2).(*livekit.ParticipantInfo), closeReason: types.ParticipantCloseReasonStale},
expected: []*participantUpdate{
{
Identity: identity,
Sid: "1",
Version: 2,
State: livekit.ParticipantInfo_DISCONNECTED,
pi: &livekit.ParticipantInfo{
Identity: identity,
Sid: "1",
Version: 2,
State: livekit.ParticipantInfo_DISCONNECTED,
},
isSynthesizedDisconnect: true,
closeReason: types.ParticipantCloseReasonStale,
},
publisher2,
{pi: publisher2, closeReason: types.ParticipantCloseReasonServiceRequestRemoveParticipant},
},
},
{
name: "when switching to publisher, queue is cleared",
pi: publisher1v2,
existing: subscriber1v1,
expected: []*livekit.ParticipantInfo{publisher1v2},
validate: func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) {
existing: &participantUpdate{pi: proto.Clone(subscriber1v1).(*livekit.ParticipantInfo)},
expected: []*participantUpdate{{pi: publisher1v2}},
validate: func(t *testing.T, rm *Room, updates []*participantUpdate) {
require.Empty(t, rm.batchedUpdates)
},
},
@@ -340,13 +346,14 @@ func TestPushAndDequeueUpdates(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 1})
if tc.existing != nil {
// clone the existing value since it can be modified when setting to disconnected
rm.batchedUpdates[livekit.ParticipantIdentity(tc.existing.Identity)] = proto.Clone(tc.existing).(*livekit.ParticipantInfo)
rm.batchedUpdates[livekit.ParticipantIdentity(tc.existing.pi.Identity)] = tc.existing
}
updates := rm.pushAndDequeueUpdates(tc.pi, tc.immediate)
updates := rm.pushAndDequeueUpdates(tc.pi, tc.closeReason, tc.immediate)
require.Equal(t, len(tc.expected), len(updates))
for i, item := range tc.expected {
requirePIEquals(t, item, updates[i])
requirePIEquals(t, item.pi, updates[i].pi)
require.Equal(t, item.isSynthesizedDisconnect, updates[i].isSynthesizedDisconnect)
require.Equal(t, item.closeReason, updates[i].closeReason)
}
if tc.validate != nil {
@@ -443,7 +450,7 @@ func TestActiveSpeakers(t *testing.T) {
audioUpdateDuration := (audioUpdateInterval + 10) * time.Millisecond
t.Run("participant should not be getting audio updates (protocol 2)", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 1, protocol: 2})
defer rm.Close()
defer rm.Close(types.ParticipantCloseReasonNone)
p := rm.GetParticipants()[0].(*typesfakes.FakeLocalParticipant)
require.Empty(t, rm.GetActiveSpeakers())
@@ -455,7 +462,7 @@ func TestActiveSpeakers(t *testing.T) {
t.Run("speakers should be sorted by loudness", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2})
defer rm.Close()
defer rm.Close(types.ParticipantCloseReasonNone)
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeLocalParticipant)
p2 := participants[1].(*typesfakes.FakeLocalParticipant)
@@ -470,7 +477,7 @@ func TestActiveSpeakers(t *testing.T) {
t.Run("participants are getting audio updates (protocol 3+)", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: 3})
defer rm.Close()
defer rm.Close(types.ParticipantCloseReasonNone)
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeLocalParticipant)
time.Sleep(time.Millisecond) // let the first update cycle run
@@ -509,7 +516,7 @@ func TestActiveSpeakers(t *testing.T) {
t.Run("audio level is smoothed", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: 3, audioSmoothIntervals: 3})
defer rm.Close()
defer rm.Close(types.ParticipantCloseReasonNone)
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeLocalParticipant)
op := participants[1].(*typesfakes.FakeLocalParticipant)
@@ -566,7 +573,7 @@ func TestDataChannel(t *testing.T) {
t.Run("participants should receive data", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 3})
defer rm.Close()
defer rm.Close(types.ParticipantCloseReasonNone)
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeLocalParticipant)
@@ -596,7 +603,7 @@ func TestDataChannel(t *testing.T) {
t.Run("only one participant should receive the data", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 4})
defer rm.Close()
defer rm.Close(types.ParticipantCloseReasonNone)
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeLocalParticipant)
p1 := participants[1].(*typesfakes.FakeLocalParticipant)
@@ -627,7 +634,7 @@ func TestDataChannel(t *testing.T) {
t.Run("publishing disallowed", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2})
defer rm.Close()
defer rm.Close(types.ParticipantCloseReasonNone)
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeLocalParticipant)
p.CanPublishDataReturns(false)
@@ -655,7 +662,7 @@ func TestDataChannel(t *testing.T) {
func TestHiddenParticipants(t *testing.T) {
t.Run("other participants don't receive hidden updates", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1})
defer rm.Close()
defer rm.Close(types.ParticipantCloseReasonNone)
pNew := NewMockParticipant("new", types.CurrentProtocol, false, false)
rm.Join(pNew, nil, nil, iceServersForRoom)
@@ -687,7 +694,7 @@ func TestHiddenParticipants(t *testing.T) {
func TestRoomUpdate(t *testing.T) {
t.Run("updates are sent when participant joined", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 1})
defer rm.Close()
defer rm.Close(types.ParticipantCloseReasonNone)
p1 := rm.GetParticipants()[0].(*typesfakes.FakeLocalParticipant)
require.Equal(t, 0, p1.SendRoomUpdateCallCount())
@@ -703,7 +710,7 @@ func TestRoomUpdate(t *testing.T) {
t.Run("participants should receive metadata update", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2})
defer rm.Close()
defer rm.Close(types.ParticipantCloseReasonNone)
rm.SetMetadata("test metadata...")
+6 -11
View File
@@ -81,14 +81,13 @@ type SubscribedCodecQuality struct {
type ParticipantCloseReason int
const (
ParticipantCloseReasonClientRequestLeave ParticipantCloseReason = iota
ParticipantCloseReasonNone ParticipantCloseReason = iota
ParticipantCloseReasonClientRequestLeave
ParticipantCloseReasonRoomManagerStop
ParticipantCloseReasonRoomClose
ParticipantCloseReasonVerifyFailed
ParticipantCloseReasonJoinFailed
ParticipantCloseReasonJoinTimeout
ParticipantCloseReasonMessageBusFailed
ParticipantCloseReasonStateDisconnected
ParticipantCloseReasonPeerConnectionDisconnected
ParticipantCloseReasonDuplicateIdentity
ParticipantCloseReasonMigrationComplete
@@ -100,7 +99,6 @@ const (
ParticipantCloseReasonSimulateServerLeave
ParticipantCloseReasonNegotiateFailed
ParticipantCloseReasonMigrationRequested
ParticipantCloseReasonOvercommitted
ParticipantCloseReasonPublicationError
ParticipantCloseReasonSubscriptionError
ParticipantCloseReasonDataChannelError
@@ -110,12 +108,12 @@ const (
func (p ParticipantCloseReason) String() string {
switch p {
case ParticipantCloseReasonNone:
return "NONE"
case ParticipantCloseReasonClientRequestLeave:
return "CLIENT_REQUEST_LEAVE"
case ParticipantCloseReasonRoomManagerStop:
return "ROOM_MANAGER_STOP"
case ParticipantCloseReasonRoomClose:
return "ROOM_CLOSE"
case ParticipantCloseReasonVerifyFailed:
return "VERIFY_FAILED"
case ParticipantCloseReasonJoinFailed:
@@ -124,8 +122,6 @@ func (p ParticipantCloseReason) String() string {
return "JOIN_TIMEOUT"
case ParticipantCloseReasonMessageBusFailed:
return "MESSAGE_BUS_FAILED"
case ParticipantCloseReasonStateDisconnected:
return "STATE_DISCONNECTED"
case ParticipantCloseReasonPeerConnectionDisconnected:
return "PEER_CONNECTION_DISCONNECTED"
case ParticipantCloseReasonDuplicateIdentity:
@@ -148,8 +144,6 @@ func (p ParticipantCloseReason) String() string {
return "NEGOTIATE_FAILED"
case ParticipantCloseReasonMigrationRequested:
return "MIGRATION_REQUESTED"
case ParticipantCloseReasonOvercommitted:
return "OVERCOMMITTED"
case ParticipantCloseReasonPublicationError:
return "PUBLICATION_ERROR"
case ParticipantCloseReasonSubscriptionError:
@@ -184,7 +178,7 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason {
return livekit.DisconnectReason_PARTICIPANT_REMOVED
case ParticipantCloseReasonServiceRequestDeleteRoom:
return livekit.DisconnectReason_ROOM_DELETED
case ParticipantCloseReasonSimulateNodeFailure, ParticipantCloseReasonSimulateServerLeave, ParticipantCloseReasonOvercommitted:
case ParticipantCloseReasonSimulateNodeFailure, ParticipantCloseReasonSimulateServerLeave:
return livekit.DisconnectReason_SERVER_SHUTDOWN
case ParticipantCloseReasonNegotiateFailed, ParticipantCloseReasonPublicationError, ParticipantCloseReasonSubscriptionError, ParticipantCloseReasonDataChannelError, ParticipantCloseReasonMigrateCodecMismatch:
return livekit.DisconnectReason_STATE_MISMATCH
@@ -250,6 +244,7 @@ type Participant interface {
ID() livekit.ParticipantID
Identity() livekit.ParticipantIdentity
State() livekit.ParticipantInfo_State
CloseReason() ParticipantCloseReason
CanSkipBroadcast() bool
ToProto() *livekit.ParticipantInfo
+4
View File
@@ -84,6 +84,10 @@ func (v ProtocolVersion) SupportsAsyncRoomID() bool {
return v > 11
}
func (v ProtocolVersion) SupportsIdentityBasedReconnection() bool {
return v > 11
}
func (v ProtocolVersion) SupportsRegionsInLeaveRequest() bool {
return v > 12
}
@@ -133,6 +133,16 @@ type FakeLocalParticipant struct {
closeReturnsOnCall map[int]struct {
result1 error
}
CloseReasonStub func() types.ParticipantCloseReason
closeReasonMutex sync.RWMutex
closeReasonArgsForCall []struct {
}
closeReasonReturns struct {
result1 types.ParticipantCloseReason
}
closeReasonReturnsOnCall map[int]struct {
result1 types.ParticipantCloseReason
}
CloseSignalConnectionStub func(types.SignallingCloseReason)
closeSignalConnectionMutex sync.RWMutex
closeSignalConnectionArgsForCall []struct {
@@ -1545,6 +1555,59 @@ func (fake *FakeLocalParticipant) CloseReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeLocalParticipant) CloseReason() types.ParticipantCloseReason {
fake.closeReasonMutex.Lock()
ret, specificReturn := fake.closeReasonReturnsOnCall[len(fake.closeReasonArgsForCall)]
fake.closeReasonArgsForCall = append(fake.closeReasonArgsForCall, struct {
}{})
stub := fake.CloseReasonStub
fakeReturns := fake.closeReasonReturns
fake.recordInvocation("CloseReason", []interface{}{})
fake.closeReasonMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) CloseReasonCallCount() int {
fake.closeReasonMutex.RLock()
defer fake.closeReasonMutex.RUnlock()
return len(fake.closeReasonArgsForCall)
}
func (fake *FakeLocalParticipant) CloseReasonCalls(stub func() types.ParticipantCloseReason) {
fake.closeReasonMutex.Lock()
defer fake.closeReasonMutex.Unlock()
fake.CloseReasonStub = stub
}
func (fake *FakeLocalParticipant) CloseReasonReturns(result1 types.ParticipantCloseReason) {
fake.closeReasonMutex.Lock()
defer fake.closeReasonMutex.Unlock()
fake.CloseReasonStub = nil
fake.closeReasonReturns = struct {
result1 types.ParticipantCloseReason
}{result1}
}
func (fake *FakeLocalParticipant) CloseReasonReturnsOnCall(i int, result1 types.ParticipantCloseReason) {
fake.closeReasonMutex.Lock()
defer fake.closeReasonMutex.Unlock()
fake.CloseReasonStub = nil
if fake.closeReasonReturnsOnCall == nil {
fake.closeReasonReturnsOnCall = make(map[int]struct {
result1 types.ParticipantCloseReason
})
}
fake.closeReasonReturnsOnCall[i] = struct {
result1 types.ParticipantCloseReason
}{result1}
}
func (fake *FakeLocalParticipant) CloseSignalConnection(arg1 types.SignallingCloseReason) {
fake.closeSignalConnectionMutex.Lock()
fake.closeSignalConnectionArgsForCall = append(fake.closeSignalConnectionArgsForCall, struct {
@@ -6171,6 +6234,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.claimGrantsMutex.RUnlock()
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
fake.closeReasonMutex.RLock()
defer fake.closeReasonMutex.RUnlock()
fake.closeSignalConnectionMutex.RLock()
defer fake.closeSignalConnectionMutex.RUnlock()
fake.connectedAtMutex.RLock()
@@ -33,6 +33,16 @@ type FakeParticipant struct {
closeReturnsOnCall map[int]struct {
result1 error
}
CloseReasonStub func() types.ParticipantCloseReason
closeReasonMutex sync.RWMutex
closeReasonArgsForCall []struct {
}
closeReasonReturns struct {
result1 types.ParticipantCloseReason
}
closeReasonReturnsOnCall map[int]struct {
result1 types.ParticipantCloseReason
}
DebugInfoStub func() map[string]interface{}
debugInfoMutex sync.RWMutex
debugInfoArgsForCall []struct {
@@ -342,6 +352,59 @@ func (fake *FakeParticipant) CloseReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeParticipant) CloseReason() types.ParticipantCloseReason {
fake.closeReasonMutex.Lock()
ret, specificReturn := fake.closeReasonReturnsOnCall[len(fake.closeReasonArgsForCall)]
fake.closeReasonArgsForCall = append(fake.closeReasonArgsForCall, struct {
}{})
stub := fake.CloseReasonStub
fakeReturns := fake.closeReasonReturns
fake.recordInvocation("CloseReason", []interface{}{})
fake.closeReasonMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeParticipant) CloseReasonCallCount() int {
fake.closeReasonMutex.RLock()
defer fake.closeReasonMutex.RUnlock()
return len(fake.closeReasonArgsForCall)
}
func (fake *FakeParticipant) CloseReasonCalls(stub func() types.ParticipantCloseReason) {
fake.closeReasonMutex.Lock()
defer fake.closeReasonMutex.Unlock()
fake.CloseReasonStub = stub
}
func (fake *FakeParticipant) CloseReasonReturns(result1 types.ParticipantCloseReason) {
fake.closeReasonMutex.Lock()
defer fake.closeReasonMutex.Unlock()
fake.CloseReasonStub = nil
fake.closeReasonReturns = struct {
result1 types.ParticipantCloseReason
}{result1}
}
func (fake *FakeParticipant) CloseReasonReturnsOnCall(i int, result1 types.ParticipantCloseReason) {
fake.closeReasonMutex.Lock()
defer fake.closeReasonMutex.Unlock()
fake.CloseReasonStub = nil
if fake.closeReasonReturnsOnCall == nil {
fake.closeReasonReturnsOnCall = make(map[int]struct {
result1 types.ParticipantCloseReason
})
}
fake.closeReasonReturnsOnCall[i] = struct {
result1 types.ParticipantCloseReason
}{result1}
}
func (fake *FakeParticipant) DebugInfo() map[string]interface{} {
fake.debugInfoMutex.Lock()
ret, specificReturn := fake.debugInfoReturnsOnCall[len(fake.debugInfoArgsForCall)]
@@ -1337,6 +1400,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
defer fake.canSkipBroadcastMutex.RUnlock()
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
fake.closeReasonMutex.RLock()
defer fake.closeReasonMutex.RUnlock()
fake.debugInfoMutex.RLock()
defer fake.debugInfoMutex.RUnlock()
fake.getAudioLevelMutex.RLock()
+2 -8
View File
@@ -213,10 +213,7 @@ func (r *RoomManager) Stop() {
r.lock.RUnlock()
for _, room := range rooms {
for _, p := range room.GetParticipants() {
_ = p.Close(true, types.ParticipantCloseReasonRoomManagerStop, false)
}
room.Close()
room.Close(types.ParticipantCloseReasonRoomManagerStop)
}
r.roomServers.Kill()
@@ -727,10 +724,7 @@ func (r *RoomManager) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
}
} else {
room.Logger.Infow("deleting room")
for _, p := range room.GetParticipants() {
_ = p.Close(true, types.ParticipantCloseReasonServiceRequestDeleteRoom, false)
}
room.Close()
room.Close(types.ParticipantCloseReasonServiceRequestDeleteRoom)
}
return &livekit.DeleteRoomResponse{}, nil
}
+2 -1
View File
@@ -35,6 +35,7 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/service"
"github.com/livekit/livekit-server/pkg/testutils"
)
@@ -108,7 +109,7 @@ func TestWebhooks(t *testing.T) {
// room closed
rm := server.RoomManager().GetRoom(context.Background(), testRoom)
rm.Close()
rm.Close(types.ParticipantCloseReasonNone)
testutils.WithTimeout(t, func() string {
if ts.GetEvent(webhook.EventRoomFinished) == nil {
return "did not receive RoomFinished"