mirror of
https://github.com/element-hq/matrix-authentication-service.git
synced 2026-04-25 21:52:31 +00:00
Move all the OTEL meters to crate-level statics
This commit is contained in:
@@ -29,6 +29,8 @@ use opentelemetry::{metrics::Histogram, KeyValue};
|
||||
use rand::SeedableRng;
|
||||
use sqlx::PgPool;
|
||||
|
||||
use crate::telemetry::METER;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
pub pool: PgPool,
|
||||
@@ -53,15 +55,8 @@ pub struct AppState {
|
||||
impl AppState {
|
||||
/// Init the metrics for the app state.
|
||||
pub fn init_metrics(&mut self) {
|
||||
// XXX: do we want to put that somewhere else?
|
||||
let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
|
||||
.with_version(env!("CARGO_PKG_VERSION"))
|
||||
.with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
|
||||
.build();
|
||||
let meter = opentelemetry::global::meter_with_scope(scope);
|
||||
|
||||
let pool = self.pool.clone();
|
||||
meter
|
||||
METER
|
||||
.i64_observable_up_down_counter("db.connections.usage")
|
||||
.with_description("The number of connections that are currently in `state` described by the state attribute.")
|
||||
.with_unit("{connection}")
|
||||
@@ -74,7 +69,7 @@ impl AppState {
|
||||
.build();
|
||||
|
||||
let pool = self.pool.clone();
|
||||
meter
|
||||
METER
|
||||
.i64_observable_up_down_counter("db.connections.max")
|
||||
.with_description("The maximum number of open connections allowed.")
|
||||
.with_unit("{connection}")
|
||||
@@ -85,7 +80,7 @@ impl AppState {
|
||||
.build();
|
||||
|
||||
// Track the connection acquisition time
|
||||
let histogram = meter
|
||||
let histogram = METER
|
||||
.u64_histogram("db.client.connections.create_time")
|
||||
.with_description("The time it took to create a new connection.")
|
||||
.with_unit("ms")
|
||||
|
||||
@@ -119,11 +119,11 @@ async fn try_main() -> anyhow::Result<ExitCode> {
|
||||
});
|
||||
|
||||
// Setup OpenTelemetry tracing and metrics
|
||||
let tracer = telemetry::setup(&telemetry_config).context("failed to setup OpenTelemetry")?;
|
||||
self::telemetry::setup(&telemetry_config).context("failed to setup OpenTelemetry")?;
|
||||
|
||||
let telemetry_layer = tracer.map(|tracer| {
|
||||
let telemetry_layer = self::telemetry::TRACER.get().map(|tracer| {
|
||||
tracing_opentelemetry::layer()
|
||||
.with_tracer(tracer)
|
||||
.with_tracer(tracer.clone())
|
||||
.with_tracked_inactivity(false)
|
||||
.with_filter(LevelFilter::INFO)
|
||||
});
|
||||
|
||||
@@ -4,7 +4,10 @@
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Please see LICENSE in the repository root for full details.
|
||||
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
sync::{LazyLock, OnceLock},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::Context as _;
|
||||
use bytes::Bytes;
|
||||
@@ -15,7 +18,7 @@ use mas_config::{
|
||||
TracingExporterKind,
|
||||
};
|
||||
use opentelemetry::{
|
||||
global,
|
||||
metrics::Meter,
|
||||
propagation::{TextMapCompositePropagator, TextMapPropagator},
|
||||
trace::TracerProvider as _,
|
||||
InstrumentationScope, KeyValue,
|
||||
@@ -23,7 +26,6 @@ use opentelemetry::{
|
||||
use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
|
||||
use opentelemetry_prometheus::PrometheusExporter;
|
||||
use opentelemetry_sdk::{
|
||||
self,
|
||||
metrics::{ManualReader, PeriodicReader, SdkMeterProvider},
|
||||
propagation::{BaggagePropagator, TraceContextPropagator},
|
||||
trace::{Sampler, Tracer, TracerProvider},
|
||||
@@ -31,29 +33,38 @@ use opentelemetry_sdk::{
|
||||
};
|
||||
use opentelemetry_semantic_conventions as semcov;
|
||||
use prometheus::Registry;
|
||||
use tokio::sync::OnceCell;
|
||||
use url::Url;
|
||||
|
||||
static METER_PROVIDER: OnceCell<SdkMeterProvider> = OnceCell::const_new();
|
||||
static PROMETHEUS_REGISTRY: OnceCell<Registry> = OnceCell::const_new();
|
||||
static SCOPE: LazyLock<InstrumentationScope> = LazyLock::new(|| {
|
||||
InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
|
||||
.with_version(env!("CARGO_PKG_VERSION"))
|
||||
.with_schema_url(semcov::SCHEMA_URL)
|
||||
.build()
|
||||
});
|
||||
|
||||
pub fn setup(config: &TelemetryConfig) -> anyhow::Result<Option<Tracer>> {
|
||||
pub static METER: LazyLock<Meter> =
|
||||
LazyLock::new(|| opentelemetry::global::meter_with_scope(SCOPE.clone()));
|
||||
|
||||
pub static TRACER: OnceLock<Tracer> = OnceLock::new();
|
||||
static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
|
||||
static PROMETHEUS_REGISTRY: OnceLock<Registry> = OnceLock::new();
|
||||
|
||||
pub fn setup(config: &TelemetryConfig) -> anyhow::Result<()> {
|
||||
let propagator = propagator(&config.tracing.propagators);
|
||||
|
||||
// The CORS filter needs to know what headers it should whitelist for
|
||||
// CORS-protected requests.
|
||||
mas_http::set_propagator(&propagator);
|
||||
global::set_text_map_propagator(propagator);
|
||||
|
||||
let tracer = tracer(&config.tracing).context("Failed to configure traces exporter")?;
|
||||
opentelemetry::global::set_text_map_propagator(propagator);
|
||||
|
||||
init_tracer(&config.tracing).context("Failed to configure traces exporter")?;
|
||||
init_meter(&config.metrics).context("Failed to configure metrics exporter")?;
|
||||
|
||||
Ok(tracer)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn shutdown() {
|
||||
global::shutdown_tracer_provider();
|
||||
opentelemetry::global::shutdown_tracer_provider();
|
||||
|
||||
if let Some(meter_provider) = METER_PROVIDER.get() {
|
||||
meter_provider.shutdown().unwrap();
|
||||
@@ -93,32 +104,30 @@ fn otlp_tracer_provider(endpoint: Option<&Url>) -> anyhow::Result<TracerProvider
|
||||
.build()
|
||||
.context("Failed to configure OTLP trace exporter")?;
|
||||
|
||||
let tracer = opentelemetry_sdk::trace::TracerProvider::builder()
|
||||
let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
|
||||
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
|
||||
.with_resource(resource())
|
||||
.with_sampler(Sampler::AlwaysOn)
|
||||
.build();
|
||||
|
||||
Ok(tracer)
|
||||
Ok(tracer_provider)
|
||||
}
|
||||
|
||||
fn tracer(config: &TracingConfig) -> anyhow::Result<Option<Tracer>> {
|
||||
fn init_tracer(config: &TracingConfig) -> anyhow::Result<()> {
|
||||
let tracer_provider = match config.exporter {
|
||||
TracingExporterKind::None => return Ok(None),
|
||||
TracingExporterKind::None => return Ok(()),
|
||||
TracingExporterKind::Stdout => stdout_tracer_provider(),
|
||||
TracingExporterKind::Otlp => otlp_tracer_provider(config.endpoint.as_ref())?,
|
||||
};
|
||||
|
||||
let scope = InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
|
||||
.with_version(env!("CARGO_PKG_VERSION"))
|
||||
.with_schema_url(semcov::SCHEMA_URL)
|
||||
.build();
|
||||
let tracer = tracer_provider.tracer_with_scope(SCOPE.clone());
|
||||
TRACER
|
||||
.set(tracer)
|
||||
.map_err(|_| anyhow::anyhow!("TRACER was set twice"))?;
|
||||
|
||||
let tracer = tracer_provider.tracer_with_scope(scope);
|
||||
opentelemetry::global::set_tracer_provider(tracer_provider);
|
||||
|
||||
global::set_tracer_provider(tracer_provider);
|
||||
|
||||
Ok(Some(tracer))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn otlp_metric_reader(endpoint: Option<&url::Url>) -> anyhow::Result<PeriodicReader> {
|
||||
@@ -175,7 +184,7 @@ fn prometheus_service_fn<T>(_req: T) -> PromServiceFuture {
|
||||
}
|
||||
|
||||
pub fn prometheus_service<T>() -> tower::util::ServiceFn<fn(T) -> PromServiceFuture> {
|
||||
if !PROMETHEUS_REGISTRY.initialized() {
|
||||
if PROMETHEUS_REGISTRY.get().is_none() {
|
||||
tracing::warn!("A Prometheus resource was mounted on a listener, but the Prometheus exporter was not setup in the config");
|
||||
}
|
||||
|
||||
@@ -184,7 +193,10 @@ pub fn prometheus_service<T>() -> tower::util::ServiceFn<fn(T) -> PromServiceFut
|
||||
|
||||
fn prometheus_metric_reader() -> anyhow::Result<PrometheusExporter> {
|
||||
let registry = Registry::new();
|
||||
PROMETHEUS_REGISTRY.set(registry.clone())?;
|
||||
|
||||
PROMETHEUS_REGISTRY
|
||||
.set(registry.clone())
|
||||
.map_err(|_| anyhow::anyhow!("PROMETHEUS_REGISTRY was set twice"))?;
|
||||
|
||||
let exporter = opentelemetry_prometheus::exporter()
|
||||
.with_registry(registry)
|
||||
@@ -209,8 +221,10 @@ fn init_meter(config: &MetricsConfig) -> anyhow::Result<()> {
|
||||
|
||||
let meter_provider = meter_provider_builder.with_resource(resource()).build();
|
||||
|
||||
METER_PROVIDER.set(meter_provider.clone())?;
|
||||
global::set_meter_provider(meter_provider.clone());
|
||||
METER_PROVIDER
|
||||
.set(meter_provider.clone())
|
||||
.map_err(|_| anyhow::anyhow!("METER_PROVIDER was set twice"))?;
|
||||
opentelemetry::global::set_meter_provider(meter_provider.clone());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -16,7 +16,10 @@ use sqlx::PgPool;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use ulid::Ulid;
|
||||
|
||||
use crate::activity_tracker::{Message, SessionKind};
|
||||
use crate::{
|
||||
activity_tracker::{Message, SessionKind},
|
||||
METER,
|
||||
};
|
||||
|
||||
/// The maximum number of pending activity records before we flush them to the
|
||||
/// database automatically.
|
||||
@@ -48,13 +51,7 @@ pub struct Worker {
|
||||
|
||||
impl Worker {
|
||||
pub(crate) fn new(pool: PgPool) -> Self {
|
||||
let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
|
||||
.with_version(env!("CARGO_PKG_VERSION"))
|
||||
.with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
|
||||
.build();
|
||||
let meter = opentelemetry::global::meter_with_scope(scope);
|
||||
|
||||
let message_counter = meter
|
||||
let message_counter = METER
|
||||
.u64_counter("mas.activity_tracker.messages")
|
||||
.with_description("The number of messages received by the activity tracker")
|
||||
.with_unit("{messages}")
|
||||
@@ -77,7 +74,7 @@ impl Worker {
|
||||
message_counter.add(0, &[KeyValue::new(TYPE, "flush")]);
|
||||
message_counter.add(0, &[KeyValue::new(TYPE, "shutdown")]);
|
||||
|
||||
let flush_time_histogram = meter
|
||||
let flush_time_histogram = METER
|
||||
.u64_histogram("mas.activity_tracker.flush_time")
|
||||
.with_description("The time it took to flush the activity tracker")
|
||||
.with_unit("ms")
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
clippy::let_with_type_underscore,
|
||||
)]
|
||||
|
||||
use std::{convert::Infallible, time::Duration};
|
||||
use std::{convert::Infallible, sync::LazyLock, time::Duration};
|
||||
|
||||
use axum::{
|
||||
extract::{FromRef, FromRequestParts, OriginalUri, RawQuery, State},
|
||||
@@ -41,6 +41,7 @@ use mas_policy::Policy;
|
||||
use mas_router::{Route, UrlBuilder};
|
||||
use mas_storage::{BoxClock, BoxRepository, BoxRng};
|
||||
use mas_templates::{ErrorContext, NotFoundContext, TemplateContext, Templates};
|
||||
use opentelemetry::metrics::Meter;
|
||||
use passwords::PasswordManager;
|
||||
use sqlx::PgPool;
|
||||
use tower::util::AndThenLayer;
|
||||
@@ -62,6 +63,15 @@ mod rate_limit;
|
||||
#[cfg(test)]
|
||||
mod test_utils;
|
||||
|
||||
static METER: LazyLock<Meter> = LazyLock::new(|| {
|
||||
let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
|
||||
.with_version(env!("CARGO_PKG_VERSION"))
|
||||
.with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
|
||||
.build();
|
||||
|
||||
opentelemetry::global::meter_with_scope(scope)
|
||||
});
|
||||
|
||||
/// Implement `From<E>` for `RouteError`, for "internal server error" kind of
|
||||
/// errors.
|
||||
#[macro_export]
|
||||
|
||||
@@ -6,6 +6,10 @@
|
||||
|
||||
#![allow(clippy::module_name_repetitions)]
|
||||
|
||||
use std::sync::LazyLock;
|
||||
|
||||
use opentelemetry::metrics::Meter;
|
||||
|
||||
mod metrics;
|
||||
mod trace_context;
|
||||
mod tracing;
|
||||
@@ -13,11 +17,11 @@ mod utils;
|
||||
|
||||
pub use self::{metrics::*, trace_context::*, tracing::*, utils::*};
|
||||
|
||||
fn meter() -> opentelemetry::metrics::Meter {
|
||||
static METER: LazyLock<Meter> = LazyLock::new(|| {
|
||||
let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
|
||||
.with_version(env!("CARGO_PKG_VERSION"))
|
||||
.with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
|
||||
.build();
|
||||
|
||||
opentelemetry::global::meter_with_scope(scope)
|
||||
}
|
||||
});
|
||||
|
||||
@@ -10,7 +10,7 @@ use opentelemetry::{metrics::Histogram, KeyValue};
|
||||
use pin_project_lite::pin_project;
|
||||
use tower::{Layer, Service};
|
||||
|
||||
use crate::{utils::FnWrapper, MetricsAttributes};
|
||||
use crate::{utils::FnWrapper, MetricsAttributes, METER};
|
||||
|
||||
/// A [`Layer`] that records the duration of requests in milliseconds.
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -25,7 +25,7 @@ impl DurationRecorderLayer {
|
||||
/// Create a new [`DurationRecorderLayer`].
|
||||
#[must_use]
|
||||
pub fn new(name: &'static str) -> Self {
|
||||
let histogram = crate::meter().u64_histogram(name).build();
|
||||
let histogram = METER.u64_histogram(name).build();
|
||||
Self {
|
||||
histogram,
|
||||
on_request: (),
|
||||
|
||||
@@ -10,7 +10,7 @@ use opentelemetry::{metrics::UpDownCounter, KeyValue};
|
||||
use pin_project_lite::pin_project;
|
||||
use tower::{Layer, Service};
|
||||
|
||||
use crate::MetricsAttributes;
|
||||
use crate::{MetricsAttributes, METER};
|
||||
|
||||
/// A [`Layer`] that records the number of in-flight requests.
|
||||
///
|
||||
@@ -27,7 +27,7 @@ impl InFlightCounterLayer {
|
||||
/// Create a new [`InFlightCounterLayer`].
|
||||
#[must_use]
|
||||
pub fn new(name: &'static str) -> Self {
|
||||
let counter = crate::meter()
|
||||
let counter = METER
|
||||
.i64_up_down_counter(name)
|
||||
.with_unit("{request}")
|
||||
.with_description("The number of in-flight requests")
|
||||
|
||||
Reference in New Issue
Block a user