diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 0b185f8fef..364a04ef7b 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -129,6 +129,19 @@ class ProfileHandler: if not room_ids: return + users_who_share_rooms = ( + await self.store.get_local_users_who_share_room_with_user( + user_id.to_string() + ) + ) + # Remove ourselves from the user ID list + users_who_share_rooms.remove(user_id.to_string()) + if users_who_share_rooms: + await self.store.track_profile_updates_per_user( + stream_id=stream_id, + user_ids=users_who_share_rooms, + ) + self._notifier.on_new_event( StreamKeyType.PROFILE_UPDATES, stream_id, rooms=room_ids ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 36319e4e61..f5e3a47d54 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -2252,8 +2252,18 @@ class SyncHandler: to_id=now_token.profile_updates_key, field_names=profile_fields, ) + interesting_updates = await self.store.get_profile_updates_per_user_for_user( + from_id=since_token.profile_updates_key, + to_id=now_token.profile_updates_key, + user_id=user_id, + ) + # Only include updates we've got for us specifically + updates = [ + update for update in updates if update.stream_id in interesting_updates + ] + if include_users: - # Filter down to selected included users + # Further filter down to selected included users updates = [update for update in updates if update.user_id in include_users] if not updates: @@ -2264,10 +2274,7 @@ class SyncHandler: for update in updates if update.action == ProfileUpdateAction.UPDATE.value } - shared_updated_user_ids = await self.store.do_users_share_a_room( - user_id, updated_user_ids - ) - shared_updated_user_ids.add(user_id) + updated_user_ids.add(user_id) left_room_user_ids = { update.user_id for update in updates @@ -2282,7 +2289,7 @@ class SyncHandler: user_fields: dict[str, set[str]] = {} for update in updates: - if not update.field_name or update.user_id not in shared_updated_user_ids: + if not update.field_name or update.user_id not in updated_user_ids: continue user_fields.setdefault(update.user_id, set()).add(update.field_name) diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index 9d726f29e4..c7f8aea48f 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -540,6 +540,74 @@ class ProfileWorkerStore(SQLBaseStore): "add_profile_updates", _add_profile_updates_txn ) + async def track_profile_updates_per_user( + self, + stream_id: int, + user_ids: set[str], + ) -> None: + """ + Create tracking rows for profile updater per target user interested in profile + updates for the user triggering one. + + Args: + stream_id: Stream ID referencing a `profile_updates` stream ID. + user_ids: A set of the full user IDs of the target users interested in + this change. + """ + + def _track_profile_updates_per_user_txn(txn: LoggingTransaction) -> None: + inserted_ts = self.clock.time_msec() + values = [[stream_id, user_id, inserted_ts] for user_id in user_ids] + self.db_pool.simple_insert_many_txn( + txn, + table="profile_updates_per_user", + keys=[ + "stream_id", + "user_id", + "inserted_ts", + ], + values=values, + ) + + return await self.db_pool.runInteraction( + "track_profile_updates_per_user", + _track_profile_updates_per_user_txn, + ) + + async def get_profile_updates_per_user_for_user( + self, *, from_id: int, to_id: int, user_id: str + ) -> list[int]: + """ + Get profile updates per user stream ID's for a particular user. + + Args: + from_id: The starting stream ID (exclusive) + to_id: The ending stream ID (inclusive) + user_id: The full user ID to filter on + + Returns: + List of stream ID's. + """ + + def _get_profile_updates_per_user_for_user_txn( + txn: LoggingTransaction, + ) -> list[int]: + sql = """ + SELECT + stream_id + FROM profile_updates_per_user + WHERE + ? < stream_id AND stream_id <= ? AND user_id = ? + """ + txn.execute(sql, (from_id, to_id, user_id)) + rows = cast(list[tuple[int]], txn.fetchall()) + return [row[0] for row in rows] + + return await self.db_pool.runInteraction( + "get_profile_updates_per_user_for_user", + _get_profile_updates_per_user_for_user_txn, + ) + async def create_profile(self, user_id: UserID) -> None: """ Create a blank profile for a user. diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 1ff05120c4..5267a6ab3b 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -316,6 +316,53 @@ class ProfileTestCase(unittest.HomeserverTestCase): (3, "@1234abcd:test", ProfileUpdateAction.UPDATE.value, field_name), ) + @override_config({"experimental_features": {"msc4429_enabled": True}}) + def test_update_profile_set_field_writes_to_per_user_profile_tracking_table( + self, + ) -> None: + self.register_user("roger", "password") + roger_token = self.login("roger", "password") + self.register_user("millie", "password") + millie_token = self.login("millie", "password") + room_id = self.helper.create_room_as( + room_creator=self.frank.to_string(), + tok=self.frank_token, + ) + self.helper.join(room_id, "@roger:test", tok=roger_token) + self.helper.join(room_id, "@millie:test", tok=millie_token) + self.get_success( + self.handler.set_field( + target_user=self.frank, + requester=synapse.types.create_requester(self.frank), + field_name="m.status", + new_value='{"text": "Holiday"}', + ) + ) + per_user_updates = self.get_success( + self.store.get_profile_updates_per_user_for_user( + from_id=1, + to_id=2, + user_id="@roger:test", + ) + ) + self.assertEqual(per_user_updates, [2]) + per_user_updates = self.get_success( + self.store.get_profile_updates_per_user_for_user( + from_id=1, + to_id=2, + user_id="@millie:test", + ) + ) + self.assertEqual(per_user_updates, [2]) + per_user_updates = self.get_success( + self.store.get_profile_updates_per_user_for_user( + from_id=1, + to_id=2, + user_id=self.frank.to_string(), + ) + ) + self.assertEqual(per_user_updates, []) + @parameterized.expand( [ ["displayname", "Frank"],