Allow fetching latest changes via /sync and filtering

Based on the provided since token and filter.
This commit is contained in:
Andrew Morgan
2026-03-13 16:52:46 -04:00
parent e45045d2a5
commit d9adfd85d3
3 changed files with 160 additions and 0 deletions

View File

@@ -123,6 +123,13 @@ USER_FILTER_SCHEMA = {
"filter": FILTER_SCHEMA,
"room_filter": ROOM_FILTER_SCHEMA,
"room_event_filter": ROOM_EVENT_FILTER_SCHEMA,
"profile_fields_filter": {
"type": "object",
"properties": {
"ids": {"type": "array", "items": {"type": "string"}},
},
"additionalProperties": True,
},
},
"properties": {
"presence": {"$ref": "#/definitions/filter"},
@@ -130,6 +137,10 @@ USER_FILTER_SCHEMA = {
"room": {"$ref": "#/definitions/room_filter"},
"event_format": {"type": "string", "enum": ["client", "federation"]},
"event_fields": {"type": "array", "items": {"type": "string"}},
"profile_fields": {"$ref": "#/definitions/profile_fields_filter"},
"org.matrix.msc4429.profile_fields": {
"$ref": "#/definitions/profile_fields_filter"
},
},
"additionalProperties": True, # Allow new fields for forward compatibility
}
@@ -217,6 +228,20 @@ class FilterCollection:
self.event_fields = filter_json.get("event_fields", [])
self.event_format = filter_json.get("event_format", "client")
self.profile_fields: list[str] = []
if hs.config.experimental.msc4429_enabled:
profile_fields_filter = filter_json.get("profile_fields")
if profile_fields_filter is None:
profile_fields_filter = filter_json.get(
"org.matrix.msc4429.profile_fields"
)
if isinstance(profile_fields_filter, Mapping):
ids = profile_fields_filter.get("ids", [])
if ids is None:
ids = []
self.profile_fields = list(ids)
def __repr__(self) -> str:
return "<FilterCollection %s>" % (json.dumps(self._filter_json),)

View File

