Compare commits

...

8 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
a827fd3b43 fix: gate telemetry on sensor flag, fix 0°C emission, safe migration with PRAGMA check
Agent-Logs-Url: https://github.com/Kpa-clawbot/meshcore-analyzer/sessions/1c2af64b-0e8a-4dd0-ae80-e296f70437e9

Co-authored-by: KpaBap <746025+KpaBap@users.noreply.github.com>
2026-03-28 20:35:50 +00:00
Kpa-clawbot
54cbc648e0 feat: decode telemetry from adverts — battery voltage + temperature on nodes
Sensor nodes embed telemetry (battery_mv, temperature_c) in their advert
appdata after the null-terminated name. This commit adds decoding and
storage for both the Go ingestor and Node.js backend.

Changes:
- decoder.go/decoder.js: Parse telemetry bytes from advert appdata
  (battery_mv as uint16 LE millivolts, temperature_c as int16 LE /100)
- db.go/db.js: Add battery_mv INTEGER and temperature_c REAL columns
  to nodes and inactive_nodes tables, with migration for existing DBs
- main.go/server.js: Update node telemetry on advert processing
- server db.go: Include battery_mv/temperature_c in node API responses
- Tests: Decoder telemetry tests (positive, negative temp, no telemetry),
  DB migration test, node telemetry update test, server API shape tests

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-28 12:07:42 -07:00
Kpa-clawbot
aba4270ceb fix: undefined err in packets_v view creation (use vErr)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-28 12:00:04 -07:00
Kpa-clawbot
57b0188158 fix: create packets_v VIEW in Go ingestor schema (#217)
Fresh Go installs failed with 'no such table: packets_v' because the
ingestor created tables but never the VIEW that the Go server queries.

Add DROP VIEW IF EXISTS + CREATE VIEW packets_v to applySchema(), using
the v3 definition (observer_idx → observers.rowid JOIN). The view is
rebuilt on every startup to stay current with any definition changes.

Add tests: verify view exists after OpenStore, and verify it returns
correct observer_id/observer_name via the LEFT JOIN.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-28 11:28:38 -07:00
Kpa-clawbot
f374a4a775 fix: enforce consistent types between Go ingestor writes and server reads
Schema:
- observers.noise_floor: INTEGER → REAL (dBm has decimals)
- battery_mv, uptime_secs remain INTEGER (always whole numbers)

Ingestor write side (cmd/ingestor/db.go):
- UpsertObserver now accepts ObserverMeta with battery_mv (int),
  uptime_secs (int64), noise_floor (float64)
- COALESCE preserves existing values when meta is nil
- Added migration: cast integer noise_floor values to REAL

Ingestor MQTT handler (cmd/ingestor/main.go — already updated):
- extractObserverMeta extracts hardware fields from status messages
- battery_mv/uptime_secs cast via math.Round to int on write

Server read side (cmd/server/db.go):
- Observer.BatteryMv: *float64 → *int (matches INTEGER storage)
- Observer.UptimeSecs: *float64 → *int64 (matches INTEGER storage)
- Observer.NoiseFloor: *float64 (unchanged, matches REAL storage)
- GetObservers/GetObserverByID: use sql.NullInt64 intermediaries
  for battery_mv/uptime_secs, sql.NullFloat64 for noise_floor

Proto (proto/observer.proto — already correct):
- battery_mv: int32, uptime_secs: int64, noise_floor: double

Tests:
- TestUpsertObserverWithMeta: verifies correct SQLite types via typeof()
- TestUpsertObserverMetaPreservesExisting: nil-meta preserves values
- TestExtractObserverMeta: float-to-int rounding, empty message
- TestSchemaNoiseFloorIsReal: PRAGMA table_info validation
- TestObserverTypeConsistency: server reads typed values correctly
- TestObserverTypesInGetObservers: list endpoint type consistency

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-28 11:22:14 -07:00
Kpa-clawbot
6d31cb2ad6 feat: add pprof profiling controlled by ENABLE_PPROF env var
Add net/http/pprof support to both Go server (default port 6060) and
ingestor (default port 6061). Profiling is off by default — only
starts the pprof HTTP listener when ENABLE_PPROF=true.

PPROF_PORT env var overrides the default port for each binary.

Enable on staging-go in docker-compose with exposed ports 6060/6061.
Not enabled on prod.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-28 11:18:33 -07:00
Kpa-clawbot
1619f4857e fix: noise_floor/battery_mv/uptime_secs scanned as float64 to handle REAL values
SQLite stores these as REAL on some instances. Go *int scan silently
fails, dropping the entire observer row (404 on detail, missing from list).
Reported for YC-Base-Repeater and YC-Work-Repeater.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-28 11:04:49 -07:00
Kpa-clawbot
58d19ec303 Merge pull request #214 from Kpa-clawbot/fix/sqlite-write-concurrency
Reviewed by Kobayashi — LGTM. Fixes SQLite BUSY contention with busy_timeout + single connection serialization.
2026-03-28 10:13:44 -07:00
15 changed files with 1023 additions and 51 deletions

View File

@@ -35,7 +35,8 @@ type Store struct {
stmtUpsertNode *sql.Stmt stmtUpsertNode *sql.Stmt
stmtIncrementAdvertCount *sql.Stmt stmtIncrementAdvertCount *sql.Stmt
stmtUpsertObserver *sql.Stmt stmtUpsertObserver *sql.Stmt
stmtGetObserverRowid *sql.Stmt stmtGetObserverRowid *sql.Stmt
stmtUpdateNodeTelemetry *sql.Stmt
} }
// OpenStore opens or creates a SQLite DB at the given path, applying the // OpenStore opens or creates a SQLite DB at the given path, applying the
@@ -81,7 +82,9 @@ func applySchema(db *sql.DB) error {
lon REAL, lon REAL,
last_seen TEXT, last_seen TEXT,
first_seen TEXT, first_seen TEXT,
advert_count INTEGER DEFAULT 0 advert_count INTEGER DEFAULT 0,
battery_mv INTEGER,
temperature_c REAL
); );
CREATE TABLE IF NOT EXISTS observers ( CREATE TABLE IF NOT EXISTS observers (
@@ -97,7 +100,7 @@ func applySchema(db *sql.DB) error {
radio TEXT, radio TEXT,
battery_mv INTEGER, battery_mv INTEGER,
uptime_secs INTEGER, uptime_secs INTEGER,
noise_floor INTEGER noise_floor REAL
); );
CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen); CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen);
@@ -111,7 +114,9 @@ func applySchema(db *sql.DB) error {
lon REAL, lon REAL,
last_seen TEXT, last_seen TEXT,
first_seen TEXT, first_seen TEXT,
advert_count INTEGER DEFAULT 0 advert_count INTEGER DEFAULT 0,
battery_mv INTEGER,
temperature_c REAL
); );
CREATE INDEX IF NOT EXISTS idx_inactive_nodes_last_seen ON inactive_nodes(last_seen); CREATE INDEX IF NOT EXISTS idx_inactive_nodes_last_seen ON inactive_nodes(last_seen);
@@ -167,6 +172,25 @@ func applySchema(db *sql.DB) error {
} }
} }
// Create/rebuild packets_v view (v3 schema: observer_idx → observers.rowid)
// The Go server reads this view; without it fresh installs get "no such table: packets_v".
db.Exec(`DROP VIEW IF EXISTS packets_v`)
_, vErr := db.Exec(`
CREATE VIEW packets_v AS
SELECT o.id, t.raw_hex,
datetime(o.timestamp, 'unixepoch') AS timestamp,
obs.id AS observer_id, obs.name AS observer_name,
o.direction, o.snr, o.rssi, o.score, t.hash, t.route_type,
t.payload_type, t.payload_version, o.path_json, t.decoded_json,
t.created_at
FROM observations o
JOIN transmissions t ON t.id = o.transmission_id
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
`)
if vErr != nil {
return fmt.Errorf("packets_v view: %w", vErr)
}
// One-time migration: recalculate advert_count to count unique transmissions only // One-time migration: recalculate advert_count to count unique transmissions only
db.Exec(`CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)`) db.Exec(`CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)`)
var migDone int var migDone int
@@ -184,6 +208,77 @@ func applySchema(db *sql.DB) error {
log.Println("[migration] advert_count recalculated") log.Println("[migration] advert_count recalculated")
} }
// One-time migration: change noise_floor from INTEGER to REAL affinity.
// SQLite doesn't support ALTER COLUMN, but existing float values are stored
// as REAL regardless of column affinity. New table definition already uses REAL.
// This migration casts any integer-stored noise_floor values to real.
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'noise_floor_real_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Ensuring noise_floor values are stored as REAL...")
db.Exec(`UPDATE observers SET noise_floor = CAST(noise_floor AS REAL) WHERE noise_floor IS NOT NULL AND typeof(noise_floor) = 'integer'`)
db.Exec(`INSERT INTO _migrations (name) VALUES ('noise_floor_real_v1')`)
log.Println("[migration] noise_floor migration complete")
}
// One-time migration: add telemetry columns to nodes and inactive_nodes tables.
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'node_telemetry_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding telemetry columns to nodes/inactive_nodes...")
// checkAndAddColumn checks whether `column` already exists in `table`
// using PRAGMA table_info, and adds it if missing. All call sites pass
// hardcoded table/column/type literals so there is no SQL injection risk.
checkAndAddColumn := func(table, column, colType string) error {
rows, err := db.Query(fmt.Sprintf("PRAGMA table_info(%s)", table))
if err != nil {
return fmt.Errorf("querying table info for %s: %w", table, err)
}
defer rows.Close()
exists := false
for rows.Next() {
var cid int
var name, ctype string
var notnull, pk int
var dfltValue sql.NullString
if err := rows.Scan(&cid, &name, &ctype, &notnull, &dfltValue, &pk); err != nil {
return fmt.Errorf("scanning table info for %s: %w", table, err)
}
if name == column {
exists = true
break
}
}
if err := rows.Err(); err != nil {
return fmt.Errorf("iterating table info for %s: %w", table, err)
}
if exists {
return nil
}
if _, err := db.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", table, column, colType)); err != nil {
return fmt.Errorf("adding column %s to %s: %w", column, table, err)
}
return nil
}
if err := checkAndAddColumn("nodes", "battery_mv", "INTEGER"); err != nil {
return err
}
if err := checkAndAddColumn("nodes", "temperature_c", "REAL"); err != nil {
return err
}
if err := checkAndAddColumn("inactive_nodes", "battery_mv", "INTEGER"); err != nil {
return err
}
if err := checkAndAddColumn("inactive_nodes", "temperature_c", "REAL"); err != nil {
return err
}
if _, err := db.Exec(`INSERT INTO _migrations (name) VALUES ('node_telemetry_v1')`); err != nil {
return fmt.Errorf("recording node_telemetry_v1 migration: %w", err)
}
log.Println("[migration] node telemetry columns added")
}
return nil return nil
} }
@@ -238,13 +333,16 @@ func (s *Store) prepareStatements() error {
} }
s.stmtUpsertObserver, err = s.db.Prepare(` s.stmtUpsertObserver, err = s.db.Prepare(`
INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count) INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count, battery_mv, uptime_secs, noise_floor)
VALUES (?, ?, ?, ?, ?, 1) VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET ON CONFLICT(id) DO UPDATE SET
name = COALESCE(?, name), name = COALESCE(?, name),
iata = COALESCE(?, iata), iata = COALESCE(?, iata),
last_seen = ?, last_seen = ?,
packet_count = packet_count + 1 packet_count = packet_count + 1,
battery_mv = COALESCE(?, battery_mv),
uptime_secs = COALESCE(?, uptime_secs),
noise_floor = COALESCE(?, noise_floor)
`) `)
if err != nil { if err != nil {
return err return err
@@ -255,6 +353,16 @@ func (s *Store) prepareStatements() error {
return err return err
} }
s.stmtUpdateNodeTelemetry, err = s.db.Prepare(`
UPDATE nodes SET
battery_mv = COALESCE(?, battery_mv),
temperature_c = COALESCE(?, temperature_c)
WHERE public_key = ?
`)
if err != nil {
return err
}
return nil return nil
} }
@@ -359,12 +467,49 @@ func (s *Store) IncrementAdvertCount(pubKey string) error {
return err return err
} }
// UpsertObserver inserts or updates an observer. // UpdateNodeTelemetry updates battery and temperature for a node.
func (s *Store) UpsertObserver(id, name, iata string) error { func (s *Store) UpdateNodeTelemetry(pubKey string, batteryMv *int, temperatureC *float64) error {
var bv, tc interface{}
if batteryMv != nil {
bv = *batteryMv
}
if temperatureC != nil {
tc = *temperatureC
}
_, err := s.stmtUpdateNodeTelemetry.Exec(bv, tc, pubKey)
if err != nil {
s.Stats.WriteErrors.Add(1)
}
return err
}
// ObserverMeta holds optional observer hardware metadata.
type ObserverMeta struct {
BatteryMv *int // millivolts, always integer
UptimeSecs *int64 // seconds, always integer
NoiseFloor *float64 // dBm, may have decimals
}
// UpsertObserver inserts or updates an observer with optional hardware metadata.
func (s *Store) UpsertObserver(id, name, iata string, meta *ObserverMeta) error {
now := time.Now().UTC().Format(time.RFC3339) now := time.Now().UTC().Format(time.RFC3339)
var batteryMv, uptimeSecs, noiseFloor interface{}
if meta != nil {
if meta.BatteryMv != nil {
batteryMv = *meta.BatteryMv
}
if meta.UptimeSecs != nil {
uptimeSecs = *meta.UptimeSecs
}
if meta.NoiseFloor != nil {
noiseFloor = *meta.NoiseFloor
}
}
_, err := s.stmtUpsertObserver.Exec( _, err := s.stmtUpsertObserver.Exec(
id, name, iata, now, now, id, name, iata, now, now, batteryMv, uptimeSecs, noiseFloor,
name, iata, now, name, iata, now, batteryMv, uptimeSecs, noiseFloor,
) )
if err != nil { if err != nil {
s.Stats.WriteErrors.Add(1) s.Stats.WriteErrors.Add(1)

View File

@@ -1,6 +1,7 @@
package main package main
import ( import (
"database/sql"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
@@ -62,6 +63,16 @@ func TestOpenStore(t *testing.T) {
t.Errorf("missing table %s, got %v", e, tables) t.Errorf("missing table %s, got %v", e, tables)
} }
} }
// Verify packets_v view exists
var viewCount int
err = s.db.QueryRow("SELECT COUNT(*) FROM sqlite_master WHERE type='view' AND name='packets_v'").Scan(&viewCount)
if err != nil {
t.Fatal(err)
}
if viewCount != 1 {
t.Error("packets_v view not created")
}
} }
func TestInsertTransmission(t *testing.T) { func TestInsertTransmission(t *testing.T) {
@@ -114,6 +125,54 @@ func TestInsertTransmission(t *testing.T) {
} }
} }
func TestPacketsViewQueryable(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()
// Insert observer so the LEFT JOIN resolves
if err := s.UpsertObserver("obs1", "TestObserver", "SJC", nil); err != nil {
t.Fatal(err)
}
snr := 3.5
rssi := -95.0
data := &PacketData{
RawHex: "AABB",
Timestamp: "2026-01-01T00:00:00Z",
ObserverID: "obs1",
Hash: "viewtesthash",
RouteType: 1,
PayloadType: 4,
PathJSON: "[]",
DecodedJSON: `{"type":"ADVERT"}`,
SNR: &snr,
RSSI: &rssi,
}
if _, err := s.InsertTransmission(data); err != nil {
t.Fatal(err)
}
// Query through packets_v — the view the Go server relies on
var obsID, obsName sql.NullString
var hash string
err = s.db.QueryRow("SELECT observer_id, observer_name, hash FROM packets_v LIMIT 1").Scan(&obsID, &obsName, &hash)
if err != nil {
t.Fatalf("packets_v query failed: %v", err)
}
if hash != "viewtesthash" {
t.Errorf("hash=%s, want viewtesthash", hash)
}
if !obsID.Valid || obsID.String != "obs1" {
t.Errorf("observer_id=%v, want obs1", obsID)
}
if !obsName.Valid || obsName.String != "TestObserver" {
t.Errorf("observer_name=%v, want TestObserver", obsName)
}
}
func TestUpsertNode(t *testing.T) { func TestUpsertNode(t *testing.T) {
s, err := OpenStore(tempDBPath(t)) s, err := OpenStore(tempDBPath(t))
if err != nil { if err != nil {
@@ -160,7 +219,7 @@ func TestUpsertObserver(t *testing.T) {
} }
defer s.Close() defer s.Close()
if err := s.UpsertObserver("obs1", "Observer1", "SJC"); err != nil { if err := s.UpsertObserver("obs1", "Observer1", "SJC", nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -174,6 +233,165 @@ func TestUpsertObserver(t *testing.T) {
} }
} }
func TestUpsertObserverWithMeta(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()
battery := 3500
uptime := int64(86400)
noise := -115.5
meta := &ObserverMeta{
BatteryMv: &battery,
UptimeSecs: &uptime,
NoiseFloor: &noise,
}
if err := s.UpsertObserver("obs1", "Observer1", "SJC", meta); err != nil {
t.Fatal(err)
}
// Verify correct types in DB
var batteryMv int
var uptimeSecs int64
var noiseFloor float64
err = s.db.QueryRow("SELECT battery_mv, uptime_secs, noise_floor FROM observers WHERE id = 'obs1'").
Scan(&batteryMv, &uptimeSecs, &noiseFloor)
if err != nil {
t.Fatal(err)
}
if batteryMv != 3500 {
t.Errorf("battery_mv=%d, want 3500", batteryMv)
}
if uptimeSecs != 86400 {
t.Errorf("uptime_secs=%d, want 86400", uptimeSecs)
}
if noiseFloor != -115.5 {
t.Errorf("noise_floor=%f, want -115.5", noiseFloor)
}
// Verify typeof returns correct SQLite types
var typBattery, typUptime, typNoise string
s.db.QueryRow("SELECT typeof(battery_mv), typeof(uptime_secs), typeof(noise_floor) FROM observers WHERE id = 'obs1'").
Scan(&typBattery, &typUptime, &typNoise)
if typBattery != "integer" {
t.Errorf("typeof(battery_mv)=%s, want integer", typBattery)
}
if typUptime != "integer" {
t.Errorf("typeof(uptime_secs)=%s, want integer", typUptime)
}
if typNoise != "real" {
t.Errorf("typeof(noise_floor)=%s, want real", typNoise)
}
}
func TestUpsertObserverMetaPreservesExisting(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()
// First upsert with metadata
battery := 3500
noise := -115.5
meta := &ObserverMeta{
BatteryMv: &battery,
NoiseFloor: &noise,
}
if err := s.UpsertObserver("obs1", "Observer1", "SJC", meta); err != nil {
t.Fatal(err)
}
// Second upsert without metadata — should preserve existing values
if err := s.UpsertObserver("obs1", "Observer1", "SJC", nil); err != nil {
t.Fatal(err)
}
var batteryMv int
var noiseFloor float64
s.db.QueryRow("SELECT battery_mv, noise_floor FROM observers WHERE id = 'obs1'").
Scan(&batteryMv, &noiseFloor)
if batteryMv != 3500 {
t.Errorf("battery_mv=%d after nil-meta upsert, want 3500 (preserved)", batteryMv)
}
if noiseFloor != -115.5 {
t.Errorf("noise_floor=%f after nil-meta upsert, want -115.5 (preserved)", noiseFloor)
}
}
func TestExtractObserverMeta(t *testing.T) {
// Float values from JSON (typical MQTT payload)
msg := map[string]interface{}{
"battery_mv": 3500.0,
"uptime_secs": 86400.0,
"noise_floor": -115.5,
}
meta := extractObserverMeta(msg)
if meta == nil {
t.Fatal("expected non-nil meta")
}
if meta.BatteryMv == nil || *meta.BatteryMv != 3500 {
t.Errorf("BatteryMv=%v, want 3500", meta.BatteryMv)
}
if meta.UptimeSecs == nil || *meta.UptimeSecs != 86400 {
t.Errorf("UptimeSecs=%v, want 86400", meta.UptimeSecs)
}
if meta.NoiseFloor == nil || *meta.NoiseFloor != -115.5 {
t.Errorf("NoiseFloor=%v, want -115.5", meta.NoiseFloor)
}
// Battery with fractional part should round
msg2 := map[string]interface{}{
"battery_mv": 3500.7,
}
meta2 := extractObserverMeta(msg2)
if meta2 == nil || meta2.BatteryMv == nil || *meta2.BatteryMv != 3501 {
t.Errorf("battery_mv rounding: got %v, want 3501", meta2)
}
// Empty message → nil
meta3 := extractObserverMeta(map[string]interface{}{})
if meta3 != nil {
t.Errorf("expected nil for empty message, got %v", meta3)
}
}
func TestSchemaNoiseFloorIsReal(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()
// Check column type affinity via PRAGMA
rows, err := s.db.Query("PRAGMA table_info(observers)")
if err != nil {
t.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var cid int
var colName, colType string
var notNull, pk int
var dflt interface{}
if rows.Scan(&cid, &colName, &colType, &notNull, &dflt, &pk) == nil {
if colName == "noise_floor" && colType != "REAL" {
t.Errorf("noise_floor column type=%s, want REAL", colType)
}
if colName == "battery_mv" && colType != "INTEGER" {
t.Errorf("battery_mv column type=%s, want INTEGER", colType)
}
if colName == "uptime_secs" && colType != "INTEGER" {
t.Errorf("uptime_secs column type=%s, want INTEGER", colType)
}
}
}
}
func TestInsertTransmissionWithObserver(t *testing.T) { func TestInsertTransmissionWithObserver(t *testing.T) {
s, err := OpenStore(tempDBPath(t)) s, err := OpenStore(tempDBPath(t))
if err != nil { if err != nil {
@@ -182,7 +400,7 @@ func TestInsertTransmissionWithObserver(t *testing.T) {
defer s.Close() defer s.Close()
// Insert observer first // Insert observer first
if err := s.UpsertObserver("obs1", "Observer1", "SJC"); err != nil { if err := s.UpsertObserver("obs1", "Observer1", "SJC", nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -639,7 +857,7 @@ func TestConcurrentWrites(t *testing.T) {
defer s.Close() defer s.Close()
// Pre-create an observer for observer_idx resolution // Pre-create an observer for observer_idx resolution
if err := s.UpsertObserver("obs1", "Observer1", "SJC"); err != nil { if err := s.UpsertObserver("obs1", "Observer1", "SJC", nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -681,7 +899,7 @@ func TestConcurrentWrites(t *testing.T) {
return return
} }
obsID := fmt.Sprintf("obs_%d_%d__________", gIdx, i) obsID := fmt.Sprintf("obs_%d_%d__________", gIdx, i)
if err := s.UpsertObserver(obsID[:16], "Obs", "SJC"); err != nil { if err := s.UpsertObserver(obsID[:16], "Obs", "SJC", nil); err != nil {
errCh <- fmt.Errorf("goroutine %d observer upsert %d: %w", gIdx, i, err) errCh <- fmt.Errorf("goroutine %d observer upsert %d: %w", gIdx, i, err)
return return
} }
@@ -782,7 +1000,7 @@ func TestDBStats(t *testing.T) {
} }
// Observer upsert // Observer upsert
if err := s.UpsertObserver("obs1", "Obs1", "SJC"); err != nil { if err := s.UpsertObserver("obs1", "Obs1", "SJC", nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if s.Stats.ObserverUpserts.Load() != 1 { if s.Stats.ObserverUpserts.Load() != 1 {
@@ -801,7 +1019,7 @@ func TestLoadTestThroughput(t *testing.T) {
defer s.Close() defer s.Close()
// Pre-create observer // Pre-create observer
if err := s.UpsertObserver("obs1", "Observer1", "SJC"); err != nil { if err := s.UpsertObserver("obs1", "Observer1", "SJC", nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -867,7 +1085,7 @@ func TestLoadTestThroughput(t *testing.T) {
} }
obsID := fmt.Sprintf("obs_%04d_%04d_____", gIdx, i) obsID := fmt.Sprintf("obs_%04d_%04d_____", gIdx, i)
if err := s.UpsertObserver(obsID[:16], "Obs", "SJC"); err != nil { if err := s.UpsertObserver(obsID[:16], "Obs", "SJC", nil); err != nil {
totalErrors.Add(1) totalErrors.Add(1)
if strings.Contains(err.Error(), "locked") || strings.Contains(err.Error(), "BUSY") { if strings.Contains(err.Error(), "locked") || strings.Contains(err.Error(), "BUSY") {
busyErrors.Add(1) busyErrors.Add(1)
@@ -933,3 +1151,75 @@ func TestLoadTestThroughput(t *testing.T) {
t.Errorf("transmissions=%d, want %d", txCount, totalMessages) t.Errorf("transmissions=%d, want %d", txCount, totalMessages)
} }
} }
func TestUpdateNodeTelemetry(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()
lat := 37.0
lon := -122.0
if err := s.UpsertNode("telem1", "TelemetryNode", "sensor", &lat, &lon, "2026-03-25T00:00:00Z"); err != nil {
t.Fatal(err)
}
battery := 3700
temp := 28.5
if err := s.UpdateNodeTelemetry("telem1", &battery, &temp); err != nil {
t.Fatal(err)
}
var bv int
var tc float64
err = s.db.QueryRow("SELECT battery_mv, temperature_c FROM nodes WHERE public_key = 'telem1'").Scan(&bv, &tc)
if err != nil {
t.Fatal(err)
}
if bv != 3700 {
t.Errorf("battery_mv=%d, want 3700", bv)
}
if tc != 28.5 {
t.Errorf("temperature_c=%f, want 28.5", tc)
}
newTemp := -5.0
if err := s.UpdateNodeTelemetry("telem1", nil, &newTemp); err != nil {
t.Fatal(err)
}
err = s.db.QueryRow("SELECT battery_mv, temperature_c FROM nodes WHERE public_key = 'telem1'").Scan(&bv, &tc)
if err != nil {
t.Fatal(err)
}
if bv != 3700 {
t.Errorf("battery_mv after nil update=%d, want 3700 (preserved)", bv)
}
if tc != -5.0 {
t.Errorf("temperature_c after update=%f, want -5.0", tc)
}
}
func TestTelemetryMigrationAddsColumns(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()
_, err = s.db.Exec("SELECT battery_mv, temperature_c FROM nodes LIMIT 1")
if err != nil {
t.Errorf("nodes table should have battery_mv and temperature_c columns: %v", err)
}
_, err = s.db.Exec("SELECT battery_mv, temperature_c FROM inactive_nodes LIMIT 1")
if err != nil {
t.Errorf("inactive_nodes table should have battery_mv and temperature_c columns: %v", err)
}
var count int
s.db.QueryRow("SELECT COUNT(*) FROM _migrations WHERE name = 'node_telemetry_v1'").Scan(&count)
if count != 1 {
t.Errorf("migration node_telemetry_v1 should be recorded, count=%d", count)
}
}

View File

@@ -111,6 +111,8 @@ type Payload struct {
Lat *float64 `json:"lat,omitempty"` Lat *float64 `json:"lat,omitempty"`
Lon *float64 `json:"lon,omitempty"` Lon *float64 `json:"lon,omitempty"`
Name string `json:"name,omitempty"` Name string `json:"name,omitempty"`
BatteryMv *int `json:"battery_mv,omitempty"`
TemperatureC *float64 `json:"temperature_c,omitempty"`
ChannelHash int `json:"channelHash,omitempty"` ChannelHash int `json:"channelHash,omitempty"`
ChannelHashHex string `json:"channelHashHex,omitempty"` ChannelHashHex string `json:"channelHashHex,omitempty"`
DecryptionStatus string `json:"decryptionStatus,omitempty"` DecryptionStatus string `json:"decryptionStatus,omitempty"`
@@ -251,10 +253,37 @@ func decodeAdvert(buf []byte) Payload {
off += 8 off += 8
} }
if p.Flags.HasName { if p.Flags.HasName {
name := string(appdata[off:]) // Find null terminator to separate name from trailing telemetry bytes
name = strings.TrimRight(name, "\x00") nameEnd := len(appdata)
for i := off; i < len(appdata); i++ {
if appdata[i] == 0x00 {
nameEnd = i
break
}
}
name := string(appdata[off:nameEnd])
name = sanitizeName(name) name = sanitizeName(name)
p.Name = name p.Name = name
off = nameEnd
// Skip null terminator(s)
for off < len(appdata) && appdata[off] == 0x00 {
off++
}
}
// Telemetry bytes after name: battery_mv(2 LE) + temperature_c(2 LE, signed, /100)
// Only sensor nodes (advType=4) carry telemetry bytes.
if p.Flags.Sensor && off+4 <= len(appdata) {
batteryMv := int(binary.LittleEndian.Uint16(appdata[off : off+2]))
tempRaw := int16(binary.LittleEndian.Uint16(appdata[off+2 : off+4]))
tempC := float64(tempRaw) / 100.0
if batteryMv > 0 && batteryMv <= 10000 {
p.BatteryMv = &batteryMv
}
// Raw int16 / 100 → °C; accept -50°C to 100°C (raw: -5000 to 10000)
if tempRaw >= -5000 && tempRaw <= 10000 {
p.TemperatureC = &tempC
}
} }
} }

View File

@@ -1355,3 +1355,154 @@ func TestDecodeGrpTxtGarbageMarkedFailed(t *testing.T) {
t.Errorf("type=%s, want GRP_TXT", p.Type) t.Errorf("type=%s, want GRP_TXT", p.Type)
} }
} }
func TestDecodeAdvertWithTelemetry(t *testing.T) {
pubkey := strings.Repeat("AA", 32)
timestamp := "78563412"
signature := strings.Repeat("BB", 64)
flags := "94" // sensor(4) | hasLocation(0x10) | hasName(0x80)
lat := "40933402"
lon := "E0E6B8F8"
name := hex.EncodeToString([]byte("Sensor1"))
nullTerm := "00"
batteryLE := make([]byte, 2)
binary.LittleEndian.PutUint16(batteryLE, 3700)
tempLE := make([]byte, 2)
binary.LittleEndian.PutUint16(tempLE, uint16(int16(2850)))
hexStr := "1200" + pubkey + timestamp + signature + flags + lat + lon +
name + nullTerm +
hex.EncodeToString(batteryLE) + hex.EncodeToString(tempLE)
pkt, err := DecodePacket(hexStr, nil)
if err != nil {
t.Fatal(err)
}
if pkt.Payload.Name != "Sensor1" {
t.Errorf("name=%s, want Sensor1", pkt.Payload.Name)
}
if pkt.Payload.BatteryMv == nil {
t.Fatal("battery_mv should not be nil")
}
if *pkt.Payload.BatteryMv != 3700 {
t.Errorf("battery_mv=%d, want 3700", *pkt.Payload.BatteryMv)
}
if pkt.Payload.TemperatureC == nil {
t.Fatal("temperature_c should not be nil")
}
if math.Abs(*pkt.Payload.TemperatureC-28.50) > 0.01 {
t.Errorf("temperature_c=%f, want 28.50", *pkt.Payload.TemperatureC)
}
}
func TestDecodeAdvertWithTelemetryNegativeTemp(t *testing.T) {
pubkey := strings.Repeat("CC", 32)
timestamp := "00000000"
signature := strings.Repeat("DD", 64)
flags := "84" // sensor(4) | hasName(0x80), no location
name := hex.EncodeToString([]byte("Cold"))
nullTerm := "00"
batteryLE := make([]byte, 2)
binary.LittleEndian.PutUint16(batteryLE, 4200)
tempLE := make([]byte, 2)
var negTemp int16 = -550
binary.LittleEndian.PutUint16(tempLE, uint16(negTemp))
hexStr := "1200" + pubkey + timestamp + signature + flags +
name + nullTerm +
hex.EncodeToString(batteryLE) + hex.EncodeToString(tempLE)
pkt, err := DecodePacket(hexStr, nil)
if err != nil {
t.Fatal(err)
}
if pkt.Payload.Name != "Cold" {
t.Errorf("name=%s, want Cold", pkt.Payload.Name)
}
if pkt.Payload.BatteryMv == nil || *pkt.Payload.BatteryMv != 4200 {
t.Errorf("battery_mv=%v, want 4200", pkt.Payload.BatteryMv)
}
if pkt.Payload.TemperatureC == nil {
t.Fatal("temperature_c should not be nil")
}
if math.Abs(*pkt.Payload.TemperatureC-(-5.50)) > 0.01 {
t.Errorf("temperature_c=%f, want -5.50", *pkt.Payload.TemperatureC)
}
}
func TestDecodeAdvertWithoutTelemetry(t *testing.T) {
pubkey := strings.Repeat("EE", 32)
timestamp := "00000000"
signature := strings.Repeat("FF", 64)
flags := "82" // repeater(2) | hasName(0x80)
name := hex.EncodeToString([]byte("Node1"))
hexStr := "1200" + pubkey + timestamp + signature + flags + name
pkt, err := DecodePacket(hexStr, nil)
if err != nil {
t.Fatal(err)
}
if pkt.Payload.Name != "Node1" {
t.Errorf("name=%s, want Node1", pkt.Payload.Name)
}
if pkt.Payload.BatteryMv != nil {
t.Errorf("battery_mv should be nil for advert without telemetry, got %d", *pkt.Payload.BatteryMv)
}
if pkt.Payload.TemperatureC != nil {
t.Errorf("temperature_c should be nil for advert without telemetry, got %f", *pkt.Payload.TemperatureC)
}
}
func TestDecodeAdvertNonSensorIgnoresTelemetryBytes(t *testing.T) {
// A repeater node with 4 trailing bytes after the name should NOT decode telemetry.
pubkey := strings.Repeat("AB", 32)
timestamp := "00000000"
signature := strings.Repeat("CD", 64)
flags := "82" // repeater(2) | hasName(0x80)
name := hex.EncodeToString([]byte("Rptr"))
nullTerm := "00"
extraBytes := "B40ED403" // battery-like and temp-like bytes
hexStr := "1200" + pubkey + timestamp + signature + flags + name + nullTerm + extraBytes
pkt, err := DecodePacket(hexStr, nil)
if err != nil {
t.Fatal(err)
}
if pkt.Payload.BatteryMv != nil {
t.Errorf("battery_mv should be nil for non-sensor node, got %d", *pkt.Payload.BatteryMv)
}
if pkt.Payload.TemperatureC != nil {
t.Errorf("temperature_c should be nil for non-sensor node, got %f", *pkt.Payload.TemperatureC)
}
}
func TestDecodeAdvertTelemetryZeroTemp(t *testing.T) {
// 0°C is a valid temperature and must be emitted.
pubkey := strings.Repeat("12", 32)
timestamp := "00000000"
signature := strings.Repeat("34", 64)
flags := "84" // sensor(4) | hasName(0x80)
name := hex.EncodeToString([]byte("FreezeSensor"))
nullTerm := "00"
batteryLE := make([]byte, 2)
binary.LittleEndian.PutUint16(batteryLE, 3600)
tempLE := make([]byte, 2) // tempRaw=0 → 0°C
hexStr := "1200" + pubkey + timestamp + signature + flags +
name + nullTerm +
hex.EncodeToString(batteryLE) + hex.EncodeToString(tempLE)
pkt, err := DecodePacket(hexStr, nil)
if err != nil {
t.Fatal(err)
}
if pkt.Payload.TemperatureC == nil {
t.Fatal("temperature_c should not be nil for 0°C")
}
if *pkt.Payload.TemperatureC != 0.0 {
t.Errorf("temperature_c=%f, want 0.0", *pkt.Payload.TemperatureC)
}
}

View File

@@ -8,6 +8,9 @@ import (
"flag" "flag"
"fmt" "fmt"
"log" "log"
"math"
"net/http"
_ "net/http/pprof"
"os" "os"
"os/signal" "os/signal"
"path/filepath" "path/filepath"
@@ -19,6 +22,18 @@ import (
) )
func main() { func main() {
// pprof profiling — off by default, enable with ENABLE_PPROF=true
if os.Getenv("ENABLE_PPROF") == "true" {
pprofPort := os.Getenv("PPROF_PORT")
if pprofPort == "" {
pprofPort = "6061"
}
go func() {
log.Printf("[pprof] ingestor profiling at http://localhost:%s/debug/pprof/", pprofPort)
log.Fatal(http.ListenAndServe(":"+pprofPort, nil))
}()
}
configPath := flag.String("config", "config.json", "path to config file") configPath := flag.String("config", "config.json", "path to config file")
flag.Parse() flag.Parse()
@@ -193,7 +208,8 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
observerID := parts[2] observerID := parts[2]
name, _ := msg["origin"].(string) name, _ := msg["origin"].(string)
iata := parts[1] iata := parts[1]
if err := store.UpsertObserver(observerID, name, iata); err != nil { meta := extractObserverMeta(msg)
if err := store.UpsertObserver(observerID, name, iata, meta); err != nil {
log.Printf("MQTT [%s] observer status error: %v", tag, err) log.Printf("MQTT [%s] observer status error: %v", tag, err)
} }
log.Printf("MQTT [%s] status: %s (%s)", tag, firstNonEmpty(name, observerID), iata) log.Printf("MQTT [%s] status: %s (%s)", tag, firstNonEmpty(name, observerID), iata)
@@ -252,6 +268,12 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
log.Printf("MQTT [%s] advert count error: %v", tag, err) log.Printf("MQTT [%s] advert count error: %v", tag, err)
} }
} }
// Update telemetry if present in advert
if decoded.Payload.BatteryMv != nil || decoded.Payload.TemperatureC != nil {
if err := store.UpdateNodeTelemetry(decoded.Payload.PubKey, decoded.Payload.BatteryMv, decoded.Payload.TemperatureC); err != nil {
log.Printf("MQTT [%s] node telemetry update error: %v", tag, err)
}
}
} else { } else {
log.Printf("MQTT [%s] skipping corrupted ADVERT: %s", tag, reason) log.Printf("MQTT [%s] skipping corrupted ADVERT: %s", tag, reason)
} }
@@ -260,7 +282,7 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
// Upsert observer // Upsert observer
if observerID != "" { if observerID != "" {
origin, _ := msg["origin"].(string) origin, _ := msg["origin"].(string)
if err := store.UpsertObserver(observerID, origin, region); err != nil { if err := store.UpsertObserver(observerID, origin, region, nil); err != nil {
log.Printf("MQTT [%s] observer upsert error: %v", tag, err) log.Printf("MQTT [%s] observer upsert error: %v", tag, err)
} }
} }
@@ -446,6 +468,39 @@ func toFloat64(v interface{}) (float64, bool) {
} }
} }
// extractObserverMeta extracts hardware metadata from an MQTT status message.
// Casts battery_mv and uptime_secs to integers (they're always whole numbers).
func extractObserverMeta(msg map[string]interface{}) *ObserverMeta {
meta := &ObserverMeta{}
hasData := false
if v, ok := msg["battery_mv"]; ok {
if f, ok := toFloat64(v); ok {
iv := int(math.Round(f))
meta.BatteryMv = &iv
hasData = true
}
}
if v, ok := msg["uptime_secs"]; ok {
if f, ok := toFloat64(v); ok {
iv := int64(math.Round(f))
meta.UptimeSecs = &iv
hasData = true
}
}
if v, ok := msg["noise_floor"]; ok {
if f, ok := toFloat64(v); ok {
meta.NoiseFloor = &f
hasData = true
}
}
if !hasData {
return nil
}
return meta
}
func firstNonEmpty(vals ...string) string { func firstNonEmpty(vals ...string) string {
for _, v := range vals { for _, v := range vals {
if v != "" { if v != "" {

View File

@@ -26,12 +26,13 @@ func setupTestDBv2(t *testing.T) *DB {
schema := ` schema := `
CREATE TABLE nodes ( CREATE TABLE nodes (
public_key TEXT PRIMARY KEY, name TEXT, role TEXT, public_key TEXT PRIMARY KEY, name TEXT, role TEXT,
lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, advert_count INTEGER DEFAULT 0 lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, advert_count INTEGER DEFAULT 0,
battery_mv INTEGER, temperature_c REAL
); );
CREATE TABLE observers ( CREATE TABLE observers (
id TEXT PRIMARY KEY, name TEXT, iata TEXT, last_seen TEXT, first_seen TEXT, id TEXT PRIMARY KEY, name TEXT, iata TEXT, last_seen TEXT, first_seen TEXT,
packet_count INTEGER DEFAULT 0, model TEXT, firmware TEXT, packet_count INTEGER DEFAULT 0, model TEXT, firmware TEXT,
client_version TEXT, radio TEXT, battery_mv INTEGER, uptime_secs INTEGER, noise_floor INTEGER client_version TEXT, radio TEXT, battery_mv INTEGER, uptime_secs INTEGER, noise_floor REAL
); );
CREATE TABLE transmissions ( CREATE TABLE transmissions (
id INTEGER PRIMARY KEY AUTOINCREMENT, raw_hex TEXT NOT NULL, id INTEGER PRIMARY KEY AUTOINCREMENT, raw_hex TEXT NOT NULL,

View File

@@ -128,23 +128,25 @@ type Node struct {
LastSeen *string `json:"last_seen"` LastSeen *string `json:"last_seen"`
FirstSeen *string `json:"first_seen"` FirstSeen *string `json:"first_seen"`
AdvertCount int `json:"advert_count"` AdvertCount int `json:"advert_count"`
BatteryMv *int `json:"battery_mv"`
TemperatureC *float64 `json:"temperature_c"`
} }
// Observer represents a row from the observers table. // Observer represents a row from the observers table.
type Observer struct { type Observer struct {
ID string `json:"id"` ID string `json:"id"`
Name *string `json:"name"` Name *string `json:"name"`
IATA *string `json:"iata"` IATA *string `json:"iata"`
LastSeen *string `json:"last_seen"` LastSeen *string `json:"last_seen"`
FirstSeen *string `json:"first_seen"` FirstSeen *string `json:"first_seen"`
PacketCount int `json:"packet_count"` PacketCount int `json:"packet_count"`
Model *string `json:"model"` Model *string `json:"model"`
Firmware *string `json:"firmware"` Firmware *string `json:"firmware"`
ClientVersion *string `json:"client_version"` ClientVersion *string `json:"client_version"`
Radio *string `json:"radio"` Radio *string `json:"radio"`
BatteryMv *int `json:"battery_mv"` BatteryMv *int `json:"battery_mv"`
UptimeSecs *int `json:"uptime_secs"` UptimeSecs *int64 `json:"uptime_secs"`
NoiseFloor *int `json:"noise_floor"` NoiseFloor *float64 `json:"noise_floor"`
} }
// Transmission represents a row from the transmissions table. // Transmission represents a row from the transmissions table.
@@ -739,7 +741,7 @@ func (db *DB) GetNodes(limit, offset int, role, search, before, lastHeard, sortB
var total int var total int
db.conn.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM nodes %s", w), args...).Scan(&total) db.conn.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM nodes %s", w), args...).Scan(&total)
querySQL := fmt.Sprintf("SELECT public_key, name, role, lat, lon, last_seen, first_seen, advert_count FROM nodes %s ORDER BY %s LIMIT ? OFFSET ?", w, order) querySQL := fmt.Sprintf("SELECT public_key, name, role, lat, lon, last_seen, first_seen, advert_count, battery_mv, temperature_c FROM nodes %s ORDER BY %s LIMIT ? OFFSET ?", w, order)
qArgs := append(args, limit, offset) qArgs := append(args, limit, offset)
rows, err := db.conn.Query(querySQL, qArgs...) rows, err := db.conn.Query(querySQL, qArgs...)
@@ -765,7 +767,7 @@ func (db *DB) SearchNodes(query string, limit int) ([]map[string]interface{}, er
if limit <= 0 { if limit <= 0 {
limit = 10 limit = 10
} }
rows, err := db.conn.Query(`SELECT public_key, name, role, lat, lon, last_seen, first_seen, advert_count rows, err := db.conn.Query(`SELECT public_key, name, role, lat, lon, last_seen, first_seen, advert_count, battery_mv, temperature_c
FROM nodes WHERE name LIKE ? OR public_key LIKE ? ORDER BY last_seen DESC LIMIT ?`, FROM nodes WHERE name LIKE ? OR public_key LIKE ? ORDER BY last_seen DESC LIMIT ?`,
"%"+query+"%", query+"%", limit) "%"+query+"%", query+"%", limit)
if err != nil { if err != nil {
@@ -785,7 +787,7 @@ func (db *DB) SearchNodes(query string, limit int) ([]map[string]interface{}, er
// GetNodeByPubkey returns a single node. // GetNodeByPubkey returns a single node.
func (db *DB) GetNodeByPubkey(pubkey string) (map[string]interface{}, error) { func (db *DB) GetNodeByPubkey(pubkey string) (map[string]interface{}, error) {
rows, err := db.conn.Query("SELECT public_key, name, role, lat, lon, last_seen, first_seen, advert_count FROM nodes WHERE public_key = ?", pubkey) rows, err := db.conn.Query("SELECT public_key, name, role, lat, lon, last_seen, first_seen, advert_count, battery_mv, temperature_c FROM nodes WHERE public_key = ?", pubkey)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -958,9 +960,21 @@ func (db *DB) GetObservers() ([]Observer, error) {
var observers []Observer var observers []Observer
for rows.Next() { for rows.Next() {
var o Observer var o Observer
if err := rows.Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &o.BatteryMv, &o.UptimeSecs, &o.NoiseFloor); err != nil { var batteryMv, uptimeSecs sql.NullInt64
var noiseFloor sql.NullFloat64
if err := rows.Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &batteryMv, &uptimeSecs, &noiseFloor); err != nil {
continue continue
} }
if batteryMv.Valid {
v := int(batteryMv.Int64)
o.BatteryMv = &v
}
if uptimeSecs.Valid {
o.UptimeSecs = &uptimeSecs.Int64
}
if noiseFloor.Valid {
o.NoiseFloor = &noiseFloor.Float64
}
observers = append(observers, o) observers = append(observers, o)
} }
return observers, nil return observers, nil
@@ -969,11 +983,23 @@ func (db *DB) GetObservers() ([]Observer, error) {
// GetObserverByID returns a single observer. // GetObserverByID returns a single observer.
func (db *DB) GetObserverByID(id string) (*Observer, error) { func (db *DB) GetObserverByID(id string) (*Observer, error) {
var o Observer var o Observer
var batteryMv, uptimeSecs sql.NullInt64
var noiseFloor sql.NullFloat64
err := db.conn.QueryRow("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers WHERE id = ?", id). err := db.conn.QueryRow("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers WHERE id = ?", id).
Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &o.BatteryMv, &o.UptimeSecs, &o.NoiseFloor) Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &batteryMv, &uptimeSecs, &noiseFloor)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if batteryMv.Valid {
v := int(batteryMv.Int64)
o.BatteryMv = &v
}
if uptimeSecs.Valid {
o.UptimeSecs = &uptimeSecs.Int64
}
if noiseFloor.Valid {
o.NoiseFloor = &noiseFloor.Float64
}
return &o, nil return &o, nil
} }
@@ -1634,11 +1660,13 @@ func scanNodeRow(rows *sql.Rows) map[string]interface{} {
var name, role, lastSeen, firstSeen sql.NullString var name, role, lastSeen, firstSeen sql.NullString
var lat, lon sql.NullFloat64 var lat, lon sql.NullFloat64
var advertCount int var advertCount int
var batteryMv sql.NullInt64
var temperatureC sql.NullFloat64
if err := rows.Scan(&pk, &name, &role, &lat, &lon, &lastSeen, &firstSeen, &advertCount); err != nil { if err := rows.Scan(&pk, &name, &role, &lat, &lon, &lastSeen, &firstSeen, &advertCount, &batteryMv, &temperatureC); err != nil {
return nil return nil
} }
return map[string]interface{}{ m := map[string]interface{}{
"public_key": pk, "public_key": pk,
"name": nullStr(name), "name": nullStr(name),
"role": nullStr(role), "role": nullStr(role),
@@ -1651,6 +1679,17 @@ func scanNodeRow(rows *sql.Rows) map[string]interface{} {
"hash_size": nil, "hash_size": nil,
"hash_size_inconsistent": false, "hash_size_inconsistent": false,
} }
if batteryMv.Valid {
m["battery_mv"] = int(batteryMv.Int64)
} else {
m["battery_mv"] = nil
}
if temperatureC.Valid {
m["temperature_c"] = temperatureC.Float64
} else {
m["temperature_c"] = nil
}
return m
} }
func nullStr(ns sql.NullString) interface{} { func nullStr(ns sql.NullString) interface{} {

View File

@@ -28,7 +28,9 @@ func setupTestDB(t *testing.T) *DB {
lon REAL, lon REAL,
last_seen TEXT, last_seen TEXT,
first_seen TEXT, first_seen TEXT,
advert_count INTEGER DEFAULT 0 advert_count INTEGER DEFAULT 0,
battery_mv INTEGER,
temperature_c REAL
); );
CREATE TABLE observers ( CREATE TABLE observers (
@@ -44,7 +46,7 @@ func setupTestDB(t *testing.T) *DB {
radio TEXT, radio TEXT,
battery_mv INTEGER, battery_mv INTEGER,
uptime_secs INTEGER, uptime_secs INTEGER,
noise_floor INTEGER noise_floor REAL
); );
CREATE TABLE transmissions ( CREATE TABLE transmissions (
@@ -369,6 +371,88 @@ func TestGetObserverByIDNotFound(t *testing.T) {
} }
} }
func TestObserverTypeConsistency(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
// Insert observer with typed metadata matching ingestor writes
db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count, battery_mv, uptime_secs, noise_floor)
VALUES ('obs_typed', 'TypedObs', 'SJC', '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z', 10, 3500, 86400, -115.5)`)
obs, err := db.GetObserverByID("obs_typed")
if err != nil {
t.Fatal(err)
}
// battery_mv should be *int
if obs.BatteryMv == nil {
t.Fatal("BatteryMv should not be nil")
}
if *obs.BatteryMv != 3500 {
t.Errorf("BatteryMv=%d, want 3500", *obs.BatteryMv)
}
// uptime_secs should be *int64
if obs.UptimeSecs == nil {
t.Fatal("UptimeSecs should not be nil")
}
if *obs.UptimeSecs != 86400 {
t.Errorf("UptimeSecs=%d, want 86400", *obs.UptimeSecs)
}
// noise_floor should be *float64
if obs.NoiseFloor == nil {
t.Fatal("NoiseFloor should not be nil")
}
if *obs.NoiseFloor != -115.5 {
t.Errorf("NoiseFloor=%f, want -115.5", *obs.NoiseFloor)
}
// Verify NULL handling: observer without metadata
db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
VALUES ('obs_null', 'NullObs', 'SFO', '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z', 5)`)
obsNull, err := db.GetObserverByID("obs_null")
if err != nil {
t.Fatal(err)
}
if obsNull.BatteryMv != nil {
t.Errorf("BatteryMv should be nil for observer without metadata, got %d", *obsNull.BatteryMv)
}
if obsNull.UptimeSecs != nil {
t.Errorf("UptimeSecs should be nil for observer without metadata, got %d", *obsNull.UptimeSecs)
}
if obsNull.NoiseFloor != nil {
t.Errorf("NoiseFloor should be nil for observer without metadata, got %f", *obsNull.NoiseFloor)
}
}
func TestObserverTypesInGetObservers(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count, battery_mv, uptime_secs, noise_floor)
VALUES ('obs1', 'Obs1', 'SJC', '2026-06-01T00:00:00Z', '2026-01-01T00:00:00Z', 10, 4200, 172800, -110.3)`)
observers, err := db.GetObservers()
if err != nil {
t.Fatal(err)
}
if len(observers) != 1 {
t.Fatalf("expected 1 observer, got %d", len(observers))
}
o := observers[0]
if o.BatteryMv == nil || *o.BatteryMv != 4200 {
t.Errorf("BatteryMv=%v, want 4200", o.BatteryMv)
}
if o.UptimeSecs == nil || *o.UptimeSecs != 172800 {
t.Errorf("UptimeSecs=%v, want 172800", o.UptimeSecs)
}
if o.NoiseFloor == nil || *o.NoiseFloor != -110.3 {
t.Errorf("NoiseFloor=%v, want -110.3", o.NoiseFloor)
}
}
func TestGetDistinctIATAs(t *testing.T) { func TestGetDistinctIATAs(t *testing.T) {
db := setupTestDB(t) db := setupTestDB(t)
defer db.Close() defer db.Close()
@@ -1386,6 +1470,49 @@ func TestGetChannelsStaleMessage(t *testing.T) {
} }
} }
func TestNodeTelemetryFields(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
db.conn.Exec(`INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen, advert_count, battery_mv, temperature_c)
VALUES ('pk_telem1', 'SensorNode', 'sensor', 37.0, -122.0, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z', 5, 3700, 28.5)`)
node, err := db.GetNodeByPubkey("pk_telem1")
if err != nil {
t.Fatal(err)
}
if node == nil {
t.Fatal("expected node, got nil")
}
if node["battery_mv"] != 3700 {
t.Errorf("battery_mv=%v, want 3700", node["battery_mv"])
}
if node["temperature_c"] != 28.5 {
t.Errorf("temperature_c=%v, want 28.5", node["temperature_c"])
}
nodes, _, _, err := db.GetNodes(50, 0, "sensor", "", "", "", "", "")
if err != nil {
t.Fatal(err)
}
if len(nodes) != 1 {
t.Fatalf("expected 1 sensor node, got %d", len(nodes))
}
if nodes[0]["battery_mv"] != 3700 {
t.Errorf("GetNodes battery_mv=%v, want 3700", nodes[0]["battery_mv"])
}
db.conn.Exec(`INSERT INTO nodes (public_key, name, role, last_seen, first_seen, advert_count)
VALUES ('pk_notelem', 'PlainNode', 'repeater', '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z', 3)`)
node2, _ := db.GetNodeByPubkey("pk_notelem")
if node2["battery_mv"] != nil {
t.Errorf("expected nil battery_mv for node without telemetry, got %v", node2["battery_mv"])
}
if node2["temperature_c"] != nil {
t.Errorf("expected nil temperature_c for node without telemetry, got %v", node2["temperature_c"])
}
}
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
os.Exit(m.Run()) os.Exit(m.Run())
} }

View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
_ "net/http/pprof"
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
@@ -54,6 +55,18 @@ func resolveBuildTime() string {
} }
func main() { func main() {
// pprof profiling — off by default, enable with ENABLE_PPROF=true
if os.Getenv("ENABLE_PPROF") == "true" {
pprofPort := os.Getenv("PPROF_PORT")
if pprofPort == "" {
pprofPort = "6060"
}
go func() {
log.Printf("[pprof] profiling UI at http://localhost:%s/debug/pprof/", pprofPort)
log.Fatal(http.ListenAndServe(":"+pprofPort, nil))
}()
}
var ( var (
configDir string configDir string
port int port int

View File

@@ -996,6 +996,12 @@
"elementShape": { "elementShape": {
"type": "number" "type": "number"
} }
},
"battery_mv": {
"type": "nullable_number"
},
"temperature_c": {
"type": "nullable_number"
} }
} }
}, },
@@ -1097,6 +1103,12 @@
}, },
"last_heard": { "last_heard": {
"type": "string" "type": "string"
},
"battery_mv": {
"type": "nullable_number"
},
"temperature_c": {
"type": "nullable_number"
} }
} }
} }

40
db.js
View File

@@ -33,7 +33,9 @@ db.exec(`
lon REAL, lon REAL,
last_seen TEXT, last_seen TEXT,
first_seen TEXT, first_seen TEXT,
advert_count INTEGER DEFAULT 0 advert_count INTEGER DEFAULT 0,
battery_mv INTEGER,
temperature_c REAL
); );
CREATE TABLE IF NOT EXISTS observers ( CREATE TABLE IF NOT EXISTS observers (
@@ -60,7 +62,9 @@ db.exec(`
lon REAL, lon REAL,
last_seen TEXT, last_seen TEXT,
first_seen TEXT, first_seen TEXT,
advert_count INTEGER DEFAULT 0 advert_count INTEGER DEFAULT 0,
battery_mv INTEGER,
temperature_c REAL
); );
CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen); CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen);
@@ -324,6 +328,22 @@ for (const col of ['model', 'firmware', 'client_version', 'radio', 'battery_mv',
} }
} }
// --- One-time migration: add telemetry columns to nodes and inactive_nodes ---
{
const done = db.prepare(`SELECT 1 FROM _migrations WHERE name = 'node_telemetry_v1'`).get();
if (!done) {
console.log('[migration] Adding telemetry columns to nodes/inactive_nodes...');
const nodeCols = db.pragma('table_info(nodes)').map(c => c.name);
if (!nodeCols.includes('battery_mv')) db.exec(`ALTER TABLE nodes ADD COLUMN battery_mv INTEGER`);
if (!nodeCols.includes('temperature_c')) db.exec(`ALTER TABLE nodes ADD COLUMN temperature_c REAL`);
const inactiveCols = db.pragma('table_info(inactive_nodes)').map(c => c.name);
if (!inactiveCols.includes('battery_mv')) db.exec(`ALTER TABLE inactive_nodes ADD COLUMN battery_mv INTEGER`);
if (!inactiveCols.includes('temperature_c')) db.exec(`ALTER TABLE inactive_nodes ADD COLUMN temperature_c REAL`);
db.prepare(`INSERT INTO _migrations (name) VALUES ('node_telemetry_v1')`).run();
console.log('[migration] node telemetry columns added');
}
}
// --- Prepared statements --- // --- Prepared statements ---
const stmts = { const stmts = {
upsertNode: db.prepare(` upsertNode: db.prepare(`
@@ -339,6 +359,12 @@ const stmts = {
incrementAdvertCount: db.prepare(` incrementAdvertCount: db.prepare(`
UPDATE nodes SET advert_count = advert_count + 1 WHERE public_key = @public_key UPDATE nodes SET advert_count = advert_count + 1 WHERE public_key = @public_key
`), `),
updateNodeTelemetry: db.prepare(`
UPDATE nodes SET
battery_mv = COALESCE(@battery_mv, battery_mv),
temperature_c = COALESCE(@temperature_c, temperature_c)
WHERE public_key = @public_key
`),
upsertObserver: db.prepare(` upsertObserver: db.prepare(`
INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor) INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor)
VALUES (@id, @name, @iata, @last_seen, @first_seen, 1, @model, @firmware, @client_version, @radio, @battery_mv, @uptime_secs, @noise_floor) VALUES (@id, @name, @iata, @last_seen, @first_seen, 1, @model, @firmware, @client_version, @radio, @battery_mv, @uptime_secs, @noise_floor)
@@ -511,6 +537,14 @@ function incrementAdvertCount(publicKey) {
stmts.incrementAdvertCount.run({ public_key: publicKey }); stmts.incrementAdvertCount.run({ public_key: publicKey });
} }
function updateNodeTelemetry(data) {
stmts.updateNodeTelemetry.run({
public_key: data.public_key,
battery_mv: data.battery_mv ?? null,
temperature_c: data.temperature_c ?? null,
});
}
function upsertNode(data) { function upsertNode(data) {
const now = new Date().toISOString(); const now = new Date().toISOString();
stmts.upsertNode.run({ stmts.upsertNode.run({
@@ -898,4 +932,4 @@ function moveStaleNodes(nodeDays) {
return moved; return moved;
} }
module.exports = { db, schemaVersion, observerIdToRowid, resolveObserverIdx, insertTransmission, upsertNode, incrementAdvertCount, upsertObserver, updateObserverStatus, getPackets, getPacket, getTransmission, getNodes, getNode, getObservers, getStats, searchNodes, getNodeHealth, getNodeAnalytics, removePhantomNodes, moveStaleNodes }; module.exports = { db, schemaVersion, observerIdToRowid, resolveObserverIdx, insertTransmission, upsertNode, incrementAdvertCount, updateNodeTelemetry, upsertObserver, updateObserverStatus, getPackets, getPacket, getTransmission, getNodes, getNode, getObservers, getStats, searchNodes, getNodeHealth, getNodeAnalytics, removePhantomNodes, moveStaleNodes };

View File

@@ -135,10 +135,32 @@ function decodeAdvert(buf) {
off += 8; off += 8;
} }
if (result.flags.hasName) { if (result.flags.hasName) {
let name = appdata.subarray(off).toString('utf8'); // Find null terminator to separate name from trailing telemetry bytes
// Strip non-printable characters (< 0x20 except tab/newline) and DEL let nameEnd = appdata.length;
for (let i = off; i < appdata.length; i++) {
if (appdata[i] === 0x00) { nameEnd = i; break; }
}
let name = appdata.subarray(off, nameEnd).toString('utf8');
name = name.replace(/[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]/g, ''); name = name.replace(/[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]/g, '');
result.name = name; result.name = name;
off = nameEnd;
// Skip null terminator(s)
while (off < appdata.length && appdata[off] === 0x00) off++;
}
// Telemetry bytes after name: battery_mv(2 LE) + temperature_c(2 LE, signed, /100)
// Only sensor nodes (advType=4) carry telemetry bytes.
if (result.flags.sensor && off + 4 <= appdata.length) {
const batteryMv = appdata.readUInt16LE(off);
const tempRaw = appdata.readInt16LE(off + 2);
const tempC = tempRaw / 100.0;
if (batteryMv > 0 && batteryMv <= 10000) {
result.battery_mv = batteryMv;
}
// Raw int16 / 100 → °C; accept -50°C to 100°C (raw: -5000 to 10000)
if (tempRaw >= -5000 && tempRaw <= 10000) {
result.temperature_c = tempC;
}
} }
} }

View File

@@ -58,12 +58,16 @@ services:
ports: ports:
- "${STAGING_GO_HTTP_PORT:-82}:80" - "${STAGING_GO_HTTP_PORT:-82}:80"
- "${STAGING_GO_MQTT_PORT:-1885}:1883" - "${STAGING_GO_MQTT_PORT:-1885}:1883"
- "6060:6060" # pprof server
- "6061:6061" # pprof ingestor
volumes: volumes:
- ${STAGING_DATA_DIR:-~/meshcore-staging-data}/config.json:/app/config.json:ro - ${STAGING_DATA_DIR:-~/meshcore-staging-data}/config.json:/app/config.json:ro
- ${STAGING_DATA_DIR:-~/meshcore-staging-data}:/app/data - ${STAGING_DATA_DIR:-~/meshcore-staging-data}:/app/data
- caddy-data-staging-go:/data/caddy - caddy-data-staging-go:/data/caddy
environment: environment:
- NODE_ENV=staging - NODE_ENV=staging
- ENABLE_PPROF=true
- PPROF_PORT=6060
healthcheck: healthcheck:
test: ["CMD", "wget", "-qO-", "http://localhost:3000/api/stats"] test: ["CMD", "wget", "-qO-", "http://localhost:3000/api/stats"]
interval: 30s interval: 30s

View File

@@ -716,6 +716,10 @@ for (const source of mqttSources) {
const role = p.flags ? (p.flags.repeater ? 'repeater' : p.flags.room ? 'room' : p.flags.sensor ? 'sensor' : 'companion') : 'companion'; const role = p.flags ? (p.flags.repeater ? 'repeater' : p.flags.room ? 'room' : p.flags.sensor ? 'sensor' : 'companion') : 'companion';
db.upsertNode({ public_key: p.pubKey, name: p.name || null, role, lat: p.lat, lon: p.lon, last_seen: now }); db.upsertNode({ public_key: p.pubKey, name: p.name || null, role, lat: p.lat, lon: p.lon, last_seen: now });
if (txResult && txResult.isNew) db.incrementAdvertCount(p.pubKey); if (txResult && txResult.isNew) db.incrementAdvertCount(p.pubKey);
// Update telemetry if present in advert
if (p.battery_mv != null || p.temperature_c != null) {
db.updateNodeTelemetry({ public_key: p.pubKey, battery_mv: p.battery_mv ?? null, temperature_c: p.temperature_c ?? null });
}
// Invalidate this node's caches on advert // Invalidate this node's caches on advert
cache.invalidate('node:' + p.pubKey); cache.invalidate('node:' + p.pubKey);
cache.invalidate('health:' + p.pubKey); cache.invalidate('health:' + p.pubKey);
@@ -1057,6 +1061,10 @@ app.post('/api/packets', requireApiKey, (req, res) => {
const role = p.flags ? (p.flags.repeater ? 'repeater' : p.flags.room ? 'room' : p.flags.sensor ? 'sensor' : 'companion') : 'companion'; const role = p.flags ? (p.flags.repeater ? 'repeater' : p.flags.room ? 'room' : p.flags.sensor ? 'sensor' : 'companion') : 'companion';
db.upsertNode({ public_key: p.pubKey, name: p.name || null, role, lat: p.lat, lon: p.lon, last_seen: now }); db.upsertNode({ public_key: p.pubKey, name: p.name || null, role, lat: p.lat, lon: p.lon, last_seen: now });
if (txResult && txResult.isNew) db.incrementAdvertCount(p.pubKey); if (txResult && txResult.isNew) db.incrementAdvertCount(p.pubKey);
// Update telemetry if present in advert
if (p.battery_mv != null || p.temperature_c != null) {
db.updateNodeTelemetry({ public_key: p.pubKey, battery_mv: p.battery_mv ?? null, temperature_c: p.temperature_c ?? null });
}
} else { } else {
console.warn(`[advert] Skipping corrupted ADVERT (API): ${validation.reason}`); console.warn(`[advert] Skipping corrupted ADVERT (API): ${validation.reason}`);
} }

View File

@@ -213,6 +213,48 @@ console.log('── Spec Tests: Advert Payload ──');
assertEq(p.name, undefined, 'advert no name: name undefined'); assertEq(p.name, undefined, 'advert no name: name undefined');
} }
// Telemetry: sensor node with battery + positive temperature
{
const pubkey = 'AA'.repeat(32);
const sig = 'BB'.repeat(64);
const flags = '84'; // sensor(4) | hasName(0x80)
const name = Buffer.from('S1').toString('hex') + '00'; // null-terminated
const battBuf = Buffer.alloc(2); battBuf.writeUInt16LE(3700);
const tempBuf = Buffer.alloc(2); tempBuf.writeInt16LE(2850); // 28.50°C
const hex = '1200' + pubkey + '00000000' + sig + flags + name +
battBuf.toString('hex') + tempBuf.toString('hex');
const p = decodePacket(hex).payload;
assertEq(p.battery_mv, 3700, 'telemetry: battery_mv decoded');
assert(Math.abs(p.temperature_c - 28.50) < 0.01, 'telemetry: temperature_c positive');
}
// Telemetry: sensor node with 0°C must still emit temperature_c
{
const pubkey = 'CC'.repeat(32);
const sig = 'DD'.repeat(64);
const flags = '84'; // sensor(4) | hasName(0x80)
const name = Buffer.from('S2').toString('hex') + '00';
const battBuf = Buffer.alloc(2); battBuf.writeUInt16LE(3600);
const tempBuf = Buffer.alloc(2); // 0°C
const hex = '1200' + pubkey + '00000000' + sig + flags + name +
battBuf.toString('hex') + tempBuf.toString('hex');
const p = decodePacket(hex).payload;
assert(p.temperature_c === 0, 'telemetry: 0°C is valid and emitted');
}
// Telemetry: non-sensor node with trailing bytes must NOT decode telemetry
{
const pubkey = 'EE'.repeat(32);
const sig = 'FF'.repeat(64);
const flags = '82'; // repeater(2) | hasName(0x80)
const name = Buffer.from('R1').toString('hex') + '00';
const extraBytes = 'B40ED403'; // battery-like and temp-like bytes
const hex = '1200' + pubkey + '00000000' + sig + flags + name + extraBytes;
const p = decodePacket(hex).payload;
assertEq(p.battery_mv, undefined, 'telemetry: non-sensor node: battery_mv must be undefined');
assertEq(p.temperature_c, undefined, 'telemetry: non-sensor node: temperature_c must be undefined');
}
console.log('── Spec Tests: Encrypted Payload Format ──'); console.log('── Spec Tests: Encrypted Payload Format ──');
// NOTE: Spec says v1 encrypted payloads have dest(1) + src(1) + MAC(2) + ciphertext // NOTE: Spec says v1 encrypted payloads have dest(1) + src(1) + MAC(2) + ciphertext