Files
livekit/pkg/agent/job.go
Théo Monnom dc67f505a5 agent service: new protocol & namespaces (#2545)
* initial worker impl

* fix test

* fix build

* TestAgentNamespaces

* log err

* nit cmt

* TestAgentMultiNode

* Update pkg/agent/worker.go

Co-authored-by: David Zhao <dz@livekit.io>

* retry on worker selection & fix review comments

* Update roommanager.go

* license

* use testutils.WIthTimeout

* abstract namespace/enabled logic into agent.Client, incrementally dispatch

* typos and dates

* lock

* timeout is now optional

* pass in topics instead of fixed

* handler handles connections

* onIdle, numConnections

* fix WithGrants

* update protocol

* check agent client

* broadcast after unlock

* fix data race

* remove ReadChan, fix dispatcher

---------

Co-authored-by: David Zhao <dz@livekit.io>
Co-authored-by: David Colburn <xero73@gmail.com>
2024-04-03 15:25:42 -07:00

87 lines
1.9 KiB
Go

// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"sync"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
// Represents a job that is being executed by a worker
type Job struct {
id string
jobType livekit.JobType
status livekit.JobStatus
namespace string
mu sync.Mutex
load float32
Logger logger.Logger
}
func NewJob(id, namespace string, jobType livekit.JobType) *Job {
return &Job{
id: id,
status: livekit.JobStatus_JS_UNKNOWN,
jobType: jobType,
namespace: namespace,
}
}
func (j *Job) ID() string {
return j.id
}
func (j *Job) Namespace() string {
return j.namespace
}
func (j *Job) Type() livekit.JobType {
return j.jobType
}
func (j *Job) WorkerLoad() float32 {
// Current load that this job is taking on its worker
j.mu.Lock()
defer j.mu.Unlock()
return j.load
}
func (j *Job) UpdateStatus(req *livekit.UpdateJobStatus) {
j.mu.Lock()
if req.Status != nil {
j.status = *req.Status // End of the job, SUCCESS or FAILURE
if j.status == livekit.JobStatus_JS_FAILED {
j.Logger.Errorw("job failed", nil, "id", j.id, "type", j.jobType, "error", req.Error)
}
}
j.load = req.Load
j.mu.Unlock()
if req.Metadata != nil {
j.UpdateMetadata(req.GetMetadata())
}
}
func (j *Job) UpdateMetadata(metadata string) {
j.Logger.Debugw("job metadata", nil, "id", j.id, "metadata", metadata)
}