From 4ed9b5f90e0ecad588e00ebde48efa536ea9cfb3 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 31 May 2022 11:06:44 +0530 Subject: [PATCH] Revert "Using shadow pattern for stats workers (#742)" (#744) This reverts commit 2b561d2bad3cb9087ef39bf37555f9a008751852. --- pkg/telemetry/telemetryserviceinternal.go | 18 +++---- .../telemetryserviceinternalevents.go | 47 ++++++++++--------- 2 files changed, 35 insertions(+), 30 deletions(-) diff --git a/pkg/telemetry/telemetryserviceinternal.go b/pkg/telemetry/telemetryserviceinternal.go index beab7c302..68fca98e0 100644 --- a/pkg/telemetry/telemetryserviceinternal.go +++ b/pkg/telemetry/telemetryserviceinternal.go @@ -28,9 +28,9 @@ type telemetryServiceInternal struct { webhookPool *workerpool.WorkerPool // one worker per participant - workersMu sync.RWMutex - workers map[livekit.ParticipantID]*StatsWorker - workersShadow map[livekit.ParticipantID]*StatsWorker + workersMu sync.RWMutex + workers []*StatsWorker + workersIdx map[livekit.ParticipantID]int analytics AnalyticsService } @@ -39,7 +39,7 @@ func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsS return &telemetryServiceInternal{ notifier: notifier, webhookPool: workerpool.New(maxWebhookWorkers), - workers: make(map[livekit.ParticipantID]*StatsWorker), + workersIdx: make(map[livekit.ParticipantID]int), analytics: analytics, } } @@ -93,11 +93,13 @@ func (t *telemetryServiceInternal) Report(ctx context.Context, stats []*livekit. func (t *telemetryServiceInternal) SendAnalytics() { t.workersMu.RLock() - workers := t.workersShadow + workers := t.workers t.workersMu.RUnlock() for _, worker := range workers { - worker.Update() + if worker != nil { + worker.Update() + } } } @@ -105,8 +107,8 @@ func (t *telemetryServiceInternal) getStatsWorker(participantID livekit.Particip t.workersMu.RLock() defer t.workersMu.RUnlock() - if worker, ok := t.workersShadow[participantID]; ok { - return worker + if idx, ok := t.workersIdx[participantID]; ok { + return t.workers[idx] } return nil diff --git a/pkg/telemetry/telemetryserviceinternalevents.go b/pkg/telemetry/telemetryserviceinternalevents.go index 8bb09ebd4..9f65b95ff 100644 --- a/pkg/telemetry/telemetryserviceinternalevents.go +++ b/pkg/telemetry/telemetryserviceinternalevents.go @@ -54,19 +54,30 @@ func (t *telemetryServiceInternal) ParticipantJoined( ) { prometheus.IncrementParticipantJoin(1) + newWorker := newStatsWorker( + ctx, + t, + livekit.RoomID(room.Sid), + livekit.RoomName(room.Name), + livekit.ParticipantID(participant.Sid), + livekit.ParticipantIdentity(participant.Identity), + ) t.workersMu.Lock() - if _, ok := t.workers[livekit.ParticipantID(participant.Sid)]; !ok { - newWorker := newStatsWorker( - ctx, - t, - livekit.RoomID(room.Sid), - livekit.RoomName(room.Name), - livekit.ParticipantID(participant.Sid), - livekit.ParticipantIdentity(participant.Identity), - ) - t.workers[livekit.ParticipantID(participant.Sid)] = newWorker + var free = false + for idx, worker := range t.workers { + if worker != nil { + continue + } - t.shadowWorkersLocked() + free = true + t.workersIdx[livekit.ParticipantID(participant.Sid)] = idx + t.workers[idx] = newWorker + break + } + + if !free { + t.workersIdx[livekit.ParticipantID(participant.Sid)] = len(t.workers) + t.workers = append(t.workers, newWorker) } t.workersMu.Unlock() @@ -109,10 +120,9 @@ func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *li } t.workersMu.Lock() - if _, ok := t.workers[livekit.ParticipantID(participant.Sid)]; ok { - delete(t.workers, livekit.ParticipantID(participant.Sid)) - - t.shadowWorkersLocked() + if idx, ok := t.workersIdx[livekit.ParticipantID(participant.Sid)]; ok { + delete(t.workersIdx, livekit.ParticipantID(participant.Sid)) + t.workers[idx] = nil } t.workersMu.Unlock() @@ -330,10 +340,3 @@ func (t *telemetryServiceInternal) EgressEnded(ctx context.Context, info *liveki RoomId: info.RoomId, }) } - -func (t *telemetryServiceInternal) shadowWorkersLocked() { - t.workersShadow = make(map[livekit.ParticipantID]*StatsWorker, len(t.workers)) - for pID, worker := range t.workers { - t.workersShadow[pID] = worker - } -}