mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-04-15 06:45:50 +00:00
## RF Health Dashboard — M1: Observer Metrics Storage, API & Small Multiples Grid Implements M1 of #600. ### What this does Adds a complete RF health monitoring pipeline: MQTT stats ingestion → SQLite storage → REST API → interactive dashboard with small multiples grid. ### Backend Changes **Ingestor (`cmd/ingestor/`)** - New `observer_metrics` table via migration system (`_migrations` pattern) - Parse `tx_air_secs`, `rx_air_secs`, `recv_errors` from MQTT status messages (same pattern as existing `noise_floor` and `battery_mv`) - `INSERT OR REPLACE` with timestamps rounded to nearest 5-min interval boundary (using ingestor wall clock, not observer timestamps) - Missing fields stored as NULLs — partial data is always better than no data - Configurable retention pruning: `retention.metricsDays` (default 30), runs on startup + every 24h **Server (`cmd/server/`)** - `GET /api/observers/{id}/metrics?since=...&until=...` — per-observer time-series data - `GET /api/observers/metrics/summary?window=24h` — fleet summary with current NF, avg/max NF, sample count - `parseWindowDuration()` supports `1h`, `24h`, `3d`, `7d`, `30d` etc. 
- Server-side metrics retention pruning (same config, staggered 2min after packet prune) ### Frontend Changes **RF Health tab (`public/analytics.js`, `public/style.css`)** - Small multiples grid showing all observers simultaneously — anomalies pop out visually - Per-observer cell: name, current NF value, battery voltage, sparkline, avg/max stats - NF status coloring: warning (amber) at ≥-100 dBm, critical (red) at ≥-85 dBm — text color only, no background fills - Click any cell → expanded detail view with full noise floor line chart - Reference lines with direct text labels (`-100 warning`, `-85 critical`) — not color bands - Min/max points labeled directly on the chart - Time range selector: preset buttons (1h/3h/6h/12h/24h/3d/7d/30d) + custom from/to datetime picker - Deep linking: `#/analytics?tab=rf-health&observer=...&range=...` - All charts use SVG, matching existing analytics.js patterns - Responsive: 3-4 columns on desktop, 1 on mobile ### Design Decisions (from spec) - Labels directly on data, not in legends - Reference lines with text labels, not color bands - Small multiples grid, not card+accordion (Tufte: instant visual fleet comparison) - Ingestor wall clock for all timestamps (observer clocks may drift) ### Tests Added **Ingestor tests:** - `TestRoundToInterval` — 5 cases for rounding to 5-min boundaries - `TestInsertMetrics` — basic insertion with all fields - `TestInsertMetricsIdempotent` — INSERT OR REPLACE deduplication - `TestInsertMetricsNullFields` — partial data with NULLs - `TestPruneOldMetrics` — retention pruning - `TestExtractObserverMetaNewFields` — parsing tx_air_secs, rx_air_secs, recv_errors **Server tests:** - `TestGetObserverMetrics` — time-series query with since/until filters, NULL handling - `TestGetMetricsSummary` — fleet summary aggregation - `TestObserverMetricsAPIEndpoints` — DB query verification - `TestMetricsAPIEndpoints` — HTTP endpoint response shape - `TestParseWindowDuration` — duration parsing for h/d formats ### Test 
Results ``` cd cmd/ingestor && go test ./... → PASS (26s) cd cmd/server && go test ./... → PASS (5s) ``` ### What's NOT in this PR (deferred to M2+) - Server-side delta computation for cumulative counters - Airtime charts (TX/RX percentage lines) - Channel quality chart (recv_error_rate) - Battery voltage chart - Reboot detection and chart annotations - Resolution downsampling (1h, 1d aggregates) - Pattern detection / automated diagnosis --------- Co-authored-by: you <you@example.com>
372 lines
11 KiB
Go
372 lines
11 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
_ "net/http/pprof"
|
|
"os"
|
|
"os/exec"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/gorilla/mux"
|
|
)
|
|
|
|
// Build metadata, injected at build time via
// -ldflags "-X main.Version=... -X main.Commit=... -X main.BuildTime=...".
// All three default to the empty string when built without ldflags.
var (
	Version   string
	Commit    string
	BuildTime string
)
|
|
|
|
func resolveCommit() string {
|
|
if Commit != "" {
|
|
return Commit
|
|
}
|
|
// Try .git-commit file (baked by Docker / CI)
|
|
if data, err := os.ReadFile(".git-commit"); err == nil {
|
|
if c := strings.TrimSpace(string(data)); c != "" && c != "unknown" {
|
|
return c
|
|
}
|
|
}
|
|
// Try git rev-parse at runtime
|
|
if out, err := exec.Command("git", "rev-parse", "--short", "HEAD").Output(); err == nil {
|
|
return strings.TrimSpace(string(out))
|
|
}
|
|
return "unknown"
|
|
}
|
|
|
|
func resolveVersion() string {
|
|
if Version != "" {
|
|
return Version
|
|
}
|
|
return "unknown"
|
|
}
|
|
|
|
func resolveBuildTime() string {
|
|
if BuildTime != "" {
|
|
return BuildTime
|
|
}
|
|
return "unknown"
|
|
}
|
|
|
|
// main wires up the CoreScope server process: flag/config resolution, SQLite
// open + schema checks, in-memory packet store, persisted neighbor graph,
// WebSocket hub, HTTP routes + static/SPA serving, retention pruning loops,
// and a signal-driven graceful shutdown sequence.
func main() {
	// pprof profiling — off by default, enable with ENABLE_PPROF=true
	if os.Getenv("ENABLE_PPROF") == "true" {
		pprofPort := os.Getenv("PPROF_PORT")
		if pprofPort == "" {
			pprofPort = "6060"
		}
		go func() {
			log.Printf("[pprof] profiling UI at http://localhost:%s/debug/pprof/", pprofPort)
			// nil handler = DefaultServeMux, where net/http/pprof registered
			// its endpoints via the blank import's init.
			if err := http.ListenAndServe(":"+pprofPort, nil); err != nil {
				log.Printf("[pprof] failed to start: %v (non-fatal)", err)
			}
		}()
	}

	// Command-line flags; zero values mean "defer to config file".
	var (
		configDir string
		port      int
		dbPath    string
		publicDir string
		pollMs    int
	)

	flag.StringVar(&configDir, "config-dir", ".", "Directory containing config.json")
	flag.IntVar(&port, "port", 0, "HTTP port (overrides config)")
	flag.StringVar(&dbPath, "db", "", "SQLite database path (overrides config/env)")
	flag.StringVar(&publicDir, "public", "public", "Directory to serve static files from")
	flag.IntVar(&pollMs, "poll-ms", 1000, "SQLite poll interval for WebSocket broadcast (ms)")
	flag.Parse()

	// Load config — a load failure is non-fatal; defaults are used instead.
	cfg, err := LoadConfig(configDir)
	if err != nil {
		log.Printf("[config] warning: %v (using defaults)", err)
	}

	// CLI flags override config
	if port > 0 {
		cfg.Port = port
	}
	if cfg.Port == 0 {
		cfg.Port = 3000
	}
	if dbPath != "" {
		cfg.DBPath = dbPath
	}
	if cfg.APIKey == "" {
		log.Printf("[security] WARNING: no apiKey configured — write endpoints are BLOCKED (set apiKey in config.json to enable them)")
	}

	// Resolve DB path
	resolvedDB := cfg.ResolveDBPath(configDir)
	log.Printf("[config] port=%d db=%s public=%s", cfg.Port, resolvedDB, publicDir)

	// Open database
	database, err := OpenDB(resolvedDB)
	if err != nil {
		log.Fatalf("[db] failed to open %s: %v", resolvedDB, err)
	}
	// dbClose is an idempotent close shared between the function-exit defer
	// and the shutdown goroutine below; only the first caller gets the real
	// Close() error, later callers get nil.
	var dbCloseOnce sync.Once
	dbClose := func() error {
		var err error
		dbCloseOnce.Do(func() { err = database.Close() })
		return err
	}
	defer dbClose()

	// Verify DB has expected tables.
	// NOTE(review): only sql.ErrNoRows is handled here — any other QueryRow/
	// Scan error (e.g. a locked or corrupt file) falls through silently;
	// confirm that is intended.
	var tableName string
	err = database.conn.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='transmissions'").Scan(&tableName)
	if err == sql.ErrNoRows {
		log.Fatalf("[db] table 'transmissions' not found — is this a CoreScope database?")
	}

	// Log headline counts; stats failures are non-fatal.
	stats, err := database.GetStats()
	if err != nil {
		log.Printf("[db] warning: could not read stats: %v", err)
	} else {
		log.Printf("[db] transmissions=%d observations=%d nodes=%d observers=%d",
			stats.TotalTransmissions, stats.TotalObservations, stats.TotalNodes, stats.TotalObservers)
	}

	// In-memory packet store
	store := NewPacketStore(database, cfg.PacketStore)
	if err := store.Load(); err != nil {
		log.Fatalf("[store] failed to load: %v", err)
	}

	// Initialize persisted neighbor graph.
	// NOTE(review): the dbPath flag variable is reused here to hold the path
	// actually chosen by OpenDB; everything below uses this resolved value.
	dbPath = database.path
	if err := ensureNeighborEdgesTable(dbPath); err != nil {
		log.Printf("[neighbor] warning: could not create neighbor_edges table: %v", err)
	}
	// Add resolved_path column if missing.
	// NOTE on startup ordering (review item #10): ensureResolvedPathColumn runs AFTER
	// OpenDB/detectSchema, so db.hasResolvedPath will be false on first run with a
	// pre-existing DB. This means Load() won't SELECT resolved_path from SQLite.
	// That's OK: backfillResolvedPaths (below) computes and persists them in-memory
	// AND to SQLite. On next restart, detectSchema finds the column and Load() reads it.
	if err := ensureResolvedPathColumn(dbPath); err != nil {
		log.Printf("[store] warning: could not add resolved_path column: %v", err)
	} else {
		database.hasResolvedPath = true // detectSchema ran before column was added; fix the flag
	}

	// Load or build neighbor graph: prefer the persisted edges; otherwise
	// rebuild from the in-memory store and persist for next startup.
	if neighborEdgesTableExists(database.conn) {
		store.graph = loadNeighborEdgesFromDB(database.conn)
		log.Printf("[neighbor] loaded persisted neighbor graph")
	} else {
		log.Printf("[neighbor] no persisted edges found, building from store...")
		// Persisting needs a read-write handle; if it can't be opened the
		// graph is still built in memory below, just not saved.
		rw, rwErr := openRW(dbPath)
		if rwErr == nil {
			edgeCount := buildAndPersistEdges(store, rw)
			rw.Close()
			log.Printf("[neighbor] persisted %d edges", edgeCount)
		}
		store.graph = BuildFromStore(store)
	}

	// Backfill resolved_path for observations that don't have it yet
	if backfilled := backfillResolvedPaths(store, dbPath); backfilled > 0 {
		log.Printf("[store] backfilled resolved_path for %d observations", backfilled)
	}

	// Re-pick best observation now that resolved paths are populated
	store.mu.Lock()
	for _, tx := range store.packets {
		pickBestObservation(tx)
	}
	store.mu.Unlock()

	// WebSocket hub
	hub := NewHub()

	// HTTP server
	srv := NewServer(database, cfg, hub)
	srv.store = store
	router := mux.NewRouter()
	srv.RegisterRoutes(router)

	// WebSocket endpoint
	router.HandleFunc("/ws", hub.ServeWS)

	// Static files + SPA fallback; if the public dir is missing, serve a
	// minimal API-only landing page instead.
	absPublic, _ := filepath.Abs(publicDir)
	if _, err := os.Stat(absPublic); err == nil {
		fs := http.FileServer(http.Dir(absPublic))
		router.PathPrefix("/").Handler(wsOrStatic(hub, spaHandler(absPublic, fs)))
		log.Printf("[static] serving %s", absPublic)
	} else {
		log.Printf("[static] directory %s not found — API-only mode", absPublic)
		router.PathPrefix("/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.Header().Set("Content-Type", "text/html")
			w.Write([]byte(`<!DOCTYPE html><html><body><h1>CoreScope</h1><p>Frontend not found. API available at /api/</p></body></html>`))
		})
	}

	// Start SQLite poller for WebSocket broadcast
	poller := NewPoller(database, hub, time.Duration(pollMs)*time.Millisecond)
	poller.store = store
	go poller.Start()

	// Start periodic eviction
	stopEviction := store.StartEvictionTicker()
	defer stopEviction()

	// Auto-prune old packets if retention.packetDays is configured.
	// The goroutine prunes once shortly after startup, then daily; stopPrune
	// (called from the shutdown goroutine) stops the ticker and signals exit.
	var stopPrune func()
	if cfg.Retention != nil && cfg.Retention.PacketDays > 0 {
		days := cfg.Retention.PacketDays
		pruneTicker := time.NewTicker(24 * time.Hour)
		pruneDone := make(chan struct{})
		stopPrune = func() {
			pruneTicker.Stop()
			close(pruneDone)
		}
		go func() {
			// Small delay so pruning doesn't compete with startup I/O.
			time.Sleep(1 * time.Minute)
			if n, err := database.PruneOldPackets(days); err != nil {
				log.Printf("[prune] error: %v", err)
			} else {
				log.Printf("[prune] deleted %d transmissions older than %d days", n, days)
			}
			for {
				select {
				case <-pruneTicker.C:
					if n, err := database.PruneOldPackets(days); err != nil {
						log.Printf("[prune] error: %v", err)
					} else {
						log.Printf("[prune] deleted %d transmissions older than %d days", n, days)
					}
				case <-pruneDone:
					return
				}
			}
		}()
		log.Printf("[prune] auto-prune enabled: packets older than %d days will be removed daily", days)
	}

	// Auto-prune old observer metrics. Unlike packet pruning this always runs
	// (retention days come from cfg.MetricsRetentionDays()), and the first
	// pass is staggered 2 minutes after startup — 1 minute after the packet
	// prune — so the two never hit SQLite at the same time.
	var stopMetricsPrune func()
	{
		metricsDays := cfg.MetricsRetentionDays()
		metricsPruneTicker := time.NewTicker(24 * time.Hour)
		metricsPruneDone := make(chan struct{})
		stopMetricsPrune = func() {
			metricsPruneTicker.Stop()
			close(metricsPruneDone)
		}
		go func() {
			time.Sleep(2 * time.Minute) // stagger after packet prune
			database.PruneOldMetrics(metricsDays)
			for {
				select {
				case <-metricsPruneTicker.C:
					database.PruneOldMetrics(metricsDays)
				case <-metricsPruneDone:
					return
				}
			}
		}()
		log.Printf("[metrics-prune] auto-prune enabled: metrics older than %d days", metricsDays)
	}

	// Graceful shutdown
	httpServer := &http.Server{
		Addr:         fmt.Sprintf(":%d", cfg.Port),
		Handler:      router,
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 60 * time.Second,
		IdleTimeout:  120 * time.Second,
	}

	// Shutdown goroutine: on SIGINT/SIGTERM, tear down in dependency order —
	// stop producers first, then drain HTTP, then close the hub and DB.
	go func() {
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
		sig := <-sigCh
		log.Printf("[server] received %v, shutting down...", sig)

		// 1. Stop accepting new WebSocket/poll data
		poller.Stop()

		// 1b. Stop auto-prune ticker
		if stopPrune != nil {
			stopPrune()
		}
		if stopMetricsPrune != nil {
			stopMetricsPrune()
		}

		// 2. Gracefully drain HTTP connections (up to 15s)
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer cancel()
		if err := httpServer.Shutdown(ctx); err != nil {
			log.Printf("[server] HTTP shutdown error: %v", err)
		}

		// 3. Close WebSocket hub
		hub.Close()

		// 4. Close database (release SQLite WAL lock)
		if err := dbClose(); err != nil {
			log.Printf("[server] DB close error: %v", err)
		}
		log.Println("[server] shutdown complete")
	}()

	log.Printf("[server] CoreScope (Go) listening on http://localhost:%d", cfg.Port)
	// ErrServerClosed is the expected result of the Shutdown call above.
	// NOTE(review): comparison uses != rather than errors.Is — fine for this
	// sentinel today, but errors.Is would be the defensive form.
	if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
		log.Fatalf("[server] %v", err)
	}
}
|
|
|
|
// spaHandler serves static files, falling back to index.html for SPA routes.
|
|
// It reads index.html once at creation time and replaces the __BUST__ placeholder
|
|
// with a Unix timestamp so browsers fetch fresh JS/CSS after each server restart.
|
|
func spaHandler(root string, fs http.Handler) http.Handler {
|
|
// Pre-process index.html: replace __BUST__ with a cache-bust timestamp
|
|
indexPath := filepath.Join(root, "index.html")
|
|
rawHTML, err := os.ReadFile(indexPath)
|
|
if err != nil {
|
|
log.Printf("[static] warning: could not read index.html for cache-bust: %v", err)
|
|
rawHTML = []byte("<!DOCTYPE html><html><body><h1>CoreScope</h1><p>index.html not found</p></body></html>")
|
|
}
|
|
bustValue := fmt.Sprintf("%d", time.Now().Unix())
|
|
indexHTML := []byte(strings.ReplaceAll(string(rawHTML), "__BUST__", bustValue))
|
|
log.Printf("[static] cache-bust value: %s", bustValue)
|
|
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
// Serve pre-processed index.html for root and /index.html
|
|
if r.URL.Path == "/" || r.URL.Path == "/index.html" {
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
|
|
w.Write(indexHTML)
|
|
return
|
|
}
|
|
|
|
path := filepath.Join(root, r.URL.Path)
|
|
if _, err := os.Stat(path); os.IsNotExist(err) {
|
|
// SPA fallback — serve pre-processed index.html
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
|
|
w.Write(indexHTML)
|
|
return
|
|
}
|
|
// Disable caching for JS/CSS/HTML
|
|
if filepath.Ext(path) == ".js" || filepath.Ext(path) == ".css" || filepath.Ext(path) == ".html" {
|
|
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
|
|
}
|
|
fs.ServeHTTP(w, r)
|
|
})
|
|
}
|