Files
meshcore-analyzer/cmd/server/main.go
T
Kpa-clawbot 13bdee57d4 perf: P0 hot-path fixes (observers, neighbor-graph, observer-analytics) (#1481) (#1483)
## What

Three of the four P0s from #1481's scale-test findings. Each cuts a
distinct
hot path; together they target /api/observers,
/api/analytics/neighbor-graph,
and /api/observers/{id}/analytics — the top three live offenders.

### P0-1: 5-min atomic-pointer cache for default neighbor-graph response
- Live p95 10.8s on the most-trafficked organic endpoint.
- Background recomputer (5-min cadence per operator directive) builds
the
  default-filter (`minCount=5 minScore=0.1`, no region, no role)
  `NeighborGraphResponse` and stores it via `atomic.Pointer`.
- `handleNeighborGraph` short-circuits on the default shape; non-default
filters take the extracted `computeNeighborGraphResponse` path
(identical
  semantics to the previous inline build).

### P0-2: cache parsed `StoreObs.Timestamp` + drop RLock window
- `handleObserverAnalytics` re-parsed the RFC3339 timestamp three times
  per observation, for 60k+ observations per active observer, under
  `s.store.mu.RLock` — blocking writers for the full scan.
- `StoreObs.ParsedTime()` parses once via `sync.Once` (mirrors
  `StoreTx.ParsedDecoded`).
- Handler snapshots the `byObserver[id]` pointer slice, releases the
  RLock immediately, then iterates locally.

### P0-3: 30s cache for `/api/observers` + sargable `IN` + covering
index
- Three SQL queries on every request → ~1.7s p50 at 50-concurrent.
- Atomic-pointer 30s cache for the default (no-filter) query.
- `GetNodeLocationsByKeys` drops `LOWER(public_key) IN (...)`
(non-sargable);
  callers pre-lowercase in Go and the plain `IN` matches the existing
  `public_key` index.
- New ingestor migration `obs_observer_ts_idx_v1` adds composite index
  `idx_observations_observer_idx_timestamp(observer_idx, timestamp)` so
  `GetObserverPacketCounts` can resolve its GROUP-BY + range filter from
  the index without scanning the 1.9M-row observations table.

### P0-4: deferred
`perfMiddleware`'s global mutex was claimed to serialize every API
request.
A direct test (`50 concurrent requests through the middleware, handler
sleeps 20ms each`) shows total elapsed ≈ 25ms, not 1s — the lock is held
only for the post-handler bookkeeping (a few µs). Real impact is below
measurement noise. Skipping to avoid invasive churn on PerfStats
consumers
without a demonstrable win.

## Test plan

Red → green per P0:
- `observers_cache_test.go` — handler reads `s.observersCache` before
SQL,
  TTL boundary, atomic.Pointer (no mutex contention).
- `storeobs_parsedtime_test.go` — parses three timestamp shapes, caches
  result, no race under concurrent readers.
- `neighbor_graph_cache_test.go` — handler serves from atomic pointer
  when set, bypasses cache when `?region=` (or any non-default filter)
  is passed.

Full server + ingestor suites pass: `go test -count=1 ./...`.

## Perf proof

Before/after p50/p95/p99 (50 requests × 50 concurrent) against prod
(before)
and staging once CI deploys (after) will be posted as a PR comment per
the
operator's "no merge without proof of improvement" gate.

Closes #1481


## TDD exemption — P0-1 and P0-2 (net-new surfaces, AGENTS.md)

Per CoreScope `AGENTS.md` § "Exemptions": **net-new code surfaces with
no
prior tests to break** may land tests in the same PR without a strict
test-first → impl commit split.

- **P0-1 (neighbor-graph atomic-pointer cache)** — `neighborGraphCache`,
  `recomputeNeighborGraphCache`, `loadNeighborGraphCacheBytes`,
  `startNeighborGraphRecomputer` and the default-shape short-circuit in
  `handleNeighborGraph` were brand-new code with no pre-existing
  assertions covering them. There was no green test to first turn red.
- **P0-2 (cached `StoreObs.Timestamp` + RLock window drop)** —
  `StoreObs.ParsedTime()` and the snapshot+release pattern in
  `handleObserverAnalytics` were new surfaces; the prior code did the
  parse inline per call with no behavioural test to break.

P0-3 was authored properly red-then-green (commit `6e63ec6a` red, then
`83ae129b` green) and does NOT use this exemption.

## Default-filter detection vs frontend reality (#1483 follow-up)

The Neighbor Graph analytics tab in `public/analytics.js` fetches
`/analytics/neighbor-graph?min_count=1&min_score=0` because the
client-side sliders need the full edge set to filter from. That shape
did NOT match the `(5, 0.1)` cached default, so the UI tab still paid
the cold compute cost despite #1481 P0-1.

The #1483 follow-up commit caches BOTH shapes in the same recomputer
pass:
- `(minCount=5, minScore=0.1, no region, no role)` — `live.js`
  affinity-scoring consumer.
- `(minCount=1, minScore=0, no region, no role)` — analytics tab.

Both are served from `atomic.Pointer` with an `X-Cache-Age-Seconds`
header. The per-shape cost in the background goroutine is roughly
linear in edge count; total recompute time stays well under the
5-minute cadence on prod-scale graphs.

---------

Co-authored-by: openclaw-bot <bot@openclaw.dev>
Co-authored-by: mc-bot <mc-bot@users.noreply.github.com>
2026-05-29 02:42:21 -07:00

484 lines
17 KiB
Go

package main
import (
"context"
"database/sql"
"flag"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"os"
"os/exec"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"github.com/gorilla/mux"
"github.com/meshcore-analyzer/dbschema"
)
// Set via -ldflags at build time
var Version string
var Commit string
var BuildTime string
func resolveCommit() string {
if Commit != "" {
return Commit
}
// Try .git-commit file (baked by Docker / CI)
if data, err := os.ReadFile(".git-commit"); err == nil {
if c := strings.TrimSpace(string(data)); c != "" && c != "unknown" {
return c
}
}
// Try git rev-parse at runtime
if out, err := exec.Command("git", "rev-parse", "--short", "HEAD").Output(); err == nil {
return strings.TrimSpace(string(out))
}
return "unknown"
}
func resolveVersion() string {
if Version != "" {
return Version
}
return "unknown"
}
func resolveBuildTime() string {
if BuildTime != "" {
return BuildTime
}
return "unknown"
}
func main() {
// pprof profiling — off by default, enable with ENABLE_PPROF=true
if os.Getenv("ENABLE_PPROF") == "true" {
pprofPort := os.Getenv("PPROF_PORT")
if pprofPort == "" {
pprofPort = "6060"
}
go func() {
log.Printf("[pprof] profiling UI at http://localhost:%s/debug/pprof/", pprofPort)
if err := http.ListenAndServe(":"+pprofPort, nil); err != nil {
log.Printf("[pprof] failed to start: %v (non-fatal)", err)
}
}()
}
var (
configDir string
port int
dbPath string
publicDir string
pollMs int
)
flag.StringVar(&configDir, "config-dir", ".", "Directory containing config.json")
flag.IntVar(&port, "port", 0, "HTTP port (overrides config)")
flag.StringVar(&dbPath, "db", "", "SQLite database path (overrides config/env)")
flag.StringVar(&publicDir, "public", "public", "Directory to serve static files from")
flag.IntVar(&pollMs, "poll-ms", 1000, "SQLite poll interval for WebSocket broadcast (ms)")
flag.Parse()
// Load config
cfg, err := LoadConfig(configDir)
if err != nil {
log.Printf("[config] warning: %v (using defaults)", err)
}
// CLI flags override config
if port > 0 {
cfg.Port = port
}
if cfg.Port == 0 {
cfg.Port = 3000
}
if dbPath != "" {
cfg.DBPath = dbPath
}
if cfg.APIKey == "" {
log.Printf("[security] WARNING: no apiKey configured — write endpoints are BLOCKED (set apiKey in config.json to enable them)")
} else if IsWeakAPIKey(cfg.APIKey) {
log.Printf("[security] WARNING: API key is weak or a known default — write endpoints are vulnerable")
}
// Apply Go runtime soft memory limit (#836).
// Honors GOMEMLIMIT if set; otherwise derives from packetStore.maxMemoryMB.
{
_, envSet := os.LookupEnv("GOMEMLIMIT")
maxMB := 0
if cfg.PacketStore != nil {
maxMB = cfg.PacketStore.MaxMemoryMB
}
limit, source := applyMemoryLimit(maxMB, envSet)
switch source {
case "env":
log.Printf("[memlimit] using GOMEMLIMIT from environment (%s)", os.Getenv("GOMEMLIMIT"))
case "derived":
log.Printf("[memlimit] derived from packetStore.maxMemoryMB=%d → %d MiB (1.5x headroom)", maxMB, limit/(1024*1024))
default:
log.Printf("[memlimit] no soft memory limit set (GOMEMLIMIT unset, packetStore.maxMemoryMB=0); recommend setting one to avoid container OOM-kill")
}
warnIfMemlimitUnderprovisioned(limit)
}
// Resolve DB path
resolvedDB := cfg.ResolveDBPath(configDir)
log.Printf("[config] port=%d db=%s public=%s", cfg.Port, resolvedDB, publicDir)
if len(cfg.NodeBlacklist) > 0 {
log.Printf("[config] nodeBlacklist: %d node(s) will be hidden from API", len(cfg.NodeBlacklist))
for _, pk := range cfg.NodeBlacklist {
if trimmed := strings.ToLower(strings.TrimSpace(pk)); trimmed != "" {
log.Printf("[config] blacklisted: %s", trimmed)
}
}
}
// Open database
database, err := OpenDB(resolvedDB)
if err != nil {
log.Fatalf("[db] failed to open %s: %v", resolvedDB, err)
}
var dbCloseOnce sync.Once
dbClose := func() error {
var err error
dbCloseOnce.Do(func() { err = database.Close() })
return err
}
defer dbClose()
// Verify DB has expected tables
var tableName string
err = database.conn.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='transmissions'").Scan(&tableName)
if err == sql.ErrNoRows {
log.Fatalf("[db] table 'transmissions' not found — is this a CoreScope database?")
}
stats, err := database.GetStats()
if err != nil {
log.Printf("[db] warning: could not read stats: %v", err)
} else {
log.Printf("[db] transmissions=%d observations=%d nodes=%d observers=%d",
stats.TotalTransmissions, stats.TotalObservations, stats.TotalNodes, stats.TotalObservers)
}
// auto_vacuum is checked + migrated by the ingestor (#1283). The
// server is read-only and must not race the writer for the lock.
// Assert all schema migrations the ingestor owns have already run
// (#1287). The server NEVER migrates — it only reads. If a required
// column/index/table is missing, the operator must restart the
// ingestor (which owns dbschema.Apply) before this server can start.
if err := dbschema.AssertReady(database.conn); err != nil {
log.Fatalf("[db] schema not ready (ingestor must run migrations first): %v", err)
}
// In-memory packet store
store := NewPacketStore(database, cfg.PacketStore, cfg.CacheTTL)
store.config = cfg
if err := store.Load(); err != nil {
log.Fatalf("[store] failed to load: %v", err)
}
if store.hotStartupHours > 0 {
log.Printf("[store] starting background load: filling retentionHours=%gh from hotStartupHours=%gh",
store.retentionHours, store.hotStartupHours)
go store.loadBackgroundChunks()
}
// Initialize persisted neighbor graph.
// Per #1287, schema migrations all live in the ingestor (see
// dbschema.Apply). The server merely loads the snapshot here and
// then refreshes it via the recompNeighborGraph slot every 60s.
dbPath = database.path
database.hasResolvedPath = true // dbschema.AssertReady above already verified observations.resolved_path exists
// WaitGroup for background init steps that gate /api/healthz readiness.
var initWg sync.WaitGroup
// Load or build neighbor graph
if neighborEdgesTableExists(database.conn) {
store.graph.Store(loadNeighborEdgesFromDB(database.conn))
log.Printf("[neighbor] loaded persisted neighbor graph")
} else {
// No persisted snapshot yet (e.g. fresh DB before the ingestor
// has run its first edge-build cycle). Build an in-memory graph
// from the packets we already have so reads aren't empty. We
// do NOT persist — the ingestor owns neighbor_edges writes per
// #1287; the recompNeighborGraph recomputer will pick up the
// real snapshot as soon as the ingestor populates it.
log.Printf("[neighbor] no persisted edges found, will build in-memory in background...")
store.graph.Store(NewNeighborGraph())
initWg.Add(1)
go func() {
defer initWg.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("[neighbor] graph build panic recovered: %v", r)
}
}()
built := BuildFromStore(store)
store.graph.Store(built)
log.Printf("[neighbor] in-memory graph build complete")
}()
}
// Initial pickBestObservation runs in background — doesn't need to block HTTP.
// API serves best-effort data until this completes (~10s for 100K txs).
// Processes in chunks of 5000, releasing the lock between chunks so API
// handlers remain responsive.
initWg.Add(1)
go func() {
defer initWg.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("[store] pickBestObservation panic recovered: %v", r)
}
}()
const chunkSize = 5000
store.mu.RLock()
totalPackets := len(store.packets)
store.mu.RUnlock()
for i := 0; i < totalPackets; i += chunkSize {
end := i + chunkSize
if end > totalPackets {
end = totalPackets
}
store.mu.Lock()
for j := i; j < end && j < len(store.packets); j++ {
pickBestObservation(store.packets[j])
}
store.mu.Unlock()
if end < totalPackets {
time.Sleep(10 * time.Millisecond) // yield to API handlers
}
}
log.Printf("[store] initial pickBestObservation complete (%d transmissions)", totalPackets)
}()
// Mark server ready once all background init completes.
go func() {
initWg.Wait()
readiness.Store(1)
log.Printf("[server] readiness: ready=true (background init complete)")
}()
// WebSocket hub
hub := NewHub()
hub.upgrader.EnableCompression = cfg.WSCompressionEnabled()
// HTTP server
srv := NewServer(database, cfg, hub)
srv.configDir = configDir
srv.store = store
router := mux.NewRouter()
srv.RegisterRoutes(router)
// WebSocket endpoint
router.HandleFunc("/ws", hub.ServeWS)
// Static files + SPA fallback
absPublic, _ := filepath.Abs(publicDir)
if _, err := os.Stat(absPublic); err == nil {
fs := http.FileServer(http.Dir(absPublic))
router.PathPrefix("/").Handler(wsOrStatic(hub, spaHandler(absPublic, fs)))
log.Printf("[static] serving %s", absPublic)
} else {
log.Printf("[static] directory %s not found — API-only mode", absPublic)
router.PathPrefix("/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html")
w.Write([]byte(`<!DOCTYPE html><html><body><h1>CoreScope</h1><p>Frontend not found. API available at /api/</p></body></html>`))
})
}
// Start SQLite poller for WebSocket broadcast
poller := NewPoller(database, hub, time.Duration(pollMs)*time.Millisecond)
poller.store = store
go poller.Start()
// Start periodic eviction
stopEviction := store.StartEvictionTicker()
defer stopEviction()
// Steady-state analytics recomputers (issue #1240). Replaces the
// on-request compute-then-cache pattern for the default (region="",
// zero-window) analytics queries with a background refresh loop so
// reads always hit cache in <1ms.
stopAnalyticsRecomp := store.StartAnalyticsRecomputers(
cfg.AnalyticsDefaultRecomputeInterval(),
cfg.AnalyticsRecomputeIntervals(),
)
defer stopAnalyticsRecomp()
log.Printf("[analytics-recompute] background recompute enabled (default=%s)", cfg.AnalyticsDefaultRecomputeInterval())
// #1481 P0-1: background recomputer for the default-shape
// /api/analytics/neighbor-graph response (5 min cadence). Reads
// hit an atomic pointer; the rebuild path no longer runs on the
// request goroutine for the common filter shape.
stopNeighborGraphCache := make(chan struct{})
ngInterval := neighborGraphCacheInterval
if cfg.NeighborGraph != nil && cfg.NeighborGraph.CacheRecomputeIntervalSeconds > 0 {
ngInterval = time.Duration(cfg.NeighborGraph.CacheRecomputeIntervalSeconds) * time.Second
}
srv.startNeighborGraphRecomputer(ngInterval, stopNeighborGraphCache)
defer close(stopNeighborGraphCache)
log.Printf("[neighbor-graph-cache] background recompute enabled (interval=%s)", ngInterval)
// Steady-state repeater-enrichment recomputer (issue #1262).
// Prewarms the bulk caches feeding handleNodes so the very first
// /api/nodes?limit=2000 from live.js's SPA bootstrap hits a
// populated cache instead of paying a 15.7s on-thread rebuild.
// Uses the configured RelayActiveHours window and the same
// default recompute interval as the other analytics caches.
relayWindowHours := cfg.GetHealthThresholds().RelayActiveHours
stopRepeaterEnrichRecomp := store.StartRepeaterEnrichmentRecomputer(
relayWindowHours,
cfg.AnalyticsDefaultRecomputeInterval(),
)
defer stopRepeaterEnrichRecomp()
log.Printf("[repeater-enrich-recompute] background recompute enabled (window=%.1fh, interval=%s)",
relayWindowHours, cfg.AnalyticsDefaultRecomputeInterval())
// Steady-state bridge-centrality recomputer (issue #672 axis 2).
// Computes betweenness centrality over the in-memory neighbor
// graph and stores the per-pubkey score map atomically. Read by
// handleNodes via a single atomic load.
stopBridgeRecomp := store.StartBridgeScoreRecomputer(
cfg.AnalyticsDefaultRecomputeInterval(),
)
defer stopBridgeRecomp()
log.Printf("[bridge-recompute] background recompute enabled (interval=%s)",
cfg.AnalyticsDefaultRecomputeInterval())
// Steady-state neighbor-graph snapshot recomputer (issue #1287).
// Per Option 4: the ingestor owns neighbor_edges; the server
// READS the snapshot every 60s and atomic-swaps it into s.graph.
// This is the ONLY path that updates s.graph at steady state.
stopNeighborRecomp := store.StartNeighborGraphRecomputer(NeighborGraphRecomputerDefaultInterval)
defer stopNeighborRecomp()
log.Printf("[neighbor-recompute] snapshot reload enabled (interval=%s)",
NeighborGraphRecomputerDefaultInterval)
// Packet / metrics / observer retention moved to the ingestor in
// #1283 (writes only belong on the writer process). Neighbor-edge
// pruning moved to the ingestor in #1287 for the same reason. The
// server no longer schedules any of these; the ingestor's tickers
// handle them.
_ = cfg.IncrementalVacuumPages() // kept reachable for config validation; not used here
_ = cfg.NeighborMaxAgeDays() // ditto — owned by ingestor now
// Graceful shutdown
var handler http.Handler = router
if cfg.GZipEnabled() {
handler = gzipMiddlewareWithConfig(cfg.Compression, router)
log.Printf("[server] HTTP gzip compression enabled")
}
if cfg.WSCompressionEnabled() {
log.Printf("[server] WebSocket permessage-deflate compression enabled")
}
httpServer := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: handler,
ReadTimeout: 30 * time.Second,
WriteTimeout: 60 * time.Second,
IdleTimeout: 120 * time.Second,
}
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigCh
log.Printf("[server] received %v, shutting down...", sig)
// 1. Stop accepting new WebSocket/poll data
poller.Stop()
// 1b. Auto-prune tickers were all relocated to the ingestor in
// #1283/#1287 — nothing to stop here.
// 1c. Stop steady-state analytics recomputers (issue #1240).
// Must happen before dbClose so any in-flight compute that
// reaches into SQLite has finished.
if stopAnalyticsRecomp != nil {
stopAnalyticsRecomp()
}
// 2. Gracefully drain HTTP connections (up to 15s)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := httpServer.Shutdown(ctx); err != nil {
log.Printf("[server] HTTP shutdown error: %v", err)
}
// 3. Close WebSocket hub
hub.Close()
// 4. Close database (release SQLite WAL lock)
if err := dbClose(); err != nil {
log.Printf("[server] DB close error: %v", err)
}
log.Println("[server] shutdown complete")
}()
log.Printf("[server] CoreScope (Go) listening on http://localhost:%d", cfg.Port)
// Backfills (resolved_path, from_pubkey) moved to the ingestor in
// #1287 — they are write operations and belong on the writer
// process. The server reads the results via the periodic
// recompNeighborGraph / fetchResolvedPathForObs paths.
// Migrate old content hashes in background (one-time, idempotent).
go migrateContentHashesAsync(store, 5000, 100*time.Millisecond)
if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalf("[server] %v", err)
}
}
// spaHandler serves static files, falling back to index.html for SPA routes.
// It reads index.html once at creation time and replaces the __BUST__ placeholder
// with a Unix timestamp so browsers fetch fresh JS/CSS after each server restart.
func spaHandler(root string, fs http.Handler) http.Handler {
// Pre-process index.html: replace __BUST__ with a cache-bust timestamp
indexPath := filepath.Join(root, "index.html")
rawHTML, err := os.ReadFile(indexPath)
if err != nil {
log.Printf("[static] warning: could not read index.html for cache-bust: %v", err)
rawHTML = []byte("<!DOCTYPE html><html><body><h1>CoreScope</h1><p>index.html not found</p></body></html>")
}
bustValue := fmt.Sprintf("%d", time.Now().Unix())
indexHTML := []byte(strings.ReplaceAll(string(rawHTML), "__BUST__", bustValue))
log.Printf("[static] cache-bust value: %s", bustValue)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Serve pre-processed index.html for root and /index.html
if r.URL.Path == "/" || r.URL.Path == "/index.html" {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
w.Write(indexHTML)
return
}
path := filepath.Join(root, r.URL.Path)
if _, err := os.Stat(path); os.IsNotExist(err) {
// SPA fallback — serve pre-processed index.html
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
w.Write(indexHTML)
return
}
// Disable caching for JS/CSS/HTML
if filepath.Ext(path) == ".js" || filepath.Ext(path) == ".css" || filepath.Ext(path) == ".html" {
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
}
fs.ServeHTTP(w, r)
})
}