feat(tests): add integration tests for announce limits and spam handling, including max storage constraints and websocket broadcast behavior

This commit is contained in:
Ivan
2026-05-07 20:03:52 -05:00
parent 7867034f5d
commit f2235f2e0b
10 changed files with 1226 additions and 1 deletions
+13
View File
@@ -53,6 +53,19 @@ def _cleanup(db, path):
pass
def test_trim_announces_for_aspect_noop_when_max_rows_below_one():
    """A max_rows below 1 disables trimming: both inserted rows must survive."""
    db = path = None
    try:
        db, path = _new_db()
        aspect = "lxmf.delivery"
        _insert(db, "01" * 16, aspect, "2000-01-01T00:00:00Z")
        _insert(db, "02" * 16, aspect, "2000-01-02T00:00:00Z")
        # max_rows=0 is below the minimum of 1, so the call must be a no-op.
        db.announces.trim_announces_for_aspect(aspect, 0)
        assert db.announces.get_announce_count_by_aspect(aspect) == 2
    finally:
        _cleanup(db, path)
def test_trim_announces_for_aspect_drops_oldest():
db = path = None
try:
+85
View File
@@ -166,6 +166,91 @@ def test_get_filtered_announces_resolves_default_limit(mock_db, mock_config):
assert 33 in params
def test_max_stored_clamps_to_one_million(mock_db, mock_config):
    """A configured max-stored above the hard ceiling is clamped to 1_000_000."""
    mock_config.announce_max_stored_lxmf_delivery.get.return_value = 9_999_999
    manager = _make_manager(mock_db, mock_config)
    assert manager._get_max_stored_for_aspect("lxmf.delivery") == 1_000_000
def test_trim_called_with_clamped_max_stored(mock_db, mock_config):
    """upsert_announce must pass the clamped (not raw) cap to the trim DAO call."""
    mock_config.announce_max_stored_lxmf_delivery.get.return_value = 5_000_000
    manager = _make_manager(mock_db, mock_config)
    reticulum = MagicMock()
    identity = MagicMock()
    identity.hash.hex.return_value = "ab" * 16
    identity.get_public_key.return_value = b"pub_key"
    manager.upsert_announce(
        reticulum,
        identity,
        b"\x01" * 16,
        "lxmf.delivery",
        b"app_data",
        b"packet_hash",
    )
    # 5_000_000 exceeds the 1M ceiling, so the trim must receive 1_000_000.
    mock_db.announces.trim_announces_for_aspect.assert_called_once_with(
        "lxmf.delivery",
        1_000_000,
    )
def test_max_stored_zero_skips_cap(mock_db, mock_config):
    """A configured max-stored of 0 means "no cap": the resolver returns None."""
    mock_config.announce_max_stored_lxmf_delivery.get.return_value = 0
    manager = _make_manager(mock_db, mock_config)
    assert manager._get_max_stored_for_aspect("lxmf.delivery") is None
def test_fetch_limit_clamps_to_hundred_thousand(mock_db, mock_config):
    """A configured fetch limit above the 100_000 ceiling is clamped down."""
    mock_config.announce_fetch_limit_lxmf_delivery = MagicMock()
    mock_config.announce_fetch_limit_lxmf_delivery.get.return_value = 800_000
    manager = _make_manager(mock_db, mock_config)
    assert manager._get_fetch_limit_for_aspect("lxmf.delivery") == 100_000
def test_fetch_limit_invalid_falls_back_to_default(mock_db, mock_config):
    """A non-positive configured fetch limit falls back to the 500 default."""
    mock_config.announce_fetch_limit_lxmf_delivery = MagicMock()
    mock_config.announce_fetch_limit_lxmf_delivery.get.return_value = 0
    manager = _make_manager(mock_db, mock_config)
    assert manager._get_fetch_limit_for_aspect("lxmf.delivery") == 500
def test_fetch_limit_unknown_aspect_returns_default(mock_db, mock_config):
    """Aspects without a dedicated fetch-limit setting use the 500 default."""
    manager = _make_manager(mock_db, mock_config)
    assert manager._get_fetch_limit_for_aspect("unknown.aspect") == 500
def test_fetch_limit_none_falls_back_to_default(mock_db, mock_config):
    """A None configured fetch limit falls back to the 500 default."""
    mock_config.announce_fetch_limit_lxmf_delivery = MagicMock()
    mock_config.announce_fetch_limit_lxmf_delivery.get.return_value = None
    manager = _make_manager(mock_db, mock_config)
    assert manager._get_fetch_limit_for_aspect("lxmf.delivery") == 500
def test_get_filtered_announces_uses_clamped_fetch_limit(mock_db, mock_config):
    """The clamped fetch limit must be bound as a query parameter."""
    mock_config.announce_fetch_limit_lxmf_delivery = MagicMock()
    mock_config.announce_fetch_limit_lxmf_delivery.get.return_value = 400_000
    manager = _make_manager(mock_db, mock_config)
    manager.get_filtered_announces(aspect="lxmf.delivery", limit=None)
    args, _ = mock_db.provider.fetchall.call_args
    _sql, params = args
    # 400_000 exceeds the 100_000 ceiling, so the query should bind 100_000.
    assert 100_000 in params
def test_announce_handles_none_packet_hash(mock_db):
manager = AnnounceManager(mock_db)
reticulum = MagicMock()
@@ -0,0 +1,338 @@
# SPDX-License-Identifier: 0BSD
"""Integration tests: announce row caps via AnnounceManager + real SQLite."""
import os
import tempfile
from unittest.mock import MagicMock
import pytest
from meshchatx.src.backend.announce_manager import AnnounceManager
from meshchatx.src.backend.database import Database
from meshchatx.src.backend.database.provider import DatabaseProvider
class _FakeIdentity:
    """Lightweight RNS-identity stand-in: fixed 16-byte hash, stub public key."""

    __slots__ = ("_h",)

    def __init__(self, identity_hex32: str):
        # 32 hex chars decode to the 16 raw bytes used as the identity hash.
        self._h = bytes.fromhex(identity_hex32)

    @property
    def hash(self):
        # Raw hash bytes, mirroring the real identity's `hash` attribute.
        return self._h

    def get_public_key(self):
        # Constant placeholder; these tests never verify real key material.
        return b"\xaa\xbb"
def _cleanup(db, path):
    """Close the DB, reset the provider singleton, and delete the SQLite files.

    Best-effort teardown: every step swallows its own errors so cleanup
    never fails a test.
    """
    if db is not None:
        try:
            db.close()
        except Exception:
            pass
    # Reset the singleton so the next test gets a fresh provider instance.
    DatabaseProvider._instance = None
    if path:
        try:
            os.unlink(path)
        except OSError:
            pass
        # WAL mode leaves sidecar files next to the main database file.
        for suffix in ("-wal", "-shm"):
            try:
                os.unlink(path + suffix)
            except OSError:
                pass
def _new_db():
    """Create and initialize a Database backed by a fresh temp file.

    Returns (db, path); callers are responsible for `_cleanup(db, path)`.
    """
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
        path = f.name
    db = Database(path)
    db.initialize()
    return db, path
def _store_enabled_config(**max_stored):
"""Config mock with storage toggles on and configurable announce_max_stored_* .get() values."""
config = MagicMock()
for _k in (
"announce_store_lxmf_delivery",
"announce_store_lxst_telephony",
"announce_store_nomadnetwork_node",
"announce_store_lxmf_propagation",
"announce_store_git_repositories",
):
m = MagicMock()
m.get.return_value = True
setattr(config, _k, m)
for key, default in (
("announce_max_stored_lxmf_delivery", None),
("announce_max_stored_nomadnetwork_node", None),
("announce_max_stored_lxmf_propagation", None),
):
attr = MagicMock()
attr.get.return_value = max_stored.get(key, default)
setattr(config, key, attr)
return config
@pytest.fixture
def sqlite_db():
    """Yield (db, path) for a fresh SQLite database; always cleaned up."""
    db, path = _new_db()
    yield db, path
    _cleanup(db, path)
def test_many_sequential_upserts_trims_to_max(sqlite_db):
    """55 unique upserts against a cap of 12 keep only the 12 newest rows."""
    db, _path = sqlite_db
    max_keep = 12
    cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=max_keep)
    mgr = AnnounceManager(db, cfg)
    ret = MagicMock()
    n_insert = 55
    for i in range(n_insert):
        dh = f"{i:032x}"
        ident = _FakeIdentity(f"{i:032x}")
        mgr.upsert_announce(
            ret,
            ident,
            bytes.fromhex(dh),
            "lxmf.delivery",
            b"payload",
            None,
        )
    assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == max_keep
    rows = db.announces.get_announces(aspect="lxmf.delivery")
    kept = {r["destination_hash"] for r in rows}
    # Insertion order doubles as recency order, so the last 12 must survive.
    expect = {f"{i:032x}" for i in range(n_insert - max_keep, n_insert)}
    assert kept == expect
def test_aspect_max_limits_are_independent(sqlite_db):
    """Each aspect is trimmed against its own cap, not a shared one."""
    db, _path = sqlite_db
    cfg = _store_enabled_config(
        announce_max_stored_lxmf_delivery=7,
        announce_max_stored_nomadnetwork_node=4,
    )
    mgr = AnnounceManager(db, cfg)
    ret = MagicMock()
    for i in range(20):
        dh = f"{i:032x}"
        ident = _FakeIdentity(f"{i:032x}")
        mgr.upsert_announce(
            ret,
            ident,
            bytes.fromhex(dh),
            "lxmf.delivery",
            b"x",
            None,
        )
    for i in range(15):
        # Offset ranges keep these hashes disjoint from the lxmf.delivery set.
        dh = f"{0x70000000 + i:032x}"
        ident = _FakeIdentity(f"{0x71000000 + i:032x}")
        mgr.upsert_announce(
            ret,
            ident,
            bytes.fromhex(dh),
            "nomadnetwork.node",
            b"y",
            None,
        )
    assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 7
    assert db.announces.get_announce_count_by_aspect("nomadnetwork.node") == 4
def test_repeated_upsert_same_destination_does_not_expand_table(sqlite_db):
    """Re-announcing one destination updates in place; no duplicate rows appear."""
    db, _path = sqlite_db
    max_keep = 10
    cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=max_keep)
    mgr = AnnounceManager(db, cfg)
    ret = MagicMock()
    primary_dest = "f" * 32
    ident = _FakeIdentity("e" * 32)
    # 80 upserts of the same destination must collapse to a single row.
    for _ in range(80):
        mgr.upsert_announce(
            ret,
            ident,
            bytes.fromhex(primary_dest),
            "lxmf.delivery",
            b"v1",
            None,
        )
    for i in range(25):
        dh = f"{i:032x}"
        oid = _FakeIdentity(f"1{i:031x}")
        mgr.upsert_announce(
            ret,
            oid,
            bytes.fromhex(dh),
            "lxmf.delivery",
            b"x",
            None,
        )
    assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == max_keep
    # No destination_hash may appear on more than one row for the aspect.
    dup_rows = db.provider.fetchall(
        """
        SELECT destination_hash FROM announces WHERE aspect = ?
        GROUP BY destination_hash HAVING COUNT(*) > 1
        """,
        ("lxmf.delivery",),
    )
    assert dup_rows == []
def test_manager_trim_skips_contact_linked_identity(sqlite_db):
    """Trimming must never evict an announce whose identity is a saved contact."""
    db, _path = sqlite_db
    cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=500)
    mgr = AnnounceManager(db, cfg)
    ret = MagicMock()
    protected_idx = 3
    contact_ih = f"{protected_idx:032x}"
    for i in range(8):
        dh = f"{i:032x}"
        ident = _FakeIdentity(f"{i:032x}")
        mgr.upsert_announce(
            ret,
            ident,
            bytes.fromhex(dh),
            "lxmf.delivery",
            b"p",
            None,
        )
    # Link one identity to a contact, then shrink the cap below the row count.
    db.contacts.add_contact("peer", contact_ih)
    cfg.announce_max_stored_lxmf_delivery.get.return_value = 3
    # One more upsert triggers a trim pass under the new tighter cap.
    mgr.upsert_announce(
        ret,
        _FakeIdentity("ffffffffffffffffffffffffffffffff"),
        bytes.fromhex("ffffffffffffffffffffffffffffffff"),
        "lxmf.delivery",
        b"tick",
        None,
    )
    assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 3
    rows = db.announces.get_announces(aspect="lxmf.delivery")
    hashes = {r["destination_hash"] for r in rows}
    # The contact-linked announce survives despite being among the oldest.
    assert f"{protected_idx:032x}" in hashes
def test_trim_after_prefilled_table_overflow(sqlite_db):
    """Simulates a large announce backlog (direct DAO inserts), then one managed upsert."""
    db, _path = sqlite_db
    aspect = "lxmf.delivery"
    # Bypass the manager to seed 220 rows with no cap enforcement.
    for i in range(220):
        dh = f"{i:032x}"
        db.announces.upsert_announce(
            {
                "destination_hash": dh,
                "aspect": aspect,
                "identity_hash": f"{i:032x}",
                "identity_public_key": "cHVibmtleQ==",
                "app_data": None,
                "rssi": None,
                "snr": None,
                "quality": None,
            },
        )
    assert db.announces.get_announce_count_by_aspect(aspect) == 220
    max_keep = 15
    cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=max_keep)
    mgr = AnnounceManager(db, cfg)
    ret = MagicMock()
    # A single managed upsert must flush the backlog down to the cap.
    mgr.upsert_announce(
        ret,
        _FakeIdentity("aa" * 16),
        bytes.fromhex(f"{220:032x}"),
        aspect,
        b"flush",
        None,
    )
    assert db.announces.get_announce_count_by_aspect(aspect) == max_keep
    kept = {r["destination_hash"] for r in db.announces.get_announces(aspect=aspect)}
    # Rows 206..219 plus the newly upserted 220 are the 15 most recent.
    assert kept == {f"{i:032x}" for i in range(206, 221)}
def test_integration_respects_favourite_under_tight_cap(sqlite_db):
    """Favourited announces survive trimming even under a very small cap."""
    db, _path = sqlite_db
    aspect = "lxmf.delivery"
    favourite_dest = f"{5:032x}"
    for i in range(24):
        dh = f"{i:032x}"
        db.announces.upsert_announce(
            {
                "destination_hash": dh,
                "aspect": aspect,
                "identity_hash": f"{i:032x}",
                "identity_public_key": "cHVibmtleQ==",
                "app_data": None,
                "rssi": None,
                "snr": None,
                "quality": None,
            },
        )
    db.announces.upsert_favourite(favourite_dest, "Pinned", aspect)
    cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=4)
    mgr = AnnounceManager(db, cfg)
    ret = MagicMock()
    # One managed upsert triggers trimming under the cap of 4.
    mgr.upsert_announce(
        ret,
        _FakeIdentity("bb" * 16),
        bytes.fromhex(f"{100:032x}"),
        aspect,
        b"tight",
        None,
    )
    rows = db.announces.get_announces(aspect=aspect)
    hashes = {r["destination_hash"] for r in rows}
    # The pinned favourite and the newest announce must both remain.
    assert favourite_dest in hashes
    assert f"{100:032x}" in hashes
def test_lxst_telephony_shares_lxmf_delivery_cap(sqlite_db):
    """lxst.telephony is bounded by the lxmf.delivery cap (no dedicated setting)."""
    db, _path = sqlite_db
    cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=6)
    mgr = AnnounceManager(db, cfg)
    ret = MagicMock()
    for i in range(10):
        dh = f"{0x60000000 + i:032x}"
        mgr.upsert_announce(
            ret,
            _FakeIdentity(f"{0x61000000 + i:032x}"),
            bytes.fromhex(dh),
            "lxst.telephony",
            b"t",
            None,
        )
    assert db.announces.get_announce_count_by_aspect("lxst.telephony") == 6
    kept = {
        r["destination_hash"]
        for r in db.announces.get_announces(aspect="lxst.telephony")
    }
    # Only the six most recent of the ten inserted rows survive.
    assert kept == {f"{0x60000000 + i:032x}" for i in range(4, 10)}
+218
View File
@@ -0,0 +1,218 @@
# SPDX-License-Identifier: 0BSD
"""SQLite integration: announce spam and bounded storage (anti-exhaustion).
Multi-minute soak scenarios live in ``test_long_running_stress.py`` (opt-in via
``MESHCHAT_LONG_TEST_SECONDS``).
"""
from __future__ import annotations
import os
import tempfile
from unittest.mock import MagicMock
import pytest
from meshchatx.src.backend.announce_manager import AnnounceManager
from meshchatx.src.backend.database import Database
from meshchatx.src.backend.database.provider import DatabaseProvider
class _FakeIdentity:
    """Identity stand-in (same shape as the announce-limit test helper)."""

    __slots__ = ("_h",)

    def __init__(self, identity_hex32: str):
        # 32 hex chars decode to the 16 raw bytes used as the identity hash.
        self._h = bytes.fromhex(identity_hex32)

    @property
    def hash(self):
        # Raw hash bytes, mirroring the real identity's `hash` attribute.
        return self._h

    def get_public_key(self):
        # Constant placeholder; these tests never verify real key material.
        return b"\xaa\xbb"
def _cleanup(db, path):
    """Best-effort teardown: close DB, reset the provider singleton, unlink files."""
    if db is not None:
        try:
            db.close()
        except Exception:
            pass
    # Reset the singleton so the next test gets a fresh provider instance.
    DatabaseProvider._instance = None
    if path:
        try:
            os.unlink(path)
        except OSError:
            pass
        # WAL mode leaves sidecar files next to the main database file.
        for suffix in ("-wal", "-shm"):
            try:
                os.unlink(path + suffix)
            except OSError:
                pass
def _new_db():
    """Create and initialize a Database on a fresh temp file; returns (db, path)."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
        path = f.name
    db = Database(path)
    db.initialize()
    return db, path
def _store_enabled_config(**max_stored):
config = MagicMock()
for _k in (
"announce_store_lxmf_delivery",
"announce_store_lxst_telephony",
"announce_store_nomadnetwork_node",
"announce_store_lxmf_propagation",
"announce_store_git_repositories",
):
m = MagicMock()
m.get.return_value = True
setattr(config, _k, m)
for key, default in (
("announce_max_stored_lxmf_delivery", None),
("announce_max_stored_nomadnetwork_node", None),
("announce_max_stored_lxmf_propagation", None),
):
attr = MagicMock()
attr.get.return_value = max_stored.get(key, default)
setattr(config, key, attr)
return config
@pytest.fixture
def sqlite_db():
    """Yield (db, path) for a fresh SQLite database; always cleaned up."""
    db, path = _new_db()
    yield db, path
    _cleanup(db, path)
def test_spam_unique_destinations_stays_within_cap(sqlite_db):
    """650 distinct spam destinations must be trimmed down to exactly the cap."""
    db, _path = sqlite_db
    row_cap = 48
    cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=row_cap)
    manager = AnnounceManager(db, cfg)
    fake_reticulum = MagicMock()
    for seq in range(650):
        # Identity hash and destination hash share the same hex value.
        hex_dest = f"{seq:032x}"
        manager.upsert_announce(
            fake_reticulum,
            _FakeIdentity(hex_dest),
            bytes.fromhex(hex_dest),
            "lxmf.delivery",
            b"x",
            None,
        )
    assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == row_cap
def test_spam_same_destination_does_not_duplicate_rows(sqlite_db):
    """400 re-announces of one destination collapse to a single stored row."""
    db, _path = sqlite_db
    cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=12)
    mgr = AnnounceManager(db, cfg)
    ret = MagicMock()
    dest = bytes.fromhex("ab" * 16)
    ident = _FakeIdentity("cd" * 16)
    for _ in range(400):
        mgr.upsert_announce(ret, ident, dest, "lxmf.delivery", b"y", None)
    rows = db.provider.fetchall(
        "SELECT COUNT(*) AS n FROM announces WHERE aspect = ? AND destination_hash = ?",
        ("lxmf.delivery", "ab" * 16),
    )
    assert rows[0]["n"] == 1
    assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 1
def test_spam_interleaved_aspects_each_bounded(sqlite_db):
    """Interleaved spam across three aspects respects each aspect's own cap."""
    db, _path = sqlite_db
    cfg = _store_enabled_config(
        announce_max_stored_lxmf_delivery=15,
        announce_max_stored_nomadnetwork_node=9,
        announce_max_stored_lxmf_propagation=11,
    )
    mgr = AnnounceManager(db, cfg)
    ret = MagicMock()
    for round_i in range(120):
        # Three unique destinations per round, one per aspect.
        i = round_i * 3
        mgr.upsert_announce(
            ret,
            _FakeIdentity(f"{i + 1:032x}"),
            bytes.fromhex(f"{i + 1:032x}"),
            "lxmf.delivery",
            b"a",
            None,
        )
        mgr.upsert_announce(
            ret,
            _FakeIdentity(f"{i + 2:032x}"),
            bytes.fromhex(f"{i + 2:032x}"),
            "nomadnetwork.node",
            b"b",
            None,
        )
        mgr.upsert_announce(
            ret,
            _FakeIdentity(f"{i + 3:032x}"),
            bytes.fromhex(f"{i + 3:032x}"),
            "lxmf.propagation",
            b"c",
            None,
        )
    assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 15
    assert db.announces.get_announce_count_by_aspect("nomadnetwork.node") == 9
    assert db.announces.get_announce_count_by_aspect("lxmf.propagation") == 11
def test_quick_check_ok_after_heavy_announce_spam(sqlite_db):
    """SQLite integrity (quick_check) holds after 900 spam upserts."""
    db, _path = sqlite_db
    cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=30)
    mgr = AnnounceManager(db, cfg)
    ret = MagicMock()
    for i in range(900):
        # 100 recycled identities spread across 900 unique destinations.
        mgr.upsert_announce(
            ret,
            _FakeIdentity(f"{i % 100:032x}"),
            bytes.fromhex(f"{i:032x}"),
            "lxmf.delivery",
            os.urandom(64),
            None,
        )
    qc = db.provider.quick_check()
    assert qc
    first = qc[0]
    # A healthy database reports a single row whose value is "ok".
    val = next(iter(first.values()))
    assert val == "ok"
def test_large_app_data_spam_remains_bounded(sqlite_db):
    """12 kB app_data payloads do not defeat the row cap."""
    db, _path = sqlite_db
    cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=20)
    mgr = AnnounceManager(db, cfg)
    ret = MagicMock()
    blob = b"z" * 12000
    for i in range(180):
        mgr.upsert_announce(
            ret,
            _FakeIdentity(f"{i:032x}"),
            bytes.fromhex(f"{i:032x}"),
            "lxmf.delivery",
            blob,
            None,
        )
    assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 20
