mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-21 00:55:34 +00:00
8bf7709970
RED test commit: `fd661569` — CI will fail on this (stub returns empty map; assertions fail by design). GREEN: `bf4b8592`. ## What Implements **axis 2 of 4** for the repeater usefulness score per #672 ([status comment](https://github.com/Kpa-clawbot/CoreScope/issues/672#issuecomment-4484635378)). The Bridge axis measures *structural importance*: how many shortest paths between other nodes route through this one. A high-traffic redundant node and a low-traffic critical bridge will no longer look identical. ## Algorithm **Brandes' weighted betweenness centrality** with Dijkstra for shortest paths (`cmd/server/bridge_score.go`). - Nodes: pubkeys in the `neighbor_edges` graph - Edge weight: `Score(now) * Confidence()` — per the convention from #1235 (count + recency decay scaled by observer-diversity confidence). Geo-rejected edges already excluded at graph build time (#1230) so we don't re-filter here. - Dijkstra distance: `1 / max(epsilon, weight)` — high affinity = cheap cost. - Normalize: divide by max observed centrality so output is in `[0, 1]`. Cost: `O(V · (E + V log V))`. Staging-scale (~600 nodes / ~2 000 edges) ≈ ~4.8M ops, completes in milliseconds. ## Where it lives - `cmd/server/bridge_score.go` — pure algorithm, no locks - `cmd/server/bridge_recomputer.go` — background recomputer (mirrors #1240/#1262 pattern), 5-min default interval, initial sync prewarm, snapshot stored in `s.bridgeScoreMap atomic.Pointer[map[string]float64]` - `cmd/server/routes.go` — `handleNodes` adds `node["bridge_score"]` on repeater/room rows; node-detail handler adds it on the single-node path - `public/nodes.js` — separate **Bridge** row in the node detail panel, alongside the existing **Usefulness** (Traffic) row. Distinct colour-coded bar. ## What's NOT in this PR (still pending for #672) - **Coverage axis** (axis 3) — unique observer-pair connectivity - **Redundancy axis** (axis 4) — simulated node-removal impact - **Composite** — once all 4 axes ship, swap the `usefulness_score` formula from "traffic-only" to the weighted composite `Refs #672` (not `Fixes` — issue stays open until all 4 axes + composite ship). ## Tests - `TestComputeBridgeScores_LineGraph` — 4-node line: middles non-zero, leaves zero, max normalized to 1.0 - `TestComputeBridgeScores_TriangleNoBridge` — clique has zero bridges - `TestComputeBridgeScores_Empty` — defensive nil-safety - `TestComputeBridgeScores_WeightSensitive` — mutation guard: revert the `1/w` inversion and this test fails - `TestBridgeScore_HandleNodesSurface` — integration: `/api/nodes` returns `bridge_score` on repeater rows; middle nodes > 0, ends == 0 --------- Co-authored-by: clawbot <bot@meshcore.local>
199 lines
6.1 KiB
Go
199 lines
6.1 KiB
Go
// Package main: bridge-axis recomputer (issue #672 axis 2 of 4).
|
|
//
|
|
// Steady-state background loop that recomputes the per-pubkey bridge
|
|
// centrality score over the in-memory NeighborGraph and stores the
|
|
// resulting map atomically. handleNodes reads via a single atomic
|
|
// load — no lock contention with ingest or with other recomputers
|
|
// (same pattern as #1240 / #1248).
|
|
//
|
|
// Interval default: 5 minutes. The graph itself rebuilds asynchronously
|
|
// on its own schedule (path_inspect.go); a 5-minute cadence here is
|
|
// well within the freshness budget for a structural metric (centrality
|
|
// changes slowly — a new edge or evicted node nudges scores by
|
|
// fractions of a percent).
|
|
//
|
|
// Cost (Brandes + Dijkstra): O(V · (E + V log V)). Staging-scale ~600
|
|
// nodes / ~2 000 edges ≈ ~4.8M ops, well under 100 ms in practice. On
|
|
// host-fleet scale (5 000 nodes / 30 000 edges) it is still seconds,
|
|
// running in a background goroutine off the request path.
|
|
package main
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// bridgeRecomputerDefaultInterval is how often the bridge score map is
|
|
// rebuilt. 5 minutes mirrors analytics_recomputer (#1240) and
|
|
// repeater_enrich_recomputer (#1262); centrality is a slow-moving
|
|
// structural signal and does not warrant tighter cadence.
|
|
const bridgeRecomputerDefaultInterval = 5 * time.Minute
|
|
|
|
// bridgeRecompStartedMu serializes start of the bridge recomputer.
|
|
// We do not currently expose Stop publicly — the goroutine lives for
|
|
// the lifetime of the process — but keeping the started flag local
|
|
// (instead of on PacketStore) avoids further field churn in store.go.
|
|
var (
|
|
bridgeRecompStartedMu sync.Mutex
|
|
bridgeRecompStarted bool
|
|
)
|
|
|
|
// StartBridgeScoreRecomputer launches the bridge-centrality recomputer
|
|
// (issue #672 axis 2). It performs an initial synchronous compute so
|
|
// that the very first /api/nodes after server start hits a populated
|
|
// snapshot instead of returning bridge_score=0 for every node, then
|
|
// reschedules every `interval` (default 5min if <= 0).
|
|
//
|
|
// Idempotent: subsequent calls are no-ops and return a no-op stop
|
|
// closure.
|
|
func (s *PacketStore) StartBridgeScoreRecomputer(interval time.Duration) func() {
|
|
if interval <= 0 {
|
|
interval = bridgeRecomputerDefaultInterval
|
|
}
|
|
|
|
bridgeRecompStartedMu.Lock()
|
|
if bridgeRecompStarted {
|
|
bridgeRecompStartedMu.Unlock()
|
|
return func() {}
|
|
}
|
|
bridgeRecompStarted = true
|
|
stop := make(chan struct{})
|
|
done := make(chan struct{})
|
|
bridgeRecompStartedMu.Unlock()
|
|
|
|
// Initial synchronous prewarm — see comment above.
|
|
recomputeBridgeScoresSafe(s)
|
|
|
|
var stopOnce sync.Once
|
|
go func() {
|
|
defer close(done)
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
recomputeBridgeScoresSafe(s)
|
|
case <-stop:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return func() {
|
|
stopOnce.Do(func() {
|
|
close(stop)
|
|
})
|
|
select {
|
|
case <-done:
|
|
case <-time.After(5 * time.Second):
|
|
}
|
|
}
|
|
}
|
|
|
|
// recomputeBridgeScoresSafe runs ComputeBridgeScores over the current
|
|
// neighbor graph and installs the result. Panics in compute are
|
|
// swallowed (defensive) so the goroutine never dies; the previous
|
|
// snapshot remains valid.
|
|
func recomputeBridgeScoresSafe(s *PacketStore) {
|
|
defer func() { _ = recover() }()
|
|
graph := s.graph.Load()
|
|
if graph == nil {
|
|
// No graph yet — install an empty map so readers get a defined
|
|
// zero rather than a nil sentinel (handleNodes treats both as
|
|
// 0.0, but an explicit empty snapshot avoids "is this ready
|
|
// yet?" confusion in operator-facing tooling).
|
|
empty := map[string]float64{}
|
|
s.bridgeScoreMap.Store(&empty)
|
|
return
|
|
}
|
|
now := time.Now()
|
|
edges := bridgeEdgesFromGraph(graph, now)
|
|
scores := ComputeBridgeScores(edges)
|
|
s.bridgeScoreMap.Store(&scores)
|
|
}
|
|
|
|
// bridgeEdgesFromGraph snapshots the NeighborGraph into a flat slice
|
|
// of BridgeEdge tuples with weight = Score(now) * Confidence(), per
|
|
// the convention established by #1235. Edges with unresolved B
|
|
// endpoints (no concrete pubkey yet — only a hop prefix) are skipped:
|
|
// they contribute no betweenness signal because the second endpoint
|
|
// is unknown.
|
|
func bridgeEdgesFromGraph(graph *NeighborGraph, now time.Time) []BridgeEdge {
|
|
all := graph.AllEdges()
|
|
out := make([]BridgeEdge, 0, len(all))
|
|
for _, e := range all {
|
|
if e == nil {
|
|
continue
|
|
}
|
|
if e.NodeA == "" || e.NodeB == "" {
|
|
// Unresolved (prefix-only) — no defined second endpoint.
|
|
continue
|
|
}
|
|
w := e.Score(now) * e.Confidence()
|
|
if w < bridgeMinWeightEpsilon {
|
|
continue
|
|
}
|
|
out = append(out, BridgeEdge{A: e.NodeA, B: e.NodeB, Weight: w})
|
|
}
|
|
return out
|
|
}
|
|
|
|
// GetBridgeScore returns the bridge centrality score for a pubkey in
|
|
// [0, 1], or 0 if the recomputer has not run yet or the pubkey is not
|
|
// in the graph. Lookup is case-insensitive (the score map keys are
|
|
// lowercase, matching byPathHop convention).
|
|
func (s *PacketStore) GetBridgeScore(pubkey string) float64 {
|
|
if pubkey == "" {
|
|
return 0
|
|
}
|
|
snap := s.bridgeScoreMap.Load()
|
|
if snap == nil {
|
|
return 0
|
|
}
|
|
m := *snap
|
|
if v, ok := m[pubkey]; ok {
|
|
return v
|
|
}
|
|
// Try lowercase form.
|
|
lc := pubkey
|
|
for i := 0; i < len(lc); i++ {
|
|
if lc[i] >= 'A' && lc[i] <= 'Z' {
|
|
b := []byte(pubkey)
|
|
for j := i; j < len(b); j++ {
|
|
if b[j] >= 'A' && b[j] <= 'Z' {
|
|
b[j] += 'a' - 'A'
|
|
}
|
|
}
|
|
lc = string(b)
|
|
break
|
|
}
|
|
}
|
|
if v, ok := m[lc]; ok {
|
|
return v
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// GetBridgeScoreMap returns a defensive copy-by-reference of the
|
|
// current bridge score snapshot. Nil-safe: returns an empty map if
|
|
// no snapshot has been installed yet. Map is read-only by convention
|
|
// — callers MUST NOT mutate it (the snapshot is shared across all
|
|
// concurrent readers).
|
|
func (s *PacketStore) GetBridgeScoreMap() map[string]float64 {
|
|
snap := s.bridgeScoreMap.Load()
|
|
if snap == nil {
|
|
return map[string]float64{}
|
|
}
|
|
return *snap
|
|
}
|
|
|
|
// resetBridgeRecomputerForTest is a test-only helper to allow the
|
|
// integration test to re-Start the recomputer in a fresh process
|
|
// (which would otherwise be blocked by the package-level
|
|
// bridgeRecompStarted flag). Production code must not call this.
|
|
func resetBridgeRecomputerForTest() {
|
|
bridgeRecompStartedMu.Lock()
|
|
bridgeRecompStarted = false
|
|
bridgeRecompStartedMu.Unlock()
|
|
}
|