mirror of
https://github.com/livekit/livekit.git
synced 2026-03-29 07:09:51 +00:00
579 lines
15 KiB
Go
579 lines
15 KiB
Go
// Copyright 2024 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package agent
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
pagent "github.com/livekit/protocol/agent"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"github.com/livekit/protocol/rpc"
|
|
"github.com/livekit/protocol/utils"
|
|
"github.com/livekit/protocol/utils/guid"
|
|
"github.com/livekit/psrpc"
|
|
)
|
|
|
|
var (
|
|
ErrUnimplementedWrorkerSignal = errors.New("unimplemented worker signal")
|
|
ErrUnknownWorkerSignal = errors.New("unknown worker signal")
|
|
ErrUnknownJobType = errors.New("unknown job type")
|
|
ErrJobNotFound = psrpc.NewErrorf(psrpc.NotFound, "no running job for given jobID")
|
|
ErrWorkerClosed = errors.New("worker closed")
|
|
ErrWorkerNotAvailable = errors.New("worker not available")
|
|
ErrAvailabilityTimeout = errors.New("agent worker availability timeout")
|
|
ErrDuplicateJobAssignment = errors.New("duplicate job assignment")
|
|
)
|
|
|
|
// AgentNameAttributeKey is the participant attribute that AssignJob sets to
// the worker's agent name on every agent token it builds.
const AgentNameAttributeKey = "lk.agent_name"

// WorkerProtocolVersion identifies a revision of the worker signalling protocol.
type WorkerProtocolVersion int