+108
View File
@@ -0,0 +1,108 @@
# SPDX-License-Identifier: 0BSD
"""Critical-path tests for ``AsyncUtils``: cross-thread scheduling and memory caps."""
from __future__ import annotations
import asyncio
import threading
import warnings
import pytest
from meshchatx.src.backend.async_utils import AsyncUtils
@pytest.fixture(autouse=True)
def _reset_async_utils():
    """Isolate AsyncUtils class-level state before and after every test."""
    AsyncUtils.main_loop = None
    AsyncUtils._pending_futures.clear()
    AsyncUtils._pending_coroutines.clear()
    yield
    AsyncUtils.main_loop = None
    # Discarding never-awaited coroutines emits RuntimeWarning; mute it here.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", RuntimeWarning)
        AsyncUtils._pending_futures.clear()
        AsyncUtils._pending_coroutines.clear()
async def _noop():
    """Trivial coroutine used only as a scheduling payload."""
    return None
def test_buffered_coroutines_capped_when_event_loop_not_running():
    """Without a main loop, the coroutine buffer is capped at _COROUTINES_MAX."""
    with warnings.catch_warnings():
        # Coroutines dropped by the cap are never awaited -> RuntimeWarning.
        warnings.simplefilter("ignore", RuntimeWarning)
        for _ in range(AsyncUtils._COROUTINES_MAX + 12):
            AsyncUtils.run_async(_noop())
    assert len(AsyncUtils._pending_coroutines) == AsyncUtils._COROUTINES_MAX
@pytest.mark.asyncio
async def test_set_main_loop_drains_buffered_coroutines():
    """Coroutines queued before a loop exists run once set_main_loop is called."""
    seen: list[bool] = []

    async def record():
        seen.append(True)

    AsyncUtils.main_loop = None
    queued = threading.Event()

    def schedule_from_worker():
        # No main loop yet, so this lands in the pending-coroutine buffer.
        AsyncUtils.run_async(record())
        queued.set()

    threading.Thread(target=schedule_from_worker).start()
    assert queued.wait(timeout=2.0)
    assert len(AsyncUtils._pending_coroutines) == 1
    AsyncUtils.set_main_loop(asyncio.get_running_loop())
    # The buffer is emptied immediately; execution happens on the loop.
    assert AsyncUtils._pending_coroutines == []
    await asyncio.sleep(0.15)
    assert seen == [True]
@pytest.mark.asyncio
async def test_run_async_with_running_loop_executes_coroutine():
    """run_async from a worker thread executes on the registered main loop."""
    outcomes: list[int] = []

    async def work():
        outcomes.append(7)

    AsyncUtils.set_main_loop(asyncio.get_running_loop())
    done = threading.Event()

    def schedule_from_worker():
        AsyncUtils.run_async(work())
        done.set()

    threading.Thread(target=schedule_from_worker).start()
    assert done.wait(timeout=2.0)
    # Give the loop a moment to actually run the scheduled coroutine.
    await asyncio.sleep(0.15)
    assert outcomes == [7]
@pytest.mark.asyncio
async def test_pending_futures_list_sheds_completed_entries():
    """The pending-futures list is swept so completed futures do not pile up."""
    AsyncUtils.set_main_loop(asyncio.get_running_loop())
    with AsyncUtils._futures_lock:
        AsyncUtils._pending_futures.clear()
    finished = threading.Event()

    def blast():
        # Exceed the sweep threshold to force at least one cleanup pass.
        for _ in range(AsyncUtils._FUTURES_SWEEP_THRESHOLD + 8):
            AsyncUtils.run_async(asyncio.sleep(0))
        finished.set()

    threading.Thread(target=blast).start()
    assert finished.wait(timeout=5.0)
    await asyncio.sleep(0.15)
    with AsyncUtils._futures_lock:
        still_pending = [f for f in AsyncUtils._pending_futures if not f.done()]
    assert len(still_pending) == 0
+21
View File
@@ -214,3 +214,24 @@ async def test_auto_propagation_removes_broken_node_when_all_candidates_fail():
app.set_active_propagation_node.assert_not_called()
app.remove_active_propagation_node.assert_called_once_with(context=context)
@pytest.mark.asyncio
async def test_check_and_update_propagation_node_noops_without_message_router():
    """With no message router, auto-selection neither sets nor removes a node."""
    manager, app, context, config, _database = _make_manager()
    context.message_router = None
    config.lxmf_preferred_propagation_node_auto_select.get.return_value = True
    await manager.check_and_update_propagation_node()
    app.set_active_propagation_node.assert_not_called()
    app.remove_active_propagation_node.assert_not_called()
def test_stop_propagation_node_sync_noops_when_message_router_none():
    """stop_propagation_node_sync must tolerate a missing message router."""
    from meshchatx.meshchat import ReticulumMeshChat

    # __new__ bypasses __init__, so no Reticulum stack is started.
    app = ReticulumMeshChat.__new__(ReticulumMeshChat)
    ctx = MagicMock()
    ctx.message_router = None
    # Must simply return without raising.
    ReticulumMeshChat.stop_propagation_node_sync(app, context=ctx)
+124 -1
View File
@@ -150,6 +150,40 @@ async def test_reticulum_discovery_get_and_patch(temp_dir):
assert config.write_called
@pytest.mark.asyncio
async def test_reticulum_discovery_get_default_bootstrap_false_when_unset(temp_dir):
    """GET discovery reports default_bootstrap_only=False when the key is absent."""
    config = ConfigDict({"reticulum": {}, "interfaces": {}})
    with (
        patch("meshchatx.meshchat.generate_ssl_certificate"),
        patch("RNS.Reticulum") as mock_rns,
        patch("RNS.Transport"),
        patch("LXMF.LXMRouter"),
    ):
        mock_reticulum = mock_rns.return_value
        mock_reticulum.config = config
        mock_reticulum.configpath = "/tmp/mock_config"
        mock_reticulum.is_connected_to_shared_instance = False
        mock_reticulum.transport_enabled.return_value = True
        app_instance = ReticulumMeshChat(
            identity=build_identity(),
            storage_dir=temp_dir,
            reticulum_config_dir=temp_dir,
        )
        get_handler = await find_route_handler(
            app_instance,
            "/api/v1/reticulum/discovery",
            "GET",
        )
        assert get_handler
        get_response = await get_handler(MagicMock())
        get_data = json.loads(get_response.body)
        # No "default_bootstrap_only" in the reticulum section -> False.
        assert get_data["discovery"]["default_bootstrap_only"] is False
