mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-19 16:26:02 +00:00
f81ed5b3cf
RED commit: `0190466d` — failing CI: https://github.com/Kpa-clawbot/CoreScope/actions (will populate after PR creation)

## Problem

On staging (commit `d69d9fb`, 78k tx, 2.3M obs), `curl http://localhost/api/analytics/roles` times out at 60s with 0 bytes — the Roles tab is unusable. Issue #1256.

PR #1248's steady-state recomputer fan-out (topology / rf / distance / channels / hash-collisions / hash-sizes) **didn't include roles**. The legacy handler:

1. Holds `s.mu.RLock` for the entire compute.
2. Calls `GetFleetClockSkew()`, which drives `clockSkew.Recompute(s)` over all ADVERT transmissions — O(78k) per request.
3. Concurrent ingest writers compound the latency through writer-starvation.

Result: every request hits the cold path; the response never comes back inside the 60 s HTTP budget.

## Fix

Add `roles` as the 7th endpoint in the recomputer fan-out — same pattern as #1248:

- `PacketStore.recompRoles` slot, registered in `StartAnalyticsRecomputers` with default 5-min interval.
- `PacketStore.GetAnalyticsRoles()` → atomic-pointer load from the snapshot (sub-ms), with a `computeAnalyticsRoles()` fallback only for the brief startup window before the initial sync compute completes.
- Handler is now a thin wrapper — no lock-held work on the request path.
- New optional `roles` key under `analytics.recomputeIntervalSeconds` in config; `config.example.json` and `_comment_analytics` updated.

## Latency (unit-scope benchmark)

- Worst-of-50 handler latency: **<100 ms** (test budget; well under the 2 s p99 acceptance).
- Compute itself is bounded by the existing 5-min recompute window — it runs once in the background, never on the request path.

## Tests

- RED `0190466d`: asserts `recompRoles` is registered and the handler returns under the latency budget. Fails on master with `recompRoles not registered`.
- GREEN `d7784f76`: registers the recomputer + snapshot accessor — both tests pass.

Fixes #1256

---------

