From f61d795c3dc369f08660e41be6664714458f2ff6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 2 Apr 2026 14:02:15 -0500 Subject: [PATCH] Separate out bounding Previously, we would wait for the bounded token and then still use the unbounded `since_token` for all of the queries (flawed). --- synapse/handlers/sliding_sync/__init__.py | 12 ++++ synapse/handlers/sync.py | 7 +++ synapse/notifier.py | 69 +++++++++++++---------- 3 files changed, 59 insertions(+), 29 deletions(-) diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 6feb6c292e..142e7dbd35 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -150,6 +150,18 @@ class SlidingSyncHandler: # events or future events if the user is nefariously, manually modifying the # token. if from_token is not None: + # Work around a bug where older Synapse versions gave out tokens "from + # the future", i.e. that are ahead of the tokens persisted in the DB. + # + # If the token has positions ahead of our persisted positions in the + # database (invalid), then we simply use our max persisted position (recover + # gracefully); instead of waiting for a position that may never come around. + from_token = SlidingSyncStreamToken( + stream_token=await self.event_sources.bound_future_token( + from_token.stream_token + ), + connection_position=from_token.connection_position, + ) # We need to make sure this worker has caught up with the token. If # this returns false, it means we timed out waiting, and we should # just return an empty response. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c8ef5e2aa6..90b5ebf8ac 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -413,6 +413,13 @@ class SyncHandler: context.tag = sync_label if since_token is not None: + # Work around a bug where older Synapse versions gave out tokens "from + # the future", i.e. that are ahead of the tokens persisted in the DB. + # + # If the token has positions ahead of our persisted positions in the + # database (invalid), then we simply use our max persisted position (recover + # gracefully); instead of waiting for a position that may never come around. + since_token = await self.event_sources.bound_future_token(since_token) # We need to make sure this worker has caught up with the token. If # this returns false it means we timed out waiting, and we should # just return an empty response. diff --git a/synapse/notifier.py b/synapse/notifier.py index 961ea92e0d..d266684a74 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -838,6 +838,12 @@ class Notifier: """ Wait for this worker to catch up with the given stream token. + This is important to ensure that the worker has a proper view of the world + before trying to serve a request. For example, one worker can return a response + with some `next_batch` token, but then the next request goes to another worker + which is behind; if the worker assembles a response up to the token, it could be + missing data in the gap between where it's behind and the requested token. + Returns: True when this worker has caught up False when we timed out waiting @@ -847,9 +853,21 @@ class Notifier: if stream_token.is_before_or_eq(current_token): return True - # Work around a bug where older Synapse versions gave out tokens "from - # the future", i.e. that are ahead of the tokens persisted in the DB. - stream_token = await self.event_sources.bound_future_token(stream_token) + # Assert as we consider this a Synapse programming error. We shouldn't be + # handing out invalid future tokens and tokens should be validated before it + # reaches this point. + # + # We consider a token invalid, if the token has positions ahead of our persisted + # positions in the database + # + # Previously, we would bound the tokens within this function but that leads to + # bad patterns upstream where people can continue to use the unbounded token. + original_stream_token = stream_token + max_token = await self.event_sources.bound_future_token(stream_token) + assert stream_token.is_before_or_eq(max_token), ( + f"Unable to wait for invalid future stream token (token={original_stream_token} has positions " + "ahead of our max persisted position {max_token})" + ) # Start waiting until we've caught up to the `stream_token` start = self.clock.time_msec() @@ -884,16 +902,11 @@ class Notifier: """ Wait for this worker to catch up with the given stream token. - If the token has positions ahead of our persisted positions in the database - (invalid), then we simply wait until we catch-up to our max persisted position - (recover gracefully); instead of waiting for a position that may never come - around. We have a 10 second timeout here but upstream usage will just return the - same token as the `next_batch` and the user will try again which could cause no - progress to be made. - - This can happen due to older versions of Synapse giving out stream positions - without persisting them in the DB, and so on restart the stream would get reset - back to an older position. + This is important to ensure that the worker has a proper view of the world + before trying to serve a request. For example, one worker can return a response + with some `next_batch` token, but then the next request goes to another worker + which is behind; if the worker assembles a response up to the token, it could be + missing data in the gap between where it's behind and the requested token. Returns: True when this worker has caught up @@ -904,22 +917,20 @@ class Notifier: if token.is_before_or_eq(current_token): return True - # Work around a bug where older Synapse versions gave out tokens "from - # the future", i.e. that are ahead of the tokens persisted in the DB. - max_token = await id_gen.get_max_allocated_token() - if max_token < token.get_max_stream_pos(): - # Log *something* as we consider this as a Synapse programming error - # (assuming no malicious user manipulation of the token) (we shouldn't be - # handing out future tokens). - # - # We don't assert as the whole point of bounding is so that we can recover - # gracefully. - logger.error( - "Bounding token from the future: token: %s, bound: %s", - token, - max_token, - ) - token = token.bound_stream_token(max_token) + # Assert as we consider this a Synapse programming error. We shouldn't be + # handing out invalid future tokens and tokens should be validated before it + # reaches this point. + # + # We consider a token invalid, if the token has positions ahead of our persisted + # positions in the database + # + # Previously, we would bound the tokens within this function but that leads to + # bad patterns upstream where people can continue to use the unbounded token. + max_persisted_position = await id_gen.get_max_allocated_token() + assert max_persisted_position >= token.get_max_stream_pos(), ( + f"Unable to wait for invalid future token (token={token} has positions " + "ahead of our max persisted position {max_persisted_position})" + ) # Start waiting until we've caught up to the `stream_token` start = self.clock.time_msec()