From 29039b4e76ddbb5640c2ee9b426c8ff35edf7f31 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 18 Jul 2022 11:47:43 +0530 Subject: [PATCH] Use a go routine to clean up stats workers. (#836) * Use a go routine to clean up stats workers. It is possible that certain events (like TrackUnpublished) can happen after the participant is closed. For webhooks pertaining to those events, need details like room name/id. So,reap stats workers a little while after the participant left event happens. * handle data race report * log analytics worker reap * debug log --- pkg/rtc/participant.go | 7 ++-- pkg/telemetry/statsworker.go | 24 +++++++++++++- pkg/telemetry/telemetryservice.go | 6 ++++ pkg/telemetry/telemetryserviceinternal.go | 32 ++++++++++++++++++- .../telemetryserviceinternalevents.go | 7 ---- 5 files changed, 65 insertions(+), 11 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index d39fc59e3..5fc099de4 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1837,8 +1837,11 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack) { p.SetMigrateState(types.MigrateStateComplete) } - if p.onTrackPublished != nil { - p.onTrackPublished(p, track) + p.lock.RLock() + onTrackPublished := p.onTrackPublished + p.lock.RUnlock() + if onTrackPublished != nil { + onTrackPublished(p, track) } } diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index 8baecf0b3..59a179628 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -3,6 +3,7 @@ package telemetry import ( "context" "sync" + "time" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" @@ -20,9 +21,10 @@ type StatsWorker struct { participantID livekit.ParticipantID participantIdentity livekit.ParticipantIdentity - lock sync.Mutex + lock sync.RWMutex outgoingPerTrack map[livekit.TrackID][]*livekit.AnalyticsStat incomingPerTrack map[livekit.TrackID][]*livekit.AnalyticsStat + closedAt time.Time } func newStatsWorker( @@ -60,6 +62,11 @@ func (s *StatsWorker) Update() { ts := timestamppb.Now() s.lock.Lock() + if !s.closedAt.IsZero() { + s.lock.Unlock() + return + } + stats := make([]*livekit.AnalyticsStat, 0, len(s.incomingPerTrack)+len(s.outgoingPerTrack)) incomingPerTrack := s.incomingPerTrack @@ -123,6 +130,21 @@ func (s *StatsWorker) patch( func (s *StatsWorker) Close() { s.Update() + + s.lock.Lock() + s.closedAt = time.Now() + s.lock.Unlock() +} + +func (s *StatsWorker) ClosedAt() time.Time { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.closedAt +} + +func (s *StatsWorker) ParticipantID() livekit.ParticipantID { + return s.participantID } // ------------------------------------------------------------------------- diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 85e3b307d..47602370f 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -53,10 +53,16 @@ func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) func (t *telemetryService) run() { ticker := time.NewTicker(config.StatsUpdateInterval) defer ticker.Stop() + + cleanupTicker := time.NewTicker(time.Minute) + defer cleanupTicker.Stop() + for { select { case <-ticker.C: t.internalService.SendAnalytics() + case <-cleanupTicker.C: + t.internalService.CleanupWorkers() case op := <-t.jobsChan: op() } diff --git a/pkg/telemetry/telemetryserviceinternal.go b/pkg/telemetry/telemetryserviceinternal.go index 68fca98e0..02afc381e 100644 --- a/pkg/telemetry/telemetryserviceinternal.go +++ b/pkg/telemetry/telemetryserviceinternal.go @@ -3,20 +3,26 @@ package telemetry import ( "context" "sync" + "time" "github.com/gammazero/workerpool" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/webhook" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) -const maxWebhookWorkers = 50 +const ( + maxWebhookWorkers = 50 + workerCleanupWait = 3 * time.Minute +) type TelemetryServiceInternal interface { TelemetryService SendAnalytics() + CleanupWorkers() } type TelemetryReporter interface { @@ -103,6 +109,30 @@ func (t *telemetryServiceInternal) SendAnalytics() { } } +func (t *telemetryServiceInternal) CleanupWorkers() { + t.workersMu.RLock() + workers := t.workers + t.workersMu.RUnlock() + + for _, worker := range workers { + if worker == nil { + continue + } + + closedAt := worker.ClosedAt() + if !closedAt.IsZero() && time.Since(closedAt) > workerCleanupWait { + pID := worker.ParticipantID() + logger.Debugw("reaping analytics worker for participant", "pID", pID) + t.workersMu.Lock() + if idx, ok := t.workersIdx[pID]; ok { + delete(t.workersIdx, pID) + t.workers[idx] = nil + } + t.workersMu.Unlock() + } + } +} + func (t *telemetryServiceInternal) getStatsWorker(participantID livekit.ParticipantID) *StatsWorker { t.workersMu.RLock() defer t.workersMu.RUnlock() diff --git a/pkg/telemetry/telemetryserviceinternalevents.go b/pkg/telemetry/telemetryserviceinternalevents.go index 82e7a11cc..372c049b5 100644 --- a/pkg/telemetry/telemetryserviceinternalevents.go +++ b/pkg/telemetry/telemetryserviceinternalevents.go @@ -119,13 +119,6 @@ func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *li w.Close() } - t.workersMu.Lock() - if idx, ok := t.workersIdx[livekit.ParticipantID(participant.Sid)]; ok { - delete(t.workersIdx, livekit.ParticipantID(participant.Sid)) - t.workers[idx] = nil - } - t.workersMu.Unlock() - prometheus.SubParticipant() t.notifyEvent(ctx, &livekit.WebhookEvent{