mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 00:29:52 +00:00
prevent race in telemetry worker cleanup (#2879)
This commit is contained in:
@@ -128,6 +128,12 @@ func (s *StatsWorker) Close() bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
func (s *StatsWorker) Closed() bool {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
return !s.closedAt.IsZero()
|
||||
}
|
||||
|
||||
func (s *StatsWorker) collectStats(
|
||||
ts *timestamppb.Timestamp,
|
||||
streamType livekit.StreamType,
|
||||
|
||||
@@ -160,7 +160,9 @@ func (t *telemetryService) FlushStats() {
|
||||
if reap != nil {
|
||||
t.workersMu.Lock()
|
||||
for reap != nil {
|
||||
delete(t.workers, reap.participantID)
|
||||
if reap == t.workers[reap.participantID] {
|
||||
delete(t.workers, reap.participantID)
|
||||
}
|
||||
reap = reap.next
|
||||
}
|
||||
t.workersMu.Unlock()
|
||||
@@ -195,7 +197,7 @@ func (t *telemetryService) getOrCreateWorker(
|
||||
t.workersMu.Lock()
|
||||
defer t.workersMu.Unlock()
|
||||
|
||||
if worker, ok := t.workers[participantID]; ok {
|
||||
if worker, ok := t.workers[participantID]; ok && !worker.Closed() {
|
||||
return worker, true
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user