Files
MeshChatX/tests/backend/test_sql_injection_fixes.py
Sudo-Ivan bc8969ab16 Improve database performance and SQL handling
- Introduced SQLite pragma tuning in the Database initialization for improved performance.
- Wrapped multiple database operations in transactions to optimize batch processing in MessageDAO.
- Updated DatabaseSchema to version 39, adding new indexes for better query performance.
- Improved test coverage for batch operations and SQL injection scenarios in the DAO layer.
2026-03-06 03:26:36 -06:00

388 lines
13 KiB
Python

"""Tests confirming SQL-injection fixes in the raw-SQL database layer.
Covers:
- ATTACH DATABASE path escaping (single-quote doubling) in LegacyMigrator
- Column-name identifier filtering during legacy migration
- _validate_identifier / _ensure_column rejection in DatabaseSchema
"""
import os
import re
import sqlite3
import pytest
from hypothesis import HealthCheck, given, settings
from hypothesis import strategies as st
from meshchatx.src.backend.database.legacy_migrator import LegacyMigrator
from meshchatx.src.backend.database.provider import DatabaseProvider
from meshchatx.src.backend.database.schema import DatabaseSchema, _validate_identifier
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def db_env(tmp_path):
    """Provide an initialized DatabaseProvider + Schema in a temp directory.

    Yields:
        A ``(provider, schema, tmp_path)`` tuple. The provider is constructed
        on ``tmp_path / "current.db"`` and ``schema.initialize()`` has been
        called before the test body runs.
    """
    db_path = str(tmp_path / "current.db")
    provider = DatabaseProvider(db_path)
    try:
        schema = DatabaseSchema(provider)
        schema.initialize()
        yield provider, schema, tmp_path
    finally:
        # Close even when schema setup raises before the yield, so a broken
        # schema does not leave the provider open on the temp database.
        provider.close()
def _make_legacy_db(legacy_dir, identity_hash, tables_sql):
    """Create a legacy database with the given CREATE TABLE + INSERT statements.

    The file is written to
    ``<legacy_dir>/identities/<identity_hash>/database.db``; missing parent
    directories are created.

    Args:
        legacy_dir: Root directory under which ``identities/`` is created.
        identity_hash: Name of the per-identity subdirectory.
        tables_sql: Iterable of SQL statements, executed in order.

    Returns:
        The path of the database file.

    Raises:
        sqlite3.Error: If a statement fails. The connection is closed first.
    """
    identity_dir = os.path.join(legacy_dir, "identities", identity_hash)
    os.makedirs(identity_dir, exist_ok=True)
    db_path = os.path.join(identity_dir, "database.db")
    conn = sqlite3.connect(db_path)
    try:
        for sql in tables_sql:
            conn.execute(sql)
        conn.commit()
    finally:
        # Always release the handle; a failing statement must not leak it.
        conn.close()
    return db_path
# ---------------------------------------------------------------------------
# 1. ATTACH DATABASE — single-quote escaping
# ---------------------------------------------------------------------------
class TestAttachDatabasePathEscaping:
    """Legacy migration driven through directory names that contain quotes."""

    @staticmethod
    def _seed_and_migrate(provider, legacy_dir, identity_hash, insert_sql):
        """Build a single-row legacy ``config`` database and migrate it.

        Returns whatever ``LegacyMigrator.migrate()`` returns.
        """
        _make_legacy_db(
            legacy_dir,
            identity_hash,
            ["CREATE TABLE config (key TEXT UNIQUE, value TEXT)", insert_sql],
        )
        return LegacyMigrator(provider, legacy_dir, identity_hash).migrate()

    def test_path_without_quotes_migrates_normally(self, db_env):
        """Baseline: a quote-free legacy path migrates its config row."""
        provider, _, base = db_env
        outcome = self._seed_and_migrate(
            provider,
            str(base / "legacy_normal"),
            "aabbccdd",
            "INSERT INTO config (key, value) VALUES ('k1', 'v1')",
        )
        assert outcome is True

        migrated = provider.fetchone("SELECT value FROM config WHERE key = 'k1'")
        assert migrated is not None
        assert migrated["value"] == "v1"

    def test_path_with_single_quote_does_not_crash(self, db_env):
        """A path containing a single quote must not cause SQL injection or crash."""
        provider, _, base = db_env
        legacy_root = base / "it's_a_test"
        legacy_root.mkdir(parents=True, exist_ok=True)

        outcome = self._seed_and_migrate(
            provider,
            str(legacy_root),
            "aabbccdd",
            "INSERT INTO config (key, value) VALUES ('q1', 'quoted_val')",
        )
        assert outcome is True

        migrated = provider.fetchone("SELECT value FROM config WHERE key = 'q1'")
        assert migrated is not None
        assert migrated["value"] == "quoted_val"

    def test_path_with_multiple_quotes(self, db_env):
        """Multiple single quotes in the path are all escaped."""
        provider, _, base = db_env
        legacy_root = base / "a'b'c"
        legacy_root.mkdir(parents=True, exist_ok=True)

        outcome = self._seed_and_migrate(
            provider,
            str(legacy_root),
            "11223344",
            "INSERT INTO config (key, value) VALUES ('mq', 'multi_quote')",
        )
        assert outcome is True

        migrated = provider.fetchone("SELECT value FROM config WHERE key = 'mq'")
        assert migrated is not None
        assert migrated["value"] == "multi_quote"

    def test_path_with_sql_injection_attempt(self, db_env):
        """A path crafted to look like SQL injection is safely escaped."""
        provider, _, base = db_env
        legacy_root = base / "'; DROP TABLE config; --"
        legacy_root.mkdir(parents=True, exist_ok=True)

        # The migrate() result is not checked here; only the survival of the
        # target table is asserted.
        self._seed_and_migrate(
            provider,
            str(legacy_root),
            "deadbeef",
            "INSERT INTO config (key, value) VALUES ('evil', 'nope')",
        )

        surviving = provider.fetchall(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='config'"
        )
        assert len(surviving) > 0, "config table must still exist after injection attempt"
