feat: in-memory store.GetNodeAnalytics + _parsedPath in txToMap

#195 — /api/nodes/:pubkey/analytics was hitting SQL (packets_v view)
for all queries. Added store.GetNodeAnalytics(pubkey, days) that uses
the byNode[pubkey] index + text search through decoded_json, computing
all analytics (timeline, SNR trend, type breakdown, observer coverage,
hop distribution, peer interactions, uptime heatmap, computed stats)
entirely in-memory. Route handler now uses store path when available,
falling back to SQL only when store is nil.

#196 — recentPackets from /api/nodes/:pubkey/health were missing the
_parsedPath field that Node.js includes (lazy-cached parsed path_json
array). Added _parsedPath to txToMap() output using txGetParsedPath(),
matching the Node.js packet shape.

fixes #195, fixes #196

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Kpa-clawbot
2026-03-27 21:53:21 -07:00
parent bcf7159538
commit 3cd87d766e
2 changed files with 389 additions and 0 deletions
+12
View File
@@ -1101,6 +1101,18 @@ func (s *Server) handleNodeAnalytics(w http.ResponseWriter, r *http.Request) {
days = 365
}
// Use in-memory store when available (fast path)
if s.store != nil {
result, err := s.store.GetNodeAnalytics(pubkey, days)
if err != nil || result == nil {
writeError(w, 404, "Not found")
return
}
writeJSON(w, result)
return
}
// Fallback: SQL path (no in-memory store)
node, err := s.db.GetNodeByPubkey(pubkey)
if err != nil || node == nil {
writeError(w, 404, "Not found")
+377
View File
@@ -1339,6 +1339,12 @@ func txToMap(tx *StoreTx) map[string]interface{} {
"path_json": strOrNil(tx.PathJSON),
"direction": strOrNil(tx.Direction),
}
// Include parsed path array to match Node.js output shape
if hops := txGetParsedPath(tx); len(hops) > 0 {
m["_parsedPath"] = hops
} else {
m["_parsedPath"] = nil
}
// Include observations for expand=observations support (stripped by handler when not requested)
obs := make([]map[string]interface{}, 0, len(tx.Observations))
for _, o := range tx.Observations {
@@ -3834,6 +3840,377 @@ func (s *PacketStore) GetNodeHealth(pubkey string) (map[string]interface{}, erro
}, nil
}
// GetNodeAnalytics computes analytics for a single node using in-memory byNode index.
func (s *PacketStore) GetNodeAnalytics(pubkey string, days int) (*NodeAnalyticsResponse, error) {
node, err := s.db.GetNodeByPubkey(pubkey)
if err != nil || node == nil {
return nil, err
}
name := ""
if n, ok := node["name"]; ok && n != nil {
name = fmt.Sprintf("%v", n)
}
fromTime := time.Now().Add(-time.Duration(days) * 24 * time.Hour)
fromISO := fromTime.Format(time.RFC3339)
toISO := time.Now().Format(time.RFC3339)
s.mu.RLock()
defer s.mu.RUnlock()
// Collect packets from byNode index + text search (matches Node.js findPacketsForNode)
indexed := s.byNode[pubkey]
hashSet := make(map[string]bool, len(indexed))
for _, tx := range indexed {
hashSet[tx.Hash] = true
}
var allPkts []*StoreTx
if name != "" {
for _, tx := range s.packets {
if hashSet[tx.Hash] {
allPkts = append(allPkts, tx)
} else if tx.DecodedJSON != "" && (strings.Contains(tx.DecodedJSON, name) || strings.Contains(tx.DecodedJSON, pubkey)) {
allPkts = append(allPkts, tx)
}
}
} else {
allPkts = indexed
}
// Filter by time range
var packets []*StoreTx
for _, p := range allPkts {
if p.FirstSeen > fromISO {
packets = append(packets, p)
}
}
// Activity timeline (hourly buckets)
timelineBuckets := map[string]int{}
for _, p := range packets {
if len(p.FirstSeen) >= 13 {
bucket := p.FirstSeen[:13] + ":00:00Z"
timelineBuckets[bucket]++
}
}
bucketKeys := make([]string, 0, len(timelineBuckets))
for k := range timelineBuckets {
bucketKeys = append(bucketKeys, k)
}
sort.Strings(bucketKeys)
activityTimeline := make([]TimeBucket, 0, len(bucketKeys))
for _, k := range bucketKeys {
b := k
activityTimeline = append(activityTimeline, TimeBucket{Bucket: &b, Count: timelineBuckets[k]})
}
// SNR trend
snrTrend := make([]SnrTrendEntry, 0)
for _, p := range packets {
if p.SNR != nil {
snrTrend = append(snrTrend, SnrTrendEntry{
Timestamp: p.FirstSeen,
SNR: floatPtrOrNil(p.SNR),
RSSI: floatPtrOrNil(p.RSSI),
ObserverID: strOrNil(p.ObserverID),
ObserverName: strOrNil(p.ObserverName),
})
}
}
// Packet type breakdown
typeBuckets := map[int]int{}
for _, p := range packets {
if p.PayloadType != nil {
typeBuckets[*p.PayloadType]++
}
}
packetTypeBreakdown := make([]PayloadTypeCount, 0, len(typeBuckets))
for pt, cnt := range typeBuckets {
packetTypeBreakdown = append(packetTypeBreakdown, PayloadTypeCount{PayloadType: pt, Count: cnt})
}
// Observer coverage
type obsAccum struct {
name string
snrSum, rssiSum float64
snrCount, rssiCount, count int
first, last string
}
obsMap := map[string]*obsAccum{}
for _, p := range packets {
if p.ObserverID == "" {
continue
}
o := obsMap[p.ObserverID]
if o == nil {
o = &obsAccum{name: p.ObserverName, first: p.FirstSeen, last: p.FirstSeen}
obsMap[p.ObserverID] = o
}
o.count++
if p.SNR != nil {
o.snrSum += *p.SNR
o.snrCount++
}
if p.RSSI != nil {
o.rssiSum += *p.RSSI
o.rssiCount++
}
if p.FirstSeen < o.first {
o.first = p.FirstSeen
}
if p.FirstSeen > o.last {
o.last = p.FirstSeen
}
}
observerCoverage := make([]NodeObserverStatsResp, 0, len(obsMap))
for id, o := range obsMap {
var avgSnr, avgRssi interface{}
if o.snrCount > 0 {
avgSnr = o.snrSum / float64(o.snrCount)
}
if o.rssiCount > 0 {
avgRssi = o.rssiSum / float64(o.rssiCount)
}
observerCoverage = append(observerCoverage, NodeObserverStatsResp{
ObserverID: id,
ObserverName: o.name,
PacketCount: o.count,
AvgSnr: avgSnr,
AvgRssi: avgRssi,
FirstSeen: o.first,
LastSeen: o.last,
})
}
sort.Slice(observerCoverage, func(i, j int) bool {
return observerCoverage[i].PacketCount > observerCoverage[j].PacketCount
})
// Hop distribution
hopCounts := map[string]int{}
totalWithPath := 0
relayedCount := 0
for _, p := range packets {
hops := txGetParsedPath(p)
if len(hops) > 0 {
key := fmt.Sprintf("%d", len(hops))
if len(hops) >= 4 {
key = "4+"
}
hopCounts[key]++
totalWithPath++
if len(hops) > 1 {
relayedCount++
}
} else {
hopCounts["0"]++
}
}
hopDistribution := make([]HopDistEntry, 0)
for _, h := range []string{"0", "1", "2", "3", "4+"} {
if c, ok := hopCounts[h]; ok {
hopDistribution = append(hopDistribution, HopDistEntry{Hops: h, Count: c})
}
}
// Peer interactions
type peerAccum struct {
key, name string
count int
lastContact string
}
peerMap := map[string]*peerAccum{}
for _, p := range packets {
if p.DecodedJSON == "" {
continue
}
var decoded map[string]interface{}
if json.Unmarshal([]byte(p.DecodedJSON), &decoded) != nil {
continue
}
type candidate struct{ key, name string }
var candidates []candidate
if sk, ok := decoded["sender_key"].(string); ok && sk != "" && sk != pubkey {
sn, _ := decoded["sender_name"].(string)
if sn == "" {
sn, _ = decoded["sender_short_name"].(string)
}
candidates = append(candidates, candidate{sk, sn})
}
if rk, ok := decoded["recipient_key"].(string); ok && rk != "" && rk != pubkey {
rn, _ := decoded["recipient_name"].(string)
if rn == "" {
rn, _ = decoded["recipient_short_name"].(string)
}
candidates = append(candidates, candidate{rk, rn})
}
if pk, ok := decoded["pubkey"].(string); ok && pk != "" && pk != pubkey {
nm, _ := decoded["name"].(string)
candidates = append(candidates, candidate{pk, nm})
}
for _, c := range candidates {
if c.key == "" {
continue
}
pm := peerMap[c.key]
if pm == nil {
pn := c.name
if pn == "" && len(c.key) >= 12 {
pn = c.key[:12]
}
pm = &peerAccum{key: c.key, name: pn, lastContact: p.FirstSeen}
peerMap[c.key] = pm
}
pm.count++
if p.FirstSeen > pm.lastContact {
pm.lastContact = p.FirstSeen
}
}
}
peerSlice := make([]PeerInteraction, 0, len(peerMap))
for _, pm := range peerMap {
peerSlice = append(peerSlice, PeerInteraction{
PeerKey: pm.key, PeerName: pm.name,
MessageCount: pm.count, LastContact: pm.lastContact,
})
}
sort.Slice(peerSlice, func(i, j int) bool {
return peerSlice[i].MessageCount > peerSlice[j].MessageCount
})
if len(peerSlice) > 20 {
peerSlice = peerSlice[:20]
}
// Uptime heatmap
heatBuckets := map[string]*HeatmapCell{}
for _, p := range packets {
t, err := time.Parse(time.RFC3339, p.FirstSeen)
if err != nil {
t, err = time.Parse("2006-01-02 15:04:05", p.FirstSeen)
if err != nil {
continue
}
}
dow := int(t.UTC().Weekday())
hr := t.UTC().Hour()
k := fmt.Sprintf("%d:%d", dow, hr)
if heatBuckets[k] == nil {
heatBuckets[k] = &HeatmapCell{DayOfWeek: dow, Hour: hr}
}
heatBuckets[k].Count++
}
uptimeHeatmap := make([]HeatmapCell, 0, len(heatBuckets))
for _, cell := range heatBuckets {
uptimeHeatmap = append(uptimeHeatmap, *cell)
}
// Computed stats
totalPackets := len(packets)
distinctHours := len(activityTimeline)
totalHours := float64(days) * 24
availabilityPct := 0.0
if totalHours > 0 {
availabilityPct = round(float64(distinctHours)*100.0/totalHours, 1)
if availabilityPct > 100 {
availabilityPct = 100
}
}
var avgPacketsPerDay float64
if days > 0 {
avgPacketsPerDay = round(float64(totalPackets)/float64(days), 1)
}
// Longest silence
var longestSilenceMs int
var longestSilenceStart interface{}
if len(activityTimeline) >= 2 {
for i := 1; i < len(activityTimeline); i++ {
var t1Str, t2Str string
if activityTimeline[i-1].Bucket != nil {
t1Str = *activityTimeline[i-1].Bucket
}
if activityTimeline[i].Bucket != nil {
t2Str = *activityTimeline[i].Bucket
}
t1, e1 := time.Parse(time.RFC3339, t1Str)
t2, e2 := time.Parse(time.RFC3339, t2Str)
if e1 == nil && e2 == nil {
gap := int(t2.Sub(t1).Milliseconds())
if gap > longestSilenceMs {
longestSilenceMs = gap
longestSilenceStart = t1Str
}
}
}
}
// Signal grade & SNR stats
var snrMean, snrStdDev float64
if len(snrTrend) > 0 {
var sum float64
for _, e := range snrTrend {
if v, ok := e.SNR.(float64); ok {
sum += v
}
}
snrMean = sum / float64(len(snrTrend))
if len(snrTrend) > 1 {
var sqSum float64
for _, e := range snrTrend {
if v, ok := e.SNR.(float64); ok {
sqSum += (v - snrMean) * (v - snrMean)
}
}
snrStdDev = math.Sqrt(sqSum / float64(len(snrTrend)))
}
}
signalGrade := "D"
if snrMean > 15 && snrStdDev < 2 {
signalGrade = "A"
} else if snrMean > 15 {
signalGrade = "A-"
} else if snrMean > 12 && snrStdDev < 3 {
signalGrade = "B+"
} else if snrMean > 8 {
signalGrade = "B"
} else if snrMean > 3 {
signalGrade = "C"
}
var relayPct float64
if totalWithPath > 0 {
relayPct = round(float64(relayedCount)*100.0/float64(totalWithPath), 1)
}
return &NodeAnalyticsResponse{
Node: node,
TimeRange: TimeRangeResp{From: fromISO, To: toISO, Days: days},
ActivityTimeline: activityTimeline,
SnrTrend: snrTrend,
PacketTypeBreakdown: packetTypeBreakdown,
ObserverCoverage: observerCoverage,
HopDistribution: hopDistribution,
PeerInteractions: peerSlice,
UptimeHeatmap: uptimeHeatmap,
ComputedStats: ComputedNodeStats{
AvailabilityPct: availabilityPct,
LongestSilenceMs: longestSilenceMs,
LongestSilenceStart: longestSilenceStart,
SignalGrade: signalGrade,
SnrMean: round(snrMean, 1),
SnrStdDev: round(snrStdDev, 1),
RelayPct: relayPct,
TotalPackets: totalPackets,
UniqueObservers: len(observerCoverage),
UniquePeers: len(peerSlice),
AvgPacketsPerDay: avgPacketsPerDay,
},
}, nil
}
func (s *PacketStore) GetAnalyticsSubpaths(region string, minLen, maxLen, limit int) map[string]interface{} {
cacheKey := fmt.Sprintf("%s|%d|%d|%d", region, minLen, maxLen, limit)