From edd88d90b3ba71ec5ede3e83ba0083afbba06441 Mon Sep 17 00:00:00 2001 From: Ivan Date: Fri, 1 May 2026 04:13:11 -0500 Subject: [PATCH] feat(database): add sanitization functions for PRAGMA names and WAL checkpoint modes --- meshchatx/src/backend/database/__init__.py | 45 +++++++++++++++------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/meshchatx/src/backend/database/__init__.py b/meshchatx/src/backend/database/__init__.py index ea1d3e0..8241946 100644 --- a/meshchatx/src/backend/database/__init__.py +++ b/meshchatx/src/backend/database/__init__.py @@ -3,6 +3,7 @@ import json import logging import os +import re import shutil import zipfile from datetime import UTC, datetime @@ -14,7 +15,6 @@ from .contacts import ContactsDAO from .crash_history import CrashHistoryDAO from .debug_logs import DebugLogsDAO from .gifs import UserGifsDAO -from .legacy_migrator import LegacyMigrator from .map_drawings import MapDrawingsDAO from .messages import MessageDAO from .misc import MiscDAO @@ -32,6 +32,31 @@ MIN_SIZE_RATIO = 0.2 _log = logging.getLogger("meshchatx.database") +_PRAGMA_READ_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*\Z") + +_ALLOWED_WAL_CHECKPOINT_MODES = frozenset({"PASSIVE", "FULL", "RESTART", "TRUNCATE"}) + + +def _sanitize_pragma_read_name(name: str | None) -> str | None: + """Allow only simple SQLite pragma tokens for dynamic ``PRAGMA name`` reads.""" + if not name or not isinstance(name, str): + return None + token = name.strip() + if not token or not _PRAGMA_READ_NAME_RE.match(token): + return None + return token + + +def _sanitize_wal_checkpoint_mode(mode: str) -> str: + if not isinstance(mode, str): + msg = "WAL checkpoint mode must be a string" + raise ValueError(msg) + m = mode.strip().upper() + if m not in _ALLOWED_WAL_CHECKPOINT_MODES: + msg = f"Invalid WAL checkpoint mode: {mode!r}" + raise ValueError(msg) + return m + class Database: def __init__(self, db_path): @@ -58,16 +83,6 @@ class Database: self._tune_sqlite_pragmas() self.schema.initialize() - def migrate_from_legacy(self, reticulum_config_dir, identity_hash_hex): - migrator = LegacyMigrator( - self.provider, - reticulum_config_dir, - identity_hash_hex, - ) - if migrator.should_migrate(): - return migrator.migrate() - return False - def execute_sql(self, query, params=None): return self.provider.execute(query, params) @@ -84,8 +99,11 @@ class Database: print(f"SQLite pragma setup failed: {exc}") def _get_pragma_value(self, pragma: str, default=None): + safe = _sanitize_pragma_read_name(pragma) + if safe is None: + return default try: - cursor = self.execute_sql(f"PRAGMA {pragma}") + cursor = self.execute_sql(f"PRAGMA {safe}") row = cursor.fetchone() if row is None: return default @@ -295,7 +313,8 @@ class Database: } def _checkpoint_wal(self, mode: str = "TRUNCATE"): - return self.execute_sql(f"PRAGMA wal_checkpoint({mode})").fetchall() + safe_mode = _sanitize_wal_checkpoint_mode(mode) + return self.execute_sql(f"PRAGMA wal_checkpoint({safe_mode})").fetchall() def run_database_vacuum(self): try: