From f2235f2e0bf3aed6fadd7eed9eaf67bb8354f262 Mon Sep 17 00:00:00 2001 From: Ivan Date: Thu, 7 May 2026 20:03:52 -0500 Subject: [PATCH] feat(tests): add integration tests for announce limits and spam handling, including max storage constraints and websocket broadcast behavior --- tests/backend/test_announce_dao_trim.py | 13 + tests/backend/test_announce_limits.py | 85 +++++ .../test_announce_limits_integration.py | 338 ++++++++++++++++++ tests/backend/test_announce_spam_sqlite.py | 218 +++++++++++ tests/backend/test_async_utils_critical.py | 108 ++++++ tests/backend/test_auto_propagation.py | 21 ++ tests/backend/test_interface_discovery.py | 125 ++++++- tests/backend/test_long_running_stress.py | 247 +++++++++++++ tests/backend/test_search_integration.py | 21 ++ tests/backend/test_websocket_scale.py | 51 +++ 10 files changed, 1226 insertions(+), 1 deletion(-) create mode 100644 tests/backend/test_announce_limits_integration.py create mode 100644 tests/backend/test_announce_spam_sqlite.py create mode 100644 tests/backend/test_async_utils_critical.py create mode 100644 tests/backend/test_long_running_stress.py diff --git a/tests/backend/test_announce_dao_trim.py b/tests/backend/test_announce_dao_trim.py index d78c6b2..c5ed439 100644 --- a/tests/backend/test_announce_dao_trim.py +++ b/tests/backend/test_announce_dao_trim.py @@ -53,6 +53,19 @@ def _cleanup(db, path): pass +def test_trim_announces_for_aspect_noop_when_max_rows_below_one(): + db = path = None + try: + db, path = _new_db() + aspect = "lxmf.delivery" + _insert(db, "01" * 16, aspect, "2000-01-01T00:00:00Z") + _insert(db, "02" * 16, aspect, "2000-01-02T00:00:00Z") + db.announces.trim_announces_for_aspect(aspect, 0) + assert db.announces.get_announce_count_by_aspect(aspect) == 2 + finally: + _cleanup(db, path) + + def test_trim_announces_for_aspect_drops_oldest(): db = path = None try: diff --git a/tests/backend/test_announce_limits.py b/tests/backend/test_announce_limits.py index 59bb9b4..4ce2c3c 100644 --- a/tests/backend/test_announce_limits.py +++ b/tests/backend/test_announce_limits.py @@ -166,6 +166,91 @@ def test_get_filtered_announces_resolves_default_limit(mock_db, mock_config): assert 33 in params +def test_max_stored_clamps_to_one_million(mock_db, mock_config): + mock_config.announce_max_stored_lxmf_delivery.get.return_value = 9_999_999 + + manager = _make_manager(mock_db, mock_config) + + assert manager._get_max_stored_for_aspect("lxmf.delivery") == 1_000_000 + + +def test_trim_called_with_clamped_max_stored(mock_db, mock_config): + mock_config.announce_max_stored_lxmf_delivery.get.return_value = 5_000_000 + + manager = _make_manager(mock_db, mock_config) + reticulum = MagicMock() + identity = MagicMock() + identity.hash.hex.return_value = "ab" * 16 + identity.get_public_key.return_value = b"pub_key" + + manager.upsert_announce( + reticulum, + identity, + b"\x01" * 16, + "lxmf.delivery", + b"app_data", + b"packet_hash", + ) + + mock_db.announces.trim_announces_for_aspect.assert_called_once_with( + "lxmf.delivery", + 1_000_000, + ) + + +def test_max_stored_zero_skips_cap(mock_db, mock_config): + mock_config.announce_max_stored_lxmf_delivery.get.return_value = 0 + + manager = _make_manager(mock_db, mock_config) + + assert manager._get_max_stored_for_aspect("lxmf.delivery") is None + + +def test_fetch_limit_clamps_to_hundred_thousand(mock_db, mock_config): + mock_config.announce_fetch_limit_lxmf_delivery = MagicMock() + mock_config.announce_fetch_limit_lxmf_delivery.get.return_value = 800_000 + + manager = _make_manager(mock_db, mock_config) + + assert manager._get_fetch_limit_for_aspect("lxmf.delivery") == 100_000 + + +def test_fetch_limit_invalid_falls_back_to_default(mock_db, mock_config): + mock_config.announce_fetch_limit_lxmf_delivery = MagicMock() + mock_config.announce_fetch_limit_lxmf_delivery.get.return_value = 0 + + manager = _make_manager(mock_db, mock_config) + + assert manager._get_fetch_limit_for_aspect("lxmf.delivery") == 500 + + +def test_fetch_limit_unknown_aspect_returns_default(mock_db, mock_config): + manager = _make_manager(mock_db, mock_config) + + assert manager._get_fetch_limit_for_aspect("unknown.aspect") == 500 + + +def test_fetch_limit_none_falls_back_to_default(mock_db, mock_config): + mock_config.announce_fetch_limit_lxmf_delivery = MagicMock() + mock_config.announce_fetch_limit_lxmf_delivery.get.return_value = None + + manager = _make_manager(mock_db, mock_config) + + assert manager._get_fetch_limit_for_aspect("lxmf.delivery") == 500 + + +def test_get_filtered_announces_uses_clamped_fetch_limit(mock_db, mock_config): + mock_config.announce_fetch_limit_lxmf_delivery = MagicMock() + mock_config.announce_fetch_limit_lxmf_delivery.get.return_value = 400_000 + + manager = _make_manager(mock_db, mock_config) + manager.get_filtered_announces(aspect="lxmf.delivery", limit=None) + + args, _ = mock_db.provider.fetchall.call_args + _sql, params = args + assert 100_000 in params + + def test_announce_handles_none_packet_hash(mock_db): manager = AnnounceManager(mock_db) reticulum = MagicMock() diff --git a/tests/backend/test_announce_limits_integration.py b/tests/backend/test_announce_limits_integration.py new file mode 100644 index 0000000..36b501d --- /dev/null +++ b/tests/backend/test_announce_limits_integration.py @@ -0,0 +1,338 @@ +# SPDX-License-Identifier: 0BSD + +"""Integration tests: announce row caps via AnnounceManager + real SQLite.""" + +import os +import tempfile +from unittest.mock import MagicMock + +import pytest + +from meshchatx.src.backend.announce_manager import AnnounceManager +from meshchatx.src.backend.database import Database +from meshchatx.src.backend.database.provider import DatabaseProvider + + +class _FakeIdentity: + __slots__ = ("_h",) + + def __init__(self, identity_hex32: str): + self._h = bytes.fromhex(identity_hex32) + + @property + def hash(self): + return self._h + + def get_public_key(self): + return b"\xaa\xbb" + + +def _cleanup(db, path): + if db is not None: + try: + db.close() + except Exception: + pass + DatabaseProvider._instance = None + if path: + try: + os.unlink(path) + except OSError: + pass + for suffix in ("-wal", "-shm"): + try: + os.unlink(path + suffix) + except OSError: + pass + + +def _new_db(): + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + path = f.name + db = Database(path) + db.initialize() + return db, path + + +def _store_enabled_config(**max_stored): + """Config mock with storage toggles on and configurable announce_max_stored_* .get() values.""" + config = MagicMock() + for _k in ( + "announce_store_lxmf_delivery", + "announce_store_lxst_telephony", + "announce_store_nomadnetwork_node", + "announce_store_lxmf_propagation", + "announce_store_git_repositories", + ): + m = MagicMock() + m.get.return_value = True + setattr(config, _k, m) + + for key, default in ( + ("announce_max_stored_lxmf_delivery", None), + ("announce_max_stored_nomadnetwork_node", None), + ("announce_max_stored_lxmf_propagation", None), + ): + attr = MagicMock() + attr.get.return_value = max_stored.get(key, default) + setattr(config, key, attr) + + return config + + +@pytest.fixture +def sqlite_db(): + db, path = _new_db() + yield db, path + _cleanup(db, path) + + +def test_many_sequential_upserts_trims_to_max(sqlite_db): + db, _path = sqlite_db + max_keep = 12 + cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=max_keep) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + + n_insert = 55 + for i in range(n_insert): + dh = f"{i:032x}" + ident = _FakeIdentity(f"{i:032x}") + mgr.upsert_announce( + ret, + ident, + bytes.fromhex(dh), + "lxmf.delivery", + b"payload", + None, + ) + + assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == max_keep + rows = db.announces.get_announces(aspect="lxmf.delivery") + kept = {r["destination_hash"] for r in rows} + expect = {f"{i:032x}" for i in range(n_insert - max_keep, n_insert)} + assert kept == expect + + +def test_aspect_max_limits_are_independent(sqlite_db): + db, _path = sqlite_db + cfg = _store_enabled_config( + announce_max_stored_lxmf_delivery=7, + announce_max_stored_nomadnetwork_node=4, + ) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + + for i in range(20): + dh = f"{i:032x}" + ident = _FakeIdentity(f"{i:032x}") + mgr.upsert_announce( + ret, + ident, + bytes.fromhex(dh), + "lxmf.delivery", + b"x", + None, + ) + + for i in range(15): + dh = f"{0x70000000 + i:032x}" + ident = _FakeIdentity(f"{0x71000000 + i:032x}") + mgr.upsert_announce( + ret, + ident, + bytes.fromhex(dh), + "nomadnetwork.node", + b"y", + None, + ) + + assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 7 + assert db.announces.get_announce_count_by_aspect("nomadnetwork.node") == 4 + + +def test_repeated_upsert_same_destination_does_not_expand_table(sqlite_db): + db, _path = sqlite_db + max_keep = 10 + cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=max_keep) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + + primary_dest = "f" * 32 + ident = _FakeIdentity("e" * 32) + + for _ in range(80): + mgr.upsert_announce( + ret, + ident, + bytes.fromhex(primary_dest), + "lxmf.delivery", + b"v1", + None, + ) + + for i in range(25): + dh = f"{i:032x}" + oid = _FakeIdentity(f"1{i:031x}") + mgr.upsert_announce( + ret, + oid, + bytes.fromhex(dh), + "lxmf.delivery", + b"x", + None, + ) + + assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == max_keep + dup_rows = db.provider.fetchall( + """ + SELECT destination_hash FROM announces WHERE aspect = ? + GROUP BY destination_hash HAVING COUNT(*) > 1 + """, + ("lxmf.delivery",), + ) + assert dup_rows == [] + + +def test_manager_trim_skips_contact_linked_identity(sqlite_db): + db, _path = sqlite_db + cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=500) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + + protected_idx = 3 + contact_ih = f"{protected_idx:032x}" + + for i in range(8): + dh = f"{i:032x}" + ident = _FakeIdentity(f"{i:032x}") + mgr.upsert_announce( + ret, + ident, + bytes.fromhex(dh), + "lxmf.delivery", + b"p", + None, + ) + + db.contacts.add_contact("peer", contact_ih) + + cfg.announce_max_stored_lxmf_delivery.get.return_value = 3 + mgr.upsert_announce( + ret, + _FakeIdentity("ffffffffffffffffffffffffffffffff"), + bytes.fromhex("ffffffffffffffffffffffffffffffff"), + "lxmf.delivery", + b"tick", + None, + ) + + assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 3 + rows = db.announces.get_announces(aspect="lxmf.delivery") + hashes = {r["destination_hash"] for r in rows} + assert f"{protected_idx:032x}" in hashes + + +def test_trim_after_prefilled_table_overflow(sqlite_db): + """Simulates a large announce backlog (direct DAO inserts), then one managed upsert.""" + db, _path = sqlite_db + aspect = "lxmf.delivery" + for i in range(220): + dh = f"{i:032x}" + db.announces.upsert_announce( + { + "destination_hash": dh, + "aspect": aspect, + "identity_hash": f"{i:032x}", + "identity_public_key": "cHVibmtleQ==", + "app_data": None, + "rssi": None, + "snr": None, + "quality": None, + }, + ) + + assert db.announces.get_announce_count_by_aspect(aspect) == 220 + + max_keep = 15 + cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=max_keep) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + + mgr.upsert_announce( + ret, + _FakeIdentity("aa" * 16), + bytes.fromhex(f"{220:032x}"), + aspect, + b"flush", + None, + ) + + assert db.announces.get_announce_count_by_aspect(aspect) == max_keep + kept = {r["destination_hash"] for r in db.announces.get_announces(aspect=aspect)} + assert kept == {f"{i:032x}" for i in range(206, 221)} + + +def test_integration_respects_favourite_under_tight_cap(sqlite_db): + db, _path = sqlite_db + aspect = "lxmf.delivery" + favourite_dest = f"{5:032x}" + for i in range(24): + dh = f"{i:032x}" + db.announces.upsert_announce( + { + "destination_hash": dh, + "aspect": aspect, + "identity_hash": f"{i:032x}", + "identity_public_key": "cHVibmtleQ==", + "app_data": None, + "rssi": None, + "snr": None, + "quality": None, + }, + ) + + db.announces.upsert_favourite(favourite_dest, "Pinned", aspect) + + cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=4) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + + mgr.upsert_announce( + ret, + _FakeIdentity("bb" * 16), + bytes.fromhex(f"{100:032x}"), + aspect, + b"tight", + None, + ) + + rows = db.announces.get_announces(aspect=aspect) + hashes = {r["destination_hash"] for r in rows} + assert favourite_dest in hashes + assert f"{100:032x}" in hashes + + +def test_lxst_telephony_shares_lxmf_delivery_cap(sqlite_db): + db, _path = sqlite_db + cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=6) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + + for i in range(10): + dh = f"{0x60000000 + i:032x}" + mgr.upsert_announce( + ret, + _FakeIdentity(f"{0x61000000 + i:032x}"), + bytes.fromhex(dh), + "lxst.telephony", + b"t", + None, + ) + + assert db.announces.get_announce_count_by_aspect("lxst.telephony") == 6 + kept = { + r["destination_hash"] + for r in db.announces.get_announces(aspect="lxst.telephony") + } + assert kept == {f"{0x60000000 + i:032x}" for i in range(4, 10)} diff --git a/tests/backend/test_announce_spam_sqlite.py b/tests/backend/test_announce_spam_sqlite.py new file mode 100644 index 0000000..e240b30 --- /dev/null +++ b/tests/backend/test_announce_spam_sqlite.py @@ -0,0 +1,218 @@ +# SPDX-License-Identifier: 0BSD + +"""SQLite integration: announce spam and bounded storage (anti-exhaustion). + +Multi-minute soak scenarios live in ``test_long_running_stress.py`` (opt-in via +``MESHCHAT_LONG_TEST_SECONDS``). +""" + +from __future__ import annotations + +import os +import tempfile +from unittest.mock import MagicMock + +import pytest + +from meshchatx.src.backend.announce_manager import AnnounceManager +from meshchatx.src.backend.database import Database +from meshchatx.src.backend.database.provider import DatabaseProvider + + +class _FakeIdentity: + __slots__ = ("_h",) + + def __init__(self, identity_hex32: str): + self._h = bytes.fromhex(identity_hex32) + + @property + def hash(self): + return self._h + + def get_public_key(self): + return b"\xaa\xbb" + + +def _cleanup(db, path): + if db is not None: + try: + db.close() + except Exception: + pass + DatabaseProvider._instance = None + if path: + try: + os.unlink(path) + except OSError: + pass + for suffix in ("-wal", "-shm"): + try: + os.unlink(path + suffix) + except OSError: + pass + + +def _new_db(): + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + path = f.name + db = Database(path) + db.initialize() + return db, path + + +def _store_enabled_config(**max_stored): + config = MagicMock() + for _k in ( + "announce_store_lxmf_delivery", + "announce_store_lxst_telephony", + "announce_store_nomadnetwork_node", + "announce_store_lxmf_propagation", + "announce_store_git_repositories", + ): + m = MagicMock() + m.get.return_value = True + setattr(config, _k, m) + + for key, default in ( + ("announce_max_stored_lxmf_delivery", None), + ("announce_max_stored_nomadnetwork_node", None), + ("announce_max_stored_lxmf_propagation", None), + ): + attr = MagicMock() + attr.get.return_value = max_stored.get(key, default) + setattr(config, key, attr) + + return config + + +@pytest.fixture +def sqlite_db(): + db, path = _new_db() + yield db, path + _cleanup(db, path) + + +def test_spam_unique_destinations_stays_within_cap(sqlite_db): + db, _path = sqlite_db + cap = 48 + cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=cap) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + + total = 650 + for i in range(total): + dh = f"{i:032x}" + mgr.upsert_announce( + ret, + _FakeIdentity(f"{i:032x}"), + bytes.fromhex(dh), + "lxmf.delivery", + b"x", + None, + ) + + assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == cap + + +def test_spam_same_destination_does_not_duplicate_rows(sqlite_db): + db, _path = sqlite_db + cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=12) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + dest = bytes.fromhex("ab" * 16) + ident = _FakeIdentity("cd" * 16) + + for _ in range(400): + mgr.upsert_announce(ret, ident, dest, "lxmf.delivery", b"y", None) + + rows = db.provider.fetchall( + "SELECT COUNT(*) AS n FROM announces WHERE aspect = ? AND destination_hash = ?", + ("lxmf.delivery", "ab" * 16), + ) + assert rows[0]["n"] == 1 + assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 1 + + +def test_spam_interleaved_aspects_each_bounded(sqlite_db): + db, _path = sqlite_db + cfg = _store_enabled_config( + announce_max_stored_lxmf_delivery=15, + announce_max_stored_nomadnetwork_node=9, + announce_max_stored_lxmf_propagation=11, + ) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + + for round_i in range(120): + i = round_i * 3 + mgr.upsert_announce( + ret, + _FakeIdentity(f"{i + 1:032x}"), + bytes.fromhex(f"{i + 1:032x}"), + "lxmf.delivery", + b"a", + None, + ) + mgr.upsert_announce( + ret, + _FakeIdentity(f"{i + 2:032x}"), + bytes.fromhex(f"{i + 2:032x}"), + "nomadnetwork.node", + b"b", + None, + ) + mgr.upsert_announce( + ret, + _FakeIdentity(f"{i + 3:032x}"), + bytes.fromhex(f"{i + 3:032x}"), + "lxmf.propagation", + b"c", + None, + ) + + assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 15 + assert db.announces.get_announce_count_by_aspect("nomadnetwork.node") == 9 + assert db.announces.get_announce_count_by_aspect("lxmf.propagation") == 11 + + +def test_quick_check_ok_after_heavy_announce_spam(sqlite_db): + db, _path = sqlite_db + cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=30) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + + for i in range(900): + mgr.upsert_announce( + ret, + _FakeIdentity(f"{i % 100:032x}"), + bytes.fromhex(f"{i:032x}"), + "lxmf.delivery", + os.urandom(64), + None, + ) + + qc = db.provider.quick_check() + assert qc + first = qc[0] + val = next(iter(first.values())) + assert val == "ok" + + +def test_large_app_data_spam_remains_bounded(sqlite_db): + db, _path = sqlite_db + cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=20) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + blob = b"z" * 12000 + + for i in range(180): + mgr.upsert_announce( + ret, + _FakeIdentity(f"{i:032x}"), + bytes.fromhex(f"{i:032x}"), + "lxmf.delivery", + blob, + None, + ) + + assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 20 diff --git a/tests/backend/test_async_utils_critical.py b/tests/backend/test_async_utils_critical.py new file mode 100644 index 0000000..ffe1711 --- /dev/null +++ b/tests/backend/test_async_utils_critical.py @@ -0,0 +1,108 @@ +# SPDX-License-Identifier: 0BSD + +"""Critical-path tests for ``AsyncUtils``: cross-thread scheduling and memory caps.""" + +from __future__ import annotations + +import asyncio +import threading +import warnings + +import pytest + +from meshchatx.src.backend.async_utils import AsyncUtils + + +@pytest.fixture(autouse=True) +def _reset_async_utils(): + AsyncUtils.main_loop = None + AsyncUtils._pending_futures.clear() + AsyncUtils._pending_coroutines.clear() + yield + AsyncUtils.main_loop = None + with warnings.catch_warnings(): + warnings.simplefilter("ignore", RuntimeWarning) + AsyncUtils._pending_futures.clear() + AsyncUtils._pending_coroutines.clear() + + +async def _noop(): + return None + + +def test_buffered_coroutines_capped_when_event_loop_not_running(): + with warnings.catch_warnings(): + warnings.simplefilter("ignore", RuntimeWarning) + for _ in range(AsyncUtils._COROUTINES_MAX + 12): + AsyncUtils.run_async(_noop()) + + assert len(AsyncUtils._pending_coroutines) == AsyncUtils._COROUTINES_MAX + + +@pytest.mark.asyncio +async def test_set_main_loop_drains_buffered_coroutines(): + seen: list[bool] = [] + + async def record(): + seen.append(True) + + AsyncUtils.main_loop = None + + queued = threading.Event() + + def schedule_from_worker(): + AsyncUtils.run_async(record()) + queued.set() + + threading.Thread(target=schedule_from_worker).start() + assert queued.wait(timeout=2.0) + assert len(AsyncUtils._pending_coroutines) == 1 + + AsyncUtils.set_main_loop(asyncio.get_running_loop()) + assert AsyncUtils._pending_coroutines == [] + + await asyncio.sleep(0.15) + assert seen == [True] + + +@pytest.mark.asyncio +async def test_run_async_with_running_loop_executes_coroutine(): + outcomes: list[int] = [] + + async def work(): + outcomes.append(7) + + AsyncUtils.set_main_loop(asyncio.get_running_loop()) + + done = threading.Event() + + def schedule_from_worker(): + AsyncUtils.run_async(work()) + done.set() + + threading.Thread(target=schedule_from_worker).start() + assert done.wait(timeout=2.0) + await asyncio.sleep(0.15) + assert outcomes == [7] + + +@pytest.mark.asyncio +async def test_pending_futures_list_sheds_completed_entries(): + AsyncUtils.set_main_loop(asyncio.get_running_loop()) + with AsyncUtils._futures_lock: + AsyncUtils._pending_futures.clear() + + finished = threading.Event() + + def blast(): + for _ in range(AsyncUtils._FUTURES_SWEEP_THRESHOLD + 8): + AsyncUtils.run_async(asyncio.sleep(0)) + finished.set() + + threading.Thread(target=blast).start() + assert finished.wait(timeout=5.0) + await asyncio.sleep(0.15) + + with AsyncUtils._futures_lock: + still_pending = [f for f in AsyncUtils._pending_futures if not f.done()] + assert len(still_pending) == 0 diff --git a/tests/backend/test_auto_propagation.py b/tests/backend/test_auto_propagation.py index cbc72c2..f96595a 100644 --- a/tests/backend/test_auto_propagation.py +++ b/tests/backend/test_auto_propagation.py @@ -214,3 +214,24 @@ async def test_auto_propagation_removes_broken_node_when_all_candidates_fail(): app.set_active_propagation_node.assert_not_called() app.remove_active_propagation_node.assert_called_once_with(context=context) + + +@pytest.mark.asyncio +async def test_check_and_update_propagation_node_noops_without_message_router(): + manager, app, context, config, _database = _make_manager() + context.message_router = None + config.lxmf_preferred_propagation_node_auto_select.get.return_value = True + + await manager.check_and_update_propagation_node() + + app.set_active_propagation_node.assert_not_called() + app.remove_active_propagation_node.assert_not_called() + + +def test_stop_propagation_node_sync_noops_when_message_router_none(): + from meshchatx.meshchat import ReticulumMeshChat + + app = ReticulumMeshChat.__new__(ReticulumMeshChat) + ctx = MagicMock() + ctx.message_router = None + ReticulumMeshChat.stop_propagation_node_sync(app, context=ctx) diff --git a/tests/backend/test_interface_discovery.py b/tests/backend/test_interface_discovery.py index 70f481c..2dd9ad6 100644 --- a/tests/backend/test_interface_discovery.py +++ b/tests/backend/test_interface_discovery.py @@ -150,6 +150,40 @@ async def test_reticulum_discovery_get_and_patch(temp_dir): assert config.write_called +@pytest.mark.asyncio +async def test_reticulum_discovery_get_default_bootstrap_false_when_unset(temp_dir): + config = ConfigDict({"reticulum": {}, "interfaces": {}}) + + with ( + patch("meshchatx.meshchat.generate_ssl_certificate"), + patch("RNS.Reticulum") as mock_rns, + patch("RNS.Transport"), + patch("LXMF.LXMRouter"), + ): + mock_reticulum = mock_rns.return_value + mock_reticulum.config = config + mock_reticulum.configpath = "/tmp/mock_config" + mock_reticulum.is_connected_to_shared_instance = False + mock_reticulum.transport_enabled.return_value = True + + app_instance = ReticulumMeshChat( + identity=build_identity(), + storage_dir=temp_dir, + reticulum_config_dir=temp_dir, + ) + + get_handler = await find_route_handler( + app_instance, + "/api/v1/reticulum/discovery", + "GET", + ) + assert get_handler + + get_response = await get_handler(MagicMock()) + get_data = json.loads(get_response.body) + assert get_data["discovery"]["default_bootstrap_only"] is False + + @pytest.mark.asyncio async def test_discovery_patch_rejects_zero_autoconnect_as_unset(temp_dir): config = ConfigDict( @@ -409,7 +443,7 @@ async def test_interface_add_includes_discovery_fields(temp_dir): assert saved["discovery_frequency"] == 915000000 assert saved["discovery_bandwidth"] == 125000 assert saved["discovery_modulation"] == "LoRa" - assert saved.get("bootstrap_only") == "yes" + assert "bootstrap_only" not in saved assert config.write_called @@ -516,6 +550,66 @@ async def test_interface_add_tcp_explicit_bootstrap_only_no(temp_dir): assert config["interfaces"]["ExplicitNo"]["bootstrap_only"] == "no" +@pytest.mark.asyncio +async def test_interface_edit_tcp_preserves_bootstrap_when_key_omitted(temp_dir): + config = ConfigDict( + { + "reticulum": {"default_bootstrap_only": True}, + "interfaces": { + "KeepBoot": { + "type": "TCPClientInterface", + "target_host": "example.com", + "target_port": "4242", + "bootstrap_only": "yes", + }, + }, + }, + ) + + with ( + patch("meshchatx.meshchat.generate_ssl_certificate"), + patch("RNS.Reticulum") as mock_rns, + patch("RNS.Transport"), + patch("LXMF.LXMRouter"), + ): + mock_reticulum = mock_rns.return_value + mock_reticulum.config = config + mock_reticulum.configpath = "/tmp/mock_config" + mock_reticulum.is_connected_to_shared_instance = False + mock_reticulum.transport_enabled.return_value = True + + app_instance = ReticulumMeshChat( + identity=build_identity(), + storage_dir=temp_dir, + reticulum_config_dir=temp_dir, + ) + + add_handler = await find_route_handler( + app_instance, + "/api/v1/reticulum/interfaces/add", + "POST", + ) + assert add_handler + + payload = { + "allow_overwriting_interface": True, + "name": "KeepBoot", + "type": "TCPClientInterface", + "target_host": "example.com", + "target_port": "4242", + } + + class AddRequest: + @staticmethod + async def json(): + return payload + + response = await add_handler(AddRequest()) + data = json.loads(response.body) + assert "message" in data + assert config["interfaces"]["KeepBoot"]["bootstrap_only"] == "yes" + + def test_apply_bootstrap_only_to_interface(): details = {} ReticulumMeshChat.apply_bootstrap_only_to_interface(details, {}, True) @@ -531,6 +625,35 @@ def test_apply_bootstrap_only_to_interface(): ReticulumMeshChat.apply_bootstrap_only_to_interface(details, {}, False) assert "bootstrap_only" not in details + details = {"bootstrap_only": "yes"} + ReticulumMeshChat.apply_bootstrap_only_to_interface( + details, {}, True, updating_existing=True + ) + assert details["bootstrap_only"] == "yes" + + +def test_strip_reload_instance_suffix(): + assert ReticulumMeshChat._strip_reload_instance_suffix(None) is None + assert ReticulumMeshChat._strip_reload_instance_suffix("") is None + assert ReticulumMeshChat._strip_reload_instance_suffix("mesh") == "mesh" + assert ReticulumMeshChat._strip_reload_instance_suffix( + "production-reload-backend" + ) == ("production-reload-backend") + assert ( + ReticulumMeshChat._strip_reload_instance_suffix("my-net-reload-peer") + == "my-net-reload-peer" + ) + assert ( + ReticulumMeshChat._strip_reload_instance_suffix("node-reload-1-500") + == "node-reload-1-500" + ) + assert ( + ReticulumMeshChat._strip_reload_instance_suffix( + "default-reload-2246687-1777566181-reload-3009314-1777566481", + ) + == "default" + ) + @pytest.mark.asyncio async def test_interface_add_discoverable_without_optional_coordinates(temp_dir): diff --git a/tests/backend/test_long_running_stress.py b/tests/backend/test_long_running_stress.py new file mode 100644 index 0000000..4841a88 --- /dev/null +++ b/tests/backend/test_long_running_stress.py @@ -0,0 +1,247 @@ +# SPDX-License-Identifier: 0BSD + +"""Multi-minute soak tests (announce DB + websocket fan-out). + +These are **opt-in**: unset ``MESHCHAT_LONG_TEST_SECONDS`` skips them immediately. + +Examples:: + + MESHCHAT_LONG_TEST_SECONDS=300 uv run pytest tests/backend/test_long_running_stress.py -m long_running -v + MESHCHAT_LONG_TEST_SECONDS=600 uv run pytest tests/backend/test_long_running_stress.py -m long_running -v + +Quick smoke (seconds):: + + MESHCHAT_LONG_TEST_SECONDS=5 uv run pytest tests/backend/test_long_running_stress.py -m long_running -v +""" + +from __future__ import annotations + +import os +import tempfile +import time +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from meshchatx.meshchat import ReticulumMeshChat +from meshchatx.src.backend.announce_manager import AnnounceManager +from meshchatx.src.backend.database import Database +from meshchatx.src.backend.database.provider import DatabaseProvider + + +class _FakeIdentity: + __slots__ = ("_h",) + + def __init__(self, identity_hex32: str): + self._h = bytes.fromhex(identity_hex32) + + @property + def hash(self): + return self._h + + def get_public_key(self): + return b"\xaa\xbb" + + +def _cleanup(db, path): + if db is not None: + try: + db.close() + except Exception: + pass + DatabaseProvider._instance = None + if path: + try: + os.unlink(path) + except OSError: + pass + for suffix in ("-wal", "-shm"): + try: + os.unlink(path + suffix) + except OSError: + pass + + +def _new_db(): + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + path = f.name + db = Database(path) + db.initialize() + return db, path + + +def _store_enabled_config(**max_stored): + config = MagicMock() + for _k in ( + "announce_store_lxmf_delivery", + "announce_store_lxst_telephony", + "announce_store_nomadnetwork_node", + "announce_store_lxmf_propagation", + "announce_store_git_repositories", + ): + m = MagicMock() + m.get.return_value = True + setattr(config, _k, m) + + for key, default in ( + ("announce_max_stored_lxmf_delivery", None), + ("announce_max_stored_nomadnetwork_node", None), + ("announce_max_stored_lxmf_propagation", None), + ): + attr = MagicMock() + attr.get.return_value = max_stored.get(key, default) + setattr(config, key, attr) + + return config + + +def _long_test_seconds() -> float: + raw = os.environ.get("MESHCHAT_LONG_TEST_SECONDS", "").strip() + if not raw: + return 0.0 + try: + return float(raw) + except ValueError: + return 0.0 + + +def _require_long_duration(): + sec = _long_test_seconds() + if sec <= 0: + pytest.skip( + "Set MESHCHAT_LONG_TEST_SECONDS to a positive value " + "(e.g. 300 for 5 minutes, 600 for 10 minutes).", + ) + return sec + + +def _bind_real_websocket_broadcast(app): + return ReticulumMeshChat.websocket_broadcast.__get__(app, ReticulumMeshChat) + + +class _MagicWs: + __slots__ = ("send_str",) + + def __init__(self): + self.send_str = AsyncMock(return_value=None) + + +@pytest.mark.long_running +def test_soak_sqlite_announces_stay_bounded_and_quick_check(): + duration_s = _require_long_duration() + cap = 96 + batch_size = 120 + qc_interval_batches = 15 + + db, path = _new_db() + try: + cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=cap) + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + deadline = time.monotonic() + duration_s + seq = 0 + batches = 0 + while time.monotonic() < deadline: + for _ in range(batch_size): + mgr.upsert_announce( + ret, + _FakeIdentity(f"{seq % 2048:032x}"), + bytes.fromhex(f"{seq:032x}"), + "lxmf.delivery", + os.urandom(48), + None, + ) + seq += 1 + batches += 1 + assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == cap + if batches % qc_interval_batches == 0: + qc = db.provider.quick_check() + assert qc + assert next(iter(qc[0].values())) == "ok" + assert seq > 0 + finally: + _cleanup(db, path) + + +@pytest.mark.long_running +def test_soak_interleaved_aspects_under_cap(): + duration_s = _require_long_duration() + cfg = _store_enabled_config( + announce_max_stored_lxmf_delivery=40, + announce_max_stored_nomadnetwork_node=25, + announce_max_stored_lxmf_propagation=30, + ) + + db, path = _new_db() + try: + mgr = AnnounceManager(db, cfg) + ret = MagicMock() + deadline = time.monotonic() + duration_s + n = 0 + while time.monotonic() < deadline: + for aspect, payload in ( + ("lxmf.delivery", b"a"), + ("nomadnetwork.node", b"b"), + ("lxmf.propagation", b"c"), + ): + mgr.upsert_announce( + ret, + _FakeIdentity(f"{n % 900:032x}"), + bytes.fromhex(f"{n:032x}"), + aspect, + payload, + None, + ) + n += 1 + assert db.announces.get_announce_count_by_aspect("lxmf.delivery") <= 40 + assert db.announces.get_announce_count_by_aspect("nomadnetwork.node") <= 25 + assert db.announces.get_announce_count_by_aspect("lxmf.propagation") <= 30 + assert n > 0 + assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 40 + assert db.announces.get_announce_count_by_aspect("nomadnetwork.node") == 25 + assert db.announces.get_announce_count_by_aspect("lxmf.propagation") == 30 + finally: + _cleanup(db, path) + + +@pytest.mark.long_running +@pytest.mark.asyncio +async def test_soak_websocket_broadcast_under_load(mock_app): + duration_s = _require_long_duration() + mock_app.websocket_clients.clear() + n_clients = min(int(os.environ.get("MESHCHAT_LONG_TEST_WS_CLIENTS", "400")), 2000) + clients = [_MagicWs() for _ in range(n_clients)] + mock_app.websocket_clients.extend(clients) + real = _bind_real_websocket_broadcast(mock_app) + + deadline = time.monotonic() + duration_s + rounds = 0 + while time.monotonic() < deadline: + payload = f'{{"type":"soak","round":{rounds}}}' + await real(payload) + for c in clients: + assert c.send_str.await_args[0][0] == payload + rounds += 1 + assert rounds > 0 + + +@pytest.mark.long_running +@pytest.mark.asyncio +async def test_soak_websocket_broadcast_with_churn(mock_app): + duration_s = _require_long_duration() + real = _bind_real_websocket_broadcast(mock_app) + deadline = time.monotonic() + duration_s + wave = 0 + while time.monotonic() < deadline: + mock_app.websocket_clients.clear() + batch = [_MagicWs() for _ in range(80)] + if wave % 3 == 0: + for c in batch[:20]: + c.send_str = AsyncMock(side_effect=ConnectionError("closed")) + mock_app.websocket_clients.extend(batch) + payload = f'{{"wave":{wave}}}' + await real(payload) + for c in mock_app.websocket_clients: + assert c.send_str.await_args[0][0] == payload + wave += 1 + assert wave > 0 diff --git a/tests/backend/test_search_integration.py b/tests/backend/test_search_integration.py index 759a569..77d32d8 100644 --- a/tests/backend/test_search_integration.py +++ b/tests/backend/test_search_integration.py @@ -85,6 +85,27 @@ def test_filter_announced_dicts_by_search_query_destination_hash_substring(): assert len(out) == 1 +def test_filter_announced_dicts_empty_search_matches_all(): + """Empty substring matches every string in Python; callers should normalize UI input.""" + items = [ + {"display_name": "AAA"}, + {"destination_hash": "0123abcd"}, + {"identity_hash": "fedcba"}, + ] + out = filter_announced_dicts_by_search_query(items, "") + assert len(out) == 3 + + +def test_filter_announced_dicts_whitespace_search_can_match(): + items = [ + {"display_name": " Hi "}, + {"destination_hash": "99"}, + ] + out = filter_announced_dicts_by_search_query(items, " ") + assert len(out) == 1 + assert out[0]["display_name"] == " Hi " + + def test_filter_announced_dicts_by_search_query_case_insensitive(): items = [ {"display_name": "CamelCaseName", "destination_hash": "z" * 32}, diff --git a/tests/backend/test_websocket_scale.py b/tests/backend/test_websocket_scale.py index 1ade132..29becb3 100644 --- a/tests/backend/test_websocket_scale.py +++ b/tests/backend/test_websocket_scale.py @@ -109,6 +109,57 @@ async def test_websocket_broadcast_drops_dead_clients(mock_app): assert good.send_str.await_count == 1 +@pytest.mark.asyncio +async def test_websocket_broadcast_all_clients_dead_empties_list(mock_app): + mock_app.websocket_clients.clear() + clients = [] + for _ in range(80): + c = MagicWs() + c.send_str = AsyncMock(side_effect=ConnectionError("closed")) + clients.append(c) + mock_app.websocket_clients.extend(clients) + + real = _bind_real_websocket_broadcast(mock_app) + await real("{}") + + assert mock_app.websocket_clients == [] + + +@pytest.mark.asyncio +async def test_websocket_broadcast_fanout_large_client_pool(mock_app): + mock_app.websocket_clients.clear() + n = 1500 + clients = [MagicWs() for _ in range(n)] + mock_app.websocket_clients.extend(clients) + + real = _bind_real_websocket_broadcast(mock_app) + payload = '{"type":"metrics","x":1}' + await real(payload) + + for c in clients: + assert c.send_str.await_count == 1 + assert c.send_str.await_args[0][0] == payload + + +@pytest.mark.asyncio +async def test_websocket_broadcast_mixed_failures_still_delivers_to_healthy(mock_app): + mock_app.websocket_clients.clear() + failing = [MagicWs() for _ in range(40)] + for c in failing: + c.send_str = AsyncMock(side_effect=BrokenPipeError()) + healthy = [MagicWs() for _ in range(60)] + mock_app.websocket_clients.extend(failing + healthy) + + real = _bind_real_websocket_broadcast(mock_app) + await real('{"type":"ping"}') + + for c in failing: + assert c not in mock_app.websocket_clients + assert mock_app.websocket_clients == healthy + for c in healthy: + assert c.send_str.await_count == 1 + + class MagicWs: def __init__(self): self.send_str = AsyncMock(return_value=None)