Improve logging around sending replication updates

In particular, include the "last_token" that we used to calculate whether there
were any updates. Also a bit of general cleanup.
This commit is contained in:
Richard van der Hoff
2026-05-22 11:54:37 +01:00
parent 0b56f31f8b
commit 004ea3a6a3
+43 -53
View File
@@ -182,13 +182,8 @@ class ReplicationStreamer:
self.command_handler.will_announce_positions()
for stream in all_streams:
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
stream.last_token,
stream.last_token,
)
self._send_position_command(
stream.NAME, stream.last_token, stream.last_token
)
for stream in all_streams:
@@ -205,9 +200,9 @@ class ReplicationStreamer:
last_token = stream.last_token
logger.debug(
"Getting stream: %s: %s -> %s",
"Getting stream updates for %s: %s -> %s",
stream.NAME,
stream.last_token,
last_token,
stream.current_token(self._instance_name),
)
try:
@@ -217,26 +212,7 @@ class ReplicationStreamer:
logger.info("Failed to handle stream %s", stream.NAME)
raise
logger.debug(
"Sending %d updates",
len(updates),
)
if updates:
logger.info(
"Streaming: %s -> %s (limited: %s, updates: %s, max token: %s)",
stream.NAME,
updates[-1][0],
limited,
len(updates),
current_token,
)
stream_updates_counter.labels(
stream_name=stream.NAME,
**{SERVER_NAME_LABEL: self.server_name},
).inc(len(updates))
else:
if not updates:
# The token has advanced but there is no data to
# send, so we send a `POSITION` to inform other
# workers of the updated position.
@@ -266,21 +242,26 @@ class ReplicationStreamer:
# POSITION with last token of X+1, which will
# cause them to check if there were any missing
# updates between X and X+1.
logger.info(
"Sending position: %s -> %s",
stream.NAME,
current_token,
)
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
last_token,
current_token,
)
self._send_position_command(
stream.NAME, last_token, current_token
)
continue
logger.info(
"Streaming %s: %s -> %s (limited: %s, updates: %s, max token: %s)",
stream.NAME,
last_token,
updates[-1][0],
limited,
len(updates),
current_token,
)
stream_updates_counter.labels(
stream_name=stream.NAME,
**{SERVER_NAME_LABEL: self.server_name},
).inc(len(updates))
# Some streams return multiple rows with the same stream IDs,
# we need to make sure they get sent out in batches. We do
# this by setting the current token to all but the last of
@@ -300,18 +281,8 @@ class ReplicationStreamer:
# token, in which case we want to send out a `POSITION`
# to tell other workers the actual current position.
if updates[-1][0] < current_token:
logger.info(
"Sending position: %s -> %s",
stream.NAME,
current_token,
)
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
updates[-1][0],
current_token,
)
self._send_position_command(
stream.NAME, updates[-1][0], current_token
)
logger.debug("No more pending updates, breaking poke loop")
@@ -319,6 +290,25 @@ class ReplicationStreamer:
self.pending_updates = False
self.is_looping = False
def _send_position_command(
self, stream_name: str, prev_token: int, new_token: int
) -> None:
"""Send a POSITION command over replication"""
logger.info(
"Sending position for %s: %s -> %s",
stream_name,
prev_token,
new_token,
)
self.command_handler.send_command(
PositionCommand(
stream_name,
self._instance_name,
prev_token,
new_token,
)
)
def _batch_updates(
updates: list[tuple[Token, StreamRow]],