fix(#1558): mirror Load's resolved_path indexing into loadChunk via shared helper

GREEN for the test added in the previous commit. Root cause:

  cmd/server/store.go Load() (~783-799) per-observation unmarshals
  o.resolved_path, extracts every relay-hop pubkey, and feeds them
  to addToByNode + addToResolvedPubkeyIndex +
  addResolvedPubkeysToPathHopIndex. loadChunk (~937-1023) — the
  background-backfill path that loads everything past
  hotStartupHours — scans the column into resolvedPathStr but never
  unmarshals it. Result: a contract violation between Load and
  loadChunk: same SQL rows, two different post-conditions. After
  every container restart, transmissions older than hotStartupHours
  are present in s.packets / s.byHash / s.byTxID but missing from
  s.byNode[relayPK] for every relay pubkey, collapsing Home-page
  per-node packetsToday / totalTransmissions / observers /
  avgHops / avgSnr for relay-heavy nodes (e.g. 753 → 8 in the
  reporter's reproduction). Stats only self-heal as live ingest
  re-populates byNode through the ingest path.

Fix shape:

1. Extract a single (s *PacketStore) indexResolvedPathHops helper
   that owns the addToByNode + addResolvedPubkeysToPathHopIndex +
   addToResolvedPubkeyIndex sequence. This collapses three inline
   duplications across Load, the MQTT ingest path (~2293-2306),
   and the late-arriving-observation path (~2630-2643) into one
   point of truth so the invariant is structural, not duplicated.

2. In loadChunk, unmarshal resolved_path per row OUTSIDE the merge
   critical section, dedupe relay pubkeys per txID into
   localResolvedPKsByTx, and feed them through indexResolvedPathHops
   inside the existing per-batch merge lock alongside indexByNode.
   This matches loadChunk's 'build local, merge under lock' shape
   — the lock-held work stays bounded (no JSON unmarshal under
   s.mu) per AGENTS.md performance rule '#0: no expensive work
   under locks'.

3. Re-point Load and both ingest sites at the helper. Load's
   semantic behaviour is byte-identical (helper does exactly what
   the inlined block did).

Test:

  go test -run TestLoadChunk_IndexesResolvedPathPubkeys_Issue1558 ./cmd/server/...
  → PASS

Full suite (run from cmd/server): go test ./... → PASS.

Companion issue #1546 covers the sticky /api/stats backfilling=true
flag mentioned in the reporter's writeup; not fixed here.

Closes #1558
This commit is contained in:
corescope-bot
2026-06-04 21:13:46 +00:00
parent 892424e60e
commit c6768dcacf
+79 -25
View File
@@ -784,18 +784,8 @@ func (s *PacketStore) Load() error {
if rpStr != "" {
rp := unmarshalResolvedPath(rpStr)
pks := extractResolvedPubkeys(rp)
// Feed decode-window consumers for this observation's pubkeys
if len(pks) > 0 {
// addToByNode for relay nodes
for _, pk := range pks {
s.addToByNode(tx, pk)
}
// touchRelayLastSeen handled in post-load pass
// byPathHop resolved-key entries (#1164: helper invalidates relay stats cache).
s.addResolvedPubkeysToPathHopIndex(tx, pks, hopsSeen)
// resolvedPubkeyIndex
s.addToResolvedPubkeyIndex(tx.ID, pks)
}
// Single point of truth — see indexResolvedPathHops doc + #1558.
s.indexResolvedPathHops(tx, pks, hopsSeen)
}
tx.Observations = append(tx.Observations, obs)
@@ -923,6 +913,12 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
localByTxID := make(map[int]*StoreTx)
localByObsID := make(map[int]*StoreObs)
localByObserver := make(map[string][]*StoreObs)
// Issue #1558: accumulate the union of resolved_path relay-hop
// pubkeys per tx outside the lock. We unmarshal + dedupe here so the
// merge critical section below only does map-append work, mirroring
// the rest of loadChunk's "build local, merge under lock" shape.
localResolvedPKsByTx := make(map[int][]string)
localResolvedSeenByTx := make(map[int]map[string]bool)
var localTotalObs int
var localTrackedBytes int64
var localMaxTxID int
@@ -1022,6 +1018,32 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
}
localTotalObs++
localTrackedBytes += estimateStoreObsBytes(obs)
// Issue #1558: collect resolved_path relay-hop pubkeys for
// this observation. We unmarshal + dedupe OUTSIDE the merge
// critical section so the lock-held work in the per-batch
// merge below stays bounded. Without this, background-loaded
// transmissions silently miss byNode entries for every relay
// they were heard via — Load() does the equivalent inline.
rpStr := nullStrVal(resolvedPathStr)
if rpStr != "" {
rp := unmarshalResolvedPath(rpStr)
pks := extractResolvedPubkeys(rp)
if len(pks) > 0 {
seen := localResolvedSeenByTx[txID]
if seen == nil {
seen = make(map[string]bool, len(pks))
localResolvedSeenByTx[txID] = seen
}
for _, pk := range pks {
if seen[pk] {
continue
}
seen[pk] = true
localResolvedPKsByTx[txID] = append(localResolvedPKsByTx[txID], pk)
}
}
}
}
}
if err := rows.Err(); err != nil {
@@ -1084,6 +1106,10 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
}
s.mu.Lock()
// Issue #1558: hopsSeen scratch map for indexResolvedPathHops →
// addResolvedPubkeysToPathHopIndex. Allocated once per batch and
// reused across txs (clear()d on each call inside the helper).
hopsSeen := make(map[string]bool)
newObsIDs := make(map[int]bool, len(batchObsIDs))
for k := range batchObsIDs {
if s.byObsID[k] == nil {
@@ -1119,6 +1145,14 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
s.byPayloadType[pt] = append(s.byPayloadType[pt], tx)
}
s.trackAdvertPubkey(tx)
// Issue #1558: mirror Load()'s 783-799 resolved-path branch.
// Without this, background-loaded transmissions never enter
// byNode under their relay-hop pubkeys → Home-page per-node
// stats collapse after restart for relay-heavy nodes. The
// pubkey union was pre-built outside the lock above.
if pks := localResolvedPKsByTx[tx.ID]; len(pks) > 0 {
s.indexResolvedPathHops(tx, pks, hopsSeen)
}
}
s.mu.Unlock()
runtime.Gosched()
@@ -1277,6 +1311,35 @@ func pathLen(pathJSON string) int {
return len(hops)
}
// indexResolvedPathHops registers tx under each relay-hop pubkey taken from an
// observation's resolved_path and then updates the two dependent indexes (the
// path-hop index and the resolved-pubkey index).
//
// It exists so that every code path that materializes a transmission into the
// in-memory store performs the same three indexing steps. Its callers are
// Load, loadChunk, IngestNewFromDB and IngestNewObservations. When each caller
// carried its own inline copy of these steps, loadChunk omitted them entirely,
// so background-loaded transmissions were missing from the per-node index for
// their relay pubkeys after a restart (issue #1558).
//
// Requirements on the caller:
//   - s.mu must be held for writing: addToByNode,
//     addResolvedPubkeysToPathHopIndex and addToResolvedPubkeyIndex all mutate
//     store state.
//   - pks is expected to come from extractResolvedPubkeys (no nil or empty
//     entries). An empty pks makes this call a no-op.
//   - hopsSeen is scratch space that may be shared across calls;
//     addResolvedPubkeysToPathHopIndex clear()s it on entry.
func (s *PacketStore) indexResolvedPathHops(tx *StoreTx, pks []string, hopsSeen map[string]bool) {
	// Nothing resolved for this observation: leave every index untouched.
	if len(pks) == 0 {
		return
	}

	// Step 1: per-node index, one entry per relay pubkey.
	for i := range pks {
		s.addToByNode(tx, pks[i])
	}

	// Step 2: path-hop index, keyed by the resolved pubkeys.
	s.addResolvedPubkeysToPathHopIndex(tx, pks, hopsSeen)

	// Step 3: transmission ID → resolved pubkeys index.
	s.addToResolvedPubkeyIndex(tx.ID, pks)
}
// indexByNode indexes a transmission under all pubkeys extracted from its
// decoded JSON (decoded_json). Resolved path pubkeys are handled separately
// via the decode-window.
@@ -2295,13 +2358,8 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
if r.pathJSON != "" && r.pathJSON != "[]" && cachedPM != nil {
rpForBroadcast = resolvePathForObs(r.pathJSON, r.observerID, tx, cachedPM, cachedGraph)
resolvedPubkeys = extractResolvedPubkeys(rpForBroadcast)
// Feed decode-window consumers: addToByNode + resolvedPubkeyIndex
for _, pk := range resolvedPubkeys {
s.addToByNode(tx, pk)
}
s.addToResolvedPubkeyIndex(tx.ID, resolvedPubkeys)
// byPathHop resolved-key entries (#1164: helper invalidates relay stats cache).
s.addResolvedPubkeysToPathHopIndex(tx, resolvedPubkeys, hopsSeen)
// Single point of truth — see indexResolvedPathHops doc + #1558.
s.indexResolvedPathHops(tx, resolvedPubkeys, hopsSeen)
}
// Stash rpForBroadcast for later broadcast/persist (keyed by obs ID)
if rpForBroadcast != nil {
@@ -2632,12 +2690,8 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
if pm != nil {
obsResolvedPath = resolvePathForObs(r.pathJSON, r.observerID, tx, pm, graphRef)
pks := extractResolvedPubkeys(obsResolvedPath)
for _, pk := range pks {
s.addToByNode(tx, pk)
}
s.addToResolvedPubkeyIndex(tx.ID, pks)
// byPathHop resolved-key entries (#1164: helper invalidates relay stats cache).
s.addResolvedPubkeysToPathHopIndex(tx, pks, hopsSeen)
// Single point of truth — see indexResolvedPathHops doc + #1558.
s.indexResolvedPathHops(tx, pks, hopsSeen)
}
}
// Stash for broadcast/persist