register one job per test agent worker (#2232)

This commit is contained in:
Paul Wells
2023-11-09 01:31:00 -08:00
committed by GitHub
parent aa797c749c
commit 5dda31d42d
2 changed files with 42 additions and 31 deletions

View File

@@ -48,38 +48,38 @@ func newAgentClient(token string) (*agentClient, error) {
}, nil
}
func (c *agentClient) Run() error {
func (c *agentClient) Run(jobType livekit.JobType) (err error) {
go c.read()
workerID := utils.NewGuid("W_")
if err := c.write(&livekit.WorkerMessage{
Message: &livekit.WorkerMessage_Register{
Register: &livekit.RegisterWorkerRequest{
Type: livekit.JobType_JT_ROOM,
WorkerId: workerID,
Version: "version",
Name: "name",
switch jobType {
case livekit.JobType_JT_ROOM:
err = c.write(&livekit.WorkerMessage{
Message: &livekit.WorkerMessage_Register{
Register: &livekit.RegisterWorkerRequest{
Type: livekit.JobType_JT_ROOM,
WorkerId: workerID,
Version: "version",
Name: "name",
},
},
},
}); err != nil {
return err
})
case livekit.JobType_JT_PUBLISHER:
err = c.write(&livekit.WorkerMessage{
Message: &livekit.WorkerMessage_Register{
Register: &livekit.RegisterWorkerRequest{
Type: livekit.JobType_JT_PUBLISHER,
WorkerId: workerID,
Version: "version",
Name: "name",
},
},
})
}
if err := c.write(&livekit.WorkerMessage{
Message: &livekit.WorkerMessage_Register{
Register: &livekit.RegisterWorkerRequest{
Type: livekit.JobType_JT_PUBLISHER,
WorkerId: workerID,
Version: "version",
Name: "name",
},
},
}); err != nil {
return err
}
return nil
return err
}
func (c *agentClient) read() {

View File

@@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
)
func TestAgents(t *testing.T) {
@@ -31,15 +32,25 @@ func TestAgents(t *testing.T) {
require.NoError(t, err)
ac2, err := newAgentClient(agentToken())
require.NoError(t, err)
ac3, err := newAgentClient(agentToken())
require.NoError(t, err)
ac4, err := newAgentClient(agentToken())
require.NoError(t, err)
defer ac1.close()
defer ac2.close()
ac1.Run()
ac2.Run()
defer ac3.close()
defer ac4.close()
ac1.Run(livekit.JobType_JT_ROOM)
ac2.Run(livekit.JobType_JT_ROOM)
ac3.Run(livekit.JobType_JT_PUBLISHER)
ac4.Run(livekit.JobType_JT_PUBLISHER)
time.Sleep(time.Second * 3)
require.Equal(t, int32(2), ac1.registered.Load())
require.Equal(t, int32(2), ac2.registered.Load())
require.Equal(t, int32(1), ac1.registered.Load())
require.Equal(t, int32(1), ac2.registered.Load())
require.Equal(t, int32(1), ac3.registered.Load())
require.Equal(t, int32(1), ac4.registered.Load())
c1 := createRTCClient("c1", defaultServerPort, nil)
c2 := createRTCClient("c2", defaultServerPort, nil)
@@ -56,7 +67,7 @@ func TestAgents(t *testing.T) {
time.Sleep(time.Second * 3)
require.Equal(t, int32(1), ac1.roomJobs.Load()+ac2.roomJobs.Load())
require.Equal(t, int32(1), ac1.participantJobs.Load()+ac2.participantJobs.Load())
require.Equal(t, int32(1), ac3.participantJobs.Load()+ac4.participantJobs.Load())
// publish 2 tracks
t3, err := c2.AddStaticTrack("audio/opus", "audio", "webcam")
@@ -69,7 +80,7 @@ func TestAgents(t *testing.T) {
time.Sleep(time.Second * 3)
require.Equal(t, int32(1), ac1.roomJobs.Load()+ac2.roomJobs.Load())
require.Equal(t, int32(2), ac1.participantJobs.Load()+ac2.participantJobs.Load())
require.Equal(t, int32(2), ac3.participantJobs.Load()+ac4.participantJobs.Load())
}
func agentToken() string {