diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 4552667176..d4c75bb965 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -1098,6 +1098,11 @@ class DeviceWriterHandler(DeviceHandler): else: return + # Collect all hosts we need to notify federation about + # across the whole batch, so we can send a single + # notification at the end. + all_hosts_to_notify: set[str] = set() + for user_id, device_id, room_id, stream_id, opentracing_context in rows: hosts = set() @@ -1133,25 +1138,7 @@ class DeviceWriterHandler(DeviceHandler): converted_upto_stream_id=stream_id, ) - # Notify replication that we've updated the device list stream. - self.notifier.notify_replication() - - if hosts and self.federation_sender: - logger.info( - "Sending device list update notif for %r to: %r", - user_id, - hosts, - ) - await self.federation_sender.send_device_messages( - hosts, immediate=False - ) - # TODO: when called, this isn't in a logging context. - # This leads to log spam, sentry event spam, and massive - # memory usage. - # See https://github.com/matrix-org/synapse/issues/12552. - # log_kv( - # {"message": "sent device update to host", "host": host} - # ) + all_hosts_to_notify.update(hosts) if current_stream_id != stream_id: # Clear the set of hosts we've already sent to as we're @@ -1161,6 +1148,15 @@ class DeviceWriterHandler(DeviceHandler): hosts_already_sent_to.update(hosts) current_stream_id = stream_id + # Notify replication once for the whole batch, so that the + # outbound pokes can be coalesced into fewer RDATA messages. + self.notifier.notify_replication() + + if all_hosts_to_notify and self.federation_sender: + await self.federation_sender.send_device_messages( + all_hosts_to_notify, immediate=False + ) + # Advance `(stream_id, room_id)`. _, _, room_id, stream_id, _ = rows[-1] await self.store.set_device_change_last_converted_pos(