Augment LeaveRequest with alternate regions to connect. (#2408)

* Augment LeaveRequest with alternate regions to connect.

* update protocol and issue resume action on close if expected to resume

* use current protocol in tests

* address feedback
This commit is contained in:
Raja Subramanian
2024-01-25 22:22:46 +05:30
committed by GitHub
parent 43a40eb52d
commit d3da94c45e
9 changed files with 146 additions and 347 deletions
+2 -2
View File
@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f
github.com/livekit/protocol v1.9.5
github.com/livekit/protocol v1.9.6-0.20240125083757-31b03e690557
github.com/livekit/psrpc v0.5.3-0.20231214055026-06ce27a934c9
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
@@ -101,7 +101,7 @@ require (
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/grpc v1.61.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+4 -4
View File
@@ -126,8 +126,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f h1:XHrwGwLNGQB3ZqolH1YdMH/22hgXKr4vm+2M7JKMMGg=
github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc=
github.com/livekit/protocol v1.9.5 h1:/I6maM05euoUxrV6je16Qj5yCnCSPZ+nhHzm8akLCVk=
github.com/livekit/protocol v1.9.5/go.mod h1:daddOPw85C9nq6f9w1uiuc1i/He6X2gArlFcKUPELI4=
github.com/livekit/protocol v1.9.6-0.20240125083757-31b03e690557 h1:lz11rtmZouO9C7RcDJpg5tWERUEAMLTYjtHX/mRyras=
github.com/livekit/protocol v1.9.6-0.20240125083757-31b03e690557/go.mod h1:daddOPw85C9nq6f9w1uiuc1i/He6X2gArlFcKUPELI4=
github.com/livekit/psrpc v0.5.3-0.20231214055026-06ce27a934c9 h1:kXXV/NLVDHZ+Gn7xrR+UPpdwbH48n7WReBjLHAzqzhY=
github.com/livekit/psrpc v0.5.3-0.20231214055026-06ce27a934c9/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
@@ -421,8 +421,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac h1:nUQEQmH/csSvFECKYRv6HWEyypysidKl2I6Qpsglq/0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s=
google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0=
google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+51 -10
View File
@@ -221,6 +221,8 @@ type ParticipantImpl struct {
// loggers for publisher and subscriber
pubLogger logger.Logger
subLogger logger.Logger
regionSettings *livekit.RegionSettings
}
func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
@@ -582,8 +584,7 @@ func (p *ParticipantImpl) HandleSignalSourceClose() {
p.TransportManager.SetSignalSourceValid(false)
if !p.HasConnected() {
reason := types.ParticipantCloseReasonJoinFailed
_ = p.Close(false, reason, false)
_ = p.Close(false, types.ParticipantCloseReasonSignalSourceClose, false)
}
}
@@ -749,12 +750,32 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
p.clearMigrationTimer()
// send leave message
if sendLeave {
var leave *livekit.LeaveRequest
if p.ProtocolVersion().SupportsRegionsInLeaveRequest() {
leave = &livekit.LeaveRequest{
Reason: reason.ToDisconnectReason(),
}
if isExpectedToResume {
leave.Action = livekit.LeaveRequest_RESUME
} else {
leave.Action = livekit.LeaveRequest_DISCONNECT
}
// although regions are not needed when resuming OR disconnecting,
// send it if available, just in case clients want to fall back.
p.lock.RLock()
if p.regionSettings != nil {
leave.Regions = proto.Clone(p.regionSettings).(*livekit.RegionSettings)
}
p.lock.RUnlock()
} else if sendLeave {
leave = &livekit.LeaveRequest{
Reason: reason.ToDisconnectReason(),
}
}
if leave != nil {
_ = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Leave{
Leave: &livekit.LeaveRequest{
Reason: reason.ToDisconnectReason(),
},
Leave: leave,
},
})
}
@@ -2282,12 +2303,26 @@ func (p *ParticipantImpl) GetCachedDownTrack(trackID livekit.TrackID) (*webrtc.R
}
func (p *ParticipantImpl) IssueFullReconnect(reason types.ParticipantCloseReason) {
var leave *livekit.LeaveRequest
if p.ProtocolVersion().SupportsRegionsInLeaveRequest() {
leave = &livekit.LeaveRequest{
Reason: reason.ToDisconnectReason(),
Action: livekit.LeaveRequest_RECONNECT,
}
p.lock.RLock()
if p.regionSettings != nil {
leave.Regions = proto.Clone(p.regionSettings).(*livekit.RegionSettings)
}
p.lock.RUnlock()
} else {
leave = &livekit.LeaveRequest{
CanReconnect: true,
Reason: reason.ToDisconnectReason(),
}
}
_ = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Leave{
Leave: &livekit.LeaveRequest{
CanReconnect: true,
Reason: reason.ToDisconnectReason(),
},
Leave: leave,
},
})
@@ -2451,3 +2486,9 @@ func (p *ParticipantImpl) setupEnabledCodecs(publishEnabledCodecs []*livekit.Cod
}
p.enabledSubscribeCodecs = subscribeCodecs
}
func (p *ParticipantImpl) SetRegionSettings(regionSettings *livekit.RegionSettings) {
p.lock.Lock()
p.regionSettings = proto.Clone(regionSettings).(*livekit.RegionSettings)
p.lock.Unlock()
}
+11 -8
View File
@@ -105,6 +105,7 @@ const (
ParticipantCloseReasonSubscriptionError
ParticipantCloseReasonDataChannelError
ParticipantCloseReasonMigrateCodecMismatch
ParticipantCloseReasonSignalSourceClose
)
func (p ParticipantCloseReason) String() string {
@@ -157,6 +158,8 @@ func (p ParticipantCloseReason) String() string {
return "DATA_CHANNEL_ERROR"
case ParticipantCloseReasonMigrateCodecMismatch:
return "MIGRATE_CODEC_MISMATCH"
case ParticipantCloseReasonSignalSourceClose:
return "SIGNAL_SOURCE_CLOSE"
default:
return fmt.Sprintf("%d", int(p))
}
@@ -173,22 +176,20 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason {
return livekit.DisconnectReason_JOIN_FAILURE
case ParticipantCloseReasonPeerConnectionDisconnected:
return livekit.DisconnectReason_STATE_MISMATCH
case ParticipantCloseReasonDuplicateIdentity, ParticipantCloseReasonMigrationComplete, ParticipantCloseReasonStale:
case ParticipantCloseReasonDuplicateIdentity, ParticipantCloseReasonStale:
return livekit.DisconnectReason_DUPLICATE_IDENTITY
case ParticipantCloseReasonMigrationComplete, ParticipantCloseReasonSimulateMigration:
return livekit.DisconnectReason_MIGRATION
case ParticipantCloseReasonServiceRequestRemoveParticipant:
return livekit.DisconnectReason_PARTICIPANT_REMOVED
case ParticipantCloseReasonServiceRequestDeleteRoom:
return livekit.DisconnectReason_ROOM_DELETED
case ParticipantCloseReasonSimulateMigration:
return livekit.DisconnectReason_DUPLICATE_IDENTITY
case ParticipantCloseReasonSimulateNodeFailure:
return livekit.DisconnectReason_SERVER_SHUTDOWN
case ParticipantCloseReasonSimulateServerLeave:
return livekit.DisconnectReason_SERVER_SHUTDOWN
case ParticipantCloseReasonOvercommitted:
case ParticipantCloseReasonSimulateNodeFailure, ParticipantCloseReasonSimulateServerLeave, ParticipantCloseReasonOvercommitted:
return livekit.DisconnectReason_SERVER_SHUTDOWN
case ParticipantCloseReasonNegotiateFailed, ParticipantCloseReasonPublicationError, ParticipantCloseReasonSubscriptionError, ParticipantCloseReasonDataChannelError, ParticipantCloseReasonMigrateCodecMismatch:
return livekit.DisconnectReason_STATE_MISMATCH
case ParticipantCloseReasonSignalSourceClose:
return livekit.DisconnectReason_SIGNAL_CLOSE
default:
// the other types will map to unknown reason
return livekit.DisconnectReason_UNKNOWN_REASON
@@ -421,6 +422,8 @@ type LocalParticipant interface {
GetPacer() pacer.Pacer
GetTrafficLoad() *TrafficLoad
SetRegionSettings(regionSettings *livekit.RegionSettings)
}
// Room is a container of participants, and can provide room-level actions
+5 -1
View File
@@ -16,7 +16,7 @@ package types
type ProtocolVersion int
const CurrentProtocol = 12
const CurrentProtocol = 13
func (v ProtocolVersion) SupportsPackedStreamId() bool {
return v > 0
@@ -83,3 +83,7 @@ func (v ProtocolVersion) SupportsConnectionQualityLost() bool {
func (v ProtocolVersion) SupportsAsyncRoomID() bool {
return v > 11
}
func (v ProtocolVersion) SupportsRegionsInLeaveRequest() bool {
return v > 12
}
@@ -753,6 +753,11 @@ type FakeLocalParticipant struct {
setPermissionReturnsOnCall map[int]struct {
result1 bool
}
SetRegionSettingsStub func(*livekit.RegionSettings)
setRegionSettingsMutex sync.RWMutex
setRegionSettingsArgsForCall []struct {
arg1 *livekit.RegionSettings
}
SetResponseSinkStub func(routing.MessageSink)
setResponseSinkMutex sync.RWMutex
setResponseSinkArgsForCall []struct {
@@ -4981,6 +4986,38 @@ func (fake *FakeLocalParticipant) SetPermissionReturnsOnCall(i int, result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) SetRegionSettings(arg1 *livekit.RegionSettings) {
fake.setRegionSettingsMutex.Lock()
fake.setRegionSettingsArgsForCall = append(fake.setRegionSettingsArgsForCall, struct {
arg1 *livekit.RegionSettings
}{arg1})
stub := fake.SetRegionSettingsStub
fake.recordInvocation("SetRegionSettings", []interface{}{arg1})
fake.setRegionSettingsMutex.Unlock()
if stub != nil {
fake.SetRegionSettingsStub(arg1)
}
}
func (fake *FakeLocalParticipant) SetRegionSettingsCallCount() int {
fake.setRegionSettingsMutex.RLock()
defer fake.setRegionSettingsMutex.RUnlock()
return len(fake.setRegionSettingsArgsForCall)
}
func (fake *FakeLocalParticipant) SetRegionSettingsCalls(stub func(*livekit.RegionSettings)) {
fake.setRegionSettingsMutex.Lock()
defer fake.setRegionSettingsMutex.Unlock()
fake.SetRegionSettingsStub = stub
}
func (fake *FakeLocalParticipant) SetRegionSettingsArgsForCall(i int) *livekit.RegionSettings {
fake.setRegionSettingsMutex.RLock()
defer fake.setRegionSettingsMutex.RUnlock()
argsForCall := fake.setRegionSettingsArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) SetResponseSink(arg1 routing.MessageSink) {
fake.setResponseSinkMutex.Lock()
fake.setResponseSinkArgsForCall = append(fake.setResponseSinkArgsForCall, struct {
@@ -6343,6 +6380,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.setNameMutex.RUnlock()
fake.setPermissionMutex.RLock()
defer fake.setPermissionMutex.RUnlock()
fake.setRegionSettingsMutex.RLock()
defer fake.setRegionSettingsMutex.RUnlock()
fake.setResponseSinkMutex.RLock()
defer fake.setResponseSinkMutex.RUnlock()
fake.setSignalSourceValidMutex.RLock()
+29 -8
View File
@@ -276,12 +276,23 @@ func (r *RoomManager) StartSession(
"participant", pi.Identity,
"reason", pi.ReconnectReason,
)
var leave *livekit.LeaveRequest
pv := types.ProtocolVersion(pi.Client.Protocol)
if pv.SupportsRegionsInLeaveRequest() {
leave = &livekit.LeaveRequest{
Reason: livekit.DisconnectReason_STATE_MISMATCH,
Action: livekit.LeaveRequest_RECONNECT,
}
} else {
leave = &livekit.LeaveRequest{
CanReconnect: true,
Reason: livekit.DisconnectReason_STATE_MISMATCH,
}
}
_ = responseSink.WriteMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Leave{
Leave: &livekit.LeaveRequest{
CanReconnect: true,
Reason: livekit.DisconnectReason_STATE_MISMATCH,
},
Leave: leave,
},
})
return errors.New("could not restart closed participant")
@@ -321,12 +332,22 @@ func (r *RoomManager) StartSession(
} else if pi.Reconnect {
// send leave request if participant is trying to reconnect without keep subscribe state
// but missing from the room
var leave *livekit.LeaveRequest
pv := types.ProtocolVersion(pi.Client.Protocol)
if pv.SupportsRegionsInLeaveRequest() {
leave = &livekit.LeaveRequest{
Reason: livekit.DisconnectReason_STATE_MISMATCH,
Action: livekit.LeaveRequest_RECONNECT,
}
} else {
leave = &livekit.LeaveRequest{
CanReconnect: true,
Reason: livekit.DisconnectReason_STATE_MISMATCH,
}
}
_ = responseSink.WriteMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Leave{
Leave: &livekit.LeaveRequest{
CanReconnect: true,
Reason: livekit.DisconnectReason_STATE_MISMATCH,
},
Leave: leave,
},
})
return errors.New("could not restart participant")
-312
View File
@@ -22,18 +22,6 @@ type FakeSIPStore struct {
deleteSIPDispatchRuleReturnsOnCall map[int]struct {
result1 error
}
DeleteSIPParticipantStub func(context.Context, *livekit.SIPParticipantInfo) error
deleteSIPParticipantMutex sync.RWMutex
deleteSIPParticipantArgsForCall []struct {
arg1 context.Context
arg2 *livekit.SIPParticipantInfo
}
deleteSIPParticipantReturns struct {
result1 error
}
deleteSIPParticipantReturnsOnCall map[int]struct {
result1 error
}
DeleteSIPTrunkStub func(context.Context, *livekit.SIPTrunkInfo) error
deleteSIPTrunkMutex sync.RWMutex
deleteSIPTrunkArgsForCall []struct {
@@ -59,19 +47,6 @@ type FakeSIPStore struct {
result1 []*livekit.SIPDispatchRuleInfo
result2 error
}
ListSIPParticipantStub func(context.Context) ([]*livekit.SIPParticipantInfo, error)
listSIPParticipantMutex sync.RWMutex
listSIPParticipantArgsForCall []struct {
arg1 context.Context
}
listSIPParticipantReturns struct {
result1 []*livekit.SIPParticipantInfo
result2 error
}
listSIPParticipantReturnsOnCall map[int]struct {
result1 []*livekit.SIPParticipantInfo
result2 error
}
ListSIPTrunkStub func(context.Context) ([]*livekit.SIPTrunkInfo, error)
listSIPTrunkMutex sync.RWMutex
listSIPTrunkArgsForCall []struct {
@@ -99,20 +74,6 @@ type FakeSIPStore struct {
result1 *livekit.SIPDispatchRuleInfo
result2 error
}
LoadSIPParticipantStub func(context.Context, string) (*livekit.SIPParticipantInfo, error)
loadSIPParticipantMutex sync.RWMutex
loadSIPParticipantArgsForCall []struct {
arg1 context.Context
arg2 string
}
loadSIPParticipantReturns struct {
result1 *livekit.SIPParticipantInfo
result2 error
}
loadSIPParticipantReturnsOnCall map[int]struct {
result1 *livekit.SIPParticipantInfo
result2 error
}
LoadSIPTrunkStub func(context.Context, string) (*livekit.SIPTrunkInfo, error)
loadSIPTrunkMutex sync.RWMutex
loadSIPTrunkArgsForCall []struct {
@@ -139,18 +100,6 @@ type FakeSIPStore struct {
storeSIPDispatchRuleReturnsOnCall map[int]struct {
result1 error
}
StoreSIPParticipantStub func(context.Context, *livekit.SIPParticipantInfo) error
storeSIPParticipantMutex sync.RWMutex
storeSIPParticipantArgsForCall []struct {
arg1 context.Context
arg2 *livekit.SIPParticipantInfo
}
storeSIPParticipantReturns struct {
result1 error
}
storeSIPParticipantReturnsOnCall map[int]struct {
result1 error
}
StoreSIPTrunkStub func(context.Context, *livekit.SIPTrunkInfo) error
storeSIPTrunkMutex sync.RWMutex
storeSIPTrunkArgsForCall []struct {
@@ -229,68 +178,6 @@ func (fake *FakeSIPStore) DeleteSIPDispatchRuleReturnsOnCall(i int, result1 erro
}{result1}
}
func (fake *FakeSIPStore) DeleteSIPParticipant(arg1 context.Context, arg2 *livekit.SIPParticipantInfo) error {
fake.deleteSIPParticipantMutex.Lock()
ret, specificReturn := fake.deleteSIPParticipantReturnsOnCall[len(fake.deleteSIPParticipantArgsForCall)]
fake.deleteSIPParticipantArgsForCall = append(fake.deleteSIPParticipantArgsForCall, struct {
arg1 context.Context
arg2 *livekit.SIPParticipantInfo
}{arg1, arg2})
stub := fake.DeleteSIPParticipantStub
fakeReturns := fake.deleteSIPParticipantReturns
fake.recordInvocation("DeleteSIPParticipant", []interface{}{arg1, arg2})
fake.deleteSIPParticipantMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeSIPStore) DeleteSIPParticipantCallCount() int {
fake.deleteSIPParticipantMutex.RLock()
defer fake.deleteSIPParticipantMutex.RUnlock()
return len(fake.deleteSIPParticipantArgsForCall)
}
func (fake *FakeSIPStore) DeleteSIPParticipantCalls(stub func(context.Context, *livekit.SIPParticipantInfo) error) {
fake.deleteSIPParticipantMutex.Lock()
defer fake.deleteSIPParticipantMutex.Unlock()
fake.DeleteSIPParticipantStub = stub
}
func (fake *FakeSIPStore) DeleteSIPParticipantArgsForCall(i int) (context.Context, *livekit.SIPParticipantInfo) {
fake.deleteSIPParticipantMutex.RLock()
defer fake.deleteSIPParticipantMutex.RUnlock()
argsForCall := fake.deleteSIPParticipantArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeSIPStore) DeleteSIPParticipantReturns(result1 error) {
fake.deleteSIPParticipantMutex.Lock()
defer fake.deleteSIPParticipantMutex.Unlock()
fake.DeleteSIPParticipantStub = nil
fake.deleteSIPParticipantReturns = struct {
result1 error
}{result1}
}
func (fake *FakeSIPStore) DeleteSIPParticipantReturnsOnCall(i int, result1 error) {
fake.deleteSIPParticipantMutex.Lock()
defer fake.deleteSIPParticipantMutex.Unlock()
fake.DeleteSIPParticipantStub = nil
if fake.deleteSIPParticipantReturnsOnCall == nil {
fake.deleteSIPParticipantReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.deleteSIPParticipantReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeSIPStore) DeleteSIPTrunk(arg1 context.Context, arg2 *livekit.SIPTrunkInfo) error {
fake.deleteSIPTrunkMutex.Lock()
ret, specificReturn := fake.deleteSIPTrunkReturnsOnCall[len(fake.deleteSIPTrunkArgsForCall)]
@@ -417,70 +304,6 @@ func (fake *FakeSIPStore) ListSIPDispatchRuleReturnsOnCall(i int, result1 []*liv
}{result1, result2}
}
func (fake *FakeSIPStore) ListSIPParticipant(arg1 context.Context) ([]*livekit.SIPParticipantInfo, error) {
fake.listSIPParticipantMutex.Lock()
ret, specificReturn := fake.listSIPParticipantReturnsOnCall[len(fake.listSIPParticipantArgsForCall)]
fake.listSIPParticipantArgsForCall = append(fake.listSIPParticipantArgsForCall, struct {
arg1 context.Context
}{arg1})
stub := fake.ListSIPParticipantStub
fakeReturns := fake.listSIPParticipantReturns
fake.recordInvocation("ListSIPParticipant", []interface{}{arg1})
fake.listSIPParticipantMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeSIPStore) ListSIPParticipantCallCount() int {
fake.listSIPParticipantMutex.RLock()
defer fake.listSIPParticipantMutex.RUnlock()
return len(fake.listSIPParticipantArgsForCall)
}
func (fake *FakeSIPStore) ListSIPParticipantCalls(stub func(context.Context) ([]*livekit.SIPParticipantInfo, error)) {
fake.listSIPParticipantMutex.Lock()
defer fake.listSIPParticipantMutex.Unlock()
fake.ListSIPParticipantStub = stub
}
func (fake *FakeSIPStore) ListSIPParticipantArgsForCall(i int) context.Context {
fake.listSIPParticipantMutex.RLock()
defer fake.listSIPParticipantMutex.RUnlock()
argsForCall := fake.listSIPParticipantArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeSIPStore) ListSIPParticipantReturns(result1 []*livekit.SIPParticipantInfo, result2 error) {
fake.listSIPParticipantMutex.Lock()
defer fake.listSIPParticipantMutex.Unlock()
fake.ListSIPParticipantStub = nil
fake.listSIPParticipantReturns = struct {
result1 []*livekit.SIPParticipantInfo
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) ListSIPParticipantReturnsOnCall(i int, result1 []*livekit.SIPParticipantInfo, result2 error) {
fake.listSIPParticipantMutex.Lock()
defer fake.listSIPParticipantMutex.Unlock()
fake.ListSIPParticipantStub = nil
if fake.listSIPParticipantReturnsOnCall == nil {
fake.listSIPParticipantReturnsOnCall = make(map[int]struct {
result1 []*livekit.SIPParticipantInfo
result2 error
})
}
fake.listSIPParticipantReturnsOnCall[i] = struct {
result1 []*livekit.SIPParticipantInfo
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) ListSIPTrunk(arg1 context.Context) ([]*livekit.SIPTrunkInfo, error) {
fake.listSIPTrunkMutex.Lock()
ret, specificReturn := fake.listSIPTrunkReturnsOnCall[len(fake.listSIPTrunkArgsForCall)]
@@ -610,71 +433,6 @@ func (fake *FakeSIPStore) LoadSIPDispatchRuleReturnsOnCall(i int, result1 *livek
}{result1, result2}
}
func (fake *FakeSIPStore) LoadSIPParticipant(arg1 context.Context, arg2 string) (*livekit.SIPParticipantInfo, error) {
fake.loadSIPParticipantMutex.Lock()
ret, specificReturn := fake.loadSIPParticipantReturnsOnCall[len(fake.loadSIPParticipantArgsForCall)]
fake.loadSIPParticipantArgsForCall = append(fake.loadSIPParticipantArgsForCall, struct {
arg1 context.Context
arg2 string
}{arg1, arg2})
stub := fake.LoadSIPParticipantStub
fakeReturns := fake.loadSIPParticipantReturns
fake.recordInvocation("LoadSIPParticipant", []interface{}{arg1, arg2})
fake.loadSIPParticipantMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeSIPStore) LoadSIPParticipantCallCount() int {
fake.loadSIPParticipantMutex.RLock()
defer fake.loadSIPParticipantMutex.RUnlock()
return len(fake.loadSIPParticipantArgsForCall)
}
func (fake *FakeSIPStore) LoadSIPParticipantCalls(stub func(context.Context, string) (*livekit.SIPParticipantInfo, error)) {
fake.loadSIPParticipantMutex.Lock()
defer fake.loadSIPParticipantMutex.Unlock()
fake.LoadSIPParticipantStub = stub
}
func (fake *FakeSIPStore) LoadSIPParticipantArgsForCall(i int) (context.Context, string) {
fake.loadSIPParticipantMutex.RLock()
defer fake.loadSIPParticipantMutex.RUnlock()
argsForCall := fake.loadSIPParticipantArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeSIPStore) LoadSIPParticipantReturns(result1 *livekit.SIPParticipantInfo, result2 error) {
fake.loadSIPParticipantMutex.Lock()
defer fake.loadSIPParticipantMutex.Unlock()
fake.LoadSIPParticipantStub = nil
fake.loadSIPParticipantReturns = struct {
result1 *livekit.SIPParticipantInfo
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) LoadSIPParticipantReturnsOnCall(i int, result1 *livekit.SIPParticipantInfo, result2 error) {
fake.loadSIPParticipantMutex.Lock()
defer fake.loadSIPParticipantMutex.Unlock()
fake.LoadSIPParticipantStub = nil
if fake.loadSIPParticipantReturnsOnCall == nil {
fake.loadSIPParticipantReturnsOnCall = make(map[int]struct {
result1 *livekit.SIPParticipantInfo
result2 error
})
}
fake.loadSIPParticipantReturnsOnCall[i] = struct {
result1 *livekit.SIPParticipantInfo
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) LoadSIPTrunk(arg1 context.Context, arg2 string) (*livekit.SIPTrunkInfo, error) {
fake.loadSIPTrunkMutex.Lock()
ret, specificReturn := fake.loadSIPTrunkReturnsOnCall[len(fake.loadSIPTrunkArgsForCall)]
@@ -802,68 +560,6 @@ func (fake *FakeSIPStore) StoreSIPDispatchRuleReturnsOnCall(i int, result1 error
}{result1}
}
func (fake *FakeSIPStore) StoreSIPParticipant(arg1 context.Context, arg2 *livekit.SIPParticipantInfo) error {
fake.storeSIPParticipantMutex.Lock()
ret, specificReturn := fake.storeSIPParticipantReturnsOnCall[len(fake.storeSIPParticipantArgsForCall)]
fake.storeSIPParticipantArgsForCall = append(fake.storeSIPParticipantArgsForCall, struct {
arg1 context.Context
arg2 *livekit.SIPParticipantInfo
}{arg1, arg2})
stub := fake.StoreSIPParticipantStub
fakeReturns := fake.storeSIPParticipantReturns
fake.recordInvocation("StoreSIPParticipant", []interface{}{arg1, arg2})
fake.storeSIPParticipantMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeSIPStore) StoreSIPParticipantCallCount() int {
fake.storeSIPParticipantMutex.RLock()
defer fake.storeSIPParticipantMutex.RUnlock()
return len(fake.storeSIPParticipantArgsForCall)
}
func (fake *FakeSIPStore) StoreSIPParticipantCalls(stub func(context.Context, *livekit.SIPParticipantInfo) error) {
fake.storeSIPParticipantMutex.Lock()
defer fake.storeSIPParticipantMutex.Unlock()
fake.StoreSIPParticipantStub = stub
}
func (fake *FakeSIPStore) StoreSIPParticipantArgsForCall(i int) (context.Context, *livekit.SIPParticipantInfo) {
fake.storeSIPParticipantMutex.RLock()
defer fake.storeSIPParticipantMutex.RUnlock()
argsForCall := fake.storeSIPParticipantArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeSIPStore) StoreSIPParticipantReturns(result1 error) {
fake.storeSIPParticipantMutex.Lock()
defer fake.storeSIPParticipantMutex.Unlock()
fake.StoreSIPParticipantStub = nil
fake.storeSIPParticipantReturns = struct {
result1 error
}{result1}
}
func (fake *FakeSIPStore) StoreSIPParticipantReturnsOnCall(i int, result1 error) {
fake.storeSIPParticipantMutex.Lock()
defer fake.storeSIPParticipantMutex.Unlock()
fake.StoreSIPParticipantStub = nil
if fake.storeSIPParticipantReturnsOnCall == nil {
fake.storeSIPParticipantReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.storeSIPParticipantReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeSIPStore) StoreSIPTrunk(arg1 context.Context, arg2 *livekit.SIPTrunkInfo) error {
fake.storeSIPTrunkMutex.Lock()
ret, specificReturn := fake.storeSIPTrunkReturnsOnCall[len(fake.storeSIPTrunkArgsForCall)]
@@ -931,26 +627,18 @@ func (fake *FakeSIPStore) Invocations() map[string][][]interface{} {
defer fake.invocationsMutex.RUnlock()
fake.deleteSIPDispatchRuleMutex.RLock()
defer fake.deleteSIPDispatchRuleMutex.RUnlock()
fake.deleteSIPParticipantMutex.RLock()
defer fake.deleteSIPParticipantMutex.RUnlock()
fake.deleteSIPTrunkMutex.RLock()
defer fake.deleteSIPTrunkMutex.RUnlock()
fake.listSIPDispatchRuleMutex.RLock()
defer fake.listSIPDispatchRuleMutex.RUnlock()
fake.listSIPParticipantMutex.RLock()
defer fake.listSIPParticipantMutex.RUnlock()
fake.listSIPTrunkMutex.RLock()
defer fake.listSIPTrunkMutex.RUnlock()
fake.loadSIPDispatchRuleMutex.RLock()
defer fake.loadSIPDispatchRuleMutex.RUnlock()
fake.loadSIPParticipantMutex.RLock()
defer fake.loadSIPParticipantMutex.RUnlock()
fake.loadSIPTrunkMutex.RLock()
defer fake.loadSIPTrunkMutex.RUnlock()
fake.storeSIPDispatchRuleMutex.RLock()
defer fake.storeSIPDispatchRuleMutex.RUnlock()
fake.storeSIPParticipantMutex.RLock()
defer fake.storeSIPParticipantMutex.RUnlock()
fake.storeSIPTrunkMutex.RLock()
defer fake.storeSIPTrunkMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
+5 -2
View File
@@ -114,7 +114,7 @@ type Options struct {
}
func NewWebSocketConn(host, token string, opts *Options) (*websocket.Conn, error) {
u, err := url.Parse(host + "/rtc?protocol=7")
u, err := url.Parse(host + fmt.Sprintf("/rtc?protocol=%d", types.CurrentProtocol))
if err != nil {
return nil, err
}
@@ -493,7 +493,10 @@ func (c *RTCClient) Stop() {
logger.Infow("stopping client", "ID", c.ID())
_ = c.SendRequest(&livekit.SignalRequest{
Message: &livekit.SignalRequest_Leave{
Leave: &livekit.LeaveRequest{},
Leave: &livekit.LeaveRequest{
Reason: livekit.DisconnectReason_CLIENT_INITIATED,
Action: livekit.LeaveRequest_DISCONNECT,
},
},
})
c.publisherFullyEstablished.Store(false)