add stream retry middleware for signalling (#1503)

This commit is contained in:
Paul Wells
2023-03-08 00:51:19 -08:00
committed by GitHub
parent 1f4fd6aafe
commit 2c93d55e5c
7 changed files with 50 additions and 20 deletions
+14 -1
View File
@@ -61,7 +61,7 @@ type Config struct {
KeyFile string `yaml:"key_file,omitempty"`
Keys map[string]string `yaml:"keys,omitempty"`
Region string `yaml:"region,omitempty"`
UsePSRPCSignal bool `yaml:"use_psrpc_signal,omitempty"`
SignalRelay SignalRelayConfig `yaml:"signal_relay,omitempty"`
// LogLevel is deprecated
LogLevel string `yaml:"log_level,omitempty"`
Logging LoggingConfig `yaml:"logging,omitempty"`
@@ -228,6 +228,13 @@ type NodeSelectorConfig struct {
Regions []RegionConfig `yaml:"regions"`
}
type SignalRelayConfig struct {
Enabled bool `yaml:"enabled"`
MaxAttempts int `yaml:"max_attempts"`
Timeout time.Duration `yaml:"timeout"`
Backoff time.Duration `yaml:"backoff"`
}
// RegionConfig lists available regions and their latitude/longitude, so the selector would prefer
// regions that are closer
type RegionConfig struct {
@@ -397,6 +404,12 @@ func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []c
SysloadLimit: 0.9,
CPULoadLimit: 0.9,
},
SignalRelay: SignalRelayConfig{
Enabled: false,
MaxAttempts: 3,
Timeout: 500 * time.Millisecond,
Backoff: 500 * time.Millisecond,
},
Keys: map[string]string{},
}
-11
View File
@@ -61,17 +61,6 @@ type RTCMessageCallback func(
msg *livekit.RTCNodeMessage,
)
type NewSignalClientCallabck func(
roomName livekit.RoomName,
pi ParticipantInit,
nodeID livekit.NodeID,
) (
connectionID livekit.ConnectionID,
reqSink MessageSink,
resSource MessageSource,
err error,
)
// Router allows multiple nodes to coordinate the participant session
//
//counterfeiter:generate . Router
+1 -1
View File
@@ -50,7 +50,7 @@ func NewRedisRouter(config *config.Config, lr *LocalRouter, rc redis.UniversalCl
rr := &RedisRouter{
LocalRouter: lr,
rc: rc,
usePSRPCSignal: config.UsePSRPCSignal,
usePSRPCSignal: config.SignalRelay.Enabled,
}
rr.ctx, rr.cancel = context.WithCancel(context.Background())
return rr
+9 -2
View File
@@ -5,11 +5,13 @@ import (
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
"github.com/livekit/psrpc/middleware"
)
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
@@ -24,8 +26,13 @@ type signalClient struct {
client rpc.TypedSignalClient
}
func NewSignalClient(nodeID livekit.NodeID, bus psrpc.MessageBus) (SignalClient, error) {
c, err := rpc.NewTypedSignalClient(nodeID, bus)
func NewSignalClient(nodeID livekit.NodeID, bus psrpc.MessageBus, config config.SignalRelayConfig) (SignalClient, error) {
ri := middleware.NewStreamRetryInterceptorFactory(middleware.RetryOptions{
MaxAttempts: config.MaxAttempts,
Timeout: config.Timeout,
Backoff: config.Backoff,
})
c, err := rpc.NewTypedSignalClient(nodeID, bus, psrpc.WithClientStreamInterceptors(ri))
if err != nil {
return nil, err
}
+11 -2
View File
@@ -6,12 +6,14 @@ import (
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"
"github.com/livekit/psrpc/middleware"
)
type SessionHandler func(
@@ -31,9 +33,15 @@ func NewSignalServer(
nodeID livekit.NodeID,
region string,
bus psrpc.MessageBus,
config config.SignalRelayConfig,
sessionHandler SessionHandler,
) (*SignalServer, error) {
s, err := rpc.NewTypedSignalServer(nodeID, &signalService{region, sessionHandler}, bus)
ri := middleware.NewStreamRetryInterceptorFactory(middleware.RetryOptions{
MaxAttempts: config.MaxAttempts,
Timeout: config.Timeout,
Backoff: config.Backoff,
})
s, err := rpc.NewTypedSignalServer(nodeID, &signalService{region, sessionHandler}, bus, psrpc.WithServerStreamInterceptors(ri))
if err != nil {
return nil, err
}
@@ -48,6 +56,7 @@ func NewSignalServer(
func NewDefaultSignalServer(
currentNode routing.LocalNode,
bus psrpc.MessageBus,
config config.SignalRelayConfig,
router routing.Router,
roomManager *RoomManager,
) (r *SignalServer, err error) {
@@ -63,7 +72,7 @@ func NewDefaultSignalServer(
return roomManager.StartSession(ctx, roomName, pi, requestSource, responseSink)
}
return NewSignalServer(livekit.NodeID(currentNode.Id), currentNode.Region, bus, sessionHandler)
return NewSignalServer(livekit.NodeID(currentNode.Id), currentNode.Region, bus, config, sessionHandler)
}
func (r *SignalServer) Stop() {
+6
View File
@@ -57,6 +57,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
NewRoomAllocator,
NewRoomService,
NewRTCService,
getSignalRelayConfig,
NewDefaultSignalServer,
routing.NewSignalClient,
NewLocalRoomManager,
@@ -73,6 +74,7 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi
createRedisClient,
getNodeID,
getMessageBus,
getSignalRelayConfig,
routing.NewSignalClient,
routing.CreateRouter,
)
@@ -184,6 +186,10 @@ func getRoomConf(config *config.Config) config.RoomConfig {
return config.Room
}
func getSignalRelayConfig(config *config.Config) config.SignalRelayConfig {
return config.SignalRelay
}
func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) {
return NewTurnServer(conf, authHandler, false)
}
+9 -3
View File
@@ -42,7 +42,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
nodeID := getNodeID(currentNode)
messageBus := getMessageBus(universalClient)
signalClient, err := routing.NewSignalClient(nodeID, messageBus)
signalRelayConfig := getSignalRelayConfig(conf)
signalClient, err := routing.NewSignalClient(nodeID, messageBus, signalRelayConfig)
if err != nil {
return nil, err
}
@@ -92,7 +93,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
signalServer, err := NewDefaultSignalServer(currentNode, messageBus, router, roomManager)
signalServer, err := NewDefaultSignalServer(currentNode, messageBus, signalRelayConfig, router, roomManager)
if err != nil {
return nil, err
}
@@ -115,7 +116,8 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi
}
nodeID := getNodeID(currentNode)
messageBus := getMessageBus(universalClient)
signalClient, err := routing.NewSignalClient(nodeID, messageBus)
signalRelayConfig := getSignalRelayConfig(conf)
signalClient, err := routing.NewSignalClient(nodeID, messageBus, signalRelayConfig)
if err != nil {
return nil, err
}
@@ -229,6 +231,10 @@ func getRoomConf(config2 *config.Config) config.RoomConfig {
return config2.Room
}
func getSignalRelayConfig(config2 *config.Config) config.SignalRelayConfig {
return config2.SignalRelay
}
func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) {
return NewTurnServer(conf, authHandler, false)
}