diff --git a/pkg/telemetry/telemetryserviceinternal.go b/pkg/telemetry/telemetryserviceinternal.go index 68fca98e0..beab7c302 100644 --- a/pkg/telemetry/telemetryserviceinternal.go +++ b/pkg/telemetry/telemetryserviceinternal.go @@ -28,9 +28,9 @@ type telemetryServiceInternal struct { webhookPool *workerpool.WorkerPool // one worker per participant - workersMu sync.RWMutex - workers []*StatsWorker - workersIdx map[livekit.ParticipantID]int + workersMu sync.RWMutex + workers map[livekit.ParticipantID]*StatsWorker + workersShadow map[livekit.ParticipantID]*StatsWorker analytics AnalyticsService } @@ -39,7 +39,7 @@ func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsS return &telemetryServiceInternal{ notifier: notifier, webhookPool: workerpool.New(maxWebhookWorkers), - workersIdx: make(map[livekit.ParticipantID]int), + workers: make(map[livekit.ParticipantID]*StatsWorker), analytics: analytics, } } @@ -93,13 +93,11 @@ func (t *telemetryServiceInternal) Report(ctx context.Context, stats []*livekit. func (t *telemetryServiceInternal) SendAnalytics() { t.workersMu.RLock() - workers := t.workers + workers := t.workersShadow t.workersMu.RUnlock() for _, worker := range workers { - if worker != nil { - worker.Update() - } + worker.Update() } } @@ -107,8 +105,8 @@ func (t *telemetryServiceInternal) getStatsWorker(participantID livekit.Particip t.workersMu.RLock() defer t.workersMu.RUnlock() - if idx, ok := t.workersIdx[participantID]; ok { - return t.workers[idx] + if worker, ok := t.workersShadow[participantID]; ok { + return worker } return nil diff --git a/pkg/telemetry/telemetryserviceinternalevents.go b/pkg/telemetry/telemetryserviceinternalevents.go index 9f65b95ff..8bb09ebd4 100644 --- a/pkg/telemetry/telemetryserviceinternalevents.go +++ b/pkg/telemetry/telemetryserviceinternalevents.go @@ -54,30 +54,19 @@ func (t *telemetryServiceInternal) ParticipantJoined( ) { prometheus.IncrementParticipantJoin(1) - newWorker := newStatsWorker( - ctx, - t, - livekit.RoomID(room.Sid), - livekit.RoomName(room.Name), - livekit.ParticipantID(participant.Sid), - livekit.ParticipantIdentity(participant.Identity), - ) t.workersMu.Lock() - var free = false - for idx, worker := range t.workers { - if worker != nil { - continue - } + if _, ok := t.workers[livekit.ParticipantID(participant.Sid)]; !ok { + newWorker := newStatsWorker( + ctx, + t, + livekit.RoomID(room.Sid), + livekit.RoomName(room.Name), + livekit.ParticipantID(participant.Sid), + livekit.ParticipantIdentity(participant.Identity), + ) + t.workers[livekit.ParticipantID(participant.Sid)] = newWorker - free = true - t.workersIdx[livekit.ParticipantID(participant.Sid)] = idx - t.workers[idx] = newWorker - break - } - - if !free { - t.workersIdx[livekit.ParticipantID(participant.Sid)] = len(t.workers) - t.workers = append(t.workers, newWorker) + t.shadowWorkersLocked() } t.workersMu.Unlock() @@ -120,9 +109,10 @@ func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *li } t.workersMu.Lock() - if idx, ok := t.workersIdx[livekit.ParticipantID(participant.Sid)]; ok { - delete(t.workersIdx, livekit.ParticipantID(participant.Sid)) - t.workers[idx] = nil + if _, ok := t.workers[livekit.ParticipantID(participant.Sid)]; ok { + delete(t.workers, livekit.ParticipantID(participant.Sid)) + + t.shadowWorkersLocked() } t.workersMu.Unlock() @@ -340,3 +330,10 @@ func (t *telemetryServiceInternal) EgressEnded(ctx context.Context, info *liveki RoomId: info.RoomId, }) } + +func (t *telemetryServiceInternal) shadowWorkersLocked() { + t.workersShadow = make(map[livekit.ParticipantID]*StatsWorker, len(t.workers)) + for pID, worker := range t.workers { + t.workersShadow[pID] = worker + } +}