diff --git a/tests/backend/conftest.py b/tests/backend/conftest.py index fa28b61..9eebfcd 100644 --- a/tests/backend/conftest.py +++ b/tests/backend/conftest.py @@ -1,9 +1,17 @@ import asyncio import os import tempfile -from unittest.mock import patch +from contextlib import ExitStack +from unittest.mock import MagicMock, patch import pytest +import RNS + +from meshchatx.meshchat import ReticulumMeshChat +from meshchatx.src.backend.config_manager import ConfigManager +from meshchatx.src.backend.database import Database +from meshchatx.src.backend.database.provider import DatabaseProvider +from meshchatx.src.backend.database.schema import DatabaseSchema # Set log dir to a temporary directory for tests to avoid permission issues # in restricted environments like sandboxes. @@ -46,3 +54,134 @@ def cleanup_sqlite_connections(): import gc gc.collect() + + +@pytest.fixture +def temp_db(tmp_path): + db_path = os.path.join(tmp_path, "test_meshchat.db") + yield db_path + if os.path.exists(db_path): + os.remove(db_path) + + +@pytest.fixture +def db(temp_db): + provider = DatabaseProvider(temp_db) + schema = DatabaseSchema(provider) + schema.initialize() + database = Database(temp_db) + yield database + database.close_all() + provider.close_all() + + +@pytest.fixture +def mock_app(db, tmp_path, temp_db): + real_identity_class = RNS.Identity + + class MockIdentityClass(real_identity_class): + def __init__(self, *args, **kwargs): + self.hash = b"test_hash_32_bytes_long_01234567" + self.hexhash = self.hash.hex() + + with ExitStack() as stack: + stack.enter_context(patch("RNS.Identity", MockIdentityClass)) + stack.enter_context(patch("RNS.Reticulum")) + stack.enter_context(patch("RNS.Transport")) + stack.enter_context(patch("LXMF.LXMRouter")) + stack.enter_context( + patch("meshchatx.src.backend.identity_context.TelephoneManager"), + ) + stack.enter_context( + patch("meshchatx.src.backend.identity_context.VoicemailManager"), + ) + stack.enter_context( + patch("meshchatx.src.backend.identity_context.RingtoneManager"), + ) + stack.enter_context(patch("meshchatx.src.backend.identity_context.RNCPHandler")) + stack.enter_context( + patch("meshchatx.src.backend.identity_context.RNStatusHandler"), + ) + stack.enter_context( + patch("meshchatx.src.backend.identity_context.RNProbeHandler"), + ) + stack.enter_context( + patch("meshchatx.src.backend.identity_context.TranslatorHandler"), + ) + stack.enter_context( + patch("meshchatx.src.backend.identity_context.ArchiverManager"), + ) + stack.enter_context(patch("meshchatx.src.backend.identity_context.MapManager")) + stack.enter_context( + patch("meshchatx.src.backend.identity_context.MessageHandler"), + ) + stack.enter_context( + patch("meshchatx.src.backend.identity_context.AnnounceManager"), + ) + stack.enter_context(patch("threading.Thread")) + + mock_id = MockIdentityClass() + mock_id.get_private_key = MagicMock(return_value=b"test_private_key") + + stack.enter_context( + patch.object(MockIdentityClass, "from_file", return_value=mock_id), + ) + stack.enter_context( + patch.object(MockIdentityClass, "recall", return_value=mock_id), + ) + stack.enter_context( + patch.object(MockIdentityClass, "from_bytes", return_value=mock_id), + ) + + stack.enter_context( + patch.object( + ReticulumMeshChat, + "announce_loop", + new=MagicMock(return_value=None), + ), + ) + stack.enter_context( + patch.object( + ReticulumMeshChat, + "announce_sync_propagation_nodes", + new=MagicMock(return_value=None), + ), + ) + stack.enter_context( + patch.object( + ReticulumMeshChat, + "crawler_loop", + new=MagicMock(return_value=None), + ), + ) + + stack.enter_context( + patch.object( + ReticulumMeshChat, + "auto_backup_loop", + new=MagicMock(return_value=None), + ), + ) + stack.enter_context( + patch.object( + ReticulumMeshChat, + "send_config_to_websocket_clients", + return_value=None, + ), + ) + + app = ReticulumMeshChat( + identity=mock_id, + storage_dir=str(tmp_path), + reticulum_config_dir=str(tmp_path), + ) + + # DatabaseProvider is a singleton; IdentityContext.setup() opens the identity DB + # and replaces the singleton. Recreate the test DB handle so config and DAOs use + # a live provider for the same path as the db fixture. + app.database = Database(temp_db) + app.current_context.config = ConfigManager(app.database) + app.websocket_broadcast = MagicMock(side_effect=lambda data: None) + + yield app + app.teardown_identity() diff --git a/tests/backend/test_access_attempts_dao.py b/tests/backend/test_access_attempts_dao.py new file mode 100644 index 0000000..d320200 --- /dev/null +++ b/tests/backend/test_access_attempts_dao.py @@ -0,0 +1,162 @@ +"""Tests and property-based checks for AccessAttemptsDAO.""" + +from __future__ import annotations + +import time + +import pytest +from hypothesis import HealthCheck, assume, given, settings +from hypothesis import strategies as st + +from meshchatx.src.backend.database.access_attempts import ( + LOGIN_PATH, + SETUP_PATH, + AccessAttemptsDAO, + MAX_TRUSTED_FINGERPRINTS_PER_IDENTITY, + user_agent_hash, +) + + +def _id_hash() -> str: + return "746573745f686173685f33325f62797465735f6c6f6e675f3031323334353637" + + +@pytest.fixture +def dao(db) -> AccessAttemptsDAO: + return db.access_attempts + + +def test_user_agent_hash_deterministic(): + assert user_agent_hash("Mozilla/5.0") == user_agent_hash("Mozilla/5.0") + assert user_agent_hash("") == user_agent_hash("") + + +@settings(max_examples=100, suppress_health_check=[HealthCheck.function_scoped_fixture]) +@given(s=st.text(min_size=0, max_size=600)) +def test_user_agent_hash_hypothesis_stable(dao, s): + h1 = user_agent_hash(s) + h2 = user_agent_hash(s) + assert h1 == h2 + assert len(h1) == 64 + + +def test_insert_list_roundtrip(dao): + ih = _id_hash() + dao.insert(ih, "10.0.0.1", "TestAgent/1", LOGIN_PATH, "POST", "success", None) + rows = dao.list_attempts(limit=10, offset=0) + assert len(rows) >= 1 + top = rows[0] + assert top["identity_hash"] == ih + assert top["client_ip"] == "10.0.0.1" + assert top["user_agent"] == "TestAgent/1" + assert top["path"] == LOGIN_PATH + assert top["method"] == "POST" + assert top["outcome"] == "success" + + +def test_count_matches_list_for_no_filters(dao): + ih = _id_hash() + before = dao.count_attempts() + dao.insert(ih, "10.0.0.2", "a", LOGIN_PATH, "POST", "failed_password", None) + after = dao.count_attempts() + assert after == before + 1 + + +def test_search_filters(dao): + ih = _id_hash() + dao.insert( + ih, "192.168.99.1", "UniqueSearchUA", LOGIN_PATH, "POST", "success", "detail-x" + ) + found = dao.list_attempts(search="UniqueSearchUA", limit=50) + assert any(r["user_agent"] == "UniqueSearchUA" for r in found) + n = dao.count_attempts(search="UniqueSearchUA") + assert n >= 1 + + +def test_outcome_filter(dao): + ih = _id_hash() + dao.insert(ih, "10.0.0.3", "b", SETUP_PATH, "POST", "invalid_json", None) + only = dao.list_attempts(outcome="invalid_json", limit=100) + assert all(r["outcome"] == "invalid_json" for r in only) + + +def test_is_trusted_and_upsert(dao): + ih = _id_hash() + ua = "TrustedClient/2" + h = user_agent_hash(ua) + assert dao.is_trusted(ih, "10.0.0.4", h) is False + dao.upsert_trusted(ih, "10.0.0.4", h) + assert dao.is_trusted(ih, "10.0.0.4", h) is True + + +def test_count_lockout_excludes_trusted_fingerprint(dao): + ih = _id_hash() + ip = "10.0.0.5" + ua_ok = "OkBrowser" + ua_bad = "BadBrowser" + h_ok = user_agent_hash(ua_ok) + dao.upsert_trusted(ih, ip, h_ok) + since = time.time() - 100 + for _ in range(6): + dao.insert(ih, ip, ua_ok, LOGIN_PATH, "POST", "failed_password", None) + assert dao.count_lockout_failures(ih, ip, since) == 0 + for _ in range(5): + dao.insert(ih, ip, ua_bad, LOGIN_PATH, "POST", "failed_password", None) + assert dao.count_lockout_failures(ih, ip, since) == 5 + + +def test_prune_trusted_keeps_under_cap(dao): + ih = _id_hash() + ip = "10.0.0.6" + for i in range(MAX_TRUSTED_FINGERPRINTS_PER_IDENTITY + 5): + ua = f"Browser-{i}" + dao.upsert_trusted(ih, ip, user_agent_hash(ua)) + row = dao.provider.fetchone( + "SELECT COUNT(*) AS c FROM trusted_login_clients WHERE identity_hash = ?", + (ih,), + ) + assert int(row["c"]) == MAX_TRUSTED_FINGERPRINTS_PER_IDENTITY + + +def test_cleanup_old_removes_oldest_when_over_max(dao): + ih = _id_hash() + dao.cleanup_old(max_rows=3) + for i in range(5): + dao.insert(ih, f"10.0.1.{i}", "x", LOGIN_PATH, "POST", "success", None) + dao.cleanup_old(max_rows=3) + n = dao.count_attempts() + assert n == 3 + + +@settings(max_examples=30, suppress_health_check=[HealthCheck.function_scoped_fixture]) +@given( + ip=st.ip_addresses().map(str), + ua=st.text( + alphabet=st.characters(min_codepoint=32, max_codepoint=126), + min_size=0, + max_size=200, + ), + outcome=st.sampled_from( + ["success", "failed_password", "rate_limited", "invalid_json"], + ), +) +def test_insert_list_hypothesis_invariants(dao, ip, ua, outcome): + assume(ip) + ih = _id_hash() + before_total = dao.count_attempts() + dao.insert(ih, ip, ua, LOGIN_PATH, "POST", outcome, "h") + after_total = dao.count_attempts() + assert after_total == before_total + 1 + rows = dao.list_attempts(limit=500, offset=0, outcome=outcome) + assert any(r["client_ip"] == ip and r["outcome"] == outcome for r in rows) + + +def test_access_attempts_table_exists(db): + row = db.provider.fetchone( + "SELECT name FROM sqlite_master WHERE type='table' AND name='access_attempts'", + ) + assert row is not None + row2 = db.provider.fetchone( + "SELECT name FROM sqlite_master WHERE type='table' AND name='trusted_login_clients'", + ) + assert row2 is not None diff --git a/tests/backend/test_access_attempts_enforcement.py b/tests/backend/test_access_attempts_enforcement.py new file mode 100644 index 0000000..f8fed41 --- /dev/null +++ b/tests/backend/test_access_attempts_enforcement.py @@ -0,0 +1,356 @@ +"""Tests for login/setup rate limiting, lockout, and HTTP smoke paths.""" + +from __future__ import annotations + +import json +import secrets +import time +from types import SimpleNamespace +from unittest.mock import patch + +import bcrypt +import pytest +from aiohttp import web +from aiohttp.test_utils import TestClient, TestServer +from aiohttp_session import setup as setup_session +from hypothesis import HealthCheck, given, settings +from hypothesis import strategies as st + +from meshchatx.meshchat import _request_client_ip +from meshchatx.src.backend.database.access_attempts import ( + LOGIN_PATH, + MAX_FAILED_BEFORE_LOCKOUT, + MAX_TRUSTED_LOGIN_PER_WINDOW, + MAX_UNTRUSTED_LOGIN_PER_WINDOW, + WINDOW_LOCKOUT_S, + WINDOW_RATE_TRUSTED_S, + WINDOW_RATE_UNTRUSTED_S, + user_agent_hash, +) + + +def _make_req( + ip: str, ua: str, method: str = "POST", xff: str | None = None +) -> SimpleNamespace: + h = {"User-Agent": ua} + if xff is not None: + h["X-Forwarded-For"] = xff + return SimpleNamespace(headers=h, remote=ip, method=method) + + +def _id_hex(mock_app) -> str: + return mock_app.identity.hash.hex() + + +def test_request_client_ip_prefers_x_forwarded_for(): + r = _make_req("10.0.0.1", "ua", xff="203.0.113.5, 10.0.0.2") + assert _request_client_ip(r) == "203.0.113.5" + + +def test_request_client_ip_falls_back_to_remote(): + r = _make_req("198.51.100.2", "ua") + assert _request_client_ip(r) == "198.51.100.2" + + +def test_enforce_returns_none_when_no_database(mock_app): + db = mock_app.database + try: + mock_app.database = None + r = _make_req("127.0.0.1", "Mozilla/5.0") + assert mock_app._enforce_login_access(r, LOGIN_PATH) is None + finally: + mock_app.database = db + + +def test_untrusted_rate_limit_after_max_per_window(mock_app): + dao = mock_app.database.access_attempts + ih = _id_hex(mock_app) + ip = "192.0.2.10" + ua = "RateTest/1" + now = time.time() + ts = now - (WINDOW_RATE_UNTRUSTED_S / 2) + for _ in range(MAX_UNTRUSTED_LOGIN_PER_WINDOW): + dao.provider.execute( + """ + INSERT INTO access_attempts ( + created_at, identity_hash, client_ip, user_agent, user_agent_hash, + path, method, outcome, detail + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + ts, + ih, + ip, + ua, + user_agent_hash(ua), + LOGIN_PATH, + "POST", + "success", + None, + ), + ) + r = _make_req(ip, ua) + resp = mock_app._enforce_login_access(r, LOGIN_PATH) + assert resp is not None + assert resp.status == 429 + + +def test_lockout_when_enough_untrusted_failures(mock_app): + dao = mock_app.database.access_attempts + ih = _id_hex(mock_app) + ip = "192.0.2.20" + ua = "Attacker/1" + now = time.time() + ts = now - (WINDOW_LOCKOUT_S / 2) + for _ in range(MAX_FAILED_BEFORE_LOCKOUT): + dao.provider.execute( + """ + INSERT INTO access_attempts ( + created_at, identity_hash, client_ip, user_agent, user_agent_hash, + path, method, outcome, detail + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + ts, + ih, + ip, + ua, + user_agent_hash(ua), + LOGIN_PATH, + "POST", + "failed_password", + None, + ), + ) + r = _make_req(ip, ua) + resp = mock_app._enforce_login_access(r, LOGIN_PATH) + assert resp is not None + assert resp.status == 429 + body = json.loads(resp.body.decode()) + err = body.get("error", "") + assert "failed" in err.lower() or "later" in err.lower() + + +def test_trusted_client_not_locked_out_by_own_failed_rows(mock_app): + dao = mock_app.database.access_attempts + ih = _id_hex(mock_app) + ip = "192.0.2.30" + ua = "GoodUser/1" + h = user_agent_hash(ua) + dao.upsert_trusted(ih, ip, h) + ts = time.time() - (WINDOW_LOCKOUT_S / 2) + for _ in range(10): + dao.provider.execute( + """ + INSERT INTO access_attempts ( + created_at, identity_hash, client_ip, user_agent, user_agent_hash, + path, method, outcome, detail + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + ts, + ih, + ip, + ua, + h, + LOGIN_PATH, + "POST", + "failed_password", + None, + ), + ) + r = _make_req(ip, ua) + assert mock_app._enforce_login_access(r, LOGIN_PATH) is None + + +def test_trusted_rate_limit_after_max_trusted_window(mock_app): + dao = mock_app.database.access_attempts + ih = _id_hex(mock_app) + ip = "192.0.2.40" + ua = "HeavyTrusted/1" + h = user_agent_hash(ua) + dao.upsert_trusted(ih, ip, h) + now = time.time() + ts = now - (WINDOW_RATE_TRUSTED_S / 2) + for _ in range(MAX_TRUSTED_LOGIN_PER_WINDOW): + dao.provider.execute( + """ + INSERT INTO access_attempts ( + created_at, identity_hash, client_ip, user_agent, user_agent_hash, + path, method, outcome, detail + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + ts, + ih, + ip, + ua, + h, + LOGIN_PATH, + "POST", + "success", + None, + ), + ) + r = _make_req(ip, ua) + resp = mock_app._enforce_login_access(r, LOGIN_PATH) + assert resp is not None + assert resp.status == 429 + + +@settings(max_examples=25, suppress_health_check=[HealthCheck.function_scoped_fixture]) +@given( + n=st.integers(min_value=0, max_value=MAX_UNTRUSTED_LOGIN_PER_WINDOW + 3), +) +def test_enforce_untrusted_monotone_hypothesis(mock_app, n): + dao = mock_app.database.access_attempts + ih = _id_hex(mock_app) + ip = "198.18.0.1" + ua = "Hyp/1" + dao.provider.execute("DELETE FROM access_attempts", ()) + for _ in range(n): + dao.insert(ih, ip, ua, LOGIN_PATH, "POST", "success", None) + r = _make_req(ip, ua) + out = mock_app._enforce_login_access(r, LOGIN_PATH) + if n >= MAX_UNTRUSTED_LOGIN_PER_WINDOW: + assert out is not None + assert out.status == 429 + else: + assert out is None + + +def _make_aio_app(mock_app, use_https: bool): + mock_app.session_secret_key = secrets.token_urlsafe(32) + routes = web.RouteTableDef() + auth_mw, mime_mw, sec_mw = mock_app._define_routes(routes) + aio_app = web.Application() + setup_session(aio_app, mock_app._encrypted_cookie_storage(use_https)) + aio_app.middlewares.extend([auth_mw, mime_mw, sec_mw]) + aio_app.add_routes(routes) + return aio_app + + +@pytest.mark.asyncio +async def test_login_records_access_attempt_and_debug_list(mock_app): + mock_app.config.auth_enabled.set(True) + pw = b"smoke-access-attempt-pw-ok" + mock_app.config.auth_password_hash.set( + bcrypt.hashpw(pw, bcrypt.gensalt()).decode("utf-8"), + ) + aio_app = _make_aio_app(mock_app, use_https=False) + + async with TestClient(TestServer(aio_app)) as client: + bad = await client.post("/api/v1/auth/login", json={"password": "wrong"}) + assert bad.status == 401 + good = await client.post( + "/api/v1/auth/login", + json={"password": pw.decode("utf-8")}, + ) + assert good.status == 200 + + dao = mock_app.database.access_attempts + rows = dao.list_attempts(limit=20, outcome="failed_password") + assert any(r["outcome"] == "failed_password" for r in rows) + rows_ok = dao.list_attempts(limit=20, outcome="success") + assert any(r["outcome"] == "success" for r in rows_ok) + total = dao.count_attempts() + assert total >= 2 + + +@pytest.mark.asyncio +async def test_lockout_login_returns_429_smoke(mock_app): + mock_app.config.auth_enabled.set(True) + pw = b"lockout-smoke-pw" + mock_app.config.auth_password_hash.set( + bcrypt.hashpw(pw, bcrypt.gensalt()).decode("utf-8"), + ) + aio_app = _make_aio_app(mock_app, use_https=False) + + async with TestClient(TestServer(aio_app)) as client: + ip = "192.0.2.88" + with patch( + "meshchatx.meshchat._request_client_ip", + return_value=ip, + ): + for _ in range(MAX_FAILED_BEFORE_LOCKOUT): + r = await client.post( + "/api/v1/auth/login", + json={"password": "nope"}, + headers={"User-Agent": "SmokeLock/1"}, + ) + assert r.status == 401 + r429 = await client.post( + "/api/v1/auth/login", + json={"password": "nope"}, + headers={"User-Agent": "SmokeLock/1"}, + ) + assert r429.status == 429 + + +@pytest.mark.asyncio +async def test_rate_limited_login_returns_429_smoke(mock_app): + mock_app.config.auth_enabled.set(True) + pw = b"rl-smoke-pw" + mock_app.config.auth_password_hash.set( + bcrypt.hashpw(pw, bcrypt.gensalt()).decode("utf-8"), + ) + dao = mock_app.database.access_attempts + ih = _id_hex(mock_app) + ip = "192.0.2.99" + ua = "SmokeRL/1" + now = time.time() + ts = now - (WINDOW_RATE_UNTRUSTED_S / 2) + for _ in range(MAX_UNTRUSTED_LOGIN_PER_WINDOW): + dao.provider.execute( + """ + INSERT INTO access_attempts ( + created_at, identity_hash, client_ip, user_agent, user_agent_hash, + path, method, outcome, detail + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + ts, + ih, + ip, + ua, + user_agent_hash(ua), + LOGIN_PATH, + "POST", + "success", + None, + ), + ) + aio_app = _make_aio_app(mock_app, use_https=False) + + async with TestClient(TestServer(aio_app)) as client: + with patch( + "meshchatx.meshchat._request_client_ip", + return_value=ip, + ): + r429 = await client.post( + "/api/v1/auth/login", + json={"password": "nope"}, + headers={"User-Agent": ua}, + ) + assert r429.status == 429 + + +@pytest.mark.asyncio +async def test_debug_access_attempts_endpoint_returns_shape(mock_app): + mock_app.config.auth_enabled.set(False) + aio_app = _make_aio_app(mock_app, use_https=False) + + async with TestClient(TestServer(aio_app)) as client: + r = await client.get("/api/v1/debug/access-attempts?limit=5&offset=0") + assert r.status == 200 + data = await r.json() + assert "attempts" in data + assert "total" in data + assert "limit" in data + assert "offset" in data + assert isinstance(data["attempts"], list) diff --git a/tests/backend/test_http_auth_security.py b/tests/backend/test_http_auth_security.py new file mode 100644 index 0000000..cd98ef5 --- /dev/null +++ b/tests/backend/test_http_auth_security.py @@ -0,0 +1,147 @@ +import asyncio +import secrets +import bcrypt +import pytest +from aiohttp import web +from aiohttp.test_utils import TestClient, TestServer +from aiohttp_session import setup as setup_session +from hypothesis import HealthCheck, given, settings +from hypothesis import strategies as st + +from meshchatx.src.backend.config_manager import ConfigManager + + +def _make_aio_app(mock_app, use_https: bool): + mock_app.session_secret_key = secrets.token_urlsafe(32) + routes = web.RouteTableDef() + auth_mw, mime_mw, sec_mw = mock_app._define_routes(routes) + aio_app = web.Application() + setup_session(aio_app, mock_app._encrypted_cookie_storage(use_https)) + aio_app.middlewares.extend([auth_mw, mime_mw, sec_mw]) + aio_app.add_routes(routes) + return aio_app + + +def test_config_password_hash_roundtrip(mock_app): + mock_app.config.auth_password_hash.set("roundtrip-test-hash") + assert mock_app.config.auth_password_hash.get() == "roundtrip-test-hash" + + +def test_config_bcrypt_hash_roundtrip(mock_app): + mock_app.config.auth_enabled.set(True) + h = bcrypt.hashpw(b"pw", bcrypt.gensalt()).decode("utf-8") + mock_app.config.auth_password_hash.set(h) + assert mock_app.config.auth_password_hash.get() == h + + +def test_encrypted_cookie_storage_https_flags(mock_app): + mock_app.session_secret_key = secrets.token_urlsafe(32) + storage = mock_app._encrypted_cookie_storage(True) + assert storage.cookie_params["secure"] is True + assert storage.cookie_params["httponly"] is True + assert storage.cookie_params["samesite"] == "Lax" + + +def test_encrypted_cookie_storage_http_flags(mock_app): + mock_app.session_secret_key = secrets.token_urlsafe(32) + storage = mock_app._encrypted_cookie_storage(False) + assert storage.cookie_params["secure"] is False + + +@pytest.mark.asyncio +async def test_login_sets_cookie_and_allows_protected_api(mock_app): + mock_app.config.auth_enabled.set(True) + pw = b"integration-test-password-ok" + stored_hash = bcrypt.hashpw(pw, bcrypt.gensalt()).decode("utf-8") + mock_app.config.auth_password_hash.set(stored_hash) + assert isinstance(mock_app.current_context.config, ConfigManager) + assert mock_app.config.auth_password_hash.get() == stored_hash + aio_app = _make_aio_app(mock_app, use_https=False) + + async with TestClient(TestServer(aio_app)) as client: + login = await client.post( + "/api/v1/auth/login", + json={"password": pw.decode("utf-8")}, + ) + assert login.status == 200 + set_cookie = login.headers.get("Set-Cookie", "") + assert "HttpOnly" in set_cookie + assert "SameSite=Lax" in set_cookie + assert "Secure" not in set_cookie + + backups = await client.get("/api/v1/database/backups") + assert backups.status == 200 + body = await backups.json() + assert "backups" in body + + +@pytest.mark.asyncio +async def test_ws_returns_401_without_session_when_auth_enabled(mock_app): + mock_app.config.auth_enabled.set(True) + mock_app.config.auth_password_hash.set( + bcrypt.hashpw(b"x", bcrypt.gensalt()).decode("utf-8"), + ) + aio_app = _make_aio_app(mock_app, use_https=False) + + async with TestClient(TestServer(aio_app)) as client: + r = await client.get("/ws") + assert r.status == 401 + + +@pytest.mark.asyncio +async def test_logout_clears_session_for_protected_api(mock_app): + mock_app.config.auth_enabled.set(True) + pw = b"logout-test-password-ok" + mock_app.config.auth_password_hash.set( + bcrypt.hashpw(pw, bcrypt.gensalt()).decode("utf-8"), + ) + aio_app = _make_aio_app(mock_app, use_https=False) + + async with TestClient(TestServer(aio_app)) as client: + await client.post("/api/v1/auth/login", json={"password": pw.decode("utf-8")}) + assert (await client.get("/api/v1/database/backups")).status == 200 + + out = await client.post("/api/v1/auth/logout") + assert out.status == 200 + + assert (await client.get("/api/v1/database/backups")).status == 401 + + +@pytest.mark.asyncio +async def test_auth_login_invalid_json_returns_400(mock_app): + mock_app.config.auth_enabled.set(True) + mock_app.config.auth_password_hash.set( + bcrypt.hashpw(b"x", bcrypt.gensalt()).decode("utf-8"), + ) + aio_app = _make_aio_app(mock_app, use_https=False) + + async with TestClient(TestServer(aio_app)) as client: + r = await client.post( + "/api/v1/auth/login", + data="{not-json", + headers={"Content-Type": "application/json"}, + ) + assert r.status == 400 + body = await r.json() + assert "error" in body + + +@settings(max_examples=40, suppress_health_check=[HealthCheck.function_scoped_fixture]) +@given(body=st.binary(min_size=0, max_size=12000)) +def test_auth_login_fuzz_never_500(mock_app, body): + mock_app.config.auth_enabled.set(True) + mock_app.config.auth_password_hash.set( + bcrypt.hashpw(b"fixed", bcrypt.gensalt()).decode("utf-8"), + ) + aio_app = _make_aio_app(mock_app, use_https=False) + + async def run(): + async with TestClient(TestServer(aio_app)) as client: + r = await client.post( + "/api/v1/auth/login", + data=body, + headers={"Content-Type": "application/json"}, + ) + assert r.status != 500 + + asyncio.run(run()) diff --git a/tests/backend/test_notifications.py b/tests/backend/test_notifications.py index f85e778..97fff08 100644 --- a/tests/backend/test_notifications.py +++ b/tests/backend/test_notifications.py @@ -1,149 +1,12 @@ -import os import time -from contextlib import ExitStack -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest -import RNS +from aiohttp import web +from aiohttp.test_utils import TestClient, TestServer from hypothesis import HealthCheck, given, settings from hypothesis import strategies as st -from meshchatx.meshchat import ReticulumMeshChat -from meshchatx.src.backend.database import Database -from meshchatx.src.backend.database.provider import DatabaseProvider -from meshchatx.src.backend.database.schema import DatabaseSchema - - -@pytest.fixture -def temp_db(tmp_path): - db_path = os.path.join(tmp_path, "test_notifications.db") - yield db_path - if os.path.exists(db_path): - os.remove(db_path) - - -@pytest.fixture -def db(temp_db): - provider = DatabaseProvider(temp_db) - schema = DatabaseSchema(provider) - schema.initialize() - database = Database(temp_db) - yield database - database.close_all() - provider.close_all() - - -@pytest.fixture -def mock_app(db, tmp_path): - # Save real Identity class to use as base for our mock class - real_identity_class = RNS.Identity - - class MockIdentityClass(real_identity_class): - def __init__(self, *args, **kwargs): - self.hash = b"test_hash_32_bytes_long_01234567" - self.hexhash = self.hash.hex() - - with ExitStack() as stack: - stack.enter_context(patch("RNS.Identity", MockIdentityClass)) - stack.enter_context(patch("RNS.Reticulum")) - stack.enter_context(patch("RNS.Transport")) - stack.enter_context(patch("LXMF.LXMRouter")) - stack.enter_context( - patch("meshchatx.src.backend.identity_context.TelephoneManager"), - ) - stack.enter_context( - patch("meshchatx.src.backend.identity_context.VoicemailManager"), - ) - stack.enter_context( - patch("meshchatx.src.backend.identity_context.RingtoneManager"), - ) - stack.enter_context(patch("meshchatx.src.backend.identity_context.RNCPHandler")) - stack.enter_context( - patch("meshchatx.src.backend.identity_context.RNStatusHandler"), - ) - stack.enter_context( - patch("meshchatx.src.backend.identity_context.RNProbeHandler"), - ) - stack.enter_context( - patch("meshchatx.src.backend.identity_context.TranslatorHandler"), - ) - stack.enter_context( - patch("meshchatx.src.backend.identity_context.ArchiverManager"), - ) - stack.enter_context(patch("meshchatx.src.backend.identity_context.MapManager")) - stack.enter_context( - patch("meshchatx.src.backend.identity_context.MessageHandler"), - ) - stack.enter_context( - patch("meshchatx.src.backend.identity_context.AnnounceManager"), - ) - stack.enter_context(patch("threading.Thread")) - - mock_id = MockIdentityClass() - mock_id.get_private_key = MagicMock(return_value=b"test_private_key") - - stack.enter_context( - patch.object(MockIdentityClass, "from_file", return_value=mock_id), - ) - stack.enter_context( - patch.object(MockIdentityClass, "recall", return_value=mock_id), - ) - stack.enter_context( - patch.object(MockIdentityClass, "from_bytes", return_value=mock_id), - ) - - # Patch background threads and other heavy init - stack.enter_context( - patch.object( - ReticulumMeshChat, - "announce_loop", - new=MagicMock(return_value=None), - ), - ) - stack.enter_context( - patch.object( - ReticulumMeshChat, - "announce_sync_propagation_nodes", - new=MagicMock(return_value=None), - ), - ) - stack.enter_context( - patch.object( - ReticulumMeshChat, - "crawler_loop", - new=MagicMock(return_value=None), - ), - ) - - stack.enter_context( - patch.object( - ReticulumMeshChat, - "auto_backup_loop", - new=MagicMock(return_value=None), - ), - ) - # Prevent JSON serialization issues with MagicMocks - stack.enter_context( - patch.object( - ReticulumMeshChat, - "send_config_to_websocket_clients", - return_value=None, - ), - ) - - app = ReticulumMeshChat( - identity=mock_id, - storage_dir=str(tmp_path), - reticulum_config_dir=str(tmp_path), - ) - - # Use our real test database - app.database = db - app.websocket_broadcast = MagicMock(side_effect=lambda data: None) - - yield app - app.teardown_identity() - def test_add_get_notifications(db): """Test basic notification storage and retrieval.""" @@ -578,3 +441,23 @@ def test_random_notification_operations(db, operations): count = db.misc.get_unread_notification_count() assert count >= 0 + + +@pytest.mark.asyncio +async def test_auth_middleware_returns_401_for_api_without_session_when_auth_enabled( + mock_app, +): + app = mock_app + app.current_context = MagicMock(running=True) + app.config.auth_enabled.set(True) + + routes = web.RouteTableDef() + auth_mw, mime_mw, sec_mw = app._define_routes(routes) + aio_app = web.Application(middlewares=[auth_mw, mime_mw, sec_mw]) + aio_app.add_routes(routes) + + async with TestClient(TestServer(aio_app)) as client: + with patch("meshchatx.meshchat.get_session", new_callable=AsyncMock) as gs: + gs.return_value = {} + resp = await client.get("/api/v1/config") + assert resp.status == 401 diff --git a/tests/backend/test_schema_migration_upgrade.py b/tests/backend/test_schema_migration_upgrade.py new file mode 100644 index 0000000..75b1eb8 --- /dev/null +++ b/tests/backend/test_schema_migration_upgrade.py @@ -0,0 +1,62 @@ +"""Upgrade path: simulate database at version N-1 and assert migration to LATEST_VERSION.""" + +import pytest + +from meshchatx.src.backend.database.provider import DatabaseProvider +from meshchatx.src.backend.database.schema import DatabaseSchema + + +def _column_names(provider, table: str) -> set[str]: + cur = provider.connection.cursor() + try: + cur.execute(f"PRAGMA table_info({table})") # noqa: S608 + return {row[1] for row in cur.fetchall()} + finally: + cur.close() + + +@pytest.mark.skipif( + DatabaseSchema.LATEST_VERSION < 41, + reason="Test targets migration block for version 41", +) +def test_migrate_from_version_40_restores_attachments_stripped(tmp_path): + """Version 41 adds lxmf_messages.attachments_stripped; re-apply from 40.""" + db_path = tmp_path / "mig.db" + provider = DatabaseProvider(str(db_path)) + schema = DatabaseSchema(provider) + schema.initialize() + assert ( + int( + provider.fetchone( + "SELECT value FROM config WHERE key = ?", + ("database_version",), + )["value"], + ) + == DatabaseSchema.LATEST_VERSION + ) + + provider.execute( + "UPDATE config SET value = ? WHERE key = ?", + ("40", "database_version"), + ) + try: + provider.execute("ALTER TABLE lxmf_messages DROP COLUMN attachments_stripped") + except Exception as exc: + pytest.skip(f"SQLite DROP COLUMN not available or failed: {exc}") + + assert "attachments_stripped" not in _column_names(provider, "lxmf_messages") + + schema.migrate(schema._get_current_version()) + + assert "attachments_stripped" in _column_names(provider, "lxmf_messages") + assert ( + int( + provider.fetchone( + "SELECT value FROM config WHERE key = ?", + ("database_version",), + )["value"], + ) + == DatabaseSchema.LATEST_VERSION + ) + + provider.close_all() diff --git a/tests/backend/test_security_path_and_backup.py b/tests/backend/test_security_path_and_backup.py new file mode 100644 index 0000000..e6c5071 --- /dev/null +++ b/tests/backend/test_security_path_and_backup.py @@ -0,0 +1,22 @@ +"""Path safety: backup/snapshot delete must not escape storage directories.""" + +import os + +import pytest + +from meshchatx.src.backend.database import Database + + +def test_delete_database_backup_rejects_path_outside_storage(tmp_path): + db_path = tmp_path / "t.db" + db = Database(str(db_path)) + db.initialize() + storage = str(tmp_path) + os.makedirs(os.path.join(storage, "database-backups"), exist_ok=True) + with pytest.raises(ValueError, match="Invalid path"): + db.delete_snapshot_or_backup( + storage, + "../../../etc/passwd", + is_backup=True, + ) + db.close_all()