Start of updating sliding_sync_joined_rooms

This commit is contained in:
Eric Eastwood
2024-08-05 22:34:21 -05:00
parent ad1c887b4c
commit 2b5f07d714
2 changed files with 99 additions and 7 deletions
+95 -4
View File
@@ -34,6 +34,7 @@ from typing import (
Optional,
Set,
Tuple,
Union,
cast,
)
@@ -1163,6 +1164,7 @@ class PersistEventsStore:
for ev_type, state_key in itertools.chain(to_delete, to_insert)
if ev_type == EventTypes.Member
}
members_to_cache_bust = members_changed.copy()
if delta_state.no_longer_in_room:
# Server is no longer in the room so we delete the room from
@@ -1182,16 +1184,22 @@ class PersistEventsStore:
"""
txn.execute(sql, (stream_id, self._instance_name, room_id))
# Grab the list of users before we clear out the current state
users_in_room = self.store.get_users_in_room_txn(txn, room_id)
# We also want to invalidate the membership caches for users
# that were in the room.
users_in_room = self.store.get_users_in_room_txn(txn, room_id)
members_changed.update(users_in_room)
members_to_cache_bust.update(users_in_room)
self.db_pool.simple_delete_txn(
txn,
table="current_state_events",
keyvalues={"room_id": room_id},
)
self.db_pool.simple_delete_txn(
txn,
table="sliding_sync_joined_rooms",
keyvalues={"room_id": room_id},
)
else:
# We're still in the room, so we update the current state as normal.
@@ -1260,6 +1268,81 @@ class PersistEventsStore:
],
)
# Handle updating the `sliding_sync_joined_rooms` table
sliding_sync_joined_rooms_insert_map: Dict[
str, Optional[Union[str, bool]]
] = {}
event_ids_to_fetch: List[str] = []
create_event_id = None
room_encryption_event_id = None
room_name_event_id = None
for state_key, event_id in to_insert.items():
if state_key[0] == EventTypes.Create:
create_event_id = event_id
event_ids_to_fetch.append(event_id)
sliding_sync_joined_rooms_insert_map["room_type"] = None
elif state_key[0] == EventTypes.RoomEncryption:
room_encryption_event_id = event_id
event_ids_to_fetch.append(event_id)
sliding_sync_joined_rooms_insert_map["is_encrypted"] = None
elif state_key[0] == EventTypes.Name:
room_name_event_id = event_id
event_ids_to_fetch.append(event_id)
sliding_sync_joined_rooms_insert_map["room_name"] = None
# Fetch the events from the database
event_json_rows = cast(
List[Tuple[str, str]],
self.db_pool.simple_select_many_txn(
txn,
table="event_json",
column="event_id",
iterable=event_ids_to_fetch,
retcols=["event_id", "json"],
keyvalues={},
),
)
# Parse the raw event JSON
for event_id, json in event_json_rows:
event_json = db_to_json(json)
if event_id == create_event_id:
room_type = event_json.get("content", {}).get(
EventContentFields.ROOM_TYPE
)
sliding_sync_joined_rooms_insert_map["room_type"] = room_type
elif event_id == room_encryption_event_id:
is_encrypted = event_json.get("content", {}).get(
EventContentFields.ENCRYPTION_ALGORITHM
)
sliding_sync_joined_rooms_insert_map["is_encrypted"] = is_encrypted
elif event_id == room_name_event_id:
room_name = event_json.get("content", {}).get("name")
sliding_sync_joined_rooms_insert_map["room_name"] = room_name
else:
raise AssertionError(
f"Unexpected event_id (we should not be fetching extra events): {event_id}"
)
# Update the `sliding_sync_joined_rooms` table
insert_keys, insert_values = sliding_sync_joined_rooms_insert_map.items()
if len(insert_keys) > 0:
# TODO: Should we add `bump_stamp` on insert?
txn.execute(
f"""
INSERT INTO sliding_sync_joined_rooms
(room_id, stream_ordering, {", ".join(insert_keys)})
VALUES (
?,
(SELECT stream_ordering FROM events WHERE event_id = ?)
{", ".join("?" for _ in insert_values)}
)
ON CONFLICT (room_id)
DO UPDATE SET
{", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
""",
)
# We now update `local_current_membership`. We do this regardless
# of whether we're still in the room or not to handle the case where
# e.g. we just got banned (where we need to record that fact here).
@@ -1296,6 +1379,12 @@ class PersistEventsStore:
],
)
# We now update `sliding_sync_non_join_memberships`. We do this regardless of
# whether the server is still in the room or not because we still want a row if
# we just left/kicked or got banned from the room.
#
# TODO
txn.call_after(
self.store._curr_state_delta_stream_cache.entity_has_changed,
room_id,
@@ -1303,12 +1392,14 @@ class PersistEventsStore:
)
# Invalidate the various caches
self.store._invalidate_state_caches_and_stream(txn, room_id, members_changed)
self.store._invalidate_state_caches_and_stream(
txn, room_id, members_to_cache_bust
)
# Check if any of the remote membership changes requires us to
# unsubscribe from their device lists.
self.store.handle_potentially_left_users_txn(
txn, {m for m in members_changed if not self.hs.is_mine_id(m)}
txn, {m for m in members_to_cache_bust if not self.hs.is_mine_id(m)}
)
def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None:
@@ -11,6 +11,7 @@
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Kept in sync with `current_state_events`
CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms(
FOREIGN KEY(room_id) REFERENCES rooms(room_id),
room_type TEXT,
@@ -18,18 +19,18 @@ CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms(
is_encrypted BOOLEAN,
stream_ordering: BIGINT,
bump_stamp: BIGINT,
PRIMARY KEY (room_id)
);
CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_joined_rooms_room_id ON sliding_sync_joined_rooms(room_id);
CREATE TABLE IF NOT EXISTS sliding_sync_non_join_memberships(
FOREIGN KEY(membership_event_id) REFERENCES events(event_id),
FOREIGN KEY(room_id) REFERENCES rooms(room_id),
FOREIGN KEY(membership_event_id) REFERENCES events(event_id),
room_type TEXT,
room_name TEXT,
is_encrypted BOOLEAN,
stream_ordering: BIGINT,
bump_stamp: BIGINT,
PRIMARY KEY (room_id, membership_event_id)
);
CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_non_join_memberships_membership_event_id ON sliding_sync_non_join_memberships(membership_event_id);