analytics: Relay Airtime Share endpoint + dumbbell chart (#1359) (#1601)

Implements the locked spec from #1359.

Red commit: 68a140a8 — `distinctRelayCount` stub returns 0; test fails
on assertion (compiles + runs to assertion, not a build error).
Green commit: 48c2ddad — real implementation.

## Backend (in-memory, no SQL, no schema change)

- `cmd/server/relay_airtime_share.go`
- `distinctRelayCount(tx)` — unions the resolved-pubkey reverse index
for `tx.ID`. That index already dedups `(pubkey-hash, txID)` pairs
across every observation's `resolved_path`, so its length IS the count
of distinct repeaters that forwarded the packet. NOT length of any
single observation's resolved_path (the bug-trap from #1358).
- `computeRelayAirtimeShare(window)` — per-tx `score = payload_bytes ×
distinctRelays`, bucketed by `payload_type`, sorted desc by airtime_pct.
- `GetRelayAirtimeShareWithWindow` — cached behind existing `rfCache` +
`rfCacheTTL` pool. Shallow-copies the cached payload with `cached=true`
for the client.
- `cmd/server/routes.go` — `GET
/api/analytics/relay-airtime-share?window=…` returning
`{rows:[{payload_type,type,count,count_pct,score,airtime_pct}],
total_count, total_score, window, cached}`.

## Frontend

- `public/analytics.js`
- `renderRelayAirtimeDumbbell(data)` — horizontal dumbbell chart per
payload_type. Gray dot = count %, colored dot = airtime %, connector
line between them = the divergence, shared 0-100% axis, sorted desc by
airtime.
- Tooltip: payload_type, count %, count N, airtime %, raw score,
within-mesh caveat.
  - Title: **Relay Airtime Share**.
- Subtitle (exact): `Score = payload bytes × distinct repeaters that
forwarded the packet. Counts relay re-transmissions; originator TX
excluded. Not comparable across meshes.`
  - Mounted on the Overview tab immediately beneath Payload Type Mix.

## Tests

`TestRelayAirtimeShare_ADVERTvsACKDivergence` — the locked acceptance
scenario:

- 1 ADVERT (200 B, 8 distinct relays) → score 1600, airtime 100%
- 1000 ACKs (10 B, 0 relays each)     → score 0,    airtime 0%
- Count distribution is the inverse (ACK 99.9%, ADVERT 0.1%).
- Sort assertion: ADVERT is rows[0] by airtime_pct desc.

Full suite: `go test -short ./cmd/server/...` → PASS (25.9s).

## Acceptance criteria

- [x] In-memory `airtime_usage_score` accumulator in analytics path
- [x] `distinctRelayCount(tx)` helper unioning resolved-pubkey reverse
index across all observations of `transmission_id`
- [x] `/api/analytics/relay-airtime-share?window=…` endpoint
- [x] Cached via existing `rfCache` + `rfCacheTTL`; no new cache layer
- [x] Dumbbell chart on `/analytics` beneath Payload Type Mix;
gray=count, colored=airtime, shared axis, sorted desc by airtime
- [x] Title + subtitle exactly as specified
- [x] Tooltip with payload_type, count %, count N, airtime %, raw score,
caveat
- [x] Unit test demonstrates the ADVERT-vs-ACK divergence
- [x] No new SQL, no new index, no schema migration (verified via diff)
- [ ] Live staging bench (<5ms p99 uncached / <1ms cached) — deferred to
follow-up; cached behind 60s `rfCacheTTL` so steady-state cost is a map
lookup

## Preflight overrides

- Branch scope cross-stack: justified — backend endpoint and frontend
chart are a single deliverable per #1359 spec (one chart bound to one
endpoint, no incremental staging).

Fixes #1359

---------

