diff --git a/go.mod b/go.mod index a8254f18e..2c0762c30 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397 - github.com/livekit/protocol v1.41.1-0.20250919162624-3476f457ed91 + github.com/livekit/protocol v1.42.1-0.20250924073819-3e4e117b0437 github.com/livekit/psrpc v0.7.0 github.com/mackerelio/go-osstat v0.2.6 github.com/magefile/mage v1.15.0 @@ -74,8 +74,8 @@ require ( ) require ( - buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250717185734-6c6e0d3c608e.1 // indirect - buf.build/go/protovalidate v0.14.0 // indirect + buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250912141014-52f32327d4b0.1 // indirect + buf.build/go/protovalidate v1.0.0 // indirect buf.build/go/protoyaml v0.6.0 // indirect cel.dev/expr v0.24.0 // indirect dario.cat/mergo v1.0.0 // indirect @@ -115,7 +115,7 @@ require ( github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/term v0.5.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/nats-io/nats.go v1.45.0 // indirect + github.com/nats-io/nats.go v1.46.0 // indirect github.com/nats-io/nkeys v0.4.11 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect @@ -145,8 +145,8 @@ require ( golang.org/x/sys v0.36.0 // indirect golang.org/x/text v0.29.0 // indirect golang.org/x/tools v0.37.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250922171735-9219d122eba9 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9 // indirect google.golang.org/grpc v1.75.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 2812c643e..d29e66e0f 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ -buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250717185734-6c6e0d3c608e.1 h1:u98oQG8CHYBrOWrYdqbyNpKz4Pw02ssv03DsTInnXn8= -buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250717185734-6c6e0d3c608e.1/go.mod h1:aY3zbkNan5F+cGm9lITDP6oxJIwu0dn9KjJuJjWaHkg= -buf.build/go/protovalidate v0.14.0 h1:kr/rC/no+DtRyYX+8KXLDxNnI1rINz0imk5K44ZpZ3A= -buf.build/go/protovalidate v0.14.0/go.mod h1:+F/oISho9MO7gJQNYC2VWLzcO1fTPmaTA08SDYJZncA= +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250912141014-52f32327d4b0.1 h1:DQLS/rRxLHuugVzjJU5AvOwD57pdFl9he/0O7e5P294= +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250912141014-52f32327d4b0.1/go.mod h1:aY3zbkNan5F+cGm9lITDP6oxJIwu0dn9KjJuJjWaHkg= +buf.build/go/protovalidate v1.0.0 h1:IAG1etULddAy93fiBsFVhpj7es5zL53AfB/79CVGtyY= +buf.build/go/protovalidate v1.0.0/go.mod h1:KQmEUrcQuC99hAw+juzOEAmILScQiKBP1Oc36vvCLW8= buf.build/go/protoyaml v0.6.0 h1:Nzz1lvcXF8YgNZXk+voPPwdU8FjDPTUV4ndNTXN0n2w= buf.build/go/protoyaml v0.6.0/go.mod h1:RgUOsBu/GYKLDSIRgQXniXbNgFlGEZnQpRAUdLAFV2Q= cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY= @@ -171,8 +171,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397 h1:Z7j2mY+bvG05UC80MpnJkitlJju8sSDWsr0Bb4dPceo= github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= -github.com/livekit/protocol v1.41.1-0.20250919162624-3476f457ed91 h1:XoxTN5GkqZCrUz02bTxQQhB34yDgSWnmVv12oQJgpuo= -github.com/livekit/protocol v1.41.1-0.20250919162624-3476f457ed91/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A= +github.com/livekit/protocol v1.42.1-0.20250924073819-3e4e117b0437 h1:2bbAPB3XhkeZqNrnaC/WUvas1wbWBXHE7Ab+1IeRYXM= +github.com/livekit/protocol v1.42.1-0.20250924073819-3e4e117b0437/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A= github.com/livekit/psrpc v0.7.0 h1:rtfqfjYN06WJYloE/S0nmkJ/Y04x4pxLQLe8kQ4FVHU= github.com/livekit/psrpc v0.7.0/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk= github.com/mackerelio/go-osstat v0.2.6 h1:gs4U8BZeS1tjrL08tt5VUliVvSWP26Ai2Ob8Lr7f2i0= @@ -216,8 +216,8 @@ github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA= -github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nats.go v1.46.0 h1:iUcX+MLT0HHXskGkz+Sg20sXrPtJLsOojMDTDzOHSb8= +github.com/nats-io/nats.go v1.46.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -479,10 +479,10 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= -google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 h1:d8Nakh1G+ur7+P3GcMjpRDEkoLUcLW2iU92XVqR+XMQ= -google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090/go.mod h1:U8EXRNSd8sUYyDfs/It7KVWodQr+Hf9xtxyxWudSwEw= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 h1:/OQuEa4YWtDt7uQWHd3q3sUMb+QOLQUg1xa8CEsRv5w= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og= +google.golang.org/genproto/googleapis/api v0.0.0-20250922171735-9219d122eba9 h1:jm6v6kMRpTYKxBRrDkYAitNJegUeO1Mf3Kt80obv0gg= +google.golang.org/genproto/googleapis/api v0.0.0-20250922171735-9219d122eba9/go.mod h1:LmwNphe5Afor5V3R5BppOULHOnt2mCIf+NxMd4XiygE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9 h1:V1jCN2HBa8sySkR5vLcCSqJSTMv093Rw9EJefhQGP7M= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9/go.mod h1:HSkG/KdJWusxU1F6CNrwNDjBMgisKxGnc5dAZfT0mjQ= google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI= google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 9ed6cb49a..a350055a7 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -25,6 +25,7 @@ import ( "time" "github.com/frostbyte73/core" + "github.com/google/uuid" lru "github.com/hashicorp/golang-lru/v2" "github.com/pion/rtcp" "github.com/pion/sdp/v3" @@ -46,6 +47,7 @@ import ( "github.com/livekit/protocol/utils" "github.com/livekit/protocol/utils/guid" "github.com/livekit/protocol/utils/pointer" + "github.com/livekit/psrpc" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/metric" @@ -328,6 +330,10 @@ type ParticipantImpl struct { // loggers for publisher and subscriber pubLogger logger.Logger subLogger logger.Logger + + rpcLock sync.Mutex + rpcPendingAcks map[string]*utils.DataChannelRpcPendingAckHandler + rpcPendingResponses map[string]*utils.DataChannelRpcPendingResponseHandler } func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { @@ -360,7 +366,9 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { joiningMessageFirstSeqs: make(map[livekit.ParticipantID]uint32), joiningMessageLastWrittenSeqs: make(map[livekit.ParticipantID]uint32), }, - onClose: make(map[string]func(types.LocalParticipant)), + rpcPendingAcks: make(map[string]*utils.DataChannelRpcPendingAckHandler), + rpcPendingResponses: make(map[string]*utils.DataChannelRpcPendingResponseHandler), + onClose: make(map[string]func(types.LocalParticipant)), } p.setupSignalling() @@ -1506,6 +1514,14 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea p.UpTrackManager.Close(isExpectedToResume) + p.rpcLock.Lock() + clear(p.rpcPendingAcks) + for _, handler := range p.rpcPendingResponses { + handler.Resolve("", utils.DataChannelRpcErrorFromBuiltInCodes(utils.DataChannelRpcRecipientDisconnected, "")) + } + p.rpcPendingResponses = make(map[string]*utils.DataChannelRpcPendingResponseHandler) + p.rpcLock.Unlock() + p.updateState(livekit.ParticipantInfo_DISCONNECTED) close(p.disconnected) @@ -2492,10 +2508,24 @@ func (p *ParticipantImpl) handleReceivedDataMessage(kind livekit.DataPacket_Kind if payload.RpcResponse == nil { return } + + rpcResponse := payload.RpcResponse + switch res := rpcResponse.Value.(type) { + case *livekit.RpcResponse_Payload: + shouldForwardData = !p.handleIncomingRpcResponse(payload.RpcResponse.GetRequestId(), res.Payload, nil) + case *livekit.RpcResponse_Error: + shouldForwardData = !p.handleIncomingRpcResponse(payload.RpcResponse.GetRequestId(), "", &utils.DataChannelRpcError{ + Code: utils.DataChannelRpcErrorCode(res.Error.GetCode()), + Message: res.Error.GetMessage(), + Data: res.Error.GetData(), + }) + } case *livekit.DataPacket_RpcAck: if payload.RpcAck == nil { return } + + shouldForwardData = !p.handleIncomingRpcAck(payload.RpcAck.GetRequestId()) case *livekit.DataPacket_StreamHeader: if payload.StreamHeader == nil { return @@ -4170,3 +4200,125 @@ func (p *ParticipantImpl) AddTransceiverFromTrackLocal( ) } } + +func (p *ParticipantImpl) handleIncomingRpcAck(requestId string) bool { + p.rpcLock.Lock() + defer p.rpcLock.Unlock() + + handler, ok := p.rpcPendingAcks[requestId] + if !ok { + return false + } + + handler.Resolve() + delete(p.rpcPendingAcks, requestId) + return true +} + +func (p *ParticipantImpl) handleIncomingRpcResponse(requestId string, payload string, err *utils.DataChannelRpcError) bool { + p.rpcLock.Lock() + defer p.rpcLock.Unlock() + + handler, ok := p.rpcPendingResponses[requestId] + if !ok { + return false + } + + handler.Resolve(payload, err) + delete(p.rpcPendingResponses, requestId) + return true +} + +func (p *ParticipantImpl) PerformRpc(req *livekit.PerformRpcRequest, resultCh chan string, errorCh chan error) { + responseTimeout := req.GetResponseTimeoutMs() + if responseTimeout <= 0 { + responseTimeout = uint32(utils.DataChannelRpcDefaultResponseTimeout.Milliseconds()) + } + + go func() { + if len([]byte(req.GetPayload())) > utils.DataChannelRpcMaxPayloadBytes { + errorCh <- utils.DataChannelRpcErrorFromBuiltInCodes(utils.DataChannelRpcRequestPayloadTooLarge, "").PsrpcError() + return + } + + id := uuid.NewString() + + responseTimer := time.AfterFunc(time.Duration(responseTimeout)*time.Millisecond, func() { + p.rpcLock.Lock() + delete(p.rpcPendingResponses, id) + p.rpcLock.Unlock() + + select { + case errorCh <- utils.DataChannelRpcErrorFromBuiltInCodes(utils.DataChannelRpcResponseTimeout, "").PsrpcError(): + default: + } + }) + ackTimer := time.AfterFunc(utils.DataChannelRpcMaxRoundTripLatency, func() { + p.rpcLock.Lock() + delete(p.rpcPendingAcks, id) + delete(p.rpcPendingResponses, id) + p.rpcLock.Unlock() + responseTimer.Stop() + + select { + case errorCh <- utils.DataChannelRpcErrorFromBuiltInCodes(utils.DataChannelRpcConnectionTimeout, "").PsrpcError(): + default: + } + }) + + rpcRequest := &livekit.DataPacket{ + Kind: livekit.DataPacket_RELIABLE, + ParticipantIdentity: id, + Value: &livekit.DataPacket_RpcRequest{ + RpcRequest: &livekit.RpcRequest{ + Id: id, + Method: req.GetMethod(), + Payload: req.GetPayload(), + ResponseTimeoutMs: responseTimeout - p.lastRTT, + Version: 1, + }, + }, + } + data, err := proto.Marshal(rpcRequest) + if err != nil { + ackTimer.Stop() + responseTimer.Stop() + errorCh <- psrpc.NewError(psrpc.Internal, err) + return + } + + // using RPC ID as the unique ID for server to identify the response + err = p.SendDataMessage(livekit.DataPacket_RELIABLE, data, livekit.ParticipantID(id), 0) + if err != nil { + ackTimer.Stop() + responseTimer.Stop() + errorCh <- psrpc.NewError(psrpc.Internal, err) + return + } + + p.rpcLock.Lock() + p.rpcPendingAcks[id] = &utils.DataChannelRpcPendingAckHandler{ + Resolve: func() { + ackTimer.Stop() + }, + ParticipantIdentity: req.GetDestinationIdentity(), + } + p.rpcPendingResponses[id] = &utils.DataChannelRpcPendingResponseHandler{ + Resolve: func(payload string, error *utils.DataChannelRpcError) { + responseTimer.Stop() + if _, ok := p.rpcPendingAcks[id]; ok { + p.rpcPendingAcks[id].Resolve() + ackTimer.Stop() + } + + if error != nil { + errorCh <- error.PsrpcError() + } else { + resultCh <- payload + } + }, + ParticipantIdentity: req.GetDestinationIdentity(), + } + p.rpcLock.Unlock() + }() +} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 91ad3ae6f..0cdb53445 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -541,6 +541,8 @@ type LocalParticipant interface { HandleLeaveRequest(reason ParticipantCloseReason) HandleSignalMessage(msg proto.Message) error + + PerformRpc(req *livekit.PerformRpcRequest, resultCh chan string, errorCh chan error) } // Room is a container of participants, and can provide room-level actions diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index a1dcca947..4f850971e 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -940,6 +940,13 @@ type FakeLocalParticipant struct { onUpdateSubscriptionsArgsForCall []struct { arg1 func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) } + PerformRpcStub func(*livekit.PerformRpcRequest, chan string, chan error) + performRpcMutex sync.RWMutex + performRpcArgsForCall []struct { + arg1 *livekit.PerformRpcRequest + arg2 chan string + arg3 chan error + } ProtocolVersionStub func() types.ProtocolVersion protocolVersionMutex sync.RWMutex protocolVersionArgsForCall []struct { @@ -6413,6 +6420,40 @@ func (fake *FakeLocalParticipant) OnUpdateSubscriptionsArgsForCall(i int) func(t return argsForCall.arg1 } +func (fake *FakeLocalParticipant) PerformRpc(arg1 *livekit.PerformRpcRequest, arg2 chan string, arg3 chan error) { + fake.performRpcMutex.Lock() + fake.performRpcArgsForCall = append(fake.performRpcArgsForCall, struct { + arg1 *livekit.PerformRpcRequest + arg2 chan string + arg3 chan error + }{arg1, arg2, arg3}) + stub := fake.PerformRpcStub + fake.recordInvocation("PerformRpc", []interface{}{arg1, arg2, arg3}) + fake.performRpcMutex.Unlock() + if stub != nil { + fake.PerformRpcStub(arg1, arg2, arg3) + } +} + +func (fake *FakeLocalParticipant) PerformRpcCallCount() int { + fake.performRpcMutex.RLock() + defer fake.performRpcMutex.RUnlock() + return len(fake.performRpcArgsForCall) +} + +func (fake *FakeLocalParticipant) PerformRpcCalls(stub func(*livekit.PerformRpcRequest, chan string, chan error)) { + fake.performRpcMutex.Lock() + defer fake.performRpcMutex.Unlock() + fake.PerformRpcStub = stub +} + +func (fake *FakeLocalParticipant) PerformRpcArgsForCall(i int) (*livekit.PerformRpcRequest, chan string, chan error) { + fake.performRpcMutex.RLock() + defer fake.performRpcMutex.RUnlock() + argsForCall := fake.performRpcArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + func (fake *FakeLocalParticipant) ProtocolVersion() types.ProtocolVersion { fake.protocolVersionMutex.Lock() ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)] diff --git a/pkg/service/errors.go b/pkg/service/errors.go index 7e5e66af7..7d0739054 100644 --- a/pkg/service/errors.go +++ b/pkg/service/errors.go @@ -48,4 +48,5 @@ var ( ErrInvalidMessageType = psrpc.NewErrorf(psrpc.Internal, "invalid message type") ErrNoConnectRequest = psrpc.NewErrorf(psrpc.InvalidArgument, "no connect request") ErrNoConnectResponse = psrpc.NewErrorf(psrpc.InvalidArgument, "no connect response") + ErrDestinationIdentityRequired = psrpc.NewErrorf(psrpc.InvalidArgument, "destination identity is required") ) diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index e81a4ccad..a98249fa6 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -825,6 +825,30 @@ func (r *RoomManager) MoveParticipant(ctx context.Context, req *livekit.MovePart return nil, errors.New("not implemented") } +func (r *RoomManager) PerformRpc(ctx context.Context, req *livekit.PerformRpcRequest) (*livekit.PerformRpcResponse, error) { + room := r.GetRoom(ctx, livekit.RoomName(req.GetRoom())) + if room == nil { + return nil, ErrRoomNotFound + } + + participant := room.GetParticipant(livekit.ParticipantIdentity(req.GetDestinationIdentity())) + if participant == nil { + return nil, ErrParticipantNotFound + } + + resultChan := make(chan string, 1) + errorChan := make(chan error, 1) + + participant.PerformRpc(req, resultChan, errorChan) + + select { + case result := <-resultChan: + return &livekit.PerformRpcResponse{Payload: result}, nil + case err := <-errorChan: + return nil, err + } +} + func (r *RoomManager) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomRequest) (*livekit.DeleteRoomResponse, error) { room := r.GetRoom(ctx, livekit.RoomName(req.Room)) if room == nil { diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 0ec4b9b37..2a17c691b 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -355,6 +355,24 @@ func (s *RoomService) MoveParticipant(ctx context.Context, req *livekit.MovePart return res, err } +func (s *RoomService) PerformRpc(ctx context.Context, req *livekit.PerformRpcRequest) (*livekit.PerformRpcResponse, error) { + RecordRequest(ctx, req) + + roomName := livekit.RoomName(req.Room) + AppendLogFields(ctx, "room", roomName, "participant", req.DestinationIdentity) + + if err := EnsureAdminPermission(ctx, roomName); err != nil { + return nil, twirpAuthError(err) + } + if req.DestinationIdentity == "" { + return nil, ErrDestinationIdentityRequired + } + + res, err := s.participantClient.PerformRpc(ctx, s.topicFormatter.ParticipantTopic(ctx, roomName, livekit.ParticipantIdentity(req.DestinationIdentity)), req) + RecordResponse(ctx, res) + return res, err +} + func redactCreateRoomRequest(req *livekit.CreateRoomRequest) *livekit.CreateRoomRequest { if req.Egress == nil && req.Metadata == "" { // nothing to redact