diff --git a/magefile.go b/magefile.go index e1b84da1b..fc0d26042 100644 --- a/magefile.go +++ b/magefile.go @@ -149,6 +149,8 @@ func PublishDocker() error { func Psrpc() error { psrpcProtoFiles := []string{ "pkg/service/rpc/egress.proto", + "pkg/service/rpc/ingress.proto", + "pkg/service/rpc/io.proto", } fmt.Println("generating psrpc") diff --git a/pkg/config/config.go b/pkg/config/config.go index ac13b2c35..fbdc0ea6b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -241,6 +241,7 @@ type EgressConfig struct { type IngressConfig struct { RTMPBaseURL string `yaml:"rtmp_base_url"` + UsePsRPC bool `yaml:"use_psrpc"` } // not exposed to YAML diff --git a/pkg/service/egress.go b/pkg/service/egress.go index e9e4dfc58..b752ad1cd 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -258,19 +258,23 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr return nil, ErrEgressNotConnected } - f0 := func() (*livekit.EgressInfo, error) { - return s.clientDeprecated.SendRequest(ctx, &livekit.EgressRequest{ - EgressId: req.EgressId, - Request: &livekit.EgressRequest_UpdateStream{ - UpdateStream: req, - }, + race := rpc.NewRace[livekit.EgressInfo](ctx) + if s.clientDeprecated != nil { + race.Go(func(ctx context.Context) (*livekit.EgressInfo, error) { + return s.clientDeprecated.SendRequest(ctx, &livekit.EgressRequest{ + EgressId: req.EgressId, + Request: &livekit.EgressRequest_UpdateStream{ + UpdateStream: req, + }, + }) }) } - f1 := func() (*livekit.EgressInfo, error) { - return s.psrpcClient.UpdateStream(ctx, req.EgressId, req) + if s.psrpcClient != nil { + race.Go(func(ctx context.Context) (*livekit.EgressInfo, error) { + return s.psrpcClient.UpdateStream(ctx, req.EgressId, req) + }) } - - info, err := s.getFirst(f0, f1) + _, info, err := race.Wait() if err != nil { return nil, err } @@ -323,19 +327,23 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR } } - f0 := func() (*livekit.EgressInfo, error) { - return s.clientDeprecated.SendRequest(ctx, &livekit.EgressRequest{ - EgressId: req.EgressId, - Request: &livekit.EgressRequest_Stop{ - Stop: req, - }, + race := rpc.NewRace[livekit.EgressInfo](ctx) + if s.clientDeprecated != nil { + race.Go(func(ctx context.Context) (*livekit.EgressInfo, error) { + return s.clientDeprecated.SendRequest(ctx, &livekit.EgressRequest{ + EgressId: req.EgressId, + Request: &livekit.EgressRequest_Stop{ + Stop: req, + }, + }) }) } - f1 := func() (*livekit.EgressInfo, error) { - return s.psrpcClient.StopEgress(ctx, req.EgressId, req) + if s.psrpcClient != nil { + race.Go(func(ctx context.Context) (*livekit.EgressInfo, error) { + return s.psrpcClient.StopEgress(ctx, req.EgressId, req) + }) } - - info, err = s.getFirst(f0, f1) + _, info, err = race.Wait() if err != nil { return nil, err } @@ -436,43 +444,3 @@ func (s *EgressService) handleUpdate(info *livekit.EgressInfo) { } } } - -// TODO: remove in future version -func (s *EgressService) getFirst(f0, f1 func() (*livekit.EgressInfo, error)) (*livekit.EgressInfo, error) { - if s.clientDeprecated == nil { - return f1() - } - if s.psrpcClient == nil { - return f0() - } - - type res struct { - info *livekit.EgressInfo - err error - } - v0 := make(chan *res, 1) - v1 := make(chan *res, 1) - - go func() { - info, err := f0() - v0 <- &res{ - info: info, - err: err, - } - }() - - go func() { - info, err := f1() - v1 <- &res{ - info: info, - err: err, - } - }() - - select { - case r := <-v0: - return r.info, r.err - case r := <-v1: - return r.info, r.err - } -} diff --git a/pkg/service/ingress.go b/pkg/service/ingress.go index 196255784..f697f12c5 100644 --- a/pkg/service/ingress.go +++ b/pkg/service/ingress.go @@ -8,11 +8,13 @@ import ( "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/service/rpc" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" + "github.com/livekit/psrpc" ) var ( @@ -22,6 +24,10 @@ var ( type IngressService struct { conf *config.IngressConfig + nodeID livekit.NodeID + bus psrpc.MessageBus + psrpcClient rpc.IngressClient + psrpcServer rpc.IOInfoServer rpcClient ingress.RPCClient store IngressStore roomService livekit.RoomService @@ -31,6 +37,9 @@ type IngressService struct { func NewIngressService( conf *config.IngressConfig, + nodeID livekit.NodeID, + bus psrpc.MessageBus, + psrpcClient rpc.IngressClient, rpcClient ingress.RPCClient, store IngressStore, rs livekit.RoomService, @@ -39,6 +48,9 @@ func NewIngressService( return &IngressService{ conf: conf, + nodeID: nodeID, + bus: bus, + psrpcClient: psrpcClient, rpcClient: rpcClient, store: store, roomService: rs, @@ -47,15 +59,26 @@ func NewIngressService( } } -func (s *IngressService) Start() { - if s.rpcClient != nil { +func (s *IngressService) Start() error { + if s.psrpcClient != nil { + psrpcServer, err := rpc.NewIOInfoServer(string(s.nodeID), s, s.bus) + if err != nil { + return err + } + s.psrpcServer = psrpcServer + } else if s.rpcClient != nil { go s.updateWorker() go s.entitiesWorker() } + return nil } func (s *IngressService) Stop() { close(s.shutdown) + + if s.psrpcServer != nil { + s.psrpcServer.Shutdown() + } } func (s *IngressService) CreateIngress(ctx context.Context, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) { @@ -84,6 +107,9 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref if err != nil { return nil, twirpAuthError(err) } + if s.store == nil { + return nil, ErrIngressNotConnected + } sk := utils.NewGuid("") @@ -159,7 +185,7 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI return nil, twirpAuthError(err) } - if s.rpcClient == nil { + if s.psrpcClient == nil && s.rpcClient == nil { return nil, ErrIngressNotConnected } @@ -201,14 +227,22 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI case livekit.IngressState_ENDPOINT_BUFFERING, livekit.IngressState_ENDPOINT_PUBLISHING: // Do not update store the returned state as the ingress service will do it - s, err := s.sendRPCWithRetry(ctx, &livekit.IngressRequest{ - IngressId: req.IngressId, - Request: &livekit.IngressRequest_Update{Update: req}, - }) - if err != nil { + race := rpc.NewRace[livekit.IngressState](ctx) + if s.rpcClient != nil { + race.Go(func(ctx context.Context) (*livekit.IngressState, error) { + return s.sendRPCWithRetry(ctx, &livekit.IngressRequest{ + IngressId: req.IngressId, + Request: &livekit.IngressRequest_Update{Update: req}, + }) + }) + } + if s.psrpcClient != nil { + race.Go(func(ctx context.Context) (*livekit.IngressState, error) { + return s.psrpcClient.UpdateIngress(ctx, req.IngressId, req) + }) + } + if _, _, err := race.Wait(); err != nil { logger.Warnw("could not update active ingress", err) - } else { - info.State = s } } @@ -227,6 +261,9 @@ func (s *IngressService) ListIngress(ctx context.Context, req *livekit.ListIngre if err != nil { return nil, twirpAuthError(err) } + if s.store == nil { + return nil, ErrIngressNotConnected + } infos, err := s.store.ListIngress(ctx, livekit.RoomName(req.RoomName)) if err != nil { @@ -243,7 +280,7 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI return nil, twirpAuthError(err) } - if s.rpcClient == nil { + if s.psrpcClient == nil && s.rpcClient == nil { return nil, ErrIngressNotConnected } @@ -255,14 +292,22 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI switch info.State.Status { case livekit.IngressState_ENDPOINT_BUFFERING, livekit.IngressState_ENDPOINT_PUBLISHING: - s, err := s.sendRPCWithRetry(ctx, &livekit.IngressRequest{ - IngressId: req.IngressId, - Request: &livekit.IngressRequest_Delete{Delete: req}, - }) - if err != nil { + race := rpc.NewRace[livekit.IngressState](ctx) + if s.rpcClient != nil { + race.Go(func(ctx context.Context) (*livekit.IngressState, error) { + return s.sendRPCWithRetry(ctx, &livekit.IngressRequest{ + IngressId: req.IngressId, + Request: &livekit.IngressRequest_Delete{Delete: req}, + }) + }) + } + if s.psrpcClient != nil { + race.Go(func(ctx context.Context) (*livekit.IngressState, error) { + return s.psrpcClient.DeleteIngress(ctx, req.IngressId, req) + }) + } + if _, _, err := race.Wait(); err != nil { logger.Warnw("could not stop active ingress", err) - } else { - info.State = s } } @@ -308,6 +353,25 @@ func (s *IngressService) updateWorker() { } } +func (s *IngressService) UpdateIngressState(ctx context.Context, req *livekit.UpdateIngressStateRequest) (*rpc.Ignored, error) { + if err := s.store.UpdateIngressState(ctx, req.IngressId, req.State); err != nil { + logger.Errorw("could not update ingress", err) + return nil, err + } + return &rpc.Ignored{}, nil +} + +func (s *IngressService) loadIngressFromInfoRequest(req *livekit.GetIngressInfoRequest) (info *livekit.IngressInfo, err error) { + if req.IngressId != "" { + info, err = s.store.LoadIngress(context.Background(), req.IngressId) + } else if req.StreamKey != "" { + info, err = s.store.LoadIngressFromStreamKey(context.Background(), req.StreamKey) + } else { + err = errors.New("request needs to specity either IngressId or StreamKey") + } + return info, err +} + func (s *IngressService) entitiesWorker() { sub, err := s.rpcClient.GetEntityChannel(context.Background()) if err != nil { @@ -327,14 +391,10 @@ func (s *IngressService) entitiesWorker() { continue } - var info *livekit.IngressInfo - var err error - if req.IngressId != "" { - info, err = s.store.LoadIngress(context.Background(), req.IngressId) - } else if req.StreamKey != "" { - info, err = s.store.LoadIngressFromStreamKey(context.Background(), req.StreamKey) - } else { - err = errors.New("request needs to specity either IngressId or StreamKey") + info, err := s.loadIngressFromInfoRequest(req) + if err != nil { + logger.Errorw("failed to load ingress info", err) + continue } err = s.rpcClient.SendGetIngressInfoResponse(context.Background(), req, &livekit.GetIngressInfoResponse{Info: info}, err) if err != nil { @@ -347,3 +407,11 @@ func (s *IngressService) entitiesWorker() { } } } + +func (s *IngressService) GetIngressInfo(ctx context.Context, req *livekit.GetIngressInfoRequest) (*livekit.GetIngressInfoResponse, error) { + info, err := s.loadIngressFromInfoRequest(req) + if err != nil { + return nil, err + } + return &livekit.GetIngressInfoResponse{Info: info}, nil +} diff --git a/pkg/service/rpc/egress.pb.go b/pkg/service/rpc/egress.pb.go index 880219869..e354b245e 100644 --- a/pkg/service/rpc/egress.pb.go +++ b/pkg/service/rpc/egress.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.27.1 -// protoc v3.21.5 +// protoc-gen-go v1.28.1 +// protoc v3.21.12 // source: pkg/service/rpc/egress.proto package rpc diff --git a/pkg/service/rpc/egress.psrpc.go b/pkg/service/rpc/egress.psrpc.go index 35eb76ee6..ae9593eff 100644 --- a/pkg/service/rpc/egress.psrpc.go +++ b/pkg/service/rpc/egress.psrpc.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-psrpc v0.2.1, DO NOT EDIT. +// Code generated by protoc-gen-psrpc v0.2.2, DO NOT EDIT. // source: pkg/service/rpc/egress.proto package rpc diff --git a/pkg/service/rpc/ingress.pb.go b/pkg/service/rpc/ingress.pb.go new file mode 100644 index 000000000..d936dc7b4 --- /dev/null +++ b/pkg/service/rpc/ingress.pb.go @@ -0,0 +1,230 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: pkg/service/rpc/ingress.proto + +package rpc + +import ( + livekit "github.com/livekit/protocol/livekit" + _ "github.com/livekit/psrpc/protoc-gen-psrpc/options" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ListActiveIngressRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ListActiveIngressRequest) Reset() { + *x = ListActiveIngressRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_service_rpc_ingress_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListActiveIngressRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListActiveIngressRequest) ProtoMessage() {} + +func (x *ListActiveIngressRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_service_rpc_ingress_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListActiveIngressRequest.ProtoReflect.Descriptor instead. +func (*ListActiveIngressRequest) Descriptor() ([]byte, []int) { + return file_pkg_service_rpc_ingress_proto_rawDescGZIP(), []int{0} +} + +type ListActiveIngressResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + IngressIds []string `protobuf:"bytes,1,rep,name=ingress_ids,json=ingressIds,proto3" json:"ingress_ids,omitempty"` +} + +func (x *ListActiveIngressResponse) Reset() { + *x = ListActiveIngressResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_service_rpc_ingress_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListActiveIngressResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListActiveIngressResponse) ProtoMessage() {} + +func (x *ListActiveIngressResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_service_rpc_ingress_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListActiveIngressResponse.ProtoReflect.Descriptor instead. +func (*ListActiveIngressResponse) Descriptor() ([]byte, []int) { + return file_pkg_service_rpc_ingress_proto_rawDescGZIP(), []int{1} +} + +func (x *ListActiveIngressResponse) GetIngressIds() []string { + if x != nil { + return x.IngressIds + } + return nil +} + +var File_pkg_service_rpc_ingress_proto protoreflect.FileDescriptor + +var file_pkg_service_rpc_ingress_proto_rawDesc = []byte{ + 0x0a, 0x1d, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, + 0x63, 0x2f, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x03, 0x72, 0x70, 0x63, 0x1a, 0x0d, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x1a, 0x15, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x69, 0x6e, 0x67, + 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1a, 0x0a, 0x18, 0x4c, 0x69, + 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x3c, 0x0a, 0x19, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, + 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69, + 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0a, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, + 0x73, 0x49, 0x64, 0x73, 0x32, 0x6d, 0x0a, 0x0f, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, + 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x12, 0x5a, 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x41, + 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1d, 0x2e, 0x72, + 0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, + 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x72, 0x70, + 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, + 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xb2, 0x89, 0x01, + 0x02, 0x08, 0x01, 0x32, 0xae, 0x01, 0x0a, 0x0e, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x48, + 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x4d, 0x0a, 0x0d, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, + 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, + 0x74, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, + 0x2e, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x22, 0x06, 0xb2, + 0x89, 0x01, 0x02, 0x18, 0x01, 0x12, 0x4d, 0x0a, 0x0d, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x49, + 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, + 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, + 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x22, 0x06, 0xb2, 0x89, + 0x01, 0x02, 0x18, 0x01, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, + 0x69, 0x74, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, + 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pkg_service_rpc_ingress_proto_rawDescOnce sync.Once + file_pkg_service_rpc_ingress_proto_rawDescData = file_pkg_service_rpc_ingress_proto_rawDesc +) + +func file_pkg_service_rpc_ingress_proto_rawDescGZIP() []byte { + file_pkg_service_rpc_ingress_proto_rawDescOnce.Do(func() { + file_pkg_service_rpc_ingress_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_service_rpc_ingress_proto_rawDescData) + }) + return file_pkg_service_rpc_ingress_proto_rawDescData +} + +var file_pkg_service_rpc_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_pkg_service_rpc_ingress_proto_goTypes = []interface{}{ + (*ListActiveIngressRequest)(nil), // 0: rpc.ListActiveIngressRequest + (*ListActiveIngressResponse)(nil), // 1: rpc.ListActiveIngressResponse + (*livekit.UpdateIngressRequest)(nil), // 2: livekit.UpdateIngressRequest + (*livekit.DeleteIngressRequest)(nil), // 3: livekit.DeleteIngressRequest + (*livekit.IngressState)(nil), // 4: livekit.IngressState +} +var file_pkg_service_rpc_ingress_proto_depIdxs = []int32{ + 0, // 0: rpc.IngressInternal.ListActiveIngress:input_type -> rpc.ListActiveIngressRequest + 2, // 1: rpc.IngressHandler.UpdateIngress:input_type -> livekit.UpdateIngressRequest + 3, // 2: rpc.IngressHandler.DeleteIngress:input_type -> livekit.DeleteIngressRequest + 1, // 3: rpc.IngressInternal.ListActiveIngress:output_type -> rpc.ListActiveIngressResponse + 4, // 4: rpc.IngressHandler.UpdateIngress:output_type -> livekit.IngressState + 4, // 5: rpc.IngressHandler.DeleteIngress:output_type -> livekit.IngressState + 3, // [3:6] is the sub-list for method output_type + 0, // [0:3] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_pkg_service_rpc_ingress_proto_init() } +func file_pkg_service_rpc_ingress_proto_init() { + if File_pkg_service_rpc_ingress_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pkg_service_rpc_ingress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListActiveIngressRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_service_rpc_ingress_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListActiveIngressResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pkg_service_rpc_ingress_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 2, + }, + GoTypes: file_pkg_service_rpc_ingress_proto_goTypes, + DependencyIndexes: file_pkg_service_rpc_ingress_proto_depIdxs, + MessageInfos: file_pkg_service_rpc_ingress_proto_msgTypes, + }.Build() + File_pkg_service_rpc_ingress_proto = out.File + file_pkg_service_rpc_ingress_proto_rawDesc = nil + file_pkg_service_rpc_ingress_proto_goTypes = nil + file_pkg_service_rpc_ingress_proto_depIdxs = nil +} diff --git a/pkg/service/rpc/ingress.proto b/pkg/service/rpc/ingress.proto new file mode 100644 index 000000000..6648f282c --- /dev/null +++ b/pkg/service/rpc/ingress.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package rpc; + +option go_package = "github.com/livekit/livekit/pkg/service/rpc"; + +import "options.proto"; +import "livekit_ingress.proto"; + +service IngressInternal { + rpc ListActiveIngress(ListActiveIngressRequest) returns (ListActiveIngressResponse) { + option (psrpc.options).multi = true; + }; +} + +service IngressHandler { + rpc UpdateIngress(livekit.UpdateIngressRequest) returns (livekit.IngressState) { + option (psrpc.options).topics = true; + }; + rpc DeleteIngress(livekit.DeleteIngressRequest) returns (livekit.IngressState) { + option (psrpc.options).topics = true; + }; +} + +message ListActiveIngressRequest {} + +message ListActiveIngressResponse { + repeated string ingress_ids = 1; +} diff --git a/pkg/service/rpc/ingress.psrpc.go b/pkg/service/rpc/ingress.psrpc.go new file mode 100644 index 000000000..0c8e1eaff --- /dev/null +++ b/pkg/service/rpc/ingress.psrpc.go @@ -0,0 +1,227 @@ +// Code generated by protoc-gen-psrpc v0.2.2, DO NOT EDIT. +// source: pkg/service/rpc/ingress.proto + +package rpc + +import context "context" + +import psrpc1 "github.com/livekit/psrpc" +import livekit2 "github.com/livekit/protocol/livekit" + +// ================================ +// IngressInternal Client Interface +// ================================ + +type IngressInternalClient interface { + ListActiveIngress(context.Context, *ListActiveIngressRequest, ...psrpc1.RequestOption) (<-chan *psrpc1.Response[*ListActiveIngressResponse], error) +} + +// ==================================== +// IngressInternal ServerImpl Interface +// ==================================== + +type IngressInternalServerImpl interface { + ListActiveIngress(context.Context, *ListActiveIngressRequest) (*ListActiveIngressResponse, error) +} + +// ================================ +// IngressInternal Server Interface +// ================================ + +type IngressInternalServer interface { + // Close and wait for pending RPCs to complete + Shutdown() + + // Close immediately, without waiting for pending RPCs + Kill() +} + +// ====================== +// IngressInternal Client +// ====================== + +type ingressInternalClient struct { + client *psrpc1.RPCClient +} + +// NewIngressInternalClient creates a psrpc client that implements the IngressInternalClient interface. +func NewIngressInternalClient(clientID string, bus psrpc1.MessageBus, opts ...psrpc1.ClientOption) (IngressInternalClient, error) { + rpcClient, err := psrpc1.NewRPCClient("IngressInternal", clientID, bus, opts...) + if err != nil { + return nil, err + } + + return &ingressInternalClient{ + client: rpcClient, + }, nil +} + +func (c *ingressInternalClient) ListActiveIngress(ctx context.Context, req *ListActiveIngressRequest, opts ...psrpc1.RequestOption) (<-chan *psrpc1.Response[*ListActiveIngressResponse], error) { + return psrpc1.RequestMulti[*ListActiveIngressResponse](ctx, c.client, "ListActiveIngress", "", req, opts...) +} + +// ====================== +// IngressInternal Server +// ====================== + +type ingressInternalServer struct { + svc IngressInternalServerImpl + rpc *psrpc1.RPCServer +} + +// NewIngressInternalServer builds a RPCServer that will route requests +// to the corresponding method in the provided svc implementation. +func NewIngressInternalServer(serverID string, svc IngressInternalServerImpl, bus psrpc1.MessageBus, opts ...psrpc1.ServerOption) (IngressInternalServer, error) { + s := psrpc1.NewRPCServer("IngressInternal", serverID, bus, opts...) + + var err error + err = psrpc1.RegisterHandler(s, "ListActiveIngress", "", svc.ListActiveIngress, nil) + if err != nil { + s.Close(false) + return nil, err + } + + return &ingressInternalServer{ + svc: svc, + rpc: s, + }, nil +} + +func (s *ingressInternalServer) Shutdown() { + s.rpc.Close(false) +} + +func (s *ingressInternalServer) Kill() { + s.rpc.Close(true) +} + +// =============================== +// IngressHandler Client Interface +// =============================== + +type IngressHandlerClient interface { + UpdateIngress(context.Context, string, *livekit2.UpdateIngressRequest, ...psrpc1.RequestOption) (*livekit2.IngressState, error) + + DeleteIngress(context.Context, string, *livekit2.DeleteIngressRequest, ...psrpc1.RequestOption) (*livekit2.IngressState, error) +} + +// =================================== +// IngressHandler ServerImpl Interface +// =================================== + +type IngressHandlerServerImpl interface { + UpdateIngress(context.Context, *livekit2.UpdateIngressRequest) (*livekit2.IngressState, error) + + DeleteIngress(context.Context, *livekit2.DeleteIngressRequest) (*livekit2.IngressState, error) +} + +// =============================== +// IngressHandler Server Interface +// =============================== + +type IngressHandlerServer interface { + RegisterUpdateIngressTopic(string) error + DeregisterUpdateIngressTopic(string) + + RegisterDeleteIngressTopic(string) error + DeregisterDeleteIngressTopic(string) + + // Close and wait for pending RPCs to complete + Shutdown() + + // Close immediately, without waiting for pending RPCs + Kill() +} + +// ===================== +// IngressHandler Client +// ===================== + +type ingressHandlerClient struct { + client *psrpc1.RPCClient +} + +// NewIngressHandlerClient creates a psrpc client that implements the IngressHandlerClient interface. +func NewIngressHandlerClient(clientID string, bus psrpc1.MessageBus, opts ...psrpc1.ClientOption) (IngressHandlerClient, error) { + rpcClient, err := psrpc1.NewRPCClient("IngressHandler", clientID, bus, opts...) + if err != nil { + return nil, err + } + + return &ingressHandlerClient{ + client: rpcClient, + }, nil +} + +func (c *ingressHandlerClient) UpdateIngress(ctx context.Context, topic string, req *livekit2.UpdateIngressRequest, opts ...psrpc1.RequestOption) (*livekit2.IngressState, error) { + return psrpc1.RequestSingle[*livekit2.IngressState](ctx, c.client, "UpdateIngress", topic, req, opts...) +} + +func (c *ingressHandlerClient) DeleteIngress(ctx context.Context, topic string, req *livekit2.DeleteIngressRequest, opts ...psrpc1.RequestOption) (*livekit2.IngressState, error) { + return psrpc1.RequestSingle[*livekit2.IngressState](ctx, c.client, "DeleteIngress", topic, req, opts...) +} + +// ===================== +// IngressHandler Server +// ===================== + +type ingressHandlerServer struct { + svc IngressHandlerServerImpl + rpc *psrpc1.RPCServer +} + +// NewIngressHandlerServer builds a RPCServer that will route requests +// to the corresponding method in the provided svc implementation. +func NewIngressHandlerServer(serverID string, svc IngressHandlerServerImpl, bus psrpc1.MessageBus, opts ...psrpc1.ServerOption) (IngressHandlerServer, error) { + s := psrpc1.NewRPCServer("IngressHandler", serverID, bus, opts...) + + return &ingressHandlerServer{ + svc: svc, + rpc: s, + }, nil +} + +func (s *ingressHandlerServer) RegisterUpdateIngressTopic(topic string) error { + return psrpc1.RegisterHandler(s.rpc, "UpdateIngress", topic, s.svc.UpdateIngress, nil) +} + +func (s *ingressHandlerServer) DeregisterUpdateIngressTopic(topic string) { + s.rpc.DeregisterHandler("UpdateIngress", topic) +} + +func (s *ingressHandlerServer) RegisterDeleteIngressTopic(topic string) error { + return psrpc1.RegisterHandler(s.rpc, "DeleteIngress", topic, s.svc.DeleteIngress, nil) +} + +func (s *ingressHandlerServer) DeregisterDeleteIngressTopic(topic string) { + s.rpc.DeregisterHandler("DeleteIngress", topic) +} + +func (s *ingressHandlerServer) Shutdown() { + s.rpc.Close(false) +} + +func (s *ingressHandlerServer) Kill() { + s.rpc.Close(true) +} + +var psrpcFileDescriptor1 = []byte{ + // 269 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xc1, 0x4a, 0xc3, 0x40, + 0x10, 0x86, 0x89, 0x85, 0xa2, 0x2b, 0x55, 0x5c, 0x28, 0xc4, 0x40, 0x55, 0x72, 0x12, 0x91, 0x0d, + 0xd4, 0xab, 0x17, 0xc5, 0x83, 0x01, 0xbd, 0x54, 0xbc, 0xf4, 0x52, 0xd2, 0xcd, 0x10, 0x97, 0xa6, + 0xbb, 0xeb, 0xce, 0x34, 0xef, 0xe0, 0xcb, 0x78, 0xf0, 0x09, 0x45, 0x3b, 0x16, 0xa3, 0x2d, 0xf4, + 0x14, 0xf8, 0xbf, 0xc9, 0xf7, 0x0f, 0x3b, 0x62, 0xe0, 0x67, 0x55, 0x86, 0x10, 0x1a, 0xa3, 0x21, + 0x0b, 0x5e, 0x67, 0xc6, 0x56, 0x01, 0x10, 0x95, 0x0f, 0x8e, 0x9c, 0xec, 0x04, 0xaf, 0x93, 0x9e, + 0xf3, 0x64, 0x9c, 0xe5, 0x2c, 0xe9, 0xd7, 0xa6, 0x81, 0x99, 0xa1, 0x49, 0x6b, 0x34, 0x4d, 0x44, + 0xfc, 0x60, 0x90, 0x6e, 0x34, 0x99, 0x06, 0xf2, 0x25, 0x1a, 0xc1, 0xeb, 0x02, 0x90, 0xd2, 0x6b, + 0x71, 0xbc, 0x86, 0xa1, 0x77, 0x16, 0x41, 0x9e, 0x8a, 0x7d, 0x36, 0x4d, 0x4c, 0x89, 0x71, 0x74, + 0xd6, 0x39, 0xdf, 0x1b, 0x09, 0x8e, 0xf2, 0x12, 0x87, 0x73, 0x71, 0xc8, 0xff, 0xe4, 0x96, 0x20, + 0xd8, 0xa2, 0x96, 0x63, 0x71, 0xf4, 0x4f, 0x28, 0x07, 0x2a, 0x78, 0xad, 0x36, 0x2d, 0x91, 0x9c, + 0x6c, 0xc2, 0xcb, 0x3d, 0xd2, 0xee, 0xc7, 0x5b, 0xb4, 0xb3, 0x1b, 0x0d, 0xdf, 0x23, 0x71, 0xc0, + 0xec, 0xbe, 0xb0, 0x65, 0x0d, 0x41, 0x3e, 0x8a, 0xde, 0xb3, 0x2f, 0x0b, 0xfa, 0x55, 0xc5, 0x8f, + 0xa0, 0x5a, 0xf9, 0x4f, 0x55, 0x7f, 0x85, 0x19, 0x3c, 0x51, 0x41, 0xdc, 0x10, 0x47, 0x5f, 0xba, + 0x3b, 0xa8, 0x61, 0x9d, 0xae, 0x95, 0x6f, 0xa7, 0xbb, 0xbd, 0x1c, 0x5f, 0x54, 0x86, 0x5e, 0x16, + 0x53, 0xa5, 0xdd, 0x3c, 0xe3, 0xd1, 0xd5, 0xf7, 0xcf, 0x81, 0xa7, 0xdd, 0xef, 0x73, 0x5d, 0x7d, + 0x06, 0x00, 0x00, 0xff, 0xff, 0xff, 0xd2, 0xc1, 0x99, 0xfa, 0x01, 0x00, 0x00, +} diff --git a/pkg/service/rpc/ingress_client.go b/pkg/service/rpc/ingress_client.go new file mode 100644 index 000000000..7126a6c10 --- /dev/null +++ b/pkg/service/rpc/ingress_client.go @@ -0,0 +1,36 @@ +package rpc + +import ( + "github.com/livekit/protocol/livekit" + "github.com/livekit/psrpc" +) + +type IngressClient interface { + IngressInternalClient + IngressHandlerClient +} + +type ingressClient struct { + IngressInternalClient + IngressHandlerClient +} + +func NewIngressClient(nodeID livekit.NodeID, bus psrpc.MessageBus) (IngressClient, error) { + if bus == nil { + return nil, nil + } + + clientID := string(nodeID) + internalClient, err := NewIngressInternalClient(clientID, bus) + if err != nil { + return nil, err + } + handlerClient, err := NewIngressHandlerClient(clientID, bus) + if err != nil { + return nil, err + } + return &ingressClient{ + IngressInternalClient: internalClient, + IngressHandlerClient: handlerClient, + }, nil +} diff --git a/pkg/service/rpc/io.pb.go b/pkg/service/rpc/io.pb.go new file mode 100644 index 000000000..84d818e82 --- /dev/null +++ b/pkg/service/rpc/io.pb.go @@ -0,0 +1,154 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: pkg/service/rpc/io.proto + +package rpc + +import ( + livekit "github.com/livekit/protocol/livekit" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Ignored struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Ignored) Reset() { + *x = Ignored{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_service_rpc_io_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Ignored) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Ignored) ProtoMessage() {} + +func (x *Ignored) ProtoReflect() protoreflect.Message { + mi := &file_pkg_service_rpc_io_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Ignored.ProtoReflect.Descriptor instead. +func (*Ignored) Descriptor() ([]byte, []int) { + return file_pkg_service_rpc_io_proto_rawDescGZIP(), []int{0} +} + +var File_pkg_service_rpc_io_proto protoreflect.FileDescriptor + +var file_pkg_service_rpc_io_proto_rawDesc = []byte{ + 0x0a, 0x18, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, + 0x63, 0x2f, 0x69, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x72, 0x70, 0x63, 0x1a, + 0x1a, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x72, 0x70, 0x63, 0x5f, 0x69, 0x6e, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x09, 0x0a, 0x07, 0x49, + 0x67, 0x6e, 0x6f, 0x72, 0x65, 0x64, 0x32, 0xa3, 0x01, 0x0a, 0x06, 0x49, 0x4f, 0x49, 0x6e, 0x66, + 0x6f, 0x12, 0x51, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, + 0x6e, 0x66, 0x6f, 0x12, 0x1e, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x47, 0x65, + 0x74, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x47, 0x65, + 0x74, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x12, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x49, 0x6e, + 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x22, 0x2e, 0x6c, 0x69, 0x76, + 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, + 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, + 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x49, 0x67, 0x6e, 0x6f, 0x72, 0x65, 0x64, 0x42, 0x2c, 0x5a, 0x2a, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, + 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_pkg_service_rpc_io_proto_rawDescOnce sync.Once + file_pkg_service_rpc_io_proto_rawDescData = file_pkg_service_rpc_io_proto_rawDesc +) + +func file_pkg_service_rpc_io_proto_rawDescGZIP() []byte { + file_pkg_service_rpc_io_proto_rawDescOnce.Do(func() { + file_pkg_service_rpc_io_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_service_rpc_io_proto_rawDescData) + }) + return file_pkg_service_rpc_io_proto_rawDescData +} + +var file_pkg_service_rpc_io_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_pkg_service_rpc_io_proto_goTypes = []interface{}{ + (*Ignored)(nil), // 0: rpc.Ignored + (*livekit.GetIngressInfoRequest)(nil), // 1: livekit.GetIngressInfoRequest + (*livekit.UpdateIngressStateRequest)(nil), // 2: livekit.UpdateIngressStateRequest + (*livekit.GetIngressInfoResponse)(nil), // 3: livekit.GetIngressInfoResponse +} +var file_pkg_service_rpc_io_proto_depIdxs = []int32{ + 1, // 0: rpc.IOInfo.GetIngressInfo:input_type -> livekit.GetIngressInfoRequest + 2, // 1: rpc.IOInfo.UpdateIngressState:input_type -> livekit.UpdateIngressStateRequest + 3, // 2: rpc.IOInfo.GetIngressInfo:output_type -> livekit.GetIngressInfoResponse + 0, // 3: rpc.IOInfo.UpdateIngressState:output_type -> rpc.Ignored + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_pkg_service_rpc_io_proto_init() } +func file_pkg_service_rpc_io_proto_init() { + if File_pkg_service_rpc_io_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pkg_service_rpc_io_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Ignored); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pkg_service_rpc_io_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pkg_service_rpc_io_proto_goTypes, + DependencyIndexes: file_pkg_service_rpc_io_proto_depIdxs, + MessageInfos: file_pkg_service_rpc_io_proto_msgTypes, + }.Build() + File_pkg_service_rpc_io_proto = out.File + file_pkg_service_rpc_io_proto_rawDesc = nil + file_pkg_service_rpc_io_proto_goTypes = nil + file_pkg_service_rpc_io_proto_depIdxs = nil +} diff --git a/pkg/service/rpc/io.proto b/pkg/service/rpc/io.proto new file mode 100644 index 000000000..b3ebb92e6 --- /dev/null +++ b/pkg/service/rpc/io.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package rpc; + +option go_package = "github.com/livekit/livekit/pkg/service/rpc"; + +import "livekit_rpc_internal.proto"; + +service IOInfo { + rpc GetIngressInfo(livekit.GetIngressInfoRequest) returns (livekit.GetIngressInfoResponse); + rpc UpdateIngressState(livekit.UpdateIngressStateRequest) returns (Ignored); +} + +message Ignored {} diff --git a/pkg/service/rpc/io.psrpc.go b/pkg/service/rpc/io.psrpc.go new file mode 100644 index 000000000..41c1f1036 --- /dev/null +++ b/pkg/service/rpc/io.psrpc.go @@ -0,0 +1,127 @@ +// Code generated by protoc-gen-psrpc v0.2.2, DO NOT EDIT. +// source: pkg/service/rpc/io.proto + +package rpc + +import context "context" + +import psrpc1 "github.com/livekit/psrpc" +import livekit3 "github.com/livekit/protocol/livekit" + +// ======================= +// IOInfo Client Interface +// ======================= + +type IOInfoClient interface { + GetIngressInfo(context.Context, *livekit3.GetIngressInfoRequest, ...psrpc1.RequestOption) (*livekit3.GetIngressInfoResponse, error) + + UpdateIngressState(context.Context, *livekit3.UpdateIngressStateRequest, ...psrpc1.RequestOption) (*Ignored, error) +} + +// =========================== +// IOInfo ServerImpl Interface +// =========================== + +type IOInfoServerImpl interface { + GetIngressInfo(context.Context, *livekit3.GetIngressInfoRequest) (*livekit3.GetIngressInfoResponse, error) + + UpdateIngressState(context.Context, *livekit3.UpdateIngressStateRequest) (*Ignored, error) +} + +// ======================= +// IOInfo Server Interface +// ======================= + +type IOInfoServer interface { + // Close and wait for pending RPCs to complete + Shutdown() + + // Close immediately, without waiting for pending RPCs + Kill() +} + +// ============= +// IOInfo Client +// ============= + +type iOInfoClient struct { + client *psrpc1.RPCClient +} + +// NewIOInfoClient creates a psrpc client that implements the IOInfoClient interface. +func NewIOInfoClient(clientID string, bus psrpc1.MessageBus, opts ...psrpc1.ClientOption) (IOInfoClient, error) { + rpcClient, err := psrpc1.NewRPCClient("IOInfo", clientID, bus, opts...) + if err != nil { + return nil, err + } + + return &iOInfoClient{ + client: rpcClient, + }, nil +} + +func (c *iOInfoClient) GetIngressInfo(ctx context.Context, req *livekit3.GetIngressInfoRequest, opts ...psrpc1.RequestOption) (*livekit3.GetIngressInfoResponse, error) { + return psrpc1.RequestSingle[*livekit3.GetIngressInfoResponse](ctx, c.client, "GetIngressInfo", "", req, opts...) +} + +func (c *iOInfoClient) UpdateIngressState(ctx context.Context, req *livekit3.UpdateIngressStateRequest, opts ...psrpc1.RequestOption) (*Ignored, error) { + return psrpc1.RequestSingle[*Ignored](ctx, c.client, "UpdateIngressState", "", req, opts...) +} + +// ============= +// IOInfo Server +// ============= + +type iOInfoServer struct { + svc IOInfoServerImpl + rpc *psrpc1.RPCServer +} + +// NewIOInfoServer builds a RPCServer that will route requests +// to the corresponding method in the provided svc implementation. +func NewIOInfoServer(serverID string, svc IOInfoServerImpl, bus psrpc1.MessageBus, opts ...psrpc1.ServerOption) (IOInfoServer, error) { + s := psrpc1.NewRPCServer("IOInfo", serverID, bus, opts...) + + var err error + err = psrpc1.RegisterHandler(s, "GetIngressInfo", "", svc.GetIngressInfo, nil) + if err != nil { + s.Close(false) + return nil, err + } + + err = psrpc1.RegisterHandler(s, "UpdateIngressState", "", svc.UpdateIngressState, nil) + if err != nil { + s.Close(false) + return nil, err + } + + return &iOInfoServer{ + svc: svc, + rpc: s, + }, nil +} + +func (s *iOInfoServer) Shutdown() { + s.rpc.Close(false) +} + +func (s *iOInfoServer) Kill() { + s.rpc.Close(true) +} + +var psrpcFileDescriptor2 = []byte{ + // 201 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x28, 0xc8, 0x4e, 0xd7, + 0x2f, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2f, 0x2a, 0x48, 0xd6, 0xcf, 0xcc, 0xd7, 0x2b, + 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2a, 0x48, 0x96, 0x92, 0xca, 0xc9, 0x2c, 0x4b, 0xcd, + 0xce, 0x2c, 0x89, 0x2f, 0x2a, 0x48, 0x8e, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, 0x81, + 0x28, 0x50, 0xe2, 0xe4, 0x62, 0xf7, 0x4c, 0xcf, 0xcb, 0x2f, 0x4a, 0x4d, 0x31, 0x5a, 0xcc, 0xc8, + 0xc5, 0xe6, 0xe9, 0xef, 0x99, 0x97, 0x96, 0x2f, 0x14, 0xc8, 0xc5, 0xe7, 0x9e, 0x5a, 0xe2, 0x99, + 0x97, 0x5e, 0x94, 0x5a, 0x5c, 0x0c, 0x16, 0x91, 0xd3, 0x83, 0x1a, 0xa2, 0x87, 0x2a, 0x11, 0x94, + 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0x22, 0x25, 0x8f, 0x53, 0xbe, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x55, + 0xc8, 0x8d, 0x4b, 0x28, 0xb4, 0x20, 0x25, 0xb1, 0x24, 0x15, 0x2a, 0x19, 0x5c, 0x92, 0x58, 0x92, + 0x2a, 0xa4, 0x04, 0xd7, 0x86, 0x29, 0x09, 0x33, 0x9a, 0x47, 0xaf, 0xa8, 0x20, 0x59, 0x0f, 0xea, + 0x4a, 0x27, 0x9d, 0x28, 0xad, 0xf4, 0xcc, 0x92, 0x8c, 0xd2, 0x24, 0xbd, 0xe4, 0xfc, 0x5c, 0x7d, + 0xa8, 0x6e, 0x38, 0x8d, 0x16, 0x10, 0x49, 0x6c, 0x60, 0x5f, 0x1a, 0x03, 0x02, 0x00, 0x00, 0xff, + 0xff, 0xba, 0x2f, 0x5a, 0x5b, 0x22, 0x01, 0x00, 0x00, +} diff --git a/pkg/service/rpc/race.go b/pkg/service/rpc/race.go new file mode 100644 index 000000000..5c4825156 --- /dev/null +++ b/pkg/service/rpc/race.go @@ -0,0 +1,63 @@ +package rpc + +import ( + "context" + "sync" +) + +type raceResult[T any] struct { + i int + val *T + err error +} + +type Race[T any] struct { + ctx context.Context + cancel context.CancelFunc + nextIndex int + + resultLock sync.Mutex + result *raceResult[T] +} + +// NewRace creates a race to yield the result from one or more candidate +// functions +func NewRace[T any](ctx context.Context) *Race[T] { + ctx, cancel := context.WithCancel(ctx) + return &Race[T]{ + ctx: ctx, + cancel: cancel, + } +} + +// Go adds a candidate function to the race by running it in a new goroutine +func (r *Race[T]) Go(fn func(ctx context.Context) (*T, error)) { + i := r.nextIndex + r.nextIndex++ + + go func() { + val, err := fn(r.ctx) + + r.resultLock.Lock() + if r.result == nil { + r.result = &raceResult[T]{i, val, err} + } + r.resultLock.Unlock() + + r.cancel() + }() +} + +// Wait awaits the first complete function and returns the index and results +// or -1 if the context is cancelled before any candidate finishes. +func (r *Race[T]) Wait() (int, *T, error) { + <-r.ctx.Done() + + r.resultLock.Lock() + res := r.result + r.resultLock.Unlock() + if res != nil { + return res.i, res.val, res.err + } + return -1, nil, r.ctx.Err() +} diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 75ff9acfa..175b7dd33 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -51,6 +51,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live NewEgressLauncher, NewEgressService, ingress.NewRedisRPC, + getIngressClient, getIngressStore, getIngressConfig, getIngressRPCClient, @@ -158,6 +159,14 @@ func getEgressStore(s ObjectStore) EgressStore { } } +func getIngressClient(conf *config.Config, nodeID livekit.NodeID, bus psrpc.MessageBus) (rpc.IngressClient, error) { + if conf.Ingress.UsePsRPC { + return rpc.NewIngressClient(nodeID, bus) + } + + return nil, nil +} + func getIngressStore(s ObjectStore) IngressStore { switch store := s.(type) { case *RedisStore: diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 3a5730345..ea948b54c 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -71,10 +71,14 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } egressService := NewEgressService(egressClient, rpcClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher) ingressConfig := getIngressConfig(conf) + ingressClient, err := getIngressClient(conf, nodeID, messageBus) + if err != nil { + return nil, err + } rpc := ingress.NewRedisRPC(nodeID, universalClient) ingressRPCClient := getIngressRPCClient(rpc) ingressStore := getIngressStore(objectStore) - ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService) + ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressRPCClient, ingressStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService) clientConfigurationManager := createClientConfiguration() roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher) @@ -187,6 +191,14 @@ func getEgressStore(s ObjectStore) EgressStore { } } +func getIngressClient(conf *config.Config, nodeID livekit.NodeID, bus psrpc.MessageBus) (rpc.IngressClient, error) { + if conf.Ingress.UsePsRPC { + return rpc.NewIngressClient(nodeID, bus) + } + + return nil, nil +} + func getIngressStore(s ObjectStore) IngressStore { switch store := s.(type) { case *RedisStore: