Files
meshcore-analyzer/cmd/ingestor
Kpa-clawbot fb744d895f fix(#1143): structural pubkey attribution via from_pubkey column (#1152)
Fixes #1143.

## Summary

Replaces the structurally unsound `decoded_json LIKE '%pubkey%'` (and
`OR LIKE '%name%'`) attribution path with an exact-match lookup on a
dedicated, indexed `transmissions.from_pubkey` column.

This closes both holes documented in #1143:
- **Hole 1** — same-name false positives via `OR LIKE '%name%'`
- **Hole 2a** — adversarial spoofing: a malicious node names itself with
another node's pubkey and gets attributed to the victim
- **Hole 2b** — accidental false positive when any free-text field (path
elements, channel names, message bodies) contains a 64-char hex
substring matching a real pubkey
- **Perf** — query now uses an index instead of a full-table scan
against `LIKE '%substring%'`

## TDD

Two-commit history shows red-then-green:

| Commit | Status | Purpose |
|---|---|---|
| `7f0f08e` | RED — tests assertion-fail on master behaviour |
Adversarial fixtures + spec |
| `59327db` | GREEN — schema + ingestor + server + migration |
Implementation |

The red commit's test schema includes the new column so the file
compiles, but the production code still uses LIKE — the assertions fail
because the malicious / same-name / free-text rows are returned. The
green commit changes the query plus adds the migration/ingest path.

## Changes

### Schema
- new column `transmissions.from_pubkey TEXT`
- new index `idx_transmissions_from_pubkey`

### Ingestor (`cmd/ingestor/`)
- `PacketData.FromPubkey` populated from decoded ADVERT `pubKey` at
write time. Cheap — already parsing `decoded_json`. Non-ADVERTs stay
NULL.
- `stmtInsertTransmission` writes the column.
- Migration `from_pubkey_v1` ALTERs legacy DBs to add the column +
index.
- Bonus: rewrote the recipe in the gated one-shot
`advert_count_unique_v1` migration to use `from_pubkey` (already marked
done on existing DBs; kept correct for fresh installs).

### Server (`cmd/server/`)
- `ensureFromPubkeyColumn` mirrors the ingestor migration so the server
can boot against a DB the ingestor has never touched (e2e fixture, fresh
installs).
- `backfillFromPubkeyAsync` runs **after** HTTP starts. Scans `WHERE
from_pubkey IS NULL AND payload_type = 4` in 5000-row chunks with a
100ms yield between chunks. Cannot block boot even on prod-sized DBs
(100K+ transmissions). Queries handle NULL gracefully (return empty for
that pubkey, same as today's unknown-pubkey path).
- All in-scope LIKE call sites switched to exact match:

| Site | Before | After |
|---|---|---|
| `buildPacketWhere` (was db.go:582) | `decoded_json LIKE '%pubkey%'` |
`from_pubkey = ?` |
| `buildTransmissionWhere` (was db.go:626) | `t.decoded_json LIKE
'%pubkey%'` | `t.from_pubkey = ?` |
| `GetRecentTransmissionsForNode` (was db.go:910) | `LIKE '%pubkey%' OR
LIKE '%name%'` | `t.from_pubkey = ?` |
| `QueryMultiNodePackets` (was db.go:1785) | `decoded_json LIKE
'%pubkey%' OR ...` | `t.from_pubkey IN (?, ?, ...)` |
| `advert_count_unique_v1` (was ingestor/db.go:257) | `decoded_json LIKE
'%' \|\| nodes.public_key \|\| '%'` | `t.from_pubkey = nodes.public_key`
|

`GetRecentTransmissionsForNode` signature simplifies: the `name`
parameter is gone (it was only ever used for the legacy `OR LIKE
'%name%'` fallback). Sole caller in `routes.go:1243` updated.

### Tests
- `cmd/server/from_pubkey_attribution_test.go` — adversarial fixtures +
Hole 1/2a/2b/QueryMultiNodePackets exact-match assertions, EXPLAIN QUERY
PLAN index check, migration backfill correctness.
- `cmd/ingestor/from_pubkey_test.go` — write-time correctness
(BuildPacketData populates FromPubkey for ADVERT only;
InsertTransmission persists it; non-ADVERTs stay NULL).
- Existing test schemas (server v2, server v3, coverage) get the new
column **plus a SQLite trigger** that auto-populates `from_pubkey` from
`decoded_json` on ADVERT inserts. This means existing fixtures (which
only seed `decoded_json`) keep attributing correctly without per-test
edits.
- `seedTestData`'s ADVERTs explicitly set `from_pubkey`.

## Performance — index is used

```
$ EXPLAIN QUERY PLAN SELECT id FROM transmissions WHERE from_pubkey = ?
SEARCH transmissions USING INDEX idx_transmissions_from_pubkey (from_pubkey=?)
```

Asserted in `TestFromPubkeyIndexUsed`.

## Migration approach

- **Sync at boot**: `ALTER TABLE transmissions ADD COLUMN from_pubkey
TEXT` is a metadata-only operation in SQLite — microseconds regardless
of table size. `CREATE INDEX IF NOT EXISTS
idx_transmissions_from_pubkey` is **not** metadata-only: it scans the
table once. Empirically a few hundred ms on a 100K-row table; expect a
few seconds on a 10M-row table (one-time cost, blocking boot during that
window). Subsequent boots no-op via `IF NOT EXISTS`. If this boot delay
becomes an operational concern at prod scale we can defer the `CREATE
INDEX` to a goroutine — for now a few-second one-time delay is
acceptable.
- **Async**: row-level backfill of legacy NULL ADVERTs (chunked 5000 /
100ms yield). On a 100K-ADVERT prod DB, this completes in seconds in the
background; HTTP is fully available throughout.
- **Safety**: queries handle NULL gracefully — a node whose ADVERTs
haven't backfilled yet returns empty, identical to today's behaviour for
unknown pubkeys. No half-state regression.

## Out of scope (intentionally)

The free-text `LIKE` paths the issue explicitly leaves alone (e.g.
user-typed packet search) are untouched. Only the pubkey-attribution
sites get the column treatment.



## Cycle-3 review fixes

| Finding | Status | Commit |
|---|---|---|
| **M1c** — async-contract test was tautological (test's own `go`, not
production's) | Fixed | `23ace71` (red) → `a05b50c` (green) |
| **m1c** — package-global atomic resets unsafe under `t.Parallel()` |
Fixed (`// DO NOT t.Parallel` comment + `Reset()` helper) | rolled into
`23ace71` / `241ec69` |
| **m2c** — `/api/healthz` read 3 atomics non-atomically (torn snapshot)
| Fixed (single RWMutex-guarded snapshot + race test) | `241ec69` |
| **n3c.m1** — vestigial OR-scaffolding in `QueryMultiNodePackets` |
Fixed (cleanup) | `5a53ceb` |
| **n3c.m2** — verify PR body language about `ALTER` vs `CREATE INDEX` |
Verified accurate (already corrected in cycle 2) | (no change) |
| **n3c.m3** — `json.Unmarshal` per row in backfill → could use SQL
`json_extract` | **Deferred as known followup** — pure perf optimization
(current per-row Unmarshal is correct, just slower); SQL rewrite would
unwind the chunked-yield architecture and is non-trivial. Acceptable for
one-time backfill at boot on legacy DBs. |

### M1c implementation detail

`startFromPubkeyBackfill(dbPath, chunkSize, yieldDuration)` is now the
single production entry point used by `main.go`. It internally does `go
backfillFromPubkeyAsync(...)`. The test calls `startFromPubkeyBackfill`
(no `go` prefix) and asserts the dispatch returns within 50ms — so if
anyone removes the `go` keyword inside the wrapper, the test fails.
**Manually verified**: removing the `go` keyword causes
`TestBackfillFromPubkey_DoesNotBlockBoot` to fail with "backfill
dispatch took ~1s (>50ms): not async — would block boot."

### m2c implementation detail

`fromPubkeyBackfillTotal/Processed/Done` are now plain `int64`/`bool`
package globals guarded by a single `sync.RWMutex`.
`fromPubkeyBackfillSnapshot()` returns all three under one RLock.
`TestHealthzFromPubkeyBackfillConsistentSnapshot` races a writer
(lock-step total/processed updates with periodic done flips) against 8
readers hammering `/api/healthz`, asserting `processed<=total` and
`(done => processed==total)` on every response. Verified the test
catches torn reads (manually injected a 3-RLock implementation; test
failed within milliseconds with "processed>total" and "done=true but
processed!=total" errors).

---------

Co-authored-by: openclaw-bot <bot@openclaw.local>
Co-authored-by: openclaw-bot <bot@openclaw.dev>
2026-05-06 23:50:44 -07:00
..

MeshCore MQTT Ingestor (Go)

Standalone MQTT ingestion service for CoreScope. Connects to MQTT brokers, decodes raw MeshCore packets, and writes to the same SQLite database used by the Node.js web server.

This is the first step of a larger Go rewrite — separating MQTT ingestion from the web server.

Architecture

MQTT Broker(s)  →  Go Ingestor  →  SQLite DB  ←  Node.js Web Server
                    (this binary)     (shared)
  • Single static binary — no runtime dependencies, no CGO
  • SQLite via modernc.org/sqlite (pure Go)
  • MQTT via github.com/eclipse/paho.mqtt.golang
  • Runs alongside the Node.js server — they share the DB file
  • Does NOT serve HTTP/WebSocket — that stays in Node.js

Build

Requires Go 1.22+.

cd cmd/ingestor
go build -o corescope-ingestor .

Cross-compile for Linux (e.g., for the production VM):

GOOS=linux GOARCH=amd64 go build -o corescope-ingestor .

Run

./corescope-ingestor -config /path/to/config.json

The config file uses the same format as the Node.js config.json. The ingestor reads the mqttSources array (or legacy mqtt object) and dbPath fields.

Environment Variables

Variable Description Default
DB_PATH SQLite database path data/meshcore.db
MQTT_BROKER Single MQTT broker URL (overrides config)
MQTT_TOPIC MQTT topic (used with MQTT_BROKER) meshcore/#

Minimal Config

{
  "dbPath": "data/meshcore.db",
  "mqttSources": [
    {
      "name": "local",
      "broker": "mqtt://localhost:1883",
      "topics": ["meshcore/#"]
    }
  ]
}

Full Config (same as Node.js)

The ingestor reads these fields from the existing config.json:

  • mqttSources[] — array of MQTT broker connections
    • name — display name for logging
    • broker — MQTT URL (mqtt://, mqtts://)
    • username / password — auth credentials
    • topics — array of topic patterns to subscribe
    • iataFilter — optional regional filter
  • mqtt — legacy single-broker config (auto-converted to mqttSources)
  • dbPath — SQLite DB path (default: data/meshcore.db)

Test

cd cmd/ingestor
go test -v ./...

What It Does

  1. Connects to configured MQTT brokers with auto-reconnect
  2. Subscribes to mesh packet topics (e.g., meshcore/+/+/packets)
  3. Receives raw hex packets via JSON messages ({ "raw": "...", "SNR": ..., "RSSI": ... })
  4. Decodes MeshCore packet headers, paths, and payloads (ported from decoder.js)
  5. Computes content hashes (path-independent, SHA-256-based)
  6. Writes to SQLite: transmissions + observations tables
  7. Upserts nodes from decoded ADVERT packets (with validation)
  8. Upserts observers from MQTT topic metadata

Schema Compatibility

The Go ingestor creates the same v3 schema as the Node.js server:

  • transmissions — deduplicated by content hash
  • observations — per-observer sightings with observer_idx (rowid reference)
  • nodes — mesh nodes discovered from adverts
  • observers — MQTT feed sources

Both processes can write to the same DB concurrently (SQLite WAL mode).

What's Not Ported (Yet)

  • Companion bridge format (Format 2 — meshcore/advertisement, channel messages, etc.)
  • Channel key decryption (GRP_TXT encrypted payload decryption)
  • WebSocket broadcast to browsers
  • In-memory packet store
  • Cache invalidation

These stay in the Node.js server for now.

Files

cmd/ingestor/
  main.go          — entry point, MQTT connect, message handler
  decoder.go       — MeshCore packet decoder (ported from decoder.js)
  decoder_test.go  — decoder tests (25 tests, golden fixtures)
  db.go            — SQLite writer (schema-compatible with db.js)
  db_test.go       — DB tests (schema validation, insert/upsert, E2E)
  config.go        — config struct + loader
  util.go          — shared utilities
  go.mod / go.sum  — Go module definition