From 7af74298b36e7c4153c1edb7d36e736042e513c8 Mon Sep 17 00:00:00 2001 From: Kegan Dougal <7190048+kegsay@users.noreply.github.com> Date: Mon, 22 Sep 2025 10:16:00 +0100 Subject: [PATCH] Don't include expired sticky events in /sync responses --- synapse/handlers/sync.py | 6 +++++- synapse/storage/databases/main/sticky_events.py | 13 ++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f9c5e3b3c0..6f8010fe82 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -20,6 +20,7 @@ # import itertools import logging +import time from typing import ( TYPE_CHECKING, AbstractSet, @@ -626,6 +627,7 @@ class SyncHandler: events are included, and a dict mapping from room_id to a list of sticky event IDs for that room. """ + now = round(time.time() * 1000) with Measure( self.clock, name="sticky_events_by_room", server_name=self.server_name ): @@ -634,7 +636,9 @@ class SyncHandler: room_ids = sync_result_builder.joined_room_ids to_id, sticky_by_room = await self.store.get_sticky_events_in_rooms( - room_ids, from_id + room_ids, + from_id, + now, ) now_token = now_token.copy_and_replace(StreamKeyType.STICKY_EVENTS, to_id) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index 7f8082421c..925a640f56 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -103,6 +103,7 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore): self, room_ids: Collection[str], from_id: int, + now: int, ) -> Tuple[int, Dict[str, Set[str]]]: """ Fetch all the sticky events in the given rooms, from the given sticky stream ID. @@ -110,6 +111,7 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore): Args: room_ids: The room IDs to return sticky events in. from_id: The sticky stream ID that sticky events should be returned from. + now: The current time in unix millis, used for skipping expired events. Returns: A tuple of (to_id, map[room_id, event_ids]) """ @@ -118,6 +120,7 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore): self._get_sticky_events_in_rooms_txn, room_ids, from_id, + now, ) to_id = from_id room_to_events: Dict[str, Set[str]] = {} @@ -133,6 +136,7 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore): txn: LoggingTransaction, room_ids: Collection[str], from_id: int, + now: int, ) -> List[Tuple[int, str, str]]: if len(room_ids) == 0: return [] @@ -141,9 +145,9 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore): ) txn.execute( f""" - SELECT stream_id, room_id, event_id FROM sticky_events WHERE soft_failed=FALSE AND stream_id > ? AND {clause} + SELECT stream_id, room_id, event_id FROM sticky_events WHERE soft_failed=FALSE AND expires_at > ? AND stream_id > ? AND {clause} """, - (from_id, room_id_values), + (now, from_id, room_id_values), ) return cast(List[Tuple[int, str, str]], txn.fetchall()) @@ -225,7 +229,7 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore): txn: LoggingTransaction, events_and_contexts: List[EventPersistencePair], ) -> None: - now_ms = round(time.time() * 1000) + now_ms = self._now() # event, expires_at, stream_id sticky_events: List[Tuple[EventBase, int, int]] = [] for ev, _ in events_and_contexts: @@ -421,3 +425,6 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore): """ # We know the events are otherwise authorised, so we only need to load the current state # and check if the events pass auth at the current state. + + def _now(self) -> int: + return round(time.time() * 1000)