Files
Kpa-clawbot f81ed5b3cf perf(#1256): wire /api/analytics/roles into steady-state recomputer (#1259)
RED commit: `0190466d` — failing CI:
https://github.com/Kpa-clawbot/CoreScope/actions (will populate after PR
creation)

## Problem
On staging (commit `d69d9fb`, 78k tx, 2.3M obs), `curl
http://localhost/api/analytics/roles` times out at 60s with 0 bytes —
the Roles tab is unusable. Issue #1256.

PR #1248's steady-state recomputer fan-out (topology / rf / distance /
channels / hash-collisions / hash-sizes) **didn't include roles**. The
legacy handler:

1. Holds `s.mu.RLock` for the entire compute.
2. Calls `GetFleetClockSkew()`, which drives `clockSkew.Recompute(s)`
over all ADVERT transmissions — O(78k) per request.
3. Concurrent ingest writers compound the latency through
writer-starvation.

Result: every request hits the cold path; the response never comes back
inside the 60 s HTTP budget.

## Fix
Add `roles` as the 7th endpoint in the recomputer fan-out — same pattern
as #1248:

- `PacketStore.recompRoles` slot, registered in
`StartAnalyticsRecomputers` with default 5-min interval.
- `PacketStore.GetAnalyticsRoles()` → atomic-pointer load from the
snapshot (sub-ms), with a `computeAnalyticsRoles()` fallback only for
the brief startup window before the initial sync compute completes.
- Handler is now a thin wrapper — no lock-held work on the request path.
- New optional `roles` key under `analytics.recomputeIntervalSeconds` in
config; `config.example.json` and `_comment_analytics` updated.

## Latency (unit-scope benchmark)
- Worst-of-50 handler latency: **<100 ms** (test budget; well under the
2 s p99 acceptance).
- Compute itself is bounded by the existing 5-min recompute window — it
runs once in the background, never on the request path.

## Tests
- RED `0190466d`: asserts `recompRoles` is registered and the handler
returns under the latency budget. Fails on master with `recompRoles not
registered`.
- GREEN `d7784f76`: registers the recomputer + snapshot accessor — both
tests pass.

Fixes #1256

---------

Co-authored-by: openclaw-bot <bot@openclaw.local>
2026-05-18 07:36:28 -07:00

167 lines
5.1 KiB
Go

