From ab074f5335c0a7c2dbbafbd9dccb717e94d0b262 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 12 Aug 2024 19:40:53 -0500 Subject: [PATCH] Fix events from rooms we're not joined to affecting the joined room stream ordering --- synapse/storage/databases/main/events.py | 145 ++++++++------ tests/storage/test_events.py | 229 ++++++++++++++++++++++- 2 files changed, 305 insertions(+), 69 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 984a54bf2e..95bf0ce660 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -609,6 +609,10 @@ class PersistEventsStore: txn, room_id, state_delta_for_room, min_stream_order ) + self._update_sliding_sync_tables_with_new_persisted_events_txn( + txn, events_and_contexts + ) + def _persist_event_auth_chain_txn( self, txn: LoggingTransaction, @@ -1631,6 +1635,84 @@ class PersistEventsStore: txn, {m for m in members_to_cache_bust if not self.hs.is_mine_id(m)} ) + def _update_sliding_sync_tables_with_new_persisted_events_txn( + self, + txn: LoggingTransaction, + events_and_contexts: List[Tuple[EventBase, EventContext]], + ) -> None: + """ + Update the latest `event_stream_ordering`/`bump_stamp` columns in the + `sliding_sync_joined_rooms` table for the room with new events. + + This function assumes that `_store_event_txn()` (to persist the event) and + `_update_current_state_txn(...)` (so that `sliding_sync_joined_rooms` table has + been updated with rooms that were joined) have already been run. 
+ + Args: + txn + events_and_contexts: The events being persisted + """ + + # Handle updating `sliding_sync_joined_rooms` + room_id_to_stream_ordering_map: Dict[str, int] = {} + room_id_to_bump_stamp_map: Dict[str, int] = {} + for event, _ in events_and_contexts: + existing_stream_ordering = room_id_to_stream_ordering_map.get(event.room_id) + # This should exist for persisted events + assert event.internal_metadata.stream_ordering is not None + + # Ignore backfilled events which will have a negative stream ordering + if event.internal_metadata.stream_ordering < 0: + continue + + if ( + existing_stream_ordering is None + or existing_stream_ordering < event.internal_metadata.stream_ordering + ): + room_id_to_stream_ordering_map[event.room_id] = ( + event.internal_metadata.stream_ordering + ) + + if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES: + existing_bump_stamp = room_id_to_bump_stamp_map.get(event.room_id) + # This should exist for persisted events + assert event.internal_metadata.stream_ordering is not None + if ( + existing_bump_stamp is None + or existing_bump_stamp < event.internal_metadata.stream_ordering + ): + room_id_to_bump_stamp_map[event.room_id] = ( + event.internal_metadata.stream_ordering + ) + + txn.execute_batch( + """ + UPDATE sliding_sync_joined_rooms + SET + event_stream_ordering = CASE + WHEN event_stream_ordering IS NULL OR event_stream_ordering < ? + THEN ? + ELSE event_stream_ordering + END, + bump_stamp = CASE + WHEN bump_stamp IS NULL OR bump_stamp < ? + THEN ? + ELSE bump_stamp + END + WHERE room_id = ? 
+ """, + [ + [ + room_id_to_stream_ordering_map[room_id], + room_id_to_stream_ordering_map[room_id], + room_id_to_bump_stamp_map.get(room_id), + room_id_to_bump_stamp_map.get(room_id), + room_id, + ] + for room_id in room_id_to_stream_ordering_map.keys() + ], + ) + def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None: """Update the room version in the database based off current state events. @@ -1976,67 +2058,6 @@ class PersistEventsStore: ], ) - # Handle updating `sliding_sync_joined_rooms` - room_id_to_stream_ordering_map: Dict[str, int] = {} - room_id_to_bump_stamp_map: Dict[str, int] = {} - for event, _ in events_and_contexts: - existing_stream_ordering = room_id_to_stream_ordering_map.get(event.room_id) - # This should exist at this point because we're inserting events here which require it - assert event.internal_metadata.stream_ordering is not None - if ( - existing_stream_ordering is None - or existing_stream_ordering < event.internal_metadata.stream_ordering - ): - room_id_to_stream_ordering_map[event.room_id] = ( - event.internal_metadata.stream_ordering - ) - - if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES: - existing_bump_stamp = room_id_to_bump_stamp_map.get(event.room_id) - # This should exist at this point because we're inserting events here which require it - assert event.internal_metadata.stream_ordering is not None - if ( - existing_bump_stamp is None - or existing_bump_stamp < event.internal_metadata.stream_ordering - ): - room_id_to_bump_stamp_map[event.room_id] = ( - event.internal_metadata.stream_ordering - ) - - # This function (`_store_event_txn(...)`) is run before - # `_update_current_state_txn(...)` which handles deleting the rows if we are no - # longer in the room so we don't need to worry about inserting something that - # will be orphaned. - txn.execute_batch( - """ - INSERT INTO sliding_sync_joined_rooms - (room_id, event_stream_ordering, bump_stamp) - VALUES ( - ?, ?, ? 
- ) - ON CONFLICT (room_id) - DO UPDATE SET - event_stream_ordering = CASE - WHEN event_stream_ordering IS NULL OR event_stream_ordering < EXCLUDED.event_stream_ordering - THEN EXCLUDED.event_stream_ordering - ELSE event_stream_ordering - END, - bump_stamp = CASE - WHEN bump_stamp IS NULL OR bump_stamp < EXCLUDED.bump_stamp - THEN EXCLUDED.bump_stamp - ELSE bump_stamp - END - """, - [ - [ - room_id, - room_id_to_stream_ordering_map[room_id], - room_id_to_bump_stamp_map.get(room_id), - ] - for room_id in room_id_to_stream_ordering_map.keys() - ], - ) - def _store_rejected_events_txn( self, txn: LoggingTransaction, @@ -2385,6 +2406,8 @@ class PersistEventsStore: ) # Sanity check that we at-least have the create event if create_stripped_event is not None: + insert_values["has_known_state"] = True + # Find the room_type insert_values["room_type"] = ( create_stripped_event.content.get( diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py index 19b17fe4c8..fa9e7717fa 100644 --- a/tests/storage/test_events.py +++ b/tests/storage/test_events.py @@ -28,7 +28,8 @@ from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventContentFields, EventTypes, Membership, RoomTypes from synapse.api.room_versions import RoomVersions -from synapse.events import EventBase +from synapse.events import EventBase, StrippedStateEvent, make_event_from_dict +from synapse.events.snapshot import EventContext from synapse.federation.federation_base import event_from_pdu_json from synapse.rest import admin from synapse.rest.client import login, room @@ -500,7 +501,7 @@ class _SlidingSyncJoinedRoomResult: bump_stamp: Optional[int] room_type: Optional[str] room_name: Optional[str] - is_encrypted: Optional[bool] + is_encrypted: bool @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -515,9 +516,10 @@ class _SlidingSyncMembershipSnapshotResult: # exists for persisted events but in the context of these tests, we're only working # with persisted 
events and we're making comparisons so we will find any mismatch. event_stream_ordering: Optional[int] + has_known_state: bool room_type: Optional[str] room_name: Optional[str] - is_encrypted: Optional[bool] + is_encrypted: bool class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): @@ -569,7 +571,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): bump_stamp=row[2], room_type=row[3], room_name=row[4], - is_encrypted=row[5], + is_encrypted=bool(row[5]), ) for row in rows } @@ -584,7 +586,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): Mapping from the (room_id, user_id) to _SlidingSyncMembershipSnapshotResult. """ rows = cast( - List[Tuple[str, str, str, str, int, str, str, bool]], + List[Tuple[str, str, str, str, int, bool, str, str, bool]], self.get_success( self.store.db_pool.simple_select_list( "sliding_sync_membership_snapshots", @@ -595,6 +597,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): "membership_event_id", "membership", "event_stream_ordering", + "has_known_state", "room_type", "room_name", "is_encrypted", @@ -610,13 +613,88 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): membership_event_id=row[2], membership=row[3], event_stream_ordering=row[4], - room_type=row[5], - room_name=row[6], - is_encrypted=row[7], + has_known_state=bool(row[5]), + room_type=row[6], + room_name=row[7], + is_encrypted=bool(row[8]), ) for row in rows } + _remote_invite_count: int = 0 + + def _create_remote_invite_room_for_user( + self, + invitee_user_id: str, + unsigned_invite_room_state: Optional[List[StrippedStateEvent]], + ) -> Tuple[str, EventBase]: + """ + Create a fake invite for a remote room and persist it. + + We don't have any state for these kinds of rooms and can only rely on the + stripped state included in the unsigned portion of the invite event to identify + the room. 
+ + Args: + invitee_user_id: The person being invited + unsigned_invite_room_state: List of stripped state events to assist the + receiver in identifying the room. + + Returns: + The room ID of the remote invite room and the persisted remote invite event. + """ + invite_room_id = f"!test_room{self._remote_invite_count}:remote_server" + + invite_event_dict = { + "room_id": invite_room_id, + "sender": "@inviter:remote_server", + "state_key": invitee_user_id, + "depth": 1, + "origin_server_ts": 1, + "type": EventTypes.Member, + "content": {"membership": Membership.INVITE}, + "auth_events": [], + "prev_events": [], + } + if unsigned_invite_room_state is not None: + serialized_stripped_state_events = [] + for stripped_event in unsigned_invite_room_state: + serialized_stripped_state_events.append( + { + "type": stripped_event.type, + "state_key": stripped_event.state_key, + "sender": stripped_event.sender, + "content": stripped_event.content, + } + ) + + invite_event_dict["unsigned"] = { + "invite_room_state": serialized_stripped_state_events + } + + invite_event = make_event_from_dict( + invite_event_dict, + room_version=RoomVersions.V10, + ) + invite_event.internal_metadata.outlier = True + invite_event.internal_metadata.out_of_band_membership = True + + self.get_success( + self.store.maybe_store_room_on_outlier_membership( + room_id=invite_room_id, room_version=invite_event.room_version + ) + ) + context = EventContext.for_outlier(self.hs.get_storage_controllers()) + persist_controller = self.hs.get_storage_controllers().persistence + assert persist_controller is not None + persisted_event, _, _ = self.get_success( + persist_controller.persist_event(invite_event, context) + ) + + self._remote_invite_count += 1 + + return invite_room_id, persisted_event + def test_joined_room_with_no_info(self) -> None: """ Test joined room that doesn't have a room type, encryption, or name shows up in @@ -675,6 +753,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): 
event_stream_ordering=state_map[ (EventTypes.Member, user1_id) ].internal_metadata.stream_ordering, + has_known_state=True, room_type=None, room_name=None, is_encrypted=False, @@ -758,6 +837,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): event_stream_ordering=state_map[ (EventTypes.Member, user1_id) ].internal_metadata.stream_ordering, + has_known_state=True, room_type=None, room_name="my super duper room", is_encrypted=True, @@ -774,6 +854,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): event_stream_ordering=state_map[ (EventTypes.Member, user2_id) ].internal_metadata.stream_ordering, + has_known_state=True, room_type=None, # Even though this room does have a name and is encrypted, user2 is the # room creator and joined at the room creation time which didn't have @@ -859,6 +940,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): event_stream_ordering=state_map[ (EventTypes.Member, user1_id) ].internal_metadata.stream_ordering, + has_known_state=True, room_type=RoomTypes.SPACE, room_name="my super duper space", is_encrypted=False, @@ -875,6 +957,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): event_stream_ordering=state_map[ (EventTypes.Member, user2_id) ].internal_metadata.stream_ordering, + has_known_state=True, room_type=RoomTypes.SPACE, # Even though this room does have a name, user2 is the room creator and # joined at the room creation time which didn't have this state set yet. 
@@ -1005,6 +1088,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): event_stream_ordering=state_map[ (EventTypes.Member, user1_id) ].internal_metadata.stream_ordering, + has_known_state=True, room_type=None, room_name="my super duper room", is_encrypted=False, @@ -1021,6 +1105,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): event_stream_ordering=state_map[ (EventTypes.Member, user2_id) ].internal_metadata.stream_ordering, + has_known_state=True, room_type=None, room_name=None, is_encrypted=False, @@ -1096,6 +1181,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): event_stream_ordering=state_map[ (EventTypes.Member, user1_id) ].internal_metadata.stream_ordering, + has_known_state=True, room_type=None, room_name="my super duper room", is_encrypted=False, @@ -1113,6 +1199,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): event_stream_ordering=state_map[ (EventTypes.Member, user2_id) ].internal_metadata.stream_ordering, + has_known_state=True, room_type=None, room_name=None, is_encrypted=False, @@ -1269,6 +1356,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): membership_event_id=user1_invited_response["event_id"], membership=Membership.INVITE, event_stream_ordering=user1_invited_event_pos.stream, + has_known_state=True, room_type=RoomTypes.SPACE, room_name="my super duper space", is_encrypted=True, @@ -1285,6 +1373,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): event_stream_ordering=state_map[ (EventTypes.Member, user2_id) ].internal_metadata.stream_ordering, + has_known_state=True, room_type=RoomTypes.SPACE, room_name=None, is_encrypted=False, @@ -1370,6 +1459,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): membership_event_id=user1_invited_response["event_id"], membership=Membership.INVITE, event_stream_ordering=user1_invited_event_pos.stream, + has_known_state=True, room_type=None, room_name=None, is_encrypted=False, @@ 
-1386,6 +1476,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): event_stream_ordering=state_map[ (EventTypes.Member, user2_id) ].internal_metadata.stream_ordering, + has_known_state=True, room_type=None, room_name=None, is_encrypted=False, @@ -1400,12 +1491,132 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): membership_event_id=user3_ban_response["event_id"], membership=Membership.BAN, event_stream_ordering=user3_ban_event_pos.stream, + has_known_state=True, room_type=None, room_name=None, is_encrypted=False, ), ) + def test_non_join_remote_invite_no_stripped_state(self) -> None: + """ + Test remote invite with no stripped state provided shows up in + `sliding_sync_membership_snapshots` with `has_known_state=False`. + """ + user1_id = self.register_user("user1", "pass") + _user1_tok = self.login(user1_id, "pass") + + # Create a remote invite room without any `unsigned.invite_room_state` + remote_invite_room_id, remote_invite_event = ( + self._create_remote_invite_room_for_user(user1_id, None) + ) + + # No one local is joined to the remote room + sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() + self.assertIncludes( + set(sliding_sync_joined_rooms_results.keys()), + set(), + exact=True, + ) + + sliding_sync_membership_snapshots_results = ( + self._get_sliding_sync_membership_snapshots() + ) + self.assertIncludes( + set(sliding_sync_membership_snapshots_results.keys()), + { + (remote_invite_room_id, user1_id), + }, + exact=True, + ) + self.assertEqual( + sliding_sync_membership_snapshots_results.get( + (remote_invite_room_id, user1_id) + ), + _SlidingSyncMembershipSnapshotResult( + room_id=remote_invite_room_id, + user_id=user1_id, + membership_event_id=remote_invite_event.event_id, + membership=Membership.INVITE, + event_stream_ordering=remote_invite_event.internal_metadata.stream_ordering, + # No stripped state provided + has_known_state=False, + room_type=None, + room_name=None, + 
is_encrypted=False, + ), + ) + + def test_non_join_remote_invite_encrypted_room(self) -> None: + """ + Test remote invite with stripped state (encrypted room) shows up in + `sliding_sync_membership_snapshots`. + """ + user1_id = self.register_user("user1", "pass") + _user1_tok = self.login(user1_id, "pass") + + # Create a remote invite room with some `unsigned.invite_room_state` + # indicating that the room is encrypted. + remote_invite_room_id, remote_invite_event = ( + self._create_remote_invite_room_for_user( + user1_id, + [ + StrippedStateEvent( + type=EventTypes.Create, + state_key="", + sender="@inviter:remote_server", + content={ + EventContentFields.ROOM_CREATOR: "@inviter:remote_server", + EventContentFields.ROOM_VERSION: RoomVersions.V10.identifier, + }, + ), + StrippedStateEvent( + type=EventTypes.RoomEncryption, + state_key="", + sender="@inviter:remote_server", + content={ + EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2", + }, + ), + ], + ) + ) + + # No one local is joined to the remote room + sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() + self.assertIncludes( + set(sliding_sync_joined_rooms_results.keys()), + set(), + exact=True, + ) + + sliding_sync_membership_snapshots_results = ( + self._get_sliding_sync_membership_snapshots() + ) + self.assertIncludes( + set(sliding_sync_membership_snapshots_results.keys()), + { + (remote_invite_room_id, user1_id), + }, + exact=True, + ) + self.assertEqual( + sliding_sync_membership_snapshots_results.get( + (remote_invite_room_id, user1_id) + ), + _SlidingSyncMembershipSnapshotResult( + room_id=remote_invite_room_id, + user_id=user1_id, + membership_event_id=remote_invite_event.event_id, + membership=Membership.INVITE, + event_stream_ordering=remote_invite_event.internal_metadata.stream_ordering, + has_known_state=True, + room_type=None, + room_name=None, + is_encrypted=True, + ), + ) + # TODO: Test remote invite # TODO: Test rejection of a remote invite @@ -1467,6 
+1678,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): membership_event_id=user1_leave_response["event_id"], membership=Membership.LEAVE, event_stream_ordering=user1_leave_event_pos.stream, + has_known_state=True, room_type=None, room_name=None, is_encrypted=False, @@ -1480,6 +1692,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): membership_event_id=user2_leave_response["event_id"], membership=Membership.LEAVE, event_stream_ordering=user2_leave_event_pos.stream, + has_known_state=True, room_type=None, room_name=None, is_encrypted=False,