From 7801e68a33b174e42ec3ad4bf56fb21330e452e0 Mon Sep 17 00:00:00 2001 From: Kegan Dougal <7190048+kegsay@users.noreply.github.com> Date: Fri, 19 Sep 2025 10:09:18 +0100 Subject: [PATCH] Use multi-writer streams for sticky events --- synapse/app/generic_worker.py | 2 + synapse/config/workers.py | 2 +- synapse/replication/tcp/handler.py | 7 + synapse/replication/tcp/streams/__init__.py | 3 + synapse/replication/tcp/streams/_base.py | 42 +++ synapse/storage/databases/main/__init__.py | 2 + synapse/storage/databases/main/events.py | 247 +----------- .../storage/databases/main/sticky_events.py | 351 ++++++++++++++++++ .../schema/main/delta/93/01_sticky_events.sql | 5 +- .../93/01_sticky_events_seq.sql.postgres | 19 + 10 files changed, 439 insertions(+), 241 deletions(-) create mode 100644 synapse/storage/databases/main/sticky_events.py create mode 100644 synapse/storage/schema/main/delta/93/01_sticky_events_seq.sql.postgres diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 4f5bea6bd6..742b2af081 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -23,6 +23,7 @@ import logging import sys from typing import Dict, List +from synapse.storage.databases.main.sticky_events import StickyEventsWorkerStore from twisted.web.resource import Resource import synapse @@ -136,6 +137,7 @@ class GenericWorkerStore( RoomWorkerStore, DirectoryWorkerStore, ThreadSubscriptionsWorkerStore, + StickyEventsWorkerStore, PushRulesWorkerStore, ApplicationServiceTransactionWorkerStore, ApplicationServiceWorkerStore, diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 825ba78482..05f854e640 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -127,7 +127,7 @@ class WriterLocations: """Specifies the instances that write various streams. Attributes: - events: The instances that write to the event and backfill streams. 
+ events: The instances that write to the event, backfill and sticky events streams. typing: The instances that write to the typing stream. Currently can only be a single instance. to_device: The instances that write to the to_device stream. Currently diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index dd7e38dd78..4b01eac09d 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -74,6 +74,7 @@ from synapse.replication.tcp.streams import ( ) from synapse.replication.tcp.streams._base import ( DeviceListsStream, + StickyEventsStream, ThreadSubscriptionsStream, ) @@ -224,6 +225,12 @@ class ReplicationCommandHandler: continue + if isinstance(stream, StickyEventsStream): + if hs.get_instance_name() in hs.config.worker.writers.events: + self._streams_to_replicate.append(stream) + + continue + if isinstance(stream, DeviceListsStream): if hs.get_instance_name() in hs.config.worker.writers.device_lists: self._streams_to_replicate.append(stream) diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 25c15e5d48..96dddcc556 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -40,6 +40,7 @@ from synapse.replication.tcp.streams._base import ( PushersStream, PushRulesStream, ReceiptsStream, + StickyEventsStream, Stream, ThreadSubscriptionsStream, ToDeviceStream, @@ -68,6 +69,7 @@ STREAMS_MAP = { ToDeviceStream, FederationStream, AccountDataStream, + StickyEventsStream, ThreadSubscriptionsStream, UnPartialStatedRoomStream, UnPartialStatedEventStream, @@ -88,6 +90,7 @@ __all__ = [ "DeviceListsStream", "ToDeviceStream", "AccountDataStream", + "StickyEventsStream", "ThreadSubscriptionsStream", "UnPartialStatedRoomStream", "UnPartialStatedEventStream", diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index ec7e935d6a..bdc74e4b6a 100644 --- 
a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -766,3 +766,45 @@ class ThreadSubscriptionsStream(_StreamFromIdGen): return [], to_token, False return rows, rows[-1][0], len(updates) == limit + + +class StickyEventsStream(_StreamFromIdGen): + """A sticky event was changed.""" + + @attr.s(slots=True, auto_attribs=True) + class StickyEventsStreamRow: + """Stream to inform workers about changes to sticky events.""" + + room_id: str + event_id: str # The sticky event ID + + NAME = "sticky_events" + ROW_TYPE = StickyEventsStreamRow + + def __init__(self, hs: "HomeServer"): + self.store = hs.get_datastores().main + super().__init__( + hs.get_instance_name(), + self._update_function, + self.store._sticky_events_id_gen, + ) + + async def _update_function( + self, instance_name: str, from_token: int, to_token: int, limit: int + ) -> StreamUpdateResult: + updates = await self.store.get_updated_sticky_events( + from_id=from_token, to_id=to_token, limit=limit + ) + rows = [ + ( + stream_id, + # These are the args to `StickyEventsStreamRow` + (room_id, event_id), + ) + for stream_id, room_id, event_id in updates + ] + + if not rows: + return [], to_token, False + + return rows, rows[-1][0], len(updates) == limit diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index de55c452ae..afac2d0835 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -34,6 +34,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.sliding_sync import SlidingSyncStore from synapse.storage.databases.main.stats import UserSortOrder +from synapse.storage.databases.main.sticky_events import StickyEventsWorkerStore from synapse.storage.databases.main.thread_subscriptions import ( ThreadSubscriptionsWorkerStore, ) @@ -144,6 +145,7 @@ class DataStore( TagsStore, AccountDataStore, ThreadSubscriptionsWorkerStore, + 
StickyEventsWorkerStore, PushRulesWorkerStore, StreamWorkerStore, OpenIdStore, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 37742a0b1c..2dc687481c 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -22,7 +22,6 @@ import collections import itertools import logging -import time from collections import OrderedDict from typing import ( TYPE_CHECKING, @@ -49,7 +48,6 @@ from synapse.api.constants import ( EventTypes, Membership, RelationTypes, - StickyEvent, ) from synapse.api.errors import PartialStateConflictError from synapse.api.room_versions import RoomVersions @@ -253,6 +251,7 @@ class PersistEventsStore: self.database_engine = db.engine self._clock = hs.get_clock() self._instance_name = hs.get_instance_name() + self.msc4354_sticky_events = hs.config.experimental.msc4354_enabled self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages self.is_mine_id = hs.is_mine_id @@ -1176,13 +1175,15 @@ class PersistEventsStore: self._update_sliding_sync_tables_with_new_persisted_events_txn( txn, room_id, events_and_contexts ) - # process events which are sticky as well as re-evaluate soft-failed sticky events. - self._handle_sticky_events_txn( - txn, - room_id, - events_and_contexts, - state_delta_for_room, - ) + + if self.msc4354_sticky_events: + # process events which are sticky as well as re-evaluate soft-failed sticky events. + self.store.handle_sticky_events_txn( + txn, + room_id, + events_and_contexts, + state_delta_for_room, + ) def _persist_event_auth_chain_txn( self, @@ -2930,234 +2931,6 @@ class PersistEventsStore: }, ) - def _handle_sticky_events_txn( - self, - txn: LoggingTransaction, - room_id: str, - events_and_contexts: List[EventPersistencePair], - state_delta_for_room: Optional[DeltaState], - ) -> None: - """ - Update the sticky events table, used in MSC4354. 
- - This function assumes that `_store_event_txn()` (to persist the event) and - `_update_current_state_txn(...)` (so the current state has taken the events into account) - have already been run. - - "Handling" sticky events is broken into two phases: - - for each sticky event in events_and_contexts, mark them as sticky in the sticky events table. - - for each still-sticky soft-failed event in the room, re-evaluate soft-failedness. - - Args: - txn - room_id: The room that all of the events belong to - events_and_contexts: The events being persisted. - state_delta_for_room: The changes to the current state, used to detect if we need to - re-evaluate soft-failed sticky events. - """ - if len(events_and_contexts) == 0: - return - - # TODO: finish the impl - # fetch soft failed sticky events to recheck now, before we insert new sticky events, else - # we could incorrectly re-evaluate new sticky events - # event_ids_to_check = self._get_soft_failed_sticky_events_to_recheck(txn, room_id, state_delta_for_room) - # logger.info(f"_get_soft_failed_sticky_events_to_recheck => {event_ids_to_check}") - # recheck them and update any that now pass soft-fail checks. - # self._recheck_soft_failed_events(txn, room_id, event_ids_to_check) - - # insert brand new sticky events. - self._insert_sticky_events_txn(txn, events_and_contexts) - - def _insert_sticky_events_txn( - self, - txn: LoggingTransaction, - events_and_contexts: List[EventPersistencePair], - ) -> None: - sticky_events: List[EventBase] = [] - for ev, _ in events_and_contexts: - # MSC: Note: policy servers and other similar antispam techniques still apply to these events. - if ev.internal_metadata.policy_server_spammy: - continue - # We shouldn't be passed rejected events, but if we do, we filter them out too. 
- if ev.rejected_reason is not None: - continue - # MSC: The presence of sticky.duration_ms with a valid value makes the event “sticky” - sticky_obj = ev.get_dict().get(StickyEvent.FIELD_NAME, None) - if type(sticky_obj) is dict: - sticky_duration_ms = sticky_obj.get("duration_ms", None) - # MSC: Valid values are the integer range 0-3600000 (1 hour). - if ( - type(sticky_duration_ms) is int - and sticky_duration_ms >= 0 - and sticky_duration_ms <= 3600000 - ): - sticky_events.append(ev) - - # TODO: filter out already expired sticky events. - - if len(sticky_events) == 0: - return - logger.info( - "inserting %d sticky events in room %s", - len(sticky_events), - sticky_events[0].room_id, - ) - now_ms = round(time.time() * 1000) - self.db_pool.simple_insert_many_txn( - txn, - "sticky_events", - keys=("room_id", "event_id", "sender", "expires_at", "soft_failed"), - values=[ - ( - ev.room_id, - ev.event_id, - ev.sender, - # MSC: The start time is min(now, origin_server_ts). - # This ensures that malicious origin timestamps cannot specify start times in the future. - # Calculate the end time as start_time + min(sticky.duration_ms, 3600000). - min(ev.origin_server_ts, now_ms) - + min( - ev.get_dict()[StickyEvent.FIELD_NAME]["duration_ms"], 3600000 - ), - ev.internal_metadata.is_soft_failed(), - ) - for ev in sticky_events - ], - ) - - def _get_soft_failed_sticky_events_to_recheck( - self, - txn: LoggingTransaction, - room_id: str, - state_delta_for_room: Optional[DeltaState], - ) -> List[str]: - """Fetch soft-failed sticky events which should be rechecked against the current state. - - Soft-failed events are not rejected, so they pass auth at the state before - the event and at the auth_events in the event. Instead, soft-failed events failed auth at - the _current state of the room_. We only need to recheck soft failure if we have a reason to - believe the event may pass that check now. 
- - Note that we don't bother rechecking accepted events that may now be soft-failed, because - by that point it's too late as we've already sent the event to clients. - - Returns: - A list of event IDs to recheck - """ - - if state_delta_for_room is None: - # No change to current state => no way soft failure status could be different. - return [] - - # any change to critical auth events may change soft failure status. This means any changes - # to join rules, power levels or member events. If the state has changed but it isn't one - # of those events, we don't need to recheck. - critical_auth_types = ( - EventTypes.JoinRules, - EventTypes.PowerLevels, - EventTypes.Member, - ) - critical_auth_types_changed = set() - critical_auth_types_changed.update( - [ - typ - for typ, _ in state_delta_for_room.to_delete - if typ in critical_auth_types - ] - ) - critical_auth_types_changed.update( - [ - typ - for typ, _ in state_delta_for_room.to_insert - if typ in critical_auth_types - ] - ) - if len(critical_auth_types_changed) == 0: - # No change to critical auth events => no way soft failure status could be different. - return [] - - if critical_auth_types_changed == {EventTypes.Member}: - # the final case we want to catch is when unprivileged users join/leave rooms. These users cause - # changes in the critical auth types (the member event) but ultimately have no effect on soft - # failure status for anyone but that user themselves. - # - # Grab the set of senders that have been modified and see if any of them sent a soft-failed - # sticky event. If they did, then we need to re-evaluate. If they didn't, then we don't need to. 
- new_membership_changes = set( - [ - skey - for typ, skey in state_delta_for_room.to_insert - if typ == EventTypes.Member - ] - + [ - skey - for typ, skey in state_delta_for_room.to_delete - if typ == EventTypes.Member - ] - ) - # pull out senders of sticky events in this room - events_to_recheck: List[Tuple[str]] = self.db_pool.simple_select_many_txn( - txn, - table="sticky_events", - column="sender", - iterable=new_membership_changes, - keyvalues={ - "room_id": room_id, - "soft_failed": True, - }, - retcols=("event_id"), - ) - return [event_id for (event_id,) in events_to_recheck] - - # otherwise one of the following must be true: - # - there was a change in PL or join rules - # - there was a change in the membership of a sender of a soft-failed sticky event. - # In both of these cases we want to re-evaluate soft failure status. - # - # NB: event auth checks are NOT recursive. We don't need to specifically handle the case where - # an admin user's membership changes which causes a PL event to be allowed, as when the PL event - # gets allowed we will re-evaluate anyway. E.g: - # - # PL(send_event=0, sender=Admin) - # ^ ^_____________________ - # | | - # . PL(send_event=50, sender=Mod) sticky event (sender=User) - # - # In this scenario, the sticky event is soft-failed due to the Mod updating the PL event to - # set send_event=50, which User does not have. If we learn of an event which makes Mod's PL - # event invalid (say, Mod was banned by Admin concurrently to Mod setting the PL event), then - # the act of seeing the ban event will cause the old PL event to be in the state delta, meaning - # we will re-evaluate the sticky event due to the PL changing. 
We don't need to specially handle case.a - events_to_recheck = self.db_pool.simple_select_list_txn( - txn, - table="sticky_events", - keyvalues={ - "room_id": room_id, - "soft_failed": True, - }, - retcols=("event_id"), - ) - return [event_id for (event_id,) in events_to_recheck] - - def _recheck_soft_failed_events( - self, - txn: LoggingTransaction, - room_id: str, - event_ids: List[str], - ) -> None: - """ - Recheck authorised but soft-failed events. The provided event IDs must have already passed - all auth checks (so the event isn't rejected) but soft-failure checks. - - Args: - txn: The SQL transaction - room_id: The room the event IDs are in. - event_ids: The soft-failed events to re-evaluate. - """ - # We know the events are otherwise authorised, so we only need to load the current state - # and check if the events pass auth at the current state. - def insert_labels_for_event_txn( self, txn: LoggingTransaction, diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py new file mode 100644 index 0000000000..3f08a8bde9 --- /dev/null +++ b/synapse/storage/databases/main/sticky_events.py @@ -0,0 +1,351 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . 
+import logging +import time +from typing import ( + TYPE_CHECKING, + Any, + Iterable, + List, + Optional, + Tuple, +) + +from synapse.api.constants import EventTypes, StickyEvent +from synapse.events import EventBase +from synapse.events.snapshot import EventPersistencePair +from synapse.replication.tcp.streams._base import StickyEventsStream +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) +from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore +from synapse.storage.databases.main.events import DeltaState +from synapse.storage.util.id_generators import MultiWriterIdGenerator + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class StickyEventsWorkerStore(CacheInvalidationWorkerStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self._can_write_to_sticky_events = ( + self._instance_name in hs.config.worker.writers.events + ) + + self._sticky_events_id_gen: MultiWriterIdGenerator = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + notifier=hs.get_replication_notifier(), + stream_name="sticky_events", + server_name=self.server_name, + instance_name=self._instance_name, + tables=[ + ("sticky_events", "instance_name", "stream_id"), + ], + sequence_name="sticky_events_sequence", + writers=hs.config.worker.writers.events, + ) + + def process_replication_rows( + self, + stream_name: str, + instance_name: str, + token: int, + rows: Iterable[Any], + ) -> None: + super().process_replication_rows(stream_name, instance_name, token, rows) + + def process_replication_position( + self, stream_name: str, instance_name: str, token: int + ) -> None: + if stream_name == StickyEventsStream.NAME: + self._sticky_events_id_gen.advance(instance_name, token) + super().process_replication_position(stream_name, instance_name, 
token) + + def get_max_sticky_events_stream_id(self) -> int: + """Get the current maximum stream_id for sticky events. + + Returns: + The maximum stream_id + """ + return self._sticky_events_id_gen.get_current_token() + + def get_sticky_events_stream_id_generator(self) -> MultiWriterIdGenerator: + return self._sticky_events_id_gen + + async def get_updated_sticky_events( + self, from_id: int, to_id: int, limit: int + ) -> List[Tuple[int, str, str]]: + """Get updates to sticky events between two stream IDs. + + Args: + from_id: The starting stream ID (exclusive) + to_id: The ending stream ID (inclusive) + limit: The maximum number of rows to return + + Returns: + list of (stream_id, room_id, event_id) tuples + """ + return [] # TODO + + def handle_sticky_events_txn( + self, + txn: LoggingTransaction, + room_id: str, + events_and_contexts: List[EventPersistencePair], + state_delta_for_room: Optional[DeltaState], + ) -> None: + """Update the sticky events table, used in MSC4354. Intended to be called within the persist + events transaction. + + This function assumes that `_store_event_txn()` (to persist the event) and + `_update_current_state_txn(...)` (so the current state has taken the events into account) + have already been run. + + "Handling" sticky events is broken into two phases: + - for each sticky event in events_and_contexts, mark them as sticky in the sticky events table. + - for each still-sticky soft-failed event in the room, re-evaluate soft-failedness. + + Args: + txn + room_id: The room that all of the events belong to + events_and_contexts: The events being persisted. + state_delta_for_room: The changes to the current state, used to detect if we need to + re-evaluate soft-failed sticky events. 
+ """ + if len(events_and_contexts) == 0: + return + + # TODO: finish the impl + # fetch soft failed sticky events to recheck now, before we insert new sticky events, else + # we could incorrectly re-evaluate new sticky events + # event_ids_to_check = self._get_soft_failed_sticky_events_to_recheck(txn, room_id, state_delta_for_room) + # logger.info(f"_get_soft_failed_sticky_events_to_recheck => {event_ids_to_check}") + # recheck them and update any that now pass soft-fail checks. + # self._recheck_soft_failed_events(txn, room_id, event_ids_to_check) + + # insert brand new sticky events. + self._insert_sticky_events_txn(txn, events_and_contexts) + + def _insert_sticky_events_txn( + self, + txn: LoggingTransaction, + events_and_contexts: List[EventPersistencePair], + ) -> None: + now_ms = round(time.time() * 1000) + # event, expires_at, stream_id + sticky_events: List[Tuple[EventBase, int, int]] = [] + for ev, _ in events_and_contexts: + # MSC: Note: policy servers and other similar antispam techniques still apply to these events. + if ev.internal_metadata.policy_server_spammy: + continue + # We shouldn't be passed rejected events, but if we do, we filter them out too. + if ev.rejected_reason is not None: + continue + # MSC: The presence of sticky.duration_ms with a valid value makes the event “sticky” + sticky_obj = ev.get_dict().get(StickyEvent.FIELD_NAME, None) + if type(sticky_obj) is not dict: + continue + sticky_duration_ms = sticky_obj.get("duration_ms", None) + # MSC: Valid values are the integer range 0-3600000 (1 hour). + if ( + type(sticky_duration_ms) is int + and sticky_duration_ms >= 0 + and sticky_duration_ms <= 3600000 + ): + # MSC: The start time is min(now, origin_server_ts). + # This ensures that malicious origin timestamps cannot specify start times in the future. + # Calculate the end time as start_time + min(sticky.duration_ms, 3600000). 
+ expires_at = min(ev.origin_server_ts, now_ms) + min( + ev.get_dict()[StickyEvent.FIELD_NAME]["duration_ms"], 3600000 + ) + # filter out already expired sticky events + if expires_at > now_ms: + sticky_events.append( + (ev, expires_at, self._sticky_events_id_gen.get_next_txn(txn)) + ) + if len(sticky_events) == 0: + return + logger.info( + "inserting %d sticky events in room %s", + len(sticky_events), + sticky_events[0][0].room_id, + ) + self.db_pool.simple_insert_many_txn( + txn, + "sticky_events", + keys=( + "instance_name", + "stream_id", + "room_id", + "event_id", + "sender", + "expires_at", + "soft_failed", + ), + values=[ + ( + self._instance_name, + stream_id, + ev.room_id, + ev.event_id, + ev.sender, + expires_at, + ev.internal_metadata.is_soft_failed(), + ) + for (ev, expires_at, stream_id) in sticky_events + ], + ) + + def _get_soft_failed_sticky_events_to_recheck( + self, + txn: LoggingTransaction, + room_id: str, + state_delta_for_room: Optional[DeltaState], + ) -> List[str]: + """Fetch soft-failed sticky events which should be rechecked against the current state. + + Soft-failed events are not rejected, so they pass auth at the state before + the event and at the auth_events in the event. Instead, soft-failed events failed auth at + the _current state of the room_. We only need to recheck soft failure if we have a reason to + believe the event may pass that check now. + + Note that we don't bother rechecking accepted events that may now be soft-failed, because + by that point it's too late as we've already sent the event to clients. + + Returns: + A list of event IDs to recheck + """ + + if state_delta_for_room is None: + # No change to current state => no way soft failure status could be different. + return [] + + # any change to critical auth events may change soft failure status. This means any changes + # to join rules, power levels or member events. If the state has changed but it isn't one + # of those events, we don't need to recheck. 
+ critical_auth_types = ( + EventTypes.JoinRules, + EventTypes.PowerLevels, + EventTypes.Member, + ) + critical_auth_types_changed = set() + critical_auth_types_changed.update( + [ + typ + for typ, _ in state_delta_for_room.to_delete + if typ in critical_auth_types + ] + ) + critical_auth_types_changed.update( + [ + typ + for typ, _ in state_delta_for_room.to_insert + if typ in critical_auth_types + ] + ) + if len(critical_auth_types_changed) == 0: + # No change to critical auth events => no way soft failure status could be different. + return [] + + if critical_auth_types_changed == {EventTypes.Member}: + # the final case we want to catch is when unprivileged users join/leave rooms. These users cause + # changes in the critical auth types (the member event) but ultimately have no effect on soft + # failure status for anyone but that user themselves. + # + # Grab the set of senders that have been modified and see if any of them sent a soft-failed + # sticky event. If they did, then we need to re-evaluate. If they didn't, then we don't need to. + new_membership_changes = set( + [ + skey + for typ, skey in state_delta_for_room.to_insert + if typ == EventTypes.Member + ] + + [ + skey + for typ, skey in state_delta_for_room.to_delete + if typ == EventTypes.Member + ] + ) + # pull out senders of sticky events in this room + events_to_recheck: List[Tuple[str]] = self.db_pool.simple_select_many_txn( + txn, + table="sticky_events", + column="sender", + iterable=new_membership_changes, + keyvalues={ + "room_id": room_id, + "soft_failed": True, + }, + retcols=("event_id"), + ) + return [event_id for (event_id,) in events_to_recheck] + + # otherwise one of the following must be true: + # - there was a change in PL or join rules + # - there was a change in the membership of a sender of a soft-failed sticky event. + # In both of these cases we want to re-evaluate soft failure status. + # + # NB: event auth checks are NOT recursive. 
We don't need to specifically handle the case where + # an admin user's membership changes which causes a PL event to be allowed, as when the PL event + # gets allowed we will re-evaluate anyway. E.g: + # + # PL(send_event=0, sender=Admin) + # ^ ^_____________________ + # | | + # . PL(send_event=50, sender=Mod) sticky event (sender=User) + # + # In this scenario, the sticky event is soft-failed due to the Mod updating the PL event to + # set send_event=50, which User does not have. If we learn of an event which makes Mod's PL + # event invalid (say, Mod was banned by Admin concurrently to Mod setting the PL event), then + # the act of seeing the ban event will cause the old PL event to be in the state delta, meaning + # we will re-evaluate the sticky event due to the PL changing. We don't need to specially handle that case. + events_to_recheck = self.db_pool.simple_select_list_txn( + txn, + table="sticky_events", + keyvalues={ + "room_id": room_id, + "soft_failed": True, + }, + retcols=("event_id"), + ) + return [event_id for (event_id,) in events_to_recheck] + + def _recheck_soft_failed_events( + self, + txn: LoggingTransaction, + room_id: str, + event_ids: List[str], + ) -> None: + """ + Recheck authorised but soft-failed events. The provided event IDs must have already passed + all auth checks (so the event isn't rejected) except the soft-failure checks. + + Args: + txn: The SQL transaction + room_id: The room the event IDs are in. + event_ids: The soft-failed events to re-evaluate. + """ + # We know the events are otherwise authorised, so we only need to load the current state + # and check if the events pass auth at the current state. 
diff --git a/synapse/storage/schema/main/delta/93/01_sticky_events.sql b/synapse/storage/schema/main/delta/93/01_sticky_events.sql index e3288968de..5f94860130 100644 --- a/synapse/storage/schema/main/delta/93/01_sticky_events.sql +++ b/synapse/storage/schema/main/delta/93/01_sticky_events.sql @@ -11,10 +11,9 @@ -- See the GNU Affero General Public License for more details: -- . -CREATE SEQUENCE IF NOT EXISTS sticky_events_seq; - CREATE TABLE IF NOT EXISTS sticky_events( - id BIGINT PRIMARY KEY DEFAULT nextval('sticky_events_seq'), + stream_id INTEGER NOT NULL PRIMARY KEY, + instance_name TEXT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL, sender TEXT NOT NULL, diff --git a/synapse/storage/schema/main/delta/93/01_sticky_events_seq.sql.postgres b/synapse/storage/schema/main/delta/93/01_sticky_events_seq.sql.postgres new file mode 100644 index 0000000000..e4f4ff5798 --- /dev/null +++ b/synapse/storage/schema/main/delta/93/01_sticky_events_seq.sql.postgres @@ -0,0 +1,19 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +CREATE SEQUENCE sticky_events_sequence + -- Synapse streams start at 2, because the default position is 1 + -- so any item inserted at position 1 is ignored. + -- This is also what existing streams do, except they use `setval(..., 1)` + -- which is semantically the same except less obvious. + START WITH 2;