From 72728139251e2a818b8636cbf322086008597cc4 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 16 Jan 2026 13:31:31 +0100 Subject: [PATCH 1/2] Implement cleanup job for queue jobs Add scheduled cleanup job that removes old completed and failed queue jobs after 30 days. Jobs are kept for debugging purposes. Includes migration to change the next_attempt_id FK constraint from NO ACTION to SET NULL, allowing cleanup of retry chains without breaking foreign key constraints. One caveat is that cleanup is based on their creation time, *not* when they got completed/failed. This means that if the job takes a long time (as in, several days) to get scheduled, it might get cleared as soon as it runs. This is fine for now, we may want to revisit this if we start scheduling jobs far in the future --- ...7559da26def223b8954cf32959cce777577d7.json | 24 +++++++++ ...00003_queue_jobs_next_attempt_set_null.sql | 13 +++++ ...ue_jobs_next_attempt_set_null_validate.sql | 10 ++++ crates/storage-pg/src/queue/job.rs | 51 +++++++++++++++++++ crates/storage/src/queue/job.rs | 32 ++++++++++++ crates/storage/src/queue/tasks.rs | 8 +++ crates/tasks/src/database.rs | 46 ++++++++++++++++- crates/tasks/src/lib.rs | 7 +++ 8 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 crates/storage-pg/.sqlx/query-5d0d4699aa82b3976c6c1fcb0d77559da26def223b8954cf32959cce777577d7.json create mode 100644 crates/storage-pg/migrations/20260116000003_queue_jobs_next_attempt_set_null.sql create mode 100644 crates/storage-pg/migrations/20260116000004_queue_jobs_next_attempt_set_null_validate.sql diff --git a/crates/storage-pg/.sqlx/query-5d0d4699aa82b3976c6c1fcb0d77559da26def223b8954cf32959cce777577d7.json b/crates/storage-pg/.sqlx/query-5d0d4699aa82b3976c6c1fcb0d77559da26def223b8954cf32959cce777577d7.json new file mode 100644 index 000000000..b712d9e63 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-5d0d4699aa82b3976c6c1fcb0d77559da26def223b8954cf32959cce777577d7.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_delete AS (\n SELECT queue_job_id\n FROM queue_jobs\n WHERE (status = 'completed' OR status = 'failed')\n AND ($1::uuid IS NULL OR queue_job_id > $1)\n AND queue_job_id <= $2\n ORDER BY queue_job_id\n LIMIT $3\n )\n DELETE FROM queue_jobs\n USING to_delete\n WHERE queue_jobs.queue_job_id = to_delete.queue_job_id\n RETURNING queue_jobs.queue_job_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "queue_job_id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "5d0d4699aa82b3976c6c1fcb0d77559da26def223b8954cf32959cce777577d7" +} diff --git a/crates/storage-pg/migrations/20260116000003_queue_jobs_next_attempt_set_null.sql b/crates/storage-pg/migrations/20260116000003_queue_jobs_next_attempt_set_null.sql new file mode 100644 index 000000000..a9c1a47b2 --- /dev/null +++ b/crates/storage-pg/migrations/20260116000003_queue_jobs_next_attempt_set_null.sql @@ -0,0 +1,13 @@ +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Change the FK constraint on next_attempt_id to SET NULL on delete +-- This allows us to clean up old completed/failed jobs without breaking retry chains +ALTER TABLE queue_jobs + DROP CONSTRAINT queue_jobs_next_attempt_id_fkey, + ADD CONSTRAINT queue_jobs_next_attempt_id_fkey + FOREIGN KEY (next_attempt_id) + REFERENCES queue_jobs (queue_job_id) + ON DELETE SET NULL; diff --git a/crates/storage-pg/migrations/20260116000004_queue_jobs_next_attempt_set_null_validate.sql b/crates/storage-pg/migrations/20260116000004_queue_jobs_next_attempt_set_null_validate.sql new file mode 100644 index 000000000..b29424a4f --- /dev/null +++ b/crates/storage-pg/migrations/20260116000004_queue_jobs_next_attempt_set_null_validate.sql @@ -0,0 +1,10 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Validate the FK constraint that was added in the previous migration +-- This is done in a separate migration to avoid holding locks for too long +ALTER TABLE queue_jobs + VALIDATE CONSTRAINT queue_jobs_next_attempt_id_fkey; diff --git a/crates/storage-pg/src/queue/job.rs b/crates/storage-pg/src/queue/job.rs index a7f9e5591..f3d1da697 100644 --- a/crates/storage-pg/src/queue/job.rs +++ b/crates/storage-pg/src/queue/job.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // // SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial @@ -444,4 +445,54 @@ impl QueueJobRepository for PgQueueJobRepository<'_> { let count = res.rows_affected(); Ok(usize::try_from(count).unwrap_or(usize::MAX)) } + + #[tracing::instrument( + name = "db.queue_job.cleanup", + skip_all, + fields( + db.query.text, + since = since.map(tracing::field::display), + until = %until, + limit = limit, + ), + err, + )] + async fn cleanup( + &mut self, + since: Option, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option), Self::Error> { + // Use ULID cursor-based pagination for completed and failed jobs. + // We delete both completed and failed jobs in the same batch. + // `MAX(uuid)` isn't a thing in Postgres, so we aggregate on the client side. + let res = sqlx::query_scalar!( + r#" + WITH to_delete AS ( + SELECT queue_job_id + FROM queue_jobs + WHERE (status = 'completed' OR status = 'failed') + AND ($1::uuid IS NULL OR queue_job_id > $1) + AND queue_job_id <= $2 + ORDER BY queue_job_id + LIMIT $3 + ) + DELETE FROM queue_jobs + USING to_delete + WHERE queue_jobs.queue_job_id = to_delete.queue_job_id + RETURNING queue_jobs.queue_job_id + "#, + since.map(Uuid::from), + Uuid::from(until), + i64::try_from(limit).unwrap_or(i64::MAX) + ) + .traced() + .fetch_all(&mut *self.conn) + .await?; + + let count = res.len(); + let max_id = res.into_iter().max(); + + Ok((count, max_id.map(Ulid::from))) + } } diff --git a/crates/storage/src/queue/job.rs b/crates/storage/src/queue/job.rs index 1fd21f442..a436e0669 100644 --- a/crates/storage/src/queue/job.rs +++ b/crates/storage/src/queue/job.rs @@ -1,3 +1,4 @@ +// Copyright 2025, 2026 Element Creations Ltd. // Copyright 2024, 2025 New Vector Ltd. // // SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial @@ -213,6 +214,30 @@ pub trait QueueJobRepository: Send + Sync { /// /// Returns an error if the underlying repository fails. async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result; + + /// Cleanup old completed and failed jobs + /// + /// This will delete jobs with status 'completed' or 'failed' and IDs up to + /// and including `until`. Uses ULID cursor-based pagination for efficiency. + /// + /// Returns the number of jobs deleted and the cursor for the next batch + /// + /// # Parameters + /// + /// * `since`: The cursor to start from (exclusive), or `None` to start from + /// the beginning + /// * `until`: The maximum ULID to delete (inclusive upper bound) + /// * `limit`: The maximum number of jobs to delete in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup( + &mut self, + since: Option, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option), Self::Error>; } repository_impl!(QueueJobRepository: @@ -261,6 +286,13 @@ repository_impl!(QueueJobRepository: ) -> Result<(), Self::Error>; async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result; + + async fn cleanup( + &mut self, + since: Option, + until: Ulid, + limit: usize, + ) -> Result<(usize, Option), Self::Error>; ); /// Extension trait for [`QueueJobRepository`] to help adding a job to the queue diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index 500a7eced..7ea900a47 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -414,6 +414,14 @@ impl InsertableJob for CleanupUpstreamOAuthLinksJob { const QUEUE_NAME: &'static str = "cleanup-upstream-oauth-links"; } +/// Cleanup old completed and failed queue jobs +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupQueueJobsJob; + +impl InsertableJob for CleanupQueueJobsJob { + const QUEUE_NAME: &'static str = "cleanup-queue-jobs"; +} + /// Scheduled job to expire inactive sessions /// /// This job will trigger jobs to expire inactive compat, oauth and user diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index fe92e5a99..c3876631f 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -13,7 +13,7 @@ use async_trait::async_trait; use mas_storage::queue::{ CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob, CleanupFinishedCompatSessionsJob, CleanupOAuthAuthorizationGrantsJob, - CleanupOAuthDeviceCodeGrantsJob, CleanupRevokedOAuthAccessTokensJob, + CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob, @@ -416,6 +416,50 @@ impl RunnableJob for CleanupUpstreamOAuthLinksJob { } } +#[async_trait] +impl RunnableJob for CleanupQueueJobsJob { + #[tracing::instrument(name = "job.cleanup_queue_jobs", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Remove completed and failed queue jobs after 30 days. + // Keep them for debugging purposes. + let until = state.clock.now() - chrono::Duration::days(30); + let until = Ulid::from_parts( + u64::try_from(until.timestamp_millis()).unwrap_or(u64::MIN), + u128::MAX, + ); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let (count, cursor) = repo + .queue_job() + .cleanup(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + since = cursor; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no queue jobs to clean up"); + } else { + info!(count = total, "cleaned up queue jobs"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} + #[async_trait] impl RunnableJob for CleanupUserRegistrationsJob { #[tracing::instrument(name = "job.cleanup_user_registrations", skip_all)] diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 5278caed3..54dc8d713 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -141,6 +141,7 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() .register_handler::() .register_handler::() .register_handler::() @@ -222,6 +223,12 @@ pub async fn init( "0 59 * * * *".parse()?, mas_storage::queue::CleanupUpstreamOAuthLinksJob, ) + .add_schedule( + "cleanup-queue-jobs", + // Run this job every hour + "0 45 * * * *".parse()?, + mas_storage::queue::CleanupQueueJobsJob, + ) .add_schedule( "cleanup-expired-oauth-access-tokens", // Run this job every 4 hours From dbf3c351f42a7243d27f50b45e167d8e2fdbe15f Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Mon, 19 Jan 2026 15:44:22 +0100 Subject: [PATCH 2/2] Mark the next attempt foreign key as initially not valid --- .../20260116000003_queue_jobs_next_attempt_set_null.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/storage-pg/migrations/20260116000003_queue_jobs_next_attempt_set_null.sql b/crates/storage-pg/migrations/20260116000003_queue_jobs_next_attempt_set_null.sql index a9c1a47b2..c94934500 100644 --- a/crates/storage-pg/migrations/20260116000003_queue_jobs_next_attempt_set_null.sql +++ b/crates/storage-pg/migrations/20260116000003_queue_jobs_next_attempt_set_null.sql @@ -10,4 +10,5 @@ ALTER TABLE queue_jobs ADD CONSTRAINT queue_jobs_next_attempt_id_fkey FOREIGN KEY (next_attempt_id) REFERENCES queue_jobs (queue_job_id) - ON DELETE SET NULL; + ON DELETE SET NULL + NOT VALID;