Using shadow pattern for stats workers (#742)

This commit is contained in:
Raja Subramanian
2022-05-31 10:32:54 +05:30
committed by GitHub
parent 508aa471a9
commit 2b561d2bad
2 changed files with 30 additions and 35 deletions
+8 -10
View File
@@ -28,9 +28,9 @@ type telemetryServiceInternal struct {
webhookPool *workerpool.WorkerPool
// one worker per participant
workersMu sync.RWMutex
workers []*StatsWorker
workersIdx map[livekit.ParticipantID]int
workersMu sync.RWMutex
workers map[livekit.ParticipantID]*StatsWorker
workersShadow map[livekit.ParticipantID]*StatsWorker
analytics AnalyticsService
}
@@ -39,7 +39,7 @@ func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsS
return &telemetryServiceInternal{
notifier: notifier,
webhookPool: workerpool.New(maxWebhookWorkers),
workersIdx: make(map[livekit.ParticipantID]int),
workers: make(map[livekit.ParticipantID]*StatsWorker),
analytics: analytics,
}
}
@@ -93,13 +93,11 @@ func (t *telemetryServiceInternal) Report(ctx context.Context, stats []*livekit.
func (t *telemetryServiceInternal) SendAnalytics() {
t.workersMu.RLock()
workers := t.workers
workers := t.workersShadow
t.workersMu.RUnlock()
for _, worker := range workers {
if worker != nil {
worker.Update()
}
worker.Update()
}
}
@@ -107,8 +105,8 @@ func (t *telemetryServiceInternal) getStatsWorker(participantID livekit.Particip
t.workersMu.RLock()
defer t.workersMu.RUnlock()
if idx, ok := t.workersIdx[participantID]; ok {
return t.workers[idx]
if worker, ok := t.workersShadow[participantID]; ok {
return worker
}
return nil
+22 -25
View File
@@ -54,30 +54,19 @@ func (t *telemetryServiceInternal) ParticipantJoined(
) {
prometheus.IncrementParticipantJoin(1)
newWorker := newStatsWorker(
ctx,
t,
livekit.RoomID(room.Sid),
livekit.RoomName(room.Name),
livekit.ParticipantID(participant.Sid),
livekit.ParticipantIdentity(participant.Identity),
)
t.workersMu.Lock()
var free = false
for idx, worker := range t.workers {
if worker != nil {
continue
}
if _, ok := t.workers[livekit.ParticipantID(participant.Sid)]; !ok {
newWorker := newStatsWorker(
ctx,
t,
livekit.RoomID(room.Sid),
livekit.RoomName(room.Name),
livekit.ParticipantID(participant.Sid),
livekit.ParticipantIdentity(participant.Identity),
)
t.workers[livekit.ParticipantID(participant.Sid)] = newWorker
free = true
t.workersIdx[livekit.ParticipantID(participant.Sid)] = idx
t.workers[idx] = newWorker
break
}
if !free {
t.workersIdx[livekit.ParticipantID(participant.Sid)] = len(t.workers)
t.workers = append(t.workers, newWorker)
t.shadowWorkersLocked()
}
t.workersMu.Unlock()
@@ -120,9 +109,10 @@ func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *li
}
t.workersMu.Lock()
if idx, ok := t.workersIdx[livekit.ParticipantID(participant.Sid)]; ok {
delete(t.workersIdx, livekit.ParticipantID(participant.Sid))
t.workers[idx] = nil
if _, ok := t.workers[livekit.ParticipantID(participant.Sid)]; ok {
delete(t.workers, livekit.ParticipantID(participant.Sid))
t.shadowWorkersLocked()
}
t.workersMu.Unlock()
@@ -340,3 +330,10 @@ func (t *telemetryServiceInternal) EgressEnded(ctx context.Context, info *liveki
RoomId: info.RoomId,
})
}
func (t *telemetryServiceInternal) shadowWorkersLocked() {
t.workersShadow = make(map[livekit.ParticipantID]*StatsWorker, len(t.workers))
for pID, worker := range t.workers {
t.workersShadow[pID] = worker
}
}