diff --git a/cmd/server/chunked_load.go b/cmd/server/chunked_load.go index 8acb5fd8..094375ad 100644 --- a/cmd/server/chunked_load.go +++ b/cmd/server/chunked_load.go @@ -124,6 +124,13 @@ func (s *PacketStore) LoadChunked(chunkSize int) error { if chunkSize <= 0 { chunkSize = 10000 } + // Startup-ordering invariant (PR #1643 R1 munger #2). Mirror the + // guard in Load() so the production async path also fast-fails when + // neighbor_edges has rows but the graph is missing. See Load() for + // the full rationale. + if neighborEdgesTableExists(s.db.conn) && s.graph.Load() == nil { + panic("packet store LoadChunked(): neighbor_edges table has rows but s.graph is nil — graph must be loaded before packet load (see main.go #1643 invariant)") + } s.chunkedLoadInit() // Reset state for repeat calls in tests. s.loadComplete.Store(false) @@ -193,6 +200,19 @@ func (s *PacketStore) LoadChunked(chunkSize int) error { // that semantic by starting one below SQLite's minimum rowid (-1). var cursorID int64 = -1 + // Relay-hop fallback inputs, fetched ONCE before the chunk-query loop. + // getCachedNodesAndPM issues its own DB query, so calling it while a + // chunk cursor is open would deadlock on a single-connection SQLite + // pool. resolved_path is never persisted post-#1287, so scanAndMergeChunk + // re-resolves relay hops from path_json using these snapshots. + // PR #1643 R1 munger #1: cold load uses unique_prefix-only gate, so + // the neighbor graph is no longer consulted here (affinity-tier + // resolution against ≤168h-old observations would silently mis-attribute). + s.mu.RLock() + _, relayPM := s.getCachedNodesAndPM() + s.mu.RUnlock() + var coldLoadAmbiguousHopsSkipped int + for { conds := append([]string{}, loadConditions...) 
conds = append(conds, fmt.Sprintf("t2.id > %d", cursorID)) @@ -233,7 +253,7 @@ func (s *PacketStore) LoadChunked(chunkSize int) error { return fmt.Errorf("chunk %d: query: %w", chunkIdx, err) } - chunkTxCount, lastID, err := s.scanAndMergeChunk(rows) + chunkTxCount, lastID, err := s.scanAndMergeChunk(rows, relayPM, &coldLoadAmbiguousHopsSkipped) rows.Close() if err != nil { return fmt.Errorf("chunk %d: scan: %w", chunkIdx, err) @@ -300,6 +320,10 @@ func (s *PacketStore) LoadChunked(chunkSize int) error { elapsed := time.Since(t0) log.Printf("[store] LoadChunked: %d transmissions (%d observations) across %d chunk(s) in %v (chunkSize=%d, DB total=%d)", totalLoaded, s.totalObs, chunkIdx, elapsed, chunkSize, totalInDB) + if coldLoadAmbiguousHopsSkipped > 0 { + log.Printf("[store] LoadChunked: skipped %d ambiguous-prefix relay hops (unique_prefix gate, PR #1643 R1)", + coldLoadAmbiguousHopsSkipped) + } s.loadMultibyteCapFromDB() // Mark complete on the success path only — see the function-level // defer above for why this is NOT in a deferred call. Probes that @@ -312,7 +336,7 @@ func (s *PacketStore) LoadChunked(chunkSize int) error { // scanAndMergeChunk consumes one chunk's rows under s.mu.Lock and // returns the number of distinct transmissions seen + the max // transmission id (cursor for the next chunk). -func (s *PacketStore) scanAndMergeChunk(rows *sql.Rows) (int, int64, error) { +func (s *PacketStore) scanAndMergeChunk(rows *sql.Rows, relayPM *prefixMap, coldLoadAmbiguousHopsSkipped *int) (int, int64, error) { s.mu.Lock() defer s.mu.Unlock() @@ -411,6 +435,20 @@ func (s *PacketStore) scanAndMergeChunk(rows *sql.Rows) (int, int64, error) { rp := unmarshalResolvedPath(rpStr) pks := extractResolvedPubkeys(rp) s.indexResolvedPathHops(tx, pks, hopsSeen) + } else if relayPM != nil && obsPJ != "" && obsPJ != "[]" { + // resolved_path is NULL on live (since #1287 relay data is + // persisted as neighbor_edges, not per-observation). 
Re-resolve + // relay-hop attribution from path_json so relay nodes keep their + // analytics history across a restart instead of rebuilding only + // from post-restart live traffic. relayPM is passed in from + // LoadChunked (fetched before any chunk cursor opened). + // byNode ONLY — see the Load() counterpart for why the + // resolved_path/path-hop indexes must NOT be populated here. + // PR #1643 R1 munger #1: unique_prefix-only gate. + rp := resolvePathForObsColdLoad(obsPJ, obsIDStr, tx, relayPM, coldLoadAmbiguousHopsSkipped) + for _, pk := range extractResolvedPubkeys(rp) { + s.addToByNode(tx, pk) + } } tx.Observations = append(tx.Observations, obs) diff --git a/cmd/server/load_graph_invariant_test.go b/cmd/server/load_graph_invariant_test.go new file mode 100644 index 00000000..e128756b --- /dev/null +++ b/cmd/server/load_graph_invariant_test.go @@ -0,0 +1,90 @@ +package main + +import ( + "database/sql" + "path/filepath" + "testing" + "time" + + _ "modernc.org/sqlite" +) + +// TestLoad_PanicsWhenGraphNotLoadedAndEdgesExist pins the startup-ordering +// invariant (munger R1 #2). Graph-load-before-packet-load is the entire +// premise of PR #1643's fix: without an in-memory neighbor graph, the +// path_json relay-hop fallback cannot resolve hops, so relay-node analytics +// history collapses. main.go currently does the right thing — but nothing +// asserts the ordering, so a future refactor could silently regress. +// +// Load() must panic when neighbor_edges has rows but s.graph.Load() returns +// nil. Fast-fail at startup beats silently-wrong attribution. 
+func TestLoad_PanicsWhenGraphNotLoadedAndEdgesExist(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + + rw, err := sql.Open("sqlite", "file:"+dbPath+"?_journal_mode=WAL") + if err != nil { + t.Fatal(err) + } + defer rw.Close() + + exec := func(s string, args ...interface{}) { + if _, err := rw.Exec(s, args...); err != nil { + t.Fatalf("setup exec failed: %v\nSQL: %s", err, s) + } + } + + // Minimal CoreScope schema. PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB" + exec(`CREATE TABLE transmissions ( + id INTEGER PRIMARY KEY, + raw_hex TEXT, hash TEXT, first_seen TEXT, + route_type INTEGER, payload_type INTEGER, payload_version INTEGER, + decoded_json TEXT + )`) + // PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB" + exec(`CREATE TABLE observations ( + id INTEGER PRIMARY KEY, transmission_id INTEGER, + observer_id TEXT, observer_name TEXT, + direction TEXT, snr REAL, rssi REAL, score INTEGER, + path_json TEXT, timestamp TEXT, raw_hex TEXT, resolved_path TEXT + )`) + // PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB" + exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`) + // PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB" + exec(`CREATE TABLE nodes ( + public_key TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, + last_seen TEXT, first_seen TEXT, advert_count INTEGER DEFAULT 0 + )`) + // PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB" + exec(`CREATE TABLE schema_version (version INTEGER)`) + exec(`INSERT INTO schema_version (version) VALUES (1)`) + // PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB" + exec(`CREATE TABLE neighbor_edges ( + node_a TEXT NOT NULL, + node_b TEXT NOT NULL, + count INTEGER DEFAULT 1, + last_seen TEXT, + PRIMARY KEY (node_a, node_b) + )`) + now := time.Now().UTC().Format(time.RFC3339) + exec(`INSERT INTO neighbor_edges (node_a, node_b, count, last_seen) VALUES (?, 
?, ?, ?)`, + "aaa", "bbb", 5, now) + + d, err := OpenDB(dbPath) + if err != nil { + t.Fatalf("OpenDB: %v", err) + } + defer d.conn.Close() + + // Deliberately DO NOT call store.graph.Store(...). s.graph.Load() returns + // nil → the bug condition the invariant guard must catch. + store := NewPacketStore(d, &PacketStoreConfig{RetentionHours: 72}) + + defer func() { + r := recover() + if r == nil { + t.Fatalf("Load() must panic when neighbor_edges has rows but graph is nil; got no panic") + } + }() + _ = store.Load() +} diff --git a/cmd/server/loadchunk_ambiguous_prefix_test.go b/cmd/server/loadchunk_ambiguous_prefix_test.go new file mode 100644 index 00000000..9a0d4265 --- /dev/null +++ b/cmd/server/loadchunk_ambiguous_prefix_test.go @@ -0,0 +1,172 @@ +package main + +import ( + "database/sql" + "fmt" + "path/filepath" + "testing" + "time" + + _ "modernc.org/sqlite" +) + +// createTestDBAmbiguousPrefix builds a fixture where TWO repeaters share the +// same 2-char hop prefix. An observation's path_json carries ONLY the +// ambiguous prefix (no longer prefix that would disambiguate). With no +// neighbor_edges seeded, the cold-load fallback in scanAndMergeChunk has +// nothing to anchor on — yet the current code resolves the prefix anyway +// (via observation_count_fallback or candidate[0]) and over-attributes the +// hop to ONE of the two repeaters. That is the time-travel bug munger +// flagged: the historical packet's actual relay is unknown, but the loader +// picks today's tier-4 winner against ~7-day-old observations. 
// createTestDBAmbiguousPrefix writes a throwaway SQLite database under
// t.TempDir() and returns its path.
//
// The fixture holds two repeater nodes (relayA, relayB) and one transmission
// with a single observation whose path_json is the one-element list [hop] and
// whose resolved_path is NULL. Callers pick relayA/relayB so that both start
// with hop, making the hop ambiguous. firstSeen is used for both the
// transmission's first_seen and the observation's timestamp, so the caller
// decides which load window the row falls into.
func createTestDBAmbiguousPrefix(t *testing.T, relayA, relayB, hop, firstSeen string) string {
	t.Helper()
	dbPath := filepath.Join(t.TempDir(), "test.db")

	setupDB, openErr := sql.Open("sqlite", dbPath+"?_journal_mode=WAL")
	if openErr != nil {
		t.Fatal(openErr)
	}
	defer setupDB.Close()

	// mustExec runs one setup statement; any failure aborts the test.
	mustExec := func(stmt string, params ...interface{}) {
		_, execErr := setupDB.Exec(stmt, params...)
		if execErr != nil {
			t.Fatalf("setup exec failed: %v\nSQL: %s", execErr, stmt)
		}
	}

	// PREFLIGHT: async=true reason="test fixture: in-memory t.TempDir SQLite, never touches a real DB."
	mustExec(`CREATE TABLE transmissions (
		id INTEGER PRIMARY KEY,
		raw_hex TEXT, hash TEXT, first_seen TEXT,
		route_type INTEGER, payload_type INTEGER, payload_version INTEGER,
		decoded_json TEXT
	)`)
	// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
	mustExec(`CREATE TABLE observations (
		id INTEGER PRIMARY KEY,
		transmission_id INTEGER,
		observer_id TEXT, observer_name TEXT,
		direction TEXT, snr REAL, rssi REAL, score INTEGER,
		path_json TEXT, timestamp TEXT,
		raw_hex TEXT,
		resolved_path TEXT
	)`)
	// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
	mustExec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`)
	// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
	mustExec(`CREATE TABLE nodes (
		public_key TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL,
		last_seen TEXT, first_seen TEXT, advert_count INTEGER DEFAULT 0
	)`)
	// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
	mustExec(`CREATE TABLE schema_version (version INTEGER)`)
	mustExec(`INSERT INTO schema_version (version) VALUES (1)`)
	// PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB"
	mustExec(`CREATE INDEX idx_tx_first_seen ON transmissions(first_seen)`)

	// Both candidates are repeaters; their advert_counts differ (50 vs 10) so
	// a count-based tiebreak, if one were applied, would have a clear winner.
	mustExec(`INSERT INTO nodes (public_key, name, role, advert_count) VALUES (?,?,?,?)`,
		relayA, "Relay A", "repeater", 50)
	mustExec(`INSERT INTO nodes (public_key, name, role, advert_count) VALUES (?,?,?,?)`,
		relayB, "Relay B", "repeater", 10)

	// One transmission with an empty decoded_json, observed once. The only
	// relay information is the bare hop in path_json; resolved_path is NULL.
	pathJSON := fmt.Sprintf(`[%q]`, hop)
	mustExec("INSERT INTO transmissions VALUES (?,?,?,?,0,4,1,?)",
		1, "aa", "hashamb_1", firstSeen, `{}`)
	mustExec("INSERT INTO observations (id, transmission_id, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp, raw_hex, resolved_path) VALUES (?,?,?,?,?,?,?,?,?,?,?,NULL)",
		1, 1, "obs1", "Obs1", "RX", -10.0, -80.0, 5, pathJSON, firstSeen, "")

	return dbPath
}
+func TestLoadChunk_AmbiguousPrefix_SkipsAttribution(t *testing.T) { + relayA := "aabbccddeeff00112233445566778899aabbccddeeff00112233445566778899" + relayB := "aa1122334455667788990011223344556677889900112233445566778899aabb" + hop := "aa" // 2-char prefix shared by both relayA and relayB + + aged := time.Now().UTC().Add(-48 * time.Hour).Format(time.RFC3339) + dbPath := createTestDBAmbiguousPrefix(t, relayA, relayB, hop, aged) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, // hot load skips the 48h-old row → goes to loadChunk + }) + // Empty graph: no neighbor-affinity tiebreak signal. Mirrors a freshly + // restarted server whose only relay info is the prefix map. + store.graph.Store(NewNeighborGraph()) + + if err := store.LoadChunked(0); err != nil { + t.Fatalf("LoadChunked: %v", err) + } + if got := len(store.byNode[relayA]) + len(store.byNode[relayB]); got != 0 { + t.Fatalf("setup: hot load unexpectedly picked up 48h-old row "+ + "(byNode total=%d, want 0) — test would not exercise loadChunk", got) + } + + chunkStart := time.Now().UTC().Add(-72 * time.Hour) + chunkEnd := time.Now().UTC().Add(-1 * time.Hour) + if err := store.loadChunk(chunkStart, chunkEnd); err != nil { + t.Fatalf("loadChunk: %v", err) + } + + // Neither repeater may be over-attributed. The hop is ambiguous → the + // cold-load loader MUST NOT pick one as the byNode owner. 
+ if got := len(store.byNode[relayA]); got != 0 { + t.Errorf("byNode[%s]: got %d transmissions, want 0 — ambiguous-prefix hop "+ + "was over-attributed to relayA (time-travel attribution bug)", relayA, got) + } + if got := len(store.byNode[relayB]); got != 0 { + t.Errorf("byNode[%s]: got %d transmissions, want 0 — ambiguous-prefix hop "+ + "was over-attributed to relayB (time-travel attribution bug)", relayB, got) + } +} + +// TestLoad_AmbiguousPrefix_SkipsAttribution covers the hot-window Load() +// path. Same setup as the loadChunk test but the row falls inside the hot +// window so it is loaded by Load() / scanAndMergeChunk. +func TestLoad_AmbiguousPrefix_SkipsAttribution(t *testing.T) { + relayA := "bbccddeeff00112233445566778899aabbccddeeff00112233445566778899aa" + relayB := "bb112233445566778899001122334455667788990011223344556677889900aa" + hop := "bb" + + ts := time.Now().UTC().Format(time.RFC3339) + dbPath := createTestDBAmbiguousPrefix(t, relayA, relayB, hop, ts) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{RetentionHours: 72}) + store.graph.Store(NewNeighborGraph()) + + if err := store.LoadChunked(0); err != nil { + t.Fatalf("LoadChunked: %v", err) + } + + if got := len(store.byNode[relayA]); got != 0 { + t.Errorf("byNode[%s]: got %d transmissions, want 0 — ambiguous-prefix hop "+ + "was over-attributed (hot Load path)", relayA, got) + } + if got := len(store.byNode[relayB]); got != 0 { + t.Errorf("byNode[%s]: got %d transmissions, want 0 — ambiguous-prefix hop "+ + "was over-attributed (hot Load path)", relayB, got) + } +} diff --git a/cmd/server/loadchunk_pathjson_fallback_test.go b/cmd/server/loadchunk_pathjson_fallback_test.go new file mode 100644 index 00000000..518fe161 --- /dev/null +++ b/cmd/server/loadchunk_pathjson_fallback_test.go @@ -0,0 +1,180 @@ +package main + +import ( + "database/sql" + "fmt" + "path/filepath" + "testing" + "time" + + _ 
"modernc.org/sqlite" +) + +// createTestDBPathJSONNoResolvedPath builds a fixture that mirrors the LIVE +// deployment state after #1287: observations carry a path_json hop list but +// observations.resolved_path is NULL (the ingestor no longer writes it; relay +// data is persisted as aggregate neighbor_edges instead). A single repeater +// node whose public_key starts with hopPrefix lets the in-memory prefix map +// resolve that hop unambiguously to relayPubkey. +// +// The transmission's decoded_json is empty ({}), so relayPubkey is NOT an +// endpoint (pubKey/destPubKey/srcPubKey). The ONLY way it can enter +// s.byNode is via path_json → resolvePathForObs relay-hop resolution. +func createTestDBPathJSONNoResolvedPath(t *testing.T, relayPubkey, hopPrefix, firstSeen string) string { + t.Helper() + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + + conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + exec := func(s string, args ...interface{}) { + if _, err := conn.Exec(s, args...); err != nil { + t.Fatalf("setup exec failed: %v\nSQL: %s", err, s) + } + } + + // PREFLIGHT: async=true reason="test fixture: in-memory t.TempDir SQLite, never touches a real DB. Tables are CREATE-from-empty in a one-shot OpenDB call, not a schema migration over existing data." + exec(`CREATE TABLE transmissions ( + id INTEGER PRIMARY KEY, + raw_hex TEXT, hash TEXT, first_seen TEXT, + route_type INTEGER, payload_type INTEGER, payload_version INTEGER, + decoded_json TEXT + )`) + // resolved_path column present (matches live schema) but left NULL. 
+ // PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB" + exec(`CREATE TABLE observations ( + id INTEGER PRIMARY KEY, + transmission_id INTEGER, + observer_id TEXT, observer_name TEXT, + direction TEXT, snr REAL, rssi REAL, score INTEGER, + path_json TEXT, timestamp TEXT, + raw_hex TEXT, + resolved_path TEXT + )`) + // PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB" + exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`) + // Production nodes schema uses public_key (not pubkey) — getAllNodes / + // buildPrefixMap reads public_key, role, advert_count, first_seen. + // PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB" + exec(`CREATE TABLE nodes ( + public_key TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, + last_seen TEXT, first_seen TEXT, advert_count INTEGER DEFAULT 0 + )`) + // PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB" + exec(`CREATE TABLE schema_version (version INTEGER)`) + exec(`INSERT INTO schema_version (version) VALUES (1)`) + // PREFLIGHT: async=true reason="test fixture, in-memory tmpdir DB" + exec(`CREATE INDEX idx_tx_first_seen ON transmissions(first_seen)`) + + // Repeater node so canAppearInPath() admits it to the prefix map. + exec(`INSERT INTO nodes (public_key, name, role, advert_count) VALUES (?,?,?,?)`, + relayPubkey, "Relay One", "repeater", 10) + + exec("INSERT INTO transmissions VALUES (?,?,?,?,0,4,1,?)", + 1, "aa", "hashpjf_1", firstSeen, `{}`) + // resolved_path explicitly NULL; path_json carries the relay hop prefix. 
+ exec("INSERT INTO observations (id, transmission_id, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp, raw_hex, resolved_path) VALUES (?,?,?,?,?,?,?,?,?,?,?,NULL)", + 1, 1, "obs1", "Obs1", "RX", -10.0, -80.0, 5, fmt.Sprintf(`[%q]`, hopPrefix), firstSeen, "") + + return dbPath +} + +// TestLoadChunked_ResolvesRelayHopsFromPathJSON_WhenResolvedPathEmpty pins the +// fix for the "relay-node analytics empty after every restart" bug. +// +// On live, observations.resolved_path is 100% NULL (since #1287 the ingestor +// persists relay data as neighbor_edges, not per-observation resolved_path). +// The cold-load paths (Load / scanAndMergeChunk) indexed relay hops ONLY from +// resolved_path, so a relay node's path-hop attribution was never rebuilt on +// startup — it only re-accumulated from live traffic, collapsing the activity +// timeline to "just the hour the server restarted". +// +// The fix: when resolved_path is empty, fall back to resolving the hops from +// the persisted path_json using the in-memory prefix map + neighbor graph +// (exactly what the live ingest path already does), then index the relay hops. +func TestLoadChunked_ResolvesRelayHopsFromPathJSON_WhenResolvedPathEmpty(t *testing.T) { + relayPK := "aabbccddeeff00112233445566778899aabbccddeeff00112233445566778899" + hop := "aa" // 2-hex-char path hop; unique 2-char prefix of relayPK + + ts := time.Now().UTC().Format(time.RFC3339) + dbPath := createTestDBPathJSONNoResolvedPath(t, relayPK, hop, ts) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + if !db.hasResolvedPath { + t.Fatalf("setup: fixture should expose resolved_path column; hasResolvedPath=false") + } + + store := NewPacketStore(db, &PacketStoreConfig{RetentionHours: 72}) + // Empty graph is sufficient: a single prefix candidate resolves without + // neighbor-affinity disambiguation. 
Mirrors a freshly restarted server + // that has loaded its neighbor_edges snapshot before the packet load. + store.graph.Store(NewNeighborGraph()) + + if err := store.LoadChunked(0); err != nil { + t.Fatalf("LoadChunked: %v", err) + } + + // The relay pubkey only reachable through path_json resolution must be + // indexed in byNode for the transmission. + if got := len(store.byNode[relayPK]); got != 1 { + t.Errorf("byNode[%s]: got %d transmissions, want 1 — cold load did not "+ + "resolve relay hops from path_json when resolved_path was NULL "+ + "(relay history lost on restart)", relayPK, got) + } +} + +// TestLoadChunk_ResolvesRelayHopsFromPathJSON_WhenResolvedPathEmpty covers the +// background-window loader (loadBackgroundChunks → loadChunk), which on live +// loads everything older than hotStartupHours (24h) up to retentionHours +// (168h). Without the path_json fallback here, a relay node's analytics for +// the older 6 days would still vanish on every restart even with the hot +// window fixed. +func TestLoadChunk_ResolvesRelayHopsFromPathJSON_WhenResolvedPathEmpty(t *testing.T) { + relayPK := "ccddeeff00112233445566778899aabbccddeeff00112233445566778899aabb" + hop := "cc" + + // Aged 48h so it falls in the background window, not the hot window. 
+ aged := time.Now().UTC().Add(-48 * time.Hour).Format(time.RFC3339) + dbPath := createTestDBPathJSONNoResolvedPath(t, relayPK, hop, aged) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, // hot load must NOT pick up the 48h-old row + }) + store.graph.Store(NewNeighborGraph()) + + if err := store.LoadChunked(0); err != nil { + t.Fatalf("LoadChunked: %v", err) + } + if got := len(store.byNode[relayPK]); got != 0 { + t.Fatalf("setup: hot load unexpectedly picked up 48h-old row; "+ + "byNode[relayPK]=%d (want 0) — test would not exercise loadChunk", got) + } + + chunkStart := time.Now().UTC().Add(-72 * time.Hour) + chunkEnd := time.Now().UTC().Add(-1 * time.Hour) + if err := store.loadChunk(chunkStart, chunkEnd); err != nil { + t.Fatalf("loadChunk: %v", err) + } + + if got := len(store.byNode[relayPK]); got != 1 { + t.Errorf("byNode[%s]: got %d transmissions, want 1 — background loadChunk "+ + "did not resolve relay hops from path_json when resolved_path was NULL "+ + "(relay history lost on restart for the older retention window)", relayPK, got) + } +} diff --git a/cmd/server/main.go b/cmd/server/main.go index 7491157c..c541dca7 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -198,6 +198,23 @@ func main() { // In-memory packet store store := NewPacketStore(database, cfg.PacketStore, cfg.CacheTTL) store.config = cfg + + // Load the persisted neighbor graph BEFORE the packet load so the + // chunked loader can resolve relay-hop pubkeys from path_json. Since + // #1287 the ingestor persists relay data only as aggregate + // neighbor_edges — observations.resolved_path is never written — so + // without an available graph at load time a relay node's analytics + // history would rebuild only from post-restart live traffic (the + // "timeline empty after every restart" bug). 
neighbor_edges is small, + // so this adds negligible latency before the HTTP listener binds. The + // fresh-DB branch (no snapshot) still builds in-memory AFTER the load + // below, because BuildFromStore needs the loaded packets. + neighborEdgesPersisted := neighborEdgesTableExists(database.conn) + if neighborEdgesPersisted { + store.graph.Store(loadNeighborEdgesFromDB(database.conn)) + log.Printf("[neighbor] loaded persisted neighbor graph") + } + // #1009: chunked Load with early HTTP readiness. LoadChunked runs // asynchronously and signals FirstChunkReady after the first chunk // is merged so the HTTP listener can bind without waiting for the @@ -228,9 +245,9 @@ func main() { go store.loadBackgroundChunks() } - // Initialize persisted neighbor graph. - // Per #1287, schema migrations all live in the ingestor (see - // dbschema.Apply). The server merely loads the snapshot here and + // Neighbor graph: the persisted snapshot (if present) was already + // loaded above, before the packet load. Per #1287 schema migrations + // all live in the ingestor; the server only reads the snapshot and // then refreshes it via the recompNeighborGraph slot every 60s. dbPath = database.path database.hasResolvedPath = true // dbschema.AssertReady above already verified observations.resolved_path exists @@ -238,11 +255,7 @@ func main() { // WaitGroup for background init steps that gate /api/healthz readiness. var initWg sync.WaitGroup - // Load or build neighbor graph - if neighborEdgesTableExists(database.conn) { - store.graph.Store(loadNeighborEdgesFromDB(database.conn)) - log.Printf("[neighbor] loaded persisted neighbor graph") - } else { + if !neighborEdgesPersisted { // No persisted snapshot yet (e.g. fresh DB before the ingestor // has run its first edge-build cycle). Build an in-memory graph // from the packets we already have so reads aren't empty. 
We diff --git a/cmd/server/neighbor_persist.go b/cmd/server/neighbor_persist.go index 54a40cc8..98d24818 100644 --- a/cmd/server/neighbor_persist.go +++ b/cmd/server/neighbor_persist.go @@ -131,6 +131,63 @@ func resolvePathForObs(pathJSON, observerID string, tx *StoreTx, pm *prefixMap, return resolved } +// resolvePathForObsColdLoad is the cold-load (Load / loadChunk / scanAndMergeChunk) +// variant of resolvePathForObs that gates hop resolution on `unique_prefix` +// only. Live ingest uses the affinity/observation-count tiebreak via +// resolvePathForObs because it has roughly-current state. Cold load runs +// against observations up to retentionHours (168h) old, where today's +// affinity winner ≠ historical affinity winner for that prefix — silently +// mis-attributing the relay (PR #1643 R1 munger #1, "time-travel attribution +// gate"). +// +// Behavior: hops whose prefix maps to exactly one repeater resolve as +// usual; hops whose prefix maps to multiple candidates return nil and +// increment skipped (caller-owned counter for observability — a single +// summary log line at the end of Load surfaces the total). +// +// Under-attribute > mis-attribute (reviewer consensus on PR #1643). +func resolvePathForObsColdLoad(pathJSON, observerID string, tx *StoreTx, pm *prefixMap, skipped *int) []*string { + hops := parsePathJSON(pathJSON) + if len(hops) == 0 { + return nil + } + resolved := make([]*string, len(hops)) + for i, hop := range hops { + // unique_prefix iff the prefix maps to exactly one candidate + // after the observer-known nonRelay filter. Mirrors the + // `len(candidates) == 1 → "unique_prefix"` arm of + // resolveWithContext (store.go ~6380). Calling resolveWithContext + // with a nil graph and empty context skips the affinity/ + // observation-count tiers entirely — but tier-4 + // observation_count_fallback would still pick a winner for + // ambiguous prefixes, which is exactly what we must NOT do. 
+ // Hence the explicit candidate-count check here. + h := strings.ToLower(hop) + candidates := pm.m[h] + if len(pm.nonRelay) > 0 && len(candidates) > 0 { + filtered := candidates[:0:0] + for j := range candidates { + if _, isListener := pm.nonRelay[strings.ToLower(candidates[j].PublicKey)]; isListener { + continue + } + filtered = append(filtered, candidates[j]) + } + candidates = filtered + } + if len(candidates) == 1 { + pk := strings.ToLower(candidates[0].PublicKey) + resolved[i] = &pk + continue + } + // Ambiguous (len > 1) or no_match (len == 0). Under-attribute. + if len(candidates) > 1 && skipped != nil { + *skipped++ + } + // resolved[i] stays nil; extractResolvedPubkeys filters it out. + } + return resolved +} + // marshalResolvedPath converts []*string to JSON for in-memory caching. func marshalResolvedPath(rp []*string) string { if len(rp) == 0 { diff --git a/cmd/server/store.go b/cmd/server/store.go index 7b9e4677..5fab20f9 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -649,6 +649,18 @@ func (s *PacketStore) Load() error { s.mu.Lock() defer s.mu.Unlock() + // Startup-ordering invariant (PR #1643 R1 munger #2). Graph-load- + // before-packet-load is the entire premise of the relay-history + // cold-load fix: without an in-memory neighbor graph, the path_json + // fallback below cannot disambiguate relay hops, so relay-node + // analytics history silently collapses on every restart. main.go + // loads neighbor_edges → s.graph BEFORE invoking Load/LoadChunked; + // fast-fail here so a future refactor that swaps that ordering trips + // the panic at startup instead of regressing the bug silently. + if neighborEdgesTableExists(s.db.conn) && s.graph.Load() == nil { + panic("packet store Load(): neighbor_edges table has rows but s.graph is nil — graph must be loaded before packet load (see main.go #1643 invariant)") + } + t0 := time.Now() // Count total transmissions for logging. 
@@ -728,6 +740,19 @@ func (s *PacketStore) Load() error { ORDER BY t.first_seen ASC, o.timestamp DESC` } + // Relay-hop fallback inputs. When resolved_path is empty (always, on + // live, since #1287 the ingestor persists relay data as neighbor_edges + // instead) we re-resolve relay hops from path_json using the prefix + // map. PR #1643 R1 munger #1: cold load resolves ONLY when the prefix + // is unique (no affinity tiebreak against ≤168h-old observations, + // which would silently mis-attribute hops). Fetched BEFORE opening + // the rows cursor below: getCachedNodesAndPM issues its own DB query, + // which would deadlock against the still-open cursor on a single- + // connection SQLite pool. Load holds s.mu.Lock(), so calling + // getCachedNodesAndPM directly is safe. + _, relayPM := s.getCachedNodesAndPM() + var coldLoadAmbiguousHopsSkipped int + rows, err := s.db.conn.Query(loadSQL) if err != nil { return err @@ -825,6 +850,21 @@ func (s *PacketStore) Load() error { pks := extractResolvedPubkeys(rp) // Single point of truth — see indexResolvedPathHops doc + #1558. s.indexResolvedPathHops(tx, pks, hopsSeen) + } else if relayPM != nil && obsPJ != "" && obsPJ != "[]" { + // resolved_path not persisted — reconstruct relay hops from + // path_json so relay-node analytics history survives a restart. + // Index into byNode ONLY: the resolved_path / path-hop indexes + // (indexResolvedPathHops) are cross-checked by handleNodePaths + // against the persisted resolved_path column, which is NULL + // here — populating them would make that SQL confirmation fail + // and wrongly drop the tx from paths-through (#1352). byNode is + // what the node-analytics activity timeline reads. + // PR #1643 R1 munger #1: gate on unique_prefix only — ambiguous + // hops are silently dropped (skipped counter logged at end). 
// accumulateDedup appends the pubkeys in pks to byTx[txID], skipping any pubkey
// already recorded for that transmission. seenByTx is the parallel per-tx
// membership set; the inner set for txID is created on first use. The order of
// first occurrences is preserved. Used by loadChunk to build the per-tx
// relay-hop pubkey unions outside the merge critical section.
//
// An empty pks is a no-op that creates no entry in either map, and a call
// whose pubkeys are all duplicates leaves byTx untouched.
func accumulateDedup(byTx map[int][]string, seenByTx map[int]map[string]bool, txID int, pks []string) {
	if len(pks) == 0 {
		return
	}
	seen := seenByTx[txID]
	if seen == nil {
		seen = make(map[string]bool, len(pks))
		seenByTx[txID] = seen
	}
	// Read the destination slice once and write it back at most once, instead
	// of hashing txID for a lookup and a store on every appended pubkey.
	merged := byTx[txID]
	before := len(merged)
	for _, pk := range pks {
		if seen[pk] {
			continue
		}
		seen[pk] = true
		merged = append(merged, pk)
	}
	if len(merged) != before {
		byTx[txID] = merged
	}
}
observations.resolved_path is NULL on + // every live deployment (since #1287 the ingestor persists relay data + // as aggregate neighbor_edges, not per-observation resolved_path), so + // for this background-loaded older window we re-resolve relay hops + // from the persisted path_json using the prefix map. PR #1643 R1 + // munger #1: cold load resolves ONLY when the prefix is unique + // (affinity-tier resolution against ≤168h-old observations would + // silently mis-attribute hops). Fetched BEFORE opening the rows + // cursor below: getCachedNodesAndPM issues its own DB query, which + // would deadlock against the open cursor on a single-connection SQLite + // pool. + s.mu.RLock() + _, relayPM := s.getCachedNodesAndPM() + s.mu.RUnlock() + var coldLoadAmbiguousHopsSkipped int + rows, err := s.db.conn.Query(chunkSQL, fromStr, toStr) if err != nil { return err @@ -973,6 +1055,11 @@ func (s *PacketStore) loadChunk(from, to time.Time) error { // the rest of loadChunk's "build local, merge under lock" shape. localResolvedPKsByTx := make(map[int][]string) localResolvedSeenByTx := make(map[int]map[string]bool) + // Path_json fallback pubkeys (resolved_path NULL): indexed into byNode + // ONLY at merge, kept separate from localResolvedPKsByTx because they must + // NOT enter the resolved_path/path-hop indexes (see the per-row comment). 
+ localByNodePKsByTx := make(map[int][]string) + localByNodeSeenByTx := make(map[int]map[string]bool) var localTotalObs int var localTrackedBytes int64 var localMaxTxID int @@ -1083,20 +1170,20 @@ func (s *PacketStore) loadChunk(from, to time.Time) error { if rpStr != "" { rp := unmarshalResolvedPath(rpStr) pks := extractResolvedPubkeys(rp) - if len(pks) > 0 { - seen := localResolvedSeenByTx[txID] - if seen == nil { - seen = make(map[string]bool, len(pks)) - localResolvedSeenByTx[txID] = seen - } - for _, pk := range pks { - if seen[pk] { - continue - } - seen[pk] = true - localResolvedPKsByTx[txID] = append(localResolvedPKsByTx[txID], pk) - } - } + accumulateDedup(localResolvedPKsByTx, localResolvedSeenByTx, txID, pks) + } else if relayPM != nil && obsPJ != "" && obsPJ != "[]" { + // resolved_path not persisted — reconstruct relay hops from + // path_json so the older retention window keeps relay-node + // history across a restart (mirrors the hot-window fix in + // scanAndMergeChunk and the live ingest path). These go into a + // SEPARATE accumulator indexed into byNode ONLY at merge — they + // must not enter the resolved_path/path-hop indexes, which + // handleNodePaths cross-checks against the NULL resolved_path + // column (#1352). + // PR #1643 R1 munger #1: unique_prefix-only gate. + rp := resolvePathForObsColdLoad(obsPJ, obsIDStr, tx, relayPM, &coldLoadAmbiguousHopsSkipped) + pks := extractResolvedPubkeys(rp) + accumulateDedup(localByNodePKsByTx, localByNodeSeenByTx, txID, pks) } } } @@ -1207,6 +1294,12 @@ func (s *PacketStore) loadChunk(from, to time.Time) error { if pks := localResolvedPKsByTx[tx.ID]; len(pks) > 0 { s.indexResolvedPathHops(tx, pks, hopsSeen) } + // path_json fallback (resolved_path NULL): byNode ONLY, so relay + // nodes keep their analytics history across a restart without + // polluting the resolved_path/path-hop indexes (#1352). 
+ for _, pk := range localByNodePKsByTx[tx.ID] { + s.addToByNode(tx, pk) + } } s.mu.Unlock() runtime.Gosched() @@ -1229,6 +1322,10 @@ func (s *PacketStore) loadChunk(from, to time.Time) error { s.mu.Unlock() log.Printf("[store] background chunk [%s, %s) merged: %d tx, %d obs", fromStr, toStr, len(localPackets), localTotalObs) + if coldLoadAmbiguousHopsSkipped > 0 { + log.Printf("[store] background chunk: skipped %d ambiguous-prefix relay hops (unique_prefix gate, PR #1643 R1)", + coldLoadAmbiguousHopsSkipped) + } return nil }