mirror of
https://forgejo.ellis.link/continuwuation/continuwuity/
synced 2026-07-02 02:31:55 +00:00
Compare commits
74 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f9a43abb7f | |||
| 3649f76045 | |||
| bdd8ad1413 | |||
| 46aefa8307 | |||
| 6343b62778 | |||
| 6ca3acbeff | |||
| 16f5d4b0e2 | |||
| 350f789737 | |||
| 54e89daa46 | |||
| 770343b1c0 | |||
| 4cf13a8e5a | |||
| b73f34454f | |||
| 754f239241 | |||
| 8e845235d2 | |||
| 04ffccdeac | |||
| 447b68e306 | |||
| 7471141eba | |||
| a1fba2218d | |||
| 22c7fa1b08 | |||
| 9028b18792 | |||
| a5bf0f2bfb | |||
| 2e33760575 | |||
| 3b8788d5d2 | |||
| 00a9ef1c06 | |||
| fd0f458978 | |||
| 3ff2797b11 | |||
| f0e4f53f03 | |||
| 2b486e3399 | |||
| 29b5df8722 | |||
| ae378bbdb7 | |||
| 0e0c7826ab | |||
| fa5e0faaf7 | |||
| b07a5c780c | |||
| 13e240ebdb | |||
| a701b610c5 | |||
| 17af0e54d9 | |||
| efb5ee0bf0 | |||
| 7a681c7c43 | |||
| 7e87912177 | |||
| e76881d64a | |||
| ad98dca8c2 | |||
| 1fc7c033cb | |||
| 8d4a313657 | |||
| 3a7259250b | |||
| 9292f16cd7 | |||
| 95473f4c24 | |||
| 6d606d9277 | |||
| d1baaaab48 | |||
| 921a83825a | |||
| 4e3b3d133d | |||
| a9376b3b04 | |||
| f7025af97e | |||
| a3544ba01f | |||
| c6acbf5440 | |||
| ba5beac7fc | |||
| 4b8cf7fb25 | |||
| dc966804e6 | |||
| 98c1a466fd | |||
| bcccb4373f | |||
| 049e2f6287 | |||
| b619eae9ef | |||
| 4b673692f6 | |||
| 9633a37421 | |||
| d090f5d769 | |||
| 51d7a82aa3 | |||
| d2c183baeb | |||
| b63eaa81d4 | |||
| 94db17b53a | |||
| ab5202677b | |||
| 469c1b2ed7 | |||
| 1b5e1e9886 | |||
| d8d7f863e5 | |||
| e8fee00df6 | |||
| 266616de7b |
@@ -0,0 +1,2 @@
|
||||
Improved the performance and reliability of fetching missing events, which helps the server recover from network partitions. Contributed
|
||||
by @nex.
|
||||
@@ -297,7 +297,7 @@
|
||||
|
||||
# This item is undocumented. Please contribute documentation for it.
|
||||
#
|
||||
#max_fetch_prev_events = 192
|
||||
#max_fetch_prev_events = 1024
|
||||
|
||||
# How many incoming federation transactions the server is willing to be
|
||||
# processing at any given time before it becomes overloaded and starts
|
||||
@@ -645,6 +645,14 @@
|
||||
#
|
||||
#default_room_acl_deny =
|
||||
|
||||
# The number of forward extremities to tolerate in a room before
|
||||
# attempting to manually squash them with a "dummy event". Setting this
|
||||
# above 20 will hinder its efficacy, and setting it below 5 will cause
|
||||
# more dummy events to be sent than necessary (which increases federation
|
||||
# traffic).
|
||||
#
|
||||
#dummy_event_threshold = 10
|
||||
|
||||
# Enable OpenTelemetry OTLP tracing export. This replaces the deprecated
|
||||
# Jaeger exporter. Traces will be sent via OTLP to a collector (such as
|
||||
# Jaeger) that supports the OpenTelemetry Protocol.
|
||||
|
||||
@@ -31,6 +31,8 @@
|
||||
};
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
use crate::PAGE_SIZE;
|
||||
|
||||
#[derive(Clone, Copy, Eq, PartialEq)]
|
||||
enum NodeStatus {
|
||||
Normal(bool),
|
||||
@@ -1166,4 +1168,56 @@ pub(super) async fn send_test_email(&self) -> Result {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn rooms_by_extremity_count(&self, page: Option<usize>) -> Result {
|
||||
let page = page.unwrap_or(1);
|
||||
// My Giant Chain:tm:
|
||||
let mapped: HashMap<OwnedRoomId, u64> = self
|
||||
.services
|
||||
.rooms
|
||||
.state
|
||||
.all_forward_extremities()
|
||||
.ready_fold(HashMap::new(), move |mut map, (room_id, _)| {
|
||||
let count: u64 = map.get(&room_id).copied().unwrap_or(0);
|
||||
map.insert(room_id, count.saturating_add(1));
|
||||
map
|
||||
})
|
||||
.await
|
||||
.into_iter()
|
||||
.filter_map(|(room_id, count)| (count >= 2).then_some((room_id, count)))
|
||||
.collect();
|
||||
if mapped.is_empty() {
|
||||
return Err!("No more rooms.");
|
||||
}
|
||||
|
||||
let mut rooms = mapped.keys().collect::<Vec<_>>();
|
||||
rooms.sort_by_key(|room_id| {
|
||||
mapped
|
||||
.get(*room_id)
|
||||
.copied()
|
||||
.expect("keys must have values")
|
||||
});
|
||||
rooms.reverse();
|
||||
|
||||
let body = rooms
|
||||
.into_iter()
|
||||
.stream()
|
||||
.skip(page.saturating_sub(1).saturating_mul(PAGE_SIZE))
|
||||
.take(PAGE_SIZE)
|
||||
.map(|room_id| {
|
||||
format!(
|
||||
"{room_id}: {}",
|
||||
mapped.get(room_id).copied().expect("keys must have values")
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
self.write_str(&format!(
|
||||
"Rooms by extremity count ({}):\n```\n{}\n```",
|
||||
body.len(),
|
||||
body.join("\n")
|
||||
))
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -245,6 +245,11 @@ pub enum DebugCommand {
|
||||
/// Send a test email to the invoking admin's email address
|
||||
SendTestEmail,
|
||||
|
||||
/// Lists room IDs by forward extremity count in descending order
|
||||
RoomsByExtremityCount {
|
||||
page: Option<usize>,
|
||||
},
|
||||
|
||||
/// Developer test stubs
|
||||
#[command(subcommand)]
|
||||
#[allow(non_snake_case)]
|
||||
|
||||
@@ -4,15 +4,11 @@
|
||||
use conduwuit::{Err, Event, Result, debug, info, trace, utils::to_canonical_object, warn};
|
||||
use ruma::{OwnedEventId, api::federation::event::get_missing_events};
|
||||
use serde_json::{json, value::RawValue};
|
||||
use service::rooms::event_handler::GET_MISSING_EVENTS_MAX_BATCH_SIZE;
|
||||
|
||||
use super::AccessCheck;
|
||||
use crate::Ruma;
|
||||
|
||||
/// arbitrary number but synapse's is 20 and we can handle lots of these anyways
|
||||
const LIMIT_MAX: usize = 50;
|
||||
/// spec says default is 10
|
||||
const LIMIT_DEFAULT: usize = 10;
|
||||
|
||||
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
|
||||
///
|
||||
/// Retrieves events that the sender is missing.
|
||||
@@ -45,8 +41,8 @@ pub(crate) async fn get_missing_events_route(
|
||||
let limit = body
|
||||
.limit
|
||||
.try_into()
|
||||
.unwrap_or(LIMIT_DEFAULT)
|
||||
.min(LIMIT_MAX);
|
||||
.unwrap_or(10)
|
||||
.min(GET_MISSING_EVENTS_MAX_BATCH_SIZE);
|
||||
|
||||
let room_version = services.rooms.state.get_room_version(&body.room_id).await?;
|
||||
|
||||
|
||||
+11
-3
@@ -7,7 +7,7 @@
|
||||
use axum::extract::State;
|
||||
use axum_client_ip::ClientIp;
|
||||
use conduwuit::{
|
||||
Err, Error, Result, debug, debug_warn, err, error,
|
||||
Err, Error, Result, debug, debug_error, debug_warn, err, error,
|
||||
result::LogErr,
|
||||
state_res::lexicographical_topological_sort,
|
||||
trace,
|
||||
@@ -133,6 +133,7 @@ async fn wait_for_result(
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name="transaction"
|
||||
skip_all,
|
||||
fields(
|
||||
id = ?body.transaction_id.as_str(),
|
||||
@@ -174,8 +175,14 @@ async fn process_inbound_transaction(
|
||||
|
||||
for (id, result) in &results {
|
||||
if let Err(e) = result {
|
||||
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
|
||||
debug_warn!("Incoming PDU failed {id}: {e:?}");
|
||||
match e {
|
||||
| Error::BadRequest(
|
||||
ErrorKind::Forbidden | ErrorKind::InvalidParam | ErrorKind::BadJson,
|
||||
..,
|
||||
) => {
|
||||
debug_warn!("Incoming PDU {id} failed: {e:?}");
|
||||
},
|
||||
| _ => debug_error!("Incoming PDU {id} failed: {e:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -381,6 +388,7 @@ async fn handle_room(
|
||||
.rooms
|
||||
.event_handler
|
||||
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
|
||||
.boxed()
|
||||
.await
|
||||
.map(|_| ());
|
||||
results.push((event_id, result));
|
||||
|
||||
+14
-2
@@ -375,7 +375,7 @@ pub struct Config {
|
||||
#[serde(default = "default_max_request_size")]
|
||||
pub max_request_size: usize,
|
||||
|
||||
/// default: 192
|
||||
/// default: 1024
|
||||
#[serde(default = "default_max_fetch_prev_events")]
|
||||
pub max_fetch_prev_events: u16,
|
||||
|
||||
@@ -781,6 +781,16 @@ pub struct Config {
|
||||
/// a substitute for moderation bots.
|
||||
pub default_room_acl_deny: Option<Vec<String>>,
|
||||
|
||||
/// The number of forward extremities to tolerate in a room before
|
||||
/// attempting to manually squash them with a "dummy event". Setting this
|
||||
/// above 20 will hinder its efficacy, and setting it below 5 will cause
|
||||
/// more dummy events to be sent than necessary (which increases federation
|
||||
/// traffic).
|
||||
///
|
||||
/// default: 10
|
||||
#[serde(default = "default_extremity_threshold")]
|
||||
pub dummy_event_threshold: u8,
|
||||
|
||||
/// display: nested
|
||||
#[serde(default)]
|
||||
pub well_known: WellKnownConfig,
|
||||
@@ -2549,7 +2559,7 @@ fn default_pusher_timeout() -> u64 { 60 }
|
||||
|
||||
fn default_pusher_idle_timeout() -> u64 { 15 }
|
||||
|
||||
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
|
||||
fn default_max_fetch_prev_events() -> u16 { 1024 }
|
||||
|
||||
fn default_max_concurrent_inbound_transactions() -> usize { 150 }
|
||||
|
||||
@@ -2652,6 +2662,8 @@ fn default_rocksdb_stats_level() -> u8 { 1 }
|
||||
#[inline]
|
||||
pub fn default_default_room_version() -> RoomVersionId { RoomVersionId::V12 }
|
||||
|
||||
/// Serde default for `Config::dummy_event_threshold`: tolerate 10 forward
/// extremities.
fn default_extremity_threshold() -> u8 { 10 }
|
||||
|
||||
fn default_ip_range_denylist() -> Vec<String> {
|
||||
vec![
|
||||
"127.0.0.0/8".to_owned(),
|
||||
|
||||
@@ -62,7 +62,7 @@ impl Default for PartialPdu {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
event_type: "m.room.message".into(),
|
||||
content: Box::<RawJsonValue>::default(),
|
||||
content: to_raw_value("{}").unwrap(),
|
||||
unsigned: None,
|
||||
state_key: None,
|
||||
redacts: None,
|
||||
|
||||
@@ -187,6 +187,10 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
|
||||
val_size_hint: Some(8),
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "roomid_mindepth",
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "roomserverids",
|
||||
..descriptor::RANDOM_SMALL
|
||||
|
||||
@@ -1,235 +1,667 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, HashSet, VecDeque, hash_map},
|
||||
collections::{HashMap, HashSet, VecDeque},
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
use assign::assign;
|
||||
#[cfg(debug_assertions)]
|
||||
use conduwuit::error;
|
||||
use conduwuit::{
|
||||
Event, PduEvent, debug, debug_warn, matrix::event::gen_event_id_canonical_json, trace,
|
||||
utils::continue_exponential_backoff_secs, warn,
|
||||
Err, Event, PduEvent, debug, debug_error, debug_info, debug_warn, err,
|
||||
state_res::lexicographical_topological_sort,
|
||||
trace,
|
||||
utils::{IterStream, math::Expected, stream::BroadbandExt},
|
||||
warn,
|
||||
};
|
||||
use futures::{StreamExt, future::select_ok};
|
||||
use ruma::{
|
||||
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
||||
api::federation::event::get_event,
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
|
||||
OwnedServerName, RoomId, ServerName, UInt,
|
||||
api::federation::event::{get_event, get_missing_events},
|
||||
int,
|
||||
room_version_rules::RoomVersionRules,
|
||||
};
|
||||
|
||||
use super::get_room_version_rules;
|
||||
use crate::rooms::event_handler::parse_incoming_pdu::expect_event_id_array;
|
||||
|
||||
/// Upper bound on the `limit` of a single `get_missing_events` exchange. It
/// caps both the number of events we ask a remote for per request and the
/// number we are willing to return when serving the federation endpoint.
pub const GET_MISSING_EVENTS_MAX_BATCH_SIZE: usize = 50;
|
||||
|
||||
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
|
||||
/// returning them in a topologically sorted order.
|
||||
///
|
||||
/// This is used to attempt to process PDUs in an order that respects their
|
||||
/// dependencies, however it is ultimately the sender's responsibility to send
|
||||
/// them in a processable order, so this is just a best effort attempt. It does
|
||||
/// not account for power levels or other tie breaks.
|
||||
pub async fn build_local_dag<S: std::hash::BuildHasher + Send + Sync>(
|
||||
pdu_map: &HashMap<OwnedEventId, &CanonicalJsonObject, S>,
|
||||
) -> conduwuit::Result<Vec<OwnedEventId>> {
|
||||
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
|
||||
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> =
|
||||
HashMap::with_capacity(pdu_map.len());
|
||||
let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len());
|
||||
|
||||
for (event_id, value) in pdu_map {
|
||||
// We already checked that these properties are correct in parse_incoming_pdu,
|
||||
// so it's safe to unwrap here.
|
||||
// We also filter to remove any prev_events that are not in this pdu_map, as we
|
||||
// need to have at least one event with zero out degrees for the lexico-topo
|
||||
// sort below. If there are multiple events with omitted prevs, they will be
|
||||
// ordered by timestamp, then event ID. At that point though, it's unlikely to
|
||||
// matter.
|
||||
let prev_events = value
|
||||
.get("prev_events")
|
||||
.unwrap()
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|v| EventId::parse(v.as_str().unwrap()).unwrap())
|
||||
.filter(|id| pdu_map.contains_key(id))
|
||||
.collect();
|
||||
|
||||
dag.insert(event_id.clone(), prev_events);
|
||||
let origin_server_ts = value
|
||||
.get("origin_server_ts")
|
||||
.and_then(CanonicalJsonValue::as_integer)
|
||||
.unwrap_or_default();
|
||||
id_origin_ts.insert(event_id.clone(), origin_server_ts);
|
||||
}
|
||||
|
||||
debug!(count = dag.len(), "Sorting incoming events with partial graph");
|
||||
lexicographical_topological_sort(&dag, &async |node_id| {
|
||||
// Note: we don't bother fetching power levels because that would massively slow
|
||||
// this function down. This is a best-effort attempt to order events correctly
|
||||
// for processing, however ultimately that should be the sender's job.
|
||||
let ts = id_origin_ts
|
||||
.get(&node_id)
|
||||
.copied()
|
||||
.unwrap_or_else(|| int!(0))
|
||||
.to_string()
|
||||
.parse::<u64>()
|
||||
.ok()
|
||||
.and_then(UInt::new)
|
||||
.unwrap_or_default();
|
||||
Ok((int!(0), MilliSecondsSinceUnixEpoch(ts)))
|
||||
})
|
||||
.await
|
||||
.inspect(|sorted| {
|
||||
debug_assert_eq!(
|
||||
sorted.len(),
|
||||
pdu_map.len(),
|
||||
"Sorted graph was not the same size as the input graph"
|
||||
);
|
||||
})
|
||||
.map_err(|e| err!("failed to resolve local graph: {e}"))
|
||||
}
|
||||
|
||||
impl super::Service {
|
||||
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
||||
/// it is appended to the outliers Tree.
|
||||
/// Uses `POST /_matrix/federation/v1/get_missing_events/{room_id}` to fill
|
||||
/// gaps in the DAG.
|
||||
///
|
||||
/// Returns pdu and if we fetched it over federation the raw json.
|
||||
/// This function walks backwards from `head`, fetching incrementally (by a
|
||||
/// factor of 10) more events until the remote we're fetching from either
|
||||
/// stops returning new events, or the min_depth is reached.
|
||||
///
|
||||
/// a. Look in the main timeline (pduid_pdu tree)
|
||||
/// b. Look at outlier pdu tree
|
||||
/// c. Ask origin server over federation
|
||||
/// d. TODO: Ask other servers over federation?
|
||||
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
|
||||
/// This function does not persist the events, but does validate them. The
|
||||
/// caller is responsible for passing them through handle_incoming_pdu or
|
||||
/// related functions.
|
||||
///
|
||||
/// Only the one `via` is asked for missing events, as multiplexing remotes
|
||||
/// may result in the event tree being walked in a gappy or disordered
|
||||
/// manner.
|
||||
///
|
||||
/// ## Parameters
|
||||
///
|
||||
/// - `room_id`: The room's ID.
|
||||
/// - `head`: The event we are potentially missing prev_events for.
|
||||
/// - `tail`: The most recently known events in the graph (typically forward
|
||||
/// extremities).
|
||||
/// - `via`: The server to ask for missing events.
|
||||
/// - `min_depth`: Don't process events with a `depth` lower than this
|
||||
/// value. Not massively useful, but can help short-circuit infinite loops
|
||||
/// and weird edge paths.
|
||||
#[tracing::instrument(name = "get_missing_events_bulk", skip_all)]
|
||||
pub async fn get_missing_events(
|
||||
&self,
|
||||
origin: &'a ServerName,
|
||||
events: Events,
|
||||
create_event: &'a Pdu,
|
||||
room_id: &'a RoomId,
|
||||
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
Events: Iterator<Item = &'a EventId> + Clone + Send,
|
||||
{
|
||||
let back_off = |id| match self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.write()
|
||||
.entry(id)
|
||||
room_id: &RoomId,
|
||||
head: &PduEvent,
|
||||
tail: Vec<OwnedEventId>,
|
||||
via: &ServerName,
|
||||
min_depth: UInt,
|
||||
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
|
||||
let start = Instant::now();
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
| hash_map::Entry::Vacant(e) => {
|
||||
e.insert((Instant::now(), 1));
|
||||
},
|
||||
| hash_map::Entry::Occupied(mut e) => {
|
||||
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
|
||||
},
|
||||
};
|
||||
|
||||
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
|
||||
trace!("Fetching {} outlier pdus", events.clone().count());
|
||||
|
||||
for id in events {
|
||||
// a. Look in the main timeline (pduid_pdu tree)
|
||||
// b. Look at outlier pdu tree
|
||||
// (get_pdu_json checks both)
|
||||
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
|
||||
trace!("Found {id} in main timeline or outlier tree");
|
||||
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
|
||||
continue;
|
||||
}
|
||||
|
||||
// c. Ask origin server over federation
|
||||
// We also handle its auth chain here so we don't get a stack overflow in
|
||||
// handle_outlier_pdu.
|
||||
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
|
||||
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
|
||||
|
||||
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
|
||||
while let Some(next_id) = todo_auth_events.pop_front() {
|
||||
if let Some((time, tries)) = self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.get(&*next_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 60 * 2;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 8;
|
||||
if continue_exponential_backoff_secs(
|
||||
MIN_DURATION,
|
||||
MAX_DURATION,
|
||||
time.elapsed(),
|
||||
*tries,
|
||||
) {
|
||||
debug_warn!(
|
||||
tried = ?*tries,
|
||||
elapsed = ?time.elapsed(),
|
||||
"Backing off from {next_id}",
|
||||
);
|
||||
continue;
|
||||
let missing_count = head
|
||||
.prev_events()
|
||||
.stream()
|
||||
.fold(0_u8, |i, event_id| async move {
|
||||
if self.services.timeline.pdu_exists(event_id).await {
|
||||
i.expected_add(1)
|
||||
} else {
|
||||
i
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
debug_assert_ne!(
|
||||
missing_count, 0,
|
||||
"event passed to get_missing_events is not missing any events (wasteful call)"
|
||||
);
|
||||
};
|
||||
assert!(!tail.is_empty(), "empty tail");
|
||||
assert_ne!(via, self.services.globals.server_name(), "cannot ask ourselves for events");
|
||||
|
||||
if events_all.contains(&next_id) {
|
||||
// The iteration limit is in place to ensure that if the remote server leaves us
|
||||
// in a state of infinite recursion (as old versions of continuwuity and
|
||||
// predecessors would), we give up. However, get_missing_events doesn't return
|
||||
// that many events per-request. Synapse returns 20, and conduwuit+ return 50.
|
||||
// This means with a hard iteration limit, we might give up too early, before
|
||||
// we get a chance to even come close to max_fetch_prev_events. As such, we'll
|
||||
// calculate the limit based on that config option and the aforementioned
|
||||
// averages.
|
||||
let max_fetch = self.services.server.config.max_fetch_prev_events;
|
||||
let iteration_limit = max_fetch.saturating_div(20).max(10);
|
||||
|
||||
let mut discovered = HashMap::with_capacity(head.prev_events.len());
|
||||
let mut latest_events: Vec<OwnedEventId> = vec![head.event_id().to_owned()];
|
||||
debug!(elapsed=?start.elapsed(),
|
||||
%room_id,
|
||||
event_id=%head.event_id(),
|
||||
%iteration_limit,
|
||||
"Fetching any missing events for head event",
|
||||
);
|
||||
for iteration in 0..iteration_limit {
|
||||
let limit = iteration
|
||||
.expected_add(1)
|
||||
.saturating_mul(10)
|
||||
.min(GET_MISSING_EVENTS_MAX_BATCH_SIZE.try_into().expect(
|
||||
"GET_MISSING_EVENTS_MAX_BATCH_SIZE (usize) should fit in u16 (<=65536)",
|
||||
))
|
||||
.max(
|
||||
// This max call ensures we fetch *at least* all the prev events the
|
||||
// head has.
|
||||
u16::try_from(head.prev_events.len())
|
||||
.expect("cannot have more than 20 prev events, which fits in u16"),
|
||||
);
|
||||
debug_info!(elapsed=?start.elapsed(),
|
||||
%limit,
|
||||
%via,
|
||||
%iteration,
|
||||
%iteration_limit,
|
||||
discovered=discovered.len(),
|
||||
%min_depth,
|
||||
"Attempting to gap fill missing events"
|
||||
);
|
||||
let response: get_missing_events::v1::Response = self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(
|
||||
via,
|
||||
assign!(
|
||||
get_missing_events::v1::Request::new(
|
||||
room_id.to_owned(),
|
||||
tail.clone(),
|
||||
latest_events.clone()
|
||||
),
|
||||
{limit: limit.into(), min_depth}
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
if response.events.is_empty() {
|
||||
debug_info!(
|
||||
elapsed=?start.elapsed(),
|
||||
%via,
|
||||
"Finished gap filling missing events (remote returned no more events)."
|
||||
);
|
||||
break;
|
||||
}
|
||||
debug_info!(
|
||||
elapsed=?start.elapsed(),
|
||||
"Got {} events back from remote",
|
||||
response.events.len()
|
||||
);
|
||||
|
||||
latest_events.clear();
|
||||
for raw_event in response.events {
|
||||
let (_, event_id, pdu_json) = self.parse_incoming_pdu(&raw_event).await?;
|
||||
let pdu = PduEvent::from_id_val(&event_id, pdu_json).map_err(|e| {
|
||||
err!(Request(BadJson("Failed to parse gapfilled event {event_id}: {e}")))
|
||||
})?;
|
||||
if discovered.contains_key(&event_id) {
|
||||
// We already received this event.
|
||||
trace!("Already received {event_id}");
|
||||
continue;
|
||||
}
|
||||
|
||||
if self.services.timeline.pdu_exists(&next_id).await {
|
||||
trace!("Found {next_id} in db");
|
||||
continue;
|
||||
}
|
||||
|
||||
debug!("Fetching {next_id} over federation from {origin}.");
|
||||
match self
|
||||
if self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(
|
||||
origin,
|
||||
get_event::v1::Request::new((*next_id).to_owned()),
|
||||
)
|
||||
.timeline
|
||||
.non_outlier_pdu_exists(&event_id)
|
||||
.await
|
||||
{
|
||||
| Ok(res) => {
|
||||
debug!("Got {next_id} over federation from {origin}");
|
||||
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
|
||||
back_off((*next_id).to_owned());
|
||||
continue;
|
||||
};
|
||||
|
||||
let Ok((calculated_event_id, value)) =
|
||||
gen_event_id_canonical_json(&res.pdu, &room_version_rules)
|
||||
else {
|
||||
back_off((*next_id).to_owned());
|
||||
continue;
|
||||
};
|
||||
|
||||
if calculated_event_id != *next_id {
|
||||
warn!(
|
||||
"Server didn't return event id we requested: requested: \
|
||||
{next_id}, we got {calculated_event_id}. Event: {:?}",
|
||||
&res.pdu
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(auth_events) = value
|
||||
.get("auth_events")
|
||||
.and_then(CanonicalJsonValue::as_array)
|
||||
{
|
||||
for auth_event in auth_events {
|
||||
match serde_json::from_value::<OwnedEventId>(
|
||||
auth_event.clone().into(),
|
||||
) {
|
||||
| Ok(auth_event) => {
|
||||
trace!(
|
||||
"Found auth event id {auth_event} for event \
|
||||
{next_id}"
|
||||
);
|
||||
todo_auth_events.push_back(auth_event);
|
||||
},
|
||||
| _ => {
|
||||
warn!("Auth event id is not valid");
|
||||
},
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("Auth event list invalid");
|
||||
}
|
||||
|
||||
events_in_reverse_order.push((next_id.clone(), value));
|
||||
events_all.insert(next_id);
|
||||
},
|
||||
| Err(e) => {
|
||||
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
|
||||
back_off((*next_id).to_owned());
|
||||
},
|
||||
// NOTE: we explicitly check for *non*-outlier events here, as if we end
|
||||
// up discovering outlier events, we will be able to upgrade them
|
||||
// immediately.
|
||||
trace!("Already have {event_id} as a timeline PDU");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
|
||||
}
|
||||
if pdu.depth < min_depth {
|
||||
debug_warn!(
|
||||
elapsed=?start.elapsed(),
|
||||
"Received PDU with depth {} below min_depth {}",
|
||||
pdu.depth,
|
||||
min_depth
|
||||
);
|
||||
discovered.insert(event_id.clone(), pdu);
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut pdus = Vec::with_capacity(events_with_auth_events.len());
|
||||
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
|
||||
// a. Look in the main timeline (pduid_pdu tree)
|
||||
// b. Look at outlier pdu tree
|
||||
// (get_pdu_json checks both)
|
||||
if let Some(local_pdu) = local_pdu {
|
||||
trace!("Found {id} in main timeline or outlier tree");
|
||||
pdus.push((local_pdu.clone(), None));
|
||||
}
|
||||
|
||||
for (next_id, value) in events_in_reverse_order.into_iter().rev() {
|
||||
if let Some((time, tries)) = self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.get(&*next_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 5 * 60;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 24;
|
||||
if continue_exponential_backoff_secs(
|
||||
MIN_DURATION,
|
||||
MAX_DURATION,
|
||||
time.elapsed(),
|
||||
*tries,
|
||||
) {
|
||||
debug!("Backing off from {next_id}");
|
||||
for prev_event_id in pdu.prev_events() {
|
||||
if discovered.contains_key(prev_event_id) {
|
||||
// We already received this event.
|
||||
trace!("Already received prev event {prev_event_id}");
|
||||
continue;
|
||||
}
|
||||
if self
|
||||
.services
|
||||
.timeline
|
||||
.non_outlier_pdu_exists(prev_event_id)
|
||||
.await
|
||||
{
|
||||
// NOTE: we explicitly check for *non*-outlier events here, as if we end
|
||||
// up discovering outlier events, we will be able to upgrade them
|
||||
// immediately.
|
||||
trace!("Already have prev event {prev_event_id} as a timeline PDU");
|
||||
continue;
|
||||
}
|
||||
if let Ok(outlier) = self.services.timeline.get_pdu(prev_event_id).await {
|
||||
// We already have this PDU as an outlier, don't ask for
|
||||
// it. However, if we are missing any prev events for it, add it to the
|
||||
// latest events anyway.
|
||||
let outlier_missing_prevs = outlier
|
||||
.prev_events()
|
||||
.stream()
|
||||
.fold(0_u8, |i, event_id| async move {
|
||||
if self.services.timeline.pdu_exists(event_id).await {
|
||||
i.expected_add(1)
|
||||
} else {
|
||||
i
|
||||
}
|
||||
})
|
||||
.await;
|
||||
if outlier_missing_prevs > 0 {
|
||||
trace!("Missing {outlier_missing_prevs} PDU(s) for prev event");
|
||||
latest_events.push(prev_event_id.to_owned());
|
||||
}
|
||||
trace!("Had {prev_event_id} as an outlier already, skipping discovery");
|
||||
discovered.insert(prev_event_id.to_owned(), outlier);
|
||||
continue;
|
||||
}
|
||||
trace!("Missing prev {prev_event_id} of {event_id}");
|
||||
latest_events.push(prev_event_id.to_owned());
|
||||
}
|
||||
trace!("Discovered {event_id}");
|
||||
discovered.insert(event_id.clone(), pdu);
|
||||
}
|
||||
|
||||
if latest_events.is_empty() {
|
||||
debug!(elapsed=?start.elapsed(),
|
||||
%limit,
|
||||
%via,
|
||||
%iteration,
|
||||
discovered=discovered.len(),
|
||||
"No more events to fetch."
|
||||
);
|
||||
break;
|
||||
}
|
||||
if discovered.len() >= self.services.server.config.max_fetch_prev_events.into() {
|
||||
// Stupid hack, debug_error!() drops the log to a DEBUG when not in debug mode,
|
||||
// which is bad because this should at least produce a warning. It's an error in
|
||||
// debug mode because this can be important, but typically not much can be done
|
||||
// about it as a user.
|
||||
#[cfg(debug_assertions)]
|
||||
error!(elapsed=?start.elapsed(),
|
||||
discovered=discovered.len(),
|
||||
max_fetch_prev_events=self.services.server.config.max_fetch_prev_events,
|
||||
%iteration,
|
||||
%iteration_limit,
|
||||
%via,
|
||||
event_id=%head.event_id(),
|
||||
%room_id,
|
||||
"Encountered a gap too large to fill, giving up"
|
||||
);
|
||||
#[cfg(not(debug_assertions))]
|
||||
warn!(elapsed=?start.elapsed(),
|
||||
discovered=discovered.len(),
|
||||
max_fetch_prev_events=self.services.server.config.max_fetch_prev_events,
|
||||
%iteration,
|
||||
%iteration_limit,
|
||||
%via,
|
||||
event_id=%head.event_id(),
|
||||
%room_id,
|
||||
"Encountered a gap too large to fill"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
trace!(elapsed=?start.elapsed(), "Finished get_missing_events");
|
||||
Ok(discovered)
|
||||
}
|
||||
|
||||
/// Sends a `GET /_matrix/federation/v1/event/{event_id}` request to the
|
||||
/// target `remote`, parses the resulting PDU, and ensures the remote
|
||||
/// returned the correct event.
|
||||
/// Allows `fetch_and_handle_missing_events` to atomically fetch events from
|
||||
/// multiple remotes in parallel.
|
||||
async fn fetch_event_via(
|
||||
&self,
|
||||
remote: OwnedServerName,
|
||||
event_id: OwnedEventId,
|
||||
room_version_rules: &RoomVersionRules,
|
||||
) -> conduwuit::Result<(OwnedEventId, CanonicalJsonObject)> {
|
||||
let res = self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(&remote, get_event::v1::Request::new(event_id.clone()))
|
||||
.await?;
|
||||
|
||||
let (calculated_event_id, value) = self
|
||||
.parse_incoming_pdu_with_known_room(&res.pdu, room_version_rules)
|
||||
.await?;
|
||||
|
||||
if calculated_event_id != event_id {
|
||||
Err!(Request(BadJson(warn!(
|
||||
expected=%event_id,
|
||||
received=%calculated_event_id,
|
||||
"Server didn't return event id we requested",
|
||||
))))
|
||||
} else {
|
||||
Ok((event_id, value))
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_event_vias(
|
||||
&self,
|
||||
candidates: impl Iterator<Item = &OwnedServerName>,
|
||||
event_id: &EventId,
|
||||
room_version_rules: &RoomVersionRules,
|
||||
) -> conduwuit::Result<(OwnedEventId, CanonicalJsonObject)> {
|
||||
if let Ok(pdu_json) = self.services.timeline.get_pdu_json(event_id).await {
|
||||
return Ok((event_id.to_owned(), pdu_json));
|
||||
}
|
||||
let futures = candidates
|
||||
.map(|remote| {
|
||||
Box::pin(self.fetch_event_via(
|
||||
remote.to_owned(),
|
||||
event_id.to_owned(),
|
||||
room_version_rules,
|
||||
))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
select_ok(futures).await.map(|(res, _)| res)
|
||||
}
|
||||
|
||||
/// Asks remote servers for any individual events that are missing, also
|
||||
/// known as "atomic fetch". Should only be used for fetching missing auth
|
||||
/// events or resolving missing events from state_ids. For all other uses,
|
||||
/// use get_missing_events.
|
||||
///
|
||||
/// This function manually walks auth_events trees in a breadth-first
|
||||
/// search, and persists all fetched events as outliers when all the
|
||||
/// backwards extremities have been resolved.
|
||||
#[tracing::instrument(name = "get_missing_auth_events_atomic", skip_all)]
|
||||
pub(super) async fn fetch_and_handle_auth_events<Pdu>(
|
||||
&self,
|
||||
origin: &ServerName,
|
||||
events: Vec<OwnedEventId>,
|
||||
create_event: &Pdu,
|
||||
room_id: &RoomId,
|
||||
) -> HashMap<OwnedEventId, PduEvent>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
{
|
||||
let start = Instant::now();
|
||||
let room_version_rules =
|
||||
&get_room_version_rules(create_event).unwrap_or(RoomVersionRules::V1);
|
||||
let mut candidates = self
|
||||
.services
|
||||
.timeline
|
||||
.candidate_backfill_servers(room_id)
|
||||
.await;
|
||||
candidates.insert(origin.to_owned());
|
||||
assert!(!candidates.is_empty(), "no candidates to fetch missing events from");
|
||||
let mut discovered_events =
|
||||
HashMap::with_capacity(events.len().saturating_add(events.len().saturating_mul(3)));
|
||||
trace!(
|
||||
elapsed=?start.elapsed(),
|
||||
"Fetching {} unknown PDUs on demand from {} candidates",
|
||||
events.len(),
|
||||
candidates.len()
|
||||
);
|
||||
|
||||
let mut seen: HashMap<OwnedEventId, u8> = HashMap::new();
|
||||
for apex_event_id in &events {
|
||||
let mut todo: VecDeque<OwnedEventId> = [apex_event_id.to_owned()].into();
|
||||
|
||||
while let Some(target_id) = todo.pop_front() {
|
||||
if discovered_events.contains_key(&target_id) {
|
||||
continue;
|
||||
}
|
||||
if let Ok(local_pdu) = self.services.timeline.get_pdu(&target_id).await {
|
||||
trace!(elapsed=?start.elapsed(), "Found {target_id} in db");
|
||||
let mut obj = local_pdu.into_canonical_object();
|
||||
obj.remove("event_id");
|
||||
discovered_events.insert(target_id.clone(), obj);
|
||||
continue;
|
||||
}
|
||||
let attempts = seen.get(&*target_id).copied().unwrap_or_default();
|
||||
if attempts >= 5 {
|
||||
debug_error!(
|
||||
elapsed=?start.elapsed(),
|
||||
%attempts,
|
||||
%target_id,
|
||||
"Could not fetch missing event after 5 attempts, giving up"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
trace!("Handling outlier {next_id}");
|
||||
match Box::pin(self.handle_outlier_pdu(
|
||||
origin,
|
||||
create_event,
|
||||
&next_id,
|
||||
room_id,
|
||||
value.clone(),
|
||||
true,
|
||||
))
|
||||
.await
|
||||
debug!(elapsed=?start.elapsed(),"Fetching {target_id} over federation");
|
||||
let value = match self
|
||||
.fetch_event_vias(candidates.iter(), &target_id, room_version_rules)
|
||||
.await
|
||||
{
|
||||
| Ok((pdu, json)) =>
|
||||
if next_id == *id {
|
||||
trace!("Handled outlier {next_id} (original request)");
|
||||
pdus.push((pdu, Some(json)));
|
||||
},
|
||||
| Ok((_, x)) => x,
|
||||
| Err(e) => {
|
||||
warn!("Authentication of event {next_id} failed: {e:?}");
|
||||
back_off(next_id);
|
||||
warn!(elapsed=?start.elapsed(),"failed to fetch missing event {target_id} from any candidate: {e}");
|
||||
continue;
|
||||
},
|
||||
};
|
||||
let auth_events =
|
||||
match expect_event_id_array(&value, "auth_events").map_err(|e| {
|
||||
err!(Request(BadJson(warn!(
|
||||
elapsed=?start.elapsed(),
|
||||
event_id=%target_id,
|
||||
"Failed to parse event fetched from remote: {e}"
|
||||
))))
|
||||
}) {
|
||||
| Ok(auth_events) => auth_events,
|
||||
| Err(e) => {
|
||||
warn!(
|
||||
elapsed=?start.elapsed(),
|
||||
?e,
|
||||
"event {target_id} is malformed (bad auth_events), skipping"
|
||||
);
|
||||
continue;
|
||||
},
|
||||
};
|
||||
let mut have_all_auth = true;
|
||||
for auth_event_id in auth_events {
|
||||
if let Ok(local_pdu) = self.services.timeline.get_pdu(&auth_event_id).await {
|
||||
trace!(elapsed=?start.elapsed(),"Found auth event {auth_event_id} in db");
|
||||
let mut obj = local_pdu.into_canonical_object();
|
||||
obj.remove("event_id");
|
||||
discovered_events.insert(auth_event_id.clone(), obj);
|
||||
continue;
|
||||
}
|
||||
if discovered_events.contains_key(&auth_event_id) {
|
||||
trace!(elapsed=?start.elapsed(),%auth_event_id, "Already found auth event");
|
||||
continue;
|
||||
}
|
||||
debug!(elapsed=?start.elapsed(),"Missing auth event {auth_event_id} for event {target_id}");
|
||||
seen.insert(
|
||||
auth_event_id.clone(),
|
||||
seen.get(&auth_event_id)
|
||||
.copied()
|
||||
.unwrap_or_default()
|
||||
.saturating_add(1),
|
||||
);
|
||||
todo.push_back(auth_event_id);
|
||||
have_all_auth = false;
|
||||
}
|
||||
// Insert this PDU back at the end of the queue so that it will be resolved once
|
||||
// all of its auth events have been fetched.
|
||||
if have_all_auth {
|
||||
debug!(elapsed=?start.elapsed(),%target_id, "Have all auth events");
|
||||
discovered_events.insert(target_id, value);
|
||||
} else {
|
||||
debug_warn!(elapsed=?start.elapsed(),
|
||||
"Fetched {target_id} but missing some auth events, will have to re-fetch."
|
||||
);
|
||||
seen.insert(target_id.clone(), attempts.saturating_add(1));
|
||||
todo.push_back(target_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!("Fetched and handled {} outlier pdus", pdus.len());
|
||||
|
||||
let refmap: HashMap<OwnedEventId, &CanonicalJsonObject> = discovered_events
|
||||
.iter()
|
||||
.map(|(id, data)| (id.clone(), data))
|
||||
.collect();
|
||||
let seeded_ordered = build_local_dag(&refmap)
|
||||
.await
|
||||
.expect("failed to build local DAG");
|
||||
let mut pdus = HashMap::with_capacity(seeded_ordered.len());
|
||||
for discovered_event_id in seeded_ordered {
|
||||
let pdu_json = discovered_events.remove(&discovered_event_id).unwrap();
|
||||
debug_info!(
|
||||
elapsed=?start.elapsed(),
|
||||
"Handling missing event {discovered_event_id} as outlier"
|
||||
);
|
||||
assert_eq!(pdu_json.get("event_id"), None, "pdu_json had event_id");
|
||||
match Box::pin(self.handle_outlier_pdu(
|
||||
origin,
|
||||
create_event,
|
||||
&discovered_event_id,
|
||||
room_id,
|
||||
pdu_json,
|
||||
))
|
||||
.await
|
||||
{
|
||||
| Ok((pdu, _)) => {
|
||||
trace!(elapsed=?start.elapsed(), "Persisted {discovered_event_id}");
|
||||
let _ = pdus.insert(discovered_event_id, pdu);
|
||||
},
|
||||
| Err(e) => warn!(
|
||||
elapsed=?start.elapsed(),
|
||||
"Authentication of event {discovered_event_id} failed: {e:?}"
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
trace!(
|
||||
elapsed=?start.elapsed(),
|
||||
"Finished fetch_and_handle_missing_events: fetched and handled {} missing PDUs",
|
||||
pdus.len()
|
||||
);
|
||||
pdus.retain(|id, _| events.contains(id)); // Only return state events
|
||||
trace!(elapsed=?start.elapsed(), "Filtered return value down to {} PDUs", pdus.len());
|
||||
pdus
|
||||
}
|
||||
|
||||
/// Similar to `fetch_and_handle_missing_events`, but simply walks the
|
||||
/// prev events tree instead of the auth events tree. Additionally, it does
|
||||
/// not *handle* fetched PDUs in any capacity.
|
||||
#[tracing::instrument(name = "get_missing_prev_events_atomic", skip_all)]
|
||||
pub(super) async fn fetch_prev_events<Pdu>(
|
||||
&self,
|
||||
origin: &ServerName,
|
||||
events: Vec<OwnedEventId>,
|
||||
create_event: &Pdu,
|
||||
room_id: &RoomId,
|
||||
) -> HashMap<OwnedEventId, PduEvent>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
{
|
||||
let room_version_rules =
|
||||
&get_room_version_rules(create_event).unwrap_or(RoomVersionRules::V1);
|
||||
let mut candidates = self
|
||||
.services
|
||||
.timeline
|
||||
.candidate_backfill_servers(room_id)
|
||||
.await;
|
||||
candidates.insert(origin.to_owned());
|
||||
|
||||
let mut todo: VecDeque<OwnedEventId> = VecDeque::from(events);
|
||||
let mut discovered_events = HashMap::new();
|
||||
while let Some(next_id) = todo.pop_front() {
|
||||
if discovered_events.len() >= self.services.server.config.max_fetch_prev_events.into()
|
||||
{
|
||||
debug_warn!(
|
||||
"Encountered a gap too large to fill, giving up (fetched {} events)",
|
||||
discovered_events.len()
|
||||
);
|
||||
break;
|
||||
}
|
||||
if discovered_events.contains_key(&next_id) {
|
||||
continue;
|
||||
}
|
||||
let pdu = match self
|
||||
.fetch_event_vias(candidates.iter(), &next_id, room_version_rules)
|
||||
.await
|
||||
{
|
||||
| Ok((_, data)) => data,
|
||||
| Err(e) => {
|
||||
warn!("Failed to fetch prev event {next_id} from any candidate: {e}");
|
||||
continue;
|
||||
},
|
||||
};
|
||||
|
||||
let prev_events = match expect_event_id_array(&pdu, "prev_events").map_err(|e| {
|
||||
err!(Request(BadJson(warn!(
|
||||
event_id=%next_id,
|
||||
"Failed to parse event fetched from remote: {e}"
|
||||
))))
|
||||
}) {
|
||||
| Ok(auth_events) => auth_events,
|
||||
| Err(e) => {
|
||||
warn!(?e, "event {next_id} is malformed (bad prev_events), skipping");
|
||||
continue;
|
||||
},
|
||||
};
|
||||
let missing_prev = prev_events
|
||||
.iter()
|
||||
.stream()
|
||||
.broad_filter_map(|event_id| async {
|
||||
if discovered_events.contains_key(event_id)
|
||||
|| self.services.timeline.pdu_exists(event_id).await
|
||||
{
|
||||
None
|
||||
} else {
|
||||
Some(event_id.to_owned())
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
todo.extend(missing_prev);
|
||||
discovered_events.insert(
|
||||
next_id.clone(),
|
||||
PduEvent::from_id_val(&next_id, pdu).expect("fetched PDU was already validated"),
|
||||
);
|
||||
}
|
||||
|
||||
discovered_events
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,124 +1,155 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap, HashSet, VecDeque},
|
||||
iter::once,
|
||||
};
|
||||
use std::{collections::HashMap, time::Instant};
|
||||
|
||||
use conduwuit::{
|
||||
Event, PduEvent, Result, debug_warn, err,
|
||||
state_res::{self},
|
||||
};
|
||||
use futures::{FutureExt, future};
|
||||
use ruma::{
|
||||
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
|
||||
int, uint,
|
||||
Event, PduEvent, debug, debug_info, debug_warn, trace,
|
||||
utils::{BoolExt, IterStream, stream::BroadbandExt},
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use ruma::{CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName};
|
||||
|
||||
use super::check_room_id;
|
||||
use crate::rooms::event_handler::build_local_dag;
|
||||
|
||||
impl super::Service {
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub(super) async fn fetch_prev<'a, Pdu, Events>(
|
||||
/// Fetches any missing prev_events for this event and persists them before
|
||||
/// returning.
|
||||
pub(super) async fn fetch_prevs(
|
||||
&self,
|
||||
origin: &ServerName,
|
||||
create_event: &Pdu,
|
||||
room_id: &RoomId,
|
||||
create_event: &PduEvent,
|
||||
incoming_pdu: &PduEvent,
|
||||
origin: &ServerName,
|
||||
first_ts_in_room: MilliSecondsSinceUnixEpoch,
|
||||
initial_set: Events,
|
||||
) -> Result<(
|
||||
Vec<OwnedEventId>,
|
||||
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
|
||||
)>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
Events: Iterator<Item = &'a EventId> + Clone + Send,
|
||||
{
|
||||
let num_ids = initial_set.clone().count();
|
||||
let mut eventid_info = HashMap::new();
|
||||
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids);
|
||||
let mut todo_outlier_stack: VecDeque<OwnedEventId> =
|
||||
initial_set.map(ToOwned::to_owned).collect();
|
||||
) -> conduwuit::Result<()> {
|
||||
let start = Instant::now();
|
||||
let mut missing = incoming_pdu
|
||||
.prev_events()
|
||||
.stream()
|
||||
.broad_filter_map(|event_id| async move {
|
||||
self.services
|
||||
.timeline
|
||||
.get_non_outlier_pdu_json(event_id)
|
||||
.await
|
||||
.is_ok()
|
||||
.or(|| event_id.to_owned())
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
if missing.is_empty() {
|
||||
debug!(elapsed=?start.elapsed(), event_id=%incoming_pdu.event_id(), "No missing prev events.");
|
||||
return Ok(());
|
||||
}
|
||||
debug!(elapsed=?start.elapsed(), %room_id, event_id=%incoming_pdu.event_id(), ?missing, "Fetching previous events");
|
||||
let tail = self
|
||||
.services
|
||||
.state
|
||||
.get_forward_extremities(room_id)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
let mut amount = 0;
|
||||
let mut gapfilled = self
|
||||
.get_missing_events(
|
||||
room_id,
|
||||
incoming_pdu,
|
||||
tail,
|
||||
origin,
|
||||
self.services
|
||||
.metadata
|
||||
.get_mindepth(room_id)
|
||||
.await
|
||||
.saturating_sub(
|
||||
u8::try_from(incoming_pdu.prev_events.len())
|
||||
.unwrap()
|
||||
.saturating_mul(2)
|
||||
.into(),
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
debug_info!(elapsed=?start.elapsed(), "Fetched {} missing events", gapfilled.len());
|
||||
missing.retain(|eid| !gapfilled.contains_key(eid));
|
||||
if !missing.is_empty() {
|
||||
debug_warn!(elapsed=?start.elapsed(), "Still missing {} events, falling back to atomic fetch.", missing.len());
|
||||
gapfilled.extend(
|
||||
self.fetch_prev_events(origin, missing, create_event, room_id)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
|
||||
while let Some(prev_event_id) = todo_outlier_stack.pop_front() {
|
||||
self.services.server.check_running()?;
|
||||
// Persist all fetched events
|
||||
let mapped = gapfilled
|
||||
.iter()
|
||||
.map(|(eid, evt)| {
|
||||
let mut obj = evt.to_canonical_object();
|
||||
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
|
||||
(eid.clone(), obj)
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let to_persist = if mapped.len() <= 1 {
|
||||
mapped.keys().map(ToOwned::to_owned).collect()
|
||||
} else {
|
||||
let refmap: HashMap<OwnedEventId, &CanonicalJsonObject> =
|
||||
mapped.iter().map(|(id, data)| (id.clone(), data)).collect();
|
||||
build_local_dag(&refmap).await?
|
||||
};
|
||||
|
||||
let job_start = Instant::now();
|
||||
trace!("Starting to persist {} prev events", to_persist.len());
|
||||
for (i, event_id) in to_persist.iter().enumerate() {
|
||||
debug!(
|
||||
elapsed=?start.elapsed(),
|
||||
"Persisting fetched prev event: {event_id} ({}/{})",
|
||||
i.saturating_add(1),
|
||||
to_persist.len(),
|
||||
);
|
||||
let obj = mapped.get(event_id).cloned().unwrap();
|
||||
let persist_start = Instant::now();
|
||||
match self
|
||||
.fetch_and_handle_outliers(
|
||||
origin,
|
||||
once(prev_event_id.as_ref()),
|
||||
create_event,
|
||||
room_id,
|
||||
)
|
||||
.boxed()
|
||||
.handle_outlier_pdu(origin, create_event, event_id, room_id, obj)
|
||||
.await
|
||||
.pop()
|
||||
{
|
||||
| Some((pdu, mut json_opt)) => {
|
||||
check_room_id(room_id, &pdu)?;
|
||||
|
||||
let limit = self.services.server.config.max_fetch_prev_events;
|
||||
if amount > limit {
|
||||
debug_warn!("Max prev event limit reached! Limit: {limit}");
|
||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||
continue;
|
||||
}
|
||||
|
||||
if json_opt.is_none() {
|
||||
json_opt = self
|
||||
.services
|
||||
.outlier
|
||||
.get_outlier_pdu_json(&prev_event_id)
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
|
||||
if let Some(json) = json_opt {
|
||||
if pdu.origin_server_ts() > first_ts_in_room {
|
||||
amount = amount.saturating_add(1);
|
||||
for prev_prev in pdu.prev_events() {
|
||||
if !graph.contains_key(prev_prev) {
|
||||
todo_outlier_stack.push_back(prev_prev.to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
graph.insert(
|
||||
prev_event_id.clone(),
|
||||
pdu.prev_events().map(ToOwned::to_owned).collect(),
|
||||
);
|
||||
} else {
|
||||
// Time based check failed
|
||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||
}
|
||||
|
||||
eventid_info.insert(prev_event_id.clone(), (pdu, json));
|
||||
} else {
|
||||
// Get json failed, so this was not fetched over federation
|
||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||
}
|
||||
},
|
||||
| _ => {
|
||||
// Fetch and handle failed
|
||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||
| Ok((pdu, val)) if pdu.origin_server_ts() >= first_ts_in_room => {
|
||||
Box::pin(self.upgrade_outlier_to_timeline_pdu(
|
||||
pdu,
|
||||
val,
|
||||
create_event,
|
||||
origin,
|
||||
room_id,
|
||||
))
|
||||
.await
|
||||
.inspect_err(|e| {
|
||||
debug_warn!(
|
||||
total_elapsed=?start.elapsed(),
|
||||
job_elapsed=?job_start.elapsed(),
|
||||
task_elapsed=?persist_start.elapsed(),
|
||||
"Failed to upgrade prev event {event_id}: {e}",
|
||||
);
|
||||
})
|
||||
.inspect(|_| {
|
||||
debug_info!(
|
||||
total_elapsed=?start.elapsed(),
|
||||
job_elapsed=?job_start.elapsed(),
|
||||
task_elapsed=?persist_start.elapsed(),
|
||||
"Upgraded prev event {event_id}",
|
||||
);
|
||||
})
|
||||
.ok();
|
||||
},
|
||||
| Err(e) => debug_warn!(
|
||||
total_elapsed=?start.elapsed(),
|
||||
job_elapsed=?job_start.elapsed(),
|
||||
task_elapsed=?persist_start.elapsed(),
|
||||
"Failed to persist prev event {event_id}: {e}",
|
||||
),
|
||||
| _ => {},
|
||||
}
|
||||
}
|
||||
|
||||
let event_fetch = |event_id| {
|
||||
let origin_server_ts = eventid_info
|
||||
.get(&event_id)
|
||||
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
|
||||
|
||||
// This return value is the key used for sorting events,
|
||||
// events are then sorted by power level, time,
|
||||
// and lexically by event_id.
|
||||
future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
|
||||
};
|
||||
|
||||
let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch)
|
||||
.await
|
||||
.map_err(|e| err!(Database(error!("Error sorting prev events: {e}"))))?;
|
||||
|
||||
Ok((sorted, eventid_info))
|
||||
// NOTE because i keep forgetting: the caller persists incoming_pdu.
|
||||
// we only care about its prev events
|
||||
trace!(
|
||||
total_elapsed=?start.elapsed(),
|
||||
persist_elapsed=?job_start.elapsed(),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,34 +1,43 @@
|
||||
use std::collections::{HashMap, hash_map};
|
||||
|
||||
use conduwuit::{Err, Event, Result, debug, debug_warn, err};
|
||||
use futures::FutureExt;
|
||||
use ruma::{
|
||||
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
|
||||
events::StateEventType,
|
||||
use std::{
|
||||
cmp::max,
|
||||
collections::{HashMap, HashSet, hash_map},
|
||||
hash::{BuildHasherDefault, DefaultHasher},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use crate::rooms::short::ShortStateKey;
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug, debug_warn, err, info, trace,
|
||||
utils::{BoolExt, IterStream},
|
||||
warn,
|
||||
};
|
||||
use futures::{StreamExt, TryFutureExt, future::select_ok};
|
||||
use ruma::{
|
||||
EventId, OwnedEventId, OwnedRoomId, RoomId, ServerName,
|
||||
api::federation::event::{get_room_state, get_room_state_ids},
|
||||
};
|
||||
|
||||
use crate::{conduwuit::utils::stream::BroadbandExt, rooms::short::ShortStateKey};
|
||||
|
||||
impl super::Service {
|
||||
/// Call /state_ids to find out what the state at this pdu is. We trust the
|
||||
/// server's response to some extent (sic), but we still do a lot of checks
|
||||
/// on the events
|
||||
#[tracing::instrument(
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(%origin),
|
||||
)]
|
||||
pub(super) async fn fetch_state<Pdu>(
|
||||
/// Asks a remote server what the state at this event is.
|
||||
/// It first attempts to call `GET /_matrix/federation/v1/state_ids` (fast).
|
||||
/// If any events are missing, they are fetched from the remote, and
|
||||
/// persisted as outliers, before being returned back to this function. If
|
||||
/// we are missing a lot of events locally (>=50), this function falls back
|
||||
/// to requesting the full state in PDU format from the remote (`GET
|
||||
/// /_matrix/federation/v1/state, very slow in large rooms), and persists
|
||||
/// them directly.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(super) async fn fetch_state(
|
||||
&self,
|
||||
origin: &ServerName,
|
||||
create_event: &Pdu,
|
||||
create_event: &PduEvent,
|
||||
room_id: &RoomId,
|
||||
event_id: &EventId,
|
||||
) -> Result<Option<HashMap<u64, OwnedEventId>>>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
{
|
||||
let res = self
|
||||
) -> Result<HashMap<u64, OwnedEventId>> {
|
||||
let start = Instant::now();
|
||||
trace!(%origin, "Asking remote for state_ids");
|
||||
let res: get_room_state_ids::v1::Response = match self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(
|
||||
@@ -36,21 +45,189 @@ pub(super) async fn fetch_state<Pdu>(
|
||||
get_room_state_ids::v1::Request::new(event_id.to_owned(), room_id.to_owned()),
|
||||
)
|
||||
.await
|
||||
.inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?;
|
||||
.inspect_err(
|
||||
|e| debug_warn!(elapsed=?start.elapsed(), "Fetching state for event failed: {e}"),
|
||||
) {
|
||||
| Ok(resp) => Ok(resp),
|
||||
| Err(e) =>
|
||||
if e.is_not_found() {
|
||||
self.fetch_state_ids_from_backfill_servers(
|
||||
event_id.to_owned(),
|
||||
room_id.to_owned(),
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
Err(e)
|
||||
},
|
||||
}?;
|
||||
|
||||
debug!("Fetching state events");
|
||||
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
|
||||
let state_vec = self
|
||||
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
|
||||
.boxed()
|
||||
debug!(elapsed=?start.elapsed(), events = res.pdu_ids.len(), "Fetching state events");
|
||||
let mut state_events: HashMap<OwnedEventId, PduEvent> =
|
||||
HashMap::with_capacity(res.pdu_ids.len());
|
||||
let to_fetch: Vec<OwnedEventId> = res
|
||||
.pdu_ids
|
||||
.clone()
|
||||
.into_iter()
|
||||
.stream()
|
||||
.broad_filter_map(|event_id| async move {
|
||||
self.services
|
||||
.timeline
|
||||
.pdu_exists(&event_id)
|
||||
.await
|
||||
.or_some(event_id)
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
if to_fetch.is_empty() {
|
||||
debug!(elapsed=?start.elapsed(), "All required state events are already known.");
|
||||
state_events = res
|
||||
.pdu_ids
|
||||
.iter()
|
||||
.stream()
|
||||
.broad_filter_map(|event_id| async move {
|
||||
Some((
|
||||
event_id.clone(),
|
||||
self.services
|
||||
.timeline
|
||||
.get_pdu(event_id)
|
||||
.await
|
||||
.expect("Event disappeared between filtering and fetching"),
|
||||
))
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
assert_eq!(
|
||||
state_events.len(),
|
||||
res.pdu_ids.len(),
|
||||
"Failed to load all required state events despite allegedly knowing all of them \
|
||||
already",
|
||||
);
|
||||
} else {
|
||||
let total_count = res.pdu_ids.len();
|
||||
let missing_count = to_fetch.len();
|
||||
let missing_threshold = max(50, total_count >> 2);
|
||||
if missing_count >= missing_threshold {
|
||||
// If there's more than 50 events to fetch, or we're missing 25% or more of the
|
||||
// state, we would need to make a lot of atomic requests, so we'll just try
|
||||
// to fetch the full state from the remote instead.
|
||||
// Since this endpoint might fail in huge rooms, we fall back to atomic fetch
|
||||
// anyway.
|
||||
warn!(
|
||||
elapsed=?start.elapsed(),
|
||||
%missing_count,
|
||||
%total_count,
|
||||
%missing_threshold,
|
||||
"Fetching full state from remote server for event"
|
||||
);
|
||||
let state_response = tokio::time::timeout(
|
||||
Duration::from_secs(30),
|
||||
self.fetch_full_state(origin, create_event, room_id, event_id),
|
||||
)
|
||||
.await;
|
||||
info!(
|
||||
elapsed=?start.elapsed(),
|
||||
%missing_count,
|
||||
%total_count,
|
||||
%missing_threshold,
|
||||
"Fetched full state from remote server for event"
|
||||
);
|
||||
let fetched_state = match state_response {
|
||||
| Ok(Ok(state)) => {
|
||||
// Filter to ensure we only use the PDUs we were expecting, preventing
|
||||
// arbitrary state injection.
|
||||
// Atomic fetch does not have this problem as each PDU is evaluated
|
||||
// individually.
|
||||
let expected: &HashSet<OwnedEventId, BuildHasherDefault<DefaultHasher>> =
|
||||
&HashSet::from_iter(res.pdu_ids.clone());
|
||||
state
|
||||
.into_iter()
|
||||
.stream()
|
||||
.broad_filter_map(|(event_id, pdu)| async move {
|
||||
expected.contains(&event_id).then_some((event_id, pdu))
|
||||
})
|
||||
.collect()
|
||||
.await
|
||||
},
|
||||
| Ok(Err(e)) => {
|
||||
warn!(
|
||||
elapsed=?start.elapsed(),
|
||||
error=?e,
|
||||
%origin,
|
||||
"Failed to fetch full state from remote, falling back to atomic fetch"
|
||||
);
|
||||
self.fetch_and_handle_auth_events(
|
||||
origin,
|
||||
res.pdu_ids.clone(),
|
||||
create_event,
|
||||
room_id,
|
||||
)
|
||||
.await
|
||||
},
|
||||
| Err(e) => {
|
||||
warn!(
|
||||
elapsed=?start.elapsed(),
|
||||
error=?e,
|
||||
%origin,
|
||||
"Remote did not return room state in an acceptable timeframe, falling back to atomic fetch"
|
||||
);
|
||||
self.fetch_and_handle_auth_events(
|
||||
origin,
|
||||
res.pdu_ids.clone(),
|
||||
create_event,
|
||||
room_id,
|
||||
)
|
||||
.await
|
||||
},
|
||||
};
|
||||
|
||||
assert!(
|
||||
!fetched_state.is_empty(),
|
||||
"fetch_full_state or fetch_and_handle_missing_events returned empty state \
|
||||
map"
|
||||
);
|
||||
state_events.extend(fetched_state);
|
||||
} else {
|
||||
state_events = res
|
||||
.pdu_ids
|
||||
.iter()
|
||||
.stream()
|
||||
.broad_filter_map(|event_id| async move {
|
||||
self.services
|
||||
.timeline
|
||||
.get_pdu(event_id)
|
||||
.await
|
||||
.map(|p| (event_id.to_owned(), p))
|
||||
.ok()
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
assert!(
|
||||
!state_events.is_empty(),
|
||||
"Only missing {} events but read-ahead state vec was empty",
|
||||
to_fetch.len()
|
||||
);
|
||||
debug!(
|
||||
elapsed=?start.elapsed(),
|
||||
to_fetch = to_fetch.len(),
|
||||
"Fetching missing events for state from remote"
|
||||
);
|
||||
let fetched_state = self
|
||||
.fetch_and_handle_auth_events(origin, to_fetch, create_event, room_id)
|
||||
.await;
|
||||
state_events.extend(fetched_state);
|
||||
}
|
||||
}
|
||||
if state_events.is_empty() {
|
||||
return Ok(HashMap::new());
|
||||
}
|
||||
|
||||
let mut state: HashMap<ShortStateKey, OwnedEventId> =
|
||||
HashMap::with_capacity(state_vec.len());
|
||||
for (pdu, _) in state_vec {
|
||||
let state_key = pdu
|
||||
.state_key()
|
||||
.ok_or_else(|| err!(Database("Found non-state pdu in state events.")))?;
|
||||
HashMap::with_capacity(state_events.len());
|
||||
debug!(elapsed=?start.elapsed(), events = state_events.len(), "Processing state events");
|
||||
for (event_id, pdu) in state_events {
|
||||
let state_key = pdu.state_key().ok_or_else(|| {
|
||||
err!(Request(BadJson("Found non-state pdu in state events: {event_id}")))
|
||||
})?;
|
||||
|
||||
let shortstatekey = self
|
||||
.services
|
||||
@@ -62,28 +239,154 @@ pub(super) async fn fetch_state<Pdu>(
|
||||
| hash_map::Entry::Vacant(v) => {
|
||||
v.insert(pdu.event_id().to_owned());
|
||||
},
|
||||
| hash_map::Entry::Occupied(_) => {
|
||||
return Err!(Database(
|
||||
"State event's type and state_key combination exists multiple times: \
|
||||
{}, {}",
|
||||
| hash_map::Entry::Occupied(existing) => {
|
||||
return Err!(Request(Forbidden(
|
||||
"State event's type and state_key combination exists multiple times \
|
||||
({event_id} + {}): ({}, \"{}\")",
|
||||
existing.get(),
|
||||
pdu.kind(),
|
||||
state_key
|
||||
));
|
||||
state_key,
|
||||
)));
|
||||
},
|
||||
}
|
||||
}
|
||||
trace!(elapsed=?start.elapsed(), "fetch_state finished");
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
// The original create event must still be in the state
|
||||
let create_shortstatekey = self
|
||||
async fn fetch_state_ids_from_backfill_servers(
|
||||
&self,
|
||||
event_id: OwnedEventId,
|
||||
room_id: OwnedRoomId,
|
||||
) -> Result<get_room_state_ids::v1::Response> {
|
||||
let candidates = self
|
||||
.services
|
||||
.short
|
||||
.get_shortstatekey(&StateEventType::RoomCreate, "")
|
||||
.await?;
|
||||
|
||||
if state.get(&create_shortstatekey).map(AsRef::as_ref) != Some(create_event.event_id()) {
|
||||
return Err!(Database("Incoming event refers to wrong create event."));
|
||||
.timeline
|
||||
.candidate_backfill_servers(&room_id)
|
||||
.await;
|
||||
if candidates.is_empty() {
|
||||
return Err!(Request(NotFound(
|
||||
"Cannot ask any other servers for the state at this event"
|
||||
)));
|
||||
}
|
||||
debug!(%room_id, ?candidates, "Asking backfill servers for state_ids");
|
||||
let futures = candidates.iter().map(|server_name| {
|
||||
Box::pin(
|
||||
self.services
|
||||
.sending
|
||||
.send_federation_request(
|
||||
server_name,
|
||||
get_room_state_ids::v1::Request::new(event_id.clone(), room_id.clone()),
|
||||
)
|
||||
.inspect_err(|e| {
|
||||
debug_warn!("Fallback fetching state for event failed: {e}");
|
||||
}),
|
||||
)
|
||||
});
|
||||
Ok(select_ok(futures).await?.0)
|
||||
}
|
||||
|
||||
Ok(Some(state))
|
||||
/// Fetches the full state via `GET /_matrix/federation/v1/state` from a
|
||||
/// remote server, and persists all the incoming auth chain events and
|
||||
/// state events as outliers, for use later.
|
||||
///
|
||||
/// Any events that cannot be persisted are dropped with a warning.
|
||||
pub(super) async fn fetch_full_state(
|
||||
&self,
|
||||
origin: &ServerName,
|
||||
create_event: &PduEvent,
|
||||
room_id: &RoomId,
|
||||
event_id: &EventId,
|
||||
) -> Result<HashMap<OwnedEventId, PduEvent>> {
|
||||
let start = Instant::now();
|
||||
trace!("Fetching full state from remote server");
|
||||
let res: get_room_state::v1::Response = self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(
|
||||
origin,
|
||||
get_room_state::v1::Request::new(event_id.to_owned(), room_id.to_owned()),
|
||||
)
|
||||
.await
|
||||
.inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?;
|
||||
debug!(elapsed=?start.elapsed(), count = res.auth_chain.len(), "Handling incoming auth chain...");
|
||||
res.auth_chain
|
||||
.iter()
|
||||
.stream()
|
||||
.broad_filter_map(|raw_event_json| async {
|
||||
if let Some(parsed) = self.parse_incoming_pdu(raw_event_json).await.ok()
|
||||
&& parsed.0 == room_id
|
||||
{
|
||||
Some(parsed)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.for_each_concurrent(
|
||||
None,
|
||||
|(incoming_room_id, incoming_event_id, incoming_event_json)| async move {
|
||||
self.handle_outlier_pdu(
|
||||
origin,
|
||||
create_event,
|
||||
&incoming_event_id,
|
||||
&incoming_room_id,
|
||||
incoming_event_json,
|
||||
)
|
||||
.await
|
||||
.inspect_err(|e| {
|
||||
warn!(
|
||||
%incoming_room_id,
|
||||
%incoming_event_id,
|
||||
?e,
|
||||
"Failed to handle auth chain event from state fetch"
|
||||
);
|
||||
})
|
||||
.ok();
|
||||
},
|
||||
)
|
||||
.await;
|
||||
debug!(elapsed=?start.elapsed(), count = res.pdus.len(), "Handling incoming state PDUs...");
|
||||
let r = res
|
||||
.pdus
|
||||
.iter()
|
||||
.stream()
|
||||
.broad_filter_map(|raw_event_json| async {
|
||||
if let Some(parsed) = self.parse_incoming_pdu(raw_event_json).await.ok()
|
||||
&& parsed.0 == room_id
|
||||
{
|
||||
Some(parsed)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.broad_filter_map(
|
||||
|(incoming_room_id, incoming_event_id, incoming_event_json)| async move {
|
||||
self.handle_outlier_pdu(
|
||||
origin,
|
||||
create_event,
|
||||
&incoming_event_id,
|
||||
&incoming_room_id,
|
||||
incoming_event_json,
|
||||
)
|
||||
.await
|
||||
.inspect_err(|e| {
|
||||
warn!(
|
||||
elapsed=?start.elapsed(),
|
||||
%incoming_room_id,
|
||||
%incoming_event_id,
|
||||
?e,
|
||||
"Failed to handle state event from state fetch"
|
||||
);
|
||||
})
|
||||
.ok()
|
||||
},
|
||||
)
|
||||
.fold(HashMap::new(), |mut acc, (event, _)| async move {
|
||||
acc.insert(event.event_id().to_owned(), event);
|
||||
acc
|
||||
})
|
||||
.await;
|
||||
trace!(elapsed=?start.elapsed(), "fetch_full_state finished");
|
||||
Ok(r)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, hash_map},
|
||||
time::Instant,
|
||||
collections::BTreeMap,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
|
||||
info, trace, utils::stream::IterStream, warn,
|
||||
Err, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, defer, err, error,
|
||||
info, matrix::PartialPdu, result::DebugInspect, trace, warn,
|
||||
};
|
||||
use futures::{
|
||||
FutureExt, TryFutureExt, TryStreamExt,
|
||||
FutureExt, StreamExt,
|
||||
future::{OptionFuture, try_join4},
|
||||
};
|
||||
use ruma::{
|
||||
@@ -18,7 +18,7 @@
|
||||
room::member::{MembershipState, RoomMemberEventContent},
|
||||
},
|
||||
};
|
||||
use tracing::debug;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::rooms::timeline::{RawPduId, pdu_fits};
|
||||
|
||||
@@ -111,7 +111,6 @@ impl super::Service {
|
||||
/// room, if not soft fail it
|
||||
#[tracing::instrument(
|
||||
name = "pdu",
|
||||
level = INFO_SPAN_LEVEL,
|
||||
skip_all,
|
||||
fields(%room_id, %event_id),
|
||||
)]
|
||||
@@ -153,9 +152,7 @@ pub async fn handle_incoming_pdu<'a>(
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or_else(|| err!("No sender in object"))
|
||||
.and_then(|v| Ok(UserId::parse(v)?))
|
||||
.map_err(|e| {
|
||||
err!(Request(InvalidParam("PDU does not have a valid sender key: {e}")))
|
||||
})?;
|
||||
.map_err(|e| err!(Request(BadJson("PDU does not have a valid sender key: {e}"))))?;
|
||||
|
||||
let sender_acl_check: OptionFuture<_> = sender
|
||||
.server_name()
|
||||
@@ -235,7 +232,7 @@ pub async fn handle_incoming_pdu<'a>(
|
||||
}}
|
||||
|
||||
let (incoming_pdu, val) = self
|
||||
.handle_outlier_pdu(origin, create_event, event_id, room_id, value, false)
|
||||
.handle_outlier_pdu(origin, create_event, event_id, room_id, value)
|
||||
.await?;
|
||||
|
||||
// 8. if not timeline event: stop
|
||||
@@ -243,71 +240,192 @@ pub async fn handle_incoming_pdu<'a>(
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Skip old events
|
||||
// Skip events sent before we joined (they need to be persisted as backfilled
|
||||
// events, not timeline events, which is handled elsewhere).
|
||||
let first_ts_in_room = self
|
||||
.services
|
||||
.timeline
|
||||
.first_pdu_in_room(room_id)
|
||||
.await?
|
||||
.origin_server_ts();
|
||||
if incoming_pdu.origin_server_ts() < first_ts_in_room {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
|
||||
// These are timeline events
|
||||
let (sorted_prev_events, mut eventid_info) = self
|
||||
.fetch_prev(
|
||||
origin,
|
||||
create_event,
|
||||
room_id,
|
||||
first_ts_in_room,
|
||||
incoming_pdu.prev_events(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
events = ?sorted_prev_events,
|
||||
"Handling previous events"
|
||||
);
|
||||
debug!("Fetching and persisting any missing prev events");
|
||||
Box::pin(self.fetch_prevs(
|
||||
room_id,
|
||||
create_event,
|
||||
&incoming_pdu,
|
||||
origin,
|
||||
first_ts_in_room,
|
||||
))
|
||||
.await
|
||||
.debug_inspect_err(|e| {
|
||||
error!("Failed to fetch and persist incoming event's prev_events: {e:?}");
|
||||
})?;
|
||||
|
||||
sorted_prev_events
|
||||
.iter()
|
||||
.try_stream()
|
||||
.map_ok(AsRef::as_ref)
|
||||
.try_for_each(|prev_id| {
|
||||
self.handle_prev_pdu(
|
||||
origin,
|
||||
event_id,
|
||||
room_id,
|
||||
eventid_info.remove(prev_id),
|
||||
create_event,
|
||||
first_ts_in_room,
|
||||
prev_id,
|
||||
)
|
||||
.inspect_err(move |e| {
|
||||
warn!("Prev {prev_id} failed: {e}");
|
||||
match self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.write()
|
||||
.entry(prev_id.into())
|
||||
{
|
||||
| hash_map::Entry::Vacant(e) => {
|
||||
e.insert((Instant::now(), 1));
|
||||
},
|
||||
| hash_map::Entry::Occupied(mut e) => {
|
||||
let tries = e.get().1.saturating_add(1);
|
||||
*e.get_mut() = (Instant::now(), tries);
|
||||
},
|
||||
}
|
||||
})
|
||||
.map(|_| self.services.server.check_running())
|
||||
})
|
||||
.boxed()
|
||||
.await?;
|
||||
let is_dummy_event = incoming_pdu.event_type().to_string() == "org.matrix.dummy_event";
|
||||
|
||||
// Done with prev events, now handling the incoming event
|
||||
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
|
||||
.boxed()
|
||||
.await
|
||||
let pdu_id = Box::pin(self.upgrade_outlier_to_timeline_pdu(
|
||||
incoming_pdu,
|
||||
val,
|
||||
create_event,
|
||||
origin,
|
||||
room_id,
|
||||
))
|
||||
.await?;
|
||||
|
||||
let extremities_count = self
|
||||
.services
|
||||
.state
|
||||
.get_forward_extremities(room_id)
|
||||
.count()
|
||||
.await;
|
||||
|
||||
self.maybe_squash_extremities(room_id, extremities_count, is_dummy_event)
|
||||
.await;
|
||||
|
||||
Ok(pdu_id)
|
||||
}
|
||||
|
||||
async fn maybe_squash_extremities(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
extremities_count: usize,
|
||||
is_dummy_event: bool,
|
||||
) {
|
||||
let (tx, fut) = {
|
||||
if let Some(tx) = self.extremity_squashers.read().get(room_id)
|
||||
&& !tx.is_closed()
|
||||
{
|
||||
(tx.clone(), None)
|
||||
} else {
|
||||
let mut map = self.extremity_squashers.upgradable_read();
|
||||
|
||||
if let Some(tx) = map.get(room_id)
|
||||
&& !tx.is_closed()
|
||||
{
|
||||
(tx.clone(), None)
|
||||
} else {
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
map.with_upgraded(|map| map.insert(room_id.to_owned(), tx.clone()));
|
||||
|
||||
(tx, Some(self.spawn_squasher(room_id, rx)))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(fut) = fut {
|
||||
fut.await;
|
||||
}
|
||||
let _ = tx.try_send((extremities_count, is_dummy_event));
|
||||
}
|
||||
|
||||
async fn spawn_squasher(&self, room_id: &RoomId, mut rx: mpsc::Receiver<(usize, bool)>) {
|
||||
let Some(service) = self.me.upgrade() else {
|
||||
return;
|
||||
};
|
||||
let room_id = room_id.to_owned();
|
||||
|
||||
self.services.server.runtime().spawn(async move {
|
||||
let mut latest_extremity_count = None;
|
||||
let mut non_dummy_event = false;
|
||||
|
||||
let mut closing = false;
|
||||
|
||||
let waker = tokio::time::sleep(Duration::from_mins(2));
|
||||
tokio::pin!(waker);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = rx.recv() => {
|
||||
if let Some((extremities_count, is_dummy_event)) = msg {
|
||||
latest_extremity_count = Some(extremities_count);
|
||||
non_dummy_event = non_dummy_event || !is_dummy_event;
|
||||
#[allow(clippy::arithmetic_side_effects)]
|
||||
waker.as_mut().reset(tokio::time::Instant::now() + Duration::from_mins(1));
|
||||
} else {
|
||||
{let mut map = service.extremity_squashers.write();
|
||||
if let Some(tx) = map.get(&room_id) && tx.is_closed() {
|
||||
map.remove(&room_id);
|
||||
}}
|
||||
|
||||
if let Some(count) = latest_extremity_count {
|
||||
if non_dummy_event && count >= service.services.server.config.dummy_event_threshold.into() {
|
||||
Self::squash_extremities(&service, &room_id, count).await;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
() = &mut waker, if !closing => {
|
||||
if let Some(count) = latest_extremity_count {
|
||||
if non_dummy_event && count >= service.services.server.config.dummy_event_threshold.into() {
|
||||
Self::squash_extremities(&service, &room_id, count).await;
|
||||
}
|
||||
latest_extremity_count = None;
|
||||
non_dummy_event = false;
|
||||
#[allow(clippy::arithmetic_side_effects)]
|
||||
waker.as_mut().reset(tokio::time::Instant::now() + Duration::from_mins(2));
|
||||
} else {
|
||||
rx.close();
|
||||
closing = true;
|
||||
}
|
||||
}
|
||||
() = service.server_shutdown.notified(), if !closing => {
|
||||
rx.close();
|
||||
closing = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn squash_extremities(&self, room_id: &RoomId, extremities_count: usize) {
|
||||
debug_warn!(
|
||||
%extremities_count,
|
||||
threshold=%self.services.server.config.dummy_event_threshold,
|
||||
"Attempting to squash extremities after upgrading pdu"
|
||||
);
|
||||
// Try to send a dummy event to squash extremities. See issue #1844
|
||||
let power_levels = self
|
||||
.services
|
||||
.state_accessor
|
||||
.get_room_power_levels(room_id)
|
||||
.await;
|
||||
let mut local_users = self.services.state_cache.local_users_in_room(room_id);
|
||||
while let Some(user_id) = local_users.next().await {
|
||||
if !power_levels.user_can_send_message(&user_id, "org.matrix.dummy_event".into()) {
|
||||
trace!(%user_id, "user does not have power level to send dummy event, skipping");
|
||||
continue;
|
||||
}
|
||||
let state_lock = self.services.state.mutex.lock(room_id).await;
|
||||
if self
|
||||
.services
|
||||
.timeline
|
||||
.build_and_append_pdu(
|
||||
PartialPdu {
|
||||
event_type: "org.matrix.dummy_event".into(),
|
||||
..PartialPdu::default()
|
||||
},
|
||||
&user_id,
|
||||
Some(room_id),
|
||||
&state_lock,
|
||||
)
|
||||
.await
|
||||
.inspect(|_| debug!(sender=%user_id, "Successfully sent a dummy event"))
|
||||
.inspect_err(
|
||||
|e| debug!(sender=%user_id, ?e, "Failed to send a dummy event via user"),
|
||||
)
|
||||
.is_ok()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
use std::collections::{BTreeMap, HashMap, hash_map};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, state_res, trace, warn,
|
||||
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, info, state_res, trace,
|
||||
warn,
|
||||
};
|
||||
use futures::future::ready;
|
||||
use ruma::{
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
||||
api::federation::authorization::get_event_authorization, canonical_json::redact,
|
||||
events::StateEventType,
|
||||
};
|
||||
|
||||
@@ -16,6 +18,7 @@ impl super::Service {
|
||||
/// Handles a PDU as an outlier, performing basic checks like signatures and
|
||||
/// hashes, proclaimed event auth, and then adding it to the outlier tree.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(name="handle_outlier", skip_all, fields(%event_id))]
|
||||
pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
||||
&self,
|
||||
origin: &'a ServerName,
|
||||
@@ -23,7 +26,6 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
||||
event_id: &'a EventId,
|
||||
room_id: &'a RoomId,
|
||||
mut value: CanonicalJsonObject,
|
||||
auth_events_known: bool,
|
||||
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
@@ -47,27 +49,38 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
||||
.verify_event(&value, &room_version_rules)
|
||||
.await
|
||||
{
|
||||
| Ok(ruma::signatures::Verified::All) => value,
|
||||
| Ok(ruma::signatures::Verified::All) => {
|
||||
if let Ok(pdu_event) = self.services.timeline.get_pdu(event_id).await {
|
||||
debug!(
|
||||
"Already have event {event_id} as an outlier or timeline event, not \
|
||||
re-processing"
|
||||
);
|
||||
value.insert(
|
||||
"event_id".to_owned(),
|
||||
CanonicalJsonValue::String(event_id.as_str().to_owned()),
|
||||
);
|
||||
check_room_id(room_id, &pdu_event)?;
|
||||
return Ok((pdu_event, value));
|
||||
}
|
||||
value
|
||||
},
|
||||
| Ok(ruma::signatures::Verified::Signatures) => {
|
||||
// Redact
|
||||
debug_info!("Calculated hash does not match (redaction): {event_id}");
|
||||
let Ok(obj) =
|
||||
ruma::canonical_json::redact(value, &room_version_rules.redaction, None)
|
||||
else {
|
||||
return Err!(Request(InvalidParam("Redaction failed")));
|
||||
};
|
||||
|
||||
// Skip the PDU if it is redacted and we already have it as an outlier event
|
||||
if self.services.timeline.pdu_exists(event_id).await {
|
||||
return Err!(Request(InvalidParam(
|
||||
"Event was redacted and we already knew about it"
|
||||
)));
|
||||
if let Ok(pdu_event) = self.services.timeline.get_pdu(event_id).await {
|
||||
debug!(
|
||||
"Received a redacted copy of {event_id}, but we already knew about it. \
|
||||
Re-using known content instead."
|
||||
);
|
||||
check_room_id(room_id, &pdu_event)?;
|
||||
let obj = pdu_event.to_canonical_object();
|
||||
return Ok((pdu_event, obj));
|
||||
}
|
||||
|
||||
obj
|
||||
debug_info!("Calculated hash does not match (redaction): {event_id}");
|
||||
redact(value, &room_version_rules.redaction, None)
|
||||
.map_err(|e| err!(Request(BadJson("Failed to redact {event_id}: {e}"))))?
|
||||
},
|
||||
| Err(e) => {
|
||||
return Err!(Request(InvalidParam(debug_error!(
|
||||
return Err!(Request(Forbidden(debug_error!(
|
||||
"Signature verification failed for {event_id}: {e}"
|
||||
))));
|
||||
},
|
||||
@@ -90,67 +103,81 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
||||
// Fetch all auth events
|
||||
let mut auth_events: HashMap<OwnedEventId, PduEvent> = HashMap::new();
|
||||
|
||||
for aid in pdu_event.auth_events() {
|
||||
if self.services.pdu_metadata.is_event_rejected(aid).await {
|
||||
for auth_event_id in pdu_event.auth_events() {
|
||||
if self
|
||||
.services
|
||||
.pdu_metadata
|
||||
.is_event_rejected(auth_event_id)
|
||||
.await
|
||||
{
|
||||
debug_warn!(
|
||||
"Rejecting incoming event {} which depends on rejected auth event {aid}",
|
||||
"Rejecting incoming event {} which depends on rejected auth event \
|
||||
{auth_event_id}",
|
||||
event_id,
|
||||
);
|
||||
self.services.pdu_metadata.mark_event_rejected(event_id);
|
||||
return Err!(Request(InvalidParam("Event has rejected auth event: {aid}")));
|
||||
return Err!(Request(Forbidden(
|
||||
"Event has rejected auth event: {auth_event_id}"
|
||||
)));
|
||||
}
|
||||
|
||||
if let Ok(auth_event) = self.services.timeline.get_pdu(aid).await {
|
||||
if let Ok(auth_event) = self.services.timeline.get_pdu(auth_event_id).await {
|
||||
check_room_id(room_id, &auth_event)?;
|
||||
trace!("Found auth event {aid} for outlier event {event_id} locally");
|
||||
auth_events.insert(aid.to_owned(), auth_event);
|
||||
trace!("Found auth event {auth_event_id} for outlier event {event_id} locally");
|
||||
auth_events.insert(auth_event_id.to_owned(), auth_event);
|
||||
} else {
|
||||
debug_warn!(
|
||||
"Could not find auth event {aid} for outlier event {event_id} locally"
|
||||
"Could not find auth event {auth_event_id} for outlier event {event_id} \
|
||||
locally"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch any missing ones & reject invalid ones
|
||||
let missing_auth_events = if auth_events_known {
|
||||
pdu_event
|
||||
.auth_events()
|
||||
.filter(|id| !auth_events.contains_key(*id))
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
pdu_event.auth_events().collect::<Vec<_>>()
|
||||
};
|
||||
if !missing_auth_events.is_empty() || !auth_events_known {
|
||||
debug_info!(
|
||||
"Fetching {} missing auth events for outlier event {event_id}",
|
||||
missing_auth_events.len()
|
||||
);
|
||||
for (pdu, _) in self
|
||||
.fetch_and_handle_outliers(
|
||||
if auth_events.len() != pdu_event.auth_events().count() {
|
||||
info!("Missing some auth events, asking remote for auth chain");
|
||||
let response: get_event_authorization::v1::Response = self
|
||||
.services
|
||||
.sending
|
||||
.send_federation_request(
|
||||
origin,
|
||||
missing_auth_events.iter().copied(),
|
||||
create_event,
|
||||
room_id,
|
||||
get_event_authorization::v1::Request::new(
|
||||
room_id.to_owned(),
|
||||
event_id.to_owned(),
|
||||
),
|
||||
)
|
||||
.await
|
||||
{
|
||||
auth_events.insert(pdu.event_id().to_owned(), pdu);
|
||||
.map_err(|e| {
|
||||
err!(Request(Forbidden(
|
||||
"Remote server is not divulging incoming event's auth chain: {e}"
|
||||
)))
|
||||
})?;
|
||||
let mut auth_chain_map = HashMap::with_capacity(response.auth_chain.len());
|
||||
for auth_pdu_json in response.auth_chain {
|
||||
let (auth_event_room_id, auth_event_id, auth_pdu_json) =
|
||||
self.parse_incoming_pdu(&auth_pdu_json).await?;
|
||||
if auth_event_room_id != room_id {
|
||||
return Err!(Request(Forbidden(
|
||||
"Auth event {auth_event_id} is in {auth_event_room_id}, not {room_id}."
|
||||
)));
|
||||
}
|
||||
let auth_pdu = PduEvent::from_id_val(&auth_event_id, auth_pdu_json)
|
||||
.map_err(|e| err!(Request(BadJson("Invalid PDU {auth_event_id}: {e}"))))?;
|
||||
auth_chain_map.insert(auth_event_id, auth_pdu);
|
||||
}
|
||||
for auth_event_id in pdu_event.auth_events() {
|
||||
if auth_events.contains_key(auth_event_id) {
|
||||
continue;
|
||||
}
|
||||
if let Some(auth_event) = auth_chain_map.get(auth_event_id) {
|
||||
auth_events.insert(auth_event_id.to_owned(), auth_event.clone());
|
||||
} else {
|
||||
return Err!(Request(Forbidden(
|
||||
"Remote server is not divulging incoming event's auth events (missing: \
|
||||
{auth_event_id})"
|
||||
)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!("No missing auth events for outlier event {event_id}");
|
||||
}
|
||||
// reject if we are still missing some
|
||||
let still_missing = pdu_event
|
||||
.auth_events()
|
||||
.filter(|id| !auth_events.contains_key(*id))
|
||||
.collect::<Vec<_>>();
|
||||
if !still_missing.is_empty() {
|
||||
// Don't reject: this could be a temporary condition
|
||||
// TODO: use get_missing_events?
|
||||
return Err!(Request(InvalidParam(
|
||||
"Could not fetch all auth events for outlier event {event_id}, still missing: \
|
||||
{still_missing:?}"
|
||||
)));
|
||||
}
|
||||
|
||||
// 6. Reject "due to auth events" if the event doesn't pass auth based on the
|
||||
@@ -181,7 +208,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
||||
.outlier
|
||||
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
|
||||
self.services.pdu_metadata.mark_event_rejected(event_id);
|
||||
return Err!(Request(InvalidParam(
|
||||
return Err!(Request(Forbidden(
|
||||
"Auth event's type and state_key combination exists multiple times: {}, \
|
||||
{}",
|
||||
auth_event.kind,
|
||||
@@ -191,18 +218,6 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
||||
}
|
||||
}
|
||||
|
||||
// The original create event must be in the auth events
|
||||
if !matches!(
|
||||
auth_events_by_key.get(&(StateEventType::RoomCreate, String::new().into())),
|
||||
Some(_) | None
|
||||
) {
|
||||
self.services.pdu_metadata.mark_event_rejected(event_id);
|
||||
self.services
|
||||
.outlier
|
||||
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
|
||||
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
|
||||
}
|
||||
|
||||
let state_fetch = |ty: &StateEventType, sk: &str| {
|
||||
let key = (ty.to_owned(), sk.into());
|
||||
ready(auth_events_by_key.get(&key).map(ToOwned::to_owned))
|
||||
|
||||
@@ -1,95 +0,0 @@
|
||||
use std::{collections::BTreeMap, time::Instant};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer,
|
||||
utils::continue_exponential_backoff_secs,
|
||||
};
|
||||
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
|
||||
use tracing::debug;
|
||||
|
||||
impl super::Service {
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(
|
||||
name = "prev",
|
||||
level = INFO_SPAN_LEVEL,
|
||||
skip_all,
|
||||
fields(%prev_id),
|
||||
)]
|
||||
pub(super) async fn handle_prev_pdu<'a, Pdu>(
|
||||
&self,
|
||||
origin: &'a ServerName,
|
||||
event_id: &'a EventId,
|
||||
room_id: &'a RoomId,
|
||||
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
|
||||
create_event: &'a Pdu,
|
||||
first_ts_in_room: MilliSecondsSinceUnixEpoch,
|
||||
prev_id: &'a EventId,
|
||||
) -> Result
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
{
|
||||
// Check for disabled again because it might have changed
|
||||
if self.services.metadata.is_disabled(room_id).await {
|
||||
return Err!(Request(Forbidden(debug_warn!(
|
||||
"Federaton of room {room_id} is currently disabled on this server. Request by \
|
||||
origin {origin} and event ID {event_id}"
|
||||
))));
|
||||
}
|
||||
|
||||
if let Some((time, tries)) = self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.get(prev_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 5 * 60;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 24;
|
||||
if continue_exponential_backoff_secs(
|
||||
MIN_DURATION,
|
||||
MAX_DURATION,
|
||||
time.elapsed(),
|
||||
*tries,
|
||||
) {
|
||||
debug!(
|
||||
?tries,
|
||||
duration = ?time.elapsed(),
|
||||
"Backing off from prev_event"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let Some((pdu, json)) = eventid_info else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Skip old events
|
||||
if pdu.origin_server_ts() < first_ts_in_room {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let start_time = Instant::now();
|
||||
self.federation_handletime
|
||||
.write()
|
||||
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
|
||||
|
||||
defer! {{
|
||||
self.federation_handletime
|
||||
.write()
|
||||
.remove(room_id);
|
||||
}};
|
||||
|
||||
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
elapsed = ?start_time.elapsed(),
|
||||
"Handled prev_event",
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,6 @@
|
||||
mod fetch_state;
|
||||
mod handle_incoming_pdu;
|
||||
mod handle_outlier_pdu;
|
||||
mod handle_prev_pdu;
|
||||
mod parse_incoming_pdu;
|
||||
mod policy_server;
|
||||
mod resolve_state;
|
||||
@@ -15,19 +14,21 @@
|
||||
|
||||
use async_trait::async_trait;
|
||||
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
|
||||
pub use fetch_and_handle_outliers::{GET_MISSING_EVENTS_MAX_BATCH_SIZE, build_local_dag};
|
||||
use ruma::{
|
||||
OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
|
||||
room_version_rules::RoomVersionRules,
|
||||
};
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::{Notify, mpsc};
|
||||
|
||||
use crate::{Dep, globals, rooms, sending, server_keys};
|
||||
|
||||
/// Event-handler service state shared by the incoming-PDU handling code.
pub struct Service {
|
||||
/// Mutex map locked per room (e.g. by `join_remote_room` while it processes
/// `send_join`) to serialise federation handling for that room.
pub mutex_federation: RoomMutexMap,
|
||||
/// Per-room record of the event currently being handled and when handling
/// started.
pub federation_handletime: SyncRwLock<HandleTimeMap>,
|
||||
/// Per-room sender to the extremity squasher task; each message is
/// `(extremities_count, is_dummy_event)`.
pub extremity_squashers: SyncRwLock<HashMap<OwnedRoomId, mpsc::Sender<(usize, bool)>>>,
|
||||
/// Handles to the other services this one depends on.
services: Services,
|
||||
/// Notified from `interrupt()`; squasher tasks wait on it to wind down.
server_shutdown: Notify,
|
||||
/// Weak self-reference installed via `Arc::new_cyclic`, upgraded when a
/// spawned task needs an owning handle to the service.
me: std::sync::Weak<Self>,
|
||||
}
|
||||
|
||||
struct Services {
|
||||
@@ -53,9 +54,11 @@ struct Services {
|
||||
#[async_trait]
|
||||
impl crate::Service for Service {
|
||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
Ok(Arc::new(Self {
|
||||
Ok(Arc::new_cyclic(|s| Self {
|
||||
me: s.clone(),
|
||||
mutex_federation: RoomMutexMap::new(),
|
||||
federation_handletime: HandleTimeMap::new().into(),
|
||||
extremity_squashers: SyncRwLock::new(HashMap::new()),
|
||||
services: Services {
|
||||
globals: args.depend::<globals::Service>("globals"),
|
||||
sending: args.depend::<sending::Service>("sending"),
|
||||
@@ -91,6 +94,8 @@ async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result {
|
||||
/// Returns this service's name, derived by `make_name` from the module path.
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||
|
||||
/// Wakes every task currently waiting on `server_shutdown`.
fn interrupt(&self) { self.server_shutdown.notify_waiters(); }
|
||||
|
||||
/// No-op: the body is empty, so nothing is cleared here.
async fn clear_cache(&self) {}
|
||||
}
|
||||
|
||||
impl Service {
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
use itertools::Itertools;
|
||||
use ruma::{
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, RoomId,
|
||||
RoomVersionId,
|
||||
RoomVersionId, room_version_rules::RoomVersionRules,
|
||||
};
|
||||
use serde_json::value::RawValue as RawJsonValue;
|
||||
|
||||
@@ -56,7 +56,10 @@ fn extract_room_id(event_type: &str, pdu: &CanonicalJsonObject) -> Result<OwnedR
|
||||
|
||||
/// Parses every entry in an array as an event ID, returning an error if any
|
||||
/// step fails.
|
||||
fn expect_event_id_array(value: &CanonicalJsonObject, field: &str) -> Result<Vec<OwnedEventId>> {
|
||||
pub(super) fn expect_event_id_array(
|
||||
value: &CanonicalJsonObject,
|
||||
field: &str,
|
||||
) -> Result<Vec<OwnedEventId>> {
|
||||
value
|
||||
.get(field)
|
||||
.ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))?
|
||||
@@ -102,6 +105,20 @@ pub fn validate_pdu(&self, pdu: &CanonicalJsonObject) -> Result {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn parse_incoming_pdu_with_known_room(
|
||||
&self,
|
||||
pdu: &RawJsonValue,
|
||||
room_version_rules: &RoomVersionRules,
|
||||
) -> Result<(OwnedEventId, CanonicalJsonObject)> {
|
||||
let (event_id, value) =
|
||||
gen_event_id_canonical_json(pdu, room_version_rules).map_err(|e| {
|
||||
err!(Request(InvalidParam("Could not convert event to canonical json: {e}")))
|
||||
})?;
|
||||
self.validate_pdu(&value)?;
|
||||
Ok((event_id, value))
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "parse", skip_all)]
|
||||
pub async fn parse_incoming_pdu(&self, pdu: &RawJsonValue) -> Result<Parsed> {
|
||||
let value = serde_json::from_str::<CanonicalJsonObject>(pdu.get()).map_err(|e| {
|
||||
err!(BadServerResponse(debug_warn!("Error parsing incoming event {e:?}")))
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
};
|
||||
|
||||
use conduwuit::{
|
||||
Result, debug, err, error,
|
||||
Result, debug, debug_error, err, error,
|
||||
matrix::{Event, StateMap},
|
||||
trace,
|
||||
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
|
||||
@@ -37,6 +37,7 @@ pub(super) async fn state_at_incoming_degree_one<Pdu>(
|
||||
.pdu_shortstatehash(prev_event)
|
||||
.await
|
||||
else {
|
||||
trace!("No shortstatehash for {prev_event}, cannot calculate one-degree state.");
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
@@ -100,6 +101,7 @@ pub(super) async fn state_at_incoming_resolved<Pdu>(
|
||||
.map_ok(move |sstatehash| (sstatehash, prev_event))
|
||||
})
|
||||
.try_collect::<HashMap<_, _>>()
|
||||
.inspect_err(|e| debug_error!("failed to calculate N-degree short state hashes: {e}"))
|
||||
.await
|
||||
else {
|
||||
return Ok(None);
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
use std::{borrow::Borrow, sync::Arc, time::Instant};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Result, debug, debug_info, err, info, is_equal_to,
|
||||
Err, Result, debug, debug_error, debug_info, err, info, is_equal_to,
|
||||
matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res},
|
||||
result::DebugInspect,
|
||||
trace,
|
||||
utils::{
|
||||
IterStream,
|
||||
@@ -23,28 +24,17 @@
|
||||
};
|
||||
|
||||
impl super::Service {
|
||||
pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
||||
#[tracing::instrument(name="upgrade_outlier", skip_all, fields(event_id=%incoming_pdu.event_id()))]
|
||||
pub(super) async fn upgrade_outlier_to_timeline_pdu(
|
||||
&self,
|
||||
incoming_pdu: PduEvent,
|
||||
mut val: CanonicalJsonObject,
|
||||
create_event: &Pdu,
|
||||
create_event: &PduEvent,
|
||||
origin: &ServerName,
|
||||
room_id: &RoomId,
|
||||
) -> Result<Option<RawPduId>>
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
{
|
||||
// Skip the PDU if we already have it as a timeline event
|
||||
if let Ok(pduid) = self
|
||||
.services
|
||||
.timeline
|
||||
.get_pdu_id(incoming_pdu.event_id())
|
||||
.await
|
||||
{
|
||||
return Ok(Some(pduid));
|
||||
}
|
||||
|
||||
let (rejected, soft_failed) = join!(
|
||||
) -> Result<Option<RawPduId>> {
|
||||
let (pduid, rejected, soft_failed) = join!(
|
||||
self.services.timeline.get_pdu_id(incoming_pdu.event_id()),
|
||||
self.services
|
||||
.pdu_metadata
|
||||
.is_event_rejected(incoming_pdu.event_id()),
|
||||
@@ -52,17 +42,27 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
||||
.pdu_metadata
|
||||
.is_event_soft_failed(incoming_pdu.event_id())
|
||||
);
|
||||
if rejected {
|
||||
return Err!(Request(InvalidParam("Event has been rejected")));
|
||||
if let Ok(id) = pduid {
|
||||
trace!(event_id=%incoming_pdu.event_id(), "Skipping upgrade of already upgraded PDU");
|
||||
return Ok(Some(id));
|
||||
} else if rejected {
|
||||
return Err!(Request(Forbidden("Event has been rejected")));
|
||||
} else if soft_failed {
|
||||
return Err!(Request(InvalidParam("Event has been soft-failed")));
|
||||
return Err!(Request(Forbidden("Event has been soft-failed")));
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
*create_event.kind(),
|
||||
StateEventType::RoomCreate.into(),
|
||||
"tried to upgrade a PDU with a create_event that is not a room create event"
|
||||
);
|
||||
|
||||
debug!(
|
||||
event_id = %incoming_pdu.event_id,
|
||||
"Upgrading PDU from outlier to timeline"
|
||||
);
|
||||
let timer = Instant::now();
|
||||
let min_depth = self.services.metadata.get_mindepth(room_id).await;
|
||||
let room_version_rules = get_room_version_rules(create_event)?;
|
||||
|
||||
// 10. Fetch missing state and auth chain events by calling /state_ids at
|
||||
@@ -73,21 +73,34 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
||||
event_id = %incoming_pdu.event_id,
|
||||
"Resolving state at event"
|
||||
);
|
||||
let mut state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 {
|
||||
let state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 {
|
||||
self.state_at_incoming_degree_one(&incoming_pdu).await?
|
||||
} else {
|
||||
self.state_at_incoming_resolved(&incoming_pdu, room_id, &room_version_rules)
|
||||
.await?
|
||||
};
|
||||
let state_at_incoming_event = match state_at_incoming_event {
|
||||
| Some(s) => s,
|
||||
| None => {
|
||||
trace!("Could not calculate incoming state, asking remote {origin} for it");
|
||||
self.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
|
||||
.await
|
||||
.debug_inspect_err(|e| {
|
||||
debug_error!("Could not fetch state from {origin}: {e}");
|
||||
})?
|
||||
},
|
||||
};
|
||||
|
||||
if state_at_incoming_event.is_none() {
|
||||
state_at_incoming_event = self
|
||||
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
|
||||
.await?;
|
||||
if state_at_incoming_event.is_empty()
|
||||
&& *incoming_pdu.event_type() != StateEventType::RoomCreate.into()
|
||||
{
|
||||
// This can happen if the remote sends an event but cannot be reached to fetch
|
||||
// the state at it, and all other servers in the room (which might just be the
|
||||
// unreachable server) are unable to provide required info.
|
||||
// returning an error here allows the upgrade to be attempted at another time.
|
||||
return Err!(Request(Forbidden("Could not resolve incoming state at event")));
|
||||
}
|
||||
|
||||
let state_at_incoming_event =
|
||||
state_at_incoming_event.expect("we always set this to some above");
|
||||
trace!(state_events = state_at_incoming_event.len(), "Calculated incoming state");
|
||||
|
||||
debug!(
|
||||
event_id = %incoming_pdu.event_id,
|
||||
@@ -385,6 +398,12 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
||||
|
||||
// Event has passed all auth/stateres checks
|
||||
drop(state_lock);
|
||||
if incoming_pdu.depth > min_depth && incoming_pdu.state_key().is_some() {
|
||||
self.services
|
||||
.metadata
|
||||
.set_mindepth(room_id, incoming_pdu.depth.into());
|
||||
trace!("Increased room's min depth from {} to {}", min_depth, incoming_pdu.depth);
|
||||
}
|
||||
|
||||
Ok(pdu_id)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Pdu, Result, Server, debug, debug_info, debug_warn, err, error, info, is_true,
|
||||
Err, Event, Pdu, Result, Server, debug, debug_info, debug_warn, err, error, info, is_true,
|
||||
matrix::{
|
||||
StateKey,
|
||||
event::{gen_event_id, gen_event_id_canonical_json},
|
||||
@@ -34,7 +34,7 @@
|
||||
use crate::{
|
||||
Dep, antispam, globals,
|
||||
rooms::{
|
||||
metadata, outlier, pdu_metadata, short,
|
||||
event_handler, metadata, outlier, pdu_metadata, short,
|
||||
state::{self, RoomMutexGuard},
|
||||
state_accessor, state_cache,
|
||||
state_compressor::{self, CompressedState, HashSetCompressStateEvent},
|
||||
@@ -51,6 +51,7 @@ struct Services {
|
||||
server: Arc<Server>,
|
||||
db: Arc<Database>,
|
||||
antispam: Dep<antispam::Service>,
|
||||
event_handler: Dep<event_handler::Service>,
|
||||
globals: Dep<globals::Service>,
|
||||
metadata: Dep<metadata::Service>,
|
||||
outlier: Dep<outlier::Service>,
|
||||
@@ -73,6 +74,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
server: args.server.clone(),
|
||||
db: args.db.clone(),
|
||||
antispam: args.depend::<antispam::Service>("antispam"),
|
||||
event_handler: args.depend::<event_handler::Service>("rooms::event_handler"),
|
||||
globals: args.depend::<globals::Service>("globals"),
|
||||
metadata: args.depend::<metadata::Service>("rooms::metadata"),
|
||||
outlier: args.depend::<outlier::Service>("rooms::outlier"),
|
||||
@@ -380,8 +382,6 @@ pub async fn join_remote_room(
|
||||
|
||||
// It has enough fields to be called a proper event now
|
||||
let mut join_event = join_event_stub;
|
||||
|
||||
info!("Asking {remote_server} for send_join in room {room_id}");
|
||||
let send_join_request = federation::membership::create_join_event::v2::Request::new(
|
||||
room_id.to_owned(),
|
||||
event_id.clone(),
|
||||
@@ -391,6 +391,18 @@ pub async fn join_remote_room(
|
||||
.await,
|
||||
);
|
||||
|
||||
// NOTE: send_join can take a long time to respond, but from the point of view
|
||||
// of other servers, we may already have finished joining. This means they
|
||||
// sometimes end up sending PDUs to us that we aren't yet ready to accept, and
|
||||
// consequently drop. Holding the mutex over the room while processing mitigates
|
||||
// this.
|
||||
let _room_lock = self
|
||||
.services
|
||||
.event_handler
|
||||
.mutex_federation
|
||||
.lock(room_id.as_str())
|
||||
.await;
|
||||
info!("Asking {remote_server} for send_join in room {room_id}");
|
||||
let send_join_response = match self
|
||||
.services
|
||||
.sending
|
||||
@@ -576,7 +588,13 @@ pub async fn join_remote_room(
|
||||
if !auth_check {
|
||||
return Err!(Request(Forbidden("Auth check failed")));
|
||||
}
|
||||
let resident_before = self
|
||||
.services
|
||||
.state_cache
|
||||
.server_in_room(self.services.globals.server_name(), room_id)
|
||||
.await;
|
||||
|
||||
let cork = self.services.db.cork_and_flush();
|
||||
info!("Compressing state from send_join");
|
||||
let compressed: CompressedState = self
|
||||
.services
|
||||
@@ -625,6 +643,10 @@ pub async fn join_remote_room(
|
||||
room_id,
|
||||
)
|
||||
.await?;
|
||||
self.services
|
||||
.metadata
|
||||
.maybe_set_mindepth(room_id, parsed_join_pdu.depth.into())
|
||||
.await;
|
||||
|
||||
info!("Setting final room state for new room");
|
||||
// We set the room state after inserting the pdu, so that we never have a moment
|
||||
@@ -632,6 +654,23 @@ pub async fn join_remote_room(
|
||||
self.services
|
||||
.state
|
||||
.set_room_state(room_id, statehash_after_join, &state_lock);
|
||||
if !resident_before {
|
||||
// NOTE: We replace local extremities for this room if we were not a resident
|
||||
// before. We might be doing a remote join to satisfy restricted join rules,
|
||||
// so we don't want to do this if we're already a resident. Otherwise, we
|
||||
// want to replace our forward extremities whole-sale in case we were
|
||||
// desynced.
|
||||
info!("Replacing local forward extremities");
|
||||
self.services
|
||||
.state
|
||||
.set_forward_extremities(
|
||||
room_id,
|
||||
std::iter::once(parsed_join_pdu.event_id()),
|
||||
&state_lock,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
drop(cork);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit::{Result, utils::stream::TryIgnore};
|
||||
use database::Map;
|
||||
use database::{Deserialized, Map};
|
||||
use futures::{Stream, StreamExt};
|
||||
use ruma::{OwnedRoomId, RoomId};
|
||||
use ruma::{OwnedRoomId, RoomId, UInt, uint};
|
||||
|
||||
use crate::{Dep, rooms};
|
||||
|
||||
@@ -17,6 +17,7 @@ struct Data {
|
||||
bannedroomids: Arc<Map>,
|
||||
roomid_shortroomid: Arc<Map>,
|
||||
pduid_pdu: Arc<Map>,
|
||||
roomid_mindepth: Arc<Map>,
|
||||
}
|
||||
|
||||
struct Services {
|
||||
@@ -31,6 +32,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
bannedroomids: args.db["bannedroomids"].clone(),
|
||||
roomid_shortroomid: args.db["roomid_shortroomid"].clone(),
|
||||
pduid_pdu: args.db["pduid_pdu"].clone(),
|
||||
roomid_mindepth: args.db["roomid_mindepth"].clone(),
|
||||
},
|
||||
services: Services {
|
||||
short: args.depend::<rooms::short::Service>("rooms::short"),
|
||||
@@ -98,4 +100,25 @@ pub async fn is_banned(&self, room_id: &RoomId) -> bool {
|
||||
pub fn list_banned_rooms(&self) -> impl Stream<Item = OwnedRoomId> + Send + '_ {
|
||||
self.db.bannedroomids.keys().ignore_err()
|
||||
}
|
||||
|
||||
pub async fn get_mindepth(&self, room_id: &RoomId) -> UInt {
|
||||
self.db
|
||||
.roomid_mindepth
|
||||
.get(room_id)
|
||||
.await
|
||||
.deserialized::<UInt>()
|
||||
.unwrap_or_else(|_| uint!(0))
|
||||
}
|
||||
|
||||
/// Records `min_depth` as the minimum depth for `room_id`.
///
/// The depth is written to the `roomid_mindepth` map as its 8-byte
/// big-endian encoding, keyed by the raw bytes of the room ID. The stored
/// value is not consulted first; use `maybe_set_mindepth` to only ever
/// raise it.
pub fn set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
    let key = room_id.as_bytes();
    let value = min_depth.to_be_bytes();

    self.db.roomid_mindepth.put_raw(key, value);
}
|
||||
|
||||
/// Raises the minimum depth recorded for `room_id` to `min_depth`, but only
/// when `min_depth` is strictly greater than the currently stored value.
///
/// A room without a readable stored value is treated as having a minimum
/// depth of zero (see `get_mindepth`).
// NOTE(review): the read and the write below are two separate steps; confirm
// that callers serialise updates per room if concurrent calls are possible.
pub async fn maybe_set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
    let current: u64 = self.get_mindepth(room_id).await.into();

    if current < min_depth {
        self.set_mindepth(room_id, min_depth);
    }
}
|
||||
}
|
||||
|
||||
@@ -371,6 +371,16 @@ pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<ShortSta
|
||||
.deserialized()
|
||||
}
|
||||
|
||||
pub fn all_forward_extremities(
|
||||
&self,
|
||||
) -> impl Stream<Item = (OwnedRoomId, OwnedEventId)> + Send {
|
||||
self.db
|
||||
.roomid_pduleaves
|
||||
.keys()
|
||||
.map_ok(|(room_id, event_id): (OwnedRoomId, OwnedEventId)| (room_id, event_id))
|
||||
.ignore_err()
|
||||
}
|
||||
|
||||
pub fn get_forward_extremities<'a>(
|
||||
&'a self,
|
||||
room_id: &'a RoomId,
|
||||
|
||||
@@ -224,7 +224,10 @@ pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) ->
|
||||
|
||||
/// Determines which servers are trusted enough to provide backfill in a
|
||||
/// room.
|
||||
async fn candidate_backfill_servers(&self, room_id: &RoomId) -> HashSet<OwnedServerName> {
|
||||
pub(crate) async fn candidate_backfill_servers(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
) -> HashSet<OwnedServerName> {
|
||||
let mut candidate_backfill_servers = HashSet::new();
|
||||
|
||||
let power_levels = self
|
||||
|
||||
@@ -173,6 +173,10 @@ pub async fn get_non_outlier_pdu_json(
|
||||
self.db.get_non_outlier_pdu_json(event_id).await
|
||||
}
|
||||
|
||||
/// Reports whether the database layer's `non_outlier_pdu_exists` lookup for
/// `event_id` succeeds.
///
/// Any error from the lookup is collapsed into `false`.
pub async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> bool {
    let lookup = self.db.non_outlier_pdu_exists(event_id).await;

    lookup.is_ok()
}
|
||||
|
||||
/// Returns the pdu's id.
|
||||
#[inline]
|
||||
pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
|
||||
|
||||
Reference in New Issue
Block a user