mirror of
https://forgejo.ellis.link/continuwuation/continuwuity/
synced 2026-06-26 15:01:39 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| acdb8cbf4f | |||
| 251f99e9b7 |
@@ -9,7 +9,7 @@
|
||||
};
|
||||
|
||||
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
|
||||
use conduwuit::info;
|
||||
use conduwuit::{defer, info};
|
||||
use conduwuit_core::{
|
||||
Error, Event, Result, at, debug, err, error,
|
||||
result::LogErr,
|
||||
@@ -50,6 +50,7 @@
|
||||
uint,
|
||||
};
|
||||
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
|
||||
use tokio::{select, time::interval};
|
||||
|
||||
use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem};
|
||||
|
||||
@@ -80,9 +81,11 @@ pub(super) async fn sender(self: Arc<Self>, id: usize) -> Result {
|
||||
let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
|
||||
let mut futures: SendingFutures<'_> = FuturesUnordered::new();
|
||||
|
||||
info!(shard_id=%id, "Starting netburst");
|
||||
self.startup_netburst(id, &mut futures, &mut statuses)
|
||||
.boxed()
|
||||
.await;
|
||||
info!(shard_id=%id, "Finished netburst");
|
||||
|
||||
self.work_loop(id, &mut futures, &mut statuses).await;
|
||||
|
||||
@@ -161,8 +164,21 @@ async fn handle_response_ok<'a>(
|
||||
futures: &mut SendingFutures<'a>,
|
||||
statuses: &mut CurTransactionStatus,
|
||||
) {
|
||||
let monitor = self.new_progress_monitor(
|
||||
format!("Handle OK response from {dest:?}"),
|
||||
Duration::from_secs(70),
|
||||
);
|
||||
defer! {{ monitor.notify_one(); }}
|
||||
let start = Instant::now();
|
||||
let _cork = self.db.db.cork();
|
||||
self.db.delete_all_active_requests_for(dest).await;
|
||||
if start.elapsed().as_secs() > 1 {
|
||||
warn!(
|
||||
elapsed=?start.elapsed(),
|
||||
"Deleting all active requests for {dest:?} took a long time"
|
||||
);
|
||||
}
|
||||
let start = Instant::now();
|
||||
|
||||
// Find events that have been added since starting the last request
|
||||
let new_events = self
|
||||
@@ -171,6 +187,12 @@ async fn handle_response_ok<'a>(
|
||||
.take(DEQUEUE_LIMIT)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
if start.elapsed().as_secs() > 1 {
|
||||
warn!(
|
||||
elapsed=?start.elapsed(),
|
||||
"Fetching queued requests for {dest:?} took a long time"
|
||||
);
|
||||
}
|
||||
|
||||
// Insert any pdus we found
|
||||
if !new_events.is_empty() {
|
||||
@@ -191,6 +213,11 @@ async fn handle_request<'a>(
|
||||
futures: &mut SendingFutures<'a>,
|
||||
statuses: &mut CurTransactionStatus,
|
||||
) {
|
||||
let monitor = self.new_progress_monitor(
|
||||
format!("Handle send request {:?}", msg.queue_id),
|
||||
Duration::from_secs(70),
|
||||
);
|
||||
defer! {{ monitor.notify_one(); }}
|
||||
let iv = vec![(msg.queue_id, msg.event)];
|
||||
if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses).await {
|
||||
if !events.is_empty() {
|
||||
@@ -243,6 +270,10 @@ async fn startup_netburst<'a>(
|
||||
futures: &mut SendingFutures<'a>,
|
||||
statuses: &mut CurTransactionStatus,
|
||||
) {
|
||||
let monitor = self.new_progress_monitor(
|
||||
format!("Startup netburst on shard {id}"),
|
||||
Duration::from_mins(2),
|
||||
);
|
||||
let keep =
|
||||
usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX);
|
||||
let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
|
||||
@@ -268,6 +299,7 @@ async fn startup_netburst<'a>(
|
||||
futures.push(self.send_events(dest.clone(), events));
|
||||
}
|
||||
}
|
||||
monitor.notify_one();
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
@@ -297,11 +329,16 @@ async fn select_events(
|
||||
|
||||
// Must retry any previous transaction for this remote.
|
||||
if retry {
|
||||
let active_requests_task = self.new_progress_monitor(
|
||||
format!("Fetch previous transactions for {dest:?}"),
|
||||
Duration::from_secs(1),
|
||||
);
|
||||
self.db
|
||||
.active_requests_for(dest)
|
||||
.ready_for_each(|(_, e)| events.push(e))
|
||||
.await;
|
||||
|
||||
active_requests_task.notify_one();
|
||||
info!(?dest, "Attempting to retry destination with {} previous events", events.len());
|
||||
return Ok(Some(events));
|
||||
}
|
||||
|
||||
@@ -365,6 +402,11 @@ fn select_events_current(
|
||||
skip_all,
|
||||
)]
|
||||
async fn select_edus(&self, server_name: &ServerName) -> Result<(EduVec, u64)> {
|
||||
let monitor = self.new_progress_monitor(
|
||||
format!("Select EDUs for {server_name}"),
|
||||
Duration::from_secs(1),
|
||||
);
|
||||
defer! {{ monitor.notify_one(); }}
|
||||
// selection window
|
||||
let since = self.db.get_latest_educount(server_name).await;
|
||||
let since_upper = self.services.globals.current_count()?;
|
||||
@@ -413,6 +455,11 @@ async fn select_edus_device_changes(
|
||||
max_edu_count: &AtomicU64,
|
||||
events_len: &AtomicUsize,
|
||||
) -> EduVec {
|
||||
let monitor = self.new_progress_monitor(
|
||||
format!("Select device list update EDUs for {server_name}"),
|
||||
Duration::from_secs(1),
|
||||
);
|
||||
defer! {{ monitor.notify_one(); }}
|
||||
let mut events = EduVec::new();
|
||||
let server_rooms = self.services.state_cache.server_rooms(server_name);
|
||||
|
||||
@@ -470,6 +517,11 @@ async fn select_edus_receipts(
|
||||
since: (u64, u64),
|
||||
max_edu_count: &AtomicU64,
|
||||
) -> Option<EduBuf> {
|
||||
let monitor = self.new_progress_monitor(
|
||||
format!("Select read receipt EDUs for {server_name}"),
|
||||
Duration::from_secs(1),
|
||||
);
|
||||
defer! {{ monitor.notify_one(); }}
|
||||
let mut num = 0;
|
||||
let receipts: BTreeMap<OwnedRoomId, ReceiptMap> = self
|
||||
.services
|
||||
@@ -576,6 +628,11 @@ async fn select_edus_presence(
|
||||
since: (u64, u64),
|
||||
max_edu_count: &AtomicU64,
|
||||
) -> Option<EduBuf> {
|
||||
let monitor = self.new_progress_monitor(
|
||||
format!("Select presence EDUs for {server_name}"),
|
||||
Duration::from_secs(1),
|
||||
);
|
||||
defer! {{ monitor.notify_one(); }}
|
||||
let presence_since = self.services.presence.presence_since(since.0);
|
||||
|
||||
pin_mut!(presence_since);
|
||||
@@ -651,6 +708,21 @@ fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingFu
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new progress monitor with the given name, returning a marker
|
||||
/// that can be used to mark the progress monitor as finished.
|
||||
fn new_progress_monitor(
|
||||
&self,
|
||||
task_name: String,
|
||||
tolerance: Duration,
|
||||
) -> Arc<tokio::sync::Notify> {
|
||||
let marker = Arc::new(tokio::sync::Notify::new());
|
||||
let monitor_marker = marker.clone();
|
||||
self.server.runtime().spawn(async move {
|
||||
progress_monitor(&task_name, tolerance, monitor_marker).await;
|
||||
});
|
||||
marker
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "appservice",
|
||||
level = "debug",
|
||||
@@ -664,7 +736,12 @@ async fn send_events_dest_appservice(
|
||||
id: String,
|
||||
events: Vec<SendingEvent>,
|
||||
) -> SendingResult {
|
||||
let monitor = self.new_progress_monitor(
|
||||
format!("Send {} events to appservice {}", events.len(), id),
|
||||
Duration::from_secs(5),
|
||||
);
|
||||
let Some(appservice) = self.services.appservice.get_registration(&id).await else {
|
||||
monitor.notify_one();
|
||||
return Err((
|
||||
Destination::Appservice(id.clone()),
|
||||
err!(Database(warn!(?id, "Missing appservice registration"))),
|
||||
@@ -716,7 +793,10 @@ async fn send_events_dest_appservice(
|
||||
request.ephemeral = edu_jsons;
|
||||
request.to_device = Vec::new(); // TODO
|
||||
|
||||
match self.send_appservice_request(appservice, request).await {
|
||||
let result = self.send_appservice_request(appservice, request).await;
|
||||
monitor.notify_one();
|
||||
|
||||
match result {
|
||||
| Ok(_) => Ok(Destination::Appservice(id)),
|
||||
| Err(e) => Err((Destination::Appservice(id), e)),
|
||||
}
|
||||
@@ -736,7 +816,12 @@ async fn send_events_dest_push(
|
||||
pushkey: String,
|
||||
events: Vec<SendingEvent>,
|
||||
) -> SendingResult {
|
||||
let monitor = self.new_progress_monitor(
|
||||
format!("Send {} events to pusher {pushkey} for {user_id}", events.len()),
|
||||
Duration::from_secs(5),
|
||||
);
|
||||
let Ok(pusher) = self.services.pusher.get_pusher(&user_id, &pushkey).await else {
|
||||
monitor.notify_one();
|
||||
return Err((
|
||||
Destination::Push(user_id.clone(), pushkey.clone()),
|
||||
err!(Database(error!(%user_id, ?pushkey, "Missing pusher"))),
|
||||
@@ -794,6 +879,7 @@ async fn send_events_dest_push(
|
||||
.await
|
||||
.map_err(|e| (Destination::Push(user_id.clone(), pushkey.clone()), e));
|
||||
}
|
||||
monitor.notify_one();
|
||||
|
||||
Ok(Destination::Push(user_id, pushkey))
|
||||
}
|
||||
@@ -803,6 +889,18 @@ async fn send_events_dest_federation(
|
||||
server: OwnedServerName,
|
||||
events: Vec<SendingEvent>,
|
||||
) -> SendingResult {
|
||||
let master_monitor = self.new_progress_monitor(
|
||||
format!("Dispatch {} events to {server} (overall)", events.len()),
|
||||
Duration::from_secs(70),
|
||||
);
|
||||
defer! {{
|
||||
master_monitor.notify_one();
|
||||
}}
|
||||
|
||||
let monitor = self.new_progress_monitor(
|
||||
format!("Get {} events to send to {server}", events.len()),
|
||||
Duration::from_secs(5),
|
||||
);
|
||||
let pdus: Vec<_> = events
|
||||
.iter()
|
||||
.filter_map(|pdu| match pdu {
|
||||
@@ -825,6 +923,7 @@ async fn send_events_dest_federation(
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
monitor.notify_one();
|
||||
if pdus.is_empty() && edus.is_empty() {
|
||||
return Ok(Destination::Federation(server));
|
||||
}
|
||||
@@ -844,6 +943,15 @@ async fn send_events_dest_federation(
|
||||
request.pdus = pdus;
|
||||
request.edus = edus;
|
||||
|
||||
let monitor = self.new_progress_monitor(
|
||||
format!(
|
||||
"Send {} events ({} PDUs, {} EDUs) to {server}",
|
||||
events.len(),
|
||||
request.pdus.len(),
|
||||
request.edus.len(),
|
||||
),
|
||||
Duration::from_mins(1),
|
||||
);
|
||||
let result = self
|
||||
.services
|
||||
.federation
|
||||
@@ -861,6 +969,7 @@ async fn send_events_dest_federation(
|
||||
);
|
||||
}
|
||||
}
|
||||
monitor.notify_one();
|
||||
|
||||
match result {
|
||||
| Err(error) => Err((Destination::Federation(server), error)),
|
||||
@@ -906,3 +1015,59 @@ pub async fn convert_to_outgoing_federation_event(
|
||||
to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value")
|
||||
}
|
||||
}
|
||||
|
||||
/// Monitors progress by alerting every 10 seconds that elapse after 60
|
||||
/// seconds if the progress monitor is not marked as finished via the
|
||||
/// marker.
|
||||
/// Progress monitoring starts immediately.
|
||||
async fn progress_monitor(
|
||||
task_name: &str,
|
||||
tolerance: Duration,
|
||||
marker: Arc<tokio::sync::Notify>,
|
||||
) {
|
||||
// TODO: this might be useful in more places than the sender service.
|
||||
// Only temporary for diagnostic purposes rn
|
||||
let start = Instant::now();
|
||||
let mut interval = interval(Duration::from_secs(10));
|
||||
loop {
|
||||
select! {
|
||||
_ = interval.tick() => {
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed > tolerance {
|
||||
warn!(
|
||||
%task_name,
|
||||
?elapsed,
|
||||
?tolerance,
|
||||
"Task has been running for a long time (stuck?)"
|
||||
);
|
||||
} else {
|
||||
trace!(
|
||||
%task_name,
|
||||
?elapsed,
|
||||
?tolerance,
|
||||
"Task is still running"
|
||||
);
|
||||
}
|
||||
},
|
||||
() = marker.notified() => {
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed > tolerance {
|
||||
warn!(
|
||||
%task_name,
|
||||
?elapsed,
|
||||
?tolerance,
|
||||
"Task took a long time (but didn't get stuck)"
|
||||
);
|
||||
} else {
|
||||
trace!(
|
||||
%task_name,
|
||||
?elapsed,
|
||||
?tolerance,
|
||||
"Task finished"
|
||||
);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user