Compare commits

...

8 Commits

Author SHA1 Message Date
timedout
422db2105c style: Run linter 2026-03-29 15:16:54 +01:00
timedout
8b206564aa feat: Reduce verbosity of "remote server couldn't process pdu" warning 2026-03-29 14:28:33 +01:00
timedout
127030ac38 fix: Only pop unsigned from PDUs 2026-03-29 14:21:30 +01:00
timedout
303ad4ab9c fix: Don't include origin in remote memberships 2026-03-29 14:18:13 +01:00
timedout
053860e496 fix: Correctly handle size evaluation of local events
Also improved the performance of `pdu_fits`
2026-03-29 13:42:49 +01:00
timedout
b0e3b9eeb5 style: Fix clippy lint 2026-03-27 22:54:52 +00:00
timedout
d636da06e2 chore: Add news fragment 2026-03-27 22:19:26 +00:00
timedout
3cec9d0077 fix: Correctly, explicitly format outgoing events 2026-03-27 22:11:27 +00:00
9 changed files with 49 additions and 113 deletions

1
changelog.d/1586.feature Normal file
View File

@@ -0,0 +1 @@
Drop unnecessary fields when converting an event into the federation format, saving bandwidth. Contributed by @nex.

View File

@@ -584,7 +584,7 @@ async fn join_room_by_id_helper_remote(
return state;
},
};
if !pdu_fits(&mut value.clone()) {
if !pdu_fits(&value) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from room join because \
it exceeds 65535 bytes or is otherwise too large."

View File

@@ -409,11 +409,6 @@ async fn knock_room_helper_local(
room_id,
&knock_event_stub,
)?;
knock_event_stub.insert(
"origin".to_owned(),
CanonicalJsonValue::String(services.globals.server_name().as_str().to_owned()),
);
knock_event_stub.insert(
"origin_server_ts".to_owned(),
CanonicalJsonValue::Integer(

View File

@@ -9,7 +9,7 @@
};
use futures::{FutureExt, StreamExt, pin_mut};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, RoomId, RoomVersionId, UserId,
CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, RoomId, UserId,
api::{
client::membership::leave_room,
federation::{self},
@@ -337,6 +337,7 @@ pub async fn remote_leave_room<S: ::std::hash::BuildHasher>(
"Invalid make_leave event json received from {remote_server} for {room_id}: {e:?}"
)))
})?;
leave_event_stub.remove("event_id");
validate_remote_member_event_stub(
&MembershipState::Leave,
@@ -345,11 +346,6 @@ pub async fn remote_leave_room<S: ::std::hash::BuildHasher>(
&leave_event_stub,
)?;
// TODO: Is origin needed?
leave_event_stub.insert(
"origin".to_owned(),
CanonicalJsonValue::String(services.globals.server_name().as_str().to_owned()),
);
leave_event_stub.insert(
"origin_server_ts".to_owned(),
CanonicalJsonValue::Integer(
@@ -365,14 +361,6 @@ pub async fn remote_leave_room<S: ::std::hash::BuildHasher>(
}
}
// room v3 and above removed the "event_id" field from remote PDU format
match room_version_id {
| RoomVersionId::V1 | RoomVersionId::V2 => {},
| _ => {
leave_event_stub.remove("event_id");
},
}
// In order to create a compatible ref hash (EventID) the `hashes` field needs
// to be present
services

View File

@@ -127,12 +127,12 @@ pub async fn handle_incoming_pdu<'a>(
if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
return Ok(Some(pdu_id));
}
if !pdu_fits(&mut value.clone()) {
if !pdu_fits(&value) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from {origin} because it \
exceeds 65535 bytes or is otherwise too large."
);
return Err!(Request(TooLarge("PDU is too large")));
return Err!(Request(TooLarge("PDU {event_id} is too large")));
}
trace!("processing incoming PDU from {origin} for room {room_id} with event id {event_id}");

View File

