Compare commits

...

1 Commits

Author SHA1 Message Date
timedout ca5b43c6bb wip: Room data purging 2026-05-01 17:38:43 +01:00
17 changed files with 265 additions and 43 deletions
+68 -3
View File
@@ -1,9 +1,9 @@
use api::client::leave_room;
use clap::Subcommand;
use conduwuit::{
Err, Result, debug, info,
utils::{IterStream, ReadyExt},
warn,
debug, info, utils::{IterStream, ReadyExt}, warn,
Err,
Result,
};
use futures::{FutureExt, StreamExt};
use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId};
@@ -43,6 +43,12 @@ pub enum RoomModerationCommand {
/// information
no_details: bool,
},
/// Deletes a room
Delete {
/// The room ID
room_id: OwnedRoomId,
},
}
#[admin_command]
@@ -452,3 +458,62 @@ async fn list_banned_rooms(&self, no_details: bool) -> Result {
self.write_str(&format!("Rooms Banned ({num}):\n```\n{body}\n```"))
.await
}
#[admin_command]
async fn delete(&self, room_id: OwnedRoomId) -> Result {
let is_banned = self.services.rooms.metadata.is_banned(&room_id).await;
let is_disabled = self.services.rooms.metadata.is_disabled(&room_id).await;
// Temporarily forcefully ban the room to prevent people trying to join while
// deletion is ongoing.
self.services.rooms.metadata.disable_room(&room_id, true);
self.services.rooms.metadata.ban_room(&room_id, true);
let mut users = self
.services
.rooms
.state_cache
.room_members(&room_id)
.ready_filter(|user| self.services.globals.user_is_local(user))
.boxed();
while let Some(ref user_id) = users.next().await {
info!("Removing {user_id} from {room_id}",);
if let Err(e) = leave_room(self.services, user_id, &room_id, None)
.boxed()
.await
{
warn!("Failed to remove {user_id} from {room_id}: {e}");
}
self.services.rooms.state_cache.forget(&room_id, user_id);
}
self.services
.rooms
.alias
.local_aliases_for_room(&room_id)
.for_each(|local_alias| async move {
info!("Removing alias {local_alias}");
self.services
.rooms
.alias
.remove_alias(&local_alias, &self.services.globals.server_user)
.await
.ok();
})
.await;
info!("Removing the room from the directory if it is present");
self.services.rooms.directory.set_not_public(&room_id);
info!("Removing lazy-loading metadata");
self.services.rooms.lazy_loading.purge(&room_id).await;
info!("Removing PDU metadata");
self.services.rooms.pdu_metadata.purge(&room_id).await;
info!("Removing state cache");
self.services.rooms.state_cache.purge(&room_id).await;
info!("Removing room state");
self.services.rooms.state.purge(&room_id).await;
Ok(())
}
+3 -2
View File
@@ -118,8 +118,9 @@ pub(crate) async fn get_content_thumbnail_route(
} = match fetch_thumbnail_meta(&services, &mxc, user, body.timeout_ms, &dim).await {
| Ok(meta) => meta,
| Err(conduwuit::Error::Io(e)) => match e.kind() {
| std::io::ErrorKind::NotFound =>
return Err!(Request(NotFound("Thumbnail not found."))),
| std::io::ErrorKind::NotFound => {
return Err!(Request(NotFound("Thumbnail not found.")));
},
| std::io::ErrorKind::PermissionDenied => {
error!("Permission denied when trying to read file: {e:?}");
return Err!(Request(Unknown("Unknown error when fetching thumbnail.")));
+3 -2
View File
@@ -375,7 +375,7 @@ async fn allowed_to_send_state_event(
},
}
},
| StateEventType::RoomMember =>
| StateEventType::RoomMember => {
match json.deserialize_as_unchecked::<RoomMemberEventContent>() {
| Ok(mut membership_content) => {
let Ok(state_key) = UserId::parse(state_key) else {
@@ -434,7 +434,8 @@ async fn allowed_to_send_state_event(
membership state: {e}"
)));
},
},
}
},
| _ => (),
}
+3 -2
View File
@@ -19,10 +19,11 @@ pub(crate) async fn well_known_client(
) -> Result<discover_homeserver::Response> {
let client_url = match services.config.well_known.client.as_ref() {
| Some(url) => url.to_string(),
| None =>
| None => {
return Err!(Request(NotFound(
"This server is not configured to serve well-known client information."
))),
)));
},
};
Ok(assign!(discover_homeserver::Response::new(HomeserverInfo::new(client_url)), {
+3 -2
View File
@@ -89,8 +89,9 @@ async fn verify<B: AsRef<[u8]> + Sync>(
origin: Some(output.origin.clone()),
..Default::default()
}),
| Err(err) =>
Err!(Request(Unauthorized(warn!("Failed to verify X-Matrix header: {err}")))),
| Err(err) => {
Err!(Request(Unauthorized(warn!("Failed to verify X-Matrix header: {err}"))))
},
}
}
}
+3 -2
View File
@@ -252,7 +252,7 @@ pub(crate) async fn user_can_perform_restricted_join(
return Ok(true);
}
},
| other if other.rule_type() == "fi.mau.spam_checker" =>
| other if other.rule_type() == "fi.mau.spam_checker" => {
return match services
.antispam
.meowlnir_accept_make_join(room_id.to_owned(), user_id.to_owned())
@@ -260,7 +260,8 @@ pub(crate) async fn user_can_perform_restricted_join(
{
| Ok(()) => Ok(true),
| Err(_) => Err!(Request(Forbidden("Antispam rejected join request."))),
},
};
},
| _ => {
// We don't recognise this join rule, so we cannot satisfy the request.
could_satisfy = false;
+4 -1
View File
@@ -160,7 +160,10 @@ pub async fn create_admin_room(services: &Services) -> Result {
.boxed()
.await?;
let room_topic = format!("Manage {} | Run commands prefixed with `!admin` | Run `!admin -h` for help | Documentation: https://continuwuity.org/", services.config.server_name);
let room_topic = format!(
"Manage {} | Run commands prefixed with `!admin` | Run `!admin -h` for help | Documentation: https://continuwuity.org/",
services.config.server_name
);
services
.rooms
.timeline
+3 -1
View File
@@ -312,7 +312,9 @@ pub fn print_first_run_banner(&self) {
to open the console."
);
}
eprintln!("If you need assistance setting up your homeserver, make a Matrix account on another homeserver and join our chatroom: https://matrix.to/#/#continuwuity:continuwuity.org");
eprintln!(
"If you need assistance setting up your homeserver, make a Matrix account on another homeserver and join our chatroom: https://matrix.to/#/#continuwuity:continuwuity.org"
);
eprintln!("{}", "============".bold());
}
+11
View File
@@ -67,6 +67,17 @@ pub async fn reset(&self, ctx: &Context<'_>) {
.await;
}
#[implement(Service)]
pub async fn purge(&self, room_id: &RoomId) {
let prefix = (Interfix, Interfix, room_id, Interfix);
self.db
.lazyloadedids
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| self.db.lazyloadedids.remove(key))
.await;
}
/// Returns only the subset of `senders` which should be sent to the client
/// according to the provided lazy loading context.
#[implement(Service)]
+15 -1
View File
@@ -9,7 +9,7 @@
u64_from_u8,
},
};
use database::Map;
use database::{Interfix, Map};
use futures::{Stream, StreamExt};
use ruma::{EventId, RoomId, UserId, api::Direction};
@@ -46,6 +46,16 @@ pub(super) fn new(args: &crate::Args<'_>) -> Self {
}
}
pub(super) async fn purge(&self, room_id: &RoomId) {
// NOTE: This does not remove soft-failed event references, that must be done
// somewhere else.
self.referencedevents
.keys_prefix_raw(&(room_id, Interfix))
.ignore_err()
.ready_for_each(|key| self.referencedevents.remove(key))
.await;
}
pub(super) fn add_relation(&self, from: u64, to: u64) {
const BUFSIZE: usize = size_of::<u64>() * 2;
@@ -121,4 +131,8 @@ pub(super) fn mark_event_soft_failed(&self, event_id: &EventId) {
pub(super) async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
self.softfailedeventids.get(event_id).await.is_ok()
}
pub(super) async fn remove_soft_fail_marker(&self, event_id: &EventId) {
self.softfailedeventids.remove(event_id);
}
}
+6
View File
@@ -140,4 +140,10 @@ pub fn mark_event_soft_failed(&self, event_id: &EventId) {
pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
self.db.is_event_soft_failed(event_id).await
}
pub async fn purge(&self, room_id: &RoomId) { self.db.purge(room_id).await; }
pub async fn remove_soft_fail_marker(&self, event_id: &EventId) {
self.db.remove_soft_fail_marker(event_id).await;
}
}
+8
View File
@@ -115,4 +115,12 @@ pub(super) async fn last_privateread_update(
.deserialized()
.unwrap_or(0)
}
pub(super) async fn purge(&self, room_id: &RoomId) {
self.readreceiptid_readreceipt
.keys_prefix_raw(room_id)
.ignore_err()
.ready_for_each(|key| self.readreceiptid_readreceipt.remove(key))
.await;
}
}
+2
View File
@@ -142,6 +142,8 @@ pub async fn private_read_get_count(
pub async fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> u64 {
self.db.last_privateread_update(user_id, room_id).await
}
pub async fn purge(&self, room_id: &RoomId) { self.db.purge(room_id).await }
}
#[must_use]
+33 -14
View File
@@ -3,32 +3,32 @@
use async_trait::async_trait;
use conduwuit::debug;
use conduwuit_core::{
Event, PduEvent, Result, err,
result::FlatOk,
state_res::{self, StateMap},
utils::{
IterStream, MutexMap, MutexMapGuard, ReadyExt, calculate_hash,
stream::{BroadbandExt, TryIgnore},
err, result::FlatOk, state_res::{self, StateMap}, utils::{
calculate_hash, stream::{BroadbandExt, TryIgnore}, IterStream, MutexMap, MutexMapGuard,
ReadyExt,
},
warn,
Event,
PduEvent,
Result,
};
use conduwuit_database::{Deserialized, Ignore, Interfix, Map};
use futures::{
FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::join_all, pin_mut,
future::join_all, pin_mut, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
};
use ruma::{
EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId,
api::federation::membership::RawStrippedState,
events::{StateEventType, TimelineEventType, room::create::RoomCreateEventContent},
room_version_rules::RoomVersionRules,
api::federation::membership::RawStrippedState, events::{room::create::RoomCreateEventContent, StateEventType, TimelineEventType}, room_version_rules::RoomVersionRules, EventId, OwnedEventId, OwnedRoomId,
RoomId,
RoomVersionId,
UserId,
};
use crate::{
Dep, globals, rooms,
rooms::{
globals, rooms, rooms::{
short::{ShortEventId, ShortStateHash},
state_compressor::{CompressedState, parse_compressed_state_event},
state_compressor::{parse_compressed_state_event, CompressedState},
},
Dep,
};
pub struct Service {
@@ -89,6 +89,25 @@ fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub async fn purge(&self, room_id: &RoomId) {
self.db.roomid_pduleaves
.keys_prefix_raw(room_id)
.ignore_err()
.ready_for_each(|k| self.db.roomid_pduleaves.remove(k))
.await;
let mut shortstatehashes = self.db.roomid_shortstatehash
.keys_prefix_raw(room_id)
.ignore_err();
while let Some(key) = shortstatehashes.next().await {
self.db.shorteventid_shortstatehash
.keys_prefix_raw(&(Interfix, key))
.ignore_err()
.ready_for_each(|key| self.db.shorteventid_shortstatehash.remove(key))
.await;
self.db.roomid_shortstatehash.remove(key);
};
}
/// Set the room to the given statehash and update caches.
pub async fn force_state(
&self,
+94 -9
View File
@@ -4,20 +4,21 @@
use std::{collections::HashMap, sync::Arc};
use conduwuit::{
Pdu, Result, SyncRwLock, implement,
result::LogErr,
utils::{ReadyExt, stream::TryIgnore},
warn,
implement, result::LogErr, utils::{stream::TryIgnore, ReadyExt}, warn,
Pdu,
Result,
SyncRwLock,
};
use database::{Deserialized, Ignore, Interfix, Map};
use futures::{Stream, StreamExt, future::join5, pin_mut};
use futures::{future::join5, pin_mut, Stream, StreamExt};
use ruma::{
OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
events::{AnyStrippedStateEvent, room::member::MembershipState},
serde::Raw,
events::{room::member::MembershipState, AnyStrippedStateEvent}, serde::Raw, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId,
ServerName,
UserId,
};
use tokio::join;
use crate::{Dep, account_data, appservice::RegistrationInfo, config, globals, rooms, users};
use crate::{account_data, appservice::RegistrationInfo, config, globals, rooms, users, Dep};
pub struct Service {
appservice_in_room_cache: AppServiceInRoomCache,
@@ -93,6 +94,90 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
#[implement(Service)]
#[tracing::instrument(skip(self))]
pub async fn purge(&self, room_id: &RoomId) {
let roomuser_key = (room_id, Interfix);
let userroom_key = (Interfix, room_id);
join!(
self.db
.roomid_invitedcount
.keys_prefix_raw(room_id)
.ignore_err()
.ready_for_each(|key| self.db.roomid_invitedcount.remove(key)),
self.db
.roomid_inviteviaservers
.keys_prefix_raw(room_id)
.ignore_err()
.ready_for_each(|key| self.db.roomid_inviteviaservers.remove(key)),
self.db
.roomid_joinedcount
.keys_prefix_raw(room_id)
.ignore_err()
.ready_for_each(|key| self.db.roomid_joinedcount.remove(key)),
self.db
.roomserverids
.keys_prefix_raw(room_id)
.ignore_err()
.ready_for_each(|key| self.db.roomserverids.remove(key)),
self.db
.roomuserid_invitecount
.keys_prefix_raw(&roomuser_key)
.ignore_err()
.ready_for_each(|key| self.db.roomuserid_invitecount.remove(key)),
self.db
.roomuserid_joined
.keys_prefix_raw(&roomuser_key)
.ignore_err()
.ready_for_each(|key| self.db.roomuserid_joined.remove(key)),
self.db
.roomuserid_leftcount
.keys_prefix_raw(&roomuser_key)
.ignore_err()
.ready_for_each(|key| self.db.roomuserid_leftcount.remove(key)),
self.db
.roomuserid_knockedcount
.keys_prefix_raw(&roomuser_key)
.ignore_err()
.ready_for_each(|key| self.db.roomuserid_knockedcount.remove(key)),
self.db
.roomuseroncejoinedids
.keys_prefix_raw(room_id)
.ignore_err()
.ready_for_each(|key| self.db.roomuseroncejoinedids.remove(key)),
self.db
.userroomid_invitestate
.keys_prefix_raw(&userroom_key)
.ignore_err()
.ready_for_each(|key| self.db.userroomid_invitestate.remove(key)),
self.db
.userroomid_joined
.keys_prefix_raw(&userroom_key)
.ignore_err()
.ready_for_each(|key| self.db.userroomid_joined.remove(key)),
self.db
.userroomid_leftstate
.keys_prefix_raw(&userroom_key)
.ignore_err()
.ready_for_each(|key| self.db.userroomid_leftstate.remove(key)),
self.db
.userroomid_knockedstate
.keys_prefix_raw(&userroom_key)
.ignore_err()
.ready_for_each(|key| self.db.userroomid_knockedstate.remove(key)),
self.db
.userroomid_invitesender
.keys_prefix_raw(&userroom_key)
.ignore_err()
.ready_for_each(|key| self.db.userroomid_invitesender.remove(key)),
self.db
.serverroomids
.keys_prefix_raw(&(Interfix, room_id))
.ignore_err()
.ready_for_each(|key| self.db.serverroomids.remove(key)),
);
}
#[implement(Service)]
#[tracing::instrument(level = "trace", skip_all)]
pub async fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> bool {
+3 -2
View File
@@ -54,8 +54,9 @@ pub(crate) async fn send_antispam_request<T>(
if !status.is_success() {
debug_error!("Antispam response bytes: {:?}", utils::string_from_bytes(&body));
return match status {
| http::StatusCode::FORBIDDEN =>
Err!(Request(Forbidden("Request was rejected by antispam service.",))),
| http::StatusCode::FORBIDDEN => {
Err!(Request(Forbidden("Request was rejected by antispam service.",)))
},
| _ => Err!(BadServerResponse(warn!(
"Antispam returned unsuccessful HTTP response {status}",
))),
+3 -2
View File
@@ -360,11 +360,12 @@ async fn check_stage(
));
}
},
| _ =>
| _ => {
return Err(StandardErrorBody::new(
ErrorKind::Unrecognized,
"Identifier type not recognized".to_owned(),
)),
));
},
};
let Ok(user_id) = UserId::parse_with_server_name(