update protocol (#3031)

* update protocol

* test

* cleanup
This commit is contained in:
Paul Wells
2024-09-21 17:05:39 -07:00
committed by GitHub
parent 22c36ef423
commit 8428d5e62b
5 changed files with 86 additions and 54 deletions

6
go.mod
View File

@@ -19,8 +19,8 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598
github.com/livekit/protocol v1.22.1-0.20240920184753-71b9c184e5c8
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a
github.com/livekit/protocol v1.22.1-0.20240921065117-bc519801ab1f
github.com/livekit/psrpc v0.6.0
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.9.0
@@ -77,8 +77,6 @@ require (
github.com/docker/docker v27.1.1+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/eapache/channels v1.1.0 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
github.com/go-logr/logr v1.4.2 // indirect

12
go.sum
View File

@@ -58,10 +58,6 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/eapache/channels v1.1.0 h1:F1taHcn7/F0i8DYqKXJnyhJcVpp2kgFcNePxXtnyu4k=
github.com/eapache/channels v1.1.0/go.mod h1:jMm2qB5Ubtg9zLd+inMZd2/NUvXgzmWXsDaLyQIGfH0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elliotchance/orderedmap/v2 v2.4.0 h1:6tUmMwD9F998FNpwFxA5E6NQvSpk2PVw7RKsVq3+2Cw=
github.com/elliotchance/orderedmap/v2 v2.4.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
@@ -169,10 +165,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.22.1-0.20240920184753-71b9c184e5c8 h1:Tt/INVn5HOgyy/OknLzEK46sWDKRnQ+NvsjFkMZDbWc=
github.com/livekit/protocol v1.22.1-0.20240920184753-71b9c184e5c8/go.mod h1:AFuwk3+uIWFeO5ohKjx5w606Djl940+wktaZ441VoCI=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/protocol v1.22.1-0.20240921065117-bc519801ab1f h1:ZqNVFsfSJcE5mdlP5JMSTxx16s0/A8f90KoxXleSR18=
github.com/livekit/protocol v1.22.1-0.20240921065117-bc519801ab1f/go.mod h1:dhhlL/vzgcvdKroqEHaIwfO1vkX2MSjgEtYl2prGUKU=
github.com/livekit/psrpc v0.6.0 h1:bLf39yD2IP92/C7104Q2ZYqauUpcn+fioBeVtfhXe+E=
github.com/livekit/psrpc v0.6.0/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
github.com/mackerelio/go-osstat v0.2.5/go.mod h1:atxwWF+POUZcdtR1wnsUcQxTytoHG4uhl2AKKzrOajY=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=

View File

@@ -50,50 +50,52 @@ func TestAgent(t *testing.T) {
})
}
func TestAgentLoadBalancing(t *testing.T) {
batchJobCreate := func(batchSize int, totalJobs int, client rpc.AgentInternalClient, workers []*testutils.AgentWorker) <-chan struct{} {
var assigned atomic.Uint32
done := make(chan struct{})
for _, w := range workers {
assignments := w.JobAssignments.Observe()
go func() {
defer assignments.Stop()
for {
select {
case <-done:
case <-assignments.Events():
if assigned.Inc() == uint32(totalJobs) {
close(done)
}
func testBatchJobRequest(t require.TestingT, batchSize int, totalJobs int, client rpc.AgentInternalClient, workers []*testutils.AgentWorker) <-chan struct{} {
var assigned atomic.Uint32
done := make(chan struct{})
for _, w := range workers {
assignments := w.JobAssignments.Observe()
go func() {
defer assignments.Stop()
for {
select {
case <-done:
case <-assignments.Events():
if assigned.Inc() == uint32(totalJobs) {
close(done)
}
}
}()
}
var wg sync.WaitGroup
for i := 0; i < totalJobs; i += batchSize {
wg.Add(1)
go func(start int) {
defer wg.Done()
for j := start; j < start+batchSize && j < totalJobs; j++ {
job := &livekit.Job{
Id: guid.New(guid.AgentJobPrefix),
DispatchId: guid.New(guid.AgentDispatchPrefix),
Type: livekit.JobType_JT_ROOM,
Room: &livekit.Room{},
AgentName: "test",
}
_, err := client.JobRequest(context.Background(), "test", agent.RoomAgentTopic, job)
require.NoError(t, err)
}
}(i)
}
wg.Wait()
return done
}
}()
}
// wait for agent registration
time.Sleep(100 * time.Millisecond)
var wg sync.WaitGroup
for i := 0; i < totalJobs; i += batchSize {
wg.Add(1)
go func(start int) {
defer wg.Done()
for j := start; j < start+batchSize && j < totalJobs; j++ {
job := &livekit.Job{
Id: guid.New(guid.AgentJobPrefix),
DispatchId: guid.New(guid.AgentDispatchPrefix),
Type: livekit.JobType_JT_ROOM,
Room: &livekit.Room{},
AgentName: "test",
}
_, err := client.JobRequest(context.Background(), "test", agent.RoomAgentTopic, job)
require.NoError(t, err)
}
}(i)
}
wg.Wait()
return done
}
func TestAgentLoadBalancing(t *testing.T) {
t.Run("jobs are distributed normally with baseline worker load", func(t *testing.T) {
totalWorkers := 5
totalJobs := 100
@@ -101,6 +103,7 @@ func TestAgentLoadBalancing(t *testing.T) {
bus := psrpc.NewLocalMessageBus()
client := must.Get(rpc.NewAgentInternalClient(bus))
t.Cleanup(client.Close)
server := testutils.NewTestServer(bus)
t.Cleanup(server.Close)
@@ -114,7 +117,7 @@ func TestAgentLoadBalancing(t *testing.T) {
}
select {
case <-batchJobCreate(10, totalJobs, client, agents):
case <-testBatchJobRequest(t, 10, totalJobs, client, agents):
case <-time.After(time.Second):
require.Fail(t, "job assignment timeout")
}
@@ -139,6 +142,7 @@ func TestAgentLoadBalancing(t *testing.T) {
bus := psrpc.NewLocalMessageBus()
client := must.Get(rpc.NewAgentInternalClient(bus))
t.Cleanup(client.Close)
server := testutils.NewTestServer(bus)
t.Cleanup(server.Close)
@@ -155,7 +159,7 @@ func TestAgentLoadBalancing(t *testing.T) {
}
select {
case <-batchJobCreate(1, totalJobs, client, agents):
case <-testBatchJobRequest(t, 1, totalJobs, client, agents):
case <-time.After(time.Second):
require.Fail(t, "job assignment timeout")
}

View File

@@ -59,3 +59,7 @@ func (c *roomManagerClient) CreateRoom(ctx context.Context, nodeID livekit.NodeI
Timeout: c.config.CreateRoomTimeout,
})))...)
}
func (c *roomManagerClient) Close() {
c.client.Close()
}

View File

@@ -11,6 +11,10 @@ import (
)
type FakeRoomManagerClient struct {
CloseStub func()
closeMutex sync.RWMutex
closeArgsForCall []struct {
}
CreateRoomStub func(context.Context, livekit.NodeID, *livekit.CreateRoomRequest, ...psrpc.RequestOption) (*livekit.Room, error)
createRoomMutex sync.RWMutex
createRoomArgsForCall []struct {
@@ -31,6 +35,30 @@ type FakeRoomManagerClient struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeRoomManagerClient) Close() {
fake.closeMutex.Lock()
fake.closeArgsForCall = append(fake.closeArgsForCall, struct {
}{})
stub := fake.CloseStub
fake.recordInvocation("Close", []interface{}{})
fake.closeMutex.Unlock()
if stub != nil {
fake.CloseStub()
}
}
func (fake *FakeRoomManagerClient) CloseCallCount() int {
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
return len(fake.closeArgsForCall)
}
func (fake *FakeRoomManagerClient) CloseCalls(stub func()) {
fake.closeMutex.Lock()
defer fake.closeMutex.Unlock()
fake.CloseStub = stub
}
func (fake *FakeRoomManagerClient) CreateRoom(arg1 context.Context, arg2 livekit.NodeID, arg3 *livekit.CreateRoomRequest, arg4 ...psrpc.RequestOption) (*livekit.Room, error) {
fake.createRoomMutex.Lock()
ret, specificReturn := fake.createRoomReturnsOnCall[len(fake.createRoomArgsForCall)]
@@ -101,6 +129,8 @@ func (fake *FakeRoomManagerClient) CreateRoomReturnsOnCall(i int, result1 *livek
func (fake *FakeRoomManagerClient) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
fake.createRoomMutex.RLock()
defer fake.createRoomMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}