Fill in stream_ordering/bump_stamp when we add current state to the joined rooms table

This commit is contained in:
Eric Eastwood
2024-08-08 15:41:55 -05:00
parent bc3796d333
commit cc2d2b6b9f
4 changed files with 75 additions and 25 deletions

View File

@@ -74,7 +74,12 @@ from synapse.types import (
StreamToken,
UserID,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
from synapse.types.handlers import (
SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
OperationType,
SlidingSyncConfig,
SlidingSyncResult,
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import concurrently_execute
from synapse.visibility import filter_events_for_client
@@ -91,18 +96,6 @@ class Sentinel(enum.Enum):
UNSET_SENTINEL = object()
# The event types that clients should consider as new activity.
DEFAULT_BUMP_EVENT_TYPES = {
EventTypes.Create,
EventTypes.Message,
EventTypes.Encrypted,
EventTypes.Sticker,
EventTypes.CallInvite,
EventTypes.PollStart,
EventTypes.LiveLocationShareStart,
}
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _RoomMembershipForUser:
"""
@@ -2174,7 +2167,9 @@ class SlidingSyncHandler:
# Figure out the last bump event in the room
last_bump_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
room_id,
to_token.room_key,
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
)
)

View File

@@ -73,6 +73,7 @@ from synapse.types import (
StrCollection,
get_domain_from_id,
)
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
from synapse.util import json_encoder
from synapse.util.iterutils import batch_iter, sorted_topologically
from synapse.util.stringutils import non_null_str_or_none
@@ -1308,6 +1309,9 @@ class PersistEventsStore:
)
# Update the `sliding_sync_non_join_memberships` table
#
# Pulling keys/values separately is safe and will produce congruent
# lists
insert_keys = sliding_sync_non_joined_rooms_insert_map.keys()
insert_values = sliding_sync_non_joined_rooms_insert_map.values()
txn.execute_batch(
@@ -1452,6 +1456,7 @@ class PersistEventsStore:
create_event_id = None
room_encryption_event_id = None
room_name_event_id = None
bump_event_id = None
for state_key, event_id in to_insert.items():
if state_key[0] == EventTypes.Create and state_key[1] == "":
create_event_id = event_id
@@ -1463,6 +1468,12 @@ class PersistEventsStore:
room_name_event_id = event_id
event_ids_to_fetch.append(event_id)
if (
state_key[0] in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
and state_key[1] == ""
):
bump_event_id = event_id
# Map of values to insert/update in the `sliding_sync_joined_rooms` table
sliding_sync_joined_rooms_insert_map: Dict[
str, Optional[Union[str, bool]]
@@ -1515,23 +1526,57 @@ class PersistEventsStore:
)
# Update the `sliding_sync_joined_rooms` table
args: List[Any] = [
room_id,
# Even though `Mapping`/`Dict` have no guaranteed order, some
# implementations may preserve insertion order so we're just going to
# choose the best possible answer by using the "last" event ID which we
# will assume will have the greatest `stream_ordering`. We really just
# need *some* answer in case we are the first ones inserting into the
# table and this will resolve itself when we update this field in the
# persist events loop.
list(to_insert.values())[-1],
]
# If we have a `bump_event_id`, let's update the `bump_stamp` column
bump_stamp_column = ""
bump_stamp_values_clause = ""
if bump_event_id is not None:
bump_stamp_column = "bump_stamp, "
bump_stamp_values_clause = (
"(SELECT stream_ordering FROM events WHERE event_id = ?),"
)
args.append(bump_event_id)
# Pulling keys/values separately is safe and will produce congruent lists
insert_keys = sliding_sync_joined_rooms_insert_map.keys()
insert_values = sliding_sync_joined_rooms_insert_map.values()
args.extend(iter(insert_values))
if len(insert_keys) > 0:
# TODO: Should we add `event_stream_ordering`, `bump_stamp` on insert?
# We don't update `bump_stamp` `ON CONFLICT` because we're dealing with
# state here and the only state event that is also a bump event type is
# `m.room.create`. Given the room creation event is the first one in the
# room, it's either going to be set on insert, or we've already moved on
# to other events with a greater `stream_ordering`/`bump_stamp` and we
# don't need to even try.
txn.execute(
f"""
INSERT INTO sliding_sync_joined_rooms
(room_id, {", ".join(insert_keys)})
(room_id, event_stream_ordering, {bump_stamp_column} {", ".join(insert_keys)})
VALUES (
?,
(SELECT stream_ordering FROM events WHERE event_id = ?),
{bump_stamp_values_clause}
{", ".join("?" for _ in insert_values)}
)
ON CONFLICT (room_id)
DO UPDATE SET
{", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
{", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)},
event_stream_ordering = CASE
WHEN event_stream_ordering < EXCLUDED.event_stream_ordering
THEN EXCLUDED.event_stream_ordering
ELSE event_stream_ordering
END
""",
[room_id] + list(insert_values),
args,
)
# We now update `local_current_membership`. We do this regardless

View File

@@ -30,6 +30,7 @@ if TYPE_CHECKING or HAS_PYDANTIC_V2:
else:
from pydantic import Extra
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.types import (
DeviceListUpdates,
@@ -45,6 +46,18 @@ from synapse.types.rest.client import SlidingSyncBody
if TYPE_CHECKING:
from synapse.handlers.relations import BundledAggregations
# Sliding Sync: The event types that clients should consider as new activity and affect
# the `bump_stamp`
SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES = {
EventTypes.Create,
EventTypes.Message,
EventTypes.Encrypted,
EventTypes.Sticker,
EventTypes.CallInvite,
EventTypes.PollStart,
EventTypes.LiveLocationShareStart,
}
class ShutdownRoomParams(TypedDict):
"""

View File

@@ -523,10 +523,9 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
room.register_servlets,
]
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self.store = self.hs.get_datastores().main
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.storage_controllers = hs.get_storage_controllers()
def _get_sliding_sync_joined_rooms(self) -> Dict[str, _SlidingSyncJoinedRoomResult]:
"""
@@ -615,10 +614,8 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
# User1 joins the room
self.helper.join(room_id1, user1_id, tok=user1_tok)