Do not use the egress version stored in redis to decide whether to enable PsRPC. Use a conf entry instead (#1262)

This commit is contained in:
Benjamin Pracht
2022-12-30 09:32:55 +13:00
committed by GitHub
parent 1cffa98311
commit 7778cdf2cd
7 changed files with 59 additions and 55 deletions

View File

@@ -200,6 +200,12 @@ keys:
# # ingress info
# rtmp_base_url: "rtmp://my.domain.com/live"
# egress server
# egress:
# # Whether to use the PSRPC enabled RPC implementation. This requires livekit egress version >=1.5.4
# # The legacy, non PSRPC RPC implementation will be removed eventually
# use_psrpc: false
# Region of the current node. Required if using regionaware node selector
# region: us-west-2

View File

@@ -52,6 +52,7 @@ type Config struct {
Video VideoConfig `yaml:"video,omitempty"`
Room RoomConfig `yaml:"room,omitempty"`
TURN TURNConfig `yaml:"turn,omitempty"`
Egress EgressConfig `yaml:"egress,omitempty"`
Ingress IngressConfig `yaml:"ingress,omitempty"`
WebHook WebHookConfig `yaml:"webhook,omitempty"`
NodeSelector NodeSelectorConfig `yaml:"node_selector,omitempty"`
@@ -234,6 +235,10 @@ type LimitConfig struct {
BytesPerSec float32 `yaml:"bytes_per_sec"`
}
type EgressConfig struct {
UsePsRPC bool `yaml:"use_psrpc"`
}
type IngressConfig struct {
RTMPBaseURL string `yaml:"rtmp_base_url"`
}

View File

@@ -33,7 +33,6 @@ type EgressService struct {
type egressLauncher struct {
psrpcClient rpc.EgressClient
clientDeprecated egress.RPCClient
usePSRPC bool
es EgressStore
telemetry telemetry.TelemetryService
}
@@ -81,7 +80,7 @@ func (s *EgressService) Start() error {
}
s.shutdown = make(chan struct{})
if s.psrpcClient != nil && s.es != nil {
if (s.psrpcClient != nil || s.clientDeprecated != nil) && s.es != nil {
return s.startWorker()
}
@@ -184,14 +183,10 @@ func (s *egressLauncher) StartEgress(ctx context.Context, req *livekit.StartEgre
var info *livekit.EgressInfo
var err error
if !s.usePSRPC {
s.usePSRPC = s.es.UsePSRPC()
}
if s.usePSRPC {
if s.psrpcClient != nil {
info, err = s.psrpcClient.StartEgress(ctx, req)
} else {
logger.Warnw("Using deprecated egress client. Please upgrade egress to v >=1.5.4", nil)
logger.Infow("Using deprecated egress client.", nil)
info, err = s.clientDeprecated.SendRequest(ctx, req)
}
if err != nil {
@@ -217,7 +212,7 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.psrpcClient == nil {
if s.psrpcClient == nil && s.clientDeprecated == nil {
return nil, ErrEgressNotConnected
}
@@ -253,7 +248,7 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr
return nil, twirpAuthError(err)
}
if s.psrpcClient == nil {
if s.psrpcClient == nil && s.clientDeprecated == nil {
return nil, ErrEgressNotConnected
}
@@ -290,7 +285,7 @@ func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressR
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.psrpcClient == nil {
if s.psrpcClient == nil && s.clientDeprecated == nil {
return nil, ErrEgressNotConnected
}
@@ -308,7 +303,7 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR
return nil, twirpAuthError(err)
}
if s.psrpcClient == nil {
if s.psrpcClient == nil && s.clientDeprecated == nil {
return nil, ErrEgressNotConnected
}
@@ -356,22 +351,24 @@ func (s *EgressService) startWorker() error {
return err
}
go func() {
sub, err := s.psrpcClient.SubscribeInfoUpdate(context.Background())
if err != nil {
logger.Errorw("failed to subscribe", err)
}
for {
select {
case info := <-sub.Channel():
s.handleUpdate(info)
case <-s.shutdown:
_ = sub.Close()
return
if s.psrpcClient != nil {
go func() {
sub, err := s.psrpcClient.SubscribeInfoUpdate(context.Background())
if err != nil {
logger.Errorw("failed to subscribe", err)
}
}
}()
for {
select {
case info := <-sub.Channel():
s.handleUpdate(info)
case <-s.shutdown:
_ = sub.Close()
return
}
}
}()
}
if s.clientDeprecated != nil {
go func() {
@@ -439,6 +436,9 @@ func (s *EgressService) getFirst(f0, f1 func() (*livekit.EgressInfo, error)) (*l
if s.clientDeprecated == nil {
return f1()
}
if s.psrpcClient == nil {
return f0()
}
type res struct {
info *livekit.EgressInfo

View File

@@ -44,7 +44,6 @@ type EgressStore interface {
LoadEgress(ctx context.Context, egressID string) (*livekit.EgressInfo, error)
ListEgress(ctx context.Context, roomName livekit.RoomName) ([]*livekit.EgressInfo, error)
UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error
UsePSRPC() bool
}
//counterfeiter:generate . IngressStore

View File

@@ -20,8 +20,7 @@ import (
)
const (
VersionKey = "livekit_version"
EgressVersionKey = "egress_version"
VersionKey = "livekit_version"
// RoomsKey is hash of room_name => Room proto
RoomsKey = "rooms"
@@ -419,27 +418,6 @@ func (s *RedisStore) UpdateEgress(_ context.Context, info *livekit.EgressInfo) e
return nil
}
func (s *RedisStore) UsePSRPC() bool {
egressVersion, err := s.rc.Get(s.ctx, EgressVersionKey).Result()
if err != nil || egressVersion == "" {
return false
}
v, _ := goversion.NewVersion(egressVersion)
minVersion, _ := goversion.NewVersion("1.5.4")
return v.GreaterThanOrEqual(minVersion)
}
func (s *RedisStore) GetEgressVersion(_ context.Context) (*goversion.Version, error) {
egressVersion, err := s.rc.Get(s.ctx, EgressVersionKey).Result()
if err != nil && err != redis.Nil {
return nil, err
}
if egressVersion == "" {
egressVersion = "0.0.0"
}
return goversion.NewVersion(egressVersion)
}
// Deletes egress info 24h after the egress has ended
func (s *RedisStore) egressWorker() {
ticker := time.NewTicker(time.Minute * 30)

View File

@@ -45,7 +45,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
telemetry.NewAnalyticsService,
telemetry.NewTelemetryService,
getMessageBus,
rpc.NewEgressClient,
getEgressClient,
egress.NewRedisRPCClient,
getEgressStore,
NewEgressLauncher,
@@ -141,6 +141,14 @@ func getMessageBus(rc redis.UniversalClient) psrpc.MessageBus {
return psrpc.NewRedisMessageBus(rc)
}
func getEgressClient(conf *config.Config, nodeID livekit.NodeID, bus psrpc.MessageBus) (rpc.EgressClient, error) {
if conf.Egress.UsePsRPC {
return rpc.NewEgressClient(nodeID, bus)
}
return nil, nil
}
func getEgressStore(s ObjectStore) EgressStore {
switch store := s.(type) {
case *RedisStore:

View File

@@ -48,7 +48,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
nodeID := getNodeID(currentNode)
messageBus := getMessageBus(universalClient)
egressClient, err := rpc.NewEgressClient(nodeID, messageBus)
egressClient, err := getEgressClient(conf, nodeID, messageBus)
if err != nil {
return nil, err
}
@@ -71,8 +71,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
egressService := NewEgressService(egressClient, rpcClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher)
ingressConfig := getIngressConfig(conf)
ingressRPC := ingress.NewRedisRPC(nodeID, universalClient)
ingressRPCClient := getIngressRPCClient(ingressRPC)
rpc := ingress.NewRedisRPC(nodeID, universalClient)
ingressRPCClient := getIngressRPCClient(rpc)
ingressStore := getIngressStore(objectStore)
ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService)
@@ -170,6 +170,14 @@ func getMessageBus(rc redis.UniversalClient) psrpc.MessageBus {
return psrpc.NewRedisMessageBus(rc)
}
func getEgressClient(conf *config.Config, nodeID livekit.NodeID, bus psrpc.MessageBus) (rpc.EgressClient, error) {
if conf.Egress.UsePsRPC {
return rpc.NewEgressClient(nodeID, bus)
}
return nil, nil
}
func getEgressStore(s ObjectStore) EgressStore {
switch store := s.(type) {
case *RedisStore: