diff --git a/cmd/server/coverage_test.go b/cmd/server/coverage_test.go index 82c426e9..1369553c 100644 --- a/cmd/server/coverage_test.go +++ b/cmd/server/coverage_test.go @@ -2289,6 +2289,10 @@ func TestSubpathPrecomputedIndex(t *testing.T) { defer db.Close() store := NewPacketStore(db, nil) store.Load() + // #1008: indexes built in background goroutine; wait before reading. + if !store.WaitIndexesReady(5 * time.Second) { + t.Fatal("indexes never became ready") + } // After Load(), the precomputed index must be populated. if len(store.spIndex) == 0 { @@ -2343,6 +2347,10 @@ func TestSubpathTxIndexPopulated(t *testing.T) { defer db.Close() store := NewPacketStore(db, nil) store.Load() + // #1008: indexes built in background goroutine; wait before reading. + if !store.WaitIndexesReady(5 * time.Second) { + t.Fatal("indexes never became ready") + } // spTxIndex must be populated alongside spIndex if len(store.spTxIndex) == 0 { @@ -2387,6 +2395,10 @@ func TestSubpathDetailMixedCaseHops(t *testing.T) { defer db.Close() store := NewPacketStore(db, nil) store.Load() + // #1008: indexes built in background goroutine; wait before reading. + if !store.WaitIndexesReady(5 * time.Second) { + t.Fatal("indexes never became ready") + } // Query with lowercase hops to establish baseline lower := store.GetSubpathDetail([]string{"eeff", "0011"}) diff --git a/cmd/server/index_ready_1008.go b/cmd/server/index_ready_1008.go new file mode 100644 index 00000000..70256c9e --- /dev/null +++ b/cmd/server/index_ready_1008.go @@ -0,0 +1,218 @@ +// Issue #1008: background-deferred subpath + pathHop index builds. +// +// Pattern mirrors the distance index (#1011) — but where distance is +// fully lazy (built on first request), these two indexes are kicked off +// eagerly by Load() in a background goroutine so HTTP becomes ready +// immediately while the indexes finish populating. +// +// Concurrency model: +// +// - subpathReady / pathHopReady are atomic.Bool flags written exactly +// once by the background builder (false → true) and never reset +// thereafter. Handlers read them via SubpathIndexReady() / +// PathHopIndexReady() before touching s.spIndex / s.spTxIndex / +// s.byPathHop. While a flag is false, the handler responds 503 + +// Retry-After: 5. +// +// - The builder itself acquires s.mu.Lock() and calls the existing +// buildSubpathIndex() / buildPathHopIndex() methods. Those methods +// replace s.spIndex / s.spTxIndex / s.byPathHop with freshly- +// allocated maps under the write lock. Visibility of the populated +// maps to handlers that see Ready()==true is guaranteed by Go's +// sync/atomic acquire-release semantics (formalized in Go 1.19): +// the atomic.Store(true) happens-after the s.mu.Unlock() that +// completes the build, and the handler's atomic.Load()==true +// synchronizes-with that store. The handler's subsequent s.mu.RLock +// is not what establishes visibility — it only serializes against +// concurrent ingest writers — so dropping the RLock would still be +// safe for the build's "populated map" snapshot (we keep it for +// ingest serialization). +// +// - Ingest-side incremental updates in StoreNewTransmissions / +// pruning / hash-collision paths continue to write s.spIndex / +// s.spTxIndex / s.byPathHop directly under s.mu.Lock(). Because +// the builder also runs under s.mu.Lock() and the builder +// overwrites whatever is there, the brief window between Load() +// returning and the goroutine acquiring s.mu means any +// concurrent ingest writes will be overwritten by the build — +// this matches the prior behavior where ingest could not start +// until Load() released s.mu, so in practice ingest does not +// run during the build window. Documenting this rather than +// adding a separate gate: the existing main.go boot sequence +// does not start ingest goroutines until after store.Load() +// and graph init complete. +// +// Handler scope of the ready gate (issue #1008 review M2): +// +// - HARD-GATED with 503 + Retry-After: 5 — analytics endpoints whose +// entire response is the index aggregate. Empty data would be +// visibly broken (charts, top-N tables). See routes.go: +// /api/analytics/subpaths, /api/analytics/subpaths-bulk, +// /api/analytics/subpath-detail, /api/nodes/{pubkey}/paths. +// +// - BEST-EFFORT (not gated) — endpoints where the index drives +// enrichment fields that callers already treat as optional. During +// the not-ready window these report zero counts / nil scores +// rather than 503-ing the whole list. Acceptable because: +// +// * /api/nodes and /api/nodes/{pubkey} have many other fields +// (last-seen, position, advert metadata) that callers depend +// on at startup. 503-ing the SPA bootstrap to wait for an +// index that exclusively affects "relay activity" badges +// would be a worse UX than a 30–60s window of "—" badges. +// +// * GetRepeaterRelayInfoMap / GetRepeaterUsefulnessScoreMap / +// GetBridgeScore / repeater_liveness / repeater_usefulness +// all walk s.byPathHop. During the build window they return +// empty maps or zero scores; the steady-state recomputer +// (#1262) refreshes them every 5min once indexes flip ready +// (prewarm guarded by WaitIndexesReady — see review M1). +// +// This is documented rather than gated so operators do not see +// /api/nodes 503 during routine restarts on Cascadia-scale data. +package main + +import ( + "log" + "net/http" + "time" +) + +// writeIndexLoading503 emits the standard 503 response used by handlers +// that depend on a not-yet-built index (#1008). Body shape matches the +// triage spec: {"error":"index loading","retryAfter":5}. The Retry-After +// header is also set so well-behaved clients back off automatically. +func writeIndexLoading503(w http.ResponseWriter) { + w.Header().Set("Retry-After", "5") + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(`{"error":"index loading","retryAfter":5}`)) +} + +// SubpathIndexReady reports whether the subpath index build kicked off +// by Load() has completed (#1008). Until this returns true, callers +// must NOT read s.spIndex / s.spTxIndex. +func (s *PacketStore) SubpathIndexReady() bool { + return s.subpathReady.Load() +} + +// PathHopIndexReady reports whether the path-hop index build kicked +// off by Load() has completed (#1008). Until this returns true, +// callers must NOT read s.byPathHop. +func (s *PacketStore) PathHopIndexReady() bool { + return s.pathHopReady.Load() +} + +// indexReadyCh returns the channel that is closed when BOTH indexes +// have flipped ready. Lazily created on first access. Safe to call +// concurrently. Used by WaitIndexesReady and any future waiters that +// want event-driven semantics instead of polling. +func (s *PacketStore) indexReadyCh() <-chan struct{} { + s.indexReadyChMu.Lock() + defer s.indexReadyChMu.Unlock() + if s.indexReadyChan == nil { + s.indexReadyChan = make(chan struct{}) + // If both are already ready (e.g. background chunk loader + // flipped them synchronously before any waiter showed up), + // close immediately so the channel is usable as a one-shot. + if s.subpathReady.Load() && s.pathHopReady.Load() { + close(s.indexReadyChan) + } + } + return s.indexReadyChan +} + +// maybeCloseIndexReadyCh closes the ready channel iff both flags are +// set. Idempotent (a sync.Once on the channel) and safe to call from +// either builder goroutine on the green-path transitions, as well as +// from markIndexesReadySync. +func (s *PacketStore) maybeCloseIndexReadyCh() { + if !(s.subpathReady.Load() && s.pathHopReady.Load()) { + return + } + s.indexReadyChMu.Lock() + defer s.indexReadyChMu.Unlock() + if s.indexReadyChan == nil { + // Lazily allocate AND close it in one step so any future + // indexReadyCh() caller gets a pre-closed channel. + s.indexReadyChan = make(chan struct{}) + close(s.indexReadyChan) + return + } + select { + case <-s.indexReadyChan: + // Already closed. + default: + close(s.indexReadyChan) + } +} + +// startBackgroundIndexBuilds is called from Load() after s.loaded=true +// to populate the subpath + path-hop indexes off the critical path +// (#1008). It returns immediately; the work runs in two background +// goroutines (one per index — see review m7) that each acquire +// s.mu.Lock() independently, install their map, then set the +// corresponding atomic ready flag. +// +// At Cascadia scale (~5M observations) this previously blocked HTTP +// readiness ~60s inside Load() under s.mu. Running the two builds in +// parallel halves the pathHop-not-ready window since the two builders +// are independent of each other. +func (s *PacketStore) startBackgroundIndexBuilds() { + go func() { + t0 := time.Now() + s.mu.Lock() + s.buildSubpathIndex() + s.mu.Unlock() + // Atomic.Store happens-after s.mu.Unlock; handlers that + // observe Ready()==true synchronize-with this store. + s.subpathReady.Store(true) + s.maybeCloseIndexReadyCh() + log.Printf("[startup] index build complete: subpath (%s)", + time.Since(t0).Round(time.Millisecond)) + }() + go func() { + t1 := time.Now() + s.mu.Lock() + s.buildPathHopIndex() + s.mu.Unlock() + s.pathHopReady.Store(true) + s.maybeCloseIndexReadyCh() + log.Printf("[startup] index build complete: pathHop (%s)", + time.Since(t1).Round(time.Millisecond)) + }() +} + +// markIndexesReadySync is the synchronous-build entry point used by +// the background chunk loader in store.go (and by tests). The chunk +// loader rebuilds both indexes under s.mu.Lock(); after the Unlock it +// calls this to flip the ready flags and close the broadcast channel +// in one shot, preserving symmetry with the goroutine path above. +func (s *PacketStore) markIndexesReadySync() { + s.subpathReady.Store(true) + s.pathHopReady.Store(true) + s.maybeCloseIndexReadyCh() +} + +// WaitIndexesReady blocks until both background indexes built by +// startBackgroundIndexBuilds() report ready, or the deadline expires. +// Returns true if both flipped in time. Intended for tests that read +// s.spIndex / s.spTxIndex / s.byPathHop directly after Load(); production +// code paths gate via SubpathIndexReady() / PathHopIndexReady() and +// respond 503 + Retry-After to clients instead of blocking. +// +// Uses the indexReadyCh broadcast channel rather than polling +// (see review m6) so wake-up is immediate with no poll-interval jitter. +func (s *PacketStore) WaitIndexesReady(timeout time.Duration) bool { + if s.SubpathIndexReady() && s.PathHopIndexReady() { + return true + } + ch := s.indexReadyCh() + select { + case <-ch: + return true + case <-time.After(timeout): + return s.SubpathIndexReady() && s.PathHopIndexReady() + } +} + diff --git a/cmd/server/index_ready_1008_test.go b/cmd/server/index_ready_1008_test.go new file mode 100644 index 00000000..cf759f42 --- /dev/null +++ b/cmd/server/index_ready_1008_test.go @@ -0,0 +1,140 @@ +// Issue #1008: subpath + pathHop index builds must move off the +// synchronous Load() critical path into a background goroutine. +// +// Contract: +// 1. Immediately after Load() returns, SubpathIndexReady() and +// PathHopIndexReady() report false (the goroutine has not finished). +// 2. Analytics handlers that depend on those indices respond 503 with +// Retry-After: 5 until the corresponding ready flag flips true. +// 3. After the background build completes (waitable via a helper), +// both flags flip true and handlers respond 200. +package main + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +// TestIssue1008_SubpathIndexReadyFalseImmediatelyAfterLoad asserts the +// subpath ready flag is false the instant Load() returns. Red commit: the +// stub returns true → assertion fires. Green commit: the flag is owned by +// the background goroutine, which has not yet run, so the assertion holds. +func TestIssue1008_SubpathIndexReadyFalseImmediatelyAfterLoad(t *testing.T) { + db := setupRichTestDB(t) + defer db.Close() + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("Load() error: %v", err) + } + if store.SubpathIndexReady() { + t.Fatal("expected SubpathIndexReady()==false immediately after Load(); want background-deferred build (#1008)") + } +} + +// TestIssue1008_PathHopIndexReadyFalseImmediatelyAfterLoad: same contract +// for the path-hop index. +func TestIssue1008_PathHopIndexReadyFalseImmediatelyAfterLoad(t *testing.T) { + db := setupRichTestDB(t) + defer db.Close() + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("Load() error: %v", err) + } + if store.PathHopIndexReady() { + t.Fatal("expected PathHopIndexReady()==false immediately after Load(); want background-deferred build (#1008)") + } +} + +// TestIssue1008_HandlerReturns503WhileSubpathIndexLoading asserts the +// analytics/subpaths handler returns 503 + Retry-After: 5 + a JSON body +// matching the triage spec while the subpath index is still building. +func TestIssue1008_HandlerReturns503WhileSubpathIndexLoading(t *testing.T) { + db := setupRichTestDB(t) + defer db.Close() + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("Load() error: %v", err) + } + // Don't wait for the background build — we want to observe the + // not-ready window. + srv := &Server{store: store} + + req := httptest.NewRequest("GET", "/api/analytics/subpaths?minLen=2&maxLen=4&limit=10", nil) + rec := httptest.NewRecorder() + srv.handleAnalyticsSubpaths(rec, req) + + if rec.Code != http.StatusServiceUnavailable { + t.Fatalf("status = %d, want 503 (subpath index loading, #1008)", rec.Code) + } + if got := rec.Header().Get("Retry-After"); got != "5" { + t.Errorf("Retry-After header = %q, want %q", got, "5") + } + var body map[string]interface{} + if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { + t.Fatalf("body not valid JSON: %v (body=%s)", err, rec.Body.String()) + } + if body["error"] != "index loading" { + t.Errorf(`body["error"] = %v, want "index loading"`, body["error"]) + } +} + +// TestIssue1008_HandlerRecoversAfterIndexReady asserts that, once the +// background build completes, the handler returns 200. +func TestIssue1008_HandlerRecoversAfterIndexReady(t *testing.T) { + db := setupRichTestDB(t) + defer db.Close() + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("Load() error: %v", err) + } + + // Wait up to 5s for both background builds to finish on this small + // fixture (rich test DB has ~3 packets; build is sub-millisecond). + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + if store.SubpathIndexReady() && store.PathHopIndexReady() { + break + } + time.Sleep(10 * time.Millisecond) + } + if !store.SubpathIndexReady() { + t.Fatal("SubpathIndexReady() never flipped true within 5s") + } + if !store.PathHopIndexReady() { + t.Fatal("PathHopIndexReady() never flipped true within 5s") + } + + srv := &Server{store: store} + req := httptest.NewRequest("GET", "/api/analytics/subpaths?minLen=2&maxLen=4&limit=10", nil) + rec := httptest.NewRecorder() + srv.handleAnalyticsSubpaths(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("status after ready = %d, want 200 (body=%s)", rec.Code, rec.Body.String()) + } +} + +// TestIssue1008_m7_BothFlagsSetAfterParallelStart verifies that the +// parallel two-goroutine version of startBackgroundIndexBuilds (review +// m7) sets BOTH ready flags after a bounded wait, regardless of which +// goroutine wins the race to s.mu.Lock(). Sanity check that breaking +// the two builds apart didn't drop the pathHop flag flip. +func TestIssue1008_m7_BothFlagsSetAfterParallelStart(t *testing.T) { + db := setupRichTestDB(t) + defer db.Close() + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("Load: %v", err) + } + if !store.WaitIndexesReady(5 * time.Second) { + t.Fatal("indexes never ready after parallel start (#1008 m7)") + } + if !store.SubpathIndexReady() { + t.Error("subpath flag not set after WaitIndexesReady returned true") + } + if !store.PathHopIndexReady() { + t.Error("pathHop flag not set after WaitIndexesReady returned true") + } +} diff --git a/cmd/server/repeater_enrich_recomputer.go b/cmd/server/repeater_enrich_recomputer.go index 43967885..9532f8a7 100644 --- a/cmd/server/repeater_enrich_recomputer.go +++ b/cmd/server/repeater_enrich_recomputer.go @@ -15,6 +15,20 @@ import ( // plenty fresh for an at-a-glance status column. const repeaterEnrichmentRecomputerDefaultInterval = 5 * time.Minute +// repeaterEnrichmentPrewarmWait is the upper bound on how long the +// synchronous prewarm in StartRepeaterEnrichmentRecomputer will wait +// for the background subpath+pathHop index builds to flip ready before +// skipping the prewarm. Override in tests via the package-level var. +// +// Background (issue #1008 review M1): the prewarm computes against +// s.byPathHop. If the background index builds haven't finished, the +// snapshot is built against an empty map and locked into +// s.repeaterRelayCache for `interval` (default 5min) — every +// /api/nodes during that window would report relay_count_24h=0. We +// wait up to this deadline and, on timeout, skip the prewarm entirely +// so the next ticker fire (which will see ready=true) does the work. +var repeaterEnrichmentPrewarmWait = 60 * time.Second + // StartRepeaterEnrichmentRecomputer is the steady-state background // recompute loop for the repeater enrichment bulk caches consumed by // handleNodes (GetRepeaterRelayInfoMap + GetRepeaterUsefulnessScoreMap). @@ -55,7 +69,15 @@ func (s *PacketStore) StartRepeaterEnrichmentRecomputer(windowHours float64, int // is to make sure the very first /api/nodes?limit=2000 from // live.js's SPA bootstrap (issue #1262) hits a populated cache // instead of paying the on-thread rebuild cost. - recomputeRepeaterEnrichmentSafe(s, windowHours) + // + // Issue #1008 review M1: skip the prewarm if the background + // subpath+pathHop index builds haven't finished — otherwise we'd + // snapshot against an empty s.byPathHop and serve relay_count_24h=0 + // for the entire `interval` window. The next ticker fire will pick + // up the populated index. + if s.WaitIndexesReady(repeaterEnrichmentPrewarmWait) { + recomputeRepeaterEnrichmentSafe(s, windowHours) + } var stopOnce sync.Once go func() { diff --git a/cmd/server/repeater_enrich_recomputer_1008_test.go b/cmd/server/repeater_enrich_recomputer_1008_test.go new file mode 100644 index 00000000..0227e41a --- /dev/null +++ b/cmd/server/repeater_enrich_recomputer_1008_test.go @@ -0,0 +1,95 @@ +// Issue #1008 review M1: StartRepeaterEnrichmentRecomputer must wait +// for the background subpath+pathHop index builds before doing its +// synchronous prewarm — otherwise the prewarm reads an empty +// s.byPathHop and locks zeroed enrichment into s.repeaterRelayCache +// for the entire ticker interval. +package main + +import ( + "testing" + "time" +) + +// TestIssue1008_M1_PrewarmWaitsForIndexes asserts that when the index +// ready flags are FALSE at the moment StartRepeaterEnrichmentRecomputer +// is called, the synchronous prewarm does NOT populate +// repeaterRelayCache (it either waits for ready, or skips). Without the +// fix the prewarm runs immediately against empty byPathHop and the +// cache becomes non-nil. +func TestIssue1008_M1_PrewarmWaitsForIndexes(t *testing.T) { + db := setupRichTestDB(t) + defer db.Close() + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("Load: %v", err) + } + // Wait for the background builder to finish so it can't race past + // our Store(false) below. Once it's done it won't write the flags + // again, so flipping them back to false is stable. + if !store.WaitIndexesReady(5 * time.Second) { + t.Fatal("background builds never finished") + } + // Force the ready flags back to false to simulate the race where + // the recomputer is started before background builds finish. Also + // reset the broadcast channel — it was closed when the background + // builder flipped both flags true; if we left it closed, + // WaitIndexesReady would return immediately on the channel select + // (correct for production semantics where flags never reset, + // wrong for this synthetic test). + store.subpathReady.Store(false) + store.pathHopReady.Store(false) + store.indexReadyChMu.Lock() + store.indexReadyChan = nil + store.indexReadyChMu.Unlock() + + // Use a tiny wait so the test runs fast. With the fix in place the + // prewarm should time out waiting for ready and SKIP, leaving the + // cache untouched. Without the fix it would compute immediately + // against the empty byPathHop. + prev := repeaterEnrichmentPrewarmWait + repeaterEnrichmentPrewarmWait = 50 * time.Millisecond + defer func() { repeaterEnrichmentPrewarmWait = prev }() + + stop := store.StartRepeaterEnrichmentRecomputer(24, time.Hour) + defer stop() + + // Give the prewarm time to complete (or to skip). + time.Sleep(150 * time.Millisecond) + + store.repeaterEnrichMu.Lock() + cached := store.repeaterRelayCache + at := store.repeaterRelayAt + store.repeaterEnrichMu.Unlock() + + if cached != nil || !at.IsZero() { + t.Fatalf("expected prewarm to SKIP when indexes not ready (cache==nil, at==zero); got cache=%v at=%v (#1008 M1)", + cached != nil, at) + } +} + +// TestIssue1008_M1_PrewarmRunsWhenReady asserts the prewarm still runs +// (cache populated) when the indexes are already ready. +func TestIssue1008_M1_PrewarmRunsWhenReady(t *testing.T) { + db := setupRichTestDB(t) + defer db.Close() + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("Load: %v", err) + } + if !store.WaitIndexesReady(5 * time.Second) { + t.Fatal("indexes never ready") + } + + stop := store.StartRepeaterEnrichmentRecomputer(24, time.Hour) + defer stop() + + // Prewarm is synchronous on the caller's goroutine, so after + // Start returns the cache must be populated. + store.repeaterEnrichMu.Lock() + at := store.repeaterRelayAt + store.repeaterEnrichMu.Unlock() + + if at.IsZero() { + t.Fatal("expected prewarm to populate repeaterRelayAt when indexes ready (#1008 M1)") + } +} diff --git a/cmd/server/routes.go b/cmd/server/routes.go index b4de7c15..3dfc1734 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -1525,6 +1525,10 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) { writeError(w, 503, "Packet store unavailable") return } + if !s.store.PathHopIndexReady() { + writeIndexLoading503(w) + return + } // Use the precomputed byPathHop index instead of scanning all packets. // Look up by full pubkey (resolved hops) and by short prefixes (raw hops). @@ -2093,6 +2097,10 @@ func (s *Server) handleAnalyticsHashCollisions(w http.ResponseWriter, r *http.Re func (s *Server) handleAnalyticsSubpaths(w http.ResponseWriter, r *http.Request) { if s.store != nil { + if !s.store.SubpathIndexReady() { + writeIndexLoading503(w) + return + } region := r.URL.Query().Get("region") minLen := queryInt(r, "minLen", 2) if minLen < 2 { @@ -2119,6 +2127,10 @@ func (s *Server) handleAnalyticsSubpaths(w http.ResponseWriter, r *http.Request) // response, avoiding repeated scans of the same packet data. Query format: // ?groups=2-2:50,3-3:30,4-4:20,5-8:15 (minLen-maxLen:limit per group) func (s *Server) handleAnalyticsSubpathsBulk(w http.ResponseWriter, r *http.Request) { + if s.store != nil && !s.store.SubpathIndexReady() { + writeIndexLoading503(w) + return + } region := r.URL.Query().Get("region") groupsParam := r.URL.Query().Get("groups") if groupsParam == "" { @@ -2198,6 +2210,10 @@ func (s *Server) handleAnalyticsSubpathDetail(w http.ResponseWriter, r *http.Req } } if s.store != nil { + if !s.store.SubpathIndexReady() { + writeIndexLoading503(w) + return + } writeJSON(w, s.store.GetSubpathDetail(rawHops)) return } diff --git a/cmd/server/routes_test.go b/cmd/server/routes_test.go index 6840274e..5ad9db06 100644 --- a/cmd/server/routes_test.go +++ b/cmd/server/routes_test.go @@ -27,6 +27,12 @@ func setupTestServer(t *testing.T) (*Server, *mux.Router) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + // #1008: Load() now defers subpath + path-hop index builds to a + // background goroutine. Wait for them before handlers go live so + // the existing assertions (which expect 200, not 503) hold. + if !store.WaitIndexesReady(5 * time.Second) { + t.Fatalf("background indexes never became ready") + } srv.store = store router := mux.NewRouter() srv.RegisterRoutes(router) @@ -44,6 +50,10 @@ func setupTestServerWithAPIKey(t *testing.T, apiKey string) (*Server, *mux.Route if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + // #1008: see setupTestServer comment. + if !store.WaitIndexesReady(5 * time.Second) { + t.Fatalf("background indexes never became ready") + } srv.store = store router := mux.NewRouter() srv.RegisterRoutes(router) @@ -1079,6 +1089,7 @@ func TestChannelMessagesWithRegion(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) srv.store = store router := mux.NewRouter() srv.RegisterRoutes(router) @@ -1332,6 +1343,7 @@ func TestResolveHopsAmbiguous(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) srv.store = store router := mux.NewRouter() srv.RegisterRoutes(router) @@ -1578,6 +1590,7 @@ func TestNodeAnalyticsNoNameNode(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) srv.store = store router := mux.NewRouter() srv.RegisterRoutes(router) @@ -1614,6 +1627,7 @@ func TestNodeHealthForNoNameNode(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) srv.store = store router := mux.NewRouter() srv.RegisterRoutes(router) @@ -2250,6 +2264,7 @@ store := NewPacketStore(db, nil) if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } +store.WaitIndexesReady(5 * time.Second) pk := "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890" db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'TestNode', 'repeater')", pk) @@ -2298,6 +2313,7 @@ store := NewPacketStore(db, nil) if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } +store.WaitIndexesReady(5 * time.Second) pk := "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'Repeater2B', 'repeater')", pk) @@ -2341,6 +2357,7 @@ func TestGetNodeHashSizeInfoLatestWins(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) pk := "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'LatestWins', 'repeater')", pk) @@ -2390,6 +2407,7 @@ func TestGetNodeHashSizeInfoIgnoreDirectZeroHop(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) pk := "dddd111122223333444455556666777788889999aaaabbbbccccddddeeee3333" db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'DirIgnore', 'repeater')", pk) @@ -2437,6 +2455,7 @@ func TestGetNodeHashSizeInfoOnlyDirectZeroHopIgnored(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) pk := "eeee111122223333444455556666777788889999aaaabbbbccccddddeeee4444" db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'OnlyDirect', 'repeater')", pk) @@ -2471,6 +2490,7 @@ func TestGetNodeHashSizeInfoDirectNonZeroHopCounted(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) pk := "ffff111122223333444455556666777788889999aaaabbbbccccddddeeee5555" db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'DirNonZero', 'repeater')", pk) @@ -2510,6 +2530,7 @@ func TestGetNodeHashSizeInfoNoAdverts(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) pk := "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd" db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'NoAdverts', 'repeater')", pk) @@ -2543,6 +2564,7 @@ func TestHashAnalyticsZeroHopAdvert(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) // Capture baseline from seed data (bypass cache via computeAnalyticsHashSizes) baseline := store.computeAnalyticsHashSizes("", "") @@ -2602,6 +2624,7 @@ func TestAnalyticsHashSizeSameNameDifferentPubkey(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) pk1 := "aaaa111122223333444455556666777788889999aaaabbbbccccddddeeee1111" pk2 := "aaaa111122223333444455556666777788889999aaaabbbbccccddddeeee2222" @@ -2670,6 +2693,7 @@ func TestInconsistentNodesExcludesCompanions(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) now := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") payloadType := 4 @@ -2753,6 +2777,7 @@ func TestHashSizeInfoTimeWindow(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) pk := "dd44444444444444444444444444444444444444444444444444444444444444" db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'OldNode', 'repeater')", pk) @@ -2924,6 +2949,7 @@ func TestLatestSeenMaintained(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) store.mu.RLock() defer store.mu.RUnlock() @@ -2988,6 +3014,7 @@ func TestQueryGroupedPacketsSortedByLatest(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) result := store.QueryGroupedPackets(PacketQuery{Limit: 50}) if result.Total < 2 { @@ -3025,6 +3052,7 @@ func TestQueryGroupedPacketsCacheReturnsConsistentResult(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) q := PacketQuery{Limit: 50} r1 := store.QueryGroupedPackets(q) @@ -3054,6 +3082,7 @@ func TestGetChannelsCacheReturnsConsistentResult(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) r1 := store.GetChannels("") r2 := store.GetChannels("") @@ -3092,6 +3121,7 @@ func TestGetChannelsNotBlockedByLargeLock(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) channels := store.GetChannels("") @@ -3328,6 +3358,7 @@ func TestHashCollisionsCacheTTL(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) if store.collisionCacheTTL != 3600*time.Second { t.Errorf("expected collisionCacheTTL=3600s, got %v", store.collisionCacheTTL) @@ -3372,6 +3403,7 @@ func TestHashCollisionsEmptyStore(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) srv.store = store router := mux.NewRouter() srv.RegisterRoutes(router) @@ -3424,6 +3456,7 @@ func TestHashCollisionsWithCollision(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) // Inject hash_size=1 for both nodes so they appear in the 1-byte bucket store.hashSizeInfoMu.Lock() store.hashSizeInfoCache = map[string]*hashSizeNodeInfo{ @@ -3490,6 +3523,7 @@ func TestHashCollisionsShortPublicKey(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) srv.store = store router := mux.NewRouter() srv.RegisterRoutes(router) @@ -3522,6 +3556,7 @@ func TestHashCollisionsMissingCoordinates(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + store.WaitIndexesReady(5 * time.Second) srv.store = store router := mux.NewRouter() srv.RegisterRoutes(router) @@ -3774,6 +3809,11 @@ func TestNodePathsPrefixCollisionFilter(t *testing.T) { if err := store.Load(); err != nil { t.Fatalf("store.Load failed: %v", err) } + // #1008: wait for the background index build to complete before + // hitting the handler (otherwise it returns 503 index-loading). + if !store.WaitIndexesReady(5 * time.Second) { + t.Fatal("indexes never became ready") + } srv.store = store // Query paths for TestRepeater — should NOT include the collision packet diff --git a/cmd/server/store.go b/cmd/server/store.go index 126839f7..2fc4f8bd 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -247,6 +247,19 @@ type PacketStore struct { spIndex map[string]int // "hop1,hop2" → count spTxIndex map[string][]*StoreTx // "hop1,hop2" → transmissions containing this subpath spTotalPaths int // transmissions with paths >= 2 hops + // Background-build ready gates for spIndex/spTxIndex and byPathHop + // (#1008). Flipped from false→true exactly once by the goroutine + // kicked off in Load() (or synchronously by the background chunk + // loader). Handlers gate reads via SubpathIndexReady() / + // PathHopIndexReady(); while false, they respond 503 + Retry-After. + subpathReady atomic.Bool + pathHopReady atomic.Bool + // indexReadyChan is closed exactly once when BOTH subpathReady + // and pathHopReady are true (#1008 review m6). Replaces the + // previous 2ms poll in WaitIndexesReady. Lazily allocated by + // indexReadyCh / maybeCloseIndexReadyCh in index_ready_1008.go. + indexReadyChMu sync.Mutex + indexReadyChan chan struct{} // Precomputed distance analytics: hop distances and path totals. // Built LAZILY on first /api/analytics/distance request (#1011) — // previously eager in Load() at startup, which was O(n²) work for @@ -837,10 +850,15 @@ func (s *PacketStore) Load() error { } // Build precomputed subpath index for O(1) analytics queries - s.buildSubpathIndex() + // — DEFERRED to a background goroutine (#1008). Same rationale + // as the distance index (#1011): synchronous build under s.mu + // blocks HTTP readiness ~60s at Cascadia scale. The goroutine is + // started AFTER s.loaded = true below. + // s.buildSubpathIndex() // Build path-hop index for O(1) node path lookups - s.buildPathHopIndex() + // — DEFERRED to a background goroutine (#1008). + // s.buildPathHopIndex() // Precompute distance analytics (hop distances, path totals) // — DEFERRED to first /api/analytics/distance request (#1011). @@ -869,6 +887,12 @@ func (s *PacketStore) Load() error { len(s.packets), s.totalObs, elapsed, s.trackedMemoryMB(), s.estimatedMemoryMB()) } s.loadMultibyteCapFromDB() + // Kick off background subpath + path-hop index builds (#1008). + // The goroutine acquires s.mu.Lock() and so will block until Load's + // deferred Unlock fires when this function returns. HTTP handlers + // gate reads behind SubpathIndexReady() / PathHopIndexReady() and + // respond 503 + Retry-After: 5 until the builds finish. + s.startBackgroundIndexBuilds() return nil } @@ -1282,6 +1306,13 @@ func (s *PacketStore) loadBackgroundChunks() { s.distLazyOnce = sync.Once{} s.distLazyMu.Unlock() s.mu.Unlock() + // #1008 review m3: flip the ready flags after the synchronous + // rebuild for symmetry with startBackgroundIndexBuilds. Safe + // today because the chunk loader runs after Load() has already + // kicked the goroutines that set these to true; this is a + // belt-and-suspenders against a future reorder where the chunk + // loader could be the first writer. + s.markIndexesReadySync() s.backgroundLoadDone.Store(true) if chunkErrors > 0 {