mirror of
https://github.com/element-hq/synapse.git
synced 2026-05-14 21:15:12 +00:00
Don't assert invalid future token in wait_for_stream_token(...)
See https://github.com/element-hq/synapse/pull/19644#discussion_r3029874002
This commit is contained in:
+17
-17
@@ -841,6 +841,23 @@ class Notifier:
|
||||
which is behind; if the worker assembles a response up to the token, it could be
|
||||
missing data in the gap between where it's behind and the requested token.
|
||||
|
||||
### Inavlid future tokens
|
||||
|
||||
We assume the token has already been validated/sanitized before being passed to
|
||||
this function to ensure it's not some invalid future token. We consider a token
|
||||
invalid, if the token has positions ahead of our persisted positions in the
|
||||
database. This is important as we we don't want to wait for the stream to
|
||||
advance in those cases (as it may never do so).
|
||||
|
||||
Previously, we would `bound_future_token(...)` within this function but that
|
||||
leads to bad patterns upstream where people can continue to use the unbounded
|
||||
token.
|
||||
|
||||
Args:
|
||||
stream_token: The token to wait for. We assume the token has already been
|
||||
validated/sanitized to ensure it's not some invalid future token (has a
|
||||
stream position ahead of what is in the DB). (see details above)
|
||||
|
||||
Returns:
|
||||
True when this worker has caught up
|
||||
False when we timed out waiting
|
||||
@@ -850,23 +867,6 @@ class Notifier:
|
||||
if stream_token.is_before_or_eq(current_token):
|
||||
return True
|
||||
|
||||
# Assert as we consider this a Synapse programming error. We shouldn't be
|
||||
# handing out invalid future tokens and tokens should be validated before it
|
||||
# reaches this point.
|
||||
#
|
||||
# We consider a token invalid, if the token has positions ahead of our persisted
|
||||
# positions in the database
|
||||
#
|
||||
# Previously, we would bound the tokens within this function but that leads to
|
||||
# bad patterns upstream where people can continue to use the unbounded token.
|
||||
original_stream_token = stream_token
|
||||
max_token = await self.event_sources.bound_future_token(stream_token)
|
||||
assert stream_token.is_before_or_eq(max_token), (
|
||||
f"Refusing to wait for invalid future stream token (token={original_stream_token} "
|
||||
"that has positions ahead of our max persisted position {max_token}) "
|
||||
"(Synapse programming error)"
|
||||
)
|
||||
|
||||
# Start waiting until we've caught up to the `stream_token`
|
||||
start = self.clock.time_msec()
|
||||
logged = False
|
||||
|
||||
@@ -134,32 +134,3 @@ class NotifierTestCase(tests.unittest.HomeserverTestCase):
|
||||
# Make sure we gave up waiting and not caught-up (False)
|
||||
wait_result = self.get_success(wait_d)
|
||||
self.assertEqual(wait_result, False)
|
||||
|
||||
def test_wait_for_stream_token_with_invalid_future_sync_token(
|
||||
self,
|
||||
) -> None:
|
||||
"""
|
||||
Like `test_wait_for_stream_token_with_future_sync_token`, except we
|
||||
give a token that has a stream position ahead of what is in the DB, i.e. its
|
||||
invalid and we shouldn't wait for the stream to advance (as it may never do so).
|
||||
|
||||
This can happen due to older versions of Synapse giving out stream
|
||||
positions without persisting them in the DB, and so on restart the
|
||||
stream would get reset back to an older position.
|
||||
"""
|
||||
# Create a token and advance one of the streams.
|
||||
receipt_id_gen = self.store.get_receipts_stream_id_gen()
|
||||
current_receipt_token = MultiWriterStreamToken.from_generator(receipt_id_gen)
|
||||
receipt_token = current_receipt_token.copy_and_advance(
|
||||
MultiWriterStreamToken(
|
||||
stream=current_receipt_token.get_max_stream_pos() + 1
|
||||
)
|
||||
)
|
||||
token = StreamToken.START.copy_and_advance(StreamKeyType.RECEIPT, receipt_token)
|
||||
|
||||
# Function under test
|
||||
wait_d = defer.ensureDeferred(self.notifier.wait_for_stream_token(token))
|
||||
|
||||
# Expect to fail. We expect callers to sanitize/validate the tokens they give to
|
||||
# `wait_for_stream_token` to ensure they aren't in the future.
|
||||
self.get_failure(wait_d, exc=AssertionError)
|
||||
|
||||
Reference in New Issue
Block a user