Kpa-clawbot f15d2efe81 fix(#1386): #1324 follow-up — test coverage + RWMutex + lock-hold-time + dead code + cadence (#1390)
# #1324 follow-up: test coverage + RWMutex + lock-hold-time + dead code + cadence

Addresses the post-merge audit findings in #1386 on PR #1324
(multi-byte capability persistence). Two independent audits (Kent
Beck test-quality + Carmack perf) surfaced one top-level
test-coverage gap and three perf concerns. This PR closes all of
them; cadence cleanup is included.

Red commit: `<RED_SHA>` (CI: `<RED_URL>`)

## What

1. **Tests** (`cmd/ingestor/multibyte_persist_test.go`):
   - `TestRunMultibyteCapPersist_RoundTrip` — end-to-end persist →
     close store → reopen → assert DB state survived.
   - `TestRunMultibyteCapPersist_MalformedSnapshot` — corrupt
     snapshot must log + no-op, not crash.
   - `TestRunMultibyteCapPersist_MissingSchemaColumns` — legacy DB
     without `multibyte_sup` cols must skip with explicit log, not
     panic / silently swallow.
   - `TestRunMultibyteCapPersist_PreservesConfirmedOnUnknown` —
     status=`unknown` MUST NOT clobber an existing `confirmed` row
     (mutation guard for the data-destruction check).
2. **`cmd/server/store.go`**
   - `cacheMu sync.Mutex` → `sync.RWMutex`. The per-node
     `GetMultibyteCapFor` read path in `/api/nodes` (`routes.go:1215`)
     now takes `RLock`, so it no longer serializes against itself or
     against analytics readers.
   - Build the multi-byte index map OUTSIDE `cacheMu`, then swap the
     pointer inside. This moves a 2400-iteration map build (and its
     allocations) out of the analytics-cycle critical section.
   - Drop the dead `GetMultiByteCapMap` (zero callers confirmed by
     `rg`) and the stale `multibyteStatusToInt` tombstone comment.
3. **`cmd/ingestor/multibyte_persist.go`**
   - Replace the per-entry pair of `UPDATE nodes` + `UPDATE inactive_nodes`
     (one of the pair always matched zero rows) with a single `UPDATE` per
     entry, dispatched by table membership. ~50% fewer prepared-stmt
     round-trips.
   - Explicit `MalformedSnapshot` log line distinct from cold-start.
   - Defensive schema-presence check via `PRAGMA table_info` once at
     start; logs `[multibyte-persist] schema missing` and returns
     clean stats on legacy DBs.
4. **`cmd/server/analytics_recomputer.go` / `config.example.json`** —
   bump default snapshot cadence from 15s to 1m (the snapshot is a
   derived cache the ingestor only reads every 5 min; 4× less disk
   churn, no observable freshness loss).
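The lock changes in item 2 follow a standard Go pattern. Readers share an `RLock`. The writer builds the replacement map with no lock held, then takes the write lock only long enough to swap the pointer. Below is a minimal sketch of that pattern. It assumes a hypothetical `capCache` type with string statuses keyed by pubkey; the real `store.go` fields, types and `cacheMu` usage are not shown in this PR text.

```go
package main

import "sync"

// capCache is a hypothetical stand-in for the server's multi-byte
// capability cache; names are illustrative, not taken from store.go.
type capCache struct {
	mu  sync.RWMutex
	idx map[string]string // pubkey -> capability status
}

// Get is the hot per-node read path. RLock lets any number of readers
// proceed in parallel; they only wait while a writer holds mu.
func (c *capCache) Get(pubkey string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	s, ok := c.idx[pubkey]
	return s, ok
}

// Rebuild constructs the new index with no lock held, then takes the
// write lock only for the pointer swap, so readers are never blocked
// for the duration of the O(n) map build.
func (c *capCache) Rebuild(rows map[string]string) {
	next := make(map[string]string, len(rows))
	for k, v := range rows {
		next[k] = v
	}
	c.mu.Lock()
	c.idx = next // publish; next must never be mutated after this point
	c.mu.Unlock()
}
```

Each rebuild publishes a fresh map instead of editing the old one. A map is therefore never modified after it has been swapped in, and every change goes through a new `Rebuild`.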

## Why

Direct quotes from the audit (#1386):

> *"No end-to-end persist→restart→load round-trip — the documented
> value prop of the PR ('survives restart') has no single test
> exercising the full path."* (Kent Beck)

> *"`cacheMu` is `sync.Mutex` not `sync.RWMutex` + per-node read in
> `handleNodes` — 2400 serialized lock acquisitions per `/api/nodes`
> call, contended against every analytics-cache reader/writer.
> The O(1) win is consumed by lock contention."* (Carmack #1)

> *"Map construction held under shared `cacheMu` — every 15s
> analytics cycle blocks every API cache read for the duration of a
> 2400-entry map build. Build outside the lock, swap pointer
> inside."* (Carmack #2)

> *"`UPDATE nodes` + `UPDATE inactive_nodes` per entry … 4800
> prepared-stmt round-trips, 2400 guaranteed-empty."* (Carmack #3)

> *"Server writes 20 snapshots for every one the ingestor reads.
> Cadence mismatch — server could publish every 1 min and lose
> nothing."* (Carmack §2)

## TDD

Red commit adds the four tests above. Two of the four
(`MalformedSnapshot`, `MissingSchemaColumns`) fail on assertions
against the pre-fix `multibyte_persist.go`; the other two
(`RoundTrip`, `PreservesConfirmedOnUnknown`) are regression coverage
of behaviour the original implementation already honoured but never
exercised — they exist to guard future mutation (the audit's
mutation-suggestion lens). Green commit lands the implementation.
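Three pieces of the persist path can be written as small pure functions: the single-`UPDATE` dispatch and schema check from item 3, and the rule that `PreservesConfirmedOnUnknown` guards. Below is a minimal sketch. It assumes the ingestor can tell which table holds each pubkey before writing. `targetTable`, `mergeStatus`, `missingColumns` and any column names are hypothetical and are not taken from `multibyte_persist.go`.

```go
package main

// Status strings; only "unknown" and "confirmed" appear in the PR text.
const (
	statusUnknown   = "unknown"
	statusConfirmed = "confirmed"
)

// targetTable picks the one table an entry's UPDATE should hit, instead
// of issuing UPDATEs against both tables (one of which matches no rows).
// active/inactive are hypothetical sets of pubkeys known per table.
func targetTable(pubkey string, active, inactive map[string]bool) (string, bool) {
	switch {
	case active[pubkey]:
		return "nodes", true
	case inactive[pubkey]:
		return "inactive_nodes", true
	default:
		return "", false // in neither table: skip the write entirely
	}
}

// mergeStatus encodes the data-destruction guard: an incoming "unknown"
// must never overwrite an existing "confirmed".
func mergeStatus(existing, incoming string) string {
	if incoming == statusUnknown && existing == statusConfirmed {
		return existing
	}
	return incoming
}

// missingColumns reports which required columns are absent, given the
// column names returned by `PRAGMA table_info(<table>)`.
func missingColumns(have []string, want ...string) []string {
	present := make(map[string]bool, len(have))
	for _, c := range have {
		present[c] = true
	}
	var missing []string
	for _, w := range want {
		if !present[w] {
			missing = append(missing, w)
		}
	}
	return missing
}
```

In a real persist loop, `targetTable` would choose which of two prepared statements to execute. `missingColumns` would run once at start-up; if it reports anything, the loop would log `[multibyte-persist] schema missing` and return.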

## Bench

Planned configuration (no numbers collected in this PR; see note):
`go test -bench BenchmarkGetMultibyteCapFor -benchmem -count=10`
(local, idle laptop, n=2400-entry index, 8 reader goroutines vs. one
analytics writer):

| variant            | ns/op          | allocs/op |
|--------------------|---------------:|----------:|
| `sync.Mutex` (pre) | n/a (see note) | n/a       |
| `sync.RWMutex`     | n/a (see note) | n/a       |

Note: no concurrent benchmark was produced in this PR (it would
require non-trivial test scaffolding around the cache lifecycle).
The expected win is structural: `RLock` lets the ~2400 per-`/api/nodes`
reads proceed in parallel instead of serializing on the same mutex
held by every analytics writer. Per AGENTS.md ("perf claims require
proof"), this is documented as unmeasured; the full microbench is
deferred to a follow-up.
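Below is one possible shape for the deferred concurrent microbenchmark. It is hypothetical: it is not the PR's `BenchmarkGetMultibyteCapFor`, and it uses a synthetic 2400-key map. Readers run through `b.RunParallel` while one background writer rebuilds the map outside the lock and swaps it in. Both lock variants are driven through `testing.Benchmark`, so the sketch can also run outside `go test`. It produces no numbers by itself, and any results will depend on hardware and `GOMAXPROCS`.

```go
package main

import (
	"strconv"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

const nKeys = 2400 // index size quoted in the PR

func buildIndex() map[string]int {
	m := make(map[string]int, nKeys)
	for i := 0; i < nKeys; i++ {
		m["node"+strconv.Itoa(i)] = i
	}
	return m
}

// benchReads runs parallel readers against one writer that periodically
// swaps in a freshly built map. The lock funcs let the same body exercise
// sync.Mutex (read lock == write lock) or sync.RWMutex.
func benchReads(b *testing.B, rlock, runlock, lock, unlock func()) {
	idx := buildIndex()
	keys := make([]string, 0, len(idx))
	for k := range idx {
		keys = append(keys, k)
	}
	var stop atomic.Bool
	var sink atomic.Int64 // keeps the map reads from being optimized away
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // the "analytics writer"
		defer wg.Done()
		for !stop.Load() {
			next := buildIndex() // built outside the lock
			lock()
			idx = next
			unlock()
			time.Sleep(time.Millisecond)
		}
	}()
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		var sum int64
		i := 0
		for pb.Next() {
			rlock()
			sum += int64(idx[keys[i%len(keys)]])
			runlock()
			i++
		}
		sink.Add(sum)
	})
	b.StopTimer()
	stop.Store(true)
	wg.Wait()
}

// runReadBenchmarks benchmarks both lock variants and returns the results.
func runReadBenchmarks() (mutex, rw testing.BenchmarkResult) {
	var mu sync.Mutex
	mutex = testing.Benchmark(func(b *testing.B) {
		benchReads(b, mu.Lock, mu.Unlock, mu.Lock, mu.Unlock)
	})
	var rwmu sync.RWMutex
	rw = testing.Benchmark(func(b *testing.B) {
		benchReads(b, rwmu.RLock, rwmu.RUnlock, rwmu.Lock, rwmu.Unlock)
	})
	return mutex, rw
}
```

`testing.BenchmarkResult` implements `String()`, so printing the two results shows ns/op side by side. A real follow-up would more likely live in a `_test.go` file as a `BenchmarkXxx` function, so that `-count` and `-benchmem` apply.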

## Manual verification (staging)

- New tests: `go test ./... -count=1 -timeout 300s` in `cmd/ingestor`
  and `cmd/server` — green.
- All multibyte-area tests (`#1366`, `#1368`, `#1372` regression
  suites in `multibyte_capability_test.go`, `multibyte_enrich_test.go`,
  `multibyte_region_filter_test.go`): green.
- Preflight: `bash ~/.openclaw/skills/pr-preflight/scripts/run-all.sh
  origin/master` — exit 0.

Fixes #1386

---------

Co-authored-by: claw <claw@openclaw.local>
2026-05-25 23:29:35 -07:00

MeshCore MQTT Ingestor (Go)

Standalone MQTT ingestion service for CoreScope. Connects to MQTT brokers, decodes raw MeshCore packets, and writes to the same SQLite database used by the Node.js web server.

This is the first step of a larger Go rewrite — separating MQTT ingestion from the web server.

Architecture

MQTT Broker(s)  →  Go Ingestor  →  SQLite DB  ←  Node.js Web Server
                    (this binary)     (shared)
  • Single static binary — no runtime dependencies, no CGO
  • SQLite via modernc.org/sqlite (pure Go)
  • MQTT via github.com/eclipse/paho.mqtt.golang
  • Runs alongside the Node.js server — they share the DB file
  • Does NOT serve HTTP/WebSocket — that stays in Node.js

Build

Requires Go 1.22+.

cd cmd/ingestor
go build -o corescope-ingestor .

Cross-compile for Linux (e.g., for the production VM):

GOOS=linux GOARCH=amd64 go build -o corescope-ingestor .

Run

./corescope-ingestor -config /path/to/config.json

The config file uses the same format as the Node.js config.json. The ingestor reads the mqttSources array (or legacy mqtt object) and dbPath fields.

Environment Variables

| Variable | Description | Default |
|---|---|---|
| DB_PATH | SQLite database path | data/meshcore.db |
| MQTT_BROKER | Single MQTT broker URL (overrides config) | |
| MQTT_TOPIC | MQTT topic (used with MQTT_BROKER) | meshcore/# |
| CORESCOPE_INGESTOR_STATS | Path to the per-second stats JSON file consumed by the server's /api/perf/io and /api/perf/write-sources endpoints (#1120) | /tmp/corescope-ingestor-stats.json |

Stats file (CORESCOPE_INGESTOR_STATS)

Every second the ingestor publishes a JSON snapshot of its counters (tx_inserted, obs_inserted, walCommits, backfillUpdates.*, etc.) plus a procIO block sampled from /proc/self/io (read/write/cancelled bytes per second + syscall counts). The server reads this file and surfaces the data on the Perf page so operators can self-diagnose write-volume anomalies.

The writer opens the file with O_NOFOLLOW | O_CREAT | O_TRUNC and mode 0o600, so a symlink pre-planted at the path cannot be used to clobber an arbitrary file.

Security note: the default lives in /tmp, which is world-writable on most hosts (sticky bit only protects deletion, not creation). On shared/multi-tenant hosts, override CORESCOPE_INGESTOR_STATS to point at a private directory (e.g. /var/lib/corescope/ingestor-stats.json) that only the corescope user can write to.

Minimal Config

{
  "dbPath": "data/meshcore.db",
  "mqttSources": [
    {
      "name": "local",
      "broker": "mqtt://localhost:1883",
      "topics": ["meshcore/#"]
    }
  ]
}

Full Config (same as Node.js)

The ingestor reads these fields from the existing config.json:

  • mqttSources[] — array of MQTT broker connections
    • name — display name for logging
    • broker — MQTT URL (mqtt://, mqtts://)
    • username / password — auth credentials
    • topics — array of topic patterns to subscribe to
    • iataFilter — optional regional filter
  • mqtt — legacy single-broker config (auto-converted to mqttSources)
  • dbPath — SQLite DB path (default: data/meshcore.db)

Test

cd cmd/ingestor
go test -v ./...

What It Does

  1. Connects to configured MQTT brokers with auto-reconnect
  2. Subscribes to mesh packet topics (e.g., meshcore/+/+/packets)
  3. Receives raw hex packets via JSON messages ({ "raw": "...", "SNR": ..., "RSSI": ... })
  4. Decodes MeshCore packet headers, paths, and payloads (ported from decoder.js)
  5. Computes content hashes (path-independent, SHA-256-based)
  6. Writes to SQLite: transmissions + observations tables
  7. Upserts nodes from decoded ADVERT packets (with validation)
  8. Upserts observers from MQTT topic metadata
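Step 3 can be sketched with the standard library alone. The struct below is hypothetical. Its field names follow the JSON keys quoted above, while the numeric types (`*float64`, so an absent value differs from 0) and the `parsePacketMessage` helper are assumptions.

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"errors"
	"strings"
)

// packetMessage mirrors the JSON body quoted in step 3.
type packetMessage struct {
	Raw  string   `json:"raw"`
	SNR  *float64 `json:"SNR"`
	RSSI *float64 `json:"RSSI"`
}

// parsePacketMessage decodes the JSON envelope and the hex payload,
// returning the raw packet bytes ready for the MeshCore decoder.
func parsePacketMessage(body []byte) (packetMessage, []byte, error) {
	var m packetMessage
	if err := json.Unmarshal(body, &m); err != nil {
		return m, nil, err
	}
	if m.Raw == "" {
		return m, nil, errors.New("missing raw field")
	}
	pkt, err := hex.DecodeString(strings.TrimSpace(m.Raw))
	if err != nil {
		return m, nil, err
	}
	return m, pkt, nil
}
```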

Schema Compatibility

The Go ingestor creates the same v3 schema as the Node.js server:

  • transmissions — deduplicated by content hash
  • observations — per-observer sightings with observer_idx (rowid reference)
  • nodes — mesh nodes discovered from adverts
  • observers — MQTT feed sources

Both processes can use the same DB concurrently (SQLite WAL mode): readers do not block the writer, and SQLite serializes writes so only one process writes at a time.

What's Not Ported (Yet)

  • Companion bridge format (Format 2 — meshcore/advertisement, channel messages, etc.)
  • Channel key decryption (GRP_TXT encrypted payload decryption)
  • WebSocket broadcast to browsers
  • In-memory packet store
  • Cache invalidation

These stay in the Node.js server for now.

Files

cmd/ingestor/
  main.go          — entry point, MQTT connect, message handler
  decoder.go       — MeshCore packet decoder (ported from decoder.js)
  decoder_test.go  — decoder tests (25 tests, golden fixtures)
  db.go            — SQLite writer (schema-compatible with db.js)
  db_test.go       — DB tests (schema validation, insert/upsert, E2E)
  config.go        — config struct + loader
  util.go          — shared utilities
  go.mod / go.sum  — Go module definition