adds a test to ensure agent worker errors cause disconnection (#4273)

This commit is contained in:
David Zhao
2026-01-31 14:19:19 -08:00
committed by GitHub
parent 76a41a7a8c
commit 88facc0235
2 changed files with 43 additions and 8 deletions
+30
View File
@@ -188,3 +188,33 @@ func TestAgentLoadBalancing(t *testing.T) {
}
})
}
func TestConnectionClosedOnDispatchError(t *testing.T) {
t.Run("connection closed when unknown message type received", func(t *testing.T) {
bus := psrpc.NewLocalMessageBus()
server := testutils.NewTestServer(bus)
t.Cleanup(server.Close)
// register agent
worker := server.SimulateAgentWorker()
worker.Register("test_agent", livekit.JobType_JT_ROOM)
responses := worker.RegisterWorkerResponses.Observe()
select {
case <-responses.Events():
// registered
case <-time.After(time.Second):
require.Fail(t, "registration timeout")
}
responses.Stop()
// send invalid message (nil Message field triggers ErrUnknownWorkerSignal)
worker.SendMessage(&livekit.WorkerMessage{Message: nil})
select {
case <-worker.Closed():
// connection closed
case <-time.After(time.Second):
require.Fail(t, "connection should have been closed after dispatch error")
}
})
}
+13 -8
View File
@@ -206,6 +206,11 @@ func (w *AgentWorker) Close() error {
return nil
}
// Closed returns a channel that is closed when the connection is closed by the server
func (w *AgentWorker) Closed() <-chan struct{} {
return w.fuse.Watch()
}
func (w *AgentWorker) SetReadDeadline(t time.Time) error {
w.mu.Lock()
defer w.mu.Unlock()
@@ -317,7 +322,7 @@ func (w *AgentWorker) handlePong(m *livekit.WorkerPong) {
w.WorkerPongs.Emit(m)
}
func (w *AgentWorker) sendMessage(m *livekit.WorkerMessage) {
func (w *AgentWorker) SendMessage(m *livekit.WorkerMessage) {
select {
case <-w.fuse.Watch():
case w.workerMessages <- m:
@@ -325,43 +330,43 @@ func (w *AgentWorker) sendMessage(m *livekit.WorkerMessage) {
}
func (w *AgentWorker) SendRegister(m *livekit.RegisterWorkerRequest) {
w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_Register{
w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_Register{
Register: m,
}})
}
func (w *AgentWorker) SendAvailability(m *livekit.AvailabilityResponse) {
w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_Availability{
w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_Availability{
Availability: m,
}})
}
func (w *AgentWorker) SendUpdateWorker(m *livekit.UpdateWorkerStatus) {
w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_UpdateWorker{
w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_UpdateWorker{
UpdateWorker: m,
}})
}
func (w *AgentWorker) SendUpdateJob(m *livekit.UpdateJobStatus) {
w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_UpdateJob{
w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_UpdateJob{
UpdateJob: m,
}})
}
func (w *AgentWorker) SendPing(m *livekit.WorkerPing) {
w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_Ping{
w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_Ping{
Ping: m,
}})
}
func (w *AgentWorker) SendSimulateJob(m *livekit.SimulateJobRequest) {
w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_SimulateJob{
w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_SimulateJob{
SimulateJob: m,
}})
}
func (w *AgentWorker) SendMigrateJob(m *livekit.MigrateJobRequest) {
w.sendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_MigrateJob{
w.SendMessage(&livekit.WorkerMessage{Message: &livekit.WorkerMessage_MigrateJob{
MigrateJob: m,
}})
}