mirror of
https://github.com/livekit/livekit.git
synced 2026-03-29 09:19:53 +00:00
Some checks failed
Test / test (push) Failing after 17s
Release to Docker / docker (push) Failing after 3m42s
* Key telemetry stats work using combination of roomID, participantID. With forwarded participant, the same participantID can exist in two rooms. NOTE: This does not yet allow a participant session to report its events/track stats into multiple rooms. That would require registering multiple listeners (from rooms a participant is forwarded to). * missed file * data channel stats * PR comments + pass in room name so that telemetry events have proper room name also
326 lines
15 KiB
Go
326 lines
15 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package telemetry
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/livekit/livekit-server/pkg/utils"
|
|
"github.com/livekit/protocol/codecs/mime"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"github.com/livekit/protocol/webhook"
|
|
)
|
|
|
|
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
|
|
|
|
//counterfeiter:generate . TelemetryService
|
|
type TelemetryService interface {
|
|
// TrackStats is called periodically for each track in both directions (published/subscribed)
|
|
TrackStats(roomID livekit.RoomID, roomName livekit.RoomName, key StatsKey, stat *livekit.AnalyticsStat)
|
|
|
|
// events
|
|
RoomStarted(ctx context.Context, room *livekit.Room)
|
|
RoomEnded(ctx context.Context, room *livekit.Room)
|
|
|
|
// ParticipantJoined - a participant establishes signal connection to a room
|
|
ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta, shouldSendEvent bool, guard *ReferenceGuard)
|
|
// ParticipantActive - a participant establishes media connection
|
|
ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta, isMigration bool, guard *ReferenceGuard)
|
|
// ParticipantResumed - there has been an ICE restart or connection resume attempt, and we've received their signal connection
|
|
ParticipantResumed(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, nodeID livekit.NodeID, reason livekit.ReconnectReason)
|
|
// ParticipantLeft - the participant leaves the room, only sent if ParticipantActive has been called before
|
|
ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool, guard *ReferenceGuard)
|
|
// TrackPublishRequested - a publication attempt has been received
|
|
TrackPublishRequested(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo)
|
|
// TrackPublished - a publication attempt has been successful
|
|
TrackPublished(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool)
|
|
// TrackUnpublished - a participant unpublished a track
|
|
TrackUnpublished(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool)
|
|
// TrackSubscribeRequested - a participant requested to subscribe to a track
|
|
TrackSubscribeRequested(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo)
|
|
// TrackSubscribed - a participant subscribed to a track successfully
|
|
TrackSubscribed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo, shouldSendEvent bool)
|
|
// TrackUnsubscribed - a participant unsubscribed from a track successfully
|
|
TrackUnsubscribed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, shouldSendEvent bool)
|
|
// TrackSubscribeFailed - failure to subscribe to a track
|
|
TrackSubscribeFailed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, err error, isUserError bool)
|
|
// TrackMuted - the publisher has muted the Track
|
|
TrackMuted(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo)
|
|
// TrackUnmuted - the publisher has muted the Track
|
|
TrackUnmuted(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo)
|
|
// TrackPublishedUpdate - track metadata has been updated
|
|
TrackPublishedUpdate(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo)
|
|
// TrackMaxSubscribedVideoQuality - publisher is notified of the max quality subscribers desire
|
|
TrackMaxSubscribedVideoQuality(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, mime mime.MimeType, maxQuality livekit.VideoQuality)
|
|
TrackPublishRTPStats(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, mimeType mime.MimeType, layer int, stats *livekit.RTPStats)
|
|
TrackSubscribeRTPStats(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, mimeType mime.MimeType, stats *livekit.RTPStats)
|
|
|
|
EgressStarted(ctx context.Context, info *livekit.EgressInfo)
|
|
EgressUpdated(ctx context.Context, info *livekit.EgressInfo)
|
|
EgressEnded(ctx context.Context, info *livekit.EgressInfo)
|
|
|
|
IngressCreated(ctx context.Context, info *livekit.IngressInfo)
|
|
IngressDeleted(ctx context.Context, info *livekit.IngressInfo)
|
|
IngressStarted(ctx context.Context, info *livekit.IngressInfo)
|
|
IngressUpdated(ctx context.Context, info *livekit.IngressInfo)
|
|
IngressEnded(ctx context.Context, info *livekit.IngressInfo)
|
|
|
|
LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms)
|
|
|
|
Report(ctx context.Context, reportInfo *livekit.ReportInfo)
|
|
|
|
APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo)
|
|
|
|
Webhook(ctx context.Context, webhookInfo *livekit.WebhookInfo)
|
|
|
|
// helpers
|
|
AnalyticsService
|
|
NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo)
|
|
FlushStats()
|
|
}
|
|
|
|
// -----------------------------
|
|
|
|
var _ TelemetryService = (*NullTelemetryService)(nil)
|
|
|
|
type NullTelemetryService struct {
|
|
NullAnalyticService
|
|
}
|
|
|
|
func (n NullTelemetryService) TrackStats(roomID livekit.RoomID, roomName livekit.RoomName, key StatsKey, stat *livekit.AnalyticsStat) {
|
|
}
|
|
func (n NullTelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {}
|
|
func (n NullTelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {}
|
|
func (n NullTelemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta, shouldSendEvent bool, guard *ReferenceGuard) {
|
|
}
|
|
func (n NullTelemetryService) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta, isMigration bool, guard *ReferenceGuard) {
|
|
}
|
|
func (n NullTelemetryService) ParticipantResumed(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, nodeID livekit.NodeID, reason livekit.ReconnectReason) {
|
|
}
|
|
func (n NullTelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool, guard *ReferenceGuard) {
|
|
}
|
|
func (n NullTelemetryService) TrackPublishRequested(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) {
|
|
}
|
|
func (n NullTelemetryService) TrackPublished(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool) {
|
|
}
|
|
func (n NullTelemetryService) TrackUnpublished(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, shouldSendEvent bool) {
|
|
}
|
|
func (n NullTelemetryService) TrackSubscribeRequested(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
|
|
}
|
|
func (n NullTelemetryService) TrackSubscribed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo, shouldSendEvent bool) {
|
|
}
|
|
func (n NullTelemetryService) TrackUnsubscribed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, shouldSendEvent bool) {
|
|
}
|
|
func (n NullTelemetryService) TrackSubscribeFailed(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, err error, isUserError bool) {
|
|
}
|
|
func (n NullTelemetryService) TrackMuted(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
|
|
}
|
|
func (n NullTelemetryService) TrackUnmuted(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
|
|
}
|
|
func (n NullTelemetryService) TrackPublishedUpdate(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
|
|
}
|
|
func (n NullTelemetryService) TrackMaxSubscribedVideoQuality(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, track *livekit.TrackInfo, mime mime.MimeType, maxQuality livekit.VideoQuality) {
|
|
}
|
|
func (n NullTelemetryService) TrackPublishRTPStats(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, mimeType mime.MimeType, layer int, stats *livekit.RTPStats) {
|
|
}
|
|
func (n NullTelemetryService) TrackSubscribeRTPStats(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, trackID livekit.TrackID, mimeType mime.MimeType, stats *livekit.RTPStats) {
|
|
}
|
|
func (n NullTelemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {}
|
|
func (n NullTelemetryService) EgressUpdated(ctx context.Context, info *livekit.EgressInfo) {}
|
|
func (n NullTelemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) {}
|
|
func (n NullTelemetryService) IngressCreated(ctx context.Context, info *livekit.IngressInfo) {}
|
|
func (n NullTelemetryService) IngressDeleted(ctx context.Context, info *livekit.IngressInfo) {}
|
|
func (n NullTelemetryService) IngressStarted(ctx context.Context, info *livekit.IngressInfo) {}
|
|
func (n NullTelemetryService) IngressUpdated(ctx context.Context, info *livekit.IngressInfo) {}
|
|
func (n NullTelemetryService) IngressEnded(ctx context.Context, info *livekit.IngressInfo) {}
|
|
func (n NullTelemetryService) LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms) {}
|
|
func (n NullTelemetryService) Report(ctx context.Context, reportInfo *livekit.ReportInfo) {}
|
|
func (n NullTelemetryService) APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo) {}
|
|
func (n NullTelemetryService) Webhook(ctx context.Context, webhookInfo *livekit.WebhookInfo) {}
|
|
func (n NullTelemetryService) NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo) {
|
|
}
|
|
func (n NullTelemetryService) FlushStats() {}
|
|
|
|
// -----------------------------
|
|
|
|
const (
	// workerCleanupWait is the duration handed to StatsWorker.Flush on every
	// flush pass in FlushStats.
	workerCleanupWait = 3 * time.Minute
	// jobsQueueMinSize is the MinSize the telemetry ops queue is created with.
	jobsQueueMinSize = 2048

	// telemetryStatsUpdateInterval is the period at which the background
	// loop calls FlushStats.
	telemetryStatsUpdateInterval = time.Second * 30
)
|
|
|
|
type statsWorkerKey struct {
|
|
roomID livekit.RoomID
|
|
participantID livekit.ParticipantID
|
|
}
|
|
|
|
type telemetryService struct {
|
|
AnalyticsService
|
|
|
|
notifier webhook.QueuedNotifier
|
|
jobsQueue *utils.OpsQueue
|
|
|
|
workersMu sync.RWMutex
|
|
workers map[statsWorkerKey]*StatsWorker
|
|
workerList *StatsWorker
|
|
|
|
flushMu sync.Mutex
|
|
}
|
|
|
|
func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsService) TelemetryService {
|
|
t := &telemetryService{
|
|
AnalyticsService: analytics,
|
|
notifier: notifier,
|
|
jobsQueue: utils.NewOpsQueue(utils.OpsQueueParams{
|
|
Name: "telemetry",
|
|
MinSize: jobsQueueMinSize,
|
|
FlushOnStop: true,
|
|
Logger: logger.GetLogger(),
|
|
}),
|
|
workers: make(map[statsWorkerKey]*StatsWorker),
|
|
}
|
|
if t.notifier != nil {
|
|
t.notifier.RegisterProcessedHook(func(ctx context.Context, whi *livekit.WebhookInfo) {
|
|
t.Webhook(ctx, whi)
|
|
})
|
|
}
|
|
|
|
t.jobsQueue.Start()
|
|
go t.run()
|
|
|
|
return t
|
|
}
|
|
|
|
func (t *telemetryService) FlushStats() {
|
|
t.flushMu.Lock()
|
|
defer t.flushMu.Unlock()
|
|
|
|
t.workersMu.RLock()
|
|
worker := t.workerList
|
|
t.workersMu.RUnlock()
|
|
|
|
now := time.Now()
|
|
var prev, reap *StatsWorker
|
|
for worker != nil {
|
|
next := worker.next
|
|
if closed := worker.Flush(now, workerCleanupWait); closed {
|
|
if prev == nil {
|
|
// this worker was at the head of the list
|
|
t.workersMu.Lock()
|
|
p := &t.workerList
|
|
for *p != worker {
|
|
// new workers have been added. scan until we find the one
|
|
// immediately before this
|
|
prev = *p
|
|
p = &prev.next
|
|
}
|
|
*p = worker.next
|
|
t.workersMu.Unlock()
|
|
} else {
|
|
prev.next = worker.next
|
|
}
|
|
|
|
worker.next = reap
|
|
reap = worker
|
|
} else {
|
|
prev = worker
|
|
}
|
|
worker = next
|
|
}
|
|
|
|
if reap != nil {
|
|
t.workersMu.Lock()
|
|
for reap != nil {
|
|
key := statsWorkerKey{reap.roomID, reap.participantID}
|
|
if reap == t.workers[key] {
|
|
delete(t.workers, key)
|
|
}
|
|
reap = reap.next
|
|
}
|
|
t.workersMu.Unlock()
|
|
}
|
|
}
|
|
|
|
func (t *telemetryService) run() {
|
|
for range time.Tick(telemetryStatsUpdateInterval) {
|
|
t.FlushStats()
|
|
}
|
|
}
|
|
|
|
func (t *telemetryService) enqueue(op func()) {
|
|
t.jobsQueue.Enqueue(op)
|
|
}
|
|
|
|
func (t *telemetryService) getWorker(roomID livekit.RoomID, participantID livekit.ParticipantID) (worker *StatsWorker, ok bool) {
|
|
t.workersMu.RLock()
|
|
defer t.workersMu.RUnlock()
|
|
|
|
worker, ok = t.workers[statsWorkerKey{roomID, participantID}]
|
|
return
|
|
}
|
|
|
|
func (t *telemetryService) getOrCreateWorker(
|
|
ctx context.Context,
|
|
roomID livekit.RoomID,
|
|
roomName livekit.RoomName,
|
|
participantID livekit.ParticipantID,
|
|
participantIdentity livekit.ParticipantIdentity,
|
|
guard *ReferenceGuard,
|
|
) (*StatsWorker, bool) {
|
|
t.workersMu.Lock()
|
|
defer t.workersMu.Unlock()
|
|
|
|
key := statsWorkerKey{roomID, participantID}
|
|
worker, ok := t.workers[key]
|
|
if ok && !worker.Closed(guard) {
|
|
return worker, true
|
|
}
|
|
|
|
existingIsConnected := false
|
|
if ok {
|
|
existingIsConnected = worker.IsConnected()
|
|
}
|
|
|
|
worker = newStatsWorker(
|
|
ctx,
|
|
t,
|
|
roomID,
|
|
roomName,
|
|
participantID,
|
|
participantIdentity,
|
|
guard,
|
|
)
|
|
if existingIsConnected {
|
|
worker.SetConnected()
|
|
}
|
|
|
|
t.workers[key] = worker
|
|
|
|
worker.next = t.workerList
|
|
t.workerList = worker
|
|
|
|
return worker, false
|
|
}
|
|
|
|
func (t *telemetryService) LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms) {
|
|
t.enqueue(func() {
|
|
t.SendNodeRoomStates(ctx, info)
|
|
})
|
|
}
|