Adopt Ingress RPC interface changes (#972)

This commit is contained in:
Benjamin Pracht
2022-08-31 14:14:40 -07:00
committed by GitHub
parent c401ca58af
commit d8edb9b2e7
5 changed files with 45 additions and 25 deletions

4
go.mod
View File

@@ -16,7 +16,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/go-version v1.6.0
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v1.0.2-0.20220824112019-a1c2809ddc67
github.com/livekit/protocol v1.0.2-0.20220831180559-284d6b27297a
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995
github.com/mackerelio/go-osstat v0.2.3
github.com/magefile/mage v1.13.0
@@ -92,7 +92,7 @@ require (
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect
google.golang.org/grpc v1.48.0 // indirect
google.golang.org/grpc v1.49.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

9
go.sum
View File

@@ -240,8 +240,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v1.0.2-0.20220824112019-a1c2809ddc67 h1:mh2hHoW2nMaqrEXPCy0G8utMTv8AbqFvzpetJ4YzYzw=
github.com/livekit/protocol v1.0.2-0.20220824112019-a1c2809ddc67/go.mod h1:M+JZ29cEbUozppVg3LlDC9rZPE5d218Dft3h1qBP1qM=
github.com/livekit/protocol v1.0.2-0.20220831180559-284d6b27297a h1:UY0lVZdEixOguIApMxWRqLEYj7tSJDJguvzhJaPtl/Y=
github.com/livekit/protocol v1.0.2-0.20220831180559-284d6b27297a/go.mod h1:ykRtMmaq4blqGyLWWPtYkB/74JYsyK7N2DAXLGnfSa4=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U=
github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw=
@@ -431,7 +431,6 @@ go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U=
go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -756,8 +755,8 @@ google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w=
google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw=
google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

View File

@@ -17,7 +17,7 @@ import (
type IngressService struct {
conf *config.IngressConfig
rpc ingress.RPC
rpcClient ingress.RPCClient
store IngressStore
roomService livekit.RoomService
telemetry telemetry.TelemetryService
@@ -25,16 +25,16 @@ type IngressService struct {
}
func NewIngressService(
conf *config.Config,
rpc ingress.RPC,
conf *config.IngressConfig,
rpcClient ingress.RPCClient,
store IngressStore,
rs livekit.RoomService,
ts telemetry.TelemetryService,
) *IngressService {
return &IngressService{
conf: &conf.Ingress,
rpc: rpc,
conf: conf,
rpcClient: rpcClient,
store: store,
roomService: rs,
telemetry: ts,
@@ -43,7 +43,7 @@ func NewIngressService(
}
func (s *IngressService) Start() {
if s.rpc != nil {
if s.rpcClient != nil {
go s.updateWorker()
go s.entitiesWorker()
}
@@ -98,7 +98,7 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI
return nil, twirpAuthError(ErrPermissionDenied)
}
if s.rpc == nil {
if s.rpcClient == nil {
return nil, ErrIngressNotConnected
}
@@ -135,7 +135,7 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI
case livekit.IngressState_ENDPOINT_BUFFERING,
livekit.IngressState_ENDPOINT_PUBLISHING:
info, err = s.rpc.SendRequest(ctx, &livekit.IngressRequest{
info, err = s.rpcClient.SendRequest(ctx, &livekit.IngressRequest{
IngressId: req.IngressId,
Request: &livekit.IngressRequest_Update{Update: req},
})
@@ -163,10 +163,6 @@ func (s *IngressService) ListIngress(ctx context.Context, req *livekit.ListIngre
return nil, twirpAuthError(ErrPermissionDenied)
}
if s.rpc == nil {
return nil, ErrIngressNotConnected
}
infos, err := s.store.ListIngress(ctx, livekit.RoomName(req.RoomName))
if err != nil {
logger.Errorw("could not list ingress info", err)
@@ -181,7 +177,7 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI
return nil, twirpAuthError(err)
}
if s.rpc == nil {
if s.rpcClient == nil {
return nil, ErrIngressNotConnected
}
@@ -193,7 +189,7 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI
switch info.State.Status {
case livekit.IngressState_ENDPOINT_BUFFERING,
livekit.IngressState_ENDPOINT_PUBLISHING:
info, err = s.rpc.SendRequest(ctx, &livekit.IngressRequest{
info, err = s.rpcClient.SendRequest(ctx, &livekit.IngressRequest{
IngressId: req.IngressId,
Request: &livekit.IngressRequest_Delete{Delete: req},
})
@@ -214,7 +210,7 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI
}
func (s *IngressService) updateWorker() {
sub, err := s.rpc.GetUpdateChannel(context.Background())
sub, err := s.rpcClient.GetUpdateChannel(context.Background())
if err != nil {
logger.Errorw("failed to subscribe to results channel", err)
return
@@ -246,7 +242,7 @@ func (s *IngressService) updateWorker() {
}
func (s *IngressService) entitiesWorker() {
sub, err := s.rpc.GetEntityChannel(context.Background())
sub, err := s.rpcClient.GetEntityChannel(context.Background())
if err != nil {
logger.Errorw("failed to subscribe to entities channel", err)
return
@@ -273,7 +269,7 @@ func (s *IngressService) entitiesWorker() {
} else {
err = errors.New("request needs to specity either IngressId or StreamKey")
}
err = s.rpc.SendResponse(context.Background(), req, info, err)
err = s.rpcClient.SendGetIngressInfoResponse(context.Background(), req, &livekit.GetIngressInfoResponse{Info: info}, err)
if err != nil {
logger.Errorw("could not send response", err)
}

View File

@@ -48,6 +48,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
NewEgressService,
ingress.NewRedisRPC,
getIngressStore,
getIngressConfig,
getIngressRPCClient,
NewIngressService,
NewRoomAllocator,
NewRoomService,
@@ -189,6 +191,14 @@ func getIngressStore(s ObjectStore) IngressStore {
}
}
func getIngressConfig(conf *config.Config) *config.IngressConfig {
return &conf.Ingress
}
func getIngressRPCClient(rpc ingress.RPC) ingress.RPCClient {
return rpc
}
func createClientConfiguration() clientconfiguration.ClientConfigurationManager {
return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations)
}

View File

@@ -63,9 +63,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
analyticsService := telemetry.NewAnalyticsService(conf, currentNode)
telemetryService := telemetry.NewTelemetryService(notifier, analyticsService)
egressService := NewEgressService(rpcClient, objectStore, egressStore, roomService, telemetryService)
ingressConfig := getIngressConfig(conf)
rpc := ingress.NewRedisRPC(nodeID, client)
ingressRPCClient := getIngressRPCClient(rpc)
rpcServer := getIngressRPCServer(rpc)
ingressStore := getIngressStore(objectStore)
ingressService := NewIngressService(conf, rpc, ingressStore, roomService, telemetryService)
ingressService := NewIngressService(ingressConfig, ingressRPCClient, rpcServer, ingressStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode)
clientConfigurationManager := createClientConfiguration()
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager)
@@ -215,6 +218,18 @@ func getIngressStore(s ObjectStore) IngressStore {
}
}
func getIngressConfig(conf *config.Config) *config.IngressConfig {
return &conf.Ingress
}
func getIngressRPCClient(rpc ingress.RPC) ingress.RPCClient {
return rpc
}
func getIngressRPCServer(rpc ingress.RPC) ingress.RPCServer {
return rpc
}
func createClientConfiguration() clientconfiguration.ClientConfigurationManager {
return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations)
}