mirror of
https://github.com/element-hq/synapse.git
synced 2026-06-07 00:12:02 +00:00
Don't include expired sticky events in /sync responses
This commit is contained in:
@@ -20,6 +20,7 @@
|
||||
#
|
||||
import itertools
|
||||
import logging
|
||||
import time
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
AbstractSet,
|
||||
@@ -626,6 +627,7 @@ class SyncHandler:
|
||||
events are included, and a dict mapping from room_id to a list of
|
||||
sticky event IDs for that room.
|
||||
"""
|
||||
now = round(time.time() * 1000)
|
||||
with Measure(
|
||||
self.clock, name="sticky_events_by_room", server_name=self.server_name
|
||||
):
|
||||
@@ -634,7 +636,9 @@ class SyncHandler:
|
||||
room_ids = sync_result_builder.joined_room_ids
|
||||
|
||||
to_id, sticky_by_room = await self.store.get_sticky_events_in_rooms(
|
||||
room_ids, from_id
|
||||
room_ids,
|
||||
from_id,
|
||||
now,
|
||||
)
|
||||
now_token = now_token.copy_and_replace(StreamKeyType.STICKY_EVENTS, to_id)
|
||||
|
||||
|
||||
@@ -103,6 +103,7 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore):
|
||||
self,
|
||||
room_ids: Collection[str],
|
||||
from_id: int,
|
||||
now: int,
|
||||
) -> Tuple[int, Dict[str, Set[str]]]:
|
||||
"""
|
||||
Fetch all the sticky events in the given rooms, from the given sticky stream ID.
|
||||
@@ -110,6 +111,7 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore):
|
||||
Args:
|
||||
room_ids: The room IDs to return sticky events in.
|
||||
from_id: The sticky stream ID that sticky events should be returned from.
|
||||
now: The current time in unix millis, used for skipping expired events.
|
||||
Returns:
|
||||
A tuple of (to_id, map[room_id, event_ids])
|
||||
"""
|
||||
@@ -118,6 +120,7 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore):
|
||||
self._get_sticky_events_in_rooms_txn,
|
||||
room_ids,
|
||||
from_id,
|
||||
now,
|
||||
)
|
||||
to_id = from_id
|
||||
room_to_events: Dict[str, Set[str]] = {}
|
||||
@@ -133,6 +136,7 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore):
|
||||
txn: LoggingTransaction,
|
||||
room_ids: Collection[str],
|
||||
from_id: int,
|
||||
now: int,
|
||||
) -> List[Tuple[int, str, str]]:
|
||||
if len(room_ids) == 0:
|
||||
return []
|
||||
@@ -141,9 +145,9 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore):
|
||||
)
|
||||
txn.execute(
|
||||
f"""
|
||||
SELECT stream_id, room_id, event_id FROM sticky_events WHERE soft_failed=FALSE AND stream_id > ? AND {clause}
|
||||
SELECT stream_id, room_id, event_id FROM sticky_events WHERE soft_failed=FALSE AND expires_at > ? AND stream_id > ? AND {clause}
|
||||
""",
|
||||
(from_id, room_id_values),
|
||||
(now, from_id, room_id_values),
|
||||
)
|
||||
return cast(List[Tuple[int, str, str]], txn.fetchall())
|
||||
|
||||
@@ -225,7 +229,7 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore):
|
||||
txn: LoggingTransaction,
|
||||
events_and_contexts: List[EventPersistencePair],
|
||||
) -> None:
|
||||
now_ms = round(time.time() * 1000)
|
||||
now_ms = self._now()
|
||||
# event, expires_at, stream_id
|
||||
sticky_events: List[Tuple[EventBase, int, int]] = []
|
||||
for ev, _ in events_and_contexts:
|
||||
@@ -421,3 +425,6 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore):
|
||||
"""
|
||||
# We know the events are otherwise authorised, so we only need to load the current state
|
||||
# and check if the events pass auth at the current state.
|
||||
|
||||
def _now(self) -> int:
|
||||
return round(time.time() * 1000)
|
||||
|
||||
Reference in New Issue
Block a user