From f4314686d136d8fac140832e64a39b07abcab4eb Mon Sep 17 00:00:00 2001 From: David Zhao Date: Fri, 5 Apr 2024 20:32:29 -0700 Subject: [PATCH] Improve Agent logging (#2628) --- pkg/agent/worker.go | 3 +++ pkg/service/agentservice.go | 43 +++++++++++++++++++++++++++---------- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/pkg/agent/worker.go b/pkg/agent/worker.go index b6f69fcfe..666681b39 100644 --- a/pkg/agent/worker.go +++ b/pkg/agent/worker.go @@ -303,6 +303,8 @@ func (w *Worker) handleRegister(req *livekit.RegisterWorkerRequest) { w.registered.Store(true) w.mu.Unlock() + w.Logger.Debugw("worker registered", "request", req) + w.sendRequest(&livekit.ServerMessage{ Message: &livekit.ServerMessage_Register{ Register: &livekit.RegisterWorkerResponse{ @@ -376,6 +378,7 @@ func (w *Worker) handleWorkerPing(ping *livekit.WorkerPing) { } func (w *Worker) handleWorkerStatus(update *livekit.UpdateWorkerStatus) { + w.Logger.Debugw("worker status update", "status", update.Status, "load", update.Load) w.UpdateStatus(update) } diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index e8f391cb4..e3153347c 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -33,10 +33,10 @@ import ( "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/rtc/types" - "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/livekit-server/version" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/psrpc" ) @@ -50,6 +50,7 @@ type AgentService struct { type AgentHandler struct { agentServer rpc.AgentInternalServer mu sync.Mutex + logger logger.Logger serverInfo *livekit.ServerInfo workers map[string]*agent.Worker @@ -98,6 +99,7 @@ func NewAgentService(conf *config.Config, s.AgentHandler = NewAgentHandler( agentServer, keyProvider, + logger.GetLogger(), serverInfo, agent.RoomAgentTopic, agent.PublisherAgentTopic, @@ -132,12 +134,14 @@ func (s *AgentService) ServeHTTP(writer http.ResponseWriter, r *http.Request) { func NewAgentHandler( agentServer rpc.AgentInternalServer, keyProvider auth.KeyProvider, + logger logger.Logger, serverInfo *livekit.ServerInfo, roomTopic string, publisherTopic string, ) *AgentHandler { return &AgentHandler{ agentServer: agentServer, + logger: logger, workers: make(map[string]*agent.Worker), namespaces: make(map[string]*namespaceInfo), serverInfo: serverInfo, @@ -155,13 +159,11 @@ func (h *AgentHandler) HandleConnection(r *http.Request, conn *websocket.Conn, o sigConn := NewWSSignalConnection(conn) - logger := utils.GetLogger(r.Context()) - apiKey := GetAPIKey(r.Context()) apiSecret := h.keyProvider.GetSecret(apiKey) - worker := agent.NewWorker(protocol, apiKey, apiSecret, h.serverInfo, conn, sigConn, logger) - worker.OnWorkerRegistered(h.registerWorkerTopic) + worker := agent.NewWorker(protocol, apiKey, apiSecret, h.serverInfo, conn, sigConn, h.logger) + worker.OnWorkerRegistered(h.handleWorkerRegister) h.mu.Lock() h.workers[worker.ID()] = worker @@ -176,7 +178,7 @@ func (h *AgentHandler) HandleConnection(r *http.Request, conn *websocket.Conn, o h.mu.Unlock() if worker.Registered() { - h.deregisterWorkerTopic(worker) + h.handleWorkerDeregister(worker) } if numWorkers == 0 && onIdle != nil { @@ -209,7 +211,7 @@ func (h *AgentHandler) HandleConnection(r *http.Request, conn *websocket.Conn, o } } -func (h *AgentHandler) registerWorkerTopic(w *agent.Worker) { +func (h *AgentHandler) handleWorkerRegister(w *agent.Worker) { h.mu.Lock() info, ok := h.namespaces[w.Namespace()] @@ -220,16 +222,19 @@ func (h *AgentHandler) registerWorkerTopic(w *agent.Worker) { numRooms = info.numRooms } + shouldNotify := false var err error if w.JobType() == livekit.JobType_JT_PUBLISHER { numPublishers++ if numPublishers == 1 { + shouldNotify = true err = h.agentServer.RegisterJobRequestTopic(w.Namespace(), h.publisherTopic) } } else if w.JobType() == livekit.JobType_JT_ROOM { numRooms++ if numRooms == 1 { + shouldNotify = true err = h.agentServer.RegisterJobRequestTopic(w.Namespace(), h.roomTopic) } } @@ -250,13 +255,16 @@ func (h *AgentHandler) registerWorkerTopic(w *agent.Worker) { h.publisherEnabled = h.publisherAvailableLocked() h.mu.Unlock() - err = h.agentServer.PublishWorkerRegistered(context.Background(), agent.DefaultHandlerNamespace, &emptypb.Empty{}) - if err != nil { - w.Logger.Errorw("failed to publish worker registered", err) + if shouldNotify { + h.logger.Infow("initial worker registered", "namespace", w.Namespace(), "jobType", w.JobType()) + err = h.agentServer.PublishWorkerRegistered(context.Background(), agent.DefaultHandlerNamespace, &emptypb.Empty{}) + if err != nil { + w.Logger.Errorw("failed to publish worker registered", err) + } } } -func (h *AgentHandler) deregisterWorkerTopic(worker *agent.Worker) { +func (h *AgentHandler) handleWorkerDeregister(worker *agent.Worker) { h.mu.Lock() defer h.mu.Unlock() @@ -278,6 +286,7 @@ func (h *AgentHandler) deregisterWorkerTopic(worker *agent.Worker) { } if info.numPublishers == 0 && info.numRooms == 0 { + h.logger.Debugw("last worker deregistered") delete(h.namespaces, worker.Namespace()) } @@ -340,6 +349,18 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty attempted[selected.ID()] = true + values := []interface{}{ + "jobID", job.Id, + "namespace", job.Namespace, + "workerID", selected.ID(), + } + if job.Room != nil { + values = append(values, "room", job.Room.Name, "roomID", job.Room.Sid) + } + if job.Participant != nil { + values = append(values, "participant", job.Participant.Identity) + } + logger.Debugw("assigning job", values...) err := selected.AssignJob(ctx, job) if err != nil { if errors.Is(err, agent.ErrWorkerNotAvailable) {