feat: repeater liveness indicator with relay stats (#662) (#755)

## Summary

- **Backend**: adds `relayTimes` in-memory index (sorted unix-millis per
repeater pubkey), maintained in lockstep with `byPathHop`. Populated at
startup from all packet observations (not just best), updated on
ingest/evict/backfill. Exposes `relay_count_1h`, `relay_count_24h`,
`last_relayed` in both `/api/nodes` (for repeaters) and
`/api/nodes/{pubkey}/health`.
- **Frontend**: `getNodeStatus` extended to three-state (`relaying` /
`active` / `stale`) for repeaters based on relay_count_24h.
`getStatusInfo` is the single source of truth for status label,
explanation, and relay stats. Detail pane shows relay counts and last
relayed time. Nodes list gets a status emoji column with hover tooltip
showing relay info.
- **Correctness fixes**: relay index scans all observations per packet
(not just best); backfill now updates relay index after resolving paths;
pubkeys lowercased consistently throughout index.

## Changes

### `cmd/server/store.go`
- `relayTimes map[string][]int64` field added to `PacketStore`
- `addTxToRelayTimeIndex` / `removeFromRelayTimeIndex`: scan all
observations, idempotent sorted insert, lowercase keys
- `relayMetrics(times, nowMs)`: returns `(count1h, count24h,
lastRelayed)`
- `buildPathHopIndex`: populates `relayTimes` at startup
- `pollAndMerge`: updates relay index on ingest and eviction; new `else`
branch for path-unchanged observations
- `addTxToPathHopIndex` / `removeTxFromPathHopIndex`: lowercase resolved
pubkeys (fixes casing mismatch with lookup)

### `cmd/server/routes.go`
- `GetBulkHealth` / `GetNodeHealth`: include relay stats for repeater
nodes
- `handleNodes`: enriches repeater nodes with relay stats from
`relayTimes` so list view has same data as detail pane

### `cmd/server/neighbor_persist.go`
- `backfillResolvedPathsAsync`: calls `addTxToRelayTimeIndex` after
`pickBestObservation` to capture newly resolved pubkeys

### `public/roles.js`
- `getNodeStatus(role, lastSeenMs, relayCount24h)`: three-state logic
for repeaters
- `getStatusInfo(n)`: single source of truth returning status, label,
explanation, relay counts, last relayed

### `public/nodes.js`
- Detail pane: `n.stats` populated from health endpoint before
`getStatusInfo` call
- Nodes list: status emoji column with relay hover tooltip; status
filter uses `getStatusInfo`

### Tests
- `relay_liveness_test.go`: index functions, relay metrics, wiring
integration, bulk/single health endpoints
- `test-repeater-liveness.js`: three-state frontend logic, backward
compat

## Test plan
- [x] Repeater with recent relay traffic shows green relaying emoji in
list and detail pane
- [x] Repeater with no relay traffic in 24h shows yellow idle in both
views
- [x] Repeater not heard recently shows grey stale in both views
- [x] Non-repeater nodes unaffected (no relay stats, no status change)
- [x] Hover tooltip on list emoji shows relay count and last relayed
time
- [x] `go test ./...` passes
- [x] `node test-repeater-liveness.js` passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: openclaw-bot <bot@openclaw.local>
efiten
2026-05-21 20:39:43 +02:00
committed by GitHub
parent e9d74e1bab
commit ba6c2ac6ba
5 changed files with 1458 additions and 2 deletions
-2
@@ -21,7 +21,6 @@ COPY internal/packetpath/ ../../internal/packetpath/
COPY internal/dbconfig/ ../../internal/dbconfig/
COPY internal/dbschema/ ../../internal/dbschema/
COPY internal/perfio/ ../../internal/perfio/
-COPY internal/dbschema/ ../../internal/dbschema/
RUN go mod download
COPY cmd/server/ ./
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} \
@@ -36,7 +35,6 @@ COPY internal/packetpath/ ../../internal/packetpath/
COPY internal/dbconfig/ ../../internal/dbconfig/
COPY internal/dbschema/ ../../internal/dbschema/
COPY internal/perfio/ ../../internal/perfio/
-COPY internal/dbschema/ ../../internal/dbschema/
RUN go mod download
COPY cmd/ingestor/ ./
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} \
+156
@@ -0,0 +1,156 @@
package main
import (
"sort"
"strings"
"testing"
"time"
)
func TestAddTxToRelayTimeIndex_SingleNode(t *testing.T) {
idx := make(map[string][]int64)
pk := "aabbccdd11223344"
ts := time.Now().Add(-30 * time.Minute).UTC()
addTxToRelayTimeIndex(idx, ts.Format(time.RFC3339), []string{pk})
if len(idx[pk]) != 1 {
t.Fatalf("expected 1 entry, got %d", len(idx[pk]))
}
wantMs := ts.UnixMilli()
// RFC3339 has second precision, so allow ±1000ms
if diff := idx[pk][0] - wantMs; diff < -1000 || diff > 1000 {
t.Errorf("timestamp mismatch: got %d, want ~%d", idx[pk][0], wantMs)
}
}
func TestAddTxToRelayTimeIndex_SortedOrder(t *testing.T) {
idx := make(map[string][]int64)
pk := "aabbccdd11223344"
t1 := time.Now().Add(-2 * time.Hour).UTC()
t2 := time.Now().Add(-30 * time.Minute).UTC()
// Insert newer first, expect sorted ascending
addTxToRelayTimeIndex(idx, t2.Format(time.RFC3339), []string{pk})
addTxToRelayTimeIndex(idx, t1.Format(time.RFC3339), []string{pk})
if len(idx[pk]) != 2 {
t.Fatalf("expected 2 entries, got %d", len(idx[pk]))
}
if !sort.SliceIsSorted(idx[pk], func(i, j int) bool { return idx[pk][i] < idx[pk][j] }) {
t.Error("relayTimes slice not sorted ascending")
}
}
func TestAddTxToRelayTimeIndex_MultipleNodes(t *testing.T) {
idx := make(map[string][]int64)
pk1 := "aabbccdd11223344"
pk2 := "eeff001122334455"
ts := time.Now().Add(-10 * time.Minute).UTC()
addTxToRelayTimeIndex(idx, ts.Format(time.RFC3339), []string{pk1, pk2})
if len(idx[pk1]) != 1 {
t.Errorf("pk1: expected 1 entry, got %d", len(idx[pk1]))
}
if len(idx[pk2]) != 1 {
t.Errorf("pk2: expected 1 entry, got %d", len(idx[pk2]))
}
}
func TestAddTxToRelayTimeIndex_NilResolvedPath(t *testing.T) {
idx := make(map[string][]int64)
addTxToRelayTimeIndex(idx, time.Now().UTC().Format(time.RFC3339), nil) // must not panic
if len(idx) != 0 {
t.Error("expected empty index for nil pubkeys")
}
}
func TestAddTxToRelayTimeIndex_DuplicatePubkeyInPath(t *testing.T) {
idx := make(map[string][]int64)
pk := "aabbccdd11223344"
ts := time.Now().UTC()
addTxToRelayTimeIndex(idx, ts.Format(time.RFC3339), []string{pk, pk}) // same pubkey twice
if len(idx[pk]) != 1 {
t.Errorf("duplicate pubkey should produce only 1 entry, got %d", len(idx[pk]))
}
}
func TestRemoveFromRelayTimeIndex_RemovesEntry(t *testing.T) {
idx := make(map[string][]int64)
pk := "aabbccdd11223344"
ts := time.Now().Add(-1 * time.Hour).UTC()
firstSeen := ts.Format(time.RFC3339)
addTxToRelayTimeIndex(idx, firstSeen, []string{pk})
if len(idx[pk]) != 1 {
t.Fatal("setup: expected 1 entry")
}
removeFromRelayTimeIndex(idx, firstSeen, []string{pk})
if _, ok := idx[pk]; ok {
t.Error("expected key deleted after last entry removed")
}
}
func TestRemoveFromRelayTimeIndex_PartialRemove(t *testing.T) {
idx := make(map[string][]int64)
pk := "aabbccdd11223344"
t1 := time.Now().Add(-2 * time.Hour).UTC()
t2 := time.Now().Add(-30 * time.Minute).UTC()
fs1 := t1.Format(time.RFC3339)
fs2 := t2.Format(time.RFC3339)
addTxToRelayTimeIndex(idx, fs1, []string{pk})
addTxToRelayTimeIndex(idx, fs2, []string{pk})
removeFromRelayTimeIndex(idx, fs1, []string{pk})
if len(idx[pk]) != 1 {
t.Errorf("expected 1 entry after removing one, got %d", len(idx[pk]))
}
}
func TestRelayMetrics_Counts(t *testing.T) {
now := time.Now().UnixMilli()
times := []int64{
now - 90*60*1000, // 90 min ago — inside 24h, outside 1h
now - 30*60*1000, // 30 min ago — inside both
now - 10*60*1000, // 10 min ago — inside both
}
c1h, c24h, lastRelayed := relayMetrics(times, now)
if c1h != 2 {
t.Errorf("relay_count_1h: expected 2, got %d", c1h)
}
if c24h != 3 {
t.Errorf("relay_count_24h: expected 3, got %d", c24h)
}
wantLast := time.UnixMilli(times[2]).UTC().Format(time.RFC3339)
if lastRelayed != wantLast {
t.Errorf("last_relayed: got %q, want %q", lastRelayed, wantLast)
}
}
func TestRelayMetrics_EmptySlice(t *testing.T) {
c1h, c24h, lastRelayed := relayMetrics(nil, time.Now().UnixMilli())
if c1h != 0 || c24h != 0 || lastRelayed != "" {
t.Errorf("empty slice: expected zeros and empty string, got %d %d %q", c1h, c24h, lastRelayed)
}
}
func TestRelayMetrics_AllOutsideWindow(t *testing.T) {
now := time.Now().UnixMilli()
times := []int64{now - 30*24*60*60*1000} // 30 days ago
c1h, c24h, _ := relayMetrics(times, now)
if c1h != 0 || c24h != 0 {
t.Errorf("expected 0/0 for old entry, got %d/%d", c1h, c24h)
}
}
func TestAddTxToRelayTimeIndex_LowercasesKey(t *testing.T) {
idx := make(map[string][]int64)
pkUpper := "AABBCCDD11223344"
pkLower := strings.ToLower(pkUpper)
ts := time.Now().UTC()
addTxToRelayTimeIndex(idx, ts.Format(time.RFC3339), []string{pkUpper})
if len(idx[pkLower]) != 1 {
t.Errorf("expected index keyed by lowercase, found %d entries at lowercase key", len(idx[pkLower]))
}
if len(idx[pkUpper]) != 0 {
t.Errorf("expected no entry at uppercase key")
}
}
+80
@@ -137,6 +137,7 @@ type PacketStore struct {
byNode map[string][]*StoreTx // pubkey → transmissions
nodeHashes map[string]map[string]bool // pubkey → Set<hash>
byPathHop map[string][]*StoreTx // lowercase hop/pubkey → transmissions with that hop in path
relayTimes map[string][]int64 // lowercase pubkey → sorted unix-millis of relay events (full pubkeys only)
byPayloadType map[int][]*StoreTx // payload_type → transmissions
loaded bool
totalObs int
@@ -487,6 +488,7 @@ func NewPacketStore(db *DB, cfg *PacketStoreConfig, cacheTTLs ...map[string]inte
byObserver: make(map[string][]*StoreObs),
byNode: make(map[string][]*StoreTx),
byPathHop: make(map[string][]*StoreTx),
relayTimes: make(map[string][]int64),
nodeHashes: make(map[string]map[string]bool),
byPayloadType: make(map[int][]*StoreTx),
rfCache: make(map[string]*cachedResult),
@@ -3393,6 +3395,84 @@ func addTxToPathHopIndex(idx map[string][]*StoreTx, tx *StoreTx) {
}
}
// addTxToRelayTimeIndex records the relay timestamp for each resolved pubkey.
// pubkeys is the pre-extracted list (use extractResolvedPubkeys on the decoded path).
// Maintains sorted ascending order for O(log n) window queries.
// Must be called with s.mu held (or during build before store is live).
func addTxToRelayTimeIndex(idx map[string][]int64, firstSeen string, pubkeys []string) {
if len(pubkeys) == 0 {
return
}
ms, err := time.Parse(time.RFC3339, firstSeen)
if err != nil {
return
}
millis := ms.UnixMilli()
seen := make(map[string]bool, len(pubkeys))
for _, pk := range pubkeys {
pk = strings.ToLower(pk)
if pk == "" || seen[pk] {
continue
}
seen[pk] = true
slice := idx[pk]
i := sort.Search(len(slice), func(j int) bool { return slice[j] >= millis })
if i < len(slice) && slice[i] == millis {
continue // idempotent
}
slice = append(slice, 0)
copy(slice[i+1:], slice[i:])
slice[i] = millis
idx[pk] = slice
}
}
// removeFromRelayTimeIndex removes the relay timestamp for each resolved pubkey.
// Inverse of addTxToRelayTimeIndex.
func removeFromRelayTimeIndex(idx map[string][]int64, firstSeen string, pubkeys []string) {
if len(pubkeys) == 0 {
return
}
ms, err := time.Parse(time.RFC3339, firstSeen)
if err != nil {
return
}
millis := ms.UnixMilli()
seen := make(map[string]bool, len(pubkeys))
for _, pk := range pubkeys {
pk = strings.ToLower(pk)
if pk == "" || seen[pk] {
continue
}
seen[pk] = true
slice := idx[pk]
i := sort.Search(len(slice), func(j int) bool { return slice[j] >= millis })
if i < len(slice) && slice[i] == millis {
newSlice := make([]int64, 0, len(slice)-1)
newSlice = append(newSlice, slice[:i]...)
newSlice = append(newSlice, slice[i+1:]...)
idx[pk] = newSlice
if len(newSlice) == 0 {
delete(idx, pk)
}
}
}
}
// relayMetrics computes relay_count_1h, relay_count_24h, and last_relayed from a
// sorted unix-millis slice. now is time.Now().UnixMilli(). O(log n).
func relayMetrics(times []int64, now int64) (count1h, count24h int, lastRelayed string) {
if len(times) == 0 {
return 0, 0, ""
}
i1h := sort.Search(len(times), func(i int) bool { return times[i] >= now-3600000 })
i24h := sort.Search(len(times), func(i int) bool { return times[i] >= now-86400000 })
count1h = len(times) - i1h
count24h = len(times) - i24h
lastRelayed = time.UnixMilli(times[len(times)-1]).UTC().Format(time.RFC3339)
return
}
// removeTxFromPathHopIndex removes a transmission from all its raw path-hop index entries.
// Resolved pubkey entries are cleaned up via removeFromResolvedPubkeyIndex.
func removeTxFromPathHopIndex(idx map[string][]*StoreTx, tx *StoreTx) {
File diff suppressed because it is too large.
@@ -0,0 +1,163 @@
# Repeater Liveness — Design Spec
**Issue:** Kpa-clawbot/CoreScope#662
**Date:** 2026-04-15
**Status:** Approved
---
## Problem
CoreScope conflates two distinct repeater states into "active":
| State | Adverts | In paths | Meaning |
|---|---|---|---|
| Relaying | Recent | Yes | Up and forwarding traffic |
| Alive but idle | Recent | No | Up, but nothing routed through it |
| Down | None | None | Offline |
An operator cannot tell whether a repeater is healthy and carrying traffic, or healthy but not actually relaying anything.
---
## Scope
M1 (backend relay metrics) + M2 (frontend three-state indicator). M3 (repeater dashboard) is out of scope for this implementation.
---
## Architecture
### Data Structure (Backend)
Add a parallel index to `PacketStore` in `store.go`:
```go
relayTimes map[string][]int64 // lowercase pubkey → sorted []int64 unix-millis
```
- Indexed by **full pubkeys only** (from `tx.ResolvedPath`) — not raw hop prefixes, avoiding hash-collision noise
- Maintained under the existing `s.mu` RWMutex — no new lock needed
- Lives parallel to `byPathHop`, sharing the same add/remove call sites
### Index Maintenance
**On add** — called from `addTxToPathHopIndex`:
- For each non-nil entry in `tx.ResolvedPath`
- Parse `tx.FirstSeen` → unix millis
- Binary-search insert into sorted slice
**On remove** — called from `removeTxFromPathHopIndex`:
- For each non-nil entry in `tx.ResolvedPath`
- Remove the millis value from the sorted slice
**On build** — called from `buildPathHopIndex`:
- Populate `relayTimes` in the same pass
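
The add and remove steps above can be sketched as a pair of helpers over a plain ascending slice. This is a minimal illustration under stated assumptions, not the code in `store.go`: `insertSorted` and `removeSorted` are hypothetical names, and they take an already-parsed unix-millis value rather than parsing `tx.FirstSeen`.

```go
package main

import (
	"fmt"
	"sort"
)

// insertSorted adds ms to an ascending slice. An exact duplicate is skipped,
// so calling it twice with the same value leaves the slice unchanged.
func insertSorted(s []int64, ms int64) []int64 {
	i := sort.Search(len(s), func(j int) bool { return s[j] >= ms })
	if i < len(s) && s[i] == ms {
		return s
	}
	s = append(s, 0)     // grow by one
	copy(s[i+1:], s[i:]) // shift the tail right
	s[i] = ms
	return s
}

// removeSorted deletes ms from an ascending slice if it is present.
// It reuses the backing array, so the caller must keep the returned slice.
func removeSorted(s []int64, ms int64) []int64 {
	i := sort.Search(len(s), func(j int) bool { return s[j] >= ms })
	if i == len(s) || s[i] != ms {
		return s
	}
	return append(s[:i], s[i+1:]...)
}

func main() {
	var times []int64
	for _, ms := range []int64{3000, 1000, 2000, 2000} {
		times = insertSorted(times, ms)
	}
	fmt.Println(times) // [1000 2000 3000]
	times = removeSorted(times, 2000)
	fmt.Println(times) // [1000 3000]
}
```

The binary search finds the position in O(log n), but the shift on insert and remove is still O(n) per call. For slices bounded by the store packet limit, that trade-off may well be acceptable.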
### Query (O(log n))
```go
now := time.Now().UnixMilli()
times := s.relayTimes[lowerPK]
count1h := len(times) - sort.Search(len(times), func(i int) bool { return times[i] >= now-3600000 })
count24h := len(times) - sort.Search(len(times), func(i int) bool { return times[i] >= now-86400000 })
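var lastRelayed int64 // stays 0 when the node has no recorded relay events
if len(times) > 0 {
	lastRelayed = times[len(times)-1] // free: last element of the sorted slice
}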
```
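
A self-contained version of the window query, wrapped in a function with an explicit empty-slice guard, might look like the sketch below. `countSince` and `windowCounts` are hypothetical names, and the fixed `now` value is only there to make the example deterministic. It also shows the boundary behaviour: because the comparison is `>=`, an event exactly on the window edge is counted.

```go
package main

import (
	"fmt"
	"sort"
)

const (
	hourMs = int64(60 * 60 * 1000)
	dayMs  = 24 * hourMs
)

// countSince returns how many entries of the ascending slice are >= cutoff.
func countSince(times []int64, cutoff int64) int {
	return len(times) - sort.Search(len(times), func(i int) bool { return times[i] >= cutoff })
}

// windowCounts returns the relay counts for the last hour and the last 24h,
// plus the most recent timestamp (0 when there are no events).
func windowCounts(times []int64, now int64) (c1h, c24h int, last int64) {
	if len(times) == 0 {
		return 0, 0, 0
	}
	return countSince(times, now-hourMs), countSince(times, now-dayMs), times[len(times)-1]
}

func main() {
	now := int64(100) * dayMs // arbitrary fixed "now" in unix millis
	times := []int64{
		now - 2*dayMs,      // outside both windows
		now - dayMs,        // exactly on the 24h edge: counted
		now - 90*60*1000,   // inside 24h only
		now - hourMs,       // exactly on the 1h edge: counted
		now - 10*60*1000,   // inside both
	}
	c1h, c24h, last := windowCounts(times, now)
	fmt.Println(c1h, c24h, last == now-10*60*1000) // 2 4 true
}
```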
---
## API
No new endpoints. Relay metrics are added to existing health responses.
### `GET /api/nodes/bulk-health` and `GET /api/nodes/{pubkey}/health`
Added to the `stats` sub-object, **repeater nodes only** (fields absent for other roles):
```json
{
"stats": {
"lastHeard": "2026-04-15T10:00:00Z",
"packetsToday": 12,
"relay_count_1h": 3,
"relay_count_24h": 47,
"last_relayed": "2026-04-15T09:58:00Z"
}
}
```
`last_relayed` is an RFC3339 string (consistent with existing timestamp fields). Omitted when `relayTimes[pk]` is empty.
---
## Frontend
### `roles.js` — extend `getNodeStatus`
```js
// Signature change:
// Before: getNodeStatus(role, lastSeenMs) → 'active' | 'stale'
// After: getNodeStatus(role, lastSeenMs, relayCount24h) → 'relaying' | 'active' | 'stale'
```
Logic:
- Non-repeaters: `relayCount24h` ignored, returns `'active'` or `'stale'` as before — **no behaviour change**
- Repeaters:
- Stale threshold exceeded → `'stale'`
- Within threshold + `relayCount24h > 0` → `'relaying'`
- Within threshold + `relayCount24h == 0` → `'active'` (alive but idle)
### `nodes.js` — `getStatusInfo` and render
- Extract `relay_count_24h`, `relay_count_1h`, `last_relayed` from `n.stats`
- Pass `relay_count_24h` into `getNodeStatus`
**Status labels (repeaters):**
| Status | Label | Explanation |
|---|---|---|
| `'relaying'` | `🟢 Relaying` | `"Relayed N packets in last 24h, last X ago"` |
| `'active'` | `🟡 Idle` | `"Alive but no relay traffic in 24h"` |
| `'stale'` | `⚪ Stale` | existing text |
**Non-repeaters:** labels unchanged (`🟢 Active` / `⚪ Stale`).
**Node detail pane:** relay stats row added to the stats table for repeaters:
- `Relay (1h)` / `Relay (24h)` / `Last Relayed`
- Row hidden for non-repeater roles
**Status filter buttons:** `'relaying'` maps to the `active` bucket — filter UI unchanged.
---
## Testing
### Backend (Go) — new test cases in `store_test.go` or dedicated file
- `addTxToRelayTimeIndex`: insert packets with known timestamps → verify sorted order
- Count at 1h/24h boundaries: packets straddling the window edge → correct counts
- `removeFromRelayTimeIndex`: add then remove → slice returns to original state
- `GetBulkHealth` relay fields: repeater with relay activity → fields present; companion → fields absent
- Eviction: add packets, evict oldest → relay_count_24h drops correctly
- `last_relayed`: equals timestamp of most recently relayed packet
- Empty `relayTimes`: no panic, fields omitted from response
- Node with no pubkeys in `ResolvedPath`: `relayTimes` unchanged (raw hops ignored)
### Frontend (Node.js) — `test-repeater-liveness.js`
- `getNodeStatus('repeater', recentMs, 5)` → `'relaying'`
- `getNodeStatus('repeater', recentMs, 0)` → `'active'`
- `getNodeStatus('repeater', staleMs, 5)` → `'stale'`
- `getNodeStatus('companion', recentMs, 0)` → `'active'` (no three-state for non-repeaters)
- `getNodeStatus('companion', recentMs, 99)` → `'active'` (relay count ignored for non-repeaters)
- Status label: `'relaying'` → `🟢 Relaying`
- Status label: `'active'` on repeater → `🟡 Idle`
---
## Limitations
1. **Observer coverage gaps**: if no observer hears traffic through a repeater, relay activity won't be recorded even if the repeater is relaying. Inherent to passive observation.
2. **Low-traffic networks**: zero relay activity ≠ broken. The "Idle" label must be clearly worded.
3. **Hash collisions**: mitigated by indexing full pubkeys only (resolved path), not raw hop prefixes.
4. **Memory**: `relayTimes` adds one `int64` (8 bytes) per relay event per node, plus slice overhead. The total is bounded by the store packet limit, which is acceptable.