diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index d39fc59e3..5fc099de4 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1837,8 +1837,11 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack) { p.SetMigrateState(types.MigrateStateComplete) } - if p.onTrackPublished != nil { - p.onTrackPublished(p, track) + p.lock.RLock() + onTrackPublished := p.onTrackPublished + p.lock.RUnlock() + if onTrackPublished != nil { + onTrackPublished(p, track) } } diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index 8baecf0b3..59a179628 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -3,6 +3,7 @@ package telemetry import ( "context" "sync" + "time" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" @@ -20,9 +21,10 @@ type StatsWorker struct { participantID livekit.ParticipantID participantIdentity livekit.ParticipantIdentity - lock sync.Mutex + lock sync.RWMutex outgoingPerTrack map[livekit.TrackID][]*livekit.AnalyticsStat incomingPerTrack map[livekit.TrackID][]*livekit.AnalyticsStat + closedAt time.Time } func newStatsWorker( @@ -60,6 +62,11 @@ func (s *StatsWorker) Update() { ts := timestamppb.Now() s.lock.Lock() + if !s.closedAt.IsZero() { + s.lock.Unlock() + return + } + stats := make([]*livekit.AnalyticsStat, 0, len(s.incomingPerTrack)+len(s.outgoingPerTrack)) incomingPerTrack := s.incomingPerTrack @@ -123,6 +130,21 @@ func (s *StatsWorker) patch( func (s *StatsWorker) Close() { s.Update() + + s.lock.Lock() + s.closedAt = time.Now() + s.lock.Unlock() +} + +func (s *StatsWorker) ClosedAt() time.Time { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.closedAt +} + +func (s *StatsWorker) ParticipantID() livekit.ParticipantID { + return s.participantID } // ------------------------------------------------------------------------- diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 85e3b307d..47602370f 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -53,10 +53,16 @@ func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService) func (t *telemetryService) run() { ticker := time.NewTicker(config.StatsUpdateInterval) defer ticker.Stop() + + cleanupTicker := time.NewTicker(time.Minute) + defer cleanupTicker.Stop() + for { select { case <-ticker.C: t.internalService.SendAnalytics() + case <-cleanupTicker.C: + t.internalService.CleanupWorkers() case op := <-t.jobsChan: op() } diff --git a/pkg/telemetry/telemetryserviceinternal.go b/pkg/telemetry/telemetryserviceinternal.go index 68fca98e0..02afc381e 100644 --- a/pkg/telemetry/telemetryserviceinternal.go +++ b/pkg/telemetry/telemetryserviceinternal.go @@ -3,20 +3,26 @@ package telemetry import ( "context" "sync" + "time" "github.com/gammazero/workerpool" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/webhook" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) -const maxWebhookWorkers = 50 +const ( + maxWebhookWorkers = 50 + workerCleanupWait = 3 * time.Minute +) type TelemetryServiceInternal interface { TelemetryService SendAnalytics() + CleanupWorkers() } type TelemetryReporter interface { @@ -103,6 +109,30 @@ func (t *telemetryServiceInternal) SendAnalytics() { } } +func (t *telemetryServiceInternal) CleanupWorkers() { + t.workersMu.RLock() + workers := t.workers + t.workersMu.RUnlock() + + for _, worker := range workers { + if worker == nil { + continue + } + + closedAt := worker.ClosedAt() + if !closedAt.IsZero() && time.Since(closedAt) > workerCleanupWait { + pID := worker.ParticipantID() + logger.Debugw("reaping analytics worker for participant", "pID", pID) + t.workersMu.Lock() + if idx, ok := t.workersIdx[pID]; ok { + delete(t.workersIdx, pID) + t.workers[idx] = nil + } + t.workersMu.Unlock() + } + } +} + func (t *telemetryServiceInternal) getStatsWorker(participantID livekit.ParticipantID) *StatsWorker { t.workersMu.RLock() defer t.workersMu.RUnlock() diff --git a/pkg/telemetry/telemetryserviceinternalevents.go b/pkg/telemetry/telemetryserviceinternalevents.go index 82e7a11cc..372c049b5 100644 --- a/pkg/telemetry/telemetryserviceinternalevents.go +++ b/pkg/telemetry/telemetryserviceinternalevents.go @@ -119,13 +119,6 @@ func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *li w.Close() } - t.workersMu.Lock() - if idx, ok := t.workersIdx[livekit.ParticipantID(participant.Sid)]; ok { - delete(t.workersIdx, livekit.ParticipantID(participant.Sid)) - t.workers[idx] = nil - } - t.workersMu.Unlock() - prometheus.SubParticipant() t.notifyEvent(ctx, &livekit.WebhookEvent{