fix(#1558): mirror Load's resolved_path indexing into loadChunk (#1582)

## Summary

Closes #1558.

The background-backfill path (`loadChunk`) silently dropped the resolved-path
indexing branch that `Load` performs per observation. The same SQL rows ended
up with two different post-conditions: a contract violation between the
hot-startup load and the background chunk load.

## Root cause (the differential matters)

The reporter's hypothesis (`indexByNode` not invoked on background-loaded
transmissions) was 90% right but pointed at the wrong line.

- `cmd/server/store.go:1116` already calls `s.indexByNode(tx)` inside the
  loadChunk per-batch merge lock for every backfilled tx. Decoded
  `pubKey` / `destPubKey` / `srcPubKey` ARE indexed.
- `indexByNode` (store.go:1313 pre-patch) only reads three fields from
  `decoded_json`. It does NOT and cannot touch `resolved_path`.
- `Load` (store.go:783-799) unmarshals `o.resolved_path` per observation,
  extracts every relay-hop pubkey, and feeds them through `addToByNode`
  + `addResolvedPubkeysToPathHopIndex` + `addToResolvedPubkeyIndex`.
- `loadChunk` (store.go:937-1023 pre-patch) selects `o.resolved_path` into
  `resolvedPathStr`… then never touches it.

Result: after a container restart, every transmission older than
`hotStartupHours` ends up present in `s.packets` / `s.byHash` / `s.byTxID`
but missing from `s.byNode[relayPK]` for every relay pubkey. Home-page
per-node `packetsToday` / `totalTransmissions` / `observers` / `avgHops`
/ `avgSnr` collapse for relay-heavy nodes (753 → 8 in the reporter's
trace). Stats only self-heal as live ingest re-populates `byNode`
through the ingest path (which DID call the full sequence inline).
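
To make the differential concrete, here is a minimal sketch (not the project's code) of why a relay-only node vanishes from `byNode` when only the decoded fields are indexed. The `tx` struct, `indexDecoded`, `indexResolved`, and `relayCounts` are hypothetical stand-ins for `StoreTx`, `indexByNode`, and the resolved-path branch. The one assumption taken from the test fixture is that `resolved_path` is a JSON array of pubkey strings.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// tx is a hypothetical, stripped-down stand-in for StoreTx.
type tx struct {
	ID           int
	PubKey       string // endpoint pubkey from decoded_json
	ResolvedPath string // JSON array of relay-hop pubkeys (assumed shape)
}

// indexDecoded mimics the decoded-field path: it only sees endpoint pubkeys.
func indexDecoded(byNode map[string][]int, t tx) {
	if t.PubKey != "" {
		byNode[t.PubKey] = append(byNode[t.PubKey], t.ID)
	}
}

// indexResolved mimics the resolved-path branch: it indexes every relay hop.
func indexResolved(byNode map[string][]int, t tx) error {
	if t.ResolvedPath == "" {
		return nil
	}
	var hops []string
	if err := json.Unmarshal([]byte(t.ResolvedPath), &hops); err != nil {
		return err
	}
	for _, pk := range hops {
		byNode[pk] = append(byNode[pk], t.ID)
	}
	return nil
}

// relayCounts indexes three transmissions through both paths and reports how
// many land under relay "R1" without and with the resolved-path branch.
func relayCounts() (without, with int) {
	txs := []tx{
		{1, "A", `["R1","R2"]`},
		{2, "A", `["R1"]`},
		{3, "B", `["R1","R2"]`},
	}
	hot, backfill := map[string][]int{}, map[string][]int{}
	for _, t := range txs {
		indexDecoded(hot, t)
		_ = indexResolved(hot, t) // Load-like path: both branches
		indexDecoded(backfill, t) // pre-fix loadChunk-like path: decoded only
	}
	return len(backfill["R1"]), len(hot["R1"])
}

func main() {
	without, with := relayCounts()
	fmt.Printf("byNode[R1]: backfill=%d hot=%d\n", without, with)
}
```

In this toy, `R1` never appears as an endpoint, so the decoded-only path leaves it with zero entries while the path that also walks `resolved_path` indexes all three transmissions.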

## Fix shape

1. **Extract a shared `(s *PacketStore) indexResolvedPathHops(tx, pks, hopsSeen)` helper.**
   Owns the `addToByNode` + `addResolvedPubkeysToPathHopIndex` +
   `addToResolvedPubkeyIndex` sequence. Single point of truth so the
   "feed decode-window consumers for resolved-path pubkeys" invariant is
   structural, not duplicated.
2. **Re-point `Load` and both ingest sites at the helper.** Load's
   behaviour is unchanged: the helper runs the same three calls, in the
   same order, as the prior inline block.
3. **Add the missing call in `loadChunk`.** Per AGENTS.md performance rule
   #0 ("no expensive work under locks"), unmarshal `resolved_path` and
   dedupe relay pubkeys per txID **outside** the merge critical section
   (`localResolvedPKsByTx`), then feed the pre-built slice through
   `indexResolvedPathHops` inside the existing per-batch lock alongside
   `indexByNode`. Mirrors `loadChunk`'s "build local, merge under lock"
   shape.
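
The three steps above can be sketched in miniature. This is an illustration under simplifying assumptions, not the code in `store.go`: the `store` type, its two maps, and the `loadHot` / `loadChunk` signatures are hypothetical, and the real helper also maintains the path-hop index and takes a `hopsSeen` scratch map.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical miniature of PacketStore with two of the indexes.
type store struct {
	mu         sync.Mutex
	byNode     map[string][]int // relay pubkey -> tx IDs
	resolvedPK map[int][]string // tx ID -> relay pubkeys
}

func newStore() *store {
	return &store{byNode: map[string][]int{}, resolvedPK: map[int][]string{}}
}

// indexResolvedPathHops is the single place that updates every index fed by
// resolved-path pubkeys. Callers must hold s.mu.
func (s *store) indexResolvedPathHops(txID int, pks []string) {
	if len(pks) == 0 {
		return
	}
	for _, pk := range pks {
		s.byNode[pk] = append(s.byNode[pk], txID)
	}
	s.resolvedPK[txID] = append(s.resolvedPK[txID], pks...)
}

// loadHot is the startup-style path: it indexes directly under the lock.
func (s *store) loadHot(rows map[int][]string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for id, pks := range rows {
		s.indexResolvedPathHops(id, pks)
	}
}

// loadChunk is the backfill-style path: build local, then merge under lock.
func (s *store) loadChunk(rows map[int][]string) {
	// Build local: the real code parses JSON and dedupes here, lock-free.
	local := make(map[int][]string, len(rows))
	for id, pks := range rows {
		local[id] = append([]string(nil), pks...)
	}
	// Merge under lock: only cheap map appends happen here.
	s.mu.Lock()
	defer s.mu.Unlock()
	for id, pks := range local {
		s.indexResolvedPathHops(id, pks)
	}
}

func main() {
	rows := map[int][]string{1: {"R1", "R2"}, 2: {"R1"}}
	a, b := newStore(), newStore()
	a.loadHot(rows)
	b.loadChunk(rows)
	// Both paths share the helper, so they reach the same post-condition.
	fmt.Println(len(a.byNode["R1"]) == len(b.byNode["R1"]))
}
```

Because both load paths go through one helper, adding a fourth index later means touching one function rather than remembering every call site.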

## TDD: red → green commits

```
892424e6  test(#1558): RED — loadChunk drops resolved_path relay-pubkey indexing
c6768dca  fix(#1558): mirror Load's resolved_path indexing into loadChunk via shared helper
```

The RED commit adds `TestLoadChunk_IndexesResolvedPathPubkeys_Issue1558` to
`cmd/server/loadchunk_resolved_path_1558_test.go`. It loads a fixture DB
containing 3 transmissions, each with an observation whose `resolved_path`
lists two distinct relay pubkeys, calls `Load()` with `HotStartupHours: 1`
to confirm the rows are NOT picked up by the hot path, then calls
`loadChunk` directly over a window covering the 48h-old rows and asserts
that `s.byNode[relayPK]` contains 3 transmissions for each relay pubkey.

```
=== RUN   TestLoadChunk_IndexesResolvedPathPubkeys_Issue1558  (RED, pre-fix)
    loadchunk_resolved_path_1558_test.go:154: byNode[1111…]: got 0 transmissions, want 3 — loadChunk dropped the resolved_path indexing branch (issue #1558)
    loadchunk_resolved_path_1558_test.go:154: byNode[2222…]: got 0 transmissions, want 3 — loadChunk dropped the resolved_path indexing branch (issue #1558)
--- FAIL: TestLoadChunk_IndexesResolvedPathPubkeys_Issue1558 (0.01s)

=== RUN   TestLoadChunk_IndexesResolvedPathPubkeys_Issue1558  (GREEN, post-fix)
--- PASS: TestLoadChunk_IndexesResolvedPathPubkeys_Issue1558 (0.01s)
```

Full `go test ./...` from `cmd/server`: PASS (45.3s).

## Files changed

- `cmd/server/store.go`: helper + loadChunk fix + 3 call-site refactors
- `cmd/server/loadchunk_resolved_path_1558_test.go`: regression test + fixture

## Performance / lock-scope

The merge critical section now also calls `indexResolvedPathHops`, which is
three map-append loops over the pre-deduplicated pubkey slice for this tx.
JSON unmarshal happens once per observation **outside** any lock, in the
same row loop as the existing scan work. The only new allocation under lock,
beyond what `addToByNode` etc. already do per relay pubkey, is the `hopsSeen`
scratch map, created once per batch. Matches the shape of the existing
`indexByNode(tx)` call already in this critical section.
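
The lock-free half of that split, the per-tx union of relay pubkeys across observations, can be sketched on its own. The `obsRow` type and `unionRelayPubkeys` function are hypothetical names for illustration, and the sketch again assumes `resolved_path` is a JSON array of pubkey strings as in the test fixture.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// obsRow is a hypothetical scanned row: one observation of a transmission.
type obsRow struct {
	TxID         int
	ResolvedPath string // JSON array of pubkeys (assumed shape), may be empty
}

// unionRelayPubkeys returns, per tx ID, the relay pubkeys seen across all of
// that tx's observations, deduplicated and in first-seen order. Rows with
// empty or malformed JSON are skipped. It touches no shared state, so it
// needs no lock.
func unionRelayPubkeys(rows []obsRow) map[int][]string {
	pksByTx := make(map[int][]string)
	seenByTx := make(map[int]map[string]bool)
	for _, r := range rows {
		if r.ResolvedPath == "" {
			continue
		}
		var pks []string
		if err := json.Unmarshal([]byte(r.ResolvedPath), &pks); err != nil {
			continue
		}
		seen := seenByTx[r.TxID]
		if seen == nil {
			seen = make(map[string]bool, len(pks))
			seenByTx[r.TxID] = seen
		}
		for _, pk := range pks {
			if pk == "" || seen[pk] {
				continue
			}
			seen[pk] = true
			pksByTx[r.TxID] = append(pksByTx[r.TxID], pk)
		}
	}
	return pksByTx
}

func main() {
	rows := []obsRow{
		{1, `["R1","R2"]`},
		{1, `["R2","R3"]`}, // second observer, overlapping path
		{2, ``},
	}
	fmt.Println(unionRelayPubkeys(rows)[1]) // prints [R1 R2 R3]
}
```

Deduplicating before the merge keeps the lock-held loop proportional to the number of distinct relays per tx rather than the number of observations. Whether `addToByNode` itself tolerates duplicates is not shown in this PR, so the sketch does not rely on it.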

## Out of scope

`/api/stats backfilling=true` sticky flag (mentioned in the reporter's
writeup) is tracked separately at #1546.

## Preflight overrides

- check-async-migrations: justified. The flagged lines are SQLite DDL in the
  test fixture `createTestDBWithResolvedPath` (a test-only DB created via
  `sql.Open` on a `t.TempDir()` path, not a production migration). Mirrors
  the identical pattern in `cmd/server/bounded_load_test.go:163-167`, which
  the gate also flags as a false positive. No production schema is touched
  in this PR.

---------

Co-authored-by: corescope-bot <bot@corescope.local>
Kpa-clawbot
2026-06-04 14:41:22 -07:00
committed by GitHub
parent 7292d60fbe
commit 9465949e79
2 changed files with 239 additions and 25 deletions
@@ -0,0 +1,160 @@
package main
import (
"database/sql"
"fmt"
"path/filepath"
"testing"
"time"
_ "modernc.org/sqlite"
)
// createTestDBWithResolvedPath creates a fixture DB containing numTx old
// transmissions (48h ago, outside any default hot window) where each
// observation has a non-empty resolved_path JSON listing relay-hop pubkeys.
// Mirrors createTestDBWithAgedPackets shape but adds the resolved_path
// column so loadChunk's hasResolvedPath branch is exercised.
func createTestDBWithResolvedPath(t *testing.T, numTx int, relayPubkeys []string) string {
t.Helper()
dir := t.TempDir()
dbPath := filepath.Join(dir, "test.db")
conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL")
if err != nil {
t.Fatal(err)
}
defer conn.Close()
exec := func(s string, args ...interface{}) {
if _, err := conn.Exec(s, args...); err != nil {
t.Fatalf("setup exec failed: %v\nSQL: %s", err, s)
}
}
exec(`CREATE TABLE transmissions (
id INTEGER PRIMARY KEY,
raw_hex TEXT, hash TEXT, first_seen TEXT,
route_type INTEGER, payload_type INTEGER, payload_version INTEGER,
decoded_json TEXT
)`)
exec(`CREATE TABLE observations (
id INTEGER PRIMARY KEY,
transmission_id INTEGER,
observer_id TEXT, observer_name TEXT,
direction TEXT, snr REAL, rssi REAL, score INTEGER,
path_json TEXT, timestamp TEXT,
raw_hex TEXT,
resolved_path TEXT
)`)
exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`)
exec(`CREATE TABLE nodes (pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, frequency REAL)`)
exec(`CREATE TABLE schema_version (version INTEGER)`)
exec(`INSERT INTO schema_version (version) VALUES (1)`)
exec(`CREATE INDEX idx_tx_first_seen ON transmissions(first_seen)`)
// Build resolved_path JSON array of pubkey strings: ["pk1","pk2",...]
rpJSON := "["
for i, pk := range relayPubkeys {
if i > 0 {
rpJSON += ","
}
rpJSON += fmt.Sprintf("%q", pk)
}
rpJSON += "]"
now := time.Now().UTC()
for i := 0; i < numTx; i++ {
ts := now.Add(-48 * time.Hour).Add(time.Duration(i) * time.Second).Format(time.RFC3339)
hash := fmt.Sprintf("hash1558_%d", i)
exec("INSERT INTO transmissions VALUES (?,?,?,?,0,4,1,?)",
i+1, "aa", hash, ts, `{}`)
exec("INSERT INTO observations (id, transmission_id, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp, raw_hex, resolved_path) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
i+1, i+1, "obs1", "Obs1", "RX", -10.0, -80.0, 5, `[]`, ts, "", rpJSON)
}
return dbPath
}
// TestLoadChunk_IndexesResolvedPathPubkeys_Issue1558 verifies the
// contract-violation fix from #1558:
//
// `Load` (cmd/server/store.go:783-799) unmarshals each observation's
// resolved_path column and feeds every relay-hop pubkey through
// addToByNode / addResolvedPubkeysToPathHopIndex /
// addToResolvedPubkeyIndex. `loadChunk` (cmd/server/store.go:937-1023)
// scans the same column into resolvedPathStr but never feeds it
// anywhere — so background-backfilled transmissions never appear under
// their relay pubkeys in s.byNode, even though the same exact rows do
// when they happen to fall inside the hot startup window.
//
// Symptom in production: Home page per-node `packetsToday` /
// `totalTransmissions` / observer counts collapse after a container
// restart for any node that primarily appears as a relay (rather than
// as the endpoint pubKey/destPubKey/srcPubKey of a packet), because the
// background backfill path silently drops the relay-hop indexing
// branch. See issue #1558 for the full trace + diagnosis.
//
// This test calls Load with a 1h hot window so the 48h-old fixture rows
// are skipped, backfills them via loadChunk, and asserts that every
// relay pubkey present in the observations' `resolved_path` has all
// fixture transmissions indexed in s.byNode.
func TestLoadChunk_IndexesResolvedPathPubkeys_Issue1558(t *testing.T) {
// Two distinct relay pubkeys appear in every observation's resolved_path.
// Neither is an endpoint pubkey in decoded_json — so the ONLY path
// they can enter byNode through is the resolved_path branch.
relayPK1 := "1111111111111111111111111111111111111111111111111111111111111111"
relayPK2 := "2222222222222222222222222222222222222222222222222222222222222222"
dbPath := createTestDBWithResolvedPath(t, 3, []string{relayPK1, relayPK2})
db, err := OpenDB(dbPath)
if err != nil {
t.Fatal(err)
}
defer db.conn.Close()
if !db.hasResolvedPath {
t.Fatalf("setup: fixture should expose resolved_path column; hasResolvedPath=false")
}
store := NewPacketStore(db, &PacketStoreConfig{
RetentionHours: 72,
HotStartupHours: 1, // initial Load should NOT pick up 48h-old fixture rows
})
if err := store.Load(); err != nil {
t.Fatal(err)
}
// Confirm the fixture rows are outside the hot window — Load() must
// not have already populated byNode for the relay pubkeys; otherwise
// the test would not actually be exercising loadChunk.
if len(store.byNode[relayPK1]) != 0 {
t.Fatalf("setup: Load() unexpectedly picked up 48h-old rows; "+
"byNode[relayPK1]=%d entries (expected 0)", len(store.byNode[relayPK1]))
}
// Trigger background backfill of the 48h-old window via loadChunk —
// this is the code path under test.
chunkStart := time.Now().UTC().Add(-72 * time.Hour)
chunkEnd := time.Now().UTC().Add(-1 * time.Hour)
if err := store.loadChunk(chunkStart, chunkEnd); err != nil {
t.Fatalf("loadChunk failed: %v", err)
}
// Sanity: loadChunk did merge the transmissions into the slice.
if len(store.packets) != 3 {
t.Fatalf("loadChunk should have merged 3 transmissions; got %d", len(store.packets))
}
// THE ASSERTION: every relay pubkey listed in resolved_path must be
// indexed in byNode for every transmission, because loadChunk's
// per-row scan should mirror Load()'s 783-799 block.
for _, relayPK := range []string{relayPK1, relayPK2} {
got := len(store.byNode[relayPK])
if got != 3 {
t.Errorf("byNode[%s]: got %d transmissions, want 3 — "+
"loadChunk dropped the resolved_path indexing branch "+
"(issue #1558)",
relayPK, got)
}
}
}
+79 -25
@@ -784,18 +784,8 @@ func (s *PacketStore) Load() error {
 	if rpStr != "" {
 		rp := unmarshalResolvedPath(rpStr)
 		pks := extractResolvedPubkeys(rp)
-		// Feed decode-window consumers for this observation's pubkeys
-		if len(pks) > 0 {
-			// addToByNode for relay nodes
-			for _, pk := range pks {
-				s.addToByNode(tx, pk)
-			}
-			// touchRelayLastSeen handled in post-load pass
-			// byPathHop resolved-key entries (#1164: helper invalidates relay stats cache).
-			s.addResolvedPubkeysToPathHopIndex(tx, pks, hopsSeen)
-			// resolvedPubkeyIndex
-			s.addToResolvedPubkeyIndex(tx.ID, pks)
-		}
+		// Single point of truth — see indexResolvedPathHops doc + #1558.
+		s.indexResolvedPathHops(tx, pks, hopsSeen)
 	}
 	tx.Observations = append(tx.Observations, obs)
@@ -923,6 +913,12 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
localByTxID := make(map[int]*StoreTx)
localByObsID := make(map[int]*StoreObs)
localByObserver := make(map[string][]*StoreObs)
// Issue #1558: accumulate the union of resolved_path relay-hop
// pubkeys per tx outside the lock. We unmarshal + dedupe here so the
// merge critical section below only does map-append work, mirroring
// the rest of loadChunk's "build local, merge under lock" shape.
localResolvedPKsByTx := make(map[int][]string)
localResolvedSeenByTx := make(map[int]map[string]bool)
var localTotalObs int
var localTrackedBytes int64
var localMaxTxID int
@@ -1022,6 +1018,32 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
}
localTotalObs++
localTrackedBytes += estimateStoreObsBytes(obs)
// Issue #1558: collect resolved_path relay-hop pubkeys for
// this observation. We unmarshal + dedupe OUTSIDE the merge
// critical section so the lock-held work in the per-batch
// merge below stays bounded. Without this, background-loaded
// transmissions silently miss byNode entries for every relay
// they were heard via — Load() does the equivalent inline.
rpStr := nullStrVal(resolvedPathStr)
if rpStr != "" {
rp := unmarshalResolvedPath(rpStr)
pks := extractResolvedPubkeys(rp)
if len(pks) > 0 {
seen := localResolvedSeenByTx[txID]
if seen == nil {
seen = make(map[string]bool, len(pks))
localResolvedSeenByTx[txID] = seen
}
for _, pk := range pks {
if seen[pk] {
continue
}
seen[pk] = true
localResolvedPKsByTx[txID] = append(localResolvedPKsByTx[txID], pk)
}
}
}
}
}
if err := rows.Err(); err != nil {
@@ -1084,6 +1106,10 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
}
s.mu.Lock()
// Issue #1558: hopsSeen scratch map for indexResolvedPathHops →
// addResolvedPubkeysToPathHopIndex. Allocated once per batch and
// reused across txs (clear()d on each call inside the helper).
hopsSeen := make(map[string]bool)
newObsIDs := make(map[int]bool, len(batchObsIDs))
for k := range batchObsIDs {
if s.byObsID[k] == nil {
@@ -1119,6 +1145,14 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
s.byPayloadType[pt] = append(s.byPayloadType[pt], tx)
}
s.trackAdvertPubkey(tx)
// Issue #1558: mirror Load()'s 783-799 resolved-path branch.
// Without this, background-loaded transmissions never enter
// byNode under their relay-hop pubkeys → Home-page per-node
// stats collapse after restart for relay-heavy nodes. The
// pubkey union was pre-built outside the lock above.
if pks := localResolvedPKsByTx[tx.ID]; len(pks) > 0 {
s.indexResolvedPathHops(tx, pks, hopsSeen)
}
}
s.mu.Unlock()
runtime.Gosched()
@@ -1277,6 +1311,35 @@ func pathLen(pathJSON string) int {
return len(hops)
}
// indexResolvedPathHops indexes a transmission under every relay-hop pubkey
// extracted from an observation's resolved_path, and refreshes the dependent
// resolved-pubkey + path-hop indexes. This is the single point of truth for
// the "feed decode-window consumers for resolved-path pubkeys" contract that
// must hold across every code path that materializes a transmission into the
// in-memory store: initial Load (cmd/server/store.go ~783-799), background
// chunk loads (loadChunk, see issue #1558), MQTT ingest (~2293-2306), and
// late-arriving-observation ingest (~2630-2643). Duplicating these three
// calls inline let loadChunk silently drop the branch and collapse per-node
// Home-page stats after restart for relay-heavy nodes — see #1558.
//
// Caller contract:
// - Must hold s.mu write lock (addToByNode / addToResolvedPubkeyIndex /
// addResolvedPubkeysToPathHopIndex all mutate store state).
// - pks should be the output of extractResolvedPubkeys (no nils, no
// empties); the helper is a no-op when pks is empty.
// - hopsSeen is a reusable scratch map; addResolvedPubkeysToPathHopIndex
// clear()s it on entry.
func (s *PacketStore) indexResolvedPathHops(tx *StoreTx, pks []string, hopsSeen map[string]bool) {
if len(pks) == 0 {
return
}
for _, pk := range pks {
s.addToByNode(tx, pk)
}
s.addResolvedPubkeysToPathHopIndex(tx, pks, hopsSeen)
s.addToResolvedPubkeyIndex(tx.ID, pks)
}
// indexByNode extracts pubkeys from decoded_json and indexes the transmission.
// indexByNode indexes a transmission under all pubkeys found in its decoded
// JSON. Resolved path pubkeys are handled separately via the decode-window.
@@ -2295,13 +2358,8 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
 	if r.pathJSON != "" && r.pathJSON != "[]" && cachedPM != nil {
 		rpForBroadcast = resolvePathForObs(r.pathJSON, r.observerID, tx, cachedPM, cachedGraph)
 		resolvedPubkeys = extractResolvedPubkeys(rpForBroadcast)
-		// Feed decode-window consumers: addToByNode + resolvedPubkeyIndex
-		for _, pk := range resolvedPubkeys {
-			s.addToByNode(tx, pk)
-		}
-		s.addToResolvedPubkeyIndex(tx.ID, resolvedPubkeys)
-		// byPathHop resolved-key entries (#1164: helper invalidates relay stats cache).
-		s.addResolvedPubkeysToPathHopIndex(tx, resolvedPubkeys, hopsSeen)
+		// Single point of truth — see indexResolvedPathHops doc + #1558.
+		s.indexResolvedPathHops(tx, resolvedPubkeys, hopsSeen)
 	}
 	// Stash rpForBroadcast for later broadcast/persist (keyed by obs ID)
 	if rpForBroadcast != nil {
@@ -2632,12 +2690,8 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
 		if pm != nil {
 			obsResolvedPath = resolvePathForObs(r.pathJSON, r.observerID, tx, pm, graphRef)
 			pks := extractResolvedPubkeys(obsResolvedPath)
-			for _, pk := range pks {
-				s.addToByNode(tx, pk)
-			}
-			s.addToResolvedPubkeyIndex(tx.ID, pks)
-			// byPathHop resolved-key entries (#1164: helper invalidates relay stats cache).
-			s.addResolvedPubkeysToPathHopIndex(tx, pks, hopsSeen)
+			// Single point of truth — see indexResolvedPathHops doc + #1558.
+			s.indexResolvedPathHops(tx, pks, hopsSeen)
 		}
 	}
 	// Stash for broadcast/persist