# ---------------------------------------------------------------------------
# 2. Legacy migrator — malicious column names filtered out
# ---------------------------------------------------------------------------
class TestLegacyColumnFiltering:
    """Legacy tables with unsafe column names must still migrate their safe data."""

    def test_normal_columns_migrate(self, db_env):
        """Standard column names pass through the identifier filter."""
        provider, _schema, tmp_path = db_env
        legacy_dir = str(tmp_path / "legacy_cols")
        identity_hash = "aabb0011"
        _make_legacy_db(
            legacy_dir,
            identity_hash,
            [
                "CREATE TABLE config (key TEXT UNIQUE, value TEXT)",
                "INSERT INTO config (key, value) VALUES ('c1', 'ok')",
            ],
        )
        migrator = LegacyMigrator(provider, legacy_dir, identity_hash)
        assert migrator.migrate() is True
        row = provider.fetchone("SELECT value FROM config WHERE key = 'c1'")
        assert row is not None

    def test_malicious_column_name_is_skipped(self, db_env):
        """A column with SQL metacharacters in its name must be silently skipped."""
        provider, _schema, tmp_path = db_env
        legacy_dir = str(tmp_path / "legacy_evil_col")
        identity_hash = "cc00dd00"
        # Use the shared helper rather than a hand-rolled connection so the
        # legacy-DB setup (and its connection handling) lives in one place.
        _make_legacy_db(
            legacy_dir,
            identity_hash,
            [
                'CREATE TABLE config (key TEXT UNIQUE, value TEXT, "key; DROP TABLE config" TEXT)',
                "INSERT INTO config (key, value) VALUES ('safe', 'data')",
            ],
        )
        migrator = LegacyMigrator(provider, legacy_dir, identity_hash)
        assert migrator.migrate() is True
        row = provider.fetchone("SELECT value FROM config WHERE key = 'safe'")
        assert row is not None
        assert row["value"] == "data"

    def test_column_with_parentheses_is_skipped(self, db_env):
        """Columns with () in the name are rejected by the identifier regex."""
        provider, _schema, tmp_path = db_env
        legacy_dir = str(tmp_path / "legacy_parens_col")
        identity_hash = "ee00ff00"
        _make_legacy_db(
            legacy_dir,
            identity_hash,
            [
                'CREATE TABLE config (key TEXT UNIQUE, value TEXT, "evil()" TEXT)',
                "INSERT INTO config (key, value) VALUES ('p1', 'parens')",
            ],
        )
        migrator = LegacyMigrator(provider, legacy_dir, identity_hash)
        assert migrator.migrate() is True
        row = provider.fetchone("SELECT value FROM config WHERE key = 'p1'")
        assert row is not None
# ---------------------------------------------------------------------------
# 3. _validate_identifier — unit tests
# ---------------------------------------------------------------------------
class TestValidateIdentifier:
    """Table-driven checks of ``_validate_identifier`` on fixed sample names."""

    # Names expected to be returned unchanged.
    _ACCEPTED = (
        "config",
        "lxmf_messages",
        "A",
        "_private",
        "Column123",
        "a_b_c_d",
    )

    # Names expected to raise ValueError.
    _REJECTED = (
        "",
        "123abc",
        "table name",
        "col;drop",
        "a'b",
        'a"b',
        "col()",
        "x--y",
        "a,b",
        "hello\nworld",
        "tab\there",
        "col/**/name",
    )

    @pytest.mark.parametrize("name", _ACCEPTED)
    def test_valid_identifiers_pass(self, name):
        """A well-formed identifier is handed back as-is."""
        returned = _validate_identifier(name)
        assert returned == name

    @pytest.mark.parametrize("name", _REJECTED)
    def test_invalid_identifiers_raise(self, name):
        """A malformed identifier raises ValueError mentioning "Invalid SQL"."""
        with pytest.raises(ValueError, match="Invalid SQL"):
            _validate_identifier(name)
