Compare commits

...

3 Commits

Author SHA1 Message Date
timedout eabed74d6e fix: Be less aggressive with progress monitors 2026-07-02 11:59:37 +01:00
timedout 614f8adb48 feat: Add more timing info
Shots in the dark <3
2026-07-02 11:59:36 +01:00
timedout 304850147c feat: Add progress monitor to federation sender suite 2026-07-02 11:57:34 +01:00
+167 -3
View File
@@ -10,11 +10,13 @@
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
use conduwuit::{
debug_info, debug_warn, info, utils::time::exponential_backoff::min_exp_backoff_duration,
debug_info, debug_warn, defer, info,
utils::time::exponential_backoff::min_exp_backoff_duration,
};
use conduwuit_core::{
Error, Event, Result, at, debug, err, error,
result::LogErr,
trace,
utils::{
ReadyExt, calculate_hash, continue_exponential_backoff_secs,
future::TryExtExt,
@@ -51,6 +53,7 @@
uint,
};
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
use tokio::{select, time::interval};
use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem};
@@ -168,8 +171,21 @@ async fn handle_response_ok<'a>(
futures: &mut SendingFutures<'a>,
statuses: &mut CurTransactionStatus,
) {
let monitor = self.new_progress_monitor(
format!("Handle OK response from {dest:?}"),
Duration::from_secs(70),
);
defer! {{ monitor.notify_one(); }}
let start = Instant::now();
let _cork = self.db.db.cork();
self.db.delete_all_active_requests_for(dest).await;
if start.elapsed().as_secs() > 1 {
warn!(
elapsed=?start.elapsed(),
"Deleting all active requests for {dest:?} took a long time"
);
}
let start = Instant::now();
// Find events that have been added since starting the last request
let new_events = self
@@ -178,6 +194,12 @@ async fn handle_response_ok<'a>(
.take(DEQUEUE_LIMIT)
.collect::<Vec<_>>()
.await;
if start.elapsed().as_secs() > 1 {
warn!(
elapsed=?start.elapsed(),
"Fetching queued requests for {dest:?} took a long time"
);
}
// Insert any pdus we found
if !new_events.is_empty() {
@@ -201,6 +223,11 @@ async fn handle_request<'a>(
futures: &mut SendingFutures<'a>,
statuses: &mut CurTransactionStatus,
) {
let monitor = self.new_progress_monitor(
format!("Handle send request {:?}", msg.queue_id),
Duration::from_secs(70),
);
defer! {{ monitor.notify_one(); }}
let iv = vec![(msg.queue_id, msg.event)];
if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses).await {
if !events.is_empty() {
@@ -257,6 +284,10 @@ async fn startup_netburst<'a>(
futures: &mut SendingFutures<'a>,
statuses: &mut CurTransactionStatus,
) {
let monitor = self.new_progress_monitor(
format!("Startup netburst on shard {id}"),
Duration::from_mins(2),
);
let keep =
usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX);
let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
@@ -287,6 +318,7 @@ async fn startup_netburst<'a>(
futures.push(self.send_events(dest.clone(), events));
}
}
monitor.notify_one();
}
/// Selects any new events to send to the given destination.
@@ -317,11 +349,16 @@ async fn select_events(
// Must retry any previous transaction for this remote.
if retry {
let active_requests_task = self.new_progress_monitor(
format!("Fetch previous transactions for {dest:?}"),
Duration::from_secs(1),
);
self.db
.active_requests_for(dest)
.ready_for_each(|(_, e)| events.push(e))
.await;
active_requests_task.notify_one();
info!(?dest, "Attempting to retry destination with {} previous events", events.len());
return Ok(Some(events));
}
@@ -387,6 +424,11 @@ fn should_attempt_send(
/// Selects a number of EDUs to send to the target.
#[tracing::instrument(name = "edus", level = "debug", skip_all)]
async fn select_edus(&self, server_name: &ServerName) -> Result<(EduVec, u64)> {
let monitor = self.new_progress_monitor(
format!("Select EDUs for {server_name}"),
Duration::from_secs(1),
);
defer! {{ monitor.notify_one(); }}
// selection window
let since = self.db.get_latest_educount(server_name).await;
let since_upper = self.services.globals.current_count()?;
@@ -435,6 +477,11 @@ async fn select_edus_device_changes(
max_edu_count: &AtomicU64,
events_len: &AtomicUsize,
) -> EduVec {
let monitor = self.new_progress_monitor(
format!("Select device list update EDUs for {server_name}"),
Duration::from_secs(1),
);
defer! {{ monitor.notify_one(); }}
let mut events = EduVec::new();
let server_rooms = self.services.state_cache.server_rooms(server_name);
@@ -492,6 +539,11 @@ async fn select_edus_receipts(
since: (u64, u64),
max_edu_count: &AtomicU64,
) -> Option<EduBuf> {
let monitor = self.new_progress_monitor(
format!("Select read receipt EDUs for {server_name}"),
Duration::from_secs(1),
);
defer! {{ monitor.notify_one(); }}
let mut num = 0;
let receipts: BTreeMap<OwnedRoomId, ReceiptMap> = self
.services
@@ -598,6 +650,11 @@ async fn select_edus_presence(
since: (u64, u64),
max_edu_count: &AtomicU64,
) -> Option<EduBuf> {
let monitor = self.new_progress_monitor(
format!("Select presence EDUs for {server_name}"),
Duration::from_secs(1),
);
defer! {{ monitor.notify_one(); }}
let presence_since = self.services.presence.presence_since(since.0);
pin_mut!(presence_since);
@@ -679,6 +736,21 @@ fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingFu
}
}
/// Starts a background progress monitor for a named task.
///
/// A fresh `Notify` is created and shared with a `progress_monitor` future
/// spawned on the server runtime; that future logs under `task_name` and
/// judges the elapsed time against `tolerance`. The returned handle is the
/// completion marker: call `notify_one()` on it once the monitored work is
/// done so the monitor reports the final duration and stops.
fn new_progress_monitor(
    &self,
    task_name: String,
    tolerance: Duration,
) -> Arc<tokio::sync::Notify> {
    let finished = Arc::new(tokio::sync::Notify::new());

    // The watcher owns the task name and its own handle to the marker, so it
    // can be detached from the caller's stack frame.
    let watcher = {
        let finished = Arc::clone(&finished);
        async move {
            progress_monitor(&task_name, tolerance, finished).await;
        }
    };
    self.server.runtime().spawn(watcher);

    finished
}
#[tracing::instrument(
name = "appservice",
level = "debug",
@@ -692,7 +764,12 @@ async fn send_events_dest_appservice(
id: String,
events: Vec<SendingEvent>,
) -> SendingResult {
let monitor = self.new_progress_monitor(
format!("Send {} events to appservice {}", events.len(), id),
Duration::from_secs(self.server.config.sender_timeout),
);
let Some(appservice) = self.services.appservice.get_registration(&id).await else {
monitor.notify_one();
return Err((
Destination::Appservice(id.clone()),
err!(Database(warn!(?id, "Missing appservice registration"))),
@@ -741,7 +818,10 @@ async fn send_events_dest_appservice(
request.ephemeral = edu_jsons;
request.to_device = Vec::new(); // TODO
match self.send_appservice_request(appservice, request).await {
let result = self.send_appservice_request(appservice, request).await;
monitor.notify_one();
match result {
| Ok(_) => Ok(Destination::Appservice(id)),
| Err(e) => Err((Destination::Appservice(id), e)),
}
@@ -767,6 +847,10 @@ async fn send_events_dest_push(
err!(Database(error!(%user_id, ?pushkey, "Missing pusher"))),
));
};
let monitor = self.new_progress_monitor(
format!("Send {} events to pusher {} for {user_id}", pusher.ids.app_id, events.len()),
Duration::from_secs(self.server.config.sender_timeout),
);
let mut pdus = Vec::with_capacity(
events
@@ -819,6 +903,7 @@ async fn send_events_dest_push(
.await
.map_err(|e| (Destination::Push(user_id.clone(), pushkey.clone()), e));
}
monitor.notify_one();
Ok(Destination::Push(user_id, pushkey))
}
@@ -828,6 +913,18 @@ async fn send_events_dest_federation(
server: OwnedServerName,
events: Vec<SendingEvent>,
) -> SendingResult {
let master_monitor = self.new_progress_monitor(
format!("Dispatch {} events to {server} (overall)", events.len()),
Duration::from_secs(self.server.config.sender_timeout),
);
defer! {{
master_monitor.notify_one();
}}
let monitor = self.new_progress_monitor(
format!("Get {} events to send to {server}", events.len()),
Duration::from_secs(5),
);
let pdus: Vec<_> = events
.iter()
.filter_map(|pdu| match pdu {
@@ -850,6 +947,7 @@ async fn send_events_dest_federation(
.filter_map(Result::ok)
.collect();
monitor.notify_one();
if pdus.is_empty() && edus.is_empty() {
return Ok(Destination::Federation(server));
}
@@ -878,6 +976,15 @@ async fn send_events_dest_federation(
"Sending transaction to remote"
);
let start = Instant::now();
let monitor = self.new_progress_monitor(
format!(
"Send {} events ({} PDUs, {} EDUs) to {server}",
events.len(),
request.pdus.len(),
request.edus.len(),
),
Duration::from_secs(self.server.config.sender_timeout),
);
let result = self
.services
.federation
@@ -902,6 +1009,7 @@ async fn send_events_dest_federation(
);
}
}
monitor.notify_one();
match result {
| Err(error) => Err((Destination::Federation(server), error)),
@@ -929,3 +1037,59 @@ pub async fn convert_to_outgoing_federation_event(
to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value")
}
}
/// Monitors a task's progress until it is marked as finished via the marker.
///
/// Progress monitoring starts immediately. Every 10 seconds the elapsed time
/// is compared against `tolerance`: once it has been exceeded a warning is
/// logged, otherwise a trace-level "still running" message. When `marker` is
/// notified, the total duration is logged once (as a warning if it exceeded
/// `tolerance`, as a trace otherwise) and the monitor returns.
///
/// The monitor also returns when it finds itself holding the last reference
/// to `marker`. In that case the owner went away without signalling (early
/// return, dropped future), nobody is left who could notify, and looping on
/// would leak this task and repeat the "stuck" warning forever.
///
/// * `task_name` - human-readable label attached to every log line.
/// * `tolerance` - how long the task may run before log lines become
///   warnings.
/// * `marker` - completion signal; the task owner calls `notify_one()` on it.
async fn progress_monitor(
    task_name: &str,
    tolerance: Duration,
    marker: Arc<tokio::sync::Notify>,
) {
    // TODO: this might be useful in more places than the sender service.
    // Only temporary for diagnostic purposes rn
    let start = Instant::now();
    let mut interval = interval(Duration::from_secs(10));
    loop {
        select! {
            // Poll the completion signal first: a permit stored by
            // `notify_one()` must win over a tick that is due at the same
            // time, otherwise a finished task could be reported as running
            // or abandoned.
            biased;
            () = marker.notified() => {
                let elapsed = start.elapsed();
                if elapsed > tolerance {
                    warn!(
                        %task_name,
                        ?elapsed,
                        ?tolerance,
                        "Task took a long time (but didn't get stuck)"
                    );
                } else {
                    trace!(
                        %task_name,
                        ?elapsed,
                        ?tolerance,
                        "Task finished"
                    );
                }
                break;
            },
            _ = interval.tick() => {
                let elapsed = start.elapsed();
                // We hold the only remaining handle, so no one can ever
                // notify us again. Stop instead of warning forever.
                if Arc::strong_count(&marker) == 1 {
                    if elapsed > tolerance {
                        warn!(
                            %task_name,
                            ?elapsed,
                            ?tolerance,
                            "Task was abandoned without being marked as finished after a long time"
                        );
                    } else {
                        trace!(
                            %task_name,
                            ?elapsed,
                            ?tolerance,
                            "Task was abandoned without being marked as finished"
                        );
                    }
                    break;
                }
                if elapsed > tolerance {
                    warn!(
                        %task_name,
                        ?elapsed,
                        ?tolerance,
                        "Task has been running for a long time (stuck?)"
                    );
                } else {
                    trace!(
                        %task_name,
                        ?elapsed,
                        ?tolerance,
                        "Task is still running"
                    );
                }
            },
        }
    }
}