Compare commits

..

31 Commits

Author SHA1 Message Date
Tom Foster
5a44f3e5e4 docs(docker): Restructure deployment guide and add env var reference
Add Quick Run section with complete getting-started workflow including
admin user creation via --execute flag. Consolidate Docker Compose to
treat reverse proxy as essential with Traefik/Caddy/nginx examples.

Move detailed image building to development guide, keeping deployment
docs focused on using pre-built images.

Create environment variables reference with practical examples and
context. Clarify built-in TLS is for testing only; production should
use reverse proxies.
2026-02-23 17:57:40 +00:00
timedout
558262dd1f chore: Refactor transaction_ids -> transactions 2026-02-23 17:44:35 +00:00
timedout
d311b87579 chore: Fix incorrect capitalisation
I didn't realise I agreed to take an English class with @ginger while
working on this server lol
2026-02-23 17:25:12 +00:00
timedout
8702f55cf5 fix: Don't panic if nobody's listening 2026-02-23 17:22:37 +00:00
timedout
d4481b07ac chore: Add news frag 2026-02-23 16:54:54 +00:00
Jade Ellis
92351df925 refactor: Make federation transaction handle errors correctly
We have a dedicated error type that's then matched.
Event sorting is now infallible.
Could probably be cleaned up in a bit.
2026-02-23 16:36:46 +00:00
Jade Ellis
47e2733ea1 refactor: Make stream utils generic over the error type 2026-02-23 16:36:46 +00:00
Jade Ellis
6637e4c6a7 fix: Clean up cache, prevent several race conditions
We use one map which is only ever held for a short time.
2026-02-23 16:36:46 +00:00
nexy7574
35e441452f feat: Attempt to build localised DAG before processing PDUs 2026-02-23 16:36:46 +00:00
nexy7574
66bbb655bf feat: Warn when server is overloaded 2026-02-23 16:36:45 +00:00
nexy7574
81b202ce51 chore: Decrease transaction log verbosity 2026-02-23 16:36:45 +00:00
nexy7574
4657844d46 feat: Show active transaction handle count in !admin federation incoming-federation 2026-02-23 16:36:45 +00:00
nexy7574
9016cd11a6 chore: Run pre-commit and clippy to fix inherited CI errs 2026-02-23 16:36:45 +00:00
nexy7574
dd70094719 feat: Make max_active_txns actually configurable 2026-02-23 16:36:45 +00:00
nexy7574
fcd49b7ab3 fix: Remove duplicate fields from logs 2026-02-23 16:36:45 +00:00
nexy7574
470c9b52dd feat: Instrument process_inbound_transaction 2026-02-23 16:36:45 +00:00
nexy7574
0d8cafc329 feat: Support casting transaction processing to the background 2026-02-23 16:36:44 +00:00
nexy7574
2f9956ddca feat: Add helper functions for federation channels 2026-02-23 16:36:44 +00:00
nexy7574
21a97cdd0b chore: Refactor existing references to transaction service 2026-02-23 16:36:44 +00:00
nexy7574
e986cd4536 feat(federation): Restructure transaction_ids service
Adds two new in-memory maps to the service in to prepare for better handlers
2026-02-23 16:36:40 +00:00
Shane Jaroch
526d862296 fix: more aggressive user agent for URL preview
adding "facebookexternalhit" alongside "embedbot" fixes many errors, such as YouTube Music's:
    "Your browser is deprecated. Please upgrade."

add admin command to clear URL stuck and broken data (per URL currently)

    add command to clear all saved URL previews.
    sync resolver docs.
2026-02-23 15:24:14 +00:00
Ben Botwin
fbeb5bf186 report permission denied errors 2026-02-23 15:22:18 +00:00
Ben Botwin
a336f2df44 fixed formatting 2026-02-23 15:22:18 +00:00
Ben Botwin
19b78ec73e made error handling more concise 2026-02-23 15:22:18 +00:00
Ben Botwin
27ff2d9363 added more granular error handling for other file fetch function 2026-02-23 15:22:18 +00:00
Ben Botwin
50fa8c3abf ran format 2026-02-23 15:22:18 +00:00
Ben Botwin
18c4be869f added handling for other potential errors 2026-02-23 15:22:18 +00:00
Ben Botwin
fc00b96d8b Added proper 404 for not found media and fixed devshell for running tests 2026-02-23 15:22:18 +00:00
Jade Ellis
fa4156d8a6 docs: Changelog 2026-02-22 21:19:20 +00:00
Jade Ellis
23638cd714 feat(appservices): MSC3202 Device masquerading for appservices 2026-02-22 21:19:20 +00:00
Raven
9f1a483e76 docs: Add information about partnered homeservers to the introduction page & update README.md
Includes step-by-step directions to ease the lift for those who have ended up
here and who have never created a matrix account or used matrix before in the
past.

Also updates the information in README.md to match, as these should generally be identical.
2026-02-21 18:51:56 -08:00
83 changed files with 1721 additions and 1253 deletions

26
Cargo.lock generated
View File

