mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 15:35:41 +00:00
feat: server rpc apis (#3904)
* feat: server rpc apis * fix: cleanup * fix: move rpc impl to participant * cleanup * cleanup, psrpc errors * remove TODO comment * update protocol, handle participant disconnect case * add ephemeral participant identity to rpc data packet * fix: panic * chore(deps): bump proto
This commit is contained in:
committed by
GitHub
parent
80b1166292
commit
990c5fafbb
12
go.mod
12
go.mod
@@ -23,7 +23,7 @@ require (
|
||||
github.com/jxskiss/base62 v1.1.0
|
||||
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
|
||||
github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397
|
||||
github.com/livekit/protocol v1.41.1-0.20250919162624-3476f457ed91
|
||||
github.com/livekit/protocol v1.42.1-0.20250924073819-3e4e117b0437
|
||||
github.com/livekit/psrpc v0.7.0
|
||||
github.com/mackerelio/go-osstat v0.2.6
|
||||
github.com/magefile/mage v1.15.0
|
||||
@@ -74,8 +74,8 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250717185734-6c6e0d3c608e.1 // indirect
|
||||
buf.build/go/protovalidate v0.14.0 // indirect
|
||||
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250912141014-52f32327d4b0.1 // indirect
|
||||
buf.build/go/protovalidate v1.0.0 // indirect
|
||||
buf.build/go/protoyaml v0.6.0 // indirect
|
||||
cel.dev/expr v0.24.0 // indirect
|
||||
dario.cat/mergo v1.0.0 // indirect
|
||||
@@ -115,7 +115,7 @@ require (
|
||||
github.com/moby/docker-image-spec v1.3.1 // indirect
|
||||
github.com/moby/term v0.5.0 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/nats-io/nats.go v1.45.0 // indirect
|
||||
github.com/nats-io/nats.go v1.46.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.11 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||
@@ -145,8 +145,8 @@ require (
|
||||
golang.org/x/sys v0.36.0 // indirect
|
||||
golang.org/x/text v0.29.0 // indirect
|
||||
golang.org/x/tools v0.37.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250922171735-9219d122eba9 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9 // indirect
|
||||
google.golang.org/grpc v1.75.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
)
|
||||
|
||||
24
go.sum
24
go.sum
@@ -1,7 +1,7 @@
|
||||
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250717185734-6c6e0d3c608e.1 h1:u98oQG8CHYBrOWrYdqbyNpKz4Pw02ssv03DsTInnXn8=
|
||||
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250717185734-6c6e0d3c608e.1/go.mod h1:aY3zbkNan5F+cGm9lITDP6oxJIwu0dn9KjJuJjWaHkg=
|
||||
buf.build/go/protovalidate v0.14.0 h1:kr/rC/no+DtRyYX+8KXLDxNnI1rINz0imk5K44ZpZ3A=
|
||||
buf.build/go/protovalidate v0.14.0/go.mod h1:+F/oISho9MO7gJQNYC2VWLzcO1fTPmaTA08SDYJZncA=
|
||||
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250912141014-52f32327d4b0.1 h1:DQLS/rRxLHuugVzjJU5AvOwD57pdFl9he/0O7e5P294=
|
||||
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250912141014-52f32327d4b0.1/go.mod h1:aY3zbkNan5F+cGm9lITDP6oxJIwu0dn9KjJuJjWaHkg=
|
||||
buf.build/go/protovalidate v1.0.0 h1:IAG1etULddAy93fiBsFVhpj7es5zL53AfB/79CVGtyY=
|
||||
buf.build/go/protovalidate v1.0.0/go.mod h1:KQmEUrcQuC99hAw+juzOEAmILScQiKBP1Oc36vvCLW8=
|
||||
buf.build/go/protoyaml v0.6.0 h1:Nzz1lvcXF8YgNZXk+voPPwdU8FjDPTUV4ndNTXN0n2w=
|
||||
buf.build/go/protoyaml v0.6.0/go.mod h1:RgUOsBu/GYKLDSIRgQXniXbNgFlGEZnQpRAUdLAFV2Q=
|
||||
cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=
|
||||
@@ -171,8 +171,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT
|
||||
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397 h1:Z7j2mY+bvG05UC80MpnJkitlJju8sSDWsr0Bb4dPceo=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
|
||||
github.com/livekit/protocol v1.41.1-0.20250919162624-3476f457ed91 h1:XoxTN5GkqZCrUz02bTxQQhB34yDgSWnmVv12oQJgpuo=
|
||||
github.com/livekit/protocol v1.41.1-0.20250919162624-3476f457ed91/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A=
|
||||
github.com/livekit/protocol v1.42.1-0.20250924073819-3e4e117b0437 h1:2bbAPB3XhkeZqNrnaC/WUvas1wbWBXHE7Ab+1IeRYXM=
|
||||
github.com/livekit/protocol v1.42.1-0.20250924073819-3e4e117b0437/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A=
|
||||
github.com/livekit/psrpc v0.7.0 h1:rtfqfjYN06WJYloE/S0nmkJ/Y04x4pxLQLe8kQ4FVHU=
|
||||
github.com/livekit/psrpc v0.7.0/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk=
|
||||
github.com/mackerelio/go-osstat v0.2.6 h1:gs4U8BZeS1tjrL08tt5VUliVvSWP26Ai2Ob8Lr7f2i0=
|
||||
@@ -216,8 +216,8 @@ github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
|
||||
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||
github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA=
|
||||
github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
|
||||
github.com/nats-io/nats.go v1.46.0 h1:iUcX+MLT0HHXskGkz+Sg20sXrPtJLsOojMDTDzOHSb8=
|
||||
github.com/nats-io/nats.go v1.46.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
|
||||
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
|
||||
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
@@ -479,10 +479,10 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
|
||||
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 h1:d8Nakh1G+ur7+P3GcMjpRDEkoLUcLW2iU92XVqR+XMQ=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090/go.mod h1:U8EXRNSd8sUYyDfs/It7KVWodQr+Hf9xtxyxWudSwEw=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 h1:/OQuEa4YWtDt7uQWHd3q3sUMb+QOLQUg1xa8CEsRv5w=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250922171735-9219d122eba9 h1:jm6v6kMRpTYKxBRrDkYAitNJegUeO1Mf3Kt80obv0gg=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250922171735-9219d122eba9/go.mod h1:LmwNphe5Afor5V3R5BppOULHOnt2mCIf+NxMd4XiygE=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9 h1:V1jCN2HBa8sySkR5vLcCSqJSTMv093Rw9EJefhQGP7M=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9/go.mod h1:HSkG/KdJWusxU1F6CNrwNDjBMgisKxGnc5dAZfT0mjQ=
|
||||
google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI=
|
||||
google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
|
||||
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/frostbyte73/core"
|
||||
"github.com/google/uuid"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/sdp/v3"
|
||||
@@ -46,6 +47,7 @@ import (
|
||||
"github.com/livekit/protocol/utils"
|
||||
"github.com/livekit/protocol/utils/guid"
|
||||
"github.com/livekit/protocol/utils/pointer"
|
||||
"github.com/livekit/psrpc"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/metric"
|
||||
@@ -328,6 +330,10 @@ type ParticipantImpl struct {
|
||||
// loggers for publisher and subscriber
|
||||
pubLogger logger.Logger
|
||||
subLogger logger.Logger
|
||||
|
||||
rpcLock sync.Mutex
|
||||
rpcPendingAcks map[string]*utils.DataChannelRpcPendingAckHandler
|
||||
rpcPendingResponses map[string]*utils.DataChannelRpcPendingResponseHandler
|
||||
}
|
||||
|
||||
func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
|
||||
@@ -360,7 +366,9 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
|
||||
joiningMessageFirstSeqs: make(map[livekit.ParticipantID]uint32),
|
||||
joiningMessageLastWrittenSeqs: make(map[livekit.ParticipantID]uint32),
|
||||
},
|
||||
onClose: make(map[string]func(types.LocalParticipant)),
|
||||
rpcPendingAcks: make(map[string]*utils.DataChannelRpcPendingAckHandler),
|
||||
rpcPendingResponses: make(map[string]*utils.DataChannelRpcPendingResponseHandler),
|
||||
onClose: make(map[string]func(types.LocalParticipant)),
|
||||
}
|
||||
p.setupSignalling()
|
||||
|
||||
@@ -1506,6 +1514,14 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
|
||||
|
||||
p.UpTrackManager.Close(isExpectedToResume)
|
||||
|
||||
p.rpcLock.Lock()
|
||||
clear(p.rpcPendingAcks)
|
||||
for _, handler := range p.rpcPendingResponses {
|
||||
handler.Resolve("", utils.DataChannelRpcErrorFromBuiltInCodes(utils.DataChannelRpcRecipientDisconnected, ""))
|
||||
}
|
||||
p.rpcPendingResponses = make(map[string]*utils.DataChannelRpcPendingResponseHandler)
|
||||
p.rpcLock.Unlock()
|
||||
|
||||
p.updateState(livekit.ParticipantInfo_DISCONNECTED)
|
||||
close(p.disconnected)
|
||||
|
||||
@@ -2492,10 +2508,24 @@ func (p *ParticipantImpl) handleReceivedDataMessage(kind livekit.DataPacket_Kind
|
||||
if payload.RpcResponse == nil {
|
||||
return
|
||||
}
|
||||
|
||||
rpcResponse := payload.RpcResponse
|
||||
switch res := rpcResponse.Value.(type) {
|
||||
case *livekit.RpcResponse_Payload:
|
||||
shouldForwardData = !p.handleIncomingRpcResponse(payload.RpcResponse.GetRequestId(), res.Payload, nil)
|
||||
case *livekit.RpcResponse_Error:
|
||||
shouldForwardData = !p.handleIncomingRpcResponse(payload.RpcResponse.GetRequestId(), "", &utils.DataChannelRpcError{
|
||||
Code: utils.DataChannelRpcErrorCode(res.Error.GetCode()),
|
||||
Message: res.Error.GetMessage(),
|
||||
Data: res.Error.GetData(),
|
||||
})
|
||||
}
|
||||
case *livekit.DataPacket_RpcAck:
|
||||
if payload.RpcAck == nil {
|
||||
return
|
||||
}
|
||||
|
||||
shouldForwardData = !p.handleIncomingRpcAck(payload.RpcAck.GetRequestId())
|
||||
case *livekit.DataPacket_StreamHeader:
|
||||
if payload.StreamHeader == nil {
|
||||
return
|
||||
@@ -4170,3 +4200,125 @@ func (p *ParticipantImpl) AddTransceiverFromTrackLocal(
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ParticipantImpl) handleIncomingRpcAck(requestId string) bool {
|
||||
p.rpcLock.Lock()
|
||||
defer p.rpcLock.Unlock()
|
||||
|
||||
handler, ok := p.rpcPendingAcks[requestId]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
handler.Resolve()
|
||||
delete(p.rpcPendingAcks, requestId)
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *ParticipantImpl) handleIncomingRpcResponse(requestId string, payload string, err *utils.DataChannelRpcError) bool {
|
||||
p.rpcLock.Lock()
|
||||
defer p.rpcLock.Unlock()
|
||||
|
||||
handler, ok := p.rpcPendingResponses[requestId]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
handler.Resolve(payload, err)
|
||||
delete(p.rpcPendingResponses, requestId)
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *ParticipantImpl) PerformRpc(req *livekit.PerformRpcRequest, resultCh chan string, errorCh chan error) {
|
||||
responseTimeout := req.GetResponseTimeoutMs()
|
||||
if responseTimeout <= 0 {
|
||||
responseTimeout = uint32(utils.DataChannelRpcDefaultResponseTimeout.Milliseconds())
|
||||
}
|
||||
|
||||
go func() {
|
||||
if len([]byte(req.GetPayload())) > utils.DataChannelRpcMaxPayloadBytes {
|
||||
errorCh <- utils.DataChannelRpcErrorFromBuiltInCodes(utils.DataChannelRpcRequestPayloadTooLarge, "").PsrpcError()
|
||||
return
|
||||
}
|
||||
|
||||
id := uuid.NewString()
|
||||
|
||||
responseTimer := time.AfterFunc(time.Duration(responseTimeout)*time.Millisecond, func() {
|
||||
p.rpcLock.Lock()
|
||||
delete(p.rpcPendingResponses, id)
|
||||
p.rpcLock.Unlock()
|
||||
|
||||
select {
|
||||
case errorCh <- utils.DataChannelRpcErrorFromBuiltInCodes(utils.DataChannelRpcResponseTimeout, "").PsrpcError():
|
||||
default:
|
||||
}
|
||||
})
|
||||
ackTimer := time.AfterFunc(utils.DataChannelRpcMaxRoundTripLatency, func() {
|
||||
p.rpcLock.Lock()
|
||||
delete(p.rpcPendingAcks, id)
|
||||
delete(p.rpcPendingResponses, id)
|
||||
p.rpcLock.Unlock()
|
||||
responseTimer.Stop()
|
||||
|
||||
select {
|
||||
case errorCh <- utils.DataChannelRpcErrorFromBuiltInCodes(utils.DataChannelRpcConnectionTimeout, "").PsrpcError():
|
||||
default:
|
||||
}
|
||||
})
|
||||
|
||||
rpcRequest := &livekit.DataPacket{
|
||||
Kind: livekit.DataPacket_RELIABLE,
|
||||
ParticipantIdentity: id,
|
||||
Value: &livekit.DataPacket_RpcRequest{
|
||||
RpcRequest: &livekit.RpcRequest{
|
||||
Id: id,
|
||||
Method: req.GetMethod(),
|
||||
Payload: req.GetPayload(),
|
||||
ResponseTimeoutMs: responseTimeout - p.lastRTT,
|
||||
Version: 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
data, err := proto.Marshal(rpcRequest)
|
||||
if err != nil {
|
||||
ackTimer.Stop()
|
||||
responseTimer.Stop()
|
||||
errorCh <- psrpc.NewError(psrpc.Internal, err)
|
||||
return
|
||||
}
|
||||
|
||||
// using RPC ID as the unique ID for server to identify the response
|
||||
err = p.SendDataMessage(livekit.DataPacket_RELIABLE, data, livekit.ParticipantID(id), 0)
|
||||
if err != nil {
|
||||
ackTimer.Stop()
|
||||
responseTimer.Stop()
|
||||
errorCh <- psrpc.NewError(psrpc.Internal, err)
|
||||
return
|
||||
}
|
||||
|
||||
p.rpcLock.Lock()
|
||||
p.rpcPendingAcks[id] = &utils.DataChannelRpcPendingAckHandler{
|
||||
Resolve: func() {
|
||||
ackTimer.Stop()
|
||||
},
|
||||
ParticipantIdentity: req.GetDestinationIdentity(),
|
||||
}
|
||||
p.rpcPendingResponses[id] = &utils.DataChannelRpcPendingResponseHandler{
|
||||
Resolve: func(payload string, error *utils.DataChannelRpcError) {
|
||||
responseTimer.Stop()
|
||||
if _, ok := p.rpcPendingAcks[id]; ok {
|
||||
p.rpcPendingAcks[id].Resolve()
|
||||
ackTimer.Stop()
|
||||
}
|
||||
|
||||
if error != nil {
|
||||
errorCh <- error.PsrpcError()
|
||||
} else {
|
||||
resultCh <- payload
|
||||
}
|
||||
},
|
||||
ParticipantIdentity: req.GetDestinationIdentity(),
|
||||
}
|
||||
p.rpcLock.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -541,6 +541,8 @@ type LocalParticipant interface {
|
||||
HandleLeaveRequest(reason ParticipantCloseReason)
|
||||
|
||||
HandleSignalMessage(msg proto.Message) error
|
||||
|
||||
PerformRpc(req *livekit.PerformRpcRequest, resultCh chan string, errorCh chan error)
|
||||
}
|
||||
|
||||
// Room is a container of participants, and can provide room-level actions
|
||||
|
||||
@@ -940,6 +940,13 @@ type FakeLocalParticipant struct {
|
||||
onUpdateSubscriptionsArgsForCall []struct {
|
||||
arg1 func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool)
|
||||
}
|
||||
PerformRpcStub func(*livekit.PerformRpcRequest, chan string, chan error)
|
||||
performRpcMutex sync.RWMutex
|
||||
performRpcArgsForCall []struct {
|
||||
arg1 *livekit.PerformRpcRequest
|
||||
arg2 chan string
|
||||
arg3 chan error
|
||||
}
|
||||
ProtocolVersionStub func() types.ProtocolVersion
|
||||
protocolVersionMutex sync.RWMutex
|
||||
protocolVersionArgsForCall []struct {
|
||||
@@ -6413,6 +6420,40 @@ func (fake *FakeLocalParticipant) OnUpdateSubscriptionsArgsForCall(i int) func(t
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) PerformRpc(arg1 *livekit.PerformRpcRequest, arg2 chan string, arg3 chan error) {
|
||||
fake.performRpcMutex.Lock()
|
||||
fake.performRpcArgsForCall = append(fake.performRpcArgsForCall, struct {
|
||||
arg1 *livekit.PerformRpcRequest
|
||||
arg2 chan string
|
||||
arg3 chan error
|
||||
}{arg1, arg2, arg3})
|
||||
stub := fake.PerformRpcStub
|
||||
fake.recordInvocation("PerformRpc", []interface{}{arg1, arg2, arg3})
|
||||
fake.performRpcMutex.Unlock()
|
||||
if stub != nil {
|
||||
fake.PerformRpcStub(arg1, arg2, arg3)
|
||||
}
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) PerformRpcCallCount() int {
|
||||
fake.performRpcMutex.RLock()
|
||||
defer fake.performRpcMutex.RUnlock()
|
||||
return len(fake.performRpcArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) PerformRpcCalls(stub func(*livekit.PerformRpcRequest, chan string, chan error)) {
|
||||
fake.performRpcMutex.Lock()
|
||||
defer fake.performRpcMutex.Unlock()
|
||||
fake.PerformRpcStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) PerformRpcArgsForCall(i int) (*livekit.PerformRpcRequest, chan string, chan error) {
|
||||
fake.performRpcMutex.RLock()
|
||||
defer fake.performRpcMutex.RUnlock()
|
||||
argsForCall := fake.performRpcArgsForCall[i]
|
||||
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) ProtocolVersion() types.ProtocolVersion {
|
||||
fake.protocolVersionMutex.Lock()
|
||||
ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)]
|
||||
|
||||
@@ -48,4 +48,5 @@ var (
|
||||
ErrInvalidMessageType = psrpc.NewErrorf(psrpc.Internal, "invalid message type")
|
||||
ErrNoConnectRequest = psrpc.NewErrorf(psrpc.InvalidArgument, "no connect request")
|
||||
ErrNoConnectResponse = psrpc.NewErrorf(psrpc.InvalidArgument, "no connect response")
|
||||
ErrDestinationIdentityRequired = psrpc.NewErrorf(psrpc.InvalidArgument, "destination identity is required")
|
||||
)
|
||||
|
||||
@@ -825,6 +825,30 @@ func (r *RoomManager) MoveParticipant(ctx context.Context, req *livekit.MovePart
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (r *RoomManager) PerformRpc(ctx context.Context, req *livekit.PerformRpcRequest) (*livekit.PerformRpcResponse, error) {
|
||||
room := r.GetRoom(ctx, livekit.RoomName(req.GetRoom()))
|
||||
if room == nil {
|
||||
return nil, ErrRoomNotFound
|
||||
}
|
||||
|
||||
participant := room.GetParticipant(livekit.ParticipantIdentity(req.GetDestinationIdentity()))
|
||||
if participant == nil {
|
||||
return nil, ErrParticipantNotFound
|
||||
}
|
||||
|
||||
resultChan := make(chan string, 1)
|
||||
errorChan := make(chan error, 1)
|
||||
|
||||
participant.PerformRpc(req, resultChan, errorChan)
|
||||
|
||||
select {
|
||||
case result := <-resultChan:
|
||||
return &livekit.PerformRpcResponse{Payload: result}, nil
|
||||
case err := <-errorChan:
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RoomManager) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomRequest) (*livekit.DeleteRoomResponse, error) {
|
||||
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
|
||||
if room == nil {
|
||||
|
||||
@@ -355,6 +355,24 @@ func (s *RoomService) MoveParticipant(ctx context.Context, req *livekit.MovePart
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (s *RoomService) PerformRpc(ctx context.Context, req *livekit.PerformRpcRequest) (*livekit.PerformRpcResponse, error) {
|
||||
RecordRequest(ctx, req)
|
||||
|
||||
roomName := livekit.RoomName(req.Room)
|
||||
AppendLogFields(ctx, "room", roomName, "participant", req.DestinationIdentity)
|
||||
|
||||
if err := EnsureAdminPermission(ctx, roomName); err != nil {
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
if req.DestinationIdentity == "" {
|
||||
return nil, ErrDestinationIdentityRequired
|
||||
}
|
||||
|
||||
res, err := s.participantClient.PerformRpc(ctx, s.topicFormatter.ParticipantTopic(ctx, roomName, livekit.ParticipantIdentity(req.DestinationIdentity)), req)
|
||||
RecordResponse(ctx, res)
|
||||
return res, err
|
||||
}
|
||||
|
||||
func redactCreateRoomRequest(req *livekit.CreateRoomRequest) *livekit.CreateRoomRequest {
|
||||
if req.Egress == nil && req.Metadata == "" {
|
||||
// nothing to redact
|
||||
|
||||
Reference in New Issue
Block a user