Files
livekit/pkg/telemetry/events.go
Paul Wells c8bb2578be Rename log field "pID" to "participantID" for consistency (#4365)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 04:32:02 -07:00

650 lines
18 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package telemetry
import (
"context"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/codecs/mime"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/guid"
"github.com/livekit/protocol/webhook"
)
func (t *telemetryService) NotifyEvent(ctx context.Context, event *livekit.WebhookEvent, opts ...webhook.NotifyOption) {
if t.notifier == nil {
return
}
event.CreatedAt = time.Now().Unix()
event.Id = guid.New("EV_")
if err := t.notifier.QueueNotify(ctx, event, opts...); err != nil {
logger.Warnw("failed to notify webhook", err, "event", event.Event)
}
}
func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
t.enqueue(func() {
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRoomStarted,
Room: room,
})
t.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_ROOM_CREATED,
Timestamp: &timestamppb.Timestamp{Seconds: room.CreationTime},
Room: room,
})
})
}
func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
t.enqueue(func() {
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRoomFinished,
Room: room,
})
t.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_ROOM_ENDED,
Timestamp: timestamppb.Now(),
RoomId: room.Sid,
Room: room,
})
})
}
func (t *telemetryService) ParticipantJoined(
ctx context.Context,
room *livekit.Room,
participant *livekit.ParticipantInfo,
clientInfo *livekit.ClientInfo,
clientMeta *livekit.AnalyticsClientMeta,
shouldSendEvent bool,
guard *ReferenceGuard,
) {
t.enqueue(func() {
_, found := t.getOrCreateWorker(
ctx,
livekit.RoomID(room.Sid),
livekit.RoomName(room.Name),
livekit.ParticipantID(participant.Sid),
livekit.ParticipantIdentity(participant.Identity),
guard,
)
if !found {
prometheus.IncrementParticipantRtcConnected(1)
prometheus.AddParticipant()
}
if shouldSendEvent {
ev := newParticipantEvent(livekit.AnalyticsEventType_PARTICIPANT_JOINED, room, participant)
ev.ClientInfo = clientInfo
ev.ClientMeta = clientMeta
t.SendEvent(ctx, ev)
}
})
}
func (t *telemetryService) ParticipantActive(
ctx context.Context,
room *livekit.Room,
participant *livekit.ParticipantInfo,
clientMeta *livekit.AnalyticsClientMeta,
isMigration bool,
guard *ReferenceGuard,
) {
t.enqueue(func() {
if !isMigration {
// a participant is considered "joined" only when they become "active"
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventParticipantJoined,
Room: room,
Participant: participant,
})
}
worker, found := t.getOrCreateWorker(
ctx,
livekit.RoomID(room.Sid),
livekit.RoomName(room.Name),
livekit.ParticipantID(participant.Sid),
livekit.ParticipantIdentity(participant.Identity),
guard,
)
if !found {
prometheus.AddParticipant()
}
worker.SetConnected()
ev := newParticipantEvent(livekit.AnalyticsEventType_PARTICIPANT_ACTIVE, room, participant)
ev.ClientMeta = clientMeta
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) ParticipantResumed(
ctx context.Context,
room *livekit.Room,
participant *livekit.ParticipantInfo,
nodeID livekit.NodeID,
reason livekit.ReconnectReason,
) {
t.enqueue(func() {
// create a worker if needed.
//
// Signalling channel stats collector and media channel stats collector could both call
// ParticipantJoined and ParticipantLeft.
//
// On a resume, the signalling channel collector would call `ParticipantLeft` which would close
// the corresponding participant's stats worker.
//
// So, on a successful resume, create the worker if needed.
_, found := t.getOrCreateWorker(
ctx,
livekit.RoomID(room.Sid),
livekit.RoomName(room.Name),
livekit.ParticipantID(participant.Sid),
livekit.ParticipantIdentity(participant.Identity),
nil,
)
if !found {
prometheus.AddParticipant()
}
ev := newParticipantEvent(livekit.AnalyticsEventType_PARTICIPANT_RESUMED, room, participant)
ev.ClientMeta = &livekit.AnalyticsClientMeta{
Node: string(nodeID),
ReconnectReason: reason,
}
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) ParticipantLeft(ctx context.Context,
room *livekit.Room,
participant *livekit.ParticipantInfo,
shouldSendEvent bool,
guard *ReferenceGuard,
) {
t.enqueue(func() {
isConnected := false
if worker, ok := t.getWorker(livekit.RoomID(room.Sid), livekit.ParticipantID(participant.Sid)); ok {
isConnected = worker.IsConnected()
if worker.Close(guard) {
prometheus.SubParticipant()
} else {
logger.Infow(
"stats worker active",
"room", room.Name,
"roomID", room.Sid,
"participant", participant.Identity,
"participantID", participant.Sid,
"worker", worker,
)
}
}
if shouldSendEvent {
webhookEvent := webhook.EventParticipantLeft
analyticsEvent := livekit.AnalyticsEventType_PARTICIPANT_LEFT
if !isConnected {
webhookEvent = webhook.EventParticipantConnectionAborted
analyticsEvent = livekit.AnalyticsEventType_PARTICIPANT_CONNECTION_ABORTED
}
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhookEvent,
Room: room,
Participant: participant,
})
t.SendEvent(ctx, newParticipantEvent(analyticsEvent, room, participant))
}
})
}
func (t *telemetryService) TrackPublishRequested(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
identity livekit.ParticipantIdentity,
track *livekit.TrackInfo,
) {
t.enqueue(func() {
prometheus.RecordTrackPublishAttempt(track.Type.String())
room := toMinimalRoomProto(roomID, roomName)
ev := newTrackEvent(livekit.AnalyticsEventType_TRACK_PUBLISH_REQUESTED, room, participantID, track)
if ev.Participant != nil {
ev.Participant.Identity = string(identity)
}
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) TrackPublished(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
identity livekit.ParticipantIdentity,
track *livekit.TrackInfo,
shouldSendEvent bool,
) {
t.enqueue(func() {
prometheus.AddPublishedTrack(track.Type.String())
prometheus.RecordTrackPublishSuccess(track.Type.String())
if !shouldSendEvent {
return
}
room := toMinimalRoomProto(roomID, roomName)
participant := &livekit.ParticipantInfo{
Sid: string(participantID),
Identity: string(identity),
}
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventTrackPublished,
Room: room,
Participant: participant,
Track: track,
})
ev := newTrackEvent(livekit.AnalyticsEventType_TRACK_PUBLISHED, room, participantID, track)
ev.Participant = participant
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) TrackPublishedUpdate(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
track *livekit.TrackInfo,
) {
t.enqueue(func() {
room := toMinimalRoomProto(roomID, roomName)
t.SendEvent(ctx, newTrackEvent(livekit.AnalyticsEventType_TRACK_PUBLISHED_UPDATE, room, participantID, track))
})
}
func (t *telemetryService) TrackMaxSubscribedVideoQuality(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
track *livekit.TrackInfo,
mime mime.MimeType,
maxQuality livekit.VideoQuality,
) {
t.enqueue(func() {
room := toMinimalRoomProto(roomID, roomName)
ev := newTrackEvent(livekit.AnalyticsEventType_TRACK_MAX_SUBSCRIBED_VIDEO_QUALITY, room, participantID, track)
ev.MaxSubscribedVideoQuality = maxQuality
ev.Mime = mime.String()
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) TrackSubscribeRequested(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
track *livekit.TrackInfo,
) {
t.enqueue(func() {
prometheus.RecordTrackSubscribeAttempt()
room := toMinimalRoomProto(roomID, roomName)
ev := newTrackEvent(livekit.AnalyticsEventType_TRACK_SUBSCRIBE_REQUESTED, room, participantID, track)
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) TrackSubscribed(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
track *livekit.TrackInfo,
publisher *livekit.ParticipantInfo,
shouldSendEvent bool,
) {
t.enqueue(func() {
prometheus.RecordTrackSubscribeSuccess(track.Type.String())
if !shouldSendEvent {
return
}
room := toMinimalRoomProto(roomID, roomName)
ev := newTrackEvent(livekit.AnalyticsEventType_TRACK_SUBSCRIBED, room, participantID, track)
ev.Publisher = publisher
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) TrackSubscribeFailed(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
trackID livekit.TrackID,
err error,
isUserError bool,
) {
t.enqueue(func() {
prometheus.RecordTrackSubscribeFailure(err, isUserError)
room := toMinimalRoomProto(roomID, roomName)
ev := newTrackEvent(livekit.AnalyticsEventType_TRACK_SUBSCRIBE_FAILED, room, participantID, &livekit.TrackInfo{
Sid: string(trackID),
})
ev.Error = err.Error()
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) TrackUnsubscribed(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
track *livekit.TrackInfo,
shouldSendEvent bool,
) {
t.enqueue(func() {
prometheus.RecordTrackUnsubscribed(track.Type.String())
if shouldSendEvent {
room := toMinimalRoomProto(roomID, roomName)
t.SendEvent(ctx, newTrackEvent(livekit.AnalyticsEventType_TRACK_UNSUBSCRIBED, room, participantID, track))
}
})
}
func (t *telemetryService) TrackUnpublished(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
identity livekit.ParticipantIdentity,
track *livekit.TrackInfo,
shouldSendEvent bool,
) {
t.enqueue(func() {
prometheus.SubPublishedTrack(track.Type.String())
if !shouldSendEvent {
return
}
room := toMinimalRoomProto(roomID, roomName)
participant := &livekit.ParticipantInfo{
Sid: string(participantID),
Identity: string(identity),
}
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventTrackUnpublished,
Room: room,
Participant: participant,
Track: track,
})
t.SendEvent(ctx, newTrackEvent(livekit.AnalyticsEventType_TRACK_UNPUBLISHED, room, participantID, track))
})
}
func (t *telemetryService) TrackMuted(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
track *livekit.TrackInfo,
) {
t.enqueue(func() {
room := toMinimalRoomProto(roomID, roomName)
t.SendEvent(ctx, newTrackEvent(livekit.AnalyticsEventType_TRACK_MUTED, room, participantID, track))
})
}
func (t *telemetryService) TrackUnmuted(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
track *livekit.TrackInfo,
) {
t.enqueue(func() {
room := toMinimalRoomProto(roomID, roomName)
t.SendEvent(ctx, newTrackEvent(livekit.AnalyticsEventType_TRACK_UNMUTED, room, participantID, track))
})
}
func (t *telemetryService) TrackPublishRTPStats(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
trackID livekit.TrackID,
mimeType mime.MimeType,
layer int,
stats *livekit.RTPStats,
) {
t.enqueue(func() {
room := toMinimalRoomProto(roomID, roomName)
ev := newRoomEvent(livekit.AnalyticsEventType_TRACK_PUBLISH_STATS, room)
ev.ParticipantId = string(participantID)
ev.TrackId = string(trackID)
ev.Mime = mimeType.String()
ev.VideoLayer = int32(layer)
ev.RtpStats = stats
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) TrackSubscribeRTPStats(
ctx context.Context,
roomID livekit.RoomID,
roomName livekit.RoomName,
participantID livekit.ParticipantID,
trackID livekit.TrackID,
mimeType mime.MimeType,
stats *livekit.RTPStats,
) {
t.enqueue(func() {
room := toMinimalRoomProto(roomID, roomName)
ev := newRoomEvent(livekit.AnalyticsEventType_TRACK_SUBSCRIBE_STATS, room)
ev.ParticipantId = string(participantID)
ev.TrackId = string(trackID)
ev.Mime = mimeType.String()
ev.RtpStats = stats
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo) {
opts := egress.GetEgressNotifyOptions(info)
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: event,
EgressInfo: info,
}, opts...)
}
func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {
t.enqueue(func() {
t.NotifyEgressEvent(ctx, webhook.EventEgressStarted, info)
t.SendEvent(ctx, newEgressEvent(livekit.AnalyticsEventType_EGRESS_STARTED, info))
})
}
func (t *telemetryService) EgressUpdated(ctx context.Context, info *livekit.EgressInfo) {
t.enqueue(func() {
t.NotifyEgressEvent(ctx, webhook.EventEgressUpdated, info)
t.SendEvent(ctx, newEgressEvent(livekit.AnalyticsEventType_EGRESS_UPDATED, info))
})
}
func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) {
t.enqueue(func() {
t.NotifyEgressEvent(ctx, webhook.EventEgressEnded, info)
t.SendEvent(ctx, newEgressEvent(livekit.AnalyticsEventType_EGRESS_ENDED, info))
})
}
func (t *telemetryService) IngressCreated(ctx context.Context, info *livekit.IngressInfo) {
t.enqueue(func() {
t.SendEvent(ctx, newIngressEvent(livekit.AnalyticsEventType_INGRESS_CREATED, info))
})
}
func (t *telemetryService) IngressDeleted(ctx context.Context, info *livekit.IngressInfo) {
t.enqueue(func() {
t.SendEvent(ctx, newIngressEvent(livekit.AnalyticsEventType_INGRESS_DELETED, info))
})
}
func (t *telemetryService) IngressStarted(ctx context.Context, info *livekit.IngressInfo) {
t.enqueue(func() {
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventIngressStarted,
IngressInfo: info,
})
t.SendEvent(ctx, newIngressEvent(livekit.AnalyticsEventType_INGRESS_STARTED, info))
})
}
func (t *telemetryService) IngressUpdated(ctx context.Context, info *livekit.IngressInfo) {
t.enqueue(func() {
t.SendEvent(ctx, newIngressEvent(livekit.AnalyticsEventType_INGRESS_UPDATED, info))
})
}
func (t *telemetryService) IngressEnded(ctx context.Context, info *livekit.IngressInfo) {
t.enqueue(func() {
t.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventIngressEnded,
IngressInfo: info,
})
t.SendEvent(ctx, newIngressEvent(livekit.AnalyticsEventType_INGRESS_ENDED, info))
})
}
func (t *telemetryService) Report(ctx context.Context, reportInfo *livekit.ReportInfo) {
t.enqueue(func() {
ev := &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_REPORT,
Timestamp: timestamppb.Now(),
Report: reportInfo,
}
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo) {
t.enqueue(func() {
ev := &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_API_CALL,
Timestamp: timestamppb.Now(),
ApiCall: apiCallInfo,
}
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) Webhook(ctx context.Context, webhookInfo *livekit.WebhookInfo) {
t.enqueue(func() {
ev := &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_WEBHOOK,
Timestamp: timestamppb.Now(),
Webhook: webhookInfo,
}
t.SendEvent(ctx, ev)
})
}
func newRoomEvent(event livekit.AnalyticsEventType, room *livekit.Room) *livekit.AnalyticsEvent {
ev := &livekit.AnalyticsEvent{
Type: event,
Timestamp: timestamppb.Now(),
}
if room != nil {
ev.Room = room
ev.RoomId = room.Sid
}
return ev
}
func newParticipantEvent(event livekit.AnalyticsEventType, room *livekit.Room, participant *livekit.ParticipantInfo) *livekit.AnalyticsEvent {
ev := newRoomEvent(event, room)
if participant != nil {
ev.ParticipantId = participant.Sid
ev.Participant = participant
}
return ev
}
func newTrackEvent(event livekit.AnalyticsEventType, room *livekit.Room, participantID livekit.ParticipantID, track *livekit.TrackInfo) *livekit.AnalyticsEvent {
ev := newParticipantEvent(event, room, &livekit.ParticipantInfo{
Sid: string(participantID),
})
if track != nil {
ev.TrackId = track.Sid
ev.Track = track
}
return ev
}
func newEgressEvent(event livekit.AnalyticsEventType, egress *livekit.EgressInfo) *livekit.AnalyticsEvent {
return &livekit.AnalyticsEvent{
Type: event,
Timestamp: timestamppb.Now(),
EgressId: egress.EgressId,
RoomId: egress.RoomId,
Egress: egress,
}
}
func newIngressEvent(event livekit.AnalyticsEventType, ingress *livekit.IngressInfo) *livekit.AnalyticsEvent {
return &livekit.AnalyticsEvent{
Type: event,
Timestamp: timestamppb.Now(),
IngressId: ingress.IngressId,
Ingress: ingress,
}
}
func toMinimalRoomProto(roomID livekit.RoomID, roomName livekit.RoomName) *livekit.Room {
return &livekit.Room{
Sid: string(roomID),
Name: string(roomName),
}
}