Files
synapse/tests/replication/storage/test_events.py
T
Erik Johnston 9e2a076144 Port Event class to Rust (#19701)
Ports the event class to Rust.

The main difference here are:
1. There is now a single event class
2. We now validate a lot more at event construction time than we
previously did (we basically checked nothing before). This required some
changes to the tests, including
https://github.com/matrix-org/sytest/pull/1423

Reviewable commit-by-commit.

### Overview of Event Rust structure

The format of the event struct in Rust is quite different than that in
Python.

The top-level looks like:

```rust
pub struct Event {
    /// The parsed event JSON.
    fields: FormattedEvent,

    /// The event ID. For format v1 this is read directly from the JSON;
    /// for v2+ it is computed from the canonical-JSON hash at
    /// construction time and cached here.
    event_id: Arc<str>,

    /// Synapse-internal per-event state that lives outside the federated
    /// JSON (e.g. outlier flag, soft-failure, stream positions).
    #[pyo3(get)]
    internal_metadata: EventInternalMetadata,

    /// The room version this event was parsed for.
    #[pyo3(get)]
    room_version: &'static RoomVersion,

    /// `None` for accepted events; otherwise a short reason set by auth
    /// when the event was rejected.
    rejected_reason: Option<Box<str>>,
}
```

which includes the actual parsed event in `FormattedEvent`, plus the
rest of the event metadata.

```rust
pub struct FormattedEvent<E = Arc<EventFormatEnum>> {
    #[serde(default)]
    pub signatures: Signatures,

    #[serde(default)]
    pub unsigned: Unsigned,

    #[serde(flatten)]
    pub specific_fields: E,

    #[serde(flatten)]
    pub common_fields: Arc<EventCommonFields>,
}
```

The struct is further split into the common fields, format specific
fields, plus the signatures and unsigned. We split out the signature and
unsigned fields as they are mutable, so when we clone the event we can
still share the common and specific fields and only copy signature and
unsigned.

The `specific_fields` are the fields that depend on the format version.
They can either be a specific format (e.g. `E = EventFormatV1`) or a
type-erased enum `EventFormatEnum` that is across all room versions:

```rust
pub enum EventFormatEnum {
    V1(EventFormatV1),
    V2V3(EventFormatV2V3),
    V4(EventFormatV4),
    VMSC4242(EventFormatVMSC4242),
}
```

For example:

```rust
/// Shared flat-list encoding of `auth_events` and `prev_events`, reused
/// by every format from v2/v3 onwards.
#[derive(Serialize, Deserialize)]
pub struct SimpleAuthPrevEvents {
    pub auth_events: Vec<String>,
    pub prev_events: Vec<String>,
}

/// Version-specific fields for room versions 3-10.
#[derive(Serialize, Deserialize)]
pub struct EventFormatV2V3 {
    pub room_id: Box<str>,
    #[serde(flatten)]
    pub auth_prev_events: SimpleAuthPrevEvents,
}
```


### Dev notes

As discussed in
[`#element-backend-internal:matrix.org`](https://matrix.to/#/!SGNQGPGUwtcPBUotTL:matrix.org/$3gTjDO440GbAz57cXcCawwiyFLiD0crrarvS1uhzKOY?via=jki.re&via=element.io&via=matrix.org)

---------

Co-authored-by: Eric Eastwood <erice@element.io>
2026-06-02 11:05:38 +01:00

299 lines
9.9 KiB
Python

#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
from typing import Any, Iterable
from canonicaljson import encode_canonical_json
from parameterized import parameterized
from twisted.internet.testing import MemoryReactor
from synapse.api.constants import ReceiptTypes
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.server import HomeServer
from synapse.storage.databases.main.event_push_actions import (
NotifCounts,
RoomNotifCounts,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.roommember import RoomsForUser
from synapse.types import PersistedEventPosition
from synapse.util.clock import Clock
from ._base import BaseWorkerStoreTestCase
USER_ID = "@feeling:test"
USER_ID_2 = "@bright:test"
OUTLIER = {"outlier": True}
ROOM_ID = "!room:test"
logger = logging.getLogger(__name__)
class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
STORE_TYPE = EventsWorkerStore
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
super().prepare(reactor, clock, hs)
self.get_success(
self.master_store.store_room(
ROOM_ID,
USER_ID,
is_public=False,
room_version=RoomVersions.V1,
)
)
def assertEventsEqual(
self, first: EventBase, second: EventBase, msg: Any | None = None
) -> None:
self.assertEqual(
encode_canonical_json(first.get_pdu_json()),
encode_canonical_json(second.get_pdu_json()),
msg,
)
def test_get_latest_event_ids_in_room(self) -> None:
create = self.persist(type="m.room.create", key="", creator=USER_ID)
self.replicate()
self.check("get_latest_event_ids_in_room", (ROOM_ID,), {create.event_id})
join = self.persist(
type="m.room.member",
key=USER_ID,
membership="join",
prev_events=[(create.event_id, {})],
)
self.replicate()
self.check("get_latest_event_ids_in_room", (ROOM_ID,), {join.event_id})
def test_redactions(self) -> None:
self.persist(type="m.room.create", key="", creator=USER_ID)
self.persist(type="m.room.member", key=USER_ID, membership="join")
msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
self.replicate()
self.check("get_event", [msg.event_id], msg, asserter=self.assertEventsEqual)
redaction = self.persist(type="m.room.redaction", redacts=msg.event_id)
self.replicate()
msg_dict = msg.get_dict()
msg_dict["content"] = {}
redacted = make_event_from_dict(
msg_dict, internal_metadata_dict=msg.internal_metadata.get_dict()
)
redacted.internal_metadata.redacted_by = redaction.event_id
self.check(
"get_event", [msg.event_id], redacted, asserter=self.assertEventsEqual
)
def test_backfilled_redactions(self) -> None:
self.persist(type="m.room.create", key="", creator=USER_ID)
self.persist(type="m.room.member", key=USER_ID, membership="join")
msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
self.replicate()
self.check("get_event", [msg.event_id], msg, asserter=self.assertEventsEqual)
redaction = self.persist(
type="m.room.redaction", redacts=msg.event_id, backfill=True
)
self.replicate()
msg_dict = msg.get_dict()
msg_dict["content"] = {}
redacted = make_event_from_dict(
msg_dict, internal_metadata_dict=msg.internal_metadata.get_dict()
)
redacted.internal_metadata.redacted_by = redaction.event_id
self.check(
"get_event", [msg.event_id], redacted, asserter=self.assertEventsEqual
)
def test_invites(self) -> None:
self.persist(type="m.room.create", key="", creator=USER_ID)
self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
assert event.internal_metadata.instance_name is not None
assert event.internal_metadata.stream_ordering is not None
self.replicate()
self.check(
"get_invited_rooms_for_local_user",
[USER_ID_2],
[
RoomsForUser(
ROOM_ID,
USER_ID,
"invite",
event.event_id,
PersistedEventPosition(
event.internal_metadata.instance_name,
event.internal_metadata.stream_ordering,
),
RoomVersions.V1.identifier,
)
],
)
@parameterized.expand([(True,), (False,)])
def test_push_actions_for_user(self, send_receipt: bool) -> None:
self.persist(type="m.room.create", key="", creator=USER_ID)
self.persist(type="m.room.member", key=USER_ID, membership="join")
self.persist(
type="m.room.member", sender=USER_ID, key=USER_ID_2, membership="join"
)
event1 = self.persist(type="m.room.message", msgtype="m.text", body="hello")
self.replicate()
if send_receipt:
self.get_success(
self.master_store.insert_receipt(
ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], None, {}
)
)
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2],
RoomNotifCounts(
NotifCounts(highlight_count=0, unread_count=0, notify_count=0), {}
),
)
self.persist(
type="m.room.message",
msgtype="m.text",
body="world",
push_actions=[(USER_ID_2, ["notify"])],
)
self.replicate()
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2],
RoomNotifCounts(
NotifCounts(highlight_count=0, unread_count=0, notify_count=1), {}
),
)
self.persist(
type="m.room.message",
msgtype="m.text",
body="world",
push_actions=[
(USER_ID_2, ["notify", {"set_tweak": "highlight", "value": True}])
],
)
self.replicate()
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2],
RoomNotifCounts(
NotifCounts(highlight_count=1, unread_count=0, notify_count=2), {}
),
)
event_id = 0
def persist(self, backfill: bool = False, **kwargs: Any) -> EventBase:
"""
Returns:
The event that was persisted.
"""
event, context = self.build_event(**kwargs)
if backfill:
self.get_success(
self.persistance.persist_events([(event, context)], backfilled=True)
)
else:
self.get_success(self.persistance.persist_event(event, context))
return event
def build_event(
self,
sender: str = USER_ID,
room_id: str = ROOM_ID,
type: str = "m.room.message",
key: str | None = None,
internal: dict | None = None,
depth: int | None = None,
prev_events: list[tuple[str, dict]] | None = None,
auth_events: list[str] | None = None,
prev_state: list[str] | None = None,
redacts: str | None = None,
push_actions: Iterable = frozenset(),
**content: object,
) -> tuple[EventBase, EventContext]:
prev_events = prev_events or []
auth_events = auth_events or []
prev_state = prev_state or []
if depth is None:
depth = self.event_id
if not prev_events:
latest_event_ids = self.get_success(
self.master_store.get_latest_event_ids_in_room(room_id)
)
prev_events = [(ev_id, {}) for ev_id in latest_event_ids]
event_dict = {
"sender": sender,
"type": type,
"content": content,
"event_id": "$%d:blue" % (self.event_id,),
"room_id": room_id,
"depth": depth,
"origin_server_ts": self.event_id,
"prev_events": prev_events,
"auth_events": auth_events,
"hashes": {},
}
if key is not None:
event_dict["state_key"] = key
event_dict["prev_state"] = prev_state
if redacts is not None:
event_dict["redacts"] = redacts
event = make_event_from_dict(event_dict, internal_metadata_dict=internal or {})
self.event_id += 1
state_handler = self.hs.get_state_handler()
context = self.get_success(state_handler.compute_event_context(event))
self.get_success(
self.master_store.add_push_actions_to_staging(
event.event_id,
dict(push_actions),
False,
"main",
)
)
return event, context