From 1ae2e48c2ed38cf1ec25de9f25e6af4072f240b8 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 13 Feb 2025 10:39:45 +0530 Subject: [PATCH] Webhook analytics event. (#3423) * Webhook analytics event. * deps * generate * nil notifier --- go.mod | 18 ++++---- go.sum | 36 ++++++++-------- pkg/service/wire_gen.go | 10 ++--- pkg/telemetry/events.go | 11 +++++ .../telemetryfakes/fake_telemetry_service.go | 41 +++++++++++++++++++ pkg/telemetry/telemetryservice.go | 9 +++- 6 files changed, 91 insertions(+), 34 deletions(-) diff --git a/go.mod b/go.mod index 5e005566f..21fb911e9 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/elliotchance/orderedmap/v2 v2.7.0 github.com/florianl/go-tc v0.4.4 - github.com/frostbyte73/core v0.1.0 + github.com/frostbyte73/core v0.1.1 github.com/gammazero/deque v1.0.0 github.com/gammazero/workerpool v1.1.3 github.com/google/uuid v1.6.0 @@ -22,7 +22,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 - github.com/livekit/protocol v1.33.1-0.20250207102646-9ce0387a72a4 + github.com/livekit/protocol v1.33.1-0.20250213045117-9b6e5d703ff8 github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 @@ -54,7 +54,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - golang.org/x/exp v0.0.0-20250207012021-f9890c6ad9f3 + golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac golang.org/x/sync v0.11.0 google.golang.org/protobuf v1.36.5 gopkg.in/yaml.v3 v3.0.1 @@ -106,7 +106,7 @@ require ( github.com/moby/term v0.5.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nats-io/nats.go v1.39.0 // indirect - github.com/nats-io/nkeys v0.4.9 // indirect + github.com/nats-io/nkeys v0.4.10 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect @@ -131,14 +131,14 @@ require ( github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/zap/exp v0.3.0 // indirect - golang.org/x/crypto v0.32.0 // indirect + golang.org/x/crypto v0.33.0 // indirect golang.org/x/mod v0.23.0 // indirect - golang.org/x/net v0.34.0 // indirect + golang.org/x/net v0.35.0 // indirect golang.org/x/sys v0.30.0 // indirect - golang.org/x/text v0.21.0 // indirect - golang.org/x/tools v0.29.0 // indirect + golang.org/x/text v0.22.0 // indirect + golang.org/x/tools v0.30.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250204164813-702378808489 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250212204824-5a70512c5d8b // indirect google.golang.org/grpc v1.70.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index bc8972692..e0df8b6bd 100644 --- a/go.sum +++ b/go.sum @@ -72,8 +72,8 @@ github.com/florianl/go-tc v0.4.4 h1:q6lhEWEfyhGffRzdl3eIcNqX/yVIw0IJwXqa9Rdcctw= github.com/florianl/go-tc v0.4.4/go.mod h1:uvp6pIlOw7Z8hhfnT5M4+V1hHVgZWRZwwMS8Z0JsRxc= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= -github.com/frostbyte73/core v0.1.0 h1:KA4klxRjLbEHLv+judmlRtweyjcj1NWOJ+BQHQgNxfw= -github.com/frostbyte73/core v0.1.0/go.mod h1:mhfOtR+xWAvwXiwor7jnqPMnu4fxbv1F2MwZ0BEpzZo= +github.com/frostbyte73/core v0.1.1 h1:ChhJOR7bAKOCPbA+lqDLE2cGKlCG5JXsDvvQr4YaJIA= +github.com/frostbyte73/core v0.1.1/go.mod h1:mhfOtR+xWAvwXiwor7jnqPMnu4fxbv1F2MwZ0BEpzZo= github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= @@ -170,8 +170,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 h1:GX7KF/V9ExmcfT/2Bdia8aROjkxrgx7WpyH7w9MB4J4= github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY= -github.com/livekit/protocol v1.33.1-0.20250207102646-9ce0387a72a4 h1:emGZ0j5nDb5rmiz4oSpUM+m44K16Fl9/f8EF8TUTRLc= -github.com/livekit/protocol v1.33.1-0.20250207102646-9ce0387a72a4/go.mod h1:23cpl78K2XxTrYB9VT2qw/YTB86/Au/i2ohO0xNIIHM= +github.com/livekit/protocol v1.33.1-0.20250213045117-9b6e5d703ff8 h1:TwYfrw9nnuhpTYsgY/T+OQi8z9s1itgdzNMDAOmq3BA= +github.com/livekit/protocol v1.33.1-0.20250213045117-9b6e5d703ff8/go.mod h1:yXuQ7ucrLj91nbxL6/AHgtxdha1DGzLj1LkgvnT90So= github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 h1:fzuYpAQbCid7ySPpQWWePfQOWUrs8x6dJ0T3Wl07n+Y= github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126/go.mod h1:X5WtEZ7OnEs72Fi5/J+i0on3964F1aynQpCalcgMqRo= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= @@ -217,8 +217,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nats-io/nats.go v1.39.0 h1:2/yg2JQjiYYKLwDuBzV0FbB2sIV+eFNkEevlRi4n9lI= github.com/nats-io/nats.go v1.39.0/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM= -github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= -github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= +github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc= +github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= @@ -362,10 +362,10 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= -golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= -golang.org/x/exp v0.0.0-20250207012021-f9890c6ad9f3 h1:qNgPs5exUA+G0C96DrPwNrvLSj7GT/9D+3WMWUcUg34= -golang.org/x/exp v0.0.0-20250207012021-f9890c6ad9f3/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= +golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= +golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac h1:l5+whBCLH3iH2ZNHYLbAe58bo7yrN4mVcnkHDYz5vvs= +golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac/go.mod h1:hH+7mtFmImwwcMvScyxUhjuVHR3HGaDPMn9rMSUUbxo= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -396,8 +396,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= -golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -458,8 +458,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -468,16 +468,16 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= -golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= -golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= +golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY= +golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a h1:OAiGFfOiA0v9MRYsSidp3ubZaBnteRUyn3xB2ZQ5G/E= google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a/go.mod h1:jehYqy3+AhJU9ve55aNOaSml7wUXjF9x6z2LcCfpAhY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250204164813-702378808489 h1:5bKytslY8ViY0Cj/ewmRtrWHW64bNF03cAatUUFCdFI= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250204164813-702378808489/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250212204824-5a70512c5d8b h1:FQtJ1MxbXoIIrZHZ33M+w5+dAP9o86rgpjoKr/ZmT7k= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250212204824-5a70512c5d8b/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk= google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 1ef7867c1..f1df2a8d9 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService) topicFormatter := rpc.NewTopicFormatter() - roomClient, err := rpc.NewTypedRoomClient(clientParams) + v, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err } - participantClient, err := rpc.NewTypedParticipantClient(clientParams) + v2, err := rpc.NewTypedParticipantClient(clientParams) if err != nil { return nil, err } - roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient) + roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2) if err != nil { return nil, err } - agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) + v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) if err != nil { return nil, err } - agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router) + agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router) egressService := NewEgressService(egressClient, rtcEgressLauncher, objectStore, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(clientParams) diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index b6e6c703c..d0ba9389a 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -522,6 +522,17 @@ func (t *telemetryService) APICall(ctx context.Context, apiCallInfo *livekit.API }) } +func (t *telemetryService) Webhook(ctx context.Context, webhookInfo *livekit.WebhookInfo) { + t.enqueue(func() { + ev := &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_WEBHOOK, + Timestamp: timestamppb.Now(), + Webhook: webhookInfo, + } + t.SendEvent(ctx, ev) + }) +} + // returns a livekit.Room with only name and sid filled out // returns nil if room is not found func (t *telemetryService) getRoomDetails(participantID livekit.ParticipantID) *livekit.Room { diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index f3605cc67..52ab44536 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -266,6 +266,12 @@ type FakeTelemetryService struct { arg3 *livekit.TrackInfo arg4 bool } + WebhookStub func(context.Context, *livekit.WebhookInfo) + webhookMutex sync.RWMutex + webhookArgsForCall []struct { + arg1 context.Context + arg2 *livekit.WebhookInfo + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -1495,6 +1501,39 @@ func (fake *FakeTelemetryService) TrackUnsubscribedArgsForCall(i int) (context.C return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 } +func (fake *FakeTelemetryService) Webhook(arg1 context.Context, arg2 *livekit.WebhookInfo) { + fake.webhookMutex.Lock() + fake.webhookArgsForCall = append(fake.webhookArgsForCall, struct { + arg1 context.Context + arg2 *livekit.WebhookInfo + }{arg1, arg2}) + stub := fake.WebhookStub + fake.recordInvocation("Webhook", []interface{}{arg1, arg2}) + fake.webhookMutex.Unlock() + if stub != nil { + fake.WebhookStub(arg1, arg2) + } +} + +func (fake *FakeTelemetryService) WebhookCallCount() int { + fake.webhookMutex.RLock() + defer fake.webhookMutex.RUnlock() + return len(fake.webhookArgsForCall) +} + +func (fake *FakeTelemetryService) WebhookCalls(stub func(context.Context, *livekit.WebhookInfo)) { + fake.webhookMutex.Lock() + defer fake.webhookMutex.Unlock() + fake.WebhookStub = stub +} + +func (fake *FakeTelemetryService) WebhookArgsForCall(i int) (context.Context, *livekit.WebhookInfo) { + fake.webhookMutex.RLock() + defer fake.webhookMutex.RUnlock() + argsForCall := fake.webhookArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -1570,6 +1609,8 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} { defer fake.trackUnpublishedMutex.RUnlock() fake.trackUnsubscribedMutex.RLock() defer fake.trackUnsubscribedMutex.RUnlock() + fake.webhookMutex.RLock() + defer fake.webhookMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 9d2cdf371..d17099d26 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -79,6 +79,7 @@ type TelemetryService interface { LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms) Report(ctx context.Context, reportInfo *livekit.ReportInfo) APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo) + Webhook(ctx context.Context, webhookInfo *livekit.WebhookInfo) // helpers AnalyticsService @@ -110,8 +111,7 @@ type telemetryService struct { func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsService) TelemetryService { t := &telemetryService{ AnalyticsService: analytics, - - notifier: notifier, + notifier: notifier, jobsQueue: utils.NewOpsQueue(utils.OpsQueueParams{ Name: "telemetry", MinSize: jobsQueueMinSize, @@ -120,6 +120,11 @@ func NewTelemetryService(notifier webhook.QueuedNotifier, analytics AnalyticsSer }), workers: make(map[livekit.ParticipantID]*StatsWorker), } + if t.notifier != nil { + t.notifier.RegisterProcessedHook(func(ctx context.Context, whi *livekit.WebhookInfo) { + t.Webhook(ctx, whi) + }) + } t.jobsQueue.Start() go t.run()