From b89d578d560614f2ff65a96503e6fc931608ff2b Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 24 Mar 2026 17:22:11 +0100 Subject: [PATCH] Limit outgoing to_device EDU size to 65536 (#18416) If a set of messages exceeds this limit, the messages are split across several EDUs. Fix #17035 (should) There is currently [no official specced limit for EDUs](https://github.com/matrix-org/matrix-spec/issues/807), but the consensus seems to be that it would be useful to have one to avoid this bug by bounding the transaction size. As a side effect it also limits the size of a single to-device message to a bit less than 65536. This should probably be added to the spec similarly to the [message size limit.](https://spec.matrix.org/v1.14/client-server-api/#size-limits) Spec PR: https://github.com/matrix-org/matrix-spec/pull/2340 --------- Co-authored-by: mcalinghee Co-authored-by: Eric Eastwood --- changelog.d/18416.bugfix | 1 + synapse/api/constants.py | 16 ++ .../sender/per_destination_queue.py | 13 +- synapse/handlers/devicemessage.py | 158 ++++++++++++++--- synapse/storage/databases/main/deviceinbox.py | 47 +++++- tests/handlers/test_appservice.py | 4 +- tests/rest/client/test_sendtodevice.py | 159 +++++++++++++++++- 7 files changed, 360 insertions(+), 38 deletions(-) create mode 100644 changelog.d/18416.bugfix diff --git a/changelog.d/18416.bugfix b/changelog.d/18416.bugfix new file mode 100644 index 0000000000..71f181fed4 --- /dev/null +++ b/changelog.d/18416.bugfix @@ -0,0 +1 @@ +A long queue of to-device messages could prevent outgoing federation because of the size of the transaction, let's limit the to-device EDU size to a reasonable value. diff --git a/synapse/api/constants.py b/synapse/api/constants.py index eb9e6cc39b..27f06ec525 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -30,6 +30,22 @@ from synapse.util.duration import Duration # the max size of a (canonical-json-encoded) event MAX_PDU_SIZE = 65536 +# This isn't spec'ed but is our own reasonable default to play nice with Synapse's +# `max_request_size`/`max_request_body_size`. We chose the same as `MAX_PDU_SIZE` as our +# `max_request_body_size` math is currently limited by 200 `MAX_PDU_SIZE` things. The +# spec for a `/federation/v1/send` request sets the limit at 100 EDU's and 50 PDU's +# which is below that 200 `MAX_PDU_SIZE` limit (`max_request_body_size`). +# +# Allowing oversized EDU's results in failed `/federation/v1/send` transactions (because +# the request overall can overrun the `max_request_body_size`) which are retried over +# and over and prevent other outbound federation traffic from happening. +MAX_EDU_SIZE = 65536 + +# This is defined in the Matrix spec and enforced by the receiver. +MAX_EDUS_PER_TRANSACTION = 100 +# A transaction can contain up to 100 EDUs but synapse reserves 10 EDUs for other purposes +# like trickling out some device list updates. +NUMBER_OF_RESERVED_EDUS_PER_TRANSACTION = 10 # The maximum allowed size of an HTTP request. # Other than media uploads, the biggest request we expect to see is a fully-loaded diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index cdacf16d72..32f8630c9d 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -30,7 +30,11 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.api.constants import EduTypes +from synapse.api.constants import ( + MAX_EDUS_PER_TRANSACTION, + NUMBER_OF_RESERVED_EDUS_PER_TRANSACTION, + EduTypes, +) from synapse.api.errors import ( FederationDeniedError, HttpResponseException, @@ -51,9 +55,6 @@ from synapse.visibility import filter_events_for_server if TYPE_CHECKING: import synapse.server -# This is defined in the Matrix spec and enforced by the receiver. -MAX_EDUS_PER_TRANSACTION = 100 - logger = logging.getLogger(__name__) @@ -798,7 +799,9 @@ class _TransactionQueueManager: ( to_device_edus, device_stream_id, - ) = await self.queue._get_to_device_message_edus(edu_limit - 10) + ) = await self.queue._get_to_device_message_edus( + edu_limit - NUMBER_OF_RESERVED_EDUS_PER_TRANSACTION + ) if to_device_edus: self._device_stream_id = device_stream_id diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 0ef14b31da..2096b44f6c 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -23,8 +23,15 @@ import logging from http import HTTPStatus from typing import TYPE_CHECKING, Any -from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes -from synapse.api.errors import Codes, SynapseError +from canonicaljson import encode_canonical_json + +from synapse.api.constants import ( + MAX_EDU_SIZE, + EduTypes, + EventContentFields, + ToDeviceEventTypes, +) +from synapse.api.errors import Codes, EventSizeError, SynapseError from synapse.api.ratelimiting import Ratelimiter from synapse.logging.context import run_in_background from synapse.logging.opentracing import ( @@ -35,7 +42,7 @@ from synapse.logging.opentracing import ( ) from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id from synapse.util.json import json_encoder -from synapse.util.stringutils import random_string +from synapse.util.stringutils import random_string_insecure_fast if TYPE_CHECKING: from synapse.server import HomeServer @@ -222,6 +229,7 @@ class DeviceMessageHandler: set_tag(SynapseTags.TO_DEVICE_TYPE, message_type) set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id) local_messages = {} + # Map from destination (server) -> recipient (user ID) -> device_id -> JSON message content remote_messages: dict[str, dict[str, dict[str, JsonDict]]] = {} for user_id, by_device in messages.items(): if not UserID.is_valid(user_id): @@ -277,28 +285,33 @@ class DeviceMessageHandler: destination = get_domain_from_id(user_id) remote_messages.setdefault(destination, {})[user_id] = by_device - context = get_active_span_text_map() - - remote_edu_contents = {} - for destination, messages in remote_messages.items(): - # The EDU contains a "message_id" property which is used for - # idempotence. Make up a random one. - message_id = random_string(16) - log_kv({"destination": destination, "message_id": message_id}) - - remote_edu_contents[destination] = { - "messages": messages, - "sender": sender_user_id, - "type": message_type, - "message_id": message_id, - "org.matrix.opentracing_context": json_encoder.encode(context), - } - - # Add messages to the database. + # Add local messages to the database. # Retrieve the stream id of the last-processed to-device message. - last_stream_id = await self.store.add_messages_to_device_inbox( - local_messages, remote_edu_contents + last_stream_id = ( + await self.store.add_local_messages_from_client_to_device_inbox( + local_messages + ) ) + for destination, messages in remote_messages.items(): + split_edus = split_device_messages_into_edus( + sender_user_id, message_type, messages + ) + for edu in split_edus: + edu["org.matrix.opentracing_context"] = json_encoder.encode( + get_active_span_text_map() + ) + # Add remote messages to the database. + last_stream_id = ( + await self.store.add_remote_messages_from_client_to_device_inbox( + {destination: edu} + ) + ) + log_kv( + { + "destination": destination, + "message_id": edu["message_id"], + } + ) # Notify listeners that there are new to-device messages to process, # handing them the latest stream id. @@ -397,3 +410,102 @@ class DeviceMessageHandler: "events": messages, "next_batch": f"d{stream_id}", } + + +def split_device_messages_into_edus( + sender_user_id: str, + message_type: str, + messages_by_user_then_device: dict[str, dict[str, JsonDict]], +) -> list[JsonDict]: + """ + This function takes many to-device messages and fits/splits them into several EDUs + as necessary. We split the messages up as the overall request can overrun the + `max_request_body_size` and prevent outbound federation traffic because of the size + of the transaction (cf. `MAX_EDU_SIZE`). + + Args: + sender_user_id: The user that is sending the to-device messages. + message_type: The type of to-device messages that are being sent. + messages_by_user_then_device: Dictionary of recipient user_id to recipient device_id to message. + + Returns: + Bin-packed list of EDU JSON content for the given to_device messages + + Raises: + EventSizeError: If a single to-device message is too large to fit into an EDU. + """ + split_edus_content: list[JsonDict] = [] + + # Convert messages dict to a list of (recipient, messages_by_device) pairs + message_items = list(messages_by_user_then_device.items()) + + while message_items: + edu_messages = {} + # Start by trying to fit all remaining messages + target_count = len(message_items) + + while target_count > 0: + # Take the first target_count messages + edu_messages = dict(message_items[:target_count]) + edu_content = create_new_to_device_edu_content( + sender_user_id, message_type, edu_messages + ) + # Let's add the whole EDU structure before testing the size + edu = { + "content": edu_content, + "edu_type": EduTypes.DIRECT_TO_DEVICE, + } + + if len(encode_canonical_json(edu)) <= MAX_EDU_SIZE: + # It fits! Add this EDU and remove these messages from the list + split_edus_content.append(edu_content) + message_items = message_items[target_count:] + + logger.debug( + "Created EDU with %d recipients from %s (message_id=%s), (total EDUs so far: %d)", + target_count, + sender_user_id, + edu_content["message_id"], + len(split_edus_content), + ) + break + else: + if target_count == 1: + # Single recipient's messages are too large, let's reject the client + # call with 413/`M_TOO_LARGE`, we expect this error to reach the + # client in the case of the /sendToDevice endpoint. + # + # 413 is currently an unspecced response for `/sendToDevice` but is + # probably the best thing we can do. + # https://github.com/matrix-org/matrix-spec/pull/2340 tracks adding + # this to the spec + recipient = message_items[0][0] + raise EventSizeError( + f"device message to {recipient} too large to fit in a single EDU", + unpersistable=True, + ) + + # Halve the number of messages and try again + target_count = target_count // 2 + + return split_edus_content + + +def create_new_to_device_edu_content( + sender_user_id: str, + message_type: str, + messages_by_user_then_device: dict[str, dict[str, JsonDict]], +) -> JsonDict: + """ + Create a new `m.direct_to_device` EDU `content` object with a unique message ID. + """ + # The EDU contains a "message_id" property which is used for + # idempotence. Make up a random one. + message_id = random_string_insecure_fast(16) + content = { + "sender": sender_user_id, + "type": message_type, + "message_id": message_id, + "messages": messages_by_user_then_device, + } + return content diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index fc61f46c1c..9fed22c1b3 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -739,18 +739,15 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) @trace - async def add_messages_to_device_inbox( + async def add_local_messages_from_client_to_device_inbox( self, local_messages_by_user_then_device: dict[str, dict[str, JsonDict]], - remote_messages_by_destination: dict[str, JsonDict], ) -> int: - """Used to send messages from this server. + """Queue local device messages that will be sent to devices of local users. Args: local_messages_by_user_then_device: Dictionary of recipient user_id to recipient device_id to message. - remote_messages_by_destination: - Dictionary of destination server_name to the EDU JSON to send. Returns: The new stream_id. @@ -766,6 +763,39 @@ class DeviceInboxWorkerStore(SQLBaseStore): txn, stream_id, local_messages_by_user_then_device ) + async with self._to_device_msg_id_gen.get_next() as stream_id: + now_ms = self.clock.time_msec() + await self.db_pool.runInteraction( + "add_local_messages_from_client_to_device_inbox", + add_messages_txn, + now_ms, + stream_id, + ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id) + + return self._to_device_msg_id_gen.get_current_token() + + @trace + async def add_remote_messages_from_client_to_device_inbox( + self, + remote_messages_by_destination: dict[str, JsonDict], + ) -> int: + """Queue device messages that will be sent to remote servers. + + Args: + remote_messages_by_destination: + Dictionary of destination server_name to the EDU JSON to send. + + Returns: + The new stream_id. + """ + + assert self._can_write_to_device + + def add_messages_txn( + txn: LoggingTransaction, now_ms: int, stream_id: int + ) -> None: # Add the remote messages to the federation outbox. # We'll send them to a remote server when we next send a # federation transaction to that destination. @@ -824,10 +854,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): async with self._to_device_msg_id_gen.get_next() as stream_id: now_ms = self.clock.time_msec() await self.db_pool.runInteraction( - "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id + "add_remote_messages_from_client_to_device_inbox", + add_messages_txn, + now_ms, + stream_id, ) - for user_id in local_messages_by_user_then_device.keys(): - self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id) for destination in remote_messages_by_destination.keys(): self._device_federation_outbox_stream_cache.entity_has_changed( destination, stream_id diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 6336edb108..ac7411291b 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -856,7 +856,9 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): # Seed the device_inbox table with our fake messages self.get_success( - self.hs.get_datastores().main.add_messages_to_device_inbox(messages, {}) + self.hs.get_datastores().main.add_local_messages_from_client_to_device_inbox( + messages + ) ) # Now have local_user send a final to-device message to exclusive_as_user. All unsent diff --git a/tests/rest/client/test_sendtodevice.py b/tests/rest/client/test_sendtodevice.py index 56533d85f5..526d9f0b3c 100644 --- a/tests/rest/client/test_sendtodevice.py +++ b/tests/rest/client/test_sendtodevice.py @@ -18,9 +18,21 @@ # [This file includes modifications made by New Vector Limited] # # -from synapse.api.constants import EduTypes +from unittest.mock import AsyncMock, Mock + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.api.constants import ( + MAX_EDU_SIZE, + MAX_EDUS_PER_TRANSACTION, + EduTypes, +) +from synapse.api.errors import Codes from synapse.rest import admin from synapse.rest.client import login, sendtodevice, sync +from synapse.server import HomeServer +from synapse.types import JsonDict +from synapse.util.clock import Clock from tests.unittest import HomeserverTestCase, override_config @@ -41,6 +53,20 @@ class SendToDeviceTestCase(HomeserverTestCase): sync.register_servlets, ] + def default_config(self) -> JsonDict: + config = super().default_config() + config["federation_sender_instances"] = None + return config + + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: + self.federation_transport_client = Mock(spec=["send_transaction"]) + self.federation_transport_client.send_transaction = AsyncMock() + hs = self.setup_test_homeserver( + federation_transport_client=self.federation_transport_client, + ) + + return hs + def test_user_to_user(self) -> None: """A to-device message from one user to another should get delivered""" @@ -91,6 +117,137 @@ class SendToDeviceTestCase(HomeserverTestCase): self.assertEqual(channel.code, 200, channel.result) self.assertEqual(channel.json_body.get("to_device", {}).get("events", []), []) + def test_large_remote_todevice(self) -> None: + """A to-device message needs to fit in the EDU size limit""" + _ = self.register_user("u1", "pass") + user1_tok = self.login("u1", "pass", "d1") + + # Create a message that is over the `MAX_EDU_SIZE` + test_msg = {"foo": "a" * MAX_EDU_SIZE} + channel = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.test/12345", + content={"messages": {"@remote_user:secondserver": {"device": test_msg}}}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 413, channel.result) + self.assertEqual(Codes.TOO_LARGE, channel.json_body["errcode"]) + + def test_edu_large_messages_splitting(self) -> None: + """ + Test that a bunch of to-device messages are split over multiple EDUs if they are + collectively too large to fit into a single EDU + """ + mock_send_transaction: AsyncMock = ( + self.federation_transport_client.send_transaction + ) + mock_send_transaction.return_value = {} + + sender = self.hs.get_federation_sender() + + _ = self.register_user("u1", "pass") + user1_tok = self.login("u1", "pass", "d1") + destination = "secondserver" + messages = {} + + # 2 messages, each just big enough to fit into their own EDU + for i in range(2): + messages[f"@remote_user{i}:" + destination] = { + "device": {"foo": "a" * (MAX_EDU_SIZE - 1000)} + } + + channel = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.test/1234567", + content={"messages": messages}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + + self.get_success(sender.send_device_messages([destination])) + + number_of_edus_sent = 0 + for call in mock_send_transaction.call_args_list: + number_of_edus_sent += len(call[0][1]()["edus"]) + + self.assertEqual(number_of_edus_sent, 2) + + def test_edu_small_messages_not_splitting(self) -> None: + """ + Test that a couple of small messages do not get split into multiple EDUs + """ + mock_send_transaction: AsyncMock = ( + self.federation_transport_client.send_transaction + ) + mock_send_transaction.return_value = {} + + sender = self.hs.get_federation_sender() + + _ = self.register_user("u1", "pass") + user1_tok = self.login("u1", "pass", "d1") + destination = "secondserver" + messages = {} + + # 2 small messages that should fit in a single EDU + for i in range(2): + messages[f"@remote_user{i}:" + destination] = {"device": {"foo": "a" * 100}} + + channel = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.test/123456", + content={"messages": messages}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + + self.get_success(sender.send_device_messages([destination])) + + number_of_edus_sent = 0 + for call in mock_send_transaction.call_args_list: + number_of_edus_sent += len(call[0][1]()["edus"]) + + self.assertEqual(number_of_edus_sent, 1) + + def test_transaction_splitting(self) -> None: + """Test that a bunch of to-device messages are split into multiple transactions if there are too many EDUs""" + mock_send_transaction: AsyncMock = ( + self.federation_transport_client.send_transaction + ) + mock_send_transaction.return_value = {} + + sender = self.hs.get_federation_sender() + + _ = self.register_user("u1", "pass") + user1_tok = self.login("u1", "pass", "d1") + destination = "secondserver" + messages = {} + + number_of_edus_to_send = MAX_EDUS_PER_TRANSACTION + 1 + + for i in range(number_of_edus_to_send): + messages[f"@remote_user{i}:" + destination] = { + "device": {"foo": "a" * (MAX_EDU_SIZE - 1000)} + } + + channel = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.test/12345678", + content={"messages": messages}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + + self.get_success(sender.send_device_messages([destination])) + + # At least 2 transactions should be sent since we are over the EDU limit per transaction + self.assertGreaterEqual(mock_send_transaction.call_count, 2) + + number_of_edus_sent = 0 + for call in mock_send_transaction.call_args_list: + number_of_edus_sent += len(call[0][1]()["edus"]) + + self.assertEqual(number_of_edus_sent, number_of_edus_to_send) + @override_config({"rc_key_requests": {"per_second": 10, "burst_count": 2}}) def test_local_room_key_request(self) -> None: """m.room_key_request has special-casing; test from local user"""