diff --git a/cmd/server/main.go b/cmd/server/main.go
index 8d4a2556f..2c2b02e61 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -124,8 +124,8 @@ func main() {
 				},
 			},
 			{
-				Name:  "list-nodes",
-				Usage: "list all nodes",
+				Name:   "list-nodes",
+				Usage:  "list all nodes",
 				Action: listNodes,
 			},
 		},
@@ -200,7 +200,7 @@ func startServer(c *cli.Context) error {
 		return err
 	}
 
-	server, err := service.InitializeServer(conf, keyProvider, currentNode, &routing.RandomSelector{})
+	server, err := service.InitializeServer(conf, keyProvider, currentNode)
 	if err != nil {
 		return err
 	}
diff --git a/config-sample.yaml b/config-sample.yaml
index c03b01f22..760a74bd4 100644
--- a/config-sample.yaml
+++ b/config-sample.yaml
@@ -117,3 +117,12 @@ keys:
 #   # optional
 #   # cert_file: /path/to/cert.pem
 #   # key_file: /path/to/key.pem
+
+# # node selector
+# node_selector:
+#   # default: random. valid values: random, sysload
+#   kind: sysload
+#   # used by the sysload node selector
+#   # nodes with load per CPU at or above sysload_limit are avoided;
+#   # they are only chosen when no node is below the limit
+#   sysload_limit: 0.7
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 564d0a1d1..7f1d38c57 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -18,17 +18,18 @@ var DefaultStunServers = []string{
 }
 
 type Config struct {
-	Port           uint32            `yaml:"port"`
-	PrometheusPort uint32            `yaml:"prometheus_port"`
-	RTC            RTCConfig         `yaml:"rtc"`
-	Redis          RedisConfig       `yaml:"redis"`
-	Audio          AudioConfig       `yaml:"audio"`
-	Room           RoomConfig        `yaml:"room"`
-	TURN           TURNConfig        `yaml:"turn"`
-	WebHook        WebHookConfig     `yaml:"webhook"`
-	KeyFile        string            `yaml:"key_file"`
-	Keys           map[string]string `yaml:"keys"`
-	LogLevel       string            `yaml:"log_level"`
+	Port           uint32             `yaml:"port"`
+	PrometheusPort uint32             `yaml:"prometheus_port"`
+	RTC            RTCConfig          `yaml:"rtc"`
+	Redis          RedisConfig        `yaml:"redis"`
+	Audio          AudioConfig        `yaml:"audio"`
+	Room           RoomConfig         `yaml:"room"`
+	TURN           TURNConfig         `yaml:"turn"`
+	WebHook        WebHookConfig      `yaml:"webhook"`
+	NodeSelector   NodeSelectorConfig `yaml:"node_selector"`
+	KeyFile        string             `yaml:"key_file"`
+	Keys           map[string]string  `yaml:"keys"`
+	LogLevel       string             `yaml:"log_level"`
 
 	Development bool `yaml:"development"`
 }
@@ -106,6 +107,14 @@ type WebHookConfig struct {
 	APIKey string `yaml:"api_key"`
 }
 
