diff --git a/scripts/db/README.md b/scripts/db/README.md index 68d4f74ac7..333a6f4acf 100644 --- a/scripts/db/README.md +++ b/scripts/db/README.md @@ -1,96 +1,286 @@ -# Transfer data from SQLite to Postgres database +## Transfer data from SQLite to Postgres database -1. \* Decrypt SQLite database if it is encrypted. +1. Decrypt SQLite database if it is encrypted. - ```sh - sqlcipher encrypted_simplex_v1_agent.db - ``` + 1. Agent: - ```sql - PRAGMA key = 'password'; - ATTACH DATABASE 'simplex_v1_agent.db' AS plaintext KEY ''; - SELECT sqlcipher_export('plaintext'); - DETACH DATABASE plaintext; - ``` + 1. Open sqlite db: - Repeat for `simplex_v1_chat.db`. + ```sh + sqlcipher simplex_v1_agent.db + ``` + + 2. Set your db password: + + ```sql + PRAGMA key = ''; + ``` + + 3. Check if db was successfully decrypted: + + ```sh + SELECT count(*) FROM sqlite_master; + ``` + + 4. Attach new empty db: + + ```sh + ATTACH DATABASE 'simplex_v1_agent_plaintext.db' AS plaintext KEY ''; + ``` + + 5. Export opened db to attached db as plaintext: + + ```sh + SELECT sqlcipher_export('plaintext'); + ``` + + 6. Detach the plaintext db: + + ```sh + DETACH DATABASE plaintext; + ``` + + 2. Chat: + + 1. Open sqlite db: + + ```sh + sqlcipher simplex_v1_chat.db + ``` + + 2. Set your db password: + + ```sql + PRAGMA key = ''; + ``` + + 3. Check if db was successfully decrypted: + + ```sh + SELECT count(*) FROM sqlite_master; + ``` + + 4. Attach new empty db: + + ```sh + ATTACH DATABASE 'simplex_v1_chat_plaintext.db' AS plaintext KEY ''; + ``` + + 5. Export opened db to attached db as plaintext: + + ```sh + SELECT sqlcipher_export('plaintext'); + ``` + + 6. Detach the plaintext db: + + ```sh + DETACH DATABASE plaintext; + ``` 2. Prepare Postgres database. - - Create Postgres database. In shell: + 1. Connect to PostgreSQL database: ```sh - createdb -O simplex simplex_v1 + psql -U postgres -h localhost ``` - Or via query. + 2.
Create user with password: - - Build `simplex-chat` executable with `client_postgres` flag and run it to initialize new chat database. + ```sh + CREATE USER simplex WITH ENCRYPTED PASSWORD '123123'; + ``` - This should create `simplex_v1_agent_schema` and `simplex_v1_chat_schema` schemas in `simplex_v1` database, with `migrations` tables populated. Some tables would have initialization data - it will be truncated via pgloader command in next step. + 3. Create database: + + ```sh + CREATE DATABASE simplex_v1; + ``` + + 4. Assign permissions: + + ```sh + GRANT ALL PRIVILEGES ON DATABASE simplex_v1 TO simplex; + ``` + +3. Prepare database: + + You should build the CLI binary from the same `TAG` as the desktop. + + 1. Build CLI with PostgreSQL support: + + ```sh + cabal build -fclient_postgres exe:simplex-chat + ``` + + And rename it to: + + ```sh + mv simplex-chat simplex-chat-pg + ``` + + 2. Execute CLI: + + ```sh + ./simplex-chat-pg -d "postgresql://simplex:123123@localhost:5432/simplex_v1" --create-schema + ``` + + Press `Ctrl+C` when the CLI asks for a display name. + + This should create `simplex_v1_agent_schema` and `simplex_v1_chat_schema` schemas in `simplex_v1` database, with `migrations` tables populated. Some tables would have initialization data - it will be truncated via pgloader command in next step. 3. Load data from decrypted SQLite databases to Postgres database via pgloader. Install pgloader and add it to PATH.
Run in shell (substitute paths): ```sh - SQLITE_DBPATH='simplex_v1_agent.db' POSTGRES_CONN='postgres://simplex@/simplex_v1' POSTGRES_SCHEMA='simplex_v1_agent_schema' pgloader --on-error-stop sqlite.load + export POSTGRES_CONN='postgresql://simplex:123123@localhost:5432/simplex_v1' + ``` - SQLITE_DBPATH='simplex_v1_chat.db' POSTGRES_CONN='postgres://simplex@/simplex_v1' POSTGRES_SCHEMA='simplex_v1_chat_schema' pgloader --on-error-stop sqlite.load + And then: + + ```sh + SQLITE_DBPATH='simplex_v1_agent_plaintext.db' \ + POSTGRES_SCHEMA='simplex_v1_agent_schema' \ + CPU_CORES=$(nproc) WORKERS=$((CPU_CORES - 1)) pgloader --dynamic-space-size 262144 --on-error-stop sqlite.load + + SQLITE_DBPATH='simplex_v1_chat_plaintext.db' \ + POSTGRES_SCHEMA='simplex_v1_chat_schema' \ + CPU_CORES=$(nproc) WORKERS=$((CPU_CORES - 1)) pgloader --dynamic-space-size 262144 --on-error-stop sqlite.load ``` 4. Update sequences for Postgres tables. - ```sql - DO $$ - DECLARE - rec RECORD; - BEGIN - EXECUTE 'SET SEARCH_PATH TO simplex_v1_agent_schema'; + Connect to db: - FOR rec IN - SELECT - table_name, - column_name, - pg_get_serial_sequence(table_name, column_name) AS seq_name - FROM - information_schema.columns - WHERE - table_schema = 'simplex_v1_agent_schema' - AND identity_generation = 'ALWAYS' - LOOP - EXECUTE format( - 'SELECT setval(%L, (SELECT MAX(%I) FROM %I))', - rec.seq_name, rec.column_name, rec.table_name - ); - END LOOP; - END $$; + ```sh + PGPASSWORD=123123 psql -h localhost -U simplex -d simplex_v1 ``` - Repeat for `simplex_v1_chat_schema`. + Execute the following: -5. \* Compare number of rows between Postgres and SQLite tables. + 1. 
For `agent`: - To check number of rows for all tables in Postgres database schema run: + ```sql + DO $$ + DECLARE + rec RECORD; + BEGIN + EXECUTE 'SET SEARCH_PATH TO simplex_v1_agent_schema'; - ```sql - WITH tbl AS ( - SELECT table_schema, table_name - FROM information_schema.Tables - WHERE table_name NOT LIKE 'pg_%' - AND table_schema IN ('simplex_v1_agent_schema') - ) - SELECT - table_schema AS schema_name, - table_name, - (xpath('/row/c/text()', query_to_xml( - format('SELECT count(*) AS c FROM %I.%I', table_schema, table_name), false, true, '' - )))[1]::text::int AS records_count - FROM tbl - ORDER BY records_count DESC; - ``` + FOR rec IN + SELECT + table_name, + column_name, + pg_get_serial_sequence(table_name, column_name) AS seq_name + FROM + information_schema.columns + WHERE + table_schema = 'simplex_v1_agent_schema' + AND identity_generation = 'ALWAYS' + LOOP + EXECUTE format( + 'SELECT setval(%L, (SELECT MAX(%I) FROM %I))', + rec.seq_name, rec.column_name, rec.table_name + ); + END LOOP; + END $$; + ``` - Repeat for `simplex_v1_chat_schema`. + 2. For `chat`: + + ```sql + DO $$ + DECLARE + rec RECORD; + BEGIN + EXECUTE 'SET SEARCH_PATH TO simplex_v1_chat_schema'; + + FOR rec IN + SELECT + table_name, + column_name, + pg_get_serial_sequence(table_name, column_name) AS seq_name + FROM + information_schema.columns + WHERE + table_schema = 'simplex_v1_chat_schema' + AND identity_generation = 'ALWAYS' + LOOP + EXECUTE format( + 'SELECT setval(%L, (SELECT MAX(%I) FROM %I))', + rec.seq_name, rec.column_name, rec.table_name + ); + END LOOP; + END $$; + ``` + +5. Compare number of rows between Postgres and SQLite tables. + + **PostgreSQL**: + + 1. 
For `agent`: + + ```sql + WITH tbl AS ( + SELECT table_schema, table_name + FROM information_schema.Tables + WHERE table_name NOT LIKE 'pg_%' + AND table_schema IN ('simplex_v1_agent_schema') + ) + SELECT + table_schema AS schema_name, + table_name, + (xpath('/row/c/text()', query_to_xml( + format('SELECT count(*) AS c FROM %I.%I', table_schema, table_name), false, true, '' + )))[1]::text::int AS records_count + FROM tbl + ORDER BY records_count DESC; + ``` + + 2. For `chat`: + + ```sql + WITH tbl AS ( + SELECT table_schema, table_name + FROM information_schema.Tables + WHERE table_name NOT LIKE 'pg_%' + AND table_schema IN ('simplex_v1_chat_schema') + ) + SELECT + table_schema AS schema_name, + table_name, + (xpath('/row/c/text()', query_to_xml( + format('SELECT count(*) AS c FROM %I.%I', table_schema, table_name), false, true, '' + )))[1]::text::int AS records_count + FROM tbl + ORDER BY records_count DESC; + ``` + + **SQLite**: + + 1. For `agent`: + + ```sh + db="simplex_v1_agent_plaintext.db" + sqlite3 "$db" "SELECT name FROM sqlite_master WHERE type='table';" | + while read table; do + count=$(sqlite3 "$db" "SELECT COUNT(*) FROM \"$table\";") + echo "$table: $count" + done | sort -k2 -nr | less + ``` + + 2. For `chat`: + + ```sh + db="simplex_v1_chat_plaintext.db" + sqlite3 "$db" "SELECT name FROM sqlite_master WHERE type='table';" | + while read table; do + count=$(sqlite3 "$db" "SELECT COUNT(*) FROM \"$table\";") + echo "$table: $count" + done | sort -k2 -nr | less + ``` 6. Build and run desktop app with Postgres backend. @@ -103,3 +293,47 @@ # or ./gradlew packageDmg -Pdatabase.backend=postgres ``` + +## Transfer data from Postgres to SQLite database + +1. Prepare sqlite db: + + 1. Download simplex-chat CLI: + + You should download the CLI binary from the same `TAG` as the desktop.
+ + ```sh + export TAG='v6.4.3.1' + curl -L "https://github.com/simplex-chat/simplex-chat/releases/download/${TAG}/simplex-chat-ubuntu-22_04-x86_64" -o 'simplex-chat' + ``` + + 2. Run the CLI: + + ```sh + ./simplex-chat + ``` + + Press `Ctrl+C` when the CLI asks for a display name. + + 3. Move database: + + ```sh + mv ~/.simplex/simplex_v1_* ~/.local/share/simplex/ + ``` + +2. Transfer data: + + ```sh + ./pg2sqlite.py --verbose 'postgresql://simplex:123123@localhost:5432/simplex_v1' ~/.local/share/simplex/ + ``` + +3. Update BLOBs: + + ```sh + sqlite3 simplex_v1_chat.db + ``` + + ```sh + UPDATE group_members SET member_role = CAST(member_role as BLOB); + UPDATE user_contact_links SET group_link_member_role = CAST(group_link_member_role AS BLOB) WHERE group_link_member_role is not null; + ``` diff --git a/scripts/db/pg2sqlite.py b/scripts/db/pg2sqlite.py new file mode 100755 index 0000000000..533b15d143 --- /dev/null +++ b/scripts/db/pg2sqlite.py @@ -0,0 +1,899 @@ +#!/usr/bin/env python3 +""" +PostgreSQL -> per-schema SQLite migration with colored, clean logging. + +Usage example: + python pg2sqlite.py 'postgresql://user:pass@host:5432/db' /path/to/sqlite/dir --dry-run + +Note: color output will be disabled automatically if stdout is not a TTY or if --no-color is passed.
+""" + +from __future__ import annotations +import argparse +import logging +import os +import sqlite3 +import sys +import re +import datetime +from pathlib import Path +from typing import Set, List, Tuple, Dict, Optional, NamedTuple + +import psycopg +from psycopg import sql + +# ---------------------- +# Small utilities +# ---------------------- + +try: + sqlite3.register_adapter( + datetime.datetime, + lambda v: v.isoformat(sep=" ", timespec="microseconds"), + ) + sqlite3.register_adapter(datetime.date, lambda v: v.isoformat()) + sqlite3.register_adapter( + datetime.time, lambda v: v.isoformat(timespec="microseconds") + ) +except ValueError: + pass # already registered + +ANSI = { + "reset": "\x1b[0m", + "bold": "\x1b[1m", + "dim": "\x1b[2m", + "red": "\x1b[31m", + "green": "\x1b[32m", + "yellow": "\x1b[33m", + "blue": "\x1b[34m", + "magenta": "\x1b[35m", + "cyan": "\x1b[36m", + "gray": "\x1b[90m", +} + +DEFAULT_BATCH_SIZE = 10000 + +TYPE_COMPATIBILITY = { + "bytea": ["BLOB", "CHAR", "CLOB", "TEXT", "JSON"], + "int": ["INT", "NUMERIC"], + "serial": ["INT", "NUMERIC"], + "numeric": ["NUMERIC", "DECIMAL", "REAL", "FLOAT", "DOUBLE"], + "decimal": ["NUMERIC", "DECIMAL", "REAL", "FLOAT", "DOUBLE"], + "real": ["NUMERIC", "DECIMAL", "REAL", "FLOAT", "DOUBLE"], + "double": ["NUMERIC", "DECIMAL", "REAL", "FLOAT", "DOUBLE"], + "float": ["NUMERIC", "DECIMAL", "REAL", "FLOAT", "DOUBLE"], + "money": ["NUMERIC", "DECIMAL", "REAL", "FLOAT", "DOUBLE"], + "bool": ["BOOL", "INT", "NUMERIC"], + "varchar": ["CHAR", "CLOB", "TEXT"], + "char": ["CHAR", "CLOB", "TEXT"], + "text": ["CHAR", "CLOB", "TEXT"], + "citext": ["CHAR", "CLOB", "TEXT"], + "timestamp": ["DATE", "TIME", "CHAR", "TEXT", "DATETIME"], + "time": ["DATE", "TIME", "CHAR", "TEXT", "DATETIME"], + "date": ["DATE", "TIME", "CHAR", "TEXT", "DATETIME"], + "uuid": ["CHAR", "TEXT", "UUID", "CLOB"], + "json": ["JSON", "TEXT", "CHAR", "CLOB"], + "jsonb": ["JSON", "TEXT", "CHAR", "CLOB"], +} + + +def _sanitize_cursor_name(s: str) 
def supports_color(force_no: bool, stream=None) -> bool:
    """Return True when ANSI colors should be emitted on ``stream``.

    Args:
        force_no: When true, colors are disabled unconditionally
            (the ``--no-color`` switch).
        stream: File-like object the colored text will be written to.
            Defaults to ``sys.stderr``, which is the stream a
            ``logging.StreamHandler()`` created without arguments writes to.
            Probing the stream that actually receives the output keeps escape
            codes out of redirected log files.

    Returns:
        True only if colors were not disabled explicitly, ``NO_COLOR`` is
        unset or empty, ``TERM`` names a capable terminal, and ``stream`` is
        attached to a TTY.
    """
    if force_no:
        return False
    # Any non-empty NO_COLOR value disables color.
    if os.getenv("NO_COLOR"):
        return False
    term = os.getenv("TERM", "")
    if term == "" or term.lower() == "dumb":
        return False

    if stream is None:
        stream = sys.stderr
    # The stream can be None (no console attached) or lack isatty() entirely.
    isatty = getattr(stream, "isatty", None)
    if isatty is None:
        return False
    try:
        return bool(isatty())
    except (ValueError, OSError):
        # e.g. the stream has already been closed
        return False
def sqlite_decl_satisfies(pg_type: str, sqlite_decl: str) -> bool:
    """Report whether a SQLite column declaration can hold a PostgreSQL type.

    Args:
        pg_type: PostgreSQL type name of the source column (may be empty/None).
        sqlite_decl: Declared type of the target SQLite column as reported by
            ``PRAGMA table_info`` (may be empty/None).

    Returns:
        True when the declaration is considered compatible, False otherwise.

    Rules, applied in order:
        1. An empty/blank SQLite declaration accepts any PostgreSQL type.
        2. A PostgreSQL type ending in ``[]`` requires a textual or JSON
           declaration.
        3. The first ``TYPE_COMPATIBILITY`` key found as a whole word in the
           PostgreSQL type decides, via substring match of its allowed tokens
           against the upper-cased declaration.
        4. Otherwise any of the common storage tokens is accepted.
    """
    decl = (sqlite_decl or "").upper()
    pg = (pg_type or "").lower()

    # A column declared without a type is accepted for every PG type.
    if not decl.strip():
        return True

    # Array types must land in a textual/JSON column.
    if pg.endswith("[]"):
        return any(tok in decl for tok in ("TEXT", "CHAR", "CLOB", "JSON"))

    # NOTE(review): keys are matched as whole words, so names such as
    # "integer" or "boolean" do not hit the "int"/"bool" keys and end up in
    # the generic fallback below - confirm that this is intended.
    for key, allowed_types in TYPE_COMPATIBILITY.items():
        if re.search(r"\b" + re.escape(key) + r"\b", pg):
            return any(tok in decl for tok in allowed_types)

    # Unknown PG type: accept any of the common SQLite storage tokens.
    return any(
        tok in decl for tok in ("TEXT", "CHAR", "CLOB", "NUMERIC", "BLOB", "INT")
    )
def sqlite_table_has_autoincrement(sqlite_cursor, table_name: str) -> bool:
    """Report whether a SQLite table was created with ``AUTOINCREMENT``.

    Args:
        sqlite_cursor: Open SQLite cursor used to read ``sqlite_master``.
        table_name: Table name exactly as stored in ``sqlite_master``.

    Returns:
        True when the stored ``CREATE TABLE`` text contains the
        ``AUTOINCREMENT`` keyword as a standalone word; False when the table
        does not exist, has no stored SQL, or lacks the keyword.
    """
    sqlite_cursor.execute(
        "SELECT sql FROM sqlite_master WHERE type='table' AND name = ?;", (table_name,)
    )
    row = sqlite_cursor.fetchone()
    if not row or not row[0]:
        return False
    # Whole-word match: a plain substring test would also fire on identifiers
    # that merely contain the word, e.g. a column named "autoincrement_flag".
    return re.search(r"\bAUTOINCREMENT\b", row[0], re.IGNORECASE) is not None
def validate_column_compatibility(
    sqlite_cols: list[SQLiteCol],
    pg_columns_map: Dict[str, Tuple[str, str]],
    schema: str,
    table: str,
) -> None:
    """Check every shared column for a compatible type declaration.

    Args:
        sqlite_cols: Column descriptions of the target SQLite table.
        pg_columns_map: Lower-cased PostgreSQL column name mapped to
            ``(original_name, data_type)``.
        schema: PostgreSQL schema name, used only in the error message.
        table: PostgreSQL table name, used only in the error message.

    Raises:
        ValueError: On the first column whose SQLite declaration is rejected
            by ``sqlite_decl_satisfies`` for the PostgreSQL type.
    """
    for col in sqlite_cols:
        pg_entry = pg_columns_map.get(col.name.lower())
        if pg_entry is None:
            # Column exists only on the SQLite side; nothing to compare.
            continue

        pg_type = pg_entry[1]
        if sqlite_decl_satisfies(pg_type, col.type):
            continue

        raise ValueError(
            f"Type mismatch for {schema}.{table}.{col.name}: "
            f"PG='{pg_type}' vs SQLite='{col.type}'"
        )
out.append(b.decode("utf-8")) + except UnicodeDecodeError as e: + raise ValueError( + f"UTF-8 decode failed for column '{col.name}': {e}" + ) + else: + out.append(val) + return tuple(out) + + +def fetch_and_validate_row_batches( + pg_cursor, + select_sql: str, + sqlite_cols: list[SQLiteCol], + batch_size: int = DEFAULT_BATCH_SIZE, +): + pg_cursor.execute(select_sql) + total_rows_seen = 0 + while True: + rows = pg_cursor.fetchmany(batch_size) + if not rows: + break + validated_batch = [] + for row in rows: + total_rows_seen += 1 + try: + validated_batch.append(process_row_for_sqlite(row, sqlite_cols)) + except ValueError as e: + raise ValueError(f"Row {total_rows_seen} validation error: {e}") + yield validated_batch + + +def _get_postgres_sequence_info( + pg_cursor, schema: str, table: str, pk_col: str +) -> Optional[tuple[str, int]]: + """Get PostgreSQL sequence name and last_value for a table primary key, if any. + + Returns (sequence_name_text, last_value) or None. + """ + pg_cursor.execute( + """ + SELECT data_type + FROM information_schema.columns + WHERE table_schema = %s AND table_name = %s AND column_name = %s; + """, + (schema, table, pk_col), + ) + r = pg_cursor.fetchone() + if not r: + return None + + pg_type = (r[0] or "").lower() + # Match whole words to avoid matching 'interval' etc. + if not re.search(r"\b(?:int|serial|bigint)\b", pg_type): + return None + + # Get sequence name text (NULL if none) + pg_cursor.execute( + "SELECT pg_get_serial_sequence(%s, %s);", + (f"{schema}.{table}", pk_col), + ) + row = pg_cursor.fetchone() + seq_name = row[0] if row else None + if not seq_name: + return None + + try: + # Read last_value by casting the pg_get_serial_sequence result to regclass. 
+ pg_cursor.execute( + "SELECT last_value FROM pg_get_serial_sequence(%s, %s)::regclass;", + (f"{schema}.{table}", pk_col), + ) + rr = pg_cursor.fetchone() + if rr and rr[0] is not None: + return (seq_name, int(rr[0])) + except Exception: + # Swallow errors (best-effort); callers treat missing info as absent + pass + + return None + + +def _get_sqlite_max_pk_value(sqlite_cursor, table: str, pk_col: str) -> Optional[int]: + """Get the maximum primary key value from SQLite table.""" + logger = logging.getLogger("db_migrate") + try: + sqlite_cursor.execute( + f"SELECT MAX({quote_sqlite_identifier(pk_col)}) " + f"FROM {quote_sqlite_identifier(table)};" + ) + r3 = sqlite_cursor.fetchone() + if r3 and r3[0] is not None: + return int(r3[0]) + except Exception: + logger.debug("Failed to read sqlite max pk", exc_info=True) + return None + + +def _update_sqlite_sequence( + sqlite_conn: sqlite3.Connection, table: str, sequence_value: int +) -> bool: + """Update SQLite sqlite_sequence with new value. + + IMPORTANT: This function does not commit; caller must commit/rollback. + """ + s_cur = sqlite_conn.cursor() + s_cur.execute( + "UPDATE sqlite_sequence SET seq = ? 
def try_update_sqlite_sequence(
    pg_cursor,
    sqlite_conn: sqlite3.Connection,
    schema: str,
    pg_table: str,  # PostgreSQL table name
    sqlite_table: str,  # SQLite table name
    sqlite_cols: list[SQLiteCol],
) -> bool:
    """Bring the ``sqlite_sequence`` entry of a table in line with PostgreSQL.

    The stored value is the larger of the PostgreSQL sequence's last value and
    the current maximum primary key found in the SQLite table (whichever of
    the two is available).

    Args:
        pg_cursor: Open PostgreSQL cursor used to read the source sequence.
        sqlite_conn: Target SQLite connection; the update is committed here.
        schema: PostgreSQL schema of ``pg_table``.
        pg_table: Table name on the PostgreSQL side.
        sqlite_table: Matching table name on the SQLite side.
        sqlite_cols: Column descriptions of ``sqlite_table``.

    Returns:
        True when the sequence row was written and committed. False when the
        table has no single-column primary key, is not ``AUTOINCREMENT``,
        ``sqlite_sequence`` does not exist, no value could be determined, or
        the commit failed.
    """
    logger = logging.getLogger("db_migrate")

    # Only tables with exactly one primary-key column can have a sequence.
    pks = [c for c in sqlite_cols if c.pk]
    if len(pks) != 1:
        return False
    pk_col = pks[0].name

    # sqlite_sequence is only maintained for AUTOINCREMENT tables.
    s_cur = sqlite_conn.cursor()
    if not sqlite_table_has_autoincrement(s_cur, sqlite_table):
        return False
    if not sqlite_sequence_table_exists(s_cur):
        return False

    seq_info = _get_postgres_sequence_info(pg_cursor, schema, pg_table, pk_col)
    pg_last_value = seq_info[1] if seq_info else None
    sqlite_max = _get_sqlite_max_pk_value(s_cur, sqlite_table, pk_col)

    known_values = [v for v in (pg_last_value, sqlite_max) if v is not None]
    if not known_values:
        return False
    candidate = max(known_values)

    if not _update_sqlite_sequence(sqlite_conn, sqlite_table, candidate):
        return False

    try:
        sqlite_conn.commit()
    except sqlite3.Error:
        # Best-effort: do not abort the migration, but do not claim success
        # either, and do not leave the transaction open for later statements.
        logger.debug("Failed to commit sqlite_sequence update", exc_info=True)
        try:
            sqlite_conn.rollback()
        except sqlite3.Error:
            logger.debug("Rollback after failed commit also failed", exc_info=True)
        return False
    return True
batch_size: int = DEFAULT_BATCH_SIZE, +) -> Tuple[int, int]: + """Migrate a single schema. Returns (tables_migrated, rows_inserted_total).""" + processed_schema = schema[:-7] if schema.endswith("_schema") else schema + sqlite_db_path = Path(sqlite_dir) / f"{processed_schema}.db" + if not sqlite_db_path.is_file(): + logger.error(f"Missing SQLite DB: {sqlite_db_path}") + return 0, 0 + + tables_migrated = 0 + rows_inserted = 0 + + sqlite_path = sqlite_db_path.resolve() + if dry_run: + uri = sqlite_path.as_uri() + "?mode=ro" + conn_args = {"database": uri, "uri": True} + else: + conn_args = {"database": str(sqlite_path)} + + with sqlite3.connect(**conn_args) as sqlite_conn: + sqlite_cur = sqlite_conn.cursor() + sqlite_cur.execute("SELECT name FROM sqlite_master WHERE type='table';") + sqlite_tables = [r[0] for r in sqlite_cur.fetchall()] + # O(1) lookup map for matching by lowercase name + sqlite_table_map = {t.lower(): t for t in sqlite_tables} + + with pg_conn.cursor() as pg_cur: + # Ensure we start in a clean state for this connection + try: + pg_conn.rollback() + except Exception: + # Ignore rollback failure; connection is newly opened most likely + logger.debug( + "pg_conn.rollback() at schema start ignored", exc_info=True + ) + + # Get table list for this schema, but catch/rollback on failure + try: + tables = list_tables_in_schema(pg_cur, schema) + except Exception as e: + logger.error("ERROR %s (list tables): %s", schema, e) + logger.debug("Traceback (list tables):", exc_info=True) + try: + pg_conn.rollback() + except Exception: + logger.debug( + "pg_conn.rollback() failed after list tables error", + exc_info=True, + ) + return 0, 0 + + for table in tables: + # Defensive: ensure connection is in a clean state before any new PG work. + # A prior error can leave the connection in an aborted transaction; calling + # rollback() clears that and allows subsequent SELECTs to run. 
+ try: + pg_conn.rollback() + except Exception: + # ignore: best-effort cleanup + logger.debug( + "pg_conn.rollback() ignored at start of table loop", + exc_info=True, + ) + + if table.lower() in skipped_tables: + logger.info("SKIP %s.%s (explicit)", schema, table) + continue + if table.lower() not in sqlite_table_map: + logger.info("SKIP %s.%s (no target table)", schema, table) + continue + + matched_table = sqlite_table_map[table.lower()] + logger.info("OK %s.%s -> %s", schema, table, matched_table) + + # Fetch PG columns, but defend against aborted transaction here + try: + pg_cols = get_pg_columns(pg_cur, schema, table) + except Exception as e: + logger.error("ERROR %s.%s (get columns): %s", schema, table, e) + logger.debug("Traceback (get columns):", exc_info=True) + try: + pg_conn.rollback() + except Exception: + logger.debug( + "pg_conn.rollback() failed after get columns error", + exc_info=True, + ) + continue + + pg_map: Dict[str, Tuple[str, str]] = { + name.lower(): (name, dtype) for name, dtype in pg_cols + } + + sqlite_info = get_sqlite_table_info(sqlite_cur, matched_table) + if not sqlite_info: + logger.warning(f"SKIP {schema}.{table} (no sqlite info)") + continue + + # Warn once about schema drift (extra columns) + pg_col_names = {name.lower() for name, _ in pg_cols} + extra_in_sqlite = { + c.name for c in sqlite_info if c.name.lower() not in pg_col_names + } + if extra_in_sqlite: + logger.warning( + f"Schema drift: {schema}.{table} has extra SQLite columns {extra_in_sqlite}" + ) + + try: + validate_column_compatibility(sqlite_info, pg_map, schema, table) + except ValueError as e: + logger.error(f"ERROR {schema}.{table} (type mismatch): {e}") + continue + + select_sql = build_select_for_sqlite_columns( + schema, table, sqlite_info, pg_map + ) + + csr_name = _sanitize_cursor_name(f"csr_{schema}_{table}") + try: + with pg_conn.cursor(name=csr_name) as data_cur: + batch_gen = fetch_and_validate_row_batches( + data_cur, select_sql, sqlite_info, 
batch_size=batch_size + ) + first_batch = next(batch_gen, None) + + if not first_batch: + logger.info(f"SKIP {schema}.{table} (no rows)") + # data_cur will be closed automatically on leaving the 'with' + continue + + if dry_run: + count = len(first_batch) + for batch in batch_gen: + count += len(batch) + inserted = 0 + logger.info( + f"DRY {schema}.{table} rows_validated={count}" + ) + else: + cur = sqlite_conn.cursor() + quoted_table = quote_sqlite_identifier(matched_table) + col_names = [c.name for c in sqlite_info] + quoted_cols = ", ".join( + quote_sqlite_identifier(c) for c in col_names + ) + placeholders = ", ".join(["?"] * len(col_names)) + + cur.execute("BEGIN;") + try: + cur.execute(f"DELETE FROM {quoted_table};") + inserted = 0 + if first_batch: + cur.executemany( + f"INSERT INTO {quoted_table} ({quoted_cols}) VALUES ({placeholders});", + first_batch, + ) + inserted += len(first_batch) + for batch in batch_gen: + if not batch: + continue + cur.executemany( + f"INSERT INTO {quoted_table} ({quoted_cols}) VALUES ({placeholders});", + batch, + ) + inserted += len(batch) + except Exception: + cur.execute("ROLLBACK;") + raise + else: + cur.execute("COMMIT;") + except ValueError as e: + logger.error("ERROR %s.%s (row validation): %s", schema, table, e) + try: + pg_conn.rollback() + except Exception: + logger.debug( + "pg_conn.rollback() failed after row validation error", + exc_info=True, + ) + continue + except Exception as e: + logger.error("ERROR %s.%s (select failed): %s", schema, table, e) + logger.debug("Traceback (select failed):", exc_info=True) + try: + pg_conn.rollback() + except Exception: + logger.debug( + "pg_conn.rollback() failed after select failed", + exc_info=True, + ) + continue + + rows_inserted += inserted + tables_migrated += 1 + logger.info(f"DONE {schema}.{table} rows={inserted}") + + if not dry_run: + try: + if try_update_sqlite_sequence( + pg_cur, + sqlite_conn, + schema, + table, + matched_table, + sqlite_info, + ): + logger.info( + 
"SEQ %s.%s sqlite_sequence updated", schema, table + ) + except Exception as e: + logger.warning( + "SEQ %s.%s update failed (ignored): %s", schema, table, e + ) + + return tables_migrated, rows_inserted + + +def migrate_data( + pg_conn_str: str, + sqlite_dir: str, + schema_filter: Optional[str], + dry_run: bool, + skip_tables: str, + logger: logging.Logger, + batch_size: int, +) -> None: + skipped_tables = {t.strip().lower() for t in skip_tables.split(",") if t.strip()} + total_tables = 0 + total_rows = 0 + total_errors = 0 + + # List schemas once with a short-lived connection + with psycopg.connect(pg_conn_str) as tmp_conn: + with tmp_conn.cursor() as cur: + schemas = list_user_schemas(cur) + + if schema_filter: + schemas = [s for s in schemas if s == schema_filter] + if not schemas: + logger.error("No schemas to process") + return + + for schema in schemas: + logger.info("SCHEMA %s", schema) + if dry_run: + logger.info("(dry-run) validating only — no writes will be performed") + + # Use a fresh connection per-schema to isolate failures/aborted transactions + try: + with psycopg.connect(pg_conn_str) as pg_conn: + migrated_tables, inserted_rows = migrate_schema( + pg_conn, + sqlite_dir, + schema, + skipped_tables, + logger, + dry_run=dry_run, + batch_size=batch_size, + ) + except Exception as e: + logger.error("Schema %s failed: %s", schema, e) + logger.debug("Traceback (schema failure):", exc_info=True) + total_errors += 1 + continue + + total_tables += migrated_tables + total_rows += inserted_rows + + logger.info("---") + logger.info( + f"SUMMARY: schemas={len(schemas)} " + f"tables_migrated={total_tables} " + f"rows_inserted={total_rows} " + f"errors={total_errors}" + ) + + +# ---------------------- +# CLI +# ---------------------- + + +def parse_args(argv): + p = argparse.ArgumentParser( + description="Migrate Postgres data into per-schema SQLite DBs (non-invasive)." 
+ ) + p.add_argument("pg_conn", help="Postgres connection string") + p.add_argument( + "sqlite_dir", help="Directory containing per-schema sqlite .db files" + ) + p.add_argument( + "--schema", help="Only migrate this schema (exact match)", default=None + ) + p.add_argument( + "--dry-run", + help="Validate and report only; do not write to sqlite", + action="store_true", + ) + p.add_argument("--verbose", help="Verbose logging (debug)", action="store_true") + p.add_argument("--no-color", help="Disable ANSI color output", action="store_true") + p.add_argument( + "--batch-size", + type=int, + default=DEFAULT_BATCH_SIZE, + help="Batch size for data fetching", + ) + p.add_argument( + "--skip-tables", + help="Comma-separated list of tables to skip", + default="migrations,servers_stats", + ) + + return p.parse_args(argv[1:]) + + +def main(argv): + args = parse_args(argv) + logger = setup_logger(args.verbose, args.no_color) + + sqlite_dir_path = Path(args.sqlite_dir) + if not sqlite_dir_path.is_dir(): + logger.error("SQLite directory does not exist or is not a directory.") + raise SystemExit(1) + + try: + migrate_data( + args.pg_conn, + str(sqlite_dir_path), + args.schema, + args.dry_run, + args.skip_tables, + logger, + args.batch_size, + ) + except Exception as e: + logger.error(f"Fatal error: {e}") + raise SystemExit(2) + + +if __name__ == "__main__": + main(sys.argv) diff --git a/scripts/db/sqlite.load b/scripts/db/sqlite.load index 105e8c39d2..7f60e72c2f 100644 --- a/scripts/db/sqlite.load +++ b/scripts/db/sqlite.load @@ -10,7 +10,8 @@ LOAD DATABASE -- pgloader implementation doesn't find "GENERATED ALWAYS AS IDENTITY" sequences, -- instead we reset sequences manually via custom query after load reset no sequences, - data only + data only, + workers = {{WORKERS}} EXCLUDING TABLE NAMES LIKE 'migrations', 'sqlite_sequence'