Files
livekit/pkg/telemetry/telemetryserviceinternal.go
Raja Subramanian a98d955284 Delta stats throughout (#615)
* Use delta stats throughout and avoid calculating deltas in telemetry

* Fix a few things after testing

* Remove debug

* Fix tests

* delete instead of setting to nil

* Point to the latest protocol
2022-04-16 21:11:32 +05:30

95 lines
2.3 KiB
Go

package telemetry
import (
"context"
"github.com/gammazero/workerpool"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/webhook"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
const maxWebhookWorkers = 50
type TelemetryServiceInternal interface {
TelemetryService
SendAnalytics()
}
type TelemetryReporter interface {
Report(ctx context.Context, stats []*livekit.AnalyticsStat)
}
type telemetryServiceInternal struct {
notifier webhook.Notifier
webhookPool *workerpool.WorkerPool
// one worker per participant
workers map[livekit.ParticipantID]*StatsWorker
analytics AnalyticsService
}
func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsService) TelemetryServiceInternal {
return &telemetryServiceInternal{
notifier: notifier,
webhookPool: workerpool.New(maxWebhookWorkers),
workers: make(map[livekit.ParticipantID]*StatsWorker),
analytics: analytics,
}
}
func (t *telemetryServiceInternal) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat) {
direction := prometheus.Incoming
if streamType == livekit.StreamType_DOWNSTREAM {
direction = prometheus.Outgoing
}
nacks := uint32(0)
plis := uint32(0)
firs := uint32(0)
for _, stream := range stat.Streams {
nacks += stream.Nacks
plis += stream.Plis
firs += stream.Firs
}
prometheus.IncrementRTCP(direction, nacks, plis, firs)
w := t.workers[participantID]
if w != nil {
w.OnTrackStat(trackID, streamType, stat)
}
}
func (t *telemetryServiceInternal) Report(ctx context.Context, stats []*livekit.AnalyticsStat) {
for _, stat := range stats {
if len(stat.Streams) == 0 {
continue
}
direction := prometheus.Incoming
if stat.Kind == livekit.StreamType_DOWNSTREAM {
direction = prometheus.Outgoing
}
packets := uint32(0)
bytes := uint64(0)
for _, stream := range stat.Streams {
packets += stream.PrimaryPackets + stream.RetransmitPackets + stream.PaddingPackets
bytes += stream.PrimaryBytes + stream.RetransmitBytes + stream.PaddingBytes
}
prometheus.IncrementPackets(direction, uint64(packets))
prometheus.IncrementBytes(direction, bytes)
}
t.analytics.SendStats(ctx, stats)
}
func (t *telemetryServiceInternal) SendAnalytics() {
for _, worker := range t.workers {
worker.Update()
}
}