Compare commits

...

53 Commits

Author SHA1 Message Date
Ginger
77ae79396f feat: Use OptimisticTransactionDB for the database engine 2025-12-16 09:49:13 -05:00
Jade Ellis
cdc53b3421 fix: Allow using LDAP passwords in UIAA
Fixes #1131

Co-authored-by: Jade Ellis <jade@ellis.link>
2025-12-16 13:55:32 +00:00
Ginger
0b667ae4fd fix(ci): Try explicitly specifying the ref for debian/fedora workflows 2025-12-15 10:21:46 -05:00
unbeatable-101
83baf9b524 Keep location of Continuwuity configuration file consitant 2025-12-13 22:51:16 +00:00
timedout
4f198fb4ef fix: Enforce limits when joining rooms 2025-12-13 22:17:47 +00:00
timedout
1631c0afa4 fix: Perform additional validation on events 2025-12-13 21:36:20 +00:00
Charlotte Hartmann Paludo
862684af28 fix: remove trailing whitespace from secrets read from secrets file 2025-12-13 16:07:51 +00:00
Ginger
7345c241a9 fix: Don't halt and catch fire on deserialization errors in MSC4133 migration 2025-12-12 11:16:52 -05:00
Ginger
6a8b988b36 fix(ci): Downgrade upload-artifact actions again to v3 this time 2025-12-10 11:33:36 -05:00
Ginger
f1d6536793 fix(ci): Downgrade upload-artifact actions to v4 2025-12-10 11:33:36 -05:00
Ginger
cf8d8e4ea6 chore: Post-rebase cleanup 2025-12-09 03:25:04 +00:00
timedout
393d341f07 perf: Throttle frequent device metadata updates & centralise site 2025-12-09 03:25:03 +00:00
timedout
ba55dffa0e perf: Don't increment the device list version when updating local info 2025-12-09 03:25:03 +00:00
timedout
f3115e14ab feat: Update device metadata upon hitting hot endpoints 2025-12-09 03:25:03 +00:00
Ginger
b3fa4705ef chore: Fix line endings 2025-12-07 15:28:19 -05:00
Ginger
53b06a7918 chore(sync/v3): Remove unused imports 2025-12-07 19:58:24 +00:00
Ginger
fafc1d3fd1 fix(sync/v3): Don't send rejected invites on initial syncs 2025-12-07 19:58:24 +00:00
Ginger
dbc74272c3 refactor(sync/v3): Extract left room timeline logic into its own function 2025-12-07 19:58:24 +00:00
Ginger
f11caac05e fix(sync/v3): Don't send dummy leaves on an initial sync 2025-12-07 19:58:24 +00:00
Ginger
e581face44 chore: Formatting 2025-12-07 19:58:24 +00:00
ginger
037ba41adb fix: Nitpicky comment reword 2025-12-07 19:58:24 +00:00
Ginger
941c8f7d52 fix: Bump max startup time to ten minutes in the systemd unit 2025-12-07 19:58:24 +00:00
Ginger
7dae118af9 chore(sync/v3): More goat sacrifices 2025-12-07 19:58:24 +00:00
Ginger
07dfc5528d refactor(sync/v3): Split load_joined_room into smaller functions 2025-12-07 19:58:24 +00:00
ginger
3f4749a796 fix: Correct error message 2025-12-07 19:58:24 +00:00
Ginger
be8d72fafc fix(sync/v3): Add a workaround for matrix-js-sdk/5071 2025-12-07 19:58:24 +00:00
Ginger
0008709481 fix(sync/v3): Stop ignoring leave cache deserialization failures 2025-12-07 19:58:24 +00:00
Ginger
ee51d4357f fix(sync/v3): Do not include the last membership event when syncing left rooms 2025-12-07 19:58:24 +00:00
Ginger
8ffc6d4f15 chore(sync/v3): Sacrifice a goat to clippy 2025-12-07 19:58:24 +00:00
Ginger
93efe89a1f fix(sync/v3): Cache shortstatehashes to speed up migration 2025-12-07 19:58:24 +00:00
Ginger
16f37d21ff fix(sync/v3): Implement a migration for the userroomid_leftstate table 2025-12-07 19:58:24 +00:00
Ginger
800ac8d1f1 fix(sync/v3): Fix invite filtering for federated invites 2025-12-07 19:58:24 +00:00
Ginger
872f5bf077 feat(sync/v3): Remove TL size config option in favor of using the sync filter 2025-12-07 19:58:24 +00:00
Ginger
992217d644 chore(sync/v3): Fix clippy lints 2025-12-07 19:58:24 +00:00
Ginger
4fb4397a9f fix(sync/v3): Remove mysterious membership event manipulation code 2025-12-07 19:58:24 +00:00
Ginger
61b6947e88 fix(sync/v3): Properly sync room heroes 2025-12-07 19:58:24 +00:00
Ginger
876d3faec4 chore(sync/v3): Use "build_*" terminology instead of "calculate_*" 2025-12-07 19:58:24 +00:00
Ginger
9cc0cc69f7 chore(sync/v3): Use more descriptive names for SyncContext properties 2025-12-07 19:58:24 +00:00
Ginger
5513bb4dff chore: Remove unneeded comment 2025-12-07 19:58:24 +00:00
Ginger
693e327004 fix: Use prepare_lazily_loaded_members for joined rooms
Also, don't take read receipts into consideration for lazy loading.
Synapse doesn't do this and they're making initial syncs very large.
2025-12-07 19:58:24 +00:00
Ginger
3e6571a2b8 chore: Clippy fixes 2025-12-07 19:58:24 +00:00
Jade Ellis
f0f10f8f3e feat: Typing notifications in simplified sliding sync
What's missing? Being able to use separate rooms & lists for typing
indicators.
At the moment, we use the same ones as we use for the timeline, as
todo_rooms is quite intertwined. We need to disentangle this to get that
functionality, although I'm not sure if clients use it.
2025-12-07 19:58:24 +00:00
Ginger
a4f2b55a8a feat: Add a config option to change the max TL size for legacy sync 2025-12-07 19:58:24 +00:00
Ginger
213a361c53 fix: Set limited to true for newly joined rooms again 2025-12-07 19:58:24 +00:00
Ginger
1c21e4af6e fix: Properly sync left rooms
- Remove most usages of `update_membership` in favor
  of directly calling the `mark_as_*` functions
- Store the leave membership event as the value in the
  `userroomid_leftstate` table
- Use the `userroomid_leftstate` table to synchronize the
  timeline and state for left rooms if possible
2025-12-07 19:58:24 +00:00
Ginger
fceaaedc04 fix: Properly sync newly joined rooms 2025-12-07 19:58:24 +00:00
Ginger
0eff173c0b fix(sync/v3): Further cleanup + improve incremental sync consistency 2025-12-07 19:58:24 +00:00
Ginger
72bf8e5927 fix: Correctly send limited timelines again 2025-12-07 19:58:24 +00:00
Ginger
3491f653a5 refactor: Split sync v3 into multiple files 2025-12-07 19:58:24 +00:00
Ginger
e820dd7aed feat: Drop support for MSC3575 (legacy sliding sync) 2025-12-07 19:58:24 +00:00
Ginger
c92b7239a8 chore: Clippy fixes 2025-12-07 19:58:24 +00:00
Ginger
2940bc69c1 fix(sync/v3): Cleanup part 1: mostly fix redundant data in state 2025-12-07 19:58:24 +00:00
Jade
502919b248 chore: Tell continuwuity.org to use my livekit instance 2025-12-04 14:23:02 +00:00
61 changed files with 2775 additions and 2571 deletions

View File

