Files
meshcore-analyzer/cmd/ingestor/db.go
T
Joel Claw 95d7916530 fix(channels): normalize known channel display names (public → Public) (#777)
Normalizes well-known channel display names (currently only `public` → `Public`) so existing deployments with pre-#761 lowercase config keys show the canonical firmware-default name `Public` in the UI.

Behavior:
- `knownChannelCasing` lookup (`decoder.go`) — single-entry map, easy to extend.
- `normalizeChannelName()` applied at config load (`loadChannelKeys`) AND at decode time (defense in depth).
- One-shot SQLite migration `channel_hash_casing_v1` backfills `channel_hash='public'` → `'Public'` on `payload_type=5` rows so channel-grouping queries don't split across the upgrade boundary.
- Hardcoded list intentionally tiny (1 entry); custom/user channels left untouched.

Safety:
- Channel-hash derivation (`SHA256(channelName)[:16]` for `#`-prefixed `HashChannels`) is unchanged — normalization only renames map keys for explicit `ChannelKeys` entries (which don't feed `deriveHashtagChannelKey`).
- PSK lookup is by hash byte, not by name — mesh interop preserved.
- Migration is gated by `_migrations.name='channel_hash_casing_v1'`, idempotent.

Tests (`cmd/ingestor/normalize_channel_test.go`):
- `TestNormalizeChannelName` covers known + hashtag + custom + empty.
- `TestLoadChannelKeys_NormalizesKnownDisplayNames` — verifies `public` → `Public` at load.
- `TestLoadChannelKeys_LeavesCustomNamesUntouched` — custom names not auto-capitalized.
- `TestLoadChannelKeys_DuplicateCasingLogsWarning` — config containing both casings resolves deterministically (canonical wins).

Mutation test confirmed: reverting load-time normalize → `TestLoadChannelKeys_NormalizesKnownDisplayNames` and `_DuplicateCasingLogsWarning` both fail on assertions.

Related: #761
2026-05-25 23:05:07 -07:00

1456 lines
53 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/meshcore-analyzer/dbschema"
"github.com/meshcore-analyzer/packetpath"
_ "modernc.org/sqlite"
)
// DBStats tracks operational metrics for the ingestor database.
type DBStats struct {
TransmissionsInserted atomic.Int64
ObservationsInserted atomic.Int64
DuplicateTransmissions atomic.Int64
NodeUpserts atomic.Int64
ObserverUpserts atomic.Int64
WriteErrors atomic.Int64
SignatureDrops atomic.Int64
// WALCommits tracks every successful tx.Commit() that may have flushed
// WAL pages.
WALCommits atomic.Int64
// BackfillUpdates tracks per-named-backfill row write counts so an
// infinite-loop backfill (cf #1119) is obvious from the perf page.
BackfillUpdates sync.Map // name (string) -> *atomic.Int64
}
// IncBackfill increments the backfill counter for the given name, allocating
// the counter on first use.
func (s *DBStats) IncBackfill(name string) {
v, ok := s.BackfillUpdates.Load(name)
if !ok {
nc := new(atomic.Int64)
actual, loaded := s.BackfillUpdates.LoadOrStore(name, nc)
if loaded {
v = actual
} else {
v = nc
}
}
v.(*atomic.Int64).Add(1)
}
// SnapshotBackfills returns a name->count copy of all backfill counters.
func (s *DBStats) SnapshotBackfills() map[string]int64 {
out := make(map[string]int64)
s.BackfillUpdates.Range(func(k, v interface{}) bool {
out[k.(string)] = v.(*atomic.Int64).Load()
return true
})
return out
}
// Store wraps the SQLite database for packet ingestion.
type Store struct {
db *sql.DB
path string // filesystem path to the SQLite DB (used to resolve queue dirs)
Stats DBStats
stmtGetTxByHash *sql.Stmt
stmtInsertTransmission *sql.Stmt
stmtUpdateTxFirstSeen *sql.Stmt
stmtInsertObservation *sql.Stmt
stmtUpsertNode *sql.Stmt
stmtIncrementAdvertCount *sql.Stmt
stmtUpsertObserver *sql.Stmt
stmtGetObserverRowid *sql.Stmt
stmtUpdateObserverLastSeen *sql.Stmt
stmtUpdateNodeTelemetry *sql.Stmt
stmtUpsertMetrics *sql.Stmt
sampleIntervalSec int
backfillWg sync.WaitGroup
}
// OpenStore opens or creates a SQLite DB at the given path, applying the
// v3 schema that is compatible with the Node.js server.
func OpenStore(dbPath string) (*Store, error) {
return OpenStoreWithInterval(dbPath, 300)
}
// OpenStoreWithInterval opens or creates a SQLite DB with a configurable sample interval.
func OpenStoreWithInterval(dbPath string, sampleIntervalSec int) (*Store, error) {
dir := filepath.Dir(dbPath)
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, fmt.Errorf("creating data dir: %w", err)
}
db, err := sql.Open("sqlite", dbPath+"?_pragma=auto_vacuum(INCREMENTAL)&_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)&_pragma=busy_timeout(5000)")
if err != nil {
return nil, fmt.Errorf("opening db: %w", err)
}
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("pinging db: %w", err)
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
log.Printf("SQLite config: busy_timeout=5000ms, max_open_conns=1, max_idle_conns=1, journal=WAL")
if err := applySchema(db); err != nil {
return nil, fmt.Errorf("applying schema: %w", err)
}
// Apply the additional server-originated migrations (now owned by
// the ingestor per #1287). Adds the indexes/columns that used to live
// in cmd/server/ensure_*.go: server now ASSERTS these exist.
if err := dbschema.Apply(db, log.Printf); err != nil {
return nil, fmt.Errorf("dbschema.Apply: %w", err)
}
s := &Store{db: db, path: dbPath, sampleIntervalSec: sampleIntervalSec}
if err := s.prepareStatements(); err != nil {
return nil, fmt.Errorf("preparing statements: %w", err)
}
return s, nil
}
func applySchema(db *sql.DB) error {
// auto_vacuum=INCREMENTAL is set via DSN pragma (must be before journal_mode).
// Logging of current mode is handled by CheckAutoVacuum — no duplicate log here.
schema := `
CREATE TABLE IF NOT EXISTS nodes (
public_key TEXT PRIMARY KEY,
name TEXT,
role TEXT,
lat REAL,
lon REAL,
last_seen TEXT,
first_seen TEXT,
advert_count INTEGER DEFAULT 0,
battery_mv INTEGER,
temperature_c REAL,
foreign_advert INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS observers (
id TEXT PRIMARY KEY,
name TEXT,
iata TEXT,
last_seen TEXT,
first_seen TEXT,
packet_count INTEGER DEFAULT 0,
model TEXT,
firmware TEXT,
client_version TEXT,
radio TEXT,
battery_mv INTEGER,
uptime_secs INTEGER,
noise_floor REAL,
inactive INTEGER DEFAULT 0,
last_packet_at TEXT DEFAULT NULL
);
CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen);
CREATE INDEX IF NOT EXISTS idx_observers_last_seen ON observers(last_seen);
CREATE TABLE IF NOT EXISTS inactive_nodes (
public_key TEXT PRIMARY KEY,
name TEXT,
role TEXT,
lat REAL,
lon REAL,
last_seen TEXT,
first_seen TEXT,
advert_count INTEGER DEFAULT 0,
battery_mv INTEGER,
temperature_c REAL,
foreign_advert INTEGER DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_inactive_nodes_last_seen ON inactive_nodes(last_seen);
CREATE TABLE IF NOT EXISTS transmissions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
raw_hex TEXT NOT NULL,
hash TEXT NOT NULL UNIQUE,
first_seen TEXT NOT NULL,
route_type INTEGER,
payload_type INTEGER,
payload_version INTEGER,
decoded_json TEXT,
from_pubkey TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_transmissions_hash ON transmissions(hash);
CREATE INDEX IF NOT EXISTS idx_transmissions_first_seen ON transmissions(first_seen);
CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type);
-- idx_transmissions_from_pubkey is created by the from_pubkey_v1
-- migration after the column is added on legacy DBs (#1143).
`
if _, err := db.Exec(schema); err != nil {
return fmt.Errorf("base schema: %w", err)
}
// Create observations table (v3 schema)
obsExists := false
row := db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='observations'")
var dummy string
if row.Scan(&dummy) == nil {
obsExists = true
}
if !obsExists {
obs := `
CREATE TABLE observations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transmission_id INTEGER NOT NULL REFERENCES transmissions(id),
observer_idx INTEGER,
direction TEXT,
snr REAL,
rssi REAL,
score INTEGER,
path_json TEXT,
timestamp INTEGER NOT NULL
);
CREATE INDEX idx_observations_transmission_id ON observations(transmission_id);
CREATE INDEX idx_observations_observer_idx ON observations(observer_idx);
CREATE INDEX idx_observations_timestamp ON observations(timestamp);
CREATE UNIQUE INDEX IF NOT EXISTS idx_observations_dedup ON observations(transmission_id, observer_idx, COALESCE(path_json, ''));
`
if _, err := db.Exec(obs); err != nil {
return fmt.Errorf("observations schema: %w", err)
}
}
// Create/rebuild packets_v view (v3 schema: observer_idx → observers.rowid)
// The Go server reads this view; without it fresh installs get "no such table: packets_v".
db.Exec(`DROP VIEW IF EXISTS packets_v`)
_, vErr := db.Exec(`
CREATE VIEW packets_v AS
SELECT o.id, COALESCE(o.raw_hex, t.raw_hex) AS raw_hex,
datetime(o.timestamp, 'unixepoch') AS timestamp,
obs.id AS observer_id, obs.name AS observer_name,
o.direction, o.snr, o.rssi, o.score, t.hash, t.route_type,
t.payload_type, t.payload_version, o.path_json, t.decoded_json,
t.created_at
FROM observations o
JOIN transmissions t ON t.id = o.transmission_id
LEFT JOIN observers obs ON obs.rowid = o.observer_idx AND (obs.inactive IS NULL OR obs.inactive = 0)
`)
if vErr != nil {
return fmt.Errorf("packets_v view: %w", vErr)
}
// One-time migration: recalculate advert_count to count unique transmissions only
db.Exec(`CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)`)
var migDone int
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'advert_count_unique_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Recalculating advert_count (unique transmissions only)...")
// Note: this migration is gated on a one-shot _migrations row, so it
// runs at most once per DB. The historical version used a LIKE-on-JSON
// substring match (#1143). Switching to from_pubkey here is safe even
// though the column may not yet be backfilled on legacy DBs: the
// migration is already marked done on those DBs and won't re-run.
db.Exec(`
UPDATE nodes SET advert_count = (
SELECT COUNT(*) FROM transmissions t
WHERE t.payload_type = 4
AND t.from_pubkey = nodes.public_key
)
`)
db.Exec(`INSERT INTO _migrations (name) VALUES ('advert_count_unique_v1')`)
log.Println("[migration] advert_count recalculated")
}
// One-time migration: change noise_floor from INTEGER to REAL affinity.
// SQLite doesn't support ALTER COLUMN, but existing float values are stored
// as REAL regardless of column affinity. New table definition already uses REAL.
// This migration casts any integer-stored noise_floor values to real.
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'noise_floor_real_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Ensuring noise_floor values are stored as REAL...")
db.Exec(`UPDATE observers SET noise_floor = CAST(noise_floor AS REAL) WHERE noise_floor IS NOT NULL AND typeof(noise_floor) = 'integer'`)
db.Exec(`INSERT INTO _migrations (name) VALUES ('noise_floor_real_v1')`)
log.Println("[migration] noise_floor migration complete")
}
// One-time migration: add telemetry columns to nodes and inactive_nodes tables.
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'node_telemetry_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding telemetry columns to nodes/inactive_nodes...")
// checkAndAddColumn checks whether `column` already exists in `table`
// using PRAGMA table_info, and adds it if missing. All call sites pass
// hardcoded table/column/type literals so there is no SQL injection risk.
checkAndAddColumn := func(table, column, colType string) error {
rows, err := db.Query(fmt.Sprintf("PRAGMA table_info(%s)", table))
if err != nil {
return fmt.Errorf("querying table info for %s: %w", table, err)
}
defer rows.Close()
exists := false
for rows.Next() {
var cid int
var name, ctype string
var notnull, pk int
var dfltValue sql.NullString
if err := rows.Scan(&cid, &name, &ctype, &notnull, &dfltValue, &pk); err != nil {
return fmt.Errorf("scanning table info for %s: %w", table, err)
}
if name == column {
exists = true
break
}
}
if err := rows.Err(); err != nil {
return fmt.Errorf("iterating table info for %s: %w", table, err)
}
if exists {
return nil
}
if _, err := db.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", table, column, colType)); err != nil {
return fmt.Errorf("adding column %s to %s: %w", column, table, err)
}
return nil
}
if err := checkAndAddColumn("nodes", "battery_mv", "INTEGER"); err != nil {
return err
}
if err := checkAndAddColumn("nodes", "temperature_c", "REAL"); err != nil {
return err
}
if err := checkAndAddColumn("inactive_nodes", "battery_mv", "INTEGER"); err != nil {
return err
}
if err := checkAndAddColumn("inactive_nodes", "temperature_c", "REAL"); err != nil {
return err
}
if _, err := db.Exec(`INSERT INTO _migrations (name) VALUES ('node_telemetry_v1')`); err != nil {
return fmt.Errorf("recording node_telemetry_v1 migration: %w", err)
}
log.Println("[migration] node telemetry columns added")
}
// One-time migration: add timestamp index on observations for fast stats queries.
// Older databases created before this index was added suffer from full table scans
// on COUNT(*) WHERE timestamp > ?, causing /api/stats to take 30s+.
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'obs_timestamp_index_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding timestamp index on observations...")
db.Exec(`CREATE INDEX IF NOT EXISTS idx_observations_timestamp ON observations(timestamp)`)
db.Exec(`INSERT INTO _migrations (name) VALUES ('obs_timestamp_index_v1')`)
log.Println("[migration] observations timestamp index created")
}
// observer_metrics table for RF health dashboard
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observer_metrics_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Creating observer_metrics table...")
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS observer_metrics (
observer_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
noise_floor REAL,
tx_air_secs INTEGER,
rx_air_secs INTEGER,
recv_errors INTEGER,
battery_mv INTEGER,
PRIMARY KEY (observer_id, timestamp)
)
`)
if err != nil {
return fmt.Errorf("observer_metrics schema: %w", err)
}
db.Exec(`INSERT INTO _migrations (name) VALUES ('observer_metrics_v1')`)
log.Println("[migration] observer_metrics table created")
}
// Migration: add timestamp index for cross-observer time-range queries
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observer_metrics_ts_idx'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Creating observer_metrics timestamp index...")
_, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_observer_metrics_timestamp ON observer_metrics(timestamp)`)
if err != nil {
return fmt.Errorf("observer_metrics timestamp index: %w", err)
}
db.Exec(`INSERT INTO _migrations (name) VALUES ('observer_metrics_ts_idx')`)
log.Println("[migration] observer_metrics timestamp index created")
}
// Migration: add inactive column to observers for soft-delete retention
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observers_inactive_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding inactive column to observers...")
_, err := db.Exec(`ALTER TABLE observers ADD COLUMN inactive INTEGER DEFAULT 0`)
if err != nil {
// Column may already exist (e.g. fresh install with schema above)
log.Printf("[migration] observers.inactive: %v (may already exist)", err)
}
db.Exec(`INSERT INTO _migrations (name) VALUES ('observers_inactive_v1')`)
log.Println("[migration] observers.inactive column added")
}
// Migration: add packets_sent and packets_recv columns to observer_metrics
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observer_metrics_packets_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding packets_sent/packets_recv columns to observer_metrics...")
db.Exec(`ALTER TABLE observer_metrics ADD COLUMN packets_sent INTEGER`)
db.Exec(`ALTER TABLE observer_metrics ADD COLUMN packets_recv INTEGER`)
db.Exec(`INSERT INTO _migrations (name) VALUES ('observer_metrics_packets_v1')`)
log.Println("[migration] packets_sent/packets_recv columns added")
}
// Migration: add channel_hash column for fast channel queries (#762)
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'channel_hash_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding channel_hash column to transmissions...")
db.Exec(`ALTER TABLE transmissions ADD COLUMN channel_hash TEXT DEFAULT NULL`)
db.Exec(`CREATE INDEX IF NOT EXISTS idx_tx_channel_hash ON transmissions(channel_hash) WHERE payload_type = 5`)
// Backfill: extract channel name for decrypted (CHAN) packets
res, err := db.Exec(`UPDATE transmissions SET channel_hash = json_extract(decoded_json, '$.channel') WHERE payload_type = 5 AND channel_hash IS NULL AND json_extract(decoded_json, '$.type') = 'CHAN'`)
if err == nil {
n, _ := res.RowsAffected()
log.Printf("[migration] Backfilled channel_hash for %d CHAN packets", n)
}
// Backfill: extract channelHashHex for encrypted (GRP_TXT) packets, prefixed with 'enc_'
res, err = db.Exec(`UPDATE transmissions SET channel_hash = 'enc_' || json_extract(decoded_json, '$.channelHashHex') WHERE payload_type = 5 AND channel_hash IS NULL AND json_extract(decoded_json, '$.type') = 'GRP_TXT'`)
if err == nil {
n, _ := res.RowsAffected()
log.Printf("[migration] Backfilled channel_hash for %d GRP_TXT packets", n)
}
db.Exec(`INSERT INTO _migrations (name) VALUES ('channel_hash_v1')`)
log.Println("[migration] channel_hash column added and backfilled")
}
// Migration: dropped_packets table for signature validation failures (#793)
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'dropped_packets_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Creating dropped_packets table...")
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS dropped_packets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hash TEXT,
raw_hex TEXT,
reason TEXT NOT NULL,
observer_id TEXT,
observer_name TEXT,
node_pubkey TEXT,
node_name TEXT,
dropped_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_dropped_observer ON dropped_packets(observer_id);
CREATE INDEX IF NOT EXISTS idx_dropped_node ON dropped_packets(node_pubkey);
`)
if err != nil {
return fmt.Errorf("dropped_packets schema: %w", err)
}
db.Exec(`INSERT INTO _migrations (name) VALUES ('dropped_packets_v1')`)
log.Println("[migration] dropped_packets table created")
}
// Migration: observations.raw_hex (#881) is now owned by
// internal/dbschema/dbschema.go (#1321). The server PRAGMA-detects
// this column as hasObsRawHex; keeping a single canonical Apply
// path closes the startup race where the server's detector ran
// before this ALTER finished.
// Migration: transmissions.scope_name (#899) is now owned by
// internal/dbschema/dbschema.go (#1321). See above.
// Migration: add last_packet_at column to observers (#last-packet-at)
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observers_last_packet_at_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding last_packet_at column to observers...")
_, alterErr := db.Exec(`ALTER TABLE observers ADD COLUMN last_packet_at TEXT DEFAULT NULL`)
if alterErr != nil && !strings.Contains(alterErr.Error(), "duplicate column") {
return fmt.Errorf("observers last_packet_at ALTER: %w", alterErr)
}
// Backfill: set last_packet_at = last_seen only for observers that actually have
// observation rows (packet_count alone is unreliable — UpsertObserver sets it to 1
// on INSERT even for status-only observers).
res, err := db.Exec(`UPDATE observers SET last_packet_at = last_seen
WHERE last_packet_at IS NULL
AND rowid IN (SELECT DISTINCT observer_idx FROM observations WHERE observer_idx IS NOT NULL)`)
if err == nil {
n, _ := res.RowsAffected()
log.Printf("[migration] Backfilled last_packet_at for %d observers with packets", n)
}
db.Exec(`INSERT INTO _migrations (name) VALUES ('observers_last_packet_at_v1')`)
log.Println("[migration] observers.last_packet_at column added")
}
// Migration: backfill observations.path_json from raw_hex (#888)
// NOTE: This runs ASYNC via BackfillPathJSONAsync() to avoid blocking MQTT startup.
// See staging outage where ~502K rows blocked ingest for 15+ hours.
// One-time cleanup: delete legacy packets with empty hash or empty first_seen (#994)
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'cleanup_legacy_null_hash_ts'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Cleaning up legacy packets with empty hash/timestamp...")
db.Exec(`DELETE FROM observations WHERE transmission_id IN (SELECT id FROM transmissions WHERE hash = '' OR first_seen = '')`)
res, err := db.Exec(`DELETE FROM transmissions WHERE hash = '' OR first_seen = ''`)
if err == nil {
deleted, _ := res.RowsAffected()
log.Printf("[migration] deleted %d legacy packets with empty hash/timestamp", deleted)
}
db.Exec(`INSERT INTO _migrations (name) VALUES ('cleanup_legacy_null_hash_ts')`)
}
// Migration: foreign_advert column on nodes/inactive_nodes (#730)
// Marks nodes whose ADVERT GPS lies outside the configured geofilter polygon.
// Default 0; set to 1 by the ingestor when GeoFilter is configured and
// PassesFilter() returns false. Allows operators to surface bridged/leaked
// adverts without silently dropping them.
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'foreign_advert_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding foreign_advert column to nodes/inactive_nodes...")
if _, err := db.Exec(`ALTER TABLE nodes ADD COLUMN foreign_advert INTEGER DEFAULT 0`); err != nil {
log.Printf("[migration] nodes.foreign_advert: %v (may already exist)", err)
}
if _, err := db.Exec(`ALTER TABLE inactive_nodes ADD COLUMN foreign_advert INTEGER DEFAULT 0`); err != nil {
log.Printf("[migration] inactive_nodes.foreign_advert: %v (may already exist)", err)
}
db.Exec(`CREATE INDEX IF NOT EXISTS idx_nodes_foreign_advert ON nodes(foreign_advert) WHERE foreign_advert = 1`)
db.Exec(`INSERT INTO _migrations (name) VALUES ('foreign_advert_v1')`)
log.Println("[migration] foreign_advert column added")
}
// Migration: from_pubkey column on transmissions (#1143).
// Replaces the unsound `decoded_json LIKE '%pubkey%'` attribution path with
// an exact-match indexed column. Synchronously adds the column + index;
// row-level backfill is run by the SERVER asynchronously
// (cmd/server/from_pubkey_migration.go) so we don't block ingestor boot.
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'from_pubkey_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding from_pubkey column + index to transmissions (#1143)...")
if _, err := db.Exec(`ALTER TABLE transmissions ADD COLUMN from_pubkey TEXT`); err != nil {
log.Printf("[migration] transmissions.from_pubkey: %v (may already exist)", err)
}
if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_transmissions_from_pubkey ON transmissions(from_pubkey)`); err != nil {
log.Printf("[migration] idx_transmissions_from_pubkey: %v", err)
}
db.Exec(`INSERT INTO _migrations (name) VALUES ('from_pubkey_v1')`)
log.Println("[migration] from_pubkey column + index added")
}
// Migration: nodes.default_scope (#899 Feature 3) is now owned by
// internal/dbschema/dbschema.go (#1321). The server PRAGMA-detects
// this column as hasDefaultScope; keeping a single canonical Apply
// path closes the startup race that #1321 documented.
// Migration: normalize known channel_hash values for existing rows.
// Before this PR, config key "public" was stored as channel_hash="public".
// After this PR, new rows use channel_hash="Public". Without backfill,
// channel grouping queries split into two buckets across the upgrade boundary.
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'channel_hash_casing_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Normalizing known channel_hash values...")
res, err := db.Exec(`UPDATE transmissions SET channel_hash = 'Public' WHERE channel_hash = 'public' AND payload_type = 5`)
if err != nil {
log.Printf("[migration] ERROR: failed to normalize channel_hash: %v", err)
return fmt.Errorf("migration channel_hash_casing_v1 UPDATE failed: %w", err)
}
n, _ := res.RowsAffected()
log.Printf("[migration] Normalized %d channel_hash rows from 'public' to 'Public'", n)
if _, err := db.Exec(`INSERT OR IGNORE INTO _migrations (name) VALUES ('channel_hash_casing_v1')`); err != nil {
log.Printf("[migration] WARNING: failed to record migration: %v", err)
}
log.Println("[migration] channel_hash casing normalization complete")
}
return nil
}
func (s *Store) prepareStatements() error {
var err error
s.stmtGetTxByHash, err = s.db.Prepare("SELECT id, first_seen FROM transmissions WHERE hash = ?")
if err != nil {
return err
}
s.stmtInsertTransmission, err = s.db.Prepare(`
INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, channel_hash, scope_name, from_pubkey)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return err
}
s.stmtUpdateTxFirstSeen, err = s.db.Prepare("UPDATE transmissions SET first_seen = ? WHERE id = ?")
if err != nil {
return err
}
s.stmtInsertObservation, err = s.db.Prepare(`
INSERT INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp, raw_hex)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(transmission_id, observer_idx, COALESCE(path_json, '')) DO UPDATE SET
snr = COALESCE(excluded.snr, snr),
rssi = COALESCE(excluded.rssi, rssi),
score = COALESCE(excluded.score, score),
raw_hex = COALESCE(excluded.raw_hex, raw_hex)
`)
if err != nil {
return err
}
s.stmtUpsertNode, err = s.db.Prepare(`
INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(public_key) DO UPDATE SET
name = COALESCE(?, name),
role = COALESCE(?, role),
lat = COALESCE(?, lat),
lon = COALESCE(?, lon),
last_seen = MAX(MIN(COALESCE(last_seen, ''), ?), ?)
`)
if err != nil {
return err
}
s.stmtIncrementAdvertCount, err = s.db.Prepare(`
UPDATE nodes SET advert_count = advert_count + 1 WHERE public_key = ?
`)
if err != nil {
return err
}
s.stmtUpsertObserver, err = s.db.Prepare(`
INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor)
VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
name = COALESCE(?, name),
iata = COALESCE(?, iata),
last_seen = MAX(MIN(COALESCE(last_seen, ''), ?), ?),
packet_count = packet_count + 1,
model = COALESCE(?, model),
firmware = COALESCE(?, firmware),
client_version = COALESCE(?, client_version),
radio = COALESCE(?, radio),
battery_mv = COALESCE(?, battery_mv),
uptime_secs = COALESCE(?, uptime_secs),
noise_floor = COALESCE(?, noise_floor)
`)
if err != nil {
return err
}
s.stmtGetObserverRowid, err = s.db.Prepare("SELECT rowid FROM observers WHERE id = ?")
if err != nil {
return err
}
// Args: ingestNow, rxTime, ingestNow, rxTime, rowid
// MIN(existing, ingestNow) clamps any future value already in the DB before
// taking MAX with rxTime, so the guard never locks in a past bug's stale future.
s.stmtUpdateObserverLastSeen, err = s.db.Prepare(`
UPDATE observers SET
last_seen = MAX(MIN(COALESCE(last_seen, ''), ?), ?),
last_packet_at = MAX(MIN(COALESCE(last_packet_at, ''), ?), ?)
WHERE rowid = ?`)
if err != nil {
return err
}
s.stmtUpdateNodeTelemetry, err = s.db.Prepare(`
UPDATE nodes SET
battery_mv = COALESCE(?, battery_mv),
temperature_c = COALESCE(?, temperature_c)
WHERE public_key = ?
`)
if err != nil {
return err
}
s.stmtUpsertMetrics, err = s.db.Prepare(`
INSERT OR REPLACE INTO observer_metrics (observer_id, timestamp, noise_floor, tx_air_secs, rx_air_secs, recv_errors, battery_mv, packets_sent, packets_recv)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return err
}
return nil
}
// InsertTransmission inserts a decoded packet into transmissions + observations.
// Returns true if a new transmission was created (not a duplicate hash).
func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
hash := data.Hash
if hash == "" {
return false, nil
}
rxTime := data.Timestamp
ingestNow := time.Now().UTC().Format(time.RFC3339)
if rxTime == "" {
rxTime = ingestNow
}
var txID int64
isNew := false
// Check for existing transmission
var existingID int64
var existingFirstSeen string
err := s.stmtGetTxByHash.QueryRow(hash).Scan(&existingID, &existingFirstSeen)
if err == nil {
// Existing transmission
txID = existingID
if rxTime < existingFirstSeen {
_, _ = s.stmtUpdateTxFirstSeen.Exec(rxTime, txID)
}
} else {
// New transmission
isNew = true
result, err := s.stmtInsertTransmission.Exec(
data.RawHex, hash, rxTime,
data.RouteType, data.PayloadType, data.PayloadVersion,
data.DecodedJSON, nilIfEmpty(data.ChannelHash),
scopeNameForDB(data),
nilIfEmpty(data.FromPubkey),
)
if err != nil {
s.Stats.WriteErrors.Add(1)
return false, fmt.Errorf("insert transmission: %w", err)
}
txID, _ = result.LastInsertId()
s.Stats.TransmissionsInserted.Add(1)
}
if !isNew {
s.Stats.DuplicateTransmissions.Add(1)
}
// Resolve observer_idx and update last_seen
var observerIdx *int64
if data.ObserverID != "" {
var rowid int64
err := s.stmtGetObserverRowid.QueryRow(data.ObserverID).Scan(&rowid)
if err == nil {
observerIdx = &rowid
// Update observer last_seen and last_packet_at on every packet to prevent
// low-traffic observers from appearing offline (#463)
_, _ = s.stmtUpdateObserverLastSeen.Exec(ingestNow, rxTime, ingestNow, rxTime, rowid)
}
}
// Insert observation
epochTs := time.Now().Unix()
if t, err := time.Parse(time.RFC3339, rxTime); err == nil {
epochTs = t.Unix()
}
_, err = s.stmtInsertObservation.Exec(
txID, observerIdx, data.Direction,
data.SNR, data.RSSI, data.Score,
data.PathJSON, epochTs, nilIfEmpty(data.RawHex),
)
if err != nil {
s.Stats.WriteErrors.Add(1)
log.Printf("[db] observation insert (non-fatal): %v", err)
} else {
s.Stats.ObservationsInserted.Add(1)
}
// Each prepared-stmt Exec auto-commits. Count one WAL commit per
// successful InsertTransmission so the perf page sees commit pressure.
s.Stats.WALCommits.Add(1)
return isNew, nil
}
// UpsertNode inserts or updates a node.
func (s *Store) UpsertNode(pubKey, name, role string, lat, lon *float64, lastSeen string) error {
ingestNow := time.Now().UTC().Format(time.RFC3339)
now := lastSeen
if now == "" {
now = ingestNow
}
_, err := s.stmtUpsertNode.Exec(
pubKey, name, role, lat, lon, now, now,
name, role, lat, lon, ingestNow, now,
)
if err != nil {
s.Stats.WriteErrors.Add(1)
} else {
s.Stats.NodeUpserts.Add(1)
}
return err
}
// IncrementAdvertCount increments advert_count for a node by public key.
func (s *Store) IncrementAdvertCount(pubKey string) error {
_, err := s.stmtIncrementAdvertCount.Exec(pubKey)
return err
}
// MarkNodeForeign sets foreign_advert=1 on the node row identified by pubKey.
// Used when an ADVERT arrives whose GPS lies outside the configured geofilter
// polygon (#730). Idempotent — safe to call repeatedly. No-op if pubKey is
// empty.
func (s *Store) MarkNodeForeign(pubKey string) error {
if pubKey == "" {
return nil
}
_, err := s.db.Exec(`UPDATE nodes SET foreign_advert = 1 WHERE public_key = ?`, pubKey)
if err != nil {
s.Stats.WriteErrors.Add(1)
}
return err
}
// UpdateNodeTelemetry updates battery and temperature for a node.
func (s *Store) UpdateNodeTelemetry(pubKey string, batteryMv *int, temperatureC *float64) error {
var bv, tc interface{}
if batteryMv != nil {
bv = *batteryMv
}
if temperatureC != nil {
tc = *temperatureC
}
_, err := s.stmtUpdateNodeTelemetry.Exec(bv, tc, pubKey)
if err != nil {
s.Stats.WriteErrors.Add(1)
}
return err
}
// ObserverMeta holds optional observer hardware metadata.
type ObserverMeta struct {
Model *string // e.g., L1
Firmware *string // firmware version string
ClientVersion *string // client app version string
Radio *string // radio chipset/platform string
BatteryMv *int // millivolts, always integer
UptimeSecs *int64 // seconds, always integer
NoiseFloor *float64 // dBm, may have decimals
TxAirSecs *int // cumulative TX seconds since boot
RxAirSecs *int // cumulative RX seconds since boot
RecvErrors *int // cumulative CRC/decode failures since boot
PacketsSent *int // cumulative packets sent since boot
PacketsRecv *int // cumulative packets received since boot
}
// UpsertObserver inserts or updates an observer using the current wall-clock
// time as last_seen. Use UpsertObserverAt when the message envelope provides
// an observer receive-time (e.g. MQTT status and data packet handlers).
func (s *Store) UpsertObserver(id, name, iata string, meta *ObserverMeta) error {
return s.UpsertObserverAt(id, name, iata, meta, time.Now().UTC().Format(time.RFC3339))
}
// UpsertObserverAt inserts or updates an observer with an explicit lastSeen
// timestamp (typically the observer receive-time from the MQTT envelope). The
// SQL uses MAX so last_seen never moves backwards — a retained or replayed
// message whose rxTime pre-dates the existing last_seen is a no-op for that
// field, preventing offline observers from flashing as Online on reconnect.
func (s *Store) UpsertObserverAt(id, name, iata string, meta *ObserverMeta, lastSeen string) error {
ingestNow := time.Now().UTC().Format(time.RFC3339)
if lastSeen == "" {
lastSeen = ingestNow
}
normalizedIATA := strings.TrimSpace(strings.ToUpper(iata))
var model, firmware, clientVersion, radio interface{}
var batteryMv, uptimeSecs, noiseFloor interface{}
if meta != nil {
if meta.Model != nil {
model = *meta.Model
}
if meta.Firmware != nil {
firmware = *meta.Firmware
}
if meta.ClientVersion != nil {
clientVersion = *meta.ClientVersion
}
if meta.Radio != nil {
radio = *meta.Radio
}
if meta.BatteryMv != nil {
batteryMv = *meta.BatteryMv
}
if meta.UptimeSecs != nil {
uptimeSecs = *meta.UptimeSecs
}
if meta.NoiseFloor != nil {
noiseFloor = *meta.NoiseFloor
}
}
_, err := s.stmtUpsertObserver.Exec(
id, name, normalizedIATA, lastSeen, lastSeen, model, firmware, clientVersion, radio, batteryMv, uptimeSecs, noiseFloor,
name, normalizedIATA, ingestNow, lastSeen, model, firmware, clientVersion, radio, batteryMv, uptimeSecs, noiseFloor,
)
if err != nil {
s.Stats.WriteErrors.Add(1)
return err
}
s.Stats.ObserverUpserts.Add(1)
// Reactivate if this observer was previously marked inactive
s.db.Exec(`UPDATE observers SET inactive = 0 WHERE id = ? AND inactive = 1`, id)
return nil
}
// Close checkpoints the WAL and closes the database.
func (s *Store) Close() error {
s.backfillWg.Wait()
s.Checkpoint()
return s.db.Close()
}
// RoundToInterval rounds a time to the nearest sample interval boundary.
func RoundToInterval(t time.Time, intervalSec int) time.Time {
if intervalSec <= 0 {
intervalSec = 300
}
epoch := t.Unix()
half := int64(intervalSec) / 2
rounded := ((epoch + half) / int64(intervalSec)) * int64(intervalSec)
return time.Unix(rounded, 0).UTC()
}
// MetricsData holds the fields to insert into observer_metrics.
type MetricsData struct {
ObserverID string
NoiseFloor *float64
TxAirSecs *int
RxAirSecs *int
RecvErrors *int
BatteryMv *int
PacketsSent *int
PacketsRecv *int
}
// InsertMetrics inserts a metrics sample for an observer using ingestor wall clock.
func (s *Store) InsertMetrics(data *MetricsData) error {
ts := RoundToInterval(time.Now().UTC(), s.sampleIntervalSec)
tsStr := ts.Format(time.RFC3339)
var nf, txAir, rxAir, recvErr, batt, pktSent, pktRecv interface{}
if data.NoiseFloor != nil {
nf = *data.NoiseFloor
}
if data.TxAirSecs != nil {
txAir = *data.TxAirSecs
}
if data.RxAirSecs != nil {
rxAir = *data.RxAirSecs
}
if data.RecvErrors != nil {
recvErr = *data.RecvErrors
}
if data.BatteryMv != nil {
batt = *data.BatteryMv
}
if data.PacketsSent != nil {
pktSent = *data.PacketsSent
}
if data.PacketsRecv != nil {
pktRecv = *data.PacketsRecv
}
_, err := s.stmtUpsertMetrics.Exec(data.ObserverID, tsStr, nf, txAir, rxAir, recvErr, batt, pktSent, pktRecv)
if err != nil {
s.Stats.WriteErrors.Add(1)
return fmt.Errorf("insert metrics: %w", err)
}
return nil
}
// PruneOldMetrics deletes observer_metrics rows older than retentionDays.
func (s *Store) PruneOldMetrics(retentionDays int) (int64, error) {
cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays).Format(time.RFC3339)
result, err := s.db.Exec(`DELETE FROM observer_metrics WHERE timestamp < ?`, cutoff)
if err != nil {
return 0, fmt.Errorf("prune metrics: %w", err)
}
n, _ := result.RowsAffected()
if n > 0 {
log.Printf("[metrics] Pruned %d rows older than %d days", n, retentionDays)
}
return n, nil
}
// CheckAutoVacuum inspects the current auto_vacuum mode and logs a warning
// if not INCREMENTAL. Performs opt-in full VACUUM if db.vacuumOnStartup is set (#919).
func (s *Store) CheckAutoVacuum(cfg *Config) {
var autoVacuum int
if err := s.db.QueryRow("PRAGMA auto_vacuum").Scan(&autoVacuum); err != nil {
log.Printf("[db] warning: could not read auto_vacuum: %v", err)
return
}
if autoVacuum == 2 {
log.Printf("[db] auto_vacuum=INCREMENTAL")
return
}
modes := map[int]string{0: "NONE", 1: "FULL", 2: "INCREMENTAL"}
mode := modes[autoVacuum]
if mode == "" {
mode = fmt.Sprintf("UNKNOWN(%d)", autoVacuum)
}
log.Printf("[db] auto_vacuum=%s — DB needs one-time VACUUM to enable incremental auto-vacuum. "+
"Set db.vacuumOnStartup: true in config to migrate (will block startup for several minutes on large DBs). "+
"See https://github.com/Kpa-clawbot/CoreScope/issues/919", mode)
if cfg.DB != nil && cfg.DB.VacuumOnStartup {
// WARNING: Full VACUUM creates a temporary copy of the entire DB file.
// Requires ~2× the DB file size in free disk space or it will fail.
log.Printf("[db] vacuumOnStartup=true — starting one-time full VACUUM (ensure 2x DB size free disk space)...")
start := time.Now()
if _, err := s.db.Exec("PRAGMA auto_vacuum = INCREMENTAL"); err != nil {
log.Printf("[db] VACUUM failed: could not set auto_vacuum: %v", err)
return
}
if _, err := s.db.Exec("VACUUM"); err != nil {
log.Printf("[db] VACUUM failed: %v", err)
return
}
elapsed := time.Since(start)
log.Printf("[db] VACUUM complete in %v — auto_vacuum is now INCREMENTAL", elapsed.Round(time.Millisecond))
}
}
// RunIncrementalVacuum returns free pages to the OS (#919).
// Safe to call on auto_vacuum=NONE databases (noop).
func (s *Store) RunIncrementalVacuum(pages int) {
if _, err := s.db.Exec(fmt.Sprintf("PRAGMA incremental_vacuum(%d)", pages)); err != nil {
log.Printf("[vacuum] incremental_vacuum error: %v", err)
}
}
// Checkpoint forces a WAL checkpoint to release the WAL lock file,
// preventing lock contention with a new process starting up.
func (s *Store) Checkpoint() {
if _, err := s.db.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
log.Printf("[db] WAL checkpoint error: %v", err)
} else {
log.Println("[db] WAL checkpoint complete")
}
}
// BackfillPathJSONAsync launches the path_json backfill in a background goroutine.
// It processes observations with NULL/empty path_json that have raw_hex available,
// decoding hop paths and updating the column. Safe to run concurrently with ingest
// because new observations get path_json at write time; this only touches NULL rows.
// Idempotent: skips if migration already recorded.
func (s *Store) BackfillPathJSONAsync() {
s.backfillWg.Add(1)
go func() {
defer s.backfillWg.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("[backfill] path_json async panic recovered: %v", r)
}
}()
var migDone int
row := s.db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'backfill_path_json_from_raw_hex_v1'")
if row.Scan(&migDone) == nil {
return // already done
}
log.Println("[backfill] Starting async path_json backfill from raw_hex...")
updated := 0
errored := false
const batchSize = 1000
batchNum := 0
for {
rows, err := s.db.Query(`
SELECT o.id, o.raw_hex
FROM observations o
JOIN transmissions t ON o.transmission_id = t.id
WHERE o.raw_hex IS NOT NULL AND o.raw_hex != ''
-- NB: '[]' is the "already attempted, no hops" sentinel; excluded
-- to prevent the infinite re-UPDATE loop fixed in #1119.
AND (o.path_json IS NULL OR o.path_json = '')
AND t.payload_type != 9
LIMIT ?`, batchSize)
if err != nil {
log.Printf("[backfill] path_json query error: %v", err)
errored = true
break
}
type pendingRow struct {
id int64
rawHex string
}
var batch []pendingRow
for rows.Next() {
var r pendingRow
if err := rows.Scan(&r.id, &r.rawHex); err == nil {
batch = append(batch, r)
}
}
rows.Close()
if len(batch) == 0 {
break
}
for _, r := range batch {
hops, err := packetpath.DecodePathFromRawHex(r.rawHex)
if err != nil || len(hops) == 0 {
if _, execErr := s.db.Exec(`UPDATE observations SET path_json = '[]' WHERE id = ?`, r.id); execErr != nil {
log.Printf("[backfill] write error (id=%d): %v", r.id, execErr)
} else {
s.Stats.IncBackfill("path_json")
}
continue
}
b, _ := json.Marshal(hops)
if _, execErr := s.db.Exec(`UPDATE observations SET path_json = ? WHERE id = ?`, string(b), r.id); execErr != nil {
log.Printf("[backfill] write error (id=%d): %v", r.id, execErr)
} else {
updated++
s.Stats.IncBackfill("path_json")
}
}
batchNum++
if batchNum%50 == 0 {
log.Printf("[backfill] progress: %d observations updated so far (%d batches)", updated, batchNum)
}
// Throttle: yield to ingest writers between batches
time.Sleep(50 * time.Millisecond)
}
log.Printf("[backfill] Async path_json backfill complete: %d observations updated", updated)
if !errored {
s.db.Exec(`INSERT INTO _migrations (name) VALUES ('backfill_path_json_from_raw_hex_v1')`)
} else {
log.Printf("[backfill] NOT recording migration due to errors — will retry on next restart")
}
}()
}
// BackfillDefaultScopeAsync populates default_scope for existing nodes that have
// transport-scoped ADVERT rows (scope_name IS NOT NULL AND scope_name != “).
// Runs in a background goroutine so it does not block MQTT startup.
// Uses the from_pubkey index — O(nodes × indexed lookup), not a full table scan.
//
// Concurrency: the store uses SetMaxOpenConns(1) so all DB writes — including
// MQTT packet inserts and any concurrent backfill goroutines — serialize through
// the single connection pool. busy_timeout(5000) handles transient cross-process
// contention with the read-only server process. No additional locking is needed.
func (s *Store) BackfillDefaultScopeAsync(regionKeys map[string][]byte) {
// No region keys configured — all scope_name values will be NULL, nothing to backfill.
if len(regionKeys) == 0 {
return
}
s.backfillWg.Add(1)
go func() {
defer s.backfillWg.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("[backfill] default_scope async panic recovered: %v", r)
}
}()
var done int
if s.db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'backfill_default_scope_v1'").Scan(&done) == nil {
return // already ran
}
res, err := s.db.Exec(`
UPDATE nodes SET default_scope = (
SELECT t.scope_name FROM transmissions t
WHERE t.from_pubkey = nodes.public_key
AND t.payload_type = 4
AND t.scope_name IS NOT NULL AND t.scope_name != ''
ORDER BY t.first_seen DESC LIMIT 1 -- most-recently observed scope wins; first_seen is insertion time
) WHERE EXISTS (
SELECT 1 FROM transmissions t
WHERE t.from_pubkey = nodes.public_key
AND t.payload_type = 4
AND t.scope_name IS NOT NULL AND t.scope_name != ''
)`)
if err != nil {
log.Printf("[backfill] default_scope: %v", err)
return
}
n, _ := res.RowsAffected()
s.Stats.IncBackfill("default_scope")
log.Printf("[backfill] default_scope populated for %d nodes", n)
s.db.Exec(`INSERT INTO _migrations (name) VALUES ('backfill_default_scope_v1')`)
}()
}
// LogStats logs current operational metrics.
func (s *Store) LogStats() {
log.Printf("[stats] tx_inserted=%d tx_dupes=%d obs_inserted=%d node_upserts=%d observer_upserts=%d write_errors=%d sig_drops=%d",
s.Stats.TransmissionsInserted.Load(),
s.Stats.DuplicateTransmissions.Load(),
s.Stats.ObservationsInserted.Load(),
s.Stats.NodeUpserts.Load(),
s.Stats.ObserverUpserts.Load(),
s.Stats.WriteErrors.Load(),
s.Stats.SignatureDrops.Load(),
)
}
// MoveStaleNodes moves nodes not seen in nodeDays to the inactive_nodes table.
// Returns the number of nodes moved.
func (s *Store) MoveStaleNodes(nodeDays int) (int64, error) {
cutoff := time.Now().UTC().AddDate(0, 0, -nodeDays).Format(time.RFC3339)
tx, err := s.db.Begin()
if err != nil {
return 0, fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
_, err = tx.Exec(`INSERT OR REPLACE INTO inactive_nodes SELECT * FROM nodes WHERE last_seen < ?`, cutoff)
if err != nil {
return 0, fmt.Errorf("insert inactive: %w", err)
}
result, err := tx.Exec(`DELETE FROM nodes WHERE last_seen < ?`, cutoff)
if err != nil {
return 0, fmt.Errorf("delete stale: %w", err)
}
moved, _ := result.RowsAffected()
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("commit: %w", err)
}
if moved > 0 {
log.Printf("Moved %d node(s) to inactive_nodes (not seen in %d days)", moved, nodeDays)
}
return moved, nil
}
// RemoveStaleObservers marks observers that have not actively sent data in observerDays
// as inactive (soft-delete). This preserves JOIN integrity for observations.observer_idx
// and observer_metrics.observer_id — historical data still references the correct observer.
// An observer must actively send data to stay listed — being seen by another node does not count.
// observerDays <= -1 means never remove (keep forever).
func (s *Store) RemoveStaleObservers(observerDays int) (int64, error) {
if observerDays <= -1 {
return 0, nil // keep forever
}
cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339)
result, err := s.db.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff)
if err != nil {
return 0, fmt.Errorf("mark stale observers inactive: %w", err)
}
removed, _ := result.RowsAffected()
if removed > 0 {
// Clean up orphaned metrics for now-inactive observers
s.db.Exec(`DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`)
log.Printf("Marked %d observer(s) as inactive (not seen in %d days)", removed, observerDays)
}
return removed, nil
}
// DroppedPacket holds data for a packet rejected during ingest.
type DroppedPacket struct {
Hash string
RawHex string
Reason string
ObserverID string
ObserverName string
NodePubKey string
NodeName string
}
// InsertDroppedPacket records a rejected packet in the dropped_packets table.
func (s *Store) InsertDroppedPacket(dp *DroppedPacket) error {
_, err := s.db.Exec(
`INSERT INTO dropped_packets (hash, raw_hex, reason, observer_id, observer_name, node_pubkey, node_name) VALUES (?, ?, ?, ?, ?, ?, ?)`,
dp.Hash, dp.RawHex, dp.Reason, dp.ObserverID, dp.ObserverName, dp.NodePubKey, dp.NodeName,
)
if err != nil {
s.Stats.WriteErrors.Add(1)
return fmt.Errorf("insert dropped packet: %w", err)
}
s.Stats.SignatureDrops.Add(1)
return nil
}
// PruneDroppedPackets removes dropped_packets older than retentionDays.
func (s *Store) PruneDroppedPackets(retentionDays int) (int64, error) {
if retentionDays <= 0 {
return 0, nil
}
cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays).Format(time.RFC3339)
result, err := s.db.Exec(`DELETE FROM dropped_packets WHERE dropped_at < ?`, cutoff)
if err != nil {
return 0, fmt.Errorf("prune dropped packets: %w", err)
}
n, _ := result.RowsAffected()
if n > 0 {
log.Printf("Pruned %d dropped packet(s) older than %d days", n, retentionDays)
}
return n, nil
}
// PacketData holds the data needed to insert a packet into the DB.
type PacketData struct {
RawHex string
Timestamp string
ObserverID string
ObserverName string
SNR *float64
RSSI *float64
Score *float64
Direction *string
Hash string
RouteType int
PayloadType int
PayloadVersion int
PathJSON string
DecodedJSON string
ChannelHash string // grouping key for channel queries (#762)
ScopeName string // matched region name, or "" for unknown-scoped
IsTransportScoped bool // true when route_type IN (0,3) AND Code1 ≠ "0000"
Region string // observer region: payload > topic > source config (#788)
Foreign bool // true when ADVERT GPS lies outside configured geofilter (#730)
FromPubkey string // pubkey of the originating node, for exact-match attribution (#1143)
}
// nilIfEmpty returns nil for empty strings (for nullable DB columns).
func nilIfEmpty(s string) interface{} {
if s == "" {
return nil
}
return s
}
// scopeNameForDB encodes PacketData scope semantics for DB storage:
// non-transport-scoped → nil (SQL NULL); transport-scoped → pointer to ScopeName
// (may be "" for unknown region, "#name" for matched region).
func scopeNameForDB(data *PacketData) *string {
if !data.IsTransportScoped {
return nil
}
s := data.ScopeName
return &s
}
// UpdateNodeDefaultScope records the most-recently observed region scope for a
// node. Skips the UPDATE when the stored value already matches to avoid
// redundant writes on the hot MQTT ingest path. Updates both nodes and
// inactive_nodes to stay consistent.
func (s *Store) UpdateNodeDefaultScope(pubkey, scope string) error {
// Short-circuit: skip if already stored.
var cur sql.NullString
row := s.db.QueryRow(`SELECT default_scope FROM nodes WHERE public_key = ?`, pubkey)
if row.Scan(&cur) == nil && cur.Valid && cur.String == scope {
return nil
}
if _, err := s.db.Exec(`UPDATE nodes SET default_scope = ? WHERE public_key = ?`, scope, pubkey); err != nil {
return err
}
// Mirror to inactive_nodes (node may be there if recently moved by retention).
_, err := s.db.Exec(`UPDATE inactive_nodes SET default_scope = ? WHERE public_key = ?`, scope, pubkey)
return err
}
// MQTTPacketMessage is the JSON payload from an MQTT raw packet message.
type MQTTPacketMessage struct {
Raw string `json:"raw"`
SNR *float64 `json:"SNR"`
RSSI *float64 `json:"RSSI"`
Score *float64 `json:"score"`
Direction *string `json:"direction"`
Origin string `json:"origin"`
Region string `json:"region,omitempty"` // optional region override (#788)
Timestamp string `json:"timestamp,omitempty"` // observer receive time, resolved by handler
}
// BuildPacketData constructs a PacketData from a decoded packet and MQTT message.
// path_json is derived directly from raw_hex header bytes (not decoded.Path.Hops)
// to guarantee the stored path always matches the raw bytes. This matters for
// TRACE packets where decoded.Path.Hops is overwritten with payload hops (#886).
//
// Timestamp is server ingest time (time.Now()), NOT msg.Timestamp (#1370):
// PR #1233 (commit 498fbc03) routed the envelope timestamp into
// PacketData.Timestamp on the premise that uploader-stamped envelope time
// was trustworthy. Issue #1370 disproved that premise — observers with
// broken client clocks (staging Voodoo3 tx 304114: 4/5 obs stamped 18:42
// while genuine receive was 01:42) poisoned transmissions.first_seen /
// observations.timestamp and dragged the /api/channels lastActivity 7h
// into the past. Packet ordering is owned by the server clock; client
// clocks are untrusted. msg.Timestamp still flows into observer.last_seen
// via UpsertObserverAt — that's #1233's MAX/MIN guarded path and is fine.
func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, region string, regionKeys map[string][]byte) *PacketData {
pathJSON := "[]"
// For TRACE packets, path_json must be the payload-decoded route hops
// (decoded.Path.Hops), NOT the raw_hex header bytes which are SNR values.
// For all other packet types, derive path from raw_hex (#886).
if !packetpath.PathBytesAreHops(byte(decoded.Header.PayloadType)) {
if len(decoded.Path.Hops) > 0 {
b, _ := json.Marshal(decoded.Path.Hops)
pathJSON = string(b)
}
} else if hops, err := packetpath.DecodePathFromRawHex(msg.Raw); err == nil && len(hops) > 0 {
b, _ := json.Marshal(hops)
pathJSON = string(b)
}
pd := &PacketData{
RawHex: msg.Raw,
Timestamp: time.Now().UTC().Format(time.RFC3339), // #1370 (counters #1233)
ObserverID: observerID,
ObserverName: msg.Origin,
SNR: msg.SNR,
RSSI: msg.RSSI,
Score: msg.Score,
Direction: msg.Direction,
Hash: ComputeContentHash(msg.Raw),
RouteType: decoded.Header.RouteType,
PayloadType: decoded.Header.PayloadType,
PayloadVersion: decoded.Header.PayloadVersion,
PathJSON: pathJSON,
DecodedJSON: PayloadJSON(&decoded.Payload),
}
// Region priority: payload field > topic-derived parameter (#788)
if msg.Region != "" {
pd.Region = msg.Region
} else {
pd.Region = region
}
// Populate channel_hash for fast channel queries (#762)
if decoded.Header.PayloadType == PayloadGRP_TXT {
if decoded.Payload.Type == "CHAN" && decoded.Payload.Channel != "" {
pd.ChannelHash = decoded.Payload.Channel
} else if decoded.Payload.Type == "GRP_TXT" && decoded.Payload.ChannelHashHex != "" {
pd.ChannelHash = "enc_" + decoded.Payload.ChannelHashHex
}
}
if decoded.TransportCodes != nil && decoded.TransportCodes.Code1 != "0000" {
pd.IsTransportScoped = true
pd.ScopeName = matchScope(regionKeys, byte(decoded.Header.PayloadType), decoded.payloadRaw, decoded.TransportCodes.Code1)
}
// Populate from_pubkey at write time (#1143). ADVERTs carry the
// originating node's pubkey directly; other packet types stay NULL
// (downstream attribution queries handle NULL gracefully).
if decoded.Header.PayloadType == PayloadADVERT && decoded.Payload.PubKey != "" {
pd.FromPubkey = decoded.Payload.PubKey
}
return pd
}