mirror of
https://github.com/livekit/livekit.git
synced 2026-04-25 15:32:09 +00:00
compute agent dispatch affinity from target load (#4442)
* compute agent dispatch affinity from target load * fix test config
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
package agent
|
||||
|
||||
const DefaultTargetLoad = 0.7
|
||||
|
||||
type Config struct {
|
||||
EnableUserDataRecording bool `yaml:"enable_user_data_recording"`
|
||||
EnableUserDataRecording bool `yaml:"enable_user_data_recording"`
|
||||
TargetLoad float32 `yaml:"target_load,omitempty"`
|
||||
}
|
||||
|
||||
@@ -40,7 +40,12 @@ type TestServer struct {
|
||||
func NewTestServer(bus psrpc.MessageBus) *TestServer {
|
||||
localNode, _ := routing.NewLocalNode(nil)
|
||||
return NewTestServerWithService(must.Get(service.NewAgentService(
|
||||
&config.Config{Region: "test"},
|
||||
&config.Config{
|
||||
Region: "test",
|
||||
Agents: agent.Config{
|
||||
TargetLoad: agent.DefaultTargetLoad,
|
||||
},
|
||||
},
|
||||
localNode,
|
||||
bus,
|
||||
auth.NewSimpleKeyProvider("test", "verysecretsecret"),
|
||||
|
||||
@@ -436,7 +436,10 @@ var DefaultConfig = Config{
|
||||
StreamBufferSize: 1000,
|
||||
ConnectAttempts: 3,
|
||||
},
|
||||
PSRPC: rpc.DefaultPSRPCConfig,
|
||||
Agents: agent.Config{
|
||||
TargetLoad: agent.DefaultTargetLoad,
|
||||
},
|
||||
PSRPC: rpc.DefaultPSRPCConfig,
|
||||
Keys: map[string]string{},
|
||||
Metric: metric.DefaultMetricConfig,
|
||||
WebHook: webhook.DefaultWebHookConfig,
|
||||
|
||||
@@ -143,6 +143,7 @@ type AgentHandler struct {
|
||||
workers map[string]*agent.Worker
|
||||
jobToWorker map[livekit.JobID]*agent.Worker
|
||||
keyProvider auth.KeyProvider
|
||||
targetLoad float32
|
||||
|
||||
namespaceWorkers map[workerKey][]*agent.Worker
|
||||
roomKeyCount int
|
||||
@@ -188,6 +189,7 @@ func NewAgentService(
|
||||
keyProvider,
|
||||
logger.GetLogger(),
|
||||
serverInfo,
|
||||
conf.Agents.TargetLoad,
|
||||
agent.RoomAgentTopic,
|
||||
agent.PublisherAgentTopic,
|
||||
agent.ParticipantAgentTopic,
|
||||
@@ -207,6 +209,7 @@ func NewAgentHandler(
|
||||
keyProvider auth.KeyProvider,
|
||||
logger logger.Logger,
|
||||
serverInfo *livekit.ServerInfo,
|
||||
targetLoad float32,
|
||||
roomTopic string,
|
||||
publisherTopic string,
|
||||
participantTopic string,
|
||||
@@ -219,6 +222,7 @@ func NewAgentHandler(
|
||||
namespaceWorkers: make(map[workerKey][]*agent.Worker),
|
||||
serverInfo: serverInfo,
|
||||
keyProvider: keyProvider,
|
||||
targetLoad: targetLoad,
|
||||
roomTopic: roomTopic,
|
||||
publisherTopic: publisherTopic,
|
||||
participantTopic: participantTopic,
|
||||
@@ -439,7 +443,7 @@ func (h *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job)
|
||||
}
|
||||
|
||||
if w.Status() == livekit.WorkerStatus_WS_AVAILABLE {
|
||||
affinity += max(0, 1-w.Load())
|
||||
affinity += max(0, h.targetLoad-w.Load())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user