mirror of
https://forgejo.ellis.link/continuwuation/continuwuity/
synced 2026-04-02 18:35:42 +00:00
Compare commits
3 Commits
jade/docke
...
jade/livei
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c91e9951a6 | ||
|
|
de6a44d272 | ||
|
|
c2ea303363 |
@@ -23,7 +23,7 @@ repos:
|
||||
- id: check-added-large-files
|
||||
|
||||
- repo: https://github.com/crate-ci/typos
|
||||
rev: v1.43.5
|
||||
rev: v1.43.4
|
||||
hooks:
|
||||
- id: typos
|
||||
- id: typos
|
||||
|
||||
@@ -85,31 +85,24 @@ ### Matrix tests
|
||||
|
||||
### Writing documentation
|
||||
|
||||
Continuwuity's website uses [`rspress`][rspress] and is deployed via CI using Cloudflare Pages
|
||||
Continuwuity's website uses [`mdbook`][mdbook] and is deployed via CI using Cloudflare Pages
|
||||
in the [`documentation.yml`][documentation.yml] workflow file. All documentation is in the `docs/`
|
||||
directory at the top level.
|
||||
|
||||
To load the documentation locally:
|
||||
|
||||
1. Install NodeJS and npm from their [official website][nodejs-download] or via your package manager of choice
|
||||
|
||||
2. From the project's root directory, install the relevant npm modules
|
||||
To build the documentation locally:
|
||||
|
||||
1. Install mdbook if you don't have it already:
|
||||
```bash
|
||||
npm ci
|
||||
cargo install mdbook # or cargo binstall, or another method
|
||||
```
|
||||
|
||||
3. Make changes to the document pages as you see fit
|
||||
|
||||
4. Generate a live preview of the documentation
|
||||
|
||||
2. Build the documentation:
|
||||
```bash
|
||||
npm run docs:dev
|
||||
mdbook build
|
||||
```
|
||||
|
||||
A webserver for the docs will be spun up for you (e.g. at `http://localhost:3000`). Any changes you make to the documentation will be live-reloaded on the webpage.
|
||||
The output of the mdbook generation is in `public/`. You can open the HTML files directly in your browser without needing a web server.
|
||||
|
||||
Alternatively, you can build the documentation using `npm run docs:build` - the output of this will be in the `/doc_build` directory. Once you're happy with your documentation updates, you can commit the changes.
|
||||
|
||||
### Commit Messages
|
||||
|
||||
@@ -176,6 +169,5 @@ ### Creating pull requests
|
||||
[continuwuity-matrix]: https://matrix.to/#/#continuwuity:continuwuity.org?via=continuwuity.org&via=ellis.link&via=explodie.org&via=matrix.org
|
||||
[complement]: https://github.com/matrix-org/complement/
|
||||
[sytest]: https://github.com/matrix-org/sytest/
|
||||
[nodejs-download]: https://nodejs.org/en/download
|
||||
[rspress]: https://rspress.rs/
|
||||
[mdbook]: https://rust-lang.github.io/mdBook/
|
||||
[documentation.yml]: https://forgejo.ellis.link/continuwuation/continuwuity/src/branch/main/.forgejo/workflows/documentation.yml
|
||||
|
||||
1050
Cargo.lock
generated
1050
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
29
Cargo.toml
29
Cargo.toml
@@ -68,7 +68,7 @@ default-features = false
|
||||
version = "0.1.3"
|
||||
|
||||
[workspace.dependencies.rand]
|
||||
version = "0.10.0"
|
||||
version = "0.8.5"
|
||||
|
||||
# Used for the http request / response body type for Ruma endpoints used with reqwest
|
||||
[workspace.dependencies.bytes]
|
||||
@@ -84,7 +84,7 @@ version = "1.3.1"
|
||||
version = "1.11.1"
|
||||
|
||||
[workspace.dependencies.axum]
|
||||
version = "0.8.8"
|
||||
version = "0.7.9"
|
||||
default-features = false
|
||||
features = [
|
||||
"form",
|
||||
@@ -97,7 +97,7 @@ features = [
|
||||
]
|
||||
|
||||
[workspace.dependencies.axum-extra]
|
||||
version = "0.12.0"
|
||||
version = "0.9.6"
|
||||
default-features = false
|
||||
features = ["typed-header", "tracing"]
|
||||
|
||||
@@ -110,7 +110,7 @@ default-features = false
|
||||
version = "0.7"
|
||||
|
||||
[workspace.dependencies.axum-client-ip]
|
||||
version = "0.7"
|
||||
version = "0.6.1"
|
||||
|
||||
[workspace.dependencies.tower]
|
||||
version = "0.5.2"
|
||||
@@ -118,7 +118,7 @@ default-features = false
|
||||
features = ["util"]
|
||||
|
||||
[workspace.dependencies.tower-http]
|
||||
version = "0.6.8"
|
||||
version = "0.6.2"
|
||||
default-features = false
|
||||
features = [
|
||||
"add-extension",
|
||||
@@ -253,7 +253,7 @@ features = [
|
||||
version = "0.4.0"
|
||||
|
||||
[workspace.dependencies.libloading]
|
||||
version = "0.9.0"
|
||||
version = "0.8.6"
|
||||
|
||||
# Validating urls in config, was already a transitive dependency
|
||||
[workspace.dependencies.url]
|
||||
@@ -298,7 +298,7 @@ default-features = false
|
||||
features = ["env", "toml"]
|
||||
|
||||
[workspace.dependencies.hickory-resolver]
|
||||
version = "0.25.2"
|
||||
version = "0.25.1"
|
||||
default-features = false
|
||||
features = [
|
||||
"serde",
|
||||
@@ -342,8 +342,7 @@ version = "0.1.2"
|
||||
# Used for matrix spec type definitions and helpers
|
||||
[workspace.dependencies.ruma]
|
||||
git = "https://forgejo.ellis.link/continuwuation/ruwuma"
|
||||
#branch = "conduwuit-changes"
|
||||
rev = "bb12ed288a31a23aa11b10ba0fad22b7f985eb88"
|
||||
rev = "b496b7f38d517149361a882e75d3fd4faf210441"
|
||||
features = [
|
||||
"compat",
|
||||
"rand",
|
||||
@@ -363,7 +362,6 @@ features = [
|
||||
"unstable-msc2870",
|
||||
"unstable-msc3026",
|
||||
"unstable-msc3061",
|
||||
"unstable-msc3814",
|
||||
"unstable-msc3245",
|
||||
"unstable-msc3266",
|
||||
"unstable-msc3381", # polls
|
||||
@@ -382,7 +380,6 @@ features = [
|
||||
"unstable-pdu",
|
||||
"unstable-msc4155",
|
||||
"unstable-msc4143", # livekit well_known response
|
||||
"unstable-msc4284"
|
||||
]
|
||||
|
||||
[workspace.dependencies.rust-rocksdb]
|
||||
@@ -427,7 +424,7 @@ features = ["http", "grpc-tonic", "trace", "logs", "metrics"]
|
||||
|
||||
# optional sentry metrics for crash/panic reporting
|
||||
[workspace.dependencies.sentry]
|
||||
version = "0.46.0"
|
||||
version = "0.45.0"
|
||||
default-features = false
|
||||
features = [
|
||||
"backtrace",
|
||||
@@ -443,9 +440,9 @@ features = [
|
||||
]
|
||||
|
||||
[workspace.dependencies.sentry-tracing]
|
||||
version = "0.46.0"
|
||||
version = "0.45.0"
|
||||
[workspace.dependencies.sentry-tower]
|
||||
version = "0.46.0"
|
||||
version = "0.45.0"
|
||||
|
||||
# jemalloc usage
|
||||
[workspace.dependencies.tikv-jemalloc-sys]
|
||||
@@ -474,7 +471,7 @@ features = ["use_std"]
|
||||
version = "0.5"
|
||||
|
||||
[workspace.dependencies.nix]
|
||||
version = "0.31.0"
|
||||
version = "0.30.1"
|
||||
default-features = false
|
||||
features = ["resource"]
|
||||
|
||||
@@ -556,7 +553,7 @@ version = "0.7.5"
|
||||
version = "1.0.1"
|
||||
|
||||
[workspace.dependencies.askama]
|
||||
version = "0.15.0"
|
||||
version = "0.14.0"
|
||||
|
||||
#
|
||||
# Patches
|
||||
|
||||
11
README.md
11
README.md
@@ -57,15 +57,10 @@ ### What are the project's goals?
|
||||
|
||||
### Can I try it out?
|
||||
|
||||
Check out the [documentation](https://continuwuity.org) for installation instructions.
|
||||
Check out the [documentation](https://continuwuity.org) for installation instructions, or join one of these vetted public homeservers running Continuwuity to get a feel for things!
|
||||
|
||||
If you want to try it out as a user, we have some partnered homeservers you can use:
|
||||
* You can head over to [https://federated.nexus](https://federated.nexus/) in your browser.
|
||||
* Hit the `Apply to Join` button. Once your request has been accepted, you will receive an email with your username and password.
|
||||
* Head over to [https://app.federated.nexus](https://app.federated.nexus/) and you can sign in there, or use any other matrix chat client you wish elsewhere.
|
||||
* Your username for matrix will be in the form of `@username:federated.nexus`, however you can simply use the `username` part to log in. Your password is your password.
|
||||
|
||||
* There's also [https://continuwuity.rocks/](https://continuwuity.rocks/). You can register a new account using Cinny via [this convenient link](https://app.cinny.in/register/continuwuity.rocks), or you can use Element or another matrix client *that supports registration*.
|
||||
- https://continuwuity.rocks -- A public demo server operated by the Continuwuity Team.
|
||||
- https://federated.nexus -- Federated Nexus is a community resource hosting multiple FOSS (especially federated) services, including Matrix and Forgejo.
|
||||
|
||||
### What are we working on?
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Removed non-compliant nor functional room alias lookups over federation. Contributed by @nex
|
||||
@@ -1 +0,0 @@
|
||||
Outgoing presence is now disabled by default, and the config option documentation has been adjusted to more accurately represent the weight of presence, typing indicators, and read receipts. Contributed by @nex.
|
||||
@@ -1 +0,0 @@
|
||||
Removed ability to set rocksdb as read only. Doing so would cause unintentional and buggy behaviour. Contributed by @Terryiscool160.
|
||||
@@ -1 +0,0 @@
|
||||
Fixed a startup crash in the sender service if we can't detect the number of CPU cores, even if the `sender_workers' config option is set correctly. Contributed by @katie.
|
||||
@@ -1 +0,0 @@
|
||||
Improved the concurrency handling of federation transactions, vastly improving performance and reliability by more accurately handling inbound transactions and reducing the amount of repeated wasted work. Contributed by @nex and @Jade.
|
||||
@@ -1 +0,0 @@
|
||||
Added MSC3202 Device masquerading (not all of MSC3202). This should fix issues with enabling MSC4190 for some Mautrix bridges. Contributed by @Jade
|
||||
@@ -1 +0,0 @@
|
||||
Added MSC3814 Dehydrated Devices - you can now decrypt messages sent while all devices were logged out.
|
||||
@@ -1 +0,0 @@
|
||||
Removed the `allow_public_room_directory_without_auth` config option. Contributed by @0xnim.
|
||||
@@ -1 +0,0 @@
|
||||
Implement MSC4143 MatrixRTC transport discovery endpoint. Move RTC foci configuration from `[global.well_known]` to a new `[global.matrix_rtc]` section with a `foci` field. Contributed by @0xnim
|
||||
@@ -1 +0,0 @@
|
||||
Fixed sliding sync v5 list ranges always starting from 0, causing extra rooms to be unnecessarily processed and returned. Contributed by @0xnim
|
||||
@@ -1 +0,0 @@
|
||||
BREAKING: Added an entrypoint to the Docker image. This means you no longer need to specify the binary when running a command using the image. Contributed by @Jade
|
||||
@@ -1 +0,0 @@
|
||||
Updated `list-backups` admin command to output one backup per line.
|
||||
@@ -1 +0,0 @@
|
||||
Improved URL preview fetching with a more compatible user agent for sites like YouTube Music. Added `!admin media delete-url-preview <url>` command to clear cached URL previews that were stuck and broken.
|
||||
@@ -9,6 +9,7 @@ address = "0.0.0.0"
|
||||
allow_device_name_federation = true
|
||||
allow_guest_registration = true
|
||||
allow_public_room_directory_over_federation = true
|
||||
allow_public_room_directory_without_auth = true
|
||||
allow_registration = true
|
||||
database_path = "/database"
|
||||
log = "trace,h2=debug,hyper=debug"
|
||||
|
||||
@@ -290,25 +290,6 @@
|
||||
#
|
||||
#max_fetch_prev_events = 192
|
||||
|
||||
# How many incoming federation transactions the server is willing to be
|
||||
# processing at any given time before it becomes overloaded and starts
|
||||
# rejecting further transactions until some slots become available.
|
||||
#
|
||||
# Setting this value too low or too high may result in unstable
|
||||
# federation, and setting it too high may cause runaway resource usage.
|
||||
#
|
||||
#max_concurrent_inbound_transactions = 150
|
||||
|
||||
# Maximum age (in seconds) for cached federation transaction responses.
|
||||
# Entries older than this will be removed during cleanup.
|
||||
#
|
||||
#transaction_id_cache_max_age_secs = 7200 (2 hours)
|
||||
|
||||
# Maximum number of cached federation transaction responses.
|
||||
# When the cache exceeds this limit, older entries will be removed.
|
||||
#
|
||||
#transaction_id_cache_max_entries = 8192
|
||||
|
||||
# Default/base connection timeout (seconds). This is used only by URL
|
||||
# previews and update/news endpoint checks.
|
||||
#
|
||||
@@ -546,6 +527,12 @@
|
||||
#
|
||||
#allow_public_room_directory_over_federation = false
|
||||
|
||||
# Set this to true to allow your server's public room directory to be
|
||||
# queried without client authentication (access token) through the Client
|
||||
# APIs. Set this to false to protect against /publicRooms spiders.
|
||||
#
|
||||
#allow_public_room_directory_without_auth = false
|
||||
|
||||
# Allow guests/unauthenticated users to access TURN credentials.
|
||||
#
|
||||
# This is the equivalent of Synapse's `turn_allow_guests` config option.
|
||||
@@ -1069,6 +1056,14 @@
|
||||
#
|
||||
#rocksdb_repair = false
|
||||
|
||||
# This item is undocumented. Please contribute documentation for it.
|
||||
#
|
||||
#rocksdb_read_only = false
|
||||
|
||||
# This item is undocumented. Please contribute documentation for it.
|
||||
#
|
||||
#rocksdb_secondary = false
|
||||
|
||||
# Enables idle CPU priority for compaction thread. This is not enabled by
|
||||
# default to prevent compaction from falling too far behind on busy
|
||||
# systems.
|
||||
@@ -1125,34 +1120,27 @@
|
||||
|
||||
# Allow local (your server only) presence updates/requests.
|
||||
#
|
||||
# Local presence must be enabled for outgoing presence to function.
|
||||
#
|
||||
# Note that local presence is not as heavy on the CPU as federated
|
||||
# presence, but will still become more expensive the more local users you
|
||||
# have.
|
||||
# Note that presence on continuwuity is very fast unlike Synapse's. If
|
||||
# using outgoing presence, this MUST be enabled.
|
||||
#
|
||||
#allow_local_presence = true
|
||||
|
||||
# Allow incoming federated presence updates.
|
||||
# Allow incoming federated presence updates/requests.
|
||||
#
|
||||
# This option enables processing inbound presence updates from other
|
||||
# servers. Without it, remote users will appear as if they are always
|
||||
# offline to your local users. This does not affect typing indicators or
|
||||
# read receipts.
|
||||
# This option receives presence updates from other servers, but does not
|
||||
# send any unless `allow_outgoing_presence` is true. Note that presence on
|
||||
# continuwuity is very fast unlike Synapse's.
|
||||
#
|
||||
#allow_incoming_presence = true
|
||||
|
||||
# Allow outgoing presence updates/requests.
|
||||
#
|
||||
# This option sends presence updates to other servers, and requires that
|
||||
# `allow_local_presence` is also enabled.
|
||||
# This option sends presence updates to other servers, but does not
|
||||
# receive any unless `allow_incoming_presence` is true. Note that presence
|
||||
# on continuwuity is very fast unlike Synapse's. If using outgoing
|
||||
# presence, you MUST enable `allow_local_presence` as well.
|
||||
#
|
||||
# Note that outgoing presence is very heavy on the CPU and network, and
|
||||
# will typically cause extreme strain and slowdowns for no real benefit.
|
||||
# There are only a few clients that even implement presence, so you
|
||||
# probably don't want to enable this.
|
||||
#
|
||||
#allow_outgoing_presence = false
|
||||
#allow_outgoing_presence = true
|
||||
|
||||
# How many seconds without presence updates before you become idle.
|
||||
# Defaults to 5 minutes.
|
||||
@@ -1186,10 +1174,6 @@
|
||||
|
||||
# Allow sending read receipts to remote servers.
|
||||
#
|
||||
# Note that sending read receipts to remote servers in large rooms with
|
||||
# lots of other homeservers may cause additional strain on the CPU and
|
||||
# network.
|
||||
#
|
||||
#allow_outgoing_read_receipts = true
|
||||
|
||||
# Allow local typing updates.
|
||||
@@ -1201,10 +1185,6 @@
|
||||
|
||||
# Allow outgoing typing updates to federation.
|
||||
#
|
||||
# Note that sending typing indicators to remote servers in large rooms
|
||||
# with lots of other homeservers may cause additional strain on the CPU
|
||||
# and network.
|
||||
#
|
||||
#allow_outgoing_typing = true
|
||||
|
||||
# Allow incoming typing updates from federation.
|
||||
@@ -1338,7 +1318,7 @@
|
||||
# sender user's server name, inbound federation X-Matrix origin, and
|
||||
# outbound federation handler.
|
||||
#
|
||||
# You can set this to [".*"] to block all servers by default, and then
|
||||
# You can set this to ["*"] to block all servers by default, and then
|
||||
# use `allowed_remote_server_names` to allow only specific servers.
|
||||
#
|
||||
# example: ["badserver\\.tld$", "badphrase", "19dollarfortnitecards"]
|
||||
@@ -1844,13 +1824,14 @@
|
||||
#
|
||||
#support_mxid =
|
||||
|
||||
# **DEPRECATED**: Use `[global.matrix_rtc].foci` instead.
|
||||
#
|
||||
# A list of MatrixRTC foci URLs which will be served as part of the
|
||||
# MSC4143 client endpoint at /.well-known/matrix/client.
|
||||
# MSC4143 client endpoint at /.well-known/matrix/client. If you're
|
||||
# setting up livekit, you'd want something like:
|
||||
# rtc_focus_server_urls = [
|
||||
# { type = "livekit", livekit_service_url = "https://livekit.example.com" },
|
||||
# ]
|
||||
#
|
||||
# This option is deprecated and will be removed in a future release.
|
||||
# Please migrate to the new `[global.matrix_rtc]` config section.
|
||||
# To disable, set this to be an empty vector (`[]`).
|
||||
#
|
||||
#rtc_focus_server_urls = []
|
||||
|
||||
@@ -1872,23 +1853,6 @@
|
||||
#
|
||||
#blurhash_max_raw_size = 33554432
|
||||
|
||||
[global.matrix_rtc]
|
||||
|
||||
# A list of MatrixRTC foci (transports) which will be served via the
|
||||
# MSC4143 RTC transports endpoint at
|
||||
# `/_matrix/client/v1/rtc/transports`. If you're setting up livekit,
|
||||
# you'd want something like:
|
||||
# ```toml
|
||||
# [global.matrix_rtc]
|
||||
# foci = [
|
||||
# { type = "livekit", livekit_service_url = "https://livekit.example.com" },
|
||||
# ]
|
||||
# ```
|
||||
#
|
||||
# To disable, set this to an empty list (`[]`).
|
||||
#
|
||||
#foci = []
|
||||
|
||||
[global.ldap]
|
||||
|
||||
# Whether to enable LDAP login.
|
||||
|
||||
@@ -52,7 +52,7 @@ ENV BINSTALL_VERSION=1.17.5
|
||||
# renovate: datasource=github-releases depName=psastras/sbom-rs
|
||||
ENV CARGO_SBOM_VERSION=0.9.1
|
||||
# renovate: datasource=crate depName=lddtree
|
||||
ENV LDDTREE_VERSION=0.5.0
|
||||
ENV LDDTREE_VERSION=0.4.0
|
||||
# renovate: datasource=crate depName=timelord-cli
|
||||
ENV TIMELORD_VERSION=3.0.1
|
||||
|
||||
@@ -162,7 +162,6 @@ ENV CONDUWUIT_VERSION_EXTRA=$CONDUWUIT_VERSION_EXTRA
|
||||
ENV CONTINUWUITY_VERSION_EXTRA=$CONTINUWUITY_VERSION_EXTRA
|
||||
|
||||
ARG RUST_PROFILE=release
|
||||
ARG CARGO_FEATURES="default,http3"
|
||||
|
||||
# Build the binary
|
||||
RUN --mount=type=cache,target=/usr/local/cargo/registry \
|
||||
@@ -172,32 +171,18 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
|
||||
set -o allexport
|
||||
set -o xtrace
|
||||
. /etc/environment
|
||||
|
||||
# Check if http3 feature is enabled and set appropriate RUSTFLAGS
|
||||
if echo "${CARGO_FEATURES}" | grep -q "http3"; then
|
||||
export RUSTFLAGS="${RUSTFLAGS} --cfg reqwest_unstable"
|
||||
else
|
||||
export RUSTFLAGS="${RUSTFLAGS}"
|
||||
fi
|
||||
|
||||
RUST_PROFILE_DIR="${RUST_PROFILE}"
|
||||
if [[ "${RUST_PROFILE}" == "dev" ]]; then
|
||||
RUST_PROFILE_DIR="debug"
|
||||
fi
|
||||
|
||||
TARGET_DIR=($(cargo metadata --no-deps --format-version 1 | \
|
||||
jq -r ".target_directory"))
|
||||
mkdir /out/sbin
|
||||
PACKAGE=conduwuit
|
||||
xx-cargo build --locked --profile ${RUST_PROFILE} \
|
||||
--no-default-features --features ${CARGO_FEATURES} \
|
||||
-p $PACKAGE;
|
||||
BINARIES=($(cargo metadata --no-deps --format-version 1 | \
|
||||
jq -r ".packages[] | select(.name == \"$PACKAGE\") | .targets[] | select( .kind | map(. == \"bin\") | any ) | .name"))
|
||||
for BINARY in "${BINARIES[@]}"; do
|
||||
echo $BINARY
|
||||
xx-verify $TARGET_DIR/$(xx-cargo --print-target-triple)/${RUST_PROFILE_DIR}/$BINARY
|
||||
cp $TARGET_DIR/$(xx-cargo --print-target-triple)/${RUST_PROFILE_DIR}/$BINARY /out/sbin/$BINARY
|
||||
xx-verify $TARGET_DIR/$(xx-cargo --print-target-triple)/${RUST_PROFILE}/$BINARY
|
||||
cp $TARGET_DIR/$(xx-cargo --print-target-triple)/${RUST_PROFILE}/$BINARY /out/sbin/$BINARY
|
||||
done
|
||||
EOF
|
||||
|
||||
@@ -281,5 +266,4 @@ ENV LD_LIBRARY_PATH=/usr/lib
|
||||
# Continuwuity default port
|
||||
EXPOSE 8008
|
||||
|
||||
ENTRYPOINT [ "/sbin/conduwuit" ]
|
||||
CMD ["/sbin/conduwuit"]
|
||||
|
||||
@@ -22,7 +22,7 @@ ENV BINSTALL_VERSION=1.17.5
|
||||
# renovate: datasource=github-releases depName=psastras/sbom-rs
|
||||
ENV CARGO_SBOM_VERSION=0.9.1
|
||||
# renovate: datasource=crate depName=lddtree
|
||||
ENV LDDTREE_VERSION=0.5.0
|
||||
ENV LDDTREE_VERSION=0.4.0
|
||||
|
||||
# Install unpackaged tools
|
||||
RUN <<EOF
|
||||
|
||||
@@ -78,19 +78,47 @@ #### Firewall hints
|
||||
|
||||
### 3. Telling clients where to find LiveKit
|
||||
|
||||
To tell clients where to find LiveKit, you need to add the address of your `lk-jwt-service` to the `[global.matrix_rtc]` config section using the `foci` option.
|
||||
To tell clients where to find LiveKit, you need to add the address of your `lk-jwt-service` to your client .well-known file. To do so, in the config section `global.well-known`, add (or modify) the option `rtc_focus_server_urls`.
|
||||
|
||||
The variable should be a list of servers serving as MatrixRTC endpoints. Clients discover these via the `/_matrix/client/v1/rtc/transports` endpoint (MSC4143).
|
||||
The variable should be a list of servers serving as MatrixRTC endpoints to serve in the well-known file to the client.
|
||||
|
||||
```toml
|
||||
[global.matrix_rtc]
|
||||
foci = [
|
||||
rtc_focus_server_urls = [
|
||||
{ type = "livekit", livekit_service_url = "https://livekit.example.com" },
|
||||
]
|
||||
```
|
||||
|
||||
Remember to replace the URL with the address you are deploying your instance of lk-jwt-service to.
|
||||
|
||||
#### Serving .well-known manually
|
||||
|
||||
If you don't let Continuwuity serve your `.well-known` files, you need to add the following lines to your `.well-known/matrix/client` file, remembering to replace the URL with your own `lk-jwt-service` deployment:
|
||||
|
||||
```json
|
||||
"org.matrix.msc4143.rtc_foci": [
|
||||
{
|
||||
"type": "livekit",
|
||||
"livekit_service_url": "https://livekit.example.com"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
The final file should look something like this:
|
||||
|
||||
```json
|
||||
{
|
||||
"m.homeserver": {
|
||||
"base_url":"https://matrix.example.com"
|
||||
},
|
||||
"org.matrix.msc4143.rtc_foci": [
|
||||
{
|
||||
"type": "livekit",
|
||||
"livekit_service_url": "https://livekit.example.com"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### 4. Configure your Reverse Proxy
|
||||
|
||||
Reverse proxies can be configured in many different ways - so we can't provide a step by step for this.
|
||||
@@ -130,7 +158,7 @@ ### 4. Configure your Reverse Proxy
|
||||
proxy_set_header X-Forwarded-For $remote_addr;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header Host $http_host;
|
||||
proxy_buffering off;
|
||||
proxy_buffering off;
|
||||
}
|
||||
|
||||
# for livekit
|
||||
@@ -238,3 +266,4 @@ ### Related Documentation
|
||||
- [Synapse documentation](https://github.com/element-hq/element-call/blob/livekit/docs/self-hosting.md)
|
||||
- [Community guide](https://tomfos.tr/matrix/livekit/)
|
||||
- [Community guide](https://blog.kimiblock.top/2024/12/24/hosting-element-call/)
|
||||
-
|
||||
|
||||
@@ -3,5 +3,3 @@ # Continuwuity for FreeBSD
|
||||
Continuwuity currently does not provide FreeBSD builds or FreeBSD packaging. However, Continuwuity does build and work on FreeBSD using the system-provided RocksDB.
|
||||
|
||||
Contributions to get Continuwuity packaged for FreeBSD are welcome.
|
||||
|
||||
Please join our [Continuwuity BSD](https://matrix.to/#/%23bsd:continuwuity.org) community room.
|
||||
|
||||
@@ -56,8 +56,6 @@ ### Building with the Rust toolchain
|
||||
|
||||
You can build Continuwuity using `cargo build --release`.
|
||||
|
||||
Continuwuity supports various optional features that can be enabled during compilation. Please see the Cargo.toml file for a comprehensive list, or ask in our rooms.
|
||||
|
||||
### Building with Nix
|
||||
|
||||
If you prefer, you can use Nix (or [Lix](https://lix.systems)) to build Continuwuity. This provides improved reproducibility and makes it easy to set up a build environment and generate output. This approach also allows for easy cross-compilation.
|
||||
|
||||
@@ -1,109 +1,7 @@
|
||||
# Continuwuity for Kubernetes
|
||||
|
||||
Continuwuity doesn't support horizontal scalability or distributed loading
|
||||
natively. However, a deployment in Kubernetes is very similar to the docker
|
||||
setup. This is because Continuwuity can be fully configured using environment
|
||||
variables. A sample StatefulSet is shared below. The only thing missing is
|
||||
a PVC definition (named `continuwuity-data`) for the volume mounted to
|
||||
the StatefulSet, an Ingress resources to point your webserver to the
|
||||
Continuwuity Pods, and a Service resource (targeting `app.kubernetes.io/name: continuwuity`)
|
||||
to glue the Ingress and Pod together.
|
||||
|
||||
Carefully go through the `env` section and add, change, and remove any env vars you like using the [Configuration reference](https://continuwuity.org/reference/config.html)
|
||||
|
||||
```yaml
|
||||
apiVersion: apps/v1
|
||||
kind: StatefulSet
|
||||
metadata:
|
||||
name: continuwuity
|
||||
namespace: matrix
|
||||
labels:
|
||||
app.kubernetes.io/name: continuwuity
|
||||
spec:
|
||||
replicas: 1
|
||||
serviceName: continuwuity
|
||||
podManagementPolicy: Parallel
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: continuwuity
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app.kubernetes.io/name: continuwuity
|
||||
spec:
|
||||
securityContext:
|
||||
sysctls:
|
||||
- name: net.ipv4.ip_unprivileged_port_start
|
||||
value: "0"
|
||||
containers:
|
||||
- name: continuwuity
|
||||
# use a sha hash <3
|
||||
image: forgejo.ellis.link/continuwuation/continuwuity:latest
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- name: http
|
||||
containerPort: 80
|
||||
volumeMounts:
|
||||
- mountPath: /data
|
||||
name: data
|
||||
subPath: data
|
||||
securityContext:
|
||||
capabilities:
|
||||
add:
|
||||
- NET_BIND_SERVICE
|
||||
env:
|
||||
- name: TOKIO_WORKER_THREADS
|
||||
value: "2"
|
||||
- name: CONTINUWUITY_SERVER_NAME
|
||||
value: "example.com"
|
||||
- name: CONTINUWUITY_DATABASE_PATH
|
||||
value: "/data/db"
|
||||
- name: CONTINUWUITY_DATABASE_BACKEND
|
||||
value: "rocksdb"
|
||||
- name: CONTINUWUITY_PORT
|
||||
value: "80"
|
||||
- name: CONTINUWUITY_MAX_REQUEST_SIZE
|
||||
value: "20000000"
|
||||
- name: CONTINUWUITY_ALLOW_FEDERATION
|
||||
value: "true"
|
||||
- name: CONTINUWUITY_TRUSTED_SERVERS
|
||||
value: '["matrix.org"]'
|
||||
- name: CONTINUWUITY_ADDRESS
|
||||
value: "0.0.0.0"
|
||||
- name: CONTINUWUITY_ROCKSDB_PARALLELISM_THREADS
|
||||
value: "1"
|
||||
- name: CONTINUWUITY_WELL_KNOWN__SERVER
|
||||
value: "matrix.example.com:443"
|
||||
- name: CONTINUWUITY_WELL_KNOWN__CLIENT
|
||||
value: "https://matrix.example.com"
|
||||
- name: CONTINUWUITY_ALLOW_REGISTRATION
|
||||
value: "false"
|
||||
- name: RUST_LOG
|
||||
value: info
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
path: /_matrix/federation/v1/version
|
||||
port: http
|
||||
periodSeconds: 4
|
||||
failureThreshold: 5
|
||||
resources:
|
||||
# Continuwuity might use quite some RAM :3
|
||||
requests:
|
||||
cpu: "2"
|
||||
memory: "512Mi"
|
||||
limits:
|
||||
cpu: "4"
|
||||
memory: "2048Mi"
|
||||
volumes:
|
||||
- name: data
|
||||
persistentVolumeClaim:
|
||||
claimName: continuwuity-data
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
Apart from manually configuring the containers,
|
||||
[a community-maintained Helm Chart is available here to run
|
||||
natively. However, [a community-maintained Helm Chart is available here to run
|
||||
conduwuit on Kubernetes](https://gitlab.cronce.io/charts/conduwuit)
|
||||
|
||||
This should be compatible with Continuwuity, but you will need to change the image reference.
|
||||
|
||||
@@ -51,13 +51,7 @@ ## Can I try it out?
|
||||
|
||||
Check out the [documentation](https://continuwuity.org) for installation instructions.
|
||||
|
||||
If you want to try it out as a user, we have some partnered homeservers you can use:
|
||||
* You can head over to [https://federated.nexus](https://federated.nexus/) in your browser.
|
||||
* Hit the `Apply to Join` button. Once your request has been accepted, you will receive an email with your username and password.
|
||||
* Head over to [https://app.federated.nexus](https://app.federated.nexus/) and you can sign in there, or use any other matrix chat client you wish elsewhere.
|
||||
* Your username for matrix will be in the form of `@username:federated.nexus`, however you can simply use the `username` part to log in. Your password is your password.
|
||||
|
||||
* There's also [https://continuwuity.rocks/](https://continuwuity.rocks/). You can register a new account using Cinny via [this convenient link](https://app.cinny.in/register/continuwuity.rocks), or you can use Element or another matrix client *that supports registration*.
|
||||
There are currently no open registration continuwuity instances available.
|
||||
|
||||
## What are we working on?
|
||||
|
||||
|
||||
@@ -36,7 +36,3 @@ ## `!admin media delete-all-from-user`
|
||||
## `!admin media delete-all-from-server`
|
||||
|
||||
Deletes all remote media from the specified remote server. This will always ignore errors by default
|
||||
|
||||
## `!admin media delete-url-preview`
|
||||
|
||||
Deletes a cached URL preview, forcing it to be re-fetched. Use --all to purge all cached URL previews
|
||||
|
||||
@@ -1,28 +1,13 @@
|
||||
# Troubleshooting Continuwuity
|
||||
|
||||
:::warning{title="Docker users:"}
|
||||
Docker can be difficult to use and debug. It's common for Docker
|
||||
misconfigurations to cause issues, particularly with networking and permissions.
|
||||
Please check that your issues are not due to problems with your Docker setup.
|
||||
:::
|
||||
> **Docker users ⚠️**
|
||||
>
|
||||
> Docker can be difficult to use and debug. It's common for Docker
|
||||
> misconfigurations to cause issues, particularly with networking and permissions.
|
||||
> Please check that your issues are not due to problems with your Docker setup.
|
||||
|
||||
## Continuwuity and Matrix issues
|
||||
|
||||
### Slow joins to rooms
|
||||
|
||||
Some slowness is to be expected if you're the first person on your homserver to join a room (which will
|
||||
always be the case for single-user homeservers). In this situation, your homeserver has to verify the signatures of
|
||||
all of the state events sent by other servers before your join. To make this process as fast as possible, make sure you have
|
||||
multiple fast, trusted servers listed in `trusted_servers` in your configuration, and ensure
|
||||
`query_trusted_key_servers_first_on_join` is set to true (the default).
|
||||
If you need suggestions for trusted servers, ask in the Continuwuity main room.
|
||||
|
||||
However, _very_ slow joins, especially to rooms with only a few users in them or rooms created by another user
|
||||
on your homeserver, may be caused by [issue !779](https://forgejo.ellis.link/continuwuation/continuwuity/issues/779),
|
||||
which is a longstanding bug with synchronizing room joins to clients. In this situation, you did succeed in joining the room, but
|
||||
the bug caused your homeserver to forget to tell your client. **To fix this, clear your client's cache.** Both Element and Cinny
|
||||
have a button to clear their cache in the "About" section of their settings.
|
||||
|
||||
### Lost access to admin room
|
||||
|
||||
You can reinvite yourself to the admin room through the following methods:
|
||||
|
||||
@@ -77,12 +77,7 @@ rec {
|
||||
craneLib.buildDepsOnly (
|
||||
(commonAttrs commonAttrsArgs)
|
||||
// {
|
||||
env = uwuenv.buildDepsOnlyEnv
|
||||
// (makeRocksDBEnv { inherit rocksdb; })
|
||||
// {
|
||||
# required since we started using unstable reqwest apparently ... otherwise the all-features build will fail
|
||||
RUSTFLAGS = "--cfg reqwest_unstable";
|
||||
};
|
||||
env = uwuenv.buildDepsOnlyEnv // (makeRocksDBEnv { inherit rocksdb; });
|
||||
inherit (features) cargoExtraArgs;
|
||||
}
|
||||
|
||||
@@ -107,13 +102,7 @@ rec {
|
||||
'';
|
||||
cargoArtifacts = deps;
|
||||
doCheck = true;
|
||||
env =
|
||||
uwuenv.buildPackageEnv
|
||||
// rocksdbEnv
|
||||
// {
|
||||
# required since we started using unstable reqwest apparently ... otherwise the all-features build will fail
|
||||
RUSTFLAGS = "--cfg reqwest_unstable";
|
||||
};
|
||||
env = uwuenv.buildPackageEnv // rocksdbEnv;
|
||||
passthru.env = uwuenv.buildPackageEnv // rocksdbEnv;
|
||||
meta.mainProgram = crateInfo.pname;
|
||||
inherit (features) cargoExtraArgs;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
{
|
||||
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
|
||||
"extends": ["config:recommended", "replacements:all"],
|
||||
"dependencyDashboard": true,
|
||||
"osvVulnerabilityAlerts": true,
|
||||
"lockFileMaintenance": {
|
||||
"enabled": true,
|
||||
@@ -58,25 +57,12 @@
|
||||
"matchUpdateTypes": ["minor", "patch"],
|
||||
"groupName": "github-actions-non-major"
|
||||
},
|
||||
{
|
||||
"description": "Batch patch-level Node.js dependency updates",
|
||||
"matchManagers": ["npm"],
|
||||
"matchUpdateTypes": ["patch"],
|
||||
"groupName": "node-patch-updates"
|
||||
},
|
||||
{
|
||||
"description": "Pin forgejo artifact actions to prevent breaking changes",
|
||||
"matchManagers": ["github-actions"],
|
||||
"matchPackageNames": ["forgejo/upload-artifact", "forgejo/download-artifact"],
|
||||
"enabled": false
|
||||
},
|
||||
{
|
||||
"description": "Auto-merge crate-ci/typos minor updates",
|
||||
"matchPackageNames": ["crate-ci/typos"],
|
||||
"matchUpdateTypes": ["minor", "patch"],
|
||||
"automerge": true,
|
||||
"automergeStrategy": "fast-forward"
|
||||
},
|
||||
{
|
||||
"description": "Auto-merge renovatebot docker image updates",
|
||||
"matchDatasources": ["docker"],
|
||||
|
||||
@@ -30,15 +30,12 @@ pub(super) async fn incoming_federation(&self) -> Result {
|
||||
.federation_handletime
|
||||
.read();
|
||||
|
||||
let mut msg = format!(
|
||||
"Handling {} incoming PDUs across {} active transactions:\n",
|
||||
map.len(),
|
||||
self.services.transactions.txn_active_handle_count()
|
||||
);
|
||||
let mut msg = format!("Handling {} incoming pdus:\n", map.len());
|
||||
for (r, (e, i)) in map.iter() {
|
||||
let elapsed = i.elapsed();
|
||||
writeln!(msg, "{} {}: {}m{}s", r, e, elapsed.as_secs() / 60, elapsed.as_secs() % 60)?;
|
||||
}
|
||||
|
||||
msg
|
||||
};
|
||||
|
||||
|
||||
@@ -29,9 +29,7 @@ pub(super) async fn delete(
|
||||
.delete(&mxc.as_str().try_into()?)
|
||||
.await?;
|
||||
|
||||
return self
|
||||
.write_str("Deleted the MXC from our database and on our filesystem.")
|
||||
.await;
|
||||
return Err!("Deleted the MXC from our database and on our filesystem.",);
|
||||
}
|
||||
|
||||
if let Some(event_id) = event_id {
|
||||
@@ -390,19 +388,3 @@ pub(super) async fn get_remote_thumbnail(
|
||||
self.write_str(&format!("```\n{result:#?}\nreceived {len} bytes for file content.\n```"))
|
||||
.await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn delete_url_preview(&self, url: Option<String>, all: bool) -> Result {
|
||||
if all {
|
||||
self.services.media.clear_url_previews().await;
|
||||
|
||||
return self.write_str("Deleted all cached URL previews.").await;
|
||||
}
|
||||
|
||||
let url = url.expect("clap enforces url is required unless --all");
|
||||
|
||||
self.services.media.remove_url_preview(&url).await?;
|
||||
|
||||
self.write_str(&format!("Deleted cached URL preview for: {url}"))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -108,16 +108,4 @@ pub enum MediaCommand {
|
||||
#[arg(long, default_value("800"))]
|
||||
height: u32,
|
||||
},
|
||||
|
||||
/// Deletes a cached URL preview, forcing it to be re-fetched.
|
||||
/// Use --all to purge all cached URL previews.
|
||||
DeleteUrlPreview {
|
||||
/// The URL to clear from the saved preview data
|
||||
#[arg(required_unless_present = "all")]
|
||||
url: Option<String>,
|
||||
|
||||
/// Purge all cached URL previews
|
||||
#[arg(long, conflicts_with = "url")]
|
||||
all: bool,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -209,7 +209,7 @@ pub(super) async fn compact(
|
||||
let parallelism = parallelism.unwrap_or(1);
|
||||
let results = maps
|
||||
.into_iter()
|
||||
.try_stream::<conduwuit::Error>()
|
||||
.try_stream()
|
||||
.paralleln_and_then(runtime, parallelism, move |map| {
|
||||
map.compact_blocking(options.clone())?;
|
||||
Ok(map.name().to_owned())
|
||||
|
||||
@@ -20,17 +20,7 @@ pub enum ResolverCommand {
|
||||
name: Option<String>,
|
||||
},
|
||||
|
||||
/// Flush a given server from the resolver caches or flush them completely
|
||||
///
|
||||
/// * Examples:
|
||||
/// * Flush a specific server:
|
||||
///
|
||||
/// `!admin query resolver flush-cache matrix.example.com`
|
||||
///
|
||||
/// * Flush all resolver caches completely:
|
||||
///
|
||||
/// `!admin query resolver flush-cache --all`
|
||||
#[command(verbatim_doc_comment)]
|
||||
/// Flush a specific server from the resolver caches or everything
|
||||
FlushCache {
|
||||
name: Option<OwnedServerName>,
|
||||
|
||||
|
||||
@@ -89,7 +89,13 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result {
|
||||
locally, if not using get_alias_helper to fetch room ID remotely"
|
||||
);
|
||||
|
||||
match self.services.rooms.alias.resolve_alias(room_alias).await {
|
||||
match self
|
||||
.services
|
||||
.rooms
|
||||
.alias
|
||||
.resolve_alias(room_alias, None)
|
||||
.await
|
||||
{
|
||||
| Ok((room_id, servers)) => {
|
||||
debug!(
|
||||
%room_id,
|
||||
@@ -229,7 +235,7 @@ async fn ban_list_of_rooms(&self) -> Result {
|
||||
.services
|
||||
.rooms
|
||||
.alias
|
||||
.resolve_alias(room_alias)
|
||||
.resolve_alias(room_alias, None)
|
||||
.await
|
||||
{
|
||||
| Ok((room_id, servers)) => {
|
||||
@@ -382,7 +388,13 @@ async fn unban_room(&self, room: OwnedRoomOrAliasId) -> Result {
|
||||
room ID over federation"
|
||||
);
|
||||
|
||||
match self.services.rooms.alias.resolve_alias(room_alias).await {
|
||||
match self
|
||||
.services
|
||||
.rooms
|
||||
.alias
|
||||
.resolve_alias(room_alias, None)
|
||||
.await
|
||||
{
|
||||
| Ok((room_id, servers)) => {
|
||||
debug!(
|
||||
%room_id,
|
||||
|
||||
@@ -86,7 +86,7 @@ pub(super) async fn list_backups(&self) -> Result {
|
||||
.db
|
||||
.backup_list()?
|
||||
.try_stream()
|
||||
.try_for_each(|result| writeln!(self, "{result}"))
|
||||
.try_for_each(|result| write!(self, "{result}"))
|
||||
.await
|
||||
}
|
||||
|
||||
|
||||
@@ -28,10 +28,6 @@ gzip_compression = [
|
||||
"conduwuit-service/gzip_compression",
|
||||
"reqwest/gzip",
|
||||
]
|
||||
http3 = [
|
||||
"conduwuit-core/http3",
|
||||
"conduwuit-service/http3",
|
||||
]
|
||||
io_uring = [
|
||||
"conduwuit-service/io_uring",
|
||||
]
|
||||
|
||||
@@ -252,13 +252,6 @@ pub(crate) async fn register_route(
|
||||
}
|
||||
}
|
||||
|
||||
// Don't allow registration with user IDs that aren't local
|
||||
if !services.globals.user_is_local(&user_id) {
|
||||
return Err!(Request(InvalidUsername(
|
||||
"Username {body_username} is not local to this server"
|
||||
)));
|
||||
}
|
||||
|
||||
user_id
|
||||
},
|
||||
| Err(e) => {
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
},
|
||||
events::{
|
||||
AnyGlobalAccountDataEventContent, AnyRoomAccountDataEventContent,
|
||||
RoomAccountDataEventType,
|
||||
GlobalAccountDataEventType, RoomAccountDataEventType,
|
||||
},
|
||||
serde::Raw,
|
||||
};
|
||||
@@ -126,6 +126,12 @@ async fn set_account_data(
|
||||
)));
|
||||
}
|
||||
|
||||
if event_type_s == GlobalAccountDataEventType::PushRules.to_cow_str() {
|
||||
return Err!(Request(BadJson(
|
||||
"This endpoint cannot be used for setting/configuring push rules."
|
||||
)));
|
||||
}
|
||||
|
||||
let data: serde_json::Value = serde_json::from_str(data.get())
|
||||
.map_err(|e| err!(Request(BadJson(warn!("Invalid JSON provided: {e}")))))?;
|
||||
|
||||
|
||||
@@ -1,6 +1,12 @@
|
||||
use axum::extract::State;
|
||||
use conduwuit::{Err, Result};
|
||||
use ruma::api::client::alias::{create_alias, delete_alias, get_alias};
|
||||
use conduwuit::{Err, Result, debug};
|
||||
use conduwuit_service::Services;
|
||||
use futures::StreamExt;
|
||||
use rand::seq::SliceRandom;
|
||||
use ruma::{
|
||||
OwnedServerName, RoomAliasId, RoomId,
|
||||
api::client::alias::{create_alias, delete_alias, get_alias},
|
||||
};
|
||||
|
||||
use crate::Ruma;
|
||||
|
||||
@@ -90,9 +96,65 @@ pub(crate) async fn get_alias_route(
|
||||
) -> Result<get_alias::v3::Response> {
|
||||
let room_alias = body.body.room_alias;
|
||||
|
||||
let Ok((room_id, servers)) = services.rooms.alias.resolve_alias(&room_alias).await else {
|
||||
let Ok((room_id, servers)) = services.rooms.alias.resolve_alias(&room_alias, None).await
|
||||
else {
|
||||
return Err!(Request(NotFound("Room with alias not found.")));
|
||||
};
|
||||
|
||||
let servers = room_available_servers(&services, &room_id, &room_alias, servers).await;
|
||||
debug!(%room_alias, %room_id, "available servers: {servers:?}");
|
||||
|
||||
Ok(get_alias::v3::Response::new(room_id, servers))
|
||||
}
|
||||
|
||||
async fn room_available_servers(
|
||||
services: &Services,
|
||||
room_id: &RoomId,
|
||||
room_alias: &RoomAliasId,
|
||||
pre_servers: Vec<OwnedServerName>,
|
||||
) -> Vec<OwnedServerName> {
|
||||
// find active servers in room state cache to suggest
|
||||
let mut servers: Vec<OwnedServerName> = services
|
||||
.rooms
|
||||
.state_cache
|
||||
.room_servers(room_id)
|
||||
.map(ToOwned::to_owned)
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
// push any servers we want in the list already (e.g. responded remote alias
|
||||
// servers, room alias server itself)
|
||||
servers.extend(pre_servers);
|
||||
|
||||
servers.sort_unstable();
|
||||
servers.dedup();
|
||||
|
||||
// shuffle list of servers randomly after sort and dedupe
|
||||
servers.shuffle(&mut rand::thread_rng());
|
||||
|
||||
// insert our server as the very first choice if in list, else check if we can
|
||||
// prefer the room alias server first
|
||||
match servers
|
||||
.iter()
|
||||
.position(|server_name| services.globals.server_is_ours(server_name))
|
||||
{
|
||||
| Some(server_index) => {
|
||||
servers.swap_remove(server_index);
|
||||
servers.insert(0, services.globals.server_name().to_owned());
|
||||
},
|
||||
| _ => {
|
||||
match servers
|
||||
.iter()
|
||||
.position(|server| server == room_alias.server_name())
|
||||
{
|
||||
| Some(alias_server_index) => {
|
||||
servers.swap_remove(alias_server_index);
|
||||
servers.insert(0, room_alias.server_name().into());
|
||||
},
|
||||
| _ => {},
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
servers
|
||||
}
|
||||
|
||||
@@ -1,121 +0,0 @@
|
||||
use axum::extract::State;
|
||||
use axum_client_ip::InsecureClientIp;
|
||||
use conduwuit::{Err, Result, at};
|
||||
use futures::StreamExt;
|
||||
use ruma::api::client::dehydrated_device::{
|
||||
delete_dehydrated_device::unstable as delete_dehydrated_device,
|
||||
get_dehydrated_device::unstable as get_dehydrated_device, get_events::unstable as get_events,
|
||||
put_dehydrated_device::unstable as put_dehydrated_device,
|
||||
};
|
||||
|
||||
use crate::Ruma;
|
||||
|
||||
const MAX_BATCH_EVENTS: usize = 50;
|
||||
|
||||
/// # `PUT /_matrix/client/../dehydrated_device`
|
||||
///
|
||||
/// Creates or overwrites the user's dehydrated device.
|
||||
#[tracing::instrument(skip_all, fields(%client))]
|
||||
pub(crate) async fn put_dehydrated_device_route(
|
||||
State(services): State<crate::State>,
|
||||
InsecureClientIp(client): InsecureClientIp,
|
||||
body: Ruma<put_dehydrated_device::Request>,
|
||||
) -> Result<put_dehydrated_device::Response> {
|
||||
let sender_user = body
|
||||
.sender_user
|
||||
.as_deref()
|
||||
.expect("AccessToken authentication required");
|
||||
|
||||
let device_id = body.body.device_id.clone();
|
||||
|
||||
services
|
||||
.users
|
||||
.set_dehydrated_device(sender_user, body.body)
|
||||
.await?;
|
||||
|
||||
Ok(put_dehydrated_device::Response { device_id })
|
||||
}
|
||||
|
||||
/// # `DELETE /_matrix/client/../dehydrated_device`
|
||||
///
|
||||
/// Deletes the user's dehydrated device without replacement.
|
||||
#[tracing::instrument(skip_all, fields(%client))]
|
||||
pub(crate) async fn delete_dehydrated_device_route(
|
||||
State(services): State<crate::State>,
|
||||
InsecureClientIp(client): InsecureClientIp,
|
||||
body: Ruma<delete_dehydrated_device::Request>,
|
||||
) -> Result<delete_dehydrated_device::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
|
||||
let device_id = services.users.get_dehydrated_device_id(sender_user).await?;
|
||||
|
||||
services.users.remove_device(sender_user, &device_id).await;
|
||||
|
||||
Ok(delete_dehydrated_device::Response { device_id })
|
||||
}
|
||||
|
||||
/// # `GET /_matrix/client/../dehydrated_device`
|
||||
///
|
||||
/// Gets the user's dehydrated device
|
||||
#[tracing::instrument(skip_all, fields(%client))]
|
||||
pub(crate) async fn get_dehydrated_device_route(
|
||||
State(services): State<crate::State>,
|
||||
InsecureClientIp(client): InsecureClientIp,
|
||||
body: Ruma<get_dehydrated_device::Request>,
|
||||
) -> Result<get_dehydrated_device::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
|
||||
let device = services.users.get_dehydrated_device(sender_user).await?;
|
||||
|
||||
Ok(get_dehydrated_device::Response {
|
||||
device_id: device.device_id,
|
||||
device_data: device.device_data,
|
||||
})
|
||||
}
|
||||
|
||||
/// # `GET /_matrix/client/../dehydrated_device/{device_id}/events`
|
||||
///
|
||||
/// Paginates the events of the dehydrated device.
|
||||
#[tracing::instrument(skip_all, fields(%client))]
|
||||
pub(crate) async fn get_dehydrated_events_route(
|
||||
State(services): State<crate::State>,
|
||||
InsecureClientIp(client): InsecureClientIp,
|
||||
body: Ruma<get_events::Request>,
|
||||
) -> Result<get_events::Response> {
|
||||
let sender_user = body.sender_user();
|
||||
|
||||
let device_id = &body.body.device_id;
|
||||
let existing_id = services.users.get_dehydrated_device_id(sender_user).await;
|
||||
|
||||
if existing_id.as_ref().is_err()
|
||||
|| existing_id
|
||||
.as_ref()
|
||||
.is_ok_and(|existing_id| existing_id != device_id)
|
||||
{
|
||||
return Err!(Request(Forbidden("Not the dehydrated device_id.")));
|
||||
}
|
||||
|
||||
let since: Option<u64> = body
|
||||
.body
|
||||
.next_batch
|
||||
.as_deref()
|
||||
.map(str::parse)
|
||||
.transpose()?;
|
||||
|
||||
let mut next_batch: Option<u64> = None;
|
||||
let events = services
|
||||
.users
|
||||
.get_to_device_events(sender_user, device_id, since, None)
|
||||
.take(MAX_BATCH_EVENTS)
|
||||
.inspect(|&(count, _)| {
|
||||
next_batch.replace(count);
|
||||
})
|
||||
.map(at!(1))
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
Ok(get_events::Response {
|
||||
events,
|
||||
next_batch: next_batch.as_ref().map(ToString::to_string),
|
||||
})
|
||||
}
|
||||
@@ -6,7 +6,6 @@
|
||||
Err, Result, err,
|
||||
utils::{self, content_disposition::make_content_disposition, math::ruma_from_usize},
|
||||
};
|
||||
use conduwuit_core::error;
|
||||
use conduwuit_service::{
|
||||
Services,
|
||||
media::{CACHE_CONTROL_IMMUTABLE, CORP_CROSS_ORIGIN, Dim, FileMeta, MXC_LENGTH},
|
||||
@@ -145,22 +144,12 @@ pub(crate) async fn get_content_route(
|
||||
server_name: &body.server_name,
|
||||
media_id: &body.media_id,
|
||||
};
|
||||
|
||||
let FileMeta {
|
||||
content,
|
||||
content_type,
|
||||
content_disposition,
|
||||
} = match fetch_file(&services, &mxc, user, body.timeout_ms, None).await {
|
||||
| Ok(meta) => meta,
|
||||
| Err(conduwuit::Error::Io(e)) => match e.kind() {
|
||||
| std::io::ErrorKind::NotFound => return Err!(Request(NotFound("Media not found."))),
|
||||
| std::io::ErrorKind::PermissionDenied => {
|
||||
error!("Permission denied when trying to read file: {e:?}");
|
||||
return Err!(Request(Unknown("Unknown error when fetching file.")));
|
||||
},
|
||||
| _ => return Err!(Request(Unknown("Unknown error when fetching file."))),
|
||||
},
|
||||
| Err(_) => return Err!(Request(Unknown("Unknown error when fetching file."))),
|
||||
};
|
||||
} = fetch_file(&services, &mxc, user, body.timeout_ms, None).await?;
|
||||
|
||||
Ok(get_content::v1::Response {
|
||||
file: content.expect("entire file contents"),
|
||||
@@ -196,18 +185,7 @@ pub(crate) async fn get_content_as_filename_route(
|
||||
content,
|
||||
content_type,
|
||||
content_disposition,
|
||||
} = match fetch_file(&services, &mxc, user, body.timeout_ms, None).await {
|
||||
| Ok(meta) => meta,
|
||||
| Err(conduwuit::Error::Io(e)) => match e.kind() {
|
||||
| std::io::ErrorKind::NotFound => return Err!(Request(NotFound("Media not found."))),
|
||||
| std::io::ErrorKind::PermissionDenied => {
|
||||
error!("Permission denied when trying to read file: {e:?}");
|
||||
return Err!(Request(Unknown("Unknown error when fetching file.")));
|
||||
},
|
||||
| _ => return Err!(Request(Unknown("Unknown error when fetching file."))),
|
||||
},
|
||||
| Err(_) => return Err!(Request(Unknown("Unknown error when fetching file."))),
|
||||
};
|
||||
} = fetch_file(&services, &mxc, user, body.timeout_ms, Some(&body.filename)).await?;
|
||||
|
||||
Ok(get_content_as_filename::v1::Response {
|
||||
file: content.expect("entire file contents"),
|
||||
|
||||
@@ -198,7 +198,11 @@ pub(crate) async fn join_room_by_id_or_alias_route(
|
||||
(servers, room_id)
|
||||
},
|
||||
| Err(room_alias) => {
|
||||
let (room_id, mut servers) = services.rooms.alias.resolve_alias(&room_alias).await?;
|
||||
let (room_id, mut servers) = services
|
||||
.rooms
|
||||
.alias
|
||||
.resolve_alias(&room_alias, Some(body.via.clone()))
|
||||
.await?;
|
||||
|
||||
banned_room_check(
|
||||
&services,
|
||||
|
||||
@@ -102,7 +102,11 @@ pub(crate) async fn knock_room_route(
|
||||
(servers, room_id)
|
||||
},
|
||||
| Err(room_alias) => {
|
||||
let (room_id, mut servers) = services.rooms.alias.resolve_alias(&room_alias).await?;
|
||||
let (room_id, mut servers) = services
|
||||
.rooms
|
||||
.alias
|
||||
.resolve_alias(&room_alias, Some(body.via.clone()))
|
||||
.await?;
|
||||
|
||||
banned_room_check(
|
||||
&services,
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
pub(super) mod backup;
|
||||
pub(super) mod capabilities;
|
||||
pub(super) mod context;
|
||||
pub(super) mod dehydrated_device;
|
||||
pub(super) mod device;
|
||||
pub(super) mod directory;
|
||||
pub(super) mod filter;
|
||||
@@ -50,7 +49,6 @@
|
||||
pub(super) use backup::*;
|
||||
pub(super) use capabilities::*;
|
||||
pub(super) use context::*;
|
||||
pub(super) use dehydrated_device::*;
|
||||
pub(super) use device::*;
|
||||
pub(super) use directory::*;
|
||||
pub(super) use filter::*;
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use axum_client_ip::InsecureClientIp;
|
||||
use conduwuit::{Err, Event, Result, debug_info, info, matrix::pdu::PduEvent, utils::ReadyExt};
|
||||
use conduwuit_service::Services;
|
||||
use rand::Rng;
|
||||
use ruma::{
|
||||
EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
|
||||
api::client::{
|
||||
@@ -243,7 +244,7 @@ fn build_report(report: Report) -> RoomMessageEventContent {
|
||||
/// random delay sending a response per spec suggestion regarding
|
||||
/// enumerating for potential events existing in our server.
|
||||
async fn delay_response() {
|
||||
let time_to_wait = rand::random_range(2..5);
|
||||
let time_to_wait = rand::thread_rng().gen_range(2..5);
|
||||
debug_info!(
|
||||
"Got successful /report request, waiting {time_to_wait} seconds before sending \
|
||||
successful response."
|
||||
|
||||
@@ -50,8 +50,8 @@ pub(crate) async fn send_message_event_route(
|
||||
|
||||
// Check if this is a new transaction id
|
||||
if let Ok(response) = services
|
||||
.transactions
|
||||
.get_client_txn(sender_user, sender_device, &body.txn_id)
|
||||
.transaction_ids
|
||||
.existing_txnid(sender_user, sender_device, &body.txn_id)
|
||||
.await
|
||||
{
|
||||
// The client might have sent a txnid of the /sendToDevice endpoint
|
||||
@@ -92,7 +92,7 @@ pub(crate) async fn send_message_event_route(
|
||||
)
|
||||
.await?;
|
||||
|
||||
services.transactions.add_client_txnid(
|
||||
services.transaction_ids.add_txnid(
|
||||
sender_user,
|
||||
sender_device,
|
||||
&body.txn_id,
|
||||
|
||||
@@ -342,10 +342,10 @@ async fn allowed_to_send_state_event(
|
||||
}
|
||||
|
||||
for alias in aliases {
|
||||
let (alias_room_id, _) = services
|
||||
let (alias_room_id, _servers) = services
|
||||
.rooms
|
||||
.alias
|
||||
.resolve_alias(&alias)
|
||||
.resolve_alias(&alias, None)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
err!(Request(Unknown("Failed resolving alias \"{alias}\": {e}")))
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
use axum::extract::State;
|
||||
use axum_client_ip::InsecureClientIp;
|
||||
use conduwuit::{
|
||||
Result, at, extract_variant,
|
||||
Result, extract_variant,
|
||||
utils::{
|
||||
ReadyExt, TryFutureExtExt,
|
||||
stream::{BroadbandExt, Tools, WidebandExt},
|
||||
@@ -385,7 +385,6 @@ pub(crate) async fn build_sync_events(
|
||||
last_sync_end_count,
|
||||
Some(current_count),
|
||||
)
|
||||
.map(at!(1))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let device_one_time_keys_count = services
|
||||
|
||||
@@ -336,9 +336,7 @@ async fn handle_lists<'a, Rooms, AllRooms>(
|
||||
let ranges = list.ranges.clone();
|
||||
|
||||
for mut range in ranges {
|
||||
range.0 = range
|
||||
.0
|
||||
.min(UInt::try_from(active_rooms.len()).unwrap_or(UInt::MAX));
|
||||
range.0 = uint!(0);
|
||||
range.1 = range.1.checked_add(uint!(1)).unwrap_or(range.1);
|
||||
range.1 = range
|
||||
.1
|
||||
@@ -1029,7 +1027,6 @@ async fn collect_to_device(
|
||||
events: services
|
||||
.users
|
||||
.get_to_device_events(sender_user, sender_device, None, Some(next_batch))
|
||||
.map(at!(1))
|
||||
.collect()
|
||||
.await,
|
||||
})
|
||||
|
||||
@@ -26,8 +26,8 @@ pub(crate) async fn send_event_to_device_route(
|
||||
|
||||
// Check if this is a new transaction id
|
||||
if services
|
||||
.transactions
|
||||
.get_client_txn(sender_user, sender_device, &body.txn_id)
|
||||
.transaction_ids
|
||||
.existing_txnid(sender_user, sender_device, &body.txn_id)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
@@ -104,8 +104,8 @@ pub(crate) async fn send_event_to_device_route(
|
||||
|
||||
// Save transaction id with empty data
|
||||
services
|
||||
.transactions
|
||||
.add_client_txnid(sender_user, sender_device, &body.txn_id, &[]);
|
||||
.transaction_ids
|
||||
.add_txnid(sender_user, sender_device, &body.txn_id, &[]);
|
||||
|
||||
Ok(send_event_to_device::v3::Response {})
|
||||
}
|
||||
|
||||
@@ -50,7 +50,6 @@ pub(crate) async fn get_supported_versions_route(
|
||||
("org.matrix.msc2836".to_owned(), true), /* threading/threads (https://github.com/matrix-org/matrix-spec-proposals/pull/2836) */
|
||||
("org.matrix.msc2946".to_owned(), true), /* spaces/hierarchy summaries (https://github.com/matrix-org/matrix-spec-proposals/pull/2946) */
|
||||
("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */
|
||||
("org.matrix.msc3814".to_owned(), true), /* dehydrated devices */
|
||||
("org.matrix.msc3827".to_owned(), true), /* filtering of /publicRooms by room type (https://github.com/matrix-org/matrix-spec-proposals/pull/3827) */
|
||||
("org.matrix.msc3952_intentional_mentions".to_owned(), true), /* intentional mentions (https://github.com/matrix-org/matrix-spec-proposals/pull/3952) */
|
||||
("org.matrix.msc3916.stable".to_owned(), true), /* authenticated media (https://github.com/matrix-org/matrix-spec-proposals/pull/3916) */
|
||||
|
||||
@@ -27,32 +27,10 @@ pub(crate) async fn well_known_client(
|
||||
identity_server: None,
|
||||
sliding_sync_proxy: Some(SlidingSyncProxyInfo { url: client_url }),
|
||||
tile_server: None,
|
||||
rtc_foci: services
|
||||
.config
|
||||
.matrix_rtc
|
||||
.effective_foci(&services.config.well_known.rtc_focus_server_urls)
|
||||
.to_vec(),
|
||||
rtc_foci: services.config.well_known.rtc_focus_server_urls.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// # `GET /_matrix/client/v1/rtc/transports`
|
||||
/// # `GET /_matrix/client/unstable/org.matrix.msc4143/rtc/transports`
|
||||
///
|
||||
/// Returns the list of MatrixRTC foci (transports) configured for this
|
||||
/// homeserver, implementing MSC4143.
|
||||
pub(crate) async fn get_rtc_transports(
|
||||
State(services): State<crate::State>,
|
||||
_body: Ruma<ruma::api::client::discovery::get_rtc_transports::Request>,
|
||||
) -> Result<ruma::api::client::discovery::get_rtc_transports::Response> {
|
||||
Ok(ruma::api::client::discovery::get_rtc_transports::Response::new(
|
||||
services
|
||||
.config
|
||||
.matrix_rtc
|
||||
.effective_foci(&services.config.well_known.rtc_focus_server_urls)
|
||||
.to_vec(),
|
||||
))
|
||||
}
|
||||
|
||||
/// # `GET /.well-known/matrix/support`
|
||||
///
|
||||
/// Server support contact and support page of a homeserver's domain.
|
||||
|
||||
@@ -122,23 +122,23 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
|
||||
// Ruma doesn't have support for multiple paths for a single endpoint yet, and these routes
|
||||
// share one Ruma request / response type pair with {get,send}_state_event_for_key_route
|
||||
.route(
|
||||
"/_matrix/client/r0/rooms/{room_id}/state/{event_type}",
|
||||
"/_matrix/client/r0/rooms/:room_id/state/:event_type",
|
||||
get(client::get_state_events_for_empty_key_route)
|
||||
.put(client::send_state_event_for_empty_key_route),
|
||||
)
|
||||
.route(
|
||||
"/_matrix/client/v3/rooms/{room_id}/state/{event_type}",
|
||||
"/_matrix/client/v3/rooms/:room_id/state/:event_type",
|
||||
get(client::get_state_events_for_empty_key_route)
|
||||
.put(client::send_state_event_for_empty_key_route),
|
||||
)
|
||||
// These two endpoints allow trailing slashes
|
||||
.route(
|
||||
"/_matrix/client/r0/rooms/{room_id}/state/{event_type}/",
|
||||
"/_matrix/client/r0/rooms/:room_id/state/:event_type/",
|
||||
get(client::get_state_events_for_empty_key_route)
|
||||
.put(client::send_state_event_for_empty_key_route),
|
||||
)
|
||||
.route(
|
||||
"/_matrix/client/v3/rooms/{room_id}/state/{event_type}/",
|
||||
"/_matrix/client/v3/rooms/:room_id/state/:event_type/",
|
||||
get(client::get_state_events_for_empty_key_route)
|
||||
.put(client::send_state_event_for_empty_key_route),
|
||||
)
|
||||
@@ -160,10 +160,6 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
|
||||
.ruma_route(&client::update_device_route)
|
||||
.ruma_route(&client::delete_device_route)
|
||||
.ruma_route(&client::delete_devices_route)
|
||||
.ruma_route(&client::put_dehydrated_device_route)
|
||||
.ruma_route(&client::delete_dehydrated_device_route)
|
||||
.ruma_route(&client::get_dehydrated_device_route)
|
||||
.ruma_route(&client::get_dehydrated_events_route)
|
||||
.ruma_route(&client::get_tags_route)
|
||||
.ruma_route(&client::update_tag_route)
|
||||
.ruma_route(&client::delete_tag_route)
|
||||
@@ -181,14 +177,13 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
|
||||
.ruma_route(&client::get_mutual_rooms_route)
|
||||
.ruma_route(&client::get_room_summary)
|
||||
.route(
|
||||
"/_matrix/client/unstable/im.nheko.summary/rooms/{room_id_or_alias}/summary",
|
||||
"/_matrix/client/unstable/im.nheko.summary/rooms/:room_id_or_alias/summary",
|
||||
get(client::get_room_summary_legacy)
|
||||
)
|
||||
.ruma_route(&client::get_suspended_status)
|
||||
.ruma_route(&client::put_suspended_status)
|
||||
.ruma_route(&client::well_known_support)
|
||||
.ruma_route(&client::well_known_client)
|
||||
.ruma_route(&client::get_rtc_transports)
|
||||
.route("/_conduwuit/server_version", get(client::conduwuit_server_version))
|
||||
.route("/_continuwuity/server_version", get(client::conduwuit_server_version))
|
||||
.ruma_route(&client::room_initial_sync_route)
|
||||
@@ -201,7 +196,7 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
|
||||
.ruma_route(&server::get_server_version_route)
|
||||
.route("/_matrix/key/v2/server", get(server::get_server_keys_route))
|
||||
.route(
|
||||
"/_matrix/key/v2/server/{key_id}",
|
||||
"/_matrix/key/v2/server/:key_id",
|
||||
get(server::get_server_keys_deprecated_route),
|
||||
)
|
||||
.ruma_route(&server::get_public_rooms_route)
|
||||
@@ -237,9 +232,9 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
|
||||
.route("/_continuwuity/local_user_count", get(client::conduwuit_local_user_count));
|
||||
} else {
|
||||
router = router
|
||||
.route("/_matrix/federation/{*path}", any(federation_disabled))
|
||||
.route("/_matrix/federation/*path", any(federation_disabled))
|
||||
.route("/.well-known/matrix/server", any(federation_disabled))
|
||||
.route("/_matrix/key/{*path}", any(federation_disabled))
|
||||
.route("/_matrix/key/*path", any(federation_disabled))
|
||||
.route("/_conduwuit/local_user_count", any(federation_disabled))
|
||||
.route("/_continuwuity/local_user_count", any(federation_disabled));
|
||||
}
|
||||
@@ -258,27 +253,27 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
|
||||
get(client::get_media_preview_legacy_legacy_route),
|
||||
)
|
||||
.route(
|
||||
"/_matrix/media/v1/download/{server_name}/{media_id}",
|
||||
"/_matrix/media/v1/download/:server_name/:media_id",
|
||||
get(client::get_content_legacy_legacy_route),
|
||||
)
|
||||
.route(
|
||||
"/_matrix/media/v1/download/{server_name}/{media_id}/{file_name}",
|
||||
"/_matrix/media/v1/download/:server_name/:media_id/:file_name",
|
||||
get(client::get_content_as_filename_legacy_legacy_route),
|
||||
)
|
||||
.route(
|
||||
"/_matrix/media/v1/thumbnail/{server_name}/{media_id}",
|
||||
"/_matrix/media/v1/thumbnail/:server_name/:media_id",
|
||||
get(client::get_content_thumbnail_legacy_legacy_route),
|
||||
);
|
||||
} else {
|
||||
router = router
|
||||
.route("/_matrix/media/v1/{*path}", any(legacy_media_disabled))
|
||||
.route("/_matrix/media/v1/*path", any(legacy_media_disabled))
|
||||
.route("/_matrix/media/v3/config", any(legacy_media_disabled))
|
||||
.route("/_matrix/media/v3/download/{*path}", any(legacy_media_disabled))
|
||||
.route("/_matrix/media/v3/thumbnail/{*path}", any(legacy_media_disabled))
|
||||
.route("/_matrix/media/v3/download/*path", any(legacy_media_disabled))
|
||||
.route("/_matrix/media/v3/thumbnail/*path", any(legacy_media_disabled))
|
||||
.route("/_matrix/media/v3/preview_url", any(redirect_legacy_preview))
|
||||
.route("/_matrix/media/r0/config", any(legacy_media_disabled))
|
||||
.route("/_matrix/media/r0/download/{*path}", any(legacy_media_disabled))
|
||||
.route("/_matrix/media/r0/thumbnail/{*path}", any(legacy_media_disabled))
|
||||
.route("/_matrix/media/r0/download/*path", any(legacy_media_disabled))
|
||||
.route("/_matrix/media/r0/thumbnail/*path", any(legacy_media_disabled))
|
||||
.route("/_matrix/media/r0/preview_url", any(redirect_legacy_preview));
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::{mem, ops::Deref};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use axum::{body::Body, extract::FromRequest};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use conduwuit::{Error, Result, debug, debug_warn, err, trace, utils::string::EMPTY};
|
||||
@@ -78,6 +79,7 @@ impl<T> Deref for Args<T>
|
||||
fn deref(&self) -> &Self::Target { &self.body }
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T> FromRequest<State, Body> for Args<T>
|
||||
where
|
||||
T: IncomingRequest + Send + Sync + 'static,
|
||||
|
||||
@@ -14,8 +14,7 @@
|
||||
pin_mut,
|
||||
};
|
||||
use ruma::{
|
||||
CanonicalJsonObject, CanonicalJsonValue, DeviceId, OwnedDeviceId, OwnedServerName,
|
||||
OwnedUserId, UserId,
|
||||
CanonicalJsonObject, CanonicalJsonValue, OwnedDeviceId, OwnedServerName, OwnedUserId, UserId,
|
||||
api::{
|
||||
AuthScheme, IncomingRequest, Metadata,
|
||||
client::{
|
||||
@@ -55,8 +54,7 @@ pub(super) async fn auth(
|
||||
json_body: Option<&CanonicalJsonValue>,
|
||||
metadata: &Metadata,
|
||||
) -> Result<Auth> {
|
||||
let bearer: Option<TypedHeader<Authorization<Bearer>>> =
|
||||
request.parts.extract().await.unwrap_or(None);
|
||||
let bearer: Option<TypedHeader<Authorization<Bearer>>> = request.parts.extract().await?;
|
||||
let token = match &bearer {
|
||||
| Some(TypedHeader(Authorization(bearer))) => Some(bearer.token()),
|
||||
| None => request.query.access_token.as_deref(),
|
||||
@@ -67,17 +65,23 @@ pub(super) async fn auth(
|
||||
if metadata.authentication == AuthScheme::None {
|
||||
match metadata {
|
||||
| &get_public_rooms::v3::Request::METADATA => {
|
||||
match token {
|
||||
| Token::Appservice(_) | Token::User(_) => {
|
||||
// we should have validated the token above
|
||||
// already
|
||||
},
|
||||
| Token::None | Token::Invalid => {
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::MissingToken,
|
||||
"Missing or invalid access token.",
|
||||
));
|
||||
},
|
||||
if !services
|
||||
.server
|
||||
.config
|
||||
.allow_public_room_directory_without_auth
|
||||
{
|
||||
match token {
|
||||
| Token::Appservice(_) | Token::User(_) => {
|
||||
// we should have validated the token above
|
||||
// already
|
||||
},
|
||||
| Token::None | Token::Invalid => {
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::MissingToken,
|
||||
"Missing or invalid access token.",
|
||||
));
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
| &get_profile::v3::Request::METADATA
|
||||
@@ -229,33 +233,10 @@ async fn auth_appservice(
|
||||
return Err!(Request(Exclusive("User is not in namespace.")));
|
||||
}
|
||||
|
||||
// MSC3202/MSC4190: Handle device_id masquerading for appservices.
|
||||
// The device_id can be provided via `device_id` or
|
||||
// `org.matrix.msc3202.device_id` query parameter.
|
||||
let sender_device = if let Some(ref device_id_str) = request.query.device_id {
|
||||
let device_id: &DeviceId = device_id_str.as_str().into();
|
||||
|
||||
// Verify the device exists for this user
|
||||
if services
|
||||
.users
|
||||
.get_device_metadata(&user_id, device_id)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return Err!(Request(Forbidden(
|
||||
"Device does not exist for user or appservice cannot masquerade as this device."
|
||||
)));
|
||||
}
|
||||
|
||||
Some(device_id.to_owned())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(Auth {
|
||||
origin: None,
|
||||
sender_user: Some(user_id),
|
||||
sender_device,
|
||||
sender_device: None,
|
||||
appservice_info: Some(*info),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -11,10 +11,6 @@
|
||||
pub(super) struct QueryParams {
|
||||
pub(super) access_token: Option<String>,
|
||||
pub(super) user_id: Option<String>,
|
||||
/// Device ID for appservice device masquerading (MSC3202/MSC4190).
|
||||
/// Can be provided as `device_id` or `org.matrix.msc3202.device_id`.
|
||||
#[serde(alias = "org.matrix.msc3202.device_id")]
|
||||
pub(super) device_id: Option<String>,
|
||||
}
|
||||
|
||||
pub(super) struct Request {
|
||||
|
||||
@@ -40,7 +40,7 @@ pub(crate) async fn get_room_information_route(
|
||||
servers.sort_unstable();
|
||||
servers.dedup();
|
||||
|
||||
servers.shuffle(&mut rand::rng());
|
||||
servers.shuffle(&mut rand::thread_rng());
|
||||
|
||||
// insert our server as the very first choice if in list
|
||||
if let Some(server_index) = servers
|
||||
|
||||
@@ -1,33 +1,27 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
net::IpAddr,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use std::{collections::BTreeMap, net::IpAddr, time::Instant};
|
||||
|
||||
use axum::extract::State;
|
||||
use axum_client_ip::InsecureClientIp;
|
||||
use conduwuit::{
|
||||
Err, Error, Result, debug, debug_warn, err, error,
|
||||
result::LogErr,
|
||||
state_res::lexicographical_topological_sort,
|
||||
trace,
|
||||
utils::{
|
||||
IterStream, ReadyExt, millis_since_unix_epoch,
|
||||
stream::{BroadbandExt, TryBroadbandExt, automatic_width},
|
||||
},
|
||||
warn,
|
||||
};
|
||||
use conduwuit_service::{
|
||||
Services,
|
||||
sending::{EDU_LIMIT, PDU_LIMIT},
|
||||
};
|
||||
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
|
||||
use http::StatusCode;
|
||||
use itertools::Itertools;
|
||||
use ruma::{
|
||||
CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId,
|
||||
RoomId, ServerName, UserId,
|
||||
CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId,
|
||||
api::{
|
||||
client::error::{ErrorKind, ErrorKind::LimitExceeded},
|
||||
client::error::ErrorKind,
|
||||
federation::transactions::{
|
||||
edu::{
|
||||
DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent,
|
||||
@@ -38,16 +32,9 @@
|
||||
},
|
||||
},
|
||||
events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
|
||||
int,
|
||||
serde::Raw,
|
||||
to_device::DeviceIdOrAllDevices,
|
||||
uint,
|
||||
};
|
||||
use service::transactions::{
|
||||
FederationTxnState, TransactionError, TxnKey, WrappedTransactionResponse,
|
||||
};
|
||||
use tokio::sync::watch::{Receiver, Sender};
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::Ruma;
|
||||
|
||||
@@ -57,6 +44,15 @@
|
||||
/// # `PUT /_matrix/federation/v1/send/{txnId}`
|
||||
///
|
||||
/// Push EDUs and PDUs to this server.
|
||||
#[tracing::instrument(
|
||||
name = "txn",
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(
|
||||
%client,
|
||||
origin = body.origin().as_str()
|
||||
),
|
||||
)]
|
||||
pub(crate) async fn send_transaction_message_route(
|
||||
State(services): State<crate::State>,
|
||||
InsecureClientIp(client): InsecureClientIp,
|
||||
@@ -80,73 +76,16 @@ pub(crate) async fn send_transaction_message_route(
|
||||
)));
|
||||
}
|
||||
|
||||
let txn_key = (body.origin().to_owned(), body.transaction_id.clone());
|
||||
|
||||
// Atomically check cache, join active, or start new transaction
|
||||
match services
|
||||
.transactions
|
||||
.get_or_start_federation_txn(txn_key.clone())?
|
||||
{
|
||||
| FederationTxnState::Cached(response) => {
|
||||
// Already responded
|
||||
Ok(response)
|
||||
},
|
||||
| FederationTxnState::Active(receiver) => {
|
||||
// Another thread is processing
|
||||
wait_for_result(receiver).await
|
||||
},
|
||||
| FederationTxnState::Started { receiver, sender } => {
|
||||
// We're the first, spawn the processing task
|
||||
services
|
||||
.server
|
||||
.runtime()
|
||||
.spawn(process_inbound_transaction(services, body, client, txn_key, sender));
|
||||
// and wait for it
|
||||
wait_for_result(receiver).await
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_result(
|
||||
mut recv: Receiver<WrappedTransactionResponse>,
|
||||
) -> Result<send_transaction_message::v1::Response> {
|
||||
if tokio::time::timeout(Duration::from_secs(50), recv.changed())
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
// Took too long, return 429 to encourage the sender to try again
|
||||
return Err(Error::BadRequest(
|
||||
LimitExceeded { retry_after: None },
|
||||
"Transaction is being still being processed. Please try again later.",
|
||||
));
|
||||
}
|
||||
let value = recv.borrow_and_update();
|
||||
match value.clone() {
|
||||
| Some(Ok(response)) => Ok(response),
|
||||
| Some(Err(err)) => Err(transaction_error_to_response(&err)),
|
||||
| None => Err(Error::Request(
|
||||
ErrorKind::Unknown,
|
||||
"Transaction processing failed unexpectedly".into(),
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
skip_all,
|
||||
fields(
|
||||
id = ?body.transaction_id.as_str(),
|
||||
origin = ?body.origin()
|
||||
)
|
||||
)]
|
||||
async fn process_inbound_transaction(
|
||||
services: crate::State,
|
||||
body: Ruma<send_transaction_message::v1::Request>,
|
||||
client: IpAddr,
|
||||
txn_key: TxnKey,
|
||||
sender: Sender<WrappedTransactionResponse>,
|
||||
) {
|
||||
let txn_start_time = Instant::now();
|
||||
trace!(
|
||||
pdus = body.pdus.len(),
|
||||
edus = body.edus.len(),
|
||||
elapsed = ?txn_start_time.elapsed(),
|
||||
id = %body.transaction_id,
|
||||
origin = %body.origin(),
|
||||
"Starting txn",
|
||||
);
|
||||
|
||||
let pdus = body
|
||||
.pdus
|
||||
.iter()
|
||||
@@ -163,79 +102,40 @@ async fn process_inbound_transaction(
|
||||
.filter_map(Result::ok)
|
||||
.stream();
|
||||
|
||||
debug!(pdus = body.pdus.len(), edus = body.edus.len(), "Processing transaction",);
|
||||
let results = match handle(&services, &client, body.origin(), pdus, edus).await {
|
||||
| Ok(results) => results,
|
||||
| Err(err) => {
|
||||
fail_federation_txn(services, &txn_key, &sender, err);
|
||||
return;
|
||||
},
|
||||
};
|
||||
|
||||
for (id, result) in &results {
|
||||
if let Err(e) = result {
|
||||
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
|
||||
debug_warn!("Incoming PDU failed {id}: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?;
|
||||
|
||||
debug!(
|
||||
pdus = body.pdus.len(),
|
||||
edus = body.edus.len(),
|
||||
elapsed = ?txn_start_time.elapsed(),
|
||||
"Finished processing transaction"
|
||||
id = %body.transaction_id,
|
||||
origin = %body.origin(),
|
||||
"Finished txn",
|
||||
);
|
||||
for (id, result) in &results {
|
||||
if let Err(e) = result {
|
||||
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
|
||||
warn!("Incoming PDU failed {id}: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let response = send_transaction_message::v1::Response {
|
||||
Ok(send_transaction_message::v1::Response {
|
||||
pdus: results
|
||||
.into_iter()
|
||||
.map(|(e, r)| (e, r.map_err(error::sanitized_message)))
|
||||
.collect(),
|
||||
};
|
||||
|
||||
services
|
||||
.transactions
|
||||
.finish_federation_txn(txn_key, sender, response);
|
||||
})
|
||||
}
|
||||
|
||||
/// Handles a failed federation transaction by sending the error through
|
||||
/// the channel and cleaning up the transaction state. This allows waiters to
|
||||
/// receive an appropriate error response.
|
||||
fn fail_federation_txn(
|
||||
services: crate::State,
|
||||
txn_key: &TxnKey,
|
||||
sender: &Sender<WrappedTransactionResponse>,
|
||||
err: TransactionError,
|
||||
) {
|
||||
debug!("Transaction failed: {err}");
|
||||
|
||||
// Remove from active state so the transaction can be retried
|
||||
services.transactions.remove_federation_txn(txn_key);
|
||||
|
||||
// Send the error to any waiters
|
||||
if let Err(e) = sender.send(Some(Err(err))) {
|
||||
debug_warn!("Failed to send transaction error to receivers: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts a TransactionError into an appropriate HTTP error response.
|
||||
fn transaction_error_to_response(err: &TransactionError) -> Error {
|
||||
match err {
|
||||
| TransactionError::ShuttingDown => Error::Request(
|
||||
ErrorKind::Unknown,
|
||||
"Server is shutting down, please retry later".into(),
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
),
|
||||
}
|
||||
}
|
||||
async fn handle(
|
||||
services: &Services,
|
||||
client: &IpAddr,
|
||||
origin: &ServerName,
|
||||
started: Instant,
|
||||
pdus: impl Stream<Item = Pdu> + Send,
|
||||
edus: impl Stream<Item = Edu> + Send,
|
||||
) -> std::result::Result<ResolvedMap, TransactionError> {
|
||||
) -> Result<ResolvedMap> {
|
||||
// group pdus by room
|
||||
let pdus = pdus
|
||||
.collect()
|
||||
@@ -252,7 +152,7 @@ async fn handle(
|
||||
.into_iter()
|
||||
.try_stream()
|
||||
.broad_and_then(|(room_id, pdus): (_, Vec<_>)| {
|
||||
handle_room(services, client, origin, room_id, pdus.into_iter())
|
||||
handle_room(services, client, origin, started, room_id, pdus.into_iter())
|
||||
.map_ok(Vec::into_iter)
|
||||
.map_ok(IterStream::try_stream)
|
||||
})
|
||||
@@ -269,51 +169,14 @@ async fn handle(
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
|
||||
/// returning them in a topologically sorted order.
|
||||
///
|
||||
/// This is used to attempt to process PDUs in an order that respects their
|
||||
/// dependencies, however it is ultimately the sender's responsibility to send
|
||||
/// them in a processable order, so this is just a best effort attempt. It does
|
||||
/// not account for power levels or other tie breaks.
|
||||
async fn build_local_dag(
|
||||
pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject>,
|
||||
) -> Result<Vec<OwnedEventId>> {
|
||||
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
|
||||
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> = HashMap::new();
|
||||
|
||||
for (event_id, value) in pdu_map {
|
||||
let prev_events = value
|
||||
.get("prev_events")
|
||||
.expect("pdu must have prev_events")
|
||||
.as_array()
|
||||
.expect("prev_events must be an array")
|
||||
.iter()
|
||||
.map(|v| {
|
||||
OwnedEventId::parse(v.as_str().expect("prev_events values must be strings"))
|
||||
.expect("prev_events must be valid event IDs")
|
||||
})
|
||||
.collect::<HashSet<OwnedEventId>>();
|
||||
|
||||
dag.insert(event_id.clone(), prev_events);
|
||||
}
|
||||
lexicographical_topological_sort(&dag, &|_| async {
|
||||
// Note: we don't bother fetching power levels because that would massively slow
|
||||
// this function down. This is a best-effort attempt to order events correctly
|
||||
// for processing, however ultimately that should be the sender's job.
|
||||
Ok((int!(0), MilliSecondsSinceUnixEpoch(uint!(0))))
|
||||
})
|
||||
.await
|
||||
.map_err(|e| err!("failed to resolve local graph: {e}"))
|
||||
}
|
||||
|
||||
async fn handle_room(
|
||||
services: &Services,
|
||||
_client: &IpAddr,
|
||||
origin: &ServerName,
|
||||
txn_start_time: Instant,
|
||||
room_id: OwnedRoomId,
|
||||
pdus: impl Iterator<Item = Pdu> + Send,
|
||||
) -> std::result::Result<Vec<(OwnedEventId, Result)>, TransactionError> {
|
||||
) -> Result<Vec<(OwnedEventId, Result)>> {
|
||||
let _room_lock = services
|
||||
.rooms
|
||||
.event_handler
|
||||
@@ -322,40 +185,27 @@ async fn handle_room(
|
||||
.await;
|
||||
|
||||
let room_id = &room_id;
|
||||
let pdu_map: HashMap<OwnedEventId, CanonicalJsonObject> = pdus
|
||||
.into_iter()
|
||||
.map(|(_, event_id, value)| (event_id, value))
|
||||
.collect();
|
||||
// Try to sort PDUs by their dependencies, but fall back to arbitrary order on
|
||||
// failure (e.g., cycles). This is best-effort; proper ordering is the sender's
|
||||
// responsibility.
|
||||
let sorted_event_ids = if pdu_map.len() >= 2 {
|
||||
build_local_dag(&pdu_map).await.unwrap_or_else(|e| {
|
||||
debug_warn!("Failed to build local DAG for room {room_id}: {e}");
|
||||
pdu_map.keys().cloned().collect()
|
||||
pdus.try_stream()
|
||||
.and_then(|(_, event_id, value)| async move {
|
||||
services.server.check_running()?;
|
||||
let pdu_start_time = Instant::now();
|
||||
let result = services
|
||||
.rooms
|
||||
.event_handler
|
||||
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
|
||||
.await
|
||||
.map(|_| ());
|
||||
|
||||
debug!(
|
||||
pdu_elapsed = ?pdu_start_time.elapsed(),
|
||||
txn_elapsed = ?txn_start_time.elapsed(),
|
||||
"Finished PDU {event_id}",
|
||||
);
|
||||
|
||||
Ok((event_id, result))
|
||||
})
|
||||
} else {
|
||||
pdu_map.keys().cloned().collect()
|
||||
};
|
||||
let mut results = Vec::with_capacity(sorted_event_ids.len());
|
||||
for event_id in sorted_event_ids {
|
||||
let value = pdu_map
|
||||
.get(&event_id)
|
||||
.expect("sorted event IDs must be from the original map")
|
||||
.clone();
|
||||
services
|
||||
.server
|
||||
.check_running()
|
||||
.map_err(|_| TransactionError::ShuttingDown)?;
|
||||
let result = services
|
||||
.rooms
|
||||
.event_handler
|
||||
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
|
||||
.await
|
||||
.map(|_| ());
|
||||
results.push((event_id, result));
|
||||
}
|
||||
Ok(results)
|
||||
.try_collect()
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, edu: Edu) {
|
||||
@@ -628,8 +478,8 @@ async fn handle_edu_direct_to_device(
|
||||
|
||||
// Check if this is a new transaction id
|
||||
if services
|
||||
.transactions
|
||||
.get_client_txn(sender, None, message_id)
|
||||
.transaction_ids
|
||||
.existing_txnid(sender, None, message_id)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
@@ -648,8 +498,8 @@ async fn handle_edu_direct_to_device(
|
||||
|
||||
// Save transaction id with empty data
|
||||
services
|
||||
.transactions
|
||||
.add_client_txnid(sender, None, message_id, &[]);
|
||||
.transaction_ids
|
||||
.add_txnid(sender, None, message_id, &[]);
|
||||
}
|
||||
|
||||
async fn handle_edu_direct_to_device_user<Event: Send + Sync>(
|
||||
|
||||
@@ -24,9 +24,6 @@ conduwuit_mods = [
|
||||
gzip_compression = [
|
||||
"reqwest/gzip",
|
||||
]
|
||||
http3 = [
|
||||
"reqwest/http3",
|
||||
]
|
||||
hardened_malloc = [
|
||||
"dep:hardened_malloc-rs"
|
||||
]
|
||||
@@ -86,7 +83,6 @@ libloading.optional = true
|
||||
log.workspace = true
|
||||
num-traits.workspace = true
|
||||
rand.workspace = true
|
||||
rand_core = { version = "0.6.4", features = ["getrandom"] }
|
||||
regex.workspace = true
|
||||
reqwest.workspace = true
|
||||
ring.workspace = true
|
||||
|
||||
@@ -368,31 +368,6 @@ pub struct Config {
|
||||
#[serde(default = "default_max_fetch_prev_events")]
|
||||
pub max_fetch_prev_events: u16,
|
||||
|
||||
/// How many incoming federation transactions the server is willing to be
|
||||
/// processing at any given time before it becomes overloaded and starts
|
||||
/// rejecting further transactions until some slots become available.
|
||||
///
|
||||
/// Setting this value too low or too high may result in unstable
|
||||
/// federation, and setting it too high may cause runaway resource usage.
|
||||
///
|
||||
/// default: 150
|
||||
#[serde(default = "default_max_concurrent_inbound_transactions")]
|
||||
pub max_concurrent_inbound_transactions: usize,
|
||||
|
||||
/// Maximum age (in seconds) for cached federation transaction responses.
|
||||
/// Entries older than this will be removed during cleanup.
|
||||
///
|
||||
/// default: 7200 (2 hours)
|
||||
#[serde(default = "default_transaction_id_cache_max_age_secs")]
|
||||
pub transaction_id_cache_max_age_secs: u64,
|
||||
|
||||
/// Maximum number of cached federation transaction responses.
|
||||
/// When the cache exceeds this limit, older entries will be removed.
|
||||
///
|
||||
/// default: 8192
|
||||
#[serde(default = "default_transaction_id_cache_max_entries")]
|
||||
pub transaction_id_cache_max_entries: usize,
|
||||
|
||||
/// Default/base connection timeout (seconds). This is used only by URL
|
||||
/// previews and update/news endpoint checks.
|
||||
///
|
||||
@@ -678,6 +653,12 @@ pub struct Config {
|
||||
#[serde(default)]
|
||||
pub allow_public_room_directory_over_federation: bool,
|
||||
|
||||
/// Set this to true to allow your server's public room directory to be
|
||||
/// queried without client authentication (access token) through the Client
|
||||
/// APIs. Set this to false to protect against /publicRooms spiders.
|
||||
#[serde(default)]
|
||||
pub allow_public_room_directory_without_auth: bool,
|
||||
|
||||
/// Allow guests/unauthenticated users to access TURN credentials.
|
||||
///
|
||||
/// This is the equivalent of Synapse's `turn_allow_guests` config option.
|
||||
@@ -1263,6 +1244,12 @@ pub struct Config {
|
||||
#[serde(default)]
|
||||
pub rocksdb_repair: bool,
|
||||
|
||||
#[serde(default)]
|
||||
pub rocksdb_read_only: bool,
|
||||
|
||||
#[serde(default)]
|
||||
pub rocksdb_secondary: bool,
|
||||
|
||||
/// Enables idle CPU priority for compaction thread. This is not enabled by
|
||||
/// default to prevent compaction from falling too far behind on busy
|
||||
/// systems.
|
||||
@@ -1322,33 +1309,26 @@ pub struct Config {
|
||||
|
||||
/// Allow local (your server only) presence updates/requests.
|
||||
///
|
||||
/// Local presence must be enabled for outgoing presence to function.
|
||||
///
|
||||
/// Note that local presence is not as heavy on the CPU as federated
|
||||
/// presence, but will still become more expensive the more local users you
|
||||
/// have.
|
||||
/// Note that presence on continuwuity is very fast unlike Synapse's. If
|
||||
/// using outgoing presence, this MUST be enabled.
|
||||
#[serde(default = "true_fn")]
|
||||
pub allow_local_presence: bool,
|
||||
|
||||
/// Allow incoming federated presence updates.
|
||||
/// Allow incoming federated presence updates/requests.
|
||||
///
|
||||
/// This option enables processing inbound presence updates from other
|
||||
/// servers. Without it, remote users will appear as if they are always
|
||||
/// offline to your local users. This does not affect typing indicators or
|
||||
/// read receipts.
|
||||
/// This option receives presence updates from other servers, but does not
|
||||
/// send any unless `allow_outgoing_presence` is true. Note that presence on
|
||||
/// continuwuity is very fast unlike Synapse's.
|
||||
#[serde(default = "true_fn")]
|
||||
pub allow_incoming_presence: bool,
|
||||
|
||||
/// Allow outgoing presence updates/requests.
|
||||
///
|
||||
/// This option sends presence updates to other servers, and requires that
|
||||
/// `allow_local_presence` is also enabled.
|
||||
///
|
||||
/// Note that outgoing presence is very heavy on the CPU and network, and
|
||||
/// will typically cause extreme strain and slowdowns for no real benefit.
|
||||
/// There are only a few clients that even implement presence, so you
|
||||
/// probably don't want to enable this.
|
||||
#[serde(default)]
|
||||
/// This option sends presence updates to other servers, but does not
|
||||
/// receive any unless `allow_incoming_presence` is true. Note that presence
|
||||
/// on continuwuity is very fast unlike Synapse's. If using outgoing
|
||||
/// presence, you MUST enable `allow_local_presence` as well.
|
||||
#[serde(default = "true_fn")]
|
||||
pub allow_outgoing_presence: bool,
|
||||
|
||||
/// How many seconds without presence updates before you become idle.
|
||||
@@ -1386,10 +1366,6 @@ pub struct Config {
|
||||
pub allow_incoming_read_receipts: bool,
|
||||
|
||||
/// Allow sending read receipts to remote servers.
|
||||
///
|
||||
/// Note that sending read receipts to remote servers in large rooms with
|
||||
/// lots of other homeservers may cause additional strain on the CPU and
|
||||
/// network.
|
||||
#[serde(default = "true_fn")]
|
||||
pub allow_outgoing_read_receipts: bool,
|
||||
|
||||
@@ -1401,10 +1377,6 @@ pub struct Config {
|
||||
pub allow_local_typing: bool,
|
||||
|
||||
/// Allow outgoing typing updates to federation.
|
||||
///
|
||||
/// Note that sending typing indicators to remote servers in large rooms
|
||||
/// with lots of other homeservers may cause additional strain on the CPU
|
||||
/// and network.
|
||||
#[serde(default = "true_fn")]
|
||||
pub allow_outgoing_typing: bool,
|
||||
|
||||
@@ -1544,7 +1516,7 @@ pub struct Config {
|
||||
/// sender user's server name, inbound federation X-Matrix origin, and
|
||||
/// outbound federation handler.
|
||||
///
|
||||
/// You can set this to [".*"] to block all servers by default, and then
|
||||
/// You can set this to ["*"] to block all servers by default, and then
|
||||
/// use `allowed_remote_server_names` to allow only specific servers.
|
||||
///
|
||||
/// example: ["badserver\\.tld$", "badphrase", "19dollarfortnitecards"]
|
||||
@@ -2080,12 +2052,6 @@ pub struct Config {
|
||||
/// display: nested
|
||||
#[serde(default)]
|
||||
pub blurhashing: BlurhashConfig,
|
||||
|
||||
/// Configuration for MatrixRTC (MSC4143) transport discovery.
|
||||
/// display: nested
|
||||
#[serde(default)]
|
||||
pub matrix_rtc: MatrixRtcConfig,
|
||||
|
||||
#[serde(flatten)]
|
||||
#[allow(clippy::zero_sized_map_values)]
|
||||
// this is a catchall, the map shouldn't be zero at runtime
|
||||
@@ -2151,16 +2117,17 @@ pub struct WellKnownConfig {
|
||||
/// listed.
|
||||
pub support_mxid: Option<OwnedUserId>,
|
||||
|
||||
/// **DEPRECATED**: Use `[global.matrix_rtc].foci` instead.
|
||||
///
|
||||
/// A list of MatrixRTC foci URLs which will be served as part of the
|
||||
/// MSC4143 client endpoint at /.well-known/matrix/client.
|
||||
/// MSC4143 client endpoint at /.well-known/matrix/client. If you're
|
||||
/// setting up livekit, you'd want something like:
|
||||
/// rtc_focus_server_urls = [
|
||||
/// { type = "livekit", livekit_service_url = "https://livekit.example.com" },
|
||||
/// ]
|
||||
///
|
||||
/// This option is deprecated and will be removed in a future release.
|
||||
/// Please migrate to the new `[global.matrix_rtc]` config section.
|
||||
/// To disable, set this to be an empty vector (`[]`).
|
||||
///
|
||||
/// default: []
|
||||
#[serde(default)]
|
||||
#[serde(default = "default_rtc_focus_urls")]
|
||||
pub rtc_focus_server_urls: Vec<RtcFocusInfo>,
|
||||
}
|
||||
|
||||
@@ -2189,43 +2156,6 @@ pub struct BlurhashConfig {
|
||||
pub blurhash_max_raw_size: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Default)]
|
||||
#[config_example_generator(filename = "conduwuit-example.toml", section = "global.matrix_rtc")]
|
||||
pub struct MatrixRtcConfig {
|
||||
/// A list of MatrixRTC foci (transports) which will be served via the
|
||||
/// MSC4143 RTC transports endpoint at
|
||||
/// `/_matrix/client/v1/rtc/transports`. If you're setting up livekit,
|
||||
/// you'd want something like:
|
||||
/// ```toml
|
||||
/// [global.matrix_rtc]
|
||||
/// foci = [
|
||||
/// { type = "livekit", livekit_service_url = "https://livekit.example.com" },
|
||||
/// ]
|
||||
/// ```
|
||||
///
|
||||
/// To disable, set this to an empty list (`[]`).
|
||||
///
|
||||
/// default: []
|
||||
#[serde(default)]
|
||||
pub foci: Vec<RtcFocusInfo>,
|
||||
}
|
||||
|
||||
impl MatrixRtcConfig {
|
||||
/// Returns the effective foci, falling back to the deprecated
|
||||
/// `rtc_focus_server_urls` if the new config is empty.
|
||||
#[must_use]
|
||||
pub fn effective_foci<'a>(
|
||||
&'a self,
|
||||
deprecated_foci: &'a [RtcFocusInfo],
|
||||
) -> &'a [RtcFocusInfo] {
|
||||
if !self.foci.is_empty() {
|
||||
&self.foci
|
||||
} else {
|
||||
deprecated_foci
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize)]
|
||||
#[config_example_generator(filename = "conduwuit-example.toml", section = "global.ldap")]
|
||||
pub struct LdapConfig {
|
||||
@@ -2419,7 +2349,6 @@ pub struct DraupnirConfig {
|
||||
"well_known_support_email",
|
||||
"well_known_support_mxid",
|
||||
"registration_token_file",
|
||||
"well_known.rtc_focus_server_urls",
|
||||
];
|
||||
|
||||
impl Config {
|
||||
@@ -2602,12 +2531,6 @@ fn default_pusher_idle_timeout() -> u64 { 15 }
|
||||
|
||||
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
|
||||
|
||||
fn default_max_concurrent_inbound_transactions() -> usize { 150 }
|
||||
|
||||
fn default_transaction_id_cache_max_age_secs() -> u64 { 60 * 60 * 2 }
|
||||
|
||||
fn default_transaction_id_cache_max_entries() -> usize { 8192 }
|
||||
|
||||
fn default_tracing_flame_filter() -> String {
|
||||
cfg!(debug_assertions)
|
||||
.then_some("trace,h2=off")
|
||||
@@ -2703,6 +2626,9 @@ fn default_rocksdb_stats_level() -> u8 { 1 }
|
||||
#[inline]
|
||||
pub fn default_default_room_version() -> RoomVersionId { RoomVersionId::V11 }
|
||||
|
||||
#[must_use]
|
||||
pub fn default_rtc_focus_urls() -> Vec<RtcFocusInfo> { vec![] }
|
||||
|
||||
fn default_ip_range_denylist() -> Vec<String> {
|
||||
vec![
|
||||
"127.0.0.0/8".to_owned(),
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
static VERSION: OnceLock<String> = OnceLock::new();
|
||||
static VERSION_UA: OnceLock<String> = OnceLock::new();
|
||||
static USER_AGENT: OnceLock<String> = OnceLock::new();
|
||||
static USER_AGENT_MEDIA: OnceLock<String> = OnceLock::new();
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
@@ -22,22 +21,14 @@ pub fn name() -> &'static str { BRANDING }
|
||||
|
||||
#[inline]
|
||||
pub fn version() -> &'static str { VERSION.get_or_init(init_version) }
|
||||
|
||||
#[inline]
|
||||
pub fn version_ua() -> &'static str { VERSION_UA.get_or_init(init_version_ua) }
|
||||
|
||||
#[inline]
|
||||
pub fn user_agent() -> &'static str { USER_AGENT.get_or_init(init_user_agent) }
|
||||
|
||||
#[inline]
|
||||
pub fn user_agent_media() -> &'static str { USER_AGENT_MEDIA.get_or_init(init_user_agent_media) }
|
||||
|
||||
fn init_user_agent() -> String { format!("{}/{} (bot; +{WEBSITE})", name(), version_ua()) }
|
||||
|
||||
fn init_user_agent_media() -> String {
|
||||
format!("{}/{} (embedbot; facebookexternalhit/1.1; +{WEBSITE})", name(), version_ua())
|
||||
}
|
||||
|
||||
fn init_version_ua() -> String {
|
||||
conduwuit_build_metadata::version_tag()
|
||||
.map_or_else(|| SEMANTIC.to_owned(), |extra| format!("{SEMANTIC}+{extra}"))
|
||||
|
||||
@@ -1046,7 +1046,7 @@ async fn test_event_sort() {
|
||||
// don't remove any events so we know it sorts them all correctly
|
||||
let mut events_to_sort = events.keys().cloned().collect::<Vec<_>>();
|
||||
|
||||
events_to_sort.shuffle(&mut rand::rng());
|
||||
events_to_sort.shuffle(&mut rand::thread_rng());
|
||||
|
||||
let power_level = resolved_power
|
||||
.get(&(StateEventType::RoomPowerLevels, "".into()))
|
||||
|
||||
@@ -28,7 +28,7 @@ fn init_argon() -> Argon2<'static> {
|
||||
}
|
||||
|
||||
pub(super) fn password(password: &str) -> Result<String> {
|
||||
let salt = SaltString::generate(rand_core::OsRng);
|
||||
let salt = SaltString::generate(rand::thread_rng());
|
||||
ARGON
|
||||
.get_or_init(init_argon)
|
||||
.hash_password(password.as_bytes(), &salt)
|
||||
|
||||
@@ -4,16 +4,16 @@
|
||||
};
|
||||
|
||||
use arrayvec::ArrayString;
|
||||
use rand::{RngExt, seq::SliceRandom};
|
||||
use rand::{Rng, seq::SliceRandom, thread_rng};
|
||||
|
||||
pub fn shuffle<T>(vec: &mut [T]) {
|
||||
let mut rng = rand::rng();
|
||||
let mut rng = thread_rng();
|
||||
vec.shuffle(&mut rng);
|
||||
}
|
||||
|
||||
pub fn string(length: usize) -> String {
|
||||
rand::rng()
|
||||
.sample_iter(&rand::distr::Alphanumeric)
|
||||
thread_rng()
|
||||
.sample_iter(&rand::distributions::Alphanumeric)
|
||||
.take(length)
|
||||
.map(char::from)
|
||||
.collect()
|
||||
@@ -22,8 +22,8 @@ pub fn string(length: usize) -> String {
|
||||
#[inline]
|
||||
pub fn string_array<const LENGTH: usize>() -> ArrayString<LENGTH> {
|
||||
let mut ret = ArrayString::<LENGTH>::new();
|
||||
rand::rng()
|
||||
.sample_iter(&rand::distr::Alphanumeric)
|
||||
thread_rng()
|
||||
.sample_iter(&rand::distributions::Alphanumeric)
|
||||
.take(LENGTH)
|
||||
.map(char::from)
|
||||
.for_each(|c| ret.push(c));
|
||||
@@ -40,4 +40,7 @@ pub fn time_from_now_secs(range: Range<u64>) -> SystemTime {
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn secs(range: Range<u64>) -> Duration { Duration::from_secs(rand::random_range(range)) }
|
||||
pub fn secs(range: Range<u64>) -> Duration {
|
||||
let mut rng = thread_rng();
|
||||
Duration::from_secs(rng.gen_range(range))
|
||||
}
|
||||
|
||||
@@ -3,17 +3,19 @@
|
||||
stream::{Stream, TryStream},
|
||||
};
|
||||
|
||||
use crate::{Error, Result};
|
||||
|
||||
pub trait IterStream<I: IntoIterator + Send> {
|
||||
/// Convert an Iterator into a Stream
|
||||
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send;
|
||||
|
||||
/// Convert an Iterator into a TryStream with a generic error type
|
||||
fn try_stream<E>(
|
||||
/// Convert an Iterator into a TryStream
|
||||
fn try_stream(
|
||||
self,
|
||||
) -> impl TryStream<
|
||||
Ok = <I as IntoIterator>::Item,
|
||||
Error = E,
|
||||
Item = Result<<I as IntoIterator>::Item, E>,
|
||||
Error = Error,
|
||||
Item = Result<<I as IntoIterator>::Item, Error>,
|
||||
> + Send;
|
||||
}
|
||||
|
||||
@@ -26,12 +28,12 @@ impl<I> IterStream<I> for I
|
||||
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send { stream::iter(self) }
|
||||
|
||||
#[inline]
|
||||
fn try_stream<E>(
|
||||
fn try_stream(
|
||||
self,
|
||||
) -> impl TryStream<
|
||||
Ok = <I as IntoIterator>::Item,
|
||||
Error = E,
|
||||
Item = Result<<I as IntoIterator>::Item, E>,
|
||||
Error = Error,
|
||||
Item = Result<<I as IntoIterator>::Item, Error>,
|
||||
> + Send {
|
||||
self.stream().map(Ok)
|
||||
}
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
//! Synchronous combinator extensions to futures::TryStream
|
||||
|
||||
use std::result::Result;
|
||||
|
||||
use futures::{TryFuture, TryStream, TryStreamExt};
|
||||
|
||||
use super::automatic_width;
|
||||
use crate::Result;
|
||||
|
||||
/// Concurrency extensions to augment futures::TryStreamExt. broad_ combinators
|
||||
/// produce out-of-order
|
||||
|
||||
@@ -33,6 +33,8 @@ pub struct Engine {
|
||||
pub(crate) db: Db,
|
||||
pub(crate) pool: Arc<Pool>,
|
||||
pub(crate) ctx: Arc<Context>,
|
||||
pub(super) read_only: bool,
|
||||
pub(super) secondary: bool,
|
||||
pub(crate) checksums: bool,
|
||||
corks: AtomicU32,
|
||||
}
|
||||
@@ -127,6 +129,14 @@ pub fn current_sequence(&self) -> u64 {
|
||||
|
||||
sequence
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
pub fn is_read_only(&self) -> bool { self.secondary || self.read_only }
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
pub fn is_secondary(&self) -> bool { self.secondary }
|
||||
}
|
||||
|
||||
impl Drop for Engine {
|
||||
|
||||
@@ -12,8 +12,9 @@ pub fn backup(&self) -> Result {
|
||||
let mut engine = self.backup_engine()?;
|
||||
let config = &self.ctx.server.config;
|
||||
if config.database_backups_to_keep > 0 {
|
||||
let flush = !self.is_read_only();
|
||||
engine
|
||||
.create_new_backup_flush(&self.db, true)
|
||||
.create_new_backup_flush(&self.db, flush)
|
||||
.map_err(map_err)?;
|
||||
|
||||
let engine_info = engine.get_backup_info();
|
||||
|
||||
@@ -35,7 +35,14 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
|
||||
}
|
||||
|
||||
debug!("Opening database...");
|
||||
let db = Db::open_cf_descriptors(&db_opts, path, cfds).or_else(or_else)?;
|
||||
let db = if config.rocksdb_read_only {
|
||||
Db::open_cf_descriptors_read_only(&db_opts, path, cfds, false)
|
||||
} else if config.rocksdb_secondary {
|
||||
Db::open_cf_descriptors_as_secondary(&db_opts, path, path, cfds)
|
||||
} else {
|
||||
Db::open_cf_descriptors(&db_opts, path, cfds)
|
||||
}
|
||||
.or_else(or_else)?;
|
||||
|
||||
info!(
|
||||
columns = num_cfds,
|
||||
@@ -48,6 +55,8 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
|
||||
db,
|
||||
pool: ctx.pool.clone(),
|
||||
ctx: ctx.clone(),
|
||||
read_only: config.rocksdb_read_only,
|
||||
secondary: config.rocksdb_secondary,
|
||||
checksums: config.rocksdb_checksums,
|
||||
corks: AtomicU32::new(0),
|
||||
}))
|
||||
|
||||
@@ -362,10 +362,6 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
|
||||
name: "userid_blurhash",
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "userid_dehydrateddevice",
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "userid_devicelistversion",
|
||||
..descriptor::RANDOM_SMALL
|
||||
|
||||
@@ -74,6 +74,14 @@ pub fn iter(&self) -> impl Iterator<Item = (&MapsKey, &MapsVal)> + Send + '_ {
|
||||
|
||||
#[inline]
|
||||
pub fn keys(&self) -> impl Iterator<Item = &MapsKey> + Send + '_ { self.maps.keys() }
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
pub fn is_read_only(&self) -> bool { self.db.is_read_only() }
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
pub fn is_secondary(&self) -> bool { self.db.is_secondary() }
|
||||
}
|
||||
|
||||
impl Index<&str> for Database {
|
||||
|
||||
@@ -99,11 +99,6 @@ gzip_compression = [
|
||||
hardened_malloc = [
|
||||
"conduwuit-core/hardened_malloc",
|
||||
]
|
||||
http3 = [
|
||||
"conduwuit-api/http3",
|
||||
"conduwuit-core/http3",
|
||||
"conduwuit-service/http3",
|
||||
]
|
||||
io_uring = [
|
||||
"conduwuit-database/io_uring",
|
||||
]
|
||||
|
||||
@@ -27,6 +27,10 @@ pub struct Args {
|
||||
#[arg(long, short('O'))]
|
||||
pub option: Vec<String>,
|
||||
|
||||
/// Run in a stricter read-only --maintenance mode.
|
||||
#[arg(long)]
|
||||
pub read_only: bool,
|
||||
|
||||
/// Run in maintenance mode while refusing connections.
|
||||
#[arg(long)]
|
||||
pub maintenance: bool,
|
||||
@@ -139,7 +143,11 @@ pub(crate) fn parse() -> Args { Args::parse() }
|
||||
|
||||
/// Synthesize any command line options with configuration file options.
|
||||
pub(crate) fn update(mut config: Figment, args: &Args) -> Result<Figment> {
|
||||
if args.maintenance {
|
||||
if args.read_only {
|
||||
config = config.join(("rocksdb_read_only", true));
|
||||
}
|
||||
|
||||
if args.maintenance || args.read_only {
|
||||
config = config.join(("startup_netburst", false));
|
||||
config = config.join(("listening", false));
|
||||
}
|
||||
|
||||
@@ -32,9 +32,6 @@ gzip_compression = [
|
||||
"conduwuit-core/gzip_compression",
|
||||
"reqwest/gzip",
|
||||
]
|
||||
http3 = [
|
||||
"conduwuit-core/http3",
|
||||
]
|
||||
io_uring = [
|
||||
"conduwuit-database/io_uring",
|
||||
]
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
use async_trait::async_trait;
|
||||
use conduwuit::{Result, Server, debug, error, warn};
|
||||
use database::{Deserialized, Map};
|
||||
use rand::Rng;
|
||||
use ruma::events::{Mentions, room::message::RoomMessageEventContent};
|
||||
use serde::Deserialize;
|
||||
use tokio::{
|
||||
@@ -99,7 +100,8 @@ async fn worker(self: Arc<Self>) -> Result<()> {
|
||||
}
|
||||
|
||||
let first_check_jitter = {
|
||||
let jitter_percent = rand::random_range(-50.0..=10.0);
|
||||
let mut rng = rand::thread_rng();
|
||||
let jitter_percent = rng.gen_range(-50.0..=10.0);
|
||||
self.interval.mul_f64(1.0 + jitter_percent / 100.0)
|
||||
};
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
let url_preview_user_agent = config
|
||||
.url_preview_user_agent
|
||||
.clone()
|
||||
.unwrap_or_else(|| conduwuit::version::user_agent_media().to_owned());
|
||||
.unwrap_or_else(|| conduwuit::version::user_agent().to_owned());
|
||||
|
||||
Ok(Arc::new(Self {
|
||||
default: base(config)?
|
||||
|
||||
@@ -37,6 +37,10 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
}
|
||||
|
||||
async fn worker(self: Arc<Self>) -> Result {
|
||||
if self.services.globals.is_read_only() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if self.services.config.ldap.enable {
|
||||
warn!("emergency password feature not available with LDAP enabled.");
|
||||
return Ok(());
|
||||
|
||||
@@ -156,4 +156,7 @@ pub fn user_is_local(&self, user_id: &UserId) -> bool {
|
||||
pub fn server_is_ours(&self, server_name: &ServerName) -> bool {
|
||||
server_name == self.server_name()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn is_read_only(&self) -> bool { self.db.db.is_read_only() }
|
||||
}
|
||||
|
||||
@@ -170,8 +170,6 @@ pub(super) fn remove_url_preview(&self, url: &str) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn clear_url_previews(&self) { self.url_previews.clear().await; }
|
||||
|
||||
pub(super) fn set_url_preview(
|
||||
&self,
|
||||
url: &str,
|
||||
|
||||
@@ -37,9 +37,6 @@ pub async fn remove_url_preview(&self, url: &str) -> Result<()> {
|
||||
self.db.remove_url_preview(url)
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub async fn clear_url_previews(&self) { self.db.clear_url_previews().await; }
|
||||
|
||||
#[implement(Service)]
|
||||
pub async fn set_url_preview(&self, url: &str, data: &UrlPreviewData) -> Result<()> {
|
||||
let now = SystemTime::now()
|
||||
|
||||
@@ -31,7 +31,7 @@
|
||||
pub mod sending;
|
||||
pub mod server_keys;
|
||||
pub mod sync;
|
||||
pub mod transactions;
|
||||
pub mod transaction_ids;
|
||||
pub mod uiaa;
|
||||
pub mod users;
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, Result, err,
|
||||
Err, Event, Result, Server, err,
|
||||
utils::{ReadyExt, stream::TryIgnore},
|
||||
};
|
||||
use database::{Deserialized, Ignore, Interfix, Map};
|
||||
@@ -30,12 +30,12 @@ struct Data {
|
||||
}
|
||||
|
||||
struct Services {
|
||||
server: Arc<Server>,
|
||||
admin: Dep<admin::Service>,
|
||||
appservice: Dep<appservice::Service>,
|
||||
globals: Dep<globals::Service>,
|
||||
sending: Dep<sending::Service>,
|
||||
state_accessor: Dep<rooms::state_accessor::Service>,
|
||||
state_cache: Dep<rooms::state_cache::Service>,
|
||||
}
|
||||
|
||||
impl crate::Service for Service {
|
||||
@@ -47,13 +47,13 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
aliasid_alias: args.db["aliasid_alias"].clone(),
|
||||
},
|
||||
services: Services {
|
||||
server: args.server.clone(),
|
||||
admin: args.depend::<admin::Service>("admin"),
|
||||
appservice: args.depend::<appservice::Service>("appservice"),
|
||||
globals: args.depend::<globals::Service>("globals"),
|
||||
sending: args.depend::<sending::Service>("sending"),
|
||||
state_accessor: args
|
||||
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
|
||||
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
|
||||
},
|
||||
}))
|
||||
}
|
||||
@@ -117,9 +117,6 @@ pub async fn remove_alias(&self, alias: &RoomAliasId, user_id: &UserId) -> Resul
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Resolves the given room ID or alias, returning the resolved room ID.
|
||||
/// Unlike resolve_with_servers (the underlying call), potential resident
|
||||
/// servers are not returned
|
||||
#[inline]
|
||||
pub async fn resolve(&self, room: &RoomOrAliasId) -> Result<OwnedRoomId> {
|
||||
self.resolve_with_servers(room, None)
|
||||
@@ -127,14 +124,6 @@ pub async fn resolve(&self, room: &RoomOrAliasId) -> Result<OwnedRoomId> {
|
||||
.map(|(room_id, _)| room_id)
|
||||
}
|
||||
|
||||
/// Resolves the given room ID or alias, returning the resolved room ID, and
|
||||
/// any servers that might be able to assist in fetching room data.
|
||||
///
|
||||
/// If the input is a room ID, this simply returns it and <servers>.
|
||||
/// If the input is an alias, this attempts to resolve it locally, then via
|
||||
/// appservices, and finally remotely if the alias is not local.
|
||||
/// If the alias is successfully resolved, the room ID and an empty list of
|
||||
/// servers is returned.
|
||||
pub async fn resolve_with_servers(
|
||||
&self,
|
||||
room: &RoomOrAliasId,
|
||||
@@ -145,26 +134,28 @@ pub async fn resolve_with_servers(
|
||||
Ok((room_id.to_owned(), servers.unwrap_or_default()))
|
||||
} else {
|
||||
let alias: &RoomAliasId = room.try_into().expect("valid RoomAliasId");
|
||||
self.resolve_alias(alias).await
|
||||
self.resolve_alias(alias, servers).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolves the given room alias, returning the resolved room ID and any
|
||||
/// servers that might be in the room.
|
||||
#[tracing::instrument(skip(self), name = "resolve")]
|
||||
pub async fn resolve_alias(
|
||||
&self,
|
||||
room_alias: &RoomAliasId,
|
||||
servers: Option<Vec<OwnedServerName>>,
|
||||
) -> Result<(OwnedRoomId, Vec<OwnedServerName>)> {
|
||||
let server_is_ours = self
|
||||
.services
|
||||
.globals
|
||||
.server_is_ours(room_alias.server_name());
|
||||
let server_name = room_alias.server_name();
|
||||
let server_is_ours = self.services.globals.server_is_ours(server_name);
|
||||
let servers_contains_ours = || {
|
||||
servers
|
||||
.as_ref()
|
||||
.is_some_and(|servers| servers.contains(&self.services.server.name))
|
||||
};
|
||||
|
||||
if !server_is_ours {
|
||||
// TODO: The spec advises servers may cache remote room aliases temporarily.
|
||||
// We might want to look at doing that.
|
||||
return self.remote_resolve(room_alias).await;
|
||||
if !server_is_ours && !servers_contains_ours() {
|
||||
return self
|
||||
.remote_resolve(room_alias, servers.unwrap_or_default())
|
||||
.await;
|
||||
}
|
||||
|
||||
let room_id = match self.resolve_local_alias(room_alias).await {
|
||||
@@ -172,18 +163,10 @@ pub async fn resolve_alias(
|
||||
| Err(_) => self.resolve_appservice_alias(room_alias).await?,
|
||||
};
|
||||
|
||||
if let Some(room_id) = room_id {
|
||||
let servers: Vec<OwnedServerName> = self
|
||||
.services
|
||||
.state_cache
|
||||
.room_servers(&room_id)
|
||||
.map(ToOwned::to_owned)
|
||||
.collect()
|
||||
.await;
|
||||
return Ok((room_id, servers));
|
||||
}
|
||||
|
||||
Err!(Request(NotFound("Alias does not exist.")))
|
||||
room_id.map_or_else(
|
||||
|| Err!(Request(NotFound("Room with alias not found."))),
|
||||
|room_id| Ok((room_id, Vec::new())),
|
||||
)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
@@ -223,12 +206,12 @@ async fn user_can_remove_alias(&self, alias: &RoomAliasId, user_id: &UserId) ->
|
||||
|
||||
// The creator of an alias can remove it
|
||||
if self
|
||||
.who_created_alias(alias).await
|
||||
.is_ok_and(|user| user == user_id)
|
||||
// Server admins can remove any local alias
|
||||
|| self.services.admin.user_is_admin(user_id).await
|
||||
// Always allow the server service account to remove the alias, since there may not be an admin room
|
||||
|| server_user == user_id
|
||||
.who_created_alias(alias).await
|
||||
.is_ok_and(|user| user == user_id)
|
||||
// Server admins can remove any local alias
|
||||
|| self.services.admin.user_is_admin(user_id).await
|
||||
// Always allow the server service account to remove the alias, since there may not be an admin room
|
||||
|| server_user == user_id
|
||||
{
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use conduwuit::{Result, debug, error, implement};
|
||||
use std::iter::once;
|
||||
|
||||
use conduwuit::{Result, debug, debug_error, err, implement};
|
||||
use federation::query::get_room_information::v1::Response;
|
||||
use ruma::{OwnedRoomId, OwnedServerName, RoomAliasId, ServerName, api::federation};
|
||||
|
||||
@@ -6,21 +8,40 @@
|
||||
pub(super) async fn remote_resolve(
|
||||
&self,
|
||||
room_alias: &RoomAliasId,
|
||||
servers: Vec<OwnedServerName>,
|
||||
) -> Result<(OwnedRoomId, Vec<OwnedServerName>)> {
|
||||
debug!("Asking {} to resolve {room_alias:?}", room_alias.server_name());
|
||||
match self
|
||||
.remote_request(room_alias, room_alias.server_name())
|
||||
.await
|
||||
{
|
||||
| Err(e) => {
|
||||
error!("Unable to resolve remote room alias {}: {e}", room_alias);
|
||||
Err(e)
|
||||
},
|
||||
| Ok(Response { room_id, servers }) => {
|
||||
debug!("Remote resolved {room_alias:?} to {room_id:?} with servers {servers:?}");
|
||||
Ok((room_id, servers))
|
||||
},
|
||||
debug!(?room_alias, servers = ?servers, "resolve");
|
||||
let servers = once(room_alias.server_name())
|
||||
.map(ToOwned::to_owned)
|
||||
.chain(servers.into_iter());
|
||||
|
||||
let mut resolved_servers = Vec::new();
|
||||
let mut resolved_room_id: Option<OwnedRoomId> = None;
|
||||
for server in servers {
|
||||
match self.remote_request(room_alias, &server).await {
|
||||
| Err(e) => debug_error!("Failed to query for {room_alias:?} from {server}: {e}"),
|
||||
| Ok(Response { room_id, servers }) => {
|
||||
debug!(
|
||||
"Server {server} answered with {room_id:?} for {room_alias:?} servers: \
|
||||
{servers:?}"
|
||||
);
|
||||
|
||||
resolved_room_id.get_or_insert(room_id);
|
||||
add_server(&mut resolved_servers, server);
|
||||
|
||||
if !servers.is_empty() {
|
||||
add_servers(&mut resolved_servers, servers);
|
||||
break;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
resolved_room_id
|
||||
.map(|room_id| (room_id, resolved_servers))
|
||||
.ok_or_else(|| {
|
||||
err!(Request(NotFound("No servers could assist in resolving the room alias")))
|
||||
})
|
||||
}
|
||||
|
||||
#[implement(super::Service)]
|
||||
@@ -38,3 +59,15 @@ async fn remote_request(
|
||||
.send_federation_request(server, request)
|
||||
.await
|
||||
}
|
||||
|
||||
fn add_servers(servers: &mut Vec<OwnedServerName>, new: Vec<OwnedServerName>) {
|
||||
for server in new {
|
||||
add_server(servers, server);
|
||||
}
|
||||
}
|
||||
|
||||
fn add_server(servers: &mut Vec<OwnedServerName>, server: OwnedServerName) {
|
||||
if !servers.contains(&server) {
|
||||
servers.push(server);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,7 +142,7 @@ async fn get_auth_chain_outer(
|
||||
|
||||
let chunk_cache: Vec<_> = chunk
|
||||
.into_iter()
|
||||
.try_stream::<conduwuit::Error>()
|
||||
.try_stream()
|
||||
.broad_and_then(|(shortid, event_id)| async move {
|
||||
if let Ok(cached) = self.get_cached_eventid_authchain(&[shortid]).await {
|
||||
return Ok(cached.to_vec());
|
||||
|
||||
@@ -63,9 +63,7 @@ pub(super) async fn fetch_state<Pdu>(
|
||||
},
|
||||
| hash_map::Entry::Occupied(_) => {
|
||||
return Err!(Database(
|
||||
"State event's type and state_key combination exists multiple times: {}, {}",
|
||||
pdu.kind(),
|
||||
state_key
|
||||
"State event's type and state_key combination exists multiple times.",
|
||||
));
|
||||
},
|
||||
}
|
||||
|
||||
@@ -162,9 +162,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
|
||||
},
|
||||
| hash_map::Entry::Occupied(_) => {
|
||||
return Err!(Request(InvalidParam(
|
||||
"Auth event's type and state_key combination exists multiple times: {}, {}",
|
||||
auth_event.kind,
|
||||
auth_event.state_key().unwrap_or("")
|
||||
"Auth event's type and state_key combination exists multiple times.",
|
||||
)));
|
||||
},
|
||||
}
|
||||
|
||||
@@ -139,12 +139,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
|
||||
})
|
||||
.boxed();
|
||||
|
||||
let mut federated_room = false;
|
||||
|
||||
while let Some(ref backfill_server) = servers.next().await {
|
||||
if !self.services.globals.server_is_ours(backfill_server) {
|
||||
federated_room = true;
|
||||
}
|
||||
info!("Asking {backfill_server} for backfill in {room_id}");
|
||||
let response = self
|
||||
.services
|
||||
@@ -173,9 +168,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
|
||||
}
|
||||
}
|
||||
|
||||
if federated_room {
|
||||
warn!("No servers could backfill, but backfill was needed in room {room_id}");
|
||||
}
|
||||
warn!("No servers could backfill, but backfill was needed in room {room_id}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -385,13 +385,11 @@ fn num_senders(args: &crate::Args<'_>) -> usize {
|
||||
const MIN_SENDERS: usize = 1;
|
||||
// Limit the number of senders to the number of workers threads or number of
|
||||
// cores, conservatively.
|
||||
let mut max_senders = args.server.metrics.num_workers();
|
||||
|
||||
// Work around some platforms not returning the number of cores.
|
||||
let num_cores = available_parallelism();
|
||||
if num_cores > 0 {
|
||||
max_senders = max_senders.min(num_cores);
|
||||
}
|
||||
let max_senders = args
|
||||
.server
|
||||
.metrics
|
||||
.num_workers()
|
||||
.min(available_parallelism());
|
||||
|
||||
// If the user doesn't override the default 0, this is intended to then default
|
||||
// to 1 for now as multiple senders is experimental.
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
|
||||
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
|
||||
use conduwuit_core::{
|
||||
Error, Event, Result, at, debug, err, error,
|
||||
Error, Event, Result, debug, err, error,
|
||||
result::LogErr,
|
||||
trace,
|
||||
utils::{
|
||||
@@ -175,7 +175,7 @@ async fn handle_response_ok<'a>(
|
||||
if !new_events.is_empty() {
|
||||
self.db.mark_as_active(new_events.iter());
|
||||
|
||||
let new_events_vec = new_events.into_iter().map(at!(1)).collect();
|
||||
let new_events_vec = new_events.into_iter().map(|(_, event)| event).collect();
|
||||
futures.push(self.send_events(dest.clone(), new_events_vec));
|
||||
} else {
|
||||
statuses.remove(dest);
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
media, moderation, presence, pusher, registration_tokens, resolver, rooms, sending,
|
||||
server_keys,
|
||||
service::{self, Args, Map, Service},
|
||||
sync, transactions, uiaa, users,
|
||||
sync, transaction_ids, uiaa, users,
|
||||
};
|
||||
|
||||
pub struct Services {
|
||||
@@ -37,7 +37,7 @@ pub struct Services {
|
||||
pub sending: Arc<sending::Service>,
|
||||
pub server_keys: Arc<server_keys::Service>,
|
||||
pub sync: Arc<sync::Service>,
|
||||
pub transactions: Arc<transactions::Service>,
|
||||
pub transaction_ids: Arc<transaction_ids::Service>,
|
||||
pub uiaa: Arc<uiaa::Service>,
|
||||
pub users: Arc<users::Service>,
|
||||
pub moderation: Arc<moderation::Service>,
|
||||
@@ -110,7 +110,7 @@ macro_rules! build {
|
||||
sending: build!(sending::Service),
|
||||
server_keys: build!(server_keys::Service),
|
||||
sync: build!(sync::Service),
|
||||
transactions: build!(transactions::Service),
|
||||
transaction_ids: build!(transaction_ids::Service),
|
||||
uiaa: build!(uiaa::Service),
|
||||
users: build!(users::Service),
|
||||
moderation: build!(moderation::Service),
|
||||
@@ -139,7 +139,7 @@ pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
|
||||
|
||||
// reset dormant online/away statuses to offline, and set the server user as
|
||||
// online
|
||||
if self.server.config.allow_local_presence {
|
||||
if self.server.config.allow_local_presence && !self.db.is_read_only() {
|
||||
self.presence.unset_all_presence().await;
|
||||
_ = self
|
||||
.presence
|
||||
@@ -156,7 +156,7 @@ pub async fn stop(&self) {
|
||||
info!("Shutting down services...");
|
||||
|
||||
// set the server user as offline
|
||||
if self.server.config.allow_local_presence {
|
||||
if self.server.config.allow_local_presence && !self.db.is_read_only() {
|
||||
_ = self
|
||||
.presence
|
||||
.ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Offline)
|
||||
|
||||
54
src/service/transaction_ids/mod.rs
Normal file
54
src/service/transaction_ids/mod.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit::{Result, implement};
|
||||
use database::{Handle, Map};
|
||||
use ruma::{DeviceId, TransactionId, UserId};
|
||||
|
||||
pub struct Service {
|
||||
db: Data,
|
||||
}
|
||||
|
||||
struct Data {
|
||||
userdevicetxnid_response: Arc<Map>,
|
||||
}
|
||||
|
||||
impl crate::Service for Service {
|
||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
Ok(Arc::new(Self {
|
||||
db: Data {
|
||||
userdevicetxnid_response: args.db["userdevicetxnid_response"].clone(),
|
||||
},
|
||||
}))
|
||||
}
|
||||
|
||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn add_txnid(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_id: Option<&DeviceId>,
|
||||
txn_id: &TransactionId,
|
||||
data: &[u8],
|
||||
) {
|
||||
let mut key = user_id.as_bytes().to_vec();
|
||||
key.push(0xFF);
|
||||
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
|
||||
key.push(0xFF);
|
||||
key.extend_from_slice(txn_id.as_bytes());
|
||||
|
||||
self.db.userdevicetxnid_response.insert(&key, data);
|
||||
}
|
||||
|
||||
// If there's no entry, this is a new transaction
|
||||
#[implement(Service)]
|
||||
pub async fn existing_txnid(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_id: Option<&DeviceId>,
|
||||
txn_id: &TransactionId,
|
||||
) -> Result<Handle<'_>> {
|
||||
let key = (user_id, device_id, txn_id);
|
||||
self.db.userdevicetxnid_response.qry(&key).await
|
||||
}
|
||||
@@ -1,326 +0,0 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use conduwuit::{Error, Result, SyncRwLock, debug_warn, warn};
|
||||
use database::{Handle, Map};
|
||||
use ruma::{
|
||||
DeviceId, OwnedServerName, OwnedTransactionId, TransactionId, UserId,
|
||||
api::{
|
||||
client::error::ErrorKind::LimitExceeded,
|
||||
federation::transactions::send_transaction_message,
|
||||
},
|
||||
};
|
||||
use tokio::sync::watch::{Receiver, Sender};
|
||||
|
||||
use crate::{Dep, config};
|
||||
|
||||
pub type TxnKey = (OwnedServerName, OwnedTransactionId);
|
||||
pub type WrappedTransactionResponse =
|
||||
Option<Result<send_transaction_message::v1::Response, TransactionError>>;
|
||||
|
||||
/// Errors that can occur during federation transaction processing.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum TransactionError {
|
||||
/// Server is shutting down - the sender should retry the entire
|
||||
/// transaction.
|
||||
ShuttingDown,
|
||||
}
|
||||
|
||||
impl fmt::Display for TransactionError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
| Self::ShuttingDown => write!(f, "Server is shutting down"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for TransactionError {}
|
||||
|
||||
/// Minimum interval between cache cleanup runs.
|
||||
/// Exists to prevent thrashing when the cache is full of things that can't be
|
||||
/// cleared
|
||||
const CLEANUP_INTERVAL_SECS: u64 = 30;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct CachedTxnResponse {
|
||||
pub response: send_transaction_message::v1::Response,
|
||||
pub created: SystemTime,
|
||||
}
|
||||
|
||||
/// Internal state for a federation transaction.
|
||||
/// Either actively being processed or completed and cached.
|
||||
#[derive(Clone)]
|
||||
enum TxnState {
|
||||
/// Transaction is currently being processed.
|
||||
Active(Receiver<WrappedTransactionResponse>),
|
||||
|
||||
/// Transaction completed and response is cached.
|
||||
Cached(CachedTxnResponse),
|
||||
}
|
||||
|
||||
/// Result of atomically checking or starting a federation transaction.
|
||||
pub enum FederationTxnState {
|
||||
/// Transaction already completed and cached
|
||||
Cached(send_transaction_message::v1::Response),
|
||||
|
||||
/// Transaction is currently being processed by another request.
|
||||
/// Wait on this receiver for the result.
|
||||
Active(Receiver<WrappedTransactionResponse>),
|
||||
|
||||
/// This caller should process the transaction (first to request it).
|
||||
Started {
|
||||
receiver: Receiver<WrappedTransactionResponse>,
|
||||
sender: Sender<WrappedTransactionResponse>,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct Service {
|
||||
services: Services,
|
||||
db: Data,
|
||||
federation_txn_state: Arc<SyncRwLock<HashMap<TxnKey, TxnState>>>,
|
||||
last_cleanup: AtomicU64,
|
||||
}
|
||||
|
||||
struct Services {
|
||||
config: Dep<config::Service>,
|
||||
}
|
||||
|
||||
struct Data {
|
||||
userdevicetxnid_response: Arc<Map>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl crate::Service for Service {
|
||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
Ok(Arc::new(Self {
|
||||
services: Services {
|
||||
config: args.depend::<config::Service>("config"),
|
||||
},
|
||||
db: Data {
|
||||
userdevicetxnid_response: args.db["userdevicetxnid_response"].clone(),
|
||||
},
|
||||
federation_txn_state: Arc::new(SyncRwLock::new(HashMap::new())),
|
||||
last_cleanup: AtomicU64::new(0),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn clear_cache(&self) {
|
||||
let mut state = self.federation_txn_state.write();
|
||||
// Only clear cached entries, preserve active transactions
|
||||
state.retain(|_, v| matches!(v, TxnState::Active(_)));
|
||||
}
|
||||
|
||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||
}
|
||||
|
||||
impl Service {
|
||||
/// Returns the count of currently active (in-progress) transactions.
|
||||
#[must_use]
|
||||
pub fn txn_active_handle_count(&self) -> usize {
|
||||
let state = self.federation_txn_state.read();
|
||||
state
|
||||
.values()
|
||||
.filter(|v| matches!(v, TxnState::Active(_)))
|
||||
.count()
|
||||
}
|
||||
|
||||
pub fn add_client_txnid(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_id: Option<&DeviceId>,
|
||||
txn_id: &TransactionId,
|
||||
data: &[u8],
|
||||
) {
|
||||
let mut key = user_id.as_bytes().to_vec();
|
||||
key.push(0xFF);
|
||||
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
|
||||
key.push(0xFF);
|
||||
key.extend_from_slice(txn_id.as_bytes());
|
||||
|
||||
self.db.userdevicetxnid_response.insert(&key, data);
|
||||
}
|
||||
|
||||
pub async fn get_client_txn(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_id: Option<&DeviceId>,
|
||||
txn_id: &TransactionId,
|
||||
) -> Result<Handle<'_>> {
|
||||
let key = (user_id, device_id, txn_id);
|
||||
self.db.userdevicetxnid_response.qry(&key).await
|
||||
}
|
||||
|
||||
/// Atomically gets a cached response, joins an active transaction, or
|
||||
/// starts a new one.
|
||||
pub fn get_or_start_federation_txn(&self, key: TxnKey) -> Result<FederationTxnState> {
|
||||
// Only one upgradable lock can be held at a time, and there aren't any
|
||||
// read-only locks, so no point being upgradable
|
||||
let mut state = self.federation_txn_state.write();
|
||||
|
||||
// Check existing state for this key
|
||||
if let Some(txn_state) = state.get(&key) {
|
||||
return Ok(match txn_state {
|
||||
| TxnState::Cached(cached) => FederationTxnState::Cached(cached.response.clone()),
|
||||
| TxnState::Active(receiver) => FederationTxnState::Active(receiver.clone()),
|
||||
});
|
||||
}
|
||||
|
||||
// Check if another transaction from this origin is already running
|
||||
let has_active_from_origin = state
|
||||
.iter()
|
||||
.any(|(k, v)| k.0 == key.0 && matches!(v, TxnState::Active(_)));
|
||||
|
||||
if has_active_from_origin {
|
||||
debug_warn!(
|
||||
origin = ?key.0,
|
||||
"Got concurrent transaction request from an origin with an active transaction"
|
||||
);
|
||||
return Err(Error::BadRequest(
|
||||
LimitExceeded { retry_after: None },
|
||||
"Still processing another transaction from this origin",
|
||||
));
|
||||
}
|
||||
|
||||
let max_active_txns = self.services.config.max_concurrent_inbound_transactions;
|
||||
|
||||
// Check if we're at capacity
|
||||
if state.len() >= max_active_txns
|
||||
&& let active_count = state
|
||||
.values()
|
||||
.filter(|v| matches!(v, TxnState::Active(_)))
|
||||
.count() && active_count >= max_active_txns
|
||||
{
|
||||
warn!(
|
||||
active = active_count,
|
||||
max = max_active_txns,
|
||||
"Server is overloaded, dropping incoming transaction"
|
||||
);
|
||||
return Err(Error::BadRequest(
|
||||
LimitExceeded { retry_after: None },
|
||||
"Server is overloaded, try again later",
|
||||
));
|
||||
}
|
||||
|
||||
// Start new transaction
|
||||
let (sender, receiver) = tokio::sync::watch::channel(None);
|
||||
state.insert(key, TxnState::Active(receiver.clone()));
|
||||
|
||||
Ok(FederationTxnState::Started { receiver, sender })
|
||||
}
|
||||
|
||||
/// Finishes a transaction by transitioning it from active to cached state.
|
||||
/// Additionally may trigger cleanup of old entries.
|
||||
pub fn finish_federation_txn(
|
||||
&self,
|
||||
key: TxnKey,
|
||||
sender: Sender<WrappedTransactionResponse>,
|
||||
response: send_transaction_message::v1::Response,
|
||||
) {
|
||||
// Check if cleanup might be needed before acquiring the lock
|
||||
let should_try_cleanup = self.should_try_cleanup();
|
||||
|
||||
let mut state = self.federation_txn_state.write();
|
||||
|
||||
// Explicitly set cached first so there is no gap where receivers get a closed
|
||||
// channel
|
||||
state.insert(
|
||||
key,
|
||||
TxnState::Cached(CachedTxnResponse {
|
||||
response: response.clone(),
|
||||
created: SystemTime::now(),
|
||||
}),
|
||||
);
|
||||
|
||||
if let Err(e) = sender.send(Some(Ok(response))) {
|
||||
debug_warn!("Failed to send transaction response to waiting receivers: {e}");
|
||||
}
|
||||
|
||||
// Explicitly close
|
||||
drop(sender);
|
||||
|
||||
// This task is dangling, we can try clean caches now
|
||||
if should_try_cleanup {
|
||||
self.cleanup_entries_locked(&mut state);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_federation_txn(&self, key: &TxnKey) {
|
||||
let mut state = self.federation_txn_state.write();
|
||||
state.remove(key);
|
||||
}
|
||||
|
||||
/// Checks if enough time has passed since the last cleanup to consider
|
||||
/// running another. Updates the last cleanup time if returning true.
|
||||
fn should_try_cleanup(&self) -> bool {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("SystemTime before UNIX_EPOCH")
|
||||
.as_secs();
|
||||
let last = self.last_cleanup.load(Ordering::Relaxed);
|
||||
|
||||
if now.saturating_sub(last) >= CLEANUP_INTERVAL_SECS {
|
||||
// CAS: only update if no one else has updated it since we read
|
||||
self.last_cleanup
|
||||
.compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Cleans up cached entries based on age and count limits.
|
||||
///
|
||||
/// First removes all cached entries older than the configured max age.
|
||||
/// Then, if the cache still exceeds the max entry count, removes the oldest
|
||||
/// cached entries until the count is within limits.
|
||||
///
|
||||
/// Must be called with write lock held on the state map.
|
||||
fn cleanup_entries_locked(&self, state: &mut HashMap<TxnKey, TxnState>) {
|
||||
let max_age_secs = self.services.config.transaction_id_cache_max_age_secs;
|
||||
let max_entries = self.services.config.transaction_id_cache_max_entries;
|
||||
|
||||
// First pass: remove all cached entries older than max age
|
||||
let cutoff = SystemTime::now()
|
||||
.checked_sub(Duration::from_secs(max_age_secs))
|
||||
.unwrap_or(SystemTime::UNIX_EPOCH);
|
||||
|
||||
state.retain(|_, v| match v {
|
||||
| TxnState::Active(_) => true, // Never remove active transactions
|
||||
| TxnState::Cached(cached) => cached.created > cutoff,
|
||||
});
|
||||
|
||||
// Count cached entries
|
||||
let cached_count = state
|
||||
.values()
|
||||
.filter(|v| matches!(v, TxnState::Cached(_)))
|
||||
.count();
|
||||
|
||||
// Second pass: if still over max entries, remove oldest cached entries
|
||||
if cached_count > max_entries {
|
||||
let excess = cached_count.saturating_sub(max_entries);
|
||||
|
||||
// Collect cached entries sorted by age (oldest first)
|
||||
let mut cached_entries: Vec<_> = state
|
||||
.iter()
|
||||
.filter_map(|(k, v)| match v {
|
||||
| TxnState::Cached(cached) => Some((k.clone(), cached.created)),
|
||||
| TxnState::Active(_) => None,
|
||||
})
|
||||
.collect();
|
||||
cached_entries.sort_by(|a, b| a.1.cmp(&b.1));
|
||||
|
||||
// Remove the oldest cached entries to get under the limit
|
||||
for (key, _) in cached_entries.into_iter().take(excess) {
|
||||
state.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,149 +0,0 @@
|
||||
use conduwuit::{Err, Result, implement, trace};
|
||||
use conduwuit_database::{Deserialized, Json};
|
||||
use ruma::{
|
||||
DeviceId, OwnedDeviceId, UserId,
|
||||
api::client::dehydrated_device::{
|
||||
DehydratedDeviceData, put_dehydrated_device::unstable::Request,
|
||||
},
|
||||
serde::Raw,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct DehydratedDevice {
|
||||
/// Unique ID of the device.
|
||||
pub device_id: OwnedDeviceId,
|
||||
|
||||
/// Contains serialized and encrypted private data.
|
||||
pub device_data: Raw<DehydratedDeviceData>,
|
||||
}
|
||||
|
||||
/// Creates or recreates the user's dehydrated device.
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(
|
||||
%user_id,
|
||||
device_id = %request.device_id,
|
||||
display_name = ?request.initial_device_display_name,
|
||||
)
|
||||
)]
|
||||
pub async fn set_dehydrated_device(&self, user_id: &UserId, request: Request) -> Result {
|
||||
assert!(
|
||||
self.exists(user_id).await,
|
||||
"Tried to create dehydrated device for non-existent user"
|
||||
);
|
||||
|
||||
let existing_id = self.get_dehydrated_device_id(user_id).await;
|
||||
|
||||
if existing_id.is_err()
|
||||
&& self
|
||||
.get_device_metadata(user_id, &request.device_id)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
return Err!("A hydrated device already exists with that ID.");
|
||||
}
|
||||
|
||||
if let Ok(existing_id) = existing_id {
|
||||
self.remove_device(user_id, &existing_id).await;
|
||||
}
|
||||
|
||||
self.create_device(
|
||||
user_id,
|
||||
&request.device_id,
|
||||
"",
|
||||
request.initial_device_display_name.clone(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
trace!(device_data = ?request.device_data);
|
||||
self.db.userid_dehydrateddevice.raw_put(
|
||||
user_id,
|
||||
Json(&DehydratedDevice {
|
||||
device_id: request.device_id.clone(),
|
||||
device_data: request.device_data,
|
||||
}),
|
||||
);
|
||||
|
||||
trace!(device_keys = ?request.device_keys);
|
||||
self.add_device_keys(user_id, &request.device_id, &request.device_keys)
|
||||
.await;
|
||||
|
||||
trace!(one_time_keys = ?request.one_time_keys);
|
||||
for (one_time_key_key, one_time_key_value) in &request.one_time_keys {
|
||||
self.add_one_time_key(user_id, &request.device_id, one_time_key_key, one_time_key_value)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removes a user's dehydrated device.
|
||||
///
|
||||
/// Calling this directly will remove the dehydrated data but leak the frontage
|
||||
/// device. Thus this is called by the regular device interface such that the
|
||||
/// dehydrated data will not leak instead.
|
||||
///
|
||||
/// If device_id is given, the user's dehydrated device must match or this is a
|
||||
/// no-op, but an Err is still returned to indicate that. Otherwise returns the
|
||||
/// removed dehydrated device_id.
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(
|
||||
%user_id,
|
||||
device_id = ?maybe_device_id,
|
||||
)
|
||||
)]
|
||||
pub(super) async fn remove_dehydrated_device(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
maybe_device_id: Option<&DeviceId>,
|
||||
) -> Result<OwnedDeviceId> {
|
||||
let Ok(device_id) = self.get_dehydrated_device_id(user_id).await else {
|
||||
return Err!(Request(NotFound("No dehydrated device for this user.")));
|
||||
};
|
||||
|
||||
if let Some(maybe_device_id) = maybe_device_id {
|
||||
if maybe_device_id != device_id {
|
||||
return Err!(Request(NotFound("Not the user's dehydrated device.")));
|
||||
}
|
||||
}
|
||||
|
||||
self.db.userid_dehydrateddevice.remove(user_id);
|
||||
|
||||
Ok(device_id)
|
||||
}
|
||||
|
||||
/// Get the device_id of the user's dehydrated device.
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(%user_id)
|
||||
)]
|
||||
pub async fn get_dehydrated_device_id(&self, user_id: &UserId) -> Result<OwnedDeviceId> {
|
||||
self.get_dehydrated_device(user_id)
|
||||
.await
|
||||
.map(|device| device.device_id)
|
||||
}
|
||||
|
||||
/// Get the dehydrated device private data
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(%user_id),
|
||||
ret,
|
||||
)]
|
||||
pub async fn get_dehydrated_device(&self, user_id: &UserId) -> Result<DehydratedDevice> {
|
||||
self.db
|
||||
.userid_dehydrateddevice
|
||||
.get(user_id)
|
||||
.await
|
||||
.deserialized()
|
||||
}
|
||||
@@ -1,5 +1,3 @@
|
||||
pub(super) mod dehydrated_device;
|
||||
|
||||
#[cfg(feature = "ldap")]
|
||||
use std::collections::HashMap;
|
||||
use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc};
|
||||
@@ -7,7 +5,7 @@
|
||||
#[cfg(feature = "ldap")]
|
||||
use conduwuit::result::LogErr;
|
||||
use conduwuit::{
|
||||
Err, Error, Result, Server, debug_warn, err, is_equal_to, trace,
|
||||
Err, Error, Result, Server, at, debug_warn, err, is_equal_to, trace,
|
||||
utils::{self, ReadyExt, stream::TryIgnore, string::Unquoted},
|
||||
};
|
||||
#[cfg(feature = "ldap")]
|
||||
@@ -72,7 +70,6 @@ struct Data {
|
||||
userfilterid_filter: Arc<Map>,
|
||||
userid_avatarurl: Arc<Map>,
|
||||
userid_blurhash: Arc<Map>,
|
||||
userid_dehydrateddevice: Arc<Map>,
|
||||
userid_devicelistversion: Arc<Map>,
|
||||
userid_displayname: Arc<Map>,
|
||||
userid_lastonetimekeyupdate: Arc<Map>,
|
||||
@@ -113,7 +110,6 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
userfilterid_filter: args.db["userfilterid_filter"].clone(),
|
||||
userid_avatarurl: args.db["userid_avatarurl"].clone(),
|
||||
userid_blurhash: args.db["userid_blurhash"].clone(),
|
||||
userid_dehydrateddevice: args.db["userid_dehydrateddevice"].clone(),
|
||||
userid_devicelistversion: args.db["userid_devicelistversion"].clone(),
|
||||
userid_displayname: args.db["userid_displayname"].clone(),
|
||||
userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(),
|
||||
@@ -188,12 +184,6 @@ pub async fn create(
|
||||
password: Option<&str>,
|
||||
origin: Option<&str>,
|
||||
) -> Result<()> {
|
||||
if !self.services.globals.user_is_local(user_id)
|
||||
&& (password.is_some() || origin.is_some())
|
||||
{
|
||||
return Err!("Cannot create a nonlocal user with a set password or origin");
|
||||
}
|
||||
|
||||
self.db
|
||||
.userid_origin
|
||||
.insert(user_id, origin.unwrap_or("password"));
|
||||
@@ -484,11 +474,6 @@ pub async fn create_device(
|
||||
|
||||
/// Removes a device from a user.
|
||||
pub async fn remove_device(&self, user_id: &UserId, device_id: &DeviceId) {
|
||||
// Remove dehydrated device if this is the dehydrated device
|
||||
let _: Result<_> = self
|
||||
.remove_dehydrated_device(user_id, Some(device_id))
|
||||
.await;
|
||||
|
||||
let userdeviceid = (user_id, device_id);
|
||||
|
||||
// Remove tokens
|
||||
@@ -1012,7 +997,7 @@ pub fn get_to_device_events<'a>(
|
||||
device_id: &'a DeviceId,
|
||||
since: Option<u64>,
|
||||
to: Option<u64>,
|
||||
) -> impl Stream<Item = (u64, Raw<AnyToDeviceEvent>)> + Send + 'a {
|
||||
) -> impl Stream<Item = Raw<AnyToDeviceEvent>> + Send + 'a {
|
||||
type Key<'a> = (&'a UserId, &'a DeviceId, u64);
|
||||
|
||||
let from = (user_id, device_id, since.map_or(0, |since| since.saturating_add(1)));
|
||||
@@ -1026,7 +1011,7 @@ pub fn get_to_device_events<'a>(
|
||||
&& device_id == *device_id_
|
||||
&& to.is_none_or(|to| *count <= to)
|
||||
})
|
||||
.map(|((_, _, count), event)| (count, event))
|
||||
.map(at!(1))
|
||||
}
|
||||
|
||||
pub async fn remove_to_device_events<Until>(
|
||||
|
||||
Reference in New Issue
Block a user