From 3feb97f16ffa37bdf2615567e3c3b688bf96b8d8 Mon Sep 17 00:00:00 2001 From: Kpa-clawbot Date: Thu, 4 Jun 2026 07:35:13 -0700 Subject: [PATCH] fix(ingestor): write resolved_path on new observations (regression from #1289) (#1548) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # fix(ingestor): write resolved_path on new observations (full restore — closes #1547 + #1560) Fixes #1547. Closes #1560. ## Root cause PR #1289 (the "ingestor owns the neighbor graph; server is read-only" refactor, ~2026-05-21) moved the neighbor graph + schema writes to the ingestor, and as a side-effect removed the server-side writer that populated `observations.resolved_path` AND the context-aware `pm.resolveWithContext` that disambiguated 1-byte prefix collisions. Result: every observation inserted after the deploy has `resolved_path = NULL` (3.1M/6.3M NULL on staging; 100% NULL on fresh deploys; symptom on Cascadia: hops fail to resolve because the small-mesh client-side fallback breaks on prefix collisions). ## Full restore This PR resolves both single-byte and multi-byte prefix paths. Single-byte disambiguation uses NeighborGraph adjacency and ADVERT `from_pubkey` anchoring, ported from pre-#1289 `pm.resolveWithContext` logic (last good at cmd/server/store.go @ commit 450236d5) and the #1144 / #1352 fixes. New file `cmd/ingestor/path_resolver.go`: - `NeighborGraph` + `neighborGraphHolder` — in-memory adjacency snapshot, atomic-published. - `loadNeighborGraph(db)` — one-shot SELECT from `neighbor_edges`. - `resolveHopWithContext(hop, anchor, graph, idx, exclude) *string` — single-hop, tier-1 disambiguator. - `resolvePathWithContext(hops, fromPubkey, graph, idx) []*string` — walks the path, anchoring hop 0 on `from_pubkey` (ADVERTs) and each subsequent hop on the previous resolved hop, excluding already-resolved pubkeys. 
- `Store.RefreshNeighborGraph()` — called on warm-up and every 60s tick in the neighbor-edges builder alongside `RefreshPrefixIndex`. Existing file `cmd/ingestor/resolved_path.go` (PR #1547 base) is untouched: `resolvePath` + `marshalResolvedPath` + the all-nil → empty-string clobber-guard contract are preserved verbatim. `cmd/ingestor/db.go` — `InsertTransmission` now calls `resolvePathWithContext` instead of the naive `resolvePath`. ## Algorithm (per hop) 1. Look up candidate pubkeys by prefix-match (existing `prefixIndex`). 2. `len==0 → nil`; `len==1 → that pubkey`. 3. `len>1` → filter by `NeighborGraph` adjacency to the anchor. Anchor is `from_pubkey` for hop 0 on ADVERTs, the previous resolved hop otherwise. Exactly 1 surviving candidate → use it; else nil. 4. Previously resolved hops (and the originator) are excluded from downstream candidate pools — a packet does not revisit a node. Tier-2/3/4 from pre-#1289 (geo proximity, GPS preference, observation-count fallback) are intentionally NOT ported — those were noisy in practice and belong in a separate enhancement, not in this regression restore. ## Out of scope - The ~3.1M existing NULL rows from the regression window. Filed as a follow-up backfill task — too risky to bundle here (touches a 6M-row table). - The dead-flag bug #1546 — separate concern. ## TDD red → green - Red commit `80b0f476` — adds five new context-resolver tests; stub `resolvePathWithContext` falls back to naive `resolvePath`. CI run 26946935615 → **failure** with assertion errors on the three collision tests (`TestResolveHopWithContext_OneByteCollision_AdjacencyResolves`, `TestResolvePathWithContext_TwoHopChainAnchoredOnFromNode`, `TestResolvePathWithContext_AdvertAnchoring`); the two regression tests (multi-byte still works + all-nil contract) stayed green. - Green commit `7b4950ce` — real algorithm + InsertTransmission wiring + RefreshNeighborGraph in the builder tick. 
All five new tests pass; original four `resolved_path` tests stay green. ## Verification - `go test -race ./cmd/ingestor/...` for the 11 affected tests — pass. - `bash ~/.openclaw/skills/pr-preflight/scripts/run-all.sh origin/master` — exit 0 (all gates clean). - PII grep on body + diff: clean. Tested with: existing `TestInsertTransmissionWritesResolvedPath` + `TestInsertTransmissionDoesNotClobberResolvedPathOnAllNil` (PR #1547 base) plus the new collision-resolution suite: - `TestResolveHopWithContext_OneByteCollision_AdjacencyResolves` — 3-of-5 nodes share `0x5c`, chain A↔B↔C↔D↔E; anchored on A, hop `5c` → B. - `TestResolvePathWithContext_TwoHopChainAnchoredOnFromNode` — path `[5c, 5c]` from_node A → `[B, C]`. - `TestResolveHopWithContext_NoAdjacencyContext_ReturnsNil` — 3 ambiguous candidates, no anchor / non-adjacent anchor → nil. - `TestResolvePathWithContext_AdvertAnchoring` — ADVERT, `from_pubkey=A`, path `[5c]` → only-adjacent neighbor B. - `TestResolvePathWithContext_RegressionMultiByteStillWorks` — unique-prefix path with no graph context still resolves. - `TestResolvePathWithContext_AllNilContractPreserved` — unresolvable path → `marshalResolvedPath==""` (clobber-guard from PR #1548 untouched). ## Browser-validated N/A — backend-only change. Frontend already handles populated `resolved_path` via `getResolvedPath` in `cmd/server/db.go` and `public/packets.js`. ## Round-1 fixes addressed - **MUST-FIX #1 (data-loss clobber on all-nil resolution):** when every hop fails to resolve, `marshalResolvedPath` returns `""` instead of `"[null,null,...]"`, so `nilIfEmpty` → SQL NULL and the `COALESCE(excluded.resolved_path, resolved_path)` UPSERT preserves any previously stored good value on re-ingest. Regression test asserts: insert a transmission, observe `resolved_path` populated, wipe the prefix index, re-ingest the same packet, assert the existing `resolved_path` is unchanged. 
--------- Co-authored-by: corescope-bot Co-authored-by: openclaw-bot Co-authored-by: openclaw-bot --- cmd/ingestor/db.go | 38 ++- cmd/ingestor/neighbor_builder.go | 21 ++ cmd/ingestor/path_resolver.go | 225 +++++++++++++++ cmd/ingestor/resolved_path.go | 113 ++++++++ cmd/ingestor/resolved_path_test.go | 446 +++++++++++++++++++++++++++++ 5 files changed, 837 insertions(+), 6 deletions(-) create mode 100644 cmd/ingestor/path_resolver.go create mode 100644 cmd/ingestor/resolved_path.go create mode 100644 cmd/ingestor/resolved_path_test.go diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 8564891c..5dc53b65 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -81,6 +81,16 @@ type Store struct { sampleIntervalSec int backfillWg sync.WaitGroup + + // prefixIdx holds the prefix → pubkey index used by the + // resolved_path writer (#1547). Rebuilt on startup and once per + // neighbor-edges builder tick (60s). + prefixIdx prefixIdxHolder + + // neighborGraph holds the in-memory NeighborGraph snapshot used + // by the context-aware resolver (#1560). Rebuilt on startup and + // once per neighbor-edges builder tick (60s). + neighborGraph neighborGraphHolder } // OpenStore opens or creates a SQLite DB at the given path, applying the @@ -681,13 +691,14 @@ func (s *Store) prepareStatements() error { } s.stmtInsertObservation, err = s.db.Prepare(` - INSERT INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp, raw_hex) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp, raw_hex, resolved_path) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT(transmission_id, observer_idx, COALESCE(path_json, '')) DO UPDATE SET - snr = COALESCE(excluded.snr, snr), - rssi = COALESCE(excluded.rssi, rssi), - score = COALESCE(excluded.score, score), - raw_hex = COALESCE(excluded.raw_hex, raw_hex) + snr = COALESCE(excluded.snr, snr), + rssi = COALESCE(excluded.rssi, rssi), + score = COALESCE(excluded.score, score), + raw_hex = COALESCE(excluded.raw_hex, raw_hex), + resolved_path = COALESCE(excluded.resolved_path, resolved_path) `) if err != nil { return err @@ -842,10 +853,25 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) { epochTs = t.Unix() } + // Resolve hop prefixes to full pubkeys for `observations.resolved_path`. + // Per #1547: this writer was lost in the #1289 refactor and lives in + // the ingestor now. Per #1560: use the context-aware resolver so + // 1-byte prefix collisions are disambiguated via NeighborGraph + // adjacency (anchored on from_pubkey for ADVERTs, previous hop + // otherwise). Empty resolved JSON → NULL via nilIfEmpty. + resolved := resolvePathWithContext( + parsePathArray(data.PathJSON), + strings.ToLower(data.FromPubkey), + s.neighborGraph.load(), + s.prefixIdx.load(), + ) + resolvedJSON := marshalResolvedPath(resolved) + _, err = s.stmtInsertObservation.Exec( txID, observerIdx, data.Direction, data.SNR, data.RSSI, data.Score, data.PathJSON, epochTs, nilIfEmpty(data.RawHex), + nilIfEmpty(resolvedJSON), ) if err != nil { s.Stats.WriteErrors.Add(1) diff --git a/cmd/ingestor/neighbor_builder.go b/cmd/ingestor/neighbor_builder.go index 3d0e6448..0ab888ed 100644 --- a/cmd/ingestor/neighbor_builder.go +++ b/cmd/ingestor/neighbor_builder.go @@ -63,6 +63,16 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() { // returning — first server load needs a fully-populated table. wuStart := time.Now() var wuTotal int + // Prime the prefix index (#1547) so the very first + // InsertTransmission after startup can resolve hop prefixes. 
+ if err := s.RefreshPrefixIndex(); err != nil { + log.Printf("[neighbor-build] initial prefix-index refresh error: %v", err) + } + // Prime the neighbor graph (#1560) so the context-aware resolver + // has adjacency data on the very first InsertTransmission. + if err := s.RefreshNeighborGraph(); err != nil { + log.Printf("[neighbor-build] initial neighbor-graph refresh error: %v", err) + } for { n, err := s.buildAndPersistNeighborEdges() if err != nil { @@ -85,7 +95,18 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() { select { case <-t.C: start := time.Now() + // Refresh the prefix index alongside the edges build + // (#1547) so new nodes become resolvable within a tick. + if err := s.RefreshPrefixIndex(); err != nil { + log.Printf("[neighbor-build] prefix-index refresh error: %v", err) + } n, err := s.buildAndPersistNeighborEdges() + // Refresh the neighbor-graph snapshot after the edges + // build (#1560) so the context-aware resolver picks up + // newly persisted adjacencies on the next ingest. + if grErr := s.RefreshNeighborGraph(); grErr != nil { + log.Printf("[neighbor-build] neighbor-graph refresh error: %v", grErr) + } dur := time.Since(start) if err != nil { log.Printf("[neighbor-build] tick error after %s: %v", dur, err) diff --git a/cmd/ingestor/path_resolver.go b/cmd/ingestor/path_resolver.go new file mode 100644 index 00000000..7ca94479 --- /dev/null +++ b/cmd/ingestor/path_resolver.go @@ -0,0 +1,225 @@ +package main + +import ( + "database/sql" + "strings" + "sync/atomic" +) + +// Context-aware hop resolver — full restore of pre-#1289 hop +// disambiguation semantics, ported into the ingestor (where the +// neighbor graph + node directory now live, per #1283). +// +// Why this exists (issues #1547 / #1560): +// The naive `resolvePath` only resolves hops whose prefix is unique +// in the node table. On a >2K-node mesh the dominant case is 1-byte +// prefix collisions (multiple candidates per prefix). 
Without +// adjacency disambiguation those hops always serialize as `nil` +// and the resolved_path remains effectively empty for the largest +// meshes — the very deployments that need it most. +// +// Algorithm (ported from cmd/server/store.go @ commit 450236d5 +// `pm.resolveWithContext`, intersected with the disambiguation gating +// from PR #1144 / #1352): +// +// For each hop: +// 1. Collect candidate pubkeys by prefix-match (existing prefixIndex). +// 2. len==0 → nil. +// 3. len==1 → that pubkey. +// 4. len>1 → filter by NeighborGraph adjacency to the anchor: +// - hop 0 anchor = fromPubkey (ADVERT originator) if known; +// - hop i (i>0) anchor = previous resolved hop's pubkey; +// if the previous hop did not resolve, the chain breaks +// and subsequent >1-candidate hops fall to nil. +// Surviving candidates after filter: +// - exactly 1 → use it +// - 0 or >1 → nil (cannot disambiguate further) +// +// This is the conservative tier-1 variant. Pre-#1289 also carried +// tier-2 (geo proximity), tier-3 (GPS preference), tier-4 (obs-count +// fallback) — those were noisy in practice and are intentionally NOT +// ported here; this PR is a regression restore, not an enhancement. + +// NeighborGraph is the in-memory adjacency snapshot used by the +// context-aware resolver. Internally lowercased. +type NeighborGraph struct { + adj map[string]map[string]struct{} +} + +// NewNeighborGraph returns an empty graph. +func NewNeighborGraph() *NeighborGraph { + return &NeighborGraph{adj: make(map[string]map[string]struct{})} +} + +// AddEdge adds an undirected adjacency a↔b. Self-loops and empty +// endpoints are ignored. 
+func (g *NeighborGraph) AddEdge(a, b string) {
+	a = strings.ToLower(a)
+	b = strings.ToLower(b)
+	if a == "" || b == "" || a == b {
+		return
+	}
+	if g.adj[a] == nil {
+		g.adj[a] = make(map[string]struct{})
+	}
+	if g.adj[b] == nil {
+		g.adj[b] = make(map[string]struct{})
+	}
+	g.adj[a][b] = struct{}{}
+	g.adj[b][a] = struct{}{}
+}
+
+// IsAdjacent reports whether a and b appear together in any neighbor edge.
+func (g *NeighborGraph) IsAdjacent(a, b string) bool {
+	if g == nil { // a nil graph has no adjacencies
+		return false
+	}
+	a = strings.ToLower(a)
+	b = strings.ToLower(b)
+	if a == "" || b == "" {
+		return false
+	}
+	nbrs, ok := g.adj[a]
+	if !ok {
+		return false
+	}
+	_, present := nbrs[b]
+	return present
+}
+
+// neighborGraphHolder caches the graph for the InsertTransmission hot
+// path. atomic.Value lets the 60s rebuild publish without a read-side
+// lock.
+type neighborGraphHolder struct {
+	v atomic.Value // holds *NeighborGraph
+}
+
+func (h *neighborGraphHolder) load() *NeighborGraph {
+	if v := h.v.Load(); v != nil { // nil until the first store
+		return v.(*NeighborGraph)
+	}
+	return nil
+}
+
+func (h *neighborGraphHolder) store(g *NeighborGraph) {
+	h.v.Store(g)
+}
+
+// loadNeighborGraph reads neighbor_edges into an in-memory adjacency
+// snapshot (empty graph on a fresh DB). On a non-nil error the returned
+// graph may be nil or truncated and must be discarded by the caller.
+func loadNeighborGraph(db *sql.DB) (*NeighborGraph, error) {
+	rows, err := db.Query(`SELECT node_a, node_b FROM neighbor_edges`)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	g := NewNeighborGraph()
+	for rows.Next() {
+		var a, b string
+		if err := rows.Scan(&a, &b); err != nil {
+			continue // skip a row that fails to scan; keep loading the rest
+		}
+		g.AddEdge(a, b)
+	}
+	return g, rows.Err() // surface iteration failures instead of publishing a partial graph
+}
+
+// resolveHopWithContext resolves a single hop using NeighborGraph
+// adjacency to the anchor. Returns nil when the hop cannot be
+// disambiguated.
+//
+// exclude is a set of pubkeys to discard from the candidate pool
+// (typically the prior hops already resolved on the path — a packet
+// does not revisit a node).
+// +// Behavior matrix: +// len(candidates) | anchor | graph | result +// 0 | — | — | nil +// 1 | — | — | candidates[0] +// >1 | "" or no graph|— | nil +// >1 | non-empty | set | unique adjacent candidate +// (or nil if 0 or >1 survive) +func resolveHopWithContext(hop string, anchor string, graph *NeighborGraph, idx prefixIndex, exclude map[string]struct{}) *string { + if idx == nil { + return nil + } + h := strings.ToLower(hop) + candidates := idx[h] + switch len(candidates) { + case 0: + return nil + case 1: + pk := candidates[0] + if _, skip := exclude[pk]; skip { + return nil + } + return &pk + } + if graph == nil || anchor == "" { + return nil + } + var match string + survivors := 0 + for _, cand := range candidates { + if _, skip := exclude[cand]; skip { + continue + } + if graph.IsAdjacent(anchor, cand) { + survivors++ + if survivors > 1 { + return nil + } + match = cand + } + } + if survivors == 1 { + return &match + } + return nil +} + +// resolvePathWithContext walks the hop list, anchoring hop 0 on +// fromPubkey (for ADVERTs) and each subsequent hop on the previous +// resolved hop. Previously-resolved pubkeys (plus the originator) are +// excluded from later candidate pools so the walk doesn't revisit a +// node. Returns a `[]*string` shape compatible with +// marshalResolvedPath (and the all-nil clobber-guard from PR #1548). 
+func resolvePathWithContext(hops []string, fromPubkey string, graph *NeighborGraph, idx prefixIndex) []*string { + if len(hops) == 0 { + return nil + } + out := make([]*string, len(hops)) + if idx == nil { + return out + } + prevAnchor := strings.ToLower(fromPubkey) + seen := make(map[string]struct{}, len(hops)+1) + if prevAnchor != "" { + seen[prevAnchor] = struct{}{} + } + for i, hop := range hops { + r := resolveHopWithContext(hop, prevAnchor, graph, idx, seen) + out[i] = r + if r != nil { + lc := strings.ToLower(*r) + seen[lc] = struct{}{} + prevAnchor = lc + } else { + prevAnchor = "" + } + } + return out +} + +// RefreshNeighborGraph loads the latest neighbor_edges snapshot and +// publishes it atomically. Called on startup and once per neighbor- +// edges builder tick (60s) alongside RefreshPrefixIndex. +func (s *Store) RefreshNeighborGraph() error { + g, err := loadNeighborGraph(s.db) + if err != nil { + return err + } + s.neighborGraph.store(g) + return nil +} diff --git a/cmd/ingestor/resolved_path.go b/cmd/ingestor/resolved_path.go new file mode 100644 index 00000000..5a416836 --- /dev/null +++ b/cmd/ingestor/resolved_path.go @@ -0,0 +1,113 @@ +package main + +import ( + "encoding/json" + "strings" + "sync/atomic" +) + +// Issue #1547 — resolved_path writer (ingestor-owned). +// +// Per the #1283 refactor (server is read-only; ingestor owns the +// neighbor graph + node directory), the writer that populated +// `observations.resolved_path` must live here in the ingestor. PR #1289 +// removed the server-side writer without porting it — this restores it. +// +// Approach: +// - `resolvePath` is a pure function: hop prefixes → full pubkeys +// using the in-memory prefix index built from `nodes.public_key`. +// - Unique-prefix hops resolve to the full pubkey; ambiguous or +// unknown hops resolve to `nil`. 
The output shape is `[]*string` +// (with nulls for unresolved positions) — the JSON serialization +// matches what the server's `unmarshalResolvedPath` / +// frontend `getResolvedPath` already consume. +// - The prefix index is rebuilt on startup and once per neighbor- +// builder tick (60s) so new nodes start resolving within a minute +// without blocking the MQTT ingest path. + +// resolvePath maps each hop prefix to a full pubkey when the index +// has exactly one candidate; returns nil at that position otherwise. +// Returns nil for empty/no hops. +func resolvePath(hops []string, idx prefixIndex) []*string { + if len(hops) == 0 { + return nil + } + out := make([]*string, len(hops)) + if idx == nil { + return out + } + for i, hop := range hops { + h := strings.ToLower(hop) + candidates := idx[h] + if len(candidates) == 1 { + pk := candidates[0] + out[i] = &pk + } + } + return out +} + +// marshalResolvedPath JSON-encodes a resolved path. Returns "" when +// the input is empty OR when every element is nil (writer treats "" as +// SQL NULL). +// +// The all-nil case matters because of the UPSERT in InsertTransmission: +// +// resolved_path = COALESCE(excluded.resolved_path, resolved_path) +// +// If we emitted "[null,null]" here, nilIfEmpty() would let it through +// as a non-NULL string and the COALESCE would OVERWRITE a previously +// stored good resolved_path on re-ingest. Returning "" lets nilIfEmpty +// produce SQL NULL so the COALESCE falls through to the existing value. +// See issue #1547 / PR #1548 reviewer findings. +func marshalResolvedPath(rp []*string) string { + if len(rp) == 0 { + return "" + } + allNil := true + for _, p := range rp { + if p != nil { + allNil = false + break + } + } + if allNil { + return "" + } + b, err := json.Marshal(rp) + if err != nil { + return "" + } + return string(b) +} + +// prefixIdxHolder caches the prefix index for the InsertTransmission +// hot path. 
atomic.Value lets the 60s rebuild happen without a lock on +// the read side. +type prefixIdxHolder struct { + v atomic.Value // holds prefixIndex +} + +func (h *prefixIdxHolder) load() prefixIndex { + if v := h.v.Load(); v != nil { + return v.(prefixIndex) + } + return nil +} + +func (h *prefixIdxHolder) store(idx prefixIndex) { + h.v.Store(idx) +} + +// RefreshPrefixIndex rebuilds the in-memory prefix index from the +// nodes table and publishes it atomically. Called on startup and from +// the neighbor-edges builder tick (60s) so new nodes become resolvable +// without per-insert DB scans. +func (s *Store) RefreshPrefixIndex() error { + idx, err := buildPrefixIndex(s.db) + if err != nil { + return err + } + s.prefixIdx.store(idx) + return nil +} diff --git a/cmd/ingestor/resolved_path_test.go b/cmd/ingestor/resolved_path_test.go new file mode 100644 index 00000000..30886850 --- /dev/null +++ b/cmd/ingestor/resolved_path_test.go @@ -0,0 +1,446 @@ +package main + +import ( + "database/sql" + "encoding/json" + "path/filepath" + "testing" +) + +func unmarshalResolvedPathLocal(s string) []*string { + if s == "" { + return nil + } + var out []*string + if json.Unmarshal([]byte(s), &out) != nil { + return nil + } + return out +} + +// TestResolvePathPureFunction is a unit test for the pure resolvePath +// helper. Asserts: +// - unique-prefix hops resolve to the full pubkey +// - ambiguous-prefix hops resolve to nil +// - unknown-prefix hops resolve to nil +// - return slice length equals input hop count +// +// Regression gate for #1547 (resolved_path stopped being written). 
+func TestResolvePathPureFunction(t *testing.T) { + idx := prefixIndex{ + // "aa" → exactly one pubkey + "aa": {"aaaaaaaaaa"}, + "aaaaaaaaaa": {"aaaaaaaaaa"}, + // "bb" → exactly one pubkey + "bb": {"bbbbbbbbbb"}, + "bbbbbbbbbb": {"bbbbbbbbbb"}, + // "cc" → ambiguous (2 candidates) + "cc": {"cccccccccc", "ccdddddddd"}, + "cccccccccc": {"cccccccccc"}, + } + + got := resolvePath([]string{"aa", "cc", "ff", "bb"}, idx) + if len(got) != 4 { + t.Fatalf("expected len 4, got %d", len(got)) + } + if got[0] == nil || *got[0] != "aaaaaaaaaa" { + t.Errorf("hop[0] aa: want aaaaaaaaaa, got %v", deref(got[0])) + } + if got[1] != nil { + t.Errorf("hop[1] cc: want nil (ambiguous), got %v", deref(got[1])) + } + if got[2] != nil { + t.Errorf("hop[2] ff: want nil (unknown), got %v", deref(got[2])) + } + if got[3] == nil || *got[3] != "bbbbbbbbbb" { + t.Errorf("hop[3] bb: want bbbbbbbbbb, got %v", deref(got[3])) + } +} + +// TestResolvePathEmptyHops asserts empty/no-path produces nil. +func TestResolvePathEmptyHops(t *testing.T) { + if got := resolvePath(nil, prefixIndex{}); got != nil { + t.Errorf("nil hops: want nil, got %v", got) + } + if got := resolvePath([]string{}, prefixIndex{}); got != nil { + t.Errorf("empty hops: want nil, got %v", got) + } +} + +// TestMarshalResolvedPathRoundtrip asserts the JSON shape matches the +// server's marshal/unmarshal contract: `[]*string` with nulls for +// unresolved hops. +func TestMarshalResolvedPathRoundtrip(t *testing.T) { + a := "aaaaaaaaaa" + b := "bbbbbbbbbb" + in := []*string{&a, nil, &b} + s := marshalResolvedPath(in) + want := `["aaaaaaaaaa",null,"bbbbbbbbbb"]` + if s != want { + t.Errorf("marshal: want %s, got %s", want, s) + } +} + +// TestInsertTransmissionWritesResolvedPath is the integration test that +// gates the regression introduced by PR #1289 (issue #1547). 
+// +// Setup: seed two nodes + one observer + invoke InsertTransmission with +// a PacketData whose PathJSON references one of the seeded nodes by +// unique 1-byte (2-hex) prefix. +// +// Assert: the inserted observations row has a non-NULL resolved_path +// whose JSON-decoded length equals the hop count, and the resolved +// element matches the seeded node's full pubkey. +func TestInsertTransmissionWritesResolvedPath(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "ingest.db") + + store, err := OpenStore(dbPath) + if err != nil { + t.Fatalf("OpenStore: %v", err) + } + defer store.Close() + + // Seed nodes with unique 1-byte prefixes. + if _, err := store.db.Exec( + `INSERT INTO nodes (public_key, name) VALUES (?, ?), (?, ?)`, + "aaaaaaaaaa", "from-node", + "bbbbbbbbbb", "first-hop", + ); err != nil { + t.Fatal(err) + } + + // Seed one observer (needed so InsertTransmission resolves observer_idx). + if err := store.UpsertObserver("obs-1", "observer-1", "", nil); err != nil { + t.Fatalf("UpsertObserver: %v", err) + } + + // Force the prefix index to be (re)built from the seeded nodes so + // the InsertTransmission path has something to resolve against. 
+ if err := store.RefreshPrefixIndex(); err != nil { + t.Fatalf("RefreshPrefixIndex: %v", err) + } + + pkt := &PacketData{ + RawHex: "deadbeef", + Timestamp: "2026-06-01T00:00:00Z", + ObserverID: "obs-1", + Hash: "h-1547", + RouteType: 0, + PayloadType: int(payloadADVERT), + PathJSON: `["bb"]`, + DecodedJSON: "{}", + FromPubkey: "aaaaaaaaaa", + } + if _, err := store.InsertTransmission(pkt); err != nil { + t.Fatalf("InsertTransmission: %v", err) + } + + var rp sql.NullString + if err := store.db.QueryRow( + `SELECT resolved_path FROM observations WHERE transmission_id = (SELECT id FROM transmissions WHERE hash = ?)`, + "h-1547", + ).Scan(&rp); err != nil { + t.Fatalf("query: %v", err) + } + if !rp.Valid || rp.String == "" { + t.Fatalf("expected non-nil resolved_path, got NULL/empty (regression: #1547)") + } + got := unmarshalResolvedPathLocal(rp.String) + if len(got) != 1 { + t.Fatalf("resolved_path length: want 1, got %d (value=%s)", len(got), rp.String) + } + if got[0] == nil || *got[0] != "bbbbbbbbbb" { + t.Errorf("resolved_path[0]: want bbbbbbbbbb, got %v (raw=%s)", deref(got[0]), rp.String) + } +} + +func deref(p *string) string { + if p == nil { + return "" + } + return *p +} + +// ─── #1560: context-aware resolution tests ───────────────────────────────── +// +// These exercise the post-fix behavior of resolveHopWithContext + +// resolvePathWithContext. Until the green commit lands they MUST fail +// on assertions (the stub falls back to naive `len==1` and returns nil +// on every >1-candidate prefix), proving the gate is real. + +// build5NodeAmbiguousIndex returns a prefixIndex where 3 of 5 nodes +// share the 1-byte prefix 0x5c. 
Pubkeys are the "fingerprints": +// +// A = "5c000000000000000000000000000000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" +// B = "5c000000000000000000000000000000bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" +// C = "5c000000000000000000000000000000cccccccccccccccccccccccccccccccc" +// D = "dd000000000000000000000000000000dddddddddddddddddddddddddddddddd" +// E = "ee000000000000000000000000000000eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee" +func build5NodeAmbiguousIndex() (idx prefixIndex, A, B, C, D, E string) { + A = "5c000000000000000000000000000000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + B = "5c000000000000000000000000000000bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + C = "5c000000000000000000000000000000cccccccccccccccccccccccccccccccc" + D = "dd000000000000000000000000000000dddddddddddddddddddddddddddddddd" + E = "ee000000000000000000000000000000eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee" + idx = prefixIndex{ + // 1-byte: 5c → A,B,C (collision); dd → D; ee → E + "5c": {A, B, C}, + "dd": {D}, + "ee": {E}, + // full-key entries (so exact-match lookups still resolve) + A: {A}, B: {B}, C: {C}, D: {D}, E: {E}, + } + return +} + +// TestResolveHopWithContext_OneByteCollision_AdjacencyResolves +// asserts the dominant production case (#1560): three nodes share the +// 1-byte prefix 0x5c, but NeighborGraph adjacency narrows to exactly +// one. The naive resolver returns nil; the context-aware resolver +// MUST return the right pubkey. +func TestResolveHopWithContext_OneByteCollision_AdjacencyResolves(t *testing.T) { + idx, A, B, C, D, E := build5NodeAmbiguousIndex() + g := NewNeighborGraph() + // chain: A↔B, B↔C, C↔D, D↔E + g.AddEdge(A, B) + g.AddEdge(B, C) + g.AddEdge(C, D) + g.AddEdge(D, E) + + // Anchored on A, the only 5c neighbor of A is B. 
+	got := resolveHopWithContext("5c", A, g, idx, nil)
+	if got == nil {
+		t.Fatalf("anchor=A, hop=5c: want B (%s), got <nil>", B)
+	}
+	if *got != B {
+		t.Errorf("anchor=A, hop=5c: want %s, got %s", B, *got)
+	}
+
+	// Anchored on B, the only 5c neighbors of B are A and C — but A is
+	// the originator anchor in a path-walk; here we just assert that
+	// 2 surviving candidates → nil (cannot disambiguate further).
+	got = resolveHopWithContext("5c", B, g, idx, nil)
+	if got != nil {
+		t.Errorf("anchor=B, hop=5c: ambiguous (A and C both adjacent); want <nil>, got %s", *got)
+	}
+}
+
+// TestResolvePathWithContext_TwoHopChainAnchoredOnFromNode covers the
+// canonical 1-byte collision case end-to-end: path = [5c, 5c],
+// from_node = A → expect [B, C].
+func TestResolvePathWithContext_TwoHopChainAnchoredOnFromNode(t *testing.T) {
+	idx, A, B, C, _, _ := build5NodeAmbiguousIndex()
+	g := NewNeighborGraph()
+	g.AddEdge(A, B)
+	g.AddEdge(B, C)
+
+	got := resolvePathWithContext([]string{"5c", "5c"}, A, g, idx)
+	if len(got) != 2 {
+		t.Fatalf("len(got)=%d, want 2 (raw=%v)", len(got), got)
+	}
+	if got[0] == nil || *got[0] != B {
+		t.Errorf("hop[0]: want %s, got %v", B, deref(got[0]))
+	}
+	if got[1] == nil || *got[1] != C {
+		t.Errorf("hop[1]: want %s, got %v", C, deref(got[1]))
+	}
+}
+
+// TestResolveHopWithContext_NoAdjacencyContext_ReturnsNil asserts the
+// negative gate: 3 nodes with shared prefix, no edges between them in
+// the graph, hop=[5c] with no usable anchor → nil. Guards against an
+// over-eager resolver that just picks the first candidate.
+func TestResolveHopWithContext_NoAdjacencyContext_ReturnsNil(t *testing.T) {
+	idx, _, _, _, _, _ := build5NodeAmbiguousIndex()
+	g := NewNeighborGraph() // empty: no edges
+	got := resolveHopWithContext("5c", "", g, idx, nil)
+	if got != nil {
+		t.Errorf("no anchor + empty graph: want <nil>, got %s", *got)
+	}
+
+	// With an anchor that's not adjacent to any candidate, also nil.
+	got = resolveHopWithContext("5c", "deadbeefdeadbeef", g, idx, nil)
+	if got != nil {
+		t.Errorf("non-adjacent anchor: want <nil>, got %s", *got)
+	}
+}
+
+// TestResolvePathWithContext_AdvertAnchoring asserts ADVERT-style
+// anchoring: from_pubkey is the originator, hop[0] is one of its
+// 1-byte-prefix neighbors → resolved.
+func TestResolvePathWithContext_AdvertAnchoring(t *testing.T) {
+	idx, A, B, _, _, _ := build5NodeAmbiguousIndex()
+	g := NewNeighborGraph()
+	g.AddEdge(A, B) // only B is adjacent to A among the 5c candidates
+
+	got := resolvePathWithContext([]string{"5c"}, A, g, idx)
+	if len(got) != 1 {
+		t.Fatalf("len(got)=%d, want 1", len(got))
+	}
+	if got[0] == nil || *got[0] != B {
+		t.Errorf("ADVERT anchored on A, hop=5c: want %s, got %v", B, deref(got[0]))
+	}
+}
+
+// TestResolvePathWithContext_RegressionMultiByteStillWorks asserts no
+// regression in the unique-prefix path (the 2/3/4-byte case PR #1548
+// already handled): unique prefixes resolve with a nil graph, no anchor.
+func TestResolvePathWithContext_RegressionMultiByteStillWorks(t *testing.T) {
+	idx, _, _, _, D, E := build5NodeAmbiguousIndex()
+	// dd and ee are unique 1-byte prefixes — naive path still works.
+	got := resolvePathWithContext([]string{"dd", "ee"}, "", nil, idx)
+	if len(got) != 2 {
+		t.Fatalf("len(got)=%d, want 2", len(got))
+	}
+	if got[0] == nil || *got[0] != D {
+		t.Errorf("hop[0] dd: want %s, got %v", D, deref(got[0]))
+	}
+	if got[1] == nil || *got[1] != E {
+		t.Errorf("hop[1] ee: want %s, got %v", E, deref(got[1]))
+	}
+}
+
+// TestResolvePathWithContext_AllNilContractPreserved asserts the
+// all-nil → empty-string clobber-guard contract from PR #1548 still
+// holds: an unresolvable path through the context resolver, when fed
+// to marshalResolvedPath, MUST yield "" (so nilIfEmpty → SQL NULL
+// → COALESCE preserves existing).
+func TestResolvePathWithContext_AllNilContractPreserved(t *testing.T) {
+	// Empty index → every hop nil.
+	got := resolvePathWithContext([]string{"5c", "dd"}, "", nil, prefixIndex{})
+	if len(got) != 2 {
+		t.Fatalf("len(got)=%d, want 2", len(got))
+	}
+	for i, p := range got {
+		if p != nil {
+			t.Errorf("hop[%d]: want <nil>, got %s", i, *p)
+		}
+	}
+	if s := marshalResolvedPath(got); s != "" {
+		t.Errorf("all-nil marshal: want \"\", got %q (clobber-guard regression)", s)
+	}
+}
+
+// TestMarshalResolvedPathAllNilReturnsEmpty is a regression gate for
+// the data-loss clobber bug surfaced in PR #1548 review.
+//
+// When resolvePath fails to resolve ANY hop (every element nil),
+// marshalResolvedPath previously emitted "[null,null,...]" — a
+// non-empty string that bypassed nilIfEmpty and then OVERWROTE the
+// existing resolved_path via the COALESCE(excluded, current) UPSERT
+// on re-ingest. The fix returns "" so nilIfEmpty produces SQL NULL and
+// the COALESCE preserves the existing good value.
+func TestMarshalResolvedPathAllNilReturnsEmpty(t *testing.T) {
+	cases := []struct {
+		name string
+		in   []*string
+	}{
+		{"one-nil", []*string{nil}},
+		{"two-nils", []*string{nil, nil}},
+		{"three-nils", []*string{nil, nil, nil}},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			got := marshalResolvedPath(tc.in)
+			if got != "" {
+				t.Errorf("all-nil input must return \"\" (so nilIfEmpty → SQL NULL → COALESCE preserves existing); got %q", got)
+			}
+		})
+	}
+
+	// Mixed (at least one non-nil) MUST still marshal normally so we
+	// don't lose partial resolutions.
+	a := "aaaaaaaaaa"
+	mixed := marshalResolvedPath([]*string{&a, nil})
+	if mixed != `["aaaaaaaaaa",null]` {
+		t.Errorf("partial resolution must still serialize; got %q", mixed)
+	}
+}
+
+// TestInsertTransmissionDoesNotClobberResolvedPathOnAllNil is the
+// integration-level regression test for the data-loss bug.
+//
+// Setup: insert a transmission whose first ingest resolves cleanly to
+// a known pubkey. Then re-ingest the SAME transmission after the
+// prefix index has been cleared (simulating an empty NeighborGraph /
+// all-nil resolution path) and assert the previously stored
+// resolved_path is PRESERVED (NOT overwritten to "[null]" or NULL).
+//
+// Pre-fix behavior: marshalResolvedPath emitted "[null]", nilIfEmpty
+// kept it non-NULL, and COALESCE(excluded.resolved_path, resolved_path)
+// clobbered the original "bbbbbbbbbb".
+func TestInsertTransmissionDoesNotClobberResolvedPathOnAllNil(t *testing.T) {
+	dir := t.TempDir()
+	dbPath := filepath.Join(dir, "ingest.db")
+
+	store, err := OpenStore(dbPath)
+	if err != nil {
+		t.Fatalf("OpenStore: %v", err)
+	}
+	defer store.Close()
+
+	if _, err := store.db.Exec(
+		`INSERT INTO nodes (public_key, name) VALUES (?, ?), (?, ?)`,
+		"aaaaaaaaaa", "from-node",
+		"bbbbbbbbbb", "first-hop",
+	); err != nil {
+		t.Fatal(err)
+	}
+	if err := store.UpsertObserver("obs-1", "observer-1", "", nil); err != nil {
+		t.Fatalf("UpsertObserver: %v", err)
+	}
+	if err := store.RefreshPrefixIndex(); err != nil {
+		t.Fatalf("RefreshPrefixIndex: %v", err)
+	}
+
+	pkt := &PacketData{
+		RawHex:      "deadbeef",
+		Timestamp:   "2026-06-01T00:00:00Z",
+		ObserverID:  "obs-1",
+		Hash:        "h-clobber",
+		RouteType:   0,
+		PayloadType: int(payloadADVERT),
+		PathJSON:    `["bb"]`,
+		DecodedJSON: "{}",
+		FromPubkey:  "aaaaaaaaaa",
+	}
+	if _, err := store.InsertTransmission(pkt); err != nil {
+		t.Fatalf("first InsertTransmission: %v", err)
+	}
+
+	// Sanity: first write populated resolved_path.
+	var first sql.NullString
+	if err := store.db.QueryRow(
+		`SELECT resolved_path FROM observations WHERE transmission_id = (SELECT id FROM transmissions WHERE hash = ?)`,
+		"h-clobber",
+	).Scan(&first); err != nil {
+		t.Fatalf("first query: %v", err)
+	}
+	if !first.Valid || first.String == "" {
+		t.Fatalf("precondition failed: first ingest left resolved_path NULL/empty; cannot test clobber")
+	}
+	wantPreserved := first.String
+
+	// Now wipe the prefix index so re-ingest produces an all-nil
+	// resolution — exactly the scenario where the bug clobbers data.
+	store.prefixIdx.store(prefixIndex{})
+
+	if _, err := store.InsertTransmission(pkt); err != nil {
+		t.Fatalf("re-ingest InsertTransmission: %v", err)
+	}
+
+	var after sql.NullString
+	if err := store.db.QueryRow(
+		`SELECT resolved_path FROM observations WHERE transmission_id = (SELECT id FROM transmissions WHERE hash = ?)`,
+		"h-clobber",
+	).Scan(&after); err != nil {
+		t.Fatalf("post-reingest query: %v", err)
+	}
+	if !after.Valid {
+		t.Fatalf("data loss: resolved_path was NULL'd by re-ingest (was %q)", wantPreserved)
+	}
+	if after.String != wantPreserved {
+		t.Errorf("data loss: resolved_path was clobbered by all-nil re-ingest\n before: %s\n after: %s", wantPreserved, after.String)
+	}
+}