diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index 6c639eafe..d8f8eccc1 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -29,7 +29,10 @@ use chrono::{DateTime, Utc}; use mas_storage::{oauth2::OAuth2AccessTokenRepository, RepositoryAccess}; use tracing::{debug, info}; -use crate::{utils::metrics_layer, JobContextExt, State}; +use crate::{ + utils::{metrics_layer, trace_layer, TracedJob}, + JobContextExt, State, +}; #[derive(Default, Clone)] pub struct CleanupExpiredTokensJob { @@ -46,6 +49,8 @@ impl Job for CleanupExpiredTokensJob { const NAME: &'static str = "cleanup-expired-tokens"; } +impl TracedJob for CleanupExpiredTokensJob {} + pub async fn cleanup_expired_tokens( job: CleanupExpiredTokensJob, ctx: JobContext, @@ -79,6 +84,7 @@ pub(crate) fn register( .stream(CronStream::new(schedule).timer(TokioTimer).to_stream()) .layer(state.inject()) .layer(metrics_layer()) + .layer(trace_layer()) .build_fn(cleanup_expired_tokens); monitor.register(worker) diff --git a/crates/tasks/src/email.rs b/crates/tasks/src/email.rs index e1caf7128..8646ff0fa 100644 --- a/crates/tasks/src/email.rs +++ b/crates/tasks/src/email.rs @@ -109,5 +109,6 @@ pub(crate) fn register( c.fetch_interval(std::time::Duration::from_secs(1)) }) .build_fn(verify_email); + monitor.register(worker) } diff --git a/crates/tasks/src/utils.rs b/crates/tasks/src/utils.rs index 804658b74..a607c6af1 100644 --- a/crates/tasks/src/utils.rs +++ b/crates/tasks/src/utils.rs @@ -18,14 +18,32 @@ use mas_tower::{ make_span_fn, DurationRecorderLayer, FnWrapper, IdentityLayer, InFlightCounterLayer, TraceLayer, KV, }; -use opentelemetry::{Key, KeyValue}; +use opentelemetry::{trace::SpanContext, Key, KeyValue}; use tracing::info_span; use tracing_opentelemetry::OpenTelemetrySpanExt; const JOB_NAME: Key = Key::from_static_str("job.name"); const JOB_STATUS: Key = Key::from_static_str("job.status"); -fn make_span_for_job_request(req: &JobRequest>) -> tracing::Span +/// Represents a job that can may have a span context attached to it. +pub trait TracedJob: Job { + /// Returns the span context for this job, if any. + /// + /// The default implementation returns `None`. + fn span_context(&self) -> Option { + None + } +} + +/// Implements [`TracedJob`] for any job with the [`JobWithSpanContext`] +/// wrapper. +impl TracedJob for JobWithSpanContext { + fn span_context(&self) -> Option { + JobWithSpanContext::span_context(self) + } +} + +fn make_span_for_job_request(req: &JobRequest) -> tracing::Span where J: Job, { @@ -45,18 +63,15 @@ where span } -type TraceLayerForJob = TraceLayer< - FnWrapper>) -> tracing::Span>, - KV<&'static str>, - KV<&'static str>, ->; +type TraceLayerForJob = + TraceLayer) -> tracing::Span>, KV<&'static str>, KV<&'static str>>; pub(crate) fn trace_layer() -> TraceLayerForJob where - J: Job, + J: TracedJob, { TraceLayer::new(make_span_fn( - make_span_for_job_request:: as fn(&JobRequest>) -> tracing::Span, + make_span_for_job_request:: as fn(&JobRequest) -> tracing::Span, )) .on_response(KV("otel.status_code", "OK")) .on_error(KV("otel.status_code", "ERROR"))