Files
meshcore-analyzer/internal/dbschema/dbschema.go
T
efiten 0b35c7eef3 feat(server): persist multi-byte capability across restart + O(1) per-key lookup (#903) (#1324)
## Summary

Follows the reconciliation recommendation in #916 — extracts only the
NET-NEW persistence layer from that PR (which is now superseded by #1002
for the overlay UI) into a focused 6-file change against current master.

**What this adds:**
- `multibyte_sup_v1` migration: `multibyte_sup INTEGER NOT NULL DEFAULT
0` + `multibyte_evidence TEXT` on `nodes`/`inactive_nodes` so capability
survives restart
- `hasMultibyteSupCols` schema detection gates the persist/load paths
- `loadMultibyteCapFromDB()`: pre-populates `mbCapSnapshot`/`mbCapIndex`
at startup — cold starts serve last-known capability without waiting for
the first ~15s analytics cycle
- `maybePersistMultibyteCapability()` + `persistMultibyteCapability()`:
after each analytics cycle; TryLock-gated (concurrent cycles coalesce);
skips `sup==0` entries (data-destruction guard)
- `GetMultibyteCapFor(pk)`: O(1) map lookup; both `handleNodes` and
node-detail call sites updated from the O(N)-alloc
`GetMultiByteCapMap()`

**What this explicitly does NOT change:**
- API field names (`multi_byte_status`, `multi_byte_evidence`,
`multi_byte_max_hash_size`)
- `EnrichNodeWithMultiByte` — unchanged
- `GetMultiByteCapMap` — still present for any external callers
- `public/map.js`, `public/live.css`, `Dockerfile`, `docs/` — zero
frontend churn

## Test plan

- [x] `TestMultibyteCapPersistRoundTrip` — confirmed values survive
persist → fresh-store load
- [x] `TestMultibyteCapPersistSkipsUnknown` — data-destruction guard:
`sup==0` entry does not overwrite DB-confirmed value
- [x] `TestMultibyteCapMaybePersistCoalesces` — TryLock coalesces 10
concurrent callers without deadlock
- [x] `TestMultibyteCapGetMultibyteCapForO1` — O(1) index returns
correct entry / false for unknown pubkey
- [x] `TestMultibyteCapLoadFromDB` — only `sup>0` rows loaded; `sup==0`
row excluded
- [x] `TestSchemaMultibyteSupColumns` — migration adds columns to both
tables; idempotent on second `OpenStore`
- [x] All existing `TestMultiByteCapability_*` tests pass unchanged
- [x] Full ingestor test suite: `ok` in 27s
- [x] `go build ./cmd/server/ && go build ./cmd/ingestor/` clean

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: openclaw-bot <bot@openclaw>
2026-05-25 22:35:35 -07:00

486 lines
17 KiB
Go

// Package dbschema centralizes schema migrations and read-side schema
// assertions for the CoreScope SQLite DB. Per issue #1287 the writer
// (cmd/ingestor) owns ALL CREATE/ALTER/INSERT/UPDATE/DELETE on schema
// objects; the server (cmd/server) only ASSERTS that the schema is in
// the expected shape and refuses to start otherwise.
//
// Apply(rw, log) runs from the ingestor at startup BEFORE subscribing to
// MQTT. AssertReady(ro) runs from the server at startup and returns an
// error listing every missing column/index/table.
//
// INVARIANT (#1321): Any optional column the SERVER detects via PRAGMA
// (e.g. cmd/server/db.go detectSchema → hasScopeName, hasDefaultScope,
// hasObsRawHex, hasResolvedPath) MUST be added here, in Apply, AND
// asserted in AssertReady. Adding it only to cmd/ingestor/db.go
// applySchema reintroduces the startup-race bug from #1321: the
// server's PRAGMA can run before the ingestor finishes applySchema,
// the boolean caches `false`, and feature endpoints permanently 500
// until the server restarts. Source of truth lives here — full stop.
package dbschema
import (
"database/sql"
"errors"
"fmt"
"strings"
)
// Logger is the minimal logging surface used by Apply. Both cmd/server
// and cmd/ingestor satisfy this with the stdlib `log` package's Printf
// (passed as a closure to avoid an indirect log dependency here).
type Logger func(format string, args ...interface{})
// Apply runs every server-side ensure_* migration against the given
// read-write SQLite connection. Each operation is idempotent
// (IF NOT EXISTS / column-probe-before-ALTER). Safe to call repeatedly.
//
// Called by the ingestor at startup. The server MUST NOT call this —
// it only calls AssertReady.
func Apply(rw *sql.DB, logf Logger) error {
if logf == nil {
logf = func(string, ...interface{}) {}
}
if err := ensureServerIndexes(rw); err != nil {
return fmt.Errorf("ensure server indexes: %w", err)
}
if err := ensureNeighborEdgesTable(rw); err != nil {
return fmt.Errorf("ensure neighbor_edges: %w", err)
}
if err := ensureInactiveNodesTable(rw); err != nil {
return fmt.Errorf("ensure inactive_nodes: %w", err)
}
if err := ensureResolvedPathColumn(rw, logf); err != nil {
return fmt.Errorf("ensure resolved_path: %w", err)
}
if err := ensureObserverInactiveColumn(rw, logf); err != nil {
return fmt.Errorf("ensure observers.inactive: %w", err)
}
if err := ensureLastPacketAtColumn(rw, logf); err != nil {
return fmt.Errorf("ensure observers.last_packet_at: %w", err)
}
if err := ensureObserverIATAColumn(rw, logf); err != nil {
return fmt.Errorf("ensure observers.iata: %w", err)
}
if err := ensureForeignAdvertColumn(rw, logf); err != nil {
return fmt.Errorf("ensure foreign_advert: %w", err)
}
if err := ensureFromPubkeyColumn(rw, logf); err != nil {
return fmt.Errorf("ensure from_pubkey: %w", err)
}
if err := ensureScopeNameColumn(rw, logf); err != nil {
return fmt.Errorf("ensure scope_name: %w", err)
}
if err := ensureDefaultScopeColumns(rw, logf); err != nil {
return fmt.Errorf("ensure default_scope: %w", err)
}
if err := ensureObservationsRawHexColumn(rw, logf); err != nil {
return fmt.Errorf("ensure observations.raw_hex: %w", err)
}
if err := ensureMultibyteCapColumns(rw, logf); err != nil {
return fmt.Errorf("ensure multibyte_cap columns: %w", err)
}
return nil
}
// AssertReady verifies the schema is in the expected shape. The server
// calls this at startup against a read-only connection; if it returns
// non-nil, the server MUST fatal-log and exit so the operator restarts
// the ingestor (which owns migrations).
func AssertReady(ro *sql.DB) error {
var missing []string
mustCol := func(table, col string) {
has, err := TableHasColumn(ro, table, col)
if err != nil {
missing = append(missing, fmt.Sprintf("%s.%s (probe error: %v)", table, col, err))
return
}
if !has {
missing = append(missing, fmt.Sprintf("%s.%s", table, col))
}
}
mustTable := func(name string) {
var n int
err := ro.QueryRow(`SELECT 1 FROM sqlite_master WHERE type='table' AND name=?`, name).Scan(&n)
if errors.Is(err, sql.ErrNoRows) {
missing = append(missing, "table:"+name)
} else if err != nil {
missing = append(missing, fmt.Sprintf("table:%s (probe error: %v)", name, err))
}
}
mustTable("neighbor_edges")
mustCol("observations", "resolved_path")
mustCol("observers", "inactive")
mustCol("observers", "last_packet_at")
mustCol("observers", "iata")
mustCol("nodes", "foreign_advert")
mustCol("inactive_nodes", "foreign_advert")
mustCol("transmissions", "from_pubkey")
// #1321: server's detectSchema PRAGMA-detects these and caches a
// boolean. To kill the startup race they're owned + asserted here.
mustCol("transmissions", "scope_name")
mustCol("nodes", "default_scope")
mustCol("inactive_nodes", "default_scope")
mustCol("observations", "raw_hex")
// Multi-byte capability cache (#1324 follow-up; PR #903 surface).
// Owned by ingestor — server reads these for O(1) /api/nodes
// enrichment, ingestor's RunMultibyteCapPersist is the only writer.
mustCol("nodes", "multibyte_sup")
mustCol("nodes", "multibyte_evidence")
mustCol("inactive_nodes", "multibyte_sup")
mustCol("inactive_nodes", "multibyte_evidence")
if len(missing) > 0 {
return fmt.Errorf("schema not migrated by ingestor; restart ingestor first. missing: %s",
strings.Join(missing, ", "))
}
return nil
}
// TableHasColumn reports whether the given table has the given column.
// Exported because tests and the read-side need it without re-implementing.
func TableHasColumn(db *sql.DB, table, column string) (bool, error) {
rows, err := db.Query(fmt.Sprintf("PRAGMA table_info(%s)", table))
if err != nil {
return false, err
}
defer rows.Close()
for rows.Next() {
var cid int
var name string
var ctype sql.NullString
var notnull, pk int
var dflt sql.NullString
if err := rows.Scan(&cid, &name, &ctype, &notnull, &dflt, &pk); err != nil {
return false, err
}
if name == column {
return true, nil
}
}
return false, rows.Err()
}
// ─── ensure_* helpers (writer side) ────────────────────────────────────────
func ensureServerIndexes(rw *sql.DB) error {
stmts := []string{
`CREATE INDEX IF NOT EXISTS idx_transmissions_first_seen ON transmissions(first_seen)`,
`CREATE INDEX IF NOT EXISTS idx_transmissions_hash ON transmissions(hash)`,
`CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type)`,
`CREATE INDEX IF NOT EXISTS idx_observations_timestamp ON observations(timestamp)`,
`CREATE INDEX IF NOT EXISTS idx_observations_transmission_id ON observations(transmission_id)`,
// Composite covers GetChannelMessages' grouped MAX(timestamp) per
// transmission_id (issue #1366 / PR #1368). With this index sqlite can
// satisfy the aggregate index-only without touching the heap.
`CREATE INDEX IF NOT EXISTS idx_observations_tx_ts ON observations(transmission_id, timestamp)`,
}
for _, s := range stmts {
if _, err := rw.Exec(s); err != nil {
return fmt.Errorf("ensure index %q: %w", s, err)
}
}
// observer_idx (v3) vs observer_id (v2) — probe + index the matching one.
hasIdx, err := TableHasColumn(rw, "observations", "observer_idx")
if err != nil {
return err
}
if hasIdx {
if _, err := rw.Exec(`CREATE INDEX IF NOT EXISTS idx_observations_observer_idx ON observations(observer_idx)`); err != nil {
return err
}
}
hasID, err := TableHasColumn(rw, "observations", "observer_id")
if err != nil {
return err
}
if hasID {
if _, err := rw.Exec(`CREATE INDEX IF NOT EXISTS idx_observations_observer_id ON observations(observer_id)`); err != nil {
return err
}
}
return nil
}
func ensureNeighborEdgesTable(rw *sql.DB) error {
_, err := rw.Exec(`CREATE TABLE IF NOT EXISTS neighbor_edges (
node_a TEXT NOT NULL,
node_b TEXT NOT NULL,
count INTEGER DEFAULT 1,
last_seen TEXT,
PRIMARY KEY (node_a, node_b)
)`)
return err
}
// ensureInactiveNodesTable creates the inactive_nodes table if missing.
// The ingestor's applySchema also creates this table — duplicating it
// here makes dbschema.Apply self-sufficient when called against a
// fixture DB that pre-dates the soft-delete feature (e.g. CI's
// test-fixtures/e2e-fixture.db, which never had any inactive rows).
// Schema kept in sync with cmd/ingestor/db.go:applySchema.
func ensureInactiveNodesTable(rw *sql.DB) error {
_, err := rw.Exec(`CREATE TABLE IF NOT EXISTS inactive_nodes (
public_key TEXT PRIMARY KEY,
name TEXT,
role TEXT,
lat REAL,
lon REAL,
last_seen TEXT,
first_seen TEXT,
advert_count INTEGER DEFAULT 0,
battery_mv INTEGER,
temperature_c REAL,
foreign_advert INTEGER DEFAULT 0
)`)
if err != nil {
return err
}
_, err = rw.Exec(`CREATE INDEX IF NOT EXISTS idx_inactive_nodes_last_seen ON inactive_nodes(last_seen)`)
return err
}
func ensureResolvedPathColumn(rw *sql.DB, logf Logger) error {
has, err := TableHasColumn(rw, "observations", "resolved_path")
if err != nil {
return err
}
if has {
return nil
}
if _, err := rw.Exec("ALTER TABLE observations ADD COLUMN resolved_path TEXT"); err != nil {
return err
}
logf("[dbschema] added resolved_path column to observations")
return nil
}
func ensureObserverInactiveColumn(rw *sql.DB, logf Logger) error {
has, err := TableHasColumn(rw, "observers", "inactive")
if err != nil {
return err
}
if has {
return nil
}
if _, err := rw.Exec("ALTER TABLE observers ADD COLUMN inactive INTEGER DEFAULT 0"); err != nil {
return err
}
logf("[dbschema] added inactive column to observers")
return nil
}
func ensureLastPacketAtColumn(rw *sql.DB, logf Logger) error {
has, err := TableHasColumn(rw, "observers", "last_packet_at")
if err != nil {
return err
}
if has {
return nil
}
if _, err := rw.Exec("ALTER TABLE observers ADD COLUMN last_packet_at TEXT"); err != nil {
return err
}
logf("[dbschema] added last_packet_at column to observers")
return nil
}
func ensureObserverIATAColumn(rw *sql.DB, logf Logger) error {
has, err := TableHasColumn(rw, "observers", "iata")
if err != nil {
return err
}
if has {
return nil
}
if _, err := rw.Exec("ALTER TABLE observers ADD COLUMN iata TEXT"); err != nil {
return err
}
logf("[dbschema] added iata column to observers")
return nil
}
func ensureForeignAdvertColumn(rw *sql.DB, logf Logger) error {
for _, table := range []string{"nodes", "inactive_nodes"} {
has, err := TableHasColumn(rw, table, "foreign_advert")
if err != nil {
return fmt.Errorf("inspect %s: %w", table, err)
}
if has {
continue
}
if _, err := rw.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN foreign_advert INTEGER DEFAULT 0", table)); err != nil {
return err
}
logf("[dbschema] added foreign_advert column to %s", table)
}
return nil
}
func ensureFromPubkeyColumn(rw *sql.DB, logf Logger) error {
has, err := TableHasColumn(rw, "transmissions", "from_pubkey")
if err != nil {
return err
}
if !has {
if _, err := rw.Exec("ALTER TABLE transmissions ADD COLUMN from_pubkey TEXT"); err != nil {
return err
}
logf("[dbschema] added from_pubkey column to transmissions (#1143)")
}
if _, err := rw.Exec("CREATE INDEX IF NOT EXISTS idx_transmissions_from_pubkey ON transmissions(from_pubkey)"); err != nil {
return err
}
return nil
}
// ensureScopeNameColumn adds transmissions.scope_name (+ partial index) and
// records the legacy `_migrations` marker so the ingestor's old gated path
// stays idempotent on existing DBs. Moved here from cmd/ingestor/db.go in
// #1321 to be the canonical source of truth for the optional column the
// server PRAGMA-detects as hasScopeName.
func ensureScopeNameColumn(rw *sql.DB, logf Logger) error {
if err := ensureMigrationsTable(rw); err != nil {
return err
}
has, err := TableHasColumn(rw, "transmissions", "scope_name")
if err != nil {
return err
}
if !has {
if _, err := rw.Exec(`ALTER TABLE transmissions ADD COLUMN scope_name TEXT DEFAULT NULL`); err != nil {
return fmt.Errorf("alter transmissions add scope_name: %w", err)
}
logf("[dbschema] added scope_name column to transmissions (#899)")
}
if _, err := rw.Exec(`CREATE INDEX IF NOT EXISTS idx_tx_scope_name ON transmissions(scope_name) WHERE scope_name IS NOT NULL`); err != nil {
return fmt.Errorf("create idx_tx_scope_name: %w", err)
}
if _, err := rw.Exec(`INSERT OR IGNORE INTO _migrations (name) VALUES ('scope_name_v1')`); err != nil {
return fmt.Errorf("record scope_name_v1: %w", err)
}
return nil
}
// ensureDefaultScopeColumns adds default_scope to nodes + inactive_nodes
// and records the `_migrations` marker. Source of truth lives here per
// #1321 (was previously cmd/ingestor/db.go only).
func ensureDefaultScopeColumns(rw *sql.DB, logf Logger) error {
if err := ensureMigrationsTable(rw); err != nil {
return err
}
for _, table := range []string{"nodes", "inactive_nodes"} {
has, err := TableHasColumn(rw, table, "default_scope")
if err != nil {
return fmt.Errorf("inspect %s.default_scope: %w", table, err)
}
if has {
continue
}
if _, err := rw.Exec(fmt.Sprintf(`ALTER TABLE %s ADD COLUMN default_scope TEXT DEFAULT NULL`, table)); err != nil {
return fmt.Errorf("alter %s add default_scope: %w", table, err)
}
logf("[dbschema] added default_scope column to %s (#899)", table)
}
if _, err := rw.Exec(`INSERT OR IGNORE INTO _migrations (name) VALUES ('nodes_default_scope_v1')`); err != nil {
return fmt.Errorf("record nodes_default_scope_v1: %w", err)
}
return nil
}
// ensureObservationsRawHexColumn adds observations.raw_hex (#881).
// Source of truth lives here per #1321 (was previously cmd/ingestor/db.go only):
// the server PRAGMA-detects this column as hasObsRawHex.
func ensureObservationsRawHexColumn(rw *sql.DB, logf Logger) error {
if err := ensureMigrationsTable(rw); err != nil {
return err
}
has, err := TableHasColumn(rw, "observations", "raw_hex")
if err != nil {
return err
}
if !has {
if _, err := rw.Exec(`ALTER TABLE observations ADD COLUMN raw_hex TEXT`); err != nil {
return fmt.Errorf("alter observations add raw_hex: %w", err)
}
logf("[dbschema] added raw_hex column to observations (#881)")
}
if _, err := rw.Exec(`INSERT OR IGNORE INTO _migrations (name) VALUES ('observations_raw_hex_v1')`); err != nil {
return fmt.Errorf("record observations_raw_hex_v1: %w", err)
}
return nil
}
// ensureMigrationsTable is idempotent — the ingestor's applySchema also
// creates this table on legacy paths. Keeping it here lets the gated
// `INSERT OR IGNORE` markers above stay self-sufficient even when Apply
// runs against a brand-new fixture DB (e.g. dbschema_test.go).
func ensureMigrationsTable(rw *sql.DB) error {
_, err := rw.Exec(`CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)`)
return err
}
// SoftDeleteBlacklistedObservers marks the given observer IDs as
// inactive=1 (case-insensitive match). Returns count affected.
// Writer-side helper; ingestor calls it at startup with the operator
// blacklist (read from config).
func SoftDeleteBlacklistedObservers(rw *sql.DB, blacklist []string) (int64, error) {
placeholders := make([]string, 0, len(blacklist))
args := make([]interface{}, 0, len(blacklist))
for _, pk := range blacklist {
t := strings.TrimSpace(pk)
if t == "" {
continue
}
placeholders = append(placeholders, "LOWER(?)")
args = append(args, t)
}
if len(placeholders) == 0 {
return 0, nil
}
q := "UPDATE observers SET inactive = 1 WHERE LOWER(id) IN (" +
strings.Join(placeholders, ",") + ") AND (inactive IS NULL OR inactive = 0)"
res, err := rw.Exec(q, args...)
if err != nil {
return 0, err
}
n, _ := res.RowsAffected()
return n, nil
}
// ensureMultibyteCapColumns adds the multi-byte capability cache columns
// to nodes / inactive_nodes (PR #903, canonical owner per #1324
// follow-up). These columns are populated by the ingestor's
// RunMultibyteCapPersist from snapshot files written by the server's
// analytics cycle; the server is read-only since #1289 and MUST NOT
// write here. The schema itself lives here in dbschema (the writer
// owns migrations, the read-only server merely AssertReady's them).
func ensureMultibyteCapColumns(rw *sql.DB, logf Logger) error {
for _, table := range []string{"nodes", "inactive_nodes"} {
hasSup, err := TableHasColumn(rw, table, "multibyte_sup")
if err != nil {
return fmt.Errorf("inspect %s.multibyte_sup: %w", table, err)
}
if !hasSup {
if _, err := rw.Exec(fmt.Sprintf(
"ALTER TABLE %s ADD COLUMN multibyte_sup INTEGER NOT NULL DEFAULT 0", table)); err != nil {
return fmt.Errorf("add %s.multibyte_sup: %w", table, err)
}
logf("[dbschema] added multibyte_sup column to %s", table)
}
hasEvid, err := TableHasColumn(rw, table, "multibyte_evidence")
if err != nil {
return fmt.Errorf("inspect %s.multibyte_evidence: %w", table, err)
}
if !hasEvid {
if _, err := rw.Exec(fmt.Sprintf(
"ALTER TABLE %s ADD COLUMN multibyte_evidence TEXT", table)); err != nil {
return fmt.Errorf("add %s.multibyte_evidence: %w", table, err)
}
logf("[dbschema] added multibyte_evidence column to %s", table)
}
}
return nil
}