@@ -1014,7 +1014,6 @@ dependencies = [
"nix",
"num-traits",
"parking_lot",
"paste",
"rand 0.10.0",
"rand_core 0.6.4",
"regex",
@@ -1028,7 +1027,7 @@ dependencies = [
"serde_regex",
"smallstr",
"smallvec",
"snafu",
"thiserror 2.0.18",
"tikv-jemalloc-ctl",
"tikv-jemalloc-sys",
"tikv-jemallocator",
@@ -1155,7 +1154,7 @@ dependencies = [
"conduwuit_service",
"futures",
"rand 0.10.0",
"snafu",
"thiserror 2.0.18",
"tracing",
]
@@ -4912,27 +4911,6 @@ dependencies = [
"serde",
]
[[package]]
name = "snafu"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e84b3f4eacbf3a1ce05eac6763b4d629d60cbc94d632e4092c54ade71f1e1a2"
dependencies = [
"snafu-derive",
]
[[package]]
name = "snafu-derive"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1c97747dbf44bb1ca44a561ece23508e99cb592e862f22222dcf42f51d1e451"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "socket2"
version = "0.5.10"

View File

@@ -307,14 +307,9 @@ features = [
]
# Used for conduwuit::Error type
[workspace.dependencies.snafu]
version = "0.8"
[workspace.dependencies.thiserror]
version = "2.0.12"
default-features = false
features = ["std", "rust_1_81"]
# Used for macro name generation
[workspace.dependencies.paste]
version = "1.0"
# Used when hashing the state
[workspace.dependencies.ring]

View File

@@ -57,10 +57,15 @@ ### What are the project's goals?
### Can I try it out?
Check out the [documentation](https://continuwuity.org) for installation instructions, or join one of these vetted public homeservers running Continuwuity to get a feel for things!
Check out the [documentation](https://continuwuity.org) for installation instructions.
- https://continuwuity.rocks -- A public demo server operated by the Continuwuity Team.
- https://federated.nexus -- Federated Nexus is a community resource hosting multiple FOSS (especially federated) services, including Matrix and Forgejo.
If you want to try it out as a user, we have some partnered homeservers you can use:
* You can head over to [https://federated.nexus](https://federated.nexus/) in your browser.
* Hit the `Apply to Join` button. Once your request has been accepted, you will receive an email with your username and password.
* Head over to [https://app.federated.nexus](https://app.federated.nexus/) and you can sign in there, or use any other matrix chat client you wish elsewhere.
* Your username for matrix will be in the form of `@username:federated.nexus`, however you can simply use the `username` part to log in. Your password is your password.
* There's also [https://continuwuity.rocks/](https://continuwuity.rocks/). You can register a new account using Cinny via [this convenient link](https://app.cinny.in/register/continuwuity.rocks), or you can use Element or another matrix client *that supports registration*.
### What are we working on?

1
changelog.d/1428.feature Normal file
View File

@@ -0,0 +1 @@
Improved the concurrency handling of federation transactions, vastly improving performance and reliability by more accurately handling inbound transactions and reducing the amount of repeated wasted work. Contributed by @nex and @Jade.

View File

@@ -0,0 +1 @@
Added MSC3202 Device masquerading (not all of MSC3202). This should fix issues with enabling MSC4190 for some Mautrix bridges. Contributed by @Jade

View File

@@ -0,0 +1 @@
Improved URL preview fetching with a more compatible user agent for sites like YouTube Music. Added `!admin media delete-url-preview <url>` command to clear cached URL previews that were stuck and broken.

View File

@@ -290,6 +290,25 @@
#
#max_fetch_prev_events = 192
# How many incoming federation transactions the server is willing to be
# processing at any given time before it becomes overloaded and starts
# rejecting further transactions until some slots become available.
#
# Setting this value too low or too high may result in unstable
# federation, and setting it too high may cause runaway resource usage.
#
#max_concurrent_inbound_transactions = 150
# Maximum age (in seconds) for cached federation transaction responses.
# Entries older than this will be removed during cleanup.
#
#transaction_id_cache_max_age_secs = 7200 (2 hours)
# Maximum number of cached federation transaction responses.
# When the cache exceeds this limit, older entries will be removed.
#
#transaction_id_cache_max_entries = 8192
# Default/base connection timeout (seconds). This is used only by URL
# previews and update/news endpoint checks.
#

View File

@@ -64,6 +64,11 @@
"label": "Configuration Reference",
"name": "/reference/config"
},
{
"type": "file",
"label": "Environment Variables",
"name": "/reference/environment-variables"
},
{
"type": "dir",
"label": "Admin Command Reference",

View File

@@ -8,7 +8,6 @@ services:
restart: unless-stopped
volumes:
- db:/var/lib/continuwuity
- /etc/resolv.conf:/etc/resolv.conf:ro # Use the host's DNS resolver rather than Docker's.
#- ./continuwuity.toml:/etc/continuwuity.toml
networks:
- proxy

View File

@@ -2,28 +2,26 @@ # Continuwuity for Docker
## Docker
To run Continuwuity with Docker, you can either build the image yourself or pull it
from a registry.
To run Continuwuity with Docker, you can either build the image yourself or pull
it from a registry.
### Use a registry
OCI images for Continuwuity are available in the registries listed below.
Available OCI images:
| Registry | Image | Notes |
| --------------- | --------------------------------------------------------------- | -----------------------|
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:latest](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/latest) | Latest tagged image. |
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:main](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/main) | Main branch image. |
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:latest-maxperf](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/latest-maxperf) | [Performance optimised version.](./generic.mdx#performance-optimised-builds) |
| Forgejo Registry| [forgejo.ellis.link/continuwuation/continuwuity:main-maxperf](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/main-maxperf) | [Performance optimised version.](./generic.mdx#performance-optimised-builds) |
| Registry | Image | Notes |
| ---------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------- |
| Forgejo Registry | [forgejo.ellis.link/continuwuation/continuwuity:latest](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/latest) | Latest tagged image. |
| Forgejo Registry | [forgejo.ellis.link/continuwuation/continuwuity:main](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/main) | Main branch image. |
| Forgejo Registry | [forgejo.ellis.link/continuwuation/continuwuity:latest-maxperf](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/latest-maxperf) | [Performance optimised version.](./generic.mdx#performance-optimised-builds) |
| Forgejo Registry | [forgejo.ellis.link/continuwuation/continuwuity:main-maxperf](https://forgejo.ellis.link/continuwuation/-/packages/container/continuwuity/main-maxperf) | [Performance optimised version.](./generic.mdx#performance-optimised-builds) |
Use
**Example:**
```bash
docker image pull $LINK
docker image pull forgejo.ellis.link/continuwuation/continuwuity:main-maxperf
```
to pull it to your machine.
#### Mirrors
Images are mirrored to multiple locations automatically, on a schedule:
@@ -33,39 +31,146 @@ #### Mirrors
- `registry.gitlab.com/continuwuity/continuwuity`
- `git.nexy7574.co.uk/mirrored/continuwuity` (releases only, no `main`)
### Run
### Quick Run
When you have the image, you can simply run it with
Get a working Continuwuity server with an admin user in four steps:
#### Prerequisites
Continuwuity requires HTTPS for Matrix federation. You'll need:
- A domain name pointing to your server
- A reverse proxy with SSL/TLS certificates (Traefik, Caddy, nginx, etc.)
See [Docker Compose](#docker-compose) for complete examples.
#### Environment Variables
- `CONTINUWUITY_SERVER_NAME` - Your Matrix server's domain name
- `CONTINUWUITY_DATABASE_PATH` - Where to store your database (must match the
volume mount)
- `CONTINUWUITY_ADDRESS` - Bind address (use `0.0.0.0` to listen on all
interfaces)
- `CONTINUWUITY_ALLOW_REGISTRATION` - Set to `false` to disable registration, or
use with `CONTINUWUITY_REGISTRATION_TOKEN` to require a token (see
[reference](../reference/environment-variables.mdx#registration--user-configuration)
for details)
See the
[Environment Variables Reference](../reference/environment-variables.mdx) for
more configuration options.
#### 1. Pull the image
```bash
docker run -d -p 8448:6167 \
-v db:/var/lib/continuwuity/ \
-e CONTINUWUITY_SERVER_NAME="your.server.name" \
-e CONTINUWUITY_ALLOW_REGISTRATION=false \
--name continuwuity $LINK
docker pull forgejo.ellis.link/continuwuation/continuwuity:latest
```
or you can use [Docker Compose](#docker-compose).
#### 2. Start the server with initial admin user
The `-d` flag lets the container run in detached mode. You may supply an
optional `continuwuity.toml` config file, the example config can be found
[here](../reference/config.mdx). You can pass in different env vars to
change config values on the fly. You can even configure Continuwuity completely by
using env vars. For an overview of possible values, please take a look at the
<a href="/examples/docker-compose.yml" target="_blank">`docker-compose.yml`</a> file.
```bash
docker run -d \
-p 6167:6167 \
-v continuwuity_db:/var/lib/continuwuity \
-e CONTINUWUITY_SERVER_NAME="matrix.example.com" \
-e CONTINUWUITY_DATABASE_PATH="/var/lib/continuwuity" \
-e CONTINUWUITY_ADDRESS="0.0.0.0" \
-e CONTINUWUITY_ALLOW_REGISTRATION="false" \
--name continuwuity \
forgejo.ellis.link/continuwuation/continuwuity:latest \
--execute "users create-user admin"
```
If you just want to test Continuwuity for a short time, you can use the `--rm`
flag, which cleans up everything related to your container after you stop
it.
Replace `matrix.example.com` with your actual server name and `admin` with
your preferred username.
#### 3. Get your admin password
```bash
docker logs continuwuity 2>&1 | grep "Created user"
```
You'll see output like:
```
Created user with user_id: @admin:matrix.example.com and password: `[auto-generated-password]`
```
#### 4. Configure your reverse proxy
Configure your reverse proxy to forward HTTPS traffic to Continuwuity. See
[Docker Compose](#docker-compose) for examples.
Once configured, log in with any Matrix client using `@admin:matrix.example.com`
and the generated password. You'll automatically be invited to the admin room
where you can manage your server.
### Docker Compose
If the `docker run` command is not suitable for you or your setup, you can also use one
of the provided `docker-compose` files.
Docker Compose is the recommended deployment method. These examples include
reverse proxy configurations for Matrix federation.
Depending on your proxy setup, you can use one of the following files:
#### Matrix Federation Requirements
### For existing Traefik setup
For Matrix federation to work, you need to serve `.well-known/matrix/client` and
`.well-known/matrix/server` endpoints. You can achieve this either by:
1. **Using a well-known service** - The compose files below include an nginx
container to serve these files
2. **Using Continuwuity's built-in delegation** (easier for Traefik) - Configure
delegation files in your config, then proxy `/.well-known/matrix/*` to
Continuwuity
**Traefik example using built-in delegation:**
```yaml
labels:
traefik.http.routers.continuwuity.rule: >-
(Host(`matrix.example.com`) ||
(Host(`example.com`) && PathPrefix(`/.well-known/matrix`)))
```
This routes your Matrix domain and well-known paths to Continuwuity.
#### Creating Your First Admin User
Add the `--execute` command to create an admin user on first startup. In your
compose file, add under the `continuwuity` service:
```yaml
services:
continuwuity:
image: forgejo.ellis.link/continuwuation/continuwuity:latest
command: --execute "users create-user admin"
# ... rest of configuration
```
Then retrieve the auto-generated password:
```bash
docker compose logs continuwuity | grep "Created user"
```
#### Choose Your Reverse Proxy
Select the compose file that matches your setup:
:::note DNS Performance
Docker's default DNS resolver can cause performance issues with Matrix
federation. If you experience slow federation or DNS timeouts, you may need to
use your host's DNS resolver instead. Add this volume mount to the
`continuwuity` service:
```yaml
volumes:
- /etc/resolv.conf:/etc/resolv.conf:ro
```
See [Troubleshooting - DNS Issues](../troubleshooting.mdx#potential-dns-issues-when-using-docker)
for more details and alternative solutions.
:::
##### For existing Traefik setup
<details>
<summary>docker-compose.for-traefik.yml</summary>
@@ -76,7 +181,7 @@ ### For existing Traefik setup
</details>
### With Traefik included
##### With Traefik included
<details>
<summary>docker-compose.with-traefik.yml</summary>
@@ -87,7 +192,7 @@ ### With Traefik included
</details>
### With Caddy Docker Proxy
##### With Caddy Docker Proxy
<details>
<summary>docker-compose.with-caddy.yml</summary>
@@ -98,9 +203,15 @@ ### With Caddy Docker Proxy
```
If you don't already have a network for Caddy to monitor, create one first:
```bash
docker network create caddy
```
</details>
### For other reverse proxies
##### For other reverse proxies
<details>
<summary>docker-compose.yml</summary>
@@ -111,7 +222,7 @@ ### For other reverse proxies
</details>
### Override file
##### Override file for customisation
<details>
<summary>docker-compose.override.yml</summary>
@@ -122,98 +233,24 @@ ### Override file
</details>
When picking the Traefik-related compose file, rename it to
`docker-compose.yml`, and rename the override file to
`docker-compose.override.yml`. Edit the latter with the values you want for your
server.
#### Starting Your Server
When picking the `caddy-docker-proxy` compose file, it's important to first
create the `caddy` network before spinning up the containers:
```bash
docker network create caddy
```
After that, you can rename it to `docker-compose.yml` and spin up the
containers!
Additional info about deploying Continuwuity can be found [here](generic.mdx).
### Build
Official Continuwuity images are built using **Docker Buildx** and the Dockerfile found at [`docker/Dockerfile`][dockerfile-path]. This approach uses common Docker tooling and enables efficient multi-platform builds.
The resulting images are widely compatible with Docker and other container runtimes like Podman or containerd.
The images *do not contain a shell*. They contain only the Continuwuity binary, required libraries, TLS certificates, and metadata.
<details>
<summary>Click to view the Dockerfile</summary>
You can also <a href="https://forgejo.ellis.link/continuwuation/continuwuation/src/branch/main/docker/Dockerfile" target="_blank">view the Dockerfile on Forgejo</a>.
```dockerfile file="../../docker/Dockerfile"
```
</details>
To build an image locally using Docker Buildx, you can typically run a command like:
```bash
# Build for the current platform and load into the local Docker daemon
docker buildx build --load --tag continuwuity:latest -f docker/Dockerfile .
# Example: Build for specific platforms and push to a registry.
# docker buildx build --platform linux/amd64,linux/arm64 --tag registry.io/org/continuwuity:latest -f docker/Dockerfile . --push
# Example: Build binary optimised for the current CPU (standard release profile)
# docker buildx build --load \
# --tag continuwuity:latest \
# --build-arg TARGET_CPU=native \
# -f docker/Dockerfile .
# Example: Build maxperf variant (release-max-perf profile with LTO)
# Optimised for runtime performance and smaller binary size, but requires longer build time
# docker buildx build --load \
# --tag continuwuity:latest-maxperf \
# --build-arg TARGET_CPU=native \
# --build-arg RUST_PROFILE=release-max-perf \
# -f docker/Dockerfile .
```
Refer to the Docker Buildx documentation for more advanced build options.
[dockerfile-path]: https://forgejo.ellis.link/continuwuation/continuwuation/src/branch/main/docker/Dockerfile
### Run
If you have already built the image or want to use one from the registries, you
can start the container and everything else in the compose file in detached
mode with:
1. Choose your compose file and rename it to `docker-compose.yml`
2. If using the override file, rename it to `docker-compose.override.yml` and
edit your values
3. Start the server:
```bash
docker compose up -d
```
> **Note:** Don't forget to modify and adjust the compose file to your needs.
See the [generic deployment guide](generic.mdx) for more deployment options.
### Use Traefik as Proxy
### Building Custom Images
As a container user, you probably know about Traefik. It is an easy-to-use
reverse proxy for making containerized apps and services available through the
web. With the Traefik-related docker-compose files provided above, it is equally easy
to deploy and use Continuwuity, with a small caveat. If you have already looked at
the files, you should have seen the `well-known` service, which is the
small caveat. Traefik is simply a proxy and load balancer and cannot
serve any kind of content. For Continuwuity to federate, we need to either
expose ports `443` and `8448` or serve two endpoints: `.well-known/matrix/client`
and `.well-known/matrix/server`.
With the service `well-known`, we use a single `nginx` container that serves
those two files.
Alternatively, you can use Continuwuity's built-in delegation file capability. Set up the delegation files in the configuration file, and then proxy paths under `/.well-known/matrix` to continuwuity. For example, the label ``traefik.http.routers.continuwuity.rule=(Host(`matrix.ellis.link`) || (Host(`ellis.link`) && PathPrefix(`/.well-known/matrix`)))`` does this for the domain `ellis.link`.
For information on building your own Continuwuity Docker images, see the
[Building Docker Images](../development/index.mdx#building-docker-images)
section in the development documentation.
## Voice communication

View File

@@ -2,7 +2,8 @@ # Development
Information about developing the project. If you are only interested in using
it, you can safely ignore this page. If you plan on contributing, see the
[contributor's guide](./contributing.mdx) and [code style guide](./code_style.mdx).
[contributor's guide](./contributing.mdx) and
[code style guide](./code_style.mdx).
## Continuwuity project layout
@@ -12,86 +13,98 @@ ## Continuwuity project layout
`Cargo.toml`.
The crate names are generally self-explanatory:
- `admin` is the admin room
- `api` is the HTTP API, Matrix C-S and S-S endpoints, etc
- `core` is core Continuwuity functionality like config loading, error definitions,
global utilities, logging infrastructure, etc
- `database` is RocksDB methods, helpers, RocksDB config, and general database definitions,
utilities, or functions
- `macros` are Continuwuity Rust [macros][macros] like general helper macros, logging
and error handling macros, and [syn][syn] and [procedural macros][proc-macro]
used for admin room commands and others
- `core` is core Continuwuity functionality like config loading, error
definitions, global utilities, logging infrastructure, etc
- `database` is RocksDB methods, helpers, RocksDB config, and general database
definitions, utilities, or functions
- `macros` are Continuwuity Rust [macros][macros] like general helper macros,
logging and error handling macros, and [syn][syn] and [procedural
macros][proc-macro] used for admin room commands and others
- `main` is the "primary" sub-crate. This is where the `main()` function lives,
tokio worker and async initialisation, Sentry initialisation, [clap][clap] init,
and signal handling. If you are adding new [Rust features][features], they *must*
go here.
- `router` is the webserver and request handling bits, using axum, tower, tower-http,
hyper, etc, and the [global server state][state] to access `services`.
tokio worker and async initialisation, Sentry initialisation, [clap][clap]
init, and signal handling. If you are adding new [Rust features][features],
they _must_ go here.
- `router` is the webserver and request handling bits, using axum, tower,
tower-http, hyper, etc, and the [global server state][state] to access
`services`.
- `service` is the high-level database definitions and functions for data,
outbound/sending code, and other business logic such as media fetching.
outbound/sending code, and other business logic such as media fetching.
It is highly unlikely you will ever need to add a new workspace member, but
if you truly find yourself needing to, we recommend reaching out to us in
the Matrix room for discussions about it beforehand.
It is highly unlikely you will ever need to add a new workspace member, but if
you truly find yourself needing to, we recommend reaching out to us in the
Matrix room for discussions about it beforehand.
The primary inspiration for this design was apart of hot reloadable development,
to support "Continuwuity as a library" where specific parts can simply be swapped out.
There is evidence Conduit wanted to go this route too as `axum` is technically an
optional feature in Conduit, and can be compiled without the binary or axum library
for handling inbound web requests; but it was never completed or worked.
to support "Continuwuity as a library" where specific parts can simply be
swapped out. There is evidence Conduit wanted to go this route too as `axum` is
technically an optional feature in Conduit, and can be compiled without the
binary or axum library for handling inbound web requests; but it was never
completed or worked.
See the Rust documentation on [Workspaces][workspaces] for general questions
and information on Cargo workspaces.
See the Rust documentation on [Workspaces][workspaces] for general questions and
information on Cargo workspaces.
## Adding compile-time [features][features]
If you'd like to add a compile-time feature, you must first define it in
the `main` workspace crate located in `src/main/Cargo.toml`. The feature must
enable a feature in the other workspace crate(s) you intend to use it in. Then
the said workspace crate(s) must define the feature there in its `Cargo.toml`.
If you'd like to add a compile-time feature, you must first define it in the
`main` workspace crate located in `src/main/Cargo.toml`. The feature must enable
a feature in the other workspace crate(s) you intend to use it in. Then the said
workspace crate(s) must define the feature there in its `Cargo.toml`.
So, if this is adding a feature to the API such as `woof`, you define the feature
in the `api` crate's `Cargo.toml` as `woof = []`. The feature definition in `main`'s
`Cargo.toml` will be `woof = ["conduwuit-api/woof"]`.
So, if this is adding a feature to the API such as `woof`, you define the
feature in the `api` crate's `Cargo.toml` as `woof = []`. The feature definition
in `main`'s `Cargo.toml` will be `woof = ["conduwuit-api/woof"]`.
The rationale for this is due to Rust / Cargo not supporting
["workspace level features"][9], we must make a choice of; either scattering
features all over the workspace crates, making it difficult for anyone to add
or remove default features; or define all the features in one central workspace
crate that propagate down/up to the other workspace crates. It is a Cargo pitfall,
and we'd like to see better developer UX in Rust's Workspaces.
The rationale for this is due to Rust / Cargo not supporting ["workspace level
features"][9], we must make a choice of; either scattering features all over the
workspace crates, making it difficult for anyone to add or remove default
features; or define all the features in one central workspace crate that
propagate down/up to the other workspace crates. It is a Cargo pitfall, and we'd
like to see better developer UX in Rust's Workspaces.
Additionally, the definition of one single place makes "feature collection" in our
Nix flake a million times easier instead of collecting and deduping them all from
searching in all the workspace crates' `Cargo.toml`s. Though we wouldn't need to
do this if Rust supported workspace-level features to begin with.
Additionally, the definition of one single place makes "feature collection" in
our Nix flake a million times easier instead of collecting and deduping them all
from searching in all the workspace crates' `Cargo.toml`s. Though we wouldn't
need to do this if Rust supported workspace-level features to begin with.
## List of forked dependencies
During Continuwuity (and prior projects) development, we have had to fork some dependencies to support our use-cases.
These forks exist for various reasons including features that upstream projects won't accept,
faster-paced development, Continuwuity-specific usecases, or lack of time to upstream changes.
During Continuwuity (and prior projects) development, we have had to fork some
dependencies to support our use-cases. These forks exist for various reasons
including features that upstream projects won't accept, faster-paced
development, Continuwuity-specific usecases, or lack of time to upstream
changes.
All forked dependencies are maintained under the [continuwuation organization on Forgejo](https://forgejo.ellis.link/continuwuation):
All forked dependencies are maintained under the
[continuwuation organization on Forgejo](https://forgejo.ellis.link/continuwuation):
- [ruwuma][continuwuation-ruwuma] - Fork of [ruma/ruma][ruma] with various performance improvements, more features and better client/server interop
- [rocksdb][continuwuation-rocksdb] - Fork of [facebook/rocksdb][rocksdb] via [`@zaidoon1`][8] with liburing build fixes and GCC debug build fixes
- [jemallocator][continuwuation-jemallocator] - Fork of [tikv/jemallocator][jemallocator] fixing musl builds, suspicious code,
and adding support for redzones in Valgrind
- [rustyline-async][continuwuation-rustyline-async] - Fork of [zyansheep/rustyline-async][rustyline-async] with tab completion callback
and `CTRL+\` signal quit event for Continuwuity console CLI
- [rust-rocksdb][continuwuation-rust-rocksdb] - Fork of [rust-rocksdb/rust-rocksdb][rust-rocksdb] fixing musl build issues,
removing unnecessary `gtest` include, and using our RocksDB and jemallocator forks
- [tracing][continuwuation-tracing] - Fork of [tokio-rs/tracing][tracing] implementing `Clone` for `EnvFilter` to
support dynamically changing tracing environments
- [ruwuma][continuwuation-ruwuma] - Fork of [ruma/ruma][ruma] with various
performance improvements, more features and better client/server interop
- [rocksdb][continuwuation-rocksdb] - Fork of [facebook/rocksdb][rocksdb] via
[`@zaidoon1`][8] with liburing build fixes and GCC debug build fixes
- [jemallocator][continuwuation-jemallocator] - Fork of
[tikv/jemallocator][jemallocator] fixing musl builds, suspicious code, and
adding support for redzones in Valgrind
- [rustyline-async][continuwuation-rustyline-async] - Fork of
[zyansheep/rustyline-async][rustyline-async] with tab completion callback and
`CTRL+\` signal quit event for Continuwuity console CLI
- [rust-rocksdb][continuwuation-rust-rocksdb] - Fork of
[rust-rocksdb/rust-rocksdb][rust-rocksdb] fixing musl build issues, removing
unnecessary `gtest` include, and using our RocksDB and jemallocator forks
- [tracing][continuwuation-tracing] - Fork of [tokio-rs/tracing][tracing]
implementing `Clone` for `EnvFilter` to support dynamically changing tracing
environments
## Debugging with `tokio-console`
[`tokio-console`][7] can be a useful tool for debugging and profiling. To make a
`tokio-console`-enabled build of Continuwuity, enable the `tokio_console` feature,
disable the default `release_max_log_level` feature, and set the `--cfg
tokio_unstable` flag to enable experimental tokio APIs. A build might look like
this:
`tokio-console`-enabled build of Continuwuity, enable the `tokio_console`
feature, disable the default `release_max_log_level` feature, and set the
`--cfg tokio_unstable` flag to enable experimental tokio APIs. A build might
look like this:
```bash
RUSTFLAGS="--cfg tokio_unstable" cargo +nightly build \
@@ -100,34 +113,84 @@ ## Debugging with `tokio-console`
--features=systemd,element_hacks,gzip_compression,brotli_compression,zstd_compression,tokio_console
```
You will also need to enable the `tokio_console` config option in Continuwuity when
starting it. This was due to tokio-console causing gradual memory leak/usage
if left enabled.
You will also need to enable the `tokio_console` config option in Continuwuity
when starting it. This was due to tokio-console causing gradual memory
leak/usage if left enabled.
## Building Docker Images
To build a Docker image for Continuwuity, use the standard Docker build command:
Official Continuwuity images are built using **Docker Buildx** and the
Dockerfile found at [`docker/Dockerfile`][dockerfile-path].
The images are compatible with Docker and other container runtimes like Podman
or containerd.
The images _do not contain a shell_. They contain only the Continuwuity binary,
required libraries, TLS certificates, and metadata.
<details>
<summary>Click to view the Dockerfile</summary>
You can also
<a
href="<https://forgejo.ellis.link/continuwuation/continuwuation/src/branch/main/docker/Dockerfile>"
target="_blank"
>
view the Dockerfile on Forgejo
</a>
.
```dockerfile file="../../docker/Dockerfile"
```bash
docker build -f docker/Dockerfile .
```
The image can be cross-compiled for different architectures.
</details>
### Building Locally
To build an image locally using Docker Buildx:
```bash
# Build for the current platform and load into the local Docker daemon
docker buildx build --load --tag continuwuity:latest -f docker/Dockerfile .
# Example: Build for specific platforms and push to a registry
# docker buildx build --platform linux/amd64,linux/arm64 --tag registry.io/org/continuwuity:latest -f docker/Dockerfile . --push
# Example: Build binary optimised for the current CPU (standard release profile)
# docker buildx build --load \
# --tag continuwuity:latest \
# --build-arg TARGET_CPU=native \
# -f docker/Dockerfile .
# Example: Build maxperf variant (release-max-perf profile with LTO)
# docker buildx build --load \
# --tag continuwuity:latest-maxperf \
# --build-arg TARGET_CPU=native \
# --build-arg RUST_PROFILE=release-max-perf \
# -f docker/Dockerfile .
```
Refer to the Docker Buildx documentation for more advanced build options.
[dockerfile-path]:
https://forgejo.ellis.link/continuwuation/continuwuation/src/branch/main/docker/Dockerfile
[continuwuation-ruwuma]: https://forgejo.ellis.link/continuwuation/ruwuma
[continuwuation-rocksdb]: https://forgejo.ellis.link/continuwuation/rocksdb
[continuwuation-jemallocator]: https://forgejo.ellis.link/continuwuation/jemallocator
[continuwuation-rustyline-async]: https://forgejo.ellis.link/continuwuation/rustyline-async
[continuwuation-rust-rocksdb]: https://forgejo.ellis.link/continuwuation/rust-rocksdb
[continuwuation-jemallocator]:
https://forgejo.ellis.link/continuwuation/jemallocator
[continuwuation-rustyline-async]:
https://forgejo.ellis.link/continuwuation/rustyline-async
[continuwuation-rust-rocksdb]:
https://forgejo.ellis.link/continuwuation/rust-rocksdb
[continuwuation-tracing]: https://forgejo.ellis.link/continuwuation/tracing
[ruma]: https://github.com/ruma/ruma/
[rocksdb]: https://github.com/facebook/rocksdb/
[jemallocator]: https://github.com/tikv/jemallocator/
[rustyline-async]: https://github.com/zyansheep/rustyline-async/
[rust-rocksdb]: https://github.com/rust-rocksdb/rust-rocksdb/
[tracing]: https://github.com/tokio-rs/tracing/
[7]: https://docs.rs/tokio-console/latest/tokio_console/
[8]: https://github.com/zaidoon1/
[9]: https://github.com/rust-lang/cargo/issues/12162

View File

@@ -51,7 +51,13 @@ ## Can I try it out?
Check out the [documentation](https://continuwuity.org) for installation instructions.
There are currently no open registration continuwuity instances available.
If you want to try it out as a user, we have some partnered homeservers you can use:
* You can head over to [https://federated.nexus](https://federated.nexus/) in your browser.
* Hit the `Apply to Join` button. Once your request has been accepted, you will receive an email with your username and password.
* Head over to [https://app.federated.nexus](https://app.federated.nexus/) and you can sign in there, or use any other matrix chat client you wish elsewhere.
* Your username for matrix will be in the form of `@username:federated.nexus`, however you can simply use the `username` part to log in. Your password is your password.
* There's also [https://continuwuity.rocks/](https://continuwuity.rocks/). You can register a new account using Cinny via [this convenient link](https://app.cinny.in/register/continuwuity.rocks), or you can use Element or another matrix client *that supports registration*.
## What are we working on?

View File

@@ -4,6 +4,11 @@
"name": "config",
"label": "Configuration"
},
{
"type": "file",
"name": "environment-variables",
"label": "Environment Variables"
},
{
"type": "file",
"name": "admin",

View File

@@ -36,3 +36,7 @@ ## `!admin media delete-all-from-user`
## `!admin media delete-all-from-server`
Deletes all remote media from the specified remote server. This will always ignore errors by default
## `!admin media delete-url-preview`
Deletes a cached URL preview, forcing it to be re-fetched. Use --all to purge all cached URL previews

View File

@@ -0,0 +1,281 @@
# Environment Variables
Continuwuity can be configured entirely through environment variables, making it
ideal for containerised deployments and infrastructure-as-code scenarios.
This is a convenience reference and may not be exhaustive. The
[Configuration Reference](./config.mdx) is the primary source for all
configuration options.
## Prefix System
Continuwuity supports three environment variable prefixes for backwards
compatibility:
- `CONTINUWUITY_*` (current, recommended)
- `CONDUWUIT_*` (compatibility)
- `CONDUIT_*` (legacy)
All three prefixes work identically. Use double underscores (`__`) to represent
nested configuration sections from the TOML config.
**Examples:**
```bash
# Simple top-level config
CONTINUWUITY_SERVER_NAME="matrix.example.com"
CONTINUWUITY_PORT="8008"
# Nested config sections use double underscores
# This maps to [database] section in TOML
CONTINUWUITY_DATABASE__PATH="/var/lib/continuwuity"
# This maps to [tls] section in TOML
CONTINUWUITY_TLS__CERTS="/path/to/cert.pem"
```
## Configuration File Override
You can specify a custom configuration file path:
- `CONTINUWUITY_CONFIG` - Path to continuwuity.toml (current)
- `CONDUWUIT_CONFIG` - Path to config file (compatibility)
- `CONDUIT_CONFIG` - Path to config file (legacy)
## Essential Variables
These are the minimum variables needed for a working deployment:
| Variable | Description | Default |
| ---------------------------- | ---------------------------------- | ---------------------- |
| `CONTINUWUITY_SERVER_NAME` | Your Matrix server's domain name | Required |
| `CONTINUWUITY_DATABASE_PATH` | Path to RocksDB database directory | `/var/lib/conduwuit` |
| `CONTINUWUITY_ADDRESS` | IP address to bind to | `["127.0.0.1", "::1"]` |
| `CONTINUWUITY_PORT` | Port to listen on | `8008` |
## Network Configuration
| Variable | Description | Default |
| -------------------------------- | ----------------------------------------------- | ---------------------- |
| `CONTINUWUITY_ADDRESS` | Bind address (use `0.0.0.0` for all interfaces) | `["127.0.0.1", "::1"]` |
| `CONTINUWUITY_PORT` | HTTP port | `8008` |
| `CONTINUWUITY_UNIX_SOCKET_PATH` | UNIX socket path (alternative to TCP) | - |
| `CONTINUWUITY_UNIX_SOCKET_PERMS` | Socket permissions (octal) | `660` |
## Database Configuration
| Variable | Description | Default |
| ------------------------------------------ | --------------------------- | -------------------- |
| `CONTINUWUITY_DATABASE_PATH` | RocksDB data directory | `/var/lib/conduwuit` |
| `CONTINUWUITY_DATABASE_BACKUP_PATH` | Backup directory | - |
| `CONTINUWUITY_DATABASE_BACKUPS_TO_KEEP` | Number of backups to retain | `1` |
| `CONTINUWUITY_DB_CACHE_CAPACITY_MB` | Database read cache (MB) | - |
| `CONTINUWUITY_DB_WRITE_BUFFER_CAPACITY_MB` | Write cache (MB) | - |
## Cache Configuration
| Variable | Description |
| ---------------------------------------- | ------------------------ |
| `CONTINUWUITY_CACHE_CAPACITY_MODIFIER` | LRU cache multiplier |
| `CONTINUWUITY_PDU_CACHE_CAPACITY` | PDU cache entries |
| `CONTINUWUITY_AUTH_CHAIN_CACHE_CAPACITY` | Auth chain cache entries |
## DNS Configuration
Configure DNS resolution behaviour for federation and external requests.
| Variable | Description | Default |
| ------------------------------------ | ---------------------------- | -------- |
| `CONTINUWUITY_DNS_CACHE_ENTRIES` | Max DNS cache entries | `32768` |
| `CONTINUWUITY_DNS_MIN_TTL` | Minimum cache TTL (seconds) | `10800` |
| `CONTINUWUITY_DNS_MIN_TTL_NXDOMAIN` | NXDOMAIN cache TTL (seconds) | `259200` |
| `CONTINUWUITY_DNS_ATTEMPTS` | Retry attempts | - |
| `CONTINUWUITY_DNS_TIMEOUT` | Query timeout (seconds) | - |
| `CONTINUWUITY_DNS_TCP_FALLBACK` | Allow TCP fallback | - |
| `CONTINUWUITY_QUERY_ALL_NAMESERVERS` | Query all nameservers | - |
| `CONTINUWUITY_QUERY_OVER_TCP_ONLY` | TCP-only queries | - |
## Request Configuration
| Variable | Description |
| ------------------------------------ | ----------------------------- |
| `CONTINUWUITY_MAX_REQUEST_SIZE` | Max HTTP request size (bytes) |
| `CONTINUWUITY_REQUEST_CONN_TIMEOUT` | Connection timeout (seconds) |
| `CONTINUWUITY_REQUEST_TIMEOUT` | Overall request timeout |
| `CONTINUWUITY_REQUEST_TOTAL_TIMEOUT` | Total timeout |
| `CONTINUWUITY_REQUEST_IDLE_TIMEOUT` | Idle timeout |
| `CONTINUWUITY_REQUEST_IDLE_PER_HOST` | Idle connections per host |
## Federation Configuration
Control how your server federates with other Matrix servers.
| Variable | Description | Default |
| ---------------------------------------------- | ----------------------------- | ------- |
| `CONTINUWUITY_ALLOW_FEDERATION` | Enable federation | `true` |
| `CONTINUWUITY_FEDERATION_LOOPBACK` | Allow loopback federation | - |
| `CONTINUWUITY_FEDERATION_CONN_TIMEOUT` | Connection timeout | - |
| `CONTINUWUITY_FEDERATION_TIMEOUT` | Request timeout | - |
| `CONTINUWUITY_FEDERATION_IDLE_TIMEOUT` | Idle timeout | - |
| `CONTINUWUITY_FEDERATION_IDLE_PER_HOST` | Idle connections per host | - |
| `CONTINUWUITY_TRUSTED_SERVERS` | JSON array of trusted servers | - |
| `CONTINUWUITY_QUERY_TRUSTED_KEY_SERVERS_FIRST` | Query trusted first | - |
| `CONTINUWUITY_ONLY_QUERY_TRUSTED_KEY_SERVERS` | Only query trusted | - |
**Example:**
```bash
# Trust matrix.org for key verification
CONTINUWUITY_TRUSTED_SERVERS='["matrix.org"]'
```
## Registration & User Configuration
Control user registration and account creation behaviour.
| Variable | Description | Default |
| ------------------------------------------ | --------------------- | ------- |
| `CONTINUWUITY_ALLOW_REGISTRATION` | Enable registration | `true` |
| `CONTINUWUITY_REGISTRATION_TOKEN` | Token requirement | - |
| `CONTINUWUITY_SUSPEND_ON_REGISTER` | Suspend new accounts | - |
| `CONTINUWUITY_NEW_USER_DISPLAYNAME_SUFFIX` | Display name suffix | 🏳️‍⚧️ |
| `CONTINUWUITY_RECAPTCHA_SITE_KEY` | reCAPTCHA site key | - |
| `CONTINUWUITY_RECAPTCHA_PRIVATE_SITE_KEY` | reCAPTCHA private key | - |
**Example:**
```bash
# Disable open registration
CONTINUWUITY_ALLOW_REGISTRATION="false"
# Require a registration token
CONTINUWUITY_REGISTRATION_TOKEN="your_secret_token_here"
```
## Feature Configuration
| Variable | Description | Default |
| ---------------------------------------------------------- | -------------------------- | ------- |
| `CONTINUWUITY_ALLOW_ENCRYPTION` | Enable E2EE | `true` |
| `CONTINUWUITY_ALLOW_ROOM_CREATION` | Enable room creation | - |
| `CONTINUWUITY_ALLOW_UNSTABLE_ROOM_VERSIONS` | Allow unstable versions | - |
| `CONTINUWUITY_DEFAULT_ROOM_VERSION` | Default room version | `v11` |
| `CONTINUWUITY_REQUIRE_AUTH_FOR_PROFILE_REQUESTS` | Auth for profiles | - |
| `CONTINUWUITY_ALLOW_PUBLIC_ROOM_DIRECTORY_OVER_FEDERATION` | Federate directory | - |
| `CONTINUWUITY_ALLOW_PUBLIC_ROOM_DIRECTORY_WITHOUT_AUTH` | Unauth directory | - |
| `CONTINUWUITY_ALLOW_DEVICE_NAME_FEDERATION` | Device names in federation | - |
## TLS Configuration
Built-in TLS support is primarily for testing. **For production deployments,
especially when federating on the internet, use a reverse proxy** (Traefik,
Caddy, nginx) to handle TLS termination.
| Variable | Description |
| --------------------------------- | ------------------------- |
| `CONTINUWUITY_TLS__CERTS` | TLS certificate file path |
| `CONTINUWUITY_TLS__KEY` | TLS private key path |
| `CONTINUWUITY_TLS__DUAL_PROTOCOL` | Support TLS 1.2 + 1.3 |
**Example (testing only):**
```bash
CONTINUWUITY_TLS__CERTS="/etc/letsencrypt/live/matrix.example.com/fullchain.pem"
CONTINUWUITY_TLS__KEY="/etc/letsencrypt/live/matrix.example.com/privkey.pem"
```
## Logging Configuration
Control log output format and verbosity.
| Variable | Description | Default |
| ------------------------------ | ------------------ | ------- |
| `CONTINUWUITY_LOG` | Log filter level | - |
| `CONTINUWUITY_LOG_COLORS` | ANSI colours | `true` |
| `CONTINUWUITY_LOG_SPAN_EVENTS` | Log span events | `none` |
| `CONTINUWUITY_LOG_THREAD_IDS` | Include thread IDs | - |
**Examples:**
```bash
# Set log level to info
CONTINUWUITY_LOG="info"
# Enable debug logging for specific modules
CONTINUWUITY_LOG="warn,continuwuity::api=debug"
# Disable colours for log aggregation
CONTINUWUITY_LOG_COLORS="false"
```
## Observability Configuration
| Variable | Description |
| ---------------------------------------- | --------------------- |
| `CONTINUWUITY_ALLOW_OTLP` | Enable OpenTelemetry |
| `CONTINUWUITY_OTLP_FILTER` | OTLP filter level |
| `CONTINUWUITY_OTLP_PROTOCOL` | Protocol (http/grpc) |
| `CONTINUWUITY_TRACING_FLAME` | Enable flame graphs |
| `CONTINUWUITY_TRACING_FLAME_FILTER` | Flame graph filter |
| `CONTINUWUITY_TRACING_FLAME_OUTPUT_PATH` | Output directory |
| `CONTINUWUITY_SENTRY` | Enable Sentry |
| `CONTINUWUITY_SENTRY_ENDPOINT` | Sentry DSN |
| `CONTINUWUITY_SENTRY_SEND_SERVER_NAME` | Include server name |
| `CONTINUWUITY_SENTRY_TRACES_SAMPLE_RATE` | Sample rate (0.0-1.0) |
## Admin Configuration
Configure admin users and automated command execution.
| Variable | Description | Default |
| ------------------------------------------ | -------------------------------- | ----------------- |
| `CONTINUWUITY_ADMINS_LIST` | JSON array of admin user IDs | - |
| `CONTINUWUITY_ADMINS_FROM_ROOM` | Derive admins from room | - |
| `CONTINUWUITY_ADMIN_ESCAPE_COMMANDS` | Allow `\` prefix in public rooms | - |
| `CONTINUWUITY_ADMIN_CONSOLE_AUTOMATIC` | Auto-activate console | - |
| `CONTINUWUITY_ADMIN_EXECUTE` | JSON array of startup commands | - |
| `CONTINUWUITY_ADMIN_EXECUTE_ERRORS_IGNORE` | Ignore command errors | - |
| `CONTINUWUITY_ADMIN_SIGNAL_EXECUTE` | Commands on SIGUSR2 | - |
| `CONTINUWUITY_ADMIN_ROOM_TAG` | Admin room tag | `m.server_notice` |
**Examples:**
```bash
# Create admin user on startup
CONTINUWUITY_ADMIN_EXECUTE='["users create-user admin", "users make-user-admin admin"]'
# Specify admin users directly
CONTINUWUITY_ADMINS_LIST='["@alice:example.com", "@bob:example.com"]'
```
## Media & URL Preview Configuration
| Variable | Description |
| ---------------------------------------------------- | ------------------ |
| `CONTINUWUITY_URL_PREVIEW_BOUND_INTERFACE` | Bind interface |
| `CONTINUWUITY_URL_PREVIEW_DOMAIN_CONTAINS_ALLOWLIST` | Domain allowlist |
| `CONTINUWUITY_URL_PREVIEW_DOMAIN_EXPLICIT_ALLOWLIST` | Explicit allowlist |
| `CONTINUWUITY_URL_PREVIEW_DOMAIN_EXPLICIT_DENYLIST` | Explicit denylist |
| `CONTINUWUITY_URL_PREVIEW_MAX_SPIDER_SIZE` | Max fetch size |
| `CONTINUWUITY_URL_PREVIEW_TIMEOUT` | Fetch timeout |
| `CONTINUWUITY_IP_RANGE_DENYLIST` | IP range denylist |
## Tokio Runtime Configuration
These can be set as environment variables or CLI arguments:
| Variable | Description |
| ----------------------------------------- | -------------------------- |
| `TOKIO_WORKER_THREADS` | Worker thread count |
| `TOKIO_GLOBAL_QUEUE_INTERVAL` | Global queue interval |
| `TOKIO_EVENT_INTERVAL` | Event interval |
| `TOKIO_MAX_IO_EVENTS_PER_TICK` | Max I/O events per tick |
| `CONTINUWUITY_RUNTIME_HISTOGRAM_INTERVAL` | Histogram bucket size (μs) |
| `CONTINUWUITY_RUNTIME_HISTOGRAM_BUCKETS` | Bucket count |
| `CONTINUWUITY_RUNTIME_WORKER_AFFINITY` | Enable worker affinity |
## See Also
- [Configuration Reference](./config.mdx) - Complete TOML configuration
documentation
- [Admin Commands](./admin/) - Admin command reference

View File

@@ -30,12 +30,15 @@ pub(super) async fn incoming_federation(&self) -> Result {
.federation_handletime
.read();
let mut msg = format!("Handling {} incoming pdus:\n", map.len());
let mut msg = format!(
"Handling {} incoming PDUs across {} active transactions:\n",
map.len(),
self.services.transactions.txn_active_handle_count()
);
for (r, (e, i)) in map.iter() {
let elapsed = i.elapsed();
writeln!(msg, "{} {}: {}m{}s", r, e, elapsed.as_secs() / 60, elapsed.as_secs() % 60)?;
}
msg
};

View File

@@ -388,3 +388,19 @@ pub(super) async fn get_remote_thumbnail(
self.write_str(&format!("```\n{result:#?}\nreceived {len} bytes for file content.\n```"))
.await
}
#[admin_command]
pub(super) async fn delete_url_preview(&self, url: Option<String>, all: bool) -> Result {
if all {
self.services.media.clear_url_previews().await;
return self.write_str("Deleted all cached URL previews.").await;
}
let url = url.expect("clap enforces url is required unless --all");
self.services.media.remove_url_preview(&url).await?;
self.write_str(&format!("Deleted cached URL preview for: {url}"))
.await
}

View File

@@ -108,4 +108,16 @@ pub enum MediaCommand {
#[arg(long, default_value("800"))]
height: u32,
},
/// Deletes a cached URL preview, forcing it to be re-fetched.
/// Use --all to purge all cached URL previews.
DeleteUrlPreview {
/// The URL to clear from the saved preview data
#[arg(required_unless_present = "all")]
url: Option<String>,
/// Purge all cached URL previews
#[arg(long, conflicts_with = "url")]
all: bool,
},
}

View File

@@ -209,7 +209,7 @@ pub(super) async fn compact(
let parallelism = parallelism.unwrap_or(1);
let results = maps
.into_iter()
.try_stream()
.try_stream::<conduwuit::Error>()
.paralleln_and_then(runtime, parallelism, move |map| {
map.compact_blocking(options.clone())?;
Ok(map.name().to_owned())

View File

@@ -20,7 +20,17 @@ pub enum ResolverCommand {
name: Option<String>,
},
/// Flush a specific server from the resolver caches or everything
/// Flush a given server from the resolver caches or flush them completely
///
/// * Examples:
/// * Flush a specific server:
///
/// `!admin query resolver flush-cache matrix.example.com`
///
/// * Flush all resolver caches completely:
///
/// `!admin query resolver flush-cache --all`
#[command(verbatim_doc_comment)]
FlushCache {
name: Option<OwnedServerName>,

View File

@@ -3,7 +3,7 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Event, Result, debug_info, err, error, info,
Err, Error, Event, Result, debug_info, err, error, info,
matrix::pdu::PduBuilder,
utils::{self, ReadyExt, stream::BroadbandExt},
warn,
@@ -387,7 +387,7 @@ pub(crate) async fn register_route(
)
.await?;
if !worked {
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
}
// Success!
},
@@ -401,7 +401,7 @@ pub(crate) async fn register_route(
&uiaainfo,
json,
);
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
},
| _ => {
return Err!(Request(NotJson("JSON body is not valid")));
@@ -661,7 +661,7 @@ pub(crate) async fn change_password_route(
.await?;
if !worked {
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
}
// Success!
@@ -673,7 +673,7 @@ pub(crate) async fn change_password_route(
.uiaa
.create(sender_user, body.sender_device(), &uiaainfo, json);
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
},
| _ => {
return Err!(Request(NotJson("JSON body is not valid")));
@@ -791,7 +791,7 @@ pub(crate) async fn deactivate_route(
.await?;
if !worked {
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
}
// Success!
},
@@ -802,7 +802,7 @@ pub(crate) async fn deactivate_route(
.uiaa
.create(sender_user, body.sender_device(), &uiaainfo, json);
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
},
| _ => {
return Err!(Request(NotJson("JSON body is not valid")));

View File

@@ -1,6 +1,6 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, debug, err, utils};
use conduwuit::{Err, Error, Result, debug, err, utils};
use futures::StreamExt;
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedDeviceId,
@@ -232,7 +232,7 @@ pub(crate) async fn delete_devices_route(
.await?;
if !worked {
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
}
// Success!
},
@@ -243,10 +243,10 @@ pub(crate) async fn delete_devices_route(
.uiaa
.create(sender_user, sender_device, &uiaainfo, json);
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
},
| _ => {
return Err!(BadRequest(ErrorKind::NotJson, "Not json."));
return Err(Error::BadRequest(ErrorKind::NotJson, "Not json."));
},
},
}

View File

@@ -5,7 +5,7 @@
use axum::extract::State;
use conduwuit::{
Err, Result, debug, debug_warn, err,
Err, Error, Result, debug, debug_warn, err,
result::NotFound,
utils,
utils::{IterStream, stream::WidebandExt},
@@ -215,7 +215,7 @@ pub(crate) async fn upload_signing_keys_route(
.await?;
if !worked {
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
}
// Success!
},
@@ -226,10 +226,10 @@ pub(crate) async fn upload_signing_keys_route(
.uiaa
.create(sender_user, sender_device, &uiaainfo, json);
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
},
| _ => {
return Err!(BadRequest(ErrorKind::NotJson, "Not json."));
return Err(Error::BadRequest(ErrorKind::NotJson, "Not json."));
},
},
}
@@ -396,12 +396,12 @@ pub(crate) async fn get_key_changes_route(
let from = body
.from
.parse()
.map_err(|_| err!(BadRequest(ErrorKind::InvalidParam, "Invalid `from`.")))?;
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid `from`."))?;
let to = body
.to
.parse()
.map_err(|_| err!(BadRequest(ErrorKind::InvalidParam, "Invalid `to`.")))?;
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid `to`."))?;
device_list_updates.extend(
services

View File

@@ -3,9 +3,10 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, err, error,
Err, Result, err,
utils::{self, content_disposition::make_content_disposition, math::ruma_from_usize},
};
use conduwuit_core::error;
use conduwuit_service::{
Services,
media::{CACHE_CONTROL_IMMUTABLE, CORP_CROSS_ORIGIN, Dim, FileMeta, MXC_LENGTH},
@@ -69,7 +70,7 @@ pub(crate) async fn create_content_route(
.create(mxc, Some(user), Some(&content_disposition), content_type, &body.file)
.await
{
error!("Failed to save uploaded media: {e}");
err!("Failed to save uploaded media: {e}");
return Err!(Request(Unknown("Failed to save uploaded media")));
}
@@ -144,12 +145,22 @@ pub(crate) async fn get_content_route(
server_name: &body.server_name,
media_id: &body.media_id,
};
let FileMeta {
content,
content_type,
content_disposition,
} = fetch_file(&services, &mxc, user, body.timeout_ms, None).await?;
} = match fetch_file(&services, &mxc, user, body.timeout_ms, None).await {
| Ok(meta) => meta,
| Err(conduwuit::Error::Io(e)) => match e.kind() {
| std::io::ErrorKind::NotFound => return Err!(Request(NotFound("Media not found."))),
| std::io::ErrorKind::PermissionDenied => {
error!("Permission denied when trying to read file: {e:?}");
return Err!(Request(Unknown("Unknown error when fetching file.")));
},
| _ => return Err!(Request(Unknown("Unknown error when fetching file."))),
},
| Err(_) => return Err!(Request(Unknown("Unknown error when fetching file."))),
};
Ok(get_content::v1::Response {
file: content.expect("entire file contents"),
@@ -185,7 +196,18 @@ pub(crate) async fn get_content_as_filename_route(
content,
content_type,
content_disposition,
} = fetch_file(&services, &mxc, user, body.timeout_ms, Some(&body.filename)).await?;
} = match fetch_file(&services, &mxc, user, body.timeout_ms, None).await {
| Ok(meta) => meta,
| Err(conduwuit::Error::Io(e)) => match e.kind() {
| std::io::ErrorKind::NotFound => return Err!(Request(NotFound("Media not found."))),
| std::io::ErrorKind::PermissionDenied => {
error!("Permission denied when trying to read file: {e:?}");
return Err!(Request(Unknown("Unknown error when fetching file.")));
},
| _ => return Err!(Request(Unknown("Unknown error when fetching file."))),
},
| Err(_) => return Err!(Request(Unknown("Unknown error when fetching file."))),
};
Ok(get_content_as_filename::v1::Response {
file: content.expect("entire file contents"),

View File

@@ -1,7 +1,7 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, at, debug_warn,
Err, Error, Result, at, debug_warn,
matrix::{
event::{Event, Matches},
pdu::PduCount,
@@ -322,7 +322,7 @@ pub(crate) async fn is_ignored_pdu<Pdu>(
if server_ignored {
// the sender's server is ignored, so ignore this event
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::SenderIgnored { sender: None },
"The sender's server is ignored by this server.",
));
@@ -331,7 +331,7 @@ pub(crate) async fn is_ignored_pdu<Pdu>(
if user_ignored && !services.config.send_messages_from_ignored_users_to_client {
// the recipient of this PDU has the sender ignored, and we're not
// configured to send ignored messages to clients
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::SenderIgnored { sender: Some(event.sender().to_owned()) },
"You have ignored this sender.",
));

View File

@@ -1,5 +1,5 @@
use axum::extract::State;
use conduwuit::{Err, Result, err};
use conduwuit::{Err, Error, Result, err};
use conduwuit_service::Services;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue,
@@ -243,27 +243,27 @@ pub(crate) async fn set_pushrule_route(
body.before.as_deref(),
) {
let err = match error {
| InsertPushRuleError::ServerDefaultRuleId => err!(BadRequest(
| InsertPushRuleError::ServerDefaultRuleId => Error::BadRequest(
ErrorKind::InvalidParam,
"Rule IDs starting with a dot are reserved for server-default rules.",
)),
| InsertPushRuleError::InvalidRuleId => err!(BadRequest(
),
| InsertPushRuleError::InvalidRuleId => Error::BadRequest(
ErrorKind::InvalidParam,
"Rule ID containing invalid characters.",
)),
| InsertPushRuleError::RelativeToServerDefaultRule => err!(BadRequest(
),
| InsertPushRuleError::RelativeToServerDefaultRule => Error::BadRequest(
ErrorKind::InvalidParam,
"Can't place a push rule relatively to a server-default rule.",
)),
| InsertPushRuleError::UnknownRuleId => err!(BadRequest(
),
| InsertPushRuleError::UnknownRuleId => Error::BadRequest(
ErrorKind::NotFound,
"The before or after rule could not be found.",
)),
| InsertPushRuleError::BeforeHigherThanAfter => err!(BadRequest(
),
| InsertPushRuleError::BeforeHigherThanAfter => Error::BadRequest(
ErrorKind::InvalidParam,
"The before rule has a higher priority than the after rule.",
)),
| _ => err!(BadRequest(ErrorKind::InvalidParam, "Invalid data.")),
),
| _ => Error::BadRequest(ErrorKind::InvalidParam, "Invalid data."),
};
return Err(err);
@@ -433,13 +433,13 @@ pub(crate) async fn delete_pushrule_route(
.remove(body.kind.clone(), &body.rule_id)
{
let err = match error {
| RemovePushRuleError::ServerDefault => err!(BadRequest(
| RemovePushRuleError::ServerDefault => Error::BadRequest(
ErrorKind::InvalidParam,
"Cannot delete a server-default pushrule.",
)),
),
| RemovePushRuleError::NotFound =>
err!(BadRequest(ErrorKind::NotFound, "Push rule not found.")),
| _ => err!(BadRequest(ErrorKind::InvalidParam, "Invalid data.")),
Error::BadRequest(ErrorKind::NotFound, "Push rule not found."),
| _ => Error::BadRequest(ErrorKind::InvalidParam, "Invalid data."),
};
return Err(err);

View File

@@ -2,7 +2,7 @@
use axum::extract::State;
use conduwuit::{
Err, Event, Result, RoomVersion, debug, err, info,
Err, Error, Event, Result, RoomVersion, debug, err, info,
matrix::{StateKey, pdu::PduBuilder},
};
use futures::{FutureExt, StreamExt};
@@ -58,7 +58,7 @@ pub(crate) async fn upgrade_room_route(
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if !services.server.supported_room_version(&body.new_version) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::UnsupportedRoomVersion,
"This server does not support that room version.",
));
@@ -170,7 +170,7 @@ pub(crate) async fn upgrade_room_route(
"creator".into(),
json!(&sender_user).try_into().map_err(|e| {
info!("Error forming creation event: {e}");
err!(BadRequest(ErrorKind::BadJson, "Error forming creation event"))
Error::BadRequest(ErrorKind::BadJson, "Error forming creation event")
})?,
);
},
@@ -186,13 +186,13 @@ pub(crate) async fn upgrade_room_route(
"room_version".into(),
json!(&body.new_version)
.try_into()
.map_err(|_| err!(BadRequest(ErrorKind::BadJson, "Error forming creation event")))?,
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Error forming creation event"))?,
);
create_event_content.insert(
"predecessor".into(),
json!(predecessor)
.try_into()
.map_err(|_| err!(BadRequest(ErrorKind::BadJson, "Error forming creation event")))?,
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Error forming creation event"))?,
);
// Validate creation event content
@@ -203,7 +203,7 @@ pub(crate) async fn upgrade_room_route(
)
.is_err()
{
return Err!(BadRequest(ErrorKind::BadJson, "Error forming creation event"));
return Err(Error::BadRequest(ErrorKind::BadJson, "Error forming creation event"));
}
let create_event_id = services

View File

@@ -50,8 +50,8 @@ pub(crate) async fn send_message_event_route(
// Check if this is a new transaction id
if let Ok(response) = services
.transaction_ids
.existing_txnid(sender_user, sender_device, &body.txn_id)
.transactions
.get_client_txn(sender_user, sender_device, &body.txn_id)
.await
{
// The client might have sent a txnid of the /sendToDevice endpoint
@@ -92,7 +92,7 @@ pub(crate) async fn send_message_event_route(
)
.await?;
services.transaction_ids.add_txnid(
services.transactions.add_client_txnid(
sender_user,
sender_device,
&body.txn_id,

View File

@@ -3,7 +3,7 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, debug, err, info,
Err, Error, Result, debug, err, info,
utils::{self, ReadyExt, hash},
warn,
};
@@ -191,7 +191,7 @@ pub(crate) async fn handle_login(
}
if services.users.is_locked(&user_id).await? {
return Err!(BadRequest(ErrorKind::UserLocked, "This account has been locked."));
return Err(Error::BadRequest(ErrorKind::UserLocked, "This account has been locked."));
}
if services.users.is_login_disabled(&user_id).await {
@@ -390,7 +390,7 @@ pub(crate) async fn login_token_route(
.await?;
if !worked {
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
}
// Success!
@@ -402,7 +402,7 @@ pub(crate) async fn login_token_route(
.uiaa
.create(sender_user, sender_device, &uiaainfo, json);
return Err!(Uiaa(uiaainfo));
return Err(Error::Uiaa(uiaainfo));
},
| _ => {
return Err!(Request(NotJson("No JSON body was sent when required.")));

View File

@@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use axum::extract::State;
use conduwuit::{Result, err};
use conduwuit::{Error, Result};
use conduwuit_service::sending::EduBuf;
use futures::StreamExt;
use ruma::{
@@ -26,8 +26,8 @@ pub(crate) async fn send_event_to_device_route(
// Check if this is a new transaction id
if services
.transaction_ids
.existing_txnid(sender_user, sender_device, &body.txn_id)
.transactions
.get_client_txn(sender_user, sender_device, &body.txn_id)
.await
.is_ok()
{
@@ -66,7 +66,7 @@ pub(crate) async fn send_event_to_device_route(
let event = event
.deserialize_as()
.map_err(|_| err!(BadRequest(ErrorKind::InvalidParam, "Event is invalid")))?;
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Event is invalid"))?;
match target_device_id_maybe {
| DeviceIdOrAllDevices::DeviceId(target_device_id) => {
@@ -104,8 +104,8 @@ pub(crate) async fn send_event_to_device_route(
// Save transaction id with empty data
services
.transaction_ids
.add_txnid(sender_user, sender_device, &body.txn_id, &[]);
.transactions
.add_client_txnid(sender_user, sender_device, &body.txn_id, &[]);
Ok(send_event_to_device::v3::Response {})
}

View File

@@ -1,8 +1,11 @@
use axum::{Json, extract::State, response::IntoResponse};
use conduwuit::{Err, Result};
use ruma::api::client::discovery::{
discover_homeserver::{self, HomeserverInfo, SlidingSyncProxyInfo},
discover_support::{self, Contact},
use conduwuit::{Error, Result};
use ruma::api::client::{
discovery::{
discover_homeserver::{self, HomeserverInfo, SlidingSyncProxyInfo},
discover_support::{self, Contact},
},
error::ErrorKind,
};
use crate::Ruma;
@@ -16,7 +19,7 @@ pub(crate) async fn well_known_client(
) -> Result<discover_homeserver::Response> {
let client_url = match services.config.well_known.client.as_ref() {
| Some(url) => url.to_string(),
| None => return Err!(BadRequest(ErrorKind::NotFound, "Not found.")),
| None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
};
Ok(discover_homeserver::Response {
@@ -85,7 +88,7 @@ pub(crate) async fn well_known_support(
if contacts.is_empty() && support_page.is_none() {
// No admin room, no configured contacts, and no support page
return Err!(BadRequest(ErrorKind::NotFound, "Not found."));
return Err(Error::BadRequest(ErrorKind::NotFound, "Not found."));
}
Ok(discover_support::Response { contacts, support_page })
@@ -102,7 +105,7 @@ pub(crate) async fn syncv3_client_server_json(
| Some(url) => url.to_string(),
| None => match services.config.well_known.server.as_ref() {
| Some(url) => url.to_string(),
| None => return Err!(BadRequest(ErrorKind::NotFound, "Not found.")),
| None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
},
};

View File

@@ -4,7 +4,7 @@
headers::{Authorization, authorization::Bearer},
typed_header::TypedHeaderRejectionReason,
};
use conduwuit::{Err, Result, debug_error, err, warn};
use conduwuit::{Err, Error, Result, debug_error, err, warn};
use futures::{
TryFutureExt,
future::{
@@ -14,7 +14,8 @@
pin_mut,
};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedDeviceId, OwnedServerName, OwnedUserId, UserId,
CanonicalJsonObject, CanonicalJsonValue, DeviceId, OwnedDeviceId, OwnedServerName,
OwnedUserId, UserId,
api::{
AuthScheme, IncomingRequest, Metadata,
client::{
@@ -77,7 +78,7 @@ pub(super) async fn auth(
// already
},
| Token::None | Token::Invalid => {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::MissingToken,
"Missing or invalid access token.",
));
@@ -96,7 +97,7 @@ pub(super) async fn auth(
// already
},
| Token::None | Token::Invalid => {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::MissingToken,
"Missing or invalid access token.",
));
@@ -130,10 +131,10 @@ pub(super) async fn auth(
appservice_info: None,
})
} else {
Err!(BadRequest(ErrorKind::MissingToken, "Missing access token."))
Err(Error::BadRequest(ErrorKind::MissingToken, "Missing access token."))
}
},
| _ => Err!(BadRequest(ErrorKind::MissingToken, "Missing access token.")),
| _ => Err(Error::BadRequest(ErrorKind::MissingToken, "Missing access token.")),
},
| (
AuthScheme::AccessToken | AuthScheme::AccessTokenOptional | AuthScheme::None,
@@ -149,7 +150,7 @@ pub(super) async fn auth(
&ruma::api::client::session::logout::v3::Request::METADATA
| &ruma::api::client::session::logout_all::v3::Request::METADATA
) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::UserLocked,
"This account has been locked.",
));
@@ -174,11 +175,11 @@ pub(super) async fn auth(
appservice_info: None,
}),
| (AuthScheme::ServerSignatures, Token::Appservice(_) | Token::User(_)) =>
Err!(BadRequest(
Err(Error::BadRequest(
ErrorKind::Unauthorized,
"Only server signatures should be used on this endpoint.",
)),
| (AuthScheme::AppserviceToken, Token::User(_)) => Err!(BadRequest(
| (AuthScheme::AppserviceToken, Token::User(_)) => Err(Error::BadRequest(
ErrorKind::Unauthorized,
"Only appservice access tokens should be used on this endpoint.",
)),
@@ -196,13 +197,13 @@ pub(super) async fn auth(
appservice_info: None,
})
} else {
Err!(BadRequest(
Err(Error::BadRequest(
ErrorKind::UnknownToken { soft_logout: false },
"Unknown access token.",
))
}
},
| (_, Token::Invalid) => Err!(BadRequest(
| (_, Token::Invalid) => Err(Error::BadRequest(
ErrorKind::UnknownToken { soft_logout: false },
"Unknown access token.",
)),
@@ -234,10 +235,33 @@ async fn auth_appservice(
return Err!(Request(Exclusive("User is not in namespace.")));
}
// MSC3202/MSC4190: Handle device_id masquerading for appservices.
// The device_id can be provided via `device_id` or
// `org.matrix.msc3202.device_id` query parameter.
let sender_device = if let Some(ref device_id_str) = request.query.device_id {
let device_id: &DeviceId = device_id_str.as_str().into();
// Verify the device exists for this user
if services
.users
.get_device_metadata(&user_id, device_id)
.await
.is_err()
{
return Err!(Request(Forbidden(
"Device does not exist for user or appservice cannot masquerade as this device."
)));
}
Some(device_id.to_owned())
} else {
None
};
Ok(Auth {
origin: None,
sender_user: Some(user_id),
sender_device: None,
sender_device,
appservice_info: Some(*info),
})
}

View File

@@ -11,6 +11,10 @@
pub(super) struct QueryParams {
pub(super) access_token: Option<String>,
pub(super) user_id: Option<String>,
/// Device ID for appservice device masquerading (MSC3202/MSC4190).
/// Can be provided as `device_id` or `org.matrix.msc3202.device_id`.
#[serde(alias = "org.matrix.msc3202.device_id")]
pub(super) device_id: Option<String>,
}
pub(super) struct Request {

View File

@@ -1,9 +1,12 @@
use std::{borrow::Borrow, iter::once};
use axum::extract::State;
use conduwuit::{Err, Error, Result, err, info, utils::stream::ReadyExt};
use conduwuit::{Err, Error, Result, info, utils::stream::ReadyExt};
use futures::StreamExt;
use ruma::{RoomId, api::federation::authorization::get_event_authorization};
use ruma::{
RoomId,
api::{client::error::ErrorKind, federation::authorization::get_event_authorization},
};
use super::AccessCheck;
use crate::Ruma;
@@ -44,7 +47,7 @@ pub(crate) async fn get_event_authorization_route(
.timeline
.get_pdu_json(&body.event_id)
.await
.map_err(|_| err!(BadRequest(ErrorKind::NotFound, "Event not found.")))?;
.map_err(|_| Error::BadRequest(ErrorKind::NotFound, "Event not found."))?;
let room_id_str = event
.get("room_id")

View File

@@ -2,7 +2,7 @@
use axum_client_ip::InsecureClientIp;
use base64::{Engine as _, engine::general_purpose};
use conduwuit::{
Err, PduEvent, Result, err, error,
Err, Error, PduEvent, Result, err, error,
matrix::{Event, event::gen_event_id},
utils::{self, hash::sha256},
warn,
@@ -33,7 +33,7 @@ pub(crate) async fn create_invite_route(
.await?;
if !services.server.supported_room_version(&body.room_version) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion { room_version: body.room_version.clone() },
"Server does not support this room version.",
));

View File

@@ -1,7 +1,7 @@
use std::borrow::ToOwned;
use axum::extract::State;
use conduwuit::{Err, Result, debug, debug_info, info, matrix::pdu::PduBuilder, warn};
use conduwuit::{Err, Error, Result, debug, debug_info, info, matrix::pdu::PduBuilder, warn};
use conduwuit_service::Services;
use futures::StreamExt;
use ruma::{
@@ -80,7 +80,7 @@ pub(crate) async fn create_join_event_template_route(
let room_version_id = services.rooms.state.get_room_version(&body.room_id).await?;
if !body.ver.contains(&room_version_id) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion { room_version: room_version_id },
"Room version not supported.",
));

View File

@@ -1,6 +1,6 @@
use RoomVersionId::*;
use axum::extract::State;
use conduwuit::{Err, Result, debug_warn, info, matrix::pdu::PduBuilder, warn};
use conduwuit::{Err, Error, Result, debug_warn, info, matrix::pdu::PduBuilder, warn};
use ruma::{
RoomVersionId,
api::{client::error::ErrorKind, federation::knock::create_knock_event_template},
@@ -67,14 +67,14 @@ pub(crate) async fn create_knock_event_template_route(
let room_version_id = services.rooms.state.get_room_version(&body.room_id).await?;
if matches!(room_version_id, V1 | V2 | V3 | V4 | V5 | V6) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion { room_version: room_version_id },
"Room version does not support knocking.",
));
}
if !body.ver.contains(&room_version_id) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion { room_version: room_version_id },
"Your homeserver does not support the features required to knock on this room.",
));

View File

@@ -1,6 +1,6 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, err};
use conduwuit::{Error, Result};
use ruma::{
api::{
client::error::ErrorKind,
@@ -25,7 +25,7 @@ pub(crate) async fn get_public_rooms_filtered_route(
.config
.allow_public_room_directory_over_federation
{
return Err!(BadRequest(ErrorKind::forbidden(), "Room directory is not public"));
return Err(Error::BadRequest(ErrorKind::forbidden(), "Room directory is not public"));
}
let response = crate::client::get_public_rooms_filtered_helper(
@@ -38,10 +38,7 @@ pub(crate) async fn get_public_rooms_filtered_route(
)
.await
.map_err(|_| {
err!(BadRequest(
ErrorKind::Unknown,
"Failed to return this server's public room list."
))
Error::BadRequest(ErrorKind::Unknown, "Failed to return this server's public room list.")
})?;
Ok(get_public_rooms_filtered::v1::Response {
@@ -65,7 +62,7 @@ pub(crate) async fn get_public_rooms_route(
.globals
.allow_public_room_directory_over_federation()
{
return Err!(BadRequest(ErrorKind::forbidden(), "Room directory is not public"));
return Err(Error::BadRequest(ErrorKind::forbidden(), "Room directory is not public"));
}
let response = crate::client::get_public_rooms_filtered_helper(
@@ -78,10 +75,7 @@ pub(crate) async fn get_public_rooms_route(
)
.await
.map_err(|_| {
err!(BadRequest(
ErrorKind::Unknown,
"Failed to return this server's public room list."
))
Error::BadRequest(ErrorKind::Unknown, "Failed to return this server's public room list.")
})?;
Ok(get_public_rooms::v1::Response {

View File

@@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use axum::extract::State;
use conduwuit::{Err, Result, err};
use conduwuit::{Error, Result, err};
use futures::StreamExt;
use get_profile_information::v1::ProfileField;
use rand::seq::SliceRandom;
@@ -67,16 +67,17 @@ pub(crate) async fn get_profile_information_route(
.config
.allow_inbound_profile_lookup_federation_requests
{
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Profile lookup over federation is not allowed on this homeserver.",
));
}
if !services.globals.server_is_ours(body.user_id.server_name()) {
return Err!(
BadRequest(ErrorKind::InvalidParam, "User does not belong to this server.",)
);
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"User does not belong to this server.",
));
}
let mut displayname = None;

View File

@@ -1,27 +1,33 @@
use std::{collections::BTreeMap, net::IpAddr, time::Instant};
use std::{
collections::{BTreeMap, HashMap, HashSet},
net::IpAddr,
time::{Duration, Instant},
};
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Error, Result, debug, debug_warn, err, error,
result::LogErr,
state_res::lexicographical_topological_sort,
trace,
utils::{
IterStream, ReadyExt, millis_since_unix_epoch,
stream::{BroadbandExt, TryBroadbandExt, automatic_width},
},
warn,
};
use conduwuit_service::{
Services,
sending::{EDU_LIMIT, PDU_LIMIT},
};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use http::StatusCode;
use itertools::Itertools;
use ruma::{
CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId,
CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId,
RoomId, ServerName, UserId,
api::{
client::error::ErrorKind,
client::error::{ErrorKind, ErrorKind::LimitExceeded},
federation::transactions::{
edu::{
DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent,
@@ -32,9 +38,16 @@
},
},
events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
int,
serde::Raw,
to_device::DeviceIdOrAllDevices,
uint,
};
use service::transactions::{
FederationTxnState, TransactionError, TxnKey, WrappedTransactionResponse,
};
use tokio::sync::watch::{Receiver, Sender};
use tracing::instrument;
use crate::Ruma;
@@ -44,15 +57,6 @@
/// # `PUT /_matrix/federation/v1/send/{txnId}`
///
/// Push EDUs and PDUs to this server.
#[tracing::instrument(
name = "txn",
level = "debug",
skip_all,
fields(
%client,
origin = body.origin().as_str()
),
)]
pub(crate) async fn send_transaction_message_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
@@ -76,16 +80,73 @@ pub(crate) async fn send_transaction_message_route(
)));
}
let txn_start_time = Instant::now();
trace!(
pdus = body.pdus.len(),
edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = %body.transaction_id,
origin = %body.origin(),
"Starting txn",
);
let txn_key = (body.origin().to_owned(), body.transaction_id.clone());
// Atomically check cache, join active, or start new transaction
match services
.transactions
.get_or_start_federation_txn(txn_key.clone())?
{
| FederationTxnState::Cached(response) => {
// Already responded
Ok(response)
},
| FederationTxnState::Active(receiver) => {
// Another thread is processing
wait_for_result(receiver).await
},
| FederationTxnState::Started { receiver, sender } => {
// We're the first, spawn the processing task
services
.server
.runtime()
.spawn(process_inbound_transaction(services, body, client, txn_key, sender));
// and wait for it
wait_for_result(receiver).await
},
}
}
async fn wait_for_result(
mut recv: Receiver<WrappedTransactionResponse>,
) -> Result<send_transaction_message::v1::Response> {
if tokio::time::timeout(Duration::from_secs(50), recv.changed())
.await
.is_err()
{
// Took too long, return 429 to encourage the sender to try again
return Err(Error::BadRequest(
LimitExceeded { retry_after: None },
"Transaction is being still being processed. Please try again later.",
));
}
let value = recv.borrow_and_update();
match value.clone() {
| Some(Ok(response)) => Ok(response),
| Some(Err(err)) => Err(transaction_error_to_response(&err)),
| None => Err(Error::Request(
ErrorKind::Unknown,
"Transaction processing failed unexpectedly".into(),
StatusCode::INTERNAL_SERVER_ERROR,
)),
}
}
#[instrument(
skip_all,
fields(
id = ?body.transaction_id.as_str(),
origin = ?body.origin()
)
)]
async fn process_inbound_transaction(
services: crate::State,
body: Ruma<send_transaction_message::v1::Request>,
client: IpAddr,
txn_key: TxnKey,
sender: Sender<WrappedTransactionResponse>,
) {
let txn_start_time = Instant::now();
let pdus = body
.pdus
.iter()
@@ -102,40 +163,79 @@ pub(crate) async fn send_transaction_message_route(
.filter_map(Result::ok)
.stream();
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?;
debug!(pdus = body.pdus.len(), edus = body.edus.len(), "Processing transaction",);
let results = match handle(&services, &client, body.origin(), pdus, edus).await {
| Ok(results) => results,
| Err(err) => {
fail_federation_txn(services, &txn_key, &sender, err);
return;
},
};
for (id, result) in &results {
if let Err(e) = result {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
debug_warn!("Incoming PDU failed {id}: {e:?}");
}
}
}
debug!(
pdus = body.pdus.len(),
edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = %body.transaction_id,
origin = %body.origin(),
"Finished txn",
"Finished processing transaction"
);
for (id, result) in &results {
if let Err(e) = result {
if matches!(e, Error::BadRequest { kind: ErrorKind::NotFound, .. }) {
warn!("Incoming PDU failed {id}: {e:?}");
}
}
}
Ok(send_transaction_message::v1::Response {
let response = send_transaction_message::v1::Response {
pdus: results
.into_iter()
.map(|(e, r)| (e, r.map_err(error::sanitized_message)))
.collect(),
})
};
services
.transactions
.finish_federation_txn(txn_key, sender, response);
}
/// Handles a failed federation transaction by sending the error through
/// the channel and cleaning up the transaction state. This allows waiters to
/// receive an appropriate error response.
fn fail_federation_txn(
services: crate::State,
txn_key: &TxnKey,
sender: &Sender<WrappedTransactionResponse>,
err: TransactionError,
) {
debug!("Transaction failed: {err}");
// Remove from active state so the transaction can be retried
services.transactions.remove_federation_txn(txn_key);
// Send the error to any waiters
if let Err(e) = sender.send(Some(Err(err))) {
debug_warn!("Failed to send transaction error to receivers: {e}");
}
}
/// Converts a TransactionError into an appropriate HTTP error response.
fn transaction_error_to_response(err: &TransactionError) -> Error {
match err {
| TransactionError::ShuttingDown => Error::Request(
ErrorKind::Unknown,
"Server is shutting down, please retry later".into(),
StatusCode::SERVICE_UNAVAILABLE,
),
}
}
async fn handle(
services: &Services,
client: &IpAddr,
origin: &ServerName,
started: Instant,
pdus: impl Stream<Item = Pdu> + Send,
edus: impl Stream<Item = Edu> + Send,
) -> Result<ResolvedMap> {
) -> std::result::Result<ResolvedMap, TransactionError> {
// group pdus by room
let pdus = pdus
.collect()
@@ -152,7 +252,7 @@ async fn handle(
.into_iter()
.try_stream()
.broad_and_then(|(room_id, pdus): (_, Vec<_>)| {
handle_room(services, client, origin, started, room_id, pdus.into_iter())
handle_room(services, client, origin, room_id, pdus.into_iter())
.map_ok(Vec::into_iter)
.map_ok(IterStream::try_stream)
})
@@ -169,14 +269,51 @@ async fn handle(
Ok(results)
}
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
/// returning them in a topologically sorted order.
///
/// This is used to attempt to process PDUs in an order that respects their
/// dependencies, however it is ultimately the sender's responsibility to send
/// them in a processable order, so this is just a best effort attempt. It does
/// not account for power levels or other tie breaks.
async fn build_local_dag(
pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject>,
) -> Result<Vec<OwnedEventId>> {
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> = HashMap::new();
for (event_id, value) in pdu_map {
let prev_events = value
.get("prev_events")
.expect("pdu must have prev_events")
.as_array()
.expect("prev_events must be an array")
.iter()
.map(|v| {
OwnedEventId::parse(v.as_str().expect("prev_events values must be strings"))
.expect("prev_events must be valid event IDs")
})
.collect::<HashSet<OwnedEventId>>();
dag.insert(event_id.clone(), prev_events);
}
lexicographical_topological_sort(&dag, &|_| async {
// Note: we don't bother fetching power levels because that would massively slow
// this function down. This is a best-effort attempt to order events correctly
// for processing, however ultimately that should be the sender's job.
Ok((int!(0), MilliSecondsSinceUnixEpoch(uint!(0))))
})
.await
.map_err(|e| err!("failed to resolve local graph: {e}"))
}
async fn handle_room(
services: &Services,
_client: &IpAddr,
origin: &ServerName,
txn_start_time: Instant,
room_id: OwnedRoomId,
pdus: impl Iterator<Item = Pdu> + Send,
) -> Result<Vec<(OwnedEventId, Result)>> {
) -> std::result::Result<Vec<(OwnedEventId, Result)>, TransactionError> {
let _room_lock = services
.rooms
.event_handler
@@ -185,27 +322,40 @@ async fn handle_room(
.await;
let room_id = &room_id;
pdus.try_stream()
.and_then(|(_, event_id, value)| async move {
services.server.check_running()?;
let pdu_start_time = Instant::now();
let result = services
.rooms
.event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
.await
.map(|_| ());
debug!(
pdu_elapsed = ?pdu_start_time.elapsed(),
txn_elapsed = ?txn_start_time.elapsed(),
"Finished PDU {event_id}",
);
Ok((event_id, result))
let pdu_map: HashMap<OwnedEventId, CanonicalJsonObject> = pdus
.into_iter()
.map(|(_, event_id, value)| (event_id, value))
.collect();
// Try to sort PDUs by their dependencies, but fall back to arbitrary order on
// failure (e.g., cycles). This is best-effort; proper ordering is the sender's
// responsibility.
let sorted_event_ids = if pdu_map.len() >= 2 {
build_local_dag(&pdu_map).await.unwrap_or_else(|e| {
debug_warn!("Failed to build local DAG for room {room_id}: {e}");
pdu_map.keys().cloned().collect()
})
.try_collect()
.await
} else {
pdu_map.keys().cloned().collect()
};
let mut results = Vec::with_capacity(sorted_event_ids.len());
for event_id in sorted_event_ids {
let value = pdu_map
.get(&event_id)
.expect("sorted event IDs must be from the original map")
.clone();
services
.server
.check_running()
.map_err(|_| TransactionError::ShuttingDown)?;
let result = services
.rooms
.event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
.await
.map(|_| ());
results.push((event_id, result));
}
Ok(results)
}
async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, edu: Edu) {
@@ -478,8 +628,8 @@ async fn handle_edu_direct_to_device(
// Check if this is a new transaction id
if services
.transaction_ids
.existing_txnid(sender, None, message_id)
.transactions
.get_client_txn(sender, None, message_id)
.await
.is_ok()
{
@@ -498,8 +648,8 @@ async fn handle_edu_direct_to_device(
// Save transaction id with empty data
services
.transaction_ids
.add_txnid(sender, None, message_id, &[]);
.transactions
.add_client_txnid(sender, None, message_id, &[]);
}
async fn handle_edu_direct_to_device_user<Event: Send + Sync>(

View File

@@ -1,7 +1,7 @@
use std::time::Duration;
use axum::extract::State;
use conduwuit::{Err, Result};
use conduwuit::{Error, Result};
use futures::{FutureExt, StreamExt, TryFutureExt};
use ruma::api::{
client::error::ErrorKind,
@@ -24,7 +24,7 @@ pub(crate) async fn get_devices_route(
body: Ruma<get_devices::v1::Request>,
) -> Result<get_devices::v1::Response> {
if !services.globals.user_is_local(&body.user_id) {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Tried to access user from other server.",
));
@@ -86,9 +86,10 @@ pub(crate) async fn get_keys_route(
.iter()
.any(|(u, _)| !services.globals.user_is_local(u))
{
return Err!(
BadRequest(ErrorKind::InvalidParam, "User does not belong to this server.",)
);
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"User does not belong to this server.",
));
}
let result = get_keys_helper(
@@ -120,7 +121,7 @@ pub(crate) async fn claim_keys_route(
.iter()
.any(|(u, _)| !services.globals.user_is_local(u))
{
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Tried to access user from other server.",
));

View File

@@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{Err, Result};
use ruma::api::federation::discovery::discover_homeserver;
use conduwuit::{Error, Result};
use ruma::api::{client::error::ErrorKind, federation::discovery::discover_homeserver};
use crate::Ruma;
@@ -14,7 +14,7 @@ pub(crate) async fn well_known_server(
Ok(discover_homeserver::Response {
server: match services.server.config.well_known.server.as_ref() {
| Some(server_name) => server_name.to_owned(),
| None => return Err!(BadRequest(ErrorKind::NotFound, "Not found.")),
| None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
},
})
}

View File

@@ -98,8 +98,7 @@ serde-saphyr.workspace = true
serde.workspace = true
smallvec.workspace = true
smallstr.workspace = true
snafu.workspace = true
paste.workspace = true
thiserror.workspace = true
tikv-jemallocator.optional = true
tikv-jemallocator.workspace = true
tikv-jemalloc-ctl.optional = true

View File

@@ -368,6 +368,31 @@ pub struct Config {
#[serde(default = "default_max_fetch_prev_events")]
pub max_fetch_prev_events: u16,
/// How many incoming federation transactions the server is willing to be
/// processing at any given time before it becomes overloaded and starts
/// rejecting further transactions until some slots become available.
///
/// Setting this value too low or too high may result in unstable
/// federation, and setting it too high may cause runaway resource usage.
///
/// default: 150
#[serde(default = "default_max_concurrent_inbound_transactions")]
pub max_concurrent_inbound_transactions: usize,
/// Maximum age (in seconds) for cached federation transaction responses.
/// Entries older than this will be removed during cleanup.
///
/// default: 7200 (2 hours)
#[serde(default = "default_transaction_id_cache_max_age_secs")]
pub transaction_id_cache_max_age_secs: u64,
/// Maximum number of cached federation transaction responses.
/// When the cache exceeds this limit, older entries will be removed.
///
/// default: 8192
#[serde(default = "default_transaction_id_cache_max_entries")]
pub transaction_id_cache_max_entries: usize,
/// Default/base connection timeout (seconds). This is used only by URL
/// previews and update/news endpoint checks.
///
@@ -2540,6 +2565,12 @@ fn default_pusher_idle_timeout() -> u64 { 15 }
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
fn default_max_concurrent_inbound_transactions() -> usize { 150 }
fn default_transaction_id_cache_max_age_secs() -> u64 { 60 * 60 * 2 }
fn default_transaction_id_cache_max_entries() -> usize { 8192 }
fn default_tracing_flame_filter() -> String {
cfg!(debug_assertions)
.then_some("trace,h2=off")

View File

@@ -45,162 +45,63 @@ macro_rules! Err {
macro_rules! err {
(Request(Forbidden($level:ident!($($args:tt)+)))) => {{
let mut buf = String::new();
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::forbidden(),
message: $crate::err_log!(buf, $level, $($args)+),
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: Some($crate::snafu::Backtrace::capture()),
}
$crate::error::Error::Request(
$crate::ruma::api::client::error::ErrorKind::forbidden(),
$crate::err_log!(buf, $level, $($args)+),
$crate::http::StatusCode::BAD_REQUEST
)
}};
(Request(Forbidden($($args:tt)+))) => {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::forbidden(),
message,
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: Some($crate::snafu::Backtrace::capture()),
}
}
};
(Request(NotFound($level:ident!($($args:tt)+)))) => {{
let mut buf = String::new();
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::NotFound,
message: $crate::err_log!(buf, $level, $($args)+),
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: None,
}
}};
(Request(NotFound($($args:tt)+))) => {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::NotFound,
message,
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: None,
}
}
$crate::error::Error::Request(
$crate::ruma::api::client::error::ErrorKind::forbidden(),
$crate::format_maybe!($($args)+),
$crate::http::StatusCode::BAD_REQUEST
)
};
(Request($variant:ident($level:ident!($($args:tt)+)))) => {{
let mut buf = String::new();
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::$variant,
message: $crate::err_log!(buf, $level, $($args)+),
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: Some($crate::snafu::Backtrace::capture()),
}
$crate::error::Error::Request(
$crate::ruma::api::client::error::ErrorKind::$variant,
$crate::err_log!(buf, $level, $($args)+),
$crate::http::StatusCode::BAD_REQUEST
)
}};
(Request($variant:ident($($args:tt)+))) => {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::$variant,
message,
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: Some($crate::snafu::Backtrace::capture()),
}
}
$crate::error::Error::Request(
$crate::ruma::api::client::error::ErrorKind::$variant,
$crate::format_maybe!($($args)+),
$crate::http::StatusCode::BAD_REQUEST
)
};
(Config($item:literal, $($args:tt)+)) => {{
let mut buf = String::new();
$crate::error::ConfigSnafu {
directive: $item,
message: $crate::err_log!(buf, error, config = %$item, $($args)+),
}.build()
$crate::error::Error::Config($item, $crate::err_log!(buf, error, config = %$item, $($args)+))
}};
(BadRequest(ErrorKind::NotFound, $($args:tt)+)) => {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::Error::Request {
kind: $crate::ruma::api::client::error::ErrorKind::NotFound,
message,
code: $crate::http::StatusCode::BAD_REQUEST,
backtrace: None,
}
}
};
(BadRequest($kind:expr, $($args:tt)+)) => {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::BadRequestSnafu {
kind: $kind,
message,
}.build()
}
};
(FeatureDisabled($($args:tt)+)) => {
{
let feature: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::FeatureDisabledSnafu { feature }.build()
}
};
(Federation($server:expr, $error:expr $(,)?)) => {
{
$crate::error::FederationSnafu {
server: $server,
error: $error,
}.build()
}
};
(InconsistentRoomState($message:expr, $room_id:expr $(,)?)) => {
{
$crate::error::InconsistentRoomStateSnafu {
message: $message,
room_id: $room_id,
}.build()
}
};
(Uiaa($info:expr $(,)?)) => {
{
$crate::error::UiaaSnafu {
info: $info,
}.build()
}
};
($variant:ident($level:ident!($($args:tt)+))) => {{
let mut buf = String::new();
$crate::paste::paste! {
$crate::error::[<$variant Snafu>] {
message: $crate::err_log!(buf, $level, $($args)+),
}.build()
}
$crate::error::Error::$variant($crate::err_log!(buf, $level, $($args)+))
}};
($variant:ident($($args:ident),+)) => {
$crate::error::Error::$variant($($args),+)
};
($variant:ident($($args:tt)+)) => {
$crate::paste::paste! {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::[<$variant Snafu>] { message }.build()
}
}
$crate::error::Error::$variant($crate::format_maybe!($($args)+))
};
($level:ident!($($args:tt)+)) => {{
let mut buf = String::new();
let message: std::borrow::Cow<'static, str> = $crate::err_log!(buf, $level, $($args)+);
$crate::error::ErrSnafu { message }.build()
$crate::error::Error::Err($crate::err_log!(buf, $level, $($args)+))
}};
($($args:tt)+) => {
{
let message: std::borrow::Cow<'static, str> = $crate::format_maybe!($($args)+);
$crate::error::ErrSnafu { message }.build()
}
$crate::error::Error::Err($crate::format_maybe!($($args)+))
};
}
@@ -233,7 +134,7 @@ macro_rules! err_log {
};
($crate::error::visit)(&mut $out, LEVEL, &__CALLSITE, &mut valueset_all!(__CALLSITE.metadata().fields(), $($fields)+));
std::borrow::Cow::<'static, str>::from($out)
($out).into()
}}
}

View File

@@ -6,391 +6,151 @@
use std::{any::Any, borrow::Cow, convert::Infallible, sync::PoisonError};
use snafu::{IntoError, prelude::*};
pub use self::{err::visit, log::*};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
#[derive(thiserror::Error)]
pub enum Error {
#[snafu(display("PANIC!"))]
PanicAny {
panic: Box<dyn Any + Send>,
backtrace: snafu::Backtrace,
},
#[snafu(display("PANIC! {message}"))]
Panic {
message: &'static str,
panic: Box<dyn Any + Send + 'static>,
backtrace: snafu::Backtrace,
},
#[error("PANIC!")]
PanicAny(Box<dyn Any + Send>),
#[error("PANIC! {0}")]
Panic(&'static str, Box<dyn Any + Send + 'static>),
// std
#[snafu(display("Format error: {source}"))]
Fmt {
source: std::fmt::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("UTF-8 conversion error: {source}"))]
FromUtf8 {
source: std::string::FromUtf8Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("I/O error: {source}"))]
Io {
source: std::io::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Parse float error: {source}"))]
ParseFloat {
source: std::num::ParseFloatError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Parse int error: {source}"))]
ParseInt {
source: std::num::ParseIntError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Error: {source}"))]
Std {
source: Box<dyn std::error::Error + Send>,
backtrace: snafu::Backtrace,
},
#[snafu(display("Thread access error: {source}"))]
ThreadAccessError {
source: std::thread::AccessError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Integer conversion error: {source}"))]
TryFromInt {
source: std::num::TryFromIntError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Slice conversion error: {source}"))]
TryFromSlice {
source: std::array::TryFromSliceError,
backtrace: snafu::Backtrace,
},
#[snafu(display("UTF-8 error: {source}"))]
Utf8 {
source: std::str::Utf8Error,
backtrace: snafu::Backtrace,
},
#[error(transparent)]
Fmt(#[from] std::fmt::Error),
#[error(transparent)]
FromUtf8(#[from] std::string::FromUtf8Error),
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error(transparent)]
ParseFloat(#[from] std::num::ParseFloatError),
#[error(transparent)]
ParseInt(#[from] std::num::ParseIntError),
#[error(transparent)]
Std(#[from] Box<dyn std::error::Error + Send>),
#[error(transparent)]
ThreadAccessError(#[from] std::thread::AccessError),
#[error(transparent)]
TryFromInt(#[from] std::num::TryFromIntError),
#[error(transparent)]
TryFromSlice(#[from] std::array::TryFromSliceError),
#[error(transparent)]
Utf8(#[from] std::str::Utf8Error),
// third-party
#[snafu(display("Capacity error: {source}"))]
CapacityError {
source: arrayvec::CapacityError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Cargo.toml error: {source}"))]
CargoToml {
source: cargo_toml::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Clap error: {source}"))]
Clap {
source: clap::error::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Extension rejection: {source}"))]
Extension {
source: axum::extract::rejection::ExtensionRejection,
backtrace: snafu::Backtrace,
},
#[snafu(display("Figment error: {source}"))]
Figment {
source: figment::error::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("HTTP error: {source}"))]
Http {
source: http::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Invalid HTTP header value: {source}"))]
HttpHeader {
source: http::header::InvalidHeaderValue,
backtrace: snafu::Backtrace,
},
#[snafu(display("Join error: {source}"))]
JoinError {
source: tokio::task::JoinError,
backtrace: snafu::Backtrace,
},
#[snafu(display("JSON error: {source}"))]
Json {
source: serde_json::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("JS parse int error: {source}"))]
JsParseInt {
source: ruma::JsParseIntError,
backtrace: snafu::Backtrace,
},
#[snafu(display("JS try from int error: {source}"))]
JsTryFromInt {
source: ruma::JsTryFromIntError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Path rejection: {source}"))]
Path {
source: axum::extract::rejection::PathRejection,
backtrace: snafu::Backtrace,
},
#[snafu(display("Mutex poisoned: {message}"))]
Poison {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("Regex error: {source}"))]
Regex {
source: regex::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Request error: {source}"))]
Reqwest {
source: reqwest::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message}"))]
SerdeDe {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message}"))]
SerdeSer {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("TOML deserialization error: {source}"))]
TomlDe {
source: toml::de::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("TOML serialization error: {source}"))]
TomlSer {
source: toml::ser::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Tracing filter error: {source}"))]
TracingFilter {
source: tracing_subscriber::filter::ParseError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Tracing reload error: {source}"))]
TracingReload {
source: tracing_subscriber::reload::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Typed header rejection: {source}"))]
TypedHeader {
source: axum_extra::typed_header::TypedHeaderRejection,
backtrace: snafu::Backtrace,
},
#[snafu(display("YAML deserialization error: {source}"))]
YamlDe {
source: serde_saphyr::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("YAML serialization error: {source}"))]
YamlSer {
source: serde_saphyr::ser_error::Error,
backtrace: snafu::Backtrace,
},
#[error(transparent)]
CapacityError(#[from] arrayvec::CapacityError),
#[error(transparent)]
CargoToml(#[from] cargo_toml::Error),
#[error(transparent)]
Clap(#[from] clap::error::Error),
#[error(transparent)]
Extension(#[from] axum::extract::rejection::ExtensionRejection),
#[error(transparent)]
Figment(#[from] figment::error::Error),
#[error(transparent)]
Http(#[from] http::Error),
#[error(transparent)]
HttpHeader(#[from] http::header::InvalidHeaderValue),
#[error("Join error: {0}")]
JoinError(#[from] tokio::task::JoinError),
#[error(transparent)]
Json(#[from] serde_json::Error),
#[error(transparent)]
JsParseInt(#[from] ruma::JsParseIntError), // js_int re-export
#[error(transparent)]
JsTryFromInt(#[from] ruma::JsTryFromIntError), // js_int re-export
#[error(transparent)]
Path(#[from] axum::extract::rejection::PathRejection),
#[error("Mutex poisoned: {0}")]
Poison(Cow<'static, str>),
#[error("Regex error: {0}")]
Regex(#[from] regex::Error),
#[error("Request error: {0}")]
Reqwest(#[from] reqwest::Error),
#[error("{0}")]
SerdeDe(Cow<'static, str>),
#[error("{0}")]
SerdeSer(Cow<'static, str>),
#[error(transparent)]
TomlDe(#[from] toml::de::Error),
#[error(transparent)]
TomlSer(#[from] toml::ser::Error),
#[error("Tracing filter error: {0}")]
TracingFilter(#[from] tracing_subscriber::filter::ParseError),
#[error("Tracing reload error: {0}")]
TracingReload(#[from] tracing_subscriber::reload::Error),
#[error(transparent)]
TypedHeader(#[from] axum_extra::typed_header::TypedHeaderRejection),
#[error(transparent)]
YamlDe(#[from] serde_saphyr::Error),
#[error(transparent)]
YamlSer(#[from] serde_saphyr::ser_error::Error),
// ruma/conduwuit
#[snafu(display("Arithmetic operation failed: {message}"))]
Arithmetic {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("{kind}: {message}"))]
BadRequest {
kind: ruma::api::client::error::ErrorKind,
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message}"))]
BadServerResponse {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("Canonical JSON error: {source}"))]
CanonicalJson {
source: ruma::CanonicalJsonError,
backtrace: snafu::Backtrace,
},
#[snafu(display(
"There was a problem with the '{directive}' directive in your configuration: {message}"
))]
Config {
directive: &'static str,
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message}"))]
Conflict {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("Content disposition error: {source}"))]
ContentDisposition {
source: ruma::http_headers::ContentDispositionParseError,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message}"))]
Database {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("Feature '{feature}' is not available on this server."))]
FeatureDisabled {
feature: Cow<'static, str>,
},
#[snafu(display("Remote server {server} responded with: {error}"))]
Federation {
server: ruma::OwnedServerName,
error: ruma::api::client::error::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message} in {room_id}"))]
InconsistentRoomState {
message: &'static str,
room_id: ruma::OwnedRoomId,
backtrace: snafu::Backtrace,
},
#[snafu(display("HTTP conversion error: {source}"))]
IntoHttp {
source: ruma::api::error::IntoHttpError,
backtrace: snafu::Backtrace,
},
#[snafu(display("{message}"))]
Ldap {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[snafu(display("MXC URI error: {source}"))]
Mxc {
source: ruma::MxcUriError,
backtrace: snafu::Backtrace,
},
#[snafu(display("Matrix ID parse error: {source}"))]
Mxid {
source: ruma::IdParseError,
backtrace: snafu::Backtrace,
},
#[snafu(display("from {server}: {error}"))]
Redaction {
server: ruma::OwnedServerName,
error: ruma::canonical_json::RedactionError,
backtrace: snafu::Backtrace,
},
#[snafu(display("{kind}: {message}"))]
Request {
kind: ruma::api::client::error::ErrorKind,
message: Cow<'static, str>,
code: http::StatusCode,
backtrace: Option<snafu::Backtrace>,
},
#[snafu(display("Ruma error: {source}"))]
Ruma {
source: ruma::api::client::error::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("Signature error: {source}"))]
Signatures {
source: ruma::signatures::Error,
backtrace: snafu::Backtrace,
},
#[snafu(display("State resolution error: {source}"))]
#[snafu(context(false))]
StateRes {
source: crate::state_res::Error,
},
#[snafu(display("uiaa"))]
Uiaa {
info: ruma::api::client::uiaa::UiaaInfo,
},
#[error("Arithmetic operation failed: {0}")]
Arithmetic(Cow<'static, str>),
#[error("{0}: {1}")]
BadRequest(ruma::api::client::error::ErrorKind, &'static str), //TODO: remove
#[error("{0}")]
BadServerResponse(Cow<'static, str>),
#[error(transparent)]
CanonicalJson(#[from] ruma::CanonicalJsonError),
#[error("There was a problem with the '{0}' directive in your configuration: {1}")]
Config(&'static str, Cow<'static, str>),
#[error("{0}")]
Conflict(Cow<'static, str>), // This is only needed for when a room alias already exists
#[error(transparent)]
ContentDisposition(#[from] ruma::http_headers::ContentDispositionParseError),
#[error("{0}")]
Database(Cow<'static, str>),
#[error("Feature '{0}' is not available on this server.")]
FeatureDisabled(Cow<'static, str>),
#[error("Remote server {0} responded with: {1}")]
Federation(ruma::OwnedServerName, ruma::api::client::error::Error),
#[error("{0} in {1}")]
InconsistentRoomState(&'static str, ruma::OwnedRoomId),
#[error(transparent)]
IntoHttp(#[from] ruma::api::error::IntoHttpError),
#[error("{0}")]
Ldap(Cow<'static, str>),
#[error(transparent)]
Mxc(#[from] ruma::MxcUriError),
#[error(transparent)]
Mxid(#[from] ruma::IdParseError),
#[error("from {0}: {1}")]
Redaction(ruma::OwnedServerName, ruma::canonical_json::RedactionError),
#[error("{0}: {1}")]
Request(ruma::api::client::error::ErrorKind, Cow<'static, str>, http::StatusCode),
#[error(transparent)]
Ruma(#[from] ruma::api::client::error::Error),
#[error(transparent)]
Signatures(#[from] ruma::signatures::Error),
#[error(transparent)]
StateRes(#[from] crate::state_res::Error),
#[error("uiaa")]
Uiaa(ruma::api::client::uiaa::UiaaInfo),
// unique / untyped
#[snafu(display("{message}"))]
Err {
message: Cow<'static, str>,
backtrace: snafu::Backtrace,
},
#[error("{0}")]
Err(Cow<'static, str>),
}
impl Error {
#[inline]
#[must_use]
pub fn from_errno() -> Self { IoSnafu {}.into_error(std::io::Error::last_os_error()) }
pub fn from_errno() -> Self { Self::Io(std::io::Error::last_os_error()) }
//#[deprecated]
#[must_use]
pub fn bad_database(message: &'static str) -> Self {
let message: Cow<'static, str> = message.into();
DatabaseSnafu { message }.build()
crate::err!(Database(error!("{message}")))
}
/// Sanitizes public-facing errors that can leak sensitive information.
pub fn sanitized_message(&self) -> String {
match self {
| Self::Database { .. } => String::from("Database error occurred."),
| Self::Io { .. } => String::from("I/O error occurred."),
| Self::Database(..) => String::from("Database error occurred."),
| Self::Io(..) => String::from("I/O error occurred."),
| _ => self.message(),
}
}
@@ -398,8 +158,8 @@ pub fn sanitized_message(&self) -> String {
/// Generate the error message string.
pub fn message(&self) -> String {
match self {
| Self::Federation { server, error, .. } => format!("Answer from {server}: {error}"),
| Self::Ruma { source, .. } => response::ruma_error_message(source),
| Self::Federation(origin, error) => format!("Answer from {origin}: {error}"),
| Self::Ruma(error) => response::ruma_error_message(error),
| _ => format!("{self}"),
}
}
@@ -410,10 +170,10 @@ pub fn kind(&self) -> ruma::api::client::error::ErrorKind {
use ruma::api::client::error::ErrorKind::{FeatureDisabled, Unknown};
match self {
| Self::Federation { error, .. } => response::ruma_error_kind(error).clone(),
| Self::Ruma { source, .. } => response::ruma_error_kind(source).clone(),
| Self::BadRequest { kind, .. } | Self::Request { kind, .. } => kind.clone(),
| Self::FeatureDisabled { .. } => FeatureDisabled,
| Self::Federation(_, error) | Self::Ruma(error) =>
response::ruma_error_kind(error).clone(),
| Self::BadRequest(kind, ..) | Self::Request(kind, ..) => kind.clone(),
| Self::FeatureDisabled(..) => FeatureDisabled,
| _ => Unknown,
}
}
@@ -424,15 +184,13 @@ pub fn status_code(&self) -> http::StatusCode {
use http::StatusCode;
match self {
| Self::Federation { error, .. } => error.status_code,
| Self::Ruma { source, .. } => source.status_code,
| Self::Request { kind, code, .. } => response::status_code(kind, *code),
| Self::BadRequest { kind, .. } => response::bad_request_code(kind),
| Self::FeatureDisabled { .. } => response::bad_request_code(&self.kind()),
| Self::Reqwest { source, .. } =>
source.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
| Self::Conflict { .. } => StatusCode::CONFLICT,
| Self::Io { source, .. } => response::io_error_code(source.kind()),
| Self::Federation(_, error) | Self::Ruma(error) => error.status_code,
| Self::Request(kind, _, code) => response::status_code(kind, *code),
| Self::BadRequest(kind, ..) => response::bad_request_code(kind),
| Self::FeatureDisabled(..) => response::bad_request_code(&self.kind()),
| Self::Reqwest(error) => error.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
| Self::Conflict(_) => StatusCode::CONFLICT,
| Self::Io(error) => response::io_error_code(error.kind()),
| _ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
@@ -445,46 +203,16 @@ pub fn status_code(&self) -> http::StatusCode {
pub fn is_not_found(&self) -> bool { self.status_code() == http::StatusCode::NOT_FOUND }
}
// Debug is already derived by Snafu
/// Macro to reduce boilerplate for From implementations using Snafu context
macro_rules! impl_from_snafu {
($source_ty:ty => $context:ident) => {
impl From<$source_ty> for Error {
fn from(source: $source_ty) -> Self { $context.into_error(source) }
}
};
}
/// Macro for From impls that format messages into ErrSnafu or other
/// message-based contexts
macro_rules! impl_from_message {
($source_ty:ty => $context:ident, $msg:expr) => {
impl From<$source_ty> for Error {
fn from(source: $source_ty) -> Self {
let message: Cow<'static, str> = format!($msg, source).into();
$context { message }.build()
}
}
};
}
/// Macro for From impls with constant messages (no formatting)
macro_rules! impl_from_const_message {
($source_ty:ty => $context:ident, $msg:expr) => {
impl From<$source_ty> for Error {
fn from(_source: $source_ty) -> Self {
let message: Cow<'static, str> = $msg.into();
$context { message }.build()
}
}
};
impl std::fmt::Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message())
}
}
impl<T> From<PoisonError<T>> for Error {
#[cold]
#[inline(never)]
fn from(e: PoisonError<T>) -> Self { PoisonSnafu { message: e.to_string() }.build() }
fn from(e: PoisonError<T>) -> Self { Self::Poison(e.to_string().into()) }
}
#[allow(clippy::fallible_impl_from)]
@@ -496,43 +224,6 @@ fn from(_e: Infallible) -> Self {
}
}
// Implementations using the macro
impl_from_snafu!(std::io::Error => IoSnafu);
impl_from_snafu!(std::string::FromUtf8Error => FromUtf8Snafu);
impl_from_snafu!(regex::Error => RegexSnafu);
impl_from_snafu!(ruma::http_headers::ContentDispositionParseError => ContentDispositionSnafu);
impl_from_snafu!(ruma::api::error::IntoHttpError => IntoHttpSnafu);
impl_from_snafu!(ruma::JsTryFromIntError => JsTryFromIntSnafu);
impl_from_snafu!(ruma::CanonicalJsonError => CanonicalJsonSnafu);
impl_from_snafu!(axum::extract::rejection::PathRejection => PathSnafu);
impl_from_snafu!(clap::error::Error => ClapSnafu);
impl_from_snafu!(ruma::MxcUriError => MxcSnafu);
impl_from_snafu!(serde_saphyr::ser_error::Error => YamlSerSnafu);
impl_from_snafu!(toml::de::Error => TomlDeSnafu);
impl_from_snafu!(http::header::InvalidHeaderValue => HttpHeaderSnafu);
impl_from_snafu!(serde_json::Error => JsonSnafu);
// Custom implementations using message formatting
impl_from_const_message!(std::fmt::Error => ErrSnafu, "formatting error");
impl_from_message!(std::str::Utf8Error => ErrSnafu, "UTF-8 error: {}");
impl_from_message!(std::num::TryFromIntError => ArithmeticSnafu, "integer conversion error: {}");
impl_from_message!(tracing_subscriber::reload::Error => ErrSnafu, "tracing reload error: {}");
impl_from_message!(reqwest::Error => ErrSnafu, "HTTP client error: {}");
impl_from_message!(ruma::signatures::Error => ErrSnafu, "Signature error: {}");
impl_from_message!(ruma::IdParseError => ErrSnafu, "ID parse error: {}");
impl_from_message!(std::num::ParseIntError => ErrSnafu, "Integer parse error: {}");
impl_from_message!(std::array::TryFromSliceError => ErrSnafu, "Slice conversion error: {}");
impl_from_message!(tokio::task::JoinError => ErrSnafu, "Task join error: {}");
impl_from_message!(serde_saphyr::Error => ErrSnafu, "YAML error: {}");
// Generic implementation for CapacityError
impl<T> From<arrayvec::CapacityError<T>> for Error {
fn from(_source: arrayvec::CapacityError<T>) -> Self {
let message: Cow<'static, str> = "capacity error: buffer is full".into();
ErrSnafu { message }.build()
}
}
#[cold]
#[inline(never)]
pub fn infallible(_e: &Infallible) {

View File

@@ -15,16 +15,13 @@ pub fn panic(self) -> ! { panic_any(self.into_panic()) }
#[must_use]
#[inline]
pub fn from_panic(e: Box<dyn Any + Send>) -> Self {
use super::PanicSnafu;
PanicSnafu { message: debug::panic_str(&e), panic: e }.build()
}
pub fn from_panic(e: Box<dyn Any + Send>) -> Self { Self::Panic(debug::panic_str(&e), e) }
#[inline]
pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
match self {
| Self::Panic { panic, .. } | Self::PanicAny { panic, .. } => panic,
| Self::JoinError { source, .. } => source.into_panic(),
| Self::Panic(_, e) | Self::PanicAny(e) => e,
| Self::JoinError(e) => e.into_panic(),
| _ => Box::new(self),
}
}
@@ -40,8 +37,8 @@ pub fn panic_str(self) -> Option<&'static str> {
#[inline]
pub fn is_panic(&self) -> bool {
match &self {
| Self::Panic { .. } | Self::PanicAny { .. } => true,
| Self::JoinError { source, .. } => source.is_panic(),
| Self::Panic(..) | Self::PanicAny(..) => true,
| Self::JoinError(e) => e.is_panic(),
| _ => false,
}
}

View File

@@ -47,8 +47,8 @@ fn into_response(self) -> axum::response::Response {
impl From<Error> for UiaaResponse {
#[inline]
fn from(error: Error) -> Self {
if let Error::Uiaa { info, .. } = error {
return Self::AuthResponse(info);
if let Error::Uiaa(uiaainfo) = error {
return Self::AuthResponse(uiaainfo);
}
let body = ErrorBody::Standard {

View File

@@ -5,15 +5,9 @@
use crate::Error;
impl de::Error for Error {
fn custom<T: Display + ToString>(msg: T) -> Self {
let message: std::borrow::Cow<'static, str> = msg.to_string().into();
super::SerdeDeSnafu { message }.build()
}
fn custom<T: Display + ToString>(msg: T) -> Self { Self::SerdeDe(msg.to_string().into()) }
}
impl ser::Error for Error {
fn custom<T: Display + ToString>(msg: T) -> Self {
let message: std::borrow::Cow<'static, str> = msg.to_string().into();
super::SerdeSerSnafu { message }.build()
}
fn custom<T: Display + ToString>(msg: T) -> Self { Self::SerdeSer(msg.to_string().into()) }
}

View File

@@ -14,6 +14,7 @@
static VERSION: OnceLock<String> = OnceLock::new();
static VERSION_UA: OnceLock<String> = OnceLock::new();
static USER_AGENT: OnceLock<String> = OnceLock::new();
static USER_AGENT_MEDIA: OnceLock<String> = OnceLock::new();
#[inline]
#[must_use]
@@ -21,14 +22,22 @@ pub fn name() -> &'static str { BRANDING }
#[inline]
pub fn version() -> &'static str { VERSION.get_or_init(init_version) }
#[inline]
pub fn version_ua() -> &'static str { VERSION_UA.get_or_init(init_version_ua) }
#[inline]
pub fn user_agent() -> &'static str { USER_AGENT.get_or_init(init_user_agent) }
#[inline]
pub fn user_agent_media() -> &'static str { USER_AGENT_MEDIA.get_or_init(init_user_agent_media) }
fn init_user_agent() -> String { format!("{}/{} (bot; +{WEBSITE})", name(), version_ua()) }
fn init_user_agent_media() -> String {
format!("{}/{} (embedbot; facebookexternalhit/1.1; +{WEBSITE})", name(), version_ua())
}
fn init_version_ua() -> String {
conduwuit_build_metadata::version_tag()
.map_or_else(|| SEMANTIC.to_owned(), |extra| format!("{SEMANTIC}+{extra}"))

View File

@@ -1,7 +1,7 @@
use ruma::{RoomVersionId, canonical_json::redact_content_in_place};
use serde_json::{Value as JsonValue, json, value::to_raw_value};
use crate::{Result, err, implement};
use crate::{Error, Result, err, implement};
#[implement(super::Pdu)]
pub fn redact(&mut self, room_version_id: &RoomVersionId, reason: JsonValue) -> Result {
@@ -10,15 +10,8 @@ pub fn redact(&mut self, room_version_id: &RoomVersionId, reason: JsonValue) ->
let mut content = serde_json::from_str(self.content.get())
.map_err(|e| err!(Request(BadJson("Failed to deserialize content into type: {e}"))))?;
redact_content_in_place(&mut content, room_version_id, self.kind.to_string()).map_err(
|error| {
crate::error::RedactionSnafu {
server: self.sender.server_name().to_owned(),
error,
}
.build()
},
)?;
redact_content_in_place(&mut content, room_version_id, self.kind.to_string())
.map_err(|e| Error::Redaction(self.sender.server_name().to_owned(), e))?;
let reason = serde_json::to_value(reason).expect("Failed to preserialize reason");

View File

@@ -27,7 +27,7 @@
use crate::{
matrix::{Event, Pdu, pdu::EventHash},
state_res::{self as state_res, Error, Result, StateMap, error::NotFoundSnafu},
state_res::{self as state_res, Error, Result, StateMap},
};
static SERVER_TIMESTAMP: AtomicU64 = AtomicU64::new(0);
@@ -170,12 +170,10 @@ fn resolve_deeper_event_set(c: &mut test::Bencher) {
#[allow(unused)]
impl<E: Event + Clone> TestStore<E> {
fn get_event(&self, room_id: &RoomId, event_id: &EventId) -> Result<E> {
self.0.get(event_id).cloned().ok_or_else(|| {
NotFoundSnafu {
message: format!("{} not found", event_id),
}
.build()
})
self.0
.get(event_id)
.cloned()
.ok_or_else(|| Error::NotFound(format!("{} not found", event_id)))
}
/// Returns the events that correspond to the `event_ids` sorted in the same

View File

@@ -1,40 +1,23 @@
use serde_json::Error as JsonError;
use snafu::{IntoError, prelude::*};
use thiserror::Error;
/// Represents the various errors that arise when resolving state.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
/// A deserialization error.
#[snafu(display("JSON error: {source}"))]
SerdeJson {
source: JsonError,
backtrace: snafu::Backtrace,
},
#[error(transparent)]
SerdeJson(#[from] JsonError),
/// The given option or version is unsupported.
#[snafu(display("Unsupported room version: {version}"))]
Unsupported {
version: String,
backtrace: snafu::Backtrace,
},
#[error("Unsupported room version: {0}")]
Unsupported(String),
/// The given event was not found.
#[snafu(display("Not found error: {message}"))]
NotFound {
message: String,
backtrace: snafu::Backtrace,
},
#[error("Not found error: {0}")]
NotFound(String),
/// Invalid fields in the given PDU.
#[snafu(display("Invalid PDU: {message}"))]
InvalidPdu {
message: String,
backtrace: snafu::Backtrace,
},
}
impl From<serde_json::Error> for Error {
fn from(source: serde_json::Error) -> Self { SerdeJsonSnafu.into_error(source) }
#[error("Invalid PDU: {0}")]
InvalidPdu(String),
}

View File

@@ -24,7 +24,6 @@
use super::{
Error, Event, Result, StateEventType, StateKey, TimelineEventType,
error::InvalidPduSnafu,
power_levels::{
deserialize_power_levels, deserialize_power_levels_content_fields,
deserialize_power_levels_content_invite, deserialize_power_levels_content_redact,
@@ -384,8 +383,8 @@ pub async fn auth_check<E, F, Fut>(
return Ok(false);
}
let target_user = <&UserId>::try_from(state_key)
.map_err(|e| InvalidPduSnafu { message: format!("{e}") }.build())?;
let target_user =
<&UserId>::try_from(state_key).map_err(|e| Error::InvalidPdu(format!("{e}")))?;
let user_for_join_auth = content
.join_authorised_via_users_server
@@ -462,7 +461,7 @@ pub async fn auth_check<E, F, Fut>(
?sender_membership_event_content,
"Sender membership event content missing membership field"
);
return Err(InvalidPduSnafu { message: "Missing membership field" }.build());
return Err(Error::InvalidPdu("Missing membership field".to_owned()));
};
let membership_state = membership_state.deserialize()?;

View File

@@ -29,18 +29,18 @@
};
use serde_json::from_str as from_json_str;
pub(crate) use self::error::{Error, InvalidPduSnafu, NotFoundSnafu};
pub(crate) use self::error::Error;
use self::power_levels::PowerLevelsContentFields;
pub use self::{
event_auth::{auth_check, auth_types_for_event},
room_version::RoomVersion,
};
use super::{Event, StateKey};
use crate::{
debug, debug_error,
debug, debug_error, err,
matrix::{Event, StateKey},
state_res::room_version::StateResolutionVersion,
trace,
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt},
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, WidebandExt},
warn,
};
@@ -118,10 +118,7 @@ pub async fn resolve<'a, Pdu, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Ex
let csg = calculate_conflicted_subgraph(&conflicting, event_fetch)
.await
.ok_or_else(|| {
InvalidPduSnafu {
message: "Failed to calculate conflicted subgraph",
}
.build()
Error::InvalidPdu("Failed to calculate conflicted subgraph".to_owned())
})?;
debug!(count = csg.len(), "conflicted subgraph");
trace!(set = ?csg, "conflicted subgraph");
@@ -152,11 +149,10 @@ pub async fn resolve<'a, Pdu, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Ex
let control_events: Vec<_> = all_conflicted
.iter()
.stream()
.broad_filter_map(async |id| {
event_fetch(id.clone())
.wide_filter_map(async |id| {
is_power_event_id(id, &event_fetch)
.await
.filter(|event| is_power_event(&event))
.map(|_| id.clone())
.then_some(id.clone())
})
.collect()
.await;
@@ -318,10 +314,7 @@ async fn calculate_conflicted_subgraph<F, Fut, E>(
trace!(event_id = event_id.as_str(), "fetching event for its auth events");
let evt = fetch_event(event_id.clone()).await;
if evt.is_none() {
tracing::error!(
"could not fetch event {} to calculate conflicted subgraph",
event_id
);
err!("could not fetch event {} to calculate conflicted subgraph", event_id);
path.pop();
continue;
}
@@ -409,11 +402,11 @@ async fn reverse_topological_power_sort<E, F, Fut>(
let fetcher = async |event_id: OwnedEventId| {
let pl = *event_to_pl
.get(&event_id)
.ok_or_else(|| NotFoundSnafu { message: "" }.build())?;
.ok_or_else(|| Error::NotFound(String::new()))?;
let ev = fetch_event(event_id)
.await
.ok_or_else(|| NotFoundSnafu { message: "" }.build())?;
.ok_or_else(|| Error::NotFound(String::new()))?;
Ok((pl, ev.origin_server_ts()))
};
@@ -619,12 +612,9 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
let events_to_check: Vec<_> = events_to_check
.map(Result::Ok)
.broad_and_then(async |event_id| {
fetch_event(event_id.to_owned()).await.ok_or_else(|| {
NotFoundSnafu {
message: format!("Failed to find {event_id}"),
}
.build()
})
fetch_event(event_id.to_owned())
.await
.ok_or_else(|| Error::NotFound(format!("Failed to find {event_id}")))
})
.try_collect()
.boxed()
@@ -663,7 +653,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
trace!(event_id = event.event_id().as_str(), "checking event");
let state_key = event
.state_key()
.ok_or_else(|| InvalidPduSnafu { message: "State event had no state key" }.build())?;
.ok_or_else(|| Error::InvalidPdu("State event had no state key".to_owned()))?;
let auth_types = auth_types_for_event(
event.event_type(),
@@ -679,14 +669,13 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
trace!("room version uses hashed IDs, manually fetching create event");
let create_event_id_raw = event.room_id_or_hash().as_str().replace('!', "$");
let create_event_id = EventId::parse(&create_event_id_raw).map_err(|e| {
InvalidPduSnafu {
message: format!("Failed to parse create event ID from room ID/hash: {e}"),
}
.build()
})?;
let create_event = fetch_event(create_event_id.into()).await.ok_or_else(|| {
NotFoundSnafu { message: "Failed to find create event" }.build()
Error::InvalidPdu(format!(
"Failed to parse create event ID from room ID/hash: {e}"
))
})?;
let create_event = fetch_event(create_event_id.into())
.await
.ok_or_else(|| Error::NotFound("Failed to find create event".into()))?;
auth_state.insert(create_event.event_type().with_state_key(""), create_event);
}
for aid in event.auth_events() {
@@ -697,7 +686,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
auth_state.insert(
ev.event_type()
.with_state_key(ev.state_key().ok_or_else(|| {
InvalidPduSnafu { message: "State event had no state key" }.build()
Error::InvalidPdu("State event had no state key".to_owned())
})?),
ev.clone(),
);
@@ -812,13 +801,13 @@ async fn mainline_sort<E, F, Fut>(
let event = fetch_event(p.clone())
.await
.ok_or_else(|| NotFoundSnafu { message: format!("Failed to find {p}") }.build())?;
.ok_or_else(|| Error::NotFound(format!("Failed to find {p}")))?;
pl = None;
for aid in event.auth_events() {
let ev = fetch_event(aid.to_owned()).await.ok_or_else(|| {
NotFoundSnafu { message: format!("Failed to find {aid}") }.build()
})?;
let ev = fetch_event(aid.to_owned())
.await
.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
if is_type_and_key(&ev, &TimelineEventType::RoomPowerLevels, "") {
pl = Some(aid.to_owned());
@@ -880,9 +869,9 @@ async fn get_mainline_depth<E, F, Fut>(
event = None;
for aid in sort_ev.auth_events() {
let aev = fetch_event(aid.to_owned()).await.ok_or_else(|| {
NotFoundSnafu { message: format!("Failed to find {aid}") }.build()
})?;
let aev = fetch_event(aid.to_owned())
.await
.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
if is_type_and_key(&aev, &TimelineEventType::RoomPowerLevels, "") {
event = Some(aev);
@@ -926,7 +915,6 @@ async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
}
}
#[allow(dead_code)]
async fn is_power_event_id<E, F, Fut>(event_id: &EventId, fetch: &F) -> bool
where
F: Fn(OwnedEventId) -> Fut + Sync,

View File

@@ -1,6 +1,6 @@
use ruma::RoomVersionId;
use super::{Result, error::UnsupportedSnafu};
use super::{Error, Result};
#[derive(Debug)]
#[allow(clippy::exhaustive_enums)]
@@ -163,11 +163,7 @@ pub fn new(version: &RoomVersionId) -> Result<Self> {
| RoomVersionId::V10 => Self::V10,
| RoomVersionId::V11 => Self::V11,
| RoomVersionId::V12 => Self::V12,
| ver =>
return Err(UnsupportedSnafu {
version: format!("found version `{ver}`"),
}
.build()),
| ver => return Err(Error::Unsupported(format!("found version `{ver}`"))),
})
}
}

View File

@@ -22,7 +22,7 @@
value::{RawValue as RawJsonValue, to_raw_value as to_raw_json_value},
};
use super::{auth_types_for_event, error::NotFoundSnafu};
use super::auth_types_for_event;
use crate::{
Result, RoomVersion, info,
matrix::{Event, EventTypeExt, Pdu, StateMap, pdu::EventHash},
@@ -232,7 +232,7 @@ pub(crate) fn get_event(&self, _: &RoomId, event_id: &EventId) -> Result<E> {
self.0
.get(event_id)
.cloned()
.ok_or_else(|| NotFoundSnafu { message: format!("{event_id} not found") }.build())
.ok_or_else(|| super::Error::NotFound(format!("{event_id} not found")))
.map_err(Into::into)
}

View File

@@ -14,11 +14,9 @@
pub use ::arrayvec;
pub use ::http;
pub use ::paste;
pub use ::ruma;
pub use ::smallstr;
pub use ::smallvec;
pub use ::snafu;
pub use ::toml;
pub use ::tracing;
pub use config::Config;

View File

@@ -3,19 +3,17 @@
stream::{Stream, TryStream},
};
use crate::{Error, Result};
pub trait IterStream<I: IntoIterator + Send> {
/// Convert an Iterator into a Stream
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send;
/// Convert an Iterator into a TryStream
fn try_stream(
/// Convert an Iterator into a TryStream with a generic error type
fn try_stream<E>(
self,
) -> impl TryStream<
Ok = <I as IntoIterator>::Item,
Error = Error,
Item = Result<<I as IntoIterator>::Item, Error>,
Error = E,
Item = Result<<I as IntoIterator>::Item, E>,
> + Send;
}
@@ -28,12 +26,12 @@ impl<I> IterStream<I> for I
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send { stream::iter(self) }
#[inline]
fn try_stream(
fn try_stream<E>(
self,
) -> impl TryStream<
Ok = <I as IntoIterator>::Item,
Error = Error,
Item = Result<<I as IntoIterator>::Item, Error>,
Error = E,
Item = Result<<I as IntoIterator>::Item, E>,
> + Send {
self.stream().map(Ok)
}

View File

@@ -1,9 +1,10 @@
//! Synchronous combinator extensions to futures::TryStream
use std::result::Result;
use futures::{TryFuture, TryStream, TryStreamExt};
use super::automatic_width;
use crate::Result;
/// Concurrency extensions to augment futures::TryStreamExt. broad_ combinators
/// produce out-of-order

View File

@@ -2,8 +2,6 @@
use std::{cell::Cell, fmt::Debug, path::PathBuf, sync::LazyLock};
use snafu::IntoError;
use crate::{Result, is_equal_to};
type Id = usize;
@@ -144,9 +142,7 @@ pub fn getcpu() -> Result<usize> {
#[cfg(not(target_os = "linux"))]
#[inline]
pub fn getcpu() -> Result<usize> {
Err(crate::error::IoSnafu.into_error(std::io::ErrorKind::Unsupported.into()))
}
pub fn getcpu() -> Result<usize> { Err(crate::Error::Io(std::io::ErrorKind::Unsupported.into())) }
fn query_cores_available() -> impl Iterator<Item = Id> {
core_affinity::get_core_ids()

View File

@@ -255,10 +255,7 @@ fn deserialize_newtype_struct<V>(self, name: &'static str, visitor: V) -> Result
| "$serde_json::private::RawValue" => visitor.visit_map(self),
| "Cbor" => visitor
.visit_newtype_struct(&mut minicbor_serde::Deserializer::new(self.record_trail()))
.map_err(|e| {
let message: std::borrow::Cow<'static, str> = e.to_string().into();
conduwuit_core::error::SerdeDeSnafu { message }.build()
}),
.map_err(|e| Self::Error::SerdeDe(e.to_string().into())),
| _ => visitor.visit_newtype_struct(self),
}
@@ -316,10 +313,9 @@ fn deserialize_i64<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
let end = self.pos.saturating_add(BYTES).min(self.buf.len());
let bytes: ArrayVec<u8, BYTES> = self.buf[self.pos..end].try_into()?;
let bytes = bytes.into_inner().map_err(|_| {
let message: std::borrow::Cow<'static, str> = "i64 buffer underflow".into();
conduwuit_core::error::SerdeDeSnafu { message }.build()
})?;
let bytes = bytes
.into_inner()
.map_err(|_| Self::Error::SerdeDe("i64 buffer underflow".into()))?;
self.inc_pos(BYTES);
visitor.visit_i64(i64::from_be_bytes(bytes))
@@ -349,10 +345,9 @@ fn deserialize_u64<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
let end = self.pos.saturating_add(BYTES).min(self.buf.len());
let bytes: ArrayVec<u8, BYTES> = self.buf[self.pos..end].try_into()?;
let bytes = bytes.into_inner().map_err(|_| {
let message: std::borrow::Cow<'static, str> = "u64 buffer underflow".into();
conduwuit_core::error::SerdeDeSnafu { message }.build()
})?;
let bytes = bytes
.into_inner()
.map_err(|_| Self::Error::SerdeDe("u64 buffer underflow".into()))?;
self.inc_pos(BYTES);
visitor.visit_u64(u64::from_be_bytes(bytes))

View File

@@ -199,10 +199,7 @@ fn serialize_newtype_struct<T>(self, name: &'static str, value: &T) -> Result<Se
value
.serialize(&mut Serializer::new(&mut Writer::new(&mut self.out)))
.map_err(|e| {
let message: std::borrow::Cow<'static, str> = e.to_string().into();
conduwuit_core::error::SerdeSerSnafu { message }.build()
})
.map_err(|e| Self::Error::SerdeSer(e.to_string().into()))
},
| _ => unhandled!("Unrecognized serialization Newtype {name:?}"),
}

View File

@@ -1,4 +1,4 @@
use std::{borrow::Cow, sync::Arc};
use std::sync::Arc;
use axum::{Router, response::IntoResponse};
use conduwuit::Error;
@@ -18,10 +18,5 @@ pub(crate) fn build(services: &Arc<Services>) -> (Router, Guard) {
}
async fn not_found(_uri: Uri) -> impl IntoResponse {
Error::Request {
kind: ErrorKind::Unrecognized,
message: Cow::Borrowed("Not Found"),
code: StatusCode::NOT_FOUND,
backtrace: None,
}
Error::Request(ErrorKind::Unrecognized, "Not Found".into(), StatusCode::NOT_FOUND)
}

View File

@@ -147,11 +147,11 @@ pub async fn register_appservice(
// same appservice)
if let Ok(existing) = self.find_from_token(&registration.as_token).await {
if existing.registration.id != registration.id {
return Err!(Request(InvalidParam(
return Err(err!(Request(InvalidParam(
"Cannot register appservice: Token is already used by appservice '{}'. \
Please generate a different token.",
existing.registration.id
)));
))));
}
}
@@ -163,10 +163,10 @@ pub async fn register_appservice(
.await
.is_ok()
{
return Err!(Request(InvalidParam(
return Err(err!(Request(InvalidParam(
"Cannot register appservice: The provided token is already in use by a user \
device. Please generate a different token for the appservice."
)));
))));
}
self.db

View File

@@ -39,7 +39,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let url_preview_user_agent = config
.url_preview_user_agent
.clone()
.unwrap_or_else(|| conduwuit::version::user_agent().to_owned());
.unwrap_or_else(|| conduwuit::version::user_agent_media().to_owned());
Ok(Arc::new(Self {
default: base(config)?

View File

@@ -2,7 +2,7 @@
use bytes::Bytes;
use conduwuit::{
Err, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err,
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err,
error::inspect_debug_log, implement, trace,
};
use http::{HeaderValue, header::AUTHORIZATION};
@@ -179,7 +179,10 @@ async fn into_http_response(
debug!("Got {status:?} for {method} {url}");
if !status.is_success() {
return Err!(Federation(dest.to_owned(), RumaError::from_http_response(http_response),));
return Err(Error::Federation(
dest.to_owned(),
RumaError::from_http_response(http_response),
));
}
Ok(http_response)

View File

@@ -170,6 +170,8 @@ pub(super) fn remove_url_preview(&self, url: &str) -> Result<()> {
Ok(())
}
pub(super) async fn clear_url_previews(&self) { self.url_previews.clear().await; }
pub(super) fn set_url_preview(
&self,
url: &str,

View File

@@ -37,6 +37,9 @@ pub async fn remove_url_preview(&self, url: &str) -> Result<()> {
self.db.remove_url_preview(url)
}
#[implement(Service)]
pub async fn clear_url_previews(&self) { self.db.clear_url_previews().await; }
#[implement(Service)]
pub async fn set_url_preview(&self, url: &str, data: &UrlPreviewData) -> Result<()> {
let now = SystemTime::now()

View File

@@ -35,7 +35,7 @@ pub async fn fetch_remote_thumbnail(
.fetch_thumbnail_authenticated(mxc, user, server, timeout_ms, dim)
.await;
if let Err(Error::Request { kind: NotFound, .. }) = &result {
if let Err(Error::Request(NotFound, ..)) = &result {
return self
.fetch_thumbnail_unauthenticated(mxc, user, server, timeout_ms, dim)
.await;
@@ -67,7 +67,7 @@ pub async fn fetch_remote_content(
);
});
if let Err(Error::Request { kind: Unrecognized, .. }) = &result {
if let Err(Error::Request(Unrecognized, ..)) = &result {
return self
.fetch_content_unauthenticated(mxc, user, server, timeout_ms)
.await;

View File

@@ -31,7 +31,7 @@
pub mod sending;
pub mod server_keys;
pub mod sync;
pub mod transaction_ids;
pub mod transactions;
pub mod uiaa;
pub mod users;

View File

@@ -142,7 +142,7 @@ async fn get_auth_chain_outer(
let chunk_cache: Vec<_> = chunk
.into_iter()
.try_stream()
.try_stream::<conduwuit::Error>()
.broad_and_then(|(shortid, event_id)| async move {
if let Ok(cached) = self.get_cached_eventid_authchain(&[shortid]).await {
return Ok(cached.to_vec());

View File

@@ -112,14 +112,7 @@ pub async fn state_resolution<'a, StateSets>(
{
let event_fetch = |event_id| self.event_fetch(event_id);
let event_exists = |event_id| self.event_exists(event_id);
Ok(
state_res::resolve(
room_version,
state_sets,
auth_chain_sets,
&event_fetch,
&event_exists,
)
.await?,
)
state_res::resolve(room_version, state_sets, auth_chain_sets, &event_fetch, &event_exists)
.map_err(|e| err!(error!("State resolution failed: {e:?}")))
.await
}

View File

@@ -3,7 +3,7 @@
str::FromStr,
};
use conduwuit::{Err, Error, Result};
use conduwuit::{Error, Result};
use ruma::{UInt, api::client::error::ErrorKind};
use crate::rooms::short::ShortRoomId;
@@ -57,7 +57,7 @@ fn from_str(value: &str) -> Result<Self> {
if let Some(token) = pag_tok() {
Ok(token)
} else {
Err!(BadRequest(ErrorKind::InvalidParam, "invalid token"))
Err(Error::BadRequest(ErrorKind::InvalidParam, "invalid token"))
}
}
}

View File

@@ -75,7 +75,10 @@ fn from_evt(
let content: RoomCreateEventContent = serde_json::from_str(content.get())?;
Ok(content.room_version)
} else {
Err!(InconsistentRoomState("non-create event for room of unknown version", room_id))
Err(Error::InconsistentRoomState(
"non-create event for room of unknown version",
room_id,
))
}
}
let PduBuilder {
@@ -272,9 +275,7 @@ fn from_evt(
.hash_and_sign_event(&mut pdu_json, &room_version_id)
{
return match e {
| Error::Signatures { source, .. }
if matches!(source, ruma::signatures::Error::PduSize) =>
{
| Error::Signatures(ruma::signatures::Error::PduSize) => {
Err!(Request(TooLarge("Message/PDU is too long (exceeds 65535 bytes)")))
},
| _ => Err!(Request(Unknown(warn!("Signing event failed: {e}")))),

View File

@@ -14,7 +14,7 @@
media, moderation, presence, pusher, registration_tokens, resolver, rooms, sending,
server_keys,
service::{self, Args, Map, Service},
sync, transaction_ids, uiaa, users,
sync, transactions, uiaa, users,
};
pub struct Services {
@@ -37,7 +37,7 @@ pub struct Services {
pub sending: Arc<sending::Service>,
pub server_keys: Arc<server_keys::Service>,
pub sync: Arc<sync::Service>,
pub transaction_ids: Arc<transaction_ids::Service>,
pub transactions: Arc<transactions::Service>,
pub uiaa: Arc<uiaa::Service>,
pub users: Arc<users::Service>,
pub moderation: Arc<moderation::Service>,
@@ -110,7 +110,7 @@ macro_rules! build {
sending: build!(sending::Service),
server_keys: build!(server_keys::Service),
sync: build!(sync::Service),
transaction_ids: build!(transaction_ids::Service),
transactions: build!(transactions::Service),
uiaa: build!(uiaa::Service),
users: build!(users::Service),
moderation: build!(moderation::Service),

View File

@@ -1,54 +0,0 @@
use std::sync::Arc;
use conduwuit::{Result, implement};
use database::{Handle, Map};
use ruma::{DeviceId, TransactionId, UserId};
pub struct Service {
db: Data,
}
struct Data {
userdevicetxnid_response: Arc<Map>,
}
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data {
userdevicetxnid_response: args.db["userdevicetxnid_response"].clone(),
},
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
#[implement(Service)]
pub fn add_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
data: &[u8],
) {
let mut key = user_id.as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
key.push(0xFF);
key.extend_from_slice(txn_id.as_bytes());
self.db.userdevicetxnid_response.insert(&key, data);
}
// If there's no entry, this is a new transaction
#[implement(Service)]
pub async fn existing_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
) -> Result<Handle<'_>> {
let key = (user_id, device_id, txn_id);
self.db.userdevicetxnid_response.qry(&key).await
}

View File

@@ -0,0 +1,326 @@
use std::{
collections::HashMap,
fmt,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::{Duration, SystemTime},
};
use async_trait::async_trait;
use conduwuit::{Error, Result, SyncRwLock, debug_warn, warn};
use database::{Handle, Map};
use ruma::{
DeviceId, OwnedServerName, OwnedTransactionId, TransactionId, UserId,
api::{
client::error::ErrorKind::LimitExceeded,
federation::transactions::send_transaction_message,
},
};
use tokio::sync::watch::{Receiver, Sender};
use crate::{Dep, config};
pub type TxnKey = (OwnedServerName, OwnedTransactionId);
pub type WrappedTransactionResponse =
Option<Result<send_transaction_message::v1::Response, TransactionError>>;
/// Errors that can occur during federation transaction processing.
#[derive(Debug, Clone)]
pub enum TransactionError {
/// Server is shutting down - the sender should retry the entire
/// transaction.
ShuttingDown,
}
impl fmt::Display for TransactionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
| Self::ShuttingDown => write!(f, "Server is shutting down"),
}
}
}
impl std::error::Error for TransactionError {}
/// Minimum interval between cache cleanup runs.
/// Exists to prevent thrashing when the cache is full of things that can't be
/// cleared
const CLEANUP_INTERVAL_SECS: u64 = 30;
#[derive(Clone, Debug)]
pub struct CachedTxnResponse {
pub response: send_transaction_message::v1::Response,
pub created: SystemTime,
}
/// Internal state for a federation transaction.
/// Either actively being processed or completed and cached.
#[derive(Clone)]
enum TxnState {
/// Transaction is currently being processed.
Active(Receiver<WrappedTransactionResponse>),
/// Transaction completed and response is cached.
Cached(CachedTxnResponse),
}
/// Result of atomically checking or starting a federation transaction.
pub enum FederationTxnState {
/// Transaction already completed and cached
Cached(send_transaction_message::v1::Response),
/// Transaction is currently being processed by another request.
/// Wait on this receiver for the result.
Active(Receiver<WrappedTransactionResponse>),
/// This caller should process the transaction (first to request it).
Started {
receiver: Receiver<WrappedTransactionResponse>,
sender: Sender<WrappedTransactionResponse>,
},
}
pub struct Service {
services: Services,
db: Data,
federation_txn_state: Arc<SyncRwLock<HashMap<TxnKey, TxnState>>>,
last_cleanup: AtomicU64,
}
struct Services {
config: Dep<config::Service>,
}
struct Data {
userdevicetxnid_response: Arc<Map>,
}
#[async_trait]
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
services: Services {
config: args.depend::<config::Service>("config"),
},
db: Data {
userdevicetxnid_response: args.db["userdevicetxnid_response"].clone(),
},
federation_txn_state: Arc::new(SyncRwLock::new(HashMap::new())),
last_cleanup: AtomicU64::new(0),
}))
}
async fn clear_cache(&self) {
let mut state = self.federation_txn_state.write();
// Only clear cached entries, preserve active transactions
state.retain(|_, v| matches!(v, TxnState::Active(_)));
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Returns the count of currently active (in-progress) transactions.
#[must_use]
pub fn txn_active_handle_count(&self) -> usize {
let state = self.federation_txn_state.read();
state
.values()
.filter(|v| matches!(v, TxnState::Active(_)))
.count()
}
pub fn add_client_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
data: &[u8],
) {
let mut key = user_id.as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
key.push(0xFF);
key.extend_from_slice(txn_id.as_bytes());
self.db.userdevicetxnid_response.insert(&key, data);
}
pub async fn get_client_txn(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
) -> Result<Handle<'_>> {
let key = (user_id, device_id, txn_id);
self.db.userdevicetxnid_response.qry(&key).await
}
/// Atomically gets a cached response, joins an active transaction, or
/// starts a new one.
pub fn get_or_start_federation_txn(&self, key: TxnKey) -> Result<FederationTxnState> {
// Only one upgradable lock can be held at a time, and there aren't any
// read-only locks, so no point being upgradable
let mut state = self.federation_txn_state.write();
// Check existing state for this key
if let Some(txn_state) = state.get(&key) {
return Ok(match txn_state {
| TxnState::Cached(cached) => FederationTxnState::Cached(cached.response.clone()),
| TxnState::Active(receiver) => FederationTxnState::Active(receiver.clone()),
});
}
// Check if another transaction from this origin is already running
let has_active_from_origin = state
.iter()
.any(|(k, v)| k.0 == key.0 && matches!(v, TxnState::Active(_)));
if has_active_from_origin {
debug_warn!(
origin = ?key.0,
"Got concurrent transaction request from an origin with an active transaction"
);
return Err(Error::BadRequest(
LimitExceeded { retry_after: None },
"Still processing another transaction from this origin",
));
}
let max_active_txns = self.services.config.max_concurrent_inbound_transactions;
// Check if we're at capacity
if state.len() >= max_active_txns
&& let active_count = state
.values()
.filter(|v| matches!(v, TxnState::Active(_)))
.count() && active_count >= max_active_txns
{
warn!(
active = active_count,
max = max_active_txns,
"Server is overloaded, dropping incoming transaction"
);
return Err(Error::BadRequest(
LimitExceeded { retry_after: None },
"Server is overloaded, try again later",
));
}
// Start new transaction
let (sender, receiver) = tokio::sync::watch::channel(None);
state.insert(key, TxnState::Active(receiver.clone()));
Ok(FederationTxnState::Started { receiver, sender })
}
/// Finishes a transaction by transitioning it from active to cached state.
/// Additionally may trigger cleanup of old entries.
pub fn finish_federation_txn(
&self,
key: TxnKey,
sender: Sender<WrappedTransactionResponse>,
response: send_transaction_message::v1::Response,
) {
// Check if cleanup might be needed before acquiring the lock
let should_try_cleanup = self.should_try_cleanup();
let mut state = self.federation_txn_state.write();
// Explicitly set cached first so there is no gap where receivers get a closed
// channel
state.insert(
key,
TxnState::Cached(CachedTxnResponse {
response: response.clone(),
created: SystemTime::now(),
}),
);
if let Err(e) = sender.send(Some(Ok(response))) {
debug_warn!("Failed to send transaction response to waiting receivers: {e}");
}
// Explicitly close
drop(sender);
// This task is dangling, we can try clean caches now
if should_try_cleanup {
self.cleanup_entries_locked(&mut state);
}
}
pub fn remove_federation_txn(&self, key: &TxnKey) {
let mut state = self.federation_txn_state.write();
state.remove(key);
}
/// Checks if enough time has passed since the last cleanup to consider
/// running another. Updates the last cleanup time if returning true.
fn should_try_cleanup(&self) -> bool {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("SystemTime before UNIX_EPOCH")
.as_secs();
let last = self.last_cleanup.load(Ordering::Relaxed);
if now.saturating_sub(last) >= CLEANUP_INTERVAL_SECS {
// CAS: only update if no one else has updated it since we read
self.last_cleanup
.compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
} else {
false
}
}
/// Cleans up cached entries based on age and count limits.
///
/// First removes all cached entries older than the configured max age.
/// Then, if the cache still exceeds the max entry count, removes the oldest
/// cached entries until the count is within limits.
///
/// Must be called with write lock held on the state map.
fn cleanup_entries_locked(&self, state: &mut HashMap<TxnKey, TxnState>) {
let max_age_secs = self.services.config.transaction_id_cache_max_age_secs;
let max_entries = self.services.config.transaction_id_cache_max_entries;
// First pass: remove all cached entries older than max age
let cutoff = SystemTime::now()
.checked_sub(Duration::from_secs(max_age_secs))
.unwrap_or(SystemTime::UNIX_EPOCH);
state.retain(|_, v| match v {
| TxnState::Active(_) => true, // Never remove active transactions
| TxnState::Cached(cached) => cached.created > cutoff,
});
// Count cached entries
let cached_count = state
.values()
.filter(|v| matches!(v, TxnState::Cached(_)))
.count();
// Second pass: if still over max entries, remove oldest cached entries
if cached_count > max_entries {
let excess = cached_count.saturating_sub(max_entries);
// Collect cached entries sorted by age (oldest first)
let mut cached_entries: Vec<_> = state
.iter()
.filter_map(|(k, v)| match v {
| TxnState::Cached(cached) => Some((k.clone(), cached.created)),
| TxnState::Active(_) => None,
})
.collect();
cached_entries.sort_by(|a, b| a.1.cmp(&b.1));
// Remove the oldest cached entries to get under the limit
for (key, _) in cached_entries.into_iter().take(excess) {
state.remove(&key);
}
}
}
}

View File

@@ -1,7 +1,7 @@
use std::{collections::BTreeMap, sync::Arc};
use conduwuit::{
Err, Result, SyncRwLock, err, error, implement, utils,
Err, Error, Result, SyncRwLock, err, error, implement, utils,
utils::{hash, string::EMPTY},
};
use database::{Deserialized, Json, Map};
@@ -117,7 +117,7 @@ pub async fn try_auth(
} else if let Some(username) = user {
username
} else {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::Unrecognized,
"Identifier type not recognized.",
));
@@ -125,7 +125,7 @@ pub async fn try_auth(
#[cfg(not(feature = "element_hacks"))]
let Some(UserIdentifier::UserIdOrLocalpart(username)) = identifier else {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::Unrecognized,
"Identifier type not recognized.",
));
@@ -135,7 +135,7 @@ pub async fn try_auth(
username.clone(),
self.services.globals.server_name(),
)
.map_err(|_| err!(BadRequest(ErrorKind::InvalidParam, "User ID is invalid.")))?;
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "User ID is invalid."))?;
// Check if the access token being used matches the credentials used for UIAA
if user_id.localpart() != user_id_from_username.localpart() {

View File

@@ -761,13 +761,13 @@ pub async fn add_cross_signing_keys(
.keys
.into_values();
let self_signing_key_id = self_signing_key_ids.next().ok_or(err!(BadRequest(
let self_signing_key_id = self_signing_key_ids.next().ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Self signing key contained no key.",
)))?;
))?;
if self_signing_key_ids.next().is_some() {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Self signing key contained more than one key.",
));
@@ -1439,13 +1439,13 @@ pub fn parse_master_key(
let master_key = master_key
.deserialize()
.map_err(|_| err!(BadRequest(ErrorKind::InvalidParam, "Invalid master key")))?;
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid master key"))?;
let mut master_key_ids = master_key.keys.values();
let master_key_id = master_key_ids
.next()
.ok_or(err!(BadRequest(ErrorKind::InvalidParam, "Master key contained no key.")))?;
.ok_or(Error::BadRequest(ErrorKind::InvalidParam, "Master key contained no key."))?;
if master_key_ids.next().is_some() {
return Err!(BadRequest(
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Master key contained more than one key.",
));

View File

@@ -25,7 +25,7 @@ axum.workspace = true
futures.workspace = true
tracing.workspace = true
rand.workspace = true
snafu.workspace = true
thiserror.workspace = true
[lints]
workspace = true

View File

@@ -8,7 +8,6 @@
};
use conduwuit_build_metadata::{GIT_REMOTE_COMMIT_URL, GIT_REMOTE_WEB_URL, version_tag};
use conduwuit_service::state;
use snafu::{IntoError, prelude::*};
pub fn build() -> Router<state::State> {
Router::<state::State>::new()
@@ -49,17 +48,10 @@ async fn logo_handler() -> impl IntoResponse {
)
}
#[derive(Debug, Snafu)]
#[derive(Debug, thiserror::Error)]
enum WebError {
#[snafu(display("Failed to render template: {source}"))]
Render {
source: askama::Error,
backtrace: snafu::Backtrace,
},
}
impl From<askama::Error> for WebError {
fn from(source: askama::Error) -> Self { RenderSnafu.into_error(source) }
#[error("Failed to render template: {0}")]
Render(#[from] askama::Error),
}
impl IntoResponse for WebError {
@@ -74,7 +66,7 @@ struct Error<'a> {
let nonce = rand::random::<u64>().to_string();
let status = match &self {
| Self::Render { .. } => StatusCode::INTERNAL_SERVER_ERROR,
| Self::Render(_) => StatusCode::INTERNAL_SERVER_ERROR,
};
let tmpl = Error { nonce: &nonce, err: self };
if let Ok(body) = tmpl.render() {

View File

@@ -12,8 +12,8 @@ Server Error
</h1>
{%- match err -%}
{% when WebError::Render { source, .. } -%}
<pre>{{ source }}</pre>
{% when WebError::Render(err) -%}
<pre>{{ err }}</pre>
{% else -%} <p>An error occurred</p>
{%- endmatch -%}