Partial support for agent dispatch management (#2872)

- Store agent dispaches independently of room agents on rtc.Room
- Serialize agent dispatches in rtc.Room
- Support for agent dispatch and job serialization in redis

The agent Job object references denormalized Room and ParticipantInfo object. When storing Jobs, this sets the Room to nil, and only stores the Participant identity field. When read back, these fields need to be set to their current value.
This commit is contained in:
Benjamin Pracht
2024-07-18 13:36:43 -07:00
committed by GitHub
parent 95f4b304ef
commit a877ba2352
17 changed files with 887 additions and 81 deletions

2
go.mod
View File

@@ -20,7 +20,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7
github.com/livekit/protocol v1.19.2-0.20240716163657-b5404e5c9aae
github.com/livekit/protocol v1.19.2-0.20240716225317-497688ff49e4
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0

4
go.sum
View File

@@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY=
github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.19.2-0.20240716163657-b5404e5c9aae h1:Pj1/pdRCCZZYuKRTxbjYtw1eClmkoYxG3nhQNcVA5no=
github.com/livekit/protocol v1.19.2-0.20240716163657-b5404e5c9aae/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4=
github.com/livekit/protocol v1.19.2-0.20240716225317-497688ff49e4 h1:8inXM3a1qo6Y17dbfFIy/SafcFcz3Ynva2Y5+0UfwK4=
github.com/livekit/protocol v1.19.2-0.20240716225317-497688ff49e4/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=

View File

@@ -42,13 +42,14 @@ const (
type Client interface {
// LaunchJob starts a room or participant job on an agent.
// it will launch a job once for each worker in each namespace
LaunchJob(ctx context.Context, desc *JobRequest)
LaunchJob(ctx context.Context, desc *JobRequest) *serverutils.IncrementalDispatcher[*livekit.Job]
Stop() error
}
type JobRequest struct {
JobType livekit.JobType
Room *livekit.Room
DispatchId string
JobType livekit.JobType
Room *livekit.Room
// only set for participant jobs
Participant *livekit.ParticipantInfo
Metadata string
@@ -111,37 +112,53 @@ func NewAgentClient(bus psrpc.MessageBus) (Client, error) {
return c, nil
}
func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) {
func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverutils.IncrementalDispatcher[*livekit.Job] {
jobTypeTopic := RoomAgentTopic
if desc.JobType == livekit.JobType_JT_PUBLISHER {
jobTypeTopic = PublisherAgentTopic
}
ret := serverutils.NewIncrementalDispatcher[*livekit.Job]()
dispatcher := c.getDispatcher(desc.AgentName, desc.JobType)
if dispatcher == nil {
logger.Infow("not dispatching agent job since no worker is available", "agentName", desc.AgentName, "jobType", desc.JobType)
return
return ret
}
var wg sync.WaitGroup
dispatcher.ForEach(func(curNs string) {
topic := GetAgentTopic(desc.AgentName, curNs)
wg.Add(1)
c.workers.Submit(func() {
defer wg.Done()
// The cached agent parameters do not provide the exact combination of available job type/agent name/namespace, so some of the JobRequest RPC may not trigger any worker
_, err := c.client.JobRequest(context.Background(), topic, jobTypeTopic, &livekit.Job{
job := &livekit.Job{
Id: utils.NewGuid(utils.AgentJobPrefix),
DispatchId: desc.DispatchId,
Type: desc.JobType,
Room: desc.Room,
Participant: desc.Participant,
Namespace: curNs,
AgentName: desc.AgentName,
Metadata: desc.Metadata,
})
if err != nil {
logger.Infow("failed to send job request", "error", err, "namespace", curNs, "jobType", desc.JobType)
}
resp, err := c.client.JobRequest(context.Background(), topic, jobTypeTopic, job)
if err != nil {
logger.Infow("failed to send job request", "error", err, "namespace", curNs, "jobType", desc.JobType, "agentName", desc.AgentName)
return
}
job.State = resp.State
ret.Add(job)
})
})
c.workers.Submit(func() {
wg.Wait()
ret.Done()
})
return ret
}
func (c *agentClient) getDispatcher(agName string, jobType livekit.JobType) *serverutils.IncrementalDispatcher[string] {

View File

@@ -211,8 +211,11 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error {
w.mu.Unlock()
}()
now := time.Now()
if job.State == nil {
job.State = &livekit.JobState{}
job.State = &livekit.JobState{
UpdatedAt: now.UnixNano(),
}
}
w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Availability{

View File

@@ -35,6 +35,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/utils/guid"
"github.com/livekit/livekit-server/pkg/agent"
"github.com/livekit/livekit-server/pkg/config"
@@ -62,6 +63,16 @@ var (
roomUpdateInterval = 5 * time.Second // frequency to update room participant counts
)
// Duplicate the service.AgentStore interface to avoid a rtc -> service -> rtc import cycle
type AgentStore interface {
StoreAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error
DeleteAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error
ListAgentDispatches(ctx context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error)
StoreAgentJob(ctx context.Context, job *livekit.Job) error
DeleteAgentJob(ctx context.Context, job *livekit.Job) error
}
type broadcastOptions struct {
skipSource bool
immediate bool
@@ -95,15 +106,17 @@ type Room struct {
protoProxy *utils.ProtoProxy[*livekit.Room]
Logger logger.Logger
config WebRTCConfig
audioConfig *config.AudioConfig
serverInfo *livekit.ServerInfo
telemetry telemetry.TelemetryService
egressLauncher EgressLauncher
trackManager *RoomTrackManager
config WebRTCConfig
audioConfig *config.AudioConfig
serverInfo *livekit.ServerInfo
telemetry telemetry.TelemetryService
egressLauncher EgressLauncher
trackManager *RoomTrackManager
agentDispatches []*livekit.AgentDispatch
// agents
agentClient agent.Client
agentStore AgentStore
// map of identity -> Participant
participants map[livekit.ParticipantIdentity]types.LocalParticipant
@@ -142,6 +155,7 @@ func NewRoom(
serverInfo *livekit.ServerInfo,
telemetry telemetry.TelemetryService,
agentClient agent.Client,
agentStore AgentStore,
egressLauncher EgressLauncher,
) *Room {
r := &Room{
@@ -157,6 +171,7 @@ func NewRoom(
telemetry: telemetry,
egressLauncher: egressLauncher,
agentClient: agentClient,
agentStore: agentStore,
trackManager: NewRoomTrackManager(),
serverInfo: serverInfo,
participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant),
@@ -182,6 +197,10 @@ func NewRoom(
}
r.protoProxy = utils.NewProtoProxy[*livekit.Room](roomUpdateInterval, r.updateProto)
r.createAgentDispatchesFromRoomAgent()
r.launchRoomAgents()
go r.audioUpdateWorker()
go r.connectionQualityWorker()
go r.changeUpdateWorker()
@@ -1403,23 +1422,46 @@ func (r *Room) simulationCleanupWorker() {
}
}
func (r *Room) launchRoomAgents() {
if r.agentClient == nil {
return
}
for _, ag := range r.agentDispatches {
go func() {
inc := r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{
JobType: livekit.JobType_JT_ROOM,
Room: r.ToProto(),
Metadata: ag.Metadata,
AgentName: ag.AgentName,
DispatchId: ag.Id,
})
inc.ForEach(func(job *livekit.Job) {
r.agentStore.StoreAgentJob(context.Background(), job)
})
}()
}
}
func (r *Room) launchPublisherAgents(p types.Participant) {
if p == nil || p.IsDependent() || r.agentClient == nil {
return
}
if r.internal == nil {
return
}
for _, ag := range r.internal.AgentDispatches {
go r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{
JobType: livekit.JobType_JT_PUBLISHER,
Room: r.ToProto(),
Participant: p.ToProto(),
Metadata: ag.Metadata,
AgentName: ag.AgentName,
})
for _, ag := range r.agentDispatches {
go func() {
inc := r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{
JobType: livekit.JobType_JT_PUBLISHER,
Room: r.ToProto(),
Participant: p.ToProto(),
Metadata: ag.Metadata,
AgentName: ag.AgentName,
DispatchId: ag.Id,
})
inc.ForEach(func(job *livekit.Job) {
r.agentStore.StoreAgentJob(context.Background(), job)
})
}()
}
}
@@ -1440,6 +1482,32 @@ func (r *Room) DebugInfo() map[string]interface{} {
return info
}
func (r *Room) createAgentDispatchesFromRoomAgent() {
now := time.Now()
if r.internal == nil {
return
}
for _, ag := range r.internal.AgentDispatches {
ad := &livekit.AgentDispatch{
Id: guid.New(guid.AgentDispatchPrefix),
AgentName: ag.AgentName,
Metadata: ag.Metadata,
Room: r.protoRoom.Name,
State: &livekit.AgentDispatchState{
CreatedAt: now.UnixNano(),
},
}
r.agentDispatches = append(r.agentDispatches, ad)
if r.agentStore != nil {
err := r.agentStore.StoreAgentDispatch(context.Background(), ad)
if err != nil {
r.Logger.Warnw("failed storing room dispatch", err)
}
}
}
}
// ------------------------------------------------------------
func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, kind livekit.DataPacket_Kind, dp *livekit.DataPacket, logger logger.Logger) {

View File

@@ -813,7 +813,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room {
Region: "testregion",
},
telemetry.NewTelemetryService(webhook.NewDefaultNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}),
nil, nil,
nil, nil, nil,
)
for i := 0; i < opts.num+opts.numHidden; i++ {
identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i))

View File

@@ -289,7 +289,7 @@ func (h *AgentHandler) HandleWorkerDeregister(w *agent.Worker) {
}
}
func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*emptypb.Empty, error) {
func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.JobRequestResponse, error) {
key := workerKey{job.AgentName, job.Namespace, job.Type}
attempted := make(map[*agent.Worker]struct{})
for {
@@ -340,7 +340,9 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty
return nil, err
}
return &emptypb.Empty{}, nil
return &rpc.JobRequestResponse{
State: job.State,
}, nil
}
}

View File

@@ -95,3 +95,13 @@ type SIPStore interface {
ListSIPDispatchRule(ctx context.Context) ([]*livekit.SIPDispatchRuleInfo, error)
DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error
}
//counterfeiter:generate . AgentStore
type AgentStore interface {
StoreAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error
DeleteAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error
ListAgentDispatches(ctx context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error)
StoreAgentJob(ctx context.Context, job *livekit.Job) error
DeleteAgentJob(ctx context.Context, job *livekit.Job) error
}

View File

@@ -20,6 +20,7 @@ import (
"time"
"github.com/thoas/go-funk"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
)
@@ -32,16 +33,21 @@ type LocalStore struct {
// map of roomName => { identity: participant }
participants map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo
agentDispatches map[livekit.RoomName]map[string]*livekit.AgentDispatch
agentJobs map[livekit.RoomName]map[string]*livekit.Job
lock sync.RWMutex
globalLock sync.Mutex
}
func NewLocalStore() *LocalStore {
return &LocalStore{
rooms: make(map[livekit.RoomName]*livekit.Room),
roomInternal: make(map[livekit.RoomName]*livekit.RoomInternal),
participants: make(map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo),
lock: sync.RWMutex{},
rooms: make(map[livekit.RoomName]*livekit.Room),
roomInternal: make(map[livekit.RoomName]*livekit.RoomInternal),
participants: make(map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo),
agentDispatches: make(map[livekit.RoomName]map[string]*livekit.AgentDispatch),
agentJobs: make(map[livekit.RoomName]map[string]*livekit.Job),
lock: sync.RWMutex{},
}
}
@@ -102,6 +108,8 @@ func (s *LocalStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName)
delete(s.participants, livekit.RoomName(room.Name))
delete(s.rooms, livekit.RoomName(room.Name))
delete(s.roomInternal, livekit.RoomName(room.Name))
delete(s.agentDispatches, livekit.RoomName(room.Name))
delete(s.agentJobs, livekit.RoomName(room.Name))
return nil
}
@@ -171,3 +179,103 @@ func (s *LocalStore) DeleteParticipant(_ context.Context, roomName livekit.RoomN
}
return nil
}
func (s *LocalStore) StoreAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error {
s.lock.Lock()
defer s.lock.Unlock()
clone := proto.Clone(dispatch).(*livekit.AgentDispatch)
if clone.State != nil {
clone.State.Jobs = nil
}
roomDispatches := s.agentDispatches[livekit.RoomName(dispatch.Room)]
if roomDispatches == nil {
roomDispatches = make(map[string]*livekit.AgentDispatch)
s.agentDispatches[livekit.RoomName(dispatch.Room)] = roomDispatches
}
roomDispatches[clone.Id] = clone
return nil
}
func (s *LocalStore) DeleteAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error {
s.lock.Lock()
defer s.lock.Unlock()
roomDispatches := s.agentDispatches[livekit.RoomName(dispatch.Room)]
if roomDispatches != nil {
delete(roomDispatches, dispatch.Id)
}
return nil
}
func (s *LocalStore) ListAgentDispatches(ctx context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error) {
s.lock.Lock()
defer s.lock.Unlock()
agentDispatches := s.agentDispatches[roomName]
if agentDispatches == nil {
return nil, nil
}
agentJobs := s.agentJobs[roomName]
var js []*livekit.Job
if agentJobs != nil {
for _, j := range agentJobs {
js = append(js, proto.Clone(j).(*livekit.Job))
}
}
var ds []*livekit.AgentDispatch
m := make(map[string]*livekit.AgentDispatch)
for _, d := range agentDispatches {
clone := proto.Clone(d).(*livekit.AgentDispatch)
m[d.Id] = clone
ds = append(ds, clone)
}
for _, j := range js {
d := m[j.DispatchId]
if d != nil {
d.State.Jobs = append(d.State.Jobs, proto.Clone(j).(*livekit.Job))
}
}
return ds, nil
}
func (s *LocalStore) StoreAgentJob(ctx context.Context, job *livekit.Job) error {
s.lock.Lock()
defer s.lock.Unlock()
clone := proto.Clone(job).(*livekit.Job)
clone.Room = nil
if clone.Participant != nil {
clone.Participant = &livekit.ParticipantInfo{
Identity: clone.Participant.Identity,
}
}
roomJobs := s.agentJobs[livekit.RoomName(job.Room.Name)]
if roomJobs == nil {
roomJobs = make(map[string]*livekit.Job)
s.agentJobs[livekit.RoomName(job.Room.Name)] = roomJobs
}
roomJobs[clone.Id] = clone
return nil
}
func (s *LocalStore) DeleteAgentJob(ctx context.Context, job *livekit.Job) error {
s.lock.Lock()
defer s.lock.Unlock()
roomJobs := s.agentJobs[livekit.RoomName(job.Room.Name)]
if roomJobs != nil {
delete(roomJobs, job.Id)
}
return nil
}