@@ -54,7 +54,7 @@ runs:
run: mv /tmp/binaries/sbin/conduwuit /tmp/binaries/conduwuit${{ inputs.cpu_suffix }}-${{ inputs.slug }}${{ inputs.artifact_suffix }}
- name: Upload binary artifact
uses: forgejo/upload-artifact@v4
uses: forgejo/upload-artifact@v3
with:
name: conduwuit${{ inputs.cpu_suffix }}-${{ inputs.slug }}${{ inputs.artifact_suffix }}
path: /tmp/binaries/conduwuit${{ inputs.cpu_suffix }}-${{ inputs.slug }}${{ inputs.artifact_suffix }}
@@ -62,7 +62,7 @@ runs:
- name: Upload digest
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: forgejo/upload-artifact@v4
uses: forgejo/upload-artifact@v3
with:
name: digests${{ inputs.digest_suffix }}-${{ inputs.slug }}${{ inputs.cpu_suffix }}
path: /tmp/digests/*

View File

@@ -35,6 +35,7 @@ jobs:
uses: actions/checkout@v6
with:
fetch-depth: 0
ref: ${{ github.ref_name }}
- name: Cache Cargo registry
uses: actions/cache@v4
@@ -126,7 +127,7 @@ jobs:
[ -f /etc/conduwuit/conduwuit.toml ] && echo "✅ Config file installed"
- name: Upload deb artifact
uses: actions/upload-artifact@v5
uses: actions/upload-artifact@v3
with:
name: continuwuity-${{ steps.debian-version.outputs.distribution }}
path: ${{ steps.cargo-deb.outputs.path }}

View File

@@ -33,6 +33,7 @@ jobs:
uses: actions/checkout@v6
with:
fetch-depth: 0
ref: ${{ github.ref_name }}
- name: Cache DNF packages
@@ -238,13 +239,13 @@ jobs:
cp $BIN_RPM upload-bin/
- name: Upload binary RPM
uses: actions/upload-artifact@v5
uses: actions/upload-artifact@v3
with:
name: continuwuity
path: upload-bin/
- name: Upload debug RPM artifact
uses: actions/upload-artifact@v5
uses: actions/upload-artifact@v3
with:
name: continuwuity-debug
path: artifacts/*debuginfo*.rpm

View File

@@ -109,7 +109,7 @@ jobs:
cat ./element-web/webapp/config.json
- name: 📤 Upload Artifact
uses: forgejo/upload-artifact@v4
uses: forgejo/upload-artifact@v3
with:
name: element-web
path: ./element-web/webapp/

View File

@@ -134,7 +134,7 @@ ### Example systemd Unit File
## Creating the Continuwuity configuration file
Now you need to create the Continuwuity configuration file in
`/etc/continuwuity/continuwuity.toml`. You can find an example configuration at
`/etc/conduwuit/conduwuit.toml`. You can find an example configuration at
[conduwuit-example.toml](../reference/config.mdx).
**Please take a moment to read the config. You need to change at least the

View File

@@ -1 +1 @@
{"m.homeserver":{"base_url": "https://matrix.continuwuity.org"},"org.matrix.msc3575.proxy":{"url": "https://matrix.continuwuity.org"}}
{"m.homeserver":{"base_url": "https://matrix.continuwuity.org"},"org.matrix.msc3575.proxy":{"url": "https://matrix.continuwuity.org"},"org.matrix.msc4143.rtc_foci":[{"type":"livekit","livekit_service_url":"https://livekit.ellis.link"}]}

View File

@@ -63,7 +63,7 @@ Restart=on-failure
RestartSec=5
TimeoutStopSec=4m
TimeoutStartSec=4m
TimeoutStartSec=10m
StartLimitInterval=1m
StartLimitBurst=5

View File

@@ -41,7 +41,7 @@ async fn changes_since(
let results: Vec<_> = self
.services
.account_data
.changes_since(room_id.as_deref(), &user_id, since, None)
.changes_since(room_id.as_deref(), &user_id, Some(since), None)
.collect()
.await;
let query_time = timer.elapsed();

View File

@@ -389,7 +389,7 @@ pub(crate) async fn get_key_changes_route(
device_list_updates.extend(
services
.users
.keys_changed(sender_user, from, Some(to))
.keys_changed(sender_user, Some(from), Some(to))
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await,
@@ -401,7 +401,7 @@ pub(crate) async fn get_key_changes_route(
device_list_updates.extend(
services
.users
.room_keys_changed(room_id, from, Some(to))
.room_keys_changed(room_id, Some(from), Some(to))
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()

View File

@@ -44,6 +44,7 @@
rooms::{
state::RoomMutexGuard,
state_compressor::{CompressedState, HashSetCompressStateEvent},
timeline::pdu_fits,
},
};
@@ -573,6 +574,13 @@ async fn join_room_by_id_helper_remote(
return state;
},
};
if !pdu_fits(&mut value.clone()) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from room join because \
it exceeds 65535 bytes or is otherwise too large."
);
return state;
}
services.rooms.outlier.add_pdu_outlier(&event_id, &value);
if let Some(state_key) = &pdu.state_key {
let shortstatekey = services

View File

@@ -5,7 +5,7 @@
use conduwuit::{
Err, Result, debug, debug_info, debug_warn, err, info,
matrix::{
event::{Event, gen_event_id},
event::gen_event_id,
pdu::{PduBuilder, PduEvent},
},
result::FlatOk,
@@ -458,7 +458,7 @@ async fn knock_room_helper_local(
.await,
};
let send_knock_response = services
services
.sending
.send_federation_request(&remote_server, send_knock_request)
.await?;
@@ -477,20 +477,14 @@ async fn knock_room_helper_local(
.map_err(|e| err!(BadServerResponse("Invalid knock event PDU: {e:?}")))?;
info!("Updating membership locally to knock state with provided stripped state events");
// TODO: this call does not appear to do anything because `update_membership`
// doesn't call `mark_as_knock`. investigate further, ideally with the aim of
// removing this call entirely -- Ginger thinks `update_membership` should only
// be called from `force_state` and `append_pdu`.
services
.rooms
.state_cache
.update_membership(
room_id,
sender_user,
parsed_knock_pdu
.get_content::<RoomMemberEventContent>()
.expect("we just created this"),
sender_user,
Some(send_knock_response.knock_room_state),
None,
false,
)
.update_membership(room_id, sender_user, &parsed_knock_pdu, false)
.await?;
info!("Appending room knock event locally");
@@ -677,20 +671,11 @@ async fn knock_room_helper_remote(
.await?;
info!("Updating membership locally to knock state with provided stripped state events");
// TODO: see TODO on the other call to `update_membership`
services
.rooms
.state_cache
.update_membership(
room_id,
sender_user,
parsed_knock_pdu
.get_content::<RoomMemberEventContent>()
.expect("we just created this"),
sender_user,
Some(send_knock_response.knock_room_state),
None,
false,
)
.update_membership(room_id, sender_user, &parsed_knock_pdu, false)
.await?;
info!("Appending room knock event locally");

View File

@@ -2,12 +2,12 @@
use axum::extract::State;
use conduwuit::{
Err, Result, debug_info, debug_warn, err,
Err, Pdu, Result, debug_info, debug_warn, err,
matrix::{event::gen_event_id, pdu::PduBuilder},
utils::{self, FutureBoolExt, future::ReadyEqExt},
warn,
};
use futures::{FutureExt, StreamExt, TryFutureExt, pin_mut};
use futures::{FutureExt, StreamExt, pin_mut};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, RoomId, RoomVersionId, UserId,
api::{
@@ -81,42 +81,9 @@ pub async fn leave_room(
room_id: &RoomId,
reason: Option<String>,
) -> Result {
let default_member_content = RoomMemberEventContent {
membership: MembershipState::Leave,
reason: reason.clone(),
join_authorized_via_users_server: None,
is_direct: None,
avatar_url: None,
displayname: None,
third_party_invite: None,
blurhash: None,
redact_events: None,
};
let is_banned = services.rooms.metadata.is_banned(room_id);
let is_disabled = services.rooms.metadata.is_disabled(room_id);
pin_mut!(is_banned, is_disabled);
if is_banned.or(is_disabled).await {
// the room is banned/disabled, the room must be rejected locally since we
// cant/dont want to federate with this server
services
.rooms
.state_cache
.update_membership(
room_id,
user_id,
default_member_content,
user_id,
None,
None,
true,
)
.await?;
return Ok(());
}
let dont_have_room = services
.rooms
.state_cache
@@ -129,44 +96,41 @@ pub async fn leave_room(
.is_knocked(user_id, room_id)
.eq(&false);
// Ask a remote server if we don't have this room and are not knocking on it
if dont_have_room.and(not_knocked).await {
if let Err(e) =
remote_leave_room(services, user_id, room_id, reason.clone(), HashSet::new())
.boxed()
.await
{
warn!(%user_id, "Failed to leave room {room_id} remotely: {e}");
// Don't tell the client about this error
}
pin_mut!(is_banned, is_disabled);
let last_state = services
.rooms
.state_cache
.invite_state(user_id, room_id)
.or_else(|_| services.rooms.state_cache.knock_state(user_id, room_id))
.or_else(|_| services.rooms.state_cache.left_state(user_id, room_id))
/*
there are three possible cases when leaving a room:
1. the room is banned or disabled, so we're not federating with it.
2. nobody on the homeserver is in the room, which can happen if the user is rejecting an invite
to a room that we don't have any members in.
3. someone else on the homeserver is in the room. in this case we can leave like normal by sending a PDU over federation.
in cases 1 and 2, we have to update the state cache using `mark_as_left` directly.
otherwise `build_and_append_pdu` will take care of updating the state cache for us.
*/
// `leave_pdu` is the outlier `m.room.member` event which will be synced to the
// user. if it's None the sync handler will create a dummy PDU.
let leave_pdu = if is_banned.or(is_disabled).await {
// case 1: the room is banned/disabled. we don't want to federate with another
// server to leave, so we can't create an outlier PDU.
None
} else if dont_have_room.and(not_knocked).await {
// case 2: ask a remote server to assist us with leaving
// we always mark the room as left locally, regardless of if the federated leave
// failed
remote_leave_room(services, user_id, room_id, reason.clone(), HashSet::new())
.await
.ok();
// We always drop the invite, we can't rely on other servers
services
.rooms
.state_cache
.update_membership(
room_id,
user_id,
default_member_content,
user_id,
last_state,
None,
true,
)
.await?;
.inspect_err(|err| {
warn!(%user_id, "Failed to leave room {room_id} remotely: {err}");
})
.ok()
} else {
// case 3: we can leave by sending a PDU.
let state_lock = services.rooms.state.mutex.lock(room_id).await;
let Ok(event) = services
let user_member_event_content = services
.rooms
.state_accessor
.room_state_get_content::<RoomMemberEventContent>(
@@ -174,44 +138,61 @@ pub async fn leave_room(
&StateEventType::RoomMember,
user_id.as_str(),
)
.await
else {
debug_warn!(
"Trying to leave a room you are not a member of, marking room as left locally."
);
.await;
return services
.rooms
.state_cache
.update_membership(
room_id,
user_id,
default_member_content,
user_id,
None,
None,
true,
)
.await;
};
match user_member_event_content {
| Ok(content) => {
services
.rooms
.timeline
.build_and_append_pdu(
PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
membership: MembershipState::Leave,
reason,
join_authorized_via_users_server: None,
is_direct: None,
..content
}),
user_id,
Some(room_id),
&state_lock,
)
.await?;
services
.rooms
.timeline
.build_and_append_pdu(
PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
membership: MembershipState::Leave,
reason,
join_authorized_via_users_server: None,
is_direct: None,
..event
}),
user_id,
Some(room_id),
&state_lock,
)
.await?;
}
// `build_and_append_pdu` calls `mark_as_left` internally, so we return early.
return Ok(());
},
| Err(_) => {
// an exception to case 3 is if the user isn't even in the room they're trying
// to leave. this can happen if the client's caching is wrong.
debug_warn!(
"Trying to leave a room you are not a member of, marking room as left \
locally."
);
// return the existing leave state, if one exists. `mark_as_left` will then
// update the `roomuserid_leftcount` table, making the leave come down sync
// again.
services
.rooms
.state_cache
.left_state(user_id, room_id)
.await?
},
}
};
services
.rooms
.state_cache
.mark_as_left(user_id, room_id, leave_pdu)
.await;
services
.rooms
.state_cache
.update_joined_count(room_id)
.await;
Ok(())
}
@@ -222,7 +203,7 @@ pub async fn remote_leave_room<S: ::std::hash::BuildHasher>(
room_id: &RoomId,
reason: Option<String>,
mut servers: HashSet<OwnedServerName, S>,
) -> Result<()> {
) -> Result<Pdu> {
let mut make_leave_response_and_server =
Err!(BadServerResponse("No remote server available to assist in leaving {room_id}."));
@@ -393,7 +374,7 @@ pub async fn remote_leave_room<S: ::std::hash::BuildHasher>(
&remote_server,
federation::membership::create_leave_event::v2::Request {
room_id: room_id.to_owned(),
event_id,
event_id: event_id.clone(),
pdu: services
.sending
.convert_to_outgoing_federation_event(leave_event.clone())
@@ -402,5 +383,14 @@ pub async fn remote_leave_room<S: ::std::hash::BuildHasher>(
)
.await?;
Ok(())
services
.rooms
.outlier
.add_pdu_outlier(&event_id, &leave_event);
let leave_pdu = Pdu::from_id_val(&event_id, leave_event).map_err(|e| {
err!(BadServerResponse("Invalid leave PDU received during federated leave: {e:?}"))
})?;
Ok(leave_pdu)
}

View File

@@ -1,4 +1,5 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, at,
matrix::{
@@ -16,7 +17,7 @@
Services,
rooms::{
lazy_loading,
lazy_loading::{Options, Witness},
lazy_loading::{MemberSet, Options},
timeline::PdusIterItem,
},
};
@@ -70,6 +71,7 @@
/// where the user was joined, depending on `history_visibility`)
pub(crate) async fn get_message_events_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<get_message_events::v3::Request>,
) -> Result<get_message_events::v3::Response> {
debug_assert!(IGNORED_MESSAGE_TYPES.is_sorted(), "IGNORED_MESSAGE_TYPES is not sorted");
@@ -78,6 +80,11 @@ pub(crate) async fn get_message_events_route(
let room_id = &body.room_id;
let filter = &body.filter;
services
.users
.update_device_last_seen(sender_user, sender_device, client_ip)
.await;
if !services.rooms.metadata.exists(room_id).await {
return Err!(Request(Forbidden("Room does not exist to this server")));
}
@@ -162,7 +169,7 @@ pub(crate) async fn get_message_events_route(
let state = witness
.map(Option::into_iter)
.map(|option| option.flat_map(Witness::into_iter))
.map(|option| option.flat_map(MemberSet::into_iter))
.map(IterStream::stream)
.into_stream()
.flatten()
@@ -192,7 +199,7 @@ pub(crate) async fn lazy_loading_witness<'a, I>(
services: &Services,
lazy_loading_context: &lazy_loading::Context<'_>,
events: I,
) -> Witness
) -> MemberSet
where
I: Iterator<Item = &'a PdusIterItem> + Clone + Send,
{
@@ -213,10 +220,10 @@ pub(crate) async fn lazy_loading_witness<'a, I>(
let receipts = services
.rooms
.read_receipt
.readreceipts_since(lazy_loading_context.room_id, oldest.into_unsigned());
.readreceipts_since(lazy_loading_context.room_id, Some(oldest.into_unsigned()));
pin_mut!(receipts);
let witness: Witness = events
let witness: MemberSet = events
.stream()
.map(ref_at!(1))
.map(Event::sender)
@@ -224,7 +231,7 @@ pub(crate) async fn lazy_loading_witness<'a, I>(
.chain(
receipts
.ready_take_while(|(_, c, _)| *c <= newest.into_unsigned())
.map(|(user_id, ..)| user_id.to_owned()),
.map(|(user_id, ..)| user_id),
)
.collect()
.await;
@@ -232,7 +239,7 @@ pub(crate) async fn lazy_loading_witness<'a, I>(
services
.rooms
.lazy_loading
.witness_retain(witness, lazy_loading_context)
.retain_lazy_members(witness, lazy_loading_context)
.await
}

View File

@@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, PduCount, Result, err};
use ruma::{
MilliSecondsSinceUnixEpoch,
@@ -118,9 +119,14 @@ pub(crate) async fn set_read_marker_route(
/// Sets private read marker and public read receipt EDU.
pub(crate) async fn create_receipt_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<create_receipt::v3::Request>,
) -> Result<create_receipt::v3::Response> {
let sender_user = body.sender_user();
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
.await;
if matches!(
&body.receipt_type,

View File

@@ -1,4 +1,5 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, matrix::pdu::PduBuilder};
use ruma::{
api::client::redact::redact_event, events::room::redaction::RoomRedactionEventContent,
@@ -13,9 +14,14 @@
/// - TODO: Handle txn id
pub(crate) async fn redact_event_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<redact_event::v3::Request>,
) -> Result<redact_event::v3::Response> {
let sender_user = body.sender_user();
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
.await;
let body = &body.body;
if services.users.is_suspended(sender_user).await? {
// TODO: Users can redact their own messages while suspended

View File

@@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, err, matrix::pdu::PduBuilder, utils};
use ruma::{api::client::message::send_message_event, events::MessageLikeEventType};
use serde_json::from_str;
@@ -18,6 +19,7 @@
/// allowed
pub(crate) async fn send_message_event_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<send_message_event::v3::Request>,
) -> Result<send_message_event::v3::Response> {
let sender_user = body.sender_user();
@@ -27,6 +29,11 @@ pub(crate) async fn send_message_event_route(
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
.await;
// Forbid m.room.encrypted if encryption is disabled
if MessageLikeEventType::RoomEncrypted == body.event_type && !services.config.allow_encryption
{

View File

@@ -1,4 +1,5 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, err,
matrix::{Event, pdu::PduBuilder},
@@ -7,7 +8,7 @@
use conduwuit_service::Services;
use futures::{FutureExt, TryStreamExt};
use ruma::{
OwnedEventId, RoomId, UserId,
MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
api::client::state::{get_state_events, get_state_events_for_key, send_state_event},
events::{
AnyStateEventContent, StateEventType,
@@ -30,9 +31,14 @@
/// Sends a state event into the room.
pub(crate) async fn send_state_event_for_key_route(
State(services): State<crate::State>,
InsecureClientIp(ip): InsecureClientIp,
body: Ruma<send_state_event::v3::Request>,
) -> Result<send_state_event::v3::Response> {
let sender_user = body.sender_user();
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), ip)
.await;
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
@@ -61,9 +67,10 @@ pub(crate) async fn send_state_event_for_key_route(
/// Sends a state event into the room.
pub(crate) async fn send_state_event_for_empty_key_route(
State(services): State<crate::State>,
InsecureClientIp(ip): InsecureClientIp,
body: Ruma<send_state_event::v3::Request>,
) -> Result<RumaResponse<send_state_event::v3::Response>> {
send_state_event_for_key_route(State(services), body)
send_state_event_for_key_route(State(services), InsecureClientIp(ip), body)
.boxed()
.await
.map(RumaResponse)
@@ -185,7 +192,7 @@ async fn send_state_event_for_key_helper(
event_type: &StateEventType,
json: &Raw<AnyStateEventContent>,
state_key: &str,
timestamp: Option<ruma::MilliSecondsSinceUnixEpoch>,
timestamp: Option<MilliSecondsSinceUnixEpoch>,
) -> Result<OwnedEventId> {
allowed_to_send_state_event(services, room_id, event_type, state_key, json).await?;
let state_lock = services.rooms.state.mutex.lock(room_id).await;

View File

@@ -1,65 +1,123 @@
mod v3;
mod v4;
mod v5;
use std::collections::VecDeque;
use conduwuit::{
Error, PduCount, Result,
Event, PduCount, Result, err,
matrix::pdu::PduEvent,
ref_at, trace,
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
};
use conduwuit_service::Services;
use futures::{StreamExt, pin_mut};
use futures::StreamExt;
use ruma::{
RoomId, UserId,
OwnedUserId, RoomId, UserId,
events::TimelineEventType::{
self, Beacon, CallInvite, PollStart, RoomEncrypted, RoomMessage, Sticker,
},
};
pub(crate) use self::{
v3::sync_events_route, v4::sync_events_v4_route, v5::sync_events_v5_route,
};
pub(crate) use self::{v3::sync_events_route, v5::sync_events_v5_route};
pub(crate) const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] =
&[CallInvite, PollStart, Beacon, RoomEncrypted, RoomMessage, Sticker];
#[derive(Default)]
pub(crate) struct TimelinePdus {
pub pdus: VecDeque<(PduCount, PduEvent)>,
pub limited: bool,
}
impl TimelinePdus {
fn senders(&self) -> impl Iterator<Item = OwnedUserId> {
self.pdus
.iter()
.map(ref_at!(1))
.map(Event::sender)
.map(Into::into)
}
}
/// Load up to `limit` PDUs in the range (starting_count, ending_count].
async fn load_timeline(
services: &Services,
sender_user: &UserId,
room_id: &RoomId,
roomsincecount: PduCount,
next_batch: Option<PduCount>,
starting_count: Option<PduCount>,
ending_count: Option<PduCount>,
limit: usize,
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
let last_timeline_count = services
.rooms
.timeline
.last_timeline_count(Some(sender_user), room_id)
.await?;
) -> Result<TimelinePdus> {
let mut pdu_stream = match starting_count {
| Some(starting_count) => {
let last_timeline_count = services
.rooms
.timeline
.last_timeline_count(Some(sender_user), room_id)
.await
.map_err(|err| {
err!(Database(warn!("Failed to fetch end of room timeline: {}", err)))
})?;
if last_timeline_count <= roomsincecount {
return Ok((Vec::new(), false));
}
if last_timeline_count <= starting_count {
// no messages have been sent in this room since `starting_count`
return Ok(TimelinePdus::default());
}
let non_timeline_pdus = services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, None)
.ignore_err()
.ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
.ready_take_while(|&(pducount, _)| pducount > roomsincecount);
// for incremental sync, stream from the DB all PDUs which were sent after
// `starting_count` but before `ending_count`, including `ending_count` but
// not `starting_count`. this code is pretty similar to the initial sync
// branch, they're separate to allow for future optimization
services
.rooms
.timeline
.pdus_rev(
Some(sender_user),
room_id,
ending_count.map(|count| count.saturating_add(1)),
)
.ignore_err()
.ready_take_while(move |&(pducount, _)| pducount > starting_count)
.boxed()
},
| None => {
// For initial sync, stream from the DB all PDUs before and including
// `ending_count` in reverse order
services
.rooms
.timeline
.pdus_rev(
Some(sender_user),
room_id,
ending_count.map(|count| count.saturating_add(1)),
)
.ignore_err()
.boxed()
},
};
// Take the last events for the timeline
pin_mut!(non_timeline_pdus);
let timeline_pdus: Vec<_> = non_timeline_pdus.by_ref().take(limit).collect().await;
// Return at most `limit` PDUs from the stream
let pdus = pdu_stream
.by_ref()
.take(limit)
.ready_fold(VecDeque::with_capacity(limit), |mut pdus, item| {
pdus.push_front(item);
pdus
})
.await;
let timeline_pdus: Vec<_> = timeline_pdus.into_iter().rev().collect();
// The timeline is limited if there are still more PDUs in the stream
let limited = pdu_stream.next().await.is_some();
// They /sync response doesn't always return all messages, so we say the output
// is limited unless there are events in non_timeline_pdus
let limited = non_timeline_pdus.next().await.is_some();
trace!(
"syncing {:?} timeline pdus from {:?} to {:?} (limited = {:?})",
pdus.len(),
starting_count,
ending_count,
limited,
);
Ok((timeline_pdus, limited))
Ok(TimelinePdus { pdus, limited })
}
async fn share_encrypted_room(

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,852 @@
use std::collections::{BTreeMap, HashSet};
use conduwuit::{
Result, at, debug_warn, err, extract_variant,
matrix::{
Event,
pdu::{PduCount, PduEvent},
},
trace,
utils::{
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
math::ruma_from_u64,
stream::{TryIgnore, WidebandExt},
},
warn,
};
use conduwuit_service::Services;
use futures::{
FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join, join3, join4, try_join, try_join3},
};
use ruma::{
OwnedRoomId, OwnedUserId, RoomId, UserId,
api::client::sync::sync_events::{
UnreadNotificationsCount,
v3::{Ephemeral, JoinedRoom, RoomAccountData, RoomSummary, State as RoomState, Timeline},
},
events::{
AnyRawAccountDataEvent, StateEventType,
TimelineEventType::*,
room::member::{MembershipState, RoomMemberEventContent},
},
serde::Raw,
uint,
};
use service::rooms::short::ShortStateHash;
use super::{load_timeline, share_encrypted_room};
use crate::client::{
TimelinePdus, ignored_filter,
sync::v3::{
DEFAULT_TIMELINE_LIMIT, DeviceListUpdates, SyncContext, prepare_lazily_loaded_members,
state::{build_state_incremental, build_state_initial},
},
};
/// Generate the sync response for a room the user is joined to.
#[tracing::instrument(
name = "joined",
level = "debug",
skip_all,
fields(
room_id = ?room_id,
syncing_user = ?sync_context.syncing_user,
),
)]
pub(super) async fn load_joined_room(
services: &Services,
sync_context: SyncContext<'_>,
ref room_id: OwnedRoomId,
) -> Result<(JoinedRoom, DeviceListUpdates)> {
/*
Building a sync response involves many steps which all depend on each other.
To parallelize the process as much as possible, each step is divided into its own function,
and `join*` functions are used to perform steps in parallel which do not depend on each other.
*/
let (
account_data,
ephemeral,
StateAndTimeline {
state_events,
timeline,
summary,
notification_counts,
device_list_updates,
},
) = try_join3(
build_account_data(services, sync_context, room_id),
build_ephemeral(services, sync_context, room_id),
build_state_and_timeline(services, sync_context, room_id),
)
.boxed()
.await?;
if !timeline.is_empty() || !state_events.is_empty() {
trace!(
"syncing {} timeline events (limited = {}) and {} state events",
timeline.events.len(),
timeline.limited,
state_events.len()
);
}
let joined_room = JoinedRoom {
account_data,
summary: summary.unwrap_or_default(),
unread_notifications: notification_counts.unwrap_or_default(),
timeline,
state: RoomState {
events: state_events.into_iter().map(Event::into_format).collect(),
},
ephemeral,
unread_thread_notifications: BTreeMap::new(),
};
Ok((joined_room, device_list_updates))
}
/// Collect changes to the syncing user's account data events.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_account_data(
services: &Services,
SyncContext {
syncing_user,
last_sync_end_count,
current_count,
..
}: SyncContext<'_>,
room_id: &RoomId,
) -> Result<RoomAccountData> {
let account_data_changes = services
.account_data
.changes_since(Some(room_id), syncing_user, last_sync_end_count, Some(current_count))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await;
Ok(RoomAccountData { events: account_data_changes })
}
/// Collect new ephemeral events.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_ephemeral(
services: &Services,
SyncContext { syncing_user, last_sync_end_count, .. }: SyncContext<'_>,
room_id: &RoomId,
) -> Result<Ephemeral> {
// note: some of the futures below are boxed. this is because, without the box,
// rustc produces over thirty inscrutable errors in `mod.rs` at the call-site
// of `load_joined_room`. I don't know why boxing them fixes this -- it seems
// to be related to the async closures and borrowing from the sync context.
// collect updates to read receipts
let receipt_events = services
.rooms
.read_receipt
.readreceipts_since(room_id, last_sync_end_count)
.filter_map(async |(read_user, _, edu)| {
let is_ignored = services
.users
.user_is_ignored(&read_user, syncing_user)
.await;
// filter out read receipts for ignored users
is_ignored.or_some(edu)
})
.collect::<Vec<_>>()
.boxed();
// collect the updated list of typing users, if it's changed
let typing_event = async {
let should_send_typing_event = match last_sync_end_count {
| Some(last_sync_end_count) => {
match services.rooms.typing.last_typing_update(room_id).await {
| Ok(last_typing_update) => {
// update the typing list if the users typing have changed since the last
// sync
last_typing_update > last_sync_end_count
},
| Err(err) => {
warn!("Error checking last typing update: {}", err);
return None;
},
}
},
// always update the typing list on an initial sync
| None => true,
};
if should_send_typing_event {
let event = services
.rooms
.typing
.typings_event_for_user(room_id, syncing_user)
.await;
if let Ok(event) = event {
return Some(
Raw::new(&event)
.expect("typing event should be valid")
.cast(),
);
}
}
None
};
// collect the syncing user's private-read marker, if it's changed
let private_read_event = async {
let should_send_private_read = match last_sync_end_count {
| Some(last_sync_end_count) => {
let last_privateread_update = services
.rooms
.read_receipt
.last_privateread_update(syncing_user, room_id)
.await;
// update the marker if it's changed since the last sync
last_privateread_update > last_sync_end_count
},
// always update the marker on an initial sync
| None => true,
};
if should_send_private_read {
services
.rooms
.read_receipt
.private_read_get(room_id, syncing_user)
.await
.ok()
} else {
None
}
};
let (receipt_events, typing_event, private_read_event) =
join3(receipt_events, typing_event, private_read_event).await;
let mut edus = receipt_events;
edus.extend(typing_event);
edus.extend(private_read_event);
Ok(Ephemeral { events: edus })
}
/// A struct to hold the state events, timeline, and other data which is
/// computed from them.
struct StateAndTimeline {
state_events: Vec<PduEvent>,
timeline: Timeline,
summary: Option<RoomSummary>,
notification_counts: Option<UnreadNotificationsCount>,
device_list_updates: DeviceListUpdates,
}
/// Compute changes to the room's state and timeline.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_state_and_timeline(
services: &Services,
sync_context: SyncContext<'_>,
room_id: &RoomId,
) -> Result<StateAndTimeline> {
let (shortstatehashes, timeline) = try_join(
fetch_shortstatehashes(services, sync_context, room_id),
build_timeline(services, sync_context, room_id),
)
.await?;
let (state_events, notification_counts, joined_since_last_sync) = try_join3(
build_state_events(services, sync_context, room_id, shortstatehashes, &timeline),
build_notification_counts(services, sync_context, room_id, &timeline),
check_joined_since_last_sync(services, shortstatehashes, sync_context),
)
.await?;
// the timeline should always include at least one PDU if the syncing user
// joined since the last sync, that being the syncing user's join event. if
// it's empty something is wrong.
if joined_since_last_sync && timeline.pdus.is_empty() {
warn!("timeline for newly joined room is empty");
}
let (summary, device_list_updates) = try_join(
build_room_summary(
services,
sync_context,
room_id,
shortstatehashes,
&timeline,
&state_events,
joined_since_last_sync,
),
build_device_list_updates(
services,
sync_context,
room_id,
shortstatehashes,
&state_events,
joined_since_last_sync,
),
)
.await?;
// the token which may be passed to the messages endpoint to backfill room
// history
let prev_batch = timeline.pdus.front().map(at!(0));
// note: we always indicate a limited timeline if the syncing user just joined
// the room, to indicate to the client that it should request backfill (and to
// copy Synapse's behavior). for federated room joins, the `timeline` will
// usually only include the syncing user's join event.
let limited = timeline.limited || joined_since_last_sync;
// filter out ignored events from the timeline and convert the PDUs into Ruma's
// AnySyncTimelineEvent type
let filtered_timeline = timeline
.pdus
.into_iter()
.stream()
.wide_filter_map(|item| ignored_filter(services, item, sync_context.syncing_user))
.map(at!(1))
.map(Event::into_format)
.collect::<Vec<_>>()
.await;
Ok(StateAndTimeline {
state_events,
timeline: Timeline {
limited,
prev_batch: prev_batch.as_ref().map(ToString::to_string),
events: filtered_timeline,
},
summary,
notification_counts,
device_list_updates,
})
}
/// Shortstatehashes necessary to compute what state events to sync.
#[derive(Clone, Copy)]
struct ShortStateHashes {
/// The current state of the syncing room.
current_shortstatehash: ShortStateHash,
/// The state of the syncing room at the end of the last sync.
last_sync_end_shortstatehash: Option<ShortStateHash>,
}
/// Fetch the current_shortstatehash and last_sync_end_shortstatehash.
#[tracing::instrument(level = "debug", skip_all)]
async fn fetch_shortstatehashes(
services: &Services,
SyncContext { last_sync_end_count, current_count, .. }: SyncContext<'_>,
room_id: &RoomId,
) -> Result<ShortStateHashes> {
// the room state currently.
// TODO: this should be the room state as of `current_count`, but there's no way
// to get that right now.
let current_shortstatehash = services
.rooms
.state
.get_room_shortstatehash(room_id)
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
// the room state as of the end of the last sync.
// this will be None if we are doing an initial sync or if we just joined this
// room.
let last_sync_end_shortstatehash =
OptionFuture::from(last_sync_end_count.map(|last_sync_end_count| {
// look up the shortstatehash saved by the last sync's call to
// `associate_token_shortstatehash`
services
.rooms
.user
.get_token_shortstatehash(room_id, last_sync_end_count)
.inspect_err(move |_| {
debug_warn!(
token = last_sync_end_count,
"Room has no shortstatehash for this token"
);
})
.ok()
}))
.map(Option::flatten)
.map(Ok);
let (current_shortstatehash, last_sync_end_shortstatehash) =
try_join(current_shortstatehash, last_sync_end_shortstatehash).await?;
/*
associate the `current_count` with the `current_shortstatehash`, so we can
use it on the next sync as the `last_sync_end_shortstatehash`.
TODO: the table written to by this call grows extremely fast, gaining one new entry for each
joined room on _every single sync request_. we need to find a better way to remember the shortstatehash
between syncs.
*/
services
.rooms
.user
.associate_token_shortstatehash(room_id, current_count, current_shortstatehash)
.await;
Ok(ShortStateHashes {
current_shortstatehash,
last_sync_end_shortstatehash,
})
}
/// Fetch recent timeline events.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_timeline(
services: &Services,
sync_context: SyncContext<'_>,
room_id: &RoomId,
) -> Result<TimelinePdus> {
let SyncContext {
syncing_user,
last_sync_end_count,
current_count,
filter,
..
} = sync_context;
/*
determine the maximum number of events to return in this sync.
if the sync filter specifies a limit, that will be used, otherwise
`DEFAULT_TIMELINE_LIMIT` will be used. `DEFAULT_TIMELINE_LIMIT` will also be
used if the limit is somehow greater than usize::MAX.
*/
let timeline_limit = filter
.room
.timeline
.limit
.and_then(|limit| limit.try_into().ok())
.unwrap_or(DEFAULT_TIMELINE_LIMIT);
load_timeline(
services,
syncing_user,
room_id,
last_sync_end_count.map(PduCount::Normal),
Some(PduCount::Normal(current_count)),
timeline_limit,
)
.await
}
/// Calculate the state events to sync.
async fn build_state_events(
services: &Services,
sync_context: SyncContext<'_>,
room_id: &RoomId,
shortstatehashes: ShortStateHashes,
timeline: &TimelinePdus,
) -> Result<Vec<PduEvent>> {
let SyncContext {
syncing_user,
last_sync_end_count,
full_state,
..
} = sync_context;
let ShortStateHashes {
current_shortstatehash,
last_sync_end_shortstatehash,
} = shortstatehashes;
// the spec states that the `state` property only includes state events up to
// the beginning of the timeline, so we determine the state of the syncing room
// as of the first timeline event. NOTE: this explanation is not entirely
// accurate; see the implementation of `build_state_incremental`.
let timeline_start_shortstatehash = async {
if let Some((_, pdu)) = timeline.pdus.front() {
if let Ok(shortstatehash) = services
.rooms
.state_accessor
.pdu_shortstatehash(&pdu.event_id)
.await
{
return shortstatehash;
}
}
current_shortstatehash
};
// the user IDs of members whose membership needs to be sent to the client, if
// lazy-loading is enabled.
let lazily_loaded_members =
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders());
let (timeline_start_shortstatehash, lazily_loaded_members) =
join(timeline_start_shortstatehash, lazily_loaded_members).await;
// compute the state delta between the previous sync and this sync.
match (last_sync_end_count, last_sync_end_shortstatehash) {
/*
if `last_sync_end_count` is Some (meaning this is an incremental sync), and `last_sync_end_shortstatehash`
is Some (meaning the syncing user didn't just join this room for the first time ever), and `full_state` is false,
then use `build_state_incremental`.
*/
| (Some(last_sync_end_count), Some(last_sync_end_shortstatehash)) if !full_state =>
build_state_incremental(
services,
syncing_user,
room_id,
PduCount::Normal(last_sync_end_count),
last_sync_end_shortstatehash,
timeline_start_shortstatehash,
current_shortstatehash,
timeline,
lazily_loaded_members.as_ref(),
)
.boxed()
.await,
/*
otherwise use `build_state_initial`. note that this branch will be taken if the user joined this room since the last sync
for the first time ever, because in that case we have no `last_sync_end_shortstatehash` and can't correctly calculate
the state using the incremental sync algorithm.
*/
| _ =>
build_state_initial(
services,
syncing_user,
timeline_start_shortstatehash,
lazily_loaded_members.as_ref(),
)
.boxed()
.await,
}
}
/// Compute the number of unread notifications in this room.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_notification_counts(
services: &Services,
SyncContext { syncing_user, last_sync_end_count, .. }: SyncContext<'_>,
room_id: &RoomId,
timeline: &TimelinePdus,
) -> Result<Option<UnreadNotificationsCount>> {
// determine whether to actually update the notification counts
let should_send_notification_counts = async {
// if we're going to sync some timeline events, the notification count has
// definitely changed to include them
if !timeline.pdus.is_empty() {
return true;
}
// if this is an initial sync, we need to send notification counts because the
// client doesn't know what they are yet
let Some(last_sync_end_count) = last_sync_end_count else {
return true;
};
let last_notification_read = services
.rooms
.user
.last_notification_read(syncing_user, room_id)
.await;
// if the syncing user has read the events we sent during the last sync, we need
// to send a new notification count on this sync.
if last_notification_read > last_sync_end_count {
return true;
}
// otherwise, nothing's changed.
false
};
if should_send_notification_counts.await {
let (notification_count, highlight_count) = join(
services
.rooms
.user
.notification_count(syncing_user, room_id)
.map(TryInto::try_into)
.unwrap_or(uint!(0)),
services
.rooms
.user
.highlight_count(syncing_user, room_id)
.map(TryInto::try_into)
.unwrap_or(uint!(0)),
)
.await;
trace!(?notification_count, ?highlight_count, "syncing new notification counts");
Ok(Some(UnreadNotificationsCount {
notification_count: Some(notification_count),
highlight_count: Some(highlight_count),
}))
} else {
Ok(None)
}
}
/// Check if the syncing user joined the room since their last incremental sync.
#[tracing::instrument(level = "debug", skip_all)]
async fn check_joined_since_last_sync(
services: &Services,
ShortStateHashes { last_sync_end_shortstatehash, .. }: ShortStateHashes,
SyncContext { syncing_user, .. }: SyncContext<'_>,
) -> Result<bool> {
// fetch the syncing user's membership event during the last sync.
// this will be None if `previous_sync_end_shortstatehash` is None.
let membership_during_previous_sync = match last_sync_end_shortstatehash {
| Some(last_sync_end_shortstatehash) => services
.rooms
.state_accessor
.state_get_content(
last_sync_end_shortstatehash,
&StateEventType::RoomMember,
syncing_user.as_str(),
)
.await
.inspect_err(|_| debug_warn!("User has no previous membership"))
.ok(),
| None => None,
};
// TODO: If the requesting user got state-reset out of the room, this
// will be `true` when it shouldn't be. this function should never be called
// in that situation, but it may be if the membership cache didn't get updated.
// the root cause of this needs to be addressed
let joined_since_last_sync =
membership_during_previous_sync.is_none_or(|content: RoomMemberEventContent| {
content.membership != MembershipState::Join
});
if joined_since_last_sync {
trace!("user joined since last sync");
}
Ok(joined_since_last_sync)
}
/// Build the `summary` field of the room object, which includes
/// the number of joined and invited users and the room's heroes.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_room_summary(
services: &Services,
SyncContext { syncing_user, .. }: SyncContext<'_>,
room_id: &RoomId,
ShortStateHashes { current_shortstatehash, .. }: ShortStateHashes,
timeline: &TimelinePdus,
state_events: &[PduEvent],
joined_since_last_sync: bool,
) -> Result<Option<RoomSummary>> {
// determine whether any events in the state or timeline are membership events.
let are_syncing_membership_events = timeline
.pdus
.iter()
.map(|(_, pdu)| pdu)
.chain(state_events.iter())
.any(|event| event.kind == RoomMember);
/*
we only need to send an updated room summary if:
1. there are membership events in the state or timeline, because they might have changed the
membership counts or heroes, or
2. the syncing user just joined this room, which usually implies #1 because their join event should be in the timeline.
*/
if !(are_syncing_membership_events || joined_since_last_sync) {
return Ok(None);
}
let joined_member_count = services
.rooms
.state_cache
.room_joined_count(room_id)
.unwrap_or(0);
let invited_member_count = services
.rooms
.state_cache
.room_invited_count(room_id)
.unwrap_or(0);
let has_name = services
.rooms
.state_accessor
.state_contains_type(current_shortstatehash, &StateEventType::RoomName);
let has_canonical_alias = services
.rooms
.state_accessor
.state_contains_type(current_shortstatehash, &StateEventType::RoomCanonicalAlias);
let (joined_member_count, invited_member_count, has_name, has_canonical_alias) =
join4(joined_member_count, invited_member_count, has_name, has_canonical_alias).await;
// only send heroes if the room has neither a name nor a canonical alias
let heroes = if !(has_name || has_canonical_alias) {
Some(build_heroes(services, room_id, syncing_user, current_shortstatehash).await)
} else {
None
};
trace!(
?joined_member_count,
?invited_member_count,
heroes_length = heroes.as_ref().map(HashSet::len),
"syncing updated summary"
);
Ok(Some(RoomSummary {
heroes: heroes
.map(|heroes| heroes.into_iter().collect())
.unwrap_or_default(),
joined_member_count: Some(ruma_from_u64(joined_member_count)),
invited_member_count: Some(ruma_from_u64(invited_member_count)),
}))
}
/// Fetch the user IDs to include in the `m.heroes` property of the room
/// summary.
async fn build_heroes(
services: &Services,
room_id: &RoomId,
syncing_user: &UserId,
current_shortstatehash: ShortStateHash,
) -> HashSet<OwnedUserId> {
const MAX_HERO_COUNT: usize = 5;
// fetch joined members from the state cache first
let joined_members_stream = services
.rooms
.state_cache
.room_members(room_id)
.map(ToOwned::to_owned);
// then fetch invited members
let invited_members_stream = services
.rooms
.state_cache
.room_members_invited(room_id)
.map(ToOwned::to_owned);
// then as a last resort fetch every membership event
let all_members_stream = services
.rooms
.short
.multi_get_statekey_from_short(
services
.rooms
.state_accessor
.state_full_shortids(current_shortstatehash)
.ignore_err()
.ready_filter_map(|(key, _)| Some(key)),
)
.ignore_err()
.ready_filter_map(|(event_type, state_key)| {
if event_type == StateEventType::RoomMember {
state_key.to_string().try_into().ok()
} else {
None
}
});
joined_members_stream
.chain(invited_members_stream)
.chain(all_members_stream)
// the hero list should never include the syncing user
.ready_filter(|user_id| user_id != syncing_user)
.take(MAX_HERO_COUNT)
.collect()
.await
}
/// Collect updates to users' device lists for E2EE.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_device_list_updates(
services: &Services,
SyncContext {
syncing_user,
last_sync_end_count,
current_count,
..
}: SyncContext<'_>,
room_id: &RoomId,
ShortStateHashes { current_shortstatehash, .. }: ShortStateHashes,
state_events: &Vec<PduEvent>,
joined_since_last_sync: bool,
) -> Result<DeviceListUpdates> {
let is_encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok();
// initial syncs don't include device updates, and rooms which aren't encrypted
// don't affect them, so return early in either of those cases
if last_sync_end_count.is_none() || !(is_encrypted_room.await) {
return Ok(DeviceListUpdates::new());
}
let mut device_list_updates = DeviceListUpdates::new();
// add users with changed keys to the `changed` list
services
.users
.room_keys_changed(room_id, last_sync_end_count, Some(current_count))
.map(at!(0))
.map(ToOwned::to_owned)
.ready_for_each(|user_id| {
device_list_updates.changed.insert(user_id);
})
.await;
// add users who now share encrypted rooms to `changed` and
// users who no longer share encrypted rooms to `left`
for state_event in state_events {
if state_event.kind == RoomMember {
let Some(content): Option<RoomMemberEventContent> = state_event.get_content().ok()
else {
continue;
};
let Some(user_id): Option<OwnedUserId> = state_event
.state_key
.as_ref()
.and_then(|key| key.parse().ok())
else {
continue;
};
{
use MembershipState::*;
if matches!(content.membership, Leave | Join) {
let shares_encrypted_room =
share_encrypted_room(services, syncing_user, &user_id, Some(room_id))
.await;
match content.membership {
| Leave if !shares_encrypted_room => {
device_list_updates.left.insert(user_id);
},
| Join if joined_since_last_sync || shares_encrypted_room => {
device_list_updates.changed.insert(user_id);
},
| _ => (),
}
}
}
}
}
if !device_list_updates.is_empty() {
trace!(
changed = device_list_updates.changed.len(),
left = device_list_updates.left.len(),
"syncing device list updates"
);
}
Ok(device_list_updates)
}

