diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index fe4f97d128..3e06819424 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -36,7 +36,7 @@ class EventFormatVersions: ROOM_V3 = 2 # MSC1659-style $hash event id format: used for room v3 ROOM_V4_PLUS = 3 # MSC1884-style $hash format: introduced for room v4 ROOM_V11_HYDRA_PLUS = 4 # MSC4291 room IDs as hashes: introduced for room HydraV11 - ROOM_VMSC4242 = 5 # MSC4242 state dags: adds prev_state_events, removes auth_events + ROOM_VMSC4242 = 5 # MSC4242 state dags: adds prev_state_events, removes auth_events KNOWN_EVENT_FORMAT_VERSIONS = { diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index de9487e3dd..b75ce107c4 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -635,11 +635,8 @@ class EventsPersistenceStorageController: # Get the room version for the first event. This room version is the same for all events # as events_and_contexts is all for one room. - room_version = ( - events_and_contexts[0][0].room_version - if len(events_and_contexts) > 0 - else None - ) + assert len(events_and_contexts) > 0 + room_version = events_and_contexts[0][0].room_version for chunk in chunks: # We can't easily parallelize these since different chunks @@ -656,17 +653,16 @@ class EventsPersistenceStorageController: name="_process_state_dag_forward_extremities_and_state_delta", server_name=self.server_name, ): + assert all( + isinstance(ev, FrozenEventVMSC4242) for ev, _ in chunk + ) ( new_forward_extremities, # for prev_events state_delta_for_room, # for state groups new_state_dag_extrems, # for prev_state_events ) = await self._process_state_dag_forward_extremities_and_state_delta( room_id, - [ - (ev, ctx) - for ev, ctx in chunk - if isinstance(ev, FrozenEventVMSC4242) - ], + cast(list[tuple[FrozenEventVMSC4242, EventContext]], chunk), ) else: with Measure( @@ -864,9 +860,6 @@ class EventsPersistenceStorageController: existing_state_dag_fwd_extrems, event_contexts, ) - assert new_state_dag_fwd_extrems, ( - f"No state dag forward extremities left in room {room_id}!" - ) # ...and the room DAG existing_room_dag_fwd_extrems = ( await self.main_store.get_latest_event_ids_in_room(room_id) @@ -896,6 +889,7 @@ class EventsPersistenceStorageController: existing_state_dag_fwd_extrems, new_state_dag_fwd_extrems, # do not prune forward extremities in the state DAG + # else we lose eventual delivery should_prune=False, ) @@ -946,12 +940,11 @@ class EventsPersistenceStorageController: Raises: SynapseError: if the new events include unknown prev_state_events + AssertionError: if there are no state DAG forward extremities remaining in the room """ # filter out events which don't belong in the state dag. new_state_events_contexts = [ - (e, ctx) - for e, ctx in event_contexts - if event_exists_in_state_dag(e) and isinstance(e, FrozenEventVMSC4242) + (e, ctx) for e, ctx in event_contexts if event_exists_in_state_dag(e) ] if len(new_state_events_contexts) == 0: # if there are no state events being persisted, then the fwd extremities of the state dag @@ -973,19 +966,19 @@ class EventsPersistenceStorageController: ] # We want to check that we are not missing any prev_state_events. # To do this, we include rejected events in this check. - all_events = set(rejected_events + new_state_events) + all_new_state_events = set(rejected_events + new_state_events) # First, verify that we know all prev_state_events. If we fail this check then we don't have # a complete DAG and that is bad, so bail out. # Start with them all missing. missing_prev_state_events = { - e_id for event in all_events for e_id in event.prev_state_events + e_id for event in all_new_state_events for e_id in event.prev_state_events } # remove prev events which appear in all_events missing_prev_state_events.difference_update( - event.event_id for event in all_events + event.event_id for event in all_new_state_events ) # the rest of these events should be present in the DB. Some of them may be forward extremities, # some may not be, that's ok. @@ -1001,10 +994,9 @@ class EventsPersistenceStorageController: room_id, missing_prev_state_events, ) - msg = (ev.event_id for ev in all_events) logger.error( - "_calculate_new_state_dag_extremities: was handling %s", - msg, + "_calculate_new_state_dag_extremities: was handling (max 10) %s", + [ev.event_id for ev in list(all_new_state_events)[:10]], ) raise SynapseError( code=500, @@ -1041,6 +1033,8 @@ class EventsPersistenceStorageController: msc4242_state_dag_forward_extremities_counter.labels( **{SERVER_NAME_LABEL: self.server_name} ).observe(len(result)) + + assert result, f"No state dag forward extremities left in room {room_id}!" return result async def _calculate_new_extremities(