NodeSelector using system load metric (#94)

* use load when selecting nodes

* Apply suggestions from code review

Co-authored-by: David Zhao <david@davidzhao.com>

* sysload selector test

* wire nodeSelectorFromConfig

* fix tests

* Update pkg/routing/selectorsystemload_test.go

Co-authored-by: David Zhao <david@davidzhao.com>

* dz review: nits

Co-authored-by: David Zhao <david@davidzhao.com>
This commit is contained in:
Mathew Kamkar
2021-08-26 15:24:39 -07:00
committed by GitHub
parent 0a9d12bc35
commit c437152e2b
12 changed files with 140 additions and 21 deletions

View File

@@ -124,8 +124,8 @@ func main() {
},
},
{
Name: "list-nodes",
Usage: "list all nodes",
Name: "list-nodes",
Usage: "list all nodes",
Action: listNodes,
},
},
@@ -200,7 +200,7 @@ func startServer(c *cli.Context) error {
return err
}
server, err := service.InitializeServer(conf, keyProvider, currentNode, &routing.RandomSelector{})
server, err := service.InitializeServer(conf, keyProvider, currentNode)
if err != nil {
return err
}

View File

@@ -117,3 +117,11 @@ keys:
# # optional
# # cert_file: /path/to/cert.pem
# # key_file: /path/to/key.pem
# # node selector
# node_selector:
# # default: random. valid values: random, sysload
# kind: sysload
# # used in sysload node selector
# # do not assign room to node if load per CPU exceeds sysload_limit
# sysload_limit: 0.7

View File

