mirror of
https://github.com/livekit/livekit.git
synced 2026-05-15 00:55:32 +00:00
Create stats worker for participant on Active if not exists (#1059)
on migration, participants don't send JOIN event, so create it in PARTICIPANT_ACTIVE event
This commit is contained in:
+11
-6
@@ -70,19 +70,14 @@ func (t *telemetryService) ParticipantJoined(
|
||||
prometheus.IncrementParticipantJoin(1)
|
||||
prometheus.AddParticipant()
|
||||
|
||||
worker := newStatsWorker(
|
||||
t.createWorker(
|
||||
ctx,
|
||||
t,
|
||||
livekit.RoomID(room.Sid),
|
||||
livekit.RoomName(room.Name),
|
||||
livekit.ParticipantID(participant.Sid),
|
||||
livekit.ParticipantIdentity(participant.Identity),
|
||||
)
|
||||
|
||||
t.lock.Lock()
|
||||
t.workers[livekit.ParticipantID(participant.Sid)] = worker
|
||||
t.lock.Unlock()
|
||||
|
||||
t.SendEvent(ctx, &livekit.AnalyticsEvent{
|
||||
Type: livekit.AnalyticsEventType_PARTICIPANT_JOINED,
|
||||
Timestamp: timestamppb.Now(),
|
||||
@@ -110,6 +105,16 @@ func (t *telemetryService) ParticipantActive(
|
||||
Participant: participant,
|
||||
})
|
||||
|
||||
if _, ok := t.getWorker(livekit.ParticipantID(participant.Sid)); !ok {
|
||||
t.createWorker(
|
||||
ctx,
|
||||
livekit.RoomID(room.Sid),
|
||||
livekit.RoomName(room.Name),
|
||||
livekit.ParticipantID(participant.Sid),
|
||||
livekit.ParticipantIdentity(participant.Identity),
|
||||
)
|
||||
}
|
||||
|
||||
t.SendEvent(ctx, &livekit.AnalyticsEvent{
|
||||
Type: livekit.AnalyticsEventType_PARTICIPANT_ACTIVE,
|
||||
Timestamp: timestamppb.Now(),
|
||||
|
||||
@@ -116,6 +116,22 @@ func (t *telemetryService) getWorker(participantID livekit.ParticipantID) (worke
|
||||
return
|
||||
}
|
||||
|
||||
func (t *telemetryService) createWorker(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName,
|
||||
participantID livekit.ParticipantID, participantIdentity livekit.ParticipantIdentity) {
|
||||
worker := newStatsWorker(
|
||||
ctx,
|
||||
t,
|
||||
roomID,
|
||||
roomName,
|
||||
participantID,
|
||||
participantIdentity,
|
||||
)
|
||||
|
||||
t.lock.Lock()
|
||||
t.workers[participantID] = worker
|
||||
t.lock.Unlock()
|
||||
}
|
||||
|
||||
func (t *telemetryService) cleanupWorkers() {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
Reference in New Issue
Block a user