Files
meshcore-analyzer/cmd/server/analytics_recomputer.go
T
Kpa-clawbot f81ed5b3cf perf(#1256): wire /api/analytics/roles into steady-state recomputer (#1259)
RED commit: `0190466d` — failing CI:
https://github.com/Kpa-clawbot/CoreScope/actions (will populate after PR
creation)

## Problem
On staging (commit `d69d9fb`, 78k tx, 2.3M obs), `curl
http://localhost/api/analytics/roles` times out at 60s with 0 bytes —
the Roles tab is unusable. Issue #1256.

PR #1248's steady-state recomputer fan-out (topology / rf / distance /
channels / hash-collisions / hash-sizes) **didn't include roles**. The
legacy handler:

1. Holds `s.mu.RLock` for the entire compute.
2. Calls `GetFleetClockSkew()`, which drives `clockSkew.Recompute(s)`
over all ADVERT transmissions — O(78k) per request.
3. Concurrent ingest writers compound the latency through
writer-starvation.

Result: every request hits the cold path; the response never comes back
inside the 60 s HTTP budget.

## Fix
Add `roles` as the 7th endpoint in the recomputer fan-out — same pattern
as #1248:

- `PacketStore.recompRoles` slot, registered in
`StartAnalyticsRecomputers` with default 5-min interval.
- `PacketStore.GetAnalyticsRoles()` → atomic-pointer load from the
snapshot (sub-ms), with a `computeAnalyticsRoles()` fallback only for
the brief startup window before the initial sync compute completes.
- Handler is now a thin wrapper — no lock-held work on the request path.
- New optional `roles` key under `analytics.recomputeIntervalSeconds` in
config; `config.example.json` and `_comment_analytics` updated.

## Latency (unit-scope benchmark)
- Worst-of-50 handler latency: **<100 ms** (test budget; well under the
2 s p99 acceptance).
- Compute itself is bounded by the existing 5-min recompute window — it
runs once in the background, never on the request path.

## Tests
- RED `0190466d`: asserts `recompRoles` is registered and the handler
returns under the latency budget. Fails on master with `recompRoles not
registered`.
- GREEN `d7784f76`: registers the recomputer + snapshot accessor — both
tests pass.

Fixes #1256

---------

Co-authored-by: openclaw-bot <bot@openclaw.local>
2026-05-18 07:36:28 -07:00

