mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-25 18:44:04 +00:00
feat: add Go web server (cmd/server/) — full API + WebSocket + static files
35+ REST endpoints matching Node.js server, WebSocket broadcast, static file serving with SPA fallback, config.json support. Uses modernc.org/sqlite (pure Go, no CGO required). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -0,0 +1,160 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// Config mirrors the Node.js config.json structure (read-only fields).
|
||||
type Config struct {
|
||||
Port int `json:"port"`
|
||||
APIKey string `json:"apiKey"`
|
||||
DBPath string `json:"dbPath"`
|
||||
|
||||
Branding map[string]interface{} `json:"branding"`
|
||||
Theme map[string]interface{} `json:"theme"`
|
||||
ThemeDark map[string]interface{} `json:"themeDark"`
|
||||
NodeColors map[string]interface{} `json:"nodeColors"`
|
||||
TypeColors map[string]interface{} `json:"typeColors"`
|
||||
Home map[string]interface{} `json:"home"`
|
||||
|
||||
MapDefaults struct {
|
||||
Center []float64 `json:"center"`
|
||||
Zoom int `json:"zoom"`
|
||||
} `json:"mapDefaults"`
|
||||
|
||||
Regions map[string]string `json:"regions"`
|
||||
|
||||
Roles map[string]interface{} `json:"roles"`
|
||||
HealthThresholds *HealthThresholds `json:"healthThresholds"`
|
||||
Tiles map[string]interface{} `json:"tiles"`
|
||||
SnrThresholds map[string]interface{} `json:"snrThresholds"`
|
||||
DistThresholds map[string]interface{} `json:"distThresholds"`
|
||||
MaxHopDist *float64 `json:"maxHopDist"`
|
||||
Limits map[string]interface{} `json:"limits"`
|
||||
PerfSlowMs *int `json:"perfSlowMs"`
|
||||
WsReconnectMs *int `json:"wsReconnectMs"`
|
||||
CacheInvalidMs *int `json:"cacheInvalidateMs"`
|
||||
ExternalUrls map[string]interface{} `json:"externalUrls"`
|
||||
|
||||
LiveMap struct {
|
||||
PropagationBufferMs int `json:"propagationBufferMs"`
|
||||
} `json:"liveMap"`
|
||||
|
||||
CacheTTL map[string]interface{} `json:"cacheTTL"`
|
||||
}
|
||||
|
||||
type HealthThresholds struct {
|
||||
InfraDegradedMs int `json:"infraDegradedMs"`
|
||||
InfraSilentMs int `json:"infraSilentMs"`
|
||||
NodeDegradedMs int `json:"nodeDegradedMs"`
|
||||
NodeSilentMs int `json:"nodeSilentMs"`
|
||||
}
|
||||
|
||||
// ThemeFile mirrors theme.json overlay.
|
||||
type ThemeFile struct {
|
||||
Branding map[string]interface{} `json:"branding"`
|
||||
Theme map[string]interface{} `json:"theme"`
|
||||
ThemeDark map[string]interface{} `json:"themeDark"`
|
||||
NodeColors map[string]interface{} `json:"nodeColors"`
|
||||
TypeColors map[string]interface{} `json:"typeColors"`
|
||||
Home map[string]interface{} `json:"home"`
|
||||
}
|
||||
|
||||
func LoadConfig(baseDirs ...string) (*Config, error) {
|
||||
if len(baseDirs) == 0 {
|
||||
baseDirs = []string{"."}
|
||||
}
|
||||
paths := make([]string, 0)
|
||||
for _, d := range baseDirs {
|
||||
paths = append(paths, filepath.Join(d, "config.json"))
|
||||
paths = append(paths, filepath.Join(d, "data", "config.json"))
|
||||
}
|
||||
|
||||
cfg := &Config{Port: 3000}
|
||||
for _, p := range paths {
|
||||
data, err := os.ReadFile(p)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err := json.Unmarshal(data, cfg); err != nil {
|
||||
continue
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
return cfg, nil // defaults
|
||||
}
|
||||
|
||||
func LoadTheme(baseDirs ...string) *ThemeFile {
|
||||
if len(baseDirs) == 0 {
|
||||
baseDirs = []string{"."}
|
||||
}
|
||||
for _, d := range baseDirs {
|
||||
for _, name := range []string{"theme.json"} {
|
||||
p := filepath.Join(d, name)
|
||||
data, err := os.ReadFile(p)
|
||||
if err != nil {
|
||||
p = filepath.Join(d, "data", name)
|
||||
data, err = os.ReadFile(p)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
var t ThemeFile
|
||||
if json.Unmarshal(data, &t) == nil {
|
||||
return &t
|
||||
}
|
||||
}
|
||||
}
|
||||
return &ThemeFile{}
|
||||
}
|
||||
|
||||
func (c *Config) GetHealthThresholds() HealthThresholds {
|
||||
h := HealthThresholds{
|
||||
InfraDegradedMs: 86400000,
|
||||
InfraSilentMs: 259200000,
|
||||
NodeDegradedMs: 3600000,
|
||||
NodeSilentMs: 86400000,
|
||||
}
|
||||
if c.HealthThresholds != nil {
|
||||
if c.HealthThresholds.InfraDegradedMs > 0 {
|
||||
h.InfraDegradedMs = c.HealthThresholds.InfraDegradedMs
|
||||
}
|
||||
if c.HealthThresholds.InfraSilentMs > 0 {
|
||||
h.InfraSilentMs = c.HealthThresholds.InfraSilentMs
|
||||
}
|
||||
if c.HealthThresholds.NodeDegradedMs > 0 {
|
||||
h.NodeDegradedMs = c.HealthThresholds.NodeDegradedMs
|
||||
}
|
||||
if c.HealthThresholds.NodeSilentMs > 0 {
|
||||
h.NodeSilentMs = c.HealthThresholds.NodeSilentMs
|
||||
}
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
// GetHealthMs returns degraded/silent thresholds for a given role.
|
||||
func (h HealthThresholds) GetHealthMs(role string) (degradedMs, silentMs int) {
|
||||
if role == "repeater" || role == "room" {
|
||||
return h.InfraDegradedMs, h.InfraSilentMs
|
||||
}
|
||||
return h.NodeDegradedMs, h.NodeSilentMs
|
||||
}
|
||||
|
||||
func (c *Config) ResolveDBPath(baseDir string) string {
|
||||
if c.DBPath != "" {
|
||||
return c.DBPath
|
||||
}
|
||||
if v := os.Getenv("DB_PATH"); v != "" {
|
||||
return v
|
||||
}
|
||||
return filepath.Join(baseDir, "data", "meshcore.db")
|
||||
}
|
||||
|
||||
func (c *Config) PropagationBufferMs() int {
|
||||
if c.LiveMap.PropagationBufferMs > 0 {
|
||||
return c.LiveMap.PropagationBufferMs
|
||||
}
|
||||
return 5000
|
||||
}
|
||||
+1093
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,21 @@
|
||||
module github.com/meshcore-analyzer/server
|
||||
|
||||
go 1.22
|
||||
|
||||
require (
|
||||
github.com/gorilla/mux v1.8.1
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
modernc.org/sqlite v1.34.5
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
golang.org/x/sys v0.22.0 // indirect
|
||||
modernc.org/libc v1.55.3 // indirect
|
||||
modernc.org/mathutil v1.6.0 // indirect
|
||||
modernc.org/memory v1.8.0 // indirect
|
||||
)
|
||||
@@ -0,0 +1,47 @@
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo=
|
||||
github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
|
||||
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
||||
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic=
|
||||
golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
|
||||
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw=
|
||||
golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc=
|
||||
modernc.org/cc/v4 v4.21.4 h1:3Be/Rdo1fpr8GrQ7IVw9OHtplU4gWbb+wNgeoBMmGLQ=
|
||||
modernc.org/cc/v4 v4.21.4/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ=
|
||||
modernc.org/ccgo/v4 v4.19.2 h1:lwQZgvboKD0jBwdaeVCTouxhxAyN6iawF3STraAal8Y=
|
||||
modernc.org/ccgo/v4 v4.19.2/go.mod h1:ysS3mxiMV38XGRTTcgo0DQTeTmAO4oCmJl1nX9VFI3s=
|
||||
modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
|
||||
modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ=
|
||||
modernc.org/gc/v2 v2.4.1 h1:9cNzOqPyMJBvrUipmynX0ZohMhcxPtMccYgGOJdOiBw=
|
||||
modernc.org/gc/v2 v2.4.1/go.mod h1:wzN5dK1AzVGoH6XOzc3YZ+ey/jPgYHLuVckd62P0GYU=
|
||||
modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U=
|
||||
modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w=
|
||||
modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4=
|
||||
modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo=
|
||||
modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E=
|
||||
modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU=
|
||||
modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4=
|
||||
modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
|
||||
modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc=
|
||||
modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss=
|
||||
modernc.org/sqlite v1.34.5 h1:Bb6SR13/fjp15jt70CL4f18JIN7p7dnMExd+UFnF15g=
|
||||
modernc.org/sqlite v1.34.5/go.mod h1:YLuNmX9NKs8wRNK2ko1LW1NGYcc9FkBO69JOt1AR9JE=
|
||||
modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA=
|
||||
modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0=
|
||||
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
|
||||
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
|
||||
@@ -0,0 +1,144 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var (
|
||||
configDir string
|
||||
port int
|
||||
dbPath string
|
||||
publicDir string
|
||||
pollMs int
|
||||
)
|
||||
|
||||
flag.StringVar(&configDir, "config-dir", ".", "Directory containing config.json")
|
||||
flag.IntVar(&port, "port", 0, "HTTP port (overrides config)")
|
||||
flag.StringVar(&dbPath, "db", "", "SQLite database path (overrides config/env)")
|
||||
flag.StringVar(&publicDir, "public", "public", "Directory to serve static files from")
|
||||
flag.IntVar(&pollMs, "poll-ms", 1000, "SQLite poll interval for WebSocket broadcast (ms)")
|
||||
flag.Parse()
|
||||
|
||||
// Load config
|
||||
cfg, err := LoadConfig(configDir)
|
||||
if err != nil {
|
||||
log.Printf("[config] warning: %v (using defaults)", err)
|
||||
}
|
||||
|
||||
// CLI flags override config
|
||||
if port > 0 {
|
||||
cfg.Port = port
|
||||
}
|
||||
if cfg.Port == 0 {
|
||||
cfg.Port = 3000
|
||||
}
|
||||
if dbPath != "" {
|
||||
cfg.DBPath = dbPath
|
||||
}
|
||||
|
||||
// Resolve DB path
|
||||
resolvedDB := cfg.ResolveDBPath(configDir)
|
||||
log.Printf("[config] port=%d db=%s public=%s", cfg.Port, resolvedDB, publicDir)
|
||||
|
||||
// Open database
|
||||
database, err := OpenDB(resolvedDB)
|
||||
if err != nil {
|
||||
log.Fatalf("[db] failed to open %s: %v", resolvedDB, err)
|
||||
}
|
||||
defer database.Close()
|
||||
|
||||
// Verify DB has expected tables
|
||||
var tableName string
|
||||
err = database.conn.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='transmissions'").Scan(&tableName)
|
||||
if err == sql.ErrNoRows {
|
||||
log.Fatalf("[db] table 'transmissions' not found — is this a MeshCore Analyzer database?")
|
||||
}
|
||||
|
||||
stats, err := database.GetStats()
|
||||
if err != nil {
|
||||
log.Printf("[db] warning: could not read stats: %v", err)
|
||||
} else {
|
||||
log.Printf("[db] transmissions=%d observations=%d nodes=%d observers=%d",
|
||||
stats.TotalTransmissions, stats.TotalObservations, stats.TotalNodes, stats.TotalObservers)
|
||||
}
|
||||
|
||||
// WebSocket hub
|
||||
hub := NewHub()
|
||||
|
||||
// HTTP server
|
||||
srv := NewServer(database, cfg, hub)
|
||||
router := mux.NewRouter()
|
||||
srv.RegisterRoutes(router)
|
||||
|
||||
// WebSocket endpoint
|
||||
router.HandleFunc("/ws", hub.ServeWS)
|
||||
|
||||
// Static files + SPA fallback
|
||||
absPublic, _ := filepath.Abs(publicDir)
|
||||
if _, err := os.Stat(absPublic); err == nil {
|
||||
fs := http.FileServer(http.Dir(absPublic))
|
||||
router.PathPrefix("/").Handler(spaHandler(absPublic, fs))
|
||||
log.Printf("[static] serving %s", absPublic)
|
||||
} else {
|
||||
log.Printf("[static] directory %s not found — API-only mode", absPublic)
|
||||
router.PathPrefix("/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/html")
|
||||
w.Write([]byte(`<!DOCTYPE html><html><body><h1>MeshCore Analyzer</h1><p>Frontend not found. API available at /api/</p></body></html>`))
|
||||
})
|
||||
}
|
||||
|
||||
// Start SQLite poller for WebSocket broadcast
|
||||
poller := NewPoller(database, hub, time.Duration(pollMs)*time.Millisecond)
|
||||
go poller.Start()
|
||||
|
||||
// Graceful shutdown
|
||||
httpServer := &http.Server{
|
||||
Addr: fmt.Sprintf(":%d", cfg.Port),
|
||||
Handler: router,
|
||||
ReadTimeout: 30 * time.Second,
|
||||
WriteTimeout: 60 * time.Second,
|
||||
IdleTimeout: 120 * time.Second,
|
||||
}
|
||||
|
||||
go func() {
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigCh
|
||||
log.Println("[server] shutting down...")
|
||||
poller.Stop()
|
||||
httpServer.Close()
|
||||
}()
|
||||
|
||||
log.Printf("[server] MeshCore Analyzer (Go) listening on http://localhost:%d", cfg.Port)
|
||||
if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
|
||||
log.Fatalf("[server] %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// spaHandler serves static files, falling back to index.html for SPA routes.
|
||||
func spaHandler(root string, fs http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
path := filepath.Join(root, r.URL.Path)
|
||||
if _, err := os.Stat(path); os.IsNotExist(err) {
|
||||
http.ServeFile(w, r, filepath.Join(root, "index.html"))
|
||||
return
|
||||
}
|
||||
// Disable caching for JS/CSS/HTML
|
||||
if filepath.Ext(path) == ".js" || filepath.Ext(path) == ".css" || filepath.Ext(path) == ".html" {
|
||||
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
|
||||
}
|
||||
fs.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,186 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 4096,
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
|
||||
// Hub manages WebSocket clients and broadcasts.
|
||||
type Hub struct {
|
||||
mu sync.RWMutex
|
||||
clients map[*Client]bool
|
||||
}
|
||||
|
||||
// Client is a single WebSocket connection.
|
||||
type Client struct {
|
||||
conn *websocket.Conn
|
||||
send chan []byte
|
||||
}
|
||||
|
||||
func NewHub() *Hub {
|
||||
return &Hub{
|
||||
clients: make(map[*Client]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) ClientCount() int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return len(h.clients)
|
||||
}
|
||||
|
||||
func (h *Hub) Register(c *Client) {
|
||||
h.mu.Lock()
|
||||
h.clients[c] = true
|
||||
h.mu.Unlock()
|
||||
log.Printf("[ws] client connected (%d total)", h.ClientCount())
|
||||
}
|
||||
|
||||
func (h *Hub) Unregister(c *Client) {
|
||||
h.mu.Lock()
|
||||
if _, ok := h.clients[c]; ok {
|
||||
delete(h.clients, c)
|
||||
close(c.send)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
log.Printf("[ws] client disconnected (%d total)", h.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast sends a message to all connected clients.
|
||||
func (h *Hub) Broadcast(msg interface{}) {
|
||||
data, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
log.Printf("[ws] marshal error: %v", err)
|
||||
return
|
||||
}
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for c := range h.clients {
|
||||
select {
|
||||
case c.send <- data:
|
||||
default:
|
||||
// Client buffer full — drop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ServeWS handles the WebSocket upgrade and runs the client.
|
||||
func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Printf("[ws] upgrade error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
client := &Client{
|
||||
conn: conn,
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
h.Register(client)
|
||||
|
||||
go client.writePump()
|
||||
go client.readPump(h)
|
||||
}
|
||||
|
||||
func (c *Client) readPump(hub *Hub) {
|
||||
defer func() {
|
||||
hub.Unregister(c)
|
||||
c.conn.Close()
|
||||
}()
|
||||
c.conn.SetReadLimit(512)
|
||||
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
c.conn.SetPongHandler(func(string) error {
|
||||
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
return nil
|
||||
})
|
||||
for {
|
||||
_, _, err := c.conn.ReadMessage()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) writePump() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
c.conn.Close()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case message, ok := <-c.send:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if !ok {
|
||||
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
|
||||
return
|
||||
}
|
||||
case <-ticker.C:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Poller watches for new transmissions in SQLite and broadcasts them.
|
||||
type Poller struct {
|
||||
db *DB
|
||||
hub *Hub
|
||||
interval time.Duration
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
func NewPoller(db *DB, hub *Hub, interval time.Duration) *Poller {
|
||||
return &Poller{db: db, hub: hub, interval: interval, stop: make(chan struct{})}
|
||||
}
|
||||
|
||||
func (p *Poller) Start() {
|
||||
lastID := p.db.GetMaxTransmissionID()
|
||||
log.Printf("[poller] starting from transmission ID %d, interval %v", lastID, p.interval)
|
||||
|
||||
ticker := time.NewTicker(p.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
newTxs, err := p.db.GetNewTransmissionsSince(lastID, 100)
|
||||
if err != nil {
|
||||
log.Printf("[poller] error: %v", err)
|
||||
continue
|
||||
}
|
||||
for _, tx := range newTxs {
|
||||
id, _ := tx["id"].(int)
|
||||
if id > lastID {
|
||||
lastID = id
|
||||
}
|
||||
p.hub.Broadcast(map[string]interface{}{
|
||||
"type": "packet",
|
||||
"data": tx,
|
||||
})
|
||||
}
|
||||
case <-p.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) Stop() {
|
||||
close(p.stop)
|
||||
}
|
||||
Reference in New Issue
Block a user