mirror of
https://forgejo.ellis.link/continuwuation/continuwuity/
synced 2026-05-13 19:13:21 +00:00
Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d3a40c40ba | |||
| 7559b31f99 | |||
| 0cd8b8fc35 | |||
| b884fbd240 | |||
| a03490c81c | |||
| 5a4b8309a7 | |||
| d576620953 | |||
| 36cf5b053b | |||
| d95bf66e3d | |||
| ab277409bc | |||
| ec76147784 | |||
| 3f3ca53fdd | |||
| 70a768a711 |
@@ -0,0 +1,9 @@
|
|||||||
|
Implemented event rejection, which should resolve and prevent future netsplits of the kinds observed
|
||||||
|
within some Continuwuity rooms.
|
||||||
|
Also resolved several bugs related to both soft-failing events, and event backfilling, which should
|
||||||
|
improve state resolution stability.
|
||||||
|
The `!admin debug get-pdu` command was updated to disambiguate event acceptance status, and
|
||||||
|
`!admin debug show-auth-chain` was added to visually display event auth chains, which may assist
|
||||||
|
developers in debugging strangely complex events.
|
||||||
|
|
||||||
|
Contributed by @nex.
|
||||||
+213
-9
@@ -1,5 +1,5 @@
|
|||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::{HashMap, HashSet},
|
||||||
fmt::Write,
|
fmt::Write,
|
||||||
iter::once,
|
iter::once,
|
||||||
time::{Instant, SystemTime},
|
time::{Instant, SystemTime},
|
||||||
@@ -22,7 +22,7 @@
|
|||||||
use lettre::message::Mailbox;
|
use lettre::message::Mailbox;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
|
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
|
||||||
OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId,
|
OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, UInt,
|
||||||
api::federation::event::get_room_state, events::AnyStateEvent, serde::Raw,
|
api::federation::event::get_room_state, events::AnyStateEvent, serde::Raw,
|
||||||
};
|
};
|
||||||
use service::rooms::{
|
use service::rooms::{
|
||||||
@@ -69,6 +69,189 @@ pub(super) async fn get_auth_chain(&self, event_id: OwnedEventId) -> Result {
|
|||||||
self.write_str(&out).await
|
self.write_str(&out).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Eq, PartialEq)]
|
||||||
|
enum NodeStatus {
|
||||||
|
Normal(bool),
|
||||||
|
SoftFailed(bool),
|
||||||
|
Rejected(bool),
|
||||||
|
}
|
||||||
|
|
||||||
|
struct AuthChild {
|
||||||
|
node_id: String,
|
||||||
|
event_id: OwnedEventId,
|
||||||
|
depth: UInt,
|
||||||
|
ts: UInt,
|
||||||
|
first_seen: bool,
|
||||||
|
pdu: Option<PduEvent>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn render_node(
|
||||||
|
graph: &mut String,
|
||||||
|
node_id: &str,
|
||||||
|
event_id: &EventId,
|
||||||
|
status: NodeStatus,
|
||||||
|
) -> Result {
|
||||||
|
let evt_str = event_id.to_string();
|
||||||
|
|
||||||
|
let status_label = match status {
|
||||||
|
| NodeStatus::Normal(false) => evt_str,
|
||||||
|
| NodeStatus::Normal(true) => format!("{evt_str} (missing locally)"),
|
||||||
|
| NodeStatus::SoftFailed(false) => format!("{evt_str} (soft-failed)"),
|
||||||
|
| NodeStatus::SoftFailed(true) => format!("{evt_str} (soft-failed & missing locally)"),
|
||||||
|
| NodeStatus::Rejected(false) => format!("{evt_str} (rejected)"),
|
||||||
|
| NodeStatus::Rejected(true) => format!("{evt_str} (rejected & missing locally)"),
|
||||||
|
};
|
||||||
|
|
||||||
|
writeln!(graph, "{node_id}[\"{}\"]", status_label.as_str())?;
|
||||||
|
|
||||||
|
match status {
|
||||||
|
| NodeStatus::Rejected(_) => writeln!(graph, "class {node_id} rejected;")?,
|
||||||
|
| NodeStatus::SoftFailed(_) => writeln!(graph, "class {node_id} soft_failed;")?,
|
||||||
|
| NodeStatus::Normal(_) => {},
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[admin_command]
|
||||||
|
pub(super) async fn show_auth_chain(&self, event_id: OwnedEventId) -> Result {
|
||||||
|
let node_status = async |event_id: &EventId, missing: bool| -> NodeStatus {
|
||||||
|
if self
|
||||||
|
.services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.is_event_rejected(event_id)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
NodeStatus::Rejected(missing)
|
||||||
|
} else if self
|
||||||
|
.services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.is_event_soft_failed(event_id)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
NodeStatus::SoftFailed(missing)
|
||||||
|
} else {
|
||||||
|
NodeStatus::Normal(missing)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let Ok(root) = self.services.rooms.timeline.get_pdu(&event_id).await else {
|
||||||
|
return Err!("Event not found.");
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut graph = String::from(
|
||||||
|
"```mermaid\n%% This is a mermaid graph. You can plug this output into\n\
|
||||||
|
%% https://mermaid.live/edit to visualise it on-the-fly.\nflowchart TD\n\
|
||||||
|
classDef rejected fill:#ffe5e5,stroke:#cc0000,stroke-width:2px,color:#000;\n\
|
||||||
|
classDef soft_failed fill:#fff6cc,stroke:#c9a400,stroke-width:2px,color:#000;\n"
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut node_ids: HashMap<OwnedEventId, String> = HashMap::new();
|
||||||
|
let mut cached_events: HashMap<OwnedEventId, PduEvent> =
|
||||||
|
HashMap::from([(event_id.clone(), root.clone())]);
|
||||||
|
let mut scheduled: HashSet<OwnedEventId> = HashSet::from([event_id.clone()]);
|
||||||
|
let mut visited: HashSet<OwnedEventId> = HashSet::new();
|
||||||
|
let mut stack = vec![root];
|
||||||
|
let mut next_node_id = 0_usize;
|
||||||
|
|
||||||
|
let node_id_for = |event_id: &OwnedEventId,
|
||||||
|
node_ids: &mut HashMap<OwnedEventId, String>,
|
||||||
|
next_node_id: &mut usize| {
|
||||||
|
node_ids
|
||||||
|
.entry(event_id.clone())
|
||||||
|
.or_insert_with(|| {
|
||||||
|
let id = format!("n{}", *next_node_id);
|
||||||
|
*next_node_id = next_node_id.saturating_add(1);
|
||||||
|
id
|
||||||
|
})
|
||||||
|
.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
while let Some(event) = stack.pop() {
|
||||||
|
let current_event_id = event.event_id().to_owned();
|
||||||
|
if !visited.insert(current_event_id.clone()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let current_node_id = node_id_for(¤t_event_id, &mut node_ids, &mut next_node_id);
|
||||||
|
let current_status = node_status(¤t_event_id, false).await;
|
||||||
|
|
||||||
|
render_node(&mut graph, ¤t_node_id, ¤t_event_id, current_status)?;
|
||||||
|
|
||||||
|
let mut children = Vec::with_capacity(event.auth_events.len());
|
||||||
|
for auth_event_id in event.auth_events().rev() {
|
||||||
|
let auth_event_id = auth_event_id.to_owned();
|
||||||
|
let auth_node_id = node_id_for(&auth_event_id, &mut node_ids, &mut next_node_id);
|
||||||
|
writeln!(graph, "{current_node_id} --> {auth_node_id}")?;
|
||||||
|
|
||||||
|
let first_seen = scheduled.insert(auth_event_id.clone());
|
||||||
|
let auth_pdu = if let Some(auth_pdu) = cached_events.get(&auth_event_id) {
|
||||||
|
// NOTE: events might be referenced multiple times (like the create event)
|
||||||
|
// so this saves some cheeky db lookup time
|
||||||
|
Some(auth_pdu.clone())
|
||||||
|
} else if first_seen {
|
||||||
|
match self.services.rooms.timeline.get_pdu(&auth_event_id).await {
|
||||||
|
| Ok(auth_event) => {
|
||||||
|
cached_events.insert(auth_event_id.clone(), auth_event.clone());
|
||||||
|
Some(auth_event)
|
||||||
|
},
|
||||||
|
| Err(_) => None,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
// NOTE: Depth is used as the primary sorting key here, even though it has no
|
||||||
|
// bearing on state resolution or anything. Timestamp is used as a
|
||||||
|
// tiebreaker, failing back to lexicographical comparison.
|
||||||
|
let (depth, ts) = auth_pdu
|
||||||
|
.as_ref()
|
||||||
|
.map_or((UInt::MAX, UInt::MAX), |pdu| (pdu.depth, pdu.origin_server_ts));
|
||||||
|
|
||||||
|
children.push(AuthChild {
|
||||||
|
node_id: auth_node_id,
|
||||||
|
event_id: auth_event_id,
|
||||||
|
depth,
|
||||||
|
ts,
|
||||||
|
first_seen,
|
||||||
|
pdu: auth_pdu,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
children.sort_by(|a, b| {
|
||||||
|
a.depth
|
||||||
|
.cmp(&b.depth)
|
||||||
|
.then(a.ts.cmp(&b.ts))
|
||||||
|
.then(a.event_id.as_str().cmp(b.event_id.as_str()))
|
||||||
|
});
|
||||||
|
|
||||||
|
for child in children.into_iter().rev() {
|
||||||
|
if !child.first_seen {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(child_pdu) = child.pdu {
|
||||||
|
// We have this PDU so will want to traverse it.
|
||||||
|
stack.push(child_pdu);
|
||||||
|
} else {
|
||||||
|
// We don't have this PDU locally so we can't traverse its auth events,
|
||||||
|
// but we can still render it as a node.
|
||||||
|
render_node(
|
||||||
|
&mut graph,
|
||||||
|
&child.node_id,
|
||||||
|
&child.event_id,
|
||||||
|
node_status(&child.event_id, true).await,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
graph.push_str("```\n");
|
||||||
|
self.write_str(&graph).await
|
||||||
|
}
|
||||||
|
|
||||||
#[admin_command]
|
#[admin_command]
|
||||||
pub(super) async fn parse_pdu(&self) -> Result {
|
pub(super) async fn parse_pdu(&self) -> Result {
|
||||||
if self.body.len() < 2
|
if self.body.len() < 2
|
||||||
@@ -111,15 +294,31 @@ pub(super) async fn get_pdu(&self, event_id: OwnedEventId) -> Result {
|
|||||||
outlier = true;
|
outlier = true;
|
||||||
pdu_json = self.services.rooms.timeline.get_pdu_json(&event_id).await;
|
pdu_json = self.services.rooms.timeline.get_pdu_json(&event_id).await;
|
||||||
}
|
}
|
||||||
|
let rejected = self
|
||||||
|
.services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.is_event_rejected(&event_id)
|
||||||
|
.await;
|
||||||
|
let soft_failed = self
|
||||||
|
.services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.is_event_soft_failed(&event_id)
|
||||||
|
.await;
|
||||||
|
|
||||||
match pdu_json {
|
match pdu_json {
|
||||||
| Err(_) => return Err!("PDU not found locally."),
|
| Err(_) => return Err!("PDU not found locally."),
|
||||||
| Ok(json) => {
|
| Ok(json) => {
|
||||||
let text = serde_json::to_string_pretty(&json)?;
|
let text = serde_json::to_string_pretty(&json)?;
|
||||||
let msg = if outlier {
|
let msg = if rejected {
|
||||||
"Outlier (Rejected / Soft Failed) PDU found in our database"
|
"Rejected PDU:"
|
||||||
|
} else if soft_failed {
|
||||||
|
"Soft-failed PDU:"
|
||||||
|
} else if outlier {
|
||||||
|
"Outlier PDU:"
|
||||||
} else {
|
} else {
|
||||||
"PDU found in our database"
|
"PDU:"
|
||||||
};
|
};
|
||||||
write!(self, "{msg}\n```json\n{text}\n```")
|
write!(self, "{msg}\n```json\n{text}\n```")
|
||||||
},
|
},
|
||||||
@@ -614,6 +813,10 @@ pub(super) async fn force_set_room_state_from_server(
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
state.insert(shortstatekey, pdu.event_id.clone());
|
state.insert(shortstatekey, pdu.event_id.clone());
|
||||||
|
self.services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.clear_pdu_markers(pdu.event_id());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -631,6 +834,10 @@ pub(super) async fn force_set_room_state_from_server(
|
|||||||
.rooms
|
.rooms
|
||||||
.outlier
|
.outlier
|
||||||
.add_pdu_outlier(&event_id, &value);
|
.add_pdu_outlier(&event_id, &value);
|
||||||
|
self.services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.clear_pdu_markers(&event_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Resolving new room state");
|
info!("Resolving new room state");
|
||||||
@@ -662,10 +869,7 @@ pub(super) async fn force_set_room_state_from_server(
|
|||||||
.force_state(room_id.clone().as_ref(), short_state_hash, added, removed, &state_lock)
|
.force_state(room_id.clone().as_ref(), short_state_hash, added, removed, &state_lock)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
info!(
|
info!("Updating joined counts for room");
|
||||||
"Updating joined counts for room just in case (e.g. we may have found a difference in \
|
|
||||||
the room's m.room.member state"
|
|
||||||
);
|
|
||||||
self.services
|
self.services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
|
|||||||
+10
-1
@@ -17,12 +17,21 @@ pub enum DebugCommand {
|
|||||||
message: Vec<String>,
|
message: Vec<String>,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Get the auth_chain of a PDU
|
/// Loads the auth_chain of a PDU, reporting how long it took.
|
||||||
GetAuthChain {
|
GetAuthChain {
|
||||||
/// An event ID (the $ character followed by the base64 reference hash)
|
/// An event ID (the $ character followed by the base64 reference hash)
|
||||||
event_id: OwnedEventId,
|
event_id: OwnedEventId,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Walks & displays the auth_chain of a PDU in a mermaid graph format.
|
||||||
|
///
|
||||||
|
/// This is useless to basically anyone but developers, and is also probably
|
||||||
|
/// slow and memory hungry.
|
||||||
|
ShowAuthChain {
|
||||||
|
/// The root event ID to start walking back from.
|
||||||
|
event_id: OwnedEventId,
|
||||||
|
},
|
||||||
|
|
||||||
/// Parse and print a PDU from a JSON
|
/// Parse and print a PDU from a JSON
|
||||||
///
|
///
|
||||||
/// The PDU event is only checked for validity and is not added to the
|
/// The PDU event is only checked for validity and is not added to the
|
||||||
|
|||||||
@@ -763,6 +763,41 @@ pub(super) async fn force_join_room(
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[admin_command]
|
||||||
|
pub(super) async fn force_join_room_remotely(
|
||||||
|
&self,
|
||||||
|
user_id: String,
|
||||||
|
room_id: OwnedRoomOrAliasId,
|
||||||
|
via: String,
|
||||||
|
) -> Result {
|
||||||
|
let via = ServerName::parse(&via)?;
|
||||||
|
let user_id = parse_local_user_id(self.services, &user_id)?;
|
||||||
|
let (room_id, mut servers) = self
|
||||||
|
.services
|
||||||
|
.rooms
|
||||||
|
.alias
|
||||||
|
.resolve_with_servers(&room_id, None)
|
||||||
|
.await?;
|
||||||
|
if servers.contains(&via) {
|
||||||
|
servers.retain(|n| *n != via);
|
||||||
|
}
|
||||||
|
servers.insert(0, via);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
self.services.globals.user_is_local(&user_id),
|
||||||
|
"Parsed user_id must be a local user"
|
||||||
|
);
|
||||||
|
let state_lock = self.services.rooms.state.mutex.lock(&*room_id).await;
|
||||||
|
self.services
|
||||||
|
.rooms
|
||||||
|
.membership
|
||||||
|
.join_remote_room(&user_id, &room_id, None, &servers, state_lock)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
self.write_str(&format!("{user_id} has been joined to {room_id}."))
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
#[admin_command]
|
#[admin_command]
|
||||||
pub(super) async fn force_leave_room(
|
pub(super) async fn force_leave_room(
|
||||||
&self,
|
&self,
|
||||||
|
|||||||
@@ -183,6 +183,20 @@ pub enum UserCommand {
|
|||||||
room_id: OwnedRoomOrAliasId,
|
room_id: OwnedRoomOrAliasId,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Manually join a local user to a room via a remote server, regardless of
|
||||||
|
/// our current residency.
|
||||||
|
ForceJoinRoomRemotely {
|
||||||
|
/// The user to join
|
||||||
|
user_id: String,
|
||||||
|
/// The room to join
|
||||||
|
room_id: OwnedRoomOrAliasId,
|
||||||
|
/// The server name to join via.
|
||||||
|
///
|
||||||
|
/// This server will always be tried first, however if more are
|
||||||
|
/// available, they may be tried after.
|
||||||
|
via: String,
|
||||||
|
},
|
||||||
|
|
||||||
/// Manually leave a local user from a room.
|
/// Manually leave a local user from a room.
|
||||||
ForceLeaveRoom {
|
ForceLeaveRoom {
|
||||||
user_id: String,
|
user_id: String,
|
||||||
|
|||||||
@@ -187,7 +187,7 @@ pub(crate) async fn change_password_route(
|
|||||||
if services.server.config.admin_room_notices {
|
if services.server.config.admin_room_notices {
|
||||||
services
|
services
|
||||||
.admin
|
.admin
|
||||||
.notice(&format!("User {} changed their password.", &sender_user))
|
.notice(&format!("User {sender_user} changed their password."))
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -537,10 +537,7 @@ pub(crate) async fn create_room_route(
|
|||||||
if services.server.config.admin_room_notices {
|
if services.server.config.admin_room_notices {
|
||||||
services
|
services
|
||||||
.admin
|
.admin
|
||||||
.send_text(&format!(
|
.send_text(&format!("{sender_user} made {room_id} public to the room directory"))
|
||||||
"{sender_user} made {} public to the room directory",
|
|
||||||
&room_id
|
|
||||||
))
|
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
info!("{sender_user} made {0} public to the room directory", &room_id);
|
info!("{sender_user} made {0} public to the room directory", &room_id);
|
||||||
|
|||||||
@@ -381,6 +381,7 @@ async fn handle_room(
|
|||||||
.rooms
|
.rooms
|
||||||
.event_handler
|
.event_handler
|
||||||
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
|
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
|
||||||
|
.boxed()
|
||||||
.await
|
.await
|
||||||
.map(|_| ());
|
.map(|_| ());
|
||||||
results.push((event_id, result));
|
results.push((event_id, result));
|
||||||
|
|||||||
@@ -260,7 +260,7 @@ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|||||||
write!(f, "{real_error}")
|
write!(f, "{real_error}")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
write!(f, "Request error: {}", &self.0)
|
write!(f, "Request error: {}", self.0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -311,6 +311,11 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
|
|||||||
key_size_hint: Some(48),
|
key_size_hint: Some(48),
|
||||||
..descriptor::RANDOM_SMALL
|
..descriptor::RANDOM_SMALL
|
||||||
},
|
},
|
||||||
|
Descriptor {
|
||||||
|
name: "rejectedeventids",
|
||||||
|
key_size_hint: Some(48),
|
||||||
|
..descriptor::RANDOM_SMALL
|
||||||
|
},
|
||||||
Descriptor {
|
Descriptor {
|
||||||
name: "statehash_shortstatehash",
|
name: "statehash_shortstatehash",
|
||||||
val_size_hint: Some(8),
|
val_size_hint: Some(8),
|
||||||
|
|||||||
@@ -44,7 +44,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
|||||||
db,
|
db,
|
||||||
server: args.server.clone(),
|
server: args.server.clone(),
|
||||||
bad_event_ratelimiter: Arc::new(SyncRwLock::new(HashMap::new())),
|
bad_event_ratelimiter: Arc::new(SyncRwLock::new(HashMap::new())),
|
||||||
admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", &args.server.name))
|
admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", args.server.name))
|
||||||
.expect("#admins:server_name is valid alias name"),
|
.expect("#admins:server_name is valid alias name"),
|
||||||
server_user: UserId::parse_with_server_name(
|
server_user: UserId::parse_with_server_name(
|
||||||
String::from("conduit"),
|
String::from("conduit"),
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ pub struct PasswordReset<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl MessageTemplate for PasswordReset<'_> {
|
impl MessageTemplate for PasswordReset<'_> {
|
||||||
fn subject(&self) -> String { format!("Password reset request for {}", &self.user_id) }
|
fn subject(&self) -> String { format!("Password reset request for {}", self.user_id) }
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Template)]
|
#[derive(Template)]
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ pub fn is_valid(&self) -> bool {
|
|||||||
|
|
||||||
impl std::fmt::Display for DatabaseTokenInfo {
|
impl std::fmt::Display for DatabaseTokenInfo {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
write!(f, "Token created by {} and used {} times. ", &self.creator, self.uses)?;
|
write!(f, "Token created by {} and used {} times. ", self.creator, self.uses)?;
|
||||||
if let Some(expires) = &self.expires {
|
if let Some(expires) = &self.expires {
|
||||||
write!(f, "{expires}.")?;
|
write!(f, "{expires}.")?;
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ pub struct ValidToken {
|
|||||||
|
|
||||||
impl std::fmt::Display for ValidToken {
|
impl std::fmt::Display for ValidToken {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
write!(f, "`{}` --- {}", self.token, &self.source)
|
write!(f, "`{}` --- {}", self.token, self.source)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,40 +1,259 @@
|
|||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, HashSet, VecDeque, hash_map},
|
cmp::min,
|
||||||
|
collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map},
|
||||||
time::Instant,
|
time::Instant,
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Event, PduEvent, debug, debug_warn, implement, matrix::event::gen_event_id_canonical_json,
|
Err, Event, PduEvent, debug, debug_info, debug_warn, err,
|
||||||
|
matrix::event::gen_event_id_canonical_json, state_res::lexicographical_topological_sort,
|
||||||
trace, utils::continue_exponential_backoff_secs, warn,
|
trace, utils::continue_exponential_backoff_secs, warn,
|
||||||
};
|
};
|
||||||
|
use futures::StreamExt;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
|
||||||
api::federation::event::get_event,
|
OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt,
|
||||||
|
api::federation::event::{get_event, get_missing_events},
|
||||||
|
int,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::get_room_version_rules;
|
use super::get_room_version_rules;
|
||||||
|
|
||||||
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
|
||||||
/// it is appended to the outliers Tree.
|
/// returning them in a topologically sorted order.
|
||||||
///
|
///
|
||||||
/// Returns pdu and if we fetched it over federation the raw json.
|
/// This is used to attempt to process PDUs in an order that respects their
|
||||||
///
|
/// dependencies, however it is ultimately the sender's responsibility to send
|
||||||
/// a. Look in the main timeline (pduid_pdu tree)
|
/// them in a processable order, so this is just a best effort attempt. It does
|
||||||
/// b. Look at outlier pdu tree
|
/// not account for power levels or other tie breaks.
|
||||||
/// c. Ask origin server over federation
|
pub async fn build_local_dag<S: std::hash::BuildHasher>(
|
||||||
/// d. TODO: Ask other servers over federation?
|
pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject, S>,
|
||||||
#[implement(super::Service)]
|
) -> conduwuit::Result<Vec<OwnedEventId>> {
|
||||||
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
|
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
|
||||||
|
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> =
|
||||||
|
HashMap::with_capacity(pdu_map.len());
|
||||||
|
let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len());
|
||||||
|
|
||||||
|
for (event_id, value) in pdu_map {
|
||||||
|
// We already checked that these properties are correct in parse_incoming_pdu,
|
||||||
|
// so it's safe to unwrap here.
|
||||||
|
// We also filter to remove any prev_events that are not in this pdu_map, as we
|
||||||
|
// need to have at least one event with zero out degrees for the lexico-topo
|
||||||
|
// sort below. If there are multiple events with omitted prevs, they will be
|
||||||
|
// ordered by timestamp, then event ID. At that point though, it's unlikely to
|
||||||
|
// matter.
|
||||||
|
let prev_events = value
|
||||||
|
.get("prev_events")
|
||||||
|
.unwrap()
|
||||||
|
.as_array()
|
||||||
|
.unwrap()
|
||||||
|
.iter()
|
||||||
|
.map(|v| EventId::parse(v.as_str().unwrap()).unwrap())
|
||||||
|
.filter(|id| pdu_map.contains_key(id))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
dag.insert(event_id.clone(), prev_events);
|
||||||
|
let origin_server_ts = value
|
||||||
|
.get("origin_server_ts")
|
||||||
|
.and_then(CanonicalJsonValue::as_integer)
|
||||||
|
.unwrap_or_default();
|
||||||
|
id_origin_ts.insert(event_id.clone(), origin_server_ts);
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!(count = dag.len(), "Sorting incoming events with partial graph");
|
||||||
|
lexicographical_topological_sort(&dag, &async |node_id| {
|
||||||
|
// Note: we don't bother fetching power levels because that would massively slow
|
||||||
|
// this function down. This is a best-effort attempt to order events correctly
|
||||||
|
// for processing, however ultimately that should be the sender's job.
|
||||||
|
let ts = id_origin_ts
|
||||||
|
.get(&node_id)
|
||||||
|
.copied()
|
||||||
|
.unwrap_or_else(|| int!(0))
|
||||||
|
.to_string()
|
||||||
|
.parse::<u64>()
|
||||||
|
.ok()
|
||||||
|
.and_then(UInt::new)
|
||||||
|
.unwrap_or_default();
|
||||||
|
Ok((int!(0), MilliSecondsSinceUnixEpoch(ts)))
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.inspect(|sorted| {
|
||||||
|
debug_assert_eq!(
|
||||||
|
sorted.len(),
|
||||||
|
pdu_map.len(),
|
||||||
|
"Sorted graph was not the same size as the input graph"
|
||||||
|
);
|
||||||
|
})
|
||||||
|
.map_err(|e| err!("failed to resolve local graph: {e}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
impl super::Service {
|
||||||
|
/// Uses `/_matrix/federation/v1/get_missing_events` to fill gaps in the
|
||||||
|
/// DAG.
|
||||||
|
///
|
||||||
|
/// When this function is called, the "earliest events" (current forward
|
||||||
|
/// extremities) will be collected, and the function will loop with an
|
||||||
|
/// exponentially incrementing limit (up to 100 per request) until it has
|
||||||
|
/// filled the gap, i.e. when the remote says there's no more events.
|
||||||
|
///
|
||||||
|
/// This function does not persist the events. The caller is responsible for
|
||||||
|
/// passing them through handle_incoming_pdu.
|
||||||
|
pub(super) async fn backfill_missing_events(
|
||||||
|
&self,
|
||||||
|
room_id: OwnedRoomId,
|
||||||
|
head: Vec<OwnedEventId>,
|
||||||
|
via: OwnedServerName,
|
||||||
|
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
|
||||||
|
if head.is_empty() {
|
||||||
|
return Ok(HashMap::new());
|
||||||
|
}
|
||||||
|
let tail = self
|
||||||
|
.services
|
||||||
|
.state
|
||||||
|
.get_forward_extremities(&room_id)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.await;
|
||||||
|
// TODO: min_depth is probably necessary to avoid fetching the entire room
|
||||||
|
// history if there are very long gaps
|
||||||
|
let mut latest_events: HashSet<OwnedEventId> = HashSet::from_iter(head.clone());
|
||||||
|
let mut loop_count: u64 = 3;
|
||||||
|
// Start with a base number of 3 so that we fetch 10, 16, 25, 36, etc
|
||||||
|
// instead of 1, 2, 4, 9, so on.
|
||||||
|
let mut backfilled_events = HashMap::with_capacity(10);
|
||||||
|
|
||||||
|
while !latest_events.is_empty() {
|
||||||
|
let todo: Vec<OwnedEventId> = latest_events.clone().into_iter().collect();
|
||||||
|
let mut request =
|
||||||
|
get_missing_events::v1::Request::new(room_id.clone(), tail.clone(), todo.clone());
|
||||||
|
let limit = min(loop_count.saturating_pow(2), 100);
|
||||||
|
request.limit = limit
|
||||||
|
.try_into()
|
||||||
|
.expect("limit cannot be greater than 100, which fits into UInt");
|
||||||
|
|
||||||
|
debug_info!(
|
||||||
|
backfilled=%backfilled_events.len(),
|
||||||
|
%loop_count,
|
||||||
|
"Asking {via} for up to {limit} missing events",
|
||||||
|
);
|
||||||
|
trace!(
|
||||||
|
?latest_events,
|
||||||
|
?tail,
|
||||||
|
%via,
|
||||||
|
%limit,
|
||||||
|
"Requesting missing events"
|
||||||
|
);
|
||||||
|
let response: get_missing_events::v1::Response = self
|
||||||
|
.services
|
||||||
|
.sending
|
||||||
|
.send_federation_request(&via, request)
|
||||||
|
.await?;
|
||||||
|
loop_count = loop_count.saturating_add(1);
|
||||||
|
|
||||||
|
// Some buggy servers (including old continuwuity) may return the same events
|
||||||
|
// multiple times, which can cause this to be an infinite loop.
|
||||||
|
// In order to break this loop, if we see no new events from this response (i.e.
|
||||||
|
// all events in the response are already in backfilled_events), we stop,
|
||||||
|
// with a warning.
|
||||||
|
let mut unseen: usize = 0;
|
||||||
|
let chunk_len = response.events.len();
|
||||||
|
if response.events.is_empty() {
|
||||||
|
debug_info!("No more missing events found");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
for event in response.events {
|
||||||
|
trace!("Parsing incoming event from backfill");
|
||||||
|
let (incoming_room_id, event_id, pdu_json) =
|
||||||
|
self.parse_incoming_pdu(&event).await.map_err(|e| {
|
||||||
|
err!(BadServerResponse("{via} returned an invalid event: {e:?}"))
|
||||||
|
})?;
|
||||||
|
trace!(%incoming_room_id, %event_id, "Parsed incoming event from backfill");
|
||||||
|
if incoming_room_id != room_id {
|
||||||
|
return Err!(BadServerResponse(
|
||||||
|
"{via} returned {event_id} in missing events which belongs to \
|
||||||
|
{incoming_room_id}, not {room_id}"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
latest_events.remove(&event_id);
|
||||||
|
if head.contains(&event_id) || tail.contains(&event_id) {
|
||||||
|
debug!("Skipping known event {event_id}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if backfilled_events.contains_key(&event_id) {
|
||||||
|
debug_warn!(%via, %event_id, "Remote retransmitted event");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// TODO: Should this be scoped to the GME session? We might end up incorrectly
|
||||||
|
// assuming we've caught up if we do this
|
||||||
|
if let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await {
|
||||||
|
debug!(%via, %event_id, "Already seen event in database");
|
||||||
|
backfilled_events.insert(event_id.clone(), pdu);
|
||||||
|
} else {
|
||||||
|
unseen = unseen.saturating_add(1);
|
||||||
|
}
|
||||||
|
let parsed = PduEvent::from_id_val(&event_id, pdu_json)
|
||||||
|
.map_err(|e| err!(BadServerResponse("Unable to parse {event_id}: {e}")))?;
|
||||||
|
for prev_event_id in parsed.prev_events() {
|
||||||
|
// Verify that we have all of this event's prev_events. If we don't, add it to
|
||||||
|
// the work queue
|
||||||
|
if !(backfilled_events.contains_key(prev_event_id)
|
||||||
|
|| self.services.timeline.pdu_exists(prev_event_id).await)
|
||||||
|
{
|
||||||
|
latest_events.insert(prev_event_id.to_owned());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
backfilled_events.insert(event_id.clone(), parsed);
|
||||||
|
}
|
||||||
|
for event_id in todo {
|
||||||
|
latest_events.remove(&event_id);
|
||||||
|
if let hash_map::Entry::Vacant(e) = backfilled_events.entry(event_id.clone()) {
|
||||||
|
let evt = self.services.timeline.get_pdu(&event_id).await?;
|
||||||
|
e.insert(evt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
debug!(
|
||||||
|
count=%chunk_len,
|
||||||
|
new=%unseen,
|
||||||
|
remaining=%latest_events.len(),
|
||||||
|
"Got missing events"
|
||||||
|
);
|
||||||
|
if unseen == 0 {
|
||||||
|
debug_warn!("Didn't see any new events, breaking cycle");
|
||||||
|
break;
|
||||||
|
} else if chunk_len < usize::try_from(limit)? {
|
||||||
|
debug!(
|
||||||
|
"Got less than the limit number of events, assuming there's no more to fetch"
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
debug_info!("Successfully fetched {} missing events from {via}", backfilled_events.len());
|
||||||
|
trace!("Missing_events: {backfilled_events:?}");
|
||||||
|
Ok(backfilled_events)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
|
||||||
|
/// it is appended to the outliers Tree.
|
||||||
|
///
|
||||||
|
/// Returns pdu and if we fetched it over federation the raw json.
|
||||||
|
///
|
||||||
|
/// a. Look in the main timeline (pduid_pdu tree)
|
||||||
|
/// b. Look at outlier pdu tree
|
||||||
|
/// c. Ask origin server over federation
|
||||||
|
/// d. TODO: Ask other servers over federation?
|
||||||
|
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
|
||||||
&self,
|
&self,
|
||||||
origin: &'a ServerName,
|
origin: &'a ServerName,
|
||||||
events: Events,
|
events: Events,
|
||||||
create_event: &'a Pdu,
|
create_event: &'a Pdu,
|
||||||
room_id: &'a RoomId,
|
room_id: &'a RoomId,
|
||||||
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
|
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
|
||||||
where
|
where
|
||||||
Pdu: Event + Send + Sync,
|
Pdu: Event + Send + Sync,
|
||||||
Events: Iterator<Item = &'a EventId> + Clone + Send,
|
Events: Iterator<Item = &'a EventId> + Clone + Send,
|
||||||
{
|
{
|
||||||
let back_off = |id| match self
|
let back_off = |id| match self
|
||||||
.services
|
.services
|
||||||
.globals
|
.globals
|
||||||
@@ -131,8 +350,8 @@ pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
|
|||||||
|
|
||||||
if calculated_event_id != *next_id {
|
if calculated_event_id != *next_id {
|
||||||
warn!(
|
warn!(
|
||||||
"Server didn't return event id we requested: requested: {next_id}, \
|
"Server didn't return event id we requested: requested: \
|
||||||
we got {calculated_event_id}. Event: {:?}",
|
{next_id}, we got {calculated_event_id}. Event: {:?}",
|
||||||
&res.pdu
|
&res.pdu
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -147,7 +366,8 @@ pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
|
|||||||
) {
|
) {
|
||||||
| Ok(auth_event) => {
|
| Ok(auth_event) => {
|
||||||
trace!(
|
trace!(
|
||||||
"Found auth event id {auth_event} for event {next_id}"
|
"Found auth event id {auth_event} for event \
|
||||||
|
{next_id}"
|
||||||
);
|
);
|
||||||
todo_auth_events.push_back(auth_event);
|
todo_auth_events.push_back(auth_event);
|
||||||
},
|
},
|
||||||
@@ -230,4 +450,5 @@ pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
|
|||||||
}
|
}
|
||||||
trace!("Fetched and handled {} outlier pdus", pdus.len());
|
trace!("Fetched and handled {} outlier pdus", pdus.len());
|
||||||
pdus
|
pdus
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,128 +1,118 @@
|
|||||||
use std::{
|
use std::collections::{HashMap, HashSet, VecDeque};
|
||||||
collections::{BTreeMap, HashMap, HashSet, VecDeque},
|
|
||||||
iter::once,
|
|
||||||
};
|
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{Event, PduEvent, debug, debug_info, error, trace};
|
||||||
Event, PduEvent, Result, debug_warn, err, implement,
|
use ruma::{OwnedEventId, RoomId, ServerName};
|
||||||
state_res::{self},
|
|
||||||
};
|
|
||||||
use futures::{FutureExt, future};
|
|
||||||
use ruma::{
|
|
||||||
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
|
|
||||||
int, uint,
|
|
||||||
};
|
|
||||||
|
|
||||||
use super::check_room_id;
|
use crate::rooms::event_handler::build_local_dag;
|
||||||
|
|
||||||
#[implement(super::Service)]
|
impl super::Service {
|
||||||
#[tracing::instrument(
|
pub(super) async fn fetch_prevs(
|
||||||
level = "debug",
|
|
||||||
skip_all,
|
|
||||||
fields(%origin),
|
|
||||||
)]
|
|
||||||
#[allow(clippy::type_complexity)]
|
|
||||||
pub(super) async fn fetch_prev<'a, Pdu, Events>(
|
|
||||||
&self,
|
&self,
|
||||||
origin: &ServerName,
|
|
||||||
create_event: &Pdu,
|
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
first_ts_in_room: MilliSecondsSinceUnixEpoch,
|
create_event: &PduEvent,
|
||||||
initial_set: Events,
|
incoming_pdu: &PduEvent,
|
||||||
) -> Result<(
|
origin: &ServerName,
|
||||||
Vec<OwnedEventId>,
|
) -> conduwuit::Result<()> {
|
||||||
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
|
let mut queue: VecDeque<OwnedEventId> = VecDeque::new();
|
||||||
)>
|
queue.push_back(incoming_pdu.event_id().to_owned());
|
||||||
where
|
|
||||||
Pdu: Event + Send + Sync,
|
|
||||||
Events: Iterator<Item = &'a EventId> + Clone + Send,
|
|
||||||
{
|
|
||||||
let num_ids = initial_set.clone().count();
|
|
||||||
let mut eventid_info = HashMap::new();
|
|
||||||
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids);
|
|
||||||
let mut todo_outlier_stack: VecDeque<OwnedEventId> =
|
|
||||||
initial_set.map(ToOwned::to_owned).collect();
|
|
||||||
|
|
||||||
let mut amount = 0;
|
|
||||||
|
|
||||||
while let Some(prev_event_id) = todo_outlier_stack.pop_front() {
|
|
||||||
self.services.server.check_running()?;
|
|
||||||
|
|
||||||
|
while let Some(event_id) = queue.pop_front() {
|
||||||
|
debug!(event_id=%incoming_pdu.event_id, "Fetching any missing prev_events");
|
||||||
|
let mut missing_prev_events: HashSet<OwnedEventId> =
|
||||||
|
incoming_pdu.prev_events().map(ToOwned::to_owned).collect();
|
||||||
|
for pid in missing_prev_events.clone() {
|
||||||
|
if self.services.timeline.pdu_exists(&pid).await {
|
||||||
|
trace!("Found prev event {pid} for outlier event {event_id} locally");
|
||||||
|
missing_prev_events.remove(&pid);
|
||||||
|
} else {
|
||||||
|
debug_info!(
|
||||||
|
"Could not find prev event {pid} for outlier event {event_id} locally, \
|
||||||
|
will fetch over federation"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !missing_prev_events.is_empty() {
|
||||||
|
debug_info!(
|
||||||
|
"Fetching {} missing prev events for outlier event {event_id}",
|
||||||
|
missing_prev_events.len()
|
||||||
|
);
|
||||||
|
let backfilled = self
|
||||||
|
.backfill_missing_events(
|
||||||
|
room_id.to_owned(),
|
||||||
|
vec![event_id.clone()],
|
||||||
|
origin.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
debug_info!("Fetched {} missing events for {event_id}", backfilled.len());
|
||||||
|
let mapped = backfilled
|
||||||
|
.iter()
|
||||||
|
.map(|(eid, evt)| {
|
||||||
|
let mut obj = evt.to_canonical_object();
|
||||||
|
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
|
||||||
|
(eid.clone(), obj)
|
||||||
|
})
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
let local_dag = if mapped.len() == 1 {
|
||||||
|
mapped.keys().map(ToOwned::to_owned).collect()
|
||||||
|
} else {
|
||||||
|
build_local_dag(&mapped).await?
|
||||||
|
};
|
||||||
|
debug_info!("Preparing to handle {} missing events", backfilled.len());
|
||||||
|
for prev_event_id in local_dag {
|
||||||
|
let obj = mapped
|
||||||
|
.get(&prev_event_id)
|
||||||
|
.expect("We should have this event in memory");
|
||||||
|
debug_info!("Handling prev event {prev_event_id}");
|
||||||
match self
|
match self
|
||||||
.fetch_and_handle_outliers(
|
.handle_outlier_pdu(
|
||||||
origin,
|
origin,
|
||||||
once(prev_event_id.as_ref()),
|
|
||||||
create_event,
|
create_event,
|
||||||
|
&prev_event_id,
|
||||||
|
room_id,
|
||||||
|
obj.clone(),
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
| Ok(_) => {
|
||||||
|
debug!("Successfully handled {prev_event_id} as an outlier");
|
||||||
|
missing_prev_events.remove(&prev_event_id);
|
||||||
|
},
|
||||||
|
| Err(e) =>
|
||||||
|
error!(error=?e, %prev_event_id, %event_id, "Failed to handle prev event"),
|
||||||
|
}
|
||||||
|
debug_info!("Finished handling prev");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let outlier = self.services.timeline.get_pdu(&event_id).await;
|
||||||
|
if missing_prev_events.is_empty()
|
||||||
|
&& let Ok(pdu) = outlier
|
||||||
|
{
|
||||||
|
// promote any prevs first
|
||||||
|
for prev_event_id in pdu.prev_events() {
|
||||||
|
debug_info!("Promoting prev event {prev_event_id} to timeline");
|
||||||
|
let prev_pdu = self.services.timeline.get_pdu(&event_id).await?;
|
||||||
|
let val = prev_pdu.to_canonical_object();
|
||||||
|
self.upgrade_outlier_to_timeline_pdu(
|
||||||
|
prev_pdu,
|
||||||
|
val,
|
||||||
|
create_event,
|
||||||
|
origin,
|
||||||
room_id,
|
room_id,
|
||||||
)
|
)
|
||||||
.boxed()
|
.await?;
|
||||||
.await
|
debug_info!("Finished prev promoting {prev_event_id} to timeline");
|
||||||
.pop()
|
|
||||||
{
|
|
||||||
| Some((pdu, mut json_opt)) => {
|
|
||||||
check_room_id(room_id, &pdu)?;
|
|
||||||
|
|
||||||
let limit = self.services.server.config.max_fetch_prev_events;
|
|
||||||
if amount > limit {
|
|
||||||
debug_warn!("Max prev event limit reached! Limit: {limit}");
|
|
||||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
debug_info!("Promoting event {event_id} to timeline");
|
||||||
if json_opt.is_none() {
|
let val = pdu.to_canonical_object();
|
||||||
json_opt = self
|
self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id)
|
||||||
.services
|
.await?;
|
||||||
.outlier
|
debug_info!("Finished promoting {event_id} to timeline");
|
||||||
.get_outlier_pdu_json(&prev_event_id)
|
|
||||||
.await
|
|
||||||
.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(json) = json_opt {
|
|
||||||
if pdu.origin_server_ts() > first_ts_in_room {
|
|
||||||
amount = amount.saturating_add(1);
|
|
||||||
for prev_prev in pdu.prev_events() {
|
|
||||||
if !graph.contains_key(prev_prev) {
|
|
||||||
todo_outlier_stack.push_back(prev_prev.to_owned());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
graph.insert(
|
|
||||||
prev_event_id.clone(),
|
|
||||||
pdu.prev_events().map(ToOwned::to_owned).collect(),
|
|
||||||
);
|
|
||||||
} else {
|
} else {
|
||||||
// Time based check failed
|
debug!(?missing_prev_events, ok=%outlier.is_ok(), "Not promoting {event_id}");
|
||||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
|
||||||
}
|
|
||||||
|
|
||||||
eventid_info.insert(prev_event_id.clone(), (pdu, json));
|
|
||||||
} else {
|
|
||||||
// Get json failed, so this was not fetched over federation
|
|
||||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
|
||||||
}
|
|
||||||
},
|
|
||||||
| _ => {
|
|
||||||
// Fetch and handle failed
|
|
||||||
graph.insert(prev_event_id.clone(), HashSet::new());
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let event_fetch = |event_id| {
|
Ok(())
|
||||||
let origin_server_ts = eventid_info
|
}
|
||||||
.get(&event_id)
|
|
||||||
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
|
|
||||||
|
|
||||||
// This return value is the key used for sorting events,
|
|
||||||
// events are then sorted by power level, time,
|
|
||||||
// and lexically by event_id.
|
|
||||||
future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
|
|
||||||
};
|
|
||||||
|
|
||||||
let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch)
|
|
||||||
.await
|
|
||||||
.map_err(|e| err!(Database(error!("Error sorting prev events: {e}"))))?;
|
|
||||||
|
|
||||||
Ok((sorted, eventid_info))
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
use std::collections::{HashMap, hash_map};
|
use std::collections::{HashMap, hash_map};
|
||||||
|
|
||||||
use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement};
|
use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement};
|
||||||
use futures::FutureExt;
|
|
||||||
use ruma::{
|
use ruma::{
|
||||||
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
|
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
|
||||||
events::StateEventType,
|
events::StateEventType,
|
||||||
@@ -42,7 +41,6 @@ pub(super) async fn fetch_state<Pdu>(
|
|||||||
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
|
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
|
||||||
let state_vec = self
|
let state_vec = self
|
||||||
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
|
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
|
||||||
.boxed()
|
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
|
let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
|
||||||
|
|||||||
@@ -1,14 +1,11 @@
|
|||||||
use std::{
|
use std::{collections::BTreeMap, time::Instant};
|
||||||
collections::{BTreeMap, hash_map},
|
|
||||||
time::Instant,
|
|
||||||
};
|
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
|
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
|
||||||
implement, info, trace, utils::stream::IterStream, warn,
|
implement, info, trace, warn,
|
||||||
};
|
};
|
||||||
use futures::{
|
use futures::{
|
||||||
FutureExt, TryFutureExt, TryStreamExt,
|
FutureExt,
|
||||||
future::{OptionFuture, try_join4},
|
future::{OptionFuture, try_join4},
|
||||||
};
|
};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
@@ -215,72 +212,6 @@ pub async fn handle_incoming_pdu<'a>(
|
|||||||
.get_room_create_event(room_id)
|
.get_room_create_event(room_id)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let (incoming_pdu, val) = self
|
|
||||||
.handle_outlier_pdu(origin, create_event, event_id, room_id, value, false)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// 8. if not timeline event: stop
|
|
||||||
if !is_timeline_event {
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip old events
|
|
||||||
let first_ts_in_room = self
|
|
||||||
.services
|
|
||||||
.timeline
|
|
||||||
.first_pdu_in_room(room_id)
|
|
||||||
.await?
|
|
||||||
.origin_server_ts();
|
|
||||||
|
|
||||||
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
|
|
||||||
// These are timeline events
|
|
||||||
let (sorted_prev_events, mut eventid_info) = self
|
|
||||||
.fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events())
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
events = ?sorted_prev_events,
|
|
||||||
"Handling previous events"
|
|
||||||
);
|
|
||||||
|
|
||||||
sorted_prev_events
|
|
||||||
.iter()
|
|
||||||
.try_stream()
|
|
||||||
.map_ok(AsRef::as_ref)
|
|
||||||
.try_for_each(|prev_id| {
|
|
||||||
self.handle_prev_pdu(
|
|
||||||
origin,
|
|
||||||
event_id,
|
|
||||||
room_id,
|
|
||||||
eventid_info.remove(prev_id),
|
|
||||||
create_event,
|
|
||||||
first_ts_in_room,
|
|
||||||
prev_id,
|
|
||||||
)
|
|
||||||
.inspect_err(move |e| {
|
|
||||||
warn!("Prev {prev_id} failed: {e}");
|
|
||||||
match self
|
|
||||||
.services
|
|
||||||
.globals
|
|
||||||
.bad_event_ratelimiter
|
|
||||||
.write()
|
|
||||||
.entry(prev_id.into())
|
|
||||||
{
|
|
||||||
| hash_map::Entry::Vacant(e) => {
|
|
||||||
e.insert((Instant::now(), 1));
|
|
||||||
},
|
|
||||||
| hash_map::Entry::Occupied(mut e) => {
|
|
||||||
let tries = e.get().1.saturating_add(1);
|
|
||||||
*e.get_mut() = (Instant::now(), tries);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.map(|_| self.services.server.check_running())
|
|
||||||
})
|
|
||||||
.boxed()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// Done with prev events, now handling the incoming event
|
|
||||||
let start_time = Instant::now();
|
let start_time = Instant::now();
|
||||||
self.federation_handletime
|
self.federation_handletime
|
||||||
.write()
|
.write()
|
||||||
@@ -292,7 +223,31 @@ pub async fn handle_incoming_pdu<'a>(
|
|||||||
.remove(room_id);
|
.remove(room_id);
|
||||||
}};
|
}};
|
||||||
|
|
||||||
|
let (incoming_pdu, val) = self
|
||||||
|
.handle_outlier_pdu(origin, create_event, event_id, room_id, value, false)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// 8. if not timeline event: stop
|
||||||
|
if !is_timeline_event {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip old events
|
||||||
|
// let first_ts_in_room = self
|
||||||
|
// .services
|
||||||
|
// .timeline
|
||||||
|
// .first_pdu_in_room(room_id)
|
||||||
|
// .await?
|
||||||
|
// .origin_server_ts();
|
||||||
|
|
||||||
|
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
|
||||||
|
// These are timeline events
|
||||||
|
debug!("Handling previous events");
|
||||||
|
|
||||||
|
self.fetch_prevs(room_id, create_event, &incoming_pdu, origin)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Done with prev events, now handling the incoming event
|
||||||
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
|
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
|
||||||
.boxed()
|
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use std::collections::{BTreeMap, HashMap, hash_map};
|
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
|
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
|
||||||
@@ -10,7 +10,7 @@
|
|||||||
events::StateEventType,
|
events::StateEventType,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{check_room_id, get_room_version_rules};
|
use super::{build_local_dag, check_room_id, get_room_version_rules};
|
||||||
use crate::rooms::timeline::pdu_fits;
|
use crate::rooms::timeline::pdu_fits;
|
||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
@@ -22,7 +22,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
|||||||
event_id: &'a EventId,
|
event_id: &'a EventId,
|
||||||
room_id: &'a RoomId,
|
room_id: &'a RoomId,
|
||||||
mut value: CanonicalJsonObject,
|
mut value: CanonicalJsonObject,
|
||||||
auth_events_known: bool,
|
_auth_events_known: bool,
|
||||||
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
|
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
|
||||||
where
|
where
|
||||||
Pdu: Event + Send + Sync,
|
Pdu: Event + Send + Sync,
|
||||||
@@ -100,42 +100,71 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Fetch any missing ones & reject invalid ones
|
// Fetch any missing ones & reject invalid ones
|
||||||
let missing_auth_events = if auth_events_known {
|
let mut missing_auth_events: HashSet<OwnedEventId> = pdu_event
|
||||||
pdu_event
|
|
||||||
.auth_events()
|
.auth_events()
|
||||||
.filter(|id| !auth_events.contains_key(*id))
|
.filter(|id| !auth_events.contains_key(*id))
|
||||||
.collect::<Vec<_>>()
|
.map(ToOwned::to_owned)
|
||||||
} else {
|
.collect();
|
||||||
pdu_event.auth_events().collect::<Vec<_>>()
|
|
||||||
};
|
if !missing_auth_events.is_empty() {
|
||||||
if !missing_auth_events.is_empty() || !auth_events_known {
|
|
||||||
debug_info!(
|
debug_info!(
|
||||||
"Fetching {} missing auth events for outlier event {event_id}",
|
"Fetching {} missing auth events for outlier event {event_id}",
|
||||||
missing_auth_events.len()
|
missing_auth_events.len()
|
||||||
);
|
);
|
||||||
for (pdu, _) in self
|
let backfilled = self
|
||||||
.fetch_and_handle_outliers(
|
.backfill_missing_events(
|
||||||
origin,
|
room_id.to_owned(),
|
||||||
missing_auth_events.iter().copied(),
|
vec![event_id.to_owned()],
|
||||||
create_event,
|
origin.to_owned(),
|
||||||
room_id,
|
|
||||||
)
|
)
|
||||||
.await
|
.await?;
|
||||||
{
|
debug_info!("Fetched {} missing auth events for {event_id}", backfilled.len());
|
||||||
auth_events.insert(pdu.event_id().to_owned(), pdu);
|
let mapped = backfilled
|
||||||
|
.iter()
|
||||||
|
.map(|(eid, evt)| {
|
||||||
|
let mut obj = evt.to_canonical_object();
|
||||||
|
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
|
||||||
|
(eid.clone(), obj)
|
||||||
|
})
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
let local_dag = if mapped.len() == 1 {
|
||||||
|
mapped.keys().map(ToOwned::to_owned).collect()
|
||||||
|
} else {
|
||||||
|
build_local_dag(&mapped).await?
|
||||||
|
};
|
||||||
|
debug_info!("Preparing to handle {} missing auth events", backfilled.len());
|
||||||
|
for prev_event_id in local_dag {
|
||||||
|
let obj = mapped
|
||||||
|
.get(&prev_event_id)
|
||||||
|
.expect("We should have this event in memory");
|
||||||
|
debug_info!("Handling prev {prev_event_id}");
|
||||||
|
let (prev, _) = Box::pin(self.handle_outlier_pdu(
|
||||||
|
origin,
|
||||||
|
create_event,
|
||||||
|
&prev_event_id,
|
||||||
|
room_id,
|
||||||
|
obj.clone(),
|
||||||
|
false,
|
||||||
|
))
|
||||||
|
.await?;
|
||||||
|
if missing_auth_events.contains(&*prev_event_id) {
|
||||||
|
missing_auth_events.remove(&prev_event_id);
|
||||||
|
auth_events.insert(prev_event_id, prev);
|
||||||
|
}
|
||||||
|
debug_info!("Finished handling prev auth event");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
debug!("No missing auth events for outlier event {event_id}");
|
debug!("No missing auth events for outlier event {event_id}");
|
||||||
}
|
}
|
||||||
// reject if we are still missing some
|
// reject if we are still missing some auth events.
|
||||||
let still_missing = pdu_event
|
// If we're still missing prev events, we will fetch them individually later,
|
||||||
.auth_events()
|
// but there's no reason for us to be missing auth events now we've gapfilled
|
||||||
.filter(|id| !auth_events.contains_key(*id))
|
// the DAG.
|
||||||
.collect::<Vec<_>>();
|
if !missing_auth_events.is_empty() {
|
||||||
if !still_missing.is_empty() {
|
// Don't reject: this could be a temporary condition
|
||||||
return Err!(Request(InvalidParam(
|
return Err!(Request(InvalidParam(
|
||||||
"Could not fetch all auth events for outlier event {event_id}, still missing: \
|
"Could not fetch all auth events for outlier event {event_id}, still missing: \
|
||||||
{still_missing:?}"
|
{missing_auth_events:?}"
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -163,6 +192,10 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
|||||||
v.insert(auth_event);
|
v.insert(auth_event);
|
||||||
},
|
},
|
||||||
| hash_map::Entry::Occupied(_) => {
|
| hash_map::Entry::Occupied(_) => {
|
||||||
|
self.services
|
||||||
|
.outlier
|
||||||
|
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
|
||||||
|
self.services.pdu_metadata.mark_event_rejected(event_id);
|
||||||
return Err!(Request(InvalidParam(
|
return Err!(Request(InvalidParam(
|
||||||
"Auth event's type and state_key combination exists multiple times: {}, {}",
|
"Auth event's type and state_key combination exists multiple times: {}, {}",
|
||||||
auth_event.kind,
|
auth_event.kind,
|
||||||
@@ -177,6 +210,10 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
|||||||
auth_events_by_key.get(&(StateEventType::RoomCreate, String::new().into())),
|
auth_events_by_key.get(&(StateEventType::RoomCreate, String::new().into())),
|
||||||
Some(_) | None
|
Some(_) | None
|
||||||
) {
|
) {
|
||||||
|
self.services.pdu_metadata.mark_event_rejected(event_id);
|
||||||
|
self.services
|
||||||
|
.outlier
|
||||||
|
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
|
||||||
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
|
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -185,6 +222,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
|||||||
ready(auth_events_by_key.get(&key).map(ToOwned::to_owned))
|
ready(auth_events_by_key.get(&key).map(ToOwned::to_owned))
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// PDU check: 3
|
||||||
let auth_check = state_res::event_auth::auth_check(
|
let auth_check = state_res::event_auth::auth_check(
|
||||||
&room_version_rules,
|
&room_version_rules,
|
||||||
&pdu_event,
|
&pdu_event,
|
||||||
@@ -196,7 +234,13 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
|||||||
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
|
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
|
||||||
|
|
||||||
if !auth_check {
|
if !auth_check {
|
||||||
return Err!(Request(Forbidden("Auth check failed")));
|
self.services.pdu_metadata.mark_event_rejected(event_id);
|
||||||
|
self.services
|
||||||
|
.outlier
|
||||||
|
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
|
||||||
|
return Err!(Request(Forbidden(
|
||||||
|
"Event authorisation fails based on event's claimed auth events"
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("Validation successful.");
|
trace!("Validation successful.");
|
||||||
|
|||||||
@@ -1,89 +0,0 @@
|
|||||||
use std::{collections::BTreeMap, time::Instant};
|
|
||||||
|
|
||||||
use conduwuit::{
|
|
||||||
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer, implement,
|
|
||||||
utils::continue_exponential_backoff_secs,
|
|
||||||
};
|
|
||||||
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
|
|
||||||
use tracing::debug;
|
|
||||||
|
|
||||||
#[implement(super::Service)]
|
|
||||||
#[allow(clippy::type_complexity)]
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
|
||||||
#[tracing::instrument(
|
|
||||||
name = "prev",
|
|
||||||
level = INFO_SPAN_LEVEL,
|
|
||||||
skip_all,
|
|
||||||
fields(%prev_id),
|
|
||||||
)]
|
|
||||||
pub(super) async fn handle_prev_pdu<'a, Pdu>(
|
|
||||||
&self,
|
|
||||||
origin: &'a ServerName,
|
|
||||||
event_id: &'a EventId,
|
|
||||||
room_id: &'a RoomId,
|
|
||||||
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
|
|
||||||
create_event: &'a Pdu,
|
|
||||||
first_ts_in_room: MilliSecondsSinceUnixEpoch,
|
|
||||||
prev_id: &'a EventId,
|
|
||||||
) -> Result
|
|
||||||
where
|
|
||||||
Pdu: Event + Send + Sync,
|
|
||||||
{
|
|
||||||
// Check for disabled again because it might have changed
|
|
||||||
if self.services.metadata.is_disabled(room_id).await {
|
|
||||||
return Err!(Request(Forbidden(debug_warn!(
|
|
||||||
"Federaton of room {room_id} is currently disabled on this server. Request by \
|
|
||||||
origin {origin} and event ID {event_id}"
|
|
||||||
))));
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some((time, tries)) = self
|
|
||||||
.services
|
|
||||||
.globals
|
|
||||||
.bad_event_ratelimiter
|
|
||||||
.read()
|
|
||||||
.get(prev_id)
|
|
||||||
{
|
|
||||||
// Exponential backoff
|
|
||||||
const MIN_DURATION: u64 = 5 * 60;
|
|
||||||
const MAX_DURATION: u64 = 60 * 60 * 24;
|
|
||||||
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
|
|
||||||
debug!(
|
|
||||||
?tries,
|
|
||||||
duration = ?time.elapsed(),
|
|
||||||
"Backing off from prev_event"
|
|
||||||
);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let Some((pdu, json)) = eventid_info else {
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
// Skip old events
|
|
||||||
if pdu.origin_server_ts() < first_ts_in_room {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let start_time = Instant::now();
|
|
||||||
self.federation_handletime
|
|
||||||
.write()
|
|
||||||
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
|
|
||||||
|
|
||||||
defer! {{
|
|
||||||
self.federation_handletime
|
|
||||||
.write()
|
|
||||||
.remove(room_id);
|
|
||||||
}};
|
|
||||||
|
|
||||||
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
elapsed = ?start_time.elapsed(),
|
|
||||||
"Handled prev_event",
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
@@ -4,7 +4,6 @@
|
|||||||
mod fetch_state;
|
mod fetch_state;
|
||||||
mod handle_incoming_pdu;
|
mod handle_incoming_pdu;
|
||||||
mod handle_outlier_pdu;
|
mod handle_outlier_pdu;
|
||||||
mod handle_prev_pdu;
|
|
||||||
mod parse_incoming_pdu;
|
mod parse_incoming_pdu;
|
||||||
mod policy_server;
|
mod policy_server;
|
||||||
mod resolve_state;
|
mod resolve_state;
|
||||||
@@ -15,13 +14,13 @@
|
|||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
|
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
|
||||||
|
pub use fetch_and_handle_outliers::build_local_dag;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
|
OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
|
||||||
room_version_rules::RoomVersionRules,
|
room_version_rules::RoomVersionRules,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{Dep, globals, rooms, sending, server_keys};
|
use crate::{Dep, globals, rooms, sending, server_keys};
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
pub mutex_federation: RoomMutexMap,
|
pub mutex_federation: RoomMutexMap,
|
||||||
pub federation_handletime: SyncRwLock<HandleTimeMap>,
|
pub federation_handletime: SyncRwLock<HandleTimeMap>,
|
||||||
|
|||||||
@@ -56,7 +56,10 @@ fn extract_room_id(event_type: &str, pdu: &CanonicalJsonObject) -> Result<OwnedR
|
|||||||
|
|
||||||
/// Parses every entry in an array as an event ID, returning an error if any
|
/// Parses every entry in an array as an event ID, returning an error if any
|
||||||
/// step fails.
|
/// step fails.
|
||||||
fn expect_event_id_array(value: &CanonicalJsonObject, field: &str) -> Result<Vec<OwnedEventId>> {
|
pub(super) fn expect_event_id_array(
|
||||||
|
value: &CanonicalJsonObject,
|
||||||
|
field: &str,
|
||||||
|
) -> Result<Vec<OwnedEventId>> {
|
||||||
value
|
value
|
||||||
.get(field)
|
.get(field)
|
||||||
.ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))?
|
.ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))?
|
||||||
|
|||||||
@@ -5,7 +5,7 @@
|
|||||||
};
|
};
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Result, debug, err, implement,
|
Result, debug, err, error, implement,
|
||||||
matrix::{Event, StateMap},
|
matrix::{Event, StateMap},
|
||||||
trace,
|
trace,
|
||||||
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
|
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
|
||||||
@@ -121,6 +121,7 @@ pub(super) async fn state_at_incoming_resolved<Pdu>(
|
|||||||
.state_resolution(room_version_rules, fork_states.iter(), &auth_chain_sets)
|
.state_resolution(room_version_rules, fork_states.iter(), &auth_chain_sets)
|
||||||
.boxed()
|
.boxed()
|
||||||
.await
|
.await
|
||||||
|
.inspect_err(|e| error!("State resolution failed: {e:?}"))
|
||||||
else {
|
else {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1,14 +1,18 @@
|
|||||||
use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant};
|
use std::{borrow::Borrow, collections::BTreeMap, sync::Arc, time::Instant};
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Result, debug, debug_info, err, implement, info, is_equal_to,
|
Err, Result, debug, debug_info, debug_warn, err, implement, is_equal_to,
|
||||||
matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res},
|
matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res},
|
||||||
trace,
|
trace,
|
||||||
utils::stream::{BroadbandExt, ReadyExt},
|
utils::{
|
||||||
|
IterStream,
|
||||||
|
stream::{BroadbandExt, ReadyExt},
|
||||||
|
},
|
||||||
warn,
|
warn,
|
||||||
};
|
};
|
||||||
use futures::{FutureExt, StreamExt, future::ready};
|
use futures::{FutureExt, StreamExt, future::ready};
|
||||||
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
|
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
|
||||||
|
use tokio::join;
|
||||||
|
|
||||||
use super::get_room_version_rules;
|
use super::get_room_version_rules;
|
||||||
use crate::rooms::{
|
use crate::rooms::{
|
||||||
@@ -35,16 +39,37 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
|||||||
.get_pdu_id(incoming_pdu.event_id())
|
.get_pdu_id(incoming_pdu.event_id())
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
trace!(event_id=%incoming_pdu.event_id(), "Skipping upgrade of already upgraded PDU");
|
||||||
return Ok(Some(pduid));
|
return Ok(Some(pduid));
|
||||||
}
|
}
|
||||||
|
|
||||||
if self
|
let (rejected, soft_failed) = join!(
|
||||||
.services
|
self.services
|
||||||
|
.pdu_metadata
|
||||||
|
.is_event_rejected(incoming_pdu.event_id()),
|
||||||
|
self.services
|
||||||
.pdu_metadata
|
.pdu_metadata
|
||||||
.is_event_soft_failed(incoming_pdu.event_id())
|
.is_event_soft_failed(incoming_pdu.event_id())
|
||||||
.await
|
);
|
||||||
{
|
if rejected {
|
||||||
return Err!(Request(InvalidParam("Event has been soft failed")));
|
return Err!(Request(InvalidParam("Event has been rejected")));
|
||||||
|
} else if soft_failed {
|
||||||
|
return Err!(Request(InvalidParam("Event has been soft-failed")));
|
||||||
|
}
|
||||||
|
|
||||||
|
// If any of the auth events are rejected, this event is also rejected.
|
||||||
|
for aid in incoming_pdu.auth_events() {
|
||||||
|
if self.services.pdu_metadata.is_event_rejected(aid).await {
|
||||||
|
// TODO: debug_warn instead of warn
|
||||||
|
warn!(
|
||||||
|
"Rejecting incoming event {} which depends on rejected auth event {aid}",
|
||||||
|
incoming_pdu.event_id()
|
||||||
|
);
|
||||||
|
self.services
|
||||||
|
.pdu_metadata
|
||||||
|
.mark_event_rejected(incoming_pdu.event_id());
|
||||||
|
return Err!(Request(InvalidParam("Event has rejected auth event: {aid}")));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
@@ -70,6 +95,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
|||||||
};
|
};
|
||||||
|
|
||||||
if state_at_incoming_event.is_none() {
|
if state_at_incoming_event.is_none() {
|
||||||
|
trace!("Could not calculate incoming state, asking remote {origin} for it");
|
||||||
state_at_incoming_event = self
|
state_at_incoming_event = self
|
||||||
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
|
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
|
||||||
.await?;
|
.await?;
|
||||||
@@ -95,6 +121,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
|||||||
event_id = %incoming_pdu.event_id,
|
event_id = %incoming_pdu.event_id,
|
||||||
"Running initial auth check"
|
"Running initial auth check"
|
||||||
);
|
);
|
||||||
|
// PDU check: 5
|
||||||
let auth_check = state_res::event_auth::auth_check(
|
let auth_check = state_res::event_auth::auth_check(
|
||||||
&room_version_rules,
|
&room_version_rules,
|
||||||
&incoming_pdu,
|
&incoming_pdu,
|
||||||
@@ -106,7 +133,12 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
|||||||
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
|
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
|
||||||
|
|
||||||
if !auth_check {
|
if !auth_check {
|
||||||
return Err!(Request(Forbidden("Event has failed auth check with state at the event.")));
|
self.services
|
||||||
|
.pdu_metadata
|
||||||
|
.mark_event_rejected(incoming_pdu.event_id());
|
||||||
|
return Err!(Request(Forbidden(
|
||||||
|
"Event authorisation fails based on the state before the event"
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
@@ -135,6 +167,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
|||||||
event_id = %incoming_pdu.event_id,
|
event_id = %incoming_pdu.event_id,
|
||||||
"Running auth check with claimed state auth"
|
"Running auth check with claimed state auth"
|
||||||
);
|
);
|
||||||
|
// PDU check: 6
|
||||||
let auth_check = state_res::event_auth::auth_check(
|
let auth_check = state_res::event_auth::auth_check(
|
||||||
&room_version_rules,
|
&room_version_rules,
|
||||||
&incoming_pdu,
|
&incoming_pdu,
|
||||||
@@ -144,6 +177,12 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
|
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
|
||||||
|
if !auth_check {
|
||||||
|
warn!(
|
||||||
|
event_id = %incoming_pdu.event_id,
|
||||||
|
"Event authentication fails based on the current state of the room"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Soft fail check before doing state res
|
// Soft fail check before doing state res
|
||||||
debug!(
|
debug!(
|
||||||
@@ -153,16 +192,22 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
|||||||
let mut soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_rules)) {
|
let mut soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_rules)) {
|
||||||
| (false, _) => true,
|
| (false, _) => true,
|
||||||
| (true, None) => false,
|
| (true, None) => false,
|
||||||
| (true, Some(redact_id)) =>
|
| (true, Some(redact_id)) => {
|
||||||
!self
|
if !self
|
||||||
.services
|
.services
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.user_can_redact(&redact_id, incoming_pdu.sender(), room_id, true)
|
.user_can_redact(&redact_id, incoming_pdu.sender(), room_id, true)
|
||||||
.await?,
|
.await?
|
||||||
|
{
|
||||||
|
warn!(redacts = %redact_id, "User is not allowed to redact event");
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
// 13. Use state resolution to find new room state
|
// 13. Use state resolution to find new room state
|
||||||
|
|
||||||
// We start looking at current room state now, so lets lock the room
|
// We start looking at current room state now, so lets lock the room
|
||||||
trace!(
|
trace!(
|
||||||
room_id = %room_id,
|
room_id = %room_id,
|
||||||
@@ -170,36 +215,6 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
|||||||
);
|
);
|
||||||
let state_lock = self.services.state.mutex.lock(room_id).await;
|
let state_lock = self.services.state.mutex.lock(room_id).await;
|
||||||
|
|
||||||
// Now we calculate the set of extremities this room has after the incoming
|
|
||||||
// event has been applied. We start with the previous extremities (aka leaves)
|
|
||||||
trace!("Calculating extremities");
|
|
||||||
let mut extremities: Vec<_> = self
|
|
||||||
.services
|
|
||||||
.state
|
|
||||||
.get_forward_extremities(room_id)
|
|
||||||
.ready_filter(|event_id| {
|
|
||||||
// Remove any that are referenced by this incoming event's prev_events
|
|
||||||
!incoming_pdu.prev_events().any(is_equal_to!(event_id))
|
|
||||||
})
|
|
||||||
.broad_filter_map(|event_id| async move {
|
|
||||||
// Only keep those extremities were not referenced yet
|
|
||||||
self.services
|
|
||||||
.pdu_metadata
|
|
||||||
.is_event_referenced(room_id, &event_id)
|
|
||||||
.await
|
|
||||||
.eq(&false)
|
|
||||||
.then_some(event_id)
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
.await;
|
|
||||||
extremities.push(incoming_pdu.event_id().to_owned());
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"Retained {} extremities checked against {} prev_events",
|
|
||||||
extremities.len(),
|
|
||||||
incoming_pdu.prev_events().count()
|
|
||||||
);
|
|
||||||
|
|
||||||
let state_ids_compressed: Arc<CompressedState> = self
|
let state_ids_compressed: Arc<CompressedState> = self
|
||||||
.services
|
.services
|
||||||
.state_compressor
|
.state_compressor
|
||||||
@@ -294,81 +309,88 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
|||||||
.is_event_soft_failed(&redact_id)
|
.is_event_soft_failed(&redact_id)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
// TODO: This should avoid pushing the event to the timeline instead of using
|
||||||
|
// soft-fails as a hack
|
||||||
warn!(
|
warn!(
|
||||||
redact_id = %redact_id,
|
redact_id = %redact_id,
|
||||||
"Redaction is for a soft-failed event, soft failing the redaction"
|
"Redaction is for a soft-failed event"
|
||||||
);
|
);
|
||||||
soft_fail = true;
|
soft_fail = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 14. Check if the event passes auth based on the "current state" of the room,
|
trace!("Appending pdu to timeline");
|
||||||
// if not soft fail it
|
let mut extremities: Vec<_> = self
|
||||||
if soft_fail {
|
.services
|
||||||
info!(
|
.state
|
||||||
event_id = %incoming_pdu.event_id,
|
.get_forward_extremities(room_id)
|
||||||
"Soft failing event"
|
.collect()
|
||||||
);
|
.await;
|
||||||
// assert!(extremities.is_empty(), "soft_fail extremities empty");
|
if !soft_fail {
|
||||||
let extremities = extremities.iter().map(Borrow::borrow);
|
// Per https://spec.matrix.org/unstable/server-server-api/#soft-failure, soft-failed events
|
||||||
debug_assert!(extremities.clone().count() > 0, "extremities not empty");
|
// are not added as forward extremities.
|
||||||
|
|
||||||
self.services
|
// Now we calculate the set of extremities this room has after the incoming
|
||||||
.timeline
|
// event has been applied. We start with the previous extremities (aka leaves)
|
||||||
.append_incoming_pdu(
|
trace!("Calculating extremities");
|
||||||
&incoming_pdu,
|
extremities = extremities
|
||||||
val,
|
.into_iter()
|
||||||
extremities,
|
.stream()
|
||||||
state_ids_compressed,
|
.ready_filter(|event_id| {
|
||||||
soft_fail,
|
// Remove any that are referenced by this incoming event's prev_events
|
||||||
&state_lock,
|
!incoming_pdu.prev_events().any(is_equal_to!(event_id))
|
||||||
room_id,
|
})
|
||||||
)
|
.broad_filter_map(|event_id| async move {
|
||||||
.await?;
|
// Only keep those extremities were not referenced yet
|
||||||
|
|
||||||
// Soft fail, we keep the event as an outlier but don't add it to the timeline
|
|
||||||
self.services
|
self.services
|
||||||
.pdu_metadata
|
.pdu_metadata
|
||||||
.mark_event_soft_failed(incoming_pdu.event_id());
|
.is_event_referenced(room_id, &event_id)
|
||||||
|
.await
|
||||||
warn!(
|
.eq(&false)
|
||||||
event_id = %incoming_pdu.event_id,
|
.then_some(event_id)
|
||||||
"Event was soft failed"
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.await;
|
||||||
|
extremities.push(incoming_pdu.event_id().to_owned());
|
||||||
|
debug!(
|
||||||
|
"Retained {} extremities checked against {} prev_events",
|
||||||
|
extremities.len(),
|
||||||
|
incoming_pdu.prev_events().count()
|
||||||
);
|
);
|
||||||
return Err!(Request(InvalidParam("Event has been soft failed")));
|
assert!(!extremities.is_empty(), "extremities must not empty");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now that the event has passed all auth it is added into the timeline.
|
|
||||||
// We use the `state_at_event` instead of `state_after` so we accurately
|
|
||||||
// represent the state for this event.
|
|
||||||
trace!("Appending pdu to timeline");
|
|
||||||
let extremities = extremities
|
|
||||||
.iter()
|
|
||||||
.map(Borrow::borrow)
|
|
||||||
.chain(once(incoming_pdu.event_id()));
|
|
||||||
debug_assert!(extremities.clone().count() > 0, "extremities not empty");
|
|
||||||
|
|
||||||
let pdu_id = self
|
let pdu_id = self
|
||||||
.services
|
.services
|
||||||
.timeline
|
.timeline
|
||||||
.append_incoming_pdu(
|
.append_incoming_pdu(
|
||||||
&incoming_pdu,
|
&incoming_pdu,
|
||||||
val,
|
val,
|
||||||
extremities,
|
extremities.iter().map(Borrow::borrow),
|
||||||
state_ids_compressed,
|
state_ids_compressed,
|
||||||
soft_fail,
|
soft_fail,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
room_id,
|
room_id,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
if soft_fail {
|
||||||
// Event has passed all auth/stateres checks
|
self.services
|
||||||
drop(state_lock);
|
.pdu_metadata
|
||||||
|
.mark_event_soft_failed(incoming_pdu.event_id());
|
||||||
|
debug_warn!(
|
||||||
|
elapsed = ?timer.elapsed(),
|
||||||
|
"Event has been soft-failed",
|
||||||
|
);
|
||||||
|
} else {
|
||||||
debug_info!(
|
debug_info!(
|
||||||
elapsed = ?timer.elapsed(),
|
elapsed = ?timer.elapsed(),
|
||||||
"Accepted",
|
"Accepted",
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Event has passed all auth/stateres checks
|
||||||
|
drop(state_lock);
|
||||||
|
|
||||||
Ok(pdu_id)
|
Ok(pdu_id)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,7 +34,7 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
Dep, antispam, globals,
|
Dep, antispam, globals,
|
||||||
rooms::{
|
rooms::{
|
||||||
metadata, outlier, short,
|
metadata, outlier, pdu_metadata, short,
|
||||||
state::{self, RoomMutexGuard},
|
state::{self, RoomMutexGuard},
|
||||||
state_accessor, state_cache,
|
state_accessor, state_cache,
|
||||||
state_compressor::{self, CompressedState, HashSetCompressStateEvent},
|
state_compressor::{self, CompressedState, HashSetCompressStateEvent},
|
||||||
@@ -54,6 +54,7 @@ struct Services {
|
|||||||
globals: Dep<globals::Service>,
|
globals: Dep<globals::Service>,
|
||||||
metadata: Dep<metadata::Service>,
|
metadata: Dep<metadata::Service>,
|
||||||
outlier: Dep<outlier::Service>,
|
outlier: Dep<outlier::Service>,
|
||||||
|
pdu_metadata: Dep<pdu_metadata::Service>,
|
||||||
sending: Dep<sending::Service>,
|
sending: Dep<sending::Service>,
|
||||||
server_keys: Dep<server_keys::Service>,
|
server_keys: Dep<server_keys::Service>,
|
||||||
short: Dep<short::Service>,
|
short: Dep<short::Service>,
|
||||||
@@ -75,6 +76,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
|||||||
globals: args.depend::<globals::Service>("globals"),
|
globals: args.depend::<globals::Service>("globals"),
|
||||||
metadata: args.depend::<metadata::Service>("metadata"),
|
metadata: args.depend::<metadata::Service>("metadata"),
|
||||||
outlier: args.depend::<outlier::Service>("rooms::outlier"),
|
outlier: args.depend::<outlier::Service>("rooms::outlier"),
|
||||||
|
pdu_metadata: args.depend::<pdu_metadata::Service>("rooms::pdu_metadata"),
|
||||||
sending: args.depend::<sending::Service>("sending"),
|
sending: args.depend::<sending::Service>("sending"),
|
||||||
server_keys: args.depend::<server_keys::Service>("server_keys"),
|
server_keys: args.depend::<server_keys::Service>("server_keys"),
|
||||||
short: args.depend::<short::Service>("rooms::short"),
|
short: args.depend::<short::Service>("rooms::short"),
|
||||||
@@ -286,7 +288,7 @@ async fn join_local_room(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote_room", level = "info")]
|
#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote_room", level = "info")]
|
||||||
async fn join_remote_room(
|
pub async fn join_remote_room(
|
||||||
&self,
|
&self,
|
||||||
sender_user: &UserId,
|
sender_user: &UserId,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
@@ -294,6 +296,7 @@ async fn join_remote_room(
|
|||||||
servers: &[OwnedServerName],
|
servers: &[OwnedServerName],
|
||||||
state_lock: RoomMutexGuard,
|
state_lock: RoomMutexGuard,
|
||||||
) -> Result {
|
) -> Result {
|
||||||
|
// public so the admin command force-join-room-remotely works
|
||||||
info!("Joining {room_id} over federation.");
|
info!("Joining {room_id} over federation.");
|
||||||
|
|
||||||
let (make_join_response, remote_server) = self
|
let (make_join_response, remote_server) = self
|
||||||
@@ -514,6 +517,7 @@ async fn join_remote_room(
|
|||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
self.services.outlier.add_pdu_outlier(&event_id, &value);
|
self.services.outlier.add_pdu_outlier(&event_id, &value);
|
||||||
|
self.services.pdu_metadata.clear_pdu_markers(&event_id);
|
||||||
if let Some(state_key) = &pdu.state_key {
|
if let Some(state_key) = &pdu.state_key {
|
||||||
let shortstatekey = self
|
let shortstatekey = self
|
||||||
.services
|
.services
|
||||||
@@ -545,6 +549,7 @@ async fn join_remote_room(
|
|||||||
.ready_for_each(|(event_id, value)| {
|
.ready_for_each(|(event_id, value)| {
|
||||||
trace!(%event_id, "Adding PDU as an outlier from send_join auth_chain");
|
trace!(%event_id, "Adding PDU as an outlier from send_join auth_chain");
|
||||||
self.services.outlier.add_pdu_outlier(&event_id, &value);
|
self.services.outlier.add_pdu_outlier(&event_id, &value);
|
||||||
|
self.services.pdu_metadata.clear_pdu_markers(&event_id);
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ pub(super) struct Data {
|
|||||||
tofrom_relation: Arc<Map>,
|
tofrom_relation: Arc<Map>,
|
||||||
referencedevents: Arc<Map>,
|
referencedevents: Arc<Map>,
|
||||||
softfailedeventids: Arc<Map>,
|
softfailedeventids: Arc<Map>,
|
||||||
|
rejectedeventids: Arc<Map>,
|
||||||
services: Services,
|
services: Services,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -40,6 +41,7 @@ pub(super) fn new(args: &crate::Args<'_>) -> Self {
|
|||||||
tofrom_relation: db["tofrom_relation"].clone(),
|
tofrom_relation: db["tofrom_relation"].clone(),
|
||||||
referencedevents: db["referencedevents"].clone(),
|
referencedevents: db["referencedevents"].clone(),
|
||||||
softfailedeventids: db["softfailedeventids"].clone(),
|
softfailedeventids: db["softfailedeventids"].clone(),
|
||||||
|
rejectedeventids: db["rejectedeventids"].clone(),
|
||||||
services: Services {
|
services: Services {
|
||||||
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
|
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
|
||||||
},
|
},
|
||||||
@@ -118,7 +120,29 @@ pub(super) fn mark_event_soft_failed(&self, event_id: &EventId) {
|
|||||||
self.softfailedeventids.insert(event_id, []);
|
self.softfailedeventids.insert(event_id, []);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn unmark_event_soft_failed(&self, event_id: &EventId) {
|
||||||
|
self.softfailedeventids.remove(event_id);
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
|
pub(super) async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
|
||||||
self.softfailedeventids.get(event_id).await.is_ok()
|
self.softfailedeventids.get(event_id).await.is_ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn mark_event_rejected(&self, event_id: &EventId) {
|
||||||
|
self.rejectedeventids.insert(event_id, []);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn unmark_event_rejected(&self, event_id: &EventId) {
|
||||||
|
self.rejectedeventids.remove(event_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn is_event_rejected(&self, event_id: &EventId) -> bool {
|
||||||
|
self.rejectedeventids.get(event_id).await.is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes any soft-fail or rejection markers applied to the target PDU
|
||||||
|
pub(super) fn clear_pdu_markers(&self, event_id: &EventId) {
|
||||||
|
self.unmark_event_rejected(event_id);
|
||||||
|
self.unmark_event_soft_failed(event_id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -140,4 +140,28 @@ pub fn mark_event_soft_failed(&self, event_id: &EventId) {
|
|||||||
pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
|
pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
|
||||||
self.db.is_event_soft_failed(event_id).await
|
self.db.is_event_soft_failed(event_id).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn is_event_rejected(&self, event_id: &EventId) -> bool {
|
||||||
|
self.db.is_event_rejected(event_id).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn mark_event_rejected(&self, event_id: &EventId) {
|
||||||
|
self.db.mark_event_rejected(event_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unmark_event_soft_failed(&self, event_id: &EventId) {
|
||||||
|
self.db.unmark_event_soft_failed(event_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unmark_event_rejected(&self, event_id: &EventId) {
|
||||||
|
self.db.unmark_event_rejected(event_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if the event is neither soft-failed nor rejected.
|
||||||
|
pub async fn is_event_accepted(&self, event_id: &EventId) -> bool {
|
||||||
|
!self.db.is_event_rejected(event_id).await
|
||||||
|
&& !self.db.is_event_soft_failed(event_id).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn clear_pdu_markers(&self, event_id: &EventId) { self.db.clear_pdu_markers(event_id); }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -53,15 +53,7 @@ pub async fn append_incoming_pdu<'a, Leaves>(
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if soft_fail {
|
if soft_fail {
|
||||||
self.services
|
// Nothing else to do with a soft-failed event.
|
||||||
.pdu_metadata
|
|
||||||
.mark_as_referenced(room_id, pdu.prev_events.iter().map(AsRef::as_ref));
|
|
||||||
|
|
||||||
// self.services
|
|
||||||
// .state
|
|
||||||
// .set_forward_extremities(room_id, new_room_leaves, state_lock)
|
|
||||||
// .await;
|
|
||||||
|
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user