View File

@@ -30,6 +30,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/guid"
"github.com/livekit/psrpc"
"github.com/livekit/livekit-server/version"
)
@@ -58,6 +59,10 @@ const (
// RoomLockPrefix is a simple key containing a provided lock uid
RoomLockPrefix = "room_lock:"
// Agents
AgentDispatchPrefix = "agent_dispatch:"
AgentJobPrefix = "agent_job:"
maxRetries = 5
)
@@ -231,6 +236,8 @@ func (s *RedisStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName)
pp.HDel(s.ctx, RoomsKey, string(roomName))
pp.HDel(s.ctx, RoomInternalKey, string(roomName))
pp.Del(s.ctx, RoomParticipantsPrefix+string(roomName))
pp.Del(s.ctx, AgentDispatchPrefix+string(roomName))
pp.Del(s.ctx, AgentJobPrefix+string(roomName))
_, err = pp.Exec(s.ctx)
return err
@@ -822,6 +829,99 @@ func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo)
return nil
}
func (s *RedisStore) StoreAgentDispatch(_ context.Context, dispatch *livekit.AgentDispatch) error {
di := proto.Clone(dispatch).(*livekit.AgentDispatch)
// Do not store jobs with the dispatch
if di.State != nil {
di.State.Jobs = nil
}
key := AgentDispatchPrefix + string(dispatch.Room)
data, err := proto.Marshal(di)
if err != nil {
return err
}
return s.rc.HSet(s.ctx, key, di.Id, data).Err()
}
// This will not delete the jobs created by the dispatch
func (s *RedisStore) DeleteAgentDispatch(_ context.Context, dispatch *livekit.AgentDispatch) error {
key := AgentDispatchPrefix + string(dispatch.Room)
return s.rc.HDel(s.ctx, key, dispatch.Id).Err()
}
func (s *RedisStore) ListAgentDispatches(_ context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error) {
key := AgentDispatchPrefix + string(roomName)
dispatches, err := redisLoadMany[livekit.AgentDispatch](s.ctx, s, key)
if err != nil {
return nil, err
}
dMap := make(map[string]*livekit.AgentDispatch)
for _, di := range dispatches {
dMap[di.Id] = di
}
key = AgentJobPrefix + string(roomName)
jobs, err := redisLoadMany[livekit.Job](s.ctx, s, key)
if err != nil {
return nil, err
}
// Associate job to dispatch
for _, jb := range jobs {
di := dMap[jb.DispatchId]
if di == nil {
continue
}
if di.State == nil {
di.State = &livekit.AgentDispatchState{}
}
di.State.Jobs = append(di.State.Jobs, jb)
}
return dispatches, nil
}
func (s *RedisStore) StoreAgentJob(_ context.Context, job *livekit.Job) error {
if job.Room == nil {
return psrpc.NewErrorf(psrpc.InvalidArgument, "job doesn't have a valid Room field")
}
key := AgentJobPrefix + string(job.Room.Name)
jb := proto.Clone(job).(*livekit.Job)
// Do not store room with the job
jb.Room = nil
// Only store the participant identity
if jb.Participant != nil {
jb.Participant = &livekit.ParticipantInfo{
Identity: jb.Participant.Identity,
}
}
data, err := proto.Marshal(jb)
if err != nil {
return err
}
return s.rc.HSet(s.ctx, key, job.Id, data).Err()
}
func (s *RedisStore) DeleteAgentJob(_ context.Context, job *livekit.Job) error {
if job.Room == nil {
return psrpc.NewErrorf(psrpc.InvalidArgument, "job doesn't have a valid Room field")
}
key := AgentJobPrefix + string(job.Room.Name)
return s.rc.HDel(s.ctx, key, job.Id).Err()
}
func redisStoreOne(ctx context.Context, s *RedisStore, key, id string, p proto.Message) error {
if id == "" {
return errors.New("id is not set")

View File

@@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/ingress"
"github.com/livekit/protocol/livekit"
@@ -293,6 +294,92 @@ func TestIngressStore(t *testing.T) {
require.Equal(t, "", infos[0].RoomName)
}
func TestAgentStore(t *testing.T) {
ctx := context.Background()
rs := redisStore(t)
ad := &livekit.AgentDispatch{
Id: "dispatch_id",
AgentName: "agent_name",
Metadata: "metadata",
Room: "room_name",
State: &livekit.AgentDispatchState{
CreatedAt: 1,
DeletedAt: 2,
Jobs: []*livekit.Job{
&livekit.Job{
Id: "job_id",
DispatchId: "dispatch_id",
Type: livekit.JobType_JT_PUBLISHER,
Room: &livekit.Room{
Name: "room_name",
},
Participant: &livekit.ParticipantInfo{
Identity: "identity",
Name: "name",
},
Namespace: "ns",
Metadata: "metadata",
AgentName: "agent_name",
State: &livekit.JobState{
Status: livekit.JobStatus_JS_RUNNING,
StartedAt: 3,
EndedAt: 4,
Error: "error",
},
},
},
},
}
err := rs.StoreAgentDispatch(ctx, ad)
require.NoError(t, err)
rd, err := rs.ListAgentDispatches(ctx, "not_a_room")
require.NoError(t, err)
require.Equal(t, 0, len(rd))
rd, err = rs.ListAgentDispatches(ctx, "room_name")
require.NoError(t, err)
require.Equal(t, 1, len(rd))
expected := proto.Clone(ad).(*livekit.AgentDispatch)
expected.State.Jobs = nil
require.True(t, proto.Equal(expected, rd[0]))
err = rs.StoreAgentJob(ctx, ad.State.Jobs[0])
require.NoError(t, err)
rd, err = rs.ListAgentDispatches(ctx, "room_name")
require.NoError(t, err)
require.Equal(t, 1, len(rd))
expected = proto.Clone(ad).(*livekit.AgentDispatch)
expected.State.Jobs[0].Room = nil
expected.State.Jobs[0].Participant = &livekit.ParticipantInfo{
Identity: "identity",
}
require.True(t, proto.Equal(expected, rd[0]))
err = rs.DeleteAgentJob(ctx, ad.State.Jobs[0])
require.NoError(t, err)
rd, err = rs.ListAgentDispatches(ctx, "room_name")
require.NoError(t, err)
require.Equal(t, 1, len(rd))
expected = proto.Clone(ad).(*livekit.AgentDispatch)
expected.State.Jobs = nil
require.True(t, proto.Equal(expected, rd[0]))
err = rs.DeleteAgentDispatch(ctx, ad)
require.NoError(t, err)
rd, err = rs.ListAgentDispatches(ctx, "room_name")
require.NoError(t, err)
require.Equal(t, 0, len(rd))
}
func compareIngressInfo(t *testing.T, expected, v *livekit.IngressInfo) {
require.Equal(t, expected.IngressId, v.IngressId)
require.Equal(t, expected.StreamKey, v.StreamKey)

View File

@@ -74,6 +74,7 @@ type RoomManager struct {
telemetry telemetry.TelemetryService
clientConfManager clientconfiguration.ClientConfigurationManager
agentClient agent.Client
agentStore AgentStore
egressLauncher rtc.EgressLauncher
versionGenerator utils.TimedVersionGenerator
turnAuthHandler *TURNAuthHandler
@@ -97,6 +98,7 @@ func NewLocalRoomManager(
telemetry telemetry.TelemetryService,
clientConfManager clientconfiguration.ClientConfigurationManager,
agentClient agent.Client,
agentStore AgentStore,
egressLauncher rtc.EgressLauncher,
versionGenerator utils.TimedVersionGenerator,
turnAuthHandler *TURNAuthHandler,
@@ -118,6 +120,7 @@ func NewLocalRoomManager(
clientConfManager: clientConfManager,
egressLauncher: egressLauncher,
agentClient: agentClient,
agentStore: agentStore,
versionGenerator: versionGenerator,
turnAuthHandler: turnAuthHandler,
bus: bus,
@@ -557,7 +560,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
}
// construct ice servers
newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, r.config.Room, &r.config.Audio, r.serverInfo, r.telemetry, r.agentClient, r.egressLauncher)
newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, r.config.Room, &r.config.Audio, r.serverInfo, r.telemetry, r.agentClient, r.agentStore, r.egressLauncher)
roomTopic := rpc.FormatRoomTopic(roomName)
roomServer := must.Get(rpc.NewTypedRoomServer(r, r.bus))

View File

@@ -103,15 +103,6 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
defer done()
if created {
_, internal, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name), true)
if internal.AgentDispatches != nil {
err = s.launchAgents(ctx, rm, internal.AgentDispatches)
if err != nil {
return nil, err
}
}
if req.Egress != nil && req.Egress.Room != nil {
// ensure room name matches
req.Egress.Room.RoomName = req.Name
@@ -130,19 +121,6 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return rm, nil
}
func (s *RoomService) launchAgents(ctx context.Context, rm *livekit.Room, agents []*livekit.RoomAgentDispatch) error {
for _, ag := range agents {
go s.agentClient.LaunchJob(ctx, &agent.JobRequest{
JobType: livekit.JobType_JT_ROOM,
Room: rm,
Metadata: ag.Metadata,
AgentName: ag.AgentName,
})
}
return nil
}
func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (*livekit.ListRoomsResponse, error) {
AppendLogFields(ctx, "room", req.Names)
err := EnsureListPermission(ctx)

View File

@@ -517,31 +517,13 @@ func (s *RTCService) startConnection(
timeout time.Duration,
) (connectionResult, *livekit.SignalResponse, error) {
var cr connectionResult
var created bool
var err error
cr.Room, created, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName), ConfigName: GetRoomConfiguration(ctx)})
cr.Room, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName), ConfigName: GetRoomConfiguration(ctx)})
if err != nil {
return cr, nil, err
}
if created && s.agentClient != nil {
// TODO Have CreateRoom return the RoomInternal object?
_, internal, err := s.store.LoadRoom(ctx, livekit.RoomName(roomName), true)
if err != nil {
return connectionResult{}, nil, err
}
for _, ag := range internal.AgentDispatches {
go s.agentClient.LaunchJob(ctx, &agent.JobRequest{
JobType: livekit.JobType_JT_ROOM,
Room: cr.Room,
Metadata: ag.Metadata,
AgentName: ag.AgentName,
})
}
}
// this needs to be started first *before* using router functions on this node
cr.StartParticipantSignalResults, err = s.router.StartParticipantSignal(ctx, roomName, pi)
if err != nil {

View File

@@ -0,0 +1,424 @@
// Code generated by counterfeiter. DO NOT EDIT.
package servicefakes
import (
"context"
"sync"
"github.com/livekit/livekit-server/pkg/service"
"github.com/livekit/protocol/livekit"
)
type FakeAgentStore struct {
DeleteAgentDispatchStub func(context.Context, *livekit.AgentDispatch) error
deleteAgentDispatchMutex sync.RWMutex
deleteAgentDispatchArgsForCall []struct {
arg1 context.Context
arg2 *livekit.AgentDispatch
}
deleteAgentDispatchReturns struct {
result1 error
}
deleteAgentDispatchReturnsOnCall map[int]struct {
result1 error
}
DeleteAgentJobStub func(context.Context, *livekit.Job) error
deleteAgentJobMutex sync.RWMutex
deleteAgentJobArgsForCall []struct {
arg1 context.Context
arg2 *livekit.Job
}
deleteAgentJobReturns struct {
result1 error
}
deleteAgentJobReturnsOnCall map[int]struct {
result1 error
}
ListAgentDispatchesStub func(context.Context, livekit.RoomName) ([]*livekit.AgentDispatch, error)
listAgentDispatchesMutex sync.RWMutex
listAgentDispatchesArgsForCall []struct {
arg1 context.Context
arg2 livekit.RoomName
}
listAgentDispatchesReturns struct {
result1 []*livekit.AgentDispatch
result2 error
}
listAgentDispatchesReturnsOnCall map[int]struct {
result1 []*livekit.AgentDispatch
result2 error
}
StoreAgentDispatchStub func(context.Context, *livekit.AgentDispatch) error
storeAgentDispatchMutex sync.RWMutex
storeAgentDispatchArgsForCall []struct {
arg1 context.Context
arg2 *livekit.AgentDispatch
}
storeAgentDispatchReturns struct {
result1 error
}
storeAgentDispatchReturnsOnCall map[int]struct {
result1 error
}
StoreAgentJobStub func(context.Context, *livekit.Job) error
storeAgentJobMutex sync.RWMutex
storeAgentJobArgsForCall []struct {
arg1 context.Context
arg2 *livekit.Job
}
storeAgentJobReturns struct {
result1 error
}
storeAgentJobReturnsOnCall map[int]struct {
result1 error
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *FakeAgentStore) DeleteAgentDispatch(arg1 context.Context, arg2 *livekit.AgentDispatch) error {
fake.deleteAgentDispatchMutex.Lock()
ret, specificReturn := fake.deleteAgentDispatchReturnsOnCall[len(fake.deleteAgentDispatchArgsForCall)]
fake.deleteAgentDispatchArgsForCall = append(fake.deleteAgentDispatchArgsForCall, struct {
arg1 context.Context
arg2 *livekit.AgentDispatch
}{arg1, arg2})
stub := fake.DeleteAgentDispatchStub
fakeReturns := fake.deleteAgentDispatchReturns
fake.recordInvocation("DeleteAgentDispatch", []interface{}{arg1, arg2})
fake.deleteAgentDispatchMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeAgentStore) DeleteAgentDispatchCallCount() int {
fake.deleteAgentDispatchMutex.RLock()
defer fake.deleteAgentDispatchMutex.RUnlock()
return len(fake.deleteAgentDispatchArgsForCall)
}
func (fake *FakeAgentStore) DeleteAgentDispatchCalls(stub func(context.Context, *livekit.AgentDispatch) error) {
fake.deleteAgentDispatchMutex.Lock()
defer fake.deleteAgentDispatchMutex.Unlock()
fake.DeleteAgentDispatchStub = stub
}
func (fake *FakeAgentStore) DeleteAgentDispatchArgsForCall(i int) (context.Context, *livekit.AgentDispatch) {
fake.deleteAgentDispatchMutex.RLock()
defer fake.deleteAgentDispatchMutex.RUnlock()
argsForCall := fake.deleteAgentDispatchArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeAgentStore) DeleteAgentDispatchReturns(result1 error) {
fake.deleteAgentDispatchMutex.Lock()
defer fake.deleteAgentDispatchMutex.Unlock()
fake.DeleteAgentDispatchStub = nil
fake.deleteAgentDispatchReturns = struct {
result1 error
}{result1}
}
func (fake *FakeAgentStore) DeleteAgentDispatchReturnsOnCall(i int, result1 error) {
fake.deleteAgentDispatchMutex.Lock()
defer fake.deleteAgentDispatchMutex.Unlock()
fake.DeleteAgentDispatchStub = nil
if fake.deleteAgentDispatchReturnsOnCall == nil {
fake.deleteAgentDispatchReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.deleteAgentDispatchReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeAgentStore) DeleteAgentJob(arg1 context.Context, arg2 *livekit.Job) error {
fake.deleteAgentJobMutex.Lock()
ret, specificReturn := fake.deleteAgentJobReturnsOnCall[len(fake.deleteAgentJobArgsForCall)]
fake.deleteAgentJobArgsForCall = append(fake.deleteAgentJobArgsForCall, struct {
arg1 context.Context
arg2 *livekit.Job
}{arg1, arg2})
stub := fake.DeleteAgentJobStub
fakeReturns := fake.deleteAgentJobReturns
fake.recordInvocation("DeleteAgentJob", []interface{}{arg1, arg2})
fake.deleteAgentJobMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeAgentStore) DeleteAgentJobCallCount() int {
fake.deleteAgentJobMutex.RLock()
defer fake.deleteAgentJobMutex.RUnlock()
return len(fake.deleteAgentJobArgsForCall)
}
func (fake *FakeAgentStore) DeleteAgentJobCalls(stub func(context.Context, *livekit.Job) error) {
fake.deleteAgentJobMutex.Lock()
defer fake.deleteAgentJobMutex.Unlock()
fake.DeleteAgentJobStub = stub
}
func (fake *FakeAgentStore) DeleteAgentJobArgsForCall(i int) (context.Context, *livekit.Job) {
fake.deleteAgentJobMutex.RLock()
defer fake.deleteAgentJobMutex.RUnlock()
argsForCall := fake.deleteAgentJobArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeAgentStore) DeleteAgentJobReturns(result1 error) {
fake.deleteAgentJobMutex.Lock()
defer fake.deleteAgentJobMutex.Unlock()
fake.DeleteAgentJobStub = nil
fake.deleteAgentJobReturns = struct {
result1 error
}{result1}
}
func (fake *FakeAgentStore) DeleteAgentJobReturnsOnCall(i int, result1 error) {
fake.deleteAgentJobMutex.Lock()
defer fake.deleteAgentJobMutex.Unlock()
fake.DeleteAgentJobStub = nil
if fake.deleteAgentJobReturnsOnCall == nil {
fake.deleteAgentJobReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.deleteAgentJobReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeAgentStore) ListAgentDispatches(arg1 context.Context, arg2 livekit.RoomName) ([]*livekit.AgentDispatch, error) {
fake.listAgentDispatchesMutex.Lock()
ret, specificReturn := fake.listAgentDispatchesReturnsOnCall[len(fake.listAgentDispatchesArgsForCall)]
fake.listAgentDispatchesArgsForCall = append(fake.listAgentDispatchesArgsForCall, struct {
arg1 context.Context
arg2 livekit.RoomName
}{arg1, arg2})
stub := fake.ListAgentDispatchesStub
fakeReturns := fake.listAgentDispatchesReturns
fake.recordInvocation("ListAgentDispatches", []interface{}{arg1, arg2})
fake.listAgentDispatchesMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeAgentStore) ListAgentDispatchesCallCount() int {
fake.listAgentDispatchesMutex.RLock()
defer fake.listAgentDispatchesMutex.RUnlock()
return len(fake.listAgentDispatchesArgsForCall)
}
func (fake *FakeAgentStore) ListAgentDispatchesCalls(stub func(context.Context, livekit.RoomName) ([]*livekit.AgentDispatch, error)) {
fake.listAgentDispatchesMutex.Lock()
defer fake.listAgentDispatchesMutex.Unlock()
fake.ListAgentDispatchesStub = stub
}
func (fake *FakeAgentStore) ListAgentDispatchesArgsForCall(i int) (context.Context, livekit.RoomName) {
fake.listAgentDispatchesMutex.RLock()
defer fake.listAgentDispatchesMutex.RUnlock()
argsForCall := fake.listAgentDispatchesArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeAgentStore) ListAgentDispatchesReturns(result1 []*livekit.AgentDispatch, result2 error) {
fake.listAgentDispatchesMutex.Lock()
defer fake.listAgentDispatchesMutex.Unlock()
fake.ListAgentDispatchesStub = nil
fake.listAgentDispatchesReturns = struct {
result1 []*livekit.AgentDispatch
result2 error
}{result1, result2}
}
func (fake *FakeAgentStore) ListAgentDispatchesReturnsOnCall(i int, result1 []*livekit.AgentDispatch, result2 error) {
fake.listAgentDispatchesMutex.Lock()
defer fake.listAgentDispatchesMutex.Unlock()
fake.ListAgentDispatchesStub = nil
if fake.listAgentDispatchesReturnsOnCall == nil {
fake.listAgentDispatchesReturnsOnCall = make(map[int]struct {
result1 []*livekit.AgentDispatch
result2 error
})
}
fake.listAgentDispatchesReturnsOnCall[i] = struct {
result1 []*livekit.AgentDispatch
result2 error
}{result1, result2}
}
func (fake *FakeAgentStore) StoreAgentDispatch(arg1 context.Context, arg2 *livekit.AgentDispatch) error {
fake.storeAgentDispatchMutex.Lock()
ret, specificReturn := fake.storeAgentDispatchReturnsOnCall[len(fake.storeAgentDispatchArgsForCall)]
fake.storeAgentDispatchArgsForCall = append(fake.storeAgentDispatchArgsForCall, struct {
arg1 context.Context
arg2 *livekit.AgentDispatch
}{arg1, arg2})
stub := fake.StoreAgentDispatchStub
fakeReturns := fake.storeAgentDispatchReturns
fake.recordInvocation("StoreAgentDispatch", []interface{}{arg1, arg2})
fake.storeAgentDispatchMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeAgentStore) StoreAgentDispatchCallCount() int {
fake.storeAgentDispatchMutex.RLock()
defer fake.storeAgentDispatchMutex.RUnlock()
return len(fake.storeAgentDispatchArgsForCall)
}
func (fake *FakeAgentStore) StoreAgentDispatchCalls(stub func(context.Context, *livekit.AgentDispatch) error) {
fake.storeAgentDispatchMutex.Lock()
defer fake.storeAgentDispatchMutex.Unlock()
fake.StoreAgentDispatchStub = stub
}
func (fake *FakeAgentStore) StoreAgentDispatchArgsForCall(i int) (context.Context, *livekit.AgentDispatch) {
fake.storeAgentDispatchMutex.RLock()
defer fake.storeAgentDispatchMutex.RUnlock()
argsForCall := fake.storeAgentDispatchArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeAgentStore) StoreAgentDispatchReturns(result1 error) {
fake.storeAgentDispatchMutex.Lock()
defer fake.storeAgentDispatchMutex.Unlock()
fake.StoreAgentDispatchStub = nil
fake.storeAgentDispatchReturns = struct {
result1 error
}{result1}
}
func (fake *FakeAgentStore) StoreAgentDispatchReturnsOnCall(i int, result1 error) {
fake.storeAgentDispatchMutex.Lock()
defer fake.storeAgentDispatchMutex.Unlock()
fake.StoreAgentDispatchStub = nil
if fake.storeAgentDispatchReturnsOnCall == nil {
fake.storeAgentDispatchReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.storeAgentDispatchReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeAgentStore) StoreAgentJob(arg1 context.Context, arg2 *livekit.Job) error {
fake.storeAgentJobMutex.Lock()
ret, specificReturn := fake.storeAgentJobReturnsOnCall[len(fake.storeAgentJobArgsForCall)]
fake.storeAgentJobArgsForCall = append(fake.storeAgentJobArgsForCall, struct {
arg1 context.Context
arg2 *livekit.Job
}{arg1, arg2})
stub := fake.StoreAgentJobStub
fakeReturns := fake.storeAgentJobReturns
fake.recordInvocation("StoreAgentJob", []interface{}{arg1, arg2})
fake.storeAgentJobMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeAgentStore) StoreAgentJobCallCount() int {
fake.storeAgentJobMutex.RLock()
defer fake.storeAgentJobMutex.RUnlock()
return len(fake.storeAgentJobArgsForCall)
}
func (fake *FakeAgentStore) StoreAgentJobCalls(stub func(context.Context, *livekit.Job) error) {
fake.storeAgentJobMutex.Lock()
defer fake.storeAgentJobMutex.Unlock()
fake.StoreAgentJobStub = stub
}
func (fake *FakeAgentStore) StoreAgentJobArgsForCall(i int) (context.Context, *livekit.Job) {
fake.storeAgentJobMutex.RLock()
defer fake.storeAgentJobMutex.RUnlock()
argsForCall := fake.storeAgentJobArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeAgentStore) StoreAgentJobReturns(result1 error) {
fake.storeAgentJobMutex.Lock()
defer fake.storeAgentJobMutex.Unlock()
fake.StoreAgentJobStub = nil
fake.storeAgentJobReturns = struct {
result1 error
}{result1}
}
func (fake *FakeAgentStore) StoreAgentJobReturnsOnCall(i int, result1 error) {
fake.storeAgentJobMutex.Lock()
defer fake.storeAgentJobMutex.Unlock()
fake.StoreAgentJobStub = nil
if fake.storeAgentJobReturnsOnCall == nil {
fake.storeAgentJobReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.storeAgentJobReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeAgentStore) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.deleteAgentDispatchMutex.RLock()
defer fake.deleteAgentDispatchMutex.RUnlock()
fake.deleteAgentJobMutex.RLock()
defer fake.deleteAgentJobMutex.RUnlock()
fake.listAgentDispatchesMutex.RLock()
defer fake.listAgentDispatchesMutex.RUnlock()
fake.storeAgentDispatchMutex.RLock()
defer fake.storeAgentDispatchMutex.RUnlock()
fake.storeAgentJobMutex.RLock()
defer fake.storeAgentJobMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *FakeAgentStore) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
var _ service.AgentStore = new(FakeAgentStore)

View File

@@ -80,6 +80,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
NewRTCService,
NewAgentService,
agent.NewAgentClient,
getAgentStore,
getSignalRelayConfig,
NewDefaultSignalServer,
routing.NewSignalClient,
@@ -200,6 +201,17 @@ func getIngressStore(s ObjectStore) IngressStore {
}
}
func getAgentStore(s ObjectStore) AgentStore {
switch store := s.(type) {
case *RedisStore:
return store
case *LocalStore:
return store
default:
return nil
}
}
func getIngressConfig(conf *config.Config) *config.IngressConfig {
return &conf.Ingress
}

View File

@@ -119,10 +119,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
return nil, err
}
clientConfigurationManager := createClientConfiguration()
agentStore := getAgentStore(objectStore)
timedVersionGenerator := utils.NewDefaultTimedVersionGenerator()
turnAuthHandler := NewTURNAuthHandler(keyProvider)
forwardStats := createForwardStats(conf)
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, client, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus, forwardStats)
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, client, agentStore, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus, forwardStats)
if err != nil {
return nil, err
}
@@ -251,6 +252,17 @@ func getIngressStore(s ObjectStore) IngressStore {
}
}
func getAgentStore(s ObjectStore) AgentStore {
switch store := s.(type) {
case *RedisStore:
return store
case *LocalStore:
return store
default:
return nil
}
}
func getIngressConfig(conf *config.Config) *config.IngressConfig {
return &conf.Ingress
}