diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index 76a936714..5f1267fa9 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -4,7 +4,6 @@ package telemetryfakes import ( "context" "sync" - "time" "github.com/livekit/livekit-server/pkg/sfu/mime" "github.com/livekit/livekit-server/pkg/telemetry" @@ -170,11 +169,6 @@ type FakeTelemetryService struct { arg1 context.Context arg2 []*livekit.AnalyticsStat } - SetWorkerCleanupWaitDurationStub func(time.Duration) - setWorkerCleanupWaitDurationMutex sync.RWMutex - setWorkerCleanupWaitDurationArgsForCall []struct { - arg1 time.Duration - } TrackMaxSubscribedVideoQualityStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, mime.MimeType, livekit.VideoQuality) trackMaxSubscribedVideoQualityMutex sync.RWMutex trackMaxSubscribedVideoQualityArgsForCall []struct { @@ -1097,38 +1091,6 @@ func (fake *FakeTelemetryService) SendStatsArgsForCall(i int) (context.Context, return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeTelemetryService) SetWorkerCleanupWaitDuration(arg1 time.Duration) { - fake.setWorkerCleanupWaitDurationMutex.Lock() - fake.setWorkerCleanupWaitDurationArgsForCall = append(fake.setWorkerCleanupWaitDurationArgsForCall, struct { - arg1 time.Duration - }{arg1}) - stub := fake.SetWorkerCleanupWaitDurationStub - fake.recordInvocation("SetWorkerCleanupWaitDuration", []interface{}{arg1}) - fake.setWorkerCleanupWaitDurationMutex.Unlock() - if stub != nil { - fake.SetWorkerCleanupWaitDurationStub(arg1) - } -} - -func (fake *FakeTelemetryService) SetWorkerCleanupWaitDurationCallCount() int { - fake.setWorkerCleanupWaitDurationMutex.RLock() - defer fake.setWorkerCleanupWaitDurationMutex.RUnlock() - return len(fake.setWorkerCleanupWaitDurationArgsForCall) -} - -func (fake *FakeTelemetryService) SetWorkerCleanupWaitDurationCalls(stub func(time.Duration)) { - fake.setWorkerCleanupWaitDurationMutex.Lock() - defer fake.setWorkerCleanupWaitDurationMutex.Unlock() - fake.SetWorkerCleanupWaitDurationStub = stub -} - -func (fake *FakeTelemetryService) SetWorkerCleanupWaitDurationArgsForCall(i int) time.Duration { - fake.setWorkerCleanupWaitDurationMutex.RLock() - defer fake.setWorkerCleanupWaitDurationMutex.RUnlock() - argsForCall := fake.setWorkerCleanupWaitDurationArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakeTelemetryService) TrackMaxSubscribedVideoQuality(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo, arg4 mime.MimeType, arg5 livekit.VideoQuality) { fake.trackMaxSubscribedVideoQualityMutex.Lock() fake.trackMaxSubscribedVideoQualityArgsForCall = append(fake.trackMaxSubscribedVideoQualityArgsForCall, struct { diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index a840ef948..be27643ac 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -24,7 +24,6 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/webhook" - "go.uber.org/atomic" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate @@ -86,7 +85,6 @@ type TelemetryService interface { AnalyticsService NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo) FlushStats() - SetWorkerCleanupWaitDuration(wait time.Duration) } const ( @@ -107,8 +105,6 @@ type telemetryService struct { workerList *StatsWorker flushMu sync.Mutex - - workerCleanupWait atomic.Duration } func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsService) TelemetryService { @@ -123,7 +119,6 @@ func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsSer }), workers: make(map[livekit.ParticipantID]*StatsWorker), } - t.workerCleanupWait.Store(workerCleanupWait) if t.notifier != nil { t.notifier.RegisterProcessedHook(func(ctx context.Context, whi *livekit.WebhookInfo) { t.Webhook(ctx, whi) @@ -148,7 +143,7 @@ func (t *telemetryService) FlushStats() { var prev, reap *StatsWorker for worker != nil { next := worker.next - if closed := worker.Flush(now, t.workerCleanupWait.Load()); closed { + if closed := worker.Flush(now, workerCleanupWait); closed { if prev == nil { // this worker was at the head of the list t.workersMu.Lock() @@ -185,10 +180,6 @@ func (t *telemetryService) FlushStats() { } } -func (t *telemetryService) SetWorkerCleanupWaitDuration(wait time.Duration) { - t.workerCleanupWait.Store(max(workerCleanupWait, wait)) -} - func (t *telemetryService) run() { for range time.Tick(telemetryStatsUpdateInterval) { t.FlushStats()