mirror of
https://github.com/element-hq/synapse.git
synced 2026-05-25 16:24:06 +00:00
Correct misleading logging in _add_messages_to_local_device_inbox_txn
This commit is contained in:
@@ -897,7 +897,8 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
) -> None:
|
||||
assert self._can_write_to_device
|
||||
|
||||
local_by_user_then_device = {}
|
||||
# A map from user id, to device id, to a pair of (serialized message, msgid).
|
||||
local_by_user_then_device: dict[str, dict[str, tuple[str, str]]] = {}
|
||||
for user_id, messages_by_device in messages_by_user_then_device.items():
|
||||
messages_json_for_user = {}
|
||||
devices = list(messages_by_device.keys())
|
||||
@@ -912,11 +913,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
retcol="device_id",
|
||||
)
|
||||
|
||||
message_json = json_encoder.encode(messages_by_device["*"])
|
||||
message_json, msgid = _serialize_to_device_message(
|
||||
user_id, "*", messages_by_device["*"]
|
||||
)
|
||||
for device_id in devices:
|
||||
# Add the message for all devices for this user on this
|
||||
# server.
|
||||
messages_json_for_user[device_id] = message_json
|
||||
messages_json_for_user[device_id] = (message_json, msgid)
|
||||
else:
|
||||
if not devices:
|
||||
continue
|
||||
@@ -938,19 +941,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
for (device_id,) in rows:
|
||||
# Only insert into the local inbox if the device exists on
|
||||
# this server
|
||||
with start_active_span("serialise_to_device_message"):
|
||||
msg = messages_by_device[device_id]
|
||||
set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
|
||||
set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
|
||||
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
|
||||
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
|
||||
set_tag(
|
||||
SynapseTags.TO_DEVICE_MSGID,
|
||||
msg["content"].get(EventContentFields.TO_DEVICE_MSGID),
|
||||
)
|
||||
message_json = json_encoder.encode(msg)
|
||||
|
||||
messages_json_for_user[device_id] = message_json
|
||||
msg = messages_by_device[device_id]
|
||||
message_json, msgid = _serialize_to_device_message(
|
||||
user_id, device_id, msg
|
||||
)
|
||||
messages_json_for_user[device_id] = (message_json, msgid)
|
||||
|
||||
if messages_json_for_user:
|
||||
local_by_user_then_device[user_id] = messages_json_for_user
|
||||
@@ -965,22 +960,21 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
values=[
|
||||
(user_id, device_id, stream_id, message_json, self._instance_name)
|
||||
for user_id, messages_by_device in local_by_user_then_device.items()
|
||||
for device_id, message_json in messages_by_device.items()
|
||||
for device_id, (message_json, _msgid) in messages_by_device.items()
|
||||
],
|
||||
)
|
||||
|
||||
if issue9533_logger.isEnabledFor(logging.DEBUG):
|
||||
issue9533_logger.debug(
|
||||
"Stored to-device messages with stream_id %i: %s",
|
||||
"Storing to-device messages with stream_id %i: %s",
|
||||
stream_id,
|
||||
[
|
||||
f"{user_id}/{device_id} (msgid "
|
||||
f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})"
|
||||
f"{user_id}/{device_id} (msgid {msgid})"
|
||||
for (
|
||||
user_id,
|
||||
messages_by_device,
|
||||
) in messages_by_user_then_device.items()
|
||||
for (device_id, msg) in messages_by_device.items()
|
||||
) in local_by_user_then_device.items()
|
||||
for (device_id, (_msg, msgid)) in messages_by_device.items()
|
||||
],
|
||||
)
|
||||
|
||||
@@ -1066,6 +1060,24 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
return results
|
||||
|
||||
|
||||
def _serialize_to_device_message(
|
||||
user_id: str, device_id: str, msg: JsonDict
|
||||
) -> tuple[str, str]:
|
||||
"""Serialiize a to-device message, ready to add to the device_inbox table.
|
||||
|
||||
Returns a tuple (message_json, msgid).
|
||||
"""
|
||||
with start_active_span("serialise_to_device_message"):
|
||||
msgid: str = msg["content"].get(EventContentFields.TO_DEVICE_MSGID, "")
|
||||
set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
|
||||
set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
|
||||
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
|
||||
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
|
||||
set_tag(SynapseTags.TO_DEVICE_MSGID, msgid)
|
||||
message_json = json_encoder.encode(msg)
|
||||
return message_json, msgid
|
||||
|
||||
|
||||
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
||||
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
|
||||
REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"
|
||||
|
||||
Reference in New Issue
Block a user