mirror of
https://forgejo.ellis.link/continuwuation/continuwuity/
synced 2026-04-02 17:25:46 +00:00
Compare commits
22 Commits
jade/css-s
...
nex/fix-cr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6947b75f6e | ||
|
|
51185daca2 | ||
|
|
6e438c8448 | ||
|
|
e6aae8a994 | ||
|
|
cfff12190e | ||
|
|
5ea42418f7 | ||
|
|
3ebac17291 | ||
|
|
b44211c03e | ||
|
|
24cd34ee98 | ||
|
|
eda20ac4f5 | ||
|
|
e8d823a653 | ||
|
|
0ba77674c7 | ||
|
|
2ccbd7d60b | ||
|
|
60960c6e09 | ||
|
|
ce40304667 | ||
|
|
dcbc4b54c5 | ||
|
|
fce024b30b | ||
|
|
3e4e696761 | ||
|
|
f605913ea9 | ||
|
|
44302ce732 | ||
|
|
bfb0a2b76a | ||
|
|
fcd5669aa1 |
@@ -80,6 +80,7 @@ jobs:
|
||||
-D warnings
|
||||
|
||||
- name: Show sccache stats
|
||||
if: always()
|
||||
run: sccache --show-stats
|
||||
|
||||
cargo-test:
|
||||
@@ -137,4 +138,5 @@ jobs:
|
||||
--no-fail-fast
|
||||
|
||||
- name: Show sccache stats
|
||||
if: always()
|
||||
run: sccache --show-stats
|
||||
|
||||
476
Cargo.lock
generated
476
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
63
SECURITY.md
Normal file
63
SECURITY.md
Normal file
@@ -0,0 +1,63 @@
|
||||
# Security Policy for Continuwuity
|
||||
|
||||
This document outlines the security policy for Continuwuity. Our goal is to maintain a secure platform for all users, and we take security matters seriously.
|
||||
|
||||
## Supported Versions
|
||||
|
||||
We provide security updates for the following versions of Continuwuity:
|
||||
|
||||
| Version | Supported |
|
||||
| -------------- |:----------------:|
|
||||
| Latest release | ✅ |
|
||||
| Main branch | ✅ |
|
||||
| Older releases | ❌ |
|
||||
|
||||
We may backport fixes to the previous release at our discretion, but we don't guarantee this.
|
||||
|
||||
## Reporting a Vulnerability
|
||||
|
||||
### Responsible Disclosure
|
||||
|
||||
We appreciate the efforts of security researchers and the community in identifying and reporting vulnerabilities. To ensure that potential vulnerabilities are addressed properly, please follow these guidelines:
|
||||
|
||||
1. Contact members of the team over E2EE private message.
|
||||
- [@jade:ellis.link](https://matrix.to/#/@jade:ellis.link)
|
||||
- [@nex:nexy7574.co.uk](https://matrix.to/#/@nex:nexy7574.co.uk) <!-- ? -->
|
||||
2. **Email the security team** directly at [security@continuwuity.org](mailto:security@continuwuity.org). This is not E2EE, so don't include sensitive details.
|
||||
3. **Do not disclose the vulnerability publicly** until it has been addressed
|
||||
4. **Provide detailed information** about the vulnerability, including:
|
||||
- A clear description of the issue
|
||||
- Steps to reproduce
|
||||
- Potential impact
|
||||
- Any possible mitigations
|
||||
- Version(s) affected, including specific commits if possible
|
||||
|
||||
If you have any doubts about a potential security vulnerability, contact us via private channels first! We'd prefer that you bother us, instead of having a vulnerability disclosed without a fix.
|
||||
|
||||
### What to Expect
|
||||
|
||||
When you report a security vulnerability:
|
||||
|
||||
1. **Acknowledgment**: We will acknowledge receipt of your report.
|
||||
2. **Assessment**: We will assess the vulnerability and determine its impact on our users
|
||||
3. **Updates**: We will provide updates on our progress in addressing the vulnerability, and may request you help test mitigations
|
||||
4. **Resolution**: Once resolved, we will notify you and discuss coordinated disclosure
|
||||
5. **Credit**: We will recognize your contribution (unless you prefer to remain anonymous)
|
||||
|
||||
## Security Update Process
|
||||
|
||||
When security vulnerabilities are identified:
|
||||
|
||||
1. We will develop and test fixes in a private branch
|
||||
2. Security updates will be released as soon as possible
|
||||
3. Release notes will include information about the vulnerabilities, avoiding details that could facilitate exploitation where possible
|
||||
4. Critical security updates may be backported to the previous stable release
|
||||
|
||||
## Additional Resources
|
||||
|
||||
- [Matrix Security Disclosure Policy](https://matrix.org/security-disclosure-policy/)
|
||||
- [Continuwuity Documentation](https://continuwuity.org/introduction)
|
||||
|
||||
---
|
||||
|
||||
This security policy was last updated on May 25, 2025.
|
||||
@@ -1641,19 +1641,29 @@
|
||||
#
|
||||
#server =
|
||||
|
||||
# This item is undocumented. Please contribute documentation for it.
|
||||
# URL to a support page for the server, which will be served as part of
|
||||
# the MSC1929 server support endpoint at /.well-known/matrix/support.
|
||||
# Will be included alongside any contact information
|
||||
#
|
||||
#support_page =
|
||||
|
||||
# This item is undocumented. Please contribute documentation for it.
|
||||
# Role string for server support contacts, to be served as part of the
|
||||
# MSC1929 server support endpoint at /.well-known/matrix/support.
|
||||
#
|
||||
#support_role =
|
||||
#support_role = "m.role.admin"
|
||||
|
||||
# This item is undocumented. Please contribute documentation for it.
|
||||
# Email address for server support contacts, to be served as part of the
|
||||
# MSC1929 server support endpoint.
|
||||
# This will be used along with support_mxid if specified.
|
||||
#
|
||||
#support_email =
|
||||
|
||||
# This item is undocumented. Please contribute documentation for it.
|
||||
# Matrix ID for server support contacts, to be served as part of the
|
||||
# MSC1929 server support endpoint.
|
||||
# This will be used along with support_email if specified.
|
||||
#
|
||||
# If no email or mxid is specified, all of the server's admins will be
|
||||
# listed.
|
||||
#
|
||||
#support_mxid =
|
||||
|
||||
|
||||
@@ -20,3 +20,4 @@ # Summary
|
||||
- [Testing](development/testing.md)
|
||||
- [Hot Reloading ("Live" Development)](development/hot_reload.md)
|
||||
- [Community (and Guidelines)](community.md)
|
||||
- [Security](security.md)
|
||||
|
||||
1
docs/security.md
Normal file
1
docs/security.md
Normal file
@@ -0,0 +1 @@
|
||||
{{#include ../SECURITY.md}}
|
||||
@@ -1243,6 +1243,7 @@ async fn join_room_by_id_helper_remote(
|
||||
services.rooms.timeline.get_pdu(event_id).await.ok()
|
||||
};
|
||||
|
||||
debug!("running stateres check on send_join parsed PDU");
|
||||
let auth_check = state_res::event_auth::auth_check(
|
||||
&state_res::RoomVersion::new(&room_version_id)?,
|
||||
&parsed_join_pdu,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use axum::{Json, extract::State, response::IntoResponse};
|
||||
use conduwuit::{Error, Result};
|
||||
use futures::StreamExt;
|
||||
use ruma::api::client::{
|
||||
discovery::{
|
||||
discover_homeserver::{self, HomeserverInfo, SlidingSyncProxyInfo},
|
||||
@@ -17,7 +18,7 @@ pub(crate) async fn well_known_client(
|
||||
State(services): State<crate::State>,
|
||||
_body: Ruma<discover_homeserver::Request>,
|
||||
) -> Result<discover_homeserver::Response> {
|
||||
let client_url = match services.server.config.well_known.client.as_ref() {
|
||||
let client_url = match services.config.well_known.client.as_ref() {
|
||||
| Some(url) => url.to_string(),
|
||||
| None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
|
||||
};
|
||||
@@ -33,44 +34,63 @@ pub(crate) async fn well_known_client(
|
||||
/// # `GET /.well-known/matrix/support`
|
||||
///
|
||||
/// Server support contact and support page of a homeserver's domain.
|
||||
/// Implements MSC1929 for server discovery.
|
||||
/// If no configuration is set, uses admin users as contacts.
|
||||
pub(crate) async fn well_known_support(
|
||||
State(services): State<crate::State>,
|
||||
_body: Ruma<discover_support::Request>,
|
||||
) -> Result<discover_support::Response> {
|
||||
let support_page = services
|
||||
.server
|
||||
.config
|
||||
.well_known
|
||||
.support_page
|
||||
.as_ref()
|
||||
.map(ToString::to_string);
|
||||
|
||||
let role = services.server.config.well_known.support_role.clone();
|
||||
|
||||
// support page or role must be either defined for this to be valid
|
||||
if support_page.is_none() && role.is_none() {
|
||||
return Err(Error::BadRequest(ErrorKind::NotFound, "Not found."));
|
||||
}
|
||||
|
||||
let email_address = services.server.config.well_known.support_email.clone();
|
||||
let matrix_id = services.server.config.well_known.support_mxid.clone();
|
||||
|
||||
// if a role is specified, an email address or matrix id is required
|
||||
if role.is_some() && (email_address.is_none() && matrix_id.is_none()) {
|
||||
return Err(Error::BadRequest(ErrorKind::NotFound, "Not found."));
|
||||
}
|
||||
let email_address = services.config.well_known.support_email.clone();
|
||||
let matrix_id = services.config.well_known.support_mxid.clone();
|
||||
|
||||
// TODO: support defining multiple contacts in the config
|
||||
let mut contacts: Vec<Contact> = vec![];
|
||||
|
||||
if let Some(role) = role {
|
||||
let contact = Contact { role, email_address, matrix_id };
|
||||
let role_value = services
|
||||
.config
|
||||
.well_known
|
||||
.support_role
|
||||
.clone()
|
||||
.unwrap_or_else(|| "m.role.admin".to_owned().into());
|
||||
|
||||
contacts.push(contact);
|
||||
// Add configured contact if at least one contact method is specified
|
||||
if email_address.is_some() || matrix_id.is_some() {
|
||||
contacts.push(Contact {
|
||||
role: role_value.clone(),
|
||||
email_address: email_address.clone(),
|
||||
matrix_id: matrix_id.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
// Try to add admin users as contacts if no contacts are configured
|
||||
if contacts.is_empty() {
|
||||
if let Ok(admin_room) = services.admin.get_admin_room().await {
|
||||
let admin_users = services.rooms.state_cache.room_members(&admin_room);
|
||||
let mut stream = admin_users;
|
||||
|
||||
while let Some(user_id) = stream.next().await {
|
||||
// Skip server user
|
||||
if *user_id == services.globals.server_user {
|
||||
break;
|
||||
}
|
||||
contacts.push(Contact {
|
||||
role: role_value.clone(),
|
||||
email_address: None,
|
||||
matrix_id: Some(user_id.to_owned()),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// support page or role+contacts must be either defined for this to be valid
|
||||
if contacts.is_empty() && support_page.is_none() {
|
||||
// No admin room, no configured contacts, and no support page
|
||||
return Err(Error::BadRequest(ErrorKind::NotFound, "Not found."));
|
||||
}
|
||||
|
||||
@@ -84,9 +104,9 @@ pub(crate) async fn well_known_support(
|
||||
pub(crate) async fn syncv3_client_server_json(
|
||||
State(services): State<crate::State>,
|
||||
) -> Result<impl IntoResponse> {
|
||||
let server_url = match services.server.config.well_known.client.as_ref() {
|
||||
let server_url = match services.config.well_known.client.as_ref() {
|
||||
| Some(url) => url.to_string(),
|
||||
| None => match services.server.config.well_known.server.as_ref() {
|
||||
| None => match services.config.well_known.server.as_ref() {
|
||||
| Some(url) => url.to_string(),
|
||||
| None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
|
||||
},
|
||||
|
||||
@@ -13,13 +13,13 @@ version.workspace = true
|
||||
build = "build.rs"
|
||||
# [[bin]]
|
||||
# path = "main.rs"
|
||||
# name = "conduwuit_build_metadata"
|
||||
# name = "conduwuit_build_metadata"
|
||||
|
||||
[lib]
|
||||
path = "mod.rs"
|
||||
crate-type = [
|
||||
"rlib",
|
||||
# "dylib",
|
||||
"rlib",
|
||||
# "dylib",
|
||||
]
|
||||
|
||||
[features]
|
||||
@@ -28,7 +28,7 @@ crate-type = [
|
||||
[dependencies]
|
||||
|
||||
[build-dependencies]
|
||||
built = {version = "0.7", features = ["cargo-lock", "dependency-tree"]}
|
||||
built = { version = "0.8", features = [] }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
@@ -78,6 +78,7 @@ fn main() {
|
||||
}
|
||||
|
||||
// --- Rerun Triggers ---
|
||||
// TODO: The git rerun triggers seem to always run
|
||||
// Rerun if the git HEAD changes
|
||||
println!("cargo:rerun-if-changed=.git/HEAD");
|
||||
// Rerun if the ref pointed to by HEAD changes (e.g., new commit on branch)
|
||||
|
||||
@@ -12,11 +12,17 @@ pub mod built {
|
||||
v
|
||||
} else if let v @ Some(_) = option_env!("CONDUWUIT_VERSION_EXTRA") {
|
||||
v
|
||||
} else if let v @ Some(_) = option_env!("CONDUIT_VERSION_EXTRA") {
|
||||
v
|
||||
} else {
|
||||
GIT_COMMIT_HASH_SHORT
|
||||
option_env!("CONDUIT_VERSION_EXTRA")
|
||||
};
|
||||
|
||||
#[must_use]
|
||||
pub fn version_tag() -> Option<&'static str> {
|
||||
VERSION_EXTRA
|
||||
.filter(|s| !s.is_empty())
|
||||
.or(GIT_COMMIT_HASH_SHORT)
|
||||
}
|
||||
|
||||
pub static GIT_REMOTE_WEB_URL: Option<&str> = option_env!("GIT_REMOTE_WEB_URL");
|
||||
pub static GIT_REMOTE_COMMIT_URL: Option<&str> = option_env!("GIT_REMOTE_COMMIT_URL");
|
||||
|
||||
|
||||
@@ -274,6 +274,10 @@ pub fn set_dirty_decay<I: Into<Option<usize>>>(arena: I, decay_ms: isize) -> Res
|
||||
}
|
||||
}
|
||||
|
||||
pub fn background_thread_enable(enable: bool) -> Result<bool> {
|
||||
set::<u8>(&mallctl!("background_thread"), enable.into()).map(is_nonzero!())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
pub fn is_affine_arena() -> bool { is_percpu_arena() || is_phycpu_arena() }
|
||||
|
||||
@@ -1897,12 +1897,28 @@ pub struct WellKnownConfig {
|
||||
/// example: "matrix.example.com:443"
|
||||
pub server: Option<OwnedServerName>,
|
||||
|
||||
/// URL to a support page for the server, which will be served as part of
|
||||
/// the MSC1929 server support endpoint at /.well-known/matrix/support.
|
||||
/// Will be included alongside any contact information
|
||||
pub support_page: Option<Url>,
|
||||
|
||||
/// Role string for server support contacts, to be served as part of the
|
||||
/// MSC1929 server support endpoint at /.well-known/matrix/support.
|
||||
///
|
||||
/// default: "m.role.admin"
|
||||
pub support_role: Option<ContactRole>,
|
||||
|
||||
/// Email address for server support contacts, to be served as part of the
|
||||
/// MSC1929 server support endpoint.
|
||||
/// This will be used along with support_mxid if specified.
|
||||
pub support_email: Option<String>,
|
||||
|
||||
/// Matrix ID for server support contacts, to be served as part of the
|
||||
/// MSC1929 server support endpoint.
|
||||
/// This will be used along with support_email if specified.
|
||||
///
|
||||
/// If no email or mxid is specified, all of the server's admins will be
|
||||
/// listed.
|
||||
pub support_mxid: Option<OwnedUserId>,
|
||||
}
|
||||
|
||||
|
||||
@@ -26,6 +26,6 @@ pub fn user_agent() -> &'static str { USER_AGENT.get_or_init(init_user_agent) }
|
||||
fn init_user_agent() -> String { format!("{}/{}", name(), version()) }
|
||||
|
||||
fn init_version() -> String {
|
||||
conduwuit_build_metadata::VERSION_EXTRA
|
||||
conduwuit_build_metadata::version_tag()
|
||||
.map_or(SEMANTIC.to_owned(), |extra| format!("{SEMANTIC} ({extra})"))
|
||||
}
|
||||
|
||||
@@ -1,18 +1,10 @@
|
||||
use std::{
|
||||
borrow::Borrow,
|
||||
fmt::{Debug, Display},
|
||||
hash::Hash,
|
||||
};
|
||||
|
||||
use ruma::{EventId, MilliSecondsSinceUnixEpoch, RoomId, UserId, events::TimelineEventType};
|
||||
use serde_json::value::RawValue as RawJsonValue;
|
||||
|
||||
/// Abstraction of a PDU so users can have their own PDU types.
|
||||
pub trait Event {
|
||||
type Id: Clone + Debug + Display + Eq + Ord + Hash + Send + Borrow<EventId>;
|
||||
|
||||
/// The `EventId` of this event.
|
||||
fn event_id(&self) -> &Self::Id;
|
||||
fn event_id(&self) -> &EventId;
|
||||
|
||||
/// The `RoomId` of this event.
|
||||
fn room_id(&self) -> &RoomId;
|
||||
@@ -34,20 +26,18 @@ pub trait Event {
|
||||
|
||||
/// The events before this event.
|
||||
// Requires GATs to avoid boxing (and TAIT for making it convenient).
|
||||
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_;
|
||||
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_;
|
||||
|
||||
/// All the authenticating events for this event.
|
||||
// Requires GATs to avoid boxing (and TAIT for making it convenient).
|
||||
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_;
|
||||
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_;
|
||||
|
||||
/// If this event is a redaction event this is the event it redacts.
|
||||
fn redacts(&self) -> Option<&Self::Id>;
|
||||
fn redacts(&self) -> Option<&EventId>;
|
||||
}
|
||||
|
||||
impl<T: Event> Event for &T {
|
||||
type Id = T::Id;
|
||||
|
||||
fn event_id(&self) -> &Self::Id { (*self).event_id() }
|
||||
fn event_id(&self) -> &EventId { (*self).event_id() }
|
||||
|
||||
fn room_id(&self) -> &RoomId { (*self).room_id() }
|
||||
|
||||
@@ -61,13 +51,13 @@ fn content(&self) -> &RawJsonValue { (*self).content() }
|
||||
|
||||
fn state_key(&self) -> Option<&str> { (*self).state_key() }
|
||||
|
||||
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ {
|
||||
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
|
||||
(*self).prev_events()
|
||||
}
|
||||
|
||||
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ {
|
||||
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
|
||||
(*self).auth_events()
|
||||
}
|
||||
|
||||
fn redacts(&self) -> Option<&Self::Id> { (*self).redacts() }
|
||||
fn redacts(&self) -> Option<&EventId> { (*self).redacts() }
|
||||
}
|
||||
|
||||
@@ -79,9 +79,7 @@ pub fn from_id_val(event_id: &EventId, mut json: CanonicalJsonObject) -> Result<
|
||||
}
|
||||
|
||||
impl Event for Pdu {
|
||||
type Id = OwnedEventId;
|
||||
|
||||
fn event_id(&self) -> &Self::Id { &self.event_id }
|
||||
fn event_id(&self) -> &EventId { &self.event_id }
|
||||
|
||||
fn room_id(&self) -> &RoomId { &self.room_id }
|
||||
|
||||
@@ -97,15 +95,15 @@ fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch {
|
||||
|
||||
fn state_key(&self) -> Option<&str> { self.state_key.as_deref() }
|
||||
|
||||
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ {
|
||||
self.prev_events.iter()
|
||||
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
|
||||
self.prev_events.iter().map(AsRef::as_ref)
|
||||
}
|
||||
|
||||
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ {
|
||||
self.auth_events.iter()
|
||||
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
|
||||
self.auth_events.iter().map(AsRef::as_ref)
|
||||
}
|
||||
|
||||
fn redacts(&self) -> Option<&Self::Id> { self.redacts.as_ref() }
|
||||
fn redacts(&self) -> Option<&EventId> { self.redacts.as_deref() }
|
||||
}
|
||||
|
||||
/// Prevent derived equality which wouldn't limit itself to event_id
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use ruma::{
|
||||
events::{
|
||||
AnyEphemeralRoomEvent, AnyMessageLikeEvent, AnyStateEvent, AnyStrippedStateEvent,
|
||||
AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, StateEvent,
|
||||
room::member::RoomMemberEventContent, space::child::HierarchySpaceChildEvent,
|
||||
AnyMessageLikeEvent, AnyStateEvent, AnyStrippedStateEvent, AnySyncStateEvent,
|
||||
AnySyncTimelineEvent, AnyTimelineEvent, StateEvent, room::member::RoomMemberEventContent,
|
||||
space::child::HierarchySpaceChildEvent,
|
||||
},
|
||||
serde::Raw,
|
||||
};
|
||||
@@ -10,41 +10,6 @@
|
||||
|
||||
use crate::implement;
|
||||
|
||||
/// This only works for events that are also AnyRoomEvents.
|
||||
#[must_use]
|
||||
#[implement(super::Pdu)]
|
||||
pub fn into_any_event(self) -> Raw<AnyEphemeralRoomEvent> {
|
||||
serde_json::from_value(self.into_any_event_value()).expect("Raw::from_value always works")
|
||||
}
|
||||
|
||||
/// This only works for events that are also AnyRoomEvents.
|
||||
#[implement(super::Pdu)]
|
||||
#[must_use]
|
||||
#[inline]
|
||||
pub fn into_any_event_value(self) -> JsonValue {
|
||||
let (redacts, content) = self.copy_redacts();
|
||||
let mut json = json!({
|
||||
"content": content,
|
||||
"type": self.kind,
|
||||
"event_id": self.event_id,
|
||||
"sender": self.sender,
|
||||
"origin_server_ts": self.origin_server_ts,
|
||||
"room_id": self.room_id,
|
||||
});
|
||||
|
||||
if let Some(unsigned) = &self.unsigned {
|
||||
json["unsigned"] = json!(unsigned);
|
||||
}
|
||||
if let Some(state_key) = &self.state_key {
|
||||
json["state_key"] = json!(state_key);
|
||||
}
|
||||
if let Some(redacts) = &redacts {
|
||||
json["redacts"] = json!(redacts);
|
||||
}
|
||||
|
||||
json
|
||||
}
|
||||
|
||||
#[implement(super::Pdu)]
|
||||
#[must_use]
|
||||
#[inline]
|
||||
@@ -53,7 +18,8 @@ pub fn into_room_event(self) -> Raw<AnyTimelineEvent> { self.to_room_event() }
|
||||
#[implement(super::Pdu)]
|
||||
#[must_use]
|
||||
pub fn to_room_event(&self) -> Raw<AnyTimelineEvent> {
|
||||
serde_json::from_value(self.to_room_event_value()).expect("Raw::from_value always works")
|
||||
let value = self.to_room_event_value();
|
||||
serde_json::from_value(value).expect("Failed to serialize Event value")
|
||||
}
|
||||
|
||||
#[implement(super::Pdu)]
|
||||
@@ -91,8 +57,8 @@ pub fn into_message_like_event(self) -> Raw<AnyMessageLikeEvent> { self.to_messa
|
||||
#[implement(super::Pdu)]
|
||||
#[must_use]
|
||||
pub fn to_message_like_event(&self) -> Raw<AnyMessageLikeEvent> {
|
||||
serde_json::from_value(self.to_message_like_event_value())
|
||||
.expect("Raw::from_value always works")
|
||||
let value = self.to_message_like_event_value();
|
||||
serde_json::from_value(value).expect("Failed to serialize Event value")
|
||||
}
|
||||
|
||||
#[implement(super::Pdu)]
|
||||
@@ -130,7 +96,8 @@ pub fn into_sync_room_event(self) -> Raw<AnySyncTimelineEvent> { self.to_sync_ro
|
||||
#[implement(super::Pdu)]
|
||||
#[must_use]
|
||||
pub fn to_sync_room_event(&self) -> Raw<AnySyncTimelineEvent> {
|
||||
serde_json::from_value(self.to_sync_room_event_value()).expect("Raw::from_value always works")
|
||||
let value = self.to_sync_room_event_value();
|
||||
serde_json::from_value(value).expect("Failed to serialize Event value")
|
||||
}
|
||||
|
||||
#[implement(super::Pdu)]
|
||||
@@ -162,7 +129,8 @@ pub fn to_sync_room_event_value(&self) -> JsonValue {
|
||||
#[implement(super::Pdu)]
|
||||
#[must_use]
|
||||
pub fn into_state_event(self) -> Raw<AnyStateEvent> {
|
||||
serde_json::from_value(self.into_state_event_value()).expect("Raw::from_value always works")
|
||||
let value = self.into_state_event_value();
|
||||
serde_json::from_value(value).expect("Failed to serialize Event value")
|
||||
}
|
||||
|
||||
#[implement(super::Pdu)]
|
||||
@@ -189,8 +157,8 @@ pub fn into_state_event_value(self) -> JsonValue {
|
||||
#[implement(super::Pdu)]
|
||||
#[must_use]
|
||||
pub fn into_sync_state_event(self) -> Raw<AnySyncStateEvent> {
|
||||
serde_json::from_value(self.into_sync_state_event_value())
|
||||
.expect("Raw::from_value always works")
|
||||
let value = self.into_sync_state_event_value();
|
||||
serde_json::from_value(value).expect("Failed to serialize Event value")
|
||||
}
|
||||
|
||||
#[implement(super::Pdu)]
|
||||
@@ -223,8 +191,8 @@ pub fn into_stripped_state_event(self) -> Raw<AnyStrippedStateEvent> {
|
||||
#[implement(super::Pdu)]
|
||||
#[must_use]
|
||||
pub fn to_stripped_state_event(&self) -> Raw<AnyStrippedStateEvent> {
|
||||
serde_json::from_value(self.to_stripped_state_event_value())
|
||||
.expect("Raw::from_value always works")
|
||||
let value = self.to_stripped_state_event_value();
|
||||
serde_json::from_value(value).expect("Failed to serialize Event value")
|
||||
}
|
||||
|
||||
#[implement(super::Pdu)]
|
||||
@@ -242,8 +210,8 @@ pub fn to_stripped_state_event_value(&self) -> JsonValue {
|
||||
#[implement(super::Pdu)]
|
||||
#[must_use]
|
||||
pub fn into_stripped_spacechild_state_event(self) -> Raw<HierarchySpaceChildEvent> {
|
||||
serde_json::from_value(self.into_stripped_spacechild_state_event_value())
|
||||
.expect("Raw::from_value always works")
|
||||
let value = self.into_stripped_spacechild_state_event_value();
|
||||
serde_json::from_value(value).expect("Failed to serialize Event value")
|
||||
}
|
||||
|
||||
#[implement(super::Pdu)]
|
||||
@@ -262,7 +230,8 @@ pub fn into_stripped_spacechild_state_event_value(self) -> JsonValue {
|
||||
#[implement(super::Pdu)]
|
||||
#[must_use]
|
||||
pub fn into_member_event(self) -> Raw<StateEvent<RoomMemberEventContent>> {
|
||||
serde_json::from_value(self.into_member_event_value()).expect("Raw::from_value always works")
|
||||
let value = self.into_member_event_value();
|
||||
serde_json::from_value(value).expect("Failed to serialize Event value")
|
||||
}
|
||||
|
||||
#[implement(super::Pdu)]
|
||||
|
||||
@@ -52,7 +52,6 @@ fn lexico_topo_sort(c: &mut test::Bencher) {
|
||||
#[cfg(conduwuit_bench)]
|
||||
#[cfg_attr(conduwuit_bench, bench)]
|
||||
fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
|
||||
let parallel_fetches = 32;
|
||||
let mut store = TestStore(hashmap! {});
|
||||
|
||||
// build up the DAG
|
||||
@@ -78,7 +77,6 @@ fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
|
||||
&auth_chain_sets,
|
||||
&fetch,
|
||||
&exists,
|
||||
parallel_fetches,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -91,7 +89,6 @@ fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
|
||||
#[cfg(conduwuit_bench)]
|
||||
#[cfg_attr(conduwuit_bench, bench)]
|
||||
fn resolve_deeper_event_set(c: &mut test::Bencher) {
|
||||
let parallel_fetches = 32;
|
||||
let mut inner = INITIAL_EVENTS();
|
||||
let ban = BAN_STATE_SET();
|
||||
|
||||
@@ -153,7 +150,6 @@ fn resolve_deeper_event_set(c: &mut test::Bencher) {
|
||||
&auth_chain_sets,
|
||||
&fetch,
|
||||
&exists,
|
||||
parallel_fetches,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -190,7 +186,11 @@ fn get_events(&self, room_id: &RoomId, event_ids: &[OwnedEventId]) -> Result<Vec
|
||||
}
|
||||
|
||||
/// Returns a Vec of the related auth events to the given `event`.
|
||||
fn auth_event_ids(&self, room_id: &RoomId, event_ids: Vec<E::Id>) -> Result<HashSet<E::Id>> {
|
||||
fn auth_event_ids(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
event_ids: Vec<OwnedEventId>,
|
||||
) -> Result<HashSet<OwnedEventId>> {
|
||||
let mut result = HashSet::new();
|
||||
let mut stack = event_ids;
|
||||
|
||||
@@ -216,8 +216,8 @@ fn auth_event_ids(&self, room_id: &RoomId, event_ids: Vec<E::Id>) -> Result<Hash
|
||||
fn auth_chain_diff(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
event_ids: Vec<Vec<E::Id>>,
|
||||
) -> Result<Vec<E::Id>> {
|
||||
event_ids: Vec<Vec<OwnedEventId>>,
|
||||
) -> Result<Vec<OwnedEventId>> {
|
||||
let mut auth_chain_sets = vec![];
|
||||
for ids in event_ids {
|
||||
// TODO state store `auth_event_ids` returns self in the event ids list
|
||||
@@ -238,7 +238,7 @@ fn auth_chain_diff(
|
||||
Ok(auth_chain_sets
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.filter(|id| !common.contains(id.borrow()))
|
||||
.filter(|id| !common.contains(id))
|
||||
.collect())
|
||||
} else {
|
||||
Ok(vec![])
|
||||
@@ -565,7 +565,7 @@ fn with_state_key(self, state_key: impl Into<String>) -> (StateEventType, String
|
||||
|
||||
mod event {
|
||||
use ruma::{
|
||||
MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
|
||||
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
|
||||
events::{TimelineEventType, pdu::Pdu},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -574,9 +574,7 @@ mod event {
|
||||
use super::Event;
|
||||
|
||||
impl Event for PduEvent {
|
||||
type Id = OwnedEventId;
|
||||
|
||||
fn event_id(&self) -> &Self::Id { &self.event_id }
|
||||
fn event_id(&self) -> &EventId { &self.event_id }
|
||||
|
||||
fn room_id(&self) -> &RoomId {
|
||||
match &self.rest {
|
||||
@@ -632,28 +630,30 @@ fn state_key(&self) -> Option<&str> {
|
||||
}
|
||||
}
|
||||
|
||||
fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> {
|
||||
fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> {
|
||||
match &self.rest {
|
||||
| Pdu::RoomV1Pdu(ev) => Box::new(ev.prev_events.iter().map(|(id, _)| id)),
|
||||
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter()),
|
||||
| Pdu::RoomV1Pdu(ev) =>
|
||||
Box::new(ev.prev_events.iter().map(|(id, _)| id.as_ref())),
|
||||
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter().map(AsRef::as_ref)),
|
||||
#[cfg(not(feature = "unstable-exhaustive-types"))]
|
||||
| _ => unreachable!("new PDU version"),
|
||||
}
|
||||
}
|
||||
|
||||
fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> {
|
||||
fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> {
|
||||
match &self.rest {
|
||||
| Pdu::RoomV1Pdu(ev) => Box::new(ev.auth_events.iter().map(|(id, _)| id)),
|
||||
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter()),
|
||||
| Pdu::RoomV1Pdu(ev) =>
|
||||
Box::new(ev.auth_events.iter().map(|(id, _)| id.as_ref())),
|
||||
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter().map(AsRef::as_ref)),
|
||||
#[cfg(not(feature = "unstable-exhaustive-types"))]
|
||||
| _ => unreachable!("new PDU version"),
|
||||
}
|
||||
}
|
||||
|
||||
fn redacts(&self) -> Option<&Self::Id> {
|
||||
fn redacts(&self) -> Option<&EventId> {
|
||||
match &self.rest {
|
||||
| Pdu::RoomV1Pdu(ev) => ev.redacts.as_ref(),
|
||||
| Pdu::RoomV3Pdu(ev) => ev.redacts.as_ref(),
|
||||
| Pdu::RoomV1Pdu(ev) => ev.redacts.as_deref(),
|
||||
| Pdu::RoomV3Pdu(ev) => ev.redacts.as_deref(),
|
||||
#[cfg(not(feature = "unstable-exhaustive-types"))]
|
||||
| _ => unreachable!("new PDU version"),
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
power_levels::RoomPowerLevelsEventContent,
|
||||
third_party_invite::RoomThirdPartyInviteEventContent,
|
||||
},
|
||||
EventId,
|
||||
int,
|
||||
serde::{Base64, Raw},
|
||||
};
|
||||
@@ -21,7 +22,6 @@
|
||||
de::{Error as _, IgnoredAny},
|
||||
};
|
||||
use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue};
|
||||
|
||||
use super::{
|
||||
Error, Event, Result, StateEventType, StateKey, TimelineEventType,
|
||||
power_levels::{
|
||||
@@ -133,7 +133,7 @@ struct RoomMemberContentFields {
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(
|
||||
event_id = incoming_event.event_id().borrow().as_str()
|
||||
event_id = incoming_event.event_id().as_str(),
|
||||
)
|
||||
)]
|
||||
pub async fn auth_check<F, Fut, Fetched, Incoming>(
|
||||
@@ -217,8 +217,9 @@ struct RoomCreateContentFields {
|
||||
}
|
||||
|
||||
/*
|
||||
// TODO: In the past this code caused problems federating with synapse, maybe this has been
|
||||
// resolved already. Needs testing.
|
||||
// TODO: In the past this code was commented as it caused problems with Synapse. This is no
|
||||
// longer the case. This needs to be implemented.
|
||||
// See also: https://github.com/ruma/ruma/pull/2064
|
||||
//
|
||||
// 2. Reject if auth_events
|
||||
// a. auth_events cannot have duplicate keys since it's a BTree
|
||||
@@ -250,16 +251,38 @@ struct RoomCreateContentFields {
|
||||
|
||||
let room_create_event = match room_create_event {
|
||||
| None => {
|
||||
warn!("no m.room.create event in auth chain");
|
||||
error!(
|
||||
create_event = room_create_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
|
||||
power_levels = power_levels_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
|
||||
member_event = sender_member_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
|
||||
"no m.room.create event found for {} ({})!",
|
||||
incoming_event.event_id().as_str(),
|
||||
incoming_event.room_id().as_str()
|
||||
);
|
||||
return Ok(false);
|
||||
},
|
||||
| Some(e) => e,
|
||||
};
|
||||
// just re-check 1.2 to work around a bug
|
||||
let Some(room_id_server_name) = incoming_event.room_id().server_name() else {
|
||||
warn!("room ID has no servername");
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
if room_id_server_name != room_create_event.sender().server_name() {
|
||||
warn!(
|
||||
"servername of room ID origin ({}) does not match servername of m.room.create \
|
||||
sender ({})",
|
||||
room_id_server_name,
|
||||
room_create_event.sender().server_name()
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// 3. If event does not have m.room.create in auth_events reject
|
||||
if !incoming_event
|
||||
.auth_events()
|
||||
.any(|id| id.borrow() == room_create_event.event_id().borrow())
|
||||
.any(|id| id == room_create_event.event_id())
|
||||
{
|
||||
warn!("no m.room.create event in auth events");
|
||||
return Ok(false);
|
||||
@@ -1021,11 +1044,11 @@ fn check_redaction(
|
||||
|
||||
// If the domain of the event_id of the event being redacted is the same as the
|
||||
// domain of the event_id of the m.room.redaction, allow
|
||||
if redaction_event.event_id().borrow().server_name()
|
||||
if redaction_event.event_id().server_name()
|
||||
== redaction_event
|
||||
.redacts()
|
||||
.as_ref()
|
||||
.and_then(|&id| id.borrow().server_name())
|
||||
.and_then(|&id| id.server_name())
|
||||
{
|
||||
debug!("redaction event allowed via room version 1 rules");
|
||||
return Ok(true);
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
|
||||
use futures::{Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
|
||||
use ruma::{
|
||||
EventId, Int, MilliSecondsSinceUnixEpoch, RoomVersionId,
|
||||
EventId, Int, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomVersionId,
|
||||
events::{
|
||||
StateEventType, TimelineEventType,
|
||||
room::member::{MembershipState, RoomMemberEventContent},
|
||||
@@ -39,9 +39,7 @@
|
||||
debug, debug_error,
|
||||
matrix::{event::Event, pdu::StateKey},
|
||||
trace,
|
||||
utils::stream::{
|
||||
BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryReadyExt, WidebandExt,
|
||||
},
|
||||
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, WidebandExt},
|
||||
warn,
|
||||
};
|
||||
|
||||
@@ -69,9 +67,6 @@
|
||||
/// * `event_fetch` - Any event not found in the `event_map` will defer to this
|
||||
/// closure to find the event.
|
||||
///
|
||||
/// * `parallel_fetches` - The number of asynchronous fetch requests in-flight
|
||||
/// for any given operation.
|
||||
///
|
||||
/// ## Invariants
|
||||
///
|
||||
/// The caller of `resolve` must ensure that all the events are from the same
|
||||
@@ -82,21 +77,19 @@
|
||||
pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>(
|
||||
room_version: &RoomVersionId,
|
||||
state_sets: Sets,
|
||||
auth_chain_sets: &'a [HashSet<E::Id, Hasher>],
|
||||
auth_chain_sets: &'a [HashSet<OwnedEventId, Hasher>],
|
||||
event_fetch: &Fetch,
|
||||
event_exists: &Exists,
|
||||
parallel_fetches: usize,
|
||||
) -> Result<StateMap<E::Id>>
|
||||
) -> Result<StateMap<OwnedEventId>>
|
||||
where
|
||||
Fetch: Fn(E::Id) -> FetchFut + Sync,
|
||||
Fetch: Fn(OwnedEventId) -> FetchFut + Sync,
|
||||
FetchFut: Future<Output = Option<E>> + Send,
|
||||
Exists: Fn(E::Id) -> ExistsFut + Sync,
|
||||
Exists: Fn(OwnedEventId) -> ExistsFut + Sync,
|
||||
ExistsFut: Future<Output = bool> + Send,
|
||||
Sets: IntoIterator<IntoIter = SetIter> + Send,
|
||||
SetIter: Iterator<Item = &'a StateMap<E::Id>> + Clone + Send,
|
||||
SetIter: Iterator<Item = &'a StateMap<OwnedEventId>> + Clone + Send,
|
||||
Hasher: BuildHasher + Send + Sync,
|
||||
E: Event + Clone + Send + Sync,
|
||||
E::Id: Borrow<EventId> + Send + Sync,
|
||||
for<'b> &'b E: Send,
|
||||
{
|
||||
debug!("State resolution starting");
|
||||
@@ -147,13 +140,8 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis
|
||||
|
||||
// Sort the control events based on power_level/clock/event_id and
|
||||
// outgoing/incoming edges
|
||||
let sorted_control_levels = reverse_topological_power_sort(
|
||||
control_events,
|
||||
&all_conflicted,
|
||||
&event_fetch,
|
||||
parallel_fetches,
|
||||
)
|
||||
.await?;
|
||||
let sorted_control_levels =
|
||||
reverse_topological_power_sort(control_events, &all_conflicted, &event_fetch).await?;
|
||||
|
||||
debug!(count = sorted_control_levels.len(), "power events");
|
||||
trace!(list = ?sorted_control_levels, "sorted power events");
|
||||
@@ -162,7 +150,7 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis
|
||||
// Sequentially auth check each control event.
|
||||
let resolved_control = iterative_auth_check(
|
||||
&room_version,
|
||||
sorted_control_levels.iter().stream(),
|
||||
sorted_control_levels.iter().stream().map(AsRef::as_ref),
|
||||
clean.clone(),
|
||||
&event_fetch,
|
||||
)
|
||||
@@ -179,7 +167,7 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis
|
||||
// that failed auth
|
||||
let events_to_resolve: Vec<_> = all_conflicted
|
||||
.iter()
|
||||
.filter(|&id| !deduped_power_ev.contains(id.borrow()))
|
||||
.filter(|&id| !deduped_power_ev.contains(id))
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
@@ -199,7 +187,7 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis
|
||||
|
||||
let mut resolved_state = iterative_auth_check(
|
||||
&room_version,
|
||||
sorted_left_events.iter().stream(),
|
||||
sorted_left_events.iter().stream().map(AsRef::as_ref),
|
||||
resolved_control, // The control events are added to the final resolved state
|
||||
&event_fetch,
|
||||
)
|
||||
@@ -292,16 +280,14 @@ fn get_auth_chain_diff<Id, Hasher>(
|
||||
/// earlier (further back in time) origin server timestamp.
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn reverse_topological_power_sort<E, F, Fut>(
|
||||
events_to_sort: Vec<E::Id>,
|
||||
auth_diff: &HashSet<E::Id>,
|
||||
events_to_sort: Vec<OwnedEventId>,
|
||||
auth_diff: &HashSet<OwnedEventId>,
|
||||
fetch_event: &F,
|
||||
parallel_fetches: usize,
|
||||
) -> Result<Vec<E::Id>>
|
||||
) -> Result<Vec<OwnedEventId>>
|
||||
where
|
||||
F: Fn(E::Id) -> Fut + Sync,
|
||||
F: Fn(OwnedEventId) -> Fut + Sync,
|
||||
Fut: Future<Output = Option<E>> + Send,
|
||||
E: Event + Send + Sync,
|
||||
E::Id: Borrow<EventId> + Send + Sync,
|
||||
{
|
||||
debug!("reverse topological sort of power events");
|
||||
|
||||
@@ -311,35 +297,36 @@ async fn reverse_topological_power_sort<E, F, Fut>(
|
||||
}
|
||||
|
||||
// This is used in the `key_fn` passed to the lexico_topo_sort fn
|
||||
let event_to_pl = graph
|
||||
let event_to_pl: HashMap<_, _> = graph
|
||||
.keys()
|
||||
.cloned()
|
||||
.stream()
|
||||
.map(|event_id| {
|
||||
get_power_level_for_sender(event_id.clone(), fetch_event)
|
||||
.map(move |res| res.map(|pl| (event_id, pl)))
|
||||
.broad_filter_map(async |event_id| {
|
||||
let pl = get_power_level_for_sender(&event_id, fetch_event)
|
||||
.await
|
||||
.ok()?;
|
||||
Some((event_id, pl))
|
||||
})
|
||||
.buffer_unordered(parallel_fetches)
|
||||
.ready_try_fold(HashMap::new(), |mut event_to_pl, (event_id, pl)| {
|
||||
.inspect(|(event_id, pl)| {
|
||||
debug!(
|
||||
event_id = event_id.borrow().as_str(),
|
||||
power_level = i64::from(pl),
|
||||
event_id = event_id.as_str(),
|
||||
power_level = i64::from(*pl),
|
||||
"found the power level of an event's sender",
|
||||
);
|
||||
|
||||
event_to_pl.insert(event_id.clone(), pl);
|
||||
Ok(event_to_pl)
|
||||
})
|
||||
.collect()
|
||||
.boxed()
|
||||
.await?;
|
||||
.await;
|
||||
|
||||
let event_to_pl = &event_to_pl;
|
||||
let fetcher = |event_id: E::Id| async move {
|
||||
let fetcher = async |event_id: OwnedEventId| {
|
||||
let pl = *event_to_pl
|
||||
.get(event_id.borrow())
|
||||
.get(&event_id)
|
||||
.ok_or_else(|| Error::NotFound(String::new()))?;
|
||||
|
||||
let ev = fetch_event(event_id)
|
||||
.await
|
||||
.ok_or_else(|| Error::NotFound(String::new()))?;
|
||||
|
||||
Ok((pl, ev.origin_server_ts()))
|
||||
};
|
||||
|
||||
@@ -476,18 +463,17 @@ fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other))
|
||||
/// the eventId at the eventId's generation (we walk backwards to `EventId`s
|
||||
/// most recent previous power level event).
|
||||
async fn get_power_level_for_sender<E, F, Fut>(
|
||||
event_id: E::Id,
|
||||
event_id: &EventId,
|
||||
fetch_event: &F,
|
||||
) -> serde_json::Result<Int>
|
||||
where
|
||||
F: Fn(E::Id) -> Fut + Sync,
|
||||
F: Fn(OwnedEventId) -> Fut + Sync,
|
||||
Fut: Future<Output = Option<E>> + Send,
|
||||
E: Event + Send,
|
||||
E::Id: Borrow<EventId> + Send,
|
||||
{
|
||||
debug!("fetch event ({event_id}) senders power level");
|
||||
|
||||
let event = fetch_event(event_id).await;
|
||||
let event = fetch_event(event_id.to_owned()).await;
|
||||
|
||||
let auth_events = event.as_ref().map(Event::auth_events);
|
||||
|
||||
@@ -495,7 +481,7 @@ async fn get_power_level_for_sender<E, F, Fut>(
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.stream()
|
||||
.broadn_filter_map(5, |aid| fetch_event(aid.clone()))
|
||||
.broadn_filter_map(5, |aid| fetch_event(aid.to_owned()))
|
||||
.ready_find(|aev| is_type_and_key(aev, &TimelineEventType::RoomPowerLevels, ""))
|
||||
.await;
|
||||
|
||||
@@ -528,14 +514,13 @@ async fn get_power_level_for_sender<E, F, Fut>(
|
||||
async fn iterative_auth_check<'a, E, F, Fut, S>(
|
||||
room_version: &RoomVersion,
|
||||
events_to_check: S,
|
||||
unconflicted_state: StateMap<E::Id>,
|
||||
unconflicted_state: StateMap<OwnedEventId>,
|
||||
fetch_event: &F,
|
||||
) -> Result<StateMap<E::Id>>
|
||||
) -> Result<StateMap<OwnedEventId>>
|
||||
where
|
||||
F: Fn(E::Id) -> Fut + Sync,
|
||||
F: Fn(OwnedEventId) -> Fut + Sync,
|
||||
Fut: Future<Output = Option<E>> + Send,
|
||||
E::Id: Borrow<EventId> + Clone + Eq + Ord + Send + Sync + 'a,
|
||||
S: Stream<Item = &'a E::Id> + Send + 'a,
|
||||
S: Stream<Item = &'a EventId> + Send + 'a,
|
||||
E: Event + Clone + Send + Sync,
|
||||
{
|
||||
debug!("starting iterative auth check");
|
||||
@@ -543,7 +528,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
|
||||
let events_to_check: Vec<_> = events_to_check
|
||||
.map(Result::Ok)
|
||||
.broad_and_then(async |event_id| {
|
||||
fetch_event(event_id.clone())
|
||||
fetch_event(event_id.to_owned())
|
||||
.await
|
||||
.ok_or_else(|| Error::NotFound(format!("Failed to find {event_id}")))
|
||||
})
|
||||
@@ -551,16 +536,16 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
let auth_event_ids: HashSet<E::Id> = events_to_check
|
||||
let auth_event_ids: HashSet<OwnedEventId> = events_to_check
|
||||
.iter()
|
||||
.flat_map(|event: &E| event.auth_events().map(Clone::clone))
|
||||
.flat_map(|event: &E| event.auth_events().map(ToOwned::to_owned))
|
||||
.collect();
|
||||
|
||||
let auth_events: HashMap<E::Id, E> = auth_event_ids
|
||||
let auth_events: HashMap<OwnedEventId, E> = auth_event_ids
|
||||
.into_iter()
|
||||
.stream()
|
||||
.broad_filter_map(fetch_event)
|
||||
.map(|auth_event| (auth_event.event_id().clone(), auth_event))
|
||||
.map(|auth_event| (auth_event.event_id().to_owned(), auth_event))
|
||||
.collect()
|
||||
.boxed()
|
||||
.await;
|
||||
@@ -581,7 +566,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
|
||||
|
||||
let mut auth_state = StateMap::new();
|
||||
for aid in event.auth_events() {
|
||||
if let Some(ev) = auth_events.get(aid.borrow()) {
|
||||
if let Some(ev) = auth_events.get(aid) {
|
||||
//TODO: synapse checks "rejected_reason" which is most likely related to
|
||||
// soft-failing
|
||||
auth_state.insert(
|
||||
@@ -592,7 +577,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
|
||||
ev.clone(),
|
||||
);
|
||||
} else {
|
||||
warn!(event_id = aid.borrow().as_str(), "missing auth event");
|
||||
warn!(event_id = aid.as_str(), "missing auth event");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -601,7 +586,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
|
||||
.stream()
|
||||
.ready_filter_map(|key| Some((key, resolved_state.get(key)?)))
|
||||
.filter_map(|(key, ev_id)| async move {
|
||||
if let Some(event) = auth_events.get(ev_id.borrow()) {
|
||||
if let Some(event) = auth_events.get(ev_id) {
|
||||
Some((key, event.clone()))
|
||||
} else {
|
||||
Some((key, fetch_event(ev_id.clone()).await?))
|
||||
@@ -624,7 +609,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
|
||||
let fetch_state = |ty: &StateEventType, key: &str| {
|
||||
future::ready(auth_state.get(&ty.with_state_key(key)))
|
||||
};
|
||||
|
||||
debug!("running auth check on {:?}", event.event_id());
|
||||
let auth_result =
|
||||
auth_check(room_version, &event, current_third_party.as_ref(), fetch_state).await;
|
||||
|
||||
@@ -633,7 +618,7 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
|
||||
// add event to resolved state map
|
||||
resolved_state.insert(
|
||||
event.event_type().with_state_key(state_key),
|
||||
event.event_id().clone(),
|
||||
event.event_id().to_owned(),
|
||||
);
|
||||
},
|
||||
| Ok(false) => {
|
||||
@@ -660,15 +645,14 @@ async fn iterative_auth_check<'a, E, F, Fut, S>(
|
||||
/// level as a parent) will be marked as depth 1. depth 1 is "older" than depth
|
||||
/// 0.
|
||||
async fn mainline_sort<E, F, Fut>(
|
||||
to_sort: &[E::Id],
|
||||
resolved_power_level: Option<E::Id>,
|
||||
to_sort: &[OwnedEventId],
|
||||
resolved_power_level: Option<OwnedEventId>,
|
||||
fetch_event: &F,
|
||||
) -> Result<Vec<E::Id>>
|
||||
) -> Result<Vec<OwnedEventId>>
|
||||
where
|
||||
F: Fn(E::Id) -> Fut + Sync,
|
||||
F: Fn(OwnedEventId) -> Fut + Sync,
|
||||
Fut: Future<Output = Option<E>> + Send,
|
||||
E: Event + Clone + Send + Sync,
|
||||
E::Id: Borrow<EventId> + Clone + Send + Sync,
|
||||
{
|
||||
debug!("mainline sort of events");
|
||||
|
||||
@@ -688,7 +672,7 @@ async fn mainline_sort<E, F, Fut>(
|
||||
|
||||
pl = None;
|
||||
for aid in event.auth_events() {
|
||||
let ev = fetch_event(aid.clone())
|
||||
let ev = fetch_event(aid.to_owned())
|
||||
.await
|
||||
.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
|
||||
|
||||
@@ -734,26 +718,29 @@ async fn mainline_sort<E, F, Fut>(
|
||||
/// that has an associated mainline depth.
|
||||
async fn get_mainline_depth<E, F, Fut>(
|
||||
mut event: Option<E>,
|
||||
mainline_map: &HashMap<E::Id, usize>,
|
||||
mainline_map: &HashMap<OwnedEventId, usize>,
|
||||
fetch_event: &F,
|
||||
) -> Result<usize>
|
||||
where
|
||||
F: Fn(E::Id) -> Fut + Sync,
|
||||
F: Fn(OwnedEventId) -> Fut + Sync,
|
||||
Fut: Future<Output = Option<E>> + Send,
|
||||
E: Event + Send + Sync,
|
||||
E::Id: Borrow<EventId> + Send + Sync,
|
||||
{
|
||||
let mut room_id = None;
|
||||
while let Some(sort_ev) = event {
|
||||
debug!(event_id = sort_ev.event_id().borrow().as_str(), "mainline");
|
||||
trace!(event_id = sort_ev.event_id().as_str(), "mainline");
|
||||
if room_id.is_none() {
|
||||
room_id = Some(sort_ev.room_id().to_owned());
|
||||
}
|
||||
|
||||
let id = sort_ev.event_id();
|
||||
if let Some(depth) = mainline_map.get(id.borrow()) {
|
||||
if let Some(depth) = mainline_map.get(id) {
|
||||
return Ok(*depth);
|
||||
}
|
||||
|
||||
event = None;
|
||||
for aid in sort_ev.auth_events() {
|
||||
let aev = fetch_event(aid.clone())
|
||||
let aev = fetch_event(aid.to_owned())
|
||||
.await
|
||||
.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
|
||||
|
||||
@@ -763,20 +750,19 @@ async fn get_mainline_depth<E, F, Fut>(
|
||||
}
|
||||
}
|
||||
}
|
||||
// Did not find a power level event so we default to zero
|
||||
warn!("could not find a power event in the mainline map for {room_id:?}, defaulting to zero depth");
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
|
||||
graph: &mut HashMap<E::Id, HashSet<E::Id>>,
|
||||
event_id: E::Id,
|
||||
auth_diff: &HashSet<E::Id>,
|
||||
graph: &mut HashMap<OwnedEventId, HashSet<OwnedEventId>>,
|
||||
event_id: OwnedEventId,
|
||||
auth_diff: &HashSet<OwnedEventId>,
|
||||
fetch_event: &F,
|
||||
) where
|
||||
F: Fn(E::Id) -> Fut + Sync,
|
||||
F: Fn(OwnedEventId) -> Fut + Sync,
|
||||
Fut: Future<Output = Option<E>> + Send,
|
||||
E: Event + Send + Sync,
|
||||
E::Id: Borrow<EventId> + Clone + Send + Sync,
|
||||
{
|
||||
let mut state = vec![event_id];
|
||||
while let Some(eid) = state.pop() {
|
||||
@@ -786,26 +772,27 @@ async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
|
||||
|
||||
// Prefer the store to event as the store filters dedups the events
|
||||
for aid in auth_events {
|
||||
if auth_diff.contains(aid.borrow()) {
|
||||
if !graph.contains_key(aid.borrow()) {
|
||||
if auth_diff.contains(aid) {
|
||||
if !graph.contains_key(aid) {
|
||||
state.push(aid.to_owned());
|
||||
}
|
||||
|
||||
// We just inserted this at the start of the while loop
|
||||
graph.get_mut(eid.borrow()).unwrap().insert(aid.to_owned());
|
||||
graph
|
||||
.get_mut(&eid)
|
||||
.expect("We just inserted this at the start of the while loop")
|
||||
.insert(aid.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn is_power_event_id<E, F, Fut>(event_id: &E::Id, fetch: &F) -> bool
|
||||
async fn is_power_event_id<E, F, Fut>(event_id: &EventId, fetch: &F) -> bool
|
||||
where
|
||||
F: Fn(E::Id) -> Fut + Sync,
|
||||
F: Fn(OwnedEventId) -> Fut + Sync,
|
||||
Fut: Future<Output = Option<E>> + Send,
|
||||
E: Event + Send,
|
||||
E::Id: Borrow<EventId> + Send + Sync,
|
||||
{
|
||||
match fetch(event_id.clone()).await.as_ref() {
|
||||
match fetch(event_id.to_owned()).await.as_ref() {
|
||||
| Some(state) => is_power_event(state),
|
||||
| _ => false,
|
||||
}
|
||||
@@ -909,13 +896,13 @@ async fn test_event_sort() {
|
||||
|
||||
let fetcher = |id| ready(events.get(&id).cloned());
|
||||
let sorted_power_events =
|
||||
super::reverse_topological_power_sort(power_events, &auth_chain, &fetcher, 1)
|
||||
super::reverse_topological_power_sort(power_events, &auth_chain, &fetcher)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let resolved_power = super::iterative_auth_check(
|
||||
&RoomVersion::V6,
|
||||
sorted_power_events.iter().stream(),
|
||||
sorted_power_events.iter().map(AsRef::as_ref).stream(),
|
||||
HashMap::new(), // unconflicted events
|
||||
&fetcher,
|
||||
)
|
||||
@@ -1300,7 +1287,7 @@ async fn test_event_map_none() {
|
||||
let ev_map = store.0.clone();
|
||||
let fetcher = |id| ready(ev_map.get(&id).cloned());
|
||||
|
||||
let exists = |id: <PduEvent as Event>::Id| ready(ev_map.get(&*id).is_some());
|
||||
let exists = |id: OwnedEventId| ready(ev_map.get(&*id).is_some());
|
||||
|
||||
let state_sets = [state_at_bob, state_at_charlie];
|
||||
let auth_chain: Vec<_> = state_sets
|
||||
@@ -1312,19 +1299,13 @@ async fn test_event_map_none() {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let resolved = match super::resolve(
|
||||
&RoomVersionId::V2,
|
||||
&state_sets,
|
||||
&auth_chain,
|
||||
&fetcher,
|
||||
&exists,
|
||||
1,
|
||||
)
|
||||
.await
|
||||
{
|
||||
| Ok(state) => state,
|
||||
| Err(e) => panic!("{e}"),
|
||||
};
|
||||
let resolved =
|
||||
match super::resolve(&RoomVersionId::V2, &state_sets, &auth_chain, &fetcher, &exists)
|
||||
.await
|
||||
{
|
||||
| Ok(state) => state,
|
||||
| Err(e) => panic!("{e}"),
|
||||
};
|
||||
|
||||
assert_eq!(expected, resolved);
|
||||
}
|
||||
@@ -1429,21 +1410,15 @@ async fn ban_with_auth_chains2() {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let fetcher = |id: <PduEvent as Event>::Id| ready(ev_map.get(&id).cloned());
|
||||
let exists = |id: <PduEvent as Event>::Id| ready(ev_map.get(&id).is_some());
|
||||
let resolved = match super::resolve(
|
||||
&RoomVersionId::V6,
|
||||
&state_sets,
|
||||
&auth_chain,
|
||||
&fetcher,
|
||||
&exists,
|
||||
1,
|
||||
)
|
||||
.await
|
||||
{
|
||||
| Ok(state) => state,
|
||||
| Err(e) => panic!("{e}"),
|
||||
};
|
||||
let fetcher = |id: OwnedEventId| ready(ev_map.get(&id).cloned());
|
||||
let exists = |id: OwnedEventId| ready(ev_map.get(&id).is_some());
|
||||
let resolved =
|
||||
match super::resolve(&RoomVersionId::V6, &state_sets, &auth_chain, &fetcher, &exists)
|
||||
.await
|
||||
{
|
||||
| Ok(state) => state,
|
||||
| Err(e) => panic!("{e}"),
|
||||
};
|
||||
|
||||
debug!(
|
||||
resolved = ?resolved
|
||||
|
||||
@@ -133,17 +133,11 @@ pub(crate) async fn do_check(
|
||||
.collect();
|
||||
|
||||
let event_map = &event_map;
|
||||
let fetch = |id: <PduEvent as Event>::Id| ready(event_map.get(&id).cloned());
|
||||
let exists = |id: <PduEvent as Event>::Id| ready(event_map.get(&id).is_some());
|
||||
let resolved = super::resolve(
|
||||
&RoomVersionId::V6,
|
||||
state_sets,
|
||||
&auth_chain_sets,
|
||||
&fetch,
|
||||
&exists,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let fetch = |id: OwnedEventId| ready(event_map.get(&id).cloned());
|
||||
let exists = |id: OwnedEventId| ready(event_map.get(&id).is_some());
|
||||
let resolved =
|
||||
super::resolve(&RoomVersionId::V6, state_sets, &auth_chain_sets, &fetch, &exists)
|
||||
.await;
|
||||
|
||||
match resolved {
|
||||
| Ok(state) => state,
|
||||
@@ -247,8 +241,8 @@ pub(crate) fn get_event(&self, _: &RoomId, event_id: &EventId) -> Result<E> {
|
||||
pub(crate) fn auth_event_ids(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
event_ids: Vec<E::Id>,
|
||||
) -> Result<HashSet<E::Id>> {
|
||||
event_ids: Vec<OwnedEventId>,
|
||||
) -> Result<HashSet<OwnedEventId>> {
|
||||
let mut result = HashSet::new();
|
||||
let mut stack = event_ids;
|
||||
|
||||
@@ -584,7 +578,7 @@ pub(crate) fn INITIAL_EDGES() -> Vec<OwnedEventId> {
|
||||
|
||||
pub(crate) mod event {
|
||||
use ruma::{
|
||||
MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
|
||||
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
|
||||
events::{TimelineEventType, pdu::Pdu},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -593,9 +587,7 @@ pub(crate) mod event {
|
||||
use crate::Event;
|
||||
|
||||
impl Event for PduEvent {
|
||||
type Id = OwnedEventId;
|
||||
|
||||
fn event_id(&self) -> &Self::Id { &self.event_id }
|
||||
fn event_id(&self) -> &EventId { &self.event_id }
|
||||
|
||||
fn room_id(&self) -> &RoomId {
|
||||
match &self.rest {
|
||||
@@ -652,29 +644,31 @@ fn state_key(&self) -> Option<&str> {
|
||||
}
|
||||
|
||||
#[allow(refining_impl_trait)]
|
||||
fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> {
|
||||
fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> {
|
||||
match &self.rest {
|
||||
| Pdu::RoomV1Pdu(ev) => Box::new(ev.prev_events.iter().map(|(id, _)| id)),
|
||||
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter()),
|
||||
| Pdu::RoomV1Pdu(ev) =>
|
||||
Box::new(ev.prev_events.iter().map(|(id, _)| id.as_ref())),
|
||||
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter().map(AsRef::as_ref)),
|
||||
#[allow(unreachable_patterns)]
|
||||
| _ => unreachable!("new PDU version"),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(refining_impl_trait)]
|
||||
fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> {
|
||||
fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> {
|
||||
match &self.rest {
|
||||
| Pdu::RoomV1Pdu(ev) => Box::new(ev.auth_events.iter().map(|(id, _)| id)),
|
||||
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter()),
|
||||
| Pdu::RoomV1Pdu(ev) =>
|
||||
Box::new(ev.auth_events.iter().map(|(id, _)| id.as_ref())),
|
||||
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter().map(AsRef::as_ref)),
|
||||
#[allow(unreachable_patterns)]
|
||||
| _ => unreachable!("new PDU version"),
|
||||
}
|
||||
}
|
||||
|
||||
fn redacts(&self) -> Option<&Self::Id> {
|
||||
fn redacts(&self) -> Option<&EventId> {
|
||||
match &self.rest {
|
||||
| Pdu::RoomV1Pdu(ev) => ev.redacts.as_ref(),
|
||||
| Pdu::RoomV3Pdu(ev) => ev.redacts.as_ref(),
|
||||
| Pdu::RoomV1Pdu(ev) => ev.redacts.as_deref(),
|
||||
| Pdu::RoomV3Pdu(ev) => ev.redacts.as_deref(),
|
||||
#[allow(unreachable_patterns)]
|
||||
| _ => unreachable!("new PDU version"),
|
||||
}
|
||||
|
||||
@@ -98,12 +98,7 @@ pub(super) fn shutdown(server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
|
||||
Level::INFO
|
||||
};
|
||||
|
||||
debug!(
|
||||
timeout = ?SHUTDOWN_TIMEOUT,
|
||||
"Waiting for runtime..."
|
||||
);
|
||||
|
||||
runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);
|
||||
wait_shutdown(server, runtime);
|
||||
let runtime_metrics = server.server.metrics.runtime_interval().unwrap_or_default();
|
||||
|
||||
event!(LEVEL, ?runtime_metrics, "Final runtime metrics");
|
||||
@@ -111,13 +106,23 @@ pub(super) fn shutdown(server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
|
||||
|
||||
#[cfg(not(tokio_unstable))]
|
||||
#[tracing::instrument(name = "stop", level = "info", skip_all)]
|
||||
pub(super) fn shutdown(_server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
|
||||
pub(super) fn shutdown(server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
|
||||
wait_shutdown(server, runtime);
|
||||
}
|
||||
|
||||
fn wait_shutdown(_server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
|
||||
debug!(
|
||||
timeout = ?SHUTDOWN_TIMEOUT,
|
||||
"Waiting for runtime..."
|
||||
);
|
||||
|
||||
runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);
|
||||
|
||||
// Join any jemalloc threads so they don't appear in use at exit.
|
||||
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
|
||||
conduwuit_core::alloc::je::background_thread_enable(false)
|
||||
.log_debug_err()
|
||||
.ok();
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
|
||||
@@ -76,7 +76,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
|
||||
// 5. Reject "due to auth events" if can't get all the auth events or some of
|
||||
// the auth events are also rejected "due to auth events"
|
||||
// NOTE: Step 5 is not applied anymore because it failed too often
|
||||
debug!("Fetching auth events");
|
||||
debug!("Fetching auth events for {}", incoming_pdu.event_id);
|
||||
Box::pin(self.fetch_and_handle_outliers(
|
||||
origin,
|
||||
&incoming_pdu.auth_events,
|
||||
@@ -88,12 +88,12 @@ pub(super) async fn handle_outlier_pdu<'a>(
|
||||
|
||||
// 6. Reject "due to auth events" if the event doesn't pass auth based on the
|
||||
// auth events
|
||||
debug!("Checking based on auth events");
|
||||
debug!("Checking {} based on auth events", incoming_pdu.event_id);
|
||||
// Build map of auth events
|
||||
let mut auth_events = HashMap::with_capacity(incoming_pdu.auth_events.len());
|
||||
for id in &incoming_pdu.auth_events {
|
||||
let Ok(auth_event) = self.services.timeline.get_pdu(id).await else {
|
||||
warn!("Could not find auth event {id}");
|
||||
warn!("Could not find auth event {id} for {}", incoming_pdu.event_id);
|
||||
continue;
|
||||
};
|
||||
|
||||
@@ -119,10 +119,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
|
||||
}
|
||||
|
||||
// The original create event must be in the auth events
|
||||
if !matches!(
|
||||
auth_events.get(&(StateEventType::RoomCreate, String::new().into())),
|
||||
Some(_) | None
|
||||
) {
|
||||
if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) {
|
||||
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
|
||||
}
|
||||
|
||||
@@ -131,6 +128,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
|
||||
ready(auth_events.get(&key))
|
||||
};
|
||||
|
||||
debug!("running auth check to handle outlier pdu {:?}", incoming_pdu.event_id);
|
||||
let auth_check = state_res::event_auth::auth_check(
|
||||
&to_room_version(&room_version_id),
|
||||
&incoming_pdu,
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
Error, Result, err, implement,
|
||||
state_res::{self, StateMap},
|
||||
trace,
|
||||
utils::stream::{IterStream, ReadyExt, TryWidebandExt, WidebandExt, automatic_width},
|
||||
utils::stream::{IterStream, ReadyExt, TryWidebandExt, WidebandExt},
|
||||
};
|
||||
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join};
|
||||
use ruma::{OwnedEventId, RoomId, RoomVersionId};
|
||||
@@ -112,14 +112,7 @@ pub async fn state_resolution<'a, StateSets>(
|
||||
{
|
||||
let event_fetch = |event_id| self.event_fetch(event_id);
|
||||
let event_exists = |event_id| self.event_exists(event_id);
|
||||
state_res::resolve(
|
||||
room_version,
|
||||
state_sets,
|
||||
auth_chain_sets,
|
||||
&event_fetch,
|
||||
&event_exists,
|
||||
automatic_width(),
|
||||
)
|
||||
.map_err(|e| err!(error!("State resolution failed: {e:?}")))
|
||||
.await
|
||||
state_res::resolve(room_version, state_sets, auth_chain_sets, &event_fetch, &event_exists)
|
||||
.map_err(|e| err!(error!("State resolution failed: {e:?}")))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -1,12 +1,6 @@
|
||||
use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Result, debug, debug_info, err, implement,
|
||||
matrix::{EventTypeExt, PduEvent, StateKey, state_res},
|
||||
trace,
|
||||
utils::stream::{BroadbandExt, ReadyExt},
|
||||
warn,
|
||||
};
|
||||
use conduwuit::{Err, Result, debug, debug_info, err, implement, matrix::{EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, warn, info};
|
||||
use futures::{FutureExt, StreamExt, future::ready};
|
||||
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
|
||||
|
||||
@@ -44,7 +38,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
|
||||
return Err!(Request(InvalidParam("Event has been soft failed")));
|
||||
}
|
||||
|
||||
debug!("Upgrading to timeline pdu");
|
||||
debug!("Upgrading pdu {} from outlier to timeline pdu", incoming_pdu.event_id);
|
||||
let timer = Instant::now();
|
||||
let room_version_id = get_room_version_id(create_event)?;
|
||||
|
||||
@@ -52,7 +46,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
|
||||
// backwards extremities doing all the checks in this list starting at 1.
|
||||
// These are not timeline events.
|
||||
|
||||
debug!("Resolving state at event");
|
||||
debug!("Resolving state at event {}", incoming_pdu.event_id);
|
||||
let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 {
|
||||
self.state_at_incoming_degree_one(&incoming_pdu).await?
|
||||
} else {
|
||||
@@ -70,7 +64,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
|
||||
state_at_incoming_event.expect("we always set this to some above");
|
||||
let room_version = to_room_version(&room_version_id);
|
||||
|
||||
debug!("Performing auth check");
|
||||
debug!("Performing auth check to upgrade {}", incoming_pdu.event_id);
|
||||
// 11. Check the auth of the event passes based on the state of the event
|
||||
let state_fetch_state = &state_at_incoming_event;
|
||||
let state_fetch = |k: StateEventType, s: StateKey| async move {
|
||||
@@ -80,6 +74,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
|
||||
self.services.timeline.get_pdu(event_id).await.ok()
|
||||
};
|
||||
|
||||
debug!("running auth check on {}", incoming_pdu.event_id);
|
||||
let auth_check = state_res::event_auth::auth_check(
|
||||
&room_version,
|
||||
&incoming_pdu,
|
||||
@@ -93,7 +88,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
|
||||
return Err!(Request(Forbidden("Event has failed auth check with state at the event.")));
|
||||
}
|
||||
|
||||
debug!("Gathering auth events");
|
||||
debug!("Gathering auth events for {}", incoming_pdu.event_id);
|
||||
let auth_events = self
|
||||
.services
|
||||
.state
|
||||
@@ -111,6 +106,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
|
||||
ready(auth_events.get(&key).cloned())
|
||||
};
|
||||
|
||||
debug!("running auth check on {} with claimed state auth", incoming_pdu.event_id);
|
||||
let auth_check = state_res::event_auth::auth_check(
|
||||
&room_version,
|
||||
&incoming_pdu,
|
||||
@@ -121,7 +117,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
|
||||
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
|
||||
|
||||
// Soft fail check before doing state res
|
||||
debug!("Performing soft-fail check");
|
||||
debug!("Performing soft-fail check on {}", incoming_pdu.event_id);
|
||||
let soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) {
|
||||
| (false, _) => true,
|
||||
| (true, None) => false,
|
||||
@@ -218,7 +214,8 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
|
||||
// 14. Check if the event passes auth based on the "current state" of the room,
|
||||
// if not soft fail it
|
||||
if soft_fail {
|
||||
debug!("Soft failing event");
|
||||
info!("Soft failing event {}", incoming_pdu.event_id);
|
||||
assert!(extremities.is_empty(), "soft_fail extremities empty");
|
||||
let extremities = extremities.iter().map(Borrow::borrow);
|
||||
|
||||
self.services
|
||||
|
||||
@@ -698,6 +698,20 @@ pub async fn create_hash_and_sign_event(
|
||||
.await
|
||||
.saturating_add(uint!(1));
|
||||
|
||||
if state_key.is_none() {
|
||||
if prev_events.is_empty() {
|
||||
warn!("Timeline event had zero prev_events, something broke.");
|
||||
return Err!(Request(Unknown("Timeline event had zero prev_events.")));
|
||||
}
|
||||
if depth.le(&uint!(2)) {
|
||||
warn!(
|
||||
"Had unsafe depth of {depth} in {room_id} when creating non-state event. \
|
||||
Bad!"
|
||||
);
|
||||
return Err!(Request(Unknown("Unsafe depth for non-state event.")));
|
||||
}
|
||||
};
|
||||
|
||||
let mut unsigned = unsigned.unwrap_or_default();
|
||||
|
||||
if let Some(state_key) = &state_key {
|
||||
@@ -757,6 +771,7 @@ pub async fn create_hash_and_sign_event(
|
||||
ready(auth_events.get(&key))
|
||||
};
|
||||
|
||||
debug!("running auth check on new {} event by {} in {}", pdu.kind, pdu.sender, pdu.room_id);
|
||||
let auth_check = state_res::auth_check(
|
||||
&room_version,
|
||||
&pdu,
|
||||
@@ -961,8 +976,9 @@ pub async fn append_incoming_pdu<'a, Leaves>(
|
||||
state_lock: &'a RoomMutexGuard,
|
||||
) -> Result<Option<RawPduId>>
|
||||
where
|
||||
Leaves: Iterator<Item = &'a EventId> + Send + 'a,
|
||||
Leaves: Iterator<Item = &'a EventId> + Send + Clone + 'a,
|
||||
{
|
||||
assert!(new_room_leaves.clone().count() > 0, "extremities are empty");
|
||||
// We append to state before appending the pdu, so we don't have a moment in
|
||||
// time with the pdu without it's state. This is okay because append_pdu can't
|
||||
// fail.
|
||||
@@ -1142,7 +1158,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
|
||||
.boxed();
|
||||
|
||||
while let Some(ref backfill_server) = servers.next().await {
|
||||
info!("Asking {backfill_server} for backfill");
|
||||
info!("Asking {backfill_server} for backfill in {:?}", room_id.to_owned());
|
||||
let response = self
|
||||
.services
|
||||
.sending
|
||||
@@ -1170,7 +1186,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
|
||||
}
|
||||
}
|
||||
|
||||
info!("No servers could backfill, but backfill was needed in room {room_id}");
|
||||
warn!("No servers could backfill, but backfill was needed in room {room_id}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -1,85 +1,68 @@
|
||||
:root {
|
||||
color-scheme: light;
|
||||
--font-stack: sans-serif;
|
||||
color-scheme: light;
|
||||
--font-stack: sans-serif;
|
||||
|
||||
--background-color: #fff;
|
||||
--text-color: #000;
|
||||
--background-color: #fff;
|
||||
--text-color: #000;
|
||||
|
||||
--bg: oklch(0.76 0.0854 317.27);
|
||||
--panel-bg: oklch(0.91 0.042 317.27);
|
||||
--bg: oklch(0.76 0.0854 317.27);
|
||||
--panel-bg: oklch(0.91 0.042 317.27);
|
||||
|
||||
--name-lightness: 0.45;
|
||||
--name-lightness: 0.45;
|
||||
|
||||
@media (prefers-color-scheme: dark) {
|
||||
color-scheme: dark;
|
||||
--text-color: #fff;
|
||||
--bg: oklch(0.15 0.042 317.27);
|
||||
--panel-bg: oklch(0.24 0.03 317.27);
|
||||
@media (prefers-color-scheme: dark) {
|
||||
color-scheme: dark;
|
||||
--text-color: #fff;
|
||||
--bg: oklch(0.15 0.042 317.27);
|
||||
--panel-bg: oklch(0.24 0.03 317.27);
|
||||
|
||||
--name-lightness: 0.8;
|
||||
}
|
||||
--name-lightness: 0.8;
|
||||
}
|
||||
|
||||
--c1: oklch(0.44 0.177 353.06);
|
||||
--c2: oklch(0.59 0.158 150.88);
|
||||
--c1: oklch(0.44 0.177 353.06);
|
||||
--c2: oklch(0.59 0.158 150.88);
|
||||
|
||||
--normal-font-size: 1rem;
|
||||
--small-font-size: 0.8rem;
|
||||
--normal-font-size: 1rem;
|
||||
--small-font-size: 0.8rem;
|
||||
}
|
||||
|
||||
body {
|
||||
color: var(--text-color);
|
||||
font-family: var(--font-stack);
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
display: grid;
|
||||
place-items: center;
|
||||
min-height: 100vh;
|
||||
color: var(--text-color);
|
||||
font-family: var(--font-stack);
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
display: grid;
|
||||
place-items: center;
|
||||
min-height: 100vh;
|
||||
}
|
||||
|
||||
html {
|
||||
background-color: var(--bg);
|
||||
background-image: linear-gradient(
|
||||
70deg,
|
||||
oklch(from var(--bg) l + 0.2 c h),
|
||||
oklch(from var(--bg) l - 0.2 c h)
|
||||
);
|
||||
font-size: 16px;
|
||||
background-color: var(--bg);
|
||||
background-image: linear-gradient(
|
||||
70deg,
|
||||
oklch(from var(--bg) l + 0.2 c h),
|
||||
oklch(from var(--bg) l - 0.2 c h)
|
||||
);
|
||||
font-size: 16px;
|
||||
}
|
||||
|
||||
.panel {
|
||||
width: min(clamp(24rem, 12rem + 40vw, 48rem), calc(100vw - 3rem));
|
||||
border-radius: 15px;
|
||||
background-color: var(--panel-bg);
|
||||
padding-inline: 1.5rem;
|
||||
padding-block: 1rem;
|
||||
box-shadow: 0 0.25em 0.375em hsla(0, 0%, 0%, 0.1);
|
||||
}
|
||||
|
||||
@media (max-width: 24rem) {
|
||||
.panel {
|
||||
padding-inline: 0.25rem;
|
||||
width: calc(100vw - 0.5rem);
|
||||
border-radius: 0;
|
||||
margin-block-start: 0.2rem;
|
||||
}
|
||||
main {
|
||||
height: 100%;
|
||||
}
|
||||
}
|
||||
|
||||
footer {
|
||||
padding-inline: 0.25rem;
|
||||
height: max(fit-content, 2rem);
|
||||
width: min(clamp(24rem, 12rem + 40vw, 48rem), 100vw);
|
||||
border-radius: 15px;
|
||||
background-color: var(--panel-bg);
|
||||
padding-inline: 1.5rem;
|
||||
padding-block: 1rem;
|
||||
box-shadow: 0 0.25em 0.375em hsla(0, 0%, 0%, 0.1);
|
||||
}
|
||||
|
||||
.project-name {
|
||||
text-decoration: none;
|
||||
background: linear-gradient(
|
||||
130deg,
|
||||
oklch(from var(--c1) var(--name-lightness) c h),
|
||||
oklch(from var(--c2) var(--name-lightness) c h)
|
||||
);
|
||||
background-clip: text;
|
||||
color: transparent;
|
||||
filter: brightness(1.2);
|
||||
text-decoration: none;
|
||||
background: linear-gradient(
|
||||
130deg,
|
||||
oklch(from var(--c1) var(--name-lightness) c h),
|
||||
oklch(from var(--c2) var(--name-lightness) c h)
|
||||
);
|
||||
background-clip: text;
|
||||
color: transparent;
|
||||
filter: brightness(1.2);
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
response::{Html, IntoResponse, Response},
|
||||
routing::get,
|
||||
};
|
||||
use conduwuit_build_metadata::{GIT_REMOTE_COMMIT_URL, GIT_REMOTE_WEB_URL, VERSION_EXTRA};
|
||||
use conduwuit_build_metadata::{GIT_REMOTE_COMMIT_URL, GIT_REMOTE_WEB_URL, version_tag};
|
||||
use conduwuit_service::state;
|
||||
|
||||
pub fn build() -> Router<state::State> {
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
{%~ block footer ~%}
|
||||
<footer>
|
||||
<p>Powered by <a href="https://continuwuity.org">Continuwuity</a>
|
||||
{%~ if let Some(version_info) = VERSION_EXTRA ~%}
|
||||
{%~ if let Some(version_info) = self::version_tag() ~%}
|
||||
{%~ if let Some(url) = GIT_REMOTE_COMMIT_URL.or(GIT_REMOTE_WEB_URL) ~%}
|
||||
(<a href="{{ url }}">{{ version_info }}</a>)
|
||||
{%~ else ~%}
|
||||
|
||||
Reference in New Issue
Block a user