diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 93672f7e5..50bbe31f9 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -5,7 +5,6 @@ import ( "time" "github.com/livekit/livekit-server/pkg/config" - "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/webhook" @@ -36,7 +35,7 @@ type TelemetryService interface { type telemetryService struct { internalService TelemetryServiceInternal - jobQueue *utils.OpsQueue + jobsChan chan func() } // queue should be sufficiently large to avoid blocking @@ -45,11 +44,9 @@ const jobQueueBufferSize = 10000 func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService { t := &telemetryService{ internalService: NewTelemetryServiceInternal(notifier, analytics), - jobQueue: utils.NewOpsQueue(logger.GetDefaultLogger(), "telemetry", jobQueueBufferSize), + jobsChan: make(chan func(), jobQueueBufferSize), } - t.jobQueue.Start() - go t.run() return t @@ -59,104 +56,117 @@ func (t *telemetryService) run() { ticker := time.NewTicker(config.StatsUpdateInterval) defer ticker.Stop() for { - <-ticker.C - t.internalService.SendAnalytics() + select { + case <-ticker.C: + t.internalService.SendAnalytics() + case op := <-t.jobsChan: + op() + } + } +} + +func (t *telemetryService) enqueue(op func()) { + select { + case t.jobsChan <- op: + // success + default: + logger.Warnw("telemetry queue full", nil) } } func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stats *livekit.AnalyticsStat) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.TrackStats(streamType, participantID, trackID, stats) }) } func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.RoomStarted(ctx, room) }) } func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.RoomEnded(ctx, room) }) } func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo, clientMeta *livekit.AnalyticsClientMeta) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.ParticipantJoined(ctx, room, participant, clientInfo, clientMeta) }) } func (t *telemetryService) ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.ParticipantActive(ctx, room, participant, clientMeta) }) } func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.ParticipantLeft(ctx, room, participant) }) } func (t *telemetryService) TrackPublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.TrackPublished(ctx, participantID, identity, track) }) } func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID livekit.ParticipantID, identity livekit.ParticipantIdentity, track *livekit.TrackInfo, ssrc uint32) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.TrackUnpublished(ctx, participantID, identity, track, ssrc) }) } func (t *telemetryService) TrackSubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, publisher *livekit.ParticipantInfo) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.TrackSubscribed(ctx, participantID, track, publisher) }) } func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.TrackUnsubscribed(ctx, participantID, track) }) } func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.RecordingStarted(ctx, ri) }) } func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.RecordingEnded(ctx, ri) }) } func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.TrackPublishedUpdate(ctx, participantID, track) }) } func (t *telemetryService) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, maxQuality livekit.VideoQuality) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.TrackMaxSubscribedVideoQuality(ctx, participantID, track, maxQuality) }) } func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.EgressStarted(ctx, info) }) } func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) { - t.jobQueue.Enqueue(func() { + t.enqueue(func() { t.internalService.EgressEnded(ctx, info) }) }