From fb7eb3450e73a313c26e11ebaa29e5c7b90f4731 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 8 Jul 2024 17:09:11 -0700 Subject: [PATCH] Update agents service to updated protocol (#2837) - Deprecate namespace field - Restore former semantic of starting a job for each registered namespace, for a given Agent Name - Add agentName field - Use "dispatcher" naming convention --- go.mod | 2 +- go.sum | 4 +- pkg/agent/client.go | 108 ++++++++++++++++++++++++----------- pkg/agent/worker.go | 9 +++ pkg/rtc/room.go | 36 +++--------- pkg/service/agentservice.go | 44 +++++++++----- pkg/service/roomallocator.go | 17 ++---- pkg/service/roomservice.go | 16 ++---- pkg/service/rtcservice.go | 10 +--- test/agent_test.go | 10 ++-- 10 files changed, 142 insertions(+), 114 deletions(-) diff --git a/go.mod b/go.mod index 7f68cfc52..f25062603 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 - github.com/livekit/protocol v1.19.2-0.20240705134535-94a2cfe2f1ee + github.com/livekit/protocol v1.19.2-0.20240706015329-8c1eef4468ee github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 3c9636419..305998b3b 100644 --- a/go.sum +++ b/go.sum @@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY= github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.19.2-0.20240705134535-94a2cfe2f1ee h1:J1U5fqAB5wJ4+Dl/DAf43Eiw+syyLTKAJoGuUj3rjQI= -github.com/livekit/protocol v1.19.2-0.20240705134535-94a2cfe2f1ee/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= +github.com/livekit/protocol v1.19.2-0.20240706015329-8c1eef4468ee h1:t+EHiCHcxOe/hH3KZNhai+0lscBBs6HoYLg09wXAhuE= +github.com/livekit/protocol v1.19.2-0.20240706015329-8c1eef4468ee/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/agent/client.go b/pkg/agent/client.go index c82d6c3f5..f5051527d 100644 --- a/pkg/agent/client.go +++ b/pkg/agent/client.go @@ -16,6 +16,7 @@ package agent import ( "context" + "fmt" "sync" "time" @@ -51,7 +52,7 @@ type JobRequest struct { // only set for participant jobs Participant *livekit.ParticipantInfo Metadata string - Namespace string + AgentName string } type agentClient struct { @@ -61,9 +62,12 @@ type agentClient struct { // cache response to avoid constantly checking with controllers // cache is invalidated with AgentRegistered updates - roomNamespaces *serverutils.IncrementalDispatcher[string] - publisherNamespaces *serverutils.IncrementalDispatcher[string] - enabledExpiresAt time.Time + roomNamespaces *serverutils.IncrementalDispatcher[string] // deprecated + publisherNamespaces *serverutils.IncrementalDispatcher[string] // deprecated + roomAgentNames *serverutils.IncrementalDispatcher[string] + publisherAgentNames *serverutils.IncrementalDispatcher[string] + + enabledExpiresAt time.Time workers *workerpool.WorkerPool @@ -96,6 +100,8 @@ func NewAgentClient(bus psrpc.MessageBus) (Client, error) { c.mu.Lock() c.roomNamespaces = nil c.publisherNamespaces = nil + c.roomAgentNames = nil + c.publisherAgentNames = nil c.mu.Unlock() } @@ -111,68 +117,80 @@ func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) { jobTypeTopic = PublisherAgentTopic } - if !c.isNamespaceActive(desc.Namespace, desc.JobType) { - logger.Infow("not dispatching agent job since no worker is available", "namespace", desc.Namespace, "jobType", desc.JobType) + dispatcher := c.getDispatcher(desc.AgentName, desc.JobType) + + if dispatcher == nil { + logger.Infow("not dispatching agent job since no worker is available", "agentName", desc.AgentName, "jobType", desc.JobType) return } - c.workers.Submit(func() { - _, err := c.client.JobRequest(context.Background(), desc.Namespace, jobTypeTopic, &livekit.Job{ - Id: utils.NewGuid(utils.AgentJobPrefix), - Type: desc.JobType, - Room: desc.Room, - Participant: desc.Participant, - Namespace: desc.Namespace, - Metadata: desc.Metadata, + dispatcher.ForEach(func(curNs string) { + topic := GetAgentTopic(desc.AgentName, curNs) + c.workers.Submit(func() { + // The cached agent parameters do not provide the exact combination of available job type/agent name/namespace, so some of the JobRequest RPC may not trigger any worker + _, err := c.client.JobRequest(context.Background(), topic, jobTypeTopic, &livekit.Job{ + Id: utils.NewGuid(utils.AgentJobPrefix), + Type: desc.JobType, + Room: desc.Room, + Participant: desc.Participant, + Namespace: curNs, + AgentName: desc.AgentName, + Metadata: desc.Metadata, + }) + if err != nil { + logger.Infow("failed to send job request", "error", err, "namespace", curNs, "jobType", desc.JobType) + } }) - if err != nil { - logger.Infow("failed to send job request", "error", err, "namespace", desc.Namespace, "jobType", desc.JobType) - } }) } -func (c *agentClient) isNamespaceActive(ns string, jobType livekit.JobType) bool { +func (c *agentClient) getDispatcher(agName string, jobType livekit.JobType) *serverutils.IncrementalDispatcher[string] { c.mu.Lock() - if time.Since(c.enabledExpiresAt) > EnabledCacheTTL || c.roomNamespaces == nil || c.publisherNamespaces == nil { + if time.Since(c.enabledExpiresAt) > EnabledCacheTTL || c.roomNamespaces == nil || + c.publisherNamespaces == nil || c.roomAgentNames == nil || c.publisherAgentNames == nil { c.enabledExpiresAt = time.Now() c.roomNamespaces = serverutils.NewIncrementalDispatcher[string]() c.publisherNamespaces = serverutils.NewIncrementalDispatcher[string]() - go c.checkEnabled(c.roomNamespaces, c.publisherNamespaces) + c.roomAgentNames = serverutils.NewIncrementalDispatcher[string]() + c.publisherAgentNames = serverutils.NewIncrementalDispatcher[string]() + + go c.checkEnabled(c.roomNamespaces, c.publisherNamespaces, c.roomAgentNames, c.publisherAgentNames) } target := c.roomNamespaces + agentNames := c.roomAgentNames if jobType == livekit.JobType_JT_PUBLISHER { target = c.publisherNamespaces + agentNames = c.publisherAgentNames } - c.mu.Unlock() - done := make(chan bool, 1) - - go func() { - target.ForEach(func(curNs string) { - if curNs == ns { + done := make(chan *serverutils.IncrementalDispatcher[string], 1) + c.workers.Submit(func() { + agentNames.ForEach(func(ag string) { + if ag == agName { select { - case done <- true: + case done <- target: default: } - - return } }) select { - case done <- false: + case done <- nil: default: } - }() + }) return <-done } -func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces *serverutils.IncrementalDispatcher[string]) { +func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces, roomAgentNames, publisherAgentNames *serverutils.IncrementalDispatcher[string]) { defer roomNamespaces.Done() defer publisherNamespaces.Done() + defer roomAgentNames.Done() + defer publisherAgentNames.Done() + resChan, err := c.client.CheckEnabled(context.Background(), &rpc.CheckEnabledRequest{}, psrpc.WithRequestTimeout(CheckEnabledTimeout)) if err != nil { logger.Errorw("failed to check enabled", err) @@ -181,6 +199,8 @@ func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces *serverut roomNSMap := make(map[string]bool) publisherNSMap := make(map[string]bool) + roomAgMap := make(map[string]bool) + publisherAgMap := make(map[string]bool) for r := range resChan { if r.Result.GetRoomEnabled() { @@ -190,6 +210,12 @@ func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces *serverut roomNSMap[ns] = true } } + for _, ag := range r.Result.GetAgentNames() { + if _, ok := roomAgMap[ag]; !ok { + roomAgentNames.Add(ag) + roomAgMap[ag] = true + } + } } if r.Result.GetPublisherEnabled() { for _, ns := range r.Result.GetNamespaces() { @@ -198,6 +224,12 @@ func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces *serverut publisherNSMap[ns] = true } } + for _, ag := range r.Result.GetAgentNames() { + if _, ok := publisherAgMap[ag]; !ok { + publisherAgentNames.Add(ag) + publisherAgMap[ag] = true + } + } } } } @@ -207,3 +239,15 @@ func (c *agentClient) Stop() error { <-c.subDone return nil } + +func GetAgentTopic(agentName, namespace string) string { + if agentName == "" { + // Backward compatibility + return namespace + } else if namespace == "" { + // Forward compatibility once the namespace field is removed from the worker SDK + return agentName + } else { + return fmt.Sprintf("%s_%s", agentName, namespace) + } +} diff --git a/pkg/agent/worker.go b/pkg/agent/worker.go index a7d2a4bad..651b732de 100644 --- a/pkg/agent/worker.go +++ b/pkg/agent/worker.go @@ -78,6 +78,7 @@ type Worker struct { id string jobType livekit.JobType version string + agentName string name string namespace string load float32 @@ -161,6 +162,12 @@ func (w *Worker) Namespace() string { return w.namespace } +func (w *Worker) AgentName() string { + w.mu.Lock() + defer w.mu.Unlock() + return w.agentName +} + func (w *Worker) Status() livekit.WorkerStatus { w.mu.Lock() defer w.mu.Unlock() @@ -318,6 +325,7 @@ func (w *Worker) handleRegister(req *livekit.RegisterWorkerRequest) { w.version = req.Version w.name = req.Name + w.agentName = req.GetAgentName() w.namespace = req.GetNamespace() w.jobType = req.GetType() @@ -408,6 +416,7 @@ func (w *Worker) handleSimulateJob(simulate *livekit.SimulateJobRequest) { Room: simulate.Room, Participant: simulate.Participant, Namespace: w.Namespace(), + AgentName: w.AgentName(), } go func() { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 6d1f28b69..27c9a3856 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -1432,34 +1432,14 @@ func (r *Room) launchPublisherAgents(p types.Participant) { return } - for _, ag := range r.internal.Agents { - if ag.Type != livekit.JobType_JT_PUBLISHER { - continue - } - - var startAgent bool - - if len(ag.ParticipantIdentity) == 0 { - // If no participant given, start for all participants - startAgent = true - } else { - for _, pi := range ag.ParticipantIdentity { - if pi == string(p.Identity()) { - startAgent = true - break - } - } - } - - if startAgent { - go r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{ - JobType: livekit.JobType_JT_PUBLISHER, - Room: r.ToProto(), - Participant: p.ToProto(), - Metadata: ag.Metadata, - Namespace: ag.Namespace, - }) - } + for _, ag := range r.internal.AgentDispatches { + go r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{ + JobType: livekit.JobType_JT_PUBLISHER, + Room: r.ToProto(), + Participant: p.ToProto(), + Metadata: ag.Metadata, + AgentName: ag.AgentName, + }) } } diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index dd3cbd6dc..826680fbc 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -94,14 +94,15 @@ type AgentHandler struct { namespaceWorkers map[workerKey][]*agent.Worker roomKeyCount int publisherKeyCount int - // TODO remove once deprecated CheckEnabled is removed - namespaces []string + namespaces []string // namespaces deprecated + agentNames []string roomTopic string publisherTopic string } type workerKey struct { + agentName string namespace string jobType livekit.JobType } @@ -203,17 +204,18 @@ func (h *AgentHandler) HandleConnection(ctx context.Context, conn agent.SignalCo func (h *AgentHandler) HandleWorkerRegister(w *agent.Worker) { h.mu.Lock() - key := workerKey{w.Namespace(), w.JobType()} + key := workerKey{w.AgentName(), w.Namespace(), w.JobType()} workers := h.namespaceWorkers[key] created := len(workers) == 0 if created { - topic := h.roomTopic + nameTopic := agent.GetAgentTopic(w.AgentName(), w.Namespace()) + typeTopic := h.roomTopic if w.JobType() == livekit.JobType_JT_PUBLISHER { - topic = h.publisherTopic + typeTopic = h.publisherTopic } - err := h.agentServer.RegisterJobRequestTopic(w.Namespace(), topic) + err := h.agentServer.RegisterJobRequestTopic(nameTopic, typeTopic) if err != nil { h.mu.Unlock() @@ -230,16 +232,19 @@ func (h *AgentHandler) HandleWorkerRegister(w *agent.Worker) { h.namespaces = append(h.namespaces, w.Namespace()) sort.Strings(h.namespaces) + h.agentNames = append(h.agentNames, w.AgentName()) + sort.Strings(h.agentNames) + } h.namespaceWorkers[key] = append(workers, w) h.mu.Unlock() if created { - h.logger.Infow("initial worker registered", "namespace", w.Namespace(), "jobType", w.JobType()) + h.logger.Infow("initial worker registered", "namespace", w.Namespace(), "jobType", w.JobType(), "agentName", w.AgentName()) err := h.agentServer.PublishWorkerRegistered(context.Background(), agent.DefaultHandlerNamespace, &emptypb.Empty{}) if err != nil { - w.Logger().Errorw("failed to publish worker registered", err) + w.Logger().Errorw("failed to publish worker registered", err, "namespace", w.Namespace(), "jobType", w.JobType(), "agentName", w.AgentName()) } } } @@ -248,7 +253,7 @@ func (h *AgentHandler) HandleWorkerDeregister(w *agent.Worker) { h.mu.Lock() defer h.mu.Unlock() - key := workerKey{w.Namespace(), w.JobType()} + key := workerKey{w.AgentName(), w.Namespace(), w.JobType()} workers, ok := h.namespaceWorkers[key] if !ok { @@ -262,25 +267,30 @@ func (h *AgentHandler) HandleWorkerDeregister(w *agent.Worker) { if len(workers) > 1 { h.namespaceWorkers[key] = slices.Delete(workers, index, index+1) } else { - h.logger.Debugw("last worker deregistered") + h.logger.Debugw("last worker deregistered", "namespace", w.Namespace(), "jobType", w.JobType(), "agentName", w.AgentName()) delete(h.namespaceWorkers, key) + topic := agent.GetAgentTopic(w.AgentName(), w.Namespace()) if w.JobType() == livekit.JobType_JT_ROOM { h.roomKeyCount-- - h.agentServer.DeregisterJobRequestTopic(w.Namespace(), h.roomTopic) + h.agentServer.DeregisterJobRequestTopic(topic, h.roomTopic) } else { h.publisherKeyCount-- - h.agentServer.DeregisterJobRequestTopic(w.Namespace(), h.publisherTopic) + h.agentServer.DeregisterJobRequestTopic(topic, h.publisherTopic) } + // agentNames and namespaces contains repeated entries for each agentNames/namespaces combinations if i := slices.Index(h.namespaces, w.Namespace()); i != -1 { h.namespaces = slices.Delete(h.namespaces, i, i+1) } + if i := slices.Index(h.agentNames, w.AgentName()); i != -1 { + h.agentNames = slices.Delete(h.agentNames, i, i+1) + } } } func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*emptypb.Empty, error) { - key := workerKey{job.Namespace, job.Type} + key := workerKey{job.AgentName, job.Namespace, job.Type} attempted := make(map[*agent.Worker]struct{}) for { h.mu.Lock() @@ -312,6 +322,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty values := []interface{}{ "jobID", job.Id, "namespace", job.Namespace, + "agentName", job.AgentName, "workerID", selected.ID(), } if job.Room != nil { @@ -320,7 +331,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty if job.Participant != nil { values = append(values, "participant", job.Participant.Identity) } - logger.Debugw("assigning job", values...) + h.logger.Debugw("assigning job", values...) err := selected.AssignJob(ctx, job) if err != nil { if errors.Is(err, agent.ErrWorkerNotAvailable) { @@ -340,7 +351,7 @@ func (h *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job) var affinity float32 var maxLoad float32 for _, w := range h.workers { - if w.Namespace() != job.Namespace || w.JobType() != job.Type { + if w.AgentName() != job.AgentName || w.Namespace() != job.Namespace || w.JobType() != job.Type { continue } @@ -362,8 +373,11 @@ func (h *AgentHandler) CheckEnabled(ctx context.Context, req *rpc.CheckEnabledRe h.mu.Lock() defer h.mu.Unlock() + // This doesn't return the full agentName -> namespace mapping, which can cause some unnecessary RPC. + // namespaces are however deprecated. return &rpc.CheckEnabledResponse{ Namespaces: slices.Compact(slices.Clone(h.namespaces)), + AgentNames: slices.Compact(slices.Clone(h.agentNames)), RoomEnabled: h.roomKeyCount != 0, PublisherEnabled: h.publisherKeyCount != 0, }, nil diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index eecb2bb30..73d7b4fce 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -107,23 +107,14 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre } } if req.Agent == nil { - // Backward compatibility: by default, start any agent in the empty namespace + // Backward compatibility: by default, start any agent in the empty JobName req.Agent = &livekit.RoomAgent{ - Agents: []*livekit.CreateAgentJobDefinitionRequest{ - &livekit.CreateAgentJobDefinitionRequest{ - Type: livekit.JobType_JT_ROOM, - Room: req.Name, - Namespace: "default", - }, - &livekit.CreateAgentJobDefinitionRequest{ - Type: livekit.JobType_JT_PUBLISHER, - Room: req.Name, - Namespace: "default", - }, + Dispatches: []*livekit.RoomAgentDispatch{ + &livekit.RoomAgentDispatch{}, }, } } - internal.Agents = req.Agent.Agents + internal.AgentDispatches = req.Agent.Dispatches if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 { internal.PlayoutDelay = &livekit.PlayoutDelay{ Enabled: true, diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 2cc379bd4..233a82313 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -105,8 +105,8 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq if created { _, internal, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name), true) - if internal.Agents != nil { - err = s.launchAgents(ctx, rm, internal.Agents) + if internal.AgentDispatches != nil { + err = s.launchAgents(ctx, rm, internal.AgentDispatches) if err != nil { return nil, err } @@ -130,17 +130,13 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return rm, nil } -func (s *RoomService) launchAgents(ctx context.Context, rm *livekit.Room, agents []*livekit.CreateAgentJobDefinitionRequest) error { +func (s *RoomService) launchAgents(ctx context.Context, rm *livekit.Room, agents []*livekit.RoomAgentDispatch) error { for _, ag := range agents { - if ag.Type != livekit.JobType_JT_ROOM { - continue - } - go s.agentClient.LaunchJob(ctx, &agent.JobRequest{ - JobType: ag.Type, + JobType: livekit.JobType_JT_ROOM, Room: rm, Metadata: ag.Metadata, - Namespace: ag.Namespace, + AgentName: ag.AgentName, }) } @@ -344,7 +340,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat } if created { - err = s.launchAgents(ctx, room, internal.Agents) + err = s.launchAgents(ctx, room, internal.AgentDispatches) if err != nil { return nil, err } diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 86952b767..2d8b18c8b 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -527,16 +527,12 @@ func (s *RTCService) startConnection( return connectionResult{}, nil, err } - for _, ag := range internal.Agents { - if ag.Type != livekit.JobType_JT_ROOM { - continue - } - + for _, ag := range internal.AgentDispatches { go s.agentClient.LaunchJob(ctx, &agent.JobRequest{ - JobType: ag.Type, + JobType: livekit.JobType_JT_ROOM, Room: cr.Room, Metadata: ag.Metadata, - Namespace: ag.Namespace, + AgentName: ag.AgentName, }) } } diff --git a/test/agent_test.go b/test/agent_test.go index 7db5b4b6f..9031b9505 100644 --- a/test/agent_test.go +++ b/test/agent_test.go @@ -121,12 +121,10 @@ func TestAgentNamespaces(t *testing.T) { _, err = roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{ Name: testRoom, Agent: &livekit.RoomAgent{ - Agents: []*livekit.CreateAgentJobDefinitionRequest{ - &livekit.CreateAgentJobDefinitionRequest{ - Namespace: "namespace1", - }, - &livekit.CreateAgentJobDefinitionRequest{ - Namespace: "namespace2", + Dispatches: []*livekit.RoomAgentDispatch{ + &livekit.RoomAgentDispatch{}, + &livekit.RoomAgentDispatch{ + AgentName: "ag", }, }, },