From 8dc5a899a9e59e02f97e3aa39f7f3f44462dfe5d Mon Sep 17 00:00:00 2001 From: shishirng Date: Thu, 29 Sep 2022 17:26:43 -0400 Subject: [PATCH] Create stats worker for participant on Active if not exists (#1059) on migration, participants don't send JOIN event, so create it in PARTICIPANT_ACTIVE event --- pkg/telemetry/events.go | 17 +++++++++++------ pkg/telemetry/telemetryservice.go | 16 ++++++++++++++++ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index e44d46718..e33aa5f66 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -70,19 +70,14 @@ func (t *telemetryService) ParticipantJoined( prometheus.IncrementParticipantJoin(1) prometheus.AddParticipant() - worker := newStatsWorker( + t.createWorker( ctx, - t, livekit.RoomID(room.Sid), livekit.RoomName(room.Name), livekit.ParticipantID(participant.Sid), livekit.ParticipantIdentity(participant.Identity), ) - t.lock.Lock() - t.workers[livekit.ParticipantID(participant.Sid)] = worker - t.lock.Unlock() - t.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED, Timestamp: timestamppb.Now(), @@ -110,6 +105,16 @@ func (t *telemetryService) ParticipantActive( Participant: participant, }) + if _, ok := t.getWorker(livekit.ParticipantID(participant.Sid)); !ok { + t.createWorker( + ctx, + livekit.RoomID(room.Sid), + livekit.RoomName(room.Name), + livekit.ParticipantID(participant.Sid), + livekit.ParticipantIdentity(participant.Identity), + ) + } + t.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_PARTICIPANT_ACTIVE, Timestamp: timestamppb.Now(), diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index cf585dba2..1b72e18f1 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -116,6 +116,22 @@ func (t *telemetryService) getWorker(participantID livekit.ParticipantID) (worke return } +func (t *telemetryService) createWorker(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, + participantID livekit.ParticipantID, participantIdentity livekit.ParticipantIdentity) { + worker := newStatsWorker( + ctx, + t, + roomID, + roomName, + participantID, + participantIdentity, + ) + + t.lock.Lock() + t.workers[participantID] = worker + t.lock.Unlock() +} + func (t *telemetryService) cleanupWorkers() { t.lock.Lock() defer t.lock.Unlock()