diff --git a/pkg/config/config.go b/pkg/config/config.go index 7eadbcc40..52ef6a079 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -61,7 +61,7 @@ type Config struct { KeyFile string `yaml:"key_file,omitempty"` Keys map[string]string `yaml:"keys,omitempty"` Region string `yaml:"region,omitempty"` - UsePSRPCSignal bool `yaml:"use_psrpc_signal,omitempty"` + SignalRelay SignalRelayConfig `yaml:"signal_relay,omitempty"` // LogLevel is deprecated LogLevel string `yaml:"log_level,omitempty"` Logging LoggingConfig `yaml:"logging,omitempty"` @@ -228,6 +228,13 @@ type NodeSelectorConfig struct { Regions []RegionConfig `yaml:"regions"` } +type SignalRelayConfig struct { + Enabled bool `yaml:"enabled"` + MaxAttempts int `yaml:"max_attempts"` + Timeout time.Duration `yaml:"timeout"` + Backoff time.Duration `yaml:"backoff"` +} + // RegionConfig lists available regions and their latitude/longitude, so the selector would prefer // regions that are closer type RegionConfig struct { @@ -397,6 +404,12 @@ func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []c SysloadLimit: 0.9, CPULoadLimit: 0.9, }, + SignalRelay: SignalRelayConfig{ + Enabled: false, + MaxAttempts: 3, + Timeout: 500 * time.Millisecond, + Backoff: 500 * time.Millisecond, + }, Keys: map[string]string{}, } diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index d7968e413..20c090809 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -61,17 +61,6 @@ type RTCMessageCallback func( msg *livekit.RTCNodeMessage, ) -type NewSignalClientCallabck func( - roomName livekit.RoomName, - pi ParticipantInit, - nodeID livekit.NodeID, -) ( - connectionID livekit.ConnectionID, - reqSink MessageSink, - resSource MessageSource, - err error, -) - // Router allows multiple nodes to coordinate the participant session // //counterfeiter:generate . Router diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 4acbbdff4..be1a8e025 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -50,7 +50,7 @@ func NewRedisRouter(config *config.Config, lr *LocalRouter, rc redis.UniversalCl rr := &RedisRouter{ LocalRouter: lr, rc: rc, - usePSRPCSignal: config.UsePSRPCSignal, + usePSRPCSignal: config.SignalRelay.Enabled, } rr.ctx, rr.cancel = context.WithCancel(context.Background()) return rr diff --git a/pkg/routing/signal.go b/pkg/routing/signal.go index c5cede85e..4a7eaba8f 100644 --- a/pkg/routing/signal.go +++ b/pkg/routing/signal.go @@ -5,11 +5,13 @@ import ( "google.golang.org/protobuf/proto" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" "github.com/livekit/psrpc" + "github.com/livekit/psrpc/middleware" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate @@ -24,8 +26,13 @@ type signalClient struct { client rpc.TypedSignalClient } -func NewSignalClient(nodeID livekit.NodeID, bus psrpc.MessageBus) (SignalClient, error) { - c, err := rpc.NewTypedSignalClient(nodeID, bus) +func NewSignalClient(nodeID livekit.NodeID, bus psrpc.MessageBus, config config.SignalRelayConfig) (SignalClient, error) { + ri := middleware.NewStreamRetryInterceptorFactory(middleware.RetryOptions{ + MaxAttempts: config.MaxAttempts, + Timeout: config.Timeout, + Backoff: config.Backoff, + }) + c, err := rpc.NewTypedSignalClient(nodeID, bus, psrpc.WithClientStreamInterceptors(ri)) if err != nil { return nil, err } diff --git a/pkg/service/signal.go b/pkg/service/signal.go index abf9b05dd..6e4b17b32 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -6,12 +6,14 @@ import ( "github.com/pkg/errors" "google.golang.org/protobuf/proto" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/psrpc" + "github.com/livekit/psrpc/middleware" ) type SessionHandler func( @@ -31,9 +33,15 @@ func NewSignalServer( nodeID livekit.NodeID, region string, bus psrpc.MessageBus, + config config.SignalRelayConfig, sessionHandler SessionHandler, ) (*SignalServer, error) { - s, err := rpc.NewTypedSignalServer(nodeID, &signalService{region, sessionHandler}, bus) + ri := middleware.NewStreamRetryInterceptorFactory(middleware.RetryOptions{ + MaxAttempts: config.MaxAttempts, + Timeout: config.Timeout, + Backoff: config.Backoff, + }) + s, err := rpc.NewTypedSignalServer(nodeID, &signalService{region, sessionHandler}, bus, psrpc.WithServerStreamInterceptors(ri)) if err != nil { return nil, err } @@ -48,6 +56,7 @@ func NewSignalServer( func NewDefaultSignalServer( currentNode routing.LocalNode, bus psrpc.MessageBus, + config config.SignalRelayConfig, router routing.Router, roomManager *RoomManager, ) (r *SignalServer, err error) { @@ -63,7 +72,7 @@ func NewDefaultSignalServer( return roomManager.StartSession(ctx, roomName, pi, requestSource, responseSink) } - return NewSignalServer(livekit.NodeID(currentNode.Id), currentNode.Region, bus, sessionHandler) + return NewSignalServer(livekit.NodeID(currentNode.Id), currentNode.Region, bus, config, sessionHandler) } func (r *SignalServer) Stop() { diff --git a/pkg/service/wire.go b/pkg/service/wire.go index f010c7d3a..67310da29 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -57,6 +57,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live NewRoomAllocator, NewRoomService, NewRTCService, + getSignalRelayConfig, NewDefaultSignalServer, routing.NewSignalClient, NewLocalRoomManager, @@ -73,6 +74,7 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi createRedisClient, getNodeID, getMessageBus, + getSignalRelayConfig, routing.NewSignalClient, routing.CreateRouter, ) @@ -184,6 +186,10 @@ func getRoomConf(config *config.Config) config.RoomConfig { return config.Room } +func getSignalRelayConfig(config *config.Config) config.SignalRelayConfig { + return config.SignalRelay +} + func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { return NewTurnServer(conf, authHandler, false) } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 9231a74d2..080858485 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -42,7 +42,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } nodeID := getNodeID(currentNode) messageBus := getMessageBus(universalClient) - signalClient, err := routing.NewSignalClient(nodeID, messageBus) + signalRelayConfig := getSignalRelayConfig(conf) + signalClient, err := routing.NewSignalClient(nodeID, messageBus, signalRelayConfig) if err != nil { return nil, err } @@ -92,7 +93,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - signalServer, err := NewDefaultSignalServer(currentNode, messageBus, router, roomManager) + signalServer, err := NewDefaultSignalServer(currentNode, messageBus, signalRelayConfig, router, roomManager) if err != nil { return nil, err } @@ -115,7 +116,8 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi } nodeID := getNodeID(currentNode) messageBus := getMessageBus(universalClient) - signalClient, err := routing.NewSignalClient(nodeID, messageBus) + signalRelayConfig := getSignalRelayConfig(conf) + signalClient, err := routing.NewSignalClient(nodeID, messageBus, signalRelayConfig) if err != nil { return nil, err } @@ -229,6 +231,10 @@ func getRoomConf(config2 *config.Config) config.RoomConfig { return config2.Room } +func getSignalRelayConfig(config2 *config.Config) config.SignalRelayConfig { + return config2.SignalRelay +} + func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { return NewTurnServer(conf, authHandler, false) }