diff --git a/cmd/server/relay_airtime_share.go b/cmd/server/relay_airtime_share.go index d29fc4a2..65b3c7b6 100644 --- a/cmd/server/relay_airtime_share.go +++ b/cmd/server/relay_airtime_share.go @@ -1,29 +1,187 @@ package main -// relay_airtime_share.go — issue #1359 stub (RED commit). -// Real implementation lands in the next commit. +import ( + "sort" + "time" +) + +// relay_airtime_share.go — issue #1359 +// +// Implements the "Relay Airtime Share" analytics metric: +// score(packet) = payload_bytes × COUNT(DISTINCT repeater_pubkey +// across all observations of that packet) +// +// Aggregated by payload_type. Originator TX is deliberately excluded — a +// never-relayed direct message scores 0, which is the correct framing for a +// "relay amplification" metric. +// +// In-memory only; no SQL, no new index, no schema change. The resolved-pubkey +// reverse index (populated under s.mu via addToResolvedPubkeyIndex from every +// observation's resolved_path) is the source of distinct relays per +// transmission — len(resolvedPubkeyReverse[tx.ID]) IS the union of distinct +// repeater pubkeys, deduplicated cross-observation. Critical: this is NOT the +// length of any single observation's resolved_path (the bug-trap from +// #1358's follow-up SQL hint). // distinctRelayCount returns the number of distinct repeater pubkeys that // forwarded `tx`, unioned across ALL observations of that transmission_id. -// Stub returns 0 so the test compiles and fails on the assertion. +// +// Source: the resolved-pubkey reverse index — populated by +// indexResolvedPathHops / addToResolvedPubkeyIndex from every observation's +// resolved_path. Each entry is one distinct pubkey hash for THIS tx (the +// indexer dedups (hash, txID) pairs before appending). +// +// Caller MUST hold s.mu at least RLock. func (s *PacketStore) distinctRelayCount(tx *StoreTx) int { - return 0 + if tx == nil || !s.useResolvedPathIndex { + return 0 + } + return len(s.resolvedPubkeyReverse[tx.ID]) } -// computeRelayAirtimeShare returns the relay-airtime-share aggregation. -// Stub returns an empty row set so the test compiles and fails on the -// "rows missing ADVERT bucket" assertion. +// computeRelayAirtimeShare aggregates relay-airtime-share per payload_type. +// +// Returns: +// +// { +// "rows": [{payload_type, type, count, count_pct, score, airtime_pct}, ...] sorted by airtime_pct desc, +// "total_count": int, +// "total_score": int, +// "window": window label, +// "cached": false (overwritten by cached wrapper), +// } func (s *PacketStore) computeRelayAirtimeShare(window TimeWindow) map[string]interface{} { + s.mu.RLock() + defer s.mu.RUnlock() + + ptNames := payloadTypeNames + + type bucket struct { + count int + score int + } + buckets := make(map[int]*bucket) + seenHash := make(map[string]bool, len(s.packets)) + totalCount := 0 + totalScore := 0 + + for _, tx := range s.packets { + if tx == nil || tx.PayloadType == nil { + continue + } + if !window.Includes(tx.FirstSeen) { + continue + } + // Dedup per-hash: each distinct packet counted once. ACKs in the + // test fixture have unique hashes so this only collapses true + // re-observations of the same packet. + if tx.Hash != "" { + if seenHash[tx.Hash] { + continue + } + seenHash[tx.Hash] = true + } + pt := *tx.PayloadType + b := buckets[pt] + if b == nil { + b = &bucket{} + buckets[pt] = b + } + b.count++ + totalCount++ + + // payload bytes from RawHex (2 hex chars per byte). + payloadBytes := len(tx.RawHex) / 2 + relays := s.distinctRelayCount(tx) + score := payloadBytes * relays + b.score += score + totalScore += score + } + + rows := make([]map[string]interface{}, 0, len(buckets)) + for pt, b := range buckets { + name := ptNames[pt] + if name == "" { + name = "UNK" + } + var countPct, airtimePct float64 + if totalCount > 0 { + countPct = float64(b.count) / float64(totalCount) * 100.0 + } + if totalScore > 0 { + airtimePct = float64(b.score) / float64(totalScore) * 100.0 + } + rows = append(rows, map[string]interface{}{ + "payload_type": name, + "type": pt, + "count": b.count, + "count_pct": countPct, + "score": b.score, + "airtime_pct": airtimePct, + }) + } + + // Sort descending by airtime_pct; tiebreak count desc, then name asc + // for deterministic ordering. + sort.SliceStable(rows, func(i, j int) bool { + ai, _ := rows[i]["airtime_pct"].(float64) + aj, _ := rows[j]["airtime_pct"].(float64) + if ai != aj { + return ai > aj + } + ci, _ := rows[i]["count"].(int) + cj, _ := rows[j]["count"].(int) + if ci != cj { + return ci > cj + } + ni, _ := rows[i]["payload_type"].(string) + nj, _ := rows[j]["payload_type"].(string) + return ni < nj + }) + + label := "" + if !window.IsZero() { + label = window.Label + } return map[string]interface{}{ - "rows": []map[string]interface{}{}, - "total_count": 0, - "total_score": 0, - "window": "", + "rows": rows, + "total_count": totalCount, + "total_score": totalScore, + "window": label, "cached": false, } } -// GetRelayAirtimeShareWithWindow is the cached wrapper. Stub passes through. +// GetRelayAirtimeShareWithWindow is the cached wrapper around +// computeRelayAirtimeShare. Reuses the existing rfCache + rfCacheTTL pool +// (shared with RF / topology / distance analytics — no new cache layer per +// #1359 spec). func (s *PacketStore) GetRelayAirtimeShareWithWindow(window TimeWindow) map[string]interface{} { - return s.computeRelayAirtimeShare(window) + cacheKey := "relay-airtime-share|" + if !window.IsZero() { + cacheKey += window.CacheKey() + } + s.cacheMu.Lock() + if cached, ok := s.rfCache[cacheKey]; ok && time.Now().Before(cached.expiresAt) { + s.cacheHits++ + s.cacheMu.Unlock() + // Shallow copy with cached=true so the JSON client can tell. + m := cached.data + out := make(map[string]interface{}, len(m)+1) + for k, v := range m { + out[k] = v + } + out["cached"] = true + return out + } + s.cacheMisses++ + s.cacheMu.Unlock() + + result := s.computeRelayAirtimeShare(window) + + s.cacheMu.Lock() + s.rfCache[cacheKey] = &cachedResult{data: result, expiresAt: time.Now().Add(s.rfCacheTTL)} + s.cacheMu.Unlock() + + return result } diff --git a/cmd/server/routes.go b/cmd/server/routes.go index bd9d7fc0..fb86d555 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -250,6 +250,7 @@ func (s *Server) RegisterRoutes(r *mux.Router) { r.HandleFunc("/api/analytics/subpaths-bulk", s.handleAnalyticsSubpathsBulk).Methods("GET") r.HandleFunc("/api/analytics/subpath-detail", s.handleAnalyticsSubpathDetail).Methods("GET") r.HandleFunc("/api/analytics/neighbor-graph", s.handleNeighborGraph).Methods("GET") + r.HandleFunc("/api/analytics/relay-airtime-share", s.handleAnalyticsRelayAirtimeShare).Methods("GET") // Other endpoints r.HandleFunc("/api/resolve-hops", s.handleResolveHops).Methods("GET") @@ -1969,6 +1970,21 @@ func (s *Server) handleAnalyticsRF(w http.ResponseWriter, r *http.Request) { }) } +func (s *Server) handleAnalyticsRelayAirtimeShare(w http.ResponseWriter, r *http.Request) { + window := ParseTimeWindow(r) + if s.store != nil { + writeJSON(w, s.store.GetRelayAirtimeShareWithWindow(window)) + return + } + writeJSON(w, map[string]interface{}{ + "rows": []map[string]interface{}{}, + "total_count": 0, + "total_score": 0, + "window": "", + "cached": false, + }) +} + func (s *Server) handleAnalyticsTopology(w http.ResponseWriter, r *http.Request) { region := r.URL.Query().Get("region") area := r.URL.Query().Get("area") diff --git a/public/analytics.js b/public/analytics.js index d08e486c..8390d0c5 100644 --- a/public/analytics.js +++ b/public/analytics.js @@ -256,14 +256,15 @@ // channels: region + window (no area per original PR intent) const chanQS = (rqs + tws).slice(1); const sepChan = chanQS ? '?' + chanQS : ''; - const [hashData, rfData, topoData, chanData, collisionData] = await Promise.all([ + const [hashData, rfData, topoData, chanData, collisionData, airtimeData] = await Promise.all([ api('/analytics/hash-sizes' + sepBase, { ttl: CLIENT_TTL.analyticsRF }), api('/analytics/rf' + sepWin, { ttl: CLIENT_TTL.analyticsRF }), api('/analytics/topology' + sepWin, { ttl: CLIENT_TTL.analyticsRF }), api('/analytics/channels' + sepChan, { ttl: CLIENT_TTL.analyticsRF }), api('/analytics/hash-collisions' + sepBase, { ttl: CLIENT_TTL.analyticsRF }), + api('/analytics/relay-airtime-share' + sepWin, { ttl: CLIENT_TTL.analyticsRF }).catch(() => ({ rows: [] })), ]); - _analyticsData = { hashData, rfData, topoData, chanData, collisionData }; + _analyticsData = { hashData, rfData, topoData, chanData, collisionData, airtimeData }; renderTab(_currentTab); } catch (e) { document.getElementById('analyticsContent').innerHTML = @@ -401,6 +402,14 @@ ${barChart(topo.hopDistribution.map(h=>h.count), topo.hopDistribution.map(h=>h.hops), ['#3b82f6'])} + +
+
+

