"Power of Two Random Choices" option for node selection (#3785)

* Added optional "Power of Two Random Choices" algorithm for the node selector sort_by feature. The current, default behavior of picking the lowest-valued node remains.
This commit is contained in:
Alan Willard
2025-10-08 18:31:01 -04:00
committed by GitHub
parent a20bbe34fa
commit a8d4df66f4
14 changed files with 373 additions and 22 deletions

View File

@@ -291,6 +291,9 @@ keys:
# # priority used for selection of node when multiple are available
# # default: random. valid values: random, sysload, cpuload, rooms, clients, tracks, bytespersec
# sort_by: sysload
# # algorithm used to govern selecting from sorted nodes
# # default: lowest. valid values: lowest, twochoice
# algorithm: lowest
# # used in sysload and regionaware
# # do not assign room to node if load per CPU exceeds sysload_limit
# sysload_limit: 0.7

View File

@@ -212,6 +212,7 @@ type TURNConfig struct {
type NodeSelectorConfig struct {
Kind string `yaml:"kind,omitempty"`
SortBy string `yaml:"sort_by,omitempty"`
Algorithm string `yaml:"algorithm,omitempty"`
CPULoadLimit float32 `yaml:"cpu_load_limit,omitempty"`
SysloadLimit float32 `yaml:"sysload_limit,omitempty"`
Regions []RegionConfig `yaml:"regions,omitempty"`
@@ -395,6 +396,7 @@ var DefaultConfig = Config{
SortBy: "random",
SysloadLimit: 0.9,
CPULoadLimit: 0.9,
Algorithm: "lowest",
},
SignalRelay: SignalRelayConfig{
RetryTimeout: 7500 * time.Millisecond,

View File

@@ -20,7 +20,8 @@ import (
// AnySelector selects any available node with no limitations
type AnySelector struct {
SortBy string
SortBy string
Algorithm string
}
func (s *AnySelector) SelectNode(nodes []*livekit.Node) (*livekit.Node, error) {
@@ -29,5 +30,5 @@ func (s *AnySelector) SelectNode(nodes []*livekit.Node) (*livekit.Node, error) {
return nil, ErrNoAvailableNodes
}
return SelectSortedNode(nodes, s.SortBy)
return SelectSortedNode(nodes, s.SortBy, s.Algorithm)
}

View File

@@ -0,0 +1,287 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package selector
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/livekit/protocol/livekit"
)
func createTestNode(id string, cpuLoad float32, numRooms int32, numClients int32, state livekit.NodeState) *livekit.Node {
return &livekit.Node{
Id: id,
State: state,
Stats: &livekit.NodeStats{
UpdatedAt: time.Now().Unix() - 1, // Recent update to be considered available
CpuLoad: cpuLoad,
NumRooms: numRooms,
NumClients: numClients,
NumCpus: 4,
LoadAvgLast1Min: cpuLoad * 4, // Simulate system load
},
}
}
func TestAnySelector_SelectNode_TwoChoice(t *testing.T) {
tests := []struct {
name string
sortBy string
algorithm string
nodes []*livekit.Node
wantErr string
expected string
notExpected string
}{
{
name: "successful selection with cpuload sorting",
sortBy: "cpuload",
algorithm: "twochoice",
nodes: []*livekit.Node{
createTestNode("node1", 0.8, 5, 10, livekit.NodeState_SERVING),
createTestNode("node2", 0.3, 2, 5, livekit.NodeState_SERVING),
createTestNode("node3", 0.6, 3, 8, livekit.NodeState_SERVING),
createTestNode("node4", 0.9, 6, 12, livekit.NodeState_SERVING),
},
wantErr: "",
expected: "", // Not determinstic selection, so no specific expected node
notExpected: "node4", // Node with highest load should not be selected
},
{
name: "successful selection with rooms sorting",
sortBy: "rooms",
algorithm: "twochoice",
nodes: []*livekit.Node{
createTestNode("node1", 0.5, 8, 15, livekit.NodeState_SERVING),
createTestNode("node2", 0.4, 2, 5, livekit.NodeState_SERVING),
createTestNode("node3", 0.6, 12, 20, livekit.NodeState_SERVING),
},
wantErr: "",
expected: "", // Not determinstic selection, so no specific expected node
notExpected: "node3", // Node with highest room count should not be selected
},
{
name: "successful selection with clients sorting",
sortBy: "clients",
algorithm: "twochoice",
nodes: []*livekit.Node{
createTestNode("node1", 0.5, 3, 25, livekit.NodeState_SERVING),
createTestNode("node2", 0.4, 2, 5, livekit.NodeState_SERVING),
createTestNode("node3", 0.6, 4, 30, livekit.NodeState_SERVING),
},
wantErr: "",
expected: "", // Not determinstic selection, so no specific expected node
notExpected: "node3", // Node with highest clients should not be selected
},
{
name: "empty nodes list",
sortBy: "cpuload",
algorithm: "twochoice",
nodes: []*livekit.Node{},
wantErr: "could not find any available nodes",
},
{
name: "no available nodes - all unavailable",
sortBy: "cpuload",
algorithm: "twochoice",
nodes: []*livekit.Node{
{
Id: "node1",
State: livekit.NodeState_SERVING,
Stats: &livekit.NodeStats{
UpdatedAt: time.Now().Unix() - 10, // Too old
CpuLoad: 0.3,
},
},
},
wantErr: "could not find any available nodes",
},
{
name: "no available nodes - not serving",
sortBy: "cpuload",
algorithm: "twochoice",
nodes: []*livekit.Node{
{
Id: "node1",
State: livekit.NodeState_SHUTTING_DOWN,
Stats: &livekit.NodeStats{
UpdatedAt: time.Now().Unix() - 1,
CpuLoad: 0.3,
},
},
},
wantErr: "could not find any available nodes",
},
{
name: "single available node",
sortBy: "cpuload",
algorithm: "twochoice",
nodes: []*livekit.Node{
createTestNode("node1", 0.5, 3, 10, livekit.NodeState_SERVING),
},
wantErr: "",
expected: "node1", // Should select the only available node
notExpected: "", // No other nodes to compare against
},
{
name: "two available nodes",
sortBy: "cpuload",
algorithm: "twochoice",
nodes: []*livekit.Node{
createTestNode("node1", 0.8, 5, 15, livekit.NodeState_SERVING),
createTestNode("node2", 0.3, 2, 5, livekit.NodeState_SERVING),
},
wantErr: "",
expected: "node2", // Should select the node with lower load
notExpected: "node1", // Should not select the node with higher load
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
selector := &AnySelector{
SortBy: tt.sortBy,
Algorithm: tt.algorithm,
}
node, err := selector.SelectNode(tt.nodes)
if tt.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tt.wantErr)
require.Nil(t, node)
} else {
require.NoError(t, err)
require.NotNil(t, node)
require.NotEmpty(t, node.Id)
// Verify the selected node is one of the available nodes
found := false
availableNodes := GetAvailableNodes(tt.nodes)
for _, availableNode := range availableNodes {
if availableNode.Id == node.Id {
found = true
break
}
}
require.True(t, found, "Selected node should be one of the available nodes")
if tt.expected != "" {
require.Equal(t, tt.expected, node.Id, "Selected node should match expected")
}
if tt.notExpected != "" {
require.NotEqual(t, tt.notExpected, node.Id, "Selected node should not match not expected")
}
}
})
}
}
func TestAnySelector_SelectNode_TwoChoice_Probabilistic_Behavior(t *testing.T) {
// Test that two-choice algorithm favors nodes with lower metrics
// This test runs multiple iterations to increase confidence in the probabilistic behavior
selector := &AnySelector{
SortBy: "cpuload",
Algorithm: "twochoice",
}
// Create nodes where node2 has significantly lower CPU load
nodes := []*livekit.Node{
createTestNode("node1", 0.95, 10, 20, livekit.NodeState_SERVING), // Very high load
createTestNode("node2", 0.1, 1, 2, livekit.NodeState_SERVING), // Low load
createTestNode("node3", 0.5, 9, 18, livekit.NodeState_SERVING), // Medium load
createTestNode("node4", 0.85, 8, 16, livekit.NodeState_SERVING), // High load
}
// Run multiple selections and count how often the low-load node is selected
iterations := 1000
lowLoadSelections := 0
higestLoadSelections := 0
for i := 0; i < iterations; i++ {
node, err := selector.SelectNode(nodes)
require.NoError(t, err)
require.NotNil(t, node)
if node.Id == "node2" {
lowLoadSelections++
}
if node.Id == "node1" {
higestLoadSelections++
}
}
// The low-load node should be selected more often than pure random (25%)
// Due to the two-choice algorithm favoring the better node
selectionRate := float64(lowLoadSelections) / float64(iterations)
require.Greater(t, selectionRate, 0.4, "Two-choice algorithm should favor the low-load node more than random selection")
require.Equal(t, higestLoadSelections, 0, "Two-choice algorithm should never favor the highest load node")
}
func TestAnySelector_SelectNode_InvalidParameters(t *testing.T) {
nodes := []*livekit.Node{
createTestNode("node1", 0.5, 3, 10, livekit.NodeState_SERVING),
}
tests := []struct {
name string
sortBy string
algorithm string
wantErr string
}{
{
name: "empty sortBy",
sortBy: "",
algorithm: "twochoice",
wantErr: "sort by option cannot be blank",
},
{
name: "empty algorithm",
sortBy: "cpuload",
algorithm: "",
wantErr: "node selector algorithm option cannot be blank",
},
{
name: "unknown sortBy",
sortBy: "invalid",
algorithm: "twochoice",
wantErr: "unknown sort by option",
},
{
name: "unknown algorithm",
sortBy: "cpuload",
algorithm: "invalid",
wantErr: "unknown node selector algorithm option",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
selector := &AnySelector{
SortBy: tt.sortBy,
Algorithm: tt.algorithm,
}
node, err := selector.SelectNode(nodes)
require.Error(t, err)
require.Contains(t, err.Error(), tt.wantErr)
require.Nil(t, node)
})
}
}

