feat(tests): add comprehensive tests for access attempts, enforcement, and security paths, including database migration and backup safety

This commit is contained in:
Ivan
2026-03-31 06:06:48 +03:00
parent 06396a11f5
commit ea407d0143
7 changed files with 912 additions and 141 deletions
+140 -1
View File
@@ -1,9 +1,17 @@
import asyncio
import os
import tempfile
from unittest.mock import patch
from contextlib import ExitStack
from unittest.mock import MagicMock, patch
import pytest
import RNS
from meshchatx.meshchat import ReticulumMeshChat
from meshchatx.src.backend.config_manager import ConfigManager
from meshchatx.src.backend.database import Database
from meshchatx.src.backend.database.provider import DatabaseProvider
from meshchatx.src.backend.database.schema import DatabaseSchema
# Set log dir to a temporary directory for tests to avoid permission issues
# in restricted environments like sandboxes.
@@ -46,3 +54,134 @@ def cleanup_sqlite_connections():
import gc
gc.collect()
@pytest.fixture
def temp_db(tmp_path):
db_path = os.path.join(tmp_path, "test_meshchat.db")
yield db_path
if os.path.exists(db_path):
os.remove(db_path)
@pytest.fixture
def db(temp_db):
provider = DatabaseProvider(temp_db)
schema = DatabaseSchema(provider)
schema.initialize()
database = Database(temp_db)
yield database
database.close_all()
provider.close_all()
@pytest.fixture
def mock_app(db, tmp_path, temp_db):
real_identity_class = RNS.Identity
class MockIdentityClass(real_identity_class):
def __init__(self, *args, **kwargs):
self.hash = b"test_hash_32_bytes_long_01234567"
self.hexhash = self.hash.hex()
with ExitStack() as stack:
stack.enter_context(patch("RNS.Identity", MockIdentityClass))
stack.enter_context(patch("RNS.Reticulum"))
stack.enter_context(patch("RNS.Transport"))
stack.enter_context(patch("LXMF.LXMRouter"))
stack.enter_context(
patch("meshchatx.src.backend.identity_context.TelephoneManager"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.VoicemailManager"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.RingtoneManager"),
)
stack.enter_context(patch("meshchatx.src.backend.identity_context.RNCPHandler"))
stack.enter_context(
patch("meshchatx.src.backend.identity_context.RNStatusHandler"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.RNProbeHandler"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.TranslatorHandler"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.ArchiverManager"),
)
stack.enter_context(patch("meshchatx.src.backend.identity_context.MapManager"))
stack.enter_context(
patch("meshchatx.src.backend.identity_context.MessageHandler"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.AnnounceManager"),
)
stack.enter_context(patch("threading.Thread"))
mock_id = MockIdentityClass()
mock_id.get_private_key = MagicMock(return_value=b"test_private_key")
stack.enter_context(
patch.object(MockIdentityClass, "from_file", return_value=mock_id),
)
stack.enter_context(
patch.object(MockIdentityClass, "recall", return_value=mock_id),
)
stack.enter_context(
patch.object(MockIdentityClass, "from_bytes", return_value=mock_id),
)
stack.enter_context(
patch.object(
ReticulumMeshChat,
"announce_loop",
new=MagicMock(return_value=None),
),
)
stack.enter_context(
patch.object(
ReticulumMeshChat,
"announce_sync_propagation_nodes",
new=MagicMock(return_value=None),
),
)
stack.enter_context(
patch.object(
ReticulumMeshChat,
"crawler_loop",
new=MagicMock(return_value=None),
),
)
stack.enter_context(
patch.object(
ReticulumMeshChat,
"auto_backup_loop",
new=MagicMock(return_value=None),
),
)
stack.enter_context(
patch.object(
ReticulumMeshChat,
"send_config_to_websocket_clients",
return_value=None,
),
)
app = ReticulumMeshChat(
identity=mock_id,
storage_dir=str(tmp_path),
reticulum_config_dir=str(tmp_path),
)
# DatabaseProvider is a singleton; IdentityContext.setup() opens the identity DB
# and replaces the singleton. Recreate the test DB handle so config and DAOs use
# a live provider for the same path as the db fixture.
app.database = Database(temp_db)
app.current_context.config = ConfigManager(app.database)
app.websocket_broadcast = MagicMock(side_effect=lambda data: None)
yield app
app.teardown_identity()
+162
View File
@@ -0,0 +1,162 @@
"""Tests and property-based checks for AccessAttemptsDAO."""
from __future__ import annotations
import time
import pytest
from hypothesis import HealthCheck, assume, given, settings
from hypothesis import strategies as st
from meshchatx.src.backend.database.access_attempts import (
LOGIN_PATH,
SETUP_PATH,
AccessAttemptsDAO,
MAX_TRUSTED_FINGERPRINTS_PER_IDENTITY,
user_agent_hash,
)
def _id_hash() -> str:
return "746573745f686173685f33325f62797465735f6c6f6e675f3031323334353637"
@pytest.fixture
def dao(db) -> AccessAttemptsDAO:
return db.access_attempts
def test_user_agent_hash_deterministic():
assert user_agent_hash("Mozilla/5.0") == user_agent_hash("Mozilla/5.0")
assert user_agent_hash("") == user_agent_hash("")
@settings(max_examples=100, suppress_health_check=[HealthCheck.function_scoped_fixture])
@given(s=st.text(min_size=0, max_size=600))
def test_user_agent_hash_hypothesis_stable(dao, s):
h1 = user_agent_hash(s)
h2 = user_agent_hash(s)
assert h1 == h2
assert len(h1) == 64
def test_insert_list_roundtrip(dao):
ih = _id_hash()
dao.insert(ih, "10.0.0.1", "TestAgent/1", LOGIN_PATH, "POST", "success", None)
rows = dao.list_attempts(limit=10, offset=0)
assert len(rows) >= 1
top = rows[0]
assert top["identity_hash"] == ih
assert top["client_ip"] == "10.0.0.1"
assert top["user_agent"] == "TestAgent/1"
assert top["path"] == LOGIN_PATH
assert top["method"] == "POST"
assert top["outcome"] == "success"
def test_count_matches_list_for_no_filters(dao):
ih = _id_hash()
before = dao.count_attempts()
dao.insert(ih, "10.0.0.2", "a", LOGIN_PATH, "POST", "failed_password", None)
after = dao.count_attempts()
assert after == before + 1
def test_search_filters(dao):
ih = _id_hash()
dao.insert(
ih, "192.168.99.1", "UniqueSearchUA", LOGIN_PATH, "POST", "success", "detail-x"
)
found = dao.list_attempts(search="UniqueSearchUA", limit=50)
assert any(r["user_agent"] == "UniqueSearchUA" for r in found)
n = dao.count_attempts(search="UniqueSearchUA")
assert n >= 1
def test_outcome_filter(dao):
ih = _id_hash()
dao.insert(ih, "10.0.0.3", "b", SETUP_PATH, "POST", "invalid_json", None)
only = dao.list_attempts(outcome="invalid_json", limit=100)
assert all(r["outcome"] == "invalid_json" for r in only)
def test_is_trusted_and_upsert(dao):
ih = _id_hash()
ua = "TrustedClient/2"
h = user_agent_hash(ua)
assert dao.is_trusted(ih, "10.0.0.4", h) is False
dao.upsert_trusted(ih, "10.0.0.4", h)
assert dao.is_trusted(ih, "10.0.0.4", h) is True
def test_count_lockout_excludes_trusted_fingerprint(dao):
ih = _id_hash()
ip = "10.0.0.5"
ua_ok = "OkBrowser"
ua_bad = "BadBrowser"
h_ok = user_agent_hash(ua_ok)
dao.upsert_trusted(ih, ip, h_ok)
since = time.time() - 100
for _ in range(6):
dao.insert(ih, ip, ua_ok, LOGIN_PATH, "POST", "failed_password", None)
assert dao.count_lockout_failures(ih, ip, since) == 0
for _ in range(5):
dao.insert(ih, ip, ua_bad, LOGIN_PATH, "POST", "failed_password", None)
assert dao.count_lockout_failures(ih, ip, since) == 5
def test_prune_trusted_keeps_under_cap(dao):
ih = _id_hash()
ip = "10.0.0.6"
for i in range(MAX_TRUSTED_FINGERPRINTS_PER_IDENTITY + 5):
ua = f"Browser-{i}"
dao.upsert_trusted(ih, ip, user_agent_hash(ua))
row = dao.provider.fetchone(
"SELECT COUNT(*) AS c FROM trusted_login_clients WHERE identity_hash = ?",
(ih,),
)
assert int(row["c"]) == MAX_TRUSTED_FINGERPRINTS_PER_IDENTITY
def test_cleanup_old_removes_oldest_when_over_max(dao):
ih = _id_hash()
dao.cleanup_old(max_rows=3)
for i in range(5):
dao.insert(ih, f"10.0.1.{i}", "x", LOGIN_PATH, "POST", "success", None)
dao.cleanup_old(max_rows=3)
n = dao.count_attempts()
assert n == 3
@settings(max_examples=30, suppress_health_check=[HealthCheck.function_scoped_fixture])
@given(
ip=st.ip_addresses().map(str),
ua=st.text(
alphabet=st.characters(min_codepoint=32, max_codepoint=126),
min_size=0,
max_size=200,
),
outcome=st.sampled_from(
["success", "failed_password", "rate_limited", "invalid_json"],
),
)
def test_insert_list_hypothesis_invariants(dao, ip, ua, outcome):
assume(ip)
ih = _id_hash()
before_total = dao.count_attempts()
dao.insert(ih, ip, ua, LOGIN_PATH, "POST", outcome, "h")
after_total = dao.count_attempts()
assert after_total == before_total + 1
rows = dao.list_attempts(limit=500, offset=0, outcome=outcome)
assert any(r["client_ip"] == ip and r["outcome"] == outcome for r in rows)
def test_access_attempts_table_exists(db):
row = db.provider.fetchone(
"SELECT name FROM sqlite_master WHERE type='table' AND name='access_attempts'",
)
assert row is not None
row2 = db.provider.fetchone(
"SELECT name FROM sqlite_master WHERE type='table' AND name='trusted_login_clients'",
)
assert row2 is not None
@@ -0,0 +1,356 @@
"""Tests for login/setup rate limiting, lockout, and HTTP smoke paths."""
from __future__ import annotations
import json
import secrets
import time
from types import SimpleNamespace
from unittest.mock import patch
import bcrypt
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
from aiohttp_session import setup as setup_session
from hypothesis import HealthCheck, given, settings
from hypothesis import strategies as st
from meshchatx.meshchat import _request_client_ip
from meshchatx.src.backend.database.access_attempts import (
LOGIN_PATH,
MAX_FAILED_BEFORE_LOCKOUT,
MAX_TRUSTED_LOGIN_PER_WINDOW,
MAX_UNTRUSTED_LOGIN_PER_WINDOW,
WINDOW_LOCKOUT_S,
WINDOW_RATE_TRUSTED_S,
WINDOW_RATE_UNTRUSTED_S,
user_agent_hash,
)
def _make_req(
ip: str, ua: str, method: str = "POST", xff: str | None = None
) -> SimpleNamespace:
h = {"User-Agent": ua}
if xff is not None:
h["X-Forwarded-For"] = xff
return SimpleNamespace(headers=h, remote=ip, method=method)
def _id_hex(mock_app) -> str:
return mock_app.identity.hash.hex()
def test_request_client_ip_prefers_x_forwarded_for():
r = _make_req("10.0.0.1", "ua", xff="203.0.113.5, 10.0.0.2")
assert _request_client_ip(r) == "203.0.113.5"
def test_request_client_ip_falls_back_to_remote():
r = _make_req("198.51.100.2", "ua")
assert _request_client_ip(r) == "198.51.100.2"
def test_enforce_returns_none_when_no_database(mock_app):
db = mock_app.database
try:
mock_app.database = None
r = _make_req("127.0.0.1", "Mozilla/5.0")
assert mock_app._enforce_login_access(r, LOGIN_PATH) is None
finally:
mock_app.database = db
def test_untrusted_rate_limit_after_max_per_window(mock_app):
dao = mock_app.database.access_attempts
ih = _id_hex(mock_app)
ip = "192.0.2.10"
ua = "RateTest/1"
now = time.time()
ts = now - (WINDOW_RATE_UNTRUSTED_S / 2)
for _ in range(MAX_UNTRUSTED_LOGIN_PER_WINDOW):
dao.provider.execute(
"""
INSERT INTO access_attempts (
created_at, identity_hash, client_ip, user_agent, user_agent_hash,
path, method, outcome, detail
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
ts,
ih,
ip,
ua,
user_agent_hash(ua),
LOGIN_PATH,
"POST",
"success",
None,
),
)
r = _make_req(ip, ua)
resp = mock_app._enforce_login_access(r, LOGIN_PATH)
assert resp is not None
assert resp.status == 429
def test_lockout_when_enough_untrusted_failures(mock_app):
dao = mock_app.database.access_attempts
ih = _id_hex(mock_app)
ip = "192.0.2.20"
ua = "Attacker/1"
now = time.time()
ts = now - (WINDOW_LOCKOUT_S / 2)
for _ in range(MAX_FAILED_BEFORE_LOCKOUT):
dao.provider.execute(
"""
INSERT INTO access_attempts (
created_at, identity_hash, client_ip, user_agent, user_agent_hash,
path, method, outcome, detail
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
ts,
ih,
ip,
ua,
user_agent_hash(ua),
LOGIN_PATH,
"POST",
"failed_password",
None,
),
)
r = _make_req(ip, ua)
resp = mock_app._enforce_login_access(r, LOGIN_PATH)
assert resp is not None
assert resp.status == 429
body = json.loads(resp.body.decode())
err = body.get("error", "")
assert "failed" in err.lower() or "later" in err.lower()
def test_trusted_client_not_locked_out_by_own_failed_rows(mock_app):
dao = mock_app.database.access_attempts
ih = _id_hex(mock_app)
ip = "192.0.2.30"
ua = "GoodUser/1"
h = user_agent_hash(ua)
dao.upsert_trusted(ih, ip, h)
ts = time.time() - (WINDOW_LOCKOUT_S / 2)
for _ in range(10):
dao.provider.execute(
"""
INSERT INTO access_attempts (
created_at, identity_hash, client_ip, user_agent, user_agent_hash,
path, method, outcome, detail
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
ts,
ih,
ip,
ua,
h,
LOGIN_PATH,
"POST",
"failed_password",
None,
),
)
r = _make_req(ip, ua)
assert mock_app._enforce_login_access(r, LOGIN_PATH) is None
def test_trusted_rate_limit_after_max_trusted_window(mock_app):
dao = mock_app.database.access_attempts
ih = _id_hex(mock_app)
ip = "192.0.2.40"
ua = "HeavyTrusted/1"
h = user_agent_hash(ua)
dao.upsert_trusted(ih, ip, h)
now = time.time()
ts = now - (WINDOW_RATE_TRUSTED_S / 2)
for _ in range(MAX_TRUSTED_LOGIN_PER_WINDOW):
dao.provider.execute(
"""
INSERT INTO access_attempts (
created_at, identity_hash, client_ip, user_agent, user_agent_hash,
path, method, outcome, detail
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
ts,
ih,
ip,
ua,
h,
LOGIN_PATH,
"POST",
"success",
None,
),
)
r = _make_req(ip, ua)
resp = mock_app._enforce_login_access(r, LOGIN_PATH)
assert resp is not None
assert resp.status == 429
@settings(max_examples=25, suppress_health_check=[HealthCheck.function_scoped_fixture])
@given(
n=st.integers(min_value=0, max_value=MAX_UNTRUSTED_LOGIN_PER_WINDOW + 3),
)
def test_enforce_untrusted_monotone_hypothesis(mock_app, n):
dao = mock_app.database.access_attempts
ih = _id_hex(mock_app)
ip = "198.18.0.1"
ua = "Hyp/1"
dao.provider.execute("DELETE FROM access_attempts", ())
for _ in range(n):
dao.insert(ih, ip, ua, LOGIN_PATH, "POST", "success", None)
r = _make_req(ip, ua)
out = mock_app._enforce_login_access(r, LOGIN_PATH)
if n >= MAX_UNTRUSTED_LOGIN_PER_WINDOW:
assert out is not None
assert out.status == 429
else:
assert out is None
def _make_aio_app(mock_app, use_https: bool):
mock_app.session_secret_key = secrets.token_urlsafe(32)
routes = web.RouteTableDef()
auth_mw, mime_mw, sec_mw = mock_app._define_routes(routes)
aio_app = web.Application()
setup_session(aio_app, mock_app._encrypted_cookie_storage(use_https))
aio_app.middlewares.extend([auth_mw, mime_mw, sec_mw])
aio_app.add_routes(routes)
return aio_app
@pytest.mark.asyncio
async def test_login_records_access_attempt_and_debug_list(mock_app):
mock_app.config.auth_enabled.set(True)
pw = b"smoke-access-attempt-pw-ok"
mock_app.config.auth_password_hash.set(
bcrypt.hashpw(pw, bcrypt.gensalt()).decode("utf-8"),
)
aio_app = _make_aio_app(mock_app, use_https=False)
async with TestClient(TestServer(aio_app)) as client:
bad = await client.post("/api/v1/auth/login", json={"password": "wrong"})
assert bad.status == 401
good = await client.post(
"/api/v1/auth/login",
json={"password": pw.decode("utf-8")},
)
assert good.status == 200
dao = mock_app.database.access_attempts
rows = dao.list_attempts(limit=20, outcome="failed_password")
assert any(r["outcome"] == "failed_password" for r in rows)
rows_ok = dao.list_attempts(limit=20, outcome="success")
assert any(r["outcome"] == "success" for r in rows_ok)
total = dao.count_attempts()
assert total >= 2
@pytest.mark.asyncio
async def test_lockout_login_returns_429_smoke(mock_app):
mock_app.config.auth_enabled.set(True)
pw = b"lockout-smoke-pw"
mock_app.config.auth_password_hash.set(
bcrypt.hashpw(pw, bcrypt.gensalt()).decode("utf-8"),
)
aio_app = _make_aio_app(mock_app, use_https=False)
async with TestClient(TestServer(aio_app)) as client:
ip = "192.0.2.88"
with patch(
"meshchatx.meshchat._request_client_ip",
return_value=ip,
):
for _ in range(MAX_FAILED_BEFORE_LOCKOUT):
r = await client.post(
"/api/v1/auth/login",
json={"password": "nope"},
headers={"User-Agent": "SmokeLock/1"},
)
assert r.status == 401
r429 = await client.post(
"/api/v1/auth/login",
json={"password": "nope"},
headers={"User-Agent": "SmokeLock/1"},
)
assert r429.status == 429
@pytest.mark.asyncio
async def test_rate_limited_login_returns_429_smoke(mock_app):
mock_app.config.auth_enabled.set(True)
pw = b"rl-smoke-pw"
mock_app.config.auth_password_hash.set(
bcrypt.hashpw(pw, bcrypt.gensalt()).decode("utf-8"),
)
dao = mock_app.database.access_attempts
ih = _id_hex(mock_app)
ip = "192.0.2.99"
ua = "SmokeRL/1"
now = time.time()
ts = now - (WINDOW_RATE_UNTRUSTED_S / 2)
for _ in range(MAX_UNTRUSTED_LOGIN_PER_WINDOW):
dao.provider.execute(
"""
INSERT INTO access_attempts (
created_at, identity_hash, client_ip, user_agent, user_agent_hash,
path, method, outcome, detail
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
ts,
ih,
ip,
ua,
user_agent_hash(ua),
LOGIN_PATH,
"POST",
"success",
None,
),
)
aio_app = _make_aio_app(mock_app, use_https=False)
async with TestClient(TestServer(aio_app)) as client:
with patch(
"meshchatx.meshchat._request_client_ip",
return_value=ip,
):
r429 = await client.post(
"/api/v1/auth/login",
json={"password": "nope"},
headers={"User-Agent": ua},
)
assert r429.status == 429
@pytest.mark.asyncio
async def test_debug_access_attempts_endpoint_returns_shape(mock_app):
mock_app.config.auth_enabled.set(False)
aio_app = _make_aio_app(mock_app, use_https=False)
async with TestClient(TestServer(aio_app)) as client:
r = await client.get("/api/v1/debug/access-attempts?limit=5&offset=0")
assert r.status == 200
data = await r.json()
assert "attempts" in data
assert "total" in data
assert "limit" in data
assert "offset" in data
assert isinstance(data["attempts"], list)
+147
View File
@@ -0,0 +1,147 @@
import asyncio
import secrets
import bcrypt
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
from aiohttp_session import setup as setup_session
from hypothesis import HealthCheck, given, settings
from hypothesis import strategies as st
from meshchatx.src.backend.config_manager import ConfigManager
def _make_aio_app(mock_app, use_https: bool):
mock_app.session_secret_key = secrets.token_urlsafe(32)
routes = web.RouteTableDef()
auth_mw, mime_mw, sec_mw = mock_app._define_routes(routes)
aio_app = web.Application()
setup_session(aio_app, mock_app._encrypted_cookie_storage(use_https))
aio_app.middlewares.extend([auth_mw, mime_mw, sec_mw])
aio_app.add_routes(routes)
return aio_app
def test_config_password_hash_roundtrip(mock_app):
mock_app.config.auth_password_hash.set("roundtrip-test-hash")
assert mock_app.config.auth_password_hash.get() == "roundtrip-test-hash"
def test_config_bcrypt_hash_roundtrip(mock_app):
mock_app.config.auth_enabled.set(True)
h = bcrypt.hashpw(b"pw", bcrypt.gensalt()).decode("utf-8")
mock_app.config.auth_password_hash.set(h)
assert mock_app.config.auth_password_hash.get() == h
def test_encrypted_cookie_storage_https_flags(mock_app):
mock_app.session_secret_key = secrets.token_urlsafe(32)
storage = mock_app._encrypted_cookie_storage(True)
assert storage.cookie_params["secure"] is True
assert storage.cookie_params["httponly"] is True
assert storage.cookie_params["samesite"] == "Lax"
def test_encrypted_cookie_storage_http_flags(mock_app):
mock_app.session_secret_key = secrets.token_urlsafe(32)
storage = mock_app._encrypted_cookie_storage(False)
assert storage.cookie_params["secure"] is False
@pytest.mark.asyncio
async def test_login_sets_cookie_and_allows_protected_api(mock_app):
mock_app.config.auth_enabled.set(True)
pw = b"integration-test-password-ok"
stored_hash = bcrypt.hashpw(pw, bcrypt.gensalt()).decode("utf-8")
mock_app.config.auth_password_hash.set(stored_hash)
assert isinstance(mock_app.current_context.config, ConfigManager)
assert mock_app.config.auth_password_hash.get() == stored_hash
aio_app = _make_aio_app(mock_app, use_https=False)
async with TestClient(TestServer(aio_app)) as client:
login = await client.post(
"/api/v1/auth/login",
json={"password": pw.decode("utf-8")},
)
assert login.status == 200
set_cookie = login.headers.get("Set-Cookie", "")
assert "HttpOnly" in set_cookie
assert "SameSite=Lax" in set_cookie
assert "Secure" not in set_cookie
backups = await client.get("/api/v1/database/backups")
assert backups.status == 200
body = await backups.json()
assert "backups" in body
@pytest.mark.asyncio
async def test_ws_returns_401_without_session_when_auth_enabled(mock_app):
mock_app.config.auth_enabled.set(True)
mock_app.config.auth_password_hash.set(
bcrypt.hashpw(b"x", bcrypt.gensalt()).decode("utf-8"),
)
aio_app = _make_aio_app(mock_app, use_https=False)
async with TestClient(TestServer(aio_app)) as client:
r = await client.get("/ws")
assert r.status == 401
@pytest.mark.asyncio
async def test_logout_clears_session_for_protected_api(mock_app):
mock_app.config.auth_enabled.set(True)
pw = b"logout-test-password-ok"
mock_app.config.auth_password_hash.set(
bcrypt.hashpw(pw, bcrypt.gensalt()).decode("utf-8"),
)
aio_app = _make_aio_app(mock_app, use_https=False)
async with TestClient(TestServer(aio_app)) as client:
await client.post("/api/v1/auth/login", json={"password": pw.decode("utf-8")})
assert (await client.get("/api/v1/database/backups")).status == 200
out = await client.post("/api/v1/auth/logout")
assert out.status == 200
assert (await client.get("/api/v1/database/backups")).status == 401
@pytest.mark.asyncio
async def test_auth_login_invalid_json_returns_400(mock_app):
mock_app.config.auth_enabled.set(True)
mock_app.config.auth_password_hash.set(
bcrypt.hashpw(b"x", bcrypt.gensalt()).decode("utf-8"),
)
aio_app = _make_aio_app(mock_app, use_https=False)
async with TestClient(TestServer(aio_app)) as client:
r = await client.post(
"/api/v1/auth/login",
data="{not-json",
headers={"Content-Type": "application/json"},
)
assert r.status == 400
body = await r.json()
assert "error" in body
@settings(max_examples=40, suppress_health_check=[HealthCheck.function_scoped_fixture])
@given(body=st.binary(min_size=0, max_size=12000))
def test_auth_login_fuzz_never_500(mock_app, body):
mock_app.config.auth_enabled.set(True)
mock_app.config.auth_password_hash.set(
bcrypt.hashpw(b"fixed", bcrypt.gensalt()).decode("utf-8"),
)
aio_app = _make_aio_app(mock_app, use_https=False)
async def run():
async with TestClient(TestServer(aio_app)) as client:
r = await client.post(
"/api/v1/auth/login",
data=body,
headers={"Content-Type": "application/json"},
)
assert r.status != 500
asyncio.run(run())
+23 -140
View File
@@ -1,149 +1,12 @@
import os
import time
from contextlib import ExitStack
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import RNS
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
from hypothesis import HealthCheck, given, settings
from hypothesis import strategies as st
from meshchatx.meshchat import ReticulumMeshChat
from meshchatx.src.backend.database import Database
from meshchatx.src.backend.database.provider import DatabaseProvider
from meshchatx.src.backend.database.schema import DatabaseSchema
@pytest.fixture
def temp_db(tmp_path):
db_path = os.path.join(tmp_path, "test_notifications.db")
yield db_path
if os.path.exists(db_path):
os.remove(db_path)
@pytest.fixture
def db(temp_db):
provider = DatabaseProvider(temp_db)
schema = DatabaseSchema(provider)
schema.initialize()
database = Database(temp_db)
yield database
database.close_all()
provider.close_all()
@pytest.fixture
def mock_app(db, tmp_path):
# Save real Identity class to use as base for our mock class
real_identity_class = RNS.Identity
class MockIdentityClass(real_identity_class):
def __init__(self, *args, **kwargs):
self.hash = b"test_hash_32_bytes_long_01234567"
self.hexhash = self.hash.hex()
with ExitStack() as stack:
stack.enter_context(patch("RNS.Identity", MockIdentityClass))
stack.enter_context(patch("RNS.Reticulum"))
stack.enter_context(patch("RNS.Transport"))
stack.enter_context(patch("LXMF.LXMRouter"))
stack.enter_context(
patch("meshchatx.src.backend.identity_context.TelephoneManager"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.VoicemailManager"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.RingtoneManager"),
)
stack.enter_context(patch("meshchatx.src.backend.identity_context.RNCPHandler"))
stack.enter_context(
patch("meshchatx.src.backend.identity_context.RNStatusHandler"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.RNProbeHandler"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.TranslatorHandler"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.ArchiverManager"),
)
stack.enter_context(patch("meshchatx.src.backend.identity_context.MapManager"))
stack.enter_context(
patch("meshchatx.src.backend.identity_context.MessageHandler"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.AnnounceManager"),
)
stack.enter_context(patch("threading.Thread"))
mock_id = MockIdentityClass()
mock_id.get_private_key = MagicMock(return_value=b"test_private_key")
stack.enter_context(
patch.object(MockIdentityClass, "from_file", return_value=mock_id),
)
stack.enter_context(
patch.object(MockIdentityClass, "recall", return_value=mock_id),
)
stack.enter_context(
patch.object(MockIdentityClass, "from_bytes", return_value=mock_id),
)
# Patch background threads and other heavy init
stack.enter_context(
patch.object(
ReticulumMeshChat,
"announce_loop",
new=MagicMock(return_value=None),
),
)
stack.enter_context(
patch.object(
ReticulumMeshChat,
"announce_sync_propagation_nodes",
new=MagicMock(return_value=None),
),
)
stack.enter_context(
patch.object(
ReticulumMeshChat,
"crawler_loop",
new=MagicMock(return_value=None),
),
)
stack.enter_context(
patch.object(
ReticulumMeshChat,
"auto_backup_loop",
new=MagicMock(return_value=None),
),
)
# Prevent JSON serialization issues with MagicMocks
stack.enter_context(
patch.object(
ReticulumMeshChat,
"send_config_to_websocket_clients",
return_value=None,
),
)
app = ReticulumMeshChat(
identity=mock_id,
storage_dir=str(tmp_path),
reticulum_config_dir=str(tmp_path),
)
# Use our real test database
app.database = db
app.websocket_broadcast = MagicMock(side_effect=lambda data: None)
yield app
app.teardown_identity()
def test_add_get_notifications(db):
"""Test basic notification storage and retrieval."""
@@ -578,3 +441,23 @@ def test_random_notification_operations(db, operations):
count = db.misc.get_unread_notification_count()
assert count >= 0
@pytest.mark.asyncio
async def test_auth_middleware_returns_401_for_api_without_session_when_auth_enabled(
mock_app,
):
app = mock_app
app.current_context = MagicMock(running=True)
app.config.auth_enabled.set(True)
routes = web.RouteTableDef()
auth_mw, mime_mw, sec_mw = app._define_routes(routes)
aio_app = web.Application(middlewares=[auth_mw, mime_mw, sec_mw])
aio_app.add_routes(routes)
async with TestClient(TestServer(aio_app)) as client:
with patch("meshchatx.meshchat.get_session", new_callable=AsyncMock) as gs:
gs.return_value = {}
resp = await client.get("/api/v1/config")
assert resp.status == 401
@@ -0,0 +1,62 @@
"""Upgrade path: simulate database at version N-1 and assert migration to LATEST_VERSION."""
import pytest
from meshchatx.src.backend.database.provider import DatabaseProvider
from meshchatx.src.backend.database.schema import DatabaseSchema
def _column_names(provider, table: str) -> set[str]:
cur = provider.connection.cursor()
try:
cur.execute(f"PRAGMA table_info({table})") # noqa: S608
return {row[1] for row in cur.fetchall()}
finally:
cur.close()
@pytest.mark.skipif(
DatabaseSchema.LATEST_VERSION < 41,
reason="Test targets migration block for version 41",
)
def test_migrate_from_version_40_restores_attachments_stripped(tmp_path):
"""Version 41 adds lxmf_messages.attachments_stripped; re-apply from 40."""
db_path = tmp_path / "mig.db"
provider = DatabaseProvider(str(db_path))
schema = DatabaseSchema(provider)
schema.initialize()
assert (
int(
provider.fetchone(
"SELECT value FROM config WHERE key = ?",
("database_version",),
)["value"],
)
== DatabaseSchema.LATEST_VERSION
)
provider.execute(
"UPDATE config SET value = ? WHERE key = ?",
("40", "database_version"),
)
try:
provider.execute("ALTER TABLE lxmf_messages DROP COLUMN attachments_stripped")
except Exception as exc:
pytest.skip(f"SQLite DROP COLUMN not available or failed: {exc}")
assert "attachments_stripped" not in _column_names(provider, "lxmf_messages")
schema.migrate(schema._get_current_version())
assert "attachments_stripped" in _column_names(provider, "lxmf_messages")
assert (
int(
provider.fetchone(
"SELECT value FROM config WHERE key = ?",
("database_version",),
)["value"],
)
== DatabaseSchema.LATEST_VERSION
)
provider.close_all()
@@ -0,0 +1,22 @@
"""Path safety: backup/snapshot delete must not escape storage directories."""
import os
import pytest
from meshchatx.src.backend.database import Database
def test_delete_database_backup_rejects_path_outside_storage(tmp_path):
db_path = tmp_path / "t.db"
db = Database(str(db_path))
db.initialize()
storage = str(tmp_path)
os.makedirs(os.path.join(storage, "database-backups"), exist_ok=True)
with pytest.raises(ValueError, match="Invalid path"):
db.delete_snapshot_or_backup(
storage,
"../../../etc/passwd",
is_backup=True,
)
db.close_all()