Compare commits

..

71 Commits

Author SHA1 Message Date
timedout 3b6858e936 perf: Throttle dummy events to prevent stampeding 2026-06-23 15:26:28 +01:00
timedout 7ca00e4ab9 chore: Drop unused param from handle_outlier_pdu 2026-06-20 17:08:00 +01:00
timedout b8ca06029f perf: Use a hashmap for full-state filtering 2026-06-20 17:08:00 +01:00
timedout 2efe8f2ec0 fix: Inverted power level check in extremity squash 2026-06-20 17:08:00 +01:00
timedout d1aa911739 fix: Rename variables in auth event fetcher
Also fixes a couple bugs where events were being misattributed
2026-06-20 17:08:00 +01:00
timedout 4673282ca1 perf: Don't try to fetch prevs we already fetched
graphs are hard
2026-06-20 17:08:00 +01:00
timedout 0c03195aec fix: Fall back to legacy behaviour when prev events are missed from get_missing_events 2026-06-20 17:08:00 +01:00
timedout 439bc2784d style: Use more explicit variable names 2026-06-20 17:08:00 +01:00
timedout 689a1ce59b style: Use user_can_send_message 2026-06-20 17:08:00 +01:00
timedout c141503ccb style: Re-use GET_MISSING_EVENTS_MAX_BATCH_SIZE 2026-06-20 17:08:00 +01:00
timedout bcadecdc3b feat: Add !admin debug rooms-by-extremity-count command 2026-06-20 17:08:00 +01:00
timedout ab3be337cb chore: Reformat 2026-06-20 17:08:00 +01:00
timedout aa1281a9e0 fix: Prevent arbitrary state injection attack 2026-06-20 17:08:00 +01:00
timedout 115a2e802e style: Check power levels before attempting to send extremity squashes
Solves a problem where the console screams in agony when local users can't send dummy events
2026-06-20 17:08:00 +01:00
timedout 968328d788 perf: Squash weird mutable variable 2026-06-20 17:08:00 +01:00
timedout f1cde5f323 style: Fix up some TODOs 2026-06-20 17:08:00 +01:00
timedout 832ee8650b style: Adjust docstrings and dodgy comment 2026-06-20 17:08:00 +01:00
timedout c0666a1793 fix: Default PDU content to empty object instead of literal NULL 2026-06-20 17:08:00 +01:00
timedout ddb4ef539f fix: un-forget how streams work 2026-06-20 17:08:00 +01:00
timedout 3faedc4581 perf: Remove huge clone and tackle TODOs 2026-06-20 17:08:00 +01:00
timedout a3544353ba feat: Automatically squash extremities when they exceed a threshold
Attempts to tackle #1844
2026-06-20 17:08:00 +01:00
timedout b0612397d3 style: Tidy up 2026-06-20 17:08:00 +01:00
timedout 31737e127e fix: Make fetch_state_ids_from_backfill_servers candidate-free safe 2026-06-20 17:08:00 +01:00
timedout 486dcd208c style: Resolve lint complaints 2026-06-20 17:08:00 +01:00
timedout a945a4b2ad fix: Correctly handle still-missing state, always fetch full state atomically if regular fetch fails 2026-06-20 17:08:00 +01:00
timedout c28ea44e11 fix: Correct inverted boolean condition, add explicit timeout on /state fetch 2026-06-20 17:08:00 +01:00
timedout 3e4d6b2565 perf: Always fetch at least N events per GME 2026-06-20 17:08:00 +01:00
timedout 29ce21cd2e fix: Correctly pre-populate state events vec with known events 2026-06-20 17:08:00 +01:00
timedout d461c6977a fix: Friendly assertations 2026-06-20 17:08:00 +01:00
timedout 5a0d6461d1 perf: Don't try to re-persist non-outliers we already have 2026-06-20 17:08:00 +01:00
timedout 2ec7394785 perf: Don't add trees we already have to latest boundary 2026-06-20 17:08:00 +01:00
timedout 8a5708b9f9 fix: Be noisy when there's no incoming state 2026-06-20 17:08:00 +01:00
timedout cd46070bd3 fix: Elide auth chain from fetch_and_handle_outliers 2026-06-20 17:08:00 +01:00
timedout 9930e549a0 fix: Progress log in fetch_prev 2026-06-20 17:08:00 +01:00
timedout 36ed20cb04 fix: Downgrade safe assert to debug assert 2026-06-20 17:08:00 +01:00
timedout 3907589b6c fix: Don't download the world 2026-06-20 17:08:00 +01:00
timedout 2a598af888 feat: Make logging more verbose to diagnose the aranjesplosion 2026-06-20 17:08:00 +01:00
timedout f34f84832b feat: Include timing information in debug logs 2026-06-20 17:08:00 +01:00
timedout 0efa6ed1f2 fix: Don't treat prev outlier upgrades as fetch failures 2026-06-20 17:08:00 +01:00
timedout 2bf5876778 fix: Ask more servers for state_ids when origin fails to provide
Some servers reference events in prev_events that they might not yet have finished processing, so this allows us to at least attempt to get the state from another trustworthy server in the room that might be faster. I don't think this is too effective, however it's more effective than giving up immediately.
2026-06-20 17:08:00 +01:00
timedout 31982e84de fix: Remove redundant check 2
This may look scary, but this is safe because event auth performs the same check, and will reject the event if it doesn't reference the create event correctly.
2026-06-20 17:08:00 +01:00
timedout 970958652a fix: Remove redundant check that accidentally banned everyone 2026-06-20 17:08:00 +01:00
timedout c94a395bf0 fix: Make PDU handle errors noisier & correct error types 2026-06-20 17:08:00 +01:00
timedout 43adff926f fix: Make dedupe noisy, don't allow non-create event as create event 2026-06-20 17:07:59 +01:00
timedout 04fce56381 fix: Don't silence PDU handle logs 2026-06-20 17:07:59 +01:00
timedout 4df2097e6c style: Rename gapfill helpers instruments 2026-06-20 17:07:59 +01:00
timedout 8b85b04d10 fix: Properly remove event_id from the PDU JSON before upgrading it 2026-06-20 17:07:59 +01:00
timedout 9e0bcd3be8 fix: Hold a federation room lock while remotely joining a room 2026-06-20 17:07:59 +01:00
timedout 9509080e0d fix: Replace our local extremity tracking when joining a disconnected room remotely 2026-06-20 17:07:59 +01:00
timedout 61066bb0c6 fix: Don't try and fetch zero events 2026-06-20 17:07:59 +01:00
timedout 573d5bc50e fix: Fall back to atomic fetch when full-state fetch fails 2026-06-20 17:07:59 +01:00
timedout 162e6eb92f fix: Remove short-term memory loss
I keep writing forgetful code, it's a problem
2026-06-20 17:07:59 +01:00
timedout cef4ebe38e fix: Don't try to fetch the same event endlessly 2026-06-20 17:07:59 +01:00
timedout 316a0b7d58 fix: Don't repeat already-included metadata in fetch_state instrument 2026-06-20 17:07:59 +01:00
timedout 2bcc56704b feat: Enhance reliability by fetching full state when we're missing a lot of auth events 2026-06-20 17:07:59 +01:00
timedout 7f64de9727 fix: Calculate max iterations dynamically, and bump max prevs 2026-06-20 17:07:59 +01:00
timedout aea03f2f99 perf(wip): Improve individual events fetcher 2026-06-20 17:07:59 +01:00
timedout 8edf9552b8 fix: Don't lie about using already-known content 2026-06-20 17:07:59 +01:00
timedout d5f69c8a31 fix: Be smarter when re-receiving already-seen PDUs 2026-06-20 17:07:59 +01:00
timedout 9ea9b0e04c perf: Don't re-process events as outliers 2026-06-20 17:07:59 +01:00
timedout e8db01fc8d style: Improve logging 2026-06-20 17:07:59 +01:00
timedout f80e1e89a5 fix: Lower floor for min depth 2026-06-20 17:07:59 +01:00
timedout b9dca84acf fix: Only increment mindepth on state events 2026-06-20 17:07:59 +01:00
timedout e3ec1066c4 chore: Add newsfrag 2026-06-20 17:07:59 +01:00
timedout 1445a8d446 feat: Keep track of a min_depth value
Should prevent weird situations where we accidentally gapfill into backfill territory
2026-06-20 17:07:59 +01:00
timedout 9547c438d6 perf: Increase default max_fetch_prev_events to 256 2026-06-20 17:07:59 +01:00
timedout 51d0e615f5 perf: Make max gap depth fetch configurable 2026-06-20 17:07:59 +01:00
timedout eeb937416c perf: Improve gap filling, handle missing auth events better 2026-06-20 17:07:59 +01:00
timedout 0d5aa7ede1 fix: This is some bullshit I tell you 2026-06-20 17:07:59 +01:00
timedout ba9dc27773 feat: Better prev event fetching
fix: Don't panic in debug mode when making an empty notary query
2026-06-20 17:07:59 +01:00
timedout abf5a155ba feat: Add backfill_missing_events helper 2026-06-20 17:07:59 +01:00
48 changed files with 1644 additions and 811 deletions
+1 -1
View File
@@ -71,7 +71,7 @@ runs:
- name: Install timelord-cli and git-warp-time
if: steps.check-binaries.outputs.need-install == 'true'
uses: https://github.com/taiki-e/install-action@9e1e5806d4a4822de933115878265be9aaa786d9 # v2
uses: https://github.com/taiki-e/install-action@15449e3094499af05d8d964a1c884208e4b8b595 # v2
with:
tool: git-warp-time,timelord-cli@3.0.1
+3 -3
View File
@@ -43,7 +43,7 @@ jobs:
# fi
- name: Checkout repository with full history
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
fetch-depth: 0
ref: ${{ github.ref_name }}
@@ -82,10 +82,10 @@ jobs:
# VERSION is the package version, COMPONENT is used in
# apt's repository config like a git repo branch
VERSION=$BASE_VERSION
if [[ ${{ forge.ref_name }} =~ ^v+[0-9]+\.+[0-9]+\.+[0-9]+$ ]]; then
if [[ ${{ forge.ref_name }} =~ ^v+[0-9]\.+[0-9]\.+[0-9]$ ]]; then
# Use the "stable" component for tagged semver releases
COMPONENT="stable"
elif [[ ${{ forge.ref_name }} =~ ^v+[0-9]+\.+[0-9]+\.+[0-9]+ ]]; then
elif [[ ${{ forge.ref_name }} =~ ^v+[0-9]\.+[0-9]\.+[0-9] ]]; then
# Use the "unstable" component for tagged semver pre-releases
COMPONENT="unstable"
else
+1 -1
View File
@@ -30,7 +30,7 @@ jobs:
echo "Fedora version: $VERSION"
- name: Checkout repository with full history
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
fetch-depth: 0
ref: ${{ github.ref_name }}
+1 -1
View File
@@ -65,7 +65,7 @@ jobs:
path: binaries
merge-multiple: true
- name: Create Release and Upload
uses: https://github.com/softprops/action-gh-release@718ea10b132b3b2eba29c1007bb80653f286566b # v3
uses: https://github.com/softprops/action-gh-release@b4309332981a82ec1c5618f44dd2e27cc8bfbfda # v3
with:
draft: true
files: binaries/*
+1 -1
View File
@@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
ref: ${{ github.event.pull_request.head.sha }}
fetch-depth: 0
+1 -1
View File
@@ -21,7 +21,7 @@ jobs:
steps:
- name: Sync repository
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
persist-credentials: false
fetch-depth: 0
+2 -2
View File
@@ -41,7 +41,7 @@ jobs:
DOCKER_MIRROR_TOKEN: ${{ secrets.DOCKER_MIRROR_TOKEN }}
steps:
- name: Checkout repository
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
persist-credentials: false
@@ -55,7 +55,7 @@ jobs:
# repositories: continuwuity
- name: Install regsync
uses: https://github.com/regclient/actions/regsync-installer@4b4db1dcc7dad75ad67a788a380f75a20cc8a040 # main
uses: https://github.com/regclient/actions/regsync-installer@14f9d37db17b5dc41fefd1ffdd1af4b9e2490560 # main
- name: Check what images need mirroring
run: |
+3 -3
View File
@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
persist-credentials: false
@@ -48,7 +48,7 @@ jobs:
rust: ${{ steps.filter.outputs.rust }}
steps:
- name: Checkout repository
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
persist-credentials: false
@@ -70,7 +70,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
persist-credentials: false
+5 -5
View File
@@ -46,7 +46,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
persist-credentials: false
- name: Prepare Docker build environment
@@ -100,7 +100,7 @@ jobs:
needs: build-release
steps:
- name: Checkout repository
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
persist-credentials: false
- name: Create multi-platform manifest
@@ -133,7 +133,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
persist-credentials: false
- name: Prepare max-perf Docker build environment
@@ -187,7 +187,7 @@ jobs:
needs: build-maxperf
steps:
- name: Checkout repository
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
persist-credentials: false
- name: Create max-perf manifest
@@ -216,7 +216,7 @@ jobs:
path: binaries
merge-multiple: true
- name: Create Release and Upload
uses: https://github.com/softprops/action-gh-release@718ea10b132b3b2eba29c1007bb80653f286566b # v3
uses: https://github.com/softprops/action-gh-release@b4309332981a82ec1c5618f44dd2e27cc8bfbfda # v3
with:
draft: true
files: binaries/*
+2 -2
View File
@@ -43,11 +43,11 @@ jobs:
name: Renovate
runs-on: ubuntu-latest
container:
image: ghcr.io/renovatebot/renovate:43.234.0@sha256:bff111bfe347c559c615b658b28721eba5b7bb32a7b7901ea321336767209fe1
image: ghcr.io/renovatebot/renovate:43.222.1@sha256:b9af3f59f3f4d92b2c41e9f4ca3ffe92400503f20158d0bd67d07a3fdbe781d2
options: --tmpfs /tmp:exec
steps:
- name: Checkout
uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
show-progress: false
+3 -6
View File
@@ -14,7 +14,7 @@ jobs:
update-flake-hashes:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7
- uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
persist-credentials: true
token: ${{ secrets.FORGEJO_TOKEN }}
@@ -27,7 +27,7 @@ jobs:
- name: Get new toolchain hash
run: |
# Set the current sha256 to an empty hash to make `nix build` calculate a new one
awk '/fromToolchainName *\{/{found=1; print; next} found && /sha256 =/{sub(/sha256 = .*/, "sha256 = lib.fakeSha256;"); found=0} 1' nix/rust.nix > temp.nix
awk '/fromToolchainFile *\{/{found=1; print; next} found && /sha256 =/{sub(/sha256 = .*/, "sha256 = lib.fakeSha256;"); found=0} 1' nix/rust.nix > temp.nix
mv temp.nix nix/rust.nix
# Build continuwuity and filter for the new hash
@@ -39,15 +39,12 @@ jobs:
sed -i "s|lib.fakeSha256|\"$new_hash\"|" nix/rust.nix
echo "New hash:"
awk -F'"' '/fromToolchainName/{found=1; next} found && /sha256 =/{print $2; found=0}' nix/rust.nix
awk -F'"' '/fromToolchainFile/{found=1; next} found && /sha256 =/{print $2; found=0}' nix/rust.nix
echo "Expected new hash:"
cat new_toolchain_hash.txt
rm new_toolchain_hash.txt
- name: Update rocksdb
run: nix run .#update-rocksdb
- name: Show diff
run: git diff flake.nix nix
Generated
+10 -19
View File
@@ -560,9 +560,9 @@ checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"
[[package]]
name = "bytes"
version = "1.12.0"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ae3f5d315924270530207e2a68396c3cc547f6dca3fbdca317cfb1a51edb593"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
[[package]]
name = "bytesize"
@@ -889,7 +889,7 @@ dependencies = [
"http-body-util",
"hyper",
"ipaddress",
"itertools 0.15.0",
"itertools 0.14.0",
"lettre",
"log",
"rand 0.10.1",
@@ -942,7 +942,7 @@ dependencies = [
"http-body-util",
"hyper-util",
"ipaddress",
"itertools 0.15.0",
"itertools 0.14.0",
"lettre",
"libc",
"libloading 0.9.0",
@@ -1004,7 +1004,7 @@ name = "conduwuit_macros"
version = "26.6.0-alpha.1"
dependencies = [
"cargo_toml",
"itertools 0.15.0",
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn",
@@ -1070,7 +1070,7 @@ dependencies = [
"http",
"image",
"ipaddress",
"itertools 0.15.0",
"itertools 0.14.0",
"lettre",
"log",
"loole",
@@ -2641,15 +2641,6 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b4baf93f58d4425749ca49a51c50ebab072c5df6994d08fed93541c331481dc"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.18"
@@ -3954,9 +3945,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.46"
version = "1.0.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfbc457d0c7a0759a614551b11a6409e5951f6c7537be1f1b7682b9ae9230368"
checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924"
dependencies = [
"proc-macro2",
]
@@ -5153,9 +5144,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "2.0.118"
version = "2.0.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9ae57f904213ebb649ce6895b8a66c66f0203b9319718f69a5612a065b1422"
checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99"
dependencies = [
"proc-macro2",
"quote",
+1 -1
View File
@@ -316,7 +316,7 @@ default-features = false
# Used to make working with iterators easier, was already a transitive depdendency
[workspace.dependencies.itertools]
version = "0.15.0"
version = "0.14.0"
# to parse user-friendly time durations in admin commands
#TODO: overlaps chrono?
+2
View File
@@ -0,0 +1,2 @@
Improved the performance and reliability of fetching missing events, improving network partition recovery. Contributed
by @nex.
+9 -1
View File
@@ -297,7 +297,7 @@
# This item is undocumented. Please contribute documentation for it.
#
#max_fetch_prev_events = 192
#max_fetch_prev_events = 1024
# How many incoming federation transactions the server is willing to be
# processing at any given time before it becomes overloaded and starts
@@ -645,6 +645,14 @@
#
#default_room_acl_deny =
# The number of forward extremities to tolerate in a room before
# attempting to manually squash them with a "dummy event". Setting this
# above 20 will hinder its efficacy, and setting it below 5 will cause
# more dummy events to be sent than necessary (which increases federation
# traffic).
#
#dummy_event_threshold = 10
# Enable OpenTelemetry OTLP tracing export. This replaces the deprecated
# Jaeger exporter. Traces will be sent via OTLP to a collector (such as
# Jaeger) that supports the OpenTelemetry Protocol.
+1 -1
View File
@@ -50,7 +50,7 @@ EOF
# Developer tool versions
# renovate: datasource=github-releases depName=cargo-bins/cargo-binstall
ENV BINSTALL_VERSION=1.20.1
ENV BINSTALL_VERSION=1.20.0
# renovate: datasource=github-releases depName=psastras/sbom-rs
ENV CARGO_SBOM_VERSION=0.9.1
# renovate: datasource=crate depName=lddtree
+1 -1
View File
@@ -18,7 +18,7 @@ RUN --mount=type=cache,target=/etc/apk/cache apk add \
# Developer tool versions
# renovate: datasource=github-releases depName=cargo-bins/cargo-binstall
ENV BINSTALL_VERSION=1.20.1
ENV BINSTALL_VERSION=1.20.0
# renovate: datasource=github-releases depName=psastras/sbom-rs
ENV CARGO_SBOM_VERSION=0.9.1
# renovate: datasource=crate depName=lddtree
@@ -6,10 +6,10 @@
"message": "Welcome to Continuwuity! Important announcements about the project will appear here."
},
{
"id": 14,
"id": 13,
"mention_room": true,
"date": "2026-06-20",
"message": "[v0.5.10](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.10) has been released. It is a security release, so we suggest you update as soon as possible. Don't forget to also join [our announcements room](https://matrix.to/#/!jIdNjSM5X-V5JVx2h2kAhUZIIQ08GyzPL55NFZAH1vM/%24K1ISNKIqfNiZzsNVCaTt2E7ZtNeP6Dsy6sbz9l3rO0A?via=ellis.link&via=gingershaped.computer&via=matrix.org)."
"date": "2026-05-08",
"message": "[v0.5.9](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.9) has been released, fixing a few low-severity federation-related vulnerabilities. It is recommended you read the changelog and update as soon as possible. There are no new features or other changes in this release, only related bugfixes. Deployments tracking the main branch should also update to the latest commit."
}
]
}
+1 -1
View File
@@ -10,7 +10,7 @@ ## Continuwuity issues
### Slow joins to rooms
Some slowness is to be expected if you're the first person on your homeserver to join a room (which will
Some slowness is to be expected if you're the first person on your homserver to join a room (which will
always be the case for single-user homeservers). In this situation, your homeserver has to verify the signatures of
all of the state events sent by other servers before your join. To make this process as fast as possible, make sure you have
multiple fast, trusted servers listed in `trusted_servers` in your configuration, and ensure
-1
View File
@@ -4,6 +4,5 @@
./packages
./devshell.nix
./fmt.nix
./rocksdb-updater.nix
];
}
+2 -20
View File
@@ -2,10 +2,9 @@
lib,
self,
stdenv,
rocksdb,
liburing,
craneLib,
pkg-config,
liburing,
rustPlatform,
cargoExtraArgs ? "",
rustflags ? "",
@@ -37,17 +36,12 @@ let
pkg-config
rustPlatform.bindgenHook
];
buildInputs = lib.optionals stdenv.hostPlatform.isLinux [ liburing ];
doCheck = false;
env = {
CARGO_PROFILE = profile;
RUSTFLAGS = rustflags;
}
// (lib.optionalAttrs (rocksdb != null) {
ROCKSDB_INCLUDE_DIR = "${rocksdb}/include";
ROCKSDB_LIB_DIR = "${rocksdb}/lib";
})
// (lib.optionalAttrs (target_cpu != null) {
TARGET_CPU = target_cpu;
});
@@ -58,18 +52,6 @@ craneLib.buildPackage (
inherit cargoExtraArgs;
cargoArtifacts = craneLib.buildDepsOnly attrs;
# Needed to make continuwuity link to rocksdb
postFixup = lib.optionalString (stdenv.hostPlatform.isLinux && rocksdb != null) ''
old_rpath="$(patchelf --print-rpath $out/bin/conduwuit)"
extra_rpath="${
lib.makeLibraryPath [
rocksdb
]
}"
patchelf --set-rpath "$old_rpath:$extra_rpath" $out/bin/conduwuit
'';
meta = {
description = "A community-driven Matrix homeserver in Rust";
mainProgram = "conduwuit";
+4 -11
View File
@@ -9,10 +9,8 @@
self',
lib,
pkgs,
inputs',
system,
craneLib,
mkToolchain,
...
}:
{
@@ -21,7 +19,7 @@
mkPackages =
pkgs:
let
fnx = inputs'.fenix.packages;
fnx = inputs.fenix.packages.${system};
isStatic = pkgs.stdenv.hostPlatform.isMusl;
@@ -30,7 +28,7 @@
if isStatic then
fnx.combine [
self'.packages.stable-toolchain
(mkToolchain fnx.targets.${pkgs.stdenv.hostPlatform.config}).rust-std
(fnx.targets.${pkgs.stdenv.hostPlatform.config}.stable).rust-std
]
else
self'.packages.stable-toolchain
@@ -38,10 +36,7 @@
default = pkgs.callPackage ./continuwuity.nix {
inherit self craneLib;
liburing = (if isStatic then pkgs.pkgsStatic else pkgs).liburing;
rocksdb = if isStatic then null else self'.packages.rocksdb;
# extra features via `cargoExtraArgs`
cargoExtraArgs = "-F http3";
# extra RUSTFLAGS via `rustflags`
@@ -64,12 +59,10 @@
in
{
inherit default max-perf max-perf-haswell;
};
in
{
rocksdb = pkgs.callPackage ./rocksdb.nix { };
}
// (mkPackages pkgs)
(mkPackages pkgs)
// (lib.mapAttrs' (name: value: lib.nameValuePair "${name}-static-x86_64" value) (
mkPackages (
import inputs.nixpkgs {
-36
View File
@@ -1,36 +0,0 @@
{
# stdenv,
# enableJemalloc ? stdenv.hostPlatform.isLinux,
enableJemalloc ? false,
rocksdb,
fetchFromGitea,
rust-jemalloc-sys-unprefixed,
...
}:
(rocksdb.override {
# rocksdb fails to build with prefixed jemalloc, which is required on
# darwin due to [1]. In this case, fall back to building rocksdb with
# libc malloc. This should not cause conflicts, because all of the
# jemalloc symbols are prefixed.
#
# [1]: https://github.com/tikv/jemallocator/blob/ab0676d77e81268cd09b059260c75b38dbef2d51/jemalloc-sys/src/env.rs#L17
jemalloc = rust-jemalloc-sys-unprefixed;
inherit enableJemalloc;
}).overrideAttrs
({
version = "continuwuity-v0.5.0-unstable-2026-05-19";
src = fetchFromGitea {
domain = "forgejo.ellis.link";
owner = "continuwuation";
repo = "rocksdb";
rev = "3756b2b905e13216d8b56bcc783d814e7b073aff";
hash = "sha256-rSv4fr2bf9JJwdodgeuPCuceeh7k97KVxrAOC0wyPQY=";
};
# We have this already at https://forgejo.ellis.link/continuwuation/rocksdb/commit/a935c0273e1ba44eacf88ce3685a9b9831486155
# Unsetting `patches` so we don't have to revert it and make this nix exclusive
patches = [ ];
# Unset postPatch, as our version override breaks version-specific sed calls in the original package
postPatch = "";
})
-14
View File
@@ -1,14 +0,0 @@
{
perSystem =
{ pkgs, ... }:
{
apps.update-rocksdb = {
type = "app";
program = pkgs.writeShellApplication {
name = "update-rocksdb";
runtimeInputs = [ pkgs.nix-update ];
text = "nix-update rocksdb -F --version branch";
};
};
};
}
+9 -13
View File
@@ -2,26 +2,22 @@
{
perSystem =
{
system,
lib,
inputs',
pkgs,
...
}:
let
mkToolchain =
target:
target.fromToolchainName {
name = (lib.importTOML "${inputs.self}/rust-toolchain.toml").toolchain.channel;
sha256 = "sha256-mvUGEOHYJpn3ikC5hckneuGixaC+yGrkMM/liDIDgoU=";
};
in
{
_module.args = { inherit mkToolchain; };
packages =
let
fnx = inputs'.fenix.packages;
stable-toolchain = (mkToolchain fnx).toolchain;
fnx = inputs.fenix.packages.${system};
stable-toolchain = fnx.fromToolchainFile {
dir = inputs.self;
# See also `rust-toolchain.toml`
sha256 = "sha256-mvUGEOHYJpn3ikC5hckneuGixaC+yGrkMM/liDIDgoU=";
};
in
{
inherit stable-toolchain;
+51 -1
View File
@@ -31,7 +31,7 @@
};
use tracing_subscriber::EnvFilter;
use crate::admin_command;
use crate::{PAGE_SIZE, admin_command};
#[admin_command]
pub(super) async fn echo(&self, message: Vec<String>) -> Result {
@@ -1138,3 +1138,53 @@ pub(super) async fn send_test_email(&self) -> Result {
Ok(())
}
#[admin_command]
pub(super) async fn rooms_by_extremity_count(&self, page: Option<usize>) -> Result {
let page = page.unwrap_or(1);
// My Giant Chain:tm:
let mapped: HashMap<OwnedRoomId, u64> = self
.services
.rooms
.state
.all_forward_extremities()
.ready_fold(HashMap::new(), move |mut map, (room_id, _)| {
let count: u64 = map.get(&room_id).copied().unwrap_or(0);
map.insert(room_id, count.saturating_add(1));
map
})
.await
.into_iter()
.filter_map(|(room_id, count)| (count >= 2).then_some((room_id, count)))
.collect();
if mapped.is_empty() {
return Err!("No more rooms.");
}
let mut rooms = mapped.keys().collect::<Vec<_>>();
rooms.sort_by_key(|room_id| {
mapped
.get(*room_id)
.copied()
.expect("keys must have values")
});
rooms.reverse();
let body = rooms
.into_iter()
.stream()
.skip(page.saturating_sub(1).saturating_mul(PAGE_SIZE))
.take(PAGE_SIZE)
.map(|room_id| {
format!("{room_id}: {}", mapped.get(room_id).copied().expect("keys must have values"))
})
.collect::<Vec<_>>()
.await;
self.write_str(&format!(
"Rooms by extremity count ({}):\n```\n{}\n```",
body.len(),
body.join("\n")
))
.await
}
+5
View File
@@ -237,6 +237,11 @@ pub enum DebugCommand {
/// Send a test email to the invoking admin's email address
SendTestEmail,
/// Lists room IDs by forward extremity count in descending order
RoomsByExtremityCount {
page: Option<usize>,
},
/// Developer test stubs
#[command(subcommand)]
#[allow(non_snake_case)]
+3 -7
View File
@@ -4,15 +4,11 @@
use conduwuit::{Err, Event, Result, debug, info, trace, utils::to_canonical_object, warn};
use ruma::{OwnedEventId, api::federation::event::get_missing_events};
use serde_json::{json, value::RawValue};
use service::rooms::event_handler::GET_MISSING_EVENTS_MAX_BATCH_SIZE;
use super::AccessCheck;
use crate::Ruma;
/// arbitrary number but synapse's is 20 and we can handle lots of these anyways
const LIMIT_MAX: usize = 50;
/// spec says default is 10
const LIMIT_DEFAULT: usize = 10;
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
///
/// Retrieves events that the sender is missing.
@@ -45,8 +41,8 @@ pub(crate) async fn get_missing_events_route(
let limit = body
.limit
.try_into()
.unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX);
.unwrap_or(10)
.min(GET_MISSING_EVENTS_MAX_BATCH_SIZE);
let room_version = services.rooms.state.get_room_version(&body.room_id).await?;
+11 -3
View File
@@ -7,7 +7,7 @@
use axum::extract::State;
use axum_client_ip::ClientIp;
use conduwuit::{
Err, Error, Result, debug, debug_warn, err, error,
Err, Error, Result, debug, debug_error, debug_warn, err, error,
result::LogErr,
state_res::lexicographical_topological_sort,
trace,
@@ -133,6 +133,7 @@ async fn wait_for_result(
}
#[instrument(
name="transaction"
skip_all,
fields(
id = ?body.transaction_id.as_str(),
@@ -174,8 +175,14 @@ async fn process_inbound_transaction(
for (id, result) in &results {
if let Err(e) = result {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
debug_warn!("Incoming PDU failed {id}: {e:?}");
match e {
| Error::BadRequest(
ErrorKind::Forbidden | ErrorKind::InvalidParam | ErrorKind::BadJson,
..,
) => {
debug_warn!("Incoming PDU {id} failed: {e:?}");
},
| _ => debug_error!("Incoming PDU {id} failed: {e:?}"),
}
}
}
@@ -381,6 +388,7 @@ async fn handle_room(
.rooms
.event_handler
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
.boxed()
.await
.map(|_| ());
results.push((event_id, result));
+14 -2
View File
@@ -375,7 +375,7 @@ pub struct Config {
#[serde(default = "default_max_request_size")]
pub max_request_size: usize,
/// default: 192
/// default: 1024
#[serde(default = "default_max_fetch_prev_events")]
pub max_fetch_prev_events: u16,
@@ -781,6 +781,16 @@ pub struct Config {
/// a substitute for moderation bots.
pub default_room_acl_deny: Option<Vec<String>>,
/// The number of forward extremities to tolerate in a room before
/// attempting to manually squash them with a "dummy event". Setting this
/// above 20 will hinder its efficacy, and setting it below 5 will cause
/// more dummy events to be sent than necessary (which increases federation
/// traffic).
///
/// default: 10
#[serde(default = "default_extremity_threshold")]
pub dummy_event_threshold: u8,
/// display: nested
#[serde(default)]
pub well_known: WellKnownConfig,
@@ -2549,7 +2559,7 @@ fn default_pusher_timeout() -> u64 { 60 }
fn default_pusher_idle_timeout() -> u64 { 15 }
fn default_max_fetch_prev_events() -> u16 { 192_u16 }
fn default_max_fetch_prev_events() -> u16 { 1024 }
fn default_max_concurrent_inbound_transactions() -> usize { 150 }
@@ -2652,6 +2662,8 @@ fn default_rocksdb_stats_level() -> u8 { 1 }
#[inline]
pub fn default_default_room_version() -> RoomVersionId { RoomVersionId::V12 }
fn default_extremity_threshold() -> u8 { 10 }
fn default_ip_range_denylist() -> Vec<String> {
vec![
"127.0.0.0/8".to_owned(),
+1 -1
View File
@@ -62,7 +62,7 @@ impl Default for PartialPdu {
fn default() -> Self {
Self {
event_type: "m.room.message".into(),
content: Box::<RawJsonValue>::default(),
content: to_raw_value("{}").unwrap(),
unsigned: None,
state_key: None,
redacts: None,
+4
View File
@@ -187,6 +187,10 @@ pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
val_size_hint: Some(8),
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "roomid_mindepth",
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "roomserverids",
..descriptor::RANDOM_SMALL
@@ -1,233 +1,667 @@
use std::{
collections::{BTreeMap, HashSet, VecDeque, hash_map},
collections::{HashMap, HashSet, VecDeque},
time::Instant,
};
use assign::assign;
#[cfg(debug_assertions)]
use conduwuit::error;
use conduwuit::{
Event, PduEvent, debug, debug_warn, implement, matrix::event::gen_event_id_canonical_json,
trace, utils::continue_exponential_backoff_secs, warn,
Err, Event, PduEvent, debug, debug_error, debug_info, debug_warn, err,
state_res::lexicographical_topological_sort,
trace,
utils::{IterStream, math::Expected, stream::BroadbandExt},
warn,
};
use futures::{StreamExt, future::select_ok};
use ruma::{
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
api::federation::event::get_event,
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
OwnedServerName, RoomId, ServerName, UInt,
api::federation::event::{get_event, get_missing_events},
int,
room_version_rules::RoomVersionRules,
};
use super::get_room_version_rules;
use crate::rooms::event_handler::parse_incoming_pdu::expect_event_id_array;
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
pub const GET_MISSING_EVENTS_MAX_BATCH_SIZE: usize = 50;
/// Attempts to build a localised directed acyclic graph out of the given PDUs,
/// returning them in a topologically sorted order.
///
/// Returns pdu and if we fetched it over federation the raw json.
///
/// a. Look in the main timeline (pduid_pdu tree)
/// b. Look at outlier pdu tree
/// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation?
#[implement(super::Service)]
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
&self,
origin: &'a ServerName,
events: Events,
create_event: &'a Pdu,
room_id: &'a RoomId,
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let back_off = |id| match self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(id)
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
},
};
/// This is used to attempt to process PDUs in an order that respects their
/// dependencies, however it is ultimately the sender's responsibility to send
/// them in a processable order, so this is just a best effort attempt. It does
/// not account for power levels or other tie breaks.
pub async fn build_local_dag<S: std::hash::BuildHasher + Send + Sync>(
pdu_map: &HashMap<OwnedEventId, &CanonicalJsonObject, S>,
) -> conduwuit::Result<Vec<OwnedEventId>> {
debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs");
let mut dag: HashMap<OwnedEventId, HashSet<OwnedEventId>> =
HashMap::with_capacity(pdu_map.len());
let mut id_origin_ts: HashMap<OwnedEventId, _> = HashMap::with_capacity(pdu_map.len());
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
trace!("Fetching {} outlier pdus", events.clone().count());
for (event_id, value) in pdu_map {
// We already checked that these properties are correct in parse_incoming_pdu,
// so it's safe to unwrap here.
// We also filter to remove any prev_events that are not in this pdu_map, as we
// need to have at least one event with zero out degrees for the lexico-topo
// sort below. If there are multiple events with omitted prevs, they will be
// ordered by timestamp, then event ID. At that point though, it's unlikely to
// matter.
let prev_events = value
.get("prev_events")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|v| EventId::parse(v.as_str().unwrap()).unwrap())
.filter(|id| pdu_map.contains_key(id))
.collect();
for id in events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
trace!("Found {id} in main timeline or outlier tree");
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
continue;
}
dag.insert(event_id.clone(), prev_events);
let origin_server_ts = value
.get("origin_server_ts")
.and_then(CanonicalJsonValue::as_integer)
.unwrap_or_default();
id_origin_ts.insert(event_id.clone(), origin_server_ts);
}
// c. Ask origin server over federation
// We also handle its auth chain here so we don't get a stack overflow in
// handle_outlier_pdu.
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
debug!(count = dag.len(), "Sorting incoming events with partial graph");
lexicographical_topological_sort(&dag, &async |node_id| {
// Note: we don't bother fetching power levels because that would massively slow
// this function down. This is a best-effort attempt to order events correctly
// for processing, however ultimately that should be the sender's job.
let ts = id_origin_ts
.get(&node_id)
.copied()
.unwrap_or_else(|| int!(0))
.to_string()
.parse::<u64>()
.ok()
.and_then(UInt::new)
.unwrap_or_default();
Ok((int!(0), MilliSecondsSinceUnixEpoch(ts)))
})
.await
.inspect(|sorted| {
debug_assert_eq!(
sorted.len(),
pdu_map.len(),
"Sorted graph was not the same size as the input graph"
);
})
.map_err(|e| err!("failed to resolve local graph: {e}"))
}
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
while let Some(next_id) = todo_auth_events.pop_front() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 60 * 2;
const MAX_DURATION: u64 = 60 * 60 * 8;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug_warn!(
tried = ?*tries,
elapsed = ?time.elapsed(),
"Backing off from {next_id}",
);
continue;
}
}
impl super::Service {
/// Uses `POST /_matrix/federation/v1/get_missing_events/{room_id}` to fill
/// gaps in the DAG.
///
/// This function walks backwards from `head`, fetching incrementally (by a
/// factor of 10) more events until the remote we're fetching from either
/// stops returning new events, or the min_depth is reached.
///
/// This function does not persist the events, but does validate them. The
/// caller is responsible for passing them through handle_incoming_pdu or
/// related functions.
///
/// Only the one `via` is asked for missing events, as multiplexing remotes
/// may result in the event tree being walked in a gappy or disordered
/// manner.
///
/// ## Parameters
///
/// - `room_id`: The room's ID.
/// - `head`: The event we are potentially missing prev_events for.
/// - `tail`: The most recently known events in the graph (typically forward
/// extremities).
/// - `via`: The server to ask for missing events.
/// - `min_depth`: Don't process events with a `depth` lower than this
/// value. Not massively useful, but can help short-circuit infinite loops
/// and weird edge paths.
#[tracing::instrument(name = "get_missing_events_bulk", skip_all)]
pub async fn get_missing_events(
&self,
room_id: &RoomId,
head: &PduEvent,
tail: Vec<OwnedEventId>,
via: &ServerName,
min_depth: UInt,
) -> conduwuit::Result<HashMap<OwnedEventId, PduEvent>> {
let start = Instant::now();
#[cfg(debug_assertions)]
{
let missing_count = head
.prev_events()
.stream()
.fold(0_u8, |i, event_id| async move {
if self.services.timeline.pdu_exists(event_id).await {
i.expected_add(1)
} else {
i
}
})
.await;
debug_assert_ne!(
missing_count, 0,
"event passed to get_missing_events is not missing any events (wasteful call)"
);
};
assert!(!tail.is_empty(), "empty tail");
assert_ne!(via, self.services.globals.server_name(), "cannot ask ourselves for events");
if events_all.contains(&next_id) {
continue;
}
// The iteration limit is in place to ensure that if the remote server leaves us
// in a state of infinite recursion (as old versions of continuwuity and
// predecessors would), we give up. However, get_missing_events doesn't return
// that many events per-request. Synapse returns 20, and conduwuit+ return 50.
// This means with a hard iteration limit, we might give up too early, before
// we get a chance to even come close to max_fetch_prev_events. As such, we'll
// calculate the limit based on that config option and the aforementioned
// averages.
let max_fetch = self.services.server.config.max_fetch_prev_events;
let iteration_limit = max_fetch.saturating_div(20).max(10);
if self.services.timeline.pdu_exists(&next_id).await {
trace!("Found {next_id} in db");
continue;
}
debug!("Fetching {next_id} over federation from {origin}.");
match self
let mut discovered = HashMap::with_capacity(head.prev_events.len());
let mut latest_events: Vec<OwnedEventId> = vec![head.event_id().to_owned()];
debug!(elapsed=?start.elapsed(),
%room_id,
event_id=%head.event_id(),
%iteration_limit,
"Fetching any missing events for head event",
);
for iteration in 0..iteration_limit {
let limit = iteration
.expected_add(1)
.saturating_mul(10)
.min(GET_MISSING_EVENTS_MAX_BATCH_SIZE.try_into().expect(
"GET_MISSING_EVENTS_MAX_BATCH_SIZE (usize) should fit in u16 (<=65536)",
))
.max(
// This max call ensures we fetch *at least* all the prev events the
// head has.
u16::try_from(head.prev_events.len())
.expect("cannot have more than 20 prev events, which fits in u16"),
);
debug_info!(elapsed=?start.elapsed(),
%limit,
%via,
%iteration,
%iteration_limit,
discovered=discovered.len(),
%min_depth,
"Attempting to gap fill missing events"
);
let response: get_missing_events::v1::Response = self
.services
.sending
.send_federation_request(
origin,
get_event::v1::Request::new((*next_id).to_owned()),
via,
assign!(
get_missing_events::v1::Request::new(
room_id.to_owned(),
tail.clone(),
latest_events.clone()
),
{limit: limit.into(), min_depth}
),
)
.await
{
| Ok(res) => {
debug!("Got {next_id} over federation from {origin}");
let Ok(room_version_rules) = get_room_version_rules(create_event) else {
back_off((*next_id).to_owned());
continue;
};
.await?;
let Ok((calculated_event_id, value)) =
gen_event_id_canonical_json(&res.pdu, &room_version_rules)
else {
back_off((*next_id).to_owned());
continue;
};
if calculated_event_id != *next_id {
warn!(
"Server didn't return event id we requested: requested: {next_id}, \
we got {calculated_event_id}. Event: {:?}",
&res.pdu
);
}
if let Some(auth_events) = value
.get("auth_events")
.and_then(CanonicalJsonValue::as_array)
{
for auth_event in auth_events {
match serde_json::from_value::<OwnedEventId>(
auth_event.clone().into(),
) {
| Ok(auth_event) => {
trace!(
"Found auth event id {auth_event} for event {next_id}"
);
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
}
events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
},
| Err(e) => {
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
back_off((*next_id).to_owned());
},
if response.events.is_empty() {
debug_info!(
elapsed=?start.elapsed(),
%via,
"Finished gap filling missing events (remote returned no more events)."
);
break;
}
}
debug_info!(
elapsed=?start.elapsed(),
"Got {} events back from remote",
response.events.len()
);
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
}
let mut pdus = Vec::with_capacity(events_with_auth_events.len());
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Some(local_pdu) = local_pdu {
trace!("Found {id} in main timeline or outlier tree");
pdus.push((local_pdu.clone(), None));
}
for (next_id, value) in events_in_reverse_order.into_iter().rev() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug!("Backing off from {next_id}");
latest_events.clear();
for raw_event in response.events {
let (_, event_id, pdu_json) = self.parse_incoming_pdu(&raw_event).await?;
let pdu = PduEvent::from_id_val(&event_id, pdu_json).map_err(|e| {
err!(Request(BadJson("Failed to parse gapfilled event {event_id}: {e}")))
})?;
if discovered.contains_key(&event_id) {
// We already received this event.
trace!("Already received {event_id}");
continue;
}
if self
.services
.timeline
.non_outlier_pdu_exists(&event_id)
.await
{
// NOTE: we explicitly check for *non*-outlier events here, as if we end
// up discovering outlier events, we will be able to upgrade them
// immediately.
trace!("Already have {event_id} as a timeline PDU");
continue;
}
if pdu.depth < min_depth {
debug_warn!(
elapsed=?start.elapsed(),
"Received PDU with depth {} below min_depth {}",
pdu.depth,
min_depth
);
discovered.insert(event_id.clone(), pdu);
continue;
}
for prev_event_id in pdu.prev_events() {
if discovered.contains_key(prev_event_id) {
// We already received this event.
trace!("Already received prev event {prev_event_id}");
continue;
}
if self
.services
.timeline
.non_outlier_pdu_exists(prev_event_id)
.await
{
// NOTE: we explicitly check for *non*-outlier events here, as if we end
// up discovering outlier events, we will be able to upgrade them
// immediately.
trace!("Already have prev event {prev_event_id} as a timeline PDU");
continue;
}
if let Ok(outlier) = self.services.timeline.get_pdu(prev_event_id).await {
// We already have this PDU as an outlier, don't ask for
// it. However, if we are missing any prev events for it, add it to the
// latest events anyway.
let outlier_missing_prevs = outlier
.prev_events()
.stream()
.fold(0_u8, |i, event_id| async move {
if self.services.timeline.pdu_exists(event_id).await {
i.expected_add(1)
} else {
i
}
})
.await;
if outlier_missing_prevs > 0 {
trace!("Missing {outlier_missing_prevs} PDU(s) for prev event");
latest_events.push(prev_event_id.to_owned());
}
trace!("Had {prev_event_id} as an outlier already, skipping discovery");
discovered.insert(prev_event_id.to_owned(), outlier);
continue;
}
trace!("Missing prev {prev_event_id} of {event_id}");
latest_events.push(prev_event_id.to_owned());
}
trace!("Discovered {event_id}");
discovered.insert(event_id.clone(), pdu);
}
trace!("Handling outlier {next_id}");
if latest_events.is_empty() {
debug!(elapsed=?start.elapsed(),
%limit,
%via,
%iteration,
discovered=discovered.len(),
"No more events to fetch."
);
break;
}
if discovered.len() >= self.services.server.config.max_fetch_prev_events.into() {
// Stupid hack, debug_error!() drops the log to a DEBUG when not in debug mode,
// which is bad because this should at least produce a warning. It's an error in
// debug mode because this can be important, but typically not much can be done
// about it as a user.
#[cfg(debug_assertions)]
error!(elapsed=?start.elapsed(),
discovered=discovered.len(),
max_fetch_prev_events=self.services.server.config.max_fetch_prev_events,
%iteration,
%iteration_limit,
%via,
event_id=%head.event_id(),
%room_id,
"Encountered a gap too large to fill, giving up"
);
#[cfg(not(debug_assertions))]
warn!(elapsed=?start.elapsed(),
discovered=discovered.len(),
max_fetch_prev_events=self.services.server.config.max_fetch_prev_events,
%iteration,
%iteration_limit,
%via,
event_id=%head.event_id(),
%room_id,
"Encountered a gap too large to fill"
);
break;
}
}
trace!(elapsed=?start.elapsed(), "Finished get_missing_events");
Ok(discovered)
}
/// Sends a `GET /_matrix/federation/v1/event/{event_id}` request to the
/// target `remote`, parses the resulting PDU, and ensures the remote
/// returned the correct event.
/// Allows `fetch_and_handle_missing_events` to atomically fetch events from
/// multiple remotes in parallel.
async fn fetch_event_via(
&self,
remote: OwnedServerName,
event_id: OwnedEventId,
room_version_rules: &RoomVersionRules,
) -> conduwuit::Result<(OwnedEventId, CanonicalJsonObject)> {
let res = self
.services
.sending
.send_federation_request(&remote, get_event::v1::Request::new(event_id.clone()))
.await?;
let (calculated_event_id, value) = self
.parse_incoming_pdu_with_known_room(&res.pdu, room_version_rules)
.await?;
if calculated_event_id != event_id {
Err!(Request(BadJson(warn!(
expected=%event_id,
received=%calculated_event_id,
"Server didn't return event id we requested",
))))
} else {
Ok((event_id, value))
}
}
async fn fetch_event_vias(
&self,
candidates: impl Iterator<Item = &OwnedServerName>,
event_id: &EventId,
room_version_rules: &RoomVersionRules,
) -> conduwuit::Result<(OwnedEventId, CanonicalJsonObject)> {
if let Ok(pdu_json) = self.services.timeline.get_pdu_json(event_id).await {
return Ok((event_id.to_owned(), pdu_json));
}
let futures = candidates
.map(|remote| {
Box::pin(self.fetch_event_via(
remote.to_owned(),
event_id.to_owned(),
room_version_rules,
))
})
.collect::<Vec<_>>();
select_ok(futures).await.map(|(res, _)| res)
}
/// Asks remote servers for any individual events that are missing, also
/// known as "atomic fetch". Should only be used for fetching missing auth
/// events or resolving missing events from state_ids. For all other uses,
/// use get_missing_events.
///
/// This function manually walks auth_events trees in a breadth-first
/// search, and persists all fetched events as outliers when all the
/// backwards extremities have been resolved.
#[tracing::instrument(name = "get_missing_auth_events_atomic", skip_all)]
pub(super) async fn fetch_and_handle_auth_events<Pdu>(
&self,
origin: &ServerName,
events: Vec<OwnedEventId>,
create_event: &Pdu,
room_id: &RoomId,
) -> HashMap<OwnedEventId, PduEvent>
where
Pdu: Event + Send + Sync,
{
let start = Instant::now();
let room_version_rules =
&get_room_version_rules(create_event).unwrap_or(RoomVersionRules::V1);
let mut candidates = self
.services
.timeline
.candidate_backfill_servers(room_id)
.await;
candidates.insert(origin.to_owned());
assert!(!candidates.is_empty(), "no candidates to fetch missing events from");
let mut discovered_events =
HashMap::with_capacity(events.len().saturating_add(events.len().saturating_mul(3)));
trace!(
elapsed=?start.elapsed(),
"Fetching {} unknown PDUs on demand from {} candidates",
events.len(),
candidates.len()
);
let mut seen: HashMap<OwnedEventId, u8> = HashMap::new();
for apex_event_id in &events {
let mut todo: VecDeque<OwnedEventId> = [apex_event_id.to_owned()].into();
while let Some(target_id) = todo.pop_front() {
if discovered_events.contains_key(&target_id) {
continue;
}
if let Ok(local_pdu) = self.services.timeline.get_pdu(&target_id).await {
trace!(elapsed=?start.elapsed(), "Found {target_id} in db");
let mut obj = local_pdu.into_canonical_object();
obj.remove("event_id");
discovered_events.insert(target_id.clone(), obj);
continue;
}
let attempts = seen.get(&*target_id).copied().unwrap_or_default();
if attempts >= 5 {
debug_error!(
elapsed=?start.elapsed(),
%attempts,
%target_id,
"Could not fetch missing event after 5 attempts, giving up"
);
continue;
}
debug!(elapsed=?start.elapsed(),"Fetching {target_id} over federation");
let value = match self
.fetch_event_vias(candidates.iter(), &target_id, room_version_rules)
.await
{
| Ok((_, x)) => x,
| Err(e) => {
warn!(elapsed=?start.elapsed(),"failed to fetch missing event {target_id} from any candidate: {e}");
continue;
},
};
let auth_events =
match expect_event_id_array(&value, "auth_events").map_err(|e| {
err!(Request(BadJson(warn!(
elapsed=?start.elapsed(),
event_id=%target_id,
"Failed to parse event fetched from remote: {e}"
))))
}) {
| Ok(auth_events) => auth_events,
| Err(e) => {
warn!(
elapsed=?start.elapsed(),
?e,
"event {target_id} is malformed (bad auth_events), skipping"
);
continue;
},
};
let mut have_all_auth = true;
for auth_event_id in auth_events {
if let Ok(local_pdu) = self.services.timeline.get_pdu(&auth_event_id).await {
trace!(elapsed=?start.elapsed(),"Found auth event {auth_event_id} in db");
let mut obj = local_pdu.into_canonical_object();
obj.remove("event_id");
discovered_events.insert(auth_event_id.clone(), obj);
continue;
}
if discovered_events.contains_key(&auth_event_id) {
trace!(elapsed=?start.elapsed(),%auth_event_id, "Already found auth event");
continue;
}
debug!(elapsed=?start.elapsed(),"Missing auth event {auth_event_id} for event {target_id}");
seen.insert(
auth_event_id.clone(),
seen.get(&auth_event_id)
.copied()
.unwrap_or_default()
.saturating_add(1),
);
todo.push_back(auth_event_id);
have_all_auth = false;
}
// Insert this PDU back at the end of the queue so that it will be resolved once
// all of its auth events have been fetched.
if have_all_auth {
debug!(elapsed=?start.elapsed(),%target_id, "Have all auth events");
discovered_events.insert(target_id, value);
} else {
debug_warn!(elapsed=?start.elapsed(),
"Fetched {target_id} but missing some auth events, will have to re-fetch."
);
seen.insert(target_id.clone(), attempts.saturating_add(1));
todo.push_back(target_id);
}
}
}
let refmap: HashMap<OwnedEventId, &CanonicalJsonObject> = discovered_events
.iter()
.map(|(id, data)| (id.clone(), data))
.collect();
let seeded_ordered = build_local_dag(&refmap)
.await
.expect("failed to build local DAG");
let mut pdus = HashMap::with_capacity(seeded_ordered.len());
for discovered_event_id in seeded_ordered {
let pdu_json = discovered_events.remove(&discovered_event_id).unwrap();
debug_info!(
elapsed=?start.elapsed(),
"Handling missing event {discovered_event_id} as outlier"
);
assert_eq!(pdu_json.get("event_id"), None, "pdu_json had event_id");
match Box::pin(self.handle_outlier_pdu(
origin,
create_event,
&next_id,
&discovered_event_id,
room_id,
value.clone(),
true,
pdu_json,
))
.await
{
| Ok((pdu, json)) =>
if next_id == *id {
trace!("Handled outlier {next_id} (original request)");
pdus.push((pdu, Some(json)));
},
| Err(e) => {
warn!("Authentication of event {next_id} failed: {e:?}");
back_off(next_id);
| Ok((pdu, _)) => {
trace!(elapsed=?start.elapsed(), "Persisted {discovered_event_id}");
let _ = pdus.insert(discovered_event_id, pdu);
},
| Err(e) => warn!(
elapsed=?start.elapsed(),
"Authentication of event {discovered_event_id} failed: {e:?}"
),
}
}
trace!(
elapsed=?start.elapsed(),
"Finished fetch_and_handle_missing_events: fetched and handled {} missing PDUs",
pdus.len()
);
pdus.retain(|id, _| events.contains(id)); // Only return state events
trace!(elapsed=?start.elapsed(), "Filtered return value down to {} PDUs", pdus.len());
pdus
}
/// Similar to `fetch_and_handle_missing_events`, but simply walks the
/// prev events tree instead of the auth events tree. Additionally, it does
/// not *handle* fetched PDUs in any capacity.
#[tracing::instrument(name = "get_missing_prev_events_atomic", skip_all)]
pub(super) async fn fetch_prev_events<Pdu>(
&self,
origin: &ServerName,
events: Vec<OwnedEventId>,
create_event: &Pdu,
room_id: &RoomId,
) -> HashMap<OwnedEventId, PduEvent>
where
Pdu: Event + Send + Sync,
{
let room_version_rules =
&get_room_version_rules(create_event).unwrap_or(RoomVersionRules::V1);
let mut candidates = self
.services
.timeline
.candidate_backfill_servers(room_id)
.await;
candidates.insert(origin.to_owned());
let mut todo: VecDeque<OwnedEventId> = VecDeque::from(events);
let mut discovered_events = HashMap::new();
while let Some(next_id) = todo.pop_front() {
if discovered_events.len() >= self.services.server.config.max_fetch_prev_events.into()
{
debug_warn!(
"Encountered a gap too large to fill, giving up (fetched {} events)",
discovered_events.len()
);
break;
}
if discovered_events.contains_key(&next_id) {
continue;
}
let pdu = match self
.fetch_event_vias(candidates.iter(), &next_id, room_version_rules)
.await
{
| Ok((_, data)) => data,
| Err(e) => {
warn!("Failed to fetch prev event {next_id} from any candidate: {e}");
continue;
},
};
let prev_events = match expect_event_id_array(&pdu, "prev_events").map_err(|e| {
err!(Request(BadJson(warn!(
event_id=%next_id,
"Failed to parse event fetched from remote: {e}"
))))
}) {
| Ok(auth_events) => auth_events,
| Err(e) => {
warn!(?e, "event {next_id} is malformed (bad prev_events), skipping");
continue;
},
};
let missing_prev = prev_events
.iter()
.stream()
.broad_filter_map(|event_id| async {
if discovered_events.contains_key(event_id)
|| self.services.timeline.pdu_exists(event_id).await
{
None
} else {
Some(event_id.to_owned())
}
})
.collect::<Vec<_>>()
.await;
todo.extend(missing_prev);
discovered_events.insert(
next_id.clone(),
PduEvent::from_id_val(&next_id, pdu).expect("fetched PDU was already validated"),
);
}
discovered_events
}
trace!("Fetched and handled {} outlier pdus", pdus.len());
pdus
}
+141 -120
View File
@@ -1,128 +1,149 @@
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
iter::once,
};
use std::{collections::HashMap, time::Instant};
use conduwuit::{
Event, PduEvent, Result, debug_warn, err, implement,
state_res::{self},
};
use futures::{FutureExt, future};
use ruma::{
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
int, uint,
Event, PduEvent, debug, debug_info, debug_warn, trace,
utils::{BoolExt, IterStream, stream::BroadbandExt},
};
use futures::StreamExt;
use ruma::{CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName};
use super::check_room_id;
use crate::rooms::event_handler::build_local_dag;
#[implement(super::Service)]
#[tracing::instrument(
level = "debug",
skip_all,
fields(%origin),
)]
#[allow(clippy::type_complexity)]
pub(super) async fn fetch_prev<'a, Pdu, Events>(
&self,
origin: &ServerName,
create_event: &Pdu,
room_id: &RoomId,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
initial_set: Events,
) -> Result<(
Vec<OwnedEventId>,
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let num_ids = initial_set.clone().count();
let mut eventid_info = HashMap::new();
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids);
let mut todo_outlier_stack: VecDeque<OwnedEventId> =
initial_set.map(ToOwned::to_owned).collect();
let mut amount = 0;
while let Some(prev_event_id) = todo_outlier_stack.pop_front() {
self.services.server.check_running()?;
match self
.fetch_and_handle_outliers(
origin,
once(prev_event_id.as_ref()),
create_event,
room_id,
)
.boxed()
.await
.pop()
{
| Some((pdu, mut json_opt)) => {
check_room_id(room_id, &pdu)?;
let limit = self.services.server.config.max_fetch_prev_events;
if amount > limit {
debug_warn!("Max prev event limit reached! Limit: {limit}");
graph.insert(prev_event_id.clone(), HashSet::new());
continue;
}
if json_opt.is_none() {
json_opt = self
.services
.outlier
.get_outlier_pdu_json(&prev_event_id)
.await
.ok();
}
if let Some(json) = json_opt {
if pdu.origin_server_ts() > first_ts_in_room {
amount = amount.saturating_add(1);
for prev_prev in pdu.prev_events() {
if !graph.contains_key(prev_prev) {
todo_outlier_stack.push_back(prev_prev.to_owned());
}
}
graph.insert(
prev_event_id.clone(),
pdu.prev_events().map(ToOwned::to_owned).collect(),
);
} else {
// Time based check failed
graph.insert(prev_event_id.clone(), HashSet::new());
}
eventid_info.insert(prev_event_id.clone(), (pdu, json));
} else {
// Get json failed, so this was not fetched over federation
graph.insert(prev_event_id.clone(), HashSet::new());
}
},
| _ => {
// Fetch and handle failed
graph.insert(prev_event_id.clone(), HashSet::new());
},
impl super::Service {
/// Fetches any missing prev_events for this event and persists them before
/// returning.
pub(super) async fn fetch_prevs(
&self,
room_id: &RoomId,
create_event: &PduEvent,
incoming_pdu: &PduEvent,
origin: &ServerName,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
) -> conduwuit::Result<()> {
let start = Instant::now();
let mut missing = incoming_pdu
.prev_events()
.stream()
.broad_filter_map(|event_id| async move {
self.services
.timeline
.get_non_outlier_pdu_json(event_id)
.await
.is_ok()
.or(|| event_id.to_owned())
})
.collect::<Vec<_>>()
.await;
if missing.is_empty() {
debug!(elapsed=?start.elapsed(), event_id=%incoming_pdu.event_id(), "No missing prev events.");
return Ok(());
}
debug!(elapsed=?start.elapsed(), %room_id, event_id=%incoming_pdu.event_id(), ?missing, "Fetching previous events");
let tail = self
.services
.state
.get_forward_extremities(room_id)
.collect::<Vec<_>>()
.await;
let mut gapfilled = self
.get_missing_events(
room_id,
incoming_pdu,
tail,
origin,
self.services
.metadata
.get_mindepth(room_id)
.await
.saturating_sub(
u8::try_from(incoming_pdu.prev_events.len())
.unwrap()
.saturating_mul(2)
.into(),
),
)
.await?;
debug_info!(elapsed=?start.elapsed(), "Fetched {} missing events", gapfilled.len());
missing.retain(|eid| !gapfilled.contains_key(eid));
if !missing.is_empty() {
debug_warn!(elapsed=?start.elapsed(), "Still missing {} events, falling back to atomic fetch.", missing.len());
gapfilled.extend(
self.fetch_prev_events(origin, missing, create_event, room_id)
.await,
);
}
// Persist all fetched events
let mapped = gapfilled
.iter()
.map(|(eid, evt)| {
let mut obj = evt.to_canonical_object();
obj.remove("event_id"); // event_id is inserted by backfill_missing_events
(eid.clone(), obj)
})
.collect::<HashMap<_, _>>();
let to_persist = if mapped.len() <= 1 {
mapped.keys().map(ToOwned::to_owned).collect()
} else {
let refmap: HashMap<OwnedEventId, &CanonicalJsonObject> =
mapped.iter().map(|(id, data)| (id.clone(), data)).collect();
build_local_dag(&refmap).await?
};
let job_start = Instant::now();
trace!("Starting to persist {} prev events", to_persist.len());
for (i, event_id) in to_persist.iter().enumerate() {
debug!(
elapsed=?start.elapsed(),
"Persisting fetched prev event: {event_id} ({}/{})",
i.saturating_add(1),
to_persist.len(),
);
let obj = mapped.get(event_id).cloned().unwrap();
let persist_start = Instant::now();
match self
.handle_outlier_pdu(origin, create_event, event_id, room_id, obj)
.await
{
| Ok((pdu, val)) if pdu.origin_server_ts() >= first_ts_in_room => {
self.upgrade_outlier_to_timeline_pdu(pdu, val, create_event, origin, room_id)
.await
.inspect_err(|e| {
debug_warn!(
total_elapsed=?start.elapsed(),
job_elapsed=?job_start.elapsed(),
task_elapsed=?persist_start.elapsed(),
"Failed to upgrade prev event {event_id}: {e}",
);
})
.inspect(|_| {
debug_info!(
total_elapsed=?start.elapsed(),
job_elapsed=?job_start.elapsed(),
task_elapsed=?persist_start.elapsed(),
"Upgraded prev event {event_id}",
);
})
.ok();
},
| Err(e) => debug_warn!(
total_elapsed=?start.elapsed(),
job_elapsed=?job_start.elapsed(),
task_elapsed=?persist_start.elapsed(),
"Failed to persist prev event {event_id}: {e}",
),
| _ => {},
}
}
// NOTE because i keep forgetting: the caller persists incoming_pdu.
// we only care about its prev events
trace!(
total_elapsed=?start.elapsed(),
persist_elapsed=?job_start.elapsed(),
);
Ok(())
}
let event_fetch = |event_id| {
let origin_server_ts = eventid_info
.get(&event_id)
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
// This return value is the key used for sorting events,
// events are then sorted by power level, time,
// and lexically by event_id.
future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
};
let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch)
.await
.map_err(|e| err!(Database(error!("Error sorting prev events: {e}"))))?;
Ok((sorted, eventid_info))
}
+379 -73
View File
@@ -1,86 +1,392 @@
use std::collections::{HashMap, hash_map};
use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement};
use futures::FutureExt;
use ruma::{
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
events::StateEventType,
use std::{
cmp::max,
collections::{HashMap, HashSet, hash_map},
hash::{BuildHasherDefault, DefaultHasher},
time::{Duration, Instant},
};
use crate::rooms::short::ShortStateKey;
use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_warn, err, info, trace,
utils::{BoolExt, IterStream},
warn,
};
use futures::{StreamExt, TryFutureExt, future::select_ok};
use ruma::{
EventId, OwnedEventId, OwnedRoomId, RoomId, ServerName,
api::federation::event::{get_room_state, get_room_state_ids},
};
/// Call /state_ids to find out what the state at this pdu is. We trust the
/// server's response to some extend (sic), but we still do a lot of checks
/// on the events
#[implement(super::Service)]
#[tracing::instrument(
level = "debug",
skip_all,
fields(%origin),
)]
pub(super) async fn fetch_state<Pdu>(
&self,
origin: &ServerName,
create_event: &Pdu,
room_id: &RoomId,
event_id: &EventId,
) -> Result<Option<HashMap<u64, OwnedEventId>>>
where
Pdu: Event + Send + Sync,
{
let res = self
.services
.sending
.send_federation_request(
origin,
get_room_state_ids::v1::Request::new(event_id.to_owned(), room_id.to_owned()),
)
.await
.inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?;
use crate::{conduwuit::utils::stream::BroadbandExt, rooms::short::ShortStateKey};
debug!("Fetching state events");
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
let state_vec = self
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
.boxed()
.await;
let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
for (pdu, _) in state_vec {
let state_key = pdu
.state_key()
.ok_or_else(|| err!(Database("Found non-state pdu in state events.")))?;
let shortstatekey = self
impl super::Service {
/// Asks a remote server what the state at this event is.
/// It first attempts to call `GET /_matrix/federation/v1/state_ids` (fast).
/// If any events are missing, they are fetched from the remote, and
/// persisted as outliers, before being returned back to this function. If
/// we are missing a lot of events locally (>=50), this function falls back
/// to requesting the full state in PDU format from the remote (`GET
/// /_matrix/federation/v1/state, very slow in large rooms), and persists
/// them directly.
#[tracing::instrument(skip_all)]
pub(super) async fn fetch_state(
&self,
origin: &ServerName,
create_event: &PduEvent,
room_id: &RoomId,
event_id: &EventId,
) -> Result<HashMap<u64, OwnedEventId>> {
let start = Instant::now();
trace!(%origin, "Asking remote for state_ids");
let res: get_room_state_ids::v1::Response = match self
.services
.short
.get_or_create_shortstatekey(&pdu.kind().to_string().into(), state_key)
.sending
.send_federation_request(
origin,
get_room_state_ids::v1::Request::new(event_id.to_owned(), room_id.to_owned()),
)
.await
.inspect_err(
|e| debug_warn!(elapsed=?start.elapsed(), "Fetching state for event failed: {e}"),
) {
| Ok(resp) => Ok(resp),
| Err(e) =>
if e.is_not_found() {
self.fetch_state_ids_from_backfill_servers(
event_id.to_owned(),
room_id.to_owned(),
)
.await
} else {
Err(e)
},
}?;
debug!(elapsed=?start.elapsed(), events = res.pdu_ids.len(), "Fetching state events");
let mut state_events: HashMap<OwnedEventId, PduEvent> =
HashMap::with_capacity(res.pdu_ids.len());
let to_fetch: Vec<OwnedEventId> = res
.pdu_ids
.clone()
.into_iter()
.stream()
.broad_filter_map(|event_id| async move {
self.services
.timeline
.pdu_exists(&event_id)
.await
.or_some(event_id)
})
.collect()
.await;
if to_fetch.is_empty() {
debug!(elapsed=?start.elapsed(), "All required state events are already known.");
state_events = res
.pdu_ids
.iter()
.stream()
.broad_filter_map(|event_id| async move {
Some((
event_id.clone(),
self.services
.timeline
.get_pdu(event_id)
.await
.expect("Event disappeared between filtering and fetching"),
))
})
.collect()
.await;
assert_eq!(
state_events.len(),
res.pdu_ids.len(),
"Failed to load all required state events despite allegedly knowing all of them \
already",
);
} else {
let total_count = res.pdu_ids.len();
let missing_count = to_fetch.len();
let missing_threshold = max(50, total_count >> 2);
if missing_count >= missing_threshold {
// If there's more than 50 events to fetch, or we're missing 25% or more of the
// state, we would need to make a lot of atomic requests, so we'll just try
// to fetch the full state from the remote instead.
// Since this endpoint might fail in huge rooms, we fall back to atomic fetch
// anyway.
warn!(
elapsed=?start.elapsed(),
%missing_count,
%total_count,
%missing_threshold,
"Fetching full state from remote server for event"
);
let state_response = tokio::time::timeout(
Duration::from_secs(30),
self.fetch_full_state(origin, create_event, room_id, event_id),
)
.await;
info!(
elapsed=?start.elapsed(),
%missing_count,
%total_count,
%missing_threshold,
"Fetched full state from remote server for event"
);
let fetched_state = match state_response {
| Ok(Ok(state)) => {
// Filter to ensure we only use the PDUs we were expecting, preventing
// arbitrary state injection.
// Atomic fetch does not have this problem as each PDU is evaluated
// individually.
let expected: &HashSet<OwnedEventId, BuildHasherDefault<DefaultHasher>> =
&HashSet::from_iter(res.pdu_ids.clone());
state
.into_iter()
.stream()
.broad_filter_map(|(event_id, pdu)| async move {
expected.contains(&event_id).then_some((event_id, pdu))
})
.collect()
.await
},
| Ok(Err(e)) => {
warn!(
elapsed=?start.elapsed(),
error=?e,
%origin,
"Failed to fetch full state from remote, falling back to atomic fetch"
);
self.fetch_and_handle_auth_events(
origin,
res.pdu_ids.clone(),
create_event,
room_id,
)
.await
},
| Err(e) => {
warn!(
elapsed=?start.elapsed(),
error=?e,
%origin,
"Remote did not return room state in an acceptable timeframe, falling back to atomic fetch"
);
self.fetch_and_handle_auth_events(
origin,
res.pdu_ids.clone(),
create_event,
room_id,
)
.await
},
};
match state.entry(shortstatekey) {
| hash_map::Entry::Vacant(v) => {
v.insert(pdu.event_id().to_owned());
},
| hash_map::Entry::Occupied(_) => {
return Err!(Database(
"State event's type and state_key combination exists multiple times: {}, {}",
pdu.kind(),
state_key
));
},
assert!(
!fetched_state.is_empty(),
"fetch_full_state or fetch_and_handle_missing_events returned empty state \
map"
);
state_events.extend(fetched_state);
} else {
state_events = res
.pdu_ids
.iter()
.stream()
.broad_filter_map(|event_id| async move {
self.services
.timeline
.get_pdu(event_id)
.await
.map(|p| (event_id.to_owned(), p))
.ok()
})
.collect()
.await;
assert!(
!state_events.is_empty(),
"Only missing {} events but read-ahead state vec was empty",
to_fetch.len()
);
debug!(
elapsed=?start.elapsed(),
to_fetch = to_fetch.len(),
"Fetching missing events for state from remote"
);
let fetched_state = self
.fetch_and_handle_auth_events(origin, to_fetch, create_event, room_id)
.await;
state_events.extend(fetched_state);
}
}
if state_events.is_empty() {
return Ok(HashMap::new());
}
let mut state: HashMap<ShortStateKey, OwnedEventId> =
HashMap::with_capacity(state_events.len());
debug!(elapsed=?start.elapsed(), events = state_events.len(), "Processing state events");
for (event_id, pdu) in state_events {
let state_key = pdu.state_key().ok_or_else(|| {
err!(Request(BadJson("Found non-state pdu in state events: {event_id}")))
})?;
let shortstatekey = self
.services
.short
.get_or_create_shortstatekey(&pdu.kind().to_string().into(), state_key)
.await;
match state.entry(shortstatekey) {
| hash_map::Entry::Vacant(v) => {
v.insert(pdu.event_id().to_owned());
},
| hash_map::Entry::Occupied(existing) => {
return Err!(Request(Forbidden(
"State event's type and state_key combination exists multiple times \
({event_id} + {}): ({}, \"{}\")",
existing.get(),
pdu.kind(),
state_key,
)));
},
}
}
trace!(elapsed=?start.elapsed(), "fetch_state finished");
Ok(state)
}
// The original create event must still be in the state
let create_shortstatekey = self
.services
.short
.get_shortstatekey(&StateEventType::RoomCreate, "")
.await?;
if state.get(&create_shortstatekey).map(AsRef::as_ref) != Some(create_event.event_id()) {
return Err!(Database("Incoming event refers to wrong create event."));
async fn fetch_state_ids_from_backfill_servers(
&self,
event_id: OwnedEventId,
room_id: OwnedRoomId,
) -> Result<get_room_state_ids::v1::Response> {
let candidates = self
.services
.timeline
.candidate_backfill_servers(&room_id)
.await;
if candidates.is_empty() {
return Err!(Request(NotFound(
"Cannot ask any other servers for the state at this event"
)));
}
debug!(%room_id, ?candidates, "Asking backfill servers for state_ids");
let futures = candidates.iter().map(|server_name| {
Box::pin(
self.services
.sending
.send_federation_request(
server_name,
get_room_state_ids::v1::Request::new(event_id.clone(), room_id.clone()),
)
.inspect_err(|e| {
debug_warn!("Fallback fetching state for event failed: {e}");
}),
)
});
Ok(select_ok(futures).await?.0)
}
Ok(Some(state))
/// Fetches the full state via `GET /_matrix/federation/v1/state` from a
/// remote server, and persists all the incoming auth chain events and
/// state events as outliers, for use later.
///
/// Any events that cannot be persisted are dropped with a warning.
pub(super) async fn fetch_full_state(
&self,
origin: &ServerName,
create_event: &PduEvent,
room_id: &RoomId,
event_id: &EventId,
) -> Result<HashMap<OwnedEventId, PduEvent>> {
let start = Instant::now();
trace!("Fetching full state from remote server");
let res: get_room_state::v1::Response = self
.services
.sending
.send_federation_request(
origin,
get_room_state::v1::Request::new(event_id.to_owned(), room_id.to_owned()),
)
.await
.inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?;
debug!(elapsed=?start.elapsed(), count = res.auth_chain.len(), "Handling incoming auth chain...");
res.auth_chain
.iter()
.stream()
.broad_filter_map(|raw_event_json| async {
if let Some(parsed) = self.parse_incoming_pdu(raw_event_json).await.ok()
&& parsed.0 == room_id
{
Some(parsed)
} else {
None
}
})
.for_each_concurrent(
None,
|(incoming_room_id, incoming_event_id, incoming_event_json)| async move {
self.handle_outlier_pdu(
origin,
create_event,
&incoming_event_id,
&incoming_room_id,
incoming_event_json,
)
.await
.inspect_err(|e| {
warn!(
%incoming_room_id,
%incoming_event_id,
?e,
"Failed to handle auth chain event from state fetch"
);
})
.ok();
},
)
.await;
debug!(elapsed=?start.elapsed(), count = res.pdus.len(), "Handling incoming state PDUs...");
let r = res
.pdus
.iter()
.stream()
.broad_filter_map(|raw_event_json| async {
if let Some(parsed) = self.parse_incoming_pdu(raw_event_json).await.ok()
&& parsed.0 == room_id
{
Some(parsed)
} else {
None
}
})
.broad_filter_map(
|(incoming_room_id, incoming_event_id, incoming_event_json)| async move {
self.handle_outlier_pdu(
origin,
create_event,
&incoming_event_id,
&incoming_room_id,
incoming_event_json,
)
.await
.inspect_err(|e| {
warn!(
elapsed=?start.elapsed(),
%incoming_room_id,
%incoming_event_id,
?e,
"Failed to handle state event from state fetch"
);
})
.ok()
},
)
.fold(HashMap::new(), |mut acc, (event, _)| async move {
acc.insert(event.event_id().to_owned(), event);
acc
})
.await;
trace!(elapsed=?start.elapsed(), "fetch_full_state finished");
Ok(r)
}
}
@@ -1,14 +1,14 @@
use std::{
collections::{BTreeMap, hash_map},
time::Instant,
collections::BTreeMap,
time::{Duration, Instant},
};
use conduwuit::{
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, debug_error, debug_info, defer, err,
implement, info, trace, utils::stream::IterStream, warn,
Err, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, defer, err, error,
implement, info, matrix::PartialPdu, result::DebugInspect, trace, warn,
};
use futures::{
FutureExt, TryFutureExt, TryStreamExt,
FutureExt, StreamExt,
future::{OptionFuture, try_join4},
};
use ruma::{
@@ -18,7 +18,6 @@
room::member::{MembershipState, RoomMemberEventContent},
},
};
use tracing::debug;
use crate::rooms::timeline::{RawPduId, pdu_fits};
@@ -111,7 +110,6 @@ async fn should_rescind_invite(
#[implement(super::Service)]
#[tracing::instrument(
name = "pdu",
level = INFO_SPAN_LEVEL,
skip_all,
fields(%room_id, %event_id),
)]
@@ -151,7 +149,7 @@ pub async fn handle_incoming_pdu<'a>(
.and_then(|v| v.as_str())
.ok_or_else(|| err!("No sender in object"))
.and_then(|v| Ok(UserId::parse(v)?))
.map_err(|e| err!(Request(InvalidParam("PDU does not have a valid sender key: {e}"))))?;
.map_err(|e| err!(Request(BadJson("PDU does not have a valid sender key: {e}"))))?;
let sender_acl_check: OptionFuture<_> = sender
.server_name()
@@ -224,10 +222,10 @@ pub async fn handle_incoming_pdu<'a>(
self.federation_handletime
.write()
.remove(room_id);
}};
}}
let (incoming_pdu, val) = self
.handle_outlier_pdu(origin, create_event, event_id, room_id, value, false)
.handle_outlier_pdu(origin, create_event, event_id, room_id, value)
.await?;
// 8. if not timeline event: stop
@@ -235,64 +233,95 @@ pub async fn handle_incoming_pdu<'a>(
return Ok(None);
}
// Skip old events
// Skip events sent before we joined (they need to be persisted as backfilled
// events, not timeline events, which is handled elsewhere).
let first_ts_in_room = self
.services
.timeline
.first_pdu_in_room(room_id)
.await?
.origin_server_ts();
if incoming_pdu.origin_server_ts() < first_ts_in_room {
return Ok(None);
}
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
// These are timeline events
let (sorted_prev_events, mut eventid_info) = self
.fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events())
.await?;
debug!(
events = ?sorted_prev_events,
"Handling previous events"
);
sorted_prev_events
.iter()
.try_stream()
.map_ok(AsRef::as_ref)
.try_for_each(|prev_id| {
self.handle_prev_pdu(
origin,
event_id,
room_id,
eventid_info.remove(prev_id),
create_event,
first_ts_in_room,
prev_id,
)
.inspect_err(move |e| {
warn!("Prev {prev_id} failed: {e}");
match self
.services
.globals
.bad_event_ratelimiter
.write()
.entry(prev_id.into())
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
let tries = e.get().1.saturating_add(1);
*e.get_mut() = (Instant::now(), tries);
},
}
})
.map(|_| self.services.server.check_running())
})
.boxed()
.await?;
debug!("Fetching and persisting any missing prev events");
self.fetch_prevs(room_id, create_event, &incoming_pdu, origin, first_ts_in_room)
.await
.debug_inspect_err(|e| {
error!("Failed to fetch and persist incoming event's prev_events: {e:?}");
})?;
// Done with prev events, now handling the incoming event
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
.boxed()
.await
let pdu_id = self
.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
.await?;
let extremities_count = self
.services
.state
.get_forward_extremities(room_id)
.count()
.await;
if extremities_count >= self.services.server.config.dummy_event_threshold.into() {
self.squash_extremities(room_id, extremities_count).await;
}
Ok(pdu_id)
}
#[implement(super::Service)]
async fn squash_extremities(&self, room_id: &RoomId, count: usize) {
let last_squash = {
let squash_timings = self.last_extremity_squash.read();
squash_timings.get(room_id).copied()
};
if last_squash.is_some_and(|s| s.elapsed() < Duration::from_mins(1)) {
// Avoid sending more than one squash per minute to avoid flooding rooms.
return;
}
debug_warn!(
%count,
threshold=%self.services.server.config.dummy_event_threshold,
"Attempting to squash extremities after upgrading pdu"
);
// Try to send a dummy event to squash extremities. See issue #1844
let power_levels = self
.services
.state_accessor
.get_room_power_levels(room_id)
.await;
let mut local_users = self.services.state_cache.local_users_in_room(room_id);
while let Some(user_id) = local_users.next().await {
if !power_levels.user_can_send_message(&user_id, "org.matrix.dummy_event".into()) {
trace!(%user_id, "user does not have power level to send dummy event, skipping");
continue;
}
let state_lock = self.services.state.mutex.lock(room_id).await;
if self
.services
.timeline
.build_and_append_pdu(
PartialPdu {
event_type: "org.matrix.dummy_event".into(),
..PartialPdu::default()
},
&user_id,
Some(room_id),
&state_lock,
)
.await
.inspect(|_| debug!(sender=%user_id, "Successfully sent a dummy event"))
.inspect_err(|e| debug!(sender=%user_id, ?e, "Failed to send a dummy event via user"))
.is_ok()
{
break;
}
}
let mut squash_timings = self.last_extremity_squash.write();
squash_timings.insert(room_id.to_owned(), Instant::now());
}
@@ -1,12 +1,13 @@
use std::collections::{BTreeMap, HashMap, hash_map};
use conduwuit::{
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res,
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, info, state_res,
trace, warn,
};
use futures::future::ready;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
api::federation::authorization::get_event_authorization, canonical_json::redact,
events::StateEventType,
};
@@ -15,6 +16,7 @@
#[implement(super::Service)]
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(name="handle_outlier", skip_all, fields(%event_id))]
pub(super) async fn handle_outlier_pdu<'a, Pdu>(
&self,
origin: &'a ServerName,
@@ -22,7 +24,6 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
event_id: &'a EventId,
room_id: &'a RoomId,
mut value: CanonicalJsonObject,
auth_events_known: bool,
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
where
Pdu: Event + Send + Sync,
@@ -46,27 +47,38 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
.verify_event(&value, &room_version_rules)
.await
{
| Ok(ruma::signatures::Verified::All) => value,
| Ok(ruma::signatures::Verified::All) => {
if let Ok(pdu_event) = self.services.timeline.get_pdu(event_id).await {
debug!(
"Already have event {event_id} as an outlier or timeline event, not \
re-processing"
);
value.insert(
"event_id".to_owned(),
CanonicalJsonValue::String(event_id.as_str().to_owned()),
);
check_room_id(room_id, &pdu_event)?;
return Ok((pdu_event, value));
}
value
},
| Ok(ruma::signatures::Verified::Signatures) => {
// Redact
debug_info!("Calculated hash does not match (redaction): {event_id}");
let Ok(obj) =
ruma::canonical_json::redact(value, &room_version_rules.redaction, None)
else {
return Err!(Request(InvalidParam("Redaction failed")));
};
// Skip the PDU if it is redacted and we already have it as an outlier event
if self.services.timeline.pdu_exists(event_id).await {
return Err!(Request(InvalidParam(
"Event was redacted and we already knew about it"
)));
if let Ok(pdu_event) = self.services.timeline.get_pdu(event_id).await {
debug!(
"Received a redacted copy of {event_id}, but we already knew about it. \
Re-using known content instead."
);
check_room_id(room_id, &pdu_event)?;
let obj = pdu_event.to_canonical_object();
return Ok((pdu_event, obj));
}
obj
debug_info!("Calculated hash does not match (redaction): {event_id}");
redact(value, &room_version_rules.redaction, None)
.map_err(|e| err!(Request(BadJson("Failed to redact {event_id}: {e}"))))?
},
| Err(e) => {
return Err!(Request(InvalidParam(debug_error!(
return Err!(Request(Forbidden(debug_error!(
"Signature verification failed for {event_id}: {e}"
))));
},
@@ -87,65 +99,78 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
// Fetch all auth events
let mut auth_events: HashMap<OwnedEventId, PduEvent> = HashMap::new();
for aid in pdu_event.auth_events() {
if self.services.pdu_metadata.is_event_rejected(aid).await {
for auth_event_id in pdu_event.auth_events() {
if self
.services
.pdu_metadata
.is_event_rejected(auth_event_id)
.await
{
debug_warn!(
"Rejecting incoming event {} which depends on rejected auth event {aid}",
"Rejecting incoming event {} which depends on rejected auth event \
{auth_event_id}",
event_id,
);
self.services.pdu_metadata.mark_event_rejected(event_id);
return Err!(Request(InvalidParam("Event has rejected auth event: {aid}")));
return Err!(Request(Forbidden("Event has rejected auth event: {auth_event_id}")));
}
if let Ok(auth_event) = self.services.timeline.get_pdu(aid).await {
if let Ok(auth_event) = self.services.timeline.get_pdu(auth_event_id).await {
check_room_id(room_id, &auth_event)?;
trace!("Found auth event {aid} for outlier event {event_id} locally");
auth_events.insert(aid.to_owned(), auth_event);
trace!("Found auth event {auth_event_id} for outlier event {event_id} locally");
auth_events.insert(auth_event_id.to_owned(), auth_event);
} else {
debug_warn!("Could not find auth event {aid} for outlier event {event_id} locally");
debug_warn!(
"Could not find auth event {auth_event_id} for outlier event {event_id} locally"
);
}
}
// Fetch any missing ones & reject invalid ones
let missing_auth_events = if auth_events_known {
pdu_event
.auth_events()
.filter(|id| !auth_events.contains_key(*id))
.collect::<Vec<_>>()
} else {
pdu_event.auth_events().collect::<Vec<_>>()
};
if !missing_auth_events.is_empty() || !auth_events_known {
debug_info!(
"Fetching {} missing auth events for outlier event {event_id}",
missing_auth_events.len()
);
for (pdu, _) in self
.fetch_and_handle_outliers(
if auth_events.len() != pdu_event.auth_events().count() {
info!("Missing some auth events, asking remote for auth chain");
let response: get_event_authorization::v1::Response = self
.services
.sending
.send_federation_request(
origin,
missing_auth_events.iter().copied(),
create_event,
room_id,
get_event_authorization::v1::Request::new(
room_id.to_owned(),
event_id.to_owned(),
),
)
.await
{
auth_events.insert(pdu.event_id().to_owned(), pdu);
.map_err(|e| {
err!(Request(Forbidden(
"Remote server is not divulging incoming event's auth chain: {e}"
)))
})?;
let mut auth_chain_map = HashMap::with_capacity(response.auth_chain.len());
for auth_pdu_json in response.auth_chain {
let (auth_event_room_id, auth_event_id, auth_pdu_json) =
self.parse_incoming_pdu(&auth_pdu_json).await?;
if auth_event_room_id != room_id {
return Err!(Request(Forbidden(
"Auth event {auth_event_id} is in {auth_event_room_id}, not {room_id}."
)));
}
let auth_pdu = PduEvent::from_id_val(&auth_event_id, auth_pdu_json)
.map_err(|e| err!(Request(BadJson("Invalid PDU {auth_event_id}: {e}"))))?;
auth_chain_map.insert(auth_event_id, auth_pdu);
}
for auth_event_id in pdu_event.auth_events() {
if auth_events.contains_key(auth_event_id) {
continue;
}
if let Some(auth_event) = auth_chain_map.get(auth_event_id) {
auth_events.insert(auth_event_id.to_owned(), auth_event.clone());
} else {
return Err!(Request(Forbidden(
"Remote server is not divulging incoming event's auth events (missing: \
{auth_event_id})"
)));
}
}
} else {
debug!("No missing auth events for outlier event {event_id}");
}
// reject if we are still missing some
let still_missing = pdu_event
.auth_events()
.filter(|id| !auth_events.contains_key(*id))
.collect::<Vec<_>>();
if !still_missing.is_empty() {
// Don't reject: this could be a temporary condition
// TODO: use get_missing_events?
return Err!(Request(InvalidParam(
"Could not fetch all auth events for outlier event {event_id}, still missing: \
{still_missing:?}"
)));
}
// 6. Reject "due to auth events" if the event doesn't pass auth based on the
@@ -176,7 +201,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
.outlier
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
self.services.pdu_metadata.mark_event_rejected(event_id);
return Err!(Request(InvalidParam(
return Err!(Request(Forbidden(
"Auth event's type and state_key combination exists multiple times: {}, {}",
auth_event.kind,
auth_event.state_key().unwrap_or("")
@@ -185,18 +210,6 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
}
}
// The original create event must be in the auth events
if !matches!(
auth_events_by_key.get(&(StateEventType::RoomCreate, String::new().into())),
Some(_) | None
) {
self.services.pdu_metadata.mark_event_rejected(event_id);
self.services
.outlier
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
}
let state_fetch = |ty: &StateEventType, sk: &str| {
let key = (ty.to_owned(), sk.into());
ready(auth_events_by_key.get(&key).map(ToOwned::to_owned))
@@ -1,89 +0,0 @@
use std::{collections::BTreeMap, time::Instant};
use conduwuit::{
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer, implement,
utils::continue_exponential_backoff_secs,
};
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
use tracing::debug;
#[implement(super::Service)]
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
name = "prev",
level = INFO_SPAN_LEVEL,
skip_all,
fields(%prev_id),
)]
pub(super) async fn handle_prev_pdu<'a, Pdu>(
&self,
origin: &'a ServerName,
event_id: &'a EventId,
room_id: &'a RoomId,
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
create_event: &'a Pdu,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
prev_id: &'a EventId,
) -> Result
where
Pdu: Event + Send + Sync,
{
// Check for disabled again because it might have changed
if self.services.metadata.is_disabled(room_id).await {
return Err!(Request(Forbidden(debug_warn!(
"Federaton of room {room_id} is currently disabled on this server. Request by \
origin {origin} and event ID {event_id}"
))));
}
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.get(prev_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
debug!(
?tries,
duration = ?time.elapsed(),
"Backing off from prev_event"
);
return Ok(());
}
}
let Some((pdu, json)) = eventid_info else {
return Ok(());
};
// Skip old events
if pdu.origin_server_ts() < first_ts_in_room {
return Ok(());
}
let start_time = Instant::now();
self.federation_handletime
.write()
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
defer! {{
self.federation_handletime
.write()
.remove(room_id);
}};
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
.await?;
debug!(
elapsed = ?start_time.elapsed(),
"Handled prev_event",
);
Ok(())
}
+8 -2
View File
@@ -4,7 +4,6 @@
mod fetch_state;
mod handle_incoming_pdu;
mod handle_outlier_pdu;
mod handle_prev_pdu;
mod parse_incoming_pdu;
mod policy_server;
mod resolve_state;
@@ -15,6 +14,7 @@
use async_trait::async_trait;
use conduwuit::{Err, Event, PduEvent, Result, Server, SyncRwLock, utils::MutexMap};
pub use fetch_and_handle_outliers::{GET_MISSING_EVENTS_MAX_BATCH_SIZE, build_local_dag};
use ruma::{
OwnedEventId, OwnedRoomId, RoomId, events::room::create::RoomCreateEventContent,
room_version_rules::RoomVersionRules,
@@ -22,10 +22,10 @@
use tokio::sync::Notify;
use crate::{Dep, globals, rooms, sending, server_keys};
pub struct Service {
pub mutex_federation: RoomMutexMap,
pub federation_handletime: SyncRwLock<HandleTimeMap>,
pub last_extremity_squash: SyncRwLock<HashMap<OwnedRoomId, Instant>>,
services: Services,
server_shutdown: Notify,
}
@@ -56,6 +56,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
mutex_federation: RoomMutexMap::new(),
federation_handletime: HandleTimeMap::new().into(),
last_extremity_squash: SyncRwLock::new(HashMap::new()),
services: Services {
globals: args.depend::<globals::Service>("globals"),
sending: args.depend::<sending::Service>("sending"),
@@ -91,6 +92,11 @@ async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result {
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
fn interrupt(&self) { self.server_shutdown.notify_waiters(); }
async fn clear_cache(&self) {
let mut squashes = self.last_extremity_squash.write();
squashes.clear();
}
}
impl Service {
@@ -7,7 +7,7 @@
use itertools::Itertools;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, RoomId,
RoomVersionId,
RoomVersionId, room_version_rules::RoomVersionRules,
};
use serde_json::value::RawValue as RawJsonValue;
@@ -56,7 +56,10 @@ fn extract_room_id(event_type: &str, pdu: &CanonicalJsonObject) -> Result<OwnedR
/// Parses every entry in an array as an event ID, returning an error if any
/// step fails.
fn expect_event_id_array(value: &CanonicalJsonObject, field: &str) -> Result<Vec<OwnedEventId>> {
pub(super) fn expect_event_id_array(
value: &CanonicalJsonObject,
field: &str,
) -> Result<Vec<OwnedEventId>> {
value
.get(field)
.ok_or_else(|| err!(Request(BadJson("missing field `{field}` on PDU"))))?
@@ -101,6 +104,21 @@ pub fn validate_pdu(&self, pdu: &CanonicalJsonObject) -> Result {
}
#[implement(super::Service)]
pub async fn parse_incoming_pdu_with_known_room(
&self,
pdu: &RawJsonValue,
room_version_rules: &RoomVersionRules,
) -> Result<(OwnedEventId, CanonicalJsonObject)> {
let (event_id, value) =
gen_event_id_canonical_json(pdu, room_version_rules).map_err(|e| {
err!(Request(InvalidParam("Could not convert event to canonical json: {e}")))
})?;
self.validate_pdu(&value)?;
Ok((event_id, value))
}
#[implement(super::Service)]
#[tracing::instrument(name = "parse", skip_all)]
pub async fn parse_incoming_pdu(&self, pdu: &RawJsonValue) -> Result<Parsed> {
let value = serde_json::from_str::<CanonicalJsonObject>(pdu.get()).map_err(|e| {
err!(BadServerResponse(debug_warn!("Error parsing incoming event {e:?}")))
@@ -5,7 +5,7 @@
};
use conduwuit::{
Result, debug, err, error, implement,
Result, debug, debug_error, err, error, implement,
matrix::{Event, StateMap},
trace,
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
@@ -37,6 +37,7 @@ pub(super) async fn state_at_incoming_degree_one<Pdu>(
.pdu_shortstatehash(prev_event)
.await
else {
trace!("No shortstatehash for {prev_event}, cannot calculate one-degree state.");
return Ok(None);
};
@@ -99,6 +100,7 @@ pub(super) async fn state_at_incoming_resolved<Pdu>(
.map_ok(move |sstatehash| (sstatehash, prev_event))
})
.try_collect::<HashMap<_, _>>()
.inspect_err(|e| debug_error!("failed to calculate N-degree short state hashes: {e}"))
.await
else {
return Ok(None);
@@ -1,8 +1,9 @@
use std::{borrow::Borrow, sync::Arc, time::Instant};
use conduwuit::{
Err, Result, debug, debug_info, err, implement, info, is_equal_to,
Err, Result, debug, debug_error, debug_info, err, implement, info, is_equal_to,
matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res},
result::DebugInspect,
trace,
utils::{
IterStream,
@@ -23,28 +24,17 @@
};
#[implement(super::Service)]
pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
#[tracing::instrument(name="upgrade_outlier", skip_all, fields(event_id=%incoming_pdu.event_id()))]
pub(super) async fn upgrade_outlier_to_timeline_pdu(
&self,
incoming_pdu: PduEvent,
mut val: CanonicalJsonObject,
create_event: &Pdu,
create_event: &PduEvent,
origin: &ServerName,
room_id: &RoomId,
) -> Result<Option<RawPduId>>
where
Pdu: Event + Send + Sync,
{
// Skip the PDU if we already have it as a timeline event
if let Ok(pduid) = self
.services
.timeline
.get_pdu_id(incoming_pdu.event_id())
.await
{
return Ok(Some(pduid));
}
let (rejected, soft_failed) = join!(
) -> Result<Option<RawPduId>> {
let (pduid, rejected, soft_failed) = join!(
self.services.timeline.get_pdu_id(incoming_pdu.event_id()),
self.services
.pdu_metadata
.is_event_rejected(incoming_pdu.event_id()),
@@ -52,17 +42,27 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
.pdu_metadata
.is_event_soft_failed(incoming_pdu.event_id())
);
if rejected {
return Err!(Request(InvalidParam("Event has been rejected")));
if let Ok(id) = pduid {
trace!(event_id=%incoming_pdu.event_id(), "Skipping upgrade of already upgraded PDU");
return Ok(Some(id));
} else if rejected {
return Err!(Request(Forbidden("Event has been rejected")));
} else if soft_failed {
return Err!(Request(InvalidParam("Event has been soft-failed")));
return Err!(Request(Forbidden("Event has been soft-failed")));
}
assert_eq!(
*create_event.kind(),
StateEventType::RoomCreate.into(),
"tried to upgrade a PDU with a create_event that is not a room create event"
);
debug!(
event_id = %incoming_pdu.event_id,
"Upgrading PDU from outlier to timeline"
);
let timer = Instant::now();
let min_depth = self.services.metadata.get_mindepth(room_id).await;
let room_version_rules = get_room_version_rules(create_event)?;
// 10. Fetch missing state and auth chain events by calling /state_ids at
@@ -73,21 +73,32 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
event_id = %incoming_pdu.event_id,
"Resolving state at event"
);
let mut state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 {
let state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 {
self.state_at_incoming_degree_one(&incoming_pdu).await?
} else {
self.state_at_incoming_resolved(&incoming_pdu, room_id, &room_version_rules)
.await?
};
let state_at_incoming_event = match state_at_incoming_event {
| Some(s) => s,
| None => {
trace!("Could not calculate incoming state, asking remote {origin} for it");
self.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
.await
.debug_inspect_err(|e| debug_error!("Could not fetch state from {origin}: {e}"))?
},
};
if state_at_incoming_event.is_none() {
state_at_incoming_event = self
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
.await?;
if state_at_incoming_event.is_empty()
&& *incoming_pdu.event_type() != StateEventType::RoomCreate.into()
{
// This can happen if the remote sends an event but cannot be reached to fetch
// the state at it, and all other servers in the room (which might just be the
// unreachable server) are unable to provide required info.
// returning an error here allows the upgrade to be attempted at another time.
return Err!(Request(Forbidden("Could not resolve incoming state at event")));
}
let state_at_incoming_event =
state_at_incoming_event.expect("we always set this to some above");
trace!(state_events = state_at_incoming_event.len(), "Calculated incoming state");
debug!(
event_id = %incoming_pdu.event_id,
@@ -382,6 +393,12 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
// Event has passed all auth/stateres checks
drop(state_lock);
if incoming_pdu.depth > min_depth && incoming_pdu.state_key().is_some() {
self.services
.metadata
.set_mindepth(room_id, incoming_pdu.depth.into());
trace!("Increased room's min depth from {} to {}", min_depth, incoming_pdu.depth);
}
Ok(pdu_id)
}
+43 -4
View File
@@ -1,7 +1,7 @@
use std::{collections::HashMap, sync::Arc};
use conduwuit::{
Err, Pdu, Result, Server, debug, debug_info, debug_warn, err, error, info, is_true,
Err, Event, Pdu, Result, Server, debug, debug_info, debug_warn, err, error, info, is_true,
matrix::{
StateKey,
event::{gen_event_id, gen_event_id_canonical_json},
@@ -34,7 +34,7 @@
use crate::{
Dep, antispam, globals,
rooms::{
metadata, outlier, pdu_metadata, short,
event_handler, metadata, outlier, pdu_metadata, short,
state::{self, RoomMutexGuard},
state_accessor, state_cache,
state_compressor::{self, CompressedState, HashSetCompressStateEvent},
@@ -51,6 +51,7 @@ struct Services {
server: Arc<Server>,
db: Arc<Database>,
antispam: Dep<antispam::Service>,
event_handler: Dep<event_handler::Service>,
globals: Dep<globals::Service>,
metadata: Dep<metadata::Service>,
outlier: Dep<outlier::Service>,
@@ -73,6 +74,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
server: args.server.clone(),
db: args.db.clone(),
antispam: args.depend::<antispam::Service>("antispam"),
event_handler: args.depend::<event_handler::Service>("rooms::event_handler"),
globals: args.depend::<globals::Service>("globals"),
metadata: args.depend::<metadata::Service>("rooms::metadata"),
outlier: args.depend::<outlier::Service>("rooms::outlier"),
@@ -381,8 +383,6 @@ pub async fn join_remote_room(
// It has enough fields to be called a proper event now
let mut join_event = join_event_stub;
info!("Asking {remote_server} for send_join in room {room_id}");
let send_join_request = federation::membership::create_join_event::v2::Request::new(
room_id.to_owned(),
event_id.clone(),
@@ -392,6 +392,18 @@ pub async fn join_remote_room(
.await,
);
// NOTE: send_join can take a long time to respond, but from the point of view
// of other servers, we may already have finished joining. This means they
// sometimes end up sending PDUs to us that we aren't yet ready to accept, and
// consequently drop. Holding the mutex over the room while processing mitigates
// this.
let _room_lock = self
.services
.event_handler
.mutex_federation
.lock(room_id.as_str())
.await;
info!("Asking {remote_server} for send_join in room {room_id}");
let send_join_response = match self
.services
.sending
@@ -577,7 +589,13 @@ pub async fn join_remote_room(
if !auth_check {
return Err!(Request(Forbidden("Auth check failed")));
}
let resident_before = self
.services
.state_cache
.server_in_room(self.services.globals.server_name(), room_id)
.await;
let cork = self.services.db.cork_and_flush();
info!("Compressing state from send_join");
let compressed: CompressedState = self
.services
@@ -626,6 +644,10 @@ pub async fn join_remote_room(
room_id,
)
.await?;
self.services
.metadata
.maybe_set_mindepth(room_id, parsed_join_pdu.depth.into())
.await;
info!("Setting final room state for new room");
// We set the room state after inserting the pdu, so that we never have a moment
@@ -633,6 +655,23 @@ pub async fn join_remote_room(
self.services
.state
.set_room_state(room_id, statehash_after_join, &state_lock);
if !resident_before {
// NOTE: We replace local extremities for this room if we were not a resident
// before. We might be doing a remote join to satisfy restricted join rules,
// so we don't want to do this if we're already a resident. Otherwise, we
// want to replace our forward extremities whole-sale in case we were
// desynced.
info!("Replacing local forward extremities");
self.services
.state
.set_forward_extremities(
room_id,
std::iter::once(parsed_join_pdu.event_id()),
&state_lock,
)
.await;
}
drop(cork);
Ok(())
}
+28 -2
View File
@@ -1,9 +1,9 @@
use std::sync::Arc;
use conduwuit::{Result, implement, utils::stream::TryIgnore};
use database::Map;
use database::{Deserialized, Map};
use futures::{Stream, StreamExt};
use ruma::{OwnedRoomId, RoomId};
use ruma::{OwnedRoomId, RoomId, UInt, uint};
use crate::{Dep, rooms};
@@ -17,6 +17,7 @@ struct Data {
bannedroomids: Arc<Map>,
roomid_shortroomid: Arc<Map>,
pduid_pdu: Arc<Map>,
roomid_mindepth: Arc<Map>,
}
struct Services {
@@ -31,6 +32,7 @@ fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
bannedroomids: args.db["bannedroomids"].clone(),
roomid_shortroomid: args.db["roomid_shortroomid"].clone(),
pduid_pdu: args.db["pduid_pdu"].clone(),
roomid_mindepth: args.db["roomid_mindepth"].clone(),
},
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
@@ -98,3 +100,27 @@ pub async fn is_disabled(&self, room_id: &RoomId) -> bool {
pub async fn is_banned(&self, room_id: &RoomId) -> bool {
self.db.bannedroomids.get(room_id).await.is_ok()
}
#[implement(Service)]
pub async fn get_mindepth(&self, room_id: &RoomId) -> UInt {
self.db
.roomid_mindepth
.get(room_id)
.await
.deserialized::<UInt>()
.unwrap_or_else(|_| uint!(0))
}
#[implement(Service)]
pub fn set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
self.db
.roomid_mindepth
.put_raw(room_id.as_bytes(), min_depth.to_be_bytes());
}
#[implement(Service)]
pub async fn maybe_set_mindepth(&self, room_id: &RoomId, min_depth: u64) {
if min_depth > self.get_mindepth(room_id).await.into() {
self.set_mindepth(room_id, min_depth);
}
}
+10
View File
@@ -371,6 +371,16 @@ pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<ShortSta
.deserialized()
}
pub fn all_forward_extremities(
&self,
) -> impl Stream<Item = (OwnedRoomId, OwnedEventId)> + Send {
self.db
.roomid_pduleaves
.keys()
.map_ok(|(room_id, event_id): (OwnedRoomId, OwnedEventId)| (room_id, event_id))
.ignore_err()
}
pub fn get_forward_extremities<'a>(
&'a self,
room_id: &'a RoomId,
+4 -1
View File
@@ -221,7 +221,10 @@ pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) ->
}
#[implement(super::Service)]
async fn candidate_backfill_servers(&self, room_id: &RoomId) -> HashSet<OwnedServerName> {
pub(crate) async fn candidate_backfill_servers(
&self,
room_id: &RoomId,
) -> HashSet<OwnedServerName> {
let mut candidate_backfill_servers = HashSet::new();
let power_levels = self
+4
View File
@@ -173,6 +173,10 @@ pub async fn get_non_outlier_pdu_json(
self.db.get_non_outlier_pdu_json(event_id).await
}
pub async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> bool {
self.db.non_outlier_pdu_exists(event_id).await.is_ok()
}
/// Returns the pdu's id.
#[inline]
pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
+3 -2
View File
@@ -34,8 +34,9 @@ pub(super) async fn batch_notary_request<'a, S, K>(
batch
});
debug_assert!(!server_keys.is_empty(), "empty batch request to notary");
if server_keys.is_empty() {
return Ok(vec![]);
}
let mut results = Vec::new();
while let Some(batch) = server_keys