View File

@@ -0,0 +1,349 @@
use conduwuit::{
Event, PduCount, PduEvent, Result, at, debug_warn,
pdu::EventHash,
trace,
utils::{self, IterStream, future::ReadyEqExt, stream::WidebandExt as _},
};
use futures::{StreamExt, future::join};
use ruma::{
EventId, OwnedRoomId, RoomId,
api::client::sync::sync_events::v3::{LeftRoom, RoomAccountData, State, Timeline},
events::{StateEventType, TimelineEventType},
uint,
};
use serde_json::value::RawValue;
use service::{Services, rooms::short::ShortStateHash};
use crate::client::{
TimelinePdus, ignored_filter,
sync::{
load_timeline,
v3::{
DEFAULT_TIMELINE_LIMIT, SyncContext, prepare_lazily_loaded_members,
state::build_state_initial,
},
},
};
#[tracing::instrument(
name = "left",
level = "debug",
skip_all,
fields(
room_id = %room_id,
),
)]
#[allow(clippy::too_many_arguments)]
pub(super) async fn load_left_room(
services: &Services,
sync_context: SyncContext<'_>,
ref room_id: OwnedRoomId,
leave_membership_event: Option<PduEvent>,
) -> Result<Option<LeftRoom>> {
let SyncContext {
syncing_user,
last_sync_end_count,
current_count,
filter,
..
} = sync_context;
// the global count as of the moment the user left the room
let Some(left_count) = services
.rooms
.state_cache
.get_left_count(room_id, syncing_user)
.await
.ok()
else {
// if we get here, the membership cache is incorrect, likely due to a state
// reset
debug_warn!("attempting to sync left room but no left count exists");
return Ok(None);
};
// return early if we haven't gotten to this leave yet.
// this can happen if the user leaves while a sync response is being generated
if current_count < left_count {
return Ok(None);
}
// return early if this is an incremental sync, and we've already synced this
// leave to the user, and `include_leave` isn't set on the filter.
if !filter.room.include_leave && last_sync_end_count >= Some(left_count) {
return Ok(None);
}
if let Some(ref leave_membership_event) = leave_membership_event {
debug_assert_eq!(
leave_membership_event.kind,
TimelineEventType::RoomMember,
"leave PDU should be m.room.member"
);
}
let does_not_exist = services.rooms.metadata.exists(room_id).eq(&false).await;
let (timeline, state_events) = match leave_membership_event {
| Some(leave_membership_event) if does_not_exist => {
/*
we have none PDUs with left beef for this room, likely because it was a rejected invite to a room
which nobody on this homeserver is in. `leave_pdu` is the remote-assisted outlier leave event for the room,
which is all we can send to the client.
if this is an initial sync, don't include this room at all to keep the client from asking for
state that we don't have.
*/
if last_sync_end_count.is_none() {
return Ok(None);
}
trace!("syncing remote-assisted leave PDU");
(TimelinePdus::default(), vec![leave_membership_event])
},
| Some(leave_membership_event) => {
// we have this room in our DB, and can fetch the state and timeline from when
// the user left.
let leave_state_key = syncing_user;
debug_assert_eq!(
Some(leave_state_key.as_str()),
leave_membership_event.state_key(),
"leave PDU should be for the user requesting the sync"
);
// the shortstatehash of the state _immediately before_ the syncing user left
// this room. the state represented here _does not_ include
// `leave_membership_event`.
let leave_shortstatehash = services
.rooms
.state_accessor
.pdu_shortstatehash(&leave_membership_event.event_id)
.await?;
let prev_membership_event = services
.rooms
.state_accessor
.state_get(
leave_shortstatehash,
&StateEventType::RoomMember,
leave_state_key.as_str(),
)
.await?;
build_left_state_and_timeline(
services,
sync_context,
room_id,
leave_membership_event,
leave_shortstatehash,
prev_membership_event,
)
.await?
},
| None => {
/*
no leave event was actually sent in this room, but we still need to pretend
like the user left it. this is usually because the room was banned by a server admin.
if this is an incremental sync, generate a fake leave event to make the room vanish from clients.
otherwise we don't tell the client about this room at all.
*/
if last_sync_end_count.is_none() {
return Ok(None);
}
trace!("syncing dummy leave event");
(TimelinePdus::default(), vec![create_dummy_leave_event(
services,
sync_context,
room_id,
)])
},
};
let raw_timeline_pdus = timeline
.pdus
.into_iter()
.stream()
// filter out ignored events from the timeline
.wide_filter_map(|item| ignored_filter(services, item, syncing_user))
.map(at!(1))
.map(Event::into_format)
.collect::<Vec<_>>()
.await;
Ok(Some(LeftRoom {
account_data: RoomAccountData { events: Vec::new() },
timeline: Timeline {
limited: timeline.limited,
prev_batch: Some(current_count.to_string()),
events: raw_timeline_pdus,
},
state: State {
events: state_events.into_iter().map(Event::into_format).collect(),
},
}))
}
async fn build_left_state_and_timeline(
services: &Services,
sync_context: SyncContext<'_>,
room_id: &RoomId,
leave_membership_event: PduEvent,
leave_shortstatehash: ShortStateHash,
prev_membership_event: PduEvent,
) -> Result<(TimelinePdus, Vec<PduEvent>)> {
let SyncContext {
syncing_user,
last_sync_end_count,
filter,
..
} = sync_context;
let timeline_start_count = if let Some(last_sync_end_count) = last_sync_end_count {
// for incremental syncs, start the timeline after `since`
PduCount::Normal(last_sync_end_count)
} else {
// for initial syncs, start the timeline after the previous membership
// event. we don't want to include the membership event itself
// because clients get confused when they see a `join`
// membership event in a `leave` room.
services
.rooms
.timeline
.get_pdu_count(&prev_membership_event.event_id)
.await?
};
// end the timeline at the user's leave event
let timeline_end_count = services
.rooms
.timeline
.get_pdu_count(leave_membership_event.event_id())
.await?;
// limit the timeline using the same logic as for joined rooms
let timeline_limit = filter
.room
.timeline
.limit
.and_then(|limit| limit.try_into().ok())
.unwrap_or(DEFAULT_TIMELINE_LIMIT);
let timeline = load_timeline(
services,
syncing_user,
room_id,
Some(timeline_start_count),
Some(timeline_end_count),
timeline_limit,
)
.await?;
let timeline_start_shortstatehash = async {
if let Some((_, pdu)) = timeline.pdus.front() {
if let Ok(shortstatehash) = services
.rooms
.state_accessor
.pdu_shortstatehash(&pdu.event_id)
.await
{
return shortstatehash;
}
}
// the timeline generally should not be empty (see the TODO further down),
// but in case it is we use `leave_shortstatehash` as the state to
// send
leave_shortstatehash
};
let lazily_loaded_members =
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders());
let (timeline_start_shortstatehash, lazily_loaded_members) =
join(timeline_start_shortstatehash, lazily_loaded_members).await;
// TODO: calculate incremental state for incremental syncs.
// always calculating initial state _works_ but returns more data and does
// more processing than strictly necessary.
let mut state = build_state_initial(
services,
syncing_user,
timeline_start_shortstatehash,
lazily_loaded_members.as_ref(),
)
.await?;
/*
remove membership events for the syncing user from state.
usually, `state` should include a `join` membership event and `timeline` should include a `leave` one.
however, the matrix-js-sdk gets confused when this happens (see [1]) and doesn't process the room leave,
so we have to filter out the membership from `state`.
NOTE: we are sending more information than synapse does in this scenario, because we always
calculate `state` for initial syncs, even when the sync being performed is incremental.
however, the specification does not forbid sending extraneous events in `state`.
TODO: there is an additional bug at play here. sometimes `load_joined_room` syncs the `leave` event
before `load_left_room` does, which means the `timeline` we sync immediately after a leave is empty.
this shouldn't happen -- `timeline` should always include the `leave` event. this is probably
a race condition with the membership state cache.
[1]: https://github.com/matrix-org/matrix-js-sdk/issues/5071
*/
// `state` should only ever include one membership event for the syncing user
let membership_event_index = state.iter().position(|pdu| {
*pdu.event_type() == TimelineEventType::RoomMember
&& pdu.state_key() == Some(syncing_user.as_str())
});
if let Some(index) = membership_event_index {
// the ordering of events in `state` does not matter
state.swap_remove(index);
}
trace!(
?timeline_start_count,
?timeline_end_count,
"syncing {} timeline events (limited = {}) and {} state events",
timeline.pdus.len(),
timeline.limited,
state.len()
);
Ok((timeline, state))
}
fn create_dummy_leave_event(
services: &Services,
SyncContext { syncing_user, .. }: SyncContext<'_>,
room_id: &RoomId,
) -> PduEvent {
// TODO: because this event ID is random, it could cause caching issues with
// clients. perhaps a database table could be created to hold these dummy
// events, or they could be stored as outliers?
PduEvent {
event_id: EventId::new(services.globals.server_name()),
sender: syncing_user.to_owned(),
origin: None,
origin_server_ts: utils::millis_since_unix_epoch()
.try_into()
.expect("Timestamp is valid js_int value"),
kind: TimelineEventType::RoomMember,
content: RawValue::from_string(r#"{"membership": "leave"}"#.to_owned()).unwrap(),
state_key: Some(syncing_user.as_str().into()),
unsigned: None,
// The following keys are dropped on conversion
room_id: Some(room_id.to_owned()),
prev_events: vec![],
depth: uint!(1),
auth_events: vec![],
redacts: None,
hashes: EventHash { sha256: String::new() },
signatures: None,
}
}

View File

@@ -0,0 +1,502 @@
mod joined;
mod left;
mod state;
use std::{
cmp::{self},
collections::{BTreeMap, HashMap, HashSet},
time::Duration,
};
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Result, extract_variant,
utils::{
ReadyExt, TryFutureExtExt,
stream::{BroadbandExt, Tools, WidebandExt},
},
warn,
};
use conduwuit_service::Services;
use futures::{
FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join3, join4, join5},
};
use ruma::{
DeviceId, OwnedUserId, RoomId, UserId,
api::client::{
filter::FilterDefinition,
sync::sync_events::{
self, DeviceLists,
v3::{
Filter, GlobalAccountData, InviteState, InvitedRoom, KnockState, KnockedRoom,
Presence, Rooms, ToDevice,
},
},
uiaa::UiaaResponse,
},
events::{
AnyRawAccountDataEvent,
presence::{PresenceEvent, PresenceEventContent},
},
serde::Raw,
};
use service::rooms::lazy_loading::{self, MemberSet, Options as _};
use super::{load_timeline, share_encrypted_room};
use crate::{
Ruma, RumaResponse,
client::{
is_ignored_invite,
sync::v3::{joined::load_joined_room, left::load_left_room},
},
};
/// The default maximum number of events to return in the `timeline` key of
/// joined and left rooms. If the number of events sent since the last sync
/// exceeds this number, the `timeline` will be `limited`.
const DEFAULT_TIMELINE_LIMIT: usize = 30;
/// A collection of updates to users' device lists, used for E2EE.
struct DeviceListUpdates {
changed: HashSet<OwnedUserId>,
left: HashSet<OwnedUserId>,
}
impl DeviceListUpdates {
fn new() -> Self {
Self {
changed: HashSet::new(),
left: HashSet::new(),
}
}
fn merge(&mut self, other: Self) {
self.changed.extend(other.changed);
self.left.extend(other.left);
}
fn is_empty(&self) -> bool { self.changed.is_empty() && self.left.is_empty() }
}
impl From<DeviceListUpdates> for DeviceLists {
fn from(val: DeviceListUpdates) -> Self {
Self {
changed: val.changed.into_iter().collect(),
left: val.left.into_iter().collect(),
}
}
}
/// References to common data needed to calculate the sync response.
#[derive(Clone, Copy)]
struct SyncContext<'a> {
/// The ID of the user requesting this sync.
syncing_user: &'a UserId,
/// The ID of the device requesting this sync, which will belong to
/// `syncing_user`.
syncing_device: &'a DeviceId,
/// The global count at the end of the previous sync response.
/// The previous sync's `current_count` will become the next sync's
/// `last_sync_end_count`. This will be None if no `since` query parameter
/// was specified, indicating an initial sync.
last_sync_end_count: Option<u64>,
/// The global count as of when we started building the sync response.
/// This is used as an upper bound when querying the database to ensure the
/// response represents a snapshot in time and doesn't include data which
/// appeared while the response was being built.
current_count: u64,
/// The `full_state` query parameter, used when syncing state for joined and
/// left rooms.
full_state: bool,
/// The sync filter, which the client uses to specify what data should be
/// included in the sync response.
filter: &'a FilterDefinition,
}
impl<'a> SyncContext<'a> {
fn lazy_loading_context(&self, room_id: &'a RoomId) -> lazy_loading::Context<'a> {
lazy_loading::Context {
user_id: self.syncing_user,
device_id: Some(self.syncing_device),
room_id,
token: self.last_sync_end_count,
options: Some(&self.filter.room.state.lazy_load_options),
}
}
#[inline]
fn lazy_loading_enabled(&self) -> bool {
(self.filter.room.state.lazy_load_options.is_enabled()
|| self.filter.room.timeline.lazy_load_options.is_enabled())
&& !self.full_state
}
}
type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
/// # `GET /_matrix/client/r0/sync`
///
/// Synchronize the client's state with the latest state on the server.
///
/// - This endpoint takes a `since` parameter which should be the `next_batch`
/// value from a previous request for incremental syncs.
///
/// Calling this endpoint without a `since` parameter returns:
/// - Some of the most recent events of each timeline
/// - Notification counts for each room
/// - Joined and invited member counts, heroes
/// - All state events
///
/// Calling this endpoint with a `since` parameter from a previous `next_batch`
/// returns: For joined rooms:
/// - Some of the most recent events of each timeline that happened after since
/// - If user joined the room after since: All state events (unless lazy loading
/// is activated) and all device list updates in that room
/// - If the user was already in the room: A list of all events that are in the
/// state now, but were not in the state at `since`
/// - If the state we send contains a member event: Joined and invited member
/// counts, heroes
/// - Device list updates that happened after `since`
/// - If there are events in the timeline we send or the user send updated his
/// read mark: Notification counts
/// - EDUs that are active now (read receipts, typing updates, presence)
/// - TODO: Allow multiple sync streams to support Pantalaimon
///
/// For invited rooms:
/// - If the user was invited after `since`: A subset of the state of the room
/// at the point of the invite
///
/// For left rooms:
/// - If the user left after `since`: `prev_batch` token, empty state (TODO:
/// subset of the state at the point of the leave)
#[tracing::instrument(
name = "sync",
level = "debug",
skip_all,
fields(
since = %body.body.since.as_deref().unwrap_or_default(),
)
)]
pub(crate) async fn sync_events_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
let (sender_user, sender_device) = body.sender();
// Presence update
if services.config.allow_local_presence {
services
.presence
.ping_presence(sender_user, &body.body.set_presence)
.await?;
}
// Increment the "device last active" metadata
services
.users
.update_device_last_seen(sender_user, Some(sender_device), client_ip)
.await;
// Setup watchers, so if there's no response, we can wait for them
let watcher = services.sync.watch(sender_user, sender_device);
let response = build_sync_events(&services, &body).await?;
if body.body.full_state
|| !(response.rooms.is_empty()
&& response.presence.is_empty()
&& response.account_data.is_empty()
&& response.device_lists.is_empty()
&& response.to_device.is_empty())
{
return Ok(response);
}
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let default = Duration::from_secs(30);
let duration = cmp::min(body.body.timeout.unwrap_or(default), default);
_ = tokio::time::timeout(duration, watcher).await;
// Retry returning data
build_sync_events(&services, &body).await
}
pub(crate) async fn build_sync_events(
services: &Services,
body: &Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
let (syncing_user, syncing_device) = body.sender();
let current_count = services.globals.current_count()?;
// the `since` token is the last sync end count stringified
let last_sync_end_count = body
.body
.since
.as_ref()
.and_then(|string| string.parse().ok());
let full_state = body.body.full_state;
// FilterDefinition is very large (0x1000 bytes), let's put it on the heap
let filter = Box::new(match body.body.filter.as_ref() {
// use the default filter if none was specified
| None => FilterDefinition::default(),
// use inline filters directly
| Some(Filter::FilterDefinition(filter)) => filter.clone(),
// look up filter IDs from the database
| Some(Filter::FilterId(filter_id)) => services
.users
.get_filter(syncing_user, filter_id)
.await
.unwrap_or_default(),
});
let context = SyncContext {
syncing_user,
syncing_device,
last_sync_end_count,
current_count,
full_state,
filter: &filter,
};
let joined_rooms = services
.rooms
.state_cache
.rooms_joined(syncing_user)
.map(ToOwned::to_owned)
.broad_filter_map(|room_id| async {
let joined_room = load_joined_room(services, context, room_id.clone()).await;
match joined_room {
| Ok((room, updates)) => Some((room_id, room, updates)),
| Err(err) => {
warn!(?err, ?room_id, "error loading joined room {}", room_id);
None
},
}
})
.ready_fold(
(BTreeMap::new(), DeviceListUpdates::new()),
|(mut joined_rooms, mut all_updates), (room_id, joined_room, updates)| {
all_updates.merge(updates);
if !joined_room.is_empty() {
joined_rooms.insert(room_id, joined_room);
}
(joined_rooms, all_updates)
},
);
let left_rooms = services
.rooms
.state_cache
.rooms_left(syncing_user)
.broad_filter_map(|(room_id, leave_pdu)| {
load_left_room(services, context, room_id.clone(), leave_pdu)
.map_ok(move |left_room| (room_id, left_room))
.ok()
})
.ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room)))
.collect();
let invited_rooms = services
.rooms
.state_cache
.rooms_invited(syncing_user)
.wide_filter_map(async |(room_id, invite_state)| {
if is_ignored_invite(services, syncing_user, &room_id).await {
None
} else {
Some((room_id, invite_state))
}
})
.fold_default(|mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| async move {
let invite_count = services
.rooms
.state_cache
.get_invite_count(&room_id, syncing_user)
.await
.ok();
// only sync this invite if it was sent after the last /sync call
if last_sync_end_count < invite_count {
let invited_room = InvitedRoom {
invite_state: InviteState { events: invite_state },
};
invited_rooms.insert(room_id, invited_room);
}
invited_rooms
});
let knocked_rooms = services
.rooms
.state_cache
.rooms_knocked(syncing_user)
.fold_default(|mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| async move {
let knock_count = services
.rooms
.state_cache
.get_knock_count(&room_id, syncing_user)
.await
.ok();
// only sync this knock if it was sent after the last /sync call
if last_sync_end_count < knock_count {
let knocked_room = KnockedRoom {
knock_state: KnockState { events: knock_state },
};
knocked_rooms.insert(room_id, knocked_room);
}
knocked_rooms
});
let presence_updates: OptionFuture<_> = services
.config
.allow_local_presence
.then(|| process_presence_updates(services, last_sync_end_count, syncing_user))
.into();
let account_data = services
.account_data
.changes_since(None, syncing_user, last_sync_end_count, Some(current_count))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect();
// Look for device list updates of this account
let keys_changed = services
.users
.keys_changed(syncing_user, last_sync_end_count, Some(current_count))
.map(ToOwned::to_owned)
.collect::<HashSet<_>>();
let to_device_events = services
.users
.get_to_device_events(
syncing_user,
syncing_device,
last_sync_end_count,
Some(current_count),
)
.collect::<Vec<_>>();
let device_one_time_keys_count = services
.users
.count_one_time_keys(syncing_user, syncing_device);
// Remove all to-device events the device received *last time*
let remove_to_device_events =
services
.users
.remove_to_device_events(syncing_user, syncing_device, last_sync_end_count);
let rooms = join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms);
let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates);
let top = join5(account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms)
.boxed()
.await;
let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top;
let ((), to_device_events, presence_updates) = ephemeral;
let (joined_rooms, left_rooms, invited_rooms, knocked_rooms) = rooms;
let (joined_rooms, mut device_list_updates) = joined_rooms;
device_list_updates.changed.extend(keys_changed);
let response = sync_events::v3::Response {
account_data: GlobalAccountData { events: account_data },
device_lists: device_list_updates.into(),
device_one_time_keys_count,
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
next_batch: current_count.to_string(),
presence: Presence {
events: presence_updates
.into_iter()
.flat_map(IntoIterator::into_iter)
.map(|(sender, content)| PresenceEvent { content, sender })
.map(|ref event| Raw::new(event))
.filter_map(Result::ok)
.collect(),
},
rooms: Rooms {
leave: left_rooms,
join: joined_rooms,
invite: invited_rooms,
knock: knocked_rooms,
},
to_device: ToDevice { events: to_device_events },
};
Ok(response)
}
#[tracing::instrument(name = "presence", level = "debug", skip_all)]
async fn process_presence_updates(
services: &Services,
last_sync_end_count: Option<u64>,
syncing_user: &UserId,
) -> PresenceUpdates {
services
.presence
.presence_since(last_sync_end_count.unwrap_or(0)) // send all presences on initial sync
.filter(|(user_id, ..)| {
services
.rooms
.state_cache
.user_sees_user(syncing_user, user_id)
})
.filter_map(|(user_id, _, presence_bytes)| {
services
.presence
.from_json_bytes_to_event(presence_bytes, user_id)
.map_ok(move |event| (user_id, event))
.ok()
})
.map(|(user_id, event)| (user_id.to_owned(), event.content))
.collect()
.await
}
/// Using the provided sync context and an iterator of user IDs in the
/// `timeline`, return a HashSet of user IDs whose membership events should be
/// sent to the client if lazy-loading is enabled.
#[allow(clippy::let_and_return)]
async fn prepare_lazily_loaded_members(
services: &Services,
sync_context: SyncContext<'_>,
room_id: &RoomId,
timeline_members: impl Iterator<Item = OwnedUserId>,
) -> Option<MemberSet> {
let lazy_loading_context = &sync_context.lazy_loading_context(room_id);
// reset lazy loading state on initial sync.
// do this even if lazy loading is disabled so future lazy loads
// will have the correct members.
if sync_context.last_sync_end_count.is_none() {
services
.rooms
.lazy_loading
.reset(lazy_loading_context)
.await;
}
// filter the input members through `retain_lazy_members`, which
// contains the actual lazy loading logic.
let lazily_loaded_members =
OptionFuture::from(sync_context.lazy_loading_enabled().then(|| {
services
.rooms
.lazy_loading
.retain_lazy_members(timeline_members.collect(), lazy_loading_context)
}))
.await;
lazily_loaded_members
}

View File

@@ -0,0 +1,280 @@
use std::{collections::BTreeSet, ops::ControlFlow};
use conduwuit::{
Result, at, is_equal_to,
matrix::{
Event,
pdu::{PduCount, PduEvent},
},
utils::{
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
stream::{BroadbandExt, TryIgnore},
},
};
use conduwuit_service::{
Services,
rooms::{lazy_loading::MemberSet, short::ShortStateHash},
};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use ruma::{OwnedEventId, RoomId, UserId, events::StateEventType};
use service::rooms::short::ShortEventId;
use tracing::trace;
use crate::client::TimelinePdus;
/// Calculate the state events to include in an initial sync response.
///
/// If lazy-loading is enabled (`lazily_loaded_members` is Some), the returned
/// Vec will include the membership events of exclusively the members in
/// `lazily_loaded_members`.
#[tracing::instrument(
name = "initial",
level = "trace",
skip_all,
fields(current_shortstatehash)
)]
#[allow(clippy::too_many_arguments)]
pub(super) async fn build_state_initial(
services: &Services,
sender_user: &UserId,
timeline_start_shortstatehash: ShortStateHash,
lazily_loaded_members: Option<&MemberSet>,
) -> Result<Vec<PduEvent>> {
// load the keys and event IDs of the state events at the start of the timeline
let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services
.rooms
.state_accessor
.state_full_ids(timeline_start_shortstatehash)
.unzip()
.await;
trace!("performing initial sync of {} state events", event_ids.len());
services
.rooms
.short
// look up the full state keys
.multi_get_statekey_from_short(shortstatekeys.into_iter().stream())
.zip(event_ids.into_iter().stream())
.ready_filter_map(|item| Some((item.0.ok()?, item.1)))
.ready_filter_map(|((event_type, state_key), event_id)| {
if let Some(lazily_loaded_members) = lazily_loaded_members {
/*
if lazy loading is enabled, filter out membership events which aren't for a user
included in `lazily_loaded_members` or for the user requesting the sync.
*/
let event_is_redundant = event_type == StateEventType::RoomMember
&& state_key.as_str().try_into().is_ok_and(|user_id: &UserId| {
sender_user != user_id && !lazily_loaded_members.contains(user_id)
});
event_is_redundant.or_some(event_id)
} else {
Some(event_id)
}
})
.broad_filter_map(|event_id: OwnedEventId| async move {
services.rooms.timeline.get_pdu(&event_id).await.ok()
})
.collect()
.map(Ok)
.await
}
/// Calculate the state events to include in an incremental sync response.
///
/// If lazy-loading is enabled (`lazily_loaded_members` is Some), the returned
/// Vec will include the membership events of all the members in
/// `lazily_loaded_members`.
#[tracing::instrument(name = "incremental", level = "trace", skip_all)]
#[allow(clippy::too_many_arguments)]
pub(super) async fn build_state_incremental<'a>(
services: &Services,
sender_user: &'a UserId,
room_id: &RoomId,
last_sync_end_count: PduCount,
last_sync_end_shortstatehash: ShortStateHash,
timeline_start_shortstatehash: ShortStateHash,
timeline_end_shortstatehash: ShortStateHash,
timeline: &TimelinePdus,
lazily_loaded_members: Option<&'a MemberSet>,
) -> Result<Vec<PduEvent>> {
/*
NB: a limited sync is one where `timeline.limited == true`. Synapse calls this a "gappy" sync internally.
The algorithm implemented in this function is, currently, quite different from the algorithm vaguely described
by the Matrix specification. This is because the specification's description of the `state` property does not accurately
reflect how Synapse behaves, and therefore how client SDKs behave. Notable differences include:
1. We do not compute the delta using the naive approach of "every state event from the end of the last sync
up to the start of this sync's timeline". see below for details.
2. If lazy-loading is enabled, we include lazily-loaded membership events. The specific users to include are determined
elsewhere and supplied to this function in the `lazily_loaded_members` parameter.
*/
/*
the `state` property of an incremental sync which isn't limited are _usually_ empty.
(note: the specification says that the `state` property is _always_ empty for limited syncs, which is incorrect.)
however, if an event in the timeline (`timeline.pdus`) merges a split in the room's DAG (i.e. has multiple `prev_events`),
the state at the _end_ of the timeline may include state events which were merged in and don't exist in the state
at the _start_ of the timeline. because this is uncommon, we check here to see if any events in the timeline
merged a split in the DAG.
see: https://github.com/element-hq/synapse/issues/16941
*/
let timeline_is_linear = timeline.pdus.is_empty() || {
let last_pdu_of_last_sync = services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, Some(last_sync_end_count.saturating_add(1)))
.boxed()
.next()
.await
.transpose()
.expect("last sync should have had some PDUs")
.map(at!(1));
// make sure the prev_events of each pdu in the timeline refer only to the
// previous pdu
timeline
.pdus
.iter()
.try_fold(last_pdu_of_last_sync.map(|pdu| pdu.event_id), |prev_event_id, (_, pdu)| {
if let Ok(pdu_prev_event_id) = pdu.prev_events.iter().exactly_one() {
if prev_event_id
.as_ref()
.is_none_or(is_equal_to!(pdu_prev_event_id))
{
return ControlFlow::Continue(Some(pdu_prev_event_id.to_owned()));
}
}
trace!(
"pdu {:?} has split prev_events (expected {:?}): {:?}",
pdu.event_id, prev_event_id, pdu.prev_events
);
ControlFlow::Break(())
})
.is_continue()
};
if timeline_is_linear && !timeline.limited {
// if there are no splits in the DAG and the timeline isn't limited, then
// `state` will always be empty unless lazy loading is enabled.
if let Some(lazily_loaded_members) = lazily_loaded_members {
if !timeline.pdus.is_empty() {
// lazy loading is enabled, so we return the membership events which were
// requested by the caller.
let lazy_membership_events: Vec<_> = lazily_loaded_members
.iter()
.stream()
.broad_filter_map(|user_id| async move {
if user_id == sender_user {
return None;
}
services
.rooms
.state_accessor
.state_get(
timeline_start_shortstatehash,
&StateEventType::RoomMember,
user_id.as_str(),
)
.ok()
.await
})
.collect()
.await;
if !lazy_membership_events.is_empty() {
trace!(
"syncing lazy membership events for members: {:?}",
lazy_membership_events
.iter()
.map(|pdu| pdu.state_key().unwrap())
.collect::<Vec<_>>()
);
}
return Ok(lazy_membership_events);
}
}
// lazy loading is disabled, `state` is empty.
return Ok(vec![]);
}
/*
at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates
computing the incremental state (which may be empty).
NOTE: this code path does not use the `lazy_membership_events` parameter. any changes to membership will be included
in the incremental state. therefore, the incremental state may include "redundant" membership events,
which we do not filter out because A. the spec forbids lazy-load filtering if the timeline is `limited`,
and B. DAG splits which require sending extra membership state events are (probably) uncommon enough that
the performance penalty is acceptable.
*/
trace!(?timeline_is_linear, ?timeline.limited, "computing state for incremental sync");
// fetch the shorteventids of state events in the timeline
let state_events_in_timeline: BTreeSet<ShortEventId> = services
.rooms
.short
.multi_get_or_create_shorteventid(timeline.pdus.iter().filter_map(|(_, pdu)| {
if pdu.state_key().is_some() {
Some(pdu.event_id.as_ref())
} else {
None
}
}))
.collect()
.await;
trace!("{} state events in timeline", state_events_in_timeline.len());
/*
fetch the state events which were added since the last sync.
specifically we fetch the difference between the state at the last sync and the state at the _end_
of the timeline, and then we filter out state events in the timeline itself using the shorteventids we fetched.
this is necessary to account for splits in the DAG, as explained above.
*/
let state_diff = services
.rooms
.short
.multi_get_eventid_from_short::<'_, OwnedEventId, _>(
services
.rooms
.state_accessor
.state_added((last_sync_end_shortstatehash, timeline_end_shortstatehash))
.await?
.stream()
.ready_filter_map(|(_, shorteventid)| {
if state_events_in_timeline.contains(&shorteventid) {
None
} else {
Some(shorteventid)
}
}),
)
.ignore_err();
// finally, fetch the PDU contents and collect them into a vec
let state_diff_pdus = state_diff
.broad_filter_map(|event_id| async move {
services
.rooms
.timeline
.get_non_outlier_pdu(&event_id)
.await
.ok()
})
.collect::<Vec<_>>()
.await;
trace!(?state_diff_pdus, "collected state PDUs for incremental sync");
Ok(state_diff_pdus)
}

