Compare commits

..

2 Commits

Author SHA1 Message Date
Jade Ellis
4a7ad1350b WIP 2025-06-14 20:14:09 +01:00
Jade Ellis
d0f00e6f5c feat: Allow mentioning @room in an admin announcement 2025-06-14 19:09:54 +01:00
13 changed files with 335 additions and 56 deletions

1
Cargo.lock generated
View File

@@ -987,6 +987,7 @@ dependencies = [
"base64 0.22.1",
"blurhash",
"bytes",
"conduwuit_build_metadata",
"conduwuit_core",
"conduwuit_database",
"const-str",

View File

@@ -119,6 +119,15 @@
#
#allow_announcements_check = true
# If enabled, continuwuity will send anonymous analytics data periodically
# to help improve development. This includes basic server metadata like
# version, commit hash, and federation status. All requests are signed
# with the server's federation signing key. Data is sent on startup (with
# up to 5 minutes jitter) and every 12 hours thereafter (with up to 30
# minutes jitter) to distribute load.
#
#allow_analytics = true
# Set this to any float value to multiply continuwuity's in-memory LRU
# caches with such as "auth_chain_cache_capacity".
#

View File

@@ -3,7 +3,7 @@
"$id": "https://continwuity.org/schema/announcements.schema.json",
"type": "object",
"properties": {
"updates": {
"announcements": {
"type": "array",
"items": {
"type": "object",
@@ -16,6 +16,10 @@
},
"date": {
"type": "string"
},
"mention_room": {
"type": "boolean",
"description": "Whether to mention the room (@room) when posting this announcement"
}
},
"required": [
@@ -26,6 +30,6 @@
}
},
"required": [
"updates"
"announcements"
]
}

View File

@@ -145,6 +145,16 @@ pub(super) async fn restart(&self, force: bool) -> Result {
self.write_str("Restarting server...").await
}
#[admin_command]
pub(super) async fn upload_analytics(&self) -> Result {
match self.services.analytics.force_upload().await {
| Ok(()) => self.write_str("Analytics uploaded successfully.").await,
| Err(e) =>
self.write_str(&format!("Failed to upload analytics: {e}"))
.await,
}
}
#[admin_command]
pub(super) async fn shutdown(&self) -> Result {
warn!("shutdown command");

View File

@@ -64,4 +64,7 @@ pub(super) enum ServerCommand {
/// - Shutdown the server
Shutdown,
/// - Upload analytics
UploadAnalytics,
}

View File

@@ -169,6 +169,18 @@ pub struct Config {
#[serde(alias = "allow_check_for_updates", default = "true_fn")]
pub allow_announcements_check: bool,
/// If enabled, continuwuity will send anonymous analytics data periodically
/// to help improve development. This includes basic server metadata like
/// version, build information and federation status. All requests are
/// signed with the server's federation signing key.
///
/// This is also used to warn about potential problems with federation, if
/// federation is enabled.
///
/// default: true
#[serde(default = "true_fn")]
pub allow_analytics: bool,
/// Set this to any float value to multiply continuwuity's in-memory LRU
/// caches with such as "auth_chain_cache_capacity".
///

View File

@@ -78,6 +78,7 @@ zstd_compression = [
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
conduwuit-build-metadata.workspace = true
conduwuit-core.workspace = true
conduwuit-database.workspace = true
const-str.workspace = true

View File

@@ -0,0 +1,245 @@
//! # Analytics service
//!
//! This service is responsible for collecting and uploading anonymous server
//! metadata to help improve continuwuity development.
//!
//! All requests are signed with the server's federation signing key for
//! authentication. This service respects the `allow_analytics` configuration
//! option and is enabled by default.
//!
//! Analytics are sent on startup (with up to 5 minutes jitter) and every 12
//! hours thereafter (with up to 30 minutes jitter) to distribute load.
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use conduwuit::{
Result, Server, debug, err, info,
version::{self, user_agent},
warn,
};
use database::{Deserialized, Map};
use rand::Rng;
use ruma::ServerName;
use serde::{Deserialize, Serialize};
use tokio::{
sync::Notify,
time::{MissedTickBehavior, interval},
};
use crate::{Dep, client, config, federation, globals, server_keys, users};
extern crate conduwuit_build_metadata as build_metadata;
pub struct Service {
interval: Duration,
jitter: Duration,
startup_jitter: Duration,
interrupt: Notify,
db: Arc<Map>,
services: Services,
}
struct Services {
client: Dep<client::Service>,
globals: Dep<globals::Service>,
server_keys: Dep<server_keys::Service>,
federation: Dep<federation::Service>,
users: Dep<users::Service>,
server: Arc<Server>,
config: Dep<config::Service>,
}
#[derive(Debug, Serialize)]
struct AnalyticsPayload {
server_name: String,
version: &'static str,
commit_hash: Option<&'static str>,
user_count: usize,
federation_enabled: bool,
room_creation_allowed: bool,
public_room_directory_over_federation: bool,
build_profile: &'static str,
opt_level: &'static str,
rustc_version: &'static str,
features: Vec<&'static str>,
host: &'static str,
target: &'static str,
// the following can all be derived from the target
target_arch: &'static str,
target_os: &'static str,
target_env: &'static str,
target_family: &'static str,
}
#[derive(Debug, Deserialize)]
struct AnalyticsResponse {
success: bool,
message: Option<String>,
}
const ANALYTICS_URL: &str = "https://analytics.continuwuity.org/api/v1/metrics";
const ANALYTICS_SERVERNAME: &str = "analytics.continuwuity.org";
const ANALYTICS_INTERVAL: u64 = 43200; // 12 hours in seconds
const ANALYTICS_JITTER: u64 = 1800; // 30 minutes in seconds
const ANALYTICS_STARTUP_JITTER: u64 = 300; // 5 minutes in seconds
const LAST_ANALYTICS_TIMESTAMP: &[u8; 21] = b"last_analytics_upload";
#[async_trait]
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let mut rng = rand::thread_rng();
let jitter_seconds = rng.gen_range(0..=ANALYTICS_JITTER);
let startup_jitter_seconds = rng.gen_range(0..=ANALYTICS_STARTUP_JITTER);
Ok(Arc::new(Self {
interval: Duration::from_secs(ANALYTICS_INTERVAL),
jitter: Duration::from_secs(jitter_seconds),
startup_jitter: Duration::from_secs(startup_jitter_seconds),
interrupt: Notify::new(),
db: args.db["global"].clone(),
services: Services {
globals: args.depend::<globals::Service>("globals"),
client: args.depend::<client::Service>("client"),
config: args.depend::<config::Service>("config"),
server_keys: args.depend::<server_keys::Service>("server_keys"),
users: args.depend::<users::Service>("users"),
federation: args.depend::<federation::Service>("federation"),
server: args.server.clone(),
},
}))
}
#[tracing::instrument(skip_all, name = "analytics", level = "debug")]
async fn worker(self: Arc<Self>) -> Result<()> {
if !self.services.server.config.allow_analytics {
debug!("Analytics collection is disabled");
return Ok(());
}
// Send initial analytics on startup (with shorter jitter)
tokio::time::sleep(self.startup_jitter).await;
if let Err(e) = self.upload_analytics().await {
warn!(%e, "Failed to upload initial analytics");
}
let mut i = interval(self.interval);
i.set_missed_tick_behavior(MissedTickBehavior::Delay);
i.reset_after(self.interval + self.jitter);
loop {
tokio::select! {
() = self.interrupt.notified() => break,
_ = i.tick() => {
if let Err(e) = self.upload_analytics().await {
warn!(%e, "Failed to upload analytics");
}
}
}
}
Ok(())
}
fn interrupt(&self) { self.interrupt.notify_waiters(); }
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
#[tracing::instrument(skip_all)]
async fn upload_analytics(&self) -> Result<()> {
let payload = self.collect_metadata().await;
let json_payload = serde_json::to_vec(&payload)?;
// Create HTTP request
let request = http::Request::builder()
.method("POST")
.uri(ANALYTICS_URL)
.header("Content-Type", "application/json")
.header("User-Agent", user_agent())
.body(json_payload)?;
// Sign the request using federation signing
let reqwest_request = self.services.federation.sign_non_federation_request(
ServerName::parse(ANALYTICS_SERVERNAME).unwrap(),
request,
)?;
// self.sign_analytics_request(&mut request).await?;
let response = self
.services
.client
.default
.execute(reqwest_request)
.await?;
let status = response.status();
if let Ok(analytics_response) =
serde_json::from_str::<AnalyticsResponse>(&response.text().await?)
{
if analytics_response.success {
debug!("Analytics uploaded successfully");
self.update_last_upload_timestamp().await;
}
let msg = analytics_response.message.unwrap_or_default();
warn!("Analytics upload warning: {}", msg);
} else if status.is_success() {
info!("Analytics uploaded successfully (no structured response)");
self.update_last_upload_timestamp().await;
} else {
warn!("Analytics upload failed (no structured response) with status: {}", status);
}
Ok(())
}
async fn collect_metadata(&self) -> AnalyticsPayload {
let config = &self.services.config;
AnalyticsPayload {
server_name: self.services.globals.server_name().to_string(),
version: version::version(),
commit_hash: build_metadata::GIT_COMMIT_HASH,
user_count: self.services.users.count().await,
federation_enabled: config.allow_federation,
room_creation_allowed: config.allow_room_creation,
public_room_directory_over_federation: config
.allow_public_room_directory_over_federation,
build_profile: build_metadata::built::PROFILE,
opt_level: build_metadata::built::OPT_LEVEL,
rustc_version: build_metadata::built::RUSTC_VERSION,
features: build_metadata::built::FEATURES.to_vec(),
host: build_metadata::built::HOST,
target: build_metadata::built::TARGET,
target_arch: build_metadata::built::CFG_TARGET_ARCH,
target_os: build_metadata::built::CFG_OS,
target_env: build_metadata::built::CFG_ENV,
target_family: build_metadata::built::CFG_FAMILY,
}
}
async fn update_last_upload_timestamp(&self) {
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.db.raw_put(LAST_ANALYTICS_TIMESTAMP, timestamp);
}
pub async fn last_upload_timestamp(&self) -> u64 {
self.db
.get(LAST_ANALYTICS_TIMESTAMP)
.await
.deserialized()
.unwrap_or(0_u64)
}
pub async fn force_upload(&self) -> Result<()> {
if !self.services.config.allow_analytics {
return Err(err!(Config("allow_analytics", "Analytics collection is disabled")));
}
self.upload_analytics().await
}
}

View File

@@ -20,7 +20,7 @@
use async_trait::async_trait;
use conduwuit::{Result, Server, debug, info, warn};
use database::{Deserialized, Map};
use ruma::events::room::message::RoomMessageEventContent;
use ruma::events::{Mentions, room::message::RoomMessageEventContent};
use serde::Deserialize;
use tokio::{
sync::Notify,
@@ -53,6 +53,8 @@ struct CheckForAnnouncementsResponseEntry {
id: u64,
date: Option<String>,
message: String,
#[serde(default, skip_serializing_if = "bool::not")]
mention_room: bool,
}
const CHECK_FOR_ANNOUNCEMENTS_URL: &str =
@@ -139,19 +141,20 @@ async fn handle(&self, announcement: &CheckForAnnouncementsResponseEntry) {
} else {
info!("[announcements] {:#}", announcement.message);
}
let mut message = RoomMessageEventContent::text_markdown(format!(
"### New announcement{}\n\n{}",
announcement
.date
.as_ref()
.map_or_else(String::new, |date| format!(" - `{date}`")),
announcement.message
));
self.services
.admin
.send_message(RoomMessageEventContent::text_markdown(format!(
"### New announcement{}\n\n{}",
announcement
.date
.as_ref()
.map_or_else(String::new, |date| format!(" - `{date}`")),
announcement.message
)))
.await
.ok();
if announcement.mention_room {
message = message.add_mentions(Mentions::with_room_mention());
}
self.services.admin.send_message(message).await.ok();
}
#[inline]

View File

@@ -107,6 +107,20 @@ fn prepare(&self, dest: &ServerName, mut request: http::Request<Vec<u8>>) -> Res
Ok(request)
}
#[implement(super::Service)]
pub fn sign_non_federation_request(
&self,
dest: &ServerName,
mut request: http::Request<Vec<u8>>,
) -> Result<Request> {
self.sign_request(&mut request, dest);
let request = Request::try_from(request)?;
self.services.server.check_running()?;
Ok(request)
}
#[implement(super::Service)]
fn validate_url(&self, url: &Url) -> Result<()> {
if let Some(url_host) = url.host_str() {

View File

@@ -9,6 +9,7 @@
pub mod account_data;
pub mod admin;
pub mod analytics;
pub mod announcements;
pub mod appservice;
pub mod client;

View File

@@ -1,6 +1,6 @@
mod data;
use std::{collections::{BTreeMap, HashMap}, sync::Arc};
use std::{collections::BTreeMap, sync::Arc};
use conduwuit::{
Result, debug, err,
@@ -9,9 +9,12 @@
};
use futures::{Stream, TryFutureExt, try_join};
use ruma::{
OwnedEventId, OwnedUserId, RoomId, UserId,
events::{
receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType, Receipts}, AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent
}, serde::Raw, OwnedEventId, OwnedUserId, RoomId, UserId
AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent,
receipt::{ReceiptEvent, ReceiptEventContent, Receipts},
},
serde::Raw,
};
use self::data::{Data, ReceiptItem};
@@ -44,48 +47,19 @@ fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Updates the public read receipt (`m.read`) based on the incoming event.
/// If the event referenced by the new public receipt is newer than the current
/// private read marker (`m.read.private`), the private marker is also updated
/// to match the public receipt's position.
/// Replaces the previous read receipt.
pub async fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: &ReceiptEvent,
) {
debug!(target: "readreceipt", %room_id, %user_id, "Updating read receipt in database.");
// 2. Find the maximum PDU count for the m.read event(s) referenced in the new receipt
let mut max_new_public_pdu_count: Option<PduCount> = None;
for (event_id, receipts) in event.content.0.iter() {
// Check if this event_id has an m.read receipt for the target user
if let Some(user_receipts) = receipts.get(&ReceiptType::Read) {
if user_receipts.contains_key(user_id) {
// Try to get the PDU count (timeline position) for this event_id
match self.services.timeline.get_pdu_count(event_id).await {
Ok(count) => {
// Update the maximum count found so far
let current_max = max_new_public_pdu_count.unwrap_or(PduCount::Normal(0));
max_new_public_pdu_count = Some(current_max.max(count));
debug!(target: "readreceipt", %room_id, %user_id, %event_id, count, "Found PDU count for new public receipt event.");
}
Err(e) => {
warn!(
target: "readreceipt", %room_id, %user_id, %event_id,
"Failed to get PDU count for event ID from new public read receipt: {}",
e
);
}
}
}
}
}
// Flush the sending queue for the room to notify clients
if let Err(e) = self.services.sending.flush_room(room_id).await {
warn!(target: "readreceipt", %room_id, %user_id, "Failed to flush room after read receipt update: {}", e);
}
self.db.readreceipt_update(user_id, room_id, event).await;
self.services
.sending
.flush_room(room_id)
.await
.expect("room flush failed");
}
/// Gets the latest private read receipt from the user in the room

View File

@@ -10,8 +10,8 @@
use tokio::sync::Mutex;
use crate::{
account_data, admin, announcements, appservice, client, config, emergency, federation,
globals, key_backups,
account_data, admin, analytics, announcements, appservice, client, config, emergency,
federation, globals, key_backups,
manager::Manager,
media, moderation, presence, pusher, resolver, rooms, sending, server_keys, service,
service::{Args, Map, Service},
@@ -21,6 +21,7 @@
pub struct Services {
pub account_data: Arc<account_data::Service>,
pub admin: Arc<admin::Service>,
pub analytics: Arc<analytics::Service>,
pub appservice: Arc<appservice::Service>,
pub config: Arc<config::Service>,
pub client: Arc<client::Service>,
@@ -68,6 +69,7 @@ macro_rules! build {
Ok(Arc::new(Self {
account_data: build!(account_data::Service),
admin: build!(admin::Service),
analytics: build!(analytics::Service),
appservice: build!(appservice::Service),
resolver: build!(resolver::Service),
client: build!(client::Service),