diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 98d040749..5e317c243 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -12,6 +12,7 @@ import ( "github.com/livekit/livekit-server/pkg/agent" "github.com/livekit/livekit-server/pkg/agent/testutils" + "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils/guid" @@ -20,6 +21,7 @@ import ( ) func TestAgent(t *testing.T) { + testAgentName := "test_agent" t.Run("dispatched jobs are assigned to a worker", func(t *testing.T) { bus := psrpc.NewLocalMessageBus() @@ -28,7 +30,7 @@ func TestAgent(t *testing.T) { t.Cleanup(server.Close) worker := server.SimulateAgentWorker() - worker.Register("test", livekit.JobType_JT_ROOM) + worker.Register(testAgentName, livekit.JobType_JT_ROOM) jobAssignments := worker.JobAssignments.Observe() job := &livekit.Job{ @@ -36,14 +38,19 @@ func TestAgent(t *testing.T) { DispatchId: guid.New(guid.AgentDispatchPrefix), Type: livekit.JobType_JT_ROOM, Room: &livekit.Room{}, - AgentName: "test", + AgentName: testAgentName, } - _, err := client.JobRequest(context.Background(), "test", agent.RoomAgentTopic, job) + _, err := client.JobRequest(context.Background(), testAgentName, agent.RoomAgentTopic, job) require.NoError(t, err) select { case a := <-jobAssignments.Events(): require.EqualValues(t, job.Id, a.Job.Id) + v, err := auth.ParseAPIToken(a.Token) + require.NoError(t, err) + claims, err := v.Verify(server.TestAPISecret) + require.NoError(t, err) + require.Equal(t, testAgentName, claims.Attributes[agent.AgentNameAttributeKey]) case <-time.After(time.Second): require.Fail(t, "job assignment timeout") } diff --git a/pkg/agent/testutils/server.go b/pkg/agent/testutils/server.go index c695afb4e..3eb2b0e16 100644 --- a/pkg/agent/testutils/server.go +++ b/pkg/agent/testutils/server.go @@ -33,6 +33,8 @@ type AgentService interface { type TestServer struct { AgentService + TestAPIKey string + TestAPISecret string } func NewTestServer(bus psrpc.MessageBus) *TestServer { @@ -46,7 +48,7 @@ func NewTestServer(bus psrpc.MessageBus) *TestServer { } func NewTestServerWithService(s AgentService) *TestServer { - return &TestServer{s} + return &TestServer{s, "test", "verysecretsecret"} } type SimulatedWorkerOptions struct { diff --git a/pkg/agent/worker.go b/pkg/agent/worker.go index a130f2af5..3b8ff4e2b 100644 --- a/pkg/agent/worker.go +++ b/pkg/agent/worker.go @@ -43,6 +43,8 @@ var ( ErrDuplicateJobAssignment = errors.New("duplicate job assignment") ) +const AgentNameAttributeKey = "lk.agent_name" + type WorkerProtocolVersion int const CurrentProtocol = 1 @@ -365,6 +367,11 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) (*livekit.JobS } job.State.ParticipantIdentity = res.ParticipantIdentity + attributes := res.ParticipantAttributes + if attributes == nil { + attributes = make(map[string]string) + } + attributes[AgentNameAttributeKey] = w.AgentName token, err := pagent.BuildAgentToken( w.apiKey, @@ -373,7 +380,7 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) (*livekit.JobS res.ParticipantIdentity, res.ParticipantName, res.ParticipantMetadata, - res.ParticipantAttributes, + attributes, w.Permissions, ) if err != nil { diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 0c62abbbf..3b8913f0c 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore) topicFormatter := rpc.NewTopicFormatter() - v, err := rpc.NewTypedRoomClient(clientParams) + roomClient, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err } - v2, err := rpc.NewTypedParticipantClient(clientParams) + participantClient, err := rpc.NewTypedParticipantClient(clientParams) if err != nil { return nil, err } - roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2) + roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient) if err != nil { return nil, err } - v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) + agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) if err != nil { return nil, err } - agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router) + agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router) egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(clientParams) @@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, router, telemetryService) - v4, err := rpc.NewTypedWHIPParticipantClient(clientParams) + whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams) if err != nil { return nil, err } - serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4) + serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient) if err != nil { return nil, err }