Compare commits

...

99 Commits

Author SHA1 Message Date
Jade Ellis
f4ccb81913 chore: Release 2025-12-22 00:23:20 +00:00
Jade Ellis
710cdfeadb chore: Update mailmap 2025-12-21 20:34:11 +00:00
Jade Ellis
666849ea87 chore(ci): Unify artifact versions 2025-12-21 19:11:12 +00:00
Jade Ellis
71094803f1 fix(ci): Try use path that exists 2025-12-21 18:50:48 +00:00
Jade Ellis
bf91ce5c7f feat: Mark v12 as stable 2025-12-21 17:15:16 +00:00
Jade Ellis
8fd15f26ce style: Fix clippy 2025-12-21 17:12:36 +00:00
Jade Ellis
705fa6c5c6 fix: Simplify visibility check code 2025-12-21 17:12:36 +00:00
Jade Ellis
6f67c27538 fix: Ensure that room ID is present on state events sent to client
routes

Mostly fixes !1094

The remaining issue is federation routes
2025-12-21 17:12:35 +00:00
Jade Ellis
8586d747d1 feat: Run visibility checks on bundled relations 2025-12-21 17:12:35 +00:00
Jade Ellis
11012a9ce1 fix: Always return the same 404 message in context 2025-12-21 17:12:35 +00:00
Jade Ellis
07be190507 fix: Return 404 when event is not accessible 2025-12-21 17:12:35 +00:00
Jade Ellis
ae4acc9568 fix: Don't incorrectly add thread root to relation response 2025-12-21 17:12:35 +00:00
Jade Ellis
f83ddecd8c refactor(perf): Push down visibility check after limit 2025-12-21 17:12:34 +00:00
Jade Ellis
dd87232f1f refactor: Reduce database lookups in some cases 2025-12-21 17:12:34 +00:00
Jade Ellis
8e33f9a7d0 refactor: Improve code style for bundled aggregations 2025-12-21 17:12:34 +00:00
Jade Ellis
8d3e4eba99 fix: Add aggregations to the search endpoint 2025-12-21 17:12:34 +00:00
Jade Ellis
96bfdb97da fix: Filter out invalid replacements from bundled aggregations 2025-12-21 17:12:34 +00:00
Jade Ellis
b61010da47 feat: Add bundled aggregations support
Add support for the m.replace and m.reference bundled
aggregations.
This should fix plenty of subtle client issues.
Threads are not included in the new code as they have
historically been written to the database. Replacing the
old system would result in issues when switching away from
continuwuity, so saved for later.
Some TODOs have been left re event visibility and ignored users.
These should be OK for now, though.
2025-12-21 17:12:34 +00:00
Jade Ellis
987c5eeb03 refactor: Promote handling unsigned data out of timeline
Also fixes:
- Transaction IDs leaking in event route
- Age not being set for event relations or threads
- Both of the above for search results

Notes down concern with relations table
2025-12-21 17:12:33 +00:00
timedout
7fa4fa9862 fix: Also check sender origin 2025-12-21 10:58:50 +00:00
timedout
b2bead67ac fix: Apply additional validation to invites 2025-12-21 10:10:54 +00:00
timedout
48a6a475ce fix: Omit children with invalid state from space summary 2025-12-18 19:48:58 +00:00
timedout
86450da705 style: Run clippy 2025-12-18 19:48:26 +00:00
timedout
8538b21860 feat: Check for incoming signatures 2025-12-18 19:03:32 +00:00
timedout
63e4aacd2b style: Reword TODO comment 2025-12-18 18:24:00 +00:00
timedout
72f0eb9493 feat: Fetch policy server signatures 2025-12-18 18:23:54 +00:00
Odd E. Ebbesen
867d0ab671 fix(reload): Store paths to config files for admin reload
Paths given via --config at startup are now stored inside the config
struct at runtime, to make it possible to reload config without setting
an env var for the config file location.
2025-12-16 14:58:33 +00:00
Ginger
64e187e5b4 fix: Update comment in src/core/config/mod.rs 2025-12-16 14:19:43 +00:00
aviac
5dc449a87a test: add test for config with default_room_version
This commit refactors the test a bit to run the basic test script with
different configs. Currently we have two configs we test:

- the bare minimum to make it run (base)
- base + default_room_version set to "12"
2025-12-16 14:19:43 +00:00
aviac
f5fda01013 docs: Add note about the type of the default_room_version option 2025-12-16 14:19:43 +00:00
Jade Ellis
cdc53b3421 fix: Allow using LDAP passwords in UIAA
Fixes #1131

Co-authored-by: Jade Ellis <jade@ellis.link>
2025-12-16 13:55:32 +00:00
Ginger
0b667ae4fd fix(ci): Try explicitly specifying the ref for debian/fedora workflows 2025-12-15 10:21:46 -05:00
unbeatable-101
83baf9b524 Keep location of Continuwuity configuration file consitant 2025-12-13 22:51:16 +00:00
timedout
4f198fb4ef fix: Enforce limits when joining rooms 2025-12-13 22:17:47 +00:00
timedout
1631c0afa4 fix: Perform additional validation on events 2025-12-13 21:36:20 +00:00
Charlotte Hartmann Paludo
862684af28 fix: remove trailing whitespace from secrets read from secrets file 2025-12-13 16:07:51 +00:00
Ginger
7345c241a9 fix: Don't halt and catch fire on deserialization errors in MSC4133 migration 2025-12-12 11:16:52 -05:00
Ginger
6a8b988b36 fix(ci): Downgrade upload-artifact actions again to v3 this time 2025-12-10 11:33:36 -05:00
Ginger
f1d6536793 fix(ci): Downgrade upload-artifact actions to v4 2025-12-10 11:33:36 -05:00
Ginger
cf8d8e4ea6 chore: Post-rebase cleanup 2025-12-09 03:25:04 +00:00
timedout
393d341f07 perf: Throttle frequent device metadata updates & centralise site 2025-12-09 03:25:03 +00:00
timedout
ba55dffa0e perf: Don't increment the device list version when updating local info 2025-12-09 03:25:03 +00:00
timedout
f3115e14ab feat: Update device metadata upon hitting hot endpoints 2025-12-09 03:25:03 +00:00
Ginger
b3fa4705ef chore: Fix line endings 2025-12-07 15:28:19 -05:00
Ginger
53b06a7918 chore(sync/v3): Remove unused imports 2025-12-07 19:58:24 +00:00
Ginger
fafc1d3fd1 fix(sync/v3): Don't send rejected invites on initial syncs 2025-12-07 19:58:24 +00:00
Ginger
dbc74272c3 refactor(sync/v3): Extract left room timeline logic into its own function 2025-12-07 19:58:24 +00:00
Ginger
f11caac05e fix(sync/v3): Don't send dummy leaves on an initial sync 2025-12-07 19:58:24 +00:00
Ginger
e581face44 chore: Formatting 2025-12-07 19:58:24 +00:00
ginger
037ba41adb fix: Nitpicky comment reword 2025-12-07 19:58:24 +00:00
Ginger
941c8f7d52 fix: Bump max startup time to ten minutes in the systemd unit 2025-12-07 19:58:24 +00:00
Ginger
7dae118af9 chore(sync/v3): More goat sacrifices 2025-12-07 19:58:24 +00:00
Ginger
07dfc5528d refactor(sync/v3): Split load_joined_room into smaller functions 2025-12-07 19:58:24 +00:00
ginger
3f4749a796 fix: Correct error message 2025-12-07 19:58:24 +00:00
Ginger
be8d72fafc fix(sync/v3): Add a workaround for matrix-js-sdk/5071 2025-12-07 19:58:24 +00:00
Ginger
0008709481 fix(sync/v3): Stop ignoring leave cache deserialization failures 2025-12-07 19:58:24 +00:00
Ginger
ee51d4357f fix(sync/v3): Do not include the last membership event when syncing left rooms 2025-12-07 19:58:24 +00:00
Ginger
8ffc6d4f15 chore(sync/v3): Sacrifice a goat to clippy 2025-12-07 19:58:24 +00:00
Ginger
93efe89a1f fix(sync/v3): Cache shortstatehashes to speed up migration 2025-12-07 19:58:24 +00:00
Ginger
16f37d21ff fix(sync/v3): Implement a migration for the userroomid_leftstate table 2025-12-07 19:58:24 +00:00
Ginger
800ac8d1f1 fix(sync/v3): Fix invite filtering for federated invites 2025-12-07 19:58:24 +00:00
Ginger
872f5bf077 feat(sync/v3): Remove TL size config option in favor of using the sync filter 2025-12-07 19:58:24 +00:00
Ginger
992217d644 chore(sync/v3): Fix clippy lints 2025-12-07 19:58:24 +00:00
Ginger
4fb4397a9f fix(sync/v3): Remove mysterious membership event manipulation code 2025-12-07 19:58:24 +00:00
Ginger
61b6947e88 fix(sync/v3): Properly sync room heroes 2025-12-07 19:58:24 +00:00
Ginger
876d3faec4 chore(sync/v3): Use "build_*" terminology instead of "calculate_*" 2025-12-07 19:58:24 +00:00
Ginger
9cc0cc69f7 chore(sync/v3): Use more descriptive names for SyncContext properties 2025-12-07 19:58:24 +00:00
Ginger
5513bb4dff chore: Remove unneeded comment 2025-12-07 19:58:24 +00:00
Ginger
693e327004 fix: Use prepare_lazily_loaded_members for joined rooms
Also, don't take read receipts into consideration for lazy loading.
Synapse doesn't do this and they're making initial syncs very large.
2025-12-07 19:58:24 +00:00
Ginger
3e6571a2b8 chore: Clippy fixes 2025-12-07 19:58:24 +00:00
Jade Ellis
f0f10f8f3e feat: Typing notifications in simplified sliding sync
What's missing? Being able to use separate rooms & lists for typing
indicators.
At the moment, we use the same ones as we use for the timeline, as
todo_rooms is quite intertwined. We need to disentangle this to get that
functionality, although I'm not sure if clients use it.
2025-12-07 19:58:24 +00:00
Ginger
a4f2b55a8a feat: Add a config option to change the max TL size for legacy sync 2025-12-07 19:58:24 +00:00
Ginger
213a361c53 fix: Set limited to true for newly joined rooms again 2025-12-07 19:58:24 +00:00
Ginger
1c21e4af6e fix: Properly sync left rooms
- Remove most usages of `update_membership` in favor
  of directly calling the `mark_as_*` functions
- Store the leave membership event as the value in the
  `userroomid_leftstate` table
- Use the `userroomid_leftstate` table to synchronize the
  timeline and state for left rooms if possible
2025-12-07 19:58:24 +00:00
Ginger
fceaaedc04 fix: Properly sync newly joined rooms 2025-12-07 19:58:24 +00:00
Ginger
0eff173c0b fix(sync/v3): Further cleanup + improve incremental sync consistency 2025-12-07 19:58:24 +00:00
Ginger
72bf8e5927 fix: Correctly send limited timelines again 2025-12-07 19:58:24 +00:00
Ginger
3491f653a5 refactor: Split sync v3 into multiple files 2025-12-07 19:58:24 +00:00
Ginger
e820dd7aed feat: Drop support for MSC3575 (legacy sliding sync) 2025-12-07 19:58:24 +00:00
Ginger
c92b7239a8 chore: Clippy fixes 2025-12-07 19:58:24 +00:00
Ginger
2940bc69c1 fix(sync/v3): Cleanup part 1: mostly fix redundant data in state 2025-12-07 19:58:24 +00:00
Jade
502919b248 chore: Tell continuwuity.org to use my livekit instance 2025-12-04 14:23:02 +00:00
Renovate Bot
33c3d23d60 chore(deps): update rust-patch-updates 2025-11-29 05:01:44 +00:00
Renovate Bot
ce318fe455 chore(deps): update pre-commit hook crate-ci/typos to v1.40.0 2025-11-28 20:19:35 +00:00
Renovate Bot
a729e1d63d chore(deps): update actions/upload-artifact action to v5 2025-11-28 20:19:10 +00:00
Ginger
956c3dfa62 chore: Fix deprecation warning 2025-11-28 15:08:20 -05:00
Renovate Bot
49e8f06559 chore(deps): update rust-patch-updates 2025-11-28 15:00:38 -05:00
rooot
c0f4424cb9 fix(docs): blurry small logo, scroll resizing top bar
Signed-off-by: rooot <hey@rooot.gay>
2025-11-27 13:53:12 +01:00
Tobias Fella
3eac985c5e fix(docs): Correct typo and outdated name 2025-11-26 21:13:43 +01:00
Jade
5fd341096d fix(docs): Dead link 2025-11-26 00:10:59 +00:00
Renovate Bot
a1b2d6ec46 chore(deps): update dependency @rspress/plugin-client-redirects to v2.0.0-rc.1 2025-11-25 21:50:27 +00:00
Renovate Bot
551563ce83 chore(deps): update dependency @rspress/plugin-preview to v2.0.0-rc.1 2025-11-25 18:38:26 +00:00
Ginger
9f133cf75b chore(deps): Update actions/checkout to v6 2025-11-25 18:26:28 +00:00
Ginger
23c398dc1e fix(ci): Remove explicit references to code.forgejo.org in action steps 2025-11-25 18:26:28 +00:00
Renovate Bot
fa73893179 chore(deps): update pre-commit hook crate-ci/committed to v1.1.8 2025-11-25 17:16:13 +00:00
Renovate Bot
57fec44ec7 chore(deps): update dependency cargo-bins/cargo-binstall to v1.16.2 2025-11-24 05:02:19 +00:00
timedout
bc8d304dbf style: Fix unnecessary qualification
Sounds like my university experience
2025-11-23 16:33:32 +00:00
timedout
7f4248a8c6 feat: Enhance remote room leave handling 2025-11-23 16:33:32 +00:00
aviac
430200b60e fix: add explicit fix of rpath
Without this the rpath is empty and the binary won't be able to load the librocksdb.so.10 shared lib
2025-11-23 16:26:27 +00:00
97 changed files with 4425 additions and 3815 deletions

View File

@@ -32,11 +32,13 @@ outputs:
runs:
using: composite
steps:
- run: mkdir -p digests
shell: bash
- name: Download digests
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: forgejo/download-artifact@v4
with:
path: /tmp/digests
path: digests
pattern: ${{ inputs.digest_pattern }}
merge-multiple: true
@@ -78,7 +80,7 @@ runs:
- name: Create manifest list and push
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
working-directory: /tmp/digests
working-directory: digests
shell: bash
env:
IMAGES: ${{ inputs.images }}

View File

@@ -32,12 +32,13 @@ jobs:
echo "Debian distribution: $DISTRIBUTION ($VERSION)"
- name: Checkout repository with full history
uses: https://code.forgejo.org/actions/checkout@v5
uses: actions/checkout@v6
with:
fetch-depth: 0
ref: ${{ github.ref_name }}
- name: Cache Cargo registry
uses: https://code.forgejo.org/actions/cache@v4
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
@@ -126,7 +127,7 @@ jobs:
[ -f /etc/conduwuit/conduwuit.toml ] && echo "✅ Config file installed"
- name: Upload deb artifact
uses: https://code.forgejo.org/actions/upload-artifact@v3
uses: forgejo/upload-artifact@v4
with:
name: continuwuity-${{ steps.debian-version.outputs.distribution }}
path: ${{ steps.cargo-deb.outputs.path }}

View File

@@ -30,13 +30,14 @@ jobs:
echo "Fedora version: $VERSION"
- name: Checkout repository with full history
uses: https://code.forgejo.org/actions/checkout@v5
uses: actions/checkout@v6
with:
fetch-depth: 0
ref: ${{ github.ref_name }}
- name: Cache DNF packages
uses: https://code.forgejo.org/actions/cache@v4
uses: actions/cache@v4
with:
path: |
/var/cache/dnf
@@ -46,7 +47,7 @@ jobs:
dnf-fedora${{ steps.fedora.outputs.version }}-
- name: Cache Cargo registry
uses: https://code.forgejo.org/actions/cache@v4
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
@@ -56,7 +57,7 @@ jobs:
cargo-fedora${{ steps.fedora.outputs.version }}-
- name: Cache Rust build dependencies
uses: https://code.forgejo.org/actions/cache@v4
uses: actions/cache@v4
with:
path: |
~/rpmbuild/BUILD/*/target/release/deps
@@ -238,13 +239,13 @@ jobs:
cp $BIN_RPM upload-bin/
- name: Upload binary RPM
uses: https://code.forgejo.org/actions/upload-artifact@v3
uses: forgejo/upload-artifact@v4
with:
name: continuwuity
path: upload-bin/
- name: Upload debug RPM artifact
uses: https://code.forgejo.org/actions/upload-artifact@v3
uses: forgejo/upload-artifact@v4
with:
name: continuwuity-debug
path: artifacts/*debuginfo*.rpm

View File

@@ -21,7 +21,7 @@ jobs:
steps:
- name: Sync repository
uses: actions/checkout@v5
uses: actions/checkout@v6
with:
persist-credentials: false
fetch-depth: 0

View File

@@ -38,7 +38,7 @@ jobs:
DOCKER_MIRROR_TOKEN: ${{ secrets.DOCKER_MIRROR_TOKEN }}
steps:
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@v6
with:
persist-credentials: false

View File

@@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@v6
with:
persist-credentials: false
@@ -47,7 +47,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@v6
with:
persist-credentials: false

View File

@@ -43,7 +43,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@v6
with:
persist-credentials: false
- name: Prepare Docker build environment
@@ -97,7 +97,7 @@ jobs:
needs: build-release
steps:
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@v6
with:
persist-credentials: false
- name: Create multi-platform manifest
@@ -130,7 +130,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@v6
with:
persist-credentials: false
- name: Prepare max-perf Docker build environment
@@ -184,7 +184,7 @@ jobs:
needs: build-maxperf
steps:
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@v6
with:
persist-credentials: false
- name: Create max-perf manifest

View File

@@ -47,7 +47,7 @@ jobs:
options: --tmpfs /tmp:exec
steps:
- name: Checkout
uses: actions/checkout@v5
uses: actions/checkout@v6
with:
show-progress: false

View File

@@ -14,7 +14,7 @@ jobs:
update-flake-hashes:
runs-on: ubuntu-latest
steps:
- uses: https://code.forgejo.org/actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5
- uses: actions/checkout@v6
with:
fetch-depth: 0
fetch-tags: false

View File

@@ -2,6 +2,7 @@ AlexPewMaster <git@alex.unbox.at> <68469103+AlexPewMaster@users.noreply.github.c
Daniel Wiesenberg <weasy@hotmail.de> <weasy666@gmail.com>
Devin Ragotzy <devin.ragotzy@gmail.com> <d6ragotzy@wmich.edu>
Devin Ragotzy <devin.ragotzy@gmail.com> <dragotzy7460@mail.kvcc.edu>
Ginger <ginger@gingershaped.computer> <75683114+gingershaped@users.noreply.github.com>
Jonas Platte <jplatte+git@posteo.de> <jplatte+gitlab@posteo.de>
Jonas Zohren <git-pbkyr@jzohren.de> <gitlab-jfowl-0ux98@sh14.de>
Jonathan de Jong <jonathan@automatia.nl> <jonathandejong02@gmail.com>
@@ -12,5 +13,6 @@ Olivia Lee <olivia@computer.surgery> <benjamin@computer.surgery>
Rudi Floren <rudi.floren@gmail.com> <rudi.floren@googlemail.com>
Tamara Schmitz <tamara.zoe.schmitz@posteo.de> <15906939+tamara-schmitz@users.noreply.github.com>
Timo Kösters <timo@koesters.xyz>
nexy7574 <git@nexy7574.co.uk> <nex@noreply.forgejo.ellis.link>
nexy7574 <git@nexy7574.co.uk> <nex@noreply.localhost>
x4u <xi.zhu@protonmail.ch> <14617923-x4u@users.noreply.gitlab.com>
Ginger <ginger@gingershaped.computer> <75683114+gingershaped@users.noreply.github.com>

View File

@@ -23,7 +23,7 @@ repos:
- id: check-added-large-files
- repo: https://github.com/crate-ci/typos
rev: v1.39.2
rev: v1.40.0
hooks:
- id: typos
- id: typos
@@ -31,7 +31,7 @@ repos:
stages: [commit-msg]
- repo: https://github.com/crate-ci/committed
rev: v1.1.7
rev: v1.1.8
hooks:
- id: committed

128
Cargo.lock generated
View File

@@ -831,9 +831,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.52"
version = "4.5.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa8120877db0e5c011242f96806ce3c94e0737ab8108532a76a3300a01db2ab8"
checksum = "c9e340e012a1bf4935f5282ed1436d1489548e8f72308207ea5df0e23d2d03f8"
dependencies = [
"clap_builder",
"clap_derive",
@@ -850,9 +850,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.52"
version = "4.5.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02576b399397b659c26064fbc92a75fede9d18ffd5f80ca1cd74ddab167016e1"
checksum = "d76b5d13eaa18c901fd2f7fca939fefe3a0727a953561fefdf3b2922b8569d00"
dependencies = [
"anstream",
"anstyle",
@@ -940,7 +940,7 @@ dependencies = [
[[package]]
name = "conduwuit"
version = "0.5.0-rc.8.1"
version = "0.5.0"
dependencies = [
"clap",
"conduwuit_admin",
@@ -972,7 +972,7 @@ dependencies = [
[[package]]
name = "conduwuit_admin"
version = "0.5.0-rc.8.1"
version = "0.5.0"
dependencies = [
"clap",
"conduwuit_api",
@@ -994,7 +994,7 @@ dependencies = [
[[package]]
name = "conduwuit_api"
version = "0.5.0-rc.8.1"
version = "0.5.0"
dependencies = [
"async-trait",
"axum 0.7.9",
@@ -1027,14 +1027,14 @@ dependencies = [
[[package]]
name = "conduwuit_build_metadata"
version = "0.5.0-rc.8.1"
version = "0.5.0"
dependencies = [
"built",
]
[[package]]
name = "conduwuit_core"
version = "0.5.0-rc.8.1"
version = "0.5.0"
dependencies = [
"argon2",
"arrayvec",
@@ -1095,7 +1095,7 @@ dependencies = [
[[package]]
name = "conduwuit_database"
version = "0.5.0-rc.8.1"
version = "0.5.0"
dependencies = [
"async-channel",
"conduwuit_core",
@@ -1114,7 +1114,7 @@ dependencies = [
[[package]]
name = "conduwuit_macros"
version = "0.5.0-rc.8.1"
version = "0.5.0"
dependencies = [
"itertools 0.14.0",
"proc-macro2",
@@ -1124,7 +1124,7 @@ dependencies = [
[[package]]
name = "conduwuit_router"
version = "0.5.0-rc.8.1"
version = "0.5.0"
dependencies = [
"axum 0.7.9",
"axum-client-ip",
@@ -1159,7 +1159,7 @@ dependencies = [
[[package]]
name = "conduwuit_service"
version = "0.5.0-rc.8.1"
version = "0.5.0"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -1200,7 +1200,7 @@ dependencies = [
[[package]]
name = "conduwuit_web"
version = "0.5.0-rc.8.1"
version = "0.5.0"
dependencies = [
"askama",
"axum 0.7.9",
@@ -1750,7 +1750,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
dependencies = [
"libc",
"windows-sys 0.61.2",
"windows-sys 0.52.0",
]
[[package]]
@@ -2405,7 +2405,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2 0.6.1",
"socket2 0.5.10",
"tokio",
"tower-service",
"tracing",
@@ -2990,23 +2990,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "minicbor"
version = "2.1.1"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f182275033b808ede9427884caa8e05fa7db930801759524ca7925bd8aa7a82"
dependencies = [
"minicbor-derive",
]
[[package]]
name = "minicbor-derive"
version = "0.18.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b17290c95158a760027059fe3f511970d6857e47ff5008f9e09bffe3d3e1c6af"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
checksum = "f9a1119e42fbacc2bb65d860de6eb7c930562bc71d42dca026d06b0228231f77"
[[package]]
name = "minicbor-serde"
@@ -3020,9 +3006,9 @@ dependencies = [
[[package]]
name = "minimad"
version = "0.13.1"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9c5d708226d186590a7b6d4a9780e2bdda5f689e0d58cd17012a298efd745d2"
checksum = "df8b688969b16915f3ecadc7829d5b7779dee4977e503f767f34136803d5c06f"
dependencies = [
"once_cell",
]
@@ -3135,7 +3121,7 @@ version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"windows-sys 0.61.2",
"windows-sys 0.60.2",
]
[[package]]
@@ -3761,7 +3747,7 @@ dependencies = [
"quinn-udp",
"rustc-hash",
"rustls",
"socket2 0.6.1",
"socket2 0.5.10",
"thiserror 2.0.17",
"tokio",
"tracing",
@@ -3798,7 +3784,7 @@ dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2 0.6.1",
"socket2 0.5.10",
"tracing",
"windows-sys 0.52.0",
]
@@ -4044,9 +4030,9 @@ dependencies = [
[[package]]
name = "resolv-conf"
version = "0.7.5"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b3789b30bd25ba102de4beabd95d21ac45b69b1be7d14522bab988c526d6799"
checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7"
[[package]]
name = "rgb"
@@ -4077,7 +4063,7 @@ checksum = "88f8660c1ff60292143c98d08fc6e2f654d722db50410e3f3797d40baaf9d8f3"
[[package]]
name = "ruma"
version = "0.10.1"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=27abe0dcd33fd4056efc94bab3582646b31b6ce9#27abe0dcd33fd4056efc94bab3582646b31b6ce9"
dependencies = [
"assign",
"js_int",
@@ -4097,7 +4083,7 @@ dependencies = [
[[package]]
name = "ruma-appservice-api"
version = "0.10.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=27abe0dcd33fd4056efc94bab3582646b31b6ce9#27abe0dcd33fd4056efc94bab3582646b31b6ce9"
dependencies = [
"js_int",
"ruma-common",
@@ -4109,7 +4095,7 @@ dependencies = [
[[package]]
name = "ruma-client-api"
version = "0.18.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=27abe0dcd33fd4056efc94bab3582646b31b6ce9#27abe0dcd33fd4056efc94bab3582646b31b6ce9"
dependencies = [
"as_variant",
"assign",
@@ -4132,7 +4118,7 @@ dependencies = [
[[package]]
name = "ruma-common"
version = "0.13.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=27abe0dcd33fd4056efc94bab3582646b31b6ce9#27abe0dcd33fd4056efc94bab3582646b31b6ce9"
dependencies = [
"as_variant",
"base64 0.22.1",
@@ -4164,7 +4150,7 @@ dependencies = [
[[package]]
name = "ruma-events"
version = "0.28.1"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=27abe0dcd33fd4056efc94bab3582646b31b6ce9#27abe0dcd33fd4056efc94bab3582646b31b6ce9"
dependencies = [
"as_variant",
"indexmap",
@@ -4189,7 +4175,7 @@ dependencies = [
[[package]]
name = "ruma-federation-api"
version = "0.9.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=27abe0dcd33fd4056efc94bab3582646b31b6ce9#27abe0dcd33fd4056efc94bab3582646b31b6ce9"
dependencies = [
"bytes",
"headers",
@@ -4211,7 +4197,7 @@ dependencies = [
[[package]]
name = "ruma-identifiers-validation"
version = "0.9.5"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=27abe0dcd33fd4056efc94bab3582646b31b6ce9#27abe0dcd33fd4056efc94bab3582646b31b6ce9"
dependencies = [
"js_int",
"thiserror 2.0.17",
@@ -4220,7 +4206,7 @@ dependencies = [
[[package]]
name = "ruma-identity-service-api"
version = "0.9.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=27abe0dcd33fd4056efc94bab3582646b31b6ce9#27abe0dcd33fd4056efc94bab3582646b31b6ce9"
dependencies = [
"js_int",
"ruma-common",
@@ -4230,7 +4216,7 @@ dependencies = [
[[package]]
name = "ruma-macros"
version = "0.13.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=27abe0dcd33fd4056efc94bab3582646b31b6ce9#27abe0dcd33fd4056efc94bab3582646b31b6ce9"
dependencies = [
"cfg-if",
"proc-macro-crate",
@@ -4245,7 +4231,7 @@ dependencies = [
[[package]]
name = "ruma-push-gateway-api"
version = "0.9.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=27abe0dcd33fd4056efc94bab3582646b31b6ce9#27abe0dcd33fd4056efc94bab3582646b31b6ce9"
dependencies = [
"js_int",
"ruma-common",
@@ -4257,7 +4243,7 @@ dependencies = [
[[package]]
name = "ruma-signatures"
version = "0.15.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=27abe0dcd33fd4056efc94bab3582646b31b6ce9#27abe0dcd33fd4056efc94bab3582646b31b6ce9"
dependencies = [
"base64 0.22.1",
"ed25519-dalek",
@@ -4337,7 +4323,7 @@ dependencies = [
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.61.2",
"windows-sys 0.52.0",
]
[[package]]
@@ -4643,9 +4629,9 @@ dependencies = [
[[package]]
name = "serde-saphyr"
version = "0.0.8"
version = "0.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0916ccf524f1ccec1b3c02193c9e3d2e167aee9b6b294829dce6f4411332155"
checksum = "9b9e06cddad47cc6214c0c456cf209b99a58b54223e7af2f6d4b88a5a9968499"
dependencies = [
"ahash",
"base64 0.22.1",
@@ -4974,9 +4960,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "2.0.110"
version = "2.0.111"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea"
checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87"
dependencies = [
"proc-macro2",
"quote",
@@ -5022,9 +5008,9 @@ dependencies = [
[[package]]
name = "termimad"
version = "0.34.0"
version = "0.34.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68ff5ca043d65d4ea43b65cdb4e3aba119657d0d12caf44f93212ec3168a8e20"
checksum = "889a9370996b74cf46016ce35b96c248a9ac36d69aab1d112b3e09bc33affa49"
dependencies = [
"coolor",
"crokey",
@@ -5420,9 +5406,9 @@ dependencies = [
[[package]]
name = "tower-http"
version = "0.6.6"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2"
checksum = "9cf146f99d442e8e68e585f5d798ccd3cad9a7835b917e09728880a862706456"
dependencies = [
"async-compression",
"bitflags",
@@ -5456,9 +5442,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tracing"
version = "0.1.41"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647"
dependencies = [
"pin-project-lite",
"tracing-attributes",
@@ -5467,9 +5453,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.30"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903"
checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
dependencies = [
"proc-macro2",
"quote",
@@ -5478,9 +5464,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.34"
version = "0.1.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c"
dependencies = [
"once_cell",
"valuable",
@@ -5499,9 +5485,9 @@ dependencies = [
[[package]]
name = "tracing-journald"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0b4143302cf1022dac868d521e36e8b27691f72c84b3311750d5188ebba657"
checksum = "2d3a81ed245bfb62592b1e2bc153e77656d94ee6a0497683a65a12ccaf2438d0"
dependencies = [
"libc",
"tracing-core",
@@ -5540,9 +5526,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.20"
version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5"
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
dependencies = [
"matchers",
"nu-ansi-term",
@@ -6218,7 +6204,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "0.5.0-rc.8.1"
version = "0.5.0"
dependencies = [
"clap",
"serde",
@@ -6227,7 +6213,7 @@ dependencies = [
[[package]]
name = "xtask-generate-commands"
version = "0.5.0-rc.8.1"
version = "0.5.0"
dependencies = [
"clap-markdown",
"clap_builder",

View File

@@ -21,7 +21,7 @@ license = "Apache-2.0"
readme = "README.md"
repository = "https://forgejo.ellis.link/continuwuation/continuwuity"
rust-version = "1.86.0"
version = "0.5.0-rc.8.1"
version = "0.5.0"
[workspace.metadata.crane]
name = "conduwuit"
@@ -167,7 +167,7 @@ features = ["raw_value"]
# Used for appservice registration files
[workspace.dependencies.serde-saphyr]
version = "0.0.8"
version = "0.0.10"
# Used to load forbidden room/user regex from config
[workspace.dependencies.serde_regex]
@@ -351,7 +351,7 @@ version = "0.1.2"
# Used for matrix spec type definitions and helpers
[workspace.dependencies.ruma]
git = "https://forgejo.ellis.link/continuwuation/ruwuma"
rev = "50b2a91b2ab8f9830eea80b9911e11234e0eac66"
rev = "27abe0dcd33fd4056efc94bab3582646b31b6ce9"
features = [
"compat",
"rand",

View File

@@ -586,10 +586,13 @@
#allow_unstable_room_versions = true
# Default room version continuwuity will create rooms with.
# Note that this has to be a string since the room version is a string
# rather than an integer. Forgetting the quotes will make the server fail
# to start!
#
# Per spec, room version 11 is the default.
# Per spec, room version "11" is the default.
#
#default_room_version = 11
#default_room_version = "11"
# Enable OpenTelemetry OTLP tracing export. This replaces the deprecated
# Jaeger exporter. Traces will be sent via OTLP to a collector (such as

View File

@@ -48,7 +48,7 @@ EOF
# Developer tool versions
# renovate: datasource=github-releases depName=cargo-bins/cargo-binstall
ENV BINSTALL_VERSION=1.16.0
ENV BINSTALL_VERSION=1.16.2
# renovate: datasource=github-releases depName=psastras/sbom-rs
ENV CARGO_SBOM_VERSION=0.9.1
# renovate: datasource=crate depName=lddtree

View File

@@ -18,7 +18,7 @@ RUN --mount=type=cache,target=/etc/apk/cache apk add \
# Developer tool versions
# renovate: datasource=github-releases depName=cargo-bins/cargo-binstall
ENV BINSTALL_VERSION=1.16.0
ENV BINSTALL_VERSION=1.16.2
# renovate: datasource=github-releases depName=psastras/sbom-rs
ENV CARGO_SBOM_VERSION=0.9.1
# renovate: datasource=crate depName=lddtree

View File

@@ -10,7 +10,7 @@ # Continuwuity Community Guidelines
environment where everyone feels safe and respected.
For code and contribution guidelines, please refer to the
[Contributor's Covenant](https://forgejo.ellis.link/continuwuation/continuwuity/src/branch/main/CODE_OF_CONDUCT.mdx).
[Contributor's Covenant](https://forgejo.ellis.link/continuwuation/continuwuity/src/branch/main/CODE_OF_CONDUCT.md).
Below are additional guidelines specific to the Continuwuity community.
## Our Values and Expected Behaviors

View File

@@ -2,7 +2,7 @@
services:
homeserver:
### If you already built the conduduwit image with 'docker build' or want to use the Docker Hub image,
### If you already built the continuwuity image with 'docker build' or want to use the Docker Hub image,
### then you are ready to go.
image: forgejo.ellis.link/continuwuation/continuwuity:latest
restart: unless-stopped

View File

@@ -134,7 +134,7 @@ ### Example systemd Unit File
## Creating the Continuwuity configuration file
Now you need to create the Continuwuity configuration file in
`/etc/continuwuity/continuwuity.toml`. You can find an example configuration at
`/etc/conduwuit/conduwuit.toml`. You can find an example configuration at
[conduwuit-example.toml](../reference/config.mdx).
**Please take a moment to read the config. You need to change at least the

View File

@@ -1 +1 @@
{"m.homeserver":{"base_url": "https://matrix.continuwuity.org"},"org.matrix.msc3575.proxy":{"url": "https://matrix.continuwuity.org"}}
{"m.homeserver":{"base_url": "https://matrix.continuwuity.org"},"org.matrix.msc3575.proxy":{"url": "https://matrix.continuwuity.org"},"org.matrix.msc4143.rtc_foci":[{"type":"livekit","livekit_service_url":"https://livekit.ellis.link"}]}

View File

@@ -97,6 +97,9 @@ rec {
craneLib.buildPackage (
(commonAttrs commonAttrsArgs)
// {
postFixup = ''
patchelf --set-rpath "$(${pkgs.patchelf}/bin/patchelf --print-rpath $out/bin/${crateInfo.pname}):${rocksdb}/lib" $out/bin/${crateInfo.pname}
'';
cargoArtifacts = deps;
doCheck = true;
env = uwuenv.buildPackageEnv // rocksdbEnv;

View File

@@ -6,6 +6,69 @@
pkgs,
...
}:
let
baseTestScript =
pkgs.writers.writePython3Bin "do_test" { libraries = [ pkgs.python3Packages.matrix-nio ]; }
''
import asyncio
import nio
async def main() -> None:
# Connect to continuwuity
client = nio.AsyncClient("http://continuwuity:6167", "alice")
# Register as user alice
response = await client.register("alice", "my-secret-password")
# Log in as user alice
response = await client.login("my-secret-password")
# Create a new room
response = await client.room_create(federate=False)
print("Matrix room create response:", response)
assert isinstance(response, nio.RoomCreateResponse)
room_id = response.room_id
# Join the room
response = await client.join(room_id)
print("Matrix join response:", response)
assert isinstance(response, nio.JoinResponse)
# Send a message to the room
response = await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": "Hello continuwuity!"
}
)
print("Matrix room send response:", response)
assert isinstance(response, nio.RoomSendResponse)
# Sync responses
response = await client.sync(timeout=30000)
print("Matrix sync response:", response)
assert isinstance(response, nio.SyncResponse)
# Check the message was received by continuwuity
last_message = response.rooms.join[room_id].timeline.events[-1].body
assert last_message == "Hello continuwuity!"
# Leave the room
response = await client.room_leave(room_id)
print("Matrix room leave response:", response)
assert isinstance(response, nio.RoomLeaveResponse)
# Close the client
await client.close()
if __name__ == "__main__":
asyncio.run(main())
'';
in
{
# run some nixos tests as checks
checks = lib.pipe self'.packages [
@@ -18,106 +81,69 @@
# this test was initially yoinked from
#
# https://github.com/NixOS/nixpkgs/blob/960ce26339661b1b69c6f12b9063ca51b688615f/nixos/tests/matrix/continuwuity.nix
(builtins.map (name: {
name = "test-${name}";
value = pkgs.testers.runNixOSTest {
inherit name;
(builtins.concatMap (
name:
builtins.map
(
{ config, suffix }:
{
name = "test-${name}-${suffix}";
value = pkgs.testers.runNixOSTest {
inherit name;
nodes = {
continuwuity = {
services.matrix-continuwuity = {
enable = true;
package = self'.packages.${name};
settings.global = {
nodes = {
continuwuity = {
services.matrix-continuwuity = {
enable = true;
package = self'.packages.${name};
settings = config;
extraEnvironment.RUST_BACKTRACE = "yes";
};
networking.firewall.allowedTCPPorts = [ 6167 ];
};
client.environment.systemPackages = [ baseTestScript ];
};
testScript = ''
start_all()
with subtest("start continuwuity"):
continuwuity.wait_for_unit("continuwuity.service")
continuwuity.wait_for_open_port(6167)
with subtest("ensure messages can be exchanged"):
client.succeed("${lib.getExe baseTestScript} >&2")
'';
};
}
)
[
{
suffix = "base";
config = {
global = {
server_name = name;
address = [ "0.0.0.0" ];
allow_registration = true;
yes_i_am_very_very_sure_i_want_an_open_registration_server_prone_to_abuse = true;
};
extraEnvironment.RUST_BACKTRACE = "yes";
};
networking.firewall.allowedTCPPorts = [ 6167 ];
};
client =
{ pkgs, ... }:
{
environment.systemPackages = [
(pkgs.writers.writePython3Bin "do_test" { libraries = [ pkgs.python3Packages.matrix-nio ]; } ''
import asyncio
import nio
async def main() -> None:
# Connect to continuwuity
client = nio.AsyncClient("http://continuwuity:6167", "alice")
# Register as user alice
response = await client.register("alice", "my-secret-password")
# Log in as user alice
response = await client.login("my-secret-password")
# Create a new room
response = await client.room_create(federate=False)
print("Matrix room create response:", response)
assert isinstance(response, nio.RoomCreateResponse)
room_id = response.room_id
# Join the room
response = await client.join(room_id)
print("Matrix join response:", response)
assert isinstance(response, nio.JoinResponse)
# Send a message to the room
response = await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": "Hello continuwuity!"
}
)
print("Matrix room send response:", response)
assert isinstance(response, nio.RoomSendResponse)
# Sync responses
response = await client.sync(timeout=30000)
print("Matrix sync response:", response)
assert isinstance(response, nio.SyncResponse)
# Check the message was received by continuwuity
last_message = response.rooms.join[room_id].timeline.events[-1].body
assert last_message == "Hello continuwuity!"
# Leave the room
response = await client.room_leave(room_id)
print("Matrix room leave response:", response)
assert isinstance(response, nio.RoomLeaveResponse)
# Close the client
await client.close()
if __name__ == "__main__":
asyncio.run(main())
'')
];
}
{
suffix = "with-room-version";
config = {
global = {
server_name = name;
address = [ "0.0.0.0" ];
allow_registration = true;
yes_i_am_very_very_sure_i_want_an_open_registration_server_prone_to_abuse = true;
default_room_version = "12";
};
};
};
testScript = ''
start_all()
with subtest("start continuwuity"):
continuwuity.wait_for_unit("continuwuity.service")
continuwuity.wait_for_open_port(6167)
with subtest("ensure messages can be exchanged"):
client.succeed("do_test >&2")
'';
};
}))
}
]
))
builtins.listToAttrs
];
};

1039
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -63,7 +63,7 @@ Restart=on-failure
RestartSec=5
TimeoutStopSec=4m
TimeoutStartSec=4m
TimeoutStartSec=10m
StartLimitInterval=1m
StartLimitBurst=5

View File

@@ -41,7 +41,7 @@ async fn changes_since(
let results: Vec<_> = self
.services
.account_data
.changes_since(room_id.as_deref(), &user_id, since, None)
.changes_since(room_id.as_deref(), &user_id, Some(since), None)
.collect()
.await;
let query_time = timer.elapsed();

View File

@@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result {
.services
.rooms
.timeline
.last_timeline_count(None, &room_id)
.last_timeline_count(&room_id)
.await?;
self.write_str(&format!("{result:#?}")).await
@@ -52,7 +52,7 @@ pub(super) async fn pdus(
.services
.rooms
.timeline
.pdus_rev(None, &room_id, from)
.pdus_rev(&room_id, from)
.try_take(limit.unwrap_or(3))
.try_collect()
.await?;

View File

@@ -30,10 +30,31 @@ pub(super) async fn show_config(&self) -> Result {
#[admin_command]
pub(super) async fn reload_config(&self, path: Option<PathBuf>) -> Result {
let path = path.as_deref().into_iter();
self.services.config.reload(path)?;
// The path argument is only what's optionally passed via the admin command,
// so we need to merge it with the existing paths if any were given at startup.
let mut paths = Vec::new();
self.write_str("Successfully reconfigured.").await
// Add previously saved paths to the argument list
self.services
.config
.config_paths
.clone()
.unwrap_or_default()
.iter()
.for_each(|p| paths.push(p.to_owned()));
// If a path is given, and it's not already in the list,
// add it last, so that it overrides earlier files
if let Some(p) = path {
if !paths.contains(&p) {
paths.push(p);
}
}
self.services.config.reload(&paths)?;
self.write_str(&format!("Successfully reconfigured from paths: {paths:?}"))
.await
}
#[admin_command]

View File

@@ -1,4 +1,7 @@
use std::{collections::BTreeMap, fmt::Write as _};
use std::{
collections::{BTreeMap, HashSet},
fmt::Write as _,
};
use api::client::{
full_user_deactivate, join_room_by_id_helper, leave_all_rooms, leave_room, remote_leave_room,
@@ -12,7 +15,7 @@
};
use futures::{FutureExt, StreamExt};
use ruma::{
OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedUserId, UserId,
OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName, OwnedUserId, UserId,
events::{
RoomAccountDataEventType, StateEventType,
room::{
@@ -950,23 +953,38 @@ pub(super) async fn force_leave_remote_room(
&self,
user_id: String,
room_id: OwnedRoomOrAliasId,
via: Option<String>,
) -> Result {
let user_id = parse_local_user_id(self.services, &user_id)?;
let (room_id, _) = self
let (room_id, vias_raw) = self
.services
.rooms
.alias
.resolve_with_servers(&room_id, None)
.resolve_with_servers(
&room_id,
if let Some(v) = via.clone() {
Some(vec![OwnedServerName::parse(v)?])
} else {
None
},
)
.await?;
assert!(
self.services.globals.user_is_local(&user_id),
"Parsed user_id must be a local user"
);
remote_leave_room(self.services, &user_id, &room_id, None)
let mut vias: HashSet<OwnedServerName> = HashSet::new();
if let Some(via) = via {
vias.insert(OwnedServerName::parse(via)?);
}
for server in vias_raw {
vias.insert(server);
}
remote_leave_room(self.services, &user_id, &room_id, None, vias)
.boxed()
.await?;
self.write_str(&format!("{user_id} has been joined to {room_id}.",))
self.write_str(&format!("{user_id} successfully left {room_id} via remote server."))
.await
}

View File

@@ -107,6 +107,7 @@ pub enum UserCommand {
ForceLeaveRemoteRoom {
user_id: String,
room_id: OwnedRoomOrAliasId,
via: Option<String>,
},
/// - Forces the specified user to drop their power levels to the room

View File

@@ -59,7 +59,7 @@ pub(crate) async fn get_context_route(
.rooms
.timeline
.get_pdu(event_id)
.map_err(|_| err!(Request(NotFound("Base event not found."))));
.map_err(|_| err!(Request(NotFound("Event not found."))));
let visible = services
.rooms
@@ -70,7 +70,7 @@ pub(crate) async fn get_context_route(
let (base_id, base_pdu, visible) = try_join3(base_id, base_pdu, visible).await?;
if base_pdu.room_id_or_hash() != *room_id || base_pdu.event_id != *event_id {
return Err!(Request(NotFound("Base event not found.")));
return Err!(Request(NotFound("Event not found.")));
}
if !visible {
@@ -82,11 +82,25 @@ pub(crate) async fn get_context_route(
let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user);
// PDUs are used to get seen user IDs and then returned in response.
let events_before = services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, Some(base_count))
.pdus_rev(room_id, Some(base_count))
.ignore_err()
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.ready_filter_map(|item| event_filter(item, filter))
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
@@ -96,8 +110,20 @@ pub(crate) async fn get_context_route(
let events_after = services
.rooms
.timeline
.pdus(Some(sender_user), room_id, Some(base_count))
.pdus(room_id, Some(base_count))
.ignore_err()
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.ready_filter_map(|item| event_filter(item, filter))
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))

View File

@@ -389,7 +389,7 @@ pub(crate) async fn get_key_changes_route(
device_list_updates.extend(
services
.users
.keys_changed(sender_user, from, Some(to))
.keys_changed(sender_user, Some(from), Some(to))
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await,
@@ -401,7 +401,7 @@ pub(crate) async fn get_key_changes_route(
device_list_updates.extend(
services
.users
.room_keys_changed(room_id, from, Some(to))
.room_keys_changed(room_id, Some(from), Some(to))
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()

View File

@@ -44,6 +44,7 @@
rooms::{
state::RoomMutexGuard,
state_compressor::{CompressedState, HashSetCompressStateEvent},
timeline::pdu_fits,
},
};
@@ -573,6 +574,13 @@ async fn join_room_by_id_helper_remote(
return state;
},
};
if !pdu_fits(&mut value.clone()) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from room join because \
it exceeds 65535 bytes or is otherwise too large."
);
return state;
}
services.rooms.outlier.add_pdu_outlier(&event_id, &value);
if let Some(state_key) = &pdu.state_key {
let shortstatekey = services

View File

@@ -5,7 +5,7 @@
use conduwuit::{
Err, Result, debug, debug_info, debug_warn, err, info,
matrix::{
event::{Event, gen_event_id},
event::gen_event_id,
pdu::{PduBuilder, PduEvent},
},
result::FlatOk,
@@ -458,7 +458,7 @@ async fn knock_room_helper_local(
.await,
};
let send_knock_response = services
services
.sending
.send_federation_request(&remote_server, send_knock_request)
.await?;
@@ -477,20 +477,14 @@ async fn knock_room_helper_local(
.map_err(|e| err!(BadServerResponse("Invalid knock event PDU: {e:?}")))?;
info!("Updating membership locally to knock state with provided stripped state events");
// TODO: this call does not appear to do anything because `update_membership`
// doesn't call `mark_as_knock`. investigate further, ideally with the aim of
// removing this call entirely -- Ginger thinks `update_membership` should only
// be called from `force_state` and `append_pdu`.
services
.rooms
.state_cache
.update_membership(
room_id,
sender_user,
parsed_knock_pdu
.get_content::<RoomMemberEventContent>()
.expect("we just created this"),
sender_user,
Some(send_knock_response.knock_room_state),
None,
false,
)
.update_membership(room_id, sender_user, &parsed_knock_pdu, false)
.await?;
info!("Appending room knock event locally");
@@ -677,20 +671,11 @@ async fn knock_room_helper_remote(
.await?;
info!("Updating membership locally to knock state with provided stripped state events");
// TODO: see TODO on the other call to `update_membership`
services
.rooms
.state_cache
.update_membership(
room_id,
sender_user,
parsed_knock_pdu
.get_content::<RoomMemberEventContent>()
.expect("we just created this"),
sender_user,
Some(send_knock_response.knock_room_state),
None,
false,
)
.update_membership(room_id, sender_user, &parsed_knock_pdu, false)
.await?;
info!("Appending room knock event locally");

View File

@@ -2,12 +2,12 @@
use axum::extract::State;
use conduwuit::{
Err, Result, debug_info, debug_warn, err,
Err, Pdu, Result, debug_info, debug_warn, err,
matrix::{event::gen_event_id, pdu::PduBuilder},
utils::{self, FutureBoolExt, future::ReadyEqExt},
warn,
};
use futures::{FutureExt, StreamExt, TryFutureExt, pin_mut};
use futures::{FutureExt, StreamExt, pin_mut};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, RoomId, RoomVersionId, UserId,
api::{
@@ -81,42 +81,9 @@ pub async fn leave_room(
room_id: &RoomId,
reason: Option<String>,
) -> Result {
let default_member_content = RoomMemberEventContent {
membership: MembershipState::Leave,
reason: reason.clone(),
join_authorized_via_users_server: None,
is_direct: None,
avatar_url: None,
displayname: None,
third_party_invite: None,
blurhash: None,
redact_events: None,
};
let is_banned = services.rooms.metadata.is_banned(room_id);
let is_disabled = services.rooms.metadata.is_disabled(room_id);
pin_mut!(is_banned, is_disabled);
if is_banned.or(is_disabled).await {
// the room is banned/disabled, the room must be rejected locally since we
// cant/dont want to federate with this server
services
.rooms
.state_cache
.update_membership(
room_id,
user_id,
default_member_content,
user_id,
None,
None,
true,
)
.await?;
return Ok(());
}
let dont_have_room = services
.rooms
.state_cache
@@ -129,43 +96,41 @@ pub async fn leave_room(
.is_knocked(user_id, room_id)
.eq(&false);
// Ask a remote server if we don't have this room and are not knocking on it
if dont_have_room.and(not_knocked).await {
if let Err(e) = remote_leave_room(services, user_id, room_id, reason.clone())
.boxed()
.await
{
warn!(%user_id, "Failed to leave room {room_id} remotely: {e}");
// Don't tell the client about this error
}
pin_mut!(is_banned, is_disabled);
let last_state = services
.rooms
.state_cache
.invite_state(user_id, room_id)
.or_else(|_| services.rooms.state_cache.knock_state(user_id, room_id))
.or_else(|_| services.rooms.state_cache.left_state(user_id, room_id))
.await
.ok();
/*
there are three possible cases when leaving a room:
1. the room is banned or disabled, so we're not federating with it.
2. nobody on the homeserver is in the room, which can happen if the user is rejecting an invite
to a room that we don't have any members in.
3. someone else on the homeserver is in the room. in this case we can leave like normal by sending a PDU over federation.
// We always drop the invite, we can't rely on other servers
services
.rooms
.state_cache
.update_membership(
room_id,
user_id,
default_member_content,
user_id,
last_state,
None,
true,
)
.await?;
in cases 1 and 2, we have to update the state cache using `mark_as_left` directly.
otherwise `build_and_append_pdu` will take care of updating the state cache for us.
*/
// `leave_pdu` is the outlier `m.room.member` event which will be synced to the
// user. if it's None the sync handler will create a dummy PDU.
let leave_pdu = if is_banned.or(is_disabled).await {
// case 1: the room is banned/disabled. we don't want to federate with another
// server to leave, so we can't create an outlier PDU.
None
} else if dont_have_room.and(not_knocked).await {
// case 2: ask a remote server to assist us with leaving
// we always mark the room as left locally, regardless of if the federated leave
// failed
remote_leave_room(services, user_id, room_id, reason.clone(), HashSet::new())
.await
.inspect_err(|err| {
warn!(%user_id, "Failed to leave room {room_id} remotely: {err}");
})
.ok()
} else {
// case 3: we can leave by sending a PDU.
let state_lock = services.rooms.state.mutex.lock(room_id).await;
let Ok(event) = services
let user_member_event_content = services
.rooms
.state_accessor
.room_state_get_content::<RoomMemberEventContent>(
@@ -173,64 +138,84 @@ pub async fn leave_room(
&StateEventType::RoomMember,
user_id.as_str(),
)
.await
else {
debug_warn!(
"Trying to leave a room you are not a member of, marking room as left locally."
);
.await;
return services
.rooms
.state_cache
.update_membership(
room_id,
user_id,
default_member_content,
user_id,
None,
None,
true,
)
.await;
};
match user_member_event_content {
| Ok(content) => {
services
.rooms
.timeline
.build_and_append_pdu(
PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
membership: MembershipState::Leave,
reason,
join_authorized_via_users_server: None,
is_direct: None,
..content
}),
user_id,
Some(room_id),
&state_lock,
)
.await?;
services
.rooms
.timeline
.build_and_append_pdu(
PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
membership: MembershipState::Leave,
reason,
join_authorized_via_users_server: None,
is_direct: None,
..event
}),
user_id,
Some(room_id),
&state_lock,
)
.await?;
}
// `build_and_append_pdu` calls `mark_as_left` internally, so we return early.
return Ok(());
},
| Err(_) => {
// an exception to case 3 is if the user isn't even in the room they're trying
// to leave. this can happen if the client's caching is wrong.
debug_warn!(
"Trying to leave a room you are not a member of, marking room as left \
locally."
);
// return the existing leave state, if one exists. `mark_as_left` will then
// update the `roomuserid_leftcount` table, making the leave come down sync
// again.
services
.rooms
.state_cache
.left_state(user_id, room_id)
.await?
},
}
};
services
.rooms
.state_cache
.mark_as_left(user_id, room_id, leave_pdu)
.await;
services
.rooms
.state_cache
.update_joined_count(room_id)
.await;
Ok(())
}
pub async fn remote_leave_room(
pub async fn remote_leave_room<S: ::std::hash::BuildHasher>(
services: &Services,
user_id: &UserId,
room_id: &RoomId,
reason: Option<String>,
) -> Result<()> {
mut servers: HashSet<OwnedServerName, S>,
) -> Result<Pdu> {
let mut make_leave_response_and_server =
Err!(BadServerResponse("No remote server available to assist in leaving {room_id}."));
let mut servers: HashSet<OwnedServerName> = services
.rooms
.state_cache
.servers_invite_via(room_id)
.map(ToOwned::to_owned)
.collect()
.await;
servers.extend(
services
.rooms
.state_cache
.servers_invite_via(room_id)
.map(ToOwned::to_owned)
.collect::<HashSet<OwnedServerName>>()
.await,
);
match services
.rooms
@@ -277,6 +262,11 @@ pub async fn remote_leave_room(
if let Some(room_id_server_name) = room_id.server_name() {
servers.insert(room_id_server_name.to_owned());
}
if servers.is_empty() {
return Err!(BadServerResponse(warn!(
"No remote servers found to assist in leaving {room_id}."
)));
}
debug_info!("servers in remote_leave_room: {servers:?}");
@@ -284,7 +274,7 @@ pub async fn remote_leave_room(
let make_leave_response = services
.sending
.send_federation_request(
&remote_server,
remote_server.as_ref(),
federation::membership::prepare_leave_event::v1::Request {
room_id: room_id.to_owned(),
user_id: user_id.to_owned(),
@@ -292,11 +282,21 @@ pub async fn remote_leave_room(
)
.await;
make_leave_response_and_server = make_leave_response.map(|r| (r, remote_server));
let error = make_leave_response.as_ref().err().map(ToString::to_string);
make_leave_response_and_server = make_leave_response.map(|r| (r, remote_server.clone()));
if make_leave_response_and_server.is_ok() {
debug_info!(
"Received make_leave_response from {} for leaving {room_id}",
remote_server
);
break;
}
debug_warn!(
"Failed to get make_leave_response from {} for leaving {room_id}: {}",
remote_server,
error.unwrap()
);
}
let (make_leave_response, remote_server) = make_leave_response_and_server?;
@@ -304,13 +304,14 @@ pub async fn remote_leave_room(
let Some(room_version_id) = make_leave_response.room_version else {
return Err!(BadServerResponse(warn!(
"No room version was returned by {remote_server} for {room_id}, room version is \
likely not supported by conduwuit"
likely not supported by continuwuity"
)));
};
if !services.server.supported_room_version(&room_version_id) {
return Err!(BadServerResponse(warn!(
"Remote room version {room_version_id} for {room_id} is not supported by conduwuit",
"Remote room version {room_version_id} for {room_id} is not supported by \
continuwuity",
)));
}
@@ -373,7 +374,7 @@ pub async fn remote_leave_room(
&remote_server,
federation::membership::create_leave_event::v2::Request {
room_id: room_id.to_owned(),
event_id,
event_id: event_id.clone(),
pdu: services
.sending
.convert_to_outgoing_federation_event(leave_event.clone())
@@ -382,5 +383,14 @@ pub async fn remote_leave_room(
)
.await?;
Ok(())
services
.rooms
.outlier
.add_pdu_outlier(&event_id, &leave_event);
let leave_pdu = Pdu::from_id_val(&event_id, leave_event).map_err(|e| {
err!(BadServerResponse("Invalid leave PDU received during federated leave: {e:?}"))
})?;
Ok(leave_pdu)
}

View File

@@ -1,6 +1,7 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, at,
Err, Result, at, debug_warn,
matrix::{
event::{Event, Matches},
pdu::PduCount,
@@ -16,7 +17,7 @@
Services,
rooms::{
lazy_loading,
lazy_loading::{Options, Witness},
lazy_loading::{MemberSet, Options},
timeline::PdusIterItem,
},
};
@@ -70,6 +71,7 @@
/// where the user was joined, depending on `history_visibility`)
pub(crate) async fn get_message_events_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<get_message_events::v3::Request>,
) -> Result<get_message_events::v3::Response> {
debug_assert!(IGNORED_MESSAGE_TYPES.is_sorted(), "IGNORED_MESSAGE_TYPES is not sorted");
@@ -78,6 +80,11 @@ pub(crate) async fn get_message_events_route(
let room_id = &body.room_id;
let filter = &body.filter;
services
.users
.update_device_last_seen(sender_user, sender_device, client_ip)
.await;
if !services.rooms.metadata.exists(room_id).await {
return Err!(Request(Forbidden("Room does not exist to this server")));
}
@@ -115,14 +122,14 @@ pub(crate) async fn get_message_events_route(
| Direction::Forward => services
.rooms
.timeline
.pdus(Some(sender_user), room_id, Some(from))
.pdus(room_id, Some(from))
.ignore_err()
.boxed(),
| Direction::Backward => services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, Some(from))
.pdus_rev(room_id, Some(from))
.ignore_err()
.boxed(),
};
@@ -133,6 +140,18 @@ pub(crate) async fn get_message_events_route(
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
.take(limit)
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.collect()
.await;
@@ -162,7 +181,7 @@ pub(crate) async fn get_message_events_route(
let state = witness
.map(Option::into_iter)
.map(|option| option.flat_map(Witness::into_iter))
.map(|option| option.flat_map(MemberSet::into_iter))
.map(IterStream::stream)
.into_stream()
.flatten()
@@ -192,7 +211,7 @@ pub(crate) async fn lazy_loading_witness<'a, I>(
services: &Services,
lazy_loading_context: &lazy_loading::Context<'_>,
events: I,
) -> Witness
) -> MemberSet
where
I: Iterator<Item = &'a PdusIterItem> + Clone + Send,
{
@@ -213,10 +232,10 @@ pub(crate) async fn lazy_loading_witness<'a, I>(
let receipts = services
.rooms
.read_receipt
.readreceipts_since(lazy_loading_context.room_id, oldest.into_unsigned());
.readreceipts_since(lazy_loading_context.room_id, Some(oldest.into_unsigned()));
pin_mut!(receipts);
let witness: Witness = events
let witness: MemberSet = events
.stream()
.map(ref_at!(1))
.map(Event::sender)
@@ -224,7 +243,7 @@ pub(crate) async fn lazy_loading_witness<'a, I>(
.chain(
receipts
.ready_take_while(|(_, c, _)| *c <= newest.into_unsigned())
.map(|(user_id, ..)| user_id.to_owned()),
.map(|(user_id, ..)| user_id),
)
.collect()
.await;
@@ -232,7 +251,7 @@ pub(crate) async fn lazy_loading_witness<'a, I>(
services
.rooms
.lazy_loading
.witness_retain(witness, lazy_loading_context)
.retain_lazy_members(witness, lazy_loading_context)
.await
}

View File

@@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, PduCount, Result, err};
use ruma::{
MilliSecondsSinceUnixEpoch,
@@ -118,9 +119,14 @@ pub(crate) async fn set_read_marker_route(
/// Sets private read marker and public read receipt EDU.
pub(crate) async fn create_receipt_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<create_receipt::v3::Request>,
) -> Result<create_receipt::v3::Response> {
let sender_user = body.sender_user();
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
.await;
if matches!(
&body.receipt_type,

View File

@@ -1,4 +1,5 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, matrix::pdu::PduBuilder};
use ruma::{
api::client::redact::redact_event, events::room::redaction::RoomRedactionEventContent,
@@ -13,9 +14,14 @@
/// - TODO: Handle txn id
pub(crate) async fn redact_event_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<redact_event::v3::Request>,
) -> Result<redact_event::v3::Response> {
let sender_user = body.sender_user();
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
.await;
let body = &body.body;
if services.users.is_suspended(sender_user).await? {
// TODO: Users can redact their own messages while suspended

View File

@@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
Result, at,
Err, Result, at, debug_warn,
matrix::{Event, event::RelationTypeEqual, pdu::PduCount},
utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt},
};
@@ -109,6 +109,16 @@ async fn paginate_relations_with_filter(
recurse: bool,
dir: Direction,
) -> Result<get_relating_events::v1::Response> {
if !services
.rooms
.state_accessor
.user_can_see_event(sender_user, room_id, target)
.await
{
debug_warn!(req_evt = ?target, ?room_id, "Event relations requested by {sender_user} but is not allowed to see it, returning 404");
return Err!(Request(NotFound("Event not found.")));
}
let start: PduCount = from
.map(str::parse)
.transpose()?
@@ -129,11 +139,6 @@ async fn paginate_relations_with_filter(
// Spec (v1.10) recommends depth of at least 3
let depth: u8 = if recurse { 3 } else { 1 };
// Check if this is a thread request
let is_thread = filter_rel_type
.as_ref()
.is_some_and(|rel| *rel == RelationType::Thread);
let events: Vec<_> = services
.rooms
.pdu_metadata
@@ -152,40 +157,24 @@ async fn paginate_relations_with_filter(
})
.stream()
.ready_take_while(|(count, _)| Some(*count) != to)
.wide_filter_map(|item| visibility_filter(services, sender_user, item))
.take(limit)
.wide_filter_map(|item| visibility_filter(services, sender_user, item))
.then(async |mut pdu| {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations to relation: {e}");
}
pdu
})
.collect()
.await;
// For threads, check if we should include the root event
let mut root_event = None;
if is_thread && dir == Direction::Backward {
// Check if we've reached the beginning of the thread
// (fewer events than requested means we've exhausted the thread)
if events.len() < limit {
// Try to get the thread root event
if let Ok(root_pdu) = services.rooms.timeline.get_pdu(target).await {
// Check visibility
if services
.rooms
.state_accessor
.user_can_see_event(sender_user, room_id, target)
.await
{
// Store the root event to add to the response
root_event = Some(root_pdu);
}
}
}
}
// Determine if there are more events to fetch
let has_more = if root_event.is_some() {
false // We've included the root, no more events
} else {
// Check if we got a full page of results (might be more)
events.len() >= limit
};
let has_more = events.len() >= limit;
let next_batch = if has_more {
match dir {
@@ -197,11 +186,10 @@ async fn paginate_relations_with_filter(
None
};
// Build the response chunk with thread root if needed
let chunk: Vec<_> = root_event
let chunk: Vec<_> = events
.into_iter()
.map(at!(1))
.map(Event::into_format)
.chain(events.into_iter().map(at!(1)).map(Event::into_format))
.collect();
Ok(get_relating_events::v1::Response {

View File

@@ -1,5 +1,5 @@
use axum::extract::State;
use conduwuit::{Err, Event, Result, err};
use conduwuit::{Err, Event, Result, debug_warn, err};
use futures::{FutureExt, TryFutureExt, future::try_join};
use ruma::api::client::room::get_room_event;
@@ -33,7 +33,16 @@ pub(crate) async fn get_room_event_route(
return Err!(Request(Forbidden("You don't have permission to view this event.")));
}
event.add_age().ok();
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut event)
.await
{
debug_warn!("Failed to add bundled aggregations to event: {e}");
}
event.set_unsigned(body.sender_user.as_deref());
Ok(get_room_event::v3::Response { event: event.into_format() })
}

View File

@@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
Err, Event, Result, at,
Err, Event, Result, at, debug_warn,
utils::{BoolExt, stream::TryTools},
};
use futures::{FutureExt, TryStreamExt, future::try_join4};
@@ -40,12 +40,28 @@ pub(crate) async fn room_initial_sync_route(
.map_ok(Event::into_format)
.try_collect::<Vec<_>>();
// Events are returned in body
let limit = LIMIT_MAX;
let events = services
.rooms
.timeline
.pdus_rev(None, room_id, None)
.pdus_rev(room_id, None)
.try_take(limit)
.and_then(async |mut pdu| {
pdu.1.set_unsigned(body.sender_user.as_deref());
if let Some(sender_user) = body.sender_user.as_deref() {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
}
Ok(pdu)
})
.try_collect::<Vec<_>>();
let (membership, visibility, state, events) =

View File

@@ -2,7 +2,7 @@
use axum::extract::State;
use conduwuit::{
Err, Result, at, is_true,
Err, Result, at, debug_warn, is_true,
matrix::Event,
result::FlatOk,
utils::{IterStream, stream::ReadyExt},
@@ -50,7 +50,7 @@ pub(crate) async fn search_events_route(
Ok(Response {
search_categories: ResultCategories {
room_events: room_events_result
room_events: Box::pin(room_events_result)
.await
.unwrap_or_else(|| Ok(ResultRoomEvents::default()))?,
},
@@ -110,7 +110,12 @@ async fn category_room_events(
limit,
};
let (count, results) = services.rooms.search.search_pdus(&query).await.ok()?;
let (count, results) = services
.rooms
.search
.search_pdus(&query, sender_user)
.await
.ok()?;
results
.collect::<Vec<_>>()
@@ -144,6 +149,17 @@ async fn category_room_events(
.map(at!(2))
.flatten()
.stream()
.then(|mut pdu| async {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu)
.await
{
debug_warn!("Failed to add bundled aggregations to search result: {e}");
}
pdu
})
.map(Event::into_format)
.map(|result| SearchResult {
rank: None,

View File

@@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, err, matrix::pdu::PduBuilder, utils};
use ruma::{api::client::message::send_message_event, events::MessageLikeEventType};
use serde_json::from_str;
@@ -18,6 +19,7 @@
/// allowed
pub(crate) async fn send_message_event_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<send_message_event::v3::Request>,
) -> Result<send_message_event::v3::Response> {
let sender_user = body.sender_user();
@@ -27,6 +29,11 @@ pub(crate) async fn send_message_event_route(
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
.await;
// Forbid m.room.encrypted if encryption is disabled
if MessageLikeEventType::RoomEncrypted == body.event_type && !services.config.allow_encryption
{

View File

@@ -1,4 +1,5 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, err,
matrix::{Event, pdu::PduBuilder},
@@ -7,7 +8,7 @@
use conduwuit_service::Services;
use futures::{FutureExt, TryStreamExt};
use ruma::{
OwnedEventId, RoomId, UserId,
MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
api::client::state::{get_state_events, get_state_events_for_key, send_state_event},
events::{
AnyStateEventContent, StateEventType,
@@ -30,9 +31,14 @@
/// Sends a state event into the room.
pub(crate) async fn send_state_event_for_key_route(
State(services): State<crate::State>,
InsecureClientIp(ip): InsecureClientIp,
body: Ruma<send_state_event::v3::Request>,
) -> Result<send_state_event::v3::Response> {
let sender_user = body.sender_user();
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), ip)
.await;
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
@@ -61,9 +67,10 @@ pub(crate) async fn send_state_event_for_key_route(
/// Sends a state event into the room.
pub(crate) async fn send_state_event_for_empty_key_route(
State(services): State<crate::State>,
InsecureClientIp(ip): InsecureClientIp,
body: Ruma<send_state_event::v3::Request>,
) -> Result<RumaResponse<send_state_event::v3::Response>> {
send_state_event_for_key_route(State(services), body)
send_state_event_for_key_route(State(services), InsecureClientIp(ip), body)
.boxed()
.await
.map(RumaResponse)
@@ -151,7 +158,7 @@ pub(crate) async fn get_state_events_for_key_route(
"content": event.content(),
"event_id": event.event_id(),
"origin_server_ts": event.origin_server_ts(),
"room_id": event.room_id(),
"room_id": event.room_id_or_hash(),
"sender": event.sender(),
"state_key": event.state_key(),
"type": event.kind(),
@@ -185,7 +192,7 @@ async fn send_state_event_for_key_helper(
event_type: &StateEventType,
json: &Raw<AnyStateEventContent>,
state_key: &str,
timestamp: Option<ruma::MilliSecondsSinceUnixEpoch>,
timestamp: Option<MilliSecondsSinceUnixEpoch>,
) -> Result<OwnedEventId> {
allowed_to_send_state_event(services, room_id, event_type, state_key, json).await?;
let state_lock = services.rooms.state.mutex.lock(room_id).await;

View File

@@ -1,65 +1,145 @@
mod v3;
mod v4;
mod v5;
use std::collections::VecDeque;
use conduwuit::{
Error, PduCount, Result,
Event, PduCount, Result, debug_warn, err,
matrix::pdu::PduEvent,
ref_at, trace,
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
};
use conduwuit_service::Services;
use futures::{StreamExt, pin_mut};
use futures::StreamExt;
use ruma::{
RoomId, UserId,
OwnedUserId, RoomId, UserId,
events::TimelineEventType::{
self, Beacon, CallInvite, PollStart, RoomEncrypted, RoomMessage, Sticker,
},
};
pub(crate) use self::{
v3::sync_events_route, v4::sync_events_v4_route, v5::sync_events_v5_route,
};
pub(crate) use self::{v3::sync_events_route, v5::sync_events_v5_route};
pub(crate) const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] =
&[CallInvite, PollStart, Beacon, RoomEncrypted, RoomMessage, Sticker];
#[derive(Default)]
pub(crate) struct TimelinePdus {
pub pdus: VecDeque<(PduCount, PduEvent)>,
pub limited: bool,
}
impl TimelinePdus {
fn senders(&self) -> impl Iterator<Item = OwnedUserId> {
self.pdus
.iter()
.map(ref_at!(1))
.map(Event::sender)
.map(Into::into)
}
}
/// Load up to `limit` PDUs in the range (starting_count, ending_count].
async fn load_timeline(
services: &Services,
sender_user: &UserId,
room_id: &RoomId,
roomsincecount: PduCount,
next_batch: Option<PduCount>,
starting_count: Option<PduCount>,
ending_count: Option<PduCount>,
limit: usize,
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
let last_timeline_count = services
.rooms
.timeline
.last_timeline_count(Some(sender_user), room_id)
.await?;
) -> Result<TimelinePdus> {
let mut pdu_stream = match starting_count {
| Some(starting_count) => {
let last_timeline_count = services
.rooms
.timeline
.last_timeline_count(room_id)
.await
.map_err(|err| {
err!(Database(warn!("Failed to fetch end of room timeline: {}", err)))
})?;
if last_timeline_count <= roomsincecount {
return Ok((Vec::new(), false));
}
if last_timeline_count <= starting_count {
// no messages have been sent in this room since `starting_count`
return Ok(TimelinePdus::default());
}
let non_timeline_pdus = services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, None)
.ignore_err()
.ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
.ready_take_while(|&(pducount, _)| pducount > roomsincecount);
// for incremental sync, stream from the DB all PDUs which were sent after
// `starting_count` but before `ending_count`, including `ending_count` but
// not `starting_count`. this code is pretty similar to the initial sync
// branch, they're separate to allow for future optimization
services
.rooms
.timeline
.pdus_rev(room_id, ending_count.map(|count| count.saturating_add(1)))
.ignore_err()
.ready_take_while(move |&(pducount, _)| pducount > starting_count)
.map(move |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
pdu
})
.then(async move |mut pdu| {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.boxed()
},
| None => {
// For initial sync, stream from the DB all PDUs before and including
// `ending_count` in reverse order
services
.rooms
.timeline
.pdus_rev(room_id, ending_count.map(|count| count.saturating_add(1)))
.ignore_err()
.map(move |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
pdu
})
.then(async move |mut pdu| {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.boxed()
},
};
// Take the last events for the timeline
pin_mut!(non_timeline_pdus);
let timeline_pdus: Vec<_> = non_timeline_pdus.by_ref().take(limit).collect().await;
// Return at most `limit` PDUs from the stream
let pdus = pdu_stream
.by_ref()
.take(limit)
.ready_fold(VecDeque::with_capacity(limit), |mut pdus, item| {
pdus.push_front(item);
pdus
})
.await;
let timeline_pdus: Vec<_> = timeline_pdus.into_iter().rev().collect();
// The timeline is limited if there are still more PDUs in the stream
let limited = pdu_stream.next().await.is_some();
// They /sync response doesn't always return all messages, so we say the output
// is limited unless there are events in non_timeline_pdus
let limited = non_timeline_pdus.next().await.is_some();
trace!(
"syncing {:?} timeline pdus from {:?} to {:?} (limited = {:?})",
pdus.len(),
starting_count,
ending_count,
limited,
);
Ok((timeline_pdus, limited))
Ok(TimelinePdus { pdus, limited })
}
async fn share_encrypted_room(

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,852 @@
use std::collections::{BTreeMap, HashSet};
use conduwuit::{
Result, at, debug_warn, err, extract_variant,
matrix::{
Event,
pdu::{PduCount, PduEvent},
},
trace,
utils::{
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
math::ruma_from_u64,
stream::{TryIgnore, WidebandExt},
},
warn,
};
use conduwuit_service::Services;
use futures::{
FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join, join3, join4, try_join, try_join3},
};
use ruma::{
OwnedRoomId, OwnedUserId, RoomId, UserId,
api::client::sync::sync_events::{
UnreadNotificationsCount,
v3::{Ephemeral, JoinedRoom, RoomAccountData, RoomSummary, State as RoomState, Timeline},
},
events::{
AnyRawAccountDataEvent, StateEventType,
TimelineEventType::*,
room::member::{MembershipState, RoomMemberEventContent},
},
serde::Raw,
uint,
};
use service::rooms::short::ShortStateHash;
use super::{load_timeline, share_encrypted_room};
use crate::client::{
TimelinePdus, ignored_filter,
sync::v3::{
DEFAULT_TIMELINE_LIMIT, DeviceListUpdates, SyncContext, prepare_lazily_loaded_members,
state::{build_state_incremental, build_state_initial},
},
};
/// Generate the sync response for a room the user is joined to.
#[tracing::instrument(
name = "joined",
level = "debug",
skip_all,
fields(
room_id = ?room_id,
syncing_user = ?sync_context.syncing_user,
),
)]
pub(super) async fn load_joined_room(
services: &Services,
sync_context: SyncContext<'_>,
ref room_id: OwnedRoomId,
) -> Result<(JoinedRoom, DeviceListUpdates)> {
/*
Building a sync response involves many steps which all depend on each other.
To parallelize the process as much as possible, each step is divided into its own function,
and `join*` functions are used to perform steps in parallel which do not depend on each other.
*/
let (
account_data,
ephemeral,
StateAndTimeline {
state_events,
timeline,
summary,
notification_counts,
device_list_updates,
},
) = try_join3(
build_account_data(services, sync_context, room_id),
build_ephemeral(services, sync_context, room_id),
build_state_and_timeline(services, sync_context, room_id),
)
.boxed()
.await?;
if !timeline.is_empty() || !state_events.is_empty() {
trace!(
"syncing {} timeline events (limited = {}) and {} state events",
timeline.events.len(),
timeline.limited,
state_events.len()
);
}
let joined_room = JoinedRoom {
account_data,
summary: summary.unwrap_or_default(),
unread_notifications: notification_counts.unwrap_or_default(),
timeline,
state: RoomState {
events: state_events.into_iter().map(Event::into_format).collect(),
},
ephemeral,
unread_thread_notifications: BTreeMap::new(),
};
Ok((joined_room, device_list_updates))
}
/// Collect changes to the syncing user's account data events.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_account_data(
services: &Services,
SyncContext {
syncing_user,
last_sync_end_count,
current_count,
..
}: SyncContext<'_>,
room_id: &RoomId,
) -> Result<RoomAccountData> {
let account_data_changes = services
.account_data
.changes_since(Some(room_id), syncing_user, last_sync_end_count, Some(current_count))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await;
Ok(RoomAccountData { events: account_data_changes })
}
/// Collect new ephemeral events.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_ephemeral(
services: &Services,
SyncContext { syncing_user, last_sync_end_count, .. }: SyncContext<'_>,
room_id: &RoomId,
) -> Result<Ephemeral> {
// note: some of the futures below are boxed. this is because, without the box,
// rustc produces over thirty inscrutable errors in `mod.rs` at the call-site
// of `load_joined_room`. I don't know why boxing them fixes this -- it seems
// to be related to the async closures and borrowing from the sync context.
// collect updates to read receipts
let receipt_events = services
.rooms
.read_receipt
.readreceipts_since(room_id, last_sync_end_count)
.filter_map(async |(read_user, _, edu)| {
let is_ignored = services
.users
.user_is_ignored(&read_user, syncing_user)
.await;
// filter out read receipts for ignored users
is_ignored.or_some(edu)
})
.collect::<Vec<_>>()
.boxed();
// collect the updated list of typing users, if it's changed
let typing_event = async {
let should_send_typing_event = match last_sync_end_count {
| Some(last_sync_end_count) => {
match services.rooms.typing.last_typing_update(room_id).await {
| Ok(last_typing_update) => {
// update the typing list if the users typing have changed since the last
// sync
last_typing_update > last_sync_end_count
},
| Err(err) => {
warn!("Error checking last typing update: {}", err);
return None;
},
}
},
// always update the typing list on an initial sync
| None => true,
};
if should_send_typing_event {
let event = services
.rooms
.typing
.typings_event_for_user(room_id, syncing_user)
.await;
if let Ok(event) = event {
return Some(
Raw::new(&event)
.expect("typing event should be valid")
.cast(),
);
}
}
None
};
// collect the syncing user's private-read marker, if it's changed
let private_read_event = async {
let should_send_private_read = match last_sync_end_count {
| Some(last_sync_end_count) => {
let last_privateread_update = services
.rooms
.read_receipt
.last_privateread_update(syncing_user, room_id)
.await;
// update the marker if it's changed since the last sync
last_privateread_update > last_sync_end_count
},
// always update the marker on an initial sync
| None => true,
};
if should_send_private_read {
services
.rooms
.read_receipt
.private_read_get(room_id, syncing_user)
.await
.ok()
} else {
None
}
};
let (receipt_events, typing_event, private_read_event) =
join3(receipt_events, typing_event, private_read_event).await;
let mut edus = receipt_events;
edus.extend(typing_event);
edus.extend(private_read_event);
Ok(Ephemeral { events: edus })
}
/// A struct to hold the state events, timeline, and other data which is
/// computed from them.
struct StateAndTimeline {
state_events: Vec<PduEvent>,
timeline: Timeline,
summary: Option<RoomSummary>,
notification_counts: Option<UnreadNotificationsCount>,
device_list_updates: DeviceListUpdates,
}
/// Compute changes to the room's state and timeline.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_state_and_timeline(
services: &Services,
sync_context: SyncContext<'_>,
room_id: &RoomId,
) -> Result<StateAndTimeline> {
let (shortstatehashes, timeline) = try_join(
fetch_shortstatehashes(services, sync_context, room_id),
build_timeline(services, sync_context, room_id),
)
.await?;
let (state_events, notification_counts, joined_since_last_sync) = try_join3(
build_state_events(services, sync_context, room_id, shortstatehashes, &timeline),
build_notification_counts(services, sync_context, room_id, &timeline),
check_joined_since_last_sync(services, shortstatehashes, sync_context),
)
.await?;
// the timeline should always include at least one PDU if the syncing user
// joined since the last sync, that being the syncing user's join event. if
// it's empty something is wrong.
if joined_since_last_sync && timeline.pdus.is_empty() {
warn!("timeline for newly joined room is empty");
}
let (summary, device_list_updates) = try_join(
build_room_summary(
services,
sync_context,
room_id,
shortstatehashes,
&timeline,
&state_events,
joined_since_last_sync,
),
build_device_list_updates(
services,
sync_context,
room_id,
shortstatehashes,
&state_events,
joined_since_last_sync,
),
)
.await?;
// the token which may be passed to the messages endpoint to backfill room
// history
let prev_batch = timeline.pdus.front().map(at!(0));
// note: we always indicate a limited timeline if the syncing user just joined
// the room, to indicate to the client that it should request backfill (and to
// copy Synapse's behavior). for federated room joins, the `timeline` will
// usually only include the syncing user's join event.
let limited = timeline.limited || joined_since_last_sync;
// filter out ignored events from the timeline and convert the PDUs into Ruma's
// AnySyncTimelineEvent type
let filtered_timeline = timeline
.pdus
.into_iter()
.stream()
.wide_filter_map(|item| ignored_filter(services, item, sync_context.syncing_user))
.map(at!(1))
.map(Event::into_format)
.collect::<Vec<_>>()
.await;
Ok(StateAndTimeline {
state_events,
timeline: Timeline {
limited,
prev_batch: prev_batch.as_ref().map(ToString::to_string),
events: filtered_timeline,
},
summary,
notification_counts,
device_list_updates,
})
}
/// Shortstatehashes necessary to compute what state events to sync.
#[derive(Clone, Copy)]
struct ShortStateHashes {
/// The current state of the syncing room.
current_shortstatehash: ShortStateHash,
/// The state of the syncing room at the end of the last sync.
last_sync_end_shortstatehash: Option<ShortStateHash>,
}
/// Fetch the current_shortstatehash and last_sync_end_shortstatehash.
#[tracing::instrument(level = "debug", skip_all)]
async fn fetch_shortstatehashes(
services: &Services,
SyncContext { last_sync_end_count, current_count, .. }: SyncContext<'_>,
room_id: &RoomId,
) -> Result<ShortStateHashes> {
// the room state currently.
// TODO: this should be the room state as of `current_count`, but there's no way
// to get that right now.
let current_shortstatehash = services
.rooms
.state
.get_room_shortstatehash(room_id)
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
// the room state as of the end of the last sync.
// this will be None if we are doing an initial sync or if we just joined this
// room.
let last_sync_end_shortstatehash =
OptionFuture::from(last_sync_end_count.map(|last_sync_end_count| {
// look up the shortstatehash saved by the last sync's call to
// `associate_token_shortstatehash`
services
.rooms
.user
.get_token_shortstatehash(room_id, last_sync_end_count)
.inspect_err(move |_| {
debug_warn!(
token = last_sync_end_count,
"Room has no shortstatehash for this token"
);
})
.ok()
}))
.map(Option::flatten)
.map(Ok);
let (current_shortstatehash, last_sync_end_shortstatehash) =
try_join(current_shortstatehash, last_sync_end_shortstatehash).await?;
/*
associate the `current_count` with the `current_shortstatehash`, so we can
use it on the next sync as the `last_sync_end_shortstatehash`.
TODO: the table written to by this call grows extremely fast, gaining one new entry for each
joined room on _every single sync request_. we need to find a better way to remember the shortstatehash
between syncs.
*/
services
.rooms
.user
.associate_token_shortstatehash(room_id, current_count, current_shortstatehash)
.await;
Ok(ShortStateHashes {
current_shortstatehash,
last_sync_end_shortstatehash,
})
}
/// Fetch recent timeline events.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_timeline(
services: &Services,
sync_context: SyncContext<'_>,
room_id: &RoomId,
) -> Result<TimelinePdus> {
let SyncContext {
syncing_user,
last_sync_end_count,
current_count,
filter,
..
} = sync_context;
/*
determine the maximum number of events to return in this sync.
if the sync filter specifies a limit, that will be used, otherwise
`DEFAULT_TIMELINE_LIMIT` will be used. `DEFAULT_TIMELINE_LIMIT` will also be
used if the limit is somehow greater than usize::MAX.
*/
let timeline_limit = filter
.room
.timeline
.limit
.and_then(|limit| limit.try_into().ok())
.unwrap_or(DEFAULT_TIMELINE_LIMIT);
load_timeline(
services,
syncing_user,
room_id,
last_sync_end_count.map(PduCount::Normal),
Some(PduCount::Normal(current_count)),
timeline_limit,
)
.await
}
/// Calculate the state events to sync.
async fn build_state_events(
services: &Services,
sync_context: SyncContext<'_>,
room_id: &RoomId,
shortstatehashes: ShortStateHashes,
timeline: &TimelinePdus,
) -> Result<Vec<PduEvent>> {
let SyncContext {
syncing_user,
last_sync_end_count,
full_state,
..
} = sync_context;
let ShortStateHashes {
current_shortstatehash,
last_sync_end_shortstatehash,
} = shortstatehashes;
// the spec states that the `state` property only includes state events up to
// the beginning of the timeline, so we determine the state of the syncing room
// as of the first timeline event. NOTE: this explanation is not entirely
// accurate; see the implementation of `build_state_incremental`.
let timeline_start_shortstatehash = async {
if let Some((_, pdu)) = timeline.pdus.front() {
if let Ok(shortstatehash) = services
.rooms
.state_accessor
.pdu_shortstatehash(&pdu.event_id)
.await
{
return shortstatehash;
}
}
current_shortstatehash
};
// the user IDs of members whose membership needs to be sent to the client, if
// lazy-loading is enabled.
let lazily_loaded_members =
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders());
let (timeline_start_shortstatehash, lazily_loaded_members) =
join(timeline_start_shortstatehash, lazily_loaded_members).await;
// compute the state delta between the previous sync and this sync.
match (last_sync_end_count, last_sync_end_shortstatehash) {
/*
if `last_sync_end_count` is Some (meaning this is an incremental sync), and `last_sync_end_shortstatehash`
is Some (meaning the syncing user didn't just join this room for the first time ever), and `full_state` is false,
then use `build_state_incremental`.
*/
| (Some(last_sync_end_count), Some(last_sync_end_shortstatehash)) if !full_state =>
build_state_incremental(
services,
syncing_user,
room_id,
PduCount::Normal(last_sync_end_count),
last_sync_end_shortstatehash,
timeline_start_shortstatehash,
current_shortstatehash,
timeline,
lazily_loaded_members.as_ref(),
)
.boxed()
.await,
/*
otherwise use `build_state_initial`. note that this branch will be taken if the user joined this room since the last sync
for the first time ever, because in that case we have no `last_sync_end_shortstatehash` and can't correctly calculate
the state using the incremental sync algorithm.
*/
| _ =>
build_state_initial(
services,
syncing_user,
timeline_start_shortstatehash,
lazily_loaded_members.as_ref(),
)
.boxed()
.await,
}
}
/// Compute the number of unread notifications in this room.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_notification_counts(
services: &Services,
SyncContext { syncing_user, last_sync_end_count, .. }: SyncContext<'_>,
room_id: &RoomId,
timeline: &TimelinePdus,
) -> Result<Option<UnreadNotificationsCount>> {
// determine whether to actually update the notification counts
let should_send_notification_counts = async {
// if we're going to sync some timeline events, the notification count has
// definitely changed to include them
if !timeline.pdus.is_empty() {
return true;
}
// if this is an initial sync, we need to send notification counts because the
// client doesn't know what they are yet
let Some(last_sync_end_count) = last_sync_end_count else {
return true;
};
let last_notification_read = services
.rooms
.user
.last_notification_read(syncing_user, room_id)
.await;
// if the syncing user has read the events we sent during the last sync, we need
// to send a new notification count on this sync.
if last_notification_read > last_sync_end_count {
return true;
}
// otherwise, nothing's changed.
false
};
if should_send_notification_counts.await {
let (notification_count, highlight_count) = join(
services
.rooms
.user
.notification_count(syncing_user, room_id)
.map(TryInto::try_into)
.unwrap_or(uint!(0)),
services
.rooms
.user
.highlight_count(syncing_user, room_id)
.map(TryInto::try_into)
.unwrap_or(uint!(0)),
)
.await;
trace!(?notification_count, ?highlight_count, "syncing new notification counts");
Ok(Some(UnreadNotificationsCount {
notification_count: Some(notification_count),
highlight_count: Some(highlight_count),
}))
} else {
Ok(None)
}
}
/// Check if the syncing user joined the room since their last incremental sync.
#[tracing::instrument(level = "debug", skip_all)]
async fn check_joined_since_last_sync(
services: &Services,
ShortStateHashes { last_sync_end_shortstatehash, .. }: ShortStateHashes,
SyncContext { syncing_user, .. }: SyncContext<'_>,
) -> Result<bool> {
// fetch the syncing user's membership event during the last sync.
// this will be None if `previous_sync_end_shortstatehash` is None.
let membership_during_previous_sync = match last_sync_end_shortstatehash {
| Some(last_sync_end_shortstatehash) => services
.rooms
.state_accessor
.state_get_content(
last_sync_end_shortstatehash,
&StateEventType::RoomMember,
syncing_user.as_str(),
)
.await
.inspect_err(|_| debug_warn!("User has no previous membership"))
.ok(),
| None => None,
};
// TODO: If the requesting user got state-reset out of the room, this
// will be `true` when it shouldn't be. this function should never be called
// in that situation, but it may be if the membership cache didn't get updated.
// the root cause of this needs to be addressed
let joined_since_last_sync =
membership_during_previous_sync.is_none_or(|content: RoomMemberEventContent| {
content.membership != MembershipState::Join
});
if joined_since_last_sync {
trace!("user joined since last sync");
}
Ok(joined_since_last_sync)
}
/// Build the `summary` field of the room object, which includes
/// the number of joined and invited users and the room's heroes.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_room_summary(
services: &Services,
SyncContext { syncing_user, .. }: SyncContext<'_>,
room_id: &RoomId,
ShortStateHashes { current_shortstatehash, .. }: ShortStateHashes,
timeline: &TimelinePdus,
state_events: &[PduEvent],
joined_since_last_sync: bool,
) -> Result<Option<RoomSummary>> {
// determine whether any events in the state or timeline are membership events.
let are_syncing_membership_events = timeline
.pdus
.iter()
.map(|(_, pdu)| pdu)
.chain(state_events.iter())
.any(|event| event.kind == RoomMember);
/*
we only need to send an updated room summary if:
1. there are membership events in the state or timeline, because they might have changed the
membership counts or heroes, or
2. the syncing user just joined this room, which usually implies #1 because their join event should be in the timeline.
*/
if !(are_syncing_membership_events || joined_since_last_sync) {
return Ok(None);
}
let joined_member_count = services
.rooms
.state_cache
.room_joined_count(room_id)
.unwrap_or(0);
let invited_member_count = services
.rooms
.state_cache
.room_invited_count(room_id)
.unwrap_or(0);
let has_name = services
.rooms
.state_accessor
.state_contains_type(current_shortstatehash, &StateEventType::RoomName);
let has_canonical_alias = services
.rooms
.state_accessor
.state_contains_type(current_shortstatehash, &StateEventType::RoomCanonicalAlias);
let (joined_member_count, invited_member_count, has_name, has_canonical_alias) =
join4(joined_member_count, invited_member_count, has_name, has_canonical_alias).await;
// only send heroes if the room has neither a name nor a canonical alias
let heroes = if !(has_name || has_canonical_alias) {
Some(build_heroes(services, room_id, syncing_user, current_shortstatehash).await)
} else {
None
};
trace!(
?joined_member_count,
?invited_member_count,
heroes_length = heroes.as_ref().map(HashSet::len),
"syncing updated summary"
);
Ok(Some(RoomSummary {
heroes: heroes
.map(|heroes| heroes.into_iter().collect())
.unwrap_or_default(),
joined_member_count: Some(ruma_from_u64(joined_member_count)),
invited_member_count: Some(ruma_from_u64(invited_member_count)),
}))
}
/// Fetch the user IDs to include in the `m.heroes` property of the room
/// summary.
async fn build_heroes(
services: &Services,
room_id: &RoomId,
syncing_user: &UserId,
current_shortstatehash: ShortStateHash,
) -> HashSet<OwnedUserId> {
const MAX_HERO_COUNT: usize = 5;
// fetch joined members from the state cache first
let joined_members_stream = services
.rooms
.state_cache
.room_members(room_id)
.map(ToOwned::to_owned);
// then fetch invited members
let invited_members_stream = services
.rooms
.state_cache
.room_members_invited(room_id)
.map(ToOwned::to_owned);
// then as a last resort fetch every membership event
let all_members_stream = services
.rooms
.short
.multi_get_statekey_from_short(
services
.rooms
.state_accessor
.state_full_shortids(current_shortstatehash)
.ignore_err()
.ready_filter_map(|(key, _)| Some(key)),
)
.ignore_err()
.ready_filter_map(|(event_type, state_key)| {
if event_type == StateEventType::RoomMember {
state_key.to_string().try_into().ok()
} else {
None
}
});
joined_members_stream
.chain(invited_members_stream)
.chain(all_members_stream)
// the hero list should never include the syncing user
.ready_filter(|user_id| user_id != syncing_user)
.take(MAX_HERO_COUNT)
.collect()
.await
}
/// Collect updates to users' device lists for E2EE.
#[tracing::instrument(level = "debug", skip_all)]
async fn build_device_list_updates(
services: &Services,
SyncContext {
syncing_user,
last_sync_end_count,
current_count,
..
}: SyncContext<'_>,
room_id: &RoomId,
ShortStateHashes { current_shortstatehash, .. }: ShortStateHashes,
state_events: &Vec<PduEvent>,
joined_since_last_sync: bool,
) -> Result<DeviceListUpdates> {
let is_encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok();
// initial syncs don't include device updates, and rooms which aren't encrypted
// don't affect them, so return early in either of those cases
if last_sync_end_count.is_none() || !(is_encrypted_room.await) {
return Ok(DeviceListUpdates::new());
}
let mut device_list_updates = DeviceListUpdates::new();
// add users with changed keys to the `changed` list
services
.users
.room_keys_changed(room_id, last_sync_end_count, Some(current_count))
.map(at!(0))
.map(ToOwned::to_owned)
.ready_for_each(|user_id| {
device_list_updates.changed.insert(user_id);
})
.await;
// add users who now share encrypted rooms to `changed` and
// users who no longer share encrypted rooms to `left`
for state_event in state_events {
if state_event.kind == RoomMember {
let Some(content): Option<RoomMemberEventContent> = state_event.get_content().ok()
else {
continue;
};
let Some(user_id): Option<OwnedUserId> = state_event
.state_key
.as_ref()
.and_then(|key| key.parse().ok())
else {
continue;
};
{
use MembershipState::*;
if matches!(content.membership, Leave | Join) {
let shares_encrypted_room =
share_encrypted_room(services, syncing_user, &user_id, Some(room_id))
.await;
match content.membership {
| Leave if !shares_encrypted_room => {
device_list_updates.left.insert(user_id);
},
| Join if joined_since_last_sync || shares_encrypted_room => {
device_list_updates.changed.insert(user_id);
},
| _ => (),
}
}
}
}
}
if !device_list_updates.is_empty() {
trace!(
changed = device_list_updates.changed.len(),
left = device_list_updates.left.len(),
"syncing device list updates"
);
}
Ok(device_list_updates)
}

View File

@@ -0,0 +1,349 @@
use conduwuit::{
Event, PduCount, PduEvent, Result, at, debug_warn,
pdu::EventHash,
trace,
utils::{self, IterStream, future::ReadyEqExt, stream::WidebandExt as _},
};
use futures::{StreamExt, future::join};
use ruma::{
EventId, OwnedRoomId, RoomId,
api::client::sync::sync_events::v3::{LeftRoom, RoomAccountData, State, Timeline},
events::{StateEventType, TimelineEventType},
uint,
};
use serde_json::value::RawValue;
use service::{Services, rooms::short::ShortStateHash};
use crate::client::{
TimelinePdus, ignored_filter,
sync::{
load_timeline,
v3::{
DEFAULT_TIMELINE_LIMIT, SyncContext, prepare_lazily_loaded_members,
state::build_state_initial,
},
},
};
#[tracing::instrument(
name = "left",
level = "debug",
skip_all,
fields(
room_id = %room_id,
),
)]
#[allow(clippy::too_many_arguments)]
pub(super) async fn load_left_room(
services: &Services,
sync_context: SyncContext<'_>,
ref room_id: OwnedRoomId,
leave_membership_event: Option<PduEvent>,
) -> Result<Option<LeftRoom>> {
let SyncContext {
syncing_user,
last_sync_end_count,
current_count,
filter,
..
} = sync_context;
// the global count as of the moment the user left the room
let Some(left_count) = services
.rooms
.state_cache
.get_left_count(room_id, syncing_user)
.await
.ok()
else {
// if we get here, the membership cache is incorrect, likely due to a state
// reset
debug_warn!("attempting to sync left room but no left count exists");
return Ok(None);
};
// return early if we haven't gotten to this leave yet.
// this can happen if the user leaves while a sync response is being generated
if current_count < left_count {
return Ok(None);
}
// return early if this is an incremental sync, and we've already synced this
// leave to the user, and `include_leave` isn't set on the filter.
if !filter.room.include_leave && last_sync_end_count >= Some(left_count) {
return Ok(None);
}
if let Some(ref leave_membership_event) = leave_membership_event {
debug_assert_eq!(
leave_membership_event.kind,
TimelineEventType::RoomMember,
"leave PDU should be m.room.member"
);
}
let does_not_exist = services.rooms.metadata.exists(room_id).eq(&false).await;
let (timeline, state_events) = match leave_membership_event {
| Some(leave_membership_event) if does_not_exist => {
/*
we have none PDUs with left beef for this room, likely because it was a rejected invite to a room
which nobody on this homeserver is in. `leave_pdu` is the remote-assisted outlier leave event for the room,
which is all we can send to the client.
if this is an initial sync, don't include this room at all to keep the client from asking for
state that we don't have.
*/
if last_sync_end_count.is_none() {
return Ok(None);
}
trace!("syncing remote-assisted leave PDU");
(TimelinePdus::default(), vec![leave_membership_event])
},
| Some(leave_membership_event) => {
// we have this room in our DB, and can fetch the state and timeline from when
// the user left.
let leave_state_key = syncing_user;
debug_assert_eq!(
Some(leave_state_key.as_str()),
leave_membership_event.state_key(),
"leave PDU should be for the user requesting the sync"
);
// the shortstatehash of the state _immediately before_ the syncing user left
// this room. the state represented here _does not_ include
// `leave_membership_event`.
let leave_shortstatehash = services
.rooms
.state_accessor
.pdu_shortstatehash(&leave_membership_event.event_id)
.await?;
let prev_membership_event = services
.rooms
.state_accessor
.state_get(
leave_shortstatehash,
&StateEventType::RoomMember,
leave_state_key.as_str(),
)
.await?;
build_left_state_and_timeline(
services,
sync_context,
room_id,
leave_membership_event,
leave_shortstatehash,
prev_membership_event,
)
.await?
},
| None => {
/*
no leave event was actually sent in this room, but we still need to pretend
like the user left it. this is usually because the room was banned by a server admin.
if this is an incremental sync, generate a fake leave event to make the room vanish from clients.
otherwise we don't tell the client about this room at all.
*/
if last_sync_end_count.is_none() {
return Ok(None);
}
trace!("syncing dummy leave event");
(TimelinePdus::default(), vec![create_dummy_leave_event(
services,
sync_context,
room_id,
)])
},
};
let raw_timeline_pdus = timeline
.pdus
.into_iter()
.stream()
// filter out ignored events from the timeline
.wide_filter_map(|item| ignored_filter(services, item, syncing_user))
.map(at!(1))
.map(Event::into_format)
.collect::<Vec<_>>()
.await;
Ok(Some(LeftRoom {
account_data: RoomAccountData { events: Vec::new() },
timeline: Timeline {
limited: timeline.limited,
prev_batch: Some(current_count.to_string()),
events: raw_timeline_pdus,
},
state: State {
events: state_events.into_iter().map(Event::into_format).collect(),
},
}))
}
async fn build_left_state_and_timeline(
services: &Services,
sync_context: SyncContext<'_>,
room_id: &RoomId,
leave_membership_event: PduEvent,
leave_shortstatehash: ShortStateHash,
prev_membership_event: PduEvent,
) -> Result<(TimelinePdus, Vec<PduEvent>)> {
let SyncContext {
syncing_user,
last_sync_end_count,
filter,
..
} = sync_context;
let timeline_start_count = if let Some(last_sync_end_count) = last_sync_end_count {
// for incremental syncs, start the timeline after `since`
PduCount::Normal(last_sync_end_count)
} else {
// for initial syncs, start the timeline after the previous membership
// event. we don't want to include the membership event itself
// because clients get confused when they see a `join`
// membership event in a `leave` room.
services
.rooms
.timeline
.get_pdu_count(&prev_membership_event.event_id)
.await?
};
// end the timeline at the user's leave event
let timeline_end_count = services
.rooms
.timeline
.get_pdu_count(leave_membership_event.event_id())
.await?;
// limit the timeline using the same logic as for joined rooms
let timeline_limit = filter
.room
.timeline
.limit
.and_then(|limit| limit.try_into().ok())
.unwrap_or(DEFAULT_TIMELINE_LIMIT);
let timeline = load_timeline(
services,
syncing_user,
room_id,
Some(timeline_start_count),
Some(timeline_end_count),
timeline_limit,
)
.await?;
let timeline_start_shortstatehash = async {
if let Some((_, pdu)) = timeline.pdus.front() {
if let Ok(shortstatehash) = services
.rooms
.state_accessor
.pdu_shortstatehash(&pdu.event_id)
.await
{
return shortstatehash;
}
}
// the timeline generally should not be empty (see the TODO further down),
// but in case it is we use `leave_shortstatehash` as the state to
// send
leave_shortstatehash
};
let lazily_loaded_members =
prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders());
let (timeline_start_shortstatehash, lazily_loaded_members) =
join(timeline_start_shortstatehash, lazily_loaded_members).await;
// TODO: calculate incremental state for incremental syncs.
// always calculating initial state _works_ but returns more data and does
// more processing than strictly necessary.
let mut state = build_state_initial(
services,
syncing_user,
timeline_start_shortstatehash,
lazily_loaded_members.as_ref(),
)
.await?;
/*
remove membership events for the syncing user from state.
usually, `state` should include a `join` membership event and `timeline` should include a `leave` one.
however, the matrix-js-sdk gets confused when this happens (see [1]) and doesn't process the room leave,
so we have to filter out the membership from `state`.
NOTE: we are sending more information than synapse does in this scenario, because we always
calculate `state` for initial syncs, even when the sync being performed is incremental.
however, the specification does not forbid sending extraneous events in `state`.
TODO: there is an additional bug at play here. sometimes `load_joined_room` syncs the `leave` event
before `load_left_room` does, which means the `timeline` we sync immediately after a leave is empty.
this shouldn't happen -- `timeline` should always include the `leave` event. this is probably
a race condition with the membership state cache.
[1]: https://github.com/matrix-org/matrix-js-sdk/issues/5071
*/
// `state` should only ever include one membership event for the syncing user
let membership_event_index = state.iter().position(|pdu| {
*pdu.event_type() == TimelineEventType::RoomMember
&& pdu.state_key() == Some(syncing_user.as_str())
});
if let Some(index) = membership_event_index {
// the ordering of events in `state` does not matter
state.swap_remove(index);
}
trace!(
?timeline_start_count,
?timeline_end_count,
"syncing {} timeline events (limited = {}) and {} state events",
timeline.pdus.len(),
timeline.limited,
state.len()
);
Ok((timeline, state))
}
fn create_dummy_leave_event(
services: &Services,
SyncContext { syncing_user, .. }: SyncContext<'_>,
room_id: &RoomId,
) -> PduEvent {
// TODO: because this event ID is random, it could cause caching issues with
// clients. perhaps a database table could be created to hold these dummy
// events, or they could be stored as outliers?
PduEvent {
event_id: EventId::new(services.globals.server_name()),
sender: syncing_user.to_owned(),
origin: None,
origin_server_ts: utils::millis_since_unix_epoch()
.try_into()
.expect("Timestamp is valid js_int value"),
kind: TimelineEventType::RoomMember,
content: RawValue::from_string(r#"{"membership": "leave"}"#.to_owned()).unwrap(),
state_key: Some(syncing_user.as_str().into()),
unsigned: None,
// The following keys are dropped on conversion
room_id: Some(room_id.to_owned()),
prev_events: vec![],
depth: uint!(1),
auth_events: vec![],
redacts: None,
hashes: EventHash { sha256: String::new() },
signatures: None,
}
}

View File

@@ -0,0 +1,502 @@
mod joined;
mod left;
mod state;
use std::{
cmp::{self},
collections::{BTreeMap, HashMap, HashSet},
time::Duration,
};
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Result, extract_variant,
utils::{
ReadyExt, TryFutureExtExt,
stream::{BroadbandExt, Tools, WidebandExt},
},
warn,
};
use conduwuit_service::Services;
use futures::{
FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join3, join4, join5},
};
use ruma::{
DeviceId, OwnedUserId, RoomId, UserId,
api::client::{
filter::FilterDefinition,
sync::sync_events::{
self, DeviceLists,
v3::{
Filter, GlobalAccountData, InviteState, InvitedRoom, KnockState, KnockedRoom,
Presence, Rooms, ToDevice,
},
},
uiaa::UiaaResponse,
},
events::{
AnyRawAccountDataEvent,
presence::{PresenceEvent, PresenceEventContent},
},
serde::Raw,
};
use service::rooms::lazy_loading::{self, MemberSet, Options as _};
use super::{load_timeline, share_encrypted_room};
use crate::{
Ruma, RumaResponse,
client::{
is_ignored_invite,
sync::v3::{joined::load_joined_room, left::load_left_room},
},
};
/// The default maximum number of events to return in the `timeline` key of
/// joined and left rooms. If the number of events sent since the last sync
/// exceeds this number, the `timeline` will be `limited`.
const DEFAULT_TIMELINE_LIMIT: usize = 30;
/// A collection of updates to users' device lists, used for E2EE.
struct DeviceListUpdates {
changed: HashSet<OwnedUserId>,
left: HashSet<OwnedUserId>,
}
impl DeviceListUpdates {
fn new() -> Self {
Self {
changed: HashSet::new(),
left: HashSet::new(),
}
}
fn merge(&mut self, other: Self) {
self.changed.extend(other.changed);
self.left.extend(other.left);
}
fn is_empty(&self) -> bool { self.changed.is_empty() && self.left.is_empty() }
}
impl From<DeviceListUpdates> for DeviceLists {
fn from(val: DeviceListUpdates) -> Self {
Self {
changed: val.changed.into_iter().collect(),
left: val.left.into_iter().collect(),
}
}
}
/// References to common data needed to calculate the sync response.
#[derive(Clone, Copy)]
struct SyncContext<'a> {
/// The ID of the user requesting this sync.
syncing_user: &'a UserId,
/// The ID of the device requesting this sync, which will belong to
/// `syncing_user`.
syncing_device: &'a DeviceId,
/// The global count at the end of the previous sync response.
/// The previous sync's `current_count` will become the next sync's
/// `last_sync_end_count`. This will be None if no `since` query parameter
/// was specified, indicating an initial sync.
last_sync_end_count: Option<u64>,
/// The global count as of when we started building the sync response.
/// This is used as an upper bound when querying the database to ensure the
/// response represents a snapshot in time and doesn't include data which
/// appeared while the response was being built.
current_count: u64,
/// The `full_state` query parameter, used when syncing state for joined and
/// left rooms.
full_state: bool,
/// The sync filter, which the client uses to specify what data should be
/// included in the sync response.
filter: &'a FilterDefinition,
}
impl<'a> SyncContext<'a> {
fn lazy_loading_context(&self, room_id: &'a RoomId) -> lazy_loading::Context<'a> {
lazy_loading::Context {
user_id: self.syncing_user,
device_id: Some(self.syncing_device),
room_id,
token: self.last_sync_end_count,
options: Some(&self.filter.room.state.lazy_load_options),
}
}
#[inline]
fn lazy_loading_enabled(&self) -> bool {
(self.filter.room.state.lazy_load_options.is_enabled()
|| self.filter.room.timeline.lazy_load_options.is_enabled())
&& !self.full_state
}
}
type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
/// # `GET /_matrix/client/r0/sync`
///
/// Synchronize the client's state with the latest state on the server.
///
/// - This endpoint takes a `since` parameter which should be the `next_batch`
/// value from a previous request for incremental syncs.
///
/// Calling this endpoint without a `since` parameter returns:
/// - Some of the most recent events of each timeline
/// - Notification counts for each room
/// - Joined and invited member counts, heroes
/// - All state events
///
/// Calling this endpoint with a `since` parameter from a previous `next_batch`
/// returns: For joined rooms:
/// - Some of the most recent events of each timeline that happened after since
/// - If user joined the room after since: All state events (unless lazy loading
/// is activated) and all device list updates in that room
/// - If the user was already in the room: A list of all events that are in the
/// state now, but were not in the state at `since`
/// - If the state we send contains a member event: Joined and invited member
/// counts, heroes
/// - Device list updates that happened after `since`
/// - If there are events in the timeline we send or the user send updated his
/// read mark: Notification counts
/// - EDUs that are active now (read receipts, typing updates, presence)
/// - TODO: Allow multiple sync streams to support Pantalaimon
///
/// For invited rooms:
/// - If the user was invited after `since`: A subset of the state of the room
/// at the point of the invite
///
/// For left rooms:
/// - If the user left after `since`: `prev_batch` token, empty state (TODO:
/// subset of the state at the point of the leave)
#[tracing::instrument(
name = "sync",
level = "debug",
skip_all,
fields(
since = %body.body.since.as_deref().unwrap_or_default(),
)
)]
pub(crate) async fn sync_events_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
let (sender_user, sender_device) = body.sender();
// Presence update
if services.config.allow_local_presence {
services
.presence
.ping_presence(sender_user, &body.body.set_presence)
.await?;
}
// Increment the "device last active" metadata
services
.users
.update_device_last_seen(sender_user, Some(sender_device), client_ip)
.await;
// Setup watchers, so if there's no response, we can wait for them
let watcher = services.sync.watch(sender_user, sender_device);
let response = build_sync_events(&services, &body).await?;
if body.body.full_state
|| !(response.rooms.is_empty()
&& response.presence.is_empty()
&& response.account_data.is_empty()
&& response.device_lists.is_empty()
&& response.to_device.is_empty())
{
return Ok(response);
}
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let default = Duration::from_secs(30);
let duration = cmp::min(body.body.timeout.unwrap_or(default), default);
_ = tokio::time::timeout(duration, watcher).await;
// Retry returning data
build_sync_events(&services, &body).await
}
pub(crate) async fn build_sync_events(
services: &Services,
body: &Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
let (syncing_user, syncing_device) = body.sender();
let current_count = services.globals.current_count()?;
// the `since` token is the last sync end count stringified
let last_sync_end_count = body
.body
.since
.as_ref()
.and_then(|string| string.parse().ok());
let full_state = body.body.full_state;
// FilterDefinition is very large (0x1000 bytes), let's put it on the heap
let filter = Box::new(match body.body.filter.as_ref() {
// use the default filter if none was specified
| None => FilterDefinition::default(),
// use inline filters directly
| Some(Filter::FilterDefinition(filter)) => filter.clone(),
// look up filter IDs from the database
| Some(Filter::FilterId(filter_id)) => services
.users
.get_filter(syncing_user, filter_id)
.await
.unwrap_or_default(),
});
let context = SyncContext {
syncing_user,
syncing_device,
last_sync_end_count,
current_count,
full_state,
filter: &filter,
};
let joined_rooms = services
.rooms
.state_cache
.rooms_joined(syncing_user)
.map(ToOwned::to_owned)
.broad_filter_map(|room_id| async {
let joined_room = load_joined_room(services, context, room_id.clone()).await;
match joined_room {
| Ok((room, updates)) => Some((room_id, room, updates)),
| Err(err) => {
warn!(?err, ?room_id, "error loading joined room {}", room_id);
None
},
}
})
.ready_fold(
(BTreeMap::new(), DeviceListUpdates::new()),
|(mut joined_rooms, mut all_updates), (room_id, joined_room, updates)| {
all_updates.merge(updates);
if !joined_room.is_empty() {
joined_rooms.insert(room_id, joined_room);
}
(joined_rooms, all_updates)
},
);
let left_rooms = services
.rooms
.state_cache
.rooms_left(syncing_user)
.broad_filter_map(|(room_id, leave_pdu)| {
load_left_room(services, context, room_id.clone(), leave_pdu)
.map_ok(move |left_room| (room_id, left_room))
.ok()
})
.ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room)))
.collect();
let invited_rooms = services
.rooms
.state_cache
.rooms_invited(syncing_user)
.wide_filter_map(async |(room_id, invite_state)| {
if is_ignored_invite(services, syncing_user, &room_id).await {
None
} else {
Some((room_id, invite_state))
}
})
.fold_default(|mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| async move {
let invite_count = services
.rooms
.state_cache
.get_invite_count(&room_id, syncing_user)
.await
.ok();
// only sync this invite if it was sent after the last /sync call
if last_sync_end_count < invite_count {
let invited_room = InvitedRoom {
invite_state: InviteState { events: invite_state },
};
invited_rooms.insert(room_id, invited_room);
}
invited_rooms
});
let knocked_rooms = services
.rooms
.state_cache
.rooms_knocked(syncing_user)
.fold_default(|mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| async move {
let knock_count = services
.rooms
.state_cache
.get_knock_count(&room_id, syncing_user)
.await
.ok();
// only sync this knock if it was sent after the last /sync call
if last_sync_end_count < knock_count {
let knocked_room = KnockedRoom {
knock_state: KnockState { events: knock_state },
};
knocked_rooms.insert(room_id, knocked_room);
}
knocked_rooms
});
let presence_updates: OptionFuture<_> = services
.config
.allow_local_presence
.then(|| process_presence_updates(services, last_sync_end_count, syncing_user))
.into();
let account_data = services
.account_data
.changes_since(None, syncing_user, last_sync_end_count, Some(current_count))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect();
// Look for device list updates of this account
let keys_changed = services
.users
.keys_changed(syncing_user, last_sync_end_count, Some(current_count))
.map(ToOwned::to_owned)
.collect::<HashSet<_>>();
let to_device_events = services
.users
.get_to_device_events(
syncing_user,
syncing_device,
last_sync_end_count,
Some(current_count),
)
.collect::<Vec<_>>();
let device_one_time_keys_count = services
.users
.count_one_time_keys(syncing_user, syncing_device);
// Remove all to-device events the device received *last time*
let remove_to_device_events =
services
.users
.remove_to_device_events(syncing_user, syncing_device, last_sync_end_count);
let rooms = join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms);
let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates);
let top = join5(account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms)
.boxed()
.await;
let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top;
let ((), to_device_events, presence_updates) = ephemeral;
let (joined_rooms, left_rooms, invited_rooms, knocked_rooms) = rooms;
let (joined_rooms, mut device_list_updates) = joined_rooms;
device_list_updates.changed.extend(keys_changed);
let response = sync_events::v3::Response {
account_data: GlobalAccountData { events: account_data },
device_lists: device_list_updates.into(),
device_one_time_keys_count,
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
next_batch: current_count.to_string(),
presence: Presence {
events: presence_updates
.into_iter()
.flat_map(IntoIterator::into_iter)
.map(|(sender, content)| PresenceEvent { content, sender })
.map(|ref event| Raw::new(event))
.filter_map(Result::ok)
.collect(),
},
rooms: Rooms {
leave: left_rooms,
join: joined_rooms,
invite: invited_rooms,
knock: knocked_rooms,
},
to_device: ToDevice { events: to_device_events },
};
Ok(response)
}
#[tracing::instrument(name = "presence", level = "debug", skip_all)]
async fn process_presence_updates(
services: &Services,
last_sync_end_count: Option<u64>,
syncing_user: &UserId,
) -> PresenceUpdates {
services
.presence
.presence_since(last_sync_end_count.unwrap_or(0)) // send all presences on initial sync
.filter(|(user_id, ..)| {
services
.rooms
.state_cache
.user_sees_user(syncing_user, user_id)
})
.filter_map(|(user_id, _, presence_bytes)| {
services
.presence
.from_json_bytes_to_event(presence_bytes, user_id)
.map_ok(move |event| (user_id, event))
.ok()
})
.map(|(user_id, event)| (user_id.to_owned(), event.content))
.collect()
.await
}
/// Using the provided sync context and an iterator of user IDs in the
/// `timeline`, return a HashSet of user IDs whose membership events should be
/// sent to the client if lazy-loading is enabled.
#[allow(clippy::let_and_return)]
async fn prepare_lazily_loaded_members(
services: &Services,
sync_context: SyncContext<'_>,
room_id: &RoomId,
timeline_members: impl Iterator<Item = OwnedUserId>,
) -> Option<MemberSet> {
let lazy_loading_context = &sync_context.lazy_loading_context(room_id);
// reset lazy loading state on initial sync.
// do this even if lazy loading is disabled so future lazy loads
// will have the correct members.
if sync_context.last_sync_end_count.is_none() {
services
.rooms
.lazy_loading
.reset(lazy_loading_context)
.await;
}
// filter the input members through `retain_lazy_members`, which
// contains the actual lazy loading logic.
let lazily_loaded_members =
OptionFuture::from(sync_context.lazy_loading_enabled().then(|| {
services
.rooms
.lazy_loading
.retain_lazy_members(timeline_members.collect(), lazy_loading_context)
}))
.await;
lazily_loaded_members
}

View File

@@ -0,0 +1,280 @@
use std::{collections::BTreeSet, ops::ControlFlow};
use conduwuit::{
Result, at, is_equal_to,
matrix::{
Event,
pdu::{PduCount, PduEvent},
},
utils::{
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
stream::{BroadbandExt, TryIgnore},
},
};
use conduwuit_service::{
Services,
rooms::{lazy_loading::MemberSet, short::ShortStateHash},
};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use ruma::{OwnedEventId, RoomId, UserId, events::StateEventType};
use service::rooms::short::ShortEventId;
use tracing::trace;
use crate::client::TimelinePdus;
/// Calculate the state events to include in an initial sync response.
///
/// If lazy-loading is enabled (`lazily_loaded_members` is Some), the returned
/// Vec will include the membership events of exclusively the members in
/// `lazily_loaded_members`.
#[tracing::instrument(
name = "initial",
level = "trace",
skip_all,
fields(current_shortstatehash)
)]
#[allow(clippy::too_many_arguments)]
pub(super) async fn build_state_initial(
services: &Services,
sender_user: &UserId,
timeline_start_shortstatehash: ShortStateHash,
lazily_loaded_members: Option<&MemberSet>,
) -> Result<Vec<PduEvent>> {
// load the keys and event IDs of the state events at the start of the timeline
let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services
.rooms
.state_accessor
.state_full_ids(timeline_start_shortstatehash)
.unzip()
.await;
trace!("performing initial sync of {} state events", event_ids.len());
services
.rooms
.short
// look up the full state keys
.multi_get_statekey_from_short(shortstatekeys.into_iter().stream())
.zip(event_ids.into_iter().stream())
.ready_filter_map(|item| Some((item.0.ok()?, item.1)))
.ready_filter_map(|((event_type, state_key), event_id)| {
if let Some(lazily_loaded_members) = lazily_loaded_members {
/*
if lazy loading is enabled, filter out membership events which aren't for a user
included in `lazily_loaded_members` or for the user requesting the sync.
*/
let event_is_redundant = event_type == StateEventType::RoomMember
&& state_key.as_str().try_into().is_ok_and(|user_id: &UserId| {
sender_user != user_id && !lazily_loaded_members.contains(user_id)
});
event_is_redundant.or_some(event_id)
} else {
Some(event_id)
}
})
.broad_filter_map(|event_id: OwnedEventId| async move {
services.rooms.timeline.get_pdu(&event_id).await.ok()
})
.collect()
.map(Ok)
.await
}
/// Calculate the state events to include in an incremental sync response.
///
/// If lazy-loading is enabled (`lazily_loaded_members` is Some), the returned
/// Vec will include the membership events of all the members in
/// `lazily_loaded_members`.
#[tracing::instrument(name = "incremental", level = "trace", skip_all)]
#[allow(clippy::too_many_arguments)]
pub(super) async fn build_state_incremental<'a>(
services: &Services,
sender_user: &'a UserId,
room_id: &RoomId,
last_sync_end_count: PduCount,
last_sync_end_shortstatehash: ShortStateHash,
timeline_start_shortstatehash: ShortStateHash,
timeline_end_shortstatehash: ShortStateHash,
timeline: &TimelinePdus,
lazily_loaded_members: Option<&'a MemberSet>,
) -> Result<Vec<PduEvent>> {
/*
NB: a limited sync is one where `timeline.limited == true`. Synapse calls this a "gappy" sync internally.
The algorithm implemented in this function is, currently, quite different from the algorithm vaguely described
by the Matrix specification. This is because the specification's description of the `state` property does not accurately
reflect how Synapse behaves, and therefore how client SDKs behave. Notable differences include:
1. We do not compute the delta using the naive approach of "every state event from the end of the last sync
up to the start of this sync's timeline". see below for details.
2. If lazy-loading is enabled, we include lazily-loaded membership events. The specific users to include are determined
elsewhere and supplied to this function in the `lazily_loaded_members` parameter.
*/
/*
the `state` property of an incremental sync which isn't limited are _usually_ empty.
(note: the specification says that the `state` property is _always_ empty for limited syncs, which is incorrect.)
however, if an event in the timeline (`timeline.pdus`) merges a split in the room's DAG (i.e. has multiple `prev_events`),
the state at the _end_ of the timeline may include state events which were merged in and don't exist in the state
at the _start_ of the timeline. because this is uncommon, we check here to see if any events in the timeline
merged a split in the DAG.
see: https://github.com/element-hq/synapse/issues/16941
*/
let timeline_is_linear = timeline.pdus.is_empty() || {
let last_pdu_of_last_sync = services
.rooms
.timeline
.pdus_rev(room_id, Some(last_sync_end_count.saturating_add(1)))
.boxed()
.next()
.await
.transpose()
.expect("last sync should have had some PDUs")
.map(at!(1));
// make sure the prev_events of each pdu in the timeline refer only to the
// previous pdu
timeline
.pdus
.iter()
.try_fold(last_pdu_of_last_sync.map(|pdu| pdu.event_id), |prev_event_id, (_, pdu)| {
if let Ok(pdu_prev_event_id) = pdu.prev_events.iter().exactly_one() {
if prev_event_id
.as_ref()
.is_none_or(is_equal_to!(pdu_prev_event_id))
{
return ControlFlow::Continue(Some(pdu_prev_event_id.to_owned()));
}
}
trace!(
"pdu {:?} has split prev_events (expected {:?}): {:?}",
pdu.event_id, prev_event_id, pdu.prev_events
);
ControlFlow::Break(())
})
.is_continue()
};
if timeline_is_linear && !timeline.limited {
// if there are no splits in the DAG and the timeline isn't limited, then
// `state` will always be empty unless lazy loading is enabled.
if let Some(lazily_loaded_members) = lazily_loaded_members {
if !timeline.pdus.is_empty() {
// lazy loading is enabled, so we return the membership events which were
// requested by the caller.
let lazy_membership_events: Vec<_> = lazily_loaded_members
.iter()
.stream()
.broad_filter_map(|user_id| async move {
if user_id == sender_user {
return None;
}
services
.rooms
.state_accessor
.state_get(
timeline_start_shortstatehash,
&StateEventType::RoomMember,
user_id.as_str(),
)
.ok()
.await
})
.collect()
.await;
if !lazy_membership_events.is_empty() {
trace!(
"syncing lazy membership events for members: {:?}",
lazy_membership_events
.iter()
.map(|pdu| pdu.state_key().unwrap())
.collect::<Vec<_>>()
);
}
return Ok(lazy_membership_events);
}
}
// lazy loading is disabled, `state` is empty.
return Ok(vec![]);
}
/*
at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates
computing the incremental state (which may be empty).
NOTE: this code path does not use the `lazy_membership_events` parameter. any changes to membership will be included
in the incremental state. therefore, the incremental state may include "redundant" membership events,
which we do not filter out because A. the spec forbids lazy-load filtering if the timeline is `limited`,
and B. DAG splits which require sending extra membership state events are (probably) uncommon enough that
the performance penalty is acceptable.
*/
trace!(?timeline_is_linear, ?timeline.limited, "computing state for incremental sync");
// fetch the shorteventids of state events in the timeline
let state_events_in_timeline: BTreeSet<ShortEventId> = services
.rooms
.short
.multi_get_or_create_shorteventid(timeline.pdus.iter().filter_map(|(_, pdu)| {
if pdu.state_key().is_some() {
Some(pdu.event_id.as_ref())
} else {
None
}
}))
.collect()
.await;
trace!("{} state events in timeline", state_events_in_timeline.len());
/*
fetch the state events which were added since the last sync.
specifically we fetch the difference between the state at the last sync and the state at the _end_
of the timeline, and then we filter out state events in the timeline itself using the shorteventids we fetched.
this is necessary to account for splits in the DAG, as explained above.
*/
let state_diff = services
.rooms
.short
.multi_get_eventid_from_short::<'_, OwnedEventId, _>(
services
.rooms
.state_accessor
.state_added((last_sync_end_shortstatehash, timeline_end_shortstatehash))
.await?
.stream()
.ready_filter_map(|(_, shorteventid)| {
if state_events_in_timeline.contains(&shorteventid) {
None
} else {
Some(shorteventid)
}
}),
)
.ignore_err();
// finally, fetch the PDU contents and collect them into a vec
let state_diff_pdus = state_diff
.broad_filter_map(|event_id| async move {
services
.rooms
.timeline
.get_non_outlier_pdu(&event_id)
.await
.ok()
})
.collect::<Vec<_>>()
.await;
trace!(?state_diff_pdus, "collected state PDUs for incremental sync");
Ok(state_diff_pdus)
}

View File

@@ -1,848 +0,0 @@
use std::{
cmp::{self, Ordering},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
time::Duration,
};
use axum::extract::State;
use conduwuit::{
Err, Error, Event, PduCount, Result, at, debug, error, extract_variant,
matrix::TypeStateKey,
utils::{
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
math::{ruma_from_usize, usize_from_ruma, usize_from_u64_truncated},
stream::WidebandExt,
},
warn,
};
use conduwuit_service::{
Services,
rooms::read_receipt::pack_receipts,
sync::{into_db_key, into_snake_key},
};
use futures::{FutureExt, StreamExt, TryFutureExt};
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId,
api::client::sync::sync_events::{
self, DeviceLists, UnreadNotificationsCount,
v4::{SlidingOp, SlidingSyncRoomHero},
},
directory::RoomTypeFilter,
events::{
AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType,
TimelineEventType::*,
room::member::{MembershipState, RoomMemberEventContent},
},
serde::Raw,
uint,
};
use super::{load_timeline, share_encrypted_room};
use crate::{
Ruma,
client::{DEFAULT_BUMP_TYPES, ignored_filter, is_ignored_invite},
};
type TodoRooms = BTreeMap<OwnedRoomId, (BTreeSet<TypeStateKey>, usize, u64)>;
const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync";
#[allow(clippy::cognitive_complexity)]
/// POST `/_matrix/client/unstable/org.matrix.msc3575/sync`
///
/// Sliding Sync endpoint (future endpoint: `/_matrix/client/v4/sync`)
pub(crate) async fn sync_events_v4_route(
State(services): State<crate::State>,
body: Ruma<sync_events::v4::Request>,
) -> Result<sync_events::v4::Response> {
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
let mut body = body.body;
// Setup watchers, so if there's no response, we can wait for them
let watcher = services.sync.watch(sender_user, sender_device);
let next_batch = services.globals.next_count()?;
let conn_id = body
.conn_id
.clone()
.unwrap_or_else(|| SINGLE_CONNECTION_SYNC.to_owned());
let globalsince = body
.pos
.as_ref()
.and_then(|string| string.parse().ok())
.unwrap_or(0);
let db_key = into_db_key(sender_user, sender_device, conn_id.clone());
if globalsince != 0 && !services.sync.remembered(&db_key) {
debug!("Restarting sync stream because it was gone from the database");
return Err!(Request(UnknownPos("Connection data lost since last time")));
}
if globalsince == 0 {
services.sync.forget_sync_request_connection(&db_key);
}
// Get sticky parameters from cache
let snake_key = into_snake_key(sender_user, sender_device, conn_id.clone());
let known_rooms = services
.sync
.update_sync_request_with_cache(&snake_key, &mut body);
let all_joined_rooms: Vec<_> = services
.rooms
.state_cache
.rooms_joined(sender_user)
.map(ToOwned::to_owned)
.collect()
.await;
let all_invited_rooms: Vec<_> = services
.rooms
.state_cache
.rooms_invited(sender_user)
.wide_filter_map(async |(room_id, invite_state)| {
if is_ignored_invite(&services, sender_user, &room_id).await {
None
} else {
Some((room_id, invite_state))
}
})
.map(|r| r.0)
.collect()
.await;
let all_knocked_rooms: Vec<_> = services
.rooms
.state_cache
.rooms_knocked(sender_user)
.map(|r| r.0)
.collect()
.await;
let all_invited_rooms: Vec<&RoomId> = all_invited_rooms.iter().map(AsRef::as_ref).collect();
let all_knocked_rooms: Vec<&RoomId> = all_knocked_rooms.iter().map(AsRef::as_ref).collect();
let all_rooms: Vec<&RoomId> = all_joined_rooms
.iter()
.map(AsRef::as_ref)
.chain(all_invited_rooms.iter().map(AsRef::as_ref))
.chain(all_knocked_rooms.iter().map(AsRef::as_ref))
.collect();
let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref).collect();
let all_invited_rooms = all_invited_rooms.iter().map(AsRef::as_ref).collect();
if body.extensions.to_device.enabled.unwrap_or(false) {
services
.users
.remove_to_device_events(sender_user, sender_device, globalsince)
.await;
}
let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
let mut device_list_changes = HashSet::new();
let mut device_list_left = HashSet::new();
let mut receipts = sync_events::v4::Receipts { rooms: BTreeMap::new() };
let mut account_data = sync_events::v4::AccountData {
global: Vec::new(),
rooms: BTreeMap::new(),
};
if body.extensions.account_data.enabled.unwrap_or(false) {
account_data.global = services
.account_data
.changes_since(None, sender_user, globalsince, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect()
.await;
if let Some(rooms) = body.extensions.account_data.rooms {
for room in rooms {
account_data.rooms.insert(
room.clone(),
services
.account_data
.changes_since(Some(&room), sender_user, globalsince, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await,
);
}
}
}
if body.extensions.e2ee.enabled.unwrap_or(false) {
// Look for device list updates of this account
device_list_changes.extend(
services
.users
.keys_changed(sender_user, globalsince, None)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await,
);
for room_id in &all_joined_rooms {
let room_id: &&RoomId = room_id;
let Ok(current_shortstatehash) =
services.rooms.state.get_room_shortstatehash(room_id).await
else {
error!("Room {room_id} has no state");
continue;
};
let since_shortstatehash = services
.rooms
.user
.get_token_shortstatehash(room_id, globalsince)
.await
.ok();
let encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.await
.is_ok();
if let Some(since_shortstatehash) = since_shortstatehash {
// Skip if there are only timeline changes
if since_shortstatehash == current_shortstatehash {
continue;
}
let since_encryption = services
.rooms
.state_accessor
.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")
.await;
let since_sender_member: Option<RoomMemberEventContent> = services
.rooms
.state_accessor
.state_get_content(
since_shortstatehash,
&StateEventType::RoomMember,
sender_user.as_str(),
)
.ok()
.await;
let joined_since_last_sync = since_sender_member
.as_ref()
.is_none_or(|member| member.membership != MembershipState::Join);
let new_encrypted_room = encrypted_room && since_encryption.is_err();
if encrypted_room {
let current_state_ids: HashMap<_, OwnedEventId> = services
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.collect()
.await;
let since_state_ids: HashMap<_, _> = services
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
.collect()
.await;
for (key, id) in current_state_ids {
if since_state_ids.get(&key) != Some(&id) {
let Ok(pdu) = services.rooms.timeline.get_pdu(&id).await else {
error!("Pdu in state not found: {id}");
continue;
};
if pdu.kind == RoomMember {
if let Some(Ok(user_id)) =
pdu.state_key.as_deref().map(UserId::parse)
{
if user_id == sender_user {
continue;
}
let content: RoomMemberEventContent = pdu.get_content()?;
match content.membership {
| MembershipState::Join => {
// A new user joined an encrypted room
if !share_encrypted_room(
&services,
sender_user,
user_id,
Some(room_id),
)
.await
{
device_list_changes.insert(user_id.to_owned());
}
},
| MembershipState::Leave => {
// Write down users that have left encrypted rooms we
// are in
left_encrypted_users.insert(user_id.to_owned());
},
| _ => {},
}
}
}
}
}
if joined_since_last_sync || new_encrypted_room {
// If the user is in a new encrypted room, give them all joined users
device_list_changes.extend(
services
.rooms
.state_cache
.room_members(room_id)
// Don't send key updates from the sender to the sender
.ready_filter(|&user_id| sender_user != user_id)
// Only send keys if the sender doesn't share an encrypted room with the target
// already
.filter_map(|user_id| {
share_encrypted_room(&services, sender_user, user_id, Some(room_id))
.map(|res| res.or_some(user_id.to_owned()))
})
.collect::<Vec<_>>()
.await,
);
}
}
}
// Look for device list updates in this room
device_list_changes.extend(
services
.users
.room_keys_changed(room_id, globalsince, None)
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await,
);
}
for user_id in left_encrypted_users {
let dont_share_encrypted_room =
!share_encrypted_room(&services, sender_user, &user_id, None).await;
// If the user doesn't share an encrypted room with the target anymore, we need
// to tell them
if dont_share_encrypted_room {
device_list_left.insert(user_id);
}
}
}
let mut lists = BTreeMap::new();
let mut todo_rooms: TodoRooms = BTreeMap::new(); // and required state
for (list_id, list) in &body.lists {
let active_rooms = match list.filters.clone().and_then(|f| f.is_invite) {
| Some(true) => &all_invited_rooms,
| Some(false) => &all_joined_rooms,
| None => &all_rooms,
};
let active_rooms = match list.filters.clone().map(|f| f.not_room_types) {
| Some(filter) if filter.is_empty() => active_rooms.clone(),
| Some(value) => filter_rooms(&services, active_rooms, &value, true).await,
| None => active_rooms.clone(),
};
let active_rooms = match list.filters.clone().map(|f| f.room_types) {
| Some(filter) if filter.is_empty() => active_rooms.clone(),
| Some(value) => filter_rooms(&services, &active_rooms, &value, false).await,
| None => active_rooms,
};
let mut new_known_rooms: BTreeSet<OwnedRoomId> = BTreeSet::new();
let ranges = list.ranges.clone();
lists.insert(list_id.clone(), sync_events::v4::SyncList {
ops: ranges
.into_iter()
.map(|mut r| {
r.0 = r.0.clamp(
uint!(0),
UInt::try_from(active_rooms.len().saturating_sub(1)).unwrap_or(UInt::MAX),
);
r.1 = r.1.clamp(
r.0,
UInt::try_from(active_rooms.len().saturating_sub(1)).unwrap_or(UInt::MAX),
);
let room_ids = if !active_rooms.is_empty() {
active_rooms[usize_from_ruma(r.0)..=usize_from_ruma(r.1)].to_vec()
} else {
Vec::new()
};
new_known_rooms.extend(room_ids.clone().into_iter().map(ToOwned::to_owned));
for room_id in &room_ids {
let todo_room = todo_rooms.entry((*room_id).to_owned()).or_insert((
BTreeSet::new(),
0_usize,
u64::MAX,
));
let limit: usize = list
.room_details
.timeline_limit
.map(u64::from)
.map_or(10, usize_from_u64_truncated)
.min(100);
todo_room.0.extend(
list.room_details
.required_state
.iter()
.map(|(ty, sk)| (ty.clone(), sk.as_str().into())),
);
todo_room.1 = todo_room.1.max(limit);
// 0 means unknown because it got out of date
todo_room.2 = todo_room.2.min(
known_rooms
.get(list_id.as_str())
.and_then(|k| k.get(*room_id))
.copied()
.unwrap_or(0),
);
}
sync_events::v4::SyncOp {
op: SlidingOp::Sync,
range: Some(r),
index: None,
room_ids: room_ids.into_iter().map(ToOwned::to_owned).collect(),
room_id: None,
}
})
.collect(),
count: ruma_from_usize(active_rooms.len()),
});
if let Some(conn_id) = &body.conn_id {
let db_key = into_db_key(sender_user, sender_device, conn_id);
services.sync.update_sync_known_rooms(
&db_key,
list_id.clone(),
new_known_rooms,
globalsince,
);
}
}
let mut known_subscription_rooms = BTreeSet::new();
for (room_id, room) in &body.room_subscriptions {
if !services.rooms.metadata.exists(room_id).await
|| services.rooms.metadata.is_disabled(room_id).await
|| services.rooms.metadata.is_banned(room_id).await
{
continue;
}
let todo_room =
todo_rooms
.entry(room_id.clone())
.or_insert((BTreeSet::new(), 0_usize, u64::MAX));
let limit: usize = room
.timeline_limit
.map(u64::from)
.map_or(10, usize_from_u64_truncated)
.min(100);
todo_room.0.extend(
room.required_state
.iter()
.map(|(ty, sk)| (ty.clone(), sk.as_str().into())),
);
todo_room.1 = todo_room.1.max(limit);
// 0 means unknown because it got out of date
todo_room.2 = todo_room.2.min(
known_rooms
.get("subscriptions")
.and_then(|k| k.get(room_id))
.copied()
.unwrap_or(0),
);
known_subscription_rooms.insert(room_id.clone());
}
for r in body.unsubscribe_rooms {
known_subscription_rooms.remove(&r);
body.room_subscriptions.remove(&r);
}
if let Some(conn_id) = &body.conn_id {
let db_key = into_db_key(sender_user, sender_device, conn_id);
services.sync.update_sync_known_rooms(
&db_key,
"subscriptions".to_owned(),
known_subscription_rooms,
globalsince,
);
}
if let Some(conn_id) = body.conn_id.clone() {
let db_key = into_db_key(sender_user, sender_device, conn_id);
services
.sync
.update_sync_subscriptions(&db_key, body.room_subscriptions);
}
let mut rooms = BTreeMap::new();
for (room_id, (required_state_request, timeline_limit, roomsince)) in &todo_rooms {
let roomsincecount = PduCount::Normal(*roomsince);
let mut timestamp: Option<_> = None;
let mut invite_state = None;
let (timeline_pdus, limited);
let new_room_id: &RoomId = (*room_id).as_ref();
if all_invited_rooms.contains(&new_room_id) {
// TODO: figure out a timestamp we can use for remote invites
invite_state = services
.rooms
.state_cache
.invite_state(sender_user, room_id)
.await
.ok();
(timeline_pdus, limited) = (Vec::new(), true);
} else {
(timeline_pdus, limited) = match load_timeline(
&services,
sender_user,
room_id,
roomsincecount,
None,
*timeline_limit,
)
.await
{
| Ok(value) => value,
| Err(err) => {
warn!("Encountered missing timeline in {}, error {}", room_id, err);
continue;
},
};
}
account_data.rooms.insert(
room_id.to_owned(),
services
.account_data
.changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await,
);
let last_privateread_update = services
.rooms
.read_receipt
.last_privateread_update(sender_user, room_id)
.await > *roomsince;
let private_read_event = if last_privateread_update {
services
.rooms
.read_receipt
.private_read_get(room_id, sender_user)
.await
.ok()
} else {
None
};
let mut vector: Vec<Raw<AnySyncEphemeralRoomEvent>> = services
.rooms
.read_receipt
.readreceipts_since(room_id, *roomsince)
.filter_map(|(read_user, _ts, v)| async move {
services
.users
.user_is_ignored(read_user, sender_user)
.await
.or_some(v)
})
.collect()
.await;
if let Some(private_read_event) = private_read_event {
vector.push(private_read_event);
}
let receipt_size = vector.len();
receipts
.rooms
.insert(room_id.clone(), pack_receipts(Box::new(vector.into_iter())));
if roomsince != &0
&& timeline_pdus.is_empty()
&& account_data.rooms.get(room_id).is_some_and(Vec::is_empty)
&& receipt_size == 0
{
continue;
}
let prev_batch = timeline_pdus
.first()
.map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
Ok(Some(match pdu_count {
| PduCount::Backfilled(_) => {
error!("timeline in backfill state?!");
"0".to_owned()
},
| PduCount::Normal(c) => c.to_string(),
}))
})?
.or_else(|| {
if roomsince != &0 {
Some(roomsince.to_string())
} else {
None
}
});
let room_events: Vec<_> = timeline_pdus
.iter()
.stream()
.filter_map(|item| ignored_filter(&services, item.clone(), sender_user))
.map(at!(1))
.map(Event::into_format)
.collect()
.await;
for (_, pdu) in timeline_pdus {
let ts = MilliSecondsSinceUnixEpoch(pdu.origin_server_ts);
if DEFAULT_BUMP_TYPES.binary_search(&pdu.kind).is_ok()
&& timestamp.is_none_or(|time| time <= ts)
{
timestamp = Some(ts);
}
}
let required_state = required_state_request
.iter()
.stream()
.filter_map(|state| async move {
services
.rooms
.state_accessor
.room_state_get(room_id, &state.0, &state.1)
.await
.map(Event::into_format)
.ok()
})
.collect()
.await;
// Heroes
let heroes: Vec<_> = services
.rooms
.state_cache
.room_members(room_id)
.ready_filter(|&member| member != sender_user)
.filter_map(|user_id| {
services
.rooms
.state_accessor
.get_member(room_id, user_id)
.map_ok(|memberevent| SlidingSyncRoomHero {
user_id: user_id.into(),
name: memberevent.displayname,
avatar: memberevent.avatar_url,
})
.ok()
})
.take(5)
.collect()
.await;
let name = match heroes.len().cmp(&(1_usize)) {
| Ordering::Greater => {
let firsts = heroes[1..]
.iter()
.map(|h| h.name.clone().unwrap_or_else(|| h.user_id.to_string()))
.collect::<Vec<_>>()
.join(", ");
let last = heroes[0]
.name
.clone()
.unwrap_or_else(|| heroes[0].user_id.to_string());
Some(format!("{firsts} and {last}"))
},
| Ordering::Equal => Some(
heroes[0]
.name
.clone()
.unwrap_or_else(|| heroes[0].user_id.to_string()),
),
| Ordering::Less => None,
};
let heroes_avatar = if heroes.len() == 1 {
heroes[0].avatar.clone()
} else {
None
};
rooms.insert(room_id.clone(), sync_events::v4::SlidingSyncRoom {
name: services
.rooms
.state_accessor
.get_name(room_id)
.await
.ok()
.or(name),
avatar: match heroes_avatar {
| Some(heroes_avatar) => ruma::JsOption::Some(heroes_avatar),
| _ => match services.rooms.state_accessor.get_avatar(room_id).await {
| ruma::JsOption::Some(avatar) => ruma::JsOption::from_option(avatar.url),
| ruma::JsOption::Null => ruma::JsOption::Null,
| ruma::JsOption::Undefined => ruma::JsOption::Undefined,
},
},
initial: Some(roomsince == &0),
is_dm: None,
invite_state,
unread_notifications: UnreadNotificationsCount {
highlight_count: Some(
services
.rooms
.user
.highlight_count(sender_user, room_id)
.await
.try_into()
.expect("notification count can't go that high"),
),
notification_count: Some(
services
.rooms
.user
.notification_count(sender_user, room_id)
.await
.try_into()
.expect("notification count can't go that high"),
),
},
timeline: room_events,
required_state,
prev_batch,
limited,
joined_count: Some(
services
.rooms
.state_cache
.room_joined_count(room_id)
.await
.unwrap_or(0)
.try_into()
.unwrap_or_else(|_| uint!(0)),
),
invited_count: Some(
services
.rooms
.state_cache
.room_invited_count(room_id)
.await
.unwrap_or(0)
.try_into()
.unwrap_or_else(|_| uint!(0)),
),
num_live: None, // Count events in timeline greater than global sync counter
timestamp,
heroes: Some(heroes),
});
}
if rooms.iter().all(|(id, r)| {
r.timeline.is_empty() && r.required_state.is_empty() && !receipts.rooms.contains_key(id)
}) {
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let default = Duration::from_secs(30);
let duration = cmp::min(body.timeout.unwrap_or(default), default);
_ = tokio::time::timeout(duration, watcher).await;
}
Ok(sync_events::v4::Response {
initial: globalsince == 0,
txn_id: body.txn_id.clone(),
pos: next_batch.to_string(),
lists,
rooms,
extensions: sync_events::v4::Extensions {
to_device: if body.extensions.to_device.enabled.unwrap_or(false) {
Some(sync_events::v4::ToDevice {
events: services
.users
.get_to_device_events(
sender_user,
sender_device,
Some(globalsince),
Some(next_batch),
)
.collect()
.await,
next_batch: next_batch.to_string(),
})
} else {
None
},
e2ee: sync_events::v4::E2EE {
device_lists: DeviceLists {
changed: device_list_changes.into_iter().collect(),
left: device_list_left.into_iter().collect(),
},
device_one_time_keys_count: services
.users
.count_one_time_keys(sender_user, sender_device)
.await,
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
},
account_data,
receipts,
typing: sync_events::v4::Typing { rooms: BTreeMap::new() },
},
delta_token: None,
})
}
async fn filter_rooms<'a>(
services: &Services,
rooms: &[&'a RoomId],
filter: &[RoomTypeFilter],
negate: bool,
) -> Vec<&'a RoomId> {
rooms
.iter()
.stream()
.filter_map(|r| async move {
let room_type = services.rooms.state_accessor.get_room_type(r).await;
if room_type.as_ref().is_err_and(|e| !e.is_not_found()) {
return None;
}
let room_type_filter = RoomTypeFilter::from(room_type.ok());
let include = if negate {
!filter.contains(&room_type_filter)
} else {
filter.is_empty() || filter.contains(&room_type_filter)
};
include.then_some(r)
})
.collect()
.await
}

View File

@@ -1,11 +1,12 @@
use std::{
cmp::{self, Ordering},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
ops::Deref,
time::Duration,
};
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Error, Result, at, error, extract_variant, is_equal_to,
matrix::{Event, TypeStateKey, pdu::PduCount},
@@ -31,6 +32,7 @@
events::{
AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, TimelineEventType,
room::member::{MembershipState, RoomMemberEventContent},
typing::TypingEventContent,
},
serde::Raw,
uint,
@@ -39,7 +41,9 @@
use super::share_encrypted_room;
use crate::{
Ruma,
client::{DEFAULT_BUMP_TYPES, ignored_filter, is_ignored_invite, sync::load_timeline},
client::{
DEFAULT_BUMP_TYPES, TimelinePdus, ignored_filter, is_ignored_invite, sync::load_timeline,
},
};
type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a sync_events::v5::Request);
@@ -58,11 +62,18 @@
/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186
pub(crate) async fn sync_events_v5_route(
State(ref services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<sync_events::v5::Request>,
) -> Result<sync_events::v5::Response> {
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
services
.users
.update_device_last_seen(sender_user, Some(sender_device), client_ip)
.await;
let mut body = body.body;
// Setup watchers, so if there's no response, we can wait for them
@@ -210,6 +221,9 @@ pub(crate) async fn sync_events_v5_route(
_ = tokio::time::timeout(duration, watcher).await;
}
let typing = collect_typing_events(services, sender_user, &body, &todo_rooms).await?;
response.extensions.typing = typing;
trace!(
rooms = ?response.rooms.len(),
account_data = ?response.extensions.account_data.rooms.len(),
@@ -293,6 +307,8 @@ async fn handle_lists<'a, Rooms, AllRooms>(
Rooms: Iterator<Item = &'a RoomId> + Clone + Send + 'a,
AllRooms: Iterator<Item = &'a RoomId> + Clone + Send + 'a,
{
// TODO MSC4186: Implement remaining list filters: is_dm, is_encrypted,
// room_types.
for (list_id, list) in &body.lists {
let active_rooms: Vec<_> = match list.filters.as_ref().and_then(|f| f.is_invite) {
| None => all_rooms.clone().collect(),
@@ -409,13 +425,13 @@ async fn process_rooms<'a, Rooms>(
.await
.ok();
(timeline_pdus, limited) = (Vec::new(), true);
(timeline_pdus, limited) = (VecDeque::new(), true);
} else {
(timeline_pdus, limited) = match load_timeline(
TimelinePdus { pdus: timeline_pdus, limited } = match load_timeline(
services,
sender_user,
room_id,
roomsincecount,
Some(roomsincecount),
Some(PduCount::from(next_batch)),
*timeline_limit,
)
@@ -434,7 +450,7 @@ async fn process_rooms<'a, Rooms>(
room_id.to_owned(),
services
.account_data
.changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch))
.changes_since(Some(room_id), sender_user, Some(*roomsince), Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await,
@@ -460,11 +476,11 @@ async fn process_rooms<'a, Rooms>(
let mut receipts: Vec<Raw<AnySyncEphemeralRoomEvent>> = services
.rooms
.read_receipt
.readreceipts_since(room_id, *roomsince)
.readreceipts_since(room_id, Some(*roomsince))
.filter_map(|(read_user, _ts, v)| async move {
services
.users
.user_is_ignored(read_user, sender_user)
.user_is_ignored(&read_user, sender_user)
.await
.or_some(v)
})
@@ -499,7 +515,7 @@ async fn process_rooms<'a, Rooms>(
}
let prev_batch = timeline_pdus
.first()
.front()
.map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
Ok(Some(match pdu_count {
| PduCount::Backfilled(_) => {
@@ -672,6 +688,62 @@ async fn process_rooms<'a, Rooms>(
}
Ok(rooms)
}
async fn collect_typing_events(
services: &Services,
sender_user: &UserId,
body: &sync_events::v5::Request,
todo_rooms: &TodoRooms,
) -> Result<sync_events::v5::response::Typing> {
if !body.extensions.typing.enabled.unwrap_or(false) {
return Ok(sync_events::v5::response::Typing::default());
}
let rooms: Vec<_> = body.extensions.typing.rooms.clone().unwrap_or_else(|| {
body.room_subscriptions
.keys()
.map(ToOwned::to_owned)
.collect()
});
let lists: Vec<_> = body
.extensions
.typing
.lists
.clone()
.unwrap_or_else(|| body.lists.keys().map(ToOwned::to_owned).collect::<Vec<_>>());
if rooms.is_empty() && lists.is_empty() {
return Ok(sync_events::v5::response::Typing::default());
}
let mut typing_response = sync_events::v5::response::Typing::default();
for (room_id, (_, _, roomsince)) in todo_rooms {
if services.rooms.typing.last_typing_update(room_id).await? <= *roomsince {
continue;
}
match services
.rooms
.typing
.typing_users_for_user(room_id, sender_user)
.await
{
| Ok(typing_users) => {
typing_response.rooms.insert(
room_id.to_owned(), // Already OwnedRoomId
Raw::new(&sync_events::v5::response::SyncTypingEvent {
content: TypingEventContent::new(typing_users),
})?,
);
},
| Err(e) => {
warn!(%room_id, "Failed to get typing events for room: {}", e);
},
}
}
Ok(typing_response)
}
async fn collect_account_data(
services: &Services,
(sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request),
@@ -687,7 +759,7 @@ async fn collect_account_data(
account_data.global = services
.account_data
.changes_since(None, sender_user, globalsince, None)
.changes_since(None, sender_user, Some(globalsince), None)
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect()
.await;
@@ -698,7 +770,7 @@ async fn collect_account_data(
room.clone(),
services
.account_data
.changes_since(Some(room), sender_user, globalsince, None)
.changes_since(Some(room), sender_user, Some(globalsince), None)
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect()
.await,
@@ -732,7 +804,7 @@ async fn collect_e2ee<'a, Rooms>(
device_list_changes.extend(
services
.users
.keys_changed(sender_user, globalsince, None)
.keys_changed(sender_user, Some(globalsince), None)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
.await,
@@ -868,7 +940,7 @@ async fn collect_e2ee<'a, Rooms>(
device_list_changes.extend(
services
.users
.room_keys_changed(room_id, globalsince, None)
.room_keys_changed(room_id, Some(globalsince), None)
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>()

View File

@@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
Result, at,
Result, at, debug_warn,
matrix::{
Event,
pdu::{PduCount, PduEvent},
@@ -45,6 +45,17 @@ pub(crate) async fn get_threads_route(
.await
.then_some((count, pdu))
})
.then(|(count, mut pdu)| async move {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut pdu)
.await
{
debug_warn!("Failed to add bundled aggregations to thread: {e}");
}
(count, pdu)
})
.collect()
.await;

View File

@@ -1,4 +1,5 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, utils, utils::math::Tried};
use ruma::api::client::typing::create_typing_event;
@@ -9,10 +10,15 @@
/// Sets the typing state of the sender user.
pub(crate) async fn create_typing_event_route(
State(services): State<crate::State>,
InsecureClientIp(ip): InsecureClientIp,
body: Ruma<create_typing_event::v3::Request>,
) -> Result<create_typing_event::v3::Response> {
use create_typing_event::v3::Typing;
let sender_user = body.sender_user();
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), ip)
.await;
if sender_user != body.user_id && body.appservice_info.is_none() {
return Err!(Request(Forbidden("You cannot update typing status of other users.")));

View File

@@ -52,7 +52,6 @@ pub(crate) async fn get_supported_versions_route(
("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */
("org.matrix.msc3827".to_owned(), true), /* filtering of /publicRooms by room type (https://github.com/matrix-org/matrix-spec-proposals/pull/3827) */
("org.matrix.msc3952_intentional_mentions".to_owned(), true), /* intentional mentions (https://github.com/matrix-org/matrix-spec-proposals/pull/3952) */
("org.matrix.msc3575".to_owned(), true), /* sliding sync (https://github.com/matrix-org/matrix-spec-proposals/pull/3575/files#r1588877046) */
("org.matrix.msc3916.stable".to_owned(), true), /* authenticated media (https://github.com/matrix-org/matrix-spec-proposals/pull/3916) */
("org.matrix.msc4180".to_owned(), true), /* stable flag for 3916 (https://github.com/matrix-org/matrix-spec-proposals/pull/4180) */
("uk.tcpip.msc4133".to_owned(), true), /* Extending User Profile API with Key:Value Pairs (https://github.com/matrix-org/matrix-spec-proposals/pull/4133) */

View File

@@ -143,7 +143,6 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
.put(client::send_state_event_for_empty_key_route),
)
.ruma_route(&client::sync_events_route)
.ruma_route(&client::sync_events_v4_route)
.ruma_route(&client::sync_events_v5_route)
.ruma_route(&client::get_context_route)
.ruma_route(&client::get_message_events_route)

View File

@@ -3,6 +3,7 @@
use axum::extract::State;
use conduwuit::{
Event, PduCount, Result,
result::LogErr,
utils::{IterStream, ReadyExt, stream::TryTools},
};
use futures::{FutureExt, StreamExt, TryStreamExt};
@@ -62,7 +63,7 @@ pub(crate) async fn get_backfill_route(
pdus: services
.rooms
.timeline
.pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
.pdus_rev(&body.room_id, Some(from.saturating_add(1)))
.try_take(limit)
.try_filter_map(|(_, pdu)| async move {
Ok(services
@@ -72,6 +73,15 @@ pub(crate) async fn get_backfill_route(
.await
.then_some(pdu))
})
.and_then(async |mut pdu| {
// Strip the transaction ID, as that is private
pdu.remove_transaction_id().log_err().ok();
// Add age, as this is specified
pdu.add_age().log_err().ok();
// It's not clear if we should strip or add any more data, leave as is.
// In particular: Redaction?
Ok(pdu)
})
.try_filter_map(|pdu| async move {
Ok(services
.rooms

View File

@@ -61,6 +61,46 @@ pub(crate) async fn create_invite_route(
let mut signed_event = utils::to_canonical_object(&body.event)
.map_err(|_| err!(Request(InvalidParam("Invite event is invalid."))))?;
// Ensure this is a membership event
if signed_event
.get("type")
.expect("event must have a type")
.as_str()
.expect("type must be a string")
!= "m.room.member"
{
return Err!(Request(BadJson(
"Not allowed to send non-membership event to invite endpoint."
)));
}
let content: RoomMemberEventContent = serde_json::from_value(
signed_event
.get("content")
.ok_or_else(|| err!(Request(BadJson("Event missing content property"))))?
.clone()
.into(),
)
.map_err(|e| err!(Request(BadJson(warn!("Event content is empty or invalid: {e}")))))?;
// Ensure this is an invite membership event
if content.membership != MembershipState::Invite {
return Err!(Request(BadJson(
"Not allowed to send a non-invite membership event to invite endpoint."
)));
}
// Ensure the sending user isn't a lying bozo
let sender_server = signed_event
.get("sender")
.try_into()
.map(UserId::server_name)
.map_err(|e| err!(Request(InvalidParam("Invalid sender property: {e}"))))?;
if sender_server != body.origin() {
return Err!(Request(Forbidden("Sender's server does not match the origin server.",)));
}
// Ensure the target user belongs to this server
let recipient_user: OwnedUserId = signed_event
.get("state_key")
.try_into()
@@ -133,17 +173,21 @@ pub(crate) async fn create_invite_route(
services
.rooms
.state_cache
.update_membership(
&body.room_id,
.mark_as_invited(
&recipient_user,
RoomMemberEventContent::new(MembershipState::Invite),
&body.room_id,
sender_user,
Some(invite_state),
body.via.clone(),
true,
)
.await?;
services
.rooms
.state_cache
.update_joined_count(&body.room_id)
.await;
for appservice in services.appservice.read().await.values() {
if appservice.is_user_match(&recipient_user) {
services

View File

@@ -6,7 +6,7 @@
use std::{
collections::{BTreeMap, BTreeSet},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
path::{Path, PathBuf},
path::PathBuf,
};
use conduwuit_macros::config_example_generator;
@@ -53,9 +53,13 @@
### For more information, see:
### https://continuwuity.org/configuration.html
"#,
ignore = "catchall well_known tls blurhashing allow_invalid_tls_certificates_yes_i_know_what_the_fuck_i_am_doing_with_this_and_i_know_this_is_insecure"
ignore = "config_paths catchall well_known tls blurhashing allow_invalid_tls_certificates_yes_i_know_what_the_fuck_i_am_doing_with_this_and_i_know_this_is_insecure"
)]
pub struct Config {
// Paths to config file(s). Not supposed to be set manually in the config file,
// only updated dynamically from the --config option given at runtime.
pub config_paths: Option<Vec<PathBuf>>,
/// The server_name is the pretty name of this server. It is used as a
/// suffix for user and room IDs/aliases.
///
@@ -703,10 +707,13 @@ pub struct Config {
pub allow_unstable_room_versions: bool,
/// Default room version continuwuity will create rooms with.
/// Note that this has to be a string since the room version is a string
/// rather than an integer. Forgetting the quotes will make the server fail
/// to start!
///
/// Per spec, room version 11 is the default.
/// Per spec, room version "11" is the default.
///
/// default: 11
/// default: "11"
#[serde(default = "default_default_room_version")]
pub default_room_version: RoomVersionId,
@@ -2223,26 +2230,24 @@ struct ListeningAddr {
impl Config {
/// Pre-initialize config
pub fn load<'a, I>(paths: I) -> Result<Figment>
where
I: Iterator<Item = &'a Path>,
{
pub fn load(paths: &[PathBuf]) -> Result<Figment> {
let envs = [
Env::var("CONDUIT_CONFIG"),
Env::var("CONDUWUIT_CONFIG"),
Env::var("CONTINUWUITY_CONFIG"),
];
let config = envs
let mut config = envs
.into_iter()
.flatten()
.map(Toml::file)
.chain(paths.map(Toml::file))
.chain(paths.iter().cloned().map(Toml::file))
.fold(Figment::new(), |config, file| config.merge(file.nested()))
.merge(Env::prefixed("CONDUIT_").global().split("__"))
.merge(Env::prefixed("CONDUWUIT_").global().split("__"))
.merge(Env::prefixed("CONTINUWUITY_").global().split("__"));
config = config.join(("config_paths", paths));
Ok(config)
}

View File

@@ -14,11 +14,12 @@
RoomVersionId::V9,
RoomVersionId::V10,
RoomVersionId::V11,
RoomVersionId::V12,
];
/// Experimental, partially supported room versions
pub const UNSTABLE_ROOM_VERSIONS: &[RoomVersionId] =
&[RoomVersionId::V3, RoomVersionId::V4, RoomVersionId::V5, RoomVersionId::V12];
&[RoomVersionId::V3, RoomVersionId::V4, RoomVersionId::V5];
type RoomVersion = (RoomVersionId, RoomVersionStability);

View File

@@ -56,7 +56,7 @@ fn from(event: Ref<'a, E>) -> Self {
"content": content,
"event_id": event.event_id(),
"origin_server_ts": event.origin_server_ts(),
"room_id": event.room_id(),
"room_id": event.room_id_or_hash(),
"sender": event.sender(),
"type": event.kind(),
});
@@ -117,7 +117,7 @@ fn from(event: Ref<'a, E>) -> Self {
"content": event.content(),
"event_id": event.event_id(),
"origin_server_ts": event.origin_server_ts(),
"room_id": event.room_id(),
"room_id": event.room_id_or_hash(),
"sender": event.sender(),
"state_key": event.state_key(),
"type": event.kind(),

View File

@@ -1,10 +1,23 @@
use std::collections::BTreeMap;
use std::{borrow::Borrow, collections::BTreeMap};
use ruma::MilliSecondsSinceUnixEpoch;
use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value};
use super::Pdu;
use crate::{Result, err, implement};
use crate::{Result, err, implement, result::LogErr};
/// Set the `unsigned` field of the PDU using only information in the PDU.
/// Some unsigned data is already set within the database (eg. prev events,
/// threads). Once this is done, other data must be calculated from the database
/// (eg. relations) This is for server-to-client events.
/// Backfill handles this itself.
#[implement(Pdu)]
pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) {
if Some(self.sender.borrow()) != user_id {
self.remove_transaction_id().log_err().ok();
}
self.add_age().log_err().ok();
}
#[implement(Pdu)]
pub fn remove_transaction_id(&mut self) -> Result {

View File

@@ -177,11 +177,6 @@ pub async fn auth_check<E, F, Fut>(
// [synapse] do_sig_check check the event has valid signatures for member events
// TODO do_size_check is false when called by `iterative_auth_check`
// do_size_check is also mostly accomplished by ruma with the exception of
// checking event_type, state_key, and json are below a certain size (255 and
// 65_536 respectively)
let sender = incoming_event.sender();
// Implementation of https://spec.matrix.org/latest/rooms/v1/#authorization-rules
@@ -908,7 +903,7 @@ struct GetThirdPartyInvite {
false
}
},
| JoinRule::Restricted(_) =>
| JoinRule::Restricted(_) => {
if membership_allows_join || user_for_join_auth_is_valid {
trace!(
%sender,
@@ -928,7 +923,8 @@ struct GetThirdPartyInvite {
valid authorising user given to permit the join"
);
false
},
}
},
| JoinRule::Public => {
trace!(%sender, "join rule is public, allowing join");
true

View File

@@ -36,7 +36,7 @@ fn map_ok_or<U, F>(
) -> MapOkOrElse<Self, impl FnOnce(Self::Ok) -> U, impl FnOnce(Self::Error) -> U>
where
F: FnOnce(Self::Ok) -> U,
Self: Send + Sized;
Self: Sized;
fn ok(
self,
@@ -100,7 +100,7 @@ fn map_ok_or<U, F>(
) -> MapOkOrElse<Self, impl FnOnce(Self::Ok) -> U, impl FnOnce(Self::Error) -> U>
where
F: FnOnce(Self::Ok) -> U,
Self: Send + Sized,
Self: Sized,
{
self.map_ok_or_else(|_| default, f)
}

View File

@@ -1,4 +1,4 @@
use std::{path::PathBuf, sync::Arc};
use std::sync::Arc;
use conduwuit_core::{
Error, Result,
@@ -38,14 +38,9 @@ pub(crate) fn new(
) -> Result<Arc<Self>, Error> {
let _runtime_guard = runtime.map(runtime::Handle::enter);
let config_paths = args
.config
.as_deref()
.into_iter()
.flat_map(<[_]>::iter)
.map(PathBuf::as_path);
let config_paths = args.config.clone().unwrap_or_default();
let config = Config::load(config_paths)
let config = Config::load(&config_paths)
.and_then(|raw| update(raw, args))
.and_then(|raw| Config::new(&raw))?;

View File

@@ -66,7 +66,7 @@ pub(crate) fn build(services: &Arc<Services>) -> Result<(Router, Guard)> {
.layer(RequestBodyTimeoutLayer::new(Duration::from_secs(
server.config.client_receive_timeout,
)))
.layer(TimeoutLayer::new(Duration::from_secs(server.config.client_request_timeout)))
.layer(TimeoutLayer::with_status_code(StatusCode::REQUEST_TIMEOUT, Duration::from_secs(server.config.client_request_timeout)))
.layer(SetResponseHeaderLayer::if_not_present(
HeaderName::from_static("origin-agent-cluster"), // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Origin-Agent-Cluster
HeaderValue::from_static("?1"),

View File

@@ -129,13 +129,14 @@ pub fn changes_since<'a>(
&'a self,
room_id: Option<&'a RoomId>,
user_id: &'a UserId,
since: u64,
since: Option<u64>,
to: Option<u64>,
) -> impl Stream<Item = AnyRawAccountDataEvent> + Send + 'a {
type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
// Skip the data that's exactly at since, because we sent that last time
let first_possible = (room_id, user_id, since.saturating_add(1));
// ...unless this is an initial sync, in which case send everything
let first_possible = (room_id, user_id, since.map_or(0, |since| since.saturating_add(1)));
self.db
.roomuserdataid_accountdata

View File

@@ -1,4 +1,4 @@
use std::{iter, ops::Deref, path::Path, sync::Arc};
use std::{ops::Deref, path::PathBuf, sync::Arc};
use async_trait::async_trait;
use conduwuit::{
@@ -51,7 +51,8 @@ fn handle_reload(&self) -> Result {
])
.expect("failed to notify systemd of reloading state");
self.reload(iter::empty())?;
let config_paths = self.server.config.config_paths.clone().unwrap_or_default();
self.reload(&config_paths)?;
#[cfg(all(feature = "systemd", target_os = "linux"))]
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])
@@ -62,10 +63,7 @@ fn handle_reload(&self) -> Result {
}
#[implement(Service)]
pub fn reload<'a, I>(&self, paths: I) -> Result<Arc<Config>>
where
I: Iterator<Item = &'a Path>,
{
pub fn reload(&self, paths: &[PathBuf]) -> Result<Arc<Config>> {
let old = self.server.config.clone();
let new = Config::load(paths).and_then(|raw| Config::new(&raw))?;

View File

@@ -31,12 +31,13 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let turn_secret = config.turn_secret_file.as_ref().map_or_else(
|| config.turn_secret.clone(),
|path| {
std::fs::read_to_string(path).unwrap_or_else(|e| {
|path| match std::fs::read_to_string(path) {
| Ok(secret) => secret.trim().to_owned(),
| Err(e) => {
error!("Failed to read the TURN secret file: {e}");
config.turn_secret.clone()
})
},
},
);
@@ -49,7 +50,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
return config.registration_token.clone();
};
Some(token)
Some(token.trim().to_owned())
},
);

View File

@@ -1,7 +1,7 @@
use std::cmp;
use std::{cmp, collections::HashMap};
use conduwuit::{
Err, Result, debug, debug_info, debug_warn, error, info,
Err, Pdu, Result, debug, debug_info, debug_warn, error, info,
result::NotFound,
utils::{
IterStream, ReadyExt,
@@ -13,14 +13,16 @@
use futures::{FutureExt, StreamExt, TryStreamExt};
use itertools::Itertools;
use ruma::{
OwnedUserId, RoomId, UserId,
OwnedRoomId, OwnedUserId, RoomId, UserId,
events::{
GlobalAccountDataEventType, push_rules::PushRulesEvent, room::member::MembershipState,
GlobalAccountDataEventType, StateEventType, push_rules::PushRulesEvent,
room::member::MembershipState,
},
push::Ruleset,
serde::Raw,
};
use crate::{Services, media};
use crate::{Services, media, rooms::short::ShortStateHash};
/// The current schema version.
/// - If database is opened at greater version we reject with error. The
@@ -152,6 +154,14 @@ async fn migrate(services: &Services) -> Result<()> {
info!("Migration: Bumped database version to 18");
}
if db["global"]
.get(POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER)
.await
.is_not_found()
{
populate_userroomid_leftstate_table(services).await?;
}
assert_eq!(
services.globals.db.database_version().await,
DATABASE_VERSION,
@@ -456,7 +466,11 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services)
for user_id in &non_joined_members {
debug_info!("User is left or banned, marking as left");
services.rooms.state_cache.mark_as_left(user_id, room_id);
services
.rooms
.state_cache
.mark_as_left(user_id, room_id, None)
.await;
}
}
@@ -576,6 +590,10 @@ async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result
const FIXED_CORRUPT_MSC4133_FIELDS_MARKER: &[u8] = b"fix_corrupt_msc4133_fields";
async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
// Due to an old bug, some conduwuit databases have `us.cloke.msc4175.tz` user
// profile fields with raw strings instead of quoted JSON ones.
// This migration fixes that.
use serde_json::{Value, from_slice};
type KeyVal<'a> = ((OwnedUserId, String), &'a [u8]);
@@ -592,24 +610,28 @@ async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
async |(mut total, mut fixed),
((user, key), value): KeyVal<'_>|
-> Result<(usize, usize)> {
if let Err(error) = from_slice::<Value>(value) {
// Due to an old bug, some conduwuit databases have `us.cloke.msc4175.tz` user
// profile fields with raw strings instead of quoted JSON ones.
// This migration fixes that.
let new_value = if key == "us.cloke.msc4175.tz" {
Value::String(String::from_utf8(value.to_vec())?)
} else {
return Err!(
"failed to deserialize msc4133 key {} of user {}: {}",
key,
user,
error
match from_slice::<Value>(value) {
// corrupted timezone field
| Err(_) if key == "us.cloke.msc4175.tz" => {
let new_value = Value::String(String::from_utf8(value.to_vec())?);
useridprofilekey_value.put((user, key), Json(new_value));
fixed = fixed.saturating_add(1);
},
// corrupted value for some other key
| Err(error) => {
warn!(
"deleting MSC4133 key {} for user {} due to deserialization \
failure: {}",
key, user, error
);
};
useridprofilekey_value.put((user, key), Json(new_value));
fixed = fixed.saturating_add(1);
useridprofilekey_value.del((user, key));
},
// other key with no issues
| Ok(_) => {
// do nothing
},
}
total = total.saturating_add(1);
Ok((total, fixed))
@@ -624,3 +646,78 @@ async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
db.db.sort()?;
Ok(())
}
const POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER: &str = "populate_userroomid_leftstate_table";
async fn populate_userroomid_leftstate_table(services: &Services) -> Result {
type KeyVal<'a> = (Key<'a>, Raw<Option<Pdu>>);
type Key<'a> = (&'a UserId, &'a RoomId);
let db = &services.db;
let cork = db.cork_and_sync();
let userroomid_leftstate = db["userroomid_leftstate"].clone();
let (total, fixed, _) = userroomid_leftstate
.stream()
.try_fold(
(0_usize, 0_usize, HashMap::<OwnedRoomId, ShortStateHash>::new()),
async |(mut total, mut fixed, mut shortstatehash_cache): (
usize,
usize,
HashMap<_, _>,
),
((user_id, room_id), state): KeyVal<'_>|
-> Result<(usize, usize, HashMap<_, _>)> {
if state.deserialize().is_err() {
let latest_shortstatehash =
if let Some(shortstatehash) = shortstatehash_cache.get(room_id) {
*shortstatehash
} else if let Ok(shortstatehash) =
services.rooms.state.get_room_shortstatehash(room_id).await
{
shortstatehash_cache.insert(room_id.to_owned(), shortstatehash);
shortstatehash
} else {
warn!(?room_id, ?user_id, "room has no shortstatehash");
return Ok((total, fixed, shortstatehash_cache));
};
let leave_state_event = services
.rooms
.state_accessor
.state_get(
latest_shortstatehash,
&StateEventType::RoomMember,
user_id.as_str(),
)
.await;
match leave_state_event {
| Ok(leave_state_event) => {
userroomid_leftstate.put((user_id, room_id), Json(leave_state_event));
fixed = fixed.saturating_add(1);
},
| Err(_) => {
warn!(
?room_id,
?user_id,
"room cached as left has no leave event for user, removing \
cache entry"
);
userroomid_leftstate.del((user_id, room_id));
},
}
}
total = total.saturating_add(1);
Ok((total, fixed, shortstatehash_cache))
},
)
.await?;
drop(cork);
info!(?total, ?fixed, "Fixed entries in `userroomid_leftstate`.");
db["global"].insert(POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER, []);
db.db.sort()?;
Ok(())
}

View File

@@ -14,7 +14,7 @@
use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UserId, events::StateEventType};
use tracing::debug;
use crate::rooms::timeline::RawPduId;
use crate::rooms::timeline::{RawPduId, pdu_fits};
/// When receiving an event one needs to:
/// 0. Check the server is in the room
@@ -62,6 +62,13 @@ pub async fn handle_incoming_pdu<'a>(
if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
return Ok(Some(pdu_id));
}
if !pdu_fits(&mut value.clone()) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from {origin} because it \
exceeds 65535 bytes or is otherwise too large."
);
return Err!(Request(TooLarge("PDU is too large")));
}
// 1.1 Check the server is in the room
let meta_exists = self.services.metadata.exists(room_id).map(Ok);

View File

@@ -1,7 +1,8 @@
use std::collections::{BTreeMap, HashMap, hash_map};
use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res, trace,
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
trace, warn,
};
use futures::future::ready;
use ruma::{
@@ -10,6 +11,7 @@
};
use super::{check_room_id, get_room_version_id, to_room_version};
use crate::rooms::timeline::pdu_fits;
#[implement(super::Service)]
#[allow(clippy::too_many_arguments)]
@@ -25,6 +27,13 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
where
Pdu: Event + Send + Sync,
{
if !pdu_fits(&mut value.clone()) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from {origin} because it \
exceeds 65535 bytes or is otherwise too large."
);
return Err!(Request(TooLarge("PDU is too large")));
}
// 1. Remove unsigned field
value.remove("unsigned");

View File

@@ -3,14 +3,21 @@
//! This module implements a check against a room-specific policy server, as
//! described in the relevant Matrix spec proposal (see: https://github.com/matrix-org/matrix-spec-proposals/pull/4284).
use std::time::Duration;
use std::{collections::BTreeMap, time::Duration};
use conduwuit::{Err, Event, PduEvent, Result, debug, debug_info, implement, trace, warn};
use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, implement, trace,
warn,
};
use ruma::{
CanonicalJsonObject, RoomId, ServerName,
api::federation::room::policy::v1::Request as PolicyRequest,
CanonicalJsonObject, CanonicalJsonValue, KeyId, RoomId, ServerName, SigningKeyId,
api::federation::room::{
policy_check::unstable::Request as PolicyCheckRequest,
policy_sign::unstable::Request as PolicySignRequest,
},
events::{StateEventType, room::policy::RoomPolicyEventContent},
};
use serde_json::value::RawValue;
/// Asks a remote policy server if the event is allowed.
///
@@ -24,25 +31,18 @@
/// contacted for whatever reason, Err(e) is returned, which generally is a
/// fail-open operation.
#[implement(super::Service)]
#[tracing::instrument(skip_all, level = "debug")]
#[tracing::instrument(skip(self, pdu, pdu_json, room_id))]
pub async fn ask_policy_server(
&self,
pdu: &PduEvent,
pdu_json: &CanonicalJsonObject,
pdu_json: &mut CanonicalJsonObject,
room_id: &RoomId,
incoming: bool,
) -> Result<bool> {
if !self.services.server.config.enable_msc4284_policy_servers {
trace!("policy server checking is disabled");
return Ok(true); // don't ever contact policy servers
}
if self.services.server.config.policy_server_check_own_events
&& pdu.origin.is_some()
&& self
.services
.server
.is_ours(pdu.origin.as_ref().unwrap().as_str())
{
return Ok(true); // don't contact policy servers for locally generated events
}
if *pdu.event_type() == StateEventType::RoomPolicy.into() {
debug!(
@@ -52,16 +52,29 @@ pub async fn ask_policy_server(
);
return Ok(true);
}
let Ok(policyserver) = self
.services
.state_accessor
.room_state_get_content(room_id, &StateEventType::RoomPolicy, "")
.await
.inspect_err(|e| debug_error!("failed to load room policy server state event: {e}"))
.map(|c: RoomPolicyEventContent| c)
else {
debug!("room has no policy server configured");
return Ok(true);
};
if self.services.server.config.policy_server_check_own_events
&& !incoming
&& policyserver.public_key.is_none()
{
// don't contact policy servers for locally generated events, but only when the
// policy server does not require signatures
trace!("won't contact policy server for locally generated event");
return Ok(true);
}
let via = match policyserver.via {
| Some(ref via) => ServerName::parse(via)?,
| None => {
@@ -75,7 +88,6 @@ pub async fn ask_policy_server(
}
if !self.services.state_cache.server_in_room(via, room_id).await {
debug!(
room_id = %room_id,
via = %via,
"Policy server is not in the room, skipping spam check"
);
@@ -86,17 +98,43 @@ pub async fn ask_policy_server(
.sending
.convert_to_outgoing_federation_event(pdu_json.clone())
.await;
if policyserver.public_key.is_some() {
if !incoming {
debug_info!(
via = %via,
outgoing = ?pdu_json,
"Getting policy server signature on event"
);
return self
.fetch_policy_server_signature(pdu, pdu_json, via, outgoing, room_id)
.await;
}
// for incoming events, is it signed by <via> with the key
// "ed25519:policy_server"?
if let Some(CanonicalJsonValue::Object(sigs)) = pdu_json.get("signatures") {
if let Some(CanonicalJsonValue::Object(server_sigs)) = sigs.get(via.as_str()) {
let wanted_key_id: &KeyId<ruma::SigningKeyAlgorithm, ruma::Base64PublicKey> =
SigningKeyId::parse("ed25519:policy_server")?;
if let Some(CanonicalJsonValue::String(_sig_value)) =
server_sigs.get(wanted_key_id.as_str())
{
// TODO: verify signature
}
}
}
debug!(
"Event is not local and has no policy server signature, performing legacy spam check"
);
}
debug_info!(
room_id = %room_id,
via = %via,
outgoing = ?pdu_json,
"Checking event for spam with policy server"
"Checking event for spam with policy server via legacy check"
);
let response = tokio::time::timeout(
Duration::from_secs(self.services.server.config.policy_server_request_timeout),
self.services
.sending
.send_federation_request(via, PolicyRequest {
.send_federation_request(via, PolicyCheckRequest {
event_id: pdu.event_id().to_owned(),
pdu: Some(outgoing),
}),
@@ -142,3 +180,120 @@ pub async fn ask_policy_server(
Ok(true)
}
/// Asks a remote policy server for a signature on this event.
/// If the policy server signs this event, the original data is mutated.
#[implement(super::Service)]
#[tracing::instrument(skip_all, fields(event_id=%pdu.event_id(), via=%via))]
pub async fn fetch_policy_server_signature(
&self,
pdu: &PduEvent,
pdu_json: &mut CanonicalJsonObject,
via: &ServerName,
outgoing: Box<RawValue>,
room_id: &RoomId,
) -> Result<bool> {
debug!("Requesting policy server signature");
let response = tokio::time::timeout(
Duration::from_secs(self.services.server.config.policy_server_request_timeout),
self.services
.sending
.send_federation_request(via, PolicySignRequest { pdu: outgoing }),
)
.await;
let response = match response {
| Ok(Ok(response)) => {
debug!("Response from policy server: {:?}", response);
response
},
| Ok(Err(e)) => {
warn!(
via = %via,
event_id = %pdu.event_id(),
room_id = %room_id,
"Failed to contact policy server: {e}"
);
// Network or policy server errors are treated as non-fatal: event is allowed by
// default.
return Err(e);
},
| Err(elapsed) => {
warn!(
%via,
event_id = %pdu.event_id(),
%room_id,
%elapsed,
"Policy server request timed out after 10 seconds"
);
return Err!("Request to policy server timed out");
},
};
if response.signatures.is_none() {
debug!("Policy server refused to sign event");
return Ok(false);
}
let sigs: ruma::Signatures<ruma::OwnedServerName, ruma::ServerSigningKeyVersion> =
response.signatures.unwrap();
if !sigs.contains_key(via) {
debug_warn!(
"Policy server returned signatures, but did not include the expected server name \
'{}': {:?}",
via,
sigs
);
return Ok(false);
}
let keypairs = sigs.get(via).unwrap();
let wanted_key_id = KeyId::parse("ed25519:policy_server")?;
if !keypairs.contains_key(wanted_key_id) {
debug_warn!(
"Policy server returned signature, but did not use the key ID \
'ed25519:policy_server'."
);
return Ok(false);
}
let signatures_entry = pdu_json
.entry("signatures".to_owned())
.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()));
if let CanonicalJsonValue::Object(signatures_map) = signatures_entry {
let sig_value = keypairs.get(wanted_key_id).unwrap().to_owned();
match signatures_map.get_mut(via.as_str()) {
| Some(CanonicalJsonValue::Object(inner_map)) => {
trace!("inserting PS signature: {}", sig_value);
inner_map.insert(
"ed25519:policy_server".to_owned(),
CanonicalJsonValue::String(sig_value),
);
},
| Some(_) => {
debug_warn!(
"Existing `signatures[{}]` field is not an object; cannot insert policy \
signature",
via
);
return Ok(false);
},
| None => {
let mut inner = BTreeMap::new();
inner.insert(
"ed25519:policy_server".to_owned(),
CanonicalJsonValue::String(sig_value.clone()),
);
trace!(
"created new signatures object for {via} with the signature {}",
sig_value
);
signatures_map.insert(via.as_str().to_owned(), CanonicalJsonValue::Object(inner));
},
}
} else {
debug_warn!(
"Existing `signatures` field is not an object; cannot insert policy signature"
);
return Ok(false);
}
Ok(true)
}

View File

@@ -256,7 +256,12 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
if incoming_pdu.state_key.is_none() {
debug!(event_id = %incoming_pdu.event_id, "Checking policy server for event");
match self
.ask_policy_server(&incoming_pdu, &incoming_pdu.to_canonical_object(), room_id)
.ask_policy_server(
&incoming_pdu,
&mut incoming_pdu.to_canonical_object(),
room_id,
true,
)
.await
{
| Ok(false) => {

View File

@@ -39,7 +39,7 @@ pub enum Status {
Seen(u64),
}
pub type Witness = HashSet<OwnedUserId>;
pub type MemberSet = HashSet<OwnedUserId>;
type Key<'a> = (&'a UserId, Option<&'a DeviceId>, &'a RoomId, &'a UserId);
impl crate::Service for Service {
@@ -67,9 +67,11 @@ pub async fn reset(&self, ctx: &Context<'_>) {
.await;
}
/// Returns only the subset of `senders` which should be sent to the client
/// according to the provided lazy loading context.
#[implement(Service)]
#[tracing::instrument(name = "retain", level = "debug", skip_all)]
pub async fn witness_retain(&self, senders: Witness, ctx: &Context<'_>) -> Witness {
pub async fn retain_lazy_members(&self, senders: MemberSet, ctx: &Context<'_>) -> MemberSet {
debug_assert!(
ctx.options.is_none_or(Options::is_enabled),
"lazy loading should be enabled by your options"
@@ -84,7 +86,7 @@ pub async fn witness_retain(&self, senders: Witness, ctx: &Context<'_>) -> Witne
pin_mut!(witness);
let _cork = self.db.db.cork();
let mut senders = Witness::with_capacity(senders.len());
let mut senders = MemberSet::with_capacity(senders.len());
while let Some((status, sender)) = witness.next().await {
if include_redundant || status == Status::Unseen {
senders.insert(sender.into());

View File

@@ -0,0 +1,746 @@
use conduwuit::{Event, PduEvent, Result, err};
use ruma::{
UserId,
api::Direction,
events::relation::{BundledMessageLikeRelations, BundledReference, ReferenceChunk},
};
use crate::rooms::timeline::PdusIterItem;
const MAX_BUNDLED_RELATIONS: usize = 50;
impl super::Service {
/// Gets bundled aggregations for an event according to the Matrix
/// specification.
/// - m.replace relations are bundled to include the most recent replacement
/// event.
/// - m.reference relations are bundled to include a chunk of event IDs.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn get_bundled_aggregations(
&self,
user_id: &UserId,
pdu: &PduEvent,
) -> Result<Option<BundledMessageLikeRelations<Box<serde_json::value::RawValue>>>> {
// Events that can never get bundled aggregations
if pdu.state_key().is_some() || Self::is_replacement_event(pdu) {
return Ok(None);
}
let relations = self
.get_relations(
user_id,
&pdu.room_id_or_hash(),
pdu.event_id(),
conduwuit::PduCount::max(),
MAX_BUNDLED_RELATIONS,
0,
Direction::Backward,
)
.await;
// The relations database code still handles the basic unsigned data
// We don't want to recursively fetch relations
if relations.is_empty() {
return Ok(None);
}
// Partition relations by type
let (replace_events, reference_events): (Vec<_>, Vec<_>) = relations
.iter()
.filter_map(|relation| {
let pdu = &relation.1;
let content = pdu.get_content_as_value();
content
.get("m.relates_to")
.and_then(|relates_to| relates_to.get("rel_type"))
.and_then(|rel_type| rel_type.as_str())
.and_then(|rel_type_str| match rel_type_str {
| "m.replace" => Some(RelationType::Replace(relation)),
| "m.reference" => Some(RelationType::Reference(relation)),
| _ => None, /* Ignore other relation types (threads are in DB but not
* handled here) */
})
})
.fold((Vec::new(), Vec::new()), |(mut replaces, mut references), rel_type| {
match rel_type {
| RelationType::Replace(r) => replaces.push(r),
| RelationType::Reference(r) => references.push(r),
}
(replaces, references)
});
// If no relations to bundle, return None
if replace_events.is_empty() && reference_events.is_empty() {
return Ok(None);
}
let mut bundled = BundledMessageLikeRelations::<Box<serde_json::value::RawValue>>::new();
// Handle m.replace relations - find the most recent valid one (lazy load
// original event)
if !replace_events.is_empty() {
if let Some(replacement) = self
.find_most_recent_valid_replacement(user_id, pdu, &replace_events)
.await?
{
bundled.replace = Some(Self::serialize_replacement(replacement)?);
}
}
// Handle m.reference relations - collect event IDs
if !reference_events.is_empty() {
let reference_chunk: Vec<_> = reference_events
.into_iter()
.map(|relation| BundledReference::new(relation.1.event_id().to_owned()))
.collect();
if !reference_chunk.is_empty() {
bundled.reference = Some(Box::new(ReferenceChunk::new(reference_chunk)));
}
}
// TODO: Handle other relation types (m.annotation, etc.) when specified
Ok(Some(bundled))
}
/// Serialize a replacement event to the bundled format
fn serialize_replacement(pdu: &PduEvent) -> Result<Box<Box<serde_json::value::RawValue>>> {
let replacement_json = serde_json::to_string(pdu)
.map_err(|e| err!(Database("Failed to serialize replacement event: {e}")))?;
let raw_value = serde_json::value::RawValue::from_string(replacement_json)
.map_err(|e| err!(Database("Failed to create RawValue: {e}")))?;
Ok(Box::new(raw_value))
}
/// Find the most recent valid replacement event based on origin_server_ts
/// and lexicographic event_id ordering
async fn find_most_recent_valid_replacement<'a>(
&self,
user_id: &UserId,
original_event: &PduEvent,
replacement_events: &[&'a PdusIterItem],
) -> Result<Option<&'a PduEvent>> {
// Filter valid replacements and find the maximum in a single pass
let mut result: Option<&PduEvent> = None;
for relation in replacement_events {
let pdu = &relation.1;
// Validate replacement
if !Self::is_valid_replacement_event(original_event, pdu).await? {
continue;
}
let next = match result {
| None => Some(pdu),
| Some(current) => {
// Compare by origin_server_ts first, then event_id lexicographically
match pdu.origin_server_ts().cmp(&current.origin_server_ts()) {
| std::cmp::Ordering::Greater => Some(pdu),
| std::cmp::Ordering::Equal if pdu.event_id() > current.event_id() =>
Some(pdu),
| _ => None,
}
},
};
if let Some(pdu) = next
&& self
.services
.state_accessor
.user_can_see_event(user_id, &pdu.room_id_or_hash(), pdu.event_id())
.await
{
result = Some(pdu);
}
}
Ok(result)
}
/// Adds bundled aggregations to a PDU's unsigned field
#[tracing::instrument(skip(self, pdu), level = "debug")]
pub async fn add_bundled_aggregations_to_pdu(
&self,
user_id: &UserId,
pdu: &mut PduEvent,
) -> Result<()> {
if pdu.is_redacted() {
return Ok(());
}
let bundled_aggregations = self.get_bundled_aggregations(user_id, pdu).await?;
if let Some(aggregations) = bundled_aggregations {
let aggregations_json = serde_json::to_value(aggregations)
.map_err(|e| err!(Database("Failed to serialize bundled aggregations: {e}")))?;
Self::add_bundled_aggregations_to_unsigned(pdu, aggregations_json)?;
}
Ok(())
}
/// Helper method to add bundled aggregations to a PDU's unsigned field
fn add_bundled_aggregations_to_unsigned(
pdu: &mut PduEvent,
aggregations_json: serde_json::Value,
) -> Result<()> {
use serde_json::{
Map, Value as JsonValue,
value::{RawValue as RawJsonValue, to_raw_value},
};
let mut unsigned: Map<String, JsonValue> = pdu
.unsigned
.as_deref()
.map(RawJsonValue::get)
.map_or_else(|| Ok(Map::new()), serde_json::from_str)
.map_err(|e| err!(Database("Invalid unsigned in pdu event: {e}")))?;
let relations = unsigned
.entry("m.relations")
.or_insert_with(|| JsonValue::Object(Map::new()))
.as_object_mut()
.ok_or_else(|| err!(Database("m.relations is not an object")))?;
if let JsonValue::Object(aggregations_map) = aggregations_json {
relations.extend(aggregations_map);
}
pdu.unsigned = Some(to_raw_value(&unsigned)?);
Ok(())
}
/// Validates that an event is acceptable as a replacement for another event
/// See C/S spec "Validity of replacement events"
#[tracing::instrument(level = "debug")]
async fn is_valid_replacement_event(
original_event: &PduEvent,
replacement_event: &PduEvent,
) -> Result<bool> {
Ok(
// 1. Same room_id
original_event.room_id() == replacement_event.room_id()
// 2. Same sender
&& original_event.sender() == replacement_event.sender()
// 3. Same type
&& original_event.event_type() == replacement_event.event_type()
// 4. Neither event should have a state_key property
&& original_event.state_key().is_none()
&& replacement_event.state_key().is_none()
// 5. Original event must not have rel_type of m.replace
&& !Self::is_replacement_event(original_event)
// 6. Replacement event must have m.new_content property (skip for encrypted)
&& Self::has_new_content_or_encrypted(replacement_event),
)
}
/// Check if an event is itself a replacement
#[inline]
fn is_replacement_event(event: &PduEvent) -> bool {
event
.get_content_as_value()
.get("m.relates_to")
.and_then(|relates_to| relates_to.get("rel_type"))
.and_then(|rel_type| rel_type.as_str())
.is_some_and(|rel_type| rel_type == "m.replace")
}
/// Check if event has m.new_content or is encrypted (where m.new_content
/// would be in the encrypted payload)
#[inline]
fn has_new_content_or_encrypted(event: &PduEvent) -> bool {
event.event_type() == &ruma::events::TimelineEventType::RoomEncrypted
|| event.get_content_as_value().get("m.new_content").is_some()
}
}
/// Helper enum for partitioning relations
enum RelationType<'a> {
Replace(&'a PdusIterItem),
Reference(&'a PdusIterItem),
}
#[cfg(test)]
mod tests {
use conduwuit_core::pdu::{EventHash, PduEvent};
use ruma::{UInt, events::TimelineEventType, owned_event_id, owned_room_id, owned_user_id};
use serde_json::{Value as JsonValue, json, value::to_raw_value};
fn create_test_pdu(unsigned_content: Option<JsonValue>) -> PduEvent {
PduEvent {
event_id: owned_event_id!("$test:example.com"),
room_id: Some(owned_room_id!("!test:example.com")),
sender: owned_user_id!("@test:example.com"),
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
kind: TimelineEventType::RoomMessage,
content: to_raw_value(&json!({"msgtype": "m.text", "body": "test"})).unwrap(),
state_key: None,
prev_events: vec![],
depth: UInt::from(1_u32),
auth_events: vec![],
redacts: None,
unsigned: unsigned_content.map(|content| to_raw_value(&content).unwrap()),
hashes: EventHash { sha256: "test_hash".to_owned() },
signatures: None,
origin: None,
}
}
fn create_bundled_aggregations() -> JsonValue {
json!({
"m.replace": {
"event_id": "$replace:example.com",
"origin_server_ts": 1_234_567_890,
"sender": "@replacer:example.com"
},
"m.reference": {
"count": 5,
"chunk": [
"$ref1:example.com",
"$ref2:example.com"
]
}
})
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_no_existing_unsigned() {
let mut pdu = create_test_pdu(None);
let aggregations = create_bundled_aggregations();
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
aggregations.clone(),
);
assert!(result.is_ok(), "Should succeed when no unsigned field exists");
assert!(pdu.unsigned.is_some(), "Unsigned field should be created");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
assert!(unsigned.get("m.relations").is_some(), "m.relations should exist");
assert_eq!(
unsigned["m.relations"], aggregations,
"Relations should match the aggregations"
);
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_overwrite_same_relation_type() {
let existing_unsigned = json!({
"m.relations": {
"m.replace": {
"event_id": "$old_replace:example.com",
"origin_server_ts": 1_111_111_111,
"sender": "@old_replacer:example.com"
}
}
});
let mut pdu = create_test_pdu(Some(existing_unsigned));
let new_aggregations = create_bundled_aggregations();
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
new_aggregations.clone(),
);
assert!(result.is_ok(), "Should succeed when overwriting same relation type");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
let relations = &unsigned["m.relations"];
assert_eq!(
relations["m.replace"], new_aggregations["m.replace"],
"m.replace should be updated"
);
assert_eq!(
relations["m.replace"]["event_id"], "$replace:example.com",
"Should have new event_id"
);
assert!(relations.get("m.reference").is_some(), "New m.reference should be added");
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_preserve_other_unsigned_fields() {
// Test case: Other unsigned fields should be preserved
let existing_unsigned = json!({
"age": 98765,
"prev_content": {"msgtype": "m.text", "body": "old message"},
"redacted_because": {"event_id": "$redaction:example.com"},
"m.relations": {
"m.annotation": {"count": 1}
}
});
let mut pdu = create_test_pdu(Some(existing_unsigned));
let new_aggregations = json!({
"m.replace": {"event_id": "$new:example.com"}
});
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
new_aggregations,
);
assert!(result.is_ok(), "Should succeed while preserving other fields");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
// Verify all existing fields are preserved
assert_eq!(unsigned["age"], 98765, "age should be preserved");
assert!(unsigned.get("prev_content").is_some(), "prev_content should be preserved");
assert!(
unsigned.get("redacted_because").is_some(),
"redacted_because should be preserved"
);
// Verify relations were merged correctly
let relations = &unsigned["m.relations"];
assert!(
relations.get("m.annotation").is_some(),
"Existing m.annotation should be preserved"
);
assert!(relations.get("m.replace").is_some(), "New m.replace should be added");
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_invalid_existing_unsigned() {
// Test case: Invalid JSON in existing unsigned should result in error
let mut pdu = create_test_pdu(None);
// Manually set invalid unsigned data
pdu.unsigned = Some(to_raw_value(&"invalid json").unwrap());
let aggregations = create_bundled_aggregations();
let result =
super::super::Service::add_bundled_aggregations_to_unsigned(&mut pdu, aggregations);
assert!(result.is_err(), "fails when existing unsigned is invalid");
// Should we ignore the error and overwrite anyway?
}
// Test helper function to create test PDU events
fn create_test_event(
event_id: &str,
room_id: &str,
sender: &str,
event_type: TimelineEventType,
content: &JsonValue,
state_key: Option<&str>,
) -> PduEvent {
PduEvent {
event_id: event_id.try_into().unwrap(),
room_id: Some(room_id.try_into().unwrap()),
sender: sender.try_into().unwrap(),
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
kind: event_type,
content: to_raw_value(&content).unwrap(),
state_key: state_key.map(Into::into),
prev_events: vec![],
depth: UInt::from(1_u32),
auth_events: vec![],
redacts: None,
unsigned: None,
hashes: EventHash { sha256: "test_hash".to_owned() },
signatures: None,
origin: None,
}
}
/// Test that a valid replacement event passes validation
#[tokio::test]
async fn test_valid_replacement_event() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.new_content": {
"msgtype": "m.text",
"body": "edited message"
},
"m.relates_to": {
"rel_type": "m.replace",
"event_id": "$original:example.com"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(result.unwrap(), "Valid replacement event should be accepted");
}
/// Test replacement event with different room ID is rejected
#[tokio::test]
async fn test_replacement_event_different_room() {
let original = create_test_event(
"$original:example.com",
"!room1:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room2:example.com", // Different room
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.new_content": {
"msgtype": "m.text",
"body": "edited message"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Different room ID should be rejected");
}
/// Test replacement event with different sender is rejected
#[tokio::test]
async fn test_replacement_event_different_sender() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user1:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user2:example.com", // Different sender
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.new_content": {
"msgtype": "m.text",
"body": "edited message"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Different sender should be rejected");
}
/// Test replacement event with different type is rejected
#[tokio::test]
async fn test_replacement_event_different_type() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomTopic, // Different event type
&json!({
"topic": "new topic",
"m.new_content": {
"topic": "new topic"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Different event type should be rejected");
}
/// Test replacement event with state key is rejected
#[tokio::test]
async fn test_replacement_event_with_state_key() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomName,
&json!({"name": "room name"}),
Some(""), // Has state key
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomName,
&json!({
"name": "new room name",
"m.new_content": {
"name": "new room name"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Event with state key should be rejected");
}
/// Test replacement of an event that is already a replacement is rejected
#[tokio::test]
async fn test_replacement_event_original_is_replacement() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.relates_to": {
"rel_type": "m.replace", // Original is already a replacement
"event_id": "$some_other:example.com"
}
}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited again",
"m.new_content": {
"msgtype": "m.text",
"body": "edited again"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Replacement of replacement should be rejected");
}
/// Test replacement event missing m.new_content is rejected
#[tokio::test]
async fn test_replacement_event_missing_new_content() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message"
// Missing m.new_content
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Missing m.new_content should be rejected");
}
/// Test encrypted replacement event without m.new_content is accepted
#[tokio::test]
async fn test_replacement_event_encrypted_missing_new_content_is_valid() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomEncrypted,
&json!({
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "encrypted_payload_base64",
"sender_key": "sender_key",
"session_id": "session_id"
}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomEncrypted,
&json!({
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "encrypted_replacement_payload_base64",
"sender_key": "sender_key",
"session_id": "session_id",
"m.relates_to": {
"rel_type": "m.replace",
"event_id": "$original:example.com"
}
// No m.new_content in cleartext - this is valid for encrypted events
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(
result.unwrap(),
"Encrypted replacement without cleartext m.new_content should be accepted"
);
}
}

View File

@@ -3,7 +3,6 @@
use conduwuit::{
arrayvec::ArrayVec,
matrix::{Event, PduCount},
result::LogErr,
utils::{
ReadyExt,
stream::{TryIgnore, WidebandExt},
@@ -15,10 +14,11 @@
use ruma::{EventId, RoomId, UserId, api::Direction};
use crate::{
Dep, rooms,
Dep,
rooms::{
self,
short::{ShortEventId, ShortRoomId},
timeline::{PduId, RawPduId},
timeline::{PduId, PdusIterItem, RawPduId},
},
};
@@ -60,7 +60,7 @@ pub(super) fn get_relations<'a>(
target: ShortEventId,
from: PduCount,
dir: Direction,
) -> impl Stream<Item = (PduCount, impl Event)> + Send + 'a {
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
// Query from exact position then filter excludes it (saturating_inc could skip
// events at min/max boundaries)
let from_unsigned = from.into_unsigned();
@@ -92,9 +92,7 @@ pub(super) fn get_relations<'a>(
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
if pdu.sender() != user_id {
pdu.as_mut_pdu().remove_transaction_id().log_err().ok();
}
pdu.as_mut_pdu().set_unsigned(Some(user_id));
Some((shorteventid, pdu))
})

View File

@@ -1,15 +1,16 @@
mod bundled_aggregations;
mod data;
use std::sync::Arc;
use conduwuit::{
Result,
matrix::{Event, PduCount},
};
use conduwuit::{Result, matrix::PduCount};
use futures::{StreamExt, future::try_join};
use ruma::{EventId, RoomId, UserId, api::Direction};
use self::data::Data;
use crate::{Dep, rooms};
use crate::{
Dep,
rooms::{self, timeline::PdusIterItem},
};
pub struct Service {
services: Services,
@@ -19,6 +20,7 @@ pub struct Service {
struct Services {
short: Dep<rooms::short::Service>,
timeline: Dep<rooms::timeline::Service>,
state_accessor: Dep<rooms::state_accessor::Service>,
}
impl crate::Service for Service {
@@ -27,6 +29,8 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
state_accessor: args
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
},
db: Data::new(&args),
}))
@@ -56,7 +60,7 @@ pub async fn get_relations<'a>(
limit: usize,
max_depth: u8,
dir: Direction,
) -> Vec<(PduCount, impl Event)> {
) -> Vec<PdusIterItem> {
let room_id = self.services.short.get_shortroomid(room_id);
let target = self.services.timeline.get_pdu_count(target);

View File

@@ -7,7 +7,7 @@
use database::{Deserialized, Json, Map};
use futures::{Stream, StreamExt};
use ruma::{
CanonicalJsonObject, RoomId, UserId,
CanonicalJsonObject, OwnedUserId, RoomId, UserId,
events::{AnySyncEphemeralRoomEvent, receipt::ReceiptEvent},
serde::Raw,
};
@@ -25,7 +25,7 @@ struct Services {
globals: Dep<globals::Service>,
}
pub(super) type ReceiptItem<'a> = (&'a UserId, u64, Raw<AnySyncEphemeralRoomEvent>);
pub(super) type ReceiptItem = (OwnedUserId, u64, Raw<AnySyncEphemeralRoomEvent>);
impl Data {
pub(super) fn new(args: &crate::Args<'_>) -> Self {
@@ -65,7 +65,7 @@ pub(super) fn readreceipts_since<'a>(
&'a self,
room_id: &'a RoomId,
since: u64,
) -> impl Stream<Item = ReceiptItem<'a>> + Send + 'a {
) -> impl Stream<Item = ReceiptItem> + Send + 'a {
type Key<'a> = (&'a RoomId, u64, &'a UserId);
type KeyVal<'a> = (Key<'a>, CanonicalJsonObject);
@@ -81,7 +81,7 @@ pub(super) fn readreceipts_since<'a>(
let event = serde_json::value::to_raw_value(&json)?;
Ok((user_id, count, Raw::from_json(event)))
Ok((user_id.to_owned(), count, Raw::from_json(event)))
})
.ignore_err()
}

View File

@@ -104,16 +104,16 @@ pub async fn private_read_get(
Ok(Raw::from_json(event))
}
/// Returns an iterator over the most recent read_receipts in a room that
/// happened after the event with id `since`.
/// Returns an iterator over the most recent read_receipts in a room,
/// optionally after the event with id `since`.
#[inline]
#[tracing::instrument(skip(self), level = "debug")]
pub fn readreceipts_since<'a>(
&'a self,
room_id: &'a RoomId,
since: u64,
) -> impl Stream<Item = ReceiptItem<'a>> + Send + 'a {
self.db.readreceipts_since(room_id, since)
since: Option<u64>,
) -> impl Stream<Item = ReceiptItem> + Send + 'a {
self.db.readreceipts_since(room_id, since.unwrap_or(0))
}
/// Sets a private read marker at PDU `count`.

View File

@@ -1,9 +1,9 @@
use std::sync::Arc;
use conduwuit::{
PduCount, Result,
PduCount, PduEvent, Result,
arrayvec::ArrayVec,
implement,
debug_warn, implement,
matrix::event::{Event, Matches},
utils::{
ArrayVecExt, IterStream, ReadyExt, set,
@@ -35,6 +35,7 @@ struct Services {
short: Dep<rooms::short::Service>,
state_accessor: Dep<rooms::state_accessor::Service>,
timeline: Dep<rooms::timeline::Service>,
pdu_metadata: Dep<rooms::pdu_metadata::Service>,
}
#[derive(Clone, Debug)]
@@ -61,6 +62,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
state_accessor: args
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
pdu_metadata: args.depend::<rooms::pdu_metadata::Service>("rooms::pdu_metadata"),
},
}))
}
@@ -104,7 +106,8 @@ pub fn deindex_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_b
pub async fn search_pdus<'a>(
&'a self,
query: &'a RoomQuery<'a>,
) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + 'a)> {
sender_user: &'a UserId,
) -> Result<(usize, impl Stream<Item = PduEvent> + Send + 'a)> {
let pdu_ids: Vec<_> = self.search_pdu_ids(query).await?.collect().await;
let filter = &query.criteria.filter;
@@ -129,7 +132,23 @@ pub async fn search_pdus<'a>(
.then_some(pdu)
})
.skip(query.skip)
.take(query.limit);
.take(query.limit)
.map(move |mut pdu| {
pdu.set_unsigned(query.user_id);
pdu
})
.then(async move |mut pdu| {
if let Err(e) = self
.services
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
});
Ok((count, pdus))
}

View File

@@ -1,10 +1,18 @@
use std::{borrow::Borrow, fmt::Debug, mem::size_of_val, sync::Arc};
pub use conduwuit::matrix::pdu::{ShortEventId, ShortId, ShortRoomId, ShortStateKey};
use conduwuit::{Result, err, implement, matrix::StateKey, utils, utils::IterStream};
use conduwuit::{
Result, err, implement,
matrix::StateKey,
pair_of,
utils::{self, IterStream, ReadyExt},
};
use database::{Deserialized, Get, Map, Qry};
use futures::{Stream, StreamExt};
use ruma::{EventId, RoomId, events::StateEventType};
use futures::{
Stream, StreamExt,
stream::{self},
};
use ruma::{EventId, OwnedEventId, RoomId, events::StateEventType};
use serde::Deserialize;
use crate::{Dep, globals};
@@ -258,3 +266,23 @@ pub async fn get_or_create_shortroomid(&self, room_id: &RoomId) -> ShortRoomId {
short
})
}
#[implement(Service)]
pub async fn multi_get_state_from_short<'a, S>(
&'a self,
short_state: S,
) -> impl Stream<Item = Result<((StateEventType, StateKey), OwnedEventId)>> + Send + 'a
where
S: Stream<Item = (ShortStateKey, ShortEventId)> + Send + 'a,
{
let (short_state_keys, short_event_ids): pair_of!(Vec<_>) = short_state.unzip().await;
StreamExt::zip(
self.multi_get_statekey_from_short(stream::iter(short_state_keys.into_iter())),
self.multi_get_eventid_from_short(stream::iter(short_event_ids.into_iter())),
)
.ready_filter_map(|state_event| match state_event {
| (Ok(state_key), Ok(event_id)) => Some(Ok((state_key, event_id))),
| (Err(e), _) | (_, Err(e)) => Some(Err(e)),
})
}

View File

@@ -264,6 +264,8 @@ fn get_space_child_events<'a>(
if content.via.is_empty() {
return None;
}
} else {
return None;
}
if RoomId::parse(&state_key).is_err() {

View File

@@ -20,7 +20,7 @@
EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId,
events::{
AnyStrippedStateEvent, StateEventType, TimelineEventType,
room::{create::RoomCreateEventContent, member::RoomMemberEventContent},
room::create::RoomCreateEventContent,
},
serde::Raw,
};
@@ -126,21 +126,9 @@ pub async fn force_state(
continue;
};
let Ok(membership_event) = pdu.get_content::<RoomMemberEventContent>() else {
continue;
};
self.services
.state_cache
.update_membership(
room_id,
user_id,
membership_event,
&pdu.sender,
None,
None,
false,
)
.update_membership(room_id, user_id, &pdu, false)
.await?;
},
| TimelineEventType::SpaceChild => {

View File

@@ -1,7 +1,7 @@
use std::{borrow::Borrow, ops::Deref, sync::Arc};
use conduwuit::{
Result, at, err, implement,
Pdu, Result, at, err, implement,
matrix::{Event, StateKey},
pair_of,
utils::{
@@ -10,7 +10,7 @@
},
};
use database::Deserialized;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, future::try_join, pin_mut};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, pin_mut};
use ruma::{
EventId, OwnedEventId, UserId,
events::{
@@ -125,7 +125,7 @@ pub async fn state_get(
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> Result<impl Event> {
) -> Result<Pdu> {
self.state_get_id(shortstatehash, event_type, state_key)
.and_then(async |event_id: OwnedEventId| self.services.timeline.get_pdu(&event_id).await)
.await
@@ -286,28 +286,28 @@ pub fn state_keys<'a>(
/// not in .1)
#[implement(super::Service)]
#[inline]
pub fn state_removed(
pub async fn state_removed(
&self,
shortstatehash: pair_of!(ShortStateHash),
) -> impl Stream<Item = (ShortStateKey, ShortEventId)> + Send + '_ {
self.state_added((shortstatehash.1, shortstatehash.0))
) -> Result<Vec<(ShortStateKey, ShortEventId)>> {
self.state_added((shortstatehash.1, shortstatehash.0)).await
}
/// Returns the state events added between the interval (present in .1 but
/// not in .0)
#[implement(super::Service)]
pub fn state_added(
pub async fn state_added(
&self,
shortstatehash: pair_of!(ShortStateHash),
) -> impl Stream<Item = (ShortStateKey, ShortEventId)> + Send + '_ {
let a = self.load_full_state(shortstatehash.0);
let b = self.load_full_state(shortstatehash.1);
try_join(a, b)
.map_ok(|(a, b)| b.difference(&a).copied().collect::<Vec<_>>())
.map_ok(IterStream::try_stream)
.try_flatten_stream()
.ignore_err()
) -> Result<Vec<(ShortStateKey, ShortEventId)>> {
let full_state_a = self.load_full_state(shortstatehash.0).await?;
let full_state_b = self.load_full_state(shortstatehash.1).await?;
Ok(full_state_b
.difference(&full_state_a)
.copied()
.map(parse_compressed_state_event)
.collect())
}
#[implement(super::Service)]

View File

@@ -4,7 +4,7 @@
use std::{collections::HashMap, sync::Arc};
use conduwuit::{
Result, SyncRwLock, implement,
Pdu, Result, SyncRwLock, implement,
result::LogErr,
utils::{ReadyExt, stream::TryIgnore},
warn,
@@ -13,7 +13,7 @@
use futures::{Stream, StreamExt, future::join5, pin_mut};
use ruma::{
OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId,
events::{AnyStrippedStateEvent, AnySyncStateEvent, room::member::MembershipState},
events::{AnyStrippedStateEvent, room::member::MembershipState},
serde::Raw,
};
@@ -54,7 +54,6 @@ struct Data {
type AppServiceInRoomCache = SyncRwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>;
type StrippedStateEventItem = (OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>);
type SyncStateEventItem = (OwnedRoomId, Vec<Raw<AnySyncStateEvent>>);
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
@@ -431,18 +430,9 @@ pub async fn knock_state(
#[implement(Service)]
#[tracing::instrument(skip(self), level = "trace")]
pub async fn left_state(
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
pub async fn left_state(&self, user_id: &UserId, room_id: &RoomId) -> Result<Option<Pdu>> {
let key = (user_id, room_id);
self.db
.userroomid_leftstate
.qry(&key)
.await
.deserialized()
.and_then(|val: Raw<Vec<AnyStrippedStateEvent>>| val.deserialize_as().map_err(Into::into))
self.db.userroomid_leftstate.qry(&key).await.deserialized()
}
/// Returns an iterator over all rooms a user left.
@@ -451,8 +441,8 @@ pub async fn left_state(
pub fn rooms_left<'a>(
&'a self,
user_id: &'a UserId,
) -> impl Stream<Item = SyncStateEventItem> + Send + 'a {
type KeyVal<'a> = (Key<'a>, Raw<Vec<Raw<AnySyncStateEvent>>>);
) -> impl Stream<Item = (OwnedRoomId, Option<Pdu>)> + Send + 'a {
type KeyVal<'a> = (Key<'a>, Raw<Option<Pdu>>);
type Key<'a> = (&'a UserId, &'a RoomId);
let prefix = (user_id, Interfix);
@@ -461,7 +451,7 @@ pub fn rooms_left<'a>(
.stream_prefix(&prefix)
.ignore_err()
.map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state))
.map(|(room_id, state)| Ok((room_id, state.deserialize_as()?)))
.map(|(room_id, state)| Ok((room_id, state.deserialize()?)))
.ignore_err()
}

View File

@@ -1,13 +1,13 @@
use std::collections::HashSet;
use conduwuit::{Err, Result, implement, is_not_empty, utils::ReadyExt, warn};
use conduwuit::{Err, Event, Pdu, Result, implement, is_not_empty, utils::ReadyExt, warn};
use database::{Json, serialize_key};
use futures::StreamExt;
use ruma::{
OwnedServerName, RoomId, UserId,
events::{
AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
RoomAccountDataEventType, StateEventType,
AnyStrippedStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType,
StateEventType,
direct::DirectEvent,
invite_permission_config::FilterLevel,
room::{
@@ -26,8 +26,7 @@
fields(
%room_id,
%user_id,
%sender,
?membership_event,
?pdu,
),
)]
#[allow(clippy::too_many_arguments)]
@@ -35,13 +34,10 @@ pub async fn update_membership(
&self,
room_id: &RoomId,
user_id: &UserId,
membership_event: RoomMemberEventContent,
sender: &UserId,
last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
invite_via: Option<Vec<OwnedServerName>>,
pdu: &Pdu,
update_joined_count: bool,
) -> Result {
let membership = membership_event.membership;
let membership = pdu.get_content::<RoomMemberEventContent>()?;
// Keep track what remote users exist by adding them as "deactivated" users
//
@@ -54,7 +50,7 @@ pub async fn update_membership(
}
}
match &membership {
match &membership.membership {
| MembershipState::Join => {
// Check if the user never joined this room
if !self.once_joined(user_id, room_id).await {
@@ -122,33 +118,14 @@ pub async fn update_membership(
self.mark_as_joined(user_id, room_id);
},
| MembershipState::Invite => {
// return an error for blocked invites. ignored invites aren't handled here
// since the recipient's membership should still be changed to `invite`.
// they're filtered out in the individual /sync handlers
if matches!(
self.services
.users
.invite_filter_level(sender, user_id)
.await,
FilterLevel::Block
) {
return Err!(Request(InviteBlocked(
"{user_id} has blocked invites from {sender}."
)));
}
self.mark_as_invited(user_id, room_id, sender, last_state, invite_via)
.await;
// TODO: make sure that passing None for `last_state` is correct behavior.
// the call from `append_pdu` used to use `services.state.summary_stripped`
// to fill that parameter.
self.mark_as_invited(user_id, room_id, pdu.sender(), None, None)
.await?;
},
| MembershipState::Leave | MembershipState::Ban => {
self.mark_as_left(user_id, room_id);
if self.services.globals.user_is_local(user_id)
&& (self.services.config.forget_forced_upon_leave
|| self.services.metadata.is_banned(room_id).await
|| self.services.metadata.is_disabled(room_id).await)
{
self.forget(room_id, user_id);
}
self.mark_as_left(user_id, room_id, Some(pdu.clone())).await;
},
| _ => {},
}
@@ -252,24 +229,24 @@ pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) {
self.db.roomid_inviteviaservers.remove(room_id);
}
/// Direct DB function to directly mark a user as left. It is not
/// recommended to use this directly. You most likely should use
/// `update_membership` instead
/// Mark a user as having left a room.
///
/// `leave_pdu` represents the m.room.member event which the user sent to leave
/// the room. If this is None, no event was actually sent, but we must still
/// behave as if the user is no longer in the room. This may occur, for example,
/// if the room being left has been server-banned by an administrator.
#[implement(super::Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
pub async fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId, leave_pdu: Option<Pdu>) {
let userroom_id = (user_id, room_id);
let userroom_id = serialize_key(userroom_id).expect("failed to serialize userroom_id");
let roomuser_id = (room_id, user_id);
let roomuser_id = serialize_key(roomuser_id).expect("failed to serialize roomuser_id");
// (timo) TODO
let leftstate = Vec::<Raw<AnySyncStateEvent>>::new();
self.db
.userroomid_leftstate
.raw_put(&userroom_id, Json(leftstate));
.raw_put(&userroom_id, Json(leave_pdu));
self.db
.roomuserid_leftcount
.raw_aput::<8, _, _>(&roomuser_id, self.services.globals.next_count().unwrap());
@@ -285,6 +262,14 @@ pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
self.db.roomuserid_knockedcount.remove(&roomuser_id);
self.db.roomid_inviteviaservers.remove(room_id);
if self.services.globals.user_is_local(user_id)
&& (self.services.config.forget_forced_upon_leave
|| self.services.metadata.is_banned(room_id).await
|| self.services.metadata.is_disabled(room_id).await)
{
self.forget(room_id, user_id);
}
}
/// Direct DB function to directly mark a user as knocked. It is not
@@ -351,7 +336,20 @@ pub async fn mark_as_invited(
sender_user: &UserId,
last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
invite_via: Option<Vec<OwnedServerName>>,
) {
) -> Result<()> {
// return an error for blocked invites. ignored invites aren't handled here
// since the recipient's membership should still be changed to `invite`.
// they're filtered out in the individual /sync handlers
if matches!(
self.services
.users
.invite_filter_level(sender_user, user_id)
.await,
FilterLevel::Block
) {
return Err!(Request(InviteBlocked("{user_id} has blocked invites from {sender_user}.")));
}
let roomuser_id = (room_id, user_id);
let roomuser_id = serialize_key(roomuser_id).expect("failed to serialize roomuser_id");
@@ -366,7 +364,7 @@ pub async fn mark_as_invited(
.raw_aput::<8, _, _>(&roomuser_id, self.services.globals.next_count().unwrap());
self.db
.userroomid_invitesender
.raw_put(&userroom_id, sender_user);
.insert(&userroom_id, sender_user);
self.db.userroomid_joined.remove(&userroom_id);
self.db.roomuserid_joined.remove(&roomuser_id);
@@ -380,4 +378,6 @@ pub async fn mark_as_invited(
if let Some(servers) = invite_via.filter(is_not_empty!()) {
self.add_servers_invite_via(room_id, servers).await;
}
Ok(())
}

View File

@@ -526,7 +526,7 @@ pub(crate) fn compress_state_event(
#[inline]
#[must_use]
pub(crate) fn parse_compressed_state_event(
pub fn parse_compressed_state_event(
compressed_event: CompressedStateEvent,
) -> (ShortStateKey, ShortEventId) {
use utils::u64_from_u8;

View File

@@ -163,9 +163,7 @@ pub async fn threads_until<'a>(
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
let pdu_id: PduId = pdu_id.into();
if pdu.sender() != user_id {
pdu.as_mut_pdu().remove_transaction_id().ok();
}
pdu.as_mut_pdu().set_unsigned(Some(user_id));
Some((pdu_id.shorteventid, pdu))
});

View File

@@ -19,9 +19,7 @@
GlobalAccountDataEventType, StateEventType, TimelineEventType,
push_rules::PushRulesEvent,
room::{
encrypted::Relation,
member::{MembershipState, RoomMemberEventContent},
power_levels::RoomPowerLevelsEventContent,
encrypted::Relation, power_levels::RoomPowerLevelsEventContent,
redaction::RoomRedactionEventContent,
},
},
@@ -323,31 +321,12 @@ pub async fn append_pdu<'a, Leaves>(
let target_user_id =
UserId::parse(state_key).expect("This state_key was previously validated");
let content: RoomMemberEventContent = pdu.get_content()?;
let stripped_state = match content.membership {
| MembershipState::Invite | MembershipState::Knock => self
.services
.state
.summary_stripped(pdu, room_id)
.await
.into(),
| _ => None,
};
// Update our membership info, we do this here incase a user is invited or
// knocked and immediately leaves we need the DB to record the invite or
// knock event for auth
self.services
.state_cache
.update_membership(
room_id,
target_user_id,
content,
pdu.sender(),
stripped_state,
None,
true,
)
.update_membership(room_id, target_user_id, pdu, true)
.await?;
}
},
@@ -368,6 +347,10 @@ pub async fn append_pdu<'a, Leaves>(
| _ => {},
}
// CONCERN: If we receive events with a relation out-of-order, we never write
// their relation / thread. We need some kind of way to trigger when we receive
// this event, and potentially a way to rebuild the table entirely.
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
self.services

View File

@@ -23,6 +23,40 @@
use super::RoomMutexGuard;
pub fn pdu_fits(owned_obj: &mut CanonicalJsonObject) -> bool {
// room IDs, event IDs, senders, types, and state keys must all be <= 255 bytes
if let Some(CanonicalJsonValue::String(room_id)) = owned_obj.get("room_id") {
if room_id.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(event_id)) = owned_obj.get("event_id") {
if event_id.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(sender)) = owned_obj.get("sender") {
if sender.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(kind)) = owned_obj.get("type") {
if kind.len() > 255 {
return false;
}
}
if let Some(CanonicalJsonValue::String(state_key)) = owned_obj.get("state_key") {
if state_key.len() > 255 {
return false;
}
}
// Now check the full PDU size
match serde_json::to_string(owned_obj) {
| Ok(s) => s.len() <= 65535,
| Err(_) => false,
}
}
#[implement(super::Service)]
pub async fn create_hash_and_sign_event(
&self,
@@ -148,19 +182,6 @@ fn from_evt(
}
}
// if event_type != TimelineEventType::RoomCreate && prev_events.is_empty() {
// return Err!(Request(Unknown("Event incorrectly had zero prev_events.")));
// }
// if state_key.is_none() && depth.lt(&uint!(2)) {
// // The first two events in a room are always m.room.create and
// m.room.member, // so any other events with that same depth are illegal.
// warn!(
// "Had unsafe depth {depth} when creating non-state event in {}. Cowardly
// aborting", room_id.expect("room_id is Some here").as_str()
// );
// return Err!(Request(Unknown("Unsafe depth for non-state event.")));
// }
let mut pdu = PduEvent {
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
room_id: room_id.map(ToOwned::to_owned),
@@ -269,8 +290,16 @@ fn from_evt(
}
// Generate event id
pdu.event_id = gen_event_id(&pdu_json, &room_version_id)?;
// Check with the policy server
pdu_json.insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into()));
// Verify that the *full* PDU isn't over 64KiB.
// Ruma only validates that it's under 64KiB before signing and hashing.
// Has to be cloned to prevent mutating pdu_json itself :(
if !pdu_fits(&mut pdu_json.clone()) {
// feckin huge PDU mate
return Err!(Request(TooLarge("Message/PDU is too long (exceeds 65535 bytes)")));
}
// Check with the policy server
if room_id.is_some() {
trace!(
"Checking event in room {} with policy server",
@@ -279,7 +308,7 @@ fn from_evt(
match self
.services
.event_handler
.ask_policy_server(&pdu, &pdu_json, pdu.room_id().expect("has room ID"))
.ask_policy_server(&pdu, &mut pdu_json, pdu.room_id().expect("has room ID"), false)
.await
{
| Ok(true) => {},

View File

@@ -1,13 +1,13 @@
use std::{borrow::Borrow, sync::Arc};
use std::sync::Arc;
use conduwuit::{
Err, PduCount, PduEvent, Result, at, err,
result::{LogErr, NotFound},
result::NotFound,
utils::{self, stream::TryReadyExt},
};
use database::{Database, Deserialized, Json, KeyVal, Map};
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction};
use super::{PduId, RawPduId};
use crate::{Dep, rooms, rooms::short::ShortRoomId};
@@ -45,12 +45,8 @@ pub(super) fn new(args: &crate::Args<'_>) -> Self {
}
#[inline]
pub(super) async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
pin_mut!(pdus_rev);
let last_count = pdus_rev
@@ -64,12 +60,8 @@ pub(super) async fn last_timeline_count(
}
#[inline]
pub(super) async fn latest_pdu_in_room(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
pin_mut!(pdus_rev);
pdus_rev
@@ -221,7 +213,6 @@ pub(super) async fn replace_pdu(
/// order.
pub(super) fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@@ -231,14 +222,13 @@ pub(super) fn pdus_rev<'a>(
self.pduid_pdu
.rev_raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
.ready_and_then(Self::from_json_slice)
})
.try_flatten_stream()
}
pub(super) fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@@ -248,21 +238,15 @@ pub(super) fn pdus<'a>(
self.pduid_pdu
.raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
.ready_and_then(Self::from_json_slice)
})
.try_flatten_stream()
}
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result<PdusIterItem> {
let pdu_id: RawPduId = pdu_id.into();
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
if Some(pdu.sender.borrow()) != user_id {
pdu.remove_transaction_id().log_err().ok();
}
pdu.add_age().log_err().ok();
let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
Ok((pdu_id.pdu_count(), pdu))
}

View File

@@ -20,13 +20,13 @@
};
use futures::{Future, Stream, TryStreamExt, pin_mut};
use ruma::{
CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId,
events::room::encrypted::Relation,
};
use serde::Deserialize;
use self::data::Data;
pub use self::data::PdusIterItem;
pub use self::{create::pdu_fits, data::PdusIterItem};
use crate::{
Dep, account_data, admin, appservice, globals, pusher, rooms, sending, server_keys, users,
};
@@ -138,7 +138,7 @@ pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<impl Event> {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, impl Event)> {
let pdus = self.pdus(None, room_id, None);
let pdus = self.pdus(room_id, None);
pin_mut!(pdus);
pdus.try_next()
@@ -148,16 +148,12 @@ pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, im
#[tracing::instrument(skip(self), level = "debug")]
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<impl Event> {
self.db.latest_pdu_in_room(None, room_id).await
self.db.latest_pdu_in_room(room_id).await
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
self.db.last_timeline_count(sender_user, room_id).await
pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
self.db.last_timeline_count(room_id).await
}
/// Returns the `count` of this pdu's id.
@@ -186,10 +182,8 @@ pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[inline]
pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<impl Event> {
pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
self.db.get_non_outlier_pdu(event_id).await
}
@@ -237,33 +231,29 @@ pub async fn replace_pdu(&self, pdu_id: &RawPduId, pdu_json: &CanonicalJsonObjec
#[inline]
pub fn all_pdus<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
self.pdus(Some(user_id), room_id, None).ignore_err()
self.pdus(room_id, None).ignore_err()
}
/// Reverse iteration starting at from.
/// Reverse iteration starting after `until`.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
.pdus_rev(room_id, until.unwrap_or_else(PduCount::max))
}
/// Forward iteration starting at from.
/// Forward iteration starting after `from`.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
self.db.pdus(room_id, from.unwrap_or_else(PduCount::min))
}
}

View File

@@ -179,18 +179,15 @@ pub async fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> {
.unwrap_or(0))
}
/// Returns a new typing EDU.
pub async fn typings_all(
pub async fn typing_users_for_user(
&self,
room_id: &RoomId,
sender_user: &UserId,
) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
) -> Result<Vec<OwnedUserId>> {
let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
let Some(typing_indicators) = room_typing_indicators else {
return Ok(SyncEphemeralRoomEvent {
content: ruma::events::typing::TypingEventContent { user_ids: Vec::new() },
});
return Ok(Vec::new());
};
let user_ids: Vec<_> = typing_indicators
@@ -207,8 +204,19 @@ pub async fn typings_all(
.collect()
.await;
Ok(user_ids)
}
/// Returns a new typing EDU.
pub async fn typings_event_for_user(
&self,
room_id: &RoomId,
sender_user: &UserId,
) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
Ok(SyncEphemeralRoomEvent {
content: ruma::events::typing::TypingEventContent { user_ids },
content: ruma::events::typing::TypingEventContent {
user_ids: self.typing_users_for_user(room_id, sender_user).await?,
},
})
}

View File

@@ -423,7 +423,7 @@ async fn select_edus_device_changes(
let keys_changed = self
.services
.users
.room_keys_changed(room_id, since.0, None)
.room_keys_changed(room_id, Some(since.0), None)
.ready_filter(|(user_id, _)| self.services.globals.user_is_local(user_id));
pin_mut!(keys_changed);
@@ -520,7 +520,7 @@ async fn select_edus_receipts_room(
let receipts = self
.services
.read_receipt
.readreceipts_since(room_id, since.0);
.readreceipts_since(room_id, Some(since.0));
pin_mut!(receipts);
let mut read = BTreeMap::<OwnedUserId, ReceiptData>::new();
@@ -530,7 +530,7 @@ async fn select_edus_receipts_room(
}
max_edu_count.fetch_max(count, Ordering::Relaxed);
if !self.services.globals.user_is_local(user_id) {
if !self.services.globals.user_is_local(&user_id) {
continue;
}
@@ -554,7 +554,7 @@ async fn select_edus_receipts_room(
let receipt = receipt
.remove(&ReceiptType::Read)
.expect("our read receipts always set this")
.remove(user_id)
.remove(&user_id)
.expect("our read receipts always have the user here");
let receipt_data = ReceiptData {
@@ -562,7 +562,7 @@ async fn select_edus_receipts_room(
event_ids: vec![event_id.clone()],
};
if read.insert(user_id.to_owned(), receipt_data).is_none() {
if read.insert(user_id, receipt_data).is_none() {
*num = num.saturating_add(1);
if *num >= SELECT_RECEIPT_LIMIT {
break;
@@ -781,7 +781,7 @@ async fn send_events_dest_push(
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) {
if pdu.is_redacted() {
continue;
}

View File

@@ -11,7 +11,7 @@
use ruma::{
CanonicalJsonValue, DeviceId, OwnedDeviceId, OwnedUserId, UserId,
api::client::{
error::ErrorKind,
error::{ErrorKind, StandardErrorBody},
uiaa::{AuthData, AuthType, Password, UiaaInfo, UserIdentifier},
},
};
@@ -104,6 +104,7 @@ pub fn create(
}
#[implement(Service)]
#[allow(clippy::useless_let_if_seq)]
pub async fn try_auth(
&self,
user_id: &UserId,
@@ -163,17 +164,39 @@ pub async fn try_auth(
let user_id = user_id_from_username;
// Check if password is correct
let mut password_verified = false;
// First try local password hash verification
if let Ok(hash) = self.services.users.password_hash(&user_id).await {
let hash_matches = hash::verify_password(password, &hash).is_ok();
if !hash_matches {
uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
kind: ErrorKind::forbidden(),
message: "Invalid username or password.".to_owned(),
});
return Ok((false, uiaainfo));
password_verified = hash::verify_password(password, &hash).is_ok();
}
// If local password verification failed, try LDAP authentication
#[cfg(feature = "ldap")]
if !password_verified && self.services.config.ldap.enable {
// Search for user in LDAP to get their DN
if let Ok(dns) = self.services.users.search_ldap(&user_id).await {
if let Some((user_dn, _is_admin)) = dns.first() {
// Try to authenticate with LDAP
password_verified = self
.services
.users
.auth_ldap(user_dn, password)
.await
.is_ok();
}
}
}
if !password_verified {
uiaainfo.auth_error = Some(StandardErrorBody {
kind: ErrorKind::forbidden(),
message: "Invalid username or password.".to_owned(),
});
return Ok((false, uiaainfo));
}
// Password was correct! Let's add it to `completed`
uiaainfo.completed.push(AuthType::Password);
},
@@ -197,7 +220,7 @@ pub async fn try_auth(
},
| Err(e) => {
error!("ReCaptcha verification failed: {e:?}");
uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
uiaainfo.auth_error = Some(StandardErrorBody {
kind: ErrorKind::forbidden(),
message: "ReCaptcha verification failed.".to_owned(),
});
@@ -210,7 +233,7 @@ pub async fn try_auth(
if tokens.contains(t.token.trim()) {
uiaainfo.completed.push(AuthType::RegistrationToken);
} else {
uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
uiaainfo.auth_error = Some(StandardErrorBody {
kind: ErrorKind::forbidden(),
message: "Invalid registration token.".to_owned(),
});

View File

@@ -1,6 +1,6 @@
#[cfg(feature = "ldap")]
use std::collections::HashMap;
use std::{collections::BTreeMap, mem, sync::Arc};
use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc};
#[cfg(feature = "ldap")]
use conduwuit::result::LogErr;
@@ -25,6 +25,7 @@
invite_permission_config::{FilterLevel, InvitePermissionConfigEvent},
},
serde::Raw,
uint,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
@@ -790,7 +791,7 @@ pub async fn sign_key(
pub fn keys_changed<'a>(
&'a self,
user_id: &'a UserId,
from: u64,
from: Option<u64>,
to: Option<u64>,
) -> impl Stream<Item = &'a UserId> + Send + 'a {
self.keys_changed_user_or_room(user_id.as_str(), from, to)
@@ -801,7 +802,7 @@ pub fn keys_changed<'a>(
pub fn room_keys_changed<'a>(
&'a self,
room_id: &'a RoomId,
from: u64,
from: Option<u64>,
to: Option<u64>,
) -> impl Stream<Item = (&'a UserId, u64)> + Send + 'a {
self.keys_changed_user_or_room(room_id.as_str(), from, to)
@@ -810,11 +811,12 @@ pub fn room_keys_changed<'a>(
fn keys_changed_user_or_room<'a>(
&'a self,
user_or_room_id: &'a str,
from: u64,
from: Option<u64>,
to: Option<u64>,
) -> impl Stream<Item = (&'a UserId, u64)> + Send + 'a {
type KeyVal<'a> = ((&'a str, u64), &'a UserId);
let from = from.unwrap_or(0);
let to = to.unwrap_or(u64::MAX);
let start = (user_or_room_id, from.saturating_add(1));
self.db
@@ -979,6 +981,7 @@ pub async fn remove_to_device_events<Until>(
.await;
}
/// Updates device metadata and increments the device list version.
pub async fn update_device_metadata(
&self,
user_id: &UserId,
@@ -986,13 +989,51 @@ pub async fn update_device_metadata(
device: &Device,
) -> Result<()> {
increment(&self.db.userid_devicelistversion, user_id.as_bytes());
self.update_device_metadata_no_increment(user_id, device_id, device)
.await
}
// Updates device metadata without incrementing the device list version.
// This is namely used for updating the last_seen_ip and last_seen_ts values,
// as those do not need a device list version bump due to them not being
// relevant to other consumers.
pub async fn update_device_metadata_no_increment(
&self,
user_id: &UserId,
device_id: &DeviceId,
device: &Device,
) -> Result<()> {
let key = (user_id, device_id);
self.db.userdeviceid_metadata.put(key, Json(device));
Ok(())
}
pub async fn update_device_last_seen(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
ip: IpAddr,
) {
let now = MilliSecondsSinceUnixEpoch::now();
if let Some(device_id) = device_id {
if let Ok(mut device) = self.get_device_metadata(user_id, device_id).await {
device.last_seen_ip = Some(ip.to_string());
// If the last update was less than 10 seconds ago, don't update the timestamp
if let Some(prev) = device.last_seen_ts {
if now.get().saturating_sub(prev.get()) < uint!(10_000) {
return;
}
}
device.last_seen_ts = Some(now);
self.update_device_metadata_no_increment(user_id, device_id, &device)
.await
.ok();
}
}
}
/// Get device metadata.
pub async fn get_device_metadata(
&self,

View File

@@ -96,3 +96,12 @@ img {
max-height: 35vh;
max-width: none; /* Having this set causes slight aspect ratio breakage */
}
/* fix navigation bar resizing when scrolling */
body:not(.notTopArrived) header.rp-nav {
border-bottom: 1px solid transparent !important;
}
/* fix the small logo on the top left looking blurry */
.rspress-logo {
height: 32px;
}