Move psrpc to protocol (#1426)

* move psrpc to protocol

* update checks

* update protocol

* update protocol ref

* blank line
This commit is contained in:
David Colburn
2023-02-15 16:47:38 -08:00
committed by GitHub
parent 1848a21eda
commit 10c53e0ebb
23 changed files with 40 additions and 1500 deletions
+1
View File
@@ -45,6 +45,7 @@ jobs:
- name: Static Check
uses: dominikh/staticcheck-action@v1.3.0
with:
checks: '["all", "-ST1000", "-ST1003", "-ST1020", "-ST1021", "-ST1022", "-SA1019"]'
min-go-version: 1.18
version: 2022.1.3
install-go: false
+1 -4
View File
@@ -17,7 +17,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a
github.com/livekit/protocol v1.4.1
github.com/livekit/protocol v1.4.2-0.20230215235903-8a6b0e05628f
github.com/livekit/psrpc v0.2.6
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995
github.com/mackerelio/go-osstat v0.2.3
@@ -68,15 +68,12 @@ require (
github.com/google/subcommands v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/klauspost/compress v1.15.13 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/lithammer/shortuuid/v4 v4.0.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mdlayher/netlink v1.7.1 // indirect
github.com/mdlayher/socket v0.4.0 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
github.com/nats-io/nats.go v1.23.0 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
+3 -7
View File
@@ -212,8 +212,7 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8
github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw=
github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.15.13 h1:NFn1Wr8cfnenSJSA46lLq4wHCcBzKTSjnBIexDMMOV0=
github.com/klauspost/compress v1.15.13/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
@@ -233,8 +232,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a h1:5UkGQpskXp7HcBmyrCwWtO7ygDWbqtjN09Yva4l/nyE=
github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw=
github.com/livekit/protocol v1.4.1 h1:ZuVEPpRXgYIeCWA3PV+Zs4ygBumRPXBE/OOqr3v9LHE=
github.com/livekit/protocol v1.4.1/go.mod h1:NQqdMVGf7yf7615n0EaUD17d0xkQ0ERMzG3XAHHpbUw=
github.com/livekit/protocol v1.4.2-0.20230215235903-8a6b0e05628f h1:OiphUT1pCHizzgGiNVPkNiLpgyayWT83ylhbLOxO/sc=
github.com/livekit/protocol v1.4.2-0.20230215235903-8a6b0e05628f/go.mod h1:iU8n3n4hj2Wf9s0oHHF63sUEDHw6v/S+/RokL7SgChw=
github.com/livekit/psrpc v0.2.6 h1:cYrpYEwKEd6TRtF9fXYjVEJU2K0laF0uL0hNJg7pw/E=
github.com/livekit/psrpc v0.2.6/go.mod h1:2wtOo1F03vub2qIjx0rAPpVplg873670/LN08o/yopM=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw=
@@ -269,7 +268,6 @@ github.com/mdlayher/socket v0.1.1/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5A
github.com/mdlayher/socket v0.4.0 h1:280wsy40IC9M9q1uPGcLBwXpcTQDtoGwVt+BNoITxIw=
github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46NlmWuVoc=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -280,7 +278,6 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.9.8 h1:jgxZsv+A3Reb3MgwxaINcNq/za8xZInKhDg9Q0cGN1o=
github.com/nats-io/nats.go v1.23.0 h1:lR28r7IX44WjYgdiKz9GmUeW0uh/m33uD3yEjLZ2cOE=
github.com/nats-io/nats.go v1.23.0/go.mod h1:ki/Scsa23edbh8IRZbCuNXR9TDcbvfaSijKtaqQgw+Q=
@@ -547,7 +544,6 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190411185658-b44545bcd369/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-53
View File
@@ -145,59 +145,6 @@ func PublishDocker() error {
return nil
}
// regenerate psrpc service definitions
func Psrpc() error {
psrpcProtoFiles := []string{
"pkg/service/rpc/egress.proto",
"pkg/service/rpc/ingress.proto",
"pkg/service/rpc/io.proto",
}
fmt.Println("generating psrpc")
protocolDir, err := mageutil.GetPkgDir("github.com/livekit/protocol")
if err != nil {
return err
}
psrpcDir, err := mageutil.GetPkgDir("github.com/livekit/psrpc")
if err != nil {
return err
}
protoc, err := mageutil.GetToolPath("protoc")
if err != nil {
return err
}
protocGoPath, err := mageutil.GetToolPath("protoc-gen-go")
if err != nil {
return err
}
psrpcPath, err := mageutil.GetToolPath("protoc-gen-psrpc")
if err != nil {
return err
}
fmt.Println("generating psrpc protobuf")
args := append([]string{
"--go_out", ".",
"--psrpc_out", ".",
"--go_opt=paths=source_relative",
"--psrpc_opt=paths=source_relative",
"--plugin=go=" + protocGoPath,
"--plugin=psrpc=" + psrpcPath,
"-I" + protocolDir,
"-I" + psrpcDir + "/protoc-gen-psrpc/options",
"-I=.",
}, psrpcProtoFiles...)
cmd := exec.Command(protoc, args...)
mageutil.ConnectStd(cmd)
if err := cmd.Run(); err != nil {
return err
}
return nil
}
// run unit tests, skipping integration
func Test() error {
mg.Deps(generateWire, setULimit)
+5 -4
View File
@@ -9,12 +9,13 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/webhook"
)
type EgressLauncher interface {
StartEgress(context.Context, *livekit.StartEgressRequest) (*livekit.EgressInfo, error)
StartEgressWithClusterId(ctx context.Context, clusterId string, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error)
StartEgress(context.Context, *rpc.StartEgressRequest) (*livekit.EgressInfo, error)
StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error)
}
func StartTrackEgress(
@@ -77,8 +78,8 @@ func startTrackEgress(
return req, errors.New("egress launcher not found")
}
_, err := launcher.StartEgress(ctx, &livekit.StartEgressRequest{
Request: &livekit.StartEgressRequest_Track{
_, err := launcher.StartEgress(ctx, &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_Track{
Track: req,
},
RoomId: string(roomID),
+14 -13
View File
@@ -9,11 +9,11 @@ import (
"github.com/twitchtv/twirp"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/service/rpc"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
)
@@ -79,8 +79,8 @@ func (s *EgressService) StartRoomCompositeEgress(ctx context.Context, req *livek
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &livekit.StartEgressRequest{
Request: &livekit.StartEgressRequest_RoomComposite{
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_RoomComposite{
RoomComposite: req,
},
})
@@ -101,8 +101,8 @@ func (s *EgressService) StartTrackCompositeEgress(ctx context.Context, req *live
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &livekit.StartEgressRequest{
Request: &livekit.StartEgressRequest_TrackComposite{
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_TrackComposite{
TrackComposite: req,
},
})
@@ -121,8 +121,8 @@ func (s *EgressService) StartTrackEgress(ctx context.Context, req *livekit.Track
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &livekit.StartEgressRequest{
Request: &livekit.StartEgressRequest_Track{
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_Track{
Track: req,
},
})
@@ -141,8 +141,8 @@ func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgre
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, "", &livekit.StartEgressRequest{
Request: &livekit.StartEgressRequest_Web{
ei, err := s.startEgress(ctx, "", &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_Web{
Web: req,
},
})
@@ -153,7 +153,7 @@ func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgre
return ei, err
}
func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) {
func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
} else if s.launcher == nil {
@@ -171,14 +171,14 @@ func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomNa
return s.launcher.StartEgress(ctx, req)
}
func (s *egressLauncher) StartEgress(ctx context.Context, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) {
func (s *egressLauncher) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
return s.StartEgressWithClusterId(ctx, "", req)
}
func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId string, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) {
func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
var info *livekit.EgressInfo
var err error
// Ensure we have a Egress ID
// Ensure we have an Egress ID
if req.EgressId == "" {
req.EgressId = utils.NewGuid(utils.EgressPrefix)
}
@@ -187,6 +187,7 @@ func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId
info, err = s.psrpcClient.StartEgress(ctx, clusterId, req)
} else {
logger.Infow("using deprecated egress client")
// SendRequest will transform rpc.StartEgressRequest into deprecated livekit.StartEgressRequest
info, err = s.clientDeprecated.SendRequest(ctx, req)
}
if err != nil {
+1 -1
View File
@@ -5,11 +5,11 @@ import (
"time"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/service/rpc"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/ingress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
)
+10 -6
View File
@@ -8,12 +8,12 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/livekit/livekit-server/pkg/service/rpc"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/ingress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"
)
@@ -107,16 +107,16 @@ func (s *IOInfoService) UpdateEgressInfo(ctx context.Context, info *livekit.Egre
return &emptypb.Empty{}, nil
}
func (s *IOInfoService) GetIngressInfo(ctx context.Context, req *livekit.GetIngressInfoRequest) (*livekit.GetIngressInfoResponse, error) {
func (s *IOInfoService) GetIngressInfo(ctx context.Context, req *rpc.GetIngressInfoRequest) (*rpc.GetIngressInfoResponse, error) {
info, err := s.loadIngressFromInfoRequest(req)
if err != nil {
return nil, err
}
return &livekit.GetIngressInfoResponse{Info: info}, nil
return &rpc.GetIngressInfoResponse{Info: info}, nil
}
func (s *IOInfoService) loadIngressFromInfoRequest(req *livekit.GetIngressInfoRequest) (info *livekit.IngressInfo, err error) {
func (s *IOInfoService) loadIngressFromInfoRequest(req *rpc.GetIngressInfoRequest) (info *livekit.IngressInfo, err error) {
if req.IngressId != "" {
info, err = s.is.LoadIngress(context.Background(), req.IngressId)
} else if req.StreamKey != "" {
@@ -127,7 +127,7 @@ func (s *IOInfoService) loadIngressFromInfoRequest(req *livekit.GetIngressInfoRe
return info, err
}
func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *livekit.UpdateIngressStateRequest) (*emptypb.Empty, error) {
func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error) {
if err := s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil {
logger.Errorw("could not update ingress", err)
return nil, err
@@ -228,11 +228,15 @@ func (s *IOInfoService) ingressWorkerDeprecated() {
continue
}
info, err := s.loadIngressFromInfoRequest(req)
info, err := s.loadIngressFromInfoRequest(&rpc.GetIngressInfoRequest{
IngressId: req.IngressId,
StreamKey: req.StreamKey,
})
if err != nil {
logger.Errorw("failed to load ingress info", err)
continue
}
err = s.icDeprecated.SendGetIngressInfoResponse(context.Background(), req, &livekit.GetIngressInfoResponse{Info: info}, err)
if err != nil {
logger.Errorw("could not send response", err)
+3 -2
View File
@@ -14,6 +14,7 @@ import (
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
)
// A rooms service that supports a single node
@@ -84,8 +85,8 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
}
if req.Egress != nil && req.Egress.Room != nil {
egress := &livekit.StartEgressRequest{
Request: &livekit.StartEgressRequest_RoomComposite{
egress := &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_RoomComposite{
RoomComposite: req.Egress.Room,
},
RoomId: rm.Sid,
-238
View File
@@ -1,238 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc v3.21.6
// source: pkg/service/rpc/egress.proto
package rpc
import (
livekit "github.com/livekit/protocol/livekit"
_ "github.com/livekit/psrpc/protoc-gen-psrpc/options"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ListActiveEgressRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *ListActiveEgressRequest) Reset() {
*x = ListActiveEgressRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pkg_service_rpc_egress_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ListActiveEgressRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListActiveEgressRequest) ProtoMessage() {}
func (x *ListActiveEgressRequest) ProtoReflect() protoreflect.Message {
mi := &file_pkg_service_rpc_egress_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListActiveEgressRequest.ProtoReflect.Descriptor instead.
func (*ListActiveEgressRequest) Descriptor() ([]byte, []int) {
return file_pkg_service_rpc_egress_proto_rawDescGZIP(), []int{0}
}
type ListActiveEgressResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
EgressIds []string `protobuf:"bytes,1,rep,name=egress_ids,json=egressIds,proto3" json:"egress_ids,omitempty"`
}
func (x *ListActiveEgressResponse) Reset() {
*x = ListActiveEgressResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pkg_service_rpc_egress_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ListActiveEgressResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListActiveEgressResponse) ProtoMessage() {}
func (x *ListActiveEgressResponse) ProtoReflect() protoreflect.Message {
mi := &file_pkg_service_rpc_egress_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListActiveEgressResponse.ProtoReflect.Descriptor instead.
func (*ListActiveEgressResponse) Descriptor() ([]byte, []int) {
return file_pkg_service_rpc_egress_proto_rawDescGZIP(), []int{1}
}
func (x *ListActiveEgressResponse) GetEgressIds() []string {
if x != nil {
return x.EgressIds
}
return nil
}
var File_pkg_service_rpc_egress_proto protoreflect.FileDescriptor
var file_pkg_service_rpc_egress_proto_rawDesc = []byte{
0x0a, 0x1c, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70,
0x63, 0x2f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03,
0x72, 0x70, 0x63, 0x1a, 0x0d, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x1a, 0x1a, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x72, 0x70, 0x63, 0x5f,
0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x14,
0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x22, 0x19, 0x0a, 0x17, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69,
0x76, 0x65, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22,
0x39, 0x0a, 0x18, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x45, 0x67, 0x72,
0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65,
0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52,
0x09, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x64, 0x73, 0x32, 0xb8, 0x01, 0x0a, 0x0e, 0x45,
0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x12, 0x4d, 0x0a,
0x0b, 0x53, 0x74, 0x61, 0x72, 0x74, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1b, 0x2e, 0x6c,
0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x45, 0x67, 0x72, 0x65,
0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65,
0x6b, 0x69, 0x74, 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x0c,
0xb2, 0x89, 0x01, 0x02, 0x20, 0x01, 0xb2, 0x89, 0x01, 0x02, 0x18, 0x01, 0x12, 0x57, 0x0a, 0x10,
0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73,
0x12, 0x1c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76,
0x65, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d,
0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x45,
0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xb2,
0x89, 0x01, 0x02, 0x08, 0x01, 0x32, 0xa1, 0x01, 0x0a, 0x0d, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73,
0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x49, 0x0a, 0x0c, 0x55, 0x70, 0x64, 0x61, 0x74,
0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1c, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69,
0x74, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e,
0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x06, 0xb2, 0x89, 0x01, 0x02,
0x18, 0x01, 0x12, 0x45, 0x0a, 0x0a, 0x53, 0x74, 0x6f, 0x70, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73,
0x12, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x45,
0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6c,
0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66,
0x6f, 0x22, 0x06, 0xb2, 0x89, 0x01, 0x02, 0x18, 0x01, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74,
0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f,
0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76,
0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_pkg_service_rpc_egress_proto_rawDescOnce sync.Once
file_pkg_service_rpc_egress_proto_rawDescData = file_pkg_service_rpc_egress_proto_rawDesc
)
func file_pkg_service_rpc_egress_proto_rawDescGZIP() []byte {
file_pkg_service_rpc_egress_proto_rawDescOnce.Do(func() {
file_pkg_service_rpc_egress_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_service_rpc_egress_proto_rawDescData)
})
return file_pkg_service_rpc_egress_proto_rawDescData
}
var file_pkg_service_rpc_egress_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_pkg_service_rpc_egress_proto_goTypes = []interface{}{
(*ListActiveEgressRequest)(nil), // 0: rpc.ListActiveEgressRequest
(*ListActiveEgressResponse)(nil), // 1: rpc.ListActiveEgressResponse
(*livekit.StartEgressRequest)(nil), // 2: livekit.StartEgressRequest
(*livekit.UpdateStreamRequest)(nil), // 3: livekit.UpdateStreamRequest
(*livekit.StopEgressRequest)(nil), // 4: livekit.StopEgressRequest
(*livekit.EgressInfo)(nil), // 5: livekit.EgressInfo
}
var file_pkg_service_rpc_egress_proto_depIdxs = []int32{
2, // 0: rpc.EgressInternal.StartEgress:input_type -> livekit.StartEgressRequest
0, // 1: rpc.EgressInternal.ListActiveEgress:input_type -> rpc.ListActiveEgressRequest
3, // 2: rpc.EgressHandler.UpdateStream:input_type -> livekit.UpdateStreamRequest
4, // 3: rpc.EgressHandler.StopEgress:input_type -> livekit.StopEgressRequest
5, // 4: rpc.EgressInternal.StartEgress:output_type -> livekit.EgressInfo
1, // 5: rpc.EgressInternal.ListActiveEgress:output_type -> rpc.ListActiveEgressResponse
5, // 6: rpc.EgressHandler.UpdateStream:output_type -> livekit.EgressInfo
5, // 7: rpc.EgressHandler.StopEgress:output_type -> livekit.EgressInfo
4, // [4:8] is the sub-list for method output_type
0, // [0:4] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_pkg_service_rpc_egress_proto_init() }
func file_pkg_service_rpc_egress_proto_init() {
if File_pkg_service_rpc_egress_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pkg_service_rpc_egress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListActiveEgressRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pkg_service_rpc_egress_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListActiveEgressResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pkg_service_rpc_egress_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 2,
},
GoTypes: file_pkg_service_rpc_egress_proto_goTypes,
DependencyIndexes: file_pkg_service_rpc_egress_proto_depIdxs,
MessageInfos: file_pkg_service_rpc_egress_proto_msgTypes,
}.Build()
File_pkg_service_rpc_egress_proto = out.File
file_pkg_service_rpc_egress_proto_rawDesc = nil
file_pkg_service_rpc_egress_proto_goTypes = nil
file_pkg_service_rpc_egress_proto_depIdxs = nil
}
-34
View File
@@ -1,34 +0,0 @@
syntax = "proto3";
package rpc;
option go_package = "github.com/livekit/livekit/pkg/service/rpc";
import "options.proto";
import "livekit_rpc_internal.proto";
import "livekit_egress.proto";
service EgressInternal {
rpc StartEgress(livekit.StartEgressRequest) returns (livekit.EgressInfo) {
option (psrpc.options).affinity_func = true;
option (psrpc.options).topics = true;
};
rpc ListActiveEgress(ListActiveEgressRequest) returns (ListActiveEgressResponse) {
option (psrpc.options).multi = true;
}
}
service EgressHandler {
rpc UpdateStream(livekit.UpdateStreamRequest) returns (livekit.EgressInfo) {
option (psrpc.options).topics = true;
}
rpc StopEgress(livekit.StopEgressRequest) returns (livekit.EgressInfo) {
option (psrpc.options).topics = true;
}
}
message ListActiveEgressRequest {}
message ListActiveEgressResponse {
repeated string egress_ids = 1;
}
-253
View File
@@ -1,253 +0,0 @@
// Code generated by protoc-gen-psrpc v0.2.5, DO NOT EDIT.
// source: pkg/service/rpc/egress.proto
package rpc
import context "context"
import psrpc "github.com/livekit/psrpc"
import version "github.com/livekit/psrpc/version"
import livekit "github.com/livekit/protocol/livekit"
import livekit3 "github.com/livekit/protocol/livekit"
var _ = version.PsrpcVersion_0_2_5
// ===============================
// EgressInternal Client Interface
// ===============================
type EgressInternalClient interface {
StartEgress(context.Context, string, *livekit3.StartEgressRequest, ...psrpc.RequestOption) (*livekit.EgressInfo, error)
ListActiveEgress(context.Context, *ListActiveEgressRequest, ...psrpc.RequestOption) (<-chan *psrpc.Response[*ListActiveEgressResponse], error)
}
// ===================================
// EgressInternal ServerImpl Interface
// ===================================
type EgressInternalServerImpl interface {
StartEgress(context.Context, *livekit3.StartEgressRequest) (*livekit.EgressInfo, error)
StartEgressAffinity(*livekit3.StartEgressRequest) float32
ListActiveEgress(context.Context, *ListActiveEgressRequest) (*ListActiveEgressResponse, error)
}
// ===============================
// EgressInternal Server Interface
// ===============================
type EgressInternalServer interface {
RegisterStartEgressTopic(string) error
DeregisterStartEgressTopic(string)
// Close and wait for pending RPCs to complete
Shutdown()
// Close immediately, without waiting for pending RPCs
Kill()
}
// =====================
// EgressInternal Client
// =====================
type egressInternalClient struct {
client *psrpc.RPCClient
}
// NewEgressInternalClient creates a psrpc client that implements the EgressInternalClient interface.
func NewEgressInternalClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOption) (EgressInternalClient, error) {
rpcClient, err := psrpc.NewRPCClient("EgressInternal", clientID, bus, opts...)
if err != nil {
return nil, err
}
return &egressInternalClient{
client: rpcClient,
}, nil
}
func (c *egressInternalClient) StartEgress(ctx context.Context, topic string, req *livekit3.StartEgressRequest, opts ...psrpc.RequestOption) (*livekit.EgressInfo, error) {
return psrpc.RequestSingle[*livekit.EgressInfo](ctx, c.client, "StartEgress", topic, req, opts...)
}
func (c *egressInternalClient) ListActiveEgress(ctx context.Context, req *ListActiveEgressRequest, opts ...psrpc.RequestOption) (<-chan *psrpc.Response[*ListActiveEgressResponse], error) {
return psrpc.RequestMulti[*ListActiveEgressResponse](ctx, c.client, "ListActiveEgress", "", req, opts...)
}
// =====================
// EgressInternal Server
// =====================
type egressInternalServer struct {
svc EgressInternalServerImpl
rpc *psrpc.RPCServer
}
// NewEgressInternalServer builds a RPCServer that will route requests
// to the corresponding method in the provided svc implementation.
func NewEgressInternalServer(serverID string, svc EgressInternalServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOption) (EgressInternalServer, error) {
s := psrpc.NewRPCServer("EgressInternal", serverID, bus, opts...)
var err error
err = psrpc.RegisterHandler(s, "ListActiveEgress", "", svc.ListActiveEgress, nil)
if err != nil {
s.Close(false)
return nil, err
}
return &egressInternalServer{
svc: svc,
rpc: s,
}, nil
}
func (s *egressInternalServer) RegisterStartEgressTopic(topic string) error {
return psrpc.RegisterHandler(s.rpc, "StartEgress", topic, s.svc.StartEgress, s.svc.StartEgressAffinity)
}
func (s *egressInternalServer) DeregisterStartEgressTopic(topic string) {
s.rpc.DeregisterHandler("StartEgress", topic)
}
func (s *egressInternalServer) Shutdown() {
s.rpc.Close(false)
}
func (s *egressInternalServer) Kill() {
s.rpc.Close(true)
}
// ==============================
// EgressHandler Client Interface
// ==============================
type EgressHandlerClient interface {
UpdateStream(context.Context, string, *livekit.UpdateStreamRequest, ...psrpc.RequestOption) (*livekit.EgressInfo, error)
StopEgress(context.Context, string, *livekit.StopEgressRequest, ...psrpc.RequestOption) (*livekit.EgressInfo, error)
}
// ==================================
// EgressHandler ServerImpl Interface
// ==================================
type EgressHandlerServerImpl interface {
UpdateStream(context.Context, *livekit.UpdateStreamRequest) (*livekit.EgressInfo, error)
StopEgress(context.Context, *livekit.StopEgressRequest) (*livekit.EgressInfo, error)
}
// ==============================
// EgressHandler Server Interface
// ==============================
type EgressHandlerServer interface {
RegisterUpdateStreamTopic(string) error
DeregisterUpdateStreamTopic(string)
RegisterStopEgressTopic(string) error
DeregisterStopEgressTopic(string)
// Close and wait for pending RPCs to complete
Shutdown()
// Close immediately, without waiting for pending RPCs
Kill()
}
// ====================
// EgressHandler Client
// ====================
type egressHandlerClient struct {
client *psrpc.RPCClient
}
// NewEgressHandlerClient creates a psrpc client that implements the EgressHandlerClient interface.
func NewEgressHandlerClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOption) (EgressHandlerClient, error) {
rpcClient, err := psrpc.NewRPCClient("EgressHandler", clientID, bus, opts...)
if err != nil {
return nil, err
}
return &egressHandlerClient{
client: rpcClient,
}, nil
}
func (c *egressHandlerClient) UpdateStream(ctx context.Context, topic string, req *livekit.UpdateStreamRequest, opts ...psrpc.RequestOption) (*livekit.EgressInfo, error) {
return psrpc.RequestSingle[*livekit.EgressInfo](ctx, c.client, "UpdateStream", topic, req, opts...)
}
func (c *egressHandlerClient) StopEgress(ctx context.Context, topic string, req *livekit.StopEgressRequest, opts ...psrpc.RequestOption) (*livekit.EgressInfo, error) {
return psrpc.RequestSingle[*livekit.EgressInfo](ctx, c.client, "StopEgress", topic, req, opts...)
}
// ====================
// EgressHandler Server
// ====================
type egressHandlerServer struct {
svc EgressHandlerServerImpl
rpc *psrpc.RPCServer
}
// NewEgressHandlerServer builds a RPCServer that will route requests
// to the corresponding method in the provided svc implementation.
func NewEgressHandlerServer(serverID string, svc EgressHandlerServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOption) (EgressHandlerServer, error) {
s := psrpc.NewRPCServer("EgressHandler", serverID, bus, opts...)
return &egressHandlerServer{
svc: svc,
rpc: s,
}, nil
}
func (s *egressHandlerServer) RegisterUpdateStreamTopic(topic string) error {
return psrpc.RegisterHandler(s.rpc, "UpdateStream", topic, s.svc.UpdateStream, nil)
}
func (s *egressHandlerServer) DeregisterUpdateStreamTopic(topic string) {
s.rpc.DeregisterHandler("UpdateStream", topic)
}
func (s *egressHandlerServer) RegisterStopEgressTopic(topic string) error {
return psrpc.RegisterHandler(s.rpc, "StopEgress", topic, s.svc.StopEgress, nil)
}
func (s *egressHandlerServer) DeregisterStopEgressTopic(topic string) {
s.rpc.DeregisterHandler("StopEgress", topic)
}
func (s *egressHandlerServer) Shutdown() {
s.rpc.Close(false)
}
func (s *egressHandlerServer) Kill() {
s.rpc.Close(true)
}
var psrpcFileDescriptor0 = []byte{
// 312 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0xd1, 0x4a, 0xc3, 0x30,
0x14, 0x86, 0x89, 0x93, 0xb1, 0x1d, 0x9d, 0x48, 0x14, 0xac, 0x75, 0x83, 0xd1, 0x2b, 0x11, 0x69,
0x61, 0x5e, 0x79, 0xa9, 0x30, 0xb0, 0xe0, 0xd5, 0x86, 0x08, 0xde, 0x8c, 0x2e, 0x3d, 0xce, 0xb0,
0xad, 0x89, 0xc9, 0xd9, 0xde, 0x61, 0x8f, 0xe1, 0x2b, 0x0c, 0x1f, 0x50, 0x58, 0xb2, 0x52, 0x26,
0x13, 0xaf, 0x02, 0xff, 0x97, 0xfc, 0xf9, 0x0e, 0x09, 0xb4, 0xf5, 0x74, 0x92, 0x58, 0x34, 0x4b,
0x29, 0x30, 0x31, 0x5a, 0x24, 0x38, 0x31, 0x68, 0x6d, 0xac, 0x8d, 0x22, 0xc5, 0x6b, 0x46, 0x8b,
0xb0, 0xa5, 0x34, 0x49, 0x55, 0xf8, 0x2c, 0x0c, 0x67, 0x72, 0x89, 0x53, 0x49, 0x23, 0xa3, 0xc5,
0x48, 0x16, 0x84, 0xa6, 0xc8, 0x66, 0x9e, 0x9d, 0x6f, 0x59, 0xb5, 0x25, 0xba, 0x84, 0x8b, 0x67,
0x69, 0xe9, 0x41, 0x90, 0x5c, 0x62, 0x7f, 0x43, 0x06, 0xf8, 0xb9, 0x40, 0x4b, 0xd1, 0x3d, 0x04,
0xbf, 0x91, 0xd5, 0xaa, 0xb0, 0xc8, 0x3b, 0x00, 0xae, 0x66, 0x24, 0x73, 0x1b, 0xb0, 0x6e, 0xed,
0xba, 0x39, 0x68, 0xba, 0x24, 0xcd, 0x6d, 0xef, 0x9b, 0xc1, 0x89, 0x3b, 0x91, 0x7a, 0x09, 0x9e,
0xc2, 0xd1, 0x90, 0x32, 0x43, 0x2e, 0xe6, 0x57, 0xb1, 0xd7, 0x89, 0x2b, 0xa9, 0xbf, 0x39, 0x3c,
0x2b, 0xe1, 0xb6, 0xe4, 0x5d, 0x45, 0x8d, 0xf5, 0x8a, 0x1d, 0x06, 0xac, 0xcb, 0xf8, 0x2b, 0x9c,
0xee, 0x8a, 0xf1, 0x76, 0x6c, 0xb4, 0x88, 0xf7, 0x8c, 0x12, 0x76, 0xf6, 0x50, 0x37, 0x4d, 0x54,
0x5f, 0xaf, 0xd8, 0x41, 0x83, 0xf5, 0xbe, 0x18, 0xb4, 0x1c, 0x7a, 0xca, 0x8a, 0x7c, 0x86, 0x86,
0xa7, 0x70, 0xfc, 0xa2, 0xf3, 0x8c, 0x70, 0x48, 0x06, 0xb3, 0x39, 0x6f, 0x97, 0x66, 0xd5, 0xf8,
0x4f, 0xef, 0x4d, 0x79, 0xc0, 0x78, 0x1f, 0x60, 0x48, 0x4a, 0x7b, 0xdf, 0xb0, 0x32, 0xff, 0x36,
0xfc, 0x4f, 0xcd, 0xe3, 0xed, 0xdb, 0xcd, 0x44, 0xd2, 0xc7, 0x62, 0x1c, 0x0b, 0x35, 0x4f, 0xfc,
0xc6, 0x72, 0xdd, 0xf9, 0x31, 0xe3, 0xfa, 0xe6, 0x95, 0xef, 0x7e, 0x02, 0x00, 0x00, 0xff, 0xff,
0xb8, 0xd5, 0x8e, 0x13, 0x4b, 0x02, 0x00, 0x00,
}
-36
View File
@@ -1,36 +0,0 @@
package rpc
import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/psrpc"
)
type EgressClient interface {
EgressInternalClient
EgressHandlerClient
}
type egressClient struct {
EgressInternalClient
EgressHandlerClient
}
func NewEgressClient(nodeID livekit.NodeID, bus psrpc.MessageBus) (EgressClient, error) {
if bus == nil {
return nil, nil
}
clientID := string(nodeID)
internalClient, err := NewEgressInternalClient(clientID, bus)
if err != nil {
return nil, err
}
handlerClient, err := NewEgressHandlerClient(clientID, bus)
if err != nil {
return nil, err
}
return &egressClient{
EgressInternalClient: internalClient,
EgressHandlerClient: handlerClient,
}, nil
}
-230
View File
@@ -1,230 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc v3.21.6
// source: pkg/service/rpc/ingress.proto
package rpc
import (
livekit "github.com/livekit/protocol/livekit"
_ "github.com/livekit/psrpc/protoc-gen-psrpc/options"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ListActiveIngressRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *ListActiveIngressRequest) Reset() {
*x = ListActiveIngressRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pkg_service_rpc_ingress_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ListActiveIngressRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListActiveIngressRequest) ProtoMessage() {}
func (x *ListActiveIngressRequest) ProtoReflect() protoreflect.Message {
mi := &file_pkg_service_rpc_ingress_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListActiveIngressRequest.ProtoReflect.Descriptor instead.
func (*ListActiveIngressRequest) Descriptor() ([]byte, []int) {
return file_pkg_service_rpc_ingress_proto_rawDescGZIP(), []int{0}
}
type ListActiveIngressResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
IngressIds []string `protobuf:"bytes,1,rep,name=ingress_ids,json=ingressIds,proto3" json:"ingress_ids,omitempty"`
}
func (x *ListActiveIngressResponse) Reset() {
*x = ListActiveIngressResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pkg_service_rpc_ingress_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ListActiveIngressResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListActiveIngressResponse) ProtoMessage() {}
func (x *ListActiveIngressResponse) ProtoReflect() protoreflect.Message {
mi := &file_pkg_service_rpc_ingress_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListActiveIngressResponse.ProtoReflect.Descriptor instead.
func (*ListActiveIngressResponse) Descriptor() ([]byte, []int) {
return file_pkg_service_rpc_ingress_proto_rawDescGZIP(), []int{1}
}
func (x *ListActiveIngressResponse) GetIngressIds() []string {
if x != nil {
return x.IngressIds
}
return nil
}
var File_pkg_service_rpc_ingress_proto protoreflect.FileDescriptor
var file_pkg_service_rpc_ingress_proto_rawDesc = []byte{
0x0a, 0x1d, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70,
0x63, 0x2f, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
0x03, 0x72, 0x70, 0x63, 0x1a, 0x0d, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x1a, 0x15, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x69, 0x6e, 0x67,
0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1a, 0x0a, 0x18, 0x4c, 0x69,
0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x3c, 0x0a, 0x19, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63,
0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69,
0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0a, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73,
0x73, 0x49, 0x64, 0x73, 0x32, 0x6d, 0x0a, 0x0f, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49,
0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x12, 0x5a, 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x41,
0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1d, 0x2e, 0x72,
0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67,
0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x72, 0x70,
0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72,
0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xb2, 0x89, 0x01,
0x02, 0x08, 0x01, 0x32, 0xae, 0x01, 0x0a, 0x0e, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x48,
0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x4d, 0x0a, 0x0d, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65,
0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69,
0x74, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74,
0x2e, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x22, 0x06, 0xb2,
0x89, 0x01, 0x02, 0x18, 0x01, 0x12, 0x4d, 0x0a, 0x0d, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x49,
0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74,
0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e,
0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x22, 0x06, 0xb2, 0x89,
0x01, 0x02, 0x18, 0x01, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b,
0x69, 0x74, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72,
0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_pkg_service_rpc_ingress_proto_rawDescOnce sync.Once
file_pkg_service_rpc_ingress_proto_rawDescData = file_pkg_service_rpc_ingress_proto_rawDesc
)
func file_pkg_service_rpc_ingress_proto_rawDescGZIP() []byte {
file_pkg_service_rpc_ingress_proto_rawDescOnce.Do(func() {
file_pkg_service_rpc_ingress_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_service_rpc_ingress_proto_rawDescData)
})
return file_pkg_service_rpc_ingress_proto_rawDescData
}
var file_pkg_service_rpc_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_pkg_service_rpc_ingress_proto_goTypes = []interface{}{
(*ListActiveIngressRequest)(nil), // 0: rpc.ListActiveIngressRequest
(*ListActiveIngressResponse)(nil), // 1: rpc.ListActiveIngressResponse
(*livekit.UpdateIngressRequest)(nil), // 2: livekit.UpdateIngressRequest
(*livekit.DeleteIngressRequest)(nil), // 3: livekit.DeleteIngressRequest
(*livekit.IngressState)(nil), // 4: livekit.IngressState
}
var file_pkg_service_rpc_ingress_proto_depIdxs = []int32{
0, // 0: rpc.IngressInternal.ListActiveIngress:input_type -> rpc.ListActiveIngressRequest
2, // 1: rpc.IngressHandler.UpdateIngress:input_type -> livekit.UpdateIngressRequest
3, // 2: rpc.IngressHandler.DeleteIngress:input_type -> livekit.DeleteIngressRequest
1, // 3: rpc.IngressInternal.ListActiveIngress:output_type -> rpc.ListActiveIngressResponse
4, // 4: rpc.IngressHandler.UpdateIngress:output_type -> livekit.IngressState
4, // 5: rpc.IngressHandler.DeleteIngress:output_type -> livekit.IngressState
3, // [3:6] is the sub-list for method output_type
0, // [0:3] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_pkg_service_rpc_ingress_proto_init() }
func file_pkg_service_rpc_ingress_proto_init() {
if File_pkg_service_rpc_ingress_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pkg_service_rpc_ingress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListActiveIngressRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pkg_service_rpc_ingress_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListActiveIngressResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pkg_service_rpc_ingress_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 2,
},
GoTypes: file_pkg_service_rpc_ingress_proto_goTypes,
DependencyIndexes: file_pkg_service_rpc_ingress_proto_depIdxs,
MessageInfos: file_pkg_service_rpc_ingress_proto_msgTypes,
}.Build()
File_pkg_service_rpc_ingress_proto = out.File
file_pkg_service_rpc_ingress_proto_rawDesc = nil
file_pkg_service_rpc_ingress_proto_goTypes = nil
file_pkg_service_rpc_ingress_proto_depIdxs = nil
}
-29
View File
@@ -1,29 +0,0 @@
syntax = "proto3";
package rpc;
option go_package = "github.com/livekit/livekit/pkg/service/rpc";
import "options.proto";
import "livekit_ingress.proto";
service IngressInternal {
rpc ListActiveIngress(ListActiveIngressRequest) returns (ListActiveIngressResponse) {
option (psrpc.options).multi = true;
};
}
service IngressHandler {
rpc UpdateIngress(livekit.UpdateIngressRequest) returns (livekit.IngressState) {
option (psrpc.options).topics = true;
};
rpc DeleteIngress(livekit.DeleteIngressRequest) returns (livekit.IngressState) {
option (psrpc.options).topics = true;
};
}
message ListActiveIngressRequest {}
message ListActiveIngressResponse {
repeated string ingress_ids = 1;
}
-229
View File
@@ -1,229 +0,0 @@
// Code generated by protoc-gen-psrpc v0.2.5, DO NOT EDIT.
// source: pkg/service/rpc/ingress.proto
package rpc
import context "context"
import psrpc "github.com/livekit/psrpc"
import version "github.com/livekit/psrpc/version"
import livekit2 "github.com/livekit/protocol/livekit"
var _ = version.PsrpcVersion_0_2_5
// ================================
// IngressInternal Client Interface
// ================================
type IngressInternalClient interface {
ListActiveIngress(context.Context, *ListActiveIngressRequest, ...psrpc.RequestOption) (<-chan *psrpc.Response[*ListActiveIngressResponse], error)
}
// ====================================
// IngressInternal ServerImpl Interface
// ====================================
type IngressInternalServerImpl interface {
ListActiveIngress(context.Context, *ListActiveIngressRequest) (*ListActiveIngressResponse, error)
}
// ================================
// IngressInternal Server Interface
// ================================
type IngressInternalServer interface {
// Close and wait for pending RPCs to complete
Shutdown()
// Close immediately, without waiting for pending RPCs
Kill()
}
// ======================
// IngressInternal Client
// ======================
type ingressInternalClient struct {
client *psrpc.RPCClient
}
// NewIngressInternalClient creates a psrpc client that implements the IngressInternalClient interface.
func NewIngressInternalClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOption) (IngressInternalClient, error) {
rpcClient, err := psrpc.NewRPCClient("IngressInternal", clientID, bus, opts...)
if err != nil {
return nil, err
}
return &ingressInternalClient{
client: rpcClient,
}, nil
}
func (c *ingressInternalClient) ListActiveIngress(ctx context.Context, req *ListActiveIngressRequest, opts ...psrpc.RequestOption) (<-chan *psrpc.Response[*ListActiveIngressResponse], error) {
return psrpc.RequestMulti[*ListActiveIngressResponse](ctx, c.client, "ListActiveIngress", "", req, opts...)
}
// ======================
// IngressInternal Server
// ======================
type ingressInternalServer struct {
svc IngressInternalServerImpl
rpc *psrpc.RPCServer
}
// NewIngressInternalServer builds a RPCServer that will route requests
// to the corresponding method in the provided svc implementation.
func NewIngressInternalServer(serverID string, svc IngressInternalServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOption) (IngressInternalServer, error) {
s := psrpc.NewRPCServer("IngressInternal", serverID, bus, opts...)
var err error
err = psrpc.RegisterHandler(s, "ListActiveIngress", "", svc.ListActiveIngress, nil)
if err != nil {
s.Close(false)
return nil, err
}
return &ingressInternalServer{
svc: svc,
rpc: s,
}, nil
}
func (s *ingressInternalServer) Shutdown() {
s.rpc.Close(false)
}
func (s *ingressInternalServer) Kill() {
s.rpc.Close(true)
}
// ===============================
// IngressHandler Client Interface
// ===============================
type IngressHandlerClient interface {
UpdateIngress(context.Context, string, *livekit2.UpdateIngressRequest, ...psrpc.RequestOption) (*livekit2.IngressState, error)
DeleteIngress(context.Context, string, *livekit2.DeleteIngressRequest, ...psrpc.RequestOption) (*livekit2.IngressState, error)
}
// ===================================
// IngressHandler ServerImpl Interface
// ===================================
type IngressHandlerServerImpl interface {
UpdateIngress(context.Context, *livekit2.UpdateIngressRequest) (*livekit2.IngressState, error)
DeleteIngress(context.Context, *livekit2.DeleteIngressRequest) (*livekit2.IngressState, error)
}
// ===============================
// IngressHandler Server Interface
// ===============================
type IngressHandlerServer interface {
RegisterUpdateIngressTopic(string) error
DeregisterUpdateIngressTopic(string)
RegisterDeleteIngressTopic(string) error
DeregisterDeleteIngressTopic(string)
// Close and wait for pending RPCs to complete
Shutdown()
// Close immediately, without waiting for pending RPCs
Kill()
}
// =====================
// IngressHandler Client
// =====================
type ingressHandlerClient struct {
client *psrpc.RPCClient
}
// NewIngressHandlerClient creates a psrpc client that implements the IngressHandlerClient interface.
func NewIngressHandlerClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOption) (IngressHandlerClient, error) {
rpcClient, err := psrpc.NewRPCClient("IngressHandler", clientID, bus, opts...)
if err != nil {
return nil, err
}
return &ingressHandlerClient{
client: rpcClient,
}, nil
}
func (c *ingressHandlerClient) UpdateIngress(ctx context.Context, topic string, req *livekit2.UpdateIngressRequest, opts ...psrpc.RequestOption) (*livekit2.IngressState, error) {
return psrpc.RequestSingle[*livekit2.IngressState](ctx, c.client, "UpdateIngress", topic, req, opts...)
}
func (c *ingressHandlerClient) DeleteIngress(ctx context.Context, topic string, req *livekit2.DeleteIngressRequest, opts ...psrpc.RequestOption) (*livekit2.IngressState, error) {
return psrpc.RequestSingle[*livekit2.IngressState](ctx, c.client, "DeleteIngress", topic, req, opts...)
}
// =====================
// IngressHandler Server
// =====================
type ingressHandlerServer struct {
svc IngressHandlerServerImpl
rpc *psrpc.RPCServer
}
// NewIngressHandlerServer builds a RPCServer that will route requests
// to the corresponding method in the provided svc implementation.
func NewIngressHandlerServer(serverID string, svc IngressHandlerServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOption) (IngressHandlerServer, error) {
s := psrpc.NewRPCServer("IngressHandler", serverID, bus, opts...)
return &ingressHandlerServer{
svc: svc,
rpc: s,
}, nil
}
func (s *ingressHandlerServer) RegisterUpdateIngressTopic(topic string) error {
return psrpc.RegisterHandler(s.rpc, "UpdateIngress", topic, s.svc.UpdateIngress, nil)
}
func (s *ingressHandlerServer) DeregisterUpdateIngressTopic(topic string) {
s.rpc.DeregisterHandler("UpdateIngress", topic)
}
func (s *ingressHandlerServer) RegisterDeleteIngressTopic(topic string) error {
return psrpc.RegisterHandler(s.rpc, "DeleteIngress", topic, s.svc.DeleteIngress, nil)
}
func (s *ingressHandlerServer) DeregisterDeleteIngressTopic(topic string) {
s.rpc.DeregisterHandler("DeleteIngress", topic)
}
func (s *ingressHandlerServer) Shutdown() {
s.rpc.Close(false)
}
func (s *ingressHandlerServer) Kill() {
s.rpc.Close(true)
}
var psrpcFileDescriptor1 = []byte{
// 269 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xc1, 0x4a, 0xc3, 0x40,
0x10, 0x86, 0x89, 0x85, 0xa2, 0x2b, 0x55, 0x5c, 0x28, 0xc4, 0x40, 0x55, 0x72, 0x12, 0x91, 0x0d,
0xd4, 0xab, 0x17, 0xc5, 0x83, 0x01, 0xbd, 0x54, 0xbc, 0xf4, 0x52, 0xd2, 0xcd, 0x10, 0x97, 0xa6,
0xbb, 0xeb, 0xce, 0x34, 0xef, 0xe0, 0xcb, 0x78, 0xf0, 0x09, 0x45, 0x3b, 0x16, 0xa3, 0x2d, 0xf4,
0x14, 0xf8, 0xbf, 0xc9, 0xf7, 0x0f, 0x3b, 0x62, 0xe0, 0x67, 0x55, 0x86, 0x10, 0x1a, 0xa3, 0x21,
0x0b, 0x5e, 0x67, 0xc6, 0x56, 0x01, 0x10, 0x95, 0x0f, 0x8e, 0x9c, 0xec, 0x04, 0xaf, 0x93, 0x9e,
0xf3, 0x64, 0x9c, 0xe5, 0x2c, 0xe9, 0xd7, 0xa6, 0x81, 0x99, 0xa1, 0x49, 0x6b, 0x34, 0x4d, 0x44,
0xfc, 0x60, 0x90, 0x6e, 0x34, 0x99, 0x06, 0xf2, 0x25, 0x1a, 0xc1, 0xeb, 0x02, 0x90, 0xd2, 0x6b,
0x71, 0xbc, 0x86, 0xa1, 0x77, 0x16, 0x41, 0x9e, 0x8a, 0x7d, 0x36, 0x4d, 0x4c, 0x89, 0x71, 0x74,
0xd6, 0x39, 0xdf, 0x1b, 0x09, 0x8e, 0xf2, 0x12, 0x87, 0x73, 0x71, 0xc8, 0xff, 0xe4, 0x96, 0x20,
0xd8, 0xa2, 0x96, 0x63, 0x71, 0xf4, 0x4f, 0x28, 0x07, 0x2a, 0x78, 0xad, 0x36, 0x2d, 0x91, 0x9c,
0x6c, 0xc2, 0xcb, 0x3d, 0xd2, 0xee, 0xc7, 0x5b, 0xb4, 0xb3, 0x1b, 0x0d, 0xdf, 0x23, 0x71, 0xc0,
0xec, 0xbe, 0xb0, 0x65, 0x0d, 0x41, 0x3e, 0x8a, 0xde, 0xb3, 0x2f, 0x0b, 0xfa, 0x55, 0xc5, 0x8f,
0xa0, 0x5a, 0xf9, 0x4f, 0x55, 0x7f, 0x85, 0x19, 0x3c, 0x51, 0x41, 0xdc, 0x10, 0x47, 0x5f, 0xba,
0x3b, 0xa8, 0x61, 0x9d, 0xae, 0x95, 0x6f, 0xa7, 0xbb, 0xbd, 0x1c, 0x5f, 0x54, 0x86, 0x5e, 0x16,
0x53, 0xa5, 0xdd, 0x3c, 0xe3, 0xd1, 0xd5, 0xf7, 0xcf, 0x81, 0xa7, 0xdd, 0xef, 0x73, 0x5d, 0x7d,
0x06, 0x00, 0x00, 0xff, 0xff, 0xff, 0xd2, 0xc1, 0x99, 0xfa, 0x01, 0x00, 0x00,
}
-36
View File
@@ -1,36 +0,0 @@
package rpc
import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/psrpc"
)
type IngressClient interface {
IngressInternalClient
IngressHandlerClient
}
type ingressClient struct {
IngressInternalClient
IngressHandlerClient
}
func NewIngressClient(nodeID livekit.NodeID, bus psrpc.MessageBus) (IngressClient, error) {
if bus == nil {
return nil, nil
}
clientID := string(nodeID)
internalClient, err := NewIngressInternalClient(clientID, bus)
if err != nil {
return nil, err
}
handlerClient, err := NewIngressHandlerClient(clientID, bus)
if err != nil {
return nil, err
}
return &ingressClient{
IngressInternalClient: internalClient,
IngressHandlerClient: handlerClient,
}, nil
}
-98
View File
@@ -1,98 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc v3.21.6
// source: pkg/service/rpc/io.proto
package rpc
import (
livekit "github.com/livekit/protocol/livekit"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
emptypb "google.golang.org/protobuf/types/known/emptypb"
reflect "reflect"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
var File_pkg_service_rpc_io_proto protoreflect.FileDescriptor
var file_pkg_service_rpc_io_proto_rawDesc = []byte{
0x0a, 0x18, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70,
0x63, 0x2f, 0x69, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x72, 0x70, 0x63, 0x1a,
0x14, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1a, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x72,
0x70, 0x63, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0xee,
0x01, 0x0a, 0x06, 0x49, 0x4f, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x3f, 0x0a, 0x10, 0x55, 0x70, 0x64,
0x61, 0x74, 0x65, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x13, 0x2e,
0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e,
0x66, 0x6f, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x51, 0x0a, 0x0e, 0x47, 0x65,
0x74, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1e, 0x2e, 0x6c,
0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73,
0x73, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6c,
0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73,
0x73, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x50, 0x0a,
0x12, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74,
0x61, 0x74, 0x65, 0x12, 0x22, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x55, 0x70,
0x64, 0x61, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x42,
0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69,
0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x6b,
0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var file_pkg_service_rpc_io_proto_goTypes = []interface{}{
(*livekit.EgressInfo)(nil), // 0: livekit.EgressInfo
(*livekit.GetIngressInfoRequest)(nil), // 1: livekit.GetIngressInfoRequest
(*livekit.UpdateIngressStateRequest)(nil), // 2: livekit.UpdateIngressStateRequest
(*emptypb.Empty)(nil), // 3: google.protobuf.Empty
(*livekit.GetIngressInfoResponse)(nil), // 4: livekit.GetIngressInfoResponse
}
var file_pkg_service_rpc_io_proto_depIdxs = []int32{
0, // 0: rpc.IOInfo.UpdateEgressInfo:input_type -> livekit.EgressInfo
1, // 1: rpc.IOInfo.GetIngressInfo:input_type -> livekit.GetIngressInfoRequest
2, // 2: rpc.IOInfo.UpdateIngressState:input_type -> livekit.UpdateIngressStateRequest
3, // 3: rpc.IOInfo.UpdateEgressInfo:output_type -> google.protobuf.Empty
4, // 4: rpc.IOInfo.GetIngressInfo:output_type -> livekit.GetIngressInfoResponse
3, // 5: rpc.IOInfo.UpdateIngressState:output_type -> google.protobuf.Empty
3, // [3:6] is the sub-list for method output_type
0, // [0:3] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_pkg_service_rpc_io_proto_init() }
func file_pkg_service_rpc_io_proto_init() {
if File_pkg_service_rpc_io_proto != nil {
return
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pkg_service_rpc_io_proto_rawDesc,
NumEnums: 0,
NumMessages: 0,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pkg_service_rpc_io_proto_goTypes,
DependencyIndexes: file_pkg_service_rpc_io_proto_depIdxs,
}.Build()
File_pkg_service_rpc_io_proto = out.File
file_pkg_service_rpc_io_proto_rawDesc = nil
file_pkg_service_rpc_io_proto_goTypes = nil
file_pkg_service_rpc_io_proto_depIdxs = nil
}
-15
View File
@@ -1,15 +0,0 @@
syntax = "proto3";
package rpc;
option go_package = "github.com/livekit/livekit/pkg/service/rpc";
import "livekit_egress.proto";
import "livekit_rpc_internal.proto";
import "google/protobuf/empty.proto";
service IOInfo {
rpc UpdateEgressInfo(livekit.EgressInfo) returns (google.protobuf.Empty);
rpc GetIngressInfo(livekit.GetIngressInfoRequest) returns (livekit.GetIngressInfoResponse);
rpc UpdateIngressState(livekit.UpdateIngressStateRequest) returns (google.protobuf.Empty);
}
-147
View File
@@ -1,147 +0,0 @@
// Code generated by protoc-gen-psrpc v0.2.5, DO NOT EDIT.
// source: pkg/service/rpc/io.proto
package rpc
import context "context"
import psrpc "github.com/livekit/psrpc"
import version "github.com/livekit/psrpc/version"
import google_protobuf2 "google.golang.org/protobuf/types/known/emptypb"
import livekit "github.com/livekit/protocol/livekit"
import livekit3 "github.com/livekit/protocol/livekit"
var _ = version.PsrpcVersion_0_2_5
// =======================
// IOInfo Client Interface
// =======================
type IOInfoClient interface {
UpdateEgressInfo(context.Context, *livekit.EgressInfo, ...psrpc.RequestOption) (*google_protobuf2.Empty, error)
GetIngressInfo(context.Context, *livekit3.GetIngressInfoRequest, ...psrpc.RequestOption) (*livekit3.GetIngressInfoResponse, error)
UpdateIngressState(context.Context, *livekit3.UpdateIngressStateRequest, ...psrpc.RequestOption) (*google_protobuf2.Empty, error)
}
// ===========================
// IOInfo ServerImpl Interface
// ===========================
type IOInfoServerImpl interface {
UpdateEgressInfo(context.Context, *livekit.EgressInfo) (*google_protobuf2.Empty, error)
GetIngressInfo(context.Context, *livekit3.GetIngressInfoRequest) (*livekit3.GetIngressInfoResponse, error)
UpdateIngressState(context.Context, *livekit3.UpdateIngressStateRequest) (*google_protobuf2.Empty, error)
}
// =======================
// IOInfo Server Interface
// =======================
type IOInfoServer interface {
// Close and wait for pending RPCs to complete
Shutdown()
// Close immediately, without waiting for pending RPCs
Kill()
}
// =============
// IOInfo Client
// =============
type iOInfoClient struct {
client *psrpc.RPCClient
}
// NewIOInfoClient creates a psrpc client that implements the IOInfoClient interface.
func NewIOInfoClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOption) (IOInfoClient, error) {
rpcClient, err := psrpc.NewRPCClient("IOInfo", clientID, bus, opts...)
if err != nil {
return nil, err
}
return &iOInfoClient{
client: rpcClient,
}, nil
}
func (c *iOInfoClient) UpdateEgressInfo(ctx context.Context, req *livekit.EgressInfo, opts ...psrpc.RequestOption) (*google_protobuf2.Empty, error) {
return psrpc.RequestSingle[*google_protobuf2.Empty](ctx, c.client, "UpdateEgressInfo", "", req, opts...)
}
func (c *iOInfoClient) GetIngressInfo(ctx context.Context, req *livekit3.GetIngressInfoRequest, opts ...psrpc.RequestOption) (*livekit3.GetIngressInfoResponse, error) {
return psrpc.RequestSingle[*livekit3.GetIngressInfoResponse](ctx, c.client, "GetIngressInfo", "", req, opts...)
}
func (c *iOInfoClient) UpdateIngressState(ctx context.Context, req *livekit3.UpdateIngressStateRequest, opts ...psrpc.RequestOption) (*google_protobuf2.Empty, error) {
return psrpc.RequestSingle[*google_protobuf2.Empty](ctx, c.client, "UpdateIngressState", "", req, opts...)
}
// =============
// IOInfo Server
// =============
type iOInfoServer struct {
svc IOInfoServerImpl
rpc *psrpc.RPCServer
}
// NewIOInfoServer builds a RPCServer that will route requests
// to the corresponding method in the provided svc implementation.
func NewIOInfoServer(serverID string, svc IOInfoServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOption) (IOInfoServer, error) {
s := psrpc.NewRPCServer("IOInfo", serverID, bus, opts...)
var err error
err = psrpc.RegisterHandler(s, "UpdateEgressInfo", "", svc.UpdateEgressInfo, nil)
if err != nil {
s.Close(false)
return nil, err
}
err = psrpc.RegisterHandler(s, "GetIngressInfo", "", svc.GetIngressInfo, nil)
if err != nil {
s.Close(false)
return nil, err
}
err = psrpc.RegisterHandler(s, "UpdateIngressState", "", svc.UpdateIngressState, nil)
if err != nil {
s.Close(false)
return nil, err
}
return &iOInfoServer{
svc: svc,
rpc: s,
}, nil
}
func (s *iOInfoServer) Shutdown() {
s.rpc.Close(false)
}
func (s *iOInfoServer) Kill() {
s.rpc.Close(true)
}
var psrpcFileDescriptor2 = []byte{
// 236 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xcd, 0x4a, 0xc4, 0x30,
0x14, 0x85, 0x11, 0x61, 0x16, 0x59, 0x88, 0x44, 0x11, 0x89, 0xa0, 0xe0, 0x52, 0x24, 0x01, 0x7d,
0x00, 0x41, 0x18, 0xa4, 0x2b, 0xff, 0x70, 0xe3, 0x66, 0x68, 0xe3, 0x9d, 0x18, 0xa6, 0x93, 0x7b,
0x4d, 0x6e, 0x07, 0x7c, 0x69, 0x9f, 0x41, 0x6c, 0xd2, 0x0e, 0x2a, 0x5d, 0x85, 0x9c, 0x8f, 0xf3,
0x71, 0xb8, 0xe2, 0x98, 0x56, 0xce, 0x24, 0x88, 0x1b, 0x6f, 0xc1, 0x44, 0xb2, 0xc6, 0xa3, 0xa6,
0x88, 0x8c, 0x72, 0x37, 0x92, 0x55, 0x87, 0xad, 0xdf, 0xc0, 0xca, 0xf3, 0x02, 0x5c, 0x84, 0x94,
0x32, 0x52, 0x6a, 0x48, 0x23, 0xd9, 0x85, 0x0f, 0x0c, 0x31, 0xd4, 0x6d, 0x61, 0x27, 0x0e, 0xd1,
0xb5, 0x60, 0xfa, 0x5f, 0xd3, 0x2d, 0x0d, 0xac, 0x89, 0x3f, 0x33, 0xbc, 0xfa, 0xda, 0x11, 0xb3,
0xea, 0xbe, 0x0a, 0x4b, 0x94, 0x37, 0x62, 0xff, 0x85, 0xde, 0x6a, 0x86, 0x79, 0x6f, 0xee, 0xb3,
0x03, 0x5d, 0xc4, 0x7a, 0x1b, 0xaa, 0x23, 0x9d, 0x8d, 0x7a, 0x30, 0xea, 0xf9, 0x8f, 0x51, 0x3e,
0x8a, 0xbd, 0x3b, 0xe0, 0x2a, 0x6c, 0xeb, 0xa7, 0x63, 0xfd, 0x37, 0x78, 0x82, 0x8f, 0x0e, 0x12,
0xab, 0xb3, 0x49, 0x9e, 0x08, 0x43, 0x02, 0xf9, 0x20, 0x64, 0xde, 0x54, 0xe0, 0x33, 0xd7, 0x0c,
0xf2, 0x7c, 0xac, 0xfd, 0x87, 0x83, 0x7a, 0x62, 0xe4, 0xed, 0xe5, 0xeb, 0x85, 0xf3, 0xfc, 0xde,
0x35, 0xda, 0xe2, 0xda, 0x14, 0xcf, 0xf8, 0xfe, 0xb9, 0x7d, 0x33, 0xeb, 0xdb, 0xd7, 0xdf, 0x01,
0x00, 0x00, 0xff, 0xff, 0x87, 0x58, 0x4d, 0x3b, 0x95, 0x01, 0x00, 0x00,
}
-63
View File
@@ -1,63 +0,0 @@
package rpc
import (
"context"
"sync"
)
type raceResult[T any] struct {
i int
val *T
err error
}
type Race[T any] struct {
ctx context.Context
cancel context.CancelFunc
nextIndex int
resultLock sync.Mutex
result *raceResult[T]
}
// NewRace creates a race to yield the result from one or more candidate
// functions
func NewRace[T any](ctx context.Context) *Race[T] {
ctx, cancel := context.WithCancel(ctx)
return &Race[T]{
ctx: ctx,
cancel: cancel,
}
}
// Go adds a candidate function to the race by running it in a new goroutine
func (r *Race[T]) Go(fn func(ctx context.Context) (*T, error)) {
i := r.nextIndex
r.nextIndex++
go func() {
val, err := fn(r.ctx)
r.resultLock.Lock()
if r.result == nil {
r.result = &raceResult[T]{i, val, err}
}
r.resultLock.Unlock()
r.cancel()
}()
}
// Wait awaits the first complete function and returns the index and results
// or -1 if the context is cancelled before any candidate finishes.
func (r *Race[T]) Wait() (int, *T, error) {
<-r.ctx.Done()
r.resultLock.Lock()
res := r.result
r.resultLock.Unlock()
if res != nil {
return res.i, res.val, res.err
}
return -1, nil, r.ctx.Err()
}
+1 -1
View File
@@ -16,7 +16,6 @@ import (
"github.com/livekit/livekit-server/pkg/clientconfiguration"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/service/rpc"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/auth"
@@ -24,6 +23,7 @@ import (
"github.com/livekit/protocol/ingress"
"github.com/livekit/protocol/livekit"
redisLiveKit "github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/webhook"
"github.com/livekit/psrpc"
)
+1 -1
View File
@@ -11,7 +11,6 @@ import (
"github.com/livekit/livekit-server/pkg/clientconfiguration"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/service/rpc"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/auth"
@@ -19,6 +18,7 @@ import (
"github.com/livekit/protocol/ingress"
"github.com/livekit/protocol/livekit"
redis2 "github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/webhook"
"github.com/livekit/psrpc"
"github.com/pion/turn/v2"