diff --git a/go.mod b/go.mod index fc4b7292a..4e957eac9 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/mediatransportutil v0.0.0-20260501135216-8818f1b77e59 - github.com/livekit/protocol v1.45.7-0.20260502131722-3c9faab50403 + github.com/livekit/protocol v1.45.8-0.20260505211410-5dd801462b33 github.com/livekit/psrpc v0.7.1 github.com/mackerelio/go-osstat v0.2.7 github.com/magefile/mage v1.17.0 diff --git a/go.sum b/go.sum index 1bf28b4d7..4558a9f0d 100644 --- a/go.sum +++ b/go.sum @@ -181,8 +181,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20260501135216-8818f1b77e59 h1:lWRMrb4ReRJu/e/BAp1kpT6fQOjS8WjCxdp0PGjgrBc= github.com/livekit/mediatransportutil v0.0.0-20260501135216-8818f1b77e59/go.mod h1:RCd46PT+6sEztld6XpkCrG1xskb0u3SqxIjy4G897Ss= -github.com/livekit/protocol v1.45.7-0.20260502131722-3c9faab50403 h1:tWebMNKx9GWtvgSXN6BgmXYX1fuXS5buM2MqGzuypgM= -github.com/livekit/protocol v1.45.7-0.20260502131722-3c9faab50403/go.mod h1:Q4uw9Bkz7ucNxPP/lcVj6IkVMYzlq3TwYo2TRFL6BN0= +github.com/livekit/protocol v1.45.8-0.20260505211410-5dd801462b33 h1:mr5ALVvQhQ7sdvaRAM5vNWPtBjmLSM1C2vLHOgzqwBs= +github.com/livekit/protocol v1.45.8-0.20260505211410-5dd801462b33/go.mod h1:Q4uw9Bkz7ucNxPP/lcVj6IkVMYzlq3TwYo2TRFL6BN0= github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw= github.com/livekit/psrpc v0.7.1/go.mod h1:bZ4iHFQptTkbPnB0LasvRNu/OBYXEu1NA6O5BMFo9kk= github.com/mackerelio/go-osstat v0.2.7 h1:TCavZi10wF49bT6iQZ9eT2keGZQpC69MTDfdJej5e94= diff --git a/pkg/agent/client.go b/pkg/agent/client.go index 9a061c084..51129d033 100644 --- a/pkg/agent/client.go +++ b/pkg/agent/client.go @@ -62,8 +62,8 @@ type JobRequest struct { // only set for participant jobs Participant *livekit.ParticipantInfo Metadata string - AgentName string - Environment string + AgentName string + Deployment string } type agentClient struct { @@ -155,7 +155,7 @@ func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverut } dispatcher.ForEach(func(curNs string) { - topic := GetAgentTopic(desc.AgentName, curNs, desc.Environment) + topic := GetAgentTopic(desc.AgentName, curNs, desc.Deployment) wg.Add(1) c.workers.Submit(func() { @@ -171,7 +171,7 @@ func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverut AgentName: desc.AgentName, Metadata: desc.Metadata, EnableRecording: c.config.EnableUserDataRecording, - Environment: desc.Environment, + Deployment: desc.Deployment, } resp, err := c.client.JobRequest(context.Background(), topic, jobTypeTopic, job) if err != nil { @@ -328,7 +328,7 @@ func (c *agentClient) Stop() error { return nil } -func GetAgentTopic(agentName, namespace, environment string) string { +func GetAgentTopic(agentName, namespace, deployment string) string { var topic string if agentName == "" { // Backward compatibility @@ -339,8 +339,8 @@ func GetAgentTopic(agentName, namespace, environment string) string { } else { topic = fmt.Sprintf("%s_%s", agentName, namespace) } - if environment != "" { - topic += "_" + environment + if deployment != "" { + topic += "_" + deployment } return topic } diff --git a/pkg/agent/worker.go b/pkg/agent/worker.go index 32d1eeaa6..e1e80fcc5 100644 --- a/pkg/agent/worker.go +++ b/pkg/agent/worker.go @@ -150,7 +150,7 @@ type WorkerRegistration struct { JobType livekit.JobType Permissions *livekit.ParticipantPermission ClientIP string - Environment string + Deployment string } func MakeWorkerRegistration() WorkerRegistration { @@ -197,7 +197,7 @@ func (h *WorkerRegisterer) HandleRegister(req *livekit.RegisterWorkerRequest) er return ErrUnknownJobType } - if err := protoagent.ValidateEnvironment(req.GetEnvironment()); err != nil { + if err := protoagent.ValidateDeployment(req.GetDeployment()); err != nil { return err } @@ -216,7 +216,7 @@ func (h *WorkerRegisterer) HandleRegister(req *livekit.RegisterWorkerRequest) er h.registration.Namespace = req.GetNamespace() h.registration.JobType = req.GetType() h.registration.Permissions = permissions - h.registration.Environment = req.GetEnvironment() + h.registration.Deployment = req.GetDeployment() h.registered = true _, err := h.conn.WriteServerMessage(&livekit.ServerMessage{ diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 5c1855151..8a3118dcb 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -1763,12 +1763,12 @@ func (r *Room) launchRoomAgents(ads []*agentDispatch) { go func() { inc := r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{ - JobType: livekit.JobType_JT_ROOM, - Room: r.ToProto(), - Metadata: ad.Metadata, - AgentName: ad.AgentName, - DispatchId: ad.Id, - Environment: ad.Environment, + JobType: livekit.JobType_JT_ROOM, + Room: r.ToProto(), + Metadata: ad.Metadata, + AgentName: ad.AgentName, + DispatchId: ad.Id, + Deployment: ad.Deployment, }) r.handleNewJobs(ad.AgentDispatch, inc) done() @@ -1792,7 +1792,7 @@ func (r *Room) launchTargetAgents(ads []*agentDispatch, p types.Participant, job Metadata: ad.Metadata, AgentName: ad.AgentName, DispatchId: ad.Id, - Environment: ad.Environment, + Deployment: ad.Deployment, }) r.handleNewJobs(ad.AgentDispatch, inc) done() @@ -1849,7 +1849,7 @@ func (r *Room) createAgentDispatch(dispatch *livekit.AgentDispatch) (*agentDispa } func (r *Room) createAgentDispatchFromRoomDispatch(rad *livekit.RoomAgentDispatch) (*agentDispatch, error) { - if err := protoagent.ValidateEnvironment(rad.GetEnvironment()); err != nil { + if err := protoagent.ValidateDeployment(rad.GetDeployment()); err != nil { return nil, err } return r.createAgentDispatch(&livekit.AgentDispatch{ @@ -1858,7 +1858,7 @@ func (r *Room) createAgentDispatchFromRoomDispatch(rad *livekit.RoomAgentDispatc Metadata: rad.GetMetadata(), Room: r.protoRoom.Name, RestartPolicy: rad.GetRestartPolicy(), - Environment: rad.GetEnvironment(), + Deployment: rad.GetDeployment(), }) } diff --git a/pkg/service/agent_dispatch_service.go b/pkg/service/agent_dispatch_service.go index 6649ab487..b261c2d4b 100644 --- a/pkg/service/agent_dispatch_service.go +++ b/pkg/service/agent_dispatch_service.go @@ -56,7 +56,7 @@ func (ag *AgentDispatchService) CreateDispatch(ctx context.Context, req *livekit return nil, twirpAuthError(err) } - if err := agent.ValidateEnvironment(req.GetEnvironment()); err != nil { + if err := agent.ValidateDeployment(req.GetDeployment()); err != nil { return nil, psrpc.NewError(psrpc.InvalidArgument, err) } @@ -78,7 +78,7 @@ func (ag *AgentDispatchService) CreateDispatch(ctx context.Context, req *livekit Room: req.Room, Metadata: req.Metadata, RestartPolicy: req.RestartPolicy, - Environment: req.Environment, + Deployment: req.Deployment, } return ag.agentDispatchClient.CreateDispatch(ctx, ag.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), dispatch) } diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index 9b099bf22..215f5fac4 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -158,10 +158,10 @@ type AgentHandler struct { } type workerKey struct { - agentName string - namespace string - jobType livekit.JobType - environment string + agentName string + namespace string + jobType livekit.JobType + deployment string } func NewAgentService( @@ -256,13 +256,13 @@ func (h *AgentHandler) registerWorker(w *agent.Worker) { h.workers[w.ID] = w - key := workerKey{w.AgentName, w.Namespace, w.JobType, w.Environment} + key := workerKey{w.AgentName, w.Namespace, w.JobType, w.Deployment} workers := h.namespaceWorkers[key] created := len(workers) == 0 if created { - nameTopic := agent.GetAgentTopic(w.AgentName, w.Namespace, w.Environment) + nameTopic := agent.GetAgentTopic(w.AgentName, w.Namespace, w.Deployment) var typeTopic string switch w.JobType { case livekit.JobType_JT_ROOM: @@ -321,7 +321,7 @@ func (h *AgentHandler) deregisterWorker(w *agent.Worker) { delete(h.workers, w.ID) - key := workerKey{w.AgentName, w.Namespace, w.JobType, w.Environment} + key := workerKey{w.AgentName, w.Namespace, w.JobType, w.Deployment} workers, ok := h.namespaceWorkers[key] if !ok { @@ -343,7 +343,7 @@ func (h *AgentHandler) deregisterWorker(w *agent.Worker) { ) delete(h.namespaceWorkers, key) - topic := agent.GetAgentTopic(w.AgentName, w.Namespace, w.Environment) + topic := agent.GetAgentTopic(w.AgentName, w.Namespace, w.Deployment) switch w.JobType { case livekit.JobType_JT_ROOM: @@ -394,7 +394,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.J logger = logger.WithValues("participant", job.Participant.Identity) } - key := workerKey{job.AgentName, job.Namespace, job.Type, job.Environment} + key := workerKey{job.AgentName, job.Namespace, job.Type, job.Deployment} attempted := make(map[*agent.Worker]struct{}) for { selected, err := h.selectWorkerWeightedByLoad(key, attempted) @@ -439,7 +439,7 @@ func (h *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job) var affinity float32 for _, w := range h.workers { - if w.AgentName != job.AgentName || w.Namespace != job.Namespace || w.JobType != job.Type || w.Environment != job.Environment { + if w.AgentName != job.AgentName || w.Namespace != job.Namespace || w.JobType != job.Type || w.Deployment != job.Deployment { continue }