Configurable telemetry stats worker cleanup wait. (#4148)

* Configurable telemetry stats worker clean up wait.

* make worker clean up wait setting atomic
This commit is contained in:
Raja Subramanian
2025-12-11 11:25:32 +05:30
committed by GitHub
parent d7db7cb389
commit 97099cae3e
4 changed files with 57 additions and 10 deletions

View File

@@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore)
topicFormatter := rpc.NewTopicFormatter()
roomClient, err := rpc.NewTypedRoomClient(clientParams)
v, err := rpc.NewTypedRoomClient(clientParams)
if err != nil {
return nil, err
}
participantClient, err := rpc.NewTypedParticipantClient(clientParams)
v2, err := rpc.NewTypedParticipantClient(clientParams)
if err != nil {
return nil, err
}
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2)
if err != nil {
return nil, err
}
agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
if err != nil {
return nil, err
}
agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router)
agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router)
egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService)
ingressConfig := getIngressConfig(conf)
ingressClient, err := rpc.NewIngressClient(clientParams)
@@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, router, telemetryService)
whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams)
v4, err := rpc.NewTypedWHIPParticipantClient(clientParams)
if err != nil {
return nil, err
}
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient)
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4)
if err != nil {
return nil, err
}

View File

@@ -120,7 +120,7 @@ func (s *StatsWorker) IsConnected() bool {
return s.isConnected
}
func (s *StatsWorker) Flush(now time.Time) bool {
func (s *StatsWorker) Flush(now time.Time, closeWait time.Duration) bool {
ts := timestamppb.New(now)
s.lock.Lock()
@@ -132,7 +132,7 @@ func (s *StatsWorker) Flush(now time.Time) bool {
outgoingPerTrack := s.outgoingPerTrack
s.outgoingPerTrack = make(map[livekit.TrackID][]*livekit.AnalyticsStat)
closed := !s.closedAt.IsZero() && now.Sub(s.closedAt) > workerCleanupWait
closed := !s.closedAt.IsZero() && now.Sub(s.closedAt) > closeWait
s.lock.Unlock()
stats = s.collectStats(ts, livekit.StreamType_UPSTREAM, incomingPerTrack, stats)

View File

@@ -4,6 +4,7 @@ package telemetryfakes
import (
"context"
"sync"
"time"
"github.com/livekit/livekit-server/pkg/sfu/mime"
"github.com/livekit/livekit-server/pkg/telemetry"
@@ -169,6 +170,11 @@ type FakeTelemetryService struct {
arg1 context.Context
arg2 []*livekit.AnalyticsStat
}
SetWorkerCleanupWaitDurationStub func(time.Duration)
setWorkerCleanupWaitDurationMutex sync.RWMutex
setWorkerCleanupWaitDurationArgsForCall []struct {
arg1 time.Duration
}
TrackMaxSubscribedVideoQualityStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, mime.MimeType, livekit.VideoQuality)
trackMaxSubscribedVideoQualityMutex sync.RWMutex
trackMaxSubscribedVideoQualityArgsForCall []struct {
@@ -1091,6 +1097,38 @@ func (fake *FakeTelemetryService) SendStatsArgsForCall(i int) (context.Context,
return argsForCall.arg1, argsForCall.arg2
}
// SetWorkerCleanupWaitDuration records the invocation (argument and call
// count) and then delegates to the configured stub, if one is set.
func (fake *FakeTelemetryService) SetWorkerCleanupWaitDuration(arg1 time.Duration) {
	fake.setWorkerCleanupWaitDurationMutex.Lock()
	fake.setWorkerCleanupWaitDurationArgsForCall = append(fake.setWorkerCleanupWaitDurationArgsForCall, struct {
		arg1 time.Duration
	}{arg1})
	// Snapshot the stub while holding the lock so a concurrent
	// SetWorkerCleanupWaitDurationCalls cannot race with the call below.
	stub := fake.SetWorkerCleanupWaitDurationStub
	fake.recordInvocation("SetWorkerCleanupWaitDuration", []interface{}{arg1})
	fake.setWorkerCleanupWaitDurationMutex.Unlock()
	if stub != nil {
		// Invoke the snapshot, not the field: re-reading the field here
		// would be an unsynchronized read racing with Calls().
		stub(arg1)
	}
}
// SetWorkerCleanupWaitDurationCallCount reports how many times
// SetWorkerCleanupWaitDuration has been invoked on this fake.
func (fake *FakeTelemetryService) SetWorkerCleanupWaitDurationCallCount() int {
	fake.setWorkerCleanupWaitDurationMutex.RLock()
	n := len(fake.setWorkerCleanupWaitDurationArgsForCall)
	fake.setWorkerCleanupWaitDurationMutex.RUnlock()
	return n
}
// SetWorkerCleanupWaitDurationCalls installs stub as the behavior invoked
// whenever SetWorkerCleanupWaitDuration is called on this fake.
func (fake *FakeTelemetryService) SetWorkerCleanupWaitDurationCalls(stub func(time.Duration)) {
	fake.setWorkerCleanupWaitDurationMutex.Lock()
	fake.SetWorkerCleanupWaitDurationStub = stub
	fake.setWorkerCleanupWaitDurationMutex.Unlock()
}
// SetWorkerCleanupWaitDurationArgsForCall returns the argument passed to the
// i-th invocation of SetWorkerCleanupWaitDuration (0-indexed).
func (fake *FakeTelemetryService) SetWorkerCleanupWaitDurationArgsForCall(i int) time.Duration {
	fake.setWorkerCleanupWaitDurationMutex.RLock()
	defer fake.setWorkerCleanupWaitDurationMutex.RUnlock()
	return fake.setWorkerCleanupWaitDurationArgsForCall[i].arg1
}
func (fake *FakeTelemetryService) TrackMaxSubscribedVideoQuality(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo, arg4 mime.MimeType, arg5 livekit.VideoQuality) {
fake.trackMaxSubscribedVideoQualityMutex.Lock()
fake.trackMaxSubscribedVideoQualityArgsForCall = append(fake.trackMaxSubscribedVideoQualityArgsForCall, struct {

View File

@@ -24,6 +24,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/webhook"
"go.uber.org/atomic"
)
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
@@ -85,6 +86,7 @@ type TelemetryService interface {
AnalyticsService
NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo)
FlushStats()
SetWorkerCleanupWaitDuration(wait time.Duration)
}
const (
@@ -105,6 +107,8 @@ type telemetryService struct {
workerList *StatsWorker
flushMu sync.Mutex
workerCleanupWait atomic.Duration
}
func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsService) TelemetryService {
@@ -119,6 +123,7 @@ func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsSer
}),
workers: make(map[livekit.ParticipantID]*StatsWorker),
}
t.workerCleanupWait.Store(workerCleanupWait)
if t.notifier != nil {
t.notifier.RegisterProcessedHook(func(ctx context.Context, whi *livekit.WebhookInfo) {
t.Webhook(ctx, whi)
@@ -143,7 +148,7 @@ func (t *telemetryService) FlushStats() {
var prev, reap *StatsWorker
for worker != nil {
next := worker.next
if closed := worker.Flush(now); closed {
if closed := worker.Flush(now, t.workerCleanupWait.Load()); closed {
if prev == nil {
// this worker was at the head of the list
t.workersMu.Lock()
@@ -180,6 +185,10 @@ func (t *telemetryService) FlushStats() {
}
}
// SetWorkerCleanupWaitDuration atomically updates how long a closed stats
// worker is kept around before being reaped by FlushStats. The compiled-in
// default (workerCleanupWait) acts as a floor: smaller values are clamped up.
func (t *telemetryService) SetWorkerCleanupWaitDuration(wait time.Duration) {
	if wait < workerCleanupWait {
		wait = workerCleanupWait
	}
	t.workerCleanupWait.Store(wait)
}
func (t *telemetryService) run() {
for range time.Tick(telemetryStatsUpdateInterval) {
t.FlushStats()