View File

@@ -23,6 +23,7 @@ import (
type CPULoadSelector struct {
CPULoadLimit float32
SortBy string
Algorithm string
}
func (s *CPULoadSelector) filterNodes(nodes []*livekit.Node) ([]*livekit.Node, error) {
@@ -50,5 +51,5 @@ func (s *CPULoadSelector) SelectNode(nodes []*livekit.Node) (*livekit.Node, erro
return nil, err
}
return SelectSortedNode(nodes, s.SortBy)
return SelectSortedNode(nodes, s.SortBy, s.Algorithm)
}

View File

@@ -25,7 +25,7 @@ import (
)
func TestCPULoadSelector_SelectNode(t *testing.T) {
sel := selector.CPULoadSelector{CPULoadLimit: 0.8, SortBy: "random"}
sel := selector.CPULoadSelector{CPULoadLimit: 0.8, SortBy: "random", Algorithm: "lowest"}
var nodes []*livekit.Node
_, err := sel.SelectNode(nodes)

View File

@@ -21,5 +21,7 @@ var (
ErrCurrentRegionNotSet = errors.New("current region cannot be blank")
ErrCurrentRegionUnknownLatLon = errors.New("unknown lat and lon for the current region")
ErrSortByNotSet = errors.New("sort by option cannot be blank")
ErrAlgorithmNotSet = errors.New("node selector algorithm option cannot be blank")
ErrSortByUnknown = errors.New("unknown sort by option")
ErrAlgorithmUnknown = errors.New("unknown node selector algorithm option")
)

View File

@@ -37,19 +37,21 @@ func CreateNodeSelector(conf *config.Config) (NodeSelector, error) {
}
switch kind {
case "any":
return &AnySelector{conf.NodeSelector.SortBy}, nil
return &AnySelector{conf.NodeSelector.SortBy, conf.NodeSelector.Algorithm}, nil
case "cpuload":
return &CPULoadSelector{
CPULoadLimit: conf.NodeSelector.CPULoadLimit,
SortBy: conf.NodeSelector.SortBy,
SortBy: conf.NodeSelector.SortBy,
Algorithm: conf.NodeSelector.Algorithm,
}, nil
case "sysload":
return &SystemLoadSelector{
SysloadLimit: conf.NodeSelector.SysloadLimit,
SortBy: conf.NodeSelector.SortBy,
SortBy: conf.NodeSelector.SortBy,
Algorithm: conf.NodeSelector.Algorithm,
}, nil
case "regionaware":
s, err := NewRegionAwareSelector(conf.Region, conf.NodeSelector.Regions, conf.NodeSelector.SortBy)
s, err := NewRegionAwareSelector(conf.Region, conf.NodeSelector.Regions, conf.NodeSelector.SortBy, conf.NodeSelector.Algorithm)
if err != nil {
return nil, err
}
@@ -57,7 +59,7 @@ func CreateNodeSelector(conf *config.Config) (NodeSelector, error) {
return s, nil
case "random":
logger.Warnw("random node selector is deprecated, please switch to \"any\" or another selector", nil)
return &AnySelector{conf.NodeSelector.SortBy}, nil
return &AnySelector{conf.NodeSelector.SortBy, conf.NodeSelector.Algorithm}, nil
default:
return nil, ErrUnsupportedSelector
}

View File

@@ -29,9 +29,10 @@ type RegionAwareSelector struct {
regionDistances map[string]float64
regions []config.RegionConfig
SortBy string
Algorithm string
}
func NewRegionAwareSelector(currentRegion string, regions []config.RegionConfig, sortBy string) (*RegionAwareSelector, error) {
func NewRegionAwareSelector(currentRegion string, regions []config.RegionConfig, sortBy string, algorithm string) (*RegionAwareSelector, error) {
if currentRegion == "" {
return nil, ErrCurrentRegionNotSet
}
@@ -41,6 +42,7 @@ func NewRegionAwareSelector(currentRegion string, regions []config.RegionConfig,
regionDistances: make(map[string]float64),
regions: regions,
SortBy: sortBy,
Algorithm: algorithm,
}
var currentRC *config.RegionConfig
@@ -94,7 +96,7 @@ func (s *RegionAwareSelector) SelectNode(nodes []*livekit.Node) (*livekit.Node,
nodes = nearestNodes
}
return SelectSortedNode(nodes, s.SortBy)
return SelectSortedNode(nodes, s.SortBy, s.Algorithm)
}
// haversine(θ) function

View File

@@ -34,6 +34,7 @@ const (
regionEast = "us-east"
regionSeattle = "seattle"
sortBy = "random"
algorithm = "lowest"
)
func TestRegionAwareRouting(t *testing.T) {
@@ -58,7 +59,7 @@ func TestRegionAwareRouting(t *testing.T) {
nodes := []*livekit.Node{
newTestNodeInRegion("", false),
}
s, err := selector.NewRegionAwareSelector(regionEast, nil, sortBy)
s, err := selector.NewRegionAwareSelector(regionEast, nil, sortBy, algorithm)
require.NoError(t, err)
node, err := s.SelectNode(nodes)
@@ -74,7 +75,7 @@ func TestRegionAwareRouting(t *testing.T) {
expectedNode,
newTestNodeInRegion(regionEast, false),
}
s, err := selector.NewRegionAwareSelector(regionEast, rc, sortBy)
s, err := selector.NewRegionAwareSelector(regionEast, rc, sortBy, algorithm)
require.NoError(t, err)
s.SysloadLimit = loadLimit
@@ -91,7 +92,7 @@ func TestRegionAwareRouting(t *testing.T) {
newTestNodeInRegion(regionWest, true),
newTestNodeInRegion(regionEast, false),
}
s, err := selector.NewRegionAwareSelector(regionEast, rc, sortBy)
s, err := selector.NewRegionAwareSelector(regionEast, rc, sortBy, algorithm)
require.NoError(t, err)
s.SysloadLimit = loadLimit
@@ -107,7 +108,7 @@ func TestRegionAwareRouting(t *testing.T) {
expectedNode,
newTestNodeInRegion(regionEast, true),
}
s, err := selector.NewRegionAwareSelector(regionSeattle, rc, sortBy)
s, err := selector.NewRegionAwareSelector(regionSeattle, rc, sortBy, algorithm)
require.NoError(t, err)
s.SysloadLimit = loadLimit
@@ -125,7 +126,7 @@ func TestRegionAwareRouting(t *testing.T) {
expectedNode,
expectedNode,
}
s, err := selector.NewRegionAwareSelector(regionSeattle, rc, sortBy)
s, err := selector.NewRegionAwareSelector(regionSeattle, rc, sortBy, algorithm)
require.NoError(t, err)
s.SysloadLimit = loadLimit
@@ -138,7 +139,7 @@ func TestRegionAwareRouting(t *testing.T) {
nodes := []*livekit.Node{
newTestNodeInRegion(regionWest, true),
}
s, err := selector.NewRegionAwareSelector(regionEast, rc, sortBy)
s, err := selector.NewRegionAwareSelector(regionEast, rc, sortBy, algorithm)
require.NoError(t, err)
node, err := s.SelectNode(nodes)

View File

@@ -23,7 +23,7 @@ import (
)
func SortByTest(t *testing.T, sortBy string) {
sel := selector.SystemLoadSelector{SortBy: sortBy}
sel := selector.SystemLoadSelector{SortBy: sortBy, Algorithm: "lowest"}
nodes := []*livekit.Node{nodeLoadLow, nodeLoadMedium, nodeLoadHigh}
for i := 0; i < 5; i++ {
@@ -38,7 +38,7 @@ func SortByTest(t *testing.T, sortBy string) {
}
func TestSortByErrors(t *testing.T) {
sel := selector.SystemLoadSelector{}
sel := selector.SystemLoadSelector{Algorithm: "lowest"}
nodes := []*livekit.Node{nodeLoadLow, nodeLoadMedium, nodeLoadHigh}
// Test unset sort by option error

View File

@@ -23,6 +23,7 @@ import (
type SystemLoadSelector struct {
SysloadLimit float32
SortBy string
Algorithm string
}
func (s *SystemLoadSelector) filterNodes(nodes []*livekit.Node) ([]*livekit.Node, error) {
@@ -49,5 +50,5 @@ func (s *SystemLoadSelector) SelectNode(nodes []*livekit.Node) (*livekit.Node, e
return nil, err
}
return SelectSortedNode(nodes, s.SortBy)
return SelectSortedNode(nodes, s.SortBy, s.Algorithm)
}

View File

@@ -88,7 +88,7 @@ var (
)
func TestSystemLoadSelector_SelectNode(t *testing.T) {
sel := selector.SystemLoadSelector{SysloadLimit: 1.0, SortBy: "random"}
sel := selector.SystemLoadSelector{SysloadLimit: 1.0, SortBy: "random", Algorithm: "lowest"}
var nodes []*livekit.Node
_, err := sel.SelectNode(nodes)

View File

@@ -15,6 +15,7 @@
package selector
import (
"math/rand/v2"
"sort"
"time"
@@ -74,11 +75,49 @@ func LimitsReached(limitConfig config.LimitConfig, nodeStats *livekit.NodeStats)
return false
}
func SelectSortedNode(nodes []*livekit.Node, sortBy string) (*livekit.Node, error) {
func SelectSortedNode(nodes []*livekit.Node, sortBy string, algorithm string) (*livekit.Node, error) {
if sortBy == "" {
return nil, ErrSortByNotSet
}
if algorithm == "" {
return nil, ErrAlgorithmNotSet
}
switch algorithm {
case "lowest": // examine all nodes and select the lowest based on sort criteria
return selectLowestSortedNode(nodes, sortBy)
case "twochoice": // randomly select two nodes and return the lowest based on sort criteria "Power of Two Random Choices"
return selectTwoChoiceSortedNode(nodes, sortBy)
default:
return nil, ErrAlgorithmUnknown
}
}
func selectTwoChoiceSortedNode(nodes []*livekit.Node, sortBy string) (*livekit.Node, error) {
if len(nodes) <= 2 {
return selectLowestSortedNode(nodes, sortBy)
}
// randomly select two nodes
node1, node2, err := selectTwoRandomNodes(nodes)
if err != nil {
return nil, err
}
// compare the two nodes based on the sort criteria
if node1 == nil || node2 == nil {
return nil, ErrNoAvailableNodes
}
selectedNode, err := selectLowestSortedNode([]*livekit.Node{node1, node2}, sortBy)
if err != nil {
return nil, err
}
return selectedNode, nil
}
func selectLowestSortedNode(nodes []*livekit.Node, sortBy string) (*livekit.Node, error) {
// Return a node based on what it should be sorted by for priority
switch sortBy {
case "random":
@@ -127,3 +166,13 @@ func SelectSortedNode(nodes []*livekit.Node, sortBy string) (*livekit.Node, erro
return nil, ErrSortByUnknown
}
}
func selectTwoRandomNodes(nodes []*livekit.Node) (*livekit.Node, *livekit.Node, error) {
if len(nodes) < 2 {
return nil, nil, ErrNoAvailableNodes
}
shuffledIndices := rand.Perm(len(nodes))
return nodes[shuffledIndices[0]], nodes[shuffledIndices[1]], nil
}