Update agents service to updated protocol (#2837)

- Deprecate namespace field
- Restore the former semantics of starting a job for each registered namespace for a given agent name
- Add agentName field
- Use "dispatcher" naming convention
This commit is contained in:
Benjamin Pracht
2024-07-08 17:09:11 -07:00
committed by GitHub
parent e4fba5634a
commit fb7eb3450e
10 changed files with 142 additions and 114 deletions
+1 -1
View File
@@ -20,7 +20,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7
github.com/livekit/protocol v1.19.2-0.20240705134535-94a2cfe2f1ee
github.com/livekit/protocol v1.19.2-0.20240706015329-8c1eef4468ee
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
+2 -2
View File
@@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY=
github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.19.2-0.20240705134535-94a2cfe2f1ee h1:J1U5fqAB5wJ4+Dl/DAf43Eiw+syyLTKAJoGuUj3rjQI=
github.com/livekit/protocol v1.19.2-0.20240705134535-94a2cfe2f1ee/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4=
github.com/livekit/protocol v1.19.2-0.20240706015329-8c1eef4468ee h1:t+EHiCHcxOe/hH3KZNhai+0lscBBs6HoYLg09wXAhuE=
github.com/livekit/protocol v1.19.2-0.20240706015329-8c1eef4468ee/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
+76 -32
View File
@@ -16,6 +16,7 @@ package agent
import (
"context"
"fmt"
"sync"
"time"
@@ -51,7 +52,7 @@ type JobRequest struct {
// only set for participant jobs
Participant *livekit.ParticipantInfo
Metadata string
Namespace string
AgentName string
}
type agentClient struct {
@@ -61,9 +62,12 @@ type agentClient struct {
// cache response to avoid constantly checking with controllers
// cache is invalidated with AgentRegistered updates
roomNamespaces *serverutils.IncrementalDispatcher[string]
publisherNamespaces *serverutils.IncrementalDispatcher[string]
enabledExpiresAt time.Time
roomNamespaces *serverutils.IncrementalDispatcher[string] // deprecated
publisherNamespaces *serverutils.IncrementalDispatcher[string] // deprecated
roomAgentNames *serverutils.IncrementalDispatcher[string]
publisherAgentNames *serverutils.IncrementalDispatcher[string]
enabledExpiresAt time.Time
workers *workerpool.WorkerPool
@@ -96,6 +100,8 @@ func NewAgentClient(bus psrpc.MessageBus) (Client, error) {
c.mu.Lock()
c.roomNamespaces = nil
c.publisherNamespaces = nil
c.roomAgentNames = nil
c.publisherAgentNames = nil
c.mu.Unlock()
}
@@ -111,68 +117,80 @@ func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) {
jobTypeTopic = PublisherAgentTopic
}
if !c.isNamespaceActive(desc.Namespace, desc.JobType) {
logger.Infow("not dispatching agent job since no worker is available", "namespace", desc.Namespace, "jobType", desc.JobType)
dispatcher := c.getDispatcher(desc.AgentName, desc.JobType)
if dispatcher == nil {
logger.Infow("not dispatching agent job since no worker is available", "agentName", desc.AgentName, "jobType", desc.JobType)
return
}
c.workers.Submit(func() {
_, err := c.client.JobRequest(context.Background(), desc.Namespace, jobTypeTopic, &livekit.Job{
Id: utils.NewGuid(utils.AgentJobPrefix),
Type: desc.JobType,
Room: desc.Room,
Participant: desc.Participant,
Namespace: desc.Namespace,
Metadata: desc.Metadata,
dispatcher.ForEach(func(curNs string) {
topic := GetAgentTopic(desc.AgentName, curNs)
c.workers.Submit(func() {
// The cached agent parameters do not record the exact combinations of job type/agent name/namespace that are available, so some of the JobRequest RPCs may not reach any worker.
_, err := c.client.JobRequest(context.Background(), topic, jobTypeTopic, &livekit.Job{
Id: utils.NewGuid(utils.AgentJobPrefix),
Type: desc.JobType,
Room: desc.Room,
Participant: desc.Participant,
Namespace: curNs,
AgentName: desc.AgentName,
Metadata: desc.Metadata,
})
if err != nil {
logger.Infow("failed to send job request", "error", err, "namespace", curNs, "jobType", desc.JobType)
}
})
if err != nil {
logger.Infow("failed to send job request", "error", err, "namespace", desc.Namespace, "jobType", desc.JobType)
}
})
}
func (c *agentClient) isNamespaceActive(ns string, jobType livekit.JobType) bool {
func (c *agentClient) getDispatcher(agName string, jobType livekit.JobType) *serverutils.IncrementalDispatcher[string] {
c.mu.Lock()
if time.Since(c.enabledExpiresAt) > EnabledCacheTTL || c.roomNamespaces == nil || c.publisherNamespaces == nil {
if time.Since(c.enabledExpiresAt) > EnabledCacheTTL || c.roomNamespaces == nil ||
c.publisherNamespaces == nil || c.roomAgentNames == nil || c.publisherAgentNames == nil {
c.enabledExpiresAt = time.Now()
c.roomNamespaces = serverutils.NewIncrementalDispatcher[string]()
c.publisherNamespaces = serverutils.NewIncrementalDispatcher[string]()
go c.checkEnabled(c.roomNamespaces, c.publisherNamespaces)
c.roomAgentNames = serverutils.NewIncrementalDispatcher[string]()
c.publisherAgentNames = serverutils.NewIncrementalDispatcher[string]()
go c.checkEnabled(c.roomNamespaces, c.publisherNamespaces, c.roomAgentNames, c.publisherAgentNames)
}
target := c.roomNamespaces
agentNames := c.roomAgentNames
if jobType == livekit.JobType_JT_PUBLISHER {
target = c.publisherNamespaces
agentNames = c.publisherAgentNames
}
c.mu.Unlock()
done := make(chan bool, 1)
go func() {
target.ForEach(func(curNs string) {
if curNs == ns {
done := make(chan *serverutils.IncrementalDispatcher[string], 1)
c.workers.Submit(func() {
agentNames.ForEach(func(ag string) {
if ag == agName {
select {
case done <- true:
case done <- target:
default:
}
return
}
})
select {
case done <- false:
case done <- nil:
default:
}
}()
})
return <-done
}
func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces *serverutils.IncrementalDispatcher[string]) {
func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces, roomAgentNames, publisherAgentNames *serverutils.IncrementalDispatcher[string]) {
defer roomNamespaces.Done()
defer publisherNamespaces.Done()
defer roomAgentNames.Done()
defer publisherAgentNames.Done()
resChan, err := c.client.CheckEnabled(context.Background(), &rpc.CheckEnabledRequest{}, psrpc.WithRequestTimeout(CheckEnabledTimeout))
if err != nil {
logger.Errorw("failed to check enabled", err)
@@ -181,6 +199,8 @@ func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces *serverut
roomNSMap := make(map[string]bool)
publisherNSMap := make(map[string]bool)
roomAgMap := make(map[string]bool)
publisherAgMap := make(map[string]bool)
for r := range resChan {
if r.Result.GetRoomEnabled() {
@@ -190,6 +210,12 @@ func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces *serverut
roomNSMap[ns] = true
}
}
for _, ag := range r.Result.GetAgentNames() {
if _, ok := roomAgMap[ag]; !ok {
roomAgentNames.Add(ag)
roomAgMap[ag] = true
}
}
}
if r.Result.GetPublisherEnabled() {
for _, ns := range r.Result.GetNamespaces() {
@@ -198,6 +224,12 @@ func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces *serverut
publisherNSMap[ns] = true
}
}
for _, ag := range r.Result.GetAgentNames() {
if _, ok := publisherAgMap[ag]; !ok {
publisherAgentNames.Add(ag)
publisherAgMap[ag] = true
}
}
}
}
}
@@ -207,3 +239,15 @@ func (c *agentClient) Stop() error {
<-c.subDone
return nil
}
// GetAgentTopic returns the topic name used to route job requests for the
// given agent name and namespace.
//
// If agentName is empty, the namespace alone is returned. If namespace is
// empty, the agent name alone is returned. When both are set, the result is
// "<agentName>_<namespace>". When both are empty, the result is the empty
// string.
func GetAgentTopic(agentName, namespace string) string {
	switch {
	case agentName == "":
		// Backward compatibility
		return namespace
	case namespace == "":
		// Forward compatibility once the namespace field is removed from the worker SDK
		return agentName
	default:
		return fmt.Sprintf("%s_%s", agentName, namespace)
	}
}
+9
View File
@@ -78,6 +78,7 @@ type Worker struct {
id string
jobType livekit.JobType
version string
agentName string
name string
namespace string
load float32
@@ -161,6 +162,12 @@ func (w *Worker) Namespace() string {
return w.namespace
}
// AgentName returns the agent name this worker registered with.
// The field is read while holding w.mu.
func (w *Worker) AgentName() string {
	w.mu.Lock()
	name := w.agentName
	w.mu.Unlock()
	return name
}
func (w *Worker) Status() livekit.WorkerStatus {
w.mu.Lock()
defer w.mu.Unlock()
@@ -318,6 +325,7 @@ func (w *Worker) handleRegister(req *livekit.RegisterWorkerRequest) {
w.version = req.Version
w.name = req.Name
w.agentName = req.GetAgentName()
w.namespace = req.GetNamespace()
w.jobType = req.GetType()
@@ -408,6 +416,7 @@ func (w *Worker) handleSimulateJob(simulate *livekit.SimulateJobRequest) {
Room: simulate.Room,
Participant: simulate.Participant,
Namespace: w.Namespace(),
AgentName: w.AgentName(),
}
go func() {
+8 -28
View File
@@ -1432,34 +1432,14 @@ func (r *Room) launchPublisherAgents(p types.Participant) {
return
}
for _, ag := range r.internal.Agents {
if ag.Type != livekit.JobType_JT_PUBLISHER {
continue
}
var startAgent bool
if len(ag.ParticipantIdentity) == 0 {
// If no participant given, start for all participants
startAgent = true
} else {
for _, pi := range ag.ParticipantIdentity {
if pi == string(p.Identity()) {
startAgent = true
break
}
}
}
if startAgent {
go r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{
JobType: livekit.JobType_JT_PUBLISHER,
Room: r.ToProto(),
Participant: p.ToProto(),
Metadata: ag.Metadata,
Namespace: ag.Namespace,
})
}
for _, ag := range r.internal.AgentDispatches {
go r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{
JobType: livekit.JobType_JT_PUBLISHER,
Room: r.ToProto(),
Participant: p.ToProto(),
Metadata: ag.Metadata,
AgentName: ag.AgentName,
})
}
}
+29 -15
View File
@@ -94,14 +94,15 @@ type AgentHandler struct {
namespaceWorkers map[workerKey][]*agent.Worker
roomKeyCount int
publisherKeyCount int
// TODO remove once deprecated CheckEnabled is removed
namespaces []string
namespaces []string // namespaces deprecated
agentNames []string
roomTopic string
publisherTopic string
}
// workerKey identifies a group of workers sharing the same agent name,
// namespace and job type. It is the key type of AgentHandler.namespaceWorkers.
//
// Field order is significant: keys are built with positional composite
// literals, e.g. workerKey{w.AgentName(), w.Namespace(), w.JobType()}.
type workerKey struct {
// agentName is the agent name reported by the worker (Worker.AgentName).
agentName string
// namespace is the namespace reported by the worker (Worker.Namespace).
namespace string
// jobType is the type of job the worker handles (Worker.JobType).
jobType livekit.JobType
}
@@ -203,17 +204,18 @@ func (h *AgentHandler) HandleConnection(ctx context.Context, conn agent.SignalCo
func (h *AgentHandler) HandleWorkerRegister(w *agent.Worker) {
h.mu.Lock()
key := workerKey{w.Namespace(), w.JobType()}
key := workerKey{w.AgentName(), w.Namespace(), w.JobType()}
workers := h.namespaceWorkers[key]
created := len(workers) == 0
if created {
topic := h.roomTopic
nameTopic := agent.GetAgentTopic(w.AgentName(), w.Namespace())
typeTopic := h.roomTopic
if w.JobType() == livekit.JobType_JT_PUBLISHER {
topic = h.publisherTopic
typeTopic = h.publisherTopic
}
err := h.agentServer.RegisterJobRequestTopic(w.Namespace(), topic)
err := h.agentServer.RegisterJobRequestTopic(nameTopic, typeTopic)
if err != nil {
h.mu.Unlock()
@@ -230,16 +232,19 @@ func (h *AgentHandler) HandleWorkerRegister(w *agent.Worker) {
h.namespaces = append(h.namespaces, w.Namespace())
sort.Strings(h.namespaces)
h.agentNames = append(h.agentNames, w.AgentName())
sort.Strings(h.agentNames)
}
h.namespaceWorkers[key] = append(workers, w)
h.mu.Unlock()
if created {
h.logger.Infow("initial worker registered", "namespace", w.Namespace(), "jobType", w.JobType())
h.logger.Infow("initial worker registered", "namespace", w.Namespace(), "jobType", w.JobType(), "agentName", w.AgentName())
err := h.agentServer.PublishWorkerRegistered(context.Background(), agent.DefaultHandlerNamespace, &emptypb.Empty{})
if err != nil {
w.Logger().Errorw("failed to publish worker registered", err)
w.Logger().Errorw("failed to publish worker registered", err, "namespace", w.Namespace(), "jobType", w.JobType(), "agentName", w.AgentName())
}
}
}
@@ -248,7 +253,7 @@ func (h *AgentHandler) HandleWorkerDeregister(w *agent.Worker) {
h.mu.Lock()
defer h.mu.Unlock()
key := workerKey{w.Namespace(), w.JobType()}
key := workerKey{w.AgentName(), w.Namespace(), w.JobType()}
workers, ok := h.namespaceWorkers[key]
if !ok {
@@ -262,25 +267,30 @@ func (h *AgentHandler) HandleWorkerDeregister(w *agent.Worker) {
if len(workers) > 1 {
h.namespaceWorkers[key] = slices.Delete(workers, index, index+1)
} else {
h.logger.Debugw("last worker deregistered")
h.logger.Debugw("last worker deregistered", "namespace", w.Namespace(), "jobType", w.JobType(), "agentName", w.AgentName())
delete(h.namespaceWorkers, key)
topic := agent.GetAgentTopic(w.AgentName(), w.Namespace())
if w.JobType() == livekit.JobType_JT_ROOM {
h.roomKeyCount--
h.agentServer.DeregisterJobRequestTopic(w.Namespace(), h.roomTopic)
h.agentServer.DeregisterJobRequestTopic(topic, h.roomTopic)
} else {
h.publisherKeyCount--
h.agentServer.DeregisterJobRequestTopic(w.Namespace(), h.publisherTopic)
h.agentServer.DeregisterJobRequestTopic(topic, h.publisherTopic)
}
// agentNames and namespaces contain repeated entries, one for each agentName/namespace combination, so only a single occurrence is removed here
if i := slices.Index(h.namespaces, w.Namespace()); i != -1 {
h.namespaces = slices.Delete(h.namespaces, i, i+1)
}
if i := slices.Index(h.agentNames, w.AgentName()); i != -1 {
h.agentNames = slices.Delete(h.agentNames, i, i+1)
}
}
}
func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*emptypb.Empty, error) {
key := workerKey{job.Namespace, job.Type}
key := workerKey{job.AgentName, job.Namespace, job.Type}
attempted := make(map[*agent.Worker]struct{})
for {
h.mu.Lock()
@@ -312,6 +322,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty
values := []interface{}{
"jobID", job.Id,
"namespace", job.Namespace,
"agentName", job.AgentName,
"workerID", selected.ID(),
}
if job.Room != nil {
@@ -320,7 +331,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty
if job.Participant != nil {
values = append(values, "participant", job.Participant.Identity)
}
logger.Debugw("assigning job", values...)
h.logger.Debugw("assigning job", values...)
err := selected.AssignJob(ctx, job)
if err != nil {
if errors.Is(err, agent.ErrWorkerNotAvailable) {
@@ -340,7 +351,7 @@ func (h *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job)
var affinity float32
var maxLoad float32
for _, w := range h.workers {
if w.Namespace() != job.Namespace || w.JobType() != job.Type {
if w.AgentName() != job.AgentName || w.Namespace() != job.Namespace || w.JobType() != job.Type {
continue
}
@@ -362,8 +373,11 @@ func (h *AgentHandler) CheckEnabled(ctx context.Context, req *rpc.CheckEnabledRe
h.mu.Lock()
defer h.mu.Unlock()
// This doesn't return the full agentName -> namespace mapping, which can cause some unnecessary RPCs.
// Namespaces are deprecated, however.
return &rpc.CheckEnabledResponse{
Namespaces: slices.Compact(slices.Clone(h.namespaces)),
AgentNames: slices.Compact(slices.Clone(h.agentNames)),
RoomEnabled: h.roomKeyCount != 0,
PublisherEnabled: h.publisherKeyCount != 0,
}, nil
+4 -13
View File
@@ -107,23 +107,14 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
}
}
if req.Agent == nil {
// Backward compatibility: by default, start any agent in the empty namespace
// Backward compatibility: by default, start any agent registered with an empty agent name
req.Agent = &livekit.RoomAgent{
Agents: []*livekit.CreateAgentJobDefinitionRequest{
&livekit.CreateAgentJobDefinitionRequest{
Type: livekit.JobType_JT_ROOM,
Room: req.Name,
Namespace: "default",
},
&livekit.CreateAgentJobDefinitionRequest{
Type: livekit.JobType_JT_PUBLISHER,
Room: req.Name,
Namespace: "default",
},
Dispatches: []*livekit.RoomAgentDispatch{
&livekit.RoomAgentDispatch{},
},
}
}
internal.Agents = req.Agent.Agents
internal.AgentDispatches = req.Agent.Dispatches
if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 {
internal.PlayoutDelay = &livekit.PlayoutDelay{
Enabled: true,
+6 -10
View File
@@ -105,8 +105,8 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
if created {
_, internal, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name), true)
if internal.Agents != nil {
err = s.launchAgents(ctx, rm, internal.Agents)
if internal.AgentDispatches != nil {
err = s.launchAgents(ctx, rm, internal.AgentDispatches)
if err != nil {
return nil, err
}
@@ -130,17 +130,13 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return rm, nil
}
func (s *RoomService) launchAgents(ctx context.Context, rm *livekit.Room, agents []*livekit.CreateAgentJobDefinitionRequest) error {
func (s *RoomService) launchAgents(ctx context.Context, rm *livekit.Room, agents []*livekit.RoomAgentDispatch) error {
for _, ag := range agents {
if ag.Type != livekit.JobType_JT_ROOM {
continue
}
go s.agentClient.LaunchJob(ctx, &agent.JobRequest{
JobType: ag.Type,
JobType: livekit.JobType_JT_ROOM,
Room: rm,
Metadata: ag.Metadata,
Namespace: ag.Namespace,
AgentName: ag.AgentName,
})
}
@@ -344,7 +340,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
}
if created {
err = s.launchAgents(ctx, room, internal.Agents)
err = s.launchAgents(ctx, room, internal.AgentDispatches)
if err != nil {
return nil, err
}
+3 -7
View File
@@ -527,16 +527,12 @@ func (s *RTCService) startConnection(
return connectionResult{}, nil, err
}
for _, ag := range internal.Agents {
if ag.Type != livekit.JobType_JT_ROOM {
continue
}
for _, ag := range internal.AgentDispatches {
go s.agentClient.LaunchJob(ctx, &agent.JobRequest{
JobType: ag.Type,
JobType: livekit.JobType_JT_ROOM,
Room: cr.Room,
Metadata: ag.Metadata,
Namespace: ag.Namespace,
AgentName: ag.AgentName,
})
}
}
+4 -6
View File
@@ -121,12 +121,10 @@ func TestAgentNamespaces(t *testing.T) {
_, err = roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{
Name: testRoom,
Agent: &livekit.RoomAgent{
Agents: []*livekit.CreateAgentJobDefinitionRequest{
&livekit.CreateAgentJobDefinitionRequest{
Namespace: "namespace1",
},
&livekit.CreateAgentJobDefinitionRequest{
Namespace: "namespace2",
Dispatches: []*livekit.RoomAgentDispatch{
&livekit.RoomAgentDispatch{},
&livekit.RoomAgentDispatch{
AgentName: "ag",
},
},
},