telemetry: split webhook-processed hook out of NewTelemetryService (#4548)

* telemetry: split webhook-processed hook registration out of NewTelemetryService

NewTelemetryService used to register a notifier processed-hook on the inner
*telemetryService directly. That made it impossible for downstream wrappers
(e.g. cloud's TelemetryService that overrides Webhook to fan out to a v3
observability pipeline) to intercept webhook events without double-firing
the legacy emission.

Lift the registration into a new exported helper RegisterWebhookHook, and
have the standalone server's wire provider createTelemetryService call it
right after construction so behavior is unchanged for callers that don't
wrap the service.
This commit is contained in:
Paul Wells
2026-05-27 09:40:55 -07:00
committed by GitHub
parent 222177a9e4
commit 2dd5e63207
3 changed files with 27 additions and 16 deletions
+9 -1
View File
@@ -60,7 +60,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
wire.Bind(new(routing.MessageRouter), new(routing.Router)),
wire.Bind(new(livekit.RoomService), new(*RoomService)),
telemetry.NewAnalyticsService,
telemetry.NewTelemetryService,
createTelemetryService,
getMessageBus,
NewIOInfoService,
wire.Bind(new(IOClient), new(*IOInfoService)),
@@ -171,6 +171,14 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webh
return webhook.NewDefaultNotifier(wc, provider)
}
func createTelemetryService(notifier webhook.QueuedNotifier, analytics telemetry.AnalyticsService) telemetry.TelemetryService {
svc := telemetry.NewTelemetryService(notifier, analytics)
if notifier != nil {
notifier.RegisterProcessedHook(svc.Webhook)
}
return svc
}
func createRedisClient(conf *config.Config) (redis.UniversalClient, error) {
if !conf.Redis.IsConfigured() {
return nil, nil
+18 -10
View File
@@ -83,30 +83,30 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
return nil, err
}
analyticsService := telemetry.NewAnalyticsService(conf, currentNode)
telemetryService := telemetry.NewTelemetryService(queuedNotifier, analyticsService)
telemetryService := createTelemetryService(queuedNotifier, analyticsService)
ioInfoService, err := NewIOInfoService(messageBus, egressStore, ingressStore, sipStore, telemetryService)
if err != nil {
return nil, err
}
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore)
topicFormatter := rpc.NewTopicFormatter()
roomClient, err := rpc.NewTypedRoomClient(clientParams)
v, err := rpc.NewTypedRoomClient(clientParams)
if err != nil {
return nil, err
}
participantClient, err := rpc.NewTypedParticipantClient(clientParams)
v2, err := rpc.NewTypedParticipantClient(clientParams)
if err != nil {
return nil, err
}
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2)
if err != nil {
return nil, err
}
agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
if err != nil {
return nil, err
}
agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router)
agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router)
egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService)
ingressConfig := getIngressConfig(conf)
ingressClient, err := rpc.NewIngressClient(clientParams)
@@ -121,11 +121,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, router, telemetryService)
whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams)
v4, err := rpc.NewTypedWHIPParticipantClient(clientParams)
if err != nil {
return nil, err
}
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient)
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4)
if err != nil {
return nil, err
}
@@ -150,8 +150,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
authHandler := getTURNAuthHandlerFunc(turnAuthHandler)
server, err := newInProcessTurnServer(conf, authHandler)
v5 := getTURNAuthHandlerFunc(turnAuthHandler)
server, err := newInProcessTurnServer(conf, v5)
if err != nil {
return nil, err
}
@@ -236,6 +236,14 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webh
return webhook.NewDefaultNotifier(wc, provider)
}
func createTelemetryService(notifier webhook.QueuedNotifier, analytics telemetry.AnalyticsService) telemetry.TelemetryService {
svc := telemetry.NewTelemetryService(notifier, analytics)
if notifier != nil {
notifier.RegisterProcessedHook(svc.Webhook)
}
return svc
}
func createRedisClient(conf *config.Config) (redis.UniversalClient, error) {
if !conf.Redis.IsConfigured() {
return nil, nil
-5
View File
@@ -195,11 +195,6 @@ func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsSer
}),
workers: make(map[statsWorkerKey]*StatsWorker),
}
if t.notifier != nil {
t.notifier.RegisterProcessedHook(func(ctx context.Context, whi *livekit.WebhookInfo) {
t.Webhook(ctx, whi)
})
}
t.jobsQueue.Start()
go t.run()