From 773c4d656d6beeaabc01d25c07b9eabbef9bf7a7 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Thu, 12 Dec 2024 15:09:50 +0100 Subject: [PATCH] Move all the OTEL meters to crate-level statics --- crates/cli/src/app_state.rs | 15 ++-- crates/cli/src/main.rs | 6 +- crates/cli/src/telemetry.rs | 70 +++++++++++-------- .../handlers/src/activity_tracker/worker.rs | 15 ++-- crates/handlers/src/lib.rs | 12 +++- crates/tower/src/lib.rs | 8 ++- crates/tower/src/metrics/duration.rs | 4 +- crates/tower/src/metrics/in_flight.rs | 4 +- 8 files changed, 77 insertions(+), 57 deletions(-) diff --git a/crates/cli/src/app_state.rs b/crates/cli/src/app_state.rs index 74a0720f7..2ec207b0a 100644 --- a/crates/cli/src/app_state.rs +++ b/crates/cli/src/app_state.rs @@ -29,6 +29,8 @@ use opentelemetry::{metrics::Histogram, KeyValue}; use rand::SeedableRng; use sqlx::PgPool; +use crate::telemetry::METER; + #[derive(Clone)] pub struct AppState { pub pool: PgPool, @@ -53,15 +55,8 @@ pub struct AppState { impl AppState { /// Init the metrics for the app state. pub fn init_metrics(&mut self) { - // XXX: do we want to put that somewhere else? 
- let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME")) - .with_version(env!("CARGO_PKG_VERSION")) - .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL) - .build(); - let meter = opentelemetry::global::meter_with_scope(scope); - let pool = self.pool.clone(); - meter + METER .i64_observable_up_down_counter("db.connections.usage") .with_description("The number of connections that are currently in `state` described by the state attribute.") .with_unit("{connection}") @@ -74,7 +69,7 @@ impl AppState { .build(); let pool = self.pool.clone(); - meter + METER .i64_observable_up_down_counter("db.connections.max") .with_description("The maximum number of open connections allowed.") .with_unit("{connection}") @@ -85,7 +80,7 @@ impl AppState { .build(); // Track the connection acquisition time - let histogram = meter + let histogram = METER .u64_histogram("db.client.connections.create_time") .with_description("The time it took to create a new connection.") .with_unit("ms") diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 220480b93..31583594b 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -119,11 +119,11 @@ async fn try_main() -> anyhow::Result { }); // Setup OpenTelemetry tracing and metrics - let tracer = telemetry::setup(&telemetry_config).context("failed to setup OpenTelemetry")?; + self::telemetry::setup(&telemetry_config).context("failed to setup OpenTelemetry")?; - let telemetry_layer = tracer.map(|tracer| { + let telemetry_layer = self::telemetry::TRACER.get().map(|tracer| { tracing_opentelemetry::layer() - .with_tracer(tracer) + .with_tracer(tracer.clone()) .with_tracked_inactivity(false) .with_filter(LevelFilter::INFO) }); diff --git a/crates/cli/src/telemetry.rs b/crates/cli/src/telemetry.rs index 7afc96b0a..bc9917db2 100644 --- a/crates/cli/src/telemetry.rs +++ b/crates/cli/src/telemetry.rs @@ -4,7 +4,10 @@ // SPDX-License-Identifier: AGPL-3.0-only // Please see LICENSE in the repository 
root for full details. -use std::time::Duration; +use std::{ + sync::{LazyLock, OnceLock}, + time::Duration, +}; use anyhow::Context as _; use bytes::Bytes; @@ -15,7 +18,7 @@ use mas_config::{ TracingExporterKind, }; use opentelemetry::{ - global, + metrics::Meter, propagation::{TextMapCompositePropagator, TextMapPropagator}, trace::TracerProvider as _, InstrumentationScope, KeyValue, @@ -23,7 +26,6 @@ use opentelemetry::{ use opentelemetry_otlp::{WithExportConfig, WithHttpConfig}; use opentelemetry_prometheus::PrometheusExporter; use opentelemetry_sdk::{ - self, metrics::{ManualReader, PeriodicReader, SdkMeterProvider}, propagation::{BaggagePropagator, TraceContextPropagator}, trace::{Sampler, Tracer, TracerProvider}, @@ -31,29 +33,38 @@ use opentelemetry_sdk::{ }; use opentelemetry_semantic_conventions as semcov; use prometheus::Registry; -use tokio::sync::OnceCell; use url::Url; -static METER_PROVIDER: OnceCell<SdkMeterProvider> = OnceCell::const_new(); -static PROMETHEUS_REGISTRY: OnceCell<Registry> = OnceCell::const_new(); +static SCOPE: LazyLock<InstrumentationScope> = LazyLock::new(|| { + InstrumentationScope::builder(env!("CARGO_PKG_NAME")) + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .build() +}); -pub fn setup(config: &TelemetryConfig) -> anyhow::Result<Option<Tracer>> { +pub static METER: LazyLock<Meter> = + LazyLock::new(|| opentelemetry::global::meter_with_scope(SCOPE.clone())); + +pub static TRACER: OnceLock<Tracer> = OnceLock::new(); +static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new(); +static PROMETHEUS_REGISTRY: OnceLock<Registry> = OnceLock::new(); + +pub fn setup(config: &TelemetryConfig) -> anyhow::Result<()> { let propagator = propagator(&config.tracing.propagators); // The CORS filter needs to know what headers it should whitelist for // CORS-protected requests. 
mas_http::set_propagator(&propagator); - global::set_text_map_propagator(propagator); - - let tracer = tracer(&config.tracing).context("Failed to configure traces exporter")?; + opentelemetry::global::set_text_map_propagator(propagator); + init_tracer(&config.tracing).context("Failed to configure traces exporter")?; init_meter(&config.metrics).context("Failed to configure metrics exporter")?; - Ok(tracer) + Ok(()) } pub fn shutdown() { - global::shutdown_tracer_provider(); + opentelemetry::global::shutdown_tracer_provider(); if let Some(meter_provider) = METER_PROVIDER.get() { meter_provider.shutdown().unwrap(); @@ -93,32 +104,30 @@ fn otlp_tracer_provider(endpoint: Option<&Url>) -> anyhow::Result anyhow::Result> { +fn init_tracer(config: &TracingConfig) -> anyhow::Result<()> { let tracer_provider = match config.exporter { - TracingExporterKind::None => return Ok(None), + TracingExporterKind::None => return Ok(()), TracingExporterKind::Stdout => stdout_tracer_provider(), TracingExporterKind::Otlp => otlp_tracer_provider(config.endpoint.as_ref())?, }; - let scope = InstrumentationScope::builder(env!("CARGO_PKG_NAME")) - .with_version(env!("CARGO_PKG_VERSION")) - .with_schema_url(semcov::SCHEMA_URL) - .build(); + let tracer = tracer_provider.tracer_with_scope(SCOPE.clone()); + TRACER + .set(tracer) + .map_err(|_| anyhow::anyhow!("TRACER was set twice"))?; - let tracer = tracer_provider.tracer_with_scope(scope); + opentelemetry::global::set_tracer_provider(tracer_provider); - global::set_tracer_provider(tracer_provider); - - Ok(Some(tracer)) + Ok(()) } fn otlp_metric_reader(endpoint: Option<&url::Url>) -> anyhow::Result { @@ -175,7 +184,7 @@ fn prometheus_service_fn(_req: T) -> PromServiceFuture { } pub fn prometheus_service() -> tower::util::ServiceFn PromServiceFuture> { - if !PROMETHEUS_REGISTRY.initialized() { + if PROMETHEUS_REGISTRY.get().is_none() { tracing::warn!("A Prometheus resource was mounted on a listener, but the Prometheus exporter was not setup in the 
config"); } @@ -184,7 +193,10 @@ pub fn prometheus_service() -> tower::util::ServiceFn PromServiceFut fn prometheus_metric_reader() -> anyhow::Result { let registry = Registry::new(); - PROMETHEUS_REGISTRY.set(registry.clone())?; + + PROMETHEUS_REGISTRY + .set(registry.clone()) + .map_err(|_| anyhow::anyhow!("PROMETHEUS_REGISTRY was set twice"))?; let exporter = opentelemetry_prometheus::exporter() .with_registry(registry) @@ -209,8 +221,10 @@ fn init_meter(config: &MetricsConfig) -> anyhow::Result<()> { let meter_provider = meter_provider_builder.with_resource(resource()).build(); - METER_PROVIDER.set(meter_provider.clone())?; - global::set_meter_provider(meter_provider.clone()); + METER_PROVIDER + .set(meter_provider.clone()) + .map_err(|_| anyhow::anyhow!("METER_PROVIDER was set twice"))?; + opentelemetry::global::set_meter_provider(meter_provider.clone()); Ok(()) } diff --git a/crates/handlers/src/activity_tracker/worker.rs b/crates/handlers/src/activity_tracker/worker.rs index 4102c1596..6fc3f8a86 100644 --- a/crates/handlers/src/activity_tracker/worker.rs +++ b/crates/handlers/src/activity_tracker/worker.rs @@ -16,7 +16,10 @@ use sqlx::PgPool; use tokio_util::sync::CancellationToken; use ulid::Ulid; -use crate::activity_tracker::{Message, SessionKind}; +use crate::{ + activity_tracker::{Message, SessionKind}, + METER, +}; /// The maximum number of pending activity records before we flush them to the /// database automatically. 
@@ -48,13 +51,7 @@ pub struct Worker { impl Worker { pub(crate) fn new(pool: PgPool) -> Self { - let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME")) - .with_version(env!("CARGO_PKG_VERSION")) - .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL) - .build(); - let meter = opentelemetry::global::meter_with_scope(scope); - - let message_counter = meter + let message_counter = METER .u64_counter("mas.activity_tracker.messages") .with_description("The number of messages received by the activity tracker") .with_unit("{messages}") @@ -77,7 +74,7 @@ impl Worker { message_counter.add(0, &[KeyValue::new(TYPE, "flush")]); message_counter.add(0, &[KeyValue::new(TYPE, "shutdown")]); - let flush_time_histogram = meter + let flush_time_histogram = METER .u64_histogram("mas.activity_tracker.flush_time") .with_description("The time it took to flush the activity tracker") .with_unit("ms") diff --git a/crates/handlers/src/lib.rs b/crates/handlers/src/lib.rs index 9d0abb93f..10f96ca9b 100644 --- a/crates/handlers/src/lib.rs +++ b/crates/handlers/src/lib.rs @@ -15,7 +15,7 @@ clippy::let_with_type_underscore, )] -use std::{convert::Infallible, time::Duration}; +use std::{convert::Infallible, sync::LazyLock, time::Duration}; use axum::{ extract::{FromRef, FromRequestParts, OriginalUri, RawQuery, State}, @@ -41,6 +41,7 @@ use mas_policy::Policy; use mas_router::{Route, UrlBuilder}; use mas_storage::{BoxClock, BoxRepository, BoxRng}; use mas_templates::{ErrorContext, NotFoundContext, TemplateContext, Templates}; +use opentelemetry::metrics::Meter; use passwords::PasswordManager; use sqlx::PgPool; use tower::util::AndThenLayer; @@ -62,6 +63,15 @@ mod rate_limit; #[cfg(test)] mod test_utils; +static METER: LazyLock<Meter> = LazyLock::new(|| { + let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME")) + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL) + .build(); + 
opentelemetry::global::meter_with_scope(scope) +}); + /// Implement `From` for `RouteError`, for "internal server error" kind of /// errors. #[macro_export] diff --git a/crates/tower/src/lib.rs b/crates/tower/src/lib.rs index fac8379f8..7c3bd077b 100644 --- a/crates/tower/src/lib.rs +++ b/crates/tower/src/lib.rs @@ -6,6 +6,10 @@ #![allow(clippy::module_name_repetitions)] +use std::sync::LazyLock; + +use opentelemetry::metrics::Meter; + mod metrics; mod trace_context; mod tracing; @@ -13,11 +17,11 @@ mod utils; pub use self::{metrics::*, trace_context::*, tracing::*, utils::*}; -fn meter() -> opentelemetry::metrics::Meter { +static METER: LazyLock<Meter> = LazyLock::new(|| { let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME")) .with_version(env!("CARGO_PKG_VERSION")) .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL) .build(); opentelemetry::global::meter_with_scope(scope) -} +}); diff --git a/crates/tower/src/metrics/duration.rs b/crates/tower/src/metrics/duration.rs index 9a33a83bf..fa876fc70 100644 --- a/crates/tower/src/metrics/duration.rs +++ b/crates/tower/src/metrics/duration.rs @@ -10,7 +10,7 @@ use opentelemetry::{metrics::Histogram, KeyValue}; use pin_project_lite::pin_project; use tower::{Layer, Service}; -use crate::{utils::FnWrapper, MetricsAttributes}; +use crate::{utils::FnWrapper, MetricsAttributes, METER}; /// A [`Layer`] that records the duration of requests in milliseconds. #[derive(Clone, Debug)] @@ -25,7 +25,7 @@ impl DurationRecorderLayer { /// Create a new [`DurationRecorderLayer`]. 
#[must_use] pub fn new(name: &'static str) -> Self { - let histogram = crate::meter().u64_histogram(name).build(); + let histogram = METER.u64_histogram(name).build(); Self { histogram, on_request: (), diff --git a/crates/tower/src/metrics/in_flight.rs b/crates/tower/src/metrics/in_flight.rs index 5d5bd8676..55ebdfe17 100644 --- a/crates/tower/src/metrics/in_flight.rs +++ b/crates/tower/src/metrics/in_flight.rs @@ -10,7 +10,7 @@ use opentelemetry::{metrics::UpDownCounter, KeyValue}; use pin_project_lite::pin_project; use tower::{Layer, Service}; -use crate::MetricsAttributes; +use crate::{MetricsAttributes, METER}; /// A [`Layer`] that records the number of in-flight requests. /// @@ -27,7 +27,7 @@ impl InFlightCounterLayer { /// Create a new [`InFlightCounterLayer`]. #[must_use] pub fn new(name: &'static str) -> Self { - let counter = crate::meter() + let counter = METER .i64_up_down_counter(name) .with_unit("{request}") .with_description("The number of in-flight requests")