mirror of
https://forgejo.ellis.link/continuwuation/continuwuity/
synced 2026-04-02 11:35:58 +00:00
Compare commits
17 Commits
renovate/n
...
aranje/ill
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4f651fc68d | ||
|
|
ccfc16e269 | ||
|
|
c5aa01994c | ||
|
|
4e2965116b | ||
|
|
4c01e74df8 | ||
|
|
47152f854c | ||
|
|
6ca96742fa | ||
|
|
49c26bee73 | ||
|
|
e5cae9108a | ||
|
|
0360334013 | ||
|
|
a928662cd6 | ||
|
|
1915c484d5 | ||
|
|
31f703d4bf | ||
|
|
4a3d6fc0ec | ||
|
|
2fb26f9276 | ||
|
|
84040f115e | ||
|
|
8977c5b796 |
2
LICENSE
2
LICENSE
@@ -187,7 +187,7 @@
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright 2023 Continuwuity Team and contributors
|
||||
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
@@ -1787,11 +1787,9 @@
|
||||
#stream_amplification = 1024
|
||||
|
||||
# Number of sender task workers; determines sender parallelism. Default is
|
||||
# '0' which means the value is determined internally, likely matching the
|
||||
# number of tokio worker-threads or number of cores, etc. Override by
|
||||
# setting a non-zero value.
|
||||
# core count. Override by setting a different value.
|
||||
#
|
||||
#sender_workers = 0
|
||||
#sender_workers = core count
|
||||
|
||||
# Enables listener sockets; can be set to false to disable listening. This
|
||||
# option is intended for developer/diagnostic purposes only.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use conduwuit::{Err, Result};
|
||||
use futures::StreamExt;
|
||||
use ruma::OwnedRoomId;
|
||||
use ruma::{OwnedRoomId, OwnedRoomOrAliasId};
|
||||
|
||||
use crate::{PAGE_SIZE, admin_command, get_room_info};
|
||||
|
||||
@@ -82,3 +82,185 @@ pub(super) async fn exists(&self, room_id: OwnedRoomId) -> Result {
|
||||
|
||||
self.write_str(&format!("{result}")).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn purge_sync_tokens(&self, room: OwnedRoomOrAliasId) -> Result {
|
||||
// Resolve the room ID from the room or alias ID
|
||||
let room_id = self.services.rooms.alias.resolve(&room).await?;
|
||||
|
||||
// Delete all tokens for this room using the service method
|
||||
let Ok(deleted_count) = self.services.rooms.user.delete_room_tokens(&room_id).await else {
|
||||
return Err!("Failed to delete sync tokens for room {}", room_id.as_str());
|
||||
};
|
||||
|
||||
self.write_str(&format!(
|
||||
"Successfully deleted {deleted_count} sync tokens for room {}",
|
||||
room_id.as_str()
|
||||
))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Target options for room purging
|
||||
#[derive(Default, Debug, clap::ValueEnum, Clone)]
|
||||
pub enum RoomTargetOption {
|
||||
#[default]
|
||||
/// Target all rooms
|
||||
All,
|
||||
/// Target only disabled rooms
|
||||
DisabledOnly,
|
||||
/// Target only banned rooms
|
||||
BannedOnly,
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn purge_all_sync_tokens(
|
||||
&self,
|
||||
target_option: Option<RoomTargetOption>,
|
||||
execute: bool,
|
||||
) -> Result {
|
||||
use conduwuit::{debug, info};
|
||||
|
||||
let mode = if !execute { "Simulating" } else { "Starting" };
|
||||
|
||||
// strictly, we should check if these reach the max value after the loop and
|
||||
// warn the user that the count is too large
|
||||
let mut total_rooms_checked: usize = 0;
|
||||
let mut total_tokens_deleted: usize = 0;
|
||||
let mut error_count: u32 = 0;
|
||||
let mut skipped_rooms: usize = 0;
|
||||
|
||||
info!("{} purge of sync tokens", mode);
|
||||
|
||||
// Get all rooms in the server
|
||||
let all_rooms = self
|
||||
.services
|
||||
.rooms
|
||||
.metadata
|
||||
.iter_ids()
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
info!("Found {} rooms total on the server", all_rooms.len());
|
||||
|
||||
// Filter rooms based on options
|
||||
let mut rooms = Vec::new();
|
||||
for room_id in all_rooms {
|
||||
if let Some(target) = &target_option {
|
||||
match target {
|
||||
| RoomTargetOption::DisabledOnly => {
|
||||
if !self.services.rooms.metadata.is_disabled(room_id).await {
|
||||
debug!("Skipping room {} as it's not disabled", room_id.as_str());
|
||||
skipped_rooms = skipped_rooms.saturating_add(1);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
| RoomTargetOption::BannedOnly => {
|
||||
if !self.services.rooms.metadata.is_banned(room_id).await {
|
||||
debug!("Skipping room {} as it's not banned", room_id.as_str());
|
||||
skipped_rooms = skipped_rooms.saturating_add(1);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
| RoomTargetOption::All => {},
|
||||
}
|
||||
}
|
||||
|
||||
rooms.push(room_id);
|
||||
}
|
||||
|
||||
// Total number of rooms we'll be checking
|
||||
let total_rooms = rooms.len();
|
||||
info!(
|
||||
"Processing {} rooms after filtering (skipped {} rooms)",
|
||||
total_rooms, skipped_rooms
|
||||
);
|
||||
|
||||
// Process each room
|
||||
for room_id in rooms {
|
||||
total_rooms_checked = total_rooms_checked.saturating_add(1);
|
||||
|
||||
// Log progress periodically
|
||||
if total_rooms_checked.is_multiple_of(100) || total_rooms_checked == total_rooms {
|
||||
info!(
|
||||
"Progress: {}/{} rooms checked, {} tokens {}",
|
||||
total_rooms_checked,
|
||||
total_rooms,
|
||||
total_tokens_deleted,
|
||||
if !execute { "would be deleted" } else { "deleted" }
|
||||
);
|
||||
}
|
||||
|
||||
// In dry run mode, just count what would be deleted, don't actually delete
|
||||
debug!(
|
||||
"Room {}: {}",
|
||||
room_id.as_str(),
|
||||
if !execute {
|
||||
"would purge sync tokens"
|
||||
} else {
|
||||
"purging sync tokens"
|
||||
}
|
||||
);
|
||||
|
||||
if !execute {
|
||||
// For dry run mode, count tokens without deleting
|
||||
match self.services.rooms.user.count_room_tokens(room_id).await {
|
||||
| Ok(count) =>
|
||||
if count > 0 {
|
||||
debug!(
|
||||
"Would delete {} sync tokens for room {}",
|
||||
count,
|
||||
room_id.as_str()
|
||||
);
|
||||
total_tokens_deleted = total_tokens_deleted.saturating_add(count);
|
||||
} else {
|
||||
debug!("No sync tokens found for room {}", room_id.as_str());
|
||||
},
|
||||
| Err(e) => {
|
||||
debug!("Error counting sync tokens for room {}: {:?}", room_id.as_str(), e);
|
||||
error_count = error_count.saturating_add(1);
|
||||
},
|
||||
}
|
||||
} else {
|
||||
// Real deletion mode
|
||||
match self.services.rooms.user.delete_room_tokens(room_id).await {
|
||||
| Ok(count) =>
|
||||
if count > 0 {
|
||||
debug!("Deleted {} sync tokens for room {}", count, room_id.as_str());
|
||||
total_tokens_deleted = total_tokens_deleted.saturating_add(count);
|
||||
} else {
|
||||
debug!("No sync tokens found for room {}", room_id.as_str());
|
||||
},
|
||||
| Err(e) => {
|
||||
debug!("Error purging sync tokens for room {}: {:?}", room_id.as_str(), e);
|
||||
error_count = error_count.saturating_add(1);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let action = if !execute { "would be deleted" } else { "deleted" };
|
||||
info!(
|
||||
"Finished {}: checked {} rooms out of {} total, {} tokens {}, errors: {}",
|
||||
if !execute {
|
||||
"purge simulation"
|
||||
} else {
|
||||
"purging sync tokens"
|
||||
},
|
||||
total_rooms_checked,
|
||||
total_rooms,
|
||||
total_tokens_deleted,
|
||||
action,
|
||||
error_count
|
||||
);
|
||||
|
||||
self.write_str(&format!(
|
||||
"Finished {}: checked {} rooms out of {} total, {} tokens {}, errors: {}",
|
||||
if !execute { "simulation" } else { "purging sync tokens" },
|
||||
total_rooms_checked,
|
||||
total_rooms,
|
||||
total_tokens_deleted,
|
||||
action,
|
||||
error_count
|
||||
))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -5,8 +5,9 @@
|
||||
mod moderation;
|
||||
|
||||
use clap::Subcommand;
|
||||
use commands::RoomTargetOption;
|
||||
use conduwuit::Result;
|
||||
use ruma::OwnedRoomId;
|
||||
use ruma::{OwnedRoomId, OwnedRoomOrAliasId};
|
||||
|
||||
use self::{
|
||||
alias::RoomAliasCommand, directory::RoomDirectoryCommand, info::RoomInfoCommand,
|
||||
@@ -60,4 +61,25 @@ pub enum RoomCommand {
|
||||
Exists {
|
||||
room_id: OwnedRoomId,
|
||||
},
|
||||
|
||||
/// - Delete all sync tokens for a room
|
||||
PurgeSyncTokens {
|
||||
/// Room ID or alias to purge sync tokens for
|
||||
#[arg(value_parser)]
|
||||
room: OwnedRoomOrAliasId,
|
||||
},
|
||||
|
||||
/// - Delete sync tokens for all rooms that have no local users
|
||||
///
|
||||
/// By default, processes all empty rooms.
|
||||
PurgeAllSyncTokens {
|
||||
/// Target specific room types
|
||||
#[arg(long, value_enum)]
|
||||
target_option: Option<RoomTargetOption>,
|
||||
|
||||
/// Execute token deletions. Otherwise,
|
||||
/// Performs a dry run without actually deleting any tokens
|
||||
#[arg(long)]
|
||||
execute: bool,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -65,6 +65,7 @@ pub(super) async fn load_joined_room(
|
||||
and `join*` functions are used to perform steps in parallel which do not depend on each other.
|
||||
*/
|
||||
|
||||
let insert_lock = services.rooms.timeline.mutex_insert.lock(room_id).await;
|
||||
let (
|
||||
account_data,
|
||||
ephemeral,
|
||||
@@ -82,6 +83,7 @@ pub(super) async fn load_joined_room(
|
||||
)
|
||||
.boxed()
|
||||
.await?;
|
||||
drop(insert_lock);
|
||||
|
||||
if !timeline.is_empty() || !state_events.is_empty() {
|
||||
trace!(
|
||||
|
||||
@@ -2059,12 +2059,10 @@ pub struct Config {
|
||||
pub stream_amplification: usize,
|
||||
|
||||
/// Number of sender task workers; determines sender parallelism. Default is
|
||||
/// '0' which means the value is determined internally, likely matching the
|
||||
/// number of tokio worker-threads or number of cores, etc. Override by
|
||||
/// setting a non-zero value.
|
||||
/// core count. Override by setting a different value.
|
||||
///
|
||||
/// default: 0
|
||||
#[serde(default)]
|
||||
/// default: core count
|
||||
#[serde(default = "default_sender_workers")]
|
||||
pub sender_workers: usize,
|
||||
|
||||
/// Enables listener sockets; can be set to false to disable listening. This
|
||||
@@ -2592,45 +2590,47 @@ fn default_database_backups_to_keep() -> i16 { 1 }
|
||||
|
||||
fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) }
|
||||
|
||||
fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) }
|
||||
fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) }
|
||||
|
||||
fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) }
|
||||
fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) }
|
||||
|
||||
fn default_cache_capacity_modifier() -> f64 { 1.0 }
|
||||
|
||||
fn default_auth_chain_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(10_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_shorteventid_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(50_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_shorteventid_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(100_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_eventidshort_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(25_000).saturating_add(100_000)
|
||||
parallelism_scaled_u32(50_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_eventid_pdu_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(25_000).saturating_add(100_000)
|
||||
parallelism_scaled_u32(50_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_shortstatekey_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(10_000).saturating_add(100_000)
|
||||
parallelism_scaled_u32(50_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_statekeyshort_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(10_000).saturating_add(100_000)
|
||||
parallelism_scaled_u32(50_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_servernameevent_data_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(100_000).saturating_add(500_000)
|
||||
parallelism_scaled_u32(100_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) }
|
||||
fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(500).clamp(100, 12000) }
|
||||
|
||||
fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) }
|
||||
fn default_roomid_spacehierarchy_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(500).clamp(100, 12000)
|
||||
}
|
||||
|
||||
fn default_dns_cache_entries() -> u32 { 32768 }
|
||||
fn default_dns_cache_entries() -> u32 { 327_680 }
|
||||
|
||||
fn default_dns_min_ttl() -> u64 { 60 * 180 }
|
||||
|
||||
@@ -2838,15 +2838,26 @@ fn default_admin_log_capture() -> String {
|
||||
|
||||
fn default_admin_room_tag() -> String { "m.server_notice".to_owned() }
|
||||
|
||||
#[must_use]
|
||||
#[allow(clippy::as_conversions, clippy::cast_precision_loss)]
|
||||
fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
|
||||
pub fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
|
||||
|
||||
fn parallelism_scaled_u32(val: u32) -> u32 {
|
||||
let val = val.try_into().expect("failed to cast u32 to usize");
|
||||
parallelism_scaled(val).try_into().unwrap_or(u32::MAX)
|
||||
#[must_use]
|
||||
#[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
|
||||
pub fn parallelism_scaled_u32(val: u32) -> u32 {
|
||||
val.saturating_mul(sys::available_parallelism() as u32)
|
||||
}
|
||||
|
||||
fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
|
||||
#[must_use]
|
||||
#[allow(clippy::as_conversions, clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
|
||||
pub fn parallelism_scaled_i32(val: i32) -> i32 {
|
||||
val.saturating_mul(sys::available_parallelism() as i32)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn parallelism_scaled(val: usize) -> usize {
|
||||
val.saturating_mul(sys::available_parallelism())
|
||||
}
|
||||
|
||||
fn default_trusted_server_batch_size() -> usize { 256 }
|
||||
|
||||
@@ -2866,6 +2877,8 @@ fn default_stream_width_scale() -> f32 { 1.0 }
|
||||
|
||||
fn default_stream_amplification() -> usize { 1024 }
|
||||
|
||||
fn default_sender_workers() -> usize { parallelism_scaled(1) }
|
||||
|
||||
fn default_client_receive_timeout() -> u64 { 75 }
|
||||
|
||||
fn default_client_request_timeout() -> u64 { 180 }
|
||||
|
||||
@@ -29,7 +29,7 @@ fn descriptor_cf_options(
|
||||
set_table_options(&mut opts, &desc, cache)?;
|
||||
|
||||
opts.set_min_write_buffer_number(1);
|
||||
opts.set_max_write_buffer_number(2);
|
||||
opts.set_max_write_buffer_number(3);
|
||||
opts.set_write_buffer_size(desc.write_size);
|
||||
|
||||
opts.set_target_file_size_base(desc.file_size);
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
extract::State,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use conduwuit::{Result, debug, debug_error, debug_warn, err, error, trace};
|
||||
use conduwuit::{Result, debug_warn, err, error, info, trace};
|
||||
use conduwuit_service::Services;
|
||||
use futures::FutureExt;
|
||||
use http::{Method, StatusCode, Uri};
|
||||
@@ -102,11 +102,11 @@ fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Respons
|
||||
let reason = status.canonical_reason().unwrap_or("Unknown Reason");
|
||||
|
||||
if status.is_server_error() {
|
||||
error!(%method, %uri, "{code} {reason}");
|
||||
info!(%method, %uri, "{code} {reason}");
|
||||
} else if status.is_client_error() {
|
||||
debug_error!(%method, %uri, "{code} {reason}");
|
||||
info!(%method, %uri, "{code} {reason}");
|
||||
} else if status.is_redirection() {
|
||||
debug!(%method, %uri, "{code} {reason}");
|
||||
trace!(%method, %uri, "{code} {reason}");
|
||||
} else {
|
||||
trace!(%method, %uri, "{code} {reason}");
|
||||
}
|
||||
|
||||
@@ -100,7 +100,7 @@ pub async fn get_presence(&self, user_id: &UserId) -> Result<PresenceEvent> {
|
||||
/// Pings the presence of the given user in the given room, setting the
|
||||
/// specified state.
|
||||
pub async fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> {
|
||||
const REFRESH_TIMEOUT: u64 = 60 * 1000;
|
||||
const REFRESH_TIMEOUT: u64 = 60 * 1000 * 4;
|
||||
|
||||
let last_presence = self.db.get_presence(user_id).await;
|
||||
let state_changed = match last_presence {
|
||||
|
||||
@@ -119,8 +119,12 @@ fn actual_dest_1(host_port: FedDest) -> Result<FedDest> {
|
||||
async fn actual_dest_2(&self, dest: &ServerName, cache: bool, pos: usize) -> Result<FedDest> {
|
||||
debug!("2: Hostname with included port");
|
||||
let (host, port) = dest.as_str().split_at(pos);
|
||||
self.conditional_query_and_cache(host, port.parse::<u16>().unwrap_or(8448), cache)
|
||||
.await?;
|
||||
self.conditional_query_and_cache(
|
||||
host,
|
||||
port.trim_start_matches(':').parse::<u16>().unwrap_or(8448),
|
||||
cache,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(FedDest::Named(
|
||||
host.to_owned(),
|
||||
@@ -165,8 +169,12 @@ async fn actual_dest_3_2(
|
||||
) -> Result<FedDest> {
|
||||
debug!("3.2: Hostname with port in .well-known file");
|
||||
let (host, port) = delegated.split_at(pos);
|
||||
self.conditional_query_and_cache(host, port.parse::<u16>().unwrap_or(8448), cache)
|
||||
.await?;
|
||||
self.conditional_query_and_cache(
|
||||
host,
|
||||
port.trim_start_matches(':').parse::<u16>().unwrap_or(8448),
|
||||
cache,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(FedDest::Named(
|
||||
host.to_owned(),
|
||||
|
||||
@@ -53,9 +53,9 @@ pub(super) fn build(server: &Arc<Server>, cache: Arc<Cache>) -> Result<Arc<Self>
|
||||
opts.cache_size = config.dns_cache_entries as usize;
|
||||
opts.preserve_intermediates = true;
|
||||
opts.negative_min_ttl = Some(Duration::from_secs(config.dns_min_ttl_nxdomain));
|
||||
opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 30));
|
||||
opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24));
|
||||
opts.positive_min_ttl = Some(Duration::from_secs(config.dns_min_ttl));
|
||||
opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 7));
|
||||
opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24));
|
||||
opts.timeout = Duration::from_secs(config.dns_timeout);
|
||||
opts.attempts = config.dns_attempts as usize;
|
||||
opts.try_tcp_on_error = config.dns_tcp_fallback;
|
||||
|
||||
@@ -80,7 +80,7 @@ pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
|
||||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 60 * 2;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 8;
|
||||
const MAX_DURATION: u64 = 60 * 60;
|
||||
if continue_exponential_backoff_secs(
|
||||
MIN_DURATION,
|
||||
MAX_DURATION,
|
||||
|
||||
@@ -46,7 +46,7 @@ pub(super) async fn handle_prev_pdu<'a, Pdu>(
|
||||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 5 * 60;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 24;
|
||||
const MAX_DURATION: u64 = 60 * 60;
|
||||
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
|
||||
debug!(
|
||||
?tries,
|
||||
|
||||
@@ -197,6 +197,15 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
||||
.await;
|
||||
extremities.push(incoming_pdu.event_id().to_owned());
|
||||
|
||||
if extremities.is_empty() {
|
||||
info!(
|
||||
"Retained zero extremities when upgrading outlier PDU to timeline PDU with {} \
|
||||
previous events, event id: {}",
|
||||
incoming_pdu.prev_events.len(),
|
||||
incoming_pdu.event_id
|
||||
);
|
||||
}
|
||||
|
||||
debug!(
|
||||
"Retained {} extremities checked against {} prev_events",
|
||||
extremities.len(),
|
||||
|
||||
@@ -127,3 +127,63 @@ pub async fn get_token_shortstatehash(
|
||||
.await
|
||||
.deserialized()
|
||||
}
|
||||
|
||||
/// Count how many sync tokens exist for a room without deleting them
|
||||
///
|
||||
/// This is useful for dry runs to see how many tokens would be deleted
|
||||
#[implement(Service)]
|
||||
pub async fn count_room_tokens(&self, room_id: &RoomId) -> Result<usize> {
|
||||
use futures::TryStreamExt;
|
||||
|
||||
let shortroomid = self.services.short.get_shortroomid(room_id).await?;
|
||||
|
||||
// Create a prefix to search by - all entries for this room will start with its
|
||||
// short ID
|
||||
let prefix = &[shortroomid];
|
||||
|
||||
// Collect all keys into a Vec and count them
|
||||
let keys = self
|
||||
.db
|
||||
.roomsynctoken_shortstatehash
|
||||
.keys_prefix_raw(prefix)
|
||||
.map_ok(|_| ()) // We only need to count, not store the keys
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
Ok(keys.len())
|
||||
}
|
||||
|
||||
/// Delete all sync tokens associated with a room
|
||||
///
|
||||
/// This helps clean up the database as these tokens are never otherwise removed
|
||||
#[implement(Service)]
|
||||
pub async fn delete_room_tokens(&self, room_id: &RoomId) -> Result<usize> {
|
||||
use futures::TryStreamExt;
|
||||
|
||||
let shortroomid = self.services.short.get_shortroomid(room_id).await?;
|
||||
|
||||
// Create a prefix to search by - all entries for this room will start with its
|
||||
// short ID
|
||||
let prefix = &[shortroomid];
|
||||
|
||||
// Collect all keys into a Vec first, then delete them
|
||||
let keys = self
|
||||
.db
|
||||
.roomsynctoken_shortstatehash
|
||||
.keys_prefix_raw(prefix)
|
||||
.map_ok(|key| {
|
||||
// Clone the key since we can't store references in the Vec
|
||||
Vec::from(key)
|
||||
})
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
// Delete each key individually
|
||||
for key in &keys {
|
||||
self.db.roomsynctoken_shortstatehash.del(key);
|
||||
}
|
||||
|
||||
let count = keys.len();
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user