Start of wait_for_multi_writer_stream_token

This commit is contained in:
Eric Eastwood
2026-04-02 12:34:59 -05:00
parent 62f23fed27
commit 2bdafecfea
3 changed files with 74 additions and 2 deletions
+51 -1
View File
@@ -46,6 +46,9 @@ from synapse.logging import issue9533_logger
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import log_kv, start_active_span
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.storage.util.id_generators import (
MultiWriterIdGenerator,
)
from synapse.streams.config import PaginationConfig
from synapse.types import (
ISynapseReactor,
@@ -831,8 +834,15 @@ class Notifier:
return result
async def wait_for_stream_token(self, stream_token: StreamToken) -> bool:
"""Wait for this worker to catch up with the given stream token."""
"""
Wait for this worker to catch up with the given stream token.
Returns:
True when this worker has caught up
False when we timed out waiting
"""
current_token = self.event_sources.get_current_token()
# Return early if we are already caught up
if stream_token.is_before_or_eq(current_token):
return True
@@ -840,6 +850,7 @@ class Notifier:
# the future", i.e. that are ahead of the tokens persisted in the DB.
stream_token = await self.event_sources.bound_future_token(stream_token)
# Start waiting until we've caught up to the `stream_token`
start = self.clock.time_msec()
logged = False
while True:
@@ -849,6 +860,7 @@ class Notifier:
now = self.clock.time_msec()
# Timed out
if now - start > 10_000:
return False
@@ -863,6 +875,44 @@ class Notifier:
# TODO: be better
await self.clock.sleep(Duration(milliseconds=500))
async def wait_for_multi_writer_stream_token(
self, token: MultiWriterStreamToken, id_gen: MultiWriterIdGenerator
) -> bool:
"""Wait for this worker to catch up with the given stream token."""
current_token = id_gen.get_current_token()
# Return early if we are already caught up
if token.is_before_or_eq(current_token):
return True
# Work around a bug where older Synapse versions gave out tokens "from
# the future", i.e. that are ahead of the tokens persisted in the DB.
token = token.bound_future_token(TODO)
# Start waiting until we've caught up to the `stream_token`
start = self.clock.time_msec()
logged = False
while True:
current_token = self.event_sources.get_current_token()
if token.is_before_or_eq(current_token):
return True
now = self.clock.time_msec()
# Timed out
if now - start > 10_000:
return False
if not logged:
logger.info(
"Waiting for current token to reach %s; currently at %s",
token,
current_token,
)
logged = True
# TODO: be better
await self.clock.sleep(Duration(milliseconds=500))
async def _get_room_ids(
self, user: UserID, explicit_room_id: str | None
) -> tuple[StrCollection, bool]:
+6 -1
View File
@@ -58,7 +58,12 @@ from synapse.storage.database import (
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import IdGenerator, MultiWriterIdGenerator
from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID
from synapse.types import (
JsonDict,
RetentionPolicy,
StrCollection,
ThirdPartyInstanceID,
)
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.json import json_encoder
from synapse.util.stringutils import MXC_REGEX
+17
View File
@@ -685,6 +685,23 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
def bound_stream_token(self, max_stream: int) -> "Self":
"""Bound the stream positions to a maximum value"""
# Shortcut if we're already under the bound
if max_stream <= self.get_max_stream_pos():
return self
# Log *something* as we consider this a programming error
#
# We don't assert as the whole point of bounding is so that we can recover
# gracefully.
#
# Old versions of Synapse could advance streams without persisting anything in
# the DB (fixed in https://github.com/element-hq/synapse/pull/17229) and on
# restart, those updates would be lost.
logger.error(
"Bounding token from the future: token: %s, bound: %s",
self,
max_stream,
)
min_pos = min(self.stream, max_stream)
return type(self)(