perf(wip): Improve individual events fetcher

This commit is contained in:
timedout
2026-05-27 02:43:08 +01:00
committed by Jade Ellis
parent b619eae9ef
commit 049e2f6287
6 changed files with 177 additions and 221 deletions
@@ -1,26 +1,24 @@
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map},
time::Instant,
};
use std::collections::{HashMap, HashSet, VecDeque};
use assign::assign;
use conduwuit::{
Err, Event, PduEvent, debug, debug_info, debug_warn, err, error,
matrix::event::gen_event_id_canonical_json,
state_res::lexicographical_topological_sort,
trace,
utils::{IterStream, continue_exponential_backoff_secs, stream::BroadbandExt},
utils::{IterStream, stream::BroadbandExt},
warn,
};
use futures::StreamExt;
use futures::{StreamExt, future::select_ok};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt,
api::federation::event::{get_event, get_missing_events},
int,
room_version_rules::RoomVersionRules,
};
use super::get_room_version_rules;
use crate::rooms::event_handler::parse_incoming_pdu::expect_event_id_array;
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
/// returning them in a topologically sorted order.
@@ -91,222 +89,143 @@ pub async fn build_local_dag<S: std::hash::BuildHasher>(
}
impl super::Service {
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
///
/// Returns pdu and if we fetched it over federation the raw json.
///
/// a. Look in the main timeline (pduid_pdu tree)
/// b. Look at outlier pdu tree
/// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation?
#[deprecated]
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
/// Requests a single event from `remote` over federation and confirms that
/// the server answered with the event that was asked for.
///
/// The response PDU is run through `parse_incoming_pdu_with_known_room` using
/// the supplied room version rules, which yields the event ID computed from
/// the PDU and its canonical JSON.
///
/// Returns the requested event ID together with that canonical JSON. Errors
/// if the federation request fails, if the PDU cannot be parsed, or if the
/// computed event ID differs from `event_id` (reported as `BadJson`).
async fn fetch_and_handle_missing_event_via(
    &self,
    remote: OwnedServerName,
    event_id: OwnedEventId,
    room_version_rules: &RoomVersionRules,
) -> conduwuit::Result<(OwnedEventId, CanonicalJsonObject)> {
    let request = get_event::v1::Request::new(event_id.clone());
    let response = self
        .services
        .sending
        .send_federation_request(&remote, request)
        .await?;

    let (received_id, pdu_json) = self
        .parse_incoming_pdu_with_known_room(&response.pdu, room_version_rules)
        .await?;

    // Happy path first: the remote handed back exactly what we requested.
    if received_id == event_id {
        return Ok((event_id, pdu_json));
    }

    Err!(Request(BadJson(warn!(
        expected=%event_id,
        received=%received_id,
        "Server didn't return event id we requested",
    ))))
}
/// Asks remote servers for any individual events that are missing. Should
/// only be used for fetching missing auth events or resolving missing
/// events from state_ids. For all other uses, use get_missing_events.
pub(super) async fn fetch_and_handle_missing_events<'a, Pdu>(
&self,
origin: &'a ServerName,
events: Events,
events: Vec<OwnedEventId>,
create_event: &'a Pdu,
room_id: &'a RoomId,
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
) -> HashMap<OwnedEventId, PduEvent>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let back_off = |id| match self
let room_version_rules =
&get_room_version_rules(create_event).unwrap_or(RoomVersionRules::V1);
let mut candidates = self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(id)
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
},
};
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
trace!("Fetching {} outlier pdus", events.clone().count());
.timeline
.candidate_backfill_servers(room_id)
.await;
candidates.insert(origin.to_owned());
let mut seeded_events =
HashMap::with_capacity(events.len() + (events.len().saturating_mul(3)));
trace!(
"Fetching {} unknown PDUs on demand from {} candidates",
events.len(),
candidates.len()
);
for id in events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
trace!("Found {id} in main timeline or outlier tree");
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
continue;
}
// c. Ask origin server over federation
// We also handle its auth chain here so we don't get a stack overflow in
// handle_outlier_pdu.
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
while let Some(next_id) = todo_auth_events.pop_front() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 60 * 2;
const MAX_DURATION: u64 = 60 * 60 * 8;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug_warn!(
tried = ?*tries,
elapsed = ?time.elapsed(),
"Backing off from {next_id}",
);
continue;
}
}
if events_all.contains(&next_id) {
let mut todo: VecDeque<_> = [id.clone()].into();
while let Some(next_id) = todo.pop_front() {
if seeded_events.contains_key(&next_id) {
continue;
}
if self.services.timeline.pdu_exists(&next_id).await {
if let Ok(local_pdu) = self.services.timeline.get_pdu(&next_id).await {
trace!("Found {next_id} in db");
seeded_events.insert(id.clone(), local_pdu.into_canonical_object());
continue;
}
debug!("Fetching {next_id} over federation from {origin}.");
match self
.services
.sending
.send_federation_request(
origin,
get_event::v1::Request::new((*next_id).to_owned()),
)
.await
{
| Ok(res) => {
debug!("Got {next_id} over federation from {origin}");
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
back_off((*next_id).to_owned());
continue;
};
let Ok((calculated_event_id, value)) =
gen_event_id_canonical_json(&res.pdu, &room_version_rules)
else {
back_off((*next_id).to_owned());
continue;
};
if calculated_event_id != *next_id {
warn!(
"Server didn't return event id we requested: requested: \
{next_id}, we got {calculated_event_id}. Event: {:?}",
&res.pdu
);
}
if let Some(auth_events) = value
.get("auth_events")
.and_then(CanonicalJsonValue::as_array)
{
for auth_event in auth_events {
match serde_json::from_value::<OwnedEventId>(
auth_event.clone().into(),
) {
| Ok(auth_event) => {
trace!(
"Found auth event id {auth_event} for event \
{next_id}"
);
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
}
events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
},
debug!("Fetching {next_id} over federation");
let futures = candidates
.iter()
.map(|remote| {
Box::pin(self.fetch_and_handle_missing_event_via(
remote.clone(),
next_id.clone(),
room_version_rules,
))
})
.collect::<Vec<_>>();
let (event_id, value) = match select_ok(futures).await {
| Ok((x, _)) => x,
| Err(e) => {
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
back_off((*next_id).to_owned());
},
}
}
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
}
let mut pdus = Vec::with_capacity(events_with_auth_events.len());
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Some(local_pdu) = local_pdu {
trace!("Found {id} in main timeline or outlier tree");
pdus.push((local_pdu.clone(), None));
}
for (next_id, value) in events_in_reverse_order.into_iter().rev() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug!("Backing off from {next_id}");
warn!("failed to fetch missing event {next_id} from any candidate: {e}");
continue;
}
}
trace!("Handling outlier {next_id}");
match Box::pin(self.handle_outlier_pdu(
origin,
create_event,
&next_id,
room_id,
value.clone(),
true,
))
.await
{
| Ok((pdu, json)) =>
if next_id == *id {
trace!("Handled outlier {next_id} (original request)");
pdus.push((pdu, Some(json)));
},
| Err(e) => {
warn!("Authentication of event {next_id} failed: {e:?}");
back_off(next_id);
},
}
};
let auth_events =
match expect_event_id_array(&value, "auth_events").map_err(|e| {
err!(Request(BadJson(warn!(
%event_id,
"Failed to parse event fetched from remote: {e}"
))))
}) {
| Ok(auth_events) => auth_events,
| Err(e) => {
warn!(
?e,
"event {event_id} is malformed (bad auth_events), skipping"
);
continue;
},
};
todo.extend(auth_events);
seeded_events.insert(event_id, value);
}
}
trace!("Fetched and handled {} outlier pdus", pdus.len());
let seeded_ordered = build_local_dag(
&seeded_events
.iter()
.map(|(eid, e)| (eid.to_owned(), e.clone()))
.collect::<HashMap<OwnedEventId, CanonicalJsonObject>>(),
)
.await
.expect("failed to build local DAG");
let mut pdus = HashMap::with_capacity(seeded_ordered.len());
for id in seeded_ordered {
let pdu_json = seeded_events.remove(&id).unwrap();
trace!("Handling missing event {id}");
match Box::pin(self.handle_outlier_pdu(
origin,
create_event,
&id,
room_id,
pdu_json,
true,
))
.await
{
| Ok((pdu, _)) => {
let _ = pdus.insert(id, pdu);
},
| Err(e) => warn!("Authentication of event {id} failed: {e:?}"),
}
}
trace!("Fetched and handled {} missing PDUs", pdus.len());
pdus
}
+36 -13
View File
@@ -1,12 +1,13 @@
use std::collections::{HashMap, hash_map};
use conduwuit::{Err, Event, Result, debug, debug_warn, err};
use conduwuit::{Err, Event, PduEvent, Result, debug, debug_warn, err, utils::IterStream};
use futures::StreamExt;
use ruma::{
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
events::StateEventType,
};
use crate::rooms::short::ShortStateKey;
use crate::{conduwuit::utils::stream::BroadbandExt, rooms::short::ShortStateKey};
impl super::Service {
/// Call /state_ids to find out what the state at this pdu is. We trust the
@@ -27,7 +28,7 @@ pub(super) async fn fetch_state<Pdu>(
where
Pdu: Event + Send + Sync,
{
let res = self
let res: get_room_state_ids::v1::Response = self
.services
.sending
.send_federation_request(
@@ -38,17 +39,39 @@ pub(super) async fn fetch_state<Pdu>(
.inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?;
debug!("Fetching state events");
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
let state_vec = self
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
let mut state_events: HashMap<OwnedEventId, PduEvent> =
HashMap::with_capacity(res.pdu_ids.len());
let to_fetch: Vec<OwnedEventId> = res
.pdu_ids
.clone()
.into_iter()
.stream()
.broad_filter_map(|event_id| async move {
if self.services.timeline.pdu_exists(&event_id).await {
None
} else {
Some(event_id)
}
})
.collect()
.await;
if !to_fetch.is_empty() {
if to_fetch.len() >= 100 {
// That's a lot of events to fetch, just ask for the full state
// at that point. TODO: fetch /state
}
state_events.extend(
self.fetch_and_handle_missing_events(origin, to_fetch, create_event, room_id)
.await,
);
}
let mut state: HashMap<ShortStateKey, OwnedEventId> =
HashMap::with_capacity(state_vec.len());
for (pdu, _) in state_vec {
let state_key = pdu
.state_key()
.ok_or_else(|| err!(Database("Found non-state pdu in state events.")))?;
HashMap::with_capacity(state_events.len());
for (event_id, pdu) in state_events {
let state_key = pdu.state_key().ok_or_else(|| {
err!(Database("Found non-state pdu in state events: {event_id}"))
})?;
let shortstatekey = self
.services
@@ -62,8 +85,8 @@ pub(super) async fn fetch_state<Pdu>(
},
| hash_map::Entry::Occupied(_) => {
return Err!(Database(
"State event's type and state_key combination exists multiple times: \
{}, {}",
"State event's type and state_key combination exists multiple times \
({event_id}): {}, {}",
pdu.kind(),
state_key
));
@@ -14,7 +14,6 @@
room::member::{MembershipState, RoomMemberEventContent},
},
};
use tracing::debug;
use crate::rooms::timeline::{RawPduId, pdu_fits};
@@ -248,7 +247,6 @@ pub async fn handle_incoming_pdu<'a>(
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
// These are timeline events
debug!("Handling previous events");
self.fetch_prevs(room_id, create_event, &incoming_pdu, origin)
.await?;
@@ -7,7 +7,7 @@
use itertools::Itertools;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, RoomId,
RoomVersionId,
RoomVersionId, room_version_rules::RoomVersionRules,
};
use serde_json::value::RawValue as RawJsonValue;
@@ -105,6 +105,19 @@ pub fn validate_pdu(&self, pdu: &CanonicalJsonObject) -> Result {
Ok(())
}
/// Parses a raw PDU for a room whose version rules are already known.
///
/// The PDU is converted with `gen_event_id_canonical_json` under the given
/// rules and the resulting canonical object is then checked with
/// `validate_pdu`.
///
/// Returns the event ID and canonical JSON object. Errors with
/// `InvalidParam` when the conversion fails, or with whatever error
/// `validate_pdu` reports.
pub async fn parse_incoming_pdu_with_known_room(
    &self,
    pdu: &RawJsonValue,
    room_version_rules: &RoomVersionRules,
) -> Result<(OwnedEventId, CanonicalJsonObject)> {
    let (computed_id, canonical) = match gen_event_id_canonical_json(pdu, room_version_rules)
    {
        | Ok(parsed) => parsed,
        | Err(e) => {
            return Err(err!(Request(InvalidParam(
                "Could not convert event to canonical json: {e}"
            ))));
        },
    };

    self.validate_pdu(&canonical)?;

    Ok((computed_id, canonical))
}
pub async fn parse_incoming_pdu(&self, pdu: &RawJsonValue) -> Result<Parsed> {
let value = serde_json::from_str::<CanonicalJsonObject>(pdu.get()).map_err(|e| {
err!(BadServerResponse(debug_warn!("Error parsing incoming event {e:?}")))
@@ -54,9 +54,9 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
.is_event_soft_failed(incoming_pdu.event_id())
);
if rejected {
return Err!(Request(InvalidParam("Event has been rejected")));
return Err!(Request(Forbidden("Event has been rejected")));
} else if soft_failed {
return Err!(Request(InvalidParam("Event has been soft-failed")));
return Err!(Request(Forbidden("Event has been soft-failed")));
}
debug!(
+4 -1
View File
@@ -224,7 +224,10 @@ pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) ->
/// Determines which servers are trusted enough to provide backfill in a
/// room.
async fn candidate_backfill_servers(&self, room_id: &RoomId) -> HashSet<OwnedServerName> {
pub(crate) async fn candidate_backfill_servers(
&self,
room_id: &RoomId,
) -> HashSet<OwnedServerName> {
let mut candidate_backfill_servers = HashSet::new();
let power_levels = self