Compare commits

..

3 Commits

Author SHA1 Message Date
Jade Ellis
aa29b81ef6 fix: Don't store events that have already been redacted
This prevents clobbering the previously stored original (pre-redaction) content of the event.
2025-06-14 19:40:43 +01:00
Jade Ellis
46b1eeb2c8 feat: Allow retrieving redacted message content (msc2815)
Still to do:
- Handling the difference between content that we have deleted and
content we never received
- Deleting the original content on command or expiry

Another open question is whether we need to store the full original event,
or whether storing just the 'content' field would suffice.
2025-06-14 19:40:43 +01:00
Jade Ellis
88ecf61d49 feat: Store the original content of redacted PDUs 2025-06-14 19:40:42 +01:00
17 changed files with 132 additions and 326 deletions

1
Cargo.lock generated
View File

@@ -987,7 +987,6 @@ dependencies = [
"base64 0.22.1",
"blurhash",
"bytes",
"conduwuit_build_metadata",
"conduwuit_core",
"conduwuit_database",
"const-str",

View File

@@ -119,15 +119,6 @@
#
#allow_announcements_check = true
# If enabled, continuwuity will send anonymous analytics data periodically
# to help improve development. This includes basic server metadata like
# version, commit hash, and federation status. All requests are signed
# with the server's federation signing key. Data is sent on startup (with
# up to 5 minutes jitter) and every 12 hours thereafter (with up to 30
# minutes jitter) to distribute load.
#
#allow_analytics = true
# Set this to any float value to multiply continuwuity's in-memory LRU
# caches with such as "auth_chain_cache_capacity".
#

View File

@@ -3,7 +3,7 @@
"$id": "https://continwuity.org/schema/announcements.schema.json",
"type": "object",
"properties": {
"announcements": {
"updates": {
"type": "array",
"items": {
"type": "object",
@@ -16,10 +16,6 @@
},
"date": {
"type": "string"
},
"mention_room": {
"type": "boolean",
"description": "Whether to mention the room (@room) when posting this announcement"
}
},
"required": [
@@ -30,6 +26,6 @@
}
},
"required": [
"announcements"
"updates"
]
}

View File

@@ -145,16 +145,6 @@ pub(super) async fn restart(&self, force: bool) -> Result {
self.write_str("Restarting server...").await
}
#[admin_command]
pub(super) async fn upload_analytics(&self) -> Result {
match self.services.analytics.force_upload().await {
| Ok(()) => self.write_str("Analytics uploaded successfully.").await,
| Err(e) =>
self.write_str(&format!("Failed to upload analytics: {e}"))
.await,
}
}
#[admin_command]
pub(super) async fn shutdown(&self) -> Result {
warn!("shutdown command");

View File

@@ -64,7 +64,4 @@ pub(super) enum ServerCommand {
/// - Shutdown the server
Shutdown,
/// - Upload analytics
UploadAnalytics,
}

View File

@@ -1,7 +1,7 @@
use axum::extract::State;
use conduwuit::{Err, Event, Result, err};
use conduwuit::{Err, Event, PduEvent, Result, err};
use futures::{FutureExt, TryFutureExt, future::try_join};
use ruma::api::client::room::get_room_event;
use ruma::api::client::{error::ErrorKind, room::get_room_event};
use crate::{Ruma, client::is_ignored_pdu};
@@ -14,6 +14,7 @@ pub(crate) async fn get_room_event_route(
) -> Result<get_room_event::v3::Response> {
let event_id = &body.event_id;
let room_id = &body.room_id;
let sender_user = body.sender_user();
let event = services
.rooms
@@ -33,6 +34,52 @@ pub(crate) async fn get_room_event_route(
return Err!(Request(Forbidden("You don't have permission to view this event.")));
}
let include_unredacted_content = body
.include_unredacted_content // User's file has this field name
.unwrap_or(false);
if include_unredacted_content && event.is_redacted() {
let is_server_admin = services
.users
.is_admin(sender_user)
.map(|is_admin| Ok(is_admin));
let can_redact_privilege = services
.rooms
.state_accessor
.user_can_redact(event_id, sender_user, room_id, false) // federation=false for local check
;
let (is_server_admin, can_redact_privilege) =
try_join(is_server_admin, can_redact_privilege).await?;
if !is_server_admin && !can_redact_privilege {
return Err!(Request(Forbidden(
"You don't have permission to view redacted content.",
)));
}
let pdu_id = match services.rooms.timeline.get_pdu_id(event_id).await {
| Ok(id) => id,
| Err(e) => {
return Err(e);
},
};
let original_content = services
.rooms
.timeline
.get_original_pdu_content(&pdu_id)
.await?;
if let Some(original_content) = original_content {
// If the original content is available, we can return it.
// event.content = to_raw_value(&original_content)?;
event = PduEvent::from_id_val(event_id, original_content)?;
} else {
return Err(conduwuit::Error::BadRequest(
ErrorKind::UnredactedContentDeleted { content_keep_ms: None },
"The original unredacted content is not in the database.",
));
}
}
debug_assert!(
event.event_id() == event_id && event.room_id() == room_id,
"Fetched PDU must match requested"

View File

@@ -40,6 +40,7 @@ pub(crate) async fn get_supported_versions_route(
"v1.11".to_owned(),
],
unstable_features: BTreeMap::from_iter([
("fi.mau.msc2815".to_owned(), true),
("org.matrix.e2e_cross_signing".to_owned(), true),
("org.matrix.msc2285.stable".to_owned(), true), /* private read receipts (https://github.com/matrix-org/matrix-spec-proposals/pull/2285) */
("uk.half-shot.msc2666.query_mutual_rooms".to_owned(), true), /* query mutual rooms (https://github.com/matrix-org/matrix-spec-proposals/pull/2666) */

View File

@@ -169,18 +169,6 @@ pub struct Config {
#[serde(alias = "allow_check_for_updates", default = "true_fn")]
pub allow_announcements_check: bool,
/// If enabled, continuwuity will send anonymous analytics data periodically
/// to help improve development. This includes basic server metadata like
/// version, build information and federation status. All requests are
/// signed with the server's federation signing key.
///
/// This is also used to warn about potential problems with federation, if
/// federation is enabled.
///
/// default: true
#[serde(default = "true_fn")]
pub allow_analytics: bool,
/// Set this to any float value to multiply continuwuity's in-memory LRU
/// caches with such as "auth_chain_cache_capacity".
///

View File

@@ -121,6 +121,15 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
index_size: 512,
..descriptor::SEQUENTIAL
},
Descriptor {
name: "pduid_originalcontent",
cache_disp: CacheDisp::SharedWith("pduid_pdu"),
key_size_hint: Some(16),
val_size_hint: Some(1520),
block_size: 2048,
index_size: 512,
..descriptor::RANDOM
},
Descriptor {
name: "publicroomids",
..descriptor::RANDOM_SMALL

View File

@@ -78,7 +78,6 @@ zstd_compression = [
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
conduwuit-build-metadata.workspace = true
conduwuit-core.workspace = true
conduwuit-database.workspace = true
const-str.workspace = true

View File

@@ -1,245 +0,0 @@
//! # Analytics service
//!
//! This service is responsible for collecting and uploading anonymous server
//! metadata to help improve continuwuity development.
//!
//! All requests are signed with the server's federation signing key for
//! authentication. This service respects the `allow_analytics` configuration
//! option and is enabled by default.
//!
//! Analytics are sent on startup (with up to 5 minutes jitter) and every 12
//! hours thereafter (with up to 30 minutes jitter) to distribute load.
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use conduwuit::{
Result, Server, debug, err, info,
version::{self, user_agent},
warn,
};
use database::{Deserialized, Map};
use rand::Rng;
use ruma::ServerName;
use serde::{Deserialize, Serialize};
use tokio::{
sync::Notify,
time::{MissedTickBehavior, interval},
};
use crate::{Dep, client, config, federation, globals, server_keys, users};
extern crate conduwuit_build_metadata as build_metadata;
/// Periodic anonymous-analytics uploader service.
///
/// Holds the upload schedule (base interval plus randomized jitter), a
/// shutdown notifier, the "global" database map used to persist the
/// last-upload timestamp, and handles to dependent services.
pub struct Service {
    // Base time between periodic uploads.
    interval: Duration,
    // Random jitter added to the periodic schedule to distribute load.
    jitter: Duration,
    // Random delay before the first upload after startup.
    startup_jitter: Duration,
    // Signalled by interrupt() so the worker loop can observe shutdown.
    interrupt: Notify,
    // The "global" database map; stores the last-upload timestamp.
    db: Arc<Map>,
    services: Services,
}
/// Handles to the other services this uploader depends on, resolved once
/// at build time via `args.depend`.
struct Services {
    // HTTP client used to execute the analytics request.
    client: Dep<client::Service>,
    // Provides the local server name.
    globals: Dep<globals::Service>,
    server_keys: Dep<server_keys::Service>,
    // Used to sign the outgoing (non-federation) analytics request.
    federation: Dep<federation::Service>,
    // Used for the anonymous user count in the payload.
    users: Dep<users::Service>,
    server: Arc<Server>,
    config: Dep<config::Service>,
}
/// The anonymous metadata sent to the analytics endpoint.
///
/// Contains only server-level facts: version/build information, a user
/// count, and a few federation-related configuration flags. No per-user or
/// per-room data is included (see `collect_metadata` for the exact sources).
#[derive(Debug, Serialize)]
struct AnalyticsPayload {
    server_name: String,
    version: &'static str,
    // Git commit hash if known at build time; None for untracked builds.
    commit_hash: Option<&'static str>,
    user_count: usize,
    federation_enabled: bool,
    room_creation_allowed: bool,
    public_room_directory_over_federation: bool,
    // Build metadata captured by the `built` crate at compile time.
    build_profile: &'static str,
    opt_level: &'static str,
    rustc_version: &'static str,
    features: Vec<&'static str>,
    host: &'static str,
    target: &'static str,
    // the following can all be derived from the target
    target_arch: &'static str,
    target_os: &'static str,
    target_env: &'static str,
    target_family: &'static str,
}
/// Structured response optionally returned by the analytics endpoint.
///
/// If the body does not parse into this shape, the uploader falls back to
/// interpreting the bare HTTP status code.
#[derive(Debug, Deserialize)]
struct AnalyticsResponse {
    // Whether the endpoint accepted the upload.
    success: bool,
    // Optional human-readable message accompanying the result.
    message: Option<String>,
}
/// Endpoint receiving the signed analytics payload.
const ANALYTICS_URL: &str = "https://analytics.continuwuity.org/api/v1/metrics";
/// Destination server name the request is signed for.
const ANALYTICS_SERVERNAME: &str = "analytics.continuwuity.org";
const ANALYTICS_INTERVAL: u64 = 43200; // 12 hours in seconds
const ANALYTICS_JITTER: u64 = 1800; // 30 minutes in seconds
const ANALYTICS_STARTUP_JITTER: u64 = 300; // 5 minutes in seconds
/// Key in the "global" database map storing the last upload time (seconds).
const LAST_ANALYTICS_TIMESTAMP: &[u8; 21] = b"last_analytics_upload";
#[async_trait]
impl crate::Service for Service {
    /// Constructs the service, pre-computing the randomized jitter values
    /// and resolving dependencies on the other services.
    fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
        let mut rng = rand::thread_rng();
        // Jitter is drawn once at construction; it staggers upload times
        // across different server instances to distribute load.
        let jitter_seconds = rng.gen_range(0..=ANALYTICS_JITTER);
        let startup_jitter_seconds = rng.gen_range(0..=ANALYTICS_STARTUP_JITTER);
        Ok(Arc::new(Self {
            interval: Duration::from_secs(ANALYTICS_INTERVAL),
            jitter: Duration::from_secs(jitter_seconds),
            startup_jitter: Duration::from_secs(startup_jitter_seconds),
            interrupt: Notify::new(),
            db: args.db["global"].clone(),
            services: Services {
                globals: args.depend::<globals::Service>("globals"),
                client: args.depend::<client::Service>("client"),
                config: args.depend::<config::Service>("config"),
                server_keys: args.depend::<server_keys::Service>("server_keys"),
                users: args.depend::<users::Service>("users"),
                federation: args.depend::<federation::Service>("federation"),
                server: args.server.clone(),
            },
        }))
    }

    /// Long-running worker: performs one upload shortly after startup, then
    /// uploads on a fixed interval until interrupted.
    #[tracing::instrument(skip_all, name = "analytics", level = "debug")]
    async fn worker(self: Arc<Self>) -> Result<()> {
        // Respect the `allow_analytics` config switch; exit early if disabled.
        if !self.services.server.config.allow_analytics {
            debug!("Analytics collection is disabled");
            return Ok(());
        }
        // Send initial analytics on startup (with shorter jitter)
        tokio::time::sleep(self.startup_jitter).await;
        if let Err(e) = self.upload_analytics().await {
            warn!(%e, "Failed to upload initial analytics");
        }
        let mut i = interval(self.interval);
        // Delay (rather than burst) when ticks are missed, and push the
        // first periodic tick out by the configured jitter.
        i.set_missed_tick_behavior(MissedTickBehavior::Delay);
        i.reset_after(self.interval + self.jitter);
        loop {
            tokio::select! {
                // Shutdown requested via interrupt(); leave the loop.
                () = self.interrupt.notified() => break,
                _ = i.tick() => {
                    if let Err(e) = self.upload_analytics().await {
                        warn!(%e, "Failed to upload analytics");
                    }
                }
            }
        }
        Ok(())
    }

    /// Wakes the worker loop so it can observe shutdown and exit.
    fn interrupt(&self) { self.interrupt.notify_waiters(); }

    fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
    /// Collects the current metadata payload and POSTs it, signed with the
    /// server's federation key, to the analytics endpoint.
    ///
    /// Returns `Err` on serialization, signing, or transport failures; a
    /// rejection reported by the endpoint itself is only logged.
    #[tracing::instrument(skip_all)]
    async fn upload_analytics(&self) -> Result<()> {
        let payload = self.collect_metadata().await;
        let json_payload = serde_json::to_vec(&payload)?;

        // Build the HTTP request carrying the JSON payload.
        let request = http::Request::builder()
            .method("POST")
            .uri(ANALYTICS_URL)
            .header("Content-Type", "application/json")
            .header("User-Agent", user_agent())
            .body(json_payload)?;

        // Sign the request using the federation signing key so the endpoint
        // can authenticate which homeserver sent it.
        let reqwest_request = self.services.federation.sign_non_federation_request(
            ServerName::parse(ANALYTICS_SERVERNAME)
                .expect("ANALYTICS_SERVERNAME is a valid server name"),
            request,
        )?;

        let response = self
            .services
            .client
            .default
            .execute(reqwest_request)
            .await?;

        let status = response.status();
        if let Ok(analytics_response) =
            serde_json::from_str::<AnalyticsResponse>(&response.text().await?)
        {
            if analytics_response.success {
                debug!("Analytics uploaded successfully");
                self.update_last_upload_timestamp().await;
            } else {
                // Fix: only warn when the endpoint actually reports failure.
                // Previously this warning fired unconditionally, even after a
                // successful upload (with an empty message).
                let msg = analytics_response.message.unwrap_or_default();
                warn!("Analytics upload warning: {}", msg);
            }
        } else if status.is_success() {
            info!("Analytics uploaded successfully (no structured response)");
            self.update_last_upload_timestamp().await;
        } else {
            warn!("Analytics upload failed (no structured response) with status: {}", status);
        }
        Ok(())
    }

    /// Gathers the anonymous server metadata included in each upload.
    async fn collect_metadata(&self) -> AnalyticsPayload {
        let config = &self.services.config;
        AnalyticsPayload {
            server_name: self.services.globals.server_name().to_string(),
            version: version::version(),
            commit_hash: build_metadata::GIT_COMMIT_HASH,
            user_count: self.services.users.count().await,
            federation_enabled: config.allow_federation,
            room_creation_allowed: config.allow_room_creation,
            public_room_directory_over_federation: config
                .allow_public_room_directory_over_federation,
            build_profile: build_metadata::built::PROFILE,
            opt_level: build_metadata::built::OPT_LEVEL,
            rustc_version: build_metadata::built::RUSTC_VERSION,
            features: build_metadata::built::FEATURES.to_vec(),
            host: build_metadata::built::HOST,
            target: build_metadata::built::TARGET,
            target_arch: build_metadata::built::CFG_TARGET_ARCH,
            target_os: build_metadata::built::CFG_OS,
            target_env: build_metadata::built::CFG_ENV,
            target_family: build_metadata::built::CFG_FAMILY,
        }
    }

    /// Persists the current UNIX time (seconds) as the last upload time.
    async fn update_last_upload_timestamp(&self) {
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        self.db.raw_put(LAST_ANALYTICS_TIMESTAMP, timestamp);
    }

    /// Returns the timestamp of the last successful upload, or 0 if none
    /// has been recorded.
    pub async fn last_upload_timestamp(&self) -> u64 {
        self.db
            .get(LAST_ANALYTICS_TIMESTAMP)
            .await
            .deserialized()
            .unwrap_or(0_u64)
    }

    /// Triggers an immediate upload (used by the admin command), honouring
    /// the `allow_analytics` configuration switch.
    pub async fn force_upload(&self) -> Result<()> {
        if !self.services.config.allow_analytics {
            return Err(err!(Config("allow_analytics", "Analytics collection is disabled")));
        }
        self.upload_analytics().await
    }
}

View File

@@ -20,7 +20,7 @@
use async_trait::async_trait;
use conduwuit::{Result, Server, debug, info, warn};
use database::{Deserialized, Map};
use ruma::events::{Mentions, room::message::RoomMessageEventContent};
use ruma::events::room::message::RoomMessageEventContent;
use serde::Deserialize;
use tokio::{
sync::Notify,
@@ -53,8 +53,6 @@ struct CheckForAnnouncementsResponseEntry {
id: u64,
date: Option<String>,
message: String,
#[serde(default, skip_serializing_if = "bool::not")]
mention_room: bool,
}
const CHECK_FOR_ANNOUNCEMENTS_URL: &str =
@@ -141,20 +139,19 @@ async fn handle(&self, announcement: &CheckForAnnouncementsResponseEntry) {
} else {
info!("[announcements] {:#}", announcement.message);
}
let mut message = RoomMessageEventContent::text_markdown(format!(
"### New announcement{}\n\n{}",
announcement
.date
.as_ref()
.map_or_else(String::new, |date| format!(" - `{date}`")),
announcement.message
));
if announcement.mention_room {
message = message.add_mentions(Mentions::with_room_mention());
}
self.services.admin.send_message(message).await.ok();
self.services
.admin
.send_message(RoomMessageEventContent::text_markdown(format!(
"### New announcement{}\n\n{}",
announcement
.date
.as_ref()
.map_or_else(String::new, |date| format!(" - `{date}`")),
announcement.message
)))
.await
.ok();
}
#[inline]

View File

@@ -107,20 +107,6 @@ fn prepare(&self, dest: &ServerName, mut request: http::Request<Vec<u8>>) -> Res
Ok(request)
}
#[implement(super::Service)]
pub fn sign_non_federation_request(
&self,
dest: &ServerName,
mut request: http::Request<Vec<u8>>,
) -> Result<Request> {
self.sign_request(&mut request, dest);
let request = Request::try_from(request)?;
self.services.server.check_running()?;
Ok(request)
}
#[implement(super::Service)]
fn validate_url(&self, url: &Url) -> Result<()> {
if let Some(url_host) = url.host_str() {

View File

@@ -9,7 +9,6 @@
pub mod account_data;
pub mod admin;
pub mod analytics;
pub mod announcements;
pub mod appservice;
pub mod client;

View File

@@ -19,6 +19,8 @@ pub(super) struct Data {
pduid_pdu: Arc<Map>,
userroomid_highlightcount: Arc<Map>,
userroomid_notificationcount: Arc<Map>,
/// Stores the original content of redacted PDUs.
pduid_originalcontent: Arc<Map>,
pub(super) db: Arc<Database>,
services: Services,
}
@@ -38,6 +40,7 @@ pub(super) fn new(args: &crate::Args<'_>) -> Self {
pduid_pdu: db["pduid_pdu"].clone(),
userroomid_highlightcount: db["userroomid_highlightcount"].clone(),
userroomid_notificationcount: db["userroomid_notificationcount"].clone(),
pduid_originalcontent: db["pduid_originalcontent"].clone(), // Initialize new table
db: args.db.clone(),
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
@@ -177,6 +180,24 @@ pub(super) async fn get_pdu_json_from_id(
self.pduid_pdu.get(pdu_id).await.deserialized()
}
/// Stores the original content of a PDU that is about to be redacted.
///
/// Writes the full pre-redaction PDU JSON into `pduid_originalcontent`,
/// keyed by the raw PDU id, so the unredacted content can be retrieved
/// later (MSC2815).
pub(super) async fn store_redacted_pdu_content(
    &self,
    pdu_id: &RawPduId,
    pdu_json: &CanonicalJsonObject,
) -> Result<()> {
    // NOTE(review): raw_put is not awaited here — presumably the write is
    // queued/synchronous; the `async` keeps the signature uniform with the
    // other Data accessors. Confirm against the Map API.
    self.pduid_originalcontent.raw_put(pdu_id, Json(pdu_json));
    Ok(())
}
/// Returns the original content of a redacted PDU.
///
/// Looks up `pduid_originalcontent` by raw PDU id and deserializes the
/// stored canonical JSON written by `store_redacted_pdu_content`.
pub(super) async fn get_original_pdu_content(
    &self,
    pdu_id: &RawPduId,
) -> Result<Option<CanonicalJsonObject>> {
    self.pduid_originalcontent.get(pdu_id).await.deserialized()
}
pub(super) async fn append_pdu(
&self,
pdu_id: &RawPduId,

View File

@@ -260,6 +260,25 @@ pub async fn replace_pdu(
self.db.replace_pdu(pdu_id, pdu_json, pdu).await
}
/// Stores the content of a to-be redacted pdu.
///
/// Thin delegation to the data layer; see `Data::store_redacted_pdu_content`
/// for the storage details.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn store_redacted_pdu_content(
    &self,
    pdu_id: &RawPduId,
    pdu_json: &CanonicalJsonObject,
) -> Result<()> {
    self.db.store_redacted_pdu_content(pdu_id, pdu_json).await
}
/// Returns the original content of a redacted PDU.
///
/// Thin delegation to the data layer; yields the pre-redaction canonical
/// JSON previously saved by `store_redacted_pdu_content`, if any.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn get_original_pdu_content(
    &self,
    pdu_id: &RawPduId,
) -> Result<Option<CanonicalJsonObject>> {
    self.db.get_original_pdu_content(pdu_id).await
}
/// Creates a new persisted data unit and adds it to a room.
///
/// By this point the incoming event should be fully authenticated, no auth
@@ -472,7 +491,7 @@ pub async fn append_pdu<'a, Leaves>(
.user_can_redact(redact_id, &pdu.sender, &pdu.room_id, false)
.await?
{
self.redact_pdu(redact_id, pdu, shortroomid).await?;
self.redact_pdu(redact_id, pdu, shortroomid, true).await?;
}
}
},
@@ -485,7 +504,7 @@ pub async fn append_pdu<'a, Leaves>(
.user_can_redact(redact_id, &pdu.sender, &pdu.room_id, false)
.await?
{
self.redact_pdu(redact_id, pdu, shortroomid).await?;
self.redact_pdu(redact_id, pdu, shortroomid, true).await?;
}
}
},
@@ -1033,6 +1052,7 @@ pub async fn redact_pdu(
event_id: &EventId,
reason: &PduEvent,
shortroomid: ShortRoomId,
keep_original_content: bool,
) -> Result {
// TODO: Don't reserialize, keep original json
let Ok(pdu_id) = self.get_pdu_id(event_id).await else {
@@ -1054,6 +1074,19 @@ pub async fn redact_pdu(
let room_version_id = self.services.state.get_room_version(&pdu.room_id).await?;
if keep_original_content && !pdu.is_redacted() {
let original_pdu_json = utils::to_canonical_object(&pdu).map_err(|e| {
err!(Database(error!(
?event_id,
?e,
"Failed to convert PDU to canonical JSON for original content storage"
)))
})?;
self.db
.store_redacted_pdu_content(&pdu_id, &original_pdu_json)
.await?;
}
pdu.redact(&room_version_id, reason)?;
let obj = utils::to_canonical_object(&pdu).map_err(|e| {

View File

@@ -10,8 +10,8 @@
use tokio::sync::Mutex;
use crate::{
account_data, admin, analytics, announcements, appservice, client, config, emergency,
federation, globals, key_backups,
account_data, admin, announcements, appservice, client, config, emergency, federation,
globals, key_backups,
manager::Manager,
media, moderation, presence, pusher, resolver, rooms, sending, server_keys, service,
service::{Args, Map, Service},
@@ -21,7 +21,6 @@
pub struct Services {
pub account_data: Arc<account_data::Service>,
pub admin: Arc<admin::Service>,
pub analytics: Arc<analytics::Service>,
pub appservice: Arc<appservice::Service>,
pub config: Arc<config::Service>,
pub client: Arc<client::Service>,
@@ -69,7 +68,6 @@ macro_rules! build {
Ok(Arc::new(Self {
account_data: build!(account_data::Service),
admin: build!(admin::Service),
analytics: build!(analytics::Service),
appservice: build!(appservice::Service),
resolver: build!(resolver::Service),
client: build!(client::Service),