mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-06-29 10:41:59 +00:00
## Summary Build the distance analytics index lazily on the first `/api/analytics/distance` request instead of eagerly inside `Load()` (and its background-load chunked merge). Per the triage Fix path on the issue: - Eager startup build removed from `Load()` and from `loadAllPacketsBackground()`'s post-merge pass. - First request returns `202 Accepted` + `Retry-After: 5` and kicks off the build in a background goroutine, gated by `sync.Once` so concurrent first-window requests all observe 202 (single build, not N parallel O(n²) computations). - Once built, subsequent requests fall through to the existing analytics-recomputer / TTL cache and serve 200 as before. - Debounced rebuild policy: refire only when `Δobs > 5%` since last build OR `>5 min` elapsed, whichever is more restrictive. Background loader also resets the gate so the next request rebuilds against the larger dataset. Effect: operators who never visit distance analytics no longer pay the O(n²) construction at startup. Acceptance criteria (a) no startup build, (b) first request triggers build, (c) concurrent in-flight requests get 202 are encoded as failing-first tests. ## Red → green - Red: `bc947ad1` — 3 assertion failures (`expected ... empty, got 3`, `expected 202, got 200`, `expected all 10 ... got 0`). - Green: `5264b68a` — production change makes them pass, no other tests regress. ## Files changed - `cmd/server/store.go` — lazy-build state (`distLazyMu`/`Once`/`Built`/`Building`/`LastBuilt`/`LastObs`), `TriggerDistanceIndexBuild`, `DistanceIndexBuilt`, `DistanceIndexBuilding`; eager `buildDistanceIndex` calls in `Load()` post-pass and chunked-background-load post-pass removed (Once reset instead so the next request rebuilds against the full dataset). - `cmd/server/routes.go` — `/api/analytics/distance` returns 202 + `Retry-After` until built. - `cmd/server/distance_lazy_index_test.go` — new tests (the three triage acceptance criteria). - `cmd/server/coverage_test.go`, `cmd/server/parity_test.go`, `cmd/server/routes_test.go`, `cmd/server/hop_disambig_e2e_test.go` — pre-warm the index via `TriggerDistanceIndexBuild()` + `DistanceIndexBuilt()` poll where the test asserts the 200 JSON shape. ## Perf justification Startup cost on a 500K-obs / 2K-node dataset: previously O(n²) hop scan during `Load()` post-pass and again during the background-load merge — measured at 10–20s in `specs/startup-audit.md`. New code: zero work at startup, the same O(n²) work runs at most once per HTTP request cycle (and only when the index is stale per debounce policy). Cold-path concurrency is bounded by `sync.Once`, so N parallel first-window requests never produce N parallel builds. ## Scope No config field added (debounce thresholds are hardcoded constants per the triage Fix path — `5%` / `5min`). No public API signature changes. No DB-side migration. Tests cover the lazy invariant, the 202+Retry-After contract, and concurrent first-request behavior. Closes #1011 --------- Co-authored-by: Kpa-clawbot <bot@corescope.local>
This commit is contained in:
@@ -2701,6 +2701,17 @@ func TestHandleAnalyticsDistanceWithStore(t *testing.T) {
|
||||
router := mux.NewRouter()
|
||||
srv.RegisterRoutes(router)
|
||||
|
||||
// #1011: lazy distance index — first request returns 202; trigger
|
||||
// the build and wait for it before asserting the 200 shape.
|
||||
store.TriggerDistanceIndexBuild()
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
for !store.DistanceIndexBuilt() {
|
||||
if time.Now().After(deadline) {
|
||||
t.Fatal("distance index did not finish building within 5s")
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/analytics/distance", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
@@ -0,0 +1,114 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
// Issue #1011: distance index must NOT be built eagerly at startup.
|
||||
// It is constructed lazily on first /api/analytics/distance request,
|
||||
// the first request returns 202 + Retry-After while the build runs,
|
||||
// and concurrent requests during the build also get 202 (one build
|
||||
// only, not N parallel builds).
|
||||
//
|
||||
// These three assertions encode the acceptance criteria from the
|
||||
// triage Fix path (sync.Once-style first-request trigger, 202+Retry-After).
|
||||
|
||||
// TestDistanceIndexNotBuiltOnLoad: Load() must complete without
|
||||
// populating distHops / distPaths. Eager build is gone.
|
||||
func TestDistanceIndexNotBuiltOnLoad(t *testing.T) {
|
||||
db := setupRichTestDB(t)
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("Load(): %v", err)
|
||||
}
|
||||
store.mu.RLock()
|
||||
nHops := len(store.distHops)
|
||||
nPaths := len(store.distPaths)
|
||||
store.mu.RUnlock()
|
||||
if nHops != 0 || nPaths != 0 {
|
||||
t.Fatalf("expected distance index empty after Load() (lazy build, #1011); got %d hops, %d paths — eager build still firing in Load()", nHops, nPaths)
|
||||
}
|
||||
if store.DistanceIndexBuilt() {
|
||||
t.Fatalf("expected DistanceIndexBuilt() = false directly after Load(); got true")
|
||||
}
|
||||
}
|
||||
|
||||
// TestDistanceFirstRequestReturns202: first /api/analytics/distance call
|
||||
// must trigger async build and return 202 + Retry-After. The handler must
|
||||
// NOT block for the full build.
|
||||
func TestDistanceFirstRequestReturns202(t *testing.T) {
|
||||
db := setupRichTestDB(t)
|
||||
defer db.Close()
|
||||
cfg := &Config{Port: 3000}
|
||||
hub := NewHub()
|
||||
srv := NewServer(db, cfg, hub)
|
||||
store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("Load(): %v", err)
|
||||
}
|
||||
srv.store = store
|
||||
r := mux.NewRouter()
|
||||
srv.RegisterRoutes(r)
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/analytics/distance", nil)
|
||||
w := httptest.NewRecorder()
|
||||
t0 := time.Now()
|
||||
r.ServeHTTP(w, req)
|
||||
elapsed := time.Since(t0)
|
||||
|
||||
if w.Code != 202 {
|
||||
t.Fatalf("expected 202 Accepted on first request (lazy build, #1011); got %d (body=%s)", w.Code, w.Body.String())
|
||||
}
|
||||
if ra := w.Header().Get("Retry-After"); ra == "" {
|
||||
t.Fatalf("expected non-empty Retry-After header on 202 response; got none")
|
||||
}
|
||||
// Handler must return quickly — must not block on the full build.
|
||||
if elapsed > 500*time.Millisecond {
|
||||
t.Fatalf("first-request handler took %v — must not block on build (#1011)", elapsed)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDistanceConcurrentRequestsDuringBuildReturn202: 10 requests fired
|
||||
// in close succession while the build is in flight must all receive 202;
|
||||
// exactly one build runs.
|
||||
func TestDistanceConcurrentRequestsDuringBuildReturn202(t *testing.T) {
|
||||
db := setupRichTestDB(t)
|
||||
defer db.Close()
|
||||
cfg := &Config{Port: 3000}
|
||||
hub := NewHub()
|
||||
srv := NewServer(db, cfg, hub)
|
||||
store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("Load(): %v", err)
|
||||
}
|
||||
srv.store = store
|
||||
r := mux.NewRouter()
|
||||
srv.RegisterRoutes(r)
|
||||
|
||||
const N = 10
|
||||
var wg sync.WaitGroup
|
||||
var got202 atomic.Int32
|
||||
wg.Add(N)
|
||||
for i := 0; i < N; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
req := httptest.NewRequest("GET", "/api/analytics/distance", nil)
|
||||
w := httptest.NewRecorder()
|
||||
r.ServeHTTP(w, req)
|
||||
if w.Code == 202 {
|
||||
got202.Add(1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if got202.Load() != N {
|
||||
t.Fatalf("expected all %d concurrent first-window requests to get 202; only %d did", N, got202.Load())
|
||||
}
|
||||
}
|
||||
@@ -172,6 +172,17 @@ func TestTopHopsRespectsContextAcrossAllCallSites(t *testing.T) {
|
||||
t.Fatalf("Load: %v", err)
|
||||
}
|
||||
|
||||
// #1011: distance index is now lazy — trigger it explicitly and
|
||||
// wait for build completion before inspecting distHops.
|
||||
store.TriggerDistanceIndexBuild()
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
for !store.DistanceIndexBuilt() {
|
||||
if time.Now().After(deadline) {
|
||||
t.Fatal("distance index did not finish building within 5s")
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Inspect precomputed distance index.
|
||||
store.mu.RLock()
|
||||
hops := make([]distHopRecord, len(store.distHops))
|
||||
|
||||
@@ -146,7 +146,17 @@ type parityEndpoint struct {
|
||||
|
||||
func TestParityShapes(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
srv, router := setupTestServer(t)
|
||||
// #1011: lazy distance index — pre-warm before parity shape
|
||||
// validation expects 200.
|
||||
srv.store.TriggerDistanceIndexBuild()
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
for !srv.store.DistanceIndexBuilt() {
|
||||
if time.Now().After(deadline) {
|
||||
t.Fatal("distance index did not finish building within 5s")
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
endpoints := []parityEndpoint{
|
||||
{"stats", "/api/stats"},
|
||||
|
||||
@@ -2019,6 +2019,19 @@ func (s *Server) handleAnalyticsDistance(w http.ResponseWriter, r *http.Request)
|
||||
region := r.URL.Query().Get("region")
|
||||
area := r.URL.Query().Get("area")
|
||||
if s.store != nil {
|
||||
// Lazy build (#1011): distance index is not built at startup.
|
||||
// First request triggers an async build and gets 202 +
|
||||
// Retry-After; concurrent requests during the build window
|
||||
// also get 202. Cached results (after the build completes)
|
||||
// are served as 200 from the analytics recomputer / TTL cache.
|
||||
if !s.store.DistanceIndexBuilt() {
|
||||
s.store.TriggerDistanceIndexBuild()
|
||||
w.Header().Set("Retry-After", "5")
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
_, _ = w.Write([]byte(`{"status":"building","retry_after_seconds":5,"detail":"distance index is being computed (lazy build, #1011). Retry after Retry-After seconds."}`))
|
||||
return
|
||||
}
|
||||
writeJSON(w, s.store.GetAnalyticsDistance(region, area))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -626,7 +626,17 @@ func TestContentTypeJSON(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAllEndpointsReturn200(t *testing.T) {
|
||||
_, router := setupTestServer(t)
|
||||
srv, router := setupTestServer(t)
|
||||
// #1011: ensure /api/analytics/distance returns 200 (not the
|
||||
// lazy-build 202) by pre-warming the index.
|
||||
srv.store.TriggerDistanceIndexBuild()
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
for !srv.store.DistanceIndexBuilt() {
|
||||
if time.Now().After(deadline) {
|
||||
t.Fatal("distance index did not finish building within 5s")
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
endpoints := []struct {
|
||||
path string
|
||||
status int
|
||||
@@ -1154,7 +1164,17 @@ func TestAnalyticsChannels(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAnalyticsDistance(t *testing.T) {
|
||||
_, router := setupTestServer(t)
|
||||
srv, router := setupTestServer(t)
|
||||
// #1011: lazy distance index — trigger build and wait for it
|
||||
// before asserting 200 shape.
|
||||
srv.store.TriggerDistanceIndexBuild()
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
for !srv.store.DistanceIndexBuilt() {
|
||||
if time.Now().After(deadline) {
|
||||
t.Fatal("distance index did not finish building within 5s")
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
req := httptest.NewRequest("GET", "/api/analytics/distance", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
+111
-4
@@ -247,11 +247,30 @@ type PacketStore struct {
|
||||
spIndex map[string]int // "hop1,hop2" → count
|
||||
spTxIndex map[string][]*StoreTx // "hop1,hop2" → transmissions containing this subpath
|
||||
spTotalPaths int // transmissions with paths >= 2 hops
|
||||
// Precomputed distance analytics: hop distances and path totals
|
||||
// computed during Load() and incrementally updated on ingest.
|
||||
// Precomputed distance analytics: hop distances and path totals.
|
||||
// Built LAZILY on first /api/analytics/distance request (#1011) —
|
||||
// previously eager in Load() at startup, which was O(n²) work for
|
||||
// operators who never visit the distance analytics page. After the
|
||||
// initial lazy build, the in-memory index is maintained
|
||||
// incrementally by updateDistanceIndexForTxs on ingest.
|
||||
distHops []distHopRecord
|
||||
distPaths []distPathRecord
|
||||
|
||||
// Lazy-build gate for the distance index (#1011). distLazyBuilt is
|
||||
// true once the first /api/analytics/distance request has completed
|
||||
// its build (or a debounced rebuild). distLazyBuilding is true
|
||||
// while a build is currently running — concurrent requests in this
|
||||
// window receive 202 + Retry-After rather than racing N parallel
|
||||
// O(n²) computations. distLazyOnce serialises the first build;
|
||||
// reset by the background loader (Load() chunked merge) and by the
|
||||
// debounced-rebuild policy so subsequent rebuilds can re-fire.
|
||||
distLazyMu sync.Mutex
|
||||
distLazyOnce sync.Once
|
||||
distLazyBuilt bool
|
||||
distLazyBuilding bool
|
||||
distLazyLastBuilt time.Time
|
||||
distLazyLastObs int // totalObs at last build, for Δobs debounce
|
||||
|
||||
// Cached GetNodeHashSizeInfo result — recomputed at most once every 15s
|
||||
hashSizeInfoMu sync.Mutex
|
||||
hashSizeInfoCache map[string]*hashSizeNodeInfo
|
||||
@@ -824,7 +843,11 @@ func (s *PacketStore) Load() error {
|
||||
s.buildPathHopIndex()
|
||||
|
||||
// Precompute distance analytics (hop distances, path totals)
|
||||
s.buildDistanceIndex()
|
||||
// — DEFERRED to first /api/analytics/distance request (#1011).
|
||||
// At scale (2K+ nodes) this is O(n²) per-pair work that most
|
||||
// operators never trigger. See lazy build via
|
||||
// TriggerDistanceIndexBuild + handleAnalyticsDistance 202 path.
|
||||
// s.buildDistanceIndex()
|
||||
|
||||
// Track oldest loaded timestamp for future SQL fallback queries.
|
||||
// When hotStartupHours > 0 use the SAME cutoff string that was used in
|
||||
@@ -1249,7 +1272,15 @@ func (s *PacketStore) loadBackgroundChunks() {
|
||||
s.mu.Lock()
|
||||
s.buildSubpathIndex()
|
||||
s.buildPathHopIndex()
|
||||
s.buildDistanceIndex()
|
||||
// Distance index is now lazy (#1011) — built on first
|
||||
// /api/analytics/distance request, not at background-load
|
||||
// completion. If a previous request already triggered the build,
|
||||
// invalidate the gate so the next request rebuilds against the
|
||||
// fuller dataset.
|
||||
s.distLazyMu.Lock()
|
||||
s.distLazyBuilt = false
|
||||
s.distLazyOnce = sync.Once{}
|
||||
s.distLazyMu.Unlock()
|
||||
s.mu.Unlock()
|
||||
|
||||
s.backgroundLoadDone.Store(true)
|
||||
@@ -3809,6 +3840,82 @@ func (s *PacketStore) updateDistanceIndexForTxs(txs []*StoreTx) {
|
||||
}
|
||||
}
|
||||
|
||||
// DistanceIndexBuilt reports whether the distance analytics index has
|
||||
// been constructed. Used by tests and /api/perf to verify the lazy
|
||||
// build invariant from issue #1011 (eager Load() build removed).
|
||||
func (s *PacketStore) DistanceIndexBuilt() bool {
|
||||
s.distLazyMu.Lock()
|
||||
defer s.distLazyMu.Unlock()
|
||||
return s.distLazyBuilt
|
||||
}
|
||||
|
||||
// DistanceIndexBuilding reports whether a lazy distance-index build
|
||||
// is currently in flight. Used by the /api/analytics/distance handler
|
||||
// to serve 202 + Retry-After to concurrent first-window requests
|
||||
// (#1011).
|
||||
func (s *PacketStore) DistanceIndexBuilding() bool {
|
||||
s.distLazyMu.Lock()
|
||||
defer s.distLazyMu.Unlock()
|
||||
return s.distLazyBuilding
|
||||
}
|
||||
|
||||
// TriggerDistanceIndexBuild kicks off a lazy build of the distance
|
||||
// index in a background goroutine if one is not already running and
|
||||
// the debounce policy permits. Returns immediately. Idempotent:
|
||||
// concurrent callers see only one build, gated by sync.Once.
|
||||
//
|
||||
// Debounce policy (#1011 triage Fix path): rebuild if Δobs > 5% since
|
||||
// the last build OR at most once per 5 minutes — whichever is more
|
||||
// restrictive. The first-ever build always runs.
|
||||
func (s *PacketStore) TriggerDistanceIndexBuild() {
|
||||
s.distLazyMu.Lock()
|
||||
if s.distLazyBuilding {
|
||||
s.distLazyMu.Unlock()
|
||||
return
|
||||
}
|
||||
// Debounce: if a build has already completed, suppress re-trigger
|
||||
// unless Δobs > 5% or >5min has elapsed.
|
||||
if s.distLazyBuilt {
|
||||
s.mu.RLock()
|
||||
curObs := s.totalObs
|
||||
s.mu.RUnlock()
|
||||
elapsed := time.Since(s.distLazyLastBuilt)
|
||||
deltaPct := 0.0
|
||||
if s.distLazyLastObs > 0 {
|
||||
deltaPct = float64(curObs-s.distLazyLastObs) / float64(s.distLazyLastObs)
|
||||
}
|
||||
if elapsed < 5*time.Minute && deltaPct < 0.05 {
|
||||
s.distLazyMu.Unlock()
|
||||
return
|
||||
}
|
||||
// Reset the gate so a new build can fire.
|
||||
s.distLazyOnce = sync.Once{}
|
||||
s.distLazyBuilt = false
|
||||
}
|
||||
s.distLazyMu.Unlock()
|
||||
|
||||
// Fire-and-forget; sync.Once collapses concurrent goroutines into
|
||||
// a single build. The Once is reset above (under the mutex) before
|
||||
// each rebuild cycle.
|
||||
go s.distLazyOnce.Do(func() {
|
||||
s.distLazyMu.Lock()
|
||||
s.distLazyBuilding = true
|
||||
s.distLazyMu.Unlock()
|
||||
|
||||
s.mu.Lock()
|
||||
s.buildDistanceIndex()
|
||||
obsAtBuild := s.totalObs
|
||||
s.mu.Unlock()
|
||||
|
||||
s.distLazyMu.Lock()
|
||||
s.distLazyBuilding = false
|
||||
s.distLazyBuilt = true
|
||||
s.distLazyLastBuilt = time.Now()
|
||||
s.distLazyLastObs = obsAtBuild
|
||||
s.distLazyMu.Unlock()
|
||||
})
|
||||
}
|
||||
|
||||
// buildDistanceIndex precomputes haversine distances for all packets.
|
||||
// Must be called with s.mu held (Lock).
|
||||
func (s *PacketStore) buildDistanceIndex() {
|
||||
|
||||
Reference in New Issue
Block a user