More refinement

This commit is contained in:
Eric Eastwood
2026-04-02 13:12:58 -05:00
parent dc3d205ffc
commit 6eda7661cd
3 changed files with 32 additions and 15 deletions
+24 -1
View File
@@ -884,6 +884,17 @@ class Notifier:
"""
Wait for this worker to catch up with the given stream token.
If the token has positions ahead of our persisted positions in the database
(invalid), then we simply wait until we catch up to our max persisted position
(recover gracefully) instead of waiting for a position that may never come
around. We have a 10 second timeout here, but upstream usage will just return the
same token as the `next_batch` and the user will try again, which could cause no
progress to be made.
This can happen due to older versions of Synapse giving out stream positions
without persisting them in the DB, so that, on restart, the stream would get
reset back to an older position.
Returns:
True when this worker has caught up
False when we timed out waiting
@@ -896,7 +907,19 @@ class Notifier:
# Work around a bug where older Synapse versions gave out tokens "from
# the future", i.e. that are ahead of the tokens persisted in the DB.
max_token = await id_gen.get_max_allocated_token()
token = token.bound_stream_token(max_token)
if max_token < token.get_max_stream_pos():
# Log *something* as we consider this a Synapse programming error (we shouldn't
# be handing out future tokens), assuming no malicious user manipulation of the
# token.
#
# We don't assert as the whole point of bounding is so that we can recover
# gracefully.
logger.error(
"Bounding token from the future: token: %s, bound: %s",
token,
max_token,
)
token = token.bound_stream_token(max_token)
# Start waiting until we've caught up to the `stream_token`
start = self.clock.time_msec()
+8
View File
@@ -149,6 +149,14 @@ class EventSources:
].get_max_allocated_token()
if max_token < token_value.get_max_stream_pos():
# Log *something* as we consider this a Synapse programming error
#
# We don't assert as the whole point of bounding is so that we can recover
# gracefully.
#
# Old versions of Synapse could advance streams without persisting anything in
# the DB (fixed in https://github.com/element-hq/synapse/pull/17229) and on
# restart, those updates would be lost.
logger.error(
"Bounding token from the future '%s': token: %s, bound: %s",
key,
-14
View File
@@ -689,20 +689,6 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
if max_stream <= self.get_max_stream_pos():
return self
# Log *something* as we consider this a programming error
#
# We don't assert as the whole point of bounding is so that we can recover
# gracefully.
#
# Old versions of Synapse could advance streams without persisting anything in
# the DB (fixed in https://github.com/element-hq/synapse/pull/17229) and on
# restart, those updates would be lost.
logger.error(
"Bounding token from the future: token: %s, bound: %s",
self,
max_stream,
)
min_pos = min(self.stream, max_stream)
return type(self)(
stream=min_pos,