mirror of
https://git.quad4.io/RNS-Things/MeshChatX.git
synced 2026-03-31 19:05:47 +00:00
- Introduced SQLite pragma tuning in the Database initialization for improved performance. - Wrapped multiple database operations in transactions to optimize batch processing in MessageDAO. - Updated DatabaseSchema to version 39, adding new indexes for better query performance. - Improved test coverage for batch operations and SQL injection scenarios in the DAO layer.
388 lines
13 KiB
Python
388 lines
13 KiB
Python
"""Tests confirming SQL-injection fixes in the raw-SQL database layer.
|
|
|
|
Covers:
|
|
- ATTACH DATABASE path escaping (single-quote doubling) in LegacyMigrator
|
|
- Column-name identifier filtering during legacy migration
|
|
- _validate_identifier / _ensure_column rejection in DatabaseSchema
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
|
|
import pytest
|
|
from hypothesis import HealthCheck, given, settings
|
|
from hypothesis import strategies as st
|
|
|
|
from meshchatx.src.backend.database.legacy_migrator import LegacyMigrator
|
|
from meshchatx.src.backend.database.provider import DatabaseProvider
|
|
from meshchatx.src.backend.database.schema import DatabaseSchema, _validate_identifier
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
|
|
def db_env(tmp_path):
|
|
"""Provide an initialized DatabaseProvider + Schema in a temp directory."""
|
|
db_path = str(tmp_path / "current.db")
|
|
provider = DatabaseProvider(db_path)
|
|
schema = DatabaseSchema(provider)
|
|
schema.initialize()
|
|
yield provider, schema, tmp_path
|
|
provider.close()
|
|
|
|
|
|
def _make_legacy_db(legacy_dir, identity_hash, tables_sql):
|
|
"""Create a legacy database with the given CREATE TABLE + INSERT statements."""
|
|
identity_dir = os.path.join(legacy_dir, "identities", identity_hash)
|
|
os.makedirs(identity_dir, exist_ok=True)
|
|
db_path = os.path.join(identity_dir, "database.db")
|
|
conn = sqlite3.connect(db_path)
|
|
for sql in tables_sql:
|
|
conn.execute(sql)
|
|
conn.commit()
|
|
conn.close()
|
|
return db_path
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. ATTACH DATABASE — single-quote escaping
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAttachDatabasePathEscaping:
|
|
def test_path_without_quotes_migrates_normally(self, db_env):
|
|
provider, _schema, tmp_path = db_env
|
|
legacy_dir = str(tmp_path / "legacy_normal")
|
|
identity_hash = "aabbccdd"
|
|
_make_legacy_db(
|
|
legacy_dir,
|
|
identity_hash,
|
|
[
|
|
"CREATE TABLE config (key TEXT UNIQUE, value TEXT)",
|
|
"INSERT INTO config (key, value) VALUES ('k1', 'v1')",
|
|
],
|
|
)
|
|
|
|
migrator = LegacyMigrator(provider, legacy_dir, identity_hash)
|
|
assert migrator.migrate() is True
|
|
|
|
row = provider.fetchone("SELECT value FROM config WHERE key = 'k1'")
|
|
assert row is not None
|
|
assert row["value"] == "v1"
|
|
|
|
def test_path_with_single_quote_does_not_crash(self, db_env):
|
|
"""A path containing a single quote must not cause SQL injection or crash."""
|
|
provider, _schema, tmp_path = db_env
|
|
|
|
quoted_dir = tmp_path / "it's_a_test"
|
|
quoted_dir.mkdir(parents=True, exist_ok=True)
|
|
legacy_dir = str(quoted_dir)
|
|
identity_hash = "aabbccdd"
|
|
_make_legacy_db(
|
|
legacy_dir,
|
|
identity_hash,
|
|
[
|
|
"CREATE TABLE config (key TEXT UNIQUE, value TEXT)",
|
|
"INSERT INTO config (key, value) VALUES ('q1', 'quoted_val')",
|
|
],
|
|
)
|
|
|
|
migrator = LegacyMigrator(provider, legacy_dir, identity_hash)
|
|
result = migrator.migrate()
|
|
assert result is True
|
|
|
|
row = provider.fetchone("SELECT value FROM config WHERE key = 'q1'")
|
|
assert row is not None
|
|
assert row["value"] == "quoted_val"
|
|
|
|
def test_path_with_multiple_quotes(self, db_env):
|
|
"""Multiple single quotes in the path are all escaped."""
|
|
provider, _schema, tmp_path = db_env
|
|
|
|
weird_dir = tmp_path / "a'b'c"
|
|
weird_dir.mkdir(parents=True, exist_ok=True)
|
|
legacy_dir = str(weird_dir)
|
|
identity_hash = "11223344"
|
|
_make_legacy_db(
|
|
legacy_dir,
|
|
identity_hash,
|
|
[
|
|
"CREATE TABLE config (key TEXT UNIQUE, value TEXT)",
|
|
"INSERT INTO config (key, value) VALUES ('mq', 'multi_quote')",
|
|
],
|
|
)
|
|
|
|
migrator = LegacyMigrator(provider, legacy_dir, identity_hash)
|
|
assert migrator.migrate() is True
|
|
|
|
row = provider.fetchone("SELECT value FROM config WHERE key = 'mq'")
|
|
assert row is not None
|
|
assert row["value"] == "multi_quote"
|
|
|
|
def test_path_with_sql_injection_attempt(self, db_env):
|
|
"""A path crafted to look like SQL injection is safely escaped."""
|
|
provider, _schema, tmp_path = db_env
|
|
|
|
evil_dir = tmp_path / "'; DROP TABLE config; --"
|
|
evil_dir.mkdir(parents=True, exist_ok=True)
|
|
legacy_dir = str(evil_dir)
|
|
identity_hash = "deadbeef"
|
|
_make_legacy_db(
|
|
legacy_dir,
|
|
identity_hash,
|
|
[
|
|
"CREATE TABLE config (key TEXT UNIQUE, value TEXT)",
|
|
"INSERT INTO config (key, value) VALUES ('evil', 'nope')",
|
|
],
|
|
)
|
|
|
|
migrator = LegacyMigrator(provider, legacy_dir, identity_hash)
|
|
migrator.migrate()
|
|
|
|
tables = provider.fetchall(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='config'"
|
|
)
|
|
assert len(tables) > 0, "config table must still exist after injection attempt"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. Legacy migrator — malicious column names filtered out
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestLegacyColumnFiltering:
|
|
def test_normal_columns_migrate(self, db_env):
|
|
"""Standard column names pass through the identifier filter."""
|
|
provider, _schema, tmp_path = db_env
|
|
legacy_dir = str(tmp_path / "legacy_cols")
|
|
identity_hash = "aabb0011"
|
|
_make_legacy_db(
|
|
legacy_dir,
|
|
identity_hash,
|
|
[
|
|
"CREATE TABLE config (key TEXT UNIQUE, value TEXT)",
|
|
"INSERT INTO config (key, value) VALUES ('c1', 'ok')",
|
|
],
|
|
)
|
|
|
|
migrator = LegacyMigrator(provider, legacy_dir, identity_hash)
|
|
assert migrator.migrate() is True
|
|
|
|
row = provider.fetchone("SELECT value FROM config WHERE key = 'c1'")
|
|
assert row is not None
|
|
|
|
def test_malicious_column_name_is_skipped(self, db_env):
|
|
"""A column with SQL metacharacters in its name must be silently skipped."""
|
|
provider, _schema, tmp_path = db_env
|
|
legacy_dir = str(tmp_path / "legacy_evil_col")
|
|
identity_hash = "cc00dd00"
|
|
|
|
identity_dir = os.path.join(legacy_dir, "identities", identity_hash)
|
|
os.makedirs(identity_dir, exist_ok=True)
|
|
db_path = os.path.join(identity_dir, "database.db")
|
|
conn = sqlite3.connect(db_path)
|
|
conn.execute(
|
|
'CREATE TABLE config (key TEXT UNIQUE, value TEXT, "key; DROP TABLE config" TEXT)'
|
|
)
|
|
conn.execute("INSERT INTO config (key, value) VALUES ('safe', 'data')")
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
migrator = LegacyMigrator(provider, legacy_dir, identity_hash)
|
|
assert migrator.migrate() is True
|
|
|
|
row = provider.fetchone("SELECT value FROM config WHERE key = 'safe'")
|
|
assert row is not None
|
|
assert row["value"] == "data"
|
|
|
|
def test_column_with_parentheses_is_skipped(self, db_env):
|
|
"""Columns with () in the name are rejected by the identifier regex."""
|
|
provider, _schema, tmp_path = db_env
|
|
legacy_dir = str(tmp_path / "legacy_parens_col")
|
|
identity_hash = "ee00ff00"
|
|
|
|
identity_dir = os.path.join(legacy_dir, "identities", identity_hash)
|
|
os.makedirs(identity_dir, exist_ok=True)
|
|
db_path = os.path.join(identity_dir, "database.db")
|
|
conn = sqlite3.connect(db_path)
|
|
conn.execute('CREATE TABLE config (key TEXT UNIQUE, value TEXT, "evil()" TEXT)')
|
|
conn.execute("INSERT INTO config (key, value) VALUES ('p1', 'parens')")
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
migrator = LegacyMigrator(provider, legacy_dir, identity_hash)
|
|
assert migrator.migrate() is True
|
|
|
|
row = provider.fetchone("SELECT value FROM config WHERE key = 'p1'")
|
|
assert row is not None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 3. _validate_identifier — unit tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestValidateIdentifier:
|
|
@pytest.mark.parametrize(
|
|
"name",
|
|
[
|
|
"config",
|
|
"lxmf_messages",
|
|
"A",
|
|
"_private",
|
|
"Column123",
|
|
"a_b_c_d",
|
|
],
|
|
)
|
|
def test_valid_identifiers_pass(self, name):
|
|
assert _validate_identifier(name) == name
|
|
|
|
@pytest.mark.parametrize(
|
|
"name",
|
|
[
|
|
"",
|
|
"123abc",
|
|
"table name",
|
|
"col;drop",
|
|
"a'b",
|
|
'a"b',
|
|
"col()",
|
|
"x--y",
|
|
"a,b",
|
|
"hello\nworld",
|
|
"tab\there",
|
|
"col/**/name",
|
|
],
|
|
)
|
|
def test_invalid_identifiers_raise(self, name):
|
|
with pytest.raises(ValueError, match="Invalid SQL"):
|
|
_validate_identifier(name)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4. _ensure_column — rejects injection via table/column names
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestEnsureColumnInjection:
|
|
def test_ensure_column_rejects_malicious_table_name(self, db_env):
|
|
_provider, schema, _tmp_path = db_env
|
|
with pytest.raises(ValueError, match="Invalid SQL table name"):
|
|
schema._ensure_column("config; DROP TABLE config", "new_col", "TEXT")
|
|
|
|
def test_ensure_column_rejects_malicious_column_name(self, db_env):
|
|
_provider, schema, _tmp_path = db_env
|
|
with pytest.raises(ValueError, match="Invalid SQL column name"):
|
|
schema._ensure_column("config", "col; DROP TABLE config", "TEXT")
|
|
|
|
def test_ensure_column_works_for_valid_names(self, db_env):
|
|
_provider, schema, _tmp_path = db_env
|
|
result = schema._ensure_column("config", "test_new_col", "TEXT")
|
|
assert result is True
|
|
|
|
def test_ensure_column_idempotent(self, db_env):
|
|
_provider, schema, _tmp_path = db_env
|
|
schema._ensure_column("config", "idempotent_col", "TEXT")
|
|
result = schema._ensure_column("config", "idempotent_col", "TEXT")
|
|
assert result is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 5. Property-based tests — identifier regex
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
|
|
|
|
|
|
@given(name=st.text(min_size=1, max_size=80))
|
|
@settings(deadline=None)
|
|
def test_validate_identifier_never_allows_sql_metacharacters(name):
|
|
"""No string accepted by _validate_identifier contains SQL metacharacters."""
|
|
try:
|
|
_validate_identifier(name)
|
|
except ValueError:
|
|
return
|
|
|
|
assert ";" not in name
|
|
assert "'" not in name
|
|
assert '"' not in name
|
|
assert "(" not in name
|
|
assert ")" not in name
|
|
assert " " not in name
|
|
assert "-" not in name
|
|
assert "/" not in name
|
|
assert "\\" not in name
|
|
assert "\n" not in name
|
|
assert "\r" not in name
|
|
assert "\t" not in name
|
|
assert "," not in name
|
|
assert _IDENTIFIER_RE.match(name)
|
|
|
|
|
|
@given(name=st.from_regex(r"[A-Za-z_][A-Za-z0-9_]{0,30}", fullmatch=True))
|
|
@settings(deadline=None)
|
|
def test_validate_identifier_accepts_all_valid_identifiers(name):
|
|
"""Every string matching the identifier pattern is accepted."""
|
|
assert _validate_identifier(name) == name
|
|
|
|
|
|
@given(
|
|
name=st.text(
|
|
alphabet=st.sampled_from(list(";'\"()- \t\n\r,/*")),
|
|
min_size=1,
|
|
max_size=30,
|
|
)
|
|
)
|
|
@settings(deadline=None)
|
|
def test_validate_identifier_rejects_pure_metacharacter_strings(name):
|
|
"""Strings composed entirely of SQL metacharacters are always rejected."""
|
|
with pytest.raises(ValueError):
|
|
_validate_identifier(name)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 6. ATTACH path escaping — property-based
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@given(
|
|
path_segment=st.text(
|
|
alphabet=st.characters(
|
|
whitelist_categories=("L", "N", "P", "S", "Z"),
|
|
),
|
|
min_size=1,
|
|
max_size=60,
|
|
)
|
|
)
|
|
@settings(deadline=None, suppress_health_check=[HealthCheck.too_slow])
|
|
def test_attach_path_escaping_never_breaks_sql(path_segment):
|
|
"""The quote-doubling escaping produces a string that SQLite can parse
|
|
without breaking out of the literal, regardless of the path content."""
|
|
safe = path_segment.replace("'", "''")
|
|
sql = f"ATTACH DATABASE '{safe}' AS test_alias"
|
|
|
|
assert sql.count("ATTACH DATABASE '") == 1
|
|
|
|
after_open = sql.split("ATTACH DATABASE '", 1)[1]
|
|
in_literal = True
|
|
i = 0
|
|
while i < len(after_open):
|
|
if after_open[i] == "'":
|
|
if i + 1 < len(after_open) and after_open[i + 1] == "'":
|
|
i += 2
|
|
continue
|
|
else:
|
|
in_literal = False
|
|
remainder = after_open[i + 1 :]
|
|
break
|
|
i += 1
|
|
|
|
if not in_literal:
|
|
assert remainder.strip() == "AS test_alias", (
|
|
f"Unexpected SQL after literal end: {remainder!r}"
|
|
)
|