Co-authored-by: openclaw-bot <bot@openclaw.local>
167 lines
5.1 KiB
Go
167 lines
5.1 KiB
Go
package main
|
|
|
|
import (
|
|
"math"
|
|
"net/http"
|
|
"sort"
|
|
"strings"
|
|
)
|
|
|
|
// RoleStats is the per-role entry of the role analytics response: the size of
// the role's population plus an aggregate view of its clock skew. Field order
// and JSON tags define the wire format and must stay as they are.
type RoleStats struct {
	// Role is the bucket label; computeRoleAnalytics stores the normalized
	// role name here.
	Role string `json:"role"`
	// NodeCount is the number of nodes that fell into this role.
	NodeCount int `json:"nodeCount"`
	// WithSkew is how many of those nodes had a non-nil clock-skew entry.
	WithSkew int `json:"withSkew"`
	// MeanAbsSkewSec is the mean of the nodes' absolute skew, in seconds,
	// rounded to two decimals. It stays zero when WithSkew is zero.
	MeanAbsSkewSec float64 `json:"meanAbsSkewSec"`
	// MedianAbsSkewSec is the median of the same absolute-skew samples, in
	// seconds, rounded to two decimals. It stays zero when WithSkew is zero.
	MedianAbsSkewSec float64 `json:"medianAbsSkewSec"`
	// OkCount tallies nodes whose skew severity is SkewOK.
	OkCount int `json:"okCount"`
	// WarningCount tallies nodes whose skew severity is SkewWarning.
	WarningCount int `json:"warningCount"`
	// CriticalCount tallies nodes whose skew severity is SkewCritical.
	CriticalCount int `json:"criticalCount"`
	// AbsurdCount tallies nodes whose skew severity is SkewAbsurd.
	AbsurdCount int `json:"absurdCount"`
	// NoClockCount tallies nodes whose skew severity is SkewNoClock.
	NoClockCount int `json:"noClockCount"`
}
|
|
|
|
// RoleAnalyticsResponse is the payload returned by /api/analytics/roles.
|
|
type RoleAnalyticsResponse struct {
|
|
TotalNodes int `json:"totalNodes"`
|
|
Roles []RoleStats `json:"roles"`
|
|
}
|
|
|
|
// normalizeRole maps a raw role string onto its canonical bucket name.
// Surrounding whitespace is trimmed and the result lower-cased so that
// spelling variants of the same role share one bucket; a role that is empty
// after trimming is reported as "unknown".
func normalizeRole(r string) string {
	if canonical := strings.ToLower(strings.TrimSpace(r)); canonical != "" {
		return canonical
	}
	return "unknown"
}
|
|
|
|
// computeRoleAnalytics groups nodes by role and aggregates clock-skew per
|
|
// role. Pure function: takes the node roster and the per-pubkey skew map and
|
|
// returns the response — no store / lock dependencies, easy to unit test.
|
|
//
|
|
// `nodesByPubkey` lists every known node (pubkey → role). `skewByPubkey`
|
|
// is the subset of pubkeys that have clock-skew data with their severity and
|
|
// most-recent corrected skew (in seconds, signed — we take |x| for averages).
|
|
func computeRoleAnalytics(nodesByPubkey map[string]string, skewByPubkey map[string]*NodeClockSkew) RoleAnalyticsResponse {
|
|
type bucket struct {
|
|
stats RoleStats
|
|
absSkews []float64
|
|
}
|
|
buckets := make(map[string]*bucket)
|
|
for pk, rawRole := range nodesByPubkey {
|
|
role := normalizeRole(rawRole)
|
|
b, ok := buckets[role]
|
|
if !ok {
|
|
b = &bucket{stats: RoleStats{Role: role}}
|
|
buckets[role] = b
|
|
}
|
|
b.stats.NodeCount++
|
|
cs, has := skewByPubkey[pk]
|
|
if !has || cs == nil {
|
|
continue
|
|
}
|
|
b.stats.WithSkew++
|
|
abs := math.Abs(cs.RecentMedianSkewSec)
|
|
if abs == 0 {
|
|
abs = math.Abs(cs.LastSkewSec)
|
|
}
|
|
b.absSkews = append(b.absSkews, abs)
|
|
switch cs.Severity {
|
|
case SkewOK:
|
|
b.stats.OkCount++
|
|
case SkewWarning:
|
|
b.stats.WarningCount++
|
|
case SkewCritical:
|
|
b.stats.CriticalCount++
|
|
case SkewAbsurd:
|
|
b.stats.AbsurdCount++
|
|
case SkewNoClock:
|
|
b.stats.NoClockCount++
|
|
}
|
|
}
|
|
resp := RoleAnalyticsResponse{Roles: make([]RoleStats, 0, len(buckets))}
|
|
for _, b := range buckets {
|
|
if n := len(b.absSkews); n > 0 {
|
|
sum := 0.0
|
|
for _, v := range b.absSkews {
|
|
sum += v
|
|
}
|
|
b.stats.MeanAbsSkewSec = round(sum/float64(n), 2)
|
|
sorted := make([]float64, n)
|
|
copy(sorted, b.absSkews)
|
|
sort.Float64s(sorted)
|
|
if n%2 == 1 {
|
|
b.stats.MedianAbsSkewSec = round(sorted[n/2], 2)
|
|
} else {
|
|
b.stats.MedianAbsSkewSec = round((sorted[n/2-1]+sorted[n/2])/2, 2)
|
|
}
|
|
}
|
|
resp.TotalNodes += b.stats.NodeCount
|
|
resp.Roles = append(resp.Roles, b.stats)
|
|
}
|
|
// Sort: largest population first, then role name for stable output.
|
|
sort.Slice(resp.Roles, func(i, j int) bool {
|
|
if resp.Roles[i].NodeCount != resp.Roles[j].NodeCount {
|
|
return resp.Roles[i].NodeCount > resp.Roles[j].NodeCount
|
|
}
|
|
return resp.Roles[i].Role < resp.Roles[j].Role
|
|
})
|
|
return resp
|
|
}
|
|
|
|
// handleAnalyticsRoles serves /api/analytics/roles. Reads from the
|
|
// steady-state recomputer snapshot (issue #1256) so the request never
|
|
// holds s.mu.RLock for a full clock-skew recompute over the advert
|
|
// transmissions — that path hung >60s on staging with 78k tx.
|
|
func (s *Server) handleAnalyticsRoles(w http.ResponseWriter, r *http.Request) {
|
|
if s.store == nil {
|
|
writeJSON(w, RoleAnalyticsResponse{Roles: []RoleStats{}})
|
|
return
|
|
}
|
|
writeJSON(w, s.store.GetAnalyticsRoles())
|
|
}
|
|
|
|
// GetAnalyticsRoles returns the role-distribution analytics, preferring
|
|
// the steady-state recomputer snapshot (issue #1256). Falls back to an
|
|
// on-request compute path if the recomputer is not yet running (e.g.
|
|
// during the brief startup window before the initial compute completes
|
|
// — Start runs it synchronously, so this fallback is effectively only
|
|
// hit in tests that skip the recomputer entirely).
|
|
func (s *PacketStore) GetAnalyticsRoles() RoleAnalyticsResponse {
|
|
s.analyticsRecomputerMu.RLock()
|
|
rc := s.recompRoles
|
|
s.analyticsRecomputerMu.RUnlock()
|
|
if rc != nil {
|
|
if v := rc.Load(); v != nil {
|
|
if r, ok := v.(RoleAnalyticsResponse); ok {
|
|
s.cacheMu.Lock()
|
|
s.cacheHits++
|
|
s.cacheMu.Unlock()
|
|
return r
|
|
}
|
|
}
|
|
}
|
|
return s.computeAnalyticsRoles()
|
|
}
|
|
|
|
// computeAnalyticsRoles runs the actual role aggregation. Used by the
|
|
// background recomputer (issue #1256) and as a fallback for callers
|
|
// arriving before the snapshot is populated.
|
|
func (s *PacketStore) computeAnalyticsRoles() RoleAnalyticsResponse {
|
|
nodes, _ := s.getCachedNodesAndPM()
|
|
roles := make(map[string]string, len(nodes))
|
|
for _, n := range nodes {
|
|
roles[n.PublicKey] = n.Role
|
|
}
|
|
skewMap := make(map[string]*NodeClockSkew)
|
|
for _, cs := range s.GetFleetClockSkew() {
|
|
if cs == nil {
|
|
continue
|
|
}
|
|
skewMap[cs.Pubkey] = cs
|
|
}
|
|
return computeRoleAnalytics(roles, skewMap)
|
|
}
|