From 9c5ffbfb0c34c49dbe78d5b7782f24d3d3314d06 Mon Sep 17 00:00:00 2001 From: you Date: Sat, 28 Mar 2026 16:16:07 +0000 Subject: [PATCH 1/4] fix: resolve SQLite SQLITE_BUSY write contention in ingestor Three changes to eliminate concurrent write collisions: 1. Add _busy_timeout=5000 to ingestor SQLite DSN (matches server) - SQLite will wait up to 5s for the write lock instead of immediately returning SQLITE_BUSY 2. Set SetMaxOpenConns(1) on ingestor DB connection pool - Serializes all DB access at the Go sql.DB level - Prevents multiple goroutines from opening overlapping writes 3. Change SetOrderMatters(false) to SetOrderMatters(true) - MQTT handlers now run sequentially per client - Eliminates concurrent handler execution that caused overlapping multi-statement write flows Root cause: concurrent MQTT handlers (SetOrderMatters=false) each performed multiple separate writes (transmission lookup/insert, observation insert, node upsert, observer upsert) without transactions or connection limits. SQLite only permits one writer at a time, so under bursty MQTT traffic the ingestor was competing with itself. --- cmd/ingestor/db.go | 846 ++++++++++++++++++------------------- cmd/ingestor/main.go | 982 +++++++++++++++++++++---------------------- 2 files changed, 915 insertions(+), 913 deletions(-) diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 45fff738..230173f3 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -1,422 +1,424 @@ -package main - -import ( - "database/sql" - "encoding/json" - "fmt" - "log" - "os" - "path/filepath" - "time" - - _ "modernc.org/sqlite" -) - -// Store wraps the SQLite database for packet ingestion. -type Store struct { - db *sql.DB - - stmtGetTxByHash *sql.Stmt - stmtInsertTransmission *sql.Stmt - stmtUpdateTxFirstSeen *sql.Stmt - stmtInsertObservation *sql.Stmt - stmtUpsertNode *sql.Stmt - stmtIncrementAdvertCount *sql.Stmt - stmtUpsertObserver *sql.Stmt - stmtGetObserverRowid *sql.Stmt -} - -// OpenStore opens or creates a SQLite DB at the given path, applying the -// v3 schema that is compatible with the Node.js server. -func OpenStore(dbPath string) (*Store, error) { - dir := filepath.Dir(dbPath) - if err := os.MkdirAll(dir, 0o755); err != nil { - return nil, fmt.Errorf("creating data dir: %w", err) - } - - db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)") - if err != nil { - return nil, fmt.Errorf("opening db: %w", err) - } - - if err := db.Ping(); err != nil { - return nil, fmt.Errorf("pinging db: %w", err) - } - - if err := applySchema(db); err != nil { - return nil, fmt.Errorf("applying schema: %w", err) - } - - s := &Store{db: db} - if err := s.prepareStatements(); err != nil { - return nil, fmt.Errorf("preparing statements: %w", err) - } - - return s, nil -} - -func applySchema(db *sql.DB) error { - schema := ` - CREATE TABLE IF NOT EXISTS nodes ( - public_key TEXT PRIMARY KEY, - name TEXT, - role TEXT, - lat REAL, - lon REAL, - last_seen TEXT, - first_seen TEXT, - advert_count INTEGER DEFAULT 0 - ); - - CREATE TABLE IF NOT EXISTS observers ( - id TEXT PRIMARY KEY, - name TEXT, - iata TEXT, - last_seen TEXT, - first_seen TEXT, - packet_count INTEGER DEFAULT 0, - model TEXT, - firmware TEXT, - client_version TEXT, - radio TEXT, - battery_mv INTEGER, - uptime_secs INTEGER, - noise_floor INTEGER - ); - - CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen); - CREATE INDEX IF NOT EXISTS idx_observers_last_seen ON observers(last_seen); - - CREATE TABLE IF NOT EXISTS inactive_nodes ( - public_key TEXT PRIMARY KEY, - name TEXT, - role TEXT, - lat REAL, - lon REAL, - last_seen TEXT, - first_seen TEXT, - advert_count INTEGER DEFAULT 0 - ); - - CREATE INDEX IF NOT EXISTS idx_inactive_nodes_last_seen ON inactive_nodes(last_seen); - - CREATE TABLE IF NOT EXISTS transmissions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - raw_hex TEXT NOT NULL, - hash TEXT NOT NULL UNIQUE, - first_seen TEXT NOT NULL, - route_type INTEGER, - payload_type INTEGER, - payload_version INTEGER, - decoded_json TEXT, - created_at TEXT DEFAULT (datetime('now')) - ); - - CREATE INDEX IF NOT EXISTS idx_transmissions_hash ON transmissions(hash); - CREATE INDEX IF NOT EXISTS idx_transmissions_first_seen ON transmissions(first_seen); - CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type); - ` - if _, err := db.Exec(schema); err != nil { - return fmt.Errorf("base schema: %w", err) - } - - // Create observations table (v3 schema) - obsExists := false - row := db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='observations'") - var dummy string - if row.Scan(&dummy) == nil { - obsExists = true - } - - if !obsExists { - obs := ` - CREATE TABLE observations ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - transmission_id INTEGER NOT NULL REFERENCES transmissions(id), - observer_idx INTEGER, - direction TEXT, - snr REAL, - rssi REAL, - score INTEGER, - path_json TEXT, - timestamp INTEGER NOT NULL - ); - CREATE INDEX idx_observations_transmission_id ON observations(transmission_id); - CREATE INDEX idx_observations_observer_idx ON observations(observer_idx); - CREATE INDEX idx_observations_timestamp ON observations(timestamp); - CREATE UNIQUE INDEX IF NOT EXISTS idx_observations_dedup ON observations(transmission_id, observer_idx, COALESCE(path_json, '')); - ` - if _, err := db.Exec(obs); err != nil { - return fmt.Errorf("observations schema: %w", err) - } - } - - // One-time migration: recalculate advert_count to count unique transmissions only - db.Exec(`CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)`) - var migDone int - row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'advert_count_unique_v1'") - if row.Scan(&migDone) != nil { - log.Println("[migration] Recalculating advert_count (unique transmissions only)...") - db.Exec(` - UPDATE nodes SET advert_count = ( - SELECT COUNT(*) FROM transmissions t - WHERE t.payload_type = 4 - AND t.decoded_json LIKE '%' || nodes.public_key || '%' - ) - `) - db.Exec(`INSERT INTO _migrations (name) VALUES ('advert_count_unique_v1')`) - log.Println("[migration] advert_count recalculated") - } - - return nil -} - -func (s *Store) prepareStatements() error { - var err error - - s.stmtGetTxByHash, err = s.db.Prepare("SELECT id, first_seen FROM transmissions WHERE hash = ?") - if err != nil { - return err - } - - s.stmtInsertTransmission, err = s.db.Prepare(` - INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json) - VALUES (?, ?, ?, ?, ?, ?, ?) - `) - if err != nil { - return err - } - - s.stmtUpdateTxFirstSeen, err = s.db.Prepare("UPDATE transmissions SET first_seen = ? WHERE id = ?") - if err != nil { - return err - } - - s.stmtInsertObservation, err = s.db.Prepare(` - INSERT OR IGNORE INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - `) - if err != nil { - return err - } - - s.stmtUpsertNode, err = s.db.Prepare(` - INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen) - VALUES (?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(public_key) DO UPDATE SET - name = COALESCE(?, name), - role = COALESCE(?, role), - lat = COALESCE(?, lat), - lon = COALESCE(?, lon), - last_seen = ? - `) - if err != nil { - return err - } - - s.stmtIncrementAdvertCount, err = s.db.Prepare(` - UPDATE nodes SET advert_count = advert_count + 1 WHERE public_key = ? - `) - if err != nil { - return err - } - - s.stmtUpsertObserver, err = s.db.Prepare(` - INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count) - VALUES (?, ?, ?, ?, ?, 1) - ON CONFLICT(id) DO UPDATE SET - name = COALESCE(?, name), - iata = COALESCE(?, iata), - last_seen = ?, - packet_count = packet_count + 1 - `) - if err != nil { - return err - } - - s.stmtGetObserverRowid, err = s.db.Prepare("SELECT rowid FROM observers WHERE id = ?") - if err != nil { - return err - } - - return nil -} - -// InsertTransmission inserts a decoded packet into transmissions + observations. -// Returns true if a new transmission was created (not a duplicate hash). -func (s *Store) InsertTransmission(data *PacketData) (bool, error) { - hash := data.Hash - if hash == "" { - return false, nil - } - - now := data.Timestamp - if now == "" { - now = time.Now().UTC().Format(time.RFC3339) - } - - var txID int64 - isNew := false - - // Check for existing transmission - var existingID int64 - var existingFirstSeen string - err := s.stmtGetTxByHash.QueryRow(hash).Scan(&existingID, &existingFirstSeen) - if err == nil { - // Existing transmission - txID = existingID - if now < existingFirstSeen { - _, _ = s.stmtUpdateTxFirstSeen.Exec(now, txID) - } - } else { - // New transmission - isNew = true - result, err := s.stmtInsertTransmission.Exec( - data.RawHex, hash, now, - data.RouteType, data.PayloadType, data.PayloadVersion, - data.DecodedJSON, - ) - if err != nil { - return false, fmt.Errorf("insert transmission: %w", err) - } - txID, _ = result.LastInsertId() - } - - // Resolve observer_idx - var observerIdx *int64 - if data.ObserverID != "" { - var rowid int64 - err := s.stmtGetObserverRowid.QueryRow(data.ObserverID).Scan(&rowid) - if err == nil { - observerIdx = &rowid - } - } - - // Insert observation - epochTs := time.Now().Unix() - if t, err := time.Parse(time.RFC3339, now); err == nil { - epochTs = t.Unix() - } - - _, err = s.stmtInsertObservation.Exec( - txID, observerIdx, nil, // direction - data.SNR, data.RSSI, nil, // score - data.PathJSON, epochTs, - ) - if err != nil { - log.Printf("[db] observation insert (non-fatal): %v", err) - } - - return isNew, nil -} - -// UpsertNode inserts or updates a node. -func (s *Store) UpsertNode(pubKey, name, role string, lat, lon *float64, lastSeen string) error { - now := lastSeen - if now == "" { - now = time.Now().UTC().Format(time.RFC3339) - } - _, err := s.stmtUpsertNode.Exec( - pubKey, name, role, lat, lon, now, now, - name, role, lat, lon, now, - ) - return err -} - -// IncrementAdvertCount increments advert_count for a node by public key. -func (s *Store) IncrementAdvertCount(pubKey string) error { - _, err := s.stmtIncrementAdvertCount.Exec(pubKey) - return err -} - -// UpsertObserver inserts or updates an observer. -func (s *Store) UpsertObserver(id, name, iata string) error { - now := time.Now().UTC().Format(time.RFC3339) - _, err := s.stmtUpsertObserver.Exec( - id, name, iata, now, now, - name, iata, now, - ) - return err -} - -// Close closes the database. -func (s *Store) Close() error { - return s.db.Close() -} - -// MoveStaleNodes moves nodes not seen in nodeDays to the inactive_nodes table. -// Returns the number of nodes moved. -func (s *Store) MoveStaleNodes(nodeDays int) (int64, error) { - cutoff := time.Now().UTC().AddDate(0, 0, -nodeDays).Format(time.RFC3339) - tx, err := s.db.Begin() - if err != nil { - return 0, fmt.Errorf("begin tx: %w", err) - } - defer tx.Rollback() - - _, err = tx.Exec(`INSERT OR REPLACE INTO inactive_nodes SELECT * FROM nodes WHERE last_seen < ?`, cutoff) - if err != nil { - return 0, fmt.Errorf("insert inactive: %w", err) - } - result, err := tx.Exec(`DELETE FROM nodes WHERE last_seen < ?`, cutoff) - if err != nil { - return 0, fmt.Errorf("delete stale: %w", err) - } - moved, _ := result.RowsAffected() - if err := tx.Commit(); err != nil { - return 0, fmt.Errorf("commit: %w", err) - } - if moved > 0 { - log.Printf("Moved %d node(s) to inactive_nodes (not seen in %d days)", moved, nodeDays) - } - return moved, nil -} - -// PacketData holds the data needed to insert a packet into the DB. -type PacketData struct { - RawHex string - Timestamp string - ObserverID string - ObserverName string - SNR *float64 - RSSI *float64 - Hash string - RouteType int - PayloadType int - PayloadVersion int - PathJSON string - DecodedJSON string -} - -// MQTTPacketMessage is the JSON payload from an MQTT raw packet message. -type MQTTPacketMessage struct { - Raw string `json:"raw"` - SNR *float64 `json:"SNR"` - RSSI *float64 `json:"RSSI"` - Origin string `json:"origin"` -} - -// BuildPacketData constructs a PacketData from a decoded packet and MQTT message. -func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, region string) *PacketData { - now := time.Now().UTC().Format(time.RFC3339) - pathJSON := "[]" - if len(decoded.Path.Hops) > 0 { - b, _ := json.Marshal(decoded.Path.Hops) - pathJSON = string(b) - } - - return &PacketData{ - RawHex: msg.Raw, - Timestamp: now, - ObserverID: observerID, - ObserverName: msg.Origin, - SNR: msg.SNR, - RSSI: msg.RSSI, - Hash: ComputeContentHash(msg.Raw), - RouteType: decoded.Header.RouteType, - PayloadType: decoded.Header.PayloadType, - PayloadVersion: decoded.Header.PayloadVersion, - PathJSON: pathJSON, - DecodedJSON: PayloadJSON(&decoded.Payload), - } -} +package main + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "os" + "path/filepath" + "time" + + _ "modernc.org/sqlite" +) + +// Store wraps the SQLite database for packet ingestion. +type Store struct { + db *sql.DB + + stmtGetTxByHash *sql.Stmt + stmtInsertTransmission *sql.Stmt + stmtUpdateTxFirstSeen *sql.Stmt + stmtInsertObservation *sql.Stmt + stmtUpsertNode *sql.Stmt + stmtIncrementAdvertCount *sql.Stmt + stmtUpsertObserver *sql.Stmt + stmtGetObserverRowid *sql.Stmt +} + +// OpenStore opens or creates a SQLite DB at the given path, applying the +// v3 schema that is compatible with the Node.js server. +func OpenStore(dbPath string) (*Store, error) { + dir := filepath.Dir(dbPath) + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("creating data dir: %w", err) + } + + db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)&_pragma=busy_timeout(5000)") + if err != nil { + return nil, fmt.Errorf("opening db: %w", err) + } + + if err := db.Ping(); err != nil { + return nil, fmt.Errorf("pinging db: %w", err) + } + + db.SetMaxOpenConns(1) + + if err := applySchema(db); err != nil { + return nil, fmt.Errorf("applying schema: %w", err) + } + + s := &Store{db: db} + if err := s.prepareStatements(); err != nil { + return nil, fmt.Errorf("preparing statements: %w", err) + } + + return s, nil +} + +func applySchema(db *sql.DB) error { + schema := ` + CREATE TABLE IF NOT EXISTS nodes ( + public_key TEXT PRIMARY KEY, + name TEXT, + role TEXT, + lat REAL, + lon REAL, + last_seen TEXT, + first_seen TEXT, + advert_count INTEGER DEFAULT 0 + ); + + CREATE TABLE IF NOT EXISTS observers ( + id TEXT PRIMARY KEY, + name TEXT, + iata TEXT, + last_seen TEXT, + first_seen TEXT, + packet_count INTEGER DEFAULT 0, + model TEXT, + firmware TEXT, + client_version TEXT, + radio TEXT, + battery_mv INTEGER, + uptime_secs INTEGER, + noise_floor INTEGER + ); + + CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen); + CREATE INDEX IF NOT EXISTS idx_observers_last_seen ON observers(last_seen); + + CREATE TABLE IF NOT EXISTS inactive_nodes ( + public_key TEXT PRIMARY KEY, + name TEXT, + role TEXT, + lat REAL, + lon REAL, + last_seen TEXT, + first_seen TEXT, + advert_count INTEGER DEFAULT 0 + ); + + CREATE INDEX IF NOT EXISTS idx_inactive_nodes_last_seen ON inactive_nodes(last_seen); + + CREATE TABLE IF NOT EXISTS transmissions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + raw_hex TEXT NOT NULL, + hash TEXT NOT NULL UNIQUE, + first_seen TEXT NOT NULL, + route_type INTEGER, + payload_type INTEGER, + payload_version INTEGER, + decoded_json TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + CREATE INDEX IF NOT EXISTS idx_transmissions_hash ON transmissions(hash); + CREATE INDEX IF NOT EXISTS idx_transmissions_first_seen ON transmissions(first_seen); + CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type); + ` + if _, err := db.Exec(schema); err != nil { + return fmt.Errorf("base schema: %w", err) + } + + // Create observations table (v3 schema) + obsExists := false + row := db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='observations'") + var dummy string + if row.Scan(&dummy) == nil { + obsExists = true + } + + if !obsExists { + obs := ` + CREATE TABLE observations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + transmission_id INTEGER NOT NULL REFERENCES transmissions(id), + observer_idx INTEGER, + direction TEXT, + snr REAL, + rssi REAL, + score INTEGER, + path_json TEXT, + timestamp INTEGER NOT NULL + ); + CREATE INDEX idx_observations_transmission_id ON observations(transmission_id); + CREATE INDEX idx_observations_observer_idx ON observations(observer_idx); + CREATE INDEX idx_observations_timestamp ON observations(timestamp); + CREATE UNIQUE INDEX IF NOT EXISTS idx_observations_dedup ON observations(transmission_id, observer_idx, COALESCE(path_json, '')); + ` + if _, err := db.Exec(obs); err != nil { + return fmt.Errorf("observations schema: %w", err) + } + } + + // One-time migration: recalculate advert_count to count unique transmissions only + db.Exec(`CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)`) + var migDone int + row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'advert_count_unique_v1'") + if row.Scan(&migDone) != nil { + log.Println("[migration] Recalculating advert_count (unique transmissions only)...") + db.Exec(` + UPDATE nodes SET advert_count = ( + SELECT COUNT(*) FROM transmissions t + WHERE t.payload_type = 4 + AND t.decoded_json LIKE '%' || nodes.public_key || '%' + ) + `) + db.Exec(`INSERT INTO _migrations (name) VALUES ('advert_count_unique_v1')`) + log.Println("[migration] advert_count recalculated") + } + + return nil +} + +func (s *Store) prepareStatements() error { + var err error + + s.stmtGetTxByHash, err = s.db.Prepare("SELECT id, first_seen FROM transmissions WHERE hash = ?") + if err != nil { + return err + } + + s.stmtInsertTransmission, err = s.db.Prepare(` + INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json) + VALUES (?, ?, ?, ?, ?, ?, ?) + `) + if err != nil { + return err + } + + s.stmtUpdateTxFirstSeen, err = s.db.Prepare("UPDATE transmissions SET first_seen = ? WHERE id = ?") + if err != nil { + return err + } + + s.stmtInsertObservation, err = s.db.Prepare(` + INSERT OR IGNORE INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `) + if err != nil { + return err + } + + s.stmtUpsertNode, err = s.db.Prepare(` + INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(public_key) DO UPDATE SET + name = COALESCE(?, name), + role = COALESCE(?, role), + lat = COALESCE(?, lat), + lon = COALESCE(?, lon), + last_seen = ? + `) + if err != nil { + return err + } + + s.stmtIncrementAdvertCount, err = s.db.Prepare(` + UPDATE nodes SET advert_count = advert_count + 1 WHERE public_key = ? + `) + if err != nil { + return err + } + + s.stmtUpsertObserver, err = s.db.Prepare(` + INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count) + VALUES (?, ?, ?, ?, ?, 1) + ON CONFLICT(id) DO UPDATE SET + name = COALESCE(?, name), + iata = COALESCE(?, iata), + last_seen = ?, + packet_count = packet_count + 1 + `) + if err != nil { + return err + } + + s.stmtGetObserverRowid, err = s.db.Prepare("SELECT rowid FROM observers WHERE id = ?") + if err != nil { + return err + } + + return nil +} + +// InsertTransmission inserts a decoded packet into transmissions + observations. +// Returns true if a new transmission was created (not a duplicate hash). +func (s *Store) InsertTransmission(data *PacketData) (bool, error) { + hash := data.Hash + if hash == "" { + return false, nil + } + + now := data.Timestamp + if now == "" { + now = time.Now().UTC().Format(time.RFC3339) + } + + var txID int64 + isNew := false + + // Check for existing transmission + var existingID int64 + var existingFirstSeen string + err := s.stmtGetTxByHash.QueryRow(hash).Scan(&existingID, &existingFirstSeen) + if err == nil { + // Existing transmission + txID = existingID + if now < existingFirstSeen { + _, _ = s.stmtUpdateTxFirstSeen.Exec(now, txID) + } + } else { + // New transmission + isNew = true + result, err := s.stmtInsertTransmission.Exec( + data.RawHex, hash, now, + data.RouteType, data.PayloadType, data.PayloadVersion, + data.DecodedJSON, + ) + if err != nil { + return false, fmt.Errorf("insert transmission: %w", err) + } + txID, _ = result.LastInsertId() + } + + // Resolve observer_idx + var observerIdx *int64 + if data.ObserverID != "" { + var rowid int64 + err := s.stmtGetObserverRowid.QueryRow(data.ObserverID).Scan(&rowid) + if err == nil { + observerIdx = &rowid + } + } + + // Insert observation + epochTs := time.Now().Unix() + if t, err := time.Parse(time.RFC3339, now); err == nil { + epochTs = t.Unix() + } + + _, err = s.stmtInsertObservation.Exec( + txID, observerIdx, nil, // direction + data.SNR, data.RSSI, nil, // score + data.PathJSON, epochTs, + ) + if err != nil { + log.Printf("[db] observation insert (non-fatal): %v", err) + } + + return isNew, nil +} + +// UpsertNode inserts or updates a node. +func (s *Store) UpsertNode(pubKey, name, role string, lat, lon *float64, lastSeen string) error { + now := lastSeen + if now == "" { + now = time.Now().UTC().Format(time.RFC3339) + } + _, err := s.stmtUpsertNode.Exec( + pubKey, name, role, lat, lon, now, now, + name, role, lat, lon, now, + ) + return err +} + +// IncrementAdvertCount increments advert_count for a node by public key. +func (s *Store) IncrementAdvertCount(pubKey string) error { + _, err := s.stmtIncrementAdvertCount.Exec(pubKey) + return err +} + +// UpsertObserver inserts or updates an observer. +func (s *Store) UpsertObserver(id, name, iata string) error { + now := time.Now().UTC().Format(time.RFC3339) + _, err := s.stmtUpsertObserver.Exec( + id, name, iata, now, now, + name, iata, now, + ) + return err +} + +// Close closes the database. +func (s *Store) Close() error { + return s.db.Close() +} + +// MoveStaleNodes moves nodes not seen in nodeDays to the inactive_nodes table. +// Returns the number of nodes moved. +func (s *Store) MoveStaleNodes(nodeDays int) (int64, error) { + cutoff := time.Now().UTC().AddDate(0, 0, -nodeDays).Format(time.RFC3339) + tx, err := s.db.Begin() + if err != nil { + return 0, fmt.Errorf("begin tx: %w", err) + } + defer tx.Rollback() + + _, err = tx.Exec(`INSERT OR REPLACE INTO inactive_nodes SELECT * FROM nodes WHERE last_seen < ?`, cutoff) + if err != nil { + return 0, fmt.Errorf("insert inactive: %w", err) + } + result, err := tx.Exec(`DELETE FROM nodes WHERE last_seen < ?`, cutoff) + if err != nil { + return 0, fmt.Errorf("delete stale: %w", err) + } + moved, _ := result.RowsAffected() + if err := tx.Commit(); err != nil { + return 0, fmt.Errorf("commit: %w", err) + } + if moved > 0 { + log.Printf("Moved %d node(s) to inactive_nodes (not seen in %d days)", moved, nodeDays) + } + return moved, nil +} + +// PacketData holds the data needed to insert a packet into the DB. +type PacketData struct { + RawHex string + Timestamp string + ObserverID string + ObserverName string + SNR *float64 + RSSI *float64 + Hash string + RouteType int + PayloadType int + PayloadVersion int + PathJSON string + DecodedJSON string +} + +// MQTTPacketMessage is the JSON payload from an MQTT raw packet message. +type MQTTPacketMessage struct { + Raw string `json:"raw"` + SNR *float64 `json:"SNR"` + RSSI *float64 `json:"RSSI"` + Origin string `json:"origin"` +} + +// BuildPacketData constructs a PacketData from a decoded packet and MQTT message. +func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, region string) *PacketData { + now := time.Now().UTC().Format(time.RFC3339) + pathJSON := "[]" + if len(decoded.Path.Hops) > 0 { + b, _ := json.Marshal(decoded.Path.Hops) + pathJSON = string(b) + } + + return &PacketData{ + RawHex: msg.Raw, + Timestamp: now, + ObserverID: observerID, + ObserverName: msg.Origin, + SNR: msg.SNR, + RSSI: msg.RSSI, + Hash: ComputeContentHash(msg.Raw), + RouteType: decoded.Header.RouteType, + PayloadType: decoded.Header.PayloadType, + PayloadVersion: decoded.Header.PayloadVersion, + PathJSON: pathJSON, + DecodedJSON: PayloadJSON(&decoded.Payload), + } +} diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 60c1c941..8503372a 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -1,491 +1,491 @@ -package main - -import ( - "crypto/sha256" - "crypto/tls" - "encoding/hex" - "encoding/json" - "flag" - "fmt" - "log" - "os" - "os/signal" - "path/filepath" - "strings" - "syscall" - "time" - - mqtt "github.com/eclipse/paho.mqtt.golang" -) - -func main() { - configPath := flag.String("config", "config.json", "path to config file") - flag.Parse() - - log.SetFlags(log.LstdFlags | log.Lmsgprefix) - log.SetPrefix("[ingestor] ") - - cfg, err := LoadConfig(*configPath) - if err != nil { - log.Fatalf("config: %v", err) - } - - sources := cfg.ResolvedSources() - if len(sources) == 0 { - log.Fatal("no MQTT sources configured — set mqttSources in config or MQTT_BROKER env var") - } - - store, err := OpenStore(cfg.DBPath) - if err != nil { - log.Fatalf("db: %v", err) - } - defer store.Close() - log.Printf("SQLite opened: %s", cfg.DBPath) - - // Node retention: move stale nodes to inactive_nodes on startup - nodeDays := cfg.NodeDaysOrDefault() - store.MoveStaleNodes(nodeDays) - - // Daily ticker for node retention - retentionTicker := time.NewTicker(1 * time.Hour) - go func() { - for range retentionTicker.C { - store.MoveStaleNodes(nodeDays) - } - }() - - channelKeys := loadChannelKeys(cfg, *configPath) - if len(channelKeys) > 0 { - log.Printf("Loaded %d channel keys for GRP_TXT decryption", len(channelKeys)) - } else { - log.Printf("No channel keys loaded — GRP_TXT packets will not be decrypted") - } - - // Connect to each MQTT source - var clients []mqtt.Client - for _, source := range sources { - tag := source.Name - if tag == "" { - tag = source.Broker - } - - opts := mqtt.NewClientOptions(). - AddBroker(source.Broker). - SetAutoReconnect(true). - SetConnectRetry(true). - SetOrderMatters(false) - - if source.Username != "" { - opts.SetUsername(source.Username) - } - if source.Password != "" { - opts.SetPassword(source.Password) - } - if source.RejectUnauthorized != nil && !*source.RejectUnauthorized { - opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) - } else if strings.HasPrefix(source.Broker, "ssl://") { - opts.SetTLSConfig(&tls.Config{}) - } - - opts.SetOnConnectHandler(func(c mqtt.Client) { - log.Printf("MQTT [%s] connected to %s", tag, source.Broker) - topics := source.Topics - if len(topics) == 0 { - topics = []string{"meshcore/#"} - } - for _, t := range topics { - token := c.Subscribe(t, 0, nil) - token.Wait() - if token.Error() != nil { - log.Printf("MQTT [%s] subscribe error for %s: %v", tag, t, token.Error()) - } else { - log.Printf("MQTT [%s] subscribed to %s", tag, t) - } - } - }) - - opts.SetConnectionLostHandler(func(c mqtt.Client, err error) { - log.Printf("MQTT [%s] disconnected: %v", tag, err) - }) - - // Capture source for closure - src := source - opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { - handleMessage(store, tag, src, m, channelKeys) - }) - - client := mqtt.NewClient(opts) - token := client.Connect() - token.Wait() - if token.Error() != nil { - log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error()) - continue - } - clients = append(clients, client) - } - - if len(clients) == 0 { - log.Fatal("no MQTT connections established") - } - - log.Printf("Running — %d MQTT source(s) connected", len(clients)) - - // Wait for shutdown signal - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) - <-sig - - log.Println("Shutting down...") - retentionTicker.Stop() - for _, c := range clients { - c.Disconnect(1000) - } - log.Println("Done.") -} - -func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string) { - defer func() { - if r := recover(); r != nil { - log.Printf("MQTT [%s] panic in handler: %v", tag, r) - } - }() - - topic := m.Topic() - parts := strings.Split(topic, "/") - - // IATA filter - if len(source.IATAFilter) > 0 && len(parts) > 1 { - region := parts[1] - matched := false - for _, f := range source.IATAFilter { - if f == region { - matched = true - break - } - } - if !matched { - return - } - } - - var msg map[string]interface{} - if err := json.Unmarshal(m.Payload(), &msg); err != nil { - return - } - - // Skip status/connection topics - if topic == "meshcore/status" || topic == "meshcore/events/connection" { - return - } - - // Status topic: meshcore///status - if len(parts) >= 4 && parts[3] == "status" { - observerID := parts[2] - name, _ := msg["origin"].(string) - iata := parts[1] - if err := store.UpsertObserver(observerID, name, iata); err != nil { - log.Printf("MQTT [%s] observer status error: %v", tag, err) - } - log.Printf("MQTT [%s] status: %s (%s)", tag, firstNonEmpty(name, observerID), iata) - return - } - - // Format 1: Raw packet (meshcoretomqtt / Cisien format) - rawHex, _ := msg["raw"].(string) - if rawHex != "" { - decoded, err := DecodePacket(rawHex, channelKeys) - if err != nil { - log.Printf("MQTT [%s] decode error: %v", tag, err) - return - } - - observerID := "" - region := "" - if len(parts) > 2 { - observerID = parts[2] - } - if len(parts) > 1 { - region = parts[1] - } - - mqttMsg := &MQTTPacketMessage{Raw: rawHex} - if v, ok := msg["SNR"]; ok { - if f, ok := toFloat64(v); ok { - mqttMsg.SNR = &f - } - } - if v, ok := msg["RSSI"]; ok { - if f, ok := toFloat64(v); ok { - mqttMsg.RSSI = &f - } - } - if v, ok := msg["origin"].(string); ok { - mqttMsg.Origin = v - } - - pktData := BuildPacketData(mqttMsg, decoded, observerID, region) - isNew, err := store.InsertTransmission(pktData) - if err != nil { - log.Printf("MQTT [%s] db insert error: %v", tag, err) - } - - // Process ADVERT → upsert node - if decoded.Header.PayloadTypeName == "ADVERT" && decoded.Payload.PubKey != "" { - ok, reason := ValidateAdvert(&decoded.Payload) - if ok { - role := advertRole(decoded.Payload.Flags) - if err := store.UpsertNode(decoded.Payload.PubKey, decoded.Payload.Name, role, decoded.Payload.Lat, decoded.Payload.Lon, pktData.Timestamp); err != nil { - log.Printf("MQTT [%s] node upsert error: %v", tag, err) - } - if isNew { - if err := store.IncrementAdvertCount(decoded.Payload.PubKey); err != nil { - log.Printf("MQTT [%s] advert count error: %v", tag, err) - } - } - } else { - log.Printf("MQTT [%s] skipping corrupted ADVERT: %s", tag, reason) - } - } - - // Upsert observer - if observerID != "" { - origin, _ := msg["origin"].(string) - if err := store.UpsertObserver(observerID, origin, region); err != nil { - log.Printf("MQTT [%s] observer upsert error: %v", tag, err) - } - } - - return - } - - // Format 2: Companion bridge channel message (meshcore/message/channel/) - if strings.HasPrefix(topic, "meshcore/message/channel/") { - text, _ := msg["text"].(string) - if text == "" { - return - } - - channelIdx := "" - if len(parts) >= 4 { - channelIdx = parts[3] - } - if ci, ok := msg["channel_idx"]; ok { - channelIdx = fmt.Sprintf("%v", ci) - } - - // Extract sender from "Name: message" format - sender := "" - if idx := strings.Index(text, ": "); idx > 0 && idx < 50 { - sender = text[:idx] - } - - channelName := fmt.Sprintf("ch%s", channelIdx) - - // Build decoded JSON matching Node.js CHAN format - channelMsg := map[string]interface{}{ - "type": "CHAN", - "channel": channelName, - "text": text, - "sender": sender, - } - if st, ok := msg["sender_timestamp"]; ok { - channelMsg["sender_timestamp"] = st - } - - decodedJSON, _ := json.Marshal(channelMsg) - - now := time.Now().UTC().Format(time.RFC3339) - hashInput := fmt.Sprintf("ch:%s:%s:%s", channelIdx, text, now) - h := sha256.Sum256([]byte(hashInput)) - hash := hex.EncodeToString(h[:])[:16] - - var snr, rssi *float64 - if v, ok := msg["SNR"]; ok { - if f, ok := toFloat64(v); ok { - snr = &f - } - } else if v, ok := msg["snr"]; ok { - if f, ok := toFloat64(v); ok { - snr = &f - } - } - if v, ok := msg["RSSI"]; ok { - if f, ok := toFloat64(v); ok { - rssi = &f - } - } else if v, ok := msg["rssi"]; ok { - if f, ok := toFloat64(v); ok { - rssi = &f - } - } - - pktData := &PacketData{ - Timestamp: now, - ObserverID: "companion", - ObserverName: "L1 Pro (BLE)", - SNR: snr, - RSSI: rssi, - Hash: hash, - RouteType: 1, // FLOOD - PayloadType: 5, // GRP_TXT - PathJSON: "[]", - DecodedJSON: string(decodedJSON), - } - - if _, err := store.InsertTransmission(pktData); err != nil { - log.Printf("MQTT [%s] channel insert error: %v", tag, err) - } - - // Upsert sender as a companion node - if sender != "" { - senderKey := "sender-" + strings.ToLower(sender) - if err := store.UpsertNode(senderKey, sender, "companion", nil, nil, now); err != nil { - log.Printf("MQTT [%s] sender node upsert error: %v", tag, err) - } - } - - log.Printf("MQTT [%s] channel message: ch%s from %s", tag, channelIdx, firstNonEmpty(sender, "unknown")) - return - } - - // Format 2b: Companion bridge direct message (meshcore/message/direct/) - if strings.HasPrefix(topic, "meshcore/message/direct/") { - text, _ := msg["text"].(string) - if text == "" { - return - } - - sender := "" - if idx := strings.Index(text, ": "); idx > 0 && idx < 50 { - sender = text[:idx] - } - - dm := map[string]interface{}{ - "type": "DM", - "text": text, - "sender": sender, - } - if st, ok := msg["sender_timestamp"]; ok { - dm["sender_timestamp"] = st - } - - decodedJSON, _ := json.Marshal(dm) - - now := time.Now().UTC().Format(time.RFC3339) - hashInput := fmt.Sprintf("dm:%s:%s", text, now) - h := sha256.Sum256([]byte(hashInput)) - hash := hex.EncodeToString(h[:])[:16] - - var snr, rssi *float64 - if v, ok := msg["SNR"]; ok { - if f, ok := toFloat64(v); ok { - snr = &f - } - } else if v, ok := msg["snr"]; ok { - if f, ok := toFloat64(v); ok { - snr = &f - } - } - if v, ok := msg["RSSI"]; ok { - if f, ok := toFloat64(v); ok { - rssi = &f - } - } else if v, ok := msg["rssi"]; ok { - if f, ok := toFloat64(v); ok { - rssi = &f - } - } - - pktData := &PacketData{ - Timestamp: now, - ObserverID: "companion", - ObserverName: "L1 Pro (BLE)", - SNR: snr, - RSSI: rssi, - Hash: hash, - RouteType: 1, // FLOOD - PayloadType: 2, // TXT_MSG - PathJSON: "[]", - DecodedJSON: string(decodedJSON), - } - - if _, err := store.InsertTransmission(pktData); err != nil { - log.Printf("MQTT [%s] DM insert error: %v", tag, err) - } - - log.Printf("MQTT [%s] direct message from %s", tag, firstNonEmpty(sender, "unknown")) - return - } -} - -func toFloat64(v interface{}) (float64, bool) { - switch n := v.(type) { - case float64: - return n, true - case float32: - return float64(n), true - case int: - return float64(n), true - case int64: - return float64(n), true - case json.Number: - f, err := n.Float64() - return f, err == nil - default: - return 0, false - } -} - -func firstNonEmpty(vals ...string) string { - for _, v := range vals { - if v != "" { - return v - } - } - return "" -} - -// loadChannelKeys loads channel decryption keys from config and/or a JSON file. -// Priority: CHANNEL_KEYS_PATH env var > cfg.ChannelKeysPath > channel-rainbow.json next to config. -func loadChannelKeys(cfg *Config, configPath string) map[string]string { - keys := make(map[string]string) - - // Determine file path for rainbow keys - keysPath := os.Getenv("CHANNEL_KEYS_PATH") - if keysPath == "" { - keysPath = cfg.ChannelKeysPath - } - if keysPath == "" { - // Default: look for channel-rainbow.json next to config file - keysPath = filepath.Join(filepath.Dir(configPath), "channel-rainbow.json") - } - - if data, err := os.ReadFile(keysPath); err == nil { - var fileKeys map[string]string - if err := json.Unmarshal(data, &fileKeys); err == nil { - for k, v := range fileKeys { - keys[k] = v - } - log.Printf("Loaded %d channel keys from %s", len(fileKeys), keysPath) - } else { - log.Printf("Warning: failed to parse channel keys file %s: %v", keysPath, err) - } - } - - // Merge inline config keys (override file keys) - for k, v := range cfg.ChannelKeys { - keys[k] = v - } - - return keys -} - -// Version info (set via ldflags) -var version = "dev" - -func init() { - if len(os.Args) > 1 && os.Args[1] == "--version" { - fmt.Println("meshcore-ingestor", version) - os.Exit(0) - } -} +package main + +import ( + "crypto/sha256" + "crypto/tls" + "encoding/hex" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "os/signal" + "path/filepath" + "strings" + "syscall" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +func main() { + configPath := flag.String("config", "config.json", "path to config file") + flag.Parse() + + log.SetFlags(log.LstdFlags | log.Lmsgprefix) + log.SetPrefix("[ingestor] ") + + cfg, err := LoadConfig(*configPath) + if err != nil { + log.Fatalf("config: %v", err) + } + + sources := cfg.ResolvedSources() + if len(sources) == 0 { + log.Fatal("no MQTT sources configured — set mqttSources in config or MQTT_BROKER env var") + } + + store, err := OpenStore(cfg.DBPath) + if err != nil { + log.Fatalf("db: %v", err) + } + defer store.Close() + log.Printf("SQLite opened: %s", cfg.DBPath) + + // Node retention: move stale nodes to inactive_nodes on startup + nodeDays := cfg.NodeDaysOrDefault() + store.MoveStaleNodes(nodeDays) + + // Daily ticker for node retention + retentionTicker := time.NewTicker(1 * time.Hour) + go func() { + for range retentionTicker.C { + store.MoveStaleNodes(nodeDays) + } + }() + + channelKeys := loadChannelKeys(cfg, *configPath) + if len(channelKeys) > 0 { + log.Printf("Loaded %d channel keys for GRP_TXT decryption", len(channelKeys)) + } else { + log.Printf("No channel keys loaded — GRP_TXT packets will not be decrypted") + } + + // Connect to each MQTT source + var clients []mqtt.Client + for _, source := range sources { + tag := source.Name + if tag == "" { + tag = source.Broker + } + + opts := mqtt.NewClientOptions(). + AddBroker(source.Broker). + SetAutoReconnect(true). + SetConnectRetry(true). + SetOrderMatters(true) + + if source.Username != "" { + opts.SetUsername(source.Username) + } + if source.Password != "" { + opts.SetPassword(source.Password) + } + if source.RejectUnauthorized != nil && !*source.RejectUnauthorized { + opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) + } else if strings.HasPrefix(source.Broker, "ssl://") { + opts.SetTLSConfig(&tls.Config{}) + } + + opts.SetOnConnectHandler(func(c mqtt.Client) { + log.Printf("MQTT [%s] connected to %s", tag, source.Broker) + topics := source.Topics + if len(topics) == 0 { + topics = []string{"meshcore/#"} + } + for _, t := range topics { + token := c.Subscribe(t, 0, nil) + token.Wait() + if token.Error() != nil { + log.Printf("MQTT [%s] subscribe error for %s: %v", tag, t, token.Error()) + } else { + log.Printf("MQTT [%s] subscribed to %s", tag, t) + } + } + }) + + opts.SetConnectionLostHandler(func(c mqtt.Client, err error) { + log.Printf("MQTT [%s] disconnected: %v", tag, err) + }) + + // Capture source for closure + src := source + opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { + handleMessage(store, tag, src, m, channelKeys) + }) + + client := mqtt.NewClient(opts) + token := client.Connect() + token.Wait() + if token.Error() != nil { + log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error()) + continue + } + clients = append(clients, client) + } + + if len(clients) == 0 { + log.Fatal("no MQTT connections established") + } + + log.Printf("Running — %d MQTT source(s) connected", len(clients)) + + // Wait for shutdown signal + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + <-sig + + log.Println("Shutting down...") + retentionTicker.Stop() + for _, c := range clients { + c.Disconnect(1000) + } + log.Println("Done.") +} + +func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string) { + defer func() { + if r := recover(); r != nil { + log.Printf("MQTT [%s] panic in handler: %v", tag, r) + } + }() + + topic := m.Topic() + parts := strings.Split(topic, "/") + + // IATA filter + if len(source.IATAFilter) > 0 && len(parts) > 1 { + region := parts[1] + matched := false + for _, f := range source.IATAFilter { + if f == region { + matched = true + break + } + } + if !matched { + return + } + } + + var msg map[string]interface{} + if err := json.Unmarshal(m.Payload(), &msg); err != nil { + return + } + + // Skip status/connection topics + if topic == "meshcore/status" || topic == "meshcore/events/connection" { + return + } + + // Status topic: meshcore///status + if len(parts) >= 4 && parts[3] == "status" { + observerID := parts[2] + name, _ := msg["origin"].(string) + iata := parts[1] + if err := store.UpsertObserver(observerID, name, iata); err != nil { + log.Printf("MQTT [%s] observer status error: %v", tag, err) + } + log.Printf("MQTT [%s] status: %s (%s)", tag, firstNonEmpty(name, observerID), iata) + return + } + + // Format 1: Raw packet (meshcoretomqtt / Cisien format) + rawHex, _ := msg["raw"].(string) + if rawHex != "" { + decoded, err := DecodePacket(rawHex, channelKeys) + if err != nil { + log.Printf("MQTT [%s] decode error: %v", tag, err) + return + } + + observerID := "" + region := "" + if len(parts) > 2 { + observerID = parts[2] + } + if len(parts) > 1 { + region = parts[1] + } + + mqttMsg := &MQTTPacketMessage{Raw: rawHex} + if v, ok := msg["SNR"]; ok { + if f, ok := toFloat64(v); ok { + mqttMsg.SNR = &f + } + } + if v, ok := msg["RSSI"]; ok { + if f, ok := toFloat64(v); ok { + mqttMsg.RSSI = &f + } + } + if v, ok := msg["origin"].(string); ok { + mqttMsg.Origin = v + } + + pktData := BuildPacketData(mqttMsg, decoded, observerID, region) + isNew, err := store.InsertTransmission(pktData) + if err != nil { + log.Printf("MQTT [%s] db insert error: %v", tag, err) + } + + // Process ADVERT → upsert node + if decoded.Header.PayloadTypeName == "ADVERT" && decoded.Payload.PubKey != "" { + ok, reason := ValidateAdvert(&decoded.Payload) + if ok { + role := advertRole(decoded.Payload.Flags) + if err := store.UpsertNode(decoded.Payload.PubKey, decoded.Payload.Name, role, decoded.Payload.Lat, decoded.Payload.Lon, pktData.Timestamp); err != nil { + log.Printf("MQTT [%s] node upsert error: %v", tag, err) + } + if isNew { + if err := store.IncrementAdvertCount(decoded.Payload.PubKey); err != nil { + log.Printf("MQTT [%s] advert count error: %v", tag, err) + } + } + } else { + log.Printf("MQTT [%s] skipping corrupted ADVERT: %s", tag, reason) + } + } + + // Upsert observer + if observerID != "" { + origin, _ := msg["origin"].(string) + if err := store.UpsertObserver(observerID, origin, region); err != nil { + log.Printf("MQTT [%s] observer upsert error: %v", tag, err) + } + } + + return + } + + // Format 2: Companion bridge channel message (meshcore/message/channel/) + if strings.HasPrefix(topic, "meshcore/message/channel/") { + text, _ := msg["text"].(string) + if text == "" { + return + } + + channelIdx := "" + if len(parts) >= 4 { + channelIdx = parts[3] + } + if ci, ok := msg["channel_idx"]; ok { + channelIdx = fmt.Sprintf("%v", ci) + } + + // Extract sender from "Name: message" format + sender := "" + if idx := strings.Index(text, ": "); idx > 0 && idx < 50 { + sender = text[:idx] + } + + channelName := fmt.Sprintf("ch%s", channelIdx) + + // Build decoded JSON matching Node.js CHAN format + channelMsg := map[string]interface{}{ + "type": "CHAN", + "channel": channelName, + "text": text, + "sender": sender, + } + if st, ok := msg["sender_timestamp"]; ok { + channelMsg["sender_timestamp"] = st + } + + decodedJSON, _ := json.Marshal(channelMsg) + + now := time.Now().UTC().Format(time.RFC3339) + hashInput := fmt.Sprintf("ch:%s:%s:%s", channelIdx, text, now) + h := sha256.Sum256([]byte(hashInput)) + hash := hex.EncodeToString(h[:])[:16] + + var snr, rssi *float64 + if v, ok := msg["SNR"]; ok { + if f, ok := toFloat64(v); ok { + snr = &f + } + } else if v, ok := msg["snr"]; ok { + if f, ok := toFloat64(v); ok { + snr = &f + } + } + if v, ok := msg["RSSI"]; ok { + if f, ok := toFloat64(v); ok { + rssi = &f + } + } else if v, ok := msg["rssi"]; ok { + if f, ok := toFloat64(v); ok { + rssi = &f + } + } + + pktData := &PacketData{ + Timestamp: now, + ObserverID: "companion", + ObserverName: "L1 Pro (BLE)", + SNR: snr, + RSSI: rssi, + Hash: hash, + RouteType: 1, // FLOOD + PayloadType: 5, // GRP_TXT + PathJSON: "[]", + DecodedJSON: string(decodedJSON), + } + + if _, err := store.InsertTransmission(pktData); err != nil { + log.Printf("MQTT [%s] channel insert error: %v", tag, err) + } + + // Upsert sender as a companion node + if sender != "" { + senderKey := "sender-" + strings.ToLower(sender) + if err := store.UpsertNode(senderKey, sender, "companion", nil, nil, now); err != nil { + log.Printf("MQTT [%s] sender node upsert error: %v", tag, err) + } + } + + log.Printf("MQTT [%s] channel message: ch%s from %s", tag, channelIdx, firstNonEmpty(sender, "unknown")) + return + } + + // Format 2b: Companion bridge direct message (meshcore/message/direct/) + if strings.HasPrefix(topic, "meshcore/message/direct/") { + text, _ := msg["text"].(string) + if text == "" { + return + } + + sender := "" + if idx := strings.Index(text, ": "); idx > 0 && idx < 50 { + sender = text[:idx] + } + + dm := map[string]interface{}{ + "type": "DM", + "text": text, + "sender": sender, + } + if st, ok := msg["sender_timestamp"]; ok { + dm["sender_timestamp"] = st + } + + decodedJSON, _ := json.Marshal(dm) + + now := time.Now().UTC().Format(time.RFC3339) + hashInput := fmt.Sprintf("dm:%s:%s", text, now) + h := sha256.Sum256([]byte(hashInput)) + hash := hex.EncodeToString(h[:])[:16] + + var snr, rssi *float64 + if v, ok := msg["SNR"]; ok { + if f, ok := toFloat64(v); ok { + snr = &f + } + } else if v, ok := msg["snr"]; ok { + if f, ok := toFloat64(v); ok { + snr = &f + } + } + if v, ok := msg["RSSI"]; ok { + if f, ok := toFloat64(v); ok { + rssi = &f + } + } else if v, ok := msg["rssi"]; ok { + if f, ok := toFloat64(v); ok { + rssi = &f + } + } + + pktData := &PacketData{ + Timestamp: now, + ObserverID: "companion", + ObserverName: "L1 Pro (BLE)", + SNR: snr, + RSSI: rssi, + Hash: hash, + RouteType: 1, // FLOOD + PayloadType: 2, // TXT_MSG + PathJSON: "[]", + DecodedJSON: string(decodedJSON), + } + + if _, err := store.InsertTransmission(pktData); err != nil { + log.Printf("MQTT [%s] DM insert error: %v", tag, err) + } + + log.Printf("MQTT [%s] direct message from %s", tag, firstNonEmpty(sender, "unknown")) + return + } +} + +func toFloat64(v interface{}) (float64, bool) { + switch n := v.(type) { + case float64: + return n, true + case float32: + return float64(n), true + case int: + return float64(n), true + case int64: + return float64(n), true + case json.Number: + f, err := n.Float64() + return f, err == nil + default: + return 0, false + } +} + +func firstNonEmpty(vals ...string) string { + for _, v := range vals { + if v != "" { + return v + } + } + return "" +} + +// loadChannelKeys loads channel decryption keys from config and/or a JSON file. +// Priority: CHANNEL_KEYS_PATH env var > cfg.ChannelKeysPath > channel-rainbow.json next to config. +func loadChannelKeys(cfg *Config, configPath string) map[string]string { + keys := make(map[string]string) + + // Determine file path for rainbow keys + keysPath := os.Getenv("CHANNEL_KEYS_PATH") + if keysPath == "" { + keysPath = cfg.ChannelKeysPath + } + if keysPath == "" { + // Default: look for channel-rainbow.json next to config file + keysPath = filepath.Join(filepath.Dir(configPath), "channel-rainbow.json") + } + + if data, err := os.ReadFile(keysPath); err == nil { + var fileKeys map[string]string + if err := json.Unmarshal(data, &fileKeys); err == nil { + for k, v := range fileKeys { + keys[k] = v + } + log.Printf("Loaded %d channel keys from %s", len(fileKeys), keysPath) + } else { + log.Printf("Warning: failed to parse channel keys file %s: %v", keysPath, err) + } + } + + // Merge inline config keys (override file keys) + for k, v := range cfg.ChannelKeys { + keys[k] = v + } + + return keys +} + +// Version info (set via ldflags) +var version = "dev" + +func init() { + if len(os.Args) > 1 && os.Args[1] == "--version" { + fmt.Println("meshcore-ingestor", version) + os.Exit(0) + } +} From 9751141ffc3fa419e0b1225164bb9c78711b056e Mon Sep 17 00:00:00 2001 From: you Date: Sat, 28 Mar 2026 16:36:50 +0000 Subject: [PATCH 2/4] feat: add observability metrics and concurrency tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Observability: - Add DBStats struct with atomic counters for tx_inserted, tx_dupes, obs_inserted, node_upserts, observer_upserts, write_errors - Log SQLite config on startup (busy_timeout, max_open_conns, journal) - Periodic stats logging every 5 minutes + final stats on shutdown - Instrument all write paths with counter increments Tests: - TestConcurrentWrites: 20 goroutines × 50 writes (1000 total) with interleaved InsertTransmission + UpsertNode + UpsertObserver calls. Verifies zero errors and data integrity under concurrent load. - TestDBStats: verifies counter accuracy for inserts, duplicates, upserts, and that LogStats does not panic --- cmd/ingestor/db.go | 46 ++++++++++- cmd/ingestor/db_test.go | 164 ++++++++++++++++++++++++++++++++++++++++ cmd/ingestor/main.go | 10 +++ 3 files changed, 219 insertions(+), 1 deletion(-) diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 230173f3..0be6affd 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -7,14 +7,26 @@ import ( "log" "os" "path/filepath" + "sync/atomic" "time" _ "modernc.org/sqlite" ) +// DBStats tracks operational metrics for the ingestor database. +type DBStats struct { + TransmissionsInserted atomic.Int64 + ObservationsInserted atomic.Int64 + DuplicateTransmissions atomic.Int64 + NodeUpserts atomic.Int64 + ObserverUpserts atomic.Int64 + WriteErrors atomic.Int64 +} + // Store wraps the SQLite database for packet ingestion. type Store struct { - db *sql.DB + db *sql.DB + Stats DBStats stmtGetTxByHash *sql.Stmt stmtInsertTransmission *sql.Stmt @@ -44,6 +56,7 @@ func OpenStore(dbPath string) (*Store, error) { } db.SetMaxOpenConns(1) + log.Printf("SQLite config: busy_timeout=5000ms, max_open_conns=1, journal=WAL") if err := applySchema(db); err != nil { return nil, fmt.Errorf("applying schema: %w", err) @@ -279,9 +292,15 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) { data.DecodedJSON, ) if err != nil { + s.Stats.WriteErrors.Add(1) return false, fmt.Errorf("insert transmission: %w", err) } txID, _ = result.LastInsertId() + s.Stats.TransmissionsInserted.Add(1) + } + + if !isNew { + s.Stats.DuplicateTransmissions.Add(1) } // Resolve observer_idx @@ -306,7 +325,10 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) { data.PathJSON, epochTs, ) if err != nil { + s.Stats.WriteErrors.Add(1) log.Printf("[db] observation insert (non-fatal): %v", err) + } else { + s.Stats.ObservationsInserted.Add(1) } return isNew, nil @@ -322,6 +344,11 @@ func (s *Store) UpsertNode(pubKey, name, role string, lat, lon *float64, lastSee pubKey, name, role, lat, lon, now, now, name, role, lat, lon, now, ) + if err != nil { + s.Stats.WriteErrors.Add(1) + } else { + s.Stats.NodeUpserts.Add(1) + } return err } @@ -338,6 +365,11 @@ func (s *Store) UpsertObserver(id, name, iata string) error { id, name, iata, now, now, name, iata, now, ) + if err != nil { + s.Stats.WriteErrors.Add(1) + } else { + s.Stats.ObserverUpserts.Add(1) + } return err } @@ -346,6 +378,18 @@ func (s *Store) Close() error { return s.db.Close() } +// LogStats logs current operational metrics. +func (s *Store) LogStats() { + log.Printf("[stats] tx_inserted=%d tx_dupes=%d obs_inserted=%d node_upserts=%d observer_upserts=%d write_errors=%d", + s.Stats.TransmissionsInserted.Load(), + s.Stats.DuplicateTransmissions.Load(), + s.Stats.ObservationsInserted.Load(), + s.Stats.NodeUpserts.Load(), + s.Stats.ObserverUpserts.Load(), + s.Stats.WriteErrors.Load(), + ) +} + // MoveStaleNodes moves nodes not seen in nodeDays to the inactive_nodes table. // Returns the number of nodes moved. func (s *Store) MoveStaleNodes(nodeDays int) (int64, error) { diff --git a/cmd/ingestor/db_test.go b/cmd/ingestor/db_test.go index 749139a9..e45ad2f9 100644 --- a/cmd/ingestor/db_test.go +++ b/cmd/ingestor/db_test.go @@ -1,10 +1,12 @@ package main import ( + "fmt" "os" "path/filepath" "strings" "testing" + "time" ) func tempDBPath(t *testing.T) string { @@ -626,3 +628,165 @@ func TestSchemaCompatibility(t *testing.T) { } } } + +func TestConcurrentWrites(t *testing.T) { + s, err := OpenStore(tempDBPath(t)) + if err != nil { + t.Fatal(err) + } + defer s.Close() + + // Pre-create an observer for observer_idx resolution + if err := s.UpsertObserver("obs1", "Observer1", "SJC"); err != nil { + t.Fatal(err) + } + + const goroutines = 20 + const writesPerGoroutine = 50 + + errCh := make(chan error, goroutines*writesPerGoroutine) + done := make(chan struct{}) + + for g := 0; g < goroutines; g++ { + go func(gIdx int) { + defer func() { done <- struct{}{} }() + for i := 0; i < writesPerGoroutine; i++ { + hash := fmt.Sprintf("concurrent_%d_%d_____", gIdx, i) // pad to 16+ chars + snr := 5.0 + rssi := -100.0 + data := &PacketData{ + RawHex: "0A00D69F", + Timestamp: time.Now().UTC().Format(time.RFC3339), + ObserverID: "obs1", + Hash: hash[:16], + RouteType: 2, + PayloadType: 4, // ADVERT + PathJSON: "[]", + DecodedJSON: `{"type":"ADVERT"}`, + SNR: &snr, + RSSI: &rssi, + } + if _, err := s.InsertTransmission(data); err != nil { + errCh <- fmt.Errorf("goroutine %d write %d: %w", gIdx, i, err) + return + } + // Also do node + observer upserts to simulate full pipeline + lat := 37.0 + lon := -122.0 + pubKey := fmt.Sprintf("node_%d_%d________", gIdx, i) + if err := s.UpsertNode(pubKey[:16], "Node", "repeater", &lat, &lon, data.Timestamp); err != nil { + errCh <- fmt.Errorf("goroutine %d node upsert %d: %w", gIdx, i, err) + return + } + obsID := fmt.Sprintf("obs_%d_%d__________", gIdx, i) + if err := s.UpsertObserver(obsID[:16], "Obs", "SJC"); err != nil { + errCh <- fmt.Errorf("goroutine %d observer upsert %d: %w", gIdx, i, err) + return + } + } + }(g) + } + + // Wait for all goroutines + for g := 0; g < goroutines; g++ { + <-done + } + close(errCh) + + var errors []error + for err := range errCh { + errors = append(errors, err) + } + + if len(errors) > 0 { + t.Errorf("got %d errors from %d concurrent writers (first: %v)", len(errors), goroutines, errors[0]) + } + + // Verify data integrity + var txCount, obsCount, nodeCount, observerCount int + s.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount) + s.db.QueryRow("SELECT COUNT(*) FROM observations").Scan(&obsCount) + s.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&nodeCount) + s.db.QueryRow("SELECT COUNT(*) FROM observers").Scan(&observerCount) + + expectedTx := goroutines * writesPerGoroutine + if txCount != expectedTx { + t.Errorf("transmissions count=%d, want %d", txCount, expectedTx) + } + if obsCount != expectedTx { + t.Errorf("observations count=%d, want %d", obsCount, expectedTx) + } + + t.Logf("Concurrent write test: %d goroutines × %d writes = %d total, 0 errors", + goroutines, writesPerGoroutine, goroutines*writesPerGoroutine) + t.Logf("Stats: tx_inserted=%d tx_dupes=%d obs_inserted=%d write_errors=%d", + s.Stats.TransmissionsInserted.Load(), + s.Stats.DuplicateTransmissions.Load(), + s.Stats.ObservationsInserted.Load(), + s.Stats.WriteErrors.Load(), + ) +} + +func TestDBStats(t *testing.T) { + s, err := OpenStore(tempDBPath(t)) + if err != nil { + t.Fatal(err) + } + defer s.Close() + + // Initial stats should be zero + if s.Stats.TransmissionsInserted.Load() != 0 { + t.Error("initial TransmissionsInserted should be 0") + } + if s.Stats.WriteErrors.Load() != 0 { + t.Error("initial WriteErrors should be 0") + } + + // Insert a transmission + data := &PacketData{ + RawHex: "0A00D69F", + Timestamp: "2026-03-28T00:00:00Z", + Hash: "stats_test_12345", + RouteType: 2, + PathJSON: "[]", + } + if _, err := s.InsertTransmission(data); err != nil { + t.Fatal(err) + } + + if s.Stats.TransmissionsInserted.Load() != 1 { + t.Errorf("TransmissionsInserted=%d, want 1", s.Stats.TransmissionsInserted.Load()) + } + if s.Stats.ObservationsInserted.Load() != 1 { + t.Errorf("ObservationsInserted=%d, want 1", s.Stats.ObservationsInserted.Load()) + } + + // Insert duplicate + if _, err := s.InsertTransmission(data); err != nil { + t.Fatal(err) + } + if s.Stats.DuplicateTransmissions.Load() != 1 { + t.Errorf("DuplicateTransmissions=%d, want 1", s.Stats.DuplicateTransmissions.Load()) + } + + // Node upsert + lat := 37.0 + lon := -122.0 + if err := s.UpsertNode("pk1", "Node1", "repeater", &lat, &lon, "2026-03-28T00:00:00Z"); err != nil { + t.Fatal(err) + } + if s.Stats.NodeUpserts.Load() != 1 { + t.Errorf("NodeUpserts=%d, want 1", s.Stats.NodeUpserts.Load()) + } + + // Observer upsert + if err := s.UpsertObserver("obs1", "Obs1", "SJC"); err != nil { + t.Fatal(err) + } + if s.Stats.ObserverUpserts.Load() != 1 { + t.Errorf("ObserverUpserts=%d, want 1", s.Stats.ObserverUpserts.Load()) + } + + // LogStats should not panic + s.LogStats() +} diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 8503372a..6d548429 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -54,6 +54,14 @@ func main() { } }() + // Periodic stats logging (every 5 minutes) + statsTicker := time.NewTicker(5 * time.Minute) + go func() { + for range statsTicker.C { + store.LogStats() + } + }() + channelKeys := loadChannelKeys(cfg, *configPath) if len(channelKeys) > 0 { log.Printf("Loaded %d channel keys for GRP_TXT decryption", len(channelKeys)) @@ -137,6 +145,8 @@ func main() { log.Println("Shutting down...") retentionTicker.Stop() + statsTicker.Stop() + store.LogStats() // final stats on shutdown for _, c := range clients { c.Disconnect(1000) } From cef8156a86d5a96d80b1656afcaf96824e599f63 Mon Sep 17 00:00:00 2001 From: you Date: Sat, 28 Mar 2026 16:37:56 +0000 Subject: [PATCH 3/4] fix: set MaxIdleConns(1) to match MaxOpenConns(1) Prevents unnecessary connection close/reopen churn from the default MaxIdleConns(2) when only 1 connection is ever open. --- cmd/ingestor/db.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 0be6affd..44bca96e 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -56,7 +56,8 @@ func OpenStore(dbPath string) (*Store, error) { } db.SetMaxOpenConns(1) - log.Printf("SQLite config: busy_timeout=5000ms, max_open_conns=1, journal=WAL") + db.SetMaxIdleConns(1) + log.Printf("SQLite config: busy_timeout=5000ms, max_open_conns=1, max_idle_conns=1, journal=WAL") if err := applySchema(db); err != nil { return nil, fmt.Errorf("applying schema: %w", err) From 331dc0090ee5d9fe7888c3c3a05ae593c03aac60 Mon Sep 17 00:00:00 2001 From: you Date: Sat, 28 Mar 2026 16:54:06 +0000 Subject: [PATCH 4/4] test: add load test with throughput and latency metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TestLoadTestThroughput: 1000 messages × 4 writes each = 4000 writes, 20 concurrent goroutines. Reports msgs/sec, p50/p95/p99 latency, SQLITE_BUSY count, and total errors. Hard-asserts zero BUSY errors. --- cmd/ingestor/db_test.go | 143 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) diff --git a/cmd/ingestor/db_test.go b/cmd/ingestor/db_test.go index e45ad2f9..c5afb52c 100644 --- a/cmd/ingestor/db_test.go +++ b/cmd/ingestor/db_test.go @@ -4,7 +4,9 @@ import ( "fmt" "os" "path/filepath" + "sort" "strings" + "sync/atomic" "testing" "time" ) @@ -790,3 +792,144 @@ func TestDBStats(t *testing.T) { // LogStats should not panic s.LogStats() } + +func TestLoadTestThroughput(t *testing.T) { + s, err := OpenStore(tempDBPath(t)) + if err != nil { + t.Fatal(err) + } + defer s.Close() + + // Pre-create observer + if err := s.UpsertObserver("obs1", "Observer1", "SJC"); err != nil { + t.Fatal(err) + } + + const totalMessages = 1000 + const goroutines = 20 + perGoroutine := totalMessages / goroutines + + // Simulate full pipeline: InsertTransmission + UpsertNode + UpsertObserver + IncrementAdvertCount + // This matches the real handleMessage write pattern for ADVERT packets + latencies := make([]time.Duration, totalMessages) + var busyErrors atomic.Int64 + var totalErrors atomic.Int64 + errCh := make(chan error, totalMessages) + done := make(chan struct{}) + + start := time.Now() + + for g := 0; g < goroutines; g++ { + go func(gIdx int) { + defer func() { done <- struct{}{} }() + for i := 0; i < perGoroutine; i++ { + msgStart := time.Now() + idx := gIdx*perGoroutine + i + hash := fmt.Sprintf("load_%04d_%04d____", gIdx, i) + snr := 5.0 + rssi := -100.0 + + data := &PacketData{ + RawHex: "0A00D69F", + Timestamp: time.Now().UTC().Format(time.RFC3339), + ObserverID: "obs1", + Hash: hash[:16], + RouteType: 2, + PayloadType: 4, + PathJSON: "[]", + DecodedJSON: `{"type":"ADVERT","pubKey":"` + hash[:16] + `"}`, + SNR: &snr, + RSSI: &rssi, + } + + _, err := s.InsertTransmission(data) + if err != nil { + totalErrors.Add(1) + if strings.Contains(err.Error(), "database is locked") || strings.Contains(err.Error(), "SQLITE_BUSY") { + busyErrors.Add(1) + } + errCh <- err + continue + } + + lat := 37.0 + float64(gIdx)*0.001 + lon := -122.0 + float64(i)*0.001 + pubKey := fmt.Sprintf("node_%04d_%04d____", gIdx, i) + if err := s.UpsertNode(pubKey[:16], "Node", "repeater", &lat, &lon, data.Timestamp); err != nil { + totalErrors.Add(1) + if strings.Contains(err.Error(), "locked") || strings.Contains(err.Error(), "BUSY") { + busyErrors.Add(1) + } + } + + if err := s.IncrementAdvertCount(pubKey[:16]); err != nil { + totalErrors.Add(1) + } + + obsID := fmt.Sprintf("obs_%04d_%04d_____", gIdx, i) + if err := s.UpsertObserver(obsID[:16], "Obs", "SJC"); err != nil { + totalErrors.Add(1) + if strings.Contains(err.Error(), "locked") || strings.Contains(err.Error(), "BUSY") { + busyErrors.Add(1) + } + } + + latencies[idx] = time.Since(msgStart) + } + }(g) + } + + for g := 0; g < goroutines; g++ { + <-done + } + close(errCh) + elapsed := time.Since(start) + + // Calculate p50, p95, p99 + validLatencies := make([]time.Duration, 0, totalMessages) + for _, l := range latencies { + if l > 0 { + validLatencies = append(validLatencies, l) + } + } + sort.Slice(validLatencies, func(i, j int) bool { return validLatencies[i] < validLatencies[j] }) + + p50 := validLatencies[len(validLatencies)*50/100] + p95 := validLatencies[len(validLatencies)*95/100] + p99 := validLatencies[len(validLatencies)*99/100] + msgsPerSec := float64(totalMessages) / elapsed.Seconds() + + t.Logf("=== LOAD TEST RESULTS ===") + t.Logf("Messages: %d (%d goroutines × %d each)", totalMessages, goroutines, perGoroutine) + t.Logf("Writes/msg: 4 (InsertTx + UpsertNode + IncrAdvertCount + UpsertObserver)") + t.Logf("Total writes: %d", totalMessages*4) + t.Logf("Duration: %s", elapsed.Round(time.Millisecond)) + t.Logf("Throughput: %.1f msgs/sec (%.1f writes/sec)", msgsPerSec, msgsPerSec*4) + t.Logf("Latency p50: %s", p50.Round(time.Microsecond)) + t.Logf("Latency p95: %s", p95.Round(time.Microsecond)) + t.Logf("Latency p99: %s", p99.Round(time.Microsecond)) + t.Logf("SQLITE_BUSY: %d", busyErrors.Load()) + t.Logf("Total errors: %d", totalErrors.Load()) + t.Logf("Stats: tx=%d dupes=%d obs=%d nodes=%d observers=%d write_err=%d", + s.Stats.TransmissionsInserted.Load(), + s.Stats.DuplicateTransmissions.Load(), + s.Stats.ObservationsInserted.Load(), + s.Stats.NodeUpserts.Load(), + s.Stats.ObserverUpserts.Load(), + s.Stats.WriteErrors.Load(), + ) + + // Hard assertions + if busyErrors.Load() > 0 { + t.Errorf("SQLITE_BUSY errors: %d (expected 0)", busyErrors.Load()) + } + if totalErrors.Load() > 0 { + t.Errorf("Total errors: %d (expected 0)", totalErrors.Load()) + } + + var txCount int + s.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount) + if txCount != totalMessages { + t.Errorf("transmissions=%d, want %d", txCount, totalMessages) + } +}