@@ -37,6 +37,7 @@ from synapse.api.constants import (
EventContentFields,
EventTypes,
Membership,
ProfileFields,
StickyEvent,
)
from synapse.api.filtering import FilterCollection
@@ -63,6 +64,7 @@ from synapse.types import (
DeviceListUpdates,
JsonDict,
JsonMapping,
JsonValue,
MultiWriterStreamToken,
MutableStateMap,
Requester,
@@ -223,6 +225,7 @@ class SyncResult:
next_batch: Token for the next sync
presence: List of presence events for the user.
account_data: List of account_data events for the user.
profile_updates: Map of user_id to profile field updates for that user.
joined: JoinedSyncResult for each joined room.
invited: InvitedSyncResult for each invited room.
knocked: KnockedSyncResult for each knocked on room.
@@ -238,6 +241,7 @@ class SyncResult:
next_batch: StreamToken
presence: list[UserPresenceState]
account_data: list[JsonDict]
profile_updates: dict[str, dict[str, JsonValue | None]]
joined: list[JoinedSyncResult]
invited: list[InvitedSyncResult]
knocked: list[KnockedSyncResult]
@@ -259,6 +263,7 @@ class SyncResult:
or self.knocked
or self.archived
or self.account_data
or self.profile_updates
or self.to_device
or self.device_lists
)
@@ -274,6 +279,7 @@ class SyncResult:
next_batch=next_batch,
presence=[],
account_data=[],
profile_updates={},
joined=[],
invited=[],
knocked=[],
@@ -290,6 +296,7 @@ class SyncHandler:
self.server_name = hs.hostname
self.hs_config = hs.config
self.store = hs.get_datastores().main
self._is_mine_id = hs.is_mine_id
self.notifier = hs.get_notifier()
self.presence_handler = hs.get_presence_handler()
self._relations_handler = hs.get_relations_handler()
@@ -1733,6 +1740,9 @@ class SyncHandler:
if not sync_config.filter_collection.blocks_all_global_account_data():
await self._generate_sync_entry_for_account_data(sync_result_builder)
if self.hs_config.experimental.msc4429_enabled:
await self._generate_sync_entry_for_profile_updates(sync_result_builder)
# Presence data is included if the server has it enabled and not filtered out.
include_presence_data = bool(
self.hs_config.server.presence_enabled
@@ -1832,6 +1842,7 @@ class SyncHandler:
return SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
profile_updates=sync_result_builder.profile_updates,
joined=sync_result_builder.joined,
invited=sync_result_builder.invited,
knocked=sync_result_builder.knocked,
@@ -2096,6 +2107,121 @@ class SyncHandler:
sync_result_builder.account_data = account_data_for_user
async def _generate_initial_sync_entry_for_profile_updates(
self,
user_id: str,
sync_result_builder: "SyncResultBuilder",
profile_fields: list[str],
) -> None:
user_ids = await self.store.get_users_who_share_room_with_user(user_id)
user_ids = {u for u in user_ids if self._is_mine_id(u)}
if not user_ids:
return
profile_data_by_user = await self.store.get_profile_data_for_users(user_ids)
all_updates: dict[str, dict[str, JsonValue | None]] = {}
for other_user_id in user_ids:
displayname = None
avatar_url = None
custom_fields: JsonDict = {}
profile_data = profile_data_by_user.get(other_user_id)
if profile_data is not None:
displayname, avatar_url, custom_fields = profile_data
per_user_updates: dict[str, JsonValue | None] = {}
for field_name in profile_fields:
if field_name == ProfileFields.DISPLAYNAME:
per_user_updates[field_name] = displayname
elif field_name == ProfileFields.AVATAR_URL:
per_user_updates[field_name] = avatar_url
else:
per_user_updates[field_name] = custom_fields.get(field_name)
all_updates[other_user_id] = per_user_updates
sync_result_builder.profile_updates = all_updates
return
async def _generate_sync_entry_for_profile_updates(
self, sync_result_builder: "SyncResultBuilder"
) -> None:
"""Generates the profile update portion of the sync response."""
sync_config = sync_result_builder.sync_config
profile_fields = sync_config.filter_collection.profile_fields
if not profile_fields:
return
user_id = sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
if since_token is None:
# TODO: Refactor this into a separate function.
await self._generate_initial_sync_entry_for_profile_updates(
user_id, sync_result_builder, profile_fields
)
return
if since_token.profile_updates_key == now_token.profile_updates_key:
return
updates = await self.store.get_profile_updates_for_fields(
from_id=since_token.profile_updates_key,
to_id=now_token.profile_updates_key,
field_names=profile_fields,
)
if not updates:
return
updated_user_ids = {update.user_id for update in updates}
if not updated_user_ids:
return
shared_user_ids = await self.store.do_users_share_a_room(
user_id, updated_user_ids
)
shared_user_ids.add(user_id)
user_fields: dict[str, set[str]] = {}
for update in updates:
if update.user_id not in shared_user_ids:
continue
user_fields.setdefault(update.user_id, set()).add(update.field_name)
if not user_fields:
return
profile_data_by_user = await self.store.get_profile_data_for_users(
user_fields.keys()
)
profile_updates: dict[str, dict[str, JsonValue | None]] = {}
for other_user_id, fields in user_fields.items():
displayname = None
avatar_url = None
custom_fields: JsonDict = {}
profile_data = profile_data_by_user.get(other_user_id)
if profile_data is not None:
displayname, avatar_url, custom_fields = profile_data
per_user_updates: dict[str, JsonValue | None] = {}
for field_name in fields:
if field_name == ProfileFields.DISPLAYNAME:
per_user_updates[field_name] = displayname
elif field_name == ProfileFields.AVATAR_URL:
per_user_updates[field_name] = avatar_url
else:
per_user_updates[field_name] = custom_fields.get(field_name)
if per_user_updates:
profile_updates[other_user_id] = per_user_updates
if profile_updates:
sync_result_builder.profile_updates = profile_updates
async def _generate_sync_entry_for_presence(
self,
sync_result_builder: "SyncResultBuilder",
@@ -3108,6 +3234,7 @@ class SyncResultBuilder:
# The following mirror the fields in a sync response
presence
account_data
profile_updates
joined
invited
knocked
@@ -3126,6 +3253,7 @@ class SyncResultBuilder:
presence: list[UserPresenceState] = attr.Factory(list)
account_data: list[JsonDict] = attr.Factory(list)
profile_updates: dict[str, dict[str, JsonValue | None]] = attr.Factory(dict)
joined: list[JoinedSyncResult] = attr.Factory(list)
invited: list[InvitedSyncResult] = attr.Factory(list)
knocked: list[KnockedSyncResult] = attr.Factory(list)

View File

@@ -124,6 +124,7 @@ class SyncRestServlet(RestServlet):
self._event_serializer = hs.get_event_client_serializer()
self._msc2654_enabled = hs.config.experimental.msc2654_enabled
self._msc3773_enabled = hs.config.experimental.msc3773_enabled
self._msc4429_enabled = hs.config.experimental.msc4429_enabled
self._json_filter_cache: LruCache[str, bool] = LruCache(
max_size=1000,
@@ -352,6 +353,12 @@ class SyncRestServlet(RestServlet):
if sync_result.to_device:
response["to_device"] = {"events": sync_result.to_device}
if self._msc4429_enabled and sync_result.profile_updates:
response["org.matrix.msc4429.users"] = {
user_id: {"profile_updates": updates}
for user_id, updates in sync_result.profile_updates.items()
}
if sync_result.device_lists.changed:
response["device_lists"]["changed"] = list(sync_result.device_lists.changed)
if sync_result.device_lists.left: