From f69855e21edeca4dd5f856439c2d68b7d75b8dd2 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 9 Jul 2025 17:22:13 +0200 Subject: [PATCH] Remove the duplicate clock and rng in QueueWorker Use the ones in the inner State instead --- crates/tasks/src/new_queue.rs | 48 +++++++++++++++++------------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index 777844e2f..1ede913a7 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -19,7 +19,6 @@ use opentelemetry::{ metrics::{Counter, Histogram, UpDownCounter}, }; use rand::{Rng, RngCore, distributions::Uniform}; -use rand_chacha::ChaChaRng; use serde::de::DeserializeOwned; use sqlx::{ Acquire, Either, @@ -195,8 +194,6 @@ struct ScheduleDefinition { } pub struct QueueWorker { - rng: ChaChaRng, - clock: Box, listener: PgListener, registration: Worker, am_i_leader: bool, @@ -278,8 +275,6 @@ impl QueueWorker { let cancellation_guard = cancellation_token.clone().drop_guard(); Ok(Self { - rng, - clock, listener, registration, am_i_leader: false, @@ -397,6 +392,9 @@ impl QueueWorker { async fn shutdown(&mut self) -> Result<(), QueueRunnerError> { tracing::info!("Shutting down worker"); + let clock = self.state.clock(); + let mut rng = self.state.rng(); + // Start a transaction on the existing PgListener connection let txn = self .listener @@ -421,13 +419,13 @@ impl QueueWorker { // Wait for all the jobs to finish self.tracker - .process_jobs(&mut self.rng, &self.clock, &mut repo, true) + .process_jobs(&mut rng, clock, &mut repo, true) .await?; // Tell the other workers we're shutting down // This also releases the leader election lease repo.queue_worker() - .shutdown(&self.clock, &self.registration) + .shutdown(clock, &self.registration) .await?; repo.into_inner() @@ -440,12 +438,12 @@ impl QueueWorker { #[tracing::instrument(name = "worker.wait_until_wakeup", skip_all)] async fn wait_until_wakeup(&mut self) -> Result<(), QueueRunnerError> { + let mut rng = self.state.rng(); + // This is to make sure we wake up every second to do the maintenance tasks // We add a little bit of random jitter to the duration, so that we don't get // fully synced workers waking up at the same time after each notification - let sleep_duration = self - .rng - .sample(Uniform::new(MIN_SLEEP_DURATION, MAX_SLEEP_DURATION)); + let sleep_duration = rng.sample(Uniform::new(MIN_SLEEP_DURATION, MAX_SLEEP_DURATION)); let wakeup_sleep = tokio::time::sleep(sleep_duration); tokio::select! { @@ -490,7 +488,9 @@ impl QueueWorker { )] async fn tick(&mut self) -> Result<(), QueueRunnerError> { tracing::debug!("Tick"); - let now = self.clock.now(); + let clock = self.state.clock(); + let mut rng = self.state.rng(); + let now = clock.now(); // Start a transaction on the existing PgListener connection let txn = self @@ -505,25 +505,25 @@ impl QueueWorker { if now - self.last_heartbeat >= chrono::Duration::minutes(1) { tracing::info!("Sending heartbeat"); repo.queue_worker() - .heartbeat(&self.clock, &self.registration) + .heartbeat(clock, &self.registration) .await?; self.last_heartbeat = now; } // Remove any dead worker leader leases repo.queue_worker() - .remove_leader_lease_if_expired(&self.clock) + .remove_leader_lease_if_expired(clock) .await?; // Try to become (or stay) the leader let leader = repo .queue_worker() - .try_get_leader_lease(&self.clock, &self.registration) + .try_get_leader_lease(clock, &self.registration) .await?; // Process any job task which finished self.tracker - .process_jobs(&mut self.rng, &self.clock, &mut repo, false) + .process_jobs(&mut rng, clock, &mut repo, false) .await?; // Compute how many jobs we should fetch at most @@ -538,7 +538,7 @@ impl QueueWorker { let queues = self.tracker.queues(); let jobs = repo .queue_job() - .reserve(&self.clock, &self.registration, &queues, max_jobs_to_fetch) + .reserve(clock, &self.registration, &queues, max_jobs_to_fetch) .await?; for Job { @@ -592,6 +592,9 @@ impl QueueWorker { return Err(QueueRunnerError::NotLeader); } + let clock = self.state.clock(); + let mut rng = self.state.rng(); + // Start a transaction on the existing PgListener connection let txn = self .listener @@ -633,7 +636,7 @@ impl QueueWorker { // Look at the state of schedules in the database let schedules_status = repo.queue_schedule().list().await?; - let now = self.clock.now(); + let now = clock.now(); for schedule in &self.schedules { // Find the schedule status from the database let Some(schedule_status) = schedules_status @@ -670,8 +673,8 @@ impl QueueWorker { repo.queue_job() .schedule_later( - &mut self.rng, - &self.clock, + &mut rng, + clock, schedule.queue_name, schedule.payload.clone(), serde_json::json!({}), @@ -684,16 +687,13 @@ impl QueueWorker { // We also check if the worker is dead, and if so, we shutdown all the dead // workers that haven't checked in the last two minutes repo.queue_worker() - .shutdown_dead_workers(&self.clock, Duration::minutes(2)) + .shutdown_dead_workers(clock, Duration::minutes(2)) .await?; // TODO: mark tasks those workers had as lost // Mark all the scheduled jobs as available - let scheduled = repo - .queue_job() - .schedule_available_jobs(&self.clock) - .await?; + let scheduled = repo.queue_job().schedule_available_jobs(clock).await?; match scheduled { 0 => {} 1 => tracing::info!("One scheduled job marked as available"),