diff --git a/go.mod b/go.mod index 7af9f3998..55eb8d853 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 - github.com/livekit/protocol v1.19.2-0.20240716163657-b5404e5c9aae + github.com/livekit/protocol v1.19.2-0.20240716225317-497688ff49e4 github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 64e5e45f0..c599e440c 100644 --- a/go.sum +++ b/go.sum @@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY= github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.19.2-0.20240716163657-b5404e5c9aae h1:Pj1/pdRCCZZYuKRTxbjYtw1eClmkoYxG3nhQNcVA5no= -github.com/livekit/protocol v1.19.2-0.20240716163657-b5404e5c9aae/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= +github.com/livekit/protocol v1.19.2-0.20240716225317-497688ff49e4 h1:8inXM3a1qo6Y17dbfFIy/SafcFcz3Ynva2Y5+0UfwK4= +github.com/livekit/protocol v1.19.2-0.20240716225317-497688ff49e4/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/agent/client.go b/pkg/agent/client.go index f5051527d..13d4aba9b 100644 --- a/pkg/agent/client.go +++ b/pkg/agent/client.go @@ -42,13 +42,14 @@ const ( type Client interface { // LaunchJob starts a room or participant job on an agent. // it will launch a job once for each worker in each namespace - LaunchJob(ctx context.Context, desc *JobRequest) + LaunchJob(ctx context.Context, desc *JobRequest) *serverutils.IncrementalDispatcher[*livekit.Job] Stop() error } type JobRequest struct { - JobType livekit.JobType - Room *livekit.Room + DispatchId string + JobType livekit.JobType + Room *livekit.Room // only set for participant jobs Participant *livekit.ParticipantInfo Metadata string @@ -111,37 +112,53 @@ func NewAgentClient(bus psrpc.MessageBus) (Client, error) { return c, nil } -func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) { +func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverutils.IncrementalDispatcher[*livekit.Job] { jobTypeTopic := RoomAgentTopic if desc.JobType == livekit.JobType_JT_PUBLISHER { jobTypeTopic = PublisherAgentTopic } + ret := serverutils.NewIncrementalDispatcher[*livekit.Job]() dispatcher := c.getDispatcher(desc.AgentName, desc.JobType) if dispatcher == nil { logger.Infow("not dispatching agent job since no worker is available", "agentName", desc.AgentName, "jobType", desc.JobType) - return + return ret } + var wg sync.WaitGroup dispatcher.ForEach(func(curNs string) { topic := GetAgentTopic(desc.AgentName, curNs) + + wg.Add(1) c.workers.Submit(func() { + defer wg.Done() // The cached agent parameters do not provide the exact combination of available job type/agent name/namespace, so some of the JobRequest RPC may not trigger any worker - _, err := c.client.JobRequest(context.Background(), topic, jobTypeTopic, &livekit.Job{ + job := &livekit.Job{ Id: utils.NewGuid(utils.AgentJobPrefix), + DispatchId: desc.DispatchId, Type: desc.JobType, Room: desc.Room, Participant: desc.Participant, Namespace: curNs, AgentName: desc.AgentName, Metadata: desc.Metadata, - }) - if err != nil { - logger.Infow("failed to send job request", "error", err, "namespace", curNs, "jobType", desc.JobType) } + resp, err := c.client.JobRequest(context.Background(), topic, jobTypeTopic, job) + if err != nil { + logger.Infow("failed to send job request", "error", err, "namespace", curNs, "jobType", desc.JobType, "agentName", desc.AgentName) + return + } + job.State = resp.State + ret.Add(job) }) }) + c.workers.Submit(func() { + wg.Wait() + ret.Done() + }) + + return ret } func (c *agentClient) getDispatcher(agName string, jobType livekit.JobType) *serverutils.IncrementalDispatcher[string] { diff --git a/pkg/agent/worker.go b/pkg/agent/worker.go index d09229aac..143feee79 100644 --- a/pkg/agent/worker.go +++ b/pkg/agent/worker.go @@ -211,8 +211,11 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error { w.mu.Unlock() }() + now := time.Now() if job.State == nil { - job.State = &livekit.JobState{} + job.State = &livekit.JobState{ + UpdatedAt: now.UnixNano(), + } } w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Availability{ diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 3671f1b0a..b0c515df0 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -35,6 +35,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" + "github.com/livekit/protocol/utils/guid" "github.com/livekit/livekit-server/pkg/agent" "github.com/livekit/livekit-server/pkg/config" @@ -62,6 +63,16 @@ var ( roomUpdateInterval = 5 * time.Second // frequency to update room participant counts ) +// Duplicate the service.AgentStore interface to avoid a rtc -> service -> rtc import cycle +type AgentStore interface { + StoreAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error + DeleteAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error + ListAgentDispatches(ctx context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error) + + StoreAgentJob(ctx context.Context, job *livekit.Job) error + DeleteAgentJob(ctx context.Context, job *livekit.Job) error +} + type broadcastOptions struct { skipSource bool immediate bool @@ -95,15 +106,17 @@ type Room struct { protoProxy *utils.ProtoProxy[*livekit.Room] Logger logger.Logger - config WebRTCConfig - audioConfig *config.AudioConfig - serverInfo *livekit.ServerInfo - telemetry telemetry.TelemetryService - egressLauncher EgressLauncher - trackManager *RoomTrackManager + config WebRTCConfig + audioConfig *config.AudioConfig + serverInfo *livekit.ServerInfo + telemetry telemetry.TelemetryService + egressLauncher EgressLauncher + trackManager *RoomTrackManager + agentDispatches []*livekit.AgentDispatch // agents agentClient agent.Client + agentStore AgentStore // map of identity -> Participant participants map[livekit.ParticipantIdentity]types.LocalParticipant @@ -142,6 +155,7 @@ func NewRoom( serverInfo *livekit.ServerInfo, telemetry telemetry.TelemetryService, agentClient agent.Client, + agentStore AgentStore, egressLauncher EgressLauncher, ) *Room { r := &Room{ @@ -157,6 +171,7 @@ func NewRoom( telemetry: telemetry, egressLauncher: egressLauncher, agentClient: agentClient, + agentStore: agentStore, trackManager: NewRoomTrackManager(), serverInfo: serverInfo, participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant), @@ -182,6 +197,10 @@ func NewRoom( } r.protoProxy = utils.NewProtoProxy[*livekit.Room](roomUpdateInterval, r.updateProto) + r.createAgentDispatchesFromRoomAgent() + + r.launchRoomAgents() + go r.audioUpdateWorker() go r.connectionQualityWorker() go r.changeUpdateWorker() @@ -1403,23 +1422,46 @@ func (r *Room) simulationCleanupWorker() { } } +func (r *Room) launchRoomAgents() { + if r.agentClient == nil { + return + } + + for _, ag := range r.agentDispatches { + go func() { + inc := r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{ + JobType: livekit.JobType_JT_ROOM, + Room: r.ToProto(), + Metadata: ag.Metadata, + AgentName: ag.AgentName, + DispatchId: ag.Id, + }) + inc.ForEach(func(job *livekit.Job) { + r.agentStore.StoreAgentJob(context.Background(), job) + }) + }() + } +} + func (r *Room) launchPublisherAgents(p types.Participant) { if p == nil || p.IsDependent() || r.agentClient == nil { return } - if r.internal == nil { - return - } - - for _, ag := range r.internal.AgentDispatches { - go r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{ - JobType: livekit.JobType_JT_PUBLISHER, - Room: r.ToProto(), - Participant: p.ToProto(), - Metadata: ag.Metadata, - AgentName: ag.AgentName, - }) + for _, ag := range r.agentDispatches { + go func() { + inc := r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{ + JobType: livekit.JobType_JT_PUBLISHER, + Room: r.ToProto(), + Participant: p.ToProto(), + Metadata: ag.Metadata, + AgentName: ag.AgentName, + DispatchId: ag.Id, + }) + inc.ForEach(func(job *livekit.Job) { + r.agentStore.StoreAgentJob(context.Background(), job) + }) + }() } } @@ -1440,6 +1482,32 @@ func (r *Room) DebugInfo() map[string]interface{} { return info } +func (r *Room) createAgentDispatchesFromRoomAgent() { + now := time.Now() + if r.internal == nil { + return + } + + for _, ag := range r.internal.AgentDispatches { + ad := &livekit.AgentDispatch{ + Id: guid.New(guid.AgentDispatchPrefix), + AgentName: ag.AgentName, + Metadata: ag.Metadata, + Room: r.protoRoom.Name, + State: &livekit.AgentDispatchState{ + CreatedAt: now.UnixNano(), + }, + } + r.agentDispatches = append(r.agentDispatches, ad) + if r.agentStore != nil { + err := r.agentStore.StoreAgentDispatch(context.Background(), ad) + if err != nil { + r.Logger.Warnw("failed storing room dispatch", err) + } + } + } +} + // ------------------------------------------------------------ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, kind livekit.DataPacket_Kind, dp *livekit.DataPacket, logger logger.Logger) { diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 40a8b30ad..a85f361c1 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -813,7 +813,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room { Region: "testregion", }, telemetry.NewTelemetryService(webhook.NewDefaultNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}), - nil, nil, + nil, nil, nil, ) for i := 0; i < opts.num+opts.numHidden; i++ { identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i)) diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index 826680fbc..734609354 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -289,7 +289,7 @@ func (h *AgentHandler) HandleWorkerDeregister(w *agent.Worker) { } } -func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*emptypb.Empty, error) { +func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.JobRequestResponse, error) { key := workerKey{job.AgentName, job.Namespace, job.Type} attempted := make(map[*agent.Worker]struct{}) for { @@ -340,7 +340,9 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty return nil, err } - return &emptypb.Empty{}, nil + return &rpc.JobRequestResponse{ + State: job.State, + }, nil } } diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 9ebce8271..c55998fbc 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -95,3 +95,13 @@ type SIPStore interface { ListSIPDispatchRule(ctx context.Context) ([]*livekit.SIPDispatchRuleInfo, error) DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error } + +//counterfeiter:generate . AgentStore +type AgentStore interface { + StoreAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error + DeleteAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error + ListAgentDispatches(ctx context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error) + + StoreAgentJob(ctx context.Context, job *livekit.Job) error + DeleteAgentJob(ctx context.Context, job *livekit.Job) error +} diff --git a/pkg/service/localstore.go b/pkg/service/localstore.go index a651c24a0..b1b498509 100644 --- a/pkg/service/localstore.go +++ b/pkg/service/localstore.go @@ -20,6 +20,7 @@ import ( "time" "github.com/thoas/go-funk" + "google.golang.org/protobuf/proto" "github.com/livekit/protocol/livekit" ) @@ -32,16 +33,21 @@ type LocalStore struct { // map of roomName => { identity: participant } participants map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo + agentDispatches map[livekit.RoomName]map[string]*livekit.AgentDispatch + agentJobs map[livekit.RoomName]map[string]*livekit.Job + lock sync.RWMutex globalLock sync.Mutex } func NewLocalStore() *LocalStore { return &LocalStore{ - rooms: make(map[livekit.RoomName]*livekit.Room), - roomInternal: make(map[livekit.RoomName]*livekit.RoomInternal), - participants: make(map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo), - lock: sync.RWMutex{}, + rooms: make(map[livekit.RoomName]*livekit.Room), + roomInternal: make(map[livekit.RoomName]*livekit.RoomInternal), + participants: make(map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo), + agentDispatches: make(map[livekit.RoomName]map[string]*livekit.AgentDispatch), + agentJobs: make(map[livekit.RoomName]map[string]*livekit.Job), + lock: sync.RWMutex{}, } } @@ -102,6 +108,8 @@ func (s *LocalStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) delete(s.participants, livekit.RoomName(room.Name)) delete(s.rooms, livekit.RoomName(room.Name)) delete(s.roomInternal, livekit.RoomName(room.Name)) + delete(s.agentDispatches, livekit.RoomName(room.Name)) + delete(s.agentJobs, livekit.RoomName(room.Name)) return nil } @@ -171,3 +179,103 @@ func (s *LocalStore) DeleteParticipant(_ context.Context, roomName livekit.RoomN } return nil } + +func (s *LocalStore) StoreAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error { + s.lock.Lock() + defer s.lock.Unlock() + + clone := proto.Clone(dispatch).(*livekit.AgentDispatch) + if clone.State != nil { + clone.State.Jobs = nil + } + + roomDispatches := s.agentDispatches[livekit.RoomName(dispatch.Room)] + if roomDispatches == nil { + roomDispatches = make(map[string]*livekit.AgentDispatch) + s.agentDispatches[livekit.RoomName(dispatch.Room)] = roomDispatches + } + + roomDispatches[clone.Id] = clone + return nil +} + +func (s *LocalStore) DeleteAgentDispatch(ctx context.Context, dispatch *livekit.AgentDispatch) error { + s.lock.Lock() + defer s.lock.Unlock() + + roomDispatches := s.agentDispatches[livekit.RoomName(dispatch.Room)] + if roomDispatches != nil { + delete(roomDispatches, dispatch.Id) + } + + return nil +} + +func (s *LocalStore) ListAgentDispatches(ctx context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error) { + s.lock.Lock() + defer s.lock.Unlock() + + agentDispatches := s.agentDispatches[roomName] + if agentDispatches == nil { + return nil, nil + } + agentJobs := s.agentJobs[roomName] + + var js []*livekit.Job + if agentJobs != nil { + for _, j := range agentJobs { + js = append(js, proto.Clone(j).(*livekit.Job)) + } + } + var ds []*livekit.AgentDispatch + + m := make(map[string]*livekit.AgentDispatch) + for _, d := range agentDispatches { + clone := proto.Clone(d).(*livekit.AgentDispatch) + m[d.Id] = clone + ds = append(ds, clone) + } + + for _, j := range js { + d := m[j.DispatchId] + if d != nil { + d.State.Jobs = append(d.State.Jobs, proto.Clone(j).(*livekit.Job)) + } + } + + return ds, nil +} + +func (s *LocalStore) StoreAgentJob(ctx context.Context, job *livekit.Job) error { + s.lock.Lock() + defer s.lock.Unlock() + + clone := proto.Clone(job).(*livekit.Job) + clone.Room = nil + if clone.Participant != nil { + clone.Participant = &livekit.ParticipantInfo{ + Identity: clone.Participant.Identity, + } + } + + roomJobs := s.agentJobs[livekit.RoomName(job.Room.Name)] + if roomJobs == nil { + roomJobs = make(map[string]*livekit.Job) + s.agentJobs[livekit.RoomName(job.Room.Name)] = roomJobs + } + roomJobs[clone.Id] = clone + + return nil +} + +func (s *LocalStore) DeleteAgentJob(ctx context.Context, job *livekit.Job) error { + s.lock.Lock() + defer s.lock.Unlock() + + roomJobs := s.agentJobs[livekit.RoomName(job.Room.Name)] + if roomJobs != nil { + delete(roomJobs, job.Id) + } + + return nil +} diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 4da0ab360..8326e36f1 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -30,6 +30,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils/guid" + "github.com/livekit/psrpc" "github.com/livekit/livekit-server/version" ) @@ -58,6 +59,10 @@ const ( // RoomLockPrefix is a simple key containing a provided lock uid RoomLockPrefix = "room_lock:" + // Agents + AgentDispatchPrefix = "agent_dispatch:" + AgentJobPrefix = "agent_job:" + maxRetries = 5 ) @@ -231,6 +236,8 @@ func (s *RedisStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) pp.HDel(s.ctx, RoomsKey, string(roomName)) pp.HDel(s.ctx, RoomInternalKey, string(roomName)) pp.Del(s.ctx, RoomParticipantsPrefix+string(roomName)) + pp.Del(s.ctx, AgentDispatchPrefix+string(roomName)) + pp.Del(s.ctx, AgentJobPrefix+string(roomName)) _, err = pp.Exec(s.ctx) return err @@ -822,6 +829,99 @@ func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) return nil } +func (s *RedisStore) StoreAgentDispatch(_ context.Context, dispatch *livekit.AgentDispatch) error { + di := proto.Clone(dispatch).(*livekit.AgentDispatch) + + // Do not store jobs with the dispatch + if di.State != nil { + di.State.Jobs = nil + } + + key := AgentDispatchPrefix + string(dispatch.Room) + + data, err := proto.Marshal(di) + if err != nil { + return err + } + + return s.rc.HSet(s.ctx, key, di.Id, data).Err() +} + +// This will not delete the jobs created by the dispatch +func (s *RedisStore) DeleteAgentDispatch(_ context.Context, dispatch *livekit.AgentDispatch) error { + key := AgentDispatchPrefix + string(dispatch.Room) + return s.rc.HDel(s.ctx, key, dispatch.Id).Err() +} + +func (s *RedisStore) ListAgentDispatches(_ context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error) { + key := AgentDispatchPrefix + string(roomName) + dispatches, err := redisLoadMany[livekit.AgentDispatch](s.ctx, s, key) + if err != nil { + return nil, err + } + + dMap := make(map[string]*livekit.AgentDispatch) + for _, di := range dispatches { + dMap[di.Id] = di + } + + key = AgentJobPrefix + string(roomName) + jobs, err := redisLoadMany[livekit.Job](s.ctx, s, key) + if err != nil { + return nil, err + } + + // Associate job to dispatch + for _, jb := range jobs { + di := dMap[jb.DispatchId] + if di == nil { + continue + } + if di.State == nil { + di.State = &livekit.AgentDispatchState{} + } + di.State.Jobs = append(di.State.Jobs, jb) + } + + return dispatches, nil +} + +func (s *RedisStore) StoreAgentJob(_ context.Context, job *livekit.Job) error { + if job.Room == nil { + return psrpc.NewErrorf(psrpc.InvalidArgument, "job doesn't have a valid Room field") + } + + key := AgentJobPrefix + string(job.Room.Name) + + jb := proto.Clone(job).(*livekit.Job) + + // Do not store room with the job + jb.Room = nil + + // Only store the participant identity + if jb.Participant != nil { + jb.Participant = &livekit.ParticipantInfo{ + Identity: jb.Participant.Identity, + } + } + + data, err := proto.Marshal(jb) + if err != nil { + return err + } + + return s.rc.HSet(s.ctx, key, job.Id, data).Err() +} + +func (s *RedisStore) DeleteAgentJob(_ context.Context, job *livekit.Job) error { + if job.Room == nil { + return psrpc.NewErrorf(psrpc.InvalidArgument, "job doesn't have a valid Room field") + } + + key := AgentJobPrefix + string(job.Room.Name) + return s.rc.HDel(s.ctx, key, job.Id).Err() +} + func redisStoreOne(ctx context.Context, s *RedisStore, key, id string, p proto.Message) error { if id == "" { return errors.New("id is not set") diff --git a/pkg/service/redisstore_test.go b/pkg/service/redisstore_test.go index d62db1025..13a6905e6 100644 --- a/pkg/service/redisstore_test.go +++ b/pkg/service/redisstore_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" + "google.golang.org/protobuf/proto" "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" @@ -293,6 +294,92 @@ func TestIngressStore(t *testing.T) { require.Equal(t, "", infos[0].RoomName) } +func TestAgentStore(t *testing.T) { + ctx := context.Background() + rs := redisStore(t) + + ad := &livekit.AgentDispatch{ + Id: "dispatch_id", + AgentName: "agent_name", + Metadata: "metadata", + Room: "room_name", + State: &livekit.AgentDispatchState{ + CreatedAt: 1, + DeletedAt: 2, + Jobs: []*livekit.Job{ + &livekit.Job{ + Id: "job_id", + DispatchId: "dispatch_id", + Type: livekit.JobType_JT_PUBLISHER, + Room: &livekit.Room{ + Name: "room_name", + }, + Participant: &livekit.ParticipantInfo{ + Identity: "identity", + Name: "name", + }, + Namespace: "ns", + Metadata: "metadata", + AgentName: "agent_name", + State: &livekit.JobState{ + Status: livekit.JobStatus_JS_RUNNING, + StartedAt: 3, + EndedAt: 4, + Error: "error", + }, + }, + }, + }, + } + + err := rs.StoreAgentDispatch(ctx, ad) + require.NoError(t, err) + + rd, err := rs.ListAgentDispatches(ctx, "not_a_room") + require.NoError(t, err) + require.Equal(t, 0, len(rd)) + + rd, err = rs.ListAgentDispatches(ctx, "room_name") + require.NoError(t, err) + require.Equal(t, 1, len(rd)) + + expected := proto.Clone(ad).(*livekit.AgentDispatch) + expected.State.Jobs = nil + require.True(t, proto.Equal(expected, rd[0])) + + err = rs.StoreAgentJob(ctx, ad.State.Jobs[0]) + require.NoError(t, err) + + rd, err = rs.ListAgentDispatches(ctx, "room_name") + require.NoError(t, err) + require.Equal(t, 1, len(rd)) + + expected = proto.Clone(ad).(*livekit.AgentDispatch) + expected.State.Jobs[0].Room = nil + expected.State.Jobs[0].Participant = &livekit.ParticipantInfo{ + Identity: "identity", + } + require.True(t, proto.Equal(expected, rd[0])) + + err = rs.DeleteAgentJob(ctx, ad.State.Jobs[0]) + require.NoError(t, err) + + rd, err = rs.ListAgentDispatches(ctx, "room_name") + require.NoError(t, err) + require.Equal(t, 1, len(rd)) + + expected = proto.Clone(ad).(*livekit.AgentDispatch) + expected.State.Jobs = nil + require.True(t, proto.Equal(expected, rd[0])) + + err = rs.DeleteAgentDispatch(ctx, ad) + require.NoError(t, err) + + rd, err = rs.ListAgentDispatches(ctx, "room_name") + require.NoError(t, err) + require.Equal(t, 0, len(rd)) +} + func compareIngressInfo(t *testing.T, expected, v *livekit.IngressInfo) { require.Equal(t, expected.IngressId, v.IngressId) require.Equal(t, expected.StreamKey, v.StreamKey) diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index c2451e611..06f783a43 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -74,6 +74,7 @@ type RoomManager struct { telemetry telemetry.TelemetryService clientConfManager clientconfiguration.ClientConfigurationManager agentClient agent.Client + agentStore AgentStore egressLauncher rtc.EgressLauncher versionGenerator utils.TimedVersionGenerator turnAuthHandler *TURNAuthHandler @@ -97,6 +98,7 @@ func NewLocalRoomManager( telemetry telemetry.TelemetryService, clientConfManager clientconfiguration.ClientConfigurationManager, agentClient agent.Client, + agentStore AgentStore, egressLauncher rtc.EgressLauncher, versionGenerator utils.TimedVersionGenerator, turnAuthHandler *TURNAuthHandler, @@ -118,6 +120,7 @@ func NewLocalRoomManager( clientConfManager: clientConfManager, egressLauncher: egressLauncher, agentClient: agentClient, + agentStore: agentStore, versionGenerator: versionGenerator, turnAuthHandler: turnAuthHandler, bus: bus, @@ -557,7 +560,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room } // construct ice servers - newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, r.config.Room, &r.config.Audio, r.serverInfo, r.telemetry, r.agentClient, r.egressLauncher) + newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, r.config.Room, &r.config.Audio, r.serverInfo, r.telemetry, r.agentClient, r.agentStore, r.egressLauncher) roomTopic := rpc.FormatRoomTopic(roomName) roomServer := must.Get(rpc.NewTypedRoomServer(r, r.bus)) diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 9aacef06f..f4b4b8116 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -103,15 +103,6 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq defer done() if created { - _, internal, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name), true) - - if internal.AgentDispatches != nil { - err = s.launchAgents(ctx, rm, internal.AgentDispatches) - if err != nil { - return nil, err - } - } - if req.Egress != nil && req.Egress.Room != nil { // ensure room name matches req.Egress.Room.RoomName = req.Name @@ -130,19 +121,6 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return rm, nil } -func (s *RoomService) launchAgents(ctx context.Context, rm *livekit.Room, agents []*livekit.RoomAgentDispatch) error { - for _, ag := range agents { - go s.agentClient.LaunchJob(ctx, &agent.JobRequest{ - JobType: livekit.JobType_JT_ROOM, - Room: rm, - Metadata: ag.Metadata, - AgentName: ag.AgentName, - }) - } - - return nil -} - func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (*livekit.ListRoomsResponse, error) { AppendLogFields(ctx, "room", req.Names) err := EnsureListPermission(ctx) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 15e0af0ed..aaba72990 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -517,31 +517,13 @@ func (s *RTCService) startConnection( timeout time.Duration, ) (connectionResult, *livekit.SignalResponse, error) { var cr connectionResult - var created bool var err error - cr.Room, created, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName), ConfigName: GetRoomConfiguration(ctx)}) + cr.Room, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName), ConfigName: GetRoomConfiguration(ctx)}) if err != nil { return cr, nil, err } - if created && s.agentClient != nil { - // TODO Have CreateRoom return the RoomInternal object? - _, internal, err := s.store.LoadRoom(ctx, livekit.RoomName(roomName), true) - if err != nil { - return connectionResult{}, nil, err - } - - for _, ag := range internal.AgentDispatches { - go s.agentClient.LaunchJob(ctx, &agent.JobRequest{ - JobType: livekit.JobType_JT_ROOM, - Room: cr.Room, - Metadata: ag.Metadata, - AgentName: ag.AgentName, - }) - } - } - // this needs to be started first *before* using router functions on this node cr.StartParticipantSignalResults, err = s.router.StartParticipantSignal(ctx, roomName, pi) if err != nil { diff --git a/pkg/service/servicefakes/fake_agent_store.go b/pkg/service/servicefakes/fake_agent_store.go new file mode 100644 index 000000000..71d43f567 --- /dev/null +++ b/pkg/service/servicefakes/fake_agent_store.go @@ -0,0 +1,424 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package servicefakes + +import ( + "context" + "sync" + + "github.com/livekit/livekit-server/pkg/service" + "github.com/livekit/protocol/livekit" +) + +type FakeAgentStore struct { + DeleteAgentDispatchStub func(context.Context, *livekit.AgentDispatch) error + deleteAgentDispatchMutex sync.RWMutex + deleteAgentDispatchArgsForCall []struct { + arg1 context.Context + arg2 *livekit.AgentDispatch + } + deleteAgentDispatchReturns struct { + result1 error + } + deleteAgentDispatchReturnsOnCall map[int]struct { + result1 error + } + DeleteAgentJobStub func(context.Context, *livekit.Job) error + deleteAgentJobMutex sync.RWMutex + deleteAgentJobArgsForCall []struct { + arg1 context.Context + arg2 *livekit.Job + } + deleteAgentJobReturns struct { + result1 error + } + deleteAgentJobReturnsOnCall map[int]struct { + result1 error + } + ListAgentDispatchesStub func(context.Context, livekit.RoomName) ([]*livekit.AgentDispatch, error) + listAgentDispatchesMutex sync.RWMutex + listAgentDispatchesArgsForCall []struct { + arg1 context.Context + arg2 livekit.RoomName + } + listAgentDispatchesReturns struct { + result1 []*livekit.AgentDispatch + result2 error + } + listAgentDispatchesReturnsOnCall map[int]struct { + result1 []*livekit.AgentDispatch + result2 error + } + StoreAgentDispatchStub func(context.Context, *livekit.AgentDispatch) error + storeAgentDispatchMutex sync.RWMutex + storeAgentDispatchArgsForCall []struct { + arg1 context.Context + arg2 *livekit.AgentDispatch + } + storeAgentDispatchReturns struct { + result1 error + } + storeAgentDispatchReturnsOnCall map[int]struct { + result1 error + } + StoreAgentJobStub func(context.Context, *livekit.Job) error + storeAgentJobMutex sync.RWMutex + storeAgentJobArgsForCall []struct { + arg1 context.Context + arg2 *livekit.Job + } + storeAgentJobReturns struct { + result1 error + } + storeAgentJobReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeAgentStore) DeleteAgentDispatch(arg1 context.Context, arg2 *livekit.AgentDispatch) error { + fake.deleteAgentDispatchMutex.Lock() + ret, specificReturn := fake.deleteAgentDispatchReturnsOnCall[len(fake.deleteAgentDispatchArgsForCall)] + fake.deleteAgentDispatchArgsForCall = append(fake.deleteAgentDispatchArgsForCall, struct { + arg1 context.Context + arg2 *livekit.AgentDispatch + }{arg1, arg2}) + stub := fake.DeleteAgentDispatchStub + fakeReturns := fake.deleteAgentDispatchReturns + fake.recordInvocation("DeleteAgentDispatch", []interface{}{arg1, arg2}) + fake.deleteAgentDispatchMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeAgentStore) DeleteAgentDispatchCallCount() int { + fake.deleteAgentDispatchMutex.RLock() + defer fake.deleteAgentDispatchMutex.RUnlock() + return len(fake.deleteAgentDispatchArgsForCall) +} + +func (fake *FakeAgentStore) DeleteAgentDispatchCalls(stub func(context.Context, *livekit.AgentDispatch) error) { + fake.deleteAgentDispatchMutex.Lock() + defer fake.deleteAgentDispatchMutex.Unlock() + fake.DeleteAgentDispatchStub = stub +} + +func (fake *FakeAgentStore) DeleteAgentDispatchArgsForCall(i int) (context.Context, *livekit.AgentDispatch) { + fake.deleteAgentDispatchMutex.RLock() + defer fake.deleteAgentDispatchMutex.RUnlock() + argsForCall := fake.deleteAgentDispatchArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeAgentStore) DeleteAgentDispatchReturns(result1 error) { + fake.deleteAgentDispatchMutex.Lock() + defer fake.deleteAgentDispatchMutex.Unlock() + fake.DeleteAgentDispatchStub = nil + fake.deleteAgentDispatchReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) DeleteAgentDispatchReturnsOnCall(i int, result1 error) { + fake.deleteAgentDispatchMutex.Lock() + defer fake.deleteAgentDispatchMutex.Unlock() + fake.DeleteAgentDispatchStub = nil + if fake.deleteAgentDispatchReturnsOnCall == nil { + fake.deleteAgentDispatchReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.deleteAgentDispatchReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) DeleteAgentJob(arg1 context.Context, arg2 *livekit.Job) error { + fake.deleteAgentJobMutex.Lock() + ret, specificReturn := fake.deleteAgentJobReturnsOnCall[len(fake.deleteAgentJobArgsForCall)] + fake.deleteAgentJobArgsForCall = append(fake.deleteAgentJobArgsForCall, struct { + arg1 context.Context + arg2 *livekit.Job + }{arg1, arg2}) + stub := fake.DeleteAgentJobStub + fakeReturns := fake.deleteAgentJobReturns + fake.recordInvocation("DeleteAgentJob", []interface{}{arg1, arg2}) + fake.deleteAgentJobMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeAgentStore) DeleteAgentJobCallCount() int { + fake.deleteAgentJobMutex.RLock() + defer fake.deleteAgentJobMutex.RUnlock() + return len(fake.deleteAgentJobArgsForCall) +} + +func (fake *FakeAgentStore) DeleteAgentJobCalls(stub func(context.Context, *livekit.Job) error) { + fake.deleteAgentJobMutex.Lock() + defer fake.deleteAgentJobMutex.Unlock() + fake.DeleteAgentJobStub = stub +} + +func (fake *FakeAgentStore) DeleteAgentJobArgsForCall(i int) (context.Context, *livekit.Job) { + fake.deleteAgentJobMutex.RLock() + defer fake.deleteAgentJobMutex.RUnlock() + argsForCall := fake.deleteAgentJobArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeAgentStore) DeleteAgentJobReturns(result1 error) { + fake.deleteAgentJobMutex.Lock() + defer fake.deleteAgentJobMutex.Unlock() + fake.DeleteAgentJobStub = nil + fake.deleteAgentJobReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) DeleteAgentJobReturnsOnCall(i int, result1 error) { + fake.deleteAgentJobMutex.Lock() + defer fake.deleteAgentJobMutex.Unlock() + fake.DeleteAgentJobStub = nil + if fake.deleteAgentJobReturnsOnCall == nil { + fake.deleteAgentJobReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.deleteAgentJobReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) ListAgentDispatches(arg1 context.Context, arg2 livekit.RoomName) ([]*livekit.AgentDispatch, error) { + fake.listAgentDispatchesMutex.Lock() + ret, specificReturn := fake.listAgentDispatchesReturnsOnCall[len(fake.listAgentDispatchesArgsForCall)] + fake.listAgentDispatchesArgsForCall = append(fake.listAgentDispatchesArgsForCall, struct { + arg1 context.Context + arg2 livekit.RoomName + }{arg1, arg2}) + stub := fake.ListAgentDispatchesStub + fakeReturns := fake.listAgentDispatchesReturns + fake.recordInvocation("ListAgentDispatches", []interface{}{arg1, arg2}) + fake.listAgentDispatchesMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeAgentStore) ListAgentDispatchesCallCount() int { + fake.listAgentDispatchesMutex.RLock() + defer fake.listAgentDispatchesMutex.RUnlock() + return len(fake.listAgentDispatchesArgsForCall) +} + +func (fake *FakeAgentStore) ListAgentDispatchesCalls(stub func(context.Context, livekit.RoomName) ([]*livekit.AgentDispatch, error)) { + fake.listAgentDispatchesMutex.Lock() + defer fake.listAgentDispatchesMutex.Unlock() + fake.ListAgentDispatchesStub = stub +} + +func (fake *FakeAgentStore) ListAgentDispatchesArgsForCall(i int) (context.Context, livekit.RoomName) { + fake.listAgentDispatchesMutex.RLock() + defer fake.listAgentDispatchesMutex.RUnlock() + argsForCall := fake.listAgentDispatchesArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeAgentStore) ListAgentDispatchesReturns(result1 []*livekit.AgentDispatch, result2 error) { + fake.listAgentDispatchesMutex.Lock() + defer fake.listAgentDispatchesMutex.Unlock() + fake.ListAgentDispatchesStub = nil + fake.listAgentDispatchesReturns = struct { + result1 []*livekit.AgentDispatch + result2 error + }{result1, result2} +} + +func (fake *FakeAgentStore) ListAgentDispatchesReturnsOnCall(i int, result1 []*livekit.AgentDispatch, result2 error) { + fake.listAgentDispatchesMutex.Lock() + defer fake.listAgentDispatchesMutex.Unlock() + fake.ListAgentDispatchesStub = nil + if fake.listAgentDispatchesReturnsOnCall == nil { + fake.listAgentDispatchesReturnsOnCall = make(map[int]struct { + result1 []*livekit.AgentDispatch + result2 error + }) + } + fake.listAgentDispatchesReturnsOnCall[i] = struct { + result1 []*livekit.AgentDispatch + result2 error + }{result1, result2} +} + +func (fake *FakeAgentStore) StoreAgentDispatch(arg1 context.Context, arg2 *livekit.AgentDispatch) error { + fake.storeAgentDispatchMutex.Lock() + ret, specificReturn := fake.storeAgentDispatchReturnsOnCall[len(fake.storeAgentDispatchArgsForCall)] + fake.storeAgentDispatchArgsForCall = append(fake.storeAgentDispatchArgsForCall, struct { + arg1 context.Context + arg2 *livekit.AgentDispatch + }{arg1, arg2}) + stub := fake.StoreAgentDispatchStub + fakeReturns := fake.storeAgentDispatchReturns + fake.recordInvocation("StoreAgentDispatch", []interface{}{arg1, arg2}) + fake.storeAgentDispatchMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeAgentStore) StoreAgentDispatchCallCount() int { + fake.storeAgentDispatchMutex.RLock() + defer fake.storeAgentDispatchMutex.RUnlock() + return len(fake.storeAgentDispatchArgsForCall) +} + +func (fake *FakeAgentStore) StoreAgentDispatchCalls(stub func(context.Context, *livekit.AgentDispatch) error) { + fake.storeAgentDispatchMutex.Lock() + defer fake.storeAgentDispatchMutex.Unlock() + fake.StoreAgentDispatchStub = stub +} + +func (fake *FakeAgentStore) StoreAgentDispatchArgsForCall(i int) (context.Context, *livekit.AgentDispatch) { + fake.storeAgentDispatchMutex.RLock() + defer fake.storeAgentDispatchMutex.RUnlock() + argsForCall := fake.storeAgentDispatchArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeAgentStore) StoreAgentDispatchReturns(result1 error) { + fake.storeAgentDispatchMutex.Lock() + defer fake.storeAgentDispatchMutex.Unlock() + fake.StoreAgentDispatchStub = nil + fake.storeAgentDispatchReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) StoreAgentDispatchReturnsOnCall(i int, result1 error) { + fake.storeAgentDispatchMutex.Lock() + defer fake.storeAgentDispatchMutex.Unlock() + fake.StoreAgentDispatchStub = nil + if fake.storeAgentDispatchReturnsOnCall == nil { + fake.storeAgentDispatchReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.storeAgentDispatchReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) StoreAgentJob(arg1 context.Context, arg2 *livekit.Job) error { + fake.storeAgentJobMutex.Lock() + ret, specificReturn := fake.storeAgentJobReturnsOnCall[len(fake.storeAgentJobArgsForCall)] + fake.storeAgentJobArgsForCall = append(fake.storeAgentJobArgsForCall, struct { + arg1 context.Context + arg2 *livekit.Job + }{arg1, arg2}) + stub := fake.StoreAgentJobStub + fakeReturns := fake.storeAgentJobReturns + fake.recordInvocation("StoreAgentJob", []interface{}{arg1, arg2}) + fake.storeAgentJobMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeAgentStore) StoreAgentJobCallCount() int { + fake.storeAgentJobMutex.RLock() + defer fake.storeAgentJobMutex.RUnlock() + return len(fake.storeAgentJobArgsForCall) +} + +func (fake *FakeAgentStore) StoreAgentJobCalls(stub func(context.Context, *livekit.Job) error) { + fake.storeAgentJobMutex.Lock() + defer fake.storeAgentJobMutex.Unlock() + fake.StoreAgentJobStub = stub +} + +func (fake *FakeAgentStore) StoreAgentJobArgsForCall(i int) (context.Context, *livekit.Job) { + fake.storeAgentJobMutex.RLock() + defer fake.storeAgentJobMutex.RUnlock() + argsForCall := fake.storeAgentJobArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeAgentStore) StoreAgentJobReturns(result1 error) { + fake.storeAgentJobMutex.Lock() + defer fake.storeAgentJobMutex.Unlock() + fake.StoreAgentJobStub = nil + fake.storeAgentJobReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) StoreAgentJobReturnsOnCall(i int, result1 error) { + fake.storeAgentJobMutex.Lock() + defer fake.storeAgentJobMutex.Unlock() + fake.StoreAgentJobStub = nil + if fake.storeAgentJobReturnsOnCall == nil { + fake.storeAgentJobReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.storeAgentJobReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeAgentStore) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.deleteAgentDispatchMutex.RLock() + defer fake.deleteAgentDispatchMutex.RUnlock() + fake.deleteAgentJobMutex.RLock() + defer fake.deleteAgentJobMutex.RUnlock() + fake.listAgentDispatchesMutex.RLock() + defer fake.listAgentDispatchesMutex.RUnlock() + fake.storeAgentDispatchMutex.RLock() + defer fake.storeAgentDispatchMutex.RUnlock() + fake.storeAgentJobMutex.RLock() + defer fake.storeAgentJobMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeAgentStore) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ service.AgentStore = new(FakeAgentStore) diff --git a/pkg/service/wire.go b/pkg/service/wire.go index c856c7384..75b3509a1 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -80,6 +80,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live NewRTCService, NewAgentService, agent.NewAgentClient, + getAgentStore, getSignalRelayConfig, NewDefaultSignalServer, routing.NewSignalClient, @@ -200,6 +201,17 @@ func getIngressStore(s ObjectStore) IngressStore { } } +func getAgentStore(s ObjectStore) AgentStore { + switch store := s.(type) { + case *RedisStore: + return store + case *LocalStore: + return store + default: + return nil + } +} + func getIngressConfig(conf *config.Config) *config.IngressConfig { return &conf.Ingress } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index de2ecf664..15f81061f 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -119,10 +119,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } clientConfigurationManager := createClientConfiguration() + agentStore := getAgentStore(objectStore) timedVersionGenerator := utils.NewDefaultTimedVersionGenerator() turnAuthHandler := NewTURNAuthHandler(keyProvider) forwardStats := createForwardStats(conf) - roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, client, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus, forwardStats) + roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, client, agentStore, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus, forwardStats) if err != nil { return nil, err } @@ -251,6 +252,17 @@ func getIngressStore(s ObjectStore) IngressStore { } } +func getAgentStore(s ObjectStore) AgentStore { + switch store := s.(type) { + case *RedisStore: + return store + case *LocalStore: + return store + default: + return nil + } +} + func getIngressConfig(conf *config.Config) *config.IngressConfig { return &conf.Ingress }