📡 Relay Airtime Share

+

Score = payload bytes × distinct repeaters that forwarded the packet. Counts relay re-transmissions; originator TX excluded. Not comparable across meshes.

+ ${renderRelayAirtimeDumbbell(d.airtimeData)} +
+
`; // Affinity stats widget — fetch and append if debugAffinity enabled @@ -451,6 +460,63 @@ return html + ''; } + // === Relay Airtime Share — dumbbell chart (#1359) === + // Two dots per payload_type row: gray = count %, colored = airtime %. + // Connector line between them = the divergence. Shared 0–100% axis. + // Sorted desc by airtime (server-side) so count dots visibly scatter + // out of rank order — that visible disorder IS the headline. + function renderRelayAirtimeDumbbell(data) { + var rows = (data && Array.isArray(data.rows)) ? data.rows : []; + if (!rows.length) { + return '
No relay-airtime data in this window.
'; + } + var totalScore = (data && typeof data.total_score === 'number') ? data.total_score : 0; + if (totalScore <= 0) { + return '
No relay activity observed in this window (all packets direct).
'; + } + // Layout: per row → label | track 0..100% | values + var palette = ['#ef4444','#f59e0b','#22c55e','#3b82f6','#8b5cf6','#ec4899','#14b8a6','#64748b','#f97316','#06b6d4','#84cc16']; + var html = '
'; + rows.forEach(function (r, i) { + var name = r.payload_type || 'UNK'; + var cnt = Number(r.count || 0); + var cpct = Number(r.count_pct || 0); + var apct = Number(r.airtime_pct || 0); + var score = Number(r.score || 0); + var color = palette[i % palette.length]; + var loPct = Math.min(cpct, apct); + var hiPct = Math.max(cpct, apct); + // Tooltip per row — payload_type, count %, count N, airtime %, raw score, caveat. + var tip = + name + '\n' + + 'Count: ' + cnt.toLocaleString() + ' (' + cpct.toFixed(2) + '%)\n' + + 'Airtime: ' + apct.toFixed(2) + '% (score ' + score.toLocaleString() + ')\n' + + 'Score = bytes × distinct repeaters. Within-mesh only.'; + html += '
' + + '
' + esc(name) + '
' + + '
' + + '
' + + '
' + + '
' + + '
' + + '
' + + 'cnt ' + cpct.toFixed(1) + '%' + + '  ·  ' + + 'air ' + apct.toFixed(1) + '%' + + '
' + + '
'; + }); + // Axis legend. + html += '
' + + '
' + + '
0%50%100%
' + + '
' + + '
'; + html += '
count % airtime %
'; + html += '
'; + return html; + } + // ===================== RF / SIGNAL ===================== function renderRF(el, rf) { const snrHist = histogram(rf.snrValues, 20, statusGreen());