Revert "Using shadow pattern for stats workers (#742)" (#744)

This reverts commit 2b561d2bad.
This commit is contained in:
Raja Subramanian
2022-05-31 11:06:44 +05:30
committed by GitHub
parent f19815754c
commit 4ed9b5f90e
2 changed files with 35 additions and 30 deletions

View File

@@ -28,9 +28,9 @@ type telemetryServiceInternal struct {
webhookPool *workerpool.WorkerPool
// one worker per participant
workersMu sync.RWMutex
workers map[livekit.ParticipantID]*StatsWorker
workersShadow map[livekit.ParticipantID]*StatsWorker
workersMu sync.RWMutex
workers []*StatsWorker
workersIdx map[livekit.ParticipantID]int
analytics AnalyticsService
}
@@ -39,7 +39,7 @@ func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsS
return &telemetryServiceInternal{
notifier: notifier,
webhookPool: workerpool.New(maxWebhookWorkers),
workers: make(map[livekit.ParticipantID]*StatsWorker),
workersIdx: make(map[livekit.ParticipantID]int),
analytics: analytics,
}
}
@@ -93,11 +93,13 @@ func (t *telemetryServiceInternal) Report(ctx context.Context, stats []*livekit.
func (t *telemetryServiceInternal) SendAnalytics() {
t.workersMu.RLock()
workers := t.workersShadow
workers := t.workers
t.workersMu.RUnlock()
for _, worker := range workers {
worker.Update()
if worker != nil {
worker.Update()
}
}
}
@@ -105,8 +107,8 @@ func (t *telemetryServiceInternal) getStatsWorker(participantID livekit.Particip
t.workersMu.RLock()
defer t.workersMu.RUnlock()
if worker, ok := t.workersShadow[participantID]; ok {
return worker
if idx, ok := t.workersIdx[participantID]; ok {
return t.workers[idx]
}
return nil

View File

@@ -54,19 +54,30 @@ func (t *telemetryServiceInternal) ParticipantJoined(
) {
prometheus.IncrementParticipantJoin(1)
newWorker := newStatsWorker(
ctx,
t,
livekit.RoomID(room.Sid),
livekit.RoomName(room.Name),
livekit.ParticipantID(participant.Sid),
livekit.ParticipantIdentity(participant.Identity),
)
t.workersMu.Lock()
if _, ok := t.workers[livekit.ParticipantID(participant.Sid)]; !ok {
newWorker := newStatsWorker(
ctx,
t,
livekit.RoomID(room.Sid),
livekit.RoomName(room.Name),
livekit.ParticipantID(participant.Sid),
livekit.ParticipantIdentity(participant.Identity),
)
t.workers[livekit.ParticipantID(participant.Sid)] = newWorker
var free = false
for idx, worker := range t.workers {
if worker != nil {
continue
}
t.shadowWorkersLocked()
free = true
t.workersIdx[livekit.ParticipantID(participant.Sid)] = idx
t.workers[idx] = newWorker
break
}
if !free {
t.workersIdx[livekit.ParticipantID(participant.Sid)] = len(t.workers)
t.workers = append(t.workers, newWorker)
}
t.workersMu.Unlock()
@@ -109,10 +120,9 @@ func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *li
}
t.workersMu.Lock()
if _, ok := t.workers[livekit.ParticipantID(participant.Sid)]; ok {
delete(t.workers, livekit.ParticipantID(participant.Sid))
t.shadowWorkersLocked()
if idx, ok := t.workersIdx[livekit.ParticipantID(participant.Sid)]; ok {
delete(t.workersIdx, livekit.ParticipantID(participant.Sid))
t.workers[idx] = nil
}
t.workersMu.Unlock()
@@ -330,10 +340,3 @@ func (t *telemetryServiceInternal) EgressEnded(ctx context.Context, info *liveki
RoomId: info.RoomId,
})
}
func (t *telemetryServiceInternal) shadowWorkersLocked() {
t.workersShadow = make(map[livekit.ParticipantID]*StatsWorker, len(t.workers))
for pID, worker := range t.workers {
t.workersShadow[pID] = worker
}
}