mirror of
https://github.com/livekit/livekit.git
synced 2026-06-07 08:51:56 +00:00
2dd5e63207
* telemetry: split webhook-processed hook registration out of NewTelemetryService NewTelemetryService used to register a notifier processed-hook on the inner *telemetryService directly. That made it impossible for downstream wrappers (e.g. cloud's TelemetryService that overrides Webhook to fan out to a v3 observability pipeline) to intercept webhook events without double-firing the legacy emission. Lift the registration into a new exported helper RegisterWebhookHook, and have the standalone server's wire provider createTelemetryService call it right after construction so behavior is unchanged for callers that don't wrap the service.
321 lines
15 KiB
Go
321 lines
15 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package telemetry
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/livekit/livekit-server/pkg/utils"
|
|
"github.com/livekit/protocol/codecs/mime"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"github.com/livekit/protocol/webhook"
|
|
)
|
|
|
|
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
|
|
|
|
//counterfeiter:generate . TelemetryService
|
|
type TelemetryService interface {
|
|
// TrackStats is called periodically for each track in both directions (published/subscribed)
|
|
TrackStats(roomID livekit.RoomID, roomName livekit.RoomName, key StatsKey, stat *livekit.AnalyticsStat)
|
|
|
|
// events
|
|
RoomStarted(ctx context.Context, room *livekit.Room)
|
|
RoomEnded(ctx context.Context, room *livekit.Room)
|
|
|
|
// ParticipantJoined - a participant establishes signal connection to a room
|
|
ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta, shouldSendEvent bool, guard *ReferenceGuard)
|
|
// ParticipantActive - a participant establishes media connection
|
|
ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta, isMigration bool, guard *ReferenceGuard)
|
|
// ParticipantResumed - there has been an ICE restart or connection resume attempt, and we've received their signal connection
|
|
ParticipantResumed(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, nodeID livekit.NodeID, reason livekit.ReconnectReason)
|
|
// ParticipantLeft - the participant leaves the room, only sent if ParticipantActive has been called before
|
|
ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool, guard *ReferenceGuard)
|
|
// TrackPublishRequested - a publication attempt has been received
|
|
TrackPublishRequested(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo)
|
|
// TrackPublished - a publication attempt has been successful
|
|
TrackPublished(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool)
|
|
// TrackUnpublished - a participant unpublished a track
|
|
TrackUnpublished(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool)
|
|
// TrackSubscribeRequested - a participant requested to subscribe to a track
|
|
TrackSubscribeRequested(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo)
|
|
// TrackSubscribed - a participant subscribed to a track successfully
|
|
TrackSubscribed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo, shouldSendEvent bool)
|
|
// TrackUnsubscribed - a participant unsubscribed from a track successfully
|
|
TrackUnsubscribed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, shouldSendEvent bool)
|
|
// TrackSubscribeFailed - failure to subscribe to a track
|
|
TrackSubscribeFailed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, err error, isUserError bool)
|
|
// TrackMuted - the publisher has muted the Track
|
|
TrackMuted(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo)
|
|
// TrackUnmuted - the publisher has muted the Track
|
|
TrackUnmuted(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo)
|
|
// TrackPublishedUpdate - track metadata has been updated
|
|
TrackPublishedUpdate(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo)
|
|
// TrackMaxSubscribedVideoQuality - publisher is notified of the max quality subscribers desire
|
|
TrackMaxSubscribedVideoQuality(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, mime mime.MimeType, maxQuality livekit.VideoQuality)
|
|
TrackPublishRTPStats(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, mimeType mime.MimeType, layer int, stats *livekit.RTPStats)
|
|
TrackSubscribeRTPStats(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, mimeType mime.MimeType, stats *livekit.RTPStats)
|
|
|
|
EgressStarted(ctx context.Context, info *livekit.EgressInfo)
|
|
EgressUpdated(ctx context.Context, info *livekit.EgressInfo)
|
|
EgressEnded(ctx context.Context, info *livekit.EgressInfo)
|
|
|
|
IngressCreated(ctx context.Context, info *livekit.IngressInfo)
|
|
IngressDeleted(ctx context.Context, info *livekit.IngressInfo)
|
|
IngressStarted(ctx context.Context, info *livekit.IngressInfo)
|
|
IngressUpdated(ctx context.Context, info *livekit.IngressInfo)
|
|
IngressEnded(ctx context.Context, info *livekit.IngressInfo)
|
|
|
|
LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms)
|
|
|
|
Report(ctx context.Context, reportInfo *livekit.ReportInfo)
|
|
|
|
APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo)
|
|
|
|
Webhook(ctx context.Context, webhookInfo *livekit.WebhookInfo)
|
|
|
|
// helpers
|
|
AnalyticsService
|
|
NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo)
|
|
FlushStats()
|
|
}
|
|
|
|
// -----------------------------
|
|
|
|
var _ TelemetryService = (*NullTelemetryService)(nil)
|
|
|
|
type NullTelemetryService struct {
|
|
NullAnalyticService
|
|
}
|
|
|
|
func (n NullTelemetryService) TrackStats(roomID livekit.RoomID, roomName livekit.RoomName, key StatsKey, stat *livekit.AnalyticsStat) {
|
|
}
|
|
func (n NullTelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {}
|
|
func (n NullTelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {}
|
|
func (n NullTelemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta, shouldSendEvent bool, guard *ReferenceGuard) {
|
|
}
|
|
func (n NullTelemetryService) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta, isMigration bool, guard *ReferenceGuard) {
|
|
}
|
|
func (n NullTelemetryService) ParticipantResumed(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, nodeID livekit.NodeID, reason livekit.ReconnectReason) {
|
|
}
|
|
func (n NullTelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool, guard *ReferenceGuard) {
|
|
}
|
|
func (n NullTelemetryService) TrackPublishRequested(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) {
|
|
}
|
|
func (n NullTelemetryService) TrackPublished(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool) {
|
|
}
|
|
func (n NullTelemetryService) TrackUnpublished(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool) {
|
|
}
|
|
func (n NullTelemetryService) TrackSubscribeRequested(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
|
|
}
|
|
func (n NullTelemetryService) TrackSubscribed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo, shouldSendEvent bool) {
|
|
}
|
|
func (n NullTelemetryService) TrackUnsubscribed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, shouldSendEvent bool) {
|
|
}
|
|
func (n NullTelemetryService) TrackSubscribeFailed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, err error, isUserError bool) {
|
|
}
|
|
func (n NullTelemetryService) TrackMuted(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
|
|
}
|
|
func (n NullTelemetryService) TrackUnmuted(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
|
|
}
|
|
func (n NullTelemetryService) TrackPublishedUpdate(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
|
|
}
|
|
func (n NullTelemetryService) TrackMaxSubscribedVideoQuality(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, mime mime.MimeType, maxQuality livekit.VideoQuality) {
|
|
}
|
|
func (n NullTelemetryService) TrackPublishRTPStats(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, mimeType mime.MimeType, layer int, stats *livekit.RTPStats) {
|
|
}
|
|
func (n NullTelemetryService) TrackSubscribeRTPStats(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, mimeType mime.MimeType, stats *livekit.RTPStats) {
|
|
}
|
|
func (n NullTelemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {}
|
|
func (n NullTelemetryService) EgressUpdated(ctx context.Context, info *livekit.EgressInfo) {}
|
|
func (n NullTelemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) {}
|
|
func (n NullTelemetryService) IngressCreated(ctx context.Context, info *livekit.IngressInfo) {}
|
|
func (n NullTelemetryService) IngressDeleted(ctx context.Context, info *livekit.IngressInfo) {}
|
|
func (n NullTelemetryService) IngressStarted(ctx context.Context, info *livekit.IngressInfo) {}
|
|
func (n NullTelemetryService) IngressUpdated(ctx context.Context, info *livekit.IngressInfo) {}
|
|
func (n NullTelemetryService) IngressEnded(ctx context.Context, info *livekit.IngressInfo) {}
|
|
func (n NullTelemetryService) LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms) {}
|
|
func (n NullTelemetryService) Report(ctx context.Context, reportInfo *livekit.ReportInfo) {}
|
|
func (n NullTelemetryService) APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo) {}
|
|
func (n NullTelemetryService) Webhook(ctx context.Context, webhookInfo *livekit.WebhookInfo) {}
|
|
func (n NullTelemetryService) NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo) {
|
|
}
|
|
func (n NullTelemetryService) FlushStats() {}
|
|
|
|
// -----------------------------
|
|
|
|
// Tunables used by the telemetry service in this file.
const (
	// workerCleanupWait is the duration handed to StatsWorker.Flush on every
	// FlushStats pass.
	workerCleanupWait = 3 * time.Minute
	// jobsQueueMinSize is the MinSize given to the telemetry ops queue.
	jobsQueueMinSize = 2048

	// telemetryStatsUpdateInterval is the period at which the background loop
	// calls FlushStats.
	telemetryStatsUpdateInterval = 30 * time.Second
)
|
|
|
|
type statsWorkerKey struct {
|
|
roomID livekit.RoomID
|
|
participantID livekit.ParticipantID
|
|
}
|
|
|
|
type telemetryService struct {
|
|
AnalyticsService
|
|
|
|
notifier webhook.QueuedNotifier
|
|
jobsQueue *utils.OpsQueue
|
|
|
|
workersMu sync.RWMutex
|
|
workers map[statsWorkerKey]*StatsWorker
|
|
workerList *StatsWorker
|
|
|
|
flushMu sync.Mutex
|
|
}
|
|
|
|
func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsService) TelemetryService {
|
|
t := &telemetryService{
|
|
AnalyticsService: analytics,
|
|
notifier: notifier,
|
|
jobsQueue: utils.NewOpsQueue(utils.OpsQueueParams{
|
|
Name: "telemetry",
|
|
MinSize: jobsQueueMinSize,
|
|
FlushOnStop: true,
|
|
Logger: logger.GetLogger(),
|
|
}),
|
|
workers: make(map[statsWorkerKey]*StatsWorker),
|
|
}
|
|
|
|
t.jobsQueue.Start()
|
|
go t.run()
|
|
|
|
return t
|
|
}
|
|
|
|
func (t *telemetryService) FlushStats() {
|
|
t.flushMu.Lock()
|
|
defer t.flushMu.Unlock()
|
|
|
|
t.workersMu.RLock()
|
|
worker := t.workerList
|
|
t.workersMu.RUnlock()
|
|
|
|
now := time.Now()
|
|
var prev, reap *StatsWorker
|
|
for worker != nil {
|
|
next := worker.next
|
|
if closed := worker.Flush(now, workerCleanupWait); closed {
|
|
if prev == nil {
|
|
// this worker was at the head of the list
|
|
t.workersMu.Lock()
|
|
p := &t.workerList
|
|
for *p != worker {
|
|
// new workers have been added. scan until we find the one
|
|
// immediately before this
|
|
prev = *p
|
|
p = &prev.next
|
|
}
|
|
*p = worker.next
|
|
t.workersMu.Unlock()
|
|
} else {
|
|
prev.next = worker.next
|
|
}
|
|
|
|
worker.next = reap
|
|
reap = worker
|
|
} else {
|
|
prev = worker
|
|
}
|
|
worker = next
|
|
}
|
|
|
|
if reap != nil {
|
|
t.workersMu.Lock()
|
|
for reap != nil {
|
|
key := statsWorkerKey{reap.roomID, reap.participantID}
|
|
if reap == t.workers[key] {
|
|
delete(t.workers, key)
|
|
}
|
|
reap = reap.next
|
|
}
|
|
t.workersMu.Unlock()
|
|
}
|
|
}
|
|
|
|
func (t *telemetryService) run() {
|
|
for range time.Tick(telemetryStatsUpdateInterval) {
|
|
t.FlushStats()
|
|
}
|
|
}
|
|
|
|
func (t *telemetryService) enqueue(op func()) {
|
|
t.jobsQueue.Enqueue(op)
|
|
}
|
|
|
|
func (t *telemetryService) getWorker(roomID livekit.RoomID, participantID livekit.ParticipantID) (worker *StatsWorker, ok bool) {
|
|
t.workersMu.RLock()
|
|
defer t.workersMu.RUnlock()
|
|
|
|
worker, ok = t.workers[statsWorkerKey{roomID, participantID}]
|
|
return
|
|
}
|
|
|
|
func (t *telemetryService) getOrCreateWorker(
|
|
ctx context.Context,
|
|
roomID livekit.RoomID,
|
|
roomName livekit.RoomName,
|
|
participantID livekit.ParticipantID,
|
|
participantIdentity livekit.ParticipantIdentity,
|
|
guard *ReferenceGuard,
|
|
) (*StatsWorker, bool) {
|
|
t.workersMu.Lock()
|
|
defer t.workersMu.Unlock()
|
|
|
|
key := statsWorkerKey{roomID, participantID}
|
|
worker, ok := t.workers[key]
|
|
if ok && !worker.Closed(guard) {
|
|
return worker, true
|
|
}
|
|
|
|
existingIsConnected := false
|
|
if ok {
|
|
existingIsConnected = worker.IsConnected()
|
|
}
|
|
|
|
worker = newStatsWorker(
|
|
ctx,
|
|
t,
|
|
roomID,
|
|
roomName,
|
|
participantID,
|
|
participantIdentity,
|
|
guard,
|
|
)
|
|
if existingIsConnected {
|
|
worker.SetConnected()
|
|
}
|
|
|
|
t.workers[key] = worker
|
|
|
|
worker.next = t.workerList
|
|
t.workerList = worker
|
|
|
|
return worker, false
|
|
}
|
|
|
|
func (t *telemetryService) LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms) {
|
|
t.enqueue(func() {
|
|
t.SendNodeRoomStates(ctx, info)
|
|
})
|
|
}
|