use std::{
	collections::{HashMap, HashSet},
	fmt::Write,
	iter::once,
	time::{Instant, SystemTime},
};

use conduwuit::{
	debug_error, err, info,
	matrix::{
		pdu::{PduEvent, PduId, RawPduId},
		Event,
	},
	trace, utils,
	utils::{
		stream::{IterStream, ReadyExt},
		string::EMPTY,
	},
	warn, Err, Result,
};
use futures::{FutureExt, StreamExt, TryStreamExt};
use lettre::message::Mailbox;
use ruma::{
	api::federation::event::get_room_state, events::AnyStateEvent, serde::Raw,
	CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
	OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, UInt,
};
use service::rooms::{
	short::{ShortEventId, ShortRoomId},
	state_compressor::HashSetCompressStateEvent,
};
use tracing_subscriber::EnvFilter;

use crate::admin_command;

#[admin_command]
pub(super) async fn echo(&self, message: Vec<String>) -> Result {
	let message = message.join(" ");
	self.write_str(&message).await
}

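/// Walk the full auth chain of the given event and report how many events it
/// contains and how long the traversal took.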
#[admin_command]
pub(super) async fn get_auth_chain(&self, event_id: OwnedEventId) -> Result {
	let Ok(event) = self.services.rooms.timeline.get_pdu_json(&event_id).await else {
		return Err!("Event not found.");
	};

	let room_id_str = event
		.get("room_id")
		.and_then(CanonicalJsonValue::as_str)
		.ok_or_else(|| err!(Database("Invalid event in database")))?;

	let room_id = <&RoomId>::try_from(room_id_str)
		.map_err(|_| err!(Database("Invalid room id field in event in database")))?;

	let start = Instant::now();
	let count = self
		.services
		.rooms
		.auth_chain
		.event_ids_iter(room_id, once(event_id.as_ref()))
		.ready_filter_map(Result::ok)
		.count()
		.await;

	let elapsed = start.elapsed();
	let out = format!("Loaded auth chain with length {count} in {elapsed:?}");

	self.write_str(&out).await
}

#[derive(Clone, Copy, Eq, PartialEq)]
enum NodeStatus {
	Normal(bool),
	SoftFailed(bool),
	Rejected(bool),
}

struct AuthChild {
	node_id: String,
	event_id: OwnedEventId,
	depth: UInt,
	ts: UInt,
	first_seen: bool,
	pdu: Option<PduEvent>,
}

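/// Emit a single mermaid node for `event_id` into `graph`, labelling it
/// according to `status` and attaching the `rejected`/`soft_failed` class
/// where applicable.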
fn render_node(
	graph: &mut String,
	node_id: &str,
	event_id: &EventId,
	status: NodeStatus,
) -> Result {
	let evt_str = event_id.to_string();

	let status_label = match status {
		| NodeStatus::Normal(false) => evt_str,
		| NodeStatus::Normal(true) => format!("{evt_str} (missing locally)"),
		| NodeStatus::SoftFailed(false) => format!("{evt_str} (soft-failed)"),
		| NodeStatus::SoftFailed(true) => format!("{evt_str} (soft-failed & missing locally)"),
		| NodeStatus::Rejected(false) => format!("{evt_str} (rejected)"),
		| NodeStatus::Rejected(true) => format!("{evt_str} (rejected & missing locally)"),
	};

	writeln!(graph, "{node_id}[\"{}\"]", status_label.as_str())?;

	match status {
		| NodeStatus::Rejected(_) => writeln!(graph, "class {node_id} rejected;")?,
		| NodeStatus::SoftFailed(_) => writeln!(graph, "class {node_id} soft_failed;")?,
		| NodeStatus::Normal(_) => {},
	}

	Ok(())
}

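/// Render the auth chain of the given event as a mermaid flowchart, marking
/// events that are rejected, soft-failed, or missing locally.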
#[admin_command]
pub(super) async fn show_auth_chain(&self, event_id: OwnedEventId) -> Result {
	let node_status = async |event_id: &EventId, missing: bool| -> NodeStatus {
		if self
			.services
			.rooms
			.pdu_metadata
			.is_event_rejected(event_id)
			.await
		{
			NodeStatus::Rejected(missing)
		} else if self
			.services
			.rooms
			.pdu_metadata
			.is_event_soft_failed(event_id)
			.await
		{
			NodeStatus::SoftFailed(missing)
		} else {
			NodeStatus::Normal(missing)
		}
	};

	let Ok(root) = self.services.rooms.timeline.get_pdu(&event_id).await else {
		return Err!("Event not found.");
	};

	let mut graph = String::from(
		"```mermaid\n%% This is a mermaid graph. You can plug this output into\n\
		 %% https://mermaid.live/edit to visualise it on-the-fly.\nflowchart TD\n\
		 classDef rejected fill:#ffe5e5,stroke:#cc0000,stroke-width:2px,color:#000;\n\
		 classDef soft_failed fill:#fff6cc,stroke:#c9a400,stroke-width:2px,color:#000;\n"
	);

	let mut node_ids: HashMap<OwnedEventId, String> = HashMap::new();
	let mut cached_events: HashMap<OwnedEventId, PduEvent> =
		HashMap::from([(event_id.clone(), root.clone())]);
	let mut scheduled: HashSet<OwnedEventId> = HashSet::from([event_id.clone()]);
	let mut visited: HashSet<OwnedEventId> = HashSet::new();
	let mut stack = vec![root];
	let mut next_node_id = 0_usize;

	let node_id_for = |event_id: &OwnedEventId,
	                   node_ids: &mut HashMap<OwnedEventId, String>,
	                   next_node_id: &mut usize| {
		node_ids
			.entry(event_id.clone())
			.or_insert_with(|| {
				let id = format!("n{}", *next_node_id);
				*next_node_id = next_node_id.saturating_add(1);
				id
			})
			.clone()
	};

	while let Some(event) = stack.pop() {
		let current_event_id = event.event_id().to_owned();
		if !visited.insert(current_event_id.clone()) {
			continue;
		}

		let current_node_id = node_id_for(&current_event_id, &mut node_ids, &mut next_node_id);
		let current_status = node_status(&current_event_id, false).await;

		render_node(&mut graph, &current_node_id, &current_event_id, current_status)?;

		let mut children = Vec::with_capacity(event.auth_events.len());
		for auth_event_id in event.auth_events().rev() {
			let auth_event_id = auth_event_id.to_owned();
			let auth_node_id = node_id_for(&auth_event_id, &mut node_ids, &mut next_node_id);
			writeln!(graph, "{current_node_id} --> {auth_node_id}")?;

			let first_seen = scheduled.insert(auth_event_id.clone());
			let auth_pdu = if let Some(auth_pdu) = cached_events.get(&auth_event_id) {
				// NOTE: events might be referenced multiple times (like the create event)
				// so this saves some cheeky db lookup time
				Some(auth_pdu.clone())
			} else if first_seen {
				match self.services.rooms.timeline.get_pdu(&auth_event_id).await {
					| Ok(auth_event) => {
						cached_events.insert(auth_event_id.clone(), auth_event.clone());
						Some(auth_event)
					},
					| Err(_) => None,
				}
			} else {
				None
			};

			// NOTE: Depth is used as the primary sorting key here, even though it has no
			// bearing on state resolution or anything. Timestamp is used as a
			// tiebreaker, falling back to lexicographical comparison.
			let (depth, ts) = auth_pdu
				.as_ref()
				.map_or((UInt::MAX, UInt::MAX), |pdu| (pdu.depth, pdu.origin_server_ts));

			children.push(AuthChild {
				node_id: auth_node_id,
				event_id: auth_event_id,
				depth,
				ts,
				first_seen,
				pdu: auth_pdu,
			});
		}

		children.sort_by(|a, b| {
			a.depth
				.cmp(&b.depth)
				.then(a.ts.cmp(&b.ts))
				.then(a.event_id.as_str().cmp(b.event_id.as_str()))
		});

		for child in children.into_iter().rev() {
			if !child.first_seen {
				continue;
			}

			if let Some(child_pdu) = child.pdu {
				// We have this PDU so will want to traverse it.
				stack.push(child_pdu);
			} else {
				// We don't have this PDU locally so we can't traverse its auth events,
				// but we can still render it as a node.
				render_node(
					&mut graph,
					&child.node_id,
					&child.event_id,
					node_status(&child.event_id, true).await,
				)?;
			}
		}
	}

	graph.push_str("```\n");
	self.write_str(&graph).await
}

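/// Parse a PDU supplied as a JSON code block in the command body, printing
/// its reference-hash event ID and the parsed event.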
#[admin_command]
pub(super) async fn parse_pdu(&self) -> Result {
	if self.body.len() < 2
		|| !self.body[0].trim().starts_with("```")
		|| self.body.last().unwrap_or(&EMPTY).trim() != "```"
	{
		return Err!("Expected code block in command body. Add --help for details.");
	}

	let string = self.body[1..self.body.len().saturating_sub(1)].join("\n");
	let room_version_rules = RoomVersionId::V12.rules().unwrap();

	match serde_json::from_str(&string) {
		| Err(e) => return Err!("Invalid json in command body: {e}"),
		| Ok(value) => match ruma::signatures::reference_hash(&value, &room_version_rules) {
			| Err(e) => return Err!("Could not parse PDU JSON: {e:?}"),
			| Ok(hash) => {
				let event_id = EventId::parse(format!("${hash}"));
				match serde_json::from_value::<PduEvent>(serde_json::to_value(value)?) {
					| Err(e) => return Err!("EventId: {event_id:?}\nCould not parse event: {e}"),
					| Ok(pdu) => write!(self, "EventId: {event_id:?}\n{pdu:#?}"),
				}
			},
		},
	}
	.await
}

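/// Fetch a PDU from the local database by event ID and print it as JSON,
/// noting whether it is rejected, soft-failed, or an outlier.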
#[admin_command]
pub(super) async fn get_pdu(&self, event_id: OwnedEventId) -> Result {
	let mut outlier = false;
	let mut pdu_json = self
		.services
		.rooms
		.timeline
		.get_non_outlier_pdu_json(&event_id)
		.await;

	if pdu_json.is_err() {
		outlier = true;
		pdu_json = self.services.rooms.timeline.get_pdu_json(&event_id).await;
	}
	let rejected = self
		.services
		.rooms
		.pdu_metadata
		.is_event_rejected(&event_id)
		.await;
	let soft_failed = self
		.services
		.rooms
		.pdu_metadata
		.is_event_soft_failed(&event_id)
		.await;

	match pdu_json {
		| Err(_) => return Err!("PDU not found locally."),
		| Ok(json) => {
			let text = serde_json::to_string_pretty(&json)?;
			let msg = if rejected {
				"Rejected PDU:"
			} else if soft_failed {
				"Soft-failed PDU:"
			} else if outlier {
				"Outlier PDU:"
			} else {
				"PDU:"
			};
			write!(self, "{msg}\n```json\n{text}\n```")
		},
	}
	.await
}

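/// Look up a PDU by its short room ID and short event ID and print the raw
/// JSON.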
#[admin_command]
pub(super) async fn get_short_pdu(
	&self,
	shortroomid: ShortRoomId,
	shorteventid: ShortEventId,
) -> Result {
	let pdu_id: RawPduId = PduId {
		shortroomid,
		shorteventid: shorteventid.into(),
	}
	.into();

	let pdu_json = self
		.services
		.rooms
		.timeline
		.get_pdu_json_from_id(&pdu_id)
		.await;

	match pdu_json {
		| Err(_) => return Err!("PDU not found locally."),
		| Ok(json) => {
			let json_text = serde_json::to_string_pretty(&json)?;
			write!(self, "```json\n{json_text}\n```")
		},
	}
	.await
}

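/// Fetch a list of event IDs (given as a code block) from the specified
/// remote server. With `force`, failures are logged and skipped instead of
/// aborting the whole batch.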
#[admin_command]
pub(super) async fn get_remote_pdu_list(&self, server: OwnedServerName, force: bool) -> Result {
	if !self.services.server.config.allow_federation {
		return Err!("Federation is disabled on this homeserver.",);
	}

	if server == self.services.globals.server_name() {
		return Err!(
			"Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
			 fetching local PDUs from the database.",
		);
	}

	if self.body.len() < 2
		|| !self.body[0].trim().starts_with("```")
		|| self.body.last().unwrap_or(&EMPTY).trim() != "```"
	{
		return Err!("Expected code block in command body. Add --help for details.",);
	}

	let list = self
		.body
		.iter()
		.collect::<Vec<_>>()
		.drain(1..self.body.len().saturating_sub(1))
		.filter_map(|pdu| EventId::parse(pdu).ok())
		.collect::<Vec<_>>();

	let mut failed_count: usize = 0;
	let mut success_count: usize = 0;

	for event_id in list {
		if force {
			match self.get_remote_pdu(event_id.clone(), server.clone()).await {
				| Err(e) => {
					failed_count = failed_count.saturating_add(1);
					self.services
						.admin
						.send_text(&format!("Failed to get remote PDU, ignoring error: {e}"))
						.await;

					warn!("Failed to get remote PDU, ignoring error: {e}");
				},
				| _ => {
					success_count = success_count.saturating_add(1);
				},
			}
		} else {
			self.get_remote_pdu(event_id.clone(), server.clone())
				.await?;
			success_count = success_count.saturating_add(1);
		}
	}

	let out =
		format!("Fetched {success_count} remote PDUs successfully with {failed_count} failures");

	self.write_str(&out).await
}

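/// Fetch a single event from the specified remote server over federation,
/// run it through the incoming-PDU parser, and print the event body.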
#[admin_command]
pub(super) async fn get_remote_pdu(
	&self,
	event_id: OwnedEventId,
	server: OwnedServerName,
) -> Result {
	if !self.services.server.config.allow_federation {
		return Err!("Federation is disabled on this homeserver.");
	}

	if server == self.services.globals.server_name() {
		return Err!(
			"Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
			 fetching local PDUs.",
		);
	}

	match self
		.services
		.sending
		.send_federation_request(
			&server,
			ruma::api::federation::event::get_event::v1::Request::new(event_id.clone()),
		)
		.await
	{
		| Err(e) => {
			return Err!(
				"Remote server did not have PDU or failed sending request to remote server: {e}"
			);
		},
		| Ok(response) => {
			let json: CanonicalJsonObject =
				serde_json::from_str(response.pdu.get()).map_err(|e| {
					warn!(
						"Requested event ID {event_id} from server but failed to convert from \
						 RawValue to CanonicalJsonObject (malformed event/response?): {e}"
					);
					err!(Request(Unknown(
						"Received response from server but failed to parse PDU"
					)))
				})?;

			trace!("Attempting to parse PDU: {:?}", &response.pdu);
			let _parsed_pdu = {
				let parsed_result = self
					.services
					.rooms
					.event_handler
					.parse_incoming_pdu(&response.pdu)
					.boxed()
					.await;

				let (event_id, value, room_id) = match parsed_result {
					| Ok(t) => t,
					| Err(e) => {
						warn!("Failed to parse PDU: {e}");
						info!("Full PDU: {:?}", &response.pdu);
						return Err!("Failed to parse PDU remote server {server} sent us: {e}");
					},
				};

				vec![(event_id, value, room_id)]
			};

			let text = serde_json::to_string_pretty(&json)?;
			let msg = "Got PDU from specified server:";
			write!(self, "{msg}. Event body:\n```json\n{text}\n```")
		},
	}
	.await
}

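/// Dump the full current state of a room (resolving an alias if necessary)
/// as pretty-printed JSON.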
#[admin_command]
pub(super) async fn get_room_state(&self, room: OwnedRoomOrAliasId) -> Result {
	self.bail_restricted()?;

	let room_id = self.services.rooms.alias.resolve(&room).await?;
	let room_state: Vec<Raw<AnyStateEvent>> = self
		.services
		.rooms
		.state_accessor
		.room_state_full_pdus(&room_id)
		.map_ok(Event::into_format)
		.try_collect()
		.await?;

	if room_state.is_empty() {
		return Err!("Unable to find room state in our database (vector is empty)",);
	}

	let json = serde_json::to_string_pretty(&room_state).map_err(|e| {
		err!(Database(
			"Failed to convert room state events to pretty JSON, possible invalid room state \
			 events in our database {e}",
		))
	})?;

	let out = format!("```json\n{json}\n```");
	self.write_str(&out).await
}

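/// Send an unauthenticated `get_server_version` request to the given server
/// and report the round-trip time and response.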
#[admin_command]
pub(super) async fn ping(&self, server: OwnedServerName) -> Result {
	if server == self.services.globals.server_name() {
		return Err!("Not allowed to send federation requests to ourselves.");
	}

	let timer = tokio::time::Instant::now();

	match self
		.services
		.sending
		.send_unauthenticated_request(
			&server,
			ruma::api::federation::discovery::get_server_version::v1::Request::new(),
		)
		.await
	{
		| Err(e) => {
			return Err!("Failed sending federation request to specified server:\n\n{e}");
		},
		| Ok(response) => {
			let ping_time = timer.elapsed();
			let json_text_res = serde_json::to_string_pretty(&response.server);

			let out = if let Ok(json) = json_text_res {
				format!("Got response which took {ping_time:?} time:\n```json\n{json}\n```")
			} else {
				format!("Got non-JSON response which took {ping_time:?} time:\n{response:?}")
			};

			write!(self, "{out}")
		},
	}
	.await
}

#[admin_command]
pub(super) async fn force_device_list_updates(&self) -> Result {
	// Force E2EE device list updates for all users
	self.services
		.users
		.stream()
		.for_each(async |user_id| self.services.users.mark_device_key_update(&user_id).await)
		.await;

	write!(self, "Marked all devices for all users as having new keys to update").await
}

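/// Reload the global tracing filter, either from an explicit filter string
/// or, when `reset` is set, from the configured log level.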
#[admin_command]
pub(super) async fn change_log_level(&self, filter: Option<String>, reset: bool) -> Result {
	let handles = &["console"];

	if reset {
		let old_filter_layer = match EnvFilter::try_new(&self.services.server.config.log) {
			| Ok(s) => s,
			| Err(e) => return Err!("Log level from config appears to be invalid now: {e}"),
		};

		match self
			.services
			.server
			.log
			.reload
			.reload(&old_filter_layer, Some(handles))
		{
			| Err(e) => {
				return Err!("Failed to modify and reload the global tracing log level: {e}");
			},
			| Ok(()) => {
				let value = &self.services.server.config.log;
				let out = format!("Successfully changed log level back to config value {value}");
				return self.write_str(&out).await;
			},
		}
	}

	if let Some(filter) = filter {
		let new_filter_layer = match EnvFilter::try_new(filter) {
			| Ok(s) => s,
			| Err(e) => return Err!("Invalid log level filter specified: {e}"),
		};

		match self
			.services
			.server
			.log
			.reload
			.reload(&new_filter_layer, Some(handles))
		{
			| Ok(()) => {
				return self.write_str("Successfully changed log level").await;
			},
			| Err(e) => {
				return Err!("Failed to modify and reload the global tracing log level: {e}");
			},
		}
	}

	Err!("No log level was specified.")
}

#[admin_command]
pub(super) async fn verify_json(&self) -> Result {
	if self.body.len() < 2
		|| !self.body[0].trim().starts_with("```")
		|| self.body.last().unwrap_or(&"").trim() != "```"
	{
		return Err!("Expected code block in command body. Add --help for details.");
	}

	let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n");
	let room_version_rules = RoomVersionId::V12.rules().unwrap();

	match serde_json::from_str::<CanonicalJsonObject>(&string) {
		| Err(e) => return Err!("Invalid json: {e}"),
		| Ok(value) => match self
			.services
			.server_keys
			.verify_json(&value, &room_version_rules)
			.await
		{
			| Err(e) => return Err!("Signature verification failed: {e}"),
			| Ok(()) => write!(self, "Signature correct"),
		},
	}
	.await
}

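/// Verify the signatures and content hash of a locally stored event using
/// room version 12 rules.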
#[admin_command]
pub(super) async fn verify_pdu(&self, event_id: OwnedEventId) -> Result {
	use ruma::signatures::Verified;

	let mut event = self.services.rooms.timeline.get_pdu_json(&event_id).await?;
	let room_version_rules = RoomVersionId::V12.rules().unwrap();

	event.remove("event_id");
	let msg = match self
		.services
		.server_keys
		.verify_event(&event, &room_version_rules)
		.await
	{
		| Err(e) => return Err(e),
		| Ok(Verified::Signatures) => "signatures OK, but content hash failed (redaction).",
		| Ok(Verified::All) => "signatures and hashes OK.",
	};

	self.write_str(msg).await
}

#[admin_command]
#[tracing::instrument(skip(self), level = "info")]
pub(super) async fn first_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
	self.bail_restricted()?;

	if !self
		.services
		.rooms
		.state_cache
		.server_in_room(&self.services.server.name, &room_id)
		.await
	{
		return Err!("We are not participating in the room / we don't know about the room ID.",);
	}

	let first_pdu = self
		.services
		.rooms
		.timeline
		.first_pdu_in_room(&room_id)
		.await
		.map_err(|_| err!(Database("Failed to find the first PDU in database")))?;

	let out = format!("{first_pdu:?}");
	self.write_str(&out).await
}

#[admin_command]
#[tracing::instrument(skip(self), level = "info")]
pub(super) async fn latest_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
	self.bail_restricted()?;

	if !self
		.services
		.rooms
		.state_cache
		.server_in_room(&self.services.server.name, &room_id)
		.await
	{
		return Err!("We are not participating in the room / we don't know about the room ID.");
	}

	let latest_pdu = self
		.services
		.rooms
		.timeline
		.latest_pdu_in_room(&room_id)
		.await
		.map_err(|_| err!(Database("Failed to find the latest PDU in database")))?;

	let out = format!("{latest_pdu:?}");
	self.write_str(&out).await
}

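/// Fetch the room state from a remote server at the given event (or the
/// latest PDU we have for the room), resolve it, and forcibly install it as
/// the room's current state.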
#[admin_command]
#[tracing::instrument(skip(self), level = "info")]
pub(super) async fn force_set_room_state_from_server(
	&self,
	room_id: OwnedRoomId,
	server_name: OwnedServerName,
	at_event: Option<OwnedEventId>,
) -> Result {
	self.bail_restricted()?;

	if !self
		.services
		.rooms
		.state_cache
		.server_in_room(&self.services.server.name, &room_id)
		.await
	{
		return Err!("We are not participating in the room / we don't know about the room ID.");
	}

	let at_event_id = match at_event {
		| Some(event_id) => event_id,
		| None => self
			.services
			.rooms
			.timeline
			.latest_pdu_in_room(&room_id)
			.await
			.map_err(|_| err!(Database("Failed to find the latest PDU in database")))?
			.event_id()
			.to_owned(),
	};

	let room_version = self.services.rooms.state.get_room_version(&room_id).await?;
	let room_version_rules = room_version.rules().unwrap();

	let mut state: HashMap<u64, OwnedEventId> = HashMap::new();

	let remote_state_response = self
		.services
		.sending
		.send_federation_request(
			&server_name,
			get_room_state::v1::Request::new(at_event_id, room_id.clone()),
		)
		.await?;

	for pdu in remote_state_response.pdus.clone() {
		match self
			.services
			.rooms
			.event_handler
			.parse_incoming_pdu(&pdu)
			.await
		{
			| Ok(t) => t,
			| Err(e) => {
				warn!("Could not parse PDU, ignoring: {e}");
				continue;
			},
		};
	}

	info!("Going through room_state response PDUs");
	for result in remote_state_response.pdus.iter().map(|pdu| {
		self.services
			.server_keys
			.validate_and_add_event_id(pdu, &room_version_rules)
	}) {
		let Ok((event_id, value)) = result.await else {
			continue;
		};

		let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| {
			debug_error!("Invalid PDU in fetching remote room state PDUs response: {value:#?}");
			err!(BadServerResponse(debug_error!("Invalid PDU in send_join response: {e:?}")))
		})?;

		self.services
			.rooms
			.outlier
			.add_pdu_outlier(&event_id, &value);

		if let Some(state_key) = &pdu.state_key {
			let shortstatekey = self
				.services
				.rooms
				.short
				.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
				.await;

			state.insert(shortstatekey, pdu.event_id.clone());
			self.services.rooms.pdu_metadata.clear_pdu_markers(pdu.event_id());
		}
	}

	info!("Going through auth_chain response");
	for result in remote_state_response.auth_chain.iter().map(|pdu| {
		self.services
			.server_keys
			.validate_and_add_event_id(pdu, &room_version_rules)
	}) {
		let Ok((event_id, value)) = result.await else {
			continue;
		};

		self.services
			.rooms
			.outlier
			.add_pdu_outlier(&event_id, &value);
		self.services.rooms.pdu_metadata.clear_pdu_markers(&event_id);
	}

	info!("Resolving new room state");
	let new_room_state = self
		.services
		.rooms
		.event_handler
		.resolve_state(&room_id, &room_version_rules, state)
		.await?;

	info!("Compressing new room state");
	let HashSetCompressStateEvent {
		shortstatehash: short_state_hash,
		added,
		removed,
	} = self
		.services
		.rooms
		.state_compressor
		.save_state(room_id.clone().as_ref(), new_room_state)
		.await?;

	let state_lock = self.services.rooms.state.mutex.lock(&*room_id).await;

	info!("Forcing new room state");
	self.services
		.rooms
		.state
		.force_state(room_id.clone().as_ref(), short_state_hash, added, removed, &state_lock)
		.await?;

	info!("Updating joined counts for room");
	self.services
		.rooms
		.state_cache
		.update_joined_count(&room_id)
		.await;

	self.write_str("Successfully forced the room state from the requested remote server.")
		.await
}

#[admin_command]
pub(super) async fn get_signing_keys(
	&self,
	server_name: Option<OwnedServerName>,
	notary: Option<OwnedServerName>,
	query: bool,
) -> Result {
	let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone());

	if let Some(notary) = notary {
		let signing_keys = self
			.services
			.server_keys
			.notary_request(&notary, &server_name)
			.await?;

		let out = format!("```rs\n{signing_keys:#?}\n```");
		return self.write_str(&out).await;
	}

	let signing_keys = if query {
		self.services
			.server_keys
			.server_request(&server_name)
			.await?
	} else {
		self.services
			.server_keys
			.signing_keys_for(&server_name)
			.await?
	};

	let out = format!("```rs\n{signing_keys:#?}\n```");
	self.write_str(&out).await
}

#[admin_command]
pub(super) async fn get_verify_keys(&self, server_name: Option<OwnedServerName>) -> Result {
	let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone());

	let keys = self
		.services
		.server_keys
		.verify_keys_for(&server_name)
		.await;

	let mut out = String::new();
	writeln!(out, "| Key ID | Public Key |")?;
	writeln!(out, "| --- | --- |")?;
	for (key_id, key) in keys {
		writeln!(out, "| {key_id} | {key:?} |")?;
	}

	self.write_str(&out).await
}

#[admin_command]
pub(super) async fn resolve_true_destination(
	&self,
	server_name: OwnedServerName,
	no_cache: bool,
) -> Result {
	if !self.services.server.config.allow_federation {
		return Err!("Federation is disabled on this homeserver.",);
	}

	if server_name == self.services.server.name {
		return Err!(
			"Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
			 fetching local PDUs.",
		);
	}

	let actual = self
		.services
		.resolver
		.resolve_actual_dest(&server_name, !no_cache)
		.await?;

	let msg = format!("Destination: {}\nHostname URI: {}", actual.dest, actual.host);
	self.write_str(&msg).await
}

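/// Print allocator memory statistics, optionally narrowed by the characters
/// given in `opts` (`*` selects all sections).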
#[admin_command]
pub(super) async fn memory_stats(&self, opts: Option<String>) -> Result {
	const OPTS: &str = "abcdefghijklmnopqrstuvwxyz";

	let opts: String = OPTS
		.chars()
		.filter(|&c| {
			let allow_any = opts.as_ref().is_some_and(|opts| opts == "*");

			let allow = allow_any || opts.as_ref().is_some_and(|opts| opts.contains(c));

			!allow
		})
		.collect();

	let stats = conduwuit::alloc::memory_stats(&opts).unwrap_or_default();

	self.write_str("```\n").await?;
	self.write_str(&stats).await?;
	self.write_str("\n```").await?;
	Ok(())
}

#[cfg(tokio_unstable)]
#[admin_command]
pub(super) async fn runtime_metrics(&self) -> Result {
	let out = self.services.server.metrics.runtime_metrics().map_or_else(
		|| "Runtime metrics are not available.".to_owned(),
		|metrics| {
			format!(
				"```rs\nnum_workers: {}\nnum_alive_tasks: {}\nglobal_queue_depth: {}\n```",
				metrics.num_workers(),
				metrics.num_alive_tasks(),
				metrics.global_queue_depth()
			)
		},
	);

	self.write_str(&out).await
}

#[cfg(not(tokio_unstable))]
#[admin_command]
pub(super) async fn runtime_metrics(&self) -> Result {
	self.write_str("Runtime metrics require building with `tokio_unstable`.")
		.await
}

#[cfg(tokio_unstable)]
#[admin_command]
pub(super) async fn runtime_interval(&self) -> Result {
	let out = self.services.server.metrics.runtime_interval().map_or_else(
		|| "Runtime metrics are not available.".to_owned(),
		|metrics| format!("```rs\n{metrics:#?}\n```"),
	);

	self.write_str(&out).await
}

#[cfg(not(tokio_unstable))]
#[admin_command]
pub(super) async fn runtime_interval(&self) -> Result {
	self.write_str("Runtime metrics require building with `tokio_unstable`.")
		.await
}

#[admin_command]
pub(super) async fn time(&self) -> Result {
	let now = SystemTime::now();
	let now = utils::time::format(now, "%+");

	self.write_str(&now).await
}

#[admin_command]
pub(super) async fn database_stats(
	&self,
	property: Option<String>,
	map: Option<String>,
) -> Result {
	let map_name = map.as_ref().map_or(EMPTY, String::as_str);
	let property = property.unwrap_or_else(|| "rocksdb.stats".to_owned());
	self.services
		.db
		.iter()
		.filter(|&(&name, _)| map_name.is_empty() || map_name == name)
		.try_stream()
		.try_for_each(|(&name, map)| {
			let res = map.property(&property).expect("invalid property");
			writeln!(self, "##### {name}:\n```\n{}\n```", res.trim())
		})
		.await
}

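/// List the database's SST files as a table, optionally filtered by column
/// family and level.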
#[admin_command]
pub(super) async fn database_files(&self, map: Option<String>, level: Option<i32>) -> Result {
	let mut files: Vec<_> = self.services.db.db.file_list().collect::<Result<_>>()?;

	files.sort_by_key(|f| f.name.clone());

	writeln!(self, "| lev | sst | keys | dels | size | column |").await?;
	writeln!(self, "| ---: | :--- | ---: | ---: | ---: | :--- |").await?;
	files
		.into_iter()
		.filter(|file| {
			map.as_deref()
				.is_none_or(|map| map == file.column_family_name)
		})
		.filter(|file| level.as_ref().is_none_or(|&level| level == file.level))
		.try_stream()
		.try_for_each(|file| {
			writeln!(
				self,
				"| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
				file.level,
				file.name,
				file.num_entries,
				file.num_deletions,
				file.size,
				file.column_family_name,
			)
		})
		.await
}

#[admin_command]
pub(super) async fn trim_memory(&self) -> Result {
	conduwuit::alloc::trim(None)?;

	writeln!(self, "done").await
}

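/// Send a test email to the address associated with the command sender's
/// account.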
#[admin_command]
pub(super) async fn send_test_email(&self) -> Result {
	self.bail_restricted()?;

	let mailer = self.services.mailer.expect_mailer()?;
	let Some(sender) = self.sender else {
		return Err!("No sender user provided in context");
	};

	let Some(email) = self
		.services
		.threepid
		.get_email_for_localpart(sender.localpart())
		.await
	else {
		return Err!("{} has no associated email address", sender);
	};

	mailer
		.send(Mailbox::new(None, email.clone()), service::mailer::messages::Test)
		.await?;

	self.write_str(&format!("Test email successfully sent to {email}"))
		.await?;

	Ok(())
}