mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-25 00:26:26 +00:00
9383201c07
Red commit:
https://github.com/Kpa-clawbot/CoreScope/commit/eae179b99b5fd34924547632aa8f8025c405aa53
(CI: pending — opens with this PR)
Finishes #1283. RED test `TestServerSourceHasNoCachedRWCalls` goes from
failing (13 writer call-sites) to GREEN (zero). Per #1287 Option 4
(https://github.com/Kpa-clawbot/CoreScope/issues/1287#issuecomment-4485099992):
ingestor owns the neighbor graph build + persist; server reads the
snapshot.
**Category A — Schema migrations** → new `internal/dbschema` package.
`dbschema.Apply(rw, logf)` runs in `cmd/ingestor` startup (in `OpenStore`).
`dbschema.AssertReady(ro)` runs in `cmd/server/main.go`; the server logs a
fatal error and exits if any expected column/index/table is missing — the
operator must restart the ingestor first. Covers indexes,
`neighbor_edges`, `observations.resolved_path`,
`observers.{inactive,last_packet_at,iata}`,
`(inactive_)nodes.foreign_advert`, `transmissions.from_pubkey`.
**Category B — Backfill** → ingestor.
`BackfillFromPubkey` and observer-blacklist soft-delete moved to
`cmd/ingestor/maintenance.go`. Server keeps an inert
`fromPubkeyBackfillSnapshot` stub for `/api/healthz` API compatibility.
**Category C — Neighbor-graph persistence (Option 4)** → ingestor
writes, server reads.
- Ingestor (`cmd/ingestor/neighbor_builder.go`): every 60s scans
`observations + transmissions`, extracts edges (originator↔first-hop for
ADVERTs; observer↔last-hop for all), resolves hop prefixes via a
node-table prefix index, upserts into `neighbor_edges`.
- Server (`cmd/server/neighbor_recomputer.go`): every 60s re-reads
`neighbor_edges` and atomic-swaps the resulting `NeighborGraph` into
`s.graph`. Initial load is synchronous on startup. All server-side
incremental edge writers (the two `asyncPersistResolvedPathsAndEdges`
paths in `cmd/server/store.go`) are gone.
- Neighbor-edge daily prune (`PruneNeighborEdges`) moved to ingestor.
**Why Option 4**: clean read/write separation, no startup CPU spike
(server loads existing snapshot instead of rebuilding from history), no
IPC/delta-protocol churn. Staleness budget ~60s — same model as the
analytics recomputers in #1240 / #1248 / #672 axis 2.
**Recomputer interval default for neighbor graph**: 60s
(`NeighborGraphRecomputerDefaultInterval`,
`NeighborEdgesBuilderInterval`).
**Invariants added**:
- `TestServerSourceHasNoCachedRWCalls` (RED commit eae179b9): grep
enforces zero `cachedRW(`, `mode=rw`, or `sql.Open(_journal_mode=WAL…)`
in non-test `cmd/server/` sources.
- `TestServerStartupRequiresMigratedSchema`: server refuses to start
against an unmigrated DB.
- `TestNeighborGraphRecomputerLoadsSnapshot`: post-write snapshot is
picked up on the next refresh.
- `TestNeighborEdgesBuilderUpsertsFromObservations`: end-to-end pipeline
writes the expected edge.
`grep cachedRW cmd/server/*.go | grep -v _test.go` → 0 matches.
Fixes #1287.
---------
Co-authored-by: MeshCore Bot <bot@meshcore.local>
Co-authored-by: Kpa-clawbot <Kpa-clawbot@users.noreply.github.com>
Co-authored-by: corescope-bot <bot@corescope.local>
327 lines
10 KiB
Go
327 lines
10 KiB
Go
// Package dbschema centralizes schema migrations and read-side schema
|
|
// assertions for the CoreScope SQLite DB. Per issue #1287 the writer
|
|
// (cmd/ingestor) owns ALL CREATE/ALTER/INSERT/UPDATE/DELETE on schema
|
|
// objects; the server (cmd/server) only ASSERTS that the schema is in
|
|
// the expected shape and refuses to start otherwise.
|
|
//
|
|
// Apply(rw, log) runs from the ingestor at startup BEFORE subscribing to
|
|
// MQTT. AssertReady(ro) runs from the server at startup and returns an
|
|
// error listing every missing column/index/table.
|
|
package dbschema
|
|
|
|
import (
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
)
|
|
|
|
// Logger is the printf-style logging callback accepted by Apply and the
// column-adding ensure_* helpers. Callers typically hand in the stdlib
// `log` package's Printf (wrapped in a closure), which keeps this package
// free of a direct logging dependency. Apply accepts a nil Logger and
// substitutes a no-op.
type Logger func(format string, args ...interface{})
|
|
|
|
// Apply runs every server-side ensure_* migration against the given
|
|
// read-write SQLite connection. Each operation is idempotent
|
|
// (IF NOT EXISTS / column-probe-before-ALTER). Safe to call repeatedly.
|
|
//
|
|
// Called by the ingestor at startup. The server MUST NOT call this —
|
|
// it only calls AssertReady.
|
|
func Apply(rw *sql.DB, logf Logger) error {
|
|
if logf == nil {
|
|
logf = func(string, ...interface{}) {}
|
|
}
|
|
if err := ensureServerIndexes(rw); err != nil {
|
|
return fmt.Errorf("ensure server indexes: %w", err)
|
|
}
|
|
if err := ensureNeighborEdgesTable(rw); err != nil {
|
|
return fmt.Errorf("ensure neighbor_edges: %w", err)
|
|
}
|
|
if err := ensureInactiveNodesTable(rw); err != nil {
|
|
return fmt.Errorf("ensure inactive_nodes: %w", err)
|
|
}
|
|
if err := ensureResolvedPathColumn(rw, logf); err != nil {
|
|
return fmt.Errorf("ensure resolved_path: %w", err)
|
|
}
|
|
if err := ensureObserverInactiveColumn(rw, logf); err != nil {
|
|
return fmt.Errorf("ensure observers.inactive: %w", err)
|
|
}
|
|
if err := ensureLastPacketAtColumn(rw, logf); err != nil {
|
|
return fmt.Errorf("ensure observers.last_packet_at: %w", err)
|
|
}
|
|
if err := ensureObserverIATAColumn(rw, logf); err != nil {
|
|
return fmt.Errorf("ensure observers.iata: %w", err)
|
|
}
|
|
if err := ensureForeignAdvertColumn(rw, logf); err != nil {
|
|
return fmt.Errorf("ensure foreign_advert: %w", err)
|
|
}
|
|
if err := ensureFromPubkeyColumn(rw, logf); err != nil {
|
|
return fmt.Errorf("ensure from_pubkey: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AssertReady verifies the schema is in the expected shape. The server
|
|
// calls this at startup against a read-only connection; if it returns
|
|
// non-nil, the server MUST fatal-log and exit so the operator restarts
|
|
// the ingestor (which owns migrations).
|
|
func AssertReady(ro *sql.DB) error {
|
|
var missing []string
|
|
|
|
mustCol := func(table, col string) {
|
|
has, err := TableHasColumn(ro, table, col)
|
|
if err != nil {
|
|
missing = append(missing, fmt.Sprintf("%s.%s (probe error: %v)", table, col, err))
|
|
return
|
|
}
|
|
if !has {
|
|
missing = append(missing, fmt.Sprintf("%s.%s", table, col))
|
|
}
|
|
}
|
|
mustTable := func(name string) {
|
|
var n int
|
|
err := ro.QueryRow(`SELECT 1 FROM sqlite_master WHERE type='table' AND name=?`, name).Scan(&n)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
missing = append(missing, "table:"+name)
|
|
} else if err != nil {
|
|
missing = append(missing, fmt.Sprintf("table:%s (probe error: %v)", name, err))
|
|
}
|
|
}
|
|
|
|
mustTable("neighbor_edges")
|
|
mustCol("observations", "resolved_path")
|
|
mustCol("observers", "inactive")
|
|
mustCol("observers", "last_packet_at")
|
|
mustCol("observers", "iata")
|
|
mustCol("nodes", "foreign_advert")
|
|
mustCol("inactive_nodes", "foreign_advert")
|
|
mustCol("transmissions", "from_pubkey")
|
|
|
|
if len(missing) > 0 {
|
|
return fmt.Errorf("schema not migrated by ingestor; restart ingestor first. missing: %s",
|
|
strings.Join(missing, ", "))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TableHasColumn reports whether the given table has the given column.
// Exported because tests and the read-side need it without re-implementing.
//
// It runs `PRAGMA table_info(<table>)` and compares each returned column
// name to column with an exact (case-sensitive) string match. A table
// that yields no rows reports (false, nil). Query, scan and iteration
// errors are returned as-is with a false result.
//
// The table name is interpolated directly into the PRAGMA text, so
// callers must pass trusted identifiers only.
func TableHasColumn(db *sql.DB, table, column string) (bool, error) {
	rows, err := db.Query(fmt.Sprintf("PRAGMA table_info(%s)", table))
	if err != nil {
		return false, err
	}
	defer rows.Close()

	for rows.Next() {
		// table_info rows are scanned as: cid, name, type, notnull,
		// dflt_value, pk. Only name is inspected; type and dflt_value
		// are scanned through NullString so NULLs do not fail the scan.
		var cid int
		var name string
		var ctype sql.NullString
		var notnull, pk int
		var dflt sql.NullString
		if err := rows.Scan(&cid, &name, &ctype, &notnull, &dflt, &pk); err != nil {
			return false, err
		}
		if name == column {
			return true, nil
		}
	}
	return false, rows.Err()
}
|
|
|
|
// ─── ensure_* helpers (writer side) ────────────────────────────────────────
|
|
|
|
func ensureServerIndexes(rw *sql.DB) error {
|
|
stmts := []string{
|
|
`CREATE INDEX IF NOT EXISTS idx_transmissions_first_seen ON transmissions(first_seen)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_transmissions_hash ON transmissions(hash)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_observations_timestamp ON observations(timestamp)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_observations_transmission_id ON observations(transmission_id)`,
|
|
}
|
|
for _, s := range stmts {
|
|
if _, err := rw.Exec(s); err != nil {
|
|
return fmt.Errorf("ensure index %q: %w", s, err)
|
|
}
|
|
}
|
|
// observer_idx (v3) vs observer_id (v2) — probe + index the matching one.
|
|
hasIdx, err := TableHasColumn(rw, "observations", "observer_idx")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if hasIdx {
|
|
if _, err := rw.Exec(`CREATE INDEX IF NOT EXISTS idx_observations_observer_idx ON observations(observer_idx)`); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
hasID, err := TableHasColumn(rw, "observations", "observer_id")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if hasID {
|
|
if _, err := rw.Exec(`CREATE INDEX IF NOT EXISTS idx_observations_observer_id ON observations(observer_id)`); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ensureNeighborEdgesTable creates the neighbor_edges table, keyed on the
// (node_a, node_b) pair, if it does not already exist. Any Exec error is
// returned unchanged.
func ensureNeighborEdgesTable(rw *sql.DB) error {
	const ddl = `CREATE TABLE IF NOT EXISTS neighbor_edges (
		node_a TEXT NOT NULL,
		node_b TEXT NOT NULL,
		count INTEGER DEFAULT 1,
		last_seen TEXT,
		PRIMARY KEY (node_a, node_b)
	)`
	if _, err := rw.Exec(ddl); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// ensureInactiveNodesTable creates the inactive_nodes table and its
// last_seen index when they are missing.
//
// The ingestor's applySchema also creates this table; repeating it here
// lets dbschema.Apply stand on its own when pointed at a fixture DB that
// pre-dates the soft-delete feature (e.g. CI's
// test-fixtures/e2e-fixture.db, which never had any inactive rows).
// Schema kept in sync with cmd/ingestor/db.go:applySchema.
func ensureInactiveNodesTable(rw *sql.DB) error {
	const tableDDL = `CREATE TABLE IF NOT EXISTS inactive_nodes (
		public_key TEXT PRIMARY KEY,
		name TEXT,
		role TEXT,
		lat REAL,
		lon REAL,
		last_seen TEXT,
		first_seen TEXT,
		advert_count INTEGER DEFAULT 0,
		battery_mv INTEGER,
		temperature_c REAL,
		foreign_advert INTEGER DEFAULT 0
	)`
	const indexDDL = `CREATE INDEX IF NOT EXISTS idx_inactive_nodes_last_seen ON inactive_nodes(last_seen)`

	// Table must exist before its index can be created.
	for _, stmt := range []string{tableDDL, indexDDL} {
		if _, err := rw.Exec(stmt); err != nil {
			return err
		}
	}
	return nil
}
|
|
|
|
func ensureResolvedPathColumn(rw *sql.DB, logf Logger) error {
|
|
has, err := TableHasColumn(rw, "observations", "resolved_path")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if has {
|
|
return nil
|
|
}
|
|
if _, err := rw.Exec("ALTER TABLE observations ADD COLUMN resolved_path TEXT"); err != nil {
|
|
return err
|
|
}
|
|
logf("[dbschema] added resolved_path column to observations")
|
|
return nil
|
|
}
|
|
|
|
func ensureObserverInactiveColumn(rw *sql.DB, logf Logger) error {
|
|
has, err := TableHasColumn(rw, "observers", "inactive")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if has {
|
|
return nil
|
|
}
|
|
if _, err := rw.Exec("ALTER TABLE observers ADD COLUMN inactive INTEGER DEFAULT 0"); err != nil {
|
|
return err
|
|
}
|
|
logf("[dbschema] added inactive column to observers")
|
|
return nil
|
|
}
|
|
|
|
func ensureLastPacketAtColumn(rw *sql.DB, logf Logger) error {
|
|
has, err := TableHasColumn(rw, "observers", "last_packet_at")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if has {
|
|
return nil
|
|
}
|
|
if _, err := rw.Exec("ALTER TABLE observers ADD COLUMN last_packet_at TEXT"); err != nil {
|
|
return err
|
|
}
|
|
logf("[dbschema] added last_packet_at column to observers")
|
|
return nil
|
|
}
|
|
|
|
func ensureObserverIATAColumn(rw *sql.DB, logf Logger) error {
|
|
has, err := TableHasColumn(rw, "observers", "iata")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if has {
|
|
return nil
|
|
}
|
|
if _, err := rw.Exec("ALTER TABLE observers ADD COLUMN iata TEXT"); err != nil {
|
|
return err
|
|
}
|
|
logf("[dbschema] added iata column to observers")
|
|
return nil
|
|
}
|
|
|
|
func ensureForeignAdvertColumn(rw *sql.DB, logf Logger) error {
|
|
for _, table := range []string{"nodes", "inactive_nodes"} {
|
|
has, err := TableHasColumn(rw, table, "foreign_advert")
|
|
if err != nil {
|
|
return fmt.Errorf("inspect %s: %w", table, err)
|
|
}
|
|
if has {
|
|
continue
|
|
}
|
|
if _, err := rw.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN foreign_advert INTEGER DEFAULT 0", table)); err != nil {
|
|
return err
|
|
}
|
|
logf("[dbschema] added foreign_advert column to %s", table)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func ensureFromPubkeyColumn(rw *sql.DB, logf Logger) error {
|
|
has, err := TableHasColumn(rw, "transmissions", "from_pubkey")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !has {
|
|
if _, err := rw.Exec("ALTER TABLE transmissions ADD COLUMN from_pubkey TEXT"); err != nil {
|
|
return err
|
|
}
|
|
logf("[dbschema] added from_pubkey column to transmissions (#1143)")
|
|
}
|
|
if _, err := rw.Exec("CREATE INDEX IF NOT EXISTS idx_transmissions_from_pubkey ON transmissions(from_pubkey)"); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SoftDeleteBlacklistedObservers marks the given observer IDs as
// inactive=1 and returns the number of rows affected. Matching is
// case-insensitive: both the stored id and each supplied value go through
// SQL LOWER(). Rows that are already inactive are left untouched and are
// not counted.
//
// Entries are whitespace-trimmed and blank entries are dropped. If
// nothing remains, the function returns (0, nil) without touching rw.
//
// Errors from the UPDATE and from reading the affected-row count are
// both returned (wrapped); the count is 0 whenever the error is non-nil.
//
// Writer-side helper; ingestor calls it at startup with the operator
// blacklist (read from config).
func SoftDeleteBlacklistedObservers(rw *sql.DB, blacklist []string) (int64, error) {
	placeholders := make([]string, 0, len(blacklist))
	args := make([]interface{}, 0, len(blacklist))
	for _, id := range blacklist {
		id = strings.TrimSpace(id)
		if id == "" {
			continue
		}
		placeholders = append(placeholders, "LOWER(?)")
		args = append(args, id)
	}
	if len(placeholders) == 0 {
		return 0, nil
	}

	// Values are bound as parameters; only the placeholder list is
	// assembled by string concatenation.
	q := "UPDATE observers SET inactive = 1 WHERE LOWER(id) IN (" +
		strings.Join(placeholders, ",") + ") AND (inactive IS NULL OR inactive = 0)"
	res, err := rw.Exec(q, args...)
	if err != nil {
		return 0, fmt.Errorf("soft-delete blacklisted observers: %w", err)
	}
	n, err := res.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("soft-delete blacklisted observers: rows affected: %w", err)
	}
	return n, nil
}
|