Revert telemetry stats worker wait configuration. (#4151)

Mostly reverting https://github.com/livekit/livekit/pull/4148. Leaving
the one bit to pass in a wait time to `Flush`.
This commit is contained in:
Raja Subramanian
2025-12-12 10:56:25 +05:30
committed by GitHub
parent ca4b56d2d5
commit f01008f876
2 changed files with 1 additions and 48 deletions

View File

@@ -4,7 +4,6 @@ package telemetryfakes
import (
"context"
"sync"
"time"
"github.com/livekit/livekit-server/pkg/sfu/mime"
"github.com/livekit/livekit-server/pkg/telemetry"
@@ -170,11 +169,6 @@ type FakeTelemetryService struct {
arg1 context.Context
arg2 []*livekit.AnalyticsStat
}
SetWorkerCleanupWaitDurationStub func(time.Duration)
setWorkerCleanupWaitDurationMutex sync.RWMutex
setWorkerCleanupWaitDurationArgsForCall []struct {
arg1 time.Duration
}
TrackMaxSubscribedVideoQualityStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, mime.MimeType, livekit.VideoQuality)
trackMaxSubscribedVideoQualityMutex sync.RWMutex
trackMaxSubscribedVideoQualityArgsForCall []struct {
@@ -1097,38 +1091,6 @@ func (fake *FakeTelemetryService) SendStatsArgsForCall(i int) (context.Context,
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeTelemetryService) SetWorkerCleanupWaitDuration(arg1 time.Duration) {
fake.setWorkerCleanupWaitDurationMutex.Lock()
fake.setWorkerCleanupWaitDurationArgsForCall = append(fake.setWorkerCleanupWaitDurationArgsForCall, struct {
arg1 time.Duration
}{arg1})
stub := fake.SetWorkerCleanupWaitDurationStub
fake.recordInvocation("SetWorkerCleanupWaitDuration", []interface{}{arg1})
fake.setWorkerCleanupWaitDurationMutex.Unlock()
if stub != nil {
fake.SetWorkerCleanupWaitDurationStub(arg1)
}
}
func (fake *FakeTelemetryService) SetWorkerCleanupWaitDurationCallCount() int {
fake.setWorkerCleanupWaitDurationMutex.RLock()
defer fake.setWorkerCleanupWaitDurationMutex.RUnlock()
return len(fake.setWorkerCleanupWaitDurationArgsForCall)
}
func (fake *FakeTelemetryService) SetWorkerCleanupWaitDurationCalls(stub func(time.Duration)) {
fake.setWorkerCleanupWaitDurationMutex.Lock()
defer fake.setWorkerCleanupWaitDurationMutex.Unlock()
fake.SetWorkerCleanupWaitDurationStub = stub
}
func (fake *FakeTelemetryService) SetWorkerCleanupWaitDurationArgsForCall(i int) time.Duration {
fake.setWorkerCleanupWaitDurationMutex.RLock()
defer fake.setWorkerCleanupWaitDurationMutex.RUnlock()
argsForCall := fake.setWorkerCleanupWaitDurationArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeTelemetryService) TrackMaxSubscribedVideoQuality(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo, arg4 mime.MimeType, arg5 livekit.VideoQuality) {
fake.trackMaxSubscribedVideoQualityMutex.Lock()
fake.trackMaxSubscribedVideoQualityArgsForCall = append(fake.trackMaxSubscribedVideoQualityArgsForCall, struct {

View File

@@ -24,7 +24,6 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/webhook"
"go.uber.org/atomic"
)
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
@@ -86,7 +85,6 @@ type TelemetryService interface {
AnalyticsService
NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo)
FlushStats()
SetWorkerCleanupWaitDuration(wait time.Duration)
}
const (
@@ -107,8 +105,6 @@ type telemetryService struct {
workerList *StatsWorker
flushMu sync.Mutex
workerCleanupWait atomic.Duration
}
func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsService) TelemetryService {
@@ -123,7 +119,6 @@ func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsSer
}),
workers: make(map[livekit.ParticipantID]*StatsWorker),
}
t.workerCleanupWait.Store(workerCleanupWait)
if t.notifier != nil {
t.notifier.RegisterProcessedHook(func(ctx context.Context, whi *livekit.WebhookInfo) {
t.Webhook(ctx, whi)
@@ -148,7 +143,7 @@ func (t *telemetryService) FlushStats() {
var prev, reap *StatsWorker
for worker != nil {
next := worker.next
if closed := worker.Flush(now, t.workerCleanupWait.Load()); closed {
if closed := worker.Flush(now, workerCleanupWait); closed {
if prev == nil {
// this worker was at the head of the list
t.workersMu.Lock()
@@ -185,10 +180,6 @@ func (t *telemetryService) FlushStats() {
}
}
func (t *telemetryService) SetWorkerCleanupWaitDuration(wait time.Duration) {
t.workerCleanupWait.Store(max(workerCleanupWait, wait))
}
func (t *telemetryService) run() {
for range time.Tick(telemetryStatsUpdateInterval) {
t.FlushStats()