diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index 735821b6c..76e644507 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -128,6 +128,12 @@ func (s *StatsWorker) Close() bool { return ok } +func (s *StatsWorker) Closed() bool { + s.lock.Lock() + defer s.lock.Unlock() + return !s.closedAt.IsZero() +} + func (s *StatsWorker) collectStats( ts *timestamppb.Timestamp, streamType livekit.StreamType, diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 8af2fb6c4..151f7e814 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -160,7 +160,9 @@ func (t *telemetryService) FlushStats() { if reap != nil { t.workersMu.Lock() for reap != nil { - delete(t.workers, reap.participantID) + if reap == t.workers[reap.participantID] { + delete(t.workers, reap.participantID) + } reap = reap.next } t.workersMu.Unlock() @@ -195,7 +197,7 @@ func (t *telemetryService) getOrCreateWorker( t.workersMu.Lock() defer t.workersMu.Unlock() - if worker, ok := t.workers[participantID]; ok { + if worker, ok := t.workers[participantID]; ok && !worker.Closed() { return worker, true }