diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index b33cc1744..22b1e0c1c 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore) topicFormatter := rpc.NewTopicFormatter() - roomClient, err := rpc.NewTypedRoomClient(clientParams) + v, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err } - participantClient, err := rpc.NewTypedParticipantClient(clientParams) + v2, err := rpc.NewTypedParticipantClient(clientParams) if err != nil { return nil, err } - roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient) + roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2) if err != nil { return nil, err } - agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) + v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) if err != nil { return nil, err } - agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router) + agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router) egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(clientParams) @@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, router, telemetryService) - whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams) + v4, err := rpc.NewTypedWHIPParticipantClient(clientParams) if err != nil { return nil, err } - serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient) + serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4) if err != nil { return nil, err } diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index 3a0c63ee0..51eec637f 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -120,7 +120,7 @@ func (s *StatsWorker) IsConnected() bool { return s.isConnected } -func (s *StatsWorker) Flush(now time.Time) bool { +func (s *StatsWorker) Flush(now time.Time, closeWait time.Duration) bool { ts := timestamppb.New(now) s.lock.Lock() @@ -132,7 +132,7 @@ func (s *StatsWorker) Flush(now time.Time) bool { outgoingPerTrack := s.outgoingPerTrack s.outgoingPerTrack = make(map[livekit.TrackID][]*livekit.AnalyticsStat) - closed := !s.closedAt.IsZero() && now.Sub(s.closedAt) > workerCleanupWait + closed := !s.closedAt.IsZero() && now.Sub(s.closedAt) > closeWait s.lock.Unlock() stats = s.collectStats(ts, livekit.StreamType_UPSTREAM, incomingPerTrack, stats) diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index 5f1267fa9..76a936714 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -4,6 +4,7 @@ package telemetryfakes import ( "context" "sync" + "time" "github.com/livekit/livekit-server/pkg/sfu/mime" "github.com/livekit/livekit-server/pkg/telemetry" @@ -169,6 +170,11 @@ type FakeTelemetryService struct { arg1 context.Context arg2 []*livekit.AnalyticsStat } + SetWorkerCleanupWaitDurationStub func(time.Duration) + setWorkerCleanupWaitDurationMutex sync.RWMutex + setWorkerCleanupWaitDurationArgsForCall []struct { + arg1 time.Duration + } TrackMaxSubscribedVideoQualityStub func(context.Context, livekit.ParticipantID, *livekit.TrackInfo, mime.MimeType, livekit.VideoQuality) trackMaxSubscribedVideoQualityMutex sync.RWMutex trackMaxSubscribedVideoQualityArgsForCall []struct { @@ -1091,6 +1097,38 @@ func (fake *FakeTelemetryService) SendStatsArgsForCall(i int) (context.Context, return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeTelemetryService) SetWorkerCleanupWaitDuration(arg1 time.Duration) { + fake.setWorkerCleanupWaitDurationMutex.Lock() + fake.setWorkerCleanupWaitDurationArgsForCall = append(fake.setWorkerCleanupWaitDurationArgsForCall, struct { + arg1 time.Duration + }{arg1}) + stub := fake.SetWorkerCleanupWaitDurationStub + fake.recordInvocation("SetWorkerCleanupWaitDuration", []interface{}{arg1}) + fake.setWorkerCleanupWaitDurationMutex.Unlock() + if stub != nil { + fake.SetWorkerCleanupWaitDurationStub(arg1) + } +} + +func (fake *FakeTelemetryService) SetWorkerCleanupWaitDurationCallCount() int { + fake.setWorkerCleanupWaitDurationMutex.RLock() + defer fake.setWorkerCleanupWaitDurationMutex.RUnlock() + return len(fake.setWorkerCleanupWaitDurationArgsForCall) +} + +func (fake *FakeTelemetryService) SetWorkerCleanupWaitDurationCalls(stub func(time.Duration)) { + fake.setWorkerCleanupWaitDurationMutex.Lock() + defer fake.setWorkerCleanupWaitDurationMutex.Unlock() + fake.SetWorkerCleanupWaitDurationStub = stub +} + +func (fake *FakeTelemetryService) SetWorkerCleanupWaitDurationArgsForCall(i int) time.Duration { + fake.setWorkerCleanupWaitDurationMutex.RLock() + defer fake.setWorkerCleanupWaitDurationMutex.RUnlock() + argsForCall := fake.setWorkerCleanupWaitDurationArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeTelemetryService) TrackMaxSubscribedVideoQuality(arg1 context.Context, arg2 livekit.ParticipantID, arg3 *livekit.TrackInfo, arg4 mime.MimeType, arg5 livekit.VideoQuality) { fake.trackMaxSubscribedVideoQualityMutex.Lock() fake.trackMaxSubscribedVideoQualityArgsForCall = append(fake.trackMaxSubscribedVideoQualityArgsForCall, struct { diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 02bb625a5..a840ef948 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -24,6 +24,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/webhook" + "go.uber.org/atomic" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate @@ -85,6 +86,7 @@ type TelemetryService interface { AnalyticsService NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo) FlushStats() + SetWorkerCleanupWaitDuration(wait time.Duration) } const ( @@ -105,6 +107,8 @@ type telemetryService struct { workerList *StatsWorker flushMu sync.Mutex + + workerCleanupWait atomic.Duration } func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsService) TelemetryService { @@ -119,6 +123,7 @@ func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsSer }), workers: make(map[livekit.ParticipantID]*StatsWorker), } + t.workerCleanupWait.Store(workerCleanupWait) if t.notifier != nil { t.notifier.RegisterProcessedHook(func(ctx context.Context, whi *livekit.WebhookInfo) { t.Webhook(ctx, whi) @@ -143,7 +148,7 @@ func (t *telemetryService) FlushStats() { var prev, reap *StatsWorker for worker != nil { next := worker.next - if closed := worker.Flush(now); closed { + if closed := worker.Flush(now, t.workerCleanupWait.Load()); closed { if prev == nil { // this worker was at the head of the list t.workersMu.Lock() @@ -180,6 +185,10 @@ func (t *telemetryService) FlushStats() { } } +func (t *telemetryService) SetWorkerCleanupWaitDuration(wait time.Duration) { + t.workerCleanupWait.Store(max(workerCleanupWait, wait)) +} + func (t *telemetryService) run() { for range time.Tick(telemetryStatsUpdateInterval) { t.FlushStats()