From ba9dd0f68e1bdee8a928f356bd63b230f65c7ba4 Mon Sep 17 00:00:00 2001 From: Coding Assistant Date: Fri, 22 May 2026 11:15:40 +0100 Subject: [PATCH] test: add regression test for to-device replication drop bug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When another writer advances past the local writer, Stream.get_updates() early-return path was jumping last_token to current_token (inflated by the other writer) instead of minimal_local_current_token (the local writer's actual position). This caused subsequent local writes to be silently dropped — the streamer sent POSITION instead of RDATA, and synchrotron workers never woke up for the affected users. The test creates a real _StreamFromIdGen, triggers the advance, and asserts that last_token stays at minimal_local_current_token rather than jumping to the inflated current_token. --- tests/storage/test_id_generators.py | 113 ++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index 051c5de44d..097edb6d02 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -687,6 +687,119 @@ class WorkerMultiWriterIdGeneratorTestCase(MultiWriterIdGeneratorBase): self.assertEqual(second_id_gen.get_current_token(), 7) + def test_get_updates_does_not_skip_after_other_writer_advance(self) -> None: + """Test that Stream.get_updates() does not silently skip local writes + after another writer's advance inflates _max_position_of_local_instance. + + Regression test for a bug where the replication streamer sent POSITION + (no data) instead of RDATA (with data), causing to-device messages to + be silently dropped on synchrotron workers. + + The bug scenario: + 1. Writers "first" and "second" share a MultiWriterIdGenerator + 2. Both write, then replicate. Stream's last_token = 7. + 3. "second" writes more (positions 8-10) and notifies "first" via + advance(). This inflates first's _max_position_of_local_instance + to 10, even though first's actual position is still 3. + 4. get_updates() is called (notifier poked by advance()). + With the bug: last_token jumps from 7 to 10 in the early-return + path, because current_token=10 but minimal_local_current_token=3. + 5. When "first" later writes at 11, last_token=10 < minimal=11, so + the DB is queried. But if "first" had written at e.g. 8 (allocated + before the advance), that data at position 8 would be silently + dropped because last_token was already jumped past it. + + To test this concretely, we create a real _StreamFromIdGen and call + its actual get_updates() method. + """ + from synapse.replication.tcp.streams._base import _StreamFromIdGen + + self._insert_rows("first", 3) + first_id_gen = self._create_id_generator("first", writers=["first", "second"]) + + self._insert_rows("second", 4) + second_id_gen = self._create_id_generator( + "second", writers=["first", "second"] + ) + + self._replicate_all() + + # Create a real stream wrapping the id generator, with a mock + # update_function that records what range was queried. + db_query_ranges: list[tuple[int, int]] = [] + + async def mock_update_function( + instance_name: str, from_token: int, upto_token: int, limit: int + ) -> tuple[list, int, bool]: + db_query_ranges.append((from_token, upto_token)) + return [], upto_token, False + + stream = _StreamFromIdGen("first", mock_update_function, first_id_gen) + + # Sanity: after construction, stream.last_token == current_token == 7 + self.assertEqual(stream.last_token, 7) + + # "second" writes more (8, 9, 10) and replicates to "first" + async def _second_writes() -> None: + async with second_id_gen.get_next_mult(3): + pass + + self.get_success(_second_writes()) + first_id_gen.advance("second", 10) + + # State: first at 3, second at 10, current_token = 10, minimal = 3 + self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 10) + self.assertEqual(first_id_gen.get_minimal_local_current_token(), 3) + + # Call get_updates() — this is where the bug manifests. + # With the bug: early return jumps last_token from 7 to 10. + # With the fix: last_token advances only to minimal = 3. + updates, new_token, limited = self.get_success(stream.get_updates()) + + self.assertEqual( + updates, + [], + "No updates expected: first hasn't written anything new", + ) + self.assertEqual( + db_query_ranges, + [], + "DB should not have been queried: first hasn't written anything", + ) + + # THE KEY ASSERTION: last_token must not have jumped past 3. + # With the bug it jumps to 10, which means any future write by + # "first" at a position <= 10 would be silently dropped. + self.assertEqual( + stream.last_token, + 3, + f"last_token should be 3 (minimal_local_current_token), not " + f"{stream.last_token}. The bug causes last_token to jump to " + f"current_token (10), silently skipping future local writes.", + ) + + # Now "first" writes at position 11 and finishes + async def _first_writes() -> None: + async with first_id_gen.get_next() as stream_id: + self.assertEqual(stream_id, 11) + + self.get_success(_first_writes()) + + # get_updates() should now query the DB for the new data + db_query_ranges.clear() + updates, new_token, limited = self.get_success(stream.get_updates()) + + # last_token was 3, minimal is now 11, so 3 < 11: should query DB + self.assertEqual( + len(db_query_ranges), + 1, + f"Expected 1 DB query for first's new data, got {len(db_query_ranges)}: " + f"{db_query_ranges}. The stream should query from last_token (3) " + f"to current_token ({new_token}).", + ) + self.assertEqual(db_query_ranges[0][0], 3, "Query should start from last_token") + + class BackwardsMultiWriterIdGeneratorTestCase(MultiWriterIdGeneratorBase): """Tests MultiWriterIdGenerator that produce *negative* stream IDs."""