mirror of
https://git.quad4.io/RNS-Things/MeshChatX.git
synced 2026-04-02 10:05:43 +00:00
148 lines
5.2 KiB
Python
148 lines
5.2 KiB
Python
import asyncio
|
|
import secrets
|
|
import bcrypt
|
|
import pytest
|
|
from aiohttp import web
|
|
from aiohttp.test_utils import TestClient, TestServer
|
|
from aiohttp_session import setup as setup_session
|
|
from hypothesis import HealthCheck, given, settings
|
|
from hypothesis import strategies as st
|
|
|
|
from meshchatx.src.backend.config_manager import ConfigManager
|
|
|
|
|
|
def _make_aio_app(mock_app, use_https: bool):
    """Assemble an aiohttp application around ``mock_app`` for HTTP tests.

    A fresh random session secret is stored on ``mock_app``; the routes and
    the three middlewares returned by ``mock_app._define_routes`` are then
    attached to a new ``web.Application`` that uses the cookie storage
    produced by ``mock_app._encrypted_cookie_storage(use_https)``.

    Args:
        mock_app: Application object under test (provided by a fixture).
        use_https: Forwarded to ``mock_app._encrypted_cookie_storage``.

    Returns:
        The configured ``web.Application``.
    """
    mock_app.session_secret_key = secrets.token_urlsafe(32)

    route_table = web.RouteTableDef()
    auth_middleware, mime_middleware, security_middleware = mock_app._define_routes(
        route_table,
    )

    application = web.Application()

    # Session support is installed first so that the middlewares appended
    # below come after whatever setup_session registers.
    cookie_storage = mock_app._encrypted_cookie_storage(use_https)
    setup_session(application, cookie_storage)

    for middleware in (auth_middleware, mime_middleware, security_middleware):
        application.middlewares.append(middleware)

    application.add_routes(route_table)
    return application
|
|
|
|
|
|
def test_config_password_hash_roundtrip(mock_app):
    """A value stored in ``auth_password_hash`` is read back unchanged."""
    expected_value = "roundtrip-test-hash"

    mock_app.config.auth_password_hash.set(expected_value)

    stored_value = mock_app.config.auth_password_hash.get()
    assert stored_value == expected_value
|
|
|
|
|
|
def test_config_bcrypt_hash_roundtrip(mock_app):
    """A real bcrypt hash string survives a set/get cycle with auth enabled."""
    mock_app.config.auth_enabled.set(True)

    salt = bcrypt.gensalt()
    hashed_password = bcrypt.hashpw(b"pw", salt).decode("utf-8")
    mock_app.config.auth_password_hash.set(hashed_password)

    read_back = mock_app.config.auth_password_hash.get()
    assert read_back == hashed_password
|
|
|
|
|
|
def test_encrypted_cookie_storage_https_flags(mock_app):
    """With HTTPS requested, the cookie is Secure, HttpOnly and SameSite=Lax."""
    mock_app.session_secret_key = secrets.token_urlsafe(32)

    cookie_params = mock_app._encrypted_cookie_storage(True).cookie_params

    # The two boolean flags must be the literal True, not merely truthy.
    for flag_name in ("secure", "httponly"):
        assert cookie_params[flag_name] is True
    assert cookie_params["samesite"] == "Lax"
