diff --git a/cmd/server/go.mod b/cmd/server/go.mod
index eb801bd5..55a34d7f 100644
--- a/cmd/server/go.mod
+++ b/cmd/server/go.mod
@@ -36,7 +36,6 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
- golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.22.0 // indirect
modernc.org/libc v1.55.3 // indirect
modernc.org/mathutil v1.6.0 // indirect
@@ -47,6 +46,9 @@ require github.com/meshcore-analyzer/prunequeue v0.0.0
replace github.com/meshcore-analyzer/prunequeue => ../../internal/prunequeue
-require github.com/meshcore-analyzer/mbcapqueue v0.0.0
+require (
+ github.com/meshcore-analyzer/mbcapqueue v0.0.0
+ golang.org/x/sync v0.10.0
+)
replace github.com/meshcore-analyzer/mbcapqueue => ../../internal/mbcapqueue
diff --git a/cmd/server/neighbor_api.go b/cmd/server/neighbor_api.go
index fa6c50dd..8fd4cf1b 100644
--- a/cmd/server/neighbor_api.go
+++ b/cmd/server/neighbor_api.go
@@ -452,6 +452,28 @@ func (s *Server) buildNodeInfoMap() map[string]nodeInfo {
}
}
}
+
+ // Fold in nodes.first_seen so callers (e.g. /api/nodes/{pk}/reach)
+ // don't need a per-request single-row SELECT. One bulk scan amortises
+ // across the whole map; missing/NULL rows are silently skipped (the
+ // node may be observer-only or pre-first_seen-schema).
+ fsRows, err := s.db.conn.Query("SELECT LOWER(public_key), COALESCE(first_seen,'') FROM nodes")
+ if err == nil {
+ defer fsRows.Close()
+ for fsRows.Next() {
+ var pk, fs string
+ if fsRows.Scan(&pk, &fs) != nil {
+ continue
+ }
+ if fs == "" {
+ continue
+ }
+ if entry, ok := m[pk]; ok {
+ entry.FirstSeen = fs
+ m[pk] = entry
+ }
+ }
+ }
}
return m
diff --git a/cmd/server/node_reach.go b/cmd/server/node_reach.go
new file mode 100644
index 00000000..07d3fe2f
--- /dev/null
+++ b/cmd/server/node_reach.go
@@ -0,0 +1,664 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "log"
+ "net/http"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/gorilla/mux"
+ "golang.org/x/sync/singleflight"
+)
+
+// reachScanRowLimit hard-caps the windowed observation scan so a hot relay node
+// with weeks of traffic can't pull an unbounded result set into memory. A node
+// with >200k matching observations in the window is far past dashboard scale;
+// beyond the cap the counts are a (still representative) truncation. The LIKE
+// filter is unavoidably a text scan of path_json over the timestamp-narrowed
+// window — an indexed path-token column would need an ingestor-side schema
+// migration (the server is read-only by invariant), so it's a follow-up.
+// var (not const) so tests can lower the cap to exercise the truncation path
+// without inserting 200k rows.
+var reachScanRowLimit = 200000
+
+// pathRow is one observation fed to attributeDirections. path tokens are
+// uppercase hex hop prefixes (as stored in observations.path_json). SNR is a
+// value + validity flag (not *float64) to avoid a heap escape per row.
+type pathRow struct {
+ observerPK string // lowercase pubkey of the observer (may be "")
+ fromPubkey string // lowercase originator pubkey (may be "")
+ payloadType int
+ path []string
+ snr float64
+ snrValid bool
+}
+
+type obsAgg struct {
+ count int
+ snrSum float64
+ snrN int
+}
+
+type dirCounts struct {
+ we map[string]int
+ they map[string]int
+ obs map[string]obsAgg // value map — no per-observer heap alloc
+ relay int
+}
+
+// attributeDirections walks each path and attributes directional evidence for
+// the target node (identified by any token in ourTokens). resolve maps a hop
+// token → a unique relay pubkey ("" when ambiguous/unknown → skipped). ourPK is
+// the target's own pubkey (lowercase) so self-edges are ignored.
+func attributeDirections(rows []pathRow, ourTokens map[string]bool, ourPK string, resolve func(string) string) dirCounts {
+ // Size hint: a small constant covers typical neighbour fan-out (dozens)
+ // without over-allocating ~12.5k buckets on a 100k-row scan. Independent
+ // r2 #4: the old `len(rows)/8+1` was ~250× too large for relays with
+ // modest fan-out.
+ const hint = 64
+ d := dirCounts{
+ we: make(map[string]int, hint),
+ they: make(map[string]int, hint),
+ obs: make(map[string]obsAgg, hint),
+ }
+ for _, r := range rows {
+ n := len(r.path)
+ if n == 0 {
+ continue
+ }
+ hit := false
+ for i, tok := range r.path {
+ if !ourTokens[tok] {
+ continue
+ }
+ hit = true
+ // predecessor → we heard it
+ if i > 0 {
+ if pk := resolve(r.path[i-1]); pk != "" && pk != ourPK {
+ d.we[pk]++
+ }
+ } else if r.payloadType == PayloadADVERT && r.fromPubkey != "" && r.fromPubkey != ourPK {
+ d.we[r.fromPubkey]++
+ }
+ // successor → it heard us; or if we're the last hop, the observer did
+ if i < n-1 {
+ if pk := resolve(r.path[i+1]); pk != "" && pk != ourPK {
+ d.they[pk]++
+ }
+ } else if r.observerPK != "" && r.observerPK != ourPK {
+ d.they[r.observerPK]++
+ a := d.obs[r.observerPK] // value copy; read-modify-write
+ a.count++
+ if r.snrValid {
+ a.snrSum += r.snr
+ a.snrN++
+ }
+ d.obs[r.observerPK] = a
+ }
+ }
+ if hit {
+ d.relay++
+ }
+ }
+ return d
+}
+
+// reliableTokens returns the uppercase hex prefixes (1, 2, 3 byte) of pubkey
+// that are UNIQUE among relay-capable nodes in pm AND resolve to pubkey itself.
+// 1-byte prefixes almost always collide and are excluded. The self-check matters
+// for non-relay targets (companion/sensor): pm only holds path-capable roles, so
+// a companion's prefix could otherwise be "unique" while pointing at an unrelated
+// relay — which would then credit that relay's traffic to the companion.
+func reliableTokens(pubkey string, pm *prefixMap) map[string]bool {
+ out := map[string]bool{}
+ lpk := strings.ToLower(pubkey)
+ for _, l := range []int{2, 4, 6} { // hex chars = 1,2,3 bytes
+ if len(lpk) < l {
+ continue
+ }
+ p := lpk[:l]
+ if pm != nil && len(pm.m[p]) == 1 && strings.EqualFold(pm.m[p][0].PublicKey, pubkey) {
+ out[strings.ToUpper(p)] = true
+ }
+ }
+ return out
+}
+
+// uniqueResolve returns the single relay pubkey (lowercase) for a hop token, or
+// "" when the token resolves to zero or multiple candidates (conservative).
+// Callers should memoize across a request (see newResolver) so the per-hop
+// ToLower + map lookup runs once per distinct token, not once per row.
+func uniqueResolve(pm *prefixMap, token string) string {
+ if pm == nil {
+ return ""
+ }
+ cands := pm.m[strings.ToLower(token)]
+ if len(cands) == 1 {
+ return strings.ToLower(cands[0].PublicKey)
+ }
+ return ""
+}
+
+// parsePathTokens extracts the quoted hex hop tokens from a path_json array
+// (e.g. `["AA","01FA","BB"]`) in a single pass, uppercased. Avoids the
+// json.Unmarshal reflection + per-row interface allocations on the hot scan
+// path. Tokens slice into pj (no copy) except where ToUpper must rewrite a
+// lowercase hop; path_json holds only hex strings, so there are no escapes to
+// worry about. Returns nil for an empty/degenerate array.
+func parsePathTokens(pj string) []string {
+ out := make([]string, 0, 8) // paths are short (a handful of hops)
+ i := 0
+ for {
+ q1 := strings.IndexByte(pj[i:], '"')
+ if q1 < 0 {
+ break
+ }
+ q1 += i
+ rel := strings.IndexByte(pj[q1+1:], '"')
+ if rel < 0 {
+ break
+ }
+ q2 := q1 + 1 + rel
+ out = append(out, strings.ToUpper(pj[q1+1:q2]))
+ i = q2 + 1
+ }
+ return out
+}
+
+// newResolver returns a memoized hop-token → pubkey resolver. Paths reuse the
+// same hop tokens across thousands of rows, so caching collapses the repeated
+// ToLower + prefix-map lookups to once per distinct token.
+func newResolver(pm *prefixMap) func(string) string {
+ cache := make(map[string]string)
+ return func(tok string) string {
+ if pk, ok := cache[tok]; ok {
+ return pk
+ }
+ pk := uniqueResolve(pm, tok)
+ cache[tok] = pk
+ return pk
+ }
+}
+
+type NodeReachInfo struct {
+ Pubkey string `json:"pubkey"`
+ Name string `json:"name"`
+ Role string `json:"role"`
+ Lat *float64 `json:"lat"`
+ Lon *float64 `json:"lon"`
+ FirstSeen string `json:"first_seen"`
+}
+type NodeReachWindow struct {
+ Days int `json:"days"`
+ Since string `json:"since"`
+}
+type NodeReachImportance struct {
+ NeighborDegree int `json:"neighbor_degree"`
+ DegreeRank int `json:"degree_rank"`
+ NodesWithEdges int `json:"nodes_with_edges"`
+ RelayObservations int `json:"relay_observations"`
+ BidirectionalLinks int `json:"bidirectional_links"`
+ DirectObservers int `json:"direct_observers"`
+}
+type NodeReachObserver struct {
+ Pubkey string `json:"pubkey"`
+ Name string `json:"name"`
+ Count int `json:"count"`
+ AvgSNR *float64 `json:"avg_snr"`
+ Lat *float64 `json:"lat"`
+ Lon *float64 `json:"lon"`
+ DistanceKm *float64 `json:"distance_km"`
+}
+type NodeReachLink struct {
+ Pubkey string `json:"pubkey"`
+ Name string `json:"name"`
+ Role string `json:"role"`
+ Lat *float64 `json:"lat"`
+ Lon *float64 `json:"lon"`
+ WeHear int `json:"we_hear"`
+ TheyHear int `json:"they_hear"`
+ Bottleneck int `json:"bottleneck"`
+ Bidir bool `json:"bidir"`
+ DistanceKm *float64 `json:"distance_km"`
+}
+type NodeReachResponse struct {
+ Node NodeReachInfo `json:"node"`
+ Window NodeReachWindow `json:"window"`
+ ReliableTokens []string `json:"reliable_tokens"`
+ Importance NodeReachImportance `json:"importance"`
+ DirectObservers []NodeReachObserver `json:"direct_observers"`
+ Links []NodeReachLink `json:"links"`
+}
+
+func fptr(v float64) *float64 { return &v }
+
+// gpsPtrs returns (lat,lon) pointers, nil when the node has no GPS.
+func gpsPtrs(info nodeInfo) (*float64, *float64) {
+ if !info.HasGPS {
+ return nil, nil
+ }
+ return fptr(info.Lat), fptr(info.Lon)
+}
+
+// clampDays bounds the lookback window to [1,30]; default callers pass 7.
+func clampDays(d int) int {
+ if d < 1 {
+ return 1
+ }
+ if d > 30 {
+ return 30
+ }
+ return d
+}
+
+// --- bounded TTL cache. perf is gated by the time window; this just avoids
+// recompute under dashboard polling. Keyed "pubkey|days". ---
+//
+// reachCacheMax bounds entry count; at ~2KB of marshalled JSON per entry the
+// worst case is well under 1MB, so an entry cap (rather than a byte budget)
+// keeps the bookkeeping trivial while staying memory-safe.
+const (
+ reachCacheTTL = 5 * time.Minute
+ reachCacheMax = 256
+)
+
+type reachCacheEntry struct {
+ at time.Time
+ raw []byte
+}
+
+// reachState bundles per-server reach caches. Was a set of package-level
+// globals — moved onto *Server so two Server instances (tests, future
+// per-listener) don't share observable state (Independent r2 #2).
+type reachState struct {
+ cacheMu sync.RWMutex
+ cache map[string]reachCacheEntry
+ // sf dedups concurrent cold-cache requests for the same key so N
+ // simultaneous callers run the scan + attribution once, not N times.
+ sf singleflight.Group
+
+ degreeMu sync.Mutex
+ degreeSnap *degreeSnapshot
+}
+
+// reachCacheGet returns the cached marshalled JSON for key. The returned slice
+// is shared (not copied): it is treated as immutable — only ever handed to
+// w.Write — so callers MUST NOT mutate it.
+func (s *Server) reachCacheGet(key string) ([]byte, bool) {
+ s.reach.cacheMu.RLock()
+ defer s.reach.cacheMu.RUnlock()
+ e, ok := s.reach.cache[key]
+ if !ok || time.Since(e.at) > reachCacheTTL {
+ return nil, false
+ }
+ return e.raw, true
+}
+
+// isHexPubkey reports whether s is a full 64-char lowercase-hex public key.
+// The handler lowercases input first, so we only accept [0-9a-f].
+func isHexPubkey(s string) bool {
+ if len(s) != 64 {
+ return false
+ }
+ for i := 0; i < len(s); i++ {
+ c := s[i]
+ if !(c >= '0' && c <= '9' || c >= 'a' && c <= 'f') {
+ return false
+ }
+ }
+ return true
+}
+
+func (s *Server) reachCachePut(key string, raw []byte) {
+ s.reach.cacheMu.Lock()
+ defer s.reach.cacheMu.Unlock()
+ if s.reach.cache == nil {
+ s.reach.cache = map[string]reachCacheEntry{}
+ }
+ if _, exists := s.reach.cache[key]; !exists && len(s.reach.cache) >= reachCacheMax {
+ s.evictReachLocked()
+ }
+ s.reach.cache[key] = reachCacheEntry{at: time.Now(), raw: raw}
+}
+
+// evictReachLocked drops expired entries first; if still at the cap it evicts
+// the single oldest entry. Avoids the full-map wipe that thrashed every cached
+// key once the cap was reached. Caller holds s.reach.cacheMu (write).
+func (s *Server) evictReachLocked() {
+ now := time.Now()
+ for k, e := range s.reach.cache {
+ if now.Sub(e.at) > reachCacheTTL {
+ delete(s.reach.cache, k)
+ }
+ }
+ if len(s.reach.cache) < reachCacheMax {
+ return
+ }
+ var oldestKey string
+ var oldestAt time.Time
+ first := true
+ for k, e := range s.reach.cache {
+ if first || e.at.Before(oldestAt) {
+ oldestKey, oldestAt, first = k, e.at, false
+ }
+ }
+ if !first {
+ delete(s.reach.cache, oldestKey)
+ }
+}
+
+func (s *Server) handleNodeReach(w http.ResponseWriter, r *http.Request) {
+ pubkey := strings.ToLower(mux.Vars(r)["pubkey"])
+ // Reject malformed pubkeys up front (cheap defense against cache-key
+ // pollution + wasted work on bogus IDs).
+ if !isHexPubkey(pubkey) {
+ writeError(w, 400, "invalid pubkey: expected 64 hex chars")
+ return
+ }
+ if s.cfg != nil && s.cfg.IsBlacklisted(pubkey) {
+ writeError(w, 404, "Not found")
+ return
+ }
+ days := 7
+ if v := r.URL.Query().Get("days"); v != "" {
+ if n, err := strconv.Atoi(v); err == nil {
+ days = n
+ }
+ }
+ days = clampDays(days)
+
+ cacheKey := pubkey + "|" + strconv.Itoa(days)
+ if raw, ok := s.reachCacheGet(cacheKey); ok {
+ w.Header().Set("Content-Type", "application/json")
+ w.Write(raw)
+ return
+ }
+
+ // singleflight: collapse a thundering herd on a cold key to one scan. The
+ // shared computation uses the triggering request's context; a disconnect
+ // there can cancel the in-flight scan for all waiters (acceptable — the
+ // next request recomputes).
+ v, err, _ := s.reach.sf.Do(cacheKey, func() (interface{}, error) {
+ if raw, ok := s.reachCacheGet(cacheKey); ok {
+ return raw, nil
+ }
+ resp, ok := s.computeNodeReach(r.Context(), pubkey, days)
+ if !ok {
+ return []byte(nil), nil
+ }
+ raw, mErr := json.Marshal(resp)
+ if mErr != nil {
+ log.Printf("[reach] marshal failed for %s: %v", cacheKey, mErr)
+ return nil, mErr
+ }
+ s.reachCachePut(cacheKey, raw)
+ return raw, nil
+ })
+ if err != nil {
+ writeError(w, 500, "reach computation failed")
+ return
+ }
+ raw, _ := v.([]byte)
+ if len(raw) == 0 {
+ writeError(w, 404, "Not found")
+ return
+ }
+ w.Header().Set("Content-Type", "application/json")
+ w.Write(raw)
+}
+
+// computeNodeReach does the read-only scan + assembly. ok=false → 404.
+func (s *Server) computeNodeReach(ctx context.Context, pubkey string, days int) (NodeReachResponse, bool) {
+ if s.store == nil || s.db == nil || s.db.conn == nil {
+ return NodeReachResponse{}, false
+ }
+ nodeMap := s.buildNodeInfoMap()
+ self, found := nodeMap[pubkey]
+ if !found {
+ return NodeReachResponse{}, false
+ }
+ _, pm := s.store.getCachedNodesAndPM()
+ tokens := reliableTokens(pubkey, pm)
+
+ since := time.Now().UTC().Add(-time.Duration(days) * 24 * time.Hour)
+ sinceEpoch := since.Unix()
+
+ var d dirCounts
+ if len(tokens) > 0 {
+ rows := s.scanReachRows(ctx, tokens, sinceEpoch)
+ d = attributeDirections(rows, tokens, pubkey, newResolver(pm))
+ } else {
+ d = dirCounts{we: map[string]int{}, they: map[string]int{}, obs: map[string]obsAgg{}}
+ }
+
+ // importance: neighbor_edges degree + rank (all-time). Served from a
+ // coarse-TTL snapshot so the full UNION+GROUP-BY aggregate runs at most
+ // once per snapshotTTL, not on every cache miss.
+ degree, rank, nodesWithEdges := s.reachDegreeRank(ctx, pubkey)
+
+ // node first_seen comes from nodeInfo (buildNodeInfoMap folds it in via a
+ // single bulk SELECT). Missing → empty string (the node may be
+ // observer-only or pre-first_seen-schema).
+ firstSeen := self.FirstSeen
+
+ // assemble links
+ links := make([]NodeReachLink, 0, len(d.we)+len(d.they))
+ bidir := 0
+ seen := make(map[string]bool, len(d.we)+len(d.they))
+ for pk := range d.we {
+ seen[pk] = true
+ }
+ for pk := range d.they {
+ seen[pk] = true
+ }
+ for pk := range seen {
+ we, they := d.we[pk], d.they[pk]
+ info := nodeMap[pk]
+ lat, lon := gpsPtrs(info)
+ var dist *float64
+ if self.HasGPS && info.HasGPS {
+ dist = fptr(haversineKm(self.Lat, self.Lon, info.Lat, info.Lon))
+ }
+ b := we > 0 && they > 0
+ if b {
+ bidir++
+ }
+ links = append(links, NodeReachLink{
+ Pubkey: pk, Name: info.Name, Role: info.Role, Lat: lat, Lon: lon,
+ WeHear: we, TheyHear: they, Bottleneck: min(we, they), Bidir: b, DistanceKm: dist,
+ })
+ }
+ sort.Slice(links, func(i, j int) bool {
+ if links[i].Bidir != links[j].Bidir {
+ return links[i].Bidir
+ }
+ if links[i].Bottleneck != links[j].Bottleneck {
+ return links[i].Bottleneck > links[j].Bottleneck
+ }
+ return links[i].WeHear+links[i].TheyHear > links[j].WeHear+links[j].TheyHear
+ })
+
+ // direct observers
+ directObs := make([]NodeReachObserver, 0, len(d.obs))
+ for pk, a := range d.obs {
+ info := nodeMap[pk]
+ lat, lon := gpsPtrs(info)
+ var avg, dist *float64
+ if a.snrN > 0 {
+ avg = fptr(a.snrSum / float64(a.snrN))
+ }
+ if self.HasGPS && info.HasGPS {
+ dist = fptr(haversineKm(self.Lat, self.Lon, info.Lat, info.Lon))
+ }
+ directObs = append(directObs, NodeReachObserver{
+ Pubkey: pk, Name: info.Name, Count: a.count, AvgSNR: avg, Lat: lat, Lon: lon, DistanceKm: dist,
+ })
+ }
+ sort.Slice(directObs, func(i, j int) bool { return directObs[i].Count > directObs[j].Count })
+
+ toks := make([]string, 0, len(tokens))
+ for t := range tokens {
+ toks = append(toks, t)
+ }
+ sort.Strings(toks)
+
+ selfLat, selfLon := gpsPtrs(self)
+ return NodeReachResponse{
+ Node: NodeReachInfo{Pubkey: pubkey, Name: self.Name, Role: self.Role,
+ Lat: selfLat, Lon: selfLon, FirstSeen: firstSeen},
+ Window: NodeReachWindow{Days: days, Since: since.Format(time.RFC3339)},
+ ReliableTokens: toks,
+ Importance: NodeReachImportance{
+ NeighborDegree: degree, DegreeRank: rank, NodesWithEdges: nodesWithEdges,
+ RelayObservations: d.relay, BidirectionalLinks: bidir, DirectObservers: len(directObs),
+ },
+ DirectObservers: directObs,
+ Links: links,
+ }, true
+}
+
+// --- neighbor-degree snapshot ---------------------------------------------
+// The degree/rank importance is identical across all reach requests except the
+// pubkey match, so the full neighbor_edges aggregate is computed once and shared
+// behind a coarse TTL. Rank is a binary search over the descending degree list.
+const reachDegreeTTL = 60 * time.Second
+
+type degreeSnapshot struct {
+ at time.Time
+ total int // nodes that have any edge
+ deg map[string]int // lowercase pubkey → neighbour count
+ sortedDesc []int // degrees sorted descending, for rank
+}
+
+func (s *Server) reachDegreeRank(ctx context.Context, pubkey string) (degree, rank, total int) {
+ snap := s.getDegreeSnapshot(ctx)
+ if snap == nil {
+ return 0, 0, 0
+ }
+ degree = snap.deg[pubkey]
+ if degree == 0 {
+ // No edges → not ranked. rank=0 is the documented "off-the-list" value;
+ // avoids the nonsensical "#N+1 / N" the binary search would produce.
+ return 0, 0, snap.total
+ }
+ // rank = 1 + (number of nodes with strictly higher degree). sortedDesc is
+ // descending, so the count of entries > degree is the first index whose
+ // value is <= degree.
+ rank = 1 + sort.Search(len(snap.sortedDesc), func(i int) bool { return snap.sortedDesc[i] <= degree })
+ return degree, rank, snap.total
+}
+
+func (s *Server) getDegreeSnapshot(ctx context.Context) *degreeSnapshot {
+ // Fast path: serve a fresh snapshot under a short lock.
+ s.reach.degreeMu.Lock()
+ if s.reach.degreeSnap != nil && time.Since(s.reach.degreeSnap.at) < reachDegreeTTL {
+ snap := s.reach.degreeSnap
+ s.reach.degreeMu.Unlock()
+ return snap
+ }
+ stale := s.reach.degreeSnap
+ s.reach.degreeMu.Unlock()
+
+ // Rebuild WITHOUT holding the lock so concurrent reach requests aren't
+ // serialized behind the aggregate query. A brief cold-start herd may run a
+ // few redundant queries; the last writer wins.
+ rows, err := s.db.conn.QueryContext(ctx, `
+ SELECT pk, COUNT(*) neigh FROM (
+ SELECT node_a pk FROM neighbor_edges
+ UNION ALL SELECT node_b FROM neighbor_edges
+ ) GROUP BY pk`)
+ if err != nil {
+ log.Printf("[reach] degree snapshot query failed: %v (serving stale)", err)
+ return stale // serve stale on error rather than zeroing
+ }
+ defer rows.Close()
+ deg := make(map[string]int)
+ var sortedDesc []int
+ for rows.Next() {
+ var pk string
+ var neigh int
+ if rows.Scan(&pk, &neigh) != nil {
+ continue
+ }
+ deg[strings.ToLower(pk)] = neigh
+ sortedDesc = append(sortedDesc, neigh)
+ }
+ sort.Sort(sort.Reverse(sort.IntSlice(sortedDesc)))
+ snap := °reeSnapshot{at: time.Now(), total: len(deg), deg: deg, sortedDesc: sortedDesc}
+ s.reach.degreeMu.Lock()
+ s.reach.degreeSnap = snap
+ s.reach.degreeMu.Unlock()
+ return snap
+}
+
+// scanReachRows reads windowed observations whose path contains any reliable
+// token, with the originator + observer + snr needed for attribution. Observer
+// id and originator pubkey are lowercased in SQL (not per row), the path slice
+// is uppercased in place (no second allocation), and the result is hard-capped
+// at reachScanRowLimit.
+func (s *Server) scanReachRows(ctx context.Context, tokens map[string]bool, sinceEpoch int64) []pathRow {
+ if len(tokens) == 0 {
+ return nil // defensive: an empty LIKE chain would render `AND ()` (SQL error)
+ }
+ likes := make([]string, 0, len(tokens))
+ args := []interface{}{sinceEpoch}
+ // Sort tokens so the generated SQL text is byte-stable across requests
+ // with the same token set — preserves the driver's prepared-statement
+ // cache and keeps query plans reproducible (Independent r2 #3).
+ toks := make([]string, 0, len(tokens))
+ for tok := range tokens {
+ toks = append(toks, tok)
+ }
+ sort.Strings(toks)
+ for _, tok := range toks {
+ likes = append(likes, "o.path_json LIKE ?")
+ args = append(args, "%\""+tok+"\"%")
+ }
+ q := `SELECT LOWER(COALESCE(obs.id,'')), LOWER(COALESCE(t.from_pubkey,'')), COALESCE(t.payload_type,0), o.path_json, o.snr
+ FROM observations o
+ JOIN transmissions t ON t.id = o.transmission_id
+ LEFT JOIN observers obs ON obs.rowid = o.observer_idx
+ WHERE o.timestamp >= ? AND (` + strings.Join(likes, " OR ") + `)
+ LIMIT ?`
+ args = append(args, reachScanRowLimit)
+ rows, err := s.db.conn.QueryContext(ctx, q, args...)
+ if err != nil {
+ log.Printf("[reach] scan query failed: %v", err)
+ return nil
+ }
+ defer rows.Close()
+ // Modest preallocation: most nodes return far fewer than the cap, so seed a
+ // reasonable capacity rather than reserving reachScanRowLimit up front.
+ out := make([]pathRow, 0, 2048)
+ var skipped int // malformed/empty rows discarded — surfaced below so ingest bugs aren't silent
+ for rows.Next() {
+ var oid, fpk, pj string
+ var pt int
+ var snr sql.NullFloat64
+ if err := rows.Scan(&oid, &fpk, &pt, &pj, &snr); err != nil {
+ skipped++
+ continue
+ }
+ path := parsePathTokens(pj)
+ if len(path) == 0 {
+ skipped++
+ continue
+ }
+ pr := pathRow{observerPK: oid, fromPubkey: fpk, payloadType: pt, path: path}
+ if snr.Valid {
+ pr.snr = snr.Float64
+ pr.snrValid = true
+ }
+ out = append(out, pr)
+ }
+ if skipped > 0 {
+ log.Printf("[reach] scan discarded %d malformed/empty rows (kept %d)", skipped, len(out))
+ }
+ return out
+}
diff --git a/cmd/server/node_reach_bench_test.go b/cmd/server/node_reach_bench_test.go
new file mode 100644
index 00000000..efce7731
--- /dev/null
+++ b/cmd/server/node_reach_bench_test.go
@@ -0,0 +1,151 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "testing"
+
+ _ "modernc.org/sqlite"
+)
+
+// benchReachDB builds an in-memory DB with nObs observations. matchEvery
+// controls payload mix: 1 = every row contains the "01FA" token (worst case),
+// 2 = every other row matches (the rest carry an unrelated path), etc. This
+// lets benches measure the scan over a realistic mix, not just all-matching.
+func benchReachDB(b *testing.B, nObs, matchEvery int, lowerHops bool) *DB {
+ b.Helper()
+ if matchEvery < 1 {
+ matchEvery = 1
+ }
+ matchPath, fillerPath := `["AA","01FA","BB"]`, `["AA","CC","BB"]`
+ if lowerHops {
+ // Lowercase hops force parsePathTokens' ToUpper to allocate (production
+ // path_json is uppercase; this measures the worst case Carmack flagged).
+ matchPath, fillerPath = `["aa","01fa","bb"]`, `["aa","cc","bb"]`
+ }
+ conn, err := sql.Open("sqlite", ":memory:")
+ if err != nil {
+ b.Fatal(err)
+ }
+ schema := []string{
+ `CREATE TABLE transmissions (id INTEGER PRIMARY KEY, hash TEXT, first_seen TEXT, payload_type INTEGER, from_pubkey TEXT)`,
+ `CREATE TABLE observers (id TEXT PRIMARY KEY, name TEXT)`,
+ `CREATE TABLE observations (id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_idx INTEGER, snr REAL, path_json TEXT, timestamp INTEGER)`,
+ `CREATE INDEX idx_obs_ts ON observations(timestamp)`,
+ }
+ for _, s := range schema {
+ if _, err := conn.Exec(s); err != nil {
+ b.Fatal(err)
+ }
+ }
+ tx, err := conn.Begin()
+ if err != nil {
+ b.Fatal(err)
+ }
+ if _, err := tx.Exec(`INSERT INTO observers (id, name) VALUES ('OBS', 'o')`); err != nil {
+ b.Fatal(err)
+ }
+ for i := 0; i < nObs; i++ {
+ if _, err := tx.Exec(`INSERT INTO transmissions (id, hash, first_seen, payload_type, from_pubkey) VALUES (?,?,?,5,'')`,
+ i, fmt.Sprintf("h%d", i), "2026-06-07T00:00:00Z"); err != nil {
+ b.Fatal(err)
+ }
+ path := fillerPath // non-matching filler
+ if i%matchEvery == 0 {
+ path = matchPath
+ }
+ if _, err := tx.Exec(`INSERT INTO observations (id, transmission_id, observer_idx, snr, path_json, timestamp) VALUES (?,?,1,-7.0,?,?)`,
+ i, i, path, 1000); err != nil {
+ b.Fatal(err)
+ }
+ }
+ if err := tx.Commit(); err != nil {
+ b.Fatal(err)
+ }
+ return &DB{conn: conn}
+}
+
+// BenchmarkNodeReachScan measures the windowed scan + path-decode at increasing
+// scale, all-matching (worst case for memory/allocs).
+func BenchmarkNodeReachScan(b *testing.B) {
+ tokens := map[string]bool{"01FA": true}
+ for _, n := range []int{1000, 10000, 100000} {
+ b.Run(fmt.Sprintf("rows=%d", n), func(b *testing.B) {
+ db := benchReachDB(b, n, 1, false)
+ srv := &Server{db: db}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ rows := srv.scanReachRows(context.Background(), tokens, 0)
+ if len(rows) == 0 {
+ b.Fatal("expected rows")
+ }
+ }
+ })
+ }
+}
+
+// BenchmarkNodeReachScanMixed measures the scan when only half the windowed
+// rows actually contain the token — closer to production path mixes.
+func BenchmarkNodeReachScanMixed(b *testing.B) {
+ tokens := map[string]bool{"01FA": true}
+ db := benchReachDB(b, 100000, 2, false)
+ srv := &Server{db: db}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ rows := srv.scanReachRows(context.Background(), tokens, 0)
+ if len(rows) == 0 {
+ b.Fatal("expected rows")
+ }
+ }
+}
+
+// BenchmarkNodeReachScanLowerCase measures the worst case for path decoding:
+// lowercase hops force parsePathTokens' ToUpper to allocate a new string per
+// hop (production path_json is uppercase, where ToUpper is a no-op). Publishing
+// this alongside the all-uppercase numbers keeps the perf claims honest.
+func BenchmarkNodeReachScanLowerCase(b *testing.B) {
+ tokens := map[string]bool{"01FA": true}
+ db := benchReachDB(b, 100000, 1, true)
+ srv := &Server{db: db}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ rows := srv.scanReachRows(context.Background(), tokens, 0)
+ if len(rows) == 0 {
+ b.Fatal("expected rows")
+ }
+ }
+}
+
+// BenchmarkNodeReachAttribute measures the directional attribution pass over an
+// already-scanned row set (the in-memory hot loop + map building), isolated
+// from DB I/O.
+func BenchmarkNodeReachAttribute(b *testing.B) {
+ tokens := map[string]bool{"01FA": true}
+ db := benchReachDB(b, 100000, 1, false)
+ srv := &Server{db: db}
+ rows := srv.scanReachRows(context.Background(), tokens, 0)
+ if len(rows) == 0 {
+ b.Fatal("expected rows")
+ }
+ resolve := func(tok string) string {
+ switch tok {
+ case "AA":
+ return "aa00000000000000"
+ case "BB":
+ return "bb00000000000000"
+ }
+ return ""
+ }
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ d := attributeDirections(rows, tokens, "01fa326b", resolve)
+ if d.relay == 0 {
+ b.Fatal("expected relay hits")
+ }
+ }
+}
diff --git a/cmd/server/node_reach_endpoint_test.go b/cmd/server/node_reach_endpoint_test.go
new file mode 100644
index 00000000..c740c8d4
--- /dev/null
+++ b/cmd/server/node_reach_endpoint_test.go
@@ -0,0 +1,258 @@
+package main
+
+import (
+ "database/sql"
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/gorilla/mux"
+ _ "modernc.org/sqlite"
+)
+
+func serveReach(srv *Server, path string) *httptest.ResponseRecorder {
+ router := mux.NewRouter()
+ router.HandleFunc("/api/nodes/{pubkey}/reach", srv.handleNodeReach).Methods("GET")
+ req := httptest.NewRequest("GET", path, nil)
+ rr := httptest.NewRecorder()
+ router.ServeHTTP(rr, req)
+ return rr
+}
+
+// pk64 pads a short hex stem to a full 64-char lowercase pubkey.
+func pk64(stem string) string { return stem + strings.Repeat("0", 64-len(stem)) }
+
+// resetReachState clears the per-server reach caches so test order cannot
+// leak observable state between handler tests (and restores after the test).
+// Now operates on *Server (was package globals — Independent r2 #2); accepts
+// a variadic *Server so existing call sites that didn't pass one still
+// compile but the reset is a no-op (used by tests that build the Server
+// fresh and don't need state cleared).
+func resetReachState(t *testing.T, servers ...*Server) {
+ t.Helper()
+ clear := func() {
+ for _, s := range servers {
+ if s == nil {
+ continue
+ }
+ s.reach.cacheMu.Lock()
+ s.reach.cache = map[string]reachCacheEntry{}
+ s.reach.cacheMu.Unlock()
+ s.reach.degreeMu.Lock()
+ s.reach.degreeSnap = nil
+ s.reach.degreeMu.Unlock()
+ }
+ }
+ clear()
+ t.Cleanup(clear)
+}
+
+// newReachIntegrationDB builds a complete observer_idx-schema DB with a target
+// node N, two neighbours A/B, and one observation on obsPath so the HTTP handler
+// exercises real directional attribution. Pass a path that omits N's token to
+// build the zero-reach case (identifiable node, no matching observations).
+func newReachIntegrationDB(t *testing.T, obsPath string) (*DB, string) {
+ t.Helper()
+ conn, err := sql.Open("sqlite", ":memory:")
+ if err != nil {
+ t.Fatal(err)
+ }
+ n := pk64("01fa") // target — unique 2-byte token "01fa"
+ a := pk64("aabb") // predecessor → we hear A
+ b := pk64("ccdd") // successor → B hears us
+ now := time.Now().Unix()
+ stmts := []string{
+ `CREATE TABLE nodes (public_key TEXT, name TEXT, role TEXT, lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, advert_count INTEGER)`,
+ `CREATE TABLE transmissions (id INTEGER PRIMARY KEY, from_pubkey TEXT, payload_type INTEGER)`,
+ `CREATE TABLE observers (id TEXT)`,
+ `CREATE TABLE observations (id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_idx INTEGER, snr REAL, path_json TEXT, timestamp INTEGER)`,
+ `CREATE TABLE neighbor_edges (node_a TEXT, node_b TEXT, count INTEGER)`,
+ }
+ for _, s := range stmts {
+ if _, err := conn.Exec(s); err != nil {
+ t.Fatal(err)
+ }
+ }
+ ins := []struct {
+ q string
+ args []interface{}
+ }{
+ {`INSERT INTO nodes VALUES (?, 'N', 'repeater', 50.9, 5.4, ?, '2026-06-01T00:00:00Z', 3)`, []interface{}{n, "2026-06-07T00:00:00Z"}},
+ {`INSERT INTO nodes VALUES (?, 'A', 'repeater', 51.0, 5.5, ?, '2026-06-01T00:00:00Z', 1)`, []interface{}{a, "2026-06-07T00:00:00Z"}},
+ {`INSERT INTO nodes VALUES (?, 'B', 'repeater', 51.1, 5.6, ?, '2026-06-01T00:00:00Z', 1)`, []interface{}{b, "2026-06-07T00:00:00Z"}},
+ {`INSERT INTO observers (id) VALUES ('OBS1')`, nil},
+ {`INSERT INTO transmissions (id, from_pubkey, payload_type) VALUES (1, '', 5)`, nil},
+ {`INSERT INTO observations (id, transmission_id, observer_idx, snr, path_json, timestamp) VALUES (1,1,1,-7.0,?,?)`, []interface{}{obsPath, now}},
+ }
+ for _, in := range ins {
+ if _, err := conn.Exec(in.q, in.args...); err != nil {
+ t.Fatal(err)
+ }
+ }
+ return &DB{conn: conn, isV3: true}, n
+}
+
+func TestClampDays(t *testing.T) {
+ cases := []struct{ in, want int }{{0, 1}, {-5, 1}, {1, 1}, {7, 7}, {30, 30}, {31, 30}, {999, 30}}
+ for _, c := range cases {
+ if got := clampDays(c.in); got != c.want {
+ t.Errorf("clampDays(%d)=%d want %d", c.in, got, c.want)
+ }
+ }
+}
+
+func TestNodeReach_UnknownNode(t *testing.T) {
+ srv := makeTestServer(makeTestGraph()) // no store/db wired → 404
+ rr := serveReach(srv, "/api/nodes/"+pk64("deadbeef")+"/reach")
+ if rr.Code != http.StatusNotFound {
+ t.Fatalf("status=%d want 404", rr.Code)
+ }
+}
+
+func TestNodeReach_InvalidPubkey(t *testing.T) {
+ srv := makeTestServer(makeTestGraph())
+ for _, bad := range []string{"deadbeef", "xyz", pk64("01") + "zz"} {
+ rr := serveReach(srv, "/api/nodes/"+bad+"/reach")
+ if rr.Code != http.StatusBadRequest {
+ t.Errorf("pubkey %q: status=%d want 400", bad, rr.Code)
+ }
+ }
+}
+
+func TestNodeReach_ValidPubkeyNotInNodes(t *testing.T) {
+ resetReachState(t)
+ db := setupTestDBv2(t)
+ cfg := &Config{}
+ srv := &Server{store: newTestStoreWithDB(t, db, cfg), db: db, cfg: cfg, perfStats: NewPerfStats()}
+ // Syntactically valid pubkey that was never inserted → real 404 path.
+ rr := serveReach(srv, "/api/nodes/"+pk64("beef")+"/reach")
+ if rr.Code != http.StatusNotFound {
+ t.Fatalf("status=%d want 404 (body=%s)", rr.Code, rr.Body.String())
+ }
+}
+
+func TestNodeReach_BlacklistedReturns404(t *testing.T) {
+ pk := pk64("01fa")
+ cfg := &Config{NodeBlacklist: []string{pk}}
+ srv := &Server{cfg: cfg}
+ rr := serveReach(srv, "/api/nodes/"+pk+"/reach")
+ if rr.Code != http.StatusNotFound {
+ t.Fatalf("blacklisted pubkey: status=%d want 404", rr.Code)
+ }
+}
+
+func TestNodeReach_AttributionAndCacheHit(t *testing.T) {
+ resetReachState(t)
+ db, n := newReachIntegrationDB(t, `["AABB","01FA","CCDD"]`)
+ defer db.conn.Close()
+ cfg := &Config{}
+ srv := &Server{store: newTestStoreWithDB(t, db, cfg), db: db, cfg: cfg, perfStats: NewPerfStats()}
+
+ rr := serveReach(srv, "/api/nodes/"+n+"/reach?days=30")
+ if rr.Code != http.StatusOK {
+ t.Fatalf("status=%d want 200 (body=%s)", rr.Code, rr.Body.String())
+ }
+ var resp NodeReachResponse
+ if err := json.Unmarshal(rr.Body.Bytes(), &resp); err != nil {
+ t.Fatalf("bad json: %v", err)
+ }
+ if resp.Importance.RelayObservations < 1 {
+ t.Fatalf("expected ≥1 relay observation, got %d", resp.Importance.RelayObservations)
+ }
+ var weHearA, theyHearB bool
+ for _, l := range resp.Links {
+ if l.Name == "A" && l.WeHear >= 1 {
+ weHearA = true
+ }
+ if l.Name == "B" && l.TheyHear >= 1 {
+ theyHearB = true
+ }
+ }
+ if !weHearA {
+ t.Errorf("expected we_hear≥1 for neighbour A, links=%+v", resp.Links)
+ }
+ if !theyHearB {
+ t.Errorf("expected they_hear≥1 for neighbour B, links=%+v", resp.Links)
+ }
+
+ // Cache hit: the key must now be populated and a second request must 200.
+ if _, ok := srv.reachCacheGet(n + "|30"); !ok {
+ t.Fatalf("expected reach response to be cached under %q", n+"|30")
+ }
+ rr2 := serveReach(srv, "/api/nodes/"+n+"/reach?days=30")
+ if rr2.Code != http.StatusOK || rr2.Body.String() != rr.Body.String() {
+ t.Fatalf("cache-hit response differs: code=%d", rr2.Code)
+ }
+}
+
+// Zero-reach happy path: a node that IS identifiable (has reliable tokens) but
+// whose observations contain none of its tokens must return 200 with empty
+// arrays — NOT 404. A wrong implementation that 404s here passes every other
+// test. (docs/api-spec.md contract.)
+func TestNodeReach_ZeroReach(t *testing.T) {
+ resetReachState(t)
+ db, n := newReachIntegrationDB(t, `["AABB","CCDD"]`) // path omits N's "01FA" token
+ defer db.conn.Close()
+ cfg := &Config{}
+ srv := &Server{store: newTestStoreWithDB(t, db, cfg), db: db, cfg: cfg, perfStats: NewPerfStats()}
+
+ rr := serveReach(srv, "/api/nodes/"+n+"/reach?days=30")
+ if rr.Code != http.StatusOK {
+ t.Fatalf("zero-reach must be 200 not 404, got %d (body=%s)", rr.Code, rr.Body.String())
+ }
+ var resp NodeReachResponse
+ if err := json.Unmarshal(rr.Body.Bytes(), &resp); err != nil {
+ t.Fatalf("bad json: %v", err)
+ }
+ if len(resp.ReliableTokens) == 0 {
+ t.Fatalf("node should still be identifiable (reliable tokens present)")
+ }
+ if len(resp.Links) != 0 || len(resp.DirectObservers) != 0 || resp.Importance.RelayObservations != 0 {
+ t.Fatalf("expected empty reach, got links=%d obs=%d relay=%d",
+ len(resp.Links), len(resp.DirectObservers), resp.Importance.RelayObservations)
+ }
+}
+
+func TestNodeReach_ShapeAndClamp(t *testing.T) {
+ resetReachState(t)
+ db := setupTestDBv2(t)
+ const pk = "01fa326b475800a31105abcb9e4cac000b3e5d9e2b5ba0739981ce8d5f3a6754"
+ mustExecDB(t, db, `INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen, advert_count)
+ VALUES ('`+pk+`', 'BE-Test', 'repeater', 50.9, 5.4, '2026-06-07T00:00:00Z', '2026-06-01T00:00:00Z', 3)`)
+
+ cfg := &Config{}
+ srv := &Server{store: newTestStoreWithDB(t, db, cfg), db: db, cfg: cfg, perfStats: NewPerfStats()}
+
+ rr := serveReach(srv, "/api/nodes/"+pk+"/reach?days=999")
+ if rr.Code != http.StatusOK {
+ t.Fatalf("status=%d want 200 (body=%s)", rr.Code, rr.Body.String())
+ }
+ var resp NodeReachResponse
+ if err := json.Unmarshal(rr.Body.Bytes(), &resp); err != nil {
+ t.Fatalf("bad json: %v", err)
+ }
+ if resp.Window.Days != 30 {
+ t.Fatalf("days not clamped to 30: %d", resp.Window.Days)
+ }
+ if resp.Links == nil || resp.DirectObservers == nil || resp.ReliableTokens == nil {
+ t.Fatalf("array fields must be non-nil (never null)")
+ }
+ if !contains(resp.ReliableTokens, "01FA") {
+ t.Fatalf("expected 01FA reliable token, got %v", resp.ReliableTokens)
+ }
+ if resp.Node.FirstSeen != "2026-06-01T00:00:00Z" {
+ t.Fatalf("first_seen not sourced from nodes table: %q", resp.Node.FirstSeen)
+ }
+}
+
+func contains(s []string, v string) bool {
+ for _, x := range s {
+ if x == v {
+ return true
+ }
+ }
+ return false
+}
diff --git a/cmd/server/node_reach_test.go b/cmd/server/node_reach_test.go
new file mode 100644
index 00000000..f576cf03
--- /dev/null
+++ b/cmd/server/node_reach_test.go
@@ -0,0 +1,291 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "strconv"
+ "testing"
+
+ _ "modernc.org/sqlite"
+)
+
+// newReachScanTestDB builds a minimal observer_idx-schema DB with two rows whose
+// path contains "01FA" and one that does not, for scanReachRows coverage.
+func newReachScanTestDB(t *testing.T) *DB {
+ t.Helper()
+ conn, err := sql.Open("sqlite", ":memory:")
+ if err != nil {
+ t.Fatal(err)
+ }
+ stmts := []string{
+ `CREATE TABLE transmissions (id INTEGER PRIMARY KEY, from_pubkey TEXT, payload_type INTEGER)`,
+ `CREATE TABLE observers (id TEXT)`,
+ `CREATE TABLE observations (id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_idx INTEGER, snr REAL, path_json TEXT, timestamp INTEGER)`,
+ `INSERT INTO observers (id) VALUES ('OBS1')`, // rowid 1
+ `INSERT INTO transmissions (id, from_pubkey, payload_type) VALUES (1,'FF00',4),(2,'',5),(3,'',5)`,
+ `INSERT INTO observations (id, transmission_id, observer_idx, snr, path_json, timestamp) VALUES
+ (1,1,1,-7.0,'["AA","01FA","BB"]',1000),
+ (2,2,1,NULL,'["01FA","CC"]',1000),
+ (3,3,1,-5.0,'["AA","CC"]',1000)`, // no 01FA → excluded
+ }
+ for _, s := range stmts {
+ if _, err := conn.Exec(s); err != nil {
+ t.Fatal(err)
+ }
+ }
+ return &DB{conn: conn}
+}
+
+// resolver that only resolves the exact tokens it's told are unique.
+func testResolver(unique map[string]string) func(string) string {
+ return func(tok string) string {
+ if pk, ok := unique[tok]; ok {
+ return pk
+ }
+ return "" // ambiguous / unknown → skip
+ }
+}
+
+func TestParsePathTokens(t *testing.T) {
+ cases := []struct {
+ in string
+ want []string
+ }{
+ {`["AA","01FA","BB"]`, []string{"AA", "01FA", "BB"}},
+ {`["aa","01fa"]`, []string{"AA", "01FA"}}, // uppercased
+ {`["EFEF"]`, []string{"EFEF"}},
+ {`[]`, nil},
+ {``, nil},
+ {`null`, nil},
+ {`["49A985"]`, []string{"49A985"}}, // 3-byte hop preserved
+ }
+ for _, c := range cases {
+ got := parsePathTokens(c.in)
+ if len(got) != len(c.want) {
+ t.Fatalf("parsePathTokens(%q) = %v, want %v", c.in, got, c.want)
+ }
+ for i := range got {
+ if got[i] != c.want[i] {
+ t.Errorf("parsePathTokens(%q)[%d] = %q, want %q", c.in, i, got[i], c.want[i])
+ }
+ }
+ }
+}
+
+func TestAttributeDirections_PredecessorAndSuccessor(t *testing.T) {
+ // path A(aa) -> N(01fa) -> B(bb): we hear A, B hears us.
+ unique := map[string]string{"AA": "aa00", "BB": "bb00"}
+ rows := []pathRow{{
+ observerPK: "obs1", payloadType: 5,
+ path: []string{"AA", "01FA", "BB"},
+ }}
+ d := attributeDirections(rows, map[string]bool{"01FA": true}, "01fa326b", testResolver(unique))
+ if d.we["aa00"] != 1 {
+ t.Fatalf("we_hear[aa00]=%d want 1", d.we["aa00"])
+ }
+ if d.they["bb00"] != 1 {
+ t.Fatalf("they_hear[bb00]=%d want 1", d.they["bb00"])
+ }
+ if d.relay != 1 {
+ t.Fatalf("relay=%d want 1", d.relay)
+ }
+}
+
+func TestAttributeDirections_LastHopObserverAndAdvertFirstHop(t *testing.T) {
+ rows := []pathRow{
+ // N is last hop → observer heard us directly (+snr).
+ {observerPK: "obsx", payloadType: 5, path: []string{"AA", "01FA"}, snr: 4.0, snrValid: true},
+ // N is first hop of an ADVERT (type 4) → we heard the originator.
+ {observerPK: "obsy", payloadType: 4, fromPubkey: "origin1", path: []string{"01FA", "CC"}},
+ }
+ d := attributeDirections(rows, map[string]bool{"01FA": true}, "01fa326b",
+ testResolver(map[string]string{"CC": "cc00"}))
+ if a, ok := d.obs["obsx"]; !ok || a.count != 1 {
+ t.Fatalf("observer obsx not counted")
+ }
+ if a := d.obs["obsx"]; a.snrN != 1 || a.snrSum != 4.0 {
+ t.Fatalf("observer snr not aggregated")
+ }
+ if d.they["obsx"] != 1 {
+ t.Fatalf("they_hear[obsx]=%d want 1", d.they["obsx"])
+ }
+ if d.we["origin1"] != 1 {
+ t.Fatalf("we_hear[origin1]=%d want 1 (advert first-hop)", d.we["origin1"])
+ }
+ if d.they["cc00"] != 1 {
+ t.Fatalf("they_hear[cc00]=%d want 1 (successor)", d.they["cc00"])
+ }
+}
+
+func TestAttributeDirections_AmbiguousSkippedAndSelfIgnored(t *testing.T) {
+ // No observer, so the last-hop observer branch can't fire — this isolates
+ // the resolve logic. ZZ is unresolved (ambiguous → skipped); the trailing
+ // 01FA resolves to self (ourPK) and must be ignored as a successor.
+ rows := []pathRow{{observerPK: "", payloadType: 5, path: []string{"ZZ", "01FA", "01FA"}}}
+ d := attributeDirections(rows, map[string]bool{"01FA": true}, "01fa326b",
+ testResolver(map[string]string{"01FA": "01fa326b"}))
+ if len(d.we) != 0 || len(d.they) != 0 {
+ t.Fatalf("ambiguous/self should yield no edges, got we=%v they=%v", d.we, d.they)
+ }
+}
+
+func TestAttributeDirections_LastHopWithObserverCountsObserver(t *testing.T) {
+ // Guards the case the previous test deliberately excludes: when our token is
+ // the last hop AND an observer is present, that observer heard us directly.
+ rows := []pathRow{{observerPK: "obs1", payloadType: 5, path: []string{"ZZ", "01FA"}}}
+ d := attributeDirections(rows, map[string]bool{"01FA": true}, "01fa326b",
+ testResolver(map[string]string{}))
+ if a, ok := d.obs["obs1"]; d.they["obs1"] != 1 || !ok || a.count != 1 {
+ t.Fatalf("last-hop observer should be counted, got they=%v", d.they)
+ }
+}
+
+func TestReliableTokens(t *testing.T) {
+ // pm where "01fa" is unique but "01" is shared (collision).
+ nodes := []nodeInfo{
+ {PublicKey: "01fa326b0000", Role: "repeater"},
+ {PublicKey: "0188aaaa0000", Role: "repeater"},
+ }
+ pm := buildPrefixMap(nodes)
+ toks := reliableTokens("01fa326b0000", pm)
+ if !toks["01FA"] {
+ t.Fatalf("expected 01FA reliable, got %v", toks)
+ }
+ if toks["01"] {
+ t.Fatalf("1-byte 01 must be excluded (collision), got %v", toks)
+ }
+}
+
+func TestReliableTokens_CompanionNotMisattributed(t *testing.T) {
+ // pm holds only path-capable relays. A companion target (not in pm) whose
+ // prefix uniquely matches an UNRELATED relay must yield NO reliable tokens —
+ // otherwise that relay's traffic would be credited to the companion.
+ relay := nodeInfo{PublicKey: "aa11000000000000", Role: "repeater"}
+ pm := buildPrefixMap([]nodeInfo{relay})
+ companion := "aa11ffff00000000" // shares 2-byte "aa11" with the relay, differs at byte 3
+ toks := reliableTokens(companion, pm)
+ if len(toks) != 0 {
+ t.Fatalf("companion must get no reliable tokens (prefix points at a relay), got %v", toks)
+ }
+ // Sanity: the relay itself still resolves to its own prefix.
+ if !reliableTokens(relay.PublicKey, pm)["AA11"] {
+ t.Fatalf("relay should keep its own AA11 token")
+ }
+}
+
+func TestScanReachRows_CapTruncates(t *testing.T) {
+ defer func(orig int) { reachScanRowLimit = orig }(reachScanRowLimit)
+ reachScanRowLimit = 1 // newReachScanTestDB has 2 matching rows
+ db := newReachScanTestDB(t)
+ defer db.conn.Close()
+ srv := &Server{db: db}
+ rows := srv.scanReachRows(context.Background(), map[string]bool{"01FA": true}, 0)
+ if len(rows) != 1 {
+ t.Fatalf("scan must hard-cap at reachScanRowLimit (1), got %d rows", len(rows))
+ }
+}
+
+func TestReachCacheEviction_BoundedNotWiped(t *testing.T) {
+ srv := &Server{}
+ resetReachState(t, srv)
+ for i := 0; i < reachCacheMax+50; i++ {
+ srv.reachCachePut("k"+strconv.Itoa(i), []byte("x"))
+ }
+ srv.reach.cacheMu.RLock()
+ n := len(srv.reach.cache)
+ srv.reach.cacheMu.RUnlock()
+ // Bounded at the cap and NOT a full wipe (the old crude reset would leave 1).
+ if n != reachCacheMax {
+ t.Fatalf("cache size after overflow = %d, want %d (bounded, evict-oldest not full-wipe)", n, reachCacheMax)
+ }
+}
+
+func TestReliableTokens_ThreeByteBranch(t *testing.T) {
+ // Two nodes share the 2-byte prefix "01fa" but diverge at byte 3, so the
+ // 3-byte (6-hex) prefix is the shortest unique token. Exercises the l=6
+ // branch that the 1-/2-byte test does not.
+ nodes := []nodeInfo{
+ {PublicKey: "01fa32000000", Role: "repeater"},
+ {PublicKey: "01fa99000000", Role: "repeater"},
+ }
+ pm := buildPrefixMap(nodes)
+ toks := reliableTokens("01fa32000000", pm)
+ if toks["01FA"] {
+ t.Fatalf("2-byte 01FA collides here and must be excluded, got %v", toks)
+ }
+ if !toks["01FA32"] {
+ t.Fatalf("expected 3-byte 01FA32 reliable token, got %v", toks)
+ }
+}
+
+func TestAttributeDirections_NonAdvertFirstHopNotCredited(t *testing.T) {
+ // Our token is the FIRST hop but payloadType is NOT an advert. The
+ // fromPubkey must NOT be credited as we_hear (only adverts carry a
+ // trustworthy originator → first-hop relationship). Guards the
+ // `payloadType == PayloadADVERT` condition on the first-hop branch.
+ rows := []pathRow{{
+ observerPK: "obs1", payloadType: 5, fromPubkey: "origin1",
+ path: []string{"01FA", "BB"},
+ }}
+ d := attributeDirections(rows, map[string]bool{"01FA": true}, "01fa326b",
+ testResolver(map[string]string{"BB": "bb00"}))
+ if d.we["origin1"] != 0 {
+ t.Fatalf("non-advert first hop must not credit we_hear[origin1], got %d", d.we["origin1"])
+ }
+ if len(d.we) != 0 {
+ t.Fatalf("expected no we_hear edges, got %v", d.we)
+ }
+ if d.they["bb00"] != 1 { // successor still counts
+ t.Fatalf("they_hear[bb00]=%d want 1", d.they["bb00"])
+ }
+}
+
+func TestAttributeDirections_ObserverAggregatesAcrossRows(t *testing.T) {
+ // Same observer on the last hop across multiple rows: count and SNR must
+ // accumulate, not overwrite.
+ rows := []pathRow{
+ {observerPK: "obs1", payloadType: 5, path: []string{"AA", "01FA"}, snr: 2.0, snrValid: true},
+ {observerPK: "obs1", payloadType: 5, path: []string{"BB", "01FA"}, snr: 6.0, snrValid: true},
+ }
+ d := attributeDirections(rows, map[string]bool{"01FA": true}, "01fa326b", testResolver(nil))
+ a, ok := d.obs["obs1"]
+ if !ok || a.count != 2 {
+ t.Fatalf("observer count should aggregate to 2, got %+v", a)
+ }
+ if a.snrN != 2 || a.snrSum != 8.0 {
+ t.Fatalf("snr should aggregate (n=2,sum=8), got n=%d sum=%v", a.snrN, a.snrSum)
+ }
+ if d.they["obs1"] != 2 {
+ t.Fatalf("they_hear[obs1]=%d want 2", d.they["obs1"])
+ }
+}
+
+func TestScanReachRows_DecodesRows(t *testing.T) {
+ db := newReachScanTestDB(t)
+ defer db.conn.Close()
+ srv := &Server{db: db}
+ rows := srv.scanReachRows(context.Background(), map[string]bool{"01FA": true}, 0)
+ if len(rows) != 2 {
+ t.Fatalf("expected 2 matching rows (non-matching path excluded), got %d", len(rows))
+ }
+ // Find the advert row (order is not guaranteed without ORDER BY).
+ var got *pathRow
+ for i := range rows {
+ if rows[i].payloadType == 4 {
+ got = &rows[i]
+ }
+ }
+ if got == nil {
+ t.Fatalf("advert row not returned: %+v", rows)
+ }
+ // Fields are decoded + normalized: lowercase observer/from, uppercase path.
+ if got.observerPK != "obs1" || got.fromPubkey != "ff00" {
+ t.Fatalf("decoded fields wrong: %+v", *got)
+ }
+ if len(got.path) != 3 || got.path[1] != "01FA" {
+ t.Fatalf("path not parsed/uppercased: %v", got.path)
+ }
+ if !got.snrValid || got.snr != -7.0 {
+ t.Fatalf("snr not decoded: valid=%v val=%v", got.snrValid, got.snr)
+ }
+}
diff --git a/cmd/server/routes.go b/cmd/server/routes.go
index 135b663e..09b62d4e 100644
--- a/cmd/server/routes.go
+++ b/cmd/server/routes.go
@@ -83,6 +83,12 @@ type Server struct {
// bypass branch was exercised without standing up a full DB/store.
// Production code MUST leave this nil. #1483 follow-up.
computeNeighborGraphResponseFn func(minCount int, minScore float64, region, role string) NeighborGraphResponse
+
+ // Per-server state for /api/nodes/{pk}/reach: TTL cache + singleflight
+ // + cached neighbor_edges degree snapshot. Lives on *Server (not as
+ // package globals) so multiple instances don't share observable
+ // state. Initialised lazily on first use; see node_reach.go.
+ reach reachState
}
// PerfStats tracks request performance.
@@ -238,6 +244,10 @@ func (s *Server) RegisterRoutes(r *mux.Router) {
r.HandleFunc("/api/nodes/{pubkey}/clock-skew", s.handleNodeClockSkew).Methods("GET")
r.HandleFunc("/api/observers/clock-skew", s.handleObserverClockSkew).Methods("GET")
r.HandleFunc("/api/nodes/{pubkey}/neighbors", s.handleNodeNeighbors).Methods("GET")
+ // Keep specific sub-routes (…/reach) registered BEFORE the catch-all
+ // /api/nodes/{pubkey} — mux matches in registration order, so reordering
+ // this below the catch-all would shadow it and break the route.
+ r.HandleFunc("/api/nodes/{pubkey}/reach", s.handleNodeReach).Methods("GET")
r.HandleFunc("/api/nodes/{pubkey}", s.handleNodeDetail).Methods("GET")
r.HandleFunc("/api/nodes", s.handleNodes).Methods("GET")
diff --git a/cmd/server/store.go b/cmd/server/store.go
index bfd71872..870b7d63 100644
--- a/cmd/server/store.go
+++ b/cmd/server/store.go
@@ -6049,7 +6049,8 @@ type nodeInfo struct {
Lon float64
HasGPS bool
LastSeen time.Time
- ObservationCount int // count of advertisements/observations; used for tier-3 tiebreak in resolveWithContext
+ FirstSeen string // RFC3339; populated by buildNodeInfoMap for callers that need it (e.g. /api/nodes/{pk}/reach)
+ ObservationCount int // count of advertisements/observations; used for tier-3 tiebreak in resolveWithContext
}
// schemaDegradationLogged is now a PacketStore field (see type definition) so
diff --git a/docs/api-spec.md b/docs/api-spec.md
index 353c3606..e964efe1 100644
--- a/docs/api-spec.md
+++ b/docs/api-spec.md
@@ -23,6 +23,7 @@
- [GET /api/nodes/:pubkey/health](#get-apinodespubkeyhealth)
- [GET /api/nodes/:pubkey/paths](#get-apinodespubkeypaths)
- [GET /api/nodes/:pubkey/analytics](#get-apinodespubkeyanalytics)
+- [GET /api/nodes/:pubkey/reach](#get-apinodespubkeyreach)
- [GET /api/packets](#get-apipackets)
- [GET /api/packets/timestamps](#get-apipacketstimestamps)
- [GET /api/packets/:id](#get-apipacketsid)
@@ -672,6 +673,82 @@ Per-node analytics over a time range.
---
+## GET /api/nodes/:pubkey/reach
+
+Per-node RF reach report (two-way link quality). Computes **directional** link counts from raw
+path adjacency (a flood path is recorded origin→observer, so in `[A,B]` B received
+A directly). A link is **bidirectional** when both directions have observations;
+the **bottleneck** (weaker direction) rates two-way stability. Read-only; bounded
+to a recent window. Identifies nodes only by **unique 2–3 byte** path prefixes
+(1-byte prefixes collide and are excluded).
+
+### Query Parameters
+
+| Param | Type | Default | Description |
+|--------|--------|---------|--------------------------------------|
+| `days` | number | `7` | Lookback window, clamped 1–30 |
+
+### Response `200`
+
+```jsonc
+{
+ "node": { "pubkey": string, "name": string, "role": string,
+ "lat": number | null, "lon": number | null, "first_seen": string (ISO) },
+ "window": { "days": number, "since": string (ISO) },
+ "reliable_tokens": [string], // uppercase hex prefixes unique to this node ([] if unidentifiable)
+ "importance": {
+ "neighbor_degree": number, // all-time, from neighbor_edges
+ "degree_rank": number, // 1-based rank among nodes with edges
+ "nodes_with_edges": number,
+ "relay_observations": number, // windowed obs with this node anywhere in path
+ "bidirectional_links":number,
+ "direct_observers": number
+ },
+ "direct_observers": [
+ { "pubkey": string, "name": string, "count": number,
+ "avg_snr": number | null, "lat": number | null, "lon": number | null,
+ "distance_km": number | null }
+ ],
+ "links": [
+ { "pubkey": string, "name": string, "role": string,
+ "lat": number | null, "lon": number | null,
+ "we_hear": number, "they_hear": number,
+ "bottleneck": number, "bidir": boolean,
+ "distance_km": number | null }
+ ]
+}
+```
+
+`reliable_tokens: []` means the node has no unique 1–3 byte prefix and cannot be
+reliably identified in paths; `links`/`direct_observers` will be empty.
+
+### Caching & limits
+
+- **Response cache:** computed responses are cached for **5 minutes** per
+ `pubkey|days`. Polling faster than that returns an identical body — clients
+ should not expect sub-5-minute freshness.
+- **Scan cap:** the windowed path scan is hard-capped at **200,000** rows. A node
+ with more matching observations in the window is truncated (counts become a
+ representative sample rather than exhaustive).
+
+### Response `400`
+
+Returned when `:pubkey` is not a 64-char hex string.
+
+```json
+{ "error": "invalid pubkey: expected 64 hex chars" }
+```
+
+### Response `404`
+
+Returned when the node is unknown or blacklisted.
+
+```json
+{ "error": "Not found" }
+```
+
+---
+
## GET /api/packets
Paginated packet (transmission) list with filtering.
diff --git a/public/app.js b/public/app.js
index 09c5fc22..f41f1cf8 100644
--- a/public/app.js
+++ b/public/app.js
@@ -908,6 +908,11 @@ function navigate() {
basePage = 'node-analytics';
}
+ // Special route: nodes/PUBKEY/reach → node-reach page
+ if (basePage === 'nodes' && routeParam && routeParam.endsWith('/reach')) {
+ basePage = 'node-reach';
+ }
+
// Special route: packet/123 → standalone packet detail page
if (basePage === 'packet' && routeParam) {
basePage = 'packet-detail';
diff --git a/public/index.html b/public/index.html
index 1a8b5454..dd566f03 100644
--- a/public/index.html
+++ b/public/index.html
@@ -39,6 +39,7 @@
+
+
+