mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 17:45:40 +00:00
Use a goroutine to clean up stats workers. (#836)
* Use a goroutine to clean up stats workers. It is possible that certain events (like TrackUnpublished) can happen after the participant is closed. Webhooks pertaining to those events need details like the room name/ID. So, reap stats workers a little while after the participant-left event happens.
* Handle data race report
* Log analytics worker reap
* Debug log
This commit is contained in:
@@ -1837,8 +1837,11 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack) {
|
||||
p.SetMigrateState(types.MigrateStateComplete)
|
||||
}
|
||||
|
||||
if p.onTrackPublished != nil {
|
||||
p.onTrackPublished(p, track)
|
||||
p.lock.RLock()
|
||||
onTrackPublished := p.onTrackPublished
|
||||
p.lock.RUnlock()
|
||||
if onTrackPublished != nil {
|
||||
onTrackPublished(p, track)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package telemetry
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
@@ -20,9 +21,10 @@ type StatsWorker struct {
|
||||
participantID livekit.ParticipantID
|
||||
participantIdentity livekit.ParticipantIdentity
|
||||
|
||||
lock sync.Mutex
|
||||
lock sync.RWMutex
|
||||
outgoingPerTrack map[livekit.TrackID][]*livekit.AnalyticsStat
|
||||
incomingPerTrack map[livekit.TrackID][]*livekit.AnalyticsStat
|
||||
closedAt time.Time
|
||||
}
|
||||
|
||||
func newStatsWorker(
|
||||
@@ -60,6 +62,11 @@ func (s *StatsWorker) Update() {
|
||||
ts := timestamppb.Now()
|
||||
|
||||
s.lock.Lock()
|
||||
if !s.closedAt.IsZero() {
|
||||
s.lock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
stats := make([]*livekit.AnalyticsStat, 0, len(s.incomingPerTrack)+len(s.outgoingPerTrack))
|
||||
|
||||
incomingPerTrack := s.incomingPerTrack
|
||||
@@ -123,6 +130,21 @@ func (s *StatsWorker) patch(
|
||||
|
||||
func (s *StatsWorker) Close() {
|
||||
s.Update()
|
||||
|
||||
s.lock.Lock()
|
||||
s.closedAt = time.Now()
|
||||
s.lock.Unlock()
|
||||
}
|
||||
|
||||
func (s *StatsWorker) ClosedAt() time.Time {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
return s.closedAt
|
||||
}
|
||||
|
||||
func (s *StatsWorker) ParticipantID() livekit.ParticipantID {
|
||||
return s.participantID
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
@@ -53,10 +53,16 @@ func NewTelemetryService(notifier webhook.Notifier, analytics AnalyticsService)
|
||||
func (t *telemetryService) run() {
|
||||
ticker := time.NewTicker(config.StatsUpdateInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
cleanupTicker := time.NewTicker(time.Minute)
|
||||
defer cleanupTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
t.internalService.SendAnalytics()
|
||||
case <-cleanupTicker.C:
|
||||
t.internalService.CleanupWorkers()
|
||||
case op := <-t.jobsChan:
|
||||
op()
|
||||
}
|
||||
|
||||
@@ -3,20 +3,26 @@ package telemetry
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gammazero/workerpool"
|
||||
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/webhook"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
)
|
||||
|
||||
const maxWebhookWorkers = 50
|
||||
const (
|
||||
// NOTE(review): presumably the cap on concurrent webhook workers; its use is
// not visible in this excerpt — confirm against the worker pool setup.
maxWebhookWorkers = 50
|
||||
// workerCleanupWait is how long a stats worker must have been closed before
// CleanupWorkers reaps it.
workerCleanupWait = 3 * time.Minute
|
||||
)
|
||||
|
||||
// TelemetryServiceInternal embeds TelemetryService and adds the periodic
// maintenance hooks SendAnalytics and CleanupWorkers.
type TelemetryServiceInternal interface {
|
||||
TelemetryService
|
||||
// SendAnalytics is invoked on each stats-update tick of the telemetry run
// loop (NOTE(review): via internalService — confirm it is this interface).
SendAnalytics()
|
||||
// CleanupWorkers is invoked on the run loop's cleanup tick to reap stats
// workers that have been closed for longer than workerCleanupWait.
CleanupWorkers()
|
||||
}
|
||||
|
||||
type TelemetryReporter interface {
|
||||
@@ -103,6 +109,30 @@ func (t *telemetryServiceInternal) SendAnalytics() {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *telemetryServiceInternal) CleanupWorkers() {
|
||||
t.workersMu.RLock()
|
||||
workers := t.workers
|
||||
t.workersMu.RUnlock()
|
||||
|
||||
for _, worker := range workers {
|
||||
if worker == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
closedAt := worker.ClosedAt()
|
||||
if !closedAt.IsZero() && time.Since(closedAt) > workerCleanupWait {
|
||||
pID := worker.ParticipantID()
|
||||
logger.Debugw("reaping analytics worker for participant", "pID", pID)
|
||||
t.workersMu.Lock()
|
||||
if idx, ok := t.workersIdx[pID]; ok {
|
||||
delete(t.workersIdx, pID)
|
||||
t.workers[idx] = nil
|
||||
}
|
||||
t.workersMu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *telemetryServiceInternal) getStatsWorker(participantID livekit.ParticipantID) *StatsWorker {
|
||||
t.workersMu.RLock()
|
||||
defer t.workersMu.RUnlock()
|
||||
|
||||
@@ -119,13 +119,6 @@ func (t *telemetryServiceInternal) ParticipantLeft(ctx context.Context, room *li
|
||||
w.Close()
|
||||
}
|
||||
|
||||
t.workersMu.Lock()
|
||||
if idx, ok := t.workersIdx[livekit.ParticipantID(participant.Sid)]; ok {
|
||||
delete(t.workersIdx, livekit.ParticipantID(participant.Sid))
|
||||
t.workers[idx] = nil
|
||||
}
|
||||
t.workersMu.Unlock()
|
||||
|
||||
prometheus.SubParticipant()
|
||||
|
||||
t.notifyEvent(ctx, &livekit.WebhookEvent{
|
||||
|
||||
Reference in New Issue
Block a user