diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index e44d46718..e33aa5f66 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -70,19 +70,14 @@ func (t *telemetryService) ParticipantJoined( prometheus.IncrementParticipantJoin(1) prometheus.AddParticipant() - worker := newStatsWorker( + t.createWorker( ctx, - t, livekit.RoomID(room.Sid), livekit.RoomName(room.Name), livekit.ParticipantID(participant.Sid), livekit.ParticipantIdentity(participant.Identity), ) - t.lock.Lock() - t.workers[livekit.ParticipantID(participant.Sid)] = worker - t.lock.Unlock() - t.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED, Timestamp: timestamppb.Now(), @@ -110,6 +105,16 @@ func (t *telemetryService) ParticipantActive( Participant: participant, }) + if _, ok := t.getWorker(livekit.ParticipantID(participant.Sid)); !ok { + t.createWorker( + ctx, + livekit.RoomID(room.Sid), + livekit.RoomName(room.Name), + livekit.ParticipantID(participant.Sid), + livekit.ParticipantIdentity(participant.Identity), + ) + } + t.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_PARTICIPANT_ACTIVE, Timestamp: timestamppb.Now(), diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index cf585dba2..1b72e18f1 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -116,6 +116,22 @@ func (t *telemetryService) getWorker(participantID livekit.ParticipantID) (worke return } +func (t *telemetryService) createWorker(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, + participantID livekit.ParticipantID, participantIdentity livekit.ParticipantIdentity) { + worker := newStatsWorker( + ctx, + t, + roomID, + roomName, + participantID, + participantIdentity, + ) + + t.lock.Lock() + t.workers[participantID] = worker + t.lock.Unlock() +} + func (t *telemetryService) cleanupWorkers() { t.lock.Lock() defer t.lock.Unlock()