return copy of job state when starting job (#3035)

This commit is contained in:
Paul Wells
2024-09-22 23:37:06 -07:00
committed by GitHub
parent 341d1e512c
commit 5e3b3ee3c1
2 changed files with 15 additions and 11 deletions
+13 -9
View File
@@ -27,6 +27,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/utils/guid"
"github.com/livekit/psrpc"
)
@@ -300,13 +301,14 @@ func (w *Worker) RunningJobs() map[string]*livekit.Job {
return jobs
}
func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error {
func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) (*livekit.JobState, error) {
availCh := make(chan *livekit.AvailabilityResponse, 1)
job = utils.CloneProto(job)
w.mu.Lock()
if _, ok := w.availability[job.Id]; ok {
w.mu.Unlock()
return ErrDuplicateJobAssignment
return nil, ErrDuplicateJobAssignment
}
w.availability[job.Id] = availCh
@@ -336,7 +338,7 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error {
select {
case res := <-availCh:
if !res.Available {
return ErrWorkerNotAvailable
return nil, ErrWorkerNotAvailable
}
job.State.ParticipantIdentity = res.ParticipantIdentity
@@ -344,7 +346,7 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error {
token, err := pagent.BuildAgentToken(w.apiKey, w.apiSecret, job.Room.Name, res.ParticipantIdentity, res.ParticipantName, res.ParticipantMetadata, w.Permissions)
if err != nil {
w.logger.Errorw("failed to build agent token", err)
return err
return nil, err
}
// In OSS, Url is nil, and the used API Key is the same as the one used to connect the worker
@@ -352,19 +354,21 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error {
Assignment: &livekit.JobAssignment{Job: job, Url: nil, Token: token},
}})
state := utils.CloneProto(job.State)
w.mu.Lock()
w.runningJobs[job.Id] = job
w.mu.Unlock()
// TODO sweep jobs that are never started. We can't do this until all SDKs actually update the the JOB state
return nil
return state, nil
case <-timeout.C:
return ErrAvailabilityTimeout
return nil, ErrAvailabilityTimeout
case <-w.ctx.Done():
return ErrWorkerClosed
return nil, ErrWorkerClosed
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
}
}
@@ -497,7 +501,7 @@ func (w *Worker) HandleSimulateJob(simulate *livekit.SimulateJobRequest) error {
}
go func() {
err := w.AssignJob(w.ctx, job)
_, err := w.AssignJob(w.ctx, job)
if err != nil {
w.logger.Errorw("failed to simulate job, assignment failed", err, "jobId", job.Id)
}
+2 -2
View File
@@ -366,7 +366,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.J
values = append(values, "participant", job.Participant.Identity)
}
h.logger.Debugw("assigning job", values...)
err = selected.AssignJob(ctx, job)
state, err := selected.AssignJob(ctx, job)
if err != nil {
if errors.Is(err, agent.ErrWorkerNotAvailable) {
continue // Try another worker
@@ -383,7 +383,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.J
}
return &rpc.JobRequestResponse{
State: job.State,
State: state,
}, nil
}
}