Compare commits

..

49 Commits

Author SHA1 Message Date
Jade Ellis c40cc3b236 chore: Release 2026-03-03 20:59:08 +00:00
Jade Ellis 754959e80d fix: Don't process admin escape commands for local users from federation
Reviewed-By: timedout <git@nexy7574.co.uk>
2026-03-03 19:55:50 +00:00
timedout 37888fb670 fix: Limit body read size of remote requests (CWE-409)
Reviewed-By: Jade Ellis <jade@ellis.link>
2026-03-03 19:54:34 +00:00
Jade Ellis 7207398a9e docs: Changelog 2026-03-03 19:39:54 +00:00
Jason Volk 1a7bda209b feat: Implement Dehydrated Devices MSC3814
Co-authored-by: Jade Ellis <jade@ellis.link>
Signed-off-by: Jason Volk <jason@zemos.net>
2026-03-03 19:39:53 +00:00
Autumn Ashton 7e1950b3d2 fix(docker): Fix building a docker container with dev profile
In Rust, the dev profile uses "debug" as the name of the output folder.
2026-03-03 19:31:04 +00:00
timedout b507898c62 fix: Bump ruwuma again 2026-03-03 18:10:28 +00:00
nexy7574 f4af67575e fix: Bump ruwuma to resolve duplicate state error 2026-03-03 06:01:02 +00:00
timedout 6adb99397e feat: Remove MSC4010 support 2026-02-27 17:03:19 +00:00
Renovate Bot 8ce83a8a14 chore(deps): update rust crate axum-extra to 0.12.0 2026-02-25 17:16:35 +00:00
Niklas Wojtkowiak 052c4dfa21 fix(sync): don't override sliding sync v5 list range start to zero 2026-02-24 13:59:33 +00:00
lynxize a43dee1728 fix: Don't show successful media deletion as an error
Fixes !admin media delete --mxc <url> responding with an error message
when the media was deleted successfully.
2026-02-23 22:02:34 -07:00
Niklas Wojtkowiak 763d9b3de8 fixup! fix(api): restore backwards compatibility for RTC foci config 2026-02-23 18:10:25 -05:00
Niklas Wojtkowiak 1e6d95583c chore(deps): update ruwuma revision 2026-02-23 23:01:15 +00:00
Niklas Wojtkowiak 8a254a33cc fix(api): restore backwards compatibility for RTC foci config 2026-02-23 23:01:15 +00:00
Niklas Wojtkowiak c97dd54766 chore(changelog): add news fragment for #1442 2026-02-23 23:01:15 +00:00
Niklas Wojtkowiak 8ddb7c70c0 feat(api): implement MSC4143 RTC transports discovery endpoint
Add dedicated \`GET /_matrix/client/v1/rtc/transports\` and \`GET /_matrix/client/unstable/org.matrix.msc4143/rtc/transports\` endpoints for MatrixRTC focus discovery (MSC4143), replacing the deprecated well-known approach.

Move RTC foci configuration from \`[global.well_known]\` into a new \`[global.matrix_rtc]\` config section with a \`foci\` field. Remove \`rtc_foci\` from the \`.well-known/matrix/client\` response. Update LiveKit setup documentation accordingly.

Closes #1431
2026-02-23 23:01:15 +00:00
Niklas Wojtkowiak cb9786466b chore(changelog): add news fragment for #1441 2026-02-23 17:59:13 +00:00
Niklas Wojtkowiak 18d2662b01 fix(config): remove allow_public_room_directory_without_auth 2026-02-23 17:59:13 +00:00
timedout 558262dd1f chore: Refactor transaction_ids -> transactions 2026-02-23 17:44:35 +00:00
timedout d311b87579 chore: Fix incorrect capitalisation
I didn't realise I agreed to take an English class with @ginger while
working on this server lol
2026-02-23 17:25:12 +00:00
timedout 8702f55cf5 fix: Don't panic if nobody's listening 2026-02-23 17:22:37 +00:00
timedout d4481b07ac chore: Add news frag 2026-02-23 16:54:54 +00:00
Jade Ellis 92351df925 refactor: Make federation transaction handle errors correctly
We have a dedicated error type that's then matched.
Event sorting is now infallible.
Could probably be cleaned up in a bit.
2026-02-23 16:36:46 +00:00
Jade Ellis 47e2733ea1 refactor: Make stream utils generic over the error type 2026-02-23 16:36:46 +00:00
Jade Ellis 6637e4c6a7 fix: Clean up cache, prevent several race conditions
We use one map which is only ever held for a short time.
2026-02-23 16:36:46 +00:00
nexy7574 35e441452f feat: Attempt to build localised DAG before processing PDUs 2026-02-23 16:36:46 +00:00
nexy7574 66bbb655bf feat: Warn when server is overloaded 2026-02-23 16:36:45 +00:00
nexy7574 81b202ce51 chore: Decrease transaction log verbosity 2026-02-23 16:36:45 +00:00
nexy7574 4657844d46 feat: Show active transaction handle count in !admin federation incoming-federation 2026-02-23 16:36:45 +00:00
nexy7574 9016cd11a6 chore: Run pre-commit and clippy to fix inherited CI errs 2026-02-23 16:36:45 +00:00
nexy7574 dd70094719 feat: Make max_active_txns actually configurable 2026-02-23 16:36:45 +00:00
nexy7574 fcd49b7ab3 fix: Remove duplicate fields from logs 2026-02-23 16:36:45 +00:00
nexy7574 470c9b52dd feat: Instrument process_inbound_transaction 2026-02-23 16:36:45 +00:00
nexy7574 0d8cafc329 feat: Support casting transaction processing to the background 2026-02-23 16:36:44 +00:00
nexy7574 2f9956ddca feat: Add helper functions for federation channels 2026-02-23 16:36:44 +00:00
nexy7574 21a97cdd0b chore: Refactor existing references to transaction service 2026-02-23 16:36:44 +00:00
nexy7574 e986cd4536 feat(federation): Restructure transaction_ids service
Adds two new in-memory maps to the service in to prepare for better handlers
2026-02-23 16:36:40 +00:00
Shane Jaroch 526d862296 fix: more aggressive user agent for URL preview
adding "facebookexternalhit" alongside "embedbot" fixes many errors, such as YouTube Music's:
    "Your browser is deprecated. Please upgrade."

add admin command to clear URL stuck and broken data (per URL currently)

    add command to clear all saved URL previews.
    sync resolver docs.
2026-02-23 15:24:14 +00:00
Ben Botwin fbeb5bf186 report permission denied errors 2026-02-23 15:22:18 +00:00
Ben Botwin a336f2df44 fixed formatting 2026-02-23 15:22:18 +00:00
Ben Botwin 19b78ec73e made error handling more concise 2026-02-23 15:22:18 +00:00
Ben Botwin 27ff2d9363 added more granular error handling for other file fetch function 2026-02-23 15:22:18 +00:00
Ben Botwin 50fa8c3abf ran format 2026-02-23 15:22:18 +00:00
Ben Botwin 18c4be869f added handling for other potential errors 2026-02-23 15:22:18 +00:00
Ben Botwin fc00b96d8b Added proper 404 for not found media and fixed devshell for running tests 2026-02-23 15:22:18 +00:00
Jade Ellis fa4156d8a6 docs: Changelog 2026-02-22 21:19:20 +00:00
Jade Ellis 23638cd714 feat(appservices): MSC3202 Device masquerading for appservices 2026-02-22 21:19:20 +00:00
Raven 9f1a483e76 docs: Add information about partnered homeservers to the introduction page & update README.md
Includes step-by-step directions to ease the lift for those who have ended up
here and who have never created a matrix account or used matrix before in the
past.

Also updates the information in README.md to match, as these should generally be identical.
2026-02-21 18:51:56 -08:00
107 changed files with 1854 additions and 1255 deletions
Generated
+45 -53
View File
@@ -445,13 +445,14 @@ dependencies = [
[[package]]
name = "axum-extra"
version = "0.10.3"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9963ff19f40c6102c76756ef0a46004c0d58957d87259fc9208ff8441c12ab96"
checksum = "fef252edff26ddba56bbcdf2ee3307b8129acb86f5749b68990c168a6fcc9c76"
dependencies = [
"axum",
"axum-core",
"bytes",
"futures-core",
"futures-util",
"headers",
"http",
@@ -459,8 +460,6 @@ dependencies = [
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"serde_core",
"tower-layer",
"tower-service",
"tracing",
@@ -888,7 +887,7 @@ dependencies = [
[[package]]
name = "conduwuit"
version = "0.5.5"
version = "0.5.6-alpha"
dependencies = [
"clap",
"conduwuit_admin",
@@ -920,7 +919,7 @@ dependencies = [
[[package]]
name = "conduwuit_admin"
version = "0.5.5"
version = "0.5.6-alpha"
dependencies = [
"clap",
"conduwuit_api",
@@ -941,7 +940,7 @@ dependencies = [
[[package]]
name = "conduwuit_api"
version = "0.5.5"
version = "0.5.6-alpha"
dependencies = [
"async-trait",
"axum",
@@ -973,14 +972,14 @@ dependencies = [
[[package]]
name = "conduwuit_build_metadata"
version = "0.5.5"
version = "0.5.6-alpha"
dependencies = [
"built",
]
[[package]]
name = "conduwuit_core"
version = "0.5.5"
version = "0.5.6-alpha"
dependencies = [
"argon2",
"arrayvec",
@@ -1014,7 +1013,6 @@ dependencies = [
"nix",
"num-traits",
"parking_lot",
"paste",
"rand 0.10.0",
"rand_core 0.6.4",
"regex",
@@ -1028,7 +1026,7 @@ dependencies = [
"serde_regex",
"smallstr",
"smallvec",
"snafu",
"thiserror 2.0.18",
"tikv-jemalloc-ctl",
"tikv-jemalloc-sys",
"tikv-jemallocator",
@@ -1043,7 +1041,7 @@ dependencies = [
[[package]]
name = "conduwuit_database"
version = "0.5.5"
version = "0.5.6-alpha"
dependencies = [
"async-channel",
"conduwuit_core",
@@ -1061,7 +1059,7 @@ dependencies = [
[[package]]
name = "conduwuit_macros"
version = "0.5.5"
version = "0.5.6-alpha"
dependencies = [
"itertools 0.14.0",
"proc-macro2",
@@ -1071,7 +1069,7 @@ dependencies = [
[[package]]
name = "conduwuit_router"
version = "0.5.5"
version = "0.5.6-alpha"
dependencies = [
"axum",
"axum-client-ip",
@@ -1105,7 +1103,7 @@ dependencies = [
[[package]]
name = "conduwuit_service"
version = "0.5.5"
version = "0.5.6-alpha"
dependencies = [
"askama",
"async-trait",
@@ -1147,7 +1145,7 @@ dependencies = [
[[package]]
name = "conduwuit_web"
version = "0.5.5"
version = "0.5.6-alpha"
dependencies = [
"askama",
"axum",
@@ -1155,7 +1153,7 @@ dependencies = [
"conduwuit_service",
"futures",
"rand 0.10.0",
"snafu",
"thiserror 2.0.18",
"tracing",
]
@@ -1223,7 +1221,7 @@ dependencies = [
[[package]]
name = "continuwuity-admin-api"
version = "0.1.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"ruma-common",
"serde",
@@ -1602,7 +1600,7 @@ dependencies = [
[[package]]
name = "draupnir-antispam"
version = "0.1.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"ruma-common",
"serde",
@@ -3004,7 +3002,7 @@ checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79"
[[package]]
name = "meowlnir-antispam"
version = "0.1.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"ruma-common",
"serde",
@@ -4057,12 +4055,14 @@ dependencies = [
"sync_wrapper",
"tokio",
"tokio-rustls",
"tokio-util",
"tower",
"tower-http",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots",
]
@@ -4096,7 +4096,7 @@ dependencies = [
[[package]]
name = "ruma"
version = "0.10.1"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"assign",
"continuwuity-admin-api",
@@ -4119,7 +4119,7 @@ dependencies = [
[[package]]
name = "ruma-appservice-api"
version = "0.10.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"js_int",
"ruma-common",
@@ -4131,7 +4131,7 @@ dependencies = [
[[package]]
name = "ruma-client-api"
version = "0.18.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"as_variant",
"assign",
@@ -4154,7 +4154,7 @@ dependencies = [
[[package]]
name = "ruma-common"
version = "0.13.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"as_variant",
"base64 0.22.1",
@@ -4186,7 +4186,7 @@ dependencies = [
[[package]]
name = "ruma-events"
version = "0.28.1"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"as_variant",
"indexmap",
@@ -4211,7 +4211,7 @@ dependencies = [
[[package]]
name = "ruma-federation-api"
version = "0.9.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"bytes",
"headers",
@@ -4233,7 +4233,7 @@ dependencies = [
[[package]]
name = "ruma-identifiers-validation"
version = "0.9.5"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"js_int",
"thiserror 2.0.18",
@@ -4242,7 +4242,7 @@ dependencies = [
[[package]]
name = "ruma-identity-service-api"
version = "0.9.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"js_int",
"ruma-common",
@@ -4252,7 +4252,7 @@ dependencies = [
[[package]]
name = "ruma-macros"
version = "0.13.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"cfg-if",
"proc-macro-crate",
@@ -4267,7 +4267,7 @@ dependencies = [
[[package]]
name = "ruma-push-gateway-api"
version = "0.9.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"js_int",
"ruma-common",
@@ -4279,7 +4279,7 @@ dependencies = [
[[package]]
name = "ruma-signatures"
version = "0.15.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=e087ff15888156942ca2ffe6097d1b4c3fd27628#e087ff15888156942ca2ffe6097d1b4c3fd27628"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=bb12ed288a31a23aa11b10ba0fad22b7f985eb88#bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
dependencies = [
"base64 0.22.1",
"ed25519-dalek",
@@ -4912,27 +4912,6 @@ dependencies = [
"serde",
]
[[package]]
name = "snafu"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e84b3f4eacbf3a1ce05eac6763b4d629d60cbc94d632e4092c54ade71f1e1a2"
dependencies = [
"snafu-derive",
]
[[package]]
name = "snafu-derive"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1c97747dbf44bb1ca44a561ece23508e99cb592e862f22222dcf42f51d1e451"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "socket2"
version = "0.5.10"
@@ -5891,6 +5870,19 @@ dependencies = [
"wasmparser",
]
[[package]]
name = "wasm-streams"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "wasmparser"
version = "0.244.0"
@@ -6355,7 +6347,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "0.5.5"
version = "0.5.6-alpha"
dependencies = [
"askama",
"cargo_metadata",
+8 -10
View File
@@ -12,7 +12,7 @@ license = "Apache-2.0"
# See also `rust-toolchain.toml`
readme = "README.md"
repository = "https://forgejo.ellis.link/continuwuation/continuwuity"
version = "0.5.5"
version = "0.5.6-alpha"
[workspace.metadata.crane]
name = "conduwuit"
@@ -97,7 +97,7 @@ features = [
]
[workspace.dependencies.axum-extra]
version = "0.10.1"
version = "0.12.0"
default-features = false
features = ["typed-header", "tracing"]
@@ -144,6 +144,7 @@ features = [
"socks",
"hickory-dns",
"http2",
"stream",
]
[workspace.dependencies.serde]
@@ -307,14 +308,9 @@ features = [
]
# Used for conduwuit::Error type
[workspace.dependencies.snafu]
version = "0.8"
[workspace.dependencies.thiserror]
version = "2.0.12"
default-features = false
features = ["std", "rust_1_81"]
# Used for macro name generation
[workspace.dependencies.paste]
version = "1.0"
# Used when hashing the state
[workspace.dependencies.ring]
@@ -348,7 +344,7 @@ version = "0.1.2"
[workspace.dependencies.ruma]
git = "https://forgejo.ellis.link/continuwuation/ruwuma"
#branch = "conduwuit-changes"
rev = "e087ff15888156942ca2ffe6097d1b4c3fd27628"
rev = "bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
features = [
"compat",
"rand",
@@ -368,6 +364,7 @@ features = [
"unstable-msc2870",
"unstable-msc3026",
"unstable-msc3061",
"unstable-msc3814",
"unstable-msc3245",
"unstable-msc3266",
"unstable-msc3381", # polls
@@ -386,6 +383,7 @@ features = [
"unstable-pdu",
"unstable-msc4155",
"unstable-msc4143", # livekit well_known response
"unstable-msc4284"
]
[workspace.dependencies.rust-rocksdb]
+8 -3
View File
@@ -57,10 +57,15 @@ ### What are the project's goals?
### Can I try it out?
Check out the [documentation](https://continuwuity.org) for installation instructions, or join one of these vetted public homeservers running Continuwuity to get a feel for things!
Check out the [documentation](https://continuwuity.org) for installation instructions.
- https://continuwuity.rocks -- A public demo server operated by the Continuwuity Team.
- https://federated.nexus -- Federated Nexus is a community resource hosting multiple FOSS (especially federated) services, including Matrix and Forgejo.
If you want to try it out as a user, we have some partnered homeservers you can use:
* You can head over to [https://federated.nexus](https://federated.nexus/) in your browser.
* Hit the `Apply to Join` button. Once your request has been accepted, you will receive an email with your username and password.
* Head over to [https://app.federated.nexus](https://app.federated.nexus/) and you can sign in there, or use any other matrix chat client you wish elsewhere.
* Your username for matrix will be in the form of `@username:federated.nexus`, however you can simply use the `username` part to log in. Your password is your password.
* There's also [https://continuwuity.rocks/](https://continuwuity.rocks/). You can register a new account using Cinny via [this convenient link](https://app.cinny.in/register/continuwuity.rocks), or you can use Element or another matrix client *that supports registration*.
### What are we working on?
+1
View File
@@ -0,0 +1 @@
Improved the concurrency handling of federation transactions, vastly improving performance and reliability by more accurately handling inbound transactions and reducing the amount of repeated wasted work. Contributed by @nex and @Jade.
+1
View File
@@ -0,0 +1 @@
Added MSC3202 Device masquerading (not all of MSC3202). This should fix issues with enabling MSC4190 for some Mautrix bridges. Contributed by @Jade
+1
View File
@@ -0,0 +1 @@
Added MSC3814 Dehydrated Devices - you can now decrypt messages sent while all devices were logged out.
+1
View File
@@ -0,0 +1 @@
Removed the `allow_public_room_directory_without_auth` config option. Contributed by @0xnim.
+1
View File
@@ -0,0 +1 @@
Implement MSC4143 MatrixRTC transport discovery endpoint. Move RTC foci configuration from `[global.well_known]` to a new `[global.matrix_rtc]` section with a `foci` field. Contributed by @0xnim
+1
View File
@@ -0,0 +1 @@
Fixed sliding sync v5 list ranges always starting from 0, causing extra rooms to be unnecessarily processed and returned. Contributed by @0xnim
+1
View File
@@ -0,0 +1 @@
Improved URL preview fetching with a more compatible user agent for sites like YouTube Music. Added `!admin media delete-url-preview <url>` command to clear cached URL previews that were stuck and broken.
+15 -3
View File
@@ -15,6 +15,18 @@ disallowed-macros = [
{ path = "log::trace", reason = "use conduwuit_core::trace" },
]
disallowed-methods = [
{ path = "tokio::spawn", reason = "use and pass conduuwit_core::server::Server::runtime() to spawn from" },
]
[[disallowed-methods]]
path = "tokio::spawn"
reason = "use and pass conduwuit_core::server::Server::runtime() to spawn from"
[[disallowed-methods]]
path = "reqwest::Response::bytes"
reason = "bytes is unsafe, use limit_read via the conduwuit_core::utils::LimitReadExt trait instead"
[[disallowed-methods]]
path = "reqwest::Response::text"
reason = "text is unsafe, use limit_read_text via the conduwuit_core::utils::LimitReadExt trait instead"
[[disallowed-methods]]
path = "reqwest::Response::json"
reason = "json is unsafe, use limit_read_text via the conduwuit_core::utils::LimitReadExt trait instead"
-1
View File
@@ -9,7 +9,6 @@ address = "0.0.0.0"
allow_device_name_federation = true
allow_guest_registration = true
allow_public_room_directory_over_federation = true
allow_public_room_directory_without_auth = true
allow_registration = true
database_path = "/database"
log = "trace,h2=debug,hyper=debug"
+42 -13
View File
@@ -290,6 +290,25 @@
#
#max_fetch_prev_events = 192
# How many incoming federation transactions the server is willing to be
# processing at any given time before it becomes overloaded and starts
# rejecting further transactions until some slots become available.
#
# Setting this value too low or too high may result in unstable
# federation, and setting it too high may cause runaway resource usage.
#
#max_concurrent_inbound_transactions = 150
# Maximum age (in seconds) for cached federation transaction responses.
# Entries older than this will be removed during cleanup.
#
#transaction_id_cache_max_age_secs = 7200 (2 hours)
# Maximum number of cached federation transaction responses.
# When the cache exceeds this limit, older entries will be removed.
#
#transaction_id_cache_max_entries = 8192
# Default/base connection timeout (seconds). This is used only by URL
# previews and update/news endpoint checks.
#
@@ -527,12 +546,6 @@
#
#allow_public_room_directory_over_federation = false
# Set this to true to allow your server's public room directory to be
# queried without client authentication (access token) through the Client
# APIs. Set this to false to protect against /publicRooms spiders.
#
#allow_public_room_directory_without_auth = false
# Allow guests/unauthenticated users to access TURN credentials.
#
# This is the equivalent of Synapse's `turn_allow_guests` config option.
@@ -1831,14 +1844,13 @@
#
#support_mxid =
# A list of MatrixRTC foci URLs which will be served as part of the
# MSC4143 client endpoint at /.well-known/matrix/client. If you're
# setting up livekit, you'd want something like:
# rtc_focus_server_urls = [
# { type = "livekit", livekit_service_url = "https://livekit.example.com" },
# ]
# **DEPRECATED**: Use `[global.matrix_rtc].foci` instead.
#
# To disable, set this to be an empty vector (`[]`).
# A list of MatrixRTC foci URLs which will be served as part of the
# MSC4143 client endpoint at /.well-known/matrix/client.
#
# This option is deprecated and will be removed in a future release.
# Please migrate to the new `[global.matrix_rtc]` config section.
#
#rtc_focus_server_urls = []
@@ -1860,6 +1872,23 @@
#
#blurhash_max_raw_size = 33554432
[global.matrix_rtc]
# A list of MatrixRTC foci (transports) which will be served via the
# MSC4143 RTC transports endpoint at
# `/_matrix/client/v1/rtc/transports`. If you're setting up livekit,
# you'd want something like:
# ```toml
# [global.matrix_rtc]
# foci = [
# { type = "livekit", livekit_service_url = "https://livekit.example.com" },
# ]
# ```
#
# To disable, set this to an empty list (`[]`).
#
#foci = []
[global.ldap]
# Whether to enable LDAP login.
+7 -2
View File
@@ -180,6 +180,11 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
export RUSTFLAGS="${RUSTFLAGS}"
fi
RUST_PROFILE_DIR="${RUST_PROFILE}"
if [[ "${RUST_PROFILE}" == "dev" ]]; then
RUST_PROFILE_DIR="debug"
fi
TARGET_DIR=($(cargo metadata --no-deps --format-version 1 | \
jq -r ".target_directory"))
mkdir /out/sbin
@@ -191,8 +196,8 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
jq -r ".packages[] | select(.name == \"$PACKAGE\") | .targets[] | select( .kind | map(. == \"bin\") | any ) | .name"))
for BINARY in "${BINARIES[@]}"; do
echo $BINARY
xx-verify $TARGET_DIR/$(xx-cargo --print-target-triple)/${RUST_PROFILE}/$BINARY
cp $TARGET_DIR/$(xx-cargo --print-target-triple)/${RUST_PROFILE}/$BINARY /out/sbin/$BINARY
xx-verify $TARGET_DIR/$(xx-cargo --print-target-triple)/${RUST_PROFILE_DIR}/$BINARY
cp $TARGET_DIR/$(xx-cargo --print-target-triple)/${RUST_PROFILE_DIR}/$BINARY /out/sbin/$BINARY
done
EOF
+4 -32
View File
@@ -78,47 +78,19 @@ #### Firewall hints
### 3. Telling clients where to find LiveKit
To tell clients where to find LiveKit, you need to add the address of your `lk-jwt-service` to your client .well-known file. To do so, in the config section `global.well-known`, add (or modify) the option `rtc_focus_server_urls`.
To tell clients where to find LiveKit, you need to add the address of your `lk-jwt-service` to the `[global.matrix_rtc]` config section using the `foci` option.
The variable should be a list of servers serving as MatrixRTC endpoints to serve in the well-known file to the client.
The variable should be a list of servers serving as MatrixRTC endpoints. Clients discover these via the `/_matrix/client/v1/rtc/transports` endpoint (MSC4143).
```toml
rtc_focus_server_urls = [
[global.matrix_rtc]
foci = [
{ type = "livekit", livekit_service_url = "https://livekit.example.com" },
]
```
Remember to replace the URL with the address you are deploying your instance of lk-jwt-service to.
#### Serving .well-known manually
If you don't let Continuwuity serve your `.well-known` files, you need to add the following lines to your `.well-known/matrix/client` file, remembering to replace the URL with your own `lk-jwt-service` deployment:
```json
"org.matrix.msc4143.rtc_foci": [
{
"type": "livekit",
"livekit_service_url": "https://livekit.example.com"
}
]
```
The final file should look something like this:
```json
{
"m.homeserver": {
"base_url":"https://matrix.example.com"
},
"org.matrix.msc4143.rtc_foci": [
{
"type": "livekit",
"livekit_service_url": "https://livekit.example.com"
}
]
}
```
### 4. Configure your Reverse Proxy
Reverse proxies can be configured in many different ways - so we can't provide a step by step for this.
+7 -1
View File
@@ -51,7 +51,13 @@ ## Can I try it out?
Check out the [documentation](https://continuwuity.org) for installation instructions.
There are currently no open registration continuwuity instances available.
If you want to try it out as a user, we have some partnered homeservers you can use:
* You can head over to [https://federated.nexus](https://federated.nexus/) in your browser.
* Hit the `Apply to Join` button. Once your request has been accepted, you will receive an email with your username and password.
* Head over to [https://app.federated.nexus](https://app.federated.nexus/) and you can sign in there, or use any other matrix chat client you wish elsewhere.
* Your username for matrix will be in the form of `@username:federated.nexus`, however you can simply use the `username` part to log in. Your password is your password.
* There's also [https://continuwuity.rocks/](https://continuwuity.rocks/). You can register a new account using Cinny via [this convenient link](https://app.cinny.in/register/continuwuity.rocks), or you can use Element or another matrix client *that supports registration*.
## What are we working on?
+4
View File
@@ -36,3 +36,7 @@ ## `!admin media delete-all-from-user`
## `!admin media delete-all-from-server`
Deletes all remote media from the specified remote server. This will always ignore errors by default
## `!admin media delete-url-preview`
Deletes a cached URL preview, forcing it to be re-fetched. Use --all to purge all cached URL previews
+15 -4
View File
@@ -1,6 +1,6 @@
use std::fmt::Write;
use conduwuit::{Err, Result};
use conduwuit::{Err, Result, utils::response::LimitReadExt};
use futures::StreamExt;
use ruma::{OwnedRoomId, OwnedServerName, OwnedUserId};
@@ -30,12 +30,15 @@ pub(super) async fn incoming_federation(&self) -> Result {
.federation_handletime
.read();
let mut msg = format!("Handling {} incoming pdus:\n", map.len());
let mut msg = format!(
"Handling {} incoming PDUs across {} active transactions:\n",
map.len(),
self.services.transactions.txn_active_handle_count()
);
for (r, (e, i)) in map.iter() {
let elapsed = i.elapsed();
writeln!(msg, "{} {}: {}m{}s", r, e, elapsed.as_secs() / 60, elapsed.as_secs() % 60)?;
}
msg
};
@@ -52,7 +55,15 @@ pub(super) async fn fetch_support_well_known(&self, server_name: OwnedServerName
.send()
.await?;
let text = response.text().await?;
let text = response
.limit_read_text(
self.services
.config
.max_request_size
.try_into()
.expect("u64 fits into usize"),
)
.await?;
if text.is_empty() {
return Err!("Response text/body is empty.");
+19 -1
View File
@@ -29,7 +29,9 @@ pub(super) async fn delete(
.delete(&mxc.as_str().try_into()?)
.await?;
return Err!("Deleted the MXC from our database and on our filesystem.",);
return self
.write_str("Deleted the MXC from our database and on our filesystem.")
.await;
}
if let Some(event_id) = event_id {
@@ -388,3 +390,19 @@ pub(super) async fn get_remote_thumbnail(
self.write_str(&format!("```\n{result:#?}\nreceived {len} bytes for file content.\n```"))
.await
}
#[admin_command]
pub(super) async fn delete_url_preview(&self, url: Option<String>, all: bool) -> Result {
if all {
self.services.media.clear_url_previews().await;
return self.write_str("Deleted all cached URL previews.").await;
}
let url = url.expect("clap enforces url is required unless --all");
self.services.media.remove_url_preview(&url).await?;
self.write_str(&format!("Deleted cached URL preview for: {url}"))
.await
}
+12
View File
@@ -108,4 +108,16 @@ pub enum MediaCommand {
#[arg(long, default_value("800"))]
height: u32,
},
/// Deletes a cached URL preview, forcing it to be re-fetched.
/// Use --all to purge all cached URL previews.
DeleteUrlPreview {
/// The URL to clear from the saved preview data
#[arg(required_unless_present = "all")]
url: Option<String>,
/// Purge all cached URL previews
#[arg(long, conflicts_with = "url")]
all: bool,
},
}
+1 -1
View File
@@ -209,7 +209,7 @@ pub(super) async fn compact(
let parallelism = parallelism.unwrap_or(1);
let results = maps
.into_iter()
.try_stream()
.try_stream::<conduwuit::Error>()
.paralleln_and_then(runtime, parallelism, move |map| {
map.compact_blocking(options.clone())?;
Ok(map.name().to_owned())
+11 -1
View File
@@ -20,7 +20,17 @@ pub enum ResolverCommand {
name: Option<String>,
},
/// Flush a specific server from the resolver caches or everything
/// Flush a given server from the resolver caches or flush them completely
///
/// * Examples:
/// * Flush a specific server:
///
/// `!admin query resolver flush-cache matrix.example.com`
///
/// * Flush all resolver caches completely:
///
/// `!admin query resolver flush-cache --all`
#[command(verbatim_doc_comment)]
FlushCache {
name: Option<OwnedServerName>,
+7 -7
View File
@@ -3,7 +3,7 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Event, Result, debug_info, err, error, info,
Err, Error, Event, Result, debug_info, err, error, info,
matrix::pdu::PduBuilder,
utils::{self, ReadyExt, stream::BroadbandExt},
warn,
@@ -387,7 +387,7 @@ pub(crate) async fn register_route(
)
.await?;
if !worked {
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
}
// Success!
},
@@ -401,7 +401,7 @@ pub(crate) async fn register_route(
&uiaainfo,
json,
);
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
},
| _ => {
return Err!(Request(NotJson("JSON body is not valid")));
@@ -661,7 +661,7 @@ pub(crate) async fn change_password_route(
.await?;
if !worked {
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
}
// Success!
@@ -673,7 +673,7 @@ pub(crate) async fn change_password_route(
.uiaa
.create(sender_user, body.sender_device(), &uiaainfo, json);
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
},
| _ => {
return Err!(Request(NotJson("JSON body is not valid")));
@@ -791,7 +791,7 @@ pub(crate) async fn deactivate_route(
.await?;
if !worked {
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
}
// Success!
},
@@ -802,7 +802,7 @@ pub(crate) async fn deactivate_route(
.uiaa
.create(sender_user, body.sender_device(), &uiaainfo, json);
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
},
| _ => {
return Err!(Request(NotJson("JSON body is not valid")));
+1 -7
View File
@@ -9,7 +9,7 @@
},
events::{
AnyGlobalAccountDataEventContent, AnyRoomAccountDataEventContent,
GlobalAccountDataEventType, RoomAccountDataEventType,
RoomAccountDataEventType,
},
serde::Raw,
};
@@ -126,12 +126,6 @@ async fn set_account_data(
)));
}
if event_type_s == GlobalAccountDataEventType::PushRules.to_cow_str() {
return Err!(Request(BadJson(
"This endpoint cannot be used for setting/configuring push rules."
)));
}
let data: serde_json::Value = serde_json::from_str(data.get())
.map_err(|e| err!(Request(BadJson(warn!("Invalid JSON provided: {e}")))))?;
+121
View File
@@ -0,0 +1,121 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, at};
use futures::StreamExt;
use ruma::api::client::dehydrated_device::{
delete_dehydrated_device::unstable as delete_dehydrated_device,
get_dehydrated_device::unstable as get_dehydrated_device, get_events::unstable as get_events,
put_dehydrated_device::unstable as put_dehydrated_device,
};
use crate::Ruma;
const MAX_BATCH_EVENTS: usize = 50;
/// # `PUT /_matrix/client/../dehydrated_device`
///
/// Creates or overwrites the user's dehydrated device.
#[tracing::instrument(skip_all, fields(%client))]
pub(crate) async fn put_dehydrated_device_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
body: Ruma<put_dehydrated_device::Request>,
) -> Result<put_dehydrated_device::Response> {
let sender_user = body
.sender_user
.as_deref()
.expect("AccessToken authentication required");
let device_id = body.body.device_id.clone();
services
.users
.set_dehydrated_device(sender_user, body.body)
.await?;
Ok(put_dehydrated_device::Response { device_id })
}
/// # `DELETE /_matrix/client/../dehydrated_device`
///
/// Deletes the user's dehydrated device without replacement.
#[tracing::instrument(skip_all, fields(%client))]
pub(crate) async fn delete_dehydrated_device_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
body: Ruma<delete_dehydrated_device::Request>,
) -> Result<delete_dehydrated_device::Response> {
let sender_user = body.sender_user();
let device_id = services.users.get_dehydrated_device_id(sender_user).await?;
services.users.remove_device(sender_user, &device_id).await;
Ok(delete_dehydrated_device::Response { device_id })
}
/// # `GET /_matrix/client/../dehydrated_device`
///
/// Gets the user's dehydrated device
#[tracing::instrument(skip_all, fields(%client))]
pub(crate) async fn get_dehydrated_device_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
body: Ruma<get_dehydrated_device::Request>,
) -> Result<get_dehydrated_device::Response> {
let sender_user = body.sender_user();
let device = services.users.get_dehydrated_device(sender_user).await?;
Ok(get_dehydrated_device::Response {
device_id: device.device_id,
device_data: device.device_data,
})
}
/// # `GET /_matrix/client/../dehydrated_device/{device_id}/events`
///
/// Paginates the events of the dehydrated device.
#[tracing::instrument(skip_all, fields(%client))]
pub(crate) async fn get_dehydrated_events_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
body: Ruma<get_events::Request>,
) -> Result<get_events::Response> {
let sender_user = body.sender_user();
let device_id = &body.body.device_id;
let existing_id = services.users.get_dehydrated_device_id(sender_user).await;
if existing_id.as_ref().is_err()
|| existing_id
.as_ref()
.is_ok_and(|existing_id| existing_id != device_id)
{
return Err!(Request(Forbidden("Not the dehydrated device_id.")));
}
let since: Option<u64> = body
.body
.next_batch
.as_deref()
.map(str::parse)
.transpose()?;
let mut next_batch: Option<u64> = None;
let events = services
.users
.get_to_device_events(sender_user, device_id, since, None)
.take(MAX_BATCH_EVENTS)
.inspect(|&(count, _)| {
next_batch.replace(count);
})
.map(at!(1))
.collect()
.await;
Ok(get_events::Response {
events,
next_batch: next_batch.as_ref().map(ToString::to_string),
})
}
+4 -4
View File
@@ -1,6 +1,6 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, debug, err, utils};
use conduwuit::{Err, Error, Result, debug, err, utils};
use futures::StreamExt;
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedDeviceId,
@@ -232,7 +232,7 @@ pub(crate) async fn delete_devices_route(
.await?;
if !worked {
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
}
// Success!
},
@@ -243,10 +243,10 @@ pub(crate) async fn delete_devices_route(
.uiaa
.create(sender_user, sender_device, &uiaainfo, json);
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
},
| _ => {
return Err!(BadRequest(ErrorKind::NotJson, "Not json."));
return Err(Error::BadRequest(ErrorKind::NotJson, "Not json."));
},
},
}
+6 -6
View File
@@ -5,7 +5,7 @@
use axum::extract::State;
use conduwuit::{
Err, Result, debug, debug_warn, err,
Err, Error, Result, debug, debug_warn, err,
result::NotFound,
utils,
utils::{IterStream, stream::WidebandExt},
@@ -215,7 +215,7 @@ pub(crate) async fn upload_signing_keys_route(
.await?;
if !worked {
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
}
// Success!
},
@@ -226,10 +226,10 @@ pub(crate) async fn upload_signing_keys_route(
.uiaa
.create(sender_user, sender_device, &uiaainfo, json);
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
},
| _ => {
return Err!(BadRequest(ErrorKind::NotJson, "Not json."));
return Err(Error::BadRequest(ErrorKind::NotJson, "Not json."));
},
},
}
@@ -396,12 +396,12 @@ pub(crate) async fn get_key_changes_route(
let from = body
.from
.parse()
.map_err(|_| err!(BadRequest(ErrorKind::InvalidParam, "Invalid `from`.")))?;
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid `from`."))?;
let to = body
.to
.parse()
.map_err(|_| err!(BadRequest(ErrorKind::InvalidParam, "Invalid `to`.")))?;
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid `to`."))?;
device_list_updates.extend(
services
+27 -5
View File
@@ -3,9 +3,10 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, err, error,
Err, Result, err,
utils::{self, content_disposition::make_content_disposition, math::ruma_from_usize},
};
use conduwuit_core::error;
use conduwuit_service::{
Services,
media::{CACHE_CONTROL_IMMUTABLE, CORP_CROSS_ORIGIN, Dim, FileMeta, MXC_LENGTH},
@@ -69,7 +70,7 @@ pub(crate) async fn create_content_route(
.create(mxc, Some(user), Some(&content_disposition), content_type, &body.file)
.await
{
error!("Failed to save uploaded media: {e}");
err!("Failed to save uploaded media: {e}");
return Err!(Request(Unknown("Failed to save uploaded media")));
}
@@ -144,12 +145,22 @@ pub(crate) async fn get_content_route(
server_name: &body.server_name,
media_id: &body.media_id,
};
let FileMeta {
content,
content_type,
content_disposition,
} = fetch_file(&services, &mxc, user, body.timeout_ms, None).await?;
} = match fetch_file(&services, &mxc, user, body.timeout_ms, None).await {
| Ok(meta) => meta,
| Err(conduwuit::Error::Io(e)) => match e.kind() {
| std::io::ErrorKind::NotFound => return Err!(Request(NotFound("Media not found."))),
| std::io::ErrorKind::PermissionDenied => {
error!("Permission denied when trying to read file: {e:?}");
return Err!(Request(Unknown("Unknown error when fetching file.")));
},
| _ => return Err!(Request(Unknown("Unknown error when fetching file."))),
},
| Err(_) => return Err!(Request(Unknown("Unknown error when fetching file."))),
};
Ok(get_content::v1::Response {
file: content.expect("entire file contents"),
@@ -185,7 +196,18 @@ pub(crate) async fn get_content_as_filename_route(
content,
content_type,
content_disposition,
} = fetch_file(&services, &mxc, user, body.timeout_ms, Some(&body.filename)).await?;
} = match fetch_file(&services, &mxc, user, body.timeout_ms, None).await {
| Ok(meta) => meta,
| Err(conduwuit::Error::Io(e)) => match e.kind() {
| std::io::ErrorKind::NotFound => return Err!(Request(NotFound("Media not found."))),
| std::io::ErrorKind::PermissionDenied => {
error!("Permission denied when trying to read file: {e:?}");
return Err!(Request(Unknown("Unknown error when fetching file.")));
},
| _ => return Err!(Request(Unknown("Unknown error when fetching file."))),
},
| Err(_) => return Err!(Request(Unknown("Unknown error when fetching file."))),
};
Ok(get_content_as_filename::v1::Response {
file: content.expect("entire file contents"),
+3 -3
View File
@@ -1,7 +1,7 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, at, debug_warn,
Err, Error, Result, at, debug_warn,
matrix::{
event::{Event, Matches},
pdu::PduCount,
@@ -322,7 +322,7 @@ pub(crate) async fn is_ignored_pdu<Pdu>(
if server_ignored {
// the sender's server is ignored, so ignore this event
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::SenderIgnored { sender: None },
"The sender's server is ignored by this server.",
));
@@ -331,7 +331,7 @@ pub(crate) async fn is_ignored_pdu<Pdu>(
if user_ignored && !services.config.send_messages_from_ignored_users_to_client {
// the recipient of this PDU has the sender ignored, and we're not
// configured to send ignored messages to clients
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::SenderIgnored { sender: Some(event.sender().to_owned()) },
"You have ignored this sender.",
));
+2
View File
@@ -6,6 +6,7 @@
pub(super) mod backup;
pub(super) mod capabilities;
pub(super) mod context;
pub(super) mod dehydrated_device;
pub(super) mod device;
pub(super) mod directory;
pub(super) mod filter;
@@ -49,6 +50,7 @@
pub(super) use backup::*;
pub(super) use capabilities::*;
pub(super) use context::*;
pub(super) use dehydrated_device::*;
pub(super) use device::*;
pub(super) use directory::*;
pub(super) use filter::*;
+16 -16
View File
@@ -1,5 +1,5 @@
use axum::extract::State;
use conduwuit::{Err, Result, err};
use conduwuit::{Err, Error, Result, err};
use conduwuit_service::Services;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue,
@@ -243,27 +243,27 @@ pub(crate) async fn set_pushrule_route(
body.before.as_deref(),
) {
let err = match error {
| InsertPushRuleError::ServerDefaultRuleId => err!(BadRequest(
| InsertPushRuleError::ServerDefaultRuleId => Error::BadRequest(
ErrorKind::InvalidParam,
"Rule IDs starting with a dot are reserved for server-default rules.",
)),
| InsertPushRuleError::InvalidRuleId => err!(BadRequest(
),
| InsertPushRuleError::InvalidRuleId => Error::BadRequest(
ErrorKind::InvalidParam,
"Rule ID containing invalid characters.",
)),
| InsertPushRuleError::RelativeToServerDefaultRule => err!(BadRequest(
),
| InsertPushRuleError::RelativeToServerDefaultRule => Error::BadRequest(
ErrorKind::InvalidParam,
"Can't place a push rule relatively to a server-default rule.",
)),
| InsertPushRuleError::UnknownRuleId => err!(BadRequest(
),
| InsertPushRuleError::UnknownRuleId => Error::BadRequest(
ErrorKind::NotFound,
"The before or after rule could not be found.",
)),
| InsertPushRuleError::BeforeHigherThanAfter => err!(BadRequest(
),
| InsertPushRuleError::BeforeHigherThanAfter => Error::BadRequest(
ErrorKind::InvalidParam,
"The before rule has a higher priority than the after rule.",
)),
| _ => err!(BadRequest(ErrorKind::InvalidParam, "Invalid data.")),
),
| _ => Error::BadRequest(ErrorKind::InvalidParam, "Invalid data."),
};
return Err(err);
@@ -433,13 +433,13 @@ pub(crate) async fn delete_pushrule_route(
.remove(body.kind.clone(), &body.rule_id)
{
let err = match error {
| RemovePushRuleError::ServerDefault => err!(BadRequest(
| RemovePushRuleError::ServerDefault => Error::BadRequest(
ErrorKind::InvalidParam,
"Cannot delete a server-default pushrule.",
)),
),
| RemovePushRuleError::NotFound =>
err!(BadRequest(ErrorKind::NotFound, "Push rule not found.")),
| _ => err!(BadRequest(ErrorKind::InvalidParam, "Invalid data.")),
Error::BadRequest(ErrorKind::NotFound, "Push rule not found."),
| _ => Error::BadRequest(ErrorKind::InvalidParam, "Invalid data."),
};
return Err(err);
+6 -6
View File
@@ -2,7 +2,7 @@
use axum::extract::State;
use conduwuit::{
Err, Event, Result, RoomVersion, debug, err, info,
Err, Error, Event, Result, RoomVersion, debug, err, info,
matrix::{StateKey, pdu::PduBuilder},
};
use futures::{FutureExt, StreamExt};
@@ -58,7 +58,7 @@ pub(crate) async fn upgrade_room_route(
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if !services.server.supported_room_version(&body.new_version) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::UnsupportedRoomVersion,
"This server does not support that room version.",
));
@@ -170,7 +170,7 @@ pub(crate) async fn upgrade_room_route(
"creator".into(),
json!(&sender_user).try_into().map_err(|e| {
info!("Error forming creation event: {e}");
err!(BadRequest(ErrorKind::BadJson, "Error forming creation event"))
Error::BadRequest(ErrorKind::BadJson, "Error forming creation event")
})?,
);
},
@@ -186,13 +186,13 @@ pub(crate) async fn upgrade_room_route(
"room_version".into(),
json!(&body.new_version)
.try_into()
.map_err(|_| err!(BadRequest(ErrorKind::BadJson, "Error forming creation event")))?,
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Error forming creation event"))?,
);
create_event_content.insert(
"predecessor".into(),
json!(predecessor)
.try_into()
.map_err(|_| err!(BadRequest(ErrorKind::BadJson, "Error forming creation event")))?,
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Error forming creation event"))?,
);
// Validate creation event content
@@ -203,7 +203,7 @@ pub(crate) async fn upgrade_room_route(
)
.is_err()
{
return Err!(BadRequest(ErrorKind::BadJson, "Error forming creation event"));
return Err(Error::BadRequest(ErrorKind::BadJson, "Error forming creation event"));
}
let create_event_id = services
+3 -3
View File
@@ -50,8 +50,8 @@ pub(crate) async fn send_message_event_route(
// Check if this is a new transaction id
if let Ok(response) = services
.transaction_ids
.existing_txnid(sender_user, sender_device, &body.txn_id)
.transactions
.get_client_txn(sender_user, sender_device, &body.txn_id)
.await
{
// The client might have sent a txnid of the /sendToDevice endpoint
@@ -92,7 +92,7 @@ pub(crate) async fn send_message_event_route(
)
.await?;
services.transaction_ids.add_txnid(
services.transactions.add_client_txnid(
sender_user,
sender_device,
&body.txn_id,
+4 -4
View File
@@ -3,7 +3,7 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, debug, err, info,
Err, Error, Result, debug, err, info,
utils::{self, ReadyExt, hash},
warn,
};
@@ -191,7 +191,7 @@ pub(crate) async fn handle_login(
}
if services.users.is_locked(&user_id).await? {
return Err!(BadRequest(ErrorKind::UserLocked, "This account has been locked."));
return Err(Error::BadRequest(ErrorKind::UserLocked, "This account has been locked."));
}
if services.users.is_login_disabled(&user_id).await {
@@ -390,7 +390,7 @@ pub(crate) async fn login_token_route(
.await?;
if !worked {
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
}
// Success!
@@ -402,7 +402,7 @@ pub(crate) async fn login_token_route(
.uiaa
.create(sender_user, sender_device, &uiaainfo, json);
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
},
| _ => {
return Err!(Request(NotJson("No JSON body was sent when required.")));
+2 -1
View File
@@ -11,7 +11,7 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Result, extract_variant,
Result, at, extract_variant,
utils::{
ReadyExt, TryFutureExtExt,
stream::{BroadbandExt, Tools, WidebandExt},
@@ -385,6 +385,7 @@ pub(crate) async fn build_sync_events(
last_sync_end_count,
Some(current_count),
)
.map(at!(1))
.collect::<Vec<_>>();
let device_one_time_keys_count = services
+4 -1
View File
@@ -336,7 +336,9 @@ async fn handle_lists<'a, Rooms, AllRooms>(
let ranges = list.ranges.clone();
for mut range in ranges {
range.0 = uint!(0);
range.0 = range
.0
.min(UInt::try_from(active_rooms.len()).unwrap_or(UInt::MAX));
range.1 = range.1.checked_add(uint!(1)).unwrap_or(range.1);
range.1 = range
.1
@@ -1027,6 +1029,7 @@ async fn collect_to_device(
events: services
.users
.get_to_device_events(sender_user, sender_device, None, Some(next_batch))
.map(at!(1))
.collect()
.await,
})
+6 -6
View File
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use axum::extract::State;
use conduwuit::{Result, err};
use conduwuit::{Error, Result};
use conduwuit_service::sending::EduBuf;
use futures::StreamExt;
use ruma::{
@@ -26,8 +26,8 @@ pub(crate) async fn send_event_to_device_route(
// Check if this is a new transaction id
if services
.transaction_ids
.existing_txnid(sender_user, sender_device, &body.txn_id)
.transactions
.get_client_txn(sender_user, sender_device, &body.txn_id)
.await
.is_ok()
{
@@ -66,7 +66,7 @@ pub(crate) async fn send_event_to_device_route(
let event = event
.deserialize_as()
.map_err(|_| err!(BadRequest(ErrorKind::InvalidParam, "Event is invalid")))?;
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Event is invalid"))?;
match target_device_id_maybe {
| DeviceIdOrAllDevices::DeviceId(target_device_id) => {
@@ -104,8 +104,8 @@ pub(crate) async fn send_event_to_device_route(
// Save transaction id with empty data
services
.transaction_ids
.add_txnid(sender_user, sender_device, &body.txn_id, &[]);
.transactions
.add_client_txnid(sender_user, sender_device, &body.txn_id, &[]);
Ok(send_event_to_device::v3::Response {})
}
+1
View File
@@ -50,6 +50,7 @@ pub(crate) async fn get_supported_versions_route(
("org.matrix.msc2836".to_owned(), true), /* threading/threads (https://github.com/matrix-org/matrix-spec-proposals/pull/2836) */
("org.matrix.msc2946".to_owned(), true), /* spaces/hierarchy summaries (https://github.com/matrix-org/matrix-spec-proposals/pull/2946) */
("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */
("org.matrix.msc3814".to_owned(), true), /* dehydrated devices */
("org.matrix.msc3827".to_owned(), true), /* filtering of /publicRooms by room type (https://github.com/matrix-org/matrix-spec-proposals/pull/3827) */
("org.matrix.msc3952_intentional_mentions".to_owned(), true), /* intentional mentions (https://github.com/matrix-org/matrix-spec-proposals/pull/3952) */
("org.matrix.msc3916.stable".to_owned(), true), /* authenticated media (https://github.com/matrix-org/matrix-spec-proposals/pull/3916) */
+33 -8
View File
@@ -1,8 +1,11 @@
use axum::{Json, extract::State, response::IntoResponse};
use conduwuit::{Err, Result};
use ruma::api::client::discovery::{
discover_homeserver::{self, HomeserverInfo, SlidingSyncProxyInfo},
discover_support::{self, Contact},
use conduwuit::{Error, Result};
use ruma::api::client::{
discovery::{
discover_homeserver::{self, HomeserverInfo, SlidingSyncProxyInfo},
discover_support::{self, Contact},
},
error::ErrorKind,
};
use crate::Ruma;
@@ -16,7 +19,7 @@ pub(crate) async fn well_known_client(
) -> Result<discover_homeserver::Response> {
let client_url = match services.config.well_known.client.as_ref() {
| Some(url) => url.to_string(),
| None => return Err!(BadRequest(ErrorKind::NotFound, "Not found.")),
| None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
};
Ok(discover_homeserver::Response {
@@ -24,10 +27,32 @@ pub(crate) async fn well_known_client(
identity_server: None,
sliding_sync_proxy: Some(SlidingSyncProxyInfo { url: client_url }),
tile_server: None,
rtc_foci: services.config.well_known.rtc_focus_server_urls.clone(),
rtc_foci: services
.config
.matrix_rtc
.effective_foci(&services.config.well_known.rtc_focus_server_urls)
.to_vec(),
})
}
/// # `GET /_matrix/client/v1/rtc/transports`
/// # `GET /_matrix/client/unstable/org.matrix.msc4143/rtc/transports`
///
/// Returns the list of MatrixRTC foci (transports) configured for this
/// homeserver, implementing MSC4143.
pub(crate) async fn get_rtc_transports(
State(services): State<crate::State>,
_body: Ruma<ruma::api::client::discovery::get_rtc_transports::Request>,
) -> Result<ruma::api::client::discovery::get_rtc_transports::Response> {
Ok(ruma::api::client::discovery::get_rtc_transports::Response::new(
services
.config
.matrix_rtc
.effective_foci(&services.config.well_known.rtc_focus_server_urls)
.to_vec(),
))
}
/// # `GET /.well-known/matrix/support`
///
/// Server support contact and support page of a homeserver's domain.
@@ -85,7 +110,7 @@ pub(crate) async fn well_known_support(
if contacts.is_empty() && support_page.is_none() {
// No admin room, no configured contacts, and no support page
return Err!(BadRequest(ErrorKind::NotFound, "Not found."));
return Err(Error::BadRequest(ErrorKind::NotFound, "Not found."));
}
Ok(discover_support::Response { contacts, support_page })
@@ -102,7 +127,7 @@ pub(crate) async fn syncv3_client_server_json(
| Some(url) => url.to_string(),
| None => match services.config.well_known.server.as_ref() {
| Some(url) => url.to_string(),
| None => return Err!(BadRequest(ErrorKind::NotFound, "Not found.")),
| None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
},
};
+5
View File
@@ -160,6 +160,10 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
.ruma_route(&client::update_device_route)
.ruma_route(&client::delete_device_route)
.ruma_route(&client::delete_devices_route)
.ruma_route(&client::put_dehydrated_device_route)
.ruma_route(&client::delete_dehydrated_device_route)
.ruma_route(&client::get_dehydrated_device_route)
.ruma_route(&client::get_dehydrated_events_route)
.ruma_route(&client::get_tags_route)
.ruma_route(&client::update_tag_route)
.ruma_route(&client::delete_tag_route)
@@ -184,6 +188,7 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
.ruma_route(&client::put_suspended_status)
.ruma_route(&client::well_known_support)
.ruma_route(&client::well_known_client)
.ruma_route(&client::get_rtc_transports)
.route("/_conduwuit/server_version", get(client::conduwuit_server_version))
.route("/_continuwuity/server_version", get(client::conduwuit_server_version))
.ruma_route(&client::room_initial_sync_route)
+46 -28
View File
@@ -4,7 +4,7 @@
headers::{Authorization, authorization::Bearer},
typed_header::TypedHeaderRejectionReason,
};
use conduwuit::{Err, Result, debug_error, err, warn};
use conduwuit::{Err, Error, Result, debug_error, err, warn};
use futures::{
TryFutureExt,
future::{
@@ -14,7 +14,8 @@
pin_mut,
};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedDeviceId, OwnedServerName, OwnedUserId, UserId,
CanonicalJsonObject, CanonicalJsonValue, DeviceId, OwnedDeviceId, OwnedServerName,
OwnedUserId, UserId,
api::{
AuthScheme, IncomingRequest, Metadata,
client::{
@@ -66,23 +67,17 @@ pub(super) async fn auth(
if metadata.authentication == AuthScheme::None {
match metadata {
| &get_public_rooms::v3::Request::METADATA => {
if !services
.server
.config
.allow_public_room_directory_without_auth
{
match token {
| Token::Appservice(_) | Token::User(_) => {
// we should have validated the token above
// already
},
| Token::None | Token::Invalid => {
return Err!(BadRequest(
ErrorKind::MissingToken,
"Missing or invalid access token.",
));
},
}
match token {
| Token::Appservice(_) | Token::User(_) => {
// we should have validated the token above
// already
},
| Token::None | Token::Invalid => {
return Err(Error::BadRequest(
ErrorKind::MissingToken,
"Missing or invalid access token.",
));
},
}
},
| &get_profile::v3::Request::METADATA
@@ -96,7 +91,7 @@ pub(super) async fn auth(
// already
},
| Token::None | Token::Invalid => {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::MissingToken,
"Missing or invalid access token.",
));
@@ -130,10 +125,10 @@ pub(super) async fn auth(
appservice_info: None,
})
} else {
Err!(BadRequest(ErrorKind::MissingToken, "Missing access token."))
Err(Error::BadRequest(ErrorKind::MissingToken, "Missing access token."))
}
},
| _ => Err!(BadRequest(ErrorKind::MissingToken, "Missing access token.")),
| _ => Err(Error::BadRequest(ErrorKind::MissingToken, "Missing access token.")),
},
| (
AuthScheme::AccessToken | AuthScheme::AccessTokenOptional | AuthScheme::None,
@@ -149,7 +144,7 @@ pub(super) async fn auth(
&ruma::api::client::session::logout::v3::Request::METADATA
| &ruma::api::client::session::logout_all::v3::Request::METADATA
) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::UserLocked,
"This account has been locked.",
));
@@ -174,11 +169,11 @@ pub(super) async fn auth(
appservice_info: None,
}),
| (AuthScheme::ServerSignatures, Token::Appservice(_) | Token::User(_)) =>
Err!(BadRequest(
Err(Error::BadRequest(
ErrorKind::Unauthorized,
"Only server signatures should be used on this endpoint.",
)),
| (AuthScheme::AppserviceToken, Token::User(_)) => Err!(BadRequest(
| (AuthScheme::AppserviceToken, Token::User(_)) => Err(Error::BadRequest(
ErrorKind::Unauthorized,
"Only appservice access tokens should be used on this endpoint.",
)),
@@ -196,13 +191,13 @@ pub(super) async fn auth(
appservice_info: None,
})
} else {
Err!(BadRequest(
Err(Error::BadRequest(
ErrorKind::UnknownToken { soft_logout: false },
"Unknown access token.",
))
}
},
| (_, Token::Invalid) => Err!(BadRequest(
| (_, Token::Invalid) => Err(Error::BadRequest(
ErrorKind::UnknownToken { soft_logout: false },
"Unknown access token.",
)),
@@ -234,10 +229,33 @@ async fn auth_appservice(
return Err!(Request(Exclusive("User is not in namespace.")));
}
// MSC3202/MSC4190: Handle device_id masquerading for appservices.
// The device_id can be provided via `device_id` or
// `org.matrix.msc3202.device_id` query parameter.
let sender_device = if let Some(ref device_id_str) = request.query.device_id {
let device_id: &DeviceId = device_id_str.as_str().into();
// Verify the device exists for this user
if services
.users
.get_device_metadata(&user_id, device_id)
.await
.is_err()
{
return Err!(Request(Forbidden(
"Device does not exist for user or appservice cannot masquerade as this device."
)));
}
Some(device_id.to_owned())
} else {
None
};
Ok(Auth {
origin: None,
sender_user: Some(user_id),
sender_device: None,
sender_device,
appservice_info: Some(*info),
})
}
+4
View File
@@ -11,6 +11,10 @@
pub(super) struct QueryParams {
pub(super) access_token: Option<String>,
pub(super) user_id: Option<String>,
/// Device ID for appservice device masquerading (MSC3202/MSC4190).
/// Can be provided as `device_id` or `org.matrix.msc3202.device_id`.
#[serde(alias = "org.matrix.msc3202.device_id")]
pub(super) device_id: Option<String>,
}
pub(super) struct Request {
+6 -3
View File
@@ -1,9 +1,12 @@
use std::{borrow::Borrow, iter::once};
use axum::extract::State;
use conduwuit::{Err, Error, Result, err, info, utils::stream::ReadyExt};
use conduwuit::{Err, Error, Result, info, utils::stream::ReadyExt};
use futures::StreamExt;
use ruma::{RoomId, api::federation::authorization::get_event_authorization};
use ruma::{
RoomId,
api::{client::error::ErrorKind, federation::authorization::get_event_authorization},
};
use super::AccessCheck;
use crate::Ruma;
@@ -44,7 +47,7 @@ pub(crate) async fn get_event_authorization_route(
.timeline
.get_pdu_json(&body.event_id)
.await
.map_err(|_| err!(BadRequest(ErrorKind::NotFound, "Event not found.")))?;
.map_err(|_| Error::BadRequest(ErrorKind::NotFound, "Event not found."))?;
let room_id_str = event
.get("room_id")
+2 -2
View File
@@ -2,7 +2,7 @@
use axum_client_ip::InsecureClientIp;
use base64::{Engine as _, engine::general_purpose};
use conduwuit::{
Err, PduEvent, Result, err, error,
Err, Error, PduEvent, Result, err, error,
matrix::{Event, event::gen_event_id},
utils::{self, hash::sha256},
warn,
@@ -33,7 +33,7 @@ pub(crate) async fn create_invite_route(
.await?;
if !services.server.supported_room_version(&body.room_version) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion { room_version: body.room_version.clone() },
"Server does not support this room version.",
));
+2 -2
View File
@@ -1,7 +1,7 @@
use std::borrow::ToOwned;
use axum::extract::State;
use conduwuit::{Err, Result, debug, debug_info, info, matrix::pdu::PduBuilder, warn};
use conduwuit::{Err, Error, Result, debug, debug_info, info, matrix::pdu::PduBuilder, warn};
use conduwuit_service::Services;
use futures::StreamExt;
use ruma::{
@@ -80,7 +80,7 @@ pub(crate) async fn create_join_event_template_route(
let room_version_id = services.rooms.state.get_room_version(&body.room_id).await?;
if !body.ver.contains(&room_version_id) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion { room_version: room_version_id },
"Room version not supported.",
));
+3 -3
View File
@@ -1,6 +1,6 @@
use RoomVersionId::*;
use axum::extract::State;
use conduwuit::{Err, Result, debug_warn, info, matrix::pdu::PduBuilder, warn};
use conduwuit::{Err, Error, Result, debug_warn, info, matrix::pdu::PduBuilder, warn};
use ruma::{
RoomVersionId,
api::{client::error::ErrorKind, federation::knock::create_knock_event_template},
@@ -67,14 +67,14 @@ pub(crate) async fn create_knock_event_template_route(
let room_version_id = services.rooms.state.get_room_version(&body.room_id).await?;
if matches!(room_version_id, V1 | V2 | V3 | V4 | V5 | V6) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion { room_version: room_version_id },
"Room version does not support knocking.",
));
}
if !body.ver.contains(&room_version_id) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion { room_version: room_version_id },
"Your homeserver does not support the features required to knock on this room.",
));
+5 -11
View File
@@ -1,6 +1,6 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, err};
use conduwuit::{Error, Result};
use ruma::{
api::{
client::error::ErrorKind,
@@ -25,7 +25,7 @@ pub(crate) async fn get_public_rooms_filtered_route(
.config
.allow_public_room_directory_over_federation
{
return Err!(BadRequest(ErrorKind::forbidden(), "Room directory is not public"));
return Err(Error::BadRequest(ErrorKind::forbidden(), "Room directory is not public"));
}
let response = crate::client::get_public_rooms_filtered_helper(
@@ -38,10 +38,7 @@ pub(crate) async fn get_public_rooms_filtered_route(
)
.await
.map_err(|_| {
err!(BadRequest(
ErrorKind::Unknown,
"Failed to return this server's public room list."
))
Error::BadRequest(ErrorKind::Unknown, "Failed to return this server's public room list.")
})?;
Ok(get_public_rooms_filtered::v1::Response {
@@ -65,7 +62,7 @@ pub(crate) async fn get_public_rooms_route(
.globals
.allow_public_room_directory_over_federation()
{
return Err!(BadRequest(ErrorKind::forbidden(), "Room directory is not public"));
return Err(Error::BadRequest(ErrorKind::forbidden(), "Room directory is not public"));
}
let response = crate::client::get_public_rooms_filtered_helper(
@@ -78,10 +75,7 @@ pub(crate) async fn get_public_rooms_route(
)
.await
.map_err(|_| {
err!(BadRequest(
ErrorKind::Unknown,
"Failed to return this server's public room list."
))
Error::BadRequest(ErrorKind::Unknown, "Failed to return this server's public room list.")
})?;
Ok(get_public_rooms::v1::Response {
+6 -5
View File
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use axum::extract::State;
use conduwuit::{Err, Result, err};
use conduwuit::{Error, Result, err};
use futures::StreamExt;
use get_profile_information::v1::ProfileField;
use rand::seq::SliceRandom;
@@ -67,16 +67,17 @@ pub(crate) async fn get_profile_information_route(
.config
.allow_inbound_profile_lookup_federation_requests
{
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Profile lookup over federation is not allowed on this homeserver.",
));
}
if !services.globals.server_is_ours(body.user_id.server_name()) {
return Err!(
BadRequest(ErrorKind::InvalidParam, "User does not belong to this server.",)
);
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"User does not belong to this server.",
));
}
let mut displayname = None;
+214 -64
View File
@@ -1,27 +1,33 @@
use std::{collections::BTreeMap, net::IpAddr, time::Instant};
use std::{
collections::{BTreeMap, HashMap, HashSet},
net::IpAddr,
time::{Duration, Instant},
};
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Error, Result, debug, debug_warn, err, error,
result::LogErr,
state_res::lexicographical_topological_sort,
trace,
utils::{
IterStream, ReadyExt, millis_since_unix_epoch,
stream::{BroadbandExt, TryBroadbandExt, automatic_width},
},
warn,
};
use conduwuit_service::{
Services,
sending::{EDU_LIMIT, PDU_LIMIT},
};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use http::StatusCode;
use itertools::Itertools;
use ruma::{
CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId,
CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId,
RoomId, ServerName, UserId,
api::{
client::error::ErrorKind,
client::error::{ErrorKind, ErrorKind::LimitExceeded},
federation::transactions::{
edu::{
DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent,
@@ -32,9 +38,16 @@
},
},
events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
int,
serde::Raw,
to_device::DeviceIdOrAllDevices,
uint,
};
use service::transactions::{
FederationTxnState, TransactionError, TxnKey, WrappedTransactionResponse,
};
use tokio::sync::watch::{Receiver, Sender};
use tracing::instrument;
use crate::Ruma;
@@ -44,15 +57,6 @@
/// # `PUT /_matrix/federation/v1/send/{txnId}`
///
/// Push EDUs and PDUs to this server.
#[tracing::instrument(
name = "txn",
level = "debug",
skip_all,
fields(
%client,
origin = body.origin().as_str()
),
)]
pub(crate) async fn send_transaction_message_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
@@ -76,16 +80,73 @@ pub(crate) async fn send_transaction_message_route(
)));
}
let txn_start_time = Instant::now();
trace!(
pdus = body.pdus.len(),
edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = %body.transaction_id,
origin = %body.origin(),
"Starting txn",
);
let txn_key = (body.origin().to_owned(), body.transaction_id.clone());
// Atomically check cache, join active, or start new transaction
match services
.transactions
.get_or_start_federation_txn(txn_key.clone())?
{
| FederationTxnState::Cached(response) => {
// Already responded
Ok(response)
},
| FederationTxnState::Active(receiver) => {
// Another thread is processing
wait_for_result(receiver).await
},
| FederationTxnState::Started { receiver, sender } => {
// We're the first, spawn the processing task
services
.server
.runtime()
.spawn(process_inbound_transaction(services, body, client, txn_key, sender));
// and wait for it
wait_for_result(receiver).await
},
}
}
async fn wait_for_result(
mut recv: Receiver<WrappedTransactionResponse>,
) -> Result<send_transaction_message::v1::Response> {
if tokio::time::timeout(Duration::from_secs(50), recv.changed())
.await
.is_err()
{
// Took too long, return 429 to encourage the sender to try again
return Err(Error::BadRequest(
LimitExceeded { retry_after: None },
"Transaction is being still being processed. Please try again later.",
));
}
let value = recv.borrow_and_update();
match value.clone() {
| Some(Ok(response)) => Ok(response),
| Some(Err(err)) => Err(transaction_error_to_response(&err)),
| None => Err(Error::Request(
ErrorKind::Unknown,
"Transaction processing failed unexpectedly".into(),
StatusCode::INTERNAL_SERVER_ERROR,
)),
}
}
#[instrument(
skip_all,
fields(
id = ?body.transaction_id.as_str(),
origin = ?body.origin()
)
)]
async fn process_inbound_transaction(
services: crate::State,
body: Ruma<send_transaction_message::v1::Request>,
client: IpAddr,
txn_key: TxnKey,
sender: Sender<WrappedTransactionResponse>,
) {
let txn_start_time = Instant::now();
let pdus = body
.pdus
.iter()
@@ -102,40 +163,79 @@ pub(crate) async fn send_transaction_message_route(
.filter_map(Result::ok)
.stream();
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?;
debug!(pdus = body.pdus.len(), edus = body.edus.len(), "Processing transaction",);
let results = match handle(&services, &client, body.origin(), pdus, edus).await {
| Ok(results) => results,
| Err(err) => {
fail_federation_txn(services, &txn_key, &sender, err);
return;
},
};
for (id, result) in &results {
if let Err(e) = result {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
debug_warn!("Incoming PDU failed {id}: {e:?}");
}
}
}
debug!(
pdus = body.pdus.len(),
edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = %body.transaction_id,
origin = %body.origin(),
"Finished txn",
"Finished processing transaction"
);
for (id, result) in &results {
if let Err(e) = result {
if matches!(e, Error::BadRequest { kind: ErrorKind::NotFound, .. }) {
warn!("Incoming PDU failed {id}: {e:?}");
}
}
}
Ok(send_transaction_message::v1::Response {
let response = send_transaction_message::v1::Response {
pdus: results
.into_iter()
.map(|(e, r)| (e, r.map_err(error::sanitized_message)))
.collect(),
})
};
services
.transactions
.finish_federation_txn(txn_key, sender, response);
}
/// Handles a failed federation transaction by sending the error through
/// the channel and cleaning up the transaction state. This allows waiters to
/// receive an appropriate error response.
fn fail_federation_txn(
services: crate::State,
txn_key: &TxnKey,
sender: &Sender<WrappedTransactionResponse>,
err: TransactionError,
) {
debug!("Transaction failed: {err}");
// Remove from active state so the transaction can be retried
services.transactions.remove_federation_txn(txn_key);
// Send the error to any waiters
if let Err(e) = sender.send(Some(Err(err))) {
debug_warn!("Failed to send transaction error to receivers: {e}");
}
}
/// Converts a TransactionError into an appropriate HTTP error response.
fn transaction_error_to_response(err: &TransactionError) -> Error {
match err {
| TransactionError::ShuttingDown => Error::Request(
ErrorKind::Unknown,
"Server is shutting down, please retry later".into(),
StatusCode::SERVICE_UNAVAILABLE,
),
}
}
async fn handle(
services: &Services,
client: &IpAddr,
origin: &ServerName,
started: Instant,
pdus: impl Stream<Item = Pdu> + Send,
edus: impl Stream<Item = Edu> + Send,
) -> Result<ResolvedMap> {
) -> std::result::Result<ResolvedMap, TransactionError> {
// group pdus by room
let pdus = pdus
.collect()
@@ -152,7 +252,7 @@ async fn handle(
.into_iter()
.try_stream()
.broad_and_then(|(room_id, pdus): (_, Vec<_>)| {
handle_room(services, client, origin, started, room_id, pdus.into_iter())
handle_room(services, client, origin, room_id, pdus.into_iter())
.map_ok(Vec::into_iter)
.map_ok(IterStream::try_stream)
})
@@ -169,14 +269,51 @@ async fn handle(
Ok(results)
}
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
/// returning them in a topologically sorted order.
///
/// This is used to attempt to process PDUs in an order that respects their
/// dependencies, however it is ultimately the sender's responsibility to send
/// them in a processable order, so this is just a best effort attempt. It does
/// not account for power levels or other tie breaks.
async fn build_local_dag(
pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject>,
) -> Result<Vec<OwnedEventId>> {
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> = HashMap::new();
for (event_id, value) in pdu_map {
let prev_events = value
.get("prev_events")
.expect("pdu must have prev_events")
.as_array()
.expect("prev_events must be an array")
.iter()
.map(|v| {
OwnedEventId::parse(v.as_str().expect("prev_events values must be strings"))
.expect("prev_events must be valid event IDs")
})
.collect::<HashSet<OwnedEventId>>();
dag.insert(event_id.clone(), prev_events);
}
lexicographical_topological_sort(&dag, &|_| async {
// Note: we don't bother fetching power levels because that would massively slow
// this function down. This is a best-effort attempt to order events correctly
// for processing, however ultimately that should be the sender's job.
Ok((int!(0), MilliSecondsSinceUnixEpoch(uint!(0))))
})
.await
.map_err(|e| err!("failed to resolve local graph: {e}"))
}
async fn handle_room(
services: &Services,
_client: &IpAddr,
origin: &ServerName,
txn_start_time: Instant,
room_id: OwnedRoomId,
pdus: impl Iterator<Item = Pdu> + Send,
) -> Result<Vec<(OwnedEventId, Result)>> {
) -> std::result::Result<Vec<(OwnedEventId, Result)>, TransactionError> {
let _room_lock = services
.rooms
.event_handler
@@ -185,27 +322,40 @@ async fn handle_room(
.await;
let room_id = &room_id;
pdus.try_stream()
.and_then(|(_, event_id, value)| async move {
services.server.check_running()?;
let pdu_start_time = Instant::now();
let result = services
.rooms
.event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
.await
.map(|_| ());
debug!(
pdu_elapsed = ?pdu_start_time.elapsed(),
txn_elapsed = ?txn_start_time.elapsed(),
"Finished PDU {event_id}",
);
Ok((event_id, result))
let pdu_map: HashMap<OwnedEventId, CanonicalJsonObject> = pdus
.into_iter()
.map(|(_, event_id, value)| (event_id, value))
.collect();
// Try to sort PDUs by their dependencies, but fall back to arbitrary order on
// failure (e.g., cycles). This is best-effort; proper ordering is the sender's
// responsibility.
let sorted_event_ids = if pdu_map.len() >= 2 {
build_local_dag(&pdu_map).await.unwrap_or_else(|e| {
debug_warn!("Failed to build local DAG for room {room_id}: {e}");
pdu_map.keys().cloned().collect()
})
.try_collect()
.await
} else {
pdu_map.keys().cloned().collect()
};
let mut results = Vec::with_capacity(sorted_event_ids.len());
for event_id in sorted_event_ids {
let value = pdu_map
.get(&event_id)
.expect("sorted event IDs must be from the original map")
.clone();
services
.server
.check_running()
.map_err(|_| TransactionError::ShuttingDown)?;
let result = services
.rooms
.event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
.await
.map(|_| ());
results.push((event_id, result));
}
Ok(results)
}
async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, edu: Edu) {
@@ -478,8 +628,8 @@ async fn handle_edu_direct_to_device(
// Check if this is a new transaction id
if services
.transaction_ids
.existing_txnid(sender, None, message_id)
.transactions
.get_client_txn(sender, None, message_id)
.await
.is_ok()
{
@@ -498,8 +648,8 @@ async fn handle_edu_direct_to_device(
// Save transaction id with empty data
services
.transaction_ids
.add_txnid(sender, None, message_id, &[]);
.transactions
.add_client_txnid(sender, None, message_id, &[]);
}
async fn handle_edu_direct_to_device_user<Event: Send + Sync>(
+7 -6
View File
@@ -1,7 +1,7 @@
use std::time::Duration;
use axum::extract::State;
use conduwuit::{Err, Result};
use conduwuit::{Error, Result};
use futures::{FutureExt, StreamExt, TryFutureExt};
use ruma::api::{
client::error::ErrorKind,
@@ -24,7 +24,7 @@ pub(crate) async fn get_devices_route(
body: Ruma<get_devices::v1::Request>,
) -> Result<get_devices::v1::Response> {
if !services.globals.user_is_local(&body.user_id) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Tried to access user from other server.",
));
@@ -86,9 +86,10 @@ pub(crate) async fn get_keys_route(
.iter()
.any(|(u, _)| !services.globals.user_is_local(u))
{
return Err!(
BadRequest(ErrorKind::InvalidParam, "User does not belong to this server.",)
);
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"User does not belong to this server.",
));
}
let result = get_keys_helper(
@@ -120,7 +121,7 @@ pub(crate) async fn claim_keys_route(
.iter()
.any(|(u, _)| !services.globals.user_is_local(u))
{
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Tried to access user from other server.",
));
+3 -3
View File
@@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{Err, Result};
use ruma::api::federation::discovery::discover_homeserver;
use conduwuit::{Error, Result};
use ruma::api::{client::error::ErrorKind, federation::discovery::discover_homeserver};
use crate::Ruma;
@@ -14,7 +14,7 @@ pub(crate) async fn well_known_server(
Ok(discover_homeserver::Response {
server: match services.server.config.well_known.server.as_ref() {
| Some(server_name) => server_name.to_owned(),
| None => return Err!(BadRequest(ErrorKind::NotFound, "Not found.")),
| None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
},
})
}
+1 -2
View File
@@ -98,8 +98,7 @@ serde-saphyr.workspace = true
serde.workspace = true
smallvec.workspace = true
smallstr.workspace = true
snafu.workspace = true
paste.workspace = true
thiserror.workspace = true
tikv-jemallocator.optional = true
tikv-jemallocator.workspace = true
tikv-jemalloc-ctl.optional = true
+82 -17
View File
@@ -368,6 +368,31 @@ pub struct Config {
#[serde(default = "default_max_fetch_prev_events")]
pub max_fetch_prev_events: u16,
/// How many incoming federation transactions the server is willing to be
/// processing at any given time before it becomes overloaded and starts
/// rejecting further transactions until some slots become available.
///
/// Setting this value too low or too high may result in unstable
/// federation, and setting it too high may cause runaway resource usage.
///
/// default: 150
#[serde(default = "default_max_concurrent_inbound_transactions")]
pub max_concurrent_inbound_transactions: usize,
/// Maximum age (in seconds) for cached federation transaction responses.
/// Entries older than this will be removed during cleanup.
///
/// default: 7200 (2 hours)
#[serde(default = "default_transaction_id_cache_max_age_secs")]
pub transaction_id_cache_max_age_secs: u64,
/// Maximum number of cached federation transaction responses.
/// When the cache exceeds this limit, older entries will be removed.
///
/// default: 8192
#[serde(default = "default_transaction_id_cache_max_entries")]
pub transaction_id_cache_max_entries: usize,
/// Default/base connection timeout (seconds). This is used only by URL
/// previews and update/news endpoint checks.
///
@@ -653,12 +678,6 @@ pub struct Config {
#[serde(default)]
pub allow_public_room_directory_over_federation: bool,
/// Set this to true to allow your server's public room directory to be
/// queried without client authentication (access token) through the Client
/// APIs. Set this to false to protect against /publicRooms spiders.
#[serde(default)]
pub allow_public_room_directory_without_auth: bool,
/// Allow guests/unauthenticated users to access TURN credentials.
///
/// This is the equivalent of Synapse's `turn_allow_guests` config option.
@@ -2061,6 +2080,12 @@ pub struct Config {
/// display: nested
#[serde(default)]
pub blurhashing: BlurhashConfig,
/// Configuration for MatrixRTC (MSC4143) transport discovery.
/// display: nested
#[serde(default)]
pub matrix_rtc: MatrixRtcConfig,
#[serde(flatten)]
#[allow(clippy::zero_sized_map_values)]
// this is a catchall, the map shouldn't be zero at runtime
@@ -2126,17 +2151,16 @@ pub struct WellKnownConfig {
/// listed.
pub support_mxid: Option<OwnedUserId>,
/// A list of MatrixRTC foci URLs which will be served as part of the
/// MSC4143 client endpoint at /.well-known/matrix/client. If you're
/// setting up livekit, you'd want something like:
/// rtc_focus_server_urls = [
/// { type = "livekit", livekit_service_url = "https://livekit.example.com" },
/// ]
/// **DEPRECATED**: Use `[global.matrix_rtc].foci` instead.
///
/// To disable, set this to be an empty vector (`[]`).
/// A list of MatrixRTC foci URLs which will be served as part of the
/// MSC4143 client endpoint at /.well-known/matrix/client.
///
/// This option is deprecated and will be removed in a future release.
/// Please migrate to the new `[global.matrix_rtc]` config section.
///
/// default: []
#[serde(default = "default_rtc_focus_urls")]
#[serde(default)]
pub rtc_focus_server_urls: Vec<RtcFocusInfo>,
}
@@ -2165,6 +2189,43 @@ pub struct BlurhashConfig {
pub blurhash_max_raw_size: u64,
}
#[derive(Clone, Debug, Deserialize, Default)]
#[config_example_generator(filename = "conduwuit-example.toml", section = "global.matrix_rtc")]
pub struct MatrixRtcConfig {
/// A list of MatrixRTC foci (transports) which will be served via the
/// MSC4143 RTC transports endpoint at
/// `/_matrix/client/v1/rtc/transports`. If you're setting up livekit,
/// you'd want something like:
/// ```toml
/// [global.matrix_rtc]
/// foci = [
/// { type = "livekit", livekit_service_url = "https://livekit.example.com" },
/// ]
/// ```
///
/// To disable, set this to an empty list (`[]`).
///
/// default: []
#[serde(default)]
pub foci: Vec<RtcFocusInfo>,
}
impl MatrixRtcConfig {
/// Returns the effective foci, falling back to the deprecated
/// `rtc_focus_server_urls` if the new config is empty.
#[must_use]
pub fn effective_foci<'a>(
&'a self,
deprecated_foci: &'a [RtcFocusInfo],
) -> &'a [RtcFocusInfo] {
if !self.foci.is_empty() {
&self.foci
} else {
deprecated_foci
}
}
}
#[derive(Clone, Debug, Default, Deserialize)]
#[config_example_generator(filename = "conduwuit-example.toml", section = "global.ldap")]
pub struct LdapConfig {
@@ -2358,6 +2419,7 @@ pub struct DraupnirConfig {
"well_known_support_email",
"well_known_support_mxid",
"registration_token_file",
"well_known.rtc_focus_server_urls",
];
impl Config {
@@ -2540,6 +2602,12 @@ fn default_pusher_idle_timeout() -> u64 { 15 }
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
fn default_max_concurrent_inbound_transactions() -> usize { 150 }
fn default_transaction_id_cache_max_age_secs() -> u64 { 60 * 60 * 2 }
fn default_transaction_id_cache_max_entries() -> usize { 8192 }
fn default_tracing_flame_filter() -> String {
cfg!(debug_assertions)
.then_some("trace,h2=off")
@@ -2635,9 +2703,6 @@ fn default_rocksdb_stats_level() -> u8 { 1 }
#[inline]
pub fn default_default_room_version() -> RoomVersionId { RoomVersionId::V11 }
#[must_use]
pub fn default_rtc_focus_urls() -> Vec<RtcFocusInfo> { vec![] }
fn default_ip_range_denylist() -> Vec<String> {
vec![
"127.0.0.0/8".to_owned(),
+30 -129
View File
@@ -45,162 +45,63 @@ macro_rules! Err {
macro_rules! err {
(Request(Forbidden($level:ident!($($args:tt)+)))) => {{
let mut buf = String::new();
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::forbidden(),
message: $crate::err_log!(buf, $level, $($args)+),
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: Some($crate::snafu::Backtrace::capture()),
}
$crate::error::Error::Request(
$crate::ruma::api::client::error::ErrorKind::forbidden(),
$crate::err_log!(buf, $level, $($args)+),
$crate::http::StatusCode::BAD_REQUEST
)
}};
(Request(Forbidden($($args:tt)+))) => {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::forbidden(),
message,
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: Some($crate::snafu::Backtrace::capture()),
}
}
};
(Request(NotFound($level:ident!($($args:tt)+)))) => {{
let mut buf = String::new();
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::NotFound,
message: $crate::err_log!(buf, $level, $($args)+),
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: None,
}
}};
(Request(NotFound($($args:tt)+))) => {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::NotFound,
message,
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: None,
}
}
$crate::error::Error::Request(
$crate::ruma::api::client::error::ErrorKind::forbidden(),
$crate::format_maybe!($($args)+),
$crate::http::StatusCode::BAD_REQUEST
)
};
(Request($variant:ident($level:ident!($($args:tt)+)))) => {{
let mut buf = String::new();
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::$variant,
message: $crate::err_log!(buf, $level, $($args)+),
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: Some($crate::snafu::Backtrace::capture()),
}
$crate::error::Error::Request(
$crate::ruma::api::client::error::ErrorKind::$variant,
$crate::err_log!(buf, $level, $($args)+),
$crate::http::StatusCode::BAD_REQUEST
)
}};
(Request($variant:ident($($args:tt)+))) => {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::$variant,
message,
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: Some($crate::snafu::Backtrace::capture()),
}
}
$crate::error::Error::Request(
$crate::ruma::api::client::error::ErrorKind::$variant,
$crate::format_maybe!($($args)+),
$crate::http::StatusCode::BAD_REQUEST
)
};
(Config($item:literal, $($args:tt)+)) => {{
let mut buf = String::new();
$crate::error::ConfigSnafu {
directive: $item,
message: $crate::err_log!(buf, error, config = %$item, $($args)+),
}.build()
$crate::error::Error::Config($item, $crate::err_log!(buf, error, config = %$item, $($args)+))
}};
(BadRequest(ErrorKind::NotFound, $($args:tt)+)) => {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::NotFound,
message,
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: None,
}
}
};
(BadRequest($kind:expr, $($args:tt)+)) => {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::BadRequestSnafu {
kind: $kind,
message,
}.build()
}
};
(FeatureDisabled($($args:tt)+)) => {
{
let feature: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::FeatureDisabledSnafu { feature }.build()
}
};
(Federation($server:expr, $error:expr $(,)?)) => {
{
$crate::error::FederationSnafu {
server: $server,
error: $error,
}.build()
}
};
(InconsistentRoomState($message:expr, $room_id:expr $(,)?)) => {
{
$crate::error::InconsistentRoomStateSnafu {
message: $message,
room_id: $room_id,
}.build()
}
};
(Uiaa($info:expr $(,)?)) => {
{
$crate::error::UiaaSnafu {
info: $info,
}.build()
}
};
($variant:ident($level:ident!($($args:tt)+))) => {{
let mut buf = String::new();
$crate::paste::paste! {
$crate::error::[<$variant Snafu>] {
message: $crate::err_log!(buf, $level, $($args)+),
}.build()
}
$crate::error::Error::$variant($crate::err_log!(buf, $level, $($args)+))
}};
($variant:ident($($args:ident),+)) => {
$crate::error::Error::$variant($($args),+)
};
($variant:ident($($args:tt)+)) => {
$crate::paste::paste! {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::[<$variant Snafu>] { message }.build()
}
}
$crate::error::Error::$variant($crate::format_maybe!($($args)+))
};
($level:ident!($($args:tt)+)) => {{
let mut buf = String::new();
let message: std::borrow::Cow<'static, str> = $crate::err_log!(buf, $level, $($args)+);
$crate::error::ErrSnafu { message }.build()
$crate::error::Error::Err($crate::err_log!(buf, $level, $($args)+))
}};
($($args:tt)+) => {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::ErrSnafu { message }.build()
}
$crate::error::Error::Err($crate::format_maybe!($($args)+))
};
}
@@ -233,7 +134,7 @@ macro_rules! err_log {
};
($crate::error::visit)(&mut $out, LEVEL, &__CALLSITE, &mut valueset_all!(__CALLSITE.metadata().fields(), $($fields)+));
std::borrow::Cow::<'static, str>::from($out)
($out).into()
}}
}
+139 -448
View File
@@ -6,391 +6,151 @@
use std::{any::Any, borrow::Cow, convert::Infallible, sync::PoisonError};
use snafu::{IntoError, prelude::*};
pub use self::{err::visit, log::*};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
#[derive(thiserror::Error)]
pub enum Error {
#[snafu(display("PANIC!"))]
PanicAny {
panic: Box<dyn Any + Send>,
backtrace: snafu::Backtrace,
},
#[snafu(display("PANIC! {message}"))]
Panic {
message: &'static str,
panic: Box<dyn Any + Send + 'static>,
backtrace: snafu::Backtrace,
},
#[error("PANIC!")]
PanicAny(Box<dyn Any + Send>),
#[error("PANIC! {0}")]
Panic(&'static str, Box<dyn Any + Send + 'static>),
// std
#[snafu(display("Format error: {source}"))]
Fmt {
source: std::fmt::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("UTF-8 conversion error: {source}"))]
FromUtf8 {
source: std::string::FromUtf8Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("I/O error: {source}"))]
Io {
source: std::io::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Parse float error: {source}"))]
ParseFloat {
source: std::num::ParseFloatError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Parse int error: {source}"))]
ParseInt {
source: std::num::ParseIntError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Error: {source}"))]
Std {
source: Box<dyn std::error::Error + Send>,
backtrace: snafu::Backtrace,
},
#[snafu(display("Thread access error: {source}"))]
ThreadAccessError {
source: std::thread::AccessError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Integer conversion error: {source}"))]
TryFromInt {
source: std::num::TryFromIntError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Slice conversion error: {source}"))]
TryFromSlice {
source: std::array::TryFromSliceError,
backtrace: snafu::Backtrace,
},
#[snafu(display("UTF-8 error: {source}"))]
Utf8 {
source: std::str::Utf8Error,
backtrace: snafu::Backtrace,
},
#[error(transparent)]
Fmt(#[from] std::fmt::Error),
#[error(transparent)]
FromUtf8(#[from] std::string::FromUtf8Error),
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error(transparent)]
ParseFloat(#[from] std::num::ParseFloatError),
#[error(transparent)]
ParseInt(#[from] std::num::ParseIntError),
#[error(transparent)]
Std(#[from] Box<dyn std::error::Error + Send>),
#[error(transparent)]
ThreadAccessError(#[from] std::thread::AccessError),
#[error(transparent)]
TryFromInt(#[from] std::num::TryFromIntError),
#[error(transparent)]
TryFromSlice(#[from] std::array::TryFromSliceError),
#[error(transparent)]
Utf8(#[from] std::str::Utf8Error),
// third-party
#[snafu(display("Capacity error: {source}"))]
CapacityError {
source: arrayvec::CapacityError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Cargo.toml error: {source}"))]
CargoToml {
source: cargo_toml::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Clap error: {source}"))]
Clap {
source: clap::error::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Extension rejection: {source}"))]
Extension {
source: axum::extract::rejection::ExtensionRejection,
backtrace: snafu::Backtrace,
},
#[snafu(display("Figment error: {source}"))]
Figment {
source: figment::error::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("HTTP error: {source}"))]
Http {
source: http::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Invalid HTTP header value: {source}"))]
HttpHeader {
source: http::header::InvalidHeaderValue,
backtrace: snafu::Backtrace,
},
#[snafu(display("Join error: {source}"))]
JoinError {
source: tokio::task::JoinError,
backtrace: snafu::Backtrace,
},
#[snafu(display("JSON error: {source}"))]
Json {
source: serde_json::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("JS parse int error: {source}"))]
JsParseInt {
source: ruma::JsParseIntError,
backtrace: snafu::Backtrace,
},
#[snafu(display("JS try from int error: {source}"))]
JsTryFromInt {
source: ruma::JsTryFromIntError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Path rejection: {source}"))]
Path {
source: axum::extract::rejection::PathRejection,
backtrace: snafu::Backtrace,
},
#[snafu(display("Mutex poisoned: {message}"))]
Poison {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("Regex error: {source}"))]
Regex {
source: regex::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Request error: {source}"))]
Reqwest {
source: reqwest::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message}"))]
SerdeDe {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message}"))]
SerdeSer {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("TOML deserialization error: {source}"))]
TomlDe {
source: toml::de::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("TOML serialization error: {source}"))]
TomlSer {
source: toml::ser::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Tracing filter error: {source}"))]
TracingFilter {
source: tracing_subscriber::filter::ParseError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Tracing reload error: {source}"))]
TracingReload {
source: tracing_subscriber::reload::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Typed header rejection: {source}"))]
TypedHeader {
source: axum_extra::typed_header::TypedHeaderRejection,
backtrace: snafu::Backtrace,
},
#[snafu(display("YAML deserialization error: {source}"))]
YamlDe {
source: serde_saphyr::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("YAML serialization error: {source}"))]
YamlSer {
source: serde_saphyr::ser_error::Error,
backtrace: snafu::Backtrace,
},
#[error(transparent)]
CapacityError(#[from] arrayvec::CapacityError),
#[error(transparent)]
CargoToml(#[from] cargo_toml::Error),
#[error(transparent)]
Clap(#[from] clap::error::Error),
#[error(transparent)]
Extension(#[from] axum::extract::rejection::ExtensionRejection),
#[error(transparent)]
Figment(#[from] figment::error::Error),
#[error(transparent)]
Http(#[from] http::Error),
#[error(transparent)]
HttpHeader(#[from] http::header::InvalidHeaderValue),
#[error("Join error: {0}")]
JoinError(#[from] tokio::task::JoinError),
#[error(transparent)]
Json(#[from] serde_json::Error),
#[error(transparent)]
JsParseInt(#[from] ruma::JsParseIntError), // js_int re-export
#[error(transparent)]
JsTryFromInt(#[from] ruma::JsTryFromIntError), // js_int re-export
#[error(transparent)]
Path(#[from] axum::extract::rejection::PathRejection),
#[error("Mutex poisoned: {0}")]
Poison(Cow<'static, str>),
#[error("Regex error: {0}")]
Regex(#[from] regex::Error),
#[error("Request error: {0}")]
Reqwest(#[from] reqwest::Error),
#[error("{0}")]
SerdeDe(Cow<'static, str>),
#[error("{0}")]
SerdeSer(Cow<'static, str>),
#[error(transparent)]
TomlDe(#[from] toml::de::Error),
#[error(transparent)]
TomlSer(#[from] toml::ser::Error),
#[error("Tracing filter error: {0}")]
TracingFilter(#[from] tracing_subscriber::filter::ParseError),
#[error("Tracing reload error: {0}")]
TracingReload(#[from] tracing_subscriber::reload::Error),
#[error(transparent)]
TypedHeader(#[from] axum_extra::typed_header::TypedHeaderRejection),
#[error(transparent)]
YamlDe(#[from] serde_saphyr::Error),
#[error(transparent)]
YamlSer(#[from] serde_saphyr::ser_error::Error),
// ruma/conduwuit
#[snafu(display("Arithmetic operation failed: {message}"))]
Arithmetic {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("{kind}: {message}"))]
BadRequest {
kind: ruma::api::client::error::ErrorKind,
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message}"))]
BadServerResponse {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("Canonical JSON error: {source}"))]
CanonicalJson {
source: ruma::CanonicalJsonError,
backtrace: snafu::Backtrace,
},
#[snafu(display(
"There was a problem with the '{directive}' directive in your configuration: {message}"
))]
Config {
directive: &'static str,
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message}"))]
Conflict {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("Content disposition error: {source}"))]
ContentDisposition {
source: ruma::http_headers::ContentDispositionParseError,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message}"))]
Database {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("Feature '{feature}' is not available on this server."))]
FeatureDisabled {
feature: Cow<'static, str>,
},
#[snafu(display("Remote server {server} responded with: {error}"))]
Federation {
server: ruma::OwnedServerName,
error: ruma::api::client::error::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message} in {room_id}"))]
InconsistentRoomState {
message: &'static str,
room_id: ruma::OwnedRoomId,
backtrace: snafu::Backtrace,
},
#[snafu(display("HTTP conversion error: {source}"))]
IntoHttp {
source: ruma::api::error::IntoHttpError,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message}"))]
Ldap {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("MXC URI error: {source}"))]
Mxc {
source: ruma::MxcUriError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Matrix ID parse error: {source}"))]
Mxid {
source: ruma::IdParseError,
backtrace: snafu::Backtrace,
},
#[snafu(display("from {server}: {error}"))]
Redaction {
server: ruma::OwnedServerName,
error: ruma::canonical_json::RedactionError,
backtrace: snafu::Backtrace,
},
#[snafu(display("{kind}: {message}"))]
Request {
kind: ruma::api::client::error::ErrorKind,
message: Cow<'static, str>,
code: http::StatusCode,
backtrace: Option<snafu::Backtrace>,
},
#[snafu(display("Ruma error: {source}"))]
Ruma {
source: ruma::api::client::error::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Signature error: {source}"))]
Signatures {
source: ruma::signatures::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("State resolution error: {source}"))]
#[snafu(context(false))]
StateRes {
source: crate::state_res::Error,
},
#[snafu(display("uiaa"))]
Uiaa {
info: ruma::api::client::uiaa::UiaaInfo,
},
#[error("Arithmetic operation failed: {0}")]
Arithmetic(Cow<'static, str>),
#[error("{0}: {1}")]
BadRequest(ruma::api::client::error::ErrorKind, &'static str), //TODO: remove
#[error("{0}")]
BadServerResponse(Cow<'static, str>),
#[error(transparent)]
CanonicalJson(#[from] ruma::CanonicalJsonError),
#[error("There was a problem with the '{0}' directive in your configuration: {1}")]
Config(&'static str, Cow<'static, str>),
#[error("{0}")]
Conflict(Cow<'static, str>), // This is only needed for when a room alias already exists
#[error(transparent)]
ContentDisposition(#[from] ruma::http_headers::ContentDispositionParseError),
#[error("{0}")]
Database(Cow<'static, str>),
#[error("Feature '{0}' is not available on this server.")]
FeatureDisabled(Cow<'static, str>),
#[error("Remote server {0} responded with: {1}")]
Federation(ruma::OwnedServerName, ruma::api::client::error::Error),
#[error("{0} in {1}")]
InconsistentRoomState(&'static str, ruma::OwnedRoomId),
#[error(transparent)]
IntoHttp(#[from] ruma::api::error::IntoHttpError),
#[error("{0}")]
Ldap(Cow<'static, str>),
#[error(transparent)]
Mxc(#[from] ruma::MxcUriError),
#[error(transparent)]
Mxid(#[from] ruma::IdParseError),
#[error("from {0}: {1}")]
Redaction(ruma::OwnedServerName, ruma::canonical_json::RedactionError),
#[error("{0}: {1}")]
Request(ruma::api::client::error::ErrorKind, Cow<'static, str>, http::StatusCode),
#[error(transparent)]
Ruma(#[from] ruma::api::client::error::Error),
#[error(transparent)]
Signatures(#[from] ruma::signatures::Error),
#[error(transparent)]
StateRes(#[from] crate::state_res::Error),
#[error("uiaa")]
Uiaa(ruma::api::client::uiaa::UiaaInfo),
// unique / untyped
#[snafu(display("{message}"))]
Err {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[error("{0}")]
Err(Cow<'static, str>),
}
impl Error {
#[inline]
#[must_use]
pub fn from_errno() -> Self { IoSnafu {}.into_error(std::io::Error::last_os_error()) }
pub fn from_errno() -> Self { Self::Io(std::io::Error::last_os_error()) }
//#[deprecated]
#[must_use]
pub fn bad_database(message: &'static str) -> Self {
let message: Cow<'static, str> = message.into();
DatabaseSnafu { message }.build()
crate::err!(Database(error!("{message}")))
}
/// Sanitizes public-facing errors that can leak sensitive information.
pub fn sanitized_message(&self) -> String {
match self {
| Self::Database { .. } => String::from("Database error occurred."),
| Self::Io { .. } => String::from("I/O error occurred."),
| Self::Database(..) => String::from("Database error occurred."),
| Self::Io(..) => String::from("I/O error occurred."),
| _ => self.message(),
}
}
@@ -398,8 +158,8 @@ pub fn sanitized_message(&self) -> String {
/// Generate the error message string.
pub fn message(&self) -> String {
match self {
| Self::Federation { server, error, .. } => format!("Answer from {server}: {error}"),
| Self::Ruma { source, .. } => response::ruma_error_message(source),
| Self::Federation(origin, error) => format!("Answer from {origin}: {error}"),
| Self::Ruma(error) => response::ruma_error_message(error),
| _ => format!("{self}"),
}
}
@@ -410,10 +170,10 @@ pub fn kind(&self) -> ruma::api::client::error::ErrorKind {
use ruma::api::client::error::ErrorKind::{FeatureDisabled, Unknown};
match self {
| Self::Federation { error, .. } => response::ruma_error_kind(error).clone(),
| Self::Ruma { source, .. } => response::ruma_error_kind(source).clone(),
| Self::BadRequest { kind, .. } | Self::Request { kind, .. } => kind.clone(),
| Self::FeatureDisabled { .. } => FeatureDisabled,
| Self::Federation(_, error) | Self::Ruma(error) =>
response::ruma_error_kind(error).clone(),
| Self::BadRequest(kind, ..) | Self::Request(kind, ..) => kind.clone(),
| Self::FeatureDisabled(..) => FeatureDisabled,
| _ => Unknown,
}
}
@@ -424,15 +184,13 @@ pub fn status_code(&self) -> http::StatusCode {
use http::StatusCode;
match self {
| Self::Federation { error, .. } => error.status_code,
| Self::Ruma { source, .. } => source.status_code,
| Self::Request { kind, code, .. } => response::status_code(kind, *code),
| Self::BadRequest { kind, .. } => response::bad_request_code(kind),
| Self::FeatureDisabled { .. } => response::bad_request_code(&self.kind()),
| Self::Reqwest { source, .. } =>
source.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
| Self::Conflict { .. } => StatusCode::CONFLICT,
| Self::Io { source, .. } => response::io_error_code(source.kind()),
| Self::Federation(_, error) | Self::Ruma(error) => error.status_code,
| Self::Request(kind, _, code) => response::status_code(kind, *code),
| Self::BadRequest(kind, ..) => response::bad_request_code(kind),
| Self::FeatureDisabled(..) => response::bad_request_code(&self.kind()),
| Self::Reqwest(error) => error.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
| Self::Conflict(_) => StatusCode::CONFLICT,
| Self::Io(error) => response::io_error_code(error.kind()),
| _ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
@@ -445,46 +203,16 @@ pub fn status_code(&self) -> http::StatusCode {
pub fn is_not_found(&self) -> bool { self.status_code() == http::StatusCode::NOT_FOUND }
}
// Debug is already derived by Snafu
/// Macro to reduce boilerplate for From implementations using Snafu context
macro_rules! impl_from_snafu {
($source_ty:ty => $context:ident) => {
impl From<$source_ty> for Error {
fn from(source: $source_ty) -> Self { $context.into_error(source) }
}
};
}
/// Macro for From impls that format messages into ErrSnafu or other
/// message-based contexts
macro_rules! impl_from_message {
($source_ty:ty => $context:ident, $msg:expr) => {
impl From<$source_ty> for Error {
fn from(source: $source_ty) -> Self {
let message: Cow<'static, str> = format!($msg, source).into();
$context { message }.build()
}
}
};
}
/// Macro for From impls with constant messages (no formatting)
macro_rules! impl_from_const_message {
($source_ty:ty => $context:ident, $msg:expr) => {
impl From<$source_ty> for Error {
fn from(_source: $source_ty) -> Self {
let message: Cow<'static, str> = $msg.into();
$context { message }.build()
}
}
};
impl std::fmt::Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message())
}
}
impl<T> From<PoisonError<T>> for Error {
#[cold]
#[inline(never)]
fn from(e: PoisonError<T>) -> Self { PoisonSnafu { message: e.to_string() }.build() }
fn from(e: PoisonError<T>) -> Self { Self::Poison(e.to_string().into()) }
}
#[allow(clippy::fallible_impl_from)]
@@ -496,43 +224,6 @@ fn from(_e: Infallible) -> Self {
}
}
// Implementations using the macro
impl_from_snafu!(std::io::Error => IoSnafu);
impl_from_snafu!(std::string::FromUtf8Error => FromUtf8Snafu);
impl_from_snafu!(regex::Error => RegexSnafu);
impl_from_snafu!(ruma::http_headers::ContentDispositionParseError => ContentDispositionSnafu);
impl_from_snafu!(ruma::api::error::IntoHttpError => IntoHttpSnafu);
impl_from_snafu!(ruma::JsTryFromIntError => JsTryFromIntSnafu);
impl_from_snafu!(ruma::CanonicalJsonError => CanonicalJsonSnafu);
impl_from_snafu!(axum::extract::rejection::PathRejection => PathSnafu);
impl_from_snafu!(clap::error::Error => ClapSnafu);
impl_from_snafu!(ruma::MxcUriError => MxcSnafu);
impl_from_snafu!(serde_saphyr::ser_error::Error => YamlSerSnafu);
impl_from_snafu!(toml::de::Error => TomlDeSnafu);
impl_from_snafu!(http::header::InvalidHeaderValue => HttpHeaderSnafu);
impl_from_snafu!(serde_json::Error => JsonSnafu);
// Custom implementations using message formatting
impl_from_const_message!(std::fmt::Error => ErrSnafu, "formatting error");
impl_from_message!(std::str::Utf8Error => ErrSnafu, "UTF-8 error: {}");
impl_from_message!(std::num::TryFromIntError => ArithmeticSnafu, "integer conversion error: {}");
impl_from_message!(tracing_subscriber::reload::Error => ErrSnafu, "tracing reload error: {}");
impl_from_message!(reqwest::Error => ErrSnafu, "HTTP client error: {}");
impl_from_message!(ruma::signatures::Error => ErrSnafu, "Signature error: {}");
impl_from_message!(ruma::IdParseError => ErrSnafu, "ID parse error: {}");
impl_from_message!(std::num::ParseIntError => ErrSnafu, "Integer parse error: {}");
impl_from_message!(std::array::TryFromSliceError => ErrSnafu, "Slice conversion error: {}");
impl_from_message!(tokio::task::JoinError => ErrSnafu, "Task join error: {}");
impl_from_message!(serde_saphyr::Error => ErrSnafu, "YAML error: {}");
// Generic implementation for CapacityError
impl<T> From<arrayvec::CapacityError<T>> for Error {
fn from(_source: arrayvec::CapacityError<T>) -> Self {
let message: Cow<'static, str> = "capacity error: buffer is full".into();
ErrSnafu { message }.build()
}
}
#[cold]
#[inline(never)]
pub fn infallible(_e: &Infallible) {
+5 -8
View File
@@ -15,16 +15,13 @@ pub fn panic(self) -> ! { panic_any(self.into_panic()) }
#[must_use]
#[inline]
pub fn from_panic(e: Box<dyn Any + Send>) -> Self {
use super::PanicSnafu;
PanicSnafu { message: debug::panic_str(&e), panic: e }.build()
}
pub fn from_panic(e: Box<dyn Any + Send>) -> Self { Self::Panic(debug::panic_str(&e), e) }
#[inline]
pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
match self {
| Self::Panic { panic, .. } | Self::PanicAny { panic, .. } => panic,
| Self::JoinError { source, .. } => source.into_panic(),
| Self::Panic(_, e) | Self::PanicAny(e) => e,
| Self::JoinError(e) => e.into_panic(),
| _ => Box::new(self),
}
}
@@ -40,8 +37,8 @@ pub fn panic_str(self) -> Option<&'static str> {
#[inline]
pub fn is_panic(&self) -> bool {
match &self {
| Self::Panic { .. } | Self::PanicAny { .. } => true,
| Self::JoinError { source, .. } => source.is_panic(),
| Self::Panic(..) | Self::PanicAny(..) => true,
| Self::JoinError(e) => e.is_panic(),
| _ => false,
}
}
+2 -2
View File
@@ -47,8 +47,8 @@ fn into_response(self) -> axum::response::Response {
impl From<Error> for UiaaResponse {
#[inline]
fn from(error: Error) -> Self {
if let Error::Uiaa { info, .. } = error {
return Self::AuthResponse(info);
if let Error::Uiaa(uiaainfo) = error {
return Self::AuthResponse(uiaainfo);
}
let body = ErrorBody::Standard {
+2 -8
View File
@@ -5,15 +5,9 @@
use crate::Error;
impl de::Error for Error {
fn custom<T: Display + ToString>(msg: T) -> Self {
let message: std::borrow::Cow<'static, str> = msg.to_string().into();
super::SerdeDeSnafu { message }.build()
}
fn custom<T: Display + ToString>(msg: T) -> Self { Self::SerdeDe(msg.to_string().into()) }
}
impl ser::Error for Error {
fn custom<T: Display + ToString>(msg: T) -> Self {
let message: std::borrow::Cow<'static, str> = msg.to_string().into();
super::SerdeSerSnafu { message }.build()
}
fn custom<T: Display + ToString>(msg: T) -> Self { Self::SerdeSer(msg.to_string().into()) }
}
+9
View File
@@ -14,6 +14,7 @@
static VERSION: OnceLock<String> = OnceLock::new();
static VERSION_UA: OnceLock<String> = OnceLock::new();
static USER_AGENT: OnceLock<String> = OnceLock::new();
static USER_AGENT_MEDIA: OnceLock<String> = OnceLock::new();
#[inline]
#[must_use]
@@ -21,14 +22,22 @@ pub fn name() -> &'static str { BRANDING }
#[inline]
pub fn version() -> &'static str { VERSION.get_or_init(init_version) }
#[inline]
pub fn version_ua() -> &'static str { VERSION_UA.get_or_init(init_version_ua) }
#[inline]
pub fn user_agent() -> &'static str { USER_AGENT.get_or_init(init_user_agent) }
#[inline]
pub fn user_agent_media() -> &'static str { USER_AGENT_MEDIA.get_or_init(init_user_agent_media) }
fn init_user_agent() -> String { format!("{}/{} (bot; +{WEBSITE})", name(), version_ua()) }
fn init_user_agent_media() -> String {
format!("{}/{} (embedbot; facebookexternalhit/1.1; +{WEBSITE})", name(), version_ua())
}
fn init_version_ua() -> String {
conduwuit_build_metadata::version_tag()
.map_or_else(|| SEMANTIC.to_owned(), |extra| format!("{SEMANTIC}+{extra}"))
+3 -10
View File
@@ -1,7 +1,7 @@
use ruma::{RoomVersionId, canonical_json::redact_content_in_place};
use serde_json::{Value as JsonValue, json, value::to_raw_value};
use crate::{Result, err, implement};
use crate::{Error, Result, err, implement};
#[implement(super::Pdu)]
pub fn redact(&mut self, room_version_id: &RoomVersionId, reason: JsonValue) -> Result {
@@ -10,15 +10,8 @@ pub fn redact(&mut self, room_version_id: &RoomVersionId, reason: JsonValue) ->
let mut content = serde_json::from_str(self.content.get())
.map_err(|e| err!(Request(BadJson("Failed to deserialize content into type: {e}"))))?;
redact_content_in_place(&mut content, room_version_id, self.kind.to_string()).map_err(
|error| {
crate::error::RedactionSnafu {
server: self.sender.server_name().to_owned(),
error,
}
.build()
},
)?;
redact_content_in_place(&mut content, room_version_id, self.kind.to_string())
.map_err(|e| Error::Redaction(self.sender.server_name().to_owned(), e))?;
let reason = serde_json::to_value(reason).expect("Failed to preserialize reason");
+5 -7
View File
@@ -27,7 +27,7 @@
use crate::{
matrix::{Event, Pdu, pdu::EventHash},
state_res::{self as state_res, Error, Result, StateMap, error::NotFoundSnafu},
state_res::{self as state_res, Error, Result, StateMap},
};
static SERVER_TIMESTAMP: AtomicU64 = AtomicU64::new(0);
@@ -170,12 +170,10 @@ fn resolve_deeper_event_set(c: &mut test::Bencher) {
#[allow(unused)]
impl<E: Event + Clone> TestStore<E> {
fn get_event(&self, room_id: &RoomId, event_id: &EventId) -> Result<E> {
self.0.get(event_id).cloned().ok_or_else(|| {
NotFoundSnafu {
message: format!("{} not found", event_id),
}
.build()
})
self.0
.get(event_id)
.cloned()
.ok_or_else(|| Error::NotFound(format!("{} not found", event_id)))
}
/// Returns the events that correspond to the `event_ids` sorted in the same
+10 -27
View File
@@ -1,40 +1,23 @@
use serde_json::Error as JsonError;
use snafu::{IntoError, prelude::*};
use thiserror::Error;
/// Represents the various errors that arise when resolving state.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
/// A deserialization error.
#[snafu(display("JSON error: {source}"))]
SerdeJson {
source: JsonError,
backtrace: snafu::Backtrace,
},
#[error(transparent)]
SerdeJson(#[from] JsonError),
/// The given option or version is unsupported.
#[snafu(display("Unsupported room version: {version}"))]
Unsupported {
version: String,
backtrace: snafu::Backtrace,
},
#[error("Unsupported room version: {0}")]
Unsupported(String),
/// The given event was not found.
#[snafu(display("Not found error: {message}"))]
NotFound {
message: String,
backtrace: snafu::Backtrace,
},
#[error("Not found error: {0}")]
NotFound(String),
/// Invalid fields in the given PDU.
#[snafu(display("Invalid PDU: {message}"))]
InvalidPdu {
message: String,
backtrace: snafu::Backtrace,
},
}
impl From<serde_json::Error> for Error {
fn from(source: serde_json::Error) -> Self { SerdeJsonSnafu.into_error(source) }
#[error("Invalid PDU: {0}")]
InvalidPdu(String),
}
+3 -4
View File
@@ -24,7 +24,6 @@
use super::{
Error, Event, Result, StateEventType, StateKey, TimelineEventType,
error::InvalidPduSnafu,
power_levels::{
deserialize_power_levels, deserialize_power_levels_content_fields,
deserialize_power_levels_content_invite, deserialize_power_levels_content_redact,
@@ -384,8 +383,8 @@ pub async fn auth_check<E, F, Fut>(
return Ok(false);
}
let target_user = <&UserId>::try_from(state_key)
.map_err(|e| InvalidPduSnafu { message: format!("{e}") }.build())?;
let target_user =
<&UserId>::try_from(state_key).map_err(|e| Error::InvalidPdu(format!("{e}")))?;
let user_for_join_auth = content
.join_authorised_via_users_server
@@ -462,7 +461,7 @@ pub async fn auth_check<E, F, Fut>(
?sender_membership_event_content,
"Sender membership event content missing membership field"
);
return Err(InvalidPduSnafu { message: "Missing membership field" }.build());
return Err(Error::InvalidPdu("Missing membership field".to_owned()));
};
let membership_state = membership_state.deserialize()?;
+29 -41
View File
@@ -29,18 +29,18 @@
};
use serde_json::from_str as from_json_str;
pub(crate) use self::error::{Error, InvalidPduSnafu, NotFoundSnafu};
pub(crate) use self::error::Error;
use self::power_levels::PowerLevelsContentFields;
pub use self::{
event_auth::{auth_check, auth_types_for_event},
room_version::RoomVersion,
};
use super::{Event, StateKey};
use crate::{
debug, debug_error,
debug, debug_error, err,
matrix::{Event, StateKey},
state_res::room_version::StateResolutionVersion,
trace,
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt},
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, WidebandExt},
warn,
};
@@ -118,10 +118,7 @@ pub async fn resolve<'a, Pdu, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Ex
let csg = calculate_conflicted_subgraph(&conflicting, event_fetch)
.await
.ok_or_else(|| {
InvalidPduSnafu {
message: "Failed to calculate conflicted subgraph",
}
.build()
Error::InvalidPdu("Failed to calculate conflicted subgraph".to_owned())
})?;
debug!(count = csg.len(), "conflicted subgraph");
trace!(set = ?csg, "conflicted subgraph");
@@ -152,11 +149,10 @@ pub async fn resolve<'a, Pdu, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Ex
let control_events: Vec<_> = all_conflicted
.iter()
.stream()
.broad_filter_map(async |id| {
event_fetch(id.clone())
.wide_filter_map(async |id| {
is_power_event_id(id, &event_fetch)
.await
.filter(|event| is_power_event(&event))
.map(|_| id.clone())
.then_some(id.clone())
})
.collect()
.await;
@@ -318,10 +314,7 @@ async fn calculate_conflicted_subgraph<F, Fut, E>(
trace!(event_id = event_id.as_str(), "fetching event for its auth events");
let evt = fetch_event(event_id.clone()).await;
if evt.is_none() {
tracing::error!(
"could not fetch event {} to calculate conflicted subgraph",
event_id
);
err!("could not fetch event {} to calculate conflicted subgraph", event_id);
path.pop();
continue;
}
@@ -409,11 +402,11 @@ async fn reverse_topological_power_sort<E, F, Fut>(
let fetcher = async |event_id: OwnedEventId| {
let pl = *event_to_pl
.get(&event_id)
.ok_or_else(|| NotFoundSnafu { message: "" }.build())?;
.ok_or_else(|| Error::NotFound(String::new()))?;
let ev = fetch_event(event_id)
.await
.ok_or_else(|| NotFoundSnafu { message: "" }.build())?;
.ok_or_else(|| Error::NotFound(String::new()))?;
Ok((pl, ev.origin_server_ts()))
};
@@ -619,12 +612,9 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
let events_to_check: Vec<_> = events_to_check
.map(Result::Ok)
.broad_and_then(async |event_id| {
fetch_event(event_id.to_owned()).await.ok_or_else(|| {
NotFoundSnafu {
message: format!("Failed to find {event_id}"),
}
.build()
})
fetch_event(event_id.to_owned())
.await
.ok_or_else(|| Error::NotFound(format!("Failed to find {event_id}")))
})
.try_collect()
.boxed()
@@ -663,7 +653,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
trace!(event_id = event.event_id().as_str(), "checking event");
let state_key = event
.state_key()
.ok_or_else(|| InvalidPduSnafu { message: "State event had no state key" }.build())?;
.ok_or_else(|| Error::InvalidPdu("State event had no state key".to_owned()))?;
let auth_types = auth_types_for_event(
event.event_type(),
@@ -679,14 +669,13 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
trace!("room version uses hashed IDs, manually fetching create event");
let create_event_id_raw = event.room_id_or_hash().as_str().replace('!', "$");
let create_event_id = EventId::parse(&create_event_id_raw).map_err(|e| {
InvalidPduSnafu {
message: format!("Failed to parse create event ID from room ID/hash: {e}"),
}
.build()
})?;
let create_event = fetch_event(create_event_id.into()).await.ok_or_else(|| {
NotFoundSnafu { message: "Failed to find create event" }.build()
Error::InvalidPdu(format!(
"Failed to parse create event ID from room ID/hash: {e}"
))
})?;
let create_event = fetch_event(create_event_id.into())
.await
.ok_or_else(|| Error::NotFound("Failed to find create event".into()))?;
auth_state.insert(create_event.event_type().with_state_key(""), create_event);
}
for aid in event.auth_events() {
@@ -697,7 +686,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
auth_state.insert(
ev.event_type()
.with_state_key(ev.state_key().ok_or_else(|| {
InvalidPduSnafu { message: "State event had no state key" }.build()
Error::InvalidPdu("State event had no state key".to_owned())
})?),
ev.clone(),
);
@@ -812,13 +801,13 @@ async fn mainline_sort<E, F, Fut>(
let event = fetch_event(p.clone())
.await
.ok_or_else(|| NotFoundSnafu { message: format!("Failed to find {p}") }.build())?;
.ok_or_else(|| Error::NotFound(format!("Failed to find {p}")))?;
pl = None;
for aid in event.auth_events() {
let ev = fetch_event(aid.to_owned()).await.ok_or_else(|| {
NotFoundSnafu { message: format!("Failed to find {aid}") }.build()
})?;
let ev = fetch_event(aid.to_owned())
.await
.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
if is_type_and_key(&ev, &TimelineEventType::RoomPowerLevels, "") {
pl = Some(aid.to_owned());
@@ -880,9 +869,9 @@ async fn get_mainline_depth<E, F, Fut>(
event = None;
for aid in sort_ev.auth_events() {
let aev = fetch_event(aid.to_owned()).await.ok_or_else(|| {
NotFoundSnafu { message: format!("Failed to find {aid}") }.build()
})?;
let aev = fetch_event(aid.to_owned())
.await
.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
if is_type_and_key(&aev, &TimelineEventType::RoomPowerLevels, "") {
event = Some(aev);
@@ -926,7 +915,6 @@ async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
}
}
#[allow(dead_code)]
async fn is_power_event_id<E, F, Fut>(event_id: &EventId, fetch: &F) -> bool
where
F: Fn(OwnedEventId) -> Fut + Sync,
+2 -6
View File
@@ -1,6 +1,6 @@
use ruma::RoomVersionId;
use super::{Result, error::UnsupportedSnafu};
use super::{Error, Result};
#[derive(Debug)]
#[allow(clippy::exhaustive_enums)]
@@ -163,11 +163,7 @@ pub fn new(version: &RoomVersionId) -> Result<Self> {
| RoomVersionId::V10 => Self::V10,
| RoomVersionId::V11 => Self::V11,
| RoomVersionId::V12 => Self::V12,
| ver =>
return Err(UnsupportedSnafu {
version: format!("found version `{ver}`"),
}
.build()),
| ver => return Err(Error::Unsupported(format!("found version `{ver}`"))),
})
}
}
+2 -2
View File
@@ -22,7 +22,7 @@
value::{RawValue as RawJsonValue, to_raw_value as to_raw_json_value},
};
use super::{auth_types_for_event, error::NotFoundSnafu};
use super::auth_types_for_event;
use crate::{
Result, RoomVersion, info,
matrix::{Event, EventTypeExt, Pdu, StateMap, pdu::EventHash},
@@ -232,7 +232,7 @@ pub(crate) fn get_event(&self, _: &RoomId, event_id: &EventId) -> Result<E> {
self.0
.get(event_id)
.cloned()
.ok_or_else(|| NotFoundSnafu { message: format!("{event_id} not found") }.build())
.ok_or_else(|| super::Error::NotFound(format!("{event_id} not found")))
.map_err(Into::into)
}
-2
View File
@@ -14,11 +14,9 @@
pub use ::arrayvec;
pub use ::http;
pub use ::paste;
pub use ::ruma;
pub use ::smallstr;
pub use ::smallvec;
pub use ::snafu;
pub use ::toml;
pub use ::tracing;
pub use config::Config;
+1
View File
@@ -11,6 +11,7 @@
pub mod math;
pub mod mutex_map;
pub mod rand;
pub mod response;
pub mod result;
pub mod set;
pub mod stream;
+51
View File
@@ -0,0 +1,51 @@
use futures::StreamExt;
use num_traits::ToPrimitive;
use crate::Err;
/// Reads the response body while enforcing a maximum size limit to prevent
/// memory exhaustion.
pub async fn limit_read(response: reqwest::Response, max_size: u64) -> crate::Result<Vec<u8>> {
if response.content_length().is_some_and(|len| len > max_size) {
return Err!(BadServerResponse("Response too large"));
}
let mut data = Vec::new();
let mut reader = response.bytes_stream();
while let Some(chunk) = reader.next().await {
let chunk = chunk?;
data.extend_from_slice(&chunk);
if data.len() > max_size.to_usize().expect("max_size must fit in usize") {
return Err!(BadServerResponse("Response too large"));
}
}
Ok(data)
}
/// Reads the response body as text while enforcing a maximum size limit to
/// prevent memory exhaustion.
pub async fn limit_read_text(
response: reqwest::Response,
max_size: u64,
) -> crate::Result<String> {
let text = String::from_utf8(limit_read(response, max_size).await?)?;
Ok(text)
}
#[allow(async_fn_in_trait)]
pub trait LimitReadExt {
async fn limit_read(self, max_size: u64) -> crate::Result<Vec<u8>>;
async fn limit_read_text(self, max_size: u64) -> crate::Result<String>;
}
impl LimitReadExt for reqwest::Response {
async fn limit_read(self, max_size: u64) -> crate::Result<Vec<u8>> {
limit_read(self, max_size).await
}
async fn limit_read_text(self, max_size: u64) -> crate::Result<String> {
limit_read_text(self, max_size).await
}
}
+7 -9
View File
@@ -3,19 +3,17 @@
stream::{Stream, TryStream},
};
use crate::{Error, Result};
pub trait IterStream<I: IntoIterator + Send> {
/// Convert an Iterator into a Stream
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send;
/// Convert an Iterator into a TryStream
fn try_stream(
/// Convert an Iterator into a TryStream with a generic error type
fn try_stream<E>(
self,
) -> impl TryStream<
Ok = <I as IntoIterator>::Item,
Error = Error,
Item = Result<<I as IntoIterator>::Item, Error>,
Error = E,
Item = Result<<I as IntoIterator>::Item, E>,
> + Send;
}
@@ -28,12 +26,12 @@ impl<I> IterStream<I> for I
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send { stream::iter(self) }
#[inline]
fn try_stream(
fn try_stream<E>(
self,
) -> impl TryStream<
Ok = <I as IntoIterator>::Item,
Error = Error,
Item = Result<<I as IntoIterator>::Item, Error>,
Error = E,
Item = Result<<I as IntoIterator>::Item, E>,
> + Send {
self.stream().map(Ok)
}
+2 -1
View File
@@ -1,9 +1,10 @@
//! Synchronous combinator extensions to futures::TryStream
use std::result::Result;
use futures::{TryFuture, TryStream, TryStreamExt};
use super::automatic_width;
use crate::Result;
/// Concurrency extensions to augment futures::TryStreamExt. broad_ combinators
/// produce out-of-order
+1 -5
View File
@@ -2,8 +2,6 @@
use std::{cell::Cell, fmt::Debug, path::PathBuf, sync::LazyLock};
use snafu::IntoError;
use crate::{Result, is_equal_to};
type Id = usize;
@@ -144,9 +142,7 @@ pub fn getcpu() -> Result<usize> {
#[cfg(not(target_os = "linux"))]
#[inline]
pub fn getcpu() -> Result<usize> {
Err(crate::error::IoSnafu.into_error(std::io::ErrorKind::Unsupported.into()))
}
pub fn getcpu() -> Result<usize> { Err(crate::Error::Io(std::io::ErrorKind::Unsupported.into())) }
fn query_cores_available() -> impl Iterator<Item = Id> {
core_affinity::get_core_ids()
+7 -12
View File
@@ -255,10 +255,7 @@ fn deserialize_newtype_struct<V>(self, name: &'static str, visitor: V) -> Result
| "$serde_json::private::RawValue" => visitor.visit_map(self),
| "Cbor" => visitor
.visit_newtype_struct(&mut minicbor_serde::Deserializer::new(self.record_trail()))
.map_err(|e| {
let message: std::borrow::Cow<'static, str> = e.to_string().into();
conduwuit_core::error::SerdeDeSnafu { message }.build()
}),
.map_err(|e| Self::Error::SerdeDe(e.to_string().into())),
| _ => visitor.visit_newtype_struct(self),
}
@@ -316,10 +313,9 @@ fn deserialize_i64<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
let end = self.pos.saturating_add(BYTES).min(self.buf.len());
let bytes: ArrayVec<u8, BYTES> = self.buf[self.pos..end].try_into()?;
let bytes = bytes.into_inner().map_err(|_| {
let message: std::borrow::Cow<'static, str> = "i64 buffer underflow".into();
conduwuit_core::error::SerdeDeSnafu { message }.build()
})?;
let bytes = bytes
.into_inner()
.map_err(|_| Self::Error::SerdeDe("i64 buffer underflow".into()))?;
self.inc_pos(BYTES);
visitor.visit_i64(i64::from_be_bytes(bytes))
@@ -349,10 +345,9 @@ fn deserialize_u64<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
let end = self.pos.saturating_add(BYTES).min(self.buf.len());
let bytes: ArrayVec<u8, BYTES> = self.buf[self.pos..end].try_into()?;
let bytes = bytes.into_inner().map_err(|_| {
let message: std::borrow::Cow<'static, str> = "u64 buffer underflow".into();
conduwuit_core::error::SerdeDeSnafu { message }.build()
})?;
let bytes = bytes
.into_inner()
.map_err(|_| Self::Error::SerdeDe("u64 buffer underflow".into()))?;
self.inc_pos(BYTES);
visitor.visit_u64(u64::from_be_bytes(bytes))
+4
View File
@@ -362,6 +362,10 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
name: "userid_blurhash",
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "userid_dehydrateddevice",
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "userid_devicelistversion",
..descriptor::RANDOM_SMALL
+1 -4
View File
@@ -199,10 +199,7 @@ fn serialize_newtype_struct<T>(self, name: &'static str, value: &T) -> Result<Se
value
.serialize(&mut Serializer::new(&mut Writer::new(&mut self.out)))
.map_err(|e| {
let message: std::borrow::Cow<'static, str> = e.to_string().into();
conduwuit_core::error::SerdeSerSnafu { message }.build()
})
.map_err(|e| Self::Error::SerdeSer(e.to_string().into()))
},
| _ => unhandled!("Unrecognized serialization Newtype {name:?}"),
}
+2 -7
View File
@@ -1,4 +1,4 @@
use std::{borrow::Cow, sync::Arc};
use std::sync::Arc;
use axum::{Router, response::IntoResponse};
use conduwuit::Error;
@@ -18,10 +18,5 @@ pub(crate) fn build(services: &Arc<Services>) -> (Router, Guard) {
}
async fn not_found(_uri: Uri) -> impl IntoResponse {
Error::Request {
kind: ErrorKind::Unrecognized,
message: Cow::Borrowed("Not Found"),
code: StatusCode::NOT_FOUND,
backtrace: None,
}
Error::Request(ErrorKind::Unrecognized, "Not Found".into(), StatusCode::NOT_FOUND)
}
+15 -1
View File
@@ -530,7 +530,12 @@ async fn handle_response_error(
Ok(())
}
pub async fn is_admin_command<E>(&self, event: &E, body: &str) -> Option<InvocationSource>
pub async fn is_admin_command<E>(
&self,
event: &E,
body: &str,
sent_locally: bool,
) -> Option<InvocationSource>
where
E: Event + Send + Sync,
{
@@ -580,6 +585,15 @@ pub async fn is_admin_command<E>(&self, event: &E, body: &str) -> Option<Invocat
return None;
}
// Escaped commands must be sent locally (via client API), not via federation
if !sent_locally {
conduwuit::warn!(
"Ignoring escaped admin command from {} that arrived via federation",
event.sender()
);
return None;
}
// Looks good
Some(InvocationSource::EscapedCommand)
}
+2 -2
View File
@@ -18,7 +18,7 @@
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use conduwuit::{Result, Server, debug, error, warn};
use conduwuit::{Result, Server, debug, error, utils::response::LimitReadExt, warn};
use database::{Deserialized, Map};
use ruma::events::{Mentions, room::message::RoomMessageEventContent};
use serde::Deserialize;
@@ -137,7 +137,7 @@ async fn check(&self) -> Result<()> {
.get(CHECK_FOR_ANNOUNCEMENTS_URL)
.send()
.await?
.text()
.limit_read_text(1024 * 1024)
.await?;
let response = serde_json::from_str::<CheckForAnnouncementsResponse>(&response)?;
+4 -4
View File
@@ -147,11 +147,11 @@ pub async fn register_appservice(
// same appservice)
if let Ok(existing) = self.find_from_token(&registration.as_token).await {
if existing.registration.id != registration.id {
return Err!(Request(InvalidParam(
return Err(err!(Request(InvalidParam(
"Cannot register appservice: Token is already used by appservice '{}'. \
Please generate a different token.",
existing.registration.id
)));
))));
}
}
@@ -163,10 +163,10 @@ pub async fn register_appservice(
.await
.is_ok()
{
return Err!(Request(InvalidParam(
return Err(err!(Request(InvalidParam(
"Cannot register appservice: The provided token is already in use by a user \
device. Please generate a different token for the appservice."
)));
))));
}
self.db
+1 -1
View File
@@ -39,7 +39,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let url_preview_user_agent = config
.url_preview_user_agent
.clone()
.unwrap_or_else(|| conduwuit::version::user_agent().to_owned());
.unwrap_or_else(|| conduwuit::version::user_agent_media().to_owned());
Ok(Arc::new(Self {
default: base(config)?
+30 -11
View File
@@ -2,8 +2,8 @@
use bytes::Bytes;
use conduwuit::{
Err, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err,
error::inspect_debug_log, implement, trace,
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err, implement,
trace, utils::response::LimitReadExt,
};
use http::{HeaderValue, header::AUTHORIZATION};
use ipaddress::IPAddress;
@@ -133,7 +133,22 @@ async fn handle_response<T>(
where
T: OutgoingRequest + Send,
{
let response = into_http_response(dest, actual, method, url, response).await?;
const HUGE_ENDPOINTS: [&str; 2] =
["/_matrix/federation/v2/send_join/", "/_matrix/federation/v2/state/"];
let size_limit: u64 = if HUGE_ENDPOINTS.iter().any(|e| url.path().starts_with(e)) {
// Some federation endpoints can return huge response bodies, so we'll bump the
// limit for those endpoints specifically.
self.services
.server
.config
.max_request_size
.saturating_mul(10)
} else {
self.services.server.config.max_request_size
}
.try_into()
.expect("size_limit (usize) should fit within a u64");
let response = into_http_response(dest, actual, method, url, response, size_limit).await?;
T::IncomingResponse::try_from_http_response(response)
.map_err(|e| err!(BadServerResponse("Server returned bad 200 response: {e:?}")))
@@ -145,6 +160,7 @@ async fn into_http_response(
method: &Method,
url: &Url,
mut response: Response,
max_size: u64,
) -> Result<http::Response<Bytes>> {
let status = response.status();
trace!(
@@ -167,19 +183,22 @@ async fn into_http_response(
);
trace!("Waiting for response body...");
let body = response
.bytes()
.await
.inspect_err(inspect_debug_log)
.unwrap_or_else(|_| Vec::new().into());
let http_response = http_response_builder
.body(body)
.body(
response
.limit_read(max_size)
.await
.unwrap_or_default()
.into(),
)
.expect("reqwest body is valid http body");
debug!("Got {status:?} for {method} {url}");
if !status.is_success() {
return Err!(Federation(dest.to_owned(), RumaError::from_http_response(http_response),));
return Err(Error::Federation(
dest.to_owned(),
RumaError::from_http_response(http_response),
));
}
Ok(http_response)
+2
View File
@@ -170,6 +170,8 @@ pub(super) fn remove_url_preview(&self, url: &str) -> Result<()> {
Ok(())
}
pub(super) async fn clear_url_previews(&self) { self.url_previews.clear().await; }
pub(super) fn set_url_preview(
&self,
url: &str,
+34 -21
View File
@@ -7,7 +7,7 @@
use std::time::SystemTime;
use conduwuit::{Err, Result, debug, err};
use conduwuit::{Err, Result, debug, err, utils::response::LimitReadExt};
use conduwuit_core::implement;
use ipaddress::IPAddress;
use serde::Serialize;
@@ -37,6 +37,9 @@ pub async fn remove_url_preview(&self, url: &str) -> Result<()> {
self.db.remove_url_preview(url)
}
#[implement(Service)]
pub async fn clear_url_previews(&self) { self.db.clear_url_previews().await; }
#[implement(Service)]
pub async fn set_url_preview(&self, url: &str, data: &UrlPreviewData) -> Result<()> {
let now = SystemTime::now()
@@ -109,8 +112,22 @@ pub async fn download_image(&self, url: &str) -> Result<UrlPreviewData> {
use image::ImageReader;
use ruma::Mxc;
let image = self.services.client.url_preview.get(url).send().await?;
let image = image.bytes().await?;
let image = self
.services
.client
.url_preview
.get(url)
.send()
.await?
.limit_read(
self.services
.server
.config
.max_request_size
.try_into()
.expect("u64 should fit in usize"),
)
.await?;
let mxc = Mxc {
server_name: self.services.globals.server_name(),
media_id: &random_string(super::MXC_LENGTH),
@@ -148,24 +165,20 @@ async fn download_html(&self, url: &str) -> Result<UrlPreviewData> {
use webpage::HTML;
let client = &self.services.client.url_preview;
let mut response = client.get(url).send().await?;
let mut bytes: Vec<u8> = Vec::new();
while let Some(chunk) = response.chunk().await? {
bytes.extend_from_slice(&chunk);
if bytes.len() > self.services.globals.url_preview_max_spider_size() {
debug!(
"Response body from URL {} exceeds url_preview_max_spider_size ({}), not \
processing the rest of the response body and assuming our necessary data is in \
this range.",
url,
self.services.globals.url_preview_max_spider_size()
);
break;
}
}
let body = String::from_utf8_lossy(&bytes);
let Ok(html) = HTML::from_string(body.to_string(), Some(url.to_owned())) else {
let body = client
.get(url)
.send()
.await?
.limit_read_text(
self.services
.server
.config
.max_request_size
.try_into()
.expect("u64 should fit in usize"),
)
.await?;
let Ok(html) = HTML::from_string(body.clone(), Some(url.to_owned())) else {
return Err!(Request(Unknown("Failed to parse HTML")));
};
+11 -6
View File
@@ -2,7 +2,7 @@
use conduwuit::{
Err, Error, Result, debug_warn, err, implement,
utils::content_disposition::make_content_disposition,
utils::{content_disposition::make_content_disposition, response::LimitReadExt},
};
use http::header::{CONTENT_DISPOSITION, CONTENT_TYPE, HeaderValue};
use ruma::{
@@ -35,7 +35,7 @@ pub async fn fetch_remote_thumbnail(
.fetch_thumbnail_authenticated(mxc, user, server, timeout_ms, dim)
.await;
if let Err(Error::Request { kind: NotFound, .. }) = &result {
if let Err(Error::Request(NotFound, ..)) = &result {
return self
.fetch_thumbnail_unauthenticated(mxc, user, server, timeout_ms, dim)
.await;
@@ -67,7 +67,7 @@ pub async fn fetch_remote_content(
);
});
if let Err(Error::Request { kind: Unrecognized, .. }) = &result {
if let Err(Error::Request(Unrecognized, ..)) = &result {
return self
.fetch_content_unauthenticated(mxc, user, server, timeout_ms)
.await;
@@ -286,10 +286,15 @@ async fn location_request(&self, location: &str) -> Result<FileMeta> {
.and_then(Result::ok);
response
.bytes()
.limit_read(
self.services
.server
.config
.max_request_size
.try_into()
.expect("u64 should fit in usize"),
)
.await
.map(Vec::from)
.map_err(Into::into)
.map(|content| FileMeta {
content: Some(content),
content_type: content_type.clone(),
+1 -1
View File
@@ -31,7 +31,7 @@
pub mod sending;
pub mod server_keys;
pub mod sync;
pub mod transaction_ids;
pub mod transactions;
pub mod uiaa;
pub mod users;
+13 -2
View File
@@ -1,6 +1,7 @@
use std::{fmt::Debug, mem, sync::Arc};
use bytes::BytesMut;
use conduwuit::utils::response::LimitReadExt;
use conduwuit_core::{
Err, Event, Result, debug_warn, err, trace,
utils::{stream::TryIgnore, string_from_bytes},
@@ -30,7 +31,7 @@
uint,
};
use crate::{Dep, client, globals, rooms, sending, users};
use crate::{Dep, client, config, globals, rooms, sending, users};
pub struct Service {
db: Data,
@@ -39,6 +40,7 @@ pub struct Service {
struct Services {
globals: Dep<globals::Service>,
config: Dep<config::Service>,
client: Dep<client::Service>,
state_accessor: Dep<rooms::state_accessor::Service>,
state_cache: Dep<rooms::state_cache::Service>,
@@ -61,6 +63,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
services: Services {
globals: args.depend::<globals::Service>("globals"),
client: args.depend::<client::Service>("client"),
config: args.depend::<config::Service>("config"),
state_accessor: args
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
@@ -245,7 +248,15 @@ pub async fn send_request<T>(&self, dest: &str, request: T) -> Result<T::Incomin
.expect("http::response::Builder is usable"),
);
let body = response.bytes().await?;
let body = response
.limit_read(
self.services
.config
.max_request_size
.try_into()
.expect("usize fits into u64"),
)
.await?;
if !status.is_success() {
debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
+4 -6
View File
@@ -1,4 +1,6 @@
use conduwuit::{Result, debug, debug_error, debug_info, debug_warn, implement, trace};
use conduwuit::{
Result, debug, debug_error, debug_info, implement, trace, utils::response::LimitReadExt,
};
#[implement(super::Service)]
#[tracing::instrument(name = "well-known", level = "debug", skip(self, dest))]
@@ -24,12 +26,8 @@ pub(super) async fn request_well_known(&self, dest: &str) -> Result<Option<Strin
return Ok(None);
}
let text = response.text().await?;
let text = response.limit_read_text(8192).await?;
trace!("response text: {text:?}");
if text.len() >= 12288 {
debug_warn!("response contains junk");
return Ok(None);
}
let body: serde_json::Value = serde_json::from_str(&text).unwrap_or_default();
+1 -1
View File
@@ -142,7 +142,7 @@ async fn get_auth_chain_outer(
let chunk_cache: Vec<_> = chunk
.into_iter()
.try_stream()
.try_stream::<conduwuit::Error>()
.broad_and_then(|(shortid, event_id)| async move {
if let Ok(cached) = self.get_cached_eventid_authchain(&[shortid]).await {
return Ok(cached.to_vec());
@@ -63,7 +63,9 @@ pub(super) async fn fetch_state<Pdu>(
},
| hash_map::Entry::Occupied(_) => {
return Err!(Database(
"State event's type and state_key combination exists multiple times.",
"State event's type and state_key combination exists multiple times: {}, {}",
pdu.kind(),
state_key
));
},
}
@@ -162,7 +162,9 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
},
| hash_map::Entry::Occupied(_) => {
return Err!(Request(InvalidParam(
"Auth event's type and state_key combination exists multiple times.",
"Auth event's type and state_key combination exists multiple times: {}, {}",
auth_event.kind,
auth_event.state_key().unwrap_or("")
)));
},
}
@@ -112,14 +112,7 @@ pub async fn state_resolution<'a, StateSets>(
{
let event_fetch = |event_id| self.event_fetch(event_id);
let event_exists = |event_id| self.event_exists(event_id);
Ok(
state_res::resolve(
room_version,
state_sets,
auth_chain_sets,
&event_fetch,
&event_exists,
)
.await?,
)
state_res::resolve(room_version, state_sets, auth_chain_sets, &event_fetch, &event_exists)
.map_err(|e| err!(error!("State resolution failed: {e:?}")))
.await
}
+2 -2
View File
@@ -3,7 +3,7 @@
str::FromStr,
};
use conduwuit::{Err, Error, Result};
use conduwuit::{Error, Result};
use ruma::{UInt, api::client::error::ErrorKind};
use crate::rooms::short::ShortRoomId;
@@ -57,7 +57,7 @@ fn from_str(value: &str) -> Result<Self> {
if let Some(token) = pag_tok() {
Ok(token)
} else {
Err!(BadRequest(ErrorKind::InvalidParam, "invalid token"))
Err(Error::BadRequest(ErrorKind::InvalidParam, "invalid token"))
}
}
}
+20 -9
View File
@@ -72,6 +72,26 @@ pub async fn append_incoming_pdu<'a, Leaves>(
.append_pdu(pdu, pdu_json, new_room_leaves, state_lock, room_id)
.await?;
// Process admin commands for federation events
if *pdu.kind() == TimelineEventType::RoomMessage {
let content: ExtractBody = pdu.get_content()?;
if let Some(body) = content.body {
if let Some(source) = self
.services
.admin
.is_admin_command(pdu, &body, false)
.await
{
self.services.admin.command_with_sender(
body,
Some(pdu.event_id().into()),
source,
pdu.sender.clone().into(),
)?;
}
}
}
Ok(Some(pdu_id))
}
@@ -334,15 +354,6 @@ pub async fn append_pdu<'a, Leaves>(
let content: ExtractBody = pdu.get_content()?;
if let Some(body) = content.body {
self.services.search.index_pdu(shortroomid, &pdu_id, &body);
if let Some(source) = self.services.admin.is_admin_command(pdu, &body).await {
self.services.admin.command_with_sender(
body,
Some((pdu.event_id()).into()),
source,
pdu.sender.clone().into(),
)?;
}
}
},
| _ => {},
+23 -1
View File
@@ -18,7 +18,7 @@
},
};
use super::RoomMutexGuard;
use super::{ExtractBody, RoomMutexGuard};
/// Creates a new persisted data unit and adds it to a room. This function
/// takes a roomid_mutex_state, meaning that only this function is able to
@@ -126,6 +126,26 @@ pub async fn build_and_append_pdu(
.boxed()
.await?;
// Process admin commands for locally sent events
if *pdu.kind() == TimelineEventType::RoomMessage {
let content: ExtractBody = pdu.get_content()?;
if let Some(body) = content.body {
if let Some(source) = self
.services
.admin
.is_admin_command(&pdu, &body, true)
.await
{
self.services.admin.command_with_sender(
body,
Some(pdu.event_id().into()),
source,
pdu.sender.clone().into(),
)?;
}
}
}
// We set the room state after inserting the pdu, so that we never have a moment
// in time where events in the current room state do not exist
trace!("Setting room state for room {room_id}");
@@ -167,6 +187,8 @@ pub async fn build_and_append_pdu(
Ok(pdu.event_id().to_owned())
}
/// Assert invariants about the admin room, to prevent (for example) all admins
/// from leaving or being banned from the room
#[implement(super::Service)]
#[tracing::instrument(skip_all, level = "debug")]
async fn check_pdu_for_admin_room<Pdu>(&self, pdu: &Pdu, sender: &UserId) -> Result
+5 -4
View File
@@ -75,7 +75,10 @@ fn from_evt(
let content: RoomCreateEventContent = serde_json::from_str(content.get())?;
Ok(content.room_version)
} else {
Err!(InconsistentRoomState("non-create event for room of unknown version", room_id))
Err(Error::InconsistentRoomState(
"non-create event for room of unknown version",
room_id,
))
}
}
let PduBuilder {
@@ -272,9 +275,7 @@ fn from_evt(
.hash_and_sign_event(&mut pdu_json, &room_version_id)
{
return match e {
| Error::Signatures { source, .. }
if matches!(source, ruma::signatures::Error::PduSize) =>
{
| Error::Signatures(ruma::signatures::Error::PduSize) => {
Err!(Request(TooLarge("Message/PDU is too long (exceeds 65535 bytes)")))
},
| _ => Err!(Request(Unknown(warn!("Signing event failed: {e}")))),
+2 -2
View File
@@ -1,7 +1,7 @@
use std::{fmt::Debug, mem};
use bytes::BytesMut;
use conduwuit::{Err, Result, debug_error, err, utils, warn};
use conduwuit::{Err, Result, debug_error, err, utils, utils::response::LimitReadExt, warn};
use reqwest::Client;
use ruma::api::{IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken};
@@ -38,7 +38,7 @@ pub(crate) async fn send_antispam_request<T>(
.expect("http::response::Builder is usable"),
);
let body = response.bytes().await?; // TODO: handle timeout
let body = response.limit_read(65535).await?; // TODO: handle timeout
if !status.is_success() {
debug_error!("Antispam response bytes: {:?}", utils::string_from_bytes(&body));
+12 -2
View File
@@ -1,7 +1,9 @@
use std::{fmt::Debug, mem};
use bytes::BytesMut;
use conduwuit::{Err, Result, debug_error, err, implement, trace, utils, warn};
use conduwuit::{
Err, Result, debug_error, err, implement, trace, utils, utils::response::LimitReadExt, warn,
};
use ruma::api::{
IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, appservice::Registration,
};
@@ -77,7 +79,15 @@ pub async fn send_appservice_request<T>(
.expect("http::response::Builder is usable"),
);
let body = response.bytes().await?;
let body = response
.limit_read(
self.server
.config
.max_request_size
.try_into()
.expect("usize fits into u64"),
)
.await?;
if !status.is_success() {
debug_error!("Appservice response bytes: {:?}", utils::string_from_bytes(&body));
+2 -2
View File
@@ -10,7 +10,7 @@
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
use conduwuit_core::{
Error, Event, Result, debug, err, error,
Error, Event, Result, at, debug, err, error,
result::LogErr,
trace,
utils::{
@@ -175,7 +175,7 @@ async fn handle_response_ok<'a>(
if !new_events.is_empty() {
self.db.mark_as_active(new_events.iter());
let new_events_vec = new_events.into_iter().map(|(_, event)| event).collect();
let new_events_vec = new_events.into_iter().map(at!(1)).collect();
futures.push(self.send_events(dest.clone(), new_events_vec));
} else {
statuses.remove(dest);
+3 -3
View File
@@ -14,7 +14,7 @@
media, moderation, presence, pusher, registration_tokens, resolver, rooms, sending,
server_keys,
service::{self, Args, Map, Service},
sync, transaction_ids, uiaa, users,
sync, transactions, uiaa, users,
};
pub struct Services {
@@ -37,7 +37,7 @@ pub struct Services {
pub sending: Arc<sending::Service>,
pub server_keys: Arc<server_keys::Service>,
pub sync: Arc<sync::Service>,
pub transaction_ids: Arc<transaction_ids::Service>,
pub transactions: Arc<transactions::Service>,
pub uiaa: Arc<uiaa::Service>,
pub users: Arc<users::Service>,
pub moderation: Arc<moderation::Service>,
@@ -110,7 +110,7 @@ macro_rules! build {
sending: build!(sending::Service),
server_keys: build!(server_keys::Service),
sync: build!(sync::Service),
transaction_ids: build!(transaction_ids::Service),
transactions: build!(transactions::Service),
uiaa: build!(uiaa::Service),
users: build!(users::Service),
moderation: build!(moderation::Service),
-54
View File
@@ -1,54 +0,0 @@
use std::sync::Arc;
use conduwuit::{Result, implement};
use database::{Handle, Map};
use ruma::{DeviceId, TransactionId, UserId};
pub struct Service {
db: Data,
}
struct Data {
userdevicetxnid_response: Arc<Map>,
}
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data {
userdevicetxnid_response: args.db["userdevicetxnid_response"].clone(),
},
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
#[implement(Service)]
pub fn add_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
data: &[u8],
) {
let mut key = user_id.as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
key.push(0xFF);
key.extend_from_slice(txn_id.as_bytes());
self.db.userdevicetxnid_response.insert(&key, data);
}
// If there's no entry, this is a new transaction
#[implement(Service)]
pub async fn existing_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
) -> Result<Handle<'_>> {
let key = (user_id, device_id, txn_id);
self.db.userdevicetxnid_response.qry(&key).await
}

Some files were not shown because too many files have changed in this diff Show More