From 5e3b3ee3c16809dc8785e09d0f34edf2017c1ee2 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sun, 22 Sep 2024 23:37:06 -0700 Subject: [PATCH] return copy of job state when starting job (#3035) --- pkg/agent/worker.go | 22 +++++++++++++--------- pkg/service/agentservice.go | 4 ++-- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pkg/agent/worker.go b/pkg/agent/worker.go index 7de167374..c4729482b 100644 --- a/pkg/agent/worker.go +++ b/pkg/agent/worker.go @@ -27,6 +27,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" + "github.com/livekit/protocol/utils" "github.com/livekit/protocol/utils/guid" "github.com/livekit/psrpc" ) @@ -300,13 +301,14 @@ func (w *Worker) RunningJobs() map[string]*livekit.Job { return jobs } -func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error { +func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) (*livekit.JobState, error) { availCh := make(chan *livekit.AvailabilityResponse, 1) + job = utils.CloneProto(job) w.mu.Lock() if _, ok := w.availability[job.Id]; ok { w.mu.Unlock() - return ErrDuplicateJobAssignment + return nil, ErrDuplicateJobAssignment } w.availability[job.Id] = availCh @@ -336,7 +338,7 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error { select { case res := <-availCh: if !res.Available { - return ErrWorkerNotAvailable + return nil, ErrWorkerNotAvailable } job.State.ParticipantIdentity = res.ParticipantIdentity @@ -344,7 +346,7 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error { token, err := pagent.BuildAgentToken(w.apiKey, w.apiSecret, job.Room.Name, res.ParticipantIdentity, res.ParticipantName, res.ParticipantMetadata, w.Permissions) if err != nil { w.logger.Errorw("failed to build agent token", err) - return err + return nil, err } // In OSS, Url is nil, and the used API Key is the same as the one used to connect the worker @@ -352,19 +354,21 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error { Assignment: &livekit.JobAssignment{Job: job, Url: nil, Token: token}, }}) + state := utils.CloneProto(job.State) + w.mu.Lock() w.runningJobs[job.Id] = job w.mu.Unlock() // TODO sweep jobs that are never started. We can't do this until all SDKs actually update the the JOB state - return nil + return state, nil case <-timeout.C: - return ErrAvailabilityTimeout + return nil, ErrAvailabilityTimeout case <-w.ctx.Done(): - return ErrWorkerClosed + return nil, ErrWorkerClosed case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() } } @@ -497,7 +501,7 @@ func (w *Worker) HandleSimulateJob(simulate *livekit.SimulateJobRequest) error { } go func() { - err := w.AssignJob(w.ctx, job) + _, err := w.AssignJob(w.ctx, job) if err != nil { w.logger.Errorw("failed to simulate job, assignment failed", err, "jobId", job.Id) } diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index f6491230a..27e50802a 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -366,7 +366,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.J values = append(values, "participant", job.Participant.Identity) } h.logger.Debugw("assigning job", values...) - err = selected.AssignJob(ctx, job) + state, err := selected.AssignJob(ctx, job) if err != nil { if errors.Is(err, agent.ErrWorkerNotAvailable) { continue // Try another worker @@ -383,7 +383,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.J } return &rpc.JobRequestResponse{ - State: job.State, + State: state, }, nil } }