From 3cd87d766ea9312d0670d4dc9e440e1e71cb67b3 Mon Sep 17 00:00:00 2001
From: Kpa-clawbot <259247574+Kpa-clawbot@users.noreply.github.com>
Date: Fri, 27 Mar 2026 21:53:21 -0700
Subject: [PATCH] feat: in-memory store.GetNodeAnalytics + _parsedPath in
 txToMap
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

#195 — /api/nodes/:pubkey/analytics was hitting SQL (packets_v view) for
all queries. Added store.GetNodeAnalytics(pubkey, days) that uses the
byNode[pubkey] index + text search through decoded_json, computing all
analytics (timeline, SNR trend, type breakdown, observer coverage, hop
distribution, peer interactions, uptime heatmap, computed stats) entirely
in-memory. Route handler now uses store path when available, falling back
to SQL only when store is nil.

#196 — recentPackets from /api/nodes/:pubkey/health were missing the
_parsedPath field that Node.js includes (lazy-cached parsed path_json
array). Added _parsedPath to txToMap() output using txGetParsedPath(),
matching the Node.js packet shape.

fixes #195, fixes #196

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
 cmd/server/routes.go |  12 ++
 cmd/server/store.go  | 377 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 389 insertions(+)

diff --git a/cmd/server/routes.go b/cmd/server/routes.go
index 486d8c1e..88441074 100644
--- a/cmd/server/routes.go
+++ b/cmd/server/routes.go
@@ -1101,6 +1101,18 @@ func (s *Server) handleNodeAnalytics(w http.ResponseWriter, r *http.Request) {
 		days = 365
 	}
 
+	// Use in-memory store when available (fast path)
+	if s.store != nil {
+		result, err := s.store.GetNodeAnalytics(pubkey, days)
+		if err != nil || result == nil {
+			writeError(w, 404, "Not found")
+			return
+		}
+		writeJSON(w, result)
+		return
+	}
+
+	// Fallback: SQL path (no in-memory store)
 	node, err := s.db.GetNodeByPubkey(pubkey)
 	if err != nil || node == nil {
 		writeError(w, 404, "Not found")
diff --git a/cmd/server/store.go b/cmd/server/store.go
index 8de51fec..af714b23 100644
--- a/cmd/server/store.go
+++ b/cmd/server/store.go
@@ -1339,6 +1339,12 @@ func txToMap(tx *StoreTx) map[string]interface{} {
 		"path_json": strOrNil(tx.PathJSON),
 		"direction": strOrNil(tx.Direction),
 	}
+	// Include parsed path array to match Node.js output shape
+	if hops := txGetParsedPath(tx); len(hops) > 0 {
+		m["_parsedPath"] = hops
+	} else {
+		m["_parsedPath"] = nil
+	}
 	// Include observations for expand=observations support (stripped by handler when not requested)
 	obs := make([]map[string]interface{}, 0, len(tx.Observations))
 	for _, o := range tx.Observations {
@@ -3834,6 +3840,377 @@ func (s *PacketStore) GetNodeHealth(pubkey string) (map[string]interface{}, erro
 	}, nil
 }
 
+// GetNodeAnalytics computes analytics for a single node using in-memory byNode index.
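+// It gathers candidate packets from the byNode[pubkey] index plus, when the
+// node has a name, a substring search of decoded_json for the name or pubkey
+// (mirroring the Node.js findPacketsForNode), then derives the activity
+// timeline, SNR trend, packet type breakdown, observer coverage, hop
+// distribution, peer interactions, uptime heatmap, and computed stats
+// without touching SQL.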
+func (s *PacketStore) GetNodeAnalytics(pubkey string, days int) (*NodeAnalyticsResponse, error) {
+	node, err := s.db.GetNodeByPubkey(pubkey)
+	if err != nil || node == nil {
+		return nil, err
+	}
+
+	name := ""
+	if n, ok := node["name"]; ok && n != nil {
+		name = fmt.Sprintf("%v", n)
+	}
+
+	fromTime := time.Now().Add(-time.Duration(days) * 24 * time.Hour)
+	fromISO := fromTime.Format(time.RFC3339)
+	toISO := time.Now().Format(time.RFC3339)
+
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	// Collect packets from byNode index + text search (matches Node.js findPacketsForNode)
+	indexed := s.byNode[pubkey]
+	hashSet := make(map[string]bool, len(indexed))
+	for _, tx := range indexed {
+		hashSet[tx.Hash] = true
+	}
+	var allPkts []*StoreTx
+	if name != "" {
+		for _, tx := range s.packets {
+			if hashSet[tx.Hash] {
+				allPkts = append(allPkts, tx)
+			} else if tx.DecodedJSON != "" && (strings.Contains(tx.DecodedJSON, name) || strings.Contains(tx.DecodedJSON, pubkey)) {
+				allPkts = append(allPkts, tx)
+			}
+		}
+	} else {
+		allPkts = indexed
+	}
+
+	// Filter by time range
+	var packets []*StoreTx
+	for _, p := range allPkts {
+		if p.FirstSeen > fromISO {
+			packets = append(packets, p)
+		}
+	}
+
+	// Activity timeline (hourly buckets)
+	timelineBuckets := map[string]int{}
+	for _, p := range packets {
+		if len(p.FirstSeen) >= 13 {
+			bucket := p.FirstSeen[:13] + ":00:00Z"
+			timelineBuckets[bucket]++
+		}
+	}
+	bucketKeys := make([]string, 0, len(timelineBuckets))
+	for k := range timelineBuckets {
+		bucketKeys = append(bucketKeys, k)
+	}
+	sort.Strings(bucketKeys)
+	activityTimeline := make([]TimeBucket, 0, len(bucketKeys))
+	for _, k := range bucketKeys {
+		b := k
+		activityTimeline = append(activityTimeline, TimeBucket{Bucket: &b, Count: timelineBuckets[k]})
+	}
+
+	// SNR trend
+	snrTrend := make([]SnrTrendEntry, 0)
+	for _, p := range packets {
+		if p.SNR != nil {
+			snrTrend = append(snrTrend, SnrTrendEntry{
+				Timestamp:    p.FirstSeen,
+				SNR:          floatPtrOrNil(p.SNR),
+				RSSI:         floatPtrOrNil(p.RSSI),
+				ObserverID:   strOrNil(p.ObserverID),
+				ObserverName: strOrNil(p.ObserverName),
+			})
+		}
+	}
+
+	// Packet type breakdown
+	typeBuckets := map[int]int{}
+	for _, p := range packets {
+		if p.PayloadType != nil {
+			typeBuckets[*p.PayloadType]++
+		}
+	}
+	packetTypeBreakdown := make([]PayloadTypeCount, 0, len(typeBuckets))
+	for pt, cnt := range typeBuckets {
+		packetTypeBreakdown = append(packetTypeBreakdown, PayloadTypeCount{PayloadType: pt, Count: cnt})
+	}
+
+	// Observer coverage
+	type obsAccum struct {
+		name                       string
+		snrSum, rssiSum            float64
+		snrCount, rssiCount, count int
+		first, last                string
+	}
+	obsMap := map[string]*obsAccum{}
+	for _, p := range packets {
+		if p.ObserverID == "" {
+			continue
+		}
+		o := obsMap[p.ObserverID]
+		if o == nil {
+			o = &obsAccum{name: p.ObserverName, first: p.FirstSeen, last: p.FirstSeen}
+			obsMap[p.ObserverID] = o
+		}
+		o.count++
+		if p.SNR != nil {
+			o.snrSum += *p.SNR
+			o.snrCount++
+		}
+		if p.RSSI != nil {
+			o.rssiSum += *p.RSSI
+			o.rssiCount++
+		}
+		if p.FirstSeen < o.first {
+			o.first = p.FirstSeen
+		}
+		if p.FirstSeen > o.last {
+			o.last = p.FirstSeen
+		}
+	}
+	observerCoverage := make([]NodeObserverStatsResp, 0, len(obsMap))
+	for id, o := range obsMap {
+		var avgSnr, avgRssi interface{}
+		if o.snrCount > 0 {
+			avgSnr = o.snrSum / float64(o.snrCount)
+		}
+		if o.rssiCount > 0 {
+			avgRssi = o.rssiSum / float64(o.rssiCount)
+		}
+		observerCoverage = append(observerCoverage, NodeObserverStatsResp{
+			ObserverID:   id,
+			ObserverName: o.name,
+			PacketCount:  o.count,
+			AvgSnr:       avgSnr,
+			AvgRssi:      avgRssi,
+			FirstSeen:    o.first,
+			LastSeen:     o.last,
+		})
+	}
+	sort.Slice(observerCoverage, func(i, j int) bool {
+		return observerCoverage[i].PacketCount > observerCoverage[j].PacketCount
+	})
+
+	// Hop distribution
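+	// Paths of four or more hops collapse into the "4+" bucket; packets with
+	// no parsed path count as "0". A packet is treated as relayed when its
+	// parsed path has more than one hop.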
+	hopCounts := map[string]int{}
+	totalWithPath := 0
+	relayedCount := 0
+	for _, p := range packets {
+		hops := txGetParsedPath(p)
+		if len(hops) > 0 {
+			key := fmt.Sprintf("%d", len(hops))
+			if len(hops) >= 4 {
+				key = "4+"
+			}
+			hopCounts[key]++
+			totalWithPath++
+			if len(hops) > 1 {
+				relayedCount++
+			}
+		} else {
+			hopCounts["0"]++
+		}
+	}
+	hopDistribution := make([]HopDistEntry, 0)
+	for _, h := range []string{"0", "1", "2", "3", "4+"} {
+		if c, ok := hopCounts[h]; ok {
+			hopDistribution = append(hopDistribution, HopDistEntry{Hops: h, Count: c})
+		}
+	}
+
+	// Peer interactions
+	type peerAccum struct {
+		key, name   string
+		count       int
+		lastContact string
+	}
+	peerMap := map[string]*peerAccum{}
+	for _, p := range packets {
+		if p.DecodedJSON == "" {
+			continue
+		}
+		var decoded map[string]interface{}
+		if json.Unmarshal([]byte(p.DecodedJSON), &decoded) != nil {
+			continue
+		}
+		type candidate struct{ key, name string }
+		var candidates []candidate
+		if sk, ok := decoded["sender_key"].(string); ok && sk != "" && sk != pubkey {
+			sn, _ := decoded["sender_name"].(string)
+			if sn == "" {
+				sn, _ = decoded["sender_short_name"].(string)
+			}
+			candidates = append(candidates, candidate{sk, sn})
+		}
+		if rk, ok := decoded["recipient_key"].(string); ok && rk != "" && rk != pubkey {
+			rn, _ := decoded["recipient_name"].(string)
+			if rn == "" {
+				rn, _ = decoded["recipient_short_name"].(string)
+			}
+			candidates = append(candidates, candidate{rk, rn})
+		}
+		if pk, ok := decoded["pubkey"].(string); ok && pk != "" && pk != pubkey {
+			nm, _ := decoded["name"].(string)
+			candidates = append(candidates, candidate{pk, nm})
+		}
+		for _, c := range candidates {
+			if c.key == "" {
+				continue
+			}
+			pm := peerMap[c.key]
+			if pm == nil {
+				pn := c.name
+				if pn == "" && len(c.key) >= 12 {
+					pn = c.key[:12]
+				}
+				pm = &peerAccum{key: c.key, name: pn, lastContact: p.FirstSeen}
+				peerMap[c.key] = pm
+			}
+			pm.count++
+			if p.FirstSeen > pm.lastContact {
+				pm.lastContact = p.FirstSeen
+			}
+		}
+	}
+	peerSlice := make([]PeerInteraction, 0, len(peerMap))
+	for _, pm := range peerMap {
+		peerSlice = append(peerSlice, PeerInteraction{
+			PeerKey: pm.key, PeerName: pm.name,
+			MessageCount: pm.count, LastContact: pm.lastContact,
+		})
+	}
+	sort.Slice(peerSlice, func(i, j int) bool {
+		return peerSlice[i].MessageCount > peerSlice[j].MessageCount
+	})
+	if len(peerSlice) > 20 {
+		peerSlice = peerSlice[:20]
+	}
+
+	// Uptime heatmap
+	heatBuckets := map[string]*HeatmapCell{}
+	for _, p := range packets {
+		t, err := time.Parse(time.RFC3339, p.FirstSeen)
+		if err != nil {
+			t, err = time.Parse("2006-01-02 15:04:05", p.FirstSeen)
+			if err != nil {
+				continue
+			}
+		}
+		dow := int(t.UTC().Weekday())
+		hr := t.UTC().Hour()
+		k := fmt.Sprintf("%d:%d", dow, hr)
+		if heatBuckets[k] == nil {
+			heatBuckets[k] = &HeatmapCell{DayOfWeek: dow, Hour: hr}
+		}
+		heatBuckets[k].Count++
+	}
+	uptimeHeatmap := make([]HeatmapCell, 0, len(heatBuckets))
+	for _, cell := range heatBuckets {
+		uptimeHeatmap = append(uptimeHeatmap, *cell)
+	}
+
+	// Computed stats
+	totalPackets := len(packets)
+	distinctHours := len(activityTimeline)
+	totalHours := float64(days) * 24
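+	// Availability is the share of hourly buckets in the window with at least
+	// one packet; the raw ratio can edge past 100% (a window starting mid-hour
+	// spans days*24+1 clock hours), hence the clamp below.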
+	availabilityPct := 0.0
+	if totalHours > 0 {
+		availabilityPct = round(float64(distinctHours)*100.0/totalHours, 1)
+		if availabilityPct > 100 {
+			availabilityPct = 100
+		}
+	}
+
+	var avgPacketsPerDay float64
+	if days > 0 {
+		avgPacketsPerDay = round(float64(totalPackets)/float64(days), 1)
+	}
+
+	// Longest silence
+	var longestSilenceMs int
+	var longestSilenceStart interface{}
+	if len(activityTimeline) >= 2 {
+		for i := 1; i < len(activityTimeline); i++ {
+			var t1Str, t2Str string
+			if activityTimeline[i-1].Bucket != nil {
+				t1Str = *activityTimeline[i-1].Bucket
+			}
+			if activityTimeline[i].Bucket != nil {
+				t2Str = *activityTimeline[i].Bucket
+			}
+			t1, e1 := time.Parse(time.RFC3339, t1Str)
+			t2, e2 := time.Parse(time.RFC3339, t2Str)
+			if e1 == nil && e2 == nil {
+				gap := int(t2.Sub(t1).Milliseconds())
+				if gap > longestSilenceMs {
+					longestSilenceMs = gap
+					longestSilenceStart = t1Str
+				}
+			}
+		}
+	}
+
+	// Signal grade & SNR stats
+	var snrMean, snrStdDev float64
+	if len(snrTrend) > 0 {
+		var sum float64
+		for _, e := range snrTrend {
+			if v, ok := e.SNR.(float64); ok {
+				sum += v
+			}
+		}
+		snrMean = sum / float64(len(snrTrend))
+		if len(snrTrend) > 1 {
+			var sqSum float64
+			for _, e := range snrTrend {
+				if v, ok := e.SNR.(float64); ok {
+					sqSum += (v - snrMean) * (v - snrMean)
+				}
+			}
+			snrStdDev = math.Sqrt(sqSum / float64(len(snrTrend)))
+		}
+	}
+
+	signalGrade := "D"
+	if snrMean > 15 && snrStdDev < 2 {
+		signalGrade = "A"
+	} else if snrMean > 15 {
+		signalGrade = "A-"
+	} else if snrMean > 12 && snrStdDev < 3 {
+		signalGrade = "B+"
+	} else if snrMean > 8 {
+		signalGrade = "B"
+	} else if snrMean > 3 {
+		signalGrade = "C"
+	}
+
+	var relayPct float64
+	if totalWithPath > 0 {
+		relayPct = round(float64(relayedCount)*100.0/float64(totalWithPath), 1)
+	}
+
+	return &NodeAnalyticsResponse{
+		Node:                node,
+		TimeRange:           TimeRangeResp{From: fromISO, To: toISO, Days: days},
+		ActivityTimeline:    activityTimeline,
+		SnrTrend:            snrTrend,
+		PacketTypeBreakdown: packetTypeBreakdown,
+		ObserverCoverage:    observerCoverage,
+		HopDistribution:     hopDistribution,
+		PeerInteractions:    peerSlice,
+		UptimeHeatmap:       uptimeHeatmap,
+		ComputedStats: ComputedNodeStats{
+			AvailabilityPct:     availabilityPct,
+			LongestSilenceMs:    longestSilenceMs,
+			LongestSilenceStart: longestSilenceStart,
+			SignalGrade:         signalGrade,
+			SnrMean:             round(snrMean, 1),
+			SnrStdDev:           round(snrStdDev, 1),
+			RelayPct:            relayPct,
+			TotalPackets:        totalPackets,
+			UniqueObservers:     len(observerCoverage),
+			UniquePeers:         len(peerSlice),
+			AvgPacketsPerDay:    avgPacketsPerDay,
+		},
+	}, nil
+}
+
 func (s *PacketStore) GetAnalyticsSubpaths(region string, minLen, maxLen, limit int) map[string]interface{} {
 	cacheKey := fmt.Sprintf("%s|%d|%d|%d", region, minLen, maxLen, limit)