mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-19 16:26:02 +00:00
4cd8445233
RED: 97f49a0c · CI:
https://github.com/Kpa-clawbot/CoreScope/actions/runs/26046530920
Fixes #1265.
## Problem
On staging, two clock-skew endpoints compute their response on every request:
- `/api/observers/clock-skew` — 3.3s
- `/api/nodes/clock-skew` — 8.9s
Both drive a full `clockSkew.Recompute` over 100k+ adverts while holding
`s.mu.RLock`, blocking under concurrent reader load.
## Fix
Wire both endpoints into the established `analytics_recomputer.go`
pattern (PRs #1248 / #1259 / #1263). Two new slots:
- `recompObserversClockSkew` — wraps `computeObserverCalibrations()`
- `recompNodesClockSkew` — wraps `computeFleetClockSkew()`
Accessors `GetObserverCalibrations` / `GetFleetClockSkew` now prefer the
atomic-pointer snapshot; on-request compute is fallback-only for the
brief window before initial sync compute lands (and for tests that skip
the recomputer).
Default interval **300s**, overridable via:
```json
"analytics": {
"recomputeIntervalSeconds": {
"observersClockSkew": 300,
"nodesClockSkew": 300
}
}
```
`config.example.json` + the `_comment_analytics` doc updated.
## TDD
- RED `97f49a0c` — `TestClockSkewRecomputersRegistered` +
`TestClockSkewHandlersSteadyStateLatency` (8 concurrent readers × 25
reqs per endpoint, p99 < 100ms gate). Fails on master: recomputer slots
nil.
- GREEN `19599375` — wire + accessor switch. p99 well under 5ms on the
test fixture.
## Verification
```
cd cmd/server && go test ./... -count=1 # ok 42s
bash ~/.openclaw/skills/pr-preflight/scripts/run-all.sh origin/master # all gates pass
```
---------
Co-authored-by: CoreScope Bot <bot@corescope.local>
255 lines
8.1 KiB
Go
255 lines
8.1 KiB
Go
// Package main: analytics recomputer (issue #1240).
|
||
//
|
||
// Steady-state background recompute loop for expensive analytics
|
||
// endpoints. Reads always hit an atomic-pointer cache; compute runs
|
||
// on a fixed ticker in a goroutine. This eliminates the on-request
|
||
// compute-then-cache pattern where the first reader after expiry pays
|
||
// the full compute cost and blocks under writer contention.
|
||
//
|
||
// See issue #1240 and AGENTS.md "Performance is a feature".
|
||
package main
|
||
|
||
import (
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
)
|
||
|
||
// analyticsRecomputer holds the latest snapshot of an analytics result
// in an atomic.Value, refreshed periodically by a background goroutine.
//
// Lifecycle:
//  1. Construct via newAnalyticsRecomputer(...).
//  2. Call Start() — runs the initial compute synchronously, then
//     launches the recompute goroutine. Because the initial compute is
//     synchronous, the first Load() after Start returns sees a populated
//     cache unless that first compute returned nil or panicked.
//  3. Call Load() any number of times concurrently — it never blocks
//     beyond an atomic-pointer load.
//  4. Call Stop() to terminate the background goroutine cleanly.
//
// The compute func is called WITHOUT any lock held by this struct, so it
// may freely take any application-level locks it needs.
type analyticsRecomputer struct {
	name     string
	interval time.Duration
	compute  func() interface{}

	cache atomic.Value  // holds interface{} — the latest snapshot
	stop  chan struct{} // closed by Stop to ask loop to exit
	done  chan struct{} // closed by loop once it has returned

	startOnce sync.Once
	stopOnce  sync.Once
	started   atomic.Bool // set at the top of Start; tells Stop that done will eventually be closed

	// Stats (atomic).
	computeRuns   atomic.Int64
	lastComputeNs atomic.Int64 // duration of last compute in nanoseconds
}

// newAnalyticsRecomputer constructs an unstarted recomputer.
//
// A non-positive interval is replaced by a 5 minute default. A nil
// compute is tolerated: runOnce becomes a no-op and Load keeps
// returning nil.
func newAnalyticsRecomputer(name string, interval time.Duration, compute func() interface{}) *analyticsRecomputer {
	if interval <= 0 {
		interval = 5 * time.Minute
	}
	return &analyticsRecomputer{
		name:     name,
		interval: interval,
		compute:  compute,
		stop:     make(chan struct{}),
		done:     make(chan struct{}),
	}
}

// Start runs the initial compute synchronously, then launches a
// background goroutine that recomputes on every tick of r.interval.
//
// Calling Start multiple times is a no-op after the first call. Calling
// Start after Stop still performs the initial compute, but the
// goroutine exits as soon as it observes the closed stop channel.
func (r *analyticsRecomputer) Start() {
	r.startOnce.Do(func() {
		// Publish "started" before the (possibly slow) initial compute so
		// a concurrent Stop waits for the goroutine rather than returning
		// as if nothing had been started.
		r.started.Store(true)
		r.runOnce()
		go r.loop()
	})
}

// loop recomputes on every ticker fire until stop is closed, then
// closes done so Stop can observe the exit.
func (r *analyticsRecomputer) loop() {
	defer close(r.done)
	t := time.NewTicker(r.interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			r.runOnce()
		case <-r.stop:
			return
		}
	}
}

