Ingress psrpc (#1295)

* add ingress psrpc codegen

* use psrpc for ingress

* merge entity/info update psrpc services

* split update/delete ingress methods

* add race helper test

* add race context cancel test

* sync race result with mutex
This commit is contained in:
Paul Wells
2023-01-12 11:00:43 -08:00
committed by GitHub
parent 81fb1c5ef0
commit a052ebd644
16 changed files with 1030 additions and 90 deletions

View File

@@ -149,6 +149,8 @@ func PublishDocker() error {
func Psrpc() error {
psrpcProtoFiles := []string{
"pkg/service/rpc/egress.proto",
"pkg/service/rpc/ingress.proto",
"pkg/service/rpc/io.proto",
}
fmt.Println("generating psrpc")

View File

@@ -241,6 +241,7 @@ type EgressConfig struct {
type IngressConfig struct {
RTMPBaseURL string `yaml:"rtmp_base_url"`
UsePsRPC bool `yaml:"use_psrpc"`
}
// not exposed to YAML

View File

@@ -258,19 +258,23 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr
return nil, ErrEgressNotConnected
}
f0 := func() (*livekit.EgressInfo, error) {
return s.clientDeprecated.SendRequest(ctx, &livekit.EgressRequest{
EgressId: req.EgressId,
Request: &livekit.EgressRequest_UpdateStream{
UpdateStream: req,
},
race := rpc.NewRace[livekit.EgressInfo](ctx)
if s.clientDeprecated != nil {
race.Go(func(ctx context.Context) (*livekit.EgressInfo, error) {
return s.clientDeprecated.SendRequest(ctx, &livekit.EgressRequest{
EgressId: req.EgressId,
Request: &livekit.EgressRequest_UpdateStream{
UpdateStream: req,
},
})
})
}
f1 := func() (*livekit.EgressInfo, error) {
return s.psrpcClient.UpdateStream(ctx, req.EgressId, req)
if s.psrpcClient != nil {
race.Go(func(ctx context.Context) (*livekit.EgressInfo, error) {
return s.psrpcClient.UpdateStream(ctx, req.EgressId, req)
})
}
info, err := s.getFirst(f0, f1)
_, info, err := race.Wait()
if err != nil {
return nil, err
}
@@ -323,19 +327,23 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR
}
}
f0 := func() (*livekit.EgressInfo, error) {
return s.clientDeprecated.SendRequest(ctx, &livekit.EgressRequest{
EgressId: req.EgressId,
Request: &livekit.EgressRequest_Stop{
Stop: req,
},
race := rpc.NewRace[livekit.EgressInfo](ctx)
if s.clientDeprecated != nil {
race.Go(func(ctx context.Context) (*livekit.EgressInfo, error) {
return s.clientDeprecated.SendRequest(ctx, &livekit.EgressRequest{
EgressId: req.EgressId,
Request: &livekit.EgressRequest_Stop{
Stop: req,
},
})
})
}
f1 := func() (*livekit.EgressInfo, error) {
return s.psrpcClient.StopEgress(ctx, req.EgressId, req)
if s.psrpcClient != nil {
race.Go(func(ctx context.Context) (*livekit.EgressInfo, error) {
return s.psrpcClient.StopEgress(ctx, req.EgressId, req)
})
}
info, err = s.getFirst(f0, f1)
_, info, err = race.Wait()
if err != nil {
return nil, err
}
@@ -436,43 +444,3 @@ func (s *EgressService) handleUpdate(info *livekit.EgressInfo) {
}
}
}
// TODO: remove in future version
func (s *EgressService) getFirst(f0, f1 func() (*livekit.EgressInfo, error)) (*livekit.EgressInfo, error) {
if s.clientDeprecated == nil {
return f1()
}
if s.psrpcClient == nil {
return f0()
}
type res struct {
info *livekit.EgressInfo
err error
}
v0 := make(chan *res, 1)
v1 := make(chan *res, 1)
go func() {
info, err := f0()
v0 <- &res{
info: info,
err: err,
}
}()
go func() {
info, err := f1()
v1 <- &res{
info: info,
err: err,
}
}()
select {
case r := <-v0:
return r.info, r.err
case r := <-v1:
return r.info, r.err
}
}

View File

@@ -8,11 +8,13 @@ import (
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/service/rpc"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/ingress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
)
var (
@@ -22,6 +24,10 @@ var (
type IngressService struct {
conf *config.IngressConfig
nodeID livekit.NodeID
bus psrpc.MessageBus
psrpcClient rpc.IngressClient
psrpcServer rpc.IOInfoServer
rpcClient ingress.RPCClient
store IngressStore
roomService livekit.RoomService
@@ -31,6 +37,9 @@ type IngressService struct {
func NewIngressService(
conf *config.IngressConfig,
nodeID livekit.NodeID,
bus psrpc.MessageBus,
psrpcClient rpc.IngressClient,
rpcClient ingress.RPCClient,
store IngressStore,
rs livekit.RoomService,
@@ -39,6 +48,9 @@ func NewIngressService(
return &IngressService{
conf: conf,
nodeID: nodeID,
bus: bus,
psrpcClient: psrpcClient,
rpcClient: rpcClient,
store: store,
roomService: rs,
@@ -47,15 +59,26 @@ func NewIngressService(
}
}
func (s *IngressService) Start() {
if s.rpcClient != nil {
func (s *IngressService) Start() error {
if s.psrpcClient != nil {
psrpcServer, err := rpc.NewIOInfoServer(string(s.nodeID), s, s.bus)
if err != nil {
return err
}
s.psrpcServer = psrpcServer
} else if s.rpcClient != nil {
go s.updateWorker()
go s.entitiesWorker()
}
return nil
}
func (s *IngressService) Stop() {
close(s.shutdown)
if s.psrpcServer != nil {
s.psrpcServer.Shutdown()
}
}
func (s *IngressService) CreateIngress(ctx context.Context, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) {
@@ -84,6 +107,9 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref
if err != nil {
return nil, twirpAuthError(err)
}
if s.store == nil {
return nil, ErrIngressNotConnected
}
sk := utils.NewGuid("")
@@ -159,7 +185,7 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI
return nil, twirpAuthError(err)
}
if s.rpcClient == nil {
if s.psrpcClient == nil && s.rpcClient == nil {
return nil, ErrIngressNotConnected
}
@@ -201,14 +227,22 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI
case livekit.IngressState_ENDPOINT_BUFFERING,
livekit.IngressState_ENDPOINT_PUBLISHING:
// Do not update store the returned state as the ingress service will do it
s, err := s.sendRPCWithRetry(ctx, &livekit.IngressRequest{
IngressId: req.IngressId,
Request: &livekit.IngressRequest_Update{Update: req},
})
if err != nil {
race := rpc.NewRace[livekit.IngressState](ctx)
if s.rpcClient != nil {
race.Go(func(ctx context.Context) (*livekit.IngressState, error) {
return s.sendRPCWithRetry(ctx, &livekit.IngressRequest{
IngressId: req.IngressId,
Request: &livekit.IngressRequest_Update{Update: req},
})
})
}
if s.psrpcClient != nil {
race.Go(func(ctx context.Context) (*livekit.IngressState, error) {
return s.psrpcClient.UpdateIngress(ctx, req.IngressId, req)
})
}
if _, _, err := race.Wait(); err != nil {
logger.Warnw("could not update active ingress", err)
} else {
info.State = s
}
}
@@ -227,6 +261,9 @@ func (s *IngressService) ListIngress(ctx context.Context, req *livekit.ListIngre
if err != nil {
return nil, twirpAuthError(err)
}
if s.store == nil {
return nil, ErrIngressNotConnected
}
infos, err := s.store.ListIngress(ctx, livekit.RoomName(req.RoomName))
if err != nil {
@@ -243,7 +280,7 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI
return nil, twirpAuthError(err)
}
if s.rpcClient == nil {
if s.psrpcClient == nil && s.rpcClient == nil {
return nil, ErrIngressNotConnected
}
@@ -255,14 +292,22 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI
switch info.State.Status {
case livekit.IngressState_ENDPOINT_BUFFERING,
livekit.IngressState_ENDPOINT_PUBLISHING:
s, err := s.sendRPCWithRetry(ctx, &livekit.IngressRequest{
IngressId: req.IngressId,
Request: &livekit.IngressRequest_Delete{Delete: req},
})
if err != nil {
race := rpc.NewRace[livekit.IngressState](ctx)
if s.rpcClient != nil {
race.Go(func(ctx context.Context) (*livekit.IngressState, error) {
return s.sendRPCWithRetry(ctx, &livekit.IngressRequest{
IngressId: req.IngressId,
Request: &livekit.IngressRequest_Delete{Delete: req},
})
})
}
if s.psrpcClient != nil {
race.Go(func(ctx context.Context) (*livekit.IngressState, error) {
return s.psrpcClient.DeleteIngress(ctx, req.IngressId, req)
})
}
if _, _, err := race.Wait(); err != nil {
logger.Warnw("could not stop active ingress", err)
} else {
info.State = s
}
}
@@ -308,6 +353,25 @@ func (s *IngressService) updateWorker() {
}
}
func (s *IngressService) UpdateIngressState(ctx context.Context, req *livekit.UpdateIngressStateRequest) (*rpc.Ignored, error) {
if err := s.store.UpdateIngressState(ctx, req.IngressId, req.State); err != nil {
logger.Errorw("could not update ingress", err)
return nil, err
}
return &rpc.Ignored{}, nil
}
func (s *IngressService) loadIngressFromInfoRequest(req *livekit.GetIngressInfoRequest) (info *livekit.IngressInfo, err error) {
if req.IngressId != "" {
info, err = s.store.LoadIngress(context.Background(), req.IngressId)
} else if req.StreamKey != "" {
info, err = s.store.LoadIngressFromStreamKey(context.Background(), req.StreamKey)
} else {
err = errors.New("request needs to specity either IngressId or StreamKey")
}
return info, err
}
func (s *IngressService) entitiesWorker() {
sub, err := s.rpcClient.GetEntityChannel(context.Background())
if err != nil {
@@ -327,14 +391,10 @@ func (s *IngressService) entitiesWorker() {
continue
}
var info *livekit.IngressInfo
var err error
if req.IngressId != "" {
info, err = s.store.LoadIngress(context.Background(), req.IngressId)
} else if req.StreamKey != "" {
info, err = s.store.LoadIngressFromStreamKey(context.Background(), req.StreamKey)
} else {
err = errors.New("request needs to specity either IngressId or StreamKey")
info, err := s.loadIngressFromInfoRequest(req)
if err != nil {
logger.Errorw("failed to load ingress info", err)
continue
}
err = s.rpcClient.SendGetIngressInfoResponse(context.Background(), req, &livekit.GetIngressInfoResponse{Info: info}, err)
if err != nil {
@@ -347,3 +407,11 @@ func (s *IngressService) entitiesWorker() {
}
}
}
func (s *IngressService) GetIngressInfo(ctx context.Context, req *livekit.GetIngressInfoRequest) (*livekit.GetIngressInfoResponse, error) {
info, err := s.loadIngressFromInfoRequest(req)
if err != nil {
return nil, err
}
return &livekit.GetIngressInfoResponse{Info: info}, nil
}

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc v3.21.5
// protoc-gen-go v1.28.1
// protoc v3.21.12
// source: pkg/service/rpc/egress.proto
package rpc

View File

@@ -1,4 +1,4 @@
// Code generated by protoc-gen-psrpc v0.2.1, DO NOT EDIT.
// Code generated by protoc-gen-psrpc v0.2.2, DO NOT EDIT.
// source: pkg/service/rpc/egress.proto
package rpc

View File

@@ -0,0 +1,230 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.21.12
// source: pkg/service/rpc/ingress.proto
package rpc
import (
livekit "github.com/livekit/protocol/livekit"
_ "github.com/livekit/psrpc/protoc-gen-psrpc/options"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ListActiveIngressRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *ListActiveIngressRequest) Reset() {
*x = ListActiveIngressRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pkg_service_rpc_ingress_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ListActiveIngressRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListActiveIngressRequest) ProtoMessage() {}
func (x *ListActiveIngressRequest) ProtoReflect() protoreflect.Message {
mi := &file_pkg_service_rpc_ingress_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListActiveIngressRequest.ProtoReflect.Descriptor instead.
func (*ListActiveIngressRequest) Descriptor() ([]byte, []int) {
return file_pkg_service_rpc_ingress_proto_rawDescGZIP(), []int{0}
}
type ListActiveIngressResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
IngressIds []string `protobuf:"bytes,1,rep,name=ingress_ids,json=ingressIds,proto3" json:"ingress_ids,omitempty"`
}
func (x *ListActiveIngressResponse) Reset() {
*x = ListActiveIngressResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pkg_service_rpc_ingress_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ListActiveIngressResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListActiveIngressResponse) ProtoMessage() {}
func (x *ListActiveIngressResponse) ProtoReflect() protoreflect.Message {
mi := &file_pkg_service_rpc_ingress_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListActiveIngressResponse.ProtoReflect.Descriptor instead.
func (*ListActiveIngressResponse) Descriptor() ([]byte, []int) {
return file_pkg_service_rpc_ingress_proto_rawDescGZIP(), []int{1}
}
func (x *ListActiveIngressResponse) GetIngressIds() []string {
if x != nil {
return x.IngressIds
}
return nil
}
var File_pkg_service_rpc_ingress_proto protoreflect.FileDescriptor
var file_pkg_service_rpc_ingress_proto_rawDesc = []byte{
0x0a, 0x1d, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70,
0x63, 0x2f, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
0x03, 0x72, 0x70, 0x63, 0x1a, 0x0d, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x1a, 0x15, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x69, 0x6e, 0x67,
0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1a, 0x0a, 0x18, 0x4c, 0x69,
0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x3c, 0x0a, 0x19, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63,
0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69,
0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0a, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73,
0x73, 0x49, 0x64, 0x73, 0x32, 0x6d, 0x0a, 0x0f, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49,
0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x12, 0x5a, 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x41,
0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1d, 0x2e, 0x72,
0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67,
0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x72, 0x70,
0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72,
0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xb2, 0x89, 0x01,
0x02, 0x08, 0x01, 0x32, 0xae, 0x01, 0x0a, 0x0e, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x48,
0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x4d, 0x0a, 0x0d, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65,
0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69,
0x74, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74,
0x2e, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x22, 0x06, 0xb2,
0x89, 0x01, 0x02, 0x18, 0x01, 0x12, 0x4d, 0x0a, 0x0d, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x49,
0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74,
0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e,
0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x22, 0x06, 0xb2, 0x89,
0x01, 0x02, 0x18, 0x01, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b,
0x69, 0x74, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72,
0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_pkg_service_rpc_ingress_proto_rawDescOnce sync.Once
file_pkg_service_rpc_ingress_proto_rawDescData = file_pkg_service_rpc_ingress_proto_rawDesc
)
func file_pkg_service_rpc_ingress_proto_rawDescGZIP() []byte {
file_pkg_service_rpc_ingress_proto_rawDescOnce.Do(func() {
file_pkg_service_rpc_ingress_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_service_rpc_ingress_proto_rawDescData)
})
return file_pkg_service_rpc_ingress_proto_rawDescData
}
var file_pkg_service_rpc_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_pkg_service_rpc_ingress_proto_goTypes = []interface{}{
(*ListActiveIngressRequest)(nil), // 0: rpc.ListActiveIngressRequest
(*ListActiveIngressResponse)(nil), // 1: rpc.ListActiveIngressResponse
(*livekit.UpdateIngressRequest)(nil), // 2: livekit.UpdateIngressRequest
(*livekit.DeleteIngressRequest)(nil), // 3: livekit.DeleteIngressRequest
(*livekit.IngressState)(nil), // 4: livekit.IngressState
}
var file_pkg_service_rpc_ingress_proto_depIdxs = []int32{
0, // 0: rpc.IngressInternal.ListActiveIngress:input_type -> rpc.ListActiveIngressRequest
2, // 1: rpc.IngressHandler.UpdateIngress:input_type -> livekit.UpdateIngressRequest
3, // 2: rpc.IngressHandler.DeleteIngress:input_type -> livekit.DeleteIngressRequest
1, // 3: rpc.IngressInternal.ListActiveIngress:output_type -> rpc.ListActiveIngressResponse
4, // 4: rpc.IngressHandler.UpdateIngress:output_type -> livekit.IngressState
4, // 5: rpc.IngressHandler.DeleteIngress:output_type -> livekit.IngressState
3, // [3:6] is the sub-list for method output_type
0, // [0:3] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_pkg_service_rpc_ingress_proto_init() }
func file_pkg_service_rpc_ingress_proto_init() {
if File_pkg_service_rpc_ingress_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pkg_service_rpc_ingress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListActiveIngressRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pkg_service_rpc_ingress_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListActiveIngressResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pkg_service_rpc_ingress_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 2,
},
GoTypes: file_pkg_service_rpc_ingress_proto_goTypes,
DependencyIndexes: file_pkg_service_rpc_ingress_proto_depIdxs,
MessageInfos: file_pkg_service_rpc_ingress_proto_msgTypes,
}.Build()
File_pkg_service_rpc_ingress_proto = out.File
file_pkg_service_rpc_ingress_proto_rawDesc = nil
file_pkg_service_rpc_ingress_proto_goTypes = nil
file_pkg_service_rpc_ingress_proto_depIdxs = nil
}

View File

@@ -0,0 +1,29 @@
syntax = "proto3";
package rpc;
option go_package = "github.com/livekit/livekit/pkg/service/rpc";
import "options.proto";
import "livekit_ingress.proto";
service IngressInternal {
rpc ListActiveIngress(ListActiveIngressRequest) returns (ListActiveIngressResponse) {
option (psrpc.options).multi = true;
};
}
service IngressHandler {
rpc UpdateIngress(livekit.UpdateIngressRequest) returns (livekit.IngressState) {
option (psrpc.options).topics = true;
};
rpc DeleteIngress(livekit.DeleteIngressRequest) returns (livekit.IngressState) {
option (psrpc.options).topics = true;
};
}
message ListActiveIngressRequest {}
message ListActiveIngressResponse {
repeated string ingress_ids = 1;
}

View File

@@ -0,0 +1,227 @@
// Code generated by protoc-gen-psrpc v0.2.2, DO NOT EDIT.
// source: pkg/service/rpc/ingress.proto
package rpc
import context "context"
import psrpc1 "github.com/livekit/psrpc"
import livekit2 "github.com/livekit/protocol/livekit"
// ================================
// IngressInternal Client Interface
// ================================
type IngressInternalClient interface {
ListActiveIngress(context.Context, *ListActiveIngressRequest, ...psrpc1.RequestOption) (<-chan *psrpc1.Response[*ListActiveIngressResponse], error)
}
// ====================================
// IngressInternal ServerImpl Interface
// ====================================
type IngressInternalServerImpl interface {
ListActiveIngress(context.Context, *ListActiveIngressRequest) (*ListActiveIngressResponse, error)
}
// ================================
// IngressInternal Server Interface
// ================================
type IngressInternalServer interface {
// Close and wait for pending RPCs to complete
Shutdown()
// Close immediately, without waiting for pending RPCs
Kill()
}
// ======================
// IngressInternal Client
// ======================
type ingressInternalClient struct {
client *psrpc1.RPCClient
}
// NewIngressInternalClient creates a psrpc client that implements the IngressInternalClient interface.
func NewIngressInternalClient(clientID string, bus psrpc1.MessageBus, opts ...psrpc1.ClientOption) (IngressInternalClient, error) {
rpcClient, err := psrpc1.NewRPCClient("IngressInternal", clientID, bus, opts...)
if err != nil {
return nil, err
}
return &ingressInternalClient{
client: rpcClient,
}, nil
}
func (c *ingressInternalClient) ListActiveIngress(ctx context.Context, req *ListActiveIngressRequest, opts ...psrpc1.RequestOption) (<-chan *psrpc1.Response[*ListActiveIngressResponse], error) {
return psrpc1.RequestMulti[*ListActiveIngressResponse](ctx, c.client, "ListActiveIngress", "", req, opts...)
}
// ======================
// IngressInternal Server
// ======================
type ingressInternalServer struct {
svc IngressInternalServerImpl
rpc *psrpc1.RPCServer
}
// NewIngressInternalServer builds a RPCServer that will route requests
// to the corresponding method in the provided svc implementation.
func NewIngressInternalServer(serverID string, svc IngressInternalServerImpl, bus psrpc1.MessageBus, opts ...psrpc1.ServerOption) (IngressInternalServer, error) {
s := psrpc1.NewRPCServer("IngressInternal", serverID, bus, opts...)
var err error
err = psrpc1.RegisterHandler(s, "ListActiveIngress", "", svc.ListActiveIngress, nil)
if err != nil {
s.Close(false)
return nil, err
}
return &ingressInternalServer{
svc: svc,
rpc: s,
}, nil
}
func (s *ingressInternalServer) Shutdown() {
s.rpc.Close(false)
}
func (s *ingressInternalServer) Kill() {
s.rpc.Close(true)
}
// ===============================
// IngressHandler Client Interface
// ===============================
type IngressHandlerClient interface {
UpdateIngress(context.Context, string, *livekit2.UpdateIngressRequest, ...psrpc1.RequestOption) (*livekit2.IngressState, error)
DeleteIngress(context.Context, string, *livekit2.DeleteIngressRequest, ...psrpc1.RequestOption) (*livekit2.IngressState, error)
}
// ===================================
// IngressHandler ServerImpl Interface
// ===================================
type IngressHandlerServerImpl interface {
UpdateIngress(context.Context, *livekit2.UpdateIngressRequest) (*livekit2.IngressState, error)
DeleteIngress(context.Context, *livekit2.DeleteIngressRequest) (*livekit2.IngressState, error)
}
// ===============================
// IngressHandler Server Interface
// ===============================
type IngressHandlerServer interface {
RegisterUpdateIngressTopic(string) error
DeregisterUpdateIngressTopic(string)
RegisterDeleteIngressTopic(string) error
DeregisterDeleteIngressTopic(string)
// Close and wait for pending RPCs to complete
Shutdown()
// Close immediately, without waiting for pending RPCs
Kill()
}
// =====================
// IngressHandler Client
// =====================
type ingressHandlerClient struct {
client *psrpc1.RPCClient
}
// NewIngressHandlerClient creates a psrpc client that implements the IngressHandlerClient interface.
func NewIngressHandlerClient(clientID string, bus psrpc1.MessageBus, opts ...psrpc1.ClientOption) (IngressHandlerClient, error) {
rpcClient, err := psrpc1.NewRPCClient("IngressHandler", clientID, bus, opts...)
if err != nil {
return nil, err
}
return &ingressHandlerClient{
client: rpcClient,
}, nil
}
func (c *ingressHandlerClient) UpdateIngress(ctx context.Context, topic string, req *livekit2.UpdateIngressRequest, opts ...psrpc1.RequestOption) (*livekit2.IngressState, error) {
return psrpc1.RequestSingle[*livekit2.IngressState](ctx, c.client, "UpdateIngress", topic, req, opts...)
}
func (c *ingressHandlerClient) DeleteIngress(ctx context.Context, topic string, req *livekit2.DeleteIngressRequest, opts ...psrpc1.RequestOption) (*livekit2.IngressState, error) {
return psrpc1.RequestSingle[*livekit2.IngressState](ctx, c.client, "DeleteIngress", topic, req, opts...)
}
// =====================
// IngressHandler Server
// =====================
type ingressHandlerServer struct {
svc IngressHandlerServerImpl
rpc *psrpc1.RPCServer
}
// NewIngressHandlerServer builds a RPCServer that will route requests
// to the corresponding method in the provided svc implementation.
func NewIngressHandlerServer(serverID string, svc IngressHandlerServerImpl, bus psrpc1.MessageBus, opts ...psrpc1.ServerOption) (IngressHandlerServer, error) {
s := psrpc1.NewRPCServer("IngressHandler", serverID, bus, opts...)
return &ingressHandlerServer{
svc: svc,
rpc: s,
}, nil
}
func (s *ingressHandlerServer) RegisterUpdateIngressTopic(topic string) error {
return psrpc1.RegisterHandler(s.rpc, "UpdateIngress", topic, s.svc.UpdateIngress, nil)
}
func (s *ingressHandlerServer) DeregisterUpdateIngressTopic(topic string) {
s.rpc.DeregisterHandler("UpdateIngress", topic)
}
func (s *ingressHandlerServer) RegisterDeleteIngressTopic(topic string) error {
return psrpc1.RegisterHandler(s.rpc, "DeleteIngress", topic, s.svc.DeleteIngress, nil)
}
func (s *ingressHandlerServer) DeregisterDeleteIngressTopic(topic string) {
s.rpc.DeregisterHandler("DeleteIngress", topic)
}
func (s *ingressHandlerServer) Shutdown() {
s.rpc.Close(false)
}
func (s *ingressHandlerServer) Kill() {
s.rpc.Close(true)
}
var psrpcFileDescriptor1 = []byte{
// 269 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xc1, 0x4a, 0xc3, 0x40,
0x10, 0x86, 0x89, 0x85, 0xa2, 0x2b, 0x55, 0x5c, 0x28, 0xc4, 0x40, 0x55, 0x72, 0x12, 0x91, 0x0d,
0xd4, 0xab, 0x17, 0xc5, 0x83, 0x01, 0xbd, 0x54, 0xbc, 0xf4, 0x52, 0xd2, 0xcd, 0x10, 0x97, 0xa6,
0xbb, 0xeb, 0xce, 0x34, 0xef, 0xe0, 0xcb, 0x78, 0xf0, 0x09, 0x45, 0x3b, 0x16, 0xa3, 0x2d, 0xf4,
0x14, 0xf8, 0xbf, 0xc9, 0xf7, 0x0f, 0x3b, 0x62, 0xe0, 0x67, 0x55, 0x86, 0x10, 0x1a, 0xa3, 0x21,
0x0b, 0x5e, 0x67, 0xc6, 0x56, 0x01, 0x10, 0x95, 0x0f, 0x8e, 0x9c, 0xec, 0x04, 0xaf, 0x93, 0x9e,
0xf3, 0x64, 0x9c, 0xe5, 0x2c, 0xe9, 0xd7, 0xa6, 0x81, 0x99, 0xa1, 0x49, 0x6b, 0x34, 0x4d, 0x44,
0xfc, 0x60, 0x90, 0x6e, 0x34, 0x99, 0x06, 0xf2, 0x25, 0x1a, 0xc1, 0xeb, 0x02, 0x90, 0xd2, 0x6b,
0x71, 0xbc, 0x86, 0xa1, 0x77, 0x16, 0x41, 0x9e, 0x8a, 0x7d, 0x36, 0x4d, 0x4c, 0x89, 0x71, 0x74,
0xd6, 0x39, 0xdf, 0x1b, 0x09, 0x8e, 0xf2, 0x12, 0x87, 0x73, 0x71, 0xc8, 0xff, 0xe4, 0x96, 0x20,
0xd8, 0xa2, 0x96, 0x63, 0x71, 0xf4, 0x4f, 0x28, 0x07, 0x2a, 0x78, 0xad, 0x36, 0x2d, 0x91, 0x9c,
0x6c, 0xc2, 0xcb, 0x3d, 0xd2, 0xee, 0xc7, 0x5b, 0xb4, 0xb3, 0x1b, 0x0d, 0xdf, 0x23, 0x71, 0xc0,
0xec, 0xbe, 0xb0, 0x65, 0x0d, 0x41, 0x3e, 0x8a, 0xde, 0xb3, 0x2f, 0x0b, 0xfa, 0x55, 0xc5, 0x8f,
0xa0, 0x5a, 0xf9, 0x4f, 0x55, 0x7f, 0x85, 0x19, 0x3c, 0x51, 0x41, 0xdc, 0x10, 0x47, 0x5f, 0xba,
0x3b, 0xa8, 0x61, 0x9d, 0xae, 0x95, 0x6f, 0xa7, 0xbb, 0xbd, 0x1c, 0x5f, 0x54, 0x86, 0x5e, 0x16,
0x53, 0xa5, 0xdd, 0x3c, 0xe3, 0xd1, 0xd5, 0xf7, 0xcf, 0x81, 0xa7, 0xdd, 0xef, 0x73, 0x5d, 0x7d,
0x06, 0x00, 0x00, 0xff, 0xff, 0xff, 0xd2, 0xc1, 0x99, 0xfa, 0x01, 0x00, 0x00,
}

View File

@@ -0,0 +1,36 @@
package rpc
import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/psrpc"
)
type IngressClient interface {
IngressInternalClient
IngressHandlerClient
}
type ingressClient struct {
IngressInternalClient
IngressHandlerClient
}
func NewIngressClient(nodeID livekit.NodeID, bus psrpc.MessageBus) (IngressClient, error) {
if bus == nil {
return nil, nil
}
clientID := string(nodeID)
internalClient, err := NewIngressInternalClient(clientID, bus)
if err != nil {
return nil, err
}
handlerClient, err := NewIngressHandlerClient(clientID, bus)
if err != nil {
return nil, err
}
return &ingressClient{
IngressInternalClient: internalClient,
IngressHandlerClient: handlerClient,
}, nil
}

154
pkg/service/rpc/io.pb.go Normal file
View File

@@ -0,0 +1,154 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.21.12
// source: pkg/service/rpc/io.proto
package rpc
import (
livekit "github.com/livekit/protocol/livekit"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Ignored struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *Ignored) Reset() {
*x = Ignored{}
if protoimpl.UnsafeEnabled {
mi := &file_pkg_service_rpc_io_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Ignored) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Ignored) ProtoMessage() {}
func (x *Ignored) ProtoReflect() protoreflect.Message {
mi := &file_pkg_service_rpc_io_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Ignored.ProtoReflect.Descriptor instead.
func (*Ignored) Descriptor() ([]byte, []int) {
return file_pkg_service_rpc_io_proto_rawDescGZIP(), []int{0}
}
var File_pkg_service_rpc_io_proto protoreflect.FileDescriptor
var file_pkg_service_rpc_io_proto_rawDesc = []byte{
0x0a, 0x18, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70,
0x63, 0x2f, 0x69, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x72, 0x70, 0x63, 0x1a,
0x1a, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x72, 0x70, 0x63, 0x5f, 0x69, 0x6e, 0x74,
0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x09, 0x0a, 0x07, 0x49,
0x67, 0x6e, 0x6f, 0x72, 0x65, 0x64, 0x32, 0xa3, 0x01, 0x0a, 0x06, 0x49, 0x4f, 0x49, 0x6e, 0x66,
0x6f, 0x12, 0x51, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49,
0x6e, 0x66, 0x6f, 0x12, 0x1e, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x47, 0x65,
0x74, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x47, 0x65,
0x74, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x12, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x49, 0x6e,
0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x22, 0x2e, 0x6c, 0x69, 0x76,
0x65, 0x6b, 0x69, 0x74, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65,
0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c,
0x2e, 0x72, 0x70, 0x63, 0x2e, 0x49, 0x67, 0x6e, 0x6f, 0x72, 0x65, 0x64, 0x42, 0x2c, 0x5a, 0x2a,
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b,
0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73,
0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
var (
file_pkg_service_rpc_io_proto_rawDescOnce sync.Once
file_pkg_service_rpc_io_proto_rawDescData = file_pkg_service_rpc_io_proto_rawDesc
)
func file_pkg_service_rpc_io_proto_rawDescGZIP() []byte {
file_pkg_service_rpc_io_proto_rawDescOnce.Do(func() {
file_pkg_service_rpc_io_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_service_rpc_io_proto_rawDescData)
})
return file_pkg_service_rpc_io_proto_rawDescData
}
var file_pkg_service_rpc_io_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_pkg_service_rpc_io_proto_goTypes = []interface{}{
(*Ignored)(nil), // 0: rpc.Ignored
(*livekit.GetIngressInfoRequest)(nil), // 1: livekit.GetIngressInfoRequest
(*livekit.UpdateIngressStateRequest)(nil), // 2: livekit.UpdateIngressStateRequest
(*livekit.GetIngressInfoResponse)(nil), // 3: livekit.GetIngressInfoResponse
}
var file_pkg_service_rpc_io_proto_depIdxs = []int32{
1, // 0: rpc.IOInfo.GetIngressInfo:input_type -> livekit.GetIngressInfoRequest
2, // 1: rpc.IOInfo.UpdateIngressState:input_type -> livekit.UpdateIngressStateRequest
3, // 2: rpc.IOInfo.GetIngressInfo:output_type -> livekit.GetIngressInfoResponse
0, // 3: rpc.IOInfo.UpdateIngressState:output_type -> rpc.Ignored
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_pkg_service_rpc_io_proto_init() }
func file_pkg_service_rpc_io_proto_init() {
if File_pkg_service_rpc_io_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pkg_service_rpc_io_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Ignored); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pkg_service_rpc_io_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pkg_service_rpc_io_proto_goTypes,
DependencyIndexes: file_pkg_service_rpc_io_proto_depIdxs,
MessageInfos: file_pkg_service_rpc_io_proto_msgTypes,
}.Build()
File_pkg_service_rpc_io_proto = out.File
file_pkg_service_rpc_io_proto_rawDesc = nil
file_pkg_service_rpc_io_proto_goTypes = nil
file_pkg_service_rpc_io_proto_depIdxs = nil
}

14
pkg/service/rpc/io.proto Normal file
View File

@@ -0,0 +1,14 @@
syntax = "proto3";
package rpc;
option go_package = "github.com/livekit/livekit/pkg/service/rpc";
import "livekit_rpc_internal.proto";
service IOInfo {
rpc GetIngressInfo(livekit.GetIngressInfoRequest) returns (livekit.GetIngressInfoResponse);
rpc UpdateIngressState(livekit.UpdateIngressStateRequest) returns (Ignored);
}
message Ignored {}

127
pkg/service/rpc/io.psrpc.go Normal file
View File

@@ -0,0 +1,127 @@
// Code generated by protoc-gen-psrpc v0.2.2, DO NOT EDIT.
// source: pkg/service/rpc/io.proto
package rpc
import context "context"
import psrpc1 "github.com/livekit/psrpc"
import livekit3 "github.com/livekit/protocol/livekit"
// =======================
// IOInfo Client Interface
// =======================
type IOInfoClient interface {
GetIngressInfo(context.Context, *livekit3.GetIngressInfoRequest, ...psrpc1.RequestOption) (*livekit3.GetIngressInfoResponse, error)
UpdateIngressState(context.Context, *livekit3.UpdateIngressStateRequest, ...psrpc1.RequestOption) (*Ignored, error)
}
// ===========================
// IOInfo ServerImpl Interface
// ===========================
type IOInfoServerImpl interface {
GetIngressInfo(context.Context, *livekit3.GetIngressInfoRequest) (*livekit3.GetIngressInfoResponse, error)
UpdateIngressState(context.Context, *livekit3.UpdateIngressStateRequest) (*Ignored, error)
}
// =======================
// IOInfo Server Interface
// =======================
type IOInfoServer interface {
// Close and wait for pending RPCs to complete
Shutdown()
// Close immediately, without waiting for pending RPCs
Kill()
}
// =============
// IOInfo Client
// =============
type iOInfoClient struct {
client *psrpc1.RPCClient
}
// NewIOInfoClient creates a psrpc client that implements the IOInfoClient interface.
func NewIOInfoClient(clientID string, bus psrpc1.MessageBus, opts ...psrpc1.ClientOption) (IOInfoClient, error) {
rpcClient, err := psrpc1.NewRPCClient("IOInfo", clientID, bus, opts...)
if err != nil {
return nil, err
}
return &iOInfoClient{
client: rpcClient,
}, nil
}
func (c *iOInfoClient) GetIngressInfo(ctx context.Context, req *livekit3.GetIngressInfoRequest, opts ...psrpc1.RequestOption) (*livekit3.GetIngressInfoResponse, error) {
return psrpc1.RequestSingle[*livekit3.GetIngressInfoResponse](ctx, c.client, "GetIngressInfo", "", req, opts...)
}
func (c *iOInfoClient) UpdateIngressState(ctx context.Context, req *livekit3.UpdateIngressStateRequest, opts ...psrpc1.RequestOption) (*Ignored, error) {
return psrpc1.RequestSingle[*Ignored](ctx, c.client, "UpdateIngressState", "", req, opts...)
}
// =============
// IOInfo Server
// =============
type iOInfoServer struct {
svc IOInfoServerImpl
rpc *psrpc1.RPCServer
}
// NewIOInfoServer builds a RPCServer that will route requests
// to the corresponding method in the provided svc implementation.
func NewIOInfoServer(serverID string, svc IOInfoServerImpl, bus psrpc1.MessageBus, opts ...psrpc1.ServerOption) (IOInfoServer, error) {
s := psrpc1.NewRPCServer("IOInfo", serverID, bus, opts...)
var err error
err = psrpc1.RegisterHandler(s, "GetIngressInfo", "", svc.GetIngressInfo, nil)
if err != nil {
s.Close(false)
return nil, err
}
err = psrpc1.RegisterHandler(s, "UpdateIngressState", "", svc.UpdateIngressState, nil)
if err != nil {
s.Close(false)
return nil, err
}
return &iOInfoServer{
svc: svc,
rpc: s,
}, nil
}
func (s *iOInfoServer) Shutdown() {
s.rpc.Close(false)
}
func (s *iOInfoServer) Kill() {
s.rpc.Close(true)
}
var psrpcFileDescriptor2 = []byte{
// 201 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x28, 0xc8, 0x4e, 0xd7,
0x2f, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2f, 0x2a, 0x48, 0xd6, 0xcf, 0xcc, 0xd7, 0x2b,
0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2a, 0x48, 0x96, 0x92, 0xca, 0xc9, 0x2c, 0x4b, 0xcd,
0xce, 0x2c, 0x89, 0x2f, 0x2a, 0x48, 0x8e, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, 0x81,
0x28, 0x50, 0xe2, 0xe4, 0x62, 0xf7, 0x4c, 0xcf, 0xcb, 0x2f, 0x4a, 0x4d, 0x31, 0x5a, 0xcc, 0xc8,
0xc5, 0xe6, 0xe9, 0xef, 0x99, 0x97, 0x96, 0x2f, 0x14, 0xc8, 0xc5, 0xe7, 0x9e, 0x5a, 0xe2, 0x99,
0x97, 0x5e, 0x94, 0x5a, 0x5c, 0x0c, 0x16, 0x91, 0xd3, 0x83, 0x1a, 0xa2, 0x87, 0x2a, 0x11, 0x94,
0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0x22, 0x25, 0x8f, 0x53, 0xbe, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x55,
0xc8, 0x8d, 0x4b, 0x28, 0xb4, 0x20, 0x25, 0xb1, 0x24, 0x15, 0x2a, 0x19, 0x5c, 0x92, 0x58, 0x92,
0x2a, 0xa4, 0x04, 0xd7, 0x86, 0x29, 0x09, 0x33, 0x9a, 0x47, 0xaf, 0xa8, 0x20, 0x59, 0x0f, 0xea,
0x4a, 0x27, 0x9d, 0x28, 0xad, 0xf4, 0xcc, 0x92, 0x8c, 0xd2, 0x24, 0xbd, 0xe4, 0xfc, 0x5c, 0x7d,
0xa8, 0x6e, 0x38, 0x8d, 0x16, 0x10, 0x49, 0x6c, 0x60, 0x5f, 0x1a, 0x03, 0x02, 0x00, 0x00, 0xff,
0xff, 0xba, 0x2f, 0x5a, 0x5b, 0x22, 0x01, 0x00, 0x00,
}

63
pkg/service/rpc/race.go Normal file
View File

@@ -0,0 +1,63 @@
package rpc
import (
"context"
"sync"
)
type raceResult[T any] struct {
i int
val *T
err error
}
type Race[T any] struct {
ctx context.Context
cancel context.CancelFunc
nextIndex int
resultLock sync.Mutex
result *raceResult[T]
}
// NewRace creates a race to yield the result from one or more candidate
// functions
func NewRace[T any](ctx context.Context) *Race[T] {
ctx, cancel := context.WithCancel(ctx)
return &Race[T]{
ctx: ctx,
cancel: cancel,
}
}
// Go adds a candidate function to the race by running it in a new goroutine
func (r *Race[T]) Go(fn func(ctx context.Context) (*T, error)) {
i := r.nextIndex
r.nextIndex++
go func() {
val, err := fn(r.ctx)
r.resultLock.Lock()
if r.result == nil {
r.result = &raceResult[T]{i, val, err}
}
r.resultLock.Unlock()
r.cancel()
}()
}
// Wait awaits the first complete function and returns the index and results
// or -1 if the context is cancelled before any candidate finishes.
func (r *Race[T]) Wait() (int, *T, error) {
<-r.ctx.Done()
r.resultLock.Lock()
res := r.result
r.resultLock.Unlock()
if res != nil {
return res.i, res.val, res.err
}
return -1, nil, r.ctx.Err()
}

View File

@@ -51,6 +51,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
NewEgressLauncher,
NewEgressService,
ingress.NewRedisRPC,
getIngressClient,
getIngressStore,
getIngressConfig,
getIngressRPCClient,
@@ -158,6 +159,14 @@ func getEgressStore(s ObjectStore) EgressStore {
}
}
func getIngressClient(conf *config.Config, nodeID livekit.NodeID, bus psrpc.MessageBus) (rpc.IngressClient, error) {
if conf.Ingress.UsePsRPC {
return rpc.NewIngressClient(nodeID, bus)
}
return nil, nil
}
func getIngressStore(s ObjectStore) IngressStore {
switch store := s.(type) {
case *RedisStore:

View File

@@ -71,10 +71,14 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
egressService := NewEgressService(egressClient, rpcClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher)
ingressConfig := getIngressConfig(conf)
ingressClient, err := getIngressClient(conf, nodeID, messageBus)
if err != nil {
return nil, err
}
rpc := ingress.NewRedisRPC(nodeID, universalClient)
ingressRPCClient := getIngressRPCClient(rpc)
ingressStore := getIngressStore(objectStore)
ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService)
ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressRPCClient, ingressStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService)
clientConfigurationManager := createClientConfiguration()
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher)
@@ -187,6 +191,14 @@ func getEgressStore(s ObjectStore) EgressStore {
}
}
func getIngressClient(conf *config.Config, nodeID livekit.NodeID, bus psrpc.MessageBus) (rpc.IngressClient, error) {
if conf.Ingress.UsePsRPC {
return rpc.NewIngressClient(nodeID, bus)
}
return nil, nil
}
func getIngressStore(s ObjectStore) IngressStore {
switch store := s.(type) {
case *RedisStore: