test: add regression test for to-device replication drop bug

When another writer advanced past the local writer, the early-return path
of Stream.get_updates() jumped last_token to current_token (inflated by
the other writer) instead of minimal_local_current_token (the local
writer's actual position). This caused subsequent local writes to be
silently dropped — the streamer sent POSITION instead of RDATA, and
synchrotron workers never woke up for the affected users.

The test creates a real _StreamFromIdGen, triggers the advance, and
asserts that last_token ends up at minimal_local_current_token rather
than jumping to the inflated current_token.
This commit is contained in:
Coding Assistant
2026-05-22 11:15:40 +01:00
parent 4655b435ee
commit ba9dd0f68e
+113
View File
@@ -687,6 +687,119 @@ class WorkerMultiWriterIdGeneratorTestCase(MultiWriterIdGeneratorBase):
self.assertEqual(second_id_gen.get_current_token(), 7)
def test_get_updates_does_not_skip_after_other_writer_advance(self) -> None:
    """Check that `get_updates()` does not move `last_token` past the local
    writer's position after another writer's `advance()`.

    Regression test. In the scenario this test was written for, the
    early-return path of `Stream.get_updates()` set `last_token` to the
    current token (inflated by the other writer) instead of the minimal
    local current token. The replication streamer then sent POSITION (no
    data) where RDATA (with data) was needed, and to-device messages were
    silently dropped on synchrotron workers.

    Steps exercised:
        1. `_insert_rows` is called for "first" (3) and "second" (4), an id
           generator is created for each, and everything is replicated. A
           `_StreamFromIdGen` built on the "first" generator is asserted to
           start with `last_token == 7`.
        2. "second" allocates three more IDs and "first" is informed via
           `advance("second", 10)`. The test asserts that "first" now
           reports 10 from `get_current_token_for_writer("first")` while
           `get_minimal_local_current_token()` is still 3.
        3. `get_updates()` must return no rows, must not invoke the update
           function, and must leave `last_token` at 3 rather than 10.
        4. "first" then writes stream ID 11. The next `get_updates()` must
           invoke the update function exactly once, starting from token 3.

    A real `_StreamFromIdGen` is used so that its actual `get_updates()`
    runs; only the update function is replaced by a recorder.
    """
    from synapse.replication.tcp.streams._base import _StreamFromIdGen

    # --- Setup: two writers sharing the stream, fully replicated. ---
    self._insert_rows("first", 3)
    first_id_gen = self._create_id_generator("first", writers=["first", "second"])

    self._insert_rows("second", 4)
    second_id_gen = self._create_id_generator(
        "second", writers=["first", "second"]
    )

    self._replicate_all()

    # One (from_token, upto_token) entry is appended per update-function
    # call, so an empty list means the stream never asked for rows.
    db_query_ranges: list[tuple[int, int]] = []

    async def record_query_range(
        instance_name: str, from_token: int, upto_token: int, limit: int
    ) -> tuple[list, int, bool]:
        db_query_ranges.append((from_token, upto_token))
        return [], upto_token, False

    stream = _StreamFromIdGen("first", record_query_range, first_id_gen)

    # Precondition: the freshly built stream starts at token 7.
    self.assertEqual(stream.last_token, 7)

    # --- "second" moves ahead (8, 9, 10) and "first" hears about it. ---
    async def _allocate_three_as_second() -> None:
        async with second_id_gen.get_next_mult(3):
            pass

    self.get_success(_allocate_three_as_second())
    first_id_gen.advance("second", 10)

    # The per-writer current token for "first" is now 10, while the minimal
    # local current token still reports 3.
    self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 10)
    self.assertEqual(first_id_gen.get_minimal_local_current_token(), 3)

    # --- First poll: nothing new from "first", so nothing to fetch. ---
    # Buggy behaviour: the early return moves last_token from 7 to 10.
    # Expected behaviour: last_token ends up at the minimal token, 3.
    updates, _new_token, _limited = self.get_success(stream.get_updates())

    no_updates_msg = "No updates expected: first hasn't written anything new"
    self.assertEqual(updates, [], no_updates_msg)

    no_query_msg = "DB should not have been queried: first hasn't written anything"
    self.assertEqual(db_query_ranges, [], no_query_msg)

    # Core check: a last_token of 10 would mean any later write by "first"
    # at a position <= 10 gets silently skipped.
    last_token_msg = (
        f"last_token should be 3 (minimal_local_current_token), not "
        f"{stream.last_token}. The bug causes last_token to jump to "
        f"current_token (10), silently skipping future local writes."
    )
    self.assertEqual(stream.last_token, 3, last_token_msg)

    # --- "first" writes once; the allocated ID is asserted to be 11. ---
    async def _write_once_as_first() -> None:
        async with first_id_gen.get_next() as stream_id:
            self.assertEqual(stream_id, 11)

    self.get_success(_write_once_as_first())

    # --- Second poll: last_token (3) < minimal (11), so rows are fetched. ---
    db_query_ranges.clear()
    _, new_token, _ = self.get_success(stream.get_updates())

    query_count_msg = (
        f"Expected 1 DB query for first's new data, got {len(db_query_ranges)}: "
        f"{db_query_ranges}. The stream should query from last_token (3) "
        f"to current_token ({new_token})."
    )
    self.assertEqual(len(db_query_ranges), 1, query_count_msg)

    first_query_start = db_query_ranges[0][0]
    self.assertEqual(first_query_start, 3, "Query should start from last_token")
class BackwardsMultiWriterIdGeneratorTestCase(MultiWriterIdGeneratorBase):
"""Tests MultiWriterIdGenerator that produce *negative* stream IDs."""