mirror of
https://github.com/livekit/livekit.git
synced 2026-05-11 08:17:09 +00:00
rename agent environment to deployment (#4506)
* rename agent environment to deployment * deps
This commit is contained in:
@@ -21,7 +21,7 @@ require (
|
||||
github.com/jxskiss/base62 v1.1.0
|
||||
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
|
||||
github.com/livekit/mediatransportutil v0.0.0-20260501135216-8818f1b77e59
|
||||
github.com/livekit/protocol v1.45.7-0.20260502131722-3c9faab50403
|
||||
github.com/livekit/protocol v1.45.8-0.20260505211410-5dd801462b33
|
||||
github.com/livekit/psrpc v0.7.1
|
||||
github.com/mackerelio/go-osstat v0.2.7
|
||||
github.com/magefile/mage v1.17.0
|
||||
|
||||
@@ -181,8 +181,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT
|
||||
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20260501135216-8818f1b77e59 h1:lWRMrb4ReRJu/e/BAp1kpT6fQOjS8WjCxdp0PGjgrBc=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20260501135216-8818f1b77e59/go.mod h1:RCd46PT+6sEztld6XpkCrG1xskb0u3SqxIjy4G897Ss=
|
||||
github.com/livekit/protocol v1.45.7-0.20260502131722-3c9faab50403 h1:tWebMNKx9GWtvgSXN6BgmXYX1fuXS5buM2MqGzuypgM=
|
||||
github.com/livekit/protocol v1.45.7-0.20260502131722-3c9faab50403/go.mod h1:Q4uw9Bkz7ucNxPP/lcVj6IkVMYzlq3TwYo2TRFL6BN0=
|
||||
github.com/livekit/protocol v1.45.8-0.20260505211410-5dd801462b33 h1:mr5ALVvQhQ7sdvaRAM5vNWPtBjmLSM1C2vLHOgzqwBs=
|
||||
github.com/livekit/protocol v1.45.8-0.20260505211410-5dd801462b33/go.mod h1:Q4uw9Bkz7ucNxPP/lcVj6IkVMYzlq3TwYo2TRFL6BN0=
|
||||
github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw=
|
||||
github.com/livekit/psrpc v0.7.1/go.mod h1:bZ4iHFQptTkbPnB0LasvRNu/OBYXEu1NA6O5BMFo9kk=
|
||||
github.com/mackerelio/go-osstat v0.2.7 h1:TCavZi10wF49bT6iQZ9eT2keGZQpC69MTDfdJej5e94=
|
||||
|
||||
+7
-7
@@ -62,8 +62,8 @@ type JobRequest struct {
|
||||
// only set for participant jobs
|
||||
Participant *livekit.ParticipantInfo
|
||||
Metadata string
|
||||
AgentName string
|
||||
Environment string
|
||||
AgentName string
|
||||
Deployment string
|
||||
}
|
||||
|
||||
type agentClient struct {
|
||||
@@ -155,7 +155,7 @@ func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverut
|
||||
}
|
||||
|
||||
dispatcher.ForEach(func(curNs string) {
|
||||
topic := GetAgentTopic(desc.AgentName, curNs, desc.Environment)
|
||||
topic := GetAgentTopic(desc.AgentName, curNs, desc.Deployment)
|
||||
|
||||
wg.Add(1)
|
||||
c.workers.Submit(func() {
|
||||
@@ -171,7 +171,7 @@ func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverut
|
||||
AgentName: desc.AgentName,
|
||||
Metadata: desc.Metadata,
|
||||
EnableRecording: c.config.EnableUserDataRecording,
|
||||
Environment: desc.Environment,
|
||||
Deployment: desc.Deployment,
|
||||
}
|
||||
resp, err := c.client.JobRequest(context.Background(), topic, jobTypeTopic, job)
|
||||
if err != nil {
|
||||
@@ -328,7 +328,7 @@ func (c *agentClient) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetAgentTopic(agentName, namespace, environment string) string {
|
||||
func GetAgentTopic(agentName, namespace, deployment string) string {
|
||||
var topic string
|
||||
if agentName == "" {
|
||||
// Backward compatibility
|
||||
@@ -339,8 +339,8 @@ func GetAgentTopic(agentName, namespace, environment string) string {
|
||||
} else {
|
||||
topic = fmt.Sprintf("%s_%s", agentName, namespace)
|
||||
}
|
||||
if environment != "" {
|
||||
topic += "_" + environment
|
||||
if deployment != "" {
|
||||
topic += "_" + deployment
|
||||
}
|
||||
return topic
|
||||
}
|
||||
|
||||
+3
-3
@@ -150,7 +150,7 @@ type WorkerRegistration struct {
|
||||
JobType livekit.JobType
|
||||
Permissions *livekit.ParticipantPermission
|
||||
ClientIP string
|
||||
Environment string
|
||||
Deployment string
|
||||
}
|
||||
|
||||
func MakeWorkerRegistration() WorkerRegistration {
|
||||
@@ -197,7 +197,7 @@ func (h *WorkerRegisterer) HandleRegister(req *livekit.RegisterWorkerRequest) er
|
||||
return ErrUnknownJobType
|
||||
}
|
||||
|
||||
if err := protoagent.ValidateEnvironment(req.GetEnvironment()); err != nil {
|
||||
if err := protoagent.ValidateDeployment(req.GetDeployment()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -216,7 +216,7 @@ func (h *WorkerRegisterer) HandleRegister(req *livekit.RegisterWorkerRequest) er
|
||||
h.registration.Namespace = req.GetNamespace()
|
||||
h.registration.JobType = req.GetType()
|
||||
h.registration.Permissions = permissions
|
||||
h.registration.Environment = req.GetEnvironment()
|
||||
h.registration.Deployment = req.GetDeployment()
|
||||
h.registered = true
|
||||
|
||||
_, err := h.conn.WriteServerMessage(&livekit.ServerMessage{
|
||||
|
||||
+9
-9
@@ -1763,12 +1763,12 @@ func (r *Room) launchRoomAgents(ads []*agentDispatch) {
|
||||
|
||||
go func() {
|
||||
inc := r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{
|
||||
JobType: livekit.JobType_JT_ROOM,
|
||||
Room: r.ToProto(),
|
||||
Metadata: ad.Metadata,
|
||||
AgentName: ad.AgentName,
|
||||
DispatchId: ad.Id,
|
||||
Environment: ad.Environment,
|
||||
JobType: livekit.JobType_JT_ROOM,
|
||||
Room: r.ToProto(),
|
||||
Metadata: ad.Metadata,
|
||||
AgentName: ad.AgentName,
|
||||
DispatchId: ad.Id,
|
||||
Deployment: ad.Deployment,
|
||||
})
|
||||
r.handleNewJobs(ad.AgentDispatch, inc)
|
||||
done()
|
||||
@@ -1792,7 +1792,7 @@ func (r *Room) launchTargetAgents(ads []*agentDispatch, p types.Participant, job
|
||||
Metadata: ad.Metadata,
|
||||
AgentName: ad.AgentName,
|
||||
DispatchId: ad.Id,
|
||||
Environment: ad.Environment,
|
||||
Deployment: ad.Deployment,
|
||||
})
|
||||
r.handleNewJobs(ad.AgentDispatch, inc)
|
||||
done()
|
||||
@@ -1849,7 +1849,7 @@ func (r *Room) createAgentDispatch(dispatch *livekit.AgentDispatch) (*agentDispa
|
||||
}
|
||||
|
||||
func (r *Room) createAgentDispatchFromRoomDispatch(rad *livekit.RoomAgentDispatch) (*agentDispatch, error) {
|
||||
if err := protoagent.ValidateEnvironment(rad.GetEnvironment()); err != nil {
|
||||
if err := protoagent.ValidateDeployment(rad.GetDeployment()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r.createAgentDispatch(&livekit.AgentDispatch{
|
||||
@@ -1858,7 +1858,7 @@ func (r *Room) createAgentDispatchFromRoomDispatch(rad *livekit.RoomAgentDispatc
|
||||
Metadata: rad.GetMetadata(),
|
||||
Room: r.protoRoom.Name,
|
||||
RestartPolicy: rad.GetRestartPolicy(),
|
||||
Environment: rad.GetEnvironment(),
|
||||
Deployment: rad.GetDeployment(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ func (ag *AgentDispatchService) CreateDispatch(ctx context.Context, req *livekit
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
|
||||
if err := agent.ValidateEnvironment(req.GetEnvironment()); err != nil {
|
||||
if err := agent.ValidateDeployment(req.GetDeployment()); err != nil {
|
||||
return nil, psrpc.NewError(psrpc.InvalidArgument, err)
|
||||
}
|
||||
|
||||
@@ -78,7 +78,7 @@ func (ag *AgentDispatchService) CreateDispatch(ctx context.Context, req *livekit
|
||||
Room: req.Room,
|
||||
Metadata: req.Metadata,
|
||||
RestartPolicy: req.RestartPolicy,
|
||||
Environment: req.Environment,
|
||||
Deployment: req.Deployment,
|
||||
}
|
||||
return ag.agentDispatchClient.CreateDispatch(ctx, ag.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), dispatch)
|
||||
}
|
||||
|
||||
+10
-10
@@ -158,10 +158,10 @@ type AgentHandler struct {
|
||||
}
|
||||
|
||||
type workerKey struct {
|
||||
agentName string
|
||||
namespace string
|
||||
jobType livekit.JobType
|
||||
environment string
|
||||
agentName string
|
||||
namespace string
|
||||
jobType livekit.JobType
|
||||
deployment string
|
||||
}
|
||||
|
||||
func NewAgentService(
|
||||
@@ -256,13 +256,13 @@ func (h *AgentHandler) registerWorker(w *agent.Worker) {
|
||||
|
||||
h.workers[w.ID] = w
|
||||
|
||||
key := workerKey{w.AgentName, w.Namespace, w.JobType, w.Environment}
|
||||
key := workerKey{w.AgentName, w.Namespace, w.JobType, w.Deployment}
|
||||
|
||||
workers := h.namespaceWorkers[key]
|
||||
created := len(workers) == 0
|
||||
|
||||
if created {
|
||||
nameTopic := agent.GetAgentTopic(w.AgentName, w.Namespace, w.Environment)
|
||||
nameTopic := agent.GetAgentTopic(w.AgentName, w.Namespace, w.Deployment)
|
||||
var typeTopic string
|
||||
switch w.JobType {
|
||||
case livekit.JobType_JT_ROOM:
|
||||
@@ -321,7 +321,7 @@ func (h *AgentHandler) deregisterWorker(w *agent.Worker) {
|
||||
|
||||
delete(h.workers, w.ID)
|
||||
|
||||
key := workerKey{w.AgentName, w.Namespace, w.JobType, w.Environment}
|
||||
key := workerKey{w.AgentName, w.Namespace, w.JobType, w.Deployment}
|
||||
|
||||
workers, ok := h.namespaceWorkers[key]
|
||||
if !ok {
|
||||
@@ -343,7 +343,7 @@ func (h *AgentHandler) deregisterWorker(w *agent.Worker) {
|
||||
)
|
||||
delete(h.namespaceWorkers, key)
|
||||
|
||||
topic := agent.GetAgentTopic(w.AgentName, w.Namespace, w.Environment)
|
||||
topic := agent.GetAgentTopic(w.AgentName, w.Namespace, w.Deployment)
|
||||
|
||||
switch w.JobType {
|
||||
case livekit.JobType_JT_ROOM:
|
||||
@@ -394,7 +394,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.J
|
||||
logger = logger.WithValues("participant", job.Participant.Identity)
|
||||
}
|
||||
|
||||
key := workerKey{job.AgentName, job.Namespace, job.Type, job.Environment}
|
||||
key := workerKey{job.AgentName, job.Namespace, job.Type, job.Deployment}
|
||||
attempted := make(map[*agent.Worker]struct{})
|
||||
for {
|
||||
selected, err := h.selectWorkerWeightedByLoad(key, attempted)
|
||||
@@ -439,7 +439,7 @@ func (h *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job)
|
||||
|
||||
var affinity float32
|
||||
for _, w := range h.workers {
|
||||
if w.AgentName != job.AgentName || w.Namespace != job.Namespace || w.JobType != job.Type || w.Environment != job.Environment {
|
||||
if w.AgentName != job.AgentName || w.Namespace != job.Namespace || w.JobType != job.Type || w.Deployment != job.Deployment {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user