From e90096ef571c55eca2da1147d7850a5422c4e1ed Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Fri, 5 Jun 2026 19:17:12 +0300 Subject: [PATCH] Use a "per user profile updates" tracking table This makes writes heavier when profile updates happen, but reduces the effort to produce an incremental sync response by not needing to look whether users share rooms. --- synapse/handlers/profile.py | 13 +++++ synapse/handlers/sync.py | 19 +++++-- synapse/storage/databases/main/profile.py | 68 +++++++++++++++++++++++ tests/handlers/test_profile.py | 47 ++++++++++++++++ 4 files changed, 141 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 0b185f8fef..364a04ef7b 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -129,6 +129,19 @@ class ProfileHandler: if not room_ids: return + users_who_share_rooms = ( + await self.store.get_local_users_who_share_room_with_user( + user_id.to_string() + ) + ) + # Remove ourselves from the user ID list + users_who_share_rooms.remove(user_id.to_string()) + if users_who_share_rooms: + await self.store.track_profile_updates_per_user( + stream_id=stream_id, + user_ids=users_who_share_rooms, + ) + self._notifier.on_new_event( StreamKeyType.PROFILE_UPDATES, stream_id, rooms=room_ids ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 36319e4e61..f5e3a47d54 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -2252,8 +2252,18 @@ class SyncHandler: to_id=now_token.profile_updates_key, field_names=profile_fields, ) + interesting_updates = await self.store.get_profile_updates_per_user_for_user( + from_id=since_token.profile_updates_key, + to_id=now_token.profile_updates_key, + user_id=user_id, + ) + # Only include updates we've got for us specifically + updates = [ + update for update in updates if update.stream_id in interesting_updates + ] + if include_users: - # Filter down to selected included users + # Further filter down to selected included users updates = [update for update in updates if update.user_id in include_users] if not updates: @@ -2264,10 +2274,7 @@ class SyncHandler: for update in updates if update.action == ProfileUpdateAction.UPDATE.value } - shared_updated_user_ids = await self.store.do_users_share_a_room( - user_id, updated_user_ids - ) - shared_updated_user_ids.add(user_id) + updated_user_ids.add(user_id) left_room_user_ids = { update.user_id for update in updates @@ -2282,7 +2289,7 @@ class SyncHandler: user_fields: dict[str, set[str]] = {} for update in updates: - if not update.field_name or update.user_id not in shared_updated_user_ids: + if not update.field_name or update.user_id not in updated_user_ids: continue user_fields.setdefault(update.user_id, set()).add(update.field_name) diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index 9d726f29e4..c7f8aea48f 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -540,6 +540,74 @@ class ProfileWorkerStore(SQLBaseStore): "add_profile_updates", _add_profile_updates_txn ) + async def track_profile_updates_per_user( + self, + stream_id: int, + user_ids: set[str], + ) -> None: + """ + Create tracking rows for profile updater per target user interested in profile + updates for the user triggering one. + + Args: + stream_id: Stream ID referencing a `profile_updates` stream ID. + user_ids: A set of the full user IDs of the target users interested in + this change. + """ + + def _track_profile_updates_per_user_txn(txn: LoggingTransaction) -> None: + inserted_ts = self.clock.time_msec() + values = [[stream_id, user_id, inserted_ts] for user_id in user_ids] + self.db_pool.simple_insert_many_txn( + txn, + table="profile_updates_per_user", + keys=[ + "stream_id", + "user_id", + "inserted_ts", + ], + values=values, + ) + + return await self.db_pool.runInteraction( + "track_profile_updates_per_user", + _track_profile_updates_per_user_txn, + ) + + async def get_profile_updates_per_user_for_user( + self, *, from_id: int, to_id: int, user_id: str + ) -> list[int]: + """ + Get profile updates per user stream ID's for a particular user. + + Args: + from_id: The starting stream ID (exclusive) + to_id: The ending stream ID (inclusive) + user_id: The full user ID to filter on + + Returns: + List of stream ID's. + """ + + def _get_profile_updates_per_user_for_user_txn( + txn: LoggingTransaction, + ) -> list[int]: + sql = """ + SELECT + stream_id + FROM profile_updates_per_user + WHERE + ? < stream_id AND stream_id <= ? AND user_id = ? + """ + txn.execute(sql, (from_id, to_id, user_id)) + rows = cast(list[tuple[int]], txn.fetchall()) + return [row[0] for row in rows] + + return await self.db_pool.runInteraction( + "get_profile_updates_per_user_for_user", + _get_profile_updates_per_user_for_user_txn, + ) + async def create_profile(self, user_id: UserID) -> None: """ Create a blank profile for a user. diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 1ff05120c4..5267a6ab3b 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -316,6 +316,53 @@ class ProfileTestCase(unittest.HomeserverTestCase): (3, "@1234abcd:test", ProfileUpdateAction.UPDATE.value, field_name), ) + @override_config({"experimental_features": {"msc4429_enabled": True}}) + def test_update_profile_set_field_writes_to_per_user_profile_tracking_table( + self, + ) -> None: + self.register_user("roger", "password") + roger_token = self.login("roger", "password") + self.register_user("millie", "password") + millie_token = self.login("millie", "password") + room_id = self.helper.create_room_as( + room_creator=self.frank.to_string(), + tok=self.frank_token, + ) + self.helper.join(room_id, "@roger:test", tok=roger_token) + self.helper.join(room_id, "@millie:test", tok=millie_token) + self.get_success( + self.handler.set_field( + target_user=self.frank, + requester=synapse.types.create_requester(self.frank), + field_name="m.status", + new_value='{"text": "Holiday"}', + ) + ) + per_user_updates = self.get_success( + self.store.get_profile_updates_per_user_for_user( + from_id=1, + to_id=2, + user_id="@roger:test", + ) + ) + self.assertEqual(per_user_updates, [2]) + per_user_updates = self.get_success( + self.store.get_profile_updates_per_user_for_user( + from_id=1, + to_id=2, + user_id="@millie:test", + ) + ) + self.assertEqual(per_user_updates, [2]) + per_user_updates = self.get_success( + self.store.get_profile_updates_per_user_for_user( + from_id=1, + to_id=2, + user_id=self.frank.to_string(), + ) + ) + self.assertEqual(per_user_updates, []) + @parameterized.expand( [ ["displayname", "Frank"],