From 2bdafecfeae23cf1aeddd7cfcd9d519739f59584 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 2 Apr 2026 12:34:59 -0500 Subject: [PATCH] Start of `wait_for_multi_writer_stream_token` --- synapse/notifier.py | 52 +++++++++++++++++++++++++- synapse/storage/databases/main/room.py | 7 +++- synapse/types/__init__.py | 17 +++++++++ 3 files changed, 74 insertions(+), 2 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 93d438def7..020b341689 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -46,6 +46,9 @@ from synapse.logging import issue9533_logger from synapse.logging.context import PreserveLoggingContext from synapse.logging.opentracing import log_kv, start_active_span from synapse.metrics import SERVER_NAME_LABEL, LaterGauge +from synapse.storage.util.id_generators import ( + MultiWriterIdGenerator, +) from synapse.streams.config import PaginationConfig from synapse.types import ( ISynapseReactor, @@ -831,8 +834,15 @@ class Notifier: return result async def wait_for_stream_token(self, stream_token: StreamToken) -> bool: - """Wait for this worker to catch up with the given stream token.""" + """ + Wait for this worker to catch up with the given stream token. + + Returns: + True when this worker has caught up + False when we timed out waiting + """ current_token = self.event_sources.get_current_token() + # Return early if we are already caught up if stream_token.is_before_or_eq(current_token): return True @@ -840,6 +850,7 @@ class Notifier: # the future", i.e. that are ahead of the tokens persisted in the DB. stream_token = await self.event_sources.bound_future_token(stream_token) + # Start waiting until we've caught up to the `stream_token` start = self.clock.time_msec() logged = False while True: @@ -849,6 +860,7 @@ class Notifier: now = self.clock.time_msec() + # Timed out if now - start > 10_000: return False @@ -863,6 +875,44 @@ class Notifier: # TODO: be better await self.clock.sleep(Duration(milliseconds=500)) + async def wait_for_multi_writer_stream_token( + self, token: MultiWriterStreamToken, id_gen: MultiWriterIdGenerator + ) -> bool: + """Wait for this worker to catch up with the given stream token.""" + current_token = id_gen.get_current_token() + # Return early if we are already caught up + if token.is_before_or_eq(current_token): + return True + + # Work around a bug where older Synapse versions gave out tokens "from + # the future", i.e. that are ahead of the tokens persisted in the DB. + token = token.bound_future_token(TODO) + + # Start waiting until we've caught up to the `stream_token` + start = self.clock.time_msec() + logged = False + while True: + current_token = self.event_sources.get_current_token() + if token.is_before_or_eq(current_token): + return True + + now = self.clock.time_msec() + + # Timed out + if now - start > 10_000: + return False + + if not logged: + logger.info( + "Waiting for current token to reach %s; currently at %s", + token, + current_token, + ) + logged = True + + # TODO: be better + await self.clock.sleep(Duration(milliseconds=500)) + async def _get_room_ids( self, user: UserID, explicit_room_id: str | None ) -> tuple[StrCollection, bool]: diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 7ac88e4c2a..5e6e971f9f 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -58,7 +58,12 @@ from synapse.storage.database import ( from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.types import Cursor from synapse.storage.util.id_generators import IdGenerator, MultiWriterIdGenerator -from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID +from synapse.types import ( + JsonDict, + RetentionPolicy, + StrCollection, + ThirdPartyInstanceID, +) from synapse.util.caches.descriptors import cached, cachedList from synapse.util.json import json_encoder from synapse.util.stringutils import MXC_REGEX diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 02889795bb..6e68123e62 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -685,6 +685,23 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): def bound_stream_token(self, max_stream: int) -> "Self": """Bound the stream positions to a maximum value""" + # Shortcut if we're already under the bound + if max_stream <= self.get_max_stream_pos(): + return self + + # Log *something* as we consider this a programming error + # + # We don't assert as the whole point of bounding is so that we can recover + # gracefully. + # + # Old versions of Synapse could advance streams without persisting anything in + # the DB (fixed in https://github.com/element-hq/synapse/pull/17229) and on + # restart, those updates would be lost. + logger.error( + "Bounding token from the future: token: %s, bound: %s", + self, + max_stream, + ) min_pos = min(self.stream, max_stream) return type(self)(