mirror of
https://git.quad4.io/RNS-Things/MeshChatX.git
synced 2026-05-12 07:14:42 +00:00
feat(database): add sanitization functions for PRAGMA names and WAL checkpoint modes
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import zipfile
|
||||
from datetime import UTC, datetime
|
||||
@@ -14,7 +15,6 @@ from .contacts import ContactsDAO
|
||||
from .crash_history import CrashHistoryDAO
|
||||
from .debug_logs import DebugLogsDAO
|
||||
from .gifs import UserGifsDAO
|
||||
from .legacy_migrator import LegacyMigrator
|
||||
from .map_drawings import MapDrawingsDAO
|
||||
from .messages import MessageDAO
|
||||
from .misc import MiscDAO
|
||||
@@ -32,6 +32,31 @@ MIN_SIZE_RATIO = 0.2
|
||||
|
||||
_log = logging.getLogger("meshchatx.database")
|
||||
|
||||
_PRAGMA_READ_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*\Z")
|
||||
|
||||
_ALLOWED_WAL_CHECKPOINT_MODES = frozenset({"PASSIVE", "FULL", "RESTART", "TRUNCATE"})
|
||||
|
||||
|
||||
def _sanitize_pragma_read_name(name: str | None) -> str | None:
|
||||
"""Allow only simple SQLite pragma tokens for dynamic ``PRAGMA name`` reads."""
|
||||
if not name or not isinstance(name, str):
|
||||
return None
|
||||
token = name.strip()
|
||||
if not token or not _PRAGMA_READ_NAME_RE.match(token):
|
||||
return None
|
||||
return token
|
||||
|
||||
|
||||
def _sanitize_wal_checkpoint_mode(mode: str) -> str:
|
||||
if not isinstance(mode, str):
|
||||
msg = "WAL checkpoint mode must be a string"
|
||||
raise ValueError(msg)
|
||||
m = mode.strip().upper()
|
||||
if m not in _ALLOWED_WAL_CHECKPOINT_MODES:
|
||||
msg = f"Invalid WAL checkpoint mode: {mode!r}"
|
||||
raise ValueError(msg)
|
||||
return m
|
||||
|
||||
|
||||
class Database:
|
||||
def __init__(self, db_path):
|
||||
@@ -58,16 +83,6 @@ class Database:
|
||||
self._tune_sqlite_pragmas()
|
||||
self.schema.initialize()
|
||||
|
||||
def migrate_from_legacy(self, reticulum_config_dir, identity_hash_hex):
|
||||
migrator = LegacyMigrator(
|
||||
self.provider,
|
||||
reticulum_config_dir,
|
||||
identity_hash_hex,
|
||||
)
|
||||
if migrator.should_migrate():
|
||||
return migrator.migrate()
|
||||
return False
|
||||
|
||||
def execute_sql(self, query, params=None):
|
||||
return self.provider.execute(query, params)
|
||||
|
||||
@@ -84,8 +99,11 @@ class Database:
|
||||
print(f"SQLite pragma setup failed: {exc}")
|
||||
|
||||
def _get_pragma_value(self, pragma: str, default=None):
|
||||
safe = _sanitize_pragma_read_name(pragma)
|
||||
if safe is None:
|
||||
return default
|
||||
try:
|
||||
cursor = self.execute_sql(f"PRAGMA {pragma}")
|
||||
cursor = self.execute_sql(f"PRAGMA {safe}")
|
||||
row = cursor.fetchone()
|
||||
if row is None:
|
||||
return default
|
||||
@@ -295,7 +313,8 @@ class Database:
|
||||
}
|
||||
|
||||
def _checkpoint_wal(self, mode: str = "TRUNCATE"):
|
||||
return self.execute_sql(f"PRAGMA wal_checkpoint({mode})").fetchall()
|
||||
safe_mode = _sanitize_wal_checkpoint_mode(mode)
|
||||
return self.execute_sql(f"PRAGMA wal_checkpoint({safe_mode})").fetchall()
|
||||
|
||||
def run_database_vacuum(self):
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user