meshcore-analyzer/cmd/server/neighbor_api.go
Kpa-clawbot 13bdee57d4 perf: P0 hot-path fixes (observers, neighbor-graph, observer-analytics) (#1481) (#1483)
## What

Three of the four P0s from #1481's scale-test findings. Each cuts a distinct
hot path; together they target `/api/observers`,
`/api/analytics/neighbor-graph`, and `/api/observers/{id}/analytics`, the
top three live offenders.

### P0-1: 5-min atomic-pointer cache for default neighbor-graph response
- Live p95 10.8s on the most-trafficked organic endpoint.
- Background recomputer (5-min cadence per operator directive) builds the
  default-filter (`minCount=5 minScore=0.1`, no region, no role)
  `NeighborGraphResponse` and stores it via `atomic.Pointer`.
- `handleNeighborGraph` short-circuits on the default shape; non-default
  filters take the extracted `computeNeighborGraphResponse` path (identical
  semantics to the previous inline build).
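
A minimal sketch of the atomic-pointer cache pattern described in P0-1, assuming Go 1.19+ for `atomic.Pointer[T]`. The names `graphSnapshot`, `graphCache`, `recompute`, `load`, and `start` are hypothetical and are not the identifiers used in this PR; the real recomputer may differ in error handling and shutdown.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync/atomic"
	"time"
)

// graphSnapshot is a hypothetical cache entry: pre-encoded JSON plus build time.
type graphSnapshot struct {
	raw     []byte
	builtAt time.Time
}

// graphCache holds the latest snapshot. Readers never take a lock.
type graphCache struct {
	ptr atomic.Pointer[graphSnapshot]
}

// recompute builds a fresh snapshot and publishes it with one atomic store.
// On a marshal error the previous snapshot keeps being served.
func (c *graphCache) recompute(build func() any) error {
	raw, err := json.Marshal(build())
	if err != nil {
		return err
	}
	c.ptr.Store(&graphSnapshot{raw: raw, builtAt: time.Now()})
	return nil
}

// load returns the cached bytes and their age, or ok=false before the first build.
func (c *graphCache) load() (raw []byte, age time.Duration, ok bool) {
	snap := c.ptr.Load()
	if snap == nil {
		return nil, 0, false
	}
	return snap.raw, time.Since(snap.builtAt), true
}

// start recomputes immediately, then once per interval until stop is closed.
func (c *graphCache) start(interval time.Duration, build func() any, stop <-chan struct{}) {
	go func() {
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			_ = c.recompute(build)
			select {
			case <-t.C:
			case <-stop:
				return
			}
		}
	}()
}

func main() {
	var c graphCache
	if _, _, ok := c.load(); !ok {
		fmt.Println("cold cache: fall through to the compute path")
	}
	_ = c.recompute(func() any { return map[string]int{"total_edges": 3} })
	raw, _, _ := c.load()
	fmt.Println(string(raw))
}
```

Storing the already-encoded bytes means a cache hit costs one pointer load and one write, which is why the handler can skip both the graph build and the JSON encode.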

### P0-2: cache parsed `StoreObs.Timestamp` + drop RLock window
- `handleObserverAnalytics` re-parsed the RFC3339 timestamp three times
  per observation, for 60k+ observations per active observer, under
  `s.store.mu.RLock` — blocking writers for the full scan.
- `StoreObs.ParsedTime()` parses once via `sync.Once` (mirrors
  `StoreTx.ParsedDecoded`).
- Handler snapshots the `byObserver[id]` pointer slice, releases the
  RLock immediately, then iterates locally.

### P0-3: 30s cache for `/api/observers` + sargable `IN` + covering index
- Three SQL queries on every request → ~1.7s p50 at 50-concurrent.
- Atomic-pointer 30s cache for the default (no-filter) query.
- `GetNodeLocationsByKeys` drops `LOWER(public_key) IN (...)` (non-sargable);
  callers pre-lowercase in Go and the plain `IN` matches the existing
  `public_key` index.
- New ingestor migration `obs_observer_ts_idx_v1` adds composite index
  `idx_observations_observer_idx_timestamp(observer_idx, timestamp)` so
  `GetObserverPacketCounts` can resolve its GROUP-BY + range filter from
  the index without scanning the 1.9M-row observations table.
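
To illustrate the sargable `IN` change, here is a hedged sketch of building the query with keys lowercased in Go instead of in SQL. The helper name `buildKeyLookup`, the `nodes` table, and the `lat`/`lon` columns are assumptions for illustration only; it also assumes stored `public_key` values are already lowercase, which the plain `IN` relies on.

```go
package main

import (
	"fmt"
	"strings"
)

// buildKeyLookup returns a parameterized query and its arguments.
// Keys are lowercased here so the SQL can compare the bare column,
// which lets the database use an ordinary index on public_key.
// Table and column names are hypothetical.
func buildKeyLookup(keys []string) (string, []any) {
	if len(keys) == 0 {
		return "", nil
	}
	args := make([]any, len(keys))
	for i, k := range keys {
		args[i] = strings.ToLower(k)
	}
	placeholders := strings.TrimSuffix(strings.Repeat("?,", len(keys)), ",")
	query := "SELECT public_key, lat, lon FROM nodes WHERE public_key IN (" + placeholders + ")"
	return query, args
}

func main() {
	query, args := buildKeyLookup([]string{"AB12", "cd34"})
	fmt.Println(query)
	fmt.Println(args...)
}
```

Wrapping the column in `LOWER(...)` forces the function to be evaluated per row, so normalizing the inputs on the caller side is what makes the index usable.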

### P0-4: deferred
`perfMiddleware`'s global mutex was claimed to serialize every API request.
A direct test (`50 concurrent requests through the middleware, handler
sleeps 20ms each`) shows total elapsed ≈ 25ms, not 1s: the lock is held
only for the post-handler bookkeeping (a few µs). Real impact is below
measurement noise. Skipping to avoid invasive churn on PerfStats consumers
without a demonstrable win.

## Test plan

Red → green per P0:
- `observers_cache_test.go`: handler reads `s.observersCache` before SQL,
  TTL boundary, atomic.Pointer (no mutex contention).
- `storeobs_parsedtime_test.go`: parses three timestamp shapes, caches
  result, no race under concurrent readers.
- `neighbor_graph_cache_test.go`: handler serves from atomic pointer
  when set, bypasses cache when `?region=` (or any non-default filter)
  is passed.

Full server + ingestor suites pass: `go test -count=1 ./...`.

## Perf proof

Before/after p50/p95/p99 (50 requests × 50 concurrent) against prod (before)
and staging once CI deploys (after) will be posted as a PR comment per the
operator's "no merge without proof of improvement" gate.

Closes #1481


## TDD exemption — P0-1 and P0-2 (net-new surfaces, AGENTS.md)

Per CoreScope `AGENTS.md` § "Exemptions": **net-new code surfaces with no
prior tests to break** may land tests in the same PR without a strict
test-first → impl commit split.

- **P0-1 (neighbor-graph atomic-pointer cache)** — `neighborGraphCache`,
  `recomputeNeighborGraphCache`, `loadNeighborGraphCacheBytes`,
  `startNeighborGraphRecomputer` and the default-shape short-circuit in
  `handleNeighborGraph` were brand-new code with no pre-existing
  assertions covering them. There was no green test to first turn red.
- **P0-2 (cached `StoreObs.Timestamp` + RLock window drop)** —
  `StoreObs.ParsedTime()` and the snapshot+release pattern in
  `handleObserverAnalytics` were new surfaces; the prior code did the
  parse inline per call with no behavioural test to break.

P0-3 was authored properly red-then-green (commit `6e63ec6a` red, then
`83ae129b` green) and does NOT use this exemption.

## Default-filter detection vs frontend reality (#1483 follow-up)

The Neighbor Graph analytics tab in `public/analytics.js` fetches
`/analytics/neighbor-graph?min_count=1&min_score=0` because the
client-side sliders need the full edge set to filter from. That shape
did NOT match the `(5, 0.1)` cached default, so the UI tab still paid
the cold compute cost despite #1481 P0-1.

The #1483 follow-up commit caches BOTH shapes in the same recomputer pass:
- `(minCount=5, minScore=0.1, no region, no role)` — `live.js`
  affinity-scoring consumer.
- `(minCount=1, minScore=0, no region, no role)` — analytics tab.

Both are served from `atomic.Pointer` with an `X-Cache-Age-Seconds`
header. The per-shape cost in the background goroutine is roughly
linear in edge count; total recompute time stays well under the
5-minute cadence on prod-scale graphs.

---------

Co-authored-by: openclaw-bot <bot@openclaw.dev>
Co-authored-by: mc-bot <mc-bot@users.noreply.github.com>
2026-05-29 02:42:21 -07:00

package main
import (
"encoding/json"
"net/http"
"sort"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/gorilla/mux"
)
// ─── Neighbor API response types ───────────────────────────────────────────────
type NeighborResponse struct {
Node string `json:"node"`
Neighbors []NeighborEntry `json:"neighbors"`
TotalObservations int `json:"total_observations"`
}
type NeighborEntry struct {
Pubkey *string `json:"pubkey"`
Prefix string `json:"prefix"`
Name *string `json:"name"`
Role *string `json:"role"`
Count int `json:"count"`
Score float64 `json:"score"`
FirstSeen string `json:"first_seen"`
LastSeen string `json:"last_seen"`
AvgSNR *float64 `json:"avg_snr"`
DistanceKm *float64 `json:"distance_km,omitempty"`
Observers []string `json:"observers"`
Ambiguous bool `json:"ambiguous"`
Unresolved bool `json:"unresolved,omitempty"`
Candidates []CandidateEntry `json:"candidates,omitempty"`
}
type CandidateEntry struct {
Pubkey string `json:"pubkey"`
Name string `json:"name"`
Role string `json:"role"`
}
type NeighborGraphResponse struct {
Nodes []GraphNode `json:"nodes"`
Edges []GraphEdge `json:"edges"`
Stats GraphStats `json:"stats"`
}
type GraphNode struct {
Pubkey string `json:"pubkey"`
Name string `json:"name"`
Role string `json:"role"`
NeighborCount int `json:"neighbor_count"`
}
type GraphEdge struct {
Source string `json:"source"`
Target string `json:"target"`
Weight int `json:"weight"`
Score float64 `json:"score"`
Bidirectional bool `json:"bidirectional"`
AvgSNR *float64 `json:"avg_snr"`
Ambiguous bool `json:"ambiguous"`
}
type GraphStats struct {
TotalNodes int `json:"total_nodes"`
TotalEdges int `json:"total_edges"`
AmbiguousEdges int `json:"ambiguous_edges"`
AvgClusterSize float64 `json:"avg_cluster_size"`
RejectedEdgesGeoFar uint64 `json:"rejected_edges_geo_far"` // edges dropped at build time by the geo-implausibility filter (#1228)
}
// ─── Graph accessor on Server ──────────────────────────────────────────────────
// getNeighborGraph returns the current neighbor graph, rebuilding if stale.
func (s *Server) getNeighborGraph() *NeighborGraph {
s.neighborMu.Lock()
defer s.neighborMu.Unlock()
if s.neighborGraph == nil || s.neighborGraph.IsStale() {
if s.store != nil {
opts := BuildOptions{MaxEdgeKm: DefaultMaxEdgeKm}
if s.cfg != nil {
opts.EnableLog = s.cfg.DebugAffinity
opts.MaxEdgeKm = s.cfg.NeighborMaxEdgeKm()
}
s.neighborGraph = BuildFromStoreWithOptions(s.store, opts)
} else {
s.neighborGraph = NewNeighborGraph()
}
}
return s.neighborGraph
}
// ─── Handlers ──────────────────────────────────────────────────────────────────
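// handleNodeNeighbors returns the neighbors of a single node, filtered by the
// min_count, min_score, and include_ambiguous query parameters and sorted by
// score descending. Blacklisted pubkeys get a 404.
func (s *Server) handleNodeNeighbors(w http.ResponseWriter, r *http.Request) {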
pubkey := strings.ToLower(mux.Vars(r)["pubkey"])
if s.cfg != nil && s.cfg.IsBlacklisted(pubkey) {
writeError(w, 404, "Not found")
return
}
minCount := 1
if v := r.URL.Query().Get("min_count"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
minCount = n
}
}
minScore := 0.0
if v := r.URL.Query().Get("min_score"); v != "" {
if f, err := strconv.ParseFloat(v, 64); err == nil {
minScore = f
}
}
includeAmbiguous := true
if v := r.URL.Query().Get("include_ambiguous"); v == "false" {
includeAmbiguous = false
}
graph := s.getNeighborGraph()
edges := graph.Neighbors(pubkey)
now := time.Now()
// Build node info lookup for names/roles/coordinates.
nodeMap := s.buildNodeInfoMap()
// Look up the queried node's GPS coordinates for distance computation.
var srcInfo nodeInfo
if nodeMap != nil {
srcInfo = nodeMap[pubkey]
}
var entries []NeighborEntry
totalObs := 0
for _, e := range edges {
score := e.Score(now)
if e.Count < minCount || score < minScore {
continue
}
if e.Ambiguous && !includeAmbiguous {
continue
}
totalObs += e.Count
// Determine the "other" node (neighbor of the queried pubkey).
neighborPK := e.NodeA
if strings.EqualFold(neighborPK, pubkey) {
neighborPK = e.NodeB
}
entry := NeighborEntry{
Prefix: e.Prefix,
Count: e.Count,
Score: score,
FirstSeen: e.FirstSeen.UTC().Format(time.RFC3339),
LastSeen: e.LastSeen.UTC().Format(time.RFC3339),
Ambiguous: e.Ambiguous,
Observers: observerList(e.Observers),
}
if e.SNRCount > 0 {
avg := e.AvgSNR()
entry.AvgSNR = &avg
}
if e.Ambiguous {
if len(e.Candidates) == 0 {
entry.Unresolved = true
}
for _, cpk := range e.Candidates {
ce := CandidateEntry{Pubkey: cpk}
if info, ok := nodeMap[strings.ToLower(cpk)]; ok {
ce.Name = info.Name
ce.Role = info.Role
}
entry.Candidates = append(entry.Candidates, ce)
}
} else if neighborPK != "" {
entry.Pubkey = &neighborPK
if info, ok := nodeMap[strings.ToLower(neighborPK)]; ok {
entry.Name = &info.Name
entry.Role = &info.Role
if srcInfo.HasGPS && info.HasGPS {
d := haversineKm(srcInfo.Lat, srcInfo.Lon, info.Lat, info.Lon)
entry.DistanceKm = &d
}
}
}
entries = append(entries, entry)
}
// Defense-in-depth: deduplicate unresolved prefix entries that match
// resolved pubkey entries in the same neighbor set (fixes #698).
entries = dedupPrefixEntries(entries)
// Sort by score descending.
sort.Slice(entries, func(i, j int) bool {
return entries[i].Score > entries[j].Score
})
if entries == nil {
entries = []NeighborEntry{}
}
resp := NeighborResponse{
Node: pubkey,
Neighbors: entries,
TotalObservations: totalObs,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
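// handleNeighborGraph serves the full neighbor graph. The two pre-computed
// filter shapes are answered from cached, pre-encoded snapshots; any other
// combination of min_count, min_score, region, and role is computed on demand.
func (s *Server) handleNeighborGraph(w http.ResponseWriter, r *http.Request) {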
minCount := 5
if v := r.URL.Query().Get("min_count"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
minCount = n
}
}
minScore := 0.1
if v := r.URL.Query().Get("min_score"); v != "" {
if f, err := strconv.ParseFloat(v, 64); err == nil {
minScore = f
}
}
region := r.URL.Query().Get("region")
roleFilter := strings.ToLower(r.URL.Query().Get("role"))
// #1481 P0-1: serve the default-shape request from the atomic-pointer
// snapshot maintained by the background recomputer (5 min cadence).
// Default shape: minCount=5, minScore=0.1, no region, no role.
if minCount == 5 && minScore == 0.1 && region == "" && roleFilter == "" {
if raw, age, ok := s.loadNeighborGraphCacheBytes(); ok {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Cache-Age-Seconds", cacheAgeSecondsHeader(age))
w.Write(raw)
return
}
}
// #1483: also serve the (minCount=1, minScore=0) shape from cache —
// that's what the analytics UI tab fetches so it can client-side
// slider over the full edge set. Without this branch the user-
// visible analytics tab still hit the cold compute path.
if minCount == 1 && minScore == 0 && region == "" && roleFilter == "" {
if raw, age, ok := s.loadNeighborGraphCacheBytesUnfiltered(); ok {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Cache-Age-Seconds", cacheAgeSecondsHeader(age))
w.Write(raw)
return
}
}
resp := s.computeNeighborGraphResponseDispatch(minCount, minScore, region, roleFilter)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
// computeNeighborGraphResponseDispatch routes to the test-injected
// function when set, otherwise to the real pipeline. #1483 follow-up.
func (s *Server) computeNeighborGraphResponseDispatch(minCount int, minScore float64, region, roleFilter string) NeighborGraphResponse {
if s.computeNeighborGraphResponseFn != nil {
return s.computeNeighborGraphResponseFn(minCount, minScore, region, roleFilter)
}
return s.computeNeighborGraphResponse(minCount, minScore, region, roleFilter)
}
// buildDefaultNeighborGraphResponse builds the default-shape response
// used by the #1481 P0-1 recomputer. Goes through the dispatch so test
// hooks can inject failures (#1483 follow-up).
func (s *Server) buildDefaultNeighborGraphResponse() NeighborGraphResponse {
return s.computeNeighborGraphResponseDispatch(5, 0.1, "", "")
}
// computeNeighborGraphResponse does the full graph build + filter + score
// pipeline previously inlined in handleNeighborGraph.
func (s *Server) computeNeighborGraphResponse(minCount int, minScore float64, region, roleFilter string) NeighborGraphResponse {
graph := s.getNeighborGraph()
allEdges := graph.AllEdges()
now := time.Now()
// Resolve region observers if filtering.
var regionObs map[string]bool
if region != "" && s.store != nil {
regionObs = s.store.resolveRegionObservers(region)
}
nodeMap := s.buildNodeInfoMap()
nodeSet := make(map[string]bool)
var filteredEdges []GraphEdge
ambiguousCount := 0
for _, e := range allEdges {
score := e.Score(now)
if e.Count < minCount || score < minScore {
continue
}
// Role filter: at least one endpoint must match the role.
if roleFilter != "" && nodeMap != nil {
aInfo, aOK := nodeMap[strings.ToLower(e.NodeA)]
bInfo, bOK := nodeMap[strings.ToLower(e.NodeB)]
aMatch := aOK && strings.EqualFold(aInfo.Role, roleFilter)
bMatch := bOK && strings.EqualFold(bInfo.Role, roleFilter)
if !aMatch && !bMatch {
continue
}
}
// Region filter: at least one observer must be in the region.
if regionObs != nil {
match := false
for obs := range e.Observers {
if regionObs[obs] {
match = true
break
}
}
if !match {
continue
}
}
// Filter blacklisted nodes from graph.
if s.cfg != nil && (s.cfg.IsBlacklisted(e.NodeA) || s.cfg.IsBlacklisted(e.NodeB)) {
continue
}
ge := GraphEdge{
Source: e.NodeA,
Target: e.NodeB,
Weight: e.Count,
Score: score,
Bidirectional: true,
Ambiguous: e.Ambiguous,
}
if e.SNRCount > 0 {
avg := e.AvgSNR()
ge.AvgSNR = &avg
}
if e.Ambiguous {
ambiguousCount++
// For ambiguous edges, use prefix as target.
if e.NodeB == "" {
ge.Target = "prefix:" + e.Prefix
}
}
filteredEdges = append(filteredEdges, ge)
// Track nodes.
if e.NodeA != "" && !strings.HasPrefix(e.NodeA, "prefix:") {
nodeSet[e.NodeA] = true
}
if e.NodeB != "" && !strings.HasPrefix(e.NodeB, "prefix:") {
nodeSet[e.NodeB] = true
}
}
// Build node list.
// Count neighbors per node from filtered edges.
neighborCounts := make(map[string]int)
for _, ge := range filteredEdges {
neighborCounts[ge.Source]++
neighborCounts[ge.Target]++
}
var nodes []GraphNode
for pk := range nodeSet {
gn := GraphNode{Pubkey: pk, NeighborCount: neighborCounts[pk]}
if info, ok := nodeMap[strings.ToLower(pk)]; ok {
gn.Name = info.Name
gn.Role = info.Role
}
nodes = append(nodes, gn)
}
if filteredEdges == nil {
filteredEdges = []GraphEdge{}
}
if nodes == nil {
nodes = []GraphNode{}
}
avgCluster := 0.0
if len(nodes) > 0 {
avgCluster = float64(len(filteredEdges)*2) / float64(len(nodes))
}
return NeighborGraphResponse{
Nodes: nodes,
Edges: filteredEdges,
Stats: GraphStats{
TotalNodes: len(nodes),
TotalEdges: len(filteredEdges),
AmbiguousEdges: ambiguousCount,
AvgClusterSize: avgCluster,
RejectedEdgesGeoFar: atomic.LoadUint64(&graph.RejectedEdgesGeoFar),
},
}
}
// ─── Helpers ───────────────────────────────────────────────────────────────────
func observerList(m map[string]bool) []string {
if len(m) == 0 {
return []string{}
}
out := make([]string, 0, len(m))
for k := range m {
out = append(out, k)
}
sort.Strings(out)
return out
}
// buildNodeInfoMap returns a map of lowercase pubkey → nodeInfo for name/role lookups.
func (s *Server) buildNodeInfoMap() map[string]nodeInfo {
if s.store == nil {
return nil
}
nodes, _ := s.store.getCachedNodesAndPM()
m := make(map[string]nodeInfo, len(nodes))
for _, n := range nodes {
m[strings.ToLower(n.PublicKey)] = n
}
// Enrich observer-only nodes: if an observer pubkey isn't already in the
// map (i.e. it's not also a repeater/companion), add it with role "observer".
if s.db != nil {
rows, err := s.db.conn.Query("SELECT id, name FROM observers")
if err == nil {
defer rows.Close()
for rows.Next() {
var id, name string
if rows.Scan(&id, &name) != nil {
continue
}
key := strings.ToLower(id)
if _, exists := m[key]; !exists {
m[key] = nodeInfo{PublicKey: id, Name: name, Role: "observer"}
}
}
}
}
return m
}
// dedupPrefixEntries merges unresolved prefix entries with resolved pubkey entries
// where the prefix is a prefix of the resolved pubkey. Defense-in-depth for #698.
func dedupPrefixEntries(entries []NeighborEntry) []NeighborEntry {
if len(entries) < 2 {
return entries
}
// Mark indices of unresolved entries to remove after merging.
remove := make(map[int]bool)
for i := range entries {
if entries[i].Pubkey != nil {
continue // only check unresolved (no pubkey)
}
prefix := strings.ToLower(entries[i].Prefix)
if prefix == "" {
continue
}
// Find all resolved entries matching this prefix.
matchIdx := -1
matchCount := 0
for j := range entries {
if i == j || entries[j].Pubkey == nil {
continue
}
if strings.HasPrefix(strings.ToLower(*entries[j].Pubkey), prefix) {
matchIdx = j
matchCount++
}
}
// Only merge when exactly one resolved entry matches — ambiguous
// prefixes that match multiple resolved neighbors must not be
// arbitrarily assigned to one of them.
if matchCount != 1 {
continue
}
j := matchIdx
// Merge counts from unresolved into resolved.
entries[j].Count += entries[i].Count
// Preserve higher LastSeen.
if entries[i].LastSeen > entries[j].LastSeen {
entries[j].LastSeen = entries[i].LastSeen
}
// Merge observers.
obsSet := make(map[string]bool)
for _, o := range entries[j].Observers {
obsSet[o] = true
}
for _, o := range entries[i].Observers {
obsSet[o] = true
}
entries[j].Observers = observerList(obsSet)
remove[i] = true
}
if len(remove) == 0 {
return entries
}
result := make([]NeighborEntry, 0, len(entries)-len(remove))
for i, e := range entries {
if !remove[i] {
result = append(result, e)
}
}
return result
}