Batch outbound device list poke replication notifications

Move notify_replication() and send_device_messages() from inside the
per-row loop to after each batch of room-poke conversions. This allows
the outbound pokes to be coalesced into fewer RDATA messages by the
deduplication in DeviceListsStream._update_function, rather than
trickling out one at a time.
This commit is contained in:
Quentin Gliech
2026-03-05 11:13:55 +01:00
parent b30607ccaf
commit ea10ea6c77

View File

@@ -1098,6 +1098,11 @@ class DeviceWriterHandler(DeviceHandler):
else:
return
# Collect all hosts we need to notify federation about
# across the whole batch, so we can send a single
# notification at the end.
all_hosts_to_notify: set[str] = set()
for user_id, device_id, room_id, stream_id, opentracing_context in rows:
hosts = set()
@@ -1133,25 +1138,7 @@ class DeviceWriterHandler(DeviceHandler):
converted_upto_stream_id=stream_id,
)
# Notify replication that we've updated the device list stream.
self.notifier.notify_replication()
if hosts and self.federation_sender:
logger.info(
"Sending device list update notif for %r to: %r",
user_id,
hosts,
)
await self.federation_sender.send_device_messages(
hosts, immediate=False
)
# TODO: when called, this isn't in a logging context.
# This leads to log spam, sentry event spam, and massive
# memory usage.
# See https://github.com/matrix-org/synapse/issues/12552.
# log_kv(
# {"message": "sent device update to host", "host": host}
# )
all_hosts_to_notify.update(hosts)
if current_stream_id != stream_id:
# Clear the set of hosts we've already sent to as we're
@@ -1161,6 +1148,15 @@ class DeviceWriterHandler(DeviceHandler):
hosts_already_sent_to.update(hosts)
current_stream_id = stream_id
# Notify replication once for the whole batch, so that the
# outbound pokes can be coalesced into fewer RDATA messages.
self.notifier.notify_replication()
if all_hosts_to_notify and self.federation_sender:
await self.federation_sender.send_device_messages(
all_hosts_to_notify, immediate=False
)
# Advance `(stream_id, room_id)`.
_, _, room_id, stream_id, _ = rows[-1]
await self.store.set_device_change_last_converted_pos(