From 742ed86596438ee2db40bb2f0a0080bb1474c475 Mon Sep 17 00:00:00 2001 From: Kpa-clawbot <259247574+Kpa-clawbot@users.noreply.github.com> Date: Fri, 27 Mar 2026 01:16:59 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20add=20Go=20web=20server=20(cmd/server/)?= =?UTF-8?q?=20=E2=80=94=20full=20API=20+=20WebSocket=20+=20static=20files?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 35+ REST endpoints matching Node.js server, WebSocket broadcast, static file serving with SPA fallback, config.json support. Uses modernc.org/sqlite (pure Go, no CGO required). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- cmd/server/config.go | 160 ++++++ cmd/server/db.go | 1093 +++++++++++++++++++++++++++++++++++++ cmd/server/go.mod | 21 + cmd/server/go.sum | 47 ++ cmd/server/main.go | 144 +++++ cmd/server/routes.go | 1146 +++++++++++++++++++++++++++++++++++++++ cmd/server/websocket.go | 186 +++++++ 7 files changed, 2797 insertions(+) create mode 100644 cmd/server/config.go create mode 100644 cmd/server/db.go create mode 100644 cmd/server/go.mod create mode 100644 cmd/server/go.sum create mode 100644 cmd/server/main.go create mode 100644 cmd/server/routes.go create mode 100644 cmd/server/websocket.go diff --git a/cmd/server/config.go b/cmd/server/config.go new file mode 100644 index 00000000..3085e116 --- /dev/null +++ b/cmd/server/config.go @@ -0,0 +1,160 @@ +package main + +import ( + "encoding/json" + "os" + "path/filepath" +) + +// Config mirrors the Node.js config.json structure (read-only fields). +type Config struct { + Port int `json:"port"` + APIKey string `json:"apiKey"` + DBPath string `json:"dbPath"` + + Branding map[string]interface{} `json:"branding"` + Theme map[string]interface{} `json:"theme"` + ThemeDark map[string]interface{} `json:"themeDark"` + NodeColors map[string]interface{} `json:"nodeColors"` + TypeColors map[string]interface{} `json:"typeColors"` + Home map[string]interface{} `json:"home"` + + MapDefaults struct { + Center []float64 `json:"center"` + Zoom int `json:"zoom"` + } `json:"mapDefaults"` + + Regions map[string]string `json:"regions"` + + Roles map[string]interface{} `json:"roles"` + HealthThresholds *HealthThresholds `json:"healthThresholds"` + Tiles map[string]interface{} `json:"tiles"` + SnrThresholds map[string]interface{} `json:"snrThresholds"` + DistThresholds map[string]interface{} `json:"distThresholds"` + MaxHopDist *float64 `json:"maxHopDist"` + Limits map[string]interface{} `json:"limits"` + PerfSlowMs *int `json:"perfSlowMs"` + WsReconnectMs *int `json:"wsReconnectMs"` + CacheInvalidMs *int `json:"cacheInvalidateMs"` + ExternalUrls map[string]interface{} `json:"externalUrls"` + + LiveMap struct { + PropagationBufferMs int `json:"propagationBufferMs"` + } `json:"liveMap"` + + CacheTTL map[string]interface{} `json:"cacheTTL"` +} + +type HealthThresholds struct { + InfraDegradedMs int `json:"infraDegradedMs"` + InfraSilentMs int `json:"infraSilentMs"` + NodeDegradedMs int `json:"nodeDegradedMs"` + NodeSilentMs int `json:"nodeSilentMs"` +} + +// ThemeFile mirrors theme.json overlay. +type ThemeFile struct { + Branding map[string]interface{} `json:"branding"` + Theme map[string]interface{} `json:"theme"` + ThemeDark map[string]interface{} `json:"themeDark"` + NodeColors map[string]interface{} `json:"nodeColors"` + TypeColors map[string]interface{} `json:"typeColors"` + Home map[string]interface{} `json:"home"` +} + +func LoadConfig(baseDirs ...string) (*Config, error) { + if len(baseDirs) == 0 { + baseDirs = []string{"."} + } + paths := make([]string, 0) + for _, d := range baseDirs { + paths = append(paths, filepath.Join(d, "config.json")) + paths = append(paths, filepath.Join(d, "data", "config.json")) + } + + cfg := &Config{Port: 3000} + for _, p := range paths { + data, err := os.ReadFile(p) + if err != nil { + continue + } + if err := json.Unmarshal(data, cfg); err != nil { + continue + } + return cfg, nil + } + return cfg, nil // defaults +} + +func LoadTheme(baseDirs ...string) *ThemeFile { + if len(baseDirs) == 0 { + baseDirs = []string{"."} + } + for _, d := range baseDirs { + for _, name := range []string{"theme.json"} { + p := filepath.Join(d, name) + data, err := os.ReadFile(p) + if err != nil { + p = filepath.Join(d, "data", name) + data, err = os.ReadFile(p) + if err != nil { + continue + } + } + var t ThemeFile + if json.Unmarshal(data, &t) == nil { + return &t + } + } + } + return &ThemeFile{} +} + +func (c *Config) GetHealthThresholds() HealthThresholds { + h := HealthThresholds{ + InfraDegradedMs: 86400000, + InfraSilentMs: 259200000, + NodeDegradedMs: 3600000, + NodeSilentMs: 86400000, + } + if c.HealthThresholds != nil { + if c.HealthThresholds.InfraDegradedMs > 0 { + h.InfraDegradedMs = c.HealthThresholds.InfraDegradedMs + } + if c.HealthThresholds.InfraSilentMs > 0 { + h.InfraSilentMs = c.HealthThresholds.InfraSilentMs + } + if c.HealthThresholds.NodeDegradedMs > 0 { + h.NodeDegradedMs = c.HealthThresholds.NodeDegradedMs + } + if c.HealthThresholds.NodeSilentMs > 0 { + h.NodeSilentMs = c.HealthThresholds.NodeSilentMs + } + } + return h +} + +// GetHealthMs returns degraded/silent thresholds for a given role. +func (h HealthThresholds) GetHealthMs(role string) (degradedMs, silentMs int) { + if role == "repeater" || role == "room" { + return h.InfraDegradedMs, h.InfraSilentMs + } + return h.NodeDegradedMs, h.NodeSilentMs +} + +func (c *Config) ResolveDBPath(baseDir string) string { + if c.DBPath != "" { + return c.DBPath + } + if v := os.Getenv("DB_PATH"); v != "" { + return v + } + return filepath.Join(baseDir, "data", "meshcore.db") +} + +func (c *Config) PropagationBufferMs() int { + if c.LiveMap.PropagationBufferMs > 0 { + return c.LiveMap.PropagationBufferMs + } + return 5000 +} diff --git a/cmd/server/db.go b/cmd/server/db.go new file mode 100644 index 00000000..9fc1e6e7 --- /dev/null +++ b/cmd/server/db.go @@ -0,0 +1,1093 @@ +package main + +import ( + "database/sql" + "encoding/json" + "fmt" + "math" + "strings" + "time" + + _ "modernc.org/sqlite" +) + +// DB wraps a read-only connection to the MeshCore SQLite database. +type DB struct { + conn *sql.DB +} + +// OpenDB opens a read-only SQLite connection with WAL mode. +func OpenDB(path string) (*DB, error) { + dsn := fmt.Sprintf("file:%s?mode=ro&_journal_mode=WAL&_busy_timeout=5000", path) + conn, err := sql.Open("sqlite", dsn) + if err != nil { + return nil, err + } + conn.SetMaxOpenConns(4) + conn.SetMaxIdleConns(2) + if err := conn.Ping(); err != nil { + conn.Close() + return nil, fmt.Errorf("ping failed: %w", err) + } + return &DB{conn: conn}, nil +} + +func (db *DB) Close() error { + return db.conn.Close() +} + +// Node represents a row from the nodes table. +type Node struct { + PublicKey string `json:"public_key"` + Name *string `json:"name"` + Role *string `json:"role"` + Lat *float64 `json:"lat"` + Lon *float64 `json:"lon"` + LastSeen *string `json:"last_seen"` + FirstSeen *string `json:"first_seen"` + AdvertCount int `json:"advert_count"` +} + +// Observer represents a row from the observers table. +type Observer struct { + ID string `json:"id"` + Name *string `json:"name"` + IATA *string `json:"iata"` + LastSeen *string `json:"last_seen"` + FirstSeen *string `json:"first_seen"` + PacketCount int `json:"packet_count"` + Model *string `json:"model"` + Firmware *string `json:"firmware"` + ClientVersion *string `json:"client_version"` + Radio *string `json:"radio"` + BatteryMv *int `json:"battery_mv"` + UptimeSecs *int `json:"uptime_secs"` + NoiseFloor *int `json:"noise_floor"` +} + +// Transmission represents a row from the transmissions table. +type Transmission struct { + ID int `json:"id"` + RawHex *string `json:"raw_hex"` + Hash string `json:"hash"` + FirstSeen string `json:"first_seen"` + RouteType *int `json:"route_type"` + PayloadType *int `json:"payload_type"` + PayloadVersion *int `json:"payload_version"` + DecodedJSON *string `json:"decoded_json"` + CreatedAt *string `json:"created_at"` +} + +// Observation (from packets_v view). +type Observation struct { + ID int `json:"id"` + RawHex *string `json:"raw_hex"` + Timestamp *string `json:"timestamp"` + ObserverID *string `json:"observer_id"` + ObserverName *string `json:"observer_name"` + Direction *string `json:"direction"` + SNR *float64 `json:"snr"` + RSSI *float64 `json:"rssi"` + Score *int `json:"score"` + Hash *string `json:"hash"` + RouteType *int `json:"route_type"` + PayloadType *int `json:"payload_type"` + PayloadVer *int `json:"payload_version"` + PathJSON *string `json:"path_json"` + DecodedJSON *string `json:"decoded_json"` + CreatedAt *string `json:"created_at"` +} + +// Stats holds system statistics. +type Stats struct { + TotalPackets int `json:"totalPackets"` + TotalTransmissions int `json:"totalTransmissions"` + TotalObservations int `json:"totalObservations"` + TotalNodes int `json:"totalNodes"` + TotalObservers int `json:"totalObservers"` + PacketsLastHour int `json:"packetsLastHour"` +} + +// GetStats returns aggregate counts. +func (db *DB) GetStats() (*Stats, error) { + s := &Stats{} + err := db.conn.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&s.TotalTransmissions) + if err != nil { + return nil, err + } + s.TotalPackets = s.TotalTransmissions + + db.conn.QueryRow("SELECT COUNT(*) FROM observations").Scan(&s.TotalObservations) + db.conn.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&s.TotalNodes) + db.conn.QueryRow("SELECT COUNT(*) FROM observers").Scan(&s.TotalObservers) + + oneHourAgo := time.Now().Add(-1 * time.Hour).Unix() + db.conn.QueryRow("SELECT COUNT(*) FROM observations WHERE timestamp > ?", oneHourAgo).Scan(&s.PacketsLastHour) + + return s, nil +} + +// GetRoleCounts returns count per role. +func (db *DB) GetRoleCounts() map[string]int { + counts := map[string]int{} + for _, role := range []string{"repeater", "room", "companion", "sensor"} { + var c int + db.conn.QueryRow("SELECT COUNT(*) FROM nodes WHERE role = ?", role).Scan(&c) + counts[role+"s"] = c + } + return counts +} + +// PacketQuery holds filter params for packet listing. +type PacketQuery struct { + Limit int + Offset int + Type *int + Route *int + Observer string + Hash string + Since string + Until string + Region string + Node string + Order string // ASC or DESC +} + +// PacketResult wraps paginated packet list. +type PacketResult struct { + Packets []map[string]interface{} `json:"packets"` + Total int `json:"total"` +} + +// QueryPackets returns paginated, filtered packets from packets_v view. +func (db *DB) QueryPackets(q PacketQuery) (*PacketResult, error) { + if q.Limit <= 0 { + q.Limit = 50 + } + if q.Order == "" { + q.Order = "DESC" + } + + where, args := db.buildPacketWhere(q) + w := "" + if len(where) > 0 { + w = "WHERE " + strings.Join(where, " AND ") + } + + var total int + countSQL := fmt.Sprintf("SELECT COUNT(*) FROM packets_v %s", w) + if err := db.conn.QueryRow(countSQL, args...).Scan(&total); err != nil { + return nil, err + } + + querySQL := fmt.Sprintf("SELECT id, raw_hex, timestamp, observer_id, observer_name, direction, snr, rssi, score, hash, route_type, payload_type, payload_version, path_json, decoded_json, created_at FROM packets_v %s ORDER BY timestamp %s LIMIT ? OFFSET ?", w, q.Order) + args = append(args, q.Limit, q.Offset) + + rows, err := db.conn.Query(querySQL, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + packets := make([]map[string]interface{}, 0) + for rows.Next() { + p := scanPacketRow(rows) + if p != nil { + packets = append(packets, p) + } + } + + return &PacketResult{Packets: packets, Total: total}, nil +} + +// QueryGroupedPackets groups by hash (transmissions). +func (db *DB) QueryGroupedPackets(q PacketQuery) (*PacketResult, error) { + if q.Limit <= 0 { + q.Limit = 50 + } + where, args := db.buildPacketWhere(q) + w := "" + if len(where) > 0 { + w = "WHERE " + strings.Join(where, " AND ") + } + + qry := fmt.Sprintf(`SELECT hash, COUNT(*) as count, COUNT(DISTINCT observer_id) as observer_count, + MAX(timestamp) as latest, MIN(observer_id) as observer_id, MIN(observer_name) as observer_name, + MIN(path_json) as path_json, MIN(payload_type) as payload_type, MIN(route_type) as route_type, + MIN(raw_hex) as raw_hex, MIN(decoded_json) as decoded_json, MIN(snr) as snr, MIN(rssi) as rssi + FROM packets_v %s GROUP BY hash ORDER BY latest DESC LIMIT ? OFFSET ?`, w) + args = append(args, q.Limit, q.Offset) + + rows, err := db.conn.Query(qry, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + packets := make([]map[string]interface{}, 0) + for rows.Next() { + var hash, latest, observerID, observerName, pathJSON, rawHex, decodedJSON sql.NullString + var count, observerCount int + var payloadType, routeType sql.NullInt64 + var snr, rssi sql.NullFloat64 + if err := rows.Scan(&hash, &count, &observerCount, &latest, &observerID, &observerName, &pathJSON, &payloadType, &routeType, &rawHex, &decodedJSON, &snr, &rssi); err != nil { + continue + } + p := map[string]interface{}{ + "hash": nullStr(hash), + "count": count, + "observer_count": observerCount, + "observation_count": count, + "latest": nullStr(latest), + "first_seen": nullStr(latest), + "observer_id": nullStr(observerID), + "observer_name": nullStr(observerName), + "path_json": nullStr(pathJSON), + "payload_type": nullInt(payloadType), + "route_type": nullInt(routeType), + "raw_hex": nullStr(rawHex), + "decoded_json": nullStr(decodedJSON), + "snr": nullFloat(snr), + "rssi": nullFloat(rssi), + } + packets = append(packets, p) + } + + var total int + countSQL := fmt.Sprintf("SELECT COUNT(DISTINCT hash) FROM packets_v %s", w) + baseArgs := args[:len(args)-2] // remove LIMIT/OFFSET + db.conn.QueryRow(countSQL, baseArgs...).Scan(&total) + + return &PacketResult{Packets: packets, Total: total}, nil +} + +func (db *DB) buildPacketWhere(q PacketQuery) ([]string, []interface{}) { + var where []string + var args []interface{} + + if q.Type != nil { + where = append(where, "payload_type = ?") + args = append(args, *q.Type) + } + if q.Route != nil { + where = append(where, "route_type = ?") + args = append(args, *q.Route) + } + if q.Observer != "" { + where = append(where, "observer_id = ?") + args = append(args, q.Observer) + } + if q.Hash != "" { + where = append(where, "hash = ?") + args = append(args, strings.ToLower(q.Hash)) + } + if q.Since != "" { + where = append(where, "timestamp > ?") + args = append(args, q.Since) + } + if q.Until != "" { + where = append(where, "timestamp < ?") + args = append(args, q.Until) + } + if q.Region != "" { + where = append(where, "observer_id IN (SELECT id FROM observers WHERE iata = ?)") + args = append(args, q.Region) + } + if q.Node != "" { + pk := db.resolveNodePubkey(q.Node) + where = append(where, "decoded_json LIKE ?") + args = append(args, "%"+pk+"%") + } + return where, args +} + +func (db *DB) resolveNodePubkey(nodeIDOrName string) string { + var pk string + err := db.conn.QueryRow("SELECT public_key FROM nodes WHERE public_key = ? OR name = ? LIMIT 1", nodeIDOrName, nodeIDOrName).Scan(&pk) + if err != nil { + return nodeIDOrName + } + return pk +} + +// GetPacketByID fetches a single packet/observation. +func (db *DB) GetPacketByID(id int) (map[string]interface{}, error) { + rows, err := db.conn.Query("SELECT id, raw_hex, timestamp, observer_id, observer_name, direction, snr, rssi, score, hash, route_type, payload_type, payload_version, path_json, decoded_json, created_at FROM packets_v WHERE id = ?", id) + if err != nil { + return nil, err + } + defer rows.Close() + if rows.Next() { + return scanPacketRow(rows), nil + } + return nil, nil +} + +// GetTransmissionByID fetches from transmissions table. +func (db *DB) GetTransmissionByID(id int) (map[string]interface{}, error) { + var txID int + var rawHex, hash, firstSeen, decodedJSON, createdAt sql.NullString + var routeType, payloadType, payloadVersion sql.NullInt64 + err := db.conn.QueryRow("SELECT id, raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, created_at FROM transmissions WHERE id = ?", id). + Scan(&txID, &rawHex, &hash, &firstSeen, &routeType, &payloadType, &payloadVersion, &decodedJSON, &createdAt) + if err != nil { + return nil, err + } + return map[string]interface{}{ + "id": txID, + "raw_hex": nullStr(rawHex), + "hash": nullStr(hash), + "first_seen": nullStr(firstSeen), + "timestamp": nullStr(firstSeen), + "route_type": nullInt(routeType), + "payload_type": nullInt(payloadType), + "payload_version": nullInt(payloadVersion), + "decoded_json": nullStr(decodedJSON), + "created_at": nullStr(createdAt), + }, nil +} + +// GetPacketByHash fetches a transmission by content hash. +func (db *DB) GetPacketByHash(hash string) (map[string]interface{}, error) { + var txID int + var rawHex, h, firstSeen, decodedJSON, createdAt sql.NullString + var routeType, payloadType, payloadVersion sql.NullInt64 + err := db.conn.QueryRow("SELECT id, raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, created_at FROM transmissions WHERE hash = ?", strings.ToLower(hash)). + Scan(&txID, &rawHex, &h, &firstSeen, &routeType, &payloadType, &payloadVersion, &decodedJSON, &createdAt) + if err != nil { + return nil, err + } + return map[string]interface{}{ + "id": txID, + "raw_hex": nullStr(rawHex), + "hash": nullStr(h), + "first_seen": nullStr(firstSeen), + "timestamp": nullStr(firstSeen), + "route_type": nullInt(routeType), + "payload_type": nullInt(payloadType), + "payload_version": nullInt(payloadVersion), + "decoded_json": nullStr(decodedJSON), + "created_at": nullStr(createdAt), + }, nil +} + +// GetObservationsForHash returns all observations for a given hash. +func (db *DB) GetObservationsForHash(hash string) ([]map[string]interface{}, error) { + rows, err := db.conn.Query(`SELECT id, raw_hex, timestamp, observer_id, observer_name, direction, snr, rssi, score, hash, route_type, payload_type, payload_version, path_json, decoded_json, created_at + FROM packets_v WHERE hash = ? ORDER BY timestamp DESC`, strings.ToLower(hash)) + if err != nil { + return nil, err + } + defer rows.Close() + + result := make([]map[string]interface{}, 0) + for rows.Next() { + p := scanPacketRow(rows) + if p != nil { + result = append(result, p) + } + } + return result, nil +} + +// GetNodes returns filtered, paginated node list. +func (db *DB) GetNodes(limit, offset int, role, search, before, lastHeard, sortBy, region string) ([]map[string]interface{}, int, map[string]int, error) { + var where []string + var args []interface{} + + if role != "" { + where = append(where, "role = ?") + args = append(args, role) + } + if search != "" { + where = append(where, "name LIKE ?") + args = append(args, "%"+search+"%") + } + if before != "" { + where = append(where, "first_seen <= ?") + args = append(args, before) + } + if lastHeard != "" { + durations := map[string]int64{ + "1h": 3600000, "6h": 21600000, "24h": 86400000, + "7d": 604800000, "30d": 2592000000, + } + if ms, ok := durations[lastHeard]; ok { + since := time.Now().Add(-time.Duration(ms) * time.Millisecond).Format(time.RFC3339) + where = append(where, "last_seen > ?") + args = append(args, since) + } + } + + w := "" + if len(where) > 0 { + w = "WHERE " + strings.Join(where, " AND ") + } + + sortMap := map[string]string{ + "name": "name ASC", "lastSeen": "last_seen DESC", "packetCount": "advert_count DESC", + } + order := "last_seen DESC" + if s, ok := sortMap[sortBy]; ok { + order = s + } + + if limit <= 0 { + limit = 50 + } + + var total int + db.conn.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM nodes %s", w), args...).Scan(&total) + + querySQL := fmt.Sprintf("SELECT public_key, name, role, lat, lon, last_seen, first_seen, advert_count FROM nodes %s ORDER BY %s LIMIT ? OFFSET ?", w, order) + qArgs := append(args, limit, offset) + + rows, err := db.conn.Query(querySQL, qArgs...) + if err != nil { + return nil, 0, nil, err + } + defer rows.Close() + + nodes := make([]map[string]interface{}, 0) + for rows.Next() { + n := scanNodeRow(rows) + if n != nil { + nodes = append(nodes, n) + } + } + + counts := db.GetRoleCounts() + return nodes, total, counts, nil +} + +// SearchNodes searches nodes by name or pubkey prefix. +func (db *DB) SearchNodes(query string, limit int) ([]map[string]interface{}, error) { + if limit <= 0 { + limit = 10 + } + rows, err := db.conn.Query(`SELECT public_key, name, role, lat, lon, last_seen, first_seen, advert_count + FROM nodes WHERE name LIKE ? OR public_key LIKE ? ORDER BY last_seen DESC LIMIT ?`, + "%"+query+"%", query+"%", limit) + if err != nil { + return nil, err + } + defer rows.Close() + + nodes := make([]map[string]interface{}, 0) + for rows.Next() { + n := scanNodeRow(rows) + if n != nil { + nodes = append(nodes, n) + } + } + return nodes, nil +} + +// GetNodeByPubkey returns a single node. +func (db *DB) GetNodeByPubkey(pubkey string) (map[string]interface{}, error) { + rows, err := db.conn.Query("SELECT public_key, name, role, lat, lon, last_seen, first_seen, advert_count FROM nodes WHERE public_key = ?", pubkey) + if err != nil { + return nil, err + } + defer rows.Close() + if rows.Next() { + return scanNodeRow(rows), nil + } + return nil, nil +} + +// GetRecentPacketsForNode returns recent packets referencing a node. +func (db *DB) GetRecentPacketsForNode(pubkey string, name string, limit int) ([]map[string]interface{}, error) { + if limit <= 0 { + limit = 20 + } + pk := "%" + pubkey + "%" + np := "%" + name + "%" + rows, err := db.conn.Query(`SELECT id, raw_hex, timestamp, observer_id, observer_name, direction, snr, rssi, score, hash, route_type, payload_type, payload_version, path_json, decoded_json, created_at + FROM packets_v WHERE decoded_json LIKE ? OR decoded_json LIKE ? + ORDER BY timestamp DESC LIMIT ?`, pk, np, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + packets := make([]map[string]interface{}, 0) + for rows.Next() { + p := scanPacketRow(rows) + if p != nil { + packets = append(packets, p) + } + } + return packets, nil +} + +// GetObservers returns all observers sorted by last_seen DESC. +func (db *DB) GetObservers() ([]Observer, error) { + rows, err := db.conn.Query("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers ORDER BY last_seen DESC") + if err != nil { + return nil, err + } + defer rows.Close() + + var observers []Observer + for rows.Next() { + var o Observer + if err := rows.Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &o.BatteryMv, &o.UptimeSecs, &o.NoiseFloor); err != nil { + continue + } + observers = append(observers, o) + } + return observers, nil +} + +// GetObserverByID returns a single observer. +func (db *DB) GetObserverByID(id string) (*Observer, error) { + var o Observer + err := db.conn.QueryRow("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers WHERE id = ?", id). + Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &o.BatteryMv, &o.UptimeSecs, &o.NoiseFloor) + if err != nil { + return nil, err + } + return &o, nil +} + +// GetObserverIdsForRegion returns observer IDs for given IATA codes. +func (db *DB) GetObserverIdsForRegion(regionParam string) ([]string, error) { + if regionParam == "" { + return nil, nil + } + codes := strings.Split(regionParam, ",") + placeholders := make([]string, len(codes)) + args := make([]interface{}, len(codes)) + for i, c := range codes { + placeholders[i] = "?" + args[i] = strings.TrimSpace(c) + } + rows, err := db.conn.Query(fmt.Sprintf("SELECT id FROM observers WHERE iata IN (%s)", strings.Join(placeholders, ",")), args...) + if err != nil { + return nil, err + } + defer rows.Close() + var ids []string + for rows.Next() { + var id string + rows.Scan(&id) + ids = append(ids, id) + } + return ids, nil +} + +// GetDistinctIATAs returns all distinct IATA codes from observers. +func (db *DB) GetDistinctIATAs() ([]string, error) { + rows, err := db.conn.Query("SELECT DISTINCT iata FROM observers WHERE iata IS NOT NULL") + if err != nil { + return nil, err + } + defer rows.Close() + var codes []string + for rows.Next() { + var code string + rows.Scan(&code) + codes = append(codes, code) + } + return codes, nil +} + +// GetNodeHealth returns health info for a node (observers, stats, recent packets). +func (db *DB) GetNodeHealth(pubkey string) (map[string]interface{}, error) { + node, err := db.GetNodeByPubkey(pubkey) + if err != nil || node == nil { + return nil, err + } + + name := "" + if n, ok := node["name"]; ok && n != nil { + name = fmt.Sprintf("%v", n) + } + + pk := "%" + pubkey + "%" + np := "%" + name + "%" + whereClause := "decoded_json LIKE ? OR decoded_json LIKE ?" + if name == "" { + whereClause = "decoded_json LIKE ?" + np = pk + } + + todayStart := time.Now().UTC().Truncate(24 * time.Hour).Format(time.RFC3339) + + // Observers + observerSQL := fmt.Sprintf(`SELECT observer_id, observer_name, AVG(snr) as avgSnr, AVG(rssi) as avgRssi, COUNT(*) as packetCount + FROM packets_v WHERE (%s) AND observer_id IS NOT NULL GROUP BY observer_id ORDER BY packetCount DESC`, whereClause) + oRows, err := db.conn.Query(observerSQL, pk, np) + if err != nil { + return nil, err + } + defer oRows.Close() + + observers := make([]map[string]interface{}, 0) + for oRows.Next() { + var obsID, obsName sql.NullString + var avgSnr, avgRssi sql.NullFloat64 + var pktCount int + oRows.Scan(&obsID, &obsName, &avgSnr, &avgRssi, &pktCount) + observers = append(observers, map[string]interface{}{ + "observer_id": nullStr(obsID), + "observer_name": nullStr(obsName), + "avgSnr": nullFloat(avgSnr), + "avgRssi": nullFloat(avgRssi), + "packetCount": pktCount, + }) + } + + // Stats + var packetsToday, totalPackets int + db.conn.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM packets_v WHERE (%s) AND timestamp > ?", whereClause), pk, np, todayStart).Scan(&packetsToday) + db.conn.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM packets_v WHERE (%s)", whereClause), pk, np).Scan(&totalPackets) + + var avgSnr sql.NullFloat64 + db.conn.QueryRow(fmt.Sprintf("SELECT AVG(snr) FROM packets_v WHERE (%s)", whereClause), pk, np).Scan(&avgSnr) + + var lastHeard sql.NullString + db.conn.QueryRow(fmt.Sprintf("SELECT MAX(timestamp) FROM packets_v WHERE (%s)", whereClause), pk, np).Scan(&lastHeard) + + // Avg hops + hRows, _ := db.conn.Query(fmt.Sprintf("SELECT path_json FROM packets_v WHERE (%s) AND path_json IS NOT NULL", whereClause), pk, np) + totalHops, hopCount := 0, 0 + if hRows != nil { + defer hRows.Close() + for hRows.Next() { + var pj sql.NullString + hRows.Scan(&pj) + if pj.Valid { + var hops []interface{} + if json.Unmarshal([]byte(pj.String), &hops) == nil { + totalHops += len(hops) + hopCount++ + } + } + } + } + avgHops := 0 + if hopCount > 0 { + avgHops = int(math.Round(float64(totalHops) / float64(hopCount))) + } + + // Recent packets + recentPackets, _ := db.GetRecentPacketsForNode(pubkey, name, 20) + + return map[string]interface{}{ + "node": node, + "observers": observers, + "stats": map[string]interface{}{ + "totalTransmissions": totalPackets, + "totalObservations": totalPackets, + "totalPackets": totalPackets, + "packetsToday": packetsToday, + "avgSnr": nullFloat(avgSnr), + "avgHops": avgHops, + "lastHeard": nullStr(lastHeard), + }, + "recentPackets": recentPackets, + }, nil +} + +// GetNetworkStatus returns overall network health status. +func (db *DB) GetNetworkStatus(healthThresholds HealthThresholds) (map[string]interface{}, error) { + rows, err := db.conn.Query("SELECT public_key, name, role, last_seen FROM nodes") + if err != nil { + return nil, err + } + defer rows.Close() + + now := time.Now().UnixMilli() + active, degraded, silent, total := 0, 0, 0, 0 + roleCounts := map[string]int{} + + for rows.Next() { + var pk string + var name, role, lastSeen sql.NullString + rows.Scan(&pk, &name, &role, &lastSeen) + total++ + r := "unknown" + if role.Valid { + r = role.String + } + roleCounts[r]++ + + age := int64(math.MaxInt64) + if lastSeen.Valid { + if t, err := time.Parse(time.RFC3339, lastSeen.String); err == nil { + age = now - t.UnixMilli() + } else if t, err := time.Parse("2006-01-02 15:04:05", lastSeen.String); err == nil { + age = now - t.UnixMilli() + } + } + degradedMs, silentMs := healthThresholds.GetHealthMs(r) + if age < int64(degradedMs) { + active++ + } else if age < int64(silentMs) { + degraded++ + } else { + silent++ + } + } + + return map[string]interface{}{ + "total": total, "active": active, "degraded": degraded, "silent": silent, + "roleCounts": roleCounts, + }, nil +} + +// GetTraces returns observations for a hash. +func (db *DB) GetTraces(hash string) ([]map[string]interface{}, error) { + rows, err := db.conn.Query(`SELECT observer_id, observer_name, timestamp, snr, rssi, path_json + FROM packets_v WHERE hash = ? ORDER BY timestamp ASC`, strings.ToLower(hash)) + if err != nil { + return nil, err + } + defer rows.Close() + var traces []map[string]interface{} + for rows.Next() { + var obsID, obsName, ts, pathJSON sql.NullString + var snr, rssi sql.NullFloat64 + rows.Scan(&obsID, &obsName, &ts, &snr, &rssi, &pathJSON) + traces = append(traces, map[string]interface{}{ + "observer": nullStr(obsID), + "observer_name": nullStr(obsName), + "time": nullStr(ts), + "snr": nullFloat(snr), + "rssi": nullFloat(rssi), + "path_json": nullStr(pathJSON), + }) + } + if traces == nil { + traces = make([]map[string]interface{}, 0) + } + return traces, nil +} + +// GetChannels returns channel list from GRP_TXT packets. +func (db *DB) GetChannels() ([]map[string]interface{}, error) { + rows, err := db.conn.Query(`SELECT decoded_json, timestamp FROM packets_v WHERE payload_type = 5 ORDER BY timestamp ASC`) + if err != nil { + return nil, err + } + defer rows.Close() + + channelMap := map[string]map[string]interface{}{} + for rows.Next() { + var dj, ts sql.NullString + rows.Scan(&dj, &ts) + if !dj.Valid { + continue + } + var decoded map[string]interface{} + if json.Unmarshal([]byte(dj.String), &decoded) != nil { + continue + } + dtype, _ := decoded["type"].(string) + if dtype != "CHAN" { + continue + } + channelName, _ := decoded["channel"].(string) + if channelName == "" { + channelName = "unknown" + } + key := channelName + + ch, exists := channelMap[key] + if !exists { + ch = map[string]interface{}{ + "hash": key, "name": channelName, + "lastMessage": nil, "lastSender": nil, + "messageCount": 0, "lastActivity": nullStr(ts), + } + channelMap[key] = ch + } + ch["messageCount"] = ch["messageCount"].(int) + 1 + if ts.Valid { + ch["lastActivity"] = ts.String + } + if text, ok := decoded["text"].(string); ok && text != "" { + idx := strings.Index(text, ": ") + if idx > 0 { + ch["lastMessage"] = text[idx+2:] + } else { + ch["lastMessage"] = text + } + if sender, ok := decoded["sender"].(string); ok { + ch["lastSender"] = sender + } + } + } + + channels := make([]map[string]interface{}, 0, len(channelMap)) + for _, ch := range channelMap { + channels = append(channels, ch) + } + return channels, nil +} + +// GetChannelMessages returns messages for a specific channel. +func (db *DB) GetChannelMessages(channelHash string, limit, offset int) ([]map[string]interface{}, int, error) { + if limit <= 0 { + limit = 100 + } + rows, err := db.conn.Query(`SELECT id, hash, decoded_json, timestamp, observer_id, observer_name, snr, path_json + FROM packets_v WHERE payload_type = 5 ORDER BY timestamp ASC`) + if err != nil { + return nil, 0, err + } + defer rows.Close() + + type msg struct { + Data map[string]interface{} + Repeats int + } + msgMap := map[string]*msg{} + var msgOrder []string + + for rows.Next() { + var pktID int + var pktHash, dj, ts, obsID, obsName, pathJSON sql.NullString + var snr sql.NullFloat64 + rows.Scan(&pktID, &pktHash, &dj, &ts, &obsID, &obsName, &snr, &pathJSON) + if !dj.Valid { + continue + } + var decoded map[string]interface{} + if json.Unmarshal([]byte(dj.String), &decoded) != nil { + continue + } + dtype, _ := decoded["type"].(string) + if dtype != "CHAN" { + continue + } + ch, _ := decoded["channel"].(string) + if ch == "" { + ch = "unknown" + } + if ch != channelHash { + continue + } + + text, _ := decoded["text"].(string) + sender, _ := decoded["sender"].(string) + if sender == "" && text != "" { + idx := strings.Index(text, ": ") + if idx > 0 && idx < 50 { + sender = text[:idx] + } + } + + dedupeKey := fmt.Sprintf("%s:%s", sender, nullStr(pktHash)) + + if existing, ok := msgMap[dedupeKey]; ok { + existing.Repeats++ + } else { + displaySender := sender + displayText := text + if text != "" { + idx := strings.Index(text, ": ") + if idx > 0 && idx < 50 { + displaySender = text[:idx] + displayText = text[idx+2:] + } + } + + var hops int + if pathJSON.Valid { + var h []interface{} + if json.Unmarshal([]byte(pathJSON.String), &h) == nil { + hops = len(h) + } + } + + senderTs, _ := decoded["sender_timestamp"] + m := &msg{ + Data: map[string]interface{}{ + "sender": displaySender, + "text": displayText, + "timestamp": nullStr(ts), + "sender_timestamp": senderTs, + "packetId": pktID, + "packetHash": nullStr(pktHash), + "repeats": 1, + "observers": []string{}, + "hops": hops, + "snr": nullFloat(snr), + }, + Repeats: 1, + } + if obsName.Valid { + m.Data["observers"] = []string{obsName.String} + } else if obsID.Valid { + m.Data["observers"] = []string{obsID.String} + } + msgMap[dedupeKey] = m + msgOrder = append(msgOrder, dedupeKey) + } + } + + total := len(msgOrder) + // Return latest messages (tail) + start := total - limit - offset + if start < 0 { + start = 0 + } + end := total - offset + if end < 0 { + end = 0 + } + if end > total { + end = total + } + + messages := make([]map[string]interface{}, 0) + for i := start; i < end; i++ { + key := msgOrder[i] + m := msgMap[key] + m.Data["repeats"] = m.Repeats + messages = append(messages, m.Data) + } + + return messages, total, nil +} + +// GetTimestamps returns packet timestamps since a given time. +func (db *DB) GetTimestamps(since string) ([]string, error) { + rows, err := db.conn.Query("SELECT timestamp FROM packets_v WHERE timestamp > ? ORDER BY timestamp ASC", since) + if err != nil { + return nil, err + } + defer rows.Close() + var timestamps []string + for rows.Next() { + var ts string + rows.Scan(&ts) + timestamps = append(timestamps, ts) + } + if timestamps == nil { + timestamps = []string{} + } + return timestamps, nil +} + +// GetNodeCountsForPacket returns observation count for a hash. +func (db *DB) GetObservationCount(hash string) int { + var count int + db.conn.QueryRow("SELECT COUNT(*) FROM packets_v WHERE hash = ?", strings.ToLower(hash)).Scan(&count) + return count +} + +// GetNewTransmissionsSince returns new transmissions after a given ID for WebSocket polling. +func (db *DB) GetNewTransmissionsSince(lastID int, limit int) ([]map[string]interface{}, error) { + if limit <= 0 { + limit = 100 + } + rows, err := db.conn.Query(`SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, t.payload_type, t.payload_version, t.decoded_json + FROM transmissions t WHERE t.id > ? ORDER BY t.id ASC LIMIT ?`, lastID, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var result []map[string]interface{} + for rows.Next() { + var id int + var rawHex, hash, firstSeen, decodedJSON sql.NullString + var routeType, payloadType, payloadVersion sql.NullInt64 + rows.Scan(&id, &rawHex, &hash, &firstSeen, &routeType, &payloadType, &payloadVersion, &decodedJSON) + result = append(result, map[string]interface{}{ + "id": id, + "raw_hex": nullStr(rawHex), + "hash": nullStr(hash), + "first_seen": nullStr(firstSeen), + "route_type": nullInt(routeType), + "payload_type": nullInt(payloadType), + "payload_version": nullInt(payloadVersion), + "decoded_json": nullStr(decodedJSON), + }) + } + return result, nil +} + +// GetMaxTransmissionID returns the current max ID for polling. +func (db *DB) GetMaxTransmissionID() int { + var maxID int + db.conn.QueryRow("SELECT COALESCE(MAX(id), 0) FROM transmissions").Scan(&maxID) + return maxID +} + +// --- Helpers --- + +func scanPacketRow(rows *sql.Rows) map[string]interface{} { + var id int + var rawHex, ts, obsID, obsName, direction, hash, pathJSON, decodedJSON, createdAt sql.NullString + var snr, rssi sql.NullFloat64 + var score, routeType, payloadType, payloadVersion sql.NullInt64 + + if err := rows.Scan(&id, &rawHex, &ts, &obsID, &obsName, &direction, &snr, &rssi, &score, &hash, &routeType, &payloadType, &payloadVersion, &pathJSON, &decodedJSON, &createdAt); err != nil { + return nil + } + return map[string]interface{}{ + "id": id, + "raw_hex": nullStr(rawHex), + "timestamp": nullStr(ts), + "observer_id": nullStr(obsID), + "observer_name": nullStr(obsName), + "direction": nullStr(direction), + "snr": nullFloat(snr), + "rssi": nullFloat(rssi), + "score": nullInt(score), + "hash": nullStr(hash), + "route_type": nullInt(routeType), + "payload_type": nullInt(payloadType), + "payload_version": nullInt(payloadVersion), + "path_json": nullStr(pathJSON), + "decoded_json": nullStr(decodedJSON), + "created_at": nullStr(createdAt), + } +} + +func scanNodeRow(rows *sql.Rows) map[string]interface{} { + var pk string + var name, role, lastSeen, firstSeen sql.NullString + var lat, lon sql.NullFloat64 + var advertCount int + + if err := rows.Scan(&pk, &name, &role, &lat, &lon, &lastSeen, &firstSeen, &advertCount); err != nil { + return nil + } + return map[string]interface{}{ + "public_key": pk, + "name": nullStr(name), + "role": nullStr(role), + "lat": nullFloat(lat), + "lon": nullFloat(lon), + "last_seen": nullStr(lastSeen), + "first_seen": nullStr(firstSeen), + "advert_count": advertCount, + } +} + +func nullStr(ns sql.NullString) interface{} { + if ns.Valid { + return ns.String + } + return nil +} + +func nullFloat(nf sql.NullFloat64) interface{} { + if nf.Valid { + return nf.Float64 + } + return nil +} + +func nullInt(ni sql.NullInt64) interface{} { + if ni.Valid { + return int(ni.Int64) + } + return nil +} diff --git a/cmd/server/go.mod b/cmd/server/go.mod new file mode 100644 index 00000000..55343edb --- /dev/null +++ b/cmd/server/go.mod @@ -0,0 +1,21 @@ +module github.com/meshcore-analyzer/server + +go 1.22 + +require ( + github.com/gorilla/mux v1.8.1 + github.com/gorilla/websocket v1.5.3 + modernc.org/sqlite v1.34.5 +) + +require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + golang.org/x/sys v0.22.0 // indirect + modernc.org/libc v1.55.3 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.8.0 // indirect +) diff --git a/cmd/server/go.sum b/cmd/server/go.sum new file mode 100644 index 00000000..537897d5 --- /dev/null +++ b/cmd/server/go.sum @@ -0,0 +1,47 @@ +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo= +github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= +golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= +golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= +modernc.org/cc/v4 v4.21.4 h1:3Be/Rdo1fpr8GrQ7IVw9OHtplU4gWbb+wNgeoBMmGLQ= +modernc.org/cc/v4 v4.21.4/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ= +modernc.org/ccgo/v4 v4.19.2 h1:lwQZgvboKD0jBwdaeVCTouxhxAyN6iawF3STraAal8Y= +modernc.org/ccgo/v4 v4.19.2/go.mod h1:ysS3mxiMV38XGRTTcgo0DQTeTmAO4oCmJl1nX9VFI3s= +modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE= +modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ= +modernc.org/gc/v2 v2.4.1 h1:9cNzOqPyMJBvrUipmynX0ZohMhcxPtMccYgGOJdOiBw= +modernc.org/gc/v2 v2.4.1/go.mod h1:wzN5dK1AzVGoH6XOzc3YZ+ey/jPgYHLuVckd62P0GYU= +modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U= +modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= +modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU= +modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= +modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= +modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc= +modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss= +modernc.org/sqlite v1.34.5 h1:Bb6SR13/fjp15jt70CL4f18JIN7p7dnMExd+UFnF15g= +modernc.org/sqlite v1.34.5/go.mod h1:YLuNmX9NKs8wRNK2ko1LW1NGYcc9FkBO69JOt1AR9JE= +modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= +modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 00000000..bcae1f43 --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,144 @@ +package main + +import ( + "database/sql" + "flag" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "path/filepath" + "syscall" + "time" + + "github.com/gorilla/mux" +) + +func main() { + var ( + configDir string + port int + dbPath string + publicDir string + pollMs int + ) + + flag.StringVar(&configDir, "config-dir", ".", "Directory containing config.json") + flag.IntVar(&port, "port", 0, "HTTP port (overrides config)") + flag.StringVar(&dbPath, "db", "", "SQLite database path (overrides config/env)") + flag.StringVar(&publicDir, "public", "public", "Directory to serve static files from") + flag.IntVar(&pollMs, "poll-ms", 1000, "SQLite poll interval for WebSocket broadcast (ms)") + flag.Parse() + + // Load config + cfg, err := LoadConfig(configDir) + if err != nil { + log.Printf("[config] warning: %v (using defaults)", err) + } + + // CLI flags override config + if port > 0 { + cfg.Port = port + } + if cfg.Port == 0 { + cfg.Port = 3000 + } + if dbPath != "" { + cfg.DBPath = dbPath + } + + // Resolve DB path + resolvedDB := cfg.ResolveDBPath(configDir) + log.Printf("[config] port=%d db=%s public=%s", cfg.Port, resolvedDB, publicDir) + + // Open database + database, err := OpenDB(resolvedDB) + if err != nil { + log.Fatalf("[db] failed to open %s: %v", resolvedDB, err) + } + defer database.Close() + + // Verify DB has expected tables + var tableName string + err = database.conn.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='transmissions'").Scan(&tableName) + if err == sql.ErrNoRows { + log.Fatalf("[db] table 'transmissions' not found — is this a MeshCore Analyzer database?") + } + + stats, err := database.GetStats() + if err != nil { + log.Printf("[db] warning: could not read stats: %v", err) + } else { + log.Printf("[db] transmissions=%d observations=%d nodes=%d observers=%d", + stats.TotalTransmissions, stats.TotalObservations, stats.TotalNodes, stats.TotalObservers) + } + + // WebSocket hub + hub := NewHub() + + // HTTP server + srv := NewServer(database, cfg, hub) + router := mux.NewRouter() + srv.RegisterRoutes(router) + + // WebSocket endpoint + router.HandleFunc("/ws", hub.ServeWS) + + // Static files + SPA fallback + absPublic, _ := filepath.Abs(publicDir) + if _, err := os.Stat(absPublic); err == nil { + fs := http.FileServer(http.Dir(absPublic)) + router.PathPrefix("/").Handler(spaHandler(absPublic, fs)) + log.Printf("[static] serving %s", absPublic) + } else { + log.Printf("[static] directory %s not found — API-only mode", absPublic) + router.PathPrefix("/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html") + w.Write([]byte(`

