From 2d6aa049c94bc9a96e8a68e671cc45f1a14769a9 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 13 Oct 2024 00:56:51 -0700 Subject: [PATCH] Improve agent job assignment logging (#3090) * Improve agent job assignment logging * add more agent logging --------- Co-authored-by: Paul Wells --- pkg/agent/worker.go | 16 ++++++++--- pkg/service/agentservice.go | 53 ++++++++++++++++++++++++------------- 2 files changed, 46 insertions(+), 23 deletions(-) diff --git a/pkg/agent/worker.go b/pkg/agent/worker.go index 2ed7446ba..290050741 100644 --- a/pkg/agent/worker.go +++ b/pkg/agent/worker.go @@ -360,7 +360,15 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) (*livekit.JobS job.State.ParticipantIdentity = res.ParticipantIdentity - token, err := pagent.BuildAgentToken(w.apiKey, w.apiSecret, job.Room.Name, res.ParticipantIdentity, res.ParticipantName, res.ParticipantMetadata, w.Permissions) + token, err := pagent.BuildAgentToken( + w.apiKey, + w.apiSecret, + job.Room.Name, + res.ParticipantIdentity, + res.ParticipantName, + res.ParticipantMetadata, + w.Permissions, + ) if err != nil { w.logger.Errorw("failed to build agent token", err) return nil, err @@ -438,7 +446,7 @@ func (w *Worker) Close() { return } - w.logger.Infow("closing worker") + w.logger.Infow("closing worker", "workerID", w.ID, "jobType", w.JobType, "agentName", w.AgentName) close(w.closed) w.cancel() @@ -453,7 +461,7 @@ func (w *Worker) HandleAvailability(res *livekit.AvailabilityResponse) error { jobID := livekit.JobID(res.JobId) availCh, ok := w.availability[jobID] if !ok { - w.logger.Warnw("received availability response for unknown job", nil, "jobId", jobID) + w.logger.Warnw("received availability response for unknown job", nil, "jobID", jobID) return nil } @@ -519,7 +527,7 @@ func (w *Worker) HandleSimulateJob(simulate *livekit.SimulateJobRequest) error { go func() { _, err := w.AssignJob(w.ctx, job) if err != nil { - w.logger.Errorw("unable to simulate job", err, "jobId", job.Id) + w.logger.Errorw("unable to simulate job", err, "jobID", job.Id) } }() diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index 70e68b902..92a36a9b3 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -89,7 +89,7 @@ func DispatchAgentWorkerSignal(c agent.SignalConn, h agent.WorkerSignalHandler, req, _, err := c.ReadWorkerMessage() if err != nil { if IsWebSocketCloseError(err) { - l.Infow("worker closed WS connection", "wsError", err) + l.Debugw("worker closed WS connection", "wsError", err) } else { l.Errorw("error reading from websocket", err) } @@ -200,7 +200,7 @@ func NewAgentHandler( ) *AgentHandler { return &AgentHandler{ agentServer: agentServer, - logger: logger, + logger: logger.WithComponent("agents"), workers: make(map[string]*agent.Worker), jobToWorker: make(map[livekit.JobID]*agent.Worker), namespaceWorkers: make(map[workerKey][]*agent.Worker), @@ -272,9 +272,15 @@ func (h *AgentHandler) registerWorker(w *agent.Worker) { h.namespaceWorkers[key] = append(workers, w) h.mu.Unlock() + h.logger.Infow("worker registered", + "namespace", w.Namespace, + "jobType", w.JobType, + "agentName", w.AgentName, + "workerID", w.ID, + ) if created { - h.logger.Infow("initial worker registered", "namespace", w.Namespace, "jobType", w.JobType, "agentName", w.AgentName) err := h.agentServer.PublishWorkerRegistered(context.Background(), agent.DefaultHandlerNamespace, &emptypb.Empty{}) + // TODO: when this happens, should we disconnect the worker so it'll retry? if err != nil { w.Logger().Errorw("failed to publish worker registered", err, "namespace", w.Namespace, "jobType", w.JobType, "agentName", w.AgentName) } @@ -301,7 +307,12 @@ func (h *AgentHandler) deregisterWorker(w *agent.Worker) { if len(workers) > 1 { h.namespaceWorkers[key] = slices.Delete(workers, index, index+1) } else { - h.logger.Debugw("last worker deregistered", "namespace", w.Namespace, "jobType", w.JobType, "agentName", w.AgentName) + h.logger.Infow("last worker deregistered", + "namespace", w.Namespace, + "jobType", w.JobType, + "agentName", w.AgentName, + "workerID", w.ID, + ) delete(h.namespaceWorkers, key) topic := agent.GetAgentTopic(w.AgentName, w.Namespace) @@ -337,43 +348,47 @@ func (h *AgentHandler) deregisterJob(jobID livekit.JobID) { } func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.JobRequestResponse, error) { + logger := h.logger.WithUnlikelyValues( + "jobID", job.Id, + "namespace", job.Namespace, + "agentName", job.AgentName, + ) + if job.Room != nil { + logger = logger.WithValues("room", job.Room.Name, "roomID", job.Room.Sid) + } + if job.Participant != nil { + logger = logger.WithValues("participant", job.Participant.Identity) + } + key := workerKey{job.AgentName, job.Namespace, job.Type} attempted := make(map[*agent.Worker]struct{}) for { selected, err := h.selectWorkerWeightedByLoad(key, attempted) if err != nil { + logger.Warnw("no worker available to handle job", err) return nil, psrpc.NewError(psrpc.ResourceExhausted, err) } + logger := logger.WithValues("workerID", selected.ID) attempted[selected] = struct{}{} - values := []interface{}{ - "jobID", job.Id, - "namespace", job.Namespace, - "agentName", job.AgentName, - "workerID", selected.ID, - } - if job.Room != nil { - values = append(values, "room", job.Room.Name, "roomID", job.Room.Sid) - } - if job.Participant != nil { - values = append(values, "participant", job.Participant.Identity) - } - h.logger.Debugw("assigning job", values...) state, err := selected.AssignJob(ctx, job) if err != nil { - if utils.ErrorIsOneOf(err, agent.ErrWorkerNotAvailable, agent.ErrWorkerClosed) { + retry := utils.ErrorIsOneOf(err, agent.ErrWorkerNotAvailable, agent.ErrWorkerClosed) + logger.Warnw("failed to assign job to worker", err, "retry", retry) + if retry { continue // Try another worker } return nil, err } + logger.Infow("assigned job to worker") h.mu.Lock() h.jobToWorker[livekit.JobID(job.Id)] = selected h.mu.Unlock() err = h.agentServer.RegisterJobTerminateTopic(job.Id) if err != nil { - h.logger.Errorw("failes registering JobTerminate handler", err, values...) + logger.Errorw("failed to register JobTerminate handler", err) } return &rpc.JobRequestResponse{