fix(nodes): rebuild relay-hop history on startup from path_json (#1643)

## Problem

A relay node's **activity timeline** — and its per-node `packetsToday` /
observer counts — collapses to *"only the hour the server restarted"*
after every restart. Before the restart the timeline shows only the
node's own adverts (~1–2/hr); all of its relay activity piles into the
single post-restart hour.

## Root cause

All DB cold-load paths (`Load`, `loadChunk`, `scanAndMergeChunk`) index
relay-hop attribution into `byNode` **only** from
`observations.resolved_path`. But since #1287 the ingestor persists
relay data as aggregate `neighbor_edges` and **never writes
`resolved_path`** — it is `NULL` on every deployment (verified on a live
DB: 0 of ~440k rows populated). So relay attribution is never
reconstructed on startup; it only re-accumulates from live traffic
(`IngestNew*`, which re-resolves from `path_json` + the neighbor graph),
piling a relay node's whole history into the post-restart window.

## Fix

Server read-side only — **no schema / ingestor / migration change**.
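When `resolved_path` is empty, re-resolve relay hops from the
already-persisted `path_json` using the in-memory prefix map. As merged
(after review round 1), cold load goes through
`resolvePathForObsColdLoad`, which attributes a hop only when its prefix
matches exactly one candidate; ambiguous hops are skipped and counted
rather than tie-broken by affinity / observation count the way the
live-ingest `resolvePathForObs` does. `main.go` now loads the persisted
neighbor graph *before* the packet load, and `Load` / `LoadChunked`
panic if `neighbor_edges` is present but the graph has not been loaded.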

Two correctness details worth a close look:

1. **Fetch the prefix-map/graph snapshot BEFORE opening each load
cursor.** `getCachedNodesAndPM` issues its own DB query; doing so while
a load cursor is open deadlocks on a single-connection SQLite pool (the
test harness uses one).
2. **Index into `byNode` ONLY** — not the `resolved_path` / path-hop
indexes. Those are cross-checked by `handleNodePaths` against the
persisted `resolved_path` column (NULL here); populating them from an
in-memory re-resolution would make that SQL confirmation fail and
wrongly drop the tx from paths-through (#1352).

## Tests

New coverage asserts a relay pubkey reachable *only* via `path_json`
lands in `byNode` after a restart-style load, for both the hot-window
(`LoadChunked`) and background-window (`loadChunk`) paths. Existing
#1558 (`resolved_path`) and #1352 (paths-through) tests still pass. Full
`cd cmd/server && go test ./...` is green under `-race`.

## Perf

The fallback runs `resolvePathForObsColdLoad` per observation with a
non-empty `path_json` during cold load — one prefix-map lookup per hop,
which is no more than the per-packet work the live ingest path already
performs, so no new asymptotic cost. The prefix map is snapshotted
**once per load** (not per row); `getCachedNodesAndPM` is 30s-cached.
In `loadChunk` the resolution runs in the existing lock-free scan and is
accumulated locally, matching that function's "build local, merge under
lock" design.

## Note on a pre-existing flaky test

`TestDistanceConcurrentRequestsDuringBuildReturn202` is timing-fragile
(fails ~1/15 on `master` without this change). It relies on the lazy
distance build being slow because it's the first caller of
`getCachedNodesAndPM` (cold cache). This PR pre-warms that cache during
`Load`, narrowing the build window, so the test fails more often in
**non-race** local runs. It passes reliably under `-race` (CI mode),
where the build stays slow. Flagging in case you want to harden the test
separately.

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Co-authored-by: openclaw-bot <openclaw-bot@users.noreply.github.com>
Co-authored-by: openclaw-bot <bot@openclaw>
This commit is contained in:
efiten
2026-06-11 20:36:49 +02:00
committed by GitHub
parent 825b26485c
commit 938153dd92
7 changed files with 671 additions and 24 deletions
+40 -2
View File
@@ -124,6 +124,13 @@ func (s *PacketStore) LoadChunked(chunkSize int) error {
if chunkSize <= 0 {
chunkSize = 10000
}
// Startup-ordering invariant (PR #1643 R1 munger #2). Mirror the
// guard in Load() so the production async path also fast-fails when
// neighbor_edges has rows but the graph is missing. See Load() for
// the full rationale.
if neighborEdgesTableExists(s.db.conn) && s.graph.Load() == nil {
panic("packet store LoadChunked(): neighbor_edges table has rows but s.graph is nil — graph must be loaded before packet load (see main.go #1643 invariant)")
}
s.chunkedLoadInit()
// Reset state for repeat calls in tests.
s.loadComplete.Store(false)
@@ -193,6 +200,19 @@ func (s *PacketStore) LoadChunked(chunkSize int) error {
// that semantic by starting one below SQLite's minimum rowid (-1).
var cursorID int64 = -1
// Relay-hop fallback inputs, fetched ONCE before the chunk-query loop.
// getCachedNodesAndPM issues its own DB query, so calling it while a
// chunk cursor is open would deadlock on a single-connection SQLite
// pool. resolved_path is never persisted post-#1287, so scanAndMergeChunk
// re-resolves relay hops from path_json using these snapshots.
// PR #1643 R1 munger #1: cold load uses unique_prefix-only gate, so
// the neighbor graph is no longer consulted here (affinity-tier
// resolution against ≤168h-old observations would silently mis-attribute).
s.mu.RLock()
_, relayPM := s.getCachedNodesAndPM()
s.mu.RUnlock()
var coldLoadAmbiguousHopsSkipped int
for {
conds := append([]string{}, loadConditions...)
conds = append(conds, fmt.Sprintf("t2.id > %d", cursorID))
@@ -233,7 +253,7 @@ func (s *PacketStore) LoadChunked(chunkSize int) error {
return fmt.Errorf("chunk %d: query: %w", chunkIdx, err)
}
chunkTxCount, lastID, err := s.scanAndMergeChunk(rows)
chunkTxCount, lastID, err := s.scanAndMergeChunk(rows, relayPM, &coldLoadAmbiguousHopsSkipped)
rows.Close()
if err != nil {
return fmt.Errorf("chunk %d: scan: %w", chunkIdx, err)
@@ -300,6 +320,10 @@ func (s *PacketStore) LoadChunked(chunkSize int) error {
elapsed := time.Since(t0)
log.Printf("[store] LoadChunked: %d transmissions (%d observations) across %d chunk(s) in %v (chunkSize=%d, DB total=%d)",
totalLoaded, s.totalObs, chunkIdx, elapsed, chunkSize, totalInDB)
if coldLoadAmbiguousHopsSkipped > 0 {
log.Printf("[store] LoadChunked: skipped %d ambiguous-prefix relay hops (unique_prefix gate, PR #1643 R1)",
coldLoadAmbiguousHopsSkipped)
}
s.loadMultibyteCapFromDB()
// Mark complete on the success path only — see the function-level
// defer above for why this is NOT in a deferred call. Probes that
@@ -312,7 +336,7 @@ func (s *PacketStore) LoadChunked(chunkSize int) error {
// scanAndMergeChunk consumes one chunk's rows under s.mu.Lock and
// returns the number of distinct transmissions seen + the max
// transmission id (cursor for the next chunk).
func (s *PacketStore) scanAndMergeChunk(rows *sql.Rows) (int, int64, error) {
func (s *PacketStore) scanAndMergeChunk(rows *sql.Rows, relayPM *prefixMap, coldLoadAmbiguousHopsSkipped *int) (int, int64, error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -411,6 +435,20 @@ func (s *PacketStore) scanAndMergeChunk(rows *sql.Rows) (int, int64, error) {
rp := unmarshalResolvedPath(rpStr)
pks := extractResolvedPubkeys(rp)
s.indexResolvedPathHops(tx, pks, hopsSeen)
} else if relayPM != nil && obsPJ != "" && obsPJ != "[]" {
// resolved_path is NULL on live (since #1287 relay data is
// persisted as neighbor_edges, not per-observation). Re-resolve
// relay-hop attribution from path_json so relay nodes keep their
// analytics history across a restart instead of rebuilding only
// from post-restart live traffic. relayPM is passed in from
// LoadChunked (fetched before any chunk cursor opened).
// byNode ONLY — see the Load() counterpart for why the
// resolved_path/path-hop indexes must NOT be populated here.
// PR #1643 R1 munger #1: unique_prefix-only gate.
rp := resolvePathForObsColdLoad(obsPJ, obsIDStr, tx, relayPM, coldLoadAmbiguousHopsSkipped)
for _, pk := range extractResolvedPubkeys(rp) {
s.addToByNode(tx, pk)
}
}
tx.Observations = append(tx.Observations, obs)
+90
View File
@@ -0,0 +1,90 @@
package main
import (
"database/sql"
"path/filepath"
"testing"
"time"
_ "modernc.org/sqlite"
)
// TestLoad_PanicsWhenGraphNotLoadedAndEdgesExist pins the startup-ordering
// invariant (munger R1 #2). Graph-load-before-packet-load is the entire
// premise of PR #1643's fix: without an in-memory neighbor graph, the
// path_json relay-hop fallback cannot resolve hops, so relay-node analytics
// history collapses. main.go currently does the right thing — but nothing
// asserts the ordering, so a future refactor could silently regress.
//
// Load() must panic when neighbor_edges has rows but s.graph.Load() returns
// nil. Fast-fail at startup beats silently-wrong attribution.
func TestLoad_PanicsWhenGraphNotLoadedAndEdgesExist(t *testing.T) {
	dir := t.TempDir()
	dbPath := filepath.Join(dir, "test.db")

	// Writable handle used only to build the fixture; the store under test
	// opens the same file through OpenDB further down.
	rw, err := sql.Open("sqlite", "file:"+dbPath+"?_journal_mode=WAL")
	if err != nil {
		t.Fatal(err)
	}
	defer rw.Close()

	// exec runs one setup statement and aborts the test on failure.
	exec := func(s string, args ...interface{}) {
		if _, err := rw.Exec(s, args...); err != nil {
			t.Fatalf("setup exec failed: %v\nSQL: %s", err, s)
		}
	}

	// Minimal CoreScope schema. PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
	exec(`CREATE TABLE transmissions (
id INTEGER PRIMARY KEY,
raw_hex TEXT, hash TEXT, first_seen TEXT,
route_type INTEGER, payload_type INTEGER, payload_version INTEGER,
decoded_json TEXT
)`)
	// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
	exec(`CREATE TABLE observations (
id INTEGER PRIMARY KEY, transmission_id INTEGER,
observer_id TEXT, observer_name TEXT,
direction TEXT, snr REAL, rssi REAL, score INTEGER,
path_json TEXT, timestamp TEXT, raw_hex TEXT, resolved_path TEXT
)`)
	// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
	exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`)
	// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
	exec(`CREATE TABLE nodes (
public_key TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL,
last_seen TEXT, first_seen TEXT, advert_count INTEGER DEFAULT 0
)`)
	// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
	exec(`CREATE TABLE schema_version (version INTEGER)`)
	exec(`INSERT INTO schema_version (version) VALUES (1)`)
	// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
	exec(`CREATE TABLE neighbor_edges (
node_a TEXT NOT NULL,
node_b TEXT NOT NULL,
count INTEGER DEFAULT 1,
last_seen TEXT,
PRIMARY KEY (node_a, node_b)
)`)

	// One persisted edge: this is the "neighbor_edges has rows" half of the
	// condition the guard is meant to detect.
	now := time.Now().UTC().Format(time.RFC3339)
	exec(`INSERT INTO neighbor_edges (node_a, node_b, count, last_seen) VALUES (?, ?, ?, ?)`,
		"aaa", "bbb", 5, now)

	d, err := OpenDB(dbPath)
	if err != nil {
		t.Fatalf("OpenDB: %v", err)
	}
	defer d.conn.Close()

	// Deliberately DO NOT call store.graph.Store(...). s.graph.Load() returns
	// nil → the bug condition the invariant guard must catch.
	store := NewPacketStore(d, &PacketStoreConfig{RetentionHours: 72})

	defer func() {
		r := recover()
		if r == nil {
			t.Fatalf("Load() must panic when neighbor_edges has rows but graph is nil; got no panic")
		}
		// Any panic used to satisfy this test, so an unrelated crash inside
		// Load() (a runtime error such as a nil dereference) would have been
		// reported as success. Require the string panic raised by the
		// startup-ordering guard instead.
		const guardPrefix = "packet store Load():"
		msg, ok := r.(string)
		if !ok || len(msg) < len(guardPrefix) || msg[:len(guardPrefix)] != guardPrefix {
			t.Fatalf("Load() panicked, but not from the startup-ordering guard: %v", r)
		}
	}()
	_ = store.Load()
}
@@ -0,0 +1,172 @@
package main
import (
"database/sql"
"fmt"
"path/filepath"
"testing"
"time"
_ "modernc.org/sqlite"
)
// createTestDBAmbiguousPrefix builds a SQLite fixture under t.TempDir() and
// returns the path of the database file.
//
// The fixture contains two repeater nodes (relayA with advert_count 50,
// relayB with advert_count 10), one transmission with an empty decoded_json
// ({}), and one observation whose path_json is the single-element list
// [hop] and whose resolved_path is NULL. No neighbor_edges table is created.
//
// Callers pass a `hop` that is a prefix of BOTH public keys, so the hop is
// ambiguous: the historical packet's actual relay cannot be determined from
// the prefix alone. The tests built on this fixture assert that cold load
// attributes the hop to neither repeater.
//
// firstSeen is written to both transmissions.first_seen and
// observations.timestamp; the caller picks it to place the row in the hot
// window or in the background window.
func createTestDBAmbiguousPrefix(t *testing.T, relayA, relayB, hop, firstSeen string) string {
t.Helper()
dir := t.TempDir()
dbPath := filepath.Join(dir, "test.db")
conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL")
if err != nil {
t.Fatal(err)
}
// Setup-only handle; the tests reopen the file through OpenDB.
defer conn.Close()
// exec runs one setup statement and aborts the test on failure.
exec := func(s string, args ...interface{}) {
if _, err := conn.Exec(s, args...); err != nil {
t.Fatalf("setup exec failed: %v\nSQL: %s", err, s)
}
}
// PREFLIGHT: async=true reason="test fixture: in-memory t.TempDir SQLite, never touches a real DB."
exec(`CREATE TABLE transmissions (
id INTEGER PRIMARY KEY,
raw_hex TEXT, hash TEXT, first_seen TEXT,
route_type INTEGER, payload_type INTEGER, payload_version INTEGER,
decoded_json TEXT
)`)
// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
exec(`CREATE TABLE observations (
id INTEGER PRIMARY KEY,
transmission_id INTEGER,
observer_id TEXT, observer_name TEXT,
direction TEXT, snr REAL, rssi REAL, score INTEGER,
path_json TEXT, timestamp TEXT,
raw_hex TEXT,
resolved_path TEXT
)`)
// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`)
// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
exec(`CREATE TABLE nodes (
public_key TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL,
last_seen TEXT, first_seen TEXT, advert_count INTEGER DEFAULT 0
)`)
// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
exec(`CREATE TABLE schema_version (version INTEGER)`)
exec(`INSERT INTO schema_version (version) VALUES (1)`)
// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
exec(`CREATE INDEX idx_tx_first_seen ON transmissions(first_seen)`)
// Two repeaters; callers choose keys that share the 2-char prefix `hop`.
// The advert_counts differ so that a count-based tiebreak, if one were
// ever applied, would deterministically pick relayA — making an
// over-attribution regression show up as a stable failure.
exec(`INSERT INTO nodes (public_key, name, role, advert_count) VALUES (?,?,?,?)`,
relayA, "Relay A", "repeater", 50)
exec(`INSERT INTO nodes (public_key, name, role, advert_count) VALUES (?,?,?,?)`,
relayB, "Relay B", "repeater", 10)
// first_seen comes from the caller: the loadChunk test passes a 48h-old
// timestamp (background window), the hot-window test passes "now".
exec("INSERT INTO transmissions VALUES (?,?,?,?,0,4,1,?)",
1, "aa", "hashamb_1", firstSeen, `{}`)
// resolved_path is NULL; path_json holds only the ambiguous hop prefix.
exec("INSERT INTO observations (id, transmission_id, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp, raw_hex, resolved_path) VALUES (?,?,?,?,?,?,?,?,?,?,?,NULL)",
1, 1, "obs1", "Obs1", "RX", -10.0, -80.0, 5, fmt.Sprintf(`[%q]`, hop), firstSeen, "")
return dbPath
}
// TestLoadChunk_AmbiguousPrefix_SkipsAttribution pins the fix for the
// time-travel attribution gate (munger R1 #1). When path_json carries an
// ambiguous prefix that matches multiple repeaters, the cold-load path
// MUST NOT pick a winner via affinity/observation-count tiebreak — today's
// affinity winner is not necessarily the historical hop. Safer to
// under-attribute (skip byNode for that hop) than to mis-attribute.
func TestLoadChunk_AmbiguousPrefix_SkipsAttribution(t *testing.T) {
relayA := "aabbccddeeff00112233445566778899aabbccddeeff00112233445566778899"
relayB := "aa1122334455667788990011223344556677889900112233445566778899aabb"
hop := "aa" // 2-char prefix shared by both relayA and relayB
aged := time.Now().UTC().Add(-48 * time.Hour).Format(time.RFC3339)
dbPath := createTestDBAmbiguousPrefix(t, relayA, relayB, hop, aged)
db, err := OpenDB(dbPath)
if err != nil {
t.Fatal(err)
}
defer db.conn.Close()
store := NewPacketStore(db, &PacketStoreConfig{
RetentionHours: 72,
HotStartupHours: 1, // hot load skips the 48h-old row → goes to loadChunk
})
// Empty graph: no neighbor-affinity tiebreak signal. Mirrors a freshly
// restarted server whose only relay info is the prefix map.
store.graph.Store(NewNeighborGraph())
if err := store.LoadChunked(0); err != nil {
t.Fatalf("LoadChunked: %v", err)
}
if got := len(store.byNode[relayA]) + len(store.byNode[relayB]); got != 0 {
t.Fatalf("setup: hot load unexpectedly picked up 48h-old row "+
"(byNode total=%d, want 0) — test would not exercise loadChunk", got)
}
chunkStart := time.Now().UTC().Add(-72 * time.Hour)
chunkEnd := time.Now().UTC().Add(-1 * time.Hour)
if err := store.loadChunk(chunkStart, chunkEnd); err != nil {
t.Fatalf("loadChunk: %v", err)
}
// Neither repeater may be over-attributed. The hop is ambiguous → the
// cold-load loader MUST NOT pick one as the byNode owner.
if got := len(store.byNode[relayA]); got != 0 {
t.Errorf("byNode[%s]: got %d transmissions, want 0 — ambiguous-prefix hop "+
"was over-attributed to relayA (time-travel attribution bug)", relayA, got)
}
if got := len(store.byNode[relayB]); got != 0 {
t.Errorf("byNode[%s]: got %d transmissions, want 0 — ambiguous-prefix hop "+
"was over-attributed to relayB (time-travel attribution bug)", relayB, got)
}
}
// TestLoad_AmbiguousPrefix_SkipsAttribution covers the hot-window Load()
// path. Same setup as the loadChunk test but the row falls inside the hot
// window so it is loaded by Load() / scanAndMergeChunk.
func TestLoad_AmbiguousPrefix_SkipsAttribution(t *testing.T) {
relayA := "bbccddeeff00112233445566778899aabbccddeeff00112233445566778899aa"
relayB := "bb112233445566778899001122334455667788990011223344556677889900aa"
hop := "bb"
ts := time.Now().UTC().Format(time.RFC3339)
dbPath := createTestDBAmbiguousPrefix(t, relayA, relayB, hop, ts)
db, err := OpenDB(dbPath)
if err != nil {
t.Fatal(err)
}
defer db.conn.Close()
store := NewPacketStore(db, &PacketStoreConfig{RetentionHours: 72})
store.graph.Store(NewNeighborGraph())
if err := store.LoadChunked(0); err != nil {
t.Fatalf("LoadChunked: %v", err)
}
if got := len(store.byNode[relayA]); got != 0 {
t.Errorf("byNode[%s]: got %d transmissions, want 0 — ambiguous-prefix hop "+
"was over-attributed (hot Load path)", relayA, got)
}
if got := len(store.byNode[relayB]); got != 0 {
t.Errorf("byNode[%s]: got %d transmissions, want 0 — ambiguous-prefix hop "+
"was over-attributed (hot Load path)", relayB, got)
}
}
@@ -0,0 +1,180 @@
package main
import (
"database/sql"
"fmt"
"path/filepath"
"testing"
"time"
_ "modernc.org/sqlite"
)
// createTestDBPathJSONNoResolvedPath builds a fixture that mirrors the LIVE
// deployment state after #1287: observations carry a path_json hop list but
// observations.resolved_path is NULL (the ingestor no longer writes it; relay
// data is persisted as aggregate neighbor_edges instead). Exactly one node is
// inserted — a repeater with public key relayPubkey — and callers pass a
// hopPrefix that is a prefix of that key, so the hop has a single candidate.
//
// The transmission's decoded_json is empty ({}), so relayPubkey is NOT an
// endpoint (pubKey/destPubKey/srcPubKey). The ONLY way it can enter
// s.byNode is via the cold-load re-resolution of path_json
// (resolvePathForObsColdLoad).
//
// firstSeen is written to both transmissions.first_seen and
// observations.timestamp. Returns the path of the database file, created
// under t.TempDir().
func createTestDBPathJSONNoResolvedPath(t *testing.T, relayPubkey, hopPrefix, firstSeen string) string {
t.Helper()
dir := t.TempDir()
dbPath := filepath.Join(dir, "test.db")
conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL")
if err != nil {
t.Fatal(err)
}
// Setup-only handle; the tests reopen the file through OpenDB.
defer conn.Close()
// exec runs one setup statement and aborts the test on failure.
exec := func(s string, args ...interface{}) {
if _, err := conn.Exec(s, args...); err != nil {
t.Fatalf("setup exec failed: %v\nSQL: %s", err, s)
}
}
// PREFLIGHT: async=true reason="test fixture: in-memory t.TempDir SQLite, never touches a real DB. Tables are CREATE-from-empty in a one-shot OpenDB call, not a schema migration over existing data."
exec(`CREATE TABLE transmissions (
id INTEGER PRIMARY KEY,
raw_hex TEXT, hash TEXT, first_seen TEXT,
route_type INTEGER, payload_type INTEGER, payload_version INTEGER,
decoded_json TEXT
)`)
// resolved_path column present (matches live schema) but left NULL.
// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
exec(`CREATE TABLE observations (
id INTEGER PRIMARY KEY,
transmission_id INTEGER,
observer_id TEXT, observer_name TEXT,
direction TEXT, snr REAL, rssi REAL, score INTEGER,
path_json TEXT, timestamp TEXT,
raw_hex TEXT,
resolved_path TEXT
)`)
// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`)
// Production nodes schema uses public_key (not pubkey) — getAllNodes /
// buildPrefixMap reads public_key, role, advert_count, first_seen.
// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
exec(`CREATE TABLE nodes (
public_key TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL,
last_seen TEXT, first_seen TEXT, advert_count INTEGER DEFAULT 0
)`)
// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
exec(`CREATE TABLE schema_version (version INTEGER)`)
exec(`INSERT INTO schema_version (version) VALUES (1)`)
// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
exec(`CREATE INDEX idx_tx_first_seen ON transmissions(first_seen)`)
// Repeater node so canAppearInPath() admits it to the prefix map.
exec(`INSERT INTO nodes (public_key, name, role, advert_count) VALUES (?,?,?,?)`,
relayPubkey, "Relay One", "repeater", 10)
// Single transmission (id 1) with empty decoded_json, so no endpoint
// pubkey is available to index the relay by.
exec("INSERT INTO transmissions VALUES (?,?,?,?,0,4,1,?)",
1, "aa", "hashpjf_1", firstSeen, `{}`)
// resolved_path explicitly NULL; path_json carries the relay hop prefix.
exec("INSERT INTO observations (id, transmission_id, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp, raw_hex, resolved_path) VALUES (?,?,?,?,?,?,?,?,?,?,?,NULL)",
1, 1, "obs1", "Obs1", "RX", -10.0, -80.0, 5, fmt.Sprintf(`[%q]`, hopPrefix), firstSeen, "")
return dbPath
}
// TestLoadChunked_ResolvesRelayHopsFromPathJSON_WhenResolvedPathEmpty pins the
// fix for the "relay-node analytics empty after every restart" bug.
//
// On live, observations.resolved_path is 100% NULL (since #1287 the ingestor
// persists relay data as neighbor_edges, not per-observation resolved_path).
// The cold-load paths (Load / scanAndMergeChunk) indexed relay hops ONLY from
// resolved_path, so a relay node's path-hop attribution was never rebuilt on
// startup — it only re-accumulated from live traffic, collapsing the activity
// timeline to "just the hour the server restarted".
//
// The fix: when resolved_path is empty, fall back to resolving the hops from
// the persisted path_json using the in-memory prefix map + neighbor graph
// (exactly what the live ingest path already does), then index the relay hops.
func TestLoadChunked_ResolvesRelayHopsFromPathJSON_WhenResolvedPathEmpty(t *testing.T) {
relayPK := "aabbccddeeff00112233445566778899aabbccddeeff00112233445566778899"
hop := "aa" // 2-hex-char path hop; unique 2-char prefix of relayPK
ts := time.Now().UTC().Format(time.RFC3339)
dbPath := createTestDBPathJSONNoResolvedPath(t, relayPK, hop, ts)
db, err := OpenDB(dbPath)
if err != nil {
t.Fatal(err)
}
defer db.conn.Close()
if !db.hasResolvedPath {
t.Fatalf("setup: fixture should expose resolved_path column; hasResolvedPath=false")
}
store := NewPacketStore(db, &PacketStoreConfig{RetentionHours: 72})
// Empty graph is sufficient: a single prefix candidate resolves without
// neighbor-affinity disambiguation. Mirrors a freshly restarted server
// that has loaded its neighbor_edges snapshot before the packet load.
store.graph.Store(NewNeighborGraph())
if err := store.LoadChunked(0); err != nil {
t.Fatalf("LoadChunked: %v", err)
}
// The relay pubkey only reachable through path_json resolution must be
// indexed in byNode for the transmission.
if got := len(store.byNode[relayPK]); got != 1 {
t.Errorf("byNode[%s]: got %d transmissions, want 1 — cold load did not "+
"resolve relay hops from path_json when resolved_path was NULL "+
"(relay history lost on restart)", relayPK, got)
}
}
// TestLoadChunk_ResolvesRelayHopsFromPathJSON_WhenResolvedPathEmpty covers the
// background-window loader (loadBackgroundChunks → loadChunk), which on live
// loads everything older than hotStartupHours (24h) up to retentionHours
// (168h). Without the path_json fallback here, a relay node's analytics for
// the older 6 days would still vanish on every restart even with the hot
// window fixed.
func TestLoadChunk_ResolvesRelayHopsFromPathJSON_WhenResolvedPathEmpty(t *testing.T) {
relayPK := "ccddeeff00112233445566778899aabbccddeeff00112233445566778899aabb"
hop := "cc"
// Aged 48h so it falls in the background window, not the hot window.
aged := time.Now().UTC().Add(-48 * time.Hour).Format(time.RFC3339)
dbPath := createTestDBPathJSONNoResolvedPath(t, relayPK, hop, aged)
db, err := OpenDB(dbPath)
if err != nil {
t.Fatal(err)
}
defer db.conn.Close()
store := NewPacketStore(db, &PacketStoreConfig{
RetentionHours: 72,
HotStartupHours: 1, // hot load must NOT pick up the 48h-old row
})
store.graph.Store(NewNeighborGraph())
if err := store.LoadChunked(0); err != nil {
t.Fatalf("LoadChunked: %v", err)
}
if got := len(store.byNode[relayPK]); got != 0 {
t.Fatalf("setup: hot load unexpectedly picked up 48h-old row; "+
"byNode[relayPK]=%d (want 0) — test would not exercise loadChunk", got)
}
chunkStart := time.Now().UTC().Add(-72 * time.Hour)
chunkEnd := time.Now().UTC().Add(-1 * time.Hour)
if err := store.loadChunk(chunkStart, chunkEnd); err != nil {
t.Fatalf("loadChunk: %v", err)
}
if got := len(store.byNode[relayPK]); got != 1 {
t.Errorf("byNode[%s]: got %d transmissions, want 1 — background loadChunk "+
"did not resolve relay hops from path_json when resolved_path was NULL "+
"(relay history lost on restart for the older retention window)", relayPK, got)
}
}
+21 -8
View File
@@ -198,6 +198,23 @@ func main() {
// In-memory packet store
store := NewPacketStore(database, cfg.PacketStore, cfg.CacheTTL)
store.config = cfg
// Load the persisted neighbor graph BEFORE the packet load so the
// chunked loader can resolve relay-hop pubkeys from path_json. Since
// #1287 the ingestor persists relay data only as aggregate
// neighbor_edges — observations.resolved_path is never written — so
// without an available graph at load time a relay node's analytics
// history would rebuild only from post-restart live traffic (the
// "timeline empty after every restart" bug). neighbor_edges is small,
// so this adds negligible latency before the HTTP listener binds. The
// fresh-DB branch (no snapshot) still builds in-memory AFTER the load
// below, because BuildFromStore needs the loaded packets.
neighborEdgesPersisted := neighborEdgesTableExists(database.conn)
if neighborEdgesPersisted {
store.graph.Store(loadNeighborEdgesFromDB(database.conn))
log.Printf("[neighbor] loaded persisted neighbor graph")
}
// #1009: chunked Load with early HTTP readiness. LoadChunked runs
// asynchronously and signals FirstChunkReady after the first chunk
// is merged so the HTTP listener can bind without waiting for the
@@ -228,9 +245,9 @@ func main() {
go store.loadBackgroundChunks()
}
// Initialize persisted neighbor graph.
// Per #1287, schema migrations all live in the ingestor (see
// dbschema.Apply). The server merely loads the snapshot here and
// Neighbor graph: the persisted snapshot (if present) was already
// loaded above, before the packet load. Per #1287 schema migrations
// all live in the ingestor; the server only reads the snapshot and
// then refreshes it via the recompNeighborGraph slot every 60s.
dbPath = database.path
database.hasResolvedPath = true // dbschema.AssertReady above already verified observations.resolved_path exists
@@ -238,11 +255,7 @@ func main() {
// WaitGroup for background init steps that gate /api/healthz readiness.
var initWg sync.WaitGroup
// Load or build neighbor graph
if neighborEdgesTableExists(database.conn) {
store.graph.Store(loadNeighborEdgesFromDB(database.conn))
log.Printf("[neighbor] loaded persisted neighbor graph")
} else {
if !neighborEdgesPersisted {
// No persisted snapshot yet (e.g. fresh DB before the ingestor
// has run its first edge-build cycle). Build an in-memory graph
// from the packets we already have so reads aren't empty. We
+57
View File
@@ -131,6 +131,63 @@ func resolvePathForObs(pathJSON, observerID string, tx *StoreTx, pm *prefixMap,
return resolved
}
// resolvePathForObsColdLoad is the cold-load (Load / loadChunk /
// scanAndMergeChunk) variant of resolvePathForObs. It resolves a hop only
// when its prefix identifies exactly one candidate ("unique_prefix").
//
// Live ingest uses the affinity/observation-count tiebreak via
// resolvePathForObs because it has roughly-current state. Cold load runs
// against observations up to retentionHours old, where today's tiebreak
// winner is not necessarily the node that relayed the historical packet, so
// a tiebreak here would silently mis-attribute the hop (PR #1643 R1 munger
// #1, "time-travel attribution gate"). Under-attribute > mis-attribute.
//
// Parameters:
//   - pathJSON: the observation's path_json, parsed with parsePathJSON.
//   - observerID, tx: not read by this function; presumably kept so the
//     signature lines up with resolvePathForObs.
//   - pm: prefix map used for the lookup. A nil pm resolves nothing.
//   - skipped: optional caller-owned counter, incremented once per hop that
//     has more than one candidate. May be nil.
//
// Returns nil when pathJSON yields no hops. Otherwise returns one entry per
// hop, in order: a pointer to the lowercased public key for uniquely
// resolved hops, nil for hops with zero or several candidates.
func resolvePathForObsColdLoad(pathJSON, observerID string, tx *StoreTx, pm *prefixMap, skipped *int) []*string {
	hops := parsePathJSON(pathJSON)
	if len(hops) == 0 {
		return nil
	}
	resolved := make([]*string, len(hops))
	if pm == nil {
		// Without a prefix map no hop can be resolved. Return the all-nil
		// positional slice instead of dereferencing pm and panicking.
		return resolved
	}
	for i, hop := range hops {
		// Deliberately NOT routed through resolveWithContext: even with a
		// nil graph and empty context, its observation-count fallback would
		// still pick a winner for an ambiguous prefix, which is exactly what
		// cold load must not do. Count candidates explicitly instead.
		candidates := pm.m[strings.ToLower(hop)]
		if len(pm.nonRelay) > 0 && len(candidates) > 0 {
			// Drop candidates listed in pm.nonRelay before counting. The
			// zero-capacity reslice forces append to allocate, so the
			// prefix map's own slice is never written to.
			filtered := candidates[:0:0]
			for j := range candidates {
				if _, isListener := pm.nonRelay[strings.ToLower(candidates[j].PublicKey)]; !isListener {
					filtered = append(filtered, candidates[j])
				}
			}
			candidates = filtered
		}
		switch {
		case len(candidates) == 1:
			pk := strings.ToLower(candidates[0].PublicKey)
			resolved[i] = &pk
		case len(candidates) > 1 && skipped != nil:
			// Ambiguous prefix: leave the hop unresolved and record it.
			*skipped++
		}
		// Zero candidates (no match): resolved[i] stays nil and is not
		// counted; extractResolvedPubkeys filters nil entries out.
	}
	return resolved
}
// marshalResolvedPath converts []*string to JSON for in-memory caching.
func marshalResolvedPath(rp []*string) string {
if len(rp) == 0 {
+111 -14
View File
@@ -649,6 +649,18 @@ func (s *PacketStore) Load() error {
s.mu.Lock()
defer s.mu.Unlock()
// Startup-ordering invariant (PR #1643 R1 munger #2). Graph-load-
// before-packet-load is the entire premise of the relay-history
// cold-load fix: without an in-memory neighbor graph, the path_json
// fallback below cannot disambiguate relay hops, so relay-node
// analytics history silently collapses on every restart. main.go
// loads neighbor_edges → s.graph BEFORE invoking Load/LoadChunked;
// fast-fail here so a future refactor that swaps that ordering trips
// the panic at startup instead of regressing the bug silently.
if neighborEdgesTableExists(s.db.conn) && s.graph.Load() == nil {
panic("packet store Load(): neighbor_edges table has rows but s.graph is nil — graph must be loaded before packet load (see main.go #1643 invariant)")
}
t0 := time.Now()
// Count total transmissions for logging.
@@ -728,6 +740,19 @@ func (s *PacketStore) Load() error {
ORDER BY t.first_seen ASC, o.timestamp DESC`
}
// Relay-hop fallback inputs. When resolved_path is empty (always, on
// live, since #1287 the ingestor persists relay data as neighbor_edges
// instead) we re-resolve relay hops from path_json using the prefix
// map. PR #1643 R1 munger #1: cold load resolves ONLY when the prefix
// is unique (no affinity tiebreak against ≤168h-old observations,
// which would silently mis-attribute hops). Fetched BEFORE opening
// the rows cursor below: getCachedNodesAndPM issues its own DB query,
// which would deadlock against the still-open cursor on a single-
// connection SQLite pool. Load holds s.mu.Lock(), so calling
// getCachedNodesAndPM directly is safe.
_, relayPM := s.getCachedNodesAndPM()
var coldLoadAmbiguousHopsSkipped int
rows, err := s.db.conn.Query(loadSQL)
if err != nil {
return err
@@ -825,6 +850,21 @@ func (s *PacketStore) Load() error {
pks := extractResolvedPubkeys(rp)
// Single point of truth — see indexResolvedPathHops doc + #1558.
s.indexResolvedPathHops(tx, pks, hopsSeen)
} else if relayPM != nil && obsPJ != "" && obsPJ != "[]" {
// resolved_path not persisted — reconstruct relay hops from
// path_json so relay-node analytics history survives a restart.
// Index into byNode ONLY: the resolved_path / path-hop indexes
// (indexResolvedPathHops) are cross-checked by handleNodePaths
// against the persisted resolved_path column, which is NULL
// here — populating them would make that SQL confirmation fail
// and wrongly drop the tx from paths-through (#1352). byNode is
// what the node-analytics activity timeline reads.
// PR #1643 R1 munger #1: gate on unique_prefix only — ambiguous
// hops are silently dropped (skipped counter logged at end).
rp := resolvePathForObsColdLoad(obsPJ, obsIDStr, tx, relayPM, &coldLoadAmbiguousHopsSkipped)
for _, pk := range extractResolvedPubkeys(rp) {
s.addToByNode(tx, pk)
}
}
tx.Observations = append(tx.Observations, obs)
@@ -900,6 +940,10 @@ func (s *PacketStore) Load() error {
len(s.packets), s.totalObs, elapsed, s.trackedMemoryMB(), s.estimatedMemoryMB())
}
s.loadMultibyteCapFromDB()
if coldLoadAmbiguousHopsSkipped > 0 {
log.Printf("[store] cold load: skipped %d ambiguous-prefix relay hops (unique_prefix gate, PR #1643 R1)",
coldLoadAmbiguousHopsSkipped)
}
// Kick off background subpath + path-hop index builds (#1008).
// The goroutine acquires s.mu.Lock() and so will block until Load's
// deferred Unlock fires when this function returns. HTTP handlers
@@ -919,6 +963,28 @@ func (s *PacketStore) Load() error {
// byPayloadType is updated here incrementally. byPathHop, spIndex, and
// distHops are NOT updated here — the caller (loadBackgroundChunks) rebuilds
// those once after all chunks are merged.
// accumulateDedup merges pks into the pubkey list stored at byTx[txID],
// keeping only the first occurrence of each pubkey for that transmission.
//
// seenByTx is the companion membership index: seenByTx[txID] records which
// pubkeys are already present in byTx[txID] and is created on demand. Both
// maps are mutated in place; input order of first-seen pubkeys is preserved.
// An empty pks is a no-op, and byTx[txID] is only written when at least one
// previously unseen pubkey is added.
//
// loadChunk uses this to build its per-tx relay-hop pubkey unions in local
// maps before they are merged into the store.
func accumulateDedup(byTx map[int][]string, seenByTx map[int]map[string]bool, txID int, pks []string) {
	if len(pks) == 0 {
		return
	}

	// Lazily create the membership set for this transmission. A nil entry is
	// treated the same as a missing one.
	known := seenByTx[txID]
	if known == nil {
		known = make(map[string]bool, len(pks))
		seenByTx[txID] = known
	}

	merged := byTx[txID]
	grew := false
	for i := range pks {
		if pk := pks[i]; !known[pk] {
			known[pk] = true
			merged = append(merged, pk)
			grew = true
		}
	}

	// Store back only if something was appended, so a call that adds nothing
	// never creates a byTx entry.
	if grew {
		byTx[txID] = merged
	}
}
func (s *PacketStore) loadChunk(from, to time.Time) error {
fromStr := from.UTC().Format(time.RFC3339)
toStr := to.UTC().Format(time.RFC3339)
@@ -955,6 +1021,22 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
ORDER BY t.first_seen ASC, o.timestamp DESC`
}
// Relay-hop fallback inputs. observations.resolved_path is NULL on
// every live deployment (since #1287 the ingestor persists relay data
// as aggregate neighbor_edges, not per-observation resolved_path), so
// for this background-loaded older window we re-resolve relay hops
// from the persisted path_json using the prefix map. PR #1643 R1
// munger #1: cold load resolves ONLY when the prefix is unique
// (affinity-tier resolution against ≤168h-old observations would
// silently mis-attribute hops). Fetched BEFORE opening the rows
// cursor below: getCachedNodesAndPM issues its own DB query, which
// would deadlock against the open cursor on a single-connection SQLite
// pool.
s.mu.RLock()
_, relayPM := s.getCachedNodesAndPM()
s.mu.RUnlock()
var coldLoadAmbiguousHopsSkipped int
rows, err := s.db.conn.Query(chunkSQL, fromStr, toStr)
if err != nil {
return err
@@ -973,6 +1055,11 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
// the rest of loadChunk's "build local, merge under lock" shape.
localResolvedPKsByTx := make(map[int][]string)
localResolvedSeenByTx := make(map[int]map[string]bool)
// Path_json fallback pubkeys (resolved_path NULL): indexed into byNode
// ONLY at merge, kept separate from localResolvedPKsByTx because they must
// NOT enter the resolved_path/path-hop indexes (see the per-row comment).
localByNodePKsByTx := make(map[int][]string)
localByNodeSeenByTx := make(map[int]map[string]bool)
var localTotalObs int
var localTrackedBytes int64
var localMaxTxID int
@@ -1083,20 +1170,20 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
if rpStr != "" {
rp := unmarshalResolvedPath(rpStr)
pks := extractResolvedPubkeys(rp)
if len(pks) > 0 {
seen := localResolvedSeenByTx[txID]
if seen == nil {
seen = make(map[string]bool, len(pks))
localResolvedSeenByTx[txID] = seen
}
for _, pk := range pks {
if seen[pk] {
continue
}
seen[pk] = true
localResolvedPKsByTx[txID] = append(localResolvedPKsByTx[txID], pk)
}
}
accumulateDedup(localResolvedPKsByTx, localResolvedSeenByTx, txID, pks)
} else if relayPM != nil && obsPJ != "" && obsPJ != "[]" {
// resolved_path not persisted — reconstruct relay hops from
// path_json so the older retention window keeps relay-node
// history across a restart (mirrors the hot-window fix in
// scanAndMergeChunk and the live ingest path). These go into a
// SEPARATE accumulator indexed into byNode ONLY at merge — they
// must not enter the resolved_path/path-hop indexes, which
// handleNodePaths cross-checks against the NULL resolved_path
// column (#1352).
// PR #1643 R1 munger #1: unique_prefix-only gate.
rp := resolvePathForObsColdLoad(obsPJ, obsIDStr, tx, relayPM, &coldLoadAmbiguousHopsSkipped)
pks := extractResolvedPubkeys(rp)
accumulateDedup(localByNodePKsByTx, localByNodeSeenByTx, txID, pks)
}
}
}
@@ -1207,6 +1294,12 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
if pks := localResolvedPKsByTx[tx.ID]; len(pks) > 0 {
s.indexResolvedPathHops(tx, pks, hopsSeen)
}
// path_json fallback (resolved_path NULL): byNode ONLY, so relay
// nodes keep their analytics history across a restart without
// polluting the resolved_path/path-hop indexes (#1352).
for _, pk := range localByNodePKsByTx[tx.ID] {
s.addToByNode(tx, pk)
}
}
s.mu.Unlock()
runtime.Gosched()
@@ -1229,6 +1322,10 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
s.mu.Unlock()
log.Printf("[store] background chunk [%s, %s) merged: %d tx, %d obs", fromStr, toStr, len(localPackets), localTotalObs)
if coldLoadAmbiguousHopsSkipped > 0 {
log.Printf("[store] background chunk: skipped %d ambiguous-prefix relay hops (unique_prefix gate, PR #1643 R1)",
coldLoadAmbiguousHopsSkipped)
}
return nil
}