Co-authored-by: bot <bot@local>
This commit is contained in:
Kpa-clawbot
2026-06-06 20:46:24 -07:00
committed by GitHub
parent a26a412c9b
commit 3898688d6d
4 changed files with 456 additions and 2 deletions
+187
View File
@@ -0,0 +1,187 @@
package main
import (
"sort"
"time"
)
// relay_airtime_share.go — issue #1359
//
// Implements the "Relay Airtime Share" analytics metric:
// score(packet) = payload_bytes × COUNT(DISTINCT repeater_pubkey
// across all observations of that packet)
//
// Aggregated by payload_type. Originator TX is deliberately excluded — a
// never-relayed direct message scores 0, which is the correct framing for a
// "relay amplification" metric.
//
// In-memory only; no SQL, no new index, no schema change. The resolved-pubkey
// reverse index (populated under s.mu via addToResolvedPubkeyIndex from every
// observation's resolved_path) is the source of distinct relays per
// transmission — len(resolvedPubkeyReverse[tx.ID]) IS the union of distinct
// repeater pubkeys, deduplicated cross-observation. Critical: this is NOT the
// length of any single observation's resolved_path (the bug-trap from
// #1358's follow-up SQL hint).
// distinctRelayCount returns the number of distinct repeater pubkeys that
// forwarded `tx`, unioned across ALL observations of that transmission_id.
//
// Source: the resolved-pubkey reverse index — populated by
// indexResolvedPathHops / addToResolvedPubkeyIndex from every observation's
// resolved_path. Each entry is one distinct pubkey hash for THIS tx (the
// indexer dedups (hash, txID) pairs before appending).
//
// Caller MUST hold s.mu at least RLock.
func (s *PacketStore) distinctRelayCount(tx *StoreTx) int {
if tx == nil || !s.useResolvedPathIndex {
return 0
}
return len(s.resolvedPubkeyReverse[tx.ID])
}
// computeRelayAirtimeShare aggregates relay-airtime-share per payload_type.
//
// Returns:
//
// {
// "rows": [{payload_type, type, count, count_pct, score, airtime_pct}, ...] sorted by airtime_pct desc,
// "total_count": int,
// "total_score": int,
// "window": window label,
// "cached": false (overwritten by cached wrapper),
// }
func (s *PacketStore) computeRelayAirtimeShare(window TimeWindow) map[string]interface{} {
s.mu.RLock()
defer s.mu.RUnlock()
ptNames := payloadTypeNames
type bucket struct {
count int
score int
}
buckets := make(map[int]*bucket)
seenHash := make(map[string]bool, len(s.packets))
totalCount := 0
totalScore := 0
for _, tx := range s.packets {
if tx == nil || tx.PayloadType == nil {
continue
}
if !window.Includes(tx.FirstSeen) {
continue
}
// Dedup per-hash: each distinct packet counted once. ACKs in the
// test fixture have unique hashes so this only collapses true
// re-observations of the same packet.
if tx.Hash != "" {
if seenHash[tx.Hash] {
continue
}
seenHash[tx.Hash] = true
}
pt := *tx.PayloadType
b := buckets[pt]
if b == nil {
b = &bucket{}
buckets[pt] = b
}
b.count++
totalCount++
// payload bytes from RawHex (2 hex chars per byte).
payloadBytes := len(tx.RawHex) / 2
relays := s.distinctRelayCount(tx)
score := payloadBytes * relays
b.score += score
totalScore += score
}
rows := make([]map[string]interface{}, 0, len(buckets))
for pt, b := range buckets {
name := ptNames[pt]
if name == "" {
name = "UNK"
}
var countPct, airtimePct float64
if totalCount > 0 {
countPct = float64(b.count) / float64(totalCount) * 100.0
}
if totalScore > 0 {
airtimePct = float64(b.score) / float64(totalScore) * 100.0
}
rows = append(rows, map[string]interface{}{
"payload_type": name,
"type": pt,
"count": b.count,
"count_pct": countPct,
"score": b.score,
"airtime_pct": airtimePct,
})
}
// Sort descending by airtime_pct; tiebreak count desc, then name asc
// for deterministic ordering.
sort.SliceStable(rows, func(i, j int) bool {
ai, _ := rows[i]["airtime_pct"].(float64)
aj, _ := rows[j]["airtime_pct"].(float64)
if ai != aj {
return ai > aj
}
ci, _ := rows[i]["count"].(int)
cj, _ := rows[j]["count"].(int)
if ci != cj {
return ci > cj
}
ni, _ := rows[i]["payload_type"].(string)
nj, _ := rows[j]["payload_type"].(string)
return ni < nj
})
label := ""
if !window.IsZero() {
label = window.Label
}
return map[string]interface{}{
"rows": rows,
"total_count": totalCount,
"total_score": totalScore,
"window": label,
"cached": false,
}
}
// GetRelayAirtimeShareWithWindow is the cached wrapper around
// computeRelayAirtimeShare. Reuses the existing rfCache + rfCacheTTL pool
// (shared with RF / topology / distance analytics — no new cache layer per
// #1359 spec).
func (s *PacketStore) GetRelayAirtimeShareWithWindow(window TimeWindow) map[string]interface{} {
cacheKey := "relay-airtime-share|"
if !window.IsZero() {
cacheKey += window.CacheKey()
}
s.cacheMu.Lock()
if cached, ok := s.rfCache[cacheKey]; ok && time.Now().Before(cached.expiresAt) {
s.cacheHits++
s.cacheMu.Unlock()
// Shallow copy with cached=true so the JSON client can tell.
m := cached.data
out := make(map[string]interface{}, len(m)+1)
for k, v := range m {
out[k] = v
}
out["cached"] = true
return out
}
s.cacheMisses++
s.cacheMu.Unlock()
result := s.computeRelayAirtimeShare(window)
s.cacheMu.Lock()
s.rfCache[cacheKey] = &cachedResult{data: result, expiresAt: time.Now().Add(s.rfCacheTTL)}
s.cacheMu.Unlock()
return result
}
+185
View File
@@ -0,0 +1,185 @@
package main
import (
"strings"
"testing"
)
// newRelayAirtimeShareTestStore builds a minimal PacketStore for testing
// computeRelayAirtimeShare without any DB or background workers.
func newRelayAirtimeShareTestStore(packets []*StoreTx) *PacketStore {
ps := &PacketStore{
packets: packets,
byHash: make(map[string]*StoreTx),
byTxID: make(map[int]*StoreTx),
byObsID: make(map[int]*StoreObs),
byObserver: make(map[string][]*StoreObs),
byNode: make(map[string][]*StoreTx),
byPathHop: make(map[string][]*StoreTx),
nodeHashes: make(map[string]map[string]bool),
byPayloadType: make(map[int][]*StoreTx),
rfCache: make(map[string]*cachedResult),
topoCache: make(map[string]*cachedResult),
hashCache: make(map[string]*cachedResult),
collisionCache: make(map[string]*cachedResult),
chanCache: make(map[string]*cachedResult),
distCache: make(map[string]*cachedResult),
subpathCache: make(map[string]*cachedResult),
spIndex: make(map[string]int),
spTxIndex: make(map[string][]*StoreTx),
advertPubkeys: make(map[string]int),
}
ps.useResolvedPathIndex = true
ps.initResolvedPathIndex()
for _, tx := range packets {
ps.byTxID[tx.ID] = tx
if tx.Hash != "" {
ps.byHash[tx.Hash] = tx
}
if tx.PayloadType != nil {
pt := *tx.PayloadType
ps.byPayloadType[pt] = append(ps.byPayloadType[pt], tx)
}
}
return ps
}
// makeRelayAirtimeTx builds a synthetic transmission with rawHex sized for the
// given byte count and registers `distinctRelays` synthetic resolved-path
// pubkeys via the resolved-pubkey reverse index — same source that
// distinctRelayCount must read from.
func makeRelayAirtimeTx(id int, payloadType int, payloadBytes int, distinctRelays int, hashPrefix string) *StoreTx {
pt := payloadType
tx := &StoreTx{
ID: id,
Hash: hashPrefix,
FirstSeen: "2026-01-01T00:00:00Z",
PayloadType: &pt,
RawHex: strings.Repeat("ab", payloadBytes), // 2 hex chars per byte
}
return tx
}
// TestRelayAirtimeShare_ADVERTvsACKDivergence is the locked acceptance test
// from issue #1359:
// - 1 ADVERT, 200 B, 8 distinct relays → score = 200 * 8 = 1600
// - 1000 ACKs, 10 B each, 0 relays → score = 0
//
// Count distribution: ACK 1000/1001 = 99.90%, ADVERT 0.10%.
// Airtime distribution: ADVERT 1600/1600 = 100%, ACK 0%.
//
// This is the headline divergence the dumbbell chart must visualize.
func TestRelayAirtimeShare_ADVERTvsACKDivergence(t *testing.T) {
packets := make([]*StoreTx, 0, 1001)
// 1 ADVERT with 200 bytes payload + 8 distinct relays
advert := makeRelayAirtimeTx(1, PayloadADVERT, 200, 8, "ad000001")
packets = append(packets, advert)
// 1000 ACKs with 10 bytes payload + 0 relays
for i := 0; i < 1000; i++ {
ack := makeRelayAirtimeTx(100+i, PayloadACK, 10, 0, "")
// Give each a unique hash so dedup doesn't collapse them.
ack.Hash = "ac" + zeroPad(i, 6)
packets = append(packets, ack)
}
store := newRelayAirtimeShareTestStore(packets)
// Wire up the 8 distinct relay pubkeys for the ADVERT through the
// resolved-pubkey reverse index — the helper distinctRelayCount must
// read from this source (union across all observations of tx.ID).
relayPks := []string{
"relay01", "relay02", "relay03", "relay04",
"relay05", "relay06", "relay07", "relay08",
}
store.addToResolvedPubkeyIndex(advert.ID, relayPks)
// Sanity check the helper directly.
if got := store.distinctRelayCount(advert); got != 8 {
t.Fatalf("distinctRelayCount(ADVERT) = %d, want 8", got)
}
if got := store.distinctRelayCount(packets[1]); got != 0 {
t.Fatalf("distinctRelayCount(ACK) = %d, want 0", got)
}
result := store.computeRelayAirtimeShare(TimeWindow{})
rows, ok := result["rows"].([]map[string]interface{})
if !ok {
t.Fatalf("result['rows'] missing or wrong type: %T", result["rows"])
}
if len(rows) < 2 {
t.Fatalf("expected at least 2 rows (ADVERT, ACK), got %d: %+v", len(rows), rows)
}
// Index by payload_type name.
byType := make(map[string]map[string]interface{})
for _, r := range rows {
name, _ := r["payload_type"].(string)
byType[name] = r
}
advertRow, hasAdvert := byType["ADVERT"]
ackRow, hasACK := byType["ACK"]
if !hasAdvert {
t.Fatalf("rows missing ADVERT bucket: %+v", rows)
}
if !hasACK {
t.Fatalf("rows missing ACK bucket: %+v", rows)
}
// Count percentages: ACK should be ~99.9%, ADVERT ~0.1%.
ackCountPct, _ := ackRow["count_pct"].(float64)
advertCountPct, _ := advertRow["count_pct"].(float64)
if !(ackCountPct > 99.0 && ackCountPct < 100.0) {
t.Errorf("ACK count_pct = %.4f, want ~99.9", ackCountPct)
}
if !(advertCountPct < 1.0 && advertCountPct > 0.0) {
t.Errorf("ADVERT count_pct = %.4f, want ~0.1", advertCountPct)
}
// Airtime percentages: ADVERT should be 100%, ACK 0%.
advertAirtimePct, _ := advertRow["airtime_pct"].(float64)
ackAirtimePct, _ := ackRow["airtime_pct"].(float64)
if advertAirtimePct < 99.5 || advertAirtimePct > 100.001 {
t.Errorf("ADVERT airtime_pct = %.4f, want 100.0", advertAirtimePct)
}
if ackAirtimePct != 0.0 {
t.Errorf("ACK airtime_pct = %.4f, want 0.0", ackAirtimePct)
}
// Raw score check: ADVERT = 200 * 8 = 1600.
advertScore, _ := advertRow["score"].(int)
if advertScore != 1600 {
t.Errorf("ADVERT score = %d, want 1600 (200B × 8 relays)", advertScore)
}
ackScore, _ := ackRow["score"].(int)
if ackScore != 0 {
t.Errorf("ACK score = %d, want 0 (no relays)", ackScore)
}
// Count integer check.
advertCount, _ := advertRow["count"].(int)
if advertCount != 1 {
t.Errorf("ADVERT count = %d, want 1", advertCount)
}
ackCount, _ := ackRow["count"].(int)
if ackCount != 1000 {
t.Errorf("ACK count = %d, want 1000", ackCount)
}
// The divergence: ADVERT should rank #1 by airtime even though its
// count share is the smallest. This is the whole point of the chart.
if rows[0]["payload_type"] != "ADVERT" {
t.Errorf("rows must be sorted by airtime_pct desc; rows[0] payload_type = %v, want ADVERT", rows[0]["payload_type"])
}
}
func zeroPad(n, width int) string {
s := ""
for i := 0; i < width; i++ {
s = string(rune('0'+(n%10))) + s
n /= 10
}
return s
}
+16
View File
@@ -250,6 +250,7 @@ func (s *Server) RegisterRoutes(r *mux.Router) {
r.HandleFunc("/api/analytics/subpaths-bulk", s.handleAnalyticsSubpathsBulk).Methods("GET")
r.HandleFunc("/api/analytics/subpath-detail", s.handleAnalyticsSubpathDetail).Methods("GET")
r.HandleFunc("/api/analytics/neighbor-graph", s.handleNeighborGraph).Methods("GET")
r.HandleFunc("/api/analytics/relay-airtime-share", s.handleAnalyticsRelayAirtimeShare).Methods("GET")
// Other endpoints
r.HandleFunc("/api/resolve-hops", s.handleResolveHops).Methods("GET")
@@ -1969,6 +1970,21 @@ func (s *Server) handleAnalyticsRF(w http.ResponseWriter, r *http.Request) {
})
}
func (s *Server) handleAnalyticsRelayAirtimeShare(w http.ResponseWriter, r *http.Request) {
window := ParseTimeWindow(r)
if s.store != nil {
writeJSON(w, s.store.GetRelayAirtimeShareWithWindow(window))
return
}
writeJSON(w, map[string]interface{}{
"rows": []map[string]interface{}{},
"total_count": 0,
"total_score": 0,
"window": "",
"cached": false,
})
}
func (s *Server) handleAnalyticsTopology(w http.ResponseWriter, r *http.Request) {
region := r.URL.Query().Get("region")
area := r.URL.Query().Get("area")
+68 -2
View File
@@ -256,14 +256,15 @@
// channels: region + window (no area per original PR intent)
const chanQS = (rqs + tws).slice(1);
const sepChan = chanQS ? '?' + chanQS : '';
const [hashData, rfData, topoData, chanData, collisionData] = await Promise.all([
const [hashData, rfData, topoData, chanData, collisionData, airtimeData] = await Promise.all([
api('/analytics/hash-sizes' + sepBase, { ttl: CLIENT_TTL.analyticsRF }),
api('/analytics/rf' + sepWin, { ttl: CLIENT_TTL.analyticsRF }),
api('/analytics/topology' + sepWin, { ttl: CLIENT_TTL.analyticsRF }),
api('/analytics/channels' + sepChan, { ttl: CLIENT_TTL.analyticsRF }),
api('/analytics/hash-collisions' + sepBase, { ttl: CLIENT_TTL.analyticsRF }),
api('/analytics/relay-airtime-share' + sepWin, { ttl: CLIENT_TTL.analyticsRF }).catch(() => ({ rows: [] })),
]);
_analyticsData = { hashData, rfData, topoData, chanData, collisionData };
_analyticsData = { hashData, rfData, topoData, chanData, collisionData, airtimeData };
renderTab(_currentTab);
} catch (e) {
document.getElementById('analyticsContent').innerHTML =
@@ -401,6 +402,14 @@
${barChart(topo.hopDistribution.map(h=>h.count), topo.hopDistribution.map(h=>h.hops), ['#3b82f6'])}
</div>
</div>
<div class="analytics-row">
<div class="analytics-card flex-1">
<h3>📡 Relay Airtime Share</h3>
<p class="text-muted" style="margin-top:-4px;font-size:12px">Score = payload bytes × distinct repeaters that forwarded the packet. Counts relay re-transmissions; originator TX excluded. Not comparable across meshes.</p>
${renderRelayAirtimeDumbbell(d.airtimeData)}
</div>
</div>
`;
// Affinity stats widget — fetch and append if debugAffinity enabled
@@ -451,6 +460,63 @@
return html + '</div>';
}
// === Relay Airtime Share — dumbbell chart (#1359) ===
// Two dots per payload_type row: gray = count %, colored = airtime %.
// Connector line between them = the divergence. Shared 0100% axis.
// Sorted desc by airtime (server-side) so count dots visibly scatter
// out of rank order — that visible disorder IS the headline.
function renderRelayAirtimeDumbbell(data) {
var rows = (data && Array.isArray(data.rows)) ? data.rows : [];
if (!rows.length) {
return '<div class="text-muted" style="padding:20px">No relay-airtime data in this window.</div>';
}
var totalScore = (data && typeof data.total_score === 'number') ? data.total_score : 0;
if (totalScore <= 0) {
return '<div class="text-muted" style="padding:20px">No relay activity observed in this window (all packets direct).</div>';
}
// Layout: per row → label | track 0..100% | values
var palette = ['#ef4444','#f59e0b','#22c55e','#3b82f6','#8b5cf6','#ec4899','#14b8a6','#64748b','#f97316','#06b6d4','#84cc16'];
var html = '<div class="dumbbell-chart" style="display:flex;flex-direction:column;gap:8px;padding:8px 4px">';
rows.forEach(function (r, i) {
var name = r.payload_type || 'UNK';
var cnt = Number(r.count || 0);
var cpct = Number(r.count_pct || 0);
var apct = Number(r.airtime_pct || 0);
var score = Number(r.score || 0);
var color = palette[i % palette.length];
var loPct = Math.min(cpct, apct);
var hiPct = Math.max(cpct, apct);
// Tooltip per row — payload_type, count %, count N, airtime %, raw score, caveat.
var tip =
name + '\n' +
'Count: ' + cnt.toLocaleString() + ' (' + cpct.toFixed(2) + '%)\n' +
'Airtime: ' + apct.toFixed(2) + '% (score ' + score.toLocaleString() + ')\n' +
'Score = bytes × distinct repeaters. Within-mesh only.';
html += '<div class="dumbbell-row" title="' + esc(tip) + '" style="display:grid;grid-template-columns:80px 1fr 180px;align-items:center;gap:10px;font-size:12px">' +
'<div class="dumbbell-label" style="font-weight:600;color:var(--text)">' + esc(name) + '</div>' +
'<div class="dumbbell-track" style="position:relative;height:18px;background:var(--bg-elev,rgba(127,127,127,0.12));border-radius:9px">' +
'<div class="dumbbell-connector" style="position:absolute;top:50%;left:' + loPct.toFixed(3) + '%;width:' + (hiPct - loPct).toFixed(3) + '%;height:2px;background:var(--text-muted,#888);transform:translateY(-50%);opacity:0.5"></div>' +
'<div class="dumbbell-dot dumbbell-dot-count" style="position:absolute;top:50%;left:' + cpct.toFixed(3) + '%;width:10px;height:10px;border-radius:50%;background:var(--text-muted,#888);transform:translate(-50%,-50%);border:1px solid var(--bg,#fff)" aria-label="count share"></div>' +
'<div class="dumbbell-dot dumbbell-dot-airtime" style="position:absolute;top:50%;left:' + apct.toFixed(3) + '%;width:12px;height:12px;border-radius:50%;background:' + color + ';transform:translate(-50%,-50%);border:1px solid var(--bg,#fff)" aria-label="airtime share"></div>' +
'</div>' +
'<div class="dumbbell-values" style="text-align:right;color:var(--text-muted);font-variant-numeric:tabular-nums">' +
'<span style="color:var(--text-muted)">cnt ' + cpct.toFixed(1) + '%</span>' +
' &nbsp;·&nbsp; ' +
'<span style="color:' + color + ';font-weight:600">air ' + apct.toFixed(1) + '%</span>' +
'</div>' +
'</div>';
});
// Axis legend.
html += '<div class="dumbbell-axis" style="display:grid;grid-template-columns:80px 1fr 180px;gap:10px;color:var(--text-muted);font-size:11px;margin-top:4px">' +
'<div></div>' +
'<div style="display:flex;justify-content:space-between"><span>0%</span><span>50%</span><span>100%</span></div>' +
'<div></div>' +
'</div>';
html += '<div class="dumbbell-legend" style="display:flex;gap:14px;font-size:11px;color:var(--text-muted);padding:4px 0 0 84px"><span><span style="display:inline-block;width:8px;height:8px;border-radius:50%;background:var(--text-muted,#888);vertical-align:middle"></span> count %</span><span><span style="display:inline-block;width:10px;height:10px;border-radius:50%;background:var(--accent,#3b82f6);vertical-align:middle"></span> airtime %</span></div>';
html += '</div>';
return html;
}
// ===================== RF / SIGNAL =====================
function renderRF(el, rf) {
const snrHist = histogram(rf.snrValues, 20, statusGreen());