mirror of
https://github.com/element-hq/synapse.git
synced 2026-06-06 17:42:10 +00:00
Start catch-up if nothing written yet
This commit is contained in:
@@ -48,6 +48,7 @@ from synapse.storage.databases.main.events_bg_updates import _BackgroundUpdates
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||
from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.iterutils import batch_iter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -637,44 +638,56 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
|
||||
""",
|
||||
)
|
||||
|
||||
row = txn.fetchone()
|
||||
# We have nothing written to the `sliding_sync_joined_rooms` table so there is
|
||||
# If we have nothing written to the `sliding_sync_joined_rooms` table, there is
|
||||
# nothing to clean up
|
||||
if row is None:
|
||||
return
|
||||
row = cast(Optional[Tuple[int]], txn.fetchone())
|
||||
max_stream_ordering_sliding_sync_joined_rooms_table = None
|
||||
if row is not None:
|
||||
(max_stream_ordering_sliding_sync_joined_rooms_table,) = row
|
||||
|
||||
max_stream_ordering_sliding_sync_joined_rooms_table = row[0]
|
||||
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT DISTINCT(room_id)
|
||||
FROM events
|
||||
WHERE stream_ordering > ?
|
||||
ORDER BY stream_ordering DESC
|
||||
""",
|
||||
(max_stream_ordering_sliding_sync_joined_rooms_table,),
|
||||
)
|
||||
|
||||
room_rows = txn.fetchall()
|
||||
# No new events have been written to the `events` table since the last time we wrote
|
||||
# to the `sliding_sync_joined_rooms` table so there is nothing to clean up. This is
|
||||
# the expected normal scenario for people who have not downgraded their Synapse
|
||||
# version.
|
||||
if not room_rows:
|
||||
return
|
||||
|
||||
for chunk in batch_iter(room_rows, 1000):
|
||||
# Handle updating the `sliding_sync_joined_rooms` table
|
||||
#
|
||||
DatabasePool.simple_delete_many_batch_txn(
|
||||
txn,
|
||||
table="sliding_sync_joined_rooms",
|
||||
keys=("room_id",),
|
||||
values=chunk,
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT DISTINCT(room_id)
|
||||
FROM events
|
||||
WHERE stream_ordering > ?
|
||||
ORDER BY stream_ordering DESC
|
||||
""",
|
||||
(max_stream_ordering_sliding_sync_joined_rooms_table,),
|
||||
)
|
||||
|
||||
room_rows = txn.fetchall()
|
||||
# No new events have been written to the `events` table since the last time we wrote
|
||||
# to the `sliding_sync_joined_rooms` table so there is nothing to clean up. This is
|
||||
# the expected normal scenario for people who have not downgraded their Synapse
|
||||
# version.
|
||||
if not room_rows:
|
||||
return
|
||||
|
||||
# 1000 is an arbitrary batch size with no testing
|
||||
for chunk in batch_iter(room_rows, 1000):
|
||||
# Handle updating the `sliding_sync_joined_rooms` table
|
||||
#
|
||||
DatabasePool.simple_delete_many_batch_txn(
|
||||
txn,
|
||||
table="sliding_sync_joined_rooms",
|
||||
keys=("room_id",),
|
||||
values=chunk,
|
||||
)
|
||||
|
||||
# Now kick-off the background update to catch-up with what we missed while Synapse
|
||||
# was downgraded.
|
||||
#
|
||||
# We may need to catch-up on everything if we have nothing written to the
|
||||
# `sliding_sync_joined_rooms` table yet. This could happen if someone had zero rooms
|
||||
# on their server (so the normal background update completes), downgrade Synapse
|
||||
# versions, join and create some new rooms, and upgrade again.
|
||||
#
|
||||
progress_json: JsonDict = {}
|
||||
if max_stream_ordering_sliding_sync_joined_rooms_table is not None:
|
||||
progress_json["last_event_stream_ordering"] = (
|
||||
max_stream_ordering_sliding_sync_joined_rooms_table
|
||||
)
|
||||
|
||||
DatabasePool.simple_upsert_txn_native_upsert(
|
||||
txn,
|
||||
table="background_updates",
|
||||
@@ -685,13 +698,7 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
|
||||
# Only insert the row if it doesn't already exist. If it already exists, we will
|
||||
# eventually fill in the rows we're trying to populate.
|
||||
insertion_values={
|
||||
"progress_json": json_encoder.encode(
|
||||
{
|
||||
"last_event_stream_ordering": {
|
||||
str(max_stream_ordering_sliding_sync_joined_rooms_table)
|
||||
}
|
||||
}
|
||||
),
|
||||
"progress_json": json_encoder.encode(progress_json),
|
||||
},
|
||||
)
|
||||
|
||||
@@ -720,10 +727,10 @@ def _resolve_stale_data_in_sliding_sync_membership_snapshots_table(
|
||||
|
||||
# If we have nothing written to the `sliding_sync_membership_snapshots` table,
|
||||
# there is nothing to clean up
|
||||
row = cast(Tuple[int], txn.fetchone())
|
||||
row = cast(Optional[Tuple[int]], txn.fetchone())
|
||||
max_stream_ordering_sliding_sync_membership_snapshots_table = None
|
||||
if row is not None:
|
||||
max_stream_ordering_sliding_sync_membership_snapshots_table = row[0]
|
||||
(max_stream_ordering_sliding_sync_membership_snapshots_table,) = row
|
||||
|
||||
# XXX: Since `forgotten` is simply a flag on the `room_memberships` table that is
|
||||
# set out-of-band, there is no way to tell whether it was set while Synapse was
|
||||
@@ -749,6 +756,7 @@ def _resolve_stale_data_in_sliding_sync_membership_snapshots_table(
|
||||
if not membership_rows:
|
||||
return
|
||||
|
||||
# 1000 is an arbitrary batch size with no testing
|
||||
for chunk in batch_iter(membership_rows, 1000):
|
||||
# Handle updating the `sliding_sync_membership_snapshots` table
|
||||
#
|
||||
@@ -762,14 +770,17 @@ def _resolve_stale_data_in_sliding_sync_membership_snapshots_table(
|
||||
# Now kick-off the background update to catch-up with what we missed while Synapse
|
||||
# was downgraded.
|
||||
#
|
||||
progress_json = {}
|
||||
# We may need to catch-up on everything if we have nothing written to the
|
||||
# `sliding_sync_membership_snapshots` table yet. This could happen if someone had
|
||||
# zero rooms on their server (so the normal background update completes), downgrade
|
||||
# Synapse versions, join and create some new rooms, and upgrade again.
|
||||
#
|
||||
progress_json: JsonDict = {}
|
||||
if max_stream_ordering_sliding_sync_membership_snapshots_table is not None:
|
||||
progress_json["last_event_stream_ordering"] = (
|
||||
max_stream_ordering_sliding_sync_membership_snapshots_table
|
||||
)
|
||||
|
||||
# We still need to kick off the background update to catch-up regardless of whether
|
||||
# there was anything to clean up.
|
||||
DatabasePool.simple_upsert_txn_native_upsert(
|
||||
txn,
|
||||
table="background_updates",
|
||||
|
||||
Reference in New Issue
Block a user