+// NodeSelectorConfig configures how the node for a room is selected.
+type NodeSelectorConfig struct {
+	// Kind names the selector implementation: "random" or "sysload".
+	Kind string `yaml:"kind"`
+	// SysloadLimit is the load-per-CPU threshold used by the sysload selector.
+	SysloadLimit float32 `yaml:"sysload_limit"`
+}
+
 func NewConfig(confString string, c *cli.Context) (*Config, error) {
 	// start with defaults
 	conf := &Config{
@@ -145,6 +154,10 @@ func NewConfig(confString string, c *cli.Context) (*Config, error) {
 		TURN: TURNConfig{
 			Enabled: false,
 		},
+		NodeSelector: NodeSelectorConfig{
+			Kind:         "random",
+			SysloadLimit: 0.7,
+		},
 		Keys: map[string]string{},
 	}
 	if confString != "" {
diff --git a/pkg/routing/randomselector.go b/pkg/routing/selectorrandom.go
similarity index 100%
rename from pkg/routing/randomselector.go
rename to pkg/routing/selectorrandom.go
diff --git a/pkg/routing/selectorsystemload.go b/pkg/routing/selectorsystemload.go
new file mode 100644
index 000000000..1ecb83ad1
--- /dev/null
+++ b/pkg/routing/selectorsystemload.go
@@ -0,0 +1,41 @@
+package routing
+
+import (
+	livekit "github.com/livekit/livekit-server/proto"
+	"github.com/thoas/go-funk"
+)
+
+// SystemLoadSelector picks a node at random, preferring nodes whose
+// LoadAvgLast1Min per CPU is below SysloadLimit.
+type SystemLoadSelector struct {
+	SysloadLimit float32
+}
+
+// SelectNode returns a random node from the available nodes whose load per
+// CPU is below SysloadLimit. If every available node is at or above the
+// limit, it falls back to a random available node. It returns
+// ErrNoAvailableNodes when GetAvailableNodes yields no nodes.
+func (s *SystemLoadSelector) SelectNode(nodes []*livekit.Node, room *livekit.Room) (*livekit.Node, error) {
+	nodes = GetAvailableNodes(nodes)
+	if len(nodes) == 0 {
+		return nil, ErrNoAvailableNodes
+	}
+
+	nodesLowLoad := make([]*livekit.Node, 0, len(nodes))
+	for _, node := range nodes {
+		numCpus := node.Stats.NumCpus
+		if numCpus == 0 {
+			// treat an unreported CPU count as a single CPU
+			numCpus = 1
+		}
+		if node.Stats.LoadAvgLast1Min/float32(numCpus) < s.SysloadLimit {
+			nodesLowLoad = append(nodesLowLoad, node)
+		}
+	}
+	if len(nodesLowLoad) > 0 {
+		nodes = nodesLowLoad
+	}
+
+	idx := funk.RandomInt(0, len(nodes))
+	return nodes[idx], nil
+}
diff --git a/pkg/routing/selectorsystemload_test.go b/pkg/routing/selectorsystemload_test.go
new file mode 100644
index 000000000..943acb12c
--- /dev/null
+++ b/pkg/routing/selectorsystemload_test.go
@@ -0,0 +1,50 @@
+package routing_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/livekit/livekit-server/pkg/routing"
+	livekit "github.com/livekit/livekit-server/proto"
+	"github.com/stretchr/testify/require"
+)
+
+var (
+	nodeLoadLow = &livekit.Node{
+		Stats: &livekit.NodeStats{
+			UpdatedAt:       time.Now().Unix(),
+			NumCpus:         1,
+			LoadAvgLast1Min: 0.0,
+		},
+	}
+
+	nodeLoadHigh = &livekit.Node{
+		Stats: &livekit.NodeStats{
+			UpdatedAt:       time.Now().Unix(),
+			NumCpus:         1,
+			LoadAvgLast1Min: 2.0,
+		},
+	}
+)
+
+func TestSystemLoadSelector_SelectNode(t *testing.T) {
+	selector := routing.SystemLoadSelector{SysloadLimit: 1.0}
+
+	nodes := []*livekit.Node{}
+	_, err := selector.SelectNode(nodes, nil)
+	require.Error(t, err, "should error no available nodes")
+
+	// Select a node with high load when no nodes with low load are available
+	nodes = []*livekit.Node{nodeLoadHigh}
+	node, err := selector.SelectNode(nodes, nil)
+	require.NoError(t, err)
+	require.Same(t, nodeLoadHigh, node, "selected the wrong node")
+
+	// Select a node with low load when available
+	nodes = []*livekit.Node{nodeLoadLow, nodeLoadHigh}
+	for i := 0; i < 5; i++ {
+		node, err := selector.SelectNode(nodes, nil)
+		require.NoError(t, err)
+		require.Same(t, nodeLoadLow, node, "selected the wrong node")
+	}
+}
diff --git a/pkg/service/utils.go b/pkg/service/utils.go
index ca17f1e55..50bce50f1 100644
--- a/pkg/service/utils.go
+++ b/pkg/service/utils.go
@@ -22,6 +22,7 @@ var ServiceSet = wire.NewSet(
 	createRouter,
 	createStore,
 	createWebhookNotifier,
+	nodeSelectorFromConfig,
 	NewRecordingService,
 	NewRoomService,
 	NewRTCService,
@@ -83,6 +84,23 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (*web
 	return webhook.NewNotifier(wc.APIKey, secret, wc.URLs), nil
 }
 
+// nodeSelectorFromConfig builds the node selector named by
+// conf.NodeSelector.Kind. An unrecognized kind falls back to the random
+// selector and is logged, so that a typo in the config does not go unnoticed.
+func nodeSelectorFromConfig(conf *config.Config) routing.NodeSelector {
+	switch conf.NodeSelector.Kind {
+	case "sysload":
+		return &routing.SystemLoadSelector{
+			SysloadLimit: conf.NodeSelector.SysloadLimit,
+		}
+	case "random", "":
+		return &routing.RandomSelector{}
+	default:
+		logger.GetLogger().Info("unknown node selector kind, using random", "kind", conf.NodeSelector.Kind)
+		return &routing.RandomSelector{}
+	}
+}
+
 func handleError(w http.ResponseWriter, status int, msg string) {
 	// GetLogger already with extra depth 1
 	logger.GetLogger().V(1).Info("error handling request", "error", msg, "status", status)
diff --git a/pkg/service/wire.go b/pkg/service/wire.go
index c679438cc..8bcf71ae7 100644
--- a/pkg/service/wire.go
+++ b/pkg/service/wire.go
@@ -11,7 +11,7 @@ import (
 	"github.com/livekit/livekit-server/pkg/routing"
 )
 
-func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, currentNode routing.LocalNode, selector routing.NodeSelector) (*LivekitServer, error) {
+func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, currentNode routing.LocalNode) (*LivekitServer, error) {
 	wire.Build(
 		ServiceSet,
 	)
diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go
index 1cd5676e5..aefb9190e 100644
--- a/pkg/service/wire_gen.go
+++ b/pkg/service/wire_gen.go
@@ -14,18 +14,19 @@ import (
 
 // Injectors from wire.go:
 
-func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, currentNode routing.LocalNode, selector routing.NodeSelector) (*LivekitServer, error) {
+func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, currentNode routing.LocalNode) (*LivekitServer, error) {
 	client, err := createRedisClient(conf)
 	if err != nil {
 		return nil, err
 	}
 	roomStore := createStore(client)
 	router := createRouter(client, currentNode)
+	nodeSelector := nodeSelectorFromConfig(conf)
 	notifier, err := createWebhookNotifier(conf, keyProvider)
 	if err != nil {
 		return nil, err
 	}
-	roomManager, err := NewRoomManager(roomStore, router, currentNode, selector, notifier, conf)
+	roomManager, err := NewRoomManager(roomStore, router, currentNode, nodeSelector, notifier, conf)
 	if err != nil {
 		return nil, err
 	}
diff --git a/test/integration_helpers.go b/test/integration_helpers.go
index 3f4cf3b87..c4baaea69 100644
--- a/test/integration_helpers.go
+++ b/test/integration_helpers.go
@@ -145,7 +145,7 @@ func createSingleNodeServer() *service.LivekitServer {
 	}
 	currentNode.Id = utils.NewGuid(nodeId1)
 
-	s, err := service.InitializeServer(conf, &StaticKeyProvider{}, currentNode, &routing.RandomSelector{})
+	s, err := service.InitializeServer(conf, &StaticKeyProvider{}, currentNode)
 	if err != nil {
 		panic(fmt.Sprintf("could not create server: %v", err))
 	}
@@ -173,7 +173,7 @@ func createMultiNodeServer(nodeId string, port uint32) *service.LivekitServer {
 	currentNode.Id = nodeId
 
 	// redis routing and store
-	s, err := service.InitializeServer(conf, &StaticKeyProvider{}, currentNode, &routing.RandomSelector{})
+	s, err := service.InitializeServer(conf, &StaticKeyProvider{}, currentNode)
 	if err != nil {
 		panic(fmt.Sprintf("could not create server: %v", err))
 	}
diff --git a/test/turn_test.go b/test/turn_test.go
index a4f73cd6f..ba1c78f2a 100644
--- a/test/turn_test.go
+++ b/test/turn_test.go
@@ -28,7 +28,7 @@ func testTurnServer(t *testing.T) {
 	currentNode.Id = utils.NewGuid(nodeId1)
 
 	// local routing and store
-	s, err := service.InitializeServer(conf, &StaticKeyProvider{}, currentNode, &routing.RandomSelector{})
+	s, err := service.InitializeServer(conf, &StaticKeyProvider{}, currentNode)
 	require.NoError(t, err)
 	go s.Start()
 	waitForServerToStart(s)
diff --git a/test/webhook_test.go b/test/webhook_test.go
index 1632285db..d6524e37d 100644
--- a/test/webhook_test.go
+++ b/test/webhook_test.go
@@ -103,7 +103,7 @@ func setupServerWithWebhook() (server *service.LivekitServer, testServer *webook
 	}
 	currentNode.Id = utils.NewGuid(nodeId1)
 
-	server, err = service.InitializeServer(conf, &StaticKeyProvider{}, currentNode, &routing.RandomSelector{})
+	server, err = service.InitializeServer(conf, &StaticKeyProvider{}, currentNode)
 	if err != nil {
 		return
 	}