mirror of
https://forgejo.ellis.link/continuwuation/continuwuity/
synced 2026-06-10 17:41:45 +00:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8182ab3c29 | |||
| 8ec13eaae1 | |||
| 760e4dadbb | |||
| ec9d032c89 | |||
| 6353d1123e | |||
| d54d06038a | |||
| ea6c5c1cfd | |||
| 5325f89b1b | |||
| 976eedf024 |
@@ -0,0 +1 @@
|
||||
Added support for Matrix 1.16's `state_after` feature, allowing clients which understand it to sync room state changes more reliably. Contributed by @ginger.
|
||||
@@ -0,0 +1 @@
|
||||
Adjusted legacy sync logic to no longer use the `roomsynctoken_shortstatehash` database column. Once this change has been confirmed to be stable and reliable, a future update will remove it entirely, significantly decreasing database sizes. Contributed by @ginger.
|
||||
@@ -48,6 +48,13 @@ async fn load_timeline(
|
||||
ending_count: Option<PduCount>,
|
||||
limit: usize,
|
||||
) -> Result<TimelinePdus> {
|
||||
if let (Some(starting_count), Some(ending_count)) = (starting_count, ending_count) {
|
||||
debug_assert!(
|
||||
starting_count <= ending_count,
|
||||
"starting count {starting_count} > ending count {ending_count}"
|
||||
);
|
||||
}
|
||||
|
||||
let mut pdu_stream = match starting_count {
|
||||
| Some(starting_count) => {
|
||||
let last_timeline_count = services
|
||||
|
||||
@@ -38,6 +38,7 @@
|
||||
uint,
|
||||
};
|
||||
use service::{account_data::AnyRawAccountDataEvent, rooms::short::ShortStateHash};
|
||||
use tokio::pin;
|
||||
|
||||
use super::{load_timeline, share_encrypted_room};
|
||||
use crate::client::{
|
||||
@@ -96,12 +97,19 @@ pub(super) async fn load_joined_room(
|
||||
);
|
||||
}
|
||||
|
||||
let state_events =
|
||||
StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect());
|
||||
|
||||
let joined_room = assign!(JoinedRoom::new(), {
|
||||
account_data,
|
||||
summary: summary.unwrap_or_default(),
|
||||
unread_notifications: notification_counts.unwrap_or_default(),
|
||||
timeline,
|
||||
state: RoomState::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())),
|
||||
state: if sync_context.use_state_after {
|
||||
RoomState::After(state_events)
|
||||
} else {
|
||||
RoomState::Before(state_events)
|
||||
},
|
||||
ephemeral,
|
||||
unread_thread_notifications: BTreeMap::new(),
|
||||
});
|
||||
@@ -344,7 +352,7 @@ struct ShortStateHashes {
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn fetch_shortstatehashes(
|
||||
services: &Services,
|
||||
SyncContext { last_sync_end_count, current_count, .. }: SyncContext<'_>,
|
||||
SyncContext { last_sync_end_count, .. }: SyncContext<'_>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<ShortStateHashes> {
|
||||
// the room state currently.
|
||||
@@ -354,46 +362,45 @@ async fn fetch_shortstatehashes(
|
||||
.rooms
|
||||
.state
|
||||
.get_room_shortstatehash(room_id)
|
||||
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
|
||||
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))))
|
||||
.await?;
|
||||
|
||||
// the room state as of the end of the last sync.
|
||||
// this will be None if we are doing an initial sync or if we just joined this
|
||||
// room.
|
||||
// The room state as of the end of the last sync.
|
||||
// This will be None if we are doing an initial sync.
|
||||
let last_sync_end_shortstatehash =
|
||||
OptionFuture::from(last_sync_end_count.map(|last_sync_end_count| {
|
||||
// look up the shortstatehash saved by the last sync's call to
|
||||
// `associate_token_shortstatehash`
|
||||
services
|
||||
.rooms
|
||||
.user
|
||||
.get_token_shortstatehash(room_id, last_sync_end_count)
|
||||
.inspect_err(move |_| {
|
||||
debug_warn!(
|
||||
token = last_sync_end_count,
|
||||
"Room has no shortstatehash for this token"
|
||||
);
|
||||
})
|
||||
.ok()
|
||||
OptionFuture::from(last_sync_end_count.map(async |last_sync_end_count| {
|
||||
pin! {
|
||||
let pdus = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus(room_id, Some(PduCount::Normal(last_sync_end_count)))
|
||||
.ignore_err();
|
||||
}
|
||||
|
||||
match pdus.next().await {
|
||||
| Some((_, pdu_after_last_sync_end)) => {
|
||||
trace!(?pdu_after_last_sync_end.event_id, "pdu at last sync end");
|
||||
|
||||
Some(
|
||||
services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.pdu_shortstatehash(&pdu_after_last_sync_end.event_id)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
err!("Last sync end PDU has no shortstatehash: {err}")
|
||||
}),
|
||||
)
|
||||
},
|
||||
| None => {
|
||||
// No events have been sent since the last sync, or we just joined this room
|
||||
None
|
||||
},
|
||||
}
|
||||
}))
|
||||
.map(Option::flatten)
|
||||
.map(Ok);
|
||||
|
||||
let (current_shortstatehash, last_sync_end_shortstatehash) =
|
||||
try_join(current_shortstatehash, last_sync_end_shortstatehash).await?;
|
||||
|
||||
/*
|
||||
associate the `current_count` with the `current_shortstatehash`, so we can
|
||||
use it on the next sync as the `last_sync_end_shortstatehash`.
|
||||
|
||||
TODO: the table written to by this call grows extremely fast, gaining one new entry for each
|
||||
joined room on _every single sync request_. we need to find a better way to remember the shortstatehash
|
||||
between syncs.
|
||||
*/
|
||||
services
|
||||
.rooms
|
||||
.user
|
||||
.associate_token_shortstatehash(room_id, current_count, current_shortstatehash)
|
||||
.await;
|
||||
.await
|
||||
.flatten()
|
||||
.transpose()?;
|
||||
|
||||
Ok(ShortStateHashes {
|
||||
current_shortstatehash,
|
||||
@@ -452,6 +459,7 @@ async fn build_state_events(
|
||||
syncing_user,
|
||||
last_sync_end_count,
|
||||
full_state,
|
||||
use_state_after,
|
||||
..
|
||||
} = sync_context;
|
||||
|
||||
@@ -460,32 +468,15 @@ async fn build_state_events(
|
||||
last_sync_end_shortstatehash,
|
||||
} = shortstatehashes;
|
||||
|
||||
// the spec states that the `state` property only includes state events up to
|
||||
// the beginning of the timeline, so we determine the state of the syncing room
|
||||
// as of the first timeline event. NOTE: this explanation is not entirely
|
||||
// accurate; see the implementation of `build_state_incremental`.
|
||||
let timeline_start_shortstatehash = async {
|
||||
if let Some((_, pdu)) = timeline.pdus.front() {
|
||||
if let Ok(shortstatehash) = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.pdu_shortstatehash(&pdu.event_id)
|
||||
.await
|
||||
{
|
||||
return shortstatehash;
|
||||
}
|
||||
}
|
||||
|
||||
current_shortstatehash
|
||||
};
|
||||
if timeline.pdus.is_empty() {
|
||||
// If the timeline is empty there can't possibly be any changes to the state
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
// the user IDs of members whose membership needs to be sent to the client, if
|
||||
// lazy-loading is enabled.
|
||||
let lazily_loaded_members =
|
||||
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders());
|
||||
|
||||
let (timeline_start_shortstatehash, lazily_loaded_members) =
|
||||
join(timeline_start_shortstatehash, lazily_loaded_members).await;
|
||||
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()).await;
|
||||
|
||||
// compute the state delta between the previous sync and this sync.
|
||||
match (last_sync_end_count, last_sync_end_shortstatehash) {
|
||||
@@ -494,16 +485,14 @@ async fn build_state_events(
|
||||
is Some (meaning the syncing user didn't just join this room for the first time ever), and `full_state` is false,
|
||||
then use `build_state_incremental`.
|
||||
*/
|
||||
| (Some(last_sync_end_count), Some(last_sync_end_shortstatehash)) if !full_state =>
|
||||
| (Some(_), Some(last_sync_end_shortstatehash)) if !full_state =>
|
||||
build_state_incremental(
|
||||
services,
|
||||
syncing_user,
|
||||
room_id,
|
||||
PduCount::Normal(last_sync_end_count),
|
||||
last_sync_end_shortstatehash,
|
||||
timeline_start_shortstatehash,
|
||||
current_shortstatehash,
|
||||
timeline,
|
||||
use_state_after,
|
||||
lazily_loaded_members.as_ref(),
|
||||
)
|
||||
.boxed()
|
||||
@@ -517,7 +506,9 @@ async fn build_state_events(
|
||||
build_state_initial(
|
||||
services,
|
||||
syncing_user,
|
||||
timeline_start_shortstatehash,
|
||||
current_shortstatehash,
|
||||
timeline,
|
||||
use_state_after,
|
||||
lazily_loaded_members.as_ref(),
|
||||
)
|
||||
.boxed()
|
||||
@@ -598,23 +589,25 @@ async fn check_joined_since_last_sync(
|
||||
ShortStateHashes { last_sync_end_shortstatehash, .. }: ShortStateHashes,
|
||||
SyncContext { syncing_user, .. }: SyncContext<'_>,
|
||||
) -> Result<bool> {
|
||||
// fetch the syncing user's membership event during the last sync.
|
||||
// this will be None if `previous_sync_end_shortstatehash` is None.
|
||||
let membership_during_previous_sync = match last_sync_end_shortstatehash {
|
||||
| Some(last_sync_end_shortstatehash) => services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_get_content(
|
||||
last_sync_end_shortstatehash,
|
||||
&StateEventType::RoomMember,
|
||||
syncing_user.as_str(),
|
||||
)
|
||||
.await
|
||||
.inspect_err(|_| debug_warn!("User has no previous membership"))
|
||||
.ok(),
|
||||
| None => None,
|
||||
let Some(last_sync_end_shortstatehash) = last_sync_end_shortstatehash else {
|
||||
// For initial syncs always return false, since there's no "last sync" for the
|
||||
// user to have joined since.
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
// Fetch the syncing user's membership event during the last sync.
|
||||
let membership_during_previous_sync = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_get_content(
|
||||
last_sync_end_shortstatehash,
|
||||
&StateEventType::RoomMember,
|
||||
syncing_user.as_str(),
|
||||
)
|
||||
.await
|
||||
.inspect_err(|_| debug_warn!("User has no previous membership"))
|
||||
.ok();
|
||||
|
||||
// TODO: If the requesting user got state-reset out of the room, this
|
||||
// will be `true` when it shouldn't be. this function should never be called
|
||||
// in that situation, but it may be if the membership cache didn't get updated.
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
trace,
|
||||
utils::{self, IterStream, future::ReadyEqExt, stream::WidebandExt as _},
|
||||
};
|
||||
use futures::{StreamExt, future::join};
|
||||
use futures::StreamExt;
|
||||
use ruma::{
|
||||
EventId, OwnedRoomId, RoomId,
|
||||
api::client::sync::sync_events::v3::{
|
||||
@@ -181,6 +181,9 @@ pub(super) async fn load_left_room(
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
let state_events =
|
||||
StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect());
|
||||
|
||||
Ok(Some(assign!(LeftRoom::new(), {
|
||||
account_data: RoomAccountData::new(),
|
||||
timeline: assign!(Timeline::new(), {
|
||||
@@ -188,7 +191,11 @@ pub(super) async fn load_left_room(
|
||||
prev_batch: Some(current_count.to_string()),
|
||||
events: raw_timeline_pdus,
|
||||
}),
|
||||
state: State::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())),
|
||||
state: if sync_context.use_state_after {
|
||||
State::After(state_events)
|
||||
} else {
|
||||
State::Before(state_events)
|
||||
},
|
||||
})))
|
||||
}
|
||||
|
||||
@@ -233,29 +240,8 @@ async fn build_left_state_and_timeline(
|
||||
)
|
||||
.await?;
|
||||
|
||||
let timeline_start_shortstatehash = async {
|
||||
if let Some((_, pdu)) = timeline.pdus.front() {
|
||||
if let Ok(shortstatehash) = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.pdu_shortstatehash(&pdu.event_id)
|
||||
.await
|
||||
{
|
||||
return shortstatehash;
|
||||
}
|
||||
}
|
||||
|
||||
// the timeline generally should not be empty (see the TODO further down),
|
||||
// but in case it is we use `leave_shortstatehash` as the state to
|
||||
// send
|
||||
leave_shortstatehash
|
||||
};
|
||||
|
||||
let lazily_loaded_members =
|
||||
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders());
|
||||
|
||||
let (timeline_start_shortstatehash, lazily_loaded_members) =
|
||||
join(timeline_start_shortstatehash, lazily_loaded_members).await;
|
||||
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()).await;
|
||||
|
||||
// TODO: calculate incremental state for incremental syncs.
|
||||
// always calculating initial state _works_ but returns more data and does
|
||||
@@ -263,7 +249,9 @@ async fn build_left_state_and_timeline(
|
||||
let mut state = build_state_initial(
|
||||
services,
|
||||
syncing_user,
|
||||
timeline_start_shortstatehash,
|
||||
leave_shortstatehash,
|
||||
&timeline,
|
||||
sync_context.use_state_after,
|
||||
lazily_loaded_members.as_ref(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -11,12 +11,11 @@
|
||||
use axum::extract::State;
|
||||
use axum_client_ip::ClientIp;
|
||||
use conduwuit::{
|
||||
Err, Result, at, extract_variant,
|
||||
Err, Result, at, error, extract_variant,
|
||||
utils::{
|
||||
ReadyExt, TryFutureExtExt,
|
||||
stream::{BroadbandExt, Tools, WidebandExt},
|
||||
},
|
||||
warn,
|
||||
};
|
||||
use conduwuit_service::Services;
|
||||
use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture};
|
||||
@@ -110,6 +109,9 @@ struct SyncContext<'a> {
|
||||
/// The sync filter, which the client uses to specify what data should be
|
||||
/// included in the sync response.
|
||||
filter: &'a FilterDefinition,
|
||||
/// Whether the state at the end of the timeline should be used when
|
||||
/// calculating state diffs for sync.
|
||||
use_state_after: bool,
|
||||
}
|
||||
|
||||
impl<'a> SyncContext<'a> {
|
||||
@@ -263,6 +265,7 @@ pub(crate) async fn build_sync_events(
|
||||
current_count,
|
||||
full_state,
|
||||
filter: &filter,
|
||||
use_state_after: body.use_state_after,
|
||||
};
|
||||
|
||||
let joined_rooms = services
|
||||
@@ -275,7 +278,7 @@ pub(crate) async fn build_sync_events(
|
||||
match joined_room {
|
||||
| Ok((room, updates)) => Some((room_id, room, updates)),
|
||||
| Err(err) => {
|
||||
warn!(?err, %room_id, "error loading joined room");
|
||||
error!(?err, %room_id, "error loading joined room");
|
||||
None
|
||||
},
|
||||
}
|
||||
@@ -304,7 +307,7 @@ pub(crate) async fn build_sync_events(
|
||||
| Ok(Some(left_room)) => Some((room_id, left_room)),
|
||||
| Ok(None) => None,
|
||||
| Err(err) => {
|
||||
warn!(?err, %room_id, "error loading joined room");
|
||||
error!(?err, %room_id, "error loading joined room");
|
||||
None
|
||||
},
|
||||
}
|
||||
|
||||
+63
-146
@@ -1,11 +1,8 @@
|
||||
use std::{collections::BTreeSet, ops::ControlFlow};
|
||||
use std::collections::HashSet;
|
||||
|
||||
use conduwuit::{
|
||||
Result, at, is_equal_to,
|
||||
matrix::{
|
||||
Event,
|
||||
pdu::{PduCount, PduEvent},
|
||||
},
|
||||
Result, at,
|
||||
matrix::{Event, pdu::PduEvent},
|
||||
utils::{
|
||||
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
|
||||
stream::{BroadbandExt, TryIgnore},
|
||||
@@ -16,9 +13,7 @@
|
||||
rooms::{lazy_loading::MemberSet, short::ShortStateHash},
|
||||
};
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use itertools::Itertools;
|
||||
use ruma::{OwnedEventId, RoomId, UserId, events::StateEventType};
|
||||
use service::rooms::short::ShortEventId;
|
||||
use ruma::{OwnedEventId, UserId, events::StateEventType};
|
||||
use tracing::trace;
|
||||
|
||||
use crate::client::TimelinePdus;
|
||||
@@ -38,14 +33,22 @@
|
||||
pub(super) async fn build_state_initial(
|
||||
services: &Services,
|
||||
sender_user: &UserId,
|
||||
timeline_start_shortstatehash: ShortStateHash,
|
||||
timeline_end_shortstatehash: ShortStateHash,
|
||||
timeline: &TimelinePdus,
|
||||
use_state_after: bool,
|
||||
lazily_loaded_members: Option<&MemberSet>,
|
||||
) -> Result<Vec<PduEvent>> {
|
||||
let event_ids_in_timeline: HashSet<_> =
|
||||
timeline.pdus.iter().map(|pdu| &pdu.1.event_id).collect();
|
||||
|
||||
// load the keys and event IDs of the state events at the start of the timeline
|
||||
let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(timeline_start_shortstatehash)
|
||||
.state_full_ids(timeline_end_shortstatehash)
|
||||
.ready_filter(|(_, event_id)| {
|
||||
use_state_after || !event_ids_in_timeline.contains(event_id)
|
||||
})
|
||||
.unzip()
|
||||
.await;
|
||||
|
||||
@@ -92,82 +95,32 @@ pub(super) async fn build_state_initial(
|
||||
pub(super) async fn build_state_incremental<'a>(
|
||||
services: &Services,
|
||||
sender_user: &'a UserId,
|
||||
room_id: &RoomId,
|
||||
last_sync_end_count: PduCount,
|
||||
last_sync_end_shortstatehash: ShortStateHash,
|
||||
timeline_start_shortstatehash: ShortStateHash,
|
||||
timeline_end_shortstatehash: ShortStateHash,
|
||||
timeline: &TimelinePdus,
|
||||
use_state_after: bool,
|
||||
lazily_loaded_members: Option<&'a MemberSet>,
|
||||
) -> Result<Vec<PduEvent>> {
|
||||
/*
|
||||
NB: a limited sync is one where `timeline.limited == true`. Synapse calls this a "gappy" sync internally.
|
||||
let mut state_event_ids: HashSet<OwnedEventId> = HashSet::new();
|
||||
|
||||
The algorithm implemented in this function is, currently, quite different from the algorithm vaguely described
|
||||
by the Matrix specification. This is because the specification's description of the `state` property does not accurately
|
||||
reflect how Synapse behaves, and therefore how client SDKs behave. Notable differences include:
|
||||
1. We do not compute the delta using the naive approach of "every state event from the end of the last sync
|
||||
up to the start of this sync's timeline". see below for details.
|
||||
2. If lazy-loading is enabled, we include lazily-loaded membership events. The specific users to include are determined
|
||||
elsewhere and supplied to this function in the `lazily_loaded_members` parameter.
|
||||
*/
|
||||
trace!(
|
||||
%use_state_after,
|
||||
%last_sync_end_shortstatehash,
|
||||
%timeline_end_shortstatehash,
|
||||
"computing state for incremental sync"
|
||||
);
|
||||
|
||||
/*
|
||||
the `state` property of an incremental sync which isn't limited are _usually_ empty.
|
||||
(note: the specification says that the `state` property is _always_ empty for limited syncs, which is incorrect.)
|
||||
however, if an event in the timeline (`timeline.pdus`) merges a split in the room's DAG (i.e. has multiple `prev_events`),
|
||||
the state at the _end_ of the timeline may include state events which were merged in and don't exist in the state
|
||||
at the _start_ of the timeline. because this is uncommon, we check here to see if any events in the timeline
|
||||
merged a split in the DAG.
|
||||
// Fetch lazy-loaded membership events if lazy-loading is enabled
|
||||
if let Some(lazily_loaded_members) = lazily_loaded_members
|
||||
&& !lazily_loaded_members.is_empty()
|
||||
{
|
||||
trace!("including lazy membership events for members: {:?}", lazily_loaded_members);
|
||||
|
||||
see: https://github.com/element-hq/synapse/issues/16941
|
||||
*/
|
||||
|
||||
let timeline_is_linear = timeline.pdus.is_empty() || {
|
||||
let last_pdu_of_last_sync = services
|
||||
services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(room_id, Some(last_sync_end_count.saturating_add(1)))
|
||||
.boxed()
|
||||
.next()
|
||||
.await
|
||||
.transpose()
|
||||
.expect("last sync should have had some PDUs")
|
||||
.map(at!(1));
|
||||
|
||||
// make sure the prev_events of each pdu in the timeline refer only to the
|
||||
// previous pdu
|
||||
timeline
|
||||
.pdus
|
||||
.iter()
|
||||
.try_fold(last_pdu_of_last_sync.map(|pdu| pdu.event_id), |prev_event_id, (_, pdu)| {
|
||||
if let Ok(pdu_prev_event_id) = pdu.prev_events.iter().exactly_one() {
|
||||
if prev_event_id
|
||||
.as_ref()
|
||||
.is_none_or(is_equal_to!(pdu_prev_event_id))
|
||||
{
|
||||
return ControlFlow::Continue(Some(pdu_prev_event_id.to_owned()));
|
||||
}
|
||||
}
|
||||
|
||||
trace!(
|
||||
"pdu {:?} has split prev_events (expected {:?}): {:?}",
|
||||
pdu.event_id, prev_event_id, pdu.prev_events
|
||||
);
|
||||
ControlFlow::Break(())
|
||||
})
|
||||
.is_continue()
|
||||
};
|
||||
|
||||
if timeline_is_linear && !timeline.limited {
|
||||
// if there are no splits in the DAG and the timeline isn't limited, then
|
||||
// `state` will always be empty unless lazy loading is enabled.
|
||||
|
||||
if let Some(lazily_loaded_members) = lazily_loaded_members {
|
||||
if !timeline.pdus.is_empty() {
|
||||
// lazy loading is enabled, so we return the membership events which were
|
||||
// requested by the caller.
|
||||
let lazy_membership_events: Vec<_> = lazily_loaded_members
|
||||
.short
|
||||
.multi_get_eventid_from_short::<'_, OwnedEventId, _>(
|
||||
lazily_loaded_members
|
||||
.iter()
|
||||
.stream()
|
||||
.broad_filter_map(|user_id| async move {
|
||||
@@ -178,71 +131,24 @@ pub(super) async fn build_state_incremental<'a>(
|
||||
services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_get(
|
||||
timeline_start_shortstatehash,
|
||||
.state_get_shortid(
|
||||
timeline_end_shortstatehash,
|
||||
&StateEventType::RoomMember,
|
||||
user_id.as_str(),
|
||||
)
|
||||
.ok()
|
||||
.await
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
if !lazy_membership_events.is_empty() {
|
||||
trace!(
|
||||
"syncing lazy membership events for members: {:?}",
|
||||
lazy_membership_events
|
||||
.iter()
|
||||
.map(|pdu| pdu.state_key().unwrap())
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
return Ok(lazy_membership_events);
|
||||
}
|
||||
}
|
||||
|
||||
// lazy loading is disabled, `state` is empty.
|
||||
return Ok(vec![]);
|
||||
}),
|
||||
)
|
||||
.ignore_err()
|
||||
.ready_for_each(|event_id| {
|
||||
state_event_ids.insert(event_id);
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
/*
|
||||
at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates
|
||||
computing the incremental state (which may be empty).
|
||||
|
||||
NOTE: this code path does not use the `lazy_membership_events` parameter. any changes to membership will be included
|
||||
in the incremental state. therefore, the incremental state may include "redundant" membership events,
|
||||
which we do not filter out because A. the spec forbids lazy-load filtering if the timeline is `limited`,
|
||||
and B. DAG splits which require sending extra membership state events are (probably) uncommon enough that
|
||||
the performance penalty is acceptable.
|
||||
*/
|
||||
|
||||
trace!(%timeline_is_linear, %timeline.limited, "computing state for incremental sync");
|
||||
|
||||
// fetch the shorteventids of state events in the timeline
|
||||
let state_events_in_timeline: BTreeSet<ShortEventId> = services
|
||||
.rooms
|
||||
.short
|
||||
.multi_get_or_create_shorteventid(timeline.pdus.iter().filter_map(|(_, pdu)| {
|
||||
if pdu.state_key().is_some() {
|
||||
Some(pdu.event_id.as_ref())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}))
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
trace!("{} state events in timeline", state_events_in_timeline.len());
|
||||
|
||||
/*
|
||||
fetch the state events which were added since the last sync.
|
||||
|
||||
specifically we fetch the difference between the state at the last sync and the state at the _end_
|
||||
of the timeline, and then we filter out state events in the timeline itself using the shorteventids we fetched.
|
||||
this is necessary to account for splits in the DAG, as explained above.
|
||||
*/
|
||||
let state_diff = services
|
||||
// Fetch the state events added since the last sync.
|
||||
services
|
||||
.rooms
|
||||
.short
|
||||
.multi_get_eventid_from_short::<'_, OwnedEventId, _>(
|
||||
@@ -252,18 +158,29 @@ pub(super) async fn build_state_incremental<'a>(
|
||||
.state_added((last_sync_end_shortstatehash, timeline_end_shortstatehash))
|
||||
.await?
|
||||
.stream()
|
||||
.ready_filter_map(|(_, shorteventid)| {
|
||||
if state_events_in_timeline.contains(&shorteventid) {
|
||||
None
|
||||
} else {
|
||||
Some(shorteventid)
|
||||
}
|
||||
}),
|
||||
.map(at!(1)),
|
||||
)
|
||||
.ignore_err();
|
||||
.ignore_err()
|
||||
.ready_for_each(|event_id| {
|
||||
state_event_ids.insert(event_id);
|
||||
})
|
||||
.await;
|
||||
|
||||
// finally, fetch the PDU contents and collect them into a vec
|
||||
let state_diff_pdus = state_diff
|
||||
if !use_state_after {
|
||||
// If state_after isn't enabled, filter out state events which also exist
|
||||
// in the timeline. If splits exist in the DAG, this may not be exactly the same
|
||||
// thing as the state diff ending at the start of the timeline, but Synapse
|
||||
// also does this and it's technically more useful behavior anyway.
|
||||
// See: https://github.com/element-hq/synapse/issues/16941
|
||||
|
||||
for (_, pdu) in &timeline.pdus {
|
||||
state_event_ids.remove(pdu.event_id());
|
||||
}
|
||||
}
|
||||
|
||||
// Finally, fetch the PDU contents and collect them into a vec
|
||||
let state_diff_pdus = state_event_ids
|
||||
.stream()
|
||||
.broad_filter_map(|event_id| async move {
|
||||
services
|
||||
.rooms
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
|
||||
future::ReadyEqExt,
|
||||
math::{ruma_from_usize, usize_from_ruma},
|
||||
stream::WidebandExt,
|
||||
stream::{TryIgnore, WidebandExt},
|
||||
},
|
||||
warn,
|
||||
};
|
||||
@@ -41,6 +41,7 @@
|
||||
uint,
|
||||
};
|
||||
use service::account_data::AnyRawAccountDataEvent;
|
||||
use tokio::pin;
|
||||
|
||||
use super::share_encrypted_room;
|
||||
use crate::{
|
||||
@@ -858,12 +859,31 @@ async fn collect_e2ee<'a, Rooms>(
|
||||
continue;
|
||||
};
|
||||
|
||||
let since_shortstatehash = services
|
||||
.rooms
|
||||
.user
|
||||
.get_token_shortstatehash(room_id, globalsince)
|
||||
.await
|
||||
.ok();
|
||||
let since_shortstatehash = async {
|
||||
pin! {
|
||||
let pdus_rev = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(room_id, Some(PduCount::Normal(globalsince.saturating_sub(1))))
|
||||
.ignore_err();
|
||||
}
|
||||
|
||||
let (count, pdu_at_last_sync_end) = pdus_rev.next().await?;
|
||||
|
||||
if matches!(count, PduCount::Backfilled(_)) {
|
||||
None
|
||||
} else {
|
||||
Some(
|
||||
services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.pdu_shortstatehash(&pdu_at_last_sync_end.event_id)
|
||||
.await
|
||||
.expect("pdu should have a shortstatehash"),
|
||||
)
|
||||
}
|
||||
}
|
||||
.await;
|
||||
|
||||
let encrypted_room = services
|
||||
.rooms
|
||||
|
||||
@@ -21,6 +21,7 @@ pub fn versions() -> Vec<String> {
|
||||
"v1.12".to_owned(),
|
||||
"v1.13".to_owned(),
|
||||
"v1.14".to_owned(),
|
||||
"v1.16".to_owned(),
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
@@ -193,12 +193,7 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
|
||||
},
|
||||
Descriptor {
|
||||
name: "roomsynctoken_shortstatehash",
|
||||
file_shape: 3,
|
||||
val_size_hint: Some(8),
|
||||
block_size: 512,
|
||||
compression_level: 3,
|
||||
bottommost_level: Some(6),
|
||||
..descriptor::SEQUENTIAL
|
||||
..descriptor::DROPPED
|
||||
},
|
||||
Descriptor {
|
||||
name: "roomuserdataid_accountdata",
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit::{Result, implement};
|
||||
use database::{Database, Deserialized, Map};
|
||||
use database::{Deserialized, Map};
|
||||
use ruma::{RoomId, UserId};
|
||||
|
||||
use crate::{Dep, globals, rooms, rooms::short::ShortStateHash};
|
||||
use crate::{Dep, globals};
|
||||
|
||||
pub struct Service {
|
||||
db: Data,
|
||||
@@ -12,32 +12,25 @@ pub struct Service {
|
||||
}
|
||||
|
||||
struct Data {
|
||||
db: Arc<Database>,
|
||||
userroomid_notificationcount: Arc<Map>,
|
||||
userroomid_highlightcount: Arc<Map>,
|
||||
roomuserid_lastnotificationread: Arc<Map>,
|
||||
roomsynctoken_shortstatehash: Arc<Map>,
|
||||
}
|
||||
|
||||
struct Services {
|
||||
globals: Dep<globals::Service>,
|
||||
short: Dep<rooms::short::Service>,
|
||||
}
|
||||
|
||||
impl crate::Service for Service {
|
||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
Ok(Arc::new(Self {
|
||||
db: Data {
|
||||
db: args.db.clone(),
|
||||
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
|
||||
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
|
||||
roomuserid_lastnotificationread: args.db["userroomid_highlightcount"].clone(),
|
||||
roomsynctoken_shortstatehash: args.db["roomsynctoken_shortstatehash"].clone(),
|
||||
},
|
||||
|
||||
services: Services {
|
||||
globals: args.depend::<globals::Service>("globals"),
|
||||
short: args.depend::<rooms::short::Service>("rooms::short"),
|
||||
},
|
||||
}))
|
||||
}
|
||||
@@ -90,40 +83,3 @@ pub async fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -
|
||||
.deserialized()
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub async fn associate_token_shortstatehash(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
token: u64,
|
||||
shortstatehash: ShortStateHash,
|
||||
) {
|
||||
let shortroomid = self
|
||||
.services
|
||||
.short
|
||||
.get_shortroomid(room_id)
|
||||
.await
|
||||
.expect("room exists");
|
||||
|
||||
let _cork = self.db.db.cork();
|
||||
let key: &[u64] = &[shortroomid, token];
|
||||
self.db
|
||||
.roomsynctoken_shortstatehash
|
||||
.put(key, shortstatehash);
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub async fn get_token_shortstatehash(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
token: u64,
|
||||
) -> Result<ShortStateHash> {
|
||||
let shortroomid = self.services.short.get_shortroomid(room_id).await?;
|
||||
|
||||
let key: &[u64] = &[shortroomid, token];
|
||||
self.db
|
||||
.roomsynctoken_shortstatehash
|
||||
.qry(key)
|
||||
.await
|
||||
.deserialized()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user