From 6245988aaeec49382e229c66cee5a83fc70f0d74 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Thu, 26 Mar 2026 13:15:44 +0000 Subject: [PATCH] add priority_room_ids to task state, do not refetch on resume. This fixes a race condition where the restart happens before any prioritised rooms canbe processed --- synapse/handlers/profile.py | 73 +++++++++++++++++++++++-------------- 1 file changed, 45 insertions(+), 28 deletions(-) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index f610816152..c8283a1a15 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -733,39 +733,51 @@ class ProfileHandler: processed_priority_rooms = set( task.result.get("processed_priority_rooms", []) if task.result else [] ) + # Get the stored priority room list if we're resuming + stored_priority_room_ids = ( + task.result.get("priority_room_ids", []) if task.result else [] + ) NUMBER_OF_PRIORITISED_ROOMS = 50 - # If we haven't processed priority rooms yet (no last_room_id means fresh start) - if not last_room_id: - # Get the most recently active rooms for this user - try: - # Get the last event position for each room - room_activity = await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( - all_room_ids, - self.store.get_room_max_token(), # Use current positions - ) + priority_room_ids: list[str] = [] + + # Determine which phase we're in and what to do + if last_room_id is None: + # We're in priority phase (or starting fresh) + if stored_priority_room_ids: + # Resuming priority phase - use stored list + priority_room_ids = stored_priority_room_ids + elif not processed_priority_rooms: + # Fresh start - get the most recently active rooms + try: + # Get the last event position for each room + room_activity = await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( + all_room_ids, + self.store.get_room_max_token(), # Use current positions + ) - # Sort rooms by activity (descending stream ordering) - priority_room_ids = sorted( - room_activity.keys(), - key=lambda rid: room_activity.get(rid, 0), - reverse=True, - )[:NUMBER_OF_PRIORITISED_ROOMS] + # Sort rooms by activity (descending stream ordering) + priority_room_ids = sorted( + room_activity.keys(), + key=lambda rid: room_activity.get(rid, 0), + reverse=True, + )[:NUMBER_OF_PRIORITISED_ROOMS] - except Exception as e: - # If we can't get priority rooms, fall back to alphabetical - logger.warning( - "Failed to get priority rooms for %s: %s. Falling back to alphabetical order.", - target_user.to_string(), - str(e), - ) - priority_room_ids = [] + # Store the priority list so we can resume if interrupted + stored_priority_room_ids = priority_room_ids + + except Exception as e: + # If we can't get priority rooms, fall back to alphabetical + logger.warning( + "Failed to get priority rooms for %s: %s. Falling back to alphabetical order.", + target_user.to_string(), + str(e), + ) + priority_room_ids = [] + stored_priority_room_ids = [] else: - # We're resuming, skip priority processing - priority_room_ids = [] + # We're in alphabetical phase (last_room_id is set), continue from there # Filter out room IDs that have already been handled - # by finding the first room ID greater than the last handled room ID - # and slicing the list from that point onwards. sorted_room_ids = sorted_room_ids[ bisect_right(sorted_room_ids, last_room_id) : ] @@ -809,11 +821,15 @@ class ProfileHandler: result={ "last_room_id": None, "processed_priority_rooms": list(processed_priority_rooms), + "priority_room_ids": stored_priority_room_ids, }, ) - # Now process all rooms in alphabetical order (including re-processing priority rooms) + # Now process all rooms in alphabetical order (skipping already-processed priority rooms) for room_id in sorted_room_ids: + if room_id in processed_priority_rooms: + continue # Skip rooms already processed in priority phase + try: # Assume the target_user isn't a guest, # because we don't let guests set profile or avatar data. @@ -835,6 +851,7 @@ class ProfileHandler: result={ "last_room_id": room_id, "processed_priority_rooms": list(processed_priority_rooms), + "priority_room_ids": stored_priority_room_ids, }, )