From 9a7d8c2be44072da69b12a3edee02f04271d94c8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 27 Aug 2024 17:28:07 -0500 Subject: [PATCH] Start catch-up if nothing written yet --- synapse/storage/prepare_database.py | 99 ++++++++++++++++------------- 1 file changed, 55 insertions(+), 44 deletions(-) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 16b4dea523..2527766e2d 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -48,6 +48,7 @@ from synapse.storage.databases.main.events_bg_updates import _BackgroundUpdates from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION from synapse.storage.types import Cursor +from synapse.types import JsonDict from synapse.util.iterutils import batch_iter logger = logging.getLogger(__name__) @@ -637,44 +638,56 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table( """, ) - row = txn.fetchone() - # We have nothing written to the `sliding_sync_joined_rooms` table so there is + # If we have nothing written to the `sliding_sync_joined_rooms` table, there is # nothing to clean up - if row is None: - return + row = cast(Optional[Tuple[int]], txn.fetchone()) + max_stream_ordering_sliding_sync_joined_rooms_table = None + if row is not None: + (max_stream_ordering_sliding_sync_joined_rooms_table,) = row - max_stream_ordering_sliding_sync_joined_rooms_table = row[0] - - txn.execute( - """ - SELECT DISTINCT(room_id) - FROM events - WHERE stream_ordering > ? - ORDER BY stream_ordering DESC - """, - (max_stream_ordering_sliding_sync_joined_rooms_table,), - ) - - room_rows = txn.fetchall() - # No new events have been written to the `events` table since the last time we wrote - # to the `sliding_sync_joined_rooms` table so there is nothing to clean up. This is - # the expected normal scenario for people who have not downgraded their Synapse - # version. - if not room_rows: - return - - for chunk in batch_iter(room_rows, 1000): - # Handle updating the `sliding_sync_joined_rooms` table - # - DatabasePool.simple_delete_many_batch_txn( - txn, - table="sliding_sync_joined_rooms", - keys=("room_id",), - values=chunk, + txn.execute( + """ + SELECT DISTINCT(room_id) + FROM events + WHERE stream_ordering > ? + ORDER BY stream_ordering DESC + """, + (max_stream_ordering_sliding_sync_joined_rooms_table,), ) + room_rows = txn.fetchall() + # No new events have been written to the `events` table since the last time we wrote + # to the `sliding_sync_joined_rooms` table so there is nothing to clean up. This is + # the expected normal scenario for people who have not downgraded their Synapse + # version. + if not room_rows: + return + + # 1000 is an arbitrary batch size with no testing + for chunk in batch_iter(room_rows, 1000): + # Handle updating the `sliding_sync_joined_rooms` table + # + DatabasePool.simple_delete_many_batch_txn( + txn, + table="sliding_sync_joined_rooms", + keys=("room_id",), + values=chunk, + ) + # Now kick-off the background update to catch-up with what we missed while Synapse # was downgraded. + # + # We may need to catch-up on everything if we have nothing written to the + # `sliding_sync_joined_rooms` table yet. This could happen if someone had zero rooms + # on their server (so the normal background update completes), downgrade Synapse + # versions, join and create some new rooms, and upgrade again. + # + progress_json: JsonDict = {} + if max_stream_ordering_sliding_sync_joined_rooms_table is not None: + progress_json["last_event_stream_ordering"] = ( + max_stream_ordering_sliding_sync_joined_rooms_table + ) + DatabasePool.simple_upsert_txn_native_upsert( txn, table="background_updates", @@ -685,13 +698,7 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table( # Only insert the row if it doesn't already exist. If it already exists, we will # eventually fill in the rows we're trying to populate. insertion_values={ - "progress_json": json_encoder.encode( - { - "last_event_stream_ordering": { - str(max_stream_ordering_sliding_sync_joined_rooms_table) - } - } - ), + "progress_json": json_encoder.encode(progress_json), }, ) @@ -720,10 +727,10 @@ def _resolve_stale_data_in_sliding_sync_membership_snapshots_table( # If we have nothing written to the `sliding_sync_membership_snapshots` table, # there is nothing to clean up - row = cast(Tuple[int], txn.fetchone()) + row = cast(Optional[Tuple[int]], txn.fetchone()) max_stream_ordering_sliding_sync_membership_snapshots_table = None if row is not None: - max_stream_ordering_sliding_sync_membership_snapshots_table = row[0] + (max_stream_ordering_sliding_sync_membership_snapshots_table,) = row # XXX: Since `forgotten` is simply a flag on the `room_memberships` table that is # set out-of-band, there is no way to tell whether it was set while Synapse was @@ -749,6 +756,7 @@ def _resolve_stale_data_in_sliding_sync_membership_snapshots_table( if not membership_rows: return + # 1000 is an arbitrary batch size with no testing for chunk in batch_iter(membership_rows, 1000): # Handle updating the `sliding_sync_membership_snapshots` table # @@ -762,14 +770,17 @@ def _resolve_stale_data_in_sliding_sync_membership_snapshots_table( # Now kick-off the background update to catch-up with what we missed while Synapse # was downgraded. # - progress_json = {} + # We may need to catch-up on everything if we have nothing written to the + # `sliding_sync_membership_snapshots` table yet. This could happen if someone had + # zero rooms on their server (so the normal background update completes), downgrade + # Synapse versions, join and create some new rooms, and upgrade again. + # + progress_json: JsonDict = {} if max_stream_ordering_sliding_sync_membership_snapshots_table is not None: progress_json["last_event_stream_ordering"] = ( max_stream_ordering_sliding_sync_membership_snapshots_table ) - # We still need to kick off the background update to catch-up regardless of whether - # there was anything to clean up. DatabasePool.simple_upsert_txn_native_upsert( txn, table="background_updates",