From 2dd5e63207c7a084b95f5256171e0a59dfe0e42b Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Wed, 27 May 2026 09:40:55 -0700 Subject: [PATCH] telemetry: split webhook-processed hook out of NewTelemetryService (#4548) * telemetry: split webhook-processed hook registration out of NewTelemetryService NewTelemetryService used to register a notifier processed-hook on the inner *telemetryService directly. That made it impossible for downstream wrappers (e.g. cloud's TelemetryService that overrides Webhook to fan out to a v3 observability pipeline) to intercept webhook events without double-firing the legacy emission. Lift the registration into a new exported helper RegisterWebhookHook, and have the standalone server's wire provider createTelemetryService call it right after construction so behavior is unchanged for callers that don't wrap the service. --- pkg/service/wire.go | 10 +++++++++- pkg/service/wire_gen.go | 28 ++++++++++++++++++---------- pkg/telemetry/telemetryservice.go | 5 ----- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 72499fec5..a2d60fd32 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -60,7 +60,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live wire.Bind(new(routing.MessageRouter), new(routing.Router)), wire.Bind(new(livekit.RoomService), new(*RoomService)), telemetry.NewAnalyticsService, - telemetry.NewTelemetryService, + createTelemetryService, getMessageBus, NewIOInfoService, wire.Bind(new(IOClient), new(*IOInfoService)), @@ -171,6 +171,14 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webh return webhook.NewDefaultNotifier(wc, provider) } +func createTelemetryService(notifier webhook.QueuedNotifier, analytics telemetry.AnalyticsService) telemetry.TelemetryService { + svc := telemetry.NewTelemetryService(notifier, analytics) + if notifier != nil { + notifier.RegisterProcessedHook(svc.Webhook) + } + return svc +} + func createRedisClient(conf *config.Config) (redis.UniversalClient, error) { if !conf.Redis.IsConfigured() { return nil, nil diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 57b831a00..8d52e1ce1 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -83,30 +83,30 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } analyticsService := telemetry.NewAnalyticsService(conf, currentNode) - telemetryService := telemetry.NewTelemetryService(queuedNotifier, analyticsService) + telemetryService := createTelemetryService(queuedNotifier, analyticsService) ioInfoService, err := NewIOInfoService(messageBus, egressStore, ingressStore, sipStore, telemetryService) if err != nil { return nil, err } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore) topicFormatter := rpc.NewTopicFormatter() - roomClient, err := rpc.NewTypedRoomClient(clientParams) + v, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err } - participantClient, err := rpc.NewTypedParticipantClient(clientParams) + v2, err := rpc.NewTypedParticipantClient(clientParams) if err != nil { return nil, err } - roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient) + roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2) if err != nil { return nil, err } - agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) + v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) if err != nil { return nil, err } - agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router) + agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router) egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(clientParams) @@ -121,11 +121,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, router, telemetryService) - whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams) + v4, err := rpc.NewTypedWHIPParticipantClient(clientParams) if err != nil { return nil, err } - serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient) + serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4) if err != nil { return nil, err } @@ -150,8 +150,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - authHandler := getTURNAuthHandlerFunc(turnAuthHandler) - server, err := newInProcessTurnServer(conf, authHandler) + v5 := getTURNAuthHandlerFunc(turnAuthHandler) + server, err := newInProcessTurnServer(conf, v5) if err != nil { return nil, err } @@ -236,6 +236,14 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webh return webhook.NewDefaultNotifier(wc, provider) } +func createTelemetryService(notifier webhook.QueuedNotifier, analytics telemetry.AnalyticsService) telemetry.TelemetryService { + svc := telemetry.NewTelemetryService(notifier, analytics) + if notifier != nil { + notifier.RegisterProcessedHook(svc.Webhook) + } + return svc +} + func createRedisClient(conf *config.Config) (redis.UniversalClient, error) { if !conf.Redis.IsConfigured() { return nil, nil diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 3f861992f..2ab77f5d6 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -195,11 +195,6 @@ func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsSer }), workers: make(map[statsWorkerKey]*StatsWorker), } - if t.notifier != nil { - t.notifier.RegisterProcessedHook(func(ctx context.Context, whi *livekit.WebhookInfo) { - t.Webhook(ctx, whi) - }) - } t.jobsQueue.Start() go t.run()