mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-06-28 13:21:43 +00:00
## Summary
Mirrors the distance-index lazy pattern (#1011): the subpath and
path-hop index builds are no longer part of `Load()`'s synchronous
critical section. They now run in **two parallel background goroutines**
kicked off after `s.loaded = true`, so HTTP comes up immediately even at
Cascadia scale (5M observations, previously ~60s blocked on these two
builds inside `Load()` under `s.mu`).
Fixes #1008.
## Approach
Two new `atomic.Bool` fields on `PacketStore` (`subpathReady`,
`pathHopReady`) plus a one-shot broadcast channel (`indexReadyChan`) for
waiters. `Load()` removes the synchronous `s.buildSubpathIndex()` /
`s.buildPathHopIndex()` calls and instead kicks
`s.startBackgroundIndexBuilds()` right before returning. That function
spawns **two independent goroutines** (review m7), one per index. Each
goroutine:
1. acquires `s.mu.Lock()` (blocks until `Load()`'s deferred Unlock
fires),
2. runs its builder, releases the lock, stores its `ready = true`,
3. closes the broadcast channel if both flags are now true,
4. logs `[startup] index build complete: subpath (Xs)` (or pathHop).
Analytics handlers whose entire response IS the index aggregate —
`/api/analytics/subpaths`, `/api/analytics/subpaths-bulk`,
`/api/analytics/subpath-detail`, `/api/nodes/{pubkey}/paths` — gate
reads behind the corresponding atomic and respond with `503 Service
Unavailable`, `Retry-After: 5`, body `{"error":"index
loading","retryAfter":5}` until the build completes — matching the
triage spec.
### Handler scope (review M2)
A second class of handlers also touches these indexes — `/api/nodes`,
`/api/nodes/{pubkey}`, the `GetRepeaterRelayInfoMap` /
`GetRepeaterUsefulnessScoreMap` / `GetBridgeScore` enrichment helpers,
and `repeater_liveness` / `repeater_usefulness`. These are
**intentionally NOT 503-gated**: they expose the index via optional
enrichment fields that callers already treat as "may be empty", and
503-ing the SPA bootstrap to wait for an index that only affects
relay-activity badges would be a worse UX than a 30–60s window of "—"
values. The rationale is documented in the package doc-comment at the
top of `index_ready_1008.go`.
The recomputer's synchronous prewarm path
(`StartRepeaterEnrichmentRecomputer`) gates on `WaitIndexesReady(60s)`
(review M1) so it never snapshots an empty `byPathHop` into
`s.repeaterRelayCache`; on timeout it skips the prewarm and lets the
5-minute ticker pick up the populated index.
## Concurrency safety
Each build goroutine acquires `s.mu.Lock()` before calling the existing
`buildSubpathIndex()` / `buildPathHopIndex()` helpers, which replace
`s.spIndex` / `s.spTxIndex` / `s.byPathHop` with freshly-allocated maps.
Visibility of the populated maps to handlers that observe
`Ready()==true` is established by Go 1.19+ sync/atomic acquire-release
semantics: the atomic store of `true` happens-after `s.mu.Unlock()`, and
the handler's atomic load synchronizes-with that store. The handler's
subsequent `s.mu.RLock` serializes against concurrent ingest writers,
not against the builder.
The existing `main.go` boot sequence does not start ingest goroutines
until after `store.Load()` returns and graph init completes, so the
brief window between `Load()` returning and the two goroutines acquiring
`s.mu` does not race with concurrent ingest writes.
## TDD: red → green
- **Red** commit `63e79e11`: `cmd/server/index_ready_1008_test.go` adds
four assertions; `cmd/server/index_ready_1008.go` adds compile-only
stubs returning `true` so the tests fail on assertions, not build
errors.
- **Green** commit `fb1d22b0`: implements the real atomic gates, the
background goroutine, and the four handler 503 branches; also updates
four existing tests that read indexes directly post-`Load()` to call
`store.WaitIndexesReady(5s)` first.
- **Race-fix commit `b77d56eb`** (review m8 — test-infra exemption):
adds `WaitIndexesReady` calls in test helpers/setup paths so the race
detector no longer flags the read-after-Load() pattern in existing
tests. Per AGENTS.md, race-detector flakes are observable evidence (test
crashes under `-race`) and qualify for the test-infra exemption from the
TDD red-commit requirement; no behavior change in production code.
- **Polish round 2 — M1 red `408c7462` / green `85e82c8a`**:
`TestIssue1008_M1_PrewarmWaitsForIndexes` asserts the recomputer prewarm
SKIPs when indexes are not ready. Red commit adds the assertion + a stub
`repeaterEnrichmentPrewarmWait` var; green commit wires
`WaitIndexesReady` into the prewarm path and adds the handler-scope docs
for M2.
- **Polish round 2 — minor cleanups `fd089bd0`** (m3..m7): chunk-loader
wires `markIndexesReadySync`, memory-model comment rewritten to cite
acquire-release, sentinel deleted, polling replaced with a broadcast
channel, two parallel goroutines for the builds.
`TestIssue1008_m7_BothFlagsSetAfterParallelStart` covers the parallel
path.
## Reproduction
```
git fetch origin fix/issue-1008
git checkout 63e79e11 # red commit
cd cmd/server && go test -run TestIssue1008_ -count=1 . # FAILs
git checkout fix/issue-1008 # latest green
cd cmd/server && go test -run TestIssue1008 -count=1 -race . # all pass
cd cmd/server && go test -count=1 -race -short ./... # full suite ok
```
## Files changed
| file | role |
|---|---|
| `cmd/server/store.go` | atomic.Bool fields + indexReadyChan broadcast
field; remove sync build calls in Load(); kick goroutines; wire
markIndexesReadySync from chunk loader |
| `cmd/server/index_ready_1008.go` | ready flags, two-goroutine
background builds, 503 helper, channel-based WaitIndexesReady,
handler-scope docs |
| `cmd/server/index_ready_1008_test.go` | red-commit contract tests +
parallel-start assertion |
| `cmd/server/repeater_enrich_recomputer.go` | gate prewarm on
WaitIndexesReady (M1) |
| `cmd/server/repeater_enrich_recomputer_1008_test.go` | M1 red+green
assertions |
| `cmd/server/routes.go` | 503 gate on 4 analytics handlers |
| `cmd/server/routes_test.go` | setup helpers wait for ready; collision
test waits |
| `cmd/server/coverage_test.go` | three tests wait for ready before
reading indexes |
## Out of scope
- Distance index (already deferred in #1011) — untouched.
- The `pickBestObservation` + `indexByNode` per-tx loop in `Load()` —
kept synchronous per triage Findings (ordering-sensitive,
contiguous-memory, fast).
---------
Co-authored-by: bot <bot@noreply.local>
Co-authored-by: openclaw-bot <bot@openclaw.local>
Co-authored-by: mc-bot <mc-bot@users.noreply.github.com>
This commit is contained in:
@@ -2289,6 +2289,10 @@ func TestSubpathPrecomputedIndex(t *testing.T) {
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
store.Load()
|
||||
// #1008: indexes built in background goroutine; wait before reading.
|
||||
if !store.WaitIndexesReady(5 * time.Second) {
|
||||
t.Fatal("indexes never became ready")
|
||||
}
|
||||
|
||||
// After Load(), the precomputed index must be populated.
|
||||
if len(store.spIndex) == 0 {
|
||||
@@ -2343,6 +2347,10 @@ func TestSubpathTxIndexPopulated(t *testing.T) {
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
store.Load()
|
||||
// #1008: indexes built in background goroutine; wait before reading.
|
||||
if !store.WaitIndexesReady(5 * time.Second) {
|
||||
t.Fatal("indexes never became ready")
|
||||
}
|
||||
|
||||
// spTxIndex must be populated alongside spIndex
|
||||
if len(store.spTxIndex) == 0 {
|
||||
@@ -2387,6 +2395,10 @@ func TestSubpathDetailMixedCaseHops(t *testing.T) {
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
store.Load()
|
||||
// #1008: indexes built in background goroutine; wait before reading.
|
||||
if !store.WaitIndexesReady(5 * time.Second) {
|
||||
t.Fatal("indexes never became ready")
|
||||
}
|
||||
|
||||
// Query with lowercase hops to establish baseline
|
||||
lower := store.GetSubpathDetail([]string{"eeff", "0011"})
|
||||
|
||||
@@ -0,0 +1,218 @@
|
||||
// Issue #1008: background-deferred subpath + pathHop index builds.
|
||||
//
|
||||
// Pattern mirrors the distance index (#1011) — but where distance is
|
||||
// fully lazy (built on first request), these two indexes are kicked off
|
||||
// eagerly by Load() in a background goroutine so HTTP becomes ready
|
||||
// immediately while the indexes finish populating.
|
||||
//
|
||||
// Concurrency model:
|
||||
//
|
||||
// - subpathReady / pathHopReady are atomic.Bool flags written exactly
|
||||
// once by the background builder (false → true) and never reset
|
||||
// thereafter. Handlers read them via SubpathIndexReady() /
|
||||
// PathHopIndexReady() before touching s.spIndex / s.spTxIndex /
|
||||
// s.byPathHop. While a flag is false, the handler responds 503 +
|
||||
// Retry-After: 5.
|
||||
//
|
||||
// - The builder itself acquires s.mu.Lock() and calls the existing
|
||||
// buildSubpathIndex() / buildPathHopIndex() methods. Those methods
|
||||
// replace s.spIndex / s.spTxIndex / s.byPathHop with freshly-
|
||||
// allocated maps under the write lock. Visibility of the populated
|
||||
// maps to handlers that see Ready()==true is guaranteed by Go's
|
||||
// sync/atomic acquire-release semantics (formalized in Go 1.19):
|
||||
// the atomic.Store(true) happens-after the s.mu.Unlock() that
|
||||
// completes the build, and the handler's atomic.Load()==true
|
||||
// synchronizes-with that store. The handler's subsequent s.mu.RLock
|
||||
// is not what establishes visibility — it only serializes against
|
||||
// concurrent ingest writers — so dropping the RLock would still be
|
||||
// safe for the build's "populated map" snapshot (we keep it for
|
||||
// ingest serialization).
|
||||
//
|
||||
// - Ingest-side incremental updates in StoreNewTransmissions /
|
||||
// pruning / hash-collision paths continue to write s.spIndex /
|
||||
// s.spTxIndex / s.byPathHop directly under s.mu.Lock(). Because
|
||||
// the builder also runs under s.mu.Lock() and the builder
|
||||
// overwrites whatever is there, the brief window between Load()
|
||||
// returning and the goroutine acquiring s.mu means any
|
||||
// concurrent ingest writes will be overwritten by the build —
|
||||
// this matches the prior behavior where ingest could not start
|
||||
// until Load() released s.mu, so in practice ingest does not
|
||||
// run during the build window. Documenting this rather than
|
||||
// adding a separate gate: the existing main.go boot sequence
|
||||
// does not start ingest goroutines until after store.Load()
|
||||
// and graph init complete.
|
||||
//
|
||||
// Handler scope of the ready gate (issue #1008 review M2):
|
||||
//
|
||||
// - HARD-GATED with 503 + Retry-After: 5 — analytics endpoints whose
|
||||
// entire response is the index aggregate. Empty data would be
|
||||
// visibly broken (charts, top-N tables). See routes.go:
|
||||
// /api/analytics/subpaths, /api/analytics/subpaths-bulk,
|
||||
// /api/analytics/subpath-detail, /api/nodes/{pubkey}/paths.
|
||||
//
|
||||
// - BEST-EFFORT (not gated) — endpoints where the index drives
|
||||
// enrichment fields that callers already treat as optional. During
|
||||
// the not-ready window these report zero counts / nil scores
|
||||
// rather than 503-ing the whole list. Acceptable because:
|
||||
//
|
||||
// * /api/nodes and /api/nodes/{pubkey} have many other fields
|
||||
// (last-seen, position, advert metadata) that callers depend
|
||||
// on at startup. 503-ing the SPA bootstrap to wait for an
|
||||
// index that exclusively affects "relay activity" badges
|
||||
// would be a worse UX than a 30–60s window of "—" badges.
|
||||
//
|
||||
// * GetRepeaterRelayInfoMap / GetRepeaterUsefulnessScoreMap /
|
||||
// GetBridgeScore / repeater_liveness / repeater_usefulness
|
||||
// all walk s.byPathHop. During the build window they return
|
||||
// empty maps or zero scores; the steady-state recomputer
|
||||
// (#1262) refreshes them every 5min once indexes flip ready
|
||||
// (prewarm guarded by WaitIndexesReady — see review M1).
|
||||
//
|
||||
// This is documented rather than gated so operators do not see
|
||||
// /api/nodes 503 during routine restarts on Cascadia-scale data.
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// writeIndexLoading503 emits the standard 503 response used by handlers
|
||||
// that depend on a not-yet-built index (#1008). Body shape matches the
|
||||
// triage spec: {"error":"index loading","retryAfter":5}. The Retry-After
|
||||
// header is also set so well-behaved clients back off automatically.
|
||||
func writeIndexLoading503(w http.ResponseWriter) {
|
||||
w.Header().Set("Retry-After", "5")
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
_, _ = w.Write([]byte(`{"error":"index loading","retryAfter":5}`))
|
||||
}
|
||||
|
||||
// SubpathIndexReady reports whether the subpath index build kicked off
|
||||
// by Load() has completed (#1008). Until this returns true, callers
|
||||
// must NOT read s.spIndex / s.spTxIndex.
|
||||
func (s *PacketStore) SubpathIndexReady() bool {
|
||||
return s.subpathReady.Load()
|
||||
}
|
||||
|
||||
// PathHopIndexReady reports whether the path-hop index build kicked
|
||||
// off by Load() has completed (#1008). Until this returns true,
|
||||
// callers must NOT read s.byPathHop.
|
||||
func (s *PacketStore) PathHopIndexReady() bool {
|
||||
return s.pathHopReady.Load()
|
||||
}
|
||||
|
||||
// indexReadyCh returns the channel that is closed when BOTH indexes
|
||||
// have flipped ready. Lazily created on first access. Safe to call
|
||||
// concurrently. Used by WaitIndexesReady and any future waiters that
|
||||
// want event-driven semantics instead of polling.
|
||||
func (s *PacketStore) indexReadyCh() <-chan struct{} {
|
||||
s.indexReadyChMu.Lock()
|
||||
defer s.indexReadyChMu.Unlock()
|
||||
if s.indexReadyChan == nil {
|
||||
s.indexReadyChan = make(chan struct{})
|
||||
// If both are already ready (e.g. background chunk loader
|
||||
// flipped them synchronously before any waiter showed up),
|
||||
// close immediately so the channel is usable as a one-shot.
|
||||
if s.subpathReady.Load() && s.pathHopReady.Load() {
|
||||
close(s.indexReadyChan)
|
||||
}
|
||||
}
|
||||
return s.indexReadyChan
|
||||
}
|
||||
|
||||
// maybeCloseIndexReadyCh closes the ready channel iff both flags are
|
||||
// set. Idempotent (a sync.Once on the channel) and safe to call from
|
||||
// either builder goroutine on the green-path transitions, as well as
|
||||
// from markIndexesReadySync.
|
||||
func (s *PacketStore) maybeCloseIndexReadyCh() {
|
||||
if !(s.subpathReady.Load() && s.pathHopReady.Load()) {
|
||||
return
|
||||
}
|
||||
s.indexReadyChMu.Lock()
|
||||
defer s.indexReadyChMu.Unlock()
|
||||
if s.indexReadyChan == nil {
|
||||
// Lazily allocate AND close it in one step so any future
|
||||
// indexReadyCh() caller gets a pre-closed channel.
|
||||
s.indexReadyChan = make(chan struct{})
|
||||
close(s.indexReadyChan)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-s.indexReadyChan:
|
||||
// Already closed.
|
||||
default:
|
||||
close(s.indexReadyChan)
|
||||
}
|
||||
}
|
||||
|
||||
// startBackgroundIndexBuilds is called from Load() after s.loaded=true
|
||||
// to populate the subpath + path-hop indexes off the critical path
|
||||
// (#1008). It returns immediately; the work runs in two background
|
||||
// goroutines (one per index — see review m7) that each acquire
|
||||
// s.mu.Lock() independently, install their map, then set the
|
||||
// corresponding atomic ready flag.
|
||||
//
|
||||
// At Cascadia scale (~5M observations) this previously blocked HTTP
|
||||
// readiness ~60s inside Load() under s.mu. Running the two builds in
|
||||
// parallel halves the pathHop-not-ready window since the two builders
|
||||
// are independent of each other.
|
||||
func (s *PacketStore) startBackgroundIndexBuilds() {
|
||||
go func() {
|
||||
t0 := time.Now()
|
||||
s.mu.Lock()
|
||||
s.buildSubpathIndex()
|
||||
s.mu.Unlock()
|
||||
// Atomic.Store happens-after s.mu.Unlock; handlers that
|
||||
// observe Ready()==true synchronize-with this store.
|
||||
s.subpathReady.Store(true)
|
||||
s.maybeCloseIndexReadyCh()
|
||||
log.Printf("[startup] index build complete: subpath (%s)",
|
||||
time.Since(t0).Round(time.Millisecond))
|
||||
}()
|
||||
go func() {
|
||||
t1 := time.Now()
|
||||
s.mu.Lock()
|
||||
s.buildPathHopIndex()
|
||||
s.mu.Unlock()
|
||||
s.pathHopReady.Store(true)
|
||||
s.maybeCloseIndexReadyCh()
|
||||
log.Printf("[startup] index build complete: pathHop (%s)",
|
||||
time.Since(t1).Round(time.Millisecond))
|
||||
}()
|
||||
}
|
||||
|
||||
// markIndexesReadySync is the synchronous-build entry point used by
|
||||
// the background chunk loader in store.go (and by tests). The chunk
|
||||
// loader rebuilds both indexes under s.mu.Lock(); after the Unlock it
|
||||
// calls this to flip the ready flags and close the broadcast channel
|
||||
// in one shot, preserving symmetry with the goroutine path above.
|
||||
func (s *PacketStore) markIndexesReadySync() {
|
||||
s.subpathReady.Store(true)
|
||||
s.pathHopReady.Store(true)
|
||||
s.maybeCloseIndexReadyCh()
|
||||
}
|
||||
|
||||
// WaitIndexesReady blocks until both background indexes built by
|
||||
// startBackgroundIndexBuilds() report ready, or the deadline expires.
|
||||
// Returns true if both flipped in time. Intended for tests that read
|
||||
// s.spIndex / s.spTxIndex / s.byPathHop directly after Load(); production
|
||||
// code paths gate via SubpathIndexReady() / PathHopIndexReady() and
|
||||
// respond 503 + Retry-After to clients instead of blocking.
|
||||
//
|
||||
// Uses the indexReadyCh broadcast channel rather than polling
|
||||
// (see review m6) so wake-up is immediate with no poll-interval jitter.
|
||||
func (s *PacketStore) WaitIndexesReady(timeout time.Duration) bool {
|
||||
if s.SubpathIndexReady() && s.PathHopIndexReady() {
|
||||
return true
|
||||
}
|
||||
ch := s.indexReadyCh()
|
||||
select {
|
||||
case <-ch:
|
||||
return true
|
||||
case <-time.After(timeout):
|
||||
return s.SubpathIndexReady() && s.PathHopIndexReady()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,140 @@
|
||||
// Issue #1008: subpath + pathHop index builds must move off the
|
||||
// synchronous Load() critical path into a background goroutine.
|
||||
//
|
||||
// Contract:
|
||||
// 1. Immediately after Load() returns, SubpathIndexReady() and
|
||||
// PathHopIndexReady() report false (the goroutine has not finished).
|
||||
// 2. Analytics handlers that depend on those indices respond 503 with
|
||||
// Retry-After: 5 until the corresponding ready flag flips true.
|
||||
// 3. After the background build completes (waitable via a helper),
|
||||
// both flags flip true and handlers respond 200.
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestIssue1008_SubpathIndexReadyFalseImmediatelyAfterLoad asserts the
|
||||
// subpath ready flag is false the instant Load() returns. Red commit: the
|
||||
// stub returns true → assertion fires. Green commit: the flag is owned by
|
||||
// the background goroutine, which has not yet run, so the assertion holds.
|
||||
func TestIssue1008_SubpathIndexReadyFalseImmediatelyAfterLoad(t *testing.T) {
|
||||
db := setupRichTestDB(t)
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("Load() error: %v", err)
|
||||
}
|
||||
if store.SubpathIndexReady() {
|
||||
t.Fatal("expected SubpathIndexReady()==false immediately after Load(); want background-deferred build (#1008)")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIssue1008_PathHopIndexReadyFalseImmediatelyAfterLoad: same contract
|
||||
// for the path-hop index.
|
||||
func TestIssue1008_PathHopIndexReadyFalseImmediatelyAfterLoad(t *testing.T) {
|
||||
db := setupRichTestDB(t)
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("Load() error: %v", err)
|
||||
}
|
||||
if store.PathHopIndexReady() {
|
||||
t.Fatal("expected PathHopIndexReady()==false immediately after Load(); want background-deferred build (#1008)")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIssue1008_HandlerReturns503WhileSubpathIndexLoading asserts the
|
||||
// analytics/subpaths handler returns 503 + Retry-After: 5 + a JSON body
|
||||
// matching the triage spec while the subpath index is still building.
|
||||
func TestIssue1008_HandlerReturns503WhileSubpathIndexLoading(t *testing.T) {
|
||||
db := setupRichTestDB(t)
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("Load() error: %v", err)
|
||||
}
|
||||
// Don't wait for the background build — we want to observe the
|
||||
// not-ready window.
|
||||
srv := &Server{store: store}
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/analytics/subpaths?minLen=2&maxLen=4&limit=10", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
srv.handleAnalyticsSubpaths(rec, req)
|
||||
|
||||
if rec.Code != http.StatusServiceUnavailable {
|
||||
t.Fatalf("status = %d, want 503 (subpath index loading, #1008)", rec.Code)
|
||||
}
|
||||
if got := rec.Header().Get("Retry-After"); got != "5" {
|
||||
t.Errorf("Retry-After header = %q, want %q", got, "5")
|
||||
}
|
||||
var body map[string]interface{}
|
||||
if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
|
||||
t.Fatalf("body not valid JSON: %v (body=%s)", err, rec.Body.String())
|
||||
}
|
||||
if body["error"] != "index loading" {
|
||||
t.Errorf(`body["error"] = %v, want "index loading"`, body["error"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestIssue1008_HandlerRecoversAfterIndexReady asserts that, once the
|
||||
// background build completes, the handler returns 200.
|
||||
func TestIssue1008_HandlerRecoversAfterIndexReady(t *testing.T) {
|
||||
db := setupRichTestDB(t)
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("Load() error: %v", err)
|
||||
}
|
||||
|
||||
// Wait up to 5s for both background builds to finish on this small
|
||||
// fixture (rich test DB has ~3 packets; build is sub-millisecond).
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
if store.SubpathIndexReady() && store.PathHopIndexReady() {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
if !store.SubpathIndexReady() {
|
||||
t.Fatal("SubpathIndexReady() never flipped true within 5s")
|
||||
}
|
||||
if !store.PathHopIndexReady() {
|
||||
t.Fatal("PathHopIndexReady() never flipped true within 5s")
|
||||
}
|
||||
|
||||
srv := &Server{store: store}
|
||||
req := httptest.NewRequest("GET", "/api/analytics/subpaths?minLen=2&maxLen=4&limit=10", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
srv.handleAnalyticsSubpaths(rec, req)
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("status after ready = %d, want 200 (body=%s)", rec.Code, rec.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestIssue1008_m7_BothFlagsSetAfterParallelStart verifies that the
|
||||
// parallel two-goroutine version of startBackgroundIndexBuilds (review
|
||||
// m7) sets BOTH ready flags after a bounded wait, regardless of which
|
||||
// goroutine wins the race to s.mu.Lock(). Sanity check that breaking
|
||||
// the two builds apart didn't drop the pathHop flag flip.
|
||||
func TestIssue1008_m7_BothFlagsSetAfterParallelStart(t *testing.T) {
|
||||
db := setupRichTestDB(t)
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("Load: %v", err)
|
||||
}
|
||||
if !store.WaitIndexesReady(5 * time.Second) {
|
||||
t.Fatal("indexes never ready after parallel start (#1008 m7)")
|
||||
}
|
||||
if !store.SubpathIndexReady() {
|
||||
t.Error("subpath flag not set after WaitIndexesReady returned true")
|
||||
}
|
||||
if !store.PathHopIndexReady() {
|
||||
t.Error("pathHop flag not set after WaitIndexesReady returned true")
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,20 @@ import (
|
||||
// plenty fresh for an at-a-glance status column.
|
||||
const repeaterEnrichmentRecomputerDefaultInterval = 5 * time.Minute
|
||||
|
||||
// repeaterEnrichmentPrewarmWait is the upper bound on how long the
|
||||
// synchronous prewarm in StartRepeaterEnrichmentRecomputer will wait
|
||||
// for the background subpath+pathHop index builds to flip ready before
|
||||
// skipping the prewarm. Override in tests via the package-level var.
|
||||
//
|
||||
// Background (issue #1008 review M1): the prewarm computes against
|
||||
// s.byPathHop. If the background index builds haven't finished, the
|
||||
// snapshot is built against an empty map and locked into
|
||||
// s.repeaterRelayCache for `interval` (default 5min) — every
|
||||
// /api/nodes during that window would report relay_count_24h=0. We
|
||||
// wait up to this deadline and, on timeout, skip the prewarm entirely
|
||||
// so the next ticker fire (which will see ready=true) does the work.
|
||||
var repeaterEnrichmentPrewarmWait = 60 * time.Second
|
||||
|
||||
// StartRepeaterEnrichmentRecomputer is the steady-state background
|
||||
// recompute loop for the repeater enrichment bulk caches consumed by
|
||||
// handleNodes (GetRepeaterRelayInfoMap + GetRepeaterUsefulnessScoreMap).
|
||||
@@ -55,7 +69,15 @@ func (s *PacketStore) StartRepeaterEnrichmentRecomputer(windowHours float64, int
|
||||
// is to make sure the very first /api/nodes?limit=2000 from
|
||||
// live.js's SPA bootstrap (issue #1262) hits a populated cache
|
||||
// instead of paying the on-thread rebuild cost.
|
||||
recomputeRepeaterEnrichmentSafe(s, windowHours)
|
||||
//
|
||||
// Issue #1008 review M1: skip the prewarm if the background
|
||||
// subpath+pathHop index builds haven't finished — otherwise we'd
|
||||
// snapshot against an empty s.byPathHop and serve relay_count_24h=0
|
||||
// for the entire `interval` window. The next ticker fire will pick
|
||||
// up the populated index.
|
||||
if s.WaitIndexesReady(repeaterEnrichmentPrewarmWait) {
|
||||
recomputeRepeaterEnrichmentSafe(s, windowHours)
|
||||
}
|
||||
|
||||
var stopOnce sync.Once
|
||||
go func() {
|
||||
|
||||
@@ -0,0 +1,95 @@
|
||||
// Issue #1008 review M1: StartRepeaterEnrichmentRecomputer must wait
|
||||
// for the background subpath+pathHop index builds before doing its
|
||||
// synchronous prewarm — otherwise the prewarm reads an empty
|
||||
// s.byPathHop and locks zeroed enrichment into s.repeaterRelayCache
|
||||
// for the entire ticker interval.
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestIssue1008_M1_PrewarmWaitsForIndexes asserts that when the index
|
||||
// ready flags are FALSE at the moment StartRepeaterEnrichmentRecomputer
|
||||
// is called, the synchronous prewarm does NOT populate
|
||||
// repeaterRelayCache (it either waits for ready, or skips). Without the
|
||||
// fix the prewarm runs immediately against empty byPathHop and the
|
||||
// cache becomes non-nil.
|
||||
func TestIssue1008_M1_PrewarmWaitsForIndexes(t *testing.T) {
|
||||
db := setupRichTestDB(t)
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("Load: %v", err)
|
||||
}
|
||||
// Wait for the background builder to finish so it can't race past
|
||||
// our Store(false) below. Once it's done it won't write the flags
|
||||
// again, so flipping them back to false is stable.
|
||||
if !store.WaitIndexesReady(5 * time.Second) {
|
||||
t.Fatal("background builds never finished")
|
||||
}
|
||||
// Force the ready flags back to false to simulate the race where
|
||||
// the recomputer is started before background builds finish. Also
|
||||
// reset the broadcast channel — it was closed when the background
|
||||
// builder flipped both flags true; if we left it closed,
|
||||
// WaitIndexesReady would return immediately on the channel select
|
||||
// (correct for production semantics where flags never reset,
|
||||
// wrong for this synthetic test).
|
||||
store.subpathReady.Store(false)
|
||||
store.pathHopReady.Store(false)
|
||||
store.indexReadyChMu.Lock()
|
||||
store.indexReadyChan = nil
|
||||
store.indexReadyChMu.Unlock()
|
||||
|
||||
// Use a tiny wait so the test runs fast. With the fix in place the
|
||||
// prewarm should time out waiting for ready and SKIP, leaving the
|
||||
// cache untouched. Without the fix it would compute immediately
|
||||
// against the empty byPathHop.
|
||||
prev := repeaterEnrichmentPrewarmWait
|
||||
repeaterEnrichmentPrewarmWait = 50 * time.Millisecond
|
||||
defer func() { repeaterEnrichmentPrewarmWait = prev }()
|
||||
|
||||
stop := store.StartRepeaterEnrichmentRecomputer(24, time.Hour)
|
||||
defer stop()
|
||||
|
||||
// Give the prewarm time to complete (or to skip).
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
store.repeaterEnrichMu.Lock()
|
||||
cached := store.repeaterRelayCache
|
||||
at := store.repeaterRelayAt
|
||||
store.repeaterEnrichMu.Unlock()
|
||||
|
||||
if cached != nil || !at.IsZero() {
|
||||
t.Fatalf("expected prewarm to SKIP when indexes not ready (cache==nil, at==zero); got cache=%v at=%v (#1008 M1)",
|
||||
cached != nil, at)
|
||||
}
|
||||
}
|
||||
|
||||
// TestIssue1008_M1_PrewarmRunsWhenReady asserts the prewarm still runs
|
||||
// (cache populated) when the indexes are already ready.
|
||||
func TestIssue1008_M1_PrewarmRunsWhenReady(t *testing.T) {
|
||||
db := setupRichTestDB(t)
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("Load: %v", err)
|
||||
}
|
||||
if !store.WaitIndexesReady(5 * time.Second) {
|
||||
t.Fatal("indexes never ready")
|
||||
}
|
||||
|
||||
stop := store.StartRepeaterEnrichmentRecomputer(24, time.Hour)
|
||||
defer stop()
|
||||
|
||||
// Prewarm is synchronous on the caller's goroutine, so after
|
||||
// Start returns the cache must be populated.
|
||||
store.repeaterEnrichMu.Lock()
|
||||
at := store.repeaterRelayAt
|
||||
store.repeaterEnrichMu.Unlock()
|
||||
|
||||
if at.IsZero() {
|
||||
t.Fatal("expected prewarm to populate repeaterRelayAt when indexes ready (#1008 M1)")
|
||||
}
|
||||
}
|
||||
@@ -1525,6 +1525,10 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
|
||||
writeError(w, 503, "Packet store unavailable")
|
||||
return
|
||||
}
|
||||
if !s.store.PathHopIndexReady() {
|
||||
writeIndexLoading503(w)
|
||||
return
|
||||
}
|
||||
|
||||
// Use the precomputed byPathHop index instead of scanning all packets.
|
||||
// Look up by full pubkey (resolved hops) and by short prefixes (raw hops).
|
||||
@@ -2093,6 +2097,10 @@ func (s *Server) handleAnalyticsHashCollisions(w http.ResponseWriter, r *http.Re
|
||||
|
||||
func (s *Server) handleAnalyticsSubpaths(w http.ResponseWriter, r *http.Request) {
|
||||
if s.store != nil {
|
||||
if !s.store.SubpathIndexReady() {
|
||||
writeIndexLoading503(w)
|
||||
return
|
||||
}
|
||||
region := r.URL.Query().Get("region")
|
||||
minLen := queryInt(r, "minLen", 2)
|
||||
if minLen < 2 {
|
||||
@@ -2119,6 +2127,10 @@ func (s *Server) handleAnalyticsSubpaths(w http.ResponseWriter, r *http.Request)
|
||||
// response, avoiding repeated scans of the same packet data. Query format:
|
||||
// ?groups=2-2:50,3-3:30,4-4:20,5-8:15 (minLen-maxLen:limit per group)
|
||||
func (s *Server) handleAnalyticsSubpathsBulk(w http.ResponseWriter, r *http.Request) {
|
||||
if s.store != nil && !s.store.SubpathIndexReady() {
|
||||
writeIndexLoading503(w)
|
||||
return
|
||||
}
|
||||
region := r.URL.Query().Get("region")
|
||||
groupsParam := r.URL.Query().Get("groups")
|
||||
if groupsParam == "" {
|
||||
@@ -2198,6 +2210,10 @@ func (s *Server) handleAnalyticsSubpathDetail(w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
}
|
||||
if s.store != nil {
|
||||
if !s.store.SubpathIndexReady() {
|
||||
writeIndexLoading503(w)
|
||||
return
|
||||
}
|
||||
writeJSON(w, s.store.GetSubpathDetail(rawHops))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -27,6 +27,12 @@ func setupTestServer(t *testing.T) (*Server, *mux.Router) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
// #1008: Load() now defers subpath + path-hop index builds to a
|
||||
// background goroutine. Wait for them before handlers go live so
|
||||
// the existing assertions (which expect 200, not 503) hold.
|
||||
if !store.WaitIndexesReady(5 * time.Second) {
|
||||
t.Fatalf("background indexes never became ready")
|
||||
}
|
||||
srv.store = store
|
||||
router := mux.NewRouter()
|
||||
srv.RegisterRoutes(router)
|
||||
@@ -44,6 +50,10 @@ func setupTestServerWithAPIKey(t *testing.T, apiKey string) (*Server, *mux.Route
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
// #1008: see setupTestServer comment.
|
||||
if !store.WaitIndexesReady(5 * time.Second) {
|
||||
t.Fatalf("background indexes never became ready")
|
||||
}
|
||||
srv.store = store
|
||||
router := mux.NewRouter()
|
||||
srv.RegisterRoutes(router)
|
||||
@@ -1079,6 +1089,7 @@ func TestChannelMessagesWithRegion(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
srv.store = store
|
||||
router := mux.NewRouter()
|
||||
srv.RegisterRoutes(router)
|
||||
@@ -1332,6 +1343,7 @@ func TestResolveHopsAmbiguous(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
srv.store = store
|
||||
router := mux.NewRouter()
|
||||
srv.RegisterRoutes(router)
|
||||
@@ -1578,6 +1590,7 @@ func TestNodeAnalyticsNoNameNode(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
srv.store = store
|
||||
router := mux.NewRouter()
|
||||
srv.RegisterRoutes(router)
|
||||
@@ -1614,6 +1627,7 @@ func TestNodeHealthForNoNameNode(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
srv.store = store
|
||||
router := mux.NewRouter()
|
||||
srv.RegisterRoutes(router)
|
||||
@@ -2250,6 +2264,7 @@ store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
pk := "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
|
||||
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'TestNode', 'repeater')", pk)
|
||||
@@ -2298,6 +2313,7 @@ store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
pk := "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
|
||||
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'Repeater2B', 'repeater')", pk)
|
||||
@@ -2341,6 +2357,7 @@ func TestGetNodeHashSizeInfoLatestWins(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
pk := "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
|
||||
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'LatestWins', 'repeater')", pk)
|
||||
@@ -2390,6 +2407,7 @@ func TestGetNodeHashSizeInfoIgnoreDirectZeroHop(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
pk := "dddd111122223333444455556666777788889999aaaabbbbccccddddeeee3333"
|
||||
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'DirIgnore', 'repeater')", pk)
|
||||
@@ -2437,6 +2455,7 @@ func TestGetNodeHashSizeInfoOnlyDirectZeroHopIgnored(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
pk := "eeee111122223333444455556666777788889999aaaabbbbccccddddeeee4444"
|
||||
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'OnlyDirect', 'repeater')", pk)
|
||||
@@ -2471,6 +2490,7 @@ func TestGetNodeHashSizeInfoDirectNonZeroHopCounted(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
pk := "ffff111122223333444455556666777788889999aaaabbbbccccddddeeee5555"
|
||||
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'DirNonZero', 'repeater')", pk)
|
||||
@@ -2510,6 +2530,7 @@ func TestGetNodeHashSizeInfoNoAdverts(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
pk := "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd"
|
||||
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'NoAdverts', 'repeater')", pk)
|
||||
@@ -2543,6 +2564,7 @@ func TestHashAnalyticsZeroHopAdvert(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
// Capture baseline from seed data (bypass cache via computeAnalyticsHashSizes)
|
||||
baseline := store.computeAnalyticsHashSizes("", "")
|
||||
@@ -2602,6 +2624,7 @@ func TestAnalyticsHashSizeSameNameDifferentPubkey(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
pk1 := "aaaa111122223333444455556666777788889999aaaabbbbccccddddeeee1111"
|
||||
pk2 := "aaaa111122223333444455556666777788889999aaaabbbbccccddddeeee2222"
|
||||
@@ -2670,6 +2693,7 @@ func TestInconsistentNodesExcludesCompanions(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
now := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
|
||||
payloadType := 4
|
||||
@@ -2753,6 +2777,7 @@ func TestHashSizeInfoTimeWindow(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
pk := "dd44444444444444444444444444444444444444444444444444444444444444"
|
||||
db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'OldNode', 'repeater')", pk)
|
||||
@@ -2924,6 +2949,7 @@ func TestLatestSeenMaintained(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
store.mu.RLock()
|
||||
defer store.mu.RUnlock()
|
||||
@@ -2988,6 +3014,7 @@ func TestQueryGroupedPacketsSortedByLatest(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
result := store.QueryGroupedPackets(PacketQuery{Limit: 50})
|
||||
if result.Total < 2 {
|
||||
@@ -3025,6 +3052,7 @@ func TestQueryGroupedPacketsCacheReturnsConsistentResult(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
q := PacketQuery{Limit: 50}
|
||||
r1 := store.QueryGroupedPackets(q)
|
||||
@@ -3054,6 +3082,7 @@ func TestGetChannelsCacheReturnsConsistentResult(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
r1 := store.GetChannels("")
|
||||
r2 := store.GetChannels("")
|
||||
@@ -3092,6 +3121,7 @@ func TestGetChannelsNotBlockedByLargeLock(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
channels := store.GetChannels("")
|
||||
|
||||
@@ -3328,6 +3358,7 @@ func TestHashCollisionsCacheTTL(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
|
||||
if store.collisionCacheTTL != 3600*time.Second {
|
||||
t.Errorf("expected collisionCacheTTL=3600s, got %v", store.collisionCacheTTL)
|
||||
@@ -3372,6 +3403,7 @@ func TestHashCollisionsEmptyStore(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
srv.store = store
|
||||
router := mux.NewRouter()
|
||||
srv.RegisterRoutes(router)
|
||||
@@ -3424,6 +3456,7 @@ func TestHashCollisionsWithCollision(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
// Inject hash_size=1 for both nodes so they appear in the 1-byte bucket
|
||||
store.hashSizeInfoMu.Lock()
|
||||
store.hashSizeInfoCache = map[string]*hashSizeNodeInfo{
|
||||
@@ -3490,6 +3523,7 @@ func TestHashCollisionsShortPublicKey(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
srv.store = store
|
||||
router := mux.NewRouter()
|
||||
srv.RegisterRoutes(router)
|
||||
@@ -3522,6 +3556,7 @@ func TestHashCollisionsMissingCoordinates(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
store.WaitIndexesReady(5 * time.Second)
|
||||
srv.store = store
|
||||
router := mux.NewRouter()
|
||||
srv.RegisterRoutes(router)
|
||||
@@ -3774,6 +3809,11 @@ func TestNodePathsPrefixCollisionFilter(t *testing.T) {
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load failed: %v", err)
|
||||
}
|
||||
// #1008: wait for the background index build to complete before
|
||||
// hitting the handler (otherwise it returns 503 index-loading).
|
||||
if !store.WaitIndexesReady(5 * time.Second) {
|
||||
t.Fatal("indexes never became ready")
|
||||
}
|
||||
srv.store = store
|
||||
|
||||
// Query paths for TestRepeater — should NOT include the collision packet
|
||||
|
||||
+33
-2
@@ -247,6 +247,19 @@ type PacketStore struct {
|
||||
spIndex map[string]int // "hop1,hop2" → count
|
||||
spTxIndex map[string][]*StoreTx // "hop1,hop2" → transmissions containing this subpath
|
||||
spTotalPaths int // transmissions with paths >= 2 hops
|
||||
// Background-build ready gates for spIndex/spTxIndex and byPathHop
|
||||
// (#1008). Flipped from false→true exactly once by the goroutine
|
||||
// kicked off in Load() (or synchronously by the background chunk
|
||||
// loader). Handlers gate reads via SubpathIndexReady() /
|
||||
// PathHopIndexReady(); while false, they respond 503 + Retry-After.
|
||||
subpathReady atomic.Bool
|
||||
pathHopReady atomic.Bool
|
||||
// indexReadyChan is closed exactly once when BOTH subpathReady
|
||||
// and pathHopReady are true (#1008 review m6). Replaces the
|
||||
// previous 2ms poll in WaitIndexesReady. Lazily allocated by
|
||||
// indexReadyCh / maybeCloseIndexReadyCh in index_ready_1008.go.
|
||||
indexReadyChMu sync.Mutex
|
||||
indexReadyChan chan struct{}
|
||||
// Precomputed distance analytics: hop distances and path totals.
|
||||
// Built LAZILY on first /api/analytics/distance request (#1011) —
|
||||
// previously eager in Load() at startup, which was O(n²) work for
|
||||
@@ -837,10 +850,15 @@ func (s *PacketStore) Load() error {
|
||||
}
|
||||
|
||||
// Build precomputed subpath index for O(1) analytics queries
|
||||
s.buildSubpathIndex()
|
||||
// — DEFERRED to a background goroutine (#1008). Same rationale
|
||||
// as the distance index (#1011): synchronous build under s.mu
|
||||
// blocks HTTP readiness ~60s at Cascadia scale. The goroutine is
|
||||
// started AFTER s.loaded = true below.
|
||||
// s.buildSubpathIndex()
|
||||
|
||||
// Build path-hop index for O(1) node path lookups
|
||||
s.buildPathHopIndex()
|
||||
// — DEFERRED to a background goroutine (#1008).
|
||||
// s.buildPathHopIndex()
|
||||
|
||||
// Precompute distance analytics (hop distances, path totals)
|
||||
// — DEFERRED to first /api/analytics/distance request (#1011).
|
||||
@@ -869,6 +887,12 @@ func (s *PacketStore) Load() error {
|
||||
len(s.packets), s.totalObs, elapsed, s.trackedMemoryMB(), s.estimatedMemoryMB())
|
||||
}
|
||||
s.loadMultibyteCapFromDB()
|
||||
// Kick off background subpath + path-hop index builds (#1008).
|
||||
// The goroutine acquires s.mu.Lock() and so will block until Load's
|
||||
// deferred Unlock fires when this function returns. HTTP handlers
|
||||
// gate reads behind SubpathIndexReady() / PathHopIndexReady() and
|
||||
// respond 503 + Retry-After: 5 until the builds finish.
|
||||
s.startBackgroundIndexBuilds()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1282,6 +1306,13 @@ func (s *PacketStore) loadBackgroundChunks() {
|
||||
s.distLazyOnce = sync.Once{}
|
||||
s.distLazyMu.Unlock()
|
||||
s.mu.Unlock()
|
||||
// #1008 review m3: flip the ready flags after the synchronous
|
||||
// rebuild for symmetry with startBackgroundIndexBuilds. Safe
|
||||
// today because the chunk loader runs after Load() has already
|
||||
// kicked the goroutines that set these to true; this is a
|
||||
// belt-and-suspenders against a future reorder where the chunk
|
||||
// loader could be the first writer.
|
||||
s.markIndexesReadySync()
|
||||
|
||||
s.backgroundLoadDone.Store(true)
|
||||
if chunkErrors > 0 {
|
||||
|
||||
Reference in New Issue
Block a user