Files
MeshChatX/tests/backend/test_long_running_stress.py
T

248 lines
7.3 KiB
Python

# SPDX-License-Identifier: 0BSD
"""Multi-minute soak tests (announce DB + websocket fan-out).
These are **opt-in**: unset ``MESHCHAT_LONG_TEST_SECONDS`` skips them immediately.
Examples::
MESHCHAT_LONG_TEST_SECONDS=300 uv run pytest tests/backend/test_long_running_stress.py -m long_running -v
MESHCHAT_LONG_TEST_SECONDS=600 uv run pytest tests/backend/test_long_running_stress.py -m long_running -v
Quick smoke (seconds)::
MESHCHAT_LONG_TEST_SECONDS=5 uv run pytest tests/backend/test_long_running_stress.py -m long_running -v
"""
from __future__ import annotations
import os
import tempfile
import time
from unittest.mock import AsyncMock, MagicMock
import pytest
from meshchatx.meshchat import ReticulumMeshChat
from meshchatx.src.backend.announce_manager import AnnounceManager
from meshchatx.src.backend.database import Database
from meshchatx.src.backend.database.provider import DatabaseProvider
class _FakeIdentity:
__slots__ = ("_h",)
def __init__(self, identity_hex32: str):
self._h = bytes.fromhex(identity_hex32)
@property
def hash(self):
return self._h
def get_public_key(self):
return b"\xaa\xbb"
def _cleanup(db, path):
if db is not None:
try:
db.close()
except Exception:
pass
DatabaseProvider._instance = None
if path:
try:
os.unlink(path)
except OSError:
pass
for suffix in ("-wal", "-shm"):
try:
os.unlink(path + suffix)
except OSError:
pass
def _new_db():
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
path = f.name
db = Database(path)
db.initialize()
return db, path
def _store_enabled_config(**max_stored):
config = MagicMock()
for _k in (
"announce_store_lxmf_delivery",
"announce_store_lxst_telephony",
"announce_store_nomadnetwork_node",
"announce_store_lxmf_propagation",
"announce_store_git_repositories",
):
m = MagicMock()
m.get.return_value = True
setattr(config, _k, m)
for key, default in (
("announce_max_stored_lxmf_delivery", None),
("announce_max_stored_nomadnetwork_node", None),
("announce_max_stored_lxmf_propagation", None),
):
attr = MagicMock()
attr.get.return_value = max_stored.get(key, default)
setattr(config, key, attr)
return config
def _long_test_seconds() -> float:
raw = os.environ.get("MESHCHAT_LONG_TEST_SECONDS", "").strip()
if not raw:
return 0.0
try:
return float(raw)
except ValueError:
return 0.0
def _require_long_duration():
sec = _long_test_seconds()
if sec <= 0:
pytest.skip(
"Set MESHCHAT_LONG_TEST_SECONDS to a positive value "
"(e.g. 300 for 5 minutes, 600 for 10 minutes).",
)
return sec
def _bind_real_websocket_broadcast(app):
return ReticulumMeshChat.websocket_broadcast.__get__(app, ReticulumMeshChat)
class _MagicWs:
__slots__ = ("send_str",)
def __init__(self):
self.send_str = AsyncMock(return_value=None)
@pytest.mark.long_running
def test_soak_sqlite_announces_stay_bounded_and_quick_check():
duration_s = _require_long_duration()
cap = 96
batch_size = 120
qc_interval_batches = 15
db, path = _new_db()
try:
cfg = _store_enabled_config(announce_max_stored_lxmf_delivery=cap)
mgr = AnnounceManager(db, cfg)
ret = MagicMock()
deadline = time.monotonic() + duration_s
seq = 0
batches = 0
while time.monotonic() < deadline:
for _ in range(batch_size):
mgr.upsert_announce(
ret,
_FakeIdentity(f"{seq % 2048:032x}"),
bytes.fromhex(f"{seq:032x}"),
"lxmf.delivery",
os.urandom(48),
None,
)
seq += 1
batches += 1
assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == cap
if batches % qc_interval_batches == 0:
qc = db.provider.quick_check()
assert qc
assert next(iter(qc[0].values())) == "ok"
assert seq > 0
finally:
_cleanup(db, path)
@pytest.mark.long_running
def test_soak_interleaved_aspects_under_cap():
duration_s = _require_long_duration()
cfg = _store_enabled_config(
announce_max_stored_lxmf_delivery=40,
announce_max_stored_nomadnetwork_node=25,
announce_max_stored_lxmf_propagation=30,
)
db, path = _new_db()
try:
mgr = AnnounceManager(db, cfg)
ret = MagicMock()
deadline = time.monotonic() + duration_s
n = 0
while time.monotonic() < deadline:
for aspect, payload in (
("lxmf.delivery", b"a"),
("nomadnetwork.node", b"b"),
("lxmf.propagation", b"c"),
):
mgr.upsert_announce(
ret,
_FakeIdentity(f"{n % 900:032x}"),
bytes.fromhex(f"{n:032x}"),
aspect,
payload,
None,
)
n += 1
assert db.announces.get_announce_count_by_aspect("lxmf.delivery") <= 40
assert db.announces.get_announce_count_by_aspect("nomadnetwork.node") <= 25
assert db.announces.get_announce_count_by_aspect("lxmf.propagation") <= 30
assert n > 0
assert db.announces.get_announce_count_by_aspect("lxmf.delivery") == 40
assert db.announces.get_announce_count_by_aspect("nomadnetwork.node") == 25
assert db.announces.get_announce_count_by_aspect("lxmf.propagation") == 30
finally:
_cleanup(db, path)
@pytest.mark.long_running
@pytest.mark.asyncio
async def test_soak_websocket_broadcast_under_load(mock_app):
duration_s = _require_long_duration()
mock_app.websocket_clients.clear()
n_clients = min(int(os.environ.get("MESHCHAT_LONG_TEST_WS_CLIENTS", "400")), 2000)
clients = [_MagicWs() for _ in range(n_clients)]
mock_app.websocket_clients.extend(clients)
real = _bind_real_websocket_broadcast(mock_app)
deadline = time.monotonic() + duration_s
rounds = 0
while time.monotonic() < deadline:
payload = f'{{"type":"soak","round":{rounds}}}'
await real(payload)
for c in clients:
assert c.send_str.await_args[0][0] == payload
rounds += 1
assert rounds > 0
@pytest.mark.long_running
@pytest.mark.asyncio
async def test_soak_websocket_broadcast_with_churn(mock_app):
duration_s = _require_long_duration()
real = _bind_real_websocket_broadcast(mock_app)
deadline = time.monotonic() + duration_s
wave = 0
while time.monotonic() < deadline:
mock_app.websocket_clients.clear()
batch = [_MagicWs() for _ in range(80)]
if wave % 3 == 0:
for c in batch[:20]:
c.send_str = AsyncMock(side_effect=ConnectionError("closed"))
mock_app.websocket_clients.extend(batch)
payload = f'{{"wave":{wave}}}'
await real(payload)
for c in mock_app.websocket_clients:
assert c.send_str.await_args[0][0] == payload
wave += 1
assert wave > 0