From 2b5f07d714c006ccb342a038b18dccfc9b17fe05 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 5 Aug 2024 22:34:21 -0500 Subject: [PATCH] Start of updating `sliding_sync_joined_rooms` --- synapse/storage/databases/main/events.py | 99 ++++++++++++++++++- .../delta/87/01_sliding_sync_memberships.sql | 7 +- 2 files changed, 99 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1f7acdb859..1ce7578160 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -34,6 +34,7 @@ from typing import ( Optional, Set, Tuple, + Union, cast, ) @@ -1163,6 +1164,7 @@ class PersistEventsStore: for ev_type, state_key in itertools.chain(to_delete, to_insert) if ev_type == EventTypes.Member } + members_to_cache_bust = members_changed.copy() if delta_state.no_longer_in_room: # Server is no longer in the room so we delete the room from @@ -1182,16 +1184,22 @@ class PersistEventsStore: """ txn.execute(sql, (stream_id, self._instance_name, room_id)) + # Grab the list of users before we clear out the current state + users_in_room = self.store.get_users_in_room_txn(txn, room_id) # We also want to invalidate the membership caches for users # that were in the room. - users_in_room = self.store.get_users_in_room_txn(txn, room_id) - members_changed.update(users_in_room) + members_to_cache_bust.update(users_in_room) self.db_pool.simple_delete_txn( txn, table="current_state_events", keyvalues={"room_id": room_id}, ) + self.db_pool.simple_delete_txn( + txn, + table="sliding_sync_joined_rooms", + keyvalues={"room_id": room_id}, + ) else: # We're still in the room, so we update the current state as normal. @@ -1260,6 +1268,81 @@ class PersistEventsStore: ], ) + # Handle updating the `sliding_sync_joined_rooms` table + sliding_sync_joined_rooms_insert_map: Dict[ + str, Optional[Union[str, bool]] + ] = {} + event_ids_to_fetch: List[str] = [] + create_event_id = None + room_encryption_event_id = None + room_name_event_id = None + for state_key, event_id in to_insert.items(): + if state_key[0] == EventTypes.Create: + create_event_id = event_id + event_ids_to_fetch.append(event_id) + sliding_sync_joined_rooms_insert_map["room_type"] = None + elif state_key[0] == EventTypes.RoomEncryption: + room_encryption_event_id = event_id + event_ids_to_fetch.append(event_id) + sliding_sync_joined_rooms_insert_map["is_encrypted"] = None + elif state_key[0] == EventTypes.Name: + room_name_event_id = event_id + event_ids_to_fetch.append(event_id) + sliding_sync_joined_rooms_insert_map["room_name"] = None + + # Fetch the events from the database + event_json_rows = cast( + List[Tuple[str, str]], + self.db_pool.simple_select_many_txn( + txn, + table="event_json", + column="event_id", + iterable=event_ids_to_fetch, + retcols=["event_id", "json"], + keyvalues={}, + ), + ) + # Parse the raw event JSON + for event_id, json in event_json_rows: + event_json = db_to_json(json) + + if event_id == create_event_id: + room_type = event_json.get("content", {}).get( + EventContentFields.ROOM_TYPE + ) + sliding_sync_joined_rooms_insert_map["room_type"] = room_type + elif event_id == room_encryption_event_id: + is_encrypted = event_json.get("content", {}).get( + EventContentFields.ENCRYPTION_ALGORITHM + ) + sliding_sync_joined_rooms_insert_map["is_encrypted"] = is_encrypted + elif event_id == room_name_event_id: + room_name = event_json.get("content", {}).get("name") + sliding_sync_joined_rooms_insert_map["room_name"] = room_name + else: + raise AssertionError( + f"Unexpected event_id (we should not be fetching extra events): {event_id}" + ) + + # Update the `sliding_sync_joined_rooms` table + insert_keys, insert_values = sliding_sync_joined_rooms_insert_map.items() + if len(insert_keys) > 0: + # TODO: Should we add `bump_stamp` on insert? + txn.execute( + f""" + INSERT INTO sliding_sync_joined_rooms + (room_id, stream_ordering, {", ".join(insert_keys)}) + VALUES ( + ?, + (SELECT stream_ordering FROM events WHERE event_id = ?) + {", ".join("?" for _ in insert_values)} + ) + ON CONFLICT (room_id) + DO UPDATE SET + {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)} + """, + ) + # We now update `local_current_membership`. We do this regardless # of whether we're still in the room or not to handle the case where # e.g. we just got banned (where we need to record that fact here). @@ -1296,6 +1379,12 @@ class PersistEventsStore: ], ) + # We now update `sliding_sync_non_join_memberships`. We do this regardless of + # whether the server is still in the room or not because we still want a row if + # we just left/kicked or got banned from the room. + # + # TODO + txn.call_after( self.store._curr_state_delta_stream_cache.entity_has_changed, room_id, @@ -1303,12 +1392,14 @@ class PersistEventsStore: ) # Invalidate the various caches - self.store._invalidate_state_caches_and_stream(txn, room_id, members_changed) + self.store._invalidate_state_caches_and_stream( + txn, room_id, members_to_cache_bust + ) # Check if any of the remote membership changes requires us to # unsubscribe from their device lists. self.store.handle_potentially_left_users_txn( - txn, {m for m in members_changed if not self.hs.is_mine_id(m)} + txn, {m for m in members_to_cache_bust if not self.hs.is_mine_id(m)} ) def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None: diff --git a/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql b/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql index 646d3b7128..be1b27fdfd 100644 --- a/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql +++ b/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql @@ -11,6 +11,7 @@ -- See the GNU Affero General Public License for more details: -- . +-- Kept in sync with `current_state_events` CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms( FOREIGN KEY(room_id) REFERENCES rooms(room_id), room_type TEXT, @@ -18,18 +19,18 @@ CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms( is_encrypted BOOLEAN, stream_ordering: BIGINT, bump_stamp: BIGINT, + PRIMARY KEY (room_id) ); -CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_joined_rooms_room_id ON sliding_sync_joined_rooms(room_id); CREATE TABLE IF NOT EXISTS sliding_sync_non_join_memberships( - FOREIGN KEY(membership_event_id) REFERENCES events(event_id), FOREIGN KEY(room_id) REFERENCES rooms(room_id), + FOREIGN KEY(membership_event_id) REFERENCES events(event_id), room_type TEXT, room_name TEXT, is_encrypted BOOLEAN, stream_ordering: BIGINT, bump_stamp: BIGINT, + PRIMARY KEY (room_id, membership_event_id) ); -CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_non_join_memberships_membership_event_id ON sliding_sync_non_join_memberships(membership_event_id);