fix(ingestor): write resolved_path on new observations (regression from #1289) (#1548)

# fix(ingestor): write resolved_path on new observations (full restore; closes #1547 + #1560)

Fixes #1547. Closes #1560.

## Root cause
PR #1289 (the "ingestor owns the neighbor graph; server is read-only"
refactor, ~2026-05-21) moved the neighbor graph + schema writes to the
ingestor, and as a side-effect removed the server-side writer that
populated `observations.resolved_path` AND the context-aware
`pm.resolveWithContext` that disambiguated 1-byte prefix collisions.
Result: every observation inserted after the deploy has `resolved_path =
NULL` (3.1M of 6.3M rows NULL on staging; 100% NULL on fresh deploys).
Symptom on Cascadia: hops fail to resolve because the small-mesh
client-side fallback breaks on prefix collisions.

## Full restore
This PR resolves both single-byte and multi-byte prefix paths.
Single-byte disambiguation uses NeighborGraph adjacency and ADVERT
`from_pubkey` anchoring, ported from pre-#1289 `pm.resolveWithContext`
logic (last good at cmd/server/store.go @ commit 450236d5) and the #1144
/ #1352 fixes.

New file `cmd/ingestor/path_resolver.go`:
- `NeighborGraph` + `neighborGraphHolder` — in-memory adjacency
snapshot, atomic-published.
- `loadNeighborGraph(db)` — one-shot SELECT from `neighbor_edges`.
- `resolveHopWithContext(hop, anchor, graph, idx, exclude) *string` —
single-hop, tier-1 disambiguator.
- `resolvePathWithContext(hops, fromPubkey, graph, idx) []*string` —
walks the path, anchoring hop 0 on `from_pubkey` (ADVERTs) and each
subsequent hop on the previous resolved hop, excluding already-resolved
pubkeys.
- `Store.RefreshNeighborGraph()` — called on warm-up and every 60s tick
in the neighbor-edges builder alongside `RefreshPrefixIndex`.

Existing file `cmd/ingestor/resolved_path.go` (issue #1547 base) is
untouched: `resolvePath` + `marshalResolvedPath` + the all-nil →
empty-string clobber-guard contract are preserved verbatim.

`cmd/ingestor/db.go` — `InsertTransmission` now calls
`resolvePathWithContext` instead of the naive `resolvePath`.

## Algorithm (per hop)
1. Look up candidate pubkeys by prefix-match (existing `prefixIndex`).
2. `len==0 → nil`; `len==1 → that pubkey`.
3. `len>1` → filter by `NeighborGraph` adjacency to the anchor. Anchor
is `from_pubkey` for hop 0 on ADVERTs, the previous resolved hop
otherwise. Exactly 1 surviving candidate → use it; else nil.
4. Previously resolved hops (and the originator) are excluded from
downstream candidate pools — a packet does not revisit a node.

Tier-2/3/4 from pre-#1289 (geo proximity, GPS preference,
observation-count fallback) are intentionally NOT ported — those were
noisy in practice and belong in a separate enhancement, not in this
regression restore.
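
As a worked illustration of the per-hop rule in the Algorithm section, here is a simplified, self-contained sketch. It is not the code in `path_resolver.go`: `pickHop`, `walkPath`, `adjacency`, and `demoMesh` are hypothetical names, the keys are short labels rather than real pubkeys, and unresolved hops are represented as `""` instead of nil pointers.

```go
package main

import "fmt"

// adjacency is a hypothetical stand-in for NeighborGraph: node -> neighbor set.
type adjacency map[string]map[string]bool

// pickHop applies the tier-1 rule to one hop prefix; "" means unresolved.
func pickHop(prefix, anchor string, idx map[string][]string, adj adjacency, seen map[string]bool) string {
	cands := idx[prefix]
	switch len(cands) {
	case 0:
		return ""
	case 1:
		if seen[cands[0]] {
			return "" // a packet does not revisit a node
		}
		return cands[0]
	}
	if anchor == "" {
		return "" // ambiguous prefix and nothing to anchor on
	}
	match := ""
	for _, c := range cands {
		if seen[c] || !adj[anchor][c] {
			continue
		}
		if match != "" {
			return "" // more than one adjacent survivor
		}
		match = c
	}
	return match
}

// walkPath anchors hop 0 on the originator and each later hop on the
// previously resolved hop; an unresolved hop breaks the chain.
func walkPath(hops []string, from string, idx map[string][]string, adj adjacency) []string {
	out := make([]string, len(hops))
	seen := map[string]bool{}
	if from != "" {
		seen[from] = true
	}
	anchor := from
	for i, h := range hops {
		out[i] = pickHop(h, anchor, idx, adj, seen)
		if out[i] != "" {
			seen[out[i]] = true
		}
		anchor = out[i]
	}
	return out
}

// demoMesh builds a chain 5cA-5cB-5cC-ddD where three nodes share prefix "5c".
func demoMesh() (map[string][]string, adjacency) {
	idx := map[string][]string{"5c": {"5cA", "5cB", "5cC"}, "dd": {"ddD"}}
	adj := adjacency{}
	for _, e := range [][2]string{{"5cA", "5cB"}, {"5cB", "5cC"}, {"5cC", "ddD"}} {
		for _, p := range [][2]string{{e[0], e[1]}, {e[1], e[0]}} {
			if adj[p[0]] == nil {
				adj[p[0]] = map[string]bool{}
			}
			adj[p[0]][p[1]] = true
		}
	}
	return idx, adj
}

func main() {
	idx, adj := demoMesh()
	fmt.Printf("%q\n", walkPath([]string{"5c", "5c", "dd"}, "5cA", idx, adj)) // ["5cB" "5cC" "ddD"]
	fmt.Printf("%q\n", walkPath([]string{"5c", "dd"}, "", idx, adj))          // ["" "ddD"]
}
```

The second call shows the conservative behavior: with no originator to anchor on, the colliding `5c` hop stays unresolved, while the unique `dd` hop still resolves.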

## Out of scope
- The ~3.1M existing NULL rows from the regression window. Filed as a
follow-up backfill task — too risky to bundle here (touches a 6M-row
table).
- The dead-flag bug #1546 — separate concern.

## TDD red → green
- Red commit `80b0f476` — adds five new context-resolver tests; stub
`resolvePathWithContext` falls back to naive `resolvePath`. CI run
26946935615 → **failure** with assertion errors on the three collision
tests (`TestResolveHopWithContext_OneByteCollision_AdjacencyResolves`,
`TestResolvePathWithContext_TwoHopChainAnchoredOnFromNode`,
`TestResolvePathWithContext_AdvertAnchoring`); the two regression tests
(multi-byte still works + all-nil contract) stayed green.
- Green commit `7b4950ce` — real algorithm + InsertTransmission wiring +
RefreshNeighborGraph in the builder tick. All five new tests pass;
original four `resolved_path` tests stay green.

## Verification
- `go test -race ./cmd/ingestor/...` for the 11 affected tests — pass.
- `bash ~/.openclaw/skills/pr-preflight/scripts/run-all.sh
origin/master` — exit 0 (all gates clean).
- PII grep on body + diff: clean.

Tested with: existing `TestInsertTransmissionWritesResolvedPath` +
`TestInsertTransmissionDoesNotClobberResolvedPathOnAllNil` (issue #1547
base) plus the new collision-resolution suite:
- `TestResolveHopWithContext_OneByteCollision_AdjacencyResolves` —
3-of-5 nodes share `0x5c`, chain A↔B↔C↔D↔E; anchored on A, hop `5c` → B.
- `TestResolvePathWithContext_TwoHopChainAnchoredOnFromNode` — path
`[5c, 5c]` from_node A → `[B, C]`.
- `TestResolveHopWithContext_NoAdjacencyContext_ReturnsNil` — 3
ambiguous candidates, no anchor / non-adjacent anchor → nil.
- `TestResolvePathWithContext_AdvertAnchoring` — ADVERT,
`from_pubkey=A`, path `[5c]` → only-adjacent neighbor B.
- `TestResolvePathWithContext_RegressionMultiByteStillWorks` —
unique-prefix path with no graph context still resolves.
- `TestResolvePathWithContext_AllNilContractPreserved` — unresolvable
path → `marshalResolvedPath==""` (clobber-guard from PR #1548
untouched).

## Browser-validated
N/A — backend-only change. Frontend already handles populated
`resolved_path` via `getResolvedPath` in `cmd/server/db.go` and
`public/packets.js`.

## Round-1 fixes addressed
- **MUST-FIX #1 (data-loss clobber on all-nil resolution):** when every
hop fails to resolve, `marshalResolvedPath` returns `""` instead of
`"[null,null,...]"`, so `nilIfEmpty` → SQL NULL and the
`COALESCE(excluded.resolved_path, resolved_path)` UPSERT preserves any
previously stored good value on re-ingest. Regression test asserts:
insert a transmission, observe `resolved_path` populated, wipe the
prefix index, re-ingest the same packet, assert the existing
`resolved_path` is unchanged.
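
The clobber-guard chain described above can be modeled without a database. This is a hedged sketch: `marshalResolved` follows the contract stated in this PR, `nilIfEmpty` is a hypothetical reconstruction (its real body is not shown in this diff), and `coalesce` is an in-memory stand-in for the SQL `COALESCE(excluded.resolved_path, resolved_path)` expression.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshalResolved returns "" for empty or all-nil input, JSON otherwise.
func marshalResolved(rp []*string) string {
	for _, p := range rp {
		if p != nil {
			b, err := json.Marshal(rp)
			if err != nil {
				return ""
			}
			return string(b)
		}
	}
	return ""
}

// nilIfEmpty (assumed behavior) maps "" to a nil bind parameter, i.e. SQL NULL.
func nilIfEmpty(s string) interface{} {
	if s == "" {
		return nil
	}
	return s
}

// coalesce models COALESCE(incoming, existing) for a single text column.
func coalesce(incoming interface{}, existing string) string {
	if s, ok := incoming.(string); ok {
		return s
	}
	return existing
}

func main() {
	b := "bbbbbbbbbb"

	// First ingest: the hop resolves, so the value is stored.
	stored := coalesce(nilIfEmpty(marshalResolved([]*string{&b})), "")
	fmt.Println(stored) // ["bbbbbbbbbb"]

	// Re-ingest with an all-nil resolution: "" -> NULL -> existing value kept.
	stored = coalesce(nilIfEmpty(marshalResolved([]*string{nil})), stored)
	fmt.Println(stored) // ["bbbbbbbbbb"]

	// Without the guard, "[null]" is a non-empty string and would overwrite.
	fmt.Println(coalesce(nilIfEmpty("[null]"), stored)) // [null]
}
```

Partial resolutions such as `["bbbbbbbbbb",null]` still serialize normally, so only the fully unresolved case defers to the stored value.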

---------

Co-authored-by: corescope-bot <bot@corescope>
Co-authored-by: openclaw-bot <bot@openclaw>
Co-authored-by: openclaw-bot <bot@openclaw.local>
Kpa-clawbot
2026-06-04 07:35:13 -07:00
committed by GitHub
parent 23f292d03b
commit 3feb97f16f
5 changed files with 837 additions and 6 deletions
+32 -6
@@ -81,6 +81,16 @@ type Store struct {
sampleIntervalSec int
backfillWg sync.WaitGroup
// prefixIdx holds the prefix → pubkey index used by the
// resolved_path writer (#1547). Rebuilt on startup and once per
// neighbor-edges builder tick (60s).
prefixIdx prefixIdxHolder
// neighborGraph holds the in-memory NeighborGraph snapshot used
// by the context-aware resolver (#1560). Rebuilt on startup and
// once per neighbor-edges builder tick (60s).
neighborGraph neighborGraphHolder
}
// OpenStore opens or creates a SQLite DB at the given path, applying the
@@ -681,13 +691,14 @@ func (s *Store) prepareStatements() error {
}
s.stmtInsertObservation, err = s.db.Prepare(`
- INSERT INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp, raw_hex)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+ INSERT INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp, raw_hex, resolved_path)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(transmission_id, observer_idx, COALESCE(path_json, '')) DO UPDATE SET
- snr = COALESCE(excluded.snr, snr),
- rssi = COALESCE(excluded.rssi, rssi),
- score = COALESCE(excluded.score, score),
- raw_hex = COALESCE(excluded.raw_hex, raw_hex)
+ snr = COALESCE(excluded.snr, snr),
+ rssi = COALESCE(excluded.rssi, rssi),
+ score = COALESCE(excluded.score, score),
+ raw_hex = COALESCE(excluded.raw_hex, raw_hex),
+ resolved_path = COALESCE(excluded.resolved_path, resolved_path)
`)
if err != nil {
return err
@@ -842,10 +853,25 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
epochTs = t.Unix()
}
// Resolve hop prefixes to full pubkeys for `observations.resolved_path`.
// Per #1547: this writer was lost in the #1289 refactor and lives in
// the ingestor now. Per #1560: use the context-aware resolver so
// 1-byte prefix collisions are disambiguated via NeighborGraph
// adjacency (anchored on from_pubkey for ADVERTs, previous hop
// otherwise). Empty resolved JSON → NULL via nilIfEmpty.
resolved := resolvePathWithContext(
parsePathArray(data.PathJSON),
strings.ToLower(data.FromPubkey),
s.neighborGraph.load(),
s.prefixIdx.load(),
)
resolvedJSON := marshalResolvedPath(resolved)
_, err = s.stmtInsertObservation.Exec(
txID, observerIdx, data.Direction,
data.SNR, data.RSSI, data.Score,
data.PathJSON, epochTs, nilIfEmpty(data.RawHex),
nilIfEmpty(resolvedJSON),
)
if err != nil {
s.Stats.WriteErrors.Add(1)
+21
@@ -63,6 +63,16 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() {
// returning — first server load needs a fully-populated table.
wuStart := time.Now()
var wuTotal int
// Prime the prefix index (#1547) so the very first
// InsertTransmission after startup can resolve hop prefixes.
if err := s.RefreshPrefixIndex(); err != nil {
log.Printf("[neighbor-build] initial prefix-index refresh error: %v", err)
}
// Prime the neighbor graph (#1560) so the context-aware resolver
// has adjacency data on the very first InsertTransmission.
if err := s.RefreshNeighborGraph(); err != nil {
log.Printf("[neighbor-build] initial neighbor-graph refresh error: %v", err)
}
for {
n, err := s.buildAndPersistNeighborEdges()
if err != nil {
@@ -85,7 +95,18 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() {
select {
case <-t.C:
start := time.Now()
// Refresh the prefix index alongside the edges build
// (#1547) so new nodes become resolvable within a tick.
if err := s.RefreshPrefixIndex(); err != nil {
log.Printf("[neighbor-build] prefix-index refresh error: %v", err)
}
n, err := s.buildAndPersistNeighborEdges()
// Refresh the neighbor-graph snapshot after the edges
// build (#1560) so the context-aware resolver picks up
// newly persisted adjacencies on the next ingest.
if grErr := s.RefreshNeighborGraph(); grErr != nil {
log.Printf("[neighbor-build] neighbor-graph refresh error: %v", grErr)
}
dur := time.Since(start)
if err != nil {
log.Printf("[neighbor-build] tick error after %s: %v", dur, err)
+225
@@ -0,0 +1,225 @@
package main
import (
"database/sql"
"strings"
"sync/atomic"
)
// Context-aware hop resolver — full restore of pre-#1289 hop
// disambiguation semantics, ported into the ingestor (where the
// neighbor graph + node directory now live, per #1283).
//
// Why this exists (issues #1547 / #1560):
// The naive `resolvePath` only resolves hops whose prefix is unique
// in the node table. On a >2K-node mesh the dominant case is 1-byte
// prefix collisions (multiple candidates per prefix). Without
// adjacency disambiguation those hops always serialize as `nil`
// and the resolved_path remains effectively empty for the largest
// meshes — the very deployments that need it most.
//
// Algorithm (ported from cmd/server/store.go @ commit 450236d5
// `pm.resolveWithContext`, intersected with the disambiguation gating
// from PR #1144 / #1352):
//
// For each hop:
// 1. Collect candidate pubkeys by prefix-match (existing prefixIndex).
// 2. len==0 → nil.
// 3. len==1 → that pubkey.
// 4. len>1 → filter by NeighborGraph adjacency to the anchor:
// - hop 0 anchor = fromPubkey (ADVERT originator) if known;
// - hop i (i>0) anchor = previous resolved hop's pubkey;
// if the previous hop did not resolve, the chain breaks
// and subsequent >1-candidate hops fall to nil.
// Surviving candidates after filter:
// - exactly 1 → use it
// - 0 or >1 → nil (cannot disambiguate further)
//
// This is the conservative tier-1 variant. Pre-#1289 also carried
// tier-2 (geo proximity), tier-3 (GPS preference), tier-4 (obs-count
// fallback) — those were noisy in practice and are intentionally NOT
// ported here; this PR is a regression restore, not an enhancement.
// NeighborGraph is the in-memory adjacency snapshot used by the
// context-aware resolver. Internally lowercased.
type NeighborGraph struct {
adj map[string]map[string]struct{}
}
// NewNeighborGraph returns an empty graph.
func NewNeighborGraph() *NeighborGraph {
return &NeighborGraph{adj: make(map[string]map[string]struct{})}
}
// AddEdge adds an undirected adjacency a↔b. Self-loops and empty
// endpoints are ignored.
func (g *NeighborGraph) AddEdge(a, b string) {
a = strings.ToLower(a)
b = strings.ToLower(b)
if a == "" || b == "" || a == b {
return
}
if g.adj[a] == nil {
g.adj[a] = make(map[string]struct{})
}
if g.adj[b] == nil {
g.adj[b] = make(map[string]struct{})
}
g.adj[a][b] = struct{}{}
g.adj[b][a] = struct{}{}
}
// IsAdjacent reports whether a and b appear together in any neighbor edge.
func (g *NeighborGraph) IsAdjacent(a, b string) bool {
if g == nil {
return false
}
a = strings.ToLower(a)
b = strings.ToLower(b)
if a == "" || b == "" {
return false
}
nbrs, ok := g.adj[a]
if !ok {
return false
}
_, present := nbrs[b]
return present
}
// neighborGraphHolder caches the graph for the InsertTransmission hot
// path. atomic.Value lets the 60s rebuild publish without a read-side
// lock.
type neighborGraphHolder struct {
v atomic.Value // holds *NeighborGraph
}
func (h *neighborGraphHolder) load() *NeighborGraph {
if v := h.v.Load(); v != nil {
return v.(*NeighborGraph)
}
return nil
}
func (h *neighborGraphHolder) store(g *NeighborGraph) {
h.v.Store(g)
}
// loadNeighborGraph reads neighbor_edges and returns an in-memory
// adjacency snapshot. Safe to call against a fresh DB (returns an
// empty graph).
func loadNeighborGraph(db *sql.DB) (*NeighborGraph, error) {
rows, err := db.Query(`SELECT node_a, node_b FROM neighbor_edges`)
if err != nil {
return nil, err
}
defer rows.Close()
g := NewNeighborGraph()
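for rows.Next() {
var a, b string
if err := rows.Scan(&a, &b); err != nil {
// Skip rows that fail to scan rather than abort the whole load.
continue
}
g.AddEdge(a, b)
}
// Surface iteration errors so a truncated result set is never
// published as a complete snapshot.
if err := rows.Err(); err != nil {
return nil, err
}
return g, nil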
}
// resolveHopWithContext resolves a single hop using NeighborGraph
// adjacency to the anchor. Returns nil when the hop cannot be
// disambiguated.
//
// exclude is a set of pubkeys to discard from the candidate pool
// (typically the prior hops already resolved on the path — a packet
// does not revisit a node).
//
// Behavior matrix:
//   len(candidates) | anchor    | graph | result
//   0               | any       | any   | nil
//   1               | any       | any   | candidates[0] (nil if excluded)
//   >1              | ""        | any   | nil
//   >1              | any       | nil   | nil
//   >1              | non-empty | set   | unique adjacent candidate
//                   |           |       | (nil if 0 or >1 survive)
func resolveHopWithContext(hop string, anchor string, graph *NeighborGraph, idx prefixIndex, exclude map[string]struct{}) *string {
if idx == nil {
return nil
}
h := strings.ToLower(hop)
candidates := idx[h]
switch len(candidates) {
case 0:
return nil
case 1:
pk := candidates[0]
if _, skip := exclude[pk]; skip {
return nil
}
return &pk
}
if graph == nil || anchor == "" {
return nil
}
var match string
survivors := 0
for _, cand := range candidates {
if _, skip := exclude[cand]; skip {
continue
}
if graph.IsAdjacent(anchor, cand) {
survivors++
if survivors > 1 {
return nil
}
match = cand
}
}
if survivors == 1 {
return &match
}
return nil
}
// resolvePathWithContext walks the hop list, anchoring hop 0 on
// fromPubkey (for ADVERTs) and each subsequent hop on the previous
// resolved hop. Previously-resolved pubkeys (plus the originator) are
// excluded from later candidate pools so the walk doesn't revisit a
// node. Returns a `[]*string` shape compatible with
// marshalResolvedPath (and the all-nil clobber-guard from PR #1548).
func resolvePathWithContext(hops []string, fromPubkey string, graph *NeighborGraph, idx prefixIndex) []*string {
if len(hops) == 0 {
return nil
}
out := make([]*string, len(hops))
if idx == nil {
return out
}
prevAnchor := strings.ToLower(fromPubkey)
seen := make(map[string]struct{}, len(hops)+1)
if prevAnchor != "" {
seen[prevAnchor] = struct{}{}
}
for i, hop := range hops {
r := resolveHopWithContext(hop, prevAnchor, graph, idx, seen)
out[i] = r
if r != nil {
lc := strings.ToLower(*r)
seen[lc] = struct{}{}
prevAnchor = lc
} else {
prevAnchor = ""
}
}
return out
}
// RefreshNeighborGraph loads the latest neighbor_edges snapshot and
// publishes it atomically. Called on startup and once per neighbor-
// edges builder tick (60s) alongside RefreshPrefixIndex.
func (s *Store) RefreshNeighborGraph() error {
g, err := loadNeighborGraph(s.db)
if err != nil {
return err
}
s.neighborGraph.store(g)
return nil
}
+113
@@ -0,0 +1,113 @@
package main
import (
"encoding/json"
"strings"
"sync/atomic"
)
// Issue #1547 — resolved_path writer (ingestor-owned).
//
// Per the #1283 refactor (server is read-only; ingestor owns the
// neighbor graph + node directory), the writer that populated
// `observations.resolved_path` must live here in the ingestor. PR #1289
// removed the server-side writer without porting it — this restores it.
//
// Approach:
// - `resolvePath` is a pure function: hop prefixes → full pubkeys
// using the in-memory prefix index built from `nodes.public_key`.
// - Unique-prefix hops resolve to the full pubkey; ambiguous or
// unknown hops resolve to `nil`. The output shape is `[]*string`
// (with nulls for unresolved positions) — the JSON serialization
// matches what the server's `unmarshalResolvedPath` /
// frontend `getResolvedPath` already consume.
// - The prefix index is rebuilt on startup and once per neighbor-
// builder tick (60s) so new nodes start resolving within a minute
// without blocking the MQTT ingest path.
// resolvePath maps each hop prefix to a full pubkey when the index
// has exactly one candidate; returns nil at that position otherwise.
// Returns nil for empty/no hops.
func resolvePath(hops []string, idx prefixIndex) []*string {
if len(hops) == 0 {
return nil
}
out := make([]*string, len(hops))
if idx == nil {
return out
}
for i, hop := range hops {
h := strings.ToLower(hop)
candidates := idx[h]
if len(candidates) == 1 {
pk := candidates[0]
out[i] = &pk
}
}
return out
}
// marshalResolvedPath JSON-encodes a resolved path. Returns "" when
// the input is empty OR when every element is nil (writer treats "" as
// SQL NULL).
//
// The all-nil case matters because of the UPSERT in InsertTransmission:
//
// resolved_path = COALESCE(excluded.resolved_path, resolved_path)
//
// If we emitted "[null,null]" here, nilIfEmpty() would let it through
// as a non-NULL string and the COALESCE would OVERWRITE a previously
// stored good resolved_path on re-ingest. Returning "" lets nilIfEmpty
// produce SQL NULL so the COALESCE falls through to the existing value.
// See issue #1547 / PR #1548 reviewer findings.
func marshalResolvedPath(rp []*string) string {
if len(rp) == 0 {
return ""
}
allNil := true
for _, p := range rp {
if p != nil {
allNil = false
break
}
}
if allNil {
return ""
}
b, err := json.Marshal(rp)
if err != nil {
return ""
}
return string(b)
}
// prefixIdxHolder caches the prefix index for the InsertTransmission
// hot path. atomic.Value lets the 60s rebuild happen without a lock on
// the read side.
type prefixIdxHolder struct {
v atomic.Value // holds prefixIndex
}
func (h *prefixIdxHolder) load() prefixIndex {
if v := h.v.Load(); v != nil {
return v.(prefixIndex)
}
return nil
}
func (h *prefixIdxHolder) store(idx prefixIndex) {
h.v.Store(idx)
}
// RefreshPrefixIndex rebuilds the in-memory prefix index from the
// nodes table and publishes it atomically. Called on startup and from
// the neighbor-edges builder tick (60s) so new nodes become resolvable
// without per-insert DB scans.
func (s *Store) RefreshPrefixIndex() error {
idx, err := buildPrefixIndex(s.db)
if err != nil {
return err
}
s.prefixIdx.store(idx)
return nil
}
+446
@@ -0,0 +1,446 @@
package main
import (
"database/sql"
"encoding/json"
"path/filepath"
"testing"
)
func unmarshalResolvedPathLocal(s string) []*string {
if s == "" {
return nil
}
var out []*string
if json.Unmarshal([]byte(s), &out) != nil {
return nil
}
return out
}
// TestResolvePathPureFunction is a unit test for the pure resolvePath
// helper. Asserts:
// - unique-prefix hops resolve to the full pubkey
// - ambiguous-prefix hops resolve to nil
// - unknown-prefix hops resolve to nil
// - return slice length equals input hop count
//
// Regression gate for #1547 (resolved_path stopped being written).
func TestResolvePathPureFunction(t *testing.T) {
idx := prefixIndex{
// "aa" → exactly one pubkey
"aa": {"aaaaaaaaaa"},
"aaaaaaaaaa": {"aaaaaaaaaa"},
// "bb" → exactly one pubkey
"bb": {"bbbbbbbbbb"},
"bbbbbbbbbb": {"bbbbbbbbbb"},
// "cc" → ambiguous (2 candidates)
"cc": {"cccccccccc", "ccdddddddd"},
"cccccccccc": {"cccccccccc"},
}
got := resolvePath([]string{"aa", "cc", "ff", "bb"}, idx)
if len(got) != 4 {
t.Fatalf("expected len 4, got %d", len(got))
}
if got[0] == nil || *got[0] != "aaaaaaaaaa" {
t.Errorf("hop[0] aa: want aaaaaaaaaa, got %v", deref(got[0]))
}
if got[1] != nil {
t.Errorf("hop[1] cc: want nil (ambiguous), got %v", deref(got[1]))
}
if got[2] != nil {
t.Errorf("hop[2] ff: want nil (unknown), got %v", deref(got[2]))
}
if got[3] == nil || *got[3] != "bbbbbbbbbb" {
t.Errorf("hop[3] bb: want bbbbbbbbbb, got %v", deref(got[3]))
}
}
// TestResolvePathEmptyHops asserts empty/no-path produces nil.
func TestResolvePathEmptyHops(t *testing.T) {
if got := resolvePath(nil, prefixIndex{}); got != nil {
t.Errorf("nil hops: want nil, got %v", got)
}
if got := resolvePath([]string{}, prefixIndex{}); got != nil {
t.Errorf("empty hops: want nil, got %v", got)
}
}
// TestMarshalResolvedPathRoundtrip asserts the JSON shape matches the
// server's marshal/unmarshal contract: `[]*string` with nulls for
// unresolved hops.
func TestMarshalResolvedPathRoundtrip(t *testing.T) {
a := "aaaaaaaaaa"
b := "bbbbbbbbbb"
in := []*string{&a, nil, &b}
s := marshalResolvedPath(in)
want := `["aaaaaaaaaa",null,"bbbbbbbbbb"]`
if s != want {
t.Errorf("marshal: want %s, got %s", want, s)
}
}
// TestInsertTransmissionWritesResolvedPath is the integration test that
// gates the regression introduced by PR #1289 (issue #1547).
//
// Setup: seed two nodes + one observer + invoke InsertTransmission with
// a PacketData whose PathJSON references one of the seeded nodes by
// unique 1-byte (2-hex) prefix.
//
// Assert: the inserted observations row has a non-NULL resolved_path
// whose JSON-decoded length equals the hop count, and the resolved
// element matches the seeded node's full pubkey.
func TestInsertTransmissionWritesResolvedPath(t *testing.T) {
dir := t.TempDir()
dbPath := filepath.Join(dir, "ingest.db")
store, err := OpenStore(dbPath)
if err != nil {
t.Fatalf("OpenStore: %v", err)
}
defer store.Close()
// Seed nodes with unique 1-byte prefixes.
if _, err := store.db.Exec(
`INSERT INTO nodes (public_key, name) VALUES (?, ?), (?, ?)`,
"aaaaaaaaaa", "from-node",
"bbbbbbbbbb", "first-hop",
); err != nil {
t.Fatal(err)
}
// Seed one observer (needed so InsertTransmission resolves observer_idx).
if err := store.UpsertObserver("obs-1", "observer-1", "", nil); err != nil {
t.Fatalf("UpsertObserver: %v", err)
}
// Force the prefix index to be (re)built from the seeded nodes so
// the InsertTransmission path has something to resolve against.
if err := store.RefreshPrefixIndex(); err != nil {
t.Fatalf("RefreshPrefixIndex: %v", err)
}
pkt := &PacketData{
RawHex: "deadbeef",
Timestamp: "2026-06-01T00:00:00Z",
ObserverID: "obs-1",
Hash: "h-1547",
RouteType: 0,
PayloadType: int(payloadADVERT),
PathJSON: `["bb"]`,
DecodedJSON: "{}",
FromPubkey: "aaaaaaaaaa",
}
if _, err := store.InsertTransmission(pkt); err != nil {
t.Fatalf("InsertTransmission: %v", err)
}
var rp sql.NullString
if err := store.db.QueryRow(
`SELECT resolved_path FROM observations WHERE transmission_id = (SELECT id FROM transmissions WHERE hash = ?)`,
"h-1547",
).Scan(&rp); err != nil {
t.Fatalf("query: %v", err)
}
if !rp.Valid || rp.String == "" {
t.Fatalf("expected non-nil resolved_path, got NULL/empty (regression: #1547)")
}
got := unmarshalResolvedPathLocal(rp.String)
if len(got) != 1 {
t.Fatalf("resolved_path length: want 1, got %d (value=%s)", len(got), rp.String)
}
if got[0] == nil || *got[0] != "bbbbbbbbbb" {
t.Errorf("resolved_path[0]: want bbbbbbbbbb, got %v (raw=%s)", deref(got[0]), rp.String)
}
}
func deref(p *string) string {
if p == nil {
return "<nil>"
}
return *p
}
// ─── #1560: context-aware resolution tests ─────────────────────────────────
//
// These exercise the post-fix behavior of resolveHopWithContext +
// resolvePathWithContext. Until the green commit lands they MUST fail
// on assertions (the stub falls back to naive `len==1` and returns nil
// on every >1-candidate prefix), proving the gate is real.
// build5NodeAmbiguousIndex returns a prefixIndex where 3 of 5 nodes
// share the 1-byte prefix 0x5c. Pubkeys are the "fingerprints":
//
// A = "5c000000000000000000000000000000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
// B = "5c000000000000000000000000000000bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
// C = "5c000000000000000000000000000000cccccccccccccccccccccccccccccccc"
// D = "dd000000000000000000000000000000dddddddddddddddddddddddddddddddd"
// E = "ee000000000000000000000000000000eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"
func build5NodeAmbiguousIndex() (idx prefixIndex, A, B, C, D, E string) {
A = "5c000000000000000000000000000000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
B = "5c000000000000000000000000000000bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
C = "5c000000000000000000000000000000cccccccccccccccccccccccccccccccc"
D = "dd000000000000000000000000000000dddddddddddddddddddddddddddddddd"
E = "ee000000000000000000000000000000eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"
idx = prefixIndex{
// 1-byte: 5c → A,B,C (collision); dd → D; ee → E
"5c": {A, B, C},
"dd": {D},
"ee": {E},
// full-key entries (so exact-match lookups still resolve)
A: {A}, B: {B}, C: {C}, D: {D}, E: {E},
}
return
}
// TestResolveHopWithContext_OneByteCollision_AdjacencyResolves
// asserts the dominant production case (#1560): three nodes share the
// 1-byte prefix 0x5c, but NeighborGraph adjacency narrows to exactly
// one. The naive resolver returns nil; the context-aware resolver
// MUST return the right pubkey.
func TestResolveHopWithContext_OneByteCollision_AdjacencyResolves(t *testing.T) {
idx, A, B, C, D, E := build5NodeAmbiguousIndex()
g := NewNeighborGraph()
// chain: A↔B, B↔C, C↔D, D↔E
g.AddEdge(A, B)
g.AddEdge(B, C)
g.AddEdge(C, D)
g.AddEdge(D, E)
// Anchored on A, the only 5c neighbor of A is B.
got := resolveHopWithContext("5c", A, g, idx, nil)
if got == nil {
t.Fatalf("anchor=A, hop=5c: want B (%s), got <nil>", B)
}
if *got != B {
t.Errorf("anchor=A, hop=5c: want %s, got %s", B, *got)
}
// Anchored on B, the only 5c neighbors of B are A and C — but A is
// the originator anchor in a path-walk; here we just assert that
// 2 surviving candidates → nil (cannot disambiguate further).
got = resolveHopWithContext("5c", B, g, idx, nil)
if got != nil {
t.Errorf("anchor=B, hop=5c: ambiguous (A and C both adjacent); want <nil>, got %s", *got)
}
}
// TestResolvePathWithContext_TwoHopChainAnchoredOnFromNode covers the
// canonical 1-byte collision case end-to-end: path = [5c, 5c],
// from_node = A → expect [B, C].
func TestResolvePathWithContext_TwoHopChainAnchoredOnFromNode(t *testing.T) {
idx, A, B, C, _, _ := build5NodeAmbiguousIndex()
g := NewNeighborGraph()
g.AddEdge(A, B)
g.AddEdge(B, C)
got := resolvePathWithContext([]string{"5c", "5c"}, A, g, idx)
if len(got) != 2 {
t.Fatalf("len(got)=%d, want 2 (raw=%v)", len(got), got)
}
if got[0] == nil || *got[0] != B {
t.Errorf("hop[0]: want %s, got %v", B, deref(got[0]))
}
if got[1] == nil || *got[1] != C {
t.Errorf("hop[1]: want %s, got %v", C, deref(got[1]))
}
}
// TestResolveHopWithContext_NoAdjacencyContext_ReturnsNil asserts the
// negative gate: 3 nodes with shared prefix, no edges between them in
// the graph, hop=[5c] with no usable anchor → nil. Guards against an
// over-eager resolver that just picks the first candidate.
func TestResolveHopWithContext_NoAdjacencyContext_ReturnsNil(t *testing.T) {
idx, _, _, _, _, _ := build5NodeAmbiguousIndex()
g := NewNeighborGraph() // empty: no edges
got := resolveHopWithContext("5c", "", g, idx, nil)
if got != nil {
t.Errorf("no anchor + empty graph: want <nil>, got %s", *got)
}
// With an anchor that's not adjacent to any candidate, also nil.
got = resolveHopWithContext("5c", "deadbeefdeadbeef", g, idx, nil)
if got != nil {
t.Errorf("non-adjacent anchor: want <nil>, got %s", *got)
}
}
// TestResolvePathWithContext_AdvertAnchoring asserts ADVERT-style
// anchoring: from_pubkey is the originator, hop[0] is one of its
// 1-byte-prefix neighbors → resolved.
func TestResolvePathWithContext_AdvertAnchoring(t *testing.T) {
idx, A, B, _, _, _ := build5NodeAmbiguousIndex()
g := NewNeighborGraph()
g.AddEdge(A, B) // only B is adjacent to A among the 5c candidates
got := resolvePathWithContext([]string{"5c"}, A, g, idx)
if len(got) != 1 {
t.Fatalf("len(got)=%d, want 1", len(got))
}
if got[0] == nil || *got[0] != B {
t.Errorf("ADVERT anchored on A, hop=5c: want %s, got %v", B, deref(got[0]))
}
}
// TestResolvePathWithContext_RegressionMultiByteStillWorks asserts no
// regression in the unique-prefix path (the case 2/3/4-byte prefixes
// normally fall into) that PR #1548 already handled: unique prefixes
// resolve regardless of graph context.
func TestResolvePathWithContext_RegressionMultiByteStillWorks(t *testing.T) {
idx, _, _, _, D, E := build5NodeAmbiguousIndex()
// dd and ee are unique 1-byte prefixes — naive path still works.
got := resolvePathWithContext([]string{"dd", "ee"}, "", nil, idx)
if len(got) != 2 {
t.Fatalf("len(got)=%d, want 2", len(got))
}
if got[0] == nil || *got[0] != D {
t.Errorf("hop[0] dd: want %s, got %v", D, deref(got[0]))
}
if got[1] == nil || *got[1] != E {
t.Errorf("hop[1] ee: want %s, got %v", E, deref(got[1]))
}
}
// TestResolvePathWithContext_AllNilContractPreserved asserts the
// all-nil → empty-string clobber-guard contract from PR #1548 still
// holds: an unresolvable path through the context resolver, when fed
// to marshalResolvedPath, MUST yield "" (so nilIfEmpty → SQL NULL
// → COALESCE preserves existing).
func TestResolvePathWithContext_AllNilContractPreserved(t *testing.T) {
// Empty index → every hop nil.
got := resolvePathWithContext([]string{"5c", "dd"}, "", nil, prefixIndex{})
if len(got) != 2 {
t.Fatalf("len(got)=%d, want 2", len(got))
}
for i, p := range got {
if p != nil {
t.Errorf("hop[%d]: want <nil>, got %s", i, *p)
}
}
if s := marshalResolvedPath(got); s != "" {
t.Errorf("all-nil marshal: want \"\", got %q (clobber-guard regression)", s)
}
}
// TestMarshalResolvedPathAllNilReturnsEmpty is a regression gate for
// the data-loss clobber bug surfaced in PR #1548 review.
//
// When resolvePath fails to resolve ANY hop (every element nil),
// marshalResolvedPath previously emitted "[null,null,...]" — a
// non-empty string that bypassed nilIfEmpty and then OVERWROTE the
// existing resolved_path via the COALESCE(excluded, current) UPSERT
// on re-ingest. The fix returns "" so nilIfEmpty produces SQL NULL and
// the COALESCE preserves the existing good value.
func TestMarshalResolvedPathAllNilReturnsEmpty(t *testing.T) {
cases := []struct {
name string
in []*string
}{
{"one-nil", []*string{nil}},
{"two-nils", []*string{nil, nil}},
{"three-nils", []*string{nil, nil, nil}},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := marshalResolvedPath(tc.in)
if got != "" {
t.Errorf("all-nil input must return \"\" (so nilIfEmpty → SQL NULL → COALESCE preserves existing); got %q", got)
}
})
}
// Mixed (at least one non-nil) MUST still marshal normally so we
// don't lose partial resolutions.
a := "aaaaaaaaaa"
mixed := marshalResolvedPath([]*string{&a, nil})
if mixed != `["aaaaaaaaaa",null]` {
t.Errorf("partial resolution must still serialize; got %q", mixed)
}
}
// TestInsertTransmissionDoesNotClobberResolvedPathOnAllNil is the
// integration-level regression test for the data-loss bug.
//
// Setup: insert a transmission whose first ingest resolves cleanly to
// a known pubkey. Then re-ingest the SAME transmission after the
// prefix index has been cleared (simulating an empty NeighborGraph /
// all-nil resolution path) and assert the previously stored
// resolved_path is PRESERVED (NOT overwritten to "[null]" or NULL).
//
// Pre-fix behavior: marshalResolvedPath emitted "[null]", nilIfEmpty
// kept it non-NULL, and COALESCE(excluded.resolved_path, resolved_path)
// clobbered the original "bbbbbbbbbb".
func TestInsertTransmissionDoesNotClobberResolvedPathOnAllNil(t *testing.T) {
dir := t.TempDir()
dbPath := filepath.Join(dir, "ingest.db")
store, err := OpenStore(dbPath)
if err != nil {
t.Fatalf("OpenStore: %v", err)
}
defer store.Close()
if _, err := store.db.Exec(
`INSERT INTO nodes (public_key, name) VALUES (?, ?), (?, ?)`,
"aaaaaaaaaa", "from-node",
"bbbbbbbbbb", "first-hop",
); err != nil {
t.Fatal(err)
}
if err := store.UpsertObserver("obs-1", "observer-1", "", nil); err != nil {
t.Fatalf("UpsertObserver: %v", err)
}
if err := store.RefreshPrefixIndex(); err != nil {
t.Fatalf("RefreshPrefixIndex: %v", err)
}
pkt := &PacketData{
RawHex: "deadbeef",
Timestamp: "2026-06-01T00:00:00Z",
ObserverID: "obs-1",
Hash: "h-clobber",
RouteType: 0,
PayloadType: int(payloadADVERT),
PathJSON: `["bb"]`,
DecodedJSON: "{}",
FromPubkey: "aaaaaaaaaa",
}
if _, err := store.InsertTransmission(pkt); err != nil {
t.Fatalf("first InsertTransmission: %v", err)
}
// Sanity: first write populated resolved_path.
var first sql.NullString
if err := store.db.QueryRow(
`SELECT resolved_path FROM observations WHERE transmission_id = (SELECT id FROM transmissions WHERE hash = ?)`,
"h-clobber",
).Scan(&first); err != nil {
t.Fatalf("first query: %v", err)
}
if !first.Valid || first.String == "" {
t.Fatalf("precondition failed: first ingest left resolved_path NULL/empty; cannot test clobber")
}
wantPreserved := first.String
// Now wipe the prefix index so re-ingest produces an all-nil
// resolution — exactly the scenario where the bug clobbers data.
store.prefixIdx.store(prefixIndex{})
if _, err := store.InsertTransmission(pkt); err != nil {
t.Fatalf("re-ingest InsertTransmission: %v", err)
}
var after sql.NullString
if err := store.db.QueryRow(
`SELECT resolved_path FROM observations WHERE transmission_id = (SELECT id FROM transmissions WHERE hash = ?)`,
"h-clobber",
).Scan(&after); err != nil {
t.Fatalf("post-reingest query: %v", err)
}
if !after.Valid {
t.Fatalf("data loss: resolved_path was NULL'd by re-ingest (was %q)", wantPreserved)
}
if after.String != wantPreserved {
t.Errorf("data loss: resolved_path was clobbered by all-nil re-ingest\n before: %s\n after: %s", wantPreserved, after.String)
}
}