// runOnce invokes compute once, records its duration and bumps the run
// counter, and publishes the result if it is non-nil. A nil result
// leaves the previous snapshot in place.
func (r *analyticsRecomputer) runOnce() {
	if r.compute == nil {
		return
	}
	defer func() {
		// Deliberate best-effort: a compute panic must not kill the
		// background goroutine. The previous snapshot remains valid and
		// the stats below are simply not updated for this run.
		_ = recover()
	}()
	t0 := time.Now()
	result := r.compute()
	r.lastComputeNs.Store(int64(time.Since(t0)))
	r.computeRuns.Add(1)
	if result != nil {
		r.cache.Store(result)
	}
}

// Load returns the most recently published snapshot, or nil if nothing
// has been published yet (Start not called, or every compute so far
// returned nil or panicked). Never blocks beyond a single atomic load.
func (r *analyticsRecomputer) Load() interface{} {
	// atomic.Value.Load already yields a nil interface when empty.
	return r.cache.Load()
}

// Stop signals the background goroutine to exit and waits for it, for
// at most 5 seconds. Safe to call multiple times. Calling it before
// Start returns immediately: no goroutine exists, so there is nothing
// to wait for.
func (r *analyticsRecomputer) Stop() {
	r.stopOnce.Do(func() {
		close(r.stop)
	})
	if !r.started.Load() {
		// done is only ever closed by loop; without this guard a
		// never-started recomputer would sit out the whole timeout.
		return
	}
	select {
	case <-r.done:
	case <-time.After(5 * time.Second):
		// Defensive timeout, e.g. a compute that is still running.
	}
}

// LastComputeDuration returns the duration of the most recent compute
// that returned normally (zero if none has).
func (r *analyticsRecomputer) LastComputeDuration() time.Duration {
	return time.Duration(r.lastComputeNs.Load())
}

