diff --git a/.github/workflows/buildtest.yaml b/.github/workflows/buildtest.yaml index 984caa2c9..1a3b00a7d 100644 --- a/.github/workflows/buildtest.yaml +++ b/.github/workflows/buildtest.yaml @@ -45,6 +45,7 @@ jobs: - name: Static Check uses: dominikh/staticcheck-action@v1.3.0 with: + checks: '["all", "-ST1000", "-ST1003", "-ST1020", "-ST1021", "-ST1022", "-SA1019"]' min-go-version: 1.18 version: 2022.1.3 install-go: false diff --git a/go.mod b/go.mod index 77e85ba66..a409764f3 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a - github.com/livekit/protocol v1.4.1 + github.com/livekit/protocol v1.4.2-0.20230215235903-8a6b0e05628f github.com/livekit/psrpc v0.2.6 github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 github.com/mackerelio/go-osstat v0.2.3 @@ -68,15 +68,12 @@ require ( github.com/google/subcommands v1.2.0 // indirect github.com/google/uuid v1.3.0 // indirect github.com/josharian/native v1.1.0 // indirect - github.com/klauspost/compress v1.15.13 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mdlayher/netlink v1.7.1 // indirect github.com/mdlayher/socket v0.4.0 // indirect - github.com/minio/highwayhash v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.3.0 // indirect github.com/nats-io/nats.go v1.23.0 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect diff --git a/go.sum b/go.sum index 499848312..ab342d796 100644 --- a/go.sum +++ b/go.sum @@ -212,8 +212,7 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.15.13 h1:NFn1Wr8cfnenSJSA46lLq4wHCcBzKTSjnBIexDMMOV0= -github.com/klauspost/compress v1.15.13/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -233,8 +232,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a h1:5UkGQpskXp7HcBmyrCwWtO7ygDWbqtjN09Yva4l/nyE= github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw= -github.com/livekit/protocol v1.4.1 h1:ZuVEPpRXgYIeCWA3PV+Zs4ygBumRPXBE/OOqr3v9LHE= -github.com/livekit/protocol v1.4.1/go.mod h1:NQqdMVGf7yf7615n0EaUD17d0xkQ0ERMzG3XAHHpbUw= +github.com/livekit/protocol v1.4.2-0.20230215235903-8a6b0e05628f h1:OiphUT1pCHizzgGiNVPkNiLpgyayWT83ylhbLOxO/sc= +github.com/livekit/protocol v1.4.2-0.20230215235903-8a6b0e05628f/go.mod h1:iU8n3n4hj2Wf9s0oHHF63sUEDHw6v/S+/RokL7SgChw= github.com/livekit/psrpc v0.2.6 h1:cYrpYEwKEd6TRtF9fXYjVEJU2K0laF0uL0hNJg7pw/E= github.com/livekit/psrpc v0.2.6/go.mod h1:2wtOo1F03vub2qIjx0rAPpVplg873670/LN08o/yopM= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw= @@ -269,7 +268,6 @@ github.com/mdlayher/socket v0.1.1/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5A github.com/mdlayher/socket v0.4.0 h1:280wsy40IC9M9q1uPGcLBwXpcTQDtoGwVt+BNoITxIw= github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46NlmWuVoc= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= -github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -280,7 +278,6 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= -github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.9.8 h1:jgxZsv+A3Reb3MgwxaINcNq/za8xZInKhDg9Q0cGN1o= github.com/nats-io/nats.go v1.23.0 h1:lR28r7IX44WjYgdiKz9GmUeW0uh/m33uD3yEjLZ2cOE= github.com/nats-io/nats.go v1.23.0/go.mod h1:ki/Scsa23edbh8IRZbCuNXR9TDcbvfaSijKtaqQgw+Q= @@ -547,7 +544,6 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190411185658-b44545bcd369/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/magefile.go b/magefile.go index fc0d26042..1742de3df 100644 --- a/magefile.go +++ b/magefile.go @@ -145,59 +145,6 @@ func PublishDocker() error { return nil } -// regenerate psrpc service definitions -func Psrpc() error { - psrpcProtoFiles := []string{ - "pkg/service/rpc/egress.proto", - "pkg/service/rpc/ingress.proto", - "pkg/service/rpc/io.proto", - } - - fmt.Println("generating psrpc") - - protocolDir, err := mageutil.GetPkgDir("github.com/livekit/protocol") - if err != nil { - return err - } - - psrpcDir, err := mageutil.GetPkgDir("github.com/livekit/psrpc") - if err != nil { - return err - } - - protoc, err := mageutil.GetToolPath("protoc") - if err != nil { - return err - } - protocGoPath, err := mageutil.GetToolPath("protoc-gen-go") - if err != nil { - return err - } - psrpcPath, err := mageutil.GetToolPath("protoc-gen-psrpc") - if err != nil { - return err - } - - fmt.Println("generating psrpc protobuf") - args := append([]string{ - "--go_out", ".", - "--psrpc_out", ".", - "--go_opt=paths=source_relative", - "--psrpc_opt=paths=source_relative", - "--plugin=go=" + protocGoPath, - "--plugin=psrpc=" + psrpcPath, - "-I" + protocolDir, - "-I" + psrpcDir + "/protoc-gen-psrpc/options", - "-I=.", - }, psrpcProtoFiles...) - cmd := exec.Command(protoc, args...) - mageutil.ConnectStd(cmd) - if err := cmd.Run(); err != nil { - return err - } - return nil -} - // run unit tests, skipping integration func Test() error { mg.Deps(generateWire, setULimit) diff --git a/pkg/rtc/room_egress.go b/pkg/rtc/room_egress.go index a1e480d53..989270713 100644 --- a/pkg/rtc/room_egress.go +++ b/pkg/rtc/room_egress.go @@ -9,12 +9,13 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/webhook" ) type EgressLauncher interface { - StartEgress(context.Context, *livekit.StartEgressRequest) (*livekit.EgressInfo, error) - StartEgressWithClusterId(ctx context.Context, clusterId string, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) + StartEgress(context.Context, *rpc.StartEgressRequest) (*livekit.EgressInfo, error) + StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) } func StartTrackEgress( @@ -77,8 +78,8 @@ func startTrackEgress( return req, errors.New("egress launcher not found") } - _, err := launcher.StartEgress(ctx, &livekit.StartEgressRequest{ - Request: &livekit.StartEgressRequest_Track{ + _, err := launcher.StartEgress(ctx, &rpc.StartEgressRequest{ + Request: &rpc.StartEgressRequest_Track{ Track: req, }, RoomId: string(roomID), diff --git a/pkg/service/egress.go b/pkg/service/egress.go index f0a7b7178..abafa7162 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -9,11 +9,11 @@ import ( "github.com/twitchtv/twirp" "github.com/livekit/livekit-server/pkg/rtc" - "github.com/livekit/livekit-server/pkg/service/rpc" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" ) @@ -79,8 +79,8 @@ func (s *EgressService) StartRoomCompositeEgress(ctx context.Context, req *livek defer func() { AppendLogFields(ctx, fields...) }() - ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &livekit.StartEgressRequest{ - Request: &livekit.StartEgressRequest_RoomComposite{ + ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{ + Request: &rpc.StartEgressRequest_RoomComposite{ RoomComposite: req, }, }) @@ -101,8 +101,8 @@ func (s *EgressService) StartTrackCompositeEgress(ctx context.Context, req *live defer func() { AppendLogFields(ctx, fields...) }() - ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &livekit.StartEgressRequest{ - Request: &livekit.StartEgressRequest_TrackComposite{ + ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{ + Request: &rpc.StartEgressRequest_TrackComposite{ TrackComposite: req, }, }) @@ -121,8 +121,8 @@ func (s *EgressService) StartTrackEgress(ctx context.Context, req *livekit.Track defer func() { AppendLogFields(ctx, fields...) }() - ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &livekit.StartEgressRequest{ - Request: &livekit.StartEgressRequest_Track{ + ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{ + Request: &rpc.StartEgressRequest_Track{ Track: req, }, }) @@ -141,8 +141,8 @@ func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgre defer func() { AppendLogFields(ctx, fields...) }() - ei, err := s.startEgress(ctx, "", &livekit.StartEgressRequest{ - Request: &livekit.StartEgressRequest_Web{ + ei, err := s.startEgress(ctx, "", &rpc.StartEgressRequest{ + Request: &rpc.StartEgressRequest_Web{ Web: req, }, }) @@ -153,7 +153,7 @@ func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgre return ei, err } -func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) { +func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } else if s.launcher == nil { @@ -171,14 +171,14 @@ func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomNa return s.launcher.StartEgress(ctx, req) } -func (s *egressLauncher) StartEgress(ctx context.Context, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) { +func (s *egressLauncher) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { return s.StartEgressWithClusterId(ctx, "", req) } -func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId string, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) { +func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { var info *livekit.EgressInfo var err error - // Ensure we have a Egress ID + // Ensure we have an Egress ID if req.EgressId == "" { req.EgressId = utils.NewGuid(utils.EgressPrefix) } @@ -187,6 +187,7 @@ func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId info, err = s.psrpcClient.StartEgress(ctx, clusterId, req) } else { logger.Infow("using deprecated egress client") + // SendRequest will transform rpc.StartEgressRequest into deprecated livekit.StartEgressRequest info, err = s.clientDeprecated.SendRequest(ctx, req) } if err != nil { diff --git a/pkg/service/ingress.go b/pkg/service/ingress.go index 89b558071..9a6960c66 100644 --- a/pkg/service/ingress.go +++ b/pkg/service/ingress.go @@ -5,11 +5,11 @@ import ( "time" "github.com/livekit/livekit-server/pkg/config" - "github.com/livekit/livekit-server/pkg/service/rpc" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" "github.com/livekit/psrpc" ) diff --git a/pkg/service/ioinfo.go b/pkg/service/ioinfo.go index b4f272380..11f5381b3 100644 --- a/pkg/service/ioinfo.go +++ b/pkg/service/ioinfo.go @@ -8,12 +8,12 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" - "github.com/livekit/livekit-server/pkg/service/rpc" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/egress" "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/rpc" "github.com/livekit/psrpc" ) @@ -107,16 +107,16 @@ func (s *IOInfoService) UpdateEgressInfo(ctx context.Context, info *livekit.Egre return &emptypb.Empty{}, nil } -func (s *IOInfoService) GetIngressInfo(ctx context.Context, req *livekit.GetIngressInfoRequest) (*livekit.GetIngressInfoResponse, error) { +func (s *IOInfoService) GetIngressInfo(ctx context.Context, req *rpc.GetIngressInfoRequest) (*rpc.GetIngressInfoResponse, error) { info, err := s.loadIngressFromInfoRequest(req) if err != nil { return nil, err } - return &livekit.GetIngressInfoResponse{Info: info}, nil + return &rpc.GetIngressInfoResponse{Info: info}, nil } -func (s *IOInfoService) loadIngressFromInfoRequest(req *livekit.GetIngressInfoRequest) (info *livekit.IngressInfo, err error) { +func (s *IOInfoService) loadIngressFromInfoRequest(req *rpc.GetIngressInfoRequest) (info *livekit.IngressInfo, err error) { if req.IngressId != "" { info, err = s.is.LoadIngress(context.Background(), req.IngressId) } else if req.StreamKey != "" { @@ -127,7 +127,7 @@ func (s *IOInfoService) loadIngressFromInfoRequest(req *livekit.GetIngressInfoRe return info, err } -func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *livekit.UpdateIngressStateRequest) (*emptypb.Empty, error) { +func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error) { if err := s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil { logger.Errorw("could not update ingress", err) return nil, err @@ -228,11 +228,15 @@ func (s *IOInfoService) ingressWorkerDeprecated() { continue } - info, err := s.loadIngressFromInfoRequest(req) + info, err := s.loadIngressFromInfoRequest(&rpc.GetIngressInfoRequest{ + IngressId: req.IngressId, + StreamKey: req.StreamKey, + }) if err != nil { logger.Errorw("failed to load ingress info", err) continue } + err = s.icDeprecated.SendGetIngressInfoResponse(context.Background(), req, &livekit.GetIngressInfoResponse{Info: info}, err) if err != nil { logger.Errorw("could not send response", err) diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 8cd4238d9..d4a4fe5a0 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -14,6 +14,7 @@ import ( "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" ) // A rooms service that supports a single node @@ -84,8 +85,8 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq } if req.Egress != nil && req.Egress.Room != nil { - egress := &livekit.StartEgressRequest{ - Request: &livekit.StartEgressRequest_RoomComposite{ + egress := &rpc.StartEgressRequest{ + Request: &rpc.StartEgressRequest_RoomComposite{ RoomComposite: req.Egress.Room, }, RoomId: rm.Sid, diff --git a/pkg/service/rpc/egress.pb.go b/pkg/service/rpc/egress.pb.go deleted file mode 100644 index 7bf8c5a1f..000000000 --- a/pkg/service/rpc/egress.pb.go +++ /dev/null @@ -1,238 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.27.1 -// protoc v3.21.6 -// source: pkg/service/rpc/egress.proto - -package rpc - -import ( - livekit "github.com/livekit/protocol/livekit" - _ "github.com/livekit/psrpc/protoc-gen-psrpc/options" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type ListActiveEgressRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *ListActiveEgressRequest) Reset() { - *x = ListActiveEgressRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_service_rpc_egress_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ListActiveEgressRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ListActiveEgressRequest) ProtoMessage() {} - -func (x *ListActiveEgressRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_service_rpc_egress_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ListActiveEgressRequest.ProtoReflect.Descriptor instead. -func (*ListActiveEgressRequest) Descriptor() ([]byte, []int) { - return file_pkg_service_rpc_egress_proto_rawDescGZIP(), []int{0} -} - -type ListActiveEgressResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - EgressIds []string `protobuf:"bytes,1,rep,name=egress_ids,json=egressIds,proto3" json:"egress_ids,omitempty"` -} - -func (x *ListActiveEgressResponse) Reset() { - *x = ListActiveEgressResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_service_rpc_egress_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ListActiveEgressResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ListActiveEgressResponse) ProtoMessage() {} - -func (x *ListActiveEgressResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_service_rpc_egress_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ListActiveEgressResponse.ProtoReflect.Descriptor instead. -func (*ListActiveEgressResponse) Descriptor() ([]byte, []int) { - return file_pkg_service_rpc_egress_proto_rawDescGZIP(), []int{1} -} - -func (x *ListActiveEgressResponse) GetEgressIds() []string { - if x != nil { - return x.EgressIds - } - return nil -} - -var File_pkg_service_rpc_egress_proto protoreflect.FileDescriptor - -var file_pkg_service_rpc_egress_proto_rawDesc = []byte{ - 0x0a, 0x1c, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, - 0x63, 0x2f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, - 0x72, 0x70, 0x63, 0x1a, 0x0d, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x1a, 0x1a, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x72, 0x70, 0x63, 0x5f, - 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x14, - 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x19, 0x0a, 0x17, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, - 0x76, 0x65, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, - 0x39, 0x0a, 0x18, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x45, 0x67, 0x72, - 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, - 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, - 0x09, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x64, 0x73, 0x32, 0xb8, 0x01, 0x0a, 0x0e, 0x45, - 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x12, 0x4d, 0x0a, - 0x0b, 0x53, 0x74, 0x61, 0x72, 0x74, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1b, 0x2e, 0x6c, - 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x45, 0x67, 0x72, 0x65, - 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, - 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x0c, - 0xb2, 0x89, 0x01, 0x02, 0x20, 0x01, 0xb2, 0x89, 0x01, 0x02, 0x18, 0x01, 0x12, 0x57, 0x0a, 0x10, - 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, - 0x12, 0x1c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, - 0x65, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, - 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x45, - 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xb2, - 0x89, 0x01, 0x02, 0x08, 0x01, 0x32, 0xa1, 0x01, 0x0a, 0x0d, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, - 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x49, 0x0a, 0x0c, 0x55, 0x70, 0x64, 0x61, 0x74, - 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1c, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, - 0x74, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, - 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x06, 0xb2, 0x89, 0x01, 0x02, - 0x18, 0x01, 0x12, 0x45, 0x0a, 0x0a, 0x53, 0x74, 0x6f, 0x70, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, - 0x12, 0x1a, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x45, - 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6c, - 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, - 0x6f, 0x22, 0x06, 0xb2, 0x89, 0x01, 0x02, 0x18, 0x01, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, - 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_pkg_service_rpc_egress_proto_rawDescOnce sync.Once - file_pkg_service_rpc_egress_proto_rawDescData = file_pkg_service_rpc_egress_proto_rawDesc -) - -func file_pkg_service_rpc_egress_proto_rawDescGZIP() []byte { - file_pkg_service_rpc_egress_proto_rawDescOnce.Do(func() { - file_pkg_service_rpc_egress_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_service_rpc_egress_proto_rawDescData) - }) - return file_pkg_service_rpc_egress_proto_rawDescData -} - -var file_pkg_service_rpc_egress_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_pkg_service_rpc_egress_proto_goTypes = []interface{}{ - (*ListActiveEgressRequest)(nil), // 0: rpc.ListActiveEgressRequest - (*ListActiveEgressResponse)(nil), // 1: rpc.ListActiveEgressResponse - (*livekit.StartEgressRequest)(nil), // 2: livekit.StartEgressRequest - (*livekit.UpdateStreamRequest)(nil), // 3: livekit.UpdateStreamRequest - (*livekit.StopEgressRequest)(nil), // 4: livekit.StopEgressRequest - (*livekit.EgressInfo)(nil), // 5: livekit.EgressInfo -} -var file_pkg_service_rpc_egress_proto_depIdxs = []int32{ - 2, // 0: rpc.EgressInternal.StartEgress:input_type -> livekit.StartEgressRequest - 0, // 1: rpc.EgressInternal.ListActiveEgress:input_type -> rpc.ListActiveEgressRequest - 3, // 2: rpc.EgressHandler.UpdateStream:input_type -> livekit.UpdateStreamRequest - 4, // 3: rpc.EgressHandler.StopEgress:input_type -> livekit.StopEgressRequest - 5, // 4: rpc.EgressInternal.StartEgress:output_type -> livekit.EgressInfo - 1, // 5: rpc.EgressInternal.ListActiveEgress:output_type -> rpc.ListActiveEgressResponse - 5, // 6: rpc.EgressHandler.UpdateStream:output_type -> livekit.EgressInfo - 5, // 7: rpc.EgressHandler.StopEgress:output_type -> livekit.EgressInfo - 4, // [4:8] is the sub-list for method output_type - 0, // [0:4] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_pkg_service_rpc_egress_proto_init() } -func file_pkg_service_rpc_egress_proto_init() { - if File_pkg_service_rpc_egress_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_pkg_service_rpc_egress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListActiveEgressRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkg_service_rpc_egress_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListActiveEgressResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_pkg_service_rpc_egress_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 2, - }, - GoTypes: file_pkg_service_rpc_egress_proto_goTypes, - DependencyIndexes: file_pkg_service_rpc_egress_proto_depIdxs, - MessageInfos: file_pkg_service_rpc_egress_proto_msgTypes, - }.Build() - File_pkg_service_rpc_egress_proto = out.File - file_pkg_service_rpc_egress_proto_rawDesc = nil - file_pkg_service_rpc_egress_proto_goTypes = nil - file_pkg_service_rpc_egress_proto_depIdxs = nil -} diff --git a/pkg/service/rpc/egress.proto b/pkg/service/rpc/egress.proto deleted file mode 100644 index 311946c4c..000000000 --- a/pkg/service/rpc/egress.proto +++ /dev/null @@ -1,34 +0,0 @@ -syntax = "proto3"; - -package rpc; - -option go_package = "github.com/livekit/livekit/pkg/service/rpc"; - -import "options.proto"; -import "livekit_rpc_internal.proto"; -import "livekit_egress.proto"; - -service EgressInternal { - rpc StartEgress(livekit.StartEgressRequest) returns (livekit.EgressInfo) { - option (psrpc.options).affinity_func = true; - option (psrpc.options).topics = true; - }; - rpc ListActiveEgress(ListActiveEgressRequest) returns (ListActiveEgressResponse) { - option (psrpc.options).multi = true; - } -} - -service EgressHandler { - rpc UpdateStream(livekit.UpdateStreamRequest) returns (livekit.EgressInfo) { - option (psrpc.options).topics = true; - } - rpc StopEgress(livekit.StopEgressRequest) returns (livekit.EgressInfo) { - option (psrpc.options).topics = true; - } -} - -message ListActiveEgressRequest {} - -message ListActiveEgressResponse { - repeated string egress_ids = 1; -} diff --git a/pkg/service/rpc/egress.psrpc.go b/pkg/service/rpc/egress.psrpc.go deleted file mode 100644 index 6ac71b66f..000000000 --- a/pkg/service/rpc/egress.psrpc.go +++ /dev/null @@ -1,253 +0,0 @@ -// Code generated by protoc-gen-psrpc v0.2.5, DO NOT EDIT. -// source: pkg/service/rpc/egress.proto - -package rpc - -import context "context" -import psrpc "github.com/livekit/psrpc" -import version "github.com/livekit/psrpc/version" -import livekit "github.com/livekit/protocol/livekit" -import livekit3 "github.com/livekit/protocol/livekit" - -var _ = version.PsrpcVersion_0_2_5 - -// =============================== -// EgressInternal Client Interface -// =============================== - -type EgressInternalClient interface { - StartEgress(context.Context, string, *livekit3.StartEgressRequest, ...psrpc.RequestOption) (*livekit.EgressInfo, error) - - ListActiveEgress(context.Context, *ListActiveEgressRequest, ...psrpc.RequestOption) (<-chan *psrpc.Response[*ListActiveEgressResponse], error) -} - -// =================================== -// EgressInternal ServerImpl Interface -// =================================== - -type EgressInternalServerImpl interface { - StartEgress(context.Context, *livekit3.StartEgressRequest) (*livekit.EgressInfo, error) - StartEgressAffinity(*livekit3.StartEgressRequest) float32 - - ListActiveEgress(context.Context, *ListActiveEgressRequest) (*ListActiveEgressResponse, error) -} - -// =============================== -// EgressInternal Server Interface -// =============================== - -type EgressInternalServer interface { - RegisterStartEgressTopic(string) error - DeregisterStartEgressTopic(string) - - // Close and wait for pending RPCs to complete - Shutdown() - - // Close immediately, without waiting for pending RPCs - Kill() -} - -// ===================== -// EgressInternal Client -// ===================== - -type egressInternalClient struct { - client *psrpc.RPCClient -} - -// NewEgressInternalClient creates a psrpc client that implements the EgressInternalClient interface. -func NewEgressInternalClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOption) (EgressInternalClient, error) { - rpcClient, err := psrpc.NewRPCClient("EgressInternal", clientID, bus, opts...) - if err != nil { - return nil, err - } - - return &egressInternalClient{ - client: rpcClient, - }, nil -} - -func (c *egressInternalClient) StartEgress(ctx context.Context, topic string, req *livekit3.StartEgressRequest, opts ...psrpc.RequestOption) (*livekit.EgressInfo, error) { - return psrpc.RequestSingle[*livekit.EgressInfo](ctx, c.client, "StartEgress", topic, req, opts...) -} - -func (c *egressInternalClient) ListActiveEgress(ctx context.Context, req *ListActiveEgressRequest, opts ...psrpc.RequestOption) (<-chan *psrpc.Response[*ListActiveEgressResponse], error) { - return psrpc.RequestMulti[*ListActiveEgressResponse](ctx, c.client, "ListActiveEgress", "", req, opts...) -} - -// ===================== -// EgressInternal Server -// ===================== - -type egressInternalServer struct { - svc EgressInternalServerImpl - rpc *psrpc.RPCServer -} - -// NewEgressInternalServer builds a RPCServer that will route requests -// to the corresponding method in the provided svc implementation. -func NewEgressInternalServer(serverID string, svc EgressInternalServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOption) (EgressInternalServer, error) { - s := psrpc.NewRPCServer("EgressInternal", serverID, bus, opts...) - - var err error - err = psrpc.RegisterHandler(s, "ListActiveEgress", "", svc.ListActiveEgress, nil) - if err != nil { - s.Close(false) - return nil, err - } - - return &egressInternalServer{ - svc: svc, - rpc: s, - }, nil -} - -func (s *egressInternalServer) RegisterStartEgressTopic(topic string) error { - return psrpc.RegisterHandler(s.rpc, "StartEgress", topic, s.svc.StartEgress, s.svc.StartEgressAffinity) -} - -func (s *egressInternalServer) DeregisterStartEgressTopic(topic string) { - s.rpc.DeregisterHandler("StartEgress", topic) -} - -func (s *egressInternalServer) Shutdown() { - s.rpc.Close(false) -} - -func (s *egressInternalServer) Kill() { - s.rpc.Close(true) -} - -// ============================== -// EgressHandler Client Interface -// ============================== - -type EgressHandlerClient interface { - UpdateStream(context.Context, string, *livekit.UpdateStreamRequest, ...psrpc.RequestOption) (*livekit.EgressInfo, error) - - StopEgress(context.Context, string, *livekit.StopEgressRequest, ...psrpc.RequestOption) (*livekit.EgressInfo, error) -} - -// ================================== -// EgressHandler ServerImpl Interface -// ================================== - -type EgressHandlerServerImpl interface { - UpdateStream(context.Context, *livekit.UpdateStreamRequest) (*livekit.EgressInfo, error) - - StopEgress(context.Context, *livekit.StopEgressRequest) (*livekit.EgressInfo, error) -} - -// ============================== -// EgressHandler Server Interface -// ============================== - -type EgressHandlerServer interface { - RegisterUpdateStreamTopic(string) error - DeregisterUpdateStreamTopic(string) - - RegisterStopEgressTopic(string) error - DeregisterStopEgressTopic(string) - - // Close and wait for pending RPCs to complete - Shutdown() - - // Close immediately, without waiting for pending RPCs - Kill() -} - -// ==================== -// EgressHandler Client -// ==================== - -type egressHandlerClient struct { - client *psrpc.RPCClient -} - -// NewEgressHandlerClient creates a psrpc client that implements the EgressHandlerClient interface. -func NewEgressHandlerClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOption) (EgressHandlerClient, error) { - rpcClient, err := psrpc.NewRPCClient("EgressHandler", clientID, bus, opts...) - if err != nil { - return nil, err - } - - return &egressHandlerClient{ - client: rpcClient, - }, nil -} - -func (c *egressHandlerClient) UpdateStream(ctx context.Context, topic string, req *livekit.UpdateStreamRequest, opts ...psrpc.RequestOption) (*livekit.EgressInfo, error) { - return psrpc.RequestSingle[*livekit.EgressInfo](ctx, c.client, "UpdateStream", topic, req, opts...) -} - -func (c *egressHandlerClient) StopEgress(ctx context.Context, topic string, req *livekit.StopEgressRequest, opts ...psrpc.RequestOption) (*livekit.EgressInfo, error) { - return psrpc.RequestSingle[*livekit.EgressInfo](ctx, c.client, "StopEgress", topic, req, opts...) -} - -// ==================== -// EgressHandler Server -// ==================== - -type egressHandlerServer struct { - svc EgressHandlerServerImpl - rpc *psrpc.RPCServer -} - -// NewEgressHandlerServer builds a RPCServer that will route requests -// to the corresponding method in the provided svc implementation. -func NewEgressHandlerServer(serverID string, svc EgressHandlerServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOption) (EgressHandlerServer, error) { - s := psrpc.NewRPCServer("EgressHandler", serverID, bus, opts...) - - return &egressHandlerServer{ - svc: svc, - rpc: s, - }, nil -} - -func (s *egressHandlerServer) RegisterUpdateStreamTopic(topic string) error { - return psrpc.RegisterHandler(s.rpc, "UpdateStream", topic, s.svc.UpdateStream, nil) -} - -func (s *egressHandlerServer) DeregisterUpdateStreamTopic(topic string) { - s.rpc.DeregisterHandler("UpdateStream", topic) -} - -func (s *egressHandlerServer) RegisterStopEgressTopic(topic string) error { - return psrpc.RegisterHandler(s.rpc, "StopEgress", topic, s.svc.StopEgress, nil) -} - -func (s *egressHandlerServer) DeregisterStopEgressTopic(topic string) { - s.rpc.DeregisterHandler("StopEgress", topic) -} - -func (s *egressHandlerServer) Shutdown() { - s.rpc.Close(false) -} - -func (s *egressHandlerServer) Kill() { - s.rpc.Close(true) -} - -var psrpcFileDescriptor0 = []byte{ - // 312 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0xd1, 0x4a, 0xc3, 0x30, - 0x14, 0x86, 0x89, 0x93, 0xb1, 0x1d, 0x9d, 0x48, 0x14, 0xac, 0x75, 0x83, 0xd1, 0x2b, 0x11, 0x69, - 0x61, 0x5e, 0x79, 0xa9, 0x30, 0xb0, 0xe0, 0xd5, 0x86, 0x08, 0xde, 0x8c, 0x2e, 0x3d, 0xce, 0xb0, - 0xad, 0x89, 0xc9, 0xd9, 0xde, 0x61, 0x8f, 0xe1, 0x2b, 0x0c, 0x1f, 0x50, 0x58, 0xb2, 0x52, 0x26, - 0x13, 0xaf, 0x02, 0xff, 0x97, 0xfc, 0xf9, 0x0e, 0x09, 0xb4, 0xf5, 0x74, 0x92, 0x58, 0x34, 0x4b, - 0x29, 0x30, 0x31, 0x5a, 0x24, 0x38, 0x31, 0x68, 0x6d, 0xac, 0x8d, 0x22, 0xc5, 0x6b, 0x46, 0x8b, - 0xb0, 0xa5, 0x34, 0x49, 0x55, 0xf8, 0x2c, 0x0c, 0x67, 0x72, 0x89, 0x53, 0x49, 0x23, 0xa3, 0xc5, - 0x48, 0x16, 0x84, 0xa6, 0xc8, 0x66, 0x9e, 0x9d, 0x6f, 0x59, 0xb5, 0x25, 0xba, 0x84, 0x8b, 0x67, - 0x69, 0xe9, 0x41, 0x90, 0x5c, 0x62, 0x7f, 0x43, 0x06, 0xf8, 0xb9, 0x40, 0x4b, 0xd1, 0x3d, 0x04, - 0xbf, 0x91, 0xd5, 0xaa, 0xb0, 0xc8, 0x3b, 0x00, 0xae, 0x66, 0x24, 0x73, 0x1b, 0xb0, 0x6e, 0xed, - 0xba, 0x39, 0x68, 0xba, 0x24, 0xcd, 0x6d, 0xef, 0x9b, 0xc1, 0x89, 0x3b, 0x91, 0x7a, 0x09, 0x9e, - 0xc2, 0xd1, 0x90, 0x32, 0x43, 0x2e, 0xe6, 0x57, 0xb1, 0xd7, 0x89, 0x2b, 0xa9, 0xbf, 0x39, 0x3c, - 0x2b, 0xe1, 0xb6, 0xe4, 0x5d, 0x45, 0x8d, 0xf5, 0x8a, 0x1d, 0x06, 0xac, 0xcb, 0xf8, 0x2b, 0x9c, - 0xee, 0x8a, 0xf1, 0x76, 0x6c, 0xb4, 0x88, 0xf7, 0x8c, 0x12, 0x76, 0xf6, 0x50, 0x37, 0x4d, 0x54, - 0x5f, 0xaf, 0xd8, 0x41, 0x83, 0xf5, 0xbe, 0x18, 0xb4, 0x1c, 0x7a, 0xca, 0x8a, 0x7c, 0x86, 0x86, - 0xa7, 0x70, 0xfc, 0xa2, 0xf3, 0x8c, 0x70, 0x48, 0x06, 0xb3, 0x39, 0x6f, 0x97, 0x66, 0xd5, 0xf8, - 0x4f, 0xef, 0x4d, 0x79, 0xc0, 0x78, 0x1f, 0x60, 0x48, 0x4a, 0x7b, 0xdf, 0xb0, 0x32, 0xff, 0x36, - 0xfc, 0x4f, 0xcd, 0xe3, 0xed, 0xdb, 0xcd, 0x44, 0xd2, 0xc7, 0x62, 0x1c, 0x0b, 0x35, 0x4f, 0xfc, - 0xc6, 0x72, 0xdd, 0xf9, 0x31, 0xe3, 0xfa, 0xe6, 0x95, 0xef, 0x7e, 0x02, 0x00, 0x00, 0xff, 0xff, - 0xb8, 0xd5, 0x8e, 0x13, 0x4b, 0x02, 0x00, 0x00, -} diff --git a/pkg/service/rpc/egress_client.go b/pkg/service/rpc/egress_client.go deleted file mode 100644 index eba43b4c4..000000000 --- a/pkg/service/rpc/egress_client.go +++ /dev/null @@ -1,36 +0,0 @@ -package rpc - -import ( - "github.com/livekit/protocol/livekit" - "github.com/livekit/psrpc" -) - -type EgressClient interface { - EgressInternalClient - EgressHandlerClient -} - -type egressClient struct { - EgressInternalClient - EgressHandlerClient -} - -func NewEgressClient(nodeID livekit.NodeID, bus psrpc.MessageBus) (EgressClient, error) { - if bus == nil { - return nil, nil - } - - clientID := string(nodeID) - internalClient, err := NewEgressInternalClient(clientID, bus) - if err != nil { - return nil, err - } - handlerClient, err := NewEgressHandlerClient(clientID, bus) - if err != nil { - return nil, err - } - return &egressClient{ - EgressInternalClient: internalClient, - EgressHandlerClient: handlerClient, - }, nil -} diff --git a/pkg/service/rpc/ingress.pb.go b/pkg/service/rpc/ingress.pb.go deleted file mode 100644 index 85e9d4362..000000000 --- a/pkg/service/rpc/ingress.pb.go +++ /dev/null @@ -1,230 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.27.1 -// protoc v3.21.6 -// source: pkg/service/rpc/ingress.proto - -package rpc - -import ( - livekit "github.com/livekit/protocol/livekit" - _ "github.com/livekit/psrpc/protoc-gen-psrpc/options" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type ListActiveIngressRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *ListActiveIngressRequest) Reset() { - *x = ListActiveIngressRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_service_rpc_ingress_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ListActiveIngressRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ListActiveIngressRequest) ProtoMessage() {} - -func (x *ListActiveIngressRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_service_rpc_ingress_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ListActiveIngressRequest.ProtoReflect.Descriptor instead. -func (*ListActiveIngressRequest) Descriptor() ([]byte, []int) { - return file_pkg_service_rpc_ingress_proto_rawDescGZIP(), []int{0} -} - -type ListActiveIngressResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - IngressIds []string `protobuf:"bytes,1,rep,name=ingress_ids,json=ingressIds,proto3" json:"ingress_ids,omitempty"` -} - -func (x *ListActiveIngressResponse) Reset() { - *x = ListActiveIngressResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_service_rpc_ingress_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ListActiveIngressResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ListActiveIngressResponse) ProtoMessage() {} - -func (x *ListActiveIngressResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_service_rpc_ingress_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ListActiveIngressResponse.ProtoReflect.Descriptor instead. -func (*ListActiveIngressResponse) Descriptor() ([]byte, []int) { - return file_pkg_service_rpc_ingress_proto_rawDescGZIP(), []int{1} -} - -func (x *ListActiveIngressResponse) GetIngressIds() []string { - if x != nil { - return x.IngressIds - } - return nil -} - -var File_pkg_service_rpc_ingress_proto protoreflect.FileDescriptor - -var file_pkg_service_rpc_ingress_proto_rawDesc = []byte{ - 0x0a, 0x1d, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, - 0x63, 0x2f, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x03, 0x72, 0x70, 0x63, 0x1a, 0x0d, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x1a, 0x15, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x69, 0x6e, 0x67, - 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1a, 0x0a, 0x18, 0x4c, 0x69, - 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x3c, 0x0a, 0x19, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, - 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69, - 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0a, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, - 0x73, 0x49, 0x64, 0x73, 0x32, 0x6d, 0x0a, 0x0f, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, - 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x12, 0x5a, 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x41, - 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1d, 0x2e, 0x72, - 0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, - 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x72, 0x70, - 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x72, - 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xb2, 0x89, 0x01, - 0x02, 0x08, 0x01, 0x32, 0xae, 0x01, 0x0a, 0x0e, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x48, - 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x4d, 0x0a, 0x0d, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, - 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, - 0x74, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, - 0x2e, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x22, 0x06, 0xb2, - 0x89, 0x01, 0x02, 0x18, 0x01, 0x12, 0x4d, 0x0a, 0x0d, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x49, - 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, - 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, - 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x22, 0x06, 0xb2, 0x89, - 0x01, 0x02, 0x18, 0x01, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, - 0x69, 0x74, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, - 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_pkg_service_rpc_ingress_proto_rawDescOnce sync.Once - file_pkg_service_rpc_ingress_proto_rawDescData = file_pkg_service_rpc_ingress_proto_rawDesc -) - -func file_pkg_service_rpc_ingress_proto_rawDescGZIP() []byte { - file_pkg_service_rpc_ingress_proto_rawDescOnce.Do(func() { - file_pkg_service_rpc_ingress_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_service_rpc_ingress_proto_rawDescData) - }) - return file_pkg_service_rpc_ingress_proto_rawDescData -} - -var file_pkg_service_rpc_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_pkg_service_rpc_ingress_proto_goTypes = []interface{}{ - (*ListActiveIngressRequest)(nil), // 0: rpc.ListActiveIngressRequest - (*ListActiveIngressResponse)(nil), // 1: rpc.ListActiveIngressResponse - (*livekit.UpdateIngressRequest)(nil), // 2: livekit.UpdateIngressRequest - (*livekit.DeleteIngressRequest)(nil), // 3: livekit.DeleteIngressRequest - (*livekit.IngressState)(nil), // 4: livekit.IngressState -} -var file_pkg_service_rpc_ingress_proto_depIdxs = []int32{ - 0, // 0: rpc.IngressInternal.ListActiveIngress:input_type -> rpc.ListActiveIngressRequest - 2, // 1: rpc.IngressHandler.UpdateIngress:input_type -> livekit.UpdateIngressRequest - 3, // 2: rpc.IngressHandler.DeleteIngress:input_type -> livekit.DeleteIngressRequest - 1, // 3: rpc.IngressInternal.ListActiveIngress:output_type -> rpc.ListActiveIngressResponse - 4, // 4: rpc.IngressHandler.UpdateIngress:output_type -> livekit.IngressState - 4, // 5: rpc.IngressHandler.DeleteIngress:output_type -> livekit.IngressState - 3, // [3:6] is the sub-list for method output_type - 0, // [0:3] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_pkg_service_rpc_ingress_proto_init() } -func file_pkg_service_rpc_ingress_proto_init() { - if File_pkg_service_rpc_ingress_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_pkg_service_rpc_ingress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListActiveIngressRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkg_service_rpc_ingress_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListActiveIngressResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_pkg_service_rpc_ingress_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 2, - }, - GoTypes: file_pkg_service_rpc_ingress_proto_goTypes, - DependencyIndexes: file_pkg_service_rpc_ingress_proto_depIdxs, - MessageInfos: file_pkg_service_rpc_ingress_proto_msgTypes, - }.Build() - File_pkg_service_rpc_ingress_proto = out.File - file_pkg_service_rpc_ingress_proto_rawDesc = nil - file_pkg_service_rpc_ingress_proto_goTypes = nil - file_pkg_service_rpc_ingress_proto_depIdxs = nil -} diff --git a/pkg/service/rpc/ingress.proto b/pkg/service/rpc/ingress.proto deleted file mode 100644 index 6648f282c..000000000 --- a/pkg/service/rpc/ingress.proto +++ /dev/null @@ -1,29 +0,0 @@ -syntax = "proto3"; - -package rpc; - -option go_package = "github.com/livekit/livekit/pkg/service/rpc"; - -import "options.proto"; -import "livekit_ingress.proto"; - -service IngressInternal { - rpc ListActiveIngress(ListActiveIngressRequest) returns (ListActiveIngressResponse) { - option (psrpc.options).multi = true; - }; -} - -service IngressHandler { - rpc UpdateIngress(livekit.UpdateIngressRequest) returns (livekit.IngressState) { - option (psrpc.options).topics = true; - }; - rpc DeleteIngress(livekit.DeleteIngressRequest) returns (livekit.IngressState) { - option (psrpc.options).topics = true; - }; -} - -message ListActiveIngressRequest {} - -message ListActiveIngressResponse { - repeated string ingress_ids = 1; -} diff --git a/pkg/service/rpc/ingress.psrpc.go b/pkg/service/rpc/ingress.psrpc.go deleted file mode 100644 index d9c278b69..000000000 --- a/pkg/service/rpc/ingress.psrpc.go +++ /dev/null @@ -1,229 +0,0 @@ -// Code generated by protoc-gen-psrpc v0.2.5, DO NOT EDIT. -// source: pkg/service/rpc/ingress.proto - -package rpc - -import context "context" -import psrpc "github.com/livekit/psrpc" -import version "github.com/livekit/psrpc/version" -import livekit2 "github.com/livekit/protocol/livekit" - -var _ = version.PsrpcVersion_0_2_5 - -// ================================ -// IngressInternal Client Interface -// ================================ - -type IngressInternalClient interface { - ListActiveIngress(context.Context, *ListActiveIngressRequest, ...psrpc.RequestOption) (<-chan *psrpc.Response[*ListActiveIngressResponse], error) -} - -// ==================================== -// IngressInternal ServerImpl Interface -// ==================================== - -type IngressInternalServerImpl interface { - ListActiveIngress(context.Context, *ListActiveIngressRequest) (*ListActiveIngressResponse, error) -} - -// ================================ -// IngressInternal Server Interface -// ================================ - -type IngressInternalServer interface { - // Close and wait for pending RPCs to complete - Shutdown() - - // Close immediately, without waiting for pending RPCs - Kill() -} - -// ====================== -// IngressInternal Client -// ====================== - -type ingressInternalClient struct { - client *psrpc.RPCClient -} - -// NewIngressInternalClient creates a psrpc client that implements the IngressInternalClient interface. -func NewIngressInternalClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOption) (IngressInternalClient, error) { - rpcClient, err := psrpc.NewRPCClient("IngressInternal", clientID, bus, opts...) - if err != nil { - return nil, err - } - - return &ingressInternalClient{ - client: rpcClient, - }, nil -} - -func (c *ingressInternalClient) ListActiveIngress(ctx context.Context, req *ListActiveIngressRequest, opts ...psrpc.RequestOption) (<-chan *psrpc.Response[*ListActiveIngressResponse], error) { - return psrpc.RequestMulti[*ListActiveIngressResponse](ctx, c.client, "ListActiveIngress", "", req, opts...) -} - -// ====================== -// IngressInternal Server -// ====================== - -type ingressInternalServer struct { - svc IngressInternalServerImpl - rpc *psrpc.RPCServer -} - -// NewIngressInternalServer builds a RPCServer that will route requests -// to the corresponding method in the provided svc implementation. -func NewIngressInternalServer(serverID string, svc IngressInternalServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOption) (IngressInternalServer, error) { - s := psrpc.NewRPCServer("IngressInternal", serverID, bus, opts...) - - var err error - err = psrpc.RegisterHandler(s, "ListActiveIngress", "", svc.ListActiveIngress, nil) - if err != nil { - s.Close(false) - return nil, err - } - - return &ingressInternalServer{ - svc: svc, - rpc: s, - }, nil -} - -func (s *ingressInternalServer) Shutdown() { - s.rpc.Close(false) -} - -func (s *ingressInternalServer) Kill() { - s.rpc.Close(true) -} - -// =============================== -// IngressHandler Client Interface -// =============================== - -type IngressHandlerClient interface { - UpdateIngress(context.Context, string, *livekit2.UpdateIngressRequest, ...psrpc.RequestOption) (*livekit2.IngressState, error) - - DeleteIngress(context.Context, string, *livekit2.DeleteIngressRequest, ...psrpc.RequestOption) (*livekit2.IngressState, error) -} - -// =================================== -// IngressHandler ServerImpl Interface -// =================================== - -type IngressHandlerServerImpl interface { - UpdateIngress(context.Context, *livekit2.UpdateIngressRequest) (*livekit2.IngressState, error) - - DeleteIngress(context.Context, *livekit2.DeleteIngressRequest) (*livekit2.IngressState, error) -} - -// =============================== -// IngressHandler Server Interface -// =============================== - -type IngressHandlerServer interface { - RegisterUpdateIngressTopic(string) error - DeregisterUpdateIngressTopic(string) - - RegisterDeleteIngressTopic(string) error - DeregisterDeleteIngressTopic(string) - - // Close and wait for pending RPCs to complete - Shutdown() - - // Close immediately, without waiting for pending RPCs - Kill() -} - -// ===================== -// IngressHandler Client -// ===================== - -type ingressHandlerClient struct { - client *psrpc.RPCClient -} - -// NewIngressHandlerClient creates a psrpc client that implements the IngressHandlerClient interface. -func NewIngressHandlerClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOption) (IngressHandlerClient, error) { - rpcClient, err := psrpc.NewRPCClient("IngressHandler", clientID, bus, opts...) - if err != nil { - return nil, err - } - - return &ingressHandlerClient{ - client: rpcClient, - }, nil -} - -func (c *ingressHandlerClient) UpdateIngress(ctx context.Context, topic string, req *livekit2.UpdateIngressRequest, opts ...psrpc.RequestOption) (*livekit2.IngressState, error) { - return psrpc.RequestSingle[*livekit2.IngressState](ctx, c.client, "UpdateIngress", topic, req, opts...) -} - -func (c *ingressHandlerClient) DeleteIngress(ctx context.Context, topic string, req *livekit2.DeleteIngressRequest, opts ...psrpc.RequestOption) (*livekit2.IngressState, error) { - return psrpc.RequestSingle[*livekit2.IngressState](ctx, c.client, "DeleteIngress", topic, req, opts...) -} - -// ===================== -// IngressHandler Server -// ===================== - -type ingressHandlerServer struct { - svc IngressHandlerServerImpl - rpc *psrpc.RPCServer -} - -// NewIngressHandlerServer builds a RPCServer that will route requests -// to the corresponding method in the provided svc implementation. -func NewIngressHandlerServer(serverID string, svc IngressHandlerServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOption) (IngressHandlerServer, error) { - s := psrpc.NewRPCServer("IngressHandler", serverID, bus, opts...) - - return &ingressHandlerServer{ - svc: svc, - rpc: s, - }, nil -} - -func (s *ingressHandlerServer) RegisterUpdateIngressTopic(topic string) error { - return psrpc.RegisterHandler(s.rpc, "UpdateIngress", topic, s.svc.UpdateIngress, nil) -} - -func (s *ingressHandlerServer) DeregisterUpdateIngressTopic(topic string) { - s.rpc.DeregisterHandler("UpdateIngress", topic) -} - -func (s *ingressHandlerServer) RegisterDeleteIngressTopic(topic string) error { - return psrpc.RegisterHandler(s.rpc, "DeleteIngress", topic, s.svc.DeleteIngress, nil) -} - -func (s *ingressHandlerServer) DeregisterDeleteIngressTopic(topic string) { - s.rpc.DeregisterHandler("DeleteIngress", topic) -} - -func (s *ingressHandlerServer) Shutdown() { - s.rpc.Close(false) -} - -func (s *ingressHandlerServer) Kill() { - s.rpc.Close(true) -} - -var psrpcFileDescriptor1 = []byte{ - // 269 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xc1, 0x4a, 0xc3, 0x40, - 0x10, 0x86, 0x89, 0x85, 0xa2, 0x2b, 0x55, 0x5c, 0x28, 0xc4, 0x40, 0x55, 0x72, 0x12, 0x91, 0x0d, - 0xd4, 0xab, 0x17, 0xc5, 0x83, 0x01, 0xbd, 0x54, 0xbc, 0xf4, 0x52, 0xd2, 0xcd, 0x10, 0x97, 0xa6, - 0xbb, 0xeb, 0xce, 0x34, 0xef, 0xe0, 0xcb, 0x78, 0xf0, 0x09, 0x45, 0x3b, 0x16, 0xa3, 0x2d, 0xf4, - 0x14, 0xf8, 0xbf, 0xc9, 0xf7, 0x0f, 0x3b, 0x62, 0xe0, 0x67, 0x55, 0x86, 0x10, 0x1a, 0xa3, 0x21, - 0x0b, 0x5e, 0x67, 0xc6, 0x56, 0x01, 0x10, 0x95, 0x0f, 0x8e, 0x9c, 0xec, 0x04, 0xaf, 0x93, 0x9e, - 0xf3, 0x64, 0x9c, 0xe5, 0x2c, 0xe9, 0xd7, 0xa6, 0x81, 0x99, 0xa1, 0x49, 0x6b, 0x34, 0x4d, 0x44, - 0xfc, 0x60, 0x90, 0x6e, 0x34, 0x99, 0x06, 0xf2, 0x25, 0x1a, 0xc1, 0xeb, 0x02, 0x90, 0xd2, 0x6b, - 0x71, 0xbc, 0x86, 0xa1, 0x77, 0x16, 0x41, 0x9e, 0x8a, 0x7d, 0x36, 0x4d, 0x4c, 0x89, 0x71, 0x74, - 0xd6, 0x39, 0xdf, 0x1b, 0x09, 0x8e, 0xf2, 0x12, 0x87, 0x73, 0x71, 0xc8, 0xff, 0xe4, 0x96, 0x20, - 0xd8, 0xa2, 0x96, 0x63, 0x71, 0xf4, 0x4f, 0x28, 0x07, 0x2a, 0x78, 0xad, 0x36, 0x2d, 0x91, 0x9c, - 0x6c, 0xc2, 0xcb, 0x3d, 0xd2, 0xee, 0xc7, 0x5b, 0xb4, 0xb3, 0x1b, 0x0d, 0xdf, 0x23, 0x71, 0xc0, - 0xec, 0xbe, 0xb0, 0x65, 0x0d, 0x41, 0x3e, 0x8a, 0xde, 0xb3, 0x2f, 0x0b, 0xfa, 0x55, 0xc5, 0x8f, - 0xa0, 0x5a, 0xf9, 0x4f, 0x55, 0x7f, 0x85, 0x19, 0x3c, 0x51, 0x41, 0xdc, 0x10, 0x47, 0x5f, 0xba, - 0x3b, 0xa8, 0x61, 0x9d, 0xae, 0x95, 0x6f, 0xa7, 0xbb, 0xbd, 0x1c, 0x5f, 0x54, 0x86, 0x5e, 0x16, - 0x53, 0xa5, 0xdd, 0x3c, 0xe3, 0xd1, 0xd5, 0xf7, 0xcf, 0x81, 0xa7, 0xdd, 0xef, 0x73, 0x5d, 0x7d, - 0x06, 0x00, 0x00, 0xff, 0xff, 0xff, 0xd2, 0xc1, 0x99, 0xfa, 0x01, 0x00, 0x00, -} diff --git a/pkg/service/rpc/ingress_client.go b/pkg/service/rpc/ingress_client.go deleted file mode 100644 index 7126a6c10..000000000 --- a/pkg/service/rpc/ingress_client.go +++ /dev/null @@ -1,36 +0,0 @@ -package rpc - -import ( - "github.com/livekit/protocol/livekit" - "github.com/livekit/psrpc" -) - -type IngressClient interface { - IngressInternalClient - IngressHandlerClient -} - -type ingressClient struct { - IngressInternalClient - IngressHandlerClient -} - -func NewIngressClient(nodeID livekit.NodeID, bus psrpc.MessageBus) (IngressClient, error) { - if bus == nil { - return nil, nil - } - - clientID := string(nodeID) - internalClient, err := NewIngressInternalClient(clientID, bus) - if err != nil { - return nil, err - } - handlerClient, err := NewIngressHandlerClient(clientID, bus) - if err != nil { - return nil, err - } - return &ingressClient{ - IngressInternalClient: internalClient, - IngressHandlerClient: handlerClient, - }, nil -} diff --git a/pkg/service/rpc/io.pb.go b/pkg/service/rpc/io.pb.go deleted file mode 100644 index f1010c748..000000000 --- a/pkg/service/rpc/io.pb.go +++ /dev/null @@ -1,98 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.27.1 -// protoc v3.21.6 -// source: pkg/service/rpc/io.proto - -package rpc - -import ( - livekit "github.com/livekit/protocol/livekit" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - emptypb "google.golang.org/protobuf/types/known/emptypb" - reflect "reflect" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -var File_pkg_service_rpc_io_proto protoreflect.FileDescriptor - -var file_pkg_service_rpc_io_proto_rawDesc = []byte{ - 0x0a, 0x18, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, - 0x63, 0x2f, 0x69, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x72, 0x70, 0x63, 0x1a, - 0x14, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1a, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x72, - 0x70, 0x63, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0xee, - 0x01, 0x0a, 0x06, 0x49, 0x4f, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x3f, 0x0a, 0x10, 0x55, 0x70, 0x64, - 0x61, 0x74, 0x65, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x13, 0x2e, - 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, - 0x66, 0x6f, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x51, 0x0a, 0x0e, 0x47, 0x65, - 0x74, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1e, 0x2e, 0x6c, - 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, - 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6c, - 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, - 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x50, 0x0a, - 0x12, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, - 0x61, 0x74, 0x65, 0x12, 0x22, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x55, 0x70, - 0x64, 0x61, 0x74, 0x65, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x42, - 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, - 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x6b, - 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var file_pkg_service_rpc_io_proto_goTypes = []interface{}{ - (*livekit.EgressInfo)(nil), // 0: livekit.EgressInfo - (*livekit.GetIngressInfoRequest)(nil), // 1: livekit.GetIngressInfoRequest - (*livekit.UpdateIngressStateRequest)(nil), // 2: livekit.UpdateIngressStateRequest - (*emptypb.Empty)(nil), // 3: google.protobuf.Empty - (*livekit.GetIngressInfoResponse)(nil), // 4: livekit.GetIngressInfoResponse -} -var file_pkg_service_rpc_io_proto_depIdxs = []int32{ - 0, // 0: rpc.IOInfo.UpdateEgressInfo:input_type -> livekit.EgressInfo - 1, // 1: rpc.IOInfo.GetIngressInfo:input_type -> livekit.GetIngressInfoRequest - 2, // 2: rpc.IOInfo.UpdateIngressState:input_type -> livekit.UpdateIngressStateRequest - 3, // 3: rpc.IOInfo.UpdateEgressInfo:output_type -> google.protobuf.Empty - 4, // 4: rpc.IOInfo.GetIngressInfo:output_type -> livekit.GetIngressInfoResponse - 3, // 5: rpc.IOInfo.UpdateIngressState:output_type -> google.protobuf.Empty - 3, // [3:6] is the sub-list for method output_type - 0, // [0:3] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_pkg_service_rpc_io_proto_init() } -func file_pkg_service_rpc_io_proto_init() { - if File_pkg_service_rpc_io_proto != nil { - return - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_pkg_service_rpc_io_proto_rawDesc, - NumEnums: 0, - NumMessages: 0, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_pkg_service_rpc_io_proto_goTypes, - DependencyIndexes: file_pkg_service_rpc_io_proto_depIdxs, - }.Build() - File_pkg_service_rpc_io_proto = out.File - file_pkg_service_rpc_io_proto_rawDesc = nil - file_pkg_service_rpc_io_proto_goTypes = nil - file_pkg_service_rpc_io_proto_depIdxs = nil -} diff --git a/pkg/service/rpc/io.proto b/pkg/service/rpc/io.proto deleted file mode 100644 index 72c04ff2d..000000000 --- a/pkg/service/rpc/io.proto +++ /dev/null @@ -1,15 +0,0 @@ -syntax = "proto3"; - -package rpc; - -option go_package = "github.com/livekit/livekit/pkg/service/rpc"; - -import "livekit_egress.proto"; -import "livekit_rpc_internal.proto"; -import "google/protobuf/empty.proto"; - -service IOInfo { - rpc UpdateEgressInfo(livekit.EgressInfo) returns (google.protobuf.Empty); - rpc GetIngressInfo(livekit.GetIngressInfoRequest) returns (livekit.GetIngressInfoResponse); - rpc UpdateIngressState(livekit.UpdateIngressStateRequest) returns (google.protobuf.Empty); -} diff --git a/pkg/service/rpc/io.psrpc.go b/pkg/service/rpc/io.psrpc.go deleted file mode 100644 index 97ce84e57..000000000 --- a/pkg/service/rpc/io.psrpc.go +++ /dev/null @@ -1,147 +0,0 @@ -// Code generated by protoc-gen-psrpc v0.2.5, DO NOT EDIT. -// source: pkg/service/rpc/io.proto - -package rpc - -import context "context" -import psrpc "github.com/livekit/psrpc" -import version "github.com/livekit/psrpc/version" -import google_protobuf2 "google.golang.org/protobuf/types/known/emptypb" -import livekit "github.com/livekit/protocol/livekit" -import livekit3 "github.com/livekit/protocol/livekit" - -var _ = version.PsrpcVersion_0_2_5 - -// ======================= -// IOInfo Client Interface -// ======================= - -type IOInfoClient interface { - UpdateEgressInfo(context.Context, *livekit.EgressInfo, ...psrpc.RequestOption) (*google_protobuf2.Empty, error) - - GetIngressInfo(context.Context, *livekit3.GetIngressInfoRequest, ...psrpc.RequestOption) (*livekit3.GetIngressInfoResponse, error) - - UpdateIngressState(context.Context, *livekit3.UpdateIngressStateRequest, ...psrpc.RequestOption) (*google_protobuf2.Empty, error) -} - -// =========================== -// IOInfo ServerImpl Interface -// =========================== - -type IOInfoServerImpl interface { - UpdateEgressInfo(context.Context, *livekit.EgressInfo) (*google_protobuf2.Empty, error) - - GetIngressInfo(context.Context, *livekit3.GetIngressInfoRequest) (*livekit3.GetIngressInfoResponse, error) - - UpdateIngressState(context.Context, *livekit3.UpdateIngressStateRequest) (*google_protobuf2.Empty, error) -} - -// ======================= -// IOInfo Server Interface -// ======================= - -type IOInfoServer interface { - // Close and wait for pending RPCs to complete - Shutdown() - - // Close immediately, without waiting for pending RPCs - Kill() -} - -// ============= -// IOInfo Client -// ============= - -type iOInfoClient struct { - client *psrpc.RPCClient -} - -// NewIOInfoClient creates a psrpc client that implements the IOInfoClient interface. -func NewIOInfoClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOption) (IOInfoClient, error) { - rpcClient, err := psrpc.NewRPCClient("IOInfo", clientID, bus, opts...) - if err != nil { - return nil, err - } - - return &iOInfoClient{ - client: rpcClient, - }, nil -} - -func (c *iOInfoClient) UpdateEgressInfo(ctx context.Context, req *livekit.EgressInfo, opts ...psrpc.RequestOption) (*google_protobuf2.Empty, error) { - return psrpc.RequestSingle[*google_protobuf2.Empty](ctx, c.client, "UpdateEgressInfo", "", req, opts...) -} - -func (c *iOInfoClient) GetIngressInfo(ctx context.Context, req *livekit3.GetIngressInfoRequest, opts ...psrpc.RequestOption) (*livekit3.GetIngressInfoResponse, error) { - return psrpc.RequestSingle[*livekit3.GetIngressInfoResponse](ctx, c.client, "GetIngressInfo", "", req, opts...) -} - -func (c *iOInfoClient) UpdateIngressState(ctx context.Context, req *livekit3.UpdateIngressStateRequest, opts ...psrpc.RequestOption) (*google_protobuf2.Empty, error) { - return psrpc.RequestSingle[*google_protobuf2.Empty](ctx, c.client, "UpdateIngressState", "", req, opts...) -} - -// ============= -// IOInfo Server -// ============= - -type iOInfoServer struct { - svc IOInfoServerImpl - rpc *psrpc.RPCServer -} - -// NewIOInfoServer builds a RPCServer that will route requests -// to the corresponding method in the provided svc implementation. -func NewIOInfoServer(serverID string, svc IOInfoServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOption) (IOInfoServer, error) { - s := psrpc.NewRPCServer("IOInfo", serverID, bus, opts...) - - var err error - err = psrpc.RegisterHandler(s, "UpdateEgressInfo", "", svc.UpdateEgressInfo, nil) - if err != nil { - s.Close(false) - return nil, err - } - - err = psrpc.RegisterHandler(s, "GetIngressInfo", "", svc.GetIngressInfo, nil) - if err != nil { - s.Close(false) - return nil, err - } - - err = psrpc.RegisterHandler(s, "UpdateIngressState", "", svc.UpdateIngressState, nil) - if err != nil { - s.Close(false) - return nil, err - } - - return &iOInfoServer{ - svc: svc, - rpc: s, - }, nil -} - -func (s *iOInfoServer) Shutdown() { - s.rpc.Close(false) -} - -func (s *iOInfoServer) Kill() { - s.rpc.Close(true) -} - -var psrpcFileDescriptor2 = []byte{ - // 236 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xcd, 0x4a, 0xc4, 0x30, - 0x14, 0x85, 0x11, 0x61, 0x16, 0x59, 0x88, 0x44, 0x11, 0x89, 0xa0, 0xe0, 0x52, 0x24, 0x01, 0x7d, - 0x00, 0x41, 0x18, 0xa4, 0x2b, 0xff, 0x70, 0xe3, 0x66, 0x68, 0xe3, 0x9d, 0x18, 0xa6, 0x93, 0x7b, - 0x4d, 0x6e, 0x07, 0x7c, 0x69, 0x9f, 0x41, 0x6c, 0xd2, 0x0e, 0x2a, 0x5d, 0x85, 0x9c, 0x8f, 0xf3, - 0x71, 0xb8, 0xe2, 0x98, 0x56, 0xce, 0x24, 0x88, 0x1b, 0x6f, 0xc1, 0x44, 0xb2, 0xc6, 0xa3, 0xa6, - 0x88, 0x8c, 0x72, 0x37, 0x92, 0x55, 0x87, 0xad, 0xdf, 0xc0, 0xca, 0xf3, 0x02, 0x5c, 0x84, 0x94, - 0x32, 0x52, 0x6a, 0x48, 0x23, 0xd9, 0x85, 0x0f, 0x0c, 0x31, 0xd4, 0x6d, 0x61, 0x27, 0x0e, 0xd1, - 0xb5, 0x60, 0xfa, 0x5f, 0xd3, 0x2d, 0x0d, 0xac, 0x89, 0x3f, 0x33, 0xbc, 0xfa, 0xda, 0x11, 0xb3, - 0xea, 0xbe, 0x0a, 0x4b, 0x94, 0x37, 0x62, 0xff, 0x85, 0xde, 0x6a, 0x86, 0x79, 0x6f, 0xee, 0xb3, - 0x03, 0x5d, 0xc4, 0x7a, 0x1b, 0xaa, 0x23, 0x9d, 0x8d, 0x7a, 0x30, 0xea, 0xf9, 0x8f, 0x51, 0x3e, - 0x8a, 0xbd, 0x3b, 0xe0, 0x2a, 0x6c, 0xeb, 0xa7, 0x63, 0xfd, 0x37, 0x78, 0x82, 0x8f, 0x0e, 0x12, - 0xab, 0xb3, 0x49, 0x9e, 0x08, 0x43, 0x02, 0xf9, 0x20, 0x64, 0xde, 0x54, 0xe0, 0x33, 0xd7, 0x0c, - 0xf2, 0x7c, 0xac, 0xfd, 0x87, 0x83, 0x7a, 0x62, 0xe4, 0xed, 0xe5, 0xeb, 0x85, 0xf3, 0xfc, 0xde, - 0x35, 0xda, 0xe2, 0xda, 0x14, 0xcf, 0xf8, 0xfe, 0xb9, 0x7d, 0x33, 0xeb, 0xdb, 0xd7, 0xdf, 0x01, - 0x00, 0x00, 0xff, 0xff, 0x87, 0x58, 0x4d, 0x3b, 0x95, 0x01, 0x00, 0x00, -} diff --git a/pkg/service/rpc/race.go b/pkg/service/rpc/race.go deleted file mode 100644 index 5c4825156..000000000 --- a/pkg/service/rpc/race.go +++ /dev/null @@ -1,63 +0,0 @@ -package rpc - -import ( - "context" - "sync" -) - -type raceResult[T any] struct { - i int - val *T - err error -} - -type Race[T any] struct { - ctx context.Context - cancel context.CancelFunc - nextIndex int - - resultLock sync.Mutex - result *raceResult[T] -} - -// NewRace creates a race to yield the result from one or more candidate -// functions -func NewRace[T any](ctx context.Context) *Race[T] { - ctx, cancel := context.WithCancel(ctx) - return &Race[T]{ - ctx: ctx, - cancel: cancel, - } -} - -// Go adds a candidate function to the race by running it in a new goroutine -func (r *Race[T]) Go(fn func(ctx context.Context) (*T, error)) { - i := r.nextIndex - r.nextIndex++ - - go func() { - val, err := fn(r.ctx) - - r.resultLock.Lock() - if r.result == nil { - r.result = &raceResult[T]{i, val, err} - } - r.resultLock.Unlock() - - r.cancel() - }() -} - -// Wait awaits the first complete function and returns the index and results -// or -1 if the context is cancelled before any candidate finishes. -func (r *Race[T]) Wait() (int, *T, error) { - <-r.ctx.Done() - - r.resultLock.Lock() - res := r.result - r.resultLock.Unlock() - if res != nil { - return res.i, res.val, res.err - } - return -1, nil, r.ctx.Err() -} diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 5eec5d6ef..24e718c63 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -16,7 +16,6 @@ import ( "github.com/livekit/livekit-server/pkg/clientconfiguration" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" - "github.com/livekit/livekit-server/pkg/service/rpc" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/auth" @@ -24,6 +23,7 @@ import ( "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" redisLiveKit "github.com/livekit/protocol/redis" + "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/webhook" "github.com/livekit/psrpc" ) diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 07e3e9bf7..986ce89c7 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -11,7 +11,6 @@ import ( "github.com/livekit/livekit-server/pkg/clientconfiguration" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" - "github.com/livekit/livekit-server/pkg/service/rpc" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/auth" @@ -19,6 +18,7 @@ import ( "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" redis2 "github.com/livekit/protocol/redis" + "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/webhook" "github.com/livekit/psrpc" "github.com/pion/turn/v2"