diff --git a/cmd/server/loadchunk_resolved_path_1558_test.go b/cmd/server/loadchunk_resolved_path_1558_test.go new file mode 100644 index 00000000..b9ede3c4 --- /dev/null +++ b/cmd/server/loadchunk_resolved_path_1558_test.go @@ -0,0 +1,160 @@ +package main + +import ( + "database/sql" + "fmt" + "path/filepath" + "testing" + "time" + + _ "modernc.org/sqlite" +) + +// createTestDBWithResolvedPath creates a fixture DB containing numTx old +// transmissions (48h ago, outside any default hot window) where each +// observation has a non-empty resolved_path JSON listing relay-hop pubkeys. +// Mirrors createTestDBWithAgedPackets shape but adds the resolved_path +// column so loadChunk's hasResolvedPath branch is exercised. +func createTestDBWithResolvedPath(t *testing.T, numTx int, relayPubkeys []string) string { + t.Helper() + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + + conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + exec := func(s string, args ...interface{}) { + if _, err := conn.Exec(s, args...); err != nil { + t.Fatalf("setup exec failed: %v\nSQL: %s", err, s) + } + } + + exec(`CREATE TABLE transmissions ( + id INTEGER PRIMARY KEY, + raw_hex TEXT, hash TEXT, first_seen TEXT, + route_type INTEGER, payload_type INTEGER, payload_version INTEGER, + decoded_json TEXT + )`) + exec(`CREATE TABLE observations ( + id INTEGER PRIMARY KEY, + transmission_id INTEGER, + observer_id TEXT, observer_name TEXT, + direction TEXT, snr REAL, rssi REAL, score INTEGER, + path_json TEXT, timestamp TEXT, + raw_hex TEXT, + resolved_path TEXT + )`) + exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`) + exec(`CREATE TABLE nodes (pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, frequency REAL)`) + exec(`CREATE TABLE schema_version (version INTEGER)`) + exec(`INSERT INTO schema_version (version) VALUES (1)`) + exec(`CREATE INDEX idx_tx_first_seen ON transmissions(first_seen)`) + + // Build resolved_path JSON array of pubkey strings: ["pk1","pk2",...] + rpJSON := "[" + for i, pk := range relayPubkeys { + if i > 0 { + rpJSON += "," + } + rpJSON += fmt.Sprintf("%q", pk) + } + rpJSON += "]" + + now := time.Now().UTC() + for i := 0; i < numTx; i++ { + ts := now.Add(-48 * time.Hour).Add(time.Duration(i) * time.Second).Format(time.RFC3339) + hash := fmt.Sprintf("hash1558_%d", i) + exec("INSERT INTO transmissions VALUES (?,?,?,?,0,4,1,?)", + i+1, "aa", hash, ts, `{}`) + exec("INSERT INTO observations (id, transmission_id, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp, raw_hex, resolved_path) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", + i+1, i+1, "obs1", "Obs1", "RX", -10.0, -80.0, 5, `[]`, ts, "", rpJSON) + } + return dbPath +} + +// TestLoadChunk_IndexesResolvedPathPubkeys_Issue1558 verifies the +// contract-violation fix from #1558: +// +// `Load` (cmd/server/store.go:783-799) unmarshals each observation's +// resolved_path column and feeds every relay-hop pubkey through +// addToByNode / addResolvedPubkeysToPathHopIndex / +// addToResolvedPubkeyIndex. `loadChunk` (cmd/server/store.go:937-1023) +// scans the same column into resolvedPathStr but never feeds it +// anywhere — so background-backfilled transmissions never appear under +// their relay pubkeys in s.byNode, even though the same exact rows do +// when they happen to fall inside the hot startup window. +// +// Symptom in production: Home page per-node `packetsToday` / +// `totalTransmissions` / observer counts collapse after a container +// restart for any node that primarily appears as a relay (rather than +// as the endpoint pubKey/destPubKey/srcPubKey of a packet), because the +// background backfill path silently drops the relay-hop indexing +// branch. See issue #1558 for the full trace + diagnosis. +// +// This test loads a fixture DB exclusively via loadChunk (skipping +// Load) and asserts that for each relay pubkey present in +// `resolved_path` of every observation, s.byNode contains the +// transmission. +func TestLoadChunk_IndexesResolvedPathPubkeys_Issue1558(t *testing.T) { + // Two distinct relay pubkeys appear in every observation's resolved_path. + // Neither is an endpoint pubkey in decoded_json — so the ONLY path + // they can enter byNode through is the resolved_path branch. + relayPK1 := "1111111111111111111111111111111111111111111111111111111111111111" + relayPK2 := "2222222222222222222222222222222222222222222222222222222222222222" + + dbPath := createTestDBWithResolvedPath(t, 3, []string{relayPK1, relayPK2}) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + if !db.hasResolvedPath { + t.Fatalf("setup: fixture should expose resolved_path column; hasResolvedPath=false") + } + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, // initial Load should NOT pick up 48h-old fixture rows + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + // Confirm the fixture rows are outside the hot window — Load() must + // not have already populated byNode for the relay pubkeys; otherwise + // the test would not actually be exercising loadChunk. + if len(store.byNode[relayPK1]) != 0 { + t.Fatalf("setup: Load() unexpectedly picked up 48h-old rows; "+ + "byNode[relayPK1]=%d entries (expected 0)", len(store.byNode[relayPK1])) + } + + // Trigger background backfill of the 48h-old window via loadChunk — + // this is the code path under test. + chunkStart := time.Now().UTC().Add(-72 * time.Hour) + chunkEnd := time.Now().UTC().Add(-1 * time.Hour) + if err := store.loadChunk(chunkStart, chunkEnd); err != nil { + t.Fatalf("loadChunk failed: %v", err) + } + + // Sanity: loadChunk did merge the transmissions into the slice. + if len(store.packets) != 3 { + t.Fatalf("loadChunk should have merged 3 transmissions; got %d", len(store.packets)) + } + + // THE ASSERTION: every relay pubkey listed in resolved_path must be + // indexed in byNode for every transmission, because loadChunk's + // per-row scan should mirror Load()'s 783-799 block. + for _, relayPK := range []string{relayPK1, relayPK2} { + got := len(store.byNode[relayPK]) + if got != 3 { + t.Errorf("byNode[%s]: got %d transmissions, want 3 — "+ + "loadChunk dropped the resolved_path indexing branch "+ + "(issue #1558)", + relayPK, got) + } + } +} diff --git a/cmd/server/store.go b/cmd/server/store.go index 969c926e..4ea15e6d 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -784,18 +784,8 @@ func (s *PacketStore) Load() error { if rpStr != "" { rp := unmarshalResolvedPath(rpStr) pks := extractResolvedPubkeys(rp) - // Feed decode-window consumers for this observation's pubkeys - if len(pks) > 0 { - // addToByNode for relay nodes - for _, pk := range pks { - s.addToByNode(tx, pk) - } - // touchRelayLastSeen handled in post-load pass - // byPathHop resolved-key entries (#1164: helper invalidates relay stats cache). - s.addResolvedPubkeysToPathHopIndex(tx, pks, hopsSeen) - // resolvedPubkeyIndex - s.addToResolvedPubkeyIndex(tx.ID, pks) - } + // Single point of truth — see indexResolvedPathHops doc + #1558. + s.indexResolvedPathHops(tx, pks, hopsSeen) } tx.Observations = append(tx.Observations, obs) @@ -923,6 +913,12 @@ func (s *PacketStore) loadChunk(from, to time.Time) error { localByTxID := make(map[int]*StoreTx) localByObsID := make(map[int]*StoreObs) localByObserver := make(map[string][]*StoreObs) + // Issue #1558: accumulate the union of resolved_path relay-hop + // pubkeys per tx outside the lock. We unmarshal + dedupe here so the + // merge critical section below only does map-append work, mirroring + // the rest of loadChunk's "build local, merge under lock" shape. + localResolvedPKsByTx := make(map[int][]string) + localResolvedSeenByTx := make(map[int]map[string]bool) var localTotalObs int var localTrackedBytes int64 var localMaxTxID int @@ -1022,6 +1018,32 @@ func (s *PacketStore) loadChunk(from, to time.Time) error { } localTotalObs++ localTrackedBytes += estimateStoreObsBytes(obs) + + // Issue #1558: collect resolved_path relay-hop pubkeys for + // this observation. We unmarshal + dedupe OUTSIDE the merge + // critical section so the lock-held work in the per-batch + // merge below stays bounded. Without this, background-loaded + // transmissions silently miss byNode entries for every relay + // they were heard via — Load() does the equivalent inline. + rpStr := nullStrVal(resolvedPathStr) + if rpStr != "" { + rp := unmarshalResolvedPath(rpStr) + pks := extractResolvedPubkeys(rp) + if len(pks) > 0 { + seen := localResolvedSeenByTx[txID] + if seen == nil { + seen = make(map[string]bool, len(pks)) + localResolvedSeenByTx[txID] = seen + } + for _, pk := range pks { + if seen[pk] { + continue + } + seen[pk] = true + localResolvedPKsByTx[txID] = append(localResolvedPKsByTx[txID], pk) + } + } + } } } if err := rows.Err(); err != nil { @@ -1084,6 +1106,10 @@ func (s *PacketStore) loadChunk(from, to time.Time) error { } s.mu.Lock() + // Issue #1558: hopsSeen scratch map for indexResolvedPathHops → + // addResolvedPubkeysToPathHopIndex. Allocated once per batch and + // reused across txs (clear()d on each call inside the helper). + hopsSeen := make(map[string]bool) newObsIDs := make(map[int]bool, len(batchObsIDs)) for k := range batchObsIDs { if s.byObsID[k] == nil { @@ -1119,6 +1145,14 @@ func (s *PacketStore) loadChunk(from, to time.Time) error { s.byPayloadType[pt] = append(s.byPayloadType[pt], tx) } s.trackAdvertPubkey(tx) + // Issue #1558: mirror Load()'s 783-799 resolved-path branch. + // Without this, background-loaded transmissions never enter + // byNode under their relay-hop pubkeys → Home-page per-node + // stats collapse after restart for relay-heavy nodes. The + // pubkey union was pre-built outside the lock above. + if pks := localResolvedPKsByTx[tx.ID]; len(pks) > 0 { + s.indexResolvedPathHops(tx, pks, hopsSeen) + } } s.mu.Unlock() runtime.Gosched() @@ -1277,6 +1311,35 @@ func pathLen(pathJSON string) int { return len(hops) } +// indexResolvedPathHops indexes a transmission under every relay-hop pubkey +// extracted from an observation's resolved_path, and refreshes the dependent +// resolved-pubkey + path-hop indexes. This is the single point of truth for +// the "feed decode-window consumers for resolved-path pubkeys" contract that +// must hold across every code path that materializes a transmission into the +// in-memory store: initial Load (cmd/server/store.go ~783-799), background +// chunk loads (loadChunk, see issue #1558), MQTT ingest (~2293-2306), and +// late-arriving-observation ingest (~2630-2643). Duplicating these three +// calls inline let loadChunk silently drop the branch and collapse per-node +// Home-page stats after restart for relay-heavy nodes — see #1558. +// +// Caller contract: +// - Must hold s.mu write lock (addToByNode / addToResolvedPubkeyIndex / +// addResolvedPubkeysToPathHopIndex all mutate store state). +// - pks should be the output of extractResolvedPubkeys (no nils, no +// empties); the helper is a no-op when pks is empty. +// - hopsSeen is a reusable scratch map; addResolvedPubkeysToPathHopIndex +// clear()s it on entry. +func (s *PacketStore) indexResolvedPathHops(tx *StoreTx, pks []string, hopsSeen map[string]bool) { + if len(pks) == 0 { + return + } + for _, pk := range pks { + s.addToByNode(tx, pk) + } + s.addResolvedPubkeysToPathHopIndex(tx, pks, hopsSeen) + s.addToResolvedPubkeyIndex(tx.ID, pks) +} + // indexByNode extracts pubkeys from decoded_json and indexes the transmission. // indexByNode indexes a transmission under all pubkeys found in its decoded // JSON. Resolved path pubkeys are handled separately via the decode-window. @@ -2295,13 +2358,8 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac if r.pathJSON != "" && r.pathJSON != "[]" && cachedPM != nil { rpForBroadcast = resolvePathForObs(r.pathJSON, r.observerID, tx, cachedPM, cachedGraph) resolvedPubkeys = extractResolvedPubkeys(rpForBroadcast) - // Feed decode-window consumers: addToByNode + resolvedPubkeyIndex - for _, pk := range resolvedPubkeys { - s.addToByNode(tx, pk) - } - s.addToResolvedPubkeyIndex(tx.ID, resolvedPubkeys) - // byPathHop resolved-key entries (#1164: helper invalidates relay stats cache). - s.addResolvedPubkeysToPathHopIndex(tx, resolvedPubkeys, hopsSeen) + // Single point of truth — see indexResolvedPathHops doc + #1558. + s.indexResolvedPathHops(tx, resolvedPubkeys, hopsSeen) } // Stash rpForBroadcast for later broadcast/persist (keyed by obs ID) if rpForBroadcast != nil { @@ -2632,12 +2690,8 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] if pm != nil { obsResolvedPath = resolvePathForObs(r.pathJSON, r.observerID, tx, pm, graphRef) pks := extractResolvedPubkeys(obsResolvedPath) - for _, pk := range pks { - s.addToByNode(tx, pk) - } - s.addToResolvedPubkeyIndex(tx.ID, pks) - // byPathHop resolved-key entries (#1164: helper invalidates relay stats cache). - s.addResolvedPubkeysToPathHopIndex(tx, pks, hopsSeen) + // Single point of truth — see indexResolvedPathHops doc + #1558. + s.indexResolvedPathHops(tx, pks, hopsSeen) } } // Stash for broadcast/persist