View File

@@ -1,848 +0,0 @@
use std::{
cmp::{self, Ordering},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
time::Duration,
};
use axum::extract::State;
use conduwuit::{
Err, Error, Event, PduCount, Result, at, debug, error, extract_variant,
matrix::TypeStateKey,
utils::{
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
math::{ruma_from_usize, usize_from_ruma, usize_from_u64_truncated},
stream::WidebandExt,
},
warn,
};
use conduwuit_service::{
Services,
rooms::read_receipt::pack_receipts,
sync::{into_db_key, into_snake_key},
};
use futures::{FutureExt, StreamExt, TryFutureExt};
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId,
api::client::sync::sync_events::{
self, DeviceLists, UnreadNotificationsCount,
v4::{SlidingOp, SlidingSyncRoomHero},
},
directory::RoomTypeFilter,
events::{
AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType,
TimelineEventType::*,
room::member::{MembershipState, RoomMemberEventContent},
},
serde::Raw,
uint,
};
use super::{load_timeline, share_encrypted_room};
use crate::{
Ruma,
client::{DEFAULT_BUMP_TYPES, ignored_filter, is_ignored_invite},
};
type TodoRooms = BTreeMap<OwnedRoomId, (BTreeSet<TypeStateKey>, usize, u64)>;
const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync";
#[allow(clippy::cognitive_complexity)]
/// POST `/_matrix/client/unstable/org.matrix.msc3575/sync`
///
/// Sliding Sync endpoint (future endpoint: `/_matrix/client/v4/sync`)
pub(crate) async fn sync_events_v4_route(
State(services): State<crate::State>,
body: Ruma<sync_events::v4::Request>,
) -> Result<sync_events::v4::Response> {
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
let mut body = body.body;
// Setup watchers, so if there's no response, we can wait for them
let watcher = services.sync.watch(sender_user, sender_device);
let next_batch = services.globals.next_count()?;
let conn_id = body
.conn_id
.clone()
.unwrap_or_else(|| SINGLE_CONNECTION_SYNC.to_owned());
let globalsince = body
.pos
.as_ref()
.and_then(|string| string.parse().ok())
.unwrap_or(0);
let db_key = into_db_key(sender_user, sender_device, conn_id.clone());
if globalsince != 0 && !services.sync.remembered(&db_key) {
debug!("Restarting sync stream because it was gone from the database");
return Err!(Request(UnknownPos("Connection data lost since last time")));
}
if globalsince == 0 {
services.sync.forget_sync_request_connection(&db_key);
}
// Get sticky parameters from cache
let snake_key = into_snake_key(sender_user, sender_device, conn_id.clone());
let known_rooms = services
.sync
.update_sync_request_with_cache(&snake_key, &mut body);
let all_joined_rooms: Vec<_> = services
.rooms
.state_cache
.rooms_joined(sender_user)
.map(ToOwned::to_owned)
.collect()
.await;
let all_invited_rooms: Vec<_> = services
.rooms
.state_cache
.rooms_invited(sender_user)
.wide_filter_map(async |(room_id, invite_state)| {
if is_ignored_invite(&services, sender_user, &room_id).await {
None
} else {
Some((room_id, invite_state))
}
})
.map(|r| r.0)
.collect()
.await;
let all_knocked_rooms: Vec<_> = services
.rooms
.state_cache
.rooms_knocked(sender_user)
.map(|r| r.0)
.collect()
.await;
let all_invited_rooms: Vec<&RoomId> = all_invited_rooms.iter().map(AsRef::as_ref).collect();
let all_knocked_rooms: Vec<&RoomId> = all_knocked_rooms.iter().map(AsRef::as_ref).collect();
let all_rooms: Vec<&RoomId> = all_joined_rooms
.iter()
.map(AsRef::as_ref)
.chain(all_invited_rooms.iter().map(AsRef::as_ref))
.chain(all_knocked_rooms.iter().map(AsRef::as_ref))
.collect();
let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref).collect();
let all_invited_rooms = all_invited_rooms.iter().map(AsRef::as_ref).collect();
if body.extensions.to_device.enabled.unwrap_or(false) {
services
.users
.remove_to_device_events(sender_user, sender_device, globalsince)
.await;
}
let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
let mut device_list_changes = HashSet::new();
let mut device_list_left = HashSet::new();
let mut receipts = sync_events::v4::Receipts { rooms: BTreeMap::new() };
let mut account_data = sync_events::v4::AccountData {
global: Vec::new(),
rooms: BTreeMap::new(),
};
if body.extensions.account_data.enabled.unwrap_or(false) {
account_data.global = services
.account_data
.changes_since(None, sender_user, globalsince, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect()
.await;
if let Some(rooms) = body.extensions.account_data.rooms {
for room in rooms {
account_data.rooms.insert(
room.clone(),
services
.account_data
.changes_since(Some(&room), sender_user, globalsince, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await,
);
}
}
}
if body.extensions.e2ee.enabled.unwrap_or(false) {
// Look for device list updates of this account
device_list_changes.extend(
services
.users
.keys_changed(sender_user, globalsince, None)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await,
);
for room_id in &all_joined_rooms {
let room_id: &&RoomId = room_id;
let Ok(current_shortstatehash) =
services.rooms.state.get_room_shortstatehash(room_id).await
else {
error!("Room {room_id} has no state");
continue;
};
let since_shortstatehash = services
.rooms
.user
.get_token_shortstatehash(room_id, globalsince)
.await
.ok();
let encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.await
.is_ok();
if let Some(since_shortstatehash) = since_shortstatehash {
// Skip if there are only timeline changes
if since_shortstatehash == current_shortstatehash {
continue;
}
let since_encryption = services
.rooms
.state_accessor
.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")
.await;
let since_sender_member: Option<RoomMemberEventContent> = services
.rooms
.state_accessor
.state_get_content(
since_shortstatehash,
&StateEventType::RoomMember,
sender_user.as_str(),
)
.ok()
.await;
let joined_since_last_sync = since_sender_member
.as_ref()
.is_none_or(|member| member.membership != MembershipState::Join);
let new_encrypted_room = encrypted_room && since_encryption.is_err();
if encrypted_room {
let current_state_ids: HashMap<_, OwnedEventId> = services
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.collect()
.await;
let since_state_ids: HashMap<_, _> = services
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
.collect()
.await;
for (key, id) in current_state_ids {
if since_state_ids.get(&key) != Some(&id) {
let Ok(pdu) = services.rooms.timeline.get_pdu(&id).await else {
error!("Pdu in state not found: {id}");
continue;
};
if pdu.kind == RoomMember {
if let Some(Ok(user_id)) =
pdu.state_key.as_deref().map(UserId::parse)
{
if user_id == sender_user {
continue;
}
let content: RoomMemberEventContent = pdu.get_content()?;
match content.membership {
| MembershipState::Join => {
// A new user joined an encrypted room
if !share_encrypted_room(
&services,
sender_user,
user_id,
Some(room_id),
)
.await
{
device_list_changes.insert(user_id.to_owned());
}
},
| MembershipState::Leave => {
// Write down users that have left encrypted rooms we
// are in
left_encrypted_users.insert(user_id.to_owned());
},
| _ => {},
}
}
}
}
}
if joined_since_last_sync || new_encrypted_room {
// If the user is in a new encrypted room, give them all joined users
device_list_changes.extend(
services
.rooms
.state_cache
.room_members(room_id)
// Don't send key updates from the sender to the sender
.ready_filter(|&user_id| sender_user != user_id)
// Only send keys if the sender doesn't share an encrypted room with the target
// already
.filter_map(|user_id| {
share_encrypted_room(&services, sender_user, user_id, Some(room_id))
.map(|res| res.or_some(user_id.to_owned()))
})
.collect::<Vec<_>>()
.await,
);
}
}
}
// Look for device list updates in this room
device_list_changes.extend(
services
.users
.room_keys_changed(room_id, globalsince, None)
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await,
);
}
for user_id in left_encrypted_users {
let dont_share_encrypted_room =
!share_encrypted_room(&services, sender_user, &user_id, None).await;
// If the user doesn't share an encrypted room with the target anymore, we need
// to tell them
if dont_share_encrypted_room {
device_list_left.insert(user_id);
}
}
}
let mut lists = BTreeMap::new();
let mut todo_rooms: TodoRooms = BTreeMap::new(); // and required state
for (list_id, list) in &body.lists {
let active_rooms = match list.filters.clone().and_then(|f| f.is_invite) {
| Some(true) => &all_invited_rooms,
| Some(false) => &all_joined_rooms,
| None => &all_rooms,
};
let active_rooms = match list.filters.clone().map(|f| f.not_room_types) {
| Some(filter) if filter.is_empty() => active_rooms.clone(),
| Some(value) => filter_rooms(&services, active_rooms, &value, true).await,
| None => active_rooms.clone(),
};
let active_rooms = match list.filters.clone().map(|f| f.room_types) {
| Some(filter) if filter.is_empty() => active_rooms.clone(),
| Some(value) => filter_rooms(&services, &active_rooms, &value, false).await,
| None => active_rooms,
};
let mut new_known_rooms: BTreeSet<OwnedRoomId> = BTreeSet::new();
let ranges = list.ranges.clone();
lists.insert(list_id.clone(), sync_events::v4::SyncList {
ops: ranges
.into_iter()
.map(|mut r| {
r.0 = r.0.clamp(
uint!(0),
UInt::try_from(active_rooms.len().saturating_sub(1)).unwrap_or(UInt::MAX),
);
r.1 = r.1.clamp(
r.0,
UInt::try_from(active_rooms.len().saturating_sub(1)).unwrap_or(UInt::MAX),
);
let room_ids = if !active_rooms.is_empty() {
active_rooms[usize_from_ruma(r.0)..=usize_from_ruma(r.1)].to_vec()
} else {
Vec::new()
};
new_known_rooms.extend(room_ids.clone().into_iter().map(ToOwned::to_owned));
for room_id in &room_ids {
let todo_room = todo_rooms.entry((*room_id).to_owned()).or_insert((
BTreeSet::new(),
0_usize,
u64::MAX,
));
let limit: usize = list
.room_details
.timeline_limit
.map(u64::from)
.map_or(10, usize_from_u64_truncated)
.min(100);
todo_room.0.extend(
list.room_details
.required_state
.iter()
.map(|(ty, sk)| (ty.clone(), sk.as_str().into())),
);
todo_room.1 = todo_room.1.max(limit);
// 0 means unknown because it got out of date
todo_room.2 = todo_room.2.min(
known_rooms
.get(list_id.as_str())
.and_then(|k| k.get(*room_id))
.copied()
.unwrap_or(0),
);
}
sync_events::v4::SyncOp {
op: SlidingOp::Sync,
range: Some(r),
index: None,
room_ids: room_ids.into_iter().map(ToOwned::to_owned).collect(),
room_id: None,
}
})
.collect(),
count: ruma_from_usize(active_rooms.len()),
});
if let Some(conn_id) = &body.conn_id {
let db_key = into_db_key(sender_user, sender_device, conn_id);
services.sync.update_sync_known_rooms(
&db_key,
list_id.clone(),
new_known_rooms,
globalsince,
);
}
}
let mut known_subscription_rooms = BTreeSet::new();
for (room_id, room) in &body.room_subscriptions {
if !services.rooms.metadata.exists(room_id).await
|| services.rooms.metadata.is_disabled(room_id).await
|| services.rooms.metadata.is_banned(room_id).await
{
continue;
}
let todo_room =
todo_rooms
.entry(room_id.clone())
.or_insert((BTreeSet::new(), 0_usize, u64::MAX));
let limit: usize = room
.timeline_limit
.map(u64::from)
.map_or(10, usize_from_u64_truncated)
.min(100);
todo_room.0.extend(
room.required_state
.iter()
.map(|(ty, sk)| (ty.clone(), sk.as_str().into())),
);
todo_room.1 = todo_room.1.max(limit);
// 0 means unknown because it got out of date
todo_room.2 = todo_room.2.min(
known_rooms
.get("subscriptions")
.and_then(|k| k.get(room_id))
.copied()
.unwrap_or(0),
);
known_subscription_rooms.insert(room_id.clone());
}
for r in body.unsubscribe_rooms {
known_subscription_rooms.remove(&r);
body.room_subscriptions.remove(&r);
}
if let Some(conn_id) = &body.conn_id {
let db_key = into_db_key(sender_user, sender_device, conn_id);
services.sync.update_sync_known_rooms(
&db_key,
"subscriptions".to_owned(),
known_subscription_rooms,
globalsince,
);
}
if let Some(conn_id) = body.conn_id.clone() {
let db_key = into_db_key(sender_user, sender_device, conn_id);
services
.sync
.update_sync_subscriptions(&db_key, body.room_subscriptions);
}
let mut rooms = BTreeMap::new();
for (room_id, (required_state_request, timeline_limit, roomsince)) in &todo_rooms {
let roomsincecount = PduCount::Normal(*roomsince);
let mut timestamp: Option<_> = None;
let mut invite_state = None;
let (timeline_pdus, limited);
let new_room_id: &RoomId = (*room_id).as_ref();
if all_invited_rooms.contains(&new_room_id) {
// TODO: figure out a timestamp we can use for remote invites
invite_state = services
.rooms
.state_cache
.invite_state(sender_user, room_id)
.await
.ok();
(timeline_pdus, limited) = (Vec::new(), true);
} else {
(timeline_pdus, limited) = match load_timeline(
&services,
sender_user,
room_id,
roomsincecount,
None,
*timeline_limit,
)
.await
{
| Ok(value) => value,
| Err(err) => {
warn!("Encountered missing timeline in {}, error {}", room_id, err);
continue;
},
};
}
account_data.rooms.insert(
room_id.to_owned(),
services
.account_data
.changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await,
);
let last_privateread_update = services
.rooms
.read_receipt
.last_privateread_update(sender_user, room_id)
.await > *roomsince;
let private_read_event = if last_privateread_update {
services
.rooms
.read_receipt
.private_read_get(room_id, sender_user)
.await
.ok()
} else {
None
};
let mut vector: Vec<Raw<AnySyncEphemeralRoomEvent>> = services
.rooms
.read_receipt
.readreceipts_since(room_id, *roomsince)
.filter_map(|(read_user, _ts, v)| async move {
services
.users
.user_is_ignored(read_user, sender_user)
.await
.or_some(v)
})
.collect()
.await;
if let Some(private_read_event) = private_read_event {
vector.push(private_read_event);
}
let receipt_size = vector.len();
receipts
.rooms
.insert(room_id.clone(), pack_receipts(Box::new(vector.into_iter())));
if roomsince != &0
&& timeline_pdus.is_empty()
&& account_data.rooms.get(room_id).is_some_and(Vec::is_empty)
&& receipt_size == 0
{
continue;
}
let prev_batch = timeline_pdus
.first()
.map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
Ok(Some(match pdu_count {
| PduCount::Backfilled(_) => {
error!("timeline in backfill state?!");
"0".to_owned()
},
| PduCount::Normal(c) => c.to_string(),
}))
})?
.or_else(|| {
if roomsince != &0 {
Some(roomsince.to_string())
} else {
None
}
});
let room_events: Vec<_> = timeline_pdus
.iter()
.stream()
.filter_map(|item| ignored_filter(&services, item.clone(), sender_user))
.map(at!(1))
.map(Event::into_format)
.collect()
.await;
for (_, pdu) in timeline_pdus {
let ts = MilliSecondsSinceUnixEpoch(pdu.origin_server_ts);
if DEFAULT_BUMP_TYPES.binary_search(&pdu.kind).is_ok()
&& timestamp.is_none_or(|time| time <= ts)
{
timestamp = Some(ts);
}
}
let required_state = required_state_request
.iter()
.stream()
.filter_map(|state| async move {
services
.rooms
.state_accessor
.room_state_get(room_id, &state.0, &state.1)
.await
.map(Event::into_format)
.ok()
})
.collect()
.await;
// Heroes
let heroes: Vec<_> = services
.rooms
.state_cache
.room_members(room_id)
.ready_filter(|&member| member != sender_user)
.filter_map(|user_id| {
services
.rooms
.state_accessor
.get_member(room_id, user_id)
.map_ok(|memberevent| SlidingSyncRoomHero {
user_id: user_id.into(),
name: memberevent.displayname,
avatar: memberevent.avatar_url,
})
.ok()
})
.take(5)
.collect()
.await;
let name = match heroes.len().cmp(&(1_usize)) {
| Ordering::Greater => {
let firsts = heroes[1..]
.iter()
.map(|h| h.name.clone().unwrap_or_else(|| h.user_id.to_string()))
.collect::<Vec<_>>()
.join(", ");
let last = heroes[0]
.name
.clone()
.unwrap_or_else(|| heroes[0].user_id.to_string());
Some(format!("{firsts} and {last}"))
},
| Ordering::Equal => Some(
heroes[0]
.name
.clone()
.unwrap_or_else(|| heroes[0].user_id.to_string()),
),
| Ordering::Less => None,
};
let heroes_avatar = if heroes.len() == 1 {
heroes[0].avatar.clone()
} else {
None
};
rooms.insert(room_id.clone(), sync_events::v4::SlidingSyncRoom {
name: services
.rooms
.state_accessor
.get_name(room_id)
.await
.ok()
.or(name),
avatar: match heroes_avatar {
| Some(heroes_avatar) => ruma::JsOption::Some(heroes_avatar),
| _ => match services.rooms.state_accessor.get_avatar(room_id).await {
| ruma::JsOption::Some(avatar) => ruma::JsOption::from_option(avatar.url),
| ruma::JsOption::Null => ruma::JsOption::Null,
| ruma::JsOption::Undefined => ruma::JsOption::Undefined,
},
},
initial: Some(roomsince == &0),
is_dm: None,
invite_state,
unread_notifications: UnreadNotificationsCount {
highlight_count: Some(
services
.rooms
.user
.highlight_count(sender_user, room_id)
.await
.try_into()
.expect("notification count can't go that high"),
),
notification_count: Some(
services
.rooms
.user
.notification_count(sender_user, room_id)
.await
.try_into()
.expect("notification count can't go that high"),
),
},
timeline: room_events,
required_state,
prev_batch,
limited,
joined_count: Some(
services
.rooms
.state_cache
.room_joined_count(room_id)
.await
.unwrap_or(0)
.try_into()
.unwrap_or_else(|_| uint!(0)),
),
invited_count: Some(
services
.rooms
.state_cache
.room_invited_count(room_id)
.await
.unwrap_or(0)
.try_into()
.unwrap_or_else(|_| uint!(0)),
),
num_live: None, // Count events in timeline greater than global sync counter
timestamp,
heroes: Some(heroes),
});
}
if rooms.iter().all(|(id, r)| {
r.timeline.is_empty() && r.required_state.is_empty() && !receipts.rooms.contains_key(id)
}) {
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let default = Duration::from_secs(30);
let duration = cmp::min(body.timeout.unwrap_or(default), default);
_ = tokio::time::timeout(duration, watcher).await;
}
Ok(sync_events::v4::Response {
initial: globalsince == 0,
txn_id: body.txn_id.clone(),
pos: next_batch.to_string(),
lists,
rooms,
extensions: sync_events::v4::Extensions {
to_device: if body.extensions.to_device.enabled.unwrap_or(false) {
Some(sync_events::v4::ToDevice {
events: services
.users
.get_to_device_events(
sender_user,
sender_device,
Some(globalsince),
Some(next_batch),
)
.collect()
.await,
next_batch: next_batch.to_string(),
})
} else {
None
},
e2ee: sync_events::v4::E2EE {
device_lists: DeviceLists {
changed: device_list_changes.into_iter().collect(),
left: device_list_left.into_iter().collect(),
},
device_one_time_keys_count: services
.users
.count_one_time_keys(sender_user, sender_device)
.await,
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
},
account_data,
receipts,
typing: sync_events::v4::Typing { rooms: BTreeMap::new() },
},
delta_token: None,
})
}
async fn filter_rooms<'a>(
services: &Services,
rooms: &[&'a RoomId],
filter: &[RoomTypeFilter],
negate: bool,
) -> Vec<&'a RoomId> {
rooms
.iter()
.stream()
.filter_map(|r| async move {
let room_type = services.rooms.state_accessor.get_room_type(r).await;
if room_type.as_ref().is_err_and(|e| !e.is_not_found()) {
return None;
}
let room_type_filter = RoomTypeFilter::from(room_type.ok());
let include = if negate {
!filter.contains(&room_type_filter)
} else {
filter.is_empty() || filter.contains(&room_type_filter)
};
include.then_some(r)
})
.collect()
.await
}

View File

@@ -1,11 +1,12 @@
use std::{
cmp::{self, Ordering},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
ops::Deref,
time::Duration,
};
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Error, Result, at, error, extract_variant, is_equal_to,
matrix::{Event, TypeStateKey, pdu::PduCount},
@@ -31,6 +32,7 @@
events::{
AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, TimelineEventType,
room::member::{MembershipState, RoomMemberEventContent},
typing::TypingEventContent,
},
serde::Raw,
uint,
@@ -39,7 +41,9 @@
use super::share_encrypted_room;
use crate::{
Ruma,
client::{DEFAULT_BUMP_TYPES, ignored_filter, is_ignored_invite, sync::load_timeline},
client::{
DEFAULT_BUMP_TYPES, TimelinePdus, ignored_filter, is_ignored_invite, sync::load_timeline,
},
};
type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a sync_events::v5::Request);
@@ -58,11 +62,18 @@
/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186
pub(crate) async fn sync_events_v5_route(
State(ref services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<sync_events::v5::Request>,
) -> Result<sync_events::v5::Response> {
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
services
.users
.update_device_last_seen(sender_user, Some(sender_device), client_ip)
.await;
let mut body = body.body;
// Setup watchers, so if there's no response, we can wait for them
@@ -210,6 +221,9 @@ pub(crate) async fn sync_events_v5_route(
_ = tokio::time::timeout(duration, watcher).await;
}
let typing = collect_typing_events(services, sender_user, &body, &todo_rooms).await?;
response.extensions.typing = typing;
trace!(
rooms = ?response.rooms.len(),
account_data = ?response.extensions.account_data.rooms.len(),
@@ -293,6 +307,8 @@ async fn handle_lists<'a, Rooms, AllRooms>(
Rooms: Iterator<Item = &'a RoomId> + Clone + Send + 'a,
AllRooms: Iterator<Item = &'a RoomId> + Clone + Send + 'a,
{
// TODO MSC4186: Implement remaining list filters: is_dm, is_encrypted,
// room_types.
for (list_id, list) in &body.lists {
let active_rooms: Vec<_> = match list.filters.as_ref().and_then(|f| f.is_invite) {
| None => all_rooms.clone().collect(),
@@ -409,13 +425,13 @@ async fn process_rooms<'a, Rooms>(
.await
.ok();
(timeline_pdus, limited) = (Vec::new(), true);
(timeline_pdus, limited) = (VecDeque::new(), true);
} else {
(timeline_pdus, limited) = match load_timeline(
TimelinePdus { pdus: timeline_pdus, limited } = match load_timeline(
services,
sender_user,
room_id,
roomsincecount,
Some(roomsincecount),
Some(PduCount::from(next_batch)),
*timeline_limit,
)
@@ -434,7 +450,7 @@ async fn process_rooms<'a, Rooms>(
room_id.to_owned(),
services
.account_data
.changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch))
.changes_since(Some(room_id), sender_user, Some(*roomsince), Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await,
@@ -460,11 +476,11 @@ async fn process_rooms<'a, Rooms>(
let mut receipts: Vec<Raw<AnySyncEphemeralRoomEvent>> = services
.rooms
.read_receipt
.readreceipts_since(room_id, *roomsince)
.readreceipts_since(room_id, Some(*roomsince))
.filter_map(|(read_user, _ts, v)| async move {
services
.users
.user_is_ignored(read_user, sender_user)
.user_is_ignored(&read_user, sender_user)
.await
.or_some(v)
})
@@ -499,7 +515,7 @@ async fn process_rooms<'a, Rooms>(
}
let prev_batch = timeline_pdus
.first()
.front()
.map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
Ok(Some(match pdu_count {
| PduCount::Backfilled(_) => {
@@ -672,6 +688,62 @@ async fn process_rooms<'a, Rooms>(
}
Ok(rooms)
}
async fn collect_typing_events(
services: &Services,
sender_user: &UserId,
body: &sync_events::v5::Request,
todo_rooms: &TodoRooms,
) -> Result<sync_events::v5::response::Typing> {
if !body.extensions.typing.enabled.unwrap_or(false) {
return Ok(sync_events::v5::response::Typing::default());
}
let rooms: Vec<_> = body.extensions.typing.rooms.clone().unwrap_or_else(|| {
body.room_subscriptions
.keys()
.map(ToOwned::to_owned)
.collect()
});
let lists: Vec<_> = body
.extensions
.typing
.lists
.clone()
.unwrap_or_else(|| body.lists.keys().map(ToOwned::to_owned).collect::<Vec<_>>());
if rooms.is_empty() && lists.is_empty() {
return Ok(sync_events::v5::response::Typing::default());
}
let mut typing_response = sync_events::v5::response::Typing::default();
for (room_id, (_, _, roomsince)) in todo_rooms {
if services.rooms.typing.last_typing_update(room_id).await? <= *roomsince {
continue;
}
match services
.rooms
.typing
.typing_users_for_user(room_id, sender_user)
.await
{
| Ok(typing_users) => {
typing_response.rooms.insert(
room_id.to_owned(), // Already OwnedRoomId
Raw::new(&sync_events::v5::response::SyncTypingEvent {
content: TypingEventContent::new(typing_users),
})?,
);
},
| Err(e) => {
warn!(%room_id, "Failed to get typing events for room: {}", e);
},
}
}
Ok(typing_response)
}
async fn collect_account_data(
services: &Services,
(sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request),
@@ -687,7 +759,7 @@ async fn collect_account_data(
account_data.global = services
.account_data
.changes_since(None, sender_user, globalsince, None)
.changes_since(None, sender_user, Some(globalsince), None)
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect()
.await;
@@ -698,7 +770,7 @@ async fn collect_account_data(
room.clone(),
services
.account_data
.changes_since(Some(room), sender_user, globalsince, None)
.changes_since(Some(room), sender_user, Some(globalsince), None)
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await,
@@ -732,7 +804,7 @@ async fn collect_e2ee<'a, Rooms>(
device_list_changes.extend(
services
.users
.keys_changed(sender_user, globalsince, None)
.keys_changed(sender_user, Some(globalsince), None)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await,
@@ -868,7 +940,7 @@ async fn collect_e2ee<'a, Rooms>(
device_list_changes.extend(
services
.users
.room_keys_changed(room_id, globalsince, None)
.room_keys_changed(room_id, Some(globalsince), None)
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()

View File

@@ -1,4 +1,5 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, utils, utils::math::Tried};
use ruma::api::client::typing::create_typing_event;
@@ -9,10 +10,15 @@
/// Sets the typing state of the sender user.
pub(crate) async fn create_typing_event_route(
State(services): State<crate::State>,
InsecureClientIp(ip): InsecureClientIp,
body: Ruma<create_typing_event::v3::Request>,
) -> Result<create_typing_event::v3::Response> {
use create_typing_event::v3::Typing;
let sender_user = body.sender_user();
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), ip)
.await;
if sender_user != body.user_id && body.appservice_info.is_none() {
return Err!(Request(Forbidden("You cannot update typing status of other users.")));

View File

@@ -52,7 +52,6 @@ pub(crate) async fn get_supported_versions_route(
("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */
("org.matrix.msc3827".to_owned(), true), /* filtering of /publicRooms by room type (https://github.com/matrix-org/matrix-spec-proposals/pull/3827) */
("org.matrix.msc3952_intentional_mentions".to_owned(), true), /* intentional mentions (https://github.com/matrix-org/matrix-spec-proposals/pull/3952) */
("org.matrix.msc3575".to_owned(), true), /* sliding sync (https://github.com/matrix-org/matrix-spec-proposals/pull/3575/files#r1588877046) */
("org.matrix.msc3916.stable".to_owned(), true), /* authenticated media (https://github.com/matrix-org/matrix-spec-proposals/pull/3916) */
("org.matrix.msc4180".to_owned(), true), /* stable flag for 3916 (https://github.com/matrix-org/matrix-spec-proposals/pull/4180) */
("uk.tcpip.msc4133".to_owned(), true), /* Extending User Profile API with Key:Value Pairs (https://github.com/matrix-org/matrix-spec-proposals/pull/4133) */

View File

@@ -143,7 +143,6 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
.put(client::send_state_event_for_empty_key_route),
)
.ruma_route(&client::sync_events_route)
.ruma_route(&client::sync_events_v4_route)
.ruma_route(&client::sync_events_v5_route)
.ruma_route(&client::get_context_route)
.ruma_route(&client::get_message_events_route)

View File

@@ -10,7 +10,6 @@
use ruma::{
CanonicalJsonValue, OwnedUserId, UserId,
api::{client::error::ErrorKind, federation::membership::create_invite},
events::room::member::{MembershipState, RoomMemberEventContent},
serde::JsonObject,
};
@@ -133,17 +132,21 @@ pub(crate) async fn create_invite_route(
services
.rooms
.state_cache
.update_membership(
&body.room_id,
.mark_as_invited(
&recipient_user,
RoomMemberEventContent::new(MembershipState::Invite),
&body.room_id,
sender_user,
Some(invite_state),
body.via.clone(),
true,
)
.await?;
services
.rooms
.state_cache
.update_joined_count(&body.room_id)
.await;
for appservice in services.appservice.read().await.values() {
if appservice.is_user_match(&recipient_user) {
services

View File

@@ -1224,12 +1224,6 @@ pub struct Config {
#[serde(default)]
pub rocksdb_repair: bool,
#[serde(default)]
pub rocksdb_read_only: bool,
#[serde(default)]
pub rocksdb_secondary: bool,
/// Enables idle CPU priority for compaction thread. This is not enabled by
/// default to prevent compaction from falling too far behind on busy
/// systems.

View File

@@ -177,11 +177,6 @@ pub async fn auth_check<E, F, Fut>(
// [synapse] do_sig_check check the event has valid signatures for member events
// TODO do_size_check is false when called by `iterative_auth_check`
// do_size_check is also mostly accomplished by ruma with the exception of
// checking event_type, state_key, and json are below a certain size (255 and
// 65_536 respectively)
let sender = incoming_event.sender();
// Implementation of https://spec.matrix.org/latest/rooms/v1/#authorization-rules
@@ -908,7 +903,7 @@ struct GetThirdPartyInvite {
false
}
},
| JoinRule::Restricted(_) =>
| JoinRule::Restricted(_) => {
if membership_allows_join || user_for_join_auth_is_valid {
trace!(
%sender,
@@ -928,7 +923,8 @@ struct GetThirdPartyInvite {
valid authorising user given to permit the join"
);
false
},
}
},
| JoinRule::Public => {
trace!(%sender, "join rule is public, allowing join");
true

View File

@@ -36,7 +36,7 @@ fn map_ok_or<U, F>(
) -> MapOkOrElse<Self, impl FnOnce(Self::Ok) -> U, impl FnOnce(Self::Error) -> U>
where
F: FnOnce(Self::Ok) -> U,
Self: Send + Sized;
Self: Sized;
fn ok(
self,
@@ -100,7 +100,7 @@ fn map_ok_or<U, F>(
) -> MapOkOrElse<Self, impl FnOnce(Self::Ok) -> U, impl FnOnce(Self::Error) -> U>
where
F: FnOnce(Self::Ok) -> U,
Self: Send + Sized,
Self: Sized,
{
self.map_ok_or_else(|_| default, f)
}

View File

@@ -19,7 +19,7 @@
use conduwuit::{Err, Result, debug, info, warn};
use rocksdb::{
AsColumnFamilyRef, BoundColumnFamily, DBCommon, DBWithThreadMode, MultiThreaded,
AsColumnFamilyRef, BoundColumnFamily, DBCommon, MultiThreaded, OptimisticTransactionDB,
WaitForCompactOptions,
};
@@ -33,13 +33,11 @@ pub struct Engine {
pub(crate) db: Db,
pub(crate) pool: Arc<Pool>,
pub(crate) ctx: Arc<Context>,
pub(super) read_only: bool,
pub(super) secondary: bool,
pub(crate) checksums: bool,
corks: AtomicU32,
}
pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
pub(crate) type Db = OptimisticTransactionDB<MultiThreaded>;
impl Engine {
#[tracing::instrument(
@@ -129,14 +127,6 @@ pub fn current_sequence(&self) -> u64 {
sequence
}
#[inline]
#[must_use]
pub fn is_read_only(&self) -> bool { self.secondary || self.read_only }
#[inline]
#[must_use]
pub fn is_secondary(&self) -> bool { self.secondary }
}
impl Drop for Engine {

View File

@@ -12,9 +12,8 @@ pub fn backup(&self) -> Result {
let mut engine = self.backup_engine()?;
let config = &self.ctx.server.config;
if config.database_backups_to_keep > 0 {
let flush = !self.is_read_only();
engine
.create_new_backup_flush(&self.db, flush)
.create_new_backup_flush(&self.db, true)
.map_err(map_err)?;
let engine_info = engine.get_backup_info();

View File

@@ -1,7 +1,7 @@
use std::fmt::Write;
use conduwuit::{Result, implement};
use rocksdb::perf::get_memory_usage_stats;
use rocksdb::perf::MemoryUsageBuilder;
use super::Engine;
use crate::or_else;
@@ -9,16 +9,21 @@
#[implement(Engine)]
pub fn memory_usage(&self) -> Result<String> {
let mut res = String::new();
let stats = get_memory_usage_stats(Some(&[&self.db]), Some(&[&*self.ctx.row_cache.lock()]))
.or_else(or_else)?;
let mut builder = MemoryUsageBuilder::new().or_else(or_else)?;
builder.add_db(&self.db);
builder.add_cache(&self.ctx.row_cache.lock());
let usage = builder.build().or_else(or_else)?;
let mibs = |input| f64::from(u32::try_from(input / 1024).unwrap_or(0)) / 1024.0;
writeln!(
res,
"Memory buffers: {:.2} MiB\nPending write: {:.2} MiB\nTable readers: {:.2} MiB\nRow \
cache: {:.2} MiB",
mibs(stats.mem_table_total),
mibs(stats.mem_table_unflushed),
mibs(stats.mem_table_readers_total),
mibs(usage.approximate_mem_table_total()),
mibs(usage.approximate_mem_table_unflushed()),
mibs(usage.approximate_mem_table_readers_total()),
mibs(u64::try_from(self.ctx.row_cache.lock().get_usage())?),
)?;

View File

@@ -35,14 +35,7 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
}
debug!("Opening database...");
let db = if config.rocksdb_read_only {
Db::open_cf_descriptors_read_only(&db_opts, path, cfds, false)
} else if config.rocksdb_secondary {
Db::open_cf_descriptors_as_secondary(&db_opts, path, path, cfds)
} else {
Db::open_cf_descriptors(&db_opts, path, cfds)
}
.or_else(or_else)?;
let db = Db::open_cf_descriptors(&db_opts, path, cfds).or_else(or_else)?;
info!(
columns = num_cfds,
@@ -55,8 +48,6 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
db,
pool: ctx.pool.clone(),
ctx: ctx.clone(),
read_only: config.rocksdb_read_only,
secondary: config.rocksdb_secondary,
checksums: config.rocksdb_checksums,
corks: AtomicU32::new(0),
}))

View File

@@ -219,7 +219,7 @@ pub fn insert_batch<'a, I, K, V>(&'a self, iter: I)
K: AsRef<[u8]> + Sized + Debug + 'a,
V: AsRef<[u8]> + Sized + 'a,
{
let mut batch = WriteBatchWithTransaction::<false>::default();
let mut batch = WriteBatchWithTransaction::<true>::default();
for (key, val) in iter {
batch.put_cf(&self.cf(), key.as_ref(), val.as_ref());
}

View File

@@ -77,14 +77,6 @@ pub fn iter(&self) -> impl Iterator<Item = (&MapsKey, &MapsVal)> + Send + '_ {
#[inline]
pub fn keys(&self) -> impl Iterator<Item = &MapsKey> + Send + '_ { self.maps.keys() }
#[inline]
#[must_use]
pub fn is_read_only(&self) -> bool { self.db.is_read_only() }
#[inline]
#[must_use]
pub fn is_secondary(&self) -> bool { self.db.is_secondary() }
}
impl Index<&str> for Database {

View File

@@ -129,13 +129,14 @@ pub fn changes_since<'a>(
&'a self,
room_id: Option<&'a RoomId>,
user_id: &'a UserId,
since: u64,
since: Option<u64>,
to: Option<u64>,
) -> impl Stream<Item = AnyRawAccountDataEvent> + Send + 'a {
type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
// Skip the data that's exactly at since, because we sent that last time
let first_possible = (room_id, user_id, since.saturating_add(1));
// ...unless this is an initial sync, in which case send everything
let first_possible = (room_id, user_id, since.map_or(0, |since| since.saturating_add(1)));
self.db
.roomuserdataid_accountdata

View File

@@ -37,10 +37,6 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
}
async fn worker(self: Arc<Self>) -> Result {
if self.services.globals.is_read_only() {
return Ok(());
}
if self.services.config.ldap.enable {
warn!("emergency password feature not available with LDAP enabled.");
return Ok(());

View File

@@ -31,12 +31,13 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let turn_secret = config.turn_secret_file.as_ref().map_or_else(
|| config.turn_secret.clone(),
|path| {
std::fs::read_to_string(path).unwrap_or_else(|e| {
|path| match std::fs::read_to_string(path) {
| Ok(secret) => secret.trim().to_owned(),
| Err(e) => {
error!("Failed to read the TURN secret file: {e}");
config.turn_secret.clone()
})
},
},
);
@@ -49,7 +50,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
return config.registration_token.clone();
};
Some(token)
Some(token.trim().to_owned())
},
);
@@ -170,7 +171,4 @@ pub fn user_is_local(&self, user_id: &UserId) -> bool {
pub fn server_is_ours(&self, server_name: &ServerName) -> bool {
server_name == self.server_name()
}
#[inline]
pub fn is_read_only(&self) -> bool { self.db.db.is_read_only() }
}

View File

@@ -1,7 +1,7 @@
use std::cmp;
use std::{cmp, collections::HashMap};
use conduwuit::{
Err, Result, debug, debug_info, debug_warn, error, info,
Err, Pdu, Result, debug, debug_info, debug_warn, error, info,
result::NotFound,
utils::{
IterStream, ReadyExt,
@@ -13,14 +13,16 @@
use futures::{FutureExt, StreamExt, TryStreamExt};
use itertools::Itertools;
use ruma::{
OwnedUserId, RoomId, UserId,
OwnedRoomId, OwnedUserId, RoomId, UserId,
events::{
GlobalAccountDataEventType, push_rules::PushRulesEvent, room::member::MembershipState,
GlobalAccountDataEventType, StateEventType, push_rules::PushRulesEvent,
room::member::MembershipState,
},
push::Ruleset,
serde::Raw,
};
use crate::{Services, media};
use crate::{Services, media, rooms::short::ShortStateHash};
/// The current schema version.
/// - If database is opened at greater version we reject with error. The
@@ -152,6 +154,14 @@ async fn migrate(services: &Services) -> Result<()> {
info!("Migration: Bumped database version to 18");
}
if db["global"]
.get(POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER)
.await
.is_not_found()
{
populate_userroomid_leftstate_table(services).await?;
}
assert_eq!(
services.globals.db.database_version().await,
DATABASE_VERSION,
@@ -456,7 +466,11 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services)
for user_id in &non_joined_members {
debug_info!("User is left or banned, marking as left");
services.rooms.state_cache.mark_as_left(user_id, room_id);
services
.rooms
.state_cache
.mark_as_left(user_id, room_id, None)
.await;
}
}
@@ -576,6 +590,10 @@ async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result
const FIXED_CORRUPT_MSC4133_FIELDS_MARKER: &[u8] = b"fix_corrupt_msc4133_fields";
async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
// Due to an old bug, some conduwuit databases have `us.cloke.msc4175.tz` user
// profile fields with raw strings instead of quoted JSON ones.
// This migration fixes that.
use serde_json::{Value, from_slice};
type KeyVal<'a> = ((OwnedUserId, String), &'a [u8]);
@@ -592,24 +610,28 @@ async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
async |(mut total, mut fixed),
((user, key), value): KeyVal<'_>|
-> Result<(usize, usize)> {
if let Err(error) = from_slice::<Value>(value) {
// Due to an old bug, some conduwuit databases have `us.cloke.msc4175.tz` user
// profile fields with raw strings instead of quoted JSON ones.
// This migration fixes that.
let new_value = if key == "us.cloke.msc4175.tz" {
Value::String(String::from_utf8(value.to_vec())?)
} else {
return Err!(
"failed to deserialize msc4133 key {} of user {}: {}",
key,
user,
error
match from_slice::<Value>(value) {
// corrupted timezone field
| Err(_) if key == "us.cloke.msc4175.tz" => {
let new_value = Value::String(String::from_utf8(value.to_vec())?);
useridprofilekey_value.put((user, key), Json(new_value));
fixed = fixed.saturating_add(1);
},
// corrupted value for some other key
| Err(error) => {
warn!(
"deleting MSC4133 key {} for user {} due to deserialization \
failure: {}",
key, user, error
);
};
useridprofilekey_value.put((user, key), Json(new_value));
fixed = fixed.saturating_add(1);
useridprofilekey_value.del((user, key));
},
// other key with no issues
| Ok(_) => {
// do nothing
},
}
total = total.saturating_add(1);
Ok((total, fixed))
@@ -624,3 +646,78 @@ async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
db.db.sort()?;
Ok(())
}
const POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER: &str = "populate_userroomid_leftstate_table";
async fn populate_userroomid_leftstate_table(services: &Services) -> Result {
type KeyVal<'a> = (Key<'a>, Raw<Option<Pdu>>);
type Key<'a> = (&'a UserId, &'a RoomId);
let db = &services.db;
let cork = db.cork_and_sync();
let userroomid_leftstate = db["userroomid_leftstate"].clone();
let (total, fixed, _) = userroomid_leftstate
.stream()
.try_fold(
(0_usize, 0_usize, HashMap::<OwnedRoomId, ShortStateHash>::new()),
async |(mut total, mut fixed, mut shortstatehash_cache): (
usize,
usize,
HashMap<_, _>,
),
((user_id, room_id), state): KeyVal<'_>|
-> Result<(usize, usize, HashMap<_, _>)> {
if state.deserialize().is_err() {
let latest_shortstatehash =
if let Some(shortstatehash) = shortstatehash_cache.get(room_id) {
*shortstatehash
} else if let Ok(shortstatehash) =
services.rooms.state.get_room_shortstatehash(room_id).await
{
shortstatehash_cache.insert(room_id.to_owned(), shortstatehash);
shortstatehash
} else {
warn!(?room_id, ?user_id, "room has no shortstatehash");
return Ok((total, fixed, shortstatehash_cache));
};
let leave_state_event = services
.rooms
.state_accessor
.state_get(
latest_shortstatehash,
&StateEventType::RoomMember,
user_id.as_str(),
)
.await;
match leave_state_event {
| Ok(leave_state_event) => {
userroomid_leftstate.put((user_id, room_id), Json(leave_state_event));
fixed = fixed.saturating_add(1);
},
| Err(_) => {
warn!(
?room_id,
?user_id,
"room cached as left has no leave event for user, removing \
cache entry"
);
userroomid_leftstate.del((user_id, room_id));
},
}
}
total = total.saturating_add(1);
Ok((total, fixed, shortstatehash_cache))
},
)
.await?;
drop(cork);
info!(?total, ?fixed, "Fixed entries in `userroomid_leftstate`.");
db["global"].insert(POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER, []);
db.db.sort()?;
Ok(())
}

View File

@@ -14,7 +14,7 @@
use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UserId, events::StateEventType};
use tracing::debug;
use crate::rooms::timeline::RawPduId;
use crate::rooms::timeline::{RawPduId, pdu_fits};
/// When receiving an event one needs to:
/// 0. Check the server is in the room
@@ -62,6 +62,13 @@ pub async fn handle_incoming_pdu<'a>(
if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
return Ok(Some(pdu_id));
}
if !pdu_fits(&mut value.clone()) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from {origin} because it \
exceeds 65535 bytes or is otherwise too large."
);
return Err!(Request(TooLarge("PDU is too large")));
}
// 1.1 Check the server is in the room
let meta_exists = self.services.metadata.exists(room_id).map(Ok);

View File

@@ -1,7 +1,8 @@
use std::collections::{BTreeMap, HashMap, hash_map};
use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res, trace,
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
trace, warn,
};
use futures::future::ready;
use ruma::{
@@ -10,6 +11,7 @@
};
use super::{check_room_id, get_room_version_id, to_room_version};
use crate::rooms::timeline::pdu_fits;
#[implement(super::Service)]
#[allow(clippy::too_many_arguments)]
@@ -25,6 +27,13 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
where
Pdu: Event + Send + Sync,
{
if !pdu_fits(&mut value.clone()) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from {origin} because it \
exceeds 65535 bytes or is otherwise too large."
);
return Err!(Request(TooLarge("PDU is too large")));
}
// 1. Remove unsigned field
value.remove("unsigned");

View File

@@ -39,7 +39,7 @@ pub enum Status {
Seen(u64),
}
pub type Witness = HashSet<OwnedUserId>;
pub type MemberSet = HashSet<OwnedUserId>;
type Key<'a> = (&'a UserId, Option<&'a DeviceId>, &'a RoomId, &'a UserId);
impl crate::Service for Service {
@@ -67,9 +67,11 @@ pub async fn reset(&self, ctx: &Context<'_>) {
.await;
}
/// Returns only the subset of `senders` which should be sent to the client
/// according to the provided lazy loading context.
#[implement(Service)]
#[tracing::instrument(name = "retain", level = "debug", skip_all)]
pub async fn witness_retain(&self, senders: Witness, ctx: &Context<'_>) -> Witness {
pub async fn retain_lazy_members(&self, senders: MemberSet, ctx: &Context<'_>) -> MemberSet {
debug_assert!(
ctx.options.is_none_or(Options::is_enabled),
"lazy loading should be enabled by your options"
@@ -84,7 +86,7 @@ pub async fn witness_retain(&self, senders: Witness, ctx: &Context<'_>) -> Witne
pin_mut!(witness);
let _cork = self.db.db.cork();
let mut senders = Witness::with_capacity(senders.len());
let mut senders = MemberSet::with_capacity(senders.len());
while let Some((status, sender)) = witness.next().await {
if include_redundant || status == Status::Unseen {
senders.insert(sender.into());

View File

@@ -7,7 +7,7 @@
use database::{Deserialized, Json, Map};
use futures::{Stream, StreamExt};
use ruma::{
CanonicalJsonObject, RoomId, UserId,
CanonicalJsonObject, OwnedUserId, RoomId, UserId,
events::{AnySyncEphemeralRoomEvent, receipt::ReceiptEvent},
serde::Raw,
};
@@ -25,7 +25,7 @@ struct Services {
globals: Dep<globals::Service>,
}
pub(super) type ReceiptItem<'a> = (&'a UserId, u64, Raw<AnySyncEphemeralRoomEvent>);
pub(super) type ReceiptItem = (OwnedUserId, u64, Raw<AnySyncEphemeralRoomEvent>);
impl Data {
pub(super) fn new(args: &crate::Args<'_>) -> Self {
@@ -65,7 +65,7 @@ pub(super) fn readreceipts_since<'a>(
&'a self,
room_id: &'a RoomId,
since: u64,
) -> impl Stream<Item = ReceiptItem<'a>> + Send + 'a {
) -> impl Stream<Item = ReceiptItem> + Send + 'a {
type Key<'a> = (&'a RoomId, u64, &'a UserId);
type KeyVal<'a> = (Key<'a>, CanonicalJsonObject);
@@ -81,7 +81,7 @@ pub(super) fn readreceipts_since<'a>(
let event = serde_json::value::to_raw_value(&json)?;
Ok((user_id, count, Raw::from_json(event)))
Ok((user_id.to_owned(), count, Raw::from_json(event)))
})
.ignore_err()
}

View File

@@ -104,16 +104,16 @@ pub async fn private_read_get(
Ok(Raw::from_json(event))
}
/// Returns an iterator over the most recent read_receipts in a room that
/// happened after the event with id `since`.
/// Returns an iterator over the most recent read_receipts in a room,
/// optionally after the event with id `since`.
#[inline]
#[tracing::instrument(skip(self), level = "debug")]
pub fn readreceipts_since<'a>(
&'a self,
room_id: &'a RoomId,
since: u64,
) -> impl Stream<Item = ReceiptItem<'a>> + Send + 'a {
self.db.readreceipts_since(room_id, since)
since: Option<u64>,
) -> impl Stream<Item = ReceiptItem> + Send + 'a {
self.db.readreceipts_since(room_id, since.unwrap_or(0))
}
/// Sets a private read marker at PDU `count`.

View File

@@ -1,10 +1,18 @@
use std::{borrow::Borrow, fmt::Debug, mem::size_of_val, sync::Arc};
pub use conduwuit::matrix::pdu::{ShortEventId, ShortId, ShortRoomId, ShortStateKey};
use conduwuit::{Result, err, implement, matrix::StateKey, utils, utils::IterStream};
use conduwuit::{
Result, err, implement,
matrix::StateKey,
pair_of,
utils::{self, IterStream, ReadyExt},
};
use database::{Deserialized, Get, Map, Qry};
use futures::{Stream, StreamExt};
use ruma::{EventId, RoomId, events::StateEventType};
use futures::{
Stream, StreamExt,
stream::{self},
};
use ruma::{EventId, OwnedEventId, RoomId, events::StateEventType};
use serde::Deserialize;
use crate::{Dep, globals};
@@ -258,3 +266,23 @@ pub async fn get_or_create_shortroomid(&self, room_id: &RoomId) -> ShortRoomId {
short
})
}
#[implement(Service)]
pub async fn multi_get_state_from_short<'a, S>(
&'a self,
short_state: S,
) -> impl Stream<Item = Result<((StateEventType, StateKey), OwnedEventId)>> + Send + 'a
where
S: Stream<Item = (ShortStateKey, ShortEventId)> + Send + 'a,
{
let (short_state_keys, short_event_ids): pair_of!(Vec<_>) = short_state.unzip().await;
StreamExt::zip(
self.multi_get_statekey_from_short(stream::iter(short_state_keys.into_iter())),
self.multi_get_eventid_from_short(stream::iter(short_event_ids.into_iter())),
)
.ready_filter_map(|state_event| match state_event {
| (Ok(state_key), Ok(event_id)) => Some(Ok((state_key, event_id))),
| (Err(e), _) | (_, Err(e)) => Some(Err(e)),
})
}

View File

@@ -20,7 +20,7 @@
EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId,
events::{
AnyStrippedStateEvent, StateEventType, TimelineEventType,
room::{create::RoomCreateEventContent, member::RoomMemberEventContent},
room::create::RoomCreateEventContent,
},
serde::Raw,
};
@@ -126,21 +126,9 @@ pub async fn force_state(
continue;
};
let Ok(membership_event) = pdu.get_content::<RoomMemberEventContent>() else {
continue;
};
self.services
.state_cache
.update_membership(
room_id,
user_id,
membership_event,
&pdu.sender,
None,
None,
false,
)
.update_membership(room_id, user_id, &pdu, false)
.await?;
},
| TimelineEventType::SpaceChild => {

View File

@@ -1,7 +1,7 @@
use std::{borrow::Borrow, ops::Deref, sync::Arc};
use conduwuit::{
Result, at, err, implement,
Pdu, Result, at, err, implement,
matrix::{Event, StateKey},
pair_of,
utils::{
@@ -10,7 +10,7 @@
},
};
use database::Deserialized;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, future::try_join, pin_mut};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, pin_mut};
use ruma::{
EventId, OwnedEventId, UserId,
events::{
@@ -125,7 +125,7 @@ pub async fn state_get(
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> Result<impl Event> {
) -> Result<Pdu> {
self.state_get_id(shortstatehash, event_type, state_key)
.and_then(async |event_id: OwnedEventId| self.services.timeline.get_pdu(&event_id).await)
.await
@@ -286,28 +286,28 @@ pub fn state_keys<'a>(
/// not in .1)
#[implement(super::Service)]
#[inline]
pub fn state_removed(
pub async fn state_removed(
&self,
shortstatehash: pair_of!(ShortStateHash),
) -> impl Stream<Item = (ShortStateKey, ShortEventId)> + Send + '_ {
self.state_added((shortstatehash.1, shortstatehash.0))
) -> Result<Vec<(ShortStateKey, ShortEventId)>> {
self.state_added((shortstatehash.1, shortstatehash.0)).await
}
/// Returns the state events added between the interval (present in .1 but
/// not in .0)
#[implement(super::Service)]
pub fn state_added(
pub async fn state_added(
&self,
shortstatehash: pair_of!(ShortStateHash),
) -> impl Stream<Item = (ShortStateKey, ShortEventId)> + Send + '_ {
let a = self.load_full_state(shortstatehash.0);
let b = self.load_full_state(shortstatehash.1);
try_join(a, b)
.map_ok(|(a, b)| b.difference(&a).copied().collect::<Vec<_>>())
.map_ok(IterStream::try_stream)
.try_flatten_stream()
.ignore_err()
) -> Result<Vec<(ShortStateKey, ShortEventId)>> {
let full_state_a = self.load_full_state(shortstatehash.0).await?;
let full_state_b = self.load_full_state(shortstatehash.1).await?;
Ok(full_state_b
.difference(&full_state_a)
.copied()
.map(parse_compressed_state_event)
.collect())
}
#[implement(super::Service)]

View File

@@ -4,7 +4,7 @@
use std::{collections::HashMap, sync::Arc};
use conduwuit::{
Result, SyncRwLock, implement,
Pdu, Result, SyncRwLock, implement,
result::LogErr,
utils::{ReadyExt, stream::TryIgnore},
warn,
@@ -13,7 +13,7 @@
use futures::{Stream, StreamExt, future::join5, pin_mut};
use ruma::{
OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId,
events::{AnyStrippedStateEvent, AnySyncStateEvent, room::member::MembershipState},
events::{AnyStrippedStateEvent, room::member::MembershipState},
serde::Raw,
};
@@ -54,7 +54,6 @@ struct Data {
type AppServiceInRoomCache = SyncRwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>;
type StrippedStateEventItem = (OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>);
type SyncStateEventItem = (OwnedRoomId, Vec<Raw<AnySyncStateEvent>>);
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
@@ -431,18 +430,9 @@ pub async fn knock_state(
#[implement(Service)]
#[tracing::instrument(skip(self), level = "trace")]
pub async fn left_state(
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
pub async fn left_state(&self, user_id: &UserId, room_id: &RoomId) -> Result<Option<Pdu>> {
let key = (user_id, room_id);
self.db
.userroomid_leftstate
.qry(&key)
.await
.deserialized()
.and_then(|val: Raw<Vec<AnyStrippedStateEvent>>| val.deserialize_as().map_err(Into::into))
self.db.userroomid_leftstate.qry(&key).await.deserialized()
}
/// Returns an iterator over all rooms a user left.
@@ -451,8 +441,8 @@ pub async fn left_state(
pub fn rooms_left<'a>(
&'a self,
user_id: &'a UserId,
) -> impl Stream<Item = SyncStateEventItem> + Send + 'a {
type KeyVal<'a> = (Key<'a>, Raw<Vec<Raw<AnySyncStateEvent>>>);
) -> impl Stream<Item = (OwnedRoomId, Option<Pdu>)> + Send + 'a {
type KeyVal<'a> = (Key<'a>, Raw<Option<Pdu>>);
type Key<'a> = (&'a UserId, &'a RoomId);
let prefix = (user_id, Interfix);
@@ -461,7 +451,7 @@ pub fn rooms_left<'a>(
.stream_prefix(&prefix)
.ignore_err()
.map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state))
.map(|(room_id, state)| Ok((room_id, state.deserialize_as()?)))
.map(|(room_id, state)| Ok((room_id, state.deserialize()?)))
.ignore_err()
}

View File

@@ -1,13 +1,13 @@
use std::collections::HashSet;
use conduwuit::{Err, Result, implement, is_not_empty, utils::ReadyExt, warn};
use conduwuit::{Err, Event, Pdu, Result, implement, is_not_empty, utils::ReadyExt, warn};
use database::{Json, serialize_key};
use futures::StreamExt;
use ruma::{
OwnedServerName, RoomId, UserId,
events::{
AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
RoomAccountDataEventType, StateEventType,
AnyStrippedStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType,
StateEventType,
direct::DirectEvent,
invite_permission_config::FilterLevel,
room::{
@@ -26,8 +26,7 @@
fields(
%room_id,
%user_id,
%sender,
?membership_event,
?pdu,
),
)]
#[allow(clippy::too_many_arguments)]
@@ -35,13 +34,10 @@ pub async fn update_membership(
&self,
room_id: &RoomId,
user_id: &UserId,
membership_event: RoomMemberEventContent,
sender: &UserId,
last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
invite_via: Option<Vec<OwnedServerName>>,
pdu: &Pdu,
update_joined_count: bool,
) -> Result {
let membership = membership_event.membership;
let membership = pdu.get_content::<RoomMemberEventContent>()?;
// Keep track what remote users exist by adding them as "deactivated" users
//
@@ -54,7 +50,7 @@ pub async fn update_membership(
}
}
match &membership {
match &membership.membership {
| MembershipState::Join => {
// Check if the user never joined this room
if !self.once_joined(user_id, room_id).await {
@@ -122,33 +118,14 @@ pub async fn update_membership(
self.mark_as_joined(user_id, room_id);
},
| MembershipState::Invite => {
// return an error for blocked invites. ignored invites aren't handled here
// since the recipient's membership should still be changed to `invite`.
// they're filtered out in the individual /sync handlers
if matches!(
self.services
.users
.invite_filter_level(sender, user_id)
.await,
FilterLevel::Block
) {
return Err!(Request(InviteBlocked(
"{user_id} has blocked invites from {sender}."
)));
}
self.mark_as_invited(user_id, room_id, sender, last_state, invite_via)
.await;
// TODO: make sure that passing None for `last_state` is correct behavior.
// the call from `append_pdu` used to use `services.state.summary_stripped`
// to fill that parameter.
self.mark_as_invited(user_id, room_id, pdu.sender(), None, None)
.await?;
},
| MembershipState::Leave | MembershipState::Ban => {
self.mark_as_left(user_id, room_id);
if self.services.globals.user_is_local(user_id)
&& (self.services.config.forget_forced_upon_leave
|| self.services.metadata.is_banned(room_id).await
|| self.services.metadata.is_disabled(room_id).await)
{
self.forget(room_id, user_id);
}
self.mark_as_left(user_id, room_id, Some(pdu.clone())).await;
},
| _ => {},
}
@@ -252,24 +229,24 @@ pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) {
self.db.roomid_inviteviaservers.remove(room_id);
}
/// Direct DB function to directly mark a user as left. It is not
/// recommended to use this directly. You most likely should use
/// `update_membership` instead
/// Mark a user as having left a room.
///
/// `leave_pdu` represents the m.room.member event which the user sent to leave
/// the room. If this is None, no event was actually sent, but we must still
/// behave as if the user is no longer in the room. This may occur, for example,
/// if the room being left has been server-banned by an administrator.
#[implement(super::Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
pub async fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId, leave_pdu: Option<Pdu>) {
let userroom_id = (user_id, room_id);
let userroom_id = serialize_key(userroom_id).expect("failed to serialize userroom_id");
let roomuser_id = (room_id, user_id);
let roomuser_id = serialize_key(roomuser_id).expect("failed to serialize roomuser_id");
// (timo) TODO
let leftstate = Vec::<Raw<AnySyncStateEvent>>::new();
self.db
.userroomid_leftstate
.raw_put(&userroom_id, Json(leftstate));
.raw_put(&userroom_id, Json(leave_pdu));
self.db
.roomuserid_leftcount
.raw_aput::<8, _, _>(&roomuser_id, self.services.globals.next_count().unwrap());
@@ -285,6 +262,14 @@ pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
self.db.roomuserid_knockedcount.remove(&roomuser_id);
self.db.roomid_inviteviaservers.remove(room_id);
if self.services.globals.user_is_local(user_id)
&& (self.services.config.forget_forced_upon_leave
|| self.services.metadata.is_banned(room_id).await
|| self.services.metadata.is_disabled(room_id).await)
{
self.forget(room_id, user_id);
}
}
/// Direct DB function to directly mark a user as knocked. It is not
@@ -351,7 +336,20 @@ pub async fn mark_as_invited(
sender_user: &UserId,
last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
invite_via: Option<Vec<OwnedServerName>>,
) {
) -> Result<()> {
// return an error for blocked invites. ignored invites aren't handled here
// since the recipient's membership should still be changed to `invite`.
// they're filtered out in the individual /sync handlers
if matches!(
self.services
.users
.invite_filter_level(sender_user, user_id)
.await,
FilterLevel::Block
) {
return Err!(Request(InviteBlocked("{user_id} has blocked invites from {sender_user}.")));
}
let roomuser_id = (room_id, user_id);
let roomuser_id = serialize_key(roomuser_id).expect("failed to serialize roomuser_id");
@@ -366,7 +364,7 @@ pub async fn mark_as_invited(
.raw_aput::<8, _, _>(&roomuser_id, self.services.globals.next_count().unwrap());
self.db
.userroomid_invitesender
.raw_put(&userroom_id, sender_user);
.insert(&userroom_id, sender_user);
self.db.userroomid_joined.remove(&userroom_id);
self.db.roomuserid_joined.remove(&roomuser_id);
@@ -380,4 +378,6 @@ pub async fn mark_as_invited(
if let Some(servers) = invite_via.filter(is_not_empty!()) {
self.add_servers_invite_via(room_id, servers).await;
}
Ok(())
}

View File

@@ -526,7 +526,7 @@ pub(crate) fn compress_state_event(
#[inline]
#[must_use]
pub(crate) fn parse_compressed_state_event(
pub fn parse_compressed_state_event(
compressed_event: CompressedStateEvent,
) -> (ShortStateKey, ShortEventId) {
use utils::u64_from_u8;

View File

@@ -19,9 +19,7 @@
GlobalAccountDataEventType, StateEventType, TimelineEventType,
push_rules::PushRulesEvent,
room::{
encrypted::Relation,
member::{MembershipState, RoomMemberEventContent},
power_levels::RoomPowerLevelsEventContent,
encrypted::Relation, power_levels::RoomPowerLevelsEventContent,
redaction::RoomRedactionEventContent,
},
},
@@ -323,31 +321,12 @@ pub async fn append_pdu<'a, Leaves>(
let target_user_id =
UserId::parse(state_key).expect("This state_key was previously validated");
let content: RoomMemberEventContent = pdu.get_content()?;
let stripped_state = match content.membership {
| MembershipState::Invite | MembershipState::Knock => self
.services
.state
.summary_stripped(pdu, room_id)
.await
.into(),
| _ => None,
};
// Update our membership info, we do this here incase a user is invited or
// knocked and immediately leaves we need the DB to record the invite or
// knock event for auth
self.services
.state_cache
.update_membership(
room_id,
target_user_id,
content,
pdu.sender(),
stripped_state,
None,
true,
)
.update_membership(room_id, target_user_id, pdu, true)
.await?;
}
},

View File

@@ -23,6 +23,40 @@
use super::RoomMutexGuard;
pub fn pdu_fits(owned_obj: &mut CanonicalJsonObject) -> bool {
// room IDs, event IDs, senders, types, and state keys must all be <= 255 bytes
if let Some(CanonicalJsonValue::String(room_id)) = owned_obj.get("room_id") {
if room_id.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(event_id)) = owned_obj.get("event_id") {
if event_id.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(sender)) = owned_obj.get("sender") {
if sender.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(kind)) = owned_obj.get("type") {
if kind.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(state_key)) = owned_obj.get("state_key") {
if state_key.len() > 255 {
return false;
}
}
// Now check the full PDU size
match serde_json::to_string(owned_obj) {
| Ok(s) => s.len() <= 65535,
| Err(_) => false,
}
}
#[implement(super::Service)]
pub async fn create_hash_and_sign_event(
&self,
@@ -148,19 +182,6 @@ fn from_evt(
}
}
// if event_type != TimelineEventType::RoomCreate && prev_events.is_empty() {
// return Err!(Request(Unknown("Event incorrectly had zero prev_events.")));
// }
// if state_key.is_none() && depth.lt(&uint!(2)) {
// // The first two events in a room are always m.room.create and
// m.room.member, // so any other events with that same depth are illegal.
// warn!(
// "Had unsafe depth {depth} when creating non-state event in {}. Cowardly
// aborting", room_id.expect("room_id is Some here").as_str()
// );
// return Err!(Request(Unknown("Unsafe depth for non-state event.")));
// }
let mut pdu = PduEvent {
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
room_id: room_id.map(ToOwned::to_owned),
@@ -269,8 +290,16 @@ fn from_evt(
}
// Generate event id
pdu.event_id = gen_event_id(&pdu_json, &room_version_id)?;
// Check with the policy server
pdu_json.insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into()));
// Verify that the *full* PDU isn't over 64KiB.
// Ruma only validates that it's under 64KiB before signing and hashing.
// Has to be cloned to prevent mutating pdu_json itself :(
if !pdu_fits(&mut pdu_json.clone()) {
// feckin huge PDU mate
return Err!(Request(TooLarge("Message/PDU is too long (exceeds 65535 bytes)")));
}
// Check with the policy server
if room_id.is_some() {
trace!(
"Checking event in room {} with policy server",

View File

@@ -26,7 +26,7 @@
use serde::Deserialize;
use self::data::Data;
pub use self::data::PdusIterItem;
pub use self::{create::pdu_fits, data::PdusIterItem};
use crate::{
Dep, account_data, admin, appservice, globals, pusher, rooms, sending, server_keys, users,
};
@@ -186,10 +186,8 @@ pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[inline]
pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<impl Event> {
pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
self.db.get_non_outlier_pdu(event_id).await
}
@@ -243,7 +241,7 @@ pub fn all_pdus<'a>(
self.pdus(Some(user_id), room_id, None).ignore_err()
}
/// Reverse iteration starting at from.
/// Reverse iteration starting after `until`.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus_rev<'a>(
&'a self,
@@ -255,7 +253,7 @@ pub fn pdus_rev<'a>(
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
}
/// Forward iteration starting at from.
/// Forward iteration starting after `from`.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus<'a>(
&'a self,

View File

@@ -179,18 +179,15 @@ pub async fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> {
.unwrap_or(0))
}
/// Returns a new typing EDU.
pub async fn typings_all(
pub async fn typing_users_for_user(
&self,
room_id: &RoomId,
sender_user: &UserId,
) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
) -> Result<Vec<OwnedUserId>> {
let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
let Some(typing_indicators) = room_typing_indicators else {
return Ok(SyncEphemeralRoomEvent {
content: ruma::events::typing::TypingEventContent { user_ids: Vec::new() },
});
return Ok(Vec::new());
};
let user_ids: Vec<_> = typing_indicators
@@ -207,8 +204,19 @@ pub async fn typings_all(
.collect()
.await;
Ok(user_ids)
}
/// Returns a new typing EDU.
pub async fn typings_event_for_user(
&self,
room_id: &RoomId,
sender_user: &UserId,
) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
Ok(SyncEphemeralRoomEvent {
content: ruma::events::typing::TypingEventContent { user_ids },
content: ruma::events::typing::TypingEventContent {
user_ids: self.typing_users_for_user(room_id, sender_user).await?,
},
})
}

View File

@@ -423,7 +423,7 @@ async fn select_edus_device_changes(
let keys_changed = self
.services
.users
.room_keys_changed(room_id, since.0, None)
.room_keys_changed(room_id, Some(since.0), None)
.ready_filter(|(user_id, _)| self.services.globals.user_is_local(user_id));
pin_mut!(keys_changed);
@@ -520,7 +520,7 @@ async fn select_edus_receipts_room(
let receipts = self
.services
.read_receipt
.readreceipts_since(room_id, since.0);
.readreceipts_since(room_id, Some(since.0));
pin_mut!(receipts);
let mut read = BTreeMap::<OwnedUserId, ReceiptData>::new();
@@ -530,7 +530,7 @@ async fn select_edus_receipts_room(
}
max_edu_count.fetch_max(count, Ordering::Relaxed);
if !self.services.globals.user_is_local(user_id) {
if !self.services.globals.user_is_local(&user_id) {
continue;
}
@@ -554,7 +554,7 @@ async fn select_edus_receipts_room(
let receipt = receipt
.remove(&ReceiptType::Read)
.expect("our read receipts always set this")
.remove(user_id)
.remove(&user_id)
.expect("our read receipts always have the user here");
let receipt_data = ReceiptData {
@@ -562,7 +562,7 @@ async fn select_edus_receipts_room(
event_ids: vec![event_id.clone()],
};
if read.insert(user_id.to_owned(), receipt_data).is_none() {
if read.insert(user_id, receipt_data).is_none() {
*num = num.saturating_add(1);
if *num >= SELECT_RECEIPT_LIMIT {
break;

View File

@@ -130,7 +130,7 @@ pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
// reset dormant online/away statuses to offline, and set the server user as
// online
if self.server.config.allow_local_presence && !self.db.is_read_only() {
if self.server.config.allow_local_presence {
self.presence.unset_all_presence().await;
_ = self
.presence
@@ -146,7 +146,7 @@ pub async fn stop(&self) {
info!("Shutting down services...");
// set the server user as offline
if self.server.config.allow_local_presence && !self.db.is_read_only() {
if self.server.config.allow_local_presence {
_ = self
.presence
.ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Offline)

View File

@@ -11,7 +11,7 @@
use ruma::{
CanonicalJsonValue, DeviceId, OwnedDeviceId, OwnedUserId, UserId,
api::client::{
error::ErrorKind,
error::{ErrorKind, StandardErrorBody},
uiaa::{AuthData, AuthType, Password, UiaaInfo, UserIdentifier},
},
};
@@ -104,6 +104,7 @@ pub fn create(
}
#[implement(Service)]
#[allow(clippy::useless_let_if_seq)]
pub async fn try_auth(
&self,
user_id: &UserId,
@@ -163,17 +164,39 @@ pub async fn try_auth(
let user_id = user_id_from_username;
// Check if password is correct
let mut password_verified = false;
// First try local password hash verification
if let Ok(hash) = self.services.users.password_hash(&user_id).await {
let hash_matches = hash::verify_password(password, &hash).is_ok();
if !hash_matches {
uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
kind: ErrorKind::forbidden(),
message: "Invalid username or password.".to_owned(),
});
return Ok((false, uiaainfo));
password_verified = hash::verify_password(password, &hash).is_ok();
}
// If local password verification failed, try LDAP authentication
#[cfg(feature = "ldap")]
if !password_verified && self.services.config.ldap.enable {
// Search for user in LDAP to get their DN
if let Ok(dns) = self.services.users.search_ldap(&user_id).await {
if let Some((user_dn, _is_admin)) = dns.first() {
// Try to authenticate with LDAP
password_verified = self
.services
.users
.auth_ldap(user_dn, password)
.await
.is_ok();
}
}
}
if !password_verified {
uiaainfo.auth_error = Some(StandardErrorBody {
kind: ErrorKind::forbidden(),
message: "Invalid username or password.".to_owned(),
});
return Ok((false, uiaainfo));
}
// Password was correct! Let's add it to `completed`
uiaainfo.completed.push(AuthType::Password);
},
@@ -197,7 +220,7 @@ pub async fn try_auth(
},
| Err(e) => {
error!("ReCaptcha verification failed: {e:?}");
uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
uiaainfo.auth_error = Some(StandardErrorBody {
kind: ErrorKind::forbidden(),
message: "ReCaptcha verification failed.".to_owned(),
});
@@ -210,7 +233,7 @@ pub async fn try_auth(
if tokens.contains(t.token.trim()) {
uiaainfo.completed.push(AuthType::RegistrationToken);
} else {
uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
uiaainfo.auth_error = Some(StandardErrorBody {
kind: ErrorKind::forbidden(),
message: "Invalid registration token.".to_owned(),
});

View File

@@ -1,6 +1,6 @@
#[cfg(feature = "ldap")]
use std::collections::HashMap;
use std::{collections::BTreeMap, mem, sync::Arc};
use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc};
#[cfg(feature = "ldap")]
use conduwuit::result::LogErr;
@@ -25,6 +25,7 @@
invite_permission_config::{FilterLevel, InvitePermissionConfigEvent},
},
serde::Raw,
uint,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
@@ -790,7 +791,7 @@ pub async fn sign_key(
pub fn keys_changed<'a>(
&'a self,
user_id: &'a UserId,
from: u64,
from: Option<u64>,
to: Option<u64>,
) -> impl Stream<Item = &'a UserId> + Send + 'a {
self.keys_changed_user_or_room(user_id.as_str(), from, to)
@@ -801,7 +802,7 @@ pub fn keys_changed<'a>(
pub fn room_keys_changed<'a>(
&'a self,
room_id: &'a RoomId,
from: u64,
from: Option<u64>,
to: Option<u64>,
) -> impl Stream<Item = (&'a UserId, u64)> + Send + 'a {
self.keys_changed_user_or_room(room_id.as_str(), from, to)
@@ -810,11 +811,12 @@ pub fn room_keys_changed<'a>(
fn keys_changed_user_or_room<'a>(
&'a self,
user_or_room_id: &'a str,
from: u64,
from: Option<u64>,
to: Option<u64>,
) -> impl Stream<Item = (&'a UserId, u64)> + Send + 'a {
type KeyVal<'a> = ((&'a str, u64), &'a UserId);
let from = from.unwrap_or(0);
let to = to.unwrap_or(u64::MAX);
let start = (user_or_room_id, from.saturating_add(1));
self.db
@@ -979,6 +981,7 @@ pub async fn remove_to_device_events<Until>(
.await;
}
/// Updates device metadata and increments the device list version.
pub async fn update_device_metadata(
&self,
user_id: &UserId,
@@ -986,13 +989,51 @@ pub async fn update_device_metadata(
device: &Device,
) -> Result<()> {
increment(&self.db.userid_devicelistversion, user_id.as_bytes());
self.update_device_metadata_no_increment(user_id, device_id, device)
.await
}
// Updates device metadata without incrementing the device list version.
// This is namely used for updating the last_seen_ip and last_seen_ts values,
// as those do not need a device list version bump due to them not being
// relevant to other consumers.
pub async fn update_device_metadata_no_increment(
&self,
user_id: &UserId,
device_id: &DeviceId,
device: &Device,
) -> Result<()> {
let key = (user_id, device_id);
self.db.userdeviceid_metadata.put(key, Json(device));
Ok(())
}
pub async fn update_device_last_seen(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
ip: IpAddr,
) {
let now = MilliSecondsSinceUnixEpoch::now();
if let Some(device_id) = device_id {
if let Ok(mut device) = self.get_device_metadata(user_id, device_id).await {
device.last_seen_ip = Some(ip.to_string());
// If the last update was less than 10 seconds ago, don't update the timestamp
if let Some(prev) = device.last_seen_ts {
if now.get().saturating_sub(prev.get()) < uint!(10_000) {
return;
}
}
device.last_seen_ts = Some(now);
self.update_device_metadata_no_increment(user_id, device_id, &device)
.await
.ok();
}
}
}
/// Get device metadata.
pub async fn get_device_metadata(
&self,