Review comments in persist_events.py

This commit is contained in:
Kegan Dougal
2026-03-09 15:35:36 +00:00
parent aa46122b2c
commit 2f82a5b399
2 changed files with 17 additions and 23 deletions

View File

@@ -36,7 +36,7 @@ class EventFormatVersions:
ROOM_V3 = 2 # MSC1659-style $hash event id format: used for room v3
ROOM_V4_PLUS = 3 # MSC1884-style $hash format: introduced for room v4
ROOM_V11_HYDRA_PLUS = 4 # MSC4291 room IDs as hashes: introduced for room HydraV11
ROOM_VMSC4242 = 5 # MSC4242 state dags: adds prev_state_events, removes auth_events
ROOM_VMSC4242 = 5 # MSC4242 state dags: adds prev_state_events, removes auth_events
KNOWN_EVENT_FORMAT_VERSIONS = {

View File

@@ -635,11 +635,8 @@ class EventsPersistenceStorageController:
# Get the room version for the first event. This room version is the same for all events
# as events_and_contexts is all for one room.
room_version = (
events_and_contexts[0][0].room_version
if len(events_and_contexts) > 0
else None
)
assert len(events_and_contexts) > 0
room_version = events_and_contexts[0][0].room_version
for chunk in chunks:
# We can't easily parallelize these since different chunks
@@ -656,17 +653,16 @@ class EventsPersistenceStorageController:
name="_process_state_dag_forward_extremities_and_state_delta",
server_name=self.server_name,
):
assert all(
isinstance(ev, FrozenEventVMSC4242) for ev, _ in chunk
)
(
new_forward_extremities, # for prev_events
state_delta_for_room, # for state groups
new_state_dag_extrems, # for prev_state_events
) = await self._process_state_dag_forward_extremities_and_state_delta(
room_id,
[
(ev, ctx)
for ev, ctx in chunk
if isinstance(ev, FrozenEventVMSC4242)
],
cast(list[tuple[FrozenEventVMSC4242, EventContext]], chunk),
)
else:
with Measure(
@@ -864,9 +860,6 @@ class EventsPersistenceStorageController:
existing_state_dag_fwd_extrems,
event_contexts,
)
assert new_state_dag_fwd_extrems, (
f"No state dag forward extremities left in room {room_id}!"
)
# ...and the room DAG
existing_room_dag_fwd_extrems = (
await self.main_store.get_latest_event_ids_in_room(room_id)
@@ -896,6 +889,7 @@ class EventsPersistenceStorageController:
existing_state_dag_fwd_extrems,
new_state_dag_fwd_extrems,
# do not prune forward extremities in the state DAG
# else we lose eventual delivery
should_prune=False,
)
@@ -946,12 +940,11 @@ class EventsPersistenceStorageController:
Raises:
SynapseError: if the new events include unknown prev_state_events
AssertionError: if there are no state DAG forward extremities remaining in the room
"""
# filter out events which don't belong in the state dag.
new_state_events_contexts = [
(e, ctx)
for e, ctx in event_contexts
if event_exists_in_state_dag(e) and isinstance(e, FrozenEventVMSC4242)
(e, ctx) for e, ctx in event_contexts if event_exists_in_state_dag(e)
]
if len(new_state_events_contexts) == 0:
# if there are no state events being persisted, then the fwd extremities of the state dag
@@ -973,19 +966,19 @@ class EventsPersistenceStorageController:
]
# We want to check that we are not missing any prev_state_events.
# To do this, we include rejected events in this check.
all_events = set(rejected_events + new_state_events)
all_new_state_events = set(rejected_events + new_state_events)
# First, verify that we know all prev_state_events. If we fail this check then we don't have
# a complete DAG and that is bad, so bail out.
# Start with them all missing.
missing_prev_state_events = {
e_id for event in all_events for e_id in event.prev_state_events
e_id for event in all_new_state_events for e_id in event.prev_state_events
}
# remove prev events which appear in all_events
missing_prev_state_events.difference_update(
event.event_id for event in all_events
event.event_id for event in all_new_state_events
)
# the rest of these events should be present in the DB. Some of them may be forward extremities,
# some may not be, that's ok.
@@ -1001,10 +994,9 @@ class EventsPersistenceStorageController:
room_id,
missing_prev_state_events,
)
msg = (ev.event_id for ev in all_events)
logger.error(
"_calculate_new_state_dag_extremities: was handling %s",
msg,
"_calculate_new_state_dag_extremities: was handling (max 10) %s",
[ev.event_id for ev in list(all_new_state_events)[:10]],
)
raise SynapseError(
code=500,
@@ -1041,6 +1033,8 @@ class EventsPersistenceStorageController:
msc4242_state_dag_forward_extremities_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).observe(len(result))
assert result, f"No state dag forward extremities left in room {room_id}!"
return result
async def _calculate_new_extremities(