Compare commits

...

24 Commits

Author SHA1 Message Date
Jade Ellis
4c01274886 feat: Typing notifications in simplified sliding sync
What's missing? Being able to use separate rooms & lists for typing
indicators.
At the moment, we use the same ones as we use for the timeline, as
todo_rooms is quite intertwined. We need to disentangle this to get that
functionality, although I'm not sure if clients use it.
2025-06-14 19:41:00 +01:00
Jade Ellis
5d44653e3a fix: Incorrect command descriptions 2025-06-14 16:51:24 +01:00
Jade Ellis
44e60d0ea6 docs: Tiny phrasing changes to the security policy 2025-06-14 16:34:58 +01:00
Jade Ellis
d7514178ab ci: Fix extra bracket in commit shorthash 2025-06-13 14:30:26 +01:00
Jade Ellis
1d45e0b68c feat: Add warning when admin users will be exposed as support contacts 2025-06-13 13:39:50 +01:00
Jade Ellis
3c44dccd65 ci: HACK, disable saving to actions cache 2025-05-26 19:16:50 +01:00
Jade Ellis
b57be072c7 build: Don't rerun on git changes 2025-05-26 19:16:05 +01:00
Jade Ellis
ea5dc8e09d fix: Use correct brand in clap version string 2025-05-26 19:16:05 +01:00
Jade Ellis
b9d60c64e5 ci: Don't specify container for image builder 2025-05-26 19:16:04 +01:00
Jade Ellis
94ae824149 ci: Don't install rustup if it's already there 2025-05-26 19:16:03 +01:00
Jade Ellis
640714922b feat: For knock_restricted rooms, automatically join rooms we meet
restrictions for rather than knocking
2025-05-26 19:16:03 +01:00
Jade Ellis
2b268fdaf3 fix: Allow joining via invite for knock_restricted rooms 2025-05-26 19:16:01 +01:00
Jade Ellis
e8d823a653 docs: Apply feedback on security policy 2025-05-26 15:01:58 +01:00
Jade Ellis
0ba77674c7 docs: Security policy 2025-05-25 00:36:28 +01:00
Jade Ellis
2ccbd7d60b fix: Reference config directly 2025-05-21 21:06:44 +01:00
Jade Ellis
60960c6e09 feat: Automatically set well-known support contacts 2025-05-21 20:32:53 +01:00
Jade Ellis
ce40304667 chore: Upgrade deps 2025-05-21 15:28:46 +01:00
Jade Ellis
dcbc4b54c5 ci: Always show sccache stats 2025-05-21 12:45:25 +01:00
Jade Ellis
fce024b30b chore: Add must_use annotation 2025-05-21 12:45:14 +01:00
Jade Ellis
3e4e696761 fix: Make sure empty VERSION_EXTRA strings are ignored
Also updates built & removes unused optional features
2025-05-21 12:35:36 +01:00
Jason Volk
f605913ea9 Eliminate associated Id type from trait Event.
Co-authored-by: Jade Ellis <jade@ellis.link>
Signed-off-by: Jason Volk <jason@zemos.net>
2025-05-21 11:36:15 +01:00
Jason Volk
44302ce732 Eliminate explicit parallel_fetches argument.
Signed-off-by: Jason Volk <jason@zemos.net>
2025-05-21 11:36:15 +01:00
Jason Volk
bfb0a2b76a Remove unused Pdu::into_any_event().
Signed-off-by: Jason Volk <jason@zemos.net>
2025-05-21 11:36:14 +01:00
Jason Volk
fcd5669aa1 Join jemalloc background threads prior to exit.
Co-authored-by: Jade Ellis <jade@ellis.link>
Signed-off-by: Jason Volk <jason@zemos.net>
2025-05-21 11:36:13 +01:00
36 changed files with 781 additions and 616 deletions

View File

@@ -19,11 +19,20 @@ outputs:
rustc_version:
description: The rustc version installed
value: ${{ steps.rustc-version.outputs.version }}
rustup_version:
description: The rustup version installed
value: ${{ steps.rustup-version.outputs.version }}
runs:
using: composite
steps:
- name: Check if rustup is already installed
shell: bash
id: rustup-version
run: |
echo "version=$(rustup --version)" >> $GITHUB_OUTPUT
- name: Cache rustup toolchains
if: steps.rustup-version.outputs.version == ''
uses: actions/cache@v3
with:
path: |
@@ -33,6 +42,7 @@ runs:
# Requires repo to be cloned if toolchain is not specified
key: ${{ runner.os }}-rustup-${{ inputs.toolchain || hashFiles('**/rust-toolchain.toml') }}
- name: Install Rust toolchain
if: steps.rustup-version.outputs.version == ''
shell: bash
run: |
if ! command -v rustup &> /dev/null ; then

View File

@@ -57,7 +57,6 @@ jobs:
build-image:
runs-on: dind
container: ghcr.io/catthehacker/ubuntu:act-latest
needs: define-variables
permissions:
contents: read
@@ -181,14 +180,14 @@ jobs:
file: "docker/Dockerfile"
build-args: |
GIT_COMMIT_HASH=${{ github.sha }})
GIT_COMMIT_HASH_SHORT=${{ env.COMMIT_SHORT_SHA }})
GIT_COMMIT_HASH_SHORT=${{ env.COMMIT_SHORT_SHA }}
GIT_REMOTE_URL=${{github.event.repository.html_url }}
GIT_REMOTE_COMMIT_URL=${{github.event.head_commit.url }}
platforms: ${{ matrix.platform }}
labels: ${{ steps.meta.outputs.labels }}
annotations: ${{ steps.meta.outputs.annotations }}
cache-from: type=gha
cache-to: type=gha,mode=max
# cache-to: type=gha,mode=max
sbom: true
outputs: type=image,"name=${{ needs.define-variables.outputs.images_list }}",push-by-digest=true,name-canonical=true,push=true
env:
@@ -211,7 +210,6 @@ jobs:
merge:
runs-on: dind
container: ghcr.io/catthehacker/ubuntu:act-latest
needs: [define-variables, build-image]
steps:
- name: Download digests

View File

@@ -80,6 +80,7 @@ jobs:
-D warnings
- name: Show sccache stats
if: always()
run: sccache --show-stats
cargo-test:
@@ -137,4 +138,5 @@ jobs:
--no-fail-fast
- name: Show sccache stats
if: always()
run: sccache --show-stats

View File

@@ -1,5 +1,5 @@
[files]
extend-exclude = ["*.csr"]
extend-exclude = ["*.csr", "*.lock"]
[default.extend-words]
"allocatedp" = "allocatedp"

