Files
livekit/pkg/telemetry/telemetryservice.go
Raja Subramanian b81bac0ec3
Some checks failed
Test / test (push) Failing after 17s
Release to Docker / docker (push) Failing after 3m42s
Key telemetry stats worker using combination of roomID, participantID (#4323)
* Key telemetry stats worker using combination of roomID, participantID

With a forwarded participant, the same participantID can exist in two
rooms.

NOTE: This does not yet allow a participant session to report its
events/track stats into multiple rooms. That would require registering
multiple listeners (from rooms a participant is forwarded to).

* missed file

* data channel stats

* PR comments + pass in room name so that telemetry events have proper room name also
2026-02-16 13:56:13 +05:30

326 lines
15 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package telemetry
import (
"context"
"sync"
"time"
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/codecs/mime"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/webhook"
)
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
//counterfeiter:generate . TelemetryService
// TelemetryService is the telemetry surface declared by this package: a periodic
// per-track stats entry point, a set of room / participant / track / egress /
// ingress event hooks, and a few helpers. It embeds AnalyticsService, so every
// implementation must provide those methods as well.
type TelemetryService interface {
// TrackStats is called periodically for each track in both directions (published/subscribed)
TrackStats(roomID livekit.RoomID, roomName livekit.RoomName, key StatsKey, stat *livekit.AnalyticsStat)
// events
// RoomStarted / RoomEnded - NOTE(review): presumably room lifecycle events; confirm against the implementation
RoomStarted(ctx context.Context, room *livekit.Room)
RoomEnded(ctx context.Context, room *livekit.Room)
// ParticipantJoined - a participant establishes signal connection to a room
ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta, shouldSendEvent bool, guard *ReferenceGuard)
// ParticipantActive - a participant establishes media connection
ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta, isMigration bool, guard *ReferenceGuard)
// ParticipantResumed - there has been an ICE restart or connection resume attempt, and we've received their signal connection
ParticipantResumed(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, nodeID livekit.NodeID, reason livekit.ReconnectReason)
// ParticipantLeft - the participant leaves the room, only sent if ParticipantActive has been called before
ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool, guard *ReferenceGuard)
// TrackPublishRequested - a publication attempt has been received
TrackPublishRequested(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo)
// TrackPublished - a publication attempt has been successful
TrackPublished(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool)
// TrackUnpublished - a participant unpublished a track
TrackUnpublished(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool)
// TrackSubscribeRequested - a participant requested to subscribe to a track
TrackSubscribeRequested(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo)
// TrackSubscribed - a participant subscribed to a track successfully
TrackSubscribed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo, shouldSendEvent bool)
// TrackUnsubscribed - a participant unsubscribed from a track successfully
TrackUnsubscribed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, shouldSendEvent bool)
// TrackSubscribeFailed - failure to subscribe to a track
TrackSubscribeFailed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, err error, isUserError bool)
// TrackMuted - the publisher has muted the Track
TrackMuted(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo)
// TrackUnmuted - the publisher has unmuted the Track
TrackUnmuted(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo)
// TrackPublishedUpdate - track metadata has been updated
TrackPublishedUpdate(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo)
// TrackMaxSubscribedVideoQuality - publisher is notified of the max quality subscribers desire
TrackMaxSubscribedVideoQuality(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, mime mime.MimeType, maxQuality livekit.VideoQuality)
// TrackPublishRTPStats / TrackSubscribeRTPStats - carry a *livekit.RTPStats for one track and mime type;
// the publish variant additionally takes a layer index.
// NOTE(review): presumably the publish/subscribe direction of the same report; confirm against the implementation
TrackPublishRTPStats(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, mimeType mime.MimeType, layer int, stats *livekit.RTPStats)
TrackSubscribeRTPStats(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, mimeType mime.MimeType, stats *livekit.RTPStats)
// Egress* / Ingress* - NOTE(review): presumably lifecycle events for an egress or ingress described by info; confirm
EgressStarted(ctx context.Context, info *livekit.EgressInfo)
EgressUpdated(ctx context.Context, info *livekit.EgressInfo)
EgressEnded(ctx context.Context, info *livekit.EgressInfo)
IngressCreated(ctx context.Context, info *livekit.IngressInfo)
IngressDeleted(ctx context.Context, info *livekit.IngressInfo)
IngressStarted(ctx context.Context, info *livekit.IngressInfo)
IngressUpdated(ctx context.Context, info *livekit.IngressInfo)
IngressEnded(ctx context.Context, info *livekit.IngressInfo)
// LocalRoomState - the implementation in this file queues a SendNodeRoomStates call with info
LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms)
Report(ctx context.Context, reportInfo *livekit.ReportInfo)
APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo)
// Webhook - the implementation in this file also registers this as the notifier's processed hook
Webhook(ctx context.Context, webhookInfo *livekit.WebhookInfo)
// helpers
AnalyticsService
NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo)
// FlushStats - the implementation in this file flushes every stats worker and drops the ones that report closed
FlushStats()
}
// -----------------------------
// Compile-time check that *NullTelemetryService satisfies TelemetryService.
var _ TelemetryService = (*NullTelemetryService)(nil)
// NullTelemetryService is a TelemetryService whose methods, as defined below,
// all have empty bodies. The AnalyticsService part of the interface is not
// defined here, so it must come from the embedded NullAnalyticService
// (declared elsewhere; presumably no-ops as well — confirm).
type NullTelemetryService struct {
NullAnalyticService
}
// Stats and room/participant events: all no-ops.
func (n NullTelemetryService) TrackStats(roomID livekit.RoomID, roomName livekit.RoomName, key StatsKey, stat *livekit.AnalyticsStat) {
}
func (n NullTelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {}
func (n NullTelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {}
func (n NullTelemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta, shouldSendEvent bool, guard *ReferenceGuard) {
}
func (n NullTelemetryService) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta, isMigration bool, guard *ReferenceGuard) {
}
func (n NullTelemetryService) ParticipantResumed(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, nodeID livekit.NodeID, reason livekit.ReconnectReason) {
}
func (n NullTelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool, guard *ReferenceGuard) {
}
// Track events: all no-ops.
func (n NullTelemetryService) TrackPublishRequested(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) {
}
func (n NullTelemetryService) TrackPublished(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool) {
}
func (n NullTelemetryService) TrackUnpublished(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool) {
}
func (n NullTelemetryService) TrackSubscribeRequested(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
}
func (n NullTelemetryService) TrackSubscribed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo, shouldSendEvent bool) {
}
func (n NullTelemetryService) TrackUnsubscribed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, shouldSendEvent bool) {
}
func (n NullTelemetryService) TrackSubscribeFailed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, err error, isUserError bool) {
}
func (n NullTelemetryService) TrackMuted(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
}
func (n NullTelemetryService) TrackUnmuted(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
}
func (n NullTelemetryService) TrackPublishedUpdate(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
}
func (n NullTelemetryService) TrackMaxSubscribedVideoQuality(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, mime mime.MimeType, maxQuality livekit.VideoQuality) {
}
func (n NullTelemetryService) TrackPublishRTPStats(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, mimeType mime.MimeType, layer int, stats *livekit.RTPStats) {
}
func (n NullTelemetryService) TrackSubscribeRTPStats(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, mimeType mime.MimeType, stats *livekit.RTPStats) {
}
// Egress, ingress and miscellaneous reports: all no-ops.
func (n NullTelemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {}
func (n NullTelemetryService) EgressUpdated(ctx context.Context, info *livekit.EgressInfo) {}
func (n NullTelemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) {}
func (n NullTelemetryService) IngressCreated(ctx context.Context, info *livekit.IngressInfo) {}
func (n NullTelemetryService) IngressDeleted(ctx context.Context, info *livekit.IngressInfo) {}
func (n NullTelemetryService) IngressStarted(ctx context.Context, info *livekit.IngressInfo) {}
func (n NullTelemetryService) IngressUpdated(ctx context.Context, info *livekit.IngressInfo) {}
func (n NullTelemetryService) IngressEnded(ctx context.Context, info *livekit.IngressInfo) {}
func (n NullTelemetryService) LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms) {}
func (n NullTelemetryService) Report(ctx context.Context, reportInfo *livekit.ReportInfo) {}
func (n NullTelemetryService) APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo) {}
func (n NullTelemetryService) Webhook(ctx context.Context, webhookInfo *livekit.WebhookInfo) {}
// Helpers: all no-ops.
func (n NullTelemetryService) NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo) {
}
func (n NullTelemetryService) FlushStats() {}
// -----------------------------
// Tunables used by the telemetry service in this file.
const (
	// workerCleanupWait is the duration FlushStats passes to StatsWorker.Flush.
	workerCleanupWait = time.Minute * 3

	// jobsQueueMinSize is the MinSize the telemetry ops queue is created with.
	jobsQueueMinSize = 2048

	// telemetryStatsUpdateInterval is the period at which run calls FlushStats.
	telemetryStatsUpdateInterval = 30 * time.Second
)
// statsWorkerKey is the key of the telemetryService.workers map. Keying on both
// fields lets the same participantID map to a separate StatsWorker per room.
// Field order matters: this file builds keys with positional composite literals.
type statsWorkerKey struct {
roomID livekit.RoomID
participantID livekit.ParticipantID
}
// telemetryService is the TelemetryService implementation returned by
// NewTelemetryService.
type telemetryService struct {
// AnalyticsService is embedded, so its methods are promoted onto telemetryService.
AnalyticsService
// notifier may be nil; NewTelemetryService registers its processed hook only when it is non-nil.
notifier webhook.QueuedNotifier
// jobsQueue receives the closures passed to enqueue.
jobsQueue *utils.OpsQueue
// workersMu guards the workers map and the workerList head.
workersMu sync.RWMutex
// workers holds the current StatsWorker for each (roomID, participantID) pair.
workers map[statsWorkerKey]*StatsWorker
// workerList is the head of a singly linked list chained through StatsWorker.next;
// getOrCreateWorker pushes new workers at the head and FlushStats walks and prunes it.
workerList *StatsWorker
// flushMu is held for the whole of FlushStats, so flushes run one at a time.
flushMu sync.Mutex
}
// NewTelemetryService assembles a telemetryService around the given analytics
// backend and (optional) webhook notifier, starts its jobs queue, and launches
// the background goroutine that periodically flushes stats.
//
// When notifier is non-nil, a processed hook is registered on it that forwards
// each WebhookInfo to the service's Webhook method.
func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsService) TelemetryService {
	queue := utils.NewOpsQueue(utils.OpsQueueParams{
		Name:        "telemetry",
		MinSize:     jobsQueueMinSize,
		FlushOnStop: true,
		Logger:      logger.GetLogger(),
	})

	svc := &telemetryService{
		AnalyticsService: analytics,
		notifier:         notifier,
		jobsQueue:        queue,
		workers:          make(map[statsWorkerKey]*StatsWorker),
	}

	// The hook is registered before the queue is started, as in the original ordering.
	if svc.notifier != nil {
		svc.notifier.RegisterProcessedHook(func(ctx context.Context, info *livekit.WebhookInfo) {
			svc.Webhook(ctx, info)
		})
	}

	svc.jobsQueue.Start()
	go svc.run()

	return svc
}
// FlushStats calls Flush on every StatsWorker reachable from the worker list
// and removes the workers whose Flush reports closed: first they are unlinked
// from the list, then their entries are deleted from the workers map. flushMu
// is held throughout, so concurrent calls run one at a time.
func (t *telemetryService) FlushStats() {
t.flushMu.Lock()
defer t.flushMu.Unlock()
// Snapshot the list head under the read lock. Workers pushed at the head
// after this point are not visited by this pass.
t.workersMu.RLock()
worker := t.workerList
t.workersMu.RUnlock()
now := time.Now()
// prev is the closest preceding worker known to still be in the list (nil
// until one is known). reap collects closed workers into a separate chain
// by reusing their next field.
var prev, reap *StatsWorker
for worker != nil {
// Save the successor first: worker.next is overwritten below if worker is reaped.
next := worker.next
if closed := worker.Flush(now, workerCleanupWait); closed {
if prev == nil {
// this worker was at the head of the list
t.workersMu.Lock()
p := &t.workerList
for *p != worker {
// new workers have been added. scan until we find the one
// immediately before this
prev = *p
p = &prev.next
}
*p = worker.next
t.workersMu.Unlock()
} else {
// Unlink behind a known predecessor; this is done without workersMu.
// NOTE(review): presumably safe because only FlushStats (serialized by
// flushMu) rewrites next on an already-linked worker — confirm.
prev.next = worker.next
}
worker.next = reap
reap = worker
} else {
prev = worker
}
worker = next
}
if reap != nil {
t.workersMu.Lock()
for reap != nil {
key := statsWorkerKey{reap.roomID, reap.participantID}
// Delete the map entry only if it still points at this worker;
// getOrCreateWorker may already have stored a replacement under the same key.
if reap == t.workers[key] {
delete(t.workers, key)
}
reap = reap.next
}
t.workersMu.Unlock()
}
}
// run is the background loop started by NewTelemetryService. It invokes
// FlushStats once per telemetryStatsUpdateInterval and never returns.
func (t *telemetryService) run() {
	ticker := time.NewTicker(telemetryStatsUpdateInterval)
	defer ticker.Stop()

	for range ticker.C {
		t.FlushStats()
	}
}
// enqueue hands op to the service's jobs queue by calling its Enqueue method.
func (t *telemetryService) enqueue(op func()) {
t.jobsQueue.Enqueue(op)
}
// getWorker looks up the StatsWorker registered for the (roomID, participantID)
// pair. ok is false when the workers map has no entry for that pair. The lookup
// is performed under the workers read lock.
func (t *telemetryService) getWorker(roomID livekit.RoomID, participantID livekit.ParticipantID) (worker *StatsWorker, ok bool) {
	key := statsWorkerKey{roomID: roomID, participantID: participantID}

	t.workersMu.RLock()
	worker, ok = t.workers[key]
	t.workersMu.RUnlock()

	return worker, ok
}
// getOrCreateWorker returns the StatsWorker for the (roomID, participantID)
// pair, creating one when needed. The whole operation runs under the workers
// write lock.
//
// The boolean result is true when an existing worker was returned, i.e. one was
// registered and its Closed(guard) reported false. Otherwise a new worker is
// built with newStatsWorker, stored in the map (replacing any previous entry),
// pushed at the head of the worker list, and returned with false. If the
// replaced worker reported IsConnected, the new worker is marked connected too.
func (t *telemetryService) getOrCreateWorker(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	participantIdentity livekit.ParticipantIdentity,
	guard *ReferenceGuard,
) (*StatsWorker, bool) {
	t.workersMu.Lock()
	defer t.workersMu.Unlock()

	key := statsWorkerKey{roomID: roomID, participantID: participantID}

	existing, found := t.workers[key]
	if found && !existing.Closed(guard) {
		return existing, true
	}

	// Read the connected state of the worker being replaced before building the new one.
	carryConnected := found && existing.IsConnected()

	created := newStatsWorker(ctx, t, roomID, roomName, participantID, participantIdentity, guard)
	if carryConnected {
		created.SetConnected()
	}

	// Register in the map and push at the head of the linked list.
	t.workers[key] = created
	created.next = t.workerList
	t.workerList = created

	return created, false
}
// LocalRoomState queues a job that calls SendNodeRoomStates with the given
// context and info; the call itself happens when the jobs queue runs the job.
func (t *telemetryService) LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms) {
	sendStates := func() {
		t.SendNodeRoomStates(ctx, info)
	}
	t.enqueue(sendStates)
}