@@ -1,8 +1,7 @@
use std::collections::{BTreeMap, HashMap, hash_map};
use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
trace, warn,
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res, trace,
};
use futures::future::ready;
use ruma::{
@@ -11,7 +10,6 @@
};
use super::{check_room_id, get_room_version_id, to_room_version};
use crate::rooms::timeline::pdu_fits;
#[implement(super::Service)]
#[allow(clippy::too_many_arguments)]
@@ -27,18 +25,9 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
where
Pdu: Event + Send + Sync,
{
if !pdu_fits(&mut value.clone()) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from {origin} because it \
exceeds 65535 bytes or is otherwise too large."
);
return Err!(Request(TooLarge("PDU is too large")));
}
// 1. Remove unsigned field
value.remove("unsigned");
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
// 2. Check signatures, otherwise drop
// 3. check content hash, redact if doesn't match
let room_version_id = get_room_version_id(create_event)?;

View File

@@ -22,33 +22,18 @@
use super::RoomMutexGuard;
pub fn pdu_fits(owned_obj: &mut CanonicalJsonObject) -> bool {
#[must_use]
pub fn pdu_fits(owned_obj: &CanonicalJsonObject) -> bool {
// room IDs, event IDs, senders, types, and state keys must all be <= 255 bytes
if let Some(CanonicalJsonValue::String(room_id)) = owned_obj.get("room_id") {
if room_id.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(event_id)) = owned_obj.get("event_id") {
if event_id.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(sender)) = owned_obj.get("sender") {
if sender.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(kind)) = owned_obj.get("type") {
if kind.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(state_key)) = owned_obj.get("state_key") {
if state_key.len() > 255 {
return false;
const STANDARD_KEYS: [&str; 5] = ["room_id", "event_id", "sender", "type", "state_key"];
for key in STANDARD_KEYS {
if let Some(CanonicalJsonValue::String(v)) = owned_obj.get(key) {
if v.len() > 255 {
return false;
}
}
}
// Now check the full PDU size
match serde_json::to_string(owned_obj) {
| Ok(s) => s.len() <= 65535,
@@ -259,14 +244,9 @@ fn from_evt(
let mut pdu_json = utils::to_canonical_object(&pdu).map_err(|e| {
err!(Request(BadJson(warn!("Failed to convert PDU to canonical JSON: {e}"))))
})?;
// room v3 and above removed the "event_id" field from remote PDU format
match room_version_id {
| RoomVersionId::V1 | RoomVersionId::V2 => {},
| _ => {
pdu_json.remove("event_id");
},
}
pdu_json.remove("event_id");
// We need to pop unsigned temporarily for the size check
let unsigned_tmp = pdu_json.remove_entry("unsigned");
trace!("hashing and signing event {}", pdu.event_id);
if let Err(e) = self
@@ -276,24 +256,28 @@ fn from_evt(
{
return match e {
| Error::Signatures(ruma::signatures::Error::PduSize) => {
Err!(Request(TooLarge("Message/PDU is too long (exceeds 65535 bytes)")))
// Ruma does do a PDU size check when generating the content hash, but it
// doesn't do it very correctly.
Err!(Request(TooLarge("PDU is too long large (exceeds 65535 bytes)")))
},
| _ => Err!(Request(Unknown(warn!("Signing event failed: {e}")))),
};
}
// Verify that the *full* PDU isn't over 64KiB.
if !pdu_fits(&pdu_json) {
return Err!(Request(TooLarge("PDU is too long large (exceeds 65535 bytes)")));
}
// Re-insert unsigned data
if let Some((key, value)) = unsigned_tmp {
pdu_json.insert(key, value);
}
// Generate event id
pdu.event_id = gen_event_id(&pdu_json, &room_version_id)?;
pdu_json.insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into()));
// Verify that the *full* PDU isn't over 64KiB.
// Ruma only validates that it's under 64KiB before signing and hashing.
// Has to be cloned to prevent mutating pdu_json itself :(
if !pdu_fits(&mut pdu_json.clone()) {
// feckin huge PDU mate
return Err!(Request(TooLarge("Message/PDU is too long (exceeds 65535 bytes)")));
}
// Check with the policy server
if room_id.is_some() {
if let Some(room_id) = pdu.room_id() {
trace!(
"Checking event in room {} with policy server",
pdu.room_id.as_ref().map_or("None", |id| id.as_str())
@@ -301,7 +285,7 @@ fn from_evt(
match self
.services
.event_handler
.ask_policy_server(&pdu, &mut pdu_json, pdu.room_id().expect("has room ID"), false)
.ask_policy_server(&pdu, &mut pdu_json, room_id, false)
.await
{
| Ok(true) => {},

View File

@@ -42,7 +42,6 @@ pub struct Service {
struct Services {
client: Dep<client::Service>,
globals: Dep<globals::Service>,
state: Dep<rooms::state::Service>,
state_cache: Dep<rooms::state_cache::Service>,
user: Dep<rooms::user::Service>,
users: Dep<users::Service>,
@@ -86,7 +85,6 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
services: Services {
client: args.depend::<client::Service>("client"),
globals: args.depend::<globals::Service>("globals"),
state: args.depend::<rooms::state::Service>("rooms::state"),
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
user: args.depend::<rooms::user::Service>("rooms::user"),
users: args.depend::<users::Service>("users"),

View File

@@ -9,6 +9,7 @@
};
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
use conduwuit::info;
use conduwuit_core::{
Error, Event, Result, at, debug, err, error,
result::LogErr,
@@ -28,7 +29,7 @@
};
use ruma::{
CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, OwnedUserId,
RoomId, RoomVersionId, ServerName, UInt,
RoomId, ServerName, UInt,
api::{
appservice::event::push_events::v1::EphemeralData,
federation::transactions::{
@@ -866,9 +867,12 @@ async fn send_events_dest_federation(
for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) {
if let Err(e) = result {
warn!(
%txn_id, %server,
"error sending PDU {event_id} to remote server: {e:?}"
info!(
%txn_id,
%server,
%event_id,
remote_error=?e,
"remote server encountered an error while processing an event"
);
}
}
@@ -879,41 +883,18 @@ async fn send_events_dest_federation(
}
}
/// This does not return a full `Pdu` it is only to satisfy ruma's types.
/// Filters through a PDU JSON blob, retaining only keys that are expected
/// over federation.
pub async fn convert_to_outgoing_federation_event(
&self,
mut pdu_json: CanonicalJsonObject,
) -> Box<RawJsonValue> {
if let Some(unsigned) = pdu_json
.get_mut("unsigned")
.and_then(|val| val.as_object_mut())
{
unsigned.remove("transaction_id");
}
// room v3 and above removed the "event_id" field from remote PDU format
if let Some(room_id) = pdu_json
.get("room_id")
.and_then(|val| RoomId::parse(val.as_str()?).ok())
{
match self.services.state.get_room_version(room_id).await {
| Ok(room_version_id) => match room_version_id {
| RoomVersionId::V1 | RoomVersionId::V2 => {},
| _ => _ = pdu_json.remove("event_id"),
},
| Err(_) => _ = pdu_json.remove("event_id"),
}
} else {
pdu_json.remove("event_id");
}
// TODO: another option would be to convert it to a canonical string to validate
// size and return a Result<Raw<...>>
// serde_json::from_str::<Raw<_>>(
// ruma::serde::to_canonical_json_string(pdu_json).expect("CanonicalJson is
// valid serde_json::Value"), )
// .expect("Raw::from_value always works")
pdu_json.remove("unsigned");
// NOTE: Historically there have been attempts to further reduce the amount of
// data sent over the wire to remote servers. However, so far, the only key
// that is safe to drop entirely is `unsigned` - the rest of the keys are
// actually included as part of the content hash, and will cause issues if
// removed, even if they're irrelevant.
to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value")
}
}