mirror of
https://github.com/livekit/livekit.git
synced 2026-04-03 10:35:45 +00:00
* Use delta stats throughout and avoid calculating deltas in telemetry * Fix a few things after testing * Remove debug * Fix tests * delete instead of setting to nil * Point to the latest protocol
95 lines
2.3 KiB
Go
95 lines
2.3 KiB
Go
package telemetry
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/gammazero/workerpool"
|
|
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/webhook"
|
|
|
|
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
|
)
|
|
|
|
const maxWebhookWorkers = 50
|
|
|
|
type TelemetryServiceInternal interface {
|
|
TelemetryService
|
|
SendAnalytics()
|
|
}
|
|
|
|
type TelemetryReporter interface {
|
|
Report(ctx context.Context, stats []*livekit.AnalyticsStat)
|
|
}
|
|
|
|
type telemetryServiceInternal struct {
|
|
notifier webhook.Notifier
|
|
webhookPool *workerpool.WorkerPool
|
|
|
|
// one worker per participant
|
|
workers map[livekit.ParticipantID]*StatsWorker
|
|
|
|
analytics AnalyticsService
|
|
}
|
|
|
|
func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsService) TelemetryServiceInternal {
|
|
return &telemetryServiceInternal{
|
|
notifier: notifier,
|
|
webhookPool: workerpool.New(maxWebhookWorkers),
|
|
workers: make(map[livekit.ParticipantID]*StatsWorker),
|
|
analytics: analytics,
|
|
}
|
|
}
|
|
|
|
func (t *telemetryServiceInternal) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat) {
|
|
direction := prometheus.Incoming
|
|
if streamType == livekit.StreamType_DOWNSTREAM {
|
|
direction = prometheus.Outgoing
|
|
}
|
|
|
|
nacks := uint32(0)
|
|
plis := uint32(0)
|
|
firs := uint32(0)
|
|
for _, stream := range stat.Streams {
|
|
nacks += stream.Nacks
|
|
plis += stream.Plis
|
|
firs += stream.Firs
|
|
}
|
|
prometheus.IncrementRTCP(direction, nacks, plis, firs)
|
|
|
|
w := t.workers[participantID]
|
|
if w != nil {
|
|
w.OnTrackStat(trackID, streamType, stat)
|
|
}
|
|
}
|
|
|
|
func (t *telemetryServiceInternal) Report(ctx context.Context, stats []*livekit.AnalyticsStat) {
|
|
for _, stat := range stats {
|
|
if len(stat.Streams) == 0 {
|
|
continue
|
|
}
|
|
|
|
direction := prometheus.Incoming
|
|
if stat.Kind == livekit.StreamType_DOWNSTREAM {
|
|
direction = prometheus.Outgoing
|
|
}
|
|
|
|
packets := uint32(0)
|
|
bytes := uint64(0)
|
|
for _, stream := range stat.Streams {
|
|
packets += stream.PrimaryPackets + stream.RetransmitPackets + stream.PaddingPackets
|
|
bytes += stream.PrimaryBytes + stream.RetransmitBytes + stream.PaddingBytes
|
|
}
|
|
prometheus.IncrementPackets(direction, uint64(packets))
|
|
prometheus.IncrementBytes(direction, bytes)
|
|
}
|
|
|
|
t.analytics.SendStats(ctx, stats)
|
|
}
|
|
|
|
func (t *telemetryServiceInternal) SendAnalytics() {
|
|
for _, worker := range t.workers {
|
|
worker.Update()
|
|
}
|
|
}
|