# ---------------------------------------------------------------------------
# 4. _ensure_column — rejects injection via table/column names
# ---------------------------------------------------------------------------
class TestEnsureColumnInjection:
    """``DatabaseSchema._ensure_column`` must validate both identifiers."""

    def test_ensure_column_rejects_malicious_table_name(self, db_env):
        """An injected table name raises before any SQL is built."""
        schema = db_env[1]
        with pytest.raises(ValueError, match="Invalid SQL table name"):
            schema._ensure_column("config; DROP TABLE config", "new_col", "TEXT")

    def test_ensure_column_rejects_malicious_column_name(self, db_env):
        """An injected column name raises as well."""
        schema = db_env[1]
        with pytest.raises(ValueError, match="Invalid SQL column name"):
            schema._ensure_column("config", "col; DROP TABLE config", "TEXT")

    def test_ensure_column_works_for_valid_names(self, db_env):
        """A clean table/column pair returns True."""
        schema = db_env[1]
        assert schema._ensure_column("config", "test_new_col", "TEXT") is True

    def test_ensure_column_idempotent(self, db_env):
        """Calling twice with the same column still returns True the second time."""
        schema = db_env[1]
        column_spec = ("config", "idempotent_col", "TEXT")
        schema._ensure_column(*column_spec)
        assert schema._ensure_column(*column_spec) is True
# ---------------------------------------------------------------------------
# 5. Property-based tests — identifier regex
# ---------------------------------------------------------------------------
_IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


@given(name=st.text(min_size=1, max_size=80))
@settings(deadline=None)
def test_validate_identifier_never_allows_sql_metacharacters(name):
    """No string accepted by _validate_identifier contains SQL metacharacters.

    Names that raise ValueError are ignored; every accepted name must be free
    of the listed characters and must fully match the identifier pattern.
    """
    try:
        _validate_identifier(name)
    except ValueError:
        return  # rejected names are outside this property

    forbidden_chars = (
        ";", "'", '"', "(", ")", " ", "-", "/", "\\", "\n", "\r", "\t", ",",
    )
    for forbidden in forbidden_chars:
        assert forbidden not in name, f"{forbidden!r} accepted in {name!r}"

    # fullmatch, not match: "$" also matches just before a trailing newline,
    # so match() on its own would accept a name that ends in a line feed.
    assert _IDENTIFIER_RE.fullmatch(name)
@given(
    name=st.from_regex(r"[A-Za-z_][A-Za-z0-9_]{0,30}", fullmatch=True),
)
@settings(deadline=None)
def test_validate_identifier_accepts_all_valid_identifiers(name):
    """Every string matching the identifier pattern is accepted."""
    returned = _validate_identifier(name)
    assert returned == name
@given(
    name=st.text(
        min_size=1,
        max_size=30,
        alphabet=st.sampled_from(list(";'\"()- \t\n\r,/*")),
    ),
)
@settings(deadline=None)
def test_validate_identifier_rejects_pure_metacharacter_strings(name):
    """Strings composed entirely of SQL metacharacters are always rejected."""
    # Every generated name is drawn only from the metacharacter alphabet
    # above, so validation is expected to raise for each one.
    with pytest.raises(ValueError):
        _validate_identifier(name)
# ---------------------------------------------------------------------------
# 6. ATTACH path escaping — property-based
# ---------------------------------------------------------------------------
@given(
    path_segment=st.text(
        alphabet=st.characters(
            whitelist_categories=("L", "N", "P", "S", "Z"),
        ),
        min_size=1,
        max_size=60,
    )
)
@settings(deadline=None, suppress_health_check=[HealthCheck.too_slow])
def test_attach_path_escaping_never_breaks_sql(path_segment):
    """The quote-doubling escaping produces a string that SQLite can parse
    without breaking out of the literal, regardless of the path content.

    The statement is built locally with the same ``'`` -> ``''`` replacement,
    then scanned: the first undoubled quote must end the literal and be
    followed only by ``AS test_alias``.
    """
    prefix = "ATTACH DATABASE '"
    safe = path_segment.replace("'", "''")
    sql = f"{prefix}{safe}' AS test_alias"

    # Slice by prefix length instead of searching for the prefix text: a
    # generated segment may itself contain that text, and that alone must
    # not fail the test.
    after_open = sql[len(prefix):]

    remainder = None
    i = 0
    while i < len(after_open):
        if after_open[i] == "'":
            if after_open[i + 1 : i + 2] == "'":
                i += 2  # doubled quote: an escaped quote, still inside the literal
                continue
            remainder = after_open[i + 1 :]  # lone quote: the literal ends here
            break
        i += 1

    # Fail loudly if the literal never closes rather than passing vacuously.
    assert remainder is not None, f"String literal never terminated in: {sql!r}"
    assert remainder.strip() == "AS test_alias", (
        f"Unexpected SQL after literal end: {remainder!r}"
    )