Send a SSS response immediately if the config has changed and there are new results to sync (#19714)

This fixes the bug described in #19713 (and double-checked against the
SDK integration test, which now passes with this change). A sync
response must be returned immediately if a room subscription
configuration change caused a new non-empty response (checked with `if
response` in the code) to be produced.

Fixes #19713.
Fixes #18844.

---------

Co-authored-by: Erik Johnston <erik@matrix.org>
This commit is contained in:
Benjamin Bouvier
2026-04-24 12:18:05 +02:00
committed by Erik Johnston
parent 966e193e4e
commit 0b910449ef
4 changed files with 156 additions and 28 deletions
+1
View File
@@ -0,0 +1 @@
Have SSS return a new response immediately if a room subscription have changed and produced a new response.
+28 -24
View File
@@ -184,34 +184,38 @@ class SlidingSyncHandler:
timeout_ms -= after_wait_ts - before_wait_ts
timeout_ms = max(timeout_ms, 0)
# We're going to respond immediately if the timeout is 0 or if this is an
# initial sync (without a `from_token`) so we can avoid calling
# `notifier.wait_for_events()`.
if timeout_ms == 0 or from_token is None:
now_token = self.event_sources.get_current_token()
result = await self.current_sync_for_user(
# Compute a response immediately. We always need to do this before
# waiting for new data (unlike in /v3/sync), as the request config might
# have changed (e.g. new room subscriptions, etc).
now_token = self.event_sources.get_current_token()
result = await self.current_sync_for_user(
sync_config,
from_token=from_token,
to_token=now_token,
)
# Return immediately if we have a result, the timeout is 0, or this is
# an initial sync.
if result or timeout_ms == 0 or from_token is None:
return result, did_wait
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> SlidingSyncResult:
return await self.current_sync_for_user(
sync_config,
from_token=from_token,
to_token=now_token,
to_token=after_token,
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> SlidingSyncResult:
return await self.current_sync_for_user(
sync_config,
from_token=from_token,
to_token=after_token,
)
result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
timeout_ms,
current_sync_callback,
from_token=from_token.stream_token,
)
did_wait = True
result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
timeout_ms,
current_sync_callback,
from_token=now_token,
)
did_wait = True
return result, did_wait
+8 -4
View File
@@ -852,11 +852,15 @@ class SlidingSyncRoomLists:
previous_connection_state.room_configs.get(room_id)
)
if prev_room_sync_config is not None:
# Always include rooms whose timeline limit has increased.
# (see the "XXX: Odd behavior" described below)
# Always include rooms whose effective config has
# expanded. This covers timeline-limit increases and
# required-state additions introduced by room
# subscriptions overriding list-derived params.
if (
prev_room_sync_config.timeline_limit
< room_config.timeline_limit
prev_room_sync_config.combine_room_sync_config(
room_config
)
!= prev_room_sync_config
):
rooms_should_send.add(room_id)
continue
@@ -22,6 +22,7 @@ import synapse.rest.admin
from synapse.api.constants import EventTypes, HistoryVisibility
from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util.clock import Clock
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
@@ -126,6 +127,124 @@ class SlidingSyncRoomSubscriptionsTestCase(SlidingSyncBase):
response_body["rooms"][room_id1],
)
def test_room_subscription_required_state_expansion_returns_immediately(
self,
) -> None:
"""
Test that adding a room subscription with stronger params than the list causes an
incremental long-poll to return immediately, even without new stream activity.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
sync_body: JsonDict = {
"lists": {
"foo-list": {
"ranges": [[0, 0]],
"required_state": [],
"timeline_limit": 0,
}
},
"conn_id": "conn_id",
}
_, from_token = self.do_sync(sync_body, tok=user1_tok)
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [
[EventTypes.Create, ""],
],
"timeline_limit": 0,
}
}
channel = self.make_request(
"POST",
self.sync_endpoint + f"?timeout=10000&pos={from_token}",
content=sync_body,
access_token=user1_tok,
await_result=False,
)
channel.await_result(timeout_ms=3000)
self.assertEqual(channel.code, 200, channel.json_body)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
room_response = channel.json_body["rooms"][room_id1]
self.assertNotIn("initial", room_response)
self._assertRequiredStateIncludes(
room_response["required_state"],
{
state_map[(EventTypes.Create, "")],
},
exact=True,
)
def test_room_subscription_required_state_change_returns_immediately(self) -> None:
"""
Test that expanding an existing room subscription's required state causes an
incremental long-poll to return immediately, even without new stream activity.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(
user1_id, tok=user1_tok, extra_content={"name": "Foo"}
)
sync_body: JsonDict = {
"room_subscriptions": {
room_id1: {
"required_state": [
[EventTypes.Create, ""],
],
"timeline_limit": 0,
}
},
"conn_id": "conn_id",
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
},
exact=True,
)
sync_body["room_subscriptions"][room_id1]["required_state"] = [
[EventTypes.Create, ""],
[EventTypes.Name, ""],
]
channel = self.make_request(
"POST",
self.sync_endpoint + f"?timeout=10000&pos={from_token}",
content=sync_body,
access_token=user1_tok,
await_result=False,
)
channel.await_result(timeout_ms=3000)
self.assertEqual(channel.code, 200, channel.json_body)
room_response = channel.json_body["rooms"][room_id1]
self.assertNotIn("initial", room_response)
self._assertRequiredStateIncludes(
room_response["required_state"],
{
state_map[(EventTypes.Name, "")],
},
exact=True,
)
def test_room_subscriptions_with_leave_membership(self) -> None:
"""
Test `room_subscriptions` with a leave room should give us timeline and state