Include agent_name as a participant attribute (#3914)

This commit is contained in:
David Zhao
2025-09-10 21:29:55 -07:00
committed by GitHub
parent 782a35e801
commit 5f561b4ff1
4 changed files with 28 additions and 12 deletions
+10 -3
View File
@@ -12,6 +12,7 @@ import (
"github.com/livekit/livekit-server/pkg/agent"
"github.com/livekit/livekit-server/pkg/agent/testutils"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils/guid"
@@ -20,6 +21,7 @@ import (
)
func TestAgent(t *testing.T) {
testAgentName := "test_agent"
t.Run("dispatched jobs are assigned to a worker", func(t *testing.T) {
bus := psrpc.NewLocalMessageBus()
@@ -28,7 +30,7 @@ func TestAgent(t *testing.T) {
t.Cleanup(server.Close)
worker := server.SimulateAgentWorker()
worker.Register("test", livekit.JobType_JT_ROOM)
worker.Register(testAgentName, livekit.JobType_JT_ROOM)
jobAssignments := worker.JobAssignments.Observe()
job := &livekit.Job{
@@ -36,14 +38,19 @@ func TestAgent(t *testing.T) {
DispatchId: guid.New(guid.AgentDispatchPrefix),
Type: livekit.JobType_JT_ROOM,
Room: &livekit.Room{},
AgentName: "test",
AgentName: testAgentName,
}
_, err := client.JobRequest(context.Background(), "test", agent.RoomAgentTopic, job)
_, err := client.JobRequest(context.Background(), testAgentName, agent.RoomAgentTopic, job)
require.NoError(t, err)
select {
case a := <-jobAssignments.Events():
require.EqualValues(t, job.Id, a.Job.Id)
v, err := auth.ParseAPIToken(a.Token)
require.NoError(t, err)
claims, err := v.Verify(server.TestAPISecret)
require.NoError(t, err)
require.Equal(t, testAgentName, claims.Attributes[agent.AgentNameAttributeKey])
case <-time.After(time.Second):
require.Fail(t, "job assignment timeout")
}
+3 -1
View File
@@ -33,6 +33,8 @@ type AgentService interface {
type TestServer struct {
AgentService
TestAPIKey string
TestAPISecret string
}
func NewTestServer(bus psrpc.MessageBus) *TestServer {
@@ -46,7 +48,7 @@ func NewTestServer(bus psrpc.MessageBus) *TestServer {
}
func NewTestServerWithService(s AgentService) *TestServer {
return &TestServer{s}
return &TestServer{s, "test", "verysecretsecret"}
}
type SimulatedWorkerOptions struct {
+8 -1
View File
@@ -43,6 +43,8 @@ var (
ErrDuplicateJobAssignment = errors.New("duplicate job assignment")
)
const AgentNameAttributeKey = "lk.agent_name"
type WorkerProtocolVersion int
const CurrentProtocol = 1
@@ -365,6 +367,11 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) (*livekit.JobS
}
job.State.ParticipantIdentity = res.ParticipantIdentity
attributes := res.ParticipantAttributes
if attributes == nil {
attributes = make(map[string]string)
}
attributes[AgentNameAttributeKey] = w.AgentName
token, err := pagent.BuildAgentToken(
w.apiKey,
@@ -373,7 +380,7 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) (*livekit.JobS
res.ParticipantIdentity,
res.ParticipantName,
res.ParticipantMetadata,
res.ParticipantAttributes,
attributes,
w.Permissions,
)
if err != nil {
+7 -7
View File
@@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore)
topicFormatter := rpc.NewTopicFormatter()
v, err := rpc.NewTypedRoomClient(clientParams)
roomClient, err := rpc.NewTypedRoomClient(clientParams)
if err != nil {
return nil, err
}
v2, err := rpc.NewTypedParticipantClient(clientParams)
participantClient, err := rpc.NewTypedParticipantClient(clientParams)
if err != nil {
return nil, err
}
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2)
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
if err != nil {
return nil, err
}
v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
if err != nil {
return nil, err
}
agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router)
agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router)
egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService)
ingressConfig := getIngressConfig(conf)
ingressClient, err := rpc.NewIngressClient(clientParams)
@@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, router, telemetryService)
v4, err := rpc.NewTypedWHIPParticipantClient(clientParams)
whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams)
if err != nil {
return nil, err
}
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4)
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient)
if err != nil {
return nil, err
}