Add ReplicationProfileSetFieldValue endpoint

If we get PUT for profile fields on an instance that isn't the profile updates stream writer, we'll route the request to be finished on the correct stream writer via http replication.
This commit is contained in:
Jason Robinson
2026-05-28 22:10:06 +03:00
parent 6ec8ee2a5e
commit df1b587332
5 changed files with 173 additions and 16 deletions
+9
View File
@@ -584,6 +584,15 @@ configured as stream writer for the `quarantined_media_changes` stream:
^/_synapse/admin/v1/quarantine_media/.*$
#### The `profile_updates` stream
The `profile_updates` stream supports multiple writers. The following endpoints
can be handled by any worker, but PUT and DELETE should be routed directly to one of the
workers configured as stream writer for the `profile_updates` stream:
^/_matrix/client/(api/v1|r0|v3|unstable)/profile/.*/[^/]+$
#### Restrict outbound federation traffic to a specific set of workers
The
+36
View File
@@ -599,6 +599,42 @@ class ProfileHandler:
return result.get(field_name)
async def set_field(
self,
*,
target_user: UserID,
requester: Requester,
field_name: str,
new_value: str,
by_admin: bool = False,
propagate: bool = False,
) -> None:
"""Wrapper function for setting any profile field for a user."""
if field_name == ProfileFields.DISPLAYNAME:
await self.set_displayname(
target_user=target_user,
requester=requester,
new_displayname=new_value,
by_admin=by_admin,
propagate=propagate,
)
elif field_name == ProfileFields.AVATAR_URL:
await self.set_avatar_url(
target_user=target_user,
requester=requester,
new_avatar_url=new_value,
by_admin=by_admin,
propagate=propagate,
)
else:
await self.set_profile_field(
target_user=target_user,
requester=requester,
field_name=field_name,
new_value=new_value,
by_admin=by_admin,
)
async def set_profile_field(
self,
target_user: UserID,
+2
View File
@@ -30,6 +30,7 @@ from synapse.replication.http import (
login,
membership,
presence,
profile,
push,
register,
send_events,
@@ -59,6 +60,7 @@ class ReplicationRestResource(JsonResource):
push.register_servlets(hs, self)
state.register_servlets(hs, self)
devices.register_servlets(hs, self)
profile.register_servlets(hs, self)
# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:
+101
View File
@@ -0,0 +1,101 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2026 Element Creations, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import logging
from typing import TYPE_CHECKING
from twisted.web.server import Request
from synapse.http.server import HttpServer
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict, UserID, create_requester
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class ReplicationProfileSetFieldValue(ReplicationEndpoint):
"""Set profile field for a user.
The POST looks like:
POST /_synapse/replication/profile_set_field_value/<user_id>
{
"requester_id": "@user:domain.tld",
"field_name": "displayname",
"new_value": "User Display Name",
"by_admin": False,
"propagate": False,
}
200 OK
{}
"""
NAME = "profile_set_field_value"
PATH_ARGS = ("user_id",)
METHOD = "POST"
CACHE = False
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self._profile_handler = hs.get_profile_handler()
@staticmethod
async def _serialize_payload( # type: ignore[override]
requester_id: str,
field_name: str,
new_value: str | None,
by_admin: bool = False,
propagate: bool = False,
authenticated_entity: str | None = None,
) -> JsonDict:
return {
"requester_id": requester_id,
"field_name": field_name,
"new_value": new_value,
"by_admin": by_admin,
"propagate": propagate,
"authenticated_entity": authenticated_entity,
}
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, user_id: str
) -> tuple[int, JsonDict]:
# Create a requester object with potentially an authenticated_entity,
# ie an admin who has done the request on behalf of the user.
requester = create_requester(
user_id=user_id,
authenticated_entity=content["authenticated_entity"] if content["by_admin"] else None,
)
await self._profile_handler.set_field(
target_user=UserID.from_string(user_id),
requester=requester,
field_name=content["field_name"],
new_value=content["new_value"],
by_admin=content["by_admin"],
propagate=content["propagate"],
)
return (200, {})
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationProfileSetFieldValue(hs).register(http_server)
+25 -16
View File
@@ -35,6 +35,7 @@ from synapse.http.servlet import (
parse_json_object_from_request,
)
from synapse.http.site import SynapseRequest
from synapse.replication.http.profile import ReplicationProfileSetFieldValue
from synapse.rest.client._base import client_patterns
from synapse.types import JsonDict, JsonValue, UserID
from synapse.util.stringutils import is_namedspaced_grammar
@@ -160,13 +161,6 @@ class ProfileFieldRestServlet(RestServlet):
async def on_PUT(
self, request: SynapseRequest, user_id: str, field_name: str
) -> tuple[int, JsonDict]:
if not self._is_profile_worker:
raise SynapseError(
HTTPStatus.METHOD_NOT_ALLOWED,
"Can only handle PUT /profile on instances configured to handle the profile_updates stream writer",
Codes.UNRECOGNIZED,
)
if not UserID.is_valid(user_id):
raise SynapseError(
HTTPStatus.BAD_REQUEST, "Invalid user id", Codes.INVALID_PARAM
@@ -214,17 +208,32 @@ class ProfileFieldRestServlet(RestServlet):
Codes.USER_ACCOUNT_SUSPENDED,
)
if field_name == ProfileFields.DISPLAYNAME:
await self.profile_handler.set_displayname(
user, requester, new_value, by_admin=is_admin, propagate=propagate
)
elif field_name == ProfileFields.AVATAR_URL:
await self.profile_handler.set_avatar_url(
user, requester, new_value, by_admin=is_admin, propagate=propagate
if self._is_profile_worker:
await self.profile_handler.set_field(
target_user=user,
requester=requester,
field_name=field_name,
new_value=new_value,
by_admin=is_admin,
propagate=propagate,
)
else:
await self.profile_handler.set_profile_field(
user, requester, field_name, new_value, by_admin=is_admin
# Offload to the right worker via http replication
set_profile_data_client = ReplicationProfileSetFieldValue.make_client(
self.hs
)
profile_updates_writer_instance = (
self.hs.config.worker.writers.profile_updates[0]
)
await set_profile_data_client(
instance_name=profile_updates_writer_instance,
user_id=user.to_string(),
requester_id=requester.user.to_string(),
field_name=field_name,
new_value=new_value,
by_admin=is_admin,
propagate=propagate,
authenticated_entity=requester.authenticated_entity,
)
return 200, {}