mirror of
https://github.com/livekit/livekit.git
synced 2026-04-01 19:35:43 +00:00
389 lines
9.2 KiB
Go
389 lines
9.2 KiB
Go
// Copyright 2024 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package agent
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
pagent "github.com/livekit/protocol/agent"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"github.com/livekit/protocol/utils"
|
|
putil "github.com/livekit/protocol/utils"
|
|
)
|
|
|
|
type WorkerProtocolVersion int
|
|
|
|
const CurrentProtocol = 1
|
|
|
|
const (
|
|
registerTimeout = 10 * time.Second
|
|
assignJobTimeout = 10 * time.Second
|
|
pingFrequency = 10 * time.Second
|
|
)
|
|
|
|
var (
|
|
ErrWorkerClosed = errors.New("worker closed")
|
|
ErrWorkerNotAvailable = errors.New("worker not available")
|
|
ErrAvailabilityTimeout = errors.New("agent worker availability timeout")
|
|
)
|
|
|
|
type sigConn interface {
|
|
WriteServerMessage(msg *livekit.ServerMessage) (int, error)
|
|
}
|
|
|
|
type Worker struct {
|
|
id string
|
|
jobType livekit.JobType
|
|
version string
|
|
name string
|
|
namespace string
|
|
load float32
|
|
permissions *livekit.ParticipantPermission
|
|
apiKey string
|
|
apiSecret string
|
|
serverInfo *livekit.ServerInfo
|
|
mu sync.Mutex
|
|
|
|
protocolVersion WorkerProtocolVersion
|
|
registered atomic.Bool
|
|
status livekit.WorkerStatus
|
|
runningJobs map[string]*Job
|
|
|
|
onWorkerRegistered func(w *Worker)
|
|
|
|
conn *websocket.Conn
|
|
sigConn sigConn
|
|
closed chan struct{}
|
|
|
|
availability map[string]chan *livekit.AvailabilityResponse
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
Logger logger.Logger
|
|
}
|
|
|
|
func NewWorker(
|
|
protocolVersion WorkerProtocolVersion,
|
|
apiKey string,
|
|
apiSecret string,
|
|
serverInfo *livekit.ServerInfo,
|
|
conn *websocket.Conn,
|
|
sigConn sigConn,
|
|
logger logger.Logger,
|
|
) *Worker {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
w := &Worker{
|
|
id: putil.NewGuid(utils.AgentWorkerPrefix),
|
|
protocolVersion: protocolVersion,
|
|
apiKey: apiKey,
|
|
apiSecret: apiSecret,
|
|
serverInfo: serverInfo,
|
|
closed: make(chan struct{}),
|
|
runningJobs: make(map[string]*Job),
|
|
availability: make(map[string]chan *livekit.AvailabilityResponse),
|
|
conn: conn,
|
|
sigConn: sigConn,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
Logger: logger,
|
|
}
|
|
|
|
go func() {
|
|
<-time.After(registerTimeout)
|
|
if !w.registered.Load() && !w.IsClosed() {
|
|
w.Logger.Warnw("worker did not register in time", nil, "id", w.id)
|
|
w.Close()
|
|
}
|
|
}()
|
|
|
|
return w
|
|
}
|
|
|
|
func (w *Worker) sendRequest(req *livekit.ServerMessage) {
|
|
if _, err := w.sigConn.WriteServerMessage(req); err != nil {
|
|
w.Logger.Errorw("error writing to websocket", err)
|
|
}
|
|
}
|
|
|
|
func (w *Worker) ID() string {
|
|
return w.id
|
|
}
|
|
|
|
func (w *Worker) JobType() livekit.JobType {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
return w.jobType
|
|
}
|
|
|
|
func (w *Worker) Namespace() string {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
return w.namespace
|
|
}
|
|
|
|
func (w *Worker) Status() livekit.WorkerStatus {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
return w.status
|
|
}
|
|
|
|
func (w *Worker) Load() float32 {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
return w.load
|
|
}
|
|
|
|
func (w *Worker) OnWorkerRegistered(f func(w *Worker)) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
w.onWorkerRegistered = f
|
|
}
|
|
|
|
func (w *Worker) Registered() bool {
|
|
return w.registered.Load()
|
|
}
|
|
|
|
func (w *Worker) RunningJobs() map[string]*Job {
|
|
jobs := make(map[string]*Job, len(w.runningJobs))
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
for k, v := range w.runningJobs {
|
|
jobs[k] = v
|
|
}
|
|
return jobs
|
|
}
|
|
|
|
func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error {
|
|
availCh := make(chan *livekit.AvailabilityResponse, 1)
|
|
|
|
w.mu.Lock()
|
|
w.availability[job.Id] = availCh
|
|
w.mu.Unlock()
|
|
|
|
w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Availability{
|
|
Availability: &livekit.AvailabilityRequest{Job: job},
|
|
}})
|
|
|
|
// See handleAvailability for the response
|
|
select {
|
|
case res := <-availCh:
|
|
if !res.Available {
|
|
return ErrWorkerNotAvailable
|
|
}
|
|
|
|
token, err := pagent.BuildAgentToken(w.apiKey, w.apiSecret, job.Room.Name, res.ParticipantIdentity, res.ParticipantName, res.ParticipantMetadata, w.permissions)
|
|
if err != nil {
|
|
w.Logger.Errorw("failed to build agent token", err)
|
|
return err
|
|
}
|
|
|
|
// In OSS, Url is nil, and the used API Key is the same as the one used to connect the worker
|
|
w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Assignment{
|
|
Assignment: &livekit.JobAssignment{Job: job, Url: nil, Token: token},
|
|
}})
|
|
|
|
// TODO(theomonnom): Check if an agent was successfully connected to the room before returning
|
|
return nil
|
|
case <-time.After(assignJobTimeout):
|
|
return ErrAvailabilityTimeout
|
|
case <-w.ctx.Done():
|
|
return ErrWorkerClosed
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
func (w *Worker) UpdateStatus(status *livekit.UpdateWorkerStatus) {
|
|
w.mu.Lock()
|
|
if status.Status != nil {
|
|
w.status = status.GetStatus()
|
|
}
|
|
w.load = status.GetLoad()
|
|
w.mu.Unlock()
|
|
|
|
if status.Metadata != nil {
|
|
w.UpdateMetadata(status.GetMetadata())
|
|
}
|
|
}
|
|
|
|
func (w *Worker) UpdateMetadata(metadata string) {
|
|
w.Logger.Debugw("worker metadata updated", nil, "metadata", metadata)
|
|
}
|
|
|
|
func (w *Worker) IsClosed() bool {
|
|
select {
|
|
case <-w.closed:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (w *Worker) Close() {
|
|
w.mu.Lock()
|
|
if w.IsClosed() {
|
|
w.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
w.Logger.Infow("closing worker")
|
|
|
|
close(w.closed)
|
|
w.cancel()
|
|
_ = w.conn.Close()
|
|
w.mu.Unlock()
|
|
}
|
|
|
|
func (w *Worker) HandleMessage(req *livekit.WorkerMessage) {
|
|
switch m := req.Message.(type) {
|
|
case *livekit.WorkerMessage_Register:
|
|
go w.handleRegister(m.Register)
|
|
case *livekit.WorkerMessage_Availability:
|
|
go w.handleAvailability(m.Availability)
|
|
case *livekit.WorkerMessage_UpdateJob:
|
|
go w.handleJobUpdate(m.UpdateJob)
|
|
case *livekit.WorkerMessage_SimulateJob:
|
|
go w.handleSimulateJob(m.SimulateJob)
|
|
case *livekit.WorkerMessage_Ping:
|
|
go w.handleWorkerPing(m.Ping)
|
|
case *livekit.WorkerMessage_UpdateWorker:
|
|
go w.handleWorkerStatus(m.UpdateWorker)
|
|
case *livekit.WorkerMessage_MigrateJob:
|
|
go w.handleMigrateJob(m.MigrateJob)
|
|
}
|
|
}
|
|
|
|
func (w *Worker) handleRegister(req *livekit.RegisterWorkerRequest) {
|
|
if w.registered.Load() {
|
|
w.Logger.Warnw("worker already registered", nil, "id", w.id)
|
|
return
|
|
}
|
|
|
|
w.mu.Lock()
|
|
onWorkerRegistered := w.onWorkerRegistered
|
|
w.jobType = req.Type
|
|
w.version = req.Version
|
|
w.name = req.Name
|
|
w.namespace = req.GetNamespace()
|
|
|
|
if req.AllowedPermissions != nil {
|
|
w.permissions = req.AllowedPermissions
|
|
} else {
|
|
// Use default agent permissions
|
|
w.permissions = &livekit.ParticipantPermission{
|
|
CanSubscribe: true,
|
|
CanPublish: true,
|
|
CanPublishData: true,
|
|
CanUpdateMetadata: true,
|
|
}
|
|
}
|
|
|
|
w.status = livekit.WorkerStatus_WS_AVAILABLE
|
|
w.registered.Store(true)
|
|
w.mu.Unlock()
|
|
|
|
w.Logger.Debugw("worker registered", "request", req)
|
|
|
|
w.sendRequest(&livekit.ServerMessage{
|
|
Message: &livekit.ServerMessage_Register{
|
|
Register: &livekit.RegisterWorkerResponse{
|
|
WorkerId: w.ID(),
|
|
ServerInfo: w.serverInfo,
|
|
},
|
|
},
|
|
})
|
|
|
|
if onWorkerRegistered != nil {
|
|
onWorkerRegistered(w)
|
|
}
|
|
}
|
|
|
|
func (w *Worker) handleAvailability(res *livekit.AvailabilityResponse) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
availCh, ok := w.availability[res.JobId]
|
|
if !ok {
|
|
w.Logger.Warnw("received availability response for unknown job", nil, "jobId", res.JobId)
|
|
return
|
|
}
|
|
|
|
availCh <- res
|
|
delete(w.availability, res.JobId)
|
|
}
|
|
|
|
func (w *Worker) handleJobUpdate(update *livekit.UpdateJobStatus) {
|
|
w.mu.Lock()
|
|
job, ok := w.runningJobs[update.JobId]
|
|
w.mu.Unlock()
|
|
|
|
if !ok {
|
|
w.Logger.Warnw("received job update for unknown job", nil, "jobId", update.JobId)
|
|
return
|
|
}
|
|
|
|
job.UpdateStatus(update)
|
|
}
|
|
|
|
func (w *Worker) handleSimulateJob(simulate *livekit.SimulateJobRequest) {
|
|
jobType := livekit.JobType_JT_ROOM
|
|
if simulate.Participant != nil {
|
|
jobType = livekit.JobType_JT_PUBLISHER
|
|
}
|
|
|
|
job := &livekit.Job{
|
|
Id: utils.NewGuid(utils.AgentJobPrefix),
|
|
Type: jobType,
|
|
Room: simulate.Room,
|
|
Participant: simulate.Participant,
|
|
Namespace: w.Namespace(),
|
|
}
|
|
|
|
ctx := context.Background()
|
|
err := w.AssignJob(ctx, job)
|
|
if err != nil {
|
|
w.Logger.Errorw("failed to simulate job, assignment failed", err, "jobId", job.Id)
|
|
}
|
|
|
|
}
|
|
|
|
func (w *Worker) handleWorkerPing(ping *livekit.WorkerPing) {
|
|
w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Pong{
|
|
Pong: &livekit.WorkerPong{
|
|
LastTimestamp: ping.Timestamp,
|
|
Timestamp: time.Now().UnixMilli(),
|
|
},
|
|
}})
|
|
}
|
|
|
|
func (w *Worker) handleWorkerStatus(update *livekit.UpdateWorkerStatus) {
|
|
w.Logger.Debugw("worker status update", "status", update.Status, "load", update.Load)
|
|
w.UpdateStatus(update)
|
|
}
|
|
|
|
func (w *Worker) handleMigrateJob(migrate *livekit.MigrateJobRequest) {
|
|
// TODO(theomonnom): On OSS this is not implemented
|
|
// We could maybe just move a specific job to another worker
|
|
}
|