const (
	// CurrentProtocol is the protocol version MakeWorkerRegistration stamps on
	// new registrations. It is an untyped constant, so it converts implicitly
	// to WorkerProtocolVersion (or any integer type).
	CurrentProtocol = 1

	// RegisterTimeout is added to the current time to compute a
	// WorkerRegisterer's Deadline.
	RegisterTimeout = 10 * time.Second

	// AssignJobTimeout is how long AssignJob waits for the worker's
	// availability answer before giving up with ErrAvailabilityTimeout.
	AssignJobTimeout = 10 * time.Second
)
|
|
|
|
type SignalConn interface {
|
|
WriteServerMessage(msg *livekit.ServerMessage) (int, error)
|
|
ReadWorkerMessage() (*livekit.WorkerMessage, int, error)
|
|
SetReadDeadline(time.Time) error
|
|
Close() error
|
|
}
|
|
|
|
func JobStatusIsEnded(s livekit.JobStatus) bool {
|
|
return s == livekit.JobStatus_JS_SUCCESS || s == livekit.JobStatus_JS_FAILED
|
|
}
|
|
|
|
type WorkerSignalHandler interface {
|
|
HandleRegister(*livekit.RegisterWorkerRequest) error
|
|
HandleAvailability(*livekit.AvailabilityResponse) error
|
|
HandleUpdateJob(*livekit.UpdateJobStatus) error
|
|
HandleSimulateJob(*livekit.SimulateJobRequest) error
|
|
HandlePing(*livekit.WorkerPing) error
|
|
HandleUpdateWorker(*livekit.UpdateWorkerStatus) error
|
|
HandleMigrateJob(*livekit.MigrateJobRequest) error
|
|
}
|
|
|
|
func DispatchWorkerSignal(req *livekit.WorkerMessage, h WorkerSignalHandler) error {
|
|
switch m := req.Message.(type) {
|
|
case *livekit.WorkerMessage_Register:
|
|
return h.HandleRegister(m.Register)
|
|
case *livekit.WorkerMessage_Availability:
|
|
return h.HandleAvailability(m.Availability)
|
|
case *livekit.WorkerMessage_UpdateJob:
|
|
return h.HandleUpdateJob(m.UpdateJob)
|
|
case *livekit.WorkerMessage_SimulateJob:
|
|
return h.HandleSimulateJob(m.SimulateJob)
|
|
case *livekit.WorkerMessage_Ping:
|
|
return h.HandlePing(m.Ping)
|
|
case *livekit.WorkerMessage_UpdateWorker:
|
|
return h.HandleUpdateWorker(m.UpdateWorker)
|
|
case *livekit.WorkerMessage_MigrateJob:
|
|
return h.HandleMigrateJob(m.MigrateJob)
|
|
default:
|
|
return ErrUnknownWorkerSignal
|
|
}
|
|
}
|
|
|
|
var _ WorkerSignalHandler = (*UnimplementedWorkerSignalHandler)(nil)
|
|
|
|
type UnimplementedWorkerSignalHandler struct{}
|
|
|
|
func (UnimplementedWorkerSignalHandler) HandleRegister(*livekit.RegisterWorkerRequest) error {
|
|
return fmt.Errorf("%w: Register", ErrUnimplementedWrorkerSignal)
|
|
}
|
|
func (UnimplementedWorkerSignalHandler) HandleAvailability(*livekit.AvailabilityResponse) error {
|
|
return fmt.Errorf("%w: Availability", ErrUnimplementedWrorkerSignal)
|
|
}
|
|
func (UnimplementedWorkerSignalHandler) HandleUpdateJob(*livekit.UpdateJobStatus) error {
|
|
return fmt.Errorf("%w: UpdateJob", ErrUnimplementedWrorkerSignal)
|
|
}
|
|
func (UnimplementedWorkerSignalHandler) HandleSimulateJob(*livekit.SimulateJobRequest) error {
|
|
return fmt.Errorf("%w: SimulateJob", ErrUnimplementedWrorkerSignal)
|
|
}
|
|
func (UnimplementedWorkerSignalHandler) HandlePing(*livekit.WorkerPing) error {
|
|
return fmt.Errorf("%w: Ping", ErrUnimplementedWrorkerSignal)
|
|
}
|
|
func (UnimplementedWorkerSignalHandler) HandleUpdateWorker(*livekit.UpdateWorkerStatus) error {
|
|
return fmt.Errorf("%w: UpdateWorker", ErrUnimplementedWrorkerSignal)
|
|
}
|
|
func (UnimplementedWorkerSignalHandler) HandleMigrateJob(*livekit.MigrateJobRequest) error {
|
|
return fmt.Errorf("%w: MigrateJob", ErrUnimplementedWrorkerSignal)
|
|
}
|
|
|
|
type WorkerPingHandler struct {
|
|
UnimplementedWorkerSignalHandler
|
|
conn SignalConn
|
|
}
|
|
|
|
func (h WorkerPingHandler) HandlePing(ping *livekit.WorkerPing) error {
|
|
_, err := h.conn.WriteServerMessage(&livekit.ServerMessage{
|
|
Message: &livekit.ServerMessage_Pong{
|
|
Pong: &livekit.WorkerPong{
|
|
LastTimestamp: ping.Timestamp,
|
|
Timestamp: time.Now().UnixMilli(),
|
|
},
|
|
},
|
|
})
|
|
return err
|
|
}
|
|
|
|
type WorkerRegistration struct {
|
|
Protocol WorkerProtocolVersion
|
|
ID string
|
|
Version string
|
|
AgentID string
|
|
AgentName string
|
|
Namespace string
|
|
JobType livekit.JobType
|
|
Permissions *livekit.ParticipantPermission
|
|
ClientIP string
|
|
}
|
|
|
|
func MakeWorkerRegistration() WorkerRegistration {
|
|
return WorkerRegistration{
|
|
ID: guid.New(guid.AgentWorkerPrefix),
|
|
Protocol: CurrentProtocol,
|
|
}
|
|
}
|
|
|
|
var _ WorkerSignalHandler = (*WorkerRegisterer)(nil)
|
|
|
|
type WorkerRegisterer struct {
|
|
WorkerPingHandler
|
|
serverInfo *livekit.ServerInfo
|
|
deadline time.Time
|
|
|
|
registration WorkerRegistration
|
|
registered bool
|
|
}
|
|
|
|
func NewWorkerRegisterer(conn SignalConn, serverInfo *livekit.ServerInfo, base WorkerRegistration) *WorkerRegisterer {
|
|
return &WorkerRegisterer{
|
|
WorkerPingHandler: WorkerPingHandler{conn: conn},
|
|
serverInfo: serverInfo,
|
|
registration: base,
|
|
deadline: time.Now().Add(RegisterTimeout),
|
|
}
|
|
}
|
|
|
|
func (h *WorkerRegisterer) Deadline() time.Time {
|
|
return h.deadline
|
|
}
|
|
|
|
func (h *WorkerRegisterer) Registration() WorkerRegistration {
|
|
return h.registration
|
|
}
|
|
|
|
func (h *WorkerRegisterer) Registered() bool {
|
|
return h.registered
|
|
}
|
|
|
|
func (h *WorkerRegisterer) HandleRegister(req *livekit.RegisterWorkerRequest) error {
|
|
if !livekit.IsJobType(req.GetType()) {
|
|
return ErrUnknownJobType
|
|
}
|
|
|
|
permissions := req.AllowedPermissions
|
|
if permissions == nil {
|
|
permissions = &livekit.ParticipantPermission{
|
|
CanSubscribe: true,
|
|
CanPublish: true,
|
|
CanPublishData: true,
|
|
CanUpdateMetadata: true,
|
|
}
|
|
}
|
|
|
|
h.registration.Version = req.Version
|
|
h.registration.AgentName = req.AgentName
|
|
h.registration.Namespace = req.GetNamespace()
|
|
h.registration.JobType = req.GetType()
|
|
h.registration.Permissions = permissions
|
|
h.registered = true
|
|
|
|
_, err := h.conn.WriteServerMessage(&livekit.ServerMessage{
|
|
Message: &livekit.ServerMessage_Register{
|
|
Register: &livekit.RegisterWorkerResponse{
|
|
WorkerId: h.registration.ID,
|
|
ServerInfo: h.serverInfo,
|
|
},
|
|
},
|
|
})
|
|
return err
|
|
}
|
|
|
|
var _ WorkerSignalHandler = (*Worker)(nil)
|
|
|
|
type Worker struct {
|
|
WorkerPingHandler
|
|
WorkerRegistration
|
|
|
|
apiKey string
|
|
apiSecret string
|
|
logger logger.Logger
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
closed chan struct{}
|
|
|
|
mu sync.Mutex
|
|
load float32
|
|
status livekit.WorkerStatus
|
|
|
|
runningJobs map[livekit.JobID]*livekit.Job
|
|
availability map[livekit.JobID]chan *livekit.AvailabilityResponse
|
|
}
|
|
|
|
func NewWorker(
|
|
registration WorkerRegistration,
|
|
apiKey string,
|
|
apiSecret string,
|
|
conn SignalConn,
|
|
logger logger.Logger,
|
|
) *Worker {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
return &Worker{
|
|
WorkerPingHandler: WorkerPingHandler{conn: conn},
|
|
WorkerRegistration: registration,
|
|
apiKey: apiKey,
|
|
apiSecret: apiSecret,
|
|
logger: logger.WithValues(
|
|
"workerID", registration.ID,
|
|
"agentName", registration.AgentName,
|
|
"jobType", registration.JobType.String(),
|
|
),
|
|
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
closed: make(chan struct{}),
|
|
|
|
runningJobs: make(map[livekit.JobID]*livekit.Job),
|
|
availability: make(map[livekit.JobID]chan *livekit.AvailabilityResponse),
|
|
}
|
|
}
|
|
|
|
func (w *Worker) sendRequest(req *livekit.ServerMessage) {
|
|
if _, err := w.conn.WriteServerMessage(req); err != nil {
|
|
w.logger.Warnw("error writing to websocket", err)
|
|
}
|
|
}
|
|
|
|
func (w *Worker) Status() livekit.WorkerStatus {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
return w.status
|
|
}
|
|
|
|
func (w *Worker) Load() float32 {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
return w.load
|
|
}
|
|
|
|
func (w *Worker) Logger() logger.Logger {
|
|
return w.logger
|
|
}
|
|
|
|
func (w *Worker) RunningJobs() map[livekit.JobID]*livekit.Job {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
jobs := make(map[livekit.JobID]*livekit.Job, len(w.runningJobs))
|
|
for k, v := range w.runningJobs {
|
|
jobs[k] = v
|
|
}
|
|
return jobs
|
|
}
|
|
|
|
func (w *Worker) RunningJobCount() int {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
return len(w.runningJobs)
|
|
}
|
|
|
|
func (w *Worker) GetJobState(jobID livekit.JobID) (*livekit.JobState, error) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
j, ok := w.runningJobs[jobID]
|
|
if !ok {
|
|
return nil, ErrJobNotFound
|
|
}
|
|
return utils.CloneProto(j.State), nil
|
|
}
|
|
|
|
func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) (*livekit.JobState, error) {
|
|
availCh := make(chan *livekit.AvailabilityResponse, 1)
|
|
job = utils.CloneProto(job)
|
|
jobID := livekit.JobID(job.Id)
|
|
|
|
w.mu.Lock()
|
|
if _, ok := w.availability[jobID]; ok {
|
|
w.mu.Unlock()
|
|
return nil, ErrDuplicateJobAssignment
|
|
}
|
|
|
|
w.availability[jobID] = availCh
|
|
w.mu.Unlock()
|
|
|
|
defer func() {
|
|
w.mu.Lock()
|
|
delete(w.availability, jobID)
|
|
w.mu.Unlock()
|
|
}()
|
|
|
|
if job.State == nil {
|
|
job.State = &livekit.JobState{}
|
|
}
|
|
now := time.Now()
|
|
job.State.WorkerId = w.ID
|
|
job.State.AgentId = w.AgentID
|
|
job.State.UpdatedAt = now.UnixNano()
|
|
job.State.StartedAt = now.UnixNano()
|
|
job.State.Status = livekit.JobStatus_JS_RUNNING
|
|
|
|
w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Availability{
|
|
Availability: &livekit.AvailabilityRequest{Job: job},
|
|
}})
|
|
|
|
timeout := time.NewTimer(AssignJobTimeout)
|
|
defer timeout.Stop()
|
|
|
|
// See handleAvailability for the response
|
|
select {
|
|
case res := <-availCh:
|
|
if res.Terminate {
|
|
job.State.EndedAt = now.UnixNano()
|
|
job.State.Status = livekit.JobStatus_JS_SUCCESS
|
|
return job.State, nil
|
|
}
|
|
|
|
if !res.Available {
|
|
return nil, ErrWorkerNotAvailable
|
|
}
|
|
|
|
job.State.ParticipantIdentity = res.ParticipantIdentity
|
|
attributes := res.ParticipantAttributes
|
|
if attributes == nil {
|
|
attributes = make(map[string]string)
|
|
}
|
|
attributes[AgentNameAttributeKey] = w.AgentName
|
|
|
|
token, err := pagent.BuildAgentToken(
|
|
w.apiKey,
|
|
w.apiSecret,
|
|
job.Room.Name,
|
|
res.ParticipantIdentity,
|
|
res.ParticipantName,
|
|
res.ParticipantMetadata,
|
|
attributes,
|
|
w.Permissions,
|
|
)
|
|
if err != nil {
|
|
w.logger.Errorw("failed to build agent token", err)
|
|
return nil, err
|
|
}
|
|
|
|
// In OSS, Url is nil, and the used API Key is the same as the one used to connect the worker
|
|
w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Assignment{
|
|
Assignment: &livekit.JobAssignment{Job: job, Url: nil, Token: token},
|
|
}})
|
|
|
|
state := utils.CloneProto(job.State)
|
|
|
|
w.mu.Lock()
|
|
w.runningJobs[jobID] = job
|
|
w.mu.Unlock()
|
|
|
|
// TODO sweep jobs that are never started. We can't do this until all SDKs actually update the the JOB state
|
|
|
|
return state, nil
|
|
case <-timeout.C:
|
|
return nil, ErrAvailabilityTimeout
|
|
case <-w.ctx.Done():
|
|
return nil, ErrWorkerClosed
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
func (w *Worker) TerminateJob(jobID livekit.JobID, reason rpc.JobTerminateReason) (*livekit.JobState, error) {
|
|
w.mu.Lock()
|
|
_, ok := w.runningJobs[jobID]
|
|
w.mu.Unlock()
|
|
|
|
if !ok {
|
|
return nil, ErrJobNotFound
|
|
}
|
|
|
|
w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Termination{
|
|
Termination: &livekit.JobTermination{
|
|
JobId: string(jobID),
|
|
},
|
|
}})
|
|
|
|
status := livekit.JobStatus_JS_SUCCESS
|
|
errorStr := ""
|
|
if reason == rpc.JobTerminateReason_AGENT_LEFT_ROOM {
|
|
status = livekit.JobStatus_JS_FAILED
|
|
errorStr = "agent worker left the room"
|
|
}
|
|
|
|
return w.UpdateJobStatus(&livekit.UpdateJobStatus{
|
|
JobId: string(jobID),
|
|
Status: status,
|
|
Error: errorStr,
|
|
})
|
|
}
|
|
|
|
func (w *Worker) UpdateMetadata(metadata string) {
|
|
w.logger.Debugw("worker metadata updated", nil, "metadata", metadata)
|
|
}
|
|
|
|
func (w *Worker) IsClosed() bool {
|
|
select {
|
|
case <-w.closed:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (w *Worker) Close() {
|
|
w.mu.Lock()
|
|
if w.IsClosed() {
|
|
w.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
w.logger.Infow("closing worker", "workerID", w.ID, "jobType", w.JobType, "agentName", w.AgentName)
|
|
|
|
close(w.closed)
|
|
w.cancel()
|
|
_ = w.conn.Close()
|
|
w.mu.Unlock()
|
|
}
|
|
|
|
func (w *Worker) HandleAvailability(res *livekit.AvailabilityResponse) error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
jobID := livekit.JobID(res.JobId)
|
|
availCh, ok := w.availability[jobID]
|
|
if !ok {
|
|
w.logger.Warnw("received availability response for unknown job", nil, "jobID", jobID)
|
|
return nil
|
|
}
|
|
|
|
availCh <- res
|
|
delete(w.availability, jobID)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *Worker) HandleUpdateJob(update *livekit.UpdateJobStatus) error {
|
|
_, err := w.UpdateJobStatus(update)
|
|
if err != nil {
|
|
// treating this as a debug message only
|
|
// this can happen if the Room closes first, which would delete the agent dispatch
|
|
// that would mark the job as successful. subsequent updates from the same worker
|
|
// would not be able to find the same jobID.
|
|
w.logger.Debugw("received job update for unknown job", "jobID", update.JobId)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *Worker) UpdateJobStatus(update *livekit.UpdateJobStatus) (*livekit.JobState, error) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
jobID := livekit.JobID(update.JobId)
|
|
job, ok := w.runningJobs[jobID]
|
|
if !ok {
|
|
return nil, psrpc.NewErrorf(psrpc.NotFound, "received job update for unknown job")
|
|
}
|
|
|
|
now := time.Now()
|
|
job.State.UpdatedAt = now.UnixNano()
|
|
|
|
if job.State.Status == livekit.JobStatus_JS_PENDING && update.Status != livekit.JobStatus_JS_PENDING {
|
|
job.State.StartedAt = now.UnixNano()
|
|
}
|
|
|
|
job.State.Status = update.Status
|
|
job.State.Error = update.Error
|
|
|
|
if JobStatusIsEnded(update.Status) {
|
|
job.State.EndedAt = now.UnixNano()
|
|
delete(w.runningJobs, jobID)
|
|
|
|
w.logger.Infow("job ended", "jobID", update.JobId, "status", update.Status, "error", update.Error)
|
|
}
|
|
|
|
return proto.Clone(job.State).(*livekit.JobState), nil
|
|
}
|
|
|
|
func (w *Worker) HandleSimulateJob(simulate *livekit.SimulateJobRequest) error {
|
|
jobType := livekit.JobType_JT_ROOM
|
|
if simulate.Participant != nil {
|
|
jobType = livekit.JobType_JT_PUBLISHER
|
|
}
|
|
|
|
job := &livekit.Job{
|
|
Id: guid.New(guid.AgentJobPrefix),
|
|
Type: jobType,
|
|
Room: simulate.Room,
|
|
Participant: simulate.Participant,
|
|
Namespace: w.Namespace,
|
|
AgentName: w.AgentName,
|
|
}
|
|
|
|
go func() {
|
|
_, err := w.AssignJob(w.ctx, job)
|
|
if err != nil {
|
|
w.logger.Errorw("unable to simulate job", err, "jobID", job.Id)
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *Worker) HandleUpdateWorker(update *livekit.UpdateWorkerStatus) error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if status := update.Status; status != nil && w.status != *status {
|
|
w.status = *status
|
|
w.Logger().Debugw("worker status changed", "status", w.status)
|
|
}
|
|
w.load = update.GetLoad()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *Worker) HandleMigrateJob(req *livekit.MigrateJobRequest) error {
|
|
// TODO(theomonnom): On OSS this is not implemented
|
|
// We could maybe just move a specific job to another worker
|
|
return nil
|
|
}
|