MeshCore Analyzer

Frontend not found. API available at /api/

`)) + }) + } + + // Start SQLite poller for WebSocket broadcast + poller := NewPoller(database, hub, time.Duration(pollMs)*time.Millisecond) + go poller.Start() + + // Graceful shutdown + httpServer := &http.Server{ + Addr: fmt.Sprintf(":%d", cfg.Port), + Handler: router, + ReadTimeout: 30 * time.Second, + WriteTimeout: 60 * time.Second, + IdleTimeout: 120 * time.Second, + } + + go func() { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + log.Println("[server] shutting down...") + poller.Stop() + httpServer.Close() + }() + + log.Printf("[server] MeshCore Analyzer (Go) listening on http://localhost:%d", cfg.Port) + if err := httpServer.ListenAndServe(); err != http.ErrServerClosed { + log.Fatalf("[server] %v", err) + } +} + +// spaHandler serves static files, falling back to index.html for SPA routes. +func spaHandler(root string, fs http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + path := filepath.Join(root, r.URL.Path) + if _, err := os.Stat(path); os.IsNotExist(err) { + http.ServeFile(w, r, filepath.Join(root, "index.html")) + return + } + // Disable caching for JS/CSS/HTML + if filepath.Ext(path) == ".js" || filepath.Ext(path) == ".css" || filepath.Ext(path) == ".html" { + w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") + } + fs.ServeHTTP(w, r) + }) +} diff --git a/cmd/server/routes.go b/cmd/server/routes.go new file mode 100644 index 00000000..1b4d170e --- /dev/null +++ b/cmd/server/routes.go @@ -0,0 +1,1146 @@ +package main + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "net/http" + "regexp" + "runtime" + "strconv" + "strings" + "time" + + "github.com/gorilla/mux" +) + +// Server holds shared state for route handlers. +type Server struct { + db *DB + cfg *Config + hub *Hub + startedAt time.Time + perfStats *PerfStats +} + +// PerfStats tracks request performance. +type PerfStats struct { + Requests int64 + TotalMs float64 + Endpoints map[string]*EndpointPerf + SlowQueries []map[string]interface{} + StartedAt time.Time +} + +type EndpointPerf struct { + Count int + TotalMs float64 + MaxMs float64 + Recent []float64 +} + +func NewPerfStats() *PerfStats { + return &PerfStats{ + Endpoints: make(map[string]*EndpointPerf), + SlowQueries: make([]map[string]interface{}, 0), + StartedAt: time.Now(), + } +} + +func NewServer(db *DB, cfg *Config, hub *Hub) *Server { + return &Server{ + db: db, + cfg: cfg, + hub: hub, + startedAt: time.Now(), + perfStats: NewPerfStats(), + } +} + +// RegisterRoutes sets up all HTTP routes on the given router. +func (s *Server) RegisterRoutes(r *mux.Router) { + // Performance instrumentation middleware + r.Use(s.perfMiddleware) + + // Config endpoints + r.HandleFunc("/api/config/cache", s.handleConfigCache).Methods("GET") + r.HandleFunc("/api/config/client", s.handleConfigClient).Methods("GET") + r.HandleFunc("/api/config/regions", s.handleConfigRegions).Methods("GET") + r.HandleFunc("/api/config/theme", s.handleConfigTheme).Methods("GET") + r.HandleFunc("/api/config/map", s.handleConfigMap).Methods("GET") + + // System endpoints + r.HandleFunc("/api/health", s.handleHealth).Methods("GET") + r.HandleFunc("/api/stats", s.handleStats).Methods("GET") + r.HandleFunc("/api/perf", s.handlePerf).Methods("GET") + + // Packet endpoints + r.HandleFunc("/api/packets/timestamps", s.handlePacketTimestamps).Methods("GET") + r.HandleFunc("/api/packets/{id}", s.handlePacketDetail).Methods("GET") + r.HandleFunc("/api/packets", s.handlePackets).Methods("GET") + + // Node endpoints — fixed routes BEFORE parameterized + r.HandleFunc("/api/nodes/search", s.handleNodeSearch).Methods("GET") + r.HandleFunc("/api/nodes/bulk-health", s.handleBulkHealth).Methods("GET") + r.HandleFunc("/api/nodes/network-status", s.handleNetworkStatus).Methods("GET") + r.HandleFunc("/api/nodes/{pubkey}/health", s.handleNodeHealth).Methods("GET") + r.HandleFunc("/api/nodes/{pubkey}/paths", s.handleNodePaths).Methods("GET") + r.HandleFunc("/api/nodes/{pubkey}/analytics", s.handleNodeAnalytics).Methods("GET") + r.HandleFunc("/api/nodes/{pubkey}", s.handleNodeDetail).Methods("GET") + r.HandleFunc("/api/nodes", s.handleNodes).Methods("GET") + + // Analytics endpoints + r.HandleFunc("/api/analytics/rf", s.handleAnalyticsRF).Methods("GET") + r.HandleFunc("/api/analytics/topology", s.handleAnalyticsTopology).Methods("GET") + r.HandleFunc("/api/analytics/channels", s.handleAnalyticsChannels).Methods("GET") + r.HandleFunc("/api/analytics/distance", s.handleAnalyticsDistance).Methods("GET") + r.HandleFunc("/api/analytics/hash-sizes", s.handleAnalyticsHashSizes).Methods("GET") + r.HandleFunc("/api/analytics/subpaths", s.handleAnalyticsSubpaths).Methods("GET") + r.HandleFunc("/api/analytics/subpath-detail", s.handleAnalyticsSubpathDetail).Methods("GET") + + // Other endpoints + r.HandleFunc("/api/resolve-hops", s.handleResolveHops).Methods("GET") + r.HandleFunc("/api/channels/{hash}/messages", s.handleChannelMessages).Methods("GET") + r.HandleFunc("/api/channels", s.handleChannels).Methods("GET") + r.HandleFunc("/api/observers/{id}/analytics", s.handleObserverAnalytics).Methods("GET") + r.HandleFunc("/api/observers/{id}", s.handleObserverDetail).Methods("GET") + r.HandleFunc("/api/observers", s.handleObservers).Methods("GET") + r.HandleFunc("/api/traces/{hash}", s.handleTraces).Methods("GET") + r.HandleFunc("/api/iata-coords", s.handleIATACoords).Methods("GET") + r.HandleFunc("/api/audio-lab/buckets", s.handleAudioLabBuckets).Methods("GET") +} + +func (s *Server) perfMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, "/api/") { + next.ServeHTTP(w, r) + return + } + start := time.Now() + next.ServeHTTP(w, r) + ms := float64(time.Since(start).Microseconds()) / 1000.0 + + s.perfStats.Requests++ + s.perfStats.TotalMs += ms + + // Normalize key + re := regexp.MustCompile(`[0-9a-f]{8,}`) + key := re.ReplaceAllString(r.URL.Path, ":id") + if _, ok := s.perfStats.Endpoints[key]; !ok { + s.perfStats.Endpoints[key] = &EndpointPerf{Recent: make([]float64, 0, 100)} + } + ep := s.perfStats.Endpoints[key] + ep.Count++ + ep.TotalMs += ms + if ms > ep.MaxMs { + ep.MaxMs = ms + } + ep.Recent = append(ep.Recent, ms) + if len(ep.Recent) > 100 { + ep.Recent = ep.Recent[1:] + } + if ms > 100 { + slow := map[string]interface{}{ + "path": r.URL.Path, "ms": round(ms, 1), + "time": time.Now().UTC().Format(time.RFC3339), "status": 200, + } + s.perfStats.SlowQueries = append(s.perfStats.SlowQueries, slow) + if len(s.perfStats.SlowQueries) > 50 { + s.perfStats.SlowQueries = s.perfStats.SlowQueries[1:] + } + } + }) +} + +// --- Config Handlers --- + +func (s *Server) handleConfigCache(w http.ResponseWriter, r *http.Request) { + ct := s.cfg.CacheTTL + if ct == nil { + ct = map[string]interface{}{} + } + writeJSON(w, ct) +} + +func (s *Server) handleConfigClient(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]interface{}{ + "roles": s.cfg.Roles, + "healthThresholds": s.cfg.HealthThresholds, + "tiles": s.cfg.Tiles, + "snrThresholds": s.cfg.SnrThresholds, + "distThresholds": s.cfg.DistThresholds, + "maxHopDist": s.cfg.MaxHopDist, + "limits": s.cfg.Limits, + "perfSlowMs": s.cfg.PerfSlowMs, + "wsReconnectMs": s.cfg.WsReconnectMs, + "cacheInvalidateMs": s.cfg.CacheInvalidMs, + "externalUrls": s.cfg.ExternalUrls, + "propagationBufferMs": s.cfg.PropagationBufferMs(), + }) +} + +func (s *Server) handleConfigRegions(w http.ResponseWriter, r *http.Request) { + regions := make(map[string]string) + for k, v := range s.cfg.Regions { + regions[k] = v + } + codes, _ := s.db.GetDistinctIATAs() + for _, c := range codes { + if _, ok := regions[c]; !ok { + regions[c] = c + } + } + writeJSON(w, regions) +} + +func (s *Server) handleConfigTheme(w http.ResponseWriter, r *http.Request) { + theme := LoadTheme(".") + + branding := mergeMap(map[string]interface{}{ + "siteName": "MeshCore Analyzer", + "tagline": "Real-time MeshCore LoRa mesh network analyzer", + }, s.cfg.Branding, theme.Branding) + + themeColors := mergeMap(map[string]interface{}{ + "accent": "#4a9eff", + "accentHover": "#6db3ff", + "navBg": "#0f0f23", + "navBg2": "#1a1a2e", + }, s.cfg.Theme, theme.Theme) + + nodeColors := mergeMap(map[string]interface{}{ + "repeater": "#dc2626", + "companion": "#2563eb", + "room": "#16a34a", + "sensor": "#d97706", + "observer": "#8b5cf6", + }, s.cfg.NodeColors, theme.NodeColors) + + themeDark := mergeMap(map[string]interface{}{}, s.cfg.ThemeDark, theme.ThemeDark) + typeColors := mergeMap(map[string]interface{}{}, s.cfg.TypeColors, theme.TypeColors) + + var home interface{} + if theme.Home != nil { + home = theme.Home + } else if s.cfg.Home != nil { + home = s.cfg.Home + } + + writeJSON(w, map[string]interface{}{ + "branding": branding, + "theme": themeColors, + "themeDark": themeDark, + "nodeColors": nodeColors, + "typeColors": typeColors, + "home": home, + }) +} + +func (s *Server) handleConfigMap(w http.ResponseWriter, r *http.Request) { + center := s.cfg.MapDefaults.Center + if len(center) == 0 { + center = []float64{37.45, -122.0} + } + zoom := s.cfg.MapDefaults.Zoom + if zoom == 0 { + zoom = 9 + } + writeJSON(w, map[string]interface{}{"center": center, "zoom": zoom}) +} + +// --- System Handlers --- + +func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + uptime := time.Since(s.startedAt).Seconds() + + wsClients := 0 + if s.hub != nil { + wsClients = s.hub.ClientCount() + } + + writeJSON(w, map[string]interface{}{ + "status": "ok", + "uptime": int(uptime), + "uptimeHuman": fmt.Sprintf("%dh %dm", int(uptime)/3600, (int(uptime)%3600)/60), + "memory": map[string]interface{}{ + "rss": int(m.Sys / 1024 / 1024), + "heapUsed": int(m.HeapAlloc / 1024 / 1024), + "heapTotal": int(m.HeapSys / 1024 / 1024), + "external": 0, + }, + "eventLoop": map[string]interface{}{ + "currentLagMs": 0, "maxLagMs": 0, + "p50Ms": 0, "p95Ms": 0, "p99Ms": 0, + }, + "cache": map[string]interface{}{ + "entries": 0, "hits": 0, "misses": 0, + "staleHits": 0, "recomputes": 0, "hitRate": 0, + }, + "websocket": map[string]interface{}{"clients": wsClients}, + "packetStore": map[string]interface{}{ + "packets": 0, "estimatedMB": 0, + }, + "perf": map[string]interface{}{ + "totalRequests": s.perfStats.Requests, + "avgMs": safeAvg(s.perfStats.TotalMs, float64(s.perfStats.Requests)), + "slowQueries": len(s.perfStats.SlowQueries), + "recentSlow": lastN(s.perfStats.SlowQueries, 5), + }, + }) +} + +func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) { + stats, err := s.db.GetStats() + if err != nil { + writeError(w, 500, err.Error()) + return + } + counts := s.db.GetRoleCounts() + result := map[string]interface{}{ + "totalPackets": stats.TotalPackets, + "totalTransmissions": stats.TotalTransmissions, + "totalObservations": stats.TotalObservations, + "totalNodes": stats.TotalNodes, + "totalObservers": stats.TotalObservers, + "packetsLastHour": stats.PacketsLastHour, + "counts": counts, + } + writeJSON(w, result) +} + +func (s *Server) handlePerf(w http.ResponseWriter, r *http.Request) { + summary := map[string]interface{}{} + for path, ep := range s.perfStats.Endpoints { + sorted := sortedCopy(ep.Recent) + summary[path] = map[string]interface{}{ + "count": ep.Count, + "avgMs": round(ep.TotalMs/float64(ep.Count), 1), + "p50Ms": round(percentile(sorted, 0.5), 1), + "p95Ms": round(percentile(sorted, 0.95), 1), + "maxMs": round(ep.MaxMs, 1), + } + } + writeJSON(w, map[string]interface{}{ + "uptime": int(time.Since(s.perfStats.StartedAt).Seconds()), + "totalRequests": s.perfStats.Requests, + "avgMs": safeAvg(s.perfStats.TotalMs, float64(s.perfStats.Requests)), + "endpoints": summary, + "slowQueries": lastN(s.perfStats.SlowQueries, 20), + "cache": map[string]interface{}{ + "size": 0, "hits": 0, "misses": 0, + "staleHits": 0, "recomputes": 0, "hitRate": 0, + }, + }) +} + +// --- Packet Handlers --- + +func (s *Server) handlePackets(w http.ResponseWriter, r *http.Request) { + q := PacketQuery{ + Limit: queryInt(r, "limit", 50), + Offset: queryInt(r, "offset", 0), + Observer: r.URL.Query().Get("observer"), + Hash: r.URL.Query().Get("hash"), + Since: r.URL.Query().Get("since"), + Until: r.URL.Query().Get("until"), + Region: r.URL.Query().Get("region"), + Node: r.URL.Query().Get("node"), + Order: "DESC", + } + if r.URL.Query().Get("order") == "asc" { + q.Order = "ASC" + } + if v := r.URL.Query().Get("type"); v != "" { + t, _ := strconv.Atoi(v) + q.Type = &t + } + if v := r.URL.Query().Get("route"); v != "" { + t, _ := strconv.Atoi(v) + q.Route = &t + } + + if r.URL.Query().Get("groupByHash") == "true" { + result, err := s.db.QueryGroupedPackets(q) + if err != nil { + writeError(w, 500, err.Error()) + return + } + writeJSON(w, result) + return + } + + result, err := s.db.QueryPackets(q) + if err != nil { + writeError(w, 500, err.Error()) + return + } + + // Strip observations from default response + if r.URL.Query().Get("expand") != "observations" { + for _, p := range result.Packets { + delete(p, "observations") + } + } + + writeJSON(w, result) +} + +func (s *Server) handlePacketTimestamps(w http.ResponseWriter, r *http.Request) { + since := r.URL.Query().Get("since") + if since == "" { + writeError(w, 400, "since required") + return + } + ts, err := s.db.GetTimestamps(since) + if err != nil { + writeError(w, 500, err.Error()) + return + } + writeJSON(w, ts) +} + +var hashPattern = regexp.MustCompile(`^[0-9a-f]{16}$`) + +func (s *Server) handlePacketDetail(w http.ResponseWriter, r *http.Request) { + param := mux.Vars(r)["id"] + var packet map[string]interface{} + var err error + + if hashPattern.MatchString(strings.ToLower(param)) { + packet, err = s.db.GetPacketByHash(param) + } + if packet == nil { + id, parseErr := strconv.Atoi(param) + if parseErr == nil { + packet, err = s.db.GetTransmissionByID(id) + if packet == nil { + packet, err = s.db.GetPacketByID(id) + } + } + } + if err != nil || packet == nil { + writeError(w, 404, "Not found") + return + } + + // Build observation list + hash, _ := packet["hash"].(string) + observations, _ := s.db.GetObservationsForHash(hash) + observationCount := len(observations) + if observationCount == 0 { + observationCount = 1 + } + + writeJSON(w, map[string]interface{}{ + "packet": packet, + "path": []interface{}{}, + "breakdown": map[string]interface{}{}, + "observation_count": observationCount, + "observations": observations, + }) +} + +// --- Node Handlers --- + +func (s *Server) handleNodes(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + nodes, total, counts, err := s.db.GetNodes( + queryInt(r, "limit", 50), + queryInt(r, "offset", 0), + q.Get("role"), q.Get("search"), q.Get("before"), + q.Get("lastHeard"), q.Get("sortBy"), q.Get("region"), + ) + if err != nil { + writeError(w, 500, err.Error()) + return + } + writeJSON(w, map[string]interface{}{"nodes": nodes, "total": total, "counts": counts}) +} + +func (s *Server) handleNodeSearch(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query().Get("q") + if strings.TrimSpace(q) == "" { + writeJSON(w, map[string]interface{}{"nodes": []interface{}{}}) + return + } + nodes, err := s.db.SearchNodes(strings.TrimSpace(q), 10) + if err != nil { + writeError(w, 500, err.Error()) + return + } + writeJSON(w, map[string]interface{}{"nodes": nodes}) +} + +func (s *Server) handleNodeDetail(w http.ResponseWriter, r *http.Request) { + pubkey := mux.Vars(r)["pubkey"] + node, err := s.db.GetNodeByPubkey(pubkey) + if err != nil || node == nil { + writeError(w, 404, "Not found") + return + } + + name := "" + if n, ok := node["name"]; ok && n != nil { + name = fmt.Sprintf("%v", n) + } + recentAdverts, _ := s.db.GetRecentPacketsForNode(pubkey, name, 20) + + writeJSON(w, map[string]interface{}{ + "node": node, + "recentAdverts": recentAdverts, + }) +} + +func (s *Server) handleNodeHealth(w http.ResponseWriter, r *http.Request) { + pubkey := mux.Vars(r)["pubkey"] + result, err := s.db.GetNodeHealth(pubkey) + if err != nil || result == nil { + writeError(w, 404, "Not found") + return + } + writeJSON(w, result) +} + +func (s *Server) handleBulkHealth(w http.ResponseWriter, r *http.Request) { + limit := queryInt(r, "limit", 50) + if limit > 200 { + limit = 200 + } + + rows, err := s.db.conn.Query("SELECT public_key, name, role, lat, lon, last_seen FROM nodes ORDER BY last_seen DESC LIMIT ?", limit) + if err != nil { + writeError(w, 500, err.Error()) + return + } + defer rows.Close() + + results := make([]map[string]interface{}, 0) + for rows.Next() { + var pk string + var name, role, lastSeen sql.NullString + var lat, lon sql.NullFloat64 + rows.Scan(&pk, &name, &role, &lat, &lon, &lastSeen) + + results = append(results, map[string]interface{}{ + "public_key": pk, + "name": nullStr(name), + "role": nullStr(role), + "lat": nullFloat(lat), + "lon": nullFloat(lon), + "stats": map[string]interface{}{ + "totalTransmissions": 0, + "totalObservations": 0, + "totalPackets": 0, + "packetsToday": 0, + "avgSnr": nil, + "lastHeard": nullStr(lastSeen), + }, + "observers": []interface{}{}, + }) + } + writeJSON(w, results) +} + +func (s *Server) handleNetworkStatus(w http.ResponseWriter, r *http.Request) { + ht := s.cfg.GetHealthThresholds() + result, err := s.db.GetNetworkStatus(ht) + if err != nil { + writeError(w, 500, err.Error()) + return + } + writeJSON(w, result) +} + +func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) { + pubkey := mux.Vars(r)["pubkey"] + node, err := s.db.GetNodeByPubkey(pubkey) + if err != nil || node == nil { + writeError(w, 404, "Not found") + return + } + writeJSON(w, map[string]interface{}{ + "node": node, + "paths": []interface{}{}, + "totalPaths": 0, + "totalTransmissions": 0, + }) +} + +func (s *Server) handleNodeAnalytics(w http.ResponseWriter, r *http.Request) { + pubkey := mux.Vars(r)["pubkey"] + days := queryInt(r, "days", 7) + if days < 1 { + days = 1 + } + if days > 365 { + days = 365 + } + + node, err := s.db.GetNodeByPubkey(pubkey) + if err != nil || node == nil { + writeError(w, 404, "Not found") + return + } + + name := "" + if n, ok := node["name"]; ok && n != nil { + name = fmt.Sprintf("%v", n) + } + + fromISO := time.Now().Add(-time.Duration(days) * 24 * time.Hour).Format(time.RFC3339) + toISO := time.Now().Format(time.RFC3339) + + pk := "%" + pubkey + "%" + np := "%" + name + "%" + whereClause := "decoded_json LIKE ? OR decoded_json LIKE ?" + if name == "" { + whereClause = "decoded_json LIKE ?" + np = pk + } + timeWhere := fmt.Sprintf("(%s) AND timestamp > ?", whereClause) + + // Activity timeline + actSQL := fmt.Sprintf(`SELECT substr(timestamp, 1, 13) || ':00:00Z' as bucket, COUNT(*) as count + FROM packets_v WHERE %s GROUP BY bucket ORDER BY bucket`, timeWhere) + aRows, _ := s.db.conn.Query(actSQL, pk, np, fromISO) + activityTimeline := make([]map[string]interface{}, 0) + if aRows != nil { + defer aRows.Close() + for aRows.Next() { + var bucket string + var count int + aRows.Scan(&bucket, &count) + activityTimeline = append(activityTimeline, map[string]interface{}{"bucket": bucket, "count": count}) + } + } + + // SNR trend + snrSQL := fmt.Sprintf(`SELECT timestamp, snr, rssi, observer_id, observer_name + FROM packets_v WHERE %s AND snr IS NOT NULL ORDER BY timestamp`, timeWhere) + sRows, _ := s.db.conn.Query(snrSQL, pk, np, fromISO) + snrTrend := make([]map[string]interface{}, 0) + if sRows != nil { + defer sRows.Close() + for sRows.Next() { + var ts string + var snr, rssi sql.NullFloat64 + var obsID, obsName sql.NullString + sRows.Scan(&ts, &snr, &rssi, &obsID, &obsName) + snrTrend = append(snrTrend, map[string]interface{}{ + "timestamp": ts, "snr": nullFloat(snr), "rssi": nullFloat(rssi), + "observer_id": nullStr(obsID), "observer_name": nullStr(obsName), + }) + } + } + + // Packet type breakdown + ptSQL := fmt.Sprintf("SELECT payload_type, COUNT(*) as count FROM packets_v WHERE %s GROUP BY payload_type", timeWhere) + ptRows, _ := s.db.conn.Query(ptSQL, pk, np, fromISO) + packetTypeBreakdown := make([]map[string]interface{}, 0) + if ptRows != nil { + defer ptRows.Close() + for ptRows.Next() { + var pt, count int + ptRows.Scan(&pt, &count) + packetTypeBreakdown = append(packetTypeBreakdown, map[string]interface{}{"payload_type": pt, "count": count}) + } + } + + // Observer coverage + ocSQL := fmt.Sprintf(`SELECT observer_id, observer_name, COUNT(*) as packetCount, + AVG(snr) as avgSnr, AVG(rssi) as avgRssi, MIN(timestamp) as firstSeen, MAX(timestamp) as lastSeen + FROM packets_v WHERE %s AND observer_id IS NOT NULL + GROUP BY observer_id ORDER BY packetCount DESC`, timeWhere) + ocRows, _ := s.db.conn.Query(ocSQL, pk, np, fromISO) + observerCoverage := make([]map[string]interface{}, 0) + if ocRows != nil { + defer ocRows.Close() + for ocRows.Next() { + var obsID, obsName, first, last sql.NullString + var pktCount int + var avgSnr, avgRssi sql.NullFloat64 + ocRows.Scan(&obsID, &obsName, &pktCount, &avgSnr, &avgRssi, &first, &last) + observerCoverage = append(observerCoverage, map[string]interface{}{ + "observer_id": nullStr(obsID), "observer_name": nullStr(obsName), + "packetCount": pktCount, "avgSnr": nullFloat(avgSnr), "avgRssi": nullFloat(avgRssi), + "firstSeen": nullStr(first), "lastSeen": nullStr(last), + }) + } + } + + writeJSON(w, map[string]interface{}{ + "node": node, + "timeRange": map[string]interface{}{"from": fromISO, "to": toISO, "days": days}, + "activityTimeline": activityTimeline, + "snrTrend": snrTrend, + "packetTypeBreakdown": packetTypeBreakdown, + "observerCoverage": observerCoverage, + "hopDistribution": []interface{}{}, + "peerInteractions": []interface{}{}, + "uptimeHeatmap": []interface{}{}, + "computedStats": map[string]interface{}{ + "availabilityPct": 0, "longestSilenceMs": 0, "longestSilenceStart": nil, + "signalGrade": "D", "snrMean": 0, "snrStdDev": 0, + "relayPct": 0, "totalPackets": len(activityTimeline), + "uniqueObservers": len(observerCoverage), "uniquePeers": 0, "avgPacketsPerDay": 0, + }, + }) +} + +// --- Analytics Handlers --- + +func (s *Server) handleAnalyticsRF(w http.ResponseWriter, r *http.Request) { + // Basic RF analytics from SQL + region := r.URL.Query().Get("region") + regionFilter := "" + var rArgs []interface{} + if region != "" { + regionFilter = "AND observer_id IN (SELECT id FROM observers WHERE iata = ?)" + rArgs = append(rArgs, region) + } + + // SNR/RSSI stats + rfSQL := fmt.Sprintf(`SELECT COUNT(*) as cnt, AVG(snr) as avgSnr, MIN(snr) as minSnr, MAX(snr) as maxSnr, + AVG(rssi) as avgRssi, MIN(rssi) as minRssi, MAX(rssi) as maxRssi + FROM packets_v WHERE snr IS NOT NULL %s`, regionFilter) + var cnt int + var avgSnr, minSnr, maxSnr, avgRssi, minRssi, maxRssi sql.NullFloat64 + s.db.conn.QueryRow(rfSQL, rArgs...).Scan(&cnt, &avgSnr, &minSnr, &maxSnr, &avgRssi, &minRssi, &maxRssi) + + // Payload type distribution + ptSQL := fmt.Sprintf(`SELECT payload_type, COUNT(DISTINCT hash) as count FROM packets_v WHERE 1=1 %s GROUP BY payload_type ORDER BY count DESC`, regionFilter) + ptRows, _ := s.db.conn.Query(ptSQL, rArgs...) + payloadTypes := make([]map[string]interface{}, 0) + ptNames := map[int]string{0: "REQ", 1: "RESPONSE", 2: "TXT_MSG", 3: "ACK", 4: "ADVERT", 5: "GRP_TXT", 7: "ANON_REQ", 8: "PATH", 9: "TRACE", 11: "CONTROL"} + if ptRows != nil { + defer ptRows.Close() + for ptRows.Next() { + var pt, count int + ptRows.Scan(&pt, &count) + name := ptNames[pt] + if name == "" { + name = fmt.Sprintf("UNK(%d)", pt) + } + payloadTypes = append(payloadTypes, map[string]interface{}{"type": pt, "name": name, "count": count}) + } + } + + // Total counts + var totalAll int + countSQL := fmt.Sprintf("SELECT COUNT(*) FROM packets_v WHERE 1=1 %s", regionFilter) + s.db.conn.QueryRow(countSQL, rArgs...).Scan(&totalAll) + var totalTx int + txSQL := fmt.Sprintf("SELECT COUNT(DISTINCT hash) FROM packets_v WHERE 1=1 %s", regionFilter) + s.db.conn.QueryRow(txSQL, rArgs...).Scan(&totalTx) + + writeJSON(w, map[string]interface{}{ + "totalPackets": cnt, + "totalAllPackets": totalAll, + "totalTransmissions": totalTx, + "snr": map[string]interface{}{ + "min": nullFloat(minSnr), "max": nullFloat(maxSnr), + "avg": nullFloat(avgSnr), "median": 0, "stddev": 0, + }, + "rssi": map[string]interface{}{ + "min": nullFloat(minRssi), "max": nullFloat(maxRssi), + "avg": nullFloat(avgRssi), "median": 0, "stddev": 0, + }, + "snrValues": map[string]interface{}{"bins": []interface{}{}, "min": 0, "max": 0}, + "rssiValues": map[string]interface{}{"bins": []interface{}{}, "min": 0, "max": 0}, + "packetSizes": map[string]interface{}{"bins": []interface{}{}, "min": 0, "max": 0}, + "minPacketSize": 0, "maxPacketSize": 0, "avgPacketSize": 0, + "packetsPerHour": []interface{}{}, + "payloadTypes": payloadTypes, + "snrByType": []interface{}{}, + "signalOverTime": []interface{}{}, + "scatterData": []interface{}{}, + "timeSpanHours": 0, + }) +} + +func (s *Server) handleAnalyticsTopology(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]interface{}{ + "uniqueNodes": 0, "avgHops": 0, "medianHops": 0, "maxHops": 0, + "hopDistribution": []interface{}{}, + "topRepeaters": []interface{}{}, + "topPairs": []interface{}{}, + "hopsVsSnr": []interface{}{}, + "observers": []interface{}{}, + "perObserverReach": map[string]interface{}{}, + "multiObsNodes": []interface{}{}, + "bestPathList": []interface{}{}, + }) +} + +func (s *Server) handleAnalyticsChannels(w http.ResponseWriter, r *http.Request) { + channels, _ := s.db.GetChannels() + writeJSON(w, map[string]interface{}{ + "activeChannels": len(channels), + "decryptable": len(channels), + "channels": channels, + "topSenders": []interface{}{}, + "channelTimeline": []interface{}{}, + "msgLengths": []interface{}{}, + }) +} + +func (s *Server) handleAnalyticsDistance(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]interface{}{ + "summary": map[string]interface{}{"totalHops": 0, "totalPaths": 0, "avgDist": 0, "maxDist": 0}, + "topHops": []interface{}{}, + "topPaths": []interface{}{}, + "catStats": map[string]interface{}{}, + "distHistogram": []interface{}{}, + "distOverTime": []interface{}{}, + }) +} + +func (s *Server) handleAnalyticsHashSizes(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]interface{}{ + "total": 0, + "distribution": map[string]int{"1": 0, "2": 0, "3": 0}, + "hourly": []interface{}{}, + "topHops": []interface{}{}, + "multiByteNodes": []interface{}{}, + }) +} + +func (s *Server) handleAnalyticsSubpaths(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]interface{}{ + "subpaths": []interface{}{}, + "totalPaths": 0, + }) +} + +func (s *Server) handleAnalyticsSubpathDetail(w http.ResponseWriter, r *http.Request) { + hops := r.URL.Query().Get("hops") + if hops == "" { + writeJSON(w, map[string]interface{}{"error": "Need at least 2 hops"}) + return + } + writeJSON(w, map[string]interface{}{ + "hops": strings.Split(hops, ","), + "nodes": []interface{}{}, + "totalMatches": 0, + "firstSeen": nil, + "lastSeen": nil, + "signal": map[string]interface{}{"avgSnr": nil, "avgRssi": nil, "samples": 0}, + "hourDistribution": make([]int, 24), + "parentPaths": []interface{}{}, + "observers": []interface{}{}, + }) +} + +// --- Other Handlers --- + +func (s *Server) handleResolveHops(w http.ResponseWriter, r *http.Request) { + hopsParam := r.URL.Query().Get("hops") + if hopsParam == "" { + writeJSON(w, map[string]interface{}{"resolved": map[string]interface{}{}}) + return + } + hops := strings.Split(hopsParam, ",") + resolved := map[string]interface{}{} + + for _, hop := range hops { + if hop == "" { + continue + } + hopLower := strings.ToLower(hop) + rows, err := s.db.conn.Query("SELECT public_key, name, lat, lon FROM nodes WHERE LOWER(public_key) LIKE ?", hopLower+"%") + if err != nil { + resolved[hop] = map[string]interface{}{"name": nil, "candidates": []interface{}{}, "conflicts": []interface{}{}} + continue + } + + var candidates []map[string]interface{} + for rows.Next() { + var pk string + var name sql.NullString + var lat, lon sql.NullFloat64 + rows.Scan(&pk, &name, &lat, &lon) + candidates = append(candidates, map[string]interface{}{ + "name": nullStr(name), "pubkey": pk, + "lat": nullFloat(lat), "lon": nullFloat(lon), + }) + } + rows.Close() + + if len(candidates) == 0 { + resolved[hop] = map[string]interface{}{"name": nil, "candidates": []interface{}{}, "conflicts": []interface{}{}} + } else if len(candidates) == 1 { + resolved[hop] = map[string]interface{}{ + "name": candidates[0]["name"], "pubkey": candidates[0]["pubkey"], + "candidates": candidates, "conflicts": []interface{}{}, + } + } else { + resolved[hop] = map[string]interface{}{ + "name": candidates[0]["name"], "pubkey": candidates[0]["pubkey"], + "ambiguous": true, "candidates": candidates, "conflicts": candidates, + } + } + } + writeJSON(w, map[string]interface{}{"resolved": resolved}) +} + +func (s *Server) handleChannels(w http.ResponseWriter, r *http.Request) { + channels, err := s.db.GetChannels() + if err != nil { + writeError(w, 500, err.Error()) + return + } + writeJSON(w, map[string]interface{}{"channels": channels}) +} + +func (s *Server) handleChannelMessages(w http.ResponseWriter, r *http.Request) { + hash := mux.Vars(r)["hash"] + limit := queryInt(r, "limit", 100) + offset := queryInt(r, "offset", 0) + messages, total, err := s.db.GetChannelMessages(hash, limit, offset) + if err != nil { + writeError(w, 500, err.Error()) + return + } + writeJSON(w, map[string]interface{}{"messages": messages, "total": total}) +} + +func (s *Server) handleObservers(w http.ResponseWriter, r *http.Request) { + observers, err := s.db.GetObservers() + if err != nil { + writeError(w, 500, err.Error()) + return + } + result := make([]map[string]interface{}, 0, len(observers)) + for _, o := range observers { + m := map[string]interface{}{ + "id": o.ID, "name": o.Name, "iata": o.IATA, + "last_seen": o.LastSeen, "first_seen": o.FirstSeen, + "packet_count": o.PacketCount, + "model": o.Model, "firmware": o.Firmware, + "client_version": o.ClientVersion, "radio": o.Radio, + "battery_mv": o.BatteryMv, "uptime_secs": o.UptimeSecs, + "noise_floor": o.NoiseFloor, + "packetsLastHour": 0, + "lat": nil, "lon": nil, "nodeRole": nil, + } + result = append(result, m) + } + writeJSON(w, map[string]interface{}{ + "observers": result, + "server_time": time.Now().UTC().Format(time.RFC3339), + }) +} + +func (s *Server) handleObserverDetail(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + obs, err := s.db.GetObserverByID(id) + if err != nil || obs == nil { + writeError(w, 404, "Observer not found") + return + } + writeJSON(w, map[string]interface{}{ + "id": obs.ID, "name": obs.Name, "iata": obs.IATA, + "last_seen": obs.LastSeen, "first_seen": obs.FirstSeen, + "packet_count": obs.PacketCount, + "model": obs.Model, "firmware": obs.Firmware, + "client_version": obs.ClientVersion, "radio": obs.Radio, + "battery_mv": obs.BatteryMv, "uptime_secs": obs.UptimeSecs, + "noise_floor": obs.NoiseFloor, + "packetsLastHour": 0, + }) +} + +func (s *Server) handleObserverAnalytics(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + days := queryInt(r, "days", 7) + since := time.Now().Add(-time.Duration(days) * 24 * time.Hour).Format(time.RFC3339) + + // Timeline + bucketH := 4 + if days <= 1 { + bucketH = 1 + } else if days > 7 { + bucketH = 24 + } + _ = bucketH + + // Packet type breakdown + ptSQL := `SELECT payload_type, COUNT(*) as count FROM packets_v WHERE observer_id = ? AND timestamp > ? GROUP BY payload_type` + ptRows, _ := s.db.conn.Query(ptSQL, id, since) + packetTypes := map[string]interface{}{} + if ptRows != nil { + defer ptRows.Close() + for ptRows.Next() { + var pt, count int + ptRows.Scan(&pt, &count) + packetTypes[strconv.Itoa(pt)] = count + } + } + + // Recent packets + rpSQL := `SELECT id, raw_hex, timestamp, observer_id, observer_name, direction, snr, rssi, score, hash, route_type, payload_type, payload_version, path_json, decoded_json, created_at + FROM packets_v WHERE observer_id = ? AND timestamp > ? ORDER BY timestamp DESC LIMIT 20` + rpRows, _ := s.db.conn.Query(rpSQL, id, since) + recentPackets := make([]map[string]interface{}, 0) + if rpRows != nil { + defer rpRows.Close() + for rpRows.Next() { + p := scanPacketRow(rpRows) + if p != nil { + recentPackets = append(recentPackets, p) + } + } + } + + writeJSON(w, map[string]interface{}{ + "timeline": []interface{}{}, + "packetTypes": packetTypes, + "nodesTimeline": []interface{}{}, + "snrDistribution": []interface{}{}, + "recentPackets": recentPackets, + }) +} + +func (s *Server) handleTraces(w http.ResponseWriter, r *http.Request) { + hash := mux.Vars(r)["hash"] + traces, err := s.db.GetTraces(hash) + if err != nil { + writeError(w, 500, err.Error()) + return + } + writeJSON(w, map[string]interface{}{"traces": traces}) +} + +func (s *Server) handleIATACoords(w http.ResponseWriter, r *http.Request) { + // Return empty coords — full IATA coordinate table would be in a shared package + writeJSON(w, map[string]interface{}{"coords": map[string]interface{}{}}) +} + +func (s *Server) handleAudioLabBuckets(w http.ResponseWriter, r *http.Request) { + // Query representative packets by type + ptSQL := `SELECT payload_type, id, raw_hex, hash, decoded_json, path_json, observer_id, timestamp + FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY payload_type ORDER BY length(raw_hex)) as rn + FROM packets_v WHERE raw_hex IS NOT NULL + ) sub WHERE rn <= 8` + rows, err := s.db.conn.Query(ptSQL) + if err != nil { + writeJSON(w, map[string]interface{}{"buckets": map[string]interface{}{}}) + return + } + defer rows.Close() + + ptNames := map[int]string{0: "REQ", 1: "RESPONSE", 2: "TXT_MSG", 3: "ACK", 4: "ADVERT", 5: "GRP_TXT", 7: "ANON_REQ", 8: "PATH", 9: "TRACE", 11: "CONTROL"} + buckets := map[string][]map[string]interface{}{} + for rows.Next() { + var pt, id int + var rawHex, hash, decodedJSON, pathJSON, obsID, ts sql.NullString + rows.Scan(&pt, &id, &rawHex, &hash, &decodedJSON, &pathJSON, &obsID, &ts) + typeName := ptNames[pt] + if typeName == "" { + typeName = "UNKNOWN" + } + if _, ok := buckets[typeName]; !ok { + buckets[typeName] = make([]map[string]interface{}, 0) + } + buckets[typeName] = append(buckets[typeName], map[string]interface{}{ + "hash": nullStr(hash), "raw_hex": nullStr(rawHex), + "decoded_json": nullStr(decodedJSON), "observation_count": 1, + "payload_type": pt, "path_json": nullStr(pathJSON), + "observer_id": nullStr(obsID), "timestamp": nullStr(ts), + }) + } + writeJSON(w, map[string]interface{}{"buckets": buckets}) +} + +// --- Helpers --- + +func writeJSON(w http.ResponseWriter, v interface{}) { + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(v); err != nil { + log.Printf("[routes] JSON encode error: %v", err) + } +} + +func writeError(w http.ResponseWriter, code int, msg string) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + json.NewEncoder(w).Encode(map[string]string{"error": msg}) +} + +func queryInt(r *http.Request, key string, def int) int { + v := r.URL.Query().Get(key) + if v == "" { + return def + } + n, err := strconv.Atoi(v) + if err != nil { + return def + } + return n +} + +func mergeMap(base map[string]interface{}, overlays ...map[string]interface{}) map[string]interface{} { + result := make(map[string]interface{}) + for k, v := range base { + result[k] = v + } + for _, o := range overlays { + if o == nil { + continue + } + for k, v := range o { + result[k] = v + } + } + return result +} + +func safeAvg(total, count float64) float64 { + if count == 0 { + return 0 + } + return round(total/count, 1) +} + +func round(val float64, places int) float64 { + m := 1.0 + for i := 0; i < places; i++ { + m *= 10 + } + return float64(int(val*m+0.5)) / m +} + +func percentile(sorted []float64, p float64) float64 { + if len(sorted) == 0 { + return 0 + } + idx := int(float64(len(sorted)) * p) + if idx >= len(sorted) { + idx = len(sorted) - 1 + } + return sorted[idx] +} + +func sortedCopy(arr []float64) []float64 { + cp := make([]float64, len(arr)) + copy(cp, arr) + for i := 0; i < len(cp); i++ { + for j := i + 1; j < len(cp); j++ { + if cp[j] < cp[i] { + cp[i], cp[j] = cp[j], cp[i] + } + } + } + return cp +} + +func lastN(arr []map[string]interface{}, n int) []map[string]interface{} { + if len(arr) <= n { + return arr + } + return arr[len(arr)-n:] +} diff --git a/cmd/server/websocket.go b/cmd/server/websocket.go new file mode 100644 index 00000000..3ee50fe3 --- /dev/null +++ b/cmd/server/websocket.go @@ -0,0 +1,186 @@ +package main + +import ( + "encoding/json" + "log" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 4096, + CheckOrigin: func(r *http.Request) bool { return true }, +} + +// Hub manages WebSocket clients and broadcasts. +type Hub struct { + mu sync.RWMutex + clients map[*Client]bool +} + +// Client is a single WebSocket connection. +type Client struct { + conn *websocket.Conn + send chan []byte +} + +func NewHub() *Hub { + return &Hub{ + clients: make(map[*Client]bool), + } +} + +func (h *Hub) ClientCount() int { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.clients) +} + +func (h *Hub) Register(c *Client) { + h.mu.Lock() + h.clients[c] = true + h.mu.Unlock() + log.Printf("[ws] client connected (%d total)", h.ClientCount()) +} + +func (h *Hub) Unregister(c *Client) { + h.mu.Lock() + if _, ok := h.clients[c]; ok { + delete(h.clients, c) + close(c.send) + } + h.mu.Unlock() + log.Printf("[ws] client disconnected (%d total)", h.ClientCount()) +} + +// Broadcast sends a message to all connected clients. +func (h *Hub) Broadcast(msg interface{}) { + data, err := json.Marshal(msg) + if err != nil { + log.Printf("[ws] marshal error: %v", err) + return + } + h.mu.RLock() + defer h.mu.RUnlock() + for c := range h.clients { + select { + case c.send <- data: + default: + // Client buffer full — drop + } + } +} + +// ServeWS handles the WebSocket upgrade and runs the client. +func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("[ws] upgrade error: %v", err) + return + } + + client := &Client{ + conn: conn, + send: make(chan []byte, 256), + } + h.Register(client) + + go client.writePump() + go client.readPump(h) +} + +func (c *Client) readPump(hub *Hub) { + defer func() { + hub.Unregister(c) + c.conn.Close() + }() + c.conn.SetReadLimit(512) + c.conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + c.conn.SetPongHandler(func(string) error { + c.conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + for { + _, _, err := c.conn.ReadMessage() + if err != nil { + break + } + } +} + +func (c *Client) writePump() { + ticker := time.NewTicker(30 * time.Second) + defer func() { + ticker.Stop() + c.conn.Close() + }() + for { + select { + case message, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if !ok { + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil { + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +// Poller watches for new transmissions in SQLite and broadcasts them. +type Poller struct { + db *DB + hub *Hub + interval time.Duration + stop chan struct{} +} + +func NewPoller(db *DB, hub *Hub, interval time.Duration) *Poller { + return &Poller{db: db, hub: hub, interval: interval, stop: make(chan struct{})} +} + +func (p *Poller) Start() { + lastID := p.db.GetMaxTransmissionID() + log.Printf("[poller] starting from transmission ID %d, interval %v", lastID, p.interval) + + ticker := time.NewTicker(p.interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + newTxs, err := p.db.GetNewTransmissionsSince(lastID, 100) + if err != nil { + log.Printf("[poller] error: %v", err) + continue + } + for _, tx := range newTxs { + id, _ := tx["id"].(int) + if id > lastID { + lastID = id + } + p.hub.Broadcast(map[string]interface{}{ + "type": "packet", + "data": tx, + }) + } + case <-p.stop: + return + } + } +} + +func (p *Poller) Stop() { + close(p.stop) +}