From 0b56f31f8b80fb87e02f3c159e0bf3713a4be681 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 May 2026 10:34:14 +0100 Subject: [PATCH] Correct misleading logging in `_add_messages_to_local_device_inbox_txn` --- synapse/storage/databases/main/deviceinbox.py | 56 +++++++++++-------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index fc61f46c1c..04e3057464 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -897,7 +897,8 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) -> None: assert self._can_write_to_device - local_by_user_then_device = {} + # A map from user id, to device id, to a pair of (serialized message, msgid). + local_by_user_then_device: dict[str, dict[str, tuple[str, str]]] = {} for user_id, messages_by_device in messages_by_user_then_device.items(): messages_json_for_user = {} devices = list(messages_by_device.keys()) @@ -912,11 +913,13 @@ class DeviceInboxWorkerStore(SQLBaseStore): retcol="device_id", ) - message_json = json_encoder.encode(messages_by_device["*"]) + message_json, msgid = _serialize_to_device_message( + user_id, "*", messages_by_device["*"] + ) for device_id in devices: # Add the message for all devices for this user on this # server. - messages_json_for_user[device_id] = message_json + messages_json_for_user[device_id] = (message_json, msgid) else: if not devices: continue @@ -938,19 +941,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): for (device_id,) in rows: # Only insert into the local inbox if the device exists on # this server - with start_active_span("serialise_to_device_message"): - msg = messages_by_device[device_id] - set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"]) - set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"]) - set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id) - set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id) - set_tag( - SynapseTags.TO_DEVICE_MSGID, - msg["content"].get(EventContentFields.TO_DEVICE_MSGID), - ) - message_json = json_encoder.encode(msg) - - messages_json_for_user[device_id] = message_json + msg = messages_by_device[device_id] + message_json, msgid = _serialize_to_device_message( + user_id, device_id, msg + ) + messages_json_for_user[device_id] = (message_json, msgid) if messages_json_for_user: local_by_user_then_device[user_id] = messages_json_for_user @@ -965,22 +960,21 @@ class DeviceInboxWorkerStore(SQLBaseStore): values=[ (user_id, device_id, stream_id, message_json, self._instance_name) for user_id, messages_by_device in local_by_user_then_device.items() - for device_id, message_json in messages_by_device.items() + for device_id, (message_json, _msgid) in messages_by_device.items() ], ) if issue9533_logger.isEnabledFor(logging.DEBUG): issue9533_logger.debug( - "Stored to-device messages with stream_id %i: %s", + "Storing to-device messages with stream_id %i: %s", stream_id, [ - f"{user_id}/{device_id} (msgid " - f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})" + f"{user_id}/{device_id} (msgid {msgid})" for ( user_id, messages_by_device, - ) in messages_by_user_then_device.items() - for (device_id, msg) in messages_by_device.items() + ) in local_by_user_then_device.items() + for (device_id, (_msg, msgid)) in messages_by_device.items() ], ) @@ -1066,6 +1060,24 @@ class DeviceInboxWorkerStore(SQLBaseStore): return results +def _serialize_to_device_message( + user_id: str, device_id: str, msg: JsonDict +) -> tuple[str, str]: + """Serialiize a to-device message, ready to add to the device_inbox table. + + Returns a tuple (message_json, msgid). + """ + with start_active_span("serialise_to_device_message"): + msgid: str = msg["content"].get(EventContentFields.TO_DEVICE_MSGID, "") + set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"]) + set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"]) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id) + set_tag(SynapseTags.TO_DEVICE_MSGID, msgid) + message_json = json_encoder.encode(msg) + return message_json, msgid + + class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"