From 88c77dc666d2cbdea3d1da22a4e726b39ff40ad1 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Thu, 9 Apr 2026 13:49:43 -0700 Subject: [PATCH] compute agent dispatch affinity from target load (#4442) * compute agent dispatch affinity from target load * fix test config --- pkg/agent/config.go | 5 ++++- pkg/agent/testutils/server.go | 7 ++++++- pkg/config/config.go | 5 ++++- pkg/service/agentservice.go | 6 +++++- 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/pkg/agent/config.go b/pkg/agent/config.go index c4d0fa84c..5a3633108 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -1,5 +1,8 @@ package agent +const DefaultTargetLoad = 0.7 + type Config struct { - EnableUserDataRecording bool `yaml:"enable_user_data_recording"` + EnableUserDataRecording bool `yaml:"enable_user_data_recording"` + TargetLoad float32 `yaml:"target_load,omitempty"` } diff --git a/pkg/agent/testutils/server.go b/pkg/agent/testutils/server.go index 31a12c5ed..5f045436d 100644 --- a/pkg/agent/testutils/server.go +++ b/pkg/agent/testutils/server.go @@ -40,7 +40,12 @@ type TestServer struct { func NewTestServer(bus psrpc.MessageBus) *TestServer { localNode, _ := routing.NewLocalNode(nil) return NewTestServerWithService(must.Get(service.NewAgentService( - &config.Config{Region: "test"}, + &config.Config{ + Region: "test", + Agents: agent.Config{ + TargetLoad: agent.DefaultTargetLoad, + }, + }, localNode, bus, auth.NewSimpleKeyProvider("test", "verysecretsecret"), diff --git a/pkg/config/config.go b/pkg/config/config.go index 5da2eff54..01038b3bf 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -436,7 +436,10 @@ var DefaultConfig = Config{ StreamBufferSize: 1000, ConnectAttempts: 3, }, - PSRPC: rpc.DefaultPSRPCConfig, + Agents: agent.Config{ + TargetLoad: agent.DefaultTargetLoad, + }, + PSRPC: rpc.DefaultPSRPCConfig, Keys: map[string]string{}, Metric: metric.DefaultMetricConfig, WebHook: webhook.DefaultWebHookConfig, diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index 09234f223..f51d4c7ba 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -143,6 +143,7 @@ type AgentHandler struct { workers map[string]*agent.Worker jobToWorker map[livekit.JobID]*agent.Worker keyProvider auth.KeyProvider + targetLoad float32 namespaceWorkers map[workerKey][]*agent.Worker roomKeyCount int @@ -188,6 +189,7 @@ func NewAgentService( keyProvider, logger.GetLogger(), serverInfo, + conf.Agents.TargetLoad, agent.RoomAgentTopic, agent.PublisherAgentTopic, agent.ParticipantAgentTopic, @@ -207,6 +209,7 @@ func NewAgentHandler( keyProvider auth.KeyProvider, logger logger.Logger, serverInfo *livekit.ServerInfo, + targetLoad float32, roomTopic string, publisherTopic string, participantTopic string, @@ -219,6 +222,7 @@ func NewAgentHandler( namespaceWorkers: make(map[workerKey][]*agent.Worker), serverInfo: serverInfo, keyProvider: keyProvider, + targetLoad: targetLoad, roomTopic: roomTopic, publisherTopic: publisherTopic, participantTopic: participantTopic, @@ -439,7 +443,7 @@ func (h *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job) } if w.Status() == livekit.WorkerStatus_WS_AVAILABLE { - affinity += max(0, 1-w.Load()) + affinity += max(0, h.targetLoad-w.Load()) } }