From b744a9c2bac8f34b5c0d228b84b5260da45d69ad Mon Sep 17 00:00:00 2001 From: Artur Shellunts Date: Wed, 29 Dec 2021 19:51:12 +0100 Subject: [PATCH] Implement event loop for telemetry service (#297) It allows all actions/events to run in the same goroutine. Therefore no synchronization primitives are needed inside the telemetry service implementation. --- pkg/telemetry/statsworker.go | 16 ----- pkg/telemetry/telemetryservice.go | 66 +++++++++++++++---- pkg/telemetry/telemetryserviceinternal.go | 8 --- .../telemetryserviceinternalevents.go | 8 --- 4 files changed, 52 insertions(+), 46 deletions(-) diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index a52c72b2f..01067e99b 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -2,7 +2,6 @@ package telemetry import ( "context" - "sync" "github.com/livekit/protocol/livekit" "google.golang.org/protobuf/types/known/timestamppb" @@ -18,7 +17,6 @@ type StatsWorker struct { roomName string participantID string - sync.RWMutex upstreamBuffers map[string][]*buffer.Buffer drainUpstreamBuffers map[string]bool @@ -52,16 +50,10 @@ func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID, roomName, } func (s *StatsWorker) AddBuffer(trackID string, buffer *buffer.Buffer) { - s.Lock() - defer s.Unlock() - s.upstreamBuffers[trackID] = append(s.upstreamBuffers[trackID], buffer) } func (s *StatsWorker) OnDownstreamPacket(trackID string, bytes int) { - s.Lock() - defer s.Unlock() - s.getOrCreateOutgoingStatsIfEmpty(trackID).totalBytes += uint64(bytes) s.getOrCreateOutgoingStatsIfEmpty(trackID).totalPackets++ } @@ -91,9 +83,6 @@ func (s *StatsWorker) getOrCreateIncomingStatsIfEmpty(trackID string) *Stats { } func (s *StatsWorker) OnRTCP(trackID string, direction livekit.StreamType, stats *livekit.AnalyticsStat) { - s.Lock() - defer s.Unlock() - var ds *Stats if direction == livekit.StreamType_DOWNSTREAM { ds = s.getOrCreateOutgoingStatsIfEmpty(trackID) @@ -125,9 +114,6 @@ func (s 
*StatsWorker) calculateTotalBytesPackets(allBuffers []*buffer.Buffer) (t } func (s *StatsWorker) Update() { - s.Lock() - defer s.Unlock() - ts := timestamppb.Now() stats := make([]*livekit.AnalyticsStat, 0) @@ -196,9 +182,7 @@ func (s *StatsWorker) update(stats *Stats, ts *timestamppb.Timestamp) *livekit.A } func (s *StatsWorker) RemoveBuffer(trackID string) { - s.Lock() s.drainUpstreamBuffers[trackID] = true - s.Unlock() } func (s *StatsWorker) Close() { diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 85a7c9e24..9e420d629 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -32,13 +32,19 @@ type TelemetryService interface { RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) } +type doWorkFunc func() + type telemetryService struct { internalService TelemetryServiceInternal + jobQueue chan doWorkFunc } +const jobQueueBufferSize = 100 + func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) TelemetryService { t := &telemetryService{ internalService: NewTelemetryServiceInternal(notifier, analytics), + jobQueue: make(chan doWorkFunc, jobQueueBufferSize), } go t.run() @@ -47,62 +53,94 @@ func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) } func (t *telemetryService) run() { + + ticker := time.NewTicker(updateFrequency) for { select { - case <-time.After(updateFrequency): + case <-ticker.C: t.internalService.SendAnalytics() + case job, ok := <-t.jobQueue: + if ok { + job() + } } } } func (t *telemetryService) AddUpTrack(participantID string, trackID string, buff *buffer.Buffer) { - t.internalService.AddUpTrack(participantID, trackID, buff) + t.jobQueue <- func() { + t.internalService.AddUpTrack(participantID, trackID, buff) + } } func (t *telemetryService) OnDownstreamPacket(participantID string, trackID string, bytes int) { - t.internalService.OnDownstreamPacket(participantID, trackID, bytes) + t.jobQueue <- func() { + 
t.internalService.OnDownstreamPacket(participantID, trackID, bytes) + } } func (t *telemetryService) HandleRTCP(streamType livekit.StreamType, participantID string, trackID string, pkts []rtcp.Packet) { - t.internalService.HandleRTCP(streamType, participantID, trackID, pkts) + t.jobQueue <- func() { + t.internalService.HandleRTCP(streamType, participantID, trackID, pkts) + } } func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { - t.internalService.RoomStarted(ctx, room) + t.jobQueue <- func() { + t.internalService.RoomStarted(ctx, room) + } } func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { - t.internalService.RoomEnded(ctx, room) + t.jobQueue <- func() { + t.internalService.RoomEnded(ctx, room) + } } func (t *telemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo) { - t.internalService.ParticipantJoined(ctx, room, participant, clientInfo) + t.jobQueue <- func() { + t.internalService.ParticipantJoined(ctx, room, participant, clientInfo) + } } func (t *telemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { - t.internalService.ParticipantLeft(ctx, room, participant) + t.jobQueue <- func() { + t.internalService.ParticipantLeft(ctx, room, participant) + } } func (t *telemetryService) TrackPublished(ctx context.Context, participantID string, track *livekit.TrackInfo) { - t.internalService.TrackPublished(ctx, participantID, track) + t.jobQueue <- func() { + t.internalService.TrackPublished(ctx, participantID, track) + } } func (t *telemetryService) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) { - t.internalService.TrackUnpublished(ctx, participantID, track, ssrc) + t.jobQueue <- func() { + t.internalService.TrackUnpublished(ctx, participantID, track, ssrc) + } } func (t *telemetryService) TrackSubscribed(ctx 
context.Context, participantID string, track *livekit.TrackInfo) { - t.internalService.TrackSubscribed(ctx, participantID, track) + t.jobQueue <- func() { + t.internalService.TrackSubscribed(ctx, participantID, track) + } } func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) { - t.internalService.TrackUnsubscribed(ctx, participantID, track) + t.jobQueue <- func() { + t.internalService.TrackUnsubscribed(ctx, participantID, track) + } } func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) { - t.internalService.RecordingStarted(ctx, ri) + t.jobQueue <- func() { + t.internalService.RecordingStarted(ctx, ri) + } } func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) { - t.internalService.RecordingEnded(ctx, ri) + t.jobQueue <- func() { + t.internalService.RecordingEnded(ctx, ri) + } } diff --git a/pkg/telemetry/telemetryserviceinternal.go b/pkg/telemetry/telemetryserviceinternal.go index 3d11e9469..e1af45f90 100644 --- a/pkg/telemetry/telemetryserviceinternal.go +++ b/pkg/telemetry/telemetryserviceinternal.go @@ -2,7 +2,6 @@ package telemetry import ( "context" - "sync" "github.com/gammazero/workerpool" "github.com/livekit/protocol/livekit" @@ -26,7 +25,6 @@ type telemetryServiceInternal struct { notifier webhook.Notifier webhookPool *workerpool.WorkerPool - sync.RWMutex // one worker per participant workers map[string]*StatsWorker @@ -43,18 +41,14 @@ func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsS } func (t *telemetryServiceInternal) AddUpTrack(participantID string, trackID string, buff *buffer.Buffer) { - t.RLock() w := t.workers[participantID] - t.RUnlock() if w != nil { w.AddBuffer(trackID, buff) } } func (t *telemetryServiceInternal) OnDownstreamPacket(participantID string, trackID string, bytes int) { - t.RLock() w := t.workers[participantID] - t.RUnlock() if w != nil { 
w.OnDownstreamPacket(trackID, bytes) } @@ -90,9 +84,7 @@ func (t *telemetryServiceInternal) HandleRTCP(streamType livekit.StreamType, par prometheus.IncrementRTCP(direction, stats.NackCount, stats.PliCount, stats.FirCount) - t.RLock() w := t.workers[participantID] - t.RUnlock() if w != nil { w.OnRTCP(trackID, streamType, stats) } diff --git a/pkg/telemetry/telemetryserviceinternalevents.go b/pkg/telemetry/telemetryserviceinternalevents.go index 50b0db942..ff4e65226 100644 --- a/pkg/telemetry/telemetryserviceinternalevents.go +++ b/pkg/telemetry/telemetryserviceinternalevents.go @@ -46,9 +46,7 @@ func (t *telemetryServiceInternal) RoomEnded(ctx context.Context, room *livekit. func (t *telemetryServiceInternal) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientInfo *livekit.ClientInfo) { - t.Lock() t.workers[participant.Sid] = newStatsWorker(ctx, t, room.Sid, room.Name, participant.Sid) - t.Unlock() prometheus.AddParticipant() @@ -70,12 +68,10 @@ func (t *telemetryServiceInternal) ParticipantJoined(ctx context.Context, room * } func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { - t.Lock() if w := t.workers[participant.Sid]; w != nil { w.Close() delete(t.workers, participant.Sid) } - t.Unlock() prometheus.SubParticipant() @@ -111,9 +107,7 @@ func (t *telemetryServiceInternal) TrackPublished(ctx context.Context, participa func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) { roomID := "" roomName := "" - t.RLock() w := t.workers[participantID] - t.RUnlock() if w != nil { roomID = w.roomID w.RemoveBuffer(track.GetSid()) @@ -189,9 +183,7 @@ func (t *telemetryServiceInternal) RecordingEnded(ctx context.Context, ri *livek } func (t *telemetryServiceInternal) getRoomDetails(participantID string) (string, string) { - t.RLock() w := t.workers[participantID] - 
t.RUnlock() if w != nil { return w.roomID, w.roomName }