diff --git a/synapse/notifier.py b/synapse/notifier.py index 656c18f6d1..dfd2178f93 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -841,6 +841,23 @@ class Notifier: which is behind; if the worker assembles a response up to the token, it could be missing data in the gap between where it's behind and the requested token. + ### Inavlid future tokens + + We assume the token has already been validated/sanitized before being passed to + this function to ensure it's not some invalid future token. We consider a token + invalid, if the token has positions ahead of our persisted positions in the + database. This is important as we we don't want to wait for the stream to + advance in those cases (as it may never do so). + + Previously, we would `bound_future_token(...)` within this function but that + leads to bad patterns upstream where people can continue to use the unbounded + token. + + Args: + stream_token: The token to wait for. We assume the token has already been + validated/sanitized to ensure it's not some invalid future token (has a + stream position ahead of what is in the DB). (see details above) + Returns: True when this worker has caught up False when we timed out waiting @@ -850,23 +867,6 @@ class Notifier: if stream_token.is_before_or_eq(current_token): return True - # Assert as we consider this a Synapse programming error. We shouldn't be - # handing out invalid future tokens and tokens should be validated before it - # reaches this point. - # - # We consider a token invalid, if the token has positions ahead of our persisted - # positions in the database - # - # Previously, we would bound the tokens within this function but that leads to - # bad patterns upstream where people can continue to use the unbounded token. - original_stream_token = stream_token - max_token = await self.event_sources.bound_future_token(stream_token) - assert stream_token.is_before_or_eq(max_token), ( - f"Refusing to wait for invalid future stream token (token={original_stream_token} " - "that has positions ahead of our max persisted position {max_token}) " - "(Synapse programming error)" - ) - # Start waiting until we've caught up to the `stream_token` start = self.clock.time_msec() logged = False diff --git a/tests/test_notifier.py b/tests/test_notifier.py index fdd44d7b6b..c65134e832 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -134,32 +134,3 @@ class NotifierTestCase(tests.unittest.HomeserverTestCase): # Make sure we gave up waiting and not caught-up (False) wait_result = self.get_success(wait_d) self.assertEqual(wait_result, False) - - def test_wait_for_stream_token_with_invalid_future_sync_token( - self, - ) -> None: - """ - Like `test_wait_for_stream_token_with_future_sync_token`, except we - give a token that has a stream position ahead of what is in the DB, i.e. its - invalid and we shouldn't wait for the stream to advance (as it may never do so). - - This can happen due to older versions of Synapse giving out stream - positions without persisting them in the DB, and so on restart the - stream would get reset back to an older position. - """ - # Create a token and advance one of the streams. - receipt_id_gen = self.store.get_receipts_stream_id_gen() - current_receipt_token = MultiWriterStreamToken.from_generator(receipt_id_gen) - receipt_token = current_receipt_token.copy_and_advance( - MultiWriterStreamToken( - stream=current_receipt_token.get_max_stream_pos() + 1 - ) - ) - token = StreamToken.START.copy_and_advance(StreamKeyType.RECEIPT, receipt_token) - - # Function under test - wait_d = defer.ensureDeferred(self.notifier.wait_for_stream_token(token)) - - # Expect to fail. We expect callers to sanitize/validate the tokens they give to - # `wait_for_stream_token` to ensure they aren't in the future. - self.get_failure(wait_d, exc=AssertionError)