Mirror of https://forgejo.ellis.link/continuwuation/continuwuity/ — synced 2026-05-13 20:23:07 +00:00
Compare commits
63 Commits
| SHA1 |
|---|
| d3a40c40ba |
| 7559b31f99 |
| 0cd8b8fc35 |
| b884fbd240 |
| a03490c81c |
| 5a4b8309a7 |
| d576620953 |
| 36cf5b053b |
| d95bf66e3d |
| ab277409bc |
| ec76147784 |
| 3f3ca53fdd |
| 70a768a711 |
| 39a882c4a1 |
| f091d3a732 |
| ebf9a08cd1 |
| 4fef0a7ff2 |
| 2f37b446bc |
| 6185841b6a |
| 3e0d4b066e |
| 0d2eeed567 |
| b296720540 |
| d600aed8db |
| 9724953b5e |
| 1605176956 |
| 2b0aedf5fd |
| c78c431703 |
| 49b48b857d |
| bf1e42b225 |
| ec76a234db |
| 091514e9f9 |
| 789ad499f7 |
| 1e6eaa4337 |
| de97900b07 |
| cb68a3d0ae |
| d3852abe51 |
| 15845b1c55 |
| f7d558baa6 |
| edd80b2600 |
| 03eab32c27 |
| 636de8a708 |
| e212c91ebf |
| 83f3314f08 |
| 8c2cf67783 |
| 7436e2f4e1 |
| 9ba406761b |
| 97f49d6357 |
| 1a49bc6f87 |
| 833216256b |
| 5fa3087401 |
| e95c0bd53f |
| 52d1ed24a9 |
| 4c1638e495 |
| 3f69cf8ed7 |
| 560a615c29 |
| 2e19310a87 |
| 81c5c6b2bc |
| 73d8462ace |
| 8b5fda1fb5 |
| 6f9b4a989e |
| fe0d83d447 |
| 37dccdbeb0 |
| 1060adc670 |
@@ -71,7 +71,7 @@ runs:

     - name: Install timelord-cli and git-warp-time
       if: steps.check-binaries.outputs.need-install == 'true'
-      uses: https://github.com/taiki-e/install-action@787505cde8a44ea468a00478fe52baf23b15bccd # v2
+      uses: https://github.com/taiki-e/install-action@b5fddbb5361bce8a06fb168c9d403a6cc552b084 # v2
      with:
        tool: git-warp-time,timelord-cli@3.0.1
@@ -45,7 +45,6 @@
 - [ ] I have [tested my contribution][c1t] (or proof-read it for documentation-only changes)
   myself, if applicable. This includes ensuring code compiles.
 - [ ] My commit messages follow the [commit message format][c1cm] and are descriptive.
-- [ ] I have written a [news fragment][n1] for this PR, if applicable<!--(can be done after hitting open!)-->.

 <!--
 Notes on these requirements:
@@ -79,4 +78,3 @@
 [c1pc]: https://forgejo.ellis.link/continuwuation/continuwuity/src/branch/main/CONTRIBUTING.md#pre-commit-checks
 [c1t]: https://forgejo.ellis.link/continuwuation/continuwuity/src/branch/main/CONTRIBUTING.md#running-tests-locally
 [c1cm]: https://forgejo.ellis.link/continuwuation/continuwuity/src/branch/main/CONTRIBUTING.md#commit-messages
-[n1]: https://towncrier.readthedocs.io/en/stable/tutorial.html#creating-news-fragments
@@ -216,7 +216,7 @@ jobs:
           path: binaries
           merge-multiple: true
       - name: Create Release and Upload
-        uses: https://github.com/softprops/action-gh-release@b4309332981a82ec1c5618f44dd2e27cc8bfbfda # v3
+        uses: https://github.com/softprops/action-gh-release@v2
        with:
          draft: true
          files: binaries/*
Generated file · +209 −343 · File diff suppressed because it is too large

+9 −11
@@ -12,7 +12,7 @@ license = "Apache-2.0"
 # See also `rust-toolchain.toml`
 readme = "README.md"
 repository = "https://forgejo.ellis.link/continuwuation/continuwuity"
-version = "0.5.8"
+version = "0.5.9"

 [workspace.metadata.crane]
 name = "conduwuit"
@@ -39,7 +39,10 @@ features = ["ffi", "std", "union"]
 version = "1.1.0"

 [workspace.dependencies.ctor]
-version = "0.10.0"
+version = "0.13.0"
+
+[workspace.dependencies.dtor]
+version = "0.13.0"

 [workspace.dependencies.cargo_toml]
 version = "0.22"
@@ -430,7 +433,7 @@ features = ["http", "grpc-tonic", "trace", "logs", "metrics"]

 # optional sentry metrics for crash/panic reporting
 [workspace.dependencies.sentry]
-version = "0.47.0"
+version = "0.48.0"
 default-features = false
 features = [
 	"backtrace",
@@ -445,9 +448,9 @@ features = [
 ]

 [workspace.dependencies.sentry-tracing]
-version = "0.47.0"
+version = "0.48.0"
 [workspace.dependencies.sentry-tower]
-version = "0.47.0"
+version = "0.48.0"

 # jemalloc usage
 [workspace.dependencies.tikv-jemalloc-sys]
@@ -546,16 +549,11 @@ features = ["std"]
 [workspace.dependencies.maplit]
 version = "1.0.2"

-[workspace.dependencies.ldap3]
-version = "0.12.0"
-default-features = false
-features = ["sync", "tls-rustls", "rustls-provider"]
-
 [workspace.dependencies.yansi]
 version = "1.0.1"

 [workspace.dependencies.askama]
-version = "0.15.0"
+version = "0.16.0"

 [workspace.dependencies.lettre]
 version = "0.11.19"
@@ -0,0 +1 @@
+Removed support for guest user registration, a little-used and deprecated approach to room previews.
@@ -0,0 +1 @@
+Add performance tuning documentation. Contributed by @stratself.
@@ -0,0 +1 @@
+Removed support for LDAP.
@@ -0,0 +1 @@
+Clarified in the config that `max_request_size` affects federated media as well.
@@ -0,0 +1 @@
+Added support for fallback encryption keys.
@@ -0,0 +1 @@
+Add `!admin users reject-all-invites` to clean up invite spam.
@@ -0,0 +1,9 @@
+Implemented event rejection, which should resolve and prevent future netsplits of the kinds observed
+within some Continuwuity rooms.
+Also resolved several bugs related to both soft-failing events and event backfilling, which should
+improve state resolution stability.
+The `!admin debug get-pdu` command was updated to disambiguate event acceptance status, and
+`!admin debug show-auth-chain` was added to visually display event auth chains, which may assist
+developers in debugging strangely complex events.
+
+Contributed by @nex.
@@ -7,7 +7,6 @@
 [global]
 address = "0.0.0.0"
 allow_device_name_federation = true
-allow_guest_registration = true
 allow_public_room_directory_over_federation = true
 allow_registration = true
 database_path = "/database"
@@ -32,7 +31,6 @@ rocksdb_log_level = "info"
 rocksdb_max_log_files = 1
 rocksdb_recovery_mode = 0
 rocksdb_paranoid_file_checks = true
-log_guest_registrations = false
 allow_legacy_media = true
 startup_netburst = true
 startup_netburst_keep = -1
+1 −111
@@ -291,6 +291,7 @@
 #ip_lookup_strategy = 5

 # Max request size for file uploads in bytes. Defaults to 20MB.
+# Also limits incoming federated media.
 #
 #max_request_size = 20971520

@@ -1270,21 +1271,6 @@
 #
 #brotli_compression = false

-# Set to true to allow user type "guest" registrations. Some clients like
-# Element attempt to register guest users automatically.
-#
-#allow_guest_registration = false
-
-# Set to true to log guest registrations in the admin room. Note that
-# these may be noisy or unnecessary if you're a public homeserver.
-#
-#log_guest_registrations = false
-
-# Set to true to allow guest registrations/users to auto join any rooms
-# specified in `auto_join_rooms`.
-#
-#allow_guests_auto_join_rooms = false
-
 # Enable the legacy unauthenticated Matrix media repository endpoints.
 # These endpoints consist of:
 # - /_matrix/media/*/config
@@ -1933,102 +1919,6 @@
 #
 #foci = []

-[global.ldap]
-
-# Whether to enable LDAP login.
-#
-# example: "true"
-#
-#enable = false
-
-# Whether to force LDAP authentication or authorize classical password
-# login.
-#
-# example: "true"
-#
-#ldap_only = false
-
-# URI of the LDAP server.
-#
-# example: "ldap://ldap.example.com:389"
-#
-#uri = ""
-
-# StartTLS for LDAP connections.
-#
-#use_starttls = false
-
-# Skip TLS certificate verification, possibly dangerous.
-#
-#disable_tls_verification = false
-
-# Root of the searches.
-#
-# example: "ou=users,dc=example,dc=org"
-#
-#base_dn = ""
-
-# Bind DN if anonymous search is not enabled.
-#
-# You can use the variable `{username}` that will be replaced by the
-# entered username. In such case, the password used to bind will be the
-# one provided for the login and not the one given by
-# `bind_password_file`. Beware: automatically granting admin rights will
-# not work if you use this direct bind instead of a LDAP search.
-#
-# example: "cn=ldap-reader,dc=example,dc=org" or
-# "cn={username},ou=users,dc=example,dc=org"
-#
-#bind_dn = ""
-
-# Path to a file on the system that contains the password for the
-# `bind_dn`.
-#
-# The server must be able to access the file, and it must not be empty.
-#
-#bind_password_file = ""
-
-# Search filter to limit user searches.
-#
-# You can use the variable `{username}` that will be replaced by the
-# entered username for more complex filters.
-#
-# example: "(&(objectClass=person)(memberOf=matrix))"
-#
-#filter = "(objectClass=*)"
-
-# Attribute to use to uniquely identify the user.
-#
-# example: "uid" or "cn"
-#
-#uid_attribute = "uid"
-
-# Attribute containing the display name of the user.
-#
-# example: "givenName" or "sn"
-#
-#name_attribute = "givenName"
-
-# Root of the searches for admin users.
-#
-# Defaults to `base_dn` if empty.
-#
-# example: "ou=admins,dc=example,dc=org"
-#
-#admin_base_dn = ""
-
-# The LDAP search filter to find administrative users for continuwuity.
-#
-# If left blank, administrative state must be configured manually for each
-# user.
-#
-# You can use the variable `{username}` that will be replaced by the
-# entered username for more complex filters.
-#
-# example: "(objectClass=conduwuitAdmin)" or "(uid={username})"
-#
-#admin_filter = ""
-
 #[global.antispam]

 #[global.antispam.meowlnir]
+1 −1
@@ -50,7 +50,7 @@ EOF

 # Developer tool versions
 # renovate: datasource=github-releases depName=cargo-bins/cargo-binstall
-ENV BINSTALL_VERSION=1.18.1
+ENV BINSTALL_VERSION=1.19.1
 # renovate: datasource=github-releases depName=psastras/sbom-rs
 ENV CARGO_SBOM_VERSION=0.9.1
 # renovate: datasource=crate depName=lddtree

@@ -18,7 +18,7 @@ RUN --mount=type=cache,target=/etc/apk/cache apk add \

 # Developer tool versions
 # renovate: datasource=github-releases depName=cargo-bins/cargo-binstall
-ENV BINSTALL_VERSION=1.18.1
+ENV BINSTALL_VERSION=1.19.1
 # renovate: datasource=github-releases depName=psastras/sbom-rs
 ENV CARGO_SBOM_VERSION=0.9.1
 # renovate: datasource=crate depName=lddtree
@@ -8,6 +8,11 @@
     "type": "file",
     "name": "dns",
     "label": "DNS tuning (recommended)"
+  },
+  {
+    "type": "file",
+    "name": "performance",
+    "label": "Performance tuning"
   }
 ]
@@ -156,9 +156,11 @@ ### Serving well-known files manually

 ## Troubleshooting

-Check with the [Matrix Connectivity Tester][federation-tester] to see that it's working.
 Check that other servers can connect to you.
+Here are some tools that can help identify federation issues:

-[federation-tester]: https://federationtester.mtrnord.blog/
+- [Matrix Connectivity Tester](https://federationtester.mtrnord.blog/)
+- [Matrix Federation Tester](https://federationtester.matrix.org/)

 ### Cannot log in with web clients
@@ -0,0 +1,135 @@
# Performance tuning

Continuwuity's default config suits many typical setups and scales appropriately with the size of your hardware. However, there are many scenarios where additional modifications can make better use of your server's resources.

This page outlines various performance tweaks for Continuwuity and their effects. These adjustments are especially helpful for homeservers that join many large federated rooms or have many users, and they will become increasingly necessary as the Matrix network expands. As always, your mileage may vary according to your setup's specifics. If you have suggestions or recommendations of your own, please share them in the community rooms.

## DNS tuning (recommended)

Please see the dedicated [DNS tuning guide](./dns.mdx).

## Cache capacities

If you have memory to spare, consider increasing the `cache_capacity_modifier` value to a larger number to allow more data to be stored in hot memory. This *significantly* speeds up many intensive operations (such as state resolution) and decreases CPU usage and disk I/O. Start with a baseline of `cache_capacity_modifier = 2.0` and tune upward until you are satisfied with RAM usage.

On the other hand, if your system doesn't have a lot of RAM, consider decreasing the cache capacity modifier to something smaller than `1.0` to avoid low-memory issues (at the cost of higher load on disk and CPU). This recommendation also applies if your system has abnormally little RAM compared to its number of CPU cores (for example, 2GB of RAM for 12 cores), as cache capacities scale with the number of available cores.
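As a sketch, the starting point suggested above looks like this in the config file (the exact value is a tuning knob, not a universal recommendation):

```toml
### in continuwuity.toml ###

# Scales the default cache capacities; raise it gradually
# while keeping an eye on RAM usage.
cache_capacity_modifier = 2.0
```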
## Disabling some features

You can disable outgoing **typing notifications** and **read markers** to reduce strain on the CPU and network when actively participating in rooms.

```toml
# disables sending read receipts
allow_outgoing_read_receipts = false
# disables sending typing notifications
allow_outgoing_typing = false
```

Outgoing presence updates are also considered very expensive and have been disabled by default (`allow_outgoing_presence = false`). For more savings, you may wish to disable _all_ processing of presence entirely.

```toml title=continuwuity.toml
# disabling presence updates entirely
allow_local_presence = false
allow_incoming_presence = false
allow_outgoing_presence = false
```
## Tuning database compression

:::warning
These steps SHOULD be done **before** starting Continuwuity for the first time. While switching database compression midway through is theoretically possible, this has not been tested extensively in the wild.
:::

### Changing the compression algorithm

For reduced CPU usage at a tradeoff of increased storage space, consider deploying Continuwuity with the faster and less intensive `lz4` algorithm instead of `zstd` for RocksDB, and disable WAL compression entirely:

```toml
### in continuwuity.toml ###
rocksdb_compression_algo = "lz4"
rocksdb_wal_compression = "none"
```

This tweak can be especially helpful if you have an older or less performant CPU (e.g. a Raspberry Pi) and disk space to spare.

### Increasing bottommost layer compression (`zstd` only)

The bottommost layer of the database usually contains old and read-only data, so it is a suitable place for further compression. In Continuwuity, this is possible by setting `rocksdb_bottommost_compression = true` and tuning `rocksdb_bottommost_compression_level` to a more compact level than the default one used in `rocksdb_compression_level`. This tweak comes at a cost of increased CPU usage, but may prevent your database from growing too large in the long run.

For those using `zstd` compression, the compression level ranges from 1 to 22. An example configuration could look like this:

```toml
### in continuwuity.toml ###
rocksdb_compression_algo = "zstd"
rocksdb_compression_level = 32767 # magic number, translates to level 3 on zstd
rocksdb_bottommost_compression = true
rocksdb_bottommost_compression_level = 9 # level 9 on zstd
```

For `lz4` users, the default level (`-1`) is already the most compact. You can only decrease it further to favor compression speed over ratio.

Consult these documents for more information on compression tuning and levels:

- [RocksDB compression documentation][rocksdb-compression]
- [RocksDB default compression levels][rocksdb-compression-defaults]
- [Zstd manual][zstd-manual]
- [Lz4 manual][lz4-manual]

[rocksdb-compression]: https://github.com/facebook/rocksdb/wiki/Compression
[rocksdb-compression-defaults]: https://github.com/facebook/rocksdb/blob/main/include/rocksdb/options.h#L208-L217
[zstd-manual]: https://facebook.github.io/zstd/zstd_manual.html
[lz4-manual]: https://github.com/lz4/lz4/blob/release/doc/lz4_manual.html
## Other tweaks

### Using UNIX sockets

If your homeserver and reverse proxy live on the same machine, you may wish to expose Continuwuity on a UNIX socket instead of a port. This removes TCP overhead between the two programs.

<details>

<summary>Example config with Caddy</summary>

```toml
### in continuwuity.toml ###

# `address` and `port` have to be commented out first
#address = ["127.0.0.1", "::1"]
#port = 8008
unix_socket_path = "/run/continuwuity/continuwuity.sock"
```

```
### in your Caddyfile ###
https://matrix.example.com {
    reverse_proxy unix//run/continuwuity/continuwuity.sock

    # alternatively, use the http2-plaintext protocol
    # reverse_proxy unix+h2c//run/continuwuity/continuwuity.sock
}
```

</details>
### Tuning your trusted servers

:::info Vet your trusted servers!
Trusted servers are your first point of contact when obtaining public keys from other servers, and they could theoretically impersonate other servers and cause significant harm to your deployment. Please thoroughly verify your trusted servers' credibility before adding them to your configuration.
:::

Trusted servers are queried sequentially in the order they are listed. If you have multiple trusted servers configured, put the faster ones first:

```toml
# Example config, using maintainers' recommended homeservers
trusted_servers = ["codestorm.net", "starstruck.systems", "unredacted.org", "matrix.org"]
```

Avoid prioritising `matrix.org` as your primary trusted server, as it tends to be quite slow.

Some users have also reported that increasing `trusted_server_batch_size` has helped with faster joins for huge rooms. Start by doubling the default to `2048` and adjust until you find a suitable value, as sketched below.
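A minimal sketch of that suggestion (the `1024` default is implied by the doubling advice above):

```toml
### in continuwuity.toml ###

# Double the default batch size; raise it further only if large joins
# are still slow.
trusted_server_batch_size = 2048
```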
### Enable HTTP/3 on your reverse proxy

Consider enabling the newer **HTTP/3** protocol for inbound connections to Continuwuity. In Caddy, HTTP/3 is enabled by default, but you must expose port 443/**udp** on your firewall.

HTTP/3 can vastly improve client-server connections, especially on unstable networks, as it reduces packet losses and latency from TCP head-of-line blocking, includes workarounds for network switching, and reduces connection establishment handshakes. Continuwuity also includes experimental _outbound_ HTTP/3 support in its Docker images, so connections between Continuwuity servers can benefit from this too.
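As a sketch for Caddy users: recent Caddy versions need no Caddyfile change for HTTP/3, but you can pin the enabled protocols explicitly with the standard `servers` global option (this is stock Caddy syntax, not a Continuwuity setting):

```
### in your Caddyfile ###
{
    servers {
        protocols h1 h2 h3
    }
}
```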
@@ -268,9 +268,13 @@ ## Starting Your Server

 ## How do I know it works?

-To check if your server can communicate with other homeservers, use the
-[Matrix Federation Tester](https://federationtester.mtrnord.blog/). If you can
-register your account but cannot join federated rooms, check your configuration
+To check if your server can communicate with other homeservers,
+use an external testing tool:
+
+- [Matrix Connectivity Tester](https://federationtester.mtrnord.blog/)
+- [Matrix Federation Tester](https://federationtester.matrix.org/)
+
+If you can register your account but cannot join federated rooms, check your configuration
 and verify that your federation endpoints are opened and forwarded correctly.

 As a quick health check, you can also use these cURL commands:
@@ -6,10 +6,10 @@
       "message": "Welcome to Continuwuity! Important announcements about the project will appear here."
     },
     {
-      "id": 12,
-      "mention_room": false,
-      "date": "2026-04-24",
-      "message": "[v0.5.8](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.8) is out! This is a patch release which fixes a bug in 0.5.7's email support -- upgrade soon if you use that feature."
+      "id": 13,
+      "mention_room": true,
+      "date": "2026-05-08",
+      "message": "[v0.5.9](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.9) has been released, fixing a few low-severity federation-related vulnerabilities. It is recommended you read the changelog and update as soon as possible. There are no new features or other changes in this release, only related bugfixes. Deployments tracking the main branch should also update to the latest commit."
     }
   ]
 }
@@ -7,7 +7,7 @@ ## Running commands

 * All commands listed here may be used by server administrators in the admin room by sending them as messages.
 * If the `admin_escape_commands` configuration option is enabled, server administrators may run certain commands in public rooms by prefixing them with a single backslash. These commands will only run on _their_ homeserver, even if they are a member of another homeserver's admin room. Some sensitive commands cannot be used outside the admin room and will return an error.
-* All commands listed here may be used in the server's console, if it is enabled. Commands entered in the console do not require the `!admin` prefix. If Continuwuity is deployed via Docker, be sure to set the appropriate options detailed in [the Docker deployment guide](../../deploying/docker.mdx#accessing-the-servers-console) to enable access to the server's console.
+* All commands listed here may be used in the server's console, if it is enabled. Commands entered in the console do not require the `!admin` prefix.

 ## Categories
Generated file · +15 −15
@@ -3,11 +3,11 @@
     "advisory-db": {
       "flake": false,
       "locked": {
-        "lastModified": 1775907537,
-        "narHash": "sha256-vbeLNgmsx1Z6TwnlDV0dKyeBCcon3UpkV9yLr/yc6HM=",
+        "lastModified": 1777645914,
+        "narHash": "sha256-P1T7QVQS13OvkXEuEhI91CLaQfyv6iqV9vW8IBLLDYg=",
         "owner": "rustsec",
         "repo": "advisory-db",
-        "rev": "d99f7b9eb81731bddebf80a355f8be7b2f8b1b28",
+        "rev": "d6ba1f7070ba91f45efe372d68eb648be67d0417",
         "type": "github"
       },
       "original": {
@@ -18,11 +18,11 @@
     },
     "crane": {
       "locked": {
-        "lastModified": 1775839657,
-        "narHash": "sha256-SPm9ck7jh3Un9nwPuMGbRU04UroFmOHjLP56T10MOeM=",
+        "lastModified": 1777335812,
+        "narHash": "sha256-bEg5xoAxAwsyfnGhkEX7RJViTIBIYPd8ISg4O1c0HFc=",
         "owner": "ipetkov",
         "repo": "crane",
-        "rev": "7cf72d978629469c4bd4206b95c402514c1f6000",
+        "rev": "5e0fb2f64edff2822249f21293b8304dedaaf676",
         "type": "github"
       },
       "original": {
@@ -39,11 +39,11 @@
         "rust-analyzer-src": "rust-analyzer-src"
       },
       "locked": {
-        "lastModified": 1775891769,
-        "narHash": "sha256-EOfVlTKw2n8w1uhfh46GS4hEGnQ7oWrIWQfIY6utIkI=",
+        "lastModified": 1777624102,
+        "narHash": "sha256-thSyElkje577x/kAbP72nHlfiFc1a+tCudskLPHXe9s=",
         "owner": "nix-community",
         "repo": "fenix",
-        "rev": "6fbc54dde15aee725bdc7aae5e478849685d5f56",
+        "rev": "4d81601e0b73f20d81d066754ad0e7d1e7f75a06",
         "type": "github"
       },
       "original": {
@@ -89,11 +89,11 @@
     },
     "nixpkgs": {
       "locked": {
-        "lastModified": 1775710090,
-        "narHash": "sha256-ar3rofg+awPB8QXDaFJhJ2jJhu+KqN/PRCXeyuXR76E=",
+        "lastModified": 1777268161,
+        "narHash": "sha256-bxrdOn8SCOv8tN4JbTF/TXq7kjo9ag4M+C8yzzIRYbE=",
         "owner": "NixOS",
         "repo": "nixpkgs",
-        "rev": "4c1018dae018162ec878d42fec712642d214fdfa",
+        "rev": "1c3fe55ad329cbcb28471bb30f05c9827f724c76",
         "type": "github"
       },
       "original": {
@@ -132,11 +132,11 @@
     "rust-analyzer-src": {
       "flake": false,
       "locked": {
-        "lastModified": 1775843361,
-        "narHash": "sha256-j53ZgyDvmYf3Sjh1IPvvTjqa614qUfVQSzj59+MpzkY=",
+        "lastModified": 1777583169,
+        "narHash": "sha256-dVJ4+wrRKc8oIgp3rLOFSq1obt/sCKlXy3h47qof/w0=",
         "owner": "rust-lang",
         "repo": "rust-analyzer",
-        "rev": "9eb97ea96d8400e8957ddd56702e962614296583",
+        "rev": "aa64e4828a2bbba44463c1229a81c748d3cce583",
         "type": "github"
       },
       "original": {
Generated file · +17 −17
@@ -1399,9 +1399,9 @@
       }
     },
     "node_modules/hookable": {
-      "version": "6.1.0",
-      "resolved": "https://registry.npmjs.org/hookable/-/hookable-6.1.0.tgz",
-      "integrity": "sha512-ZoKZSJgu8voGK2geJS+6YtYjvIzu9AOM/KZXsBxr83uhLL++e9pEv/dlgwgy3dvHg06kTz6JOh1hk3C8Ceiymw==",
+      "version": "6.1.1",
+      "resolved": "https://registry.npmjs.org/hookable/-/hookable-6.1.1.tgz",
+      "integrity": "sha512-U9LYDy1CwhMCnprUfeAZWZGByVbhd54hwepegYTK7Pi5NvqEj63ifz5z+xukznehT7i6NIZRu89Ay1AZmRsLEQ==",
       "dev": true,
       "license": "MIT"
     },
@@ -2683,20 +2683,20 @@
       "license": "MIT"
     },
     "node_modules/oniguruma-parser": {
-      "version": "0.12.1",
-      "resolved": "https://registry.npmjs.org/oniguruma-parser/-/oniguruma-parser-0.12.1.tgz",
-      "integrity": "sha512-8Unqkvk1RYc6yq2WBYRj4hdnsAxVze8i7iPfQr8e4uSP3tRv0rpZcbGUDvxfQQcdwHt/e9PrMvGCsa8OqG9X3w==",
+      "version": "0.12.2",
+      "resolved": "https://registry.npmjs.org/oniguruma-parser/-/oniguruma-parser-0.12.2.tgz",
+      "integrity": "sha512-6HVa5oIrgMC6aA6WF6XyyqbhRPJrKR02L20+2+zpDtO5QAzGHAUGw5TKQvwi5vctNnRHkJYmjAhRVQF2EKdTQw==",
       "dev": true,
       "license": "MIT"
     },
     "node_modules/oniguruma-to-es": {
-      "version": "4.3.5",
-      "resolved": "https://registry.npmjs.org/oniguruma-to-es/-/oniguruma-to-es-4.3.5.tgz",
-      "integrity": "sha512-Zjygswjpsewa0NLTsiizVuMQZbp0MDyM6lIt66OxsF21npUDlzpHi1Mgb/qhQdkb+dWFTzJmFbEWdvZgRho8eQ==",
+      "version": "4.3.6",
+      "resolved": "https://registry.npmjs.org/oniguruma-to-es/-/oniguruma-to-es-4.3.6.tgz",
+      "integrity": "sha512-csuQ9x3Yr0cEIs/Zgx/OEt9iBw9vqIunAPQkx19R/fiMq2oGVTgcMqO/V3Ybqefr1TBvosI6jU539ksaBULJyA==",
       "dev": true,
       "license": "MIT",
       "dependencies": {
-        "oniguruma-parser": "^0.12.1",
+        "oniguruma-parser": "^0.12.2",
         "regex": "^6.1.0",
         "regex-recursion": "^6.0.2"
       }
@@ -2822,9 +2822,9 @@
       }
     },
     "node_modules/react-router": {
-      "version": "7.14.0",
-      "resolved": "https://registry.npmjs.org/react-router/-/react-router-7.14.0.tgz",
-      "integrity": "sha512-m/xR9N4LQLmAS0ZhkY2nkPA1N7gQ5TUVa5n8TgANuDTARbn1gt+zLPXEm7W0XDTbrQ2AJSJKhoa6yx1D8BcpxQ==",
+      "version": "7.14.2",
+      "resolved": "https://registry.npmjs.org/react-router/-/react-router-7.14.2.tgz",
+      "integrity": "sha512-yCqNne6I8IB6rVCH7XUvlBK7/QKyqypBFGv+8dj4QBFJiiRX+FG7/nkdAvGElyvVZ/HQP5N19wzteuTARXi5Gw==",
       "dev": true,
       "license": "MIT",
       "dependencies": {
@@ -2845,13 +2845,13 @@
       }
     },
     "node_modules/react-router-dom": {
-      "version": "7.14.0",
-      "resolved": "https://registry.npmjs.org/react-router-dom/-/react-router-dom-7.14.0.tgz",
-      "integrity": "sha512-2G3ajSVSZMEtmTjIklRWlNvo8wICEpLihfD/0YMDxbWK2UyP5EGfnoIn9AIQGnF3G/FX0MRbHXdFcD+rL1ZreQ==",
+      "version": "7.14.2",
+      "resolved": "https://registry.npmjs.org/react-router-dom/-/react-router-dom-7.14.2.tgz",
+      "integrity": "sha512-YZcM5ES8jJSM+KrJ9BdvHHqlnGTg5tH3sC5ChFRj4inosKctdyzBDhOyyHdGk597q2OT6NTrCA1OvB/YDwfekQ==",
       "dev": true,
       "license": "MIT",
       "dependencies": {
-        "react-router": "7.14.0"
+        "react-router": "7.14.2"
       },
       "engines": {
         "node": ">=20.0.0"
@@ -81,6 +81,7 @@ conduwuit-macros.workspace = true
 conduwuit-service.workspace = true
 const-str.workspace = true
 ctor.workspace = true
+dtor.workspace = true
 futures.workspace = true
 lettre.workspace = true
 log.workspace = true
+213 −9
@@ -1,5 +1,5 @@
 use std::{
-	collections::HashMap,
+	collections::{HashMap, HashSet},
 	fmt::Write,
 	iter::once,
 	time::{Instant, SystemTime},
@@ -22,7 +22,7 @@
 use lettre::message::Mailbox;
 use ruma::{
 	CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
-	OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId,
+	OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, UInt,
 	api::federation::event::get_room_state, events::AnyStateEvent, serde::Raw,
 };
 use service::rooms::{
@@ -69,6 +69,189 @@ pub(super) async fn get_auth_chain(&self, event_id: OwnedEventId) -> Result {
 	self.write_str(&out).await
 }

+#[derive(Clone, Copy, Eq, PartialEq)]
+enum NodeStatus {
+	Normal(bool),
+	SoftFailed(bool),
+	Rejected(bool),
+}
+
+struct AuthChild {
+	node_id: String,
+	event_id: OwnedEventId,
+	depth: UInt,
+	ts: UInt,
+	first_seen: bool,
+	pdu: Option<PduEvent>,
+}
+
+fn render_node(
+	graph: &mut String,
+	node_id: &str,
+	event_id: &EventId,
+	status: NodeStatus,
+) -> Result {
+	let evt_str = event_id.to_string();
+
+	let status_label = match status {
+		| NodeStatus::Normal(false) => evt_str,
+		| NodeStatus::Normal(true) => format!("{evt_str} (missing locally)"),
+		| NodeStatus::SoftFailed(false) => format!("{evt_str} (soft-failed)"),
+		| NodeStatus::SoftFailed(true) => format!("{evt_str} (soft-failed & missing locally)"),
+		| NodeStatus::Rejected(false) => format!("{evt_str} (rejected)"),
+		| NodeStatus::Rejected(true) => format!("{evt_str} (rejected & missing locally)"),
+	};
+
+	writeln!(graph, "{node_id}[\"{}\"]", status_label.as_str())?;
+
+	match status {
+		| NodeStatus::Rejected(_) => writeln!(graph, "class {node_id} rejected;")?,
+		| NodeStatus::SoftFailed(_) => writeln!(graph, "class {node_id} soft_failed;")?,
+		| NodeStatus::Normal(_) => {},
+	}
+
+	Ok(())
+}
+
+#[admin_command]
+pub(super) async fn show_auth_chain(&self, event_id: OwnedEventId) -> Result {
+	let node_status = async |event_id: &EventId, missing: bool| -> NodeStatus {
+		if self
+			.services
+			.rooms
+			.pdu_metadata
+			.is_event_rejected(event_id)
+			.await
+		{
+			NodeStatus::Rejected(missing)
+		} else if self
+			.services
+			.rooms
+			.pdu_metadata
+			.is_event_soft_failed(event_id)
+			.await
+		{
+			NodeStatus::SoftFailed(missing)
+		} else {
+			NodeStatus::Normal(missing)
+		}
+	};
+
+	let Ok(root) = self.services.rooms.timeline.get_pdu(&event_id).await else {
+		return Err!("Event not found.");
+	};
+
+	let mut graph = String::from(
+		"```mermaid\n%% This is a mermaid graph. You can plug this output into\n\
+		 %% https://mermaid.live/edit to visualise it on-the-fly.\nflowchart TD\n\
+		 classDef rejected fill:#ffe5e5,stroke:#cc0000,stroke-width:2px,color:#000;\n\
+		 classDef soft_failed fill:#fff6cc,stroke:#c9a400,stroke-width:2px,color:#000;\n",
+	);
+
+	let mut node_ids: HashMap<OwnedEventId, String> = HashMap::new();
+	let mut cached_events: HashMap<OwnedEventId, PduEvent> =
+		HashMap::from([(event_id.clone(), root.clone())]);
+	let mut scheduled: HashSet<OwnedEventId> = HashSet::from([event_id.clone()]);
+	let mut visited: HashSet<OwnedEventId> = HashSet::new();
+	let mut stack = vec![root];
+	let mut next_node_id = 0_usize;
+
+	let node_id_for = |event_id: &OwnedEventId,
+	                   node_ids: &mut HashMap<OwnedEventId, String>,
+	                   next_node_id: &mut usize| {
+		node_ids
+			.entry(event_id.clone())
+			.or_insert_with(|| {
+				let id = format!("n{}", *next_node_id);
+				*next_node_id = next_node_id.saturating_add(1);
+				id
+			})
+			.clone()
+	};
+
+	while let Some(event) = stack.pop() {
+		let current_event_id = event.event_id().to_owned();
+		if !visited.insert(current_event_id.clone()) {
+			continue;
+		}
+
+		let current_node_id = node_id_for(&current_event_id, &mut node_ids, &mut next_node_id);
+		let current_status = node_status(&current_event_id, false).await;
+
+		render_node(&mut graph, &current_node_id, &current_event_id, current_status)?;
+
+		let mut children = Vec::with_capacity(event.auth_events.len());
+		for auth_event_id in event.auth_events().rev() {
+			let auth_event_id = auth_event_id.to_owned();
+			let auth_node_id = node_id_for(&auth_event_id, &mut node_ids, &mut next_node_id);
+			writeln!(graph, "{current_node_id} --> {auth_node_id}")?;
+
+			let first_seen = scheduled.insert(auth_event_id.clone());
+			let auth_pdu = if let Some(auth_pdu) = cached_events.get(&auth_event_id) {
+				// NOTE: events might be referenced multiple times (like the create event)
+				// so this saves some cheeky db lookup time
+				Some(auth_pdu.clone())
+			} else if first_seen {
+				match self.services.rooms.timeline.get_pdu(&auth_event_id).await {
+					| Ok(auth_event) => {
+						cached_events.insert(auth_event_id.clone(), auth_event.clone());
+						Some(auth_event)
+					},
+					| Err(_) => None,
+				}
+			} else {
+				None
+			};
+
+			// NOTE: Depth is used as the primary sorting key here, even though it has no
+			// bearing on state resolution or anything. Timestamp is used as a
+			// tiebreaker, falling back to lexicographical comparison.
+			let (depth, ts) = auth_pdu
+				.as_ref()
+				.map_or((UInt::MAX, UInt::MAX), |pdu| (pdu.depth, pdu.origin_server_ts));
+
+			children.push(AuthChild {
+				node_id: auth_node_id,
+				event_id: auth_event_id,
+				depth,
+				ts,
+				first_seen,
+				pdu: auth_pdu,
+			});
+		}
+
+		children.sort_by(|a, b| {
+			a.depth
+				.cmp(&b.depth)
+				.then(a.ts.cmp(&b.ts))
+				.then(a.event_id.as_str().cmp(b.event_id.as_str()))
+		});
+
+		for child in children.into_iter().rev() {
+			if !child.first_seen {
+				continue;
+			}
+
+			if let Some(child_pdu) = child.pdu {
+				// We have this PDU so will want to traverse it.
+				stack.push(child_pdu);
+			} else {
+				// We don't have this PDU locally so we can't traverse its auth events,
+				// but we can still render it as a node.
+				render_node(
+					&mut graph,
+					&child.node_id,
+					&child.event_id,
+					node_status(&child.event_id, true).await,
+				)?;
+			}
+		}
+	}
+
+	graph.push_str("```\n");
+	self.write_str(&graph).await
+}
+
 #[admin_command]
 pub(super) async fn parse_pdu(&self) -> Result {
 	if self.body.len() < 2
@@ -111,15 +294,31 @@ pub(super) async fn get_pdu(&self, event_id: OwnedEventId) -> Result {
 		outlier = true;
 		pdu_json = self.services.rooms.timeline.get_pdu_json(&event_id).await;
 	}
+	let rejected = self
+		.services
+		.rooms
+		.pdu_metadata
+		.is_event_rejected(&event_id)
+		.await;
+	let soft_failed = self
+		.services
+		.rooms
+		.pdu_metadata
+		.is_event_soft_failed(&event_id)
+		.await;

 	match pdu_json {
 		| Err(_) => return Err!("PDU not found locally."),
 		| Ok(json) => {
 			let text = serde_json::to_string_pretty(&json)?;
-			let msg = if outlier {
-				"Outlier (Rejected / Soft Failed) PDU found in our database"
+			let msg = if rejected {
+				"Rejected PDU:"
+			} else if soft_failed {
+				"Soft-failed PDU:"
+			} else if outlier {
+				"Outlier PDU:"
 			} else {
-				"PDU found in our database"
+				"PDU:"
 			};
 			write!(self, "{msg}\n```json\n{text}\n```")
 		},
@@ -614,6 +813,10 @@ pub(super) async fn force_set_room_state_from_server(
 			.await;

 		state.insert(shortstatekey, pdu.event_id.clone());
+		self.services
+			.rooms
+			.pdu_metadata
+			.clear_pdu_markers(pdu.event_id());
 	}
 }
@@ -631,6 +834,10 @@ pub(super) async fn force_set_room_state_from_server(
 			.rooms
 			.outlier
 			.add_pdu_outlier(&event_id, &value);
+		self.services
+			.rooms
+			.pdu_metadata
+			.clear_pdu_markers(&event_id);
 	}

 	info!("Resolving new room state");
@@ -662,10 +869,7 @@ pub(super) async fn force_set_room_state_from_server(
 		.force_state(room_id.clone().as_ref(), short_state_hash, added, removed, &state_lock)
 		.await?;

-	info!(
-		"Updating joined counts for room just in case (e.g. we may have found a difference in \
-		 the room's m.room.member state"
-	);
+	info!("Updating joined counts for room");
 	self.services
 		.rooms
 		.state_cache
+10 −1
@@ -17,12 +17,21 @@ pub enum DebugCommand {
 		message: Vec<String>,
 	},

-	/// Get the auth_chain of a PDU
+	/// Loads the auth_chain of a PDU, reporting how long it took.
 	GetAuthChain {
 		/// An event ID (the $ character followed by the base64 reference hash)
 		event_id: OwnedEventId,
 	},

+	/// Walks & displays the auth_chain of a PDU in a mermaid graph format.
+	///
+	/// This is useless to basically anyone but developers, and is also probably
+	/// slow and memory hungry.
+	ShowAuthChain {
+		/// The root event ID to start walking back from.
+		event_id: OwnedEventId,
+	},
+
 	/// Parse and print a PDU from a JSON
 	///
 	/// The PDU event is only checked for validity and is not added to the
@@ -102,10 +102,6 @@ pub(super) async fn remote_user_in_rooms(&self, user_id: OwnedUserId) -> Result
 		);
 	}

-	if !self.services.users.exists(&user_id).await {
-		return Err!("Remote user does not exist in our database.",);
-	}
-
 	let mut rooms: Vec<(OwnedRoomId, u64, String)> = self
 		.services
 		.rooms
@@ -15,10 +15,6 @@ pub enum UsersCommand {

 	IterUsers2,

-	PasswordHash {
-		user_id: OwnedUserId,
-	},
-
 	ListDevices {
 		user_id: OwnedUserId,
 	},
@@ -235,16 +231,6 @@ async fn count_users(&self) -> Result {
 		.await
 }

-#[admin_command]
-async fn password_hash(&self, user_id: OwnedUserId) -> Result {
-	let timer = tokio::time::Instant::now();
-	let result = self.services.users.password_hash(&user_id).await;
-	let query_time = timer.elapsed();
-
-	self.write_str(&format!("Query completed in {query_time:?}:\n\n```rs\n{result:#?}\n```"))
-		.await
-}
-
 #[admin_command]
 async fn list_devices(&self, user_id: OwnedUserId) -> Result {
 	let timer = tokio::time::Instant::now();
+151 −44
@@ -4,8 +4,7 @@
 };

 use api::client::{
-	full_user_deactivate, join_room_by_id_helper, leave_room, recreate_push_rules_and_return,
-	remote_leave_room,
+	full_user_deactivate, leave_room, recreate_push_rules_and_return, remote_leave_room,
 };
 use conduwuit::{
 	Err, Result, debug_warn, error, info,
@@ -24,6 +23,7 @@
 		tag::{TagEvent, TagEventContent, TagInfo},
 	},
 };
+use service::users::HashedPassword;

 use crate::{
 	admin_command, get_room_info,
@@ -70,7 +70,7 @@ pub(super) async fn create_user(&self, username: String, password: Option<String
 	// Create user
 	self.services
 		.users
-		.create(&user_id, Some(password.as_str()), None)
+		.create(&user_id, Some(HashedPassword::new(&password)?))
 		.await?;

 	// Default to pretty displayname
@@ -134,18 +134,20 @@ pub(super) async fn create_user(&self, username: String, password: Option<String
 	}

 	if let Some(room_server_name) = room.server_name() {
-		match join_room_by_id_helper(
-			self.services,
-			&user_id,
-			&room_id,
-			Some("Automatically joining this room upon registration".to_owned()),
-			&[
-				self.services.globals.server_name().to_owned(),
-				room_server_name.to_owned(),
-			],
-			&None,
-		)
-		.await
+		match self
+			.services
+			.rooms
+			.membership
+			.join_room(
+				&user_id,
+				&room_id,
+				Some("Automatically joining this room upon registration".to_owned()),
+				&[
+					self.services.globals.server_name().to_owned(),
+					room_server_name.to_owned(),
+				],
+			)
+			.await
 		{
 			| Ok(_response) => {
 				info!("Automatically joined room {room} for user {user_id}");
@@ -274,17 +276,13 @@ pub(super) async fn reset_password(

 	let new_password = password.unwrap_or_else(|| utils::random_string(AUTO_GEN_PASSWORD_LENGTH));

-	match self
-		.services
+	self.services
 		.users
-		.set_password(&user_id, Some(new_password.as_str()))
-		.await
-	{
-		| Err(e) => return Err!("Couldn't reset the password for user {user_id}: {e}"),
-		| Ok(()) => {
-			write!(self, "Successfully reset the password for user {user_id}: `{new_password}`")
-		},
-	}
+		.set_password(&user_id, Some(HashedPassword::new(&new_password)?));
+
+	self.write_str(&format!(
+		"Successfully reset the password for user {user_id}: `{new_password}`"
+	))
+	.await?;

 	if logout {
@@ -431,6 +429,82 @@ pub(super) async fn deactivate_all(&self, no_leave_rooms: bool, force: bool) ->
 		.await
 }

+#[admin_command]
+pub(super) async fn list_invited_rooms(&self, user_id: String) -> Result {
+	// Validate user id
+	let user_id = parse_local_user_id(self.services, &user_id)?;
+
+	let mut rooms: Vec<((OwnedRoomId, u64, String), Result<OwnedUserId>)> = self
+		.services
+		.rooms
+		.state_cache
+		.rooms_invited(&user_id)
+		.then(async |(room_id, _)| {
+			let sender = self
+				.services
+				.rooms
+				.state_cache
+				.invite_sender(&user_id, &room_id)
+				.await;
+			(get_room_info(self.services, &room_id).await, sender)
+		})
+		.collect()
+		.await;
+
+	if rooms.is_empty() {
+		return Err!("User is not invited to any rooms.");
+	}
+
+	rooms.sort_by_key(|r| r.0.1);
+	rooms.reverse();
+
+	let body = rooms
+		.iter()
+		.map(|((id, members, name), sender)| match sender {
+			| Ok(user_id) =>
+				format!("{id}\tInviter: {user_id}\tMembers: {members}\tName: {name}"),
+			| Err(_) => format!("{id}\tMembers: {members}\tName: {name}"),
+		})
+		.collect::<Vec<_>>()
+		.join("\n");
+
+	self.write_str(&format!("Rooms {user_id} is Invited to ({}):\n```\n{body}\n```", rooms.len()))
+		.await
+}
+
+#[admin_command]
+pub(super) async fn reject_all_invites(&self, user_id: String) -> Result {
+	let user_id = parse_local_user_id(self.services, &user_id)?;
+
+	assert!(
+		self.services.globals.user_is_local(&user_id),
+		"Parsed user_id must be a local user"
+	);
+
+	let fails = self
+		.services
+		.rooms
+		.state_cache
+		.rooms_invited(&user_id)
+		.filter_map(async |(room_id, _)| {
+			match leave_room(self.services, &user_id, &room_id, None).await {
+				| Err(ref e) => {
+					warn!(%user_id, "Failed to leave {room_id}: {e}");
+					Some(())
+				},
+				| Ok(()) => None,
+			}
+		})
+		.count()
+		.await;
+
+	if fails > 0 {
+		return Err!("{fails} invites could not be rejected");
+	}
+
+	self.write_str("Successfully rejected all invites.").await
+}
+
 #[admin_command]
 pub(super) async fn list_joined_rooms(&self, user_id: String) -> Result {
 	// Validate user id
@@ -556,15 +630,12 @@ pub(super) async fn force_join_list_of_local_users(
 	let mut successful_joins: usize = 0;

 	for user_id in user_ids {
-		match join_room_by_id_helper(
-			self.services,
-			&user_id,
-			&room_id,
-			Some(String::from(BULK_JOIN_REASON)),
-			&servers,
-			&None,
-		)
-		.await
+		match self
+			.services
+			.rooms
+			.membership
+			.join_room(&user_id, &room_id, Some(String::from(BULK_JOIN_REASON)), &servers)
+			.await
 		{
 			| Ok(_res) => {
 				successful_joins = successful_joins.saturating_add(1);
@@ -640,15 +711,12 @@ pub(super) async fn force_join_all_local_users(
 		.collect::<Vec<_>>()
 		.await
 	{
-		match join_room_by_id_helper(
-			self.services,
-			user_id,
-			&room_id,
-			Some(String::from(BULK_JOIN_REASON)),
-			&servers,
-			&None,
-		)
-		.await
+		match self
+			.services
+			.rooms
+			.membership
+			.join_room(user_id, &room_id, Some(String::from(BULK_JOIN_REASON)), &servers)
+			.await
 		{
 			| Ok(_res) => {
 				successful_joins = successful_joins.saturating_add(1);
@@ -685,7 +753,46 @@ pub(super) async fn force_join_room(
 		self.services.globals.user_is_local(&user_id),
 		"Parsed user_id must be a local user"
 	);
-	join_room_by_id_helper(self.services, &user_id, &room_id, None, &servers, &None).await?;
+	self.services
+		.rooms
+		.membership
+		.join_room(&user_id, &room_id, None, &servers)
+		.await?;

 	self.write_str(&format!("{user_id} has been joined to {room_id}."))
 		.await
 }

+#[admin_command]
+pub(super) async fn force_join_room_remotely(
+	&self,
+	user_id: String,
+	room_id: OwnedRoomOrAliasId,
+	via: String,
+) -> Result {
+	let via = ServerName::parse(&via)?;
+	let user_id = parse_local_user_id(self.services, &user_id)?;
+	let (room_id, mut servers) = self
+		.services
+		.rooms
+		.alias
+		.resolve_with_servers(&room_id, None)
+		.await?;
+	if servers.contains(&via) {
+		servers.retain(|n| *n != via);
+	}
+	servers.insert(0, via);
+
+	assert!(
+		self.services.globals.user_is_local(&user_id),
+		"Parsed user_id must be a local user"
+	);
+	let state_lock = self.services.rooms.state.mutex.lock(&*room_id).await;
+	self.services
+		.rooms
+		.membership
+		.join_remote_room(&user_id, &room_id, None, &servers, state_lock)
+		.await?;
+
+	self.write_str(&format!("{user_id} has been joined to {room_id}."))
+		.await
@@ -160,6 +160,17 @@ pub enum UserCommand {
 	#[clap(alias = "list")]
 	ListUsers,

+	/// Lists all the rooms (local and remote) that the specified user is
+	/// invited to
+	ListInvitedRooms {
+		user_id: String,
+	},
+
+	/// Manually make a user reject all current invites
+	RejectAllInvites {
+		user_id: String,
+	},
+
 	/// Lists all the rooms (local and remote) that the specified user is
 	/// joined in
 	ListJoinedRooms {
@@ -172,6 +183,20 @@ pub enum UserCommand {
 		room_id: OwnedRoomOrAliasId,
 	},

+	/// Manually join a local user to a room via a remote server, regardless of
+	/// our current residency.
+	ForceJoinRoomRemotely {
+		/// The user to join
+		user_id: String,
+		/// The room to join
+		room_id: OwnedRoomOrAliasId,
+		/// The server name to join via.
+		///
+		/// This server will always be tried first, however if more are
+		/// available, they may be tried after.
+		via: String,
+	},
+
 	/// Manually leave a local user from a room.
 	ForceLeaveRoom {
 		user_id: String,
+1 −1
@@ -48,7 +48,7 @@ pub(crate) fn parse_local_user_id(services: &Services, user_id: &str) -> Result<
 	Ok(user_id)
 }

-/// Parses user ID that is an active (not guest or deactivated) local user
+/// Parses user ID that is an active (not deactivated) local user
 pub(crate) async fn parse_active_local_user_id(
 	services: &Services,
 	user_id: &str,
+1 −3
@@ -48,9 +48,6 @@ jemalloc_stats = [
 	"conduwuit-core/jemalloc_stats",
 	"conduwuit-service/jemalloc_stats",
 ]
-ldap = [
-	"conduwuit-service/ldap"
-]
 release_max_log_level = [
 	"conduwuit-core/release_max_log_level",
 	"conduwuit-service/release_max_log_level",
@@ -77,6 +74,7 @@ conduwuit-macros.workspace = true
 conduwuit-service.workspace = true
 const-str.workspace = true
 ctor.workspace = true
+dtor.workspace = true
 futures.workspace = true
 hmac.workspace = true
 http.workspace = true
@@ -24,9 +24,9 @@
 		power_levels::RoomPowerLevelsEventContent,
 	},
 };
-use service::{mailer::messages, uiaa::Identity};
+use service::{mailer::messages, uiaa::Identity, users::HashedPassword};

-use super::{DEVICE_ID_LENGTH, TOKEN_LENGTH, join_room_by_id_helper};
+use super::{DEVICE_ID_LENGTH, TOKEN_LENGTH};
 use crate::Ruma;

 pub(crate) mod register;
@@ -150,8 +150,7 @@ pub(crate) async fn change_password_route(

 	services
 		.users
-		.set_password(&sender_user, Some(&body.new_password))
-		.await?;
+		.set_password(&sender_user, Some(HashedPassword::new(&body.new_password)?));

 	if body.logout_devices {
 		// Logout all devices except the current one
@@ -188,7 +187,7 @@ pub(crate) async fn change_password_route(
 	if services.server.config.admin_room_notices {
 		services
 			.admin
-			.notice(&format!("User {} changed their password.", &sender_user))
+			.notice(&format!("User {sender_user} changed their password."))
 			.await;
 	}

@@ -239,19 +238,11 @@ pub(crate) async fn request_password_change_token_via_email_route(
 ///
 /// Note: Also works for Application Services
 pub(crate) async fn whoami_route(
-	State(services): State<crate::State>,
+	State(_): State<crate::State>,
 	body: Ruma<whoami::v3::Request>,
 ) -> Result<whoami::v3::Response> {
-	let is_guest = services
-		.users
-		.is_deactivated(body.sender_user())
-		.await
-		.map_err(|_| {
-			err!(Request(Forbidden("Application service has not registered this user.")))
-		})? && body.appservice_info.is_none();
-
-	Ok(assign!(whoami::v3::Response::new(body.sender_user().to_owned(), is_guest), {
-		device_id: body.sender_device.clone(),
+	Ok(assign!(whoami::v3::Response::new(body.sender_user().to_owned(), false), {
+		device_id: body.sender_device,
 	}))
 }

@@ -10,12 +10,11 @@
 use conduwuit_service::Services;
 use futures::{FutureExt, StreamExt};
 use lettre::{Address, message::Mailbox};
-use register::RegistrationKind;
 use ruma::{
 	OwnedUserId, UserId,
 	api::client::{
 		account::{
-			register::{self, LoginType},
+			register::{self, LoginType, RegistrationKind},
 			request_registration_token_via_email,
 		},
 		uiaa::{AuthFlow, AuthType},
@@ -28,9 +27,9 @@
 	push,
 };
 use serde_json::value::RawValue;
-use service::mailer::messages;
+use service::{mailer::messages, users::HashedPassword};

-use super::{DEVICE_ID_LENGTH, TOKEN_LENGTH, join_room_by_id_helper};
+use super::{DEVICE_ID_LENGTH, TOKEN_LENGTH};
 use crate::Ruma;

 const RANDOM_USER_ID_LENGTH: usize = 10;
@@ -42,16 +41,6 @@
 /// You can use [`GET
 /// /_matrix/client/v3/register/available`](fn.get_register_available_route.
 /// html) to check if the user id is valid and available.
 ///
 /// - Only works if registration is enabled
-/// - If type is guest: ignores all parameters except
-///   initial_device_display_name
 /// - If sender is not appservice: Requires UIAA (but we only use a dummy stage)
-/// - If type is not guest and no username is given: Always fails after UIAA
-///   check
 /// - Creates a new account and populates it with default account data
 /// - If `inhibit_login` is false: Creates a device and returns device id and
 ///   access_token
 #[allow(clippy::doc_markdown)]
 #[tracing::instrument(skip_all, fields(%client), name = "register", level = "info")]
 pub(crate) async fn register_route(
@@ -59,7 +48,10 @@ pub(crate) async fn register_route(
 	ClientIp(client): ClientIp,
 	body: Ruma<register::v3::Request>,
 ) -> Result<register::v3::Response> {
-	let is_guest = body.kind == RegistrationKind::Guest;
+	if body.kind != RegistrationKind::User {
+		return Err!(Request(GuestAccessForbidden("Guests may not register on this server.")));
+	}

 	let emergency_mode_enabled = services.config.emergency_password.is_some();

 	// Allow registration if it's enabled in the config file or if this is the first
@@ -68,69 +60,19 @@ pub(crate) async fn register_route(
 		services.config.allow_registration || services.firstrun.is_first_run();

 	if !allow_registration && body.appservice_info.is_none() {
-		match (body.username.as_ref(), body.initial_device_display_name.as_ref()) {
-			| (Some(username), Some(device_display_name)) => {
-				info!(
-					%is_guest,
-					user = %username,
-					device_name = %device_display_name,
-					"Rejecting registration attempt as registration is disabled"
-				);
-			},
-			| (Some(username), _) => {
-				info!(
-					%is_guest,
-					user = %username,
-					"Rejecting registration attempt as registration is disabled"
-				);
-			},
-			| (_, Some(device_display_name)) => {
-				info!(
-					%is_guest,
-					device_name = %device_display_name,
-					"Rejecting registration attempt as registration is disabled"
-				);
-			},
-			| (None, _) => {
-				info!(
-					%is_guest,
-					"Rejecting registration attempt as registration is disabled"
-				);
-			},
-		}
-
+		info!(
+			?body.username,
+			?body.initial_device_display_name,
+			"Rejecting registration attempt as registration is disabled"
+		);
 		return Err!(Request(Forbidden(
 			"This server is not accepting registrations at this time."
 		)));
 	}

-	if is_guest && !services.config.allow_guest_registration {
-		info!(
-			"Guest registration disabled, rejecting guest registration attempt, initial device \
-			 name: \"{}\"",
-			body.initial_device_display_name.as_deref().unwrap_or("")
-		);
-		return Err!(Request(GuestAccessForbidden("Guest registration is disabled.")));
-	}
-
-	// forbid guests from registering if there is not a real admin user yet. give
-	// generic user error.
-	if is_guest && services.firstrun.is_first_run() {
-		warn!(
-			"Guest account attempted to register before a real admin user has been registered, \
-			 rejecting registration. Guest's initial device name: \"{}\"",
-			body.initial_device_display_name.as_deref().unwrap_or("")
-		);
-		return Err!(Request(Forbidden(
-			"This server is not accepting registrations at this time."
-		)));
-	}
-
-	// Appeservices and guests get to skip auth
-	let skip_auth = body.appservice_info.is_some() || is_guest;
-
-	let identity = if skip_auth {
-		// Appservices and guests have no identity
+	let identity = if body.appservice_info.is_some() {
+		// Appservices can skip auth
 		None
 	} else {
 		// Perform UIAA to determine the user's identity
@@ -157,13 +99,9 @@ pub(crate) async fn register_route(
 		}
 	});

-	let user_id = determine_registration_user_id(
-		&services,
-		supplied_username,
-		is_guest,
-		emergency_mode_enabled,
-	)
-	.await?;
+	let user_id =
+		determine_registration_user_id(&services, supplied_username, emergency_mode_enabled)
+			.await?;

 	if body.body.login_type == Some(LoginType::ApplicationService) {
 		// For appservice logins, make sure that the user ID is in the appservice's
@@ -187,10 +125,16 @@ pub(crate) async fn register_route(
 		return Err!(Request(Exclusive("Username is reserved by an appservice.")));
 	}

-	let password = if is_guest { None } else { body.password.as_deref() };
+	let password = if body.appservice_info.is_some() {
+		None
+	} else if let Some(password) = body.password.as_deref() {
+		Some(HashedPassword::new(password)?)
+	} else {
+		return Err!(Request(InvalidParam("A password must be provided")));
+	};

 	// Create user
-	services.users.create(&user_id, password, None).await?;
+	services.users.create(&user_id, password).await?;

 	// Set an initial display name
 	let mut displayname = user_id.localpart().to_owned();
@@ -222,7 +166,9 @@ pub(crate) async fn register_route(

 	// Generate new device id if the user didn't specify one
 	let (token, device) = if !body.inhibit_login {
-		let device_id = if is_guest { None } else { body.device_id.clone() }
+		let device_id = body
+			.device_id
+			.clone()
 			.unwrap_or_else(|| utils::random_string(DEVICE_ID_LENGTH).into());

 		// Generate new token for the device
@@ -263,8 +209,7 @@ pub(crate) async fn register_route(

 	let device_display_name = body.initial_device_display_name.as_deref().unwrap_or("");

-	// log in conduit admin channel if a non-guest user registered
-	if body.appservice_info.is_none() && !is_guest {
+	if body.appservice_info.is_none() {
 		if !device_display_name.is_empty() {
 			let notice = format!(
 				"New user \"{user_id}\" registered on this server from IP {client} and device \
@@ -285,65 +230,32 @@ pub(crate) async fn register_route(
 		}
 	}
||||
// log in conduit admin channel if a guest registered
|
||||
if body.appservice_info.is_none() && is_guest && services.config.log_guest_registrations {
|
||||
debug_info!("New guest user \"{user_id}\" registered on this server.");
|
||||
// Make the first user to register an administrator and disable first-run mode.
|
||||
let was_first_user = services.firstrun.empower_first_user(&user_id).await?;
|
||||
|
||||
if !device_display_name.is_empty() {
|
||||
if services.server.config.admin_room_notices {
|
||||
services
|
||||
.admin
|
||||
.notice(&format!(
|
||||
"Guest user \"{user_id}\" with device display name \
|
||||
\"{device_display_name}\" registered on this server from IP {client}"
|
||||
))
|
||||
.await;
|
||||
}
|
||||
} else {
|
||||
#[allow(clippy::collapsible_else_if)]
|
||||
if services.server.config.admin_room_notices {
|
||||
services
|
||||
.admin
|
||||
.notice(&format!(
|
||||
"Guest user \"{user_id}\" with no device display name registered on \
|
||||
this server from IP {client}",
|
||||
))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !is_guest {
|
||||
// Make the first user to register an administrator and disable first-run mode.
|
||||
let was_first_user = services.firstrun.empower_first_user(&user_id).await?;
|
||||
|
||||
// If the registering user was not the first and we're suspending users on
|
||||
// register, suspend them.
|
||||
if !was_first_user && services.config.suspend_on_register {
|
||||
// Note that we can still do auto joins for suspended users
|
||||
// If the registering user was not the first and we're suspending users on
|
||||
// register, suspend them.
|
||||
if !was_first_user && services.config.suspend_on_register {
|
||||
// Note that we can still do auto joins for suspended users
|
||||
services
|
||||
.users
|
||||
.suspend_account(&user_id, &services.globals.server_user)
|
||||
.await;
|
||||
// And send an @room notice to the admin room, to prompt admins to review the
|
||||
// new user and ideally unsuspend them if deemed appropriate.
|
||||
if services.server.config.admin_room_notices {
|
||||
services
|
||||
.users
|
||||
.suspend_account(&user_id, &services.globals.server_user)
|
||||
.await;
|
||||
// And send an @room notice to the admin room, to prompt admins to review the
|
||||
// new user and ideally unsuspend them if deemed appropriate.
|
||||
if services.server.config.admin_room_notices {
|
||||
services
|
||||
.admin
|
||||
.send_loud_message(RoomMessageEventContent::text_plain(format!(
|
||||
"User {user_id} has been suspended as they are not the first user on \
|
||||
this server. Please review and unsuspend them if appropriate."
|
||||
)))
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
.admin
|
||||
.send_loud_message(RoomMessageEventContent::text_plain(format!(
|
||||
"User {user_id} has been suspended as they are not the first user on this \
|
||||
server. Please review and unsuspend them if appropriate."
|
||||
)))
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
|
||||
if body.appservice_info.is_none()
|
||||
&& !services.server.config.auto_join_rooms.is_empty()
|
||||
&& (services.config.allow_guests_auto_join_rooms || !is_guest)
|
||||
{
|
||||
if body.appservice_info.is_none() && !services.server.config.auto_join_rooms.is_empty() {
|
||||
for room in &services.server.config.auto_join_rooms {
|
||||
let Ok(room_id) = services.rooms.alias.resolve(room).await else {
|
||||
error!(
|
||||
@@ -366,16 +278,17 @@ pub(crate) async fn register_route(
|
||||
}
|
||||
|
||||
if let Some(room_server_name) = room.server_name() {
|
||||
match join_room_by_id_helper(
|
||||
&services,
|
||||
&user_id,
|
||||
&room_id,
|
||||
Some("Automatically joining this room upon registration".to_owned()),
|
||||
&[services.globals.server_name().to_owned(), room_server_name.to_owned()],
|
||||
&body.appservice_info,
|
||||
)
|
||||
.boxed()
|
||||
.await
|
||||
match services
|
||||
.rooms
|
||||
.membership
|
||||
.join_room(
|
||||
&user_id,
|
||||
&room_id,
|
||||
Some("Automatically joining this room upon registration".to_owned()),
|
||||
&[services.globals.server_name().to_owned(), room_server_name.to_owned()],
|
||||
)
|
||||
.boxed()
|
||||
.await
|
||||
{
|
||||
| Err(e) => {
|
||||
// don't return this error so we don't fail registrations
|
||||
@@ -511,12 +424,9 @@ async fn create_registration_uiaa_session(
|
||||
async fn determine_registration_user_id(
|
||||
services: &Services,
|
||||
supplied_username: Option<String>,
|
||||
is_guest: bool,
|
||||
emergency_mode_enabled: bool,
|
||||
) -> Result<OwnedUserId> {
|
||||
if let Some(supplied_username) = supplied_username
|
||||
&& !is_guest
|
||||
{
|
||||
if let Some(supplied_username) = supplied_username {
|
||||
// The user gets to pick their username. Do some validation to make sure it's
|
||||
// acceptable.
|
||||
|
||||
@@ -569,7 +479,7 @@ async fn determine_registration_user_id(
|
||||
|
||||
Ok(user_id)
|
||||
} else {
|
||||
// The user is a guest or didn't specify a username. Generate a username for
|
||||
// The user didn't specify a username. Generate a username for
|
||||
// them.
|
||||
|
||||
loop {
|
||||
|
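The password handling above changes shape: instead of passing an optional plaintext string into `services.users.create`, the route now constructs a `HashedPassword` up front, so a plaintext value can no longer travel past the handler. A minimal sketch of such a hash-at-construction newtype, assuming an argon2-style hasher; the name mirrors the diff, but the internals here are illustrative rather than continuwuity's actual implementation:

// Illustrative sketch of a hash-at-construction password newtype.
// Assumes the `argon2` crate; not the repository's real HashedPassword.
use argon2::{
    Argon2, PasswordHash, PasswordHasher, PasswordVerifier,
    password_hash::{SaltString, rand_core::OsRng},
};

pub struct HashedPassword(String);

impl HashedPassword {
    // Hash at construction: callers never get to store plaintext.
    pub fn new(plaintext: &str) -> Result<Self, argon2::password_hash::Error> {
        let salt = SaltString::generate(&mut OsRng);
        let hash = Argon2::default().hash_password(plaintext.as_bytes(), &salt)?;
        Ok(Self(hash.to_string()))
    }

    pub fn verify(&self, attempt: &str) -> bool {
        PasswordHash::new(&self.0)
            .map(|parsed| {
                Argon2::default()
                    .verify_password(attempt.as_bytes(), &parsed)
                    .is_ok()
            })
            .unwrap_or(false)
    }
}

This also explains why guest handling disappears from the password branch: with guests rejected up front, only appservices may register without a password.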
@@ -122,16 +122,6 @@ pub(crate) async fn set_room_visibility_route(
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}

if services
.users
.is_deactivated(sender_user)
.await
.unwrap_or(false)
&& body.appservice_info.is_none()
{
return Err!(Request(Forbidden("Guests cannot publish to room directories")));
}

if !user_can_publish_room(&services, sender_user, &body.room_id).await? {
return Err!(Request(Forbidden("User is not allowed to publish this room")));
}
@@ -64,6 +64,27 @@ pub(crate) async fn upload_keys_route(
.await?;
}

for (key_id, fallback_key) in &body.fallback_keys {
if fallback_key
.deserialize()
.inspect_err(|e| {
debug_warn!(
%key_id,
?fallback_key,
"Invalid one time key JSON submitted by client, skipping: {e}"
);
})
.is_err()
{
continue;
}

services
.users
.add_fallback_key(sender_user, sender_device, key_id, fallback_key, false)
.await?;
}

if let Some(device_keys) = &body.device_keys {
let deser_device_keys = device_keys.deserialize().map_err(|e| {
err!(Request(BadJson(debug_warn!(
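For context on what the fallback-key loop above accepts: per the Matrix client-server spec, fallback keys arrive in the `POST /_matrix/client/v3/keys/upload` body keyed as `<algorithm>:<key_id>` and marked with `"fallback": true`. A sketch of the wire shape being validated (all values are placeholders):

// Placeholder /keys/upload body carrying one fallback key.
use serde_json::json;

fn example_upload_body() -> serde_json::Value {
    json!({
        "fallback_keys": {
            "signed_curve25519:AAAAHg": {
                "key": "base64+curve25519+public+key",
                "fallback": true,
                "signatures": {
                    "@alice:example.org": {
                        "ed25519:DEVICEID": "base64+signature"
                    }
                }
            }
        }
    })
}

Note the per-key `continue`: a malformed entry is logged and skipped rather than failing the whole upload, and the reused log message suggests the same pattern as the one-time-key loop above it.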
@@ -1,58 +1,18 @@
use std::{borrow::Borrow, collections::HashMap, iter::once, sync::Arc};

use axum::extract::State;
use axum_client_ip::ClientIp;
use conduwuit::{
Err, Result, debug, debug_info, debug_warn, err, error, info, is_true,
matrix::{
StateKey,
event::{gen_event_id, gen_event_id_canonical_json},
pdu::{PartialPdu, PduEvent},
state_res,
},
Err, Result, debug,
result::FlatOk,
trace,
utils::{
self, shuffle,
stream::{IterStream, ReadyExt},
to_canonical_object,
},
warn,
utils::{shuffle, stream::IterStream},
};
use futures::{FutureExt, StreamExt, TryFutureExt};
use futures::{FutureExt, StreamExt};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId,
RoomVersionId, UserId,
api::{
client::membership::{join_room_by_id, join_room_by_id_or_alias},
error::{ErrorKind, IncompatibleRoomVersionErrorData},
federation::{self},
},
canonical_json::to_canonical_value,
events::{
StateEventType,
room::{
join_rules::JoinRule,
member::{MembershipState, RoomMemberEventContent},
},
},
OwnedRoomId, OwnedServerName, OwnedUserId, UserId,
api::client::membership::{join_room_by_id, join_room_by_id_or_alias},
};
use service::{
Services,
appservice::RegistrationInfo,
rooms::{
state::RoomMutexGuard,
state_compressor::{CompressedState, HashSetCompressStateEvent},
timeline::pdu_fits,
},
};
use tokio::join;

use super::{banned_room_check, validate_remote_member_event_stub};
use crate::{
Ruma,
server::{select_authorising_user, user_can_perform_restricted_join},
};
use super::banned_room_check;
use crate::Ruma;

/// # `POST /_matrix/client/r0/rooms/{roomId}/join`
///
@@ -112,16 +72,14 @@ pub(crate) async fn join_room_by_id_route(
shuffle(&mut servers);
let servers = deprioritize(servers, &services.config.deprioritize_joins_through_servers);

join_room_by_id_helper(
&services,
sender_user,
&body.room_id,
body.reason.clone(),
&servers,
&body.appservice_info,
)
.boxed()
.await
let room_id = services
.rooms
.membership
.join_room(sender_user, &body.room_id, body.reason.clone(), &servers)
.boxed()
.await?;

Ok(join_room_by_id::v3::Response::new(room_id))
}
/// # `POST /_matrix/client/r0/join/{roomIdOrAlias}`
@@ -140,7 +98,6 @@ pub(crate) async fn join_room_by_id_or_alias_route(
body: Ruma<join_room_by_id_or_alias::v3::Request>,
) -> Result<join_room_by_id_or_alias::v3::Response> {
let sender_user = body.sender_user();
let appservice_info = &body.appservice_info;
let body = &body.body;
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
@@ -235,650 +192,14 @@ pub(crate) async fn join_room_by_id_or_alias_route(
};

let servers = deprioritize(servers, &services.config.deprioritize_joins_through_servers);
let join_room_response = join_room_by_id_helper(
&services,
sender_user,
&room_id,
body.reason.clone(),
&servers,
appservice_info,
)
.boxed()
.await?;

Ok(join_room_by_id_or_alias::v3::Response::new(join_room_response.room_id))
}

pub async fn join_room_by_id_helper(
services: &Services,
sender_user: &UserId,
room_id: &RoomId,
reason: Option<String>,
servers: &[OwnedServerName],
appservice_info: &Option<RegistrationInfo>,
) -> Result<join_room_by_id::v3::Response> {
let state_lock = services.rooms.state.mutex.lock(room_id).await;

let user_is_guest = services
.users
.is_deactivated(sender_user)
.await
.unwrap_or(false)
&& appservice_info.is_none();

if user_is_guest && !services.rooms.state_accessor.guest_can_join(room_id).await {
return Err!(Request(Forbidden("Guests are not allowed to join this room")));
}

if services
let room_id = services
.rooms
.state_cache
.is_joined(sender_user, room_id)
.await
{
debug_warn!("{sender_user} is already joined in {room_id}");
return Ok(join_room_by_id::v3::Response::new(room_id.to_owned()));
}
if let Err(e) = services
.antispam
.user_may_join_room(
sender_user.to_owned(),
room_id.to_owned(),
services
.rooms
.state_cache
.is_invited(sender_user, room_id)
.await,
)
.await
{
warn!("Antispam prevented user {} from joining room {}: {}", sender_user, room_id, e);
return Err!(Request(Forbidden("You are not allowed to join this room.")));
}

let server_in_room = services
.rooms
.state_cache
.server_in_room(services.globals.server_name(), room_id)
.await;

// Only check our known membership if we're already in the room.
// See: https://forgejo.ellis.link/continuwuation/continuwuity/issues/855
let membership = if server_in_room {
services
.rooms
.state_accessor
.get_member(room_id, sender_user)
.await
} else {
debug!("Ignoring local state for join {room_id}, we aren't in the room yet.");
Ok(RoomMemberEventContent::new(MembershipState::Leave))
};
if let Ok(m) = membership {
if m.membership == MembershipState::Ban {
debug_warn!("{sender_user} is banned from {room_id} but attempted to join");
// TODO: return reason
return Err!(Request(Forbidden("You are banned from the room.")));
}
}

if !server_in_room && servers.is_empty() {
return Err!(Request(NotFound(
"No servers were provided to assist in joining the room remotely, and we are not \
already participating in the room."
)));
}

if services.antispam.check_all_joins() {
if let Err(e) = services
.antispam
.meowlnir_accept_make_join(room_id.to_owned(), sender_user.to_owned())
.await
{
warn!("Antispam prevented user {} from joining room {}: {}", sender_user, room_id, e);
return Err!(Request(Forbidden("Antispam rejected join request.")));
}
}

if server_in_room {
join_room_by_id_helper_local(services, sender_user, room_id, reason, servers, state_lock)
.boxed()
.await?;
} else {
// Ask a remote server if we are not participating in this room
join_room_by_id_helper_remote(
services,
sender_user,
room_id,
reason,
servers,
state_lock,
)
.membership
.join_room(sender_user, &room_id, body.reason.clone(), &servers)
.boxed()
.await?;
}
Ok(join_room_by_id::v3::Response::new(room_id.to_owned()))
}

#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote", level = "info")]
async fn join_room_by_id_helper_remote(
services: &Services,
sender_user: &UserId,
room_id: &RoomId,
reason: Option<String>,
servers: &[OwnedServerName],
state_lock: RoomMutexGuard,
) -> Result {
info!("Joining {room_id} over federation.");

let (make_join_response, remote_server) =
make_join_request(services, sender_user, room_id, servers).await?;

info!("make_join finished");

let room_version = make_join_response.room_version.unwrap_or(RoomVersionId::V1);
let room_version_rules = room_version
.rules()
.expect("room version should have defined rules");

if !services.server.supported_room_version(&room_version) {
// How did we get here?
return Err!(BadServerResponse("Remote room version {room_version} is not supported"));
}

let mut join_event_stub: CanonicalJsonObject =
serde_json::from_str(make_join_response.event.get()).map_err(|e| {
err!(BadServerResponse(warn!(
"Invalid make_join event json received from server: {e:?}"
)))
})?;

let join_authorized_via_users_server = {
use RoomVersionId::*;
if !matches!(room_version, V1 | V2 | V3 | V4 | V5 | V6 | V7) {
join_event_stub
.get("content")
.map(|s| {
s.as_object()?
.get("join_authorised_via_users_server")?
.as_str()
})
.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok())
} else {
None
}
};

join_event_stub.insert(
"origin_server_ts".to_owned(),
CanonicalJsonValue::Integer(
utils::millis_since_unix_epoch()
.try_into()
.expect("Timestamp is valid js_int value"),
),
);

let mut join_content = RoomMemberEventContent::new(MembershipState::Join);
join_content.displayname = services.users.displayname(sender_user).await.ok();
join_content.avatar_url = services.users.avatar_url(sender_user).await.ok();
join_content.blurhash = services.users.blurhash(sender_user).await.ok();
join_content.reason = reason;
join_content
.join_authorized_via_users_server
.clone_from(&join_authorized_via_users_server);

join_event_stub.insert(
"content".to_owned(),
to_canonical_value(join_content).expect("event is valid, we just created it"),
);

// Remove event id if it exists
join_event_stub.remove("event_id");

// In order to create a compatible ref hash (EventID) the `hashes` field needs
// to be present
services
.server_keys
.hash_and_sign_event(&mut join_event_stub, &room_version_rules)?;

// Generate event id
let event_id = gen_event_id(&join_event_stub, &room_version_rules)?;

// Add event_id back
join_event_stub
.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into()));

// It has enough fields to be called a proper event now
let mut join_event = join_event_stub;

info!("Asking {remote_server} for send_join in room {room_id}");
let send_join_request = federation::membership::create_join_event::v2::Request::new(
room_id.to_owned(),
event_id.clone(),
services
.sending
.convert_to_outgoing_federation_event(join_event.clone())
.await,
);

let send_join_response = match services
.sending
.send_synapse_request(&remote_server, send_join_request)
.await
{
| Ok(response) => response,
| Err(e) => {
error!("send_join failed: {e}");
return Err(e);
},
};

info!("send_join finished");

if join_authorized_via_users_server.is_some() {
if let Some(signed_raw) = &send_join_response.room_state.event {
debug_info!(
"There is a signed event with join_authorized_via_users_server. This room is \
probably using restricted joins. Adding signature to our event"
);

let (signed_event_id, signed_value) =
gen_event_id_canonical_json(signed_raw, &room_version_rules).map_err(|e| {
err!(Request(BadJson(warn!(
"Could not convert event to canonical JSON: {e}"
))))
})?;

if signed_event_id != event_id {
return Err!(Request(BadJson(warn!(
%signed_event_id, %event_id,
"Server {remote_server} sent event with wrong event ID"
))));
}

match signed_value["signatures"]
.as_object()
.ok_or_else(|| {
err!(BadServerResponse(warn!(
"Server {remote_server} sent invalid signatures type"
)))
})
.and_then(|e| {
e.get(remote_server.as_str()).ok_or_else(|| {
err!(BadServerResponse(warn!(
"Server {remote_server} did not send its signature for a restricted \
room"
)))
})
}) {
| Ok(signature) => {
join_event
.get_mut("signatures")
.expect("we created a valid pdu")
.as_object_mut()
.expect("we created a valid pdu")
.insert(remote_server.to_string(), signature.clone());
},
| Err(e) => {
warn!(
"Server {remote_server} sent invalid signature in send_join signatures \
for event {signed_value:?}: {e:?}",
);
},
}
}
}

services
.rooms
.short
.get_or_create_shortroomid(room_id)
.await;

info!("Parsing join event");
let parsed_join_pdu = PduEvent::from_id_val(&event_id, join_event.clone())
.map_err(|e| err!(BadServerResponse("Invalid join event PDU: {e:?}")))?;

info!("Acquiring server signing keys for response events");
let resp_events = &send_join_response.room_state;
let resp_state = &resp_events.state;
let resp_auth = &resp_events.auth_chain;
services
.server_keys
.acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter()))
.await;

info!("Going through send_join response room_state");
let cork = services.db.cork_and_flush();
let state = send_join_response
.room_state
.state
.iter()
.stream()
.then(|pdu| {
services
.server_keys
.validate_and_add_event_id_no_fetch(pdu, &room_version_rules)
.inspect_err(|e| {
debug_warn!("Could not validate send_join response room_state event: {e:?}");
})
.inspect(|_| debug!("Completed validating send_join response room_state event"))
})
.ready_filter_map(Result::ok)
.fold(HashMap::new(), |mut state, (event_id, value)| async move {
let pdu = match PduEvent::from_id_val(&event_id, value.clone()) {
| Ok(pdu) => pdu,
| Err(e) => {
debug_warn!("Invalid PDU in send_join response: {e:?}: {value:#?}");
return state;
},
};
if !pdu_fits(&mut value.clone()) {
warn!(
"dropping incoming PDU {event_id} in room {room_id} from room join because \
it exceeds 65535 bytes or is otherwise too large."
);
return state;
}
services.rooms.outlier.add_pdu_outlier(&event_id, &value);
if let Some(state_key) = &pdu.state_key {
let shortstatekey = services
.rooms
.short
.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
.await;

state.insert(shortstatekey, pdu.event_id.clone());
}
state
})
.await;

drop(cork);

info!("Going through send_join response auth_chain");
let cork = services.db.cork_and_flush();
send_join_response
.room_state
.auth_chain
.iter()
.stream()
.then(|pdu| {
services
.server_keys
.validate_and_add_event_id_no_fetch(pdu, &room_version_rules)
})
.ready_filter_map(Result::ok)
.ready_for_each(|(event_id, value)| {
trace!(%event_id, "Adding PDU as an outlier from send_join auth_chain");
services.rooms.outlier.add_pdu_outlier(&event_id, &value);
})
.await;

drop(cork);

debug!("Running send_join auth check");
let fetch_state = &state;
let state_fetch = |k: StateEventType, s: StateKey| async move {
let shortstatekey = services.rooms.short.get_shortstatekey(&k, &s).await.ok()?;

let event_id = fetch_state.get(&shortstatekey)?;
services.rooms.timeline.get_pdu(event_id).await.ok()
};

let auth_check = state_res::event_auth::auth_check(
&room_version.rules().unwrap(),
&parsed_join_pdu,
None, // TODO: third party invite
|k, s| state_fetch(k.clone(), s.into()),
&state_fetch(StateEventType::RoomCreate, "".into())
.await
.expect("create event is missing from send_join auth"),
)
.await
.map_err(|e| err!(Request(Forbidden(warn!("Auth check failed: {e:?}")))))?;

if !auth_check {
return Err!(Request(Forbidden("Auth check failed")));
}

info!("Compressing state from send_join");
let compressed: CompressedState = services
.rooms
.state_compressor
.compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.borrow())))
.collect()
.await;

debug!("Saving compressed state");
let HashSetCompressStateEvent {
shortstatehash: statehash_before_join,
added,
removed,
} = services
.rooms
.state_compressor
.save_state(room_id, Arc::new(compressed))
.await?;

debug!("Forcing state for new room");
services
.rooms
.state
.force_state(room_id, statehash_before_join, added, removed, &state_lock)
.await?;

debug!("Updating joined counts for new room");
services
.rooms
.state_cache
.update_joined_count(room_id)
.await;

// We append to state before appending the pdu, so we don't have a moment in
// time with the pdu without its state. This is okay because append_pdu can't
// fail.
let statehash_after_join = services
.rooms
.state
.append_to_state(&parsed_join_pdu, room_id)
.await?;

info!("Appending new room join event");
services
.rooms
.timeline
.append_pdu(
&parsed_join_pdu,
join_event,
once(parsed_join_pdu.event_id.borrow()),
&state_lock,
room_id,
)
.await?;

info!("Setting final room state for new room");
// We set the room state after inserting the pdu, so that we never have a moment
// in time where events in the current room state do not exist
services
.rooms
.state
.set_room_state(room_id, statehash_after_join, &state_lock);

Ok(())
}
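The `hash_and_sign_event`/`gen_event_id` sequence above reflects how event IDs work in room versions 4 and later: an ID is not minted by a server but derived from the event itself, as the URL-safe unpadded base64 of the SHA-256 reference hash of the redacted, canonical-JSON event, which is why the `hashes` field must exist before an ID can. A standalone sketch of the derivation (using the `sha2` and `base64` crates; real code must apply the spec's redaction rules before hashing):

// Sketch: deriving a v4+ Matrix event ID from an already-redacted,
// canonical-JSON-encoded event. Redaction itself is omitted here.
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
use sha2::{Digest, Sha256};

fn reference_event_id(canonical_redacted_json: &[u8]) -> String {
    let hash = Sha256::digest(canonical_redacted_json);
    format!("${}", URL_SAFE_NO_PAD.encode(hash))
}

Because the ID is a hash over the event's content, the later `signed_event_id != event_id` check suffices to detect a remote server swapping in a different event.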
#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_local", level = "info")]
async fn join_room_by_id_helper_local(
services: &Services,
sender_user: &UserId,
room_id: &RoomId,
reason: Option<String>,
servers: &[OwnedServerName],
state_lock: RoomMutexGuard,
) -> Result {
info!("Joining room locally");

let (room_version, join_rules, is_invited) = join!(
services.rooms.state.get_room_version(room_id),
services.rooms.state_accessor.get_join_rules(room_id),
services.rooms.state_cache.is_invited(sender_user, room_id)
);

let room_version = room_version?;
let mut auth_user: Option<OwnedUserId> = None;
if !is_invited && matches!(join_rules, JoinRule::Restricted(_) | JoinRule::KnockRestricted(_))
{
use RoomVersionId::*;
if !matches!(room_version, V1 | V2 | V3 | V4 | V5 | V6 | V7) {
// This is a restricted room, check if we can complete the join requirements
// locally.
let needs_auth_user =
user_can_perform_restricted_join(services, sender_user, room_id).await;
if needs_auth_user.is_ok_and(is_true!()) {
// If there was an error or the value is false, we'll try joining over
// federation. Since it's Ok(true), we can authorise this locally.
// If we can't select a local user, this will remain None, the join will fail,
// and we'll fall back to federation.
auth_user = select_authorising_user(services, room_id, sender_user, &state_lock)
.await
.ok();
}
}
}

let mut content = RoomMemberEventContent::new(MembershipState::Join);
content.displayname = services.users.displayname(sender_user).await.ok();
content.avatar_url = services.users.avatar_url(sender_user).await.ok();
content.blurhash = services.users.blurhash(sender_user).await.ok();
content.reason.clone_from(&reason);
content.join_authorized_via_users_server = auth_user;

// Try normal join first
let Err(error) = services
.rooms
.timeline
.build_and_append_pdu(
PartialPdu::state(sender_user.to_string(), &content),
sender_user,
Some(room_id),
&state_lock,
)
.await
else {
info!("Joined room locally");
return Ok(());
};

if servers.is_empty() || servers.len() == 1 && services.globals.server_is_ours(&servers[0]) {
if !services.rooms.metadata.exists(room_id).await {
return Err!(Request(
Unknown(
"Room was not found locally and no servers were found to help us discover it"
),
NOT_FOUND
));
}

return Err(error);
}

info!(
?error,
remote_servers = %servers.len(),
"Could not join room locally, attempting remote join",
);
join_room_by_id_helper_remote(services, sender_user, room_id, reason, servers, state_lock)
.await
}
async fn make_join_request(
services: &Services,
sender_user: &UserId,
room_id: &RoomId,
servers: &[OwnedServerName],
) -> Result<(federation::membership::prepare_join_event::v1::Response, OwnedServerName)> {
let mut make_join_counter: usize = 1;

for remote_server in servers {
if services.globals.server_is_ours(remote_server) {
continue;
}
info!(
"Asking {remote_server} for make_join (attempt {make_join_counter}/{})",
servers.len()
);

let mut request = federation::membership::prepare_join_event::v1::Request::new(
room_id.to_owned(),
sender_user.to_owned(),
);
request.ver = services.server.supported_room_versions().collect();

let make_join_response = services
.sending
.send_federation_request(remote_server, request)
.await;

trace!("make_join response: {:?}", make_join_response);
make_join_counter = make_join_counter.saturating_add(1);

match make_join_response {
| Ok(response) => {
info!("Received make_join response from {remote_server}");
if let Err(e) = validate_remote_member_event_stub(
&MembershipState::Join,
sender_user,
room_id,
&to_canonical_object(&response.event)?,
) {
warn!("make_join response from {remote_server} failed validation: {e}");
continue;
}
return Ok((response, remote_server.clone()));
},
| Err(e) => match e.kind() {
| ErrorKind::UnableToAuthorizeJoin => {
info!(
"{remote_server} was unable to verify the joining user satisfied \
restricted join requirements: {e}. Will continue trying."
);
},
| ErrorKind::UnableToGrantJoin => {
info!(
"{remote_server} believes the joining user satisfies restricted join \
rules, but is unable to authorise a join for us. Will continue trying."
);
},
| ErrorKind::IncompatibleRoomVersion(IncompatibleRoomVersionErrorData {
room_version,
..
}) => {
warn!(
"{remote_server} reports the room we are trying to join is \
v{room_version}, which we do not support."
);
return Err(e);
},
| ErrorKind::Forbidden => {
warn!("{remote_server} refuses to let us join: {e}.");
return Err(e);
},
| ErrorKind::NotFound => {
info!(
"{remote_server} does not know about {room_id}: {e}. Will continue \
trying."
);
},
| _ => {
info!("{remote_server} failed to make_join: {e}. Will continue trying.");
},
},
}
}
info!("All {} servers were unable to assist in joining {room_id} :(", servers.len());
Err!(BadServerResponse("No server available to assist in joining."))
Ok(join_room_by_id_or_alias::v3::Response::new(room_id))
}

/// Moves deprioritized servers (if any) to the back of the list.
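Given that doc comment, `deprioritize` plausibly amounts to a stable partition: servers keep their shuffled relative order within each group, but deprioritized ones are only tried once every preferred server has been attempted. A sketch under that assumption (not the actual implementation):

// Stable partition matching the doc comment above; illustrative only.
fn deprioritize<T: PartialEq>(servers: Vec<T>, deprioritized: &[T]) -> Vec<T> {
    let (back, front): (Vec<T>, Vec<T>) = servers
        .into_iter()
        .partition(|server| deprioritized.contains(server));

    // Preferred servers first, deprioritized ones moved to the back.
    front.into_iter().chain(back).collect()
}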
@@ -33,12 +33,13 @@
use service::{
Services,
rooms::{
membership::validate_remote_member_event_stub,
state::RoomMutexGuard,
state_compressor::{CompressedState, HashSetCompressStateEvent},
},
};

use super::{banned_room_check, join::join_room_by_id_helper, validate_remote_member_event_stub};
use super::banned_room_check;
use crate::Ruma;

/// # `POST /_matrix/client/*/knock/{roomIdOrAlias}`
@@ -238,15 +239,11 @@ async fn knock_room_by_id_helper(
// join_room_by_id_helper We need to release the lock here and let
// join_room_by_id_helper acquire it again
drop(state_lock);
match join_room_by_id_helper(
services,
sender_user,
room_id,
reason.clone(),
servers,
&None,
)
.await
match services
.rooms
.membership
.join_room(sender_user, room_id, reason.clone(), servers)
.await
{
| Ok(_) => return Ok(knock_room::v3::Response::new(room_id.to_owned())),
| Err(e) => {
@@ -19,9 +19,8 @@
room::member::{MembershipState, RoomMemberEventContent},
},
};
use service::Services;
use service::{Services, rooms::membership::validate_remote_member_event_stub};

use super::validate_remote_member_event_stub;
use crate::Ruma;

/// # `POST /_matrix/client/v3/rooms/{roomId}/leave`
@@ -13,16 +13,10 @@
use axum::extract::State;
use conduwuit::{Err, Result, warn};
use futures::{FutureExt, StreamExt};
use ruma::{
CanonicalJsonObject, OwnedRoomId, RoomId, ServerName, UserId,
api::client::membership::joined_rooms,
events::{
StaticEventContent,
room::member::{MembershipState, RoomMemberEventContent},
},
};
use ruma::{OwnedRoomId, RoomId, ServerName, UserId, api::client::membership::joined_rooms};
use service::Services;

pub use self::leave::{leave_all_rooms, leave_room, remote_leave_room};
pub(crate) use self::{
ban::ban_user_route,
forget::forget_room_route,
@@ -34,10 +28,6 @@
members::{get_member_events_route, joined_members_route},
unban::unban_user_route,
};
pub use self::{
join::join_room_by_id_helper,
leave::{leave_all_rooms, leave_room, remote_leave_room},
};
use crate::{Ruma, client::full_user_deactivate};

/// # `POST /_matrix/client/r0/joined_rooms`
@@ -159,80 +149,3 @@ pub(crate) async fn banned_room_check(

Ok(())
}

/// Validates that an event returned from a remote server by `/make_*`
/// actually is a membership event with the expected fields.
///
/// Without checking this, the remote server could use the remote membership
/// mechanism to trick our server into signing arbitrary malicious events.
pub(crate) fn validate_remote_member_event_stub(
membership: &MembershipState,
user_id: &UserId,
room_id: &RoomId,
event_stub: &CanonicalJsonObject,
) -> Result<()> {
let Some(event_type) = event_stub.get("type") else {
return Err!(BadServerResponse(
"Remote server returned member event with missing type field"
));
};
if event_type != &RoomMemberEventContent::TYPE {
return Err!(BadServerResponse(
"Remote server returned member event with invalid event type"
));
}

let Some(sender) = event_stub.get("sender") else {
return Err!(BadServerResponse(
"Remote server returned member event with missing sender field"
));
};
if sender != &user_id.as_str() {
return Err!(BadServerResponse(
"Remote server returned member event with incorrect sender"
));
}

let Some(state_key) = event_stub.get("state_key") else {
return Err!(BadServerResponse(
"Remote server returned member event with missing state_key field"
));
};
if state_key != &user_id.as_str() {
return Err!(BadServerResponse(
"Remote server returned member event with incorrect state_key"
));
}

let Some(event_room_id) = event_stub.get("room_id") else {
return Err!(BadServerResponse(
"Remote server returned member event with missing room_id field"
));
};
if event_room_id != &room_id.as_str() {
return Err!(BadServerResponse(
"Remote server returned member event with incorrect room_id"
));
}

let Some(content) = event_stub
.get("content")
.and_then(|content| content.as_object())
else {
return Err!(BadServerResponse(
"Remote server returned member event with missing content field"
));
};
let Some(event_membership) = content.get("membership") else {
return Err!(BadServerResponse(
"Remote server returned member event with missing membership field"
));
};
if event_membership != &membership.as_str() {
return Err!(BadServerResponse(
"Remote server returned member event with incorrect membership type"
));
}

Ok(())
}
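The validator is deliberately field-by-field because a `/make_join` (or `/make_knock`, `/make_leave`) response is an unsigned stub that the local server will hash and sign; every identity-bearing field must therefore be pinned before signing. A hypothetical test of the failure mode it closes (hand-built stub, not taken from the repository):

// Hypothetical: a stub whose state_key names a different user must be rejected,
// otherwise we would end up signing a membership event for someone else.
use ruma::events::room::member::MembershipState;
use ruma::{CanonicalJsonObject, CanonicalJsonValue, room_id, user_id};

fn tampered_stub_is_rejected() {
    let mut stub = CanonicalJsonObject::new();
    stub.insert("type".into(), CanonicalJsonValue::String("m.room.member".into()));
    stub.insert("sender".into(), CanonicalJsonValue::String("@alice:example.org".into()));
    // Tampered by the remote: state_key points at another user.
    stub.insert("state_key".into(), CanonicalJsonValue::String("@mallory:example.org".into()));
    stub.insert("room_id".into(), CanonicalJsonValue::String("!room:example.org".into()));

    let result = validate_remote_member_event_stub(
        &MembershipState::Join,
        user_id!("@alice:example.org"),
        room_id!("!room:example.org"),
        &stub,
    );
    assert!(result.is_err());
}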
@@ -58,7 +58,7 @@
pub(super) use media::*;
pub(super) use media_legacy::*;
pub(super) use membership::*;
pub use membership::{join_room_by_id_helper, leave_all_rooms, leave_room, remote_leave_room};
pub use membership::{leave_all_rooms, leave_room, remote_leave_room};
pub(super) use message::*;
pub(super) use mutual_rooms::*;
pub(super) use openid::*;
@@ -537,10 +537,7 @@ pub(crate) async fn create_room_route(
if services.server.config.admin_room_notices {
services
.admin
.send_text(&format!(
"{sender_user} made {} public to the room directory",
&room_id
))
.send_text(&format!("{sender_user} made {room_id} public to the room directory"))
.await;
}
info!("{sender_user} made {0} public to the room directory", &room_id);
+3
-130
@@ -4,10 +4,9 @@
use axum_client_ip::ClientIp;
use conduwuit::{
Err, Result, debug, err, info,
utils::{self, ReadyExt, hash, stream::BroadbandExt},
utils::{self, ReadyExt, stream::BroadbandExt},
warn,
};
use conduwuit_core::{debug_error, debug_warn};
use conduwuit_service::Services;
use futures::StreamExt;
use lettre::Address;
@@ -54,113 +53,6 @@ pub(crate) async fn get_login_types_route(
]))
}

/// Authenticates the given user by its ID and its password.
///
/// Returns the user ID if successful, and an error otherwise.
#[tracing::instrument(skip_all, fields(%user_id), name = "password", level = "debug")]
pub(crate) async fn password_login(
services: &Services,
user_id: &UserId,
lowercased_user_id: &UserId,
password: &str,
) -> Result<OwnedUserId> {
// Restrict login to accounts only of type 'password', including untyped
// legacy accounts which are equivalent to 'password'.
if services
.users
.origin(user_id)
.await
.is_ok_and(|origin| origin != "password")
{
return Err!(Request(Forbidden("Account does not permit password login.")));
}

let (hash, user_id) = match services.users.password_hash(user_id).await {
| Ok(hash) => (hash, user_id),
| Err(_) => services
.users
.password_hash(lowercased_user_id)
.await
.map(|hash| (hash, lowercased_user_id))
.map_err(|_| err!(Request(Forbidden("Invalid identifier or password."))))?,
};

if hash.is_empty() {
return Err!(Request(UserDeactivated("The user has been deactivated")));
}

hash::verify_password(password, &hash)
.inspect_err(|e| debug_error!("{e}"))
.map_err(|_| err!(Request(Forbidden("Invalid identifier or password."))))?;

Ok(user_id.to_owned())
}

/// Authenticates the given user through the configured LDAP server.
///
/// Creates the user if they are found in LDAP and do not already have an
/// account.
#[tracing::instrument(skip_all, fields(%user_id), name = "ldap", level = "debug")]
pub(super) async fn ldap_login(
services: &Services,
user_id: &UserId,
lowercased_user_id: &UserId,
password: &str,
) -> Result<OwnedUserId> {
let (user_dn, is_ldap_admin) = match services.config.ldap.bind_dn.as_ref() {
| Some(bind_dn) if bind_dn.contains("{username}") =>
(bind_dn.replace("{username}", lowercased_user_id.localpart()), None),
| _ => {
debug!("Searching user in LDAP");

let dns = services.users.search_ldap(user_id).await?;
if dns.len() >= 2 {
return Err!(Ldap("LDAP search returned two or more results"));
}

let Some((user_dn, is_admin)) = dns.first() else {
return password_login(services, user_id, lowercased_user_id, password).await;
};

(user_dn.clone(), *is_admin)
},
};

let user_id = services
.users
.auth_ldap(&user_dn, password)
.await
.map(|()| lowercased_user_id.to_owned())?;

// LDAP users are automatically created on first login attempt. This is a very
// common feature that can be seen on many services using an LDAP provider for
// their users (synapse, Nextcloud, Jellyfin, ...).
//
// LDAP users are created with a dummy but non-empty password, because an empty
// password is reserved for deactivated accounts. The conduwuit password field
// will never be read to log in an LDAP user, so this is not an issue.
if !services.users.exists(lowercased_user_id).await {
services
.users
.create(lowercased_user_id, Some("*"), Some("ldap"))
.await?;
}

// Only sync admin status if LDAP can actually determine it.
// None means LDAP cannot determine admin status (manual config required).
if let Some(is_ldap_admin) = is_ldap_admin {
let is_conduwuit_admin = services.admin.user_is_admin(lowercased_user_id).await;

if is_ldap_admin && !is_conduwuit_admin {
Box::pin(services.admin.make_user_admin(lowercased_user_id)).await?;
} else if !is_ldap_admin && is_conduwuit_admin {
Box::pin(services.admin.revoke_admin(lowercased_user_id)).await?;
}
}

Ok(user_id)
}

pub(crate) async fn handle_login(
services: &Services,
identifier: Option<&UserIdentifier>,
@@ -191,15 +83,7 @@ pub(crate) async fn handle_login(
UserId::parse_with_server_name(user_id_or_localpart, &services.config.server_name)
.map_err(|_| err!(Request(InvalidUsername("User ID is malformed"))))?;

let lowercased_user_id = UserId::parse_with_server_name(
user_id.localpart().to_lowercase(),
&services.config.server_name,
)
.unwrap();

if !services.globals.user_is_local(&user_id)
|| !services.globals.user_is_local(&lowercased_user_id)
{
if !services.globals.user_is_local(&user_id) {
return Err!(Request(InvalidParam("User ID does not belong to this homeserver")));
}

@@ -212,18 +96,7 @@ pub(crate) async fn handle_login(
return Err!(Request(Forbidden("This account is not permitted to log in.")));
}

if cfg!(feature = "ldap") && services.config.ldap.enable {
match Box::pin(ldap_login(services, &user_id, &lowercased_user_id, password)).await {
| Ok(user_id) => Ok(user_id),
| Err(err) if services.config.ldap.ldap_only => Err(err),
| Err(err) => {
debug_warn!("{err}");
password_login(services, &user_id, &lowercased_user_id, password).await
},
}
} else {
password_login(services, &user_id, &lowercased_user_id, password).await
}
services.users.check_password(&user_id, password).await
}

/// # `POST /_matrix/client/v3/login`
@@ -395,6 +395,10 @@ pub(crate) async fn build_sync_events(
.users
.count_one_time_keys(syncing_user, syncing_device);

let unused_fallback_key_types = services
.users
.list_unused_fallback_key_types(syncing_user, syncing_device);

let (
(joined_rooms, mut device_list_updates),
left_rooms,
@@ -405,6 +409,7 @@ pub(crate) async fn build_sync_events(
to_device_events,
keys_changed,
device_one_time_keys_count,
unused_fallback_key_types,
) = async {
futures::join!(
joined_rooms,
@@ -415,7 +420,8 @@ pub(crate) async fn build_sync_events(
account_data,
to_device_events,
keys_changed,
device_one_time_keys_count
device_one_time_keys_count,
unused_fallback_key_types,
)
}
.boxed()
@@ -433,8 +439,7 @@ pub(crate) async fn build_sync_events(
account_data: assign!(GlobalAccountData::new(), { events: account_data }),
device_lists: device_list_updates.into(),
device_one_time_keys_count,
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
device_unused_fallback_key_types: Some(unused_fallback_key_types),
presence: assign!(Presence::new(), {
events: presence_updates
.into_iter()
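With `device_unused_fallback_key_types` now populated, clients can apply the spec's replenishment rule: when the reported list no longer contains a device's fallback algorithm, the old fallback key has been claimed and a fresh one should be uploaded. A sketch of that client-side check (hypothetical client code, not part of this server; real clients would use ruma's algorithm types rather than strings):

// Hypothetical client-side reaction to the sync field wired up above.
fn needs_new_fallback_key(device_unused_fallback_key_types: Option<&[String]>) -> bool {
    match device_unused_fallback_key_types {
        // The server reported the list and our algorithm is absent: replenish.
        Some(unused) => !unused.iter().any(|alg| alg == "signed_curve25519"),
        // The server omitted the field: it does not track fallback keys.
        None => false,
    }
}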
@@ -80,7 +80,7 @@ pub(crate) async fn conduwuit_server_version() -> Result<impl IntoResponse> {
///
/// conduwuit-specific API to return the number of users registered on this
/// homeserver. Endpoint is disabled if federation is disabled for privacy. This
/// only includes active users (not deactivated, no guests, etc)
/// only includes active users (not deactivated, etc)
pub(crate) async fn conduwuit_local_user_count(
State(services): State<crate::State>,
) -> Result<impl IntoResponse> {
@@ -381,6 +381,7 @@ async fn handle_room(
.rooms
.event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
.boxed()
.await
.map(|_| ());
results.push((event_id, result));
@@ -70,6 +70,7 @@ conduwuit-build-metadata.workspace = true
const-str.workspace = true
core_affinity.workspace = true
ctor.workspace = true
dtor.workspace = true
cyborgtime.workspace = true
either.workspace = true
figment.workspace = true
@@ -47,7 +47,7 @@
const NAME_MAX: usize = 128;
const KEY_SEGS: usize = 8;

#[ctor::ctor]
#[ctor::ctor(unsafe)]
fn _static_initialization() {
acq_epoch().expect("pre-initialization of jemalloc failed");
acq_epoch().expect("pre-initialization of jemalloc failed");
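The `#[ctor::ctor]` to `#[ctor::ctor(unsafe)]` edits here and in the debug module track the ctor crate's newer API, which makes callers acknowledge that life-before-main code runs outside Rust's usual guarantees (initialization order relative to std is unspecified, and panicking across the boundary is unsound). A minimal sketch of the attribute in use; the function body is an illustrative assumption:

// Illustrative life-before-main initializer using ctor's unsafe-acknowledging form.
// Keep such functions tiny and non-panicking: std may not be fully set up yet.
#[ctor::ctor(unsafe)]
fn init_before_main() {
    // e.g. prime a cache or read an environment flag.
}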
+1
-145
@@ -371,6 +371,7 @@ pub struct Config {
pub ip_lookup_strategy: u8,

/// Max request size for file uploads in bytes. Defaults to 20MB.
/// Also limits incoming federated media.
///
/// default: 20971520
#[serde(default = "default_max_request_size")]
@@ -1485,21 +1486,6 @@ pub struct Config {
#[serde(default)]
pub brotli_compression: bool,

/// Set to true to allow user type "guest" registrations. Some clients like
/// Element attempt to register guest users automatically.
#[serde(default)]
pub allow_guest_registration: bool,

/// Set to true to log guest registrations in the admin room. Note that
/// these may be noisy or unnecessary if you're a public homeserver.
#[serde(default)]
pub log_guest_registrations: bool,

/// Set to true to allow guest registrations/users to auto join any rooms
/// specified in `auto_join_rooms`.
#[serde(default)]
pub allow_guests_auto_join_rooms: bool,

/// Enable the legacy unauthenticated Matrix media repository endpoints.
/// These endpoints consist of:
/// - /_matrix/media/*/config
@@ -2129,10 +2115,6 @@ pub struct Config {
#[serde(default)]
pub allow_web_indexing: bool,

/// display: nested
#[serde(default)]
pub ldap: LdapConfig,

/// Configuration for antispam support
/// display: nested
#[serde(default)]
@@ -2294,126 +2276,6 @@ pub fn effective_foci(&self, deprecated_foci: &[RtcFocusInfo]) -> Vec<RtcTranspo
}
}

#[derive(Clone, Debug, Default, Deserialize)]
#[config_example_generator(filename = "conduwuit-example.toml", section = "global.ldap")]
pub struct LdapConfig {
/// Whether to enable LDAP login.
///
/// example: "true"
#[serde(default)]
pub enable: bool,

/// Whether to force LDAP authentication or authorize classical password
/// login.
///
/// example: "true"
#[serde(default)]
pub ldap_only: bool,

/// URI of the LDAP server.
///
/// example: "ldap://ldap.example.com:389"
///
/// default: ""
#[serde(default)]
pub uri: Option<Url>,

/// StartTLS for LDAP connections.
///
/// default: false
#[serde(default)]
pub use_starttls: bool,

/// Skip TLS certificate verification, possibly dangerous.
///
/// default: false
#[serde(default)]
pub disable_tls_verification: bool,

/// Root of the searches.
///
/// example: "ou=users,dc=example,dc=org"
///
/// default: ""
#[serde(default)]
pub base_dn: String,

/// Bind DN if anonymous search is not enabled.
///
/// You can use the variable `{username}` that will be replaced by the
/// entered username. In such case, the password used to bind will be the
/// one provided for the login and not the one given by
/// `bind_password_file`. Beware: automatically granting admin rights will
/// not work if you use this direct bind instead of a LDAP search.
///
/// example: "cn=ldap-reader,dc=example,dc=org" or
/// "cn={username},ou=users,dc=example,dc=org"
///
/// default: ""
#[serde(default)]
pub bind_dn: Option<String>,

/// Path to a file on the system that contains the password for the
/// `bind_dn`.
///
/// The server must be able to access the file, and it must not be empty.
///
/// default: ""
#[serde(default)]
pub bind_password_file: Option<PathBuf>,

/// Search filter to limit user searches.
///
/// You can use the variable `{username}` that will be replaced by the
/// entered username for more complex filters.
///
/// example: "(&(objectClass=person)(memberOf=matrix))"
///
/// default: "(objectClass=*)"
#[serde(default = "default_ldap_search_filter")]
pub filter: String,

/// Attribute to use to uniquely identify the user.
///
/// example: "uid" or "cn"
///
/// default: "uid"
#[serde(default = "default_ldap_uid_attribute")]
pub uid_attribute: String,

/// Attribute containing the display name of the user.
///
/// example: "givenName" or "sn"
///
/// default: "givenName"
#[serde(default = "default_ldap_name_attribute")]
pub name_attribute: String,

/// Root of the searches for admin users.
///
/// Defaults to `base_dn` if empty.
///
/// example: "ou=admins,dc=example,dc=org"
///
/// default: ""
#[serde(default)]
pub admin_base_dn: String,

/// The LDAP search filter to find administrative users for continuwuity.
///
/// If left blank, administrative state must be configured manually for each
/// user.
///
/// You can use the variable `{username}` that will be replaced by the
/// entered username for more complex filters.
///
/// example: "(objectClass=conduwuitAdmin)" or "(uid={username})"
///
/// default: ""
#[serde(default)]
pub admin_filter: String,
}

#[derive(Deserialize, Clone, Debug)]
#[serde(transparent)]
struct ListeningPort {
@@ -2934,9 +2796,3 @@ pub(super) fn default_blurhash_x_component() -> u32 { 4 }
pub(super) fn default_blurhash_y_component() -> u32 { 3 }

// end recommended & blurhashing defaults

fn default_ldap_search_filter() -> String { "(objectClass=*)".to_owned() }

fn default_ldap_uid_attribute() -> String { String::from("uid") }

fn default_ldap_name_attribute() -> String { String::from("givenName") }
+1
-1
@@ -62,7 +62,7 @@ macro_rules! debug_info {
pub static DEBUGGER: LazyLock<bool> =
LazyLock::new(|| env::var("_").unwrap_or_default().ends_with("gdb"));

#[cfg_attr(debug_assertions, ctor::ctor)]
#[cfg_attr(debug_assertions, ctor::ctor(unsafe))]
#[cfg_attr(not(debug_assertions), allow(dead_code))]
fn set_panic_trap() {
if !*DEBUGGER {
||||
@@ -110,8 +110,6 @@ pub enum Error {
|
||||
InconsistentRoomState(&'static str, ruma::OwnedRoomId),
|
||||
#[error(transparent)]
|
||||
IntoHttp(#[from] ruma::api::error::IntoHttpError),
|
||||
#[error("{0}")]
|
||||
Ldap(Cow<'static, str>),
|
||||
#[error(transparent)]
|
||||
Mxc(#[from] ruma::MxcUriError),
|
||||
#[error(transparent)]
|
||||
@@ -262,7 +260,7 @@ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{real_error}")
|
||||
}
|
||||
} else {
|
||||
write!(f, "Request error: {}", &self.0)
|
||||
write!(f, "Request error: {}", self.0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -337,8 +337,7 @@ pub async fn auth_check<E, F, Fut>(
|
||||
// If the create event content has the field m.federate set to false and the
|
||||
// sender domain of the event does not match the sender domain of the create
|
||||
// event, reject.
|
||||
if !(room_version.room_id_format == RoomIdFormatVersion::V2)
|
||||
&& !room_create_content.federate
|
||||
if !room_create_content.federate
|
||||
&& room_create_event.sender().server_name() != incoming_event.sender().server_name()
|
||||
{
|
||||
warn!(
|
||||
|
||||
@@ -58,6 +58,7 @@ conduwuit-core.workspace = true
|
||||
conduwuit-macros.workspace = true
|
||||
const-str.workspace = true
|
||||
ctor.workspace = true
|
||||
dtor.workspace = true
|
||||
futures.workspace = true
|
||||
log.workspace = true
|
||||
minicbor.workspace = true
|
||||
|
||||
+8
-2
@@ -288,8 +288,14 @@ fn deserialize_option<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
|
||||
}
|
||||
|
||||
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
|
||||
fn deserialize_bool<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
|
||||
unhandled!("deserialize bool not implemented")
|
||||
fn deserialize_bool<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
|
||||
let byte = self
|
||||
.buf
|
||||
.get(self.pos)
|
||||
.ok_or(Self::Error::SerdeDe("bool buffer underflow".into()))?;
|
||||
self.inc_pos(1);
|
||||
|
||||
visitor.visit_bool(*byte != 0x00)
|
||||
}
|
||||
|
||||
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
|
||||
|
||||
@@ -120,6 +120,10 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
|
||||
name: "onetimekeyid_onetimekeys",
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "fallbackkeyid_fallbackkey",
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "passwordresettoken_info",
|
||||
..descriptor::RANDOM_SMALL
|
||||
@@ -307,6 +311,11 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
|
||||
key_size_hint: Some(48),
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "rejectedeventids",
|
||||
key_size_hint: Some(48),
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "statehash_shortstatehash",
|
||||
val_size_hint: Some(8),
|
||||
|
||||
+2
-2
@@ -297,8 +297,8 @@ fn serialize_u16(self, _v: u16) -> Result<Self::Ok> {
|
||||
|
||||
fn serialize_u8(self, v: u8) -> Result<Self::Ok> { self.write(&[v]) }
|
||||
|
||||
fn serialize_bool(self, _v: bool) -> Result<Self::Ok> {
|
||||
unhandled!("serialize bool not implemented")
|
||||
fn serialize_bool(self, v: bool) -> Result<Self::Ok> {
|
||||
if v { self.write(&[0x01]) } else { self.write(&[0x00]) }
|
||||
}
|
||||
|
||||
fn serialize_unit(self) -> Result<Self::Ok> { unhandled!("serialize unit not implemented") }
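
A standalone sketch of the one-byte encoding the two halves above agree on (`0x01` for true, and any non-zero byte decodes as true); a plain `Vec<u8>` stands in here for the database serializer's internal buffer state:

```rust
// Encode a bool as a single byte, mirroring serialize_bool above.
fn encode_bool(buf: &mut Vec<u8>, v: bool) {
	buf.push(if v { 0x01 } else { 0x00 });
}

// Decode it back, mirroring deserialize_bool: any non-zero byte is true.
fn decode_bool(buf: &[u8], pos: usize) -> Option<bool> {
	buf.get(pos).map(|byte| *byte != 0x00)
}

fn main() {
	let mut buf = Vec::new();
	encode_bool(&mut buf, true);
	encode_bool(&mut buf, false);
	assert_eq!(decode_bool(&buf, 0), Some(true));
	assert_eq!(decode_bool(&buf, 1), Some(false));
}
```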

@@ -32,11 +32,11 @@ mod __compile_introspection {
const CRATE_NAME: &str = #crate_name;

/// Register this crate's features with the global registry during static initialization
#[::ctor::ctor]
#[::ctor::ctor(unsafe)]
fn register() {
conduwuit_core::info::introspection::ENABLED_FEATURES.lock().unwrap().insert(#crate_name, &ENABLED);
}
#[::ctor::dtor]
#[::dtor::dtor(unsafe)]
fn unregister() {
conduwuit_core::info::introspection::ENABLED_FEATURES.lock().unwrap().remove(#crate_name);
}

+1 -4
@@ -55,7 +55,6 @@ standard = [
"jemalloc",
"jemalloc_conf",
"journald",
"ldap",
"media_thumbnail",
"systemd",
"url_preview",
@@ -126,9 +125,6 @@ jemalloc_stats = [
jemalloc_conf = [
"conduwuit-core/jemalloc_conf",
]
ldap = [
"conduwuit-api/ldap",
]
media_thumbnail = [
"conduwuit-service/media_thumbnail",
]
@@ -217,6 +213,7 @@ conduwuit-macros.workspace = true

clap.workspace = true
ctor.workspace = true
dtor.workspace = true
console-subscriber.optional = true
console-subscriber.workspace = true
const-str.workspace = true

@@ -105,6 +105,7 @@ conduwuit-service.workspace = true
conduwuit-web.workspace = true
const-str.workspace = true
ctor.workspace = true
dtor.workspace = true
futures.workspace = true
http.workspace = true
http-body-util.workspace = true

@@ -52,9 +52,6 @@ jemalloc_stats = [
"conduwuit-core/jemalloc_stats",
"conduwuit-database/jemalloc_stats",
]
ldap = [
"dep:ldap3"
]
media_thumbnail = [
"dep:image",
]
@@ -89,6 +86,7 @@ conduwuit-database.workspace = true
conduwuit-macros.workspace = true
const-str.workspace = true
ctor.workspace = true
dtor.workspace = true
either.workspace = true
futures.workspace = true
governor.workspace = true
@@ -98,8 +96,6 @@ image.workspace = true
image.optional = true
ipaddress.workspace = true
itertools.workspace = true
ldap3.workspace = true
ldap3.optional = true
log.workspace = true
loole.workspace = true
lru-cache.workspace = true

@@ -37,7 +37,7 @@ pub async fn create_admin_room(services: &Services) -> Result {

// Create a user for the server
let server_user = services.globals.server_user.as_ref();
services.users.create(server_user, None, None).await?;
services.users.create(server_user, None).await?;

let mut create_content = {
use RoomVersionId::*;

@@ -111,7 +111,7 @@ async fn start_appservice(&self, id: String, registration: Registration) -> Resu
if !self.services.users.exists(&appservice_user_id).await {
self.services
.users
.create(&appservice_user_id, None, None)
.create(&appservice_user_id, None)
.await?;
} else if self
.services
@@ -121,10 +121,7 @@ async fn start_appservice(&self, id: String, registration: Registration) -> Resu
.unwrap_or(false)
{
// Reactivate the appservice user if it was accidentally deactivated
self.services
.users
.set_password(&appservice_user_id, None)
.await?;
self.services.users.set_password(&appservice_user_id, None);
}

self.registration_info

@@ -9,7 +9,10 @@
push::Ruleset,
};

use crate::{Dep, account_data, config, globals, users};
use crate::{
Dep, account_data, config, globals,
users::{self, HashedPassword},
};

pub struct Service {
services: Services,
@@ -37,11 +40,6 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
}

async fn worker(self: Arc<Self>) -> Result {
if self.services.config.ldap.enable {
warn!("emergency password feature not available with LDAP enabled.");
return Ok(());
}

self.set_emergency_access().await.inspect_err(|e| {
error!("Could not set the configured emergency password for the server user: {e}");
})
@@ -56,10 +54,15 @@ impl Service {
async fn set_emergency_access(&self) -> Result {
let server_user = &self.services.globals.server_user;

self.services
.users
.set_password(server_user, self.services.config.emergency_password.as_deref())
.await?;
self.services.users.set_password(
server_user,
self.services
.config
.emergency_password
.as_deref()
.map(HashedPassword::new)
.transpose()?,
);

let (ruleset, pwd_set) = match self.services.config.emergency_password {
| Some(_) => (Ruleset::server_default(server_user), true),

@@ -44,7 +44,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
db,
server: args.server.clone(),
bad_event_ratelimiter: Arc::new(SyncRwLock::new(HashMap::new())),
admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", &args.server.name))
admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", args.server.name))
.expect("#admins:server_name is valid alias name"),
server_user: UserId::parse_with_server_name(
String::from("conduit"),

@@ -37,7 +37,7 @@ pub struct PasswordReset<'a> {
}

impl MessageTemplate for PasswordReset<'_> {
fn subject(&self) -> String { format!("Password reset request for {}", &self.user_id) }
fn subject(&self) -> String { format!("Password reset request for {}", self.user_id) }
}

#[derive(Template)]

@@ -6,7 +6,10 @@
use data::{Data, ResetTokenInfo};
use ruma::OwnedUserId;

use crate::{Dep, globals, users};
use crate::{
Dep, globals,
users::{self, HashedPassword},
};

pub const PASSWORD_RESET_PATH: &str = "/_continuwuity/account/reset_password";
pub const RESET_TOKEN_QUERY_PARAM: &str = "token";
@@ -58,17 +61,6 @@ pub async fn issue_token(&self, user_id: OwnedUserId) -> Result<ValidResetToken>
return Err!("Cannot issue a password reset token for the server user");
}

if self
.services
.users
.origin(&user_id)
.await
.unwrap_or_else(|_| "password".to_owned())
!= "password"
{
return Err!("Cannot issue a password reset token for non-internal user {user_id}");
}

if self.services.users.is_deactivated(&user_id).await? {
return Err!("Cannot issue a password reset token for deactivated user {user_id}");
}
@@ -111,8 +103,7 @@ pub async fn consume_token(
self.db.remove_token(&token);
self.services
.users
.set_password(&info.user, Some(new_password))
.await?;
.set_password(&info.user, Some(HashedPassword::new(new_password)?));
}

Ok(())
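
`HashedPassword::new` itself is not shown in this diff; a plausible shape for such a newtype, assuming the RustCrypto `argon2` crate (which conduwuit has historically used for password hashing), is sketched below. The real type lives in the users service and may differ. Hashing at the call site lets `set_password` be synchronous and keeps plaintext out of the service layer.

```rust
use argon2::{
	Argon2, PasswordHasher,
	password_hash::{SaltString, rand_core::OsRng},
};

// Hypothetical newtype: the password is hashed once, up front, so the
// service only ever stores and compares hashes.
pub struct HashedPassword(String);

impl HashedPassword {
	pub fn new(plain: &str) -> Result<Self, argon2::password_hash::Error> {
		let salt = SaltString::generate(&mut OsRng);
		let hash = Argon2::default().hash_password(plain.as_bytes(), &salt)?;
		Ok(Self(hash.to_string()))
	}
}
```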

@@ -48,7 +48,7 @@ pub fn is_valid(&self) -> bool {

impl std::fmt::Display for DatabaseTokenInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Token created by {} and used {} times. ", &self.creator, self.uses)?;
write!(f, "Token created by {} and used {} times. ", self.creator, self.uses)?;
if let Some(expires) = &self.expires {
write!(f, "{expires}.")?;
} else {

@@ -35,7 +35,7 @@ pub struct ValidToken {

impl std::fmt::Display for ValidToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "`{}` --- {}", self.token, &self.source)
write!(f, "`{}` --- {}", self.token, self.source)
}
}
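
Several hunks in this batch drop a redundant `&` from format arguments; `write!` and `format!` already take every argument by reference, so both spellings produce identical output, as this self-contained check shows:

```rust
fn main() {
	let token = String::from("abc");
	// The formatting macros borrow their arguments themselves,
	// so the explicit & is pure noise.
	assert_eq!(format!("{}", &token), format!("{}", token));
	println!("{token}");
}
```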


@@ -1,233 +1,454 @@
use std::{
collections::{BTreeMap, HashSet, VecDeque, hash_map},
cmp::min,
collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map},
time::Instant,
};

use conduwuit::{
Event, PduEvent, debug, debug_warn, implement, matrix::event::gen_event_id_canonical_json,
Err, Event, PduEvent, debug, debug_info, debug_warn, err,
matrix::event::gen_event_id_canonical_json, state_res::lexicographical_topological_sort,
trace, utils::continue_exponential_backoff_secs, warn,
};
use futures::StreamExt;
use ruma::{
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
api::federation::event::get_event,
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt,
api::federation::event::{get_event, get_missing_events},
int,
};

use super::get_room_version_rules;

/// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
/// returning them in a topologically sorted order.
///
/// Returns pdu and if we fetched it over federation the raw json.
///
/// a. Look in the main timeline (pduid_pdu tree)
/// b. Look at outlier pdu tree
/// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation?
#[implement(super::Service)]
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
&self,
origin: &'a ServerName,
events: Events,
create_event: &'a Pdu,
room_id: &'a RoomId,
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let back_off = |id| match self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(id)
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
},
};
/// This is used to attempt to process PDUs in an order that respects their
/// dependencies, however it is ultimately the sender's responsibility to send
/// them in a processable order, so this is just a best effort attempt. It does
/// not account for power levels or other tie breaks.
pub async fn build_local_dag<S: std::hash::BuildHasher>(
pdu_map: &HashMap<OwnedEventId, CanonicalJsonObject, S>,
) -> conduwuit::Result<Vec<OwnedEventId>> {
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> =
HashMap::with_capacity(pdu_map.len());
let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len());

let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
trace!("Fetching {} outlier pdus", events.clone().count());
for (event_id, value) in pdu_map {
// We already checked that these properties are correct in parse_incoming_pdu,
// so it's safe to unwrap here.
// We also filter to remove any prev_events that are not in this pdu_map, as we
// need to have at least one event with zero out degrees for the lexico-topo
// sort below. If there are multiple events with omitted prevs, they will be
// ordered by timestamp, then event ID. At that point though, it's unlikely to
// matter.
let prev_events = value
.get("prev_events")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|v| EventId::parse(v.as_str().unwrap()).unwrap())
.filter(|id| pdu_map.contains_key(id))
.collect();

for id in events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
trace!("Found {id} in main timeline or outlier tree");
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
continue;
dag.insert(event_id.clone(), prev_events);
let origin_server_ts = value
.get("origin_server_ts")
.and_then(CanonicalJsonValue::as_integer)
.unwrap_or_default();
id_origin_ts.insert(event_id.clone(), origin_server_ts);
}

debug!(count = dag.len(), "Sorting incoming events with partial graph");
lexicographical_topological_sort(&dag, &async |node_id| {
// Note: we don't bother fetching power levels because that would massively slow
// this function down. This is a best-effort attempt to order events correctly
// for processing, however ultimately that should be the sender's job.
let ts = id_origin_ts
.get(&node_id)
.copied()
.unwrap_or_else(|| int!(0))
.to_string()
.parse::<u64>()
.ok()
.and_then(UInt::new)
.unwrap_or_default();
Ok((int!(0), MilliSecondsSinceUnixEpoch(ts)))
})
.await
.inspect(|sorted| {
debug_assert_eq!(
sorted.len(),
pdu_map.len(),
"Sorted graph was not the same size as the input graph"
);
})
.map_err(|e| err!("failed to resolve local graph: {e}"))
}
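
`build_local_dag` leans on ruma's `lexicographical_topological_sort`; the plain-Rust sketch below illustrates the same idea on string IDs (emit an event only once its in-set prev_events are emitted, breaking ties by timestamp then event ID) without the ruma types. A simple O(n^2) selection is used here for clarity.

```rust
use std::collections::{HashMap, HashSet};

// Repeatedly emit the (timestamp, id)-smallest event whose in-set
// prev_events have all been emitted already.
fn topo_sort(prevs: &HashMap<String, Vec<String>>, ts: &HashMap<String, u64>) -> Vec<String> {
	let mut done: HashSet<String> = HashSet::new();
	let mut out = Vec::with_capacity(prevs.len());
	while out.len() < prevs.len() {
		let next = prevs
			.keys()
			.filter(|id| !done.contains(*id))
			// Ready = every prev is emitted or lies outside the set entirely.
			.filter(|id| {
				prevs[*id]
					.iter()
					.all(|p| done.contains(p) || !prevs.contains_key(p))
			})
			.min_by_key(|id| (ts.get(*id).copied().unwrap_or(0), (*id).clone()))
			.expect("graph contains a cycle")
			.clone();
		done.insert(next.clone());
		out.push(next);
	}
	out
}

fn main() {
	let prevs = HashMap::from([
		("$a".to_owned(), vec![]),
		("$b".to_owned(), vec!["$a".to_owned()]),
		("$c".to_owned(), vec!["$a".to_owned()]),
	]);
	let ts: HashMap<String, u64> =
		HashMap::from([("$a".to_owned(), 1), ("$b".to_owned(), 3), ("$c".to_owned(), 2)]);
	// $a comes first; $b and $c are then both ready, and $c wins on timestamp.
	assert_eq!(topo_sort(&prevs, &ts), ["$a", "$c", "$b"]);
}
```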

impl super::Service {
/// Uses `/_matrix/federation/v1/get_missing_events` to fill gaps in the
/// DAG.
///
/// When this function is called, the "earliest events" (current forward
/// extremities) will be collected, and the function will loop with an
/// exponentially incrementing limit (up to 100 per request) until it has
/// filled the gap, i.e. when the remote says there are no more events.
///
/// This function does not persist the events. The caller is responsible for
/// passing them through handle_incoming_pdu.
pub(super) async fn backfill_missing_events(
&self,
room_id: OwnedRoomId,
head: Vec<OwnedEventId>,
via: OwnedServerName,
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
if head.is_empty() {
return Ok(HashMap::new());
}
let tail = self
.services
.state
.get_forward_extremities(&room_id)
.collect::<Vec<_>>()
.await;
// TODO: min_depth is probably necessary to avoid fetching the entire room
// history if there are very long gaps
let mut latest_events: HashSet<OwnedEventId> = HashSet::from_iter(head.clone());
let mut loop_count: u64 = 3;
// Start with a base number of 3 so that the limits run 9, 16, 25, 36,
// and so on, instead of 1, 4, 9.
let mut backfilled_events = HashMap::with_capacity(10);

// c. Ask origin server over federation
// We also handle its auth chain here so we don't get a stack overflow in
// handle_outlier_pdu.
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
while !latest_events.is_empty() {
let todo: Vec<OwnedEventId> = latest_events.clone().into_iter().collect();
let mut request =
get_missing_events::v1::Request::new(room_id.clone(), tail.clone(), todo.clone());
let limit = min(loop_count.saturating_pow(2), 100);
request.limit = limit
.try_into()
.expect("limit cannot be greater than 100, which fits into UInt");

let mut events_all = HashSet::with_capacity(todo_auth_events.len());
while let Some(next_id) = todo_auth_events.pop_front() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 60 * 2;
const MAX_DURATION: u64 = 60 * 60 * 8;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug_warn!(
tried = ?*tries,
elapsed = ?time.elapsed(),
"Backing off from {next_id}",
);
continue;
}
}

if events_all.contains(&next_id) {
continue;
}

if self.services.timeline.pdu_exists(&next_id).await {
trace!("Found {next_id} in db");
continue;
}

debug!("Fetching {next_id} over federation from {origin}.");
match self
debug_info!(
backfilled=%backfilled_events.len(),
%loop_count,
"Asking {via} for up to {limit} missing events",
);
trace!(
?latest_events,
?tail,
%via,
%limit,
"Requesting missing events"
);
let response: get_missing_events::v1::Response = self
.services
.sending
.send_federation_request(
origin,
get_event::v1::Request::new((*next_id).to_owned()),
)
.await
{
| Ok(res) => {
debug!("Got {next_id} over federation from {origin}");
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
back_off((*next_id).to_owned());
continue;
};
.send_federation_request(&via, request)
.await?;
loop_count = loop_count.saturating_add(1);

let Ok((calculated_event_id, value)) =
gen_event_id_canonical_json(&res.pdu, &room_version_rules)
else {
back_off((*next_id).to_owned());
continue;
};

if calculated_event_id != *next_id {
warn!(
"Server didn't return event id we requested: requested: {next_id}, \
we got {calculated_event_id}. Event: {:?}",
&res.pdu
);
}

if let Some(auth_events) = value
.get("auth_events")
.and_then(CanonicalJsonValue::as_array)
// Some buggy servers (including old continuwuity) may return the same events
// multiple times, which can cause this to be an infinite loop.
// In order to break this loop, if we see no new events from this response (i.e.
// all events in the response are already in backfilled_events), we stop,
// with a warning.
let mut unseen: usize = 0;
let chunk_len = response.events.len();
if response.events.is_empty() {
debug_info!("No more missing events found");
break;
}
for event in response.events {
trace!("Parsing incoming event from backfill");
let (incoming_room_id, event_id, pdu_json) =
self.parse_incoming_pdu(&event).await.map_err(|e| {
err!(BadServerResponse("{via} returned an invalid event: {e:?}"))
})?;
trace!(%incoming_room_id, %event_id, "Parsed incoming event from backfill");
if incoming_room_id != room_id {
return Err!(BadServerResponse(
"{via} returned {event_id} in missing events which belongs to \
{incoming_room_id}, not {room_id}"
));
}
latest_events.remove(&event_id);
if head.contains(&event_id) || tail.contains(&event_id) {
debug!("Skipping known event {event_id}");
continue;
}
if backfilled_events.contains_key(&event_id) {
debug_warn!(%via, %event_id, "Remote retransmitted event");
continue;
}
// TODO: Should this be scoped to the GME session? We might end up incorrectly
// assuming we've caught up if we do this
if let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await {
debug!(%via, %event_id, "Already seen event in database");
backfilled_events.insert(event_id.clone(), pdu);
} else {
unseen = unseen.saturating_add(1);
}
let parsed = PduEvent::from_id_val(&event_id, pdu_json)
.map_err(|e| err!(BadServerResponse("Unable to parse {event_id}: {e}")))?;
for prev_event_id in parsed.prev_events() {
// Verify that we have all of this event's prev_events. If we don't, add it to
// the work queue
if !(backfilled_events.contains_key(prev_event_id)
|| self.services.timeline.pdu_exists(prev_event_id).await)
{
for auth_event in auth_events {
match serde_json::from_value::<OwnedEventId>(
auth_event.clone().into(),
) {
| Ok(auth_event) => {
trace!(
"Found auth event id {auth_event} for event {next_id}"
);
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
latest_events.insert(prev_event_id.to_owned());
break;
}

events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
},
| Err(e) => {
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
back_off((*next_id).to_owned());
},
continue;
}
backfilled_events.insert(event_id.clone(), parsed);
}
for event_id in todo {
latest_events.remove(&event_id);
if let hash_map::Entry::Vacant(e) = backfilled_events.entry(event_id.clone()) {
let evt = self.services.timeline.get_pdu(&event_id).await?;
e.insert(evt);
}
}
debug!(
count=%chunk_len,
new=%unseen,
remaining=%latest_events.len(),
"Got missing events"
);
if unseen == 0 {
debug_warn!("Didn't see any new events, breaking cycle");
break;
} else if chunk_len < usize::try_from(limit)? {
debug!(
"Got less than the limit number of events, assuming there's no more to fetch"
);
break;
}
}

events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
debug_info!("Successfully fetched {} missing events from {via}", backfilled_events.len());
trace!("Missing_events: {backfilled_events:?}");
Ok(backfilled_events)
}
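
A quick illustration of the request-limit schedule this loop uses: `loop_count` starts at 3 and each request's limit is its square, capped at 100.

```rust
fn main() {
	let mut loop_count: u64 = 3;
	for _ in 0..7 {
		// Same expression as in backfill_missing_events above.
		let limit = loop_count.saturating_pow(2).min(100);
		print!("{limit} "); // prints: 9 16 25 36 49 64 81
		loop_count = loop_count.saturating_add(1);
	}
	println!();
}
```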

let mut pdus = Vec::with_capacity(events_with_auth_events.len());
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Some(local_pdu) = local_pdu {
trace!("Found {id} in main timeline or outlier tree");
pdus.push((local_pdu.clone(), None));
}
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
///
/// Returns pdu and if we fetched it over federation the raw json.
///
/// a. Look in the main timeline (pduid_pdu tree)
/// b. Look at outlier pdu tree
/// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation?
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
&self,
origin: &'a ServerName,
events: Events,
create_event: &'a Pdu,
room_id: &'a RoomId,
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let back_off = |id| match self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(id)
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
},
};

for (next_id, value) in events_in_reverse_order.into_iter().rev() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug!("Backing off from {next_id}");
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
trace!("Fetching {} outlier pdus", events.clone().count());

for id in events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
trace!("Found {id} in main timeline or outlier tree");
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
continue;
}

// c. Ask origin server over federation
// We also handle its auth chain here so we don't get a stack overflow in
// handle_outlier_pdu.
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());

let mut events_all = HashSet::with_capacity(todo_auth_events.len());
while let Some(next_id) = todo_auth_events.pop_front() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 60 * 2;
const MAX_DURATION: u64 = 60 * 60 * 8;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug_warn!(
tried = ?*tries,
elapsed = ?time.elapsed(),
"Backing off from {next_id}",
);
continue;
}
}

if events_all.contains(&next_id) {
continue;
}

if self.services.timeline.pdu_exists(&next_id).await {
trace!("Found {next_id} in db");
continue;
}

debug!("Fetching {next_id} over federation from {origin}.");
match self
.services
.sending
.send_federation_request(
origin,
get_event::v1::Request::new((*next_id).to_owned()),
)
.await
{
| Ok(res) => {
debug!("Got {next_id} over federation from {origin}");
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
back_off((*next_id).to_owned());
continue;
};

let Ok((calculated_event_id, value)) =
gen_event_id_canonical_json(&res.pdu, &room_version_rules)
else {
back_off((*next_id).to_owned());
continue;
};

if calculated_event_id != *next_id {
warn!(
"Server didn't return event id we requested: requested: \
{next_id}, we got {calculated_event_id}. Event: {:?}",
&res.pdu
);
}

if let Some(auth_events) = value
.get("auth_events")
.and_then(CanonicalJsonValue::as_array)
{
for auth_event in auth_events {
match serde_json::from_value::<OwnedEventId>(
auth_event.clone().into(),
) {
| Ok(auth_event) => {
trace!(
"Found auth event id {auth_event} for event \
{next_id}"
);
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
}

events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
},
| Err(e) => {
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
back_off((*next_id).to_owned());
},
}
}

trace!("Handling outlier {next_id}");
match Box::pin(self.handle_outlier_pdu(
origin,
create_event,
&next_id,
room_id,
value.clone(),
true,
))
.await
{
| Ok((pdu, json)) =>
if next_id == *id {
trace!("Handled outlier {next_id} (original request)");
pdus.push((pdu, Some(json)));
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
}

let mut pdus = Vec::with_capacity(events_with_auth_events.len());
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Some(local_pdu) = local_pdu {
trace!("Found {id} in main timeline or outlier tree");
pdus.push((local_pdu.clone(), None));
}

for (next_id, value) in events_in_reverse_order.into_iter().rev() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug!("Backing off from {next_id}");
continue;
}
}

trace!("Handling outlier {next_id}");
match Box::pin(self.handle_outlier_pdu(
origin,
create_event,
&next_id,
room_id,
value.clone(),
true,
))
.await
{
| Ok((pdu, json)) =>
if next_id == *id {
trace!("Handled outlier {next_id} (original request)");
pdus.push((pdu, Some(json)));
},
| Err(e) => {
warn!("Authentication of event {next_id} failed: {e:?}");
back_off(next_id);
},
| Err(e) => {
warn!("Authentication of event {next_id} failed: {e:?}");
back_off(next_id);
},
}
}
}
trace!("Fetched and handled {} outlier pdus", pdus.len());
pdus
}
trace!("Fetched and handled {} outlier pdus", pdus.len());
pdus
}
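
The real `continue_exponential_backoff_secs` lives in `conduwuit::utils` and is not part of this diff; as the call sites show, it returns true while an event should still be skipped. A plausible shape for such a predicate, doubling the wait per attempt between the given bounds, is sketched below as an assumption:

```rust
use std::time::Duration;

// Hypothetical sketch: keep backing off while the elapsed time has not yet
// reached min(min_secs * 2^tries, max_secs).
fn continue_exponential_backoff_secs(
	min_secs: u64,
	max_secs: u64,
	elapsed: Duration,
	tries: u32,
) -> bool {
	let wait = min_secs
		.saturating_mul(2u64.saturating_pow(tries))
		.min(max_secs);
	elapsed < Duration::from_secs(wait)
}

fn main() {
	// Third retry with the 5-minute / 24-hour bounds used above:
	// 5 min * 2^3 = 40 min, so 10 minutes elapsed means keep waiting.
	let waiting =
		continue_exponential_backoff_secs(5 * 60, 60 * 60 * 24, Duration::from_secs(600), 3);
	assert!(waiting);
}
```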

@@ -1,128 +1,118 @@
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
iter::once,
};
use std::collections::{HashMap, HashSet, VecDeque};

use conduwuit::{
Event, PduEvent, Result, debug_warn, err, implement,
state_res::{self},
};
use futures::{FutureExt, future};
use ruma::{
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
int, uint,
};
use conduwuit::{Event, PduEvent, debug, debug_info, error, trace};
use ruma::{OwnedEventId, RoomId, ServerName};

use super::check_room_id;
use crate::rooms::event_handler::build_local_dag;

#[implement(super::Service)]
#[tracing::instrument(
level = "debug",
skip_all,
fields(%origin),
)]
#[allow(clippy::type_complexity)]
pub(super) async fn fetch_prev<'a, Pdu, Events>(
&self,
origin: &ServerName,
create_event: &Pdu,
room_id: &RoomId,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
initial_set: Events,
) -> Result<(
Vec<OwnedEventId>,
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let num_ids = initial_set.clone().count();
let mut eventid_info = HashMap::new();
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids);
let mut todo_outlier_stack: VecDeque<OwnedEventId> =
initial_set.map(ToOwned::to_owned).collect();
impl super::Service {
pub(super) async fn fetch_prevs(
&self,
room_id: &RoomId,
create_event: &PduEvent,
incoming_pdu: &PduEvent,
origin: &ServerName,
) -> conduwuit::Result<()> {
let mut queue: VecDeque<OwnedEventId> = VecDeque::new();
queue.push_back(incoming_pdu.event_id().to_owned());

let mut amount = 0;

while let Some(prev_event_id) = todo_outlier_stack.pop_front() {
self.services.server.check_running()?;

match self
.fetch_and_handle_outliers(
origin,
once(prev_event_id.as_ref()),
create_event,
room_id,
)
.boxed()
.await
.pop()
{
| Some((pdu, mut json_opt)) => {
check_room_id(room_id, &pdu)?;

let limit = self.services.server.config.max_fetch_prev_events;
if amount > limit {
debug_warn!("Max prev event limit reached! Limit: {limit}");
graph.insert(prev_event_id.clone(), HashSet::new());
continue;
}

if json_opt.is_none() {
json_opt = self
.services
.outlier
.get_outlier_pdu_json(&prev_event_id)
.await
.ok();
}

if let Some(json) = json_opt {
if pdu.origin_server_ts() > first_ts_in_room {
amount = amount.saturating_add(1);
for prev_prev in pdu.prev_events() {
if !graph.contains_key(prev_prev) {
todo_outlier_stack.push_back(prev_prev.to_owned());
}
}

graph.insert(
prev_event_id.clone(),
pdu.prev_events().map(ToOwned::to_owned).collect(),
);
} else {
// Time based check failed
graph.insert(prev_event_id.clone(), HashSet::new());
}

eventid_info.insert(prev_event_id.clone(), (pdu, json));
while let Some(event_id) = queue.pop_front() {
debug!(event_id=%incoming_pdu.event_id, "Fetching any missing prev_events");
let mut missing_prev_events: HashSet<OwnedEventId> =
incoming_pdu.prev_events().map(ToOwned::to_owned).collect();
for pid in missing_prev_events.clone() {
if self.services.timeline.pdu_exists(&pid).await {
trace!("Found prev event {pid} for outlier event {event_id} locally");
missing_prev_events.remove(&pid);
} else {
// Get json failed, so this was not fetched over federation
graph.insert(prev_event_id.clone(), HashSet::new());
debug_info!(
"Could not find prev event {pid} for outlier event {event_id} locally, \
will fetch over federation"
);
}
},
| _ => {
// Fetch and handle failed
graph.insert(prev_event_id.clone(), HashSet::new());
},
}
if !missing_prev_events.is_empty() {
debug_info!(
"Fetching {} missing prev events for outlier event {event_id}",
missing_prev_events.len()
);
let backfilled = self
.backfill_missing_events(
room_id.to_owned(),
vec![event_id.clone()],
origin.to_owned(),
)
.await?;
debug_info!("Fetched {} missing events for {event_id}", backfilled.len());
let mapped = backfilled
.iter()
.map(|(eid, evt)| {
let mut obj = evt.to_canonical_object();
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
(eid.clone(), obj)
})
.collect::<HashMap<_, _>>();
let local_dag = if mapped.len() == 1 {
mapped.keys().map(ToOwned::to_owned).collect()
} else {
build_local_dag(&mapped).await?
};
debug_info!("Preparing to handle {} missing events", backfilled.len());
for prev_event_id in local_dag {
let obj = mapped
.get(&prev_event_id)
.expect("We should have this event in memory");
debug_info!("Handling prev event {prev_event_id}");
match self
.handle_outlier_pdu(
origin,
create_event,
&prev_event_id,
room_id,
obj.clone(),
false,
)
.await
{
| Ok(_) => {
debug!("Successfully handled {prev_event_id} as an outlier");
missing_prev_events.remove(&prev_event_id);
},
| Err(e) =>
error!(error=?e, %prev_event_id, %event_id, "Failed to handle prev event"),
}
debug_info!("Finished handling prev");
}
}
let outlier = self.services.timeline.get_pdu(&event_id).await;
if missing_prev_events.is_empty()
&& let Ok(pdu) = outlier
{
// promote any prevs first
for prev_event_id in pdu.prev_events() {
debug_info!("Promoting prev event {prev_event_id} to timeline");
let prev_pdu = self.services.timeline.get_pdu(&event_id).await?;
let val = prev_pdu.to_canonical_object();
self.upgrade_outlier_to_timeline_pdu(
prev_pdu,
val,
create_event,
origin,
room_id,
)
.await?;
debug_info!("Finished prev promoting {prev_event_id} to timeline");
}
debug_info!("Promoting event {event_id} to timeline");
let val = pdu.to_canonical_object();
self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id)
.await?;
debug_info!("Finished promoting {event_id} to timeline");
} else {
debug!(?missing_prev_events, ok=%outlier.is_ok(), "Not promoting {event_id}");
}
}

Ok(())
}

let event_fetch = |event_id| {
let origin_server_ts = eventid_info
.get(&event_id)
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());

// This return value is the key used for sorting events,
// events are then sorted by power level, time,
// and lexically by event_id.
future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
};

let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch)
.await
.map_err(|e| err!(Database(error!("Error sorting prev events: {e}"))))?;

Ok((sorted, eventid_info))
}
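
Schematically, the new `fetch_prevs` replaces the graph-plus-sort machinery of the old `fetch_prev` with a breadth-first walk over `prev_events`, backfilling whatever is not yet known locally. A plain-Rust sketch of that traversal, with string IDs standing in for event IDs and `known` standing in for `pdu_exists`:

```rust
use std::collections::{HashSet, VecDeque};

// Breadth-first walk from a starting event, collecting ancestors we don't
// have yet; these are the candidates for a backfill_missing_events call.
fn missing_ancestry(
	start: &str,
	prevs_of: impl Fn(&str) -> Vec<String>,
	known: impl Fn(&str) -> bool,
) -> Vec<String> {
	let mut queue = VecDeque::from([start.to_owned()]);
	let mut seen = HashSet::new();
	let mut missing = Vec::new();
	while let Some(id) = queue.pop_front() {
		for prev in prevs_of(&id) {
			if !known(&prev) && seen.insert(prev.clone()) {
				missing.push(prev.clone());
				queue.push_back(prev);
			}
		}
	}
	missing
}

fn main() {
	// $c <- $b <- $a, where only $a is known locally.
	let prevs = |id: &str| match id {
		"$c" => vec!["$b".to_owned()],
		"$b" => vec!["$a".to_owned()],
		_ => vec![],
	};
	assert_eq!(missing_ancestry("$c", prevs, |id| id == "$a"), ["$b"]);
}
```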

@@ -1,7 +1,6 @@
use std::collections::{HashMap, hash_map};

use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement};
use futures::FutureExt;
use ruma::{
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
events::StateEventType,
@@ -42,7 +41,6 @@ pub(super) async fn fetch_state<Pdu>(
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
let state_vec = self
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
.boxed()
.await;

let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());

@@ -1,14 +1,11 @@
use std::{
collections::{BTreeMap, hash_map},
time::Instant,
};
use std::{collections::BTreeMap, time::Instant};

use conduwuit::{
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
implement, info, trace, utils::stream::IterStream, warn,
implement, info, trace, warn,
};
use futures::{
FutureExt, TryFutureExt, TryStreamExt,
FutureExt,
future::{OptionFuture, try_join4},
};
use ruma::{
@@ -215,72 +212,6 @@ pub async fn handle_incoming_pdu<'a>(
.get_room_create_event(room_id)
.await;

let (incoming_pdu, val) = self
.handle_outlier_pdu(origin, create_event, event_id, room_id, value, false)
.await?;

// 8. if not timeline event: stop
if !is_timeline_event {
return Ok(None);
}

// Skip old events
let first_ts_in_room = self
.services
.timeline
.first_pdu_in_room(room_id)
.await?
.origin_server_ts();

// 9. Fetch any missing prev events doing all checks listed here starting at 1.
// These are timeline events
let (sorted_prev_events, mut eventid_info) = self
.fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events())
.await?;

debug!(
events = ?sorted_prev_events,
"Handling previous events"
);

sorted_prev_events
.iter()
.try_stream()
.map_ok(AsRef::as_ref)
.try_for_each(|prev_id| {
self.handle_prev_pdu(
origin,
event_id,
room_id,
eventid_info.remove(prev_id),
create_event,
first_ts_in_room,
prev_id,
)
.inspect_err(move |e| {
warn!("Prev {prev_id} failed: {e}");
match self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(prev_id.into())
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
let tries = e.get().1.saturating_add(1);
*e.get_mut() = (Instant::now(), tries);
},
}
})
.map(|_| self.services.server.check_running())
})
.boxed()
.await?;

// Done with prev events, now handling the incoming event
let start_time = Instant::now();
self.federation_handletime
.write()
@@ -292,7 +223,31 @@ pub async fn handle_incoming_pdu<'a>(
.remove(room_id);
}};

let (incoming_pdu, val) = self
.handle_outlier_pdu(origin, create_event, event_id, room_id, value, false)
.await?;

// 8. if not timeline event: stop
if !is_timeline_event {
return Ok(None);
}

// Skip old events
// let first_ts_in_room = self
// .services
// .timeline
// .first_pdu_in_room(room_id)
// .await?
// .origin_server_ts();

// 9. Fetch any missing prev events doing all checks listed here starting at 1.
// These are timeline events
debug!("Handling previous events");

self.fetch_prevs(room_id, create_event, &incoming_pdu, origin)
.await?;

// Done with prev events, now handling the incoming event
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
.boxed()
.await
}

@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap, hash_map};
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};

use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
@@ -10,7 +10,7 @@
events::StateEventType,
};

use super::{check_room_id, get_room_version_rules};
use super::{build_local_dag, check_room_id, get_room_version_rules};
use crate::rooms::timeline::pdu_fits;

#[implement(super::Service)]
@@ -22,7 +22,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
event_id: &'a EventId,
room_id: &'a RoomId,
mut value: CanonicalJsonObject,
auth_events_known: bool,
_auth_events_known: bool,
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
where
Pdu: Event + Send + Sync,
@@ -100,42 +100,71 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
}

// Fetch any missing ones & reject invalid ones
let missing_auth_events = if auth_events_known {
pdu_event
.auth_events()
.filter(|id| !auth_events.contains_key(*id))
.collect::<Vec<_>>()
} else {
pdu_event.auth_events().collect::<Vec<_>>()
};
if !missing_auth_events.is_empty() || !auth_events_known {
let mut missing_auth_events: HashSet<OwnedEventId> = pdu_event
.auth_events()
.filter(|id| !auth_events.contains_key(*id))
.map(ToOwned::to_owned)
.collect();

if !missing_auth_events.is_empty() {
debug_info!(
"Fetching {} missing auth events for outlier event {event_id}",
missing_auth_events.len()
);
for (pdu, _) in self
.fetch_and_handle_outliers(
origin,
missing_auth_events.iter().copied(),
create_event,
room_id,
let backfilled = self
.backfill_missing_events(
room_id.to_owned(),
vec![event_id.to_owned()],
origin.to_owned(),
)
.await
{
auth_events.insert(pdu.event_id().to_owned(), pdu);
.await?;
debug_info!("Fetched {} missing auth events for {event_id}", backfilled.len());
let mapped = backfilled
.iter()
.map(|(eid, evt)| {
let mut obj = evt.to_canonical_object();
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
(eid.clone(), obj)
})
.collect::<HashMap<_, _>>();
let local_dag = if mapped.len() == 1 {
mapped.keys().map(ToOwned::to_owned).collect()
} else {
build_local_dag(&mapped).await?
};
debug_info!("Preparing to handle {} missing auth events", backfilled.len());
for prev_event_id in local_dag {
let obj = mapped
.get(&prev_event_id)
.expect("We should have this event in memory");
debug_info!("Handling prev {prev_event_id}");
let (prev, _) = Box::pin(self.handle_outlier_pdu(
origin,
create_event,
&prev_event_id,
room_id,
obj.clone(),
false,
))
.await?;
if missing_auth_events.contains(&*prev_event_id) {
missing_auth_events.remove(&prev_event_id);
auth_events.insert(prev_event_id, prev);
}
debug_info!("Finished handling prev auth event");
}
} else {
debug!("No missing auth events for outlier event {event_id}");
}
// reject if we are still missing some
let still_missing = pdu_event
.auth_events()
.filter(|id| !auth_events.contains_key(*id))
.collect::<Vec<_>>();
if !still_missing.is_empty() {
// reject if we are still missing some auth events.
// If we're still missing prev events, we will fetch them individually later,
// but there's no reason for us to be missing auth events now we've gapfilled
// the DAG.
if !missing_auth_events.is_empty() {
// Don't reject: this could be a temporary condition
return Err!(Request(InvalidParam(
"Could not fetch all auth events for outlier event {event_id}, still missing: \
{still_missing:?}"
{missing_auth_events:?}"
)));
}

@@ -163,6 +192,10 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
v.insert(auth_event);
},
| hash_map::Entry::Occupied(_) => {
self.services
.outlier
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
self.services.pdu_metadata.mark_event_rejected(event_id);
return Err!(Request(InvalidParam(
"Auth event's type and state_key combination exists multiple times: {}, {}",
auth_event.kind,
@@ -177,6 +210,10 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
auth_events_by_key.get(&(StateEventType::RoomCreate, String::new().into())),
Some(_) | None
) {
self.services.pdu_metadata.mark_event_rejected(event_id);
self.services
.outlier
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
}

@@ -185,6 +222,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
ready(auth_events_by_key.get(&key).map(ToOwned::to_owned))
};

// PDU check: 3
let auth_check = state_res::event_auth::auth_check(
&room_version_rules,
&pdu_event,
@@ -196,7 +234,13 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;

if !auth_check {
return Err!(Request(Forbidden("Auth check failed")));
self.services.pdu_metadata.mark_event_rejected(event_id);
self.services
.outlier
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
return Err!(Request(Forbidden(
"Event authorisation fails based on event's claimed auth events"
)));
}

trace!("Validation successful.");
|
||||
|
||||
@@ -1,89 +0,0 @@
|
||||
use std::{collections::BTreeMap, time::Instant};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer, implement,
|
||||
utils::continue_exponential_backoff_secs,
|
||||
};
|
||||
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
|
||||
use tracing::debug;
|
||||
|
||||
#[implement(super::Service)]
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(
|
||||
name = "prev",
|
||||
level = INFO_SPAN_LEVEL,
|
||||
skip_all,
|
||||
fields(%prev_id),
|
||||
)]
|
||||
pub(super) async fn handle_prev_pdu<'a, Pdu>(
|
||||
&self,
|
||||
origin: &'a ServerName,
|
||||
event_id: &'a EventId,
|
||||
room_id: &'a RoomId,
|
||||
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
|
||||
create_event: &'a Pdu,
|
||||
first_ts_in_room: MilliSecondsSinceUnixEpoch,
|
||||
prev_id: &'a EventId,
|
||||
) -> Result
|
||||
where
|
||||
Pdu: Event + Send + Sync,
|
||||
{
|
||||
// Check for disabled again because it might have changed
|
||||
if self.services.metadata.is_disabled(room_id).await {
|
||||
return Err!(Request(Forbidden(debug_warn!(
|
||||
"Federaton of room {room_id} is currently disabled on this server. Request by \
|
||||
origin {origin} and event ID {event_id}"
|
||||
))));
|
||||
}
|
||||
|
||||
if let Some((time, tries)) = self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.get(prev_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 5 * 60;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 24;
|
||||
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
|
||||
debug!(
|
||||
?tries,
|
||||
duration = ?time.elapsed(),
|
||||
"Backing off from prev_event"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let Some((pdu, json)) = eventid_info else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Skip old events
|
||||
if pdu.origin_server_ts() < first_ts_in_room {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let start_time = Instant::now();
|
||||
self.federation_handletime
|
||||
.write()
|
||||
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
|
||||
|
||||
defer! {{
|
||||
self.federation_handletime
|
||||
.write()
|
||||
.remove(room_id);
|
||||
}};
|
||||
|
||||
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
elapsed = ?start_time.elapsed(),
|
||||
"Handled prev_event",
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
@@ -4,7 +4,6 @@
mod fetch_state;
mod handle_incoming_pdu;
mod handle_outlier_pdu;
mod handle_prev_pdu;
mod parse_incoming_pdu;
mod policy_server;
mod resolve_state;
@@ -15,13 +14,13 @@

use async_trait::async_trait;
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
pub use fetch_and_handle_outliers::build_local_dag;
use ruma::{
	OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
	room_version_rules::RoomVersionRules,
};

use crate::{Dep, globals, rooms, sending, server_keys};

pub struct Service {
	pub mutex_federation: RoomMutexMap,
	pub federation_handletime: SyncRwLock<HandleTimeMap>,

@@ -56,7 +56,10 @@ fn extract_room_id(event_type: &str, pdu: &CanonicalJsonObject) -> Result<OwnedR

/// Parses every entry in an array as an event ID, returning an error if any
/// step fails.
fn expect_event_id_array(value: &CanonicalJsonObject, field: &str) -> Result<Vec<OwnedEventId>> {
pub(super) fn expect_event_id_array(
	value: &CanonicalJsonObject,
	field: &str,
) -> Result<Vec<OwnedEventId>> {
	value
		.get(field)
		.ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))?

@@ -5,7 +5,7 @@
};

use conduwuit::{
	Result, debug, err, implement,
	Result, debug, err, error, implement,
	matrix::{Event, StateMap},
	trace,
	utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
@@ -121,6 +121,7 @@ pub(super) async fn state_at_incoming_resolved<Pdu>(
		.state_resolution(room_version_rules, fork_states.iter(), &auth_chain_sets)
		.boxed()
		.await
		.inspect_err(|e| error!("State resolution failed: {e:?}"))
	else {
		return Ok(None);
	};
@@ -1,14 +1,18 @@
use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant};
use std::{borrow::Borrow, collections::BTreeMap, sync::Arc, time::Instant};

use conduwuit::{
	Err, Result, debug, debug_info, err, implement, info, is_equal_to,
	Err, Result, debug, debug_info, debug_warn, err, implement, is_equal_to,
	matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res},
	trace,
	utils::stream::{BroadbandExt, ReadyExt},
	utils::{
		IterStream,
		stream::{BroadbandExt, ReadyExt},
	},
	warn,
};
use futures::{FutureExt, StreamExt, future::ready};
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
use tokio::join;

use super::get_room_version_rules;
use crate::rooms::{
@@ -35,16 +39,37 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
		.get_pdu_id(incoming_pdu.event_id())
		.await
	{
		trace!(event_id=%incoming_pdu.event_id(), "Skipping upgrade of already upgraded PDU");
		return Ok(Some(pduid));
	}

	if self
		.services
		.pdu_metadata
		.is_event_soft_failed(incoming_pdu.event_id())
		.await
	{
		return Err!(Request(InvalidParam("Event has been soft failed")));
	let (rejected, soft_failed) = join!(
		self.services
			.pdu_metadata
			.is_event_rejected(incoming_pdu.event_id()),
		self.services
			.pdu_metadata
			.is_event_soft_failed(incoming_pdu.event_id())
	);
	if rejected {
		return Err!(Request(InvalidParam("Event has been rejected")));
	} else if soft_failed {
		return Err!(Request(InvalidParam("Event has been soft-failed")));
	}

	// If any of the auth events are rejected, this event is also rejected.
	for aid in incoming_pdu.auth_events() {
		if self.services.pdu_metadata.is_event_rejected(aid).await {
			// TODO: debug_warn instead of warn
			warn!(
				"Rejecting incoming event {} which depends on rejected auth event {aid}",
				incoming_pdu.event_id()
			);
			self.services
				.pdu_metadata
				.mark_event_rejected(incoming_pdu.event_id());
			return Err!(Request(InvalidParam("Event has rejected auth event: {aid}")));
		}
	}

	debug!(
@@ -70,6 +95,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
	};

	if state_at_incoming_event.is_none() {
		trace!("Could not calculate incoming state, asking remote {origin} for it");
		state_at_incoming_event = self
			.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
			.await?;
@@ -95,6 +121,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
		event_id = %incoming_pdu.event_id,
		"Running initial auth check"
	);
	// PDU check: 5
	let auth_check = state_res::event_auth::auth_check(
		&room_version_rules,
		&incoming_pdu,
@@ -106,7 +133,12 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
	.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;

	if !auth_check {
		return Err!(Request(Forbidden("Event has failed auth check with state at the event.")));
		self.services
			.pdu_metadata
			.mark_event_rejected(incoming_pdu.event_id());
		return Err!(Request(Forbidden(
			"Event authorisation fails based on the state before the event"
		)));
	}

	debug!(
@@ -135,6 +167,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
		event_id = %incoming_pdu.event_id,
		"Running auth check with claimed state auth"
	);
	// PDU check: 6
	let auth_check = state_res::event_auth::auth_check(
		&room_version_rules,
		&incoming_pdu,
@@ -144,6 +177,12 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
	)
	.await
	.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
	if !auth_check {
		warn!(
			event_id = %incoming_pdu.event_id,
			"Event authentication fails based on the current state of the room"
		);
	}

	// Soft fail check before doing state res
	debug!(
@@ -153,16 +192,22 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
	let mut soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_rules)) {
		| (false, _) => true,
		| (true, None) => false,
		| (true, Some(redact_id)) =>
			!self
		| (true, Some(redact_id)) => {
			if !self
				.services
				.state_accessor
				.user_can_redact(&redact_id, incoming_pdu.sender(), room_id, true)
				.await?,
				.await?
			{
				warn!(redacts = %redact_id, "User is not allowed to redact event");
				true
			} else {
				false
			}
		},
	};

	// 13. Use state resolution to find new room state

	// We start looking at current room state now, so let's lock the room
	trace!(
		room_id = %room_id,
@@ -170,36 +215,6 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
	);
	let state_lock = self.services.state.mutex.lock(room_id).await;

	// Now we calculate the set of extremities this room has after the incoming
	// event has been applied. We start with the previous extremities (aka leaves)
	trace!("Calculating extremities");
	let mut extremities: Vec<_> = self
		.services
		.state
		.get_forward_extremities(room_id)
		.ready_filter(|event_id| {
			// Remove any that are referenced by this incoming event's prev_events
			!incoming_pdu.prev_events().any(is_equal_to!(event_id))
		})
		.broad_filter_map(|event_id| async move {
			// Only keep those extremities that were not referenced yet
			self.services
				.pdu_metadata
				.is_event_referenced(room_id, &event_id)
				.await
				.eq(&false)
				.then_some(event_id)
		})
		.collect()
		.await;
	extremities.push(incoming_pdu.event_id().to_owned());

	debug!(
		"Retained {} extremities checked against {} prev_events",
		extremities.len(),
		incoming_pdu.prev_events().count()
	);

	let state_ids_compressed: Arc<CompressedState> = self
		.services
		.state_compressor
@@ -294,60 +309,57 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
				.is_event_soft_failed(&redact_id)
				.await
			{
				// TODO: This should avoid pushing the event to the timeline instead of using
				// soft-fails as a hack
				warn!(
					redact_id = %redact_id,
					"Redaction is for a soft-failed event, soft failing the redaction"
					"Redaction is for a soft-failed event"
				);
				soft_fail = true;
			}
		}
	}

	// 14. Check if the event passes auth based on the "current state" of the room,
	// if not soft fail it
	if soft_fail {
		info!(
			event_id = %incoming_pdu.event_id,
			"Soft failing event"
		);
		// assert!(extremities.is_empty(), "soft_fail extremities empty");
		let extremities = extremities.iter().map(Borrow::borrow);
		debug_assert!(extremities.clone().count() > 0, "extremities not empty");

		self.services
			.timeline
			.append_incoming_pdu(
				&incoming_pdu,
				val,
				extremities,
				state_ids_compressed,
				soft_fail,
				&state_lock,
				room_id,
			)
			.await?;

		// Soft fail, we keep the event as an outlier but don't add it to the timeline
		self.services
			.pdu_metadata
			.mark_event_soft_failed(incoming_pdu.event_id());

		warn!(
			event_id = %incoming_pdu.event_id,
			"Event was soft failed"
		);
		return Err!(Request(InvalidParam("Event has been soft failed")));
	}

	// Now that the event has passed all auth it is added into the timeline.
	// We use the `state_at_event` instead of `state_after` so we accurately
	// represent the state for this event.
	trace!("Appending pdu to timeline");
	let extremities = extremities
		.iter()
		.map(Borrow::borrow)
		.chain(once(incoming_pdu.event_id()));
	debug_assert!(extremities.clone().count() > 0, "extremities not empty");
	let mut extremities: Vec<_> = self
		.services
		.state
		.get_forward_extremities(room_id)
		.collect()
		.await;
	if !soft_fail {
		// Per https://spec.matrix.org/unstable/server-server-api/#soft-failure, soft-failed events
		// are not added as forward extremities.

		// Now we calculate the set of extremities this room has after the incoming
		// event has been applied. We start with the previous extremities (aka leaves)
		trace!("Calculating extremities");
		extremities = extremities
			.into_iter()
			.stream()
			.ready_filter(|event_id| {
				// Remove any that are referenced by this incoming event's prev_events
				!incoming_pdu.prev_events().any(is_equal_to!(event_id))
			})
			.broad_filter_map(|event_id| async move {
				// Only keep those extremities that were not referenced yet
				self.services
					.pdu_metadata
					.is_event_referenced(room_id, &event_id)
					.await
					.eq(&false)
					.then_some(event_id)
			})
			.collect::<Vec<_>>()
			.await;
		extremities.push(incoming_pdu.event_id().to_owned());
		debug!(
			"Retained {} extremities checked against {} prev_events",
			extremities.len(),
			incoming_pdu.prev_events().count()
		);
		assert!(!extremities.is_empty(), "extremities must not be empty");
	}

	let pdu_id = self
		.services
@@ -355,20 +367,30 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
		.append_incoming_pdu(
			&incoming_pdu,
			val,
			extremities,
			extremities.iter().map(Borrow::borrow),
			state_ids_compressed,
			soft_fail,
			&state_lock,
			room_id,
		)
		.await?;
	if soft_fail {
		self.services
			.pdu_metadata
			.mark_event_soft_failed(incoming_pdu.event_id());
		debug_warn!(
			elapsed = ?timer.elapsed(),
			"Event has been soft-failed",
		);
	} else {
		debug_info!(
			elapsed = ?timer.elapsed(),
			"Accepted",
		);
	}

	// Event has passed all auth/stateres checks
	drop(state_lock);
	debug_info!(
		elapsed = ?timer.elapsed(),
		"Accepted",
	);

	Ok(pdu_id)
}
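
// Illustrative sketch (simplified, hypothetical identifiers — not from this
// changeset) of the forward-extremity rule implemented above: an accepted
// event replaces the prev_events it references as room leaves, while a
// soft-failed event is stored but never becomes a leaf, per the soft-failure
// section of the server-server spec.
fn next_forward_extremities(
	current: Vec<String>,
	prev_events: &[String],
	new_event: String,
	soft_fail: bool,
) -> Vec<String> {
	if soft_fail {
		// Soft-failed events are not added as forward extremities.
		return current;
	}
	let mut next: Vec<String> = current
		.into_iter()
		.filter(|leaf| !prev_events.contains(leaf)) // referenced leaves stop being leaves
		.collect();
	next.push(new_event);
	next
}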
@@ -0,0 +1,939 @@
use std::{collections::HashMap, sync::Arc};

use conduwuit::{
	Err, Pdu, Result, Server, debug, debug_info, debug_warn, err, error, info, is_true,
	matrix::{
		StateKey,
		event::{gen_event_id, gen_event_id_canonical_json},
	},
	pdu::PartialPdu,
	state_res, trace,
	utils::{self, IterStream, ReadyExt, to_canonical_object},
	warn,
};
use database::Database;
use futures::{FutureExt, StreamExt, TryFutureExt, join};
use ruma::{
	CanonicalJsonObject, CanonicalJsonValue, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId,
	RoomVersionId, UserId,
	api::{
		error::{ErrorKind, IncompatibleRoomVersionErrorData},
		federation,
	},
	canonical_json::to_canonical_value,
	events::{
		StateEventType, StaticEventContent,
		room::{
			join_rules::RoomJoinRulesEventContent,
			member::{MembershipState, RoomMemberEventContent},
		},
	},
	room::{AllowRule, JoinRule},
};

use crate::{
	Dep, antispam, globals,
	rooms::{
		metadata, outlier, pdu_metadata, short,
		state::{self, RoomMutexGuard},
		state_accessor, state_cache,
		state_compressor::{self, CompressedState, HashSetCompressStateEvent},
		timeline::{self, pdu_fits},
	},
	sending, server_keys, users,
};

pub struct Service {
	services: Services,
}

struct Services {
	server: Arc<Server>,
	db: Arc<Database>,
	antispam: Dep<antispam::Service>,
	globals: Dep<globals::Service>,
	metadata: Dep<metadata::Service>,
	outlier: Dep<outlier::Service>,
	pdu_metadata: Dep<pdu_metadata::Service>,
	sending: Dep<sending::Service>,
	server_keys: Dep<server_keys::Service>,
	short: Dep<short::Service>,
	state: Dep<state::Service>,
	state_accessor: Dep<state_accessor::Service>,
	state_cache: Dep<state_cache::Service>,
	state_compressor: Dep<state_compressor::Service>,
	timeline: Dep<timeline::Service>,
	users: Dep<users::Service>,
}

impl crate::Service for Service {
	fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
		Ok(Arc::new(Self {
			services: Services {
				server: args.server.clone(),
				db: args.db.clone(),
				antispam: args.depend::<antispam::Service>("antispam"),
				globals: args.depend::<globals::Service>("globals"),
				metadata: args.depend::<metadata::Service>("metadata"),
				outlier: args.depend::<outlier::Service>("rooms::outlier"),
				pdu_metadata: args.depend::<pdu_metadata::Service>("rooms::pdu_metadata"),
				sending: args.depend::<sending::Service>("sending"),
				server_keys: args.depend::<server_keys::Service>("server_keys"),
				short: args.depend::<short::Service>("rooms::short"),
				state: args.depend::<state::Service>("rooms::state"),
				state_accessor: args.depend::<state_accessor::Service>("rooms::state_accessor"),
				state_cache: args.depend::<state_cache::Service>("rooms::state_cache"),
				state_compressor: args
					.depend::<state_compressor::Service>("rooms::state_compressor"),
				timeline: args.depend::<timeline::Service>("rooms::timeline"),
				users: args.depend::<users::Service>("users"),
			},
		}))
	}

	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}

impl Service {
	/// Join a local user to a room.
	pub async fn join_room(
		&self,
		sender_user: &UserId,
		room_id: &RoomId,
		reason: Option<String>,
		servers: &[OwnedServerName],
	) -> Result<OwnedRoomId> {
		assert!(self.services.globals.user_is_local(sender_user), "user should be local");

		let state_lock = self.services.state.mutex.lock(room_id).await;

		if self
			.services
			.state_cache
			.is_joined(sender_user, room_id)
			.await
		{
			debug_warn!("{sender_user} is already joined in {room_id}");
			return Ok(room_id.to_owned());
		}

		if let Err(e) = self
			.services
			.antispam
			.user_may_join_room(
				sender_user.to_owned(),
				room_id.to_owned(),
				self.services
					.state_cache
					.is_invited(sender_user, room_id)
					.await,
			)
			.await
		{
			warn!("Antispam prevented user {} from joining room {}: {}", sender_user, room_id, e);
			return Err!(Request(Forbidden("You are not allowed to join this room.")));
		}

		let server_in_room = self
			.services
			.state_cache
			.server_in_room(self.services.globals.server_name(), room_id)
			.await;

		// Only check our known membership if we're already in the room.
		// See: https://forgejo.ellis.link/continuwuation/continuwuity/issues/855
		let membership = if server_in_room {
			self.services
				.state_accessor
				.get_member(room_id, sender_user)
				.await
		} else {
			debug!("Ignoring local state for join {room_id}, we aren't in the room yet.");
			Ok(RoomMemberEventContent::new(MembershipState::Leave))
		};

		if let Ok(m) = membership {
			if m.membership == MembershipState::Ban {
				debug_warn!("{sender_user} is banned from {room_id} but attempted to join");
				// TODO: return reason
				return Err!(Request(Forbidden("You are banned from the room.")));
			}
		}

		if !server_in_room && servers.is_empty() {
			return Err!(Request(NotFound(
				"No servers were provided to assist in joining the room remotely, and we are \
				 not already participating in the room."
			)));
		}

		if self.services.antispam.check_all_joins() {
			if let Err(e) = self
				.services
				.antispam
				.meowlnir_accept_make_join(room_id.to_owned(), sender_user.to_owned())
				.await
			{
				warn!(
					"Antispam prevented user {} from joining room {}: {}",
					sender_user, room_id, e
				);
				return Err!(Request(Forbidden("Antispam rejected join request.")));
			}
		}

		if server_in_room {
			self.join_local_room(sender_user, room_id, reason, servers, state_lock)
				.boxed()
				.await?;
		} else {
			// Ask a remote server if we are not participating in this room
			self.join_remote_room(sender_user, room_id, reason, servers, state_lock)
				.boxed()
				.await?;
		}

		Ok(room_id.to_owned())
	}

	#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_local", level = "info")]
	async fn join_local_room(
		&self,
		sender_user: &UserId,
		room_id: &RoomId,
		reason: Option<String>,
		servers: &[OwnedServerName],
		state_lock: RoomMutexGuard,
	) -> Result {
		info!("Joining room locally");

		let (room_version, join_rules, is_invited) = join!(
			self.services.state.get_room_version(room_id),
			self.services.state_accessor.get_join_rules(room_id),
			self.services.state_cache.is_invited(sender_user, room_id)
		);

		let room_version = room_version?;
		let room_version_rules = room_version.rules().unwrap();

		let mut auth_user: Option<OwnedUserId> = None;
		if !is_invited
			&& matches!(join_rules, JoinRule::Restricted(_) | JoinRule::KnockRestricted(_))
		{
			if room_version_rules.authorization.restricted_join_rule {
				// This is a restricted room, check if we can complete the join requirements
				// locally.
				let needs_auth_user = self
					.user_can_perform_restricted_join(sender_user, room_id)
					.await;
				if needs_auth_user.is_ok_and(is_true!()) {
					// If there was an error or the value is false, we'll try joining over
					// federation. Since it's Ok(true), we can authorise this locally.
					// If we can't select a local user, this will remain None, the join will fail,
					// and we'll fall back to federation.
					auth_user = self
						.select_authorising_user(room_id, sender_user, &state_lock)
						.await
						.ok();
				}
			}
		}

		let mut content = RoomMemberEventContent::new(MembershipState::Join);
		content.displayname = self.services.users.displayname(sender_user).await.ok();
		content.avatar_url = self.services.users.avatar_url(sender_user).await.ok();
		content.blurhash = self.services.users.blurhash(sender_user).await.ok();
		content.reason.clone_from(&reason);
		content.join_authorized_via_users_server = auth_user;

		// Try normal join first
		let Err(error) = self
			.services
			.timeline
			.build_and_append_pdu(
				PartialPdu::state(sender_user.to_string(), &content),
				sender_user,
				Some(room_id),
				&state_lock,
			)
			.await
		else {
			info!("Joined room locally");
			return Ok(());
		};

		if servers.is_empty()
			|| servers.len() == 1 && self.services.globals.server_is_ours(&servers[0])
		{
			if !self.services.metadata.exists(room_id).await {
				return Err!(Request(
					Unknown(
						"Room was not found locally and no servers were found to help us \
						 discover it"
					),
					NOT_FOUND
				));
			}

			return Err(error);
		}

		info!(
			?error,
			remote_servers = %servers.len(),
			"Could not join room locally, attempting remote join",
		);
		self.join_remote_room(sender_user, room_id, reason, servers, state_lock)
			.await
	}

	#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote_room", level = "info")]
	pub async fn join_remote_room(
		&self,
		sender_user: &UserId,
		room_id: &RoomId,
		reason: Option<String>,
		servers: &[OwnedServerName],
		state_lock: RoomMutexGuard,
	) -> Result {
		// public so the admin command force-join-room-remotely works
		info!("Joining {room_id} over federation.");

		let (make_join_response, remote_server) = self
			.make_join_request(sender_user, room_id, servers)
			.await?;

		info!("make_join finished");

		let room_version = make_join_response.room_version.unwrap_or(RoomVersionId::V1);
		let room_version_rules = room_version
			.rules()
			.expect("room version should have defined rules");

		if !self.services.server.supported_room_version(&room_version) {
			// How did we get here?
			return Err!(BadServerResponse(
				"Remote room version {room_version} is not supported"
			));
		}

		let mut join_event_stub: CanonicalJsonObject =
			serde_json::from_str(make_join_response.event.get()).map_err(|e| {
				err!(BadServerResponse(warn!(
					"Invalid make_join event json received from server: {e:?}"
				)))
			})?;

		let join_authorized_via_users_server = {
			if room_version_rules
				.signatures
				.check_join_authorised_via_users_server
			{
				join_event_stub
					.get("content")
					.map(|s| {
						s.as_object()?
							.get("join_authorised_via_users_server")?
							.as_str()
					})
					.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok())
			} else {
				None
			}
		};

		join_event_stub.insert(
			"origin_server_ts".to_owned(),
			CanonicalJsonValue::Integer(
				utils::millis_since_unix_epoch()
					.try_into()
					.expect("Timestamp is valid js_int value"),
			),
		);

		let mut join_content = RoomMemberEventContent::new(MembershipState::Join);
		join_content.displayname = self.services.users.displayname(sender_user).await.ok();
		join_content.avatar_url = self.services.users.avatar_url(sender_user).await.ok();
		join_content.blurhash = self.services.users.blurhash(sender_user).await.ok();
		join_content.reason = reason;
		join_content
			.join_authorized_via_users_server
			.clone_from(&join_authorized_via_users_server);

		join_event_stub.insert(
			"content".to_owned(),
			to_canonical_value(join_content).expect("event is valid, we just created it"),
		);

		// Remove event id if it exists
		join_event_stub.remove("event_id");

		// In order to create a compatible ref hash (EventID) the `hashes` field needs
		// to be present
		self.services
			.server_keys
			.hash_and_sign_event(&mut join_event_stub, &room_version_rules)?;

		// Generate event id
		let event_id = gen_event_id(&join_event_stub, &room_version_rules)?;

		// Add event_id back
		join_event_stub
			.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into()));

		// It has enough fields to be called a proper event now
		let mut join_event = join_event_stub;

		info!("Asking {remote_server} for send_join in room {room_id}");
		let send_join_request = federation::membership::create_join_event::v2::Request::new(
			room_id.to_owned(),
			event_id.clone(),
			self.services
				.sending
				.convert_to_outgoing_federation_event(join_event.clone())
				.await,
		);

		let send_join_response = match self
			.services
			.sending
			.send_synapse_request(&remote_server, send_join_request)
			.await
		{
			| Ok(response) => response,
			| Err(e) => {
				error!("send_join failed: {e}");
				return Err(e);
			},
		};

		info!("send_join finished");

		if join_authorized_via_users_server.is_some() {
			if let Some(signed_raw) = &send_join_response.room_state.event {
				debug_info!(
					"There is a signed event with join_authorized_via_users_server. This room \
					 is probably using restricted joins. Adding signature to our event"
				);

				let (signed_event_id, signed_value) =
					gen_event_id_canonical_json(signed_raw, &room_version_rules).map_err(
						|e| {
							err!(Request(BadJson(warn!(
								"Could not convert event to canonical JSON: {e}"
							))))
						},
					)?;

				if signed_event_id != event_id {
					return Err!(Request(BadJson(warn!(
						%signed_event_id, %event_id,
						"Server {remote_server} sent event with wrong event ID"
					))));
				}

				match signed_value["signatures"]
					.as_object()
					.ok_or_else(|| {
						err!(BadServerResponse(warn!(
							"Server {remote_server} sent invalid signatures type"
						)))
					})
					.and_then(|e| {
						e.get(remote_server.as_str()).ok_or_else(|| {
							err!(BadServerResponse(warn!(
								"Server {remote_server} did not send its signature for a \
								 restricted room"
							)))
						})
					}) {
					| Ok(signature) => {
						join_event
							.get_mut("signatures")
							.expect("we created a valid pdu")
							.as_object_mut()
							.expect("we created a valid pdu")
							.insert(remote_server.to_string(), signature.clone());
					},
					| Err(e) => {
						warn!(
							"Server {remote_server} sent invalid signature in send_join \
							 signatures for event {signed_value:?}: {e:?}",
						);
					},
				}
			}
		}

		self.services.short.get_or_create_shortroomid(room_id).await;

		info!("Parsing join event");
		let parsed_join_pdu = Pdu::from_id_val(&event_id, join_event.clone())
			.map_err(|e| err!(BadServerResponse("Invalid join event PDU: {e:?}")))?;

		info!("Acquiring server signing keys for response events");
		let resp_events = &send_join_response.room_state;
		let resp_state = &resp_events.state;
		let resp_auth = &resp_events.auth_chain;
		self.services
			.server_keys
			.acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter()))
			.await;

		info!("Going through send_join response room_state");
		let cork = self.services.db.cork_and_flush();
		let state = send_join_response
			.room_state
			.state
			.iter()
			.stream()
			.then(|pdu| {
				self.services
					.server_keys
					.validate_and_add_event_id_no_fetch(pdu, &room_version_rules)
					.inspect_err(|e| {
						debug_warn!(
							"Could not validate send_join response room_state event: {e:?}"
						);
					})
					.inspect(|_| {
						debug!("Completed validating send_join response room_state event");
					})
			})
			.ready_filter_map(Result::ok)
			.fold(HashMap::new(), |mut state, (event_id, value)| async move {
				let pdu = match Pdu::from_id_val(&event_id, value.clone()) {
					| Ok(pdu) => pdu,
					| Err(e) => {
						debug_warn!("Invalid PDU in send_join response: {e:?}: {value:#?}");
						return state;
					},
				};
				if !pdu_fits(&mut value.clone()) {
					warn!(
						"dropping incoming PDU {event_id} in room {room_id} from room join \
						 because it exceeds 65535 bytes or is otherwise too large."
					);
					return state;
				}
				self.services.outlier.add_pdu_outlier(&event_id, &value);
				self.services.pdu_metadata.clear_pdu_markers(&event_id);
				if let Some(state_key) = &pdu.state_key {
					let shortstatekey = self
						.services
						.short
						.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
						.await;

					state.insert(shortstatekey, pdu.event_id.clone());
				}
				state
			})
			.await;

		drop(cork);

		info!("Going through send_join response auth_chain");
		let cork = self.services.db.cork_and_flush();
		send_join_response
			.room_state
			.auth_chain
			.iter()
			.stream()
			.then(|pdu| {
				self.services
					.server_keys
					.validate_and_add_event_id_no_fetch(pdu, &room_version_rules)
			})
			.ready_filter_map(Result::ok)
			.ready_for_each(|(event_id, value)| {
				trace!(%event_id, "Adding PDU as an outlier from send_join auth_chain");
				self.services.outlier.add_pdu_outlier(&event_id, &value);
				self.services.pdu_metadata.clear_pdu_markers(&event_id);
			})
			.await;

		drop(cork);

		debug!("Running send_join auth check");
		let fetch_state = &state;
		let state_fetch = |k: StateEventType, s: StateKey| async move {
			let shortstatekey = self.services.short.get_shortstatekey(&k, &s).await.ok()?;

			let event_id = fetch_state.get(&shortstatekey)?;
			self.services.timeline.get_pdu(event_id).await.ok()
		};

		let auth_check = state_res::event_auth::auth_check(
			&room_version.rules().unwrap(),
			&parsed_join_pdu,
			None, // TODO: third party invite
			|k, s| state_fetch(k.clone(), s.into()),
			&state_fetch(StateEventType::RoomCreate, "".into())
				.await
				.expect("create event is missing from send_join auth"),
		)
		.await
		.map_err(|e| err!(Request(Forbidden(warn!("Auth check failed: {e:?}")))))?;

		if !auth_check {
			return Err!(Request(Forbidden("Auth check failed")));
		}

		info!("Compressing state from send_join");
		let compressed: CompressedState = self
			.services
			.state_compressor
			.compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.as_ref())))
			.collect()
			.await;

		debug!("Saving compressed state");
		let HashSetCompressStateEvent {
			shortstatehash: statehash_before_join,
			added,
			removed,
		} = self
			.services
			.state_compressor
			.save_state(room_id, Arc::new(compressed))
			.await?;

		debug!("Forcing state for new room");
		self.services
			.state
			.force_state(room_id, statehash_before_join, added, removed, &state_lock)
			.await?;

		debug!("Updating joined counts for new room");
		self.services.state_cache.update_joined_count(room_id).await;

		// We append to state before appending the pdu, so we don't have a moment in
		// time with the pdu without its state. This is okay because append_pdu can't
		// fail.
		let statehash_after_join = self
			.services
			.state
			.append_to_state(&parsed_join_pdu, room_id)
			.await?;

		info!("Appending new room join event");
		self.services
			.timeline
			.append_pdu(
				&parsed_join_pdu,
				join_event,
				std::iter::once(parsed_join_pdu.event_id.as_ref()),
				&state_lock,
				room_id,
			)
			.await?;

		info!("Setting final room state for new room");
		// We set the room state after inserting the pdu, so that we never have a moment
		// in time where events in the current room state do not exist
		self.services
			.state
			.set_room_state(room_id, statehash_after_join, &state_lock);

		Ok(())
	}
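
	// The remote join flow above, in brief (descriptive summary only):
	//  1. make_join: ask each candidate server for a join event template.
	//  2. Fill in the content, hash and sign the stub, and derive its event ID.
	//  3. send_join: submit the signed event; the resident server replies with
	//     the room state and auth chain.
	//  4. Validate and store every returned event as an outlier, clearing any
	//     stale soft-fail/rejection markers.
	//  5. Auth-check our own join event against that state, compress and force
	//     the room state, then append the join event to the timeline.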

	async fn make_join_request(
		&self,
		sender_user: &UserId,
		room_id: &RoomId,
		servers: &[OwnedServerName],
	) -> Result<(federation::membership::prepare_join_event::v1::Response, OwnedServerName)> {
		let mut make_join_counter: usize = 1;

		for remote_server in servers {
			if self.services.globals.server_is_ours(remote_server) {
				continue;
			}
			info!(
				"Asking {remote_server} for make_join (attempt {make_join_counter}/{})",
				servers.len()
			);

			let mut request = federation::membership::prepare_join_event::v1::Request::new(
				room_id.to_owned(),
				sender_user.to_owned(),
			);
			request.ver = self.services.server.supported_room_versions().collect();

			let make_join_response = self
				.services
				.sending
				.send_federation_request(remote_server, request)
				.await;

			trace!("make_join response: {:?}", make_join_response);
			make_join_counter = make_join_counter.saturating_add(1);

			match make_join_response {
				| Ok(response) => {
					info!("Received make_join response from {remote_server}");
					if let Err(e) = validate_remote_member_event_stub(
						&MembershipState::Join,
						sender_user,
						room_id,
						&to_canonical_object(&response.event)?,
					) {
						warn!("make_join response from {remote_server} failed validation: {e}");
						continue;
					}
					return Ok((response, remote_server.clone()));
				},
				| Err(e) => match e.kind() {
					| ErrorKind::UnableToAuthorizeJoin => {
						info!(
							"{remote_server} was unable to verify the joining user satisfied \
							 restricted join requirements: {e}. Will continue trying."
						);
					},
					| ErrorKind::UnableToGrantJoin => {
						info!(
							"{remote_server} believes the joining user satisfies restricted \
							 join rules, but is unable to authorise a join for us. Will \
							 continue trying."
						);
					},
					| ErrorKind::IncompatibleRoomVersion(IncompatibleRoomVersionErrorData {
						room_version,
						..
					}) => {
						warn!(
							"{remote_server} reports the room we are trying to join is \
							 v{room_version}, which we do not support."
						);
						return Err(e);
					},
					| ErrorKind::Forbidden => {
						warn!("{remote_server} refuses to let us join: {e}.");
						return Err(e);
					},
					| ErrorKind::NotFound => {
						info!(
							"{remote_server} does not know about {room_id}: {e}. Will continue \
							 trying."
						);
					},
					| _ => {
						info!("{remote_server} failed to make_join: {e}. Will continue trying.");
					},
				},
			}
		}
		info!("All {} servers were unable to assist in joining {room_id} :(", servers.len());
		Err!(BadServerResponse("No server available to assist in joining."))
	}

	/// Attempts to find a user who is able to issue an invite in the target
	/// room.
	pub async fn select_authorising_user<'a>(
		&self,
		room_id: &'a RoomId,
		user_id: &'a UserId,
		state_lock: &'a RoomMutexGuard,
	) -> Result<OwnedUserId> {
		let candidates = self.services.state_cache.local_users_in_room(room_id);

		let mut candidates = std::pin::pin!(candidates);

		while let Some(candidate) = candidates.next().await {
			if self
				.services
				.state_accessor
				.user_can_invite(room_id, &candidate, user_id, state_lock)
				.await
			{
				return Ok(candidate);
			}
		}

		Err!(Request(UnableToGrantJoin(
			"No user on this server is able to assist in joining."
		)))
	}

	/// Checks whether the given user can join the given room via a restricted
	/// join.
	pub(crate) async fn user_can_perform_restricted_join(
		&self,
		user_id: &UserId,
		room_id: &RoomId,
	) -> Result<bool> {
		let Ok(join_rules_event_content) = self
			.services
			.state_accessor
			.room_state_get_content::<RoomJoinRulesEventContent>(
				room_id,
				&StateEventType::RoomJoinRules,
				"",
			)
			.await
		else {
			// No join rules means there's nothing to authorise (defaults to invite)
			return Ok(false);
		};

		let (JoinRule::Restricted(r) | JoinRule::KnockRestricted(r)) =
			join_rules_event_content.join_rule
		else {
			// This is not a restricted room
			return Ok(false);
		};

		if r.allow.is_empty() {
			// This will never be authorisable, return forbidden.
			return Err!(Request(Forbidden("You are not invited to this room.")));
		}

		let mut could_satisfy = true;
		for allow_rule in &r.allow {
			match allow_rule {
				| AllowRule::RoomMembership(membership) => {
					if !self
						.services
						.state_cache
						.server_in_room(self.services.globals.server_name(), &membership.room_id)
						.await
					{
						// Since we can't check this room, mark could_satisfy as false
						// so that we can return M_UNABLE_TO_AUTHORIZE_JOIN later.
						could_satisfy = false;
						continue;
					}

					if self
						.services
						.state_cache
						.is_joined(user_id, &membership.room_id)
						.await
					{
						debug!(
							"User {} is allowed to join room {} via membership in room {}",
							user_id, room_id, membership.room_id
						);
						return Ok(true);
					}
				},
				| other if other.rule_type() == "fi.mau.spam_checker" =>
					return match self
						.services
						.antispam
						.meowlnir_accept_make_join(room_id.to_owned(), user_id.to_owned())
						.await
					{
						| Ok(()) => Ok(true),
						| Err(_) => Err!(Request(Forbidden("Antispam rejected join request."))),
					},
				| _ => {
					// We don't recognise this join rule, so we cannot satisfy the request.
					could_satisfy = false;
					debug_info!(
						"Unsupported allow rule in restricted join for room {}: {:?}",
						room_id,
						allow_rule
					);
				},
			}
		}

		if could_satisfy {
			// We were able to check all the restrictions and can be certain that the
			// prospective member is not permitted to join.
			Err!(Request(Forbidden(
				"You do not belong to any of the rooms or spaces required to join this room."
			)))
		} else {
			// We were unable to check all the restrictions. This usually means we aren't in
			// one of the rooms this one is restricted to, ergo can't check its state for
			// the user's membership, and consequently the user *might* be able to join if
			// they ask another server.
			Err!(Request(UnableToAuthorizeJoin(
				"You do not belong to any of the recognised rooms or spaces required to join \
				 this room, but this server is unable to verify every requirement. You may be \
				 able to join via another server."
			)))
		}
	}
}
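
// Illustrative sketch (hypothetical, self-contained types — not repo code):
// how a caller such as join_local_room is expected to branch on the outcomes
// of user_can_perform_restricted_join.
enum RestrictedJoinOutcome {
	Forbidden,             // no allow rule can ever match this user
	UnableToAuthorizeJoin, // we could not verify every allow rule
}

enum JoinPath {
	AuthoriseLocally, // Ok(true): select a local authorising user
	PlainLocalJoin,   // Ok(false): not restricted, attempt an ordinary join
	TryRemote,        // another server may be able to verify the rules
	Reject,           // fail the join outright
}

fn plan_join(check: Result<bool, RestrictedJoinOutcome>) -> JoinPath {
	match check {
		| Ok(true) => JoinPath::AuthoriseLocally,
		| Ok(false) => JoinPath::PlainLocalJoin,
		| Err(RestrictedJoinOutcome::UnableToAuthorizeJoin) => JoinPath::TryRemote,
		| Err(RestrictedJoinOutcome::Forbidden) => JoinPath::Reject,
	}
}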

/// Validates that an event returned from a remote server by `/make_*`
/// actually is a membership event with the expected fields.
///
/// Without checking this, the remote server could use the remote membership
/// mechanism to trick our server into signing arbitrary malicious events.
pub fn validate_remote_member_event_stub(
	membership: &MembershipState,
	user_id: &UserId,
	room_id: &RoomId,
	event_stub: &CanonicalJsonObject,
) -> Result<()> {
	let Some(event_type) = event_stub.get("type") else {
		return Err!(BadServerResponse(
			"Remote server returned member event with missing type field"
		));
	};
	if event_type != &RoomMemberEventContent::TYPE {
		return Err!(BadServerResponse(
			"Remote server returned member event with invalid event type"
		));
	}

	let Some(sender) = event_stub.get("sender") else {
		return Err!(BadServerResponse(
			"Remote server returned member event with missing sender field"
		));
	};
	if sender != &user_id.as_str() {
		return Err!(BadServerResponse(
			"Remote server returned member event with incorrect sender"
		));
	}

	let Some(state_key) = event_stub.get("state_key") else {
		return Err!(BadServerResponse(
			"Remote server returned member event with missing state_key field"
		));
	};
	if state_key != &user_id.as_str() {
		return Err!(BadServerResponse(
			"Remote server returned member event with incorrect state_key"
		));
	}

	let Some(event_room_id) = event_stub.get("room_id") else {
		return Err!(BadServerResponse(
			"Remote server returned member event with missing room_id field"
		));
	};
	if event_room_id != &room_id.as_str() {
		return Err!(BadServerResponse(
			"Remote server returned member event with incorrect room_id"
		));
	}

	let Some(content) = event_stub
		.get("content")
		.and_then(|content| content.as_object())
	else {
		return Err!(BadServerResponse(
			"Remote server returned member event with missing content field"
		));
	};
	let Some(event_membership) = content.get("membership") else {
		return Err!(BadServerResponse(
			"Remote server returned member event with missing membership field"
		));
	};
	if event_membership != &membership.as_str() {
		return Err!(BadServerResponse(
			"Remote server returned member event with incorrect membership type"
		));
	}

	Ok(())
}
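
// Illustrative shape (assumed example values — not from this changeset): a
// make_join response stub that passes the validation above looks like —
//
// {
//   "type": "m.room.member",
//   "sender": "@alice:example.org",
//   "state_key": "@alice:example.org",
//   "room_id": "!room:example.org",
//   "content": { "membership": "join" }
// }
//
// Anything deviating from this is rejected before we hash and sign it, so a
// malicious server cannot trick us into signing an arbitrary event.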

@@ -3,6 +3,7 @@
pub mod directory;
pub mod event_handler;
pub mod lazy_loading;
pub mod membership;
pub mod metadata;
pub mod outlier;
pub mod pdu_metadata;
@@ -27,6 +28,7 @@ pub struct Service {
	pub directory: Arc<directory::Service>,
	pub event_handler: Arc<event_handler::Service>,
	pub lazy_loading: Arc<lazy_loading::Service>,
	pub membership: Arc<membership::Service>,
	pub metadata: Arc<metadata::Service>,
	pub outlier: Arc<outlier::Service>,
	pub pdu_metadata: Arc<pdu_metadata::Service>,

@@ -26,6 +26,7 @@ pub(super) struct Data {
	tofrom_relation: Arc<Map>,
	referencedevents: Arc<Map>,
	softfailedeventids: Arc<Map>,
	rejectedeventids: Arc<Map>,
	services: Services,
}

@@ -40,6 +41,7 @@ pub(super) fn new(args: &crate::Args<'_>) -> Self {
	tofrom_relation: db["tofrom_relation"].clone(),
	referencedevents: db["referencedevents"].clone(),
	softfailedeventids: db["softfailedeventids"].clone(),
	rejectedeventids: db["rejectedeventids"].clone(),
	services: Services {
		timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
	},
@@ -118,7 +120,29 @@ pub(super) fn mark_event_soft_failed(&self, event_id: &EventId) {
	self.softfailedeventids.insert(event_id, []);
}

pub(super) fn unmark_event_soft_failed(&self, event_id: &EventId) {
	self.softfailedeventids.remove(event_id);
}

pub(super) async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
	self.softfailedeventids.get(event_id).await.is_ok()
}

pub(super) fn mark_event_rejected(&self, event_id: &EventId) {
	self.rejectedeventids.insert(event_id, []);
}

pub(super) fn unmark_event_rejected(&self, event_id: &EventId) {
	self.rejectedeventids.remove(event_id);
}

pub(super) async fn is_event_rejected(&self, event_id: &EventId) -> bool {
	self.rejectedeventids.get(event_id).await.is_ok()
}

/// Removes any soft-fail or rejection markers applied to the target PDU
pub(super) fn clear_pdu_markers(&self, event_id: &EventId) {
	self.unmark_event_rejected(event_id);
	self.unmark_event_soft_failed(event_id);
}
}

@@ -140,4 +140,28 @@ pub fn mark_event_soft_failed(&self, event_id: &EventId) {
pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
	self.db.is_event_soft_failed(event_id).await
}

pub async fn is_event_rejected(&self, event_id: &EventId) -> bool {
	self.db.is_event_rejected(event_id).await
}

pub fn mark_event_rejected(&self, event_id: &EventId) {
	self.db.mark_event_rejected(event_id);
}

pub fn unmark_event_soft_failed(&self, event_id: &EventId) {
	self.db.unmark_event_soft_failed(event_id);
}

pub fn unmark_event_rejected(&self, event_id: &EventId) {
	self.db.unmark_event_rejected(event_id);
}

/// Returns true if the event is neither soft-failed nor rejected.
pub async fn is_event_accepted(&self, event_id: &EventId) -> bool {
	!self.db.is_event_rejected(event_id).await
		&& !self.db.is_event_soft_failed(event_id).await
}

pub fn clear_pdu_markers(&self, event_id: &EventId) { self.db.clear_pdu_markers(event_id); }
}
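
// Illustrative marker lifecycle (call sites shown are examples, not repo
// code): an event is marked when it fails checks, consulted when it is seen
// again, and cleared when a fresh copy is accepted (e.g. via send_join).
//
//   services.pdu_metadata.mark_event_rejected(&event_id);
//   assert!(!services.pdu_metadata.is_event_accepted(&event_id).await);
//   services.pdu_metadata.clear_pdu_markers(&event_id);
//   assert!(services.pdu_metadata.is_event_accepted(&event_id).await);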

@@ -238,7 +238,7 @@ pub async fn room_joined_count(&self, room_id: &RoomId) -> Result<u64> {
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
/// Returns an iterator of all our local users in the room, even if they're
/// deactivated/guests
/// deactivated
pub fn local_users_in_room<'a>(
	&'a self,
	room_id: &'a RoomId,
@@ -248,7 +248,7 @@ pub fn local_users_in_room<'a>(
}

/// Returns an iterator of all our local joined users in a room who are
/// active (not deactivated, not guest)
/// active (not deactivated)
#[implement(Service)]
#[tracing::instrument(skip(self), level = "trace")]
pub fn active_local_users_in_room<'a>(

@@ -47,7 +47,7 @@ pub async fn update_membership(
	#[allow(clippy::collapsible_if)]
	if !self.services.globals.user_is_local(user_id) {
		if !self.services.users.exists(user_id).await {
			self.services.users.create(user_id, None, None).await?;
			self.services.users.create(user_id, None).await?;
		}
	}

@@ -53,15 +53,7 @@ pub async fn append_incoming_pdu<'a, Leaves>(
	.await?;

	if soft_fail {
		self.services
			.pdu_metadata
			.mark_as_referenced(room_id, pdu.prev_events.iter().map(AsRef::as_ref));

		// self.services
		// 	.state
		// 	.set_forward_extremities(room_id, new_room_leaves, state_lock)
		// 	.await;

		// Nothing else to do with a soft-failed event.
		return Ok(None);
	}

@@ -95,6 +95,7 @@ macro_rules! build {
	directory: build!(rooms::directory::Service),
	event_handler: build!(rooms::event_handler::Service),
	lazy_loading: build!(rooms::lazy_loading::Service),
	membership: build!(rooms::membership::Service),
	metadata: build!(rooms::metadata::Service),
	outlier: build!(rooms::outlier::Service),
	pdu_metadata: build!(rooms::pdu_metadata::Service),

+8
-27
@@ -4,7 +4,7 @@
	sync::Arc,
};

use conduwuit::{Err, Error, Result, error, utils, utils::hash};
use conduwuit::{Err, Error, Result, error, utils};
use lettre::Address;
use ruma::{
	UserId,
@@ -377,32 +377,13 @@ async fn check_stage(
		));
	};

	// Check if password is correct
	let mut password_verified = false;

	// First try local password hash verification
	if let Ok(hash) = self.services.users.password_hash(&user_id).await {
		password_verified = hash::verify_password(password, &hash).is_ok();
	}

	// If local password verification failed, try LDAP authentication
	#[cfg(feature = "ldap")]
	if !password_verified && self.services.config.ldap.enable {
		// Search for user in LDAP to get their DN
		if let Ok(dns) = self.services.users.search_ldap(&user_id).await {
			if let Some((user_dn, _is_admin)) = dns.first() {
				// Try to authenticate with LDAP
				password_verified = self
					.services
					.users
					.auth_ldap(user_dn, password)
					.await
					.is_ok();
			}
		}
	}

	if password_verified {
	if self
		.services
		.users
		.check_password(&user_id, password)
		.await
		.is_ok()
	{
		identity.try_set_localpart(user_id.localpart().to_owned())?;

		Ok(AuthType::Password)

+165
-244
@@ -1,24 +1,16 @@
pub(super) mod dehydrated_device;

#[cfg(feature = "ldap")]
use std::collections::HashMap;
use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc};

#[cfg(feature = "ldap")]
use conduwuit::result::LogErr;
use conduwuit::{
	Err, Error, Result, Server, debug_warn, err, is_equal_to, trace,
	Err, Error, Result, Server, debug_error, debug_warn, err, trace,
	utils::{self, ReadyExt, stream::TryIgnore, string::Unquoted},
};
#[cfg(feature = "ldap")]
use conduwuit_core::{debug, error};
use database::{Deserialized, Ignore, Interfix, Json, Map};
use futures::{Stream, StreamExt, TryFutureExt};
#[cfg(feature = "ldap")]
use ldap3::{LdapConnAsync, LdapConnSettings, Scope, SearchEntry};
use ruma::{
	DeviceId, KeyId, MilliSecondsSinceUnixEpoch, OneTimeKeyAlgorithm, OneTimeKeyId,
	OneTimeKeyName, OwnedDeviceId, OwnedKeyId, OwnedMxcUri, OwnedUserId, RoomId, UInt, UserId,
	DeviceId, MilliSecondsSinceUnixEpoch, OneTimeKeyAlgorithm, OneTimeKeyId, OneTimeKeyName,
	OwnedDeviceId, OwnedKeyId, OwnedMxcUri, OwnedOneTimeKeyId, OwnedUserId, RoomId, UInt, UserId,
	api::{
		client::{device::Device, filter::FilterDefinition},
		error::ErrorKind,
@@ -46,6 +38,19 @@ pub struct UserSuspension {
	pub suspended_by: String,
}

/// A password hash. This is only for use when setting a user's password,
/// if the hash needs to be kept around for a while without keeping the password
/// in memory.
pub struct HashedPassword(String);

impl HashedPassword {
	pub fn new(password: &str) -> Result<Self> {
		Ok(Self(utils::hash::password(password).map_err(|e| {
			err!(Request(InvalidParam("Password does not meet the requirements: {e}")))
		})?))
	}
}
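
// Illustrative call sites (hypothetical, based on the signatures in this
// diff): hashing is now a separate, fallible step, so the stored hash can be
// prepared up front and the write itself is infallible.
//
//   let hash = HashedPassword::new("correct horse battery staple")?;
//   services.users.create(&user_id, Some(hash)).await?;
//   // or, for an existing account:
//   services.users.set_password(&user_id, Some(HashedPassword::new(new_pw)?));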
|
||||
|
||||
pub struct Service {
|
||||
services: Services,
|
||||
db: Data,
|
||||
@@ -65,6 +70,7 @@ struct Data {
|
||||
keychangeid_userid: Arc<Map>,
|
||||
keyid_key: Arc<Map>,
|
||||
onetimekeyid_onetimekeys: Arc<Map>,
|
||||
fallbackkeyid_fallbackkey: Arc<Map>,
|
||||
openidtoken_expiresatuserid: Arc<Map>,
|
||||
logintoken_expiresatuserid: Arc<Map>,
|
||||
todeviceid_events: Arc<Map>,
|
||||
@@ -79,7 +85,6 @@ struct Data {
|
||||
userid_displayname: Arc<Map>,
|
||||
userid_lastonetimekeyupdate: Arc<Map>,
|
||||
userid_masterkeyid: Arc<Map>,
|
||||
userid_origin: Arc<Map>,
|
||||
userid_password: Arc<Map>,
|
||||
userid_suspension: Arc<Map>,
|
||||
userid_lock: Arc<Map>,
|
||||
@@ -106,6 +111,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
keychangeid_userid: args.db["keychangeid_userid"].clone(),
|
||||
keyid_key: args.db["keyid_key"].clone(),
|
||||
onetimekeyid_onetimekeys: args.db["onetimekeyid_onetimekeys"].clone(),
|
||||
fallbackkeyid_fallbackkey: args.db["fallbackkeyid_fallbackkey"].clone(),
|
||||
openidtoken_expiresatuserid: args.db["openidtoken_expiresatuserid"].clone(),
|
||||
logintoken_expiresatuserid: args.db["logintoken_expiresatuserid"].clone(),
|
||||
todeviceid_events: args.db["todeviceid_events"].clone(),
|
||||
@@ -120,7 +126,6 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
userid_displayname: args.db["userid_displayname"].clone(),
|
||||
userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(),
|
||||
userid_masterkeyid: args.db["userid_masterkeyid"].clone(),
|
||||
userid_origin: args.db["userid_origin"].clone(),
|
||||
userid_password: args.db["userid_password"].clone(),
|
||||
userid_suspension: args.db["userid_suspension"].clone(),
|
||||
userid_lock: args.db["userid_lock"].clone(),
|
||||
@@ -178,31 +183,24 @@ pub async fn is_admin(&self, user_id: &UserId) -> bool {
|
||||
}
|
||||
|
||||
/// Create a new user account on this homeserver.
|
||||
///
|
||||
/// User origin is by default "password" (meaning that it will login using
|
||||
/// its user_id/password). Users with other origins (currently only "ldap"
|
||||
/// is available) have special login processes.
|
||||
#[inline]
|
||||
pub async fn create(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
password: Option<&str>,
|
||||
origin: Option<&str>,
|
||||
) -> Result<()> {
|
||||
if !self.services.globals.user_is_local(user_id)
|
||||
&& (password.is_some() || origin.is_some())
|
||||
{
|
||||
return Err!("Cannot create a nonlocal user with a set password or origin");
|
||||
pub async fn create(&self, user_id: &UserId, password: Option<HashedPassword>) -> Result<()> {
|
||||
if !self.services.globals.user_is_local(user_id) && password.is_some() {
|
||||
return Err!("Cannot create a nonlocal user with a set password");
|
||||
}
|
||||
|
||||
self.db
|
||||
.userid_origin
|
||||
.insert(user_id, origin.unwrap_or("password"));
|
||||
self.set_password(user_id, password).await?;
|
||||
self.set_password(user_id, password);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// /// Create a new account for a local human or bot user.
|
||||
// pub async fn create_local_account(
|
||||
// &self,
|
||||
// username: String,
|
||||
// password:
|
||||
// )
|
||||
|
||||
/// Deactivate account
|
||||
pub async fn deactivate_account(&self, user_id: &UserId) -> Result<()> {
|
||||
// Remove all associated devices
|
||||
@@ -214,7 +212,7 @@ pub async fn deactivate_account(&self, user_id: &UserId) -> Result<()> {
|
||||
// result in an empty string, so the user will not be able to log in again.
|
||||
// Systems like changing the password without logging in should check if the
|
||||
// account is deactivated.
|
||||
self.set_password(user_id, None).await?;
|
||||
self.set_password(user_id, None);
|
||||
|
||||
// TODO: Unhook 3PID
|
||||
Ok(())
|
||||
@@ -257,10 +255,12 @@ pub async fn lock_account(&self, user_id: &UserId, locking_user: &UserId) {
|
||||
|
||||
pub async fn unlock_account(&self, user_id: &UserId) { self.db.userid_lock.remove(user_id); }
|
||||
|
||||
/// Check if a user has an account on this homeserver.
|
||||
/// Check if the provided user ID belongs to an existing (possibly
|
||||
/// deactivated) account on this homeserver.
|
||||
#[inline]
|
||||
pub async fn exists(&self, user_id: &UserId) -> bool {
|
||||
self.db.userid_password.get(user_id).await.is_ok()
|
||||
self.services.globals.user_is_local(user_id)
|
||||
&& self.db.userid_password.get(user_id).await.is_ok()
|
||||
}
|
||||
|
||||
/// Check if account is deactivated
|
||||
@@ -360,46 +360,42 @@ pub fn list_local_users(&self) -> impl Stream<Item = OwnedUserId> + Send + '_ {
 			.ready_filter_map(|(u, p): (OwnedUserId, &[u8])| (!p.is_empty()).then_some(u))
 	}
 
-	/// Returns the origin of the user (password/LDAP/...).
-	pub async fn origin(&self, user_id: &UserId) -> Result<String> {
-		self.db.userid_origin.get(user_id).await.deserialized()
+	/// Set a user's password.
+	pub fn set_password(&self, user_id: &UserId, password: Option<HashedPassword>) {
+		if let Some(hash) = password {
+			self.db.userid_password.insert(user_id, hash.0);
+		} else {
+			self.db.userid_password.insert(user_id, b"");
+		}
 	}
 
 	/// Returns the password hash for the given user.
 	pub async fn password_hash(&self, user_id: &UserId) -> Result<String> {
 		self.db.userid_password.get(user_id).await.deserialized()
 	}
 
-	/// Hash and set the user's password to the Argon2 hash
-	pub async fn set_password(&self, user_id: &UserId, password: Option<&str>) -> Result<()> {
-		// Cannot change the password of a LDAP user. There are two special cases :
-		// - a `None` password can be used to deactivate a LDAP user
-		// - a "*" password is used as the default password of an active LDAP user
-		if cfg!(feature = "ldap")
-			&& password.is_some_and(|pwd| pwd != "*")
-			&& self
-				.db
-				.userid_origin
-				.get(user_id)
-				.await
-				.deserialized::<String>()
-				.is_ok_and(is_equal_to!("ldap"))
+	/// Check a user's password.
+	pub async fn check_password(&self, user_id: &UserId, password: &str) -> Result<OwnedUserId> {
+		let (hash, user_id): (String, OwnedUserId) = if let Ok(hash) =
+			self.db.userid_password.get(user_id).await.deserialized()
 		{
-			return Err!(Request(InvalidParam("Cannot change password of a LDAP user")));
+			(hash, user_id.to_owned())
+		} else {
+			// We also check the lowercased version of the user ID to handle legacy user IDs
+			// better
+			let lowercase_user_id = UserId::parse(user_id.as_str().to_lowercase()).unwrap();
+
+			if let Ok(hash) = self.db.userid_password.get(user_id).await.deserialized() {
+				(hash, lowercase_user_id)
+			} else {
+				return Err!(Request(InvalidParam("This user cannot log in with a password.")));
+			}
+		};
+
+		if hash.is_empty() {
+			return Err!(Request(UserDeactivated("This user is deactivated")));
 		}
 
-		password
-			.map(utils::hash::password)
-			.transpose()
-			.map_err(|e| {
-				err!(Request(InvalidParam("Password does not meet the requirements: {e}")))
-			})?
-			.map_or_else(
-				|| self.db.userid_password.insert(user_id, b""),
-				|hash| self.db.userid_password.insert(user_id, hash),
-			);
+		utils::hash::verify_password(password, &hash)
+			.inspect_err(|e| debug_error!("{e}"))
+			.map_err(|_| err!(Request(Forbidden("Invalid identifier or password."))))?;
 
-		Ok(())
+		Ok(user_id)
 	}
 
 	/// Returns the displayname of a user on this homeserver.
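`check_password` replaces the old hash-and-store path: it resolves the stored hash (retrying with a lowercased user ID for legacy accounts), treats an empty hash as a deactivated account, then verifies the candidate and returns the canonical `OwnedUserId`. `utils::hash::verify_password` itself is not part of this diff; here is a minimal sketch of what such a verifier conventionally looks like with the `argon2` crate, offered as an assumption rather than the project's actual implementation.

```rust
use argon2::{password_hash::PasswordHash, Argon2, PasswordVerifier};

/// Assumed shape of utils::hash::verify_password (not shown in this diff).
fn verify_password(password: &str, hash: &str) -> Result<(), argon2::password_hash::Error> {
	// Parse the stored PHC-format hash string...
	let parsed = PasswordHash::new(hash)?;
	// ...and check the candidate password against it.
	Argon2::default().verify_password(password.as_bytes(), &parsed)
}
```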
@@ -595,7 +591,7 @@ pub async fn add_one_time_key(
 		&self,
 		user_id: &UserId,
 		device_id: &DeviceId,
-		one_time_key_key: &KeyId<OneTimeKeyAlgorithm, OneTimeKeyName>,
+		one_time_key_key: &OneTimeKeyId,
 		one_time_key_value: &Raw<OneTimeKey>,
 	) -> Result {
 		// All devices have metadata
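The parameter type shrinks from the spelled-out generic to ruma's `OneTimeKeyId` alias. As far as can be told from this hunk alone, the change is purely cosmetic: ruma appears to define the alias as exactly the generic the old signature spelled out (verify against the ruma source before relying on this).

```rust
use ruma::{KeyId, OneTimeKeyAlgorithm, OneTimeKeyName};

// Presumed equivalent of ruma's own alias; if this holds, the signature
// change above does not alter behaviour.
type AssumedOneTimeKeyId = KeyId<OneTimeKeyAlgorithm, OneTimeKeyName>;
```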
@@ -632,6 +628,39 @@ pub async fn add_one_time_key(
 		Ok(())
 	}
 
+	/// Save a fallback key for the given user, device, and algorithm
+	/// This key will replace an existing fallback key
+	pub async fn add_fallback_key(
+		&self,
+		user_id: &UserId,
+		device_id: &DeviceId,
+		fallback_key_id: &OneTimeKeyId,
+		fallback_key: &Raw<OneTimeKey>,
+		used: bool,
+	) -> Result {
+		// All devices have metadata
+		// Only existing devices should be able to call this, but we shouldn't assert
+		// either...
+		let key = (user_id, device_id);
+		if self.db.userdeviceid_metadata.qry(&key).await.is_err() {
+			return Err!(Database(error!(
+				%user_id,
+				%device_id,
+				"User does not exist or device has no metadata."
+			)));
+		}
+
+		// There is one fallback key slot per user, per device, per algorithm
+		// Therefore we use this as the DB key for this column
+		let db_key = (user_id, device_id, fallback_key_id.algorithm());
+
+		self.db
+			.fallbackkeyid_fallbackkey
+			.put(db_key, (used, fallback_key_id.as_str(), Json(fallback_key)));
+
+		Ok(())
+	}
+
 	pub async fn last_one_time_keys_update(&self, user_id: &UserId) -> u64 {
 		self.db
 			.userid_lastonetimekeyupdate
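Unlike one-time keys, which are keyed by the key ID itself, the fallback column is keyed by `(user, device, algorithm)`: a later upload silently replaces the previous fallback key for that algorithm, and the stored value carries the `used` marker alongside the key ID and its JSON body. A toy model of that slot semantics (names hypothetical, illustration only):

```rust
use std::collections::HashMap;

// Toy stand-in for the fallbackkeyid_fallbackkey column: one slot per
// (user, device, algorithm) triple; the value is (used, key id, key JSON).
type SlotKey = (String, String, String);
type SlotVal = (bool, String, String);

fn upsert_fallback(slots: &mut HashMap<SlotKey, SlotVal>, key: SlotKey, val: SlotVal) {
	// A plain insert overwrites any previous slot, mirroring how
	// add_fallback_key replaces an existing fallback key.
	slots.insert(key, val);
}
```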
@@ -663,6 +692,8 @@ pub async fn take_one_time_key(
 			.onetimekeyid_onetimekeys
 			.raw_stream_prefix(&prefix)
 			.ignore_err()
+			.next()
+			.await
 			.map(|(key, val)| {
 				self.db.onetimekeyid_onetimekeys.remove(key);
 
@@ -681,11 +712,44 @@ pub async fn take_one_time_key(
 			.unwrap();
 
 				(key, val)
-			})
-			.next()
-			.await;
+			});
 
-		one_time_key.ok_or_else(|| err!(Request(NotFound("No one-time-key found"))))
+		if let Some(result) = one_time_key {
+			return Ok(result);
+		}
+
+		// No one-time key has been found. Look for a fallback key.
+
+		let db_key = (user_id, device_id, key_algorithm);
+
+		let fallback_key = self
+			.db
+			.fallbackkeyid_fallbackkey
+			.qry(&db_key)
+			.await
+			.ok()
+			.and_then(|handle| {
+				handle
+					.deserialized::<(bool, OwnedOneTimeKeyId, Raw<OneTimeKey>)>()
+					.ok()
+			});
+
+		if let Some((used, fallback_key_id, fallback_key_value)) = fallback_key {
+			if !used {
+				// write the key to the database again to mark it as used
+				self.add_fallback_key(
+					user_id,
+					device_id,
+					&fallback_key_id,
+					&fallback_key_value,
+					true,
+				)
+				.await?;
+			}
+			return Ok((fallback_key_id, fallback_key_value));
+		}
+
+		Err(err!(Request(NotFound("No one-time key or fallback key found"))))
 	}
 
 	pub async fn count_one_time_keys(
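`take_one_time_key` now degrades gracefully: a claimed one-time key is deleted, but a served fallback key stays in the database and only its `used` flag is flipped (by re-writing the slot via `add_fallback_key`), so repeated claims from an exhausted device keep receiving the same fallback key, in line with the Matrix fallback-key design. The control flow, distilled into a hypothetical standalone helper:

```rust
/// Hypothetical distillation of the claim order above: prefer a one-time key
/// (consumed on claim), else serve the fallback key (reusable, marked used).
fn claim<K>(one_time: Option<K>, fallback: Option<(bool, K)>) -> Option<K> {
	if let Some(key) = one_time {
		// The real code also deletes the one-time key here.
		return Some(key);
	}
	// The real code re-writes the slot with used = true on first claim.
	fallback.map(|(_used, key)| key)
}
```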
@@ -718,6 +782,34 @@ pub async fn count_one_time_keys(
 		algorithm_counts
 	}
 
+	pub async fn list_unused_fallback_key_types(
+		&self,
+		user_id: &UserId,
+		device_id: &DeviceId,
+	) -> Vec<OneTimeKeyAlgorithm> {
+		type KeyVal = ((String, String, OneTimeKeyAlgorithm), (bool, String, Ignore));
+
+		let mut query = user_id.as_bytes().to_vec();
+		query.push(0xFF);
+		query.extend_from_slice(device_id.as_bytes());
+		query.push(0xFF);
+
+		let mut unused_algorithms = Vec::new();
+
+		self.db
+			.fallbackkeyid_fallbackkey
+			.stream_prefix(&query)
+			.ignore_err()
+			.ready_for_each(|((_, _, fallback_key_algorithm), (used, ..)): KeyVal| {
+				if !used {
+					unused_algorithms.push(fallback_key_algorithm);
+				}
+			})
+			.await;
+
+		unused_algorithms
+	}
+
 	pub async fn add_device_keys(
 		&self,
 		user_id: &UserId,
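`list_unused_fallback_key_types` scans the device's slots with a raw prefix of `user_id 0xFF device_id 0xFF`. The `0xFF` byte never occurs in valid UTF-8, so it is a safe field separator for these range scans. The same encoding, pulled out as a sketch:

```rust
/// Sketch of the prefix construction used above: user and device IDs joined
/// by 0xFF separators, which cannot appear inside UTF-8 identifiers.
fn device_prefix(user_id: &str, device_id: &str) -> Vec<u8> {
	let mut prefix = Vec::with_capacity(user_id.len() + device_id.len() + 2);
	prefix.extend_from_slice(user_id.as_bytes());
	prefix.push(0xFF);
	prefix.extend_from_slice(device_id.as_bytes());
	prefix.push(0xFF);
	prefix
}
```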
@@ -1292,177 +1384,6 @@ pub async fn clear_profile(&self, user_id: &UserId) {
 			.ready_for_each(|(key, _)| self.set_profile_key(user_id, &key, None))
 			.await;
 	}
-
-	#[cfg(feature = "ldap")]
-	async fn create_ldap_connection(
-		config: &conduwuit_core::config::LdapConfig,
-		uri: &str,
-	) -> Result<(LdapConnAsync, ldap3::Ldap), ldap3::LdapError> {
-		let mut settings = LdapConnSettings::new();
-
-		if config.use_starttls {
-			settings = settings.set_starttls(true);
-		}
-
-		if config.disable_tls_verification {
-			settings = settings.set_no_tls_verify(true);
-		}
-
-		LdapConnAsync::with_settings(settings, uri).await
-	}
-
-	#[cfg(not(feature = "ldap"))]
-	pub async fn search_ldap(&self, _user_id: &UserId) -> Result<Vec<(String, Option<bool>)>> {
-		Err!(FeatureDisabled("ldap"))
-	}
-
-	#[cfg(feature = "ldap")]
-	pub async fn search_ldap(&self, user_id: &UserId) -> Result<Vec<(String, Option<bool>)>> {
-		let localpart = user_id.localpart().to_owned();
-		let lowercased_localpart = localpart.to_lowercase();
-
-		let config = &self.services.server.config.ldap;
-		let uri = config
-			.uri
-			.as_ref()
-			.ok_or_else(|| err!(Ldap(error!("LDAP URI is not configured."))))?;
-
-		debug!(?uri, "LDAP creating connection...");
-		let (conn, mut ldap) = Self::create_ldap_connection(config, uri.as_str())
-			.await
-			.map_err(|e| err!(Ldap(error!(%user_id, "LDAP connection setup error: {e}"))))?;
-
-		let driver = self.services.server.runtime().spawn(async move {
-			match conn.drive().await {
-				| Err(e) => error!("LDAP connection error: {e}"),
-				| Ok(()) => debug!("LDAP connection completed."),
-			}
-		});
-
-		match (&config.bind_dn, &config.bind_password_file) {
-			| (Some(bind_dn), Some(bind_password_file)) => {
-				let bind_pw = String::from_utf8(std::fs::read(bind_password_file)?)?;
-				ldap.simple_bind(bind_dn, bind_pw.trim())
-					.await
-					.and_then(ldap3::LdapResult::success)
-					.map_err(|e| err!(Ldap(error!("LDAP bind error: {e}"))))?;
-			},
-			| (..) => {},
-		}
-
-		let attr = [&config.uid_attribute, &config.name_attribute];
-
-		let user_filter = &config.filter.replace("{username}", &lowercased_localpart);
-
-		let (entries, _result) = ldap
-			.search(&config.base_dn, Scope::Subtree, user_filter, &attr)
-			.await
-			.and_then(ldap3::SearchResult::success)
-			.inspect(|(entries, result)| trace!(?entries, ?result, "LDAP Search"))
-			.map_err(|e| err!(Ldap(error!(?attr, ?user_filter, "LDAP search error: {e}"))))?;
-
-		let mut dns: HashMap<String, Option<bool>> = entries
-			.into_iter()
-			.filter_map(|entry| {
-				let search_entry = SearchEntry::construct(entry);
-				debug!(?search_entry, "LDAP search entry");
-				search_entry
-					.attrs
-					.get(&config.uid_attribute)
-					.into_iter()
-					.chain(search_entry.attrs.get(&config.name_attribute))
-					.any(|ids| ids.contains(&localpart) || ids.contains(&lowercased_localpart))
-					.then_some((search_entry.dn, None))
-			})
-			.collect();
-
-		if !config.admin_filter.is_empty() {
-			// Update all existing entries to Some(false) since we can now determine admin
-			// status
-			for admin_status in dns.values_mut() {
-				*admin_status = Some(false);
-			}
-
-			let admin_base_dn = if config.admin_base_dn.is_empty() {
-				&config.base_dn
-			} else {
-				&config.admin_base_dn
-			};
-
-			let admin_filter = &config
-				.admin_filter
-				.replace("{username}", &lowercased_localpart);
-
-			let (admin_entries, _result) = ldap
-				.search(admin_base_dn, Scope::Subtree, admin_filter, &attr)
-				.await
-				.and_then(ldap3::SearchResult::success)
-				.inspect(|(entries, result)| trace!(?entries, ?result, "LDAP Admin Search"))
-				.map_err(|e| {
-					err!(Ldap(error!(?attr, ?admin_filter, "Ldap admin search error: {e}")))
-				})?;
-
-			dns.extend(admin_entries.into_iter().filter_map(|entry| {
-				let search_entry = SearchEntry::construct(entry);
-				debug!(?search_entry, "LDAP search entry");
-				search_entry
-					.attrs
-					.get(&config.uid_attribute)
-					.into_iter()
-					.chain(search_entry.attrs.get(&config.name_attribute))
-					.any(|ids| ids.contains(&localpart) || ids.contains(&lowercased_localpart))
-					.then_some((search_entry.dn, Some(true)))
-			}));
-		}
-
-		ldap.unbind()
-			.await
-			.map_err(|e| err!(Ldap(error!("LDAP unbind error: {e}"))))?;
-
-		driver.await.log_err().ok();
-
-		Ok(dns.drain().collect())
-	}
-
-	#[cfg(not(feature = "ldap"))]
-	pub async fn auth_ldap(&self, _user_dn: &str, _password: &str) -> Result {
-		Err!(FeatureDisabled("ldap"))
-	}
-
-	#[cfg(feature = "ldap")]
-	pub async fn auth_ldap(&self, user_dn: &str, password: &str) -> Result {
-		let config = &self.services.server.config.ldap;
-		let uri = config
-			.uri
-			.as_ref()
-			.ok_or_else(|| err!(Ldap(error!("LDAP URI is not configured."))))?;
-
-		debug!(?uri, "LDAP creating connection...");
-		let (conn, mut ldap) = Self::create_ldap_connection(config, uri.as_str())
-			.await
-			.map_err(|e| err!(Ldap(error!(%user_dn, "LDAP connection setup error: {e}"))))?;
-
-		let driver = self.services.server.runtime().spawn(async move {
-			match conn.drive().await {
-				| Err(e) => error!("LDAP connection error: {e}"),
-				| Ok(()) => debug!("LDAP connection completed."),
-			}
-		});
-
-		ldap.simple_bind(user_dn, password)
-			.await
-			.and_then(ldap3::LdapResult::success)
-			.map_err(|e| {
-				err!(Request(Forbidden(debug_error!("LDAP authentication error: {e}"))))
-			})?;
-
-		ldap.unbind()
-			.await
-			.map_err(|e| err!(Ldap(error!("LDAP unbind error: {e}"))))?;
-
-		driver.await.log_err().ok();
-
-		Ok(())
-	}
 }
 
 pub fn parse_master_key(
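This hunk deletes the entire LDAP surface of the users service: the connection helper, the directory search (including the admin-filter pass), and bind-based authentication. For reference, the removed `auth_ldap` boiled down to a single simple-bind round trip; a compact sketch of that flow with the same `ldap3` calls (the URI and DN here are placeholders, and `tokio::spawn` stands in for the server runtime used by the removed code):

```rust
use ldap3::{LdapConnAsync, LdapConnSettings};

/// Sketch of the bind-as-the-user check the removed auth_ldap performed.
async fn ldap_bind_check(uri: &str, user_dn: &str, password: &str) -> ldap3::result::Result<()> {
	let (conn, mut ldap) = LdapConnAsync::with_settings(LdapConnSettings::new(), uri).await?;
	// Drive the connection concurrently, as the removed code did.
	let driver = tokio::spawn(async move { conn.drive().await });
	// A successful simple bind against the user's DN proves the password.
	ldap.simple_bind(user_dn, password).await?.success()?;
	ldap.unbind().await?;
	let _ = driver.await;
	Ok(())
}
```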
+1 -1
@@ -14,7 +14,7 @@ conduwuit-admin.workspace = true
 conduwuit.workspace = true
 clap.workspace = true
 
-askama = "0.15.1"
+askama = "0.16.0"
 cargo_metadata = "0.23.1"
 
 [lints]