From 7778cdf2cd74ad27b2fe680a59166137ffdde6bc Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Fri, 30 Dec 2022 09:32:55 +1300 Subject: [PATCH] Do not use the egress version stored in redis to decide whether to enable PsRPC. Use a conf entry instead (#1262) --- config-sample.yaml | 6 +++++ pkg/config/config.go | 5 ++++ pkg/service/egress.go | 54 +++++++++++++++++++-------------------- pkg/service/interfaces.go | 1 - pkg/service/redisstore.go | 24 +---------------- pkg/service/wire.go | 10 +++++++- pkg/service/wire_gen.go | 14 +++++++--- 7 files changed, 59 insertions(+), 55 deletions(-) diff --git a/config-sample.yaml b/config-sample.yaml index 1cb57b590..d56f263cc 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -200,6 +200,12 @@ keys: # # ingress info # rtmp_base_url: "rtmp://my.domain.com/live" +# egress server +# egress: +# # Whether to use the PSRPC enabled RPC implementation. This requires livekit egress version >=1.5.4 +# # The legacy, non PSRPC RPC implementation will be removed eventually +# use_psrpc: false + # Region of the current node. Required if using regionaware node selector # region: us-west-2 diff --git a/pkg/config/config.go b/pkg/config/config.go index 99787d08d..ac13b2c35 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -52,6 +52,7 @@ type Config struct { Video VideoConfig `yaml:"video,omitempty"` Room RoomConfig `yaml:"room,omitempty"` TURN TURNConfig `yaml:"turn,omitempty"` + Egress EgressConfig `yaml:"egress,omitempty"` Ingress IngressConfig `yaml:"ingress,omitempty"` WebHook WebHookConfig `yaml:"webhook,omitempty"` NodeSelector NodeSelectorConfig `yaml:"node_selector,omitempty"` @@ -234,6 +235,10 @@ type LimitConfig struct { BytesPerSec float32 `yaml:"bytes_per_sec"` } +type EgressConfig struct { + UsePsRPC bool `yaml:"use_psrpc"` +} + type IngressConfig struct { RTMPBaseURL string `yaml:"rtmp_base_url"` } diff --git a/pkg/service/egress.go b/pkg/service/egress.go index e04debd73..f3ff39974 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -33,7 +33,6 @@ type EgressService struct { type egressLauncher struct { psrpcClient rpc.EgressClient clientDeprecated egress.RPCClient - usePSRPC bool es EgressStore telemetry telemetry.TelemetryService } @@ -81,7 +80,7 @@ func (s *EgressService) Start() error { } s.shutdown = make(chan struct{}) - if s.psrpcClient != nil && s.es != nil { + if (s.psrpcClient != nil || s.clientDeprecated != nil) && s.es != nil { return s.startWorker() } @@ -184,14 +183,10 @@ func (s *egressLauncher) StartEgress(ctx context.Context, req *livekit.StartEgre var info *livekit.EgressInfo var err error - if !s.usePSRPC { - s.usePSRPC = s.es.UsePSRPC() - } - - if s.usePSRPC { + if s.psrpcClient != nil { info, err = s.psrpcClient.StartEgress(ctx, req) } else { - logger.Warnw("Using deprecated egress client. Please upgrade egress to v >=1.5.4", nil) + logger.Infow("Using deprecated egress client.", nil) info, err = s.clientDeprecated.SendRequest(ctx, req) } if err != nil { @@ -217,7 +212,7 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.psrpcClient == nil { + if s.psrpcClient == nil && s.clientDeprecated == nil { return nil, ErrEgressNotConnected } @@ -253,7 +248,7 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr return nil, twirpAuthError(err) } - if s.psrpcClient == nil { + if s.psrpcClient == nil && s.clientDeprecated == nil { return nil, ErrEgressNotConnected } @@ -290,7 +285,7 @@ func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressR if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.psrpcClient == nil { + if s.psrpcClient == nil && s.clientDeprecated == nil { return nil, ErrEgressNotConnected } @@ -308,7 +303,7 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR return nil, twirpAuthError(err) } - if s.psrpcClient == nil { + if s.psrpcClient == nil && s.clientDeprecated == nil { return nil, ErrEgressNotConnected } @@ -356,22 +351,24 @@ func (s *EgressService) startWorker() error { return err } - go func() { - sub, err := s.psrpcClient.SubscribeInfoUpdate(context.Background()) - if err != nil { - logger.Errorw("failed to subscribe", err) - } - - for { - select { - case info := <-sub.Channel(): - s.handleUpdate(info) - case <-s.shutdown: - _ = sub.Close() - return + if s.psrpcClient != nil { + go func() { + sub, err := s.psrpcClient.SubscribeInfoUpdate(context.Background()) + if err != nil { + logger.Errorw("failed to subscribe", err) } - } - }() + + for { + select { + case info := <-sub.Channel(): + s.handleUpdate(info) + case <-s.shutdown: + _ = sub.Close() + return + } + } + }() + } if s.clientDeprecated != nil { go func() { @@ -439,6 +436,9 @@ func (s *EgressService) getFirst(f0, f1 func() (*livekit.EgressInfo, error)) (*l if s.clientDeprecated == nil { return f1() } + if s.psrpcClient == nil { + return f0() + } type res struct { info *livekit.EgressInfo diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 7739adecd..c2b9876e6 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -44,7 +44,6 @@ type EgressStore interface { LoadEgress(ctx context.Context, egressID string) (*livekit.EgressInfo, error) ListEgress(ctx context.Context, roomName livekit.RoomName) ([]*livekit.EgressInfo, error) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error - UsePSRPC() bool } //counterfeiter:generate . IngressStore diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index c0bdad1cc..d69438ee4 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -20,8 +20,7 @@ import ( ) const ( - VersionKey = "livekit_version" - EgressVersionKey = "egress_version" + VersionKey = "livekit_version" // RoomsKey is hash of room_name => Room proto RoomsKey = "rooms" @@ -419,27 +418,6 @@ func (s *RedisStore) UpdateEgress(_ context.Context, info *livekit.EgressInfo) e return nil } -func (s *RedisStore) UsePSRPC() bool { - egressVersion, err := s.rc.Get(s.ctx, EgressVersionKey).Result() - if err != nil || egressVersion == "" { - return false - } - v, _ := goversion.NewVersion(egressVersion) - minVersion, _ := goversion.NewVersion("1.5.4") - return v.GreaterThanOrEqual(minVersion) -} - -func (s *RedisStore) GetEgressVersion(_ context.Context) (*goversion.Version, error) { - egressVersion, err := s.rc.Get(s.ctx, EgressVersionKey).Result() - if err != nil && err != redis.Nil { - return nil, err - } - if egressVersion == "" { - egressVersion = "0.0.0" - } - return goversion.NewVersion(egressVersion) -} - // Deletes egress info 24h after the egress has ended func (s *RedisStore) egressWorker() { ticker := time.NewTicker(time.Minute * 30) diff --git a/pkg/service/wire.go b/pkg/service/wire.go index e2306d2a6..75ff9acfa 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -45,7 +45,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live telemetry.NewAnalyticsService, telemetry.NewTelemetryService, getMessageBus, - rpc.NewEgressClient, + getEgressClient, egress.NewRedisRPCClient, getEgressStore, NewEgressLauncher, @@ -141,6 +141,14 @@ func getMessageBus(rc redis.UniversalClient) psrpc.MessageBus { return psrpc.NewRedisMessageBus(rc) } +func getEgressClient(conf *config.Config, nodeID livekit.NodeID, bus psrpc.MessageBus) (rpc.EgressClient, error) { + if conf.Egress.UsePsRPC { + return rpc.NewEgressClient(nodeID, bus) + } + + return nil, nil +} + func getEgressStore(s ObjectStore) EgressStore { switch store := s.(type) { case *RedisStore: diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index fd5af95cc..3a5730345 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -48,7 +48,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } nodeID := getNodeID(currentNode) messageBus := getMessageBus(universalClient) - egressClient, err := rpc.NewEgressClient(nodeID, messageBus) + egressClient, err := getEgressClient(conf, nodeID, messageBus) if err != nil { return nil, err } @@ -71,8 +71,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } egressService := NewEgressService(egressClient, rpcClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher) ingressConfig := getIngressConfig(conf) - ingressRPC := ingress.NewRedisRPC(nodeID, universalClient) - ingressRPCClient := getIngressRPCClient(ingressRPC) + rpc := ingress.NewRedisRPC(nodeID, universalClient) + ingressRPCClient := getIngressRPCClient(rpc) ingressStore := getIngressStore(objectStore) ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService) @@ -170,6 +170,14 @@ func getMessageBus(rc redis.UniversalClient) psrpc.MessageBus { return psrpc.NewRedisMessageBus(rc) } +func getEgressClient(conf *config.Config, nodeID livekit.NodeID, bus psrpc.MessageBus) (rpc.EgressClient, error) { + if conf.Egress.UsePsRPC { + return rpc.NewEgressClient(nodeID, bus) + } + + return nil, nil +} + func getEgressStore(s ObjectStore) EgressStore { switch store := s.(type) { case *RedisStore: