mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-06-05 00:01:21 +00:00
Merge pull request #214 from Kpa-clawbot/fix/sqlite-write-concurrency
Reviewed by Kobayashi — LGTM. Fixes SQLite BUSY contention with busy_timeout + single connection serialization.
This commit is contained in:
+469
-422
@@ -1,422 +1,469 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
// Store wraps the SQLite database for packet ingestion.
|
||||
type Store struct {
|
||||
db *sql.DB
|
||||
|
||||
stmtGetTxByHash *sql.Stmt
|
||||
stmtInsertTransmission *sql.Stmt
|
||||
stmtUpdateTxFirstSeen *sql.Stmt
|
||||
stmtInsertObservation *sql.Stmt
|
||||
stmtUpsertNode *sql.Stmt
|
||||
stmtIncrementAdvertCount *sql.Stmt
|
||||
stmtUpsertObserver *sql.Stmt
|
||||
stmtGetObserverRowid *sql.Stmt
|
||||
}
|
||||
|
||||
// OpenStore opens or creates a SQLite DB at the given path, applying the
|
||||
// v3 schema that is compatible with the Node.js server.
|
||||
func OpenStore(dbPath string) (*Store, error) {
|
||||
dir := filepath.Dir(dbPath)
|
||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||||
return nil, fmt.Errorf("creating data dir: %w", err)
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("opening db: %w", err)
|
||||
}
|
||||
|
||||
if err := db.Ping(); err != nil {
|
||||
return nil, fmt.Errorf("pinging db: %w", err)
|
||||
}
|
||||
|
||||
if err := applySchema(db); err != nil {
|
||||
return nil, fmt.Errorf("applying schema: %w", err)
|
||||
}
|
||||
|
||||
s := &Store{db: db}
|
||||
if err := s.prepareStatements(); err != nil {
|
||||
return nil, fmt.Errorf("preparing statements: %w", err)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func applySchema(db *sql.DB) error {
|
||||
schema := `
|
||||
CREATE TABLE IF NOT EXISTS nodes (
|
||||
public_key TEXT PRIMARY KEY,
|
||||
name TEXT,
|
||||
role TEXT,
|
||||
lat REAL,
|
||||
lon REAL,
|
||||
last_seen TEXT,
|
||||
first_seen TEXT,
|
||||
advert_count INTEGER DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS observers (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT,
|
||||
iata TEXT,
|
||||
last_seen TEXT,
|
||||
first_seen TEXT,
|
||||
packet_count INTEGER DEFAULT 0,
|
||||
model TEXT,
|
||||
firmware TEXT,
|
||||
client_version TEXT,
|
||||
radio TEXT,
|
||||
battery_mv INTEGER,
|
||||
uptime_secs INTEGER,
|
||||
noise_floor INTEGER
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen);
|
||||
CREATE INDEX IF NOT EXISTS idx_observers_last_seen ON observers(last_seen);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS inactive_nodes (
|
||||
public_key TEXT PRIMARY KEY,
|
||||
name TEXT,
|
||||
role TEXT,
|
||||
lat REAL,
|
||||
lon REAL,
|
||||
last_seen TEXT,
|
||||
first_seen TEXT,
|
||||
advert_count INTEGER DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_inactive_nodes_last_seen ON inactive_nodes(last_seen);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS transmissions (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
raw_hex TEXT NOT NULL,
|
||||
hash TEXT NOT NULL UNIQUE,
|
||||
first_seen TEXT NOT NULL,
|
||||
route_type INTEGER,
|
||||
payload_type INTEGER,
|
||||
payload_version INTEGER,
|
||||
decoded_json TEXT,
|
||||
created_at TEXT DEFAULT (datetime('now'))
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_transmissions_hash ON transmissions(hash);
|
||||
CREATE INDEX IF NOT EXISTS idx_transmissions_first_seen ON transmissions(first_seen);
|
||||
CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type);
|
||||
`
|
||||
if _, err := db.Exec(schema); err != nil {
|
||||
return fmt.Errorf("base schema: %w", err)
|
||||
}
|
||||
|
||||
// Create observations table (v3 schema)
|
||||
obsExists := false
|
||||
row := db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='observations'")
|
||||
var dummy string
|
||||
if row.Scan(&dummy) == nil {
|
||||
obsExists = true
|
||||
}
|
||||
|
||||
if !obsExists {
|
||||
obs := `
|
||||
CREATE TABLE observations (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
transmission_id INTEGER NOT NULL REFERENCES transmissions(id),
|
||||
observer_idx INTEGER,
|
||||
direction TEXT,
|
||||
snr REAL,
|
||||
rssi REAL,
|
||||
score INTEGER,
|
||||
path_json TEXT,
|
||||
timestamp INTEGER NOT NULL
|
||||
);
|
||||
CREATE INDEX idx_observations_transmission_id ON observations(transmission_id);
|
||||
CREATE INDEX idx_observations_observer_idx ON observations(observer_idx);
|
||||
CREATE INDEX idx_observations_timestamp ON observations(timestamp);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_observations_dedup ON observations(transmission_id, observer_idx, COALESCE(path_json, ''));
|
||||
`
|
||||
if _, err := db.Exec(obs); err != nil {
|
||||
return fmt.Errorf("observations schema: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// One-time migration: recalculate advert_count to count unique transmissions only
|
||||
db.Exec(`CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)`)
|
||||
var migDone int
|
||||
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'advert_count_unique_v1'")
|
||||
if row.Scan(&migDone) != nil {
|
||||
log.Println("[migration] Recalculating advert_count (unique transmissions only)...")
|
||||
db.Exec(`
|
||||
UPDATE nodes SET advert_count = (
|
||||
SELECT COUNT(*) FROM transmissions t
|
||||
WHERE t.payload_type = 4
|
||||
AND t.decoded_json LIKE '%' || nodes.public_key || '%'
|
||||
)
|
||||
`)
|
||||
db.Exec(`INSERT INTO _migrations (name) VALUES ('advert_count_unique_v1')`)
|
||||
log.Println("[migration] advert_count recalculated")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) prepareStatements() error {
|
||||
var err error
|
||||
|
||||
s.stmtGetTxByHash, err = s.db.Prepare("SELECT id, first_seen FROM transmissions WHERE hash = ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtInsertTransmission, err = s.db.Prepare(`
|
||||
INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtUpdateTxFirstSeen, err = s.db.Prepare("UPDATE transmissions SET first_seen = ? WHERE id = ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtInsertObservation, err = s.db.Prepare(`
|
||||
INSERT OR IGNORE INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtUpsertNode, err = s.db.Prepare(`
|
||||
INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(public_key) DO UPDATE SET
|
||||
name = COALESCE(?, name),
|
||||
role = COALESCE(?, role),
|
||||
lat = COALESCE(?, lat),
|
||||
lon = COALESCE(?, lon),
|
||||
last_seen = ?
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtIncrementAdvertCount, err = s.db.Prepare(`
|
||||
UPDATE nodes SET advert_count = advert_count + 1 WHERE public_key = ?
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtUpsertObserver, err = s.db.Prepare(`
|
||||
INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
|
||||
VALUES (?, ?, ?, ?, ?, 1)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
name = COALESCE(?, name),
|
||||
iata = COALESCE(?, iata),
|
||||
last_seen = ?,
|
||||
packet_count = packet_count + 1
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtGetObserverRowid, err = s.db.Prepare("SELECT rowid FROM observers WHERE id = ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// InsertTransmission inserts a decoded packet into transmissions + observations.
|
||||
// Returns true if a new transmission was created (not a duplicate hash).
|
||||
func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
|
||||
hash := data.Hash
|
||||
if hash == "" {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
now := data.Timestamp
|
||||
if now == "" {
|
||||
now = time.Now().UTC().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
var txID int64
|
||||
isNew := false
|
||||
|
||||
// Check for existing transmission
|
||||
var existingID int64
|
||||
var existingFirstSeen string
|
||||
err := s.stmtGetTxByHash.QueryRow(hash).Scan(&existingID, &existingFirstSeen)
|
||||
if err == nil {
|
||||
// Existing transmission
|
||||
txID = existingID
|
||||
if now < existingFirstSeen {
|
||||
_, _ = s.stmtUpdateTxFirstSeen.Exec(now, txID)
|
||||
}
|
||||
} else {
|
||||
// New transmission
|
||||
isNew = true
|
||||
result, err := s.stmtInsertTransmission.Exec(
|
||||
data.RawHex, hash, now,
|
||||
data.RouteType, data.PayloadType, data.PayloadVersion,
|
||||
data.DecodedJSON,
|
||||
)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("insert transmission: %w", err)
|
||||
}
|
||||
txID, _ = result.LastInsertId()
|
||||
}
|
||||
|
||||
// Resolve observer_idx
|
||||
var observerIdx *int64
|
||||
if data.ObserverID != "" {
|
||||
var rowid int64
|
||||
err := s.stmtGetObserverRowid.QueryRow(data.ObserverID).Scan(&rowid)
|
||||
if err == nil {
|
||||
observerIdx = &rowid
|
||||
}
|
||||
}
|
||||
|
||||
// Insert observation
|
||||
epochTs := time.Now().Unix()
|
||||
if t, err := time.Parse(time.RFC3339, now); err == nil {
|
||||
epochTs = t.Unix()
|
||||
}
|
||||
|
||||
_, err = s.stmtInsertObservation.Exec(
|
||||
txID, observerIdx, nil, // direction
|
||||
data.SNR, data.RSSI, nil, // score
|
||||
data.PathJSON, epochTs,
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("[db] observation insert (non-fatal): %v", err)
|
||||
}
|
||||
|
||||
return isNew, nil
|
||||
}
|
||||
|
||||
// UpsertNode inserts or updates a node.
|
||||
func (s *Store) UpsertNode(pubKey, name, role string, lat, lon *float64, lastSeen string) error {
|
||||
now := lastSeen
|
||||
if now == "" {
|
||||
now = time.Now().UTC().Format(time.RFC3339)
|
||||
}
|
||||
_, err := s.stmtUpsertNode.Exec(
|
||||
pubKey, name, role, lat, lon, now, now,
|
||||
name, role, lat, lon, now,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// IncrementAdvertCount increments advert_count for a node by public key.
|
||||
func (s *Store) IncrementAdvertCount(pubKey string) error {
|
||||
_, err := s.stmtIncrementAdvertCount.Exec(pubKey)
|
||||
return err
|
||||
}
|
||||
|
||||
// UpsertObserver inserts or updates an observer.
|
||||
func (s *Store) UpsertObserver(id, name, iata string) error {
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
_, err := s.stmtUpsertObserver.Exec(
|
||||
id, name, iata, now, now,
|
||||
name, iata, now,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// Close closes the database.
|
||||
func (s *Store) Close() error {
|
||||
return s.db.Close()
|
||||
}
|
||||
|
||||
// MoveStaleNodes moves nodes not seen in nodeDays to the inactive_nodes table.
|
||||
// Returns the number of nodes moved.
|
||||
func (s *Store) MoveStaleNodes(nodeDays int) (int64, error) {
|
||||
cutoff := time.Now().UTC().AddDate(0, 0, -nodeDays).Format(time.RFC3339)
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("begin tx: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
_, err = tx.Exec(`INSERT OR REPLACE INTO inactive_nodes SELECT * FROM nodes WHERE last_seen < ?`, cutoff)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("insert inactive: %w", err)
|
||||
}
|
||||
result, err := tx.Exec(`DELETE FROM nodes WHERE last_seen < ?`, cutoff)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("delete stale: %w", err)
|
||||
}
|
||||
moved, _ := result.RowsAffected()
|
||||
if err := tx.Commit(); err != nil {
|
||||
return 0, fmt.Errorf("commit: %w", err)
|
||||
}
|
||||
if moved > 0 {
|
||||
log.Printf("Moved %d node(s) to inactive_nodes (not seen in %d days)", moved, nodeDays)
|
||||
}
|
||||
return moved, nil
|
||||
}
|
||||
|
||||
// PacketData holds the data needed to insert a packet into the DB.
|
||||
type PacketData struct {
|
||||
RawHex string
|
||||
Timestamp string
|
||||
ObserverID string
|
||||
ObserverName string
|
||||
SNR *float64
|
||||
RSSI *float64
|
||||
Hash string
|
||||
RouteType int
|
||||
PayloadType int
|
||||
PayloadVersion int
|
||||
PathJSON string
|
||||
DecodedJSON string
|
||||
}
|
||||
|
||||
// MQTTPacketMessage is the JSON payload from an MQTT raw packet message.
|
||||
type MQTTPacketMessage struct {
|
||||
Raw string `json:"raw"`
|
||||
SNR *float64 `json:"SNR"`
|
||||
RSSI *float64 `json:"RSSI"`
|
||||
Origin string `json:"origin"`
|
||||
}
|
||||
|
||||
// BuildPacketData constructs a PacketData from a decoded packet and MQTT message.
|
||||
func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, region string) *PacketData {
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
pathJSON := "[]"
|
||||
if len(decoded.Path.Hops) > 0 {
|
||||
b, _ := json.Marshal(decoded.Path.Hops)
|
||||
pathJSON = string(b)
|
||||
}
|
||||
|
||||
return &PacketData{
|
||||
RawHex: msg.Raw,
|
||||
Timestamp: now,
|
||||
ObserverID: observerID,
|
||||
ObserverName: msg.Origin,
|
||||
SNR: msg.SNR,
|
||||
RSSI: msg.RSSI,
|
||||
Hash: ComputeContentHash(msg.Raw),
|
||||
RouteType: decoded.Header.RouteType,
|
||||
PayloadType: decoded.Header.PayloadType,
|
||||
PayloadVersion: decoded.Header.PayloadVersion,
|
||||
PathJSON: pathJSON,
|
||||
DecodedJSON: PayloadJSON(&decoded.Payload),
|
||||
}
|
||||
}
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
// DBStats tracks operational metrics for the ingestor database.
|
||||
type DBStats struct {
|
||||
TransmissionsInserted atomic.Int64
|
||||
ObservationsInserted atomic.Int64
|
||||
DuplicateTransmissions atomic.Int64
|
||||
NodeUpserts atomic.Int64
|
||||
ObserverUpserts atomic.Int64
|
||||
WriteErrors atomic.Int64
|
||||
}
|
||||
|
||||
// Store wraps the SQLite database for packet ingestion.
|
||||
type Store struct {
|
||||
db *sql.DB
|
||||
Stats DBStats
|
||||
|
||||
stmtGetTxByHash *sql.Stmt
|
||||
stmtInsertTransmission *sql.Stmt
|
||||
stmtUpdateTxFirstSeen *sql.Stmt
|
||||
stmtInsertObservation *sql.Stmt
|
||||
stmtUpsertNode *sql.Stmt
|
||||
stmtIncrementAdvertCount *sql.Stmt
|
||||
stmtUpsertObserver *sql.Stmt
|
||||
stmtGetObserverRowid *sql.Stmt
|
||||
}
|
||||
|
||||
// OpenStore opens or creates a SQLite DB at the given path, applying the
|
||||
// v3 schema that is compatible with the Node.js server.
|
||||
func OpenStore(dbPath string) (*Store, error) {
|
||||
dir := filepath.Dir(dbPath)
|
||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||||
return nil, fmt.Errorf("creating data dir: %w", err)
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)&_pragma=busy_timeout(5000)")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("opening db: %w", err)
|
||||
}
|
||||
|
||||
if err := db.Ping(); err != nil {
|
||||
return nil, fmt.Errorf("pinging db: %w", err)
|
||||
}
|
||||
|
||||
db.SetMaxOpenConns(1)
|
||||
db.SetMaxIdleConns(1)
|
||||
log.Printf("SQLite config: busy_timeout=5000ms, max_open_conns=1, max_idle_conns=1, journal=WAL")
|
||||
|
||||
if err := applySchema(db); err != nil {
|
||||
return nil, fmt.Errorf("applying schema: %w", err)
|
||||
}
|
||||
|
||||
s := &Store{db: db}
|
||||
if err := s.prepareStatements(); err != nil {
|
||||
return nil, fmt.Errorf("preparing statements: %w", err)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func applySchema(db *sql.DB) error {
|
||||
schema := `
|
||||
CREATE TABLE IF NOT EXISTS nodes (
|
||||
public_key TEXT PRIMARY KEY,
|
||||
name TEXT,
|
||||
role TEXT,
|
||||
lat REAL,
|
||||
lon REAL,
|
||||
last_seen TEXT,
|
||||
first_seen TEXT,
|
||||
advert_count INTEGER DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS observers (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT,
|
||||
iata TEXT,
|
||||
last_seen TEXT,
|
||||
first_seen TEXT,
|
||||
packet_count INTEGER DEFAULT 0,
|
||||
model TEXT,
|
||||
firmware TEXT,
|
||||
client_version TEXT,
|
||||
radio TEXT,
|
||||
battery_mv INTEGER,
|
||||
uptime_secs INTEGER,
|
||||
noise_floor INTEGER
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen);
|
||||
CREATE INDEX IF NOT EXISTS idx_observers_last_seen ON observers(last_seen);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS inactive_nodes (
|
||||
public_key TEXT PRIMARY KEY,
|
||||
name TEXT,
|
||||
role TEXT,
|
||||
lat REAL,
|
||||
lon REAL,
|
||||
last_seen TEXT,
|
||||
first_seen TEXT,
|
||||
advert_count INTEGER DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_inactive_nodes_last_seen ON inactive_nodes(last_seen);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS transmissions (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
raw_hex TEXT NOT NULL,
|
||||
hash TEXT NOT NULL UNIQUE,
|
||||
first_seen TEXT NOT NULL,
|
||||
route_type INTEGER,
|
||||
payload_type INTEGER,
|
||||
payload_version INTEGER,
|
||||
decoded_json TEXT,
|
||||
created_at TEXT DEFAULT (datetime('now'))
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_transmissions_hash ON transmissions(hash);
|
||||
CREATE INDEX IF NOT EXISTS idx_transmissions_first_seen ON transmissions(first_seen);
|
||||
CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type);
|
||||
`
|
||||
if _, err := db.Exec(schema); err != nil {
|
||||
return fmt.Errorf("base schema: %w", err)
|
||||
}
|
||||
|
||||
// Create observations table (v3 schema)
|
||||
obsExists := false
|
||||
row := db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='observations'")
|
||||
var dummy string
|
||||
if row.Scan(&dummy) == nil {
|
||||
obsExists = true
|
||||
}
|
||||
|
||||
if !obsExists {
|
||||
obs := `
|
||||
CREATE TABLE observations (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
transmission_id INTEGER NOT NULL REFERENCES transmissions(id),
|
||||
observer_idx INTEGER,
|
||||
direction TEXT,
|
||||
snr REAL,
|
||||
rssi REAL,
|
||||
score INTEGER,
|
||||
path_json TEXT,
|
||||
timestamp INTEGER NOT NULL
|
||||
);
|
||||
CREATE INDEX idx_observations_transmission_id ON observations(transmission_id);
|
||||
CREATE INDEX idx_observations_observer_idx ON observations(observer_idx);
|
||||
CREATE INDEX idx_observations_timestamp ON observations(timestamp);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_observations_dedup ON observations(transmission_id, observer_idx, COALESCE(path_json, ''));
|
||||
`
|
||||
if _, err := db.Exec(obs); err != nil {
|
||||
return fmt.Errorf("observations schema: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// One-time migration: recalculate advert_count to count unique transmissions only
|
||||
db.Exec(`CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)`)
|
||||
var migDone int
|
||||
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'advert_count_unique_v1'")
|
||||
if row.Scan(&migDone) != nil {
|
||||
log.Println("[migration] Recalculating advert_count (unique transmissions only)...")
|
||||
db.Exec(`
|
||||
UPDATE nodes SET advert_count = (
|
||||
SELECT COUNT(*) FROM transmissions t
|
||||
WHERE t.payload_type = 4
|
||||
AND t.decoded_json LIKE '%' || nodes.public_key || '%'
|
||||
)
|
||||
`)
|
||||
db.Exec(`INSERT INTO _migrations (name) VALUES ('advert_count_unique_v1')`)
|
||||
log.Println("[migration] advert_count recalculated")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) prepareStatements() error {
|
||||
var err error
|
||||
|
||||
s.stmtGetTxByHash, err = s.db.Prepare("SELECT id, first_seen FROM transmissions WHERE hash = ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtInsertTransmission, err = s.db.Prepare(`
|
||||
INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtUpdateTxFirstSeen, err = s.db.Prepare("UPDATE transmissions SET first_seen = ? WHERE id = ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtInsertObservation, err = s.db.Prepare(`
|
||||
INSERT OR IGNORE INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtUpsertNode, err = s.db.Prepare(`
|
||||
INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(public_key) DO UPDATE SET
|
||||
name = COALESCE(?, name),
|
||||
role = COALESCE(?, role),
|
||||
lat = COALESCE(?, lat),
|
||||
lon = COALESCE(?, lon),
|
||||
last_seen = ?
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtIncrementAdvertCount, err = s.db.Prepare(`
|
||||
UPDATE nodes SET advert_count = advert_count + 1 WHERE public_key = ?
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtUpsertObserver, err = s.db.Prepare(`
|
||||
INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
|
||||
VALUES (?, ?, ?, ?, ?, 1)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
name = COALESCE(?, name),
|
||||
iata = COALESCE(?, iata),
|
||||
last_seen = ?,
|
||||
packet_count = packet_count + 1
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtGetObserverRowid, err = s.db.Prepare("SELECT rowid FROM observers WHERE id = ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// InsertTransmission inserts a decoded packet into transmissions + observations.
|
||||
// Returns true if a new transmission was created (not a duplicate hash).
|
||||
func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
|
||||
hash := data.Hash
|
||||
if hash == "" {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
now := data.Timestamp
|
||||
if now == "" {
|
||||
now = time.Now().UTC().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
var txID int64
|
||||
isNew := false
|
||||
|
||||
// Check for existing transmission
|
||||
var existingID int64
|
||||
var existingFirstSeen string
|
||||
err := s.stmtGetTxByHash.QueryRow(hash).Scan(&existingID, &existingFirstSeen)
|
||||
if err == nil {
|
||||
// Existing transmission
|
||||
txID = existingID
|
||||
if now < existingFirstSeen {
|
||||
_, _ = s.stmtUpdateTxFirstSeen.Exec(now, txID)
|
||||
}
|
||||
} else {
|
||||
// New transmission
|
||||
isNew = true
|
||||
result, err := s.stmtInsertTransmission.Exec(
|
||||
data.RawHex, hash, now,
|
||||
data.RouteType, data.PayloadType, data.PayloadVersion,
|
||||
data.DecodedJSON,
|
||||
)
|
||||
if err != nil {
|
||||
s.Stats.WriteErrors.Add(1)
|
||||
return false, fmt.Errorf("insert transmission: %w", err)
|
||||
}
|
||||
txID, _ = result.LastInsertId()
|
||||
s.Stats.TransmissionsInserted.Add(1)
|
||||
}
|
||||
|
||||
if !isNew {
|
||||
s.Stats.DuplicateTransmissions.Add(1)
|
||||
}
|
||||
|
||||
// Resolve observer_idx
|
||||
var observerIdx *int64
|
||||
if data.ObserverID != "" {
|
||||
var rowid int64
|
||||
err := s.stmtGetObserverRowid.QueryRow(data.ObserverID).Scan(&rowid)
|
||||
if err == nil {
|
||||
observerIdx = &rowid
|
||||
}
|
||||
}
|
||||
|
||||
// Insert observation
|
||||
epochTs := time.Now().Unix()
|
||||
if t, err := time.Parse(time.RFC3339, now); err == nil {
|
||||
epochTs = t.Unix()
|
||||
}
|
||||
|
||||
_, err = s.stmtInsertObservation.Exec(
|
||||
txID, observerIdx, nil, // direction
|
||||
data.SNR, data.RSSI, nil, // score
|
||||
data.PathJSON, epochTs,
|
||||
)
|
||||
if err != nil {
|
||||
s.Stats.WriteErrors.Add(1)
|
||||
log.Printf("[db] observation insert (non-fatal): %v", err)
|
||||
} else {
|
||||
s.Stats.ObservationsInserted.Add(1)
|
||||
}
|
||||
|
||||
return isNew, nil
|
||||
}
|
||||
|
||||
// UpsertNode inserts or updates a node.
|
||||
func (s *Store) UpsertNode(pubKey, name, role string, lat, lon *float64, lastSeen string) error {
|
||||
now := lastSeen
|
||||
if now == "" {
|
||||
now = time.Now().UTC().Format(time.RFC3339)
|
||||
}
|
||||
_, err := s.stmtUpsertNode.Exec(
|
||||
pubKey, name, role, lat, lon, now, now,
|
||||
name, role, lat, lon, now,
|
||||
)
|
||||
if err != nil {
|
||||
s.Stats.WriteErrors.Add(1)
|
||||
} else {
|
||||
s.Stats.NodeUpserts.Add(1)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// IncrementAdvertCount increments advert_count for a node by public key.
|
||||
func (s *Store) IncrementAdvertCount(pubKey string) error {
|
||||
_, err := s.stmtIncrementAdvertCount.Exec(pubKey)
|
||||
return err
|
||||
}
|
||||
|
||||
// UpsertObserver inserts or updates an observer.
|
||||
func (s *Store) UpsertObserver(id, name, iata string) error {
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
_, err := s.stmtUpsertObserver.Exec(
|
||||
id, name, iata, now, now,
|
||||
name, iata, now,
|
||||
)
|
||||
if err != nil {
|
||||
s.Stats.WriteErrors.Add(1)
|
||||
} else {
|
||||
s.Stats.ObserverUpserts.Add(1)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Close closes the database.
|
||||
func (s *Store) Close() error {
|
||||
return s.db.Close()
|
||||
}
|
||||
|
||||
// LogStats logs current operational metrics.
|
||||
func (s *Store) LogStats() {
|
||||
log.Printf("[stats] tx_inserted=%d tx_dupes=%d obs_inserted=%d node_upserts=%d observer_upserts=%d write_errors=%d",
|
||||
s.Stats.TransmissionsInserted.Load(),
|
||||
s.Stats.DuplicateTransmissions.Load(),
|
||||
s.Stats.ObservationsInserted.Load(),
|
||||
s.Stats.NodeUpserts.Load(),
|
||||
s.Stats.ObserverUpserts.Load(),
|
||||
s.Stats.WriteErrors.Load(),
|
||||
)
|
||||
}
|
||||
|
||||
// MoveStaleNodes moves nodes not seen in nodeDays to the inactive_nodes table.
|
||||
// Returns the number of nodes moved.
|
||||
func (s *Store) MoveStaleNodes(nodeDays int) (int64, error) {
|
||||
cutoff := time.Now().UTC().AddDate(0, 0, -nodeDays).Format(time.RFC3339)
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("begin tx: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
_, err = tx.Exec(`INSERT OR REPLACE INTO inactive_nodes SELECT * FROM nodes WHERE last_seen < ?`, cutoff)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("insert inactive: %w", err)
|
||||
}
|
||||
result, err := tx.Exec(`DELETE FROM nodes WHERE last_seen < ?`, cutoff)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("delete stale: %w", err)
|
||||
}
|
||||
moved, _ := result.RowsAffected()
|
||||
if err := tx.Commit(); err != nil {
|
||||
return 0, fmt.Errorf("commit: %w", err)
|
||||
}
|
||||
if moved > 0 {
|
||||
log.Printf("Moved %d node(s) to inactive_nodes (not seen in %d days)", moved, nodeDays)
|
||||
}
|
||||
return moved, nil
|
||||
}
|
||||
|
||||
// PacketData holds the data needed to insert a packet into the DB.
|
||||
type PacketData struct {
|
||||
RawHex string
|
||||
Timestamp string
|
||||
ObserverID string
|
||||
ObserverName string
|
||||
SNR *float64
|
||||
RSSI *float64
|
||||
Hash string
|
||||
RouteType int
|
||||
PayloadType int
|
||||
PayloadVersion int
|
||||
PathJSON string
|
||||
DecodedJSON string
|
||||
}
|
||||
|
||||
// MQTTPacketMessage is the JSON payload from an MQTT raw packet message.
|
||||
type MQTTPacketMessage struct {
|
||||
Raw string `json:"raw"`
|
||||
SNR *float64 `json:"SNR"`
|
||||
RSSI *float64 `json:"RSSI"`
|
||||
Origin string `json:"origin"`
|
||||
}
|
||||
|
||||
// BuildPacketData constructs a PacketData from a decoded packet and MQTT message.
|
||||
func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, region string) *PacketData {
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
pathJSON := "[]"
|
||||
if len(decoded.Path.Hops) > 0 {
|
||||
b, _ := json.Marshal(decoded.Path.Hops)
|
||||
pathJSON = string(b)
|
||||
}
|
||||
|
||||
return &PacketData{
|
||||
RawHex: msg.Raw,
|
||||
Timestamp: now,
|
||||
ObserverID: observerID,
|
||||
ObserverName: msg.Origin,
|
||||
SNR: msg.SNR,
|
||||
RSSI: msg.RSSI,
|
||||
Hash: ComputeContentHash(msg.Raw),
|
||||
RouteType: decoded.Header.RouteType,
|
||||
PayloadType: decoded.Header.PayloadType,
|
||||
PayloadVersion: decoded.Header.PayloadVersion,
|
||||
PathJSON: pathJSON,
|
||||
DecodedJSON: PayloadJSON(&decoded.Payload),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func tempDBPath(t *testing.T) string {
|
||||
@@ -626,3 +630,306 @@ func TestSchemaCompatibility(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentWrites(t *testing.T) {
|
||||
s, err := OpenStore(tempDBPath(t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
// Pre-create an observer for observer_idx resolution
|
||||
if err := s.UpsertObserver("obs1", "Observer1", "SJC"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
const goroutines = 20
|
||||
const writesPerGoroutine = 50
|
||||
|
||||
errCh := make(chan error, goroutines*writesPerGoroutine)
|
||||
done := make(chan struct{})
|
||||
|
||||
for g := 0; g < goroutines; g++ {
|
||||
go func(gIdx int) {
|
||||
defer func() { done <- struct{}{} }()
|
||||
for i := 0; i < writesPerGoroutine; i++ {
|
||||
hash := fmt.Sprintf("concurrent_%d_%d_____", gIdx, i) // pad to 16+ chars
|
||||
snr := 5.0
|
||||
rssi := -100.0
|
||||
data := &PacketData{
|
||||
RawHex: "0A00D69F",
|
||||
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
||||
ObserverID: "obs1",
|
||||
Hash: hash[:16],
|
||||
RouteType: 2,
|
||||
PayloadType: 4, // ADVERT
|
||||
PathJSON: "[]",
|
||||
DecodedJSON: `{"type":"ADVERT"}`,
|
||||
SNR: &snr,
|
||||
RSSI: &rssi,
|
||||
}
|
||||
if _, err := s.InsertTransmission(data); err != nil {
|
||||
errCh <- fmt.Errorf("goroutine %d write %d: %w", gIdx, i, err)
|
||||
return
|
||||
}
|
||||
// Also do node + observer upserts to simulate full pipeline
|
||||
lat := 37.0
|
||||
lon := -122.0
|
||||
pubKey := fmt.Sprintf("node_%d_%d________", gIdx, i)
|
||||
if err := s.UpsertNode(pubKey[:16], "Node", "repeater", &lat, &lon, data.Timestamp); err != nil {
|
||||
errCh <- fmt.Errorf("goroutine %d node upsert %d: %w", gIdx, i, err)
|
||||
return
|
||||
}
|
||||
obsID := fmt.Sprintf("obs_%d_%d__________", gIdx, i)
|
||||
if err := s.UpsertObserver(obsID[:16], "Obs", "SJC"); err != nil {
|
||||
errCh <- fmt.Errorf("goroutine %d observer upsert %d: %w", gIdx, i, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}(g)
|
||||
}
|
||||
|
||||
// Wait for all goroutines
|
||||
for g := 0; g < goroutines; g++ {
|
||||
<-done
|
||||
}
|
||||
close(errCh)
|
||||
|
||||
var errors []error
|
||||
for err := range errCh {
|
||||
errors = append(errors, err)
|
||||
}
|
||||
|
||||
if len(errors) > 0 {
|
||||
t.Errorf("got %d errors from %d concurrent writers (first: %v)", len(errors), goroutines, errors[0])
|
||||
}
|
||||
|
||||
// Verify data integrity
|
||||
var txCount, obsCount, nodeCount, observerCount int
|
||||
s.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount)
|
||||
s.db.QueryRow("SELECT COUNT(*) FROM observations").Scan(&obsCount)
|
||||
s.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&nodeCount)
|
||||
s.db.QueryRow("SELECT COUNT(*) FROM observers").Scan(&observerCount)
|
||||
|
||||
expectedTx := goroutines * writesPerGoroutine
|
||||
if txCount != expectedTx {
|
||||
t.Errorf("transmissions count=%d, want %d", txCount, expectedTx)
|
||||
}
|
||||
if obsCount != expectedTx {
|
||||
t.Errorf("observations count=%d, want %d", obsCount, expectedTx)
|
||||
}
|
||||
|
||||
t.Logf("Concurrent write test: %d goroutines × %d writes = %d total, 0 errors",
|
||||
goroutines, writesPerGoroutine, goroutines*writesPerGoroutine)
|
||||
t.Logf("Stats: tx_inserted=%d tx_dupes=%d obs_inserted=%d write_errors=%d",
|
||||
s.Stats.TransmissionsInserted.Load(),
|
||||
s.Stats.DuplicateTransmissions.Load(),
|
||||
s.Stats.ObservationsInserted.Load(),
|
||||
s.Stats.WriteErrors.Load(),
|
||||
)
|
||||
}
|
||||
|
||||
func TestDBStats(t *testing.T) {
|
||||
s, err := OpenStore(tempDBPath(t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
// Initial stats should be zero
|
||||
if s.Stats.TransmissionsInserted.Load() != 0 {
|
||||
t.Error("initial TransmissionsInserted should be 0")
|
||||
}
|
||||
if s.Stats.WriteErrors.Load() != 0 {
|
||||
t.Error("initial WriteErrors should be 0")
|
||||
}
|
||||
|
||||
// Insert a transmission
|
||||
data := &PacketData{
|
||||
RawHex: "0A00D69F",
|
||||
Timestamp: "2026-03-28T00:00:00Z",
|
||||
Hash: "stats_test_12345",
|
||||
RouteType: 2,
|
||||
PathJSON: "[]",
|
||||
}
|
||||
if _, err := s.InsertTransmission(data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if s.Stats.TransmissionsInserted.Load() != 1 {
|
||||
t.Errorf("TransmissionsInserted=%d, want 1", s.Stats.TransmissionsInserted.Load())
|
||||
}
|
||||
if s.Stats.ObservationsInserted.Load() != 1 {
|
||||
t.Errorf("ObservationsInserted=%d, want 1", s.Stats.ObservationsInserted.Load())
|
||||
}
|
||||
|
||||
// Insert duplicate
|
||||
if _, err := s.InsertTransmission(data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if s.Stats.DuplicateTransmissions.Load() != 1 {
|
||||
t.Errorf("DuplicateTransmissions=%d, want 1", s.Stats.DuplicateTransmissions.Load())
|
||||
}
|
||||
|
||||
// Node upsert
|
||||
lat := 37.0
|
||||
lon := -122.0
|
||||
if err := s.UpsertNode("pk1", "Node1", "repeater", &lat, &lon, "2026-03-28T00:00:00Z"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if s.Stats.NodeUpserts.Load() != 1 {
|
||||
t.Errorf("NodeUpserts=%d, want 1", s.Stats.NodeUpserts.Load())
|
||||
}
|
||||
|
||||
// Observer upsert
|
||||
if err := s.UpsertObserver("obs1", "Obs1", "SJC"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if s.Stats.ObserverUpserts.Load() != 1 {
|
||||
t.Errorf("ObserverUpserts=%d, want 1", s.Stats.ObserverUpserts.Load())
|
||||
}
|
||||
|
||||
// LogStats should not panic
|
||||
s.LogStats()
|
||||
}
|
||||
|
||||
func TestLoadTestThroughput(t *testing.T) {
|
||||
s, err := OpenStore(tempDBPath(t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
// Pre-create observer
|
||||
if err := s.UpsertObserver("obs1", "Observer1", "SJC"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
const totalMessages = 1000
|
||||
const goroutines = 20
|
||||
perGoroutine := totalMessages / goroutines
|
||||
|
||||
// Simulate full pipeline: InsertTransmission + UpsertNode + UpsertObserver + IncrementAdvertCount
|
||||
// This matches the real handleMessage write pattern for ADVERT packets
|
||||
latencies := make([]time.Duration, totalMessages)
|
||||
var busyErrors atomic.Int64
|
||||
var totalErrors atomic.Int64
|
||||
errCh := make(chan error, totalMessages)
|
||||
done := make(chan struct{})
|
||||
|
||||
start := time.Now()
|
||||
|
||||
for g := 0; g < goroutines; g++ {
|
||||
go func(gIdx int) {
|
||||
defer func() { done <- struct{}{} }()
|
||||
for i := 0; i < perGoroutine; i++ {
|
||||
msgStart := time.Now()
|
||||
idx := gIdx*perGoroutine + i
|
||||
hash := fmt.Sprintf("load_%04d_%04d____", gIdx, i)
|
||||
snr := 5.0
|
||||
rssi := -100.0
|
||||
|
||||
data := &PacketData{
|
||||
RawHex: "0A00D69F",
|
||||
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
||||
ObserverID: "obs1",
|
||||
Hash: hash[:16],
|
||||
RouteType: 2,
|
||||
PayloadType: 4,
|
||||
PathJSON: "[]",
|
||||
DecodedJSON: `{"type":"ADVERT","pubKey":"` + hash[:16] + `"}`,
|
||||
SNR: &snr,
|
||||
RSSI: &rssi,
|
||||
}
|
||||
|
||||
_, err := s.InsertTransmission(data)
|
||||
if err != nil {
|
||||
totalErrors.Add(1)
|
||||
if strings.Contains(err.Error(), "database is locked") || strings.Contains(err.Error(), "SQLITE_BUSY") {
|
||||
busyErrors.Add(1)
|
||||
}
|
||||
errCh <- err
|
||||
continue
|
||||
}
|
||||
|
||||
lat := 37.0 + float64(gIdx)*0.001
|
||||
lon := -122.0 + float64(i)*0.001
|
||||
pubKey := fmt.Sprintf("node_%04d_%04d____", gIdx, i)
|
||||
if err := s.UpsertNode(pubKey[:16], "Node", "repeater", &lat, &lon, data.Timestamp); err != nil {
|
||||
totalErrors.Add(1)
|
||||
if strings.Contains(err.Error(), "locked") || strings.Contains(err.Error(), "BUSY") {
|
||||
busyErrors.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.IncrementAdvertCount(pubKey[:16]); err != nil {
|
||||
totalErrors.Add(1)
|
||||
}
|
||||
|
||||
obsID := fmt.Sprintf("obs_%04d_%04d_____", gIdx, i)
|
||||
if err := s.UpsertObserver(obsID[:16], "Obs", "SJC"); err != nil {
|
||||
totalErrors.Add(1)
|
||||
if strings.Contains(err.Error(), "locked") || strings.Contains(err.Error(), "BUSY") {
|
||||
busyErrors.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
latencies[idx] = time.Since(msgStart)
|
||||
}
|
||||
}(g)
|
||||
}
|
||||
|
||||
for g := 0; g < goroutines; g++ {
|
||||
<-done
|
||||
}
|
||||
close(errCh)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// Calculate p50, p95, p99
|
||||
validLatencies := make([]time.Duration, 0, totalMessages)
|
||||
for _, l := range latencies {
|
||||
if l > 0 {
|
||||
validLatencies = append(validLatencies, l)
|
||||
}
|
||||
}
|
||||
sort.Slice(validLatencies, func(i, j int) bool { return validLatencies[i] < validLatencies[j] })
|
||||
|
||||
p50 := validLatencies[len(validLatencies)*50/100]
|
||||
p95 := validLatencies[len(validLatencies)*95/100]
|
||||
p99 := validLatencies[len(validLatencies)*99/100]
|
||||
msgsPerSec := float64(totalMessages) / elapsed.Seconds()
|
||||
|
||||
t.Logf("=== LOAD TEST RESULTS ===")
|
||||
t.Logf("Messages: %d (%d goroutines × %d each)", totalMessages, goroutines, perGoroutine)
|
||||
t.Logf("Writes/msg: 4 (InsertTx + UpsertNode + IncrAdvertCount + UpsertObserver)")
|
||||
t.Logf("Total writes: %d", totalMessages*4)
|
||||
t.Logf("Duration: %s", elapsed.Round(time.Millisecond))
|
||||
t.Logf("Throughput: %.1f msgs/sec (%.1f writes/sec)", msgsPerSec, msgsPerSec*4)
|
||||
t.Logf("Latency p50: %s", p50.Round(time.Microsecond))
|
||||
t.Logf("Latency p95: %s", p95.Round(time.Microsecond))
|
||||
t.Logf("Latency p99: %s", p99.Round(time.Microsecond))
|
||||
t.Logf("SQLITE_BUSY: %d", busyErrors.Load())
|
||||
t.Logf("Total errors: %d", totalErrors.Load())
|
||||
t.Logf("Stats: tx=%d dupes=%d obs=%d nodes=%d observers=%d write_err=%d",
|
||||
s.Stats.TransmissionsInserted.Load(),
|
||||
s.Stats.DuplicateTransmissions.Load(),
|
||||
s.Stats.ObservationsInserted.Load(),
|
||||
s.Stats.NodeUpserts.Load(),
|
||||
s.Stats.ObserverUpserts.Load(),
|
||||
s.Stats.WriteErrors.Load(),
|
||||
)
|
||||
|
||||
// Hard assertions
|
||||
if busyErrors.Load() > 0 {
|
||||
t.Errorf("SQLITE_BUSY errors: %d (expected 0)", busyErrors.Load())
|
||||
}
|
||||
if totalErrors.Load() > 0 {
|
||||
t.Errorf("Total errors: %d (expected 0)", totalErrors.Load())
|
||||
}
|
||||
|
||||
var txCount int
|
||||
s.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount)
|
||||
if txCount != totalMessages {
|
||||
t.Errorf("transmissions=%d, want %d", txCount, totalMessages)
|
||||
}
|
||||
}
|
||||
|
||||
+501
-491
@@ -1,491 +1,501 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"crypto/tls"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
)
|
||||
|
||||
func main() {
|
||||
configPath := flag.String("config", "config.json", "path to config file")
|
||||
flag.Parse()
|
||||
|
||||
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
|
||||
log.SetPrefix("[ingestor] ")
|
||||
|
||||
cfg, err := LoadConfig(*configPath)
|
||||
if err != nil {
|
||||
log.Fatalf("config: %v", err)
|
||||
}
|
||||
|
||||
sources := cfg.ResolvedSources()
|
||||
if len(sources) == 0 {
|
||||
log.Fatal("no MQTT sources configured — set mqttSources in config or MQTT_BROKER env var")
|
||||
}
|
||||
|
||||
store, err := OpenStore(cfg.DBPath)
|
||||
if err != nil {
|
||||
log.Fatalf("db: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
log.Printf("SQLite opened: %s", cfg.DBPath)
|
||||
|
||||
// Node retention: move stale nodes to inactive_nodes on startup
|
||||
nodeDays := cfg.NodeDaysOrDefault()
|
||||
store.MoveStaleNodes(nodeDays)
|
||||
|
||||
// Daily ticker for node retention
|
||||
retentionTicker := time.NewTicker(1 * time.Hour)
|
||||
go func() {
|
||||
for range retentionTicker.C {
|
||||
store.MoveStaleNodes(nodeDays)
|
||||
}
|
||||
}()
|
||||
|
||||
channelKeys := loadChannelKeys(cfg, *configPath)
|
||||
if len(channelKeys) > 0 {
|
||||
log.Printf("Loaded %d channel keys for GRP_TXT decryption", len(channelKeys))
|
||||
} else {
|
||||
log.Printf("No channel keys loaded — GRP_TXT packets will not be decrypted")
|
||||
}
|
||||
|
||||
// Connect to each MQTT source
|
||||
var clients []mqtt.Client
|
||||
for _, source := range sources {
|
||||
tag := source.Name
|
||||
if tag == "" {
|
||||
tag = source.Broker
|
||||
}
|
||||
|
||||
opts := mqtt.NewClientOptions().
|
||||
AddBroker(source.Broker).
|
||||
SetAutoReconnect(true).
|
||||
SetConnectRetry(true).
|
||||
SetOrderMatters(false)
|
||||
|
||||
if source.Username != "" {
|
||||
opts.SetUsername(source.Username)
|
||||
}
|
||||
if source.Password != "" {
|
||||
opts.SetPassword(source.Password)
|
||||
}
|
||||
if source.RejectUnauthorized != nil && !*source.RejectUnauthorized {
|
||||
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
|
||||
} else if strings.HasPrefix(source.Broker, "ssl://") {
|
||||
opts.SetTLSConfig(&tls.Config{})
|
||||
}
|
||||
|
||||
opts.SetOnConnectHandler(func(c mqtt.Client) {
|
||||
log.Printf("MQTT [%s] connected to %s", tag, source.Broker)
|
||||
topics := source.Topics
|
||||
if len(topics) == 0 {
|
||||
topics = []string{"meshcore/#"}
|
||||
}
|
||||
for _, t := range topics {
|
||||
token := c.Subscribe(t, 0, nil)
|
||||
token.Wait()
|
||||
if token.Error() != nil {
|
||||
log.Printf("MQTT [%s] subscribe error for %s: %v", tag, t, token.Error())
|
||||
} else {
|
||||
log.Printf("MQTT [%s] subscribed to %s", tag, t)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
|
||||
log.Printf("MQTT [%s] disconnected: %v", tag, err)
|
||||
})
|
||||
|
||||
// Capture source for closure
|
||||
src := source
|
||||
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
|
||||
handleMessage(store, tag, src, m, channelKeys)
|
||||
})
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
token := client.Connect()
|
||||
token.Wait()
|
||||
if token.Error() != nil {
|
||||
log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error())
|
||||
continue
|
||||
}
|
||||
clients = append(clients, client)
|
||||
}
|
||||
|
||||
if len(clients) == 0 {
|
||||
log.Fatal("no MQTT connections established")
|
||||
}
|
||||
|
||||
log.Printf("Running — %d MQTT source(s) connected", len(clients))
|
||||
|
||||
// Wait for shutdown signal
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sig
|
||||
|
||||
log.Println("Shutting down...")
|
||||
retentionTicker.Stop()
|
||||
for _, c := range clients {
|
||||
c.Disconnect(1000)
|
||||
}
|
||||
log.Println("Done.")
|
||||
}
|
||||
|
||||
func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("MQTT [%s] panic in handler: %v", tag, r)
|
||||
}
|
||||
}()
|
||||
|
||||
topic := m.Topic()
|
||||
parts := strings.Split(topic, "/")
|
||||
|
||||
// IATA filter
|
||||
if len(source.IATAFilter) > 0 && len(parts) > 1 {
|
||||
region := parts[1]
|
||||
matched := false
|
||||
for _, f := range source.IATAFilter {
|
||||
if f == region {
|
||||
matched = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !matched {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var msg map[string]interface{}
|
||||
if err := json.Unmarshal(m.Payload(), &msg); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Skip status/connection topics
|
||||
if topic == "meshcore/status" || topic == "meshcore/events/connection" {
|
||||
return
|
||||
}
|
||||
|
||||
// Status topic: meshcore/<region>/<observer_id>/status
|
||||
if len(parts) >= 4 && parts[3] == "status" {
|
||||
observerID := parts[2]
|
||||
name, _ := msg["origin"].(string)
|
||||
iata := parts[1]
|
||||
if err := store.UpsertObserver(observerID, name, iata); err != nil {
|
||||
log.Printf("MQTT [%s] observer status error: %v", tag, err)
|
||||
}
|
||||
log.Printf("MQTT [%s] status: %s (%s)", tag, firstNonEmpty(name, observerID), iata)
|
||||
return
|
||||
}
|
||||
|
||||
// Format 1: Raw packet (meshcoretomqtt / Cisien format)
|
||||
rawHex, _ := msg["raw"].(string)
|
||||
if rawHex != "" {
|
||||
decoded, err := DecodePacket(rawHex, channelKeys)
|
||||
if err != nil {
|
||||
log.Printf("MQTT [%s] decode error: %v", tag, err)
|
||||
return
|
||||
}
|
||||
|
||||
observerID := ""
|
||||
region := ""
|
||||
if len(parts) > 2 {
|
||||
observerID = parts[2]
|
||||
}
|
||||
if len(parts) > 1 {
|
||||
region = parts[1]
|
||||
}
|
||||
|
||||
mqttMsg := &MQTTPacketMessage{Raw: rawHex}
|
||||
if v, ok := msg["SNR"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
mqttMsg.SNR = &f
|
||||
}
|
||||
}
|
||||
if v, ok := msg["RSSI"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
mqttMsg.RSSI = &f
|
||||
}
|
||||
}
|
||||
if v, ok := msg["origin"].(string); ok {
|
||||
mqttMsg.Origin = v
|
||||
}
|
||||
|
||||
pktData := BuildPacketData(mqttMsg, decoded, observerID, region)
|
||||
isNew, err := store.InsertTransmission(pktData)
|
||||
if err != nil {
|
||||
log.Printf("MQTT [%s] db insert error: %v", tag, err)
|
||||
}
|
||||
|
||||
// Process ADVERT → upsert node
|
||||
if decoded.Header.PayloadTypeName == "ADVERT" && decoded.Payload.PubKey != "" {
|
||||
ok, reason := ValidateAdvert(&decoded.Payload)
|
||||
if ok {
|
||||
role := advertRole(decoded.Payload.Flags)
|
||||
if err := store.UpsertNode(decoded.Payload.PubKey, decoded.Payload.Name, role, decoded.Payload.Lat, decoded.Payload.Lon, pktData.Timestamp); err != nil {
|
||||
log.Printf("MQTT [%s] node upsert error: %v", tag, err)
|
||||
}
|
||||
if isNew {
|
||||
if err := store.IncrementAdvertCount(decoded.Payload.PubKey); err != nil {
|
||||
log.Printf("MQTT [%s] advert count error: %v", tag, err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Printf("MQTT [%s] skipping corrupted ADVERT: %s", tag, reason)
|
||||
}
|
||||
}
|
||||
|
||||
// Upsert observer
|
||||
if observerID != "" {
|
||||
origin, _ := msg["origin"].(string)
|
||||
if err := store.UpsertObserver(observerID, origin, region); err != nil {
|
||||
log.Printf("MQTT [%s] observer upsert error: %v", tag, err)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Format 2: Companion bridge channel message (meshcore/message/channel/<n>)
|
||||
if strings.HasPrefix(topic, "meshcore/message/channel/") {
|
||||
text, _ := msg["text"].(string)
|
||||
if text == "" {
|
||||
return
|
||||
}
|
||||
|
||||
channelIdx := ""
|
||||
if len(parts) >= 4 {
|
||||
channelIdx = parts[3]
|
||||
}
|
||||
if ci, ok := msg["channel_idx"]; ok {
|
||||
channelIdx = fmt.Sprintf("%v", ci)
|
||||
}
|
||||
|
||||
// Extract sender from "Name: message" format
|
||||
sender := ""
|
||||
if idx := strings.Index(text, ": "); idx > 0 && idx < 50 {
|
||||
sender = text[:idx]
|
||||
}
|
||||
|
||||
channelName := fmt.Sprintf("ch%s", channelIdx)
|
||||
|
||||
// Build decoded JSON matching Node.js CHAN format
|
||||
channelMsg := map[string]interface{}{
|
||||
"type": "CHAN",
|
||||
"channel": channelName,
|
||||
"text": text,
|
||||
"sender": sender,
|
||||
}
|
||||
if st, ok := msg["sender_timestamp"]; ok {
|
||||
channelMsg["sender_timestamp"] = st
|
||||
}
|
||||
|
||||
decodedJSON, _ := json.Marshal(channelMsg)
|
||||
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
hashInput := fmt.Sprintf("ch:%s:%s:%s", channelIdx, text, now)
|
||||
h := sha256.Sum256([]byte(hashInput))
|
||||
hash := hex.EncodeToString(h[:])[:16]
|
||||
|
||||
var snr, rssi *float64
|
||||
if v, ok := msg["SNR"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
snr = &f
|
||||
}
|
||||
} else if v, ok := msg["snr"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
snr = &f
|
||||
}
|
||||
}
|
||||
if v, ok := msg["RSSI"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
rssi = &f
|
||||
}
|
||||
} else if v, ok := msg["rssi"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
rssi = &f
|
||||
}
|
||||
}
|
||||
|
||||
pktData := &PacketData{
|
||||
Timestamp: now,
|
||||
ObserverID: "companion",
|
||||
ObserverName: "L1 Pro (BLE)",
|
||||
SNR: snr,
|
||||
RSSI: rssi,
|
||||
Hash: hash,
|
||||
RouteType: 1, // FLOOD
|
||||
PayloadType: 5, // GRP_TXT
|
||||
PathJSON: "[]",
|
||||
DecodedJSON: string(decodedJSON),
|
||||
}
|
||||
|
||||
if _, err := store.InsertTransmission(pktData); err != nil {
|
||||
log.Printf("MQTT [%s] channel insert error: %v", tag, err)
|
||||
}
|
||||
|
||||
// Upsert sender as a companion node
|
||||
if sender != "" {
|
||||
senderKey := "sender-" + strings.ToLower(sender)
|
||||
if err := store.UpsertNode(senderKey, sender, "companion", nil, nil, now); err != nil {
|
||||
log.Printf("MQTT [%s] sender node upsert error: %v", tag, err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("MQTT [%s] channel message: ch%s from %s", tag, channelIdx, firstNonEmpty(sender, "unknown"))
|
||||
return
|
||||
}
|
||||
|
||||
// Format 2b: Companion bridge direct message (meshcore/message/direct/<id>)
|
||||
if strings.HasPrefix(topic, "meshcore/message/direct/") {
|
||||
text, _ := msg["text"].(string)
|
||||
if text == "" {
|
||||
return
|
||||
}
|
||||
|
||||
sender := ""
|
||||
if idx := strings.Index(text, ": "); idx > 0 && idx < 50 {
|
||||
sender = text[:idx]
|
||||
}
|
||||
|
||||
dm := map[string]interface{}{
|
||||
"type": "DM",
|
||||
"text": text,
|
||||
"sender": sender,
|
||||
}
|
||||
if st, ok := msg["sender_timestamp"]; ok {
|
||||
dm["sender_timestamp"] = st
|
||||
}
|
||||
|
||||
decodedJSON, _ := json.Marshal(dm)
|
||||
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
hashInput := fmt.Sprintf("dm:%s:%s", text, now)
|
||||
h := sha256.Sum256([]byte(hashInput))
|
||||
hash := hex.EncodeToString(h[:])[:16]
|
||||
|
||||
var snr, rssi *float64
|
||||
if v, ok := msg["SNR"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
snr = &f
|
||||
}
|
||||
} else if v, ok := msg["snr"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
snr = &f
|
||||
}
|
||||
}
|
||||
if v, ok := msg["RSSI"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
rssi = &f
|
||||
}
|
||||
} else if v, ok := msg["rssi"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
rssi = &f
|
||||
}
|
||||
}
|
||||
|
||||
pktData := &PacketData{
|
||||
Timestamp: now,
|
||||
ObserverID: "companion",
|
||||
ObserverName: "L1 Pro (BLE)",
|
||||
SNR: snr,
|
||||
RSSI: rssi,
|
||||
Hash: hash,
|
||||
RouteType: 1, // FLOOD
|
||||
PayloadType: 2, // TXT_MSG
|
||||
PathJSON: "[]",
|
||||
DecodedJSON: string(decodedJSON),
|
||||
}
|
||||
|
||||
if _, err := store.InsertTransmission(pktData); err != nil {
|
||||
log.Printf("MQTT [%s] DM insert error: %v", tag, err)
|
||||
}
|
||||
|
||||
log.Printf("MQTT [%s] direct message from %s", tag, firstNonEmpty(sender, "unknown"))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func toFloat64(v interface{}) (float64, bool) {
|
||||
switch n := v.(type) {
|
||||
case float64:
|
||||
return n, true
|
||||
case float32:
|
||||
return float64(n), true
|
||||
case int:
|
||||
return float64(n), true
|
||||
case int64:
|
||||
return float64(n), true
|
||||
case json.Number:
|
||||
f, err := n.Float64()
|
||||
return f, err == nil
|
||||
default:
|
||||
return 0, false
|
||||
}
|
||||
}
|
||||
|
||||
func firstNonEmpty(vals ...string) string {
|
||||
for _, v := range vals {
|
||||
if v != "" {
|
||||
return v
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// loadChannelKeys loads channel decryption keys from config and/or a JSON file.
|
||||
// Priority: CHANNEL_KEYS_PATH env var > cfg.ChannelKeysPath > channel-rainbow.json next to config.
|
||||
func loadChannelKeys(cfg *Config, configPath string) map[string]string {
|
||||
keys := make(map[string]string)
|
||||
|
||||
// Determine file path for rainbow keys
|
||||
keysPath := os.Getenv("CHANNEL_KEYS_PATH")
|
||||
if keysPath == "" {
|
||||
keysPath = cfg.ChannelKeysPath
|
||||
}
|
||||
if keysPath == "" {
|
||||
// Default: look for channel-rainbow.json next to config file
|
||||
keysPath = filepath.Join(filepath.Dir(configPath), "channel-rainbow.json")
|
||||
}
|
||||
|
||||
if data, err := os.ReadFile(keysPath); err == nil {
|
||||
var fileKeys map[string]string
|
||||
if err := json.Unmarshal(data, &fileKeys); err == nil {
|
||||
for k, v := range fileKeys {
|
||||
keys[k] = v
|
||||
}
|
||||
log.Printf("Loaded %d channel keys from %s", len(fileKeys), keysPath)
|
||||
} else {
|
||||
log.Printf("Warning: failed to parse channel keys file %s: %v", keysPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Merge inline config keys (override file keys)
|
||||
for k, v := range cfg.ChannelKeys {
|
||||
keys[k] = v
|
||||
}
|
||||
|
||||
return keys
|
||||
}
|
||||
|
||||
// Version info (set via ldflags)
|
||||
var version = "dev"
|
||||
|
||||
func init() {
|
||||
if len(os.Args) > 1 && os.Args[1] == "--version" {
|
||||
fmt.Println("meshcore-ingestor", version)
|
||||
os.Exit(0)
|
||||
}
|
||||
}
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"crypto/tls"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
)
|
||||
|
||||
func main() {
|
||||
configPath := flag.String("config", "config.json", "path to config file")
|
||||
flag.Parse()
|
||||
|
||||
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
|
||||
log.SetPrefix("[ingestor] ")
|
||||
|
||||
cfg, err := LoadConfig(*configPath)
|
||||
if err != nil {
|
||||
log.Fatalf("config: %v", err)
|
||||
}
|
||||
|
||||
sources := cfg.ResolvedSources()
|
||||
if len(sources) == 0 {
|
||||
log.Fatal("no MQTT sources configured — set mqttSources in config or MQTT_BROKER env var")
|
||||
}
|
||||
|
||||
store, err := OpenStore(cfg.DBPath)
|
||||
if err != nil {
|
||||
log.Fatalf("db: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
log.Printf("SQLite opened: %s", cfg.DBPath)
|
||||
|
||||
// Node retention: move stale nodes to inactive_nodes on startup
|
||||
nodeDays := cfg.NodeDaysOrDefault()
|
||||
store.MoveStaleNodes(nodeDays)
|
||||
|
||||
// Daily ticker for node retention
|
||||
retentionTicker := time.NewTicker(1 * time.Hour)
|
||||
go func() {
|
||||
for range retentionTicker.C {
|
||||
store.MoveStaleNodes(nodeDays)
|
||||
}
|
||||
}()
|
||||
|
||||
// Periodic stats logging (every 5 minutes)
|
||||
statsTicker := time.NewTicker(5 * time.Minute)
|
||||
go func() {
|
||||
for range statsTicker.C {
|
||||
store.LogStats()
|
||||
}
|
||||
}()
|
||||
|
||||
channelKeys := loadChannelKeys(cfg, *configPath)
|
||||
if len(channelKeys) > 0 {
|
||||
log.Printf("Loaded %d channel keys for GRP_TXT decryption", len(channelKeys))
|
||||
} else {
|
||||
log.Printf("No channel keys loaded — GRP_TXT packets will not be decrypted")
|
||||
}
|
||||
|
||||
// Connect to each MQTT source
|
||||
var clients []mqtt.Client
|
||||
for _, source := range sources {
|
||||
tag := source.Name
|
||||
if tag == "" {
|
||||
tag = source.Broker
|
||||
}
|
||||
|
||||
opts := mqtt.NewClientOptions().
|
||||
AddBroker(source.Broker).
|
||||
SetAutoReconnect(true).
|
||||
SetConnectRetry(true).
|
||||
SetOrderMatters(true)
|
||||
|
||||
if source.Username != "" {
|
||||
opts.SetUsername(source.Username)
|
||||
}
|
||||
if source.Password != "" {
|
||||
opts.SetPassword(source.Password)
|
||||
}
|
||||
if source.RejectUnauthorized != nil && !*source.RejectUnauthorized {
|
||||
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
|
||||
} else if strings.HasPrefix(source.Broker, "ssl://") {
|
||||
opts.SetTLSConfig(&tls.Config{})
|
||||
}
|
||||
|
||||
opts.SetOnConnectHandler(func(c mqtt.Client) {
|
||||
log.Printf("MQTT [%s] connected to %s", tag, source.Broker)
|
||||
topics := source.Topics
|
||||
if len(topics) == 0 {
|
||||
topics = []string{"meshcore/#"}
|
||||
}
|
||||
for _, t := range topics {
|
||||
token := c.Subscribe(t, 0, nil)
|
||||
token.Wait()
|
||||
if token.Error() != nil {
|
||||
log.Printf("MQTT [%s] subscribe error for %s: %v", tag, t, token.Error())
|
||||
} else {
|
||||
log.Printf("MQTT [%s] subscribed to %s", tag, t)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
|
||||
log.Printf("MQTT [%s] disconnected: %v", tag, err)
|
||||
})
|
||||
|
||||
// Capture source for closure
|
||||
src := source
|
||||
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
|
||||
handleMessage(store, tag, src, m, channelKeys)
|
||||
})
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
token := client.Connect()
|
||||
token.Wait()
|
||||
if token.Error() != nil {
|
||||
log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error())
|
||||
continue
|
||||
}
|
||||
clients = append(clients, client)
|
||||
}
|
||||
|
||||
if len(clients) == 0 {
|
||||
log.Fatal("no MQTT connections established")
|
||||
}
|
||||
|
||||
log.Printf("Running — %d MQTT source(s) connected", len(clients))
|
||||
|
||||
// Wait for shutdown signal
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sig
|
||||
|
||||
log.Println("Shutting down...")
|
||||
retentionTicker.Stop()
|
||||
statsTicker.Stop()
|
||||
store.LogStats() // final stats on shutdown
|
||||
for _, c := range clients {
|
||||
c.Disconnect(1000)
|
||||
}
|
||||
log.Println("Done.")
|
||||
}
|
||||
|
||||
func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("MQTT [%s] panic in handler: %v", tag, r)
|
||||
}
|
||||
}()
|
||||
|
||||
topic := m.Topic()
|
||||
parts := strings.Split(topic, "/")
|
||||
|
||||
// IATA filter
|
||||
if len(source.IATAFilter) > 0 && len(parts) > 1 {
|
||||
region := parts[1]
|
||||
matched := false
|
||||
for _, f := range source.IATAFilter {
|
||||
if f == region {
|
||||
matched = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !matched {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var msg map[string]interface{}
|
||||
if err := json.Unmarshal(m.Payload(), &msg); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Skip status/connection topics
|
||||
if topic == "meshcore/status" || topic == "meshcore/events/connection" {
|
||||
return
|
||||
}
|
||||
|
||||
// Status topic: meshcore/<region>/<observer_id>/status
|
||||
if len(parts) >= 4 && parts[3] == "status" {
|
||||
observerID := parts[2]
|
||||
name, _ := msg["origin"].(string)
|
||||
iata := parts[1]
|
||||
if err := store.UpsertObserver(observerID, name, iata); err != nil {
|
||||
log.Printf("MQTT [%s] observer status error: %v", tag, err)
|
||||
}
|
||||
log.Printf("MQTT [%s] status: %s (%s)", tag, firstNonEmpty(name, observerID), iata)
|
||||
return
|
||||
}
|
||||
|
||||
// Format 1: Raw packet (meshcoretomqtt / Cisien format)
|
||||
rawHex, _ := msg["raw"].(string)
|
||||
if rawHex != "" {
|
||||
decoded, err := DecodePacket(rawHex, channelKeys)
|
||||
if err != nil {
|
||||
log.Printf("MQTT [%s] decode error: %v", tag, err)
|
||||
return
|
||||
}
|
||||
|
||||
observerID := ""
|
||||
region := ""
|
||||
if len(parts) > 2 {
|
||||
observerID = parts[2]
|
||||
}
|
||||
if len(parts) > 1 {
|
||||
region = parts[1]
|
||||
}
|
||||
|
||||
mqttMsg := &MQTTPacketMessage{Raw: rawHex}
|
||||
if v, ok := msg["SNR"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
mqttMsg.SNR = &f
|
||||
}
|
||||
}
|
||||
if v, ok := msg["RSSI"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
mqttMsg.RSSI = &f
|
||||
}
|
||||
}
|
||||
if v, ok := msg["origin"].(string); ok {
|
||||
mqttMsg.Origin = v
|
||||
}
|
||||
|
||||
pktData := BuildPacketData(mqttMsg, decoded, observerID, region)
|
||||
isNew, err := store.InsertTransmission(pktData)
|
||||
if err != nil {
|
||||
log.Printf("MQTT [%s] db insert error: %v", tag, err)
|
||||
}
|
||||
|
||||
// Process ADVERT → upsert node
|
||||
if decoded.Header.PayloadTypeName == "ADVERT" && decoded.Payload.PubKey != "" {
|
||||
ok, reason := ValidateAdvert(&decoded.Payload)
|
||||
if ok {
|
||||
role := advertRole(decoded.Payload.Flags)
|
||||
if err := store.UpsertNode(decoded.Payload.PubKey, decoded.Payload.Name, role, decoded.Payload.Lat, decoded.Payload.Lon, pktData.Timestamp); err != nil {
|
||||
log.Printf("MQTT [%s] node upsert error: %v", tag, err)
|
||||
}
|
||||
if isNew {
|
||||
if err := store.IncrementAdvertCount(decoded.Payload.PubKey); err != nil {
|
||||
log.Printf("MQTT [%s] advert count error: %v", tag, err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Printf("MQTT [%s] skipping corrupted ADVERT: %s", tag, reason)
|
||||
}
|
||||
}
|
||||
|
||||
// Upsert observer
|
||||
if observerID != "" {
|
||||
origin, _ := msg["origin"].(string)
|
||||
if err := store.UpsertObserver(observerID, origin, region); err != nil {
|
||||
log.Printf("MQTT [%s] observer upsert error: %v", tag, err)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Format 2: Companion bridge channel message (meshcore/message/channel/<n>)
|
||||
if strings.HasPrefix(topic, "meshcore/message/channel/") {
|
||||
text, _ := msg["text"].(string)
|
||||
if text == "" {
|
||||
return
|
||||
}
|
||||
|
||||
channelIdx := ""
|
||||
if len(parts) >= 4 {
|
||||
channelIdx = parts[3]
|
||||
}
|
||||
if ci, ok := msg["channel_idx"]; ok {
|
||||
channelIdx = fmt.Sprintf("%v", ci)
|
||||
}
|
||||
|
||||
// Extract sender from "Name: message" format
|
||||
sender := ""
|
||||
if idx := strings.Index(text, ": "); idx > 0 && idx < 50 {
|
||||
sender = text[:idx]
|
||||
}
|
||||
|
||||
channelName := fmt.Sprintf("ch%s", channelIdx)
|
||||
|
||||
// Build decoded JSON matching Node.js CHAN format
|
||||
channelMsg := map[string]interface{}{
|
||||
"type": "CHAN",
|
||||
"channel": channelName,
|
||||
"text": text,
|
||||
"sender": sender,
|
||||
}
|
||||
if st, ok := msg["sender_timestamp"]; ok {
|
||||
channelMsg["sender_timestamp"] = st
|
||||
}
|
||||
|
||||
decodedJSON, _ := json.Marshal(channelMsg)
|
||||
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
hashInput := fmt.Sprintf("ch:%s:%s:%s", channelIdx, text, now)
|
||||
h := sha256.Sum256([]byte(hashInput))
|
||||
hash := hex.EncodeToString(h[:])[:16]
|
||||
|
||||
var snr, rssi *float64
|
||||
if v, ok := msg["SNR"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
snr = &f
|
||||
}
|
||||
} else if v, ok := msg["snr"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
snr = &f
|
||||
}
|
||||
}
|
||||
if v, ok := msg["RSSI"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
rssi = &f
|
||||
}
|
||||
} else if v, ok := msg["rssi"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
rssi = &f
|
||||
}
|
||||
}
|
||||
|
||||
pktData := &PacketData{
|
||||
Timestamp: now,
|
||||
ObserverID: "companion",
|
||||
ObserverName: "L1 Pro (BLE)",
|
||||
SNR: snr,
|
||||
RSSI: rssi,
|
||||
Hash: hash,
|
||||
RouteType: 1, // FLOOD
|
||||
PayloadType: 5, // GRP_TXT
|
||||
PathJSON: "[]",
|
||||
DecodedJSON: string(decodedJSON),
|
||||
}
|
||||
|
||||
if _, err := store.InsertTransmission(pktData); err != nil {
|
||||
log.Printf("MQTT [%s] channel insert error: %v", tag, err)
|
||||
}
|
||||
|
||||
// Upsert sender as a companion node
|
||||
if sender != "" {
|
||||
senderKey := "sender-" + strings.ToLower(sender)
|
||||
if err := store.UpsertNode(senderKey, sender, "companion", nil, nil, now); err != nil {
|
||||
log.Printf("MQTT [%s] sender node upsert error: %v", tag, err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("MQTT [%s] channel message: ch%s from %s", tag, channelIdx, firstNonEmpty(sender, "unknown"))
|
||||
return
|
||||
}
|
||||
|
||||
// Format 2b: Companion bridge direct message (meshcore/message/direct/<id>)
|
||||
if strings.HasPrefix(topic, "meshcore/message/direct/") {
|
||||
text, _ := msg["text"].(string)
|
||||
if text == "" {
|
||||
return
|
||||
}
|
||||
|
||||
sender := ""
|
||||
if idx := strings.Index(text, ": "); idx > 0 && idx < 50 {
|
||||
sender = text[:idx]
|
||||
}
|
||||
|
||||
dm := map[string]interface{}{
|
||||
"type": "DM",
|
||||
"text": text,
|
||||
"sender": sender,
|
||||
}
|
||||
if st, ok := msg["sender_timestamp"]; ok {
|
||||
dm["sender_timestamp"] = st
|
||||
}
|
||||
|
||||
decodedJSON, _ := json.Marshal(dm)
|
||||
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
hashInput := fmt.Sprintf("dm:%s:%s", text, now)
|
||||
h := sha256.Sum256([]byte(hashInput))
|
||||
hash := hex.EncodeToString(h[:])[:16]
|
||||
|
||||
var snr, rssi *float64
|
||||
if v, ok := msg["SNR"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
snr = &f
|
||||
}
|
||||
} else if v, ok := msg["snr"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
snr = &f
|
||||
}
|
||||
}
|
||||
if v, ok := msg["RSSI"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
rssi = &f
|
||||
}
|
||||
} else if v, ok := msg["rssi"]; ok {
|
||||
if f, ok := toFloat64(v); ok {
|
||||
rssi = &f
|
||||
}
|
||||
}
|
||||
|
||||
pktData := &PacketData{
|
||||
Timestamp: now,
|
||||
ObserverID: "companion",
|
||||
ObserverName: "L1 Pro (BLE)",
|
||||
SNR: snr,
|
||||
RSSI: rssi,
|
||||
Hash: hash,
|
||||
RouteType: 1, // FLOOD
|
||||
PayloadType: 2, // TXT_MSG
|
||||
PathJSON: "[]",
|
||||
DecodedJSON: string(decodedJSON),
|
||||
}
|
||||
|
||||
if _, err := store.InsertTransmission(pktData); err != nil {
|
||||
log.Printf("MQTT [%s] DM insert error: %v", tag, err)
|
||||
}
|
||||
|
||||
log.Printf("MQTT [%s] direct message from %s", tag, firstNonEmpty(sender, "unknown"))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func toFloat64(v interface{}) (float64, bool) {
|
||||
switch n := v.(type) {
|
||||
case float64:
|
||||
return n, true
|
||||
case float32:
|
||||
return float64(n), true
|
||||
case int:
|
||||
return float64(n), true
|
||||
case int64:
|
||||
return float64(n), true
|
||||
case json.Number:
|
||||
f, err := n.Float64()
|
||||
return f, err == nil
|
||||
default:
|
||||
return 0, false
|
||||
}
|
||||
}
|
||||
|
||||
func firstNonEmpty(vals ...string) string {
|
||||
for _, v := range vals {
|
||||
if v != "" {
|
||||
return v
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// loadChannelKeys loads channel decryption keys from config and/or a JSON file.
|
||||
// Priority: CHANNEL_KEYS_PATH env var > cfg.ChannelKeysPath > channel-rainbow.json next to config.
|
||||
func loadChannelKeys(cfg *Config, configPath string) map[string]string {
|
||||
keys := make(map[string]string)
|
||||
|
||||
// Determine file path for rainbow keys
|
||||
keysPath := os.Getenv("CHANNEL_KEYS_PATH")
|
||||
if keysPath == "" {
|
||||
keysPath = cfg.ChannelKeysPath
|
||||
}
|
||||
if keysPath == "" {
|
||||
// Default: look for channel-rainbow.json next to config file
|
||||
keysPath = filepath.Join(filepath.Dir(configPath), "channel-rainbow.json")
|
||||
}
|
||||
|
||||
if data, err := os.ReadFile(keysPath); err == nil {
|
||||
var fileKeys map[string]string
|
||||
if err := json.Unmarshal(data, &fileKeys); err == nil {
|
||||
for k, v := range fileKeys {
|
||||
keys[k] = v
|
||||
}
|
||||
log.Printf("Loaded %d channel keys from %s", len(fileKeys), keysPath)
|
||||
} else {
|
||||
log.Printf("Warning: failed to parse channel keys file %s: %v", keysPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Merge inline config keys (override file keys)
|
||||
for k, v := range cfg.ChannelKeys {
|
||||
keys[k] = v
|
||||
}
|
||||
|
||||
return keys
|
||||
}
|
||||
|
||||
// Version info (set via ldflags)
|
||||
var version = "dev"
|
||||
|
||||
func init() {
|
||||
if len(os.Args) > 1 && os.Args[1] == "--version" {
|
||||
fmt.Println("meshcore-ingestor", version)
|
||||
os.Exit(0)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user