meshcore-analyzer/cmd/server/from_pubkey_migration.go
Kpa-clawbot fb744d895f fix(#1143): structural pubkey attribution via from_pubkey column (#1152)
Fixes #1143.

## Summary

Replaces the structurally unsound `decoded_json LIKE '%pubkey%'` (and
`OR LIKE '%name%'`) attribution path with an exact-match lookup on a
dedicated, indexed `transmissions.from_pubkey` column.

This closes both holes documented in #1143:
- **Hole 1** — same-name false positives via `OR LIKE '%name%'`
- **Hole 2a** — adversarial spoofing: a malicious node names itself with
another node's pubkey and gets attributed to the victim
- **Hole 2b** — accidental false positive when any free-text field (path
elements, channel names, message bodies) contains a 64-char hex
substring matching a real pubkey
- **Perf** — query now uses an index instead of a full-table scan
against `LIKE '%substring%'`

## TDD

Two-commit history shows red-then-green:

| Commit | Status | Purpose |
|---|---|---|
| `7f0f08e` | RED — tests assertion-fail on master behaviour | Adversarial fixtures + spec |
| `59327db` | GREEN — schema + ingestor + server + migration | Implementation |

The red commit's test schema includes the new column so the file
compiles, but the production code still uses LIKE — the assertions fail
because the malicious / same-name / free-text rows are returned. The
green commit changes the query and adds the migration/ingest path.

## Changes

### Schema
- new column `transmissions.from_pubkey TEXT`
- new index `idx_transmissions_from_pubkey`

### Ingestor (`cmd/ingestor/`)
- `PacketData.FromPubkey` populated from decoded ADVERT `pubKey` at
write time. Cheap — already parsing `decoded_json`. Non-ADVERTs stay
NULL.
- `stmtInsertTransmission` writes the column.
- Migration `from_pubkey_v1` ALTERs legacy DBs to add the column +
index.
- Bonus: rewrote the recipe in the gated one-shot
`advert_count_unique_v1` migration to use `from_pubkey` (already marked
done on existing DBs; kept correct for fresh installs).

### Server (`cmd/server/`)
- `ensureFromPubkeyColumn` mirrors the ingestor migration so the server
can boot against a DB the ingestor has never touched (e2e fixture, fresh
installs).
- `backfillFromPubkeyAsync` runs **after** HTTP starts. Scans `WHERE
from_pubkey IS NULL AND payload_type = 4` in 5000-row chunks with a
100ms yield between chunks. Cannot block boot even on prod-sized DBs
(100K+ transmissions). Queries handle NULL gracefully (return empty for
that pubkey, same as today's unknown-pubkey path).
- All in-scope LIKE call sites switched to exact match:

| Site | Before | After |
|---|---|---|
| `buildPacketWhere` (was db.go:582) | `decoded_json LIKE '%pubkey%'` | `from_pubkey = ?` |
| `buildTransmissionWhere` (was db.go:626) | `t.decoded_json LIKE '%pubkey%'` | `t.from_pubkey = ?` |
| `GetRecentTransmissionsForNode` (was db.go:910) | `LIKE '%pubkey%' OR LIKE '%name%'` | `t.from_pubkey = ?` |
| `QueryMultiNodePackets` (was db.go:1785) | `decoded_json LIKE '%pubkey%' OR ...` | `t.from_pubkey IN (?, ?, ...)` |
| `advert_count_unique_v1` (was ingestor/db.go:257) | `decoded_json LIKE '%' \|\| nodes.public_key \|\| '%'` | `t.from_pubkey = nodes.public_key` |

`GetRecentTransmissionsForNode` signature simplifies: the `name`
parameter is gone (it was only ever used for the legacy `OR LIKE
'%name%'` fallback). Sole caller in `routes.go:1243` updated.

### Tests
- `cmd/server/from_pubkey_attribution_test.go` — adversarial fixtures +
Hole 1/2a/2b/QueryMultiNodePackets exact-match assertions, EXPLAIN QUERY
PLAN index check, migration backfill correctness.
- `cmd/ingestor/from_pubkey_test.go` — write-time correctness
(BuildPacketData populates FromPubkey for ADVERT only;
InsertTransmission persists it; non-ADVERTs stay NULL).
- Existing test schemas (server v2, server v3, coverage) get the new
column **plus a SQLite trigger** that auto-populates `from_pubkey` from
`decoded_json` on ADVERT inserts. This means existing fixtures (which
only seed `decoded_json`) keep attributing correctly without per-test
edits.
- `seedTestData`'s ADVERTs explicitly set `from_pubkey`.

## Performance — index is used

```
$ EXPLAIN QUERY PLAN SELECT id FROM transmissions WHERE from_pubkey = ?
SEARCH transmissions USING INDEX idx_transmissions_from_pubkey (from_pubkey=?)
```

Asserted in `TestFromPubkeyIndexUsed`.

## Migration approach

- **Sync at boot**: `ALTER TABLE transmissions ADD COLUMN from_pubkey
TEXT` is a metadata-only operation in SQLite — microseconds regardless
of table size. `CREATE INDEX IF NOT EXISTS
idx_transmissions_from_pubkey` is **not** metadata-only: it scans the
table once. Empirically a few hundred ms on a 100K-row table; expect a
few seconds on a 10M-row table (one-time cost, blocking boot during that
window). Subsequent boots no-op via `IF NOT EXISTS`. If this boot delay
becomes an operational concern at prod scale we can defer the `CREATE
INDEX` to a goroutine — for now a few-second one-time delay is
acceptable.
- **Async**: row-level backfill of legacy NULL ADVERTs (chunked 5000 /
100ms yield). On a 100K-ADVERT prod DB, this completes in seconds in the
background; HTTP is fully available throughout.
- **Safety**: queries handle NULL gracefully — a node whose ADVERTs
haven't backfilled yet returns empty, identical to today's behaviour for
unknown pubkeys. No half-state regression.

## Out of scope (intentionally)

The free-text `LIKE` paths the issue explicitly leaves alone (e.g.
user-typed packet search) are untouched. Only the pubkey-attribution
sites get the column treatment.



## Cycle-3 review fixes

| Finding | Status | Commit |
|---|---|---|
| **M1c** — async-contract test was tautological (test's own `go`, not production's) | Fixed | `23ace71` (red) → `a05b50c` (green) |
| **m1c** — package-global atomic resets unsafe under `t.Parallel()` | Fixed (`// DO NOT t.Parallel` comment + `Reset()` helper) | rolled into `23ace71` / `241ec69` |
| **m2c** — `/api/healthz` read 3 atomics non-atomically (torn snapshot) | Fixed (single RWMutex-guarded snapshot + race test) | `241ec69` |
| **n3c.m1** — vestigial OR-scaffolding in `QueryMultiNodePackets` | Fixed (cleanup) | `5a53ceb` |
| **n3c.m2** — verify PR body language about `ALTER` vs `CREATE INDEX` | Verified accurate (already corrected in cycle 2) | (no change) |
| **n3c.m3** — `json.Unmarshal` per row in backfill → could use SQL `json_extract` | **Deferred as known followup** — pure perf optimization (current per-row Unmarshal is correct, just slower); SQL rewrite would unwind the chunked-yield architecture and is non-trivial. Acceptable for one-time backfill at boot on legacy DBs. | |

### M1c implementation detail

`startFromPubkeyBackfill(dbPath, chunkSize, yieldDuration)` is now the
single production entry point used by `main.go`. It internally does `go
backfillFromPubkeyAsync(...)`. The test calls `startFromPubkeyBackfill`
(no `go` prefix) and asserts the dispatch returns within 50ms — so if
anyone removes the `go` keyword inside the wrapper, the test fails.
**Manually verified**: removing the `go` keyword causes
`TestBackfillFromPubkey_DoesNotBlockBoot` to fail with "backfill
dispatch took ~1s (>50ms): not async — would block boot."

### m2c implementation detail

`fromPubkeyBackfillTotal/Processed/Done` are now plain `int64`/`bool`
package globals guarded by a single `sync.RWMutex`.
`fromPubkeyBackfillSnapshot()` returns all three under one RLock.
`TestHealthzFromPubkeyBackfillConsistentSnapshot` races a writer
(lock-step total/processed updates with periodic done flips) against 8
readers hammering `/api/healthz`, asserting `processed<=total` and
`(done => processed==total)` on every response. Verified the test
catches torn reads (manually injected a 3-RLock implementation; test
failed within milliseconds with "processed>total" and "done=true but
processed!=total" errors).

---------

Co-authored-by: openclaw-bot <bot@openclaw.local>
Co-authored-by: openclaw-bot <bot@openclaw.dev>
2026-05-06 23:50:44 -07:00


package main
// from_pubkey migration (#1143).
//
// Adds the `transmissions.from_pubkey` column + index, and provides an async
// backfill that populates the column from `decoded_json` for ADVERT packets
// whose `from_pubkey` is still NULL.
//
// Why a column at all: the legacy attribution path used
// `WHERE decoded_json LIKE '%pubkey%'` (and `OR LIKE '%name%'`). This is
// structurally unsound (adversarial spoofing + accidental hex-substring
// false positives + full table scan). The column gives us exact match,
// O(log n) lookups, and an explicit, auditable attribution surface.
//
// Backfill is run async (best-effort) so it cannot block server startup
// even on prod-sized DBs (100K+ transmissions). Queries handle NULL
// gracefully (return empty for that pubkey, same as today's behaviour
// for unknown pubkeys).
import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"
)
// ensureFromPubkeyColumn adds the from_pubkey column + index to the
// transmissions table if missing. Safe to call repeatedly.
func ensureFromPubkeyColumn(dbPath string) error {
	rw, err := cachedRW(dbPath)
	if err != nil {
		return err
	}
	has, err := tableHasColumn(rw, "transmissions", "from_pubkey")
	if err != nil {
		return fmt.Errorf("inspect transmissions: %w", err)
	}
	if !has {
		if _, err := rw.Exec("ALTER TABLE transmissions ADD COLUMN from_pubkey TEXT"); err != nil {
			return fmt.Errorf("add from_pubkey column: %w", err)
		}
		log.Println("[store] Added from_pubkey column to transmissions (#1143)")
	}
	if _, err := rw.Exec("CREATE INDEX IF NOT EXISTS idx_transmissions_from_pubkey ON transmissions(from_pubkey)"); err != nil {
		return fmt.Errorf("create idx_transmissions_from_pubkey: %w", err)
	}
	return nil
}
// fromPubkeyBackfillProgress reports backfill state for /api/healthz.
// All three values are read together via fromPubkeyBackfillSnapshot()
// under a single RWMutex so /api/healthz never sees a torn snapshot
// (e.g. done=true with processed<total). Updates use the Set/Mark
// helpers which take the write lock.
//
// Cycle-3 m2c: previously these were independent atomic.{Int64,Bool};
// healthz read each one separately and could observe an interleaved
// write between Loads. The mutex-guarded snapshot fixes that.
var (
	fromPubkeyBackfillMu        sync.RWMutex
	fromPubkeyBackfillTotal     int64
	fromPubkeyBackfillProcessed int64
	fromPubkeyBackfillDone      bool
)
// fromPubkeyBackfillSnapshot returns a consistent snapshot of all three
// backfill progress fields under a single read lock.
func fromPubkeyBackfillSnapshot() (total, processed int64, done bool) {
	fromPubkeyBackfillMu.RLock()
	defer fromPubkeyBackfillMu.RUnlock()
	return fromPubkeyBackfillTotal, fromPubkeyBackfillProcessed, fromPubkeyBackfillDone
}

func fromPubkeyBackfillSetTotal(v int64) {
	fromPubkeyBackfillMu.Lock()
	fromPubkeyBackfillTotal = v
	fromPubkeyBackfillMu.Unlock()
}

func fromPubkeyBackfillSetProcessed(v int64) {
	fromPubkeyBackfillMu.Lock()
	fromPubkeyBackfillProcessed = v
	fromPubkeyBackfillMu.Unlock()
}

func fromPubkeyBackfillMarkDone() {
	fromPubkeyBackfillMu.Lock()
	fromPubkeyBackfillDone = true
	fromPubkeyBackfillMu.Unlock()
}

// fromPubkeyBackfillReset zeroes all three fields atomically. Used by
// tests; never called from production code.
func fromPubkeyBackfillReset() {
	fromPubkeyBackfillMu.Lock()
	fromPubkeyBackfillTotal = 0
	fromPubkeyBackfillProcessed = 0
	fromPubkeyBackfillDone = false
	fromPubkeyBackfillMu.Unlock()
}
// startFromPubkeyBackfill is the production entry point used by main.go to
// launch the backfill so it cannot block startup. It MUST dispatch the
// backfill in a goroutine; the dispatch path is gated by
// TestBackfillFromPubkey_DoesNotBlockBoot — if the `go` keyword below is ever
// removed, that test fails because dispatch becomes synchronous and exceeds
// the 50ms boot budget.
func startFromPubkeyBackfill(dbPath string, chunkSize int, yieldDuration time.Duration) {
	// MUST stay `go` — TestBackfillFromPubkey_DoesNotBlockBoot fails if
	// this becomes synchronous (boot dispatch budget exceeds 50ms).
	go backfillFromPubkeyAsync(dbPath, chunkSize, yieldDuration)
}
// backfillFromPubkeyAsync scans transmissions where from_pubkey IS NULL and
// populates from_pubkey by parsing decoded_json. Runs in chunks with a
// short yield between chunks so it can't starve other writers.
//
// Strategy:
// - ADVERT (payload_type = 4) -> decoded_json.pubKey
// - other types -> leave NULL (queries handle NULL gracefully)
//
// chunkSize and yieldDuration are tunable for tests.
func backfillFromPubkeyAsync(dbPath string, chunkSize int, yieldDuration time.Duration) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("[store] backfillFromPubkeyAsync panic recovered: %v", r)
		}
		fromPubkeyBackfillMarkDone()
	}()
	if chunkSize <= 0 {
		chunkSize = 5000
	}
	rw, err := cachedRW(dbPath)
	if err != nil {
		log.Printf("[store] from_pubkey backfill: open rw error: %v", err)
		return
	}
	var total int64
	if err := rw.QueryRow(
		"SELECT COUNT(*) FROM transmissions WHERE from_pubkey IS NULL AND payload_type = 4",
	).Scan(&total); err != nil {
		log.Printf("[store] from_pubkey backfill: count error: %v", err)
		return
	}
	fromPubkeyBackfillSetTotal(total)
	if total == 0 {
		log.Println("[store] from_pubkey backfill: nothing to do")
		return
	}
	log.Printf("[store] from_pubkey backfill starting: %d ADVERT rows", total)
	updateStmt, err := rw.Prepare("UPDATE transmissions SET from_pubkey = ? WHERE id = ?")
	if err != nil {
		log.Printf("[store] from_pubkey backfill: prepare update: %v", err)
		return
	}
	defer updateStmt.Close()
	var processed int64
	for {
		rows, err := rw.Query(
			"SELECT id, decoded_json FROM transmissions WHERE from_pubkey IS NULL AND payload_type = 4 LIMIT ?",
			chunkSize)
		if err != nil {
			log.Printf("[store] from_pubkey backfill: select error: %v", err)
			return
		}
		type row struct {
			id int64
			pk string
		}
		batch := make([]row, 0, chunkSize)
		for rows.Next() {
			var id int64
			var dj sql.NullString
			if err := rows.Scan(&id, &dj); err != nil {
				continue
			}
			pk := extractPubkeyFromAdvertJSON(dj.String)
			batch = append(batch, row{id: id, pk: pk})
		}
		rows.Close()
		if len(batch) == 0 {
			break
		}
		// Apply updates in a single tx for throughput.
		tx, err := rw.Begin()
		if err != nil {
			log.Printf("[store] from_pubkey backfill: begin tx: %v", err)
			return
		}
		txStmt := tx.Stmt(updateStmt)
		for _, b := range batch {
			// Sentinel convention for transmissions.from_pubkey (#1143, m5):
			//   NULL — row has not yet been scanned by this backfill.
			//   ""   — scanned, no extractable pubkey (malformed/legacy ADVERT
			//          decoded_json, or a JSON shape we don't understand).
			//   hex  — scanned, pubkey successfully extracted.
			//
			// The "" sentinel exists ONLY in this backfill path: it's how we
			// avoid the #1119 infinite-rescan loop (the WHERE clause is
			// `from_pubkey IS NULL`, so once we mark a row "" it never matches
			// again). The ingest write path (cmd/ingestor/db.go ~1289) leaves
			// from_pubkey NULL when PubKey is empty; the two states are
			// semantically equivalent ("we have no pubkey for this row") and
			// all attribution call sites query `from_pubkey = ?` with a real
			// pubkey, so neither NULL nor "" matches — no UX divergence.
			var val interface{}
			if b.pk != "" {
				val = b.pk
			} else {
				val = "" // scanned, no extractable pubkey — see comment above
			}
			if _, err := txStmt.Exec(val, b.id); err != nil {
				// non-fatal; log first failure per chunk and keep going
				log.Printf("[store] from_pubkey backfill: update id=%d: %v", b.id, err)
			}
		}
		if err := tx.Commit(); err != nil {
			log.Printf("[store] from_pubkey backfill: commit: %v", err)
			return
		}
		processed += int64(len(batch))
		fromPubkeyBackfillSetProcessed(processed)
		if len(batch) < chunkSize {
			break
		}
		if yieldDuration > 0 {
			time.Sleep(yieldDuration)
		}
	}
	log.Printf("[store] from_pubkey backfill complete: %d rows processed", processed)
}
// extractPubkeyFromAdvertJSON parses an ADVERT decoded_json blob and returns
// the pubKey field, or "" if absent/invalid. Lenient: any parse error yields
// the empty string rather than a panic.
func extractPubkeyFromAdvertJSON(s string) string {
	if s == "" {
		return ""
	}
	var m map[string]interface{}
	if err := json.Unmarshal([]byte(s), &m); err != nil {
		return ""
	}
	if v, ok := m["pubKey"].(string); ok {
		return v
	}
	return ""
}