Compare commits

...

2 Commits

Author SHA1 Message Date
timedout acdb8cbf4f feat: Add more timing info
Shots in the dark <3
2026-06-26 10:10:37 +01:00
timedout 251f99e9b7 feat: Add progress monitor to federation sender suite 2026-06-26 09:25:14 +01:00
+168 -3
View File
@@ -9,7 +9,7 @@
};
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
use conduwuit::info;
use conduwuit::{defer, info};
use conduwuit_core::{
Error, Event, Result, at, debug, err, error,
result::LogErr,
@@ -50,6 +50,7 @@
uint,
};
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
use tokio::{select, time::interval};
use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem};
@@ -80,9 +81,11 @@ pub(super) async fn sender(self: Arc<Self>, id: usize) -> Result {
let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
let mut futures: SendingFutures<'_> = FuturesUnordered::new();
info!(shard_id=%id, "Starting netburst");
self.startup_netburst(id, &mut futures, &mut statuses)
.boxed()
.await;
info!(shard_id=%id, "Finished netburst");
self.work_loop(id, &mut futures, &mut statuses).await;
@@ -161,8 +164,21 @@ async fn handle_response_ok<'a>(
futures: &mut SendingFutures<'a>,
statuses: &mut CurTransactionStatus,
) {
let monitor = self.new_progress_monitor(
format!("Handle OK response from {dest:?}"),
Duration::from_secs(70),
);
defer! {{ monitor.notify_one(); }}
let start = Instant::now();
let _cork = self.db.db.cork();
self.db.delete_all_active_requests_for(dest).await;
if start.elapsed().as_secs() > 1 {
warn!(
elapsed=?start.elapsed(),
"Deleting all active requests for {dest:?} took a long time"
);
}
let start = Instant::now();
// Find events that have been added since starting the last request
let new_events = self
@@ -171,6 +187,12 @@ async fn handle_response_ok<'a>(
.take(DEQUEUE_LIMIT)
.collect::<Vec<_>>()
.await;
if start.elapsed().as_secs() > 1 {
warn!(
elapsed=?start.elapsed(),
"Fetching queued requests for {dest:?} took a long time"
);
}
// Insert any pdus we found
if !new_events.is_empty() {
@@ -191,6 +213,11 @@ async fn handle_request<'a>(
futures: &mut SendingFutures<'a>,
statuses: &mut CurTransactionStatus,
) {
let monitor = self.new_progress_monitor(
format!("Handle send request {:?}", msg.queue_id),
Duration::from_secs(70),
);
defer! {{ monitor.notify_one(); }}
let iv = vec![(msg.queue_id, msg.event)];
if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses).await {
if !events.is_empty() {
@@ -243,6 +270,10 @@ async fn startup_netburst<'a>(
futures: &mut SendingFutures<'a>,
statuses: &mut CurTransactionStatus,
) {
let monitor = self.new_progress_monitor(
format!("Startup netburst on shard {id}"),
Duration::from_mins(2),
);
let keep =
usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX);
let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
@@ -268,6 +299,7 @@ async fn startup_netburst<'a>(
futures.push(self.send_events(dest.clone(), events));
}
}
monitor.notify_one();
}
#[tracing::instrument(
@@ -297,11 +329,16 @@ async fn select_events(
// Must retry any previous transaction for this remote.
if retry {
let active_requests_task = self.new_progress_monitor(
format!("Fetch previous transactions for {dest:?}"),
Duration::from_secs(1),
);
self.db
.active_requests_for(dest)
.ready_for_each(|(_, e)| events.push(e))
.await;
active_requests_task.notify_one();
info!(?dest, "Attempting to retry destination with {} previous events", events.len());
return Ok(Some(events));
}
@@ -365,6 +402,11 @@ fn select_events_current(
skip_all,
)]
async fn select_edus(&self, server_name: &ServerName) -> Result<(EduVec, u64)> {
let monitor = self.new_progress_monitor(
format!("Select EDUs for {server_name}"),
Duration::from_secs(1),
);
defer! {{ monitor.notify_one(); }}
// selection window
let since = self.db.get_latest_educount(server_name).await;
let since_upper = self.services.globals.current_count()?;
@@ -413,6 +455,11 @@ async fn select_edus_device_changes(
max_edu_count: &AtomicU64,
events_len: &AtomicUsize,
) -> EduVec {
let monitor = self.new_progress_monitor(
format!("Select device list update EDUs for {server_name}"),
Duration::from_secs(1),
);
defer! {{ monitor.notify_one(); }}
let mut events = EduVec::new();
let server_rooms = self.services.state_cache.server_rooms(server_name);
@@ -470,6 +517,11 @@ async fn select_edus_receipts(
since: (u64, u64),
max_edu_count: &AtomicU64,
) -> Option<EduBuf> {
let monitor = self.new_progress_monitor(
format!("Select read receipt EDUs for {server_name}"),
Duration::from_secs(1),
);
defer! {{ monitor.notify_one(); }}
let mut num = 0;
let receipts: BTreeMap<OwnedRoomId, ReceiptMap> = self
.services
@@ -576,6 +628,11 @@ async fn select_edus_presence(
since: (u64, u64),
max_edu_count: &AtomicU64,
) -> Option<EduBuf> {
let monitor = self.new_progress_monitor(
format!("Select presence EDUs for {server_name}"),
Duration::from_secs(1),
);
defer! {{ monitor.notify_one(); }}
let presence_since = self.services.presence.presence_since(since.0);
pin_mut!(presence_since);
@@ -651,6 +708,21 @@ fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingFu
}
}
/// Starts a background progress monitor for the task called `task_name`.
///
/// The monitor is spawned as its own task on the server runtime and is
/// given `tolerance` plus one handle to a freshly created
/// `tokio::sync::Notify`. The other handle is returned to the caller, who
/// signals on it to mark the monitored task as finished.
fn new_progress_monitor(
    &self,
    task_name: String,
    tolerance: Duration,
) -> Arc<tokio::sync::Notify> {
    let marker = Arc::new(tokio::sync::Notify::new());
    let watcher = {
        let handle = Arc::clone(&marker);
        async move { progress_monitor(&task_name, tolerance, handle).await }
    };
    // Fire and forget: the join handle is discarded, so the monitor runs
    // detached from the caller.
    drop(self.server.runtime().spawn(watcher));
    marker
}
#[tracing::instrument(
name = "appservice",
level = "debug",
@@ -664,7 +736,12 @@ async fn send_events_dest_appservice(
id: String,
events: Vec<SendingEvent>,
) -> SendingResult {
let monitor = self.new_progress_monitor(
format!("Send {} events to appservice {}", events.len(), id),
Duration::from_secs(5),
);
let Some(appservice) = self.services.appservice.get_registration(&id).await else {
monitor.notify_one();
return Err((
Destination::Appservice(id.clone()),
err!(Database(warn!(?id, "Missing appservice registration"))),
@@ -716,7 +793,10 @@ async fn send_events_dest_appservice(
request.ephemeral = edu_jsons;
request.to_device = Vec::new(); // TODO
match self.send_appservice_request(appservice, request).await {
let result = self.send_appservice_request(appservice, request).await;
monitor.notify_one();
match result {
| Ok(_) => Ok(Destination::Appservice(id)),
| Err(e) => Err((Destination::Appservice(id), e)),
}
@@ -736,7 +816,12 @@ async fn send_events_dest_push(
pushkey: String,
events: Vec<SendingEvent>,
) -> SendingResult {
let monitor = self.new_progress_monitor(
format!("Send {} events to pusher {pushkey} for {user_id}", events.len()),
Duration::from_secs(5),
);
let Ok(pusher) = self.services.pusher.get_pusher(&user_id, &pushkey).await else {
monitor.notify_one();
return Err((
Destination::Push(user_id.clone(), pushkey.clone()),
err!(Database(error!(%user_id, ?pushkey, "Missing pusher"))),
@@ -794,6 +879,7 @@ async fn send_events_dest_push(
.await
.map_err(|e| (Destination::Push(user_id.clone(), pushkey.clone()), e));
}
monitor.notify_one();
Ok(Destination::Push(user_id, pushkey))
}
@@ -803,6 +889,18 @@ async fn send_events_dest_federation(
server: OwnedServerName,
events: Vec<SendingEvent>,
) -> SendingResult {
let master_monitor = self.new_progress_monitor(
format!("Dispatch {} events to {server} (overall)", events.len()),
Duration::from_secs(70),
);
defer! {{
master_monitor.notify_one();
}}
let monitor = self.new_progress_monitor(
format!("Get {} events to send to {server}", events.len()),
Duration::from_secs(5),
);
let pdus: Vec<_> = events
.iter()
.filter_map(|pdu| match pdu {
@@ -825,6 +923,7 @@ async fn send_events_dest_federation(
.filter_map(Result::ok)
.collect();
monitor.notify_one();
if pdus.is_empty() && edus.is_empty() {
return Ok(Destination::Federation(server));
}
@@ -844,6 +943,15 @@ async fn send_events_dest_federation(
request.pdus = pdus;
request.edus = edus;
let monitor = self.new_progress_monitor(
format!(
"Send {} events ({} PDUs, {} EDUs) to {server}",
events.len(),
request.pdus.len(),
request.edus.len(),
),
Duration::from_mins(1),
);
let result = self
.services
.federation
@@ -861,6 +969,7 @@ async fn send_events_dest_federation(
);
}
}
monitor.notify_one();
match result {
| Err(error) => Err((Destination::Federation(server), error)),
@@ -906,3 +1015,59 @@ pub async fn convert_to_outgoing_federation_event(
to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value")
}
}
/// Watches a single task and logs when it overruns its time budget.
///
/// Progress monitoring starts immediately. Every 10 seconds the elapsed time
/// is compared against `tolerance`: once it has been exceeded a warning is
/// logged on each tick, before that only a trace line is emitted.
///
/// The monitor stops when either:
/// * the task signals completion through `marker`, or
/// * this monitor is left holding the last handle to `marker`, meaning the
///   task went away (early return, cancellation) without ever signalling.
///   Without this check such a monitor would keep logging forever.
///
/// Parameters:
/// * `task_name` - label attached to every log line.
/// * `tolerance` - how long the task may run before it is reported as slow.
/// * `marker` - completion handle shared with the monitored task.
async fn progress_monitor(
    task_name: &str,
    tolerance: Duration,
    marker: Arc<tokio::sync::Notify>,
) {
    // TODO: this might be useful in more places than the sender service.
    // Only temporary for diagnostic purposes rn
    let start = Instant::now();
    let mut interval = interval(Duration::from_secs(10));
    // Set once this monitor is seen to hold the only remaining handle.
    let mut orphaned = false;
    loop {
        select! {
            // Check the completion signal before the timer, so a task that
            // finished is never reported as abandoned when both are ready.
            biased;
            () = marker.notified() => {
                let elapsed = start.elapsed();
                if elapsed > tolerance {
                    warn!(
                        %task_name,
                        ?elapsed,
                        ?tolerance,
                        "Task took a long time (but didn't get stuck)"
                    );
                } else {
                    trace!(
                        %task_name,
                        ?elapsed,
                        ?tolerance,
                        "Task finished"
                    );
                }
                break;
            },
            _ = interval.tick() => {
                let elapsed = start.elapsed();
                if orphaned {
                    // A whole period passed since the task's handle vanished
                    // and no completion signal showed up: nobody is left to
                    // send one, so stop instead of warning forever.
                    if elapsed > tolerance {
                        warn!(
                            %task_name,
                            ?elapsed,
                            ?tolerance,
                            "Task was abandoned without reporting completion after a long time"
                        );
                    } else {
                        trace!(
                            %task_name,
                            ?elapsed,
                            ?tolerance,
                            "Task was abandoned without reporting completion"
                        );
                    }
                    break;
                }
                if Arc::strong_count(&marker) == 1 {
                    // The task dropped its handle. It may have signalled just
                    // before doing so; go around once more so a pending
                    // notification is still reported as a normal finish.
                    orphaned = true;
                    continue;
                }
                if elapsed > tolerance {
                    warn!(
                        %task_name,
                        ?elapsed,
                        ?tolerance,
                        "Task has been running for a long time (stuck?)"
                    );
                } else {
                    trace!(
                        %task_name,
                        ?elapsed,
                        ?tolerance,
                        "Task is still running"
                    );
                }
            },
        }
    }
}