498
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -350,7 +350,7 @@ version = "0.1.2"
[workspace.dependencies.ruma]
git = "https://forgejo.ellis.link/continuwuation/ruwuma"
#branch = "conduwuit-changes"
rev = "d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
rev = "a48665b682be1016cea53ea5e7787442dfe7c1de"
features = [
"compat",
"rand",
@@ -381,7 +381,7 @@ features = [
"unstable-msc4121",
"unstable-msc4125",
"unstable-msc4186",
"unstable-msc4203", # sending to-device events to appservices
"unstable-msc4203", # sending to-device events to appservices
"unstable-msc4210", # remove legacy mentions
"unstable-extensible-events",
"unstable-pdu",

63
SECURITY.md Normal file
View File

@@ -0,0 +1,63 @@
# Security Policy for Continuwuity
This document outlines the security policy for Continuwuity. Our goal is to maintain a secure platform for all users, and we take security matters seriously.
## Supported Versions
We provide security updates for the following versions of Continuwuity:
| Version | Supported |
| -------------- |:----------------:|
| Latest release | ✅ |
| Main branch | ✅ |
| Older releases | ❌ |
We may backport fixes to the previous release at our discretion, but we don't guarantee this.
## Reporting a Vulnerability
### Responsible Disclosure
We appreciate the efforts of security researchers and the community in identifying and reporting vulnerabilities. To ensure that potential vulnerabilities are addressed properly, please follow these guidelines:
1. **Contact members of the team directly** over E2EE private message.
- [@jade:ellis.link](https://matrix.to/#/@jade:ellis.link)
- [@nex:nexy7574.co.uk](https://matrix.to/#/@nex:nexy7574.co.uk) <!-- ? -->
2. **Email the security team** at [security@continuwuity.org](mailto:security@continuwuity.org). This is not E2EE, so don't include sensitive details.
3. **Do not disclose the vulnerability publicly** until it has been addressed
4. **Provide detailed information** about the vulnerability, including:
- A clear description of the issue
- Steps to reproduce
- Potential impact
- Any possible mitigations
- Version(s) affected, including specific commits if possible
If you have any doubts about a potential security vulnerability, contact us via private channels first! We'd prefer that you bother us, instead of having a vulnerability disclosed without a fix.
### What to Expect
When you report a security vulnerability:
1. **Acknowledgment**: We will acknowledge receipt of your report.
2. **Assessment**: We will assess the vulnerability and determine its impact on our users
3. **Updates**: We will provide updates on our progress in addressing the vulnerability, and may request you help test mitigations
4. **Resolution**: Once resolved, we will notify you and discuss coordinated disclosure
5. **Credit**: We will recognize your contribution (unless you prefer to remain anonymous)
## Security Update Process
When security vulnerabilities are identified:
1. We will develop and test fixes in a private fork
2. Security updates will be released as soon as possible
3. Release notes will include information about the vulnerabilities, avoiding details that could facilitate exploitation where possible
4. Critical security updates may be backported to the previous stable release
## Additional Resources
- [Matrix Security Disclosure Policy](https://matrix.org/security-disclosure-policy/)
- [Continuwuity Documentation](https://continuwuity.org/introduction)
---
This security policy was last updated on May 25, 2025.

View File

@@ -1641,19 +1641,29 @@
#
#server =
# This item is undocumented. Please contribute documentation for it.
# URL to a support page for the server, which will be served as part of
# the MSC1929 server support endpoint at /.well-known/matrix/support.
# Will be included alongside any contact information
#
#support_page =
# This item is undocumented. Please contribute documentation for it.
# Role string for server support contacts, to be served as part of the
# MSC1929 server support endpoint at /.well-known/matrix/support.
#
#support_role =
#support_role = "m.role.admin"
# This item is undocumented. Please contribute documentation for it.
# Email address for server support contacts, to be served as part of the
# MSC1929 server support endpoint.
# This will be used along with support_mxid if specified.
#
#support_email =
# This item is undocumented. Please contribute documentation for it.
# Matrix ID for server support contacts, to be served as part of the
# MSC1929 server support endpoint.
# This will be used along with support_email if specified.
#
# If no email or mxid is specified, all of the server's admins will be
# listed.
#
#support_mxid =

View File

@@ -20,3 +20,4 @@ # Summary
- [Testing](development/testing.md)
- [Hot Reloading ("Live" Development)](development/hot_reload.md)
- [Community (and Guidelines)](community.md)
- [Security](security.md)

1
docs/security.md Normal file
View File

@@ -0,0 +1 @@
{{#include ../SECURITY.md}}

View File

@@ -125,13 +125,13 @@ pub(super) enum DebugCommand {
reset: bool,
},
/// - Verify json signatures
/// - Sign JSON blob
///
/// This command needs a JSON blob provided in a Markdown code block below
/// the command.
SignJson,
/// - Verify json signatures
/// - Verify JSON signatures
///
/// This command needs a JSON blob provided in a Markdown code block below
/// the command.

View File

@@ -2162,6 +2162,109 @@ async fn knock_room_by_id_helper(
}
}
// For knock_restricted rooms, check if the user meets the restricted conditions
// If they do, attempt to join instead of knock
// This is not mentioned in the spec, but should be allowable (we're allowed to
// auto-join invites to knocked rooms)
let join_rule = services.rooms.state_accessor.get_join_rules(room_id).await;
if let JoinRule::KnockRestricted(restricted) = &join_rule {
let restriction_rooms: Vec<_> = restricted
.allow
.iter()
.filter_map(|a| match a {
| AllowRule::RoomMembership(r) => Some(&r.room_id),
| _ => None,
})
.collect();
// Check if the user is in any of the allowed rooms
let mut user_meets_restrictions = false;
for restriction_room_id in &restriction_rooms {
if services
.rooms
.state_cache
.is_joined(sender_user, restriction_room_id)
.await
{
user_meets_restrictions = true;
break;
}
}
// If the user meets the restrictions, try joining instead
if user_meets_restrictions {
debug_info!(
"{sender_user} meets the restricted criteria in knock_restricted room \
{room_id}, attempting to join instead of knock"
);
// For this case, we need to drop the state lock and get a new one in
// join_room_by_id_helper We need to release the lock here and let
// join_room_by_id_helper acquire it again
drop(state_lock);
match join_room_by_id_helper(
services,
sender_user,
room_id,
reason.clone(),
servers,
None,
&None,
)
.await
{
| Ok(_) => return Ok(knock_room::v3::Response::new(room_id.to_owned())),
| Err(e) => {
debug_warn!(
"Failed to convert knock to join for {sender_user} in {room_id}: {e:?}"
);
// Get a new state lock for the remaining knock logic
let new_state_lock = services.rooms.state.mutex.lock(room_id).await;
let server_in_room = services
.rooms
.state_cache
.server_in_room(services.globals.server_name(), room_id)
.await;
let local_knock = server_in_room
|| servers.is_empty()
|| (servers.len() == 1 && services.globals.server_is_ours(&servers[0]));
if local_knock {
knock_room_helper_local(
services,
sender_user,
room_id,
reason,
servers,
new_state_lock,
)
.boxed()
.await?;
} else {
knock_room_helper_remote(
services,
sender_user,
room_id,
reason,
servers,
new_state_lock,
)
.boxed()
.await?;
}
return Ok(knock_room::v3::Response::new(room_id.to_owned()));
},
}
}
} else if !matches!(join_rule, JoinRule::Knock | JoinRule::KnockRestricted(_)) {
debug_warn!(
"{sender_user} attempted to knock on room {room_id} but its join rule is \
{join_rule:?}, not knock or knock_restricted"
);
}
let server_in_room = services
.rooms
.state_cache
@@ -2209,6 +2312,12 @@ async fn knock_room_helper_local(
return Err!(Request(Forbidden("This room does not support knocking.")));
}
// Verify that this room has a valid knock or knock_restricted join rule
let join_rule = services.rooms.state_accessor.get_join_rules(room_id).await;
if !matches!(join_rule, JoinRule::Knock | JoinRule::KnockRestricted(_)) {
return Err!(Request(Forbidden("This room's join rule does not allow knocking.")));
}
let content = RoomMemberEventContent {
displayname: services.users.displayname(sender_user).await.ok(),
avatar_url: services.users.avatar_url(sender_user).await.ok(),

View File

@@ -808,7 +808,7 @@ async fn load_joined_room(
let typings = services
.rooms
.typing
.typings_all(room_id, sender_user)
.typings_event_for_user(room_id, sender_user)
.await?;
Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])

View File

@@ -33,6 +33,7 @@
events::{
AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, TimelineEventType,
room::member::{MembershipState, RoomMemberEventContent},
typing::TypingEventContent,
},
serde::Raw,
uint,
@@ -205,6 +206,9 @@ pub(crate) async fn sync_events_v5_route(
_ = tokio::time::timeout(duration, watcher).await;
}
let typing = collect_typing_events(services, sender_user, &body, &todo_rooms).await?;
response.extensions.typing = typing;
trace!(
rooms = ?response.rooms.len(),
account_data = ?response.extensions.account_data.rooms.len(),
@@ -288,6 +292,8 @@ async fn handle_lists<'a, Rooms, AllRooms>(
Rooms: Iterator<Item = &'a RoomId> + Clone + Send + 'a,
AllRooms: Iterator<Item = &'a RoomId> + Clone + Send + 'a,
{
// TODO MSC4186: Implement remaining list filters: is_dm, is_encrypted,
// room_types.
for (list_id, list) in &body.lists {
let active_rooms: Vec<_> = match list.filters.as_ref().and_then(|f| f.is_invite) {
| None => all_rooms.clone().collect(),
@@ -665,6 +671,62 @@ async fn process_rooms<'a, Rooms>(
}
Ok(rooms)
}
async fn collect_typing_events(
services: &Services,
sender_user: &UserId,
body: &sync_events::v5::Request,
todo_rooms: &TodoRooms,
) -> Result<sync_events::v5::response::Typing> {
if !body.extensions.typing.enabled.unwrap_or(false) {
return Ok(sync_events::v5::response::Typing::default());
}
let rooms: Vec<_> = body.extensions.typing.rooms.clone().unwrap_or_else(|| {
body.room_subscriptions
.keys()
.map(ToOwned::to_owned)
.collect()
});
let lists: Vec<_> = body
.extensions
.typing
.lists
.clone()
.unwrap_or_else(|| body.lists.keys().map(ToOwned::to_owned).collect::<Vec<_>>());
if rooms.is_empty() && lists.is_empty() {
return Ok(sync_events::v5::response::Typing::default());
}
let mut typing_response = sync_events::v5::response::Typing::default();
for (room_id, (required_state_request, timeline_limit, roomsince)) in todo_rooms {
if services.rooms.typing.last_typing_update(room_id).await? <= *roomsince {
continue;
}
match services
.rooms
.typing
.typing_users_for_user(room_id, sender_user)
.await
{
| Ok(typing_users) => {
typing_response.rooms.insert(
room_id.to_owned(), // Already OwnedRoomId
Raw::new(&sync_events::v5::response::SyncTypingEvent {
content: TypingEventContent::new(typing_users),
})?,
);
},
| Err(e) => {
warn!(%room_id, "Failed to get typing events for room: {}", e);
},
}
}
Ok(typing_response)
}
async fn collect_account_data(
services: &Services,
(sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request),

View File

@@ -1,5 +1,6 @@
use axum::{Json, extract::State, response::IntoResponse};
use conduwuit::{Error, Result};
use futures::StreamExt;
use ruma::api::client::{
discovery::{
discover_homeserver::{self, HomeserverInfo, SlidingSyncProxyInfo},
@@ -17,7 +18,7 @@ pub(crate) async fn well_known_client(
State(services): State<crate::State>,
_body: Ruma<discover_homeserver::Request>,
) -> Result<discover_homeserver::Response> {
let client_url = match services.server.config.well_known.client.as_ref() {
let client_url = match services.config.well_known.client.as_ref() {
| Some(url) => url.to_string(),
| None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
};
@@ -33,44 +34,63 @@ pub(crate) async fn well_known_client(
/// # `GET /.well-known/matrix/support`
///
/// Server support contact and support page of a homeserver's domain.
/// Implements MSC1929 for server discovery.
/// If no configuration is set, uses admin users as contacts.
pub(crate) async fn well_known_support(
State(services): State<crate::State>,
_body: Ruma<discover_support::Request>,
) -> Result<discover_support::Response> {
let support_page = services
.server
.config
.well_known
.support_page
.as_ref()
.map(ToString::to_string);
let role = services.server.config.well_known.support_role.clone();
// support page or role must be either defined for this to be valid
if support_page.is_none() && role.is_none() {
return Err(Error::BadRequest(ErrorKind::NotFound, "Not found."));
}
let email_address = services.server.config.well_known.support_email.clone();
let matrix_id = services.server.config.well_known.support_mxid.clone();
// if a role is specified, an email address or matrix id is required
if role.is_some() && (email_address.is_none() && matrix_id.is_none()) {
return Err(Error::BadRequest(ErrorKind::NotFound, "Not found."));
}
let email_address = services.config.well_known.support_email.clone();
let matrix_id = services.config.well_known.support_mxid.clone();
// TODO: support defining multiple contacts in the config
let mut contacts: Vec<Contact> = vec![];
if let Some(role) = role {
let contact = Contact { role, email_address, matrix_id };
let role_value = services
.config
.well_known
.support_role
.clone()
.unwrap_or_else(|| "m.role.admin".to_owned().into());
contacts.push(contact);
// Add configured contact if at least one contact method is specified
if email_address.is_some() || matrix_id.is_some() {
contacts.push(Contact {
role: role_value.clone(),
email_address: email_address.clone(),
matrix_id: matrix_id.clone(),
});
}
// Try to add admin users as contacts if no contacts are configured
if contacts.is_empty() {
if let Ok(admin_room) = services.admin.get_admin_room().await {
let admin_users = services.rooms.state_cache.room_members(&admin_room);
let mut stream = admin_users;
while let Some(user_id) = stream.next().await {
// Skip server user
if *user_id == services.globals.server_user {
break;
}
contacts.push(Contact {
role: role_value.clone(),
email_address: None,
matrix_id: Some(user_id.to_owned()),
});
}
}
}
// support page or role+contacts must be either defined for this to be valid
if contacts.is_empty() && support_page.is_none() {
// No admin room, no configured contacts, and no support page
return Err(Error::BadRequest(ErrorKind::NotFound, "Not found."));
}
@@ -84,9 +104,9 @@ pub(crate) async fn well_known_support(
pub(crate) async fn syncv3_client_server_json(
State(services): State<crate::State>,
) -> Result<impl IntoResponse> {
let server_url = match services.server.config.well_known.client.as_ref() {
let server_url = match services.config.well_known.client.as_ref() {
| Some(url) => url.to_string(),
| None => match services.server.config.well_known.server.as_ref() {
| None => match services.config.well_known.server.as_ref() {
| Some(url) => url.to_string(),
| None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
},

View File

@@ -13,13 +13,13 @@ version.workspace = true
build = "build.rs"
# [[bin]]
# path = "main.rs"
# name = "conduwuit_build_metadata"
# name = "conduwuit_build_metadata"
[lib]
path = "mod.rs"
crate-type = [
"rlib",
# "dylib",
"rlib",
# "dylib",
]
[features]
@@ -28,7 +28,7 @@ crate-type = [
[dependencies]
[build-dependencies]
built = {version = "0.7", features = ["cargo-lock", "dependency-tree"]}
built = { version = "0.8", features = [] }
[lints]
workspace = true

View File

@@ -78,12 +78,13 @@ fn main() {
}
// --- Rerun Triggers ---
// Rerun if the git HEAD changes
println!("cargo:rerun-if-changed=.git/HEAD");
// Rerun if the ref pointed to by HEAD changes (e.g., new commit on branch)
if let Some(ref_path) = run_git_command(&["symbolic-ref", "--quiet", "HEAD"]) {
println!("cargo:rerun-if-changed=.git/{ref_path}");
}
// TODO: The git rerun triggers seem to always run
// // Rerun if the git HEAD changes
// println!("cargo:rerun-if-changed=.git/HEAD");
// // Rerun if the ref pointed to by HEAD changes (e.g., new commit on branch)
// if let Some(ref_path) = run_git_command(&["symbolic-ref", "--quiet", "HEAD"])
// { println!("cargo:rerun-if-changed=.git/{ref_path}");
// }
println!("cargo:rerun-if-env-changed=GIT_COMMIT_HASH");
println!("cargo:rerun-if-env-changed=GIT_COMMIT_HASH_SHORT");

View File

@@ -12,11 +12,17 @@ pub mod built {
v
} else if let v @ Some(_) = option_env!("CONDUWUIT_VERSION_EXTRA") {
v
} else if let v @ Some(_) = option_env!("CONDUIT_VERSION_EXTRA") {
v
} else {
GIT_COMMIT_HASH_SHORT
option_env!("CONDUIT_VERSION_EXTRA")
};
#[must_use]
pub fn version_tag() -> Option<&'static str> {
VERSION_EXTRA
.filter(|s| !s.is_empty())
.or(GIT_COMMIT_HASH_SHORT)
}
pub static GIT_REMOTE_WEB_URL: Option<&str> = option_env!("GIT_REMOTE_WEB_URL");
pub static GIT_REMOTE_COMMIT_URL: Option<&str> = option_env!("GIT_REMOTE_COMMIT_URL");

View File

@@ -274,6 +274,10 @@ pub fn set_dirty_decay<I: Into<Option<usize>>>(arena: I, decay_ms: isize) -> Res
}
}
pub fn background_thread_enable(enable: bool) -> Result<bool> {
set::<u8>(&mallctl!("background_thread"), enable.into()).map(is_nonzero!())
}
#[inline]
#[must_use]
pub fn is_affine_arena() -> bool { is_percpu_arena() || is_phycpu_arena() }

View File

@@ -219,6 +219,15 @@ pub fn check(config: &Config) -> Result {
));
}
// Check if support contact information is configured
if config.well_known.support_email.is_none() && config.well_known.support_mxid.is_none() {
warn!(
"No support contact information (support_email or support_mxid) is configured in \
the well_known section. Users in the admin room will be automatically listed as \
support contacts in the /.well-known/matrix/support endpoint."
);
}
if config
.url_preview_domain_contains_allowlist
.contains(&"*".to_owned())

View File

@@ -1897,12 +1897,28 @@ pub struct WellKnownConfig {
/// example: "matrix.example.com:443"
pub server: Option<OwnedServerName>,
/// URL to a support page for the server, which will be served as part of
/// the MSC1929 server support endpoint at /.well-known/matrix/support.
/// Will be included alongside any contact information
pub support_page: Option<Url>,
/// Role string for server support contacts, to be served as part of the
/// MSC1929 server support endpoint at /.well-known/matrix/support.
///
/// default: "m.role.admin"
pub support_role: Option<ContactRole>,
/// Email address for server support contacts, to be served as part of the
/// MSC1929 server support endpoint.
/// This will be used along with support_mxid if specified.
pub support_email: Option<String>,
/// Matrix ID for server support contacts, to be served as part of the
/// MSC1929 server support endpoint.
/// This will be used along with support_email if specified.
///
/// If no email or mxid is specified, all of the server's admins will be
/// listed.
pub support_mxid: Option<OwnedUserId>,
}

View File

@@ -26,6 +26,6 @@ pub fn user_agent() -> &'static str { USER_AGENT.get_or_init(init_user_agent) }
fn init_user_agent() -> String { format!("{}/{}", name(), version()) }
fn init_version() -> String {
conduwuit_build_metadata::VERSION_EXTRA
conduwuit_build_metadata::version_tag()
.map_or(SEMANTIC.to_owned(), |extra| format!("{SEMANTIC} ({extra})"))
}

View File

@@ -1,18 +1,10 @@
use std::{
borrow::Borrow,
fmt::{Debug, Display},
hash::Hash,
};
use ruma::{EventId, MilliSecondsSinceUnixEpoch, RoomId, UserId, events::TimelineEventType};
use serde_json::value::RawValue as RawJsonValue;
/// Abstraction of a PDU so users can have their own PDU types.
pub trait Event {
type Id: Clone + Debug + Display + Eq + Ord + Hash + Send + Borrow<EventId>;
/// The `EventId` of this event.
fn event_id(&self) -> &Self::Id;
fn event_id(&self) -> &EventId;
/// The `RoomId` of this event.
fn room_id(&self) -> &RoomId;
@@ -34,20 +26,18 @@ pub trait Event {
/// The events before this event.
// Requires GATs to avoid boxing (and TAIT for making it convenient).
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_;
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_;
/// All the authenticating events for this event.
// Requires GATs to avoid boxing (and TAIT for making it convenient).
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_;
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_;
/// If this event is a redaction event this is the event it redacts.
fn redacts(&self) -> Option<&Self::Id>;
fn redacts(&self) -> Option<&EventId>;
}
impl<T: Event> Event for &T {
type Id = T::Id;
fn event_id(&self) -> &Self::Id { (*self).event_id() }
fn event_id(&self) -> &EventId { (*self).event_id() }
fn room_id(&self) -> &RoomId { (*self).room_id() }
@@ -61,13 +51,13 @@ fn content(&self) -> &RawJsonValue { (*self).content() }
fn state_key(&self) -> Option<&str> { (*self).state_key() }
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ {
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
(*self).prev_events()
}
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ {
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
(*self).auth_events()
}
fn redacts(&self) -> Option<&Self::Id> { (*self).redacts() }
fn redacts(&self) -> Option<&EventId> { (*self).redacts() }
}

View File

@@ -79,9 +79,7 @@ pub fn from_id_val(event_id: &EventId, mut json: CanonicalJsonObject) -> Result<
}
impl Event for Pdu {
type Id = OwnedEventId;
fn event_id(&self) -> &Self::Id { &self.event_id }
fn event_id(&self) -> &EventId { &self.event_id }
fn room_id(&self) -> &RoomId { &self.room_id }
@@ -97,15 +95,15 @@ fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch {
fn state_key(&self) -> Option<&str> { self.state_key.as_deref() }
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ {
self.prev_events.iter()
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
self.prev_events.iter().map(AsRef::as_ref)
}
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ {
self.auth_events.iter()
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
self.auth_events.iter().map(AsRef::as_ref)
}
fn redacts(&self) -> Option<&Self::Id> { self.redacts.as_ref() }
fn redacts(&self) -> Option<&EventId> { self.redacts.as_deref() }
}
/// Prevent derived equality which wouldn't limit itself to event_id

View File

@@ -1,8 +1,8 @@
use ruma::{
events::{
AnyEphemeralRoomEvent, AnyMessageLikeEvent, AnyStateEvent, AnyStrippedStateEvent,
AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, StateEvent,
room::member::RoomMemberEventContent, space::child::HierarchySpaceChildEvent,
AnyMessageLikeEvent, AnyStateEvent, AnyStrippedStateEvent, AnySyncStateEvent,
AnySyncTimelineEvent, AnyTimelineEvent, StateEvent, room::member::RoomMemberEventContent,
space::child::HierarchySpaceChildEvent,
},
serde::Raw,
};
@@ -10,41 +10,6 @@
use crate::implement;
/// This only works for events that are also AnyRoomEvents.
#[must_use]
#[implement(super::Pdu)]
pub fn into_any_event(self) -> Raw<AnyEphemeralRoomEvent> {
serde_json::from_value(self.into_any_event_value()).expect("Raw::from_value always works")
}
/// This only works for events that are also AnyRoomEvents.
#[implement(super::Pdu)]
#[must_use]
#[inline]
pub fn into_any_event_value(self) -> JsonValue {
let (redacts, content) = self.copy_redacts();
let mut json = json!({
"content": content,
"type": self.kind,
"event_id": self.event_id,
"sender": self.sender,
"origin_server_ts": self.origin_server_ts,
"room_id": self.room_id,
});
if let Some(unsigned) = &self.unsigned {
json["unsigned"] = json!(unsigned);
}
if let Some(state_key) = &self.state_key {
json["state_key"] = json!(state_key);
}
if let Some(redacts) = &redacts {
json["redacts"] = json!(redacts);
}
json
}
#[implement(super::Pdu)]
#[must_use]
#[inline]
@@ -53,7 +18,8 @@ pub fn into_room_event(self) -> Raw<AnyTimelineEvent> { self.to_room_event() }
#[implement(super::Pdu)]
#[must_use]
pub fn to_room_event(&self) -> Raw<AnyTimelineEvent> {
serde_json::from_value(self.to_room_event_value()).expect("Raw::from_value always works")
let value = self.to_room_event_value();
serde_json::from_value(value).expect("Failed to serialize Event value")
}
#[implement(super::Pdu)]
@@ -91,8 +57,8 @@ pub fn into_message_like_event(self) -> Raw<AnyMessageLikeEvent> { self.to_messa
#[implement(super::Pdu)]
#[must_use]
pub fn to_message_like_event(&self) -> Raw<AnyMessageLikeEvent> {
serde_json::from_value(self.to_message_like_event_value())
.expect("Raw::from_value always works")
let value = self.to_message_like_event_value();
serde_json::from_value(value).expect("Failed to serialize Event value")
}
#[implement(super::Pdu)]
@@ -130,7 +96,8 @@ pub fn into_sync_room_event(self) -> Raw<AnySyncTimelineEvent> { self.to_sync_ro
#[implement(super::Pdu)]
#[must_use]
pub fn to_sync_room_event(&self) -> Raw<AnySyncTimelineEvent> {
serde_json::from_value(self.to_sync_room_event_value()).expect("Raw::from_value always works")
let value = self.to_sync_room_event_value();
serde_json::from_value(value).expect("Failed to serialize Event value")
}
#[implement(super::Pdu)]
@@ -162,7 +129,8 @@ pub fn to_sync_room_event_value(&self) -> JsonValue {
#[implement(super::Pdu)]
#[must_use]
pub fn into_state_event(self) -> Raw<AnyStateEvent> {
serde_json::from_value(self.into_state_event_value()).expect("Raw::from_value always works")
let value = self.into_state_event_value();
serde_json::from_value(value).expect("Failed to serialize Event value")
}
#[implement(super::Pdu)]
@@ -189,8 +157,8 @@ pub fn into_state_event_value(self) -> JsonValue {
#[implement(super::Pdu)]
#[must_use]
pub fn into_sync_state_event(self) -> Raw<AnySyncStateEvent> {
serde_json::from_value(self.into_sync_state_event_value())
.expect("Raw::from_value always works")
let value = self.into_sync_state_event_value();
serde_json::from_value(value).expect("Failed to serialize Event value")
}
#[implement(super::Pdu)]
@@ -223,8 +191,8 @@ pub fn into_stripped_state_event(self) -> Raw<AnyStrippedStateEvent> {
#[implement(super::Pdu)]
#[must_use]
pub fn to_stripped_state_event(&self) -> Raw<AnyStrippedStateEvent> {
serde_json::from_value(self.to_stripped_state_event_value())
.expect("Raw::from_value always works")
let value = self.to_stripped_state_event_value();
serde_json::from_value(value).expect("Failed to serialize Event value")
}
#[implement(super::Pdu)]
@@ -242,8 +210,8 @@ pub fn to_stripped_state_event_value(&self) -> JsonValue {
#[implement(super::Pdu)]
#[must_use]
pub fn into_stripped_spacechild_state_event(self) -> Raw<HierarchySpaceChildEvent> {
serde_json::from_value(self.into_stripped_spacechild_state_event_value())
.expect("Raw::from_value always works")
let value = self.into_stripped_spacechild_state_event_value();
serde_json::from_value(value).expect("Failed to serialize Event value")
}
#[implement(super::Pdu)]
@@ -262,7 +230,8 @@ pub fn into_stripped_spacechild_state_event_value(self) -> JsonValue {
#[implement(super::Pdu)]
#[must_use]
pub fn into_member_event(self) -> Raw<StateEvent<RoomMemberEventContent>> {
serde_json::from_value(self.into_member_event_value()).expect("Raw::from_value always works")
let value = self.into_member_event_value();
serde_json::from_value(value).expect("Failed to serialize Event value")
}
#[implement(super::Pdu)]

View File

@@ -52,7 +52,6 @@ fn lexico_topo_sort(c: &mut test::Bencher) {
#[cfg(conduwuit_bench)]
#[cfg_attr(conduwuit_bench, bench)]
fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
let parallel_fetches = 32;
let mut store = TestStore(hashmap! {});
// build up the DAG
@@ -78,7 +77,6 @@ fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
&auth_chain_sets,
&fetch,
&exists,
parallel_fetches,
)
.await
{
@@ -91,7 +89,6 @@ fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
#[cfg(conduwuit_bench)]
#[cfg_attr(conduwuit_bench, bench)]
fn resolve_deeper_event_set(c: &mut test::Bencher) {
let parallel_fetches = 32;
let mut inner = INITIAL_EVENTS();
let ban = BAN_STATE_SET();
@@ -153,7 +150,6 @@ fn resolve_deeper_event_set(c: &mut test::Bencher) {
&auth_chain_sets,
&fetch,
&exists,
parallel_fetches,
)
.await
{
@@ -190,7 +186,11 @@ fn get_events(&self, room_id: &RoomId, event_ids: &[OwnedEventId]) -> Result<Vec
}
/// Returns a Vec of the related auth events to the given `event`.
fn auth_event_ids(&self, room_id: &RoomId, event_ids: Vec<E::Id>) -> Result<HashSet<E::Id>> {
fn auth_event_ids(
&self,
room_id: &RoomId,
event_ids: Vec<OwnedEventId>,
) -> Result<HashSet<OwnedEventId>> {
let mut result = HashSet::new();
let mut stack = event_ids;
@@ -216,8 +216,8 @@ fn auth_event_ids(&self, room_id: &RoomId, event_ids: Vec<E::Id>) -> Result<Hash
fn auth_chain_diff(
&self,
room_id: &RoomId,
event_ids: Vec<Vec<E::Id>>,
) -> Result<Vec<E::Id>> {
event_ids: Vec<Vec<OwnedEventId>>,
) -> Result<Vec<OwnedEventId>> {
let mut auth_chain_sets = vec![];
for ids in event_ids {
// TODO state store `auth_event_ids` returns self in the event ids list
@@ -238,7 +238,7 @@ fn auth_chain_diff(
Ok(auth_chain_sets
.into_iter()
.flatten()
.filter(|id| !common.contains(id.borrow()))
.filter(|id| !common.contains(id))
.collect())
} else {
Ok(vec![])
@@ -565,7 +565,7 @@ fn with_state_key(self, state_key: impl Into<String>) -> (StateEventType, String
mod event {
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
events::{TimelineEventType, pdu::Pdu},
};
use serde::{Deserialize, Serialize};
@@ -574,9 +574,7 @@ mod event {
use super::Event;
impl Event for PduEvent {
type Id = OwnedEventId;
fn event_id(&self) -> &Self::Id { &self.event_id }
fn event_id(&self) -> &EventId { &self.event_id }
fn room_id(&self) -> &RoomId {
match &self.rest {
@@ -632,28 +630,30 @@ fn state_key(&self) -> Option<&str> {
}
}
fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> {
fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> {
match &self.rest {
| Pdu::RoomV1Pdu(ev) => Box::new(ev.prev_events.iter().map(|(id, _)| id)),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter()),
| Pdu::RoomV1Pdu(ev) =>
Box::new(ev.prev_events.iter().map(|(id, _)| id.as_ref())),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter().map(AsRef::as_ref)),
#[cfg(not(feature = "unstable-exhaustive-types"))]
| _ => unreachable!("new PDU version"),
}
}
fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> {
fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> {
match &self.rest {
| Pdu::RoomV1Pdu(ev) => Box::new(ev.auth_events.iter().map(|(id, _)| id)),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter()),
| Pdu::RoomV1Pdu(ev) =>
Box::new(ev.auth_events.iter().map(|(id, _)| id.as_ref())),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter().map(AsRef::as_ref)),
#[cfg(not(feature = "unstable-exhaustive-types"))]
| _ => unreachable!("new PDU version"),
}
}
fn redacts(&self) -> Option<&Self::Id> {
fn redacts(&self) -> Option<&EventId> {
match &self.rest {
| Pdu::RoomV1Pdu(ev) => ev.redacts.as_ref(),
| Pdu::RoomV3Pdu(ev) => ev.redacts.as_ref(),
| Pdu::RoomV1Pdu(ev) => ev.redacts.as_deref(),
| Pdu::RoomV3Pdu(ev) => ev.redacts.as_deref(),
#[cfg(not(feature = "unstable-exhaustive-types"))]
| _ => unreachable!("new PDU version"),
}

View File

@@ -133,7 +133,7 @@ struct RoomMemberContentFields {
level = "debug",
skip_all,
fields(
event_id = incoming_event.event_id().borrow().as_str()
event_id = incoming_event.event_id().as_str(),
)
)]
pub async fn auth_check<F, Fut, Fetched, Incoming>(
@@ -259,7 +259,7 @@ struct RoomCreateContentFields {
// 3. If event does not have m.room.create in auth_events reject
if !incoming_event
.auth_events()
.any(|id| id.borrow() == room_create_event.event_id().borrow())
.any(|id| id == room_create_event.event_id())
{
warn!("no m.room.create event in auth events");
return Ok(false);
@@ -638,7 +638,7 @@ struct GetThirdPartyInvite {
warn!(?target_user_membership_event_id, "Banned user can't join");
false
} else if (join_rules == JoinRule::Invite
|| room_version.allow_knocking && join_rules == JoinRule::Knock)
|| room_version.allow_knocking && (join_rules == JoinRule::Knock || matches!(join_rules, JoinRule::KnockRestricted(_))))
// If the join_rule is invite then allow if membership state is invite or join
&& (target_user_current_membership == MembershipState::Join
|| target_user_current_membership == MembershipState::Invite)
@@ -1021,11 +1021,11 @@ fn check_redaction(
// If the domain of the event_id of the event being redacted is the same as the
// domain of the event_id of the m.room.redaction, allow
if redaction_event.event_id().borrow().server_name()
if redaction_event.event_id().server_name()
== redaction_event
.redacts()
.as_ref()
.and_then(|&id| id.borrow().server_name())
.and_then(|&id| id.server_name())
{
debug!("redaction event allowed via room version 1 rules");
return Ok(true);

View File

@@ -20,7 +20,7 @@
use futures::{Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
use ruma::{
EventId, Int, MilliSecondsSinceUnixEpoch, RoomVersionId,
EventId, Int, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomVersionId,
events::{
StateEventType, TimelineEventType,
room::member::{MembershipState, RoomMemberEventContent},
@@ -39,9 +39,7 @@
debug, debug_error,
matrix::{event::Event, pdu::StateKey},
trace,
utils::stream::{
BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryReadyExt, WidebandExt,
},
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, WidebandExt},
warn,
};
@@ -69,9 +67,6 @@
/// * `event_fetch` - Any event not found in the `event_map` will defer to this
/// closure to find the event.
///
/// * `parallel_fetches` - The number of asynchronous fetch requests in-flight
/// for any given operation.
///
/// ## Invariants
///
/// The caller of `resolve` must ensure that all the events are from the same
@@ -82,21 +77,19 @@
pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>(
room_version: &RoomVersionId,
state_sets: Sets,
auth_chain_sets: &'a [HashSet<E::Id, Hasher>],
auth_chain_sets: &'a [HashSet<OwnedEventId, Hasher>],
event_fetch: &Fetch,
event_exists: &Exists,
parallel_fetches: usize,
) -> Result<StateMap<E::Id>>
) -> Result<StateMap<OwnedEventId>>
where
Fetch: Fn(E::Id) -> FetchFut + Sync,
Fetch: Fn(OwnedEventId) -> FetchFut + Sync,
FetchFut: Future<Output = Option<E>> + Send,
Exists: Fn(E::Id) -> ExistsFut + Sync,
Exists: Fn(OwnedEventId) -> ExistsFut + Sync,
ExistsFut: Future<Output = bool> + Send,
Sets: IntoIterator<IntoIter = SetIter> + Send,
SetIter: Iterator<Item = &'a StateMap<E::Id>> + Clone + Send,
SetIter: Iterator<Item = &'a StateMap<OwnedEventId>> + Clone + Send,
Hasher: BuildHasher + Send + Sync,
E: Event + Clone + Send + Sync,
E::Id: Borrow<EventId> + Send + Sync,
for<'b> &'b E: Send,
{
debug!("State resolution starting");
@@ -147,13 +140,8 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis
// Sort the control events based on power_level/clock/event_id and
// outgoing/incoming edges
let sorted_control_levels = reverse_topological_power_sort(
control_events,
&all_conflicted,
&event_fetch,
parallel_fetches,
)
.await?;
let sorted_control_levels =
reverse_topological_power_sort(control_events, &all_conflicted, &event_fetch).await?;
debug!(count = sorted_control_levels.len(), "power events");
trace!(list = ?sorted_control_levels, "sorted power events");
@@ -162,7 +150,7 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis
// Sequentially auth check each control event.
let resolved_control = iterative_auth_check(
&room_version,
sorted_control_levels.iter().stream(),
sorted_control_levels.iter().stream().map(AsRef::as_ref),
clean.clone(),
&event_fetch,
)
@@ -179,7 +167,7 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis
// that failed auth
let events_to_resolve: Vec<_> = all_conflicted
.iter()
.filter(|&id| !deduped_power_ev.contains(id.borrow()))
.filter(|&id| !deduped_power_ev.contains(id))
.cloned()
.collect();
@@ -199,7 +187,7 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis
let mut resolved_state = iterative_auth_check(
&room_version,
sorted_left_events.iter().stream(),
sorted_left_events.iter().stream().map(AsRef::as_ref),
resolved_control, // The control events are added to the final resolved state
&event_fetch,
)
@@ -292,16 +280,14 @@ fn get_auth_chain_diff<Id, Hasher>(
/// earlier (further back in time) origin server timestamp.
#[tracing::instrument(level = "debug", skip_all)]
async fn reverse_topological_power_sort<E, F, Fut>(
events_to_sort: Vec<E::Id>,
auth_diff: &HashSet<E::Id>,
events_to_sort: Vec<OwnedEventId>,
auth_diff: &HashSet<OwnedEventId>,
fetch_event: &F,
parallel_fetches: usize,
) -> Result<Vec<E::Id>>
) -> Result<Vec<OwnedEventId>>
where
F: Fn(E::Id) -> Fut + Sync,
F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send,
E: Event + Send + Sync,
E::Id: Borrow<EventId> + Send + Sync,
{
debug!("reverse topological sort of power events");
@@ -311,35 +297,36 @@ async fn reverse_topological_power_sort<E, F, Fut>(
}
// This is used in the `key_fn` passed to the lexico_topo_sort fn
let event_to_pl = graph
let event_to_pl: HashMap<_, _> = graph
.keys()
.cloned()
.stream()
.map(|event_id| {
get_power_level_for_sender(event_id.clone(), fetch_event)
.map(move |res| res.map(|pl| (event_id, pl)))
.broad_filter_map(async |event_id| {
let pl = get_power_level_for_sender(&event_id, fetch_event)
.await
.ok()?;
Some((event_id, pl))
})
.buffer_unordered(parallel_fetches)
.ready_try_fold(HashMap::new(), |mut event_to_pl, (event_id, pl)| {
.inspect(|(event_id, pl)| {
debug!(
event_id = event_id.borrow().as_str(),
power_level = i64::from(pl),
event_id = event_id.as_str(),
power_level = i64::from(*pl),
"found the power level of an event's sender",
);
event_to_pl.insert(event_id.clone(), pl);
Ok(event_to_pl)
})
.collect()
.boxed()
.await?;
.await;
let event_to_pl = &event_to_pl;
let fetcher = |event_id: E::Id| async move {
let fetcher = async |event_id: OwnedEventId| {
let pl = *event_to_pl
.get(event_id.borrow())
.get(&event_id)
.ok_or_else(|| Error::NotFound(String::new()))?;
let ev = fetch_event(event_id)
.await
.ok_or_else(|| Error::NotFound(String::new()))?;
Ok((pl, ev.origin_server_ts()))
};
@@ -476,18 +463,17 @@ fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other))
/// the eventId at the eventId's generation (we walk backwards to `EventId`s
/// most recent previous power level event).
async fn get_power_level_for_sender<E, F, Fut>(
event_id: E::Id,
event_id: &EventId,
fetch_event: &F,
) -> serde_json::Result<Int>
where
F: Fn(E::Id) -> Fut + Sync,
F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send,
E: Event + Send,
E::Id: Borrow<EventId> + Send,
{
debug!("fetch event ({event_id}) senders power level");
let event = fetch_event(event_id).await;
let event = fetch_event(event_id.to_owned()).await;
let auth_events = event.as_ref().map(Event::auth_events);
@@ -495,7 +481,7 @@ async fn get_power_level_for_sender<E, F, Fut>(
.into_iter()
.flatten()
.stream()
.broadn_filter_map(5, |aid| fetch_event(aid.clone()))
.broadn_filter_map(5, |aid| fetch_event(aid.to_owned()))
.ready_find(|aev| is_type_and_key(aev, &TimelineEventType::RoomPowerLevels, ""))
.await;
@@ -528,14 +514,13 @@ async fn get_power_level_for_sender<E, F, Fut>(
async fn iterative_auth_check<'a, E, F, Fut, S>(
room_version: &RoomVersion,
events_to_check: S,
unconflicted_state: StateMap<E::Id>,
unconflicted_state: StateMap<OwnedEventId>,
fetch_event: &F,
) -> Result<StateMap<E::Id>>
) -> Result<StateMap<OwnedEventId>>
where
F: Fn(E::Id) -> Fut + Sync,
F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send,
E::Id: Borrow<EventId> + Clone + Eq + Ord + Send + Sync + 'a,
S: Stream<Item = &'a E::Id> + Send + 'a,
S: Stream<Item = &'a EventId> + Send + 'a,
E: Event + Clone + Send + Sync,
{
debug!("starting iterative auth check");
@@ -543,7 +528,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
let events_to_check: Vec<_> = events_to_check
.map(Result::Ok)
.broad_and_then(async |event_id| {
fetch_event(event_id.clone())
fetch_event(event_id.to_owned())
.await
.ok_or_else(|| Error::NotFound(format!("Failed to find {event_id}")))
})
@@ -551,16 +536,16 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
.boxed()
.await?;
let auth_event_ids: HashSet<E::Id> = events_to_check
let auth_event_ids: HashSet<OwnedEventId> = events_to_check
.iter()
.flat_map(|event: &E| event.auth_events().map(Clone::clone))
.flat_map(|event: &E| event.auth_events().map(ToOwned::to_owned))
.collect();
let auth_events: HashMap<E::Id, E> = auth_event_ids
let auth_events: HashMap<OwnedEventId, E> = auth_event_ids
.into_iter()
.stream()
.broad_filter_map(fetch_event)
.map(|auth_event| (auth_event.event_id().clone(), auth_event))
.map(|auth_event| (auth_event.event_id().to_owned(), auth_event))
.collect()
.boxed()
.await;
@@ -581,7 +566,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
let mut auth_state = StateMap::new();
for aid in event.auth_events() {
if let Some(ev) = auth_events.get(aid.borrow()) {
if let Some(ev) = auth_events.get(aid) {
//TODO: synapse checks "rejected_reason" which is most likely related to
// soft-failing
auth_state.insert(
@@ -592,7 +577,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
ev.clone(),
);
} else {
warn!(event_id = aid.borrow().as_str(), "missing auth event");
warn!(event_id = aid.as_str(), "missing auth event");
}
}
@@ -601,7 +586,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
.stream()
.ready_filter_map(|key| Some((key, resolved_state.get(key)?)))
.filter_map(|(key, ev_id)| async move {
if let Some(event) = auth_events.get(ev_id.borrow()) {
if let Some(event) = auth_events.get(ev_id) {
Some((key, event.clone()))
} else {
Some((key, fetch_event(ev_id.clone()).await?))
@@ -633,7 +618,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
// add event to resolved state map
resolved_state.insert(
event.event_type().with_state_key(state_key),
event.event_id().clone(),
event.event_id().to_owned(),
);
},
| Ok(false) => {
@@ -660,15 +645,14 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
/// level as a parent) will be marked as depth 1. depth 1 is "older" than depth
/// 0.
async fn mainline_sort<E, F, Fut>(
to_sort: &[E::Id],
resolved_power_level: Option<E::Id>,
to_sort: &[OwnedEventId],
resolved_power_level: Option<OwnedEventId>,
fetch_event: &F,
) -> Result<Vec<E::Id>>
) -> Result<Vec<OwnedEventId>>
where
F: Fn(E::Id) -> Fut + Sync,
F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send,
E: Event + Clone + Send + Sync,
E::Id: Borrow<EventId> + Clone + Send + Sync,
{
debug!("mainline sort of events");
@@ -688,7 +672,7 @@ async fn mainline_sort<E, F, Fut>(
pl = None;
for aid in event.auth_events() {
let ev = fetch_event(aid.clone())
let ev = fetch_event(aid.to_owned())
.await
.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
@@ -734,26 +718,25 @@ async fn mainline_sort<E, F, Fut>(
/// that has an associated mainline depth.
async fn get_mainline_depth<E, F, Fut>(
mut event: Option<E>,
mainline_map: &HashMap<E::Id, usize>,
mainline_map: &HashMap<OwnedEventId, usize>,
fetch_event: &F,
) -> Result<usize>
where
F: Fn(E::Id) -> Fut + Sync,
F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send,
E: Event + Send + Sync,
E::Id: Borrow<EventId> + Send + Sync,
{
while let Some(sort_ev) = event {
debug!(event_id = sort_ev.event_id().borrow().as_str(), "mainline");
debug!(event_id = sort_ev.event_id().as_str(), "mainline");
let id = sort_ev.event_id();
if let Some(depth) = mainline_map.get(id.borrow()) {
if let Some(depth) = mainline_map.get(id) {
return Ok(*depth);
}
event = None;
for aid in sort_ev.auth_events() {
let aev = fetch_event(aid.clone())
let aev = fetch_event(aid.to_owned())
.await
.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
@@ -768,15 +751,14 @@ async fn get_mainline_depth<E, F, Fut>(
}
async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
graph: &mut HashMap<E::Id, HashSet<E::Id>>,
event_id: E::Id,
auth_diff: &HashSet<E::Id>,
graph: &mut HashMap<OwnedEventId, HashSet<OwnedEventId>>,
event_id: OwnedEventId,
auth_diff: &HashSet<OwnedEventId>,
fetch_event: &F,
) where
F: Fn(E::Id) -> Fut + Sync,
F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send,
E: Event + Send + Sync,
E::Id: Borrow<EventId> + Clone + Send + Sync,
{
let mut state = vec![event_id];
while let Some(eid) = state.pop() {
@@ -786,26 +768,27 @@ async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
// Prefer the store to event as the store filters dedups the events
for aid in auth_events {
if auth_diff.contains(aid.borrow()) {
if !graph.contains_key(aid.borrow()) {
if auth_diff.contains(aid) {
if !graph.contains_key(aid) {
state.push(aid.to_owned());
}
// We just inserted this at the start of the while loop
graph.get_mut(eid.borrow()).unwrap().insert(aid.to_owned());
graph
.get_mut(&eid)
.expect("We just inserted this at the start of the while loop")
.insert(aid.to_owned());
}
}
}
}
async fn is_power_event_id<E, F, Fut>(event_id: &E::Id, fetch: &F) -> bool
async fn is_power_event_id<E, F, Fut>(event_id: &EventId, fetch: &F) -> bool
where
F: Fn(E::Id) -> Fut + Sync,
F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send,
E: Event + Send,
E::Id: Borrow<EventId> + Send + Sync,
{
match fetch(event_id.clone()).await.as_ref() {
match fetch(event_id.to_owned()).await.as_ref() {
| Some(state) => is_power_event(state),
| _ => false,
}
@@ -909,13 +892,13 @@ async fn test_event_sort() {
let fetcher = |id| ready(events.get(&id).cloned());
let sorted_power_events =
super::reverse_topological_power_sort(power_events, &auth_chain, &fetcher, 1)
super::reverse_topological_power_sort(power_events, &auth_chain, &fetcher)
.await
.unwrap();
let resolved_power = super::iterative_auth_check(
&RoomVersion::V6,
sorted_power_events.iter().stream(),
sorted_power_events.iter().map(AsRef::as_ref).stream(),
HashMap::new(), // unconflicted events
&fetcher,
)
@@ -1300,7 +1283,7 @@ async fn test_event_map_none() {
let ev_map = store.0.clone();
let fetcher = |id| ready(ev_map.get(&id).cloned());
let exists = |id: <PduEvent as Event>::Id| ready(ev_map.get(&*id).is_some());
let exists = |id: OwnedEventId| ready(ev_map.get(&*id).is_some());
let state_sets = [state_at_bob, state_at_charlie];
let auth_chain: Vec<_> = state_sets
@@ -1312,19 +1295,13 @@ async fn test_event_map_none() {
})
.collect();
let resolved = match super::resolve(
&RoomVersionId::V2,
&state_sets,
&auth_chain,
&fetcher,
&exists,
1,
)
.await
{
| Ok(state) => state,
| Err(e) => panic!("{e}"),
};
let resolved =
match super::resolve(&RoomVersionId::V2, &state_sets, &auth_chain, &fetcher, &exists)
.await
{
| Ok(state) => state,
| Err(e) => panic!("{e}"),
};
assert_eq!(expected, resolved);
}
@@ -1429,21 +1406,15 @@ async fn ban_with_auth_chains2() {
})
.collect();
let fetcher = |id: <PduEvent as Event>::Id| ready(ev_map.get(&id).cloned());
let exists = |id: <PduEvent as Event>::Id| ready(ev_map.get(&id).is_some());
let resolved = match super::resolve(
&RoomVersionId::V6,
&state_sets,
&auth_chain,
&fetcher,
&exists,
1,
)
.await
{
| Ok(state) => state,
| Err(e) => panic!("{e}"),
};
let fetcher = |id: OwnedEventId| ready(ev_map.get(&id).cloned());
let exists = |id: OwnedEventId| ready(ev_map.get(&id).is_some());
let resolved =
match super::resolve(&RoomVersionId::V6, &state_sets, &auth_chain, &fetcher, &exists)
.await
{
| Ok(state) => state,
| Err(e) => panic!("{e}"),
};
debug!(
resolved = ?resolved

View File

@@ -133,17 +133,11 @@ pub(crate) async fn do_check(
.collect();
let event_map = &event_map;
let fetch = |id: <PduEvent as Event>::Id| ready(event_map.get(&id).cloned());
let exists = |id: <PduEvent as Event>::Id| ready(event_map.get(&id).is_some());
let resolved = super::resolve(
&RoomVersionId::V6,
state_sets,
&auth_chain_sets,
&fetch,
&exists,
1,
)
.await;
let fetch = |id: OwnedEventId| ready(event_map.get(&id).cloned());
let exists = |id: OwnedEventId| ready(event_map.get(&id).is_some());
let resolved =
super::resolve(&RoomVersionId::V6, state_sets, &auth_chain_sets, &fetch, &exists)
.await;
match resolved {
| Ok(state) => state,
@@ -247,8 +241,8 @@ pub(crate) fn get_event(&self, _: &RoomId, event_id: &EventId) -> Result<E> {
pub(crate) fn auth_event_ids(
&self,
room_id: &RoomId,
event_ids: Vec<E::Id>,
) -> Result<HashSet<E::Id>> {
event_ids: Vec<OwnedEventId>,
) -> Result<HashSet<OwnedEventId>> {
let mut result = HashSet::new();
let mut stack = event_ids;
@@ -584,7 +578,7 @@ pub(crate) fn INITIAL_EDGES() -> Vec<OwnedEventId> {
pub(crate) mod event {
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
events::{TimelineEventType, pdu::Pdu},
};
use serde::{Deserialize, Serialize};
@@ -593,9 +587,7 @@ pub(crate) mod event {
use crate::Event;
impl Event for PduEvent {
type Id = OwnedEventId;
fn event_id(&self) -> &Self::Id { &self.event_id }
fn event_id(&self) -> &EventId { &self.event_id }
fn room_id(&self) -> &RoomId {
match &self.rest {
@@ -652,29 +644,31 @@ fn state_key(&self) -> Option<&str> {
}
#[allow(refining_impl_trait)]
fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> {
fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> {
match &self.rest {
| Pdu::RoomV1Pdu(ev) => Box::new(ev.prev_events.iter().map(|(id, _)| id)),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter()),
| Pdu::RoomV1Pdu(ev) =>
Box::new(ev.prev_events.iter().map(|(id, _)| id.as_ref())),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter().map(AsRef::as_ref)),
#[allow(unreachable_patterns)]
| _ => unreachable!("new PDU version"),
}
}
#[allow(refining_impl_trait)]
fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> {
fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> {
match &self.rest {
| Pdu::RoomV1Pdu(ev) => Box::new(ev.auth_events.iter().map(|(id, _)| id)),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter()),
| Pdu::RoomV1Pdu(ev) =>
Box::new(ev.auth_events.iter().map(|(id, _)| id.as_ref())),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter().map(AsRef::as_ref)),
#[allow(unreachable_patterns)]
| _ => unreachable!("new PDU version"),
}
}
fn redacts(&self) -> Option<&Self::Id> {
fn redacts(&self) -> Option<&EventId> {
match &self.rest {
| Pdu::RoomV1Pdu(ev) => ev.redacts.as_ref(),
| Pdu::RoomV3Pdu(ev) => ev.redacts.as_ref(),
| Pdu::RoomV1Pdu(ev) => ev.redacts.as_deref(),
| Pdu::RoomV3Pdu(ev) => ev.redacts.as_deref(),
#[allow(unreachable_patterns)]
| _ => unreachable!("new PDU version"),
}

View File

@@ -21,7 +21,10 @@
pub use ::tracing;
pub use config::Config;
pub use error::Error;
pub use info::{rustc_flags_capture, version, version::version};
pub use info::{
rustc_flags_capture, version,
version::{name, version},
};
pub use matrix::{Event, EventTypeExt, PduCount, PduEvent, PduId, RoomVersion, pdu, state_res};
pub use server::Server;
pub use utils::{ctor, dtor, implement, result, result::Result};

View File

@@ -15,7 +15,7 @@
#[clap(
about,
long_about = None,
name = "conduwuit",
name = conduwuit_core::name(),
version = conduwuit_core::version(),
)]
pub(crate) struct Args {

View File

@@ -98,12 +98,7 @@ pub(super) fn shutdown(server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
Level::INFO
};
debug!(
timeout = ?SHUTDOWN_TIMEOUT,
"Waiting for runtime..."
);
runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);
wait_shutdown(server, runtime);
let runtime_metrics = server.server.metrics.runtime_interval().unwrap_or_default();
event!(LEVEL, ?runtime_metrics, "Final runtime metrics");
@@ -111,13 +106,23 @@ pub(super) fn shutdown(server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
#[cfg(not(tokio_unstable))]
#[tracing::instrument(name = "stop", level = "info", skip_all)]
pub(super) fn shutdown(_server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
pub(super) fn shutdown(server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
wait_shutdown(server, runtime);
}
fn wait_shutdown(_server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
debug!(
timeout = ?SHUTDOWN_TIMEOUT,
"Waiting for runtime..."
);
runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);
// Join any jemalloc threads so they don't appear in use at exit.
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
conduwuit_core::alloc::je::background_thread_enable(false)
.log_debug_err()
.ok();
}
#[tracing::instrument(

View File

@@ -8,7 +8,7 @@
Error, Result, err, implement,
state_res::{self, StateMap},
trace,
utils::stream::{IterStream, ReadyExt, TryWidebandExt, WidebandExt, automatic_width},
utils::stream::{IterStream, ReadyExt, TryWidebandExt, WidebandExt},
};
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join};
use ruma::{OwnedEventId, RoomId, RoomVersionId};
@@ -112,14 +112,7 @@ pub async fn state_resolution<'a, StateSets>(
{
let event_fetch = |event_id| self.event_fetch(event_id);
let event_exists = |event_id| self.event_exists(event_id);
state_res::resolve(
room_version,
state_sets,
auth_chain_sets,
&event_fetch,
&event_exists,
automatic_width(),
)
.map_err(|e| err!(error!("State resolution failed: {e:?}")))
.await
state_res::resolve(room_version, state_sets, auth_chain_sets, &event_fetch, &event_exists)
.map_err(|e| err!(error!("State resolution failed: {e:?}")))
.await
}

View File

@@ -179,18 +179,15 @@ pub async fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> {
.unwrap_or(0))
}
/// Returns a new typing EDU.
pub async fn typings_all(
pub async fn typing_users_for_user(
&self,
room_id: &RoomId,
sender_user: &UserId,
) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
) -> Result<Vec<OwnedUserId>> {
let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
let Some(typing_indicators) = room_typing_indicators else {
return Ok(SyncEphemeralRoomEvent {
content: ruma::events::typing::TypingEventContent { user_ids: Vec::new() },
});
return Ok(Vec::new());
};
let user_ids: Vec<_> = typing_indicators
@@ -207,8 +204,19 @@ pub async fn typings_all(
.collect()
.await;
Ok(user_ids)
}
/// Returns a new typing EDU.
pub async fn typings_event_for_user(
&self,
room_id: &RoomId,
sender_user: &UserId,
) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
Ok(SyncEphemeralRoomEvent {
content: ruma::events::typing::TypingEventContent { user_ids },
content: ruma::events::typing::TypingEventContent {
user_ids: self.typing_users_for_user(room_id, sender_user).await?,
},
})
}

View File

@@ -6,7 +6,7 @@
response::{Html, IntoResponse, Response},
routing::get,
};
use conduwuit_build_metadata::{GIT_REMOTE_COMMIT_URL, GIT_REMOTE_WEB_URL, VERSION_EXTRA};
use conduwuit_build_metadata::{GIT_REMOTE_COMMIT_URL, GIT_REMOTE_WEB_URL, version_tag};
use conduwuit_service::state;
pub fn build() -> Router<state::State> {

View File

@@ -18,7 +18,7 @@
{%~ block footer ~%}
<footer>
<p>Powered by <a href="https://continuwuity.org">Continuwuity</a>
{%~ if let Some(version_info) = VERSION_EXTRA ~%}
{%~ if let Some(version_info) = self::version_tag() ~%}
{%~ if let Some(url) = GIT_REMOTE_COMMIT_URL.or(GIT_REMOTE_WEB_URL) ~%}
(<a href="{{ url }}">{{ version_info }}</a>)
{%~ else ~%}