Use a "per user profile updates" tracking table

This makes writes heavier when profile updates happen, but reduces the effort to produce an incremental sync response by not needing to look whether users share rooms.
This commit is contained in:
Jason Robinson
2026-06-05 19:17:12 +03:00
parent 877074f38e
commit e90096ef57
4 changed files with 141 additions and 6 deletions
+13
View File
@@ -129,6 +129,19 @@ class ProfileHandler:
if not room_ids:
return
users_who_share_rooms = (
await self.store.get_local_users_who_share_room_with_user(
user_id.to_string()
)
)
# Remove ourselves from the user ID list
users_who_share_rooms.remove(user_id.to_string())
if users_who_share_rooms:
await self.store.track_profile_updates_per_user(
stream_id=stream_id,
user_ids=users_who_share_rooms,
)
self._notifier.on_new_event(
StreamKeyType.PROFILE_UPDATES, stream_id, rooms=room_ids
)
+13 -6
View File
@@ -2252,8 +2252,18 @@ class SyncHandler:
to_id=now_token.profile_updates_key,
field_names=profile_fields,
)
interesting_updates = await self.store.get_profile_updates_per_user_for_user(
from_id=since_token.profile_updates_key,
to_id=now_token.profile_updates_key,
user_id=user_id,
)
# Only include updates we've got for us specifically
updates = [
update for update in updates if update.stream_id in interesting_updates
]
if include_users:
# Filter down to selected included users
# Further filter down to selected included users
updates = [update for update in updates if update.user_id in include_users]
if not updates:
@@ -2264,10 +2274,7 @@ class SyncHandler:
for update in updates
if update.action == ProfileUpdateAction.UPDATE.value
}
shared_updated_user_ids = await self.store.do_users_share_a_room(
user_id, updated_user_ids
)
shared_updated_user_ids.add(user_id)
updated_user_ids.add(user_id)
left_room_user_ids = {
update.user_id
for update in updates
@@ -2282,7 +2289,7 @@ class SyncHandler:
user_fields: dict[str, set[str]] = {}
for update in updates:
if not update.field_name or update.user_id not in shared_updated_user_ids:
if not update.field_name or update.user_id not in updated_user_ids:
continue
user_fields.setdefault(update.user_id, set()).add(update.field_name)
+68
View File
@@ -540,6 +540,74 @@ class ProfileWorkerStore(SQLBaseStore):
"add_profile_updates", _add_profile_updates_txn
)
async def track_profile_updates_per_user(
self,
stream_id: int,
user_ids: set[str],
) -> None:
"""
Create tracking rows for profile updater per target user interested in profile
updates for the user triggering one.
Args:
stream_id: Stream ID referencing a `profile_updates` stream ID.
user_ids: A set of the full user IDs of the target users interested in
this change.
"""
def _track_profile_updates_per_user_txn(txn: LoggingTransaction) -> None:
inserted_ts = self.clock.time_msec()
values = [[stream_id, user_id, inserted_ts] for user_id in user_ids]
self.db_pool.simple_insert_many_txn(
txn,
table="profile_updates_per_user",
keys=[
"stream_id",
"user_id",
"inserted_ts",
],
values=values,
)
return await self.db_pool.runInteraction(
"track_profile_updates_per_user",
_track_profile_updates_per_user_txn,
)
async def get_profile_updates_per_user_for_user(
self, *, from_id: int, to_id: int, user_id: str
) -> list[int]:
"""
Get profile updates per user stream ID's for a particular user.
Args:
from_id: The starting stream ID (exclusive)
to_id: The ending stream ID (inclusive)
user_id: The full user ID to filter on
Returns:
List of stream ID's.
"""
def _get_profile_updates_per_user_for_user_txn(
txn: LoggingTransaction,
) -> list[int]:
sql = """
SELECT
stream_id
FROM profile_updates_per_user
WHERE
? < stream_id AND stream_id <= ? AND user_id = ?
"""
txn.execute(sql, (from_id, to_id, user_id))
rows = cast(list[tuple[int]], txn.fetchall())
return [row[0] for row in rows]
return await self.db_pool.runInteraction(
"get_profile_updates_per_user_for_user",
_get_profile_updates_per_user_for_user_txn,
)
async def create_profile(self, user_id: UserID) -> None:
"""
Create a blank profile for a user.
+47
View File
@@ -316,6 +316,53 @@ class ProfileTestCase(unittest.HomeserverTestCase):
(3, "@1234abcd:test", ProfileUpdateAction.UPDATE.value, field_name),
)
@override_config({"experimental_features": {"msc4429_enabled": True}})
def test_update_profile_set_field_writes_to_per_user_profile_tracking_table(
self,
) -> None:
self.register_user("roger", "password")
roger_token = self.login("roger", "password")
self.register_user("millie", "password")
millie_token = self.login("millie", "password")
room_id = self.helper.create_room_as(
room_creator=self.frank.to_string(),
tok=self.frank_token,
)
self.helper.join(room_id, "@roger:test", tok=roger_token)
self.helper.join(room_id, "@millie:test", tok=millie_token)
self.get_success(
self.handler.set_field(
target_user=self.frank,
requester=synapse.types.create_requester(self.frank),
field_name="m.status",
new_value='{"text": "Holiday"}',
)
)
per_user_updates = self.get_success(
self.store.get_profile_updates_per_user_for_user(
from_id=1,
to_id=2,
user_id="@roger:test",
)
)
self.assertEqual(per_user_updates, [2])
per_user_updates = self.get_success(
self.store.get_profile_updates_per_user_for_user(
from_id=1,
to_id=2,
user_id="@millie:test",
)
)
self.assertEqual(per_user_updates, [2])
per_user_updates = self.get_success(
self.store.get_profile_updates_per_user_for_user(
from_id=1,
to_id=2,
user_id=self.frank.to_string(),
)
)
self.assertEqual(per_user_updates, [])
@parameterized.expand(
[
["displayname", "Frank"],