Limit outgoing to_device EDU size to 65536 (#18416)

If a set of messages exceeds this limit, the messages are split
across several EDUs.

Fix #17035 (should)

There is currently [no official specced limit for
EDUs](https://github.com/matrix-org/matrix-spec/issues/807), but the
consensus seems to be that it would be useful to have one to avoid this
bug by bounding the transaction size.

As a side effect it also limits the size of a single to-device message
to a bit less than 65536.

This should probably be added to the spec similarly to the [message size
limit.](https://spec.matrix.org/v1.14/client-server-api/#size-limits)

Spec PR: https://github.com/matrix-org/matrix-spec/pull/2340

---------

Co-authored-by: mcalinghee <mcalinghee.dev@gmail.com>
Co-authored-by: Eric Eastwood <madlittlemods@gmail.com>
This commit is contained in:
Mathieu Velten
2026-03-24 17:22:11 +01:00
committed by Erik Johnston
parent 0549307198
commit b89d578d56
7 changed files with 360 additions and 38 deletions

1
changelog.d/18416.bugfix Normal file
View File

@@ -0,0 +1 @@
A long queue of to-device messages could prevent outgoing federation because of the size of the transaction, let's limit the to-device EDU size to a reasonable value.

View File

@@ -30,6 +30,22 @@ from synapse.util.duration import Duration
# the max size of a (canonical-json-encoded) event
MAX_PDU_SIZE = 65536
# This isn't spec'ed but is our own reasonable default to play nice with Synapse's
# `max_request_size`/`max_request_body_size`. We chose the same as `MAX_PDU_SIZE` as our
# `max_request_body_size` math is currently limited by 200 `MAX_PDU_SIZE` things. The
# spec for a `/federation/v1/send` request sets the limit at 100 EDU's and 50 PDU's
# which is below that 200 `MAX_PDU_SIZE` limit (`max_request_body_size`).
#
# Allowing oversized EDU's results in failed `/federation/v1/send` transactions (because
# the request overall can overrun the `max_request_body_size`) which are retried over
# and over and prevent other outbound federation traffic from happening.
MAX_EDU_SIZE = 65536
# This is defined in the Matrix spec and enforced by the receiver.
MAX_EDUS_PER_TRANSACTION = 100
# A transaction can contain up to 100 EDUs but synapse reserves 10 EDUs for other purposes
# like trickling out some device list updates.
NUMBER_OF_RESERVED_EDUS_PER_TRANSACTION = 10
# The maximum allowed size of an HTTP request.
# Other than media uploads, the biggest request we expect to see is a fully-loaded

View File

@@ -30,7 +30,11 @@ from prometheus_client import Counter
from twisted.internet import defer
from synapse.api.constants import EduTypes
from synapse.api.constants import (
MAX_EDUS_PER_TRANSACTION,
NUMBER_OF_RESERVED_EDUS_PER_TRANSACTION,
EduTypes,
)
from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
@@ -51,9 +55,6 @@ from synapse.visibility import filter_events_for_server
if TYPE_CHECKING:
import synapse.server
# This is defined in the Matrix spec and enforced by the receiver.
MAX_EDUS_PER_TRANSACTION = 100
logger = logging.getLogger(__name__)
@@ -798,7 +799,9 @@ class _TransactionQueueManager:
(
to_device_edus,
device_stream_id,
) = await self.queue._get_to_device_message_edus(edu_limit - 10)
) = await self.queue._get_to_device_message_edus(
edu_limit - NUMBER_OF_RESERVED_EDUS_PER_TRANSACTION
)
if to_device_edus:
self._device_stream_id = device_stream_id

View File

@@ -23,8 +23,15 @@ import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Any
from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
from synapse.api.errors import Codes, SynapseError
from canonicaljson import encode_canonical_json
from synapse.api.constants import (
MAX_EDU_SIZE,
EduTypes,
EventContentFields,
ToDeviceEventTypes,
)
from synapse.api.errors import Codes, EventSizeError, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
@@ -35,7 +42,7 @@ from synapse.logging.opentracing import (
)
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
from synapse.util.json import json_encoder
from synapse.util.stringutils import random_string
from synapse.util.stringutils import random_string_insecure_fast
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -222,6 +229,7 @@ class DeviceMessageHandler:
set_tag(SynapseTags.TO_DEVICE_TYPE, message_type)
set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id)
local_messages = {}
# Map from destination (server) -> recipient (user ID) -> device_id -> JSON message content
remote_messages: dict[str, dict[str, dict[str, JsonDict]]] = {}
for user_id, by_device in messages.items():
if not UserID.is_valid(user_id):
@@ -277,28 +285,33 @@ class DeviceMessageHandler:
destination = get_domain_from_id(user_id)
remote_messages.setdefault(destination, {})[user_id] = by_device
context = get_active_span_text_map()
remote_edu_contents = {}
for destination, messages in remote_messages.items():
# The EDU contains a "message_id" property which is used for
# idempotence. Make up a random one.
message_id = random_string(16)
log_kv({"destination": destination, "message_id": message_id})
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"org.matrix.opentracing_context": json_encoder.encode(context),
}
# Add messages to the database.
# Add local messages to the database.
# Retrieve the stream id of the last-processed to-device message.
last_stream_id = await self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
last_stream_id = (
await self.store.add_local_messages_from_client_to_device_inbox(
local_messages
)
)
for destination, messages in remote_messages.items():
split_edus = split_device_messages_into_edus(
sender_user_id, message_type, messages
)
for edu in split_edus:
edu["org.matrix.opentracing_context"] = json_encoder.encode(
get_active_span_text_map()
)
# Add remote messages to the database.
last_stream_id = (
await self.store.add_remote_messages_from_client_to_device_inbox(
{destination: edu}
)
)
log_kv(
{
"destination": destination,
"message_id": edu["message_id"],
}
)
# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
@@ -397,3 +410,102 @@ class DeviceMessageHandler:
"events": messages,
"next_batch": f"d{stream_id}",
}
def split_device_messages_into_edus(
sender_user_id: str,
message_type: str,
messages_by_user_then_device: dict[str, dict[str, JsonDict]],
) -> list[JsonDict]:
"""
This function takes many to-device messages and fits/splits them into several EDUs
as necessary. We split the messages up as the overall request can overrun the
`max_request_body_size` and prevent outbound federation traffic because of the size
of the transaction (cf. `MAX_EDU_SIZE`).
Args:
sender_user_id: The user that is sending the to-device messages.
message_type: The type of to-device messages that are being sent.
messages_by_user_then_device: Dictionary of recipient user_id to recipient device_id to message.
Returns:
Bin-packed list of EDU JSON content for the given to_device messages
Raises:
EventSizeError: If a single to-device message is too large to fit into an EDU.
"""
split_edus_content: list[JsonDict] = []
# Convert messages dict to a list of (recipient, messages_by_device) pairs
message_items = list(messages_by_user_then_device.items())
while message_items:
edu_messages = {}
# Start by trying to fit all remaining messages
target_count = len(message_items)
while target_count > 0:
# Take the first target_count messages
edu_messages = dict(message_items[:target_count])
edu_content = create_new_to_device_edu_content(
sender_user_id, message_type, edu_messages
)
# Let's add the whole EDU structure before testing the size
edu = {
"content": edu_content,
"edu_type": EduTypes.DIRECT_TO_DEVICE,
}
if len(encode_canonical_json(edu)) <= MAX_EDU_SIZE:
# It fits! Add this EDU and remove these messages from the list
split_edus_content.append(edu_content)
message_items = message_items[target_count:]
logger.debug(
"Created EDU with %d recipients from %s (message_id=%s), (total EDUs so far: %d)",
target_count,
sender_user_id,
edu_content["message_id"],
len(split_edus_content),
)
break
else:
if target_count == 1:
# Single recipient's messages are too large, let's reject the client
# call with 413/`M_TOO_LARGE`, we expect this error to reach the
# client in the case of the /sendToDevice endpoint.
#
# 413 is currently an unspecced response for `/sendToDevice` but is
# probably the best thing we can do.
# https://github.com/matrix-org/matrix-spec/pull/2340 tracks adding
# this to the spec
recipient = message_items[0][0]
raise EventSizeError(
f"device message to {recipient} too large to fit in a single EDU",
unpersistable=True,
)
# Halve the number of messages and try again
target_count = target_count // 2
return split_edus_content
def create_new_to_device_edu_content(
sender_user_id: str,
message_type: str,
messages_by_user_then_device: dict[str, dict[str, JsonDict]],
) -> JsonDict:
"""
Create a new `m.direct_to_device` EDU `content` object with a unique message ID.
"""
# The EDU contains a "message_id" property which is used for
# idempotence. Make up a random one.
message_id = random_string_insecure_fast(16)
content = {
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"messages": messages_by_user_then_device,
}
return content

