Convert EventInternalMetadata to use Arc<RwLock<_>> (#19669)

This moves the reference counting from PyO3 into standard Rust types,
allowing the class to be used natively from Rust without needing a
Python runtime.
This commit is contained in:
Erik Johnston
2026-04-16 10:59:39 +01:00
committed by GitHub
parent 71781de707
commit 2d015f78ea
2 changed files with 374 additions and 162 deletions

1
changelog.d/19669.misc Normal file
View File

@@ -0,0 +1 @@
Convert `EventInternalMetadata` to use `Arc<RwLock<_>>`.

View File

@@ -32,12 +32,16 @@
//! attributes, but for small number of keys is actually faster than using a
//! hash or btree map.
use std::{num::NonZeroI64, ops::Deref};
use std::{
num::NonZeroI64,
ops::Deref,
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};
use anyhow::Context;
use log::warn;
use pyo3::{
exceptions::PyAttributeError,
exceptions::{PyAttributeError, PyRuntimeError},
pybacked::PyBackedStr,
pyclass, pymethods,
types::{PyAnyMethods, PyDict, PyDictMethods, PyString},
@@ -235,19 +239,6 @@ macro_rules! get_property_opt {
};
}
/// Helper macro to find the given field in internal metadata, raising an
/// attribute error if not found.
macro_rules! get_property {
($self:expr, $name:ident) => {
get_property_opt!($self, $name).ok_or_else(|| {
PyAttributeError::new_err(format!(
"'EventInternalMetadata' has no attribute '{}'",
stringify!($name),
))
})
};
}
/// Helper macro to set the give field.
macro_rules! set_property {
($self:expr, $name:ident, $obj:expr) => {
@@ -262,27 +253,240 @@ macro_rules! set_property {
};
}
#[pyclass]
/// The inner data for `EventInternalMetadata`, containing all fields and
/// business logic with pure Rust types.
#[derive(Clone)]
pub struct EventInternalMetadata {
struct EventInternalMetadataInner {
/// The fields of internal metadata. This functions as a mapping.
data: Vec<EventInternalMetadataData>,
/// The stream ordering of this event. None, until it has been persisted.
#[pyo3(get, set)]
stream_ordering: Option<NonZeroI64>,
#[pyo3(get, set)]
instance_name: Option<String>,
pub stream_ordering: Option<NonZeroI64>,
pub instance_name: Option<String>,
/// The event ID of the redaction event, if this event has been redacted.
/// This is set dynamically at load time and is not persisted to the database.
#[pyo3(get, set)]
redacted_by: Option<String>,
pub redacted_by: Option<String>,
/// whether this event is an outlier (ie, whether we have the state at that
/// point in the DAG)
#[pyo3(get, set)]
outlier: bool,
pub outlier: bool,
}
impl EventInternalMetadataInner {
pub fn is_outlier(&self) -> bool {
self.outlier
}
/// Whether this event is an out-of-band membership.
///
/// OOB memberships are a special case of outlier events: they are
/// membership events for federated rooms that we aren't full members of.
/// Examples include invites received over federation, and rejections for
/// such invites.
///
/// The concept of an OOB membership is needed because these events need to
/// be processed as if they're new regular events (e.g. updating membership
/// state in the database, relaying to clients via /sync, etc) despite being
/// outliers.
///
/// See also
/// https://element-hq.github.io/synapse/develop/development/room-dag-concepts.html#out-of-band-membership-events.
///
/// (Added in synapse 0.99.0, so may be unreliable for events received
/// before that)
pub fn is_out_of_band_membership(&self) -> bool {
get_property_opt!(self, OutOfBandMembership)
.copied()
.unwrap_or(false)
}
/// Whether this server should send the event on behalf of another server.
/// This is used by the federation "send_join" API to forward the initial
/// join event for a server in the room.
///
/// returns a str with the name of the server this event is sent on behalf
/// of.
pub fn get_send_on_behalf_of(&self) -> Option<&str> {
get_property_opt!(self, SendOnBehalfOf).map(|a| a.deref())
}
/// Whether the redaction event needs to be rechecked when fetching
/// from the database.
///
/// Starting in room v3 redaction events are accepted up front, and later
/// checked to see if the redacter and redactee's domains match.
///
/// If the sender of the redaction event is allowed to redact any event
/// due to auth rules, then this will always return false.
pub fn need_to_check_redaction(&self) -> bool {
get_property_opt!(self, RecheckRedaction)
.copied()
.unwrap_or(false)
}
/// Whether the event has been soft failed.
///
/// Soft failed events should be handled as usual, except:
/// 1. They should not go down sync or event streams, or generally sent to
/// clients.
/// 2. They should not be added to the forward extremities (and therefore
/// not to current state).
pub fn is_soft_failed(&self) -> bool {
get_property_opt!(self, SoftFailed)
.copied()
.unwrap_or(false)
}
/// Whether the event, if ours, should be sent to other clients and servers.
///
/// This is used for sending dummy events internally. Servers and clients
/// can still explicitly fetch the event.
pub fn should_proactively_send(&self) -> bool {
get_property_opt!(self, ProactivelySend)
.copied()
.unwrap_or(true)
}
/// Whether the event has been redacted.
///
/// This is used for efficiently checking whether an event has been marked
/// as redacted without needing to make another database call.
pub fn is_redacted(&self) -> bool {
get_property_opt!(self, Redacted).copied().unwrap_or(false)
}
/// Whether this event can trigger a push notification
pub fn is_notifiable(&self) -> bool {
!self.outlier || self.is_out_of_band_membership()
}
pub fn get_out_of_band_membership(&self) -> Option<bool> {
get_property_opt!(self, OutOfBandMembership).copied()
}
pub fn get_recheck_redaction(&self) -> Option<bool> {
get_property_opt!(self, RecheckRedaction).copied()
}
pub fn get_soft_failed(&self) -> Option<bool> {
get_property_opt!(self, SoftFailed).copied()
}
pub fn get_proactively_send(&self) -> Option<bool> {
get_property_opt!(self, ProactivelySend).copied()
}
pub fn get_policy_server_spammy(&self) -> bool {
get_property_opt!(self, PolicyServerSpammy)
.copied()
.unwrap_or(false)
}
pub fn get_redacted(&self) -> Option<bool> {
get_property_opt!(self, Redacted).copied()
}
pub fn get_txn_id(&self) -> Option<&str> {
get_property_opt!(self, TxnId).map(|s| s.deref())
}
pub fn get_delay_id(&self) -> Option<&str> {
get_property_opt!(self, DelayId).map(|s| s.deref())
}
pub fn get_token_id(&self) -> Option<i64> {
get_property_opt!(self, TokenId).copied()
}
pub fn get_device_id(&self) -> Option<&str> {
get_property_opt!(self, DeviceId).map(|s| s.deref())
}
pub fn set_out_of_band_membership(&mut self, obj: bool) {
set_property!(self, OutOfBandMembership, obj);
}
pub fn set_send_on_behalf_of(&mut self, obj: String) {
set_property!(self, SendOnBehalfOf, obj.into_boxed_str());
}
pub fn set_recheck_redaction(&mut self, obj: bool) {
set_property!(self, RecheckRedaction, obj);
}
pub fn set_soft_failed(&mut self, obj: bool) {
set_property!(self, SoftFailed, obj);
}
pub fn set_proactively_send(&mut self, obj: bool) {
set_property!(self, ProactivelySend, obj);
}
pub fn set_policy_server_spammy(&mut self, obj: bool) {
set_property!(self, PolicyServerSpammy, obj);
}
fn get_spam_checker_spammy(&self) -> bool {
get_property_opt!(self, SpamCheckerSpammy)
.copied()
.unwrap_or(false)
}
fn set_spam_checker_spammy(&mut self, obj: bool) {
set_property!(self, SpamCheckerSpammy, obj);
}
pub fn set_redacted(&mut self, obj: bool) {
set_property!(self, Redacted, obj);
}
pub fn set_txn_id(&mut self, obj: String) {
set_property!(self, TxnId, obj.into_boxed_str());
}
pub fn set_delay_id(&mut self, obj: String) {
set_property!(self, DelayId, obj.into_boxed_str());
}
pub fn set_token_id(&mut self, obj: i64) {
set_property!(self, TokenId, obj);
}
pub fn set_device_id(&mut self, obj: String) {
set_property!(self, DeviceId, obj.into_boxed_str());
}
}
#[pyclass(frozen)]
#[derive(Clone)]
pub struct EventInternalMetadata {
inner: Arc<RwLock<EventInternalMetadataInner>>,
}
impl EventInternalMetadata {
fn read_inner(&self) -> PyResult<RwLockReadGuard<'_, EventInternalMetadataInner>> {
self.inner
.read()
.map_err(|_| PyRuntimeError::new_err("EventInternalMetadata lock poisoned"))
}
/// Get a write lock on the inner data.
///
/// Note that callers should be careful not to panic while holding the write
/// lock, as this will poison the lock.q
fn write_inner(&self) -> PyResult<RwLockWriteGuard<'_, EventInternalMetadataInner>> {
self.inner
.write()
.map_err(|_| PyRuntimeError::new_err("EventInternalMetadata lock poisoned"))
}
}
/// Helper to convert `None` to an `AttributeError` for a property getter.
fn attr_err<T>(val: Option<T>, name: &str) -> PyResult<T> {
val.ok_or_else(|| {
PyAttributeError::new_err(format!("'EventInternalMetadata' has no attribute '{name}'",))
})
}
#[pymethods]
@@ -304,25 +508,33 @@ impl EventInternalMetadata {
data.shrink_to_fit();
Ok(EventInternalMetadata {
data,
stream_ordering: None,
instance_name: None,
redacted_by: None,
outlier: false,
inner: Arc::new(RwLock::new(EventInternalMetadataInner {
data,
stream_ordering: None,
instance_name: None,
redacted_by: None,
outlier: false,
})),
})
}
fn copy(&self) -> Self {
self.clone()
fn copy(&self) -> PyResult<Self> {
let guard = self.read_inner()?;
Ok(EventInternalMetadata {
inner: Arc::new(RwLock::new(guard.clone())),
})
}
/// Get a dict holding the data stored in the `internal_metadata` column in the database.
/// Get a dict holding the data stored in the `internal_metadata` column in
/// the database.
///
/// Note that `outlier` and `stream_ordering` are stored in separate columns so are not returned here.
/// Note that `outlier` and `stream_ordering` are stored in separate columns
/// so are not returned here.
fn get_dict(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
let guard = self.read_inner()?;
let dict = PyDict::new(py);
for entry in &self.data {
for entry in &guard.data {
let (key, value) = entry.to_python_pair(py);
dict.set_item(key, value)?;
}
@@ -330,219 +542,218 @@ impl EventInternalMetadata {
Ok(dict.into())
}
fn is_outlier(&self) -> bool {
self.outlier
fn is_outlier(&self) -> PyResult<bool> {
Ok(self.read_inner()?.is_outlier())
}
/// Whether this event is an out-of-band membership.
///
/// OOB memberships are a special case of outlier events: they are
/// membership events for federated rooms that we aren't full members of.
/// Examples include invites received over federation, and rejections for
/// such invites.
///
/// The concept of an OOB membership is needed because these events need to
/// be processed as if they're new regular events (e.g. updating membership
/// state in the database, relaying to clients via /sync, etc) despite being
/// outliers.
///
/// See also
/// https://element-hq.github.io/synapse/develop/development/room-dag-concepts.html#out-of-band-membership-events.
///
/// (Added in synapse 0.99.0, so may be unreliable for events received
/// before that)
fn is_out_of_band_membership(&self) -> bool {
get_property_opt!(self, OutOfBandMembership)
.copied()
.unwrap_or(false)
fn is_out_of_band_membership(&self) -> PyResult<bool> {
Ok(self.read_inner()?.is_out_of_band_membership())
}
/// Whether this server should send the event on behalf of another server.
/// This is used by the federation "send_join" API to forward the initial
/// join event for a server in the room.
///
/// returns a str with the name of the server this event is sent on behalf
/// of.
fn get_send_on_behalf_of(&self) -> Option<&str> {
let s = get_property_opt!(self, SendOnBehalfOf);
s.map(|a| a.deref())
fn get_send_on_behalf_of(&self) -> PyResult<Option<String>> {
Ok(self
.read_inner()?
.get_send_on_behalf_of()
.map(|s| s.to_owned()))
}
/// Whether the redaction event needs to be rechecked when fetching
/// from the database.
///
/// Starting in room v3 redaction events are accepted up front, and later
/// checked to see if the redacter and redactee's domains match.
///
/// If the sender of the redaction event is allowed to redact any event
/// due to auth rules, then this will always return false.
fn need_to_check_redaction(&self) -> bool {
get_property_opt!(self, RecheckRedaction)
.copied()
.unwrap_or(false)
fn need_to_check_redaction(&self) -> PyResult<bool> {
Ok(self.read_inner()?.need_to_check_redaction())
}
/// Whether the event has been soft failed.
///
/// Soft failed events should be handled as usual, except:
/// 1. They should not go down sync or event streams, or generally sent to
/// clients.
/// 2. They should not be added to the forward extremities (and therefore
/// not to current state).
fn is_soft_failed(&self) -> bool {
get_property_opt!(self, SoftFailed)
.copied()
.unwrap_or(false)
fn is_soft_failed(&self) -> PyResult<bool> {
Ok(self.read_inner()?.is_soft_failed())
}
/// Whether the event, if ours, should be sent to other clients and servers.
///
/// This is used for sending dummy events internally. Servers and clients
/// can still explicitly fetch the event.
fn should_proactively_send(&self) -> bool {
get_property_opt!(self, ProactivelySend)
.copied()
.unwrap_or(true)
fn should_proactively_send(&self) -> PyResult<bool> {
Ok(self.read_inner()?.should_proactively_send())
}
/// Whether the event has been redacted.
///
/// This is used for efficiently checking whether an event has been marked
/// as redacted without needing to make another database call.
fn is_redacted(&self) -> bool {
get_property_opt!(self, Redacted).copied().unwrap_or(false)
fn is_redacted(&self) -> PyResult<bool> {
Ok(self.read_inner()?.is_redacted())
}
/// Whether this event can trigger a push notification
fn is_notifiable(&self) -> bool {
!self.outlier || self.is_out_of_band_membership()
fn is_notifiable(&self) -> PyResult<bool> {
Ok(self.read_inner()?.is_notifiable())
}
// ** The following are the getters and setters of the various properties **
#[getter]
fn get_stream_ordering(&self) -> PyResult<Option<NonZeroI64>> {
Ok(self.read_inner()?.stream_ordering)
}
#[setter]
fn set_stream_ordering(&self, val: Option<NonZeroI64>) -> PyResult<()> {
self.write_inner()?.stream_ordering = val;
Ok(())
}
#[getter]
fn get_instance_name(&self) -> PyResult<Option<String>> {
Ok(self.read_inner()?.instance_name.clone())
}
#[setter]
fn set_instance_name(&self, val: Option<String>) -> PyResult<()> {
self.write_inner()?.instance_name = val;
Ok(())
}
#[getter]
fn get_redacted_by(&self) -> PyResult<Option<String>> {
Ok(self.read_inner()?.redacted_by.clone())
}
#[setter]
fn set_redacted_by(&self, val: Option<String>) -> PyResult<()> {
self.write_inner()?.redacted_by = val;
Ok(())
}
#[getter]
fn get_outlier(&self) -> PyResult<bool> {
Ok(self.read_inner()?.outlier)
}
#[setter]
fn set_outlier(&self, val: bool) -> PyResult<()> {
self.write_inner()?.outlier = val;
Ok(())
}
#[getter]
fn get_out_of_band_membership(&self) -> PyResult<bool> {
let bool = get_property!(self, OutOfBandMembership)?;
Ok(*bool)
attr_err(
self.read_inner()?.get_out_of_band_membership(),
"out_of_band_membership",
)
}
#[setter]
fn set_out_of_band_membership(&mut self, obj: bool) {
set_property!(self, OutOfBandMembership, obj);
fn set_out_of_band_membership(&self, obj: bool) -> PyResult<()> {
self.write_inner()?.set_out_of_band_membership(obj);
Ok(())
}
#[getter(send_on_behalf_of)]
fn getter_send_on_behalf_of(&self) -> PyResult<&str> {
let s = get_property!(self, SendOnBehalfOf)?;
Ok(s)
fn getter_send_on_behalf_of(&self) -> PyResult<String> {
let guard = self.read_inner()?;
attr_err(
guard.get_send_on_behalf_of().map(|s| s.to_owned()),
"send_on_behalf_of",
)
}
#[setter]
fn set_send_on_behalf_of(&mut self, obj: String) {
set_property!(self, SendOnBehalfOf, obj.into_boxed_str());
fn set_send_on_behalf_of(&self, obj: String) -> PyResult<()> {
self.write_inner()?.set_send_on_behalf_of(obj);
Ok(())
}
#[getter]
fn get_recheck_redaction(&self) -> PyResult<bool> {
let bool = get_property!(self, RecheckRedaction)?;
Ok(*bool)
attr_err(
self.read_inner()?.get_recheck_redaction(),
"recheck_redaction",
)
}
#[setter]
fn set_recheck_redaction(&mut self, obj: bool) {
set_property!(self, RecheckRedaction, obj);
fn set_recheck_redaction(&self, obj: bool) -> PyResult<()> {
self.write_inner()?.set_recheck_redaction(obj);
Ok(())
}
#[getter]
fn get_soft_failed(&self) -> PyResult<bool> {
let bool = get_property!(self, SoftFailed)?;
Ok(*bool)
attr_err(self.read_inner()?.get_soft_failed(), "soft_failed")
}
#[setter]
fn set_soft_failed(&mut self, obj: bool) {
set_property!(self, SoftFailed, obj);
fn set_soft_failed(&self, obj: bool) -> PyResult<()> {
self.write_inner()?.set_soft_failed(obj);
Ok(())
}
#[getter]
fn get_proactively_send(&self) -> PyResult<bool> {
let bool = get_property!(self, ProactivelySend)?;
Ok(*bool)
attr_err(
self.read_inner()?.get_proactively_send(),
"proactively_send",
)
}
#[setter]
fn set_proactively_send(&mut self, obj: bool) {
set_property!(self, ProactivelySend, obj);
fn set_proactively_send(&self, obj: bool) -> PyResult<()> {
self.write_inner()?.set_proactively_send(obj);
Ok(())
}
#[getter]
fn get_policy_server_spammy(&self) -> PyResult<bool> {
Ok(get_property_opt!(self, PolicyServerSpammy)
.copied()
.unwrap_or(false))
Ok(self.read_inner()?.get_policy_server_spammy())
}
#[setter]
fn set_policy_server_spammy(&mut self, obj: bool) {
set_property!(self, PolicyServerSpammy, obj);
fn set_policy_server_spammy(&self, obj: bool) -> PyResult<()> {
self.write_inner()?.set_policy_server_spammy(obj);
Ok(())
}
#[getter]
fn get_spam_checker_spammy(&self) -> PyResult<bool> {
Ok(get_property_opt!(self, SpamCheckerSpammy)
.copied()
.unwrap_or(false))
Ok(self.read_inner()?.get_spam_checker_spammy())
}
#[setter]
fn set_spam_checker_spammy(&mut self, obj: bool) {
set_property!(self, SpamCheckerSpammy, obj);
fn set_spam_checker_spammy(&self, obj: bool) -> PyResult<()> {
self.write_inner()?.set_spam_checker_spammy(obj);
Ok(())
}
#[getter]
fn get_redacted(&self) -> PyResult<bool> {
let bool = get_property!(self, Redacted)?;
Ok(*bool)
attr_err(self.read_inner()?.get_redacted(), "redacted")
}
#[setter]
fn set_redacted(&mut self, obj: bool) {
set_property!(self, Redacted, obj);
fn set_redacted(&self, obj: bool) -> PyResult<()> {
self.write_inner()?.set_redacted(obj);
Ok(())
}
/// The transaction ID, if it was set when the event was created.
#[getter]
fn get_txn_id(&self) -> PyResult<&str> {
let s = get_property!(self, TxnId)?;
Ok(s)
fn get_txn_id(&self) -> PyResult<String> {
let guard = self.read_inner()?;
attr_err(guard.get_txn_id().map(|s| s.to_owned()), "txn_id")
}
#[setter]
fn set_txn_id(&mut self, obj: String) {
set_property!(self, TxnId, obj.into_boxed_str());
fn set_txn_id(&self, obj: String) -> PyResult<()> {
self.write_inner()?.set_txn_id(obj);
Ok(())
}
/// The delay ID, set only if the event was a delayed event.
#[getter]
fn get_delay_id(&self) -> PyResult<&str> {
let s = get_property!(self, DelayId)?;
Ok(s)
fn get_delay_id(&self) -> PyResult<String> {
let guard = self.read_inner()?;
attr_err(guard.get_delay_id().map(|s| s.to_owned()), "delay_id")
}
#[setter]
fn set_delay_id(&mut self, obj: String) {
set_property!(self, DelayId, obj.into_boxed_str());
fn set_delay_id(&self, obj: String) -> PyResult<()> {
self.write_inner()?.set_delay_id(obj);
Ok(())
}
/// The access token ID of the user who sent this event, if any.
#[getter]
fn get_token_id(&self) -> PyResult<i64> {
let r = get_property!(self, TokenId)?;
Ok(*r)
attr_err(self.read_inner()?.get_token_id(), "token_id")
}
#[setter]
fn set_token_id(&mut self, obj: i64) {
set_property!(self, TokenId, obj);
fn set_token_id(&self, obj: i64) -> PyResult<()> {
self.write_inner()?.set_token_id(obj);
Ok(())
}
/// The device ID of the user who sent this event, if any.
#[getter]
fn get_device_id(&self) -> PyResult<&str> {
let s = get_property!(self, DeviceId)?;
Ok(s)
fn get_device_id(&self) -> PyResult<String> {
let guard = self.read_inner()?;
attr_err(guard.get_device_id().map(|s| s.to_owned()), "device_id")
}
#[setter]
fn set_device_id(&mut self, obj: String) {
set_property!(self, DeviceId, obj.into_boxed_str());
fn set_device_id(&self, obj: String) -> PyResult<()> {
self.write_inner()?.set_device_id(obj);
Ok(())
}
}