Improve agent job assignment logging (#3090)

* Improve agent job assignment logging

* add more agent logging

---------

Co-authored-by: Paul Wells <paulwe@gmail.com>
This commit is contained in:
David Zhao
2024-10-13 00:56:51 -07:00
committed by GitHub
parent 9147120915
commit 2d6aa049c9
2 changed files with 46 additions and 23 deletions
+12 -4
View File
@@ -360,7 +360,15 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) (*livekit.JobS
job.State.ParticipantIdentity = res.ParticipantIdentity
token, err := pagent.BuildAgentToken(w.apiKey, w.apiSecret, job.Room.Name, res.ParticipantIdentity, res.ParticipantName, res.ParticipantMetadata, w.Permissions)
token, err := pagent.BuildAgentToken(
w.apiKey,
w.apiSecret,
job.Room.Name,
res.ParticipantIdentity,
res.ParticipantName,
res.ParticipantMetadata,
w.Permissions,
)
if err != nil {
w.logger.Errorw("failed to build agent token", err)
return nil, err
@@ -438,7 +446,7 @@ func (w *Worker) Close() {
return
}
w.logger.Infow("closing worker")
w.logger.Infow("closing worker", "workerID", w.ID, "jobType", w.JobType, "agentName", w.AgentName)
close(w.closed)
w.cancel()
@@ -453,7 +461,7 @@ func (w *Worker) HandleAvailability(res *livekit.AvailabilityResponse) error {
jobID := livekit.JobID(res.JobId)
availCh, ok := w.availability[jobID]
if !ok {
w.logger.Warnw("received availability response for unknown job", nil, "jobId", jobID)
w.logger.Warnw("received availability response for unknown job", nil, "jobID", jobID)
return nil
}
@@ -519,7 +527,7 @@ func (w *Worker) HandleSimulateJob(simulate *livekit.SimulateJobRequest) error {
go func() {
_, err := w.AssignJob(w.ctx, job)
if err != nil {
w.logger.Errorw("unable to simulate job", err, "jobId", job.Id)
w.logger.Errorw("unable to simulate job", err, "jobID", job.Id)
}
}()
+34 -19
View File
@@ -89,7 +89,7 @@ func DispatchAgentWorkerSignal(c agent.SignalConn, h agent.WorkerSignalHandler,
req, _, err := c.ReadWorkerMessage()
if err != nil {
if IsWebSocketCloseError(err) {
l.Infow("worker closed WS connection", "wsError", err)
l.Debugw("worker closed WS connection", "wsError", err)
} else {
l.Errorw("error reading from websocket", err)
}
@@ -200,7 +200,7 @@ func NewAgentHandler(
) *AgentHandler {
return &AgentHandler{
agentServer: agentServer,
logger: logger,
logger: logger.WithComponent("agents"),
workers: make(map[string]*agent.Worker),
jobToWorker: make(map[livekit.JobID]*agent.Worker),
namespaceWorkers: make(map[workerKey][]*agent.Worker),
@@ -272,9 +272,15 @@ func (h *AgentHandler) registerWorker(w *agent.Worker) {
h.namespaceWorkers[key] = append(workers, w)
h.mu.Unlock()
h.logger.Infow("worker registered",
"namespace", w.Namespace,
"jobType", w.JobType,
"agentName", w.AgentName,
"workerID", w.ID,
)
if created {
h.logger.Infow("initial worker registered", "namespace", w.Namespace, "jobType", w.JobType, "agentName", w.AgentName)
err := h.agentServer.PublishWorkerRegistered(context.Background(), agent.DefaultHandlerNamespace, &emptypb.Empty{})
// TODO: when this happens, should we disconnect the worker so it'll retry?
if err != nil {
w.Logger().Errorw("failed to publish worker registered", err, "namespace", w.Namespace, "jobType", w.JobType, "agentName", w.AgentName)
}
@@ -301,7 +307,12 @@ func (h *AgentHandler) deregisterWorker(w *agent.Worker) {
if len(workers) > 1 {
h.namespaceWorkers[key] = slices.Delete(workers, index, index+1)
} else {
h.logger.Debugw("last worker deregistered", "namespace", w.Namespace, "jobType", w.JobType, "agentName", w.AgentName)
h.logger.Infow("last worker deregistered",
"namespace", w.Namespace,
"jobType", w.JobType,
"agentName", w.AgentName,
"workerID", w.ID,
)
delete(h.namespaceWorkers, key)
topic := agent.GetAgentTopic(w.AgentName, w.Namespace)
@@ -337,43 +348,47 @@ func (h *AgentHandler) deregisterJob(jobID livekit.JobID) {
}
func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.JobRequestResponse, error) {
logger := h.logger.WithUnlikelyValues(
"jobID", job.Id,
"namespace", job.Namespace,
"agentName", job.AgentName,
)
if job.Room != nil {
logger = logger.WithValues("room", job.Room.Name, "roomID", job.Room.Sid)
}
if job.Participant != nil {
logger = logger.WithValues("participant", job.Participant.Identity)
}
key := workerKey{job.AgentName, job.Namespace, job.Type}
attempted := make(map[*agent.Worker]struct{})
for {
selected, err := h.selectWorkerWeightedByLoad(key, attempted)
if err != nil {
logger.Warnw("no worker available to handle job", err)
return nil, psrpc.NewError(psrpc.ResourceExhausted, err)
}
logger := logger.WithValues("workerID", selected.ID)
attempted[selected] = struct{}{}
values := []interface{}{
"jobID", job.Id,
"namespace", job.Namespace,
"agentName", job.AgentName,
"workerID", selected.ID,
}
if job.Room != nil {
values = append(values, "room", job.Room.Name, "roomID", job.Room.Sid)
}
if job.Participant != nil {
values = append(values, "participant", job.Participant.Identity)
}
h.logger.Debugw("assigning job", values...)
state, err := selected.AssignJob(ctx, job)
if err != nil {
if utils.ErrorIsOneOf(err, agent.ErrWorkerNotAvailable, agent.ErrWorkerClosed) {
retry := utils.ErrorIsOneOf(err, agent.ErrWorkerNotAvailable, agent.ErrWorkerClosed)
logger.Warnw("failed to assign job to worker", err, "retry", retry)
if retry {
continue // Try another worker
}
return nil, err
}
logger.Infow("assigned job to worker")
h.mu.Lock()
h.jobToWorker[livekit.JobID(job.Id)] = selected
h.mu.Unlock()
err = h.agentServer.RegisterJobTerminateTopic(job.Id)
if err != nil {
h.logger.Errorw("failes registering JobTerminate handler", err, values...)
logger.Errorw("failed to register JobTerminate handler", err)
}
return &rpc.JobRequestResponse{