View File

@@ -739,18 +739,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
)
@trace
async def add_messages_to_device_inbox(
async def add_local_messages_from_client_to_device_inbox(
self,
local_messages_by_user_then_device: dict[str, dict[str, JsonDict]],
remote_messages_by_destination: dict[str, JsonDict],
) -> int:
"""Used to send messages from this server.
"""Queue local device messages that will be sent to devices of local users.
Args:
local_messages_by_user_then_device:
Dictionary of recipient user_id to recipient device_id to message.
remote_messages_by_destination:
Dictionary of destination server_name to the EDU JSON to send.
Returns:
The new stream_id.
@@ -766,6 +763,39 @@ class DeviceInboxWorkerStore(SQLBaseStore):
txn, stream_id, local_messages_by_user_then_device
)
async with self._to_device_msg_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
await self.db_pool.runInteraction(
"add_local_messages_from_client_to_device_inbox",
add_messages_txn,
now_ms,
stream_id,
)
for user_id in local_messages_by_user_then_device.keys():
self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
return self._to_device_msg_id_gen.get_current_token()
@trace
async def add_remote_messages_from_client_to_device_inbox(
self,
remote_messages_by_destination: dict[str, JsonDict],
) -> int:
"""Queue device messages that will be sent to remote servers.
Args:
remote_messages_by_destination:
Dictionary of destination server_name to the EDU JSON to send.
Returns:
The new stream_id.
"""
assert self._can_write_to_device
def add_messages_txn(
txn: LoggingTransaction, now_ms: int, stream_id: int
) -> None:
# Add the remote messages to the federation outbox.
# We'll send them to a remote server when we next send a
# federation transaction to that destination.
@@ -824,10 +854,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
async with self._to_device_msg_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
"add_remote_messages_from_client_to_device_inbox",
add_messages_txn,
now_ms,
stream_id,
)
for user_id in local_messages_by_user_then_device.keys():
self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
for destination in remote_messages_by_destination.keys():
self._device_federation_outbox_stream_cache.entity_has_changed(
destination, stream_id

View File

@@ -856,7 +856,9 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
# Seed the device_inbox table with our fake messages
self.get_success(
self.hs.get_datastores().main.add_messages_to_device_inbox(messages, {})
self.hs.get_datastores().main.add_local_messages_from_client_to_device_inbox(
messages
)
)
# Now have local_user send a final to-device message to exclusive_as_user. All unsent

View File

@@ -18,9 +18,21 @@
# [This file includes modifications made by New Vector Limited]
#
#
from synapse.api.constants import EduTypes
from unittest.mock import AsyncMock, Mock
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import (
MAX_EDU_SIZE,
MAX_EDUS_PER_TRANSACTION,
EduTypes,
)
from synapse.api.errors import Codes
from synapse.rest import admin
from synapse.rest.client import login, sendtodevice, sync
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util.clock import Clock
from tests.unittest import HomeserverTestCase, override_config
@@ -41,6 +53,20 @@ class SendToDeviceTestCase(HomeserverTestCase):
sync.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
config["federation_sender_instances"] = None
return config
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
self.federation_transport_client = Mock(spec=["send_transaction"])
self.federation_transport_client.send_transaction = AsyncMock()
hs = self.setup_test_homeserver(
federation_transport_client=self.federation_transport_client,
)
return hs
def test_user_to_user(self) -> None:
"""A to-device message from one user to another should get delivered"""
@@ -91,6 +117,137 @@ class SendToDeviceTestCase(HomeserverTestCase):
self.assertEqual(channel.code, 200, channel.result)
self.assertEqual(channel.json_body.get("to_device", {}).get("events", []), [])
def test_large_remote_todevice(self) -> None:
"""A to-device message needs to fit in the EDU size limit"""
_ = self.register_user("u1", "pass")
user1_tok = self.login("u1", "pass", "d1")
# Create a message that is over the `MAX_EDU_SIZE`
test_msg = {"foo": "a" * MAX_EDU_SIZE}
channel = self.make_request(
"PUT",
"/_matrix/client/r0/sendToDevice/m.test/12345",
content={"messages": {"@remote_user:secondserver": {"device": test_msg}}},
access_token=user1_tok,
)
self.assertEqual(channel.code, 413, channel.result)
self.assertEqual(Codes.TOO_LARGE, channel.json_body["errcode"])
def test_edu_large_messages_splitting(self) -> None:
"""
Test that a bunch of to-device messages are split over multiple EDUs if they are
collectively too large to fit into a single EDU
"""
mock_send_transaction: AsyncMock = (
self.federation_transport_client.send_transaction
)
mock_send_transaction.return_value = {}
sender = self.hs.get_federation_sender()
_ = self.register_user("u1", "pass")
user1_tok = self.login("u1", "pass", "d1")
destination = "secondserver"
messages = {}
# 2 messages, each just big enough to fit into their own EDU
for i in range(2):
messages[f"@remote_user{i}:" + destination] = {
"device": {"foo": "a" * (MAX_EDU_SIZE - 1000)}
}
channel = self.make_request(
"PUT",
"/_matrix/client/r0/sendToDevice/m.test/1234567",
content={"messages": messages},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.result)
self.get_success(sender.send_device_messages([destination]))
number_of_edus_sent = 0
for call in mock_send_transaction.call_args_list:
number_of_edus_sent += len(call[0][1]()["edus"])
self.assertEqual(number_of_edus_sent, 2)
def test_edu_small_messages_not_splitting(self) -> None:
"""
Test that a couple of small messages do not get split into multiple EDUs
"""
mock_send_transaction: AsyncMock = (
self.federation_transport_client.send_transaction
)
mock_send_transaction.return_value = {}
sender = self.hs.get_federation_sender()
_ = self.register_user("u1", "pass")
user1_tok = self.login("u1", "pass", "d1")
destination = "secondserver"
messages = {}
# 2 small messages that should fit in a single EDU
for i in range(2):
messages[f"@remote_user{i}:" + destination] = {"device": {"foo": "a" * 100}}
channel = self.make_request(
"PUT",
"/_matrix/client/r0/sendToDevice/m.test/123456",
content={"messages": messages},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.result)
self.get_success(sender.send_device_messages([destination]))
number_of_edus_sent = 0
for call in mock_send_transaction.call_args_list:
number_of_edus_sent += len(call[0][1]()["edus"])
self.assertEqual(number_of_edus_sent, 1)
def test_transaction_splitting(self) -> None:
"""Test that a bunch of to-device messages are split into multiple transactions if there are too many EDUs"""
mock_send_transaction: AsyncMock = (
self.federation_transport_client.send_transaction
)
mock_send_transaction.return_value = {}
sender = self.hs.get_federation_sender()
_ = self.register_user("u1", "pass")
user1_tok = self.login("u1", "pass", "d1")
destination = "secondserver"
messages = {}
number_of_edus_to_send = MAX_EDUS_PER_TRANSACTION + 1
for i in range(number_of_edus_to_send):
messages[f"@remote_user{i}:" + destination] = {
"device": {"foo": "a" * (MAX_EDU_SIZE - 1000)}
}
channel = self.make_request(
"PUT",
"/_matrix/client/r0/sendToDevice/m.test/12345678",
content={"messages": messages},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.result)
self.get_success(sender.send_device_messages([destination]))
# At least 2 transactions should be sent since we are over the EDU limit per transaction
self.assertGreaterEqual(mock_send_transaction.call_count, 2)
number_of_edus_sent = 0
for call in mock_send_transaction.call_args_list:
number_of_edus_sent += len(call[0][1]()["edus"])
self.assertEqual(number_of_edus_sent, number_of_edus_to_send)
@override_config({"rc_key_requests": {"per_second": 10, "burst_count": 2}})
def test_local_room_key_request(self) -> None:
"""m.room_key_request has special-casing; test from local user"""