From d9adfd85d340b11e90b600bd3fb51f56447c0463 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 13 Mar 2026 16:52:46 -0400 Subject: [PATCH] Allow fetching latest changes via /sync and filtering Based on the provided since token and filter. --- synapse/api/filtering.py | 25 +++++++ synapse/handlers/sync.py | 128 ++++++++++++++++++++++++++++++++++++ synapse/rest/client/sync.py | 7 ++ 3 files changed, 160 insertions(+) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 9b47c20437..bb46ead53c 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -123,6 +123,13 @@ USER_FILTER_SCHEMA = { "filter": FILTER_SCHEMA, "room_filter": ROOM_FILTER_SCHEMA, "room_event_filter": ROOM_EVENT_FILTER_SCHEMA, + "profile_fields_filter": { + "type": "object", + "properties": { + "ids": {"type": "array", "items": {"type": "string"}}, + }, + "additionalProperties": True, + }, }, "properties": { "presence": {"$ref": "#/definitions/filter"}, @@ -130,6 +137,10 @@ USER_FILTER_SCHEMA = { "room": {"$ref": "#/definitions/room_filter"}, "event_format": {"type": "string", "enum": ["client", "federation"]}, "event_fields": {"type": "array", "items": {"type": "string"}}, + "profile_fields": {"$ref": "#/definitions/profile_fields_filter"}, + "org.matrix.msc4429.profile_fields": { + "$ref": "#/definitions/profile_fields_filter" + }, }, "additionalProperties": True, # Allow new fields for forward compatibility } @@ -217,6 +228,20 @@ class FilterCollection: self.event_fields = filter_json.get("event_fields", []) self.event_format = filter_json.get("event_format", "client") + self.profile_fields: list[str] = [] + if hs.config.experimental.msc4429_enabled: + profile_fields_filter = filter_json.get("profile_fields") + if profile_fields_filter is None: + profile_fields_filter = filter_json.get( + "org.matrix.msc4429.profile_fields" + ) + + if isinstance(profile_fields_filter, Mapping): + ids = profile_fields_filter.get("ids", []) + if ids is None: + ids = [] + self.profile_fields = list(ids) + def __repr__(self) -> str: return "" % (json.dumps(self._filter_json),) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c8ef5e2aa6..b2c24e26ca 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -37,6 +37,7 @@ from synapse.api.constants import ( EventContentFields, EventTypes, Membership, + ProfileFields, StickyEvent, ) from synapse.api.filtering import FilterCollection @@ -63,6 +64,7 @@ from synapse.types import ( DeviceListUpdates, JsonDict, JsonMapping, + JsonValue, MultiWriterStreamToken, MutableStateMap, Requester, @@ -223,6 +225,7 @@ class SyncResult: next_batch: Token for the next sync presence: List of presence events for the user. account_data: List of account_data events for the user. + profile_updates: Map of user_id to profile field updates for that user. joined: JoinedSyncResult for each joined room. invited: InvitedSyncResult for each invited room. knocked: KnockedSyncResult for each knocked on room. @@ -238,6 +241,7 @@ class SyncResult: next_batch: StreamToken presence: list[UserPresenceState] account_data: list[JsonDict] + profile_updates: dict[str, dict[str, JsonValue | None]] joined: list[JoinedSyncResult] invited: list[InvitedSyncResult] knocked: list[KnockedSyncResult] @@ -259,6 +263,7 @@ class SyncResult: or self.knocked or self.archived or self.account_data + or self.profile_updates or self.to_device or self.device_lists ) @@ -274,6 +279,7 @@ class SyncResult: next_batch=next_batch, presence=[], account_data=[], + profile_updates={}, joined=[], invited=[], knocked=[], @@ -290,6 +296,7 @@ class SyncHandler: self.server_name = hs.hostname self.hs_config = hs.config self.store = hs.get_datastores().main + self._is_mine_id = hs.is_mine_id self.notifier = hs.get_notifier() self.presence_handler = hs.get_presence_handler() self._relations_handler = hs.get_relations_handler() @@ -1733,6 +1740,9 @@ class SyncHandler: if not sync_config.filter_collection.blocks_all_global_account_data(): await self._generate_sync_entry_for_account_data(sync_result_builder) + if self.hs_config.experimental.msc4429_enabled: + await self._generate_sync_entry_for_profile_updates(sync_result_builder) + # Presence data is included if the server has it enabled and not filtered out. include_presence_data = bool( self.hs_config.server.presence_enabled @@ -1832,6 +1842,7 @@ class SyncHandler: return SyncResult( presence=sync_result_builder.presence, account_data=sync_result_builder.account_data, + profile_updates=sync_result_builder.profile_updates, joined=sync_result_builder.joined, invited=sync_result_builder.invited, knocked=sync_result_builder.knocked, @@ -2096,6 +2107,121 @@ class SyncHandler: sync_result_builder.account_data = account_data_for_user + async def _generate_initial_sync_entry_for_profile_updates( + self, + user_id: str, + sync_result_builder: "SyncResultBuilder", + profile_fields: list[str], + ) -> None: + user_ids = await self.store.get_users_who_share_room_with_user(user_id) + user_ids = {u for u in user_ids if self._is_mine_id(u)} + if not user_ids: + return + + profile_data_by_user = await self.store.get_profile_data_for_users(user_ids) + + all_updates: dict[str, dict[str, JsonValue | None]] = {} + for other_user_id in user_ids: + displayname = None + avatar_url = None + custom_fields: JsonDict = {} + + profile_data = profile_data_by_user.get(other_user_id) + if profile_data is not None: + displayname, avatar_url, custom_fields = profile_data + + per_user_updates: dict[str, JsonValue | None] = {} + for field_name in profile_fields: + if field_name == ProfileFields.DISPLAYNAME: + per_user_updates[field_name] = displayname + elif field_name == ProfileFields.AVATAR_URL: + per_user_updates[field_name] = avatar_url + else: + per_user_updates[field_name] = custom_fields.get(field_name) + + all_updates[other_user_id] = per_user_updates + + sync_result_builder.profile_updates = all_updates + return + + async def _generate_sync_entry_for_profile_updates( + self, sync_result_builder: "SyncResultBuilder" + ) -> None: + """Generates the profile update portion of the sync response.""" + sync_config = sync_result_builder.sync_config + profile_fields = sync_config.filter_collection.profile_fields + if not profile_fields: + return + + user_id = sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + + if since_token is None: + # TODO: Refactor this into a separate function. + await self._generate_initial_sync_entry_for_profile_updates( + user_id, sync_result_builder, profile_fields + ) + return + + if since_token.profile_updates_key == now_token.profile_updates_key: + return + + updates = await self.store.get_profile_updates_for_fields( + from_id=since_token.profile_updates_key, + to_id=now_token.profile_updates_key, + field_names=profile_fields, + ) + if not updates: + return + + updated_user_ids = {update.user_id for update in updates} + if not updated_user_ids: + return + + shared_user_ids = await self.store.do_users_share_a_room( + user_id, updated_user_ids + ) + shared_user_ids.add(user_id) + + user_fields: dict[str, set[str]] = {} + for update in updates: + if update.user_id not in shared_user_ids: + continue + user_fields.setdefault(update.user_id, set()).add(update.field_name) + + if not user_fields: + return + + profile_data_by_user = await self.store.get_profile_data_for_users( + user_fields.keys() + ) + + profile_updates: dict[str, dict[str, JsonValue | None]] = {} + for other_user_id, fields in user_fields.items(): + displayname = None + avatar_url = None + custom_fields: JsonDict = {} + + profile_data = profile_data_by_user.get(other_user_id) + if profile_data is not None: + displayname, avatar_url, custom_fields = profile_data + + per_user_updates: dict[str, JsonValue | None] = {} + for field_name in fields: + if field_name == ProfileFields.DISPLAYNAME: + per_user_updates[field_name] = displayname + elif field_name == ProfileFields.AVATAR_URL: + per_user_updates[field_name] = avatar_url + else: + per_user_updates[field_name] = custom_fields.get(field_name) + + if per_user_updates: + profile_updates[other_user_id] = per_user_updates + + if profile_updates: + sync_result_builder.profile_updates = profile_updates + async def _generate_sync_entry_for_presence( self, sync_result_builder: "SyncResultBuilder", @@ -3108,6 +3234,7 @@ class SyncResultBuilder: # The following mirror the fields in a sync response presence account_data + profile_updates joined invited knocked @@ -3126,6 +3253,7 @@ class SyncResultBuilder: presence: list[UserPresenceState] = attr.Factory(list) account_data: list[JsonDict] = attr.Factory(list) + profile_updates: dict[str, dict[str, JsonValue | None]] = attr.Factory(dict) joined: list[JoinedSyncResult] = attr.Factory(list) invited: list[InvitedSyncResult] = attr.Factory(list) knocked: list[KnockedSyncResult] = attr.Factory(list) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 710d097eab..c696cbb143 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -124,6 +124,7 @@ class SyncRestServlet(RestServlet): self._event_serializer = hs.get_event_client_serializer() self._msc2654_enabled = hs.config.experimental.msc2654_enabled self._msc3773_enabled = hs.config.experimental.msc3773_enabled + self._msc4429_enabled = hs.config.experimental.msc4429_enabled self._json_filter_cache: LruCache[str, bool] = LruCache( max_size=1000, @@ -352,6 +353,12 @@ class SyncRestServlet(RestServlet): if sync_result.to_device: response["to_device"] = {"events": sync_result.to_device} + if self._msc4429_enabled and sync_result.profile_updates: + response["org.matrix.msc4429.users"] = { + user_id: {"profile_updates": updates} + for user_id, updates in sync_result.profile_updates.items() + } + if sync_result.device_lists.changed: response["device_lists"]["changed"] = list(sync_result.device_lists.changed) if sync_result.device_lists.left: