Properly trace the cleanup-expired-tokens job

This commit is contained in:
Quentin Gliech
2023-08-03 14:03:07 +02:00
parent cc2bce7b03
commit 646b6cc0e3
3 changed files with 32 additions and 10 deletions
+7 -1
View File
@@ -29,7 +29,10 @@ use chrono::{DateTime, Utc};
use mas_storage::{oauth2::OAuth2AccessTokenRepository, RepositoryAccess};
use tracing::{debug, info};
use crate::{utils::metrics_layer, JobContextExt, State};
use crate::{
utils::{metrics_layer, trace_layer, TracedJob},
JobContextExt, State,
};
#[derive(Default, Clone)]
pub struct CleanupExpiredTokensJob {
@@ -46,6 +49,8 @@ impl Job for CleanupExpiredTokensJob {
const NAME: &'static str = "cleanup-expired-tokens";
}
impl TracedJob for CleanupExpiredTokensJob {}
pub async fn cleanup_expired_tokens(
job: CleanupExpiredTokensJob,
ctx: JobContext,
@@ -79,6 +84,7 @@ pub(crate) fn register(
.stream(CronStream::new(schedule).timer(TokioTimer).to_stream())
.layer(state.inject())
.layer(metrics_layer())
.layer(trace_layer())
.build_fn(cleanup_expired_tokens);
monitor.register(worker)
+1
View File
@@ -109,5 +109,6 @@ pub(crate) fn register(
c.fetch_interval(std::time::Duration::from_secs(1))
})
.build_fn(verify_email);
monitor.register(worker)
}
+24 -9
View File
@@ -18,14 +18,32 @@ use mas_tower::{
make_span_fn, DurationRecorderLayer, FnWrapper, IdentityLayer, InFlightCounterLayer,
TraceLayer, KV,
};
use opentelemetry::{Key, KeyValue};
use opentelemetry::{trace::SpanContext, Key, KeyValue};
use tracing::info_span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
const JOB_NAME: Key = Key::from_static_str("job.name");
const JOB_STATUS: Key = Key::from_static_str("job.status");
fn make_span_for_job_request<J>(req: &JobRequest<JobWithSpanContext<J>>) -> tracing::Span
/// Represents a job that can may have a span context attached to it.
pub trait TracedJob: Job {
/// Returns the span context for this job, if any.
///
/// The default implementation returns `None`.
fn span_context(&self) -> Option<SpanContext> {
None
}
}
/// Implements [`TracedJob`] for any job with the [`JobWithSpanContext`]
/// wrapper.
impl<J: Job> TracedJob for JobWithSpanContext<J> {
fn span_context(&self) -> Option<SpanContext> {
JobWithSpanContext::span_context(self)
}
}
fn make_span_for_job_request<J: TracedJob>(req: &JobRequest<J>) -> tracing::Span
where
J: Job,
{
@@ -45,18 +63,15 @@ where
span
}
type TraceLayerForJob<J> = TraceLayer<
FnWrapper<fn(&JobRequest<JobWithSpanContext<J>>) -> tracing::Span>,
KV<&'static str>,
KV<&'static str>,
>;
type TraceLayerForJob<J> =
TraceLayer<FnWrapper<fn(&JobRequest<J>) -> tracing::Span>, KV<&'static str>, KV<&'static str>>;
pub(crate) fn trace_layer<J>() -> TraceLayerForJob<J>
where
J: Job,
J: TracedJob,
{
TraceLayer::new(make_span_fn(
make_span_for_job_request::<J> as fn(&JobRequest<JobWithSpanContext<J>>) -> tracing::Span,
make_span_for_job_request::<J> as fn(&JobRequest<J>) -> tracing::Span,
))
.on_response(KV("otel.status_code", "OK"))
.on_error(KV("otel.status_code", "ERROR"))