From afda860162bbb35c68ce337bc5d1ffcb04aaa963 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Thu, 18 Jul 2024 03:37:45 -0700 Subject: [PATCH] prevent race in telemetry worker cleanup (#2879) --- pkg/telemetry/statsworker.go | 6 ++++++ pkg/telemetry/telemetryservice.go | 6 ++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index 735821b6c..76e644507 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -128,6 +128,12 @@ func (s *StatsWorker) Close() bool { return ok } +func (s *StatsWorker) Closed() bool { + s.lock.Lock() + defer s.lock.Unlock() + return !s.closedAt.IsZero() +} + func (s *StatsWorker) collectStats( ts *timestamppb.Timestamp, streamType livekit.StreamType, diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 8af2fb6c4..151f7e814 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -160,7 +160,9 @@ func (t *telemetryService) FlushStats() { if reap != nil { t.workersMu.Lock() for reap != nil { - delete(t.workers, reap.participantID) + if reap == t.workers[reap.participantID] { + delete(t.workers, reap.participantID) + } reap = reap.next } t.workersMu.Unlock() @@ -195,7 +197,7 @@ func (t *telemetryService) getOrCreateWorker( t.workersMu.Lock() defer t.workersMu.Unlock() - if worker, ok := t.workers[participantID]; ok { + if worker, ok := t.workers[participantID]; ok && !worker.Closed() { return worker, true }