Compare commits

...

72 Commits

Author SHA1 Message Date
Tom Foster
5a44f3e5e4 docs(docker): Restructure deployment guide and add env var reference
Add Quick Run section with complete getting-started workflow including
admin user creation via --execute flag. Consolidate Docker Compose to
treat reverse proxy as essential with Traefik/Caddy/nginx examples.

Move detailed image building to development guide, keeping deployment
docs focused on using pre-built images.

Create environment variables reference with practical examples and
context. Clarify built-in TLS is for testing only; production should
use reverse proxies.
2026-02-23 17:57:40 +00:00
timedout
558262dd1f chore: Refactor transaction_ids -> transactions 2026-02-23 17:44:35 +00:00
timedout
d311b87579 chore: Fix incorrect capitalisation
I didn't realise I agreed to take an English class with @ginger while
working on this server lol
2026-02-23 17:25:12 +00:00
timedout
8702f55cf5 fix: Don't panic if nobody's listening 2026-02-23 17:22:37 +00:00
timedout
d4481b07ac chore: Add news frag 2026-02-23 16:54:54 +00:00
Jade Ellis
92351df925 refactor: Make federation transaction handle errors correctly
We have a dedicated error type that's then matched.
Event sorting is now infallible.
Could probably be cleaned up in a bit.
2026-02-23 16:36:46 +00:00
Jade Ellis
47e2733ea1 refactor: Make stream utils generic over the error type 2026-02-23 16:36:46 +00:00
Jade Ellis
6637e4c6a7 fix: Clean up cache, prevent several race conditions
We use one map which is only ever held for a short time.
2026-02-23 16:36:46 +00:00
nexy7574
35e441452f feat: Attempt to build localised DAG before processing PDUs 2026-02-23 16:36:46 +00:00
nexy7574
66bbb655bf feat: Warn when server is overloaded 2026-02-23 16:36:45 +00:00
nexy7574
81b202ce51 chore: Decrease transaction log verbosity 2026-02-23 16:36:45 +00:00
nexy7574
4657844d46 feat: Show active transaction handle count in !admin federation incoming-federation 2026-02-23 16:36:45 +00:00
nexy7574
9016cd11a6 chore: Run pre-commit and clippy to fix inherited CI errs 2026-02-23 16:36:45 +00:00
nexy7574
dd70094719 feat: Make max_active_txns actually configurable 2026-02-23 16:36:45 +00:00
nexy7574
fcd49b7ab3 fix: Remove duplicate fields from logs 2026-02-23 16:36:45 +00:00
nexy7574
470c9b52dd feat: Instrument process_inbound_transaction 2026-02-23 16:36:45 +00:00
nexy7574
0d8cafc329 feat: Support casting transaction processing to the background 2026-02-23 16:36:44 +00:00
nexy7574
2f9956ddca feat: Add helper functions for federation channels 2026-02-23 16:36:44 +00:00
nexy7574
21a97cdd0b chore: Refactor existing references to transaction service 2026-02-23 16:36:44 +00:00
nexy7574
e986cd4536 feat(federation): Restructure transaction_ids service
Adds two new in-memory maps to the service in to prepare for better handlers
2026-02-23 16:36:40 +00:00
Shane Jaroch
526d862296 fix: more aggressive user agent for URL preview
adding "facebookexternalhit" alongside "embedbot" fixes many errors, such as YouTube Music's:
    "Your browser is deprecated. Please upgrade."

add admin command to clear URL stuck and broken data (per URL currently)

    add command to clear all saved URL previews.
    sync resolver docs.
2026-02-23 15:24:14 +00:00
Ben Botwin
fbeb5bf186 report permission denied errors 2026-02-23 15:22:18 +00:00
Ben Botwin
a336f2df44 fixed formatting 2026-02-23 15:22:18 +00:00
Ben Botwin
19b78ec73e made error handling more concise 2026-02-23 15:22:18 +00:00
Ben Botwin
27ff2d9363 added more granular error handling for other file fetch function 2026-02-23 15:22:18 +00:00
Ben Botwin
50fa8c3abf ran format 2026-02-23 15:22:18 +00:00
Ben Botwin
18c4be869f added handling for other potential errors 2026-02-23 15:22:18 +00:00
Ben Botwin
fc00b96d8b Added proper 404 for not found media and fixed devshell for running tests 2026-02-23 15:22:18 +00:00
Jade Ellis
fa4156d8a6 docs: Changelog 2026-02-22 21:19:20 +00:00
Jade Ellis
23638cd714 feat(appservices): MSC3202 Device masquerading for appservices 2026-02-22 21:19:20 +00:00
Raven
9f1a483e76 docs: Add information about partnered homeservers to the introduction page & update README.md
Includes step-by-step directions to ease the lift for those who have ended up
here and who have never created a matrix account or used matrix before in the
past.

Also updates the information in README.md to match, as these should generally be identical.
2026-02-21 18:51:56 -08:00
Renovate Bot
688ef727e5 chore(deps): update rust crate nix to 0.31.0 2026-02-21 16:33:05 +00:00
Shannon Sterz
3de026160e docs: express forbidden_remote_server_names as valid regex
this field expects a regex not a glob, so the correct value should be
".*" if one wants to block all remote server names. otherwise, setting
"*" as documented results in an error on start because the configuration
could not be properly parsed.
2026-02-21 16:27:59 +00:00
Ginger
9fe761513d chore: Clippy & prek fixes 2026-02-21 11:27:39 -05:00
Renovate Bot
abf1e1195a chore(deps): update rust crate libloading to 0.9.0 2026-02-21 01:55:48 +00:00
Ginger
d9537e9b55 fix: Forbid registering users with a non-local localpart 2026-02-20 20:54:19 -05:00
Jade Ellis
0d1de70d8f fix(deps): Update lockfile 2026-02-21 00:22:42 +00:00
Ben Botwin
4aa03a71eb fix(nix): Added unstable flag to buildDeps 2026-02-21 00:15:53 +00:00
aviac
f847918575 fix(nix): Fix all-features build
The build was broken since we started using an unstable reqwest version
which requires setting an extra feature flag
2026-02-21 00:15:53 +00:00
Renovate Bot
7569a0545b chore(deps): update dependency lddtree to 0.5.0 2026-02-20 22:59:34 +00:00
Jade Ellis
b6c5991e1f chore(deps): Update rand
A couple indirect deps are still on rand_core 0.6 but we can deal
2026-02-20 22:57:45 +00:00
Katie Kloss
efd879fcd8 docs: Add news fragment 2026-02-20 10:13:54 +00:00
Katie Kloss
92a848f74d fix: Crash before starting on OpenBSD
core_affinity doesn't return any cores on OpenBSD, so we try to
clamp(1, 0). This is Less Good than fixing that crate, but at
least allows the server to start up.
2026-02-20 10:13:54 +00:00
Renovate Bot
776b5865ba chore(deps): update sentry-rust monorepo to 0.46.0 2026-02-19 14:56:25 +00:00
timedout
722bacbe89 chore: Fix busted lockfile merge 2026-02-19 02:33:41 +00:00
Jade Ellis
46907e3dce chore: Migrate to axum 0.8
Co-authored-by: dasha_uwu
2026-02-19 02:18:29 +00:00
timedout
31e2195e56 fix: Remove non-compliant and non-functional non-authoritative directory queries
chore: Add news frag
2026-02-19 01:37:42 +00:00
Terry
7ecac93ddc fix: Remove rocksdb secondary mode 2026-02-18 23:11:53 +00:00
Terry
6a0b103722 docs: Changelog 2026-02-18 23:11:53 +00:00
Terry
23d77b614f fix: Remove ability to set rocksdb as read only 2026-02-18 23:11:53 +00:00
stratself
e01aa44b16 fix: add nodejs URL in CONTRIBUTING.md page 2026-02-18 23:07:29 +00:00
stratself
a08739c246 docs: rewrite how to load docs with new rspress engine 2026-02-18 23:07:29 +00:00
Ginger
c14864b881 fix: Wording fixes 2026-02-18 14:41:03 +00:00
Ginger
1773e72e68 feat(docs): Add a note about !779 to the troubleshooting page 2026-02-18 14:41:03 +00:00
kraem
0f94d55689 fix: don't warn about needed backfill via federation for non-federated rooms 2026-02-18 14:27:14 +00:00
Renovate Bot
abfb6377c2 chore(deps): update rust-patch-updates 2026-02-18 14:26:49 +00:00
Renovate Bot
91d64f5b24 chore(deps): update rust crate askama to 0.15.0 2026-02-18 05:04:23 +00:00
Jade Ellis
9a3f3f6e78 ci: Explicitly enable Dependency Dashboard 2026-02-17 21:33:30 +00:00
Jade Ellis
b3e31a4aad ci(deps): Automerge typos updates 2026-02-17 21:33:13 +00:00
Jade Ellis
8cda431cc6 ci(deps): Group npm patch updates 2026-02-17 21:30:51 +00:00
Renovate Bot
02b9a3f713 chore(deps): update pre-commit hook crate-ci/typos to v1.43.5 2026-02-17 05:03:45 +00:00
timedout
d40893730c chore: Lighten the phrasing 2026-02-17 02:07:19 +00:00
timedout
28fae58cf6 chore: Add news frag & rebuild config 2026-02-17 02:07:19 +00:00
timedout
f458f6ab76 chore: Disable presence by default, and add warnings to other heavy ops 2026-02-17 02:07:19 +00:00
Shane Jaroch
fdf9cea533 fix(admin-cli): concatenation/formatting error, i.e.,
**NOTE:** If there are any features, tools, or admin internals dependent on this output that would break, let me know!
I'm hoping this is acceptable, since it's a human-readable command.

Current output:

```shell
uwu> server list-backups
    #1 Mon, 9 Feb 2026 20:36:25 +0000: 66135580 bytes, 595 files#2 Wed, 11 Feb 2026 02:33:15 +0000: 270963746 bytes, 1002 files#3 Sat, 14 Feb 2026 22:11:19 +0000: 675905487 bytes, 2139 files
```

Should be:

