diff --git a/changelog.d/19669.misc b/changelog.d/19669.misc new file mode 100644 index 0000000000..c0355c398b --- /dev/null +++ b/changelog.d/19669.misc @@ -0,0 +1 @@ +Convert `EventInternalMetadata` to use `Arc>`. diff --git a/rust/src/events/internal_metadata.rs b/rust/src/events/internal_metadata.rs index 503722955e..21d3b8c435 100644 --- a/rust/src/events/internal_metadata.rs +++ b/rust/src/events/internal_metadata.rs @@ -32,12 +32,16 @@ //! attributes, but for small number of keys is actually faster than using a //! hash or btree map. -use std::{num::NonZeroI64, ops::Deref}; +use std::{ + num::NonZeroI64, + ops::Deref, + sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, +}; use anyhow::Context; use log::warn; use pyo3::{ - exceptions::PyAttributeError, + exceptions::{PyAttributeError, PyRuntimeError}, pybacked::PyBackedStr, pyclass, pymethods, types::{PyAnyMethods, PyDict, PyDictMethods, PyString}, @@ -235,19 +239,6 @@ macro_rules! get_property_opt { }; } -/// Helper macro to find the given field in internal metadata, raising an -/// attribute error if not found. -macro_rules! get_property { - ($self:expr, $name:ident) => { - get_property_opt!($self, $name).ok_or_else(|| { - PyAttributeError::new_err(format!( - "'EventInternalMetadata' has no attribute '{}'", - stringify!($name), - )) - }) - }; -} - /// Helper macro to set the give field. macro_rules! set_property { ($self:expr, $name:ident, $obj:expr) => { @@ -262,27 +253,240 @@ macro_rules! set_property { }; } -#[pyclass] +/// The inner data for `EventInternalMetadata`, containing all fields and +/// business logic with pure Rust types. #[derive(Clone)] -pub struct EventInternalMetadata { +struct EventInternalMetadataInner { /// The fields of internal metadata. This functions as a mapping. data: Vec, /// The stream ordering of this event. None, until it has been persisted. - #[pyo3(get, set)] - stream_ordering: Option, - #[pyo3(get, set)] - instance_name: Option, + pub stream_ordering: Option, + pub instance_name: Option, /// The event ID of the redaction event, if this event has been redacted. /// This is set dynamically at load time and is not persisted to the database. - #[pyo3(get, set)] - redacted_by: Option, + pub redacted_by: Option, /// whether this event is an outlier (ie, whether we have the state at that /// point in the DAG) - #[pyo3(get, set)] - outlier: bool, + pub outlier: bool, +} + +impl EventInternalMetadataInner { + pub fn is_outlier(&self) -> bool { + self.outlier + } + + /// Whether this event is an out-of-band membership. + /// + /// OOB memberships are a special case of outlier events: they are + /// membership events for federated rooms that we aren't full members of. + /// Examples include invites received over federation, and rejections for + /// such invites. + /// + /// The concept of an OOB membership is needed because these events need to + /// be processed as if they're new regular events (e.g. updating membership + /// state in the database, relaying to clients via /sync, etc) despite being + /// outliers. + /// + /// See also + /// https://element-hq.github.io/synapse/develop/development/room-dag-concepts.html#out-of-band-membership-events. + /// + /// (Added in synapse 0.99.0, so may be unreliable for events received + /// before that) + pub fn is_out_of_band_membership(&self) -> bool { + get_property_opt!(self, OutOfBandMembership) + .copied() + .unwrap_or(false) + } + + /// Whether this server should send the event on behalf of another server. + /// This is used by the federation "send_join" API to forward the initial + /// join event for a server in the room. + /// + /// returns a str with the name of the server this event is sent on behalf + /// of. + pub fn get_send_on_behalf_of(&self) -> Option<&str> { + get_property_opt!(self, SendOnBehalfOf).map(|a| a.deref()) + } + + /// Whether the redaction event needs to be rechecked when fetching + /// from the database. + /// + /// Starting in room v3 redaction events are accepted up front, and later + /// checked to see if the redacter and redactee's domains match. + /// + /// If the sender of the redaction event is allowed to redact any event + /// due to auth rules, then this will always return false. + pub fn need_to_check_redaction(&self) -> bool { + get_property_opt!(self, RecheckRedaction) + .copied() + .unwrap_or(false) + } + + /// Whether the event has been soft failed. + /// + /// Soft failed events should be handled as usual, except: + /// 1. They should not go down sync or event streams, or generally sent to + /// clients. + /// 2. They should not be added to the forward extremities (and therefore + /// not to current state). + pub fn is_soft_failed(&self) -> bool { + get_property_opt!(self, SoftFailed) + .copied() + .unwrap_or(false) + } + + /// Whether the event, if ours, should be sent to other clients and servers. + /// + /// This is used for sending dummy events internally. Servers and clients + /// can still explicitly fetch the event. + pub fn should_proactively_send(&self) -> bool { + get_property_opt!(self, ProactivelySend) + .copied() + .unwrap_or(true) + } + + /// Whether the event has been redacted. + /// + /// This is used for efficiently checking whether an event has been marked + /// as redacted without needing to make another database call. + pub fn is_redacted(&self) -> bool { + get_property_opt!(self, Redacted).copied().unwrap_or(false) + } + + /// Whether this event can trigger a push notification + pub fn is_notifiable(&self) -> bool { + !self.outlier || self.is_out_of_band_membership() + } + + pub fn get_out_of_band_membership(&self) -> Option { + get_property_opt!(self, OutOfBandMembership).copied() + } + + pub fn get_recheck_redaction(&self) -> Option { + get_property_opt!(self, RecheckRedaction).copied() + } + + pub fn get_soft_failed(&self) -> Option { + get_property_opt!(self, SoftFailed).copied() + } + + pub fn get_proactively_send(&self) -> Option { + get_property_opt!(self, ProactivelySend).copied() + } + + pub fn get_policy_server_spammy(&self) -> bool { + get_property_opt!(self, PolicyServerSpammy) + .copied() + .unwrap_or(false) + } + + pub fn get_redacted(&self) -> Option { + get_property_opt!(self, Redacted).copied() + } + + pub fn get_txn_id(&self) -> Option<&str> { + get_property_opt!(self, TxnId).map(|s| s.deref()) + } + + pub fn get_delay_id(&self) -> Option<&str> { + get_property_opt!(self, DelayId).map(|s| s.deref()) + } + + pub fn get_token_id(&self) -> Option { + get_property_opt!(self, TokenId).copied() + } + + pub fn get_device_id(&self) -> Option<&str> { + get_property_opt!(self, DeviceId).map(|s| s.deref()) + } + + pub fn set_out_of_band_membership(&mut self, obj: bool) { + set_property!(self, OutOfBandMembership, obj); + } + + pub fn set_send_on_behalf_of(&mut self, obj: String) { + set_property!(self, SendOnBehalfOf, obj.into_boxed_str()); + } + + pub fn set_recheck_redaction(&mut self, obj: bool) { + set_property!(self, RecheckRedaction, obj); + } + + pub fn set_soft_failed(&mut self, obj: bool) { + set_property!(self, SoftFailed, obj); + } + + pub fn set_proactively_send(&mut self, obj: bool) { + set_property!(self, ProactivelySend, obj); + } + + pub fn set_policy_server_spammy(&mut self, obj: bool) { + set_property!(self, PolicyServerSpammy, obj); + } + + fn get_spam_checker_spammy(&self) -> bool { + get_property_opt!(self, SpamCheckerSpammy) + .copied() + .unwrap_or(false) + } + + fn set_spam_checker_spammy(&mut self, obj: bool) { + set_property!(self, SpamCheckerSpammy, obj); + } + + pub fn set_redacted(&mut self, obj: bool) { + set_property!(self, Redacted, obj); + } + + pub fn set_txn_id(&mut self, obj: String) { + set_property!(self, TxnId, obj.into_boxed_str()); + } + + pub fn set_delay_id(&mut self, obj: String) { + set_property!(self, DelayId, obj.into_boxed_str()); + } + + pub fn set_token_id(&mut self, obj: i64) { + set_property!(self, TokenId, obj); + } + + pub fn set_device_id(&mut self, obj: String) { + set_property!(self, DeviceId, obj.into_boxed_str()); + } +} + +#[pyclass(frozen)] +#[derive(Clone)] +pub struct EventInternalMetadata { + inner: Arc>, +} + +impl EventInternalMetadata { + fn read_inner(&self) -> PyResult> { + self.inner + .read() + .map_err(|_| PyRuntimeError::new_err("EventInternalMetadata lock poisoned")) + } + + /// Get a write lock on the inner data. + /// + /// Note that callers should be careful not to panic while holding the write + /// lock, as this will poison the lock.q + fn write_inner(&self) -> PyResult> { + self.inner + .write() + .map_err(|_| PyRuntimeError::new_err("EventInternalMetadata lock poisoned")) + } +} + +/// Helper to convert `None` to an `AttributeError` for a property getter. +fn attr_err(val: Option, name: &str) -> PyResult { + val.ok_or_else(|| { + PyAttributeError::new_err(format!("'EventInternalMetadata' has no attribute '{name}'",)) + }) } #[pymethods] @@ -304,25 +508,33 @@ impl EventInternalMetadata { data.shrink_to_fit(); Ok(EventInternalMetadata { - data, - stream_ordering: None, - instance_name: None, - redacted_by: None, - outlier: false, + inner: Arc::new(RwLock::new(EventInternalMetadataInner { + data, + stream_ordering: None, + instance_name: None, + redacted_by: None, + outlier: false, + })), }) } - fn copy(&self) -> Self { - self.clone() + fn copy(&self) -> PyResult { + let guard = self.read_inner()?; + Ok(EventInternalMetadata { + inner: Arc::new(RwLock::new(guard.clone())), + }) } - /// Get a dict holding the data stored in the `internal_metadata` column in the database. + /// Get a dict holding the data stored in the `internal_metadata` column in + /// the database. /// - /// Note that `outlier` and `stream_ordering` are stored in separate columns so are not returned here. + /// Note that `outlier` and `stream_ordering` are stored in separate columns + /// so are not returned here. fn get_dict(&self, py: Python<'_>) -> PyResult> { + let guard = self.read_inner()?; let dict = PyDict::new(py); - for entry in &self.data { + for entry in &guard.data { let (key, value) = entry.to_python_pair(py); dict.set_item(key, value)?; } @@ -330,219 +542,218 @@ impl EventInternalMetadata { Ok(dict.into()) } - fn is_outlier(&self) -> bool { - self.outlier + fn is_outlier(&self) -> PyResult { + Ok(self.read_inner()?.is_outlier()) } - /// Whether this event is an out-of-band membership. - /// - /// OOB memberships are a special case of outlier events: they are - /// membership events for federated rooms that we aren't full members of. - /// Examples include invites received over federation, and rejections for - /// such invites. - /// - /// The concept of an OOB membership is needed because these events need to - /// be processed as if they're new regular events (e.g. updating membership - /// state in the database, relaying to clients via /sync, etc) despite being - /// outliers. - /// - /// See also - /// https://element-hq.github.io/synapse/develop/development/room-dag-concepts.html#out-of-band-membership-events. - /// - /// (Added in synapse 0.99.0, so may be unreliable for events received - /// before that) - fn is_out_of_band_membership(&self) -> bool { - get_property_opt!(self, OutOfBandMembership) - .copied() - .unwrap_or(false) + fn is_out_of_band_membership(&self) -> PyResult { + Ok(self.read_inner()?.is_out_of_band_membership()) } - /// Whether this server should send the event on behalf of another server. - /// This is used by the federation "send_join" API to forward the initial - /// join event for a server in the room. - /// - /// returns a str with the name of the server this event is sent on behalf - /// of. - fn get_send_on_behalf_of(&self) -> Option<&str> { - let s = get_property_opt!(self, SendOnBehalfOf); - s.map(|a| a.deref()) + fn get_send_on_behalf_of(&self) -> PyResult> { + Ok(self + .read_inner()? + .get_send_on_behalf_of() + .map(|s| s.to_owned())) } - /// Whether the redaction event needs to be rechecked when fetching - /// from the database. - /// - /// Starting in room v3 redaction events are accepted up front, and later - /// checked to see if the redacter and redactee's domains match. - /// - /// If the sender of the redaction event is allowed to redact any event - /// due to auth rules, then this will always return false. - fn need_to_check_redaction(&self) -> bool { - get_property_opt!(self, RecheckRedaction) - .copied() - .unwrap_or(false) + fn need_to_check_redaction(&self) -> PyResult { + Ok(self.read_inner()?.need_to_check_redaction()) } - /// Whether the event has been soft failed. - /// - /// Soft failed events should be handled as usual, except: - /// 1. They should not go down sync or event streams, or generally sent to - /// clients. - /// 2. They should not be added to the forward extremities (and therefore - /// not to current state). - fn is_soft_failed(&self) -> bool { - get_property_opt!(self, SoftFailed) - .copied() - .unwrap_or(false) + fn is_soft_failed(&self) -> PyResult { + Ok(self.read_inner()?.is_soft_failed()) } - /// Whether the event, if ours, should be sent to other clients and servers. - /// - /// This is used for sending dummy events internally. Servers and clients - /// can still explicitly fetch the event. - fn should_proactively_send(&self) -> bool { - get_property_opt!(self, ProactivelySend) - .copied() - .unwrap_or(true) + fn should_proactively_send(&self) -> PyResult { + Ok(self.read_inner()?.should_proactively_send()) } - /// Whether the event has been redacted. - /// - /// This is used for efficiently checking whether an event has been marked - /// as redacted without needing to make another database call. - fn is_redacted(&self) -> bool { - get_property_opt!(self, Redacted).copied().unwrap_or(false) + fn is_redacted(&self) -> PyResult { + Ok(self.read_inner()?.is_redacted()) } - /// Whether this event can trigger a push notification - fn is_notifiable(&self) -> bool { - !self.outlier || self.is_out_of_band_membership() + fn is_notifiable(&self) -> PyResult { + Ok(self.read_inner()?.is_notifiable()) } - // ** The following are the getters and setters of the various properties ** + #[getter] + fn get_stream_ordering(&self) -> PyResult> { + Ok(self.read_inner()?.stream_ordering) + } + #[setter] + fn set_stream_ordering(&self, val: Option) -> PyResult<()> { + self.write_inner()?.stream_ordering = val; + Ok(()) + } + + #[getter] + fn get_instance_name(&self) -> PyResult> { + Ok(self.read_inner()?.instance_name.clone()) + } + #[setter] + fn set_instance_name(&self, val: Option) -> PyResult<()> { + self.write_inner()?.instance_name = val; + Ok(()) + } + + #[getter] + fn get_redacted_by(&self) -> PyResult> { + Ok(self.read_inner()?.redacted_by.clone()) + } + #[setter] + fn set_redacted_by(&self, val: Option) -> PyResult<()> { + self.write_inner()?.redacted_by = val; + Ok(()) + } + + #[getter] + fn get_outlier(&self) -> PyResult { + Ok(self.read_inner()?.outlier) + } + #[setter] + fn set_outlier(&self, val: bool) -> PyResult<()> { + self.write_inner()?.outlier = val; + Ok(()) + } #[getter] fn get_out_of_band_membership(&self) -> PyResult { - let bool = get_property!(self, OutOfBandMembership)?; - Ok(*bool) + attr_err( + self.read_inner()?.get_out_of_band_membership(), + "out_of_band_membership", + ) } #[setter] - fn set_out_of_band_membership(&mut self, obj: bool) { - set_property!(self, OutOfBandMembership, obj); + fn set_out_of_band_membership(&self, obj: bool) -> PyResult<()> { + self.write_inner()?.set_out_of_band_membership(obj); + Ok(()) } #[getter(send_on_behalf_of)] - fn getter_send_on_behalf_of(&self) -> PyResult<&str> { - let s = get_property!(self, SendOnBehalfOf)?; - Ok(s) + fn getter_send_on_behalf_of(&self) -> PyResult { + let guard = self.read_inner()?; + attr_err( + guard.get_send_on_behalf_of().map(|s| s.to_owned()), + "send_on_behalf_of", + ) } #[setter] - fn set_send_on_behalf_of(&mut self, obj: String) { - set_property!(self, SendOnBehalfOf, obj.into_boxed_str()); + fn set_send_on_behalf_of(&self, obj: String) -> PyResult<()> { + self.write_inner()?.set_send_on_behalf_of(obj); + Ok(()) } #[getter] fn get_recheck_redaction(&self) -> PyResult { - let bool = get_property!(self, RecheckRedaction)?; - Ok(*bool) + attr_err( + self.read_inner()?.get_recheck_redaction(), + "recheck_redaction", + ) } #[setter] - fn set_recheck_redaction(&mut self, obj: bool) { - set_property!(self, RecheckRedaction, obj); + fn set_recheck_redaction(&self, obj: bool) -> PyResult<()> { + self.write_inner()?.set_recheck_redaction(obj); + Ok(()) } #[getter] fn get_soft_failed(&self) -> PyResult { - let bool = get_property!(self, SoftFailed)?; - Ok(*bool) + attr_err(self.read_inner()?.get_soft_failed(), "soft_failed") } #[setter] - fn set_soft_failed(&mut self, obj: bool) { - set_property!(self, SoftFailed, obj); + fn set_soft_failed(&self, obj: bool) -> PyResult<()> { + self.write_inner()?.set_soft_failed(obj); + Ok(()) } #[getter] fn get_proactively_send(&self) -> PyResult { - let bool = get_property!(self, ProactivelySend)?; - Ok(*bool) + attr_err( + self.read_inner()?.get_proactively_send(), + "proactively_send", + ) } #[setter] - fn set_proactively_send(&mut self, obj: bool) { - set_property!(self, ProactivelySend, obj); + fn set_proactively_send(&self, obj: bool) -> PyResult<()> { + self.write_inner()?.set_proactively_send(obj); + Ok(()) } #[getter] fn get_policy_server_spammy(&self) -> PyResult { - Ok(get_property_opt!(self, PolicyServerSpammy) - .copied() - .unwrap_or(false)) + Ok(self.read_inner()?.get_policy_server_spammy()) } #[setter] - fn set_policy_server_spammy(&mut self, obj: bool) { - set_property!(self, PolicyServerSpammy, obj); + fn set_policy_server_spammy(&self, obj: bool) -> PyResult<()> { + self.write_inner()?.set_policy_server_spammy(obj); + Ok(()) } #[getter] fn get_spam_checker_spammy(&self) -> PyResult { - Ok(get_property_opt!(self, SpamCheckerSpammy) - .copied() - .unwrap_or(false)) + Ok(self.read_inner()?.get_spam_checker_spammy()) } #[setter] - fn set_spam_checker_spammy(&mut self, obj: bool) { - set_property!(self, SpamCheckerSpammy, obj); + fn set_spam_checker_spammy(&self, obj: bool) -> PyResult<()> { + self.write_inner()?.set_spam_checker_spammy(obj); + Ok(()) } #[getter] fn get_redacted(&self) -> PyResult { - let bool = get_property!(self, Redacted)?; - Ok(*bool) + attr_err(self.read_inner()?.get_redacted(), "redacted") } #[setter] - fn set_redacted(&mut self, obj: bool) { - set_property!(self, Redacted, obj); + fn set_redacted(&self, obj: bool) -> PyResult<()> { + self.write_inner()?.set_redacted(obj); + Ok(()) } /// The transaction ID, if it was set when the event was created. #[getter] - fn get_txn_id(&self) -> PyResult<&str> { - let s = get_property!(self, TxnId)?; - Ok(s) + fn get_txn_id(&self) -> PyResult { + let guard = self.read_inner()?; + attr_err(guard.get_txn_id().map(|s| s.to_owned()), "txn_id") } #[setter] - fn set_txn_id(&mut self, obj: String) { - set_property!(self, TxnId, obj.into_boxed_str()); + fn set_txn_id(&self, obj: String) -> PyResult<()> { + self.write_inner()?.set_txn_id(obj); + Ok(()) } /// The delay ID, set only if the event was a delayed event. #[getter] - fn get_delay_id(&self) -> PyResult<&str> { - let s = get_property!(self, DelayId)?; - Ok(s) + fn get_delay_id(&self) -> PyResult { + let guard = self.read_inner()?; + attr_err(guard.get_delay_id().map(|s| s.to_owned()), "delay_id") } #[setter] - fn set_delay_id(&mut self, obj: String) { - set_property!(self, DelayId, obj.into_boxed_str()); + fn set_delay_id(&self, obj: String) -> PyResult<()> { + self.write_inner()?.set_delay_id(obj); + Ok(()) } /// The access token ID of the user who sent this event, if any. #[getter] fn get_token_id(&self) -> PyResult { - let r = get_property!(self, TokenId)?; - Ok(*r) + attr_err(self.read_inner()?.get_token_id(), "token_id") } #[setter] - fn set_token_id(&mut self, obj: i64) { - set_property!(self, TokenId, obj); + fn set_token_id(&self, obj: i64) -> PyResult<()> { + self.write_inner()?.set_token_id(obj); + Ok(()) } /// The device ID of the user who sent this event, if any. #[getter] - fn get_device_id(&self) -> PyResult<&str> { - let s = get_property!(self, DeviceId)?; - Ok(s) + fn get_device_id(&self) -> PyResult { + let guard = self.read_inner()?; + attr_err(guard.get_device_id().map(|s| s.to_owned()), "device_id") } #[setter] - fn set_device_id(&mut self, obj: String) { - set_property!(self, DeviceId, obj.into_boxed_str()); + fn set_device_id(&self, obj: String) -> PyResult<()> { + self.write_inner()?.set_device_id(obj); + Ok(()) } }