|
|
|
|
|
|
def test_encrypted_cookie_storage_http_flags(mock_app):
    """With HTTPS not requested, the session cookie's ``secure`` flag is False."""
    mock_app.session_secret_key = secrets.token_urlsafe(32)

    cookie_params = mock_app._encrypted_cookie_storage(False).cookie_params

    assert cookie_params["secure"] is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_login_sets_cookie_and_allows_protected_api(mock_app):
    """A successful login sets a session cookie that unlocks a protected route.

    Over plain HTTP the cookie must carry ``HttpOnly`` and ``SameSite=Lax``
    but not ``Secure``; the same client can then fetch the backups listing.
    """
    mock_app.config.auth_enabled.set(True)

    password_bytes = b"integration-test-password-ok"
    expected_hash = bcrypt.hashpw(password_bytes, bcrypt.gensalt()).decode("utf-8")
    mock_app.config.auth_password_hash.set(expected_hash)

    # Sanity checks on the fixture before any HTTP traffic is generated.
    assert isinstance(mock_app.current_context.config, ConfigManager)
    assert mock_app.config.auth_password_hash.get() == expected_hash

    application = _make_aio_app(mock_app, use_https=False)
    login_payload = {"password": password_bytes.decode("utf-8")}

    async with TestClient(TestServer(application)) as client:
        login_response = await client.post("/api/v1/auth/login", json=login_payload)
        assert login_response.status == 200

        cookie_header = login_response.headers.get("Set-Cookie", "")
        for required_attribute in ("HttpOnly", "SameSite=Lax"):
            assert required_attribute in cookie_header
        assert "Secure" not in cookie_header

        backups_response = await client.get("/api/v1/database/backups")
        assert backups_response.status == 200

        payload = await backups_response.json()
        assert "backups" in payload
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ws_returns_401_without_session_when_auth_enabled(mock_app):
    """With auth enabled, ``GET /ws`` without a session is answered with 401."""
    mock_app.config.auth_enabled.set(True)

    password_hash = bcrypt.hashpw(b"x", bcrypt.gensalt()).decode("utf-8")
    mock_app.config.auth_password_hash.set(password_hash)

    application = _make_aio_app(mock_app, use_https=False)

    async with TestClient(TestServer(application)) as client:
        response = await client.get("/ws")
        assert response.status == 401
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_logout_clears_session_for_protected_api(mock_app):
    """Logging out revokes access to a protected route for the same client.

    The client logs in, confirms the backups listing is reachable, logs out,
    and then expects the same request to be rejected with 401.
    """
    mock_app.config.auth_enabled.set(True)
    pw = b"logout-test-password-ok"
    mock_app.config.auth_password_hash.set(
        bcrypt.hashpw(pw, bcrypt.gensalt()).decode("utf-8"),
    )
    aio_app = _make_aio_app(mock_app, use_https=False)

    async with TestClient(TestServer(aio_app)) as client:
        login = await client.post(
            "/api/v1/auth/login",
            json={"password": pw.decode("utf-8")},
        )
        # Check the login itself so that a failed login is reported here and
        # not as a misleading failure on the protected request below.
        assert login.status == 200

        assert (await client.get("/api/v1/database/backups")).status == 200

        out = await client.post("/api/v1/auth/logout")
        assert out.status == 200

        # The session is gone, so the protected route must refuse the request.
        assert (await client.get("/api/v1/database/backups")).status == 401
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_auth_login_invalid_json_returns_400(mock_app):
    """A malformed JSON login body gets a 400 whose JSON body has ``error``."""
    mock_app.config.auth_enabled.set(True)

    password_hash = bcrypt.hashpw(b"x", bcrypt.gensalt()).decode("utf-8")
    mock_app.config.auth_password_hash.set(password_hash)

    application = _make_aio_app(mock_app, use_https=False)
    json_headers = {"Content-Type": "application/json"}

    async with TestClient(TestServer(application)) as client:
        response = await client.post(
            "/api/v1/auth/login",
            data="{not-json",
            headers=json_headers,
        )
        assert response.status == 400

        payload = await response.json()
        assert "error" in payload
|
|
|
|
|
|
# deadline=None: every example computes a bcrypt hash and starts/stops a test
# server, which can exceed Hypothesis's default per-example deadline and
# produce spurious DeadlineExceeded/Flaky failures unrelated to the handler.
@settings(
    max_examples=40,
    deadline=None,
    suppress_health_check=[HealthCheck.function_scoped_fixture],
)
@given(body=st.binary(min_size=0, max_size=12000))
def test_auth_login_fuzz_never_500(mock_app, body):
    """Arbitrary request bodies sent to the login route never produce a 500.

    Hypothesis supplies ``body`` as raw bytes (0 to 12000 bytes), which is
    posted with a JSON content type. Any status other than 500 is accepted.
    """
    mock_app.config.auth_enabled.set(True)
    mock_app.config.auth_password_hash.set(
        bcrypt.hashpw(b"fixed", bcrypt.gensalt()).decode("utf-8"),
    )
    aio_app = _make_aio_app(mock_app, use_https=False)

    async def run():
        """Post the fuzzed body once and check the response status."""
        async with TestClient(TestServer(aio_app)) as client:
            r = await client.post(
                "/api/v1/auth/login",
                data=body,
                headers={"Content-Type": "application/json"},
            )
            assert r.status != 500

    # The test function is synchronous (driven by Hypothesis), so each example
    # runs its coroutine on a fresh event loop.
    asyncio.run(run())
|