Files
meshcore-analyzer/cmd/server/analytics_recomputer_test.go
T
Kpa-clawbot 356f001027 perf(#1240): steady-state background recompute for analytics endpoints (#1248)
RED commit: `27630f6a` — adds latency test that fails on master
(p99=225ms > 50ms budget) and a stub `StartAnalyticsRecomputers` that
returns a no-op so the assertion (not a build error) gates the change.

GREEN commit: `20fbbceb` — wires real background recompute
infrastructure. Test passes at p99=~1µs.

## What changed

Replaces the on-request "compute-then-cache" pattern for the
default-shape analytics queries with a steady-state background recompute
loop. Reads always hit an `atomic.Value` snapshot in <1µs regardless of
compute cost or writer contention. Operator principle: serving slightly
stale data quickly beats real-time data slowly.

## Endpoints converted (default 5min interval each)

| Endpoint | Cold compute | Recomputer interval |
|---|---|---|
| `/api/analytics/topology` | ~5s | 5 min |
| `/api/analytics/rf` | ~4s | 5 min |
| `/api/analytics/distance` | ~3s | 5 min |
| `/api/analytics/channels` | ~0.5s | 5 min |
| `/api/analytics/hash-collisions` | ~0.5s | 5 min |
| `/api/analytics/hash-sizes` | ~22ms | 5 min |

All intervals configurable per-endpoint via
`analytics.recomputeIntervalSeconds.<name>` in `config.json`; documented
in `config.example.json`. Default override via
`analytics.defaultIntervalSeconds`.

## Scope: default query only

Only the canonical shape `(region="", window=zero)` is precomputed.
Region- or window-filtered requests fall back to the legacy TTL cache +
on-request compute — keeps recomputer count bounded (6, not 6×N×M).

## Latency

Test `TestAnalyticsRecomputerSteadyStateLatency`: 100 concurrent readers
+ 4 writers churning `s.mu.Lock` on 20k distHops.
- Before: p50=188ms p99=225ms (assertion failed)
- After:  p50=240ns p99=1.1µs (atomic load + map return)

## Shutdown integration

`StartAnalyticsRecomputers` returns a stop closure invoked from
`main.go`'s SIGTERM handler BEFORE `dbClose()` so any in-flight SQLite
compute drains cleanly. `TestAnalyticsRecomputerShutdownNoLeak` confirms
all 6 goroutines are reaped (Δ=6 within 2s).

## Safety details

- Initial compute is synchronous in `Start()` — first read after startup
never sees nil.
- `recover()` inside `runOnce` keeps a compute panic from killing the
goroutine; previous snapshot remains valid.
- `analyticsRecomputerMu` is a sync.RWMutex; recomputer pointers are
read-locked in the hot path. The atomic.Value swap inside `runOnce` is
lock-free.

Fixes #1240.

---------

Co-authored-by: OpenClaw Bot <bot@openclaw.local>
2026-05-17 17:33:30 +00:00

175 lines
5.4 KiB
Go

package main
import (
"runtime"
"sort"
"sync"
"sync/atomic"
"testing"
"time"
)
func numGoroutinesForTest() int { return runtime.NumGoroutine() }
// TestAnalyticsRecomputerSteadyStateLatency asserts that issue #1240's
// steady-state background recompute is in place: reads of the common
// analytics endpoints (region="") return from cache in <50ms p99 even
// under simulated ingest load.
//
// On master (pre-fix), GetAnalyticsTopology holds s.mu.RLock for the
// entire compute. Concurrent ingest writers (s.mu.Lock) starve readers
// or vice versa, producing per-read latencies in the hundreds of
// milliseconds. The cache TTL doesn't help: after every expiry one
// reader still pays the full compute cost.
//
// Post-fix, GetAnalyticsTopology with region="" and zero window must
// Load() from the background-refreshed atomic snapshot — never blocking
// under writer contention.
func TestAnalyticsRecomputerSteadyStateLatency(t *testing.T) {
if testing.Short() {
t.Skip("skipping latency timing test in -short mode")
}
db := setupTestDB(t)
defer db.Close()
store := NewPacketStore(db, nil)
// Populate with enough records to make on-request compute non-trivial.
const N = 20000
hops := make([]distHopRecord, N)
for i := 0; i < N; i++ {
hops[i] = distHopRecord{
FromName: "A", FromPk: "aa",
ToName: "B", ToPk: "bb",
Dist: float64(i%500) + 0.5,
Type: []string{"R↔R", "C↔R", "C↔C"}[i%3],
Hash: "h",
Timestamp: "2024-01-01T00:00:00Z",
HourBucket: "2024-01-01-00",
}
}
store.mu.Lock()
store.distHops = hops
store.mu.Unlock()
// Start the recomputer infrastructure. On master this method
// doesn't exist, so this test won't compile until the GREEN commit
// lands; the RED commit lands the test + a stub. Stub returns
// without wiring background recompute, so the test still fails on
// the latency assertion below.
stop := store.StartAnalyticsRecomputers(10 * time.Millisecond)
defer stop()
// Give the initial compute a moment to populate.
time.Sleep(50 * time.Millisecond)
// Simulated writer: contend for s.mu.Lock. This is what makes the
// non-recomputer path miss the latency target — the old
// GetAnalyticsTopology grabs s.mu.RLock for the entire compute and
// blocks behind every writer cycle.
var stopWriters atomic.Bool
var writerWg sync.WaitGroup
const Writers = 4
writerWg.Add(Writers)
for w := 0; w < Writers; w++ {
go func() {
defer writerWg.Done()
for !stopWriters.Load() {
store.mu.Lock()
// Trivial mutation: extend distHops by one and shrink back.
store.distHops = append(store.distHops, distHopRecord{
Dist: 1, Hash: "x", Timestamp: "2024-01-01T00:00:00Z",
})
store.distHops = store.distHops[:len(store.distHops)-1]
store.mu.Unlock()
// Brief pause to keep the lock-cycle rate realistic.
time.Sleep(100 * time.Microsecond)
}
}()
}
// 100 concurrent reads.
const Readers = 100
latencies := make([]time.Duration, Readers)
var rwg sync.WaitGroup
rwg.Add(Readers)
for i := 0; i < Readers; i++ {
i := i
go func() {
defer rwg.Done()
t0 := time.Now()
r := store.GetAnalyticsDistance("")
latencies[i] = time.Since(t0)
if r == nil {
t.Errorf("reader %d got nil result", i)
}
}()
}
rwg.Wait()
stopWriters.Store(true)
writerWg.Wait()
sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] })
p50 := latencies[Readers/2]
p99 := latencies[(Readers*99)/100]
t.Logf("analytics distance read latency: p50=%v p99=%v max=%v",
p50, p99, latencies[Readers-1])
// p99 budget: 50ms. Atomic-pointer load + JSON-shape map return
// should be sub-millisecond; 50ms leaves margin for goroutine
// scheduling jitter under concurrent test runs.
const budget = 50 * time.Millisecond
if p99 > budget {
t.Fatalf("p99 read latency %v exceeds %v budget (issue #1240 not in effect)", p99, budget)
}
}
// TestAnalyticsRecomputerShutdownNoLeak asserts the background
// goroutines started by StartAnalyticsRecomputers exit cleanly when
// the returned stop function is called — no leak across server
// shutdown (issue #1240 acceptance criterion).
func TestAnalyticsRecomputerShutdownNoLeak(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
store := NewPacketStore(db, nil)
// Use a tight tick so we know recompute is actually running (not
// just blocked on the ticker).
stop := store.StartAnalyticsRecomputers(20 * time.Millisecond)
// Snapshot active goroutines a beat after start.
time.Sleep(80 * time.Millisecond)
startGoroutines := runtimeNumGoroutine()
stop()
// After stop returns, give the scheduler a beat to reap exits.
deadline := time.Now().Add(2 * time.Second)
var endGoroutines int
for time.Now().Before(deadline) {
endGoroutines = runtimeNumGoroutine()
if endGoroutines <= startGoroutines-5 { // we started 6 recomputers
break
}
time.Sleep(20 * time.Millisecond)
}
// We expect ~6 fewer goroutines than the snapshot taken DURING
// recompute (one per registered recomputer). Allow some slack
// since test runners can have flaky goroutine counts.
if endGoroutines >= startGoroutines {
t.Fatalf("goroutine leak after stop: %d → %d (expected fewer)",
startGoroutines, endGoroutines)
}
t.Logf("goroutines: during=%d after=%d (Δ=%d)",
startGoroutines, endGoroutines, startGoroutines-endGoroutines)
}
// runtimeNumGoroutine is wrapped to keep the imports section of the
// production file minimal.
func runtimeNumGoroutine() int {
// imported below
return numGoroutinesForTest()
}