mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-06-10 10:21:43 +00:00
fix(#1558): mirror Load's resolved_path indexing into loadChunk via shared helper
This commit turns the test added in the previous commit GREEN.

Root cause: in cmd/server/store.go, Load() (~783-799) unmarshals o.resolved_path for each observation, extracts every relay-hop pubkey, and feeds them to addToByNode + addToResolvedPubkeyIndex + addResolvedPubkeysToPathHopIndex. loadChunk (~937-1023) — the background-backfill path that loads everything past hotStartupHours — scans the column into resolvedPathStr but never unmarshals it.

Result: a contract violation between Load and loadChunk — the same SQL rows produce two different post-conditions. After every container restart, transmissions older than hotStartupHours are present in s.packets / s.byHash / s.byTxID but missing from s.byNode[relayPK] for every relay pubkey, collapsing the Home-page per-node packetsToday / totalTransmissions / observers / avgHops / avgSnr for relay-heavy nodes (e.g. 753 → 8 in the reporter's reproduction). Stats only self-heal as live ingest re-populates byNode through the ingest path.

Fix shape:

1. Extract a single (s *PacketStore) indexResolvedPathHops helper that owns the addToByNode + addResolvedPubkeysToPathHopIndex + addToResolvedPubkeyIndex sequence. This collapses three inline duplications — in Load, the MQTT ingest path (~2293-2306), and the late-arriving-observation path (~2630-2643) — into one point of truth, so the invariant is structural rather than duplicated.
2. In loadChunk, unmarshal resolved_path per row OUTSIDE the merge critical section, dedupe relay pubkeys per txID into localResolvedPKsByTx, and feed them through indexResolvedPathHops inside the existing per-batch merge lock, alongside indexByNode. This matches loadChunk's 'build local, merge under lock' shape: the lock-held work stays bounded (no JSON unmarshal under s.mu), per AGENTS.md performance rule '#0: no expensive work under locks'.
3. Re-point Load and both ingest sites at the helper. Load's behaviour is unchanged (the helper does exactly what the inlined block did).
Test: go test -run TestLoadChunk_IndexesResolvedPathPubkeys_Issue1558 ./cmd/server/... → PASS
Full suite: go test ./... (run from cmd/server) → PASS

Companion issue #1546 covers the sticky /api/stats backfilling=true flag mentioned in the reporter's writeup; it is not fixed here.

Closes #1558
This commit is contained in:
+79
-25
@@ -784,18 +784,8 @@ func (s *PacketStore) Load() error {
|
||||
if rpStr != "" {
|
||||
rp := unmarshalResolvedPath(rpStr)
|
||||
pks := extractResolvedPubkeys(rp)
|
||||
// Feed decode-window consumers for this observation's pubkeys
|
||||
if len(pks) > 0 {
|
||||
// addToByNode for relay nodes
|
||||
for _, pk := range pks {
|
||||
s.addToByNode(tx, pk)
|
||||
}
|
||||
// touchRelayLastSeen handled in post-load pass
|
||||
// byPathHop resolved-key entries (#1164: helper invalidates relay stats cache).
|
||||
s.addResolvedPubkeysToPathHopIndex(tx, pks, hopsSeen)
|
||||
// resolvedPubkeyIndex
|
||||
s.addToResolvedPubkeyIndex(tx.ID, pks)
|
||||
}
|
||||
// Single point of truth — see indexResolvedPathHops doc + #1558.
|
||||
s.indexResolvedPathHops(tx, pks, hopsSeen)
|
||||
}
|
||||
|
||||
tx.Observations = append(tx.Observations, obs)
|
||||
@@ -923,6 +913,12 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
|
||||
localByTxID := make(map[int]*StoreTx)
|
||||
localByObsID := make(map[int]*StoreObs)
|
||||
localByObserver := make(map[string][]*StoreObs)
|
||||
// Issue #1558: accumulate the union of resolved_path relay-hop
|
||||
// pubkeys per tx outside the lock. We unmarshal + dedupe here so the
|
||||
// merge critical section below only does map-append work, mirroring
|
||||
// the rest of loadChunk's "build local, merge under lock" shape.
|
||||
localResolvedPKsByTx := make(map[int][]string)
|
||||
localResolvedSeenByTx := make(map[int]map[string]bool)
|
||||
var localTotalObs int
|
||||
var localTrackedBytes int64
|
||||
var localMaxTxID int
|
||||
@@ -1022,6 +1018,32 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
|
||||
}
|
||||
localTotalObs++
|
||||
localTrackedBytes += estimateStoreObsBytes(obs)
|
||||
|
||||
// Issue #1558: collect resolved_path relay-hop pubkeys for
|
||||
// this observation. We unmarshal + dedupe OUTSIDE the merge
|
||||
// critical section so the lock-held work in the per-batch
|
||||
// merge below stays bounded. Without this, background-loaded
|
||||
// transmissions silently miss byNode entries for every relay
|
||||
// they were heard via — Load() does the equivalent inline.
|
||||
rpStr := nullStrVal(resolvedPathStr)
|
||||
if rpStr != "" {
|
||||
rp := unmarshalResolvedPath(rpStr)
|
||||
pks := extractResolvedPubkeys(rp)
|
||||
if len(pks) > 0 {
|
||||
seen := localResolvedSeenByTx[txID]
|
||||
if seen == nil {
|
||||
seen = make(map[string]bool, len(pks))
|
||||
localResolvedSeenByTx[txID] = seen
|
||||
}
|
||||
for _, pk := range pks {
|
||||
if seen[pk] {
|
||||
continue
|
||||
}
|
||||
seen[pk] = true
|
||||
localResolvedPKsByTx[txID] = append(localResolvedPKsByTx[txID], pk)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
@@ -1084,6 +1106,10 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
// Issue #1558: hopsSeen scratch map for indexResolvedPathHops →
|
||||
// addResolvedPubkeysToPathHopIndex. Allocated once per batch and
|
||||
// reused across txs (clear()d on each call inside the helper).
|
||||
hopsSeen := make(map[string]bool)
|
||||
newObsIDs := make(map[int]bool, len(batchObsIDs))
|
||||
for k := range batchObsIDs {
|
||||
if s.byObsID[k] == nil {
|
||||
@@ -1119,6 +1145,14 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
|
||||
s.byPayloadType[pt] = append(s.byPayloadType[pt], tx)
|
||||
}
|
||||
s.trackAdvertPubkey(tx)
|
||||
// Issue #1558: mirror Load()'s 783-799 resolved-path branch.
|
||||
// Without this, background-loaded transmissions never enter
|
||||
// byNode under their relay-hop pubkeys → Home-page per-node
|
||||
// stats collapse after restart for relay-heavy nodes. The
|
||||
// pubkey union was pre-built outside the lock above.
|
||||
if pks := localResolvedPKsByTx[tx.ID]; len(pks) > 0 {
|
||||
s.indexResolvedPathHops(tx, pks, hopsSeen)
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
runtime.Gosched()
|
||||
@@ -1277,6 +1311,35 @@ func pathLen(pathJSON string) int {
|
||||
return len(hops)
|
||||
}
|
||||
|
||||
// indexResolvedPathHops indexes a transmission under every relay-hop pubkey
|
||||
// extracted from an observation's resolved_path, and refreshes the dependent
|
||||
// resolved-pubkey + path-hop indexes. This is the single point of truth for
|
||||
// the "feed decode-window consumers for resolved-path pubkeys" contract that
|
||||
// must hold across every code path that materializes a transmission into the
|
||||
// in-memory store: initial Load (cmd/server/store.go ~783-799), background
|
||||
// chunk loads (loadChunk, see issue #1558), MQTT ingest (~2293-2306), and
|
||||
// late-arriving-observation ingest (~2630-2643). Duplicating these three
|
||||
// calls inline let loadChunk silently drop the branch and collapse per-node
|
||||
// Home-page stats after restart for relay-heavy nodes — see #1558.
|
||||
//
|
||||
// Caller contract:
|
||||
// - Must hold s.mu write lock (addToByNode / addToResolvedPubkeyIndex /
|
||||
// addResolvedPubkeysToPathHopIndex all mutate store state).
|
||||
// - pks should be the output of extractResolvedPubkeys (no nils, no
|
||||
// empties); the helper is a no-op when pks is empty.
|
||||
// - hopsSeen is a reusable scratch map; addResolvedPubkeysToPathHopIndex
|
||||
// clear()s it on entry.
|
||||
func (s *PacketStore) indexResolvedPathHops(tx *StoreTx, pks []string, hopsSeen map[string]bool) {
|
||||
if len(pks) == 0 {
|
||||
return
|
||||
}
|
||||
for _, pk := range pks {
|
||||
s.addToByNode(tx, pk)
|
||||
}
|
||||
s.addResolvedPubkeysToPathHopIndex(tx, pks, hopsSeen)
|
||||
s.addToResolvedPubkeyIndex(tx.ID, pks)
|
||||
}
|
||||
|
||||
// indexByNode extracts pubkeys from decoded_json and indexes the transmission.
|
||||
// indexByNode indexes a transmission under all pubkeys found in its decoded
|
||||
// JSON. Resolved path pubkeys are handled separately via the decode-window.
|
||||
@@ -2295,13 +2358,8 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
|
||||
if r.pathJSON != "" && r.pathJSON != "[]" && cachedPM != nil {
|
||||
rpForBroadcast = resolvePathForObs(r.pathJSON, r.observerID, tx, cachedPM, cachedGraph)
|
||||
resolvedPubkeys = extractResolvedPubkeys(rpForBroadcast)
|
||||
// Feed decode-window consumers: addToByNode + resolvedPubkeyIndex
|
||||
for _, pk := range resolvedPubkeys {
|
||||
s.addToByNode(tx, pk)
|
||||
}
|
||||
s.addToResolvedPubkeyIndex(tx.ID, resolvedPubkeys)
|
||||
// byPathHop resolved-key entries (#1164: helper invalidates relay stats cache).
|
||||
s.addResolvedPubkeysToPathHopIndex(tx, resolvedPubkeys, hopsSeen)
|
||||
// Single point of truth — see indexResolvedPathHops doc + #1558.
|
||||
s.indexResolvedPathHops(tx, resolvedPubkeys, hopsSeen)
|
||||
}
|
||||
// Stash rpForBroadcast for later broadcast/persist (keyed by obs ID)
|
||||
if rpForBroadcast != nil {
|
||||
@@ -2632,12 +2690,8 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
|
||||
if pm != nil {
|
||||
obsResolvedPath = resolvePathForObs(r.pathJSON, r.observerID, tx, pm, graphRef)
|
||||
pks := extractResolvedPubkeys(obsResolvedPath)
|
||||
for _, pk := range pks {
|
||||
s.addToByNode(tx, pk)
|
||||
}
|
||||
s.addToResolvedPubkeyIndex(tx.ID, pks)
|
||||
// byPathHop resolved-key entries (#1164: helper invalidates relay stats cache).
|
||||
s.addResolvedPubkeysToPathHopIndex(tx, pks, hopsSeen)
|
||||
// Single point of truth — see indexResolvedPathHops doc + #1558.
|
||||
s.indexResolvedPathHops(tx, pks, hopsSeen)
|
||||
}
|
||||
}
|
||||
// Stash for broadcast/persist
|
||||
|
||||
Reference in New Issue
Block a user