package main
import (
"math"
"net/http"
"sort"
"strings"
)
// RoleStats summarises one role's population and clock-skew posture.
type RoleStats struct {
Role string `json:"role"`
NodeCount int `json:"nodeCount"`
WithSkew int `json:"withSkew"`
MeanAbsSkewSec float64 `json:"meanAbsSkewSec"`
MedianAbsSkewSec float64 `json:"medianAbsSkewSec"`
OkCount int `json:"okCount"`
WarningCount int `json:"warningCount"`
CriticalCount int `json:"criticalCount"`
AbsurdCount int `json:"absurdCount"`
NoClockCount int `json:"noClockCount"`
}
// RoleAnalyticsResponse is the payload returned by /api/analytics/roles.
type RoleAnalyticsResponse struct {
TotalNodes int `json:"totalNodes"`
Roles []RoleStats `json:"roles"`
}
// normalizeRole canonicalises a role string so empty/unknown roles bucket
// together and case differences don't fragment the distribution.
func normalizeRole(r string) string {
r = strings.ToLower(strings.TrimSpace(r))
if r == "" {
return "unknown"
}
return r
}
// computeRoleAnalytics groups nodes by role and aggregates clock-skew per
// role. Pure function: takes the node roster and the per-pubkey skew map and
// returns the response — no store / lock dependencies, easy to unit test.
//
// `nodesByPubkey` lists every known node (pubkey → role). `skewByPubkey`
// is the subset of pubkeys that have clock-skew data with their severity and
// most-recent corrected skew (in seconds, signed — we take |x| for averages).
func computeRoleAnalytics(nodesByPubkey map[string]string, skewByPubkey map[string]*NodeClockSkew) RoleAnalyticsResponse {
type bucket struct {
stats RoleStats
absSkews []float64
}
buckets := make(map[string]*bucket)
for pk, rawRole := range nodesByPubkey {
role := normalizeRole(rawRole)
b, ok := buckets[role]
if !ok {
b = &bucket{stats: RoleStats{Role: role}}
buckets[role] = b
}
b.stats.NodeCount++
cs, has := skewByPubkey[pk]
if !has || cs == nil {
continue
}
b.stats.WithSkew++
abs := math.Abs(cs.RecentMedianSkewSec)
if abs == 0 {
abs = math.Abs(cs.LastSkewSec)
}
b.absSkews = append(b.absSkews, abs)
switch cs.Severity {
case SkewOK:
b.stats.OkCount++
case SkewWarning:
b.stats.WarningCount++
case SkewCritical:
b.stats.CriticalCount++
case SkewAbsurd:
b.stats.AbsurdCount++
case SkewNoClock:
b.stats.NoClockCount++
}
}
resp := RoleAnalyticsResponse{Roles: make([]RoleStats, 0, len(buckets))}
for _, b := range buckets {
if n := len(b.absSkews); n > 0 {
sum := 0.0
for _, v := range b.absSkews {
sum += v
}
b.stats.MeanAbsSkewSec = round(sum/float64(n), 2)
sorted := make([]float64, n)
copy(sorted, b.absSkews)
sort.Float64s(sorted)
if n%2 == 1 {
b.stats.MedianAbsSkewSec = round(sorted[n/2], 2)
} else {
b.stats.MedianAbsSkewSec = round((sorted[n/2-1]+sorted[n/2])/2, 2)
}
}
resp.TotalNodes += b.stats.NodeCount
resp.Roles = append(resp.Roles, b.stats)
}
// Sort: largest population first, then role name for stable output.
sort.Slice(resp.Roles, func(i, j int) bool {
if resp.Roles[i].NodeCount != resp.Roles[j].NodeCount {
return resp.Roles[i].NodeCount > resp.Roles[j].NodeCount
}
return resp.Roles[i].Role < resp.Roles[j].Role
})
return resp
}
// handleAnalyticsRoles serves /api/analytics/roles. Reads from the
// steady-state recomputer snapshot (issue #1256) so the request never
// holds s.mu.RLock for a full clock-skew recompute over the advert
// transmissions — that path hung >60s on staging with 78k tx.
func (s *Server) handleAnalyticsRoles(w http.ResponseWriter, r *http.Request) {
if s.store == nil {
writeJSON(w, RoleAnalyticsResponse{Roles: []RoleStats{}})
return
}
writeJSON(w, s.store.GetAnalyticsRoles())
}
// GetAnalyticsRoles returns the role-distribution analytics, preferring
// the steady-state recomputer snapshot (issue #1256). Falls back to an
// on-request compute path if the recomputer is not yet running (e.g.
// during the brief startup window before the initial compute completes
// — Start runs it synchronously, so this fallback is effectively only
// hit in tests that skip the recomputer entirely).
func (s *PacketStore) GetAnalyticsRoles() RoleAnalyticsResponse {
s.analyticsRecomputerMu.RLock()
rc := s.recompRoles
s.analyticsRecomputerMu.RUnlock()
if rc != nil {
if v := rc.Load(); v != nil {
if r, ok := v.(RoleAnalyticsResponse); ok {
s.cacheMu.Lock()
s.cacheHits++
s.cacheMu.Unlock()
return r
}
}
}
return s.computeAnalyticsRoles()
}
// computeAnalyticsRoles runs the actual role aggregation. Used by the
// background recomputer (issue #1256) and as a fallback for callers
// arriving before the snapshot is populated.
func (s *PacketStore) computeAnalyticsRoles() RoleAnalyticsResponse {
nodes, _ := s.getCachedNodesAndPM()
roles := make(map[string]string, len(nodes))
for _, n := range nodes {
roles[n.PublicKey] = n.Role
}
skewMap := make(map[string]*NodeClockSkew)
for _, cs := range s.GetFleetClockSkew() {
if cs == nil {
continue
}
skewMap[cs.Pubkey] = cs
}
return computeRoleAnalytics(roles, skewMap)
}