Better wait_for_multi_writer_stream_token

This commit is contained in:
Eric Eastwood
2026-04-02 12:52:15 -05:00
parent 2bdafecfea
commit dc3d205ffc
+15 -5
View File
@@ -51,6 +51,7 @@ from synapse.storage.util.id_generators import (
)
from synapse.streams.config import PaginationConfig
from synapse.types import (
AbstractMultiWriterStreamToken,
ISynapseReactor,
JsonDict,
MultiWriterStreamToken,
@@ -876,23 +877,32 @@ class Notifier:
await self.clock.sleep(Duration(milliseconds=500))
async def wait_for_multi_writer_stream_token(
self, token: MultiWriterStreamToken, id_gen: MultiWriterIdGenerator
self,
token: AbstractMultiWriterStreamToken,
id_gen: MultiWriterIdGenerator,
) -> bool:
"""Wait for this worker to catch up with the given stream token."""
current_token = id_gen.get_current_token()
"""
Wait for this worker to catch up with the given stream token.
Returns:
True when this worker has caught up
False when we timed out waiting
"""
current_token = AbstractMultiWriterStreamToken.from_generator(id_gen)
# Return early if we are already caught up
if token.is_before_or_eq(current_token):
return True
# Work around a bug where older Synapse versions gave out tokens "from
# the future", i.e. that are ahead of the tokens persisted in the DB.
token = token.bound_future_token(TODO)
max_token = await id_gen.get_max_allocated_token()
token = token.bound_stream_token(max_token)
# Start waiting until we've caught up to the `stream_token`
start = self.clock.time_msec()
logged = False
while True:
current_token = self.event_sources.get_current_token()
current_token = AbstractMultiWriterStreamToken.from_generator(id_gen)
if token.is_before_or_eq(current_token):
return True