diff --git a/go.mod b/go.mod index a5e470bf9..08c5ddbf3 100644 --- a/go.mod +++ b/go.mod @@ -19,8 +19,8 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.22.1-0.20240920184753-71b9c184e5c8 - github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a + github.com/livekit/protocol v1.22.1-0.20240921065117-bc519801ab1f + github.com/livekit/psrpc v0.6.0 github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.9.0 @@ -77,8 +77,6 @@ require ( github.com/docker/docker v27.1.1+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect - github.com/eapache/channels v1.1.0 // indirect - github.com/eapache/queue v1.1.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-jose/go-jose/v3 v3.0.3 // indirect github.com/go-logr/logr v1.4.2 // indirect diff --git a/go.sum b/go.sum index 8b6a85f46..84d50f400 100644 --- a/go.sum +++ b/go.sum @@ -58,10 +58,6 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/eapache/channels v1.1.0 h1:F1taHcn7/F0i8DYqKXJnyhJcVpp2kgFcNePxXtnyu4k= -github.com/eapache/channels v1.1.0/go.mod h1:jMm2qB5Ubtg9zLd+inMZd2/NUvXgzmWXsDaLyQIGfH0= -github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= -github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elliotchance/orderedmap/v2 v2.4.0 h1:6tUmMwD9F998FNpwFxA5E6NQvSpk2PVw7RKsVq3+2Cw= github.com/elliotchance/orderedmap/v2 v2.4.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= @@ -169,10 +165,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.22.1-0.20240920184753-71b9c184e5c8 h1:Tt/INVn5HOgyy/OknLzEK46sWDKRnQ+NvsjFkMZDbWc= -github.com/livekit/protocol v1.22.1-0.20240920184753-71b9c184e5c8/go.mod h1:AFuwk3+uIWFeO5ohKjx5w606Djl940+wktaZ441VoCI= -github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= -github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= +github.com/livekit/protocol v1.22.1-0.20240921065117-bc519801ab1f h1:ZqNVFsfSJcE5mdlP5JMSTxx16s0/A8f90KoxXleSR18= +github.com/livekit/protocol v1.22.1-0.20240921065117-bc519801ab1f/go.mod h1:dhhlL/vzgcvdKroqEHaIwfO1vkX2MSjgEtYl2prGUKU= +github.com/livekit/psrpc v0.6.0 h1:bLf39yD2IP92/C7104Q2ZYqauUpcn+fioBeVtfhXe+E= +github.com/livekit/psrpc v0.6.0/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= github.com/mackerelio/go-osstat v0.2.5/go.mod h1:atxwWF+POUZcdtR1wnsUcQxTytoHG4uhl2AKKzrOajY= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 079cf8e02..98d040749 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -50,50 +50,52 @@ func TestAgent(t *testing.T) { }) } -func TestAgentLoadBalancing(t *testing.T) { - - batchJobCreate := func(batchSize int, totalJobs int, client rpc.AgentInternalClient, workers []*testutils.AgentWorker) <-chan struct{} { - var assigned atomic.Uint32 - done := make(chan struct{}) - for _, w := range workers { - assignments := w.JobAssignments.Observe() - go func() { - defer assignments.Stop() - for { - select { - case <-done: - case <-assignments.Events(): - if assigned.Inc() == uint32(totalJobs) { - close(done) - } +func testBatchJobRequest(t require.TestingT, batchSize int, totalJobs int, client rpc.AgentInternalClient, workers []*testutils.AgentWorker) <-chan struct{} { + var assigned atomic.Uint32 + done := make(chan struct{}) + for _, w := range workers { + assignments := w.JobAssignments.Observe() + go func() { + defer assignments.Stop() + for { + select { + case <-done: + case <-assignments.Events(): + if assigned.Inc() == uint32(totalJobs) { + close(done) } } - }() - } - - var wg sync.WaitGroup - for i := 0; i < totalJobs; i += batchSize { - wg.Add(1) - go func(start int) { - defer wg.Done() - for j := start; j < start+batchSize && j < totalJobs; j++ { - job := &livekit.Job{ - Id: guid.New(guid.AgentJobPrefix), - DispatchId: guid.New(guid.AgentDispatchPrefix), - Type: livekit.JobType_JT_ROOM, - Room: &livekit.Room{}, - AgentName: "test", - } - _, err := client.JobRequest(context.Background(), "test", agent.RoomAgentTopic, job) - require.NoError(t, err) - } - }(i) - } - wg.Wait() - - return done + } + }() } + // wait for agent registration + time.Sleep(100 * time.Millisecond) + + var wg sync.WaitGroup + for i := 0; i < totalJobs; i += batchSize { + wg.Add(1) + go func(start int) { + defer wg.Done() + for j := start; j < start+batchSize && j < totalJobs; j++ { + job := &livekit.Job{ + Id: guid.New(guid.AgentJobPrefix), + DispatchId: guid.New(guid.AgentDispatchPrefix), + Type: livekit.JobType_JT_ROOM, + Room: &livekit.Room{}, + AgentName: "test", + } + _, err := client.JobRequest(context.Background(), "test", agent.RoomAgentTopic, job) + require.NoError(t, err) + } + }(i) + } + wg.Wait() + + return done +} + +func TestAgentLoadBalancing(t *testing.T) { t.Run("jobs are distributed normally with baseline worker load", func(t *testing.T) { totalWorkers := 5 totalJobs := 100 @@ -101,6 +103,7 @@ func TestAgentLoadBalancing(t *testing.T) { bus := psrpc.NewLocalMessageBus() client := must.Get(rpc.NewAgentInternalClient(bus)) + t.Cleanup(client.Close) server := testutils.NewTestServer(bus) t.Cleanup(server.Close) @@ -114,7 +117,7 @@ func TestAgentLoadBalancing(t *testing.T) { } select { - case <-batchJobCreate(10, totalJobs, client, agents): + case <-testBatchJobRequest(t, 10, totalJobs, client, agents): case <-time.After(time.Second): require.Fail(t, "job assignment timeout") } @@ -139,6 +142,7 @@ func TestAgentLoadBalancing(t *testing.T) { bus := psrpc.NewLocalMessageBus() client := must.Get(rpc.NewAgentInternalClient(bus)) + t.Cleanup(client.Close) server := testutils.NewTestServer(bus) t.Cleanup(server.Close) @@ -155,7 +159,7 @@ func TestAgentLoadBalancing(t *testing.T) { } select { - case <-batchJobCreate(1, totalJobs, client, agents): + case <-testBatchJobRequest(t, 1, totalJobs, client, agents): case <-time.After(time.Second): require.Fail(t, "job assignment timeout") } diff --git a/pkg/routing/roommanager.go b/pkg/routing/roommanager.go index 0df0846c3..327c5806d 100644 --- a/pkg/routing/roommanager.go +++ b/pkg/routing/roommanager.go @@ -59,3 +59,7 @@ func (c *roomManagerClient) CreateRoom(ctx context.Context, nodeID livekit.NodeI Timeout: c.config.CreateRoomTimeout, })))...) } + +func (c *roomManagerClient) Close() { + c.client.Close() +} diff --git a/pkg/routing/routingfakes/fake_room_manager_client.go b/pkg/routing/routingfakes/fake_room_manager_client.go index a1df758d3..d164ac1e3 100644 --- a/pkg/routing/routingfakes/fake_room_manager_client.go +++ b/pkg/routing/routingfakes/fake_room_manager_client.go @@ -11,6 +11,10 @@ import ( ) type FakeRoomManagerClient struct { + CloseStub func() + closeMutex sync.RWMutex + closeArgsForCall []struct { + } CreateRoomStub func(context.Context, livekit.NodeID, *livekit.CreateRoomRequest, ...psrpc.RequestOption) (*livekit.Room, error) createRoomMutex sync.RWMutex createRoomArgsForCall []struct { @@ -31,6 +35,30 @@ type FakeRoomManagerClient struct { invocationsMutex sync.RWMutex } +func (fake *FakeRoomManagerClient) Close() { + fake.closeMutex.Lock() + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + fake.CloseStub() + } +} + +func (fake *FakeRoomManagerClient) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *FakeRoomManagerClient) CloseCalls(stub func()) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + func (fake *FakeRoomManagerClient) CreateRoom(arg1 context.Context, arg2 livekit.NodeID, arg3 *livekit.CreateRoomRequest, arg4 ...psrpc.RequestOption) (*livekit.Room, error) { fake.createRoomMutex.Lock() ret, specificReturn := fake.createRoomReturnsOnCall[len(fake.createRoomArgsForCall)] @@ -101,6 +129,8 @@ func (fake *FakeRoomManagerClient) CreateRoomReturnsOnCall(i int, result1 *livek func (fake *FakeRoomManagerClient) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() fake.createRoomMutex.RLock() defer fake.createRoomMutex.RUnlock() copiedInvocations := map[string][][]interface{}{}