Files
livekit/pkg/telemetry/telemetryservice.go
Raja Subramanian a98d955284 Delta stats throughout (#615)
* Use delta stats throughout and avoid calculating deltas in telemetry

* Fix a few things after testing

* Remove debug

* Fix tests

* delete instead of setting to nil

* Point to the latest protocol
2022-04-16 21:11:32 +05:30

164 lines
6.4 KiB
Go

package telemetry
import (
"context"
"time"
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/webhook"
)
// updateFrequency is the period of the ticker used by telemetryService.run,
// i.e. how often SendAnalytics is invoked on the internal service.
const updateFrequency = time.Second * 10
// TelemetryService is the set of telemetry hooks exposed by this package: one
// stats-reporting method plus a series of event notifications for rooms,
// participants, tracks, recordings and egress. None of the methods return a
// value, so callers receive no error feedback through this interface.
//
// The go:generate directive below runs counterfeiter against this interface.
//
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . TelemetryService
type TelemetryService interface {
// stats: accepts one AnalyticsStat for a (stream type, participant, track) triple
TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat)
// events: each method below takes a context.Context as its first argument
RoomStarted(ctx context.Context, room *livekit.Room)
RoomEnded(ctx context.Context, room *livekit.Room)
ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta)
ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta)
ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo)
TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo)
TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32)
TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo)
TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, maxQuality livekit.VideoQuality)
RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo)
RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo)
EgressStarted(ctx context.Context, info *livekit.EgressInfo)
EgressEnded(ctx context.Context, info *livekit.EgressInfo)
}
// telemetryService is the TelemetryService implementation returned by
// NewTelemetryService. Its exported methods do not call the internal service
// directly; they wrap each call in a closure and hand it to jobQueue.
type telemetryService struct {
// internalService is the delegate that every enqueued job, and the periodic
// SendAnalytics call in run, is forwarded to.
internalService TelemetryServiceInternal
// jobQueue receives one closure per TelemetryService method call.
jobQueue *utils.OpsQueue
}
// jobQueueBufferSize is the size argument passed to utils.NewOpsQueue when the
// telemetry job queue is created. The queue should be sufficiently large to
// avoid blocking.
const jobQueueBufferSize = 10000
// NewTelemetryService builds a telemetryService around an internal service
// created from notifier and analytics, and a "telemetry" ops queue sized by
// jobQueueBufferSize.
//
// Before returning, it starts the job queue and launches a goroutine running
// run, which periodically triggers SendAnalytics on the internal service.
// Nothing in this constructor provides a way to stop that goroutine.
func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService {
	// Construct the dependencies in the same order the struct literal did.
	internal := NewTelemetryServiceInternal(notifier, analytics)
	queue := utils.NewOpsQueue(logger.Logger(logger.GetLogger()), "telemetry", jobQueueBufferSize)

	svc := &telemetryService{
		internalService: internal,
		jobQueue:        queue,
	}

	svc.jobQueue.Start()
	go svc.run()

	return svc
}
// run drives periodic analytics reporting: on every updateFrequency tick it
// asks the internal service to send analytics.
//
// The loop has no exit condition and nothing here stops the ticker, so this
// method does not return in normal operation; the deferred Stop only takes
// effect if the goroutine unwinds (for example on a panic).
func (t *telemetryService) run() {
	tick := time.NewTicker(updateFrequency)
	defer tick.Stop()

	for range tick.C {
		t.internalService.SendAnalytics()
	}
}
// TrackStats enqueues a job that forwards the given stats, together with the
// stream type, participant ID and track ID, to the internal service's
// TrackStats. The forwarding call is not made inline; it runs when the job
// queue executes the enqueued function.
func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stats *livekit.AnalyticsStat) {
	job := func() {
		t.internalService.TrackStats(streamType, participantID, trackID, stats)
	}
	t.jobQueue.Enqueue(job)
}
// RoomStarted enqueues a job that forwards ctx and room to the internal
// service's RoomStarted. The forwarding call runs when the job queue executes
// the enqueued function, not inline.
func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
	job := func() {
		t.internalService.RoomStarted(ctx, room)
	}
	t.jobQueue.Enqueue(job)
}
// RoomEnded enqueues a job that forwards ctx and room to the internal
// service's RoomEnded. The forwarding call runs when the job queue executes
// the enqueued function, not inline.
func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
	job := func() {
		t.internalService.RoomEnded(ctx, room)
	}
	t.jobQueue.Enqueue(job)
}
// ParticipantJoined enqueues a job that forwards the room, participant,
// client info and client metadata to the internal service's
// ParticipantJoined. The forwarding call runs when the job queue executes the
// enqueued function, not inline.
func (t *telemetryService) ParticipantJoined(
	ctx context.Context,
	room *livekit.Room,
	participant *livekit.ParticipantInfo,
	clientInfo *livekit.ClientInfo,
	clientMeta *livekit.AnalyticsClientMeta,
) {
	job := func() {
		t.internalService.ParticipantJoined(ctx, room, participant, clientInfo, clientMeta)
	}
	t.jobQueue.Enqueue(job)
}
// ParticipantActive enqueues a job that forwards the room, participant and
// client metadata to the internal service's ParticipantActive. The forwarding
// call runs when the job queue executes the enqueued function, not inline.
func (t *telemetryService) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) {
	job := func() {
		t.internalService.ParticipantActive(ctx, room, participant, clientMeta)
	}
	t.jobQueue.Enqueue(job)
}
// ParticipantLeft enqueues a job that forwards the room and participant to
// the internal service's ParticipantLeft. The forwarding call runs when the
// job queue executes the enqueued function, not inline.
func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) {
	job := func() {
		t.internalService.ParticipantLeft(ctx, room, participant)
	}
	t.jobQueue.Enqueue(job)
}
// TrackPublished enqueues a job that forwards the participant ID, identity
// and track to the internal service's TrackPublished. The forwarding call
// runs when the job queue executes the enqueued function, not inline.
func (t *telemetryService) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) {
	job := func() {
		t.internalService.TrackPublished(ctx, participantID, identity, track)
	}
	t.jobQueue.Enqueue(job)
}
// TrackUnpublished enqueues a job that forwards the participant ID, identity,
// track and ssrc to the internal service's TrackUnpublished. The forwarding
// call runs when the job queue executes the enqueued function, not inline.
func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32) {
	job := func() {
		t.internalService.TrackUnpublished(ctx, participantID, identity, track, ssrc)
	}
	t.jobQueue.Enqueue(job)
}
// TrackSubscribed enqueues a job that forwards the participant ID, track and
// publisher info to the internal service's TrackSubscribed. The forwarding
// call runs when the job queue executes the enqueued function, not inline.
func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo) {
	job := func() {
		t.internalService.TrackSubscribed(ctx, participantID, track, publisher)
	}
	t.jobQueue.Enqueue(job)
}
// TrackUnsubscribed enqueues a job that forwards the participant ID and track
// to the internal service's TrackUnsubscribed. The forwarding call runs when
// the job queue executes the enqueued function, not inline.
func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
	job := func() {
		t.internalService.TrackUnsubscribed(ctx, participantID, track)
	}
	t.jobQueue.Enqueue(job)
}
// RecordingStarted enqueues a job that forwards the recording info to the
// internal service's RecordingStarted. The forwarding call runs when the job
// queue executes the enqueued function, not inline.
func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) {
	job := func() {
		t.internalService.RecordingStarted(ctx, ri)
	}
	t.jobQueue.Enqueue(job)
}
// RecordingEnded enqueues a job that forwards the recording info to the
// internal service's RecordingEnded. The forwarding call runs when the job
// queue executes the enqueued function, not inline.
func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) {
	job := func() {
		t.internalService.RecordingEnded(ctx, ri)
	}
	t.jobQueue.Enqueue(job)
}
// TrackPublishedUpdate enqueues a job that forwards the participant ID and
// track to the internal service's TrackPublishedUpdate. The forwarding call
// runs when the job queue executes the enqueued function, not inline.
func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
	job := func() {
		t.internalService.TrackPublishedUpdate(ctx, participantID, track)
	}
	t.jobQueue.Enqueue(job)
}
// TrackMaxSubscribedVideoQuality enqueues a job that forwards the participant
// ID, track and maxQuality to the internal service's
// TrackMaxSubscribedVideoQuality. The forwarding call runs when the job queue
// executes the enqueued function, not inline.
func (t *telemetryService) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, maxQuality livekit.VideoQuality) {
	job := func() {
		t.internalService.TrackMaxSubscribedVideoQuality(ctx, participantID, track, maxQuality)
	}
	t.jobQueue.Enqueue(job)
}
// EgressStarted enqueues a job that forwards the egress info to the internal
// service's EgressStarted. The forwarding call runs when the job queue
// executes the enqueued function, not inline.
func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {
	job := func() {
		t.internalService.EgressStarted(ctx, info)
	}
	t.jobQueue.Enqueue(job)
}
// EgressEnded enqueues a job that forwards the egress info to the internal
// service's EgressEnded. The forwarding call runs when the job queue executes
// the enqueued function, not inline.
func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) {
	job := func() {
		t.internalService.EgressEnded(ctx, info)
	}
	t.jobQueue.Enqueue(job)
}