@pytest.mark.asyncio
async def test_discovery_patch_rejects_zero_autoconnect_as_unset(temp_dir):
config = ConfigDict(
@@ -409,7 +443,7 @@ async def test_interface_add_includes_discovery_fields(temp_dir):
assert saved["discovery_frequency"] == 915000000
assert saved["discovery_bandwidth"] == 125000
assert saved["discovery_modulation"] == "LoRa"
assert saved.get("bootstrap_only") == "yes"
assert "bootstrap_only" not in saved
assert config.write_called
@@ -516,6 +550,66 @@ async def test_interface_add_tcp_explicit_bootstrap_only_no(temp_dir):
assert config["interfaces"]["ExplicitNo"]["bootstrap_only"] == "no"
@pytest.mark.asyncio
async def test_interface_edit_tcp_preserves_bootstrap_when_key_omitted(temp_dir):
    """Overwriting an interface without bootstrap_only keeps the stored value."""
    config = ConfigDict(
        {
            "reticulum": {"default_bootstrap_only": True},
            "interfaces": {
                "KeepBoot": {
                    "type": "TCPClientInterface",
                    "target_host": "example.com",
                    "target_port": "4242",
                    "bootstrap_only": "yes",
                },
            },
        },
    )
    with (
        patch("meshchatx.meshchat.generate_ssl_certificate"),
        patch("RNS.Reticulum") as mock_rns,
        patch("RNS.Transport"),
        patch("LXMF.LXMRouter"),
    ):
        mock_reticulum = mock_rns.return_value
        mock_reticulum.config = config
        mock_reticulum.configpath = "/tmp/mock_config"
        mock_reticulum.is_connected_to_shared_instance = False
        mock_reticulum.transport_enabled.return_value = True
        app_instance = ReticulumMeshChat(
            identity=build_identity(),
            storage_dir=temp_dir,
            reticulum_config_dir=temp_dir,
        )
        add_handler = await find_route_handler(
            app_instance,
            "/api/v1/reticulum/interfaces/add",
            "POST",
        )
        assert add_handler
        # Payload deliberately omits bootstrap_only while overwriting KeepBoot.
        payload = {
            "allow_overwriting_interface": True,
            "name": "KeepBoot",
            "type": "TCPClientInterface",
            "target_host": "example.com",
            "target_port": "4242",
        }

        class AddRequest:
            # Minimal aiohttp-style request stand-in exposing only .json().
            @staticmethod
            async def json():
                return payload

        response = await add_handler(AddRequest())
        data = json.loads(response.body)
        assert "message" in data
        # The previously stored "yes" must survive the overwrite.
        assert config["interfaces"]["KeepBoot"]["bootstrap_only"] == "yes"
def test_apply_bootstrap_only_to_interface():
details = {}
ReticulumMeshChat.apply_bootstrap_only_to_interface(details, {}, True)
@@ -531,6 +625,35 @@ def test_apply_bootstrap_only_to_interface():
ReticulumMeshChat.apply_bootstrap_only_to_interface(details, {}, False)
assert "bootstrap_only" not in details
details = {"bootstrap_only": "yes"}
ReticulumMeshChat.apply_bootstrap_only_to_interface(
details, {}, True, updating_existing=True
)
assert details["bootstrap_only"] == "yes"
def test_strip_reload_instance_suffix():
    """_strip_reload_instance_suffix: None/empty -> None, lookalike names pass
    through unchanged, stacked reload suffixes strip back to the base name."""
    assert ReticulumMeshChat._strip_reload_instance_suffix(None) is None
    assert ReticulumMeshChat._strip_reload_instance_suffix("") is None
    assert ReticulumMeshChat._strip_reload_instance_suffix("mesh") == "mesh"
    # Names merely containing "-reload-" must pass through unchanged.
    assert ReticulumMeshChat._strip_reload_instance_suffix(
        "production-reload-backend"
    ) == ("production-reload-backend")
    assert (
        ReticulumMeshChat._strip_reload_instance_suffix("my-net-reload-peer")
        == "my-net-reload-peer"
    )
    assert (
        ReticulumMeshChat._strip_reload_instance_suffix("node-reload-1-500")
        == "node-reload-1-500"
    )
    # Doubly-stacked reload suffixes are removed all the way to the base name.
    assert (
        ReticulumMeshChat._strip_reload_instance_suffix(
            "default-reload-2246687-1777566181-reload-3009314-1777566481",
        )
        == "default"
    )
@pytest.mark.asyncio
async def test_interface_add_discoverable_without_optional_coordinates(temp_dir):
+247
View File
@@ -0,0 +1,247 @@
# SPDX-License-Identifier: 0BSD
"""Multi-minute soak tests (announce DB + websocket fan-out).
These are **opt-in**: unset ``MESHCHAT_LONG_TEST_SECONDS`` skips them immediately.
Examples::
MESHCHAT_LONG_TEST_SECONDS=300 uv run pytest tests/backend/test_long_running_stress.py -m long_running -v
MESHCHAT_LONG_TEST_SECONDS=600 uv run pytest tests/backend/test_long_running_stress.py -m long_running -v
Quick smoke (seconds)::
MESHCHAT_LONG_TEST_SECONDS=5 uv run pytest tests/backend/test_long_running_stress.py -m long_running -v
"""
from __future__ import annotations
import os
import tempfile
import time
from unittest.mock import AsyncMock, MagicMock
import pytest
from meshchatx.meshchat import ReticulumMeshChat
from meshchatx.src.backend.announce_manager import AnnounceManager
from meshchatx.src.backend.database import Database
from meshchatx.src.backend.database.provider import DatabaseProvider
class _FakeIdentity:
    """Identity stand-in (same shape as the announce integration test helper)."""

    __slots__ = ("_h",)

    def __init__(self, identity_hex32: str):
        # 32 hex chars decode to the 16 raw bytes used as the identity hash.
        self._h = bytes.fromhex(identity_hex32)

    @property
    def hash(self):
        # Raw hash bytes, mirroring the real identity's `hash` attribute.
        return self._h

    def get_public_key(self):
        # Constant placeholder; soak tests never verify real key material.
        return b"\xaa\xbb"
def _cleanup(db, path):
    """Best-effort teardown: close DB, reset the provider singleton, unlink files."""
    if db is not None:
        try:
            db.close()
        except Exception:
            pass
    # Reset the singleton so the next test gets a fresh provider instance.
    DatabaseProvider._instance = None
    if path:
        try:
            os.unlink(path)
        except OSError:
            pass
        # WAL mode leaves sidecar files next to the main database file.
        for suffix in ("-wal", "-shm"):
            try:
                os.unlink(path + suffix)
            except OSError:
                pass
def _new_db():
    """Create and initialize a Database on a fresh temp file; returns (db, path)."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
        path = f.name
    db = Database(path)
    db.initialize()
    return db, path
def _store_enabled_config(**max_stored):
    """Config mock: storage toggles report True; announce_max_stored_* caps
    come from kwargs and default to None (no cap)."""
    config = MagicMock()
    for _k in (
        "announce_store_lxmf_delivery",
        "announce_store_lxst_telephony",
        "announce_store_nomadnetwork_node",
        "announce_store_lxmf_propagation",
        "announce_store_git_repositories",
    ):
        m = MagicMock()
        m.get.return_value = True
        setattr(config, _k, m)
    for key, default in (
        ("announce_max_stored_lxmf_delivery", None),
        ("announce_max_stored_nomadnetwork_node", None),
        ("announce_max_stored_lxmf_propagation", None),
    ):
        attr = MagicMock()
        attr.get.return_value = max_stored.get(key, default)
        setattr(config, key, attr)
    return config
def _long_test_seconds() -> float:
raw = os.environ.get("MESHCHAT_LONG_TEST_SECONDS", "").strip()
if not raw:
return 0.0
try:
return float(raw)
except ValueError:
return 0.0
def _require_long_duration():
    """Return the configured soak duration, or skip the test when unset."""
    sec = _long_test_seconds()
    if sec <= 0:
        pytest.skip(
            "Set MESHCHAT_LONG_TEST_SECONDS to a positive value "
            "(e.g. 300 for 5 minutes, 600 for 10 minutes).",
        )
    return sec
def _bind_real_websocket_broadcast(app):
    """Bind the real ReticulumMeshChat.websocket_broadcast onto a mock app."""
    return ReticulumMeshChat.websocket_broadcast.__get__(app, ReticulumMeshChat)
class _MagicWs:
    """Websocket stub whose send_str is an awaitable AsyncMock resolving to None."""

    __slots__ = ("send_str",)

    def __init__(self):
        self.send_str = AsyncMock(return_value=None)
@pytest.mark.long_running
def test_soak_sqlite_announces_stay_bounded_and_quick_check():
    """Soak: continuous announce spam stays at the cap and keeps SQLite healthy."""
    duration_s = _require_long_duration()
    cap = 96
    batch_size = 120
    qc_interval_batches = 15
    db, path = _new_db()
    try:
        cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=cap)
        mgr = AnnounceManager(db, cfg)
        ret = MagicMock()
        deadline = time.monotonic() + duration_s
        seq = 0
        batches = 0
        while time.monotonic() < deadline:
            for _ in range(batch_size):
                # 2048 recycled identities over ever-unique destinations.
                mgr.upsert_announce(
                    ret,
                    _FakeIdentity(f"{seq % 2048:032x}"),
                    bytes.fromhex(f"{seq:032x}"),
                    "lxmf.delivery",
                    os.urandom(48),
                    None,
                )
                seq += 1
            batches += 1
            # batch_size > cap, so the count must sit exactly at the cap.
            assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == cap
            # Periodically run an integrity check to catch corruption early.
            if batches % qc_interval_batches == 0:
                qc = db.provider.quick_check()
                assert qc
                assert next(iter(qc[0].values())) == "ok"
        assert seq > 0
    finally:
        _cleanup(db, path)
@pytest.mark.long_running
def test_soak_interleaved_aspects_under_cap():
    """Soak: three aspects spammed in rotation each stay at their own cap."""
    duration_s = _require_long_duration()
    cfg = _store_enabled_config(
        announce_max_stored_lxmf_delivery=40,
        announce_max_stored_nomadnetwork_node=25,
        announce_max_stored_lxmf_propagation=30,
    )
    db, path = _new_db()
    try:
        mgr = AnnounceManager(db, cfg)
        ret = MagicMock()
        deadline = time.monotonic() + duration_s
        n = 0
        while time.monotonic() < deadline:
            for aspect, payload in (
                ("lxmf.delivery", b"a"),
                ("nomadnetwork.node", b"b"),
                ("lxmf.propagation", b"c"),
            ):
                mgr.upsert_announce(
                    ret,
                    _FakeIdentity(f"{n % 900:032x}"),
                    bytes.fromhex(f"{n:032x}"),
                    aspect,
                    payload,
                    None,
                )
                n += 1
            # Mid-run, counts may be below the caps but never above them.
            assert db.announces.get_announce_count_by_aspect("lxmf.delivery") <= 40
            assert db.announces.get_announce_count_by_aspect("nomadnetwork.node") <= 25
            assert db.announces.get_announce_count_by_aspect("lxmf.propagation") <= 30
        assert n > 0
        # After the soak, each aspect must sit exactly at its own cap.
        assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 40
        assert db.announces.get_announce_count_by_aspect("nomadnetwork.node") == 25
        assert db.announces.get_announce_count_by_aspect("lxmf.propagation") == 30
    finally:
        _cleanup(db, path)
@pytest.mark.long_running
@pytest.mark.asyncio
async def test_soak_websocket_broadcast_under_load(mock_app):
    """Soak: broadcast repeatedly to a large, always-healthy client pool."""
    duration_s = _require_long_duration()
    mock_app.websocket_clients.clear()
    # Pool size is tunable via env, hard-capped at 2000 clients.
    n_clients = min(int(os.environ.get("MESHCHAT_LONG_TEST_WS_CLIENTS", "400")), 2000)
    clients = [_MagicWs() for _ in range(n_clients)]
    mock_app.websocket_clients.extend(clients)
    real = _bind_real_websocket_broadcast(mock_app)
    deadline = time.monotonic() + duration_s
    rounds = 0
    while time.monotonic() < deadline:
        payload = f'{{"type":"soak","round":{rounds}}}'
        await real(payload)
        for c in clients:
            # await_args reflects the most recent call: this round's payload.
            assert c.send_str.await_args[0][0] == payload
        rounds += 1
    assert rounds > 0
@pytest.mark.long_running
@pytest.mark.asyncio
async def test_soak_websocket_broadcast_with_churn(mock_app):
    """Soak: broadcast across waves of fresh clients, some failing, without drift."""
    duration_s = _require_long_duration()
    real = _bind_real_websocket_broadcast(mock_app)
    deadline = time.monotonic() + duration_s
    wave = 0
    while time.monotonic() < deadline:
        # Each wave replaces the entire pool with 80 fresh clients.
        mock_app.websocket_clients.clear()
        batch = [_MagicWs() for _ in range(80)]
        if wave % 3 == 0:
            # Every third wave, 20 clients raise on send and should be dropped.
            for c in batch[:20]:
                c.send_str = AsyncMock(side_effect=ConnectionError("closed"))
        mock_app.websocket_clients.extend(batch)
        payload = f'{{"wave":{wave}}}'
        await real(payload)
        # Only survivors remain in the list; each received this wave's payload.
        for c in mock_app.websocket_clients:
            assert c.send_str.await_args[0][0] == payload
        wave += 1
    assert wave > 0
+21
View File
@@ -85,6 +85,27 @@ def test_filter_announced_dicts_by_search_query_destination_hash_substring():
assert len(out) == 1
def test_filter_announced_dicts_empty_search_matches_all():
    """Empty substring matches every string in Python; callers should normalize UI input."""
    items = [
        {"display_name": "AAA"},
        {"destination_hash": "0123abcd"},
        {"identity_hash": "fedcba"},
    ]
    # "" is a substring of every string, so all three items pass the filter.
    out = filter_announced_dicts_by_search_query(items, "")
    assert len(out) == 3
def test_filter_announced_dicts_whitespace_search_can_match():
    """A whitespace query is a literal substring search, not trimmed to empty."""
    items = [
        {"display_name": " Hi "},
        {"destination_hash": "99"},
    ]
    out = filter_announced_dicts_by_search_query(items, " ")
    # Only the value containing a space matches; "99" does not.
    assert len(out) == 1
    assert out[0]["display_name"] == " Hi "
def test_filter_announced_dicts_by_search_query_case_insensitive():
items = [
{"display_name": "CamelCaseName", "destination_hash": "z" * 32},
+51
View File
@@ -109,6 +109,57 @@ async def test_websocket_broadcast_drops_dead_clients(mock_app):
assert good.send_str.await_count == 1
@pytest.mark.asyncio
async def test_websocket_broadcast_all_clients_dead_empties_list(mock_app):
    """When every client raises on send, the client list ends up empty."""
    mock_app.websocket_clients.clear()
    clients = []
    for _ in range(80):
        c = MagicWs()
        c.send_str = AsyncMock(side_effect=ConnectionError("closed"))
        clients.append(c)
    mock_app.websocket_clients.extend(clients)
    real = _bind_real_websocket_broadcast(mock_app)
    await real("{}")
    assert mock_app.websocket_clients == []
@pytest.mark.asyncio
async def test_websocket_broadcast_fanout_large_client_pool(mock_app):
    """One broadcast reaches all 1500 clients exactly once with the same payload."""
    mock_app.websocket_clients.clear()
    n = 1500
    clients = [MagicWs() for _ in range(n)]
    mock_app.websocket_clients.extend(clients)
    real = _bind_real_websocket_broadcast(mock_app)
    payload = '{"type":"metrics","x":1}'
    await real(payload)
    for c in clients:
        assert c.send_str.await_count == 1
        assert c.send_str.await_args[0][0] == payload
@pytest.mark.asyncio
async def test_websocket_broadcast_mixed_failures_still_delivers_to_healthy(mock_app):
    """Failing clients are pruned while healthy ones still receive the message."""
    mock_app.websocket_clients.clear()
    failing = [MagicWs() for _ in range(40)]
    for c in failing:
        c.send_str = AsyncMock(side_effect=BrokenPipeError())
    healthy = [MagicWs() for _ in range(60)]
    mock_app.websocket_clients.extend(failing + healthy)
    real = _bind_real_websocket_broadcast(mock_app)
    await real('{"type":"ping"}')
    for c in failing:
        assert c not in mock_app.websocket_clients
    # List ordering is preserved: only the healthy suffix remains.
    assert mock_app.websocket_clients == healthy
    for c in healthy:
        assert c.send_str.await_count == 1
class MagicWs:
    """Websocket stub whose send_str is an awaitable AsyncMock resolving to None."""

    def __init__(self):
        self.send_str = AsyncMock(return_value=None)