```shell
uwu> server list-backups
    #1 Mon, 9 Feb 2026 20:36:25 +0000: 66135580 bytes, 595 files
    #2 Wed, 11 Feb 2026 02:33:15 +0000: 270963746 bytes, 1002 files
    #3 Sat, 14 Feb 2026 22:11:19 +0000: 675905487 bytes, 2139 files
```
2026-02-16 00:52:02 -05:00
Jade Ellis
ecb1b73c84 style: Trailing whitespace 2026-02-16 03:47:16 +00:00
rooot
e03082480a docs(livekit): document nginx websockets too
Signed-off-by: rooot <hey@rooot.gay>
2026-02-16 03:43:43 +00:00
rooot
f9e7f019ad docs(livekit): fix port in caddy config example
Signed-off-by: rooot <hey@rooot.gay>
2026-02-16 03:43:43 +00:00
rooot
12069e7c86 docs(livekit): add nginx proxy example
Signed-off-by: rooot <hey@rooot.gay>
2026-02-16 03:43:42 +00:00
Jade Ellis
77928a62b4 docs: Document BSD community room 2026-02-16 03:31:56 +00:00
elisaado
c73cb5c1bf feat(docs): Add Kubernetes documentation with sample (#1387)
Reviewed-on: https://forgejo.ellis.link/continuwuation/continuwuity/pulls/1387
Reviewed-by: Jade Ellis <jade@ellis.link>
Co-authored-by: elisaado <forgejoellis@elisaado.com>
Co-committed-by: elisaado <forgejoellis@elisaado.com>
2026-02-16 03:14:29 +00:00
Jade Ellis
a140eacb04 docs: Fix trailing list 2026-02-16 03:12:50 +00:00
81 changed files with 2235 additions and 1203 deletions

View File

@@ -23,7 +23,7 @@ repos:
- id: check-added-large-files
- repo: https://github.com/crate-ci/typos
rev: v1.43.4
rev: v1.43.5
hooks:
- id: typos
- id: typos

View File

@@ -85,24 +85,31 @@ ### Matrix tests
### Writing documentation
Continuwuity's website uses [`mdbook`][mdbook] and is deployed via CI using Cloudflare Pages
Continuwuity's website uses [`rspress`][rspress] and is deployed via CI using Cloudflare Pages
in the [`documentation.yml`][documentation.yml] workflow file. All documentation is in the `docs/`
directory at the top level.
To build the documentation locally:
To load the documentation locally:
1. Install NodeJS and npm from their [official website][nodejs-download] or via your package manager of choice
2. From the project's root directory, install the relevant npm modules
1. Install mdbook if you don't have it already:
```bash
cargo install mdbook # or cargo binstall, or another method
npm ci
```
2. Build the documentation:
3. Make changes to the document pages as you see fit
4. Generate a live preview of the documentation
```bash
mdbook build
npm run docs:dev
```
The output of the mdbook generation is in `public/`. You can open the HTML files directly in your browser without needing a web server.
A webserver for the docs will be spun up for you (e.g. at `http://localhost:3000`). Any changes you make to the documentation will be live-reloaded on the webpage.
Alternatively, you can build the documentation using `npm run docs:build` - the output of this will be in the `/doc_build` directory. Once you're happy with your documentation updates, you can commit the changes.
### Commit Messages
@@ -169,5 +176,6 @@ ### Creating pull requests
[continuwuity-matrix]: https://matrix.to/#/#continuwuity:continuwuity.org?via=continuwuity.org&via=ellis.link&via=explodie.org&via=matrix.org
[complement]: https://github.com/matrix-org/complement/
[sytest]: https://github.com/matrix-org/sytest/
[mdbook]: https://rust-lang.github.io/mdBook/
[nodejs-download]: https://nodejs.org/en/download
[rspress]: https://rspress.rs/
[documentation.yml]: https://forgejo.ellis.link/continuwuation/continuwuity/src/branch/main/.forgejo/workflows/documentation.yml

1014
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -68,7 +68,7 @@ default-features = false
version = "0.1.3"
[workspace.dependencies.rand]
version = "0.8.5"
version = "0.10.0"
# Used for the http request / response body type for Ruma endpoints used with reqwest
[workspace.dependencies.bytes]
@@ -84,7 +84,7 @@ version = "1.3.1"
version = "1.11.1"
[workspace.dependencies.axum]
version = "0.7.9"
version = "0.8.8"
default-features = false
features = [
"form",
@@ -97,7 +97,7 @@ features = [
]
[workspace.dependencies.axum-extra]
version = "0.9.6"
version = "0.10.1"
default-features = false
features = ["typed-header", "tracing"]
@@ -110,7 +110,7 @@ default-features = false
version = "0.7"
[workspace.dependencies.axum-client-ip]
version = "0.6.1"
version = "0.7"
[workspace.dependencies.tower]
version = "0.5.2"
@@ -118,7 +118,7 @@ default-features = false
features = ["util"]
[workspace.dependencies.tower-http]
version = "0.6.2"
version = "0.6.8"
default-features = false
features = [
"add-extension",
@@ -253,7 +253,7 @@ features = [
version = "0.4.0"
[workspace.dependencies.libloading]
version = "0.8.6"
version = "0.9.0"
# Validating urls in config, was already a transitive dependency
[workspace.dependencies.url]
@@ -298,7 +298,7 @@ default-features = false
features = ["env", "toml"]
[workspace.dependencies.hickory-resolver]
version = "0.25.1"
version = "0.25.2"
default-features = false
features = [
"serde",
@@ -342,7 +342,8 @@ version = "0.1.2"
# Used for matrix spec type definitions and helpers
[workspace.dependencies.ruma]
git = "https://forgejo.ellis.link/continuwuation/ruwuma"
rev = "b496b7f38d517149361a882e75d3fd4faf210441"
#branch = "conduwuit-changes"
rev = "e087ff15888156942ca2ffe6097d1b4c3fd27628"
features = [
"compat",
"rand",
@@ -424,7 +425,7 @@ features = ["http", "grpc-tonic", "trace", "logs", "metrics"]
# optional sentry metrics for crash/panic reporting
[workspace.dependencies.sentry]
version = "0.45.0"
version = "0.46.0"
default-features = false
features = [
"backtrace",
@@ -440,9 +441,9 @@ features = [
]
[workspace.dependencies.sentry-tracing]
version = "0.45.0"
version = "0.46.0"
[workspace.dependencies.sentry-tower]
version = "0.45.0"
version = "0.46.0"
# jemalloc usage
[workspace.dependencies.tikv-jemalloc-sys]
@@ -471,7 +472,7 @@ features = ["use_std"]
version = "0.5"
[workspace.dependencies.nix]
version = "0.30.1"
version = "0.31.0"
default-features = false
features = ["resource"]
@@ -553,7 +554,7 @@ version = "0.7.5"
version = "1.0.1"
[workspace.dependencies.askama]
version = "0.14.0"
version = "0.15.0"
#
# Patches

View File

@@ -57,10 +57,15 @@ ### What are the project's goals?
### Can I try it out?
Check out the [documentation](https://continuwuity.org) for installation instructions, or join one of these vetted public homeservers running Continuwuity to get a feel for things!
Check out the [documentation](https://continuwuity.org) for installation instructions.
- https://continuwuity.rocks -- A public demo server operated by the Continuwuity Team.
- https://federated.nexus -- Federated Nexus is a community resource hosting multiple FOSS (especially federated) services, including Matrix and Forgejo.
If you want to try it out as a user, we have some partnered homeservers you can use:
* You can head over to [https://federated.nexus](https://federated.nexus/) in your browser.
* Hit the `Apply to Join` button. Once your request has been accepted, you will receive an email with your username and password.
* Head over to [https://app.federated.nexus](https://app.federated.nexus/) and you can sign in there, or use any other matrix chat client you wish elsewhere.
* Your username for matrix will be in the form of `@username:federated.nexus`, however you can simply use the `username` part to log in. Your password is your password.
* There's also [https://continuwuity.rocks/](https://continuwuity.rocks/). You can register a new account using Cinny via [this convenient link](https://app.cinny.in/register/continuwuity.rocks), or you can use Element or another matrix client *that supports registration*.
### What are we working on?

1
changelog.d/1393.bugfix Normal file
View File

@@ -0,0 +1 @@
Removed non-compliant nor functional room alias lookups over federation. Contributed by @nex

1
changelog.d/1399.feature Normal file
View File

@@ -0,0 +1 @@
Outgoing presence is now disabled by default, and the config option documentation has been adjusted to more accurately represent the weight of presence, typing indicators, and read receipts. Contributed by @nex.

1
changelog.d/1418.bugfix Normal file
View File

@@ -0,0 +1 @@
Removed ability to set rocksdb as read only. Doing so would cause unintentional and buggy behaviour. Contributed by @Terryiscool160.

1
changelog.d/1421.bugfix Normal file
View File

@@ -0,0 +1 @@
Fixed a startup crash in the sender service if we can't detect the number of CPU cores, even if the `sender_workers' config option is set correctly. Contributed by @katie.

1
changelog.d/1428.feature Normal file
View File

@@ -0,0 +1 @@
Improved the concurrency handling of federation transactions, vastly improving performance and reliability by more accurately handling inbound transactions and reducing the amount of repeated wasted work. Contributed by @nex and @Jade.

View File

@@ -0,0 +1 @@
Added MSC3202 Device masquerading (not all of MSC3202). This should fix issues with enabling MSC4190 for some Mautrix bridges. Contributed by @Jade

View File

@@ -0,0 +1 @@
Updated `list-backups` admin command to output one backup per line.

View File

@@ -0,0 +1 @@
Improved URL preview fetching with a more compatible user agent for sites like YouTube Music. Added `!admin media delete-url-preview <url>` command to clear cached URL previews that were stuck and broken.

View File

@@ -290,6 +290,25 @@
#
#max_fetch_prev_events = 192
# How many incoming federation transactions the server is willing to be
# processing at any given time before it becomes overloaded and starts
# rejecting further transactions until some slots become available.
#
# Setting this value too low or too high may result in unstable
# federation, and setting it too high may cause runaway resource usage.
#
#max_concurrent_inbound_transactions = 150
# Maximum age (in seconds) for cached federation transaction responses.
# Entries older than this will be removed during cleanup.
#
#transaction_id_cache_max_age_secs = 7200 (2 hours)
# Maximum number of cached federation transaction responses.
# When the cache exceeds this limit, older entries will be removed.
#
#transaction_id_cache_max_entries = 8192
# Default/base connection timeout (seconds). This is used only by URL
# previews and update/news endpoint checks.
#
@@ -1056,14 +1075,6 @@
#
#rocksdb_repair = false
# This item is undocumented. Please contribute documentation for it.
#
#rocksdb_read_only = false
# This item is undocumented. Please contribute documentation for it.
#
#rocksdb_secondary = false
# Enables idle CPU priority for compaction thread. This is not enabled by
# default to prevent compaction from falling too far behind on busy
# systems.
@@ -1120,27 +1131,34 @@
# Allow local (your server only) presence updates/requests.
#
# Note that presence on continuwuity is very fast unlike Synapse's. If
# using outgoing presence, this MUST be enabled.
# Local presence must be enabled for outgoing presence to function.
#
# Note that local presence is not as heavy on the CPU as federated
# presence, but will still become more expensive the more local users you
# have.
#
#allow_local_presence = true
# Allow incoming federated presence updates/requests.
# Allow incoming federated presence updates.
#
# This option receives presence updates from other servers, but does not
# send any unless `allow_outgoing_presence` is true. Note that presence on
# continuwuity is very fast unlike Synapse's.
# This option enables processing inbound presence updates from other
# servers. Without it, remote users will appear as if they are always
# offline to your local users. This does not affect typing indicators or
# read receipts.
#
#allow_incoming_presence = true
# Allow outgoing presence updates/requests.
#
# This option sends presence updates to other servers, but does not
# receive any unless `allow_incoming_presence` is true. Note that presence
# on continuwuity is very fast unlike Synapse's. If using outgoing
# presence, you MUST enable `allow_local_presence` as well.
# This option sends presence updates to other servers, and requires that
# `allow_local_presence` is also enabled.
#
#allow_outgoing_presence = true
# Note that outgoing presence is very heavy on the CPU and network, and
# will typically cause extreme strain and slowdowns for no real benefit.
# There are only a few clients that even implement presence, so you
# probably don't want to enable this.
#
#allow_outgoing_presence = false
# How many seconds without presence updates before you become idle.
# Defaults to 5 minutes.
@@ -1174,6 +1192,10 @@
# Allow sending read receipts to remote servers.
#
# Note that sending read receipts to remote servers in large rooms with
# lots of other homeservers may cause additional strain on the CPU and
# network.
#
#allow_outgoing_read_receipts = true
# Allow local typing updates.
@@ -1185,6 +1207,10 @@
# Allow outgoing typing updates to federation.
#
# Note that sending typing indicators to remote servers in large rooms
# with lots of other homeservers may cause additional strain on the CPU
# and network.
#
#allow_outgoing_typing = true
# Allow incoming typing updates from federation.
@@ -1318,7 +1344,7 @@
# sender user's server name, inbound federation X-Matrix origin, and
# outbound federation handler.
#
# You can set this to ["*"] to block all servers by default, and then
# You can set this to [".*"] to block all servers by default, and then
# use `allowed_remote_server_names` to allow only specific servers.
#
# example: ["badserver\\.tld$", "badphrase", "19dollarfortnitecards"]

View File

@@ -52,7 +52,7 @@ ENV BINSTALL_VERSION=1.17.5
# renovate: datasource=github-releases depName=psastras/sbom-rs
ENV CARGO_SBOM_VERSION=0.9.1
# renovate: datasource=crate depName=lddtree
ENV LDDTREE_VERSION=0.4.0
ENV LDDTREE_VERSION=0.5.0
# renovate: datasource=crate depName=timelord-cli
ENV TIMELORD_VERSION=3.0.1

View File

@@ -22,7 +22,7 @@ ENV BINSTALL_VERSION=1.17.5
# renovate: datasource=github-releases depName=psastras/sbom-rs
ENV CARGO_SBOM_VERSION=0.9.1
# renovate: datasource=crate depName=lddtree
ENV LDDTREE_VERSION=0.4.0
ENV LDDTREE_VERSION=0.5.0
# Install unpackaged tools
RUN <<EOF

View File

@@ -64,6 +64,11 @@
"label": "Configuration Reference",
"name": "/reference/config"
},
{
"type": "file",
"label": "Environment Variables",
"name": "/reference/environment-variables"
},
{
"type": "dir",
"label": "Admin Command Reference",

View File

@@ -137,7 +137,7 @@ ### 4. Configure your Reverse Proxy
# for lk-jwt-service
@lk-jwt-service path /sfu/get* /healthz* /get_token*
route @lk-jwt-service {
reverse_proxy 127.0.0.1:8080
reverse_proxy 127.0.0.1:8081
}
# for livekit
@@ -146,6 +146,46 @@ ### 4. Configure your Reverse Proxy
```
</details>
<details>
<summary>Example nginx config</summary>
```
server {
server_name matrix-rtc.example.com;
# for lk-jwt-service
location ~ ^/(sfu/get|healthz|get_token) {
proxy_pass http://127.0.0.1:8081$request_uri;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $http_host;
proxy_buffering off;
}
# for livekit
location / {
proxy_pass http://127.0.0.1:7880$request_uri;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $http_host;
proxy_buffering off;
# websocket
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
}
}
```
Note that for websockets to work, you need to have this somewhere outside your server block:
```
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
```
</details>
<details>
<summary>Example traefik router</summary>
```
@@ -226,4 +266,3 @@ ### Related Documentation
- [Synapse documentation](https://github.com/element-hq/element-call/blob/livekit/docs/self-hosting.md)
- [Community guide](https://tomfos.tr/matrix/livekit/)
- [Community guide](https://blog.kimiblock.top/2024/12/24/hosting-element-call/)
-

View File

@@ -8,7 +8,6 @@ services:
restart: unless-stopped
volumes:
- db:/var/lib/continuwuity
- /etc/resolv.conf:/etc/resolv.conf:ro # Use the host's DNS resolver rather than Docker's.
#- ./continuwuity.toml:/etc/continuwuity.toml
networks:
- proxy

View File

@@ -2,28 +2,26 @@ # Continuwuity for Docker
## Docker
To run Continuwuity with Docker, you can either build the image yourself or pull it
from a registry.
To run Continuwuity with Docker, you can either build the image yourself or pull
it from a registry.
### Use a registry
OCI images for Continuwuity are available in the registries listed below.
Available OCI images:
| Registry | Image | Notes |
| --------------- | --------------------------------------------------------------- | -----------------------|
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:latest](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/latest) | Latest tagged image. |
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:main](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/main) | Main branch image. |
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:latest-maxperf](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/latest-maxperf) | [Performance optimised version.](./generic.mdx#performance-optimised-builds) |
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:main-maxperf](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/main-maxperf) | [Performance optimised version.](./generic.mdx#performance-optimised-builds) |
| Registry | Image | Notes |
| ---------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------- |
| Forgejo Registry | [forgejo.ellis.link/continuwuation/continuwuity:latest](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/latest) | Latest tagged image. |
| Forgejo Registry | [forgejo.ellis.link/continuwuation/continuwuity:main](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/main) | Main branch image. |
| Forgejo Registry | [forgejo.ellis.link/continuwuation/continuwuity:latest-maxperf](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/latest-maxperf) | [Performance optimised version.](./generic.mdx#performance-optimised-builds) |
| Forgejo Registry | [forgejo.ellis.link/continuwuation/continuwuity:main-maxperf](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/main-maxperf) | [Performance optimised version.](./generic.mdx#performance-optimised-builds) |
Use
**Example:**
```bash
docker image pull $LINK
docker image pull forgejo.ellis.link/continuwuation/continuwuity:main-maxperf
```
to pull it to your machine.
#### Mirrors
Images are mirrored to multiple locations automatically, on a schedule:
@@ -33,39 +31,146 @@ #### Mirrors
- `registry.gitlab.com/continuwuity/continuwuity`
- `git.nexy7574.co.uk/mirrored/continuwuity` (releases only, no `main`)
### Run
### Quick Run
When you have the image, you can simply run it with
Get a working Continuwuity server with an admin user in four steps:
#### Prerequisites
Continuwuity requires HTTPS for Matrix federation. You'll need:
- A domain name pointing to your server
- A reverse proxy with SSL/TLS certificates (Traefik, Caddy, nginx, etc.)
See [Docker Compose](#docker-compose) for complete examples.
#### Environment Variables
- `CONTINUWUITY_SERVER_NAME` - Your Matrix server's domain name
- `CONTINUWUITY_DATABASE_PATH` - Where to store your database (must match the
volume mount)
- `CONTINUWUITY_ADDRESS` - Bind address (use `0.0.0.0` to listen on all
interfaces)
- `CONTINUWUITY_ALLOW_REGISTRATION` - Set to `false` to disable registration, or
use with `CONTINUWUITY_REGISTRATION_TOKEN` to require a token (see
[reference](../reference/environment-variables.mdx#registration--user-configuration)
for details)
See the
[Environment Variables Reference](../reference/environment-variables.mdx) for
more configuration options.
#### 1. Pull the image
```bash
docker run -d -p 8448:6167 \
-v db:/var/lib/continuwuity/ \
-e CONTINUWUITY_SERVER_NAME="your.server.name" \
-e CONTINUWUITY_ALLOW_REGISTRATION=false \
--name continuwuity $LINK
docker pull forgejo.ellis.link/continuwuation/continuwuity:latest
```
or you can use [Docker Compose](#docker-compose).
#### 2. Start the server with initial admin user
The `-d` flag lets the container run in detached mode. You may supply an
optional `continuwuity.toml` config file, the example config can be found
[here](../reference/config.mdx). You can pass in different env vars to
change config values on the fly. You can even configure Continuwuity completely by
using env vars. For an overview of possible values, please take a look at the
<a href="/examples/docker-compose.yml" target="_blank">`docker-compose.yml`</a> file.
```bash
docker run -d \
-p 6167:6167 \
-v continuwuity_db:/var/lib/continuwuity \
-e CONTINUWUITY_SERVER_NAME="matrix.example.com" \
-e CONTINUWUITY_DATABASE_PATH="/var/lib/continuwuity" \
-e CONTINUWUITY_ADDRESS="0.0.0.0" \
-e CONTINUWUITY_ALLOW_REGISTRATION="false" \
--name continuwuity \
forgejo.ellis.link/continuwuation/continuwuity:latest \
--execute "users create-user admin"
```
If you just want to test Continuwuity for a short time, you can use the `--rm`
flag, which cleans up everything related to your container after you stop
it.
Replace `matrix.example.com` with your actual server name and `admin` with
your preferred username.
#### 3. Get your admin password
```bash
docker logs continuwuity 2>&1 | grep "Created user"
```
You'll see output like:
```
Created user with user_id: @admin:matrix.example.com and password: `[auto-generated-password]`
```
#### 4. Configure your reverse proxy
Configure your reverse proxy to forward HTTPS traffic to Continuwuity. See
[Docker Compose](#docker-compose) for examples.
Once configured, log in with any Matrix client using `@admin:matrix.example.com`
and the generated password. You'll automatically be invited to the admin room
where you can manage your server.
### Docker Compose
If the `docker run` command is not suitable for you or your setup, you can also use one
of the provided `docker-compose` files.
Docker Compose is the recommended deployment method. These examples include
reverse proxy configurations for Matrix federation.
Depending on your proxy setup, you can use one of the following files:
#### Matrix Federation Requirements
### For existing Traefik setup
For Matrix federation to work, you need to serve `.well-known/matrix/client` and
`.well-known/matrix/server` endpoints. You can achieve this either by:
1. **Using a well-known service** - The compose files below include an nginx
container to serve these files
2. **Using Continuwuity's built-in delegation** (easier for Traefik) - Configure
delegation files in your config, then proxy `/.well-known/matrix/*` to
Continuwuity
**Traefik example using built-in delegation:**
```yaml
labels:
traefik.http.routers.continuwuity.rule: >-
(Host(`matrix.example.com`) ||
(Host(`example.com`) && PathPrefix(`/.well-known/matrix`)))
```
This routes your Matrix domain and well-known paths to Continuwuity.
#### Creating Your First Admin User
Add the `--execute` command to create an admin user on first startup. In your
compose file, add under the `continuwuity` service:
```yaml
services:
continuwuity:
image: forgejo.ellis.link/continuwuation/continuwuity:latest
command: --execute "users create-user admin"
# ... rest of configuration
```
Then retrieve the auto-generated password:
```bash
docker compose logs continuwuity | grep "Created user"
```
#### Choose Your Reverse Proxy
Select the compose file that matches your setup:
:::note DNS Performance
Docker's default DNS resolver can cause performance issues with Matrix
federation. If you experience slow federation or DNS timeouts, you may need to
use your host's DNS resolver instead. Add this volume mount to the
`continuwuity` service:
```yaml
volumes:
- /etc/resolv.conf:/etc/resolv.conf:ro
```
See [Troubleshooting - DNS Issues](../troubleshooting.mdx#potential-dns-issues-when-using-docker)
for more details and alternative solutions.
:::
##### For existing Traefik setup
<details>
<summary>docker-compose.for-traefik.yml</summary>
@@ -76,7 +181,7 @@ ### For existing Traefik setup
</details>
### With Traefik included
##### With Traefik included
<details>
<summary>docker-compose.with-traefik.yml</summary>
@@ -87,7 +192,7 @@ ### With Traefik included
</details>
### With Caddy Docker Proxy
##### With Caddy Docker Proxy
<details>
<summary>docker-compose.with-caddy.yml</summary>
@@ -98,9 +203,15 @@ ### With Caddy Docker Proxy
```
If you don't already have a network for Caddy to monitor, create one first:
```bash
docker network create caddy
```
</details>
### For other reverse proxies
##### For other reverse proxies
<details>
<summary>docker-compose.yml</summary>
@@ -111,7 +222,7 @@ ### For other reverse proxies
</details>
### Override file
##### Override file for customisation
<details>
<summary>docker-compose.override.yml</summary>
@@ -122,98 +233,24 @@ ### Override file
</details>
When picking the Traefik-related compose file, rename it to
`docker-compose.yml`, and rename the override file to
`docker-compose.override.yml`. Edit the latter with the values you want for your
server.
#### Starting Your Server
When picking the `caddy-docker-proxy` compose file, it's important to first
create the `caddy` network before spinning up the containers:
```bash
docker network create caddy
```
After that, you can rename it to `docker-compose.yml` and spin up the
containers!
Additional info about deploying Continuwuity can be found [here](generic.mdx).
### Build
Official Continuwuity images are built using **Docker Buildx** and the Dockerfile found at [`docker/Dockerfile`][dockerfile-path]. This approach uses common Docker tooling and enables efficient multi-platform builds.
The resulting images are widely compatible with Docker and other container runtimes like Podman or containerd.
The images *do not contain a shell*. They contain only the Continuwuity binary, required libraries, TLS certificates, and metadata.
<details>
<summary>Click to view the Dockerfile</summary>
You can also <a href="https://forgejo.ellis.link/continuwuation/continuwuation/src/branch/main/docker/Dockerfile" target="_blank">view the Dockerfile on Forgejo</a>.
```dockerfile file="../../docker/Dockerfile"
```
</details>
To build an image locally using Docker Buildx, you can typically run a command like:
```bash
# Build for the current platform and load into the local Docker daemon
docker buildx build --load --tag continuwuity:latest -f docker/Dockerfile .
# Example: Build for specific platforms and push to a registry.
# docker buildx build --platform linux/amd64,linux/arm64 --tag registry.io/org/continuwuity:latest -f docker/Dockerfile . --push
# Example: Build binary optimised for the current CPU (standard release profile)
# docker buildx build --load \
# --tag continuwuity:latest \
# --build-arg TARGET_CPU=native \
# -f docker/Dockerfile .
# Example: Build maxperf variant (release-max-perf profile with LTO)
# Optimised for runtime performance and smaller binary size, but requires longer build time
# docker buildx build --load \
# --tag continuwuity:latest-maxperf \
# --build-arg TARGET_CPU=native \
# --build-arg RUST_PROFILE=release-max-perf \
# -f docker/Dockerfile .
```
Refer to the Docker Buildx documentation for more advanced build options.
[dockerfile-path]: https://forgejo.ellis.link/continuwuation/continuwuation/src/branch/main/docker/Dockerfile
### Run
If you have already built the image or want to use one from the registries, you
can start the container and everything else in the compose file in detached
mode with:
1. Choose your compose file and rename it to `docker-compose.yml`
2. If using the override file, rename it to `docker-compose.override.yml` and
edit your values
3. Start the server:
```bash
docker compose up -d
```
> **Note:** Don't forget to modify and adjust the compose file to your needs.
See the [generic deployment guide](generic.mdx) for more deployment options.
### Use Traefik as Proxy
### Building Custom Images
As a container user, you probably know about Traefik. It is an easy-to-use
reverse proxy for making containerized apps and services available through the
web. With the Traefik-related docker-compose files provided above, it is equally easy
to deploy and use Continuwuity, with a small caveat. If you have already looked at
the files, you should have seen the `well-known` service, which is the
small caveat. Traefik is simply a proxy and load balancer and cannot
serve any kind of content. For Continuwuity to federate, we need to either
expose ports `443` and `8448` or serve two endpoints: `.well-known/matrix/client`
and `.well-known/matrix/server`.
With the service `well-known`, we use a single `nginx` container that serves
those two files.
Alternatively, you can use Continuwuity's built-in delegation file capability. Set up the delegation files in the configuration file, and then proxy paths under `/.well-known/matrix` to continuwuity. For example, the label ``traefik.http.routers.continuwuity.rule=(Host(`matrix.ellis.link`) || (Host(`ellis.link`) && PathPrefix(`/.well-known/matrix`)))`` does this for the domain `ellis.link`.
For information on building your own Continuwuity Docker images, see the
[Building Docker Images](../development/index.mdx#building-docker-images)
section in the development documentation.
## Voice communication

View File

@@ -3,3 +3,5 @@ # Continuwuity for FreeBSD
Continuwuity currently does not provide FreeBSD builds or FreeBSD packaging. However, Continuwuity does build and work on FreeBSD using the system-provided RocksDB.
Contributions to get Continuwuity packaged for FreeBSD are welcome.
Please join our [Continuwuity BSD](https://matrix.to/#/%23bsd:continuwuity.org) community room.

View File

@@ -1,7 +1,109 @@
# Continuwuity for Kubernetes
Continuwuity doesn't support horizontal scalability or distributed loading
natively. However, [a community-maintained Helm Chart is available here to run
natively. However, a deployment in Kubernetes is very similar to the docker
setup. This is because Continuwuity can be fully configured using environment
variables. A sample StatefulSet is shared below. The only thing missing is
a PVC definition (named `continuwuity-data`) for the volume mounted to
the StatefulSet, an Ingress resources to point your webserver to the
Continuwuity Pods, and a Service resource (targeting `app.kubernetes.io/name: continuwuity`)
to glue the Ingress and Pod together.
Carefully go through the `env` section and add, change, and remove any env vars you like using the [Configuration reference](https://continuwuity.org/reference/config.html)
```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: continuwuity
namespace: matrix
labels:
app.kubernetes.io/name: continuwuity
spec:
replicas: 1
serviceName: continuwuity
podManagementPolicy: Parallel
selector:
matchLabels:
app.kubernetes.io/name: continuwuity
template:
metadata:
labels:
app.kubernetes.io/name: continuwuity
spec:
securityContext:
sysctls:
- name: net.ipv4.ip_unprivileged_port_start
value: "0"
containers:
- name: continuwuity
# use a sha hash <3
image: forgejo.ellis.link/continuwuation/continuwuity:latest
imagePullPolicy: IfNotPresent
ports:
- name: http
containerPort: 80
volumeMounts:
- mountPath: /data
name: data
subPath: data
securityContext:
capabilities:
add:
- NET_BIND_SERVICE
env:
- name: TOKIO_WORKER_THREADS
value: "2"
- name: CONTINUWUITY_SERVER_NAME
value: "example.com"
- name: CONTINUWUITY_DATABASE_PATH
value: "/data/db"
- name: CONTINUWUITY_DATABASE_BACKEND
value: "rocksdb"
- name: CONTINUWUITY_PORT
value: "80"
- name: CONTINUWUITY_MAX_REQUEST_SIZE
value: "20000000"
- name: CONTINUWUITY_ALLOW_FEDERATION
value: "true"
- name: CONTINUWUITY_TRUSTED_SERVERS
value: '["matrix.org"]'
- name: CONTINUWUITY_ADDRESS
value: "0.0.0.0"
- name: CONTINUWUITY_ROCKSDB_PARALLELISM_THREADS
value: "1"
- name: CONTINUWUITY_WELL_KNOWN__SERVER
value: "matrix.example.com:443"
- name: CONTINUWUITY_WELL_KNOWN__CLIENT
value: "https://matrix.example.com"
- name: CONTINUWUITY_ALLOW_REGISTRATION
value: "false"
- name: RUST_LOG
value: info
readinessProbe:
httpGet:
path: /_matrix/federation/v1/version
port: http
periodSeconds: 4
failureThreshold: 5
resources:
# Continuwuity might use quite some RAM :3
requests:
cpu: "2"
memory: "512Mi"
limits:
cpu: "4"
memory: "2048Mi"
volumes:
- name: data
persistentVolumeClaim:
claimName: continuwuity-data
```
---
Apart from manually configuring the containers,
[a community-maintained Helm Chart is available here to run
conduwuit on Kubernetes](https://gitlab.cronce.io/charts/conduwuit)
This should be compatible with Continuwuity, but you will need to change the image reference.

View File

@@ -2,7 +2,8 @@ # Development
Information about developing the project. If you are only interested in using
it, you can safely ignore this page. If you plan on contributing, see the
[contributor's guide](./contributing.mdx) and [code style guide](./code_style.mdx).
[contributor's guide](./contributing.mdx) and
[code style guide](./code_style.mdx).
## Continuwuity project layout
@@ -12,86 +13,98 @@ ## Continuwuity project layout
`Cargo.toml`.
The crate names are generally self-explanatory:
- `admin` is the admin room
- `api` is the HTTP API, Matrix C-S and S-S endpoints, etc
- `core` is core Continuwuity functionality like config loading, error definitions,
global utilities, logging infrastructure, etc
- `database` is RocksDB methods, helpers, RocksDB config, and general database definitions,
utilities, or functions
- `macros` are Continuwuity Rust [macros][macros] like general helper macros, logging
and error handling macros, and [syn][syn] and [procedural macros][proc-macro]
used for admin room commands and others
- `core` is core Continuwuity functionality like config loading, error
definitions, global utilities, logging infrastructure, etc
- `database` is RocksDB methods, helpers, RocksDB config, and general database
definitions, utilities, or functions
- `macros` are Continuwuity Rust [macros][macros] like general helper macros,
logging and error handling macros, and [syn][syn] and [procedural
macros][proc-macro] used for admin room commands and others
- `main` is the "primary" sub-crate. This is where the `main()` function lives,
tokio worker and async initialisation, Sentry initialisation, [clap][clap] init,
and signal handling. If you are adding new [Rust features][features], they *must*
go here.
- `router` is the webserver and request handling bits, using axum, tower, tower-http,
hyper, etc, and the [global server state][state] to access `services`.
tokio worker and async initialisation, Sentry initialisation, [clap][clap]
init, and signal handling. If you are adding new [Rust features][features],
they _must_ go here.
- `router` is the webserver and request handling bits, using axum, tower,
tower-http, hyper, etc, and the [global server state][state] to access
`services`.
- `service` is the high-level database definitions and functions for data,
outbound/sending code, and other business logic such as media fetching.
outbound/sending code, and other business logic such as media fetching.
It is highly unlikely you will ever need to add a new workspace member, but
if you truly find yourself needing to, we recommend reaching out to us in
the Matrix room for discussions about it beforehand.
It is highly unlikely you will ever need to add a new workspace member, but if
you truly find yourself needing to, we recommend reaching out to us in the
Matrix room for discussions about it beforehand.
The primary inspiration for this design was apart of hot reloadable development,
to support "Continuwuity as a library" where specific parts can simply be swapped out.
There is evidence Conduit wanted to go this route too as `axum` is technically an
optional feature in Conduit, and can be compiled without the binary or axum library
for handling inbound web requests; but it was never completed or worked.
to support "Continuwuity as a library" where specific parts can simply be
swapped out. There is evidence Conduit wanted to go this route too as `axum` is
technically an optional feature in Conduit, and can be compiled without the
binary or axum library for handling inbound web requests; but it was never
completed or worked.
See the Rust documentation on [Workspaces][workspaces] for general questions
and information on Cargo workspaces.
See the Rust documentation on [Workspaces][workspaces] for general questions and
information on Cargo workspaces.
## Adding compile-time [features][features]
If you'd like to add a compile-time feature, you must first define it in
the `main` workspace crate located in `src/main/Cargo.toml`. The feature must
enable a feature in the other workspace crate(s) you intend to use it in. Then
the said workspace crate(s) must define the feature there in its `Cargo.toml`.
If you'd like to add a compile-time feature, you must first define it in the
`main` workspace crate located in `src/main/Cargo.toml`. The feature must enable
a feature in the other workspace crate(s) you intend to use it in. Then the said
workspace crate(s) must define the feature there in its `Cargo.toml`.
So, if this is adding a feature to the API such as `woof`, you define the feature
in the `api` crate's `Cargo.toml` as `woof = []`. The feature definition in `main`'s
`Cargo.toml` will be `woof = ["conduwuit-api/woof"]`.
So, if this is adding a feature to the API such as `woof`, you define the
feature in the `api` crate's `Cargo.toml` as `woof = []`. The feature definition
in `main`'s `Cargo.toml` will be `woof = ["conduwuit-api/woof"]`.
The rationale for this is due to Rust / Cargo not supporting
["workspace level features"][9], we must make a choice of; either scattering
features all over the workspace crates, making it difficult for anyone to add
or remove default features; or define all the features in one central workspace
crate that propagate down/up to the other workspace crates. It is a Cargo pitfall,
and we'd like to see better developer UX in Rust's Workspaces.
The rationale for this is due to Rust / Cargo not supporting ["workspace level
features"][9], we must make a choice of; either scattering features all over the
workspace crates, making it difficult for anyone to add or remove default
features; or define all the features in one central workspace crate that
propagate down/up to the other workspace crates. It is a Cargo pitfall, and we'd
like to see better developer UX in Rust's Workspaces.
Additionally, the definition of one single place makes "feature collection" in our
Nix flake a million times easier instead of collecting and deduping them all from
searching in all the workspace crates' `Cargo.toml`s. Though we wouldn't need to
do this if Rust supported workspace-level features to begin with.
Additionally, the definition of one single place makes "feature collection" in
our Nix flake a million times easier instead of collecting and deduping them all
from searching in all the workspace crates' `Cargo.toml`s. Though we wouldn't
need to do this if Rust supported workspace-level features to begin with.
## List of forked dependencies
During Continuwuity (and prior projects) development, we have had to fork some dependencies to support our use-cases.
These forks exist for various reasons including features that upstream projects won't accept,
faster-paced development, Continuwuity-specific usecases, or lack of time to upstream changes.
During Continuwuity (and prior projects) development, we have had to fork some
dependencies to support our use-cases. These forks exist for various reasons
including features that upstream projects won't accept, faster-paced
development, Continuwuity-specific usecases, or lack of time to upstream
changes.
All forked dependencies are maintained under the [continuwuation organization on Forgejo](https://forgejo.ellis.link/continuwuation):
All forked dependencies are maintained under the
[continuwuation organization on Forgejo](https://forgejo.ellis.link/continuwuation):
- [ruwuma][continuwuation-ruwuma] - Fork of [ruma/ruma][ruma] with various performance improvements, more features and better client/server interop
- [rocksdb][continuwuation-rocksdb] - Fork of [facebook/rocksdb][rocksdb] via [`@zaidoon1`][8] with liburing build fixes and GCC debug build fixes
- [jemallocator][continuwuation-jemallocator] - Fork of [tikv/jemallocator][jemallocator] fixing musl builds, suspicious code,
and adding support for redzones in Valgrind
- [rustyline-async][continuwuation-rustyline-async] - Fork of [zyansheep/rustyline-async][rustyline-async] with tab completion callback
and `CTRL+\` signal quit event for Continuwuity console CLI
- [rust-rocksdb][continuwuation-rust-rocksdb] - Fork of [rust-rocksdb/rust-rocksdb][rust-rocksdb] fixing musl build issues,
removing unnecessary `gtest` include, and using our RocksDB and jemallocator forks
- [tracing][continuwuation-tracing] - Fork of [tokio-rs/tracing][tracing] implementing `Clone` for `EnvFilter` to
support dynamically changing tracing environments
- [ruwuma][continuwuation-ruwuma] - Fork of [ruma/ruma][ruma] with various
performance improvements, more features and better client/server interop
- [rocksdb][continuwuation-rocksdb] - Fork of [facebook/rocksdb][rocksdb] via
[`@zaidoon1`][8] with liburing build fixes and GCC debug build fixes
- [jemallocator][continuwuation-jemallocator] - Fork of
[tikv/jemallocator][jemallocator] fixing musl builds, suspicious code, and
adding support for redzones in Valgrind
- [rustyline-async][continuwuation-rustyline-async] - Fork of
[zyansheep/rustyline-async][rustyline-async] with tab completion callback and
`CTRL+\` signal quit event for Continuwuity console CLI
- [rust-rocksdb][continuwuation-rust-rocksdb] - Fork of
[rust-rocksdb/rust-rocksdb][rust-rocksdb] fixing musl build issues, removing
unnecessary `gtest` include, and using our RocksDB and jemallocator forks
- [tracing][continuwuation-tracing] - Fork of [tokio-rs/tracing][tracing]
implementing `Clone` for `EnvFilter` to support dynamically changing tracing
environments
## Debugging with `tokio-console`
[`tokio-console`][7] can be a useful tool for debugging and profiling. To make a
`tokio-console`-enabled build of Continuwuity, enable the `tokio_console` feature,
disable the default `release_max_log_level` feature, and set the `--cfg
tokio_unstable` flag to enable experimental tokio APIs. A build might look like
this:
`tokio-console`-enabled build of Continuwuity, enable the `tokio_console`
feature, disable the default `release_max_log_level` feature, and set the
`--cfg tokio_unstable` flag to enable experimental tokio APIs. A build might
look like this:
```bash
RUSTFLAGS="--cfg tokio_unstable" cargo +nightly build \
@@ -100,34 +113,84 @@ ## Debugging with `tokio-console`
--features=systemd,element_hacks,gzip_compression,brotli_compression,zstd_compression,tokio_console
```
You will also need to enable the `tokio_console` config option in Continuwuity when
starting it. This was due to tokio-console causing gradual memory leak/usage
if left enabled.
You will also need to enable the `tokio_console` config option in Continuwuity
when starting it. This was due to tokio-console causing gradual memory
leak/usage if left enabled.
## Building Docker Images
To build a Docker image for Continuwuity, use the standard Docker build command:
Official Continuwuity images are built using **Docker Buildx** and the
Dockerfile found at [`docker/Dockerfile`][dockerfile-path].
The images are compatible with Docker and other container runtimes like Podman
or containerd.
The images _do not contain a shell_. They contain only the Continuwuity binary,
required libraries, TLS certificates, and metadata.
<details>
<summary>Click to view the Dockerfile</summary>
You can also
<a
href="<https://forgejo.ellis.link/continuwuation/continuwuation/src/branch/main/docker/Dockerfile>"
target="_blank"
>
view the Dockerfile on Forgejo
</a>
.
```dockerfile file="../../docker/Dockerfile"
```bash
docker build -f docker/Dockerfile .
```
The image can be cross-compiled for different architectures.
</details>
### Building Locally
To build an image locally using Docker Buildx:
```bash
# Build for the current platform and load into the local Docker daemon
docker buildx build --load --tag continuwuity:latest -f docker/Dockerfile .
# Example: Build for specific platforms and push to a registry
# docker buildx build --platform linux/amd64,linux/arm64 --tag registry.io/org/continuwuity:latest -f docker/Dockerfile . --push
# Example: Build binary optimised for the current CPU (standard release profile)
# docker buildx build --load \
# --tag continuwuity:latest \
# --build-arg TARGET_CPU=native \
# -f docker/Dockerfile .
# Example: Build maxperf variant (release-max-perf profile with LTO)
# docker buildx build --load \
# --tag continuwuity:latest-maxperf \
# --build-arg TARGET_CPU=native \
# --build-arg RUST_PROFILE=release-max-perf \
# -f docker/Dockerfile .
```
Refer to the Docker Buildx documentation for more advanced build options.
[dockerfile-path]:
https://forgejo.ellis.link/continuwuation/continuwuation/src/branch/main/docker/Dockerfile
[continuwuation-ruwuma]: https://forgejo.ellis.link/continuwuation/ruwuma
[continuwuation-rocksdb]: https://forgejo.ellis.link/continuwuation/rocksdb
[continuwuation-jemallocator]: https://forgejo.ellis.link/continuwuation/jemallocator
[continuwuation-rustyline-async]: https://forgejo.ellis.link/continuwuation/rustyline-async
[continuwuation-rust-rocksdb]: https://forgejo.ellis.link/continuwuation/rust-rocksdb
[continuwuation-jemallocator]:
https://forgejo.ellis.link/continuwuation/jemallocator
[continuwuation-rustyline-async]:
https://forgejo.ellis.link/continuwuation/rustyline-async
[continuwuation-rust-rocksdb]:
https://forgejo.ellis.link/continuwuation/rust-rocksdb
[continuwuation-tracing]: https://forgejo.ellis.link/continuwuation/tracing
[ruma]: https://github.com/ruma/ruma/
[rocksdb]: https://github.com/facebook/rocksdb/
[jemallocator]: https://github.com/tikv/jemallocator/
[rustyline-async]: https://github.com/zyansheep/rustyline-async/
[rust-rocksdb]: https://github.com/rust-rocksdb/rust-rocksdb/
[tracing]: https://github.com/tokio-rs/tracing/
[7]: https://docs.rs/tokio-console/latest/tokio_console/
[8]: https://github.com/zaidoon1/
[9]: https://github.com/rust-lang/cargo/issues/12162

View File

@@ -51,7 +51,13 @@ ## Can I try it out?
Check out the [documentation](https://continuwuity.org) for installation instructions.
There are currently no open registration continuwuity instances available.
If you want to try it out as a user, we have some partnered homeservers you can use:
* You can head over to [https://federated.nexus](https://federated.nexus/) in your browser.
* Hit the `Apply to Join` button. Once your request has been accepted, you will receive an email with your username and password.
* Head over to [https://app.federated.nexus](https://app.federated.nexus/) and you can sign in there, or use any other matrix chat client you wish elsewhere.
* Your username for matrix will be in the form of `@username:federated.nexus`, however you can simply use the `username` part to log in. Your password is your password.
* There's also [https://continuwuity.rocks/](https://continuwuity.rocks/). You can register a new account using Cinny via [this convenient link](https://app.cinny.in/register/continuwuity.rocks), or you can use Element or another matrix client *that supports registration*.
## What are we working on?

View File

@@ -4,6 +4,11 @@
"name": "config",
"label": "Configuration"
},
{
"type": "file",
"name": "environment-variables",
"label": "Environment Variables"
},
{
"type": "file",
"name": "admin",

View File

@@ -36,3 +36,7 @@ ## `!admin media delete-all-from-user`
## `!admin media delete-all-from-server`
Deletes all remote media from the specified remote server. This will always ignore errors by default
## `!admin media delete-url-preview`
Deletes a cached URL preview, forcing it to be re-fetched. Use --all to purge all cached URL previews

View File

@@ -0,0 +1,281 @@
# Environment Variables
Continuwuity can be configured entirely through environment variables, making it
ideal for containerised deployments and infrastructure-as-code scenarios.
This is a convenience reference and may not be exhaustive. The
[Configuration Reference](./config.mdx) is the primary source for all
configuration options.
## Prefix System
Continuwuity supports three environment variable prefixes for backwards
compatibility:
- `CONTINUWUITY_*` (current, recommended)
- `CONDUWUIT_*` (compatibility)
- `CONDUIT_*` (legacy)
All three prefixes work identically. Use double underscores (`__`) to represent
nested configuration sections from the TOML config.
**Examples:**
```bash
# Simple top-level config
CONTINUWUITY_SERVER_NAME="matrix.example.com"
CONTINUWUITY_PORT="8008"
# Nested config sections use double underscores
# This maps to [database] section in TOML
CONTINUWUITY_DATABASE__PATH="/var/lib/continuwuity"
# This maps to [tls] section in TOML
CONTINUWUITY_TLS__CERTS="/path/to/cert.pem"
```
## Configuration File Override
You can specify a custom configuration file path:
- `CONTINUWUITY_CONFIG` - Path to continuwuity.toml (current)
- `CONDUWUIT_CONFIG` - Path to config file (compatibility)
- `CONDUIT_CONFIG` - Path to config file (legacy)
## Essential Variables
These are the minimum variables needed for a working deployment:
| Variable | Description | Default |
| ---------------------------- | ---------------------------------- | ---------------------- |
| `CONTINUWUITY_SERVER_NAME` | Your Matrix server's domain name | Required |
| `CONTINUWUITY_DATABASE_PATH` | Path to RocksDB database directory | `/var/lib/conduwuit` |
| `CONTINUWUITY_ADDRESS` | IP address to bind to | `["127.0.0.1", "::1"]` |
| `CONTINUWUITY_PORT` | Port to listen on | `8008` |
## Network Configuration
| Variable | Description | Default |
| -------------------------------- | ----------------------------------------------- | ---------------------- |
| `CONTINUWUITY_ADDRESS` | Bind address (use `0.0.0.0` for all interfaces) | `["127.0.0.1", "::1"]` |
| `CONTINUWUITY_PORT` | HTTP port | `8008` |
| `CONTINUWUITY_UNIX_SOCKET_PATH` | UNIX socket path (alternative to TCP) | - |
| `CONTINUWUITY_UNIX_SOCKET_PERMS` | Socket permissions (octal) | `660` |
## Database Configuration
| Variable | Description | Default |
| ------------------------------------------ | --------------------------- | -------------------- |
| `CONTINUWUITY_DATABASE_PATH` | RocksDB data directory | `/var/lib/conduwuit` |
| `CONTINUWUITY_DATABASE_BACKUP_PATH` | Backup directory | - |
| `CONTINUWUITY_DATABASE_BACKUPS_TO_KEEP` | Number of backups to retain | `1` |
| `CONTINUWUITY_DB_CACHE_CAPACITY_MB` | Database read cache (MB) | - |
| `CONTINUWUITY_DB_WRITE_BUFFER_CAPACITY_MB` | Write cache (MB) | - |
## Cache Configuration
| Variable | Description |
| ---------------------------------------- | ------------------------ |
| `CONTINUWUITY_CACHE_CAPACITY_MODIFIER` | LRU cache multiplier |
| `CONTINUWUITY_PDU_CACHE_CAPACITY` | PDU cache entries |
| `CONTINUWUITY_AUTH_CHAIN_CACHE_CAPACITY` | Auth chain cache entries |
## DNS Configuration
Configure DNS resolution behaviour for federation and external requests.
| Variable | Description | Default |
| ------------------------------------ | ---------------------------- | -------- |
| `CONTINUWUITY_DNS_CACHE_ENTRIES` | Max DNS cache entries | `32768` |
| `CONTINUWUITY_DNS_MIN_TTL` | Minimum cache TTL (seconds) | `10800` |
| `CONTINUWUITY_DNS_MIN_TTL_NXDOMAIN` | NXDOMAIN cache TTL (seconds) | `259200` |
| `CONTINUWUITY_DNS_ATTEMPTS` | Retry attempts | - |
| `CONTINUWUITY_DNS_TIMEOUT` | Query timeout (seconds) | - |
| `CONTINUWUITY_DNS_TCP_FALLBACK` | Allow TCP fallback | - |
| `CONTINUWUITY_QUERY_ALL_NAMESERVERS` | Query all nameservers | - |
| `CONTINUWUITY_QUERY_OVER_TCP_ONLY` | TCP-only queries | - |
## Request Configuration
| Variable | Description |
| ------------------------------------ | ----------------------------- |
| `CONTINUWUITY_MAX_REQUEST_SIZE` | Max HTTP request size (bytes) |
| `CONTINUWUITY_REQUEST_CONN_TIMEOUT` | Connection timeout (seconds) |
| `CONTINUWUITY_REQUEST_TIMEOUT` | Overall request timeout |
| `CONTINUWUITY_REQUEST_TOTAL_TIMEOUT` | Total timeout |
| `CONTINUWUITY_REQUEST_IDLE_TIMEOUT` | Idle timeout |
| `CONTINUWUITY_REQUEST_IDLE_PER_HOST` | Idle connections per host |
## Federation Configuration
Control how your server federates with other Matrix servers.
| Variable | Description | Default |
| ---------------------------------------------- | ----------------------------- | ------- |
| `CONTINUWUITY_ALLOW_FEDERATION` | Enable federation | `true` |
| `CONTINUWUITY_FEDERATION_LOOPBACK` | Allow loopback federation | - |
| `CONTINUWUITY_FEDERATION_CONN_TIMEOUT` | Connection timeout | - |
| `CONTINUWUITY_FEDERATION_TIMEOUT` | Request timeout | - |
| `CONTINUWUITY_FEDERATION_IDLE_TIMEOUT` | Idle timeout | - |
| `CONTINUWUITY_FEDERATION_IDLE_PER_HOST` | Idle connections per host | - |
| `CONTINUWUITY_TRUSTED_SERVERS` | JSON array of trusted servers | - |
| `CONTINUWUITY_QUERY_TRUSTED_KEY_SERVERS_FIRST` | Query trusted first | - |
| `CONTINUWUITY_ONLY_QUERY_TRUSTED_KEY_SERVERS` | Only query trusted | - |
**Example:**
```bash
# Trust matrix.org for key verification
CONTINUWUITY_TRUSTED_SERVERS='["matrix.org"]'
```
## Registration & User Configuration
Control user registration and account creation behaviour.
| Variable | Description | Default |
| ------------------------------------------ | --------------------- | ------- |
| `CONTINUWUITY_ALLOW_REGISTRATION` | Enable registration | `true` |
| `CONTINUWUITY_REGISTRATION_TOKEN` | Token requirement | - |
| `CONTINUWUITY_SUSPEND_ON_REGISTER` | Suspend new accounts | - |
| `CONTINUWUITY_NEW_USER_DISPLAYNAME_SUFFIX` | Display name suffix | 🏳️‍⚧️ |
| `CONTINUWUITY_RECAPTCHA_SITE_KEY` | reCAPTCHA site key | - |
| `CONTINUWUITY_RECAPTCHA_PRIVATE_SITE_KEY` | reCAPTCHA private key | - |
**Example:**
```bash
# Disable open registration
CONTINUWUITY_ALLOW_REGISTRATION="false"
# Require a registration token
CONTINUWUITY_REGISTRATION_TOKEN="your_secret_token_here"
```
## Feature Configuration
| Variable | Description | Default |
| ---------------------------------------------------------- | -------------------------- | ------- |
| `CONTINUWUITY_ALLOW_ENCRYPTION` | Enable E2EE | `true` |
| `CONTINUWUITY_ALLOW_ROOM_CREATION` | Enable room creation | - |
| `CONTINUWUITY_ALLOW_UNSTABLE_ROOM_VERSIONS` | Allow unstable versions | - |
| `CONTINUWUITY_DEFAULT_ROOM_VERSION` | Default room version | `v11` |
| `CONTINUWUITY_REQUIRE_AUTH_FOR_PROFILE_REQUESTS` | Auth for profiles | - |
| `CONTINUWUITY_ALLOW_PUBLIC_ROOM_DIRECTORY_OVER_FEDERATION` | Federate directory | - |
| `CONTINUWUITY_ALLOW_PUBLIC_ROOM_DIRECTORY_WITHOUT_AUTH` | Unauth directory | - |
| `CONTINUWUITY_ALLOW_DEVICE_NAME_FEDERATION` | Device names in federation | - |
## TLS Configuration
Built-in TLS support is primarily for testing. **For production deployments,
especially when federating on the internet, use a reverse proxy** (Traefik,
Caddy, nginx) to handle TLS termination.
| Variable | Description |
| --------------------------------- | ------------------------- |
| `CONTINUWUITY_TLS__CERTS` | TLS certificate file path |
| `CONTINUWUITY_TLS__KEY` | TLS private key path |
| `CONTINUWUITY_TLS__DUAL_PROTOCOL` | Support TLS 1.2 + 1.3 |
**Example (testing only):**
```bash
CONTINUWUITY_TLS__CERTS="/etc/letsencrypt/live/matrix.example.com/fullchain.pem"
CONTINUWUITY_TLS__KEY="/etc/letsencrypt/live/matrix.example.com/privkey.pem"
```
## Logging Configuration
Control log output format and verbosity.
| Variable | Description | Default |
| ------------------------------ | ------------------ | ------- |
| `CONTINUWUITY_LOG` | Log filter level | - |
| `CONTINUWUITY_LOG_COLORS` | ANSI colours | `true` |
| `CONTINUWUITY_LOG_SPAN_EVENTS` | Log span events | `none` |
| `CONTINUWUITY_LOG_THREAD_IDS` | Include thread IDs | - |
**Examples:**
```bash
# Set log level to info
CONTINUWUITY_LOG="info"
# Enable debug logging for specific modules
CONTINUWUITY_LOG="warn,continuwuity::api=debug"
# Disable colours for log aggregation
CONTINUWUITY_LOG_COLORS="false"
```
## Observability Configuration
| Variable | Description |
| ---------------------------------------- | --------------------- |
| `CONTINUWUITY_ALLOW_OTLP` | Enable OpenTelemetry |
| `CONTINUWUITY_OTLP_FILTER` | OTLP filter level |
| `CONTINUWUITY_OTLP_PROTOCOL` | Protocol (http/grpc) |
| `CONTINUWUITY_TRACING_FLAME` | Enable flame graphs |
| `CONTINUWUITY_TRACING_FLAME_FILTER` | Flame graph filter |
| `CONTINUWUITY_TRACING_FLAME_OUTPUT_PATH` | Output directory |
| `CONTINUWUITY_SENTRY` | Enable Sentry |
| `CONTINUWUITY_SENTRY_ENDPOINT` | Sentry DSN |
| `CONTINUWUITY_SENTRY_SEND_SERVER_NAME` | Include server name |
| `CONTINUWUITY_SENTRY_TRACES_SAMPLE_RATE` | Sample rate (0.0-1.0) |
## Admin Configuration
Configure admin users and automated command execution.
| Variable | Description | Default |
| ------------------------------------------ | -------------------------------- | ----------------- |
| `CONTINUWUITY_ADMINS_LIST` | JSON array of admin user IDs | - |
| `CONTINUWUITY_ADMINS_FROM_ROOM` | Derive admins from room | - |
| `CONTINUWUITY_ADMIN_ESCAPE_COMMANDS` | Allow `\` prefix in public rooms | - |
| `CONTINUWUITY_ADMIN_CONSOLE_AUTOMATIC` | Auto-activate console | - |
| `CONTINUWUITY_ADMIN_EXECUTE` | JSON array of startup commands | - |
| `CONTINUWUITY_ADMIN_EXECUTE_ERRORS_IGNORE` | Ignore command errors | - |
| `CONTINUWUITY_ADMIN_SIGNAL_EXECUTE` | Commands on SIGUSR2 | - |
| `CONTINUWUITY_ADMIN_ROOM_TAG` | Admin room tag | `m.server_notice` |
**Examples:**
```bash
# Create admin user on startup
CONTINUWUITY_ADMIN_EXECUTE='["users create-user admin", "users make-user-admin admin"]'
# Specify admin users directly
CONTINUWUITY_ADMINS_LIST='["@alice:example.com", "@bob:example.com"]'
```
## Media & URL Preview Configuration
| Variable | Description |
| ---------------------------------------------------- | ------------------ |
| `CONTINUWUITY_URL_PREVIEW_BOUND_INTERFACE` | Bind interface |
| `CONTINUWUITY_URL_PREVIEW_DOMAIN_CONTAINS_ALLOWLIST` | Domain allowlist |
| `CONTINUWUITY_URL_PREVIEW_DOMAIN_EXPLICIT_ALLOWLIST` | Explicit allowlist |
| `CONTINUWUITY_URL_PREVIEW_DOMAIN_EXPLICIT_DENYLIST` | Explicit denylist |
| `CONTINUWUITY_URL_PREVIEW_MAX_SPIDER_SIZE` | Max fetch size |
| `CONTINUWUITY_URL_PREVIEW_TIMEOUT` | Fetch timeout |
| `CONTINUWUITY_IP_RANGE_DENYLIST` | IP range denylist |
## Tokio Runtime Configuration
These can be set as environment variables or CLI arguments:
| Variable | Description |
| ----------------------------------------- | -------------------------- |
| `TOKIO_WORKER_THREADS` | Worker thread count |
| `TOKIO_GLOBAL_QUEUE_INTERVAL` | Global queue interval |
| `TOKIO_EVENT_INTERVAL` | Event interval |
| `TOKIO_MAX_IO_EVENTS_PER_TICK` | Max I/O events per tick |
| `CONTINUWUITY_RUNTIME_HISTOGRAM_INTERVAL` | Histogram bucket size (μs) |
| `CONTINUWUITY_RUNTIME_HISTOGRAM_BUCKETS` | Bucket count |
| `CONTINUWUITY_RUNTIME_WORKER_AFFINITY` | Enable worker affinity |
## See Also
- [Configuration Reference](./config.mdx) - Complete TOML configuration
documentation
- [Admin Commands](./admin/) - Admin command reference

View File

@@ -1,13 +1,28 @@
# Troubleshooting Continuwuity
> **Docker users ⚠️**
>
> Docker can be difficult to use and debug. It's common for Docker
> misconfigurations to cause issues, particularly with networking and permissions.
> Please check that your issues are not due to problems with your Docker setup.
:::warning{title="Docker users:"}
Docker can be difficult to use and debug. It's common for Docker
misconfigurations to cause issues, particularly with networking and permissions.
Please check that your issues are not due to problems with your Docker setup.
:::
## Continuwuity and Matrix issues
### Slow joins to rooms
Some slowness is to be expected if you're the first person on your homserver to join a room (which will
always be the case for single-user homeservers). In this situation, your homeserver has to verify the signatures of
all of the state events sent by other servers before your join. To make this process as fast as possible, make sure you have
multiple fast, trusted servers listed in `trusted_servers` in your configuration, and ensure
`query_trusted_key_servers_first_on_join` is set to true (the default).
If you need suggestions for trusted servers, ask in the Continuwuity main room.
However, _very_ slow joins, especially to rooms with only a few users in them or rooms created by another user
on your homeserver, may be caused by [issue !779](https://forgejo.ellis.link/continuwuation/continuwuity/issues/779),
which is a longstanding bug with synchronizing room joins to clients. In this situation, you did succeed in joining the room, but
the bug caused your homeserver to forget to tell your client. **To fix this, clear your client's cache.** Both Element and Cinny
have a button to clear their cache in the "About" section of their settings.
### Lost access to admin room
You can reinvite yourself to the admin room through the following methods:

View File

@@ -77,7 +77,12 @@ rec {
craneLib.buildDepsOnly (
(commonAttrs commonAttrsArgs)
// {
env = uwuenv.buildDepsOnlyEnv // (makeRocksDBEnv { inherit rocksdb; });
env = uwuenv.buildDepsOnlyEnv
// (makeRocksDBEnv { inherit rocksdb; })
// {
# required since we started using unstable reqwest apparently ... otherwise the all-features build will fail
RUSTFLAGS = "--cfg reqwest_unstable";
};
inherit (features) cargoExtraArgs;
}
@@ -102,7 +107,13 @@ rec {
'';
cargoArtifacts = deps;
doCheck = true;
env = uwuenv.buildPackageEnv // rocksdbEnv;
env =
uwuenv.buildPackageEnv
// rocksdbEnv
// {
# required since we started using unstable reqwest apparently ... otherwise the all-features build will fail
RUSTFLAGS = "--cfg reqwest_unstable";
};
passthru.env = uwuenv.buildPackageEnv // rocksdbEnv;
meta.mainProgram = crateInfo.pname;
inherit (features) cargoExtraArgs;

View File

@@ -1,6 +1,7 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": ["config:recommended", "replacements:all"],
"dependencyDashboard": true,
"osvVulnerabilityAlerts": true,
"lockFileMaintenance": {
"enabled": true,
@@ -57,12 +58,25 @@
"matchUpdateTypes": ["minor", "patch"],
"groupName": "github-actions-non-major"
},
{
"description": "Batch patch-level Node.js dependency updates",
"matchManagers": ["npm"],
"matchUpdateTypes": ["patch"],
"groupName": "node-patch-updates"
},
{
"description": "Pin forgejo artifact actions to prevent breaking changes",
"matchManagers": ["github-actions"],
"matchPackageNames": ["forgejo/upload-artifact", "forgejo/download-artifact"],
"enabled": false
},
{
"description": "Auto-merge crate-ci/typos minor updates",
"matchPackageNames": ["crate-ci/typos"],
"matchUpdateTypes": ["minor", "patch"],
"automerge": true,
"automergeStrategy": "fast-forward"
},
{
"description": "Auto-merge renovatebot docker image updates",
"matchDatasources": ["docker"],

View File

@@ -30,12 +30,15 @@ pub(super) async fn incoming_federation(&self) -> Result {
.federation_handletime
.read();
let mut msg = format!("Handling {} incoming pdus:\n", map.len());
let mut msg = format!(
"Handling {} incoming PDUs across {} active transactions:\n",
map.len(),
self.services.transactions.txn_active_handle_count()
);
for (r, (e, i)) in map.iter() {
let elapsed = i.elapsed();
writeln!(msg, "{} {}: {}m{}s", r, e, elapsed.as_secs() / 60, elapsed.as_secs() % 60)?;
}
msg
};

View File

@@ -388,3 +388,19 @@ pub(super) async fn get_remote_thumbnail(
self.write_str(&format!("```\n{result:#?}\nreceived {len} bytes for file content.\n```"))
.await
}
#[admin_command]
pub(super) async fn delete_url_preview(&self, url: Option<String>, all: bool) -> Result {
if all {
self.services.media.clear_url_previews().await;
return self.write_str("Deleted all cached URL previews.").await;
}
let url = url.expect("clap enforces url is required unless --all");
self.services.media.remove_url_preview(&url).await?;
self.write_str(&format!("Deleted cached URL preview for: {url}"))
.await
}

View File

@@ -108,4 +108,16 @@ pub enum MediaCommand {
#[arg(long, default_value("800"))]
height: u32,
},
/// Deletes a cached URL preview, forcing it to be re-fetched.
/// Use --all to purge all cached URL previews.
DeleteUrlPreview {
/// The URL to clear from the saved preview data
#[arg(required_unless_present = "all")]
url: Option<String>,
/// Purge all cached URL previews
#[arg(long, conflicts_with = "url")]
all: bool,
},
}

View File

@@ -209,7 +209,7 @@ pub(super) async fn compact(
let parallelism = parallelism.unwrap_or(1);
let results = maps
.into_iter()
.try_stream()
.try_stream::<conduwuit::Error>()
.paralleln_and_then(runtime, parallelism, move |map| {
map.compact_blocking(options.clone())?;
Ok(map.name().to_owned())

View File

@@ -20,7 +20,17 @@ pub enum ResolverCommand {
name: Option<String>,
},
/// Flush a specific server from the resolver caches or everything
/// Flush a given server from the resolver caches or flush them completely
///
/// * Examples:
/// * Flush a specific server:
///
/// `!admin query resolver flush-cache matrix.example.com`
///
/// * Flush all resolver caches completely:
///
/// `!admin query resolver flush-cache --all`
#[command(verbatim_doc_comment)]
FlushCache {
name: Option<OwnedServerName>,

View File

@@ -89,13 +89,7 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result {
locally, if not using get_alias_helper to fetch room ID remotely"
);
match self
.services
.rooms
.alias
.resolve_alias(room_alias, None)
.await
{
match self.services.rooms.alias.resolve_alias(room_alias).await {
| Ok((room_id, servers)) => {
debug!(
%room_id,
@@ -235,7 +229,7 @@ async fn ban_list_of_rooms(&self) -> Result {
.services
.rooms
.alias
.resolve_alias(room_alias, None)
.resolve_alias(room_alias)
.await
{
| Ok((room_id, servers)) => {
@@ -388,13 +382,7 @@ async fn unban_room(&self, room: OwnedRoomOrAliasId) -> Result {
room ID over federation"
);
match self
.services
.rooms
.alias
.resolve_alias(room_alias, None)
.await
{
match self.services.rooms.alias.resolve_alias(room_alias).await {
| Ok((room_id, servers)) => {
debug!(
%room_id,

View File

@@ -86,7 +86,7 @@ pub(super) async fn list_backups(&self) -> Result {
.db
.backup_list()?
.try_stream()
.try_for_each(|result| write!(self, "{result}"))
.try_for_each(|result| writeln!(self, "{result}"))
.await
}

View File

@@ -252,6 +252,13 @@ pub(crate) async fn register_route(
}
}
// Don't allow registration with user IDs that aren't local
if !services.globals.user_is_local(&user_id) {
return Err!(Request(InvalidUsername(
"Username {body_username} is not local to this server"
)));
}
user_id
},
| Err(e) => {

View File

@@ -1,12 +1,6 @@
use axum::extract::State;
use conduwuit::{Err, Result, debug};
use conduwuit_service::Services;
use futures::StreamExt;
use rand::seq::SliceRandom;
use ruma::{
OwnedServerName, RoomAliasId, RoomId,
api::client::alias::{create_alias, delete_alias, get_alias},
};
use conduwuit::{Err, Result};
use ruma::api::client::alias::{create_alias, delete_alias, get_alias};
use crate::Ruma;
@@ -96,65 +90,9 @@ pub(crate) async fn get_alias_route(
) -> Result<get_alias::v3::Response> {
let room_alias = body.body.room_alias;
let Ok((room_id, servers)) = services.rooms.alias.resolve_alias(&room_alias, None).await
else {
let Ok((room_id, servers)) = services.rooms.alias.resolve_alias(&room_alias).await else {
return Err!(Request(NotFound("Room with alias not found.")));
};
let servers = room_available_servers(&services, &room_id, &room_alias, servers).await;
debug!(%room_alias, %room_id, "available servers: {servers:?}");
Ok(get_alias::v3::Response::new(room_id, servers))
}
async fn room_available_servers(
services: &Services,
room_id: &RoomId,
room_alias: &RoomAliasId,
pre_servers: Vec<OwnedServerName>,
) -> Vec<OwnedServerName> {
// find active servers in room state cache to suggest
let mut servers: Vec<OwnedServerName> = services
.rooms
.state_cache
.room_servers(room_id)
.map(ToOwned::to_owned)
.collect()
.await;
// push any servers we want in the list already (e.g. responded remote alias
// servers, room alias server itself)
servers.extend(pre_servers);
servers.sort_unstable();
servers.dedup();
// shuffle list of servers randomly after sort and dedupe
servers.shuffle(&mut rand::thread_rng());
// insert our server as the very first choice if in list, else check if we can
// prefer the room alias server first
match servers
.iter()
.position(|server_name| services.globals.server_is_ours(server_name))
{
| Some(server_index) => {
servers.swap_remove(server_index);
servers.insert(0, services.globals.server_name().to_owned());
},
| _ => {
match servers
.iter()
.position(|server| server == room_alias.server_name())
{
| Some(alias_server_index) => {
servers.swap_remove(alias_server_index);
servers.insert(0, room_alias.server_name().into());
},
| _ => {},
}
},
}
servers
}

View File

@@ -6,6 +6,7 @@
Err, Result, err,
utils::{self, content_disposition::make_content_disposition, math::ruma_from_usize},
};
use conduwuit_core::error;
use conduwuit_service::{
Services,
media::{CACHE_CONTROL_IMMUTABLE, CORP_CROSS_ORIGIN, Dim, FileMeta, MXC_LENGTH},
@@ -144,12 +145,22 @@ pub(crate) async fn get_content_route(
server_name: &body.server_name,
media_id: &body.media_id,
};
let FileMeta {
content,
content_type,
content_disposition,
} = fetch_file(&services, &mxc, user, body.timeout_ms, None).await?;
} = match fetch_file(&services, &mxc, user, body.timeout_ms, None).await {
| Ok(meta) => meta,
| Err(conduwuit::Error::Io(e)) => match e.kind() {
| std::io::ErrorKind::NotFound => return Err!(Request(NotFound("Media not found."))),
| std::io::ErrorKind::PermissionDenied => {
error!("Permission denied when trying to read file: {e:?}");
return Err!(Request(Unknown("Unknown error when fetching file.")));
},
| _ => return Err!(Request(Unknown("Unknown error when fetching file."))),
},
| Err(_) => return Err!(Request(Unknown("Unknown error when fetching file."))),
};
Ok(get_content::v1::Response {
file: content.expect("entire file contents"),
@@ -185,7 +196,18 @@ pub(crate) async fn get_content_as_filename_route(
content,
content_type,
content_disposition,
} = fetch_file(&services, &mxc, user, body.timeout_ms, Some(&body.filename)).await?;
} = match fetch_file(&services, &mxc, user, body.timeout_ms, None).await {
| Ok(meta) => meta,
| Err(conduwuit::Error::Io(e)) => match e.kind() {
| std::io::ErrorKind::NotFound => return Err!(Request(NotFound("Media not found."))),
| std::io::ErrorKind::PermissionDenied => {
error!("Permission denied when trying to read file: {e:?}");
return Err!(Request(Unknown("Unknown error when fetching file.")));
},
| _ => return Err!(Request(Unknown("Unknown error when fetching file."))),
},
| Err(_) => return Err!(Request(Unknown("Unknown error when fetching file."))),
};
Ok(get_content_as_filename::v1::Response {
file: content.expect("entire file contents"),

View File

@@ -198,11 +198,7 @@ pub(crate) async fn join_room_by_id_or_alias_route(
(servers, room_id)
},
| Err(room_alias) => {
let (room_id, mut servers) = services
.rooms
.alias
.resolve_alias(&room_alias, Some(body.via.clone()))
.await?;
let (room_id, mut servers) = services.rooms.alias.resolve_alias(&room_alias).await?;
banned_room_check(
&services,

View File

@@ -102,11 +102,7 @@ pub(crate) async fn knock_room_route(
(servers, room_id)
},
| Err(room_alias) => {
let (room_id, mut servers) = services
.rooms
.alias
.resolve_alias(&room_alias, Some(body.via.clone()))
.await?;
let (room_id, mut servers) = services.rooms.alias.resolve_alias(&room_alias).await?;
banned_room_check(
&services,

View File

@@ -4,7 +4,6 @@
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Event, Result, debug_info, info, matrix::pdu::PduEvent, utils::ReadyExt};
use conduwuit_service::Services;
use rand::Rng;
use ruma::{
EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
api::client::{
@@ -244,7 +243,7 @@ fn build_report(report: Report) -> RoomMessageEventContent {
/// random delay sending a response per spec suggestion regarding
/// enumerating for potential events existing in our server.
async fn delay_response() {
let time_to_wait = rand::thread_rng().gen_range(2..5);
let time_to_wait = rand::random_range(2..5);
debug_info!(
"Got successful /report request, waiting {time_to_wait} seconds before sending \
successful response."

View File

@@ -50,8 +50,8 @@ pub(crate) async fn send_message_event_route(
// Check if this is a new transaction id
if let Ok(response) = services
.transaction_ids
.existing_txnid(sender_user, sender_device, &body.txn_id)
.transactions
.get_client_txn(sender_user, sender_device, &body.txn_id)
.await
{
// The client might have sent a txnid of the /sendToDevice endpoint
@@ -92,7 +92,7 @@ pub(crate) async fn send_message_event_route(
)
.await?;
services.transaction_ids.add_txnid(
services.transactions.add_client_txnid(
sender_user,
sender_device,
&body.txn_id,

View File

@@ -342,10 +342,10 @@ async fn allowed_to_send_state_event(
}
for alias in aliases {
let (alias_room_id, _servers) = services
let (alias_room_id, _) = services
.rooms
.alias
.resolve_alias(&alias, None)
.resolve_alias(&alias)
.await
.map_err(|e| {
err!(Request(Unknown("Failed resolving alias \"{alias}\": {e}")))

View File

@@ -26,8 +26,8 @@ pub(crate) async fn send_event_to_device_route(
// Check if this is a new transaction id
if services
.transaction_ids
.existing_txnid(sender_user, sender_device, &body.txn_id)
.transactions
.get_client_txn(sender_user, sender_device, &body.txn_id)
.await
.is_ok()
{
@@ -104,8 +104,8 @@ pub(crate) async fn send_event_to_device_route(
// Save transaction id with empty data
services
.transaction_ids
.add_txnid(sender_user, sender_device, &body.txn_id, &[]);
.transactions
.add_client_txnid(sender_user, sender_device, &body.txn_id, &[]);
Ok(send_event_to_device::v3::Response {})
}

View File

@@ -122,23 +122,23 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
// Ruma doesn't have support for multiple paths for a single endpoint yet, and these routes
// share one Ruma request / response type pair with {get,send}_state_event_for_key_route
.route(
"/_matrix/client/r0/rooms/:room_id/state/:event_type",
"/_matrix/client/r0/rooms/{room_id}/state/{event_type}",
get(client::get_state_events_for_empty_key_route)
.put(client::send_state_event_for_empty_key_route),
)
.route(
"/_matrix/client/v3/rooms/:room_id/state/:event_type",
"/_matrix/client/v3/rooms/{room_id}/state/{event_type}",
get(client::get_state_events_for_empty_key_route)
.put(client::send_state_event_for_empty_key_route),
)
// These two endpoints allow trailing slashes
.route(
"/_matrix/client/r0/rooms/:room_id/state/:event_type/",
"/_matrix/client/r0/rooms/{room_id}/state/{event_type}/",
get(client::get_state_events_for_empty_key_route)
.put(client::send_state_event_for_empty_key_route),
)
.route(
"/_matrix/client/v3/rooms/:room_id/state/:event_type/",
"/_matrix/client/v3/rooms/{room_id}/state/{event_type}/",
get(client::get_state_events_for_empty_key_route)
.put(client::send_state_event_for_empty_key_route),
)
@@ -177,7 +177,7 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
.ruma_route(&client::get_mutual_rooms_route)
.ruma_route(&client::get_room_summary)
.route(
"/_matrix/client/unstable/im.nheko.summary/rooms/:room_id_or_alias/summary",
"/_matrix/client/unstable/im.nheko.summary/rooms/{room_id_or_alias}/summary",
get(client::get_room_summary_legacy)
)
.ruma_route(&client::get_suspended_status)
@@ -196,7 +196,7 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
.ruma_route(&server::get_server_version_route)
.route("/_matrix/key/v2/server", get(server::get_server_keys_route))
.route(
"/_matrix/key/v2/server/:key_id",
"/_matrix/key/v2/server/{key_id}",
get(server::get_server_keys_deprecated_route),
)
.ruma_route(&server::get_public_rooms_route)
@@ -232,9 +232,9 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
.route("/_continuwuity/local_user_count", get(client::conduwuit_local_user_count));
} else {
router = router
.route("/_matrix/federation/*path", any(federation_disabled))
.route("/_matrix/federation/{*path}", any(federation_disabled))
.route("/.well-known/matrix/server", any(federation_disabled))
.route("/_matrix/key/*path", any(federation_disabled))
.route("/_matrix/key/{*path}", any(federation_disabled))
.route("/_conduwuit/local_user_count", any(federation_disabled))
.route("/_continuwuity/local_user_count", any(federation_disabled));
}
@@ -253,27 +253,27 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
get(client::get_media_preview_legacy_legacy_route),
)
.route(
"/_matrix/media/v1/download/:server_name/:media_id",
"/_matrix/media/v1/download/{server_name}/{media_id}",
get(client::get_content_legacy_legacy_route),
)
.route(
"/_matrix/media/v1/download/:server_name/:media_id/:file_name",
"/_matrix/media/v1/download/{server_name}/{media_id}/{file_name}",
get(client::get_content_as_filename_legacy_legacy_route),
)
.route(
"/_matrix/media/v1/thumbnail/:server_name/:media_id",
"/_matrix/media/v1/thumbnail/{server_name}/{media_id}",
get(client::get_content_thumbnail_legacy_legacy_route),
);
} else {
router = router
.route("/_matrix/media/v1/*path", any(legacy_media_disabled))
.route("/_matrix/media/v1/{*path}", any(legacy_media_disabled))
.route("/_matrix/media/v3/config", any(legacy_media_disabled))
.route("/_matrix/media/v3/download/*path", any(legacy_media_disabled))
.route("/_matrix/media/v3/thumbnail/*path", any(legacy_media_disabled))
.route("/_matrix/media/v3/download/{*path}", any(legacy_media_disabled))
.route("/_matrix/media/v3/thumbnail/{*path}", any(legacy_media_disabled))
.route("/_matrix/media/v3/preview_url", any(redirect_legacy_preview))
.route("/_matrix/media/r0/config", any(legacy_media_disabled))
.route("/_matrix/media/r0/download/*path", any(legacy_media_disabled))
.route("/_matrix/media/r0/thumbnail/*path", any(legacy_media_disabled))
.route("/_matrix/media/r0/download/{*path}", any(legacy_media_disabled))
.route("/_matrix/media/r0/thumbnail/{*path}", any(legacy_media_disabled))
.route("/_matrix/media/r0/preview_url", any(redirect_legacy_preview));
}

View File

@@ -1,6 +1,5 @@
use std::{mem, ops::Deref};
use async_trait::async_trait;
use axum::{body::Body, extract::FromRequest};
use bytes::{BufMut, Bytes, BytesMut};
use conduwuit::{Error, Result, debug, debug_warn, err, trace, utils::string::EMPTY};
@@ -79,7 +78,6 @@ impl<T> Deref for Args<T>
fn deref(&self) -> &Self::Target { &self.body }
}
#[async_trait]
impl<T> FromRequest<State, Body> for Args<T>
where
T: IncomingRequest + Send + Sync + 'static,

View File

@@ -14,7 +14,8 @@
pin_mut,
};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedDeviceId, OwnedServerName, OwnedUserId, UserId,
CanonicalJsonObject, CanonicalJsonValue, DeviceId, OwnedDeviceId, OwnedServerName,
OwnedUserId, UserId,
api::{
AuthScheme, IncomingRequest, Metadata,
client::{
@@ -54,7 +55,8 @@ pub(super) async fn auth(
json_body: Option<&CanonicalJsonValue>,
metadata: &Metadata,
) -> Result<Auth> {
let bearer: Option<TypedHeader<Authorization<Bearer>>> = request.parts.extract().await?;
let bearer: Option<TypedHeader<Authorization<Bearer>>> =
request.parts.extract().await.unwrap_or(None);
let token = match &bearer {
| Some(TypedHeader(Authorization(bearer))) => Some(bearer.token()),
| None => request.query.access_token.as_deref(),
@@ -233,10 +235,33 @@ async fn auth_appservice(
return Err!(Request(Exclusive("User is not in namespace.")));
}
// MSC3202/MSC4190: Handle device_id masquerading for appservices.
// The device_id can be provided via `device_id` or
// `org.matrix.msc3202.device_id` query parameter.
let sender_device = if let Some(ref device_id_str) = request.query.device_id {
let device_id: &DeviceId = device_id_str.as_str().into();
// Verify the device exists for this user
if services
.users
.get_device_metadata(&user_id, device_id)
.await
.is_err()
{
return Err!(Request(Forbidden(
"Device does not exist for user or appservice cannot masquerade as this device."
)));
}
Some(device_id.to_owned())
} else {
None
};
Ok(Auth {
origin: None,
sender_user: Some(user_id),
sender_device: None,
sender_device,
appservice_info: Some(*info),
})
}

View File

@@ -11,6 +11,10 @@
pub(super) struct QueryParams {
pub(super) access_token: Option<String>,
pub(super) user_id: Option<String>,
/// Device ID for appservice device masquerading (MSC3202/MSC4190).
/// Can be provided as `device_id` or `org.matrix.msc3202.device_id`.
#[serde(alias = "org.matrix.msc3202.device_id")]
pub(super) device_id: Option<String>,
}
pub(super) struct Request {

View File

@@ -40,7 +40,7 @@ pub(crate) async fn get_room_information_route(
servers.sort_unstable();
servers.dedup();
servers.shuffle(&mut rand::thread_rng());
servers.shuffle(&mut rand::rng());
// insert our server as the very first choice if in list
if let Some(server_index) = servers

View File

@@ -1,27 +1,33 @@
use std::{collections::BTreeMap, net::IpAddr, time::Instant};
use std::{
collections::{BTreeMap, HashMap, HashSet},
net::IpAddr,
time::{Duration, Instant},
};
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Error, Result, debug, debug_warn, err, error,
result::LogErr,
state_res::lexicographical_topological_sort,
trace,
utils::{
IterStream, ReadyExt, millis_since_unix_epoch,
stream::{BroadbandExt, TryBroadbandExt, automatic_width},
},
warn,
};
use conduwuit_service::{
Services,
sending::{EDU_LIMIT, PDU_LIMIT},
};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use http::StatusCode;
use itertools::Itertools;
use ruma::{
CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId,
CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId,
RoomId, ServerName, UserId,
api::{
client::error::ErrorKind,
client::error::{ErrorKind, ErrorKind::LimitExceeded},
federation::transactions::{
edu::{
DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent,
@@ -32,9 +38,16 @@
},
},
events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
int,
serde::Raw,
to_device::DeviceIdOrAllDevices,
uint,
};
use service::transactions::{
FederationTxnState, TransactionError, TxnKey, WrappedTransactionResponse,
};
use tokio::sync::watch::{Receiver, Sender};
use tracing::instrument;
use crate::Ruma;
@@ -44,15 +57,6 @@
/// # `PUT /_matrix/federation/v1/send/{txnId}`
///
/// Push EDUs and PDUs to this server.
#[tracing::instrument(
name = "txn",
level = "debug",
skip_all,
fields(
%client,
origin = body.origin().as_str()
),
)]
pub(crate) async fn send_transaction_message_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
@@ -76,16 +80,73 @@ pub(crate) async fn send_transaction_message_route(
)));
}
let txn_start_time = Instant::now();
trace!(
pdus = body.pdus.len(),
edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = %body.transaction_id,
origin = %body.origin(),
"Starting txn",
);
let txn_key = (body.origin().to_owned(), body.transaction_id.clone());
// Atomically check cache, join active, or start new transaction
match services
.transactions
.get_or_start_federation_txn(txn_key.clone())?
{
| FederationTxnState::Cached(response) => {
// Already responded
Ok(response)
},
| FederationTxnState::Active(receiver) => {
// Another thread is processing
wait_for_result(receiver).await
},
| FederationTxnState::Started { receiver, sender } => {
// We're the first, spawn the processing task
services
.server
.runtime()
.spawn(process_inbound_transaction(services, body, client, txn_key, sender));
// and wait for it
wait_for_result(receiver).await
},
}
}
async fn wait_for_result(
mut recv: Receiver<WrappedTransactionResponse>,
) -> Result<send_transaction_message::v1::Response> {
if tokio::time::timeout(Duration::from_secs(50), recv.changed())
.await
.is_err()
{
// Took too long, return 429 to encourage the sender to try again
return Err(Error::BadRequest(
LimitExceeded { retry_after: None },
"Transaction is being still being processed. Please try again later.",
));
}
let value = recv.borrow_and_update();
match value.clone() {
| Some(Ok(response)) => Ok(response),
| Some(Err(err)) => Err(transaction_error_to_response(&err)),
| None => Err(Error::Request(
ErrorKind::Unknown,
"Transaction processing failed unexpectedly".into(),
StatusCode::INTERNAL_SERVER_ERROR,
)),
}
}
#[instrument(
skip_all,
fields(
id = ?body.transaction_id.as_str(),
origin = ?body.origin()
)
)]
async fn process_inbound_transaction(
services: crate::State,
body: Ruma<send_transaction_message::v1::Request>,
client: IpAddr,
txn_key: TxnKey,
sender: Sender<WrappedTransactionResponse>,
) {
let txn_start_time = Instant::now();
let pdus = body
.pdus
.iter()
@@ -102,40 +163,79 @@ pub(crate) async fn send_transaction_message_route(
.filter_map(Result::ok)
.stream();
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?;
debug!(pdus = body.pdus.len(), edus = body.edus.len(), "Processing transaction",);
let results = match handle(&services, &client, body.origin(), pdus, edus).await {
| Ok(results) => results,
| Err(err) => {
fail_federation_txn(services, &txn_key, &sender, err);
return;
},
};
for (id, result) in &results {
if let Err(e) = result {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
debug_warn!("Incoming PDU failed {id}: {e:?}");
}
}
}
debug!(
pdus = body.pdus.len(),
edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = %body.transaction_id,
origin = %body.origin(),
"Finished txn",
"Finished processing transaction"
);
for (id, result) in &results {
if let Err(e) = result {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
warn!("Incoming PDU failed {id}: {e:?}");
}
}
}
Ok(send_transaction_message::v1::Response {
let response = send_transaction_message::v1::Response {
pdus: results
.into_iter()
.map(|(e, r)| (e, r.map_err(error::sanitized_message)))
.collect(),
})
};
services
.transactions
.finish_federation_txn(txn_key, sender, response);
}
/// Handles a failed federation transaction by sending the error through
/// the channel and cleaning up the transaction state. This allows waiters to
/// receive an appropriate error response.
fn fail_federation_txn(
services: crate::State,
txn_key: &TxnKey,
sender: &Sender<WrappedTransactionResponse>,
err: TransactionError,
) {
debug!("Transaction failed: {err}");
// Remove from active state so the transaction can be retried
services.transactions.remove_federation_txn(txn_key);
// Send the error to any waiters
if let Err(e) = sender.send(Some(Err(err))) {
debug_warn!("Failed to send transaction error to receivers: {e}");
}
}
/// Converts a TransactionError into an appropriate HTTP error response.
fn transaction_error_to_response(err: &TransactionError) -> Error {
match err {
| TransactionError::ShuttingDown => Error::Request(
ErrorKind::Unknown,
"Server is shutting down, please retry later".into(),
StatusCode::SERVICE_UNAVAILABLE,
),
}
}
async fn handle(
services: &Services,
client: &IpAddr,
origin: &ServerName,
started: Instant,
pdus: impl Stream<Item = Pdu> + Send,
edus: impl Stream<Item = Edu> + Send,
) -> Result<ResolvedMap> {
) -> std::result::Result<ResolvedMap, TransactionError> {
// group pdus by room
let pdus = pdus
.collect()
@@ -152,7 +252,7 @@ async fn handle(
.into_iter()
.try_stream()
.broad_and_then(|(room_id, pdus): (_, Vec<_>)| {
handle_room(services, client, origin, started, room_id, pdus.into_iter())
handle_room(services, client, origin, room_id, pdus.into_iter())
.map_ok(Vec::into_iter)
.map_ok(IterStream::try_stream)
})
@@ -169,14 +269,51 @@ async fn handle(
Ok(results)
}
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
/// returning them in a topologically sorted order.
///
/// This is used to attempt to process PDUs in an order that respects their
/// dependencies, however it is ultimately the sender's responsibility to send
/// them in a processable order, so this is just a best effort attempt. It does
/// not account for power levels or other tie breaks.
async fn build_local_dag(
pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject>,
) -> Result<Vec<OwnedEventId>> {
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> = HashMap::new();
for (event_id, value) in pdu_map {
let prev_events = value
.get("prev_events")
.expect("pdu must have prev_events")
.as_array()
.expect("prev_events must be an array")
.iter()
.map(|v| {
OwnedEventId::parse(v.as_str().expect("prev_events values must be strings"))
.expect("prev_events must be valid event IDs")
})
.collect::<HashSet<OwnedEventId>>();
dag.insert(event_id.clone(), prev_events);
}
lexicographical_topological_sort(&dag, &|_| async {
// Note: we don't bother fetching power levels because that would massively slow
// this function down. This is a best-effort attempt to order events correctly
// for processing, however ultimately that should be the sender's job.
Ok((int!(0), MilliSecondsSinceUnixEpoch(uint!(0))))
})
.await
.map_err(|e| err!("failed to resolve local graph: {e}"))
}
async fn handle_room(
services: &Services,
_client: &IpAddr,
origin: &ServerName,
txn_start_time: Instant,
room_id: OwnedRoomId,
pdus: impl Iterator<Item = Pdu> + Send,
) -> Result<Vec<(OwnedEventId, Result)>> {
) -> std::result::Result<Vec<(OwnedEventId, Result)>, TransactionError> {
let _room_lock = services
.rooms
.event_handler
@@ -185,27 +322,40 @@ async fn handle_room(
.await;
let room_id = &room_id;
pdus.try_stream()
.and_then(|(_, event_id, value)| async move {
services.server.check_running()?;
let pdu_start_time = Instant::now();
let result = services
.rooms
.event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
.await
.map(|_| ());
debug!(
pdu_elapsed = ?pdu_start_time.elapsed(),
txn_elapsed = ?txn_start_time.elapsed(),
"Finished PDU {event_id}",
);
Ok((event_id, result))
let pdu_map: HashMap<OwnedEventId, CanonicalJsonObject> = pdus
.into_iter()
.map(|(_, event_id, value)| (event_id, value))
.collect();
// Try to sort PDUs by their dependencies, but fall back to arbitrary order on
// failure (e.g., cycles). This is best-effort; proper ordering is the sender's
// responsibility.
let sorted_event_ids = if pdu_map.len() >= 2 {
build_local_dag(&pdu_map).await.unwrap_or_else(|e| {
debug_warn!("Failed to build local DAG for room {room_id}: {e}");
pdu_map.keys().cloned().collect()
})
.try_collect()
.await
} else {
pdu_map.keys().cloned().collect()
};
let mut results = Vec::with_capacity(sorted_event_ids.len());
for event_id in sorted_event_ids {
let value = pdu_map
.get(&event_id)
.expect("sorted event IDs must be from the original map")
.clone();
services
.server
.check_running()
.map_err(|_| TransactionError::ShuttingDown)?;
let result = services
.rooms
.event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
.await
.map(|_| ());
results.push((event_id, result));
}
Ok(results)
}
async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, edu: Edu) {
@@ -478,8 +628,8 @@ async fn handle_edu_direct_to_device(
// Check if this is a new transaction id
if services
.transaction_ids
.existing_txnid(sender, None, message_id)
.transactions
.get_client_txn(sender, None, message_id)
.await
.is_ok()
{
@@ -498,8 +648,8 @@ async fn handle_edu_direct_to_device(
// Save transaction id with empty data
services
.transaction_ids
.add_txnid(sender, None, message_id, &[]);
.transactions
.add_client_txnid(sender, None, message_id, &[]);
}
async fn handle_edu_direct_to_device_user<Event: Send + Sync>(

View File

@@ -86,6 +86,7 @@ libloading.optional = true
log.workspace = true
num-traits.workspace = true
rand.workspace = true
rand_core = { version = "0.6.4", features = ["getrandom"] }
regex.workspace = true
reqwest.workspace = true
ring.workspace = true

View File

@@ -368,6 +368,31 @@ pub struct Config {
#[serde(default = "default_max_fetch_prev_events")]
pub max_fetch_prev_events: u16,
/// How many incoming federation transactions the server is willing to be
/// processing at any given time before it becomes overloaded and starts
/// rejecting further transactions until some slots become available.
///
/// Setting this value too low or too high may result in unstable
/// federation, and setting it too high may cause runaway resource usage.
///
/// default: 150
#[serde(default = "default_max_concurrent_inbound_transactions")]
pub max_concurrent_inbound_transactions: usize,
/// Maximum age (in seconds) for cached federation transaction responses.
/// Entries older than this will be removed during cleanup.
///
/// default: 7200 (2 hours)
#[serde(default = "default_transaction_id_cache_max_age_secs")]
pub transaction_id_cache_max_age_secs: u64,
/// Maximum number of cached federation transaction responses.
/// When the cache exceeds this limit, older entries will be removed.
///
/// default: 8192
#[serde(default = "default_transaction_id_cache_max_entries")]
pub transaction_id_cache_max_entries: usize,
/// Default/base connection timeout (seconds). This is used only by URL
/// previews and update/news endpoint checks.
///
@@ -1244,12 +1269,6 @@ pub struct Config {
#[serde(default)]
pub rocksdb_repair: bool,
#[serde(default)]
pub rocksdb_read_only: bool,
#[serde(default)]
pub rocksdb_secondary: bool,
/// Enables idle CPU priority for compaction thread. This is not enabled by
/// default to prevent compaction from falling too far behind on busy
/// systems.
@@ -1309,26 +1328,33 @@ pub struct Config {
/// Allow local (your server only) presence updates/requests.
///
/// Note that presence on continuwuity is very fast unlike Synapse's. If
/// using outgoing presence, this MUST be enabled.
/// Local presence must be enabled for outgoing presence to function.
///
/// Note that local presence is not as heavy on the CPU as federated
/// presence, but will still become more expensive the more local users you
/// have.
#[serde(default = "true_fn")]
pub allow_local_presence: bool,
/// Allow incoming federated presence updates/requests.
/// Allow incoming federated presence updates.
///
/// This option receives presence updates from other servers, but does not
/// send any unless `allow_outgoing_presence` is true. Note that presence on
/// continuwuity is very fast unlike Synapse's.
/// This option enables processing inbound presence updates from other
/// servers. Without it, remote users will appear as if they are always
/// offline to your local users. This does not affect typing indicators or
/// read receipts.
#[serde(default = "true_fn")]
pub allow_incoming_presence: bool,
/// Allow outgoing presence updates/requests.
///
/// This option sends presence updates to other servers, but does not
/// receive any unless `allow_incoming_presence` is true. Note that presence
/// on continuwuity is very fast unlike Synapse's. If using outgoing
/// presence, you MUST enable `allow_local_presence` as well.
#[serde(default = "true_fn")]
/// This option sends presence updates to other servers, and requires that
/// `allow_local_presence` is also enabled.
///
/// Note that outgoing presence is very heavy on the CPU and network, and
/// will typically cause extreme strain and slowdowns for no real benefit.
/// There are only a few clients that even implement presence, so you
/// probably don't want to enable this.
#[serde(default)]
pub allow_outgoing_presence: bool,
/// How many seconds without presence updates before you become idle.
@@ -1366,6 +1392,10 @@ pub struct Config {
pub allow_incoming_read_receipts: bool,
/// Allow sending read receipts to remote servers.
///
/// Note that sending read receipts to remote servers in large rooms with
/// lots of other homeservers may cause additional strain on the CPU and
/// network.
#[serde(default = "true_fn")]
pub allow_outgoing_read_receipts: bool,
@@ -1377,6 +1407,10 @@ pub struct Config {
pub allow_local_typing: bool,
/// Allow outgoing typing updates to federation.
///
/// Note that sending typing indicators to remote servers in large rooms
/// with lots of other homeservers may cause additional strain on the CPU
/// and network.
#[serde(default = "true_fn")]
pub allow_outgoing_typing: bool,
@@ -1516,7 +1550,7 @@ pub struct Config {
/// sender user's server name, inbound federation X-Matrix origin, and
/// outbound federation handler.
///
/// You can set this to ["*"] to block all servers by default, and then
/// You can set this to [".*"] to block all servers by default, and then
/// use `allowed_remote_server_names` to allow only specific servers.
///
/// example: ["badserver\\.tld$", "badphrase", "19dollarfortnitecards"]
@@ -2531,6 +2565,12 @@ fn default_pusher_idle_timeout() -> u64 { 15 }
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
fn default_max_concurrent_inbound_transactions() -> usize { 150 }
fn default_transaction_id_cache_max_age_secs() -> u64 { 60 * 60 * 2 }
fn default_transaction_id_cache_max_entries() -> usize { 8192 }
fn default_tracing_flame_filter() -> String {
cfg!(debug_assertions)
.then_some("trace,h2=off")

View File

@@ -14,6 +14,7 @@
static VERSION: OnceLock<String> = OnceLock::new();
static VERSION_UA: OnceLock<String> = OnceLock::new();
static USER_AGENT: OnceLock<String> = OnceLock::new();
static USER_AGENT_MEDIA: OnceLock<String> = OnceLock::new();
#[inline]
#[must_use]
@@ -21,14 +22,22 @@ pub fn name() -> &'static str { BRANDING }
#[inline]
pub fn version() -> &'static str { VERSION.get_or_init(init_version) }
#[inline]
pub fn version_ua() -> &'static str { VERSION_UA.get_or_init(init_version_ua) }
#[inline]
pub fn user_agent() -> &'static str { USER_AGENT.get_or_init(init_user_agent) }
#[inline]
pub fn user_agent_media() -> &'static str { USER_AGENT_MEDIA.get_or_init(init_user_agent_media) }
fn init_user_agent() -> String { format!("{}/{} (bot; +{WEBSITE})", name(), version_ua()) }
fn init_user_agent_media() -> String {
format!("{}/{} (embedbot; facebookexternalhit/1.1; +{WEBSITE})", name(), version_ua())
}
fn init_version_ua() -> String {
conduwuit_build_metadata::version_tag()
.map_or_else(|| SEMANTIC.to_owned(), |extra| format!("{SEMANTIC}+{extra}"))

View File

@@ -1046,7 +1046,7 @@ async fn test_event_sort() {
// don't remove any events so we know it sorts them all correctly
let mut events_to_sort = events.keys().cloned().collect::<Vec<_>>();
events_to_sort.shuffle(&mut rand::thread_rng());
events_to_sort.shuffle(&mut rand::rng());
let power_level = resolved_power
.get(&(StateEventType::RoomPowerLevels, "".into()))

View File

@@ -28,7 +28,7 @@ fn init_argon() -> Argon2<'static> {
}
pub(super) fn password(password: &str) -> Result<String> {
let salt = SaltString::generate(rand::thread_rng());
let salt = SaltString::generate(rand_core::OsRng);
ARGON
.get_or_init(init_argon)
.hash_password(password.as_bytes(), &salt)

View File

@@ -4,16 +4,16 @@
};
use arrayvec::ArrayString;
use rand::{Rng, seq::SliceRandom, thread_rng};
use rand::{RngExt, seq::SliceRandom};
pub fn shuffle<T>(vec: &mut [T]) {
let mut rng = thread_rng();
let mut rng = rand::rng();
vec.shuffle(&mut rng);
}
pub fn string(length: usize) -> String {
thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
rand::rng()
.sample_iter(&rand::distr::Alphanumeric)
.take(length)
.map(char::from)
.collect()
@@ -22,8 +22,8 @@ pub fn string(length: usize) -> String {
#[inline]
pub fn string_array<const LENGTH: usize>() -> ArrayString<LENGTH> {
let mut ret = ArrayString::<LENGTH>::new();
thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
rand::rng()
.sample_iter(&rand::distr::Alphanumeric)
.take(LENGTH)
.map(char::from)
.for_each(|c| ret.push(c));
@@ -40,7 +40,4 @@ pub fn time_from_now_secs(range: Range<u64>) -> SystemTime {
}
#[must_use]
pub fn secs(range: Range<u64>) -> Duration {
let mut rng = thread_rng();
Duration::from_secs(rng.gen_range(range))
}
pub fn secs(range: Range<u64>) -> Duration { Duration::from_secs(rand::random_range(range)) }

View File

@@ -3,19 +3,17 @@
stream::{Stream, TryStream},
};
use crate::{Error, Result};
pub trait IterStream<I: IntoIterator + Send> {
/// Convert an Iterator into a Stream
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send;
/// Convert an Iterator into a TryStream
fn try_stream(
/// Convert an Iterator into a TryStream with a generic error type
fn try_stream<E>(
self,
) -> impl TryStream<
Ok = <I as IntoIterator>::Item,
Error = Error,
Item = Result<<I as IntoIterator>::Item, Error>,
Error = E,
Item = Result<<I as IntoIterator>::Item, E>,
> + Send;
}
@@ -28,12 +26,12 @@ impl<I> IterStream<I> for I
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send { stream::iter(self) }
#[inline]
fn try_stream(
fn try_stream<E>(
self,
) -> impl TryStream<
Ok = <I as IntoIterator>::Item,
Error = Error,
Item = Result<<I as IntoIterator>::Item, Error>,
Error = E,
Item = Result<<I as IntoIterator>::Item, E>,
> + Send {
self.stream().map(Ok)
}

View File

@@ -1,9 +1,10 @@
//! Synchronous combinator extensions to futures::TryStream
use std::result::Result;
use futures::{TryFuture, TryStream, TryStreamExt};
use super::automatic_width;
use crate::Result;
/// Concurrency extensions to augment futures::TryStreamExt. broad_ combinators
/// produce out-of-order

View File

@@ -33,8 +33,6 @@ pub struct Engine {
pub(crate) db: Db,
pub(crate) pool: Arc<Pool>,
pub(crate) ctx: Arc<Context>,
pub(super) read_only: bool,
pub(super) secondary: bool,
pub(crate) checksums: bool,
corks: AtomicU32,
}
@@ -129,14 +127,6 @@ pub fn current_sequence(&self) -> u64 {
sequence
}
#[inline]
#[must_use]
pub fn is_read_only(&self) -> bool { self.secondary || self.read_only }
#[inline]
#[must_use]
pub fn is_secondary(&self) -> bool { self.secondary }
}
impl Drop for Engine {

View File

@@ -12,9 +12,8 @@ pub fn backup(&self) -> Result {
let mut engine = self.backup_engine()?;
let config = &self.ctx.server.config;
if config.database_backups_to_keep > 0 {
let flush = !self.is_read_only();
engine
.create_new_backup_flush(&self.db, flush)
.create_new_backup_flush(&self.db, true)
.map_err(map_err)?;
let engine_info = engine.get_backup_info();

View File

@@ -35,14 +35,7 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
}
debug!("Opening database...");
let db = if config.rocksdb_read_only {
Db::open_cf_descriptors_read_only(&db_opts, path, cfds, false)
} else if config.rocksdb_secondary {
Db::open_cf_descriptors_as_secondary(&db_opts, path, path, cfds)
} else {
Db::open_cf_descriptors(&db_opts, path, cfds)
}
.or_else(or_else)?;
let db = Db::open_cf_descriptors(&db_opts, path, cfds).or_else(or_else)?;
info!(
columns = num_cfds,
@@ -55,8 +48,6 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
db,
pool: ctx.pool.clone(),
ctx: ctx.clone(),
read_only: config.rocksdb_read_only,
secondary: config.rocksdb_secondary,
checksums: config.rocksdb_checksums,
corks: AtomicU32::new(0),
}))

View File

@@ -74,14 +74,6 @@ pub fn iter(&self) -> impl Iterator<Item = (&MapsKey, &MapsVal)> + Send + '_ {
#[inline]
pub fn keys(&self) -> impl Iterator<Item = &MapsKey> + Send + '_ { self.maps.keys() }
#[inline]
#[must_use]
pub fn is_read_only(&self) -> bool { self.db.is_read_only() }
#[inline]
#[must_use]
pub fn is_secondary(&self) -> bool { self.db.is_secondary() }
}
impl Index<&str> for Database {

View File

@@ -27,10 +27,6 @@ pub struct Args {
#[arg(long, short('O'))]
pub option: Vec<String>,
/// Run in a stricter read-only --maintenance mode.
#[arg(long)]
pub read_only: bool,
/// Run in maintenance mode while refusing connections.
#[arg(long)]
pub maintenance: bool,
@@ -143,11 +139,7 @@ pub(crate) fn parse() -> Args { Args::parse() }
/// Synthesize any command line options with configuration file options.
pub(crate) fn update(mut config: Figment, args: &Args) -> Result<Figment> {
if args.read_only {
config = config.join(("rocksdb_read_only", true));
}
if args.maintenance || args.read_only {
if args.maintenance {
config = config.join(("startup_netburst", false));
config = config.join(("listening", false));
}

View File

@@ -20,7 +20,6 @@
use async_trait::async_trait;
use conduwuit::{Result, Server, debug, error, warn};
use database::{Deserialized, Map};
use rand::Rng;
use ruma::events::{Mentions, room::message::RoomMessageEventContent};
use serde::Deserialize;
use tokio::{
@@ -100,8 +99,7 @@ async fn worker(self: Arc<Self>) -> Result<()> {
}
let first_check_jitter = {
let mut rng = rand::thread_rng();
let jitter_percent = rng.gen_range(-50.0..=10.0);
let jitter_percent = rand::random_range(-50.0..=10.0);
self.interval.mul_f64(1.0 + jitter_percent / 100.0)
};

View File

@@ -39,7 +39,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let url_preview_user_agent = config
.url_preview_user_agent
.clone()
.unwrap_or_else(|| conduwuit::version::user_agent().to_owned());
.unwrap_or_else(|| conduwuit::version::user_agent_media().to_owned());
Ok(Arc::new(Self {
default: base(config)?

View File

@@ -37,10 +37,6 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
}
async fn worker(self: Arc<Self>) -> Result {
if self.services.globals.is_read_only() {
return Ok(());
}
if self.services.config.ldap.enable {
warn!("emergency password feature not available with LDAP enabled.");
return Ok(());

View File

@@ -156,7 +156,4 @@ pub fn user_is_local(&self, user_id: &UserId) -> bool {
pub fn server_is_ours(&self, server_name: &ServerName) -> bool {
server_name == self.server_name()
}
#[inline]
pub fn is_read_only(&self) -> bool { self.db.db.is_read_only() }
}

View File

@@ -170,6 +170,8 @@ pub(super) fn remove_url_preview(&self, url: &str) -> Result<()> {
Ok(())
}
pub(super) async fn clear_url_previews(&self) { self.url_previews.clear().await; }
pub(super) fn set_url_preview(
&self,
url: &str,

View File

@@ -37,6 +37,9 @@ pub async fn remove_url_preview(&self, url: &str) -> Result<()> {
self.db.remove_url_preview(url)
}
#[implement(Service)]
pub async fn clear_url_previews(&self) { self.db.clear_url_previews().await; }
#[implement(Service)]
pub async fn set_url_preview(&self, url: &str, data: &UrlPreviewData) -> Result<()> {
let now = SystemTime::now()

View File

@@ -31,7 +31,7 @@
pub mod sending;
pub mod server_keys;
pub mod sync;
pub mod transaction_ids;
pub mod transactions;
pub mod uiaa;
pub mod users;

View File

@@ -3,7 +3,7 @@
use std::sync::Arc;
use conduwuit::{
Err, Event, Result, Server, err,
Err, Event, Result, err,
utils::{ReadyExt, stream::TryIgnore},
};
use database::{Deserialized, Ignore, Interfix, Map};
@@ -30,12 +30,12 @@ struct Data {
}
struct Services {
server: Arc<Server>,
admin: Dep<admin::Service>,
appservice: Dep<appservice::Service>,
globals: Dep<globals::Service>,
sending: Dep<sending::Service>,
state_accessor: Dep<rooms::state_accessor::Service>,
state_cache: Dep<rooms::state_cache::Service>,
}
impl crate::Service for Service {
@@ -47,13 +47,13 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
aliasid_alias: args.db["aliasid_alias"].clone(),
},
services: Services {
server: args.server.clone(),
admin: args.depend::<admin::Service>("admin"),
appservice: args.depend::<appservice::Service>("appservice"),
globals: args.depend::<globals::Service>("globals"),
sending: args.depend::<sending::Service>("sending"),
state_accessor: args
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
},
}))
}
@@ -117,6 +117,9 @@ pub async fn remove_alias(&self, alias: &RoomAliasId, user_id: &UserId) -> Resul
Ok(())
}
/// Resolves the given room ID or alias, returning the resolved room ID.
/// Unlike resolve_with_servers (the underlying call), potential resident
/// servers are not returned
#[inline]
pub async fn resolve(&self, room: &RoomOrAliasId) -> Result<OwnedRoomId> {
self.resolve_with_servers(room, None)
@@ -124,6 +127,14 @@ pub async fn resolve(&self, room: &RoomOrAliasId) -> Result<OwnedRoomId> {
.map(|(room_id, _)| room_id)
}
/// Resolves the given room ID or alias, returning the resolved room ID, and
/// any servers that might be able to assist in fetching room data.
///
/// If the input is a room ID, this simply returns it and <servers>.
/// If the input is an alias, this attempts to resolve it locally, then via
/// appservices, and finally remotely if the alias is not local.
/// If the alias is successfully resolved, the room ID and an empty list of
/// servers is returned.
pub async fn resolve_with_servers(
&self,
room: &RoomOrAliasId,
@@ -134,28 +145,26 @@ pub async fn resolve_with_servers(
Ok((room_id.to_owned(), servers.unwrap_or_default()))
} else {
let alias: &RoomAliasId = room.try_into().expect("valid RoomAliasId");
self.resolve_alias(alias, servers).await
self.resolve_alias(alias).await
}
}
/// Resolves the given room alias, returning the resolved room ID and any
/// servers that might be in the room.
#[tracing::instrument(skip(self), name = "resolve")]
pub async fn resolve_alias(
&self,
room_alias: &RoomAliasId,
servers: Option<Vec<OwnedServerName>>,
) -> Result<(OwnedRoomId, Vec<OwnedServerName>)> {
let server_name = room_alias.server_name();
let server_is_ours = self.services.globals.server_is_ours(server_name);
let servers_contains_ours = || {
servers
.as_ref()
.is_some_and(|servers| servers.contains(&self.services.server.name))
};
let server_is_ours = self
.services
.globals
.server_is_ours(room_alias.server_name());
if !server_is_ours && !servers_contains_ours() {
return self
.remote_resolve(room_alias, servers.unwrap_or_default())
.await;
if !server_is_ours {
// TODO: The spec advises servers may cache remote room aliases temporarily.
// We might want to look at doing that.
return self.remote_resolve(room_alias).await;
}
let room_id = match self.resolve_local_alias(room_alias).await {
@@ -163,10 +172,18 @@ pub async fn resolve_alias(
| Err(_) => self.resolve_appservice_alias(room_alias).await?,
};
room_id.map_or_else(
|| Err!(Request(NotFound("Room with alias not found."))),
|room_id| Ok((room_id, Vec::new())),
)
if let Some(room_id) = room_id {
let servers: Vec<OwnedServerName> = self
.services
.state_cache
.room_servers(&room_id)
.map(ToOwned::to_owned)
.collect()
.await;
return Ok((room_id, servers));
}
Err!(Request(NotFound("Alias does not exist.")))
}
#[tracing::instrument(skip(self), level = "debug")]
@@ -206,12 +223,12 @@ async fn user_can_remove_alias(&self, alias: &RoomAliasId, user_id: &UserId) ->
// The creator of an alias can remove it
if self
.who_created_alias(alias).await
.is_ok_and(|user| user == user_id)
// Server admins can remove any local alias
|| self.services.admin.user_is_admin(user_id).await
// Always allow the server service account to remove the alias, since there may not be an admin room
|| server_user == user_id
.who_created_alias(alias).await
.is_ok_and(|user| user == user_id)
// Server admins can remove any local alias
|| self.services.admin.user_is_admin(user_id).await
// Always allow the server service account to remove the alias, since there may not be an admin room
|| server_user == user_id
{
return Ok(true);
}

View File

@@ -1,6 +1,4 @@
use std::iter::once;
use conduwuit::{Result, debug, debug_error, err, implement};
use conduwuit::{Result, debug, error, implement};
use federation::query::get_room_information::v1::Response;
use ruma::{OwnedRoomId, OwnedServerName, RoomAliasId, ServerName, api::federation};
@@ -8,40 +6,21 @@
pub(super) async fn remote_resolve(
&self,
room_alias: &RoomAliasId,
servers: Vec<OwnedServerName>,
) -> Result<(OwnedRoomId, Vec<OwnedServerName>)> {
debug!(?room_alias, servers = ?servers, "resolve");
let servers = once(room_alias.server_name())
.map(ToOwned::to_owned)
.chain(servers.into_iter());
let mut resolved_servers = Vec::new();
let mut resolved_room_id: Option<OwnedRoomId> = None;
for server in servers {
match self.remote_request(room_alias, &server).await {
| Err(e) => debug_error!("Failed to query for {room_alias:?} from {server}: {e}"),
| Ok(Response { room_id, servers }) => {
debug!(
"Server {server} answered with {room_id:?} for {room_alias:?} servers: \
{servers:?}"
);
resolved_room_id.get_or_insert(room_id);
add_server(&mut resolved_servers, server);
if !servers.is_empty() {
add_servers(&mut resolved_servers, servers);
break;
}
},
}
debug!("Asking {} to resolve {room_alias:?}", room_alias.server_name());
match self
.remote_request(room_alias, room_alias.server_name())
.await
{
| Err(e) => {
error!("Unable to resolve remote room alias {}: {e}", room_alias);
Err(e)
},
| Ok(Response { room_id, servers }) => {
debug!("Remote resolved {room_alias:?} to {room_id:?} with servers {servers:?}");
Ok((room_id, servers))
},
}
resolved_room_id
.map(|room_id| (room_id, resolved_servers))
.ok_or_else(|| {
err!(Request(NotFound("No servers could assist in resolving the room alias")))
})
}
#[implement(super::Service)]
@@ -59,15 +38,3 @@ async fn remote_request(
.send_federation_request(server, request)
.await
}
fn add_servers(servers: &mut Vec<OwnedServerName>, new: Vec<OwnedServerName>) {
for server in new {
add_server(servers, server);
}
}
fn add_server(servers: &mut Vec<OwnedServerName>, server: OwnedServerName) {
if !servers.contains(&server) {
servers.push(server);
}
}

View File

@@ -142,7 +142,7 @@ async fn get_auth_chain_outer(
let chunk_cache: Vec<_> = chunk
.into_iter()
.try_stream()
.try_stream::<conduwuit::Error>()
.broad_and_then(|(shortid, event_id)| async move {
if let Ok(cached) = self.get_cached_eventid_authchain(&[shortid]).await {
return Ok(cached.to_vec());

View File

@@ -139,7 +139,12 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
})
.boxed();
let mut federated_room = false;
while let Some(ref backfill_server) = servers.next().await {
if !self.services.globals.server_is_ours(backfill_server) {
federated_room = true;
}
info!("Asking {backfill_server} for backfill in {room_id}");
let response = self
.services
@@ -168,7 +173,9 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
}
}
warn!("No servers could backfill, but backfill was needed in room {room_id}");
if federated_room {
warn!("No servers could backfill, but backfill was needed in room {room_id}");
}
Ok(())
}

View File

@@ -385,11 +385,13 @@ fn num_senders(args: &crate::Args<'_>) -> usize {
const MIN_SENDERS: usize = 1;
// Limit the number of senders to the number of workers threads or number of
// cores, conservatively.
let max_senders = args
.server
.metrics
.num_workers()
.min(available_parallelism());
let mut max_senders = args.server.metrics.num_workers();
// Work around some platforms not returning the number of cores.
let num_cores = available_parallelism();
if num_cores > 0 {
max_senders = max_senders.min(num_cores);
}
// If the user doesn't override the default 0, this is intended to then default
// to 1 for now as multiple senders is experimental.

View File

@@ -14,7 +14,7 @@
media, moderation, presence, pusher, registration_tokens, resolver, rooms, sending,
server_keys,
service::{self, Args, Map, Service},
sync, transaction_ids, uiaa, users,
sync, transactions, uiaa, users,
};
pub struct Services {
@@ -37,7 +37,7 @@ pub struct Services {
pub sending: Arc<sending::Service>,
pub server_keys: Arc<server_keys::Service>,
pub sync: Arc<sync::Service>,
pub transaction_ids: Arc<transaction_ids::Service>,
pub transactions: Arc<transactions::Service>,
pub uiaa: Arc<uiaa::Service>,
pub users: Arc<users::Service>,
pub moderation: Arc<moderation::Service>,
@@ -110,7 +110,7 @@ macro_rules! build {
sending: build!(sending::Service),
server_keys: build!(server_keys::Service),
sync: build!(sync::Service),
transaction_ids: build!(transaction_ids::Service),
transactions: build!(transactions::Service),
uiaa: build!(uiaa::Service),
users: build!(users::Service),
moderation: build!(moderation::Service),
@@ -139,7 +139,7 @@ pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
// reset dormant online/away statuses to offline, and set the server user as
// online
if self.server.config.allow_local_presence && !self.db.is_read_only() {
if self.server.config.allow_local_presence {
self.presence.unset_all_presence().await;
_ = self
.presence
@@ -156,7 +156,7 @@ pub async fn stop(&self) {
info!("Shutting down services...");
// set the server user as offline
if self.server.config.allow_local_presence && !self.db.is_read_only() {
if self.server.config.allow_local_presence {
_ = self
.presence
.ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Offline)

View File

@@ -1,54 +0,0 @@
use std::sync::Arc;
use conduwuit::{Result, implement};
use database::{Handle, Map};
use ruma::{DeviceId, TransactionId, UserId};
pub struct Service {
db: Data,
}
struct Data {
userdevicetxnid_response: Arc<Map>,
}
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data {
userdevicetxnid_response: args.db["userdevicetxnid_response"].clone(),
},
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
#[implement(Service)]
pub fn add_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
data: &[u8],
) {
let mut key = user_id.as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
key.push(0xFF);
key.extend_from_slice(txn_id.as_bytes());
self.db.userdevicetxnid_response.insert(&key, data);
}
// If there's no entry, this is a new transaction
#[implement(Service)]
pub async fn existing_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
) -> Result<Handle<'_>> {
let key = (user_id, device_id, txn_id);
self.db.userdevicetxnid_response.qry(&key).await
}

View File

@@ -0,0 +1,326 @@
use std::{
collections::HashMap,
fmt,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::{Duration, SystemTime},
};
use async_trait::async_trait;
use conduwuit::{Error, Result, SyncRwLock, debug_warn, warn};
use database::{Handle, Map};
use ruma::{
DeviceId, OwnedServerName, OwnedTransactionId, TransactionId, UserId,
api::{
client::error::ErrorKind::LimitExceeded,
federation::transactions::send_transaction_message,
},
};
use tokio::sync::watch::{Receiver, Sender};
use crate::{Dep, config};
pub type TxnKey = (OwnedServerName, OwnedTransactionId);
pub type WrappedTransactionResponse =
Option<Result<send_transaction_message::v1::Response, TransactionError>>;
/// Errors that can occur during federation transaction processing.
#[derive(Debug, Clone)]
pub enum TransactionError {
/// Server is shutting down - the sender should retry the entire
/// transaction.
ShuttingDown,
}
impl fmt::Display for TransactionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
| Self::ShuttingDown => write!(f, "Server is shutting down"),
}
}
}
impl std::error::Error for TransactionError {}
/// Minimum interval between cache cleanup runs.
/// Exists to prevent thrashing when the cache is full of things that can't be
/// cleared
const CLEANUP_INTERVAL_SECS: u64 = 30;
#[derive(Clone, Debug)]
pub struct CachedTxnResponse {
pub response: send_transaction_message::v1::Response,
pub created: SystemTime,
}
/// Internal state for a federation transaction.
/// Either actively being processed or completed and cached.
#[derive(Clone)]
enum TxnState {
/// Transaction is currently being processed.
Active(Receiver<WrappedTransactionResponse>),
/// Transaction completed and response is cached.
Cached(CachedTxnResponse),
}
/// Result of atomically checking or starting a federation transaction.
pub enum FederationTxnState {
/// Transaction already completed and cached
Cached(send_transaction_message::v1::Response),
/// Transaction is currently being processed by another request.
/// Wait on this receiver for the result.
Active(Receiver<WrappedTransactionResponse>),
/// This caller should process the transaction (first to request it).
Started {
receiver: Receiver<WrappedTransactionResponse>,
sender: Sender<WrappedTransactionResponse>,
},
}
pub struct Service {
services: Services,
db: Data,
federation_txn_state: Arc<SyncRwLock<HashMap<TxnKey, TxnState>>>,
last_cleanup: AtomicU64,
}
struct Services {
config: Dep<config::Service>,
}
struct Data {
userdevicetxnid_response: Arc<Map>,
}
#[async_trait]
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
services: Services {
config: args.depend::<config::Service>("config"),
},
db: Data {
userdevicetxnid_response: args.db["userdevicetxnid_response"].clone(),
},
federation_txn_state: Arc::new(SyncRwLock::new(HashMap::new())),
last_cleanup: AtomicU64::new(0),
}))
}
async fn clear_cache(&self) {
let mut state = self.federation_txn_state.write();
// Only clear cached entries, preserve active transactions
state.retain(|_, v| matches!(v, TxnState::Active(_)));
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Returns the count of currently active (in-progress) transactions.
#[must_use]
pub fn txn_active_handle_count(&self) -> usize {
let state = self.federation_txn_state.read();
state
.values()
.filter(|v| matches!(v, TxnState::Active(_)))
.count()
}
pub fn add_client_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
data: &[u8],
) {
let mut key = user_id.as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
key.push(0xFF);
key.extend_from_slice(txn_id.as_bytes());
self.db.userdevicetxnid_response.insert(&key, data);
}
pub async fn get_client_txn(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
) -> Result<Handle<'_>> {
let key = (user_id, device_id, txn_id);
self.db.userdevicetxnid_response.qry(&key).await
}
/// Atomically gets a cached response, joins an active transaction, or
/// starts a new one.
pub fn get_or_start_federation_txn(&self, key: TxnKey) -> Result<FederationTxnState> {
// Only one upgradable lock can be held at a time, and there aren't any
// read-only locks, so no point being upgradable
let mut state = self.federation_txn_state.write();
// Check existing state for this key
if let Some(txn_state) = state.get(&key) {
return Ok(match txn_state {
| TxnState::Cached(cached) => FederationTxnState::Cached(cached.response.clone()),
| TxnState::Active(receiver) => FederationTxnState::Active(receiver.clone()),
});
}
// Check if another transaction from this origin is already running
let has_active_from_origin = state
.iter()
.any(|(k, v)| k.0 == key.0 && matches!(v, TxnState::Active(_)));
if has_active_from_origin {
debug_warn!(
origin = ?key.0,
"Got concurrent transaction request from an origin with an active transaction"
);
return Err(Error::BadRequest(
LimitExceeded { retry_after: None },
"Still processing another transaction from this origin",
));
}
let max_active_txns = self.services.config.max_concurrent_inbound_transactions;
// Check if we're at capacity
if state.len() >= max_active_txns
&& let active_count = state
.values()
.filter(|v| matches!(v, TxnState::Active(_)))
.count() && active_count >= max_active_txns
{
warn!(
active = active_count,
max = max_active_txns,
"Server is overloaded, dropping incoming transaction"
);
return Err(Error::BadRequest(
LimitExceeded { retry_after: None },
"Server is overloaded, try again later",
));
}
// Start new transaction
let (sender, receiver) = tokio::sync::watch::channel(None);
state.insert(key, TxnState::Active(receiver.clone()));
Ok(FederationTxnState::Started { receiver, sender })
}
/// Finishes a transaction by transitioning it from active to cached state.
/// Additionally may trigger cleanup of old entries.
pub fn finish_federation_txn(
&self,
key: TxnKey,
sender: Sender<WrappedTransactionResponse>,
response: send_transaction_message::v1::Response,
) {
// Check if cleanup might be needed before acquiring the lock
let should_try_cleanup = self.should_try_cleanup();
let mut state = self.federation_txn_state.write();
// Explicitly set cached first so there is no gap where receivers get a closed
// channel
state.insert(
key,
TxnState::Cached(CachedTxnResponse {
response: response.clone(),
created: SystemTime::now(),
}),
);
if let Err(e) = sender.send(Some(Ok(response))) {
debug_warn!("Failed to send transaction response to waiting receivers: {e}");
}
// Explicitly close
drop(sender);
// This task is dangling, we can try clean caches now
if should_try_cleanup {
self.cleanup_entries_locked(&mut state);
}
}
pub fn remove_federation_txn(&self, key: &TxnKey) {
let mut state = self.federation_txn_state.write();
state.remove(key);
}
/// Checks if enough time has passed since the last cleanup to consider
/// running another. Updates the last cleanup time if returning true.
fn should_try_cleanup(&self) -> bool {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("SystemTime before UNIX_EPOCH")
.as_secs();
let last = self.last_cleanup.load(Ordering::Relaxed);
if now.saturating_sub(last) >= CLEANUP_INTERVAL_SECS {
// CAS: only update if no one else has updated it since we read
self.last_cleanup
.compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
} else {
false
}
}
/// Cleans up cached entries based on age and count limits.
///
/// First removes all cached entries older than the configured max age.
/// Then, if the cache still exceeds the max entry count, removes the oldest
/// cached entries until the count is within limits.
///
/// Must be called with write lock held on the state map.
fn cleanup_entries_locked(&self, state: &mut HashMap<TxnKey, TxnState>) {
let max_age_secs = self.services.config.transaction_id_cache_max_age_secs;
let max_entries = self.services.config.transaction_id_cache_max_entries;
// First pass: remove all cached entries older than max age
let cutoff = SystemTime::now()
.checked_sub(Duration::from_secs(max_age_secs))
.unwrap_or(SystemTime::UNIX_EPOCH);
state.retain(|_, v| match v {
| TxnState::Active(_) => true, // Never remove active transactions
| TxnState::Cached(cached) => cached.created > cutoff,
});
// Count cached entries
let cached_count = state
.values()
.filter(|v| matches!(v, TxnState::Cached(_)))
.count();
// Second pass: if still over max entries, remove oldest cached entries
if cached_count > max_entries {
let excess = cached_count.saturating_sub(max_entries);
// Collect cached entries sorted by age (oldest first)
let mut cached_entries: Vec<_> = state
.iter()
.filter_map(|(k, v)| match v {
| TxnState::Cached(cached) => Some((k.clone(), cached.created)),
| TxnState::Active(_) => None,
})
.collect();
cached_entries.sort_by(|a, b| a.1.cmp(&b.1));
// Remove the oldest cached entries to get under the limit
for (key, _) in cached_entries.into_iter().take(excess) {
state.remove(&key);
}
}
}
}

View File

@@ -184,6 +184,12 @@ pub async fn create(
password: Option<&str>,
origin: Option<&str>,
) -> Result<()> {
if !self.services.globals.user_is_local(user_id)
&& (password.is_some() || origin.is_some())
{
return Err!("Cannot create a nonlocal user with a set password or origin");
}
self.db
.userid_origin
.insert(user_id, origin.unwrap_or("password"));