add priority_room_ids to task state, do not refetch on resume. This fixes a race condition where the restart happens before any prioritised rooms canbe processed

This commit is contained in:
Neil Johnson
2026-03-26 13:15:44 +00:00
parent 2c09bce002
commit 6245988aae

View File

@@ -733,39 +733,51 @@ class ProfileHandler:
processed_priority_rooms = set(
task.result.get("processed_priority_rooms", []) if task.result else []
)
# Get the stored priority room list if we're resuming
stored_priority_room_ids = (
task.result.get("priority_room_ids", []) if task.result else []
)
NUMBER_OF_PRIORITISED_ROOMS = 50
# If we haven't processed priority rooms yet (no last_room_id means fresh start)
if not last_room_id:
# Get the most recently active rooms for this user
try:
# Get the last event position for each room
room_activity = await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
all_room_ids,
self.store.get_room_max_token(), # Use current positions
)
priority_room_ids: list[str] = []
# Determine which phase we're in and what to do
if last_room_id is None:
# We're in priority phase (or starting fresh)
if stored_priority_room_ids:
# Resuming priority phase - use stored list
priority_room_ids = stored_priority_room_ids
elif not processed_priority_rooms:
# Fresh start - get the most recently active rooms
try:
# Get the last event position for each room
room_activity = await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
all_room_ids,
self.store.get_room_max_token(), # Use current positions
)
# Sort rooms by activity (descending stream ordering)
priority_room_ids = sorted(
room_activity.keys(),
key=lambda rid: room_activity.get(rid, 0),
reverse=True,
)[:NUMBER_OF_PRIORITISED_ROOMS]
# Sort rooms by activity (descending stream ordering)
priority_room_ids = sorted(
room_activity.keys(),
key=lambda rid: room_activity.get(rid, 0),
reverse=True,
)[:NUMBER_OF_PRIORITISED_ROOMS]
except Exception as e:
# If we can't get priority rooms, fall back to alphabetical
logger.warning(
"Failed to get priority rooms for %s: %s. Falling back to alphabetical order.",
target_user.to_string(),
str(e),
)
priority_room_ids = []
# Store the priority list so we can resume if interrupted
stored_priority_room_ids = priority_room_ids
except Exception as e:
# If we can't get priority rooms, fall back to alphabetical
logger.warning(
"Failed to get priority rooms for %s: %s. Falling back to alphabetical order.",
target_user.to_string(),
str(e),
)
priority_room_ids = []
stored_priority_room_ids = []
else:
# We're resuming, skip priority processing
priority_room_ids = []
# We're in alphabetical phase (last_room_id is set), continue from there
# Filter out room IDs that have already been handled
# by finding the first room ID greater than the last handled room ID
# and slicing the list from that point onwards.
sorted_room_ids = sorted_room_ids[
bisect_right(sorted_room_ids, last_room_id) :
]
@@ -809,11 +821,15 @@ class ProfileHandler:
result={
"last_room_id": None,
"processed_priority_rooms": list(processed_priority_rooms),
"priority_room_ids": stored_priority_room_ids,
},
)
# Now process all rooms in alphabetical order (including re-processing priority rooms)
# Now process all rooms in alphabetical order (skipping already-processed priority rooms)
for room_id in sorted_room_ids:
if room_id in processed_priority_rooms:
continue # Skip rooms already processed in priority phase
try:
# Assume the target_user isn't a guest,
# because we don't let guests set profile or avatar data.
@@ -835,6 +851,7 @@ class ProfileHandler:
result={
"last_room_id": room_id,
"processed_priority_rooms": list(processed_priority_rooms),
"priority_room_ids": stored_priority_room_ids,
},
)