Compare commits

...

74 Commits

Author SHA1 Message Date
Jade Ellis f9a43abb7f Fix: Correct bad merge
I honestly have no idea what happened here
2026-06-28 01:34:14 +01:00
Jade Ellis 3649f76045 feat: Debounce extremity squashing
Additionally circuit-breaks it if the squash would have only
been triggered by dummy events / other squashes
2026-06-28 00:20:15 +01:00
Jade Ellis bdd8ad1413 chore: Box large futures 2026-06-27 20:39:50 +01:00
timedout 46aefa8307 perf: Throttle dummy events to prevent stampeding 2026-06-27 20:33:04 +01:00
timedout 6343b62778 chore: Drop unused param from handle_outlier_pdu 2026-06-27 20:29:09 +01:00
timedout 6ca3acbeff perf: Use a hashmap for full-state filtering 2026-06-27 20:29:04 +01:00
timedout 16f5d4b0e2 fix: Inverted power level check in extremity squash 2026-06-27 20:29:02 +01:00
timedout 350f789737 fix: Rename variables in auth event fetcher
Also fixes a couple bugs where events were being misattributed
2026-06-27 20:28:59 +01:00
timedout 54e89daa46 perf: Don't try to fetch prevs we already fetched
graphs are hard
2026-06-27 20:28:56 +01:00
timedout 770343b1c0 fix: Fall back to legacy behaviour when prev events are missed from get_missing_events 2026-06-27 20:28:53 +01:00
timedout 4cf13a8e5a style: Use more explicit variable names 2026-06-27 20:28:48 +01:00
timedout b73f34454f style: Use user_can_send_message 2026-06-27 20:27:44 +01:00
timedout 754f239241 style: Re-use GET_MISSING_EVENTS_MAX_BATCH_SIZE 2026-06-27 20:27:41 +01:00
timedout 8e845235d2 feat: Add !admin debug rooms-by-extremity-count command 2026-06-27 20:27:38 +01:00
timedout 04ffccdeac chore: Reformat 2026-06-27 20:23:29 +01:00
timedout 447b68e306 fix: Prevent arbitrary state injection attack 2026-06-27 20:23:27 +01:00
timedout 7471141eba style: Check power levels before attempting to send extremity squashes
Solves a problem where the console screams in agony when local users can't send dummy events
2026-06-27 20:23:26 +01:00
timedout a1fba2218d perf: Squash weird mutable variable 2026-06-27 20:23:23 +01:00
timedout 22c7fa1b08 style: Fix up some TODOs 2026-06-27 20:23:19 +01:00
timedout 9028b18792 style: Adjust docstrings and dodgy comment 2026-06-27 20:21:43 +01:00
timedout a5bf0f2bfb fix: Default PDU content to empty object instead of literal NULL 2026-06-27 20:21:39 +01:00
timedout 2e33760575 fix: un-forget how streams work 2026-06-27 20:21:37 +01:00
timedout 3b8788d5d2 perf: Remove huge clone and tackle TODOs 2026-06-27 20:21:35 +01:00
timedout 00a9ef1c06 feat: Automatically squash extremities when they exceed a threshold
Attempts to tackle #1844
2026-06-27 20:21:32 +01:00
timedout fd0f458978 style: Tidy up 2026-06-27 20:21:30 +01:00
timedout 3ff2797b11 fix: Make fetch_state_ids_from_backfill_servers candidate-free safe 2026-06-27 20:21:28 +01:00
timedout f0e4f53f03 style: Resolve lint complaints 2026-06-27 20:21:26 +01:00
timedout 2b486e3399 fix: Correctly handle still-missing state, always fetch full state atomically if regular fetch fails 2026-06-27 20:21:23 +01:00
timedout 29b5df8722 fix: Correct inverted boolean condition, add explicit timeout on /state fetch 2026-06-27 20:21:20 +01:00
timedout ae378bbdb7 perf: Always fetch at least N events per GME 2026-06-27 20:21:18 +01:00
timedout 0e0c7826ab fix: Correctly pre-populate state events vec with known events 2026-06-27 20:21:15 +01:00
timedout fa5e0faaf7 fix: Friendly assertions 2026-06-27 20:21:12 +01:00
timedout b07a5c780c perf: Don't try to re-persist non-outliers we already have 2026-06-27 20:21:11 +01:00
timedout 13e240ebdb perf: Don't add trees we already have to latest boundary 2026-06-27 20:21:07 +01:00
timedout a701b610c5 fix: Be noisy when there's no incoming state 2026-06-27 20:21:04 +01:00
timedout 17af0e54d9 fix: Elide auth chain from fetch_and_handle_outliers 2026-06-27 20:21:01 +01:00
timedout efb5ee0bf0 fix: Progress log in fetch_prev 2026-06-27 20:20:59 +01:00
timedout 7a681c7c43 fix: Downgrade safe assert to debug assert 2026-06-27 20:20:57 +01:00
timedout 7e87912177 fix: Don't download the world 2026-06-27 20:20:55 +01:00
timedout e76881d64a feat: Make logging more verbose to diagnose the aranjesplosion 2026-06-27 20:20:51 +01:00
timedout ad98dca8c2 feat: Include timing information in debug logs 2026-06-27 20:16:53 +01:00
timedout 1fc7c033cb fix: Don't treat prev outlier upgrades as fetch failures 2026-06-27 20:16:50 +01:00
timedout 8d4a313657 fix: Ask more servers for state_ids when origin fails to provide
Some servers reference events in prev_events that they might not yet have finished processing, so this allows us to at least attempt to get the state from another trustworthy server in the room that might be faster. I don't think this is too effective, however it's more effective than giving up immediately.
2026-06-27 20:16:48 +01:00
timedout 3a7259250b fix: Remove redundant check 2
This may look scary, but this is safe because event auth performs the same check, and will reject the event if it doesn't reference the create event correctly.
2026-06-27 20:16:46 +01:00
timedout 9292f16cd7 fix: Remove redundant check that accidentally banned everyone 2026-06-27 20:16:44 +01:00
timedout 95473f4c24 fix: Make PDU handle errors noisier & correct error types 2026-06-27 20:16:41 +01:00
timedout 6d606d9277 fix: Make dedupe noisy, don't allow non-create event as create event 2026-06-27 20:13:30 +01:00
timedout d1baaaab48 fix: Don't silence PDU handle logs 2026-06-27 20:13:27 +01:00
timedout 921a83825a style: Rename gapfill helpers instruments 2026-06-27 20:13:25 +01:00
timedout 4e3b3d133d fix: Properly remove event_id from the PDU JSON before upgrading it 2026-06-27 20:13:22 +01:00
timedout a9376b3b04 fix: Hold a federation room lock while remotely joining a room 2026-06-27 20:13:20 +01:00
timedout f7025af97e fix: Replace our local extremity tracking when joining a disconnected room remotely 2026-06-27 20:13:18 +01:00
timedout a3544ba01f fix: Don't try and fetch zero events 2026-06-27 20:13:15 +01:00
timedout c6acbf5440 fix: Fall back to atomic fetch when full-state fetch fails 2026-06-27 20:13:12 +01:00
timedout ba5beac7fc fix: Remove short-term memory loss
I keep writing forgetful code, it's a problem
2026-06-27 20:13:11 +01:00
timedout 4b8cf7fb25 fix: Don't try to fetch the same event endlessly 2026-06-27 20:13:08 +01:00
timedout dc966804e6 fix: Don't repeat already-included metadata in fetch_state instrument 2026-06-27 20:13:06 +01:00
timedout 98c1a466fd feat: Enhance reliability by fetching full state when we're missing a lot of auth events 2026-06-27 20:13:03 +01:00
timedout bcccb4373f fix: Calculate max iterations dynamically, and bump max prevs 2026-06-27 20:10:20 +01:00
timedout 049e2f6287 perf(wip): Improve individual events fetcher 2026-06-27 20:10:17 +01:00
timedout b619eae9ef fix: Don't lie about using already-known content 2026-06-27 19:49:47 +01:00
timedout 4b673692f6 fix: Be smarter when re-receiving already-seen PDUs 2026-06-27 19:49:44 +01:00
timedout 9633a37421 perf: Don't re-process events as outliers 2026-06-27 19:49:41 +01:00
timedout d090f5d769 style: Improve logging 2026-06-27 19:49:38 +01:00
timedout 51d7a82aa3 fix: Lower floor for min depth 2026-06-27 19:49:35 +01:00
timedout d2c183baeb fix: Only increment mindepth on state events 2026-06-27 19:49:33 +01:00
timedout b63eaa81d4 chore: Add newsfrag 2026-06-27 19:49:29 +01:00
timedout 94db17b53a feat: Keep track of a min_depth value
Should prevent weird situations where we accidentally gapfill into backfill territory
2026-06-27 19:49:27 +01:00
timedout ab5202677b perf: Increase default max_fetch_prev_events to 256 2026-06-27 19:40:07 +01:00
timedout 469c1b2ed7 perf: Make max gap depth fetch configurable 2026-06-27 19:40:05 +01:00
timedout 1b5e1e9886 perf: Improve gap filling, handle missing auth events better 2026-06-27 19:40:02 +01:00
timedout d8d7f863e5 fix: This is some bullshit I tell you 2026-06-27 19:22:08 +01:00
timedout e8fee00df6 feat: Better prev event fetching
fix: Don't panic in debug mode when making an empty notary query
2026-06-27 19:14:19 +01:00
timedout 266616de7b feat: Add backfill_missing_events helper 2026-06-27 19:02:10 +01:00
24 changed files with 1653 additions and 638 deletions
+2
@@ -0,0 +1,2 @@
Improved the performance and reliability of fetching missing events, improving network partition recovery. Contributed
by @nex.
+9 -1
View File
@@ -297,7 +297,7 @@
# This item is undocumented. Please contribute documentation for it.
#
#max_fetch_prev_events = 192
#max_fetch_prev_events = 1024
# How many incoming federation transactions the server is willing to be
# processing at any given time before it becomes overloaded and starts
@@ -645,6 +645,14 @@
#
#default_room_acl_deny =
# The number of forward extremities to tolerate in a room before
# attempting to manually squash them with a "dummy event". Setting this
# above 20 will hinder its efficacy, and setting it below 5 will cause
# more dummy events to be sent than necessary (which increases federation
# traffic).
#
#dummy_event_threshold = 10
# Enable OpenTelemetry OTLP tracing export. This replaces the deprecated
# Jaeger exporter. Traces will be sent via OTLP to a collector (such as
# Jaeger) that supports the OpenTelemetry Protocol.
+54
@@ -31,6 +31,8 @@
};
use tracing_subscriber::EnvFilter;
use crate::PAGE_SIZE;
#[derive(Clone, Copy, Eq, PartialEq)]
enum NodeStatus {
Normal(bool),
@@ -1166,4 +1168,56 @@ pub(super) async fn send_test_email(&self) -> Result {
Ok(())
}
pub(super) async fn rooms_by_extremity_count(&self, page: Option<usize>) -> Result {
let page = page.unwrap_or(1);
// My Giant Chain:tm:
let mapped: HashMap<OwnedRoomId, u64> = self
.services
.rooms
.state
.all_forward_extremities()
.ready_fold(HashMap::new(), move |mut map, (room_id, _)| {
let count: u64 = map.get(&room_id).copied().unwrap_or(0);
map.insert(room_id, count.saturating_add(1));
map
})
.await
.into_iter()
.filter_map(|(room_id, count)| (count >= 2).then_some((room_id, count)))
.collect();
if mapped.is_empty() {
return Err!("No more rooms.");
}
let mut rooms = mapped.keys().collect::<Vec<_>>();
rooms.sort_by_key(|room_id| {
mapped
.get(*room_id)
.copied()
.expect("keys must have values")
});
rooms.reverse();
let body = rooms
.into_iter()
.stream()
.skip(page.saturating_sub(1).saturating_mul(PAGE_SIZE))
.take(PAGE_SIZE)
.map(|room_id| {
format!(
"{room_id}: {}",
mapped.get(room_id).copied().expect("keys must have values")
)
})
.collect::<Vec<_>>()
.await;
self.write_str(&format!(
"Rooms by extremity count ({}):\n```\n{}\n```",
body.len(),
body.join("\n")
))
.await
}
}
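The counting, sorting and paging done by `rooms_by_extremity_count` can be sketched with the standard library alone. This is a minimal sketch under stated assumptions, not the code from the diff: the input is a plain slice of `(room_id, event_id)` pairs instead of the `all_forward_extremities()` stream, `PAGE_SIZE` is set to a hypothetical value of 2 for readability, and ties are broken by room ID (the diff sorts by count and reverses, which leaves the order of ties unspecified).

```rust
use std::collections::HashMap;

/// Hypothetical page size for illustration; the real `PAGE_SIZE` lives elsewhere in the crate.
const PAGE_SIZE: usize = 2;

/// Counts forward extremities per room, keeps rooms with at least two,
/// sorts them by count (descending) and returns the requested 1-indexed page.
fn rooms_by_extremity_count(extremities: &[(&str, &str)], page: usize) -> Vec<(String, u64)> {
    let mut counts: HashMap<&str, u64> = HashMap::new();
    for &(room_id, _event_id) in extremities {
        let count = counts.entry(room_id).or_insert(0);
        *count = count.saturating_add(1);
    }

    let mut rooms: Vec<(String, u64)> = counts
        .into_iter()
        .filter(|(_, count)| *count >= 2)
        .map(|(room_id, count)| (room_id.to_owned(), count))
        .collect();

    // Highest count first; room ID as a tie-break (an assumption of this sketch)
    // so that the output is deterministic.
    rooms.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));

    rooms
        .into_iter()
        .skip(page.saturating_sub(1).saturating_mul(PAGE_SIZE))
        .take(PAGE_SIZE)
        .collect()
}
```

Sorting the `(room, count)` pairs directly avoids the second `mapped.get(..).expect(..)` lookup per room that the diff performs after collecting the keys.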
+5
@@ -245,6 +245,11 @@ pub enum DebugCommand {
/// Send a test email to the invoking admin's email address
SendTestEmail,
/// Lists room IDs by forward extremity count in descending order
RoomsByExtremityCount {
page: Option<usize>,
},
/// Developer test stubs
#[command(subcommand)]
#[allow(non_snake_case)]
+3 -7
@@ -4,15 +4,11 @@
use conduwuit::{Err, Event, Result, debug, info, trace, utils::to_canonical_object, warn};
use ruma::{OwnedEventId, api::federation::event::get_missing_events};
use serde_json::{json, value::RawValue};
use service::rooms::event_handler::GET_MISSING_EVENTS_MAX_BATCH_SIZE;
use super::AccessCheck;
use crate::Ruma;
/// arbitrary number but synapse's is 20 and we can handle lots of these anyways
const LIMIT_MAX: usize = 50;
/// spec says default is 10
const LIMIT_DEFAULT: usize = 10;
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
///
/// Retrieves events that the sender is missing.
@@ -45,8 +41,8 @@ pub(crate) async fn get_missing_events_route(
let limit = body
.limit
.try_into()
.unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX);
.unwrap_or(10)
.min(GET_MISSING_EVENTS_MAX_BATCH_SIZE);
let room_version = services.rooms.state.get_room_version(&body.room_id).await?;
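The limit handling in this route boils down to "convert, fall back to 10, cap at the shared maximum". A minimal sketch of that clamp, assuming a generic integer input in place of ruma's `UInt` (the helper name `effective_limit` and the `FALLBACK_LIMIT` constant are hypothetical, introduced only for illustration):

```rust
use std::convert::TryInto;

/// Same name and value as the constant introduced in the diff.
const GET_MISSING_EVENTS_MAX_BATCH_SIZE: usize = 50;
/// Hypothetical name for the literal `10` used in the route.
const FALLBACK_LIMIT: usize = 10;

/// Converts the requested limit to `usize`, falling back to 10 when the
/// conversion fails, then caps it at the maximum batch size.
fn effective_limit<T: TryInto<usize>>(requested: T) -> usize {
    requested
        .try_into()
        .unwrap_or(FALLBACK_LIMIT)
        .min(GET_MISSING_EVENTS_MAX_BATCH_SIZE)
}
```

Note that the fallback only applies when the conversion itself fails, so a value too large for `usize` yields 10 rather than 50, while any in-range oversized request is capped at 50.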
+11 -3
@@ -7,7 +7,7 @@
use axum::extract::State;
use axum_client_ip::ClientIp;
use conduwuit::{
Err, Error, Result, debug, debug_warn, err, error,
Err, Error, Result, debug, debug_error, debug_warn, err, error,
result::LogErr,
state_res::lexicographical_topological_sort,
trace,
@@ -133,6 +133,7 @@ async fn wait_for_result(
}
#[instrument(
name="transaction",
skip_all,
fields(
id = ?body.transaction_id.as_str(),
@@ -174,8 +175,14 @@ async fn process_inbound_transaction(
for (id, result) in &results {
if let Err(e) = result {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
debug_warn!("Incoming PDU failed {id}: {e:?}");
match e {
| Error::BadRequest(
ErrorKind::Forbidden | ErrorKind::InvalidParam | ErrorKind::BadJson,
..,
) => {
debug_warn!("Incoming PDU {id} failed: {e:?}");
},
| _ => debug_error!("Incoming PDU {id} failed: {e:?}"),
}
}
}
@@ -381,6 +388,7 @@ async fn handle_room(
.rooms
.event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
.boxed()
.await
.map(|_| ());
results.push((event_id, result));
+14 -2
@@ -375,7 +375,7 @@ pub struct Config {
#[serde(default = "default_max_request_size")]
pub max_request_size: usize,
/// default: 192
/// default: 1024
#[serde(default = "default_max_fetch_prev_events")]
pub max_fetch_prev_events: u16,
@@ -781,6 +781,16 @@ pub struct Config {
/// a substitute for moderation bots.
pub default_room_acl_deny: Option<Vec<String>>,
/// The number of forward extremities to tolerate in a room before
/// attempting to manually squash them with a "dummy event". Setting this
/// above 20 will hinder its efficacy, and setting it below 5 will cause
/// more dummy events to be sent than necessary (which increases federation
/// traffic).
///
/// default: 10
#[serde(default = "default_extremity_threshold")]
pub dummy_event_threshold: u8,
/// display: nested
#[serde(default)]
pub well_known: WellKnownConfig,
@@ -2549,7 +2559,7 @@ fn default_pusher_timeout() -> u64 { 60 }
fn default_pusher_idle_timeout() -> u64 { 15 }
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
fn default_max_fetch_prev_events() -> u16 { 1024 }
fn default_max_concurrent_inbound_transactions() -> usize { 150 }
@@ -2652,6 +2662,8 @@ fn default_rocksdb_stats_level() -> u8 { 1 }
#[inline]
pub fn default_default_room_version() -> RoomVersionId { RoomVersionId::V12 }
fn default_extremity_threshold() -> u8 { 10 }
fn default_ip_range_denylist() -> Vec<String> {
vec![
"127.0.0.0/8".to_owned(),
+1 -1
@@ -62,7 +62,7 @@ impl Default for PartialPdu {
fn default() -> Self {
Self {
event_type: "m.room.message".into(),
content: Box::<RawJsonValue>::default(),
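// Parse the literal `{}` as raw JSON. If `to_raw_value` is serde_json's, passing it a
// `&str` would serialize the string itself, yielding the JSON string "{}" rather than
// an empty object.
content: RawJsonValue::from_string("{}".to_owned()).unwrap(),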
unsigned: None,
state_key: None,
redacts: None,
+4
@@ -187,6 +187,10 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
val_size_hint: Some(8),
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "roomid_mindepth",
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "roomserverids",
..descriptor::RANDOM_SMALL
@@ -1,235 +1,667 @@
use std::{
collections::{BTreeMap, HashSet, VecDeque, hash_map},
collections::{HashMap, HashSet, VecDeque},
time::Instant,
};
use assign::assign;
#[cfg(debug_assertions)]
use conduwuit::error;
use conduwuit::{
Event, PduEvent, debug, debug_warn, matrix::event::gen_event_id_canonical_json, trace,
utils::continue_exponential_backoff_secs, warn,
Err, Event, PduEvent, debug, debug_error, debug_info, debug_warn, err,
state_res::lexicographical_topological_sort,
trace,
utils::{IterStream, math::Expected, stream::BroadbandExt},
warn,
};
use futures::{StreamExt, future::select_ok};
use ruma::{
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
api::federation::event::get_event,
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
OwnedServerName, RoomId, ServerName, UInt,
api::federation::event::{get_event, get_missing_events},
int,
room_version_rules::RoomVersionRules,
};
use super::get_room_version_rules;
use crate::rooms::event_handler::parse_incoming_pdu::expect_event_id_array;
pub const GET_MISSING_EVENTS_MAX_BATCH_SIZE: usize = 50;
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
/// returning them in a topologically sorted order.
///
/// This is used to attempt to process PDUs in an order that respects their
/// dependencies, however it is ultimately the sender's responsibility to send
/// them in a processable order, so this is just a best effort attempt. It does
/// not account for power levels or other tie breaks.
pub async fn build_local_dag<S: std::hash::BuildHasher + Send + Sync>(
pdu_map: &HashMap<OwnedEventId, &CanonicalJsonObject, S>,
) -> conduwuit::Result<Vec<OwnedEventId>> {
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> =
HashMap::with_capacity(pdu_map.len());
let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len());
for (event_id, value) in pdu_map {
// We already checked that these properties are correct in parse_incoming_pdu,
// so it's safe to unwrap here.
// We also filter to remove any prev_events that are not in this pdu_map, as we
// need to have at least one event with zero out degrees for the lexico-topo
// sort below. If there are multiple events with omitted prevs, they will be
// ordered by timestamp, then event ID. At that point though, it's unlikely to
// matter.
let prev_events = value
.get("prev_events")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|v| EventId::parse(v.as_str().unwrap()).unwrap())
.filter(|id| pdu_map.contains_key(id))
.collect();
dag.insert(event_id.clone(), prev_events);
let origin_server_ts = value
.get("origin_server_ts")
.and_then(CanonicalJsonValue::as_integer)
.unwrap_or_default();
id_origin_ts.insert(event_id.clone(), origin_server_ts);
}
debug!(count = dag.len(), "Sorting incoming events with partial graph");
lexicographical_topological_sort(&dag, &async |node_id| {
// Note: we don't bother fetching power levels because that would massively slow
// this function down. This is a best-effort attempt to order events correctly
// for processing, however ultimately that should be the sender's job.
let ts = id_origin_ts
.get(&node_id)
.copied()
.unwrap_or_else(|| int!(0))
.to_string()
.parse::<u64>()
.ok()
.and_then(UInt::new)
.unwrap_or_default();
Ok((int!(0), MilliSecondsSinceUnixEpoch(ts)))
})
.await
.inspect(|sorted| {
debug_assert_eq!(
sorted.len(),
pdu_map.len(),
"Sorted graph was not the same size as the input graph"
);
})
.map_err(|e| err!("failed to resolve local graph: {e}"))
}
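The ordering that `build_local_dag` aims for can be illustrated without `state_res`. The sketch below is an assumption-laden stand-in, not the crate's `lexicographical_topological_sort`: it uses Kahn's algorithm over a hypothetical `HashMap<String, (u64, Vec<String>)>` of `event_id -> (origin_server_ts, prev_events)`, drops prev events that are not part of the batch (as the diff does), and breaks ties by timestamp and then event ID. The function name `sort_local_dag` is made up for this example.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap, HashSet};

/// Returns the event IDs ordered so that every event comes after the prev
/// events it references within the same batch, or `None` if the batch
/// contains a cycle.
fn sort_local_dag(events: &HashMap<String, (u64, Vec<String>)>) -> Option<Vec<String>> {
    // For each event, the prev events that are also part of this batch.
    let mut pending: HashMap<&str, HashSet<&str>> = HashMap::new();
    // Reverse edges: prev event -> events that reference it.
    let mut children: HashMap<&str, Vec<&str>> = HashMap::new();

    for (id, (_, prevs)) in events {
        let local: HashSet<&str> = prevs
            .iter()
            .map(String::as_str)
            .filter(|prev| events.contains_key(*prev))
            .collect();
        for prev in &local {
            children.entry(*prev).or_default().push(id.as_str());
        }
        pending.insert(id.as_str(), local);
    }

    // Min-heap of events whose in-batch prevs are all resolved,
    // ordered by (timestamp, event ID).
    let mut ready: BinaryHeap<Reverse<(u64, &str)>> = pending
        .iter()
        .filter(|(_, prevs)| prevs.is_empty())
        .map(|(id, _)| Reverse((events[*id].0, *id)))
        .collect();

    let mut sorted = Vec::with_capacity(events.len());
    while let Some(Reverse((_, id))) = ready.pop() {
        sorted.push(id.to_owned());
        for child in children.get(id).into_iter().flatten() {
            let deps = pending.get_mut(child).expect("child was inserted above");
            deps.remove(id);
            if deps.is_empty() {
                ready.push(Reverse((events[*child].0, *child)));
            }
        }
    }

    // Fewer sorted events than inputs means some events were stuck in a cycle.
    (sorted.len() == events.len()).then_some(sorted)
}
```

Filtering out prev events that are not in the batch is what guarantees at least one starting node whenever the batch is acyclic, which is the point the comment inside `build_local_dag` makes.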
impl super::Service {
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
/// Uses `POST /_matrix/federation/v1/get_missing_events/{room_id}` to fill
/// gaps in the DAG.
///
/// Returns pdu and if we fetched it over federation the raw json.
/// This function walks backwards from `head`, requesting 10 more events on
/// each iteration (capped at `GET_MISSING_EVENTS_MAX_BATCH_SIZE`) until the
/// remote we're fetching from either stops returning new events, or the
/// min_depth is reached.
///
/// a. Look in the main timeline (pduid_pdu tree)
/// b. Look at outlier pdu tree
/// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation?
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
/// This function does not persist the events, but does validate them. The
/// caller is responsible for passing them through handle_incoming_pdu or
/// related functions.
///
/// Only the one `via` is asked for missing events, as multiplexing remotes
/// may result in the event tree being walked in a gappy or disordered
/// manner.
///
/// ## Parameters
///
/// - `room_id`: The room's ID.
/// - `head`: The event we are potentially missing prev_events for.
/// - `tail`: The most recently known events in the graph (typically forward
/// extremities).
/// - `via`: The server to ask for missing events.
/// - `min_depth`: Don't process events with a `depth` lower than this
/// value. Not massively useful, but can help short-circuit infinite loops
/// and weird edge paths.
#[tracing::instrument(name = "get_missing_events_bulk", skip_all)]
pub async fn get_missing_events(
&self,
origin: &'a ServerName,
events: Events,
create_event: &'a Pdu,
room_id: &'a RoomId,
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let back_off = |id| match self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(id)
room_id: &RoomId,
head: &PduEvent,
tail: Vec<OwnedEventId>,
via: &ServerName,
min_depth: UInt,
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
let start = Instant::now();
#[cfg(debug_assertions)]
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
},
};
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
trace!("Fetching {} outlier pdus", events.clone().count());
for id in events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
trace!("Found {id} in main timeline or outlier tree");
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
continue;
}
// c. Ask origin server over federation
// We also handle its auth chain here so we don't get a stack overflow in
// handle_outlier_pdu.
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
while let Some(next_id) = todo_auth_events.pop_front() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 60 * 2;
const MAX_DURATION: u64 = 60 * 60 * 8;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug_warn!(
tried = ?*tries,
elapsed = ?time.elapsed(),
"Backing off from {next_id}",
);
continue;
let missing_count = head
.prev_events()
.stream()
.fold(0_u8, |i, event_id| async move {
if !self.services.timeline.pdu_exists(event_id).await {
i.expected_add(1)
} else {
i
}
}
})
.await;
debug_assert_ne!(
missing_count, 0,
"event passed to get_missing_events is not missing any events (wasteful call)"
);
};
assert!(!tail.is_empty(), "empty tail");
assert_ne!(via, self.services.globals.server_name(), "cannot ask ourselves for events");
if events_all.contains(&next_id) {
// The iteration limit is in place to ensure that if the remote server leaves us
// in a state of infinite recursion (as old versions of continuwuity and
// predecessors would), we give up. However, get_missing_events doesn't return
// that many events per-request. Synapse returns 20, and conduwuit+ return 50.
// This means with a hard iteration limit, we might give up too early, before
// we get a chance to even come close to max_fetch_prev_events. As such, we'll
// calculate the limit based on that config option and the aforementioned
// averages.
let max_fetch = self.services.server.config.max_fetch_prev_events;
let iteration_limit = max_fetch.saturating_div(20).max(10);
let mut discovered = HashMap::with_capacity(head.prev_events.len());
let mut latest_events: Vec<OwnedEventId> = vec![head.event_id().to_owned()];
debug!(elapsed=?start.elapsed(),
%room_id,
event_id=%head.event_id(),
%iteration_limit,
"Fetching any missing events for head event",
);
for iteration in 0..iteration_limit {
let limit = iteration
.expected_add(1)
.saturating_mul(10)
.min(GET_MISSING_EVENTS_MAX_BATCH_SIZE.try_into().expect(
"GET_MISSING_EVENTS_MAX_BATCH_SIZE (usize) should fit in u16 (<= 65535)",
))
.max(
// This max call ensures we fetch *at least* all the prev events the
// head has.
u16::try_from(head.prev_events.len())
.expect("cannot have more than 20 prev events, which fits in u16"),
);
debug_info!(elapsed=?start.elapsed(),
%limit,
%via,
%iteration,
%iteration_limit,
discovered=discovered.len(),
%min_depth,
"Attempting to gap fill missing events"
);
let response: get_missing_events::v1::Response = self
.services
.sending
.send_federation_request(
via,
assign!(
get_missing_events::v1::Request::new(
room_id.to_owned(),
tail.clone(),
latest_events.clone()
),
{limit: limit.into(), min_depth}
),
)
.await?;
if response.events.is_empty() {
debug_info!(
elapsed=?start.elapsed(),
%via,
"Finished gap filling missing events (remote returned no more events)."
);
break;
}
debug_info!(
elapsed=?start.elapsed(),
"Got {} events back from remote",
response.events.len()
);
latest_events.clear();
for raw_event in response.events {
let (_, event_id, pdu_json) = self.parse_incoming_pdu(&raw_event).await?;
let pdu = PduEvent::from_id_val(&event_id, pdu_json).map_err(|e| {
err!(Request(BadJson("Failed to parse gapfilled event {event_id}: {e}")))
})?;
if discovered.contains_key(&event_id) {
// We already received this event.
trace!("Already received {event_id}");
continue;
}
if self.services.timeline.pdu_exists(&next_id).await {
trace!("Found {next_id} in db");
continue;
}
debug!("Fetching {next_id} over federation from {origin}.");
match self
if self
.services
.sending
.send_federation_request(
origin,
get_event::v1::Request::new((*next_id).to_owned()),
)
.timeline
.non_outlier_pdu_exists(&event_id)
.await
{
| Ok(res) => {
debug!("Got {next_id} over federation from {origin}");
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
back_off((*next_id).to_owned());
continue;
};
let Ok((calculated_event_id, value)) =
gen_event_id_canonical_json(&res.pdu, &room_version_rules)
else {
back_off((*next_id).to_owned());
continue;
};
if calculated_event_id != *next_id {
warn!(
"Server didn't return event id we requested: requested: \
{next_id}, we got {calculated_event_id}. Event: {:?}",
&res.pdu
);
}
if let Some(auth_events) = value
.get("auth_events")
.and_then(CanonicalJsonValue::as_array)
{
for auth_event in auth_events {
match serde_json::from_value::<OwnedEventId>(
auth_event.clone().into(),
) {
| Ok(auth_event) => {
trace!(
"Found auth event id {auth_event} for event \
{next_id}"
);
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
}
events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
},
| Err(e) => {
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
back_off((*next_id).to_owned());
},
// NOTE: we explicitly check for *non*-outlier events here, as if we end
// up discovering outlier events, we will be able to upgrade them
// immediately.
trace!("Already have {event_id} as a timeline PDU");
continue;
}
}
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
}
if pdu.depth < min_depth {
debug_warn!(
elapsed=?start.elapsed(),
"Received PDU with depth {} below min_depth {}",
pdu.depth,
min_depth
);
discovered.insert(event_id.clone(), pdu);
continue;
}
let mut pdus = Vec::with_capacity(events_with_auth_events.len());
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Some(local_pdu) = local_pdu {
trace!("Found {id} in main timeline or outlier tree");
pdus.push((local_pdu.clone(), None));
}
for (next_id, value) in events_in_reverse_order.into_iter().rev() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug!("Backing off from {next_id}");
for prev_event_id in pdu.prev_events() {
if discovered.contains_key(prev_event_id) {
// We already received this event.
trace!("Already received prev event {prev_event_id}");
continue;
}
if self
.services
.timeline
.non_outlier_pdu_exists(prev_event_id)
.await
{
// NOTE: we explicitly check for *non*-outlier events here, as if we end
// up discovering outlier events, we will be able to upgrade them
// immediately.
trace!("Already have prev event {prev_event_id} as a timeline PDU");
continue;
}
if let Ok(outlier) = self.services.timeline.get_pdu(prev_event_id).await {
// We already have this PDU as an outlier, don't ask for
// it. However, if we are missing any prev events for it, add it to the
// latest events anyway.
let outlier_missing_prevs = outlier
.prev_events()
.stream()
.fold(0_u8, |i, event_id| async move {
if !self.services.timeline.pdu_exists(event_id).await {
i.expected_add(1)
} else {
i
}
})
.await;
if outlier_missing_prevs > 0 {
trace!("Missing {outlier_missing_prevs} PDU(s) for prev event");
latest_events.push(prev_event_id.to_owned());
}
trace!("Had {prev_event_id} as an outlier already, skipping discovery");
discovered.insert(prev_event_id.to_owned(), outlier);
continue;
}
trace!("Missing prev {prev_event_id} of {event_id}");
latest_events.push(prev_event_id.to_owned());
}
trace!("Discovered {event_id}");
discovered.insert(event_id.clone(), pdu);
}
if latest_events.is_empty() {
debug!(elapsed=?start.elapsed(),
%limit,
%via,
%iteration,
discovered=discovered.len(),
"No more events to fetch."
);
break;
}
if discovered.len() >= self.services.server.config.max_fetch_prev_events.into() {
// Stupid hack, debug_error!() drops the log to a DEBUG when not in debug mode,
// which is bad because this should at least produce a warning. It's an error in
// debug mode because this can be important, but typically not much can be done
// about it as a user.
#[cfg(debug_assertions)]
error!(elapsed=?start.elapsed(),
discovered=discovered.len(),
max_fetch_prev_events=self.services.server.config.max_fetch_prev_events,
%iteration,
%iteration_limit,
%via,
event_id=%head.event_id(),
%room_id,
"Encountered a gap too large to fill, giving up"
);
#[cfg(not(debug_assertions))]
warn!(elapsed=?start.elapsed(),
discovered=discovered.len(),
max_fetch_prev_events=self.services.server.config.max_fetch_prev_events,
%iteration,
%iteration_limit,
%via,
event_id=%head.event_id(),
%room_id,
"Encountered a gap too large to fill"
);
break;
}
}
trace!(elapsed=?start.elapsed(), "Finished get_missing_events");
Ok(discovered)
}
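The two pieces of arithmetic in `get_missing_events` (the dynamic iteration limit and the per-request batch size) are easier to check in isolation. A minimal sketch, assuming plain `u16` inputs; the helper names `iteration_limit` and `batch_limit` are hypothetical and do not exist in the diff:

```rust
/// Same value as the constant in the diff, typed as `u16` for this sketch.
const GET_MISSING_EVENTS_MAX_BATCH_SIZE: u16 = 50;

/// One iteration per 20 events of `max_fetch_prev_events` (the smallest
/// batch size the comment in the diff attributes to remotes), never fewer than 10.
fn iteration_limit(max_fetch_prev_events: u16) -> u16 {
    max_fetch_prev_events.saturating_div(20).max(10)
}

/// Batch size for a given zero-based iteration: grows by 10 per iteration,
/// is capped at the maximum batch size, and is never smaller than the number
/// of prev events the head references.
fn batch_limit(iteration: u16, head_prev_events: u16) -> u16 {
    iteration
        .saturating_add(1)
        .saturating_mul(10)
        .min(GET_MISSING_EVENTS_MAX_BATCH_SIZE)
        .max(head_prev_events)
}
```

With the new default of 1024 for `max_fetch_prev_events`, this gives 51 iterations, and the batch size reaches the cap of 50 on the fifth request.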
/// Sends a `GET /_matrix/federation/v1/event/{event_id}` request to the
/// target `remote`, parses the resulting PDU, and ensures the remote
/// returned the correct event.
/// Allows `fetch_and_handle_missing_events` to atomically fetch events from
/// multiple remotes in parallel.
async fn fetch_event_via(
&self,
remote: OwnedServerName,
event_id: OwnedEventId,
room_version_rules: &RoomVersionRules,
) -> conduwuit::Result<(OwnedEventId, CanonicalJsonObject)> {
let res = self
.services
.sending
.send_federation_request(&remote, get_event::v1::Request::new(event_id.clone()))
.await?;
let (calculated_event_id, value) = self
.parse_incoming_pdu_with_known_room(&res.pdu, room_version_rules)
.await?;
if calculated_event_id != event_id {
Err!(Request(BadJson(warn!(
expected=%event_id,
received=%calculated_event_id,
"Server didn't return event id we requested",
))))
} else {
Ok((event_id, value))
}
}
async fn fetch_event_vias(
&self,
candidates: impl Iterator<Item = &OwnedServerName>,
event_id: &EventId,
room_version_rules: &RoomVersionRules,
) -> conduwuit::Result<(OwnedEventId, CanonicalJsonObject)> {
if let Ok(pdu_json) = self.services.timeline.get_pdu_json(event_id).await {
return Ok((event_id.to_owned(), pdu_json));
}
let futures = candidates
.map(|remote| {
Box::pin(self.fetch_event_via(
remote.to_owned(),
event_id.to_owned(),
room_version_rules,
))
})
.collect::<Vec<_>>();
select_ok(futures).await.map(|(res, _)| res)
}
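`fetch_event_vias` races one request per candidate server and keeps the first success. The same "first success wins" pattern can be sketched with threads and a channel from the standard library. This is an analogy under stated assumptions, not the async `select_ok` used in the diff: `first_ok` is a hypothetical helper, each task stands in for one federation request, and an empty task list yields `Err(None)` here instead of panicking.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs every task on its own thread and returns the first `Ok` result.
/// If all tasks fail, returns the last error observed; if there were no
/// tasks at all, returns `Err(None)`.
fn first_ok<T, E, F>(tasks: Vec<F>) -> Result<T, Option<E>>
where
    T: Send + 'static,
    E: Send + 'static,
    F: FnOnce() -> Result<T, E> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    for task in tasks {
        let tx = tx.clone();
        thread::spawn(move || {
            // The receiver may already be gone if another task won; ignore that.
            let _ = tx.send(task());
        });
    }
    // Drop the original sender so the receive loop ends once all tasks finish.
    drop(tx);

    let mut last_err = None;
    for result in rx {
        match result {
            Ok(value) => return Ok(value),
            Err(e) => last_err = Some(e),
        }
    }
    Err(last_err)
}
```

Returning early drops the receiver, so slower tasks simply fail to send and exit; they are not cancelled, which differs from dropping the remaining futures in the async version.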
/// Asks remote servers for any individual events that are missing, also
/// known as "atomic fetch". Should only be used for fetching missing auth
/// events or resolving missing events from state_ids. For all other uses,
/// use get_missing_events.
///
/// This function manually walks auth_events trees in a breadth-first
/// search, and persists all fetched events as outliers when all the
/// backwards extremities have been resolved.
#[tracing::instrument(name = "get_missing_auth_events_atomic", skip_all)]
pub(super) async fn fetch_and_handle_auth_events<Pdu>(
&self,
origin: &ServerName,
events: Vec<OwnedEventId>,
create_event: &Pdu,
room_id: &RoomId,
) -> HashMap<OwnedEventId, PduEvent>
where
Pdu: Event + Send + Sync,
{
let start = Instant::now();
let room_version_rules =
&get_room_version_rules(create_event).unwrap_or(RoomVersionRules::V1);
let mut candidates = self
.services
.timeline
.candidate_backfill_servers(room_id)
.await;
candidates.insert(origin.to_owned());
assert!(!candidates.is_empty(), "no candidates to fetch missing events from");
let mut discovered_events =
HashMap::with_capacity(events.len().saturating_add(events.len().saturating_mul(3)));
trace!(
elapsed=?start.elapsed(),
"Fetching {} unknown PDUs on demand from {} candidates",
events.len(),
candidates.len()
);
let mut seen: HashMap<OwnedEventId, u8> = HashMap::new();
for apex_event_id in &events {
let mut todo: VecDeque<OwnedEventId> = [apex_event_id.to_owned()].into();
while let Some(target_id) = todo.pop_front() {
if discovered_events.contains_key(&target_id) {
continue;
}
if let Ok(local_pdu) = self.services.timeline.get_pdu(&target_id).await {
trace!(elapsed=?start.elapsed(), "Found {target_id} in db");
let mut obj = local_pdu.into_canonical_object();
obj.remove("event_id");
discovered_events.insert(target_id.clone(), obj);
continue;
}
let attempts = seen.get(&*target_id).copied().unwrap_or_default();
if attempts >= 5 {
debug_error!(
elapsed=?start.elapsed(),
%attempts,
%target_id,
"Could not fetch missing event after 5 attempts, giving up"
);
continue;
}
trace!("Handling outlier {next_id}");
match Box::pin(self.handle_outlier_pdu(
origin,
create_event,
&next_id,
room_id,
value.clone(),
true,
))
.await
debug!(elapsed=?start.elapsed(),"Fetching {target_id} over federation");
let value = match self
.fetch_event_vias(candidates.iter(), &target_id, room_version_rules)
.await
{
| Ok((pdu, json)) =>
if next_id == *id {
trace!("Handled outlier {next_id} (original request)");
pdus.push((pdu, Some(json)));
},
| Ok((_, x)) => x,
| Err(e) => {
warn!("Authentication of event {next_id} failed: {e:?}");
back_off(next_id);
warn!(elapsed=?start.elapsed(),"failed to fetch missing event {target_id} from any candidate: {e}");
continue;
},
};
let auth_events =
match expect_event_id_array(&value, "auth_events").map_err(|e| {
err!(Request(BadJson(warn!(
elapsed=?start.elapsed(),
event_id=%target_id,
"Failed to parse event fetched from remote: {e}"
))))
}) {
| Ok(auth_events) => auth_events,
| Err(e) => {
warn!(
elapsed=?start.elapsed(),
?e,
"event {target_id} is malformed (bad auth_events), skipping"
);
continue;
},
};
let mut have_all_auth = true;
for auth_event_id in auth_events {
if let Ok(local_pdu) = self.services.timeline.get_pdu(&auth_event_id).await {
trace!(elapsed=?start.elapsed(),"Found auth event {auth_event_id} in db");
let mut obj = local_pdu.into_canonical_object();
obj.remove("event_id");
discovered_events.insert(auth_event_id.clone(), obj);
continue;
}
if discovered_events.contains_key(&auth_event_id) {
trace!(elapsed=?start.elapsed(),%auth_event_id, "Already found auth event");
continue;
}
debug!(elapsed=?start.elapsed(),"Missing auth event {auth_event_id} for event {target_id}");
seen.insert(
auth_event_id.clone(),
seen.get(&auth_event_id)
.copied()
.unwrap_or_default()
.saturating_add(1),
);
todo.push_back(auth_event_id);
have_all_auth = false;
}
// If any auth events are still missing, this PDU is pushed back onto the end of
// the queue so that it is resolved once all of its auth events have been fetched.
if have_all_auth {
debug!(elapsed=?start.elapsed(),%target_id, "Have all auth events");
discovered_events.insert(target_id, value);
} else {
debug_warn!(elapsed=?start.elapsed(),
"Fetched {target_id} but missing some auth events, will have to re-fetch."
);
seen.insert(target_id.clone(), attempts.saturating_add(1));
todo.push_back(target_id);
}
}
}
trace!("Fetched and handled {} outlier pdus", pdus.len());
let refmap: HashMap<OwnedEventId, &CanonicalJsonObject> = discovered_events
.iter()
.map(|(id, data)| (id.clone(), data))
.collect();
let seeded_ordered = build_local_dag(&refmap)
.await
.expect("failed to build local DAG");
let mut pdus = HashMap::with_capacity(seeded_ordered.len());
for discovered_event_id in seeded_ordered {
let pdu_json = discovered_events.remove(&discovered_event_id).unwrap();
debug_info!(
elapsed=?start.elapsed(),
"Handling missing event {discovered_event_id} as outlier"
);
assert_eq!(pdu_json.get("event_id"), None, "pdu_json had event_id");
match Box::pin(self.handle_outlier_pdu(
origin,
create_event,
&discovered_event_id,
room_id,
pdu_json,
))
.await
{
| Ok((pdu, _)) => {
trace!(elapsed=?start.elapsed(), "Persisted {discovered_event_id}");
let _ = pdus.insert(discovered_event_id, pdu);
},
| Err(e) => warn!(
elapsed=?start.elapsed(),
"Authentication of event {discovered_event_id} failed: {e:?}"
),
}
}
trace!(
elapsed=?start.elapsed(),
"Finished fetch_and_handle_auth_events: fetched and handled {} missing PDUs",
pdus.len()
);
pdus.retain(|id, _| events.contains(id)); // Only return the events that were requested
trace!(elapsed=?start.elapsed(), "Filtered return value down to {} PDUs", pdus.len());
pdus
}
/// Similar to `fetch_and_handle_auth_events`, but simply walks the
/// prev events tree instead of the auth events tree. Additionally, it does
/// not *handle* fetched PDUs in any capacity.
#[tracing::instrument(name = "get_missing_prev_events_atomic", skip_all)]
pub(super) async fn fetch_prev_events<Pdu>(
&self,
origin: &ServerName,
events: Vec<OwnedEventId>,
create_event: &Pdu,
room_id: &RoomId,
) -> HashMap<OwnedEventId, PduEvent>
where
Pdu: Event + Send + Sync,
{
let room_version_rules =
&get_room_version_rules(create_event).unwrap_or(RoomVersionRules::V1);
let mut candidates = self
.services
.timeline
.candidate_backfill_servers(room_id)
.await;
candidates.insert(origin.to_owned());
let mut todo: VecDeque<OwnedEventId> = VecDeque::from(events);
let mut discovered_events = HashMap::new();
while let Some(next_id) = todo.pop_front() {
if discovered_events.len() >= self.services.server.config.max_fetch_prev_events.into()
{
debug_warn!(
"Encountered a gap too large to fill, giving up (fetched {} events)",
discovered_events.len()
);
break;
}
if discovered_events.contains_key(&next_id) {
continue;
}
let pdu = match self
.fetch_event_vias(candidates.iter(), &next_id, room_version_rules)
.await
{
| Ok((_, data)) => data,
| Err(e) => {
warn!("Failed to fetch prev event {next_id} from any candidate: {e}");
continue;
},
};
let prev_events = match expect_event_id_array(&pdu, "prev_events").map_err(|e| {
err!(Request(BadJson(warn!(
event_id=%next_id,
"Failed to parse event fetched from remote: {e}"
))))
}) {
| Ok(prev_events) => prev_events,
| Err(e) => {
warn!(?e, "event {next_id} is malformed (bad prev_events), skipping");
continue;
},
};
let missing_prev = prev_events
.iter()
.stream()
.broad_filter_map(|event_id| async {
if discovered_events.contains_key(event_id)
|| self.services.timeline.pdu_exists(event_id).await
{
None
} else {
Some(event_id.to_owned())
}
})
.collect::<Vec<_>>()
.await;
todo.extend(missing_prev);
discovered_events.insert(
next_id.clone(),
PduEvent::from_id_val(&next_id, pdu).expect("fetched PDU was already validated"),
);
}
discovered_events
}
}
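The prev-event walk in `fetch_prev_events` can be modelled without any async or federation machinery. The following is a minimal sketch, assuming an in-memory map stands in for remote fetches; `walk_missing_prevs`, `remote`, and `known` are hypothetical names for illustration and are not part of this change. It shows the same shape: a FIFO queue, a dedupe check, and a hard cap on how many events may be discovered before giving up on the gap.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Walks `prev_events` edges breadth-first from `start`.
///
/// `remote` is a hypothetical stand-in for a federation fetch (event ID to
/// its prev event IDs), `known` stands in for events already stored locally,
/// and `limit` plays the role of `max_fetch_prev_events`.
fn walk_missing_prevs<'a>(
    remote: &HashMap<&'a str, Vec<&'a str>>,
    known: &HashSet<&'a str>,
    start: &[&'a str],
    limit: usize,
) -> Vec<&'a str> {
    let mut todo: VecDeque<&'a str> = start.iter().copied().collect();
    let mut seen: HashSet<&'a str> = HashSet::new();
    let mut discovered: Vec<&'a str> = Vec::new();

    while let Some(next_id) = todo.pop_front() {
        // Stop once the gap is larger than we are willing to fill.
        if discovered.len() >= limit {
            break;
        }
        // Skip anything that was already visited.
        if !seen.insert(next_id) {
            continue;
        }
        // A missing entry models "no candidate could serve this event".
        let Some(prev_events) = remote.get(next_id) else {
            continue;
        };
        // Queue only the parents we neither hold locally nor visited already.
        for prev in prev_events {
            if !known.contains(*prev) && !seen.contains(*prev) {
                todo.push_back(*prev);
            }
        }
        discovered.push(next_id);
    }
    discovered
}
```

Because the queue is FIFO, events closest to the starting set are discovered first, so hitting the limit drops the oldest part of the gap rather than the newest.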
+134 -103
@@ -1,124 +1,155 @@
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
iter::once,
};
use std::{collections::HashMap, time::Instant};
use conduwuit::{
Event, PduEvent, Result, debug_warn, err,
state_res::{self},
};
use futures::{FutureExt, future};
use ruma::{
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
int, uint,
Event, PduEvent, debug, debug_info, debug_warn, trace,
utils::{BoolExt, IterStream, stream::BroadbandExt},
};
use futures::StreamExt;
use ruma::{CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName};
use super::check_room_id;
use crate::rooms::event_handler::build_local_dag;
impl super::Service {
#[allow(clippy::type_complexity)]
pub(super) async fn fetch_prev<'a, Pdu, Events>(
/// Fetches any missing prev_events for this event and persists them before
/// returning.
pub(super) async fn fetch_prevs(
&self,
origin: &ServerName,
create_event: &Pdu,
room_id: &RoomId,
create_event: &PduEvent,
incoming_pdu: &PduEvent,
origin: &ServerName,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
initial_set: Events,
) -> Result<(
Vec<OwnedEventId>,
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let num_ids = initial_set.clone().count();
let mut eventid_info = HashMap::new();
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids);
let mut todo_outlier_stack: VecDeque<OwnedEventId> =
initial_set.map(ToOwned::to_owned).collect();
) -> conduwuit::Result<()> {
let start = Instant::now();
let mut missing = incoming_pdu
.prev_events()
.stream()
.broad_filter_map(|event_id| async move {
self.services
.timeline
.get_non_outlier_pdu_json(event_id)
.await
.is_ok()
.or(|| event_id.to_owned())
})
.collect::<Vec<_>>()
.await;
if missing.is_empty() {
debug!(elapsed=?start.elapsed(), event_id=%incoming_pdu.event_id(), "No missing prev events.");
return Ok(());
}
debug!(elapsed=?start.elapsed(), %room_id, event_id=%incoming_pdu.event_id(), ?missing, "Fetching previous events");
let tail = self
.services
.state
.get_forward_extremities(room_id)
.collect::<Vec<_>>()
.await;
let mut amount = 0;
let mut gapfilled = self
.get_missing_events(
room_id,
incoming_pdu,
tail,
origin,
self.services
.metadata
.get_mindepth(room_id)
.await
.saturating_sub(
u8::try_from(incoming_pdu.prev_events.len())
.unwrap()
.saturating_mul(2)
.into(),
),
)
.await?;
debug_info!(elapsed=?start.elapsed(), "Fetched {} missing events", gapfilled.len());
missing.retain(|eid| !gapfilled.contains_key(eid));
if !missing.is_empty() {
debug_warn!(elapsed=?start.elapsed(), "Still missing {} events, falling back to atomic fetch.", missing.len());
gapfilled.extend(
self.fetch_prev_events(origin, missing, create_event, room_id)
.await,
);
}
while let Some(prev_event_id) = todo_outlier_stack.pop_front() {
self.services.server.check_running()?;
// Persist all fetched events
let mapped = gapfilled
.iter()
.map(|(eid, evt)| {
let mut obj = evt.to_canonical_object();
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
(eid.clone(), obj)
})
.collect::<HashMap<_, _>>();
let to_persist = if mapped.len() <= 1 {
mapped.keys().map(ToOwned::to_owned).collect()
} else {
let refmap: HashMap<OwnedEventId, &CanonicalJsonObject> =
mapped.iter().map(|(id, data)| (id.clone(), data)).collect();
build_local_dag(&refmap).await?
};
let job_start = Instant::now();
trace!("Starting to persist {} prev events", to_persist.len());
for (i, event_id) in to_persist.iter().enumerate() {
debug!(
elapsed=?start.elapsed(),
"Persisting fetched prev event: {event_id} ({}/{})",
i.saturating_add(1),
to_persist.len(),
);
let obj = mapped.get(event_id).cloned().unwrap();
let persist_start = Instant::now();
match self
.fetch_and_handle_outliers(
origin,
once(prev_event_id.as_ref()),
create_event,
room_id,
)
.boxed()
.handle_outlier_pdu(origin, create_event, event_id, room_id, obj)
.await
.pop()
{
| Some((pdu, mut json_opt)) => {
check_room_id(room_id, &pdu)?;
let limit = self.services.server.config.max_fetch_prev_events;
if amount > limit {
debug_warn!("Max prev event limit reached! Limit: {limit}");
graph.insert(prev_event_id.clone(), HashSet::new());
continue;
}
if json_opt.is_none() {
json_opt = self
.services
.outlier
.get_outlier_pdu_json(&prev_event_id)
.await
.ok();
}
if let Some(json) = json_opt {
if pdu.origin_server_ts() > first_ts_in_room {
amount = amount.saturating_add(1);
for prev_prev in pdu.prev_events() {
if !graph.contains_key(prev_prev) {
todo_outlier_stack.push_back(prev_prev.to_owned());
}
}
graph.insert(
prev_event_id.clone(),
pdu.prev_events().map(ToOwned::to_owned).collect(),
);
} else {
// Time based check failed
graph.insert(prev_event_id.clone(), HashSet::new());
}
eventid_info.insert(prev_event_id.clone(), (pdu, json));
} else {
// Get json failed, so this was not fetched over federation
graph.insert(prev_event_id.clone(), HashSet::new());
}
},
| _ => {
// Fetch and handle failed
graph.insert(prev_event_id.clone(), HashSet::new());
| Ok((pdu, val)) if pdu.origin_server_ts() >= first_ts_in_room => {
Box::pin(self.upgrade_outlier_to_timeline_pdu(
pdu,
val,
create_event,
origin,
room_id,
))
.await
.inspect_err(|e| {
debug_warn!(
total_elapsed=?start.elapsed(),
job_elapsed=?job_start.elapsed(),
task_elapsed=?persist_start.elapsed(),
"Failed to upgrade prev event {event_id}: {e}",
);
})
.inspect(|_| {
debug_info!(
total_elapsed=?start.elapsed(),
job_elapsed=?job_start.elapsed(),
task_elapsed=?persist_start.elapsed(),
"Upgraded prev event {event_id}",
);
})
.ok();
},
| Err(e) => debug_warn!(
total_elapsed=?start.elapsed(),
job_elapsed=?job_start.elapsed(),
task_elapsed=?persist_start.elapsed(),
"Failed to persist prev event {event_id}: {e}",
),
| _ => {},
}
}
let event_fetch = |event_id| {
let origin_server_ts = eventid_info
.get(&event_id)
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
// This return value is the key used for sorting events,
// events are then sorted by power level, time,
// and lexically by event_id.
future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
};
let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch)
.await
.map_err(|e| err!(Database(error!("Error sorting prev events: {e}"))))?;
Ok((sorted, eventid_info))
// NOTE because i keep forgetting: the caller persists incoming_pdu.
// we only care about its prev events
trace!(
total_elapsed=?start.elapsed(),
persist_elapsed=?job_start.elapsed(),
);
Ok(())
}
}
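`fetch_prevs` relies on `build_local_dag` to decide the order in which fetched events are persisted. Its implementation is not shown in this diff, so the following is only a hypothetical stand-in, assuming the goal is a parents-first ordering. `order_parents_first` is an invented name, and the algorithm shown is a plain Kahn topological sort, which may differ from what `build_local_dag` actually does.

```rust
use std::collections::{BTreeMap, BTreeSet, VecDeque};

/// Orders event IDs so that each event comes after all of its parents that
/// are part of the same batch. Parents outside the batch are ignored.
/// Returns `None` if the batch contains a cycle.
fn order_parents_first<'a>(graph: &BTreeMap<&'a str, Vec<&'a str>>) -> Option<Vec<String>> {
    // Number of not-yet-emitted in-batch parents for each event.
    let mut pending: BTreeMap<&'a str, usize> = BTreeMap::new();
    // Reverse edges: parent -> events that reference it.
    let mut children: BTreeMap<&'a str, Vec<&'a str>> = BTreeMap::new();

    for (id, parents) in graph {
        // Deduplicate parents and drop those that are not in this batch.
        let in_batch: BTreeSet<&'a str> = parents
            .iter()
            .copied()
            .filter(|parent| graph.contains_key(*parent))
            .collect();
        pending.insert(*id, in_batch.len());
        for parent in in_batch {
            children.entry(parent).or_default().push(*id);
        }
    }

    // Start with events whose parents are all outside the batch.
    let mut ready: VecDeque<&'a str> = pending
        .iter()
        .filter(|(_, unresolved)| **unresolved == 0)
        .map(|(id, _)| *id)
        .collect();

    let mut ordered = Vec::with_capacity(graph.len());
    while let Some(id) = ready.pop_front() {
        ordered.push(id.to_owned());
        for child in children.get(id).into_iter().flatten() {
            let unresolved = pending.get_mut(*child).expect("child is part of the batch");
            *unresolved -= 1;
            if *unresolved == 0 {
                ready.push_back(*child);
            }
        }
    }

    // Anything left unemitted is part of a cycle.
    (ordered.len() == graph.len()).then_some(ordered)
}
```

Persisting in this order means each event's in-batch parents already exist locally by the time the event itself is handled.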
+352 -49
@@ -1,34 +1,43 @@
use std::collections::{HashMap, hash_map};
use conduwuit::{Err, Event, Result, debug, debug_warn, err};
use futures::FutureExt;
use ruma::{
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
events::StateEventType,
use std::{
cmp::max,
collections::{HashMap, HashSet, hash_map},
hash::{BuildHasherDefault, DefaultHasher},
time::{Duration, Instant},
};
use crate::rooms::short::ShortStateKey;
use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_warn, err, info, trace,
utils::{BoolExt, IterStream},
warn,
};
use futures::{StreamExt, TryFutureExt, future::select_ok};
use ruma::{
EventId, OwnedEventId, OwnedRoomId, RoomId, ServerName,
api::federation::event::{get_room_state, get_room_state_ids},
};
use crate::{conduwuit::utils::stream::BroadbandExt, rooms::short::ShortStateKey};
impl super::Service {
/// Call /state_ids to find out what the state at this pdu is. We trust the
/// server's response to some extent (sic), but we still do a lot of checks
/// on the events
#[tracing::instrument(
level = "debug",
skip_all,
fields(%origin),
)]
pub(super) async fn fetch_state<Pdu>(
/// Asks a remote server what the state at this event is.
/// It first attempts to call `GET /_matrix/federation/v1/state_ids` (fast).
/// If any events are missing, they are fetched from the remote, and
/// persisted as outliers, before being returned back to this function. If
/// we are missing a lot of events locally (at least 50, and at least 25% of
/// the state), this function falls back to requesting the full state in PDU
/// format from the remote (`GET /_matrix/federation/v1/state`, very slow in
/// large rooms), and persists them directly.
#[tracing::instrument(skip_all)]
pub(super) async fn fetch_state(
&self,
origin: &ServerName,
create_event: &Pdu,
create_event: &PduEvent,
room_id: &RoomId,
event_id: &EventId,
) -> Result<Option<HashMap<u64, OwnedEventId>>>
where
Pdu: Event + Send + Sync,
{
let res = self
) -> Result<HashMap<u64, OwnedEventId>> {
let start = Instant::now();
trace!(%origin, "Asking remote for state_ids");
let res: get_room_state_ids::v1::Response = match self
.services
.sending
.send_federation_request(
@@ -36,21 +45,189 @@ pub(super) async fn fetch_state<Pdu>(
get_room_state_ids::v1::Request::new(event_id.to_owned(), room_id.to_owned()),
)
.await
.inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?;
.inspect_err(
|e| debug_warn!(elapsed=?start.elapsed(), "Fetching state for event failed: {e}"),
) {
| Ok(resp) => Ok(resp),
| Err(e) =>
if e.is_not_found() {
self.fetch_state_ids_from_backfill_servers(
event_id.to_owned(),
room_id.to_owned(),
)
.await
} else {
Err(e)
},
}?;
debug!("Fetching state events");
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
let state_vec = self
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
.boxed()
debug!(elapsed=?start.elapsed(), events = res.pdu_ids.len(), "Fetching state events");
let mut state_events: HashMap<OwnedEventId, PduEvent> =
HashMap::with_capacity(res.pdu_ids.len());
let to_fetch: Vec<OwnedEventId> = res
.pdu_ids
.clone()
.into_iter()
.stream()
.broad_filter_map(|event_id| async move {
self.services
.timeline
.pdu_exists(&event_id)
.await
.or_some(event_id)
})
.collect()
.await;
if to_fetch.is_empty() {
debug!(elapsed=?start.elapsed(), "All required state events are already known.");
state_events = res
.pdu_ids
.iter()
.stream()
.broad_filter_map(|event_id| async move {
Some((
event_id.clone(),
self.services
.timeline
.get_pdu(event_id)
.await
.expect("Event disappeared between filtering and fetching"),
))
})
.collect()
.await;
assert_eq!(
state_events.len(),
res.pdu_ids.len(),
"Failed to load all required state events despite allegedly knowing all of them \
already",
);
} else {
let total_count = res.pdu_ids.len();
let missing_count = to_fetch.len();
let missing_threshold = max(50, total_count >> 2);
if missing_count >= missing_threshold {
// If there are at least 50 events to fetch and they make up 25% or more of the
// state, we would need to make a lot of atomic requests, so we'll just try
// to fetch the full state from the remote instead.
// Since this endpoint might fail in huge rooms, we fall back to atomic fetch
// if it errors or times out.
warn!(
elapsed=?start.elapsed(),
%missing_count,
%total_count,
%missing_threshold,
"Fetching full state from remote server for event"
);
let state_response = tokio::time::timeout(
Duration::from_secs(30),
self.fetch_full_state(origin, create_event, room_id, event_id),
)
.await;
info!(
elapsed=?start.elapsed(),
%missing_count,
%total_count,
%missing_threshold,
"Fetched full state from remote server for event"
);
let fetched_state = match state_response {
| Ok(Ok(state)) => {
// Filter to ensure we only use the PDUs we were expecting, preventing
// arbitrary state injection.
// Atomic fetch does not have this problem as each PDU is evaluated
// individually.
let expected: &HashSet<OwnedEventId, BuildHasherDefault<DefaultHasher>> =
&HashSet::from_iter(res.pdu_ids.clone());
state
.into_iter()
.stream()
.broad_filter_map(|(event_id, pdu)| async move {
expected.contains(&event_id).then_some((event_id, pdu))
})
.collect()
.await
},
| Ok(Err(e)) => {
warn!(
elapsed=?start.elapsed(),
error=?e,
%origin,
"Failed to fetch full state from remote, falling back to atomic fetch"
);
self.fetch_and_handle_auth_events(
origin,
res.pdu_ids.clone(),
create_event,
room_id,
)
.await
},
| Err(e) => {
warn!(
elapsed=?start.elapsed(),
error=?e,
%origin,
"Remote did not return room state in an acceptable timeframe, falling back to atomic fetch"
);
self.fetch_and_handle_auth_events(
origin,
res.pdu_ids.clone(),
create_event,
room_id,
)
.await
},
};
assert!(
!fetched_state.is_empty(),
"fetch_full_state or fetch_and_handle_auth_events returned empty state \
map"
);
state_events.extend(fetched_state);
} else {
state_events = res
.pdu_ids
.iter()
.stream()
.broad_filter_map(|event_id| async move {
self.services
.timeline
.get_pdu(event_id)
.await
.map(|p| (event_id.to_owned(), p))
.ok()
})
.collect()
.await;
assert!(
!state_events.is_empty(),
"Only missing {} events but read-ahead state vec was empty",
to_fetch.len()
);
debug!(
elapsed=?start.elapsed(),
to_fetch = to_fetch.len(),
"Fetching missing events for state from remote"
);
let fetched_state = self
.fetch_and_handle_auth_events(origin, to_fetch, create_event, room_id)
.await;
state_events.extend(fetched_state);
}
}
if state_events.is_empty() {
return Ok(HashMap::new());
}
let mut state: HashMap<ShortStateKey, OwnedEventId> =
HashMap::with_capacity(state_vec.len());
for (pdu, _) in state_vec {
let state_key = pdu
.state_key()
.ok_or_else(|| err!(Database("Found non-state pdu in state events.")))?;
HashMap::with_capacity(state_events.len());
debug!(elapsed=?start.elapsed(), events = state_events.len(), "Processing state events");
for (event_id, pdu) in state_events {
let state_key = pdu.state_key().ok_or_else(|| {
err!(Request(BadJson("Found non-state pdu in state events: {event_id}")))
})?;
let shortstatekey = self
.services
@@ -62,28 +239,154 @@ pub(super) async fn fetch_state<Pdu>(
| hash_map::Entry::Vacant(v) => {
v.insert(pdu.event_id().to_owned());
},
| hash_map::Entry::Occupied(_) => {
return Err!(Database(
"State event's type and state_key combination exists multiple times: \
{}, {}",
| hash_map::Entry::Occupied(existing) => {
return Err!(Request(Forbidden(
"State event's type and state_key combination exists multiple times \
({event_id} + {}): ({}, \"{}\")",
existing.get(),
pdu.kind(),
state_key
));
state_key,
)));
},
}
}
trace!(elapsed=?start.elapsed(), "fetch_state finished");
Ok(state)
}
// The original create event must still be in the state
let create_shortstatekey = self
async fn fetch_state_ids_from_backfill_servers(
&self,
event_id: OwnedEventId,
room_id: OwnedRoomId,
) -> Result<get_room_state_ids::v1::Response> {
let candidates = self
.services
.short
.get_shortstatekey(&StateEventType::RoomCreate, "")
.await?;
if state.get(&create_shortstatekey).map(AsRef::as_ref) != Some(create_event.event_id()) {
return Err!(Database("Incoming event refers to wrong create event."));
.timeline
.candidate_backfill_servers(&room_id)
.await;
if candidates.is_empty() {
return Err!(Request(NotFound(
"Cannot ask any other servers for the state at this event"
)));
}
debug!(%room_id, ?candidates, "Asking backfill servers for state_ids");
let futures = candidates.iter().map(|server_name| {
Box::pin(
self.services
.sending
.send_federation_request(
server_name,
get_room_state_ids::v1::Request::new(event_id.clone(), room_id.clone()),
)
.inspect_err(|e| {
debug_warn!("Fallback fetching state for event failed: {e}");
}),
)
});
Ok(select_ok(futures).await?.0)
}
Ok(Some(state))
/// Fetches the full state via `GET /_matrix/federation/v1/state` from a
/// remote server, and persists all the incoming auth chain events and
/// state events as outliers, for use later.
///
/// Any events that cannot be persisted are dropped with a warning.
pub(super) async fn fetch_full_state(
&self,
origin: &ServerName,
create_event: &PduEvent,
room_id: &RoomId,
event_id: &EventId,
) -> Result<HashMap<OwnedEventId, PduEvent>> {
let start = Instant::now();
trace!("Fetching full state from remote server");
let res: get_room_state::v1::Response = self
.services
.sending
.send_federation_request(
origin,
get_room_state::v1::Request::new(event_id.to_owned(), room_id.to_owned()),
)
.await
.inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?;
debug!(elapsed=?start.elapsed(), count = res.auth_chain.len(), "Handling incoming auth chain...");
res.auth_chain
.iter()
.stream()
.broad_filter_map(|raw_event_json| async {
if let Some(parsed) = self.parse_incoming_pdu(raw_event_json).await.ok()
&& parsed.0 == room_id
{
Some(parsed)
} else {
None
}
})
.for_each_concurrent(
None,
|(incoming_room_id, incoming_event_id, incoming_event_json)| async move {
self.handle_outlier_pdu(
origin,
create_event,
&incoming_event_id,
&incoming_room_id,
incoming_event_json,
)
.await
.inspect_err(|e| {
warn!(
%incoming_room_id,
%incoming_event_id,
?e,
"Failed to handle auth chain event from state fetch"
);
})
.ok();
},
)
.await;
debug!(elapsed=?start.elapsed(), count = res.pdus.len(), "Handling incoming state PDUs...");
let r = res
.pdus
.iter()
.stream()
.broad_filter_map(|raw_event_json| async {
if let Some(parsed) = self.parse_incoming_pdu(raw_event_json).await.ok()
&& parsed.0 == room_id
{
Some(parsed)
} else {
None
}
})
.broad_filter_map(
|(incoming_room_id, incoming_event_id, incoming_event_json)| async move {
self.handle_outlier_pdu(
origin,
create_event,
&incoming_event_id,
&incoming_room_id,
incoming_event_json,
)
.await
.inspect_err(|e| {
warn!(
elapsed=?start.elapsed(),
%incoming_room_id,
%incoming_event_id,
?e,
"Failed to handle state event from state fetch"
);
})
.ok()
},
)
.fold(HashMap::new(), |mut acc, (event, _)| async move {
acc.insert(event.event_id().to_owned(), event);
acc
})
.await;
trace!(elapsed=?start.elapsed(), "fetch_full_state finished");
Ok(r)
}
}
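Two decisions in `fetch_state` are easy to check in isolation: when to prefer the full-state endpoint, and how the full-state response is filtered against the IDs returned by `state_ids`. The sketch below restates both with plain standard-library types. `should_fetch_full_state` and `keep_expected` are hypothetical helper names, and events are represented as strings purely for illustration.

```rust
use std::collections::{HashMap, HashSet};

/// Mirrors `max(50, total_count >> 2)` from `fetch_state`: the full-state
/// path is taken only when at least 50 events are missing and they make up
/// at least a quarter of the state.
fn should_fetch_full_state(missing_count: usize, total_count: usize) -> bool {
    let missing_threshold = std::cmp::max(50, total_count >> 2);
    missing_count >= missing_threshold
}

/// Keeps only the returned events whose IDs were listed in the earlier
/// `state_ids` response, so a remote cannot smuggle in extra state.
/// Event bodies are plain strings here as a simplification.
fn keep_expected(
    returned: HashMap<String, String>,
    expected_ids: &[String],
) -> HashMap<String, String> {
    let expected: HashSet<&str> = expected_ids.iter().map(String::as_str).collect();
    returned
        .into_iter()
        .filter(|(event_id, _)| expected.contains(event_id.as_str()))
        .collect()
}
```

With these numbers, a room with 1000 state events needs 250 missing events before the full-state path is chosen, while a room with 60 state events needs 50.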
@@ -1,14 +1,14 @@
use std::{
collections::{BTreeMap, hash_map},
time::Instant,
collections::BTreeMap,
time::{Duration, Instant},
};
use conduwuit::{
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
info, trace, utils::stream::IterStream, warn,
Err, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, defer, err, error,
info, matrix::PartialPdu, result::DebugInspect, trace, warn,
};
use futures::{
FutureExt, TryFutureExt, TryStreamExt,
FutureExt, StreamExt,
future::{OptionFuture, try_join4},
};
use ruma::{
@@ -18,7 +18,7 @@
room::member::{MembershipState, RoomMemberEventContent},
},
};
use tracing::debug;
use tokio::sync::mpsc;
use crate::rooms::timeline::{RawPduId, pdu_fits};
@@ -111,7 +111,6 @@ impl super::Service {
/// room, if not soft fail it
#[tracing::instrument(
name = "pdu",
level = INFO_SPAN_LEVEL,
skip_all,
fields(%room_id, %event_id),
)]
@@ -153,9 +152,7 @@ pub async fn handle_incoming_pdu<'a>(
.and_then(|v| v.as_str())
.ok_or_else(|| err!("No sender in object"))
.and_then(|v| Ok(UserId::parse(v)?))
.map_err(|e| {
err!(Request(InvalidParam("PDU does not have a valid sender key: {e}")))
})?;
.map_err(|e| err!(Request(BadJson("PDU does not have a valid sender key: {e}"))))?;
let sender_acl_check: OptionFuture<_> = sender
.server_name()
@@ -235,7 +232,7 @@ pub async fn handle_incoming_pdu<'a>(
}}
let (incoming_pdu, val) = self
.handle_outlier_pdu(origin, create_event, event_id, room_id, value, false)
.handle_outlier_pdu(origin, create_event, event_id, room_id, value)
.await?;
// 8. if not timeline event: stop
@@ -243,71 +240,192 @@ pub async fn handle_incoming_pdu<'a>(
return Ok(None);
}
// Skip old events
// Skip events sent before we joined (they need to be persisted as backfilled
// events, not timeline events, which is handled elsewhere).
let first_ts_in_room = self
.services
.timeline
.first_pdu_in_room(room_id)
.await?
.origin_server_ts();
if incoming_pdu.origin_server_ts() < first_ts_in_room {
return Ok(None);
}
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
// These are timeline events
let (sorted_prev_events, mut eventid_info) = self
.fetch_prev(
origin,
create_event,
room_id,
first_ts_in_room,
incoming_pdu.prev_events(),
)
.await?;
debug!(
events = ?sorted_prev_events,
"Handling previous events"
);
debug!("Fetching and persisting any missing prev events");
Box::pin(self.fetch_prevs(
room_id,
create_event,
&incoming_pdu,
origin,
first_ts_in_room,
))
.await
.debug_inspect_err(|e| {
error!("Failed to fetch and persist incoming event's prev_events: {e:?}");
})?;
sorted_prev_events
.iter()
.try_stream()
.map_ok(AsRef::as_ref)
.try_for_each(|prev_id| {
self.handle_prev_pdu(
origin,
event_id,
room_id,
eventid_info.remove(prev_id),
create_event,
first_ts_in_room,
prev_id,
)
.inspect_err(move |e| {
warn!("Prev {prev_id} failed: {e}");
match self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(prev_id.into())
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
let tries = e.get().1.saturating_add(1);
*e.get_mut() = (Instant::now(), tries);
},
}
})
.map(|_| self.services.server.check_running())
})
.boxed()
.await?;
let is_dummy_event = incoming_pdu.event_type().to_string() == "org.matrix.dummy_event";
// Done with prev events, now handling the incoming event
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
.boxed()
.await
let pdu_id = Box::pin(self.upgrade_outlier_to_timeline_pdu(
incoming_pdu,
val,
create_event,
origin,
room_id,
))
.await?;
let extremities_count = self
.services
.state
.get_forward_extremities(room_id)
.count()
.await;
self.maybe_squash_extremities(room_id, extremities_count, is_dummy_event)
.await;
Ok(pdu_id)
}
async fn maybe_squash_extremities(
&self,
room_id: &RoomId,
extremities_count: usize,
is_dummy_event: bool,
) {
let (tx, fut) = {
if let Some(tx) = self.extremity_squashers.read().get(room_id)
&& !tx.is_closed()
{
(tx.clone(), None)
} else {
let mut map = self.extremity_squashers.upgradable_read();
if let Some(tx) = map.get(room_id)
&& !tx.is_closed()
{
(tx.clone(), None)
} else {
let (tx, rx) = mpsc::channel(100);
map.with_upgraded(|map| map.insert(room_id.to_owned(), tx.clone()));
(tx, Some(self.spawn_squasher(room_id, rx)))
}
}
};
if let Some(fut) = fut {
fut.await;
}
let _ = tx.try_send((extremities_count, is_dummy_event));
}
async fn spawn_squasher(&self, room_id: &RoomId, mut rx: mpsc::Receiver<(usize, bool)>) {
let Some(service) = self.me.upgrade() else {
return;
};
let room_id = room_id.to_owned();
self.services.server.runtime().spawn(async move {
let mut latest_extremity_count = None;
let mut non_dummy_event = false;
let mut closing = false;
let waker = tokio::time::sleep(Duration::from_mins(2));
tokio::pin!(waker);
loop {
tokio::select! {
msg = rx.recv() => {
if let Some((extremities_count, is_dummy_event)) = msg {
latest_extremity_count = Some(extremities_count);
non_dummy_event = non_dummy_event || !is_dummy_event;
#[allow(clippy::arithmetic_side_effects)]
waker.as_mut().reset(tokio::time::Instant::now() + Duration::from_mins(1));
} else {
{let mut map = service.extremity_squashers.write();
if let Some(tx) = map.get(&room_id) && tx.is_closed() {
map.remove(&room_id);
}}
if let Some(count) = latest_extremity_count {
if non_dummy_event && count >= service.services.server.config.dummy_event_threshold.into() {
Self::squash_extremities(&service, &room_id, count).await;
}
}
break;
}
}
() = &mut waker, if !closing => {
if let Some(count) = latest_extremity_count {
if non_dummy_event && count >= service.services.server.config.dummy_event_threshold.into() {
Self::squash_extremities(&service, &room_id, count).await;
}
latest_extremity_count = None;
non_dummy_event = false;
#[allow(clippy::arithmetic_side_effects)]
waker.as_mut().reset(tokio::time::Instant::now() + Duration::from_mins(2));
} else {
rx.close();
closing = true;
}
}
() = service.server_shutdown.notified(), if !closing => {
rx.close();
closing = true;
}
}
}
});
}
async fn squash_extremities(&self, room_id: &RoomId, extremities_count: usize) {
debug_warn!(
%extremities_count,
threshold=%self.services.server.config.dummy_event_threshold,
"Attempting to squash extremities after upgrading pdu"
);
// Try to send a dummy event to squash extremities. See issue #1844
let power_levels = self
.services
.state_accessor
.get_room_power_levels(room_id)
.await;
let mut local_users = self.services.state_cache.local_users_in_room(room_id);
while let Some(user_id) = local_users.next().await {
if !power_levels.user_can_send_message(&user_id, "org.matrix.dummy_event".into()) {
trace!(%user_id, "user does not have power level to send dummy event, skipping");
continue;
}
let state_lock = self.services.state.mutex.lock(room_id).await;
if self
.services
.timeline
.build_and_append_pdu(
PartialPdu {
event_type: "org.matrix.dummy_event".into(),
..PartialPdu::default()
},
&user_id,
Some(room_id),
&state_lock,
)
.await
.inspect(|_| debug!(sender=%user_id, "Successfully sent a dummy event"))
.inspect_err(
|e| debug!(sender=%user_id, ?e, "Failed to send a dummy event via user"),
)
.is_ok()
{
break;
}
}
}
}
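The debounce loop above keeps only two pieces of state between timer fires: the most recent extremity count and a sticky flag recording whether any non-dummy event arrived. A minimal synchronous sketch of that bookkeeping follows. The `SquashDebouncer` type and its method names are hypothetical and are not part of this change; the real task also handles channel closure, shutdown and timer resets, which this sketch leaves out.

```rust
/// Hypothetical, synchronous model of the per-room debounce state.
/// Not part of the PR; it only mirrors the bookkeeping in the select loop.
#[derive(Debug, Default)]
pub struct SquashDebouncer {
    latest_extremity_count: Option<usize>,
    non_dummy_event: bool,
}

impl SquashDebouncer {
    /// Records one notification. The newest count overwrites the old one,
    /// and the "saw a non-dummy event" flag stays set until the timer fires.
    pub fn on_message(&mut self, extremities_count: usize, is_dummy_event: bool) {
        self.latest_extremity_count = Some(extremities_count);
        self.non_dummy_event = self.non_dummy_event || !is_dummy_event;
    }

    /// Called when the quiet-period timer fires. Returns the count to squash
    /// if a non-dummy event was seen and the count reaches the threshold,
    /// then clears the window either way.
    pub fn on_timer(&mut self, threshold: usize) -> Option<usize> {
        let count = self.latest_extremity_count.take()?;
        let saw_non_dummy = std::mem::take(&mut self.non_dummy_event);
        (saw_non_dummy && count >= threshold).then_some(count)
    }
}
```

In this model a window that contains only dummy events never yields a squash, which matches the circuit breaker described in the commit message.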
@@ -1,11 +1,13 @@
use std::collections::{BTreeMap, HashMap, hash_map};
use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, state_res, trace, warn,
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, info, state_res, trace,
warn,
};
use futures::future::ready;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
api::federation::authorization::get_event_authorization, canonical_json::redact,
events::StateEventType,
};
@@ -16,6 +18,7 @@ impl super::Service {
/// Handles a PDU as an outlier, performing basic checks like signatures and
/// hashes, proclaimed event auth, and then adding it to the outlier tree.
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(name="handle_outlier", skip_all, fields(%event_id))]
pub(super) async fn handle_outlier_pdu<'a, Pdu>(
&self,
origin: &'a ServerName,
@@ -23,7 +26,6 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
event_id: &'a EventId,
room_id: &'a RoomId,
mut value: CanonicalJsonObject,
auth_events_known: bool,
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
where
Pdu: Event + Send + Sync,
@@ -47,27 +49,38 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
.verify_event(&value, &room_version_rules)
.await
{
| Ok(ruma::signatures::Verified::All) => value,
| Ok(ruma::signatures::Verified::All) => {
if let Ok(pdu_event) = self.services.timeline.get_pdu(event_id).await {
debug!(
"Already have event {event_id} as an outlier or timeline event, not \
re-processing"
);
value.insert(
"event_id".to_owned(),
CanonicalJsonValue::String(event_id.as_str().to_owned()),
);
check_room_id(room_id, &pdu_event)?;
return Ok((pdu_event, value));
}
value
},
| Ok(ruma::signatures::Verified::Signatures) => {
// Redact
debug_info!("Calculated hash does not match (redaction): {event_id}");
let Ok(obj) =
ruma::canonical_json::redact(value, &room_version_rules.redaction, None)
else {
return Err!(Request(InvalidParam("Redaction failed")));
};
// Skip the PDU if it is redacted and we already have it as an outlier event
if self.services.timeline.pdu_exists(event_id).await {
return Err!(Request(InvalidParam(
"Event was redacted and we already knew about it"
)));
if let Ok(pdu_event) = self.services.timeline.get_pdu(event_id).await {
debug!(
"Received a redacted copy of {event_id}, but we already knew about it. \
Re-using known content instead."
);
check_room_id(room_id, &pdu_event)?;
let obj = pdu_event.to_canonical_object();
return Ok((pdu_event, obj));
}
obj
debug_info!("Calculated hash does not match (redaction): {event_id}");
redact(value, &room_version_rules.redaction, None)
.map_err(|e| err!(Request(BadJson("Failed to redact {event_id}: {e}"))))?
},
| Err(e) => {
return Err!(Request(InvalidParam(debug_error!(
return Err!(Request(Forbidden(debug_error!(
"Signature verification failed for {event_id}: {e}"
))));
},
@@ -90,67 +103,81 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
// Fetch all auth events
let mut auth_events: HashMap<OwnedEventId, PduEvent> = HashMap::new();
for aid in pdu_event.auth_events() {
if self.services.pdu_metadata.is_event_rejected(aid).await {
for auth_event_id in pdu_event.auth_events() {
if self
.services
.pdu_metadata
.is_event_rejected(auth_event_id)
.await
{
debug_warn!(
"Rejecting incoming event {} which depends on rejected auth event {aid}",
"Rejecting incoming event {} which depends on rejected auth event \
{auth_event_id}",
event_id,
);
self.services.pdu_metadata.mark_event_rejected(event_id);
return Err!(Request(InvalidParam("Event has rejected auth event: {aid}")));
return Err!(Request(Forbidden(
"Event has rejected auth event: {auth_event_id}"
)));
}
if let Ok(auth_event) = self.services.timeline.get_pdu(aid).await {
if let Ok(auth_event) = self.services.timeline.get_pdu(auth_event_id).await {
check_room_id(room_id, &auth_event)?;
trace!("Found auth event {aid} for outlier event {event_id} locally");
auth_events.insert(aid.to_owned(), auth_event);
trace!("Found auth event {auth_event_id} for outlier event {event_id} locally");
auth_events.insert(auth_event_id.to_owned(), auth_event);
} else {
debug_warn!(
"Could not find auth event {aid} for outlier event {event_id} locally"
"Could not find auth event {auth_event_id} for outlier event {event_id} \
locally"
);
}
}
// Fetch any missing ones & reject invalid ones
let missing_auth_events = if auth_events_known {
pdu_event
.auth_events()
.filter(|id| !auth_events.contains_key(*id))
.collect::<Vec<_>>()
} else {
pdu_event.auth_events().collect::<Vec<_>>()
};
if !missing_auth_events.is_empty() || !auth_events_known {
debug_info!(
"Fetching {} missing auth events for outlier event {event_id}",
missing_auth_events.len()
);
for (pdu, _) in self
.fetch_and_handle_outliers(
if auth_events.len() != pdu_event.auth_events().count() {
info!("Missing some auth events, asking remote for auth chain");
let response: get_event_authorization::v1::Response = self
.services
.sending
.send_federation_request(
origin,
missing_auth_events.iter().copied(),
create_event,
room_id,
get_event_authorization::v1::Request::new(
room_id.to_owned(),
event_id.to_owned(),
),
)
.await
{
auth_events.insert(pdu.event_id().to_owned(), pdu);
.map_err(|e| {
err!(Request(Forbidden(
"Remote server is not divulging incoming event's auth chain: {e}"
)))
})?;
let mut auth_chain_map = HashMap::with_capacity(response.auth_chain.len());
for auth_pdu_json in response.auth_chain {
let (auth_event_room_id, auth_event_id, auth_pdu_json) =
self.parse_incoming_pdu(&auth_pdu_json).await?;
if auth_event_room_id != room_id {
return Err!(Request(Forbidden(
"Auth event {auth_event_id} is in {auth_event_room_id}, not {room_id}."
)));
}
let auth_pdu = PduEvent::from_id_val(&auth_event_id, auth_pdu_json)
.map_err(|e| err!(Request(BadJson("Invalid PDU {auth_event_id}: {e}"))))?;
auth_chain_map.insert(auth_event_id, auth_pdu);
}
for auth_event_id in pdu_event.auth_events() {
if auth_events.contains_key(auth_event_id) {
continue;
}
if let Some(auth_event) = auth_chain_map.get(auth_event_id) {
auth_events.insert(auth_event_id.to_owned(), auth_event.clone());
} else {
return Err!(Request(Forbidden(
"Remote server is not divulging incoming event's auth events (missing: \
{auth_event_id})"
)));
}
}
} else {
debug!("No missing auth events for outlier event {event_id}");
}
// reject if we are still missing some
let still_missing = pdu_event
.auth_events()
.filter(|id| !auth_events.contains_key(*id))
.collect::<Vec<_>>();
if !still_missing.is_empty() {
// Don't reject: this could be a temporary condition
// TODO: use get_missing_events?
return Err!(Request(InvalidParam(
"Could not fetch all auth events for outlier event {event_id}, still missing: \
{still_missing:?}"
)));
}
// 6. Reject "due to auth events" if the event doesn't pass auth based on the
@@ -181,7 +208,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
.outlier
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
self.services.pdu_metadata.mark_event_rejected(event_id);
return Err!(Request(InvalidParam(
return Err!(Request(Forbidden(
"Auth event's type and state_key combination exists multiple times: {}, \
{}",
auth_event.kind,
@@ -191,18 +218,6 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
}
}
// The original create event must be in the auth events
if !matches!(
auth_events_by_key.get(&(StateEventType::RoomCreate, String::new().into())),
Some(_) | None
) {
self.services.pdu_metadata.mark_event_rejected(event_id);
self.services
.outlier
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
}
let state_fetch = |ty: &StateEventType, sk: &str| {
let key = (ty.to_owned(), sk.into());
ready(auth_events_by_key.get(&key).map(ToOwned::to_owned))
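The new auth-event handling in `handle_outlier_pdu` above first collects the auth events it can find locally, then fills the gaps from the remote's auth chain, and refuses the event if any referenced auth event is still absent. A minimal sketch of that reconciliation step, assuming plain string IDs and string payloads in place of `OwnedEventId` and `PduEvent`; the function name `fill_missing_auth_events` is hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the reconciliation step: every ID in `required`
/// must end up in `local`, taken from `remote_chain` when it is not already
/// known. Fails on the first ID that neither side can provide.
pub fn fill_missing_auth_events(
    required: &[&str],
    local: &mut HashMap<String, String>,
    remote_chain: &HashMap<String, String>,
) -> Result<(), String> {
    for id in required {
        // Already found locally: nothing to do for this auth event.
        if local.contains_key(*id) {
            continue;
        }
        match remote_chain.get(*id) {
            Some(event) => {
                local.insert((*id).to_owned(), event.clone());
            }
            None => return Err(format!("missing auth event: {id}")),
        }
    }
    Ok(())
}
```

The sketch skips the room ID check and PDU parsing that the real code performs on each entry of the remote's response.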
@@ -1,95 +0,0 @@
use std::{collections::BTreeMap, time::Instant};
use conduwuit::{
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer,
utils::continue_exponential_backoff_secs,
};
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
use tracing::debug;
impl super::Service {
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
name = "prev",
level = INFO_SPAN_LEVEL,
skip_all,
fields(%prev_id),
)]
pub(super) async fn handle_prev_pdu<'a, Pdu>(
&self,
origin: &'a ServerName,
event_id: &'a EventId,
room_id: &'a RoomId,
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
create_event: &'a Pdu,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
prev_id: &'a EventId,
) -> Result
where
Pdu: Event + Send + Sync,
{
// Check for disabled again because it might have changed
if self.services.metadata.is_disabled(room_id).await {
return Err!(Request(Forbidden(debug_warn!(
"Federation of room {room_id} is currently disabled on this server. Request by \
origin {origin} and event ID {event_id}"
))));
}
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(prev_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug!(
?tries,
duration = ?time.elapsed(),
"Backing off from prev_event"
);
return Ok(());
}
}
let Some((pdu, json)) = eventid_info else {
return Ok(());
};
// Skip old events
if pdu.origin_server_ts() < first_ts_in_room {
return Ok(());
}
let start_time = Instant::now();
self.federation_handletime
.write()
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
defer! {{
self.federation_handletime
.write()
.remove(room_id);
}};
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
.await?;
debug!(
elapsed = ?start_time.elapsed(),
"Handled prev_event",
);
Ok(())
}
}
+9 -4
@@ -4,7 +4,6 @@
mod fetch_state;
mod handle_incoming_pdu;
mod handle_outlier_pdu;
mod handle_prev_pdu;
mod parse_incoming_pdu;
mod policy_server;
mod resolve_state;
@@ -15,19 +14,21 @@
use async_trait::async_trait;
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
pub use fetch_and_handle_outliers::{GET_MISSING_EVENTS_MAX_BATCH_SIZE, build_local_dag};
use ruma::{
OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
room_version_rules::RoomVersionRules,
};
use tokio::sync::Notify;
use tokio::sync::{Notify, mpsc};
use crate::{Dep, globals, rooms, sending, server_keys};
pub struct Service {
pub mutex_federation: RoomMutexMap,
pub federation_handletime: SyncRwLock<HandleTimeMap>,
pub extremity_squashers: SyncRwLock<HashMap<OwnedRoomId, mpsc::Sender<(usize, bool)>>>,
services: Services,
server_shutdown: Notify,
me: std::sync::Weak<Self>,
}
struct Services {
@@ -53,9 +54,11 @@ struct Services {
#[async_trait]
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
Ok(Arc::new_cyclic(|s| Self {
me: s.clone(),
mutex_federation: RoomMutexMap::new(),
federation_handletime: HandleTimeMap::new().into(),
extremity_squashers: SyncRwLock::new(HashMap::new()),
services: Services {
globals: args.depend::<globals::Service>("globals"),
sending: args.depend::<sending::Service>("sending"),
@@ -91,6 +94,8 @@ async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result {
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
fn interrupt(&self) { self.server_shutdown.notify_waiters(); }
async fn clear_cache(&self) {}
}
impl Service {
@@ -7,7 +7,7 @@
use itertools::Itertools;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, RoomId,
RoomVersionId,
RoomVersionId, room_version_rules::RoomVersionRules,
};
use serde_json::value::RawValue as RawJsonValue;
@@ -56,7 +56,10 @@ fn extract_room_id(event_type: &str, pdu: &CanonicalJsonObject) -> Result<OwnedR
/// Parses every entry in an array as an event ID, returning an error if any
/// step fails.
fn expect_event_id_array(value: &CanonicalJsonObject, field: &str) -> Result<Vec<OwnedEventId>> {
pub(super) fn expect_event_id_array(
value: &CanonicalJsonObject,
field: &str,
) -> Result<Vec<OwnedEventId>> {
value
.get(field)
.ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))?
@@ -102,6 +105,20 @@ pub fn validate_pdu(&self, pdu: &CanonicalJsonObject) -> Result {
Ok(())
}
pub async fn parse_incoming_pdu_with_known_room(
&self,
pdu: &RawJsonValue,
room_version_rules: &RoomVersionRules,
) -> Result<(OwnedEventId, CanonicalJsonObject)> {
let (event_id, value) =
gen_event_id_canonical_json(pdu, room_version_rules).map_err(|e| {
err!(Request(InvalidParam("Could not convert event to canonical json: {e}")))
})?;
self.validate_pdu(&value)?;
Ok((event_id, value))
}
#[tracing::instrument(name = "parse", skip_all)]
pub async fn parse_incoming_pdu(&self, pdu: &RawJsonValue) -> Result<Parsed> {
let value = serde_json::from_str::<CanonicalJsonObject>(pdu.get()).map_err(|e| {
err!(BadServerResponse(debug_warn!("Error parsing incoming event {e:?}")))
@@ -5,7 +5,7 @@
};
use conduwuit::{
Result, debug, err, error,
Result, debug, debug_error, err, error,
matrix::{Event, StateMap},
trace,
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
@@ -37,6 +37,7 @@ pub(super) async fn state_at_incoming_degree_one<Pdu>(
.pdu_shortstatehash(prev_event)
.await
else {
trace!("No shortstatehash for {prev_event}, cannot calculate one-degree state.");
return Ok(None);
};
@@ -100,6 +101,7 @@ pub(super) async fn state_at_incoming_resolved<Pdu>(
.map_ok(move |sstatehash| (sstatehash, prev_event))
})
.try_collect::<HashMap<_, _>>()
.inspect_err(|e| debug_error!("failed to calculate N-degree short state hashes: {e}"))
.await
else {
return Ok(None);
@@ -1,8 +1,9 @@
use std::{borrow::Borrow, sync::Arc, time::Instant};
use conduwuit::{
Err, Result, debug, debug_info, err, info, is_equal_to,
Err, Result, debug, debug_error, debug_info, err, info, is_equal_to,
matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res},
result::DebugInspect,
trace,
utils::{
IterStream,
@@ -23,28 +24,17 @@
};
impl super::Service {
pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
#[tracing::instrument(name="upgrade_outlier", skip_all, fields(event_id=%incoming_pdu.event_id()))]
pub(super) async fn upgrade_outlier_to_timeline_pdu(
&self,
incoming_pdu: PduEvent,
mut val: CanonicalJsonObject,
create_event: &Pdu,
create_event: &PduEvent,
origin: &ServerName,
room_id: &RoomId,
) -> Result<Option<RawPduId>>
where
Pdu: Event + Send + Sync,
{
// Skip the PDU if we already have it as a timeline event
if let Ok(pduid) = self
.services
.timeline
.get_pdu_id(incoming_pdu.event_id())
.await
{
return Ok(Some(pduid));
}
let (rejected, soft_failed) = join!(
) -> Result<Option<RawPduId>> {
let (pduid, rejected, soft_failed) = join!(
self.services.timeline.get_pdu_id(incoming_pdu.event_id()),
self.services
.pdu_metadata
.is_event_rejected(incoming_pdu.event_id()),
@@ -52,17 +42,27 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
.pdu_metadata
.is_event_soft_failed(incoming_pdu.event_id())
);
if rejected {
return Err!(Request(InvalidParam("Event has been rejected")));
if let Ok(id) = pduid {
trace!(event_id=%incoming_pdu.event_id(), "Skipping upgrade of already upgraded PDU");
return Ok(Some(id));
} else if rejected {
return Err!(Request(Forbidden("Event has been rejected")));
} else if soft_failed {
return Err!(Request(InvalidParam("Event has been soft-failed")));
return Err!(Request(Forbidden("Event has been soft-failed")));
}
assert_eq!(
*create_event.kind(),
StateEventType::RoomCreate.into(),
"tried to upgrade a PDU with a create_event that is not a room create event"
);
debug!(
event_id = %incoming_pdu.event_id,
"Upgrading PDU from outlier to timeline"
);
let timer = Instant::now();
let min_depth = self.services.metadata.get_mindepth(room_id).await;
let room_version_rules = get_room_version_rules(create_event)?;
// 10. Fetch missing state and auth chain events by calling /state_ids at
@@ -73,21 +73,34 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
event_id = %incoming_pdu.event_id,
"Resolving state at event"
);
let mut state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 {
let state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 {
self.state_at_incoming_degree_one(&incoming_pdu).await?
} else {
self.state_at_incoming_resolved(&incoming_pdu, room_id, &room_version_rules)
.await?
};
let state_at_incoming_event = match state_at_incoming_event {
| Some(s) => s,
| None => {
trace!("Could not calculate incoming state, asking remote {origin} for it");
self.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
.await
.debug_inspect_err(|e| {
debug_error!("Could not fetch state from {origin}: {e}");
})?
},
};
if state_at_incoming_event.is_none() {
state_at_incoming_event = self
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
.await?;
if state_at_incoming_event.is_empty()
&& *incoming_pdu.event_type() != StateEventType::RoomCreate.into()
{
// This can happen if the remote sends an event but cannot be reached to fetch
// the state at it, and all other servers in the room (which might just be the
// unreachable server) are unable to provide required info.
// Returning an error here allows the upgrade to be attempted at another time.
return Err!(Request(Forbidden("Could not resolve incoming state at event")));
}
let state_at_incoming_event =
state_at_incoming_event.expect("we always set this to some above");
trace!(state_events = state_at_incoming_event.len(), "Calculated incoming state");
debug!(
event_id = %incoming_pdu.event_id,
@@ -385,6 +398,12 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
// Event has passed all auth/stateres checks
drop(state_lock);
if incoming_pdu.depth > min_depth && incoming_pdu.state_key().is_some() {
self.services
.metadata
.set_mindepth(room_id, incoming_pdu.depth.into());
trace!("Increased room's min depth from {} to {}", min_depth, incoming_pdu.depth);
}
Ok(pdu_id)
}
+43 -4
@@ -1,7 +1,7 @@
use std::{collections::HashMap, sync::Arc};
use conduwuit::{
Err, Pdu, Result, Server, debug, debug_info, debug_warn, err, error, info, is_true,
Err, Event, Pdu, Result, Server, debug, debug_info, debug_warn, err, error, info, is_true,
matrix::{
StateKey,
event::{gen_event_id, gen_event_id_canonical_json},
@@ -34,7 +34,7 @@
use crate::{
Dep, antispam, globals,
rooms::{
metadata, outlier, pdu_metadata, short,
event_handler, metadata, outlier, pdu_metadata, short,
state::{self, RoomMutexGuard},
state_accessor, state_cache,
state_compressor::{self, CompressedState, HashSetCompressStateEvent},
@@ -51,6 +51,7 @@ struct Services {
server: Arc<Server>,
db: Arc<Database>,
antispam: Dep<antispam::Service>,
event_handler: Dep<event_handler::Service>,
globals: Dep<globals::Service>,
metadata: Dep<metadata::Service>,
outlier: Dep<outlier::Service>,
@@ -73,6 +74,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
server: args.server.clone(),
db: args.db.clone(),
antispam: args.depend::<antispam::Service>("antispam"),
event_handler: args.depend::<event_handler::Service>("rooms::event_handler"),
globals: args.depend::<globals::Service>("globals"),
metadata: args.depend::<metadata::Service>("rooms::metadata"),
outlier: args.depend::<outlier::Service>("rooms::outlier"),
@@ -380,8 +382,6 @@ pub async fn join_remote_room(
// It has enough fields to be called a proper event now
let mut join_event = join_event_stub;
info!("Asking {remote_server} for send_join in room {room_id}");
let send_join_request = federation::membership::create_join_event::v2::Request::new(
room_id.to_owned(),
event_id.clone(),
@@ -391,6 +391,18 @@ pub async fn join_remote_room(
.await,
);
// NOTE: send_join can take a long time to respond, but from the point of view
// of other servers, we may already have finished joining. This means they
// sometimes end up sending PDUs to us that we aren't yet ready to accept, and
// consequently drop. Holding the mutex over the room while processing mitigates
// this.
let _room_lock = self
.services
.event_handler
.mutex_federation
.lock(room_id.as_str())
.await;
info!("Asking {remote_server} for send_join in room {room_id}");
let send_join_response = match self
.services
.sending
@@ -576,7 +588,13 @@ pub async fn join_remote_room(
if !auth_check {
return Err!(Request(Forbidden("Auth check failed")));
}
let resident_before = self
.services
.state_cache
.server_in_room(self.services.globals.server_name(), room_id)
.await;
let cork = self.services.db.cork_and_flush();
info!("Compressing state from send_join");
let compressed: CompressedState = self
.services
@@ -625,6 +643,10 @@ pub async fn join_remote_room(
room_id,
)
.await?;
self.services
.metadata
.maybe_set_mindepth(room_id, parsed_join_pdu.depth.into())
.await;
info!("Setting final room state for new room");
// We set the room state after inserting the pdu, so that we never have a moment
@@ -632,6 +654,23 @@ pub async fn join_remote_room(
self.services
.state
.set_room_state(room_id, statehash_after_join, &state_lock);
if !resident_before {
// NOTE: We replace local extremities for this room if we were not a resident
// before. We might be doing a remote join to satisfy restricted join rules,
// so we don't want to do this if we're already a resident. Otherwise, we
// want to replace our forward extremities wholesale in case we were
// desynced.
info!("Replacing local forward extremities");
self.services
.state
.set_forward_extremities(
room_id,
std::iter::once(parsed_join_pdu.event_id()),
&state_lock,
)
.await;
}
drop(cork);
Ok(())
}
+25 -2
@@ -1,9 +1,9 @@
use std::sync::Arc;
use conduwuit::{Result, utils::stream::TryIgnore};
use database::Map;
use database::{Deserialized, Map};
use futures::{Stream, StreamExt};
use ruma::{OwnedRoomId, RoomId};
use ruma::{OwnedRoomId, RoomId, UInt, uint};
use crate::{Dep, rooms};
@@ -17,6 +17,7 @@ struct Data {
bannedroomids: Arc<Map>,
roomid_shortroomid: Arc<Map>,
pduid_pdu: Arc<Map>,
roomid_mindepth: Arc<Map>,
}
struct Services {
@@ -31,6 +32,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
bannedroomids: args.db["bannedroomids"].clone(),
roomid_shortroomid: args.db["roomid_shortroomid"].clone(),
pduid_pdu: args.db["pduid_pdu"].clone(),
roomid_mindepth: args.db["roomid_mindepth"].clone(),
},
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
@@ -98,4 +100,25 @@ pub async fn is_banned(&self, room_id: &RoomId) -> bool {
pub fn list_banned_rooms(&self) -> impl Stream<Item = OwnedRoomId> + Send + '_ {
self.db.bannedroomids.keys().ignore_err()
}
pub async fn get_mindepth(&self, room_id: &RoomId) -> UInt {
self.db
.roomid_mindepth
.get(room_id)
.await
.deserialized::<UInt>()
.unwrap_or_else(|_| uint!(0))
}
pub fn set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
self.db
.roomid_mindepth
.put_raw(room_id.as_bytes(), min_depth.to_be_bytes());
}
pub async fn maybe_set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
if min_depth > self.get_mindepth(room_id).await.into() {
self.set_mindepth(room_id, min_depth);
}
}
}
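The three `mindepth` helpers above amount to a per-room counter stored as eight big-endian bytes that defaults to zero and, through `maybe_set_mindepth`, only ever moves upward. A minimal in-memory sketch of that behaviour, assuming a `HashMap` in place of the `roomid_mindepth` database map; the `MinDepthStore` type is hypothetical and does not model how the real database layer deserializes values.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the `roomid_mindepth` map.
#[derive(Default)]
pub struct MinDepthStore {
    raw: HashMap<String, [u8; 8]>,
}

impl MinDepthStore {
    /// Returns the stored minimum depth, or 0 when the room has none.
    pub fn get_mindepth(&self, room_id: &str) -> u64 {
        self.raw
            .get(room_id)
            .map_or(0, |bytes| u64::from_be_bytes(*bytes))
    }

    /// Unconditionally overwrites the stored value (big-endian bytes).
    pub fn set_mindepth(&mut self, room_id: &str, min_depth: u64) {
        self.raw.insert(room_id.to_owned(), min_depth.to_be_bytes());
    }

    /// Raises the stored value, never lowers it.
    pub fn maybe_set_mindepth(&mut self, room_id: &str, min_depth: u64) {
        if min_depth > self.get_mindepth(room_id) {
            self.set_mindepth(room_id, min_depth);
        }
    }
}
```

Note that `set_mindepth` on its own can lower the value; only the `maybe_` variant guards against that.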
+10
@@ -371,6 +371,16 @@ pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<ShortSta
.deserialized()
}
pub fn all_forward_extremities(
&self,
) -> impl Stream<Item = (OwnedRoomId, OwnedEventId)> + Send {
self.db
.roomid_pduleaves
.keys()
.map_ok(|(room_id, event_id): (OwnedRoomId, OwnedEventId)| (room_id, event_id))
.ignore_err()
}
pub fn get_forward_extremities<'a>(
&'a self,
room_id: &'a RoomId,
+4 -1
@@ -224,7 +224,10 @@ pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) ->
/// Determines which servers are trusted enough to provide backfill in a
/// room.
async fn candidate_backfill_servers(&self, room_id: &RoomId) -> HashSet<OwnedServerName> {
pub(crate) async fn candidate_backfill_servers(
&self,
room_id: &RoomId,
) -> HashSet<OwnedServerName> {
let mut candidate_backfill_servers = HashSet::new();
let power_levels = self
+4
@@ -173,6 +173,10 @@ pub async fn get_non_outlier_pdu_json(
self.db.get_non_outlier_pdu_json(event_id).await
}
pub async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> bool {
self.db.non_outlier_pdu_exists(event_id).await.is_ok()
}
/// Returns the pdu's id.
#[inline]
pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {