mirror of
https://forgejo.ellis.link/continuwuation/continuwuity/
synced 2026-04-12 16:35:39 +00:00
Compare commits
39 Commits
ginger/dat
...
nex/fix/fe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
49ce6b4072 | ||
|
|
f6ad1787a0 | ||
|
|
81ff8f1bd3 | ||
|
|
04980b3ee7 | ||
|
|
1237e60aaf | ||
|
|
9b4845bf8d | ||
|
|
fb5b515f96 | ||
|
|
e6336d694a | ||
|
|
b7841280d9 | ||
|
|
f4ccb81913 | ||
|
|
710cdfeadb | ||
|
|
666849ea87 | ||
|
|
71094803f1 | ||
|
|
bf91ce5c7f | ||
|
|
8fd15f26ce | ||
|
|
705fa6c5c6 | ||
|
|
6f67c27538 | ||
|
|
8586d747d1 | ||
|
|
11012a9ce1 | ||
|
|
07be190507 | ||
|
|
ae4acc9568 | ||
|
|
f83ddecd8c | ||
|
|
dd87232f1f | ||
|
|
8e33f9a7d0 | ||
|
|
8d3e4eba99 | ||
|
|
96bfdb97da | ||
|
|
b61010da47 | ||
|
|
987c5eeb03 | ||
|
|
7fa4fa9862 | ||
|
|
b2bead67ac | ||
|
|
48a6a475ce | ||
|
|
86450da705 | ||
|
|
8538b21860 | ||
|
|
63e4aacd2b | ||
|
|
72f0eb9493 | ||
|
|
867d0ab671 | ||
|
|
64e187e5b4 | ||
|
|
5dc449a87a | ||
|
|
f5fda01013 |
@@ -32,11 +32,13 @@ outputs:
|
||||
runs:
|
||||
using: composite
|
||||
steps:
|
||||
- run: mkdir -p digests
|
||||
shell: bash
|
||||
- name: Download digests
|
||||
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
|
||||
uses: forgejo/download-artifact@v4
|
||||
with:
|
||||
path: /tmp/digests
|
||||
path: digests
|
||||
pattern: ${{ inputs.digest_pattern }}
|
||||
merge-multiple: true
|
||||
|
||||
@@ -78,7 +80,7 @@ runs:
|
||||
|
||||
- name: Create manifest list and push
|
||||
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
|
||||
working-directory: /tmp/digests
|
||||
working-directory: digests
|
||||
shell: bash
|
||||
env:
|
||||
IMAGES: ${{ inputs.images }}
|
||||
|
||||
@@ -54,7 +54,7 @@ runs:
|
||||
run: mv /tmp/binaries/sbin/conduwuit /tmp/binaries/conduwuit${{ inputs.cpu_suffix }}-${{ inputs.slug }}${{ inputs.artifact_suffix }}
|
||||
|
||||
- name: Upload binary artifact
|
||||
uses: forgejo/upload-artifact@v3
|
||||
uses: forgejo/upload-artifact@v4
|
||||
with:
|
||||
name: conduwuit${{ inputs.cpu_suffix }}-${{ inputs.slug }}${{ inputs.artifact_suffix }}
|
||||
path: /tmp/binaries/conduwuit${{ inputs.cpu_suffix }}-${{ inputs.slug }}${{ inputs.artifact_suffix }}
|
||||
@@ -62,7 +62,7 @@ runs:
|
||||
|
||||
- name: Upload digest
|
||||
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
|
||||
uses: forgejo/upload-artifact@v3
|
||||
uses: forgejo/upload-artifact@v4
|
||||
with:
|
||||
name: digests${{ inputs.digest_suffix }}-${{ inputs.slug }}${{ inputs.cpu_suffix }}
|
||||
path: /tmp/digests/*
|
||||
|
||||
@@ -127,7 +127,7 @@ jobs:
|
||||
[ -f /etc/conduwuit/conduwuit.toml ] && echo "✅ Config file installed"
|
||||
|
||||
- name: Upload deb artifact
|
||||
uses: actions/upload-artifact@v3
|
||||
uses: forgejo/upload-artifact@v4
|
||||
with:
|
||||
name: continuwuity-${{ steps.debian-version.outputs.distribution }}
|
||||
path: ${{ steps.cargo-deb.outputs.path }}
|
||||
|
||||
@@ -239,13 +239,13 @@ jobs:
|
||||
cp $BIN_RPM upload-bin/
|
||||
|
||||
- name: Upload binary RPM
|
||||
uses: actions/upload-artifact@v3
|
||||
uses: forgejo/upload-artifact@v4
|
||||
with:
|
||||
name: continuwuity
|
||||
path: upload-bin/
|
||||
|
||||
- name: Upload debug RPM artifact
|
||||
uses: actions/upload-artifact@v3
|
||||
uses: forgejo/upload-artifact@v4
|
||||
with:
|
||||
name: continuwuity-debug
|
||||
path: artifacts/*debuginfo*.rpm
|
||||
|
||||
@@ -109,7 +109,7 @@ jobs:
|
||||
cat ./element-web/webapp/config.json
|
||||
|
||||
- name: 📤 Upload Artifact
|
||||
uses: forgejo/upload-artifact@v3
|
||||
uses: forgejo/upload-artifact@v4
|
||||
with:
|
||||
name: element-web
|
||||
path: ./element-web/webapp/
|
||||
|
||||
4
.mailmap
4
.mailmap
@@ -2,6 +2,7 @@ AlexPewMaster <git@alex.unbox.at> <68469103+AlexPewMaster@users.noreply.github.c
|
||||
Daniel Wiesenberg <weasy@hotmail.de> <weasy666@gmail.com>
|
||||
Devin Ragotzy <devin.ragotzy@gmail.com> <d6ragotzy@wmich.edu>
|
||||
Devin Ragotzy <devin.ragotzy@gmail.com> <dragotzy7460@mail.kvcc.edu>
|
||||
Ginger <ginger@gingershaped.computer> <75683114+gingershaped@users.noreply.github.com>
|
||||
Jonas Platte <jplatte+git@posteo.de> <jplatte+gitlab@posteo.de>
|
||||
Jonas Zohren <git-pbkyr@jzohren.de> <gitlab-jfowl-0ux98@sh14.de>
|
||||
Jonathan de Jong <jonathan@automatia.nl> <jonathandejong02@gmail.com>
|
||||
@@ -12,5 +13,6 @@ Olivia Lee <olivia@computer.surgery> <benjamin@computer.surgery>
|
||||
Rudi Floren <rudi.floren@gmail.com> <rudi.floren@googlemail.com>
|
||||
Tamara Schmitz <tamara.zoe.schmitz@posteo.de> <15906939+tamara-schmitz@users.noreply.github.com>
|
||||
Timo Kösters <timo@koesters.xyz>
|
||||
nexy7574 <git@nexy7574.co.uk> <nex@noreply.forgejo.ellis.link>
|
||||
nexy7574 <git@nexy7574.co.uk> <nex@noreply.localhost>
|
||||
x4u <xi.zhu@protonmail.ch> <14617923-x4u@users.noreply.gitlab.com>
|
||||
Ginger <ginger@gingershaped.computer> <75683114+gingershaped@users.noreply.github.com>
|
||||
|
||||
46
Cargo.lock
generated
46
Cargo.lock
generated
@@ -940,7 +940,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "conduwuit"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"conduwuit_admin",
|
||||
@@ -972,7 +972,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "conduwuit_admin"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"conduwuit_api",
|
||||
@@ -994,7 +994,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "conduwuit_api"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum 0.7.9",
|
||||
@@ -1027,14 +1027,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "conduwuit_build_metadata"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"built",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "conduwuit_core"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"argon2",
|
||||
"arrayvec",
|
||||
@@ -1095,7 +1095,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "conduwuit_database"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"conduwuit_core",
|
||||
@@ -1114,7 +1114,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "conduwuit_macros"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"itertools 0.14.0",
|
||||
"proc-macro2",
|
||||
@@ -1124,7 +1124,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "conduwuit_router"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"axum 0.7.9",
|
||||
"axum-client-ip",
|
||||
@@ -1159,7 +1159,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "conduwuit_service"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
@@ -1200,7 +1200,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "conduwuit_web"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"askama",
|
||||
"axum 0.7.9",
|
||||
@@ -4063,7 +4063,7 @@ checksum = "88f8660c1ff60292143c98d08fc6e2f654d722db50410e3f3797d40baaf9d8f3"
|
||||
[[package]]
|
||||
name = "ruma"
|
||||
version = "0.10.1"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
|
||||
dependencies = [
|
||||
"assign",
|
||||
"js_int",
|
||||
@@ -4083,7 +4083,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-appservice-api"
|
||||
version = "0.10.0"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
|
||||
dependencies = [
|
||||
"js_int",
|
||||
"ruma-common",
|
||||
@@ -4095,7 +4095,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-client-api"
|
||||
version = "0.18.0"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
|
||||
dependencies = [
|
||||
"as_variant",
|
||||
"assign",
|
||||
@@ -4118,7 +4118,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-common"
|
||||
version = "0.13.0"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
|
||||
dependencies = [
|
||||
"as_variant",
|
||||
"base64 0.22.1",
|
||||
@@ -4150,7 +4150,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-events"
|
||||
version = "0.28.1"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
|
||||
dependencies = [
|
||||
"as_variant",
|
||||
"indexmap",
|
||||
@@ -4175,7 +4175,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-federation-api"
|
||||
version = "0.9.0"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"headers",
|
||||
@@ -4197,7 +4197,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-identifiers-validation"
|
||||
version = "0.9.5"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
|
||||
dependencies = [
|
||||
"js_int",
|
||||
"thiserror 2.0.17",
|
||||
@@ -4206,7 +4206,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-identity-service-api"
|
||||
version = "0.9.0"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
|
||||
dependencies = [
|
||||
"js_int",
|
||||
"ruma-common",
|
||||
@@ -4216,7 +4216,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-macros"
|
||||
version = "0.13.0"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"proc-macro-crate",
|
||||
@@ -4231,7 +4231,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-push-gateway-api"
|
||||
version = "0.9.0"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
|
||||
dependencies = [
|
||||
"js_int",
|
||||
"ruma-common",
|
||||
@@ -4243,7 +4243,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "ruma-signatures"
|
||||
version = "0.15.0"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=50b2a91b2ab8f9830eea80b9911e11234e0eac66#50b2a91b2ab8f9830eea80b9911e11234e0eac66"
|
||||
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=3a6f29fda2c3ccf07282c746dc0e2021defc50bb#3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"ed25519-dalek",
|
||||
@@ -6204,7 +6204,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "xtask"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"serde",
|
||||
@@ -6213,7 +6213,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "xtask-generate-commands"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"clap-markdown",
|
||||
"clap_builder",
|
||||
|
||||
126
Cargo.toml
126
Cargo.toml
@@ -21,7 +21,7 @@ license = "Apache-2.0"
|
||||
readme = "README.md"
|
||||
repository = "https://forgejo.ellis.link/continuwuation/continuwuity"
|
||||
rust-version = "1.86.0"
|
||||
version = "0.5.0-rc.8.1"
|
||||
version = "0.5.0"
|
||||
|
||||
[workspace.metadata.crane]
|
||||
name = "conduwuit"
|
||||
@@ -33,11 +33,11 @@ features = ["serde"]
|
||||
[workspace.dependencies.smallvec]
|
||||
version = "1.14.0"
|
||||
features = [
|
||||
"const_generics",
|
||||
"const_new",
|
||||
"serde",
|
||||
"union",
|
||||
"write",
|
||||
"const_generics",
|
||||
"const_new",
|
||||
"serde",
|
||||
"union",
|
||||
"write",
|
||||
]
|
||||
|
||||
[workspace.dependencies.smallstr]
|
||||
@@ -96,13 +96,13 @@ version = "1.11.1"
|
||||
version = "0.7.9"
|
||||
default-features = false
|
||||
features = [
|
||||
"form",
|
||||
"http1",
|
||||
"http2",
|
||||
"json",
|
||||
"matched-path",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"form",
|
||||
"http1",
|
||||
"http2",
|
||||
"json",
|
||||
"matched-path",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[workspace.dependencies.axum-extra]
|
||||
@@ -149,10 +149,10 @@ features = ["aws_lc_rs"]
|
||||
version = "0.12.15"
|
||||
default-features = false
|
||||
features = [
|
||||
"rustls-tls-native-roots",
|
||||
"socks",
|
||||
"hickory-dns",
|
||||
"http2",
|
||||
"rustls-tls-native-roots",
|
||||
"socks",
|
||||
"hickory-dns",
|
||||
"http2",
|
||||
]
|
||||
|
||||
[workspace.dependencies.serde]
|
||||
@@ -188,18 +188,18 @@ default-features = false
|
||||
version = "0.25.5"
|
||||
default-features = false
|
||||
features = [
|
||||
"jpeg",
|
||||
"png",
|
||||
"gif",
|
||||
"webp",
|
||||
"jpeg",
|
||||
"png",
|
||||
"gif",
|
||||
"webp",
|
||||
]
|
||||
|
||||
[workspace.dependencies.blurhash]
|
||||
version = "0.2.3"
|
||||
default-features = false
|
||||
features = [
|
||||
"fast-linear-to-srgb",
|
||||
"image",
|
||||
"fast-linear-to-srgb",
|
||||
"image",
|
||||
]
|
||||
|
||||
# logging
|
||||
@@ -229,13 +229,13 @@ default-features = false
|
||||
version = "4.5.35"
|
||||
default-features = false
|
||||
features = [
|
||||
"derive",
|
||||
"env",
|
||||
"error-context",
|
||||
"help",
|
||||
"std",
|
||||
"string",
|
||||
"usage",
|
||||
"derive",
|
||||
"env",
|
||||
"error-context",
|
||||
"help",
|
||||
"std",
|
||||
"string",
|
||||
"usage",
|
||||
]
|
||||
|
||||
[workspace.dependencies.futures]
|
||||
@@ -247,15 +247,15 @@ features = ["std", "async-await"]
|
||||
version = "1.44.2"
|
||||
default-features = false
|
||||
features = [
|
||||
"fs",
|
||||
"net",
|
||||
"macros",
|
||||
"sync",
|
||||
"signal",
|
||||
"time",
|
||||
"rt-multi-thread",
|
||||
"io-util",
|
||||
"tracing",
|
||||
"fs",
|
||||
"net",
|
||||
"macros",
|
||||
"sync",
|
||||
"signal",
|
||||
"time",
|
||||
"rt-multi-thread",
|
||||
"io-util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[workspace.dependencies.tokio-metrics]
|
||||
@@ -280,18 +280,18 @@ default-features = false
|
||||
version = "1.6.0"
|
||||
default-features = false
|
||||
features = [
|
||||
"server",
|
||||
"http1",
|
||||
"http2",
|
||||
"server",
|
||||
"http1",
|
||||
"http2",
|
||||
]
|
||||
|
||||
[workspace.dependencies.hyper-util]
|
||||
version = "=0.1.17"
|
||||
default-features = false
|
||||
features = [
|
||||
"server-auto",
|
||||
"server-graceful",
|
||||
"tokio",
|
||||
"server-auto",
|
||||
"server-graceful",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
# to support multiple variations of setting a config option
|
||||
@@ -310,9 +310,9 @@ features = ["env", "toml"]
|
||||
version = "0.25.1"
|
||||
default-features = false
|
||||
features = [
|
||||
"serde",
|
||||
"system-config",
|
||||
"tokio",
|
||||
"serde",
|
||||
"system-config",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
# Used for conduwuit::Error type
|
||||
@@ -351,7 +351,7 @@ version = "0.1.2"
|
||||
# Used for matrix spec type definitions and helpers
|
||||
[workspace.dependencies.ruma]
|
||||
git = "https://forgejo.ellis.link/continuwuation/ruwuma"
|
||||
rev = "50b2a91b2ab8f9830eea80b9911e11234e0eac66"
|
||||
rev = "3a6f29fda2c3ccf07282c746dc0e2021defc50bb"
|
||||
features = [
|
||||
"compat",
|
||||
"rand",
|
||||
@@ -381,13 +381,13 @@ features = [
|
||||
"unstable-msc4095",
|
||||
"unstable-msc4121",
|
||||
"unstable-msc4125",
|
||||
"unstable-msc4155",
|
||||
"unstable-msc4155",
|
||||
"unstable-msc4186",
|
||||
"unstable-msc4203", # sending to-device events to appservices
|
||||
"unstable-msc4210", # remove legacy mentions
|
||||
"unstable-extensible-events",
|
||||
"unstable-pdu",
|
||||
"unstable-msc4155"
|
||||
"unstable-msc4155"
|
||||
]
|
||||
|
||||
[workspace.dependencies.rust-rocksdb]
|
||||
@@ -395,11 +395,11 @@ git = "https://forgejo.ellis.link/continuwuation/rust-rocksdb-zaidoon1"
|
||||
rev = "61d9d23872197e9ace4a477f2617d5c9f50ecb23"
|
||||
default-features = false
|
||||
features = [
|
||||
"multi-threaded-cf",
|
||||
"mt_static",
|
||||
"lz4",
|
||||
"zstd",
|
||||
"bzip2",
|
||||
"multi-threaded-cf",
|
||||
"mt_static",
|
||||
"lz4",
|
||||
"zstd",
|
||||
"bzip2",
|
||||
]
|
||||
|
||||
[workspace.dependencies.sha2]
|
||||
@@ -458,16 +458,16 @@ git = "https://forgejo.ellis.link/continuwuation/jemallocator"
|
||||
rev = "82af58d6a13ddd5dcdc7d4e91eae3b63292995b8"
|
||||
default-features = false
|
||||
features = [
|
||||
"background_threads_runtime_support",
|
||||
"unprefixed_malloc_on_supported_platforms",
|
||||
"background_threads_runtime_support",
|
||||
"unprefixed_malloc_on_supported_platforms",
|
||||
]
|
||||
[workspace.dependencies.tikv-jemallocator]
|
||||
git = "https://forgejo.ellis.link/continuwuation/jemallocator"
|
||||
rev = "82af58d6a13ddd5dcdc7d4e91eae3b63292995b8"
|
||||
default-features = false
|
||||
features = [
|
||||
"background_threads_runtime_support",
|
||||
"unprefixed_malloc_on_supported_platforms",
|
||||
"background_threads_runtime_support",
|
||||
"unprefixed_malloc_on_supported_platforms",
|
||||
]
|
||||
[workspace.dependencies.tikv-jemalloc-ctl]
|
||||
git = "https://forgejo.ellis.link/continuwuation/jemallocator"
|
||||
@@ -491,9 +491,9 @@ default-features = false
|
||||
version = "0.1.2"
|
||||
default-features = false
|
||||
features = [
|
||||
"static",
|
||||
"gcc",
|
||||
"light",
|
||||
"static",
|
||||
"gcc",
|
||||
"light",
|
||||
]
|
||||
|
||||
[workspace.dependencies.rustyline-async]
|
||||
|
||||
@@ -586,10 +586,13 @@
|
||||
#allow_unstable_room_versions = true
|
||||
|
||||
# Default room version continuwuity will create rooms with.
|
||||
# Note that this has to be a string since the room version is a string
|
||||
# rather than an integer. Forgetting the quotes will make the server fail
|
||||
# to start!
|
||||
#
|
||||
# Per spec, room version 11 is the default.
|
||||
# Per spec, room version "11" is the default.
|
||||
#
|
||||
#default_room_version = 11
|
||||
#default_room_version = "11"
|
||||
|
||||
# Enable OpenTelemetry OTLP tracing export. This replaces the deprecated
|
||||
# Jaeger exporter. Traces will be sent via OTLP to a collector (such as
|
||||
@@ -1512,16 +1515,11 @@
|
||||
#
|
||||
#block_non_admin_invites = false
|
||||
|
||||
# Enable or disable making requests to MSC4284 Policy Servers.
|
||||
# It is recommended you keep this enabled unless you experience frequent
|
||||
# connectivity issues, such as in a restricted networking environment.
|
||||
# This item is undocumented. Please contribute documentation for it.
|
||||
#
|
||||
#enable_msc4284_policy_servers = true
|
||||
|
||||
# Enable running locally generated events through configured MSC4284
|
||||
# policy servers. You may wish to disable this if your server is
|
||||
# single-user for a slight speed benefit in some rooms, but otherwise
|
||||
# should leave it enabled.
|
||||
# This item is undocumented. Please contribute documentation for it.
|
||||
#
|
||||
#policy_server_check_own_events = true
|
||||
|
||||
@@ -1736,6 +1734,12 @@
|
||||
#
|
||||
#ldap = false
|
||||
|
||||
# Configuration for protocol experiments that enable experimental
|
||||
# features. Each one is associated with a matrix spec proposal, a list of
|
||||
# which are published at https://spec.matrix.org/proposals/
|
||||
#
|
||||
#experiments = false
|
||||
|
||||
[global.tls]
|
||||
|
||||
# Path to a valid TLS certificate file.
|
||||
|
||||
@@ -6,12 +6,10 @@
|
||||
"message": "Welcome to Continuwuity! Important announcements about the project will appear here."
|
||||
},
|
||||
{
|
||||
"id": 3,
|
||||
"message": "_taps microphone_ The Continuwuity 0.5.0-rc.7 release is now available, and it's better than ever! **177 commits**, **35 pull requests**, **11 contributors,** and a lot of new stuff!\n\nFor highlights, we've got:\n\n* 🕵️ Full Policy Server support to fight spam!\n* 🚀 Smarter room & space upgrades.\n* 🚫 User suspension tools for better moderation.\n* 🤖 reCaptcha support for safer open registration.\n* 🔍 Ability to disable read receipts & typing indicators.\n* ⚡ Sweeping performance improvements!\n\nGet the [full changelog and downloads on our Forgejo](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.0-rc.7) - and make sure you're in the [Announcements room](https://matrix.to/#/!releases:continuwuity.org/$hN9z6L2_dTAlPxFLAoXVfo_g8DyYXu4cpvWsSrWhmB0) to get stuff like this sooner."
|
||||
},
|
||||
{
|
||||
"id": 5,
|
||||
"message": "It's a bird! It's a plane! No, it's 0.5.0-rc.8.1!\n\nThis is a minor bugfix update to the rc8 which backports some important fixes from the latest main branch. If you still haven't updated to rc8, you should skip to main. Otherwise, you should upgrade to this bugfix release as soon as possible.\n\nBugfixes backported to this version:\n\n- Resolved several issues with state resolution v2.1 (room version 12)\n- Fixed issues with the `restricted` and `knock_restricted` join rules that would sometimes incorrectly disallow a valid join\n- Fixed the automatic support contact listing being a no-op\n- Fixed upgrading pre-v12 rooms to v12 rooms\n- Fixed policy servers sending the incorrect JSON objects (resulted in false positives)\n- Fixed debug build panic during MSC4133 migration\n\nIt is recommended, if you can and are comfortable with doing so, following updates to the main branch - we're in the run up to the full 0.5.0 release, and more and more bugfixes and new features are being pushed constantly. Please don't forget to join [#announcements:continuwuity.org](https://matrix.to/#/#announcements:continuwuity.org) to receive this news faster and be alerted to other important updates!"
|
||||
"id": 6,
|
||||
"mention_room": true,
|
||||
"date": "2025-12-22",
|
||||
"message": "Continuwuity v0.5.0 has been released. **The release contains a fix for the critical vulnerability [GHSA-22fw-4jq7-g8r8](https://github.com/continuwuity/continuwuity/security/advisories/GHSA-22fw-4jq7-g8r8). Update as soon as possible.**\n\nThis has been *actively exploited* to create fake leave events in the Continuwuity rooms. Please leave and rejoin the rooms to fix any issues this may have caused. \n\n - [Continuwuity (space)](https://matrix.to/#/!PxtzompFuodlyzdCDtV5lzjXs10XIHeOOaq_FYodHyk?via=ellis.link&via=gingershaped.computer&via=continuwuity.org)\n - [Continuwuity](https://matrix.to/#/!kn3VQSLcgWGUFm0FFRid4MinJ_aeZPjHQ0irXbHa3bU?via=ellis.link&via=gingershaped.computer&via=continuwuity.org)\n - [Continuwuity Announcements](https://matrix.to/#/!d7zDZg1Vu5nhkCi50jNfOIObD5fpfGhfl48SZWZek7k?via=ellis.link)\n - [Continuwuity Offtopic](https://matrix.to/#/!QlOomq-suHC9rJHfDFVdbcGg4HS2ojSQ0bo4W2JOGMM?via=ellis.link&via=gingershaped.computer&via=continuwuity.org)\n - [Continuwuity Development](https://matrix.to/#/!aAvealFbgiKTJGzumNbjuwDgt1tOkBKwiyfYqE3ouk0?via=ellis.link&via=explodie.org&via=continuwuity.org)\n"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
54
flake.lock
generated
54
flake.lock
generated
@@ -3,11 +3,11 @@
|
||||
"advisory-db": {
|
||||
"flake": false,
|
||||
"locked": {
|
||||
"lastModified": 1761112158,
|
||||
"narHash": "sha256-RIXu/7eyKpQHjsPuAUODO81I4ni8f+WYSb7K4mTG6+0=",
|
||||
"lastModified": 1766324728,
|
||||
"narHash": "sha256-9C+WyE5U3y5w4WQXxmb0ylRyMMsPyzxielWXSHrcDpE=",
|
||||
"owner": "rustsec",
|
||||
"repo": "advisory-db",
|
||||
"rev": "58f3aaec0e1776f4a900737be8cd7cb00972210d",
|
||||
"rev": "c88b88c62bda077be8aa621d4e89d8701e39cb5d",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@@ -18,11 +18,11 @@
|
||||
},
|
||||
"crane": {
|
||||
"locked": {
|
||||
"lastModified": 1760924934,
|
||||
"narHash": "sha256-tuuqY5aU7cUkR71sO2TraVKK2boYrdW3gCSXUkF4i44=",
|
||||
"lastModified": 1766194365,
|
||||
"narHash": "sha256-4AFsUZ0kl6MXSm4BaQgItD0VGlEKR3iq7gIaL7TjBvc=",
|
||||
"owner": "ipetkov",
|
||||
"repo": "crane",
|
||||
"rev": "c6b4d5308293d0d04fcfeee92705017537cad02f",
|
||||
"rev": "7d8ec2c71771937ab99790b45e6d9b93d15d9379",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@@ -39,11 +39,11 @@
|
||||
"rust-analyzer-src": "rust-analyzer-src"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1761115517,
|
||||
"narHash": "sha256-Fev/ag/c3Fp3JBwHfup3lpA5FlNXfkoshnQ7dssBgJ0=",
|
||||
"lastModified": 1766299592,
|
||||
"narHash": "sha256-7u+q5hexu2eAxL2VjhskHvaUKg+GexmelIR2ve9Nbb4=",
|
||||
"owner": "nix-community",
|
||||
"repo": "fenix",
|
||||
"rev": "320433651636186ea32b387cff05d6bbfa30cea7",
|
||||
"rev": "381579dee168d5ced412e2990e9637ecc7cf1c5d",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@@ -55,11 +55,11 @@
|
||||
"flake-compat": {
|
||||
"flake": false,
|
||||
"locked": {
|
||||
"lastModified": 1747046372,
|
||||
"narHash": "sha256-CIVLLkVgvHYbgI2UpXvIIBJ12HWgX+fjA8Xf8PUmqCY=",
|
||||
"lastModified": 1765121682,
|
||||
"narHash": "sha256-4VBOP18BFeiPkyhy9o4ssBNQEvfvv1kXkasAYd0+rrA=",
|
||||
"owner": "edolstra",
|
||||
"repo": "flake-compat",
|
||||
"rev": "9100a0f413b0c601e0533d1d94ffd501ce2e7885",
|
||||
"rev": "65f23138d8d09a92e30f1e5c87611b23ef451bf3",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@@ -74,11 +74,11 @@
|
||||
"nixpkgs-lib": "nixpkgs-lib"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1760948891,
|
||||
"narHash": "sha256-TmWcdiUUaWk8J4lpjzu4gCGxWY6/Ok7mOK4fIFfBuU4=",
|
||||
"lastModified": 1765835352,
|
||||
"narHash": "sha256-XswHlK/Qtjasvhd1nOa1e8MgZ8GS//jBoTqWtrS1Giw=",
|
||||
"owner": "hercules-ci",
|
||||
"repo": "flake-parts",
|
||||
"rev": "864599284fc7c0ba6357ed89ed5e2cd5040f0c04",
|
||||
"rev": "a34fae9c08a15ad73f295041fec82323541400a9",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@@ -89,11 +89,11 @@
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1760878510,
|
||||
"narHash": "sha256-K5Osef2qexezUfs0alLvZ7nQFTGS9DL2oTVsIXsqLgs=",
|
||||
"lastModified": 1766070988,
|
||||
"narHash": "sha256-G/WVghka6c4bAzMhTwT2vjLccg/awmHkdKSd2JrycLc=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "5e2a59a5b1a82f89f2c7e598302a9cacebb72a67",
|
||||
"rev": "c6245e83d836d0433170a16eb185cefe0572f8b8",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@@ -105,11 +105,11 @@
|
||||
},
|
||||
"nixpkgs-lib": {
|
||||
"locked": {
|
||||
"lastModified": 1754788789,
|
||||
"narHash": "sha256-x2rJ+Ovzq0sCMpgfgGaaqgBSwY+LST+WbZ6TytnT9Rk=",
|
||||
"lastModified": 1765674936,
|
||||
"narHash": "sha256-k00uTP4JNfmejrCLJOwdObYC9jHRrr/5M/a/8L2EIdo=",
|
||||
"owner": "nix-community",
|
||||
"repo": "nixpkgs.lib",
|
||||
"rev": "a73b9c743612e4244d865a2fdee11865283c04e6",
|
||||
"rev": "2075416fcb47225d9b68ac469a5c4801a9c4dd85",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@@ -132,11 +132,11 @@
|
||||
"rust-analyzer-src": {
|
||||
"flake": false,
|
||||
"locked": {
|
||||
"lastModified": 1761077270,
|
||||
"narHash": "sha256-O1uTuvI/rUlubJ8AXKyzh1WSWV3qCZX0huTFUvWLN4E=",
|
||||
"lastModified": 1766253897,
|
||||
"narHash": "sha256-ChK07B1aOlJ4QzWXpJo+y8IGAxp1V9yQ2YloJ+RgHRw=",
|
||||
"owner": "rust-lang",
|
||||
"repo": "rust-analyzer",
|
||||
"rev": "39990a923c8bca38f5bd29dc4c96e20ee7808d5d",
|
||||
"rev": "765b7bdb432b3740f2d564afccfae831d5a972e4",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@@ -153,11 +153,11 @@
|
||||
]
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1760945191,
|
||||
"narHash": "sha256-ZRVs8UqikBa4Ki3X4KCnMBtBW0ux1DaT35tgsnB1jM4=",
|
||||
"lastModified": 1766000401,
|
||||
"narHash": "sha256-+cqN4PJz9y0JQXfAK5J1drd0U05D5fcAGhzhfVrDlsI=",
|
||||
"owner": "numtide",
|
||||
"repo": "treefmt-nix",
|
||||
"rev": "f56b1934f5f8fcab8deb5d38d42fd692632b47c2",
|
||||
"rev": "42d96e75aa56a3f70cab7e7dc4a32868db28e8fd",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
|
||||
@@ -6,6 +6,69 @@
|
||||
pkgs,
|
||||
...
|
||||
}:
|
||||
let
|
||||
baseTestScript =
|
||||
pkgs.writers.writePython3Bin "do_test" { libraries = [ pkgs.python3Packages.matrix-nio ]; }
|
||||
''
|
||||
import asyncio
|
||||
import nio
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
# Connect to continuwuity
|
||||
client = nio.AsyncClient("http://continuwuity:6167", "alice")
|
||||
|
||||
# Register as user alice
|
||||
response = await client.register("alice", "my-secret-password")
|
||||
|
||||
# Log in as user alice
|
||||
response = await client.login("my-secret-password")
|
||||
|
||||
# Create a new room
|
||||
response = await client.room_create(federate=False)
|
||||
print("Matrix room create response:", response)
|
||||
assert isinstance(response, nio.RoomCreateResponse)
|
||||
room_id = response.room_id
|
||||
|
||||
# Join the room
|
||||
response = await client.join(room_id)
|
||||
print("Matrix join response:", response)
|
||||
assert isinstance(response, nio.JoinResponse)
|
||||
|
||||
# Send a message to the room
|
||||
response = await client.room_send(
|
||||
room_id=room_id,
|
||||
message_type="m.room.message",
|
||||
content={
|
||||
"msgtype": "m.text",
|
||||
"body": "Hello continuwuity!"
|
||||
}
|
||||
)
|
||||
print("Matrix room send response:", response)
|
||||
assert isinstance(response, nio.RoomSendResponse)
|
||||
|
||||
# Sync responses
|
||||
response = await client.sync(timeout=30000)
|
||||
print("Matrix sync response:", response)
|
||||
assert isinstance(response, nio.SyncResponse)
|
||||
|
||||
# Check the message was received by continuwuity
|
||||
last_message = response.rooms.join[room_id].timeline.events[-1].body
|
||||
assert last_message == "Hello continuwuity!"
|
||||
|
||||
# Leave the room
|
||||
response = await client.room_leave(room_id)
|
||||
print("Matrix room leave response:", response)
|
||||
assert isinstance(response, nio.RoomLeaveResponse)
|
||||
|
||||
# Close the client
|
||||
await client.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
'';
|
||||
in
|
||||
{
|
||||
# run some nixos tests as checks
|
||||
checks = lib.pipe self'.packages [
|
||||
@@ -18,106 +81,69 @@
|
||||
# this test was initially yoinked from
|
||||
#
|
||||
# https://github.com/NixOS/nixpkgs/blob/960ce26339661b1b69c6f12b9063ca51b688615f/nixos/tests/matrix/continuwuity.nix
|
||||
(builtins.map (name: {
|
||||
name = "test-${name}";
|
||||
value = pkgs.testers.runNixOSTest {
|
||||
inherit name;
|
||||
(builtins.concatMap (
|
||||
name:
|
||||
builtins.map
|
||||
(
|
||||
{ config, suffix }:
|
||||
{
|
||||
name = "test-${name}-${suffix}";
|
||||
value = pkgs.testers.runNixOSTest {
|
||||
inherit name;
|
||||
|
||||
nodes = {
|
||||
continuwuity = {
|
||||
services.matrix-continuwuity = {
|
||||
enable = true;
|
||||
package = self'.packages.${name};
|
||||
settings.global = {
|
||||
nodes = {
|
||||
continuwuity = {
|
||||
services.matrix-continuwuity = {
|
||||
enable = true;
|
||||
package = self'.packages.${name};
|
||||
settings = config;
|
||||
extraEnvironment.RUST_BACKTRACE = "yes";
|
||||
};
|
||||
networking.firewall.allowedTCPPorts = [ 6167 ];
|
||||
};
|
||||
client.environment.systemPackages = [ baseTestScript ];
|
||||
};
|
||||
|
||||
testScript = ''
|
||||
start_all()
|
||||
|
||||
with subtest("start continuwuity"):
|
||||
continuwuity.wait_for_unit("continuwuity.service")
|
||||
continuwuity.wait_for_open_port(6167)
|
||||
|
||||
with subtest("ensure messages can be exchanged"):
|
||||
client.succeed("${lib.getExe baseTestScript} >&2")
|
||||
'';
|
||||
|
||||
};
|
||||
}
|
||||
)
|
||||
[
|
||||
{
|
||||
suffix = "base";
|
||||
config = {
|
||||
global = {
|
||||
server_name = name;
|
||||
address = [ "0.0.0.0" ];
|
||||
allow_registration = true;
|
||||
yes_i_am_very_very_sure_i_want_an_open_registration_server_prone_to_abuse = true;
|
||||
};
|
||||
extraEnvironment.RUST_BACKTRACE = "yes";
|
||||
};
|
||||
networking.firewall.allowedTCPPorts = [ 6167 ];
|
||||
};
|
||||
client =
|
||||
{ pkgs, ... }:
|
||||
{
|
||||
environment.systemPackages = [
|
||||
(pkgs.writers.writePython3Bin "do_test" { libraries = [ pkgs.python3Packages.matrix-nio ]; } ''
|
||||
import asyncio
|
||||
import nio
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
# Connect to continuwuity
|
||||
client = nio.AsyncClient("http://continuwuity:6167", "alice")
|
||||
|
||||
# Register as user alice
|
||||
response = await client.register("alice", "my-secret-password")
|
||||
|
||||
# Log in as user alice
|
||||
response = await client.login("my-secret-password")
|
||||
|
||||
# Create a new room
|
||||
response = await client.room_create(federate=False)
|
||||
print("Matrix room create response:", response)
|
||||
assert isinstance(response, nio.RoomCreateResponse)
|
||||
room_id = response.room_id
|
||||
|
||||
# Join the room
|
||||
response = await client.join(room_id)
|
||||
print("Matrix join response:", response)
|
||||
assert isinstance(response, nio.JoinResponse)
|
||||
|
||||
# Send a message to the room
|
||||
response = await client.room_send(
|
||||
room_id=room_id,
|
||||
message_type="m.room.message",
|
||||
content={
|
||||
"msgtype": "m.text",
|
||||
"body": "Hello continuwuity!"
|
||||
}
|
||||
)
|
||||
print("Matrix room send response:", response)
|
||||
assert isinstance(response, nio.RoomSendResponse)
|
||||
|
||||
# Sync responses
|
||||
response = await client.sync(timeout=30000)
|
||||
print("Matrix sync response:", response)
|
||||
assert isinstance(response, nio.SyncResponse)
|
||||
|
||||
# Check the message was received by continuwuity
|
||||
last_message = response.rooms.join[room_id].timeline.events[-1].body
|
||||
assert last_message == "Hello continuwuity!"
|
||||
|
||||
# Leave the room
|
||||
response = await client.room_leave(room_id)
|
||||
print("Matrix room leave response:", response)
|
||||
assert isinstance(response, nio.RoomLeaveResponse)
|
||||
|
||||
# Close the client
|
||||
await client.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
'')
|
||||
];
|
||||
}
|
||||
{
|
||||
suffix = "with-room-version";
|
||||
config = {
|
||||
global = {
|
||||
server_name = name;
|
||||
address = [ "0.0.0.0" ];
|
||||
allow_registration = true;
|
||||
yes_i_am_very_very_sure_i_want_an_open_registration_server_prone_to_abuse = true;
|
||||
default_room_version = "12";
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
testScript = ''
|
||||
start_all()
|
||||
|
||||
with subtest("start continuwuity"):
|
||||
continuwuity.wait_for_unit("continuwuity.service")
|
||||
continuwuity.wait_for_open_port(6167)
|
||||
|
||||
with subtest("ensure messages can be exchanged"):
|
||||
client.succeed("do_test >&2")
|
||||
'';
|
||||
|
||||
};
|
||||
}))
|
||||
}
|
||||
]
|
||||
))
|
||||
builtins.listToAttrs
|
||||
];
|
||||
};
|
||||
|
||||
@@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result {
|
||||
.services
|
||||
.rooms
|
||||
.timeline
|
||||
.last_timeline_count(None, &room_id)
|
||||
.last_timeline_count(&room_id)
|
||||
.await?;
|
||||
|
||||
self.write_str(&format!("{result:#?}")).await
|
||||
@@ -52,7 +52,7 @@ pub(super) async fn pdus(
|
||||
.services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(None, &room_id, from)
|
||||
.pdus_rev(&room_id, from)
|
||||
.try_take(limit.unwrap_or(3))
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
||||
@@ -30,10 +30,31 @@ pub(super) async fn show_config(&self) -> Result {
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn reload_config(&self, path: Option<PathBuf>) -> Result {
|
||||
let path = path.as_deref().into_iter();
|
||||
self.services.config.reload(path)?;
|
||||
// The path argument is only what's optionally passed via the admin command,
|
||||
// so we need to merge it with the existing paths if any were given at startup.
|
||||
let mut paths = Vec::new();
|
||||
|
||||
self.write_str("Successfully reconfigured.").await
|
||||
// Add previously saved paths to the argument list
|
||||
self.services
|
||||
.config
|
||||
.config_paths
|
||||
.clone()
|
||||
.unwrap_or_default()
|
||||
.iter()
|
||||
.for_each(|p| paths.push(p.to_owned()));
|
||||
|
||||
// If a path is given, and it's not already in the list,
|
||||
// add it last, so that it overrides earlier files
|
||||
if let Some(p) = path {
|
||||
if !paths.contains(&p) {
|
||||
paths.push(p);
|
||||
}
|
||||
}
|
||||
|
||||
self.services.config.reload(&paths)?;
|
||||
|
||||
self.write_str(&format!("Successfully reconfigured from paths: {paths:?}"))
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
|
||||
@@ -59,7 +59,7 @@ pub(crate) async fn get_context_route(
|
||||
.rooms
|
||||
.timeline
|
||||
.get_pdu(event_id)
|
||||
.map_err(|_| err!(Request(NotFound("Base event not found."))));
|
||||
.map_err(|_| err!(Request(NotFound("Event not found."))));
|
||||
|
||||
let visible = services
|
||||
.rooms
|
||||
@@ -70,7 +70,7 @@ pub(crate) async fn get_context_route(
|
||||
let (base_id, base_pdu, visible) = try_join3(base_id, base_pdu, visible).await?;
|
||||
|
||||
if base_pdu.room_id_or_hash() != *room_id || base_pdu.event_id != *event_id {
|
||||
return Err!(Request(NotFound("Base event not found.")));
|
||||
return Err!(Request(NotFound("Event not found.")));
|
||||
}
|
||||
|
||||
if !visible {
|
||||
@@ -82,11 +82,25 @@ pub(crate) async fn get_context_route(
|
||||
|
||||
let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user);
|
||||
|
||||
// PDUs are used to get seen user IDs and then returned in response.
|
||||
|
||||
let events_before = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(Some(sender_user), room_id, Some(base_count))
|
||||
.pdus_rev(room_id, Some(base_count))
|
||||
.ignore_err()
|
||||
.then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||
}
|
||||
pdu
|
||||
})
|
||||
.ready_filter_map(|item| event_filter(item, filter))
|
||||
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
||||
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
||||
@@ -96,8 +110,20 @@ pub(crate) async fn get_context_route(
|
||||
let events_after = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus(Some(sender_user), room_id, Some(base_count))
|
||||
.pdus(room_id, Some(base_count))
|
||||
.ignore_err()
|
||||
.then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||
}
|
||||
pdu
|
||||
})
|
||||
.ready_filter_map(|item| event_filter(item, filter))
|
||||
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
||||
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
room::member::{MembershipState, RoomMemberEventContent},
|
||||
},
|
||||
};
|
||||
use serde_json::value::RawValue;
|
||||
use service::Services;
|
||||
|
||||
use super::banned_room_check;
|
||||
@@ -146,7 +147,17 @@ pub(crate) async fn invite_helper(
|
||||
)
|
||||
.await?;
|
||||
|
||||
let invite_room_state = services.rooms.state.summary_stripped(&pdu, room_id).await;
|
||||
let invite_room_state = services
|
||||
.rooms
|
||||
.state
|
||||
.summary(&pdu, room_id)
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|evt| RawValue::from_string(evt.json().get().to_owned()))
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.map_err(|e| {
|
||||
err!(Request(BadJson(warn!("Could not clone invite state event: {e}"))))
|
||||
})?;
|
||||
|
||||
drop(state_lock);
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use axum::extract::State;
|
||||
use axum_client_ip::InsecureClientIp;
|
||||
use conduwuit::{
|
||||
Err, Result, at,
|
||||
Err, Result, at, debug_warn,
|
||||
matrix::{
|
||||
event::{Event, Matches},
|
||||
pdu::PduCount,
|
||||
@@ -122,14 +122,14 @@ pub(crate) async fn get_message_events_route(
|
||||
| Direction::Forward => services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus(Some(sender_user), room_id, Some(from))
|
||||
.pdus(room_id, Some(from))
|
||||
.ignore_err()
|
||||
.boxed(),
|
||||
|
||||
| Direction::Backward => services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(Some(sender_user), room_id, Some(from))
|
||||
.pdus_rev(room_id, Some(from))
|
||||
.ignore_err()
|
||||
.boxed(),
|
||||
};
|
||||
@@ -140,6 +140,18 @@ pub(crate) async fn get_message_events_route(
|
||||
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
||||
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
||||
.take(limit)
|
||||
.then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||
}
|
||||
pdu
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use axum::extract::State;
|
||||
use conduwuit::{
|
||||
Result, at,
|
||||
Err, Result, at, debug_warn,
|
||||
matrix::{Event, event::RelationTypeEqual, pdu::PduCount},
|
||||
utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt},
|
||||
};
|
||||
@@ -109,6 +109,16 @@ async fn paginate_relations_with_filter(
|
||||
recurse: bool,
|
||||
dir: Direction,
|
||||
) -> Result<get_relating_events::v1::Response> {
|
||||
if !services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.user_can_see_event(sender_user, room_id, target)
|
||||
.await
|
||||
{
|
||||
debug_warn!(req_evt = ?target, ?room_id, "Event relations requested by {sender_user} but is not allowed to see it, returning 404");
|
||||
return Err!(Request(NotFound("Event not found.")));
|
||||
}
|
||||
|
||||
let start: PduCount = from
|
||||
.map(str::parse)
|
||||
.transpose()?
|
||||
@@ -129,11 +139,6 @@ async fn paginate_relations_with_filter(
|
||||
// Spec (v1.10) recommends depth of at least 3
|
||||
let depth: u8 = if recurse { 3 } else { 1 };
|
||||
|
||||
// Check if this is a thread request
|
||||
let is_thread = filter_rel_type
|
||||
.as_ref()
|
||||
.is_some_and(|rel| *rel == RelationType::Thread);
|
||||
|
||||
let events: Vec<_> = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
@@ -152,40 +157,24 @@ async fn paginate_relations_with_filter(
|
||||
})
|
||||
.stream()
|
||||
.ready_take_while(|(count, _)| Some(*count) != to)
|
||||
.wide_filter_map(|item| visibility_filter(services, sender_user, item))
|
||||
.take(limit)
|
||||
.wide_filter_map(|item| visibility_filter(services, sender_user, item))
|
||||
.then(async |mut pdu| {
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations to relation: {e}");
|
||||
}
|
||||
pdu
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
// For threads, check if we should include the root event
|
||||
let mut root_event = None;
|
||||
if is_thread && dir == Direction::Backward {
|
||||
// Check if we've reached the beginning of the thread
|
||||
// (fewer events than requested means we've exhausted the thread)
|
||||
if events.len() < limit {
|
||||
// Try to get the thread root event
|
||||
if let Ok(root_pdu) = services.rooms.timeline.get_pdu(target).await {
|
||||
// Check visibility
|
||||
if services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.user_can_see_event(sender_user, room_id, target)
|
||||
.await
|
||||
{
|
||||
// Store the root event to add to the response
|
||||
root_event = Some(root_pdu);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Determine if there are more events to fetch
|
||||
let has_more = if root_event.is_some() {
|
||||
false // We've included the root, no more events
|
||||
} else {
|
||||
// Check if we got a full page of results (might be more)
|
||||
events.len() >= limit
|
||||
};
|
||||
let has_more = events.len() >= limit;
|
||||
|
||||
let next_batch = if has_more {
|
||||
match dir {
|
||||
@@ -197,11 +186,10 @@ async fn paginate_relations_with_filter(
|
||||
None
|
||||
};
|
||||
|
||||
// Build the response chunk with thread root if needed
|
||||
let chunk: Vec<_> = root_event
|
||||
let chunk: Vec<_> = events
|
||||
.into_iter()
|
||||
.map(at!(1))
|
||||
.map(Event::into_format)
|
||||
.chain(events.into_iter().map(at!(1)).map(Event::into_format))
|
||||
.collect();
|
||||
|
||||
Ok(get_relating_events::v1::Response {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use axum::extract::State;
|
||||
use conduwuit::{Err, Event, Result, err};
|
||||
use conduwuit::{Err, Event, Result, debug_warn, err};
|
||||
use futures::{FutureExt, TryFutureExt, future::try_join};
|
||||
use ruma::api::client::room::get_room_event;
|
||||
|
||||
@@ -33,7 +33,16 @@ pub(crate) async fn get_room_event_route(
|
||||
return Err!(Request(Forbidden("You don't have permission to view this event.")));
|
||||
}
|
||||
|
||||
event.add_age().ok();
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut event)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations to event: {e}");
|
||||
}
|
||||
|
||||
event.set_unsigned(body.sender_user.as_deref());
|
||||
|
||||
Ok(get_room_event::v3::Response { event: event.into_format() })
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use axum::extract::State;
|
||||
use conduwuit::{
|
||||
Err, Event, Result, at,
|
||||
Err, Event, Result, at, debug_warn,
|
||||
utils::{BoolExt, stream::TryTools},
|
||||
};
|
||||
use futures::{FutureExt, TryStreamExt, future::try_join4};
|
||||
@@ -40,12 +40,28 @@ pub(crate) async fn room_initial_sync_route(
|
||||
.map_ok(Event::into_format)
|
||||
.try_collect::<Vec<_>>();
|
||||
|
||||
// Events are returned in body
|
||||
|
||||
let limit = LIMIT_MAX;
|
||||
let events = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(None, room_id, None)
|
||||
.pdus_rev(room_id, None)
|
||||
.try_take(limit)
|
||||
.and_then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(body.sender_user.as_deref());
|
||||
if let Some(sender_user) = body.sender_user.as_deref() {
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||
}
|
||||
}
|
||||
Ok(pdu)
|
||||
})
|
||||
.try_collect::<Vec<_>>();
|
||||
|
||||
let (membership, visibility, state, events) =
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
use axum::extract::State;
|
||||
use conduwuit::{
|
||||
Err, Result, at, is_true,
|
||||
Err, Result, at, debug_warn, is_true,
|
||||
matrix::Event,
|
||||
result::FlatOk,
|
||||
utils::{IterStream, stream::ReadyExt},
|
||||
@@ -50,7 +50,7 @@ pub(crate) async fn search_events_route(
|
||||
|
||||
Ok(Response {
|
||||
search_categories: ResultCategories {
|
||||
room_events: room_events_result
|
||||
room_events: Box::pin(room_events_result)
|
||||
.await
|
||||
.unwrap_or_else(|| Ok(ResultRoomEvents::default()))?,
|
||||
},
|
||||
@@ -110,7 +110,12 @@ async fn category_room_events(
|
||||
limit,
|
||||
};
|
||||
|
||||
let (count, results) = services.rooms.search.search_pdus(&query).await.ok()?;
|
||||
let (count, results) = services
|
||||
.rooms
|
||||
.search
|
||||
.search_pdus(&query, sender_user)
|
||||
.await
|
||||
.ok()?;
|
||||
|
||||
results
|
||||
.collect::<Vec<_>>()
|
||||
@@ -144,6 +149,17 @@ async fn category_room_events(
|
||||
.map(at!(2))
|
||||
.flatten()
|
||||
.stream()
|
||||
.then(|mut pdu| async {
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations to search result: {e}");
|
||||
}
|
||||
pdu
|
||||
})
|
||||
.map(Event::into_format)
|
||||
.map(|result| SearchResult {
|
||||
rank: None,
|
||||
|
||||
@@ -158,7 +158,7 @@ pub(crate) async fn get_state_events_for_key_route(
|
||||
"content": event.content(),
|
||||
"event_id": event.event_id(),
|
||||
"origin_server_ts": event.origin_server_ts(),
|
||||
"room_id": event.room_id(),
|
||||
"room_id": event.room_id_or_hash(),
|
||||
"sender": event.sender(),
|
||||
"state_key": event.state_key(),
|
||||
"type": event.kind(),
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use conduwuit::{
|
||||
Event, PduCount, Result, err,
|
||||
Event, PduCount, Result, debug_warn, err,
|
||||
matrix::pdu::PduEvent,
|
||||
ref_at, trace,
|
||||
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
|
||||
@@ -53,7 +53,7 @@ async fn load_timeline(
|
||||
let last_timeline_count = services
|
||||
.rooms
|
||||
.timeline
|
||||
.last_timeline_count(Some(sender_user), room_id)
|
||||
.last_timeline_count(room_id)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
err!(Database(warn!("Failed to fetch end of room timeline: {}", err)))
|
||||
@@ -71,13 +71,24 @@ async fn load_timeline(
|
||||
services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(
|
||||
Some(sender_user),
|
||||
room_id,
|
||||
ending_count.map(|count| count.saturating_add(1)),
|
||||
)
|
||||
.pdus_rev(room_id, ending_count.map(|count| count.saturating_add(1)))
|
||||
.ignore_err()
|
||||
.ready_take_while(move |&(pducount, _)| pducount > starting_count)
|
||||
.map(move |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
pdu
|
||||
})
|
||||
.then(async move |mut pdu| {
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||
}
|
||||
pdu
|
||||
})
|
||||
.boxed()
|
||||
},
|
||||
| None => {
|
||||
@@ -86,12 +97,23 @@ async fn load_timeline(
|
||||
services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(
|
||||
Some(sender_user),
|
||||
room_id,
|
||||
ending_count.map(|count| count.saturating_add(1)),
|
||||
)
|
||||
.pdus_rev(room_id, ending_count.map(|count| count.saturating_add(1)))
|
||||
.ignore_err()
|
||||
.map(move |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
pdu
|
||||
})
|
||||
.then(async move |mut pdu| {
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||
}
|
||||
pdu
|
||||
})
|
||||
.boxed()
|
||||
},
|
||||
};
|
||||
|
||||
@@ -127,7 +127,7 @@ pub(super) async fn build_state_incremental<'a>(
|
||||
let last_pdu_of_last_sync = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(Some(sender_user), room_id, Some(last_sync_end_count.saturating_add(1)))
|
||||
.pdus_rev(room_id, Some(last_sync_end_count.saturating_add(1)))
|
||||
.boxed()
|
||||
.next()
|
||||
.await
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use axum::extract::State;
|
||||
use conduwuit::{
|
||||
Result, at,
|
||||
Result, at, debug_warn,
|
||||
matrix::{
|
||||
Event,
|
||||
pdu::{PduCount, PduEvent},
|
||||
@@ -45,6 +45,17 @@ pub(crate) async fn get_threads_route(
|
||||
.await
|
||||
.then_some((count, pdu))
|
||||
})
|
||||
.then(|(count, mut pdu)| async move {
|
||||
if let Err(e) = services
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut pdu)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations to thread: {e}");
|
||||
}
|
||||
(count, pdu)
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
use axum::extract::State;
|
||||
use conduwuit::{
|
||||
Event, PduCount, Result,
|
||||
result::LogErr,
|
||||
utils::{IterStream, ReadyExt, stream::TryTools},
|
||||
};
|
||||
use futures::{FutureExt, StreamExt, TryStreamExt};
|
||||
@@ -62,7 +63,7 @@ pub(crate) async fn get_backfill_route(
|
||||
pdus: services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
|
||||
.pdus_rev(&body.room_id, Some(from.saturating_add(1)))
|
||||
.try_take(limit)
|
||||
.try_filter_map(|(_, pdu)| async move {
|
||||
Ok(services
|
||||
@@ -72,6 +73,15 @@ pub(crate) async fn get_backfill_route(
|
||||
.await
|
||||
.then_some(pdu))
|
||||
})
|
||||
.and_then(async |mut pdu| {
|
||||
// Strip the transaction ID, as that is private
|
||||
pdu.remove_transaction_id().log_err().ok();
|
||||
// Add age, as this is specified
|
||||
pdu.add_age().log_err().ok();
|
||||
// It's not clear if we should strip or add any more data, leave as is.
|
||||
// In particular: Redaction?
|
||||
Ok(pdu)
|
||||
})
|
||||
.try_filter_map(|pdu| async move {
|
||||
Ok(services
|
||||
.rooms
|
||||
|
||||
@@ -1,29 +1,327 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use axum::extract::State;
|
||||
use axum_client_ip::InsecureClientIp;
|
||||
use base64::{Engine as _, engine::general_purpose};
|
||||
use conduwuit::{
|
||||
Err, Error, PduEvent, Result, err,
|
||||
matrix::{Event, event::gen_event_id},
|
||||
Err, Error, EventTypeExt, PduEvent, Result, RoomVersion, debug, debug_warn, err,
|
||||
matrix::{Event, StateKey, event::gen_event_id},
|
||||
trace,
|
||||
utils::{self, hash::sha256},
|
||||
warn,
|
||||
};
|
||||
use ruma::{
|
||||
CanonicalJsonValue, OwnedUserId, UserId,
|
||||
CanonicalJsonValue, OwnedUserId, RoomId, RoomVersionId, ServerName, UserId,
|
||||
api::{client::error::ErrorKind, federation::membership::create_invite},
|
||||
serde::JsonObject,
|
||||
events::{
|
||||
StateEventType,
|
||||
room::{
|
||||
create::RoomCreateEventContent,
|
||||
member::{MembershipState, RoomMemberEventContent},
|
||||
},
|
||||
},
|
||||
};
|
||||
use serde_json::value::RawValue;
|
||||
use service::{Services, rooms::timeline::pdu_fits};
|
||||
|
||||
use crate::Ruma;
|
||||
|
||||
/// Ensures that the state received from the invite endpoint is sane, correct,
|
||||
/// and complies with the room version's requirements.
|
||||
async fn check_invite_state(
|
||||
services: &Services,
|
||||
stripped_state: &Vec<Box<RawValue>>,
|
||||
room_id: &RoomId,
|
||||
room_version_id: &RoomVersionId,
|
||||
) -> Result<()> {
|
||||
let room_version = RoomVersion::new(room_version_id).map_err(|e| {
|
||||
err!(Request(UnsupportedRoomVersion("Invalid room version provided: {e}")))
|
||||
})?;
|
||||
let mut room_state: HashMap<(StateEventType, StateKey), PduEvent> = HashMap::new();
|
||||
|
||||
// Build the room state from the provided state events,
|
||||
// ensuring that there's no duplicates. We need to check that m.room.create is
|
||||
// present and lines up with the other things we've been told, and then verify
|
||||
// any signatures present to ensure this isn't forged.
|
||||
for raw_event in stripped_state {
|
||||
trace!("Processing invite state event: {:?}", raw_event);
|
||||
let canonical = utils::to_canonical_object(raw_event)?;
|
||||
let event_id = gen_event_id(&canonical, room_version_id)?;
|
||||
let event = PduEvent::from_id_val(&event_id, canonical.clone())
|
||||
.map_err(|e| err!(Request(InvalidParam("Invite state event is invalid: {e}"))))?;
|
||||
if event.state_key().is_none() {
|
||||
return Err!(Request(InvalidParam("Invite state event missing event type.")));
|
||||
}
|
||||
let key = event
|
||||
.event_type()
|
||||
.with_state_key(event.state_key().unwrap());
|
||||
if room_state.contains_key(&key) {
|
||||
return Err!(Request(InvalidParam("Duplicate state event found for {key:?}")));
|
||||
}
|
||||
|
||||
// verify the event
|
||||
if !pdu_fits(&canonical) {
|
||||
return Err!(Request(InvalidParam(
|
||||
"An invite state event ({event_id}) is too large"
|
||||
)));
|
||||
}
|
||||
services
|
||||
.server_keys
|
||||
.verify_event(&canonical, Some(room_version_id))
|
||||
.await
|
||||
.map_err(|e| {
|
||||
err!(Request(InvalidParam(
|
||||
"Signature failed verification on event {event_id}: {e}"
|
||||
)))
|
||||
})?;
|
||||
|
||||
// Ensure all events are in the same room
|
||||
if event.room_id_or_hash() != room_id {
|
||||
return Err!(Request(InvalidParam(
|
||||
"State event room ID for {} does not match the expected room ID {}.",
|
||||
event.event_id,
|
||||
room_id,
|
||||
)));
|
||||
}
|
||||
room_state.insert(key, event);
|
||||
}
|
||||
|
||||
// verify m.room.create is present, has a matching room ID, and a matching room
|
||||
// version.
|
||||
let create_event = room_state
|
||||
.get(&(StateEventType::RoomCreate, "".into()))
|
||||
.ok_or_else(|| err!(Request(MissingParam("Missing m.room.create in stripped state."))))?;
|
||||
let create_event_content: RoomCreateEventContent = create_event
|
||||
.get_content()
|
||||
.map_err(|e| err!(Request(InvalidParam("Invalid m.room.create content: {e}"))))?;
|
||||
// Room v12 removed room IDs over federation, so we'll need to see if the event
|
||||
// ID matches the room ID instead.
|
||||
if room_version.room_ids_as_hashes {
|
||||
let given_room_id = create_event.event_id().as_str().replace('$', "!");
|
||||
if given_room_id != room_id.as_str() {
|
||||
return Err!(Request(InvalidParam(
|
||||
"m.room.create event ID does not match the room ID."
|
||||
)));
|
||||
}
|
||||
} else if create_event.room_id().unwrap() != room_id {
|
||||
return Err!(Request(InvalidParam("m.room.create room ID does not match the room ID.")));
|
||||
}
|
||||
|
||||
// Make sure the room version matches
|
||||
if &create_event_content.room_version != room_version_id {
|
||||
return Err!(Request(InvalidParam(
|
||||
"m.room.create room version does not match the given room version."
|
||||
)));
|
||||
}
|
||||
|
||||
// Looks solid
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Ensures that the invite event received from the invite endpoint is sane,
|
||||
/// correct, and complies with the room version's requirements.
|
||||
/// Returns the invited user ID on success.
|
||||
async fn check_invite_event(
|
||||
services: &Services,
|
||||
invite_event: &PduEvent,
|
||||
origin: &ServerName,
|
||||
room_id: &RoomId,
|
||||
room_version_id: &RoomVersionId,
|
||||
) -> Result<OwnedUserId> {
|
||||
// Check: The event sender is not a user ID on the origin server.
|
||||
if invite_event.sender.server_name() != origin {
|
||||
return Err!(Request(InvalidParam(
|
||||
"Invite event sender's server does not match the origin server."
|
||||
)));
|
||||
}
|
||||
// Check: The `state_key` is not a user ID on the receiving server.
|
||||
let state_key: &UserId = invite_event
|
||||
.state_key()
|
||||
.ok_or_else(|| err!(Request(MissingParam("Invite event missing state_key."))))?
|
||||
.try_into()
|
||||
.map_err(|e| err!(Request(InvalidParam("Invalid state_key property: {e}"))))?;
|
||||
if !services.globals.server_is_ours(state_key.server_name()) {
|
||||
return Err!(Request(InvalidParam(
|
||||
"Invite event state_key does not belong to this homeserver."
|
||||
)));
|
||||
}
|
||||
|
||||
// Check: The event's room ID matches the expected room ID.
|
||||
if let Some(evt_room_id) = invite_event.room_id() {
|
||||
if evt_room_id != room_id {
|
||||
return Err!(Request(InvalidParam(
|
||||
"Invite event room ID does not match the expected room ID."
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err!(Request(MissingParam("Invite event missing room ID.")));
|
||||
}
|
||||
|
||||
// Check: the membership really is "invite"
|
||||
let content = invite_event.get_content::<RoomMemberEventContent>()?;
|
||||
if content.membership != MembershipState::Invite {
|
||||
return Err!(Request(InvalidParam("Invite event is not a membership invite.")));
|
||||
}
|
||||
|
||||
// Check: signature is valid
|
||||
let as_obj = &mut utils::to_canonical_object(invite_event)?;
|
||||
// remove the event_id before verification
|
||||
as_obj.remove("event_id");
|
||||
services
|
||||
.server_keys
|
||||
.verify_event(as_obj, Some(room_version_id))
|
||||
.await
|
||||
.map_err(|e| {
|
||||
err!(Request(InvalidParam("Invite event signature failed verification: {e}")))
|
||||
})?;
|
||||
|
||||
Ok(state_key.to_owned())
|
||||
}
|
||||
|
||||
/// Performs only legacy checks on the invite, for use when the requesting
|
||||
/// server doesn't support matrix v1.16 invites.
|
||||
/// This is significantly less secure than the full checks and should only be
|
||||
/// used if the user has allowed it.
|
||||
async fn legacy_check_invite(
|
||||
services: &Services,
|
||||
origin: &ServerName,
|
||||
invite_event: &PduEvent,
|
||||
stripped_state: &Vec<Box<RawValue>>,
|
||||
room_id: &RoomId,
|
||||
room_version_id: &RoomVersionId,
|
||||
) -> Result<OwnedUserId> {
|
||||
// Ensure the sender is from origin, the state key is a user ID that points at a
|
||||
// local user, the event type is m.room.member with membership "invite", and
|
||||
// the room ID matches.
|
||||
if invite_event.sender().server_name() != origin {
|
||||
return Err!(Request(InvalidParam(
|
||||
"Invite event sender's server does not match the origin server."
|
||||
)));
|
||||
}
|
||||
let state_key: &UserId = invite_event
|
||||
.state_key()
|
||||
.ok_or_else(|| err!(Request(MissingParam("Invite event missing state_key."))))?
|
||||
.try_into()
|
||||
.map_err(|e| err!(Request(InvalidParam("Invalid state_key property: {e}"))))?;
|
||||
if !services.globals.server_is_ours(state_key.server_name()) {
|
||||
return Err!(Request(InvalidParam(
|
||||
"Invite event state_key does not belong to this homeserver."
|
||||
)));
|
||||
}
|
||||
if let Some(evt_room_id) = invite_event.room_id() {
|
||||
if evt_room_id != room_id {
|
||||
return Err!(Request(InvalidParam(
|
||||
"Invite event room ID does not match the expected room ID."
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err!(Request(MissingParam("Invite event missing room ID.")));
|
||||
}
|
||||
let content = invite_event.get_content::<RoomMemberEventContent>()?;
|
||||
if content.membership != MembershipState::Invite {
|
||||
return Err!(Request(InvalidParam("Invite event is not a membership invite.")));
|
||||
}
|
||||
|
||||
// We can also opportunistically check that the m.room.create event is present
|
||||
// and matches the room version, to avoid accepting invites to rooms that
|
||||
// don't match.
|
||||
let mut has_create = false;
|
||||
for raw_event in stripped_state {
|
||||
let canonical = utils::to_canonical_object(raw_event)?;
|
||||
if canonical.get("room_id").is_none() {
|
||||
// This is a stripped event, skip
|
||||
continue;
|
||||
}
|
||||
if let Some(event_type) = canonical.get("type") {
|
||||
if event_type.as_str().unwrap_or_default() == "m.room.create" {
|
||||
has_create = true;
|
||||
let event_id = gen_event_id(&canonical, room_version_id)?;
|
||||
let event = PduEvent::from_id_val(&event_id, canonical.clone()).map_err(|e| {
|
||||
err!(Request(InvalidParam("Invite state event is invalid: {e}")))
|
||||
})?;
|
||||
|
||||
// We can verify that the room ID is correct now
|
||||
let version = RoomVersion::new(room_version_id)?;
|
||||
if version.room_ids_as_hashes {
|
||||
let given_room_id = event.event_id().as_str().replace('$', "!");
|
||||
if given_room_id != room_id.as_str() {
|
||||
return Err!(Request(InvalidParam(
|
||||
"m.room.create event ID does not match the room ID."
|
||||
)));
|
||||
}
|
||||
} else if event.room_id().unwrap() != room_id {
|
||||
return Err!(Request(InvalidParam(
|
||||
"m.room.create room ID does not match the room ID."
|
||||
)));
|
||||
}
|
||||
// Everything's as fine as we're getting with this event
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if !has_create {
|
||||
warn!(
|
||||
"federated invite is missing m.room.create event in stripped state, the remote \
|
||||
server is either outdated or trying something fishy."
|
||||
);
|
||||
}
|
||||
|
||||
Ok(state_key.to_owned())
|
||||
}
|
||||
|
||||
/// Checks the incoming event is allowed and not forged.
|
||||
/// If the MSC4311 enforcement experiment is enabled, performs full checks,
|
||||
/// otherwise performs legacy checks only.
|
||||
async fn check_invite(
|
||||
services: &Services,
|
||||
invite_event: &PduEvent,
|
||||
stripped_state: &Vec<Box<RawValue>>,
|
||||
origin: &ServerName,
|
||||
room_id: &RoomId,
|
||||
room_version_id: &RoomVersionId,
|
||||
) -> Result<OwnedUserId> {
|
||||
if services.config.experiments.enforce_msc4311 {
|
||||
debug!("Checking invite event validity");
|
||||
let user = check_invite_event(services, invite_event, origin, room_id, room_version_id)
|
||||
.await
|
||||
.inspect_err(|e| {
|
||||
debug_warn!("Invite event validity check failed: {e}");
|
||||
})?;
|
||||
debug!("Checking invite state validity");
|
||||
check_invite_state(services, stripped_state, room_id, room_version_id)
|
||||
.await
|
||||
.inspect_err(|e| {
|
||||
debug_warn!("Invite state validity check failed: {e}");
|
||||
})?;
|
||||
Ok(user)
|
||||
} else {
|
||||
debug!("Performing legacy invite checks");
|
||||
legacy_check_invite(
|
||||
services,
|
||||
origin,
|
||||
invite_event,
|
||||
stripped_state,
|
||||
room_id,
|
||||
room_version_id,
|
||||
)
|
||||
.await
|
||||
.inspect_err(|e| {
|
||||
debug_warn!("Legacy invite validity check failed: {e}");
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// # `PUT /_matrix/federation/v2/invite/{roomId}/{eventId}`
|
||||
///
|
||||
/// Invites a remote user to a room.
|
||||
#[tracing::instrument(skip_all, fields(%client), name = "invite")]
|
||||
#[tracing::instrument(skip_all, fields(%client, room_id=?body.room_id), name = "invite")]
|
||||
pub(crate) async fn create_invite_route(
|
||||
State(services): State<crate::State>,
|
||||
InsecureClientIp(client): InsecureClientIp,
|
||||
body: Ruma<create_invite::v2::Request>,
|
||||
) -> Result<create_invite::v2::Response> {
|
||||
debug!("Received invite request from {}: {:?}", body.room_id, body.origin());
|
||||
|
||||
// ACL check origin
|
||||
services
|
||||
.rooms
|
||||
@@ -32,6 +330,7 @@ pub(crate) async fn create_invite_route(
|
||||
.await?;
|
||||
|
||||
if !services.server.supported_room_version(&body.room_version) {
|
||||
debug_warn!("Unsupported room version: {}", body.room_version);
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::IncompatibleRoomVersion { room_version: body.room_version.clone() },
|
||||
"Server does not support this room version.",
|
||||
@@ -40,6 +339,7 @@ pub(crate) async fn create_invite_route(
|
||||
|
||||
if let Some(server) = body.room_id.server_name() {
|
||||
if services.moderation.is_remote_server_forbidden(server) {
|
||||
warn!("Received invite to room created by a banned server: {}. Rejecting.", server);
|
||||
return Err!(Request(Forbidden("Server is banned on this homeserver.")));
|
||||
}
|
||||
}
|
||||
@@ -49,7 +349,7 @@ pub(crate) async fn create_invite_route(
|
||||
.is_remote_server_forbidden(body.origin())
|
||||
{
|
||||
warn!(
|
||||
"Received federated/remote invite from banned server {} for room ID {}. Rejecting.",
|
||||
"Received invite from banned server {} for room ID {}. Rejecting.",
|
||||
body.origin(),
|
||||
body.room_id
|
||||
);
|
||||
@@ -60,17 +360,42 @@ pub(crate) async fn create_invite_route(
|
||||
let mut signed_event = utils::to_canonical_object(&body.event)
|
||||
.map_err(|_| err!(Request(InvalidParam("Invite event is invalid."))))?;
|
||||
|
||||
let recipient_user: OwnedUserId = signed_event
|
||||
.get("state_key")
|
||||
.try_into()
|
||||
.map(UserId::to_owned)
|
||||
.map_err(|e| err!(Request(InvalidParam("Invalid state_key property: {e}"))))?;
|
||||
// We need to hash and sign the event before we can generate the event ID.
|
||||
// It is important that this signed event does not get sent back to the caller
|
||||
// until we've verified this isn't incorrect.
|
||||
trace!(event=?signed_event, "Hashing & signing invite event");
|
||||
services
|
||||
.server_keys
|
||||
.hash_and_sign_event(&mut signed_event, &body.room_version)
|
||||
.map_err(|e| err!(Request(InvalidParam("Failed to sign event: {e}"))))?;
|
||||
let event_id = gen_event_id(&signed_event.clone(), &body.room_version)?;
|
||||
if event_id != body.event_id {
|
||||
warn!("Event ID mismatch: expected {}, got {}", event_id, body.event_id);
|
||||
return Err!(Request(InvalidParam("Event ID does not match the generated event ID.")));
|
||||
}
|
||||
|
||||
if !services
|
||||
.globals
|
||||
.server_is_ours(recipient_user.server_name())
|
||||
let pdu = PduEvent::from_id_val(&event_id, signed_event.clone())
|
||||
.map_err(|e| err!(Request(InvalidParam("Invite event is invalid: {e}"))))?;
|
||||
|
||||
let recipient_user = check_invite(
|
||||
&services,
|
||||
&pdu,
|
||||
&body.invite_room_state,
|
||||
body.origin(),
|
||||
&body.room_id,
|
||||
&body.room_version,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Make sure the room isn't banned and we allow invites
|
||||
if services.config.block_non_admin_invites && !services.users.is_admin(&recipient_user).await
|
||||
{
|
||||
return Err!(Request(InvalidParam("User does not belong to this homeserver.")));
|
||||
return Err!(Request(Forbidden("This server does not allow room invites.")));
|
||||
}
|
||||
if services.rooms.metadata.is_banned(&body.room_id).await
|
||||
&& !services.users.is_admin(&recipient_user).await
|
||||
{
|
||||
return Err!(Request(Forbidden("This room is banned on this homeserver.")));
|
||||
}
|
||||
|
||||
// Make sure we're not ACL'ed from their room.
|
||||
@@ -80,45 +405,9 @@ pub(crate) async fn create_invite_route(
|
||||
.acl_check(recipient_user.server_name(), &body.room_id)
|
||||
.await?;
|
||||
|
||||
services
|
||||
.server_keys
|
||||
.hash_and_sign_event(&mut signed_event, &body.room_version)
|
||||
.map_err(|e| err!(Request(InvalidParam("Failed to sign event: {e}"))))?;
|
||||
|
||||
// Generate event id
|
||||
let event_id = gen_event_id(&signed_event, &body.room_version)?;
|
||||
|
||||
// Add event_id back
|
||||
signed_event.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.to_string()));
|
||||
|
||||
let sender_user: &UserId = signed_event
|
||||
.get("sender")
|
||||
.try_into()
|
||||
.map_err(|e| err!(Request(InvalidParam("Invalid sender property: {e}"))))?;
|
||||
|
||||
if services.rooms.metadata.is_banned(&body.room_id).await
|
||||
&& !services.users.is_admin(&recipient_user).await
|
||||
{
|
||||
return Err!(Request(Forbidden("This room is banned on this homeserver.")));
|
||||
}
|
||||
|
||||
if services.config.block_non_admin_invites && !services.users.is_admin(&recipient_user).await
|
||||
{
|
||||
return Err!(Request(Forbidden("This server does not allow room invites.")));
|
||||
}
|
||||
|
||||
let mut invite_state = body.invite_room_state.clone();
|
||||
|
||||
let mut event: JsonObject = serde_json::from_str(body.event.get())
|
||||
.map_err(|e| err!(Request(BadJson("Invalid invite event PDU: {e}"))))?;
|
||||
|
||||
event.insert("event_id".to_owned(), "$placeholder".into());
|
||||
|
||||
let pdu: PduEvent = serde_json::from_value(event.into())
|
||||
.map_err(|e| err!(Request(BadJson("Invalid invite event PDU: {e}"))))?;
|
||||
|
||||
invite_state.push(pdu.to_format());
|
||||
|
||||
// If we are active in the room, the remote server will notify us about the
|
||||
// join/invite through /send. If we are not in the room, we need to manually
|
||||
// record the invited state for client /sync through update_membership(), and
|
||||
@@ -129,6 +418,19 @@ pub(crate) async fn create_invite_route(
|
||||
.server_in_room(services.globals.server_name(), &body.room_id)
|
||||
.await
|
||||
{
|
||||
let mut invite_state: Vec<CanonicalJsonValue> = body
|
||||
.invite_room_state
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|v| utils::to_canonical_object(&v).unwrap().into())
|
||||
.collect();
|
||||
|
||||
invite_state.push(pdu.to_canonical_object().into());
|
||||
let sender_user: &UserId = signed_event
|
||||
.get("sender")
|
||||
.try_into()
|
||||
.map_err(|e| err!(Request(InvalidParam("Invalid sender property: {e}"))))?;
|
||||
debug!("Marking user {} as invited to remote room {}", recipient_user, body.room_id);
|
||||
services
|
||||
.rooms
|
||||
.state_cache
|
||||
@@ -167,6 +469,7 @@ pub(crate) async fn create_invite_route(
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Invite is valid, returning signed event");
|
||||
Ok(create_invite::v2::Response {
|
||||
event: services
|
||||
.sending
|
||||
|
||||
@@ -175,11 +175,7 @@ pub(crate) async fn create_knock_event_v1_route(
|
||||
.send_pdu_room(&body.room_id, &pdu_id)
|
||||
.await?;
|
||||
|
||||
let knock_room_state = services
|
||||
.rooms
|
||||
.state
|
||||
.summary_stripped(&pdu, &body.room_id)
|
||||
.await;
|
||||
let knock_room_state = services.rooms.state.summary(&pdu, &body.room_id).await;
|
||||
|
||||
Ok(send_knock::v1::Response { knock_room_state })
|
||||
}
|
||||
|
||||
25
src/core/config/experiments/mod.rs
Normal file
25
src/core/config/experiments/mod.rs
Normal file
@@ -0,0 +1,25 @@
|
||||
mod msc4284_policy_servers;
|
||||
|
||||
use conduwuit_macros::config_example_generator;
|
||||
use serde::Deserialize;
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize)]
|
||||
#[config_example_generator(filename = "conduwuit-example.toml", section = "global.experiments")]
|
||||
pub struct Experiments {
|
||||
/// Enforce MSC4311's updated requirements on all incoming invites.
|
||||
///
|
||||
/// This drastically increases the security and filtering capabilities
|
||||
/// when processing invites over federation, at the cost of compatibility.
|
||||
/// Servers that do not implement MSC4311 will be unable to send invites
|
||||
/// to your server when this is enabled, including continuwuity 0.5.0 and
|
||||
/// below.
|
||||
///
|
||||
/// default: false
|
||||
/// Introduced in: (unreleased)
|
||||
#[serde(default)]
|
||||
pub enforce_msc4311: bool,
|
||||
|
||||
/// MSC4284 Policy Server support configuration.
|
||||
#[serde(default)]
|
||||
pub msc4284: msc4284_policy_servers::MSC4248,
|
||||
}
|
||||
58
src/core/config/experiments/msc4284_policy_servers.rs
Normal file
58
src/core/config/experiments/msc4284_policy_servers.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
use conduwuit_macros::config_example_generator;
|
||||
use serde::Deserialize;
|
||||
|
||||
fn true_fn() -> bool { true }
|
||||
|
||||
fn default_federation_timeout() -> u64 { 25 }
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize)]
|
||||
#[config_example_generator(
|
||||
filename = "conduwuit-example.toml",
|
||||
section = "global.experiments.msc4284"
|
||||
)]
|
||||
pub struct MSC4248 {
|
||||
/// Enable or disable making requests to MSC4284 Policy Servers.
|
||||
/// It is recommended you keep this enabled unless you experience frequent
|
||||
/// connectivity issues, such as in a restricted networking environment.
|
||||
///
|
||||
/// default: true
|
||||
/// Introduced in: 0.5.0
|
||||
#[serde(default = "true_fn")]
|
||||
pub enabled: bool,
|
||||
|
||||
/// Enable running locally generated events through configured MSC4284
|
||||
/// policy servers. You may wish to disable this if your server is
|
||||
/// single-user for a slight speed benefit in some rooms, but otherwise
|
||||
/// should leave it enabled.
|
||||
///
|
||||
/// If the room's policy server configuration requires event signatures,
|
||||
/// this option is effectively ignored, as otherwise local events would
|
||||
/// be rejected for missing the policy server's signature.
|
||||
///
|
||||
/// default: true
|
||||
/// Introduced in: 0.5.0
|
||||
#[serde(default = "true_fn")]
|
||||
pub check_own_events: bool,
|
||||
|
||||
/// MSC4284 Policy server request timeout (seconds). Generally policy
|
||||
/// servers should respond near instantly, however may slow down under
|
||||
/// load. If a policy server doesn't respond in a short amount of time, the
|
||||
/// room it is configured in may become unusable if this limit is set too
|
||||
/// high. 25 seconds is a good default, however should be raised if you
|
||||
/// experience too many connection issues.
|
||||
///
|
||||
/// Please be aware that policy requests are *NOT* currently re-tried, so if
|
||||
/// a spam check request fails, the event will be assumed to be not spam,
|
||||
/// which in some cases may result in spam being sent to or received from
|
||||
/// the room that would typically be prevented.
|
||||
///
|
||||
/// If your request timeout is too low, and the policy server requires
|
||||
/// signatures, you may find that you are unable to send events that are
|
||||
/// accepted regardless.
|
||||
///
|
||||
/// About policy servers: https://matrix.org/blog/2025/04/introducing-policy-servers/
|
||||
/// default: 25
|
||||
/// Introduced in: 0.5.0
|
||||
#[serde(default = "default_federation_timeout")]
|
||||
pub request_timeout: u64,
|
||||
}
|
||||
@@ -1,12 +1,13 @@
|
||||
#![allow(clippy::doc_link_with_quotes)]
|
||||
pub mod check;
|
||||
pub mod experiments;
|
||||
pub mod manager;
|
||||
pub mod proxy;
|
||||
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
|
||||
path::{Path, PathBuf},
|
||||
path::PathBuf,
|
||||
};
|
||||
|
||||
use conduwuit_macros::config_example_generator;
|
||||
@@ -53,9 +54,13 @@
|
||||
### For more information, see:
|
||||
### https://continuwuity.org/configuration.html
|
||||
"#,
|
||||
ignore = "catchall well_known tls blurhashing allow_invalid_tls_certificates_yes_i_know_what_the_fuck_i_am_doing_with_this_and_i_know_this_is_insecure"
|
||||
ignore = "config_paths catchall well_known tls blurhashing allow_invalid_tls_certificates_yes_i_know_what_the_fuck_i_am_doing_with_this_and_i_know_this_is_insecure"
|
||||
)]
|
||||
pub struct Config {
|
||||
// Paths to config file(s). Not supposed to be set manually in the config file,
|
||||
// only updated dynamically from the --config option given at runtime.
|
||||
pub config_paths: Option<Vec<PathBuf>>,
|
||||
|
||||
/// The server_name is the pretty name of this server. It is used as a
|
||||
/// suffix for user and room IDs/aliases.
|
||||
///
|
||||
@@ -703,10 +708,13 @@ pub struct Config {
|
||||
pub allow_unstable_room_versions: bool,
|
||||
|
||||
/// Default room version continuwuity will create rooms with.
|
||||
/// Note that this has to be a string since the room version is a string
|
||||
/// rather than an integer. Forgetting the quotes will make the server fail
|
||||
/// to start!
|
||||
///
|
||||
/// Per spec, room version 11 is the default.
|
||||
/// Per spec, room version "11" is the default.
|
||||
///
|
||||
/// default: 11
|
||||
/// default: "11"
|
||||
#[serde(default = "default_default_room_version")]
|
||||
pub default_room_version: RoomVersionId,
|
||||
|
||||
@@ -1727,16 +1735,9 @@ pub struct Config {
|
||||
#[serde(default)]
|
||||
pub block_non_admin_invites: bool,
|
||||
|
||||
/// Enable or disable making requests to MSC4284 Policy Servers.
|
||||
/// It is recommended you keep this enabled unless you experience frequent
|
||||
/// connectivity issues, such as in a restricted networking environment.
|
||||
#[serde(default = "true_fn")]
|
||||
pub enable_msc4284_policy_servers: bool,
|
||||
|
||||
/// Enable running locally generated events through configured MSC4284
|
||||
/// policy servers. You may wish to disable this if your server is
|
||||
/// single-user for a slight speed benefit in some rooms, but otherwise
|
||||
/// should leave it enabled.
|
||||
#[serde(default = "true_fn")]
|
||||
pub policy_server_check_own_events: bool,
|
||||
|
||||
@@ -1996,6 +1997,13 @@ pub struct Config {
|
||||
// external structure; separate section
|
||||
#[serde(default)]
|
||||
pub blurhashing: BlurhashConfig,
|
||||
|
||||
/// Configuration for protocol experiments that enable experimental
|
||||
/// features. Each one is associated with a matrix spec proposal, a list of
|
||||
/// which are published at https://spec.matrix.org/proposals/
|
||||
#[serde(default)]
|
||||
pub experiments: experiments::Experiments,
|
||||
|
||||
#[serde(flatten)]
|
||||
#[allow(clippy::zero_sized_map_values)]
|
||||
// this is a catchall, the map shouldn't be zero at runtime
|
||||
@@ -2209,7 +2217,7 @@ struct ListeningAddr {
|
||||
addrs: Either<IpAddr, Vec<IpAddr>>,
|
||||
}
|
||||
|
||||
const DEPRECATED_KEYS: &[&str; 9] = &[
|
||||
const DEPRECATED_KEYS: &[&str; 11] = &[
|
||||
"cache_capacity",
|
||||
"conduit_cache_capacity_modifier",
|
||||
"max_concurrent_requests",
|
||||
@@ -2219,30 +2227,30 @@ struct ListeningAddr {
|
||||
"well_known_support_role",
|
||||
"well_known_support_email",
|
||||
"well_known_support_mxid",
|
||||
"enable_msc4284_policy_servers",
|
||||
"policy_server_check_own_events",
|
||||
];
|
||||
|
||||
impl Config {
|
||||
/// Pre-initialize config
|
||||
pub fn load<'a, I>(paths: I) -> Result<Figment>
|
||||
where
|
||||
I: Iterator<Item = &'a Path>,
|
||||
{
|
||||
pub fn load(paths: &[PathBuf]) -> Result<Figment> {
|
||||
let envs = [
|
||||
Env::var("CONDUIT_CONFIG"),
|
||||
Env::var("CONDUWUIT_CONFIG"),
|
||||
Env::var("CONTINUWUITY_CONFIG"),
|
||||
];
|
||||
|
||||
let config = envs
|
||||
let mut config = envs
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(Toml::file)
|
||||
.chain(paths.map(Toml::file))
|
||||
.chain(paths.iter().cloned().map(Toml::file))
|
||||
.fold(Figment::new(), |config, file| config.merge(file.nested()))
|
||||
.merge(Env::prefixed("CONDUIT_").global().split("__"))
|
||||
.merge(Env::prefixed("CONDUWUIT_").global().split("__"))
|
||||
.merge(Env::prefixed("CONTINUWUITY_").global().split("__"));
|
||||
|
||||
config = config.join(("config_paths", paths));
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
|
||||
@@ -14,11 +14,12 @@
|
||||
RoomVersionId::V9,
|
||||
RoomVersionId::V10,
|
||||
RoomVersionId::V11,
|
||||
RoomVersionId::V12,
|
||||
];
|
||||
|
||||
/// Experimental, partially supported room versions
|
||||
pub const UNSTABLE_ROOM_VERSIONS: &[RoomVersionId] =
|
||||
&[RoomVersionId::V3, RoomVersionId::V4, RoomVersionId::V5, RoomVersionId::V12];
|
||||
&[RoomVersionId::V3, RoomVersionId::V4, RoomVersionId::V5];
|
||||
|
||||
type RoomVersion = (RoomVersionId, RoomVersionStability);
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ fn from(event: Ref<'a, E>) -> Self {
|
||||
"content": content,
|
||||
"event_id": event.event_id(),
|
||||
"origin_server_ts": event.origin_server_ts(),
|
||||
"room_id": event.room_id(),
|
||||
"room_id": event.room_id_or_hash(),
|
||||
"sender": event.sender(),
|
||||
"type": event.kind(),
|
||||
});
|
||||
@@ -117,7 +117,7 @@ fn from(event: Ref<'a, E>) -> Self {
|
||||
"content": event.content(),
|
||||
"event_id": event.event_id(),
|
||||
"origin_server_ts": event.origin_server_ts(),
|
||||
"room_id": event.room_id(),
|
||||
"room_id": event.room_id_or_hash(),
|
||||
"sender": event.sender(),
|
||||
"state_key": event.state_key(),
|
||||
"type": event.kind(),
|
||||
|
||||
@@ -1,10 +1,23 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::{borrow::Borrow, collections::BTreeMap};
|
||||
|
||||
use ruma::MilliSecondsSinceUnixEpoch;
|
||||
use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value};
|
||||
|
||||
use super::Pdu;
|
||||
use crate::{Result, err, implement};
|
||||
use crate::{Result, err, implement, result::LogErr};
|
||||
|
||||
/// Set the `unsigned` field of the PDU using only information in the PDU.
|
||||
/// Some unsigned data is already set within the database (eg. prev events,
|
||||
/// threads). Once this is done, other data must be calculated from the database
|
||||
/// (eg. relations) This is for server-to-client events.
|
||||
/// Backfill handles this itself.
|
||||
#[implement(Pdu)]
|
||||
pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) {
|
||||
if Some(self.sender.borrow()) != user_id {
|
||||
self.remove_transaction_id().log_err().ok();
|
||||
}
|
||||
self.add_age().log_err().ok();
|
||||
}
|
||||
|
||||
#[implement(Pdu)]
|
||||
pub fn remove_transaction_id(&mut self) -> Result {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit_core::{
|
||||
Error, Result,
|
||||
@@ -38,14 +38,9 @@ pub(crate) fn new(
|
||||
) -> Result<Arc<Self>, Error> {
|
||||
let _runtime_guard = runtime.map(runtime::Handle::enter);
|
||||
|
||||
let config_paths = args
|
||||
.config
|
||||
.as_deref()
|
||||
.into_iter()
|
||||
.flat_map(<[_]>::iter)
|
||||
.map(PathBuf::as_path);
|
||||
let config_paths = args.config.clone().unwrap_or_default();
|
||||
|
||||
let config = Config::load(config_paths)
|
||||
let config = Config::load(&config_paths)
|
||||
.and_then(|raw| update(raw, args))
|
||||
.and_then(|raw| Config::new(&raw))?;
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{iter, ops::Deref, path::Path, sync::Arc};
|
||||
use std::{ops::Deref, path::PathBuf, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use conduwuit::{
|
||||
@@ -51,7 +51,8 @@ fn handle_reload(&self) -> Result {
|
||||
])
|
||||
.expect("failed to notify systemd of reloading state");
|
||||
|
||||
self.reload(iter::empty())?;
|
||||
let config_paths = self.server.config.config_paths.clone().unwrap_or_default();
|
||||
self.reload(&config_paths)?;
|
||||
|
||||
#[cfg(all(feature = "systemd", target_os = "linux"))]
|
||||
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])
|
||||
@@ -62,10 +63,7 @@ fn handle_reload(&self) -> Result {
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn reload<'a, I>(&self, paths: I) -> Result<Arc<Config>>
|
||||
where
|
||||
I: Iterator<Item = &'a Path>,
|
||||
{
|
||||
pub fn reload(&self, paths: &[PathBuf]) -> Result<Arc<Config>> {
|
||||
let old = self.server.config.clone();
|
||||
let new = Config::load(paths).and_then(|raw| Config::new(&raw))?;
|
||||
|
||||
|
||||
@@ -3,14 +3,21 @@
|
||||
//! This module implements a check against a room-specific policy server, as
|
||||
//! described in the relevant Matrix spec proposal (see: https://github.com/matrix-org/matrix-spec-proposals/pull/4284).
|
||||
|
||||
use std::time::Duration;
|
||||
use std::{collections::BTreeMap, time::Duration};
|
||||
|
||||
use conduwuit::{Err, Event, PduEvent, Result, debug, debug_info, implement, trace, warn};
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, implement, trace,
|
||||
warn,
|
||||
};
|
||||
use ruma::{
|
||||
CanonicalJsonObject, RoomId, ServerName,
|
||||
api::federation::room::policy::v1::Request as PolicyRequest,
|
||||
CanonicalJsonObject, CanonicalJsonValue, KeyId, RoomId, ServerName, SigningKeyId,
|
||||
api::federation::room::{
|
||||
policy_check::unstable::Request as PolicyCheckRequest,
|
||||
policy_sign::unstable::Request as PolicySignRequest,
|
||||
},
|
||||
events::{StateEventType, room::policy::RoomPolicyEventContent},
|
||||
};
|
||||
use serde_json::value::RawValue;
|
||||
|
||||
/// Asks a remote policy server if the event is allowed.
|
||||
///
|
||||
@@ -24,25 +31,18 @@
|
||||
/// contacted for whatever reason, Err(e) is returned, which generally is a
|
||||
/// fail-open operation.
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(skip_all, level = "debug")]
|
||||
#[tracing::instrument(skip(self, pdu, pdu_json, room_id))]
|
||||
pub async fn ask_policy_server(
|
||||
&self,
|
||||
pdu: &PduEvent,
|
||||
pdu_json: &CanonicalJsonObject,
|
||||
pdu_json: &mut CanonicalJsonObject,
|
||||
room_id: &RoomId,
|
||||
incoming: bool,
|
||||
) -> Result<bool> {
|
||||
if !self.services.server.config.enable_msc4284_policy_servers {
|
||||
trace!("policy server checking is disabled");
|
||||
return Ok(true); // don't ever contact policy servers
|
||||
}
|
||||
if self.services.server.config.policy_server_check_own_events
|
||||
&& pdu.origin.is_some()
|
||||
&& self
|
||||
.services
|
||||
.server
|
||||
.is_ours(pdu.origin.as_ref().unwrap().as_str())
|
||||
{
|
||||
return Ok(true); // don't contact policy servers for locally generated events
|
||||
}
|
||||
|
||||
if *pdu.event_type() == StateEventType::RoomPolicy.into() {
|
||||
debug!(
|
||||
@@ -52,16 +52,29 @@ pub async fn ask_policy_server(
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let Ok(policyserver) = self
|
||||
.services
|
||||
.state_accessor
|
||||
.room_state_get_content(room_id, &StateEventType::RoomPolicy, "")
|
||||
.await
|
||||
.inspect_err(|e| debug_error!("failed to load room policy server state event: {e}"))
|
||||
.map(|c: RoomPolicyEventContent| c)
|
||||
else {
|
||||
debug!("room has no policy server configured");
|
||||
return Ok(true);
|
||||
};
|
||||
|
||||
if self.services.server.config.policy_server_check_own_events
|
||||
&& !incoming
|
||||
&& policyserver.public_key.is_none()
|
||||
{
|
||||
// don't contact policy servers for locally generated events, but only when the
|
||||
// policy server does not require signatures
|
||||
trace!("won't contact policy server for locally generated event");
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let via = match policyserver.via {
|
||||
| Some(ref via) => ServerName::parse(via)?,
|
||||
| None => {
|
||||
@@ -75,7 +88,6 @@ pub async fn ask_policy_server(
|
||||
}
|
||||
if !self.services.state_cache.server_in_room(via, room_id).await {
|
||||
debug!(
|
||||
room_id = %room_id,
|
||||
via = %via,
|
||||
"Policy server is not in the room, skipping spam check"
|
||||
);
|
||||
@@ -86,17 +98,43 @@ pub async fn ask_policy_server(
|
||||
.sending
|
||||
.convert_to_outgoing_federation_event(pdu_json.clone())
|
||||
.await;
|
||||
if policyserver.public_key.is_some() {
|
||||
if !incoming {
|
||||
debug_info!(
|
||||
via = %via,
|
||||
outgoing = ?pdu_json,
|
||||
"Getting policy server signature on event"
|
||||
);
|
||||
return self
|
||||
.fetch_policy_server_signature(pdu, pdu_json, via, outgoing, room_id)
|
||||
.await;
|
||||
}
|
||||
// for incoming events, is it signed by <via> with the key
|
||||
// "ed25519:policy_server"?
|
||||
if let Some(CanonicalJsonValue::Object(sigs)) = pdu_json.get("signatures") {
|
||||
if let Some(CanonicalJsonValue::Object(server_sigs)) = sigs.get(via.as_str()) {
|
||||
let wanted_key_id: &KeyId<ruma::SigningKeyAlgorithm, ruma::Base64PublicKey> =
|
||||
SigningKeyId::parse("ed25519:policy_server")?;
|
||||
if let Some(CanonicalJsonValue::String(_sig_value)) =
|
||||
server_sigs.get(wanted_key_id.as_str())
|
||||
{
|
||||
// TODO: verify signature
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!(
|
||||
"Event is not local and has no policy server signature, performing legacy spam check"
|
||||
);
|
||||
}
|
||||
debug_info!(
|
||||
room_id = %room_id,
|
||||
via = %via,
|
||||
outgoing = ?pdu_json,
|
||||
"Checking event for spam with policy server"
|
||||
"Checking event for spam with policy server via legacy check"
|
||||
);
|
||||
let response = tokio::time::timeout(
|
||||
Duration::from_secs(self.services.server.config.policy_server_request_timeout),
|
||||
self.services
|
||||
.sending
|
||||
.send_federation_request(via, PolicyRequest {
|
||||
.send_federation_request(via, PolicyCheckRequest {
|
||||
event_id: pdu.event_id().to_owned(),
|
||||
pdu: Some(outgoing),
|
||||
}),
|
||||
@@ -142,3 +180,120 @@ pub async fn ask_policy_server(
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Asks a remote policy server for a signature on this event.
|
||||
/// If the policy server signs this event, the original data is mutated.
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(skip_all, fields(event_id=%pdu.event_id(), via=%via))]
|
||||
pub async fn fetch_policy_server_signature(
|
||||
&self,
|
||||
pdu: &PduEvent,
|
||||
pdu_json: &mut CanonicalJsonObject,
|
||||
via: &ServerName,
|
||||
outgoing: Box<RawValue>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<bool> {
|
||||
debug!("Requesting policy server signature");
|
||||
let response = tokio::time::timeout(
|
||||
Duration::from_secs(self.services.server.config.policy_server_request_timeout),
|
||||
self.services
|
||||
.sending
|
||||
.send_federation_request(via, PolicySignRequest { pdu: outgoing }),
|
||||
)
|
||||
.await;
|
||||
|
||||
let response = match response {
|
||||
| Ok(Ok(response)) => {
|
||||
debug!("Response from policy server: {:?}", response);
|
||||
response
|
||||
},
|
||||
| Ok(Err(e)) => {
|
||||
warn!(
|
||||
via = %via,
|
||||
event_id = %pdu.event_id(),
|
||||
room_id = %room_id,
|
||||
"Failed to contact policy server: {e}"
|
||||
);
|
||||
// Network or policy server errors are treated as non-fatal: event is allowed by
|
||||
// default.
|
||||
return Err(e);
|
||||
},
|
||||
| Err(elapsed) => {
|
||||
warn!(
|
||||
%via,
|
||||
event_id = %pdu.event_id(),
|
||||
%room_id,
|
||||
%elapsed,
|
||||
"Policy server request timed out after 10 seconds"
|
||||
);
|
||||
return Err!("Request to policy server timed out");
|
||||
},
|
||||
};
|
||||
if response.signatures.is_none() {
|
||||
debug!("Policy server refused to sign event");
|
||||
return Ok(false);
|
||||
}
|
||||
let sigs: ruma::Signatures<ruma::OwnedServerName, ruma::ServerSigningKeyVersion> =
|
||||
response.signatures.unwrap();
|
||||
if !sigs.contains_key(via) {
|
||||
debug_warn!(
|
||||
"Policy server returned signatures, but did not include the expected server name \
|
||||
'{}': {:?}",
|
||||
via,
|
||||
sigs
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
let keypairs = sigs.get(via).unwrap();
|
||||
let wanted_key_id = KeyId::parse("ed25519:policy_server")?;
|
||||
if !keypairs.contains_key(wanted_key_id) {
|
||||
debug_warn!(
|
||||
"Policy server returned signature, but did not use the key ID \
|
||||
'ed25519:policy_server'."
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
let signatures_entry = pdu_json
|
||||
.entry("signatures".to_owned())
|
||||
.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()));
|
||||
|
||||
if let CanonicalJsonValue::Object(signatures_map) = signatures_entry {
|
||||
let sig_value = keypairs.get(wanted_key_id).unwrap().to_owned();
|
||||
|
||||
match signatures_map.get_mut(via.as_str()) {
|
||||
| Some(CanonicalJsonValue::Object(inner_map)) => {
|
||||
trace!("inserting PS signature: {}", sig_value);
|
||||
inner_map.insert(
|
||||
"ed25519:policy_server".to_owned(),
|
||||
CanonicalJsonValue::String(sig_value),
|
||||
);
|
||||
},
|
||||
| Some(_) => {
|
||||
debug_warn!(
|
||||
"Existing `signatures[{}]` field is not an object; cannot insert policy \
|
||||
signature",
|
||||
via
|
||||
);
|
||||
return Ok(false);
|
||||
},
|
||||
| None => {
|
||||
let mut inner = BTreeMap::new();
|
||||
inner.insert(
|
||||
"ed25519:policy_server".to_owned(),
|
||||
CanonicalJsonValue::String(sig_value.clone()),
|
||||
);
|
||||
trace!(
|
||||
"created new signatures object for {via} with the signature {}",
|
||||
sig_value
|
||||
);
|
||||
signatures_map.insert(via.as_str().to_owned(), CanonicalJsonValue::Object(inner));
|
||||
},
|
||||
}
|
||||
} else {
|
||||
debug_warn!(
|
||||
"Existing `signatures` field is not an object; cannot insert policy signature"
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@@ -256,7 +256,12 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
||||
if incoming_pdu.state_key.is_none() {
|
||||
debug!(event_id = %incoming_pdu.event_id, "Checking policy server for event");
|
||||
match self
|
||||
.ask_policy_server(&incoming_pdu, &incoming_pdu.to_canonical_object(), room_id)
|
||||
.ask_policy_server(
|
||||
&incoming_pdu,
|
||||
&mut incoming_pdu.to_canonical_object(),
|
||||
room_id,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
{
|
||||
| Ok(false) => {
|
||||
|
||||
746
src/service/rooms/pdu_metadata/bundled_aggregations.rs
Normal file
746
src/service/rooms/pdu_metadata/bundled_aggregations.rs
Normal file
@@ -0,0 +1,746 @@
|
||||
use conduwuit::{Event, PduEvent, Result, err};
|
||||
use ruma::{
|
||||
UserId,
|
||||
api::Direction,
|
||||
events::relation::{BundledMessageLikeRelations, BundledReference, ReferenceChunk},
|
||||
};
|
||||
|
||||
use crate::rooms::timeline::PdusIterItem;
|
||||
|
||||
const MAX_BUNDLED_RELATIONS: usize = 50;
|
||||
|
||||
impl super::Service {
|
||||
/// Gets bundled aggregations for an event according to the Matrix
|
||||
/// specification.
|
||||
/// - m.replace relations are bundled to include the most recent replacement
|
||||
/// event.
|
||||
/// - m.reference relations are bundled to include a chunk of event IDs.
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn get_bundled_aggregations(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
pdu: &PduEvent,
|
||||
) -> Result<Option<BundledMessageLikeRelations<Box<serde_json::value::RawValue>>>> {
|
||||
// Events that can never get bundled aggregations
|
||||
if pdu.state_key().is_some() || Self::is_replacement_event(pdu) {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let relations = self
|
||||
.get_relations(
|
||||
user_id,
|
||||
&pdu.room_id_or_hash(),
|
||||
pdu.event_id(),
|
||||
conduwuit::PduCount::max(),
|
||||
MAX_BUNDLED_RELATIONS,
|
||||
0,
|
||||
Direction::Backward,
|
||||
)
|
||||
.await;
|
||||
|
||||
// The relations database code still handles the basic unsigned data
|
||||
// We don't want to recursively fetch relations
|
||||
|
||||
if relations.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Partition relations by type
|
||||
let (replace_events, reference_events): (Vec<_>, Vec<_>) = relations
|
||||
.iter()
|
||||
.filter_map(|relation| {
|
||||
let pdu = &relation.1;
|
||||
let content = pdu.get_content_as_value();
|
||||
|
||||
content
|
||||
.get("m.relates_to")
|
||||
.and_then(|relates_to| relates_to.get("rel_type"))
|
||||
.and_then(|rel_type| rel_type.as_str())
|
||||
.and_then(|rel_type_str| match rel_type_str {
|
||||
| "m.replace" => Some(RelationType::Replace(relation)),
|
||||
| "m.reference" => Some(RelationType::Reference(relation)),
|
||||
| _ => None, /* Ignore other relation types (threads are in DB but not
|
||||
* handled here) */
|
||||
})
|
||||
})
|
||||
.fold((Vec::new(), Vec::new()), |(mut replaces, mut references), rel_type| {
|
||||
match rel_type {
|
||||
| RelationType::Replace(r) => replaces.push(r),
|
||||
| RelationType::Reference(r) => references.push(r),
|
||||
}
|
||||
(replaces, references)
|
||||
});
|
||||
|
||||
// If no relations to bundle, return None
|
||||
if replace_events.is_empty() && reference_events.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let mut bundled = BundledMessageLikeRelations::<Box<serde_json::value::RawValue>>::new();
|
||||
|
||||
// Handle m.replace relations - find the most recent valid one (lazy load
|
||||
// original event)
|
||||
if !replace_events.is_empty() {
|
||||
if let Some(replacement) = self
|
||||
.find_most_recent_valid_replacement(user_id, pdu, &replace_events)
|
||||
.await?
|
||||
{
|
||||
bundled.replace = Some(Self::serialize_replacement(replacement)?);
|
||||
}
|
||||
}
|
||||
|
||||
// Handle m.reference relations - collect event IDs
|
||||
if !reference_events.is_empty() {
|
||||
let reference_chunk: Vec<_> = reference_events
|
||||
.into_iter()
|
||||
.map(|relation| BundledReference::new(relation.1.event_id().to_owned()))
|
||||
.collect();
|
||||
|
||||
if !reference_chunk.is_empty() {
|
||||
bundled.reference = Some(Box::new(ReferenceChunk::new(reference_chunk)));
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Handle other relation types (m.annotation, etc.) when specified
|
||||
|
||||
Ok(Some(bundled))
|
||||
}
|
||||
|
||||
/// Serialize a replacement event to the bundled format
|
||||
fn serialize_replacement(pdu: &PduEvent) -> Result<Box<Box<serde_json::value::RawValue>>> {
|
||||
let replacement_json = serde_json::to_string(pdu)
|
||||
.map_err(|e| err!(Database("Failed to serialize replacement event: {e}")))?;
|
||||
|
||||
let raw_value = serde_json::value::RawValue::from_string(replacement_json)
|
||||
.map_err(|e| err!(Database("Failed to create RawValue: {e}")))?;
|
||||
|
||||
Ok(Box::new(raw_value))
|
||||
}
|
||||
|
||||
/// Find the most recent valid replacement event based on origin_server_ts
|
||||
/// and lexicographic event_id ordering
|
||||
async fn find_most_recent_valid_replacement<'a>(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
original_event: &PduEvent,
|
||||
replacement_events: &[&'a PdusIterItem],
|
||||
) -> Result<Option<&'a PduEvent>> {
|
||||
// Filter valid replacements and find the maximum in a single pass
|
||||
let mut result: Option<&PduEvent> = None;
|
||||
|
||||
for relation in replacement_events {
|
||||
let pdu = &relation.1;
|
||||
|
||||
// Validate replacement
|
||||
if !Self::is_valid_replacement_event(original_event, pdu).await? {
|
||||
continue;
|
||||
}
|
||||
|
||||
let next = match result {
|
||||
| None => Some(pdu),
|
||||
| Some(current) => {
|
||||
// Compare by origin_server_ts first, then event_id lexicographically
|
||||
match pdu.origin_server_ts().cmp(¤t.origin_server_ts()) {
|
||||
| std::cmp::Ordering::Greater => Some(pdu),
|
||||
| std::cmp::Ordering::Equal if pdu.event_id() > current.event_id() =>
|
||||
Some(pdu),
|
||||
| _ => None,
|
||||
}
|
||||
},
|
||||
};
|
||||
if let Some(pdu) = next
|
||||
&& self
|
||||
.services
|
||||
.state_accessor
|
||||
.user_can_see_event(user_id, &pdu.room_id_or_hash(), pdu.event_id())
|
||||
.await
|
||||
{
|
||||
result = Some(pdu);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Adds bundled aggregations to a PDU's unsigned field
|
||||
#[tracing::instrument(skip(self, pdu), level = "debug")]
|
||||
pub async fn add_bundled_aggregations_to_pdu(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
pdu: &mut PduEvent,
|
||||
) -> Result<()> {
|
||||
if pdu.is_redacted() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let bundled_aggregations = self.get_bundled_aggregations(user_id, pdu).await?;
|
||||
|
||||
if let Some(aggregations) = bundled_aggregations {
|
||||
let aggregations_json = serde_json::to_value(aggregations)
|
||||
.map_err(|e| err!(Database("Failed to serialize bundled aggregations: {e}")))?;
|
||||
|
||||
Self::add_bundled_aggregations_to_unsigned(pdu, aggregations_json)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper method to add bundled aggregations to a PDU's unsigned field
|
||||
fn add_bundled_aggregations_to_unsigned(
|
||||
pdu: &mut PduEvent,
|
||||
aggregations_json: serde_json::Value,
|
||||
) -> Result<()> {
|
||||
use serde_json::{
|
||||
Map, Value as JsonValue,
|
||||
value::{RawValue as RawJsonValue, to_raw_value},
|
||||
};
|
||||
|
||||
let mut unsigned: Map<String, JsonValue> = pdu
|
||||
.unsigned
|
||||
.as_deref()
|
||||
.map(RawJsonValue::get)
|
||||
.map_or_else(|| Ok(Map::new()), serde_json::from_str)
|
||||
.map_err(|e| err!(Database("Invalid unsigned in pdu event: {e}")))?;
|
||||
|
||||
let relations = unsigned
|
||||
.entry("m.relations")
|
||||
.or_insert_with(|| JsonValue::Object(Map::new()))
|
||||
.as_object_mut()
|
||||
.ok_or_else(|| err!(Database("m.relations is not an object")))?;
|
||||
|
||||
if let JsonValue::Object(aggregations_map) = aggregations_json {
|
||||
relations.extend(aggregations_map);
|
||||
}
|
||||
|
||||
pdu.unsigned = Some(to_raw_value(&unsigned)?);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validates that an event is acceptable as a replacement for another event
|
||||
/// See C/S spec "Validity of replacement events"
|
||||
#[tracing::instrument(level = "debug")]
|
||||
async fn is_valid_replacement_event(
|
||||
original_event: &PduEvent,
|
||||
replacement_event: &PduEvent,
|
||||
) -> Result<bool> {
|
||||
Ok(
|
||||
// 1. Same room_id
|
||||
original_event.room_id() == replacement_event.room_id()
|
||||
// 2. Same sender
|
||||
&& original_event.sender() == replacement_event.sender()
|
||||
// 3. Same type
|
||||
&& original_event.event_type() == replacement_event.event_type()
|
||||
// 4. Neither event should have a state_key property
|
||||
&& original_event.state_key().is_none()
|
||||
&& replacement_event.state_key().is_none()
|
||||
// 5. Original event must not have rel_type of m.replace
|
||||
&& !Self::is_replacement_event(original_event)
|
||||
// 6. Replacement event must have m.new_content property (skip for encrypted)
|
||||
&& Self::has_new_content_or_encrypted(replacement_event),
|
||||
)
|
||||
}
|
||||
|
||||
/// Check if an event is itself a replacement
|
||||
#[inline]
|
||||
fn is_replacement_event(event: &PduEvent) -> bool {
|
||||
event
|
||||
.get_content_as_value()
|
||||
.get("m.relates_to")
|
||||
.and_then(|relates_to| relates_to.get("rel_type"))
|
||||
.and_then(|rel_type| rel_type.as_str())
|
||||
.is_some_and(|rel_type| rel_type == "m.replace")
|
||||
}
|
||||
|
||||
/// Check if event has m.new_content or is encrypted (where m.new_content
|
||||
/// would be in the encrypted payload)
|
||||
#[inline]
|
||||
fn has_new_content_or_encrypted(event: &PduEvent) -> bool {
|
||||
event.event_type() == &ruma::events::TimelineEventType::RoomEncrypted
|
||||
|| event.get_content_as_value().get("m.new_content").is_some()
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper enum for partitioning relations
|
||||
enum RelationType<'a> {
|
||||
Replace(&'a PdusIterItem),
|
||||
Reference(&'a PdusIterItem),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use conduwuit_core::pdu::{EventHash, PduEvent};
|
||||
use ruma::{UInt, events::TimelineEventType, owned_event_id, owned_room_id, owned_user_id};
|
||||
use serde_json::{Value as JsonValue, json, value::to_raw_value};
|
||||
|
||||
fn create_test_pdu(unsigned_content: Option<JsonValue>) -> PduEvent {
|
||||
PduEvent {
|
||||
event_id: owned_event_id!("$test:example.com"),
|
||||
room_id: Some(owned_room_id!("!test:example.com")),
|
||||
sender: owned_user_id!("@test:example.com"),
|
||||
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
|
||||
kind: TimelineEventType::RoomMessage,
|
||||
content: to_raw_value(&json!({"msgtype": "m.text", "body": "test"})).unwrap(),
|
||||
state_key: None,
|
||||
prev_events: vec![],
|
||||
depth: UInt::from(1_u32),
|
||||
auth_events: vec![],
|
||||
redacts: None,
|
||||
unsigned: unsigned_content.map(|content| to_raw_value(&content).unwrap()),
|
||||
hashes: EventHash { sha256: "test_hash".to_owned() },
|
||||
signatures: None,
|
||||
origin: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn create_bundled_aggregations() -> JsonValue {
|
||||
json!({
|
||||
"m.replace": {
|
||||
"event_id": "$replace:example.com",
|
||||
"origin_server_ts": 1_234_567_890,
|
||||
"sender": "@replacer:example.com"
|
||||
},
|
||||
"m.reference": {
|
||||
"count": 5,
|
||||
"chunk": [
|
||||
"$ref1:example.com",
|
||||
"$ref2:example.com"
|
||||
]
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_bundled_aggregations_to_unsigned_no_existing_unsigned() {
|
||||
let mut pdu = create_test_pdu(None);
|
||||
let aggregations = create_bundled_aggregations();
|
||||
|
||||
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
|
||||
&mut pdu,
|
||||
aggregations.clone(),
|
||||
);
|
||||
assert!(result.is_ok(), "Should succeed when no unsigned field exists");
|
||||
|
||||
assert!(pdu.unsigned.is_some(), "Unsigned field should be created");
|
||||
|
||||
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
|
||||
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
|
||||
|
||||
assert!(unsigned.get("m.relations").is_some(), "m.relations should exist");
|
||||
assert_eq!(
|
||||
unsigned["m.relations"], aggregations,
|
||||
"Relations should match the aggregations"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_bundled_aggregations_to_unsigned_overwrite_same_relation_type() {
|
||||
let existing_unsigned = json!({
|
||||
"m.relations": {
|
||||
"m.replace": {
|
||||
"event_id": "$old_replace:example.com",
|
||||
"origin_server_ts": 1_111_111_111,
|
||||
"sender": "@old_replacer:example.com"
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut pdu = create_test_pdu(Some(existing_unsigned));
|
||||
let new_aggregations = create_bundled_aggregations();
|
||||
|
||||
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
|
||||
&mut pdu,
|
||||
new_aggregations.clone(),
|
||||
);
|
||||
assert!(result.is_ok(), "Should succeed when overwriting same relation type");
|
||||
|
||||
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
|
||||
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
|
||||
|
||||
let relations = &unsigned["m.relations"];
|
||||
|
||||
assert_eq!(
|
||||
relations["m.replace"], new_aggregations["m.replace"],
|
||||
"m.replace should be updated"
|
||||
);
|
||||
assert_eq!(
|
||||
relations["m.replace"]["event_id"], "$replace:example.com",
|
||||
"Should have new event_id"
|
||||
);
|
||||
|
||||
assert!(relations.get("m.reference").is_some(), "New m.reference should be added");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_bundled_aggregations_to_unsigned_preserve_other_unsigned_fields() {
|
||||
// Test case: Other unsigned fields should be preserved
|
||||
let existing_unsigned = json!({
|
||||
"age": 98765,
|
||||
"prev_content": {"msgtype": "m.text", "body": "old message"},
|
||||
"redacted_because": {"event_id": "$redaction:example.com"},
|
||||
"m.relations": {
|
||||
"m.annotation": {"count": 1}
|
||||
}
|
||||
});
|
||||
|
||||
let mut pdu = create_test_pdu(Some(existing_unsigned));
|
||||
let new_aggregations = json!({
|
||||
"m.replace": {"event_id": "$new:example.com"}
|
||||
});
|
||||
|
||||
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
|
||||
&mut pdu,
|
||||
new_aggregations,
|
||||
);
|
||||
assert!(result.is_ok(), "Should succeed while preserving other fields");
|
||||
|
||||
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
|
||||
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
|
||||
|
||||
// Verify all existing fields are preserved
|
||||
assert_eq!(unsigned["age"], 98765, "age should be preserved");
|
||||
assert!(unsigned.get("prev_content").is_some(), "prev_content should be preserved");
|
||||
assert!(
|
||||
unsigned.get("redacted_because").is_some(),
|
||||
"redacted_because should be preserved"
|
||||
);
|
||||
|
||||
// Verify relations were merged correctly
|
||||
let relations = &unsigned["m.relations"];
|
||||
assert!(
|
||||
relations.get("m.annotation").is_some(),
|
||||
"Existing m.annotation should be preserved"
|
||||
);
|
||||
assert!(relations.get("m.replace").is_some(), "New m.replace should be added");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_bundled_aggregations_to_unsigned_invalid_existing_unsigned() {
|
||||
// Test case: Invalid JSON in existing unsigned should result in error
|
||||
let mut pdu = create_test_pdu(None);
|
||||
// Manually set invalid unsigned data
|
||||
pdu.unsigned = Some(to_raw_value(&"invalid json").unwrap());
|
||||
|
||||
let aggregations = create_bundled_aggregations();
|
||||
let result =
|
||||
super::super::Service::add_bundled_aggregations_to_unsigned(&mut pdu, aggregations);
|
||||
|
||||
assert!(result.is_err(), "fails when existing unsigned is invalid");
|
||||
// Should we ignore the error and overwrite anyway?
|
||||
}
|
||||
|
||||
// Test helper function to create test PDU events
|
||||
fn create_test_event(
|
||||
event_id: &str,
|
||||
room_id: &str,
|
||||
sender: &str,
|
||||
event_type: TimelineEventType,
|
||||
content: &JsonValue,
|
||||
state_key: Option<&str>,
|
||||
) -> PduEvent {
|
||||
PduEvent {
|
||||
event_id: event_id.try_into().unwrap(),
|
||||
room_id: Some(room_id.try_into().unwrap()),
|
||||
sender: sender.try_into().unwrap(),
|
||||
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
|
||||
kind: event_type,
|
||||
content: to_raw_value(&content).unwrap(),
|
||||
state_key: state_key.map(Into::into),
|
||||
prev_events: vec![],
|
||||
depth: UInt::from(1_u32),
|
||||
auth_events: vec![],
|
||||
redacts: None,
|
||||
unsigned: None,
|
||||
hashes: EventHash { sha256: "test_hash".to_owned() },
|
||||
signatures: None,
|
||||
origin: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Test that a valid replacement event passes validation
|
||||
#[tokio::test]
|
||||
async fn test_valid_replacement_event() {
|
||||
let original = create_test_event(
|
||||
"$original:example.com",
|
||||
"!room:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomMessage,
|
||||
&json!({"msgtype": "m.text", "body": "original message"}),
|
||||
None,
|
||||
);
|
||||
|
||||
let replacement = create_test_event(
|
||||
"$replacement:example.com",
|
||||
"!room:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomMessage,
|
||||
&json!({
|
||||
"msgtype": "m.text",
|
||||
"body": "* edited message",
|
||||
"m.new_content": {
|
||||
"msgtype": "m.text",
|
||||
"body": "edited message"
|
||||
},
|
||||
"m.relates_to": {
|
||||
"rel_type": "m.replace",
|
||||
"event_id": "$original:example.com"
|
||||
}
|
||||
}),
|
||||
None,
|
||||
);
|
||||
|
||||
let result =
|
||||
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||
assert!(result.is_ok(), "Validation should succeed");
|
||||
assert!(result.unwrap(), "Valid replacement event should be accepted");
|
||||
}
|
||||
|
||||
/// Test replacement event with different room ID is rejected
|
||||
#[tokio::test]
|
||||
async fn test_replacement_event_different_room() {
|
||||
let original = create_test_event(
|
||||
"$original:example.com",
|
||||
"!room1:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomMessage,
|
||||
&json!({"msgtype": "m.text", "body": "original message"}),
|
||||
None,
|
||||
);
|
||||
|
||||
let replacement = create_test_event(
|
||||
"$replacement:example.com",
|
||||
"!room2:example.com", // Different room
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomMessage,
|
||||
&json!({
|
||||
"msgtype": "m.text",
|
||||
"body": "* edited message",
|
||||
"m.new_content": {
|
||||
"msgtype": "m.text",
|
||||
"body": "edited message"
|
||||
}
|
||||
}),
|
||||
None,
|
||||
);
|
||||
|
||||
let result =
|
||||
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||
assert!(result.is_ok(), "Validation should succeed");
|
||||
assert!(!result.unwrap(), "Different room ID should be rejected");
|
||||
}
|
||||
|
||||
/// Test replacement event with different sender is rejected
|
||||
#[tokio::test]
|
||||
async fn test_replacement_event_different_sender() {
|
||||
let original = create_test_event(
|
||||
"$original:example.com",
|
||||
"!room:example.com",
|
||||
"@user1:example.com",
|
||||
TimelineEventType::RoomMessage,
|
||||
&json!({"msgtype": "m.text", "body": "original message"}),
|
||||
None,
|
||||
);
|
||||
|
||||
let replacement = create_test_event(
|
||||
"$replacement:example.com",
|
||||
"!room:example.com",
|
||||
"@user2:example.com", // Different sender
|
||||
TimelineEventType::RoomMessage,
|
||||
&json!({
|
||||
"msgtype": "m.text",
|
||||
"body": "* edited message",
|
||||
"m.new_content": {
|
||||
"msgtype": "m.text",
|
||||
"body": "edited message"
|
||||
}
|
||||
}),
|
||||
None,
|
||||
);
|
||||
|
||||
let result =
|
||||
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||
assert!(result.is_ok(), "Validation should succeed");
|
||||
assert!(!result.unwrap(), "Different sender should be rejected");
|
||||
}
|
||||
|
||||
/// Test replacement event with different type is rejected
|
||||
#[tokio::test]
|
||||
async fn test_replacement_event_different_type() {
|
||||
let original = create_test_event(
|
||||
"$original:example.com",
|
||||
"!room:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomMessage,
|
||||
&json!({"msgtype": "m.text", "body": "original message"}),
|
||||
None,
|
||||
);
|
||||
|
||||
let replacement = create_test_event(
|
||||
"$replacement:example.com",
|
||||
"!room:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomTopic, // Different event type
|
||||
&json!({
|
||||
"topic": "new topic",
|
||||
"m.new_content": {
|
||||
"topic": "new topic"
|
||||
}
|
||||
}),
|
||||
None,
|
||||
);
|
||||
|
||||
let result =
|
||||
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||
assert!(result.is_ok(), "Validation should succeed");
|
||||
assert!(!result.unwrap(), "Different event type should be rejected");
|
||||
}
|
||||
|
||||
/// Test replacement event with state key is rejected
|
||||
#[tokio::test]
|
||||
async fn test_replacement_event_with_state_key() {
|
||||
let original = create_test_event(
|
||||
"$original:example.com",
|
||||
"!room:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomName,
|
||||
&json!({"name": "room name"}),
|
||||
Some(""), // Has state key
|
||||
);
|
||||
|
||||
let replacement = create_test_event(
|
||||
"$replacement:example.com",
|
||||
"!room:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomName,
|
||||
&json!({
|
||||
"name": "new room name",
|
||||
"m.new_content": {
|
||||
"name": "new room name"
|
||||
}
|
||||
}),
|
||||
None,
|
||||
);
|
||||
|
||||
let result =
|
||||
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||
assert!(result.is_ok(), "Validation should succeed");
|
||||
assert!(!result.unwrap(), "Event with state key should be rejected");
|
||||
}
|
||||
|
||||
/// Test replacement of an event that is already a replacement is rejected
|
||||
#[tokio::test]
|
||||
async fn test_replacement_event_original_is_replacement() {
|
||||
let original = create_test_event(
|
||||
"$original:example.com",
|
||||
"!room:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomMessage,
|
||||
&json!({
|
||||
"msgtype": "m.text",
|
||||
"body": "* edited message",
|
||||
"m.relates_to": {
|
||||
"rel_type": "m.replace", // Original is already a replacement
|
||||
"event_id": "$some_other:example.com"
|
||||
}
|
||||
}),
|
||||
None,
|
||||
);
|
||||
|
||||
let replacement = create_test_event(
|
||||
"$replacement:example.com",
|
||||
"!room:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomMessage,
|
||||
&json!({
|
||||
"msgtype": "m.text",
|
||||
"body": "* edited again",
|
||||
"m.new_content": {
|
||||
"msgtype": "m.text",
|
||||
"body": "edited again"
|
||||
}
|
||||
}),
|
||||
None,
|
||||
);
|
||||
|
||||
let result =
|
||||
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||
assert!(result.is_ok(), "Validation should succeed");
|
||||
assert!(!result.unwrap(), "Replacement of replacement should be rejected");
|
||||
}
|
||||
|
||||
/// Test replacement event missing m.new_content is rejected
|
||||
#[tokio::test]
|
||||
async fn test_replacement_event_missing_new_content() {
|
||||
let original = create_test_event(
|
||||
"$original:example.com",
|
||||
"!room:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomMessage,
|
||||
&json!({"msgtype": "m.text", "body": "original message"}),
|
||||
None,
|
||||
);
|
||||
|
||||
let replacement = create_test_event(
|
||||
"$replacement:example.com",
|
||||
"!room:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomMessage,
|
||||
&json!({
|
||||
"msgtype": "m.text",
|
||||
"body": "* edited message"
|
||||
// Missing m.new_content
|
||||
}),
|
||||
None,
|
||||
);
|
||||
|
||||
let result =
|
||||
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||
assert!(result.is_ok(), "Validation should succeed");
|
||||
assert!(!result.unwrap(), "Missing m.new_content should be rejected");
|
||||
}
|
||||
|
||||
/// Test encrypted replacement event without m.new_content is accepted
|
||||
#[tokio::test]
|
||||
async fn test_replacement_event_encrypted_missing_new_content_is_valid() {
|
||||
let original = create_test_event(
|
||||
"$original:example.com",
|
||||
"!room:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomEncrypted,
|
||||
&json!({
|
||||
"algorithm": "m.megolm.v1.aes-sha2",
|
||||
"ciphertext": "encrypted_payload_base64",
|
||||
"sender_key": "sender_key",
|
||||
"session_id": "session_id"
|
||||
}),
|
||||
None,
|
||||
);
|
||||
|
||||
let replacement = create_test_event(
|
||||
"$replacement:example.com",
|
||||
"!room:example.com",
|
||||
"@user:example.com",
|
||||
TimelineEventType::RoomEncrypted,
|
||||
&json!({
|
||||
"algorithm": "m.megolm.v1.aes-sha2",
|
||||
"ciphertext": "encrypted_replacement_payload_base64",
|
||||
"sender_key": "sender_key",
|
||||
"session_id": "session_id",
|
||||
"m.relates_to": {
|
||||
"rel_type": "m.replace",
|
||||
"event_id": "$original:example.com"
|
||||
}
|
||||
// No m.new_content in cleartext - this is valid for encrypted events
|
||||
}),
|
||||
None,
|
||||
);
|
||||
|
||||
let result =
|
||||
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||
assert!(result.is_ok(), "Validation should succeed");
|
||||
assert!(
|
||||
result.unwrap(),
|
||||
"Encrypted replacement without cleartext m.new_content should be accepted"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,6 @@
|
||||
use conduwuit::{
|
||||
arrayvec::ArrayVec,
|
||||
matrix::{Event, PduCount},
|
||||
result::LogErr,
|
||||
utils::{
|
||||
ReadyExt,
|
||||
stream::{TryIgnore, WidebandExt},
|
||||
@@ -15,10 +14,11 @@
|
||||
use ruma::{EventId, RoomId, UserId, api::Direction};
|
||||
|
||||
use crate::{
|
||||
Dep, rooms,
|
||||
Dep,
|
||||
rooms::{
|
||||
self,
|
||||
short::{ShortEventId, ShortRoomId},
|
||||
timeline::{PduId, RawPduId},
|
||||
timeline::{PduId, PdusIterItem, RawPduId},
|
||||
},
|
||||
};
|
||||
|
||||
@@ -60,7 +60,7 @@ pub(super) fn get_relations<'a>(
|
||||
target: ShortEventId,
|
||||
from: PduCount,
|
||||
dir: Direction,
|
||||
) -> impl Stream<Item = (PduCount, impl Event)> + Send + 'a {
|
||||
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
|
||||
// Query from exact position then filter excludes it (saturating_inc could skip
|
||||
// events at min/max boundaries)
|
||||
let from_unsigned = from.into_unsigned();
|
||||
@@ -92,9 +92,7 @@ pub(super) fn get_relations<'a>(
|
||||
|
||||
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
|
||||
|
||||
if pdu.sender() != user_id {
|
||||
pdu.as_mut_pdu().remove_transaction_id().log_err().ok();
|
||||
}
|
||||
pdu.as_mut_pdu().set_unsigned(Some(user_id));
|
||||
|
||||
Some((shorteventid, pdu))
|
||||
})
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
mod bundled_aggregations;
|
||||
mod data;
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit::{
|
||||
Result,
|
||||
matrix::{Event, PduCount},
|
||||
};
|
||||
use conduwuit::{Result, matrix::PduCount};
|
||||
use futures::{StreamExt, future::try_join};
|
||||
use ruma::{EventId, RoomId, UserId, api::Direction};
|
||||
|
||||
use self::data::Data;
|
||||
use crate::{Dep, rooms};
|
||||
use crate::{
|
||||
Dep,
|
||||
rooms::{self, timeline::PdusIterItem},
|
||||
};
|
||||
|
||||
pub struct Service {
|
||||
services: Services,
|
||||
@@ -19,6 +20,7 @@ pub struct Service {
|
||||
struct Services {
|
||||
short: Dep<rooms::short::Service>,
|
||||
timeline: Dep<rooms::timeline::Service>,
|
||||
state_accessor: Dep<rooms::state_accessor::Service>,
|
||||
}
|
||||
|
||||
impl crate::Service for Service {
|
||||
@@ -27,6 +29,8 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
services: Services {
|
||||
short: args.depend::<rooms::short::Service>("rooms::short"),
|
||||
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
|
||||
state_accessor: args
|
||||
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
|
||||
},
|
||||
db: Data::new(&args),
|
||||
}))
|
||||
@@ -56,7 +60,7 @@ pub async fn get_relations<'a>(
|
||||
limit: usize,
|
||||
max_depth: u8,
|
||||
dir: Direction,
|
||||
) -> Vec<(PduCount, impl Event)> {
|
||||
) -> Vec<PdusIterItem> {
|
||||
let room_id = self.services.short.get_shortroomid(room_id);
|
||||
|
||||
let target = self.services.timeline.get_pdu_count(target);
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit::{
|
||||
PduCount, Result,
|
||||
PduCount, PduEvent, Result,
|
||||
arrayvec::ArrayVec,
|
||||
implement,
|
||||
debug_warn, implement,
|
||||
matrix::event::{Event, Matches},
|
||||
utils::{
|
||||
ArrayVecExt, IterStream, ReadyExt, set,
|
||||
@@ -35,6 +35,7 @@ struct Services {
|
||||
short: Dep<rooms::short::Service>,
|
||||
state_accessor: Dep<rooms::state_accessor::Service>,
|
||||
timeline: Dep<rooms::timeline::Service>,
|
||||
pdu_metadata: Dep<rooms::pdu_metadata::Service>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -61,6 +62,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
state_accessor: args
|
||||
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
|
||||
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
|
||||
pdu_metadata: args.depend::<rooms::pdu_metadata::Service>("rooms::pdu_metadata"),
|
||||
},
|
||||
}))
|
||||
}
|
||||
@@ -104,7 +106,8 @@ pub fn deindex_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_b
|
||||
pub async fn search_pdus<'a>(
|
||||
&'a self,
|
||||
query: &'a RoomQuery<'a>,
|
||||
) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + 'a)> {
|
||||
sender_user: &'a UserId,
|
||||
) -> Result<(usize, impl Stream<Item = PduEvent> + Send + 'a)> {
|
||||
let pdu_ids: Vec<_> = self.search_pdu_ids(query).await?.collect().await;
|
||||
|
||||
let filter = &query.criteria.filter;
|
||||
@@ -129,7 +132,23 @@ pub async fn search_pdus<'a>(
|
||||
.then_some(pdu)
|
||||
})
|
||||
.skip(query.skip)
|
||||
.take(query.limit);
|
||||
.take(query.limit)
|
||||
.map(move |mut pdu| {
|
||||
pdu.set_unsigned(query.user_id);
|
||||
|
||||
pdu
|
||||
})
|
||||
.then(async move |mut pdu| {
|
||||
if let Err(e) = self
|
||||
.services
|
||||
.pdu_metadata
|
||||
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu)
|
||||
.await
|
||||
{
|
||||
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||
}
|
||||
pdu
|
||||
});
|
||||
|
||||
Ok((count, pdus))
|
||||
}
|
||||
|
||||
@@ -264,6 +264,8 @@ fn get_space_child_events<'a>(
|
||||
if content.via.is_empty() {
|
||||
return None;
|
||||
}
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
|
||||
if RoomId::parse(&state_key).is_err() {
|
||||
|
||||
@@ -19,8 +19,7 @@
|
||||
use ruma::{
|
||||
EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId,
|
||||
events::{
|
||||
AnyStrippedStateEvent, StateEventType, TimelineEventType,
|
||||
room::create::RoomCreateEventContent,
|
||||
AnyStateEvent, StateEventType, TimelineEventType, room::create::RoomCreateEventContent,
|
||||
},
|
||||
serde::Raw,
|
||||
};
|
||||
@@ -307,12 +306,22 @@ pub async fn append_to_state(&self, new_pdu: &PduEvent, room_id: &RoomId) -> Res
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a summary of the room state for invites and knock responses.
|
||||
///
|
||||
/// This used to return stripped state, but now returns complete events.
|
||||
///
|
||||
/// Returns:
|
||||
///
|
||||
/// - m.room.create
|
||||
/// - m.room.join_rules
|
||||
/// - m.room.canonical_alias
|
||||
/// - m.room.name
|
||||
/// - m.room.avatar
|
||||
/// - m.room.member (of the event sender)
|
||||
/// - m.room.encryption
|
||||
/// - m.room.topic
|
||||
#[tracing::instrument(skip_all, level = "debug")]
|
||||
pub async fn summary_stripped<'a, E>(
|
||||
&self,
|
||||
event: &'a E,
|
||||
room_id: &RoomId,
|
||||
) -> Vec<Raw<AnyStrippedStateEvent>>
|
||||
pub async fn summary<'a, E>(&self, event: &'a E, room_id: &RoomId) -> Vec<Raw<AnyStateEvent>>
|
||||
where
|
||||
E: Event + Send + Sync,
|
||||
&'a E: Event + Send,
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use conduwuit::{Err, Event, Pdu, Result, implement, is_not_empty, utils::ReadyExt, warn};
|
||||
use conduwuit::{Err, Event, Pdu, Result, implement, is_not_empty, utils::ReadyExt};
|
||||
use database::{Json, serialize_key};
|
||||
use futures::StreamExt;
|
||||
use ruma::{
|
||||
OwnedServerName, RoomId, UserId,
|
||||
CanonicalJsonValue, OwnedServerName, RoomId, UserId,
|
||||
events::{
|
||||
AnyStrippedStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType,
|
||||
StateEventType,
|
||||
@@ -334,7 +334,7 @@ pub async fn mark_as_invited(
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
sender_user: &UserId,
|
||||
last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
|
||||
last_state: Option<Vec<CanonicalJsonValue>>,
|
||||
invite_via: Option<Vec<OwnedServerName>>,
|
||||
) -> Result<()> {
|
||||
// return an error for blocked invites. ignored invites aren't handled here
|
||||
|
||||
@@ -163,9 +163,7 @@ pub async fn threads_until<'a>(
|
||||
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
|
||||
|
||||
let pdu_id: PduId = pdu_id.into();
|
||||
if pdu.sender() != user_id {
|
||||
pdu.as_mut_pdu().remove_transaction_id().ok();
|
||||
}
|
||||
pdu.as_mut_pdu().set_unsigned(Some(user_id));
|
||||
|
||||
Some((pdu_id.shorteventid, pdu))
|
||||
});
|
||||
|
||||
@@ -347,6 +347,10 @@ pub async fn append_pdu<'a, Leaves>(
|
||||
| _ => {},
|
||||
}
|
||||
|
||||
// CONCERN: If we receive events with a relation out-of-order, we never write
|
||||
// their relation / thread. We need some kind of way to trigger when we receive
|
||||
// this event, and potentially a way to rebuild the table entirely.
|
||||
|
||||
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
|
||||
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
|
||||
self.services
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
|
||||
use super::RoomMutexGuard;
|
||||
|
||||
pub fn pdu_fits(owned_obj: &mut CanonicalJsonObject) -> bool {
|
||||
pub fn pdu_fits(owned_obj: &CanonicalJsonObject) -> bool {
|
||||
// room IDs, event IDs, senders, types, and state keys must all be <= 255 bytes
|
||||
if let Some(CanonicalJsonValue::String(room_id)) = owned_obj.get("room_id") {
|
||||
if room_id.len() > 255 {
|
||||
@@ -308,7 +308,7 @@ fn from_evt(
|
||||
match self
|
||||
.services
|
||||
.event_handler
|
||||
.ask_policy_server(&pdu, &pdu_json, pdu.room_id().expect("has room ID"))
|
||||
.ask_policy_server(&pdu, &mut pdu_json, pdu.room_id().expect("has room ID"), false)
|
||||
.await
|
||||
{
|
||||
| Ok(true) => {},
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
use std::{borrow::Borrow, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit::{
|
||||
Err, PduCount, PduEvent, Result, at, err,
|
||||
result::{LogErr, NotFound},
|
||||
result::NotFound,
|
||||
utils::{self, stream::TryReadyExt},
|
||||
};
|
||||
use database::{Database, Deserialized, Json, KeyVal, Map};
|
||||
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
|
||||
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
|
||||
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction};
|
||||
|
||||
use super::{PduId, RawPduId};
|
||||
use crate::{Dep, rooms, rooms::short::ShortRoomId};
|
||||
@@ -45,12 +45,8 @@ pub(super) fn new(args: &crate::Args<'_>) -> Self {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(super) async fn last_timeline_count(
|
||||
&self,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<PduCount> {
|
||||
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
|
||||
pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
|
||||
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
|
||||
|
||||
pin_mut!(pdus_rev);
|
||||
let last_count = pdus_rev
|
||||
@@ -64,12 +60,8 @@ pub(super) async fn last_timeline_count(
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(super) async fn latest_pdu_in_room(
|
||||
&self,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<PduEvent> {
|
||||
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
|
||||
pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
|
||||
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
|
||||
|
||||
pin_mut!(pdus_rev);
|
||||
pdus_rev
|
||||
@@ -221,7 +213,6 @@ pub(super) async fn replace_pdu(
|
||||
/// order.
|
||||
pub(super) fn pdus_rev<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
until: PduCount,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
@@ -231,14 +222,13 @@ pub(super) fn pdus_rev<'a>(
|
||||
self.pduid_pdu
|
||||
.rev_raw_stream_from(¤t)
|
||||
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
|
||||
.ready_and_then(move |item| Self::each_pdu(item, user_id))
|
||||
.ready_and_then(Self::from_json_slice)
|
||||
})
|
||||
.try_flatten_stream()
|
||||
}
|
||||
|
||||
pub(super) fn pdus<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
from: PduCount,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
@@ -248,21 +238,15 @@ pub(super) fn pdus<'a>(
|
||||
self.pduid_pdu
|
||||
.raw_stream_from(¤t)
|
||||
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
|
||||
.ready_and_then(move |item| Self::each_pdu(item, user_id))
|
||||
.ready_and_then(Self::from_json_slice)
|
||||
})
|
||||
.try_flatten_stream()
|
||||
}
|
||||
|
||||
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
|
||||
fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result<PdusIterItem> {
|
||||
let pdu_id: RawPduId = pdu_id.into();
|
||||
|
||||
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
|
||||
|
||||
if Some(pdu.sender.borrow()) != user_id {
|
||||
pdu.remove_transaction_id().log_err().ok();
|
||||
}
|
||||
|
||||
pdu.add_age().log_err().ok();
|
||||
let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
|
||||
|
||||
Ok((pdu_id.pdu_count(), pdu))
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
};
|
||||
use futures::{Future, Stream, TryStreamExt, pin_mut};
|
||||
use ruma::{
|
||||
CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
|
||||
CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId,
|
||||
events::room::encrypted::Relation,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
@@ -138,7 +138,7 @@ pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<impl Event> {
|
||||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, impl Event)> {
|
||||
let pdus = self.pdus(None, room_id, None);
|
||||
let pdus = self.pdus(room_id, None);
|
||||
|
||||
pin_mut!(pdus);
|
||||
pdus.try_next()
|
||||
@@ -148,16 +148,12 @@ pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, im
|
||||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<impl Event> {
|
||||
self.db.latest_pdu_in_room(None, room_id).await
|
||||
self.db.latest_pdu_in_room(room_id).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn last_timeline_count(
|
||||
&self,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<PduCount> {
|
||||
self.db.last_timeline_count(sender_user, room_id).await
|
||||
pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
|
||||
self.db.last_timeline_count(room_id).await
|
||||
}
|
||||
|
||||
/// Returns the `count` of this pdu's id.
|
||||
@@ -235,33 +231,29 @@ pub async fn replace_pdu(&self, pdu_id: &RawPduId, pdu_json: &CanonicalJsonObjec
|
||||
#[inline]
|
||||
pub fn all_pdus<'a>(
|
||||
&'a self,
|
||||
user_id: &'a UserId,
|
||||
room_id: &'a RoomId,
|
||||
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
|
||||
self.pdus(Some(user_id), room_id, None).ignore_err()
|
||||
self.pdus(room_id, None).ignore_err()
|
||||
}
|
||||
|
||||
/// Reverse iteration starting after `until`.
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn pdus_rev<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
until: Option<PduCount>,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
self.db
|
||||
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
|
||||
.pdus_rev(room_id, until.unwrap_or_else(PduCount::max))
|
||||
}
|
||||
|
||||
/// Forward iteration starting after `from`.
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn pdus<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
from: Option<PduCount>,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
self.db
|
||||
.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
|
||||
self.db.pdus(room_id, from.unwrap_or_else(PduCount::min))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -781,7 +781,7 @@ async fn send_events_dest_push(
|
||||
|
||||
for pdu in pdus {
|
||||
// Redacted events are not notification targets (we don't send push for them)
|
||||
if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) {
|
||||
if pdu.is_redacted() {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user