mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-07-02 00:41:38 +00:00
# fix(ingestor): write resolved_path on new observations (full restore; closes #1547 + #1560)

Fixes #1547. Closes #1560.

## Root cause

PR #1289 (the "ingestor owns the neighbor graph; server is read-only" refactor, ~2026-05-21) moved the neighbor graph + schema writes to the ingestor, and as a side-effect removed the server-side writer that populated `observations.resolved_path` AND the context-aware `pm.resolveWithContext` that disambiguated 1-byte prefix collisions. Result: every observation inserted after the deploy has `resolved_path = NULL` (3.1M/6.3M NULL on staging; 100% NULL on fresh deploys; symptom on Cascadia: hops fail to resolve because the small-mesh client-side fallback breaks on prefix collisions).

## Full restore

This PR resolves both single-byte and multi-byte prefix paths. Single-byte disambiguation uses NeighborGraph adjacency and ADVERT `from_pubkey` anchoring, ported from pre-#1289 `pm.resolveWithContext` logic (last good at cmd/server/store.go @ commit 450236d5) and the #1144 / #1352 fixes.

New file `cmd/ingestor/path_resolver.go`:

- `NeighborGraph` + `neighborGraphHolder`: in-memory adjacency snapshot, atomic-published.
- `loadNeighborGraph(db)`: one-shot SELECT from `neighbor_edges`.
- `resolveHopWithContext(hop, anchor, graph, idx, exclude) *string`: single-hop, tier-1 disambiguator.
- `resolvePathWithContext(hops, fromPubkey, graph, idx) []*string`: walks the path, anchoring hop 0 on `from_pubkey` (ADVERTs) and each subsequent hop on the previous resolved hop, excluding already-resolved pubkeys.
- `Store.RefreshNeighborGraph()`: called on warm-up and every 60s tick in the neighbor-edges builder alongside `RefreshPrefixIndex`.

Existing file `cmd/ingestor/resolved_path.go` (PR #1547 base) is untouched: `resolvePath` + `marshalResolvedPath` + the all-nil → empty-string clobber-guard contract are preserved verbatim.

`cmd/ingestor/db.go`: `InsertTransmission` now calls `resolvePathWithContext` instead of the naive `resolvePath`.
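
The "atomic-published snapshot" idea behind `neighborGraphHolder` can be illustrated with a small standalone program. This is only a sketch under stated assumptions, not the code from `path_resolver.go`: the names `snapshot`, `holder`, and `buildSnapshot` are hypothetical, and it uses the generic `atomic.Pointer[T]` (Go 1.19+) where the PR itself uses `atomic.Value`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// snapshot is a hypothetical adjacency snapshot. It is treated as
// immutable once published, so readers never need a lock.
type snapshot struct {
	adj map[string]map[string]struct{}
}

// buildSnapshot builds a fresh undirected adjacency map from edge pairs.
func buildSnapshot(edges [][2]string) *snapshot {
	s := &snapshot{adj: make(map[string]map[string]struct{})}
	for _, e := range edges {
		// Record both directions so lookups are symmetric.
		for _, pair := range [][2]string{{e[0], e[1]}, {e[1], e[0]}} {
			if s.adj[pair[0]] == nil {
				s.adj[pair[0]] = make(map[string]struct{})
			}
			s.adj[pair[0]][pair[1]] = struct{}{}
		}
	}
	return s
}

// adjacent is nil-safe so callers can query before the first publish.
func (s *snapshot) adjacent(a, b string) bool {
	if s == nil {
		return false
	}
	_, ok := s.adj[a][b]
	return ok
}

// holder publishes whole snapshots; a rebuild swaps the pointer in one step.
type holder struct {
	p atomic.Pointer[snapshot]
}

func (h *holder) load() *snapshot   { return h.p.Load() }
func (h *holder) store(s *snapshot) { h.p.Store(s) }

func main() {
	var h holder
	fmt.Println(h.load().adjacent("a", "b")) // false: nothing published yet

	// A periodic rebuild would construct a new snapshot and publish it.
	h.store(buildSnapshot([][2]string{{"a", "b"}}))
	fmt.Println(h.load().adjacent("b", "a")) // true: edge is undirected
}
```

The design point is that a rebuild never mutates a snapshot a reader may already hold; it builds a new one and swaps the pointer, so the insert path pays only for an atomic load.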

## Algorithm (per hop)

1. Look up candidate pubkeys by prefix-match (existing `prefixIndex`).
2. `len==0 → nil`; `len==1 → that pubkey`.
3. `len>1` → filter by `NeighborGraph` adjacency to the anchor. Anchor is `from_pubkey` for hop 0 on ADVERTs, the previous resolved hop otherwise. Exactly 1 surviving candidate → use it; else nil.
4. Previously resolved hops (and the originator) are excluded from downstream candidate pools, since a packet does not revisit a node.

Tier-2/3/4 from pre-#1289 (geo proximity, GPS preference, observation-count fallback) are intentionally NOT ported; those were noisy in practice and belong in a separate enhancement, not in this regression restore.

## Out of scope

- The ~3.1M existing NULL rows from the regression window. Filed as a follow-up backfill task; too risky to bundle here (touches a 6M-row table).
- The dead-flag bug #1546: separate concern.

## TDD red → green

- Red commit `80b0f476`: adds five new context-resolver tests; stub `resolvePathWithContext` falls back to naive `resolvePath`. CI run 26946935615 → **failure** with assertion errors on the three collision tests (`TestResolveHopWithContext_OneByteCollision_AdjacencyResolves`, `TestResolvePathWithContext_TwoHopChainAnchoredOnFromNode`, `TestResolvePathWithContext_AdvertAnchoring`); the two regression tests (multi-byte still works + all-nil contract) stayed green.
- Green commit `7b4950ce`: real algorithm + InsertTransmission wiring + RefreshNeighborGraph in the builder tick. All five new tests pass; original four `resolved_path` tests stay green.

## Verification

- `go test -race ./cmd/ingestor/...` for the 11 affected tests: pass.
- `bash ~/.openclaw/skills/pr-preflight/scripts/run-all.sh origin/master`: exit 0 (all gates clean).
- PII grep on body + diff: clean.
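
The per-hop steps listed under "Algorithm (per hop)" can be walked through with a compact standalone sketch. It is an illustration under assumptions rather than the PR's implementation: the types `index` and `graph` and the short pubkeys such as `5c-a` are hypothetical, and an unresolved hop is represented by an empty string instead of a nil `*string`.

```go
package main

import "fmt"

// index maps a hop prefix to candidate pubkeys (hypothetical stand-in
// for the ingestor's prefix index).
type index map[string][]string

// graph is an undirected adjacency set (hypothetical stand-in for
// NeighborGraph).
type graph map[string]map[string]bool

func (g graph) link(a, b string) {
	if g[a] == nil {
		g[a] = map[string]bool{}
	}
	if g[b] == nil {
		g[b] = map[string]bool{}
	}
	g[a][b], g[b][a] = true, true
}

// resolveHop returns the resolved pubkey, or "" when the hop is unknown
// or cannot be disambiguated.
func resolveHop(hop, anchor string, g graph, idx index, seen map[string]bool) string {
	cands := idx[hop]
	switch len(cands) {
	case 0:
		return "" // step 2: unknown prefix
	case 1:
		if seen[cands[0]] {
			return "" // step 4: already on the path
		}
		return cands[0] // step 2: unique prefix
	}
	if anchor == "" {
		return "" // step 3 needs an anchor to filter against
	}
	match := ""
	for _, c := range cands {
		if seen[c] || !g[anchor][c] {
			continue
		}
		if match != "" {
			return "" // a second survivor: still ambiguous
		}
		match = c
	}
	return match
}

// resolvePath anchors hop 0 on the originator and every later hop on the
// previously resolved hop; an unresolved hop breaks the chain.
func resolvePath(hops []string, from string, g graph, idx index) []string {
	out := make([]string, len(hops))
	seen := map[string]bool{}
	if from != "" {
		seen[from] = true
	}
	anchor := from
	for i, h := range hops {
		out[i] = resolveHop(h, anchor, g, idx, seen)
		if out[i] != "" {
			seen[out[i]] = true
		}
		anchor = out[i]
	}
	return out
}

func main() {
	idx := index{"5c": {"5c-a", "5c-b", "5c-c"}, "dd": {"dd-d"}}
	g := graph{}
	g.link("5c-a", "5c-b")
	g.link("5c-b", "5c-c")

	fmt.Printf("%q\n", resolvePath([]string{"5c", "5c", "dd"}, "5c-a", g, idx))
	// prints ["5c-b" "5c-c" "dd-d"]
}
```

In the second hop both `5c-a` and `5c-c` are adjacent to the anchor `5c-b`; only the exclusion of the already-seen originator leaves a single survivor, which is why step 4 matters for chains of colliding prefixes.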

Tested with: existing `TestInsertTransmissionWritesResolvedPath` + `TestInsertTransmissionDoesNotClobberResolvedPathOnAllNil` (PR #1547 base) plus the new collision-resolution suite:

- `TestResolveHopWithContext_OneByteCollision_AdjacencyResolves`: 3-of-5 nodes share `0x5c`, chain A↔B↔C↔D↔E; anchored on A, hop `5c` → B.
- `TestResolvePathWithContext_TwoHopChainAnchoredOnFromNode`: path `[5c, 5c]` from_node A → `[B, C]`.
- `TestResolveHopWithContext_NoAdjacencyContext_ReturnsNil`: 3 ambiguous candidates, no anchor / non-adjacent anchor → nil.
- `TestResolvePathWithContext_AdvertAnchoring`: ADVERT, `from_pubkey=A`, path `[5c]` → only-adjacent neighbor B.
- `TestResolvePathWithContext_RegressionMultiByteStillWorks`: unique-prefix path with no graph context still resolves.
- `TestResolvePathWithContext_AllNilContractPreserved`: unresolvable path → `marshalResolvedPath==""` (clobber-guard from PR #1548 untouched).

## Browser-validated

N/A: backend-only change. Frontend already handles populated `resolved_path` via `getResolvedPath` in `cmd/server/db.go` and `public/packets.js`.

## Round-1 fixes addressed

- **MUST-FIX #1 (data-loss clobber on all-nil resolution):** when every hop fails to resolve, `marshalResolvedPath` returns `""` instead of `"[null,null,...]"`, so `nilIfEmpty` → SQL NULL and the `COALESCE(excluded.resolved_path, resolved_path)` UPSERT preserves any previously stored good value on re-ingest. Regression test asserts: insert a transmission, observe `resolved_path` populated, wipe the prefix index, re-ingest the same packet, assert the existing `resolved_path` is unchanged.

---------

Co-authored-by: corescope-bot <bot@corescope>
Co-authored-by: openclaw-bot <bot@openclaw>
Co-authored-by: openclaw-bot <bot@openclaw.local>
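
The clobber guard described under MUST-FIX #1 can be traced without a database. The sketch below is illustrative only: `marshalPath`, `nullable`, `coalesce`, and `upsert` are hypothetical stand-ins for `marshalResolvedPath`, `nilIfEmpty`, the SQL `COALESCE(excluded.resolved_path, resolved_path)` expression, and the UPSERT as a whole, with a nil `*string` playing the role of SQL NULL.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshalPath returns "" for an empty or all-nil path, otherwise the
// JSON array with null placeholders for unresolved hops.
func marshalPath(rp []*string) string {
	for _, p := range rp {
		if p != nil {
			b, err := json.Marshal(rp)
			if err != nil {
				return ""
			}
			return string(b)
		}
	}
	return ""
}

// nullable turns "" into nil, modelling a SQL NULL bind parameter.
func nullable(s string) *string {
	if s == "" {
		return nil
	}
	return &s
}

// coalesce models COALESCE(excluded.resolved_path, resolved_path).
func coalesce(incoming, existing *string) *string {
	if incoming != nil {
		return incoming
	}
	return existing
}

// upsert returns the column value after ingesting rp over a stored value.
func upsert(stored *string, rp []*string) *string {
	return coalesce(nullable(marshalPath(rp)), stored)
}

func main() {
	b := "bbbbbbbbbb"
	row := upsert(nil, []*string{&b}) // first ingest resolves the hop
	row = upsert(row, []*string{nil}) // re-ingest resolves nothing
	fmt.Println(*row)                 // prints ["bbbbbbbbbb"]
}
```

Had `marshalPath` emitted `[null]` for the second ingest, `nullable` would have passed it through as a non-NULL value and `coalesce` would have replaced the good path with it.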
+32
-6
@@ -81,6 +81,16 @@ type Store struct {
 
 	sampleIntervalSec int
 	backfillWg sync.WaitGroup
+
+	// prefixIdx holds the prefix → pubkey index used by the
+	// resolved_path writer (#1547). Rebuilt on startup and once per
+	// neighbor-edges builder tick (60s).
+	prefixIdx prefixIdxHolder
+
+	// neighborGraph holds the in-memory NeighborGraph snapshot used
+	// by the context-aware resolver (#1560). Rebuilt on startup and
+	// once per neighbor-edges builder tick (60s).
+	neighborGraph neighborGraphHolder
 }
 
 // OpenStore opens or creates a SQLite DB at the given path, applying the
@@ -681,13 +691,14 @@ func (s *Store) prepareStatements() error {
 	}
 
 	s.stmtInsertObservation, err = s.db.Prepare(`
-		INSERT INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp, raw_hex)
-		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+		INSERT INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp, raw_hex, resolved_path)
+		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 		ON CONFLICT(transmission_id, observer_idx, COALESCE(path_json, '')) DO UPDATE SET
-			snr = COALESCE(excluded.snr, snr),
-			rssi = COALESCE(excluded.rssi, rssi),
-			score = COALESCE(excluded.score, score),
-			raw_hex = COALESCE(excluded.raw_hex, raw_hex)
+			snr = COALESCE(excluded.snr, snr),
+			rssi = COALESCE(excluded.rssi, rssi),
+			score = COALESCE(excluded.score, score),
+			raw_hex = COALESCE(excluded.raw_hex, raw_hex),
+			resolved_path = COALESCE(excluded.resolved_path, resolved_path)
 	`)
 	if err != nil {
 		return err
@@ -842,10 +853,25 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
 		epochTs = t.Unix()
 	}
 
+	// Resolve hop prefixes to full pubkeys for `observations.resolved_path`.
+	// Per #1547: this writer was lost in the #1289 refactor and lives in
+	// the ingestor now. Per #1560: use the context-aware resolver so
+	// 1-byte prefix collisions are disambiguated via NeighborGraph
+	// adjacency (anchored on from_pubkey for ADVERTs, previous hop
+	// otherwise). Empty resolved JSON → NULL via nilIfEmpty.
+	resolved := resolvePathWithContext(
+		parsePathArray(data.PathJSON),
+		strings.ToLower(data.FromPubkey),
+		s.neighborGraph.load(),
+		s.prefixIdx.load(),
+	)
+	resolvedJSON := marshalResolvedPath(resolved)
+
 	_, err = s.stmtInsertObservation.Exec(
 		txID, observerIdx, data.Direction,
 		data.SNR, data.RSSI, data.Score,
 		data.PathJSON, epochTs, nilIfEmpty(data.RawHex),
+		nilIfEmpty(resolvedJSON),
 	)
 	if err != nil {
 		s.Stats.WriteErrors.Add(1)
@@ -63,6 +63,16 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() {
 		// returning — first server load needs a fully-populated table.
 		wuStart := time.Now()
 		var wuTotal int
+		// Prime the prefix index (#1547) so the very first
+		// InsertTransmission after startup can resolve hop prefixes.
+		if err := s.RefreshPrefixIndex(); err != nil {
+			log.Printf("[neighbor-build] initial prefix-index refresh error: %v", err)
+		}
+		// Prime the neighbor graph (#1560) so the context-aware resolver
+		// has adjacency data on the very first InsertTransmission.
+		if err := s.RefreshNeighborGraph(); err != nil {
+			log.Printf("[neighbor-build] initial neighbor-graph refresh error: %v", err)
+		}
 		for {
 			n, err := s.buildAndPersistNeighborEdges()
 			if err != nil {
@@ -85,7 +95,18 @@ func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() {
 			select {
 			case <-t.C:
 				start := time.Now()
+				// Refresh the prefix index alongside the edges build
+				// (#1547) so new nodes become resolvable within a tick.
+				if err := s.RefreshPrefixIndex(); err != nil {
+					log.Printf("[neighbor-build] prefix-index refresh error: %v", err)
+				}
 				n, err := s.buildAndPersistNeighborEdges()
+				// Refresh the neighbor-graph snapshot after the edges
+				// build (#1560) so the context-aware resolver picks up
+				// newly persisted adjacencies on the next ingest.
+				if grErr := s.RefreshNeighborGraph(); grErr != nil {
+					log.Printf("[neighbor-build] neighbor-graph refresh error: %v", grErr)
+				}
 				dur := time.Since(start)
 				if err != nil {
 					log.Printf("[neighbor-build] tick error after %s: %v", dur, err)
@@ -0,0 +1,225 @@
+package main
+
+import (
+	"database/sql"
+	"strings"
+	"sync/atomic"
+)
+
+// Context-aware hop resolver — full restore of pre-#1289 hop
+// disambiguation semantics, ported into the ingestor (where the
+// neighbor graph + node directory now live, per #1283).
+//
+// Why this exists (issues #1547 / #1560):
+// The naive `resolvePath` only resolves hops whose prefix is unique
+// in the node table. On a >2K-node mesh the dominant case is 1-byte
+// prefix collisions (multiple candidates per prefix). Without
+// adjacency disambiguation those hops always serialize as `nil`
+// and the resolved_path remains effectively empty for the largest
+// meshes — the very deployments that need it most.
+//
+// Algorithm (ported from cmd/server/store.go @ commit 450236d5
+// `pm.resolveWithContext`, intersected with the disambiguation gating
+// from PR #1144 / #1352):
+//
+// For each hop:
+//  1. Collect candidate pubkeys by prefix-match (existing prefixIndex).
+//  2. len==0 → nil.
+//  3. len==1 → that pubkey.
+//  4. len>1 → filter by NeighborGraph adjacency to the anchor:
+//     - hop 0 anchor = fromPubkey (ADVERT originator) if known;
+//     - hop i (i>0) anchor = previous resolved hop's pubkey;
+//       if the previous hop did not resolve, the chain breaks
+//       and subsequent >1-candidate hops fall to nil.
+//     Surviving candidates after filter:
+//     - exactly 1 → use it
+//     - 0 or >1 → nil (cannot disambiguate further)
+//
+// This is the conservative tier-1 variant. Pre-#1289 also carried
+// tier-2 (geo proximity), tier-3 (GPS preference), tier-4 (obs-count
+// fallback) — those were noisy in practice and are intentionally NOT
+// ported here; this PR is a regression restore, not an enhancement.
+
+// NeighborGraph is the in-memory adjacency snapshot used by the
+// context-aware resolver. Internally lowercased.
+type NeighborGraph struct {
+	adj map[string]map[string]struct{}
+}
+
+// NewNeighborGraph returns an empty graph.
+func NewNeighborGraph() *NeighborGraph {
+	return &NeighborGraph{adj: make(map[string]map[string]struct{})}
+}
+
+// AddEdge adds an undirected adjacency a↔b. Self-loops and empty
+// endpoints are ignored.
+func (g *NeighborGraph) AddEdge(a, b string) {
+	a = strings.ToLower(a)
+	b = strings.ToLower(b)
+	if a == "" || b == "" || a == b {
+		return
+	}
+	if g.adj[a] == nil {
+		g.adj[a] = make(map[string]struct{})
+	}
+	if g.adj[b] == nil {
+		g.adj[b] = make(map[string]struct{})
+	}
+	g.adj[a][b] = struct{}{}
+	g.adj[b][a] = struct{}{}
+}
+
+// IsAdjacent reports whether a and b appear together in any neighbor edge.
+func (g *NeighborGraph) IsAdjacent(a, b string) bool {
+	if g == nil {
+		return false
+	}
+	a = strings.ToLower(a)
+	b = strings.ToLower(b)
+	if a == "" || b == "" {
+		return false
+	}
+	nbrs, ok := g.adj[a]
+	if !ok {
+		return false
+	}
+	_, present := nbrs[b]
+	return present
+}
+
+// neighborGraphHolder caches the graph for the InsertTransmission hot
+// path. atomic.Value lets the 60s rebuild publish without a read-side
+// lock.
+type neighborGraphHolder struct {
+	v atomic.Value // holds *NeighborGraph
+}
+
+func (h *neighborGraphHolder) load() *NeighborGraph {
+	if v := h.v.Load(); v != nil {
+		return v.(*NeighborGraph)
+	}
+	return nil
+}
+
+func (h *neighborGraphHolder) store(g *NeighborGraph) {
+	h.v.Store(g)
+}
+
+// loadNeighborGraph reads neighbor_edges and returns an in-memory
+// adjacency snapshot. Safe to call against a fresh DB (returns an
+// empty graph).
+func loadNeighborGraph(db *sql.DB) (*NeighborGraph, error) {
+	rows, err := db.Query(`SELECT node_a, node_b FROM neighbor_edges`)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	g := NewNeighborGraph()
+	for rows.Next() {
+		var a, b string
+		if err := rows.Scan(&a, &b); err != nil {
+			continue
+		}
+		g.AddEdge(a, b)
+	}
+	return g, nil
+}
+
+// resolveHopWithContext resolves a single hop using NeighborGraph
+// adjacency to the anchor. Returns nil when the hop cannot be
+// disambiguated.
+//
+// exclude is a set of pubkeys to discard from the candidate pool
+// (typically the prior hops already resolved on the path — a packet
+// does not revisit a node).
+//
+// Behavior matrix:
+//	len(candidates) | anchor          | graph | result
+//	0               | —               | —     | nil
+//	1               | —               | —     | candidates[0]
+//	>1              | "" or no graph  | —     | nil
+//	>1              | non-empty       | set   | unique adjacent candidate
+//	                |                 |       | (or nil if 0 or >1 survive)
+func resolveHopWithContext(hop string, anchor string, graph *NeighborGraph, idx prefixIndex, exclude map[string]struct{}) *string {
+	if idx == nil {
+		return nil
+	}
+	h := strings.ToLower(hop)
+	candidates := idx[h]
+	switch len(candidates) {
+	case 0:
+		return nil
+	case 1:
+		pk := candidates[0]
+		if _, skip := exclude[pk]; skip {
+			return nil
+		}
+		return &pk
+	}
+	if graph == nil || anchor == "" {
+		return nil
+	}
+	var match string
+	survivors := 0
+	for _, cand := range candidates {
+		if _, skip := exclude[cand]; skip {
+			continue
+		}
+		if graph.IsAdjacent(anchor, cand) {
+			survivors++
+			if survivors > 1 {
+				return nil
+			}
+			match = cand
+		}
+	}
+	if survivors == 1 {
+		return &match
+	}
+	return nil
+}
+
+// resolvePathWithContext walks the hop list, anchoring hop 0 on
+// fromPubkey (for ADVERTs) and each subsequent hop on the previous
+// resolved hop. Previously-resolved pubkeys (plus the originator) are
+// excluded from later candidate pools so the walk doesn't revisit a
+// node. Returns a `[]*string` shape compatible with
+// marshalResolvedPath (and the all-nil clobber-guard from PR #1548).
+func resolvePathWithContext(hops []string, fromPubkey string, graph *NeighborGraph, idx prefixIndex) []*string {
+	if len(hops) == 0 {
+		return nil
+	}
+	out := make([]*string, len(hops))
+	if idx == nil {
+		return out
+	}
+	prevAnchor := strings.ToLower(fromPubkey)
+	seen := make(map[string]struct{}, len(hops)+1)
+	if prevAnchor != "" {
+		seen[prevAnchor] = struct{}{}
+	}
+	for i, hop := range hops {
+		r := resolveHopWithContext(hop, prevAnchor, graph, idx, seen)
+		out[i] = r
+		if r != nil {
+			lc := strings.ToLower(*r)
+			seen[lc] = struct{}{}
+			prevAnchor = lc
+		} else {
+			prevAnchor = ""
+		}
+	}
+	return out
+}
+
+// RefreshNeighborGraph loads the latest neighbor_edges snapshot and
+// publishes it atomically. Called on startup and once per neighbor-
+// edges builder tick (60s) alongside RefreshPrefixIndex.
+func (s *Store) RefreshNeighborGraph() error {
+	g, err := loadNeighborGraph(s.db)
+	if err != nil {
+		return err
+	}
+	s.neighborGraph.store(g)
+	return nil
+}
@@ -0,0 +1,113 @@
+package main
+
+import (
+	"encoding/json"
+	"strings"
+	"sync/atomic"
+)
+
+// Issue #1547 — resolved_path writer (ingestor-owned).
+//
+// Per the #1283 refactor (server is read-only; ingestor owns the
+// neighbor graph + node directory), the writer that populated
+// `observations.resolved_path` must live here in the ingestor. PR #1289
+// removed the server-side writer without porting it — this restores it.
+//
+// Approach:
+//   - `resolvePath` is a pure function: hop prefixes → full pubkeys
+//     using the in-memory prefix index built from `nodes.public_key`.
+//   - Unique-prefix hops resolve to the full pubkey; ambiguous or
+//     unknown hops resolve to `nil`. The output shape is `[]*string`
+//     (with nulls for unresolved positions) — the JSON serialization
+//     matches what the server's `unmarshalResolvedPath` /
+//     frontend `getResolvedPath` already consume.
+//   - The prefix index is rebuilt on startup and once per neighbor-
+//     builder tick (60s) so new nodes start resolving within a minute
+//     without blocking the MQTT ingest path.
+
+// resolvePath maps each hop prefix to a full pubkey when the index
+// has exactly one candidate; returns nil at that position otherwise.
+// Returns nil for empty/no hops.
+func resolvePath(hops []string, idx prefixIndex) []*string {
+	if len(hops) == 0 {
+		return nil
+	}
+	out := make([]*string, len(hops))
+	if idx == nil {
+		return out
+	}
+	for i, hop := range hops {
+		h := strings.ToLower(hop)
+		candidates := idx[h]
+		if len(candidates) == 1 {
+			pk := candidates[0]
+			out[i] = &pk
+		}
+	}
+	return out
+}
+
+// marshalResolvedPath JSON-encodes a resolved path. Returns "" when
+// the input is empty OR when every element is nil (writer treats "" as
+// SQL NULL).
+//
+// The all-nil case matters because of the UPSERT in InsertTransmission:
+//
+//	resolved_path = COALESCE(excluded.resolved_path, resolved_path)
+//
+// If we emitted "[null,null]" here, nilIfEmpty() would let it through
+// as a non-NULL string and the COALESCE would OVERWRITE a previously
+// stored good resolved_path on re-ingest. Returning "" lets nilIfEmpty
+// produce SQL NULL so the COALESCE falls through to the existing value.
+// See issue #1547 / PR #1548 reviewer findings.
+func marshalResolvedPath(rp []*string) string {
+	if len(rp) == 0 {
+		return ""
+	}
+	allNil := true
+	for _, p := range rp {
+		if p != nil {
+			allNil = false
+			break
+		}
+	}
+	if allNil {
+		return ""
+	}
+	b, err := json.Marshal(rp)
+	if err != nil {
+		return ""
+	}
+	return string(b)
+}
+
+// prefixIdxHolder caches the prefix index for the InsertTransmission
+// hot path. atomic.Value lets the 60s rebuild happen without a lock on
+// the read side.
+type prefixIdxHolder struct {
+	v atomic.Value // holds prefixIndex
+}
+
+func (h *prefixIdxHolder) load() prefixIndex {
+	if v := h.v.Load(); v != nil {
+		return v.(prefixIndex)
+	}
+	return nil
+}
+
+func (h *prefixIdxHolder) store(idx prefixIndex) {
+	h.v.Store(idx)
+}
+
+// RefreshPrefixIndex rebuilds the in-memory prefix index from the
+// nodes table and publishes it atomically. Called on startup and from
+// the neighbor-edges builder tick (60s) so new nodes become resolvable
+// without per-insert DB scans.
+func (s *Store) RefreshPrefixIndex() error {
+	idx, err := buildPrefixIndex(s.db)
+	if err != nil {
+		return err
+	}
+	s.prefixIdx.store(idx)
+	return nil
+}
@@ -0,0 +1,446 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func unmarshalResolvedPathLocal(s string) []*string {
|
||||
if s == "" {
|
||||
return nil
|
||||
}
|
||||
var out []*string
|
||||
if json.Unmarshal([]byte(s), &out) != nil {
|
||||
return nil
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// TestResolvePathPureFunction is a unit test for the pure resolvePath
|
||||
// helper. Asserts:
|
||||
// - unique-prefix hops resolve to the full pubkey
|
||||
// - ambiguous-prefix hops resolve to nil
|
||||
// - unknown-prefix hops resolve to nil
|
||||
// - return slice length equals input hop count
|
||||
//
|
||||
// Regression gate for #1547 (resolved_path stopped being written).
|
||||
func TestResolvePathPureFunction(t *testing.T) {
|
||||
idx := prefixIndex{
|
||||
// "aa" → exactly one pubkey
|
||||
"aa": {"aaaaaaaaaa"},
|
||||
"aaaaaaaaaa": {"aaaaaaaaaa"},
|
||||
// "bb" → exactly one pubkey
|
||||
"bb": {"bbbbbbbbbb"},
|
||||
"bbbbbbbbbb": {"bbbbbbbbbb"},
|
||||
// "cc" → ambiguous (2 candidates)
|
||||
"cc": {"cccccccccc", "ccdddddddd"},
|
||||
"cccccccccc": {"cccccccccc"},
|
||||
}
|
||||
|
||||
got := resolvePath([]string{"aa", "cc", "ff", "bb"}, idx)
|
||||
if len(got) != 4 {
|
||||
t.Fatalf("expected len 4, got %d", len(got))
|
||||
}
|
||||
if got[0] == nil || *got[0] != "aaaaaaaaaa" {
|
||||
t.Errorf("hop[0] aa: want aaaaaaaaaa, got %v", deref(got[0]))
|
||||
}
|
||||
if got[1] != nil {
|
||||
t.Errorf("hop[1] cc: want nil (ambiguous), got %v", deref(got[1]))
|
||||
}
|
||||
if got[2] != nil {
|
||||
t.Errorf("hop[2] ff: want nil (unknown), got %v", deref(got[2]))
|
||||
}
|
||||
if got[3] == nil || *got[3] != "bbbbbbbbbb" {
|
||||
t.Errorf("hop[3] bb: want bbbbbbbbbb, got %v", deref(got[3]))
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolvePathEmptyHops asserts empty/no-path produces nil.
|
||||
func TestResolvePathEmptyHops(t *testing.T) {
|
||||
if got := resolvePath(nil, prefixIndex{}); got != nil {
|
||||
t.Errorf("nil hops: want nil, got %v", got)
|
||||
}
|
||||
if got := resolvePath([]string{}, prefixIndex{}); got != nil {
|
||||
t.Errorf("empty hops: want nil, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestMarshalResolvedPathRoundtrip asserts the JSON shape matches the
|
||||
// server's marshal/unmarshal contract: `[]*string` with nulls for
|
||||
// unresolved hops.
|
||||
func TestMarshalResolvedPathRoundtrip(t *testing.T) {
|
||||
a := "aaaaaaaaaa"
|
||||
b := "bbbbbbbbbb"
|
||||
in := []*string{&a, nil, &b}
|
||||
s := marshalResolvedPath(in)
|
||||
want := `["aaaaaaaaaa",null,"bbbbbbbbbb"]`
|
||||
if s != want {
|
||||
t.Errorf("marshal: want %s, got %s", want, s)
|
||||
}
|
||||
}
|
||||
|
||||
// TestInsertTransmissionWritesResolvedPath is the integration test that
|
||||
// gates the regression introduced by PR #1289 (issue #1547).
|
||||
//
|
||||
// Setup: seed two nodes + one observer + invoke InsertTransmission with
|
||||
// a PacketData whose PathJSON references one of the seeded nodes by
|
||||
// unique 1-byte (2-hex) prefix.
|
||||
//
|
||||
// Assert: the inserted observations row has a non-NULL resolved_path
|
||||
// whose JSON-decoded length equals the hop count, and the resolved
|
||||
// element matches the seeded node's full pubkey.
|
||||
func TestInsertTransmissionWritesResolvedPath(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
dbPath := filepath.Join(dir, "ingest.db")
|
||||
|
||||
store, err := OpenStore(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("OpenStore: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
// Seed nodes with unique 1-byte prefixes.
|
||||
if _, err := store.db.Exec(
|
||||
`INSERT INTO nodes (public_key, name) VALUES (?, ?), (?, ?)`,
|
||||
"aaaaaaaaaa", "from-node",
|
||||
"bbbbbbbbbb", "first-hop",
|
||||
); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Seed one observer (needed so InsertTransmission resolves observer_idx).
|
||||
if err := store.UpsertObserver("obs-1", "observer-1", "", nil); err != nil {
|
||||
t.Fatalf("UpsertObserver: %v", err)
|
||||
}
|
||||
|
||||
// Force the prefix index to be (re)built from the seeded nodes so
|
||||
// the InsertTransmission path has something to resolve against.
|
||||
if err := store.RefreshPrefixIndex(); err != nil {
|
||||
t.Fatalf("RefreshPrefixIndex: %v", err)
|
||||
}
|
||||
|
||||
pkt := &PacketData{
|
||||
RawHex: "deadbeef",
|
||||
Timestamp: "2026-06-01T00:00:00Z",
|
||||
ObserverID: "obs-1",
|
||||
Hash: "h-1547",
|
||||
RouteType: 0,
|
||||
PayloadType: int(payloadADVERT),
|
||||
PathJSON: `["bb"]`,
|
||||
DecodedJSON: "{}",
|
||||
FromPubkey: "aaaaaaaaaa",
|
||||
}
|
||||
if _, err := store.InsertTransmission(pkt); err != nil {
|
||||
t.Fatalf("InsertTransmission: %v", err)
|
||||
}
|
||||
|
||||
var rp sql.NullString
|
||||
if err := store.db.QueryRow(
|
||||
`SELECT resolved_path FROM observations WHERE transmission_id = (SELECT id FROM transmissions WHERE hash = ?)`,
|
||||
"h-1547",
|
||||
).Scan(&rp); err != nil {
|
||||
t.Fatalf("query: %v", err)
|
||||
}
|
||||
if !rp.Valid || rp.String == "" {
|
||||
t.Fatalf("expected non-nil resolved_path, got NULL/empty (regression: #1547)")
|
||||
}
|
||||
got := unmarshalResolvedPathLocal(rp.String)
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("resolved_path length: want 1, got %d (value=%s)", len(got), rp.String)
|
||||
}
|
||||
if got[0] == nil || *got[0] != "bbbbbbbbbb" {
|
||||
t.Errorf("resolved_path[0]: want bbbbbbbbbb, got %v (raw=%s)", deref(got[0]), rp.String)
|
||||
}
|
||||
}
|
||||
|
||||
func deref(p *string) string {
|
||||
if p == nil {
|
||||
return "<nil>"
|
||||
}
|
||||
return *p
|
||||
}
|
||||
|
||||
// ─── #1560: context-aware resolution tests ─────────────────────────────────
|
||||
//
|
||||
// These exercise the post-fix behavior of resolveHopWithContext +
|
||||
// resolvePathWithContext. Until the green commit lands they MUST fail
|
||||
// on assertions (the stub falls back to naive `len==1` and returns nil
|
||||
// on every >1-candidate prefix), proving the gate is real.
|
||||
|
||||
// build5NodeAmbiguousIndex returns a prefixIndex where 3 of 5 nodes
|
||||
// share the 1-byte prefix 0x5c. Pubkeys are the "fingerprints":
|
||||
//
|
||||
// A = "5c000000000000000000000000000000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
|
||||
// B = "5c000000000000000000000000000000bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
|
||||
// C = "5c000000000000000000000000000000cccccccccccccccccccccccccccccccc"
|
||||
// D = "dd000000000000000000000000000000dddddddddddddddddddddddddddddddd"
|
||||
// E = "ee000000000000000000000000000000eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"
|
||||
func build5NodeAmbiguousIndex() (idx prefixIndex, A, B, C, D, E string) {
|
||||
A = "5c000000000000000000000000000000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
|
||||
B = "5c000000000000000000000000000000bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
|
||||
C = "5c000000000000000000000000000000cccccccccccccccccccccccccccccccc"
|
||||
D = "dd000000000000000000000000000000dddddddddddddddddddddddddddddddd"
|
||||
E = "ee000000000000000000000000000000eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"
|
||||
idx = prefixIndex{
|
||||
// 1-byte: 5c → A,B,C (collision); dd → D; ee → E
|
||||
"5c": {A, B, C},
|
||||
"dd": {D},
|
||||
"ee": {E},
|
||||
// full-key entries (so exact-match lookups still resolve)
|
||||
A: {A}, B: {B}, C: {C}, D: {D}, E: {E},
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// TestResolveHopWithContext_OneByteCollision_AdjacencyResolves
|
||||
// asserts the dominant production case (#1560): three nodes share the
|
||||
// 1-byte prefix 0x5c, but NeighborGraph adjacency narrows to exactly
|
||||
// one. The naive resolver returns nil; the context-aware resolver
|
||||
// MUST return the right pubkey.
|
||||
func TestResolveHopWithContext_OneByteCollision_AdjacencyResolves(t *testing.T) {
|
||||
idx, A, B, C, D, E := build5NodeAmbiguousIndex()
|
||||
g := NewNeighborGraph()
|
||||
// chain: A↔B, B↔C, C↔D, D↔E
|
||||
g.AddEdge(A, B)
|
||||
g.AddEdge(B, C)
|
||||
g.AddEdge(C, D)
|
||||
g.AddEdge(D, E)
|
||||
|
||||
// Anchored on A, the only 5c neighbor of A is B.
|
||||
got := resolveHopWithContext("5c", A, g, idx, nil)
|
||||
if got == nil {
|
||||
t.Fatalf("anchor=A, hop=5c: want B (%s), got <nil>", B)
|
||||
}
|
||||
if *got != B {
|
||||
t.Errorf("anchor=A, hop=5c: want %s, got %s", B, *got)
|
||||
}
|
||||
|
||||
// Anchored on B, the only 5c neighbors of B are A and C — but A is
|
||||
// the originator anchor in a path-walk; here we just assert that
|
||||
// 2 surviving candidates → nil (cannot disambiguate further).
|
||||
got = resolveHopWithContext("5c", B, g, idx, nil)
|
||||
if got != nil {
|
||||
t.Errorf("anchor=B, hop=5c: ambiguous (A and C both adjacent); want <nil>, got %s", *got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolvePathWithContext_TwoHopChainAnchoredOnFromNode covers the
|
||||
// canonical 1-byte collision case end-to-end: path = [5c, 5c],
|
||||
// from_node = A → expect [B, C].
|
||||
func TestResolvePathWithContext_TwoHopChainAnchoredOnFromNode(t *testing.T) {
|
||||
idx, A, B, C, _, _ := build5NodeAmbiguousIndex()
|
||||
g := NewNeighborGraph()
|
||||
g.AddEdge(A, B)
|
||||
g.AddEdge(B, C)
|
||||
|
||||
got := resolvePathWithContext([]string{"5c", "5c"}, A, g, idx)
|
||||
if len(got) != 2 {
|
||||
t.Fatalf("len(got)=%d, want 2 (raw=%v)", len(got), got)
|
||||
}
|
||||
if got[0] == nil || *got[0] != B {
|
||||
t.Errorf("hop[0]: want %s, got %v", B, deref(got[0]))
|
||||
}
|
||||
if got[1] == nil || *got[1] != C {
|
||||
t.Errorf("hop[1]: want %s, got %v", C, deref(got[1]))
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolveHopWithContext_NoAdjacencyContext_ReturnsNil asserts the
// negative gate: 3 nodes with shared prefix, no edges between them in
// the graph, hop=[5c] with no usable anchor → nil. Guards against an
// over-eager resolver that just picks the first candidate.
func TestResolveHopWithContext_NoAdjacencyContext_ReturnsNil(t *testing.T) {
	idx, _, _, _, _, _ := build5NodeAmbiguousIndex()
	g := NewNeighborGraph() // empty: no edges
	got := resolveHopWithContext("5c", "", g, idx, nil)
	if got != nil {
		t.Errorf("no anchor + empty graph: want <nil>, got %s", *got)
	}

	// With an anchor that's not adjacent to any candidate, also nil.
	got = resolveHopWithContext("5c", "deadbeefdeadbeef", g, idx, nil)
	if got != nil {
		t.Errorf("non-adjacent anchor: want <nil>, got %s", *got)
	}
}

// TestResolvePathWithContext_AdvertAnchoring asserts ADVERT-style
// anchoring: from_pubkey is the originator, hop[0] is one of its
// 1-byte-prefix neighbors → resolved.
func TestResolvePathWithContext_AdvertAnchoring(t *testing.T) {
	idx, A, B, _, _, _ := build5NodeAmbiguousIndex()
	g := NewNeighborGraph()
	g.AddEdge(A, B) // only B is adjacent to A among the 5c candidates

	got := resolvePathWithContext([]string{"5c"}, A, g, idx)
	if len(got) != 1 {
		t.Fatalf("len(got)=%d, want 1", len(got))
	}
	if got[0] == nil || *got[0] != B {
		t.Errorf("ADVERT anchored on A, hop=5c: want %s, got %v", B, deref(got[0]))
	}
}

// TestResolvePathWithContext_RegressionMultiByteStillWorks asserts no
// regression in the unique-prefix path that PR #1548 already handled
// (the 2/3/4-byte prefix case): unique prefixes resolve regardless of
// graph context. The fixture exercises this with the unique 1-byte
// prefixes dd and ee, no anchor, and a nil graph.
func TestResolvePathWithContext_RegressionMultiByteStillWorks(t *testing.T) {
	idx, _, _, _, D, E := build5NodeAmbiguousIndex()
	// dd and ee are unique 1-byte prefixes, so the naive path still works.
	got := resolvePathWithContext([]string{"dd", "ee"}, "", nil, idx)
	if len(got) != 2 {
		t.Fatalf("len(got)=%d, want 2", len(got))
	}
	if got[0] == nil || *got[0] != D {
		t.Errorf("hop[0] dd: want %s, got %v", D, deref(got[0]))
	}
	if got[1] == nil || *got[1] != E {
		t.Errorf("hop[1] ee: want %s, got %v", E, deref(got[1]))
	}
}

// TestResolvePathWithContext_AllNilContractPreserved asserts the
// all-nil → empty-string clobber-guard contract from PR #1548 still
// holds: an unresolvable path through the context resolver, when fed
// to marshalResolvedPath, MUST yield "" (so nilIfEmpty → SQL NULL
// → COALESCE preserves existing).
func TestResolvePathWithContext_AllNilContractPreserved(t *testing.T) {
	// Empty index → every hop nil.
	got := resolvePathWithContext([]string{"5c", "dd"}, "", nil, prefixIndex{})
	if len(got) != 2 {
		t.Fatalf("len(got)=%d, want 2", len(got))
	}
	for i, p := range got {
		if p != nil {
			t.Errorf("hop[%d]: want <nil>, got %s", i, *p)
		}
	}
	if s := marshalResolvedPath(got); s != "" {
		t.Errorf("all-nil marshal: want \"\", got %q (clobber-guard regression)", s)
	}
}

// TestMarshalResolvedPathAllNilReturnsEmpty is a regression gate for
// the data-loss clobber bug surfaced in PR #1548 review.
//
// When resolvePath fails to resolve ANY hop (every element nil),
// marshalResolvedPath previously emitted "[null,null,...]": a
// non-empty string that bypassed nilIfEmpty and then OVERWROTE the
// existing resolved_path via the COALESCE(excluded, current) UPSERT
// on re-ingest. The fix returns "" so nilIfEmpty produces SQL NULL and
// the COALESCE preserves the existing good value.
func TestMarshalResolvedPathAllNilReturnsEmpty(t *testing.T) {
	cases := []struct {
		name string
		in   []*string
	}{
		{"one-nil", []*string{nil}},
		{"two-nils", []*string{nil, nil}},
		{"three-nils", []*string{nil, nil, nil}},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := marshalResolvedPath(tc.in)
			if got != "" {
				t.Errorf("all-nil input must return \"\" (so nilIfEmpty → SQL NULL → COALESCE preserves existing); got %q", got)
			}
		})
	}

	// Mixed (at least one non-nil) MUST still marshal normally so we
	// don't lose partial resolutions.
	a := "aaaaaaaaaa"
	mixed := marshalResolvedPath([]*string{&a, nil})
	if mixed != `["aaaaaaaaaa",null]` {
		t.Errorf("partial resolution must still serialize; got %q", mixed)
	}
}

// TestInsertTransmissionDoesNotClobberResolvedPathOnAllNil is the
// integration-level regression test for the data-loss bug.
//
// Setup: insert a transmission whose first ingest resolves cleanly to
// a known pubkey. Then re-ingest the SAME transmission after the
// prefix index has been cleared (simulating an empty NeighborGraph /
// all-nil resolution path) and assert the previously stored
// resolved_path is PRESERVED (NOT overwritten to "[null]" or NULL).
//
// Pre-fix behavior: marshalResolvedPath emitted "[null]", nilIfEmpty
// kept it non-NULL, and COALESCE(excluded.resolved_path, resolved_path)
// clobbered the original "bbbbbbbbbb".
func TestInsertTransmissionDoesNotClobberResolvedPathOnAllNil(t *testing.T) {
	dir := t.TempDir()
	dbPath := filepath.Join(dir, "ingest.db")

	store, err := OpenStore(dbPath)
	if err != nil {
		t.Fatalf("OpenStore: %v", err)
	}
	defer store.Close()

	if _, err := store.db.Exec(
		`INSERT INTO nodes (public_key, name) VALUES (?, ?), (?, ?)`,
		"aaaaaaaaaa", "from-node",
		"bbbbbbbbbb", "first-hop",
	); err != nil {
		t.Fatal(err)
	}
	if err := store.UpsertObserver("obs-1", "observer-1", "", nil); err != nil {
		t.Fatalf("UpsertObserver: %v", err)
	}
	if err := store.RefreshPrefixIndex(); err != nil {
		t.Fatalf("RefreshPrefixIndex: %v", err)
	}

	pkt := &PacketData{
		RawHex:      "deadbeef",
		Timestamp:   "2026-06-01T00:00:00Z",
		ObserverID:  "obs-1",
		Hash:        "h-clobber",
		RouteType:   0,
		PayloadType: int(payloadADVERT),
		PathJSON:    `["bb"]`,
		DecodedJSON: "{}",
		FromPubkey:  "aaaaaaaaaa",
	}
	if _, err := store.InsertTransmission(pkt); err != nil {
		t.Fatalf("first InsertTransmission: %v", err)
	}

	// Sanity: first write populated resolved_path.
	var first sql.NullString
	if err := store.db.QueryRow(
		`SELECT resolved_path FROM observations WHERE transmission_id = (SELECT id FROM transmissions WHERE hash = ?)`,
		"h-clobber",
	).Scan(&first); err != nil {
		t.Fatalf("first query: %v", err)
	}
	if !first.Valid || first.String == "" {
		t.Fatalf("precondition failed: first ingest left resolved_path NULL/empty; cannot test clobber")
	}
	wantPreserved := first.String

	// Now wipe the prefix index so re-ingest produces an all-nil
	// resolution: exactly the scenario where the bug clobbers data.
	store.prefixIdx.store(prefixIndex{})

	if _, err := store.InsertTransmission(pkt); err != nil {
		t.Fatalf("re-ingest InsertTransmission: %v", err)
	}

	var after sql.NullString
	if err := store.db.QueryRow(
		`SELECT resolved_path FROM observations WHERE transmission_id = (SELECT id FROM transmissions WHERE hash = ?)`,
		"h-clobber",
	).Scan(&after); err != nil {
		t.Fatalf("post-reingest query: %v", err)
	}
	if !after.Valid {
		t.Fatalf("data loss: resolved_path was NULL'd by re-ingest (was %q)", wantPreserved)
	}
	if after.String != wantPreserved {
		t.Errorf("data loss: resolved_path was clobbered by all-nil re-ingest\n before: %s\n after: %s", wantPreserved, after.String)
	}
}
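
The clobber-guard contract that the last three tests pin down can be illustrated with a minimal, standalone sketch. The names `marshalResolvedPathSketch` and `nilIfEmptySketch` are hypothetical stand-ins, not the functions in `cmd/ingestor`, whose bodies are not shown in this diff; the sketch only assumes the behavior the tests describe: an all-nil hop list marshals to `""`, a partially resolved list marshals to JSON with `null` entries, and an empty string is turned into a nil SQL argument so that `COALESCE(excluded.resolved_path, resolved_path)` keeps the stored value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshalResolvedPathSketch is a hypothetical stand-in for the contract the
// tests assert: if every hop is nil (or there are no hops), return "" rather
// than "[null,...]".
func marshalResolvedPathSketch(hops []*string) string {
	allNil := true
	for _, h := range hops {
		if h != nil {
			allNil = false
			break
		}
	}
	if allNil {
		return ""
	}
	// encoding/json encodes a nil *string as null, so partial
	// resolutions keep their position in the array.
	b, err := json.Marshal(hops)
	if err != nil {
		return ""
	}
	return string(b)
}

// nilIfEmptySketch is a hypothetical stand-in: an empty string becomes a nil
// interface value, which database/sql binds as SQL NULL.
func nilIfEmptySketch(s string) interface{} {
	if s == "" {
		return nil
	}
	return s
}

func main() {
	b := "bbbbbbbbbb"

	// All-nil resolution: the bound argument is nil, so an UPSERT using
	// COALESCE(excluded.resolved_path, resolved_path) keeps the old value.
	allNil := marshalResolvedPathSketch([]*string{nil, nil})
	fmt.Println(nilIfEmptySketch(allNil) == nil)

	// Partial resolution: still serialized, with null for the unresolved hop.
	fmt.Println(marshalResolvedPathSketch([]*string{&b, nil}))
}
```

Returning `""` at the marshal step, rather than filtering in the SQL layer, keeps the guard in one place: any caller that routes the string through an empty-to-NULL helper gets the preserve-on-conflict behavior without extra checks.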