mirror of
https://github.com/element-hq/synapse.git
synced 2026-06-06 22:02:08 +00:00
Limit to-device EDU sizes (#19617)
This is based on https://github.com/element-hq/synapse/pull/18416, which got reverted (#19614) due to it incorrectly rejecting to-device messages to users with many devices (and thus breaking message sending). Fix https://github.com/element-hq/synapse/issues/17035 A to-device message content looks like: ```jsonc { "@user:domain": {"device1": {...}, "device2": {...}}, ... } ``` The previous PR would split up into multiple EDUs, each with a subset of the users. However, if one user's entry was too large it would not further split it up and then error out. The main change in this PR is to allow splitting up a single user into multiple EDUs. Other changes: 1. Rename to `SOFT_MAX_EDU_SIZE` to indicate that we sometimes send EDUs with larger size than that, and its more a target than a hard limit. 2. Check early if any to-device message (to a specific device) is too large to send, even if we're not going to send it over federation. This ensures that we catch issues where clients try to send too large to-device. This still means that if a client send a large individual to-device message it will fail, but I don't believe we ever send such large to-device messages (normally they're in the range of a few KB). --- I ended up changing the implementation a bunch to make it easy to reuse the code to split up dictionaries. Instead of repeatedly splitting up the EDU until each bit fits into the size, we instead record the size of each entry in the dict and instead split up based on cumulative size. This means we call `encode_canonical_json` on each entry rather than once on the entire struct, but its not significantly slower to do so. -- cc @MatMaul @MadLittleMods --------- Co-authored-by: Mathieu Velten <matmaul@gmail.com> Co-authored-by: mcalinghee <mcalinghee.dev@gmail.com> Co-authored-by: Eric Eastwood <madlittlemods@gmail.com>
This commit is contained in:
@@ -0,0 +1 @@
|
||||
A long queue of to-device messages could prevent outgoing federation because of the size of the transaction, let's limit the to-device EDU size to a reasonable value.
|
||||
@@ -30,6 +30,33 @@ from synapse.util.duration import Duration
|
||||
|
||||
# the max size of a (canonical-json-encoded) event
|
||||
MAX_PDU_SIZE = 65536
|
||||
# This isn't spec'ed but is our own reasonable default to play nice with Synapse's
|
||||
# `max_request_size`/`max_request_body_size`. We chose the same as `MAX_PDU_SIZE` as our
|
||||
# `max_request_body_size` math is currently limited by 200 `MAX_PDU_SIZE` things. The
|
||||
# spec for a `/federation/v1/send` request sets the limit at 100 EDU's and 50 PDU's
|
||||
# which is below that 200 `MAX_PDU_SIZE` limit (`max_request_body_size`).
|
||||
#
|
||||
# Allowing oversized EDU's results in failed `/federation/v1/send` transactions (because
|
||||
# the request overall can overrun the `max_request_body_size`) which are retried over
|
||||
# and over and prevent other outbound federation traffic from happening.
|
||||
#
|
||||
# We may send EDU's that are larger than this, but we aim to avoid doing so.
|
||||
SOFT_MAX_EDU_SIZE = 65536
|
||||
|
||||
# This is the maximum size of the content of a to-device message. This is not
|
||||
# (yet) spec'ed but is our own reasonable default. We need to set a limit on the
|
||||
# size of to-device message contents, as they get sent over federation and
|
||||
# therefore need to fit inside transactions.
|
||||
#
|
||||
# https://github.com/matrix-org/matrix-spec/pull/2340 tracks adding this to the
|
||||
# spec.
|
||||
MAX_TO_DEVICE_CONTENT_SIZE = SOFT_MAX_EDU_SIZE
|
||||
|
||||
# This is defined in the Matrix spec and enforced by the receiver.
|
||||
MAX_EDUS_PER_TRANSACTION = 100
|
||||
# A transaction can contain up to 100 EDUs but synapse reserves 10 EDUs for other purposes
|
||||
# like trickling out some device list updates.
|
||||
NUMBER_OF_RESERVED_EDUS_PER_TRANSACTION = 10
|
||||
|
||||
# The maximum allowed size of an HTTP request.
|
||||
# Other than media uploads, the biggest request we expect to see is a fully-loaded
|
||||
|
||||
@@ -30,7 +30,11 @@ from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EduTypes
|
||||
from synapse.api.constants import (
|
||||
MAX_EDUS_PER_TRANSACTION,
|
||||
NUMBER_OF_RESERVED_EDUS_PER_TRANSACTION,
|
||||
EduTypes,
|
||||
)
|
||||
from synapse.api.errors import (
|
||||
FederationDeniedError,
|
||||
HttpResponseException,
|
||||
@@ -51,9 +55,6 @@ from synapse.visibility import filter_events_for_server
|
||||
if TYPE_CHECKING:
|
||||
import synapse.server
|
||||
|
||||
# This is defined in the Matrix spec and enforced by the receiver.
|
||||
MAX_EDUS_PER_TRANSACTION = 100
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -798,7 +799,9 @@ class _TransactionQueueManager:
|
||||
(
|
||||
to_device_edus,
|
||||
device_stream_id,
|
||||
) = await self.queue._get_to_device_message_edus(edu_limit - 10)
|
||||
) = await self.queue._get_to_device_message_edus(
|
||||
edu_limit - NUMBER_OF_RESERVED_EDUS_PER_TRANSACTION
|
||||
)
|
||||
|
||||
if to_device_edus:
|
||||
self._device_stream_id = device_stream_id
|
||||
|
||||
@@ -23,8 +23,16 @@ import logging
|
||||
from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
from synapse.api.constants import (
|
||||
MAX_TO_DEVICE_CONTENT_SIZE,
|
||||
SOFT_MAX_EDU_SIZE,
|
||||
EduTypes,
|
||||
EventContentFields,
|
||||
ToDeviceEventTypes,
|
||||
)
|
||||
from synapse.api.errors import Codes, EventSizeError, SynapseError
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.logging.opentracing import (
|
||||
@@ -34,8 +42,9 @@ from synapse.logging.opentracing import (
|
||||
set_tag,
|
||||
)
|
||||
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
|
||||
from synapse.util import split_dict_to_fit_to_size
|
||||
from synapse.util.json import json_encoder
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.util.stringutils import random_string_insecure_fast
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -222,6 +231,7 @@ class DeviceMessageHandler:
|
||||
set_tag(SynapseTags.TO_DEVICE_TYPE, message_type)
|
||||
set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id)
|
||||
local_messages = {}
|
||||
# Map from destination (server) -> recipient (user ID) -> device_id -> JSON message content
|
||||
remote_messages: dict[str, dict[str, dict[str, JsonDict]]] = {}
|
||||
for user_id, by_device in messages.items():
|
||||
if not UserID.is_valid(user_id):
|
||||
@@ -233,6 +243,24 @@ class DeviceMessageHandler:
|
||||
|
||||
# add an opentracing log entry for each message
|
||||
for device_id, message_content in by_device.items():
|
||||
# Check the size of each message, as if these are too large we
|
||||
# can't send them over federation.
|
||||
#
|
||||
# We do this for all to-device messages, even those that aren't
|
||||
# over federation, so as to more easily catch clients that are
|
||||
# sending excessively large messages.
|
||||
if (
|
||||
message_len := len(encode_canonical_json(message_content))
|
||||
) > MAX_TO_DEVICE_CONTENT_SIZE:
|
||||
# 413 is currently an unspecced response for `/sendToDevice` but is
|
||||
# probably the best thing we can do.
|
||||
# https://github.com/matrix-org/matrix-spec/pull/2340 tracks adding
|
||||
# this to the spec
|
||||
raise EventSizeError(
|
||||
f"To-device message for {user_id}:{device_id} is too large to send ({message_len} > {MAX_TO_DEVICE_CONTENT_SIZE})",
|
||||
unpersistable=True,
|
||||
)
|
||||
|
||||
log_kv(
|
||||
{
|
||||
"event": "send_to_device_message",
|
||||
@@ -277,28 +305,33 @@ class DeviceMessageHandler:
|
||||
destination = get_domain_from_id(user_id)
|
||||
remote_messages.setdefault(destination, {})[user_id] = by_device
|
||||
|
||||
context = get_active_span_text_map()
|
||||
|
||||
remote_edu_contents = {}
|
||||
for destination, messages in remote_messages.items():
|
||||
# The EDU contains a "message_id" property which is used for
|
||||
# idempotence. Make up a random one.
|
||||
message_id = random_string(16)
|
||||
log_kv({"destination": destination, "message_id": message_id})
|
||||
|
||||
remote_edu_contents[destination] = {
|
||||
"messages": messages,
|
||||
"sender": sender_user_id,
|
||||
"type": message_type,
|
||||
"message_id": message_id,
|
||||
"org.matrix.opentracing_context": json_encoder.encode(context),
|
||||
}
|
||||
|
||||
# Add messages to the database.
|
||||
# Add local messages to the database.
|
||||
# Retrieve the stream id of the last-processed to-device message.
|
||||
last_stream_id = await self.store.add_messages_to_device_inbox(
|
||||
local_messages, remote_edu_contents
|
||||
last_stream_id = (
|
||||
await self.store.add_local_messages_from_client_to_device_inbox(
|
||||
local_messages
|
||||
)
|
||||
)
|
||||
for destination, messages in remote_messages.items():
|
||||
split_edus = split_device_messages_into_edus(
|
||||
sender_user_id, message_type, messages
|
||||
)
|
||||
for edu in split_edus:
|
||||
edu["org.matrix.opentracing_context"] = json_encoder.encode(
|
||||
get_active_span_text_map()
|
||||
)
|
||||
# Add remote messages to the database.
|
||||
last_stream_id = (
|
||||
await self.store.add_remote_messages_from_client_to_device_inbox(
|
||||
{destination: edu}
|
||||
)
|
||||
)
|
||||
log_kv(
|
||||
{
|
||||
"destination": destination,
|
||||
"message_id": edu["message_id"],
|
||||
}
|
||||
)
|
||||
|
||||
# Notify listeners that there are new to-device messages to process,
|
||||
# handing them the latest stream id.
|
||||
@@ -397,3 +430,128 @@ class DeviceMessageHandler:
|
||||
"events": messages,
|
||||
"next_batch": f"d{stream_id}",
|
||||
}
|
||||
|
||||
|
||||
def split_device_messages_into_edus(
|
||||
sender_user_id: str,
|
||||
message_type: str,
|
||||
messages_by_user_then_device: dict[str, dict[str, JsonDict]],
|
||||
) -> list[JsonDict]:
|
||||
"""
|
||||
This function takes many to-device messages and fits/splits them into several EDUs
|
||||
as necessary. We split the messages up as the overall request can overrun the
|
||||
`max_request_body_size` and prevent outbound federation traffic because of the size
|
||||
of the transaction (cf. `SOFT_MAX_EDU_SIZE`).
|
||||
|
||||
Args:
|
||||
sender_user_id: The user that is sending the to-device messages.
|
||||
message_type: The type of to-device messages that are being sent.
|
||||
messages_by_user_then_device: Dictionary of recipient user_id to recipient device_id to message.
|
||||
|
||||
Returns:
|
||||
Bin-packed list of EDU JSON content for the given to_device messages
|
||||
"""
|
||||
split_edus_content: list[JsonDict] = []
|
||||
|
||||
# The header size of the full EDU.
|
||||
base_edu_size = _EMPTY_EDU_SIZE + len(sender_user_id) + len(message_type)
|
||||
|
||||
# First split up the top-level dict of user_id to device messages.
|
||||
for subset_messages, estimated_size in split_dict_to_fit_to_size(
|
||||
messages_by_user_then_device,
|
||||
soft_max_size=SOFT_MAX_EDU_SIZE,
|
||||
wrapping_object_size=base_edu_size,
|
||||
):
|
||||
# The returned subset might be larger than the soft max size if it
|
||||
# contains a single entry that is larger than the soft max size.
|
||||
if estimated_size <= SOFT_MAX_EDU_SIZE:
|
||||
# This message fits in a single EDU, add it as is.
|
||||
content = create_new_to_device_edu_content(
|
||||
sender_user_id, message_type, subset_messages
|
||||
)
|
||||
split_edus_content.append(content)
|
||||
logger.debug(
|
||||
"Created EDU with %d recipients from %s (message_id=%s), (total EDUs so far: %d)",
|
||||
len(subset_messages),
|
||||
sender_user_id,
|
||||
content["message_id"],
|
||||
len(split_edus_content),
|
||||
)
|
||||
else:
|
||||
# This message doesn't fit in a single EDU. We split the message up
|
||||
# further by device.
|
||||
#
|
||||
# Note: `subset` should only have a single entry in it.
|
||||
for recipient, messages_by_device in subset_messages.items():
|
||||
# The header size of the EDU for this recipient, which includes
|
||||
# the size of the recipient user ID and the wrapping structure
|
||||
# for the device messages.
|
||||
subset_base_size = len(
|
||||
encode_canonical_json(
|
||||
_create_new_to_device_edu(
|
||||
sender_user_id, message_type, {recipient: {}}
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
for subset_messages, _estimated_size in split_dict_to_fit_to_size(
|
||||
messages_by_device,
|
||||
soft_max_size=SOFT_MAX_EDU_SIZE,
|
||||
wrapping_object_size=subset_base_size,
|
||||
):
|
||||
# Again, the returned subset might be larger than the soft
|
||||
# max size, but we can't split it any further so we have to
|
||||
# add it as is.
|
||||
content = create_new_to_device_edu_content(
|
||||
sender_user_id, message_type, {recipient: subset_messages}
|
||||
)
|
||||
split_edus_content.append(content)
|
||||
logger.debug(
|
||||
"Created EDU with %d recipients from %s (message_id=%s), (total EDUs so far: %d)",
|
||||
len(subset_messages),
|
||||
sender_user_id,
|
||||
content["message_id"],
|
||||
len(split_edus_content),
|
||||
)
|
||||
|
||||
return split_edus_content
|
||||
|
||||
|
||||
def create_new_to_device_edu_content(
|
||||
sender_user_id: str,
|
||||
message_type: str,
|
||||
messages_by_user_then_device: dict[str, dict[str, JsonDict]],
|
||||
) -> JsonDict:
|
||||
"""
|
||||
Create a new `m.direct_to_device` EDU `content` object with a unique message ID.
|
||||
"""
|
||||
# The EDU contains a "message_id" property which is used for
|
||||
# idempotence. Make up a random one.
|
||||
message_id = random_string_insecure_fast(16)
|
||||
content = {
|
||||
"sender": sender_user_id,
|
||||
"type": message_type,
|
||||
"message_id": message_id,
|
||||
"messages": messages_by_user_then_device,
|
||||
}
|
||||
return content
|
||||
|
||||
|
||||
def _create_new_to_device_edu(
|
||||
sender_user_id: str,
|
||||
message_type: str,
|
||||
messages_by_user_then_device: dict[str, dict[str, JsonDict]],
|
||||
) -> dict[str, Any]:
|
||||
"""Create a new `m.direct_to_device` EDU, returning the full EDU dict."""
|
||||
return {
|
||||
"edu_type": EduTypes.DIRECT_TO_DEVICE,
|
||||
"content": create_new_to_device_edu_content(
|
||||
sender_user_id, message_type, messages_by_user_then_device
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
# The size of an empty EDU with no messages, which we use as the base size when
|
||||
# packing messages into EDUs. The size of the sender and message type must be
|
||||
# added to this when calculating the size of an EDU.
|
||||
_EMPTY_EDU_SIZE = len(encode_canonical_json(_create_new_to_device_edu("", "", {})))
|
||||
|
||||
@@ -739,18 +739,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
@trace
|
||||
async def add_messages_to_device_inbox(
|
||||
async def add_local_messages_from_client_to_device_inbox(
|
||||
self,
|
||||
local_messages_by_user_then_device: dict[str, dict[str, JsonDict]],
|
||||
remote_messages_by_destination: dict[str, JsonDict],
|
||||
) -> int:
|
||||
"""Used to send messages from this server.
|
||||
"""Queue local device messages that will be sent to devices of local users.
|
||||
|
||||
Args:
|
||||
local_messages_by_user_then_device:
|
||||
Dictionary of recipient user_id to recipient device_id to message.
|
||||
remote_messages_by_destination:
|
||||
Dictionary of destination server_name to the EDU JSON to send.
|
||||
|
||||
Returns:
|
||||
The new stream_id.
|
||||
@@ -766,6 +763,39 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
txn, stream_id, local_messages_by_user_then_device
|
||||
)
|
||||
|
||||
async with self._to_device_msg_id_gen.get_next() as stream_id:
|
||||
now_ms = self.clock.time_msec()
|
||||
await self.db_pool.runInteraction(
|
||||
"add_local_messages_from_client_to_device_inbox",
|
||||
add_messages_txn,
|
||||
now_ms,
|
||||
stream_id,
|
||||
)
|
||||
for user_id in local_messages_by_user_then_device.keys():
|
||||
self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
|
||||
|
||||
return self._to_device_msg_id_gen.get_current_token()
|
||||
|
||||
@trace
|
||||
async def add_remote_messages_from_client_to_device_inbox(
|
||||
self,
|
||||
remote_messages_by_destination: dict[str, JsonDict],
|
||||
) -> int:
|
||||
"""Queue device messages that will be sent to remote servers.
|
||||
|
||||
Args:
|
||||
remote_messages_by_destination:
|
||||
Dictionary of destination server_name to the EDU JSON to send.
|
||||
|
||||
Returns:
|
||||
The new stream_id.
|
||||
"""
|
||||
|
||||
assert self._can_write_to_device
|
||||
|
||||
def add_messages_txn(
|
||||
txn: LoggingTransaction, now_ms: int, stream_id: int
|
||||
) -> None:
|
||||
# Add the remote messages to the federation outbox.
|
||||
# We'll send them to a remote server when we next send a
|
||||
# federation transaction to that destination.
|
||||
@@ -824,10 +854,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
async with self._to_device_msg_id_gen.get_next() as stream_id:
|
||||
now_ms = self.clock.time_msec()
|
||||
await self.db_pool.runInteraction(
|
||||
"add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
|
||||
"add_remote_messages_from_client_to_device_inbox",
|
||||
add_messages_txn,
|
||||
now_ms,
|
||||
stream_id,
|
||||
)
|
||||
for user_id in local_messages_by_user_then_device.keys():
|
||||
self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
|
||||
for destination in remote_messages_by_destination.keys():
|
||||
self._device_federation_outbox_stream_cache.entity_has_changed(
|
||||
destination, stream_id
|
||||
|
||||
@@ -24,6 +24,7 @@ import logging
|
||||
import os
|
||||
import typing
|
||||
from typing import (
|
||||
Any,
|
||||
Iterator,
|
||||
Mapping,
|
||||
Sequence,
|
||||
@@ -31,11 +32,14 @@ from typing import (
|
||||
)
|
||||
|
||||
import attr
|
||||
from canonicaljson import encode_canonical_json
|
||||
from matrix_common.versionstring import get_distribution_version_string
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.types import JsonDict
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
@@ -165,3 +169,107 @@ class MutableOverlayMapping(collections.abc.MutableMapping[K, V]):
|
||||
self._underlying_map = {}
|
||||
self._mutable_map.clear()
|
||||
self._deletions.clear()
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class _DictSplitterState:
|
||||
"""State for splitting a dict into multiple dicts, c.f.
|
||||
`split_dict_to_fit_to_size`."""
|
||||
|
||||
subset: dict[str, Any]
|
||||
"""A subset of the original dict."""
|
||||
|
||||
estimated_size: int
|
||||
"""Estimated size of the JSON encoding of the current payload, including any
|
||||
wrapping structure."""
|
||||
|
||||
|
||||
def split_dict_to_fit_to_size(
|
||||
original_dict: dict[str, Any],
|
||||
*,
|
||||
soft_max_size: int,
|
||||
wrapping_object_size: int = 2,
|
||||
) -> Iterator[tuple[dict[str, JsonDict], int]]:
|
||||
"""Splits a dict up into a list of dicts, each of which is small enough to
|
||||
fit into the given size when encoded as JSON. Every entry in the original
|
||||
dict is in exactly one of the resulting dicts.
|
||||
|
||||
The `wrapping_object_size` can be used if the resulting dicts are going to
|
||||
be wrapped in some additional JSON structure, to account for the additional
|
||||
size of that structure. The default assumes no wrapping, and just accounts
|
||||
for the two curly braces of the dict itself.
|
||||
|
||||
Note that if an individual entry in the original dict is larger than
|
||||
`soft_max_size` then this will emit a dict containing just that entry, which
|
||||
will be larger than `soft_max_size` when encoded as JSON.
|
||||
|
||||
Args:
|
||||
original_dict: The dict to split.
|
||||
soft_max_size: The maximum size of each dict when encoded as JSON.
|
||||
wrapping_object_size: The estimated size of the JSON encoding of the
|
||||
payload when empty.
|
||||
|
||||
Returns:
|
||||
An iterator of (dict, size) pairs, where dict is a subset of the
|
||||
original dict and size is the estimated size of the JSON encoding of
|
||||
that dict, including any wrapping structure.
|
||||
"""
|
||||
|
||||
if not original_dict:
|
||||
return
|
||||
|
||||
# Check if the whole dict fits within the size limit. If it does, we can
|
||||
# skip the splitting logic and just return the original dict.
|
||||
full_size = _len_with_wrapping_object(original_dict, wrapping_object_size)
|
||||
if full_size <= soft_max_size:
|
||||
yield (original_dict, full_size)
|
||||
return
|
||||
|
||||
# The current payload being built up. We keep track of the estimated size of
|
||||
# the JSON encoding of this payload so that we can decide when to start a
|
||||
# new one.
|
||||
current_payload = _DictSplitterState(subset={}, estimated_size=wrapping_object_size)
|
||||
|
||||
for key, payload in original_dict.items():
|
||||
current_payload.subset[key] = payload
|
||||
current_size = _len_with_wrapping_object(
|
||||
current_payload.subset, wrapping_object_size
|
||||
)
|
||||
|
||||
if current_size > soft_max_size:
|
||||
# We've exceeded the size limit, so we need to start a new payload. We pop
|
||||
# the current entry from the payload and yield the previous payload, then
|
||||
# start a new payload with just the current entry.
|
||||
if len(current_payload.subset) > 1:
|
||||
current_payload.subset.pop(key)
|
||||
yield current_payload.subset, current_payload.estimated_size
|
||||
|
||||
current_payload = _DictSplitterState(
|
||||
subset={},
|
||||
estimated_size=wrapping_object_size,
|
||||
)
|
||||
|
||||
# Recalculate the current size with just the current entry.
|
||||
current_size = _len_with_wrapping_object(
|
||||
{key: payload}, wrapping_object_size
|
||||
)
|
||||
|
||||
current_payload.subset[key] = payload
|
||||
current_payload.estimated_size = current_size
|
||||
|
||||
if current_payload.subset:
|
||||
# yield the final payload if it's non-empty
|
||||
yield current_payload.subset, current_payload.estimated_size
|
||||
|
||||
|
||||
def _len_with_wrapping_object(payload: Any, wrapping_object_size: int) -> int:
|
||||
"""Helper function to calculate the size of a payload when encoded as JSON,
|
||||
including any wrapping structure."""
|
||||
return (
|
||||
len(encode_canonical_json(payload))
|
||||
+ wrapping_object_size
|
||||
# account for the curly braces of the dict itself, which are
|
||||
# included in the size of the subset but not in the size of the
|
||||
# payload
|
||||
- 2
|
||||
)
|
||||
|
||||
@@ -856,7 +856,9 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
|
||||
|
||||
# Seed the device_inbox table with our fake messages
|
||||
self.get_success(
|
||||
self.hs.get_datastores().main.add_messages_to_device_inbox(messages, {})
|
||||
self.hs.get_datastores().main.add_local_messages_from_client_to_device_inbox(
|
||||
messages
|
||||
)
|
||||
)
|
||||
|
||||
# Now have local_user send a final to-device message to exclusive_as_user. All unsent
|
||||
|
||||
@@ -18,9 +18,26 @@
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
from synapse.api.constants import EduTypes
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.api.constants import (
|
||||
MAX_EDUS_PER_TRANSACTION,
|
||||
SOFT_MAX_EDU_SIZE,
|
||||
EduTypes,
|
||||
)
|
||||
from synapse.api.errors import Codes
|
||||
from synapse.handlers.devicemessage import (
|
||||
_create_new_to_device_edu,
|
||||
)
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client import login, sendtodevice, sync
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.clock import Clock
|
||||
|
||||
from tests.unittest import HomeserverTestCase, override_config
|
||||
|
||||
@@ -41,6 +58,20 @@ class SendToDeviceTestCase(HomeserverTestCase):
|
||||
sync.register_servlets,
|
||||
]
|
||||
|
||||
def default_config(self) -> JsonDict:
|
||||
config = super().default_config()
|
||||
config["federation_sender_instances"] = None
|
||||
return config
|
||||
|
||||
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
|
||||
self.federation_transport_client = Mock(spec=["send_transaction"])
|
||||
self.federation_transport_client.send_transaction = AsyncMock()
|
||||
hs = self.setup_test_homeserver(
|
||||
federation_transport_client=self.federation_transport_client,
|
||||
)
|
||||
|
||||
return hs
|
||||
|
||||
def test_user_to_user(self) -> None:
|
||||
"""A to-device message from one user to another should get delivered"""
|
||||
|
||||
@@ -91,6 +122,238 @@ class SendToDeviceTestCase(HomeserverTestCase):
|
||||
self.assertEqual(channel.code, 200, channel.result)
|
||||
self.assertEqual(channel.json_body.get("to_device", {}).get("events", []), [])
|
||||
|
||||
def test_large_remote_todevice(self) -> None:
|
||||
"""A to-device message needs to fit in the EDU size limit"""
|
||||
_ = self.register_user("u1", "pass")
|
||||
user1_tok = self.login("u1", "pass", "d1")
|
||||
|
||||
# Create a message that is over the `SOFT_MAX_EDU_SIZE`
|
||||
test_msg = {"foo": "a" * SOFT_MAX_EDU_SIZE}
|
||||
channel = self.make_request(
|
||||
"PUT",
|
||||
"/_matrix/client/r0/sendToDevice/m.test/12345",
|
||||
content={"messages": {"@remote_user:secondserver": {"device": test_msg}}},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 413, channel.result)
|
||||
self.assertEqual(Codes.TOO_LARGE, channel.json_body["errcode"])
|
||||
|
||||
def test_edu_large_messages_splitting(self) -> None:
|
||||
"""
|
||||
Test that a bunch of to-device messages are split over multiple EDUs if they are
|
||||
collectively too large to fit into a single EDU
|
||||
"""
|
||||
mock_send_transaction: AsyncMock = (
|
||||
self.federation_transport_client.send_transaction
|
||||
)
|
||||
mock_send_transaction.return_value = {}
|
||||
|
||||
sender = self.hs.get_federation_sender()
|
||||
|
||||
_ = self.register_user("u1", "pass")
|
||||
user1_tok = self.login("u1", "pass", "d1")
|
||||
destination = "secondserver"
|
||||
messages = {}
|
||||
|
||||
# 2 messages, each just big enough to fit into their own EDU
|
||||
for i in range(2):
|
||||
messages[f"@remote_user{i}:" + destination] = {
|
||||
"device": {"foo": "a" * (SOFT_MAX_EDU_SIZE - 1000)}
|
||||
}
|
||||
|
||||
channel = self.make_request(
|
||||
"PUT",
|
||||
"/_matrix/client/r0/sendToDevice/m.test/1234567",
|
||||
content={"messages": messages},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.result)
|
||||
|
||||
self.get_success(sender.send_device_messages([destination]))
|
||||
|
||||
number_of_edus_sent = 0
|
||||
for call in mock_send_transaction.call_args_list:
|
||||
number_of_edus_sent += len(call[0][1]()["edus"])
|
||||
|
||||
self.assertEqual(number_of_edus_sent, 2)
|
||||
|
||||
def test_edu_large_messages_splitting_one_user(self) -> None:
|
||||
"""
|
||||
Test that a bunch of to-device messages for the same user are split over multiple EDUs if they are
|
||||
collectively too large to fit into a single EDU
|
||||
"""
|
||||
mock_send_transaction: AsyncMock = (
|
||||
self.federation_transport_client.send_transaction
|
||||
)
|
||||
mock_send_transaction.return_value = {}
|
||||
|
||||
sender = self.hs.get_federation_sender()
|
||||
|
||||
_ = self.register_user("u1", "pass")
|
||||
user1_tok = self.login("u1", "pass", "d1")
|
||||
destination = "secondserver"
|
||||
messages = {}
|
||||
|
||||
# 2 messages, each just big enough to fit into their own EDU
|
||||
messages["@remote_user:" + destination] = {
|
||||
"device1": {"foo": "a" * (SOFT_MAX_EDU_SIZE - 1000)},
|
||||
"device2": {"foo": "a" * (SOFT_MAX_EDU_SIZE - 1000)},
|
||||
}
|
||||
|
||||
channel = self.make_request(
|
||||
"PUT",
|
||||
"/_matrix/client/r0/sendToDevice/m.test/12345678",
|
||||
content={"messages": messages},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.result)
|
||||
|
||||
self.get_success(sender.send_device_messages([destination]))
|
||||
|
||||
number_of_edus_sent = 0
|
||||
for call in mock_send_transaction.call_args_list:
|
||||
number_of_edus_sent += len(call[0][1]()["edus"])
|
||||
|
||||
self.assertEqual(number_of_edus_sent, 2)
|
||||
|
||||
def test_edu_large_messages_not_splitting_one_user(self) -> None:
|
||||
"""Test that a couple of to-device messages for the same user that just
|
||||
collectively fit in a single EDU do not get split into multiple EDUs.
|
||||
|
||||
This tests that we don't over-split messages into multiple EDUs when we
|
||||
don't need to.
|
||||
"""
|
||||
|
||||
mock_send_transaction: AsyncMock = (
|
||||
self.federation_transport_client.send_transaction
|
||||
)
|
||||
mock_send_transaction.return_value = {}
|
||||
|
||||
sender = self.hs.get_federation_sender()
|
||||
|
||||
user_id = self.register_user("u1", "pass")
|
||||
user1_tok = self.login("u1", "pass", "d1")
|
||||
destination = "secondserver"
|
||||
remote_user = "@remote_user:" + destination
|
||||
messages = {}
|
||||
|
||||
# We create two messages such that the combined size of the messages is
|
||||
# just under the `SOFT_MAX_EDU_SIZE` limit, so they should be able to
|
||||
# fit in a single EDU together, but not separately.
|
||||
wrapper_size = len(
|
||||
encode_canonical_json(_create_new_to_device_edu(user_id, "m.test", {}))
|
||||
)
|
||||
max_size_of_message = (
|
||||
SOFT_MAX_EDU_SIZE
|
||||
- wrapper_size
|
||||
# Size of the device content wrapper, `{"remote_user": …}`
|
||||
- (len(remote_user) + 5)
|
||||
)
|
||||
messages[remote_user] = {
|
||||
# The constants are the size of each individual message
|
||||
"d1": {"a": "a" * (max_size_of_message - 13 - 16)},
|
||||
"d2": {"b": "b"},
|
||||
}
|
||||
|
||||
# The full EDU size should now be just shy of the `SOFT_MAX_EDU_SIZE` limit
|
||||
full_edu = encode_canonical_json(
|
||||
_create_new_to_device_edu(user_id, "m.test", messages)
|
||||
)
|
||||
self.assertEqual(len(full_edu), SOFT_MAX_EDU_SIZE - 1)
|
||||
|
||||
# Now send the messages, and check they are sent in a single EDU.
|
||||
channel = self.make_request(
|
||||
"PUT",
|
||||
"/_matrix/client/r0/sendToDevice/m.test/12345678",
|
||||
content={"messages": messages},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.result)
|
||||
|
||||
self.get_success(sender.send_device_messages([destination]))
|
||||
|
||||
number_of_edus_sent = 0
|
||||
for call in mock_send_transaction.call_args_list:
|
||||
number_of_edus_sent += len(call[0][1]()["edus"])
|
||||
|
||||
self.assertEqual(number_of_edus_sent, 1)
|
||||
|
||||
def test_edu_small_messages_not_splitting(self) -> None:
|
||||
"""
|
||||
Test that a couple of small messages do not get split into multiple EDUs
|
||||
"""
|
||||
mock_send_transaction: AsyncMock = (
|
||||
self.federation_transport_client.send_transaction
|
||||
)
|
||||
mock_send_transaction.return_value = {}
|
||||
|
||||
sender = self.hs.get_federation_sender()
|
||||
|
||||
_ = self.register_user("u1", "pass")
|
||||
user1_tok = self.login("u1", "pass", "d1")
|
||||
destination = "secondserver"
|
||||
messages = {}
|
||||
|
||||
# 2 small messages that should fit in a single EDU
|
||||
for i in range(2):
|
||||
messages[f"@remote_user{i}:" + destination] = {"device": {"foo": "a" * 100}}
|
||||
|
||||
channel = self.make_request(
|
||||
"PUT",
|
||||
"/_matrix/client/r0/sendToDevice/m.test/123456",
|
||||
content={"messages": messages},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.result)
|
||||
|
||||
self.get_success(sender.send_device_messages([destination]))
|
||||
|
||||
number_of_edus_sent = 0
|
||||
for call in mock_send_transaction.call_args_list:
|
||||
number_of_edus_sent += len(call[0][1]()["edus"])
|
||||
|
||||
self.assertEqual(number_of_edus_sent, 1)
|
||||
|
||||
def test_transaction_splitting(self) -> None:
|
||||
"""Test that a bunch of to-device messages are split into multiple transactions if there are too many EDUs"""
|
||||
mock_send_transaction: AsyncMock = (
|
||||
self.federation_transport_client.send_transaction
|
||||
)
|
||||
mock_send_transaction.return_value = {}
|
||||
|
||||
sender = self.hs.get_federation_sender()
|
||||
|
||||
_ = self.register_user("u1", "pass")
|
||||
user1_tok = self.login("u1", "pass", "d1")
|
||||
destination = "secondserver"
|
||||
messages = {}
|
||||
|
||||
number_of_edus_to_send = MAX_EDUS_PER_TRANSACTION + 1
|
||||
|
||||
for i in range(number_of_edus_to_send):
|
||||
messages[f"@remote_user{i}:" + destination] = {
|
||||
"device": {"foo": "a" * (SOFT_MAX_EDU_SIZE - 1000)}
|
||||
}
|
||||
|
||||
channel = self.make_request(
|
||||
"PUT",
|
||||
"/_matrix/client/r0/sendToDevice/m.test/12345678",
|
||||
content={"messages": messages},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.result)
|
||||
|
||||
self.get_success(sender.send_device_messages([destination]))
|
||||
|
||||
# At least 2 transactions should be sent since we are over the EDU limit per transaction
|
||||
self.assertGreaterEqual(mock_send_transaction.call_count, 2)
|
||||
|
||||
number_of_edus_sent = 0
|
||||
for call in mock_send_transaction.call_args_list:
|
||||
number_of_edus_sent += len(call[0][1]()["edus"])
|
||||
|
||||
self.assertEqual(number_of_edus_sent, number_of_edus_to_send)
|
||||
|
||||
@override_config({"rc_key_requests": {"per_second": 10, "burst_count": 2}})
|
||||
def test_local_room_key_request(self) -> None:
|
||||
"""m.room_key_request has special-casing; test from local user"""
|
||||
|
||||
@@ -0,0 +1,201 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2026 Element Creations Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
from synapse.util import split_dict_to_fit_to_size
|
||||
|
||||
from tests.unittest import TestCase
|
||||
|
||||
|
||||
class SplitDictTestCase(TestCase):
|
||||
def test_empty(self) -> None:
|
||||
"Test that an empty dict yields no payloads"
|
||||
|
||||
self.assertEqual(
|
||||
list(
|
||||
split_dict_to_fit_to_size({}, soft_max_size=10, wrapping_object_size=0)
|
||||
),
|
||||
[],
|
||||
)
|
||||
|
||||
def test_no_splitting(self) -> None:
|
||||
"Test that a dict that fits within the size limit is yielded as a single payload"
|
||||
|
||||
original_dict = {"a": {"key": "value"}, "b": {"key": "value"}}
|
||||
|
||||
# Set the soft max size to be the size of the original dict, so it
|
||||
# should fit
|
||||
soft_max_size = len(encode_canonical_json(original_dict))
|
||||
|
||||
self.assertEqual(
|
||||
list(
|
||||
split_dict_to_fit_to_size(
|
||||
original_dict,
|
||||
soft_max_size=soft_max_size,
|
||||
)
|
||||
),
|
||||
[(original_dict, soft_max_size)],
|
||||
)
|
||||
|
||||
def test_no_splitting_with_wrapping_size(self) -> None:
|
||||
"Test that the wrapping size is taken into account when deciding whether to split"
|
||||
|
||||
wrapping = {"key": "value", "payload": {}}
|
||||
original_dict = {"a": {"key": "value"}, "b": {"key": "value"}}
|
||||
wrapping_object_size = len(encode_canonical_json(wrapping))
|
||||
|
||||
# Set the soft max size to the size of the expected final output.
|
||||
soft_max_size = len(
|
||||
encode_canonical_json({"key": "value", "payload": original_dict})
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
list(
|
||||
split_dict_to_fit_to_size(
|
||||
original_dict,
|
||||
soft_max_size=soft_max_size,
|
||||
wrapping_object_size=wrapping_object_size,
|
||||
)
|
||||
),
|
||||
[(original_dict, soft_max_size)],
|
||||
)
|
||||
|
||||
def test_splitting(self) -> None:
|
||||
"Test that a dict that exceeds the size limit is split into multiple payloads"
|
||||
|
||||
original_dict = {
|
||||
"a": {"key": "value"},
|
||||
"b": {"key": "value"},
|
||||
"c": {"key": "value"},
|
||||
}
|
||||
|
||||
# Set the soft max size to be the size of a single key-value pair, so
|
||||
# it should split into three payloads.
|
||||
soft_max_size = len(encode_canonical_json({"a": {"key": "value"}}))
|
||||
|
||||
self.assertEqual(
|
||||
list(
|
||||
split_dict_to_fit_to_size(
|
||||
original_dict,
|
||||
soft_max_size=soft_max_size,
|
||||
)
|
||||
),
|
||||
[
|
||||
({"a": {"key": "value"}}, soft_max_size),
|
||||
({"b": {"key": "value"}}, soft_max_size),
|
||||
({"c": {"key": "value"}}, soft_max_size),
|
||||
],
|
||||
)
|
||||
|
||||
def test_splitting_with_wrapping_size(self) -> None:
|
||||
"Test that the wrapping size is taken into account when splitting"
|
||||
|
||||
wrapping = {"key": "value", "payload": {}}
|
||||
original_dict = {
|
||||
"a": {"key": "value"},
|
||||
"b": {"key": "value"},
|
||||
"c": {"key": "value"},
|
||||
}
|
||||
wrapping_object_size = len(encode_canonical_json(wrapping))
|
||||
# Set the soft max size to be the size of a single key-value pair plus
|
||||
# the wrapping size, so it should split into three payloads.
|
||||
soft_max_size = (
|
||||
len(encode_canonical_json({"a": {"key": "value"}}))
|
||||
+ wrapping_object_size
|
||||
- 2
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
list(
|
||||
split_dict_to_fit_to_size(
|
||||
original_dict,
|
||||
soft_max_size=soft_max_size,
|
||||
wrapping_object_size=wrapping_object_size,
|
||||
)
|
||||
),
|
||||
[
|
||||
({"a": {"key": "value"}}, soft_max_size),
|
||||
({"b": {"key": "value"}}, soft_max_size),
|
||||
({"c": {"key": "value"}}, soft_max_size),
|
||||
],
|
||||
)
|
||||
|
||||
def test_oversized_entry(self) -> None:
|
||||
"""Test that if a single entry exceeds the size limit, it is still
|
||||
yielded as a single payload"""
|
||||
|
||||
original_dict = {
|
||||
"a": {"key": "value"},
|
||||
"b": {"key": "value"},
|
||||
"c": {"key": "value"},
|
||||
}
|
||||
|
||||
# Set the soft max size to be smaller than the size of a single
|
||||
# key-value pair, so each entry exceeds the limit.
|
||||
soft_max_size = len(encode_canonical_json({"a": {"key": "value"}})) - 1
|
||||
|
||||
self.assertEqual(
|
||||
list(
|
||||
split_dict_to_fit_to_size(
|
||||
original_dict,
|
||||
soft_max_size=soft_max_size,
|
||||
)
|
||||
),
|
||||
[
|
||||
(
|
||||
{"a": {"key": "value"}},
|
||||
len(encode_canonical_json({"a": {"key": "value"}})),
|
||||
),
|
||||
(
|
||||
{"b": {"key": "value"}},
|
||||
len(encode_canonical_json({"b": {"key": "value"}})),
|
||||
),
|
||||
(
|
||||
{"c": {"key": "value"}},
|
||||
len(encode_canonical_json({"c": {"key": "value"}})),
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
def test_different_sized_entries(self) -> None:
|
||||
"""Test that entries of different sizes are split correctly"""
|
||||
|
||||
original_dict = {
|
||||
"a": "X" * 5, # size 13
|
||||
"b": "X" * 10, # size 18
|
||||
"c": "X" * 5, # size 13
|
||||
}
|
||||
|
||||
soft_max_size = 30
|
||||
|
||||
self.assertEqual(
|
||||
list(
|
||||
split_dict_to_fit_to_size(
|
||||
original_dict,
|
||||
soft_max_size=soft_max_size,
|
||||
)
|
||||
),
|
||||
[
|
||||
(
|
||||
{"a": "X" * 5, "b": "X" * 10},
|
||||
len(encode_canonical_json({"a": "X" * 5, "b": "X" * 10})),
|
||||
),
|
||||
(
|
||||
{"c": "X" * 5},
|
||||
len(encode_canonical_json({"c": "X" * 5})),
|
||||
),
|
||||
],
|
||||
)
|
||||
Reference in New Issue
Block a user