From 5dda31d42dc0bcf7c474add7305fd1290f82493c Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Thu, 9 Nov 2023 01:31:00 -0800 Subject: [PATCH] register one job per test agent worker (#2232) --- test/agent.go | 50 +++++++++++++++++++++++----------------------- test/agent_test.go | 23 +++++++++++++++------ 2 files changed, 42 insertions(+), 31 deletions(-) diff --git a/test/agent.go b/test/agent.go index 76f608314..65b4952b6 100644 --- a/test/agent.go +++ b/test/agent.go @@ -48,38 +48,38 @@ func newAgentClient(token string) (*agentClient, error) { }, nil } -func (c *agentClient) Run() error { +func (c *agentClient) Run(jobType livekit.JobType) (err error) { go c.read() workerID := utils.NewGuid("W_") - if err := c.write(&livekit.WorkerMessage{ - Message: &livekit.WorkerMessage_Register{ - Register: &livekit.RegisterWorkerRequest{ - Type: livekit.JobType_JT_ROOM, - WorkerId: workerID, - Version: "version", - Name: "name", + switch jobType { + case livekit.JobType_JT_ROOM: + err = c.write(&livekit.WorkerMessage{ + Message: &livekit.WorkerMessage_Register{ + Register: &livekit.RegisterWorkerRequest{ + Type: livekit.JobType_JT_ROOM, + WorkerId: workerID, + Version: "version", + Name: "name", + }, }, - }, - }); err != nil { - return err + }) + + case livekit.JobType_JT_PUBLISHER: + err = c.write(&livekit.WorkerMessage{ + Message: &livekit.WorkerMessage_Register{ + Register: &livekit.RegisterWorkerRequest{ + Type: livekit.JobType_JT_PUBLISHER, + WorkerId: workerID, + Version: "version", + Name: "name", + }, + }, + }) } - if err := c.write(&livekit.WorkerMessage{ - Message: &livekit.WorkerMessage_Register{ - Register: &livekit.RegisterWorkerRequest{ - Type: livekit.JobType_JT_PUBLISHER, - WorkerId: workerID, - Version: "version", - Name: "name", - }, - }, - }); err != nil { - return err - } - - return nil + return err } func (c *agentClient) read() { diff --git a/test/agent_test.go b/test/agent_test.go index 0a45fa9ae..a8825577b 100644 --- a/test/agent_test.go +++ b/test/agent_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/livekit" ) func TestAgents(t *testing.T) { @@ -31,15 +32,25 @@ func TestAgents(t *testing.T) { require.NoError(t, err) ac2, err := newAgentClient(agentToken()) require.NoError(t, err) + ac3, err := newAgentClient(agentToken()) + require.NoError(t, err) + ac4, err := newAgentClient(agentToken()) + require.NoError(t, err) defer ac1.close() defer ac2.close() - ac1.Run() - ac2.Run() + defer ac3.close() + defer ac4.close() + ac1.Run(livekit.JobType_JT_ROOM) + ac2.Run(livekit.JobType_JT_ROOM) + ac3.Run(livekit.JobType_JT_PUBLISHER) + ac4.Run(livekit.JobType_JT_PUBLISHER) time.Sleep(time.Second * 3) - require.Equal(t, int32(2), ac1.registered.Load()) - require.Equal(t, int32(2), ac2.registered.Load()) + require.Equal(t, int32(1), ac1.registered.Load()) + require.Equal(t, int32(1), ac2.registered.Load()) + require.Equal(t, int32(1), ac3.registered.Load()) + require.Equal(t, int32(1), ac4.registered.Load()) c1 := createRTCClient("c1", defaultServerPort, nil) c2 := createRTCClient("c2", defaultServerPort, nil) @@ -56,7 +67,7 @@ func TestAgents(t *testing.T) { time.Sleep(time.Second * 3) require.Equal(t, int32(1), ac1.roomJobs.Load()+ac2.roomJobs.Load()) - require.Equal(t, int32(1), ac1.participantJobs.Load()+ac2.participantJobs.Load()) + require.Equal(t, int32(1), ac3.participantJobs.Load()+ac4.participantJobs.Load()) // publish 2 tracks t3, err := c2.AddStaticTrack("audio/opus", "audio", "webcam") @@ -69,7 +80,7 @@ func TestAgents(t *testing.T) { time.Sleep(time.Second * 3) require.Equal(t, int32(1), ac1.roomJobs.Load()+ac2.roomJobs.Load()) - require.Equal(t, int32(2), ac1.participantJobs.Load()+ac2.participantJobs.Load()) + require.Equal(t, int32(2), ac3.participantJobs.Load()+ac4.participantJobs.Load()) } func agentToken() string {