From fe7cfd96e7b40417fd40ff8a7987b3d1dd157f1d Mon Sep 17 00:00:00 2001 From: timedout Date: Mon, 27 Apr 2026 22:39:41 +0100 Subject: [PATCH] feat: Assert that no events were dropped during sorting --- src/api/server/send.rs | 73 +++++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 30 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index fc92fc31c..ee5509d7a 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -1,12 +1,15 @@ -use crate::Ruma; +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + net::IpAddr, + time::{Duration, Instant}, +}; + use axum::extract::State; use axum_client_ip::ClientIp; -use conduwuit::utils::TryFutureExtExt; use conduwuit::{ debug, debug_warn, err, error, result::LogErr, state_res::lexicographical_topological_sort, trace, utils::{ - millis_since_unix_epoch, stream::{automatic_width, BroadbandExt, TryBroadbandExt}, IterStream, - ReadyExt, + millis_since_unix_epoch, stream::{automatic_width, BroadbandExt, TryBroadbandExt}, IterStream, ReadyExt, }, warn, Err, @@ -20,30 +23,33 @@ use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use http::StatusCode; use itertools::Itertools; -use ruma::{api::{ - client::error::{ErrorKind, ErrorKind::LimitExceeded}, - federation::transactions::{ - edu::{ - DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent, - PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, SigningKeyUpdateContent, - TypingContent, +use ruma::{ + api::{ + client::error::{ErrorKind, ErrorKind::LimitExceeded}, + federation::transactions::{ + edu::{ + DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent, + PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, SigningKeyUpdateContent, + TypingContent, + }, + send_transaction_message, }, - send_transaction_message, - }, -}, events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, int, serde::Raw, to_device::DeviceIdOrAllDevices, uint, CanonicalJsonObject, Int, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UInt, UserId}; + }, events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, int, serde::Raw, to_device::DeviceIdOrAllDevices, + CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, + OwnedUserId, + RoomId, + ServerName, + UInt, + UserId, +}; use service::transactions::{ FederationTxnState, TransactionError, TxnKey, WrappedTransactionResponse, }; -use std::cmp::Reverse; -use std::hash::Hasher; -use std::{ - collections::{BTreeMap, HashMap, HashSet}, - net::IpAddr, - time::{Duration, Instant}, -}; use tokio::sync::watch::{Receiver, Sender}; use tracing::instrument; +use crate::Ruma; + type ResolvedMap = BTreeMap; type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject); @@ -273,16 +279,18 @@ async fn build_local_dag( pdu_map: &HashMap, ) -> Result> { debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs"); - let mut dag: HashMap> = HashMap::with_capacity(pdu_map.len()); + let mut dag: HashMap> = + HashMap::with_capacity(pdu_map.len()); let mut id_origin_ts: HashMap = HashMap::with_capacity(pdu_map.len()); for (event_id, value) in pdu_map { // We already checked that these properties are correct in parse_incoming_pdu, // so it's safe to unwrap here. - // We also filter to remove any prev_events that are not in this pdu_map, as we need to have - // at least one event with zero out degrees for the lexico-topo sort below. If there are - // multiple events with omitted prevs, they will be ordered by timestamp, then event ID. At - // that point though, it's unlikely to matter. + // We also filter to remove any prev_events that are not in this pdu_map, as we + // need to have at least one event with zero out degrees for the lexico-topo + // sort below. If there are multiple events with omitted prevs, they will be + // ordered by timestamp, then event ID. At that point though, it's unlikely to + // matter. let prev_events = value .get("prev_events") .unwrap() @@ -294,13 +302,14 @@ async fn build_local_dag( .collect(); dag.insert(event_id.clone(), prev_events); - let origin_server_ts = value.get("origin_server_ts") + let origin_server_ts = value + .get("origin_server_ts") .and_then(ruma::CanonicalJsonValue::as_integer) .unwrap_or_default(); id_origin_ts.insert(event_id.clone(), origin_server_ts); } - lexicographical_topological_sort(&dag, &async |node_id| { + let sorted = lexicographical_topological_sort(&dag, &async |node_id| { // Note: we don't bother fetching power levels because that would massively slow // this function down. This is a best-effort attempt to order events correctly // for processing, however ultimately that should be the sender's job. @@ -315,8 +324,12 @@ async fn build_local_dag( .unwrap_or_default(); Ok((int!(0), MilliSecondsSinceUnixEpoch(ts))) }) - .await - .map_err(|e| err!("failed to resolve local graph: {e}")) + .await + .map_err(|e| err!("failed to resolve local graph: {e}")); + if let Ok(ref s) = sorted { + assert_eq!(s.len(), pdu_map.len(), "Sorted graph was not the same size as the input graph"); + }; + sorted } async fn handle_room(