diff --git a/docs/workers.md b/docs/workers.md index 8d3aad19c6..e37592c9a6 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -584,6 +584,15 @@ configured as stream writer for the `quarantined_media_changes` stream: ^/_synapse/admin/v1/quarantine_media/.*$ +#### The `profile_updates` stream + +The `profile_updates` stream supports multiple writers. The following endpoints +can be handled by any worker, but PUT and DELETE should be routed directly to one of the +workers configured as stream writer for the `profile_updates` stream: + + ^/_matrix/client/(api/v1|r0|v3|unstable)/profile/.*/[^/]+$ + + #### Restrict outbound federation traffic to a specific set of workers The diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 5a052baec2..ce044aa480 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -599,6 +599,42 @@ class ProfileHandler: return result.get(field_name) + async def set_field( + self, + *, + target_user: UserID, + requester: Requester, + field_name: str, + new_value: str, + by_admin: bool = False, + propagate: bool = False, + ) -> None: + """Wrapper function for setting any profile field for a user.""" + if field_name == ProfileFields.DISPLAYNAME: + await self.set_displayname( + target_user=target_user, + requester=requester, + new_displayname=new_value, + by_admin=by_admin, + propagate=propagate, + ) + elif field_name == ProfileFields.AVATAR_URL: + await self.set_avatar_url( + target_user=target_user, + requester=requester, + new_avatar_url=new_value, + by_admin=by_admin, + propagate=propagate, + ) + else: + await self.set_profile_field( + target_user=target_user, + requester=requester, + field_name=field_name, + new_value=new_value, + by_admin=by_admin, + ) + async def set_profile_field( self, target_user: UserID, diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 68cc6ce1fc..d934ef8067 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -30,6 +30,7 @@ from synapse.replication.http import ( login, membership, presence, + profile, push, register, send_events, @@ -59,6 +60,7 @@ class ReplicationRestResource(JsonResource): push.register_servlets(hs, self) state.register_servlets(hs, self) devices.register_servlets(hs, self) + profile.register_servlets(hs, self) # The following can't currently be instantiated on workers. if hs.config.worker.worker_app is None: diff --git a/synapse/replication/http/profile.py b/synapse/replication/http/profile.py new file mode 100644 index 0000000000..ccefc4ed4a --- /dev/null +++ b/synapse/replication/http/profile.py @@ -0,0 +1,101 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2026 Element Creations, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# + + +import logging +from typing import TYPE_CHECKING + +from twisted.web.server import Request + +from synapse.http.server import HttpServer +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import JsonDict, UserID, create_requester + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class ReplicationProfileSetFieldValue(ReplicationEndpoint): + """Set profile field for a user. + + The POST looks like: + + POST /_synapse/replication/profile_set_field_value/ + + { + "requester_id": "@user:domain.tld", + "field_name": "displayname", + "new_value": "User Display Name", + "by_admin": False, + "propagate": False, + } + + 200 OK + + {} + """ + + NAME = "profile_set_field_value" + PATH_ARGS = ("user_id",) + METHOD = "POST" + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._profile_handler = hs.get_profile_handler() + + @staticmethod + async def _serialize_payload( # type: ignore[override] + requester_id: str, + field_name: str, + new_value: str | None, + by_admin: bool = False, + propagate: bool = False, + authenticated_entity: str | None = None, + ) -> JsonDict: + return { + "requester_id": requester_id, + "field_name": field_name, + "new_value": new_value, + "by_admin": by_admin, + "propagate": propagate, + "authenticated_entity": authenticated_entity, + } + + async def _handle_request( # type: ignore[override] + self, request: Request, content: JsonDict, user_id: str + ) -> tuple[int, JsonDict]: + # Create a requester object with potentially an authenticated_entity, + # ie an admin who has done the request on behalf of the user. + requester = create_requester( + user_id=user_id, + authenticated_entity=content["authenticated_entity"] if content["by_admin"] else None, + ) + await self._profile_handler.set_field( + target_user=UserID.from_string(user_id), + requester=requester, + field_name=content["field_name"], + new_value=content["new_value"], + by_admin=content["by_admin"], + propagate=content["propagate"], + ) + + return (200, {}) + + +def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: + ReplicationProfileSetFieldValue(hs).register(http_server) diff --git a/synapse/rest/client/profile.py b/synapse/rest/client/profile.py index 3ecba9659f..e7b2e4bcda 100644 --- a/synapse/rest/client/profile.py +++ b/synapse/rest/client/profile.py @@ -35,6 +35,7 @@ from synapse.http.servlet import ( parse_json_object_from_request, ) from synapse.http.site import SynapseRequest +from synapse.replication.http.profile import ReplicationProfileSetFieldValue from synapse.rest.client._base import client_patterns from synapse.types import JsonDict, JsonValue, UserID from synapse.util.stringutils import is_namedspaced_grammar @@ -160,13 +161,6 @@ class ProfileFieldRestServlet(RestServlet): async def on_PUT( self, request: SynapseRequest, user_id: str, field_name: str ) -> tuple[int, JsonDict]: - if not self._is_profile_worker: - raise SynapseError( - HTTPStatus.METHOD_NOT_ALLOWED, - "Can only handle PUT /profile on instances configured to handle the profile_updates stream writer", - Codes.UNRECOGNIZED, - ) - if not UserID.is_valid(user_id): raise SynapseError( HTTPStatus.BAD_REQUEST, "Invalid user id", Codes.INVALID_PARAM @@ -214,17 +208,32 @@ class ProfileFieldRestServlet(RestServlet): Codes.USER_ACCOUNT_SUSPENDED, ) - if field_name == ProfileFields.DISPLAYNAME: - await self.profile_handler.set_displayname( - user, requester, new_value, by_admin=is_admin, propagate=propagate - ) - elif field_name == ProfileFields.AVATAR_URL: - await self.profile_handler.set_avatar_url( - user, requester, new_value, by_admin=is_admin, propagate=propagate + if self._is_profile_worker: + await self.profile_handler.set_field( + target_user=user, + requester=requester, + field_name=field_name, + new_value=new_value, + by_admin=is_admin, + propagate=propagate, ) else: - await self.profile_handler.set_profile_field( - user, requester, field_name, new_value, by_admin=is_admin + # Offload to the right worker via http replication + set_profile_data_client = ReplicationProfileSetFieldValue.make_client( + self.hs + ) + profile_updates_writer_instance = ( + self.hs.config.worker.writers.profile_updates[0] + ) + await set_profile_data_client( + instance_name=profile_updates_writer_instance, + user_id=user.to_string(), + requester_id=requester.user.to_string(), + field_name=field_name, + new_value=new_value, + by_admin=is_admin, + propagate=propagate, + authenticated_entity=requester.authenticated_entity, ) return 200, {}