@@ -18,17 +18,18 @@ var DefaultStunServers = []string{
}
type Config struct {
Port uint32 `yaml:"port"`
PrometheusPort uint32 `yaml:"prometheus_port"`
RTC RTCConfig `yaml:"rtc"`
Redis RedisConfig `yaml:"redis"`
Audio AudioConfig `yaml:"audio"`
Room RoomConfig `yaml:"room"`
TURN TURNConfig `yaml:"turn"`
WebHook WebHookConfig `yaml:"webhook"`
KeyFile string `yaml:"key_file"`
Keys map[string]string `yaml:"keys"`
LogLevel string `yaml:"log_level"`
Port uint32 `yaml:"port"`
PrometheusPort uint32 `yaml:"prometheus_port"`
RTC RTCConfig `yaml:"rtc"`
Redis RedisConfig `yaml:"redis"`
Audio AudioConfig `yaml:"audio"`
Room RoomConfig `yaml:"room"`
TURN TURNConfig `yaml:"turn"`
WebHook WebHookConfig `yaml:"webhook"`
NodeSelector NodeSelectorConfig `yaml:"node_selector"`
KeyFile string `yaml:"key_file"`
Keys map[string]string `yaml:"keys"`
LogLevel string `yaml:"log_level"`
Development bool `yaml:"development"`
}
@@ -106,6 +107,11 @@ type WebHookConfig struct {
APIKey string `yaml:"api_key"`
}
type NodeSelectorConfig struct {
Kind string `yaml:"kind"`
SysloadLimit float32 `yaml:"sysload_limit"`
}
func NewConfig(confString string, c *cli.Context) (*Config, error) {
// start with defaults
conf := &Config{
@@ -145,6 +151,10 @@ func NewConfig(confString string, c *cli.Context) (*Config, error) {
TURN: TURNConfig{
Enabled: false,
},
NodeSelector: NodeSelectorConfig{
Kind: "random",
SysloadLimit: 0.7,
},
Keys: map[string]string{},
}
if confString != "" {

View File

@@ -0,0 +1,34 @@
package routing
import (
livekit "github.com/livekit/livekit-server/proto"
"github.com/thoas/go-funk"
)
type SystemLoadSelector struct {
SysloadLimit float32
}
func (s *SystemLoadSelector) SelectNode(nodes []*livekit.Node, room *livekit.Room) (*livekit.Node, error) {
nodes = GetAvailableNodes(nodes)
if len(nodes) == 0 {
return nil, ErrNoAvailableNodes
}
nodesLowLoad := []*livekit.Node{}
for _, node := range nodes {
numCpus := node.Stats.NumCpus
if numCpus == 0 {
numCpus = 1
}
if node.Stats.LoadAvgLast1Min/float32(numCpus) < s.SysloadLimit {
nodesLowLoad = append(nodesLowLoad, node)
}
}
if len(nodesLowLoad) > 0 {
nodes = nodesLowLoad
}
idx := funk.RandomInt(0, len(nodes))
return nodes[idx], nil
}

View File

@@ -0,0 +1,54 @@
package routing_test
import (
"testing"
"time"
"github.com/livekit/livekit-server/pkg/routing"
livekit "github.com/livekit/livekit-server/proto"
"github.com/stretchr/testify/require"
)
var (
nodeLoadLow = &livekit.Node{
Stats: &livekit.NodeStats{
UpdatedAt: time.Now().Unix(),
NumCpus: 1,
LoadAvgLast1Min: 0.0,
},
}
nodeLoadHigh = &livekit.Node{
Stats: &livekit.NodeStats{
UpdatedAt: time.Now().Unix(),
NumCpus: 1,
LoadAvgLast1Min: 2.0,
},
}
)
func TestSystemLoadSelector_SelectNode(t *testing.T) {
selector := routing.SystemLoadSelector{SysloadLimit: 1.0}
nodes := []*livekit.Node{}
_, err := selector.SelectNode(nodes, nil)
require.Error(t, err, "should error no available nodes")
// Select a node with high load when no nodes with low load are available
nodes = []*livekit.Node{nodeLoadHigh}
if _, err := selector.SelectNode(nodes, nil); err != nil {
t.Error(err)
}
// Select a node with low load when available
nodes = []*livekit.Node{nodeLoadLow, nodeLoadHigh}
for i := 0; i < 5; i++ {
node, err := selector.SelectNode(nodes, nil)
if err != nil {
t.Error(err)
}
if node != nodeLoadLow {
t.Error("selected the wrong node")
}
}
}

View File

@@ -22,6 +22,7 @@ var ServiceSet = wire.NewSet(
createRouter,
createStore,
createWebhookNotifier,
nodeSelectorFromConfig,
NewRecordingService,
NewRoomService,
NewRTCService,
@@ -83,6 +84,17 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (*web
return webhook.NewNotifier(wc.APIKey, secret, wc.URLs), nil
}
func nodeSelectorFromConfig(conf *config.Config) routing.NodeSelector {
switch conf.NodeSelector.Kind {
case "sysload":
return &routing.SystemLoadSelector{
SysloadLimit: conf.NodeSelector.SysloadLimit,
}
default:
return &routing.RandomSelector{}
}
}
func handleError(w http.ResponseWriter, status int, msg string) {
// GetLogger already with extra depth 1
logger.GetLogger().V(1).Info("error handling request", "error", msg, "status", status)

View File

@@ -11,7 +11,7 @@ import (
"github.com/livekit/livekit-server/pkg/routing"
)
func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, currentNode routing.LocalNode, selector routing.NodeSelector) (*LivekitServer, error) {
func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, currentNode routing.LocalNode) (*LivekitServer, error) {
wire.Build(
ServiceSet,
)

View File

@@ -14,18 +14,19 @@ import (
// Injectors from wire.go:
func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, currentNode routing.LocalNode, selector routing.NodeSelector) (*LivekitServer, error) {
func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, currentNode routing.LocalNode) (*LivekitServer, error) {
client, err := createRedisClient(conf)
if err != nil {
return nil, err
}
roomStore := createStore(client)
router := createRouter(client, currentNode)
nodeSelector := nodeSelectorFromConfig(conf)
notifier, err := createWebhookNotifier(conf, keyProvider)
if err != nil {
return nil, err
}
roomManager, err := NewRoomManager(roomStore, router, currentNode, selector, notifier, conf)
roomManager, err := NewRoomManager(roomStore, router, currentNode, nodeSelector, notifier, conf)
if err != nil {
return nil, err
}

View File

@@ -145,7 +145,7 @@ func createSingleNodeServer() *service.LivekitServer {
}
currentNode.Id = utils.NewGuid(nodeId1)
s, err := service.InitializeServer(conf, &StaticKeyProvider{}, currentNode, &routing.RandomSelector{})
s, err := service.InitializeServer(conf, &StaticKeyProvider{}, currentNode)
if err != nil {
panic(fmt.Sprintf("could not create server: %v", err))
}
@@ -173,7 +173,7 @@ func createMultiNodeServer(nodeId string, port uint32) *service.LivekitServer {
currentNode.Id = nodeId
// redis routing and store
s, err := service.InitializeServer(conf, &StaticKeyProvider{}, currentNode, &routing.RandomSelector{})
s, err := service.InitializeServer(conf, &StaticKeyProvider{}, currentNode)
if err != nil {
panic(fmt.Sprintf("could not create server: %v", err))
}

View File

@@ -28,7 +28,7 @@ func testTurnServer(t *testing.T) {
currentNode.Id = utils.NewGuid(nodeId1)
// local routing and store
s, err := service.InitializeServer(conf, &StaticKeyProvider{}, currentNode, &routing.RandomSelector{})
s, err := service.InitializeServer(conf, &StaticKeyProvider{}, currentNode)
require.NoError(t, err)
go s.Start()
waitForServerToStart(s)

View File

@@ -103,7 +103,7 @@ func setupServerWithWebhook() (server *service.LivekitServer, testServer *webook
}
currentNode.Id = utils.NewGuid(nodeId1)
server, err = service.InitializeServer(conf, &StaticKeyProvider{}, currentNode, &routing.RandomSelector{})
server, err = service.InitializeServer(conf, &StaticKeyProvider{}, currentNode)
if err != nil {
return
}