diff --git a/synapse/notifier.py b/synapse/notifier.py index df1b87c945..961ea92e0d 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -884,6 +884,17 @@ class Notifier: """ Wait for this worker to catch up with the given stream token. + If the token has positions ahead of our persisted positions in the database + (invalid), then we simply wait until we catch-up to our max persisted position + (recover gracefully); instead of waiting for a position that may never come + around. We have a 10 second timeout here but upstream usage will just return the + same token as the `next_batch` and the user will try again which could cause no + progress to be made. + + This can happen due to older versions of Synapse giving out stream positions + without persisting them in the DB, and so on restart the stream would get reset + back to an older position. + Returns: True when this worker has caught up False when we timed out waiting @@ -896,7 +907,19 @@ class Notifier: # Work around a bug where older Synapse versions gave out tokens "from # the future", i.e. that are ahead of the tokens persisted in the DB. max_token = await id_gen.get_max_allocated_token() - token = token.bound_stream_token(max_token) + if max_token < token.get_max_stream_pos(): + # Log *something* as we consider this as a Synapse programming error + # (assuming no malicious user manipulation of the token) (we shouldn't be + # handing out future tokens). + # + # We don't assert as the whole point of bounding is so that we can recover + # gracefully. + logger.error( + "Bounding token from the future: token: %s, bound: %s", + token, + max_token, + ) + token = token.bound_stream_token(max_token) # Start waiting until we've caught up to the `stream_token` start = self.clock.time_msec() diff --git a/synapse/streams/events.py b/synapse/streams/events.py index d2720fb959..a5b7ded74f 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -149,6 +149,14 @@ class EventSources: ].get_max_allocated_token() if max_token < token_value.get_max_stream_pos(): + # Log *something* as we consider this a Synapse programming error + # + # We don't assert as the whole point of bounding is so that we can recover + # gracefully. + # + # Old versions of Synapse could advance streams without persisting anything in + # the DB (fixed in https://github.com/element-hq/synapse/pull/17229) and on + # restart, those updates would be lost. logger.error( "Bounding token from the future '%s': token: %s, bound: %s", key, diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 6e68123e62..d127be98ed 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -689,20 +689,6 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): if max_stream <= self.get_max_stream_pos(): return self - # Log *something* as we consider this a programming error - # - # We don't assert as the whole point of bounding is so that we can recover - # gracefully. - # - # Old versions of Synapse could advance streams without persisting anything in - # the DB (fixed in https://github.com/element-hq/synapse/pull/17229) and on - # restart, those updates would be lost. - logger.error( - "Bounding token from the future: token: %s, bound: %s", - self, - max_stream, - ) - min_pos = min(self.stream, max_stream) return type(self)( stream=min_pos,