From 004ea3a6a3118b1b2f5c06ffcc58b3765a2171dd Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 May 2026 11:54:37 +0100 Subject: [PATCH] Improve logging around sending replication updates In particular, include the "last_token" that we used to calculate whether there were any updates. Also a bit of general cleanup. --- synapse/replication/tcp/resource.py | 96 +++++++++++++---------------- 1 file changed, 43 insertions(+), 53 deletions(-) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 36dd39ed67..e9cbcd958e 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -182,13 +182,8 @@ class ReplicationStreamer: self.command_handler.will_announce_positions() for stream in all_streams: - self.command_handler.send_command( - PositionCommand( - stream.NAME, - self._instance_name, - stream.last_token, - stream.last_token, - ) + self._send_position_command( + stream.NAME, stream.last_token, stream.last_token ) for stream in all_streams: @@ -205,9 +200,9 @@ class ReplicationStreamer: last_token = stream.last_token logger.debug( - "Getting stream: %s: %s -> %s", + "Getting stream updates for %s: %s -> %s", stream.NAME, - stream.last_token, + last_token, stream.current_token(self._instance_name), ) try: @@ -217,26 +212,7 @@ class ReplicationStreamer: logger.info("Failed to handle stream %s", stream.NAME) raise - logger.debug( - "Sending %d updates", - len(updates), - ) - - if updates: - logger.info( - "Streaming: %s -> %s (limited: %s, updates: %s, max token: %s)", - stream.NAME, - updates[-1][0], - limited, - len(updates), - current_token, - ) - stream_updates_counter.labels( - stream_name=stream.NAME, - **{SERVER_NAME_LABEL: self.server_name}, - ).inc(len(updates)) - - else: + if not updates: # The token has advanced but there is no data to # send, so we send a `POSITION` to inform other # workers of the updated position. @@ -266,21 +242,26 @@ class ReplicationStreamer: # POSITION with last token of X+1, which will # cause them to check if there were any missing # updates between X and X+1. - logger.info( - "Sending position: %s -> %s", - stream.NAME, - current_token, - ) - self.command_handler.send_command( - PositionCommand( - stream.NAME, - self._instance_name, - last_token, - current_token, - ) + self._send_position_command( + stream.NAME, last_token, current_token ) continue + logger.info( + "Streaming %s: %s -> %s (limited: %s, updates: %s, max token: %s)", + stream.NAME, + last_token, + updates[-1][0], + limited, + len(updates), + current_token, + ) + + stream_updates_counter.labels( + stream_name=stream.NAME, + **{SERVER_NAME_LABEL: self.server_name}, + ).inc(len(updates)) + # Some streams return multiple rows with the same stream IDs, # we need to make sure they get sent out in batches. We do # this by setting the current token to all but the last of @@ -300,18 +281,8 @@ class ReplicationStreamer: # token, in which case we want to send out a `POSITION` # to tell other workers the actual current position. if updates[-1][0] < current_token: - logger.info( - "Sending position: %s -> %s", - stream.NAME, - current_token, - ) - self.command_handler.send_command( - PositionCommand( - stream.NAME, - self._instance_name, - updates[-1][0], - current_token, - ) + self._send_position_command( + stream.NAME, updates[-1][0], current_token ) logger.debug("No more pending updates, breaking poke loop") @@ -319,6 +290,25 @@ class ReplicationStreamer: self.pending_updates = False self.is_looping = False + def _send_position_command( + self, stream_name: str, prev_token: int, new_token: int + ) -> None: + """Send a POSITION command over replication""" + logger.info( + "Sending position for %s: %s -> %s", + stream_name, + prev_token, + new_token, + ) + self.command_handler.send_command( + PositionCommand( + stream_name, + self._instance_name, + prev_token, + new_token, + ) + ) + def _batch_updates( updates: list[tuple[Token, StreamRow]],