From d3da94c45e0c09966babe14c8bee6f5db029ce30 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 25 Jan 2024 22:22:46 +0530 Subject: [PATCH] Augment LeaveRequest with alternate regions to connect. (#2408) * Augment LeaveRequest with alternate regions to connect. * update protocol and issue resume action on close if expected to resume * use current protocol in tests * address feedback --- go.mod | 4 +- go.sum | 8 +- pkg/rtc/participant.go | 61 +++- pkg/rtc/types/interfaces.go | 19 +- pkg/rtc/types/protocol_version.go | 6 +- .../typesfakes/fake_local_participant.go | 39 +++ pkg/service/roommanager.go | 37 ++- pkg/service/servicefakes/fake_sipstore.go | 312 ------------------ test/client/client.go | 7 +- 9 files changed, 146 insertions(+), 347 deletions(-) diff --git a/go.mod b/go.mod index b13a7d01e..bcec95e46 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f - github.com/livekit/protocol v1.9.5 + github.com/livekit/protocol v1.9.6-0.20240125083757-31b03e690557 github.com/livekit/psrpc v0.5.3-0.20231214055026-06ce27a934c9 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -101,7 +101,7 @@ require ( golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.17.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect google.golang.org/grpc v1.61.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 41f44c6e9..d8ae78ab4 100644 --- a/go.sum +++ b/go.sum @@ -126,8 +126,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f h1:XHrwGwLNGQB3ZqolH1YdMH/22hgXKr4vm+2M7JKMMGg= github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc= -github.com/livekit/protocol v1.9.5 h1:/I6maM05euoUxrV6je16Qj5yCnCSPZ+nhHzm8akLCVk= -github.com/livekit/protocol v1.9.5/go.mod h1:daddOPw85C9nq6f9w1uiuc1i/He6X2gArlFcKUPELI4= +github.com/livekit/protocol v1.9.6-0.20240125083757-31b03e690557 h1:lz11rtmZouO9C7RcDJpg5tWERUEAMLTYjtHX/mRyras= +github.com/livekit/protocol v1.9.6-0.20240125083757-31b03e690557/go.mod h1:daddOPw85C9nq6f9w1uiuc1i/He6X2gArlFcKUPELI4= github.com/livekit/psrpc v0.5.3-0.20231214055026-06ce27a934c9 h1:kXXV/NLVDHZ+Gn7xrR+UPpdwbH48n7WReBjLHAzqzhY= github.com/livekit/psrpc v0.5.3-0.20231214055026-06ce27a934c9/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -421,8 +421,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac h1:nUQEQmH/csSvFECKYRv6HWEyypysidKl2I6Qpsglq/0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0= google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index ec1bb5596..ff6758d80 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -221,6 +221,8 @@ type ParticipantImpl struct { // loggers for publisher and subscriber pubLogger logger.Logger subLogger logger.Logger + + regionSettings *livekit.RegionSettings } func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { @@ -582,8 +584,7 @@ func (p *ParticipantImpl) HandleSignalSourceClose() { p.TransportManager.SetSignalSourceValid(false) if !p.HasConnected() { - reason := types.ParticipantCloseReasonJoinFailed - _ = p.Close(false, reason, false) + _ = p.Close(false, types.ParticipantCloseReasonSignalSourceClose, false) } } @@ -749,12 +750,32 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea p.clearMigrationTimer() // send leave message - if sendLeave { + var leave *livekit.LeaveRequest + if p.ProtocolVersion().SupportsRegionsInLeaveRequest() { + leave = &livekit.LeaveRequest{ + Reason: reason.ToDisconnectReason(), + } + if isExpectedToResume { + leave.Action = livekit.LeaveRequest_RESUME + } else { + leave.Action = livekit.LeaveRequest_DISCONNECT + } + // although regions are not needed when resuming OR disconnecting, + // send it if available, just in case clients want to fall back. + p.lock.RLock() + if p.regionSettings != nil { + leave.Regions = proto.Clone(p.regionSettings).(*livekit.RegionSettings) + } + p.lock.RUnlock() + } else if sendLeave { + leave = &livekit.LeaveRequest{ + Reason: reason.ToDisconnectReason(), + } + } + if leave != nil { _ = p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Leave{ - Leave: &livekit.LeaveRequest{ - Reason: reason.ToDisconnectReason(), - }, + Leave: leave, }, }) } @@ -2282,12 +2303,26 @@ func (p *ParticipantImpl) GetCachedDownTrack(trackID livekit.TrackID) (*webrtc.R } func (p *ParticipantImpl) IssueFullReconnect(reason types.ParticipantCloseReason) { + var leave *livekit.LeaveRequest + if p.ProtocolVersion().SupportsRegionsInLeaveRequest() { + leave = &livekit.LeaveRequest{ + Reason: reason.ToDisconnectReason(), + Action: livekit.LeaveRequest_RECONNECT, + } + p.lock.RLock() + if p.regionSettings != nil { + leave.Regions = proto.Clone(p.regionSettings).(*livekit.RegionSettings) + } + p.lock.RUnlock() + } else { + leave = &livekit.LeaveRequest{ + CanReconnect: true, + Reason: reason.ToDisconnectReason(), + } + } _ = p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Leave{ - Leave: &livekit.LeaveRequest{ - CanReconnect: true, - Reason: reason.ToDisconnectReason(), - }, + Leave: leave, }, }) @@ -2451,3 +2486,9 @@ func (p *ParticipantImpl) setupEnabledCodecs(publishEnabledCodecs []*livekit.Cod } p.enabledSubscribeCodecs = subscribeCodecs } + +func (p *ParticipantImpl) SetRegionSettings(regionSettings *livekit.RegionSettings) { + p.lock.Lock() + p.regionSettings = proto.Clone(regionSettings).(*livekit.RegionSettings) + p.lock.Unlock() +} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 152513efc..466615c8a 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -105,6 +105,7 @@ const ( ParticipantCloseReasonSubscriptionError ParticipantCloseReasonDataChannelError ParticipantCloseReasonMigrateCodecMismatch + ParticipantCloseReasonSignalSourceClose ) func (p ParticipantCloseReason) String() string { @@ -157,6 +158,8 @@ func (p ParticipantCloseReason) String() string { return "DATA_CHANNEL_ERROR" case ParticipantCloseReasonMigrateCodecMismatch: return "MIGRATE_CODEC_MISMATCH" + case ParticipantCloseReasonSignalSourceClose: + return "SIGNAL_SOURCE_CLOSE" default: return fmt.Sprintf("%d", int(p)) } @@ -173,22 +176,20 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason { return livekit.DisconnectReason_JOIN_FAILURE case ParticipantCloseReasonPeerConnectionDisconnected: return livekit.DisconnectReason_STATE_MISMATCH - case ParticipantCloseReasonDuplicateIdentity, ParticipantCloseReasonMigrationComplete, ParticipantCloseReasonStale: + case ParticipantCloseReasonDuplicateIdentity, ParticipantCloseReasonStale: return livekit.DisconnectReason_DUPLICATE_IDENTITY + case ParticipantCloseReasonMigrationComplete, ParticipantCloseReasonSimulateMigration: + return livekit.DisconnectReason_MIGRATION case ParticipantCloseReasonServiceRequestRemoveParticipant: return livekit.DisconnectReason_PARTICIPANT_REMOVED case ParticipantCloseReasonServiceRequestDeleteRoom: return livekit.DisconnectReason_ROOM_DELETED - case ParticipantCloseReasonSimulateMigration: - return livekit.DisconnectReason_DUPLICATE_IDENTITY - case ParticipantCloseReasonSimulateNodeFailure: - return livekit.DisconnectReason_SERVER_SHUTDOWN - case ParticipantCloseReasonSimulateServerLeave: - return livekit.DisconnectReason_SERVER_SHUTDOWN - case ParticipantCloseReasonOvercommitted: + case ParticipantCloseReasonSimulateNodeFailure, ParticipantCloseReasonSimulateServerLeave, ParticipantCloseReasonOvercommitted: return livekit.DisconnectReason_SERVER_SHUTDOWN case ParticipantCloseReasonNegotiateFailed, ParticipantCloseReasonPublicationError, ParticipantCloseReasonSubscriptionError, ParticipantCloseReasonDataChannelError, ParticipantCloseReasonMigrateCodecMismatch: return livekit.DisconnectReason_STATE_MISMATCH + case ParticipantCloseReasonSignalSourceClose: + return livekit.DisconnectReason_SIGNAL_CLOSE default: // the other types will map to unknown reason return livekit.DisconnectReason_UNKNOWN_REASON @@ -421,6 +422,8 @@ type LocalParticipant interface { GetPacer() pacer.Pacer GetTrafficLoad() *TrafficLoad + + SetRegionSettings(regionSettings *livekit.RegionSettings) } // Room is a container of participants, and can provide room-level actions diff --git a/pkg/rtc/types/protocol_version.go b/pkg/rtc/types/protocol_version.go index ac445e9d8..08075184d 100644 --- a/pkg/rtc/types/protocol_version.go +++ b/pkg/rtc/types/protocol_version.go @@ -16,7 +16,7 @@ package types type ProtocolVersion int -const CurrentProtocol = 12 +const CurrentProtocol = 13 func (v ProtocolVersion) SupportsPackedStreamId() bool { return v > 0 @@ -83,3 +83,7 @@ func (v ProtocolVersion) SupportsConnectionQualityLost() bool { func (v ProtocolVersion) SupportsAsyncRoomID() bool { return v > 11 } + +func (v ProtocolVersion) SupportsRegionsInLeaveRequest() bool { + return v > 12 +} diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 0cd75616a..756099d2c 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -753,6 +753,11 @@ type FakeLocalParticipant struct { setPermissionReturnsOnCall map[int]struct { result1 bool } + SetRegionSettingsStub func(*livekit.RegionSettings) + setRegionSettingsMutex sync.RWMutex + setRegionSettingsArgsForCall []struct { + arg1 *livekit.RegionSettings + } SetResponseSinkStub func(routing.MessageSink) setResponseSinkMutex sync.RWMutex setResponseSinkArgsForCall []struct { @@ -4981,6 +4986,38 @@ func (fake *FakeLocalParticipant) SetPermissionReturnsOnCall(i int, result1 bool }{result1} } +func (fake *FakeLocalParticipant) SetRegionSettings(arg1 *livekit.RegionSettings) { + fake.setRegionSettingsMutex.Lock() + fake.setRegionSettingsArgsForCall = append(fake.setRegionSettingsArgsForCall, struct { + arg1 *livekit.RegionSettings + }{arg1}) + stub := fake.SetRegionSettingsStub + fake.recordInvocation("SetRegionSettings", []interface{}{arg1}) + fake.setRegionSettingsMutex.Unlock() + if stub != nil { + fake.SetRegionSettingsStub(arg1) + } +} + +func (fake *FakeLocalParticipant) SetRegionSettingsCallCount() int { + fake.setRegionSettingsMutex.RLock() + defer fake.setRegionSettingsMutex.RUnlock() + return len(fake.setRegionSettingsArgsForCall) +} + +func (fake *FakeLocalParticipant) SetRegionSettingsCalls(stub func(*livekit.RegionSettings)) { + fake.setRegionSettingsMutex.Lock() + defer fake.setRegionSettingsMutex.Unlock() + fake.SetRegionSettingsStub = stub +} + +func (fake *FakeLocalParticipant) SetRegionSettingsArgsForCall(i int) *livekit.RegionSettings { + fake.setRegionSettingsMutex.RLock() + defer fake.setRegionSettingsMutex.RUnlock() + argsForCall := fake.setRegionSettingsArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) SetResponseSink(arg1 routing.MessageSink) { fake.setResponseSinkMutex.Lock() fake.setResponseSinkArgsForCall = append(fake.setResponseSinkArgsForCall, struct { @@ -6343,6 +6380,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.setNameMutex.RUnlock() fake.setPermissionMutex.RLock() defer fake.setPermissionMutex.RUnlock() + fake.setRegionSettingsMutex.RLock() + defer fake.setRegionSettingsMutex.RUnlock() fake.setResponseSinkMutex.RLock() defer fake.setResponseSinkMutex.RUnlock() fake.setSignalSourceValidMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 51ab919a0..abfd29331 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -276,12 +276,23 @@ func (r *RoomManager) StartSession( "participant", pi.Identity, "reason", pi.ReconnectReason, ) + + var leave *livekit.LeaveRequest + pv := types.ProtocolVersion(pi.Client.Protocol) + if pv.SupportsRegionsInLeaveRequest() { + leave = &livekit.LeaveRequest{ + Reason: livekit.DisconnectReason_STATE_MISMATCH, + Action: livekit.LeaveRequest_RECONNECT, + } + } else { + leave = &livekit.LeaveRequest{ + CanReconnect: true, + Reason: livekit.DisconnectReason_STATE_MISMATCH, + } + } _ = responseSink.WriteMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Leave{ - Leave: &livekit.LeaveRequest{ - CanReconnect: true, - Reason: livekit.DisconnectReason_STATE_MISMATCH, - }, + Leave: leave, }, }) return errors.New("could not restart closed participant") @@ -321,12 +332,22 @@ func (r *RoomManager) StartSession( } else if pi.Reconnect { // send leave request if participant is trying to reconnect without keep subscribe state // but missing from the room + var leave *livekit.LeaveRequest + pv := types.ProtocolVersion(pi.Client.Protocol) + if pv.SupportsRegionsInLeaveRequest() { + leave = &livekit.LeaveRequest{ + Reason: livekit.DisconnectReason_STATE_MISMATCH, + Action: livekit.LeaveRequest_RECONNECT, + } + } else { + leave = &livekit.LeaveRequest{ + CanReconnect: true, + Reason: livekit.DisconnectReason_STATE_MISMATCH, + } + } _ = responseSink.WriteMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Leave{ - Leave: &livekit.LeaveRequest{ - CanReconnect: true, - Reason: livekit.DisconnectReason_STATE_MISMATCH, - }, + Leave: leave, }, }) return errors.New("could not restart participant") diff --git a/pkg/service/servicefakes/fake_sipstore.go b/pkg/service/servicefakes/fake_sipstore.go index 069eff951..fe88a5359 100644 --- a/pkg/service/servicefakes/fake_sipstore.go +++ b/pkg/service/servicefakes/fake_sipstore.go @@ -22,18 +22,6 @@ type FakeSIPStore struct { deleteSIPDispatchRuleReturnsOnCall map[int]struct { result1 error } - DeleteSIPParticipantStub func(context.Context, *livekit.SIPParticipantInfo) error - deleteSIPParticipantMutex sync.RWMutex - deleteSIPParticipantArgsForCall []struct { - arg1 context.Context - arg2 *livekit.SIPParticipantInfo - } - deleteSIPParticipantReturns struct { - result1 error - } - deleteSIPParticipantReturnsOnCall map[int]struct { - result1 error - } DeleteSIPTrunkStub func(context.Context, *livekit.SIPTrunkInfo) error deleteSIPTrunkMutex sync.RWMutex deleteSIPTrunkArgsForCall []struct { @@ -59,19 +47,6 @@ type FakeSIPStore struct { result1 []*livekit.SIPDispatchRuleInfo result2 error } - ListSIPParticipantStub func(context.Context) ([]*livekit.SIPParticipantInfo, error) - listSIPParticipantMutex sync.RWMutex - listSIPParticipantArgsForCall []struct { - arg1 context.Context - } - listSIPParticipantReturns struct { - result1 []*livekit.SIPParticipantInfo - result2 error - } - listSIPParticipantReturnsOnCall map[int]struct { - result1 []*livekit.SIPParticipantInfo - result2 error - } ListSIPTrunkStub func(context.Context) ([]*livekit.SIPTrunkInfo, error) listSIPTrunkMutex sync.RWMutex listSIPTrunkArgsForCall []struct { @@ -99,20 +74,6 @@ type FakeSIPStore struct { result1 *livekit.SIPDispatchRuleInfo result2 error } - LoadSIPParticipantStub func(context.Context, string) (*livekit.SIPParticipantInfo, error) - loadSIPParticipantMutex sync.RWMutex - loadSIPParticipantArgsForCall []struct { - arg1 context.Context - arg2 string - } - loadSIPParticipantReturns struct { - result1 *livekit.SIPParticipantInfo - result2 error - } - loadSIPParticipantReturnsOnCall map[int]struct { - result1 *livekit.SIPParticipantInfo - result2 error - } LoadSIPTrunkStub func(context.Context, string) (*livekit.SIPTrunkInfo, error) loadSIPTrunkMutex sync.RWMutex loadSIPTrunkArgsForCall []struct { @@ -139,18 +100,6 @@ type FakeSIPStore struct { storeSIPDispatchRuleReturnsOnCall map[int]struct { result1 error } - StoreSIPParticipantStub func(context.Context, *livekit.SIPParticipantInfo) error - storeSIPParticipantMutex sync.RWMutex - storeSIPParticipantArgsForCall []struct { - arg1 context.Context - arg2 *livekit.SIPParticipantInfo - } - storeSIPParticipantReturns struct { - result1 error - } - storeSIPParticipantReturnsOnCall map[int]struct { - result1 error - } StoreSIPTrunkStub func(context.Context, *livekit.SIPTrunkInfo) error storeSIPTrunkMutex sync.RWMutex storeSIPTrunkArgsForCall []struct { @@ -229,68 +178,6 @@ func (fake *FakeSIPStore) DeleteSIPDispatchRuleReturnsOnCall(i int, result1 erro }{result1} } -func (fake *FakeSIPStore) DeleteSIPParticipant(arg1 context.Context, arg2 *livekit.SIPParticipantInfo) error { - fake.deleteSIPParticipantMutex.Lock() - ret, specificReturn := fake.deleteSIPParticipantReturnsOnCall[len(fake.deleteSIPParticipantArgsForCall)] - fake.deleteSIPParticipantArgsForCall = append(fake.deleteSIPParticipantArgsForCall, struct { - arg1 context.Context - arg2 *livekit.SIPParticipantInfo - }{arg1, arg2}) - stub := fake.DeleteSIPParticipantStub - fakeReturns := fake.deleteSIPParticipantReturns - fake.recordInvocation("DeleteSIPParticipant", []interface{}{arg1, arg2}) - fake.deleteSIPParticipantMutex.Unlock() - if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeSIPStore) DeleteSIPParticipantCallCount() int { - fake.deleteSIPParticipantMutex.RLock() - defer fake.deleteSIPParticipantMutex.RUnlock() - return len(fake.deleteSIPParticipantArgsForCall) -} - -func (fake *FakeSIPStore) DeleteSIPParticipantCalls(stub func(context.Context, *livekit.SIPParticipantInfo) error) { - fake.deleteSIPParticipantMutex.Lock() - defer fake.deleteSIPParticipantMutex.Unlock() - fake.DeleteSIPParticipantStub = stub -} - -func (fake *FakeSIPStore) DeleteSIPParticipantArgsForCall(i int) (context.Context, *livekit.SIPParticipantInfo) { - fake.deleteSIPParticipantMutex.RLock() - defer fake.deleteSIPParticipantMutex.RUnlock() - argsForCall := fake.deleteSIPParticipantArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeSIPStore) DeleteSIPParticipantReturns(result1 error) { - fake.deleteSIPParticipantMutex.Lock() - defer fake.deleteSIPParticipantMutex.Unlock() - fake.DeleteSIPParticipantStub = nil - fake.deleteSIPParticipantReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeSIPStore) DeleteSIPParticipantReturnsOnCall(i int, result1 error) { - fake.deleteSIPParticipantMutex.Lock() - defer fake.deleteSIPParticipantMutex.Unlock() - fake.DeleteSIPParticipantStub = nil - if fake.deleteSIPParticipantReturnsOnCall == nil { - fake.deleteSIPParticipantReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.deleteSIPParticipantReturnsOnCall[i] = struct { - result1 error - }{result1} -} - func (fake *FakeSIPStore) DeleteSIPTrunk(arg1 context.Context, arg2 *livekit.SIPTrunkInfo) error { fake.deleteSIPTrunkMutex.Lock() ret, specificReturn := fake.deleteSIPTrunkReturnsOnCall[len(fake.deleteSIPTrunkArgsForCall)] @@ -417,70 +304,6 @@ func (fake *FakeSIPStore) ListSIPDispatchRuleReturnsOnCall(i int, result1 []*liv }{result1, result2} } -func (fake *FakeSIPStore) ListSIPParticipant(arg1 context.Context) ([]*livekit.SIPParticipantInfo, error) { - fake.listSIPParticipantMutex.Lock() - ret, specificReturn := fake.listSIPParticipantReturnsOnCall[len(fake.listSIPParticipantArgsForCall)] - fake.listSIPParticipantArgsForCall = append(fake.listSIPParticipantArgsForCall, struct { - arg1 context.Context - }{arg1}) - stub := fake.ListSIPParticipantStub - fakeReturns := fake.listSIPParticipantReturns - fake.recordInvocation("ListSIPParticipant", []interface{}{arg1}) - fake.listSIPParticipantMutex.Unlock() - if stub != nil { - return stub(arg1) - } - if specificReturn { - return ret.result1, ret.result2 - } - return fakeReturns.result1, fakeReturns.result2 -} - -func (fake *FakeSIPStore) ListSIPParticipantCallCount() int { - fake.listSIPParticipantMutex.RLock() - defer fake.listSIPParticipantMutex.RUnlock() - return len(fake.listSIPParticipantArgsForCall) -} - -func (fake *FakeSIPStore) ListSIPParticipantCalls(stub func(context.Context) ([]*livekit.SIPParticipantInfo, error)) { - fake.listSIPParticipantMutex.Lock() - defer fake.listSIPParticipantMutex.Unlock() - fake.ListSIPParticipantStub = stub -} - -func (fake *FakeSIPStore) ListSIPParticipantArgsForCall(i int) context.Context { - fake.listSIPParticipantMutex.RLock() - defer fake.listSIPParticipantMutex.RUnlock() - argsForCall := fake.listSIPParticipantArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeSIPStore) ListSIPParticipantReturns(result1 []*livekit.SIPParticipantInfo, result2 error) { - fake.listSIPParticipantMutex.Lock() - defer fake.listSIPParticipantMutex.Unlock() - fake.ListSIPParticipantStub = nil - fake.listSIPParticipantReturns = struct { - result1 []*livekit.SIPParticipantInfo - result2 error - }{result1, result2} -} - -func (fake *FakeSIPStore) ListSIPParticipantReturnsOnCall(i int, result1 []*livekit.SIPParticipantInfo, result2 error) { - fake.listSIPParticipantMutex.Lock() - defer fake.listSIPParticipantMutex.Unlock() - fake.ListSIPParticipantStub = nil - if fake.listSIPParticipantReturnsOnCall == nil { - fake.listSIPParticipantReturnsOnCall = make(map[int]struct { - result1 []*livekit.SIPParticipantInfo - result2 error - }) - } - fake.listSIPParticipantReturnsOnCall[i] = struct { - result1 []*livekit.SIPParticipantInfo - result2 error - }{result1, result2} -} - func (fake *FakeSIPStore) ListSIPTrunk(arg1 context.Context) ([]*livekit.SIPTrunkInfo, error) { fake.listSIPTrunkMutex.Lock() ret, specificReturn := fake.listSIPTrunkReturnsOnCall[len(fake.listSIPTrunkArgsForCall)] @@ -610,71 +433,6 @@ func (fake *FakeSIPStore) LoadSIPDispatchRuleReturnsOnCall(i int, result1 *livek }{result1, result2} } -func (fake *FakeSIPStore) LoadSIPParticipant(arg1 context.Context, arg2 string) (*livekit.SIPParticipantInfo, error) { - fake.loadSIPParticipantMutex.Lock() - ret, specificReturn := fake.loadSIPParticipantReturnsOnCall[len(fake.loadSIPParticipantArgsForCall)] - fake.loadSIPParticipantArgsForCall = append(fake.loadSIPParticipantArgsForCall, struct { - arg1 context.Context - arg2 string - }{arg1, arg2}) - stub := fake.LoadSIPParticipantStub - fakeReturns := fake.loadSIPParticipantReturns - fake.recordInvocation("LoadSIPParticipant", []interface{}{arg1, arg2}) - fake.loadSIPParticipantMutex.Unlock() - if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1, ret.result2 - } - return fakeReturns.result1, fakeReturns.result2 -} - -func (fake *FakeSIPStore) LoadSIPParticipantCallCount() int { - fake.loadSIPParticipantMutex.RLock() - defer fake.loadSIPParticipantMutex.RUnlock() - return len(fake.loadSIPParticipantArgsForCall) -} - -func (fake *FakeSIPStore) LoadSIPParticipantCalls(stub func(context.Context, string) (*livekit.SIPParticipantInfo, error)) { - fake.loadSIPParticipantMutex.Lock() - defer fake.loadSIPParticipantMutex.Unlock() - fake.LoadSIPParticipantStub = stub -} - -func (fake *FakeSIPStore) LoadSIPParticipantArgsForCall(i int) (context.Context, string) { - fake.loadSIPParticipantMutex.RLock() - defer fake.loadSIPParticipantMutex.RUnlock() - argsForCall := fake.loadSIPParticipantArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeSIPStore) LoadSIPParticipantReturns(result1 *livekit.SIPParticipantInfo, result2 error) { - fake.loadSIPParticipantMutex.Lock() - defer fake.loadSIPParticipantMutex.Unlock() - fake.LoadSIPParticipantStub = nil - fake.loadSIPParticipantReturns = struct { - result1 *livekit.SIPParticipantInfo - result2 error - }{result1, result2} -} - -func (fake *FakeSIPStore) LoadSIPParticipantReturnsOnCall(i int, result1 *livekit.SIPParticipantInfo, result2 error) { - fake.loadSIPParticipantMutex.Lock() - defer fake.loadSIPParticipantMutex.Unlock() - fake.LoadSIPParticipantStub = nil - if fake.loadSIPParticipantReturnsOnCall == nil { - fake.loadSIPParticipantReturnsOnCall = make(map[int]struct { - result1 *livekit.SIPParticipantInfo - result2 error - }) - } - fake.loadSIPParticipantReturnsOnCall[i] = struct { - result1 *livekit.SIPParticipantInfo - result2 error - }{result1, result2} -} - func (fake *FakeSIPStore) LoadSIPTrunk(arg1 context.Context, arg2 string) (*livekit.SIPTrunkInfo, error) { fake.loadSIPTrunkMutex.Lock() ret, specificReturn := fake.loadSIPTrunkReturnsOnCall[len(fake.loadSIPTrunkArgsForCall)] @@ -802,68 +560,6 @@ func (fake *FakeSIPStore) StoreSIPDispatchRuleReturnsOnCall(i int, result1 error }{result1} } -func (fake *FakeSIPStore) StoreSIPParticipant(arg1 context.Context, arg2 *livekit.SIPParticipantInfo) error { - fake.storeSIPParticipantMutex.Lock() - ret, specificReturn := fake.storeSIPParticipantReturnsOnCall[len(fake.storeSIPParticipantArgsForCall)] - fake.storeSIPParticipantArgsForCall = append(fake.storeSIPParticipantArgsForCall, struct { - arg1 context.Context - arg2 *livekit.SIPParticipantInfo - }{arg1, arg2}) - stub := fake.StoreSIPParticipantStub - fakeReturns := fake.storeSIPParticipantReturns - fake.recordInvocation("StoreSIPParticipant", []interface{}{arg1, arg2}) - fake.storeSIPParticipantMutex.Unlock() - if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeSIPStore) StoreSIPParticipantCallCount() int { - fake.storeSIPParticipantMutex.RLock() - defer fake.storeSIPParticipantMutex.RUnlock() - return len(fake.storeSIPParticipantArgsForCall) -} - -func (fake *FakeSIPStore) StoreSIPParticipantCalls(stub func(context.Context, *livekit.SIPParticipantInfo) error) { - fake.storeSIPParticipantMutex.Lock() - defer fake.storeSIPParticipantMutex.Unlock() - fake.StoreSIPParticipantStub = stub -} - -func (fake *FakeSIPStore) StoreSIPParticipantArgsForCall(i int) (context.Context, *livekit.SIPParticipantInfo) { - fake.storeSIPParticipantMutex.RLock() - defer fake.storeSIPParticipantMutex.RUnlock() - argsForCall := fake.storeSIPParticipantArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeSIPStore) StoreSIPParticipantReturns(result1 error) { - fake.storeSIPParticipantMutex.Lock() - defer fake.storeSIPParticipantMutex.Unlock() - fake.StoreSIPParticipantStub = nil - fake.storeSIPParticipantReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeSIPStore) StoreSIPParticipantReturnsOnCall(i int, result1 error) { - fake.storeSIPParticipantMutex.Lock() - defer fake.storeSIPParticipantMutex.Unlock() - fake.StoreSIPParticipantStub = nil - if fake.storeSIPParticipantReturnsOnCall == nil { - fake.storeSIPParticipantReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.storeSIPParticipantReturnsOnCall[i] = struct { - result1 error - }{result1} -} - func (fake *FakeSIPStore) StoreSIPTrunk(arg1 context.Context, arg2 *livekit.SIPTrunkInfo) error { fake.storeSIPTrunkMutex.Lock() ret, specificReturn := fake.storeSIPTrunkReturnsOnCall[len(fake.storeSIPTrunkArgsForCall)] @@ -931,26 +627,18 @@ func (fake *FakeSIPStore) Invocations() map[string][][]interface{} { defer fake.invocationsMutex.RUnlock() fake.deleteSIPDispatchRuleMutex.RLock() defer fake.deleteSIPDispatchRuleMutex.RUnlock() - fake.deleteSIPParticipantMutex.RLock() - defer fake.deleteSIPParticipantMutex.RUnlock() fake.deleteSIPTrunkMutex.RLock() defer fake.deleteSIPTrunkMutex.RUnlock() fake.listSIPDispatchRuleMutex.RLock() defer fake.listSIPDispatchRuleMutex.RUnlock() - fake.listSIPParticipantMutex.RLock() - defer fake.listSIPParticipantMutex.RUnlock() fake.listSIPTrunkMutex.RLock() defer fake.listSIPTrunkMutex.RUnlock() fake.loadSIPDispatchRuleMutex.RLock() defer fake.loadSIPDispatchRuleMutex.RUnlock() - fake.loadSIPParticipantMutex.RLock() - defer fake.loadSIPParticipantMutex.RUnlock() fake.loadSIPTrunkMutex.RLock() defer fake.loadSIPTrunkMutex.RUnlock() fake.storeSIPDispatchRuleMutex.RLock() defer fake.storeSIPDispatchRuleMutex.RUnlock() - fake.storeSIPParticipantMutex.RLock() - defer fake.storeSIPParticipantMutex.RUnlock() fake.storeSIPTrunkMutex.RLock() defer fake.storeSIPTrunkMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/test/client/client.go b/test/client/client.go index 70b135a6d..fbe6954d9 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -114,7 +114,7 @@ type Options struct { } func NewWebSocketConn(host, token string, opts *Options) (*websocket.Conn, error) { - u, err := url.Parse(host + "/rtc?protocol=7") + u, err := url.Parse(host + fmt.Sprintf("/rtc?protocol=%d", types.CurrentProtocol)) if err != nil { return nil, err } @@ -493,7 +493,10 @@ func (c *RTCClient) Stop() { logger.Infow("stopping client", "ID", c.ID()) _ = c.SendRequest(&livekit.SignalRequest{ Message: &livekit.SignalRequest_Leave{ - Leave: &livekit.LeaveRequest{}, + Leave: &livekit.LeaveRequest{ + Reason: livekit.DisconnectReason_CLIENT_INITIATED, + Action: livekit.LeaveRequest_DISCONNECT, + }, }, }) c.publisherFullyEstablished.Store(false)