perf(load): background subpath+pathHop index builds with ready gates (#1008) (#1604)

## Summary

Mirrors the distance-index lazy pattern (#1011): the subpath and
path-hop index builds are no longer part of `Load()`'s synchronous
critical section. They now run in **two parallel background goroutines**
kicked off after `s.loaded = true`, so HTTP comes up immediately even at
Cascadia scale (5M observations, previously ~60s blocked on these two
builds inside `Load()` under `s.mu`).

Fixes #1008.

## Approach

Two new `atomic.Bool` fields on `PacketStore` (`subpathReady`,
`pathHopReady`) plus a one-shot broadcast channel (`indexReadyChan`) for
waiters. `Load()` removes the synchronous `s.buildSubpathIndex()` /
`s.buildPathHopIndex()` calls and instead kicks
`s.startBackgroundIndexBuilds()` right before returning. That function
spawns **two independent goroutines** (review m7), one per index. Each
goroutine:

1. acquires `s.mu.Lock()` (blocks until `Load()`'s deferred Unlock
fires),
2. runs its builder, releases the lock, stores its `ready = true`,
3. closes the broadcast channel if both flags are now true,
4. logs `[startup] index build complete: subpath (Xs)` (or pathHop).

Analytics handlers whose entire response IS the index aggregate —
`/api/analytics/subpaths`, `/api/analytics/subpaths-bulk`,
`/api/analytics/subpath-detail`, `/api/nodes/{pubkey}/paths` — gate
reads behind the corresponding atomic and respond with `503 Service
Unavailable`, `Retry-After: 5`, body `{"error":"index
loading","retryAfter":5}` until the build completes — matching the
triage spec.

### Handler scope (review M2)

A second class of handlers also touches these indexes — `/api/nodes`,
`/api/nodes/{pubkey}`, the `GetRepeaterRelayInfoMap` /
`GetRepeaterUsefulnessScoreMap` / `GetBridgeScore` enrichment helpers,
and `repeater_liveness` / `repeater_usefulness`. These are
**intentionally NOT 503-gated**: they expose the index via optional
enrichment fields that callers already treat as "may be empty", and
503-ing the SPA bootstrap to wait for an index that only affects
relay-activity badges would be a worse UX than a 30–60s window of "—"
values. The rationale is documented in the package doc-comment at the
top of `index_ready_1008.go`.

The recomputer's synchronous prewarm path
(`StartRepeaterEnrichmentRecomputer`) gates on `WaitIndexesReady(60s)`
(review M1) so it never snapshots an empty `byPathHop` into
`s.repeaterRelayCache`; on timeout it skips the prewarm and lets the
5-minute ticker pick up the populated index.

## Concurrency safety

Each build goroutine acquires `s.mu.Lock()` before calling the existing
`buildSubpathIndex()` / `buildPathHopIndex()` helpers, which replace
`s.spIndex` / `s.spTxIndex` / `s.byPathHop` with freshly-allocated maps.
Visibility of the populated maps to handlers that observe
`Ready()==true` is established by Go 1.19+ sync/atomic acquire-release
semantics: the atomic store of `true` happens-after `s.mu.Unlock()`, and
the handler's atomic load synchronizes-with that store. The handler's
subsequent `s.mu.RLock` serializes against concurrent ingest writers,
not against the builder.

The existing `main.go` boot sequence does not start ingest goroutines
until after `store.Load()` returns and graph init completes, so the
brief window between `Load()` returning and the two goroutines acquiring
`s.mu` does not race with concurrent ingest writes.

## TDD: red → green

- **Red** commit `63e79e11`: `cmd/server/index_ready_1008_test.go` adds
four assertions; `cmd/server/index_ready_1008.go` adds compile-only
stubs returning `true` so the tests fail on assertions, not build
errors.
- **Green** commit `fb1d22b0`: implements the real atomic gates, the
background goroutine, and the four handler 503 branches; also updates
four existing tests that read indexes directly post-`Load()` to call
`store.WaitIndexesReady(5s)` first.
- **Race-fix commit `b77d56eb`** (review m8 — test-infra exemption):
adds `WaitIndexesReady` calls in test helpers/setup paths so the race
detector no longer flags the read-after-Load() pattern in existing
tests. Per AGENTS.md, race-detector flakes are observable evidence (test
crashes under `-race`) and qualify for the test-infra exemption from the
TDD red-commit requirement; no behavior change in production code.
- **Polish round 2 — M1 red `408c7462` / green `85e82c8a`**:
`TestIssue1008_M1_PrewarmWaitsForIndexes` asserts the recomputer prewarm
SKIPs when indexes are not ready. Red commit adds the assertion + a stub
`repeaterEnrichmentPrewarmWait` var; green commit wires
`WaitIndexesReady` into the prewarm path and adds the handler-scope docs
for M2.
- **Polish round 2 — minor cleanups `fd089bd0`** (m3..m7): chunk-loader
wires `markIndexesReadySync`, memory-model comment rewritten to cite
acquire-release, sentinel deleted, polling replaced with a broadcast
channel, two parallel goroutines for the builds.
`TestIssue1008_m7_BothFlagsSetAfterParallelStart` covers the parallel
path.

## Reproduction

```
git fetch origin fix/issue-1008
git checkout 63e79e11   # red commit
cd cmd/server && go test -run TestIssue1008_ -count=1 .   # FAILs

git checkout fix/issue-1008   # latest green
cd cmd/server && go test -run TestIssue1008 -count=1 -race .   # all pass
cd cmd/server && go test -count=1 -race -short ./...           # full suite ok
```

## Files changed

| file | role |
|---|---|
| `cmd/server/store.go` | atomic.Bool fields + indexReadyChan broadcast
field; remove sync build calls in Load(); kick goroutines; wire
markIndexesReadySync from chunk loader |
| `cmd/server/index_ready_1008.go` | ready flags, two-goroutine
background builds, 503 helper, channel-based WaitIndexesReady,
handler-scope docs |
| `cmd/server/index_ready_1008_test.go` | red-commit contract tests +
parallel-start assertion |
| `cmd/server/repeater_enrich_recomputer.go` | gate prewarm on
WaitIndexesReady (M1) |
| `cmd/server/repeater_enrich_recomputer_1008_test.go` | M1 red+green
assertions |
| `cmd/server/routes.go` | 503 gate on 4 analytics handlers |
| `cmd/server/routes_test.go` | setup helpers wait for ready; collision
test waits |
| `cmd/server/coverage_test.go` | three tests wait for ready before
reading indexes |

## Out of scope

- Distance index (already deferred in #1011) — untouched.
- The `pickBestObservation` + `indexByNode` per-tx loop in `Load()` —
kept synchronous per triage Findings (ordering-sensitive,
contiguous-memory, fast).

---------

Co-authored-by: bot <bot@noreply.local>
Co-authored-by: openclaw-bot <bot@openclaw.local>
Co-authored-by: mc-bot <mc-bot@users.noreply.github.com>
This commit is contained in:
Kpa-clawbot
2026-06-06 20:46:42 -07:00
committed by GitHub
parent 3898688d6d
commit df61660a5e
8 changed files with 577 additions and 3 deletions
+12
View File
@@ -2289,6 +2289,10 @@ func TestSubpathPrecomputedIndex(t *testing.T) {
defer db.Close()
store := NewPacketStore(db, nil)
store.Load()
// #1008: indexes built in background goroutine; wait before reading.
if !store.WaitIndexesReady(5 * time.Second) {
t.Fatal("indexes never became ready")
}
// After Load(), the precomputed index must be populated.
if len(store.spIndex) == 0 {
@@ -2343,6 +2347,10 @@ func TestSubpathTxIndexPopulated(t *testing.T) {
defer db.Close()
store := NewPacketStore(db, nil)
store.Load()
// #1008: indexes built in background goroutine; wait before reading.
if !store.WaitIndexesReady(5 * time.Second) {
t.Fatal("indexes never became ready")
}
// spTxIndex must be populated alongside spIndex
if len(store.spTxIndex) == 0 {
@@ -2387,6 +2395,10 @@ func TestSubpathDetailMixedCaseHops(t *testing.T) {
defer db.Close()
store := NewPacketStore(db, nil)
store.Load()
// #1008: indexes built in background goroutine; wait before reading.
if !store.WaitIndexesReady(5 * time.Second) {
t.Fatal("indexes never became ready")
}
// Query with lowercase hops to establish baseline
lower := store.GetSubpathDetail([]string{"eeff", "0011"})
+218
View File
@@ -0,0 +1,218 @@
// Issue #1008: background-deferred subpath + pathHop index builds.
//
// Pattern mirrors the distance index (#1011) — but where distance is
// fully lazy (built on first request), these two indexes are kicked off
// eagerly by Load() in a background goroutine so HTTP becomes ready
// immediately while the indexes finish populating.
//
// Concurrency model:
//
// - subpathReady / pathHopReady are atomic.Bool flags written exactly
// once by the background builder (false → true) and never reset
// thereafter. Handlers read them via SubpathIndexReady() /
// PathHopIndexReady() before touching s.spIndex / s.spTxIndex /
// s.byPathHop. While a flag is false, the handler responds 503 +
// Retry-After: 5.
//
// - The builder itself acquires s.mu.Lock() and calls the existing
// buildSubpathIndex() / buildPathHopIndex() methods. Those methods
// replace s.spIndex / s.spTxIndex / s.byPathHop with freshly-
// allocated maps under the write lock. Visibility of the populated
// maps to handlers that see Ready()==true is guaranteed by Go's
// sync/atomic acquire-release semantics (formalized in Go 1.19):
// the atomic.Store(true) happens-after the s.mu.Unlock() that
// completes the build, and the handler's atomic.Load()==true
// synchronizes-with that store. The handler's subsequent s.mu.RLock
// is not what establishes visibility — it only serializes against
// concurrent ingest writers — so dropping the RLock would still be
// safe for the build's "populated map" snapshot (we keep it for
// ingest serialization).
//
// - Ingest-side incremental updates in StoreNewTransmissions /
// pruning / hash-collision paths continue to write s.spIndex /
// s.spTxIndex / s.byPathHop directly under s.mu.Lock(). Because
// the builder also runs under s.mu.Lock() and the builder
// overwrites whatever is there, the brief window between Load()
// returning and the goroutine acquiring s.mu means any
// concurrent ingest writes will be overwritten by the build —
// this matches the prior behavior where ingest could not start
// until Load() released s.mu, so in practice ingest does not
// run during the build window. Documenting this rather than
// adding a separate gate: the existing main.go boot sequence
// does not start ingest goroutines until after store.Load()
// and graph init complete.
//
// Handler scope of the ready gate (issue #1008 review M2):
//
// - HARD-GATED with 503 + Retry-After: 5 — analytics endpoints whose
// entire response is the index aggregate. Empty data would be
// visibly broken (charts, top-N tables). See routes.go:
// /api/analytics/subpaths, /api/analytics/subpaths-bulk,
// /api/analytics/subpath-detail, /api/nodes/{pubkey}/paths.
//
// - BEST-EFFORT (not gated) — endpoints where the index drives
// enrichment fields that callers already treat as optional. During
// the not-ready window these report zero counts / nil scores
// rather than 503-ing the whole list. Acceptable because:
//
// * /api/nodes and /api/nodes/{pubkey} have many other fields
// (last-seen, position, advert metadata) that callers depend
// on at startup. 503-ing the SPA bootstrap to wait for an
// index that exclusively affects "relay activity" badges
// would be a worse UX than a 3060s window of "—" badges.
//
// * GetRepeaterRelayInfoMap / GetRepeaterUsefulnessScoreMap /
// GetBridgeScore / repeater_liveness / repeater_usefulness
// all walk s.byPathHop. During the build window they return
// empty maps or zero scores; the steady-state recomputer
// (#1262) refreshes them every 5min once indexes flip ready
// (prewarm guarded by WaitIndexesReady — see review M1).
//
// This is documented rather than gated so operators do not see
// /api/nodes 503 during routine restarts on Cascadia-scale data.
package main
import (
"log"
"net/http"
"time"
)
// writeIndexLoading503 emits the standard 503 response used by handlers
// that depend on a not-yet-built index (#1008). Body shape matches the
// triage spec: {"error":"index loading","retryAfter":5}. The Retry-After
// header is also set so well-behaved clients back off automatically.
func writeIndexLoading503(w http.ResponseWriter) {
w.Header().Set("Retry-After", "5")
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(`{"error":"index loading","retryAfter":5}`))
}
// SubpathIndexReady reports whether the subpath index build kicked off
// by Load() has completed (#1008). Until this returns true, callers
// must NOT read s.spIndex / s.spTxIndex.
func (s *PacketStore) SubpathIndexReady() bool {
return s.subpathReady.Load()
}
// PathHopIndexReady reports whether the path-hop index build kicked
// off by Load() has completed (#1008). Until this returns true,
// callers must NOT read s.byPathHop.
func (s *PacketStore) PathHopIndexReady() bool {
return s.pathHopReady.Load()
}
// indexReadyCh returns the channel that is closed when BOTH indexes
// have flipped ready. Lazily created on first access. Safe to call
// concurrently. Used by WaitIndexesReady and any future waiters that
// want event-driven semantics instead of polling.
func (s *PacketStore) indexReadyCh() <-chan struct{} {
s.indexReadyChMu.Lock()
defer s.indexReadyChMu.Unlock()
if s.indexReadyChan == nil {
s.indexReadyChan = make(chan struct{})
// If both are already ready (e.g. background chunk loader
// flipped them synchronously before any waiter showed up),
// close immediately so the channel is usable as a one-shot.
if s.subpathReady.Load() && s.pathHopReady.Load() {
close(s.indexReadyChan)
}
}
return s.indexReadyChan
}
// maybeCloseIndexReadyCh closes the ready channel iff both flags are
// set. Idempotent (a sync.Once on the channel) and safe to call from
// either builder goroutine on the green-path transitions, as well as
// from markIndexesReadySync.
func (s *PacketStore) maybeCloseIndexReadyCh() {
if !(s.subpathReady.Load() && s.pathHopReady.Load()) {
return
}
s.indexReadyChMu.Lock()
defer s.indexReadyChMu.Unlock()
if s.indexReadyChan == nil {
// Lazily allocate AND close it in one step so any future
// indexReadyCh() caller gets a pre-closed channel.
s.indexReadyChan = make(chan struct{})
close(s.indexReadyChan)
return
}
select {
case <-s.indexReadyChan:
// Already closed.
default:
close(s.indexReadyChan)
}
}
// startBackgroundIndexBuilds is called from Load() after s.loaded=true
// to populate the subpath + path-hop indexes off the critical path
// (#1008). It returns immediately; the work runs in two background
// goroutines (one per index — see review m7) that each acquire
// s.mu.Lock() independently, install their map, then set the
// corresponding atomic ready flag.
//
// At Cascadia scale (~5M observations) this previously blocked HTTP
// readiness ~60s inside Load() under s.mu. Running the two builds in
// parallel halves the pathHop-not-ready window since the two builders
// are independent of each other.
func (s *PacketStore) startBackgroundIndexBuilds() {
go func() {
t0 := time.Now()
s.mu.Lock()
s.buildSubpathIndex()
s.mu.Unlock()
// Atomic.Store happens-after s.mu.Unlock; handlers that
// observe Ready()==true synchronize-with this store.
s.subpathReady.Store(true)
s.maybeCloseIndexReadyCh()
log.Printf("[startup] index build complete: subpath (%s)",
time.Since(t0).Round(time.Millisecond))
}()
go func() {
t1 := time.Now()
s.mu.Lock()
s.buildPathHopIndex()
s.mu.Unlock()
s.pathHopReady.Store(true)
s.maybeCloseIndexReadyCh()
log.Printf("[startup] index build complete: pathHop (%s)",
time.Since(t1).Round(time.Millisecond))
}()
}
// markIndexesReadySync is the synchronous-build entry point used by
// the background chunk loader in store.go (and by tests). The chunk
// loader rebuilds both indexes under s.mu.Lock(); after the Unlock it
// calls this to flip the ready flags and close the broadcast channel
// in one shot, preserving symmetry with the goroutine path above.
func (s *PacketStore) markIndexesReadySync() {
s.subpathReady.Store(true)
s.pathHopReady.Store(true)
s.maybeCloseIndexReadyCh()
}
// WaitIndexesReady blocks until both background indexes built by
// startBackgroundIndexBuilds() report ready, or the deadline expires.
// Returns true if both flipped in time. Intended for tests that read
// s.spIndex / s.spTxIndex / s.byPathHop directly after Load(); production
// code paths gate via SubpathIndexReady() / PathHopIndexReady() and
// respond 503 + Retry-After to clients instead of blocking.
//
// Uses the indexReadyCh broadcast channel rather than polling
// (see review m6) so wake-up is immediate with no poll-interval jitter.
func (s *PacketStore) WaitIndexesReady(timeout time.Duration) bool {
if s.SubpathIndexReady() && s.PathHopIndexReady() {
return true
}
ch := s.indexReadyCh()
select {
case <-ch:
return true
case <-time.After(timeout):
return s.SubpathIndexReady() && s.PathHopIndexReady()
}
}
+140
View File
@@ -0,0 +1,140 @@
// Issue #1008: subpath + pathHop index builds must move off the
// synchronous Load() critical path into a background goroutine.
//
// Contract:
// 1. Immediately after Load() returns, SubpathIndexReady() and
// PathHopIndexReady() report false (the goroutine has not finished).
// 2. Analytics handlers that depend on those indices respond 503 with
// Retry-After: 5 until the corresponding ready flag flips true.
// 3. After the background build completes (waitable via a helper),
// both flags flip true and handlers respond 200.
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
)
// TestIssue1008_SubpathIndexReadyFalseImmediatelyAfterLoad asserts the
// subpath ready flag is false the instant Load() returns. Red commit: the
// stub returns true → assertion fires. Green commit: the flag is owned by
// the background goroutine, which has not yet run, so the assertion holds.
func TestIssue1008_SubpathIndexReadyFalseImmediatelyAfterLoad(t *testing.T) {
db := setupRichTestDB(t)
defer db.Close()
store := NewPacketStore(db, nil)
if err := store.Load(); err != nil {
t.Fatalf("Load() error: %v", err)
}
if store.SubpathIndexReady() {
t.Fatal("expected SubpathIndexReady()==false immediately after Load(); want background-deferred build (#1008)")
}
}
// TestIssue1008_PathHopIndexReadyFalseImmediatelyAfterLoad: same contract
// for the path-hop index.
func TestIssue1008_PathHopIndexReadyFalseImmediatelyAfterLoad(t *testing.T) {
db := setupRichTestDB(t)
defer db.Close()
store := NewPacketStore(db, nil)
if err := store.Load(); err != nil {
t.Fatalf("Load() error: %v", err)
}
if store.PathHopIndexReady() {
t.Fatal("expected PathHopIndexReady()==false immediately after Load(); want background-deferred build (#1008)")
}
}
// TestIssue1008_HandlerReturns503WhileSubpathIndexLoading asserts the
// analytics/subpaths handler returns 503 + Retry-After: 5 + a JSON body
// matching the triage spec while the subpath index is still building.
func TestIssue1008_HandlerReturns503WhileSubpathIndexLoading(t *testing.T) {
db := setupRichTestDB(t)
defer db.Close()
store := NewPacketStore(db, nil)
if err := store.Load(); err != nil {
t.Fatalf("Load() error: %v", err)
}
// Don't wait for the background build — we want to observe the
// not-ready window.
srv := &Server{store: store}
req := httptest.NewRequest("GET", "/api/analytics/subpaths?minLen=2&maxLen=4&limit=10", nil)
rec := httptest.NewRecorder()
srv.handleAnalyticsSubpaths(rec, req)
if rec.Code != http.StatusServiceUnavailable {
t.Fatalf("status = %d, want 503 (subpath index loading, #1008)", rec.Code)
}
if got := rec.Header().Get("Retry-After"); got != "5" {
t.Errorf("Retry-After header = %q, want %q", got, "5")
}
var body map[string]interface{}
if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
t.Fatalf("body not valid JSON: %v (body=%s)", err, rec.Body.String())
}
if body["error"] != "index loading" {
t.Errorf(`body["error"] = %v, want "index loading"`, body["error"])
}
}
// TestIssue1008_HandlerRecoversAfterIndexReady asserts that, once the
// background build completes, the handler returns 200.
func TestIssue1008_HandlerRecoversAfterIndexReady(t *testing.T) {
db := setupRichTestDB(t)
defer db.Close()
store := NewPacketStore(db, nil)
if err := store.Load(); err != nil {
t.Fatalf("Load() error: %v", err)
}
// Wait up to 5s for both background builds to finish on this small
// fixture (rich test DB has ~3 packets; build is sub-millisecond).
deadline := time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) {
if store.SubpathIndexReady() && store.PathHopIndexReady() {
break
}
time.Sleep(10 * time.Millisecond)
}
if !store.SubpathIndexReady() {
t.Fatal("SubpathIndexReady() never flipped true within 5s")
}
if !store.PathHopIndexReady() {
t.Fatal("PathHopIndexReady() never flipped true within 5s")
}
srv := &Server{store: store}
req := httptest.NewRequest("GET", "/api/analytics/subpaths?minLen=2&maxLen=4&limit=10", nil)
rec := httptest.NewRecorder()
srv.handleAnalyticsSubpaths(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("status after ready = %d, want 200 (body=%s)", rec.Code, rec.Body.String())
}
}
// TestIssue1008_m7_BothFlagsSetAfterParallelStart verifies that the
// parallel two-goroutine version of startBackgroundIndexBuilds (review
// m7) sets BOTH ready flags after a bounded wait, regardless of which
// goroutine wins the race to s.mu.Lock(). Sanity check that breaking
// the two builds apart didn't drop the pathHop flag flip.
func TestIssue1008_m7_BothFlagsSetAfterParallelStart(t *testing.T) {
db := setupRichTestDB(t)
defer db.Close()
store := NewPacketStore(db, nil)
if err := store.Load(); err != nil {
t.Fatalf("Load: %v", err)
}
if !store.WaitIndexesReady(5 * time.Second) {
t.Fatal("indexes never ready after parallel start (#1008 m7)")
}
if !store.SubpathIndexReady() {
t.Error("subpath flag not set after WaitIndexesReady returned true")
}
if !store.PathHopIndexReady() {
t.Error("pathHop flag not set after WaitIndexesReady returned true")
}
}
+23 -1
View File
@@ -15,6 +15,20 @@ import (
// plenty fresh for an at-a-glance status column.
const repeaterEnrichmentRecomputerDefaultInterval = 5 * time.Minute
// repeaterEnrichmentPrewarmWait is the upper bound on how long the
// synchronous prewarm in StartRepeaterEnrichmentRecomputer will wait
// for the background subpath+pathHop index builds to flip ready before
// skipping the prewarm. Override in tests via the package-level var.
//
// Background (issue #1008 review M1): the prewarm computes against
// s.byPathHop. If the background index builds haven't finished, the
// snapshot is built against an empty map and locked into
// s.repeaterRelayCache for `interval` (default 5min) — every
// /api/nodes during that window would report relay_count_24h=0. We
// wait up to this deadline and, on timeout, skip the prewarm entirely
// so the next ticker fire (which will see ready=true) does the work.
var repeaterEnrichmentPrewarmWait = 60 * time.Second
// StartRepeaterEnrichmentRecomputer is the steady-state background
// recompute loop for the repeater enrichment bulk caches consumed by
// handleNodes (GetRepeaterRelayInfoMap + GetRepeaterUsefulnessScoreMap).
@@ -55,7 +69,15 @@ func (s *PacketStore) StartRepeaterEnrichmentRecomputer(windowHours float64, int
// is to make sure the very first /api/nodes?limit=2000 from
// live.js's SPA bootstrap (issue #1262) hits a populated cache
// instead of paying the on-thread rebuild cost.
recomputeRepeaterEnrichmentSafe(s, windowHours)
//
// Issue #1008 review M1: skip the prewarm if the background
// subpath+pathHop index builds haven't finished — otherwise we'd
// snapshot against an empty s.byPathHop and serve relay_count_24h=0
// for the entire `interval` window. The next ticker fire will pick
// up the populated index.
if s.WaitIndexesReady(repeaterEnrichmentPrewarmWait) {
recomputeRepeaterEnrichmentSafe(s, windowHours)
}
var stopOnce sync.Once
go func() {
@@ -0,0 +1,95 @@
// Issue #1008 review M1: StartRepeaterEnrichmentRecomputer must wait
// for the background subpath+pathHop index builds before doing its
// synchronous prewarm — otherwise the prewarm reads an empty
// s.byPathHop and locks zeroed enrichment into s.repeaterRelayCache
// for the entire ticker interval.
package main
import (
"testing"
"time"
)
// TestIssue1008_M1_PrewarmWaitsForIndexes asserts that when the index
// ready flags are FALSE at the moment StartRepeaterEnrichmentRecomputer
// is called, the synchronous prewarm does NOT populate
// repeaterRelayCache (it either waits for ready, or skips). Without the
// fix the prewarm runs immediately against empty byPathHop and the
// cache becomes non-nil.
func TestIssue1008_M1_PrewarmWaitsForIndexes(t *testing.T) {
db := setupRichTestDB(t)
defer db.Close()
store := NewPacketStore(db, nil)
if err := store.Load(); err != nil {
t.Fatalf("Load: %v", err)
}
// Wait for the background builder to finish so it can't race past
// our Store(false) below. Once it's done it won't write the flags
// again, so flipping them back to false is stable.
if !store.WaitIndexesReady(5 * time.Second) {
t.Fatal("background builds never finished")
}
// Force the ready flags back to false to simulate the race where
// the recomputer is started before background builds finish. Also
// reset the broadcast channel — it was closed when the background
// builder flipped both flags true; if we left it closed,
// WaitIndexesReady would return immediately on the channel select
// (correct for production semantics where flags never reset,
// wrong for this synthetic test).
store.subpathReady.Store(false)
store.pathHopReady.Store(false)
store.indexReadyChMu.Lock()
store.indexReadyChan = nil
store.indexReadyChMu.Unlock()
// Use a tiny wait so the test runs fast. With the fix in place the
// prewarm should time out waiting for ready and SKIP, leaving the
// cache untouched. Without the fix it would compute immediately
// against the empty byPathHop.
prev := repeaterEnrichmentPrewarmWait
repeaterEnrichmentPrewarmWait = 50 * time.Millisecond
defer func() { repeaterEnrichmentPrewarmWait = prev }()
stop := store.StartRepeaterEnrichmentRecomputer(24, time.Hour)
defer stop()
// Give the prewarm time to complete (or to skip).
time.Sleep(150 * time.Millisecond)
store.repeaterEnrichMu.Lock()
cached := store.repeaterRelayCache
at := store.repeaterRelayAt
store.repeaterEnrichMu.Unlock()
if cached != nil || !at.IsZero() {
t.Fatalf("expected prewarm to SKIP when indexes not ready (cache==nil, at==zero); got cache=%v at=%v (#1008 M1)",
cached != nil, at)
}
}
// TestIssue1008_M1_PrewarmRunsWhenReady asserts the prewarm still runs
// (cache populated) when the indexes are already ready.
func TestIssue1008_M1_PrewarmRunsWhenReady(t *testing.T) {
db := setupRichTestDB(t)
defer db.Close()
store := NewPacketStore(db, nil)
if err := store.Load(); err != nil {
t.Fatalf("Load: %v", err)
}
if !store.WaitIndexesReady(5 * time.Second) {
t.Fatal("indexes never ready")
}
stop := store.StartRepeaterEnrichmentRecomputer(24, time.Hour)
defer stop()
// Prewarm is synchronous on the caller's goroutine, so after
// Start returns the cache must be populated.
store.repeaterEnrichMu.Lock()
at := store.repeaterRelayAt
store.repeaterEnrichMu.Unlock()
if at.IsZero() {
t.Fatal("expected prewarm to populate repeaterRelayAt when indexes ready (#1008 M1)")
}
}
+16
View File
@@ -1525,6 +1525,10 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
writeError(w, 503, "Packet store unavailable")
return
}
if !s.store.PathHopIndexReady() {
writeIndexLoading503(w)
return
}
// Use the precomputed byPathHop index instead of scanning all packets.
// Look up by full pubkey (resolved hops) and by short prefixes (raw hops).
@@ -2093,6 +2097,10 @@ func (s *Server) handleAnalyticsHashCollisions(w http.ResponseWriter, r *http.Re
func (s *Server) handleAnalyticsSubpaths(w http.ResponseWriter, r *http.Request) {
if s.store != nil {
if !s.store.SubpathIndexReady() {
writeIndexLoading503(w)
return
}
region := r.URL.Query().Get("region")
minLen := queryInt(r, "minLen", 2)
if minLen < 2 {
@@ -2119,6 +2127,10 @@ func (s *Server) handleAnalyticsSubpaths(w http.ResponseWriter, r *http.Request)
// response, avoiding repeated scans of the same packet data. Query format:
// ?groups=2-2:50,3-3:30,4-4:20,5-8:15 (minLen-maxLen:limit per group)
func (s *Server) handleAnalyticsSubpathsBulk(w http.ResponseWriter, r *http.Request) {
if s.store != nil && !s.store.SubpathIndexReady() {
writeIndexLoading503(w)
return
}
region := r.URL.Query().Get("region")
groupsParam := r.URL.Query().Get("groups")
if groupsParam == "" {
@@ -2198,6 +2210,10 @@ func (s *Server) handleAnalyticsSubpathDetail(w http.ResponseWriter, r *http.Req
}
}
if s.store != nil {
if !s.store.SubpathIndexReady() {
writeIndexLoading503(w)
return
}
writeJSON(w, s.store.GetSubpathDetail(rawHops))
return
}
+40
View File
@@ -27,6 +27,12 @@ func setupTestServer(t *testing.T) (*Server, *mux.Router) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
// #1008: Load() now defers subpath + path-hop index builds to a
// background goroutine. Wait for them before handlers go live so
// the existing assertions (which expect 200, not 503) hold.
if !store.WaitIndexesReady(5 * time.Second) {
t.Fatalf("background indexes never became ready")
}
srv.store = store
router := mux.NewRouter()
srv.RegisterRoutes(router)
@@ -44,6 +50,10 @@ func setupTestServerWithAPIKey(t *testing.T, apiKey string) (*Server, *mux.Route
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
// #1008: see setupTestServer comment.
if !store.WaitIndexesReady(5 * time.Second) {
t.Fatalf("background indexes never became ready")
}
srv.store = store
router := mux.NewRouter()
srv.RegisterRoutes(router)
@@ -1079,6 +1089,7 @@ func TestChannelMessagesWithRegion(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
srv.store = store
router := mux.NewRouter()
srv.RegisterRoutes(router)
@@ -1332,6 +1343,7 @@ func TestResolveHopsAmbiguous(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
srv.store = store
router := mux.NewRouter()
srv.RegisterRoutes(router)
@@ -1578,6 +1590,7 @@ func TestNodeAnalyticsNoNameNode(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
srv.store = store
router := mux.NewRouter()
srv.RegisterRoutes(router)
@@ -1614,6 +1627,7 @@ func TestNodeHealthForNoNameNode(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
srv.store = store
router := mux.NewRouter()
srv.RegisterRoutes(router)
@@ -2250,6 +2264,7 @@ store := NewPacketStore(db, nil)
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
pk := "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'TestNode', 'repeater')", pk)
@@ -2298,6 +2313,7 @@ store := NewPacketStore(db, nil)
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
pk := "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'Repeater2B', 'repeater')", pk)
@@ -2341,6 +2357,7 @@ func TestGetNodeHashSizeInfoLatestWins(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
pk := "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'LatestWins', 'repeater')", pk)
@@ -2390,6 +2407,7 @@ func TestGetNodeHashSizeInfoIgnoreDirectZeroHop(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
pk := "dddd111122223333444455556666777788889999aaaabbbbccccddddeeee3333"
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'DirIgnore', 'repeater')", pk)
@@ -2437,6 +2455,7 @@ func TestGetNodeHashSizeInfoOnlyDirectZeroHopIgnored(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
pk := "eeee111122223333444455556666777788889999aaaabbbbccccddddeeee4444"
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'OnlyDirect', 'repeater')", pk)
@@ -2471,6 +2490,7 @@ func TestGetNodeHashSizeInfoDirectNonZeroHopCounted(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
pk := "ffff111122223333444455556666777788889999aaaabbbbccccddddeeee5555"
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'DirNonZero', 'repeater')", pk)
@@ -2510,6 +2530,7 @@ func TestGetNodeHashSizeInfoNoAdverts(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
pk := "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd"
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'NoAdverts', 'repeater')", pk)
@@ -2543,6 +2564,7 @@ func TestHashAnalyticsZeroHopAdvert(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
// Capture baseline from seed data (bypass cache via computeAnalyticsHashSizes)
baseline := store.computeAnalyticsHashSizes("", "")
@@ -2602,6 +2624,7 @@ func TestAnalyticsHashSizeSameNameDifferentPubkey(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
pk1 := "aaaa111122223333444455556666777788889999aaaabbbbccccddddeeee1111"
pk2 := "aaaa111122223333444455556666777788889999aaaabbbbccccddddeeee2222"
@@ -2670,6 +2693,7 @@ func TestInconsistentNodesExcludesCompanions(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
now := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
payloadType := 4
@@ -2753,6 +2777,7 @@ func TestHashSizeInfoTimeWindow(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
pk := "dd44444444444444444444444444444444444444444444444444444444444444"
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'OldNode', 'repeater')", pk)
@@ -2924,6 +2949,7 @@ func TestLatestSeenMaintained(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
store.mu.RLock()
defer store.mu.RUnlock()
@@ -2988,6 +3014,7 @@ func TestQueryGroupedPacketsSortedByLatest(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
result := store.QueryGroupedPackets(PacketQuery{Limit: 50})
if result.Total < 2 {
@@ -3025,6 +3052,7 @@ func TestQueryGroupedPacketsCacheReturnsConsistentResult(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
q := PacketQuery{Limit: 50}
r1 := store.QueryGroupedPackets(q)
@@ -3054,6 +3082,7 @@ func TestGetChannelsCacheReturnsConsistentResult(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
r1 := store.GetChannels("")
r2 := store.GetChannels("")
@@ -3092,6 +3121,7 @@ func TestGetChannelsNotBlockedByLargeLock(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
channels := store.GetChannels("")
@@ -3328,6 +3358,7 @@ func TestHashCollisionsCacheTTL(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
if store.collisionCacheTTL != 3600*time.Second {
t.Errorf("expected collisionCacheTTL=3600s, got %v", store.collisionCacheTTL)
@@ -3372,6 +3403,7 @@ func TestHashCollisionsEmptyStore(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
srv.store = store
router := mux.NewRouter()
srv.RegisterRoutes(router)
@@ -3424,6 +3456,7 @@ func TestHashCollisionsWithCollision(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
// Inject hash_size=1 for both nodes so they appear in the 1-byte bucket
store.hashSizeInfoMu.Lock()
store.hashSizeInfoCache = map[string]*hashSizeNodeInfo{
@@ -3490,6 +3523,7 @@ func TestHashCollisionsShortPublicKey(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
srv.store = store
router := mux.NewRouter()
srv.RegisterRoutes(router)
@@ -3522,6 +3556,7 @@ func TestHashCollisionsMissingCoordinates(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
store.WaitIndexesReady(5 * time.Second)
srv.store = store
router := mux.NewRouter()
srv.RegisterRoutes(router)
@@ -3774,6 +3809,11 @@ func TestNodePathsPrefixCollisionFilter(t *testing.T) {
if err := store.Load(); err != nil {
t.Fatalf("store.Load failed: %v", err)
}
// #1008: wait for the background index build to complete before
// hitting the handler (otherwise it returns 503 index-loading).
if !store.WaitIndexesReady(5 * time.Second) {
t.Fatal("indexes never became ready")
}
srv.store = store
// Query paths for TestRepeater — should NOT include the collision packet
+33 -2
View File
@@ -247,6 +247,19 @@ type PacketStore struct {
spIndex map[string]int // "hop1,hop2" → count
spTxIndex map[string][]*StoreTx // "hop1,hop2" → transmissions containing this subpath
spTotalPaths int // transmissions with paths >= 2 hops
// Background-build ready gates for spIndex/spTxIndex and byPathHop
// (#1008). Flipped from false→true exactly once by the goroutine
// kicked off in Load() (or synchronously by the background chunk
// loader). Handlers gate reads via SubpathIndexReady() /
// PathHopIndexReady(); while false, they respond 503 + Retry-After.
subpathReady atomic.Bool
pathHopReady atomic.Bool
// indexReadyChan is closed exactly once when BOTH subpathReady
// and pathHopReady are true (#1008 review m6). Replaces the
// previous 2ms poll in WaitIndexesReady. Lazily allocated by
// indexReadyCh / maybeCloseIndexReadyCh in index_ready_1008.go.
indexReadyChMu sync.Mutex
indexReadyChan chan struct{}
// Precomputed distance analytics: hop distances and path totals.
// Built LAZILY on first /api/analytics/distance request (#1011) —
// previously eager in Load() at startup, which was O(n²) work for
@@ -837,10 +850,15 @@ func (s *PacketStore) Load() error {
}
// Build precomputed subpath index for O(1) analytics queries
s.buildSubpathIndex()
// — DEFERRED to a background goroutine (#1008). Same rationale
// as the distance index (#1011): synchronous build under s.mu
// blocks HTTP readiness ~60s at Cascadia scale. The goroutine is
// started AFTER s.loaded = true below.
// s.buildSubpathIndex()
// Build path-hop index for O(1) node path lookups
s.buildPathHopIndex()
// — DEFERRED to a background goroutine (#1008).
// s.buildPathHopIndex()
// Precompute distance analytics (hop distances, path totals)
// — DEFERRED to first /api/analytics/distance request (#1011).
@@ -869,6 +887,12 @@ func (s *PacketStore) Load() error {
len(s.packets), s.totalObs, elapsed, s.trackedMemoryMB(), s.estimatedMemoryMB())
}
s.loadMultibyteCapFromDB()
// Kick off background subpath + path-hop index builds (#1008).
// The goroutine acquires s.mu.Lock() and so will block until Load's
// deferred Unlock fires when this function returns. HTTP handlers
// gate reads behind SubpathIndexReady() / PathHopIndexReady() and
// respond 503 + Retry-After: 5 until the builds finish.
s.startBackgroundIndexBuilds()
return nil
}
@@ -1282,6 +1306,13 @@ func (s *PacketStore) loadBackgroundChunks() {
s.distLazyOnce = sync.Once{}
s.distLazyMu.Unlock()
s.mu.Unlock()
// #1008 review m3: flip the ready flags after the synchronous
// rebuild for symmetry with startBackgroundIndexBuilds. Safe
// today because the chunk loader runs after Load() has already
// kicked the goroutines that set these to true; this is a
// belt-and-suspenders against a future reorder where the chunk
// loader could be the first writer.
s.markIndexesReadySync()
s.backgroundLoadDone.Store(true)
if chunkErrors > 0 {