Files
livekit/pkg/agent/worker.go
2024-09-18 00:17:45 -07:00

528 lines
14 KiB
Go

// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"context"
"errors"
"fmt"
"sync"
"time"
"google.golang.org/protobuf/proto"
pagent "github.com/livekit/protocol/agent"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils/guid"
"github.com/livekit/psrpc"
)
var (
ErrUnimplementedWrorkerSignal = errors.New("unimplemented worker signal")
ErrUnknownWorkerSignal = errors.New("unknown worker signal")
ErrUnknownJobType = errors.New("unknown job type")
ErrWorkerClosed = errors.New("worker closed")
ErrWorkerNotAvailable = errors.New("worker not available")
ErrAvailabilityTimeout = errors.New("agent worker availability timeout")
ErrDuplicateJobAssignment = errors.New("duplicate job assignment")
)
type WorkerProtocolVersion int
const CurrentProtocol = 1
const (
registerTimeout = 10 * time.Second
assignJobTimeout = 10 * time.Second
pingFrequency = 10 * time.Second
)
type SignalConn interface {
WriteServerMessage(msg *livekit.ServerMessage) (int, error)
ReadWorkerMessage() (*livekit.WorkerMessage, int, error)
SetReadDeadline(time.Time) error
Close() error
}
func JobStatusIsEnded(s livekit.JobStatus) bool {
return s == livekit.JobStatus_JS_SUCCESS || s == livekit.JobStatus_JS_FAILED
}
type WorkerSignalHandler interface {
HandleRegister(*livekit.RegisterWorkerRequest) error
HandleAvailability(*livekit.AvailabilityResponse) error
HandleUpdateJob(*livekit.UpdateJobStatus) error
HandleSimulateJob(*livekit.SimulateJobRequest) error
HandlePing(*livekit.WorkerPing) error
HandleUpdateWorker(*livekit.UpdateWorkerStatus) error
HandleMigrateJob(*livekit.MigrateJobRequest) error
}
func DispatchWorkerSignal(req *livekit.WorkerMessage, h WorkerSignalHandler) error {
switch m := req.Message.(type) {
case *livekit.WorkerMessage_Register:
return h.HandleRegister(m.Register)
case *livekit.WorkerMessage_Availability:
return h.HandleAvailability(m.Availability)
case *livekit.WorkerMessage_UpdateJob:
return h.HandleUpdateJob(m.UpdateJob)
case *livekit.WorkerMessage_SimulateJob:
return h.HandleSimulateJob(m.SimulateJob)
case *livekit.WorkerMessage_Ping:
return h.HandlePing(m.Ping)
case *livekit.WorkerMessage_UpdateWorker:
return h.HandleUpdateWorker(m.UpdateWorker)
case *livekit.WorkerMessage_MigrateJob:
return h.HandleMigrateJob(m.MigrateJob)
default:
return ErrUnknownWorkerSignal
}
}
var _ WorkerSignalHandler = (*UnimplementedWorkerSignalHandler)(nil)
type UnimplementedWorkerSignalHandler struct{}
func (UnimplementedWorkerSignalHandler) HandleRegister(*livekit.RegisterWorkerRequest) error {
return fmt.Errorf("%w: Register", ErrUnimplementedWrorkerSignal)
}
func (UnimplementedWorkerSignalHandler) HandleAvailability(*livekit.AvailabilityResponse) error {
return fmt.Errorf("%w: Availability", ErrUnimplementedWrorkerSignal)
}
func (UnimplementedWorkerSignalHandler) HandleUpdateJob(*livekit.UpdateJobStatus) error {
return fmt.Errorf("%w: UpdateJob", ErrUnimplementedWrorkerSignal)
}
func (UnimplementedWorkerSignalHandler) HandleSimulateJob(*livekit.SimulateJobRequest) error {
return fmt.Errorf("%w: SimulateJob", ErrUnimplementedWrorkerSignal)
}
func (UnimplementedWorkerSignalHandler) HandlePing(*livekit.WorkerPing) error {
return fmt.Errorf("%w: Ping", ErrUnimplementedWrorkerSignal)
}
func (UnimplementedWorkerSignalHandler) HandleUpdateWorker(*livekit.UpdateWorkerStatus) error {
return fmt.Errorf("%w: UpdateWorker", ErrUnimplementedWrorkerSignal)
}
func (UnimplementedWorkerSignalHandler) HandleMigrateJob(*livekit.MigrateJobRequest) error {
return fmt.Errorf("%w: MigrateJob", ErrUnimplementedWrorkerSignal)
}
type WorkerPingHandler struct {
UnimplementedWorkerSignalHandler
conn SignalConn
}
func (h WorkerPingHandler) HandlePing(ping *livekit.WorkerPing) error {
_, err := h.conn.WriteServerMessage(&livekit.ServerMessage{
Message: &livekit.ServerMessage_Pong{
Pong: &livekit.WorkerPong{
LastTimestamp: ping.Timestamp,
Timestamp: time.Now().UnixMilli(),
},
},
})
return err
}
type WorkerRegistration struct {
Protocol WorkerProtocolVersion
ID string
Version string
AgentName string
Namespace string
JobType livekit.JobType
Permissions *livekit.ParticipantPermission
}
var _ WorkerSignalHandler = (*WorkerRegisterer)(nil)
type WorkerRegisterer struct {
WorkerPingHandler
serverInfo *livekit.ServerInfo
protocol WorkerProtocolVersion
deadline time.Time
registration WorkerRegistration
registered bool
}
func NewWorkerRegisterer(conn SignalConn, serverInfo *livekit.ServerInfo, protocol WorkerProtocolVersion) *WorkerRegisterer {
return &WorkerRegisterer{
WorkerPingHandler: WorkerPingHandler{conn: conn},
serverInfo: serverInfo,
protocol: protocol,
deadline: time.Now().Add(registerTimeout),
}
}
func (h *WorkerRegisterer) Deadline() time.Time {
return h.deadline
}
func (h *WorkerRegisterer) Registration() WorkerRegistration {
return h.registration
}
func (h *WorkerRegisterer) Registered() bool {
return h.registered
}
func (h *WorkerRegisterer) HandleRegister(req *livekit.RegisterWorkerRequest) error {
if !livekit.IsJobType(req.GetType()) {
return ErrUnknownJobType
}
permissions := req.AllowedPermissions
if permissions == nil {
permissions = &livekit.ParticipantPermission{
CanSubscribe: true,
CanPublish: true,
CanPublishData: true,
CanUpdateMetadata: true,
}
}
h.registration = WorkerRegistration{
Protocol: h.protocol,
ID: guid.New(guid.AgentWorkerPrefix),
Version: req.Version,
AgentName: req.AgentName,
Namespace: req.GetNamespace(),
JobType: req.GetType(),
Permissions: permissions,
}
h.registered = true
_, err := h.conn.WriteServerMessage(&livekit.ServerMessage{
Message: &livekit.ServerMessage_Register{
Register: &livekit.RegisterWorkerResponse{
WorkerId: h.registration.ID,
ServerInfo: h.serverInfo,
},
},
})
return err
}
var _ WorkerSignalHandler = (*Worker)(nil)
type Worker struct {
WorkerPingHandler
WorkerRegistration
apiKey string
apiSecret string
conn SignalConn
logger logger.Logger
ctx context.Context
cancel context.CancelFunc
closed chan struct{}
mu sync.Mutex
load float32
status livekit.WorkerStatus
runningJobs map[string]*livekit.Job
availability map[string]chan *livekit.AvailabilityResponse
}
func NewWorker(
registration WorkerRegistration,
apiKey string,
apiSecret string,
conn SignalConn,
logger logger.Logger,
) *Worker {
ctx, cancel := context.WithCancel(context.Background())
return &Worker{
WorkerRegistration: registration,
apiKey: apiKey,
apiSecret: apiSecret,
conn: conn,
logger: logger.WithValues(
"workerID", registration.ID,
"agentName", registration.AgentName,
"jobType", registration.JobType.String(),
),
ctx: ctx,
cancel: cancel,
closed: make(chan struct{}),
runningJobs: make(map[string]*livekit.Job),
availability: make(map[string]chan *livekit.AvailabilityResponse),
}
}
func (w *Worker) sendRequest(req *livekit.ServerMessage) {
if _, err := w.conn.WriteServerMessage(req); err != nil {
w.logger.Warnw("error writing to websocket", err)
}
}
func (w *Worker) Status() livekit.WorkerStatus {
w.mu.Lock()
defer w.mu.Unlock()
return w.status
}
func (w *Worker) Load() float32 {
w.mu.Lock()
defer w.mu.Unlock()
return w.load
}
func (w *Worker) Logger() logger.Logger {
return w.logger
}
func (w *Worker) RunningJobs() map[string]*livekit.Job {
w.mu.Lock()
defer w.mu.Unlock()
jobs := make(map[string]*livekit.Job, len(w.runningJobs))
for k, v := range w.runningJobs {
jobs[k] = v
}
return jobs
}
func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error {
availCh := make(chan *livekit.AvailabilityResponse, 1)
w.mu.Lock()
if _, ok := w.availability[job.Id]; ok {
w.mu.Unlock()
return ErrDuplicateJobAssignment
}
w.availability[job.Id] = availCh
w.mu.Unlock()
defer func() {
w.mu.Lock()
delete(w.availability, job.Id)
w.mu.Unlock()
}()
now := time.Now()
if job.State == nil {
job.State = &livekit.JobState{
UpdatedAt: now.UnixNano(),
}
}
w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Availability{
Availability: &livekit.AvailabilityRequest{Job: job},
}})
timeout := time.NewTimer(assignJobTimeout)
defer timeout.Stop()
// See handleAvailability for the response
select {
case res := <-availCh:
if !res.Available {
return ErrWorkerNotAvailable
}
job.State.ParticipantIdentity = res.ParticipantIdentity
token, err := pagent.BuildAgentToken(w.apiKey, w.apiSecret, job.Room.Name, res.ParticipantIdentity, res.ParticipantName, res.ParticipantMetadata, w.Permissions)
if err != nil {
w.logger.Errorw("failed to build agent token", err)
return err
}
// In OSS, Url is nil, and the used API Key is the same as the one used to connect the worker
w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Assignment{
Assignment: &livekit.JobAssignment{Job: job, Url: nil, Token: token},
}})
w.mu.Lock()
w.runningJobs[job.Id] = job
w.mu.Unlock()
// TODO sweep jobs that are never started. We can't do this until all SDKs actually update the the JOB state
return nil
case <-timeout.C:
return ErrAvailabilityTimeout
case <-w.ctx.Done():
return ErrWorkerClosed
case <-ctx.Done():
return ctx.Err()
}
}
func (w *Worker) TerminateJob(jobID string, reason rpc.JobTerminateReason) (*livekit.JobState, error) {
w.mu.Lock()
job := w.runningJobs[jobID]
w.mu.Unlock()
if job == nil {
return nil, psrpc.NewErrorf(psrpc.NotFound, "no running job for given jobID")
}
w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Termination{
Termination: &livekit.JobTermination{
JobId: jobID,
},
}})
status := livekit.JobStatus_JS_SUCCESS
errorStr := ""
if reason == rpc.JobTerminateReason_AGENT_LEFT_ROOM {
status = livekit.JobStatus_JS_FAILED
errorStr = "agent worker left the room"
}
return w.UpdateJobStatus(&livekit.UpdateJobStatus{
JobId: jobID,
Status: status,
Error: errorStr,
})
}
func (w *Worker) UpdateMetadata(metadata string) {
w.logger.Debugw("worker metadata updated", nil, "metadata", metadata)
}
func (w *Worker) IsClosed() bool {
select {
case <-w.closed:
return true
default:
return false
}
}
func (w *Worker) Close() {
w.mu.Lock()
if w.IsClosed() {
w.mu.Unlock()
return
}
w.logger.Infow("closing worker")
close(w.closed)
w.cancel()
_ = w.conn.Close()
w.mu.Unlock()
}
func (w *Worker) HandleAvailability(res *livekit.AvailabilityResponse) error {
w.mu.Lock()
defer w.mu.Unlock()
availCh, ok := w.availability[res.JobId]
if !ok {
w.logger.Warnw("received availability response for unknown job", nil, "jobId", res.JobId)
return nil
}
availCh <- res
delete(w.availability, res.JobId)
return nil
}
func (w *Worker) HandleUpdateJob(update *livekit.UpdateJobStatus) error {
_, err := w.UpdateJobStatus(update)
if err != nil {
w.logger.Infow("received job update for unknown job", "jobID", update.JobId)
}
return err
}
func (w *Worker) UpdateJobStatus(update *livekit.UpdateJobStatus) (*livekit.JobState, error) {
w.mu.Lock()
defer w.mu.Unlock()
job, ok := w.runningJobs[update.JobId]
if !ok {
return nil, psrpc.NewErrorf(psrpc.NotFound, "received job update for unknown job")
}
now := time.Now()
job.State.UpdatedAt = now.UnixNano()
if job.State.Status == livekit.JobStatus_JS_PENDING && update.Status != livekit.JobStatus_JS_PENDING {
job.State.StartedAt = now.UnixNano()
}
if job.State.Status < livekit.JobStatus_JS_SUCCESS && JobStatusIsEnded(update.Status) {
job.State.EndedAt = now.UnixNano()
}
job.State.Status = update.Status
job.State.Error = update.Error
// TODO do not delete, leave inside the JobDefinition
if JobStatusIsEnded(job.State.Status) {
delete(w.runningJobs, job.Id)
}
return proto.Clone(job.State).(*livekit.JobState), nil
}
func (w *Worker) HandleSimulateJob(simulate *livekit.SimulateJobRequest) error {
jobType := livekit.JobType_JT_ROOM
if simulate.Participant != nil {
jobType = livekit.JobType_JT_PUBLISHER
}
job := &livekit.Job{
Id: guid.New(guid.AgentJobPrefix),
Type: jobType,
Room: simulate.Room,
Participant: simulate.Participant,
Namespace: w.Namespace,
AgentName: w.AgentName,
}
go func() {
err := w.AssignJob(w.ctx, job)
if err != nil {
w.logger.Errorw("failed to simulate job, assignment failed", err, "jobId", job.Id)
}
}()
return nil
}
func (w *Worker) HandleUpdateWorker(update *livekit.UpdateWorkerStatus) error {
w.logger.Debugw("worker status update", "update", logger.Proto(update))
w.mu.Lock()
defer w.mu.Unlock()
if update.Status != nil {
w.status = update.GetStatus()
}
w.load = update.GetLoad()
return nil
}
func (w *Worker) HandleMigrateJob(req *livekit.MigrateJobRequest) error {
// TODO(theomonnom): On OSS this is not implemented
// We could maybe just move a specific job to another worker
return nil
}