Compare commits

..

5 Commits

Author SHA1 Message Date
Renovate Bot 7ca435053f chore(deps): update https://github.com/regclient/actions digest to c70ad64 2026-05-11 05:03:13 +00:00
Henry-Hiles 1658b3bf6c chore: Add changelog 2026-05-10 03:48:25 +00:00
Henry-Hiles 088fa3e725 fix: Use stable ID for Mutual Rooms support 2026-05-10 03:48:25 +00:00
Henry-Hiles 4694186c97 fix: Link to community guidelines in CoC file 2026-05-09 16:53:03 -04:00
Renovate Bot a5c61d5137 chore(deps): update pre-commit hook crate-ci/typos to v1.46.1 2026-05-09 05:03:19 +00:00
34 changed files with 650 additions and 1112 deletions
+1 -1
View File
@@ -55,7 +55,7 @@ jobs:
# repositories: continuwuity # repositories: continuwuity
- name: Install regsync - name: Install regsync
uses: https://github.com/regclient/actions/regsync-installer@f3c6d87835906c175eb6ccfc18b348b69bb447e7 # main uses: https://github.com/regclient/actions/regsync-installer@c70ad64367908075211b10dcd2ab9fad4bfa1816 # main
- name: Check what images need mirroring - name: Check what images need mirroring
run: | run: |
+1 -1
View File
@@ -24,7 +24,7 @@ repos:
- id: check-added-large-files - id: check-added-large-files
- repo: https://github.com/crate-ci/typos - repo: https://github.com/crate-ci/typos
rev: v1.46.0 rev: v1.46.1
hooks: hooks:
- id: typos - id: typos
- id: typos - id: typos
+1 -1
View File
@@ -1 +1 @@
Contributors are expected to follow the [Continuwuity Community Guidelines](continuwuity.org/community/guidelines). Contributors are expected to follow the [Continuwuity Community Guidelines](https://continuwuity.org/community/guidelines).
-9
View File
@@ -1,9 +0,0 @@
Implemented event rejection, which should resolve and prevent future netsplits of the kinds observed
within some Continuwuity rooms.
Also resolved several bugs related to both soft-failing events, and event backfilling, which should
improve state resolution stability.
The `!admin debug get-pdu` command was updated to disambiguate event acceptance status, and
`!admin debug show-auth-chain` was added to visually display event auth chains, which may assist
developers in debugging strangely complex events.
Contributed by @nex.
+1
View File
@@ -0,0 +1 @@
Fixed an issue where Continuwuity would only advertise support for the unstable endpoint for Mutual Rooms (MSC2666), despite only supporting the stable endpoint. Contributed by @Henry-Hiles (QuadRadical)
+9 -213
View File
@@ -1,5 +1,5 @@
use std::{ use std::{
collections::{HashMap, HashSet}, collections::HashMap,
fmt::Write, fmt::Write,
iter::once, iter::once,
time::{Instant, SystemTime}, time::{Instant, SystemTime},
@@ -22,7 +22,7 @@
use lettre::message::Mailbox; use lettre::message::Mailbox;
use ruma::{ use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, UInt, OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId,
api::federation::event::get_room_state, events::AnyStateEvent, serde::Raw, api::federation::event::get_room_state, events::AnyStateEvent, serde::Raw,
}; };
use service::rooms::{ use service::rooms::{
@@ -69,189 +69,6 @@ pub(super) async fn get_auth_chain(&self, event_id: OwnedEventId) -> Result {
self.write_str(&out).await self.write_str(&out).await
} }
#[derive(Clone, Copy, Eq, PartialEq)]
enum NodeStatus {
Normal(bool),
SoftFailed(bool),
Rejected(bool),
}
struct AuthChild {
node_id: String,
event_id: OwnedEventId,
depth: UInt,
ts: UInt,
first_seen: bool,
pdu: Option<PduEvent>,
}
fn render_node(
graph: &mut String,
node_id: &str,
event_id: &EventId,
status: NodeStatus,
) -> Result {
let evt_str = event_id.to_string();
let status_label = match status {
| NodeStatus::Normal(false) => evt_str,
| NodeStatus::Normal(true) => format!("{evt_str} (missing locally)"),
| NodeStatus::SoftFailed(false) => format!("{evt_str} (soft-failed)"),
| NodeStatus::SoftFailed(true) => format!("{evt_str} (soft-failed & missing locally)"),
| NodeStatus::Rejected(false) => format!("{evt_str} (rejected)"),
| NodeStatus::Rejected(true) => format!("{evt_str} (rejected & missing locally)"),
};
writeln!(graph, "{node_id}[\"{}\"]", status_label.as_str())?;
match status {
| NodeStatus::Rejected(_) => writeln!(graph, "class {node_id} rejected;")?,
| NodeStatus::SoftFailed(_) => writeln!(graph, "class {node_id} soft_failed;")?,
| NodeStatus::Normal(_) => {},
}
Ok(())
}
#[admin_command]
pub(super) async fn show_auth_chain(&self, event_id: OwnedEventId) -> Result {
let node_status = async |event_id: &EventId, missing: bool| -> NodeStatus {
if self
.services
.rooms
.pdu_metadata
.is_event_rejected(event_id)
.await
{
NodeStatus::Rejected(missing)
} else if self
.services
.rooms
.pdu_metadata
.is_event_soft_failed(event_id)
.await
{
NodeStatus::SoftFailed(missing)
} else {
NodeStatus::Normal(missing)
}
};
let Ok(root) = self.services.rooms.timeline.get_pdu(&event_id).await else {
return Err!("Event not found.");
};
let mut graph = String::from(
"```mermaid\n%% This is a mermaid graph. You can plug this output into\n\
%% https://mermaid.live/edit to visualise it on-the-fly.\nflowchart TD\n\
classDef rejected fill:#ffe5e5,stroke:#cc0000,stroke-width:2px,color:#000;\n\
classDef soft_failed fill:#fff6cc,stroke:#c9a400,stroke-width:2px,color:#000;\n"
);
let mut node_ids: HashMap<OwnedEventId, String> = HashMap::new();
let mut cached_events: HashMap<OwnedEventId, PduEvent> =
HashMap::from([(event_id.clone(), root.clone())]);
let mut scheduled: HashSet<OwnedEventId> = HashSet::from([event_id.clone()]);
let mut visited: HashSet<OwnedEventId> = HashSet::new();
let mut stack = vec![root];
let mut next_node_id = 0_usize;
let node_id_for = |event_id: &OwnedEventId,
node_ids: &mut HashMap<OwnedEventId, String>,
next_node_id: &mut usize| {
node_ids
.entry(event_id.clone())
.or_insert_with(|| {
let id = format!("n{}", *next_node_id);
*next_node_id = next_node_id.saturating_add(1);
id
})
.clone()
};
while let Some(event) = stack.pop() {
let current_event_id = event.event_id().to_owned();
if !visited.insert(current_event_id.clone()) {
continue;
}
let current_node_id = node_id_for(&current_event_id, &mut node_ids, &mut next_node_id);
let current_status = node_status(&current_event_id, false).await;
render_node(&mut graph, &current_node_id, &current_event_id, current_status)?;
let mut children = Vec::with_capacity(event.auth_events.len());
for auth_event_id in event.auth_events().rev() {
let auth_event_id = auth_event_id.to_owned();
let auth_node_id = node_id_for(&auth_event_id, &mut node_ids, &mut next_node_id);
writeln!(graph, "{current_node_id} --> {auth_node_id}")?;
let first_seen = scheduled.insert(auth_event_id.clone());
let auth_pdu = if let Some(auth_pdu) = cached_events.get(&auth_event_id) {
// NOTE: events might be referenced multiple times (like the create event)
// so this saves some cheeky db lookup time
Some(auth_pdu.clone())
} else if first_seen {
match self.services.rooms.timeline.get_pdu(&auth_event_id).await {
| Ok(auth_event) => {
cached_events.insert(auth_event_id.clone(), auth_event.clone());
Some(auth_event)
},
| Err(_) => None,
}
} else {
None
};
// NOTE: Depth is used as the primary sorting key here, even though it has no
// bearing on state resolution or anything. Timestamp is used as a
// tiebreaker, failing back to lexicographical comparison.
let (depth, ts) = auth_pdu
.as_ref()
.map_or((UInt::MAX, UInt::MAX), |pdu| (pdu.depth, pdu.origin_server_ts));
children.push(AuthChild {
node_id: auth_node_id,
event_id: auth_event_id,
depth,
ts,
first_seen,
pdu: auth_pdu,
});
}
children.sort_by(|a, b| {
a.depth
.cmp(&b.depth)
.then(a.ts.cmp(&b.ts))
.then(a.event_id.as_str().cmp(b.event_id.as_str()))
});
for child in children.into_iter().rev() {
if !child.first_seen {
continue;
}
if let Some(child_pdu) = child.pdu {
// We have this PDU so will want to traverse it.
stack.push(child_pdu);
} else {
// We don't have this PDU locally so we can't traverse its auth events,
// but we can still render it as a node.
render_node(
&mut graph,
&child.node_id,
&child.event_id,
node_status(&child.event_id, true).await,
)?;
}
}
}
graph.push_str("```\n");
self.write_str(&graph).await
}
#[admin_command] #[admin_command]
pub(super) async fn parse_pdu(&self) -> Result { pub(super) async fn parse_pdu(&self) -> Result {
if self.body.len() < 2 if self.body.len() < 2
@@ -294,31 +111,15 @@ pub(super) async fn get_pdu(&self, event_id: OwnedEventId) -> Result {
outlier = true; outlier = true;
pdu_json = self.services.rooms.timeline.get_pdu_json(&event_id).await; pdu_json = self.services.rooms.timeline.get_pdu_json(&event_id).await;
} }
let rejected = self
.services
.rooms
.pdu_metadata
.is_event_rejected(&event_id)
.await;
let soft_failed = self
.services
.rooms
.pdu_metadata
.is_event_soft_failed(&event_id)
.await;
match pdu_json { match pdu_json {
| Err(_) => return Err!("PDU not found locally."), | Err(_) => return Err!("PDU not found locally."),
| Ok(json) => { | Ok(json) => {
let text = serde_json::to_string_pretty(&json)?; let text = serde_json::to_string_pretty(&json)?;
let msg = if rejected { let msg = if outlier {
"Rejected PDU:" "Outlier (Rejected / Soft Failed) PDU found in our database"
} else if soft_failed {
"Soft-failed PDU:"
} else if outlier {
"Outlier PDU:"
} else { } else {
"PDU:" "PDU found in our database"
}; };
write!(self, "{msg}\n```json\n{text}\n```") write!(self, "{msg}\n```json\n{text}\n```")
}, },
@@ -813,10 +614,6 @@ pub(super) async fn force_set_room_state_from_server(
.await; .await;
state.insert(shortstatekey, pdu.event_id.clone()); state.insert(shortstatekey, pdu.event_id.clone());
self.services
.rooms
.pdu_metadata
.clear_pdu_markers(pdu.event_id());
} }
} }
@@ -834,10 +631,6 @@ pub(super) async fn force_set_room_state_from_server(
.rooms .rooms
.outlier .outlier
.add_pdu_outlier(&event_id, &value); .add_pdu_outlier(&event_id, &value);
self.services
.rooms
.pdu_metadata
.clear_pdu_markers(&event_id);
} }
info!("Resolving new room state"); info!("Resolving new room state");
@@ -869,7 +662,10 @@ pub(super) async fn force_set_room_state_from_server(
.force_state(room_id.clone().as_ref(), short_state_hash, added, removed, &state_lock) .force_state(room_id.clone().as_ref(), short_state_hash, added, removed, &state_lock)
.await?; .await?;
info!("Updating joined counts for room"); info!(
"Updating joined counts for room just in case (e.g. we may have found a difference in \
the room's m.room.member state"
);
self.services self.services
.rooms .rooms
.state_cache .state_cache
+1 -10
View File
@@ -17,21 +17,12 @@ pub enum DebugCommand {
message: Vec<String>, message: Vec<String>,
}, },
/// Loads the auth_chain of a PDU, reporting how long it took. /// Get the auth_chain of a PDU
GetAuthChain { GetAuthChain {
/// An event ID (the $ character followed by the base64 reference hash) /// An event ID (the $ character followed by the base64 reference hash)
event_id: OwnedEventId, event_id: OwnedEventId,
}, },
/// Walks & displays the auth_chain of a PDU in a mermaid graph format.
///
/// This is useless to basically anyone but developers, and is also probably
/// slow and memory hungry.
ShowAuthChain {
/// The root event ID to start walking back from.
event_id: OwnedEventId,
},
/// Parse and print a PDU from a JSON /// Parse and print a PDU from a JSON
/// ///
/// The PDU event is only checked for validity and is not added to the /// The PDU event is only checked for validity and is not added to the
-35
View File
@@ -763,41 +763,6 @@ pub(super) async fn force_join_room(
.await .await
} }
#[admin_command]
pub(super) async fn force_join_room_remotely(
&self,
user_id: String,
room_id: OwnedRoomOrAliasId,
via: String,
) -> Result {
let via = ServerName::parse(&via)?;
let user_id = parse_local_user_id(self.services, &user_id)?;
let (room_id, mut servers) = self
.services
.rooms
.alias
.resolve_with_servers(&room_id, None)
.await?;
if servers.contains(&via) {
servers.retain(|n| *n != via);
}
servers.insert(0, via);
assert!(
self.services.globals.user_is_local(&user_id),
"Parsed user_id must be a local user"
);
let state_lock = self.services.rooms.state.mutex.lock(&*room_id).await;
self.services
.rooms
.membership
.join_remote_room(&user_id, &room_id, None, &servers, state_lock)
.await?;
self.write_str(&format!("{user_id} has been joined to {room_id}."))
.await
}
#[admin_command] #[admin_command]
pub(super) async fn force_leave_room( pub(super) async fn force_leave_room(
&self, &self,
-14
View File
@@ -183,20 +183,6 @@ pub enum UserCommand {
room_id: OwnedRoomOrAliasId, room_id: OwnedRoomOrAliasId,
}, },
/// Manually join a local user to a room via a remote server, regardless of
/// our current residency.
ForceJoinRoomRemotely {
/// The user to join
user_id: String,
/// The room to join
room_id: OwnedRoomOrAliasId,
/// The server name to join via.
///
/// This server will always be tried first, however if more are
/// available, they may be tried after.
via: String,
},
/// Manually leave a local user from a room. /// Manually leave a local user from a room.
ForceLeaveRoom { ForceLeaveRoom {
user_id: String, user_id: String,
+1 -1
View File
@@ -187,7 +187,7 @@ pub(crate) async fn change_password_route(
if services.server.config.admin_room_notices { if services.server.config.admin_room_notices {
services services
.admin .admin
.notice(&format!("User {sender_user} changed their password.")) .notice(&format!("User {} changed their password.", &sender_user))
.await; .await;
} }
+4 -1
View File
@@ -537,7 +537,10 @@ pub(crate) async fn create_room_route(
if services.server.config.admin_room_notices { if services.server.config.admin_room_notices {
services services
.admin .admin
.send_text(&format!("{sender_user} made {room_id} public to the room directory")) .send_text(&format!(
"{sender_user} made {} public to the room directory",
&room_id
))
.await; .await;
} }
info!("{sender_user} made {0} public to the room directory", &room_id); info!("{sender_user} made {0} public to the room directory", &room_id);
+1 -1
View File
@@ -46,7 +46,7 @@ pub(crate) async fn get_supported_versions_route(
let unstable_features = BTreeMap::from_iter([ let unstable_features = BTreeMap::from_iter([
("org.matrix.e2e_cross_signing".to_owned(), true), ("org.matrix.e2e_cross_signing".to_owned(), true),
("org.matrix.msc2285.stable".to_owned(), true), /* private read receipts (https://github.com/matrix-org/matrix-spec-proposals/pull/2285) */ ("org.matrix.msc2285.stable".to_owned(), true), /* private read receipts (https://github.com/matrix-org/matrix-spec-proposals/pull/2285) */
("uk.half-shot.msc2666.query_mutual_rooms".to_owned(), true), /* query mutual rooms (https://github.com/matrix-org/matrix-spec-proposals/pull/2666) */ ("uk.half-shot.msc2666.query_mutual_rooms.stable".to_owned(), true), /* query mutual rooms (https://github.com/matrix-org/matrix-spec-proposals/pull/2666) */
("org.matrix.msc2836".to_owned(), true), /* threading/threads (https://github.com/matrix-org/matrix-spec-proposals/pull/2836) */ ("org.matrix.msc2836".to_owned(), true), /* threading/threads (https://github.com/matrix-org/matrix-spec-proposals/pull/2836) */
("org.matrix.msc2946".to_owned(), true), /* spaces/hierarchy summaries (https://github.com/matrix-org/matrix-spec-proposals/pull/2946) */ ("org.matrix.msc2946".to_owned(), true), /* spaces/hierarchy summaries (https://github.com/matrix-org/matrix-spec-proposals/pull/2946) */
("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */ ("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */
-1
View File
@@ -381,7 +381,6 @@ async fn handle_room(
.rooms .rooms
.event_handler .event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true) .handle_incoming_pdu(origin, room_id, &event_id, value, true)
.boxed()
.await .await
.map(|_| ()); .map(|_| ());
results.push((event_id, result)); results.push((event_id, result));
+1 -1
View File
@@ -260,7 +260,7 @@ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{real_error}") write!(f, "{real_error}")
} }
} else { } else {
write!(f, "Request error: {}", self.0) write!(f, "Request error: {}", &self.0)
} }
} }
} }
+1 -1
View File
@@ -29,7 +29,7 @@ pub fn unstable_features() -> BTreeMap<String, bool> {
BTreeMap::from_iter([ BTreeMap::from_iter([
("org.matrix.e2e_cross_signing".to_owned(), true), ("org.matrix.e2e_cross_signing".to_owned(), true),
("org.matrix.msc2285.stable".to_owned(), true), /* private read receipts (https://github.com/matrix-org/matrix-spec-proposals/pull/2285) */ ("org.matrix.msc2285.stable".to_owned(), true), /* private read receipts (https://github.com/matrix-org/matrix-spec-proposals/pull/2285) */
("uk.half-shot.msc2666.query_mutual_rooms".to_owned(), true), /* query mutual rooms (https://github.com/matrix-org/matrix-spec-proposals/pull/2666) */ ("uk.half-shot.msc2666.query_mutual_rooms.stable".to_owned(), true), /* query mutual rooms (https://github.com/matrix-org/matrix-spec-proposals/pull/2666) */
("org.matrix.msc2836".to_owned(), true), /* threading/threads (https://github.com/matrix-org/matrix-spec-proposals/pull/2836) */ ("org.matrix.msc2836".to_owned(), true), /* threading/threads (https://github.com/matrix-org/matrix-spec-proposals/pull/2836) */
("org.matrix.msc2946".to_owned(), true), /* spaces/hierarchy summaries (https://github.com/matrix-org/matrix-spec-proposals/pull/2946) */ ("org.matrix.msc2946".to_owned(), true), /* spaces/hierarchy summaries (https://github.com/matrix-org/matrix-spec-proposals/pull/2946) */
("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */ ("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */
-5
View File
@@ -311,11 +311,6 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
key_size_hint: Some(48), key_size_hint: Some(48),
..descriptor::RANDOM_SMALL ..descriptor::RANDOM_SMALL
}, },
Descriptor {
name: "rejectedeventids",
key_size_hint: Some(48),
..descriptor::RANDOM_SMALL
},
Descriptor { Descriptor {
name: "statehash_shortstatehash", name: "statehash_shortstatehash",
val_size_hint: Some(8), val_size_hint: Some(8),
+1 -1
View File
@@ -44,7 +44,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
db, db,
server: args.server.clone(), server: args.server.clone(),
bad_event_ratelimiter: Arc::new(SyncRwLock::new(HashMap::new())), bad_event_ratelimiter: Arc::new(SyncRwLock::new(HashMap::new())),
admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", args.server.name)) admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", &args.server.name))
.expect("#admins:server_name is valid alias name"), .expect("#admins:server_name is valid alias name"),
server_user: UserId::parse_with_server_name( server_user: UserId::parse_with_server_name(
String::from("conduit"), String::from("conduit"),
+1 -1
View File
@@ -37,7 +37,7 @@ pub struct PasswordReset<'a> {
} }
impl MessageTemplate for PasswordReset<'_> { impl MessageTemplate for PasswordReset<'_> {
fn subject(&self) -> String { format!("Password reset request for {}", self.user_id) } fn subject(&self) -> String { format!("Password reset request for {}", &self.user_id) }
} }
#[derive(Template)] #[derive(Template)]
+1 -1
View File
@@ -48,7 +48,7 @@ pub fn is_valid(&self) -> bool {
impl std::fmt::Display for DatabaseTokenInfo { impl std::fmt::Display for DatabaseTokenInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Token created by {} and used {} times. ", self.creator, self.uses)?; write!(f, "Token created by {} and used {} times. ", &self.creator, self.uses)?;
if let Some(expires) = &self.expires { if let Some(expires) = &self.expires {
write!(f, "{expires}.")?; write!(f, "{expires}.")?;
} else { } else {
+1 -1
View File
@@ -35,7 +35,7 @@ pub struct ValidToken {
impl std::fmt::Display for ValidToken { impl std::fmt::Display for ValidToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "`{}` --- {}", self.token, self.source) write!(f, "`{}` --- {}", self.token, &self.source)
} }
} }
@@ -1,454 +1,233 @@
use std::{ use std::{
cmp::min, collections::{BTreeMap, HashSet, VecDeque, hash_map},
collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map},
time::Instant, time::Instant,
}; };
use conduwuit::{ use conduwuit::{
Err, Event, PduEvent, debug, debug_info, debug_warn, err, Event, PduEvent, debug, debug_warn, implement, matrix::event::gen_event_id_canonical_json,
matrix::event::gen_event_id_canonical_json, state_res::lexicographical_topological_sort,
trace, utils::continue_exponential_backoff_secs, warn, trace, utils::continue_exponential_backoff_secs, warn,
}; };
use futures::StreamExt;
use ruma::{ use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt, api::federation::event::get_event,
api::federation::event::{get_event, get_missing_events},
int,
}; };
use super::get_room_version_rules; use super::get_room_version_rules;
/// Attempts to build a localised directed acyclic graph out of the given PDUs, /// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// returning them in a topologically sorted order. /// it is appended to the outliers Tree.
/// ///
/// This is used to attempt to process PDUs in an order that respects their /// Returns pdu and if we fetched it over federation the raw json.
/// dependencies, however it is ultimately the sender's responsibility to send ///
/// them in a processable order, so this is just a best effort attempt. It does /// a. Look in the main timeline (pduid_pdu tree)
/// not account for power levels or other tie breaks. /// b. Look at outlier pdu tree
pub async fn build_local_dag<S: std::hash::BuildHasher>( /// c. Ask origin server over federation
pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject, S>, /// d. TODO: Ask other servers over federation?
) -> conduwuit::Result<Vec<OwnedEventId>> { #[implement(super::Service)]
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs"); pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> = &self,
HashMap::with_capacity(pdu_map.len()); origin: &'a ServerName,
let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len()); events: Events,
create_event: &'a Pdu,
for (event_id, value) in pdu_map { room_id: &'a RoomId,
// We already checked that these properties are correct in parse_incoming_pdu, ) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
// so it's safe to unwrap here. where
// We also filter to remove any prev_events that are not in this pdu_map, as we Pdu: Event + Send + Sync,
// need to have at least one event with zero out degrees for the lexico-topo Events: Iterator<Item = &'a EventId> + Clone + Send,
// sort below. If there are multiple events with omitted prevs, they will be {
// ordered by timestamp, then event ID. At that point though, it's unlikely to let back_off = |id| match self
// matter. .services
let prev_events = value .globals
.get("prev_events") .bad_event_ratelimiter
.unwrap() .write()
.as_array() .entry(id)
.unwrap()
.iter()
.map(|v| EventId::parse(v.as_str().unwrap()).unwrap())
.filter(|id| pdu_map.contains_key(id))
.collect();
dag.insert(event_id.clone(), prev_events);
let origin_server_ts = value
.get("origin_server_ts")
.and_then(CanonicalJsonValue::as_integer)
.unwrap_or_default();
id_origin_ts.insert(event_id.clone(), origin_server_ts);
}
debug!(count = dag.len(), "Sorting incoming events with partial graph");
lexicographical_topological_sort(&dag, &async |node_id| {
// Note: we don't bother fetching power levels because that would massively slow
// this function down. This is a best-effort attempt to order events correctly
// for processing, however ultimately that should be the sender's job.
let ts = id_origin_ts
.get(&node_id)
.copied()
.unwrap_or_else(|| int!(0))
.to_string()
.parse::<u64>()
.ok()
.and_then(UInt::new)
.unwrap_or_default();
Ok((int!(0), MilliSecondsSinceUnixEpoch(ts)))
})
.await
.inspect(|sorted| {
debug_assert_eq!(
sorted.len(),
pdu_map.len(),
"Sorted graph was not the same size as the input graph"
);
})
.map_err(|e| err!("failed to resolve local graph: {e}"))
}
impl super::Service {
/// Uses `/_matrix/federation/v1/get_missing_events` to fill gaps in the
/// DAG.
///
/// When this function is called, the "earliest events" (current forward
/// extremities) will be collected, and the function will loop with an
/// exponentially incrementing limit (up to 100 per request) until it has
/// filled the gap, i.e. when the remote says there's no more events.
///
/// This function does not persist the events. The caller is responsible for
/// passing them through handle_incoming_pdu.
pub(super) async fn backfill_missing_events(
&self,
room_id: OwnedRoomId,
head: Vec<OwnedEventId>,
via: OwnedServerName,
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
if head.is_empty() {
return Ok(HashMap::new());
}
let tail = self
.services
.state
.get_forward_extremities(&room_id)
.collect::<Vec<_>>()
.await;
// TODO: min_depth is probably necessary to avoid fetching the entire room
// history if there are very long gaps
let mut latest_events: HashSet<OwnedEventId> = HashSet::from_iter(head.clone());
let mut loop_count: u64 = 3;
// Start with a base number of 3 so that we fetch 10, 16, 25, 36, etc
// instead of 1, 2, 4, 9, so on.
let mut backfilled_events = HashMap::with_capacity(10);
while !latest_events.is_empty() {
let todo: Vec<OwnedEventId> = latest_events.clone().into_iter().collect();
let mut request =
get_missing_events::v1::Request::new(room_id.clone(), tail.clone(), todo.clone());
let limit = min(loop_count.saturating_pow(2), 100);
request.limit = limit
.try_into()
.expect("limit cannot be greater than 100, which fits into UInt");
debug_info!(
backfilled=%backfilled_events.len(),
%loop_count,
"Asking {via} for up to {limit} missing events",
);
trace!(
?latest_events,
?tail,
%via,
%limit,
"Requesting missing events"
);
let response: get_missing_events::v1::Response = self
.services
.sending
.send_federation_request(&via, request)
.await?;
loop_count = loop_count.saturating_add(1);
// Some buggy servers (including old continuwuity) may return the same events
// multiple times, which can cause this to be an infinite loop.
// In order to break this loop, if we see no new events from this response (i.e.
// all events in the response are already in backfilled_events), we stop,
// with a warning.
let mut unseen: usize = 0;
let chunk_len = response.events.len();
if response.events.is_empty() {
debug_info!("No more missing events found");
break;
}
for event in response.events {
trace!("Parsing incoming event from backfill");
let (incoming_room_id, event_id, pdu_json) =
self.parse_incoming_pdu(&event).await.map_err(|e| {
err!(BadServerResponse("{via} returned an invalid event: {e:?}"))
})?;
trace!(%incoming_room_id, %event_id, "Parsed incoming event from backfill");
if incoming_room_id != room_id {
return Err!(BadServerResponse(
"{via} returned {event_id} in missing events which belongs to \
{incoming_room_id}, not {room_id}"
));
}
latest_events.remove(&event_id);
if head.contains(&event_id) || tail.contains(&event_id) {
debug!("Skipping known event {event_id}");
continue;
}
if backfilled_events.contains_key(&event_id) {
debug_warn!(%via, %event_id, "Remote retransmitted event");
continue;
}
// TODO: Should this be scoped to the GME session? We might end up incorrectly
// assuming we've caught up if we do this
if let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await {
debug!(%via, %event_id, "Already seen event in database");
backfilled_events.insert(event_id.clone(), pdu);
} else {
unseen = unseen.saturating_add(1);
}
let parsed = PduEvent::from_id_val(&event_id, pdu_json)
.map_err(|e| err!(BadServerResponse("Unable to parse {event_id}: {e}")))?;
for prev_event_id in parsed.prev_events() {
// Verify that we have all of this event's prev_events. If we don't, add it to
// the work queue
if !(backfilled_events.contains_key(prev_event_id)
|| self.services.timeline.pdu_exists(prev_event_id).await)
{
latest_events.insert(prev_event_id.to_owned());
break;
}
continue;
}
backfilled_events.insert(event_id.clone(), parsed);
}
for event_id in todo {
latest_events.remove(&event_id);
if let hash_map::Entry::Vacant(e) = backfilled_events.entry(event_id.clone()) {
let evt = self.services.timeline.get_pdu(&event_id).await?;
e.insert(evt);
}
}
debug!(
count=%chunk_len,
new=%unseen,
remaining=%latest_events.len(),
"Got missing events"
);
if unseen == 0 {
debug_warn!("Didn't see any new events, breaking cycle");
break;
} else if chunk_len < usize::try_from(limit)? {
debug!(
"Got less than the limit number of events, assuming there's no more to fetch"
);
break;
}
}
debug_info!("Successfully fetched {} missing events from {via}", backfilled_events.len());
trace!("Missing_events: {backfilled_events:?}");
Ok(backfilled_events)
}
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
///
/// Returns pdu and if we fetched it over federation the raw json.
///
/// a. Look in the main timeline (pduid_pdu tree)
/// b. Look at outlier pdu tree
/// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation?
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
&self,
origin: &'a ServerName,
events: Events,
create_event: &'a Pdu,
room_id: &'a RoomId,
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{ {
let back_off = |id| match self | hash_map::Entry::Vacant(e) => {
.services e.insert((Instant::now(), 1));
.globals },
.bad_event_ratelimiter | hash_map::Entry::Occupied(mut e) => {
.write() *e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
.entry(id) },
{ };
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
},
};
let mut events_with_auth_events = Vec::with_capacity(events.clone().count()); let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
trace!("Fetching {} outlier pdus", events.clone().count()); trace!("Fetching {} outlier pdus", events.clone().count());
for id in events { for id in events {
// a. Look in the main timeline (pduid_pdu tree) // a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree // b. Look at outlier pdu tree
// (get_pdu_json checks both) // (get_pdu_json checks both)
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await { if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
trace!("Found {id} in main timeline or outlier tree"); trace!("Found {id} in main timeline or outlier tree");
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![])); events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
continue;
}
// c. Ask origin server over federation
// We also handle its auth chain here so we don't get a stack overflow in
// handle_outlier_pdu.
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
while let Some(next_id) = todo_auth_events.pop_front() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 60 * 2;
const MAX_DURATION: u64 = 60 * 60 * 8;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug_warn!(
tried = ?*tries,
elapsed = ?time.elapsed(),
"Backing off from {next_id}",
);
continue;
}
}
if events_all.contains(&next_id) {
continue; continue;
} }
// c. Ask origin server over federation if self.services.timeline.pdu_exists(&next_id).await {
// We also handle its auth chain here so we don't get a stack overflow in trace!("Found {next_id} in db");
// handle_outlier_pdu. continue;
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
while let Some(next_id) = todo_auth_events.pop_front() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 60 * 2;
const MAX_DURATION: u64 = 60 * 60 * 8;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug_warn!(
tried = ?*tries,
elapsed = ?time.elapsed(),
"Backing off from {next_id}",
);
continue;
}
}
if events_all.contains(&next_id) {
continue;
}
if self.services.timeline.pdu_exists(&next_id).await {
trace!("Found {next_id} in db");
continue;
}
debug!("Fetching {next_id} over federation from {origin}.");
match self
.services
.sending
.send_federation_request(
origin,
get_event::v1::Request::new((*next_id).to_owned()),
)
.await
{
| Ok(res) => {
debug!("Got {next_id} over federation from {origin}");
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
back_off((*next_id).to_owned());
continue;
};
let Ok((calculated_event_id, value)) =
gen_event_id_canonical_json(&res.pdu, &room_version_rules)
else {
back_off((*next_id).to_owned());
continue;
};
if calculated_event_id != *next_id {
warn!(
"Server didn't return event id we requested: requested: \
{next_id}, we got {calculated_event_id}. Event: {:?}",
&res.pdu
);
}
if let Some(auth_events) = value
.get("auth_events")
.and_then(CanonicalJsonValue::as_array)
{
for auth_event in auth_events {
match serde_json::from_value::<OwnedEventId>(
auth_event.clone().into(),
) {
| Ok(auth_event) => {
trace!(
"Found auth event id {auth_event} for event \
{next_id}"
);
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
}
events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
},
| Err(e) => {
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
back_off((*next_id).to_owned());
},
}
} }
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order)); debug!("Fetching {next_id} over federation from {origin}.");
} match self
.services
let mut pdus = Vec::with_capacity(events_with_auth_events.len()); .sending
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events { .send_federation_request(
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Some(local_pdu) = local_pdu {
trace!("Found {id} in main timeline or outlier tree");
pdus.push((local_pdu.clone(), None));
}
for (next_id, value) in events_in_reverse_order.into_iter().rev() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug!("Backing off from {next_id}");
continue;
}
}
trace!("Handling outlier {next_id}");
match Box::pin(self.handle_outlier_pdu(
origin, origin,
create_event, get_event::v1::Request::new((*next_id).to_owned()),
&next_id, )
room_id,
value.clone(),
true,
))
.await .await
{ {
| Ok((pdu, json)) => | Ok(res) => {
if next_id == *id { debug!("Got {next_id} over federation from {origin}");
trace!("Handled outlier {next_id} (original request)"); let Ok(room_version_rules) = get_room_version_rules(create_event) else {
pdus.push((pdu, Some(json))); back_off((*next_id).to_owned());
}, continue;
| Err(e) => { };
warn!("Authentication of event {next_id} failed: {e:?}");
back_off(next_id); let Ok((calculated_event_id, value)) =
}, gen_event_id_canonical_json(&res.pdu, &room_version_rules)
} else {
back_off((*next_id).to_owned());
continue;
};
if calculated_event_id != *next_id {
warn!(
"Server didn't return event id we requested: requested: {next_id}, \
we got {calculated_event_id}. Event: {:?}",
&res.pdu
);
}
if let Some(auth_events) = value
.get("auth_events")
.and_then(CanonicalJsonValue::as_array)
{
for auth_event in auth_events {
match serde_json::from_value::<OwnedEventId>(
auth_event.clone().into(),
) {
| Ok(auth_event) => {
trace!(
"Found auth event id {auth_event} for event {next_id}"
);
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
}
events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
},
| Err(e) => {
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
back_off((*next_id).to_owned());
},
} }
} }
trace!("Fetched and handled {} outlier pdus", pdus.len());
pdus events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
} }
let mut pdus = Vec::with_capacity(events_with_auth_events.len());
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Some(local_pdu) = local_pdu {
trace!("Found {id} in main timeline or outlier tree");
pdus.push((local_pdu.clone(), None));
}
for (next_id, value) in events_in_reverse_order.into_iter().rev() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug!("Backing off from {next_id}");
continue;
}
}
trace!("Handling outlier {next_id}");
match Box::pin(self.handle_outlier_pdu(
origin,
create_event,
&next_id,
room_id,
value.clone(),
true,
))
.await
{
| Ok((pdu, json)) =>
if next_id == *id {
trace!("Handled outlier {next_id} (original request)");
pdus.push((pdu, Some(json)));
},
| Err(e) => {
warn!("Authentication of event {next_id} failed: {e:?}");
back_off(next_id);
},
}
}
}
trace!("Fetched and handled {} outlier pdus", pdus.len());
pdus
} }
+118 -108
View File
@@ -1,118 +1,128 @@
use std::collections::{HashMap, HashSet, VecDeque}; use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
iter::once,
};
use conduwuit::{Event, PduEvent, debug, debug_info, error, trace}; use conduwuit::{
use ruma::{OwnedEventId, RoomId, ServerName}; Event, PduEvent, Result, debug_warn, err, implement,
state_res::{self},
};
use futures::{FutureExt, future};
use ruma::{
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
int, uint,
};
use crate::rooms::event_handler::build_local_dag; use super::check_room_id;
impl super::Service { #[implement(super::Service)]
pub(super) async fn fetch_prevs( #[tracing::instrument(
&self, level = "debug",
room_id: &RoomId, skip_all,
create_event: &PduEvent, fields(%origin),
incoming_pdu: &PduEvent, )]
origin: &ServerName, #[allow(clippy::type_complexity)]
) -> conduwuit::Result<()> { pub(super) async fn fetch_prev<'a, Pdu, Events>(
let mut queue: VecDeque<OwnedEventId> = VecDeque::new(); &self,
queue.push_back(incoming_pdu.event_id().to_owned()); origin: &ServerName,
create_event: &Pdu,
room_id: &RoomId,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
initial_set: Events,
) -> Result<(
Vec<OwnedEventId>,
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let num_ids = initial_set.clone().count();
let mut eventid_info = HashMap::new();
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids);
let mut todo_outlier_stack: VecDeque<OwnedEventId> =
initial_set.map(ToOwned::to_owned).collect();
while let Some(event_id) = queue.pop_front() { let mut amount = 0;
debug!(event_id=%incoming_pdu.event_id, "Fetching any missing prev_events");
let mut missing_prev_events: HashSet<OwnedEventId> = while let Some(prev_event_id) = todo_outlier_stack.pop_front() {
incoming_pdu.prev_events().map(ToOwned::to_owned).collect(); self.services.server.check_running()?;
for pid in missing_prev_events.clone() {
if self.services.timeline.pdu_exists(&pid).await { match self
trace!("Found prev event {pid} for outlier event {event_id} locally"); .fetch_and_handle_outliers(
missing_prev_events.remove(&pid); origin,
} else { once(prev_event_id.as_ref()),
debug_info!( create_event,
"Could not find prev event {pid} for outlier event {event_id} locally, \ room_id,
will fetch over federation" )
); .boxed()
.await
.pop()
{
| Some((pdu, mut json_opt)) => {
check_room_id(room_id, &pdu)?;
let limit = self.services.server.config.max_fetch_prev_events;
if amount > limit {
debug_warn!("Max prev event limit reached! Limit: {limit}");
graph.insert(prev_event_id.clone(), HashSet::new());
continue;
} }
}
if !missing_prev_events.is_empty() { if json_opt.is_none() {
debug_info!( json_opt = self
"Fetching {} missing prev events for outlier event {event_id}", .services
missing_prev_events.len() .outlier
); .get_outlier_pdu_json(&prev_event_id)
let backfilled = self
.backfill_missing_events(
room_id.to_owned(),
vec![event_id.clone()],
origin.to_owned(),
)
.await?;
debug_info!("Fetched {} missing events for {event_id}", backfilled.len());
let mapped = backfilled
.iter()
.map(|(eid, evt)| {
let mut obj = evt.to_canonical_object();
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
(eid.clone(), obj)
})
.collect::<HashMap<_, _>>();
let local_dag = if mapped.len() == 1 {
mapped.keys().map(ToOwned::to_owned).collect()
} else {
build_local_dag(&mapped).await?
};
debug_info!("Preparing to handle {} missing events", backfilled.len());
for prev_event_id in local_dag {
let obj = mapped
.get(&prev_event_id)
.expect("We should have this event in memory");
debug_info!("Handling prev event {prev_event_id}");
match self
.handle_outlier_pdu(
origin,
create_event,
&prev_event_id,
room_id,
obj.clone(),
false,
)
.await .await
{ .ok();
| Ok(_) => {
debug!("Successfully handled {prev_event_id} as an outlier");
missing_prev_events.remove(&prev_event_id);
},
| Err(e) =>
error!(error=?e, %prev_event_id, %event_id, "Failed to handle prev event"),
}
debug_info!("Finished handling prev");
} }
}
let outlier = self.services.timeline.get_pdu(&event_id).await;
if missing_prev_events.is_empty()
&& let Ok(pdu) = outlier
{
// promote any prevs first
for prev_event_id in pdu.prev_events() {
debug_info!("Promoting prev event {prev_event_id} to timeline");
let prev_pdu = self.services.timeline.get_pdu(&event_id).await?;
let val = prev_pdu.to_canonical_object();
self.upgrade_outlier_to_timeline_pdu(
prev_pdu,
val,
create_event,
origin,
room_id,
)
.await?;
debug_info!("Finished prev promoting {prev_event_id} to timeline");
}
debug_info!("Promoting event {event_id} to timeline");
let val = pdu.to_canonical_object();
self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id)
.await?;
debug_info!("Finished promoting {event_id} to timeline");
} else {
debug!(?missing_prev_events, ok=%outlier.is_ok(), "Not promoting {event_id}");
}
}
Ok(()) if let Some(json) = json_opt {
if pdu.origin_server_ts() > first_ts_in_room {
amount = amount.saturating_add(1);
for prev_prev in pdu.prev_events() {
if !graph.contains_key(prev_prev) {
todo_outlier_stack.push_back(prev_prev.to_owned());
}
}
graph.insert(
prev_event_id.clone(),
pdu.prev_events().map(ToOwned::to_owned).collect(),
);
} else {
// Time based check failed
graph.insert(prev_event_id.clone(), HashSet::new());
}
eventid_info.insert(prev_event_id.clone(), (pdu, json));
} else {
// Get json failed, so this was not fetched over federation
graph.insert(prev_event_id.clone(), HashSet::new());
}
},
| _ => {
// Fetch and handle failed
graph.insert(prev_event_id.clone(), HashSet::new());
},
}
} }
let event_fetch = |event_id| {
let origin_server_ts = eventid_info
.get(&event_id)
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
// This return value is the key used for sorting events,
// events are then sorted by power level, time,
// and lexically by event_id.
future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
};
let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch)
.await
.map_err(|e| err!(Database(error!("Error sorting prev events: {e}"))))?;
Ok((sorted, eventid_info))
} }
@@ -1,6 +1,7 @@
use std::collections::{HashMap, hash_map}; use std::collections::{HashMap, hash_map};
use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement}; use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement};
use futures::FutureExt;
use ruma::{ use ruma::{
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids, EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
events::StateEventType, events::StateEventType,
@@ -41,6 +42,7 @@ pub(super) async fn fetch_state<Pdu>(
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref); let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
let state_vec = self let state_vec = self
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id) .fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
.boxed()
.await; .await;
let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len()); let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
@@ -1,11 +1,14 @@
use std::{collections::BTreeMap, time::Instant}; use std::{
collections::{BTreeMap, hash_map},
time::Instant,
};
use conduwuit::{ use conduwuit::{
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err, Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
implement, info, trace, warn, implement, info, trace, utils::stream::IterStream, warn,
}; };
use futures::{ use futures::{
FutureExt, FutureExt, TryFutureExt, TryStreamExt,
future::{OptionFuture, try_join4}, future::{OptionFuture, try_join4},
}; };
use ruma::{ use ruma::{
@@ -212,6 +215,72 @@ pub async fn handle_incoming_pdu<'a>(
.get_room_create_event(room_id) .get_room_create_event(room_id)
.await; .await;
let (incoming_pdu, val) = self
.handle_outlier_pdu(origin, create_event, event_id, room_id, value, false)
.await?;
// 8. if not timeline event: stop
if !is_timeline_event {
return Ok(None);
}
// Skip old events
let first_ts_in_room = self
.services
.timeline
.first_pdu_in_room(room_id)
.await?
.origin_server_ts();
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
// These are timeline events
let (sorted_prev_events, mut eventid_info) = self
.fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events())
.await?;
debug!(
events = ?sorted_prev_events,
"Handling previous events"
);
sorted_prev_events
.iter()
.try_stream()
.map_ok(AsRef::as_ref)
.try_for_each(|prev_id| {
self.handle_prev_pdu(
origin,
event_id,
room_id,
eventid_info.remove(prev_id),
create_event,
first_ts_in_room,
prev_id,
)
.inspect_err(move |e| {
warn!("Prev {prev_id} failed: {e}");
match self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(prev_id.into())
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
let tries = e.get().1.saturating_add(1);
*e.get_mut() = (Instant::now(), tries);
},
}
})
.map(|_| self.services.server.check_running())
})
.boxed()
.await?;
// Done with prev events, now handling the incoming event
let start_time = Instant::now(); let start_time = Instant::now();
self.federation_handletime self.federation_handletime
.write() .write()
@@ -223,31 +292,7 @@ pub async fn handle_incoming_pdu<'a>(
.remove(room_id); .remove(room_id);
}}; }};
let (incoming_pdu, val) = self
.handle_outlier_pdu(origin, create_event, event_id, room_id, value, false)
.await?;
// 8. if not timeline event: stop
if !is_timeline_event {
return Ok(None);
}
// Skip old events
// let first_ts_in_room = self
// .services
// .timeline
// .first_pdu_in_room(room_id)
// .await?
// .origin_server_ts();
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
// These are timeline events
debug!("Handling previous events");
self.fetch_prevs(room_id, create_event, &incoming_pdu, origin)
.await?;
// Done with prev events, now handling the incoming event
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id) self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
.boxed()
.await .await
} }
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap, HashSet, hash_map}; use std::collections::{BTreeMap, HashMap, hash_map};
use conduwuit::{ use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res, Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
@@ -10,7 +10,7 @@
events::StateEventType, events::StateEventType,
}; };
use super::{build_local_dag, check_room_id, get_room_version_rules}; use super::{check_room_id, get_room_version_rules};
use crate::rooms::timeline::pdu_fits; use crate::rooms::timeline::pdu_fits;
#[implement(super::Service)] #[implement(super::Service)]
@@ -22,7 +22,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
event_id: &'a EventId, event_id: &'a EventId,
room_id: &'a RoomId, room_id: &'a RoomId,
mut value: CanonicalJsonObject, mut value: CanonicalJsonObject,
_auth_events_known: bool, auth_events_known: bool,
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)> ) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
where where
Pdu: Event + Send + Sync, Pdu: Event + Send + Sync,
@@ -100,71 +100,42 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
} }
// Fetch any missing ones & reject invalid ones // Fetch any missing ones & reject invalid ones
let mut missing_auth_events: HashSet<OwnedEventId> = pdu_event let missing_auth_events = if auth_events_known {
.auth_events() pdu_event
.filter(|id| !auth_events.contains_key(*id)) .auth_events()
.map(ToOwned::to_owned) .filter(|id| !auth_events.contains_key(*id))
.collect(); .collect::<Vec<_>>()
} else {
if !missing_auth_events.is_empty() { pdu_event.auth_events().collect::<Vec<_>>()
};
if !missing_auth_events.is_empty() || !auth_events_known {
debug_info!( debug_info!(
"Fetching {} missing auth events for outlier event {event_id}", "Fetching {} missing auth events for outlier event {event_id}",
missing_auth_events.len() missing_auth_events.len()
); );
let backfilled = self for (pdu, _) in self
.backfill_missing_events( .fetch_and_handle_outliers(
room_id.to_owned(),
vec![event_id.to_owned()],
origin.to_owned(),
)
.await?;
debug_info!("Fetched {} missing auth events for {event_id}", backfilled.len());
let mapped = backfilled
.iter()
.map(|(eid, evt)| {
let mut obj = evt.to_canonical_object();
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
(eid.clone(), obj)
})
.collect::<HashMap<_, _>>();
let local_dag = if mapped.len() == 1 {
mapped.keys().map(ToOwned::to_owned).collect()
} else {
build_local_dag(&mapped).await?
};
debug_info!("Preparing to handle {} missing auth events", backfilled.len());
for prev_event_id in local_dag {
let obj = mapped
.get(&prev_event_id)
.expect("We should have this event in memory");
debug_info!("Handling prev {prev_event_id}");
let (prev, _) = Box::pin(self.handle_outlier_pdu(
origin, origin,
missing_auth_events.iter().copied(),
create_event, create_event,
&prev_event_id,
room_id, room_id,
obj.clone(), )
false, .await
)) {
.await?; auth_events.insert(pdu.event_id().to_owned(), pdu);
if missing_auth_events.contains(&*prev_event_id) {
missing_auth_events.remove(&prev_event_id);
auth_events.insert(prev_event_id, prev);
}
debug_info!("Finished handling prev auth event");
} }
} else { } else {
debug!("No missing auth events for outlier event {event_id}"); debug!("No missing auth events for outlier event {event_id}");
} }
// reject if we are still missing some auth events. // reject if we are still missing some
// If we're still missing prev events, we will fetch them individually later, let still_missing = pdu_event
// but there's no reason for us to be missing auth events now we've gapfilled .auth_events()
// the DAG. .filter(|id| !auth_events.contains_key(*id))
if !missing_auth_events.is_empty() { .collect::<Vec<_>>();
// Don't reject: this could be a temporary condition if !still_missing.is_empty() {
return Err!(Request(InvalidParam( return Err!(Request(InvalidParam(
"Could not fetch all auth events for outlier event {event_id}, still missing: \ "Could not fetch all auth events for outlier event {event_id}, still missing: \
{missing_auth_events:?}" {still_missing:?}"
))); )));
} }
@@ -192,10 +163,6 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
v.insert(auth_event); v.insert(auth_event);
}, },
| hash_map::Entry::Occupied(_) => { | hash_map::Entry::Occupied(_) => {
self.services
.outlier
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
self.services.pdu_metadata.mark_event_rejected(event_id);
return Err!(Request(InvalidParam( return Err!(Request(InvalidParam(
"Auth event's type and state_key combination exists multiple times: {}, {}", "Auth event's type and state_key combination exists multiple times: {}, {}",
auth_event.kind, auth_event.kind,
@@ -210,10 +177,6 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
auth_events_by_key.get(&(StateEventType::RoomCreate, String::new().into())), auth_events_by_key.get(&(StateEventType::RoomCreate, String::new().into())),
Some(_) | None Some(_) | None
) { ) {
self.services.pdu_metadata.mark_event_rejected(event_id);
self.services
.outlier
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
return Err!(Request(InvalidParam("Incoming event refers to wrong create event."))); return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
} }
@@ -222,7 +185,6 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
ready(auth_events_by_key.get(&key).map(ToOwned::to_owned)) ready(auth_events_by_key.get(&key).map(ToOwned::to_owned))
}; };
// PDU check: 3
let auth_check = state_res::event_auth::auth_check( let auth_check = state_res::event_auth::auth_check(
&room_version_rules, &room_version_rules,
&pdu_event, &pdu_event,
@@ -234,13 +196,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?; .map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
if !auth_check { if !auth_check {
self.services.pdu_metadata.mark_event_rejected(event_id); return Err!(Request(Forbidden("Auth check failed")));
self.services
.outlier
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
return Err!(Request(Forbidden(
"Event authorisation fails based on event's claimed auth events"
)));
} }
trace!("Validation successful."); trace!("Validation successful.");
@@ -0,0 +1,89 @@
use std::{collections::BTreeMap, time::Instant};
use conduwuit::{
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer, implement,
utils::continue_exponential_backoff_secs,
};
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
use tracing::debug;
#[implement(super::Service)]
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
name = "prev",
level = INFO_SPAN_LEVEL,
skip_all,
fields(%prev_id),
)]
pub(super) async fn handle_prev_pdu<'a, Pdu>(
&self,
origin: &'a ServerName,
event_id: &'a EventId,
room_id: &'a RoomId,
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
create_event: &'a Pdu,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
prev_id: &'a EventId,
) -> Result
where
Pdu: Event + Send + Sync,
{
// Check for disabled again because it might have changed
if self.services.metadata.is_disabled(room_id).await {
return Err!(Request(Forbidden(debug_warn!(
"Federaton of room {room_id} is currently disabled on this server. Request by \
origin {origin} and event ID {event_id}"
))));
}
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(prev_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
debug!(
?tries,
duration = ?time.elapsed(),
"Backing off from prev_event"
);
return Ok(());
}
}
let Some((pdu, json)) = eventid_info else {
return Ok(());
};
// Skip old events
if pdu.origin_server_ts() < first_ts_in_room {
return Ok(());
}
let start_time = Instant::now();
self.federation_handletime
.write()
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
defer! {{
self.federation_handletime
.write()
.remove(room_id);
}};
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
.await?;
debug!(
elapsed = ?start_time.elapsed(),
"Handled prev_event",
);
Ok(())
}
+2 -1
View File
@@ -4,6 +4,7 @@
mod fetch_state; mod fetch_state;
mod handle_incoming_pdu; mod handle_incoming_pdu;
mod handle_outlier_pdu; mod handle_outlier_pdu;
mod handle_prev_pdu;
mod parse_incoming_pdu; mod parse_incoming_pdu;
mod policy_server; mod policy_server;
mod resolve_state; mod resolve_state;
@@ -14,13 +15,13 @@
use async_trait::async_trait; use async_trait::async_trait;
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap}; use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
pub use fetch_and_handle_outliers::build_local_dag;
use ruma::{ use ruma::{
OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent, OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
room_version_rules::RoomVersionRules, room_version_rules::RoomVersionRules,
}; };
use crate::{Dep, globals, rooms, sending, server_keys}; use crate::{Dep, globals, rooms, sending, server_keys};
pub struct Service { pub struct Service {
pub mutex_federation: RoomMutexMap, pub mutex_federation: RoomMutexMap,
pub federation_handletime: SyncRwLock<HandleTimeMap>, pub federation_handletime: SyncRwLock<HandleTimeMap>,
@@ -56,10 +56,7 @@ fn extract_room_id(event_type: &str, pdu: &CanonicalJsonObject) -> Result<OwnedR
/// Parses every entry in an array as an event ID, returning an error if any /// Parses every entry in an array as an event ID, returning an error if any
/// step fails. /// step fails.
pub(super) fn expect_event_id_array( fn expect_event_id_array(value: &CanonicalJsonObject, field: &str) -> Result<Vec<OwnedEventId>> {
value: &CanonicalJsonObject,
field: &str,
) -> Result<Vec<OwnedEventId>> {
value value
.get(field) .get(field)
.ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))? .ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))?
@@ -5,7 +5,7 @@
}; };
use conduwuit::{ use conduwuit::{
Result, debug, err, error, implement, Result, debug, err, implement,
matrix::{Event, StateMap}, matrix::{Event, StateMap},
trace, trace,
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt}, utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
@@ -121,7 +121,6 @@ pub(super) async fn state_at_incoming_resolved<Pdu>(
.state_resolution(room_version_rules, fork_states.iter(), &auth_chain_sets) .state_resolution(room_version_rules, fork_states.iter(), &auth_chain_sets)
.boxed() .boxed()
.await .await
.inspect_err(|e| error!("State resolution failed: {e:?}"))
else { else {
return Ok(None); return Ok(None);
}; };
@@ -1,18 +1,14 @@
use std::{borrow::Borrow, collections::BTreeMap, sync::Arc, time::Instant}; use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant};
use conduwuit::{ use conduwuit::{
Err, Result, debug, debug_info, debug_warn, err, implement, is_equal_to, Err, Result, debug, debug_info, err, implement, info, is_equal_to,
matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res}, matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res},
trace, trace,
utils::{ utils::stream::{BroadbandExt, ReadyExt},
IterStream,
stream::{BroadbandExt, ReadyExt},
},
warn, warn,
}; };
use futures::{FutureExt, StreamExt, future::ready}; use futures::{FutureExt, StreamExt, future::ready};
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType}; use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
use tokio::join;
use super::get_room_version_rules; use super::get_room_version_rules;
use crate::rooms::{ use crate::rooms::{
@@ -39,37 +35,16 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
.get_pdu_id(incoming_pdu.event_id()) .get_pdu_id(incoming_pdu.event_id())
.await .await
{ {
trace!(event_id=%incoming_pdu.event_id(), "Skipping upgrade of already upgraded PDU");
return Ok(Some(pduid)); return Ok(Some(pduid));
} }
let (rejected, soft_failed) = join!( if self
self.services .services
.pdu_metadata .pdu_metadata
.is_event_rejected(incoming_pdu.event_id()), .is_event_soft_failed(incoming_pdu.event_id())
self.services .await
.pdu_metadata {
.is_event_soft_failed(incoming_pdu.event_id()) return Err!(Request(InvalidParam("Event has been soft failed")));
);
if rejected {
return Err!(Request(InvalidParam("Event has been rejected")));
} else if soft_failed {
return Err!(Request(InvalidParam("Event has been soft-failed")));
}
// If any of the auth events are rejected, this event is also rejected.
for aid in incoming_pdu.auth_events() {
if self.services.pdu_metadata.is_event_rejected(aid).await {
// TODO: debug_warn instead of warn
warn!(
"Rejecting incoming event {} which depends on rejected auth event {aid}",
incoming_pdu.event_id()
);
self.services
.pdu_metadata
.mark_event_rejected(incoming_pdu.event_id());
return Err!(Request(InvalidParam("Event has rejected auth event: {aid}")));
}
} }
debug!( debug!(
@@ -95,7 +70,6 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
}; };
if state_at_incoming_event.is_none() { if state_at_incoming_event.is_none() {
trace!("Could not calculate incoming state, asking remote {origin} for it");
state_at_incoming_event = self state_at_incoming_event = self
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id()) .fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
.await?; .await?;
@@ -121,7 +95,6 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
event_id = %incoming_pdu.event_id, event_id = %incoming_pdu.event_id,
"Running initial auth check" "Running initial auth check"
); );
// PDU check: 5
let auth_check = state_res::event_auth::auth_check( let auth_check = state_res::event_auth::auth_check(
&room_version_rules, &room_version_rules,
&incoming_pdu, &incoming_pdu,
@@ -133,12 +106,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?; .map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
if !auth_check { if !auth_check {
self.services return Err!(Request(Forbidden("Event has failed auth check with state at the event.")));
.pdu_metadata
.mark_event_rejected(incoming_pdu.event_id());
return Err!(Request(Forbidden(
"Event authorisation fails based on the state before the event"
)));
} }
debug!( debug!(
@@ -167,7 +135,6 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
event_id = %incoming_pdu.event_id, event_id = %incoming_pdu.event_id,
"Running auth check with claimed state auth" "Running auth check with claimed state auth"
); );
// PDU check: 6
let auth_check = state_res::event_auth::auth_check( let auth_check = state_res::event_auth::auth_check(
&room_version_rules, &room_version_rules,
&incoming_pdu, &incoming_pdu,
@@ -177,12 +144,6 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
) )
.await .await
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?; .map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
if !auth_check {
warn!(
event_id = %incoming_pdu.event_id,
"Event authentication fails based on the current state of the room"
);
}
// Soft fail check before doing state res // Soft fail check before doing state res
debug!( debug!(
@@ -192,22 +153,16 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
let mut soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_rules)) { let mut soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_rules)) {
| (false, _) => true, | (false, _) => true,
| (true, None) => false, | (true, None) => false,
| (true, Some(redact_id)) => { | (true, Some(redact_id)) =>
if !self !self
.services .services
.state_accessor .state_accessor
.user_can_redact(&redact_id, incoming_pdu.sender(), room_id, true) .user_can_redact(&redact_id, incoming_pdu.sender(), room_id, true)
.await? .await?,
{
warn!(redacts = %redact_id, "User is not allowed to redact event");
true
} else {
false
}
},
}; };
// 13. Use state resolution to find new room state // 13. Use state resolution to find new room state
// We start looking at current room state now, so lets lock the room // We start looking at current room state now, so lets lock the room
trace!( trace!(
room_id = %room_id, room_id = %room_id,
@@ -215,6 +170,36 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
); );
let state_lock = self.services.state.mutex.lock(room_id).await; let state_lock = self.services.state.mutex.lock(room_id).await;
// Now we calculate the set of extremities this room has after the incoming
// event has been applied. We start with the previous extremities (aka leaves)
trace!("Calculating extremities");
let mut extremities: Vec<_> = self
.services
.state
.get_forward_extremities(room_id)
.ready_filter(|event_id| {
// Remove any that are referenced by this incoming event's prev_events
!incoming_pdu.prev_events().any(is_equal_to!(event_id))
})
.broad_filter_map(|event_id| async move {
// Only keep those extremities were not referenced yet
self.services
.pdu_metadata
.is_event_referenced(room_id, &event_id)
.await
.eq(&false)
.then_some(event_id)
})
.collect()
.await;
extremities.push(incoming_pdu.event_id().to_owned());
debug!(
"Retained {} extremities checked against {} prev_events",
extremities.len(),
incoming_pdu.prev_events().count()
);
let state_ids_compressed: Arc<CompressedState> = self let state_ids_compressed: Arc<CompressedState> = self
.services .services
.state_compressor .state_compressor
@@ -309,88 +294,81 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
.is_event_soft_failed(&redact_id) .is_event_soft_failed(&redact_id)
.await .await
{ {
// TODO: This should avoid pushing the event to the timeline instead of using
// soft-fails as a hack
warn!( warn!(
redact_id = %redact_id, redact_id = %redact_id,
"Redaction is for a soft-failed event" "Redaction is for a soft-failed event, soft failing the redaction"
); );
soft_fail = true; soft_fail = true;
} }
} }
} }
trace!("Appending pdu to timeline"); // 14. Check if the event passes auth based on the "current state" of the room,
let mut extremities: Vec<_> = self // if not soft fail it
.services if soft_fail {
.state info!(
.get_forward_extremities(room_id) event_id = %incoming_pdu.event_id,
.collect() "Soft failing event"
.await;
if !soft_fail {
// Per https://spec.matrix.org/unstable/server-server-api/#soft-failure, soft-failed events
// are not added as forward extremities.
// Now we calculate the set of extremities this room has after the incoming
// event has been applied. We start with the previous extremities (aka leaves)
trace!("Calculating extremities");
extremities = extremities
.into_iter()
.stream()
.ready_filter(|event_id| {
// Remove any that are referenced by this incoming event's prev_events
!incoming_pdu.prev_events().any(is_equal_to!(event_id))
})
.broad_filter_map(|event_id| async move {
// Only keep those extremities were not referenced yet
self.services
.pdu_metadata
.is_event_referenced(room_id, &event_id)
.await
.eq(&false)
.then_some(event_id)
})
.collect::<Vec<_>>()
.await;
extremities.push(incoming_pdu.event_id().to_owned());
debug!(
"Retained {} extremities checked against {} prev_events",
extremities.len(),
incoming_pdu.prev_events().count()
); );
assert!(!extremities.is_empty(), "extremities must not empty"); // assert!(extremities.is_empty(), "soft_fail extremities empty");
let extremities = extremities.iter().map(Borrow::borrow);
debug_assert!(extremities.clone().count() > 0, "extremities not empty");
self.services
.timeline
.append_incoming_pdu(
&incoming_pdu,
val,
extremities,
state_ids_compressed,
soft_fail,
&state_lock,
room_id,
)
.await?;
// Soft fail, we keep the event as an outlier but don't add it to the timeline
self.services
.pdu_metadata
.mark_event_soft_failed(incoming_pdu.event_id());
warn!(
event_id = %incoming_pdu.event_id,
"Event was soft failed"
);
return Err!(Request(InvalidParam("Event has been soft failed")));
} }
// Now that the event has passed all auth it is added into the timeline.
// We use the `state_at_event` instead of `state_after` so we accurately
// represent the state for this event.
trace!("Appending pdu to timeline");
let extremities = extremities
.iter()
.map(Borrow::borrow)
.chain(once(incoming_pdu.event_id()));
debug_assert!(extremities.clone().count() > 0, "extremities not empty");
let pdu_id = self let pdu_id = self
.services .services
.timeline .timeline
.append_incoming_pdu( .append_incoming_pdu(
&incoming_pdu, &incoming_pdu,
val, val,
extremities.iter().map(Borrow::borrow), extremities,
state_ids_compressed, state_ids_compressed,
soft_fail, soft_fail,
&state_lock, &state_lock,
room_id, room_id,
) )
.await?; .await?;
if soft_fail {
self.services
.pdu_metadata
.mark_event_soft_failed(incoming_pdu.event_id());
debug_warn!(
elapsed = ?timer.elapsed(),
"Event has been soft-failed",
);
} else {
debug_info!(
elapsed = ?timer.elapsed(),
"Accepted",
);
}
// Event has passed all auth/stateres checks // Event has passed all auth/stateres checks
drop(state_lock); drop(state_lock);
debug_info!(
elapsed = ?timer.elapsed(),
"Accepted",
);
Ok(pdu_id) Ok(pdu_id)
} }
+2 -7
View File
@@ -34,7 +34,7 @@
use crate::{ use crate::{
Dep, antispam, globals, Dep, antispam, globals,
rooms::{ rooms::{
metadata, outlier, pdu_metadata, short, metadata, outlier, short,
state::{self, RoomMutexGuard}, state::{self, RoomMutexGuard},
state_accessor, state_cache, state_accessor, state_cache,
state_compressor::{self, CompressedState, HashSetCompressStateEvent}, state_compressor::{self, CompressedState, HashSetCompressStateEvent},
@@ -54,7 +54,6 @@ struct Services {
globals: Dep<globals::Service>, globals: Dep<globals::Service>,
metadata: Dep<metadata::Service>, metadata: Dep<metadata::Service>,
outlier: Dep<outlier::Service>, outlier: Dep<outlier::Service>,
pdu_metadata: Dep<pdu_metadata::Service>,
sending: Dep<sending::Service>, sending: Dep<sending::Service>,
server_keys: Dep<server_keys::Service>, server_keys: Dep<server_keys::Service>,
short: Dep<short::Service>, short: Dep<short::Service>,
@@ -76,7 +75,6 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
globals: args.depend::<globals::Service>("globals"), globals: args.depend::<globals::Service>("globals"),
metadata: args.depend::<metadata::Service>("metadata"), metadata: args.depend::<metadata::Service>("metadata"),
outlier: args.depend::<outlier::Service>("rooms::outlier"), outlier: args.depend::<outlier::Service>("rooms::outlier"),
pdu_metadata: args.depend::<pdu_metadata::Service>("rooms::pdu_metadata"),
sending: args.depend::<sending::Service>("sending"), sending: args.depend::<sending::Service>("sending"),
server_keys: args.depend::<server_keys::Service>("server_keys"), server_keys: args.depend::<server_keys::Service>("server_keys"),
short: args.depend::<short::Service>("rooms::short"), short: args.depend::<short::Service>("rooms::short"),
@@ -288,7 +286,7 @@ async fn join_local_room(
} }
#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote_room", level = "info")] #[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote_room", level = "info")]
pub async fn join_remote_room( async fn join_remote_room(
&self, &self,
sender_user: &UserId, sender_user: &UserId,
room_id: &RoomId, room_id: &RoomId,
@@ -296,7 +294,6 @@ pub async fn join_remote_room(
servers: &[OwnedServerName], servers: &[OwnedServerName],
state_lock: RoomMutexGuard, state_lock: RoomMutexGuard,
) -> Result { ) -> Result {
// public so the admin command force-join-room-remotely works
info!("Joining {room_id} over federation."); info!("Joining {room_id} over federation.");
let (make_join_response, remote_server) = self let (make_join_response, remote_server) = self
@@ -517,7 +514,6 @@ pub async fn join_remote_room(
return state; return state;
} }
self.services.outlier.add_pdu_outlier(&event_id, &value); self.services.outlier.add_pdu_outlier(&event_id, &value);
self.services.pdu_metadata.clear_pdu_markers(&event_id);
if let Some(state_key) = &pdu.state_key { if let Some(state_key) = &pdu.state_key {
let shortstatekey = self let shortstatekey = self
.services .services
@@ -549,7 +545,6 @@ pub async fn join_remote_room(
.ready_for_each(|(event_id, value)| { .ready_for_each(|(event_id, value)| {
trace!(%event_id, "Adding PDU as an outlier from send_join auth_chain"); trace!(%event_id, "Adding PDU as an outlier from send_join auth_chain");
self.services.outlier.add_pdu_outlier(&event_id, &value); self.services.outlier.add_pdu_outlier(&event_id, &value);
self.services.pdu_metadata.clear_pdu_markers(&event_id);
}) })
.await; .await;
-24
View File
@@ -26,7 +26,6 @@ pub(super) struct Data {
tofrom_relation: Arc<Map>, tofrom_relation: Arc<Map>,
referencedevents: Arc<Map>, referencedevents: Arc<Map>,
softfailedeventids: Arc<Map>, softfailedeventids: Arc<Map>,
rejectedeventids: Arc<Map>,
services: Services, services: Services,
} }
@@ -41,7 +40,6 @@ pub(super) fn new(args: &crate::Args<'_>) -> Self {
tofrom_relation: db["tofrom_relation"].clone(), tofrom_relation: db["tofrom_relation"].clone(),
referencedevents: db["referencedevents"].clone(), referencedevents: db["referencedevents"].clone(),
softfailedeventids: db["softfailedeventids"].clone(), softfailedeventids: db["softfailedeventids"].clone(),
rejectedeventids: db["rejectedeventids"].clone(),
services: Services { services: Services {
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"), timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
}, },
@@ -120,29 +118,7 @@ pub(super) fn mark_event_soft_failed(&self, event_id: &EventId) {
self.softfailedeventids.insert(event_id, []); self.softfailedeventids.insert(event_id, []);
} }
pub(super) fn unmark_event_soft_failed(&self, event_id: &EventId) {
self.softfailedeventids.remove(event_id);
}
pub(super) async fn is_event_soft_failed(&self, event_id: &EventId) -> bool { pub(super) async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
self.softfailedeventids.get(event_id).await.is_ok() self.softfailedeventids.get(event_id).await.is_ok()
} }
pub(super) fn mark_event_rejected(&self, event_id: &EventId) {
self.rejectedeventids.insert(event_id, []);
}
pub(super) fn unmark_event_rejected(&self, event_id: &EventId) {
self.rejectedeventids.remove(event_id);
}
pub(super) async fn is_event_rejected(&self, event_id: &EventId) -> bool {
self.rejectedeventids.get(event_id).await.is_ok()
}
/// Removes any soft-fail or rejection markers applied to the target PDU
pub(super) fn clear_pdu_markers(&self, event_id: &EventId) {
self.unmark_event_rejected(event_id);
self.unmark_event_soft_failed(event_id);
}
} }
-24
View File
@@ -140,28 +140,4 @@ pub fn mark_event_soft_failed(&self, event_id: &EventId) {
pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool { pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
self.db.is_event_soft_failed(event_id).await self.db.is_event_soft_failed(event_id).await
} }
pub async fn is_event_rejected(&self, event_id: &EventId) -> bool {
self.db.is_event_rejected(event_id).await
}
pub fn mark_event_rejected(&self, event_id: &EventId) {
self.db.mark_event_rejected(event_id);
}
pub fn unmark_event_soft_failed(&self, event_id: &EventId) {
self.db.unmark_event_soft_failed(event_id);
}
pub fn unmark_event_rejected(&self, event_id: &EventId) {
self.db.unmark_event_rejected(event_id);
}
/// Returns true if the event is neither soft-failed nor rejected.
pub async fn is_event_accepted(&self, event_id: &EventId) -> bool {
!self.db.is_event_rejected(event_id).await
&& !self.db.is_event_soft_failed(event_id).await
}
pub fn clear_pdu_markers(&self, event_id: &EventId) { self.db.clear_pdu_markers(event_id); }
} }
+9 -1
View File
@@ -53,7 +53,15 @@ pub async fn append_incoming_pdu<'a, Leaves>(
.await?; .await?;
if soft_fail { if soft_fail {
// Nothing else to do with a soft-failed event. self.services
.pdu_metadata
.mark_as_referenced(room_id, pdu.prev_events.iter().map(AsRef::as_ref));
// self.services
// .state
// .set_forward_extremities(room_id, new_room_leaves, state_lock)
// .await;
return Ok(None); return Ok(None);
} }