From a877ba2352945fa1d350d78ec28b49a80fa574d8 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 18 Jul 2024 13:36:43 -0700 Subject: [PATCH] Partial support for agent dispatch management (#2872) - Store agent dispaches independently of room agents on rtc.Room - Serialize agent dispatches in rtc.Room - Support for agent dispatch and job serialization in redis The agent Job object references denormalized Room and ParticipantInfo object. When storing Jobs, this sets the Room to nil, and only stores the Participant identity field. When read back, these fields need to be set to their current value. --- go.mod | 2 +- go.sum | 4 +- pkg/agent/client.go | 35 +- pkg/agent/worker.go | 5 +- pkg/rtc/room.go | 104 ++++- pkg/rtc/room_test.go | 2 +- pkg/service/agentservice.go | 6 +- pkg/service/interfaces.go | 10 + pkg/service/localstore.go | 116 ++++- pkg/service/redisstore.go | 100 +++++ pkg/service/redisstore_test.go | 87 ++++ pkg/service/roommanager.go | 5 +- pkg/service/roomservice.go | 22 - pkg/service/rtcservice.go | 20 +- pkg/service/servicefakes/fake_agent_store.go | 424 +++++++++++++++++++ pkg/service/wire.go | 12 + pkg/service/wire_gen.go | 14 +- 17 files changed, 887 insertions(+), 81 deletions(-) create mode 100644 pkg/service/servicefakes/fake_agent_store.go diff --git a/go.mod b/go.mod index 7af9f3998..55eb8d853 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 - github.com/livekit/protocol v1.19.2-0.20240716163657-b5404e5c9aae + github.com/livekit/protocol v1.19.2-0.20240716225317-497688ff49e4 github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 64e5e45f0..c599e440c 100644 --- a/go.sum +++ b/go.sum @@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY= github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.19.2-0.20240716163657-b5404e5c9aae h1:Pj1/pdRCCZZYuKRTxbjYtw1eClmkoYxG3nhQNcVA5no= -github.com/livekit/protocol v1.19.2-0.20240716163657-b5404e5c9aae/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= +github.com/livekit/protocol v1.19.2-0.20240716225317-497688ff49e4 h1:8inXM3a1qo6Y17dbfFIy/SafcFcz3Ynva2Y5+0UfwK4= +github.com/livekit/protocol v1.19.2-0.20240716225317-497688ff49e4/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/agent/client.go b/pkg/agent/client.go index f5051527d..13d4aba9b 100644 --- a/pkg/agent/client.go +++ b/pkg/agent/client.go @@ -42,13 +42,14 @@ const ( type Client interface { // LaunchJob starts a room or participant job on an agent. // it will launch a job once for each worker in each namespace - LaunchJob(ctx context.Context, desc *JobRequest) + LaunchJob(ctx context.Context, desc *JobRequest) *serverutils.IncrementalDispatcher[*livekit.Job] Stop() error } type JobRequest struct { - JobType livekit.JobType - Room *livekit.Room + DispatchId string + JobType livekit.JobType + Room *livekit.Room // only set for participant jobs Participant *livekit.ParticipantInfo Metadata string @@ -111,37 +112,53 @@ func NewAgentClient(bus psrpc.MessageBus) (Client, error) { return c, nil } -func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) { +func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverutils.IncrementalDispatcher[*livekit.Job] { jobTypeTopic := RoomAgentTopic if desc.JobType == livekit.JobType_JT_PUBLISHER { jobTypeTopic = PublisherAgentTopic } + ret := serverutils.NewIncrementalDispatcher[*livekit.Job]() dispatcher := c.getDispatcher(desc.AgentName, desc.JobType) if dispatcher == nil { logger.Infow("not dispatching agent job since no worker is available", "agentName", desc.AgentName, "jobType", desc.JobType) - return + return ret } + var wg sync.WaitGroup dispatcher.ForEach(func(curNs string) { topic := GetAgentTopic(desc.AgentName, curNs) + + wg.Add(1) c.workers.Submit(func() { + defer wg.Done() // The cached agent parameters do not provide the exact combination of available job type/agent name/namespace, so some of the JobRequest RPC may not trigger any worker - _, err := c.client.JobRequest(context.Background(), topic, jobTypeTopic, &livekit.Job{ + job := &livekit.Job{ Id: utils.NewGuid(utils.AgentJobPrefix), + DispatchId: desc.DispatchId, Type: desc.JobType, Room: desc.Room, Participant: desc.Participant, Namespace: curNs, AgentName: desc.AgentName, Metadata: desc.Metadata, - }) - if err != nil { - logger.Infow("failed to send job request", "error", err, "namespace", curNs, "jobType", desc.JobType) } + resp, err := c.client.JobRequest(context.Background(), topic, jobTypeTopic, job) + if err != nil { + logger.Infow("failed to send job request", "error", err, "namespace", curNs, "jobType", desc.JobType, "agentName", desc.AgentName) + return + } + job.State = resp.State + ret.Add(job) }) }) + c.workers.Submit(func() { + wg.Wait() + ret.Done() + }) + + return ret } func (c *agentClient) getDispatcher(agName string, jobType livekit.JobType) *serverutils.IncrementalDispatcher[string] { diff --git a/pkg/agent/worker.go b/pkg/agent/worker.go index d09229aac..143feee79 100644 --- a/pkg/agent/worker.go +++ b/pkg/agent/worker.go @@ -211,8 +211,11 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error { w.mu.Unlock() }() + now := time.Now() if job.State == nil { - job.State = &livekit.JobState{} + job.State = &livekit.JobState{ + UpdatedAt: now.UnixNano(), + } } w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Availability{ diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 3671f1b0a..b0c515df0 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -35,6 +35,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" + "github.com/livekit/protocol/utils/guid" "github.com/livekit/livekit-server/pkg/agent" "github.com/livekit/livekit-server/pkg/config" @@ -62,6 +63,16 @@ var ( roomUpdateInterval = 5 * time.Second // frequency to update room participant counts ) +// Duplicate the service.AgentStore interface to avoid a rtc -> service -> rtc import cycle +type AgentStore interface { + StoreAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error + DeleteAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error + ListAgentDispatches(ctx context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error) + + StoreAgentJob(ctx context.Context, job *livekit.Job) error + DeleteAgentJob(ctx context.Context, job *livekit.Job) error +} + type broadcastOptions struct { skipSource bool immediate bool @@ -95,15 +106,17 @@ type Room struct { protoProxy *utils.ProtoProxy[*livekit.Room] Logger logger.Logger - config WebRTCConfig - audioConfig *config.AudioConfig - serverInfo *livekit.ServerInfo - telemetry telemetry.TelemetryService - egressLauncher EgressLauncher - trackManager *RoomTrackManager + config WebRTCConfig + audioConfig *config.AudioConfig + serverInfo *livekit.ServerInfo + telemetry telemetry.TelemetryService + egressLauncher EgressLauncher + trackManager *RoomTrackManager + agentDispatches []*livekit.AgentDispatch // agents agentClient agent.Client + agentStore AgentStore // map of identity -> Participant participants map[livekit.ParticipantIdentity]types.LocalParticipant @@ -142,6 +155,7 @@ func NewRoom( serverInfo *livekit.ServerInfo, telemetry telemetry.TelemetryService, agentClient agent.Client, + agentStore AgentStore, egressLauncher EgressLauncher, ) *Room { r := &Room{ @@ -157,6 +171,7 @@ func NewRoom( telemetry: telemetry, egressLauncher: egressLauncher, agentClient: agentClient, + agentStore: agentStore, trackManager: NewRoomTrackManager(), serverInfo: serverInfo, participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant), @@ -182,6 +197,10 @@ func NewRoom( } r.protoProxy = utils.NewProtoProxy[*livekit.Room](roomUpdateInterval, r.updateProto) + r.createAgentDispatchesFromRoomAgent() + + r.launchRoomAgents() + go r.audioUpdateWorker() go r.connectionQualityWorker() go r.changeUpdateWorker() @@ -1403,23 +1422,46 @@ func (r *Room) simulationCleanupWorker() { } } +func (r *Room) launchRoomAgents() { + if r.agentClient == nil { + return + } + + for _, ag := range r.agentDispatches { + go func() { + inc := r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{ + JobType: livekit.JobType_JT_ROOM, + Room: r.ToProto(), + Metadata: ag.Metadata, + AgentName: ag.AgentName, + DispatchId: ag.Id, + }) + inc.ForEach(func(job *livekit.Job) { + r.agentStore.StoreAgentJob(context.Background(), job) + }) + }() + } +} + func (r *Room) launchPublisherAgents(p types.Participant) { if p == nil || p.IsDependent() || r.agentClient == nil { return } - if r.internal == nil { - return - } - - for _, ag := range r.internal.AgentDispatches { - go r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{ - JobType: livekit.JobType_JT_PUBLISHER, - Room: r.ToProto(), - Participant: p.ToProto(), - Metadata: ag.Metadata, - AgentName: ag.AgentName, - }) + for _, ag := range r.agentDispatches { + go func() { + inc := r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{ + JobType: livekit.JobType_JT_PUBLISHER, + Room: r.ToProto(), + Participant: p.ToProto(), + Metadata: ag.Metadata, + AgentName: ag.AgentName, + DispatchId: ag.Id, + }) + inc.ForEach(func(job *livekit.Job) { + r.agentStore.StoreAgentJob(context.Background(), job) + }) + }() } } @@ -1440,6 +1482,32 @@ func (r *Room) DebugInfo() map[string]interface{} { return info } +func (r *Room) createAgentDispatchesFromRoomAgent() { + now := time.Now() + if r.internal == nil { + return + } + + for _, ag := range r.internal.AgentDispatches { + ad := &livekit.AgentDispatch{ + Id: guid.New(guid.AgentDispatchPrefix), + AgentName: ag.AgentName, + Metadata: ag.Metadata, + Room: r.protoRoom.Name, + State: &livekit.AgentDispatchState{ + CreatedAt: now.UnixNano(), + }, + } + r.agentDispatches = append(r.agentDispatches, ad) + if r.agentStore != nil { + err := r.agentStore.StoreAgentDispatch(context.Background(), ad) + if err != nil { + r.Logger.Warnw("failed storing room dispatch", err) + } + } + } +} + // ------------------------------------------------------------ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, kind livekit.DataPacket_Kind, dp *livekit.DataPacket, logger logger.Logger) { diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 40a8b30ad..a85f361c1 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -813,7 +813,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room { Region: "testregion", }, telemetry.NewTelemetryService(webhook.NewDefaultNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}), - nil, nil, + nil, nil, nil, ) for i := 0; i < opts.num+opts.numHidden; i++ { identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i)) diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index 826680fbc..734609354 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -289,7 +289,7 @@ func (h *AgentHandler) HandleWorkerDeregister(w *agent.Worker) { } } -func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*emptypb.Empty, error) { +func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.JobRequestResponse, error) { key := workerKey{job.AgentName, job.Namespace, job.Type} attempted := make(map[*agent.Worker]struct{}) for { @@ -340,7 +340,9 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty return nil, err } - return &emptypb.Empty{}, nil + return &rpc.JobRequestResponse{ + State: job.State, + }, nil } } diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 9ebce8271..c55998fbc 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -95,3 +95,13 @@ type SIPStore interface { ListSIPDispatchRule(ctx context.Context) ([]*livekit.SIPDispatchRuleInfo, error) DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error } + +//counterfeiter:generate . AgentStore +type AgentStore interface { + StoreAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error + DeleteAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error + ListAgentDispatches(ctx context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error) + + StoreAgentJob(ctx context.Context, job *livekit.Job) error + DeleteAgentJob(ctx context.Context, job *livekit.Job) error +} diff --git a/pkg/service/localstore.go b/pkg/service/localstore.go index a651c24a0..b1b498509 100644 --- a/pkg/service/localstore.go +++ b/pkg/service/localstore.go @@ -20,6 +20,7 @@ import ( "time" "github.com/thoas/go-funk" + "google.golang.org/protobuf/proto" "github.com/livekit/protocol/livekit" ) @@ -32,16 +33,21 @@ type LocalStore struct { // map of roomName => { identity: participant } participants map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo + agentDispatches map[livekit.RoomName]map[string]*livekit.AgentDispatch + agentJobs map[livekit.RoomName]map[string]*livekit.Job + lock sync.RWMutex globalLock sync.Mutex } func NewLocalStore() *LocalStore { return &LocalStore{ - rooms: make(map[livekit.RoomName]*livekit.Room), - roomInternal: make(map[livekit.RoomName]*livekit.RoomInternal), - participants: make(map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo), - lock: sync.RWMutex{}, + rooms: make(map[livekit.RoomName]*livekit.Room), + roomInternal: make(map[livekit.RoomName]*livekit.RoomInternal), + participants: make(map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo), + agentDispatches: make(map[livekit.RoomName]map[string]*livekit.AgentDispatch), + agentJobs: make(map[livekit.RoomName]map[string]*livekit.Job), + lock: sync.RWMutex{}, } } @@ -102,6 +108,8 @@ func (s *LocalStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) delete(s.participants, livekit.RoomName(room.Name)) delete(s.rooms, livekit.RoomName(room.Name)) delete(s.roomInternal, livekit.RoomName(room.Name)) + delete(s.agentDispatches, livekit.RoomName(room.Name)) + delete(s.agentJobs, livekit.RoomName(room.Name)) return nil } @@ -171,3 +179,103 @@ func (s *LocalStore) DeleteParticipant(_ context.Context, roomName livekit.RoomN } return nil } + +func (s *LocalStore) StoreAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error { + s.lock.Lock() + defer s.lock.Unlock() + + clone := proto.Clone(dispatch).(*livekit.AgentDispatch) + if clone.State != nil { + clone.State.Jobs = nil + } + + roomDispatches := s.agentDispatches[livekit.RoomName(dispatch.Room)] + if roomDispatches == nil { + roomDispatches = make(map[string]*livekit.AgentDispatch) + s.agentDispatches[livekit.RoomName(dispatch.Room)] = roomDispatches + } + + roomDispatches[clone.Id] = clone + return nil +} + +func (s *LocalStore) DeleteAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error { + s.lock.Lock() + defer s.lock.Unlock() + + roomDispatches := s.agentDispatches[livekit.RoomName(dispatch.Room)] + if roomDispatches != nil { + delete(roomDispatches, dispatch.Id) + } + + return nil +} + +func (s *LocalStore) ListAgentDispatches(ctx context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error) { + s.lock.Lock() + defer s.lock.Unlock() + + agentDispatches := s.agentDispatches[roomName] + if agentDispatches == nil { + return nil, nil + } + agentJobs := s.agentJobs[roomName] + + var js []*livekit.Job + if agentJobs != nil { + for _, j := range agentJobs { + js = append(js, proto.Clone(j).(*livekit.Job)) + } + } + var ds []*livekit.AgentDispatch + + m := make(map[string]*livekit.AgentDispatch) + for _, d := range agentDispatches { + clone := proto.Clone(d).(*livekit.AgentDispatch) + m[d.Id] = clone + ds = append(ds, clone) + } + + for _, j := range js { + d := m[j.DispatchId] + if d != nil { + d.State.Jobs = append(d.State.Jobs, proto.Clone(j).(*livekit.Job)) + } + } + + return ds, nil +} + +func (s *LocalStore) StoreAgentJob(ctx context.Context, job *livekit.Job) error { + s.lock.Lock() + defer s.lock.Unlock() + + clone := proto.Clone(job).(*livekit.Job) + clone.Room = nil + if clone.Participant != nil { + clone.Participant = &livekit.ParticipantInfo{ + Identity: clone.Participant.Identity, + } + } + + roomJobs := s.agentJobs[livekit.RoomName(job.Room.Name)] + if roomJobs == nil { + roomJobs = make(map[string]*livekit.Job) + s.agentJobs[livekit.RoomName(job.Room.Name)] = roomJobs + } + roomJobs[clone.Id] = clone + + return nil +} + +func (s *LocalStore) DeleteAgentJob(ctx context.Context, job *livekit.Job) error { + s.lock.Lock() + defer s.lock.Unlock() + + roomJobs := s.agentJobs[livekit.RoomName(job.Room.Name)] + if roomJobs != nil { + delete(roomJobs, job.Id) + } + + return nil +} diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 4da0ab360..8326e36f1 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -30,6 +30,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils/guid" + "github.com/livekit/psrpc" "github.com/livekit/livekit-server/version" ) @@ -58,6 +59,10 @@ const ( // RoomLockPrefix is a simple key containing a provided lock uid RoomLockPrefix = "room_lock:" + // Agents + AgentDispatchPrefix = "agent_dispatch:" + AgentJobPrefix = "agent_job:" + maxRetries = 5 ) @@ -231,6 +236,8 @@ func (s *RedisStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) pp.HDel(s.ctx, RoomsKey, string(roomName)) pp.HDel(s.ctx, RoomInternalKey, string(roomName)) pp.Del(s.ctx, RoomParticipantsPrefix+string(roomName)) + pp.Del(s.ctx, AgentDispatchPrefix+string(roomName)) + pp.Del(s.ctx, AgentJobPrefix+string(roomName)) _, err = pp.Exec(s.ctx) return err @@ -822,6 +829,99 @@ func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) return nil } +func (s *RedisStore) StoreAgentDispatch(_ context.Context, dispatch *livekit.AgentDispatch) error { + di := proto.Clone(dispatch).(*livekit.AgentDispatch) + + // Do not store jobs with the dispatch + if di.State != nil { + di.State.Jobs = nil + } + + key := AgentDispatchPrefix + string(dispatch.Room) + + data, err := proto.Marshal(di) + if err != nil { + return err + } + + return s.rc.HSet(s.ctx, key, di.Id, data).Err() +} + +// This will not delete the jobs created by the dispatch +func (s *RedisStore) DeleteAgentDispatch(_ context.Context, dispatch *livekit.AgentDispatch) error { + key := AgentDispatchPrefix + string(dispatch.Room) + return s.rc.HDel(s.ctx, key, dispatch.Id).Err() +} + +func (s *RedisStore) ListAgentDispatches(_ context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error) { + key := AgentDispatchPrefix + string(roomName) + dispatches, err := redisLoadMany[livekit.AgentDispatch](s.ctx, s, key) + if err != nil { + return nil, err + } + + dMap := make(map[string]*livekit.AgentDispatch) + for _, di := range dispatches { + dMap[di.Id] = di + } + + key = AgentJobPrefix + string(roomName) + jobs, err := redisLoadMany[livekit.Job](s.ctx, s, key) + if err != nil { + return nil, err + } + + // Associate job to dispatch + for _, jb := range jobs { + di := dMap[jb.DispatchId] + if di == nil { + continue + } + if di.State == nil { + di.State = &livekit.AgentDispatchState{} + } + di.State.Jobs = append(di.State.Jobs, jb) + } + + return dispatches, nil +} + +func (s *RedisStore) StoreAgentJob(_ context.Context, job *livekit.Job) error { + if job.Room == nil { + return psrpc.NewErrorf(psrpc.InvalidArgument, "job doesn't have a valid Room field") + } + + key := AgentJobPrefix + string(job.Room.Name) + + jb := proto.Clone(job).(*livekit.Job) + + // Do not store room with the job + jb.Room = nil + + // Only store the participant identity + if jb.Participant != nil { + jb.Participant = &livekit.ParticipantInfo{ + Identity: jb.Participant.Identity, + } + } + + data, err := proto.Marshal(jb) + if err != nil { + return err + } + + return s.rc.HSet(s.ctx, key, job.Id, data).Err() +} + +func (s *RedisStore) DeleteAgentJob(_ context.Context, job *livekit.Job) error { + if job.Room == nil { + return psrpc.NewErrorf(psrpc.InvalidArgument, "job doesn't have a valid Room field") + } + + key := AgentJobPrefix + string(job.Room.Name) + return s.rc.HDel(s.ctx, key, job.Id).Err() +} + func redisStoreOne(ctx context.Context, s *RedisStore, key, id string, p proto.Message) error { if id == "" { return errors.New("id is not set") diff --git a/pkg/service/redisstore_test.go b/pkg/service/redisstore_test.go index d62db1025..13a6905e6 100644 --- a/pkg/service/redisstore_test.go +++ b/pkg/service/redisstore_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" + "google.golang.org/protobuf/proto" "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" @@ -293,6 +294,92 @@ func TestIngressStore(t *testing.T) { require.Equal(t, "", infos[0].RoomName) } +func TestAgentStore(t *testing.T) { + ctx := context.Background() + rs := redisStore(t) + + ad := &livekit.AgentDispatch{ + Id: "dispatch_id", + AgentName: "agent_name", + Metadata: "metadata", + Room: "room_name", + State: &livekit.AgentDispatchState{ + CreatedAt: 1, + DeletedAt: 2, + Jobs: []*livekit.Job{ + &livekit.Job{ + Id: "job_id", + DispatchId: "dispatch_id", + Type: livekit.JobType_JT_PUBLISHER, + Room: &livekit.Room{ + Name: "room_name", + }, + Participant: &livekit.ParticipantInfo{ + Identity: "identity", + Name: "name", + }, + Namespace: "ns", + Metadata: "metadata", + AgentName: "agent_name", + State: &livekit.JobState{ + Status: livekit.JobStatus_JS_RUNNING, + StartedAt: 3, + EndedAt: 4, + Error: "error", + }, + }, + }, + }, + } + + err := rs.StoreAgentDispatch(ctx, ad) + require.NoError(t, err) + + rd, err := rs.ListAgentDispatches(ctx, "not_a_room") + require.NoError(t, err) + require.Equal(t, 0, len(rd)) + + rd, err = rs.ListAgentDispatches(ctx, "room_name") + require.NoError(t, err) + require.Equal(t, 1, len(rd)) + + expected := proto.Clone(ad).(*livekit.AgentDispatch) + expected.State.Jobs = nil + require.True(t, proto.Equal(expected, rd[0])) + + err = rs.StoreAgentJob(ctx, ad.State.Jobs[0]) + require.NoError(t, err) + + rd, err = rs.ListAgentDispatches(ctx, "room_name") + require.NoError(t, err) + require.Equal(t, 1, len(rd)) + + expected = proto.Clone(ad).(*livekit.AgentDispatch) + expected.State.Jobs[0].Room = nil + expected.State.Jobs[0].Participant = &livekit.ParticipantInfo{ + Identity: "identity", + } + require.True(t, proto.Equal(expected, rd[0])) + + err = rs.DeleteAgentJob(ctx, ad.State.Jobs[0]) + require.NoError(t, err) + + rd, err = rs.ListAgentDispatches(ctx, "room_name") + require.NoError(t, err) + require.Equal(t, 1, len(rd)) + + expected = proto.Clone(ad).(*livekit.AgentDispatch) + expected.State.Jobs = nil + require.True(t, proto.Equal(expected, rd[0])) + + err = rs.DeleteAgentDispatch(ctx, ad) + require.NoError(t, err) + + rd, err = rs.ListAgentDispatches(ctx, "room_name") + require.NoError(t, err) + require.Equal(t, 0, len(rd)) +} + func compareIngressInfo(t *testing.T, expected, v *livekit.IngressInfo) { require.Equal(t, expected.IngressId, v.IngressId) require.Equal(t, expected.StreamKey, v.StreamKey) diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index c2451e611..06f783a43 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -74,6 +74,7 @@ type RoomManager struct { telemetry telemetry.TelemetryService clientConfManager clientconfiguration.ClientConfigurationManager agentClient agent.Client + agentStore AgentStore egressLauncher rtc.EgressLauncher versionGenerator utils.TimedVersionGenerator turnAuthHandler *TURNAuthHandler @@ -97,6 +98,7 @@ func NewLocalRoomManager( telemetry telemetry.TelemetryService, clientConfManager clientconfiguration.ClientConfigurationManager, agentClient agent.Client, + agentStore AgentStore, egressLauncher rtc.EgressLauncher, versionGenerator utils.TimedVersionGenerator, turnAuthHandler *TURNAuthHandler, @@ -118,6 +120,7 @@ func NewLocalRoomManager( clientConfManager: clientConfManager, egressLauncher: egressLauncher, agentClient: agentClient, + agentStore: agentStore, versionGenerator: versionGenerator, turnAuthHandler: turnAuthHandler, bus: bus, @@ -557,7 +560,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room } // construct ice servers - newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, r.config.Room, &r.config.Audio, r.serverInfo, r.telemetry, r.agentClient, r.egressLauncher) + newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, r.config.Room, &r.config.Audio, r.serverInfo, r.telemetry, r.agentClient, r.agentStore, r.egressLauncher) roomTopic := rpc.FormatRoomTopic(roomName) roomServer := must.Get(rpc.NewTypedRoomServer(r, r.bus)) diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 9aacef06f..f4b4b8116 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -103,15 +103,6 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq defer done() if created { - _, internal, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name), true) - - if internal.AgentDispatches != nil { - err = s.launchAgents(ctx, rm, internal.AgentDispatches) - if err != nil { - return nil, err - } - } - if req.Egress != nil && req.Egress.Room != nil { // ensure room name matches req.Egress.Room.RoomName = req.Name @@ -130,19 +121,6 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return rm, nil } -func (s *RoomService) launchAgents(ctx context.Context, rm *livekit.Room, agents []*livekit.RoomAgentDispatch) error { - for _, ag := range agents { - go s.agentClient.LaunchJob(ctx, &agent.JobRequest{ - JobType: livekit.JobType_JT_ROOM, - Room: rm, - Metadata: ag.Metadata, - AgentName: ag.AgentName, - }) - } - - return nil -} - func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (*livekit.ListRoomsResponse, error) { AppendLogFields(ctx, "room", req.Names) err := EnsureListPermission(ctx) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 15e0af0ed..aaba72990 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -517,31 +517,13 @@ func (s *RTCService) startConnection( timeout time.Duration, ) (connectionResult, *livekit.SignalResponse, error) { var cr connectionResult - var created bool var err error - cr.Room, created, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName), ConfigName: GetRoomConfiguration(ctx)}) + cr.Room, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName), ConfigName: GetRoomConfiguration(ctx)}) if err != nil { return cr, nil, err } - if created && s.agentClient != nil { - // TODO Have CreateRoom return the RoomInternal object? - _, internal, err := s.store.LoadRoom(ctx, livekit.RoomName(roomName), true) - if err != nil { - return connectionResult{}, nil, err - } - - for _, ag := range internal.AgentDispatches { - go s.agentClient.LaunchJob(ctx, &agent.JobRequest{ - JobType: livekit.JobType_JT_ROOM, - Room: cr.Room, - Metadata: ag.Metadata, - AgentName: ag.AgentName, - }) - } - } - // this needs to be started first *before* using router functions on this node cr.StartParticipantSignalResults, err = s.router.StartParticipantSignal(ctx, roomName, pi) if err != nil { diff --git a/pkg/service/servicefakes/fake_agent_store.go b/pkg/service/servicefakes/fake_agent_store.go new file mode 100644 index 000000000..71d43f567 --- /dev/null +++ b/pkg/service/servicefakes/fake_agent_store.go @@ -0,0 +1,424 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package servicefakes + +import ( + "context" + "sync" + + "github.com/livekit/livekit-server/pkg/service" + "github.com/livekit/protocol/livekit" +) + +type FakeAgentStore struct { + DeleteAgentDispatchStub func(context.Context, *livekit.AgentDispatch) error + deleteAgentDispatchMutex sync.RWMutex + deleteAgentDispatchArgsForCall []struct { + arg1 context.Context + arg2 *livekit.AgentDispatch + } + deleteAgentDispatchReturns struct { + result1 error + } + deleteAgentDispatchReturnsOnCall map[int]struct { + result1 error + } + DeleteAgentJobStub func(context.Context, *livekit.Job) error + deleteAgentJobMutex sync.RWMutex + deleteAgentJobArgsForCall []struct { + arg1 context.Context + arg2 *livekit.Job + } + deleteAgentJobReturns struct { + result1 error + } + deleteAgentJobReturnsOnCall map[int]struct { + result1 error + } + ListAgentDispatchesStub func(context.Context, livekit.RoomName) ([]*livekit.AgentDispatch, error) + listAgentDispatchesMutex sync.RWMutex + listAgentDispatchesArgsForCall []struct { + arg1 context.Context + arg2 livekit.RoomName + } + listAgentDispatchesReturns struct { + result1 []*livekit.AgentDispatch + result2 error + } + listAgentDispatchesReturnsOnCall map[int]struct { + result1 []*livekit.AgentDispatch + result2 error + } + StoreAgentDispatchStub func(context.Context, *livekit.AgentDispatch) error + storeAgentDispatchMutex sync.RWMutex + storeAgentDispatchArgsForCall []struct { + arg1 context.Context + arg2 *livekit.AgentDispatch + } + storeAgentDispatchReturns struct { + result1 error + } + storeAgentDispatchReturnsOnCall map[int]struct { + result1 error + } + StoreAgentJobStub func(context.Context, *livekit.Job) error + storeAgentJobMutex sync.RWMutex + storeAgentJobArgsForCall []struct { + arg1 context.Context + arg2 *livekit.Job + } + storeAgentJobReturns struct { + result1 error + } + storeAgentJobReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeAgentStore) DeleteAgentDispatch(arg1 context.Context, arg2 *livekit.AgentDispatch) error { + fake.deleteAgentDispatchMutex.Lock() + ret, specificReturn := fake.deleteAgentDispatchReturnsOnCall[len(fake.deleteAgentDispatchArgsForCall)] + fake.deleteAgentDispatchArgsForCall = append(fake.deleteAgentDispatchArgsForCall, struct { + arg1 context.Context + arg2 *livekit.AgentDispatch + }{arg1, arg2}) + stub := fake.DeleteAgentDispatchStub + fakeReturns := fake.deleteAgentDispatchReturns + fake.recordInvocation("DeleteAgentDispatch", []interface{}{arg1, arg2}) + fake.deleteAgentDispatchMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeAgentStore) DeleteAgentDispatchCallCount() int { + fake.deleteAgentDispatchMutex.RLock() + defer fake.deleteAgentDispatchMutex.RUnlock() + return len(fake.deleteAgentDispatchArgsForCall) +} + +func (fake *FakeAgentStore) DeleteAgentDispatchCalls(stub func(context.Context, *livekit.AgentDispatch) error) { + fake.deleteAgentDispatchMutex.Lock() + defer fake.deleteAgentDispatchMutex.Unlock() + fake.DeleteAgentDispatchStub = stub +} + +func (fake *FakeAgentStore) DeleteAgentDispatchArgsForCall(i int) (context.Context, *livekit.AgentDispatch) { + fake.deleteAgentDispatchMutex.RLock() + defer fake.deleteAgentDispatchMutex.RUnlock() + argsForCall := fake.deleteAgentDispatchArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeAgentStore) DeleteAgentDispatchReturns(result1 error) { + fake.deleteAgentDispatchMutex.Lock() + defer fake.deleteAgentDispatchMutex.Unlock() + fake.DeleteAgentDispatchStub = nil + fake.deleteAgentDispatchReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) DeleteAgentDispatchReturnsOnCall(i int, result1 error) { + fake.deleteAgentDispatchMutex.Lock() + defer fake.deleteAgentDispatchMutex.Unlock() + fake.DeleteAgentDispatchStub = nil + if fake.deleteAgentDispatchReturnsOnCall == nil { + fake.deleteAgentDispatchReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.deleteAgentDispatchReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) DeleteAgentJob(arg1 context.Context, arg2 *livekit.Job) error { + fake.deleteAgentJobMutex.Lock() + ret, specificReturn := fake.deleteAgentJobReturnsOnCall[len(fake.deleteAgentJobArgsForCall)] + fake.deleteAgentJobArgsForCall = append(fake.deleteAgentJobArgsForCall, struct { + arg1 context.Context + arg2 *livekit.Job + }{arg1, arg2}) + stub := fake.DeleteAgentJobStub + fakeReturns := fake.deleteAgentJobReturns + fake.recordInvocation("DeleteAgentJob", []interface{}{arg1, arg2}) + fake.deleteAgentJobMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeAgentStore) DeleteAgentJobCallCount() int { + fake.deleteAgentJobMutex.RLock() + defer fake.deleteAgentJobMutex.RUnlock() + return len(fake.deleteAgentJobArgsForCall) +} + +func (fake *FakeAgentStore) DeleteAgentJobCalls(stub func(context.Context, *livekit.Job) error) { + fake.deleteAgentJobMutex.Lock() + defer fake.deleteAgentJobMutex.Unlock() + fake.DeleteAgentJobStub = stub +} + +func (fake *FakeAgentStore) DeleteAgentJobArgsForCall(i int) (context.Context, *livekit.Job) { + fake.deleteAgentJobMutex.RLock() + defer fake.deleteAgentJobMutex.RUnlock() + argsForCall := fake.deleteAgentJobArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeAgentStore) DeleteAgentJobReturns(result1 error) { + fake.deleteAgentJobMutex.Lock() + defer fake.deleteAgentJobMutex.Unlock() + fake.DeleteAgentJobStub = nil + fake.deleteAgentJobReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) DeleteAgentJobReturnsOnCall(i int, result1 error) { + fake.deleteAgentJobMutex.Lock() + defer fake.deleteAgentJobMutex.Unlock() + fake.DeleteAgentJobStub = nil + if fake.deleteAgentJobReturnsOnCall == nil { + fake.deleteAgentJobReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.deleteAgentJobReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) ListAgentDispatches(arg1 context.Context, arg2 livekit.RoomName) ([]*livekit.AgentDispatch, error) { + fake.listAgentDispatchesMutex.Lock() + ret, specificReturn := fake.listAgentDispatchesReturnsOnCall[len(fake.listAgentDispatchesArgsForCall)] + fake.listAgentDispatchesArgsForCall = append(fake.listAgentDispatchesArgsForCall, struct { + arg1 context.Context + arg2 livekit.RoomName + }{arg1, arg2}) + stub := fake.ListAgentDispatchesStub + fakeReturns := fake.listAgentDispatchesReturns + fake.recordInvocation("ListAgentDispatches", []interface{}{arg1, arg2}) + fake.listAgentDispatchesMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeAgentStore) ListAgentDispatchesCallCount() int { + fake.listAgentDispatchesMutex.RLock() + defer fake.listAgentDispatchesMutex.RUnlock() + return len(fake.listAgentDispatchesArgsForCall) +} + +func (fake *FakeAgentStore) ListAgentDispatchesCalls(stub func(context.Context, livekit.RoomName) ([]*livekit.AgentDispatch, error)) { + fake.listAgentDispatchesMutex.Lock() + defer fake.listAgentDispatchesMutex.Unlock() + fake.ListAgentDispatchesStub = stub +} + +func (fake *FakeAgentStore) ListAgentDispatchesArgsForCall(i int) (context.Context, livekit.RoomName) { + fake.listAgentDispatchesMutex.RLock() + defer fake.listAgentDispatchesMutex.RUnlock() + argsForCall := fake.listAgentDispatchesArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeAgentStore) ListAgentDispatchesReturns(result1 []*livekit.AgentDispatch, result2 error) { + fake.listAgentDispatchesMutex.Lock() + defer fake.listAgentDispatchesMutex.Unlock() + fake.ListAgentDispatchesStub = nil + fake.listAgentDispatchesReturns = struct { + result1 []*livekit.AgentDispatch + result2 error + }{result1, result2} +} + +func (fake *FakeAgentStore) ListAgentDispatchesReturnsOnCall(i int, result1 []*livekit.AgentDispatch, result2 error) { + fake.listAgentDispatchesMutex.Lock() + defer fake.listAgentDispatchesMutex.Unlock() + fake.ListAgentDispatchesStub = nil + if fake.listAgentDispatchesReturnsOnCall == nil { + fake.listAgentDispatchesReturnsOnCall = make(map[int]struct { + result1 []*livekit.AgentDispatch + result2 error + }) + } + fake.listAgentDispatchesReturnsOnCall[i] = struct { + result1 []*livekit.AgentDispatch + result2 error + }{result1, result2} +} + +func (fake *FakeAgentStore) StoreAgentDispatch(arg1 context.Context, arg2 *livekit.AgentDispatch) error { + fake.storeAgentDispatchMutex.Lock() + ret, specificReturn := fake.storeAgentDispatchReturnsOnCall[len(fake.storeAgentDispatchArgsForCall)] + fake.storeAgentDispatchArgsForCall = append(fake.storeAgentDispatchArgsForCall, struct { + arg1 context.Context + arg2 *livekit.AgentDispatch + }{arg1, arg2}) + stub := fake.StoreAgentDispatchStub + fakeReturns := fake.storeAgentDispatchReturns + fake.recordInvocation("StoreAgentDispatch", []interface{}{arg1, arg2}) + fake.storeAgentDispatchMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeAgentStore) StoreAgentDispatchCallCount() int { + fake.storeAgentDispatchMutex.RLock() + defer fake.storeAgentDispatchMutex.RUnlock() + return len(fake.storeAgentDispatchArgsForCall) +} + +func (fake *FakeAgentStore) StoreAgentDispatchCalls(stub func(context.Context, *livekit.AgentDispatch) error) { + fake.storeAgentDispatchMutex.Lock() + defer fake.storeAgentDispatchMutex.Unlock() + fake.StoreAgentDispatchStub = stub +} + +func (fake *FakeAgentStore) StoreAgentDispatchArgsForCall(i int) (context.Context, *livekit.AgentDispatch) { + fake.storeAgentDispatchMutex.RLock() + defer fake.storeAgentDispatchMutex.RUnlock() + argsForCall := fake.storeAgentDispatchArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeAgentStore) StoreAgentDispatchReturns(result1 error) { + fake.storeAgentDispatchMutex.Lock() + defer fake.storeAgentDispatchMutex.Unlock() + fake.StoreAgentDispatchStub = nil + fake.storeAgentDispatchReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) StoreAgentDispatchReturnsOnCall(i int, result1 error) { + fake.storeAgentDispatchMutex.Lock() + defer fake.storeAgentDispatchMutex.Unlock() + fake.StoreAgentDispatchStub = nil + if fake.storeAgentDispatchReturnsOnCall == nil { + fake.storeAgentDispatchReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.storeAgentDispatchReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) StoreAgentJob(arg1 context.Context, arg2 *livekit.Job) error { + fake.storeAgentJobMutex.Lock() + ret, specificReturn := fake.storeAgentJobReturnsOnCall[len(fake.storeAgentJobArgsForCall)] + fake.storeAgentJobArgsForCall = append(fake.storeAgentJobArgsForCall, struct { + arg1 context.Context + arg2 *livekit.Job + }{arg1, arg2}) + stub := fake.StoreAgentJobStub + fakeReturns := fake.storeAgentJobReturns + fake.recordInvocation("StoreAgentJob", []interface{}{arg1, arg2}) + fake.storeAgentJobMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeAgentStore) StoreAgentJobCallCount() int { + fake.storeAgentJobMutex.RLock() + defer fake.storeAgentJobMutex.RUnlock() + return len(fake.storeAgentJobArgsForCall) +} + +func (fake *FakeAgentStore) StoreAgentJobCalls(stub func(context.Context, *livekit.Job) error) { + fake.storeAgentJobMutex.Lock() + defer fake.storeAgentJobMutex.Unlock() + fake.StoreAgentJobStub = stub +} + +func (fake *FakeAgentStore) StoreAgentJobArgsForCall(i int) (context.Context, *livekit.Job) { + fake.storeAgentJobMutex.RLock() + defer fake.storeAgentJobMutex.RUnlock() + argsForCall := fake.storeAgentJobArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeAgentStore) StoreAgentJobReturns(result1 error) { + fake.storeAgentJobMutex.Lock() + defer fake.storeAgentJobMutex.Unlock() + fake.StoreAgentJobStub = nil + fake.storeAgentJobReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) StoreAgentJobReturnsOnCall(i int, result1 error) { + fake.storeAgentJobMutex.Lock() + defer fake.storeAgentJobMutex.Unlock() + fake.StoreAgentJobStub = nil + if fake.storeAgentJobReturnsOnCall == nil { + fake.storeAgentJobReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.storeAgentJobReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.deleteAgentDispatchMutex.RLock() + defer fake.deleteAgentDispatchMutex.RUnlock() + fake.deleteAgentJobMutex.RLock() + defer fake.deleteAgentJobMutex.RUnlock() + fake.listAgentDispatchesMutex.RLock() + defer fake.listAgentDispatchesMutex.RUnlock() + fake.storeAgentDispatchMutex.RLock() + defer fake.storeAgentDispatchMutex.RUnlock() + fake.storeAgentJobMutex.RLock() + defer fake.storeAgentJobMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeAgentStore) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ service.AgentStore = new(FakeAgentStore) diff --git a/pkg/service/wire.go b/pkg/service/wire.go index c856c7384..75b3509a1 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -80,6 +80,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live NewRTCService, NewAgentService, agent.NewAgentClient, + getAgentStore, getSignalRelayConfig, NewDefaultSignalServer, routing.NewSignalClient, @@ -200,6 +201,17 @@ func getIngressStore(s ObjectStore) IngressStore { } } +func getAgentStore(s ObjectStore) AgentStore { + switch store := s.(type) { + case *RedisStore: + return store + case *LocalStore: + return store + default: + return nil + } +} + func getIngressConfig(conf *config.Config) *config.IngressConfig { return &conf.Ingress } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index de2ecf664..15f81061f 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -119,10 +119,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } clientConfigurationManager := createClientConfiguration() + agentStore := getAgentStore(objectStore) timedVersionGenerator := utils.NewDefaultTimedVersionGenerator() turnAuthHandler := NewTURNAuthHandler(keyProvider) forwardStats := createForwardStats(conf) - roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, client, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus, forwardStats) + roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, client, agentStore, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus, forwardStats) if err != nil { return nil, err } @@ -251,6 +252,17 @@ func getIngressStore(s ObjectStore) IngressStore { } } +func getAgentStore(s ObjectStore) AgentStore { + switch store := s.(type) { + case *RedisStore: + return store + case *LocalStore: + return store + default: + return nil + } +} + func getIngressConfig(conf *config.Config) *config.IngressConfig { return &conf.Ingress }