diff --git a/config-sample.yaml b/config-sample.yaml index 2316137c0..4916c9d05 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -291,6 +291,9 @@ keys: # # priority used for selection of node when multiple are available # # default: random. valid values: random, sysload, cpuload, rooms, clients, tracks, bytespersec # sort_by: sysload +# # algorithm used to govern selecting from sorted nodes +# # default: lowest. valid values: lowest, twochoice +# algorithm: lowest # # used in sysload and regionaware # # do not assign room to node if load per CPU exceeds sysload_limit # sysload_limit: 0.7 diff --git a/pkg/config/config.go b/pkg/config/config.go index 83b399a3b..4b6a8bc66 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -212,6 +212,7 @@ type TURNConfig struct { type NodeSelectorConfig struct { Kind string `yaml:"kind,omitempty"` SortBy string `yaml:"sort_by,omitempty"` + Algorithm string `yaml:"algorithm,omitempty"` CPULoadLimit float32 `yaml:"cpu_load_limit,omitempty"` SysloadLimit float32 `yaml:"sysload_limit,omitempty"` Regions []RegionConfig `yaml:"regions,omitempty"` @@ -395,6 +396,7 @@ var DefaultConfig = Config{ SortBy: "random", SysloadLimit: 0.9, CPULoadLimit: 0.9, + Algorithm: "lowest", }, SignalRelay: SignalRelayConfig{ RetryTimeout: 7500 * time.Millisecond, diff --git a/pkg/routing/selector/any.go b/pkg/routing/selector/any.go index 71f09ba87..592e2ed71 100644 --- a/pkg/routing/selector/any.go +++ b/pkg/routing/selector/any.go @@ -20,7 +20,8 @@ import ( // AnySelector selects any available node with no limitations type AnySelector struct { - SortBy string + SortBy string + Algorithm string } func (s *AnySelector) SelectNode(nodes []*livekit.Node) (*livekit.Node, error) { @@ -29,5 +30,5 @@ func (s *AnySelector) SelectNode(nodes []*livekit.Node) (*livekit.Node, error) { return nil, ErrNoAvailableNodes } - return SelectSortedNode(nodes, s.SortBy) + return SelectSortedNode(nodes, s.SortBy, s.Algorithm) } diff --git a/pkg/routing/selector/any_test.go b/pkg/routing/selector/any_test.go new file mode 100644 index 000000000..a38f80ca0 --- /dev/null +++ b/pkg/routing/selector/any_test.go @@ -0,0 +1,287 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package selector + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/livekit/protocol/livekit" +) + +func createTestNode(id string, cpuLoad float32, numRooms int32, numClients int32, state livekit.NodeState) *livekit.Node { + return &livekit.Node{ + Id: id, + State: state, + Stats: &livekit.NodeStats{ + UpdatedAt: time.Now().Unix() - 1, // Recent update to be considered available + CpuLoad: cpuLoad, + NumRooms: numRooms, + NumClients: numClients, + NumCpus: 4, + LoadAvgLast1Min: cpuLoad * 4, // Simulate system load + }, + } +} + +func TestAnySelector_SelectNode_TwoChoice(t *testing.T) { + tests := []struct { + name string + sortBy string + algorithm string + nodes []*livekit.Node + wantErr string + expected string + notExpected string + }{ + { + name: "successful selection with cpuload sorting", + sortBy: "cpuload", + algorithm: "twochoice", + nodes: []*livekit.Node{ + createTestNode("node1", 0.8, 5, 10, livekit.NodeState_SERVING), + createTestNode("node2", 0.3, 2, 5, livekit.NodeState_SERVING), + createTestNode("node3", 0.6, 3, 8, livekit.NodeState_SERVING), + createTestNode("node4", 0.9, 6, 12, livekit.NodeState_SERVING), + }, + wantErr: "", + expected: "", // Not determinstic selection, so no specific expected node + notExpected: "node4", // Node with highest load should not be selected + }, + { + name: "successful selection with rooms sorting", + sortBy: "rooms", + algorithm: "twochoice", + nodes: []*livekit.Node{ + createTestNode("node1", 0.5, 8, 15, livekit.NodeState_SERVING), + createTestNode("node2", 0.4, 2, 5, livekit.NodeState_SERVING), + createTestNode("node3", 0.6, 12, 20, livekit.NodeState_SERVING), + }, + wantErr: "", + expected: "", // Not determinstic selection, so no specific expected node + notExpected: "node3", // Node with highest room count should not be selected + }, + { + name: "successful selection with clients sorting", + sortBy: "clients", + algorithm: "twochoice", + nodes: []*livekit.Node{ + createTestNode("node1", 0.5, 3, 25, livekit.NodeState_SERVING), + createTestNode("node2", 0.4, 2, 5, livekit.NodeState_SERVING), + createTestNode("node3", 0.6, 4, 30, livekit.NodeState_SERVING), + }, + wantErr: "", + expected: "", // Not determinstic selection, so no specific expected node + notExpected: "node3", // Node with highest clients should not be selected + }, + { + name: "empty nodes list", + sortBy: "cpuload", + algorithm: "twochoice", + nodes: []*livekit.Node{}, + wantErr: "could not find any available nodes", + }, + { + name: "no available nodes - all unavailable", + sortBy: "cpuload", + algorithm: "twochoice", + nodes: []*livekit.Node{ + { + Id: "node1", + State: livekit.NodeState_SERVING, + Stats: &livekit.NodeStats{ + UpdatedAt: time.Now().Unix() - 10, // Too old + CpuLoad: 0.3, + }, + }, + }, + wantErr: "could not find any available nodes", + }, + { + name: "no available nodes - not serving", + sortBy: "cpuload", + algorithm: "twochoice", + nodes: []*livekit.Node{ + { + Id: "node1", + State: livekit.NodeState_SHUTTING_DOWN, + Stats: &livekit.NodeStats{ + UpdatedAt: time.Now().Unix() - 1, + CpuLoad: 0.3, + }, + }, + }, + wantErr: "could not find any available nodes", + }, + { + name: "single available node", + sortBy: "cpuload", + algorithm: "twochoice", + nodes: []*livekit.Node{ + createTestNode("node1", 0.5, 3, 10, livekit.NodeState_SERVING), + }, + wantErr: "", + expected: "node1", // Should select the only available node + notExpected: "", // No other nodes to compare against + }, + { + name: "two available nodes", + sortBy: "cpuload", + algorithm: "twochoice", + nodes: []*livekit.Node{ + createTestNode("node1", 0.8, 5, 15, livekit.NodeState_SERVING), + createTestNode("node2", 0.3, 2, 5, livekit.NodeState_SERVING), + }, + wantErr: "", + expected: "node2", // Should select the node with lower load + notExpected: "node1", // Should not select the node with higher load + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + selector := &AnySelector{ + SortBy: tt.sortBy, + Algorithm: tt.algorithm, + } + + node, err := selector.SelectNode(tt.nodes) + + if tt.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.wantErr) + require.Nil(t, node) + } else { + require.NoError(t, err) + require.NotNil(t, node) + require.NotEmpty(t, node.Id) + + // Verify the selected node is one of the available nodes + found := false + availableNodes := GetAvailableNodes(tt.nodes) + for _, availableNode := range availableNodes { + if availableNode.Id == node.Id { + found = true + break + } + } + require.True(t, found, "Selected node should be one of the available nodes") + + if tt.expected != "" { + require.Equal(t, tt.expected, node.Id, "Selected node should match expected") + } + if tt.notExpected != "" { + require.NotEqual(t, tt.notExpected, node.Id, "Selected node should not match not expected") + } + } + }) + } +} + +func TestAnySelector_SelectNode_TwoChoice_Probabilistic_Behavior(t *testing.T) { + // Test that two-choice algorithm favors nodes with lower metrics + // This test runs multiple iterations to increase confidence in the probabilistic behavior + selector := &AnySelector{ + SortBy: "cpuload", + Algorithm: "twochoice", + } + + // Create nodes where node2 has significantly lower CPU load + nodes := []*livekit.Node{ + createTestNode("node1", 0.95, 10, 20, livekit.NodeState_SERVING), // Very high load + createTestNode("node2", 0.1, 1, 2, livekit.NodeState_SERVING), // Low load + createTestNode("node3", 0.5, 9, 18, livekit.NodeState_SERVING), // Medium load + createTestNode("node4", 0.85, 8, 16, livekit.NodeState_SERVING), // High load + } + + // Run multiple selections and count how often the low-load node is selected + iterations := 1000 + lowLoadSelections := 0 + higestLoadSelections := 0 + + for i := 0; i < iterations; i++ { + node, err := selector.SelectNode(nodes) + require.NoError(t, err) + require.NotNil(t, node) + + if node.Id == "node2" { + lowLoadSelections++ + } + if node.Id == "node1" { + higestLoadSelections++ + } + } + + // The low-load node should be selected more often than pure random (25%) + // Due to the two-choice algorithm favoring the better node + selectionRate := float64(lowLoadSelections) / float64(iterations) + require.Greater(t, selectionRate, 0.4, "Two-choice algorithm should favor the low-load node more than random selection") + require.Equal(t, higestLoadSelections, 0, "Two-choice algorithm should never favor the highest load node") +} + +func TestAnySelector_SelectNode_InvalidParameters(t *testing.T) { + nodes := []*livekit.Node{ + createTestNode("node1", 0.5, 3, 10, livekit.NodeState_SERVING), + } + + tests := []struct { + name string + sortBy string + algorithm string + wantErr string + }{ + { + name: "empty sortBy", + sortBy: "", + algorithm: "twochoice", + wantErr: "sort by option cannot be blank", + }, + { + name: "empty algorithm", + sortBy: "cpuload", + algorithm: "", + wantErr: "node selector algorithm option cannot be blank", + }, + { + name: "unknown sortBy", + sortBy: "invalid", + algorithm: "twochoice", + wantErr: "unknown sort by option", + }, + { + name: "unknown algorithm", + sortBy: "cpuload", + algorithm: "invalid", + wantErr: "unknown node selector algorithm option", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + selector := &AnySelector{ + SortBy: tt.sortBy, + Algorithm: tt.algorithm, + } + + node, err := selector.SelectNode(nodes) + + require.Error(t, err) + require.Contains(t, err.Error(), tt.wantErr) + require.Nil(t, node) + }) + } +} diff --git a/pkg/routing/selector/cpuload.go b/pkg/routing/selector/cpuload.go index 1cd04c4c0..f84b4a3ce 100644 --- a/pkg/routing/selector/cpuload.go +++ b/pkg/routing/selector/cpuload.go @@ -23,6 +23,7 @@ import ( type CPULoadSelector struct { CPULoadLimit float32 SortBy string + Algorithm string } func (s *CPULoadSelector) filterNodes(nodes []*livekit.Node) ([]*livekit.Node, error) { @@ -50,5 +51,5 @@ func (s *CPULoadSelector) SelectNode(nodes []*livekit.Node) (*livekit.Node, erro return nil, err } - return SelectSortedNode(nodes, s.SortBy) + return SelectSortedNode(nodes, s.SortBy, s.Algorithm) } diff --git a/pkg/routing/selector/cpuload_test.go b/pkg/routing/selector/cpuload_test.go index 33bca4717..ec23c1df8 100644 --- a/pkg/routing/selector/cpuload_test.go +++ b/pkg/routing/selector/cpuload_test.go @@ -25,7 +25,7 @@ import ( ) func TestCPULoadSelector_SelectNode(t *testing.T) { - sel := selector.CPULoadSelector{CPULoadLimit: 0.8, SortBy: "random"} + sel := selector.CPULoadSelector{CPULoadLimit: 0.8, SortBy: "random", Algorithm: "lowest"} var nodes []*livekit.Node _, err := sel.SelectNode(nodes) diff --git a/pkg/routing/selector/errors.go b/pkg/routing/selector/errors.go index c011f67af..690153a15 100644 --- a/pkg/routing/selector/errors.go +++ b/pkg/routing/selector/errors.go @@ -21,5 +21,7 @@ var ( ErrCurrentRegionNotSet = errors.New("current region cannot be blank") ErrCurrentRegionUnknownLatLon = errors.New("unknown lat and lon for the current region") ErrSortByNotSet = errors.New("sort by option cannot be blank") + ErrAlgorithmNotSet = errors.New("node selector algorithm option cannot be blank") ErrSortByUnknown = errors.New("unknown sort by option") + ErrAlgorithmUnknown = errors.New("unknown node selector algorithm option") ) diff --git a/pkg/routing/selector/interfaces.go b/pkg/routing/selector/interfaces.go index 60d001e18..6fc223ef9 100644 --- a/pkg/routing/selector/interfaces.go +++ b/pkg/routing/selector/interfaces.go @@ -37,19 +37,21 @@ func CreateNodeSelector(conf *config.Config) (NodeSelector, error) { } switch kind { case "any": - return &AnySelector{conf.NodeSelector.SortBy}, nil + return &AnySelector{conf.NodeSelector.SortBy, conf.NodeSelector.Algorithm}, nil case "cpuload": return &CPULoadSelector{ CPULoadLimit: conf.NodeSelector.CPULoadLimit, - SortBy: conf.NodeSelector.SortBy, + SortBy: conf.NodeSelector.SortBy, + Algorithm: conf.NodeSelector.Algorithm, }, nil case "sysload": return &SystemLoadSelector{ SysloadLimit: conf.NodeSelector.SysloadLimit, - SortBy: conf.NodeSelector.SortBy, + SortBy: conf.NodeSelector.SortBy, + Algorithm: conf.NodeSelector.Algorithm, }, nil case "regionaware": - s, err := NewRegionAwareSelector(conf.Region, conf.NodeSelector.Regions, conf.NodeSelector.SortBy) + s, err := NewRegionAwareSelector(conf.Region, conf.NodeSelector.Regions, conf.NodeSelector.SortBy, conf.NodeSelector.Algorithm) if err != nil { return nil, err } @@ -57,7 +59,7 @@ func CreateNodeSelector(conf *config.Config) (NodeSelector, error) { return s, nil case "random": logger.Warnw("random node selector is deprecated, please switch to \"any\" or another selector", nil) - return &AnySelector{conf.NodeSelector.SortBy}, nil + return &AnySelector{conf.NodeSelector.SortBy, conf.NodeSelector.Algorithm}, nil default: return nil, ErrUnsupportedSelector } diff --git a/pkg/routing/selector/regionaware.go b/pkg/routing/selector/regionaware.go index 61f494f70..756781ae5 100644 --- a/pkg/routing/selector/regionaware.go +++ b/pkg/routing/selector/regionaware.go @@ -29,9 +29,10 @@ type RegionAwareSelector struct { regionDistances map[string]float64 regions []config.RegionConfig SortBy string + Algorithm string } -func NewRegionAwareSelector(currentRegion string, regions []config.RegionConfig, sortBy string) (*RegionAwareSelector, error) { +func NewRegionAwareSelector(currentRegion string, regions []config.RegionConfig, sortBy string, algorithm string) (*RegionAwareSelector, error) { if currentRegion == "" { return nil, ErrCurrentRegionNotSet } @@ -41,6 +42,7 @@ func NewRegionAwareSelector(currentRegion string, regions []config.RegionConfig, regionDistances: make(map[string]float64), regions: regions, SortBy: sortBy, + Algorithm: algorithm, } var currentRC *config.RegionConfig @@ -94,7 +96,7 @@ func (s *RegionAwareSelector) SelectNode(nodes []*livekit.Node) (*livekit.Node, nodes = nearestNodes } - return SelectSortedNode(nodes, s.SortBy) + return SelectSortedNode(nodes, s.SortBy, s.Algorithm) } // haversine(θ) function diff --git a/pkg/routing/selector/regionaware_test.go b/pkg/routing/selector/regionaware_test.go index bc4b2036f..743310156 100644 --- a/pkg/routing/selector/regionaware_test.go +++ b/pkg/routing/selector/regionaware_test.go @@ -34,6 +34,7 @@ const ( regionEast = "us-east" regionSeattle = "seattle" sortBy = "random" + algorithm = "lowest" ) func TestRegionAwareRouting(t *testing.T) { @@ -58,7 +59,7 @@ func TestRegionAwareRouting(t *testing.T) { nodes := []*livekit.Node{ newTestNodeInRegion("", false), } - s, err := selector.NewRegionAwareSelector(regionEast, nil, sortBy) + s, err := selector.NewRegionAwareSelector(regionEast, nil, sortBy, algorithm) require.NoError(t, err) node, err := s.SelectNode(nodes) @@ -74,7 +75,7 @@ func TestRegionAwareRouting(t *testing.T) { expectedNode, newTestNodeInRegion(regionEast, false), } - s, err := selector.NewRegionAwareSelector(regionEast, rc, sortBy) + s, err := selector.NewRegionAwareSelector(regionEast, rc, sortBy, algorithm) require.NoError(t, err) s.SysloadLimit = loadLimit @@ -91,7 +92,7 @@ func TestRegionAwareRouting(t *testing.T) { newTestNodeInRegion(regionWest, true), newTestNodeInRegion(regionEast, false), } - s, err := selector.NewRegionAwareSelector(regionEast, rc, sortBy) + s, err := selector.NewRegionAwareSelector(regionEast, rc, sortBy, algorithm) require.NoError(t, err) s.SysloadLimit = loadLimit @@ -107,7 +108,7 @@ func TestRegionAwareRouting(t *testing.T) { expectedNode, newTestNodeInRegion(regionEast, true), } - s, err := selector.NewRegionAwareSelector(regionSeattle, rc, sortBy) + s, err := selector.NewRegionAwareSelector(regionSeattle, rc, sortBy, algorithm) require.NoError(t, err) s.SysloadLimit = loadLimit @@ -125,7 +126,7 @@ func TestRegionAwareRouting(t *testing.T) { expectedNode, expectedNode, } - s, err := selector.NewRegionAwareSelector(regionSeattle, rc, sortBy) + s, err := selector.NewRegionAwareSelector(regionSeattle, rc, sortBy, algorithm) require.NoError(t, err) s.SysloadLimit = loadLimit @@ -138,7 +139,7 @@ func TestRegionAwareRouting(t *testing.T) { nodes := []*livekit.Node{ newTestNodeInRegion(regionWest, true), } - s, err := selector.NewRegionAwareSelector(regionEast, rc, sortBy) + s, err := selector.NewRegionAwareSelector(regionEast, rc, sortBy, algorithm) require.NoError(t, err) node, err := s.SelectNode(nodes) diff --git a/pkg/routing/selector/sortby_test.go b/pkg/routing/selector/sortby_test.go index 31de0027b..889f84f3e 100644 --- a/pkg/routing/selector/sortby_test.go +++ b/pkg/routing/selector/sortby_test.go @@ -23,7 +23,7 @@ import ( ) func SortByTest(t *testing.T, sortBy string) { - sel := selector.SystemLoadSelector{SortBy: sortBy} + sel := selector.SystemLoadSelector{SortBy: sortBy, Algorithm: "lowest"} nodes := []*livekit.Node{nodeLoadLow, nodeLoadMedium, nodeLoadHigh} for i := 0; i < 5; i++ { @@ -38,7 +38,7 @@ func SortByTest(t *testing.T, sortBy string) { } func TestSortByErrors(t *testing.T) { - sel := selector.SystemLoadSelector{} + sel := selector.SystemLoadSelector{Algorithm: "lowest"} nodes := []*livekit.Node{nodeLoadLow, nodeLoadMedium, nodeLoadHigh} // Test unset sort by option error diff --git a/pkg/routing/selector/sysload.go b/pkg/routing/selector/sysload.go index 909311a0e..285fca493 100644 --- a/pkg/routing/selector/sysload.go +++ b/pkg/routing/selector/sysload.go @@ -23,6 +23,7 @@ import ( type SystemLoadSelector struct { SysloadLimit float32 SortBy string + Algorithm string } func (s *SystemLoadSelector) filterNodes(nodes []*livekit.Node) ([]*livekit.Node, error) { @@ -49,5 +50,5 @@ func (s *SystemLoadSelector) SelectNode(nodes []*livekit.Node) (*livekit.Node, e return nil, err } - return SelectSortedNode(nodes, s.SortBy) + return SelectSortedNode(nodes, s.SortBy, s.Algorithm) } diff --git a/pkg/routing/selector/sysload_test.go b/pkg/routing/selector/sysload_test.go index 8867b4918..9f3349a5f 100644 --- a/pkg/routing/selector/sysload_test.go +++ b/pkg/routing/selector/sysload_test.go @@ -88,7 +88,7 @@ var ( ) func TestSystemLoadSelector_SelectNode(t *testing.T) { - sel := selector.SystemLoadSelector{SysloadLimit: 1.0, SortBy: "random"} + sel := selector.SystemLoadSelector{SysloadLimit: 1.0, SortBy: "random", Algorithm: "lowest"} var nodes []*livekit.Node _, err := sel.SelectNode(nodes) diff --git a/pkg/routing/selector/utils.go b/pkg/routing/selector/utils.go index 5d5aef4b4..88b6f29bd 100644 --- a/pkg/routing/selector/utils.go +++ b/pkg/routing/selector/utils.go @@ -15,6 +15,7 @@ package selector import ( + "math/rand/v2" "sort" "time" @@ -74,11 +75,49 @@ func LimitsReached(limitConfig config.LimitConfig, nodeStats *livekit.NodeStats) return false } -func SelectSortedNode(nodes []*livekit.Node, sortBy string) (*livekit.Node, error) { +func SelectSortedNode(nodes []*livekit.Node, sortBy string, algorithm string) (*livekit.Node, error) { if sortBy == "" { return nil, ErrSortByNotSet } + if algorithm == "" { + return nil, ErrAlgorithmNotSet + } + switch algorithm { + case "lowest": // examine all nodes and select the lowest based on sort criteria + return selectLowestSortedNode(nodes, sortBy) + case "twochoice": // randomly select two nodes and return the lowest based on sort criteria "Power of Two Random Choices" + return selectTwoChoiceSortedNode(nodes, sortBy) + default: + return nil, ErrAlgorithmUnknown + } +} + +func selectTwoChoiceSortedNode(nodes []*livekit.Node, sortBy string) (*livekit.Node, error) { + if len(nodes) <= 2 { + return selectLowestSortedNode(nodes, sortBy) + } + + // randomly select two nodes + node1, node2, err := selectTwoRandomNodes(nodes) + if err != nil { + return nil, err + } + + // compare the two nodes based on the sort criteria + if node1 == nil || node2 == nil { + return nil, ErrNoAvailableNodes + } + + selectedNode, err := selectLowestSortedNode([]*livekit.Node{node1, node2}, sortBy) + if err != nil { + return nil, err + } + + return selectedNode, nil +} + +func selectLowestSortedNode(nodes []*livekit.Node, sortBy string) (*livekit.Node, error) { // Return a node based on what it should be sorted by for priority switch sortBy { case "random": @@ -127,3 +166,13 @@ func SelectSortedNode(nodes []*livekit.Node, sortBy string) (*livekit.Node, erro return nil, ErrSortByUnknown } } + +func selectTwoRandomNodes(nodes []*livekit.Node) (*livekit.Node, *livekit.Node, error) { + if len(nodes) < 2 { + return nil, nil, ErrNoAvailableNodes + } + + shuffledIndices := rand.Perm(len(nodes)) + + return nodes[shuffledIndices[0]], nodes[shuffledIndices[1]], nil +}