Separate out bounding

Previously, we would wait for the bounded token and then still use the
unbounded `since_token` for all of the queries (flawed).
This commit is contained in:
Eric Eastwood
2026-04-02 14:02:15 -05:00
parent 6eda7661cd
commit f61d795c3d
3 changed files with 59 additions and 29 deletions
+12
View File
@@ -150,6 +150,18 @@ class SlidingSyncHandler:
# events or future events if the user is nefariously, manually modifying the
# token.
if from_token is not None:
# Work around a bug where older Synapse versions gave out tokens "from
# the future", i.e. that are ahead of the tokens persisted in the DB.
#
# If the token has positions ahead of our persisted positions in the
# database (invalid), then we simply use our max persisted position (recover
# gracefully); instead of waiting for a position that may never come around.
from_token = SlidingSyncStreamToken(
stream_token=await self.event_sources.bound_future_token(
from_token.stream_token
),
connection_position=from_token.connection_position,
)
# We need to make sure this worker has caught up with the token. If
# this returns false, it means we timed out waiting, and we should
# just return an empty response.
+7
View File
@@ -413,6 +413,13 @@ class SyncHandler:
context.tag = sync_label
if since_token is not None:
# Work around a bug where older Synapse versions gave out tokens "from
# the future", i.e. that are ahead of the tokens persisted in the DB.
#
# If the token has positions ahead of our persisted positions in the
# database (invalid), then we simply use our max persisted position (recover
# gracefully); instead of waiting for a position that may never come around.
since_token = await self.event_sources.bound_future_token(since_token)
# We need to make sure this worker has caught up with the token. If
# this returns false it means we timed out waiting, and we should
# just return an empty response.
+40 -29
View File
@@ -838,6 +838,12 @@ class Notifier:
"""
Wait for this worker to catch up with the given stream token.
This is important to ensure that the worker has a proper view of the world
before trying to serve a request. For example, one worker can return a response
with some `next_batch` token, but then the next request goes to another worker
which is behind; if the worker assembles a response up to the token, it could be
missing data in the gap between where it's behind and the requested token.
Returns:
True when this worker has caught up
False when we timed out waiting
@@ -847,9 +853,21 @@ class Notifier:
if stream_token.is_before_or_eq(current_token):
return True
# Work around a bug where older Synapse versions gave out tokens "from
# the future", i.e. that are ahead of the tokens persisted in the DB.
stream_token = await self.event_sources.bound_future_token(stream_token)
# Assert as we consider this a Synapse programming error. We shouldn't be
# handing out invalid future tokens and tokens should be validated before it
# reaches this point.
#
# We consider a token invalid, if the token has positions ahead of our persisted
# positions in the database
#
# Previously, we would bound the tokens within this function but that leads to
# bad patterns upstream where people can continue to use the unbounded token.
original_stream_token = stream_token
max_token = await self.event_sources.bound_future_token(stream_token)
assert stream_token.is_before_or_eq(max_token), (
f"Unable to wait for invalid future stream token (token={original_stream_token} has positions "
"ahead of our max persisted position {max_token})"
)
# Start waiting until we've caught up to the `stream_token`
start = self.clock.time_msec()
@@ -884,16 +902,11 @@ class Notifier:
"""
Wait for this worker to catch up with the given stream token.
If the token has positions ahead of our persisted positions in the database
(invalid), then we simply wait until we catch-up to our max persisted position
(recover gracefully); instead of waiting for a position that may never come
around. We have a 10 second timeout here but upstream usage will just return the
same token as the `next_batch` and the user will try again which could cause no
progress to be made.
This can happen due to older versions of Synapse giving out stream positions
without persisting them in the DB, and so on restart the stream would get reset
back to an older position.
This is important to ensure that the worker has a proper view of the world
before trying to serve a request. For example, one worker can return a response
with some `next_batch` token, but then the next request goes to another worker
which is behind; if the worker assembles a response up to the token, it could be
missing data in the gap between where it's behind and the requested token.
Returns:
True when this worker has caught up
@@ -904,22 +917,20 @@ class Notifier:
if token.is_before_or_eq(current_token):
return True
# Work around a bug where older Synapse versions gave out tokens "from
# the future", i.e. that are ahead of the tokens persisted in the DB.
max_token = await id_gen.get_max_allocated_token()
if max_token < token.get_max_stream_pos():
# Log *something* as we consider this as a Synapse programming error
# (assuming no malicious user manipulation of the token) (we shouldn't be
# handing out future tokens).
#
# We don't assert as the whole point of bounding is so that we can recover
# gracefully.
logger.error(
"Bounding token from the future: token: %s, bound: %s",
token,
max_token,
)
token = token.bound_stream_token(max_token)
# Assert as we consider this a Synapse programming error. We shouldn't be
# handing out invalid future tokens and tokens should be validated before it
# reaches this point.
#
# We consider a token invalid, if the token has positions ahead of our persisted
# positions in the database
#
# Previously, we would bound the tokens within this function but that leads to
# bad patterns upstream where people can continue to use the unbounded token.
max_persisted_position = await id_gen.get_max_allocated_token()
assert max_persisted_position >= token.get_max_stream_pos(), (
f"Unable to wait for invalid future token (token={token} has positions "
"ahead of our max persisted position {max_persisted_position})"
)
# Start waiting until we've caught up to the `stream_token`
start = self.clock.time_msec()