Compare commits

...

9 Commits

Author SHA1 Message Date
Ginger 8182ab3c29 fix: Correctly sync newly created rooms 2026-05-27 14:14:36 -04:00
Ginger 8ec13eaae1 fix: Don't panic on missing SSH in sliding sync 2026-05-27 14:14:36 -04:00
Ginger 760e4dadbb fix: Additional sync logic fixes 2026-05-27 14:14:35 -04:00
Ginger ec9d032c89 fix: Upgrade warning on room load failures to error 2026-05-27 14:14:35 -04:00
Ginger 6353d1123e fix: Don't panic on missing SSH 2026-05-27 14:14:35 -04:00
Ginger d54d06038a fix: Calculate state at end of last sync correctly 2026-05-27 14:14:35 -04:00
Ginger ea6c5c1cfd chore: News fragments 2026-05-27 14:14:35 -04:00
Ginger 5325f89b1b feat: Add support for state_after 2026-05-27 14:14:35 -04:00
Ginger 976eedf024 feat: Remove all uses of roomsynctoken_shortstatehash 2026-05-27 14:14:35 -04:00
11 changed files with 197 additions and 315 deletions
+1
View File
@@ -0,0 +1 @@
Added support for Matrix 1.16's `state_after` feature, allowing clients which understand it to sync room state changes more reliably. Contributed by @ginger.
+1
View File
@@ -0,0 +1 @@
Adjusted legacy sync logic to no longer use the `roomsynctoken_shortstatehash` database column. Once this change has been confirmed to be stable and reliable, a future update will remove it entirely, significantly decreasing database sizes. Contributed by @ginger.
+7
View File
@@ -48,6 +48,13 @@ async fn load_timeline(
ending_count: Option<PduCount>,
limit: usize,
) -> Result<TimelinePdus> {
if let (Some(starting_count), Some(ending_count)) = (starting_count, ending_count) {
debug_assert!(
starting_count <= ending_count,
"starting count {starting_count} > ending count {ending_count}"
);
}
let mut pdu_stream = match starting_count {
| Some(starting_count) => {
let last_timeline_count = services
+74 -81
View File
@@ -38,6 +38,7 @@
uint,
};
use service::{account_data::AnyRawAccountDataEvent, rooms::short::ShortStateHash};
use tokio::pin;
use super::{load_timeline, share_encrypted_room};
use crate::client::{
@@ -96,12 +97,19 @@ pub(super) async fn load_joined_room(
);
}
let state_events =
StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect());
let joined_room = assign!(JoinedRoom::new(), {
account_data,
summary: summary.unwrap_or_default(),
unread_notifications: notification_counts.unwrap_or_default(),
timeline,
state: RoomState::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())),
state: if sync_context.use_state_after {
RoomState::After(state_events)
} else {
RoomState::Before(state_events)
},
ephemeral,
unread_thread_notifications: BTreeMap::new(),
});
@@ -344,7 +352,7 @@ struct ShortStateHashes {
#[tracing::instrument(level = "debug", skip_all)]
async fn fetch_shortstatehashes(
services: &Services,
SyncContext { last_sync_end_count, current_count, .. }: SyncContext<'_>,
SyncContext { last_sync_end_count, .. }: SyncContext<'_>,
room_id: &RoomId,
) -> Result<ShortStateHashes> {
// the room state currently.
@@ -354,46 +362,45 @@ async fn fetch_shortstatehashes(
.rooms
.state
.get_room_shortstatehash(room_id)
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))))
.await?;
// the room state as of the end of the last sync.
// this will be None if we are doing an initial sync or if we just joined this
// room.
// The room state as of the end of the last sync.
// This will be None if we are doing an initial sync.
let last_sync_end_shortstatehash =
OptionFuture::from(last_sync_end_count.map(|last_sync_end_count| {
// look up the shortstatehash saved by the last sync's call to
// `associate_token_shortstatehash`
services
.rooms
.user
.get_token_shortstatehash(room_id, last_sync_end_count)
.inspect_err(move |_| {
debug_warn!(
token = last_sync_end_count,
"Room has no shortstatehash for this token"
);
})
.ok()
OptionFuture::from(last_sync_end_count.map(async |last_sync_end_count| {
pin! {
let pdus = services
.rooms
.timeline
.pdus(room_id, Some(PduCount::Normal(last_sync_end_count)))
.ignore_err();
}
match pdus.next().await {
| Some((_, pdu_after_last_sync_end)) => {
trace!(?pdu_after_last_sync_end.event_id, "pdu at last sync end");
Some(
services
.rooms
.state_accessor
.pdu_shortstatehash(&pdu_after_last_sync_end.event_id)
.await
.map_err(|err| {
err!("Last sync end PDU has no shortstatehash: {err}")
}),
)
},
| None => {
// No events have been sent since the last sync, or we just joined this room
None
},
}
}))
.map(Option::flatten)
.map(Ok);
let (current_shortstatehash, last_sync_end_shortstatehash) =
try_join(current_shortstatehash, last_sync_end_shortstatehash).await?;
/*
associate the `current_count` with the `current_shortstatehash`, so we can
use it on the next sync as the `last_sync_end_shortstatehash`.
TODO: the table written to by this call grows extremely fast, gaining one new entry for each
joined room on _every single sync request_. we need to find a better way to remember the shortstatehash
between syncs.
*/
services
.rooms
.user
.associate_token_shortstatehash(room_id, current_count, current_shortstatehash)
.await;
.await
.flatten()
.transpose()?;
Ok(ShortStateHashes {
current_shortstatehash,
@@ -452,6 +459,7 @@ async fn build_state_events(
syncing_user,
last_sync_end_count,
full_state,
use_state_after,
..
} = sync_context;
@@ -460,32 +468,15 @@ async fn build_state_events(
last_sync_end_shortstatehash,
} = shortstatehashes;
// the spec states that the `state` property only includes state events up to
// the beginning of the timeline, so we determine the state of the syncing room
// as of the first timeline event. NOTE: this explanation is not entirely
// accurate; see the implementation of `build_state_incremental`.
let timeline_start_shortstatehash = async {
if let Some((_, pdu)) = timeline.pdus.front() {
if let Ok(shortstatehash) = services
.rooms
.state_accessor
.pdu_shortstatehash(&pdu.event_id)
.await
{
return shortstatehash;
}
}
current_shortstatehash
};
if timeline.pdus.is_empty() {
// If the timeline is empty there can't possibly be any changes to the state
return Ok(vec![]);
}
// the user IDs of members whose membership needs to be sent to the client, if
// lazy-loading is enabled.
let lazily_loaded_members =
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders());
let (timeline_start_shortstatehash, lazily_loaded_members) =
join(timeline_start_shortstatehash, lazily_loaded_members).await;
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()).await;
// compute the state delta between the previous sync and this sync.
match (last_sync_end_count, last_sync_end_shortstatehash) {
@@ -494,16 +485,14 @@ async fn build_state_events(
is Some (meaning the syncing user didn't just join this room for the first time ever), and `full_state` is false,
then use `build_state_incremental`.
*/
| (Some(last_sync_end_count), Some(last_sync_end_shortstatehash)) if !full_state =>
| (Some(_), Some(last_sync_end_shortstatehash)) if !full_state =>
build_state_incremental(
services,
syncing_user,
room_id,
PduCount::Normal(last_sync_end_count),
last_sync_end_shortstatehash,
timeline_start_shortstatehash,
current_shortstatehash,
timeline,
use_state_after,
lazily_loaded_members.as_ref(),
)
.boxed()
@@ -517,7 +506,9 @@ async fn build_state_events(
build_state_initial(
services,
syncing_user,
timeline_start_shortstatehash,
current_shortstatehash,
timeline,
use_state_after,
lazily_loaded_members.as_ref(),
)
.boxed()
@@ -598,23 +589,25 @@ async fn check_joined_since_last_sync(
ShortStateHashes { last_sync_end_shortstatehash, .. }: ShortStateHashes,
SyncContext { syncing_user, .. }: SyncContext<'_>,
) -> Result<bool> {
// fetch the syncing user's membership event during the last sync.
// this will be None if `previous_sync_end_shortstatehash` is None.
let membership_during_previous_sync = match last_sync_end_shortstatehash {
| Some(last_sync_end_shortstatehash) => services
.rooms
.state_accessor
.state_get_content(
last_sync_end_shortstatehash,
&StateEventType::RoomMember,
syncing_user.as_str(),
)
.await
.inspect_err(|_| debug_warn!("User has no previous membership"))
.ok(),
| None => None,
let Some(last_sync_end_shortstatehash) = last_sync_end_shortstatehash else {
// For initial syncs always return false, since there's no "last sync" for the
// user to have joined since.
return Ok(false);
};
// Fetch the syncing user's membership event during the last sync.
let membership_during_previous_sync = services
.rooms
.state_accessor
.state_get_content(
last_sync_end_shortstatehash,
&StateEventType::RoomMember,
syncing_user.as_str(),
)
.await
.inspect_err(|_| debug_warn!("User has no previous membership"))
.ok();
// TODO: If the requesting user got state-reset out of the room, this
// will be `true` when it shouldn't be. this function should never be called
// in that situation, but it may be if the membership cache didn't get updated.
+13 -25
View File
@@ -4,7 +4,7 @@
trace,
utils::{self, IterStream, future::ReadyEqExt, stream::WidebandExt as _},
};
use futures::{StreamExt, future::join};
use futures::StreamExt;
use ruma::{
EventId, OwnedRoomId, RoomId,
api::client::sync::sync_events::v3::{
@@ -181,6 +181,9 @@ pub(super) async fn load_left_room(
.collect::<Vec<_>>()
.await;
let state_events =
StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect());
Ok(Some(assign!(LeftRoom::new(), {
account_data: RoomAccountData::new(),
timeline: assign!(Timeline::new(), {
@@ -188,7 +191,11 @@ pub(super) async fn load_left_room(
prev_batch: Some(current_count.to_string()),
events: raw_timeline_pdus,
}),
state: State::Before(StateEvents::with_events(state_events.into_iter().map(Event::into_format).collect())),
state: if sync_context.use_state_after {
State::After(state_events)
} else {
State::Before(state_events)
},
})))
}
@@ -233,29 +240,8 @@ async fn build_left_state_and_timeline(
)
.await?;
let timeline_start_shortstatehash = async {
if let Some((_, pdu)) = timeline.pdus.front() {
if let Ok(shortstatehash) = services
.rooms
.state_accessor
.pdu_shortstatehash(&pdu.event_id)
.await
{
return shortstatehash;
}
}
// the timeline generally should not be empty (see the TODO further down),
// but in case it is we use `leave_shortstatehash` as the state to
// send
leave_shortstatehash
};
let lazily_loaded_members =
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders());
let (timeline_start_shortstatehash, lazily_loaded_members) =
join(timeline_start_shortstatehash, lazily_loaded_members).await;
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()).await;
// TODO: calculate incremental state for incremental syncs.
// always calculating initial state _works_ but returns more data and does
@@ -263,7 +249,9 @@ async fn build_left_state_and_timeline(
let mut state = build_state_initial(
services,
syncing_user,
timeline_start_shortstatehash,
leave_shortstatehash,
&timeline,
sync_context.use_state_after,
lazily_loaded_members.as_ref(),
)
.await?;
+7 -4
View File
@@ -11,12 +11,11 @@
use axum::extract::State;
use axum_client_ip::ClientIp;
use conduwuit::{
Err, Result, at, extract_variant,
Err, Result, at, error, extract_variant,
utils::{
ReadyExt, TryFutureExtExt,
stream::{BroadbandExt, Tools, WidebandExt},
},
warn,
};
use conduwuit_service::Services;
use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture};
@@ -110,6 +109,9 @@ struct SyncContext<'a> {
/// The sync filter, which the client uses to specify what data should be
/// included in the sync response.
filter: &'a FilterDefinition,
/// Whether the state at the end of the timeline should be used when
/// calculating state diffs for sync.
use_state_after: bool,
}
impl<'a> SyncContext<'a> {
@@ -263,6 +265,7 @@ pub(crate) async fn build_sync_events(
current_count,
full_state,
filter: &filter,
use_state_after: body.use_state_after,
};
let joined_rooms = services
@@ -275,7 +278,7 @@ pub(crate) async fn build_sync_events(
match joined_room {
| Ok((room, updates)) => Some((room_id, room, updates)),
| Err(err) => {
warn!(?err, %room_id, "error loading joined room");
error!(?err, %room_id, "error loading joined room");
None
},
}
@@ -304,7 +307,7 @@ pub(crate) async fn build_sync_events(
| Ok(Some(left_room)) => Some((room_id, left_room)),
| Ok(None) => None,
| Err(err) => {
warn!(?err, %room_id, "error loading joined room");
error!(?err, %room_id, "error loading joined room");
None
},
}
+63 -146
View File
@@ -1,11 +1,8 @@
use std::{collections::BTreeSet, ops::ControlFlow};
use std::collections::HashSet;
use conduwuit::{
Result, at, is_equal_to,
matrix::{
Event,
pdu::{PduCount, PduEvent},
},
Result, at,
matrix::{Event, pdu::PduEvent},
utils::{
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
stream::{BroadbandExt, TryIgnore},
@@ -16,9 +13,7 @@
rooms::{lazy_loading::MemberSet, short::ShortStateHash},
};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use ruma::{OwnedEventId, RoomId, UserId, events::StateEventType};
use service::rooms::short::ShortEventId;
use ruma::{OwnedEventId, UserId, events::StateEventType};
use tracing::trace;
use crate::client::TimelinePdus;
@@ -38,14 +33,22 @@
pub(super) async fn build_state_initial(
services: &Services,
sender_user: &UserId,
timeline_start_shortstatehash: ShortStateHash,
timeline_end_shortstatehash: ShortStateHash,
timeline: &TimelinePdus,
use_state_after: bool,
lazily_loaded_members: Option<&MemberSet>,
) -> Result<Vec<PduEvent>> {
let event_ids_in_timeline: HashSet<_> =
timeline.pdus.iter().map(|pdu| &pdu.1.event_id).collect();
// load the keys and event IDs of the state events at the start of the timeline
let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services
.rooms
.state_accessor
.state_full_ids(timeline_start_shortstatehash)
.state_full_ids(timeline_end_shortstatehash)
.ready_filter(|(_, event_id)| {
use_state_after || !event_ids_in_timeline.contains(event_id)
})
.unzip()
.await;
@@ -92,82 +95,32 @@ pub(super) async fn build_state_initial(
pub(super) async fn build_state_incremental<'a>(
services: &Services,
sender_user: &'a UserId,
room_id: &RoomId,
last_sync_end_count: PduCount,
last_sync_end_shortstatehash: ShortStateHash,
timeline_start_shortstatehash: ShortStateHash,
timeline_end_shortstatehash: ShortStateHash,
timeline: &TimelinePdus,
use_state_after: bool,
lazily_loaded_members: Option<&'a MemberSet>,
) -> Result<Vec<PduEvent>> {
/*
NB: a limited sync is one where `timeline.limited == true`. Synapse calls this a "gappy" sync internally.
let mut state_event_ids: HashSet<OwnedEventId> = HashSet::new();
The algorithm implemented in this function is, currently, quite different from the algorithm vaguely described
by the Matrix specification. This is because the specification's description of the `state` property does not accurately
reflect how Synapse behaves, and therefore how client SDKs behave. Notable differences include:
1. We do not compute the delta using the naive approach of "every state event from the end of the last sync
up to the start of this sync's timeline". see below for details.
2. If lazy-loading is enabled, we include lazily-loaded membership events. The specific users to include are determined
elsewhere and supplied to this function in the `lazily_loaded_members` parameter.
*/
trace!(
%use_state_after,
%last_sync_end_shortstatehash,
%timeline_end_shortstatehash,
"computing state for incremental sync"
);
/*
the `state` property of an incremental sync which isn't limited are _usually_ empty.
(note: the specification says that the `state` property is _always_ empty for limited syncs, which is incorrect.)
however, if an event in the timeline (`timeline.pdus`) merges a split in the room's DAG (i.e. has multiple `prev_events`),
the state at the _end_ of the timeline may include state events which were merged in and don't exist in the state
at the _start_ of the timeline. because this is uncommon, we check here to see if any events in the timeline
merged a split in the DAG.
// Fetch lazy-loaded membership events if lazy-loading is enabled
if let Some(lazily_loaded_members) = lazily_loaded_members
&& !lazily_loaded_members.is_empty()
{
trace!("including lazy membership events for members: {:?}", lazily_loaded_members);
see: https://github.com/element-hq/synapse/issues/16941
*/
let timeline_is_linear = timeline.pdus.is_empty() || {
let last_pdu_of_last_sync = services
services
.rooms
.timeline
.pdus_rev(room_id, Some(last_sync_end_count.saturating_add(1)))
.boxed()
.next()
.await
.transpose()
.expect("last sync should have had some PDUs")
.map(at!(1));
// make sure the prev_events of each pdu in the timeline refer only to the
// previous pdu
timeline
.pdus
.iter()
.try_fold(last_pdu_of_last_sync.map(|pdu| pdu.event_id), |prev_event_id, (_, pdu)| {
if let Ok(pdu_prev_event_id) = pdu.prev_events.iter().exactly_one() {
if prev_event_id
.as_ref()
.is_none_or(is_equal_to!(pdu_prev_event_id))
{
return ControlFlow::Continue(Some(pdu_prev_event_id.to_owned()));
}
}
trace!(
"pdu {:?} has split prev_events (expected {:?}): {:?}",
pdu.event_id, prev_event_id, pdu.prev_events
);
ControlFlow::Break(())
})
.is_continue()
};
if timeline_is_linear && !timeline.limited {
// if there are no splits in the DAG and the timeline isn't limited, then
// `state` will always be empty unless lazy loading is enabled.
if let Some(lazily_loaded_members) = lazily_loaded_members {
if !timeline.pdus.is_empty() {
// lazy loading is enabled, so we return the membership events which were
// requested by the caller.
let lazy_membership_events: Vec<_> = lazily_loaded_members
.short
.multi_get_eventid_from_short::<'_, OwnedEventId, _>(
lazily_loaded_members
.iter()
.stream()
.broad_filter_map(|user_id| async move {
@@ -178,71 +131,24 @@ pub(super) async fn build_state_incremental<'a>(
services
.rooms
.state_accessor
.state_get(
timeline_start_shortstatehash,
.state_get_shortid(
timeline_end_shortstatehash,
&StateEventType::RoomMember,
user_id.as_str(),
)
.ok()
.await
})
.collect()
.await;
if !lazy_membership_events.is_empty() {
trace!(
"syncing lazy membership events for members: {:?}",
lazy_membership_events
.iter()
.map(|pdu| pdu.state_key().unwrap())
.collect::<Vec<_>>()
);
}
return Ok(lazy_membership_events);
}
}
// lazy loading is disabled, `state` is empty.
return Ok(vec![]);
}),
)
.ignore_err()
.ready_for_each(|event_id| {
state_event_ids.insert(event_id);
})
.await;
}
/*
at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates
computing the incremental state (which may be empty).
NOTE: this code path does not use the `lazy_membership_events` parameter. any changes to membership will be included
in the incremental state. therefore, the incremental state may include "redundant" membership events,
which we do not filter out because A. the spec forbids lazy-load filtering if the timeline is `limited`,
and B. DAG splits which require sending extra membership state events are (probably) uncommon enough that
the performance penalty is acceptable.
*/
trace!(%timeline_is_linear, %timeline.limited, "computing state for incremental sync");
// fetch the shorteventids of state events in the timeline
let state_events_in_timeline: BTreeSet<ShortEventId> = services
.rooms
.short
.multi_get_or_create_shorteventid(timeline.pdus.iter().filter_map(|(_, pdu)| {
if pdu.state_key().is_some() {
Some(pdu.event_id.as_ref())
} else {
None
}
}))
.collect()
.await;
trace!("{} state events in timeline", state_events_in_timeline.len());
/*
fetch the state events which were added since the last sync.
specifically we fetch the difference between the state at the last sync and the state at the _end_
of the timeline, and then we filter out state events in the timeline itself using the shorteventids we fetched.
this is necessary to account for splits in the DAG, as explained above.
*/
let state_diff = services
// Fetch the state events added since the last sync.
services
.rooms
.short
.multi_get_eventid_from_short::<'_, OwnedEventId, _>(
@@ -252,18 +158,29 @@ pub(super) async fn build_state_incremental<'a>(
.state_added((last_sync_end_shortstatehash, timeline_end_shortstatehash))
.await?
.stream()
.ready_filter_map(|(_, shorteventid)| {
if state_events_in_timeline.contains(&shorteventid) {
None
} else {
Some(shorteventid)
}
}),
.map(at!(1)),
)
.ignore_err();
.ignore_err()
.ready_for_each(|event_id| {
state_event_ids.insert(event_id);
})
.await;
// finally, fetch the PDU contents and collect them into a vec
let state_diff_pdus = state_diff
if !use_state_after {
// If state_after isn't enabled, filter out state events which also exist
// in the timeline. If splits exist in the DAG, this may not be exactly the same
// thing as the state diff ending at the start of the timeline, but Synapse
// also does this and it's technically more useful behavior anyway.
// See: https://github.com/element-hq/synapse/issues/16941
for (_, pdu) in &timeline.pdus {
state_event_ids.remove(pdu.event_id());
}
}
// Finally, fetch the PDU contents and collect them into a vec
let state_diff_pdus = state_event_ids
.stream()
.broad_filter_map(|event_id| async move {
services
.rooms
+27 -7
View File
@@ -15,7 +15,7 @@
BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
future::ReadyEqExt,
math::{ruma_from_usize, usize_from_ruma},
stream::WidebandExt,
stream::{TryIgnore, WidebandExt},
},
warn,
};
@@ -41,6 +41,7 @@
uint,
};
use service::account_data::AnyRawAccountDataEvent;
use tokio::pin;
use super::share_encrypted_room;
use crate::{
@@ -858,12 +859,31 @@ async fn collect_e2ee<'a, Rooms>(
continue;
};
let since_shortstatehash = services
.rooms
.user
.get_token_shortstatehash(room_id, globalsince)
.await
.ok();
let since_shortstatehash = async {
pin! {
let pdus_rev = services
.rooms
.timeline
.pdus_rev(room_id, Some(PduCount::Normal(globalsince.saturating_sub(1))))
.ignore_err();
}
let (count, pdu_at_last_sync_end) = pdus_rev.next().await?;
if matches!(count, PduCount::Backfilled(_)) {
None
} else {
Some(
services
.rooms
.state_accessor
.pdu_shortstatehash(&pdu_at_last_sync_end.event_id)
.await
.expect("pdu should have a shortstatehash"),
)
}
}
.await;
let encrypted_room = services
.rooms
+1
View File
@@ -21,6 +21,7 @@ pub fn versions() -> Vec<String> {
"v1.12".to_owned(),
"v1.13".to_owned(),
"v1.14".to_owned(),
"v1.16".to_owned(),
]
}
+1 -6
View File
@@ -193,12 +193,7 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
},
Descriptor {
name: "roomsynctoken_shortstatehash",
file_shape: 3,
val_size_hint: Some(8),
block_size: 512,
compression_level: 3,
bottommost_level: Some(6),
..descriptor::SEQUENTIAL
..descriptor::DROPPED
},
Descriptor {
name: "roomuserdataid_accountdata",
+2 -46
View File
@@ -1,10 +1,10 @@
use std::sync::Arc;
use conduwuit::{Result, implement};
use database::{Database, Deserialized, Map};
use database::{Deserialized, Map};
use ruma::{RoomId, UserId};
use crate::{Dep, globals, rooms, rooms::short::ShortStateHash};
use crate::{Dep, globals};
pub struct Service {
db: Data,
@@ -12,32 +12,25 @@ pub struct Service {
}
struct Data {
db: Arc<Database>,
userroomid_notificationcount: Arc<Map>,
userroomid_highlightcount: Arc<Map>,
roomuserid_lastnotificationread: Arc<Map>,
roomsynctoken_shortstatehash: Arc<Map>,
}
struct Services {
globals: Dep<globals::Service>,
short: Dep<rooms::short::Service>,
}
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data {
db: args.db.clone(),
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
roomuserid_lastnotificationread: args.db["userroomid_highlightcount"].clone(),
roomsynctoken_shortstatehash: args.db["roomsynctoken_shortstatehash"].clone(),
},
services: Services {
globals: args.depend::<globals::Service>("globals"),
short: args.depend::<rooms::short::Service>("rooms::short"),
},
}))
}
@@ -90,40 +83,3 @@ pub async fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -
.deserialized()
.unwrap_or(0)
}
#[implement(Service)]
pub async fn associate_token_shortstatehash(
&self,
room_id: &RoomId,
token: u64,
shortstatehash: ShortStateHash,
) {
let shortroomid = self
.services
.short
.get_shortroomid(room_id)
.await
.expect("room exists");
let _cork = self.db.db.cork();
let key: &[u64] = &[shortroomid, token];
self.db
.roomsynctoken_shortstatehash
.put(key, shortstatehash);
}
#[implement(Service)]
pub async fn get_token_shortstatehash(
&self,
room_id: &RoomId,
token: u64,
) -> Result<ShortStateHash> {
let shortroomid = self.services.short.get_shortroomid(room_id).await?;
let key: &[u64] = &[shortroomid, token];
self.db
.roomsynctoken_shortstatehash
.qry(key)
.await
.deserialized()
}