// ComputeRuns returns the number of compute invocations that returned
// normally; runs that panicked are not counted.
func (r *analyticsRecomputer) ComputeRuns() int64 {
	return r.computeRuns.Load()
}
|
||
|
||
// AnalyticsRecomputeIntervals carries optional per-endpoint recompute
// intervals (callers such as main.go fill it from config.json). A field
// left at zero means "use the defaultInterval passed to
// StartAnalyticsRecomputers" for that endpoint.
type AnalyticsRecomputeIntervals struct {
	// Core analytics endpoints.
	Topology, RF, Distance, Channels time.Duration

	// Hash and role analytics.
	HashCollisions, HashSizes, Roles time.Duration

	// Clock-skew endpoints.
	ObserversClockSkew, NodesClockSkew time.Duration
}
|
||
|
||
// pickInterval returns override when it is strictly positive and def
// otherwise, so a zero or negative override falls back to the default.
func pickInterval(override, def time.Duration) time.Duration {
	if override <= 0 {
		return def
	}
	return override
}
|
||
|
||
// StartAnalyticsRecomputers wires each analytics endpoint to a
|
||
// background recompute goroutine. Each runs an initial compute
|
||
// synchronously (so the first read after startup is a cache hit, never
|
||
// cold) and then refreshes on a ticker.
|
||
//
|
||
// All recomputers serve the DEFAULT query shape only: region="" and
|
||
// zero-window (no ?since= / ?until= params). Region-keyed or windowed
|
||
// queries continue to use the legacy on-request compute + TTL cache —
|
||
// the recomputer count would explode if we maintained one per
|
||
// (endpoint × region × window) combination, and region filtering is
|
||
// fast read-time work anyway.
|
||
//
|
||
// Returns a stop closure that signals all goroutines and blocks until
|
||
// they exit. Safe to call once per PacketStore. Idempotent if called
|
||
// multiple times (subsequent calls return the first stop closure).
|
||
func (s *PacketStore) StartAnalyticsRecomputers(defaultInterval time.Duration, overrides ...AnalyticsRecomputeIntervals) func() {
|
||
if defaultInterval <= 0 {
|
||
defaultInterval = 5 * time.Minute
|
||
}
|
||
var ov AnalyticsRecomputeIntervals
|
||
if len(overrides) > 0 {
|
||
ov = overrides[0]
|
||
}
|
||
|
||
s.analyticsRecomputerMu.Lock()
|
||
if s.recompTopology != nil {
|
||
// Already started; return a no-op so the caller's defer is harmless.
|
||
s.analyticsRecomputerMu.Unlock()
|
||
return func() {}
|
||
}
|
||
|
||
// Each recomputer wraps the underlying compute* function with the
|
||
// default arguments. We use computeAnalytics* (not GetAnalytics*) to
|
||
// bypass the legacy TTL cache layer — the recomputer IS the cache.
|
||
s.recompTopology = newAnalyticsRecomputer(
|
||
"topology", pickInterval(ov.Topology, defaultInterval),
|
||
func() interface{} { return s.computeAnalyticsTopology("", TimeWindow{}) },
|
||
)
|
||
s.recompRF = newAnalyticsRecomputer(
|
||
"rf", pickInterval(ov.RF, defaultInterval),
|
||
func() interface{} { return s.computeAnalyticsRF("", TimeWindow{}) },
|
||
)
|
||
s.recompDistance = newAnalyticsRecomputer(
|
||
"distance", pickInterval(ov.Distance, defaultInterval),
|
||
func() interface{} { return s.computeAnalyticsDistance("") },
|
||
)
|
||
s.recompChannels = newAnalyticsRecomputer(
|
||
"channels", pickInterval(ov.Channels, defaultInterval),
|
||
func() interface{} { return s.computeAnalyticsChannels("", TimeWindow{}) },
|
||
)
|
||
s.recompHashCollisions = newAnalyticsRecomputer(
|
||
"hash-collisions", pickInterval(ov.HashCollisions, defaultInterval),
|
||
func() interface{} { return s.computeHashCollisions("") },
|
||
)
|
||
s.recompHashSizes = newAnalyticsRecomputer(
|
||
"hash-sizes", pickInterval(ov.HashSizes, defaultInterval),
|
||
func() interface{} { return s.computeAnalyticsHashSizesWithCapability("") },
|
||
)
|
||
s.recompRoles = newAnalyticsRecomputer(
|
||
"roles", pickInterval(ov.Roles, defaultInterval),
|
||
func() interface{} { return s.computeAnalyticsRoles() },
|
||
)
|
||
s.recompObserversClockSkew = newAnalyticsRecomputer(
|
||
"observers-clock-skew", pickInterval(ov.ObserversClockSkew, defaultInterval),
|
||
func() interface{} { return s.computeObserverCalibrations() },
|
||
)
|
||
s.recompNodesClockSkew = newAnalyticsRecomputer(
|
||
"nodes-clock-skew", pickInterval(ov.NodesClockSkew, defaultInterval),
|
||
func() interface{} { return s.computeFleetClockSkew() },
|
||
)
|
||
all := []*analyticsRecomputer{
|
||
s.recompTopology, s.recompRF, s.recompDistance,
|
||
s.recompChannels, s.recompHashCollisions, s.recompHashSizes,
|
||
s.recompRoles,
|
||
s.recompObserversClockSkew, s.recompNodesClockSkew,
|
||
}
|
||
s.analyticsRecomputerMu.Unlock()
|
||
|
||
for _, rc := range all {
|
||
rc.Start()
|
||
}
|
||
|
||
return func() {
|
||
for _, rc := range all {
|
||
rc.Stop()
|
||
}
|
||
}
|
||
}
|