From d8edb9b2e726d7e8c60843ea2bb1df6f90992ec2 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 31 Aug 2022 14:14:40 -0700 Subject: [PATCH] Adopt Ingress RPC interface changes (#972) --- go.mod | 4 ++-- go.sum | 9 ++++----- pkg/service/ingress.go | 30 +++++++++++++----------------- pkg/service/wire.go | 10 ++++++++++ pkg/service/wire_gen.go | 17 ++++++++++++++++- 5 files changed, 45 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index c9f0b50ce..6298b6a74 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v1.0.2-0.20220824112019-a1c2809ddc67 + github.com/livekit/protocol v1.0.2-0.20220831180559-284d6b27297a github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 github.com/mackerelio/go-osstat v0.2.3 github.com/magefile/mage v1.13.0 @@ -92,7 +92,7 @@ require ( golang.org/x/tools v0.1.10 // indirect golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect - google.golang.org/grpc v1.48.0 // indirect + google.golang.org/grpc v1.49.0 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index d3a7db3b3..b0ea47318 100644 --- a/go.sum +++ b/go.sum @@ -240,8 +240,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v1.0.2-0.20220824112019-a1c2809ddc67 h1:mh2hHoW2nMaqrEXPCy0G8utMTv8AbqFvzpetJ4YzYzw= -github.com/livekit/protocol v1.0.2-0.20220824112019-a1c2809ddc67/go.mod h1:M+JZ29cEbUozppVg3LlDC9rZPE5d218Dft3h1qBP1qM= +github.com/livekit/protocol v1.0.2-0.20220831180559-284d6b27297a h1:UY0lVZdEixOguIApMxWRqLEYj7tSJDJguvzhJaPtl/Y= +github.com/livekit/protocol v1.0.2-0.20220831180559-284d6b27297a/go.mod h1:ykRtMmaq4blqGyLWWPtYkB/74JYsyK7N2DAXLGnfSa4= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw= @@ -431,7 +431,6 @@ go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= -go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U= go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -756,8 +755,8 @@ google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= -google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w= -google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= +google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw= +google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/service/ingress.go b/pkg/service/ingress.go index dd0934e40..757c460eb 100644 --- a/pkg/service/ingress.go +++ b/pkg/service/ingress.go @@ -17,7 +17,7 @@ import ( type IngressService struct { conf *config.IngressConfig - rpc ingress.RPC + rpcClient ingress.RPCClient store IngressStore roomService livekit.RoomService telemetry telemetry.TelemetryService @@ -25,16 +25,16 @@ type IngressService struct { } func NewIngressService( - conf *config.Config, - rpc ingress.RPC, + conf *config.IngressConfig, + rpcClient ingress.RPCClient, store IngressStore, rs livekit.RoomService, ts telemetry.TelemetryService, ) *IngressService { return &IngressService{ - conf: &conf.Ingress, - rpc: rpc, + conf: conf, + rpcClient: rpcClient, store: store, roomService: rs, telemetry: ts, @@ -43,7 +43,7 @@ func NewIngressService( } func (s *IngressService) Start() { - if s.rpc != nil { + if s.rpcClient != nil { go s.updateWorker() go s.entitiesWorker() } @@ -98,7 +98,7 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI return nil, twirpAuthError(ErrPermissionDenied) } - if s.rpc == nil { + if s.rpcClient == nil { return nil, ErrIngressNotConnected } @@ -135,7 +135,7 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI case livekit.IngressState_ENDPOINT_BUFFERING, livekit.IngressState_ENDPOINT_PUBLISHING: - info, err = s.rpc.SendRequest(ctx, &livekit.IngressRequest{ + info, err = s.rpcClient.SendRequest(ctx, &livekit.IngressRequest{ IngressId: req.IngressId, Request: &livekit.IngressRequest_Update{Update: req}, }) @@ -163,10 +163,6 @@ func (s *IngressService) ListIngress(ctx context.Context, req *livekit.ListIngre return nil, twirpAuthError(ErrPermissionDenied) } - if s.rpc == nil { - return nil, ErrIngressNotConnected - } - infos, err := s.store.ListIngress(ctx, livekit.RoomName(req.RoomName)) if err != nil { logger.Errorw("could not list ingress info", err) @@ -181,7 +177,7 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI return nil, twirpAuthError(err) } - if s.rpc == nil { + if s.rpcClient == nil { return nil, ErrIngressNotConnected } @@ -193,7 +189,7 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI switch info.State.Status { case livekit.IngressState_ENDPOINT_BUFFERING, livekit.IngressState_ENDPOINT_PUBLISHING: - info, err = s.rpc.SendRequest(ctx, &livekit.IngressRequest{ + info, err = s.rpcClient.SendRequest(ctx, &livekit.IngressRequest{ IngressId: req.IngressId, Request: &livekit.IngressRequest_Delete{Delete: req}, }) @@ -214,7 +210,7 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI } func (s *IngressService) updateWorker() { - sub, err := s.rpc.GetUpdateChannel(context.Background()) + sub, err := s.rpcClient.GetUpdateChannel(context.Background()) if err != nil { logger.Errorw("failed to subscribe to results channel", err) return @@ -246,7 +242,7 @@ func (s *IngressService) updateWorker() { } func (s *IngressService) entitiesWorker() { - sub, err := s.rpc.GetEntityChannel(context.Background()) + sub, err := s.rpcClient.GetEntityChannel(context.Background()) if err != nil { logger.Errorw("failed to subscribe to entities channel", err) return @@ -273,7 +269,7 @@ func (s *IngressService) entitiesWorker() { } else { err = errors.New("request needs to specity either IngressId or StreamKey") } - err = s.rpc.SendResponse(context.Background(), req, info, err) + err = s.rpcClient.SendGetIngressInfoResponse(context.Background(), req, &livekit.GetIngressInfoResponse{Info: info}, err) if err != nil { logger.Errorw("could not send response", err) } diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 836d796c6..00a9939b0 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -48,6 +48,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live NewEgressService, ingress.NewRedisRPC, getIngressStore, + getIngressConfig, + getIngressRPCClient, NewIngressService, NewRoomAllocator, NewRoomService, @@ -189,6 +191,14 @@ func getIngressStore(s ObjectStore) IngressStore { } } +func getIngressConfig(conf *config.Config) *config.IngressConfig { + return &conf.Ingress +} + +func getIngressRPCClient(rpc ingress.RPC) ingress.RPCClient { + return rpc +} + func createClientConfiguration() clientconfiguration.ClientConfigurationManager { return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations) } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index dc3ec335f..274765335 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -63,9 +63,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live analyticsService := telemetry.NewAnalyticsService(conf, currentNode) telemetryService := telemetry.NewTelemetryService(notifier, analyticsService) egressService := NewEgressService(rpcClient, objectStore, egressStore, roomService, telemetryService) + ingressConfig := getIngressConfig(conf) rpc := ingress.NewRedisRPC(nodeID, client) + ingressRPCClient := getIngressRPCClient(rpc) + rpcServer := getIngressRPCServer(rpc) ingressStore := getIngressStore(objectStore) - ingressService := NewIngressService(conf, rpc, ingressStore, roomService, telemetryService) + ingressService := NewIngressService(ingressConfig, ingressRPCClient, rpcServer, ingressStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode) clientConfigurationManager := createClientConfiguration() roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager) @@ -215,6 +218,18 @@ func getIngressStore(s ObjectStore) IngressStore { } } +func getIngressConfig(conf *config.Config) *config.IngressConfig { + return &conf.Ingress +} + +func getIngressRPCClient(rpc ingress.RPC) ingress.RPCClient { + return rpc +} + +func getIngressRPCServer(rpc ingress.RPC) ingress.RPCServer { + return rpc +} + func createClientConfiguration() clientconfiguration.ClientConfigurationManager { return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations) }