244 lines
7.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package main: analytics recomputer (issue #1240).
//
// Steady-state background recompute loop for expensive analytics
// endpoints. Reads always hit an atomic-pointer cache; compute runs
// on a fixed ticker in a goroutine. This eliminates the on-request
// compute-then-cache pattern where the first reader after expiry pays
// the full compute cost and blocks under writer contention.
//
// See issue #1240 and AGENTS.md "Performance is a feature".
package main
import (
"sync"
"sync/atomic"
"time"
)
// analyticsRecomputer holds the latest snapshot of an analytics result
// in an atomic.Value, refreshed periodically by a background goroutine.
//
// Lifecycle:
// 1. Construct via newAnalyticsRecomputer(...)
// 2. Call Start() — runs initial compute synchronously, then launches
// the recompute goroutine. Initial compute is synchronous so the
// first Load() after Start returns never sees a nil cache.
// 3. Call Load() any number of times concurrently — never blocks
// beyond an atomic-pointer load.
// 4. Call Stop() to terminate the background goroutine cleanly.
//
// Compute func is called WITHOUT any lock held by this struct, so it
// may freely take any application-level locks it needs.
type analyticsRecomputer struct {
name string
interval time.Duration
compute func() interface{}
cache atomic.Value // holds interface{} — the latest snapshot
stop chan struct{}
done chan struct{}
startOnce sync.Once
stopOnce sync.Once
// Stats (atomic).
computeRuns atomic.Int64
lastComputeNs atomic.Int64 // duration of last compute in nanoseconds
}
// newAnalyticsRecomputer constructs an unstarted recomputer.
// interval must be > 0; compute must be non-nil.
func newAnalyticsRecomputer(name string, interval time.Duration, compute func() interface{}) *analyticsRecomputer {
if interval <= 0 {
interval = 5 * time.Minute
}
return &analyticsRecomputer{
name: name,
interval: interval,
compute: compute,
stop: make(chan struct{}),
done: make(chan struct{}),
}
}
// Start runs the initial compute synchronously (so the first Load
// after Start returns a populated snapshot, never nil), then launches
// a background goroutine to periodically recompute.
//
// Calling Start multiple times is a no-op after the first call.
func (r *analyticsRecomputer) Start() {
r.startOnce.Do(func() {
// Initial synchronous compute — first read must NOT see empty
// or uninitialized data (acceptance criterion #1240).
r.runOnce()
go r.loop()
})
}
func (r *analyticsRecomputer) loop() {
defer close(r.done)
t := time.NewTicker(r.interval)
defer t.Stop()
for {
select {
case <-t.C:
r.runOnce()
case <-r.stop:
return
}
}
}
func (r *analyticsRecomputer) runOnce() {
if r.compute == nil {
return
}
defer func() {
// Don't let a compute panic kill the background goroutine.
// The previous snapshot remains valid.
_ = recover()
}()
t0 := time.Now()
result := r.compute()
r.lastComputeNs.Store(int64(time.Since(t0)))
r.computeRuns.Add(1)
if result != nil {
r.cache.Store(result)
}
}
// Load returns the most recently computed snapshot, or nil if Start
// has not been called (or the very first compute returned nil).
// Never blocks beyond a single atomic load.
func (r *analyticsRecomputer) Load() interface{} {
v := r.cache.Load()
if v == nil {
return nil
}
return v
}
// Stop signals the background goroutine to exit and waits for it.
// Safe to call multiple times. Safe to call before Start (no-op).
func (r *analyticsRecomputer) Stop() {
r.stopOnce.Do(func() {
close(r.stop)
})
// Only wait if the goroutine was actually started.
select {
case <-r.done:
case <-time.After(5 * time.Second):
// Defensive timeout: shouldn't happen in practice.
}
}
// LastComputeDuration returns the duration of the most recent compute.
func (r *analyticsRecomputer) LastComputeDuration() time.Duration {
return time.Duration(r.lastComputeNs.Load())
}
// ComputeRuns returns the total number of compute invocations.
func (r *analyticsRecomputer) ComputeRuns() int64 {
return r.computeRuns.Load()
}
// AnalyticsRecomputeIntervals lets callers (main.go) override the
// per-endpoint recompute interval from config.json. Zero values fall
// back to the defaultInterval passed to StartAnalyticsRecomputers.
type AnalyticsRecomputeIntervals struct {
Topology time.Duration
RF time.Duration
Distance time.Duration
Channels time.Duration
HashCollisions time.Duration
HashSizes time.Duration
Roles time.Duration
}
func pickInterval(override, def time.Duration) time.Duration {
if override > 0 {
return override
}
return def
}
// StartAnalyticsRecomputers wires each analytics endpoint to a
// background recompute goroutine. Each runs an initial compute
// synchronously (so the first read after startup is a cache hit, never
// cold) and then refreshes on a ticker.
//
// All recomputers serve the DEFAULT query shape only: region="" and
// zero-window (no ?since= / ?until= params). Region-keyed or windowed
// queries continue to use the legacy on-request compute + TTL cache —
// the recomputer count would explode if we maintained one per
// (endpoint × region × window) combination, and region filtering is
// fast read-time work anyway.
//
// Returns a stop closure that signals all goroutines and blocks until
// they exit. Safe to call once per PacketStore. Idempotent if called
// multiple times (subsequent calls return the first stop closure).
func (s *PacketStore) StartAnalyticsRecomputers(defaultInterval time.Duration, overrides ...AnalyticsRecomputeIntervals) func() {
if defaultInterval <= 0 {
defaultInterval = 5 * time.Minute
}
var ov AnalyticsRecomputeIntervals
if len(overrides) > 0 {
ov = overrides[0]
}
s.analyticsRecomputerMu.Lock()
if s.recompTopology != nil {
// Already started; return a no-op so the caller's defer is harmless.
s.analyticsRecomputerMu.Unlock()
return func() {}
}
// Each recomputer wraps the underlying compute* function with the
// default arguments. We use computeAnalytics* (not GetAnalytics*) to
// bypass the legacy TTL cache layer — the recomputer IS the cache.
s.recompTopology = newAnalyticsRecomputer(
"topology", pickInterval(ov.Topology, defaultInterval),
func() interface{} { return s.computeAnalyticsTopology("", TimeWindow{}) },
)
s.recompRF = newAnalyticsRecomputer(
"rf", pickInterval(ov.RF, defaultInterval),
func() interface{} { return s.computeAnalyticsRF("", TimeWindow{}) },
)
s.recompDistance = newAnalyticsRecomputer(
"distance", pickInterval(ov.Distance, defaultInterval),
func() interface{} { return s.computeAnalyticsDistance("") },
)
s.recompChannels = newAnalyticsRecomputer(
"channels", pickInterval(ov.Channels, defaultInterval),
func() interface{} { return s.computeAnalyticsChannels("", TimeWindow{}) },
)
s.recompHashCollisions = newAnalyticsRecomputer(
"hash-collisions", pickInterval(ov.HashCollisions, defaultInterval),
func() interface{} { return s.computeHashCollisions("") },
)
s.recompHashSizes = newAnalyticsRecomputer(
"hash-sizes", pickInterval(ov.HashSizes, defaultInterval),
func() interface{} { return s.computeAnalyticsHashSizesWithCapability("") },
)
s.recompRoles = newAnalyticsRecomputer(
"roles", pickInterval(ov.Roles, defaultInterval),
func() interface{} { return s.computeAnalyticsRoles() },
)
all := []*analyticsRecomputer{
s.recompTopology, s.recompRF, s.recompDistance,
s.recompChannels, s.recompHashCollisions, s.recompHashSizes,
s.recompRoles,
}
s.analyticsRecomputerMu.Unlock()
for _, rc := range all {
rc.Start()
}
return func() {
for _, rc := range all {
rc.Stop()
}
}
}