From 99afbf587bb5c533d7ddce633875b2a7cbb556a1 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 7 Feb 2025 16:16:41 +0530 Subject: [PATCH] Use a Twirp server hook to send API call details to telemetry. (#3401) * Use a Twirp server hook to send API call details to telemetry. * mage generate and clean up * Add project_id * deps * - Redact requests - Do not store responses - Extract top level fields room_name, room_id, participant_identity, participant_id, track_id as appropriate - Store status as int * deps --- go.mod | 21 +- go.sum | 43 +-- pkg/service/roomservice.go | 116 +++++++- pkg/service/twirp.go | 256 ++++++++++++++++-- pkg/telemetry/events.go | 15 +- .../telemetryfakes/fake_telemetry_service.go | 41 +++ pkg/telemetry/telemetryservice.go | 3 +- 7 files changed, 425 insertions(+), 70 deletions(-) diff --git a/go.mod b/go.mod index ec6b4792b..5e005566f 100644 --- a/go.mod +++ b/go.mod @@ -22,8 +22,8 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 - github.com/livekit/protocol v1.32.2-0.20250206110518-331f97dbf4f3 - github.com/livekit/psrpc v0.6.1-0.20250204212339-6de8b05bfcff + github.com/livekit/protocol v1.33.1-0.20250207102646-9ce0387a72a4 + github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.11.2 @@ -61,8 +61,9 @@ require ( ) require ( - buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.34.2-20240717164558-a6c49f84cc0f.2 // indirect - buf.build/go/protoyaml v0.2.0 // indirect + buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.0-20241127180247-a33202765966.1 // indirect + buf.build/go/protoyaml v0.3.1 // indirect + cel.dev/expr v0.19.0 // indirect dario.cat/mergo v1.0.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect @@ -70,7 +71,7 @@ require ( github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/bufbuild/protovalidate-go v0.6.3 // indirect + github.com/bufbuild/protovalidate-go v0.8.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/continuity v0.4.3 // indirect @@ -86,7 +87,7 @@ require ( github.com/go-jose/go-jose/v3 v3.0.3 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/google/cel-go v0.21.0 // indirect + github.com/google/cel-go v0.22.1 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/google/subcommands v1.2.0 // indirect @@ -96,7 +97,7 @@ require ( github.com/josharian/native v1.1.0 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect - github.com/lithammer/shortuuid/v4 v4.0.0 // indirect + github.com/lithammer/shortuuid/v4 v4.2.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect github.com/mdlayher/netlink v1.7.1 // indirect github.com/mdlayher/socket v0.4.0 // indirect @@ -104,13 +105,13 @@ require ( github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/term v0.5.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/nats-io/nats.go v1.38.0 // indirect + github.com/nats-io/nats.go v1.39.0 // indirect github.com/nats-io/nkeys v0.4.9 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/opencontainers/runc v1.1.14 // indirect - github.com/pion/logging v0.2.2 // indirect + github.com/pion/logging v0.2.3 // indirect github.com/pion/mdns/v2 v2.0.7 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/srtp/v3 v3.0.4 // indirect @@ -119,7 +120,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect - github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect + github.com/puzpuzpuz/xsync/v3 v3.5.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect diff --git a/go.sum b/go.sum index f4bb70258..bc8972692 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,9 @@ -buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.34.2-20240717164558-a6c49f84cc0f.2 h1:SZRVx928rbYZ6hEKUIN+vtGDkl7uotABRWGY4OAg5gM= -buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.34.2-20240717164558-a6c49f84cc0f.2/go.mod h1:ylS4c28ACSI59oJrOdW4pHS4n0Hw4TgSPHn8rpHl4Yw= -buf.build/go/protoyaml v0.2.0 h1:2g3OHjtLDqXBREIOjpZGHmQ+U/4mkN1YiQjxNB68Ip8= -buf.build/go/protoyaml v0.2.0/go.mod h1:L/9QvTDkTWcDTzAL6HMfN+mYC6CmZRm2KnsUA054iL0= +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.0-20241127180247-a33202765966.1 h1:ntAj16eF7AtUyzOOAFk5gvbAO52QmUKPKk7GmsIEORo= +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.0-20241127180247-a33202765966.1/go.mod h1:AxRT+qTj5PJCz2nyQzsR/qxAcveW5USRhJTt/edTO5w= +buf.build/go/protoyaml v0.3.1 h1:ucyzE7DRnjX+mQ6AH4JzN0Kg50ByHHu+yrSKbgQn2D4= +buf.build/go/protoyaml v0.3.1/go.mod h1:0TzNpFQDXhwbkXb/ajLvxIijqbve+vMQvWY/b3/Dzxg= +cel.dev/expr v0.19.0 h1:lXuo+nDhpyJSpWxpPVi5cPUwzKb+dsdOiw6IreM5yt0= +cel.dev/expr v0.19.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= @@ -24,8 +26,8 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/bufbuild/protovalidate-go v0.6.3 h1:wxQyzW035zM16Binbaz/nWAzS12dRIXhZdSUWRY7Fv0= -github.com/bufbuild/protovalidate-go v0.6.3/go.mod h1:J4PtwP9Z2YAGgB0+o+tTWEDtLtXvz/gfhFZD8pbzM/U= +github.com/bufbuild/protovalidate-go v0.8.0 h1:Xs3kCLCJ4tQiogJ0iOXm+ClKw/KviW3nLAryCGW2I3Y= +github.com/bufbuild/protovalidate-go v0.8.0/go.mod h1:JPWZInGm2y2NBg3vKDKdDIkvDjyLv31J3hLH5GIFc/Q= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -90,8 +92,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/cel-go v0.21.0 h1:cl6uW/gxN+Hy50tNYvI691+sXxioCnstFzLp2WO4GCI= -github.com/google/cel-go v0.21.0/go.mod h1:rHUlWCcBKgyEk+eV03RPdZUekPp6YcJwV0FxuUksYxc= +github.com/google/cel-go v0.22.1 h1:AfVXx3chM2qwoSbM7Da8g8hX8OVSkBFwX+rz2+PcK40= +github.com/google/cel-go v0.22.1/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -107,7 +109,6 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE= github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/wire v0.6.0 h1:HBkoIh4BdSxoyo9PveV8giw7ZsaBOvzWKfcg/6MrVwI= @@ -163,16 +164,16 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw7k08o4c= -github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= +github.com/lithammer/shortuuid/v4 v4.2.0 h1:LMFOzVB3996a7b8aBuEXxqOBflbfPQAiVzkIcHO0h8c= +github.com/lithammer/shortuuid/v4 v4.2.0/go.mod h1:D5noHZ2oFw/YaKCfGy0YxyE7M0wMbezmMjPdhyEFe6Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 h1:GX7KF/V9ExmcfT/2Bdia8aROjkxrgx7WpyH7w9MB4J4= github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY= -github.com/livekit/protocol v1.32.2-0.20250206110518-331f97dbf4f3 h1:zg9/GTNZX3LcyEYxHidwU0zwzkOOIEM4x0n4EXGruuQ= -github.com/livekit/protocol v1.32.2-0.20250206110518-331f97dbf4f3/go.mod h1:68cVVExsPKEPBOzOyzhvGZbbn/+mIAGtAPYZ/+dWtRc= -github.com/livekit/psrpc v0.6.1-0.20250204212339-6de8b05bfcff h1:1P84qlSggoKa60H20mAUXUkzjckHGl172ilzg5OJkho= -github.com/livekit/psrpc v0.6.1-0.20250204212339-6de8b05bfcff/go.mod h1:X5WtEZ7OnEs72Fi5/J+i0on3964F1aynQpCalcgMqRo= +github.com/livekit/protocol v1.33.1-0.20250207102646-9ce0387a72a4 h1:emGZ0j5nDb5rmiz4oSpUM+m44K16Fl9/f8EF8TUTRLc= +github.com/livekit/protocol v1.33.1-0.20250207102646-9ce0387a72a4/go.mod h1:23cpl78K2XxTrYB9VT2qw/YTB86/Au/i2ohO0xNIIHM= +github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 h1:fzuYpAQbCid7ySPpQWWePfQOWUrs8x6dJ0T3Wl07n+Y= +github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126/go.mod h1:X5WtEZ7OnEs72Fi5/J+i0on3964F1aynQpCalcgMqRo= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= github.com/mackerelio/go-osstat v0.2.5/go.mod h1:atxwWF+POUZcdtR1wnsUcQxTytoHG4uhl2AKKzrOajY= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -214,8 +215,8 @@ github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= -github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= +github.com/nats-io/nats.go v1.39.0 h1:2/yg2JQjiYYKLwDuBzV0FbB2sIV+eFNkEevlRi4n9lI= +github.com/nats-io/nats.go v1.39.0/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -240,8 +241,8 @@ github.com/pion/ice/v4 v4.0.5 h1:6awVfa1jg9YsI9/Lep4TG/o3kwS1Oayr5b8xz50ibJ8= github.com/pion/ice/v4 v4.0.5/go.mod h1:JJaoEIxUIlGDA9gaRZbwXYqI3j6VG/QchpjX+QmwN6A= github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI= github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= -github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= -github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/logging v0.2.3 h1:gHuf0zpoh1GW67Nr6Gj4cv5Z9ZscU7g/EaoC/Ke/igI= +github.com/pion/logging v0.2.3/go.mod h1:z8YfknkquMe1csOrxK5kc+5/ZPAzMxbKLX5aXpbpC90= github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM= github.com/pion/mdns/v2 v2.0.7/go.mod h1:vAdSYNAT0Jy3Ru0zl2YiW3Rm/fJCwIeM0nToenfOJKA= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= @@ -276,8 +277,8 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4= -github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= +github.com/puzpuzpuz/xsync/v3 v3.5.0 h1:i+cMcpEDY1BkNm7lPDkCtE4oElsYLn+EKF8kAu2vXT4= +github.com/puzpuzpuz/xsync/v3 v3.5.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 0506269a4..ebf88734b 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -69,7 +69,10 @@ func NewRoomService( } func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) { - AppendLogFields(ctx, "room", req.Name, "request", logger.Proto(redactCreateRoomRequest(req))) + redactedReq := redactCreateRoomRequest(req) + RecordRequest(ctx, redactedReq) + + AppendLogFields(ctx, "room", req.Name, "request", logger.Proto(redactedReq)) if err := EnsureCreatePermission(ctx); err != nil { return nil, twirpAuthError(err) } else if req.Egress != nil && s.egressLauncher == nil { @@ -85,10 +88,14 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return nil, err } - return s.router.CreateRoom(ctx, req) + room, err := s.router.CreateRoom(ctx, req) + RecordResponse(ctx, room) + return room, err } func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (*livekit.ListRoomsResponse, error) { + RecordRequest(ctx, req) + AppendLogFields(ctx, "room", req.Names) err := EnsureListPermission(ctx) if err != nil { @@ -108,10 +115,13 @@ func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsReque res := &livekit.ListRoomsResponse{ Rooms: rooms, } + RecordResponse(ctx, res) return res, nil } func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomRequest) (*livekit.DeleteRoomResponse, error) { + RecordRequest(ctx, req) + AppendLogFields(ctx, "room", req.Room) if err := EnsureCreatePermission(ctx); err != nil { return nil, twirpAuthError(err) @@ -134,10 +144,14 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq } err = s.roomStore.DeleteRoom(ctx, livekit.RoomName(req.Room)) - return &livekit.DeleteRoomResponse{}, err + res := &livekit.DeleteRoomResponse{} + RecordResponse(ctx, res) + return res, err } func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListParticipantsRequest) (*livekit.ListParticipantsResponse, error) { + RecordRequest(ctx, req) + AppendLogFields(ctx, "room", req.Room) if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil { return nil, twirpAuthError(err) @@ -151,10 +165,13 @@ func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListPar res := &livekit.ListParticipantsResponse{ Participants: participants, } + RecordResponse(ctx, res) return res, nil } func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.ParticipantInfo, error) { + RecordRequest(ctx, req) + AppendLogFields(ctx, "room", req.Room, "participant", req.Identity) if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil { return nil, twirpAuthError(err) @@ -165,10 +182,13 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti return nil, err } + RecordResponse(ctx, participant) return participant, nil } func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.RemoveParticipantResponse, error) { + RecordRequest(ctx, req) + AppendLogFields(ctx, "room", req.Room, "participant", req.Identity) if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil { @@ -179,19 +199,27 @@ func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomPa return nil, twirp.NotFoundError("participant not found") } - return s.participantClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + res, err := s.participantClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + RecordResponse(ctx, res) + return res, err } func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (*livekit.MuteRoomTrackResponse, error) { + RecordRequest(ctx, req) + AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "trackID", req.TrackSid, "muted", req.Muted) if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil { return nil, twirpAuthError(err) } - return s.participantClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + res, err := s.participantClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + RecordResponse(ctx, res) + return res, err } func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) { + RecordRequest(ctx, redactUpdateParticipantRequest(req)) + AppendLogFields(ctx, "room", req.Room, "participant", req.Identity) if !s.limitConf.CheckParticipantNameLength(req.Name) { @@ -210,10 +238,14 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update return nil, twirpAuthError(err) } - return s.participantClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + res, err := s.participantClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + RecordResponse(ctx, res) + return res, err } func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) { + RecordRequest(ctx, req) + trackSIDs := append(make([]string, 0), req.TrackSids...) for _, pt := range req.ParticipantTracks { trackSIDs = append(trackSIDs, pt.TrackSids...) @@ -224,10 +256,14 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda return nil, twirpAuthError(err) } - return s.participantClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + res, err := s.participantClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req) + RecordResponse(ctx, res) + return res, err } func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) { + RecordRequest(ctx, redactSendDataRequest(req)) + roomName := livekit.RoomName(req.Room) AppendLogFields(ctx, "room", roomName, "size", len(req.Data)) if err := EnsureAdminPermission(ctx, roomName); err != nil { @@ -239,10 +275,14 @@ func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest return nil, twirp.NewError(twirp.InvalidArgument, fmt.Sprintf("nonce should be 16-bytes or not present, got: %d bytes", len(req.Nonce))) } - return s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + res, err := s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + RecordResponse(ctx, res) + return res, err } func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.UpdateRoomMetadataRequest) (*livekit.Room, error) { + RecordRequest(ctx, redactUpdateRoomMetadataRequest(req)) + AppendLogFields(ctx, "room", req.Room, "size", len(req.Metadata)) maxMetadataSize := int(s.limitConf.MaxMetadataSize) if maxMetadataSize > 0 && len(req.Metadata) > maxMetadataSize { @@ -263,11 +303,12 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return nil, err } + RecordResponse(ctx, room) return room, nil } func redactCreateRoomRequest(req *livekit.CreateRoomRequest) *livekit.CreateRoomRequest { - if req.Egress == nil { + if req.Egress == nil && req.Metadata == "" { // nothing to redact return req } @@ -284,5 +325,62 @@ func redactCreateRoomRequest(req *livekit.CreateRoomRequest) *livekit.CreateRoom egress.RedactUpload(clone.Egress.Tracks) } + // replace with size of metadata to provide visibility on request size + if clone.Metadata != "" { + clone.Metadata = fmt.Sprintf("__size: %d", len(clone.Metadata)) + } + + return clone +} + +func redactUpdateParticipantRequest(req *livekit.UpdateParticipantRequest) *livekit.UpdateParticipantRequest { + if req.Metadata == "" && len(req.Attributes) == 0 { + return req + } + + clone := utils.CloneProto(req) + + // replace with size of metadata/attributes to provide visibility on request size + if clone.Metadata != "" { + clone.Metadata = fmt.Sprintf("__size: %d", len(clone.Metadata)) + } + + if len(clone.Attributes) != 0 { + total := 0 + for k, v := range clone.Attributes { + total += len(k) + len(v) + } + + clone.Attributes = map[string]string{ + "__size": fmt.Sprintf("%d", total), + } + } + + return clone +} + +func redactSendDataRequest(req *livekit.SendDataRequest) *livekit.SendDataRequest { + if len(req.Data) == 0 { + return req + } + + clone := utils.CloneProto(req) + + // replace with size of data to provide visibility on request size + clone.Data = []byte(fmt.Sprintf("__size: %d", len(clone.Data))) + + return clone +} + +func redactUpdateRoomMetadataRequest(req *livekit.UpdateRoomMetadataRequest) *livekit.UpdateRoomMetadataRequest { + if req.Metadata == "" { + return req + } + + clone := utils.CloneProto(req) + + // replace with size of metadata to provide visibility on request size + clone.Metadata = fmt.Sprintf("__size: %d", len(clone.Metadata)) + return clone } diff --git a/pkg/service/twirp.go b/pkg/service/twirp.go index b7b31d764..6434f2cd8 100644 --- a/pkg/service/twirp.go +++ b/pkg/service/twirp.go @@ -23,43 +23,48 @@ import ( "time" "github.com/twitchtv/twirp" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/livekit-server/pkg/utils" + "github.com/livekit/protocol/livekit" ) -type twirpLoggerContext struct{} -type statusReporterKey struct{} - type twirpRequestFields struct { service string method string error twirp.Error } +// -------------------------------------------------------------------------- + +type twirpLoggerKey struct{} + // logging handling inspired by https://github.com/bakins/twirpzap // License: Apache-2.0 func TwirpLogger() *twirp.ServerHooks { loggerPool := &sync.Pool{ New: func() interface{} { - return &requestLogger{ + return &twirpLogger{ fieldsOrig: make([]interface{}, 0, 30), } }, } return &twirp.ServerHooks{ RequestReceived: func(ctx context.Context) (context.Context, error) { - return requestReceived(ctx, loggerPool) + return loggerRequestReceived(ctx, loggerPool) }, - RequestRouted: responseRouted, - Error: errorReceived, + RequestRouted: loggerRequestRouted, + Error: loggerErrorReceived, ResponseSent: func(ctx context.Context) { - responseSent(ctx, loggerPool) + loggerResponseSent(ctx, loggerPool) }, } } -type requestLogger struct { +type twirpLogger struct { twirpRequestFields fieldsOrig []interface{} @@ -68,7 +73,7 @@ type requestLogger struct { } func AppendLogFields(ctx context.Context, fields ...interface{}) { - r, ok := ctx.Value(twirpLoggerContext{}).(*requestLogger) + r, ok := ctx.Value(twirpLoggerKey{}).(*twirpLogger) if !ok || r == nil { return } @@ -76,8 +81,8 @@ func AppendLogFields(ctx context.Context, fields ...interface{}) { r.fields = append(r.fields, fields...) } -func requestReceived(ctx context.Context, requestLoggerPool *sync.Pool) (context.Context, error) { - r := requestLoggerPool.Get().(*requestLogger) +func loggerRequestReceived(ctx context.Context, twirpLoggerPool *sync.Pool) (context.Context, error) { + r := twirpLoggerPool.Get().(*twirpLogger) r.startedAt = time.Now() r.fields = r.fieldsOrig r.error = nil @@ -87,13 +92,12 @@ func requestReceived(ctx context.Context, requestLoggerPool *sync.Pool) (context r.fields = append(r.fields, "service", svc) } - ctx = context.WithValue(ctx, twirpLoggerContext{}, r) - return ctx, nil + return context.WithValue(ctx, twirpLoggerKey{}, r), nil } -func responseRouted(ctx context.Context) (context.Context, error) { +func loggerRequestRouted(ctx context.Context) (context.Context, error) { if meth, ok := twirp.MethodName(ctx); ok { - l, ok := ctx.Value(twirpLoggerContext{}).(*requestLogger) + l, ok := ctx.Value(twirpLoggerKey{}).(*twirpLogger) if !ok || l == nil { return ctx, nil } @@ -104,8 +108,8 @@ func responseRouted(ctx context.Context) (context.Context, error) { return ctx, nil } -func responseSent(ctx context.Context, requestLoggerPool *sync.Pool) { - r, ok := ctx.Value(twirpLoggerContext{}).(*requestLogger) +func loggerResponseSent(ctx context.Context, twirpLoggerPool *sync.Pool) { + r, ok := ctx.Value(twirpLoggerKey{}).(*twirpLogger) if !ok || r == nil { return } @@ -126,24 +130,27 @@ func responseSent(ctx context.Context, requestLoggerPool *sync.Pool) { r.fields = r.fieldsOrig r.error = nil - requestLoggerPool.Put(r) + twirpLoggerPool.Put(r) } -func errorReceived(ctx context.Context, e twirp.Error) context.Context { - r, ok := ctx.Value(twirpLoggerContext{}).(*requestLogger) +func loggerErrorReceived(ctx context.Context, e twirp.Error) context.Context { + r, ok := ctx.Value(twirpLoggerKey{}).(*twirpLogger) if !ok || r == nil { return ctx } r.error = e - return ctx } +// -------------------------------------------------------------------------- + +type statusReporterKey struct{} + func TwirpRequestStatusReporter() *twirp.ServerHooks { return &twirp.ServerHooks{ RequestReceived: statusReporterRequestReceived, - RequestRouted: statusReporterResponseRouted, + RequestRouted: statusReporterRequestRouted, Error: statusReporterErrorReceived, ResponseSent: statusReporterResponseSent, } @@ -156,11 +163,10 @@ func statusReporterRequestReceived(ctx context.Context) (context.Context, error) r.service = svc } - ctx = context.WithValue(ctx, statusReporterKey{}, r) - return ctx, nil + return context.WithValue(ctx, statusReporterKey{}, r), nil } -func statusReporterResponseRouted(ctx context.Context) (context.Context, error) { +func statusReporterRequestRouted(ctx context.Context) (context.Context, error) { if meth, ok := twirp.MethodName(ctx); ok { l, ok := ctx.Value(statusReporterKey{}).(*twirpRequestFields) if !ok || l == nil { @@ -207,6 +213,202 @@ func statusReporterErrorReceived(ctx context.Context, e twirp.Error) context.Con } r.error = e - return ctx } + +// -------------------------------------------------------------------------- + +type twirpTelemetryKey struct{} + +func TwirpTelemetry( + nodeID livekit.NodeID, + getProjectID func(ctx context.Context) string, + telemetry telemetry.TelemetryService, +) *twirp.ServerHooks { + return &twirp.ServerHooks{ + RequestReceived: telemetryRequestReceived, + Error: telemetryErrorReceived, + ResponseSent: func(ctx context.Context) { + telemetryResponseSent(ctx, nodeID, getProjectID, telemetry) + }, + } +} + +func RecordRequest(ctx context.Context, request proto.Message) { + if request == nil { + return + } + + a, ok := ctx.Value(twirpTelemetryKey{}).(*livekit.APICallInfo) + if !ok || a == nil { + return + } + + // capture request and extract common fields to top level as appropriate + switch msg := request.(type) { + case *livekit.CreateRoomRequest: + a.Request = &livekit.APICallRequest{ + Message: &livekit.APICallRequest_CreateRoomRequest{ + CreateRoomRequest: msg, + }, + } + a.RoomName = msg.Name + + case *livekit.ListRoomsRequest: + a.Request = &livekit.APICallRequest{ + Message: &livekit.APICallRequest_ListRoomsRequest{ + ListRoomsRequest: msg, + }, + } + + case *livekit.DeleteRoomRequest: + a.Request = &livekit.APICallRequest{ + Message: &livekit.APICallRequest_DeleteRoomRequest{ + DeleteRoomRequest: msg, + }, + } + a.RoomName = msg.Room + + case *livekit.ListParticipantsRequest: + a.Request = &livekit.APICallRequest{ + Message: &livekit.APICallRequest_ListParticipantsRequest{ + ListParticipantsRequest: msg, + }, + } + a.RoomName = msg.Room + + case *livekit.RoomParticipantIdentity: + a.Request = &livekit.APICallRequest{ + Message: &livekit.APICallRequest_RoomParticipantIdentity{ + RoomParticipantIdentity: msg, + }, + } + a.RoomName = msg.Room + a.ParticipantIdentity = msg.Identity + + case *livekit.MuteRoomTrackRequest: + a.Request = &livekit.APICallRequest{ + Message: &livekit.APICallRequest_MuteRoomTrackRequest{ + MuteRoomTrackRequest: msg, + }, + } + a.RoomName = msg.Room + a.ParticipantIdentity = msg.Identity + a.TrackId = msg.TrackSid + + case *livekit.UpdateParticipantRequest: + a.Request = &livekit.APICallRequest{ + Message: &livekit.APICallRequest_UpdateParticipantRequest{ + UpdateParticipantRequest: msg, + }, + } + a.RoomName = msg.Room + a.ParticipantIdentity = msg.Identity + + case *livekit.UpdateSubscriptionsRequest: + a.Request = &livekit.APICallRequest{ + Message: &livekit.APICallRequest_UpdateSubscriptionsRequest{ + UpdateSubscriptionsRequest: msg, + }, + } + a.RoomName = msg.Room + a.ParticipantIdentity = msg.Identity + + case *livekit.SendDataRequest: + a.Request = &livekit.APICallRequest{ + Message: &livekit.APICallRequest_SendDataRequest{ + SendDataRequest: msg, + }, + } + a.RoomName = msg.Room + + case *livekit.UpdateRoomMetadataRequest: + a.Request = &livekit.APICallRequest{ + Message: &livekit.APICallRequest_UpdateRoomMetadataRequest{ + UpdateRoomMetadataRequest: msg, + }, + } + } +} + +func RecordResponse(ctx context.Context, response proto.Message) { + if response == nil { + return + } + + a, ok := ctx.Value(twirpTelemetryKey{}).(*livekit.APICallInfo) + if !ok || a == nil { + return + } + + // extract common fields to top level as appropriate + switch msg := response.(type) { + case *livekit.Room: + a.RoomId = msg.Sid + + case *livekit.ParticipantInfo: + a.ParticipantId = msg.Sid + } +} + +func telemetryRequestReceived(ctx context.Context) (context.Context, error) { + a := &livekit.APICallInfo{} + a.StartedAt = timestamppb.Now() + + if svc, ok := twirp.ServiceName(ctx); ok { + a.Service = svc + } + + return context.WithValue(ctx, twirpTelemetryKey{}, a), nil +} + +func telemetryRequestRouted(ctx context.Context) (context.Context, error) { + if meth, ok := twirp.MethodName(ctx); ok { + a, ok := ctx.Value(twirpTelemetryKey{}).(*livekit.APICallInfo) + if !ok || a == nil { + return ctx, nil + } + a.Method = meth + } + + return ctx, nil +} + +func telemetryResponseSent( + ctx context.Context, + nodeID livekit.NodeID, + getProjectID func(ctx context.Context) string, + telemetry telemetry.TelemetryService, +) { + a, ok := ctx.Value(twirpTelemetryKey{}).(*livekit.APICallInfo) + if !ok || a == nil { + return + } + + if getProjectID != nil { + a.ProjectId = getProjectID(ctx) + } + a.NodeId = string(nodeID) + if statusCode, ok := twirp.StatusCode(ctx); ok { + if status, err := strconv.Atoi(statusCode); err == nil { + a.Status = int32(status) + } + } + a.DurationNs = time.Since(a.StartedAt.AsTime()).Nanoseconds() + if telemetry != nil { + telemetry.APICall(ctx, a) + } +} + +func telemetryErrorReceived(ctx context.Context, e twirp.Error) context.Context { + a, ok := ctx.Value(twirpTelemetryKey{}).(*livekit.APICallInfo) + if !ok || a == nil { + return ctx + } + + a.TwirpErrorCode = string(e.Code()) + a.TwirpErrorMessage = e.Msg() + return ctx +} + +// -------------------------------------------------------------------------- diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index 23f86803f..1e59e0b9f 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -499,12 +499,23 @@ func (t *telemetryService) IngressEnded(ctx context.Context, info *livekit.Ingre }) } -func (t *telemetryService) Report(ctx context.Context, report *livekit.ReportInfo) { +func (t *telemetryService) Report(ctx context.Context, reportInfo *livekit.ReportInfo) { t.enqueue(func() { ev := &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_REPORT, Timestamp: timestamppb.Now(), - Report: report, + Report: reportInfo, + } + t.SendEvent(ctx, ev) + }) +} + +func (t *telemetryService) APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo) { + t.enqueue(func() { + ev := &livekit.AnalyticsEvent{ + Type: livekit.AnalyticsEventType_API_CALL, + Timestamp: timestamppb.Now(), + ApiCall: apiCallInfo, } t.SendEvent(ctx, ev) }) diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index a304f95f9..cdfecfdae 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -10,6 +10,12 @@ import ( ) type FakeTelemetryService struct { + APICallStub func(context.Context, *livekit.APICallInfo) + aPICallMutex sync.RWMutex + aPICallArgsForCall []struct { + arg1 context.Context + arg2 *livekit.APICallInfo + } EgressEndedStub func(context.Context, *livekit.EgressInfo) egressEndedMutex sync.RWMutex egressEndedArgsForCall []struct { @@ -263,6 +269,39 @@ type FakeTelemetryService struct { invocationsMutex sync.RWMutex } +func (fake *FakeTelemetryService) APICall(arg1 context.Context, arg2 *livekit.APICallInfo) { + fake.aPICallMutex.Lock() + fake.aPICallArgsForCall = append(fake.aPICallArgsForCall, struct { + arg1 context.Context + arg2 *livekit.APICallInfo + }{arg1, arg2}) + stub := fake.APICallStub + fake.recordInvocation("APICall", []interface{}{arg1, arg2}) + fake.aPICallMutex.Unlock() + if stub != nil { + fake.APICallStub(arg1, arg2) + } +} + +func (fake *FakeTelemetryService) APICallCallCount() int { + fake.aPICallMutex.RLock() + defer fake.aPICallMutex.RUnlock() + return len(fake.aPICallArgsForCall) +} + +func (fake *FakeTelemetryService) APICallCalls(stub func(context.Context, *livekit.APICallInfo)) { + fake.aPICallMutex.Lock() + defer fake.aPICallMutex.Unlock() + fake.APICallStub = stub +} + +func (fake *FakeTelemetryService) APICallArgsForCall(i int) (context.Context, *livekit.APICallInfo) { + fake.aPICallMutex.RLock() + defer fake.aPICallMutex.RUnlock() + argsForCall := fake.aPICallArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeTelemetryService) EgressEnded(arg1 context.Context, arg2 *livekit.EgressInfo) { fake.egressEndedMutex.Lock() fake.egressEndedArgsForCall = append(fake.egressEndedArgsForCall, struct { @@ -1458,6 +1497,8 @@ func (fake *FakeTelemetryService) TrackUnsubscribedArgsForCall(i int) (context.C func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.aPICallMutex.RLock() + defer fake.aPICallMutex.RUnlock() fake.egressEndedMutex.RLock() defer fake.egressEndedMutex.RUnlock() fake.egressStartedMutex.RLock() diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index ba953ed75..226a52414 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -76,7 +76,8 @@ type TelemetryService interface { IngressUpdated(ctx context.Context, info *livekit.IngressInfo) IngressEnded(ctx context.Context, info *livekit.IngressInfo) LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms) - Report(ctx context.Context, report *livekit.ReportInfo) + Report(ctx context.Context, reportInfo *livekit.ReportInfo) + APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo) // helpers AnalyticsService