Use a Twirp server hook to send API call details to telemetry. (#3401)

* Use a Twirp server hook to send API call details to telemetry.

* mage generate and clean up

* Add project_id

* deps

* - Redact requests
- Do not store responses
- Extract top level fields room_name, room_id, participant_identity,
  participant_id, track_id as appropriate
- Store status as int

* deps
This commit is contained in:
Raja Subramanian
2025-02-07 16:16:41 +05:30
committed by GitHub
parent 7ebe528792
commit 99afbf587b
7 changed files with 425 additions and 70 deletions

21
go.mod
View File

@@ -22,8 +22,8 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564
github.com/livekit/protocol v1.32.2-0.20250206110518-331f97dbf4f3
github.com/livekit/psrpc v0.6.1-0.20250204212339-6de8b05bfcff
github.com/livekit/protocol v1.33.1-0.20250207102646-9ce0387a72a4
github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.2
@@ -61,8 +61,9 @@ require (
)
require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.34.2-20240717164558-a6c49f84cc0f.2 // indirect
buf.build/go/protoyaml v0.2.0 // indirect
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.0-20241127180247-a33202765966.1 // indirect
buf.build/go/protoyaml v0.3.1 // indirect
cel.dev/expr v0.19.0 // indirect
dario.cat/mergo v1.0.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
@@ -70,7 +71,7 @@ require (
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bufbuild/protovalidate-go v0.6.3 // indirect
github.com/bufbuild/protovalidate-go v0.8.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/continuity v0.4.3 // indirect
@@ -86,7 +87,7 @@ require (
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/cel-go v0.21.0 // indirect
github.com/google/cel-go v0.22.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/subcommands v1.2.0 // indirect
@@ -96,7 +97,7 @@ require (
github.com/josharian/native v1.1.0 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/lithammer/shortuuid/v4 v4.0.0 // indirect
github.com/lithammer/shortuuid/v4 v4.2.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mdlayher/netlink v1.7.1 // indirect
github.com/mdlayher/socket v0.4.0 // indirect
@@ -104,13 +105,13 @@ require (
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nats.go v1.38.0 // indirect
github.com/nats-io/nats.go v1.39.0 // indirect
github.com/nats-io/nkeys v0.4.9 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.1.14 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/logging v0.2.3 // indirect
github.com/pion/mdns/v2 v2.0.7 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/srtp/v3 v3.0.4 // indirect
@@ -119,7 +120,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect
github.com/puzpuzpuz/xsync/v3 v3.5.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect

43
go.sum
View File

@@ -1,7 +1,9 @@
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.34.2-20240717164558-a6c49f84cc0f.2 h1:SZRVx928rbYZ6hEKUIN+vtGDkl7uotABRWGY4OAg5gM=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.34.2-20240717164558-a6c49f84cc0f.2/go.mod h1:ylS4c28ACSI59oJrOdW4pHS4n0Hw4TgSPHn8rpHl4Yw=
buf.build/go/protoyaml v0.2.0 h1:2g3OHjtLDqXBREIOjpZGHmQ+U/4mkN1YiQjxNB68Ip8=
buf.build/go/protoyaml v0.2.0/go.mod h1:L/9QvTDkTWcDTzAL6HMfN+mYC6CmZRm2KnsUA054iL0=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.0-20241127180247-a33202765966.1 h1:ntAj16eF7AtUyzOOAFk5gvbAO52QmUKPKk7GmsIEORo=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.0-20241127180247-a33202765966.1/go.mod h1:AxRT+qTj5PJCz2nyQzsR/qxAcveW5USRhJTt/edTO5w=
buf.build/go/protoyaml v0.3.1 h1:ucyzE7DRnjX+mQ6AH4JzN0Kg50ByHHu+yrSKbgQn2D4=
buf.build/go/protoyaml v0.3.1/go.mod h1:0TzNpFQDXhwbkXb/ajLvxIijqbve+vMQvWY/b3/Dzxg=
cel.dev/expr v0.19.0 h1:lXuo+nDhpyJSpWxpPVi5cPUwzKb+dsdOiw6IreM5yt0=
cel.dev/expr v0.19.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw=
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
@@ -24,8 +26,8 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bufbuild/protovalidate-go v0.6.3 h1:wxQyzW035zM16Binbaz/nWAzS12dRIXhZdSUWRY7Fv0=
github.com/bufbuild/protovalidate-go v0.6.3/go.mod h1:J4PtwP9Z2YAGgB0+o+tTWEDtLtXvz/gfhFZD8pbzM/U=
github.com/bufbuild/protovalidate-go v0.8.0 h1:Xs3kCLCJ4tQiogJ0iOXm+ClKw/KviW3nLAryCGW2I3Y=
github.com/bufbuild/protovalidate-go v0.8.0/go.mod h1:JPWZInGm2y2NBg3vKDKdDIkvDjyLv31J3hLH5GIFc/Q=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@@ -90,8 +92,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/cel-go v0.21.0 h1:cl6uW/gxN+Hy50tNYvI691+sXxioCnstFzLp2WO4GCI=
github.com/google/cel-go v0.21.0/go.mod h1:rHUlWCcBKgyEk+eV03RPdZUekPp6YcJwV0FxuUksYxc=
github.com/google/cel-go v0.22.1 h1:AfVXx3chM2qwoSbM7Da8g8hX8OVSkBFwX+rz2+PcK40=
github.com/google/cel-go v0.22.1/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
@@ -107,7 +109,6 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE=
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/wire v0.6.0 h1:HBkoIh4BdSxoyo9PveV8giw7ZsaBOvzWKfcg/6MrVwI=
@@ -163,16 +164,16 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw7k08o4c=
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/lithammer/shortuuid/v4 v4.2.0 h1:LMFOzVB3996a7b8aBuEXxqOBflbfPQAiVzkIcHO0h8c=
github.com/lithammer/shortuuid/v4 v4.2.0/go.mod h1:D5noHZ2oFw/YaKCfGy0YxyE7M0wMbezmMjPdhyEFe6Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 h1:GX7KF/V9ExmcfT/2Bdia8aROjkxrgx7WpyH7w9MB4J4=
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY=
github.com/livekit/protocol v1.32.2-0.20250206110518-331f97dbf4f3 h1:zg9/GTNZX3LcyEYxHidwU0zwzkOOIEM4x0n4EXGruuQ=
github.com/livekit/protocol v1.32.2-0.20250206110518-331f97dbf4f3/go.mod h1:68cVVExsPKEPBOzOyzhvGZbbn/+mIAGtAPYZ/+dWtRc=
github.com/livekit/psrpc v0.6.1-0.20250204212339-6de8b05bfcff h1:1P84qlSggoKa60H20mAUXUkzjckHGl172ilzg5OJkho=
github.com/livekit/psrpc v0.6.1-0.20250204212339-6de8b05bfcff/go.mod h1:X5WtEZ7OnEs72Fi5/J+i0on3964F1aynQpCalcgMqRo=
github.com/livekit/protocol v1.33.1-0.20250207102646-9ce0387a72a4 h1:emGZ0j5nDb5rmiz4oSpUM+m44K16Fl9/f8EF8TUTRLc=
github.com/livekit/protocol v1.33.1-0.20250207102646-9ce0387a72a4/go.mod h1:23cpl78K2XxTrYB9VT2qw/YTB86/Au/i2ohO0xNIIHM=
github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 h1:fzuYpAQbCid7ySPpQWWePfQOWUrs8x6dJ0T3Wl07n+Y=
github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126/go.mod h1:X5WtEZ7OnEs72Fi5/J+i0on3964F1aynQpCalcgMqRo=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
github.com/mackerelio/go-osstat v0.2.5/go.mod h1:atxwWF+POUZcdtR1wnsUcQxTytoHG4uhl2AKKzrOajY=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
@@ -214,8 +215,8 @@ github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA=
github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw=
github.com/nats-io/nats.go v1.39.0 h1:2/yg2JQjiYYKLwDuBzV0FbB2sIV+eFNkEevlRi4n9lI=
github.com/nats-io/nats.go v1.39.0/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -240,8 +241,8 @@ github.com/pion/ice/v4 v4.0.5 h1:6awVfa1jg9YsI9/Lep4TG/o3kwS1Oayr5b8xz50ibJ8=
github.com/pion/ice/v4 v4.0.5/go.mod h1:JJaoEIxUIlGDA9gaRZbwXYqI3j6VG/QchpjX+QmwN6A=
github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI=
github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/logging v0.2.3 h1:gHuf0zpoh1GW67Nr6Gj4cv5Z9ZscU7g/EaoC/Ke/igI=
github.com/pion/logging v0.2.3/go.mod h1:z8YfknkquMe1csOrxK5kc+5/ZPAzMxbKLX5aXpbpC90=
github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM=
github.com/pion/mdns/v2 v2.0.7/go.mod h1:vAdSYNAT0Jy3Ru0zl2YiW3Rm/fJCwIeM0nToenfOJKA=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
@@ -276,8 +277,8 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4=
github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/puzpuzpuz/xsync/v3 v3.5.0 h1:i+cMcpEDY1BkNm7lPDkCtE4oElsYLn+EKF8kAu2vXT4=
github.com/puzpuzpuz/xsync/v3 v3.5.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=

View File

@@ -69,7 +69,10 @@ func NewRoomService(
}
func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) {
AppendLogFields(ctx, "room", req.Name, "request", logger.Proto(redactCreateRoomRequest(req)))
redactedReq := redactCreateRoomRequest(req)
RecordRequest(ctx, redactedReq)
AppendLogFields(ctx, "room", req.Name, "request", logger.Proto(redactedReq))
if err := EnsureCreatePermission(ctx); err != nil {
return nil, twirpAuthError(err)
} else if req.Egress != nil && s.egressLauncher == nil {
@@ -85,10 +88,14 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return nil, err
}
return s.router.CreateRoom(ctx, req)
room, err := s.router.CreateRoom(ctx, req)
RecordResponse(ctx, room)
return room, err
}
func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (*livekit.ListRoomsResponse, error) {
RecordRequest(ctx, req)
AppendLogFields(ctx, "room", req.Names)
err := EnsureListPermission(ctx)
if err != nil {
@@ -108,10 +115,13 @@ func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsReque
res := &livekit.ListRoomsResponse{
Rooms: rooms,
}
RecordResponse(ctx, res)
return res, nil
}
func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomRequest) (*livekit.DeleteRoomResponse, error) {
RecordRequest(ctx, req)
AppendLogFields(ctx, "room", req.Room)
if err := EnsureCreatePermission(ctx); err != nil {
return nil, twirpAuthError(err)
@@ -134,10 +144,14 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
}
err = s.roomStore.DeleteRoom(ctx, livekit.RoomName(req.Room))
return &livekit.DeleteRoomResponse{}, err
res := &livekit.DeleteRoomResponse{}
RecordResponse(ctx, res)
return res, err
}
func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListParticipantsRequest) (*livekit.ListParticipantsResponse, error) {
RecordRequest(ctx, req)
AppendLogFields(ctx, "room", req.Room)
if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
return nil, twirpAuthError(err)
@@ -151,10 +165,13 @@ func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListPar
res := &livekit.ListParticipantsResponse{
Participants: participants,
}
RecordResponse(ctx, res)
return res, nil
}
func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.ParticipantInfo, error) {
RecordRequest(ctx, req)
AppendLogFields(ctx, "room", req.Room, "participant", req.Identity)
if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
return nil, twirpAuthError(err)
@@ -165,10 +182,13 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti
return nil, err
}
RecordResponse(ctx, participant)
return participant, nil
}
func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.RemoveParticipantResponse, error) {
RecordRequest(ctx, req)
AppendLogFields(ctx, "room", req.Room, "participant", req.Identity)
if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
@@ -179,19 +199,27 @@ func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomPa
return nil, twirp.NotFoundError("participant not found")
}
return s.participantClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
res, err := s.participantClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
RecordResponse(ctx, res)
return res, err
}
func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (*livekit.MuteRoomTrackResponse, error) {
RecordRequest(ctx, req)
AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "trackID", req.TrackSid, "muted", req.Muted)
if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
return nil, twirpAuthError(err)
}
return s.participantClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
res, err := s.participantClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
RecordResponse(ctx, res)
return res, err
}
func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) {
RecordRequest(ctx, redactUpdateParticipantRequest(req))
AppendLogFields(ctx, "room", req.Room, "participant", req.Identity)
if !s.limitConf.CheckParticipantNameLength(req.Name) {
@@ -210,10 +238,14 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update
return nil, twirpAuthError(err)
}
return s.participantClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
res, err := s.participantClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
RecordResponse(ctx, res)
return res, err
}
func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) {
RecordRequest(ctx, req)
trackSIDs := append(make([]string, 0), req.TrackSids...)
for _, pt := range req.ParticipantTracks {
trackSIDs = append(trackSIDs, pt.TrackSids...)
@@ -224,10 +256,14 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
return nil, twirpAuthError(err)
}
return s.participantClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
res, err := s.participantClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
RecordResponse(ctx, res)
return res, err
}
func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) {
RecordRequest(ctx, redactSendDataRequest(req))
roomName := livekit.RoomName(req.Room)
AppendLogFields(ctx, "room", roomName, "size", len(req.Data))
if err := EnsureAdminPermission(ctx, roomName); err != nil {
@@ -239,10 +275,14 @@ func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest
return nil, twirp.NewError(twirp.InvalidArgument, fmt.Sprintf("nonce should be 16-bytes or not present, got: %d bytes", len(req.Nonce)))
}
return s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
res, err := s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
RecordResponse(ctx, res)
return res, err
}
func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.UpdateRoomMetadataRequest) (*livekit.Room, error) {
RecordRequest(ctx, redactUpdateRoomMetadataRequest(req))
AppendLogFields(ctx, "room", req.Room, "size", len(req.Metadata))
maxMetadataSize := int(s.limitConf.MaxMetadataSize)
if maxMetadataSize > 0 && len(req.Metadata) > maxMetadataSize {
@@ -263,11 +303,12 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
return nil, err
}
RecordResponse(ctx, room)
return room, nil
}
func redactCreateRoomRequest(req *livekit.CreateRoomRequest) *livekit.CreateRoomRequest {
if req.Egress == nil {
if req.Egress == nil && req.Metadata == "" {
// nothing to redact
return req
}
@@ -284,5 +325,62 @@ func redactCreateRoomRequest(req *livekit.CreateRoomRequest) *livekit.CreateRoom
egress.RedactUpload(clone.Egress.Tracks)
}
// replace with size of metadata to provide visibility on request size
if clone.Metadata != "" {
clone.Metadata = fmt.Sprintf("__size: %d", len(clone.Metadata))
}
return clone
}
func redactUpdateParticipantRequest(req *livekit.UpdateParticipantRequest) *livekit.UpdateParticipantRequest {
if req.Metadata == "" && len(req.Attributes) == 0 {
return req
}
clone := utils.CloneProto(req)
// replace with size of metadata/attributes to provide visibility on request size
if clone.Metadata != "" {
clone.Metadata = fmt.Sprintf("__size: %d", len(clone.Metadata))
}
if len(clone.Attributes) != 0 {
total := 0
for k, v := range clone.Attributes {
total += len(k) + len(v)
}
clone.Attributes = map[string]string{
"__size": fmt.Sprintf("%d", total),
}
}
return clone
}
func redactSendDataRequest(req *livekit.SendDataRequest) *livekit.SendDataRequest {
if len(req.Data) == 0 {
return req
}
clone := utils.CloneProto(req)
// replace with size of data to provide visibility on request size
clone.Data = []byte(fmt.Sprintf("__size: %d", len(clone.Data)))
return clone
}
func redactUpdateRoomMetadataRequest(req *livekit.UpdateRoomMetadataRequest) *livekit.UpdateRoomMetadataRequest {
if req.Metadata == "" {
return req
}
clone := utils.CloneProto(req)
// replace with size of metadata to provide visibility on request size
clone.Metadata = fmt.Sprintf("__size: %d", len(clone.Metadata))
return clone
}

View File

@@ -23,43 +23,48 @@ import (
"time"
"github.com/twitchtv/twirp"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/livekit"
)
type twirpLoggerContext struct{}
type statusReporterKey struct{}
type twirpRequestFields struct {
service string
method string
error twirp.Error
}
// --------------------------------------------------------------------------
type twirpLoggerKey struct{}
// logging handling inspired by https://github.com/bakins/twirpzap
// License: Apache-2.0
func TwirpLogger() *twirp.ServerHooks {
loggerPool := &sync.Pool{
New: func() interface{} {
return &requestLogger{
return &twirpLogger{
fieldsOrig: make([]interface{}, 0, 30),
}
},
}
return &twirp.ServerHooks{
RequestReceived: func(ctx context.Context) (context.Context, error) {
return requestReceived(ctx, loggerPool)
return loggerRequestReceived(ctx, loggerPool)
},
RequestRouted: responseRouted,
Error: errorReceived,
RequestRouted: loggerRequestRouted,
Error: loggerErrorReceived,
ResponseSent: func(ctx context.Context) {
responseSent(ctx, loggerPool)
loggerResponseSent(ctx, loggerPool)
},
}
}
type requestLogger struct {
type twirpLogger struct {
twirpRequestFields
fieldsOrig []interface{}
@@ -68,7 +73,7 @@ type requestLogger struct {
}
func AppendLogFields(ctx context.Context, fields ...interface{}) {
r, ok := ctx.Value(twirpLoggerContext{}).(*requestLogger)
r, ok := ctx.Value(twirpLoggerKey{}).(*twirpLogger)
if !ok || r == nil {
return
}
@@ -76,8 +81,8 @@ func AppendLogFields(ctx context.Context, fields ...interface{}) {
r.fields = append(r.fields, fields...)
}
func requestReceived(ctx context.Context, requestLoggerPool *sync.Pool) (context.Context, error) {
r := requestLoggerPool.Get().(*requestLogger)
func loggerRequestReceived(ctx context.Context, twirpLoggerPool *sync.Pool) (context.Context, error) {
r := twirpLoggerPool.Get().(*twirpLogger)
r.startedAt = time.Now()
r.fields = r.fieldsOrig
r.error = nil
@@ -87,13 +92,12 @@ func requestReceived(ctx context.Context, requestLoggerPool *sync.Pool) (context
r.fields = append(r.fields, "service", svc)
}
ctx = context.WithValue(ctx, twirpLoggerContext{}, r)
return ctx, nil
return context.WithValue(ctx, twirpLoggerKey{}, r), nil
}
func responseRouted(ctx context.Context) (context.Context, error) {
func loggerRequestRouted(ctx context.Context) (context.Context, error) {
if meth, ok := twirp.MethodName(ctx); ok {
l, ok := ctx.Value(twirpLoggerContext{}).(*requestLogger)
l, ok := ctx.Value(twirpLoggerKey{}).(*twirpLogger)
if !ok || l == nil {
return ctx, nil
}
@@ -104,8 +108,8 @@ func responseRouted(ctx context.Context) (context.Context, error) {
return ctx, nil
}
func responseSent(ctx context.Context, requestLoggerPool *sync.Pool) {
r, ok := ctx.Value(twirpLoggerContext{}).(*requestLogger)
func loggerResponseSent(ctx context.Context, twirpLoggerPool *sync.Pool) {
r, ok := ctx.Value(twirpLoggerKey{}).(*twirpLogger)
if !ok || r == nil {
return
}
@@ -126,24 +130,27 @@ func responseSent(ctx context.Context, requestLoggerPool *sync.Pool) {
r.fields = r.fieldsOrig
r.error = nil
requestLoggerPool.Put(r)
twirpLoggerPool.Put(r)
}
func errorReceived(ctx context.Context, e twirp.Error) context.Context {
r, ok := ctx.Value(twirpLoggerContext{}).(*requestLogger)
func loggerErrorReceived(ctx context.Context, e twirp.Error) context.Context {
r, ok := ctx.Value(twirpLoggerKey{}).(*twirpLogger)
if !ok || r == nil {
return ctx
}
r.error = e
return ctx
}
// --------------------------------------------------------------------------
type statusReporterKey struct{}
func TwirpRequestStatusReporter() *twirp.ServerHooks {
return &twirp.ServerHooks{
RequestReceived: statusReporterRequestReceived,
RequestRouted: statusReporterResponseRouted,
RequestRouted: statusReporterRequestRouted,
Error: statusReporterErrorReceived,
ResponseSent: statusReporterResponseSent,
}
@@ -156,11 +163,10 @@ func statusReporterRequestReceived(ctx context.Context) (context.Context, error)
r.service = svc
}
ctx = context.WithValue(ctx, statusReporterKey{}, r)
return ctx, nil
return context.WithValue(ctx, statusReporterKey{}, r), nil
}
func statusReporterResponseRouted(ctx context.Context) (context.Context, error) {
func statusReporterRequestRouted(ctx context.Context) (context.Context, error) {
if meth, ok := twirp.MethodName(ctx); ok {
l, ok := ctx.Value(statusReporterKey{}).(*twirpRequestFields)
if !ok || l == nil {
@@ -207,6 +213,202 @@ func statusReporterErrorReceived(ctx context.Context, e twirp.Error) context.Con
}
r.error = e
return ctx
}
// --------------------------------------------------------------------------
type twirpTelemetryKey struct{}
func TwirpTelemetry(
nodeID livekit.NodeID,
getProjectID func(ctx context.Context) string,
telemetry telemetry.TelemetryService,
) *twirp.ServerHooks {
return &twirp.ServerHooks{
RequestReceived: telemetryRequestReceived,
Error: telemetryErrorReceived,
ResponseSent: func(ctx context.Context) {
telemetryResponseSent(ctx, nodeID, getProjectID, telemetry)
},
}
}
func RecordRequest(ctx context.Context, request proto.Message) {
if request == nil {
return
}
a, ok := ctx.Value(twirpTelemetryKey{}).(*livekit.APICallInfo)
if !ok || a == nil {
return
}
// capture request and extract common fields to top level as appropriate
switch msg := request.(type) {
case *livekit.CreateRoomRequest:
a.Request = &livekit.APICallRequest{
Message: &livekit.APICallRequest_CreateRoomRequest{
CreateRoomRequest: msg,
},
}
a.RoomName = msg.Name
case *livekit.ListRoomsRequest:
a.Request = &livekit.APICallRequest{
Message: &livekit.APICallRequest_ListRoomsRequest{
ListRoomsRequest: msg,
},
}
case *livekit.DeleteRoomRequest:
a.Request = &livekit.APICallRequest{
Message: &livekit.APICallRequest_DeleteRoomRequest{
DeleteRoomRequest: msg,
},
}
a.RoomName = msg.Room
case *livekit.ListParticipantsRequest:
a.Request = &livekit.APICallRequest{
Message: &livekit.APICallRequest_ListParticipantsRequest{
ListParticipantsRequest: msg,
},
}
a.RoomName = msg.Room
case *livekit.RoomParticipantIdentity:
a.Request = &livekit.APICallRequest{
Message: &livekit.APICallRequest_RoomParticipantIdentity{
RoomParticipantIdentity: msg,
},
}
a.RoomName = msg.Room
a.ParticipantIdentity = msg.Identity
case *livekit.MuteRoomTrackRequest:
a.Request = &livekit.APICallRequest{
Message: &livekit.APICallRequest_MuteRoomTrackRequest{
MuteRoomTrackRequest: msg,
},
}
a.RoomName = msg.Room
a.ParticipantIdentity = msg.Identity
a.TrackId = msg.TrackSid
case *livekit.UpdateParticipantRequest:
a.Request = &livekit.APICallRequest{
Message: &livekit.APICallRequest_UpdateParticipantRequest{
UpdateParticipantRequest: msg,
},
}
a.RoomName = msg.Room
a.ParticipantIdentity = msg.Identity
case *livekit.UpdateSubscriptionsRequest:
a.Request = &livekit.APICallRequest{
Message: &livekit.APICallRequest_UpdateSubscriptionsRequest{
UpdateSubscriptionsRequest: msg,
},
}
a.RoomName = msg.Room
a.ParticipantIdentity = msg.Identity
case *livekit.SendDataRequest:
a.Request = &livekit.APICallRequest{
Message: &livekit.APICallRequest_SendDataRequest{
SendDataRequest: msg,
},
}
a.RoomName = msg.Room
case *livekit.UpdateRoomMetadataRequest:
a.Request = &livekit.APICallRequest{
Message: &livekit.APICallRequest_UpdateRoomMetadataRequest{
UpdateRoomMetadataRequest: msg,
},
}
}
}
func RecordResponse(ctx context.Context, response proto.Message) {
if response == nil {
return
}
a, ok := ctx.Value(twirpTelemetryKey{}).(*livekit.APICallInfo)
if !ok || a == nil {
return
}
// extract common fields to top level as appropriate
switch msg := response.(type) {
case *livekit.Room:
a.RoomId = msg.Sid
case *livekit.ParticipantInfo:
a.ParticipantId = msg.Sid
}
}
func telemetryRequestReceived(ctx context.Context) (context.Context, error) {
a := &livekit.APICallInfo{}
a.StartedAt = timestamppb.Now()
if svc, ok := twirp.ServiceName(ctx); ok {
a.Service = svc
}
return context.WithValue(ctx, twirpTelemetryKey{}, a), nil
}
func telemetryRequestRouted(ctx context.Context) (context.Context, error) {
if meth, ok := twirp.MethodName(ctx); ok {
a, ok := ctx.Value(twirpTelemetryKey{}).(*livekit.APICallInfo)
if !ok || a == nil {
return ctx, nil
}
a.Method = meth
}
return ctx, nil
}
func telemetryResponseSent(
ctx context.Context,
nodeID livekit.NodeID,
getProjectID func(ctx context.Context) string,
telemetry telemetry.TelemetryService,
) {
a, ok := ctx.Value(twirpTelemetryKey{}).(*livekit.APICallInfo)
if !ok || a == nil {
return
}
if getProjectID != nil {
a.ProjectId = getProjectID(ctx)
}
a.NodeId = string(nodeID)
if statusCode, ok := twirp.StatusCode(ctx); ok {
if status, err := strconv.Atoi(statusCode); err == nil {
a.Status = int32(status)
}
}
a.DurationNs = time.Since(a.StartedAt.AsTime()).Nanoseconds()
if telemetry != nil {
telemetry.APICall(ctx, a)
}
}
func telemetryErrorReceived(ctx context.Context, e twirp.Error) context.Context {
a, ok := ctx.Value(twirpTelemetryKey{}).(*livekit.APICallInfo)
if !ok || a == nil {
return ctx
}
a.TwirpErrorCode = string(e.Code())
a.TwirpErrorMessage = e.Msg()
return ctx
}
// --------------------------------------------------------------------------

View File

@@ -499,12 +499,23 @@ func (t *telemetryService) IngressEnded(ctx context.Context, info *livekit.Ingre
})
}
func (t *telemetryService) Report(ctx context.Context, report *livekit.ReportInfo) {
func (t *telemetryService) Report(ctx context.Context, reportInfo *livekit.ReportInfo) {
t.enqueue(func() {
ev := &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_REPORT,
Timestamp: timestamppb.Now(),
Report: report,
Report: reportInfo,
}
t.SendEvent(ctx, ev)
})
}
func (t *telemetryService) APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo) {
t.enqueue(func() {
ev := &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_API_CALL,
Timestamp: timestamppb.Now(),
ApiCall: apiCallInfo,
}
t.SendEvent(ctx, ev)
})

View File

@@ -10,6 +10,12 @@ import (
)
type FakeTelemetryService struct {
APICallStub func(context.Context, *livekit.APICallInfo)
aPICallMutex sync.RWMutex
aPICallArgsForCall []struct {
arg1 context.Context
arg2 *livekit.APICallInfo
}
EgressEndedStub func(context.Context, *livekit.EgressInfo)
egressEndedMutex sync.RWMutex
egressEndedArgsForCall []struct {
@@ -263,6 +269,39 @@ type FakeTelemetryService struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeTelemetryService) APICall(arg1 context.Context, arg2 *livekit.APICallInfo) {
fake.aPICallMutex.Lock()
fake.aPICallArgsForCall = append(fake.aPICallArgsForCall, struct {
arg1 context.Context
arg2 *livekit.APICallInfo
}{arg1, arg2})
stub := fake.APICallStub
fake.recordInvocation("APICall", []interface{}{arg1, arg2})
fake.aPICallMutex.Unlock()
if stub != nil {
fake.APICallStub(arg1, arg2)
}
}
func (fake *FakeTelemetryService) APICallCallCount() int {
fake.aPICallMutex.RLock()
defer fake.aPICallMutex.RUnlock()
return len(fake.aPICallArgsForCall)
}
func (fake *FakeTelemetryService) APICallCalls(stub func(context.Context, *livekit.APICallInfo)) {
fake.aPICallMutex.Lock()
defer fake.aPICallMutex.Unlock()
fake.APICallStub = stub
}
func (fake *FakeTelemetryService) APICallArgsForCall(i int) (context.Context, *livekit.APICallInfo) {
fake.aPICallMutex.RLock()
defer fake.aPICallMutex.RUnlock()
argsForCall := fake.aPICallArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeTelemetryService) EgressEnded(arg1 context.Context, arg2 *livekit.EgressInfo) {
fake.egressEndedMutex.Lock()
fake.egressEndedArgsForCall = append(fake.egressEndedArgsForCall, struct {
@@ -1458,6 +1497,8 @@ func (fake *FakeTelemetryService) TrackUnsubscribedArgsForCall(i int) (context.C
func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.aPICallMutex.RLock()
defer fake.aPICallMutex.RUnlock()
fake.egressEndedMutex.RLock()
defer fake.egressEndedMutex.RUnlock()
fake.egressStartedMutex.RLock()

View File

@@ -76,7 +76,8 @@ type TelemetryService interface {
IngressUpdated(ctx context.Context, info *livekit.IngressInfo)
IngressEnded(ctx context.Context, info *livekit.IngressInfo)
LocalRoomState(ctx context.Context, info *livekit.AnalyticsNodeRooms)
Report(ctx context.Context, report *livekit.ReportInfo)
Report(ctx context.Context, reportInfo *livekit.ReportInfo)
APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo)
// helpers
AnalyticsService