From 88facc0235cce247cc2a7001cb43ff36fa410358 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sat, 31 Jan 2026 14:19:19 -0800 Subject: [PATCH] adds a test to ensure agent worker errors cause disconnection (#4273) --- pkg/agent/agent_test.go | 30 ++++++++++++++++++++++++++++++ pkg/agent/testutils/server.go | 21 +++++++++++++-------- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 40ae7a781..447ca798e 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -188,3 +188,33 @@ func TestAgentLoadBalancing(t *testing.T) { } }) } + +func TestConnectionClosedOnDispatchError(t *testing.T) { + t.Run("connection closed when unknown message type received", func(t *testing.T) { + bus := psrpc.NewLocalMessageBus() + server := testutils.NewTestServer(bus) + t.Cleanup(server.Close) + + // register agent + worker := server.SimulateAgentWorker() + worker.Register("test_agent", livekit.JobType_JT_ROOM) + responses := worker.RegisterWorkerResponses.Observe() + select { + case <-responses.Events(): + // registered + case <-time.After(time.Second): + require.Fail(t, "registration timeout") + } + responses.Stop() + + // send invalid message (nil Message field triggers ErrUnknownWorkerSignal) + worker.SendMessage(&livekit.WorkerMessage{Message: nil}) + + select { + case <-worker.Closed(): + // connection closed + case <-time.After(time.Second): + require.Fail(t, "connection should have been closed after dispatch error") + } + }) +} diff --git a/pkg/agent/testutils/server.go b/pkg/agent/testutils/server.go index 3eb2b0e16..31a12c5ed 100644 --- a/pkg/agent/testutils/server.go +++ b/pkg/agent/testutils/server.go @@ -206,6 +206,11 @@ func (w *AgentWorker) Close() error { return nil } +// Closed returns a channel that is closed when the connection is closed by the server +func (w *AgentWorker) Closed() <-chan struct{} { + return w.fuse.Watch() +} + func (w *AgentWorker) SetReadDeadline(t time.Time) error { w.mu.Lock() defer w.mu.Unlock() @@ -317,7 +322,7 @@ func (w *AgentWorker) handlePong(m *livekit.WorkerPong) { w.WorkerPongs.Emit(m) } -func (w *AgentWorker) sendMessage(m *livekit.WorkerMessage) { +func (w *AgentWorker) SendMessage(m *livekit.WorkerMessage) { select { case <-w.fuse.Watch(): case w.workerMessages <- m: @@ -325,43 +330,43 @@ func (w *AgentWorker) sendMessage(m *livekit.WorkerMessage) { } func (w *AgentWorker) SendRegister(m *livekit.RegisterWorkerRequest) { - w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_Register{ + w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_Register{ Register: m, }}) } func (w *AgentWorker) SendAvailability(m *livekit.AvailabilityResponse) { - w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_Availability{ + w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_Availability{ Availability: m, }}) } func (w *AgentWorker) SendUpdateWorker(m *livekit.UpdateWorkerStatus) { - w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_UpdateWorker{ + w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_UpdateWorker{ UpdateWorker: m, }}) } func (w *AgentWorker) SendUpdateJob(m *livekit.UpdateJobStatus) { - w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_UpdateJob{ + w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_UpdateJob{ UpdateJob: m, }}) } func (w *AgentWorker) SendPing(m *livekit.WorkerPing) { - w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_Ping{ + w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_Ping{ Ping: m, }}) } func (w *AgentWorker) SendSimulateJob(m *livekit.SimulateJobRequest) { - w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_SimulateJob{ + w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_SimulateJob{ SimulateJob: m, }}) } func (w *AgentWorker) SendMigrateJob(m *livekit.MigrateJobRequest) { - w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_MigrateJob{ + w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_MigrateJob{ MigrateJob: m, }}) }