From 049e2f6287277d9c5f1847be2d15306debbed4e9 Mon Sep 17 00:00:00 2001 From: timedout Date: Wed, 27 May 2026 02:43:08 +0100 Subject: [PATCH] perf(wip): Improve individual events fetcher --- .../fetch_and_handle_outliers.rs | 323 +++++++----------- .../rooms/event_handler/fetch_state.rs | 49 ++- .../event_handler/handle_incoming_pdu.rs | 2 - .../rooms/event_handler/parse_incoming_pdu.rs | 15 +- .../event_handler/upgrade_outlier_pdu.rs | 4 +- src/service/rooms/timeline/backfill.rs | 5 +- 6 files changed, 177 insertions(+), 221 deletions(-) diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index cbd645d62..b2b0c7873 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -1,26 +1,24 @@ -use std::{ - collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map}, - time::Instant, -}; +use std::collections::{HashMap, HashSet, VecDeque}; use assign::assign; use conduwuit::{ Err, Event, PduEvent, debug, debug_info, debug_warn, err, error, - matrix::event::gen_event_id_canonical_json, state_res::lexicographical_topological_sort, trace, - utils::{IterStream, continue_exponential_backoff_secs, stream::BroadbandExt}, + utils::{IterStream, stream::BroadbandExt}, warn, }; -use futures::StreamExt; +use futures::{StreamExt, future::select_ok}; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt, api::federation::event::{get_event, get_missing_events}, int, + room_version_rules::RoomVersionRules, }; use super::get_room_version_rules; +use crate::rooms::event_handler::parse_incoming_pdu::expect_event_id_array; /// Attempts to build a localised directed acyclic graph out of the given PDUs, /// returning them in a topologically sorted order. @@ -91,222 +89,143 @@ pub async fn build_local_dag( } impl super::Service { - /// Find the event and auth it. Once the event is validated (steps 1 - 8) - /// it is appended to the outliers Tree. - /// - /// Returns pdu and if we fetched it over federation the raw json. - /// - /// a. Look in the main timeline (pduid_pdu tree) - /// b. Look at outlier pdu tree - /// c. Ask origin server over federation - /// d. TODO: Ask other servers over federation? - #[deprecated] - pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>( + async fn fetch_and_handle_missing_event_via( + &self, + remote: OwnedServerName, + event_id: OwnedEventId, + room_version_rules: &RoomVersionRules, + ) -> conduwuit::Result<(OwnedEventId, CanonicalJsonObject)> { + let res = self + .services + .sending + .send_federation_request(&remote, get_event::v1::Request::new(event_id.clone())) + .await?; + + let (calculated_event_id, value) = self + .parse_incoming_pdu_with_known_room(&res.pdu, room_version_rules) + .await?; + + if calculated_event_id != event_id { + Err!(Request(BadJson(warn!( + expected=%event_id, + received=%calculated_event_id, + "Server didn't return event id we requested", + )))) + } else { + Ok((event_id, value)) + } + } + + /// Asks remote servers for any individual events that are missing. Should + /// only be used for fetching missing auth events or resolving missing + /// events from state_ids. For all other uses, use get_missing_events. + pub(super) async fn fetch_and_handle_missing_events<'a, Pdu>( &self, origin: &'a ServerName, - events: Events, + events: Vec, create_event: &'a Pdu, room_id: &'a RoomId, - ) -> Vec<(PduEvent, Option>)> + ) -> HashMap where Pdu: Event + Send + Sync, - Events: Iterator + Clone + Send, { - let back_off = |id| match self + let room_version_rules = + &get_room_version_rules(create_event).unwrap_or(RoomVersionRules::V1); + let mut candidates = self .services - .globals - .bad_event_ratelimiter - .write() - .entry(id) - { - | hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); - }, - | hash_map::Entry::Occupied(mut e) => { - *e.get_mut() = (Instant::now(), e.get().1.saturating_add(1)); - }, - }; - - let mut events_with_auth_events = Vec::with_capacity(events.clone().count()); - trace!("Fetching {} outlier pdus", events.clone().count()); + .timeline + .candidate_backfill_servers(room_id) + .await; + candidates.insert(origin.to_owned()); + let mut seeded_events = + HashMap::with_capacity(events.len() + (events.len().saturating_mul(3))); + trace!( + "Fetching {} unknown PDUs on demand from {} candidates", + events.len(), + candidates.len() + ); for id in events { - // a. Look in the main timeline (pduid_pdu tree) - // b. Look at outlier pdu tree - // (get_pdu_json checks both) - if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await { - trace!("Found {id} in main timeline or outlier tree"); - events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![])); - continue; - } - - // c. Ask origin server over federation - // We also handle its auth chain here so we don't get a stack overflow in - // handle_outlier_pdu. - let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into(); - let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len()); - - let mut events_all = HashSet::with_capacity(todo_auth_events.len()); - while let Some(next_id) = todo_auth_events.pop_front() { - if let Some((time, tries)) = self - .services - .globals - .bad_event_ratelimiter - .read() - .get(&*next_id) - { - // Exponential backoff - const MIN_DURATION: u64 = 60 * 2; - const MAX_DURATION: u64 = 60 * 60 * 8; - if continue_exponential_backoff_secs( - MIN_DURATION, - MAX_DURATION, - time.elapsed(), - *tries, - ) { - debug_warn!( - tried = ?*tries, - elapsed = ?time.elapsed(), - "Backing off from {next_id}", - ); - continue; - } - } - - if events_all.contains(&next_id) { + let mut todo: VecDeque<_> = [id.clone()].into(); + while let Some(next_id) = todo.pop_front() { + if seeded_events.contains_key(&next_id) { continue; } - - if self.services.timeline.pdu_exists(&next_id).await { + if let Ok(local_pdu) = self.services.timeline.get_pdu(&next_id).await { trace!("Found {next_id} in db"); + seeded_events.insert(id.clone(), local_pdu.into_canonical_object()); continue; } - debug!("Fetching {next_id} over federation from {origin}."); - match self - .services - .sending - .send_federation_request( - origin, - get_event::v1::Request::new((*next_id).to_owned()), - ) - .await - { - | Ok(res) => { - debug!("Got {next_id} over federation from {origin}"); - let Ok(room_version_rules) = get_room_version_rules(create_event) else { - back_off((*next_id).to_owned()); - continue; - }; - - let Ok((calculated_event_id, value)) = - gen_event_id_canonical_json(&res.pdu, &room_version_rules) - else { - back_off((*next_id).to_owned()); - continue; - }; - - if calculated_event_id != *next_id { - warn!( - "Server didn't return event id we requested: requested: \ - {next_id}, we got {calculated_event_id}. Event: {:?}", - &res.pdu - ); - } - - if let Some(auth_events) = value - .get("auth_events") - .and_then(CanonicalJsonValue::as_array) - { - for auth_event in auth_events { - match serde_json::from_value::( - auth_event.clone().into(), - ) { - | Ok(auth_event) => { - trace!( - "Found auth event id {auth_event} for event \ - {next_id}" - ); - todo_auth_events.push_back(auth_event); - }, - | _ => { - warn!("Auth event id is not valid"); - }, - } - } - } else { - warn!("Auth event list invalid"); - } - - events_in_reverse_order.push((next_id.clone(), value)); - events_all.insert(next_id); - }, + debug!("Fetching {next_id} over federation"); + let futures = candidates + .iter() + .map(|remote| { + Box::pin(self.fetch_and_handle_missing_event_via( + remote.clone(), + next_id.clone(), + room_version_rules, + )) + }) + .collect::>(); + let (event_id, value) = match select_ok(futures).await { + | Ok((x, _)) => x, | Err(e) => { - warn!("Failed to fetch auth event {next_id} from {origin}: {e}"); - back_off((*next_id).to_owned()); - }, - } - } - - events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order)); - } - - let mut pdus = Vec::with_capacity(events_with_auth_events.len()); - for (id, local_pdu, events_in_reverse_order) in events_with_auth_events { - // a. Look in the main timeline (pduid_pdu tree) - // b. Look at outlier pdu tree - // (get_pdu_json checks both) - if let Some(local_pdu) = local_pdu { - trace!("Found {id} in main timeline or outlier tree"); - pdus.push((local_pdu.clone(), None)); - } - - for (next_id, value) in events_in_reverse_order.into_iter().rev() { - if let Some((time, tries)) = self - .services - .globals - .bad_event_ratelimiter - .read() - .get(&*next_id) - { - // Exponential backoff - const MIN_DURATION: u64 = 5 * 60; - const MAX_DURATION: u64 = 60 * 60 * 24; - if continue_exponential_backoff_secs( - MIN_DURATION, - MAX_DURATION, - time.elapsed(), - *tries, - ) { - debug!("Backing off from {next_id}"); + warn!("failed to fetch missing event {next_id} from any candidate: {e}"); continue; - } - } - - trace!("Handling outlier {next_id}"); - match Box::pin(self.handle_outlier_pdu( - origin, - create_event, - &next_id, - room_id, - value.clone(), - true, - )) - .await - { - | Ok((pdu, json)) => - if next_id == *id { - trace!("Handled outlier {next_id} (original request)"); - pdus.push((pdu, Some(json))); - }, - | Err(e) => { - warn!("Authentication of event {next_id} failed: {e:?}"); - back_off(next_id); }, - } + }; + let auth_events = + match expect_event_id_array(&value, "auth_events").map_err(|e| { + err!(Request(BadJson(warn!( + %event_id, + "Failed to parse event fetched from remote: {e}" + )))) + }) { + | Ok(auth_events) => auth_events, + | Err(e) => { + warn!( + ?e, + "event {event_id} is malformed (bad auth_events), skipping" + ); + continue; + }, + }; + todo.extend(auth_events); + seeded_events.insert(event_id, value); } } - trace!("Fetched and handled {} outlier pdus", pdus.len()); + + let seeded_ordered = build_local_dag( + &seeded_events + .iter() + .map(|(eid, e)| (eid.to_owned(), e.clone())) + .collect::>(), + ) + .await + .expect("failed to build local DAG"); + let mut pdus = HashMap::with_capacity(seeded_ordered.len()); + for id in seeded_ordered { + let pdu_json = seeded_events.remove(&id).unwrap(); + trace!("Handling missing event {id}"); + match Box::pin(self.handle_outlier_pdu( + origin, + create_event, + &id, + room_id, + pdu_json, + true, + )) + .await + { + | Ok((pdu, _)) => { + let _ = pdus.insert(id, pdu); + }, + | Err(e) => warn!("Authentication of event {id} failed: {e:?}"), + } + } + + trace!("Fetched and handled {} missing PDUs", pdus.len()); pdus } diff --git a/src/service/rooms/event_handler/fetch_state.rs b/src/service/rooms/event_handler/fetch_state.rs index b69432b8f..a97cd9b22 100644 --- a/src/service/rooms/event_handler/fetch_state.rs +++ b/src/service/rooms/event_handler/fetch_state.rs @@ -1,12 +1,13 @@ use std::collections::{HashMap, hash_map}; -use conduwuit::{Err, Event, Result, debug, debug_warn, err}; +use conduwuit::{Err, Event, PduEvent, Result, debug, debug_warn, err, utils::IterStream}; +use futures::StreamExt; use ruma::{ EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids, events::StateEventType, }; -use crate::rooms::short::ShortStateKey; +use crate::{conduwuit::utils::stream::BroadbandExt, rooms::short::ShortStateKey}; impl super::Service { /// Call /state_ids to find out what the state at this pdu is. We trust the @@ -27,7 +28,7 @@ pub(super) async fn fetch_state( where Pdu: Event + Send + Sync, { - let res = self + let res: get_room_state_ids::v1::Response = self .services .sending .send_federation_request( @@ -38,17 +39,39 @@ pub(super) async fn fetch_state( .inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?; debug!("Fetching state events"); - let state_ids = res.pdu_ids.iter().map(AsRef::as_ref); - let state_vec = self - .fetch_and_handle_outliers(origin, state_ids, create_event, room_id) + let mut state_events: HashMap = + HashMap::with_capacity(res.pdu_ids.len()); + let to_fetch: Vec = res + .pdu_ids + .clone() + .into_iter() + .stream() + .broad_filter_map(|event_id| async move { + if self.services.timeline.pdu_exists(&event_id).await { + None + } else { + Some(event_id) + } + }) + .collect() .await; + if !to_fetch.is_empty() { + if to_fetch.len() >= 100 { + // That's a lot of events to fetch, just ask for the full state + // at that point. TODO: fetch /state + } + state_events.extend( + self.fetch_and_handle_missing_events(origin, to_fetch, create_event, room_id) + .await, + ); + } let mut state: HashMap = - HashMap::with_capacity(state_vec.len()); - for (pdu, _) in state_vec { - let state_key = pdu - .state_key() - .ok_or_else(|| err!(Database("Found non-state pdu in state events.")))?; + HashMap::with_capacity(state_events.len()); + for (event_id, pdu) in state_events { + let state_key = pdu.state_key().ok_or_else(|| { + err!(Database("Found non-state pdu in state events: {event_id}")) + })?; let shortstatekey = self .services @@ -62,8 +85,8 @@ pub(super) async fn fetch_state( }, | hash_map::Entry::Occupied(_) => { return Err!(Database( - "State event's type and state_key combination exists multiple times: \ - {}, {}", + "State event's type and state_key combination exists multiple times \ + ({event_id}): {}, {}", pdu.kind(), state_key )); diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index 163779814..736288e15 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -14,7 +14,6 @@ room::member::{MembershipState, RoomMemberEventContent}, }, }; -use tracing::debug; use crate::rooms::timeline::{RawPduId, pdu_fits}; @@ -248,7 +247,6 @@ pub async fn handle_incoming_pdu<'a>( // 9. Fetch any missing prev events doing all checks listed here starting at 1. // These are timeline events - debug!("Handling previous events"); self.fetch_prevs(room_id, create_event, &incoming_pdu, origin) .await?; diff --git a/src/service/rooms/event_handler/parse_incoming_pdu.rs b/src/service/rooms/event_handler/parse_incoming_pdu.rs index 502703b4f..067851515 100644 --- a/src/service/rooms/event_handler/parse_incoming_pdu.rs +++ b/src/service/rooms/event_handler/parse_incoming_pdu.rs @@ -7,7 +7,7 @@ use itertools::Itertools; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, RoomId, - RoomVersionId, + RoomVersionId, room_version_rules::RoomVersionRules, }; use serde_json::value::RawValue as RawJsonValue; @@ -105,6 +105,19 @@ pub fn validate_pdu(&self, pdu: &CanonicalJsonObject) -> Result { Ok(()) } + pub async fn parse_incoming_pdu_with_known_room( + &self, + pdu: &RawJsonValue, + room_version_rules: &RoomVersionRules, + ) -> Result<(OwnedEventId, CanonicalJsonObject)> { + let (event_id, value) = + gen_event_id_canonical_json(pdu, room_version_rules).map_err(|e| { + err!(Request(InvalidParam("Could not convert event to canonical json: {e}"))) + })?; + self.validate_pdu(&value)?; + Ok((event_id, value)) + } + pub async fn parse_incoming_pdu(&self, pdu: &RawJsonValue) -> Result { let value = serde_json::from_str::(pdu.get()).map_err(|e| { err!(BadServerResponse(debug_warn!("Error parsing incoming event {e:?}"))) diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index b72fe5bd0..7a1865152 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -54,9 +54,9 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .is_event_soft_failed(incoming_pdu.event_id()) ); if rejected { - return Err!(Request(InvalidParam("Event has been rejected"))); + return Err!(Request(Forbidden("Event has been rejected"))); } else if soft_failed { - return Err!(Request(InvalidParam("Event has been soft-failed"))); + return Err!(Request(Forbidden("Event has been soft-failed"))); } debug!( diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index 26c36bf69..36b01c8e8 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -224,7 +224,10 @@ pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> /// Determines which servers are trusted enough to provide backfill in a /// room. - async fn candidate_backfill_servers(&self, room_id: &RoomId) -> HashSet { + pub(crate) async fn candidate_backfill_servers( + &self, + room_id: &RoomId, + ) -> HashSet { let mut candidate_backfill_servers = HashSet::new(); let power_levels = self