meshcore-analyzer/cmd/server/observers_cache_test.go
Kpa-clawbot 13bdee57d4 perf: P0 hot-path fixes (observers, neighbor-graph, observer-analytics) (#1481) (#1483)
## What

Three of the four P0s from #1481's scale-test findings. Each cuts a distinct
hot path; together they target `/api/observers`,
`/api/analytics/neighbor-graph`, and `/api/observers/{id}/analytics`, the top
three live offenders.

### P0-1: 5-min atomic-pointer cache for default neighbor-graph response
- Live p95 10.8s on the most-trafficked organic endpoint.
- Background recomputer (5-min cadence per operator directive) builds the
  default-filter (`minCount=5 minScore=0.1`, no region, no role)
  `NeighborGraphResponse` and stores it via `atomic.Pointer`.
- `handleNeighborGraph` short-circuits on the default shape; non-default
  filters take the extracted `computeNeighborGraphResponse` path (identical
  semantics to the previous inline build).
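
A minimal sketch of the pattern described above, not the PR's actual code. The names `graphCache`, `graphResponse`, `isDefaultShape` and `serve` are hypothetical stand-ins; only the default filter values and the use of `atomic.Pointer` come from this description.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// graphResponse is a hypothetical stand-in for NeighborGraphResponse.
type graphResponse struct {
	Edges   int
	BuiltAt time.Time
}

// graphCache holds the latest default-filter response behind an atomic pointer.
type graphCache struct {
	ptr atomic.Pointer[graphResponse]
}

// recompute builds a fresh response and publishes it with one atomic store.
func (c *graphCache) recompute(build func() graphResponse) {
	r := build()
	c.ptr.Store(&r)
}

// start recomputes once synchronously, then on every tick until stop is closed.
func (c *graphCache) start(every time.Duration, build func() graphResponse, stop <-chan struct{}) {
	c.recompute(build)
	go func() {
		t := time.NewTicker(every)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				c.recompute(build)
			case <-stop:
				return
			}
		}
	}()
}

// isDefaultShape reports whether the request filters match the cached shape.
func isDefaultShape(minCount int, minScore float64, region, role string) bool {
	return minCount == 5 && minScore == 0.1 && region == "" && role == ""
}

// serve returns the cached response for the default shape (hit == true),
// and falls back to an inline build for any other filter combination.
func (c *graphCache) serve(minCount int, minScore float64, region, role string, build func() graphResponse) (graphResponse, bool) {
	if isDefaultShape(minCount, minScore, region, role) {
		if r := c.ptr.Load(); r != nil {
			return *r, true
		}
	}
	return build(), false
}

func main() {
	var c graphCache
	build := func() graphResponse { return graphResponse{Edges: 42, BuiltAt: time.Now()} }
	stop := make(chan struct{})
	defer close(stop)
	c.start(5*time.Minute, build, stop)

	r, hit := c.serve(5, 0.1, "", "", build)
	fmt.Println(r.Edges, hit)
}
```

Readers never take a lock: they either load the published pointer or fall through to the inline build, so a slow recompute cannot block a request.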

### P0-2: cache parsed `StoreObs.Timestamp` + drop RLock window
- `handleObserverAnalytics` re-parsed the RFC3339 timestamp three times
  per observation, for 60k+ observations per active observer, under
  `s.store.mu.RLock` — blocking writers for the full scan.
- `StoreObs.ParsedTime()` parses once via `sync.Once` (mirrors
  `StoreTx.ParsedDecoded`).
- Handler snapshots the `byObserver[id]` pointer slice, releases the
  RLock immediately, then iterates locally.
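
The two changes can be sketched together as below. This is an illustration under assumptions: `obs` and `store` are hypothetical stand-ins for `StoreObs` and the packet store, and the real field names and error handling may differ.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// obs is a hypothetical stand-in for StoreObs.
type obs struct {
	Timestamp string

	once   sync.Once
	parsed time.Time
	ok     bool
}

// ParsedTime parses Timestamp at most once; later calls reuse the result.
func (o *obs) ParsedTime() (time.Time, bool) {
	o.once.Do(func() {
		if t, err := time.Parse(time.RFC3339, o.Timestamp); err == nil {
			o.parsed, o.ok = t, true
		}
	})
	return o.parsed, o.ok
}

// store is a hypothetical stand-in for the in-memory packet store.
type store struct {
	mu         sync.RWMutex
	byObserver map[string][]*obs
}

// snapshot copies the pointer slice under RLock; the lock is released
// when snapshot returns, before the caller starts iterating.
func (s *store) snapshot(id string) []*obs {
	s.mu.RLock()
	defer s.mu.RUnlock()
	src := s.byObserver[id]
	out := make([]*obs, len(src))
	copy(out, src)
	return out
}

// countValid scans the snapshot without holding the store lock.
func (s *store) countValid(id string) int {
	n := 0
	for _, o := range s.snapshot(id) {
		if _, ok := o.ParsedTime(); ok {
			n++
		}
	}
	return n
}

func main() {
	s := &store{byObserver: map[string][]*obs{
		"a": {{Timestamp: "2026-05-29T02:42:21Z"}, {Timestamp: "not-a-time"}},
	}}
	fmt.Println(s.countValid("a"))
}
```

Copying only the pointer slice keeps the critical section proportional to the slice length rather than to the parsing work, which is what lets writers proceed during the scan.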

### P0-3: 30s cache for `/api/observers` + sargable `IN` + covering index
- Three SQL queries on every request → ~1.7s p50 at 50-concurrent.
- Atomic-pointer 30s cache for the default (no-filter) query.
- `GetNodeLocationsByKeys` drops `LOWER(public_key) IN (...)` (non-sargable);
  callers pre-lowercase in Go and the plain `IN` matches the existing
  `public_key` index.
- New ingestor migration `obs_observer_ts_idx_v1` adds composite index
  `idx_observations_observer_idx_timestamp(observer_idx, timestamp)` so
  `GetObserverPacketCounts` can resolve its GROUP-BY + range filter from
  the index without scanning the 1.9M-row observations table.
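
The sargable-`IN` change can be illustrated roughly as follows. `buildKeyQuery`, the `nodes` table and its `lat`/`lon` columns are assumptions made for the example, and the sketch presumes stored `public_key` values are already lowercase.

```go
package main

import (
	"fmt"
	"strings"
)

// buildKeyQuery lower-cases the keys in Go and returns a plain IN query plus
// its arguments, so no function wraps the indexed column in the WHERE clause.
// Table and column names are illustrative assumptions.
func buildKeyQuery(keys []string) (string, []any) {
	if len(keys) == 0 {
		return "", nil
	}
	args := make([]any, len(keys))
	for i, k := range keys {
		args[i] = strings.ToLower(k)
	}
	// One "?" placeholder per key: "?,?,?".
	placeholders := strings.TrimSuffix(strings.Repeat("?,", len(keys)), ",")
	q := "SELECT public_key, lat, lon FROM nodes WHERE public_key IN (" + placeholders + ")"
	return q, args
}

func main() {
	q, args := buildKeyQuery([]string{"ABCD", "ef01"})
	fmt.Println(q)
	fmt.Println(args...)
}
```

Because the normalization happens on the argument side, the column is compared as stored and an ordinary index on `public_key` remains usable.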

### P0-4: deferred
`perfMiddleware`'s global mutex was claimed to serialize every API request.
A direct test (`50 concurrent requests through the middleware, handler
sleeps 20ms each`) shows total elapsed ≈ 25ms, not 1s: the lock is held
only for the post-handler bookkeeping (a few µs). Real impact is below
measurement noise. Skipping to avoid invasive churn on PerfStats consumers
without a demonstrable win.
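
A rough reconstruction of that measurement, assuming a middleware that takes its mutex only after the handler returns. The `stats` type and `measureMillis` helper are hypothetical; the real `perfMiddleware` and `PerfStats` are not reproduced here.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
	"time"
)

// stats is a hypothetical stand-in for PerfStats.
type stats struct {
	mu    sync.Mutex
	count int
	total time.Duration
}

// middleware times the handler and takes the mutex only for the
// bookkeeping that follows it.
func (s *stats) middleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()
		next.ServeHTTP(w, r) // runs without the lock held
		d := time.Since(start)

		s.mu.Lock()
		s.count++
		s.total += d
		s.mu.Unlock()
	})
}

// measureMillis sends n concurrent requests through the middleware, each
// handler sleeping sleepMs, and returns wall-clock milliseconds plus the
// number of requests recorded.
func measureMillis(n, sleepMs int) (int64, int) {
	s := &stats{}
	h := s.middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(time.Duration(sleepMs) * time.Millisecond)
	}))

	var wg sync.WaitGroup
	begin := time.Now()
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/x", nil))
		}()
	}
	wg.Wait()
	return time.Since(begin).Milliseconds(), s.count
}

func main() {
	ms, recorded := measureMillis(50, 20)
	fmt.Printf("elapsed=%dms recorded=%d\n", ms, recorded)
}
```

If the mutex were held across the handler call, 50 requests at 20ms each would take at least 1s; an elapsed time close to a single handler's sleep indicates the handlers overlap.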

## Test plan

Red → green per P0:
- `observers_cache_test.go` — handler reads `s.observersCache` before SQL,
  TTL boundary, atomic.Pointer (no mutex contention).
- `storeobs_parsedtime_test.go` — parses three timestamp shapes, caches
  result, no race under concurrent readers.
- `neighbor_graph_cache_test.go` — handler serves from atomic pointer
  when set, bypasses cache when `?region=` (or any non-default filter)
  is passed.
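
The TTL boundary those tests exercise can be sketched as below. `expired` and `expiredAtAge` are hypothetical helpers; the 30s value matches the cache described above, while treating an entry aged exactly 30s as still fresh is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// cacheTTL mirrors the 30s TTL described for the /api/observers cache.
const cacheTTL = 30 * time.Second

// expired reports whether an entry stored at `at` is too old to serve at `now`.
// A zero timestamp (nothing stored yet) always counts as expired.
// Assumption: an entry aged exactly cacheTTL is still considered fresh.
func expired(at, now time.Time) bool {
	return at.IsZero() || now.Sub(at) > cacheTTL
}

// expiredAtAge checks an entry that is ageSeconds old relative to now.
func expiredAtAge(ageSeconds int) bool {
	now := time.Now()
	return expired(now.Add(-time.Duration(ageSeconds)*time.Second), now)
}

func main() {
	fmt.Println(expired(time.Time{}, time.Now()), expiredAtAge(0), expiredAtAge(31))
}
```

Passing `now` explicitly keeps the boundary check deterministic, which avoids sleeping in tests.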

Full server + ingestor suites pass: `go test -count=1 ./...`.

## Perf proof

Before/after p50/p95/p99 (50 requests × 50 concurrent) against prod (before)
and staging once CI deploys (after) will be posted as a PR comment per the
operator's "no merge without proof of improvement" gate.

Closes #1481


## TDD exemption — P0-1 and P0-2 (net-new surfaces, AGENTS.md)

Per CoreScope `AGENTS.md` § "Exemptions": **net-new code surfaces with no
prior tests to break** may land tests in the same PR without a strict
test-first → impl commit split.

- **P0-1 (neighbor-graph atomic-pointer cache)** — `neighborGraphCache`,
  `recomputeNeighborGraphCache`, `loadNeighborGraphCacheBytes`,
  `startNeighborGraphRecomputer` and the default-shape short-circuit in
  `handleNeighborGraph` were brand-new code with no pre-existing
  assertions covering them. There was no green test to first turn red.
- **P0-2 (cached `StoreObs.Timestamp` + RLock window drop)** —
  `StoreObs.ParsedTime()` and the snapshot+release pattern in
  `handleObserverAnalytics` were new surfaces; the prior code did the
  parse inline per call with no behavioural test to break.

P0-3 was authored properly red-then-green (commit `6e63ec6a` red, then
`83ae129b` green) and does NOT use this exemption.

## Default-filter detection vs frontend reality (#1483 follow-up)

The Neighbor Graph analytics tab in `public/analytics.js` fetches
`/analytics/neighbor-graph?min_count=1&min_score=0` because the
client-side sliders need the full edge set to filter from. That shape
did NOT match the `(5, 0.1)` cached default, so the UI tab still paid
the cold compute cost despite #1481 P0-1.

The #1483 follow-up commit caches BOTH shapes in the same recomputer pass:
- `(minCount=5, minScore=0.1, no region, no role)` — `live.js`
  affinity-scoring consumer.
- `(minCount=1, minScore=0, no region, no role)` — analytics tab.

Both are served from `atomic.Pointer` with an `X-Cache-Age-Seconds`
header. The per-shape cost in the background goroutine is roughly
linear in edge count; total recompute time stays well under the
5-minute cadence on prod-scale graphs.
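
One way to picture the two-shape cache is a fixed set of slots, one `atomic.Pointer` per shape, refreshed in a single pass. The types and helpers below (`shape`, `entry`, `shapeCache`, `probe`) are hypothetical, and the sketch assumes the caller has already confirmed that no region or role filter is set.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
	"sync/atomic"
	"time"
)

// shape identifies a cached filter combination (region and role assumed empty).
type shape struct {
	minCount int
	minScore float64
}

// entry is an immutable cached body plus the time it was built.
type entry struct {
	body []byte
	at   time.Time
}

// shapeCache maps each cached shape to its own atomic slot. The map itself is
// never modified after construction, so concurrent reads are safe.
type shapeCache struct {
	slots map[shape]*atomic.Pointer[entry]
}

func newShapeCache() *shapeCache {
	c := &shapeCache{slots: map[shape]*atomic.Pointer[entry]{}}
	for _, s := range []shape{{5, 0.1}, {1, 0}} {
		c.slots[s] = new(atomic.Pointer[entry])
	}
	return c
}

// recomputeAll rebuilds every cached shape in one pass.
func (c *shapeCache) recomputeAll(build func(shape) []byte) {
	for s, p := range c.slots {
		p.Store(&entry{body: build(s), at: time.Now()})
	}
}

// serve writes the cached body and age header; it reports false on a miss so
// the caller can fall back to the inline compute path.
func (c *shapeCache) serve(w http.ResponseWriter, s shape) bool {
	p, ok := c.slots[s]
	if !ok {
		return false
	}
	e := p.Load()
	if e == nil {
		return false
	}
	age := int(time.Since(e.at).Seconds())
	w.Header().Set("X-Cache-Age-Seconds", strconv.Itoa(age))
	w.Header().Set("Content-Type", "application/json")
	_, _ = w.Write(e.body)
	return true
}

// probe runs serve against a recorder and returns what a client would see.
func probe(c *shapeCache, s shape) (hit bool, age, body string) {
	w := httptest.NewRecorder()
	hit = c.serve(w, s)
	return hit, w.Header().Get("X-Cache-Age-Seconds"), w.Body.String()
}

func main() {
	c := newShapeCache()
	c.recomputeAll(func(s shape) []byte {
		return []byte(fmt.Sprintf(`{"min_count":%d}`, s.minCount))
	})
	fmt.Println(probe(c, shape{1, 0}))
	fmt.Println(probe(c, shape{2, 0}))
}
```

A request whose parameters match neither slot simply misses, so adding a third cached shape would only mean adding one more entry to the slot list.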

---------

Co-authored-by: openclaw-bot <bot@openclaw.dev>
Co-authored-by: mc-bot <mc-bot@users.noreply.github.com>
2026-05-29 02:42:21 -07:00


package main

import (
	"net/http/httptest"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

// TestObserversCacheServesFromAtomicPointer asserts the /api/observers default
// (no-filter) handler serves from an in-memory snapshot after the first request,
// not from SQL. Issue #1481 P0-3.
func TestObserversCacheServesFromAtomicPointer(t *testing.T) {
	s := &Server{}
	resp := ObserverListResponse{
		Observers:  []ObserverResp{{ID: "abc", Name: "test"}},
		ServerTime: time.Now().UTC().Format(time.RFC3339),
	}
	s.observersCacheV2.ptr.Store(&observersCacheEntry{resp: resp, at: time.Now()})
	req := httptest.NewRequest("GET", "/api/observers", nil)
	w := httptest.NewRecorder()
	s.handleObservers(w, req)
	if w.Code != 200 {
		t.Fatalf("status=%d body=%s", w.Code, w.Body.String())
	}
	body := w.Body.String()
	if !strings.Contains(body, `"id":"abc"`) {
		t.Fatalf("expected cached observer in body, got: %s", body)
	}
	if h := w.Header().Get("X-Cache-Age-Seconds"); h == "" {
		t.Error("expected X-Cache-Age-Seconds header on cached response")
	}
}

// TestObserversCacheTTLBoundary exercises the expiry helper AND the handler:
// an entry older than the TTL must NOT be served. The handler check seeds
// the cache with a sentinel entry of a chosen age: a fresh sentinel must
// appear in the response body, while a stale one must not (the handler
// enters the build path instead, which fails on the nil DB).
func TestObserversCacheTTLBoundary(t *testing.T) {
	if d := observersCacheTTL; d != 30*time.Second {
		t.Errorf("observersCacheTTL want 30s, got %v", d)
	}
	s := &Server{}
	if !s.observersCacheExpired(time.Time{}) {
		t.Error("zero time should be expired")
	}
	if s.observersCacheExpired(time.Now()) {
		t.Error("just-now should not be expired")
	}
	if !s.observersCacheExpired(time.Now().Add(-31 * time.Second)) {
		t.Error("31s ago should be expired")
	}

	// Handler integration: fresh entry → served from cache.
	fresh := ObserverListResponse{
		Observers:  []ObserverResp{{ID: "fresh-sentinel"}},
		ServerTime: time.Now().UTC().Format(time.RFC3339),
	}
	s.observersCacheV2.ptr.Store(&observersCacheEntry{resp: fresh, at: time.Now()})
	req := httptest.NewRequest("GET", "/api/observers", nil)
	w := httptest.NewRecorder()
	s.handleObservers(w, req)
	if !strings.Contains(w.Body.String(), "fresh-sentinel") {
		t.Fatalf("fresh cache should be served by handler, body=%s", w.Body.String())
	}

	// Stale entry → handler must NOT serve it; it enters the singleflight
	// build path and, with a nil DB, either returns an error or panics.
	// The recover() below absorbs a panic; in both cases the body must
	// not contain the stale sentinel.
	stale := ObserverListResponse{
		Observers:  []ObserverResp{{ID: "stale-sentinel"}},
		ServerTime: time.Now().UTC().Format(time.RFC3339),
	}
	s.observersCacheV2.ptr.Store(&observersCacheEntry{resp: stale, at: time.Now().Add(-31 * time.Second)})
	w2 := httptest.NewRecorder()
	func() {
		defer func() { _ = recover() }()
		s.handleObservers(w2, req)
	}()
	if strings.Contains(w2.Body.String(), "stale-sentinel") {
		t.Fatalf("stale cache MUST NOT be served by handler, body=%s", w2.Body.String())
	}
}

// TestObserversCacheSingleflightCollapsesStampede asserts the stampede
// contract the handler relies on: N concurrent callers on the same
// singleflight key run the fill function exactly once. #1483 follow-up.
func TestObserversCacheSingleflightCollapsesStampede(t *testing.T) {
	s := &Server{}
	// The real fill (buildObserversDefaultResponse) nil-derefs on s.db in
	// this setup, so the handler cannot be driven end to end here.
	// Instead, call the cache's singleflight Group directly with a
	// counting fill and check how many times it ran across N goroutines.
	const N = 50
	var wg sync.WaitGroup
	var calls atomic.Int64
	wg.Add(N)
	start := make(chan struct{})
	for i := 0; i < N; i++ {
		go func() {
			defer wg.Done()
			<-start
			_, _, _ = s.observersCacheV2.sf.Do(observersCacheFlightKey, func() (interface{}, error) {
				calls.Add(1)
				time.Sleep(20 * time.Millisecond) // hold the flight long enough to catch stragglers
				return "ok", nil
			})
		}()
	}
	close(start)
	wg.Wait()
	if got := calls.Load(); got != 1 {
		t.Fatalf("singleflight must collapse %d concurrent requests into 1 fill, got %d", N, got)
	}
}

// TestObserversCacheConcurrentReadersDuringRecompute asserts that
// readers can keep reading the OLD entry while a recompute is in
// flight (atomic.Pointer payload immutability). #1483 follow-up.
func TestObserversCacheConcurrentReadersDuringRecompute(t *testing.T) {
	s := &Server{}
	initial := ObserverListResponse{
		Observers: []ObserverResp{{ID: "v1"}},
	}
	s.observersCacheV2.ptr.Store(&observersCacheEntry{resp: initial, at: time.Now()})
	var wg sync.WaitGroup
	stop := make(chan struct{})
	var observedV1, observedV2 atomic.Int64
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-stop:
					return
				default:
					e := s.observersCacheV2.ptr.Load()
					if e == nil {
						continue
					}
					if len(e.resp.Observers) > 0 {
						switch e.resp.Observers[0].ID {
						case "v1":
							observedV1.Add(1)
						case "v2":
							observedV2.Add(1)
						}
					}
				}
			}
		}()
	}
	time.Sleep(5 * time.Millisecond)
	// Swap in v2 while readers are running.
	updated := ObserverListResponse{
		Observers: []ObserverResp{{ID: "v2"}},
	}
	s.observersCacheV2.ptr.Store(&observersCacheEntry{resp: updated, at: time.Now()})
	time.Sleep(5 * time.Millisecond)
	close(stop)
	wg.Wait()
	if observedV1.Load() == 0 {
		t.Error("expected at least one read of v1 before swap")
	}
	if observedV2.Load() == 0 {
		t.Error("expected at least one read of v2 after swap")
	}
}