feat: Implement event rejection

Co-Authored-By: star <star@nexy7574.co.uk>
This commit is contained in:
timedout
2026-05-08 19:49:51 +01:00
parent 1658b3bf6c
commit 44c5cfa3c4
15 changed files with 139 additions and 19 deletions
+3 -4
View File
@@ -614,6 +614,7 @@ pub(super) async fn force_set_room_state_from_server(
.await;
state.insert(shortstatekey, pdu.event_id.clone());
self.services.rooms.pdu_metadata.unmark_pdu(pdu.event_id());
}
}
@@ -631,6 +632,7 @@ pub(super) async fn force_set_room_state_from_server(
.rooms
.outlier
.add_pdu_outlier(&event_id, &value);
self.services.rooms.pdu_metadata.unmark_pdu(&event_id);
}
info!("Resolving new room state");
@@ -662,10 +664,7 @@ pub(super) async fn force_set_room_state_from_server(
.force_state(room_id.clone().as_ref(), short_state_hash, added, removed, &state_lock)
.await?;
info!(
"Updating joined counts for room just in case (e.g. we may have found a difference in \
the room's m.room.member state"
);
info!("Updating joined counts for room");
self.services
.rooms
.state_cache
+35
View File
@@ -763,6 +763,41 @@ pub(super) async fn force_join_room(
.await
}
#[admin_command]
pub(super) async fn force_join_room_remotely(
&self,
user_id: String,
room_id: OwnedRoomOrAliasId,
via: String,
) -> Result {
let via = ServerName::parse(&via)?;
let user_id = parse_local_user_id(self.services, &user_id)?;
let (room_id, mut servers) = self
.services
.rooms
.alias
.resolve_with_servers(&room_id, None)
.await?;
if servers.contains(&via) {
servers.retain(|n| *n != via);
}
servers.insert(0, via);
assert!(
self.services.globals.user_is_local(&user_id),
"Parsed user_id must be a local user"
);
let state_lock = self.services.rooms.state.mutex.lock(&*room_id).await;
self.services
.rooms
.membership
.join_remote_room(&user_id, &room_id, None, &servers, state_lock)
.await?;
self.write_str(&format!("{user_id} has been joined to {room_id}."))
.await
}
#[admin_command]
pub(super) async fn force_leave_room(
&self,
+14
View File
@@ -183,6 +183,20 @@ pub enum UserCommand {
room_id: OwnedRoomOrAliasId,
},
/// Manually join a local user to a room via a remote server, regardless of
/// our current residency.
ForceJoinRoomRemotely {
/// The user to join
user_id: String,
/// The room to join
room_id: OwnedRoomOrAliasId,
/// The server name to join via.
///
/// This server will always be tried first, however if more are
/// available, they may be tried after.
via: String,
},
/// Manually leave a local user from a room.
ForceLeaveRoom {
user_id: String,
+1 -1
View File
@@ -187,7 +187,7 @@ pub(crate) async fn change_password_route(
if services.server.config.admin_room_notices {
services
.admin
.notice(&format!("User {} changed their password.", &sender_user))
.notice(&format!("User {sender_user} changed their password."))
.await;
}
+1 -4
View File
@@ -537,10 +537,7 @@ pub(crate) async fn create_room_route(
if services.server.config.admin_room_notices {
services
.admin
.send_text(&format!(
"{sender_user} made {} public to the room directory",
&room_id
))
.send_text(&format!("{sender_user} made {room_id} public to the room directory"))
.await;
}
info!("{sender_user} made {0} public to the room directory", &room_id);
+1 -1
View File
@@ -260,7 +260,7 @@ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{real_error}")
}
} else {
write!(f, "Request error: {}", &self.0)
write!(f, "Request error: {}", self.0)
}
}
}
+5
View File
@@ -311,6 +311,11 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
key_size_hint: Some(48),
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "rejectedeventids",
key_size_hint: Some(48),
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "statehash_shortstatehash",
val_size_hint: Some(8),
+1 -1
View File
@@ -44,7 +44,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
db,
server: args.server.clone(),
bad_event_ratelimiter: Arc::new(SyncRwLock::new(HashMap::new())),
admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", &args.server.name))
admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", args.server.name))
.expect("#admins:server_name is valid alias name"),
server_user: UserId::parse_with_server_name(
String::from("conduit"),
+1 -1
View File
@@ -37,7 +37,7 @@ pub struct PasswordReset<'a> {
}
impl MessageTemplate for PasswordReset<'_> {
fn subject(&self) -> String { format!("Password reset request for {}", &self.user_id) }
fn subject(&self) -> String { format!("Password reset request for {}", self.user_id) }
}
#[derive(Template)]
+1 -1
View File
@@ -48,7 +48,7 @@ pub fn is_valid(&self) -> bool {
impl std::fmt::Display for DatabaseTokenInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Token created by {} and used {} times. ", &self.creator, self.uses)?;
write!(f, "Token created by {} and used {} times. ", self.creator, self.uses)?;
if let Some(expires) = &self.expires {
write!(f, "{expires}.")?;
} else {
+1 -1
View File
@@ -35,7 +35,7 @@ pub struct ValidToken {
impl std::fmt::Display for ValidToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "`{}` --- {}", self.token, &self.source)
write!(f, "`{}` --- {}", self.token, self.source)
}
}
@@ -38,13 +38,27 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
return Ok(Some(pduid));
}
if self
if !self
.services
.pdu_metadata
.is_event_soft_failed(incoming_pdu.event_id())
.is_event_accepted(incoming_pdu.event_id())
.await
{
return Err!(Request(InvalidParam("Event has been soft failed")));
return Err!(Request(InvalidParam("Event has been rejected or soft-failed")));
}
// If any of the auth events are rejected, this event is also rejected.
for aid in incoming_pdu.auth_events() {
if self.services.pdu_metadata.is_event_rejected(aid).await {
warn!(
"Rejecting incoming event {} which depends on rejected auth event {aid}",
incoming_pdu.event_id()
);
self.services
.pdu_metadata
.mark_event_rejected(incoming_pdu.event_id());
return Err!(Request(InvalidParam("Event has rejected auth events")));
}
}
debug!(
@@ -106,6 +120,9 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
if !auth_check {
self.services
.pdu_metadata
.mark_event_rejected(incoming_pdu.event_id());
return Err!(Request(Forbidden("Event has failed auth check with state at the event.")));
}
+7 -2
View File
@@ -34,7 +34,7 @@
use crate::{
Dep, antispam, globals,
rooms::{
metadata, outlier, short,
metadata, outlier, pdu_metadata, short,
state::{self, RoomMutexGuard},
state_accessor, state_cache,
state_compressor::{self, CompressedState, HashSetCompressStateEvent},
@@ -54,6 +54,7 @@ struct Services {
globals: Dep<globals::Service>,
metadata: Dep<metadata::Service>,
outlier: Dep<outlier::Service>,
pdu_metadata: Dep<pdu_metadata::Service>,
sending: Dep<sending::Service>,
server_keys: Dep<server_keys::Service>,
short: Dep<short::Service>,
@@ -75,6 +76,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
globals: args.depend::<globals::Service>("globals"),
metadata: args.depend::<metadata::Service>("metadata"),
outlier: args.depend::<outlier::Service>("rooms::outlier"),
pdu_metadata: args.depend::<pdu_metadata::Service>("rooms::pdu_metadata"),
sending: args.depend::<sending::Service>("sending"),
server_keys: args.depend::<server_keys::Service>("server_keys"),
short: args.depend::<short::Service>("rooms::short"),
@@ -286,7 +288,7 @@ async fn join_local_room(
}
#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote_room", level = "info")]
async fn join_remote_room(
pub async fn join_remote_room(
&self,
sender_user: &UserId,
room_id: &RoomId,
@@ -294,6 +296,7 @@ async fn join_remote_room(
servers: &[OwnedServerName],
state_lock: RoomMutexGuard,
) -> Result {
// public so the admin command force-join-room-remotely works
info!("Joining {room_id} over federation.");
let (make_join_response, remote_server) = self
@@ -514,6 +517,7 @@ async fn join_remote_room(
return state;
}
self.services.outlier.add_pdu_outlier(&event_id, &value);
self.services.pdu_metadata.unmark_pdu(&event_id);
if let Some(state_key) = &pdu.state_key {
let shortstatekey = self
.services
@@ -545,6 +549,7 @@ async fn join_remote_room(
.ready_for_each(|(event_id, value)| {
trace!(%event_id, "Adding PDU as an outlier from send_join auth_chain");
self.services.outlier.add_pdu_outlier(&event_id, &value);
self.services.pdu_metadata.unmark_pdu(&event_id);
})
.await;
+24
View File
@@ -26,6 +26,7 @@ pub(super) struct Data {
tofrom_relation: Arc<Map>,
referencedevents: Arc<Map>,
softfailedeventids: Arc<Map>,
rejectedeventids: Arc<Map>,
services: Services,
}
@@ -40,6 +41,7 @@ pub(super) fn new(args: &crate::Args<'_>) -> Self {
tofrom_relation: db["tofrom_relation"].clone(),
referencedevents: db["referencedevents"].clone(),
softfailedeventids: db["softfailedeventids"].clone(),
rejectedeventids: db["rejectedeventids"].clone(),
services: Services {
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
},
@@ -118,7 +120,29 @@ pub(super) fn mark_event_soft_failed(&self, event_id: &EventId) {
self.softfailedeventids.insert(event_id, []);
}
pub(super) fn unmark_event_soft_failed(&self, event_id: &EventId) {
self.softfailedeventids.remove(event_id);
}
pub(super) async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
self.softfailedeventids.get(event_id).await.is_ok()
}
pub(super) fn mark_event_rejected(&self, event_id: &EventId) {
self.rejectedeventids.insert(event_id, []);
}
pub(super) fn unmark_event_rejected(&self, event_id: &EventId) {
self.rejectedeventids.remove(event_id);
}
pub(super) async fn is_event_rejected(&self, event_id: &EventId) -> bool {
self.rejectedeventids.get(event_id).await.is_ok()
}
/// Removes any soft-fail or rejection markers applied to the target PDU
pub(super) fn unmark_pdu(&self, event_id: &EventId) {
self.unmark_event_rejected(event_id);
self.unmark_event_soft_failed(event_id);
}
}
+24
View File
@@ -140,4 +140,28 @@ pub fn mark_event_soft_failed(&self, event_id: &EventId) {
pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
self.db.is_event_soft_failed(event_id).await
}
pub async fn is_event_rejected(&self, event_id: &EventId) -> bool {
self.db.is_event_rejected(event_id).await
}
pub fn mark_event_rejected(&self, event_id: &EventId) {
self.db.mark_event_rejected(event_id);
}
pub fn unmark_event_soft_failed(&self, event_id: &EventId) {
self.db.unmark_event_soft_failed(event_id);
}
pub fn unmark_event_rejected(&self, event_id: &EventId) {
self.db.unmark_event_rejected(event_id);
}
/// Returns true if the event is neither soft-failed nor rejected.
pub async fn is_event_accepted(&self, event_id: &EventId) -> bool {
!self.db.is_event_rejected(event_id).await
&& !self.db.is_event_soft_failed(event_id).await
}
pub fn unmark_pdu(&self, event_id: &EventId) { self.db.unmark_pdu(event_id); }
}