From df1b58733241780c0ef3c5aee114f2332fe785fc Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Thu, 28 May 2026 22:10:06 +0300 Subject: [PATCH] Add `ReplicationProfileSetFieldValue` endpoint If we get PUT for profile fields on an instance that isn't the profile updates stream writer, we'll route the request to be finished on the correct stream writer via http replication. --- docs/workers.md | 9 +++ synapse/handlers/profile.py | 36 ++++++++++ synapse/replication/http/__init__.py | 2 + synapse/replication/http/profile.py | 101 +++++++++++++++++++++++++++ synapse/rest/client/profile.py | 41 ++++++----- 5 files changed, 173 insertions(+), 16 deletions(-) create mode 100644 synapse/replication/http/profile.py diff --git a/docs/workers.md b/docs/workers.md index 8d3aad19c6..e37592c9a6 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -584,6 +584,15 @@ configured as stream writer for the `quarantined_media_changes` stream: ^/_synapse/admin/v1/quarantine_media/.*$ +#### The `profile_updates` stream + +The `profile_updates` stream supports multiple writers. The following endpoints +can be handled by any worker, but PUT and DELETE should be routed directly to one of the +workers configured as stream writer for the `profile_updates` stream: + + ^/_matrix/client/(api/v1|r0|v3|unstable)/profile/.*/[^/]+$ + + #### Restrict outbound federation traffic to a specific set of workers The diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 5a052baec2..ce044aa480 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -599,6 +599,42 @@ class ProfileHandler: return result.get(field_name) + async def set_field( + self, + *, + target_user: UserID, + requester: Requester, + field_name: str, + new_value: str, + by_admin: bool = False, + propagate: bool = False, + ) -> None: + """Wrapper function for setting any profile field for a user.""" + if field_name == ProfileFields.DISPLAYNAME: + await self.set_displayname( + target_user=target_user, + requester=requester, + new_displayname=new_value, + by_admin=by_admin, + propagate=propagate, + ) + elif field_name == ProfileFields.AVATAR_URL: + await self.set_avatar_url( + target_user=target_user, + requester=requester, + new_avatar_url=new_value, + by_admin=by_admin, + propagate=propagate, + ) + else: + await self.set_profile_field( + target_user=target_user, + requester=requester, + field_name=field_name, + new_value=new_value, + by_admin=by_admin, + ) + async def set_profile_field( self, target_user: UserID, diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 68cc6ce1fc..d934ef8067 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -30,6 +30,7 @@ from synapse.replication.http import ( login, membership, presence, + profile, push, register, send_events, @@ -59,6 +60,7 @@ class ReplicationRestResource(JsonResource): push.register_servlets(hs, self) state.register_servlets(hs, self) devices.register_servlets(hs, self) + profile.register_servlets(hs, self) # The following can't currently be instantiated on workers. if hs.config.worker.worker_app is None: diff --git a/synapse/replication/http/profile.py b/synapse/replication/http/profile.py new file mode 100644 index 0000000000..ccefc4ed4a --- /dev/null +++ b/synapse/replication/http/profile.py @@ -0,0 +1,101 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2026 Element Creations, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# + + +import logging +from typing import TYPE_CHECKING + +from twisted.web.server import Request + +from synapse.http.server import HttpServer +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import JsonDict, UserID, create_requester + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class ReplicationProfileSetFieldValue(ReplicationEndpoint): + """Set profile field for a user. + + The POST looks like: + + POST /_synapse/replication/profile_set_field_value/ + + { + "requester_id": "@user:domain.tld", + "field_name": "displayname", + "new_value": "User Display Name", + "by_admin": False, + "propagate": False, + } + + 200 OK + + {} + """ + + NAME = "profile_set_field_value" + PATH_ARGS = ("user_id",) + METHOD = "POST" + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._profile_handler = hs.get_profile_handler() + + @staticmethod + async def _serialize_payload( # type: ignore[override] + requester_id: str, + field_name: str, + new_value: str | None, + by_admin: bool = False, + propagate: bool = False, + authenticated_entity: str | None = None, + ) -> JsonDict: + return { + "requester_id": requester_id, + "field_name": field_name, + "new_value": new_value, + "by_admin": by_admin, + "propagate": propagate, + "authenticated_entity": authenticated_entity, + } + + async def _handle_request( # type: ignore[override] + self, request: Request, content: JsonDict, user_id: str + ) -> tuple[int, JsonDict]: + # Create a requester object with potentially an authenticated_entity, + # ie an admin who has done the request on behalf of the user. + requester = create_requester( + user_id=user_id, + authenticated_entity=content["authenticated_entity"] if content["by_admin"] else None, + ) + await self._profile_handler.set_field( + target_user=UserID.from_string(user_id), + requester=requester, + field_name=content["field_name"], + new_value=content["new_value"], + by_admin=content["by_admin"], + propagate=content["propagate"], + ) + + return (200, {}) + + +def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: + ReplicationProfileSetFieldValue(hs).register(http_server) diff --git a/synapse/rest/client/profile.py b/synapse/rest/client/profile.py index 3ecba9659f..e7b2e4bcda 100644 --- a/synapse/rest/client/profile.py +++ b/synapse/rest/client/profile.py @@ -35,6 +35,7 @@ from synapse.http.servlet import ( parse_json_object_from_request, ) from synapse.http.site import SynapseRequest +from synapse.replication.http.profile import ReplicationProfileSetFieldValue from synapse.rest.client._base import client_patterns from synapse.types import JsonDict, JsonValue, UserID from synapse.util.stringutils import is_namedspaced_grammar @@ -160,13 +161,6 @@ class ProfileFieldRestServlet(RestServlet): async def on_PUT( self, request: SynapseRequest, user_id: str, field_name: str ) -> tuple[int, JsonDict]: - if not self._is_profile_worker: - raise SynapseError( - HTTPStatus.METHOD_NOT_ALLOWED, - "Can only handle PUT /profile on instances configured to handle the profile_updates stream writer", - Codes.UNRECOGNIZED, - ) - if not UserID.is_valid(user_id): raise SynapseError( HTTPStatus.BAD_REQUEST, "Invalid user id", Codes.INVALID_PARAM @@ -214,17 +208,32 @@ class ProfileFieldRestServlet(RestServlet): Codes.USER_ACCOUNT_SUSPENDED, ) - if field_name == ProfileFields.DISPLAYNAME: - await self.profile_handler.set_displayname( - user, requester, new_value, by_admin=is_admin, propagate=propagate - ) - elif field_name == ProfileFields.AVATAR_URL: - await self.profile_handler.set_avatar_url( - user, requester, new_value, by_admin=is_admin, propagate=propagate + if self._is_profile_worker: + await self.profile_handler.set_field( + target_user=user, + requester=requester, + field_name=field_name, + new_value=new_value, + by_admin=is_admin, + propagate=propagate, ) else: - await self.profile_handler.set_profile_field( - user, requester, field_name, new_value, by_admin=is_admin + # Offload to the right worker via http replication + set_profile_data_client = ReplicationProfileSetFieldValue.make_client( + self.hs + ) + profile_updates_writer_instance = ( + self.hs.config.worker.writers.profile_updates[0] + ) + await set_profile_data_client( + instance_name=profile_updates_writer_instance, + user_id=user.to_string(), + requester_id=requester.user.to_string(), + field_name=field_name, + new_value=new_value, + by_admin=is_admin, + propagate=propagate, + authenticated_entity=requester.authenticated_entity, ) return 200, {}