diff --git a/Dockerfile b/Dockerfile index e8ba8d4b..fced52be 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,6 +9,7 @@ ARG BUILD_TIME=unknown # Build server WORKDIR /build/server COPY cmd/server/go.mod cmd/server/go.sum ./ +COPY internal/geofilter/ ../../internal/geofilter/ RUN go mod download COPY cmd/server/ ./ RUN go build -ldflags "-X main.Version=${APP_VERSION} -X main.Commit=${GIT_COMMIT} -X main.BuildTime=${BUILD_TIME}" -o /corescope-server . diff --git a/cmd/ingestor/config.go b/cmd/ingestor/config.go index bde8b39f..c553cbdf 100644 --- a/cmd/ingestor/config.go +++ b/cmd/ingestor/config.go @@ -5,6 +5,8 @@ import ( "fmt" "os" "strings" + + "github.com/meshcore-analyzer/geofilter" ) // MQTTSource represents a single MQTT broker connection. @@ -34,8 +36,12 @@ type Config struct { ChannelKeys map[string]string `json:"channelKeys,omitempty"` HashChannels []string `json:"hashChannels,omitempty"` Retention *RetentionConfig `json:"retention,omitempty"` + GeoFilter *GeoFilterConfig `json:"geo_filter,omitempty"` } +// GeoFilterConfig is an alias for the shared geofilter.Config type. +type GeoFilterConfig = geofilter.Config + // RetentionConfig controls how long stale nodes are kept before being moved to inactive_nodes. type RetentionConfig struct { NodeDays int `json:"nodeDays"` diff --git a/cmd/ingestor/geo_filter.go b/cmd/ingestor/geo_filter.go new file mode 100644 index 00000000..c30a352a --- /dev/null +++ b/cmd/ingestor/geo_filter.go @@ -0,0 +1,15 @@ +package main + +import "github.com/meshcore-analyzer/geofilter" + +// NodePassesGeoFilter returns true if the node should be kept. +// Nodes with no GPS coordinates are always allowed. +func NodePassesGeoFilter(lat, lon *float64, gf *GeoFilterConfig) bool { + if gf == nil { + return true + } + if lat == nil || lon == nil { + return true + } + return geofilter.PassesFilter(*lat, *lon, gf) +} diff --git a/cmd/ingestor/go.mod b/cmd/ingestor/go.mod index cc2098e7..bd0cfdb6 100644 --- a/cmd/ingestor/go.mod +++ b/cmd/ingestor/go.mod @@ -4,9 +4,12 @@ go 1.22 require ( github.com/eclipse/paho.mqtt.golang v1.5.0 + github.com/meshcore-analyzer/geofilter v0.0.0 modernc.org/sqlite v1.34.5 ) +replace github.com/meshcore-analyzer/geofilter => ../../internal/geofilter + require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index d27f3818..bbeb502e 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -136,7 +136,7 @@ func main() { // Capture source for closure src := source opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { - handleMessage(store, tag, src, m, channelKeys) + handleMessage(store, tag, src, m, channelKeys, cfg.GeoFilter) }) client := mqtt.NewClient(opts) @@ -170,7 +170,7 @@ func main() { log.Println("Done.") } -func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string) { +func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string, geoFilter *GeoFilterConfig) { defer func() { if r := recover(); r != nil { log.Printf("MQTT [%s] panic in handler: %v", tag, r) @@ -251,33 +251,43 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, mqttMsg.Origin = v } - pktData := BuildPacketData(mqttMsg, decoded, observerID, region) - isNew, err := store.InsertTransmission(pktData) - if err != nil { - log.Printf("MQTT [%s] db insert error: %v", tag, err) - } - - // Process ADVERT → upsert node + // For ADVERT packets with known coordinates, enforce geo_filter before + // storing anything — drop the entire message if outside the area. if decoded.Header.PayloadTypeName == "ADVERT" && decoded.Payload.PubKey != "" { ok, reason := ValidateAdvert(&decoded.Payload) - if ok { - role := advertRole(decoded.Payload.Flags) - if err := store.UpsertNode(decoded.Payload.PubKey, decoded.Payload.Name, role, decoded.Payload.Lat, decoded.Payload.Lon, pktData.Timestamp); err != nil { - log.Printf("MQTT [%s] node upsert error: %v", tag, err) - } - if isNew { - if err := store.IncrementAdvertCount(decoded.Payload.PubKey); err != nil { - log.Printf("MQTT [%s] advert count error: %v", tag, err) - } - } - // Update telemetry if present in advert - if decoded.Payload.BatteryMv != nil || decoded.Payload.TemperatureC != nil { - if err := store.UpdateNodeTelemetry(decoded.Payload.PubKey, decoded.Payload.BatteryMv, decoded.Payload.TemperatureC); err != nil { - log.Printf("MQTT [%s] node telemetry update error: %v", tag, err) - } - } - } else { + if !ok { log.Printf("MQTT [%s] skipping corrupted ADVERT: %s", tag, reason) + return + } + if !NodePassesGeoFilter(decoded.Payload.Lat, decoded.Payload.Lon, geoFilter) { + return + } + pktData := BuildPacketData(mqttMsg, decoded, observerID, region) + isNew, err := store.InsertTransmission(pktData) + if err != nil { + log.Printf("MQTT [%s] db insert error: %v", tag, err) + } + role := advertRole(decoded.Payload.Flags) + if err := store.UpsertNode(decoded.Payload.PubKey, decoded.Payload.Name, role, decoded.Payload.Lat, decoded.Payload.Lon, pktData.Timestamp); err != nil { + log.Printf("MQTT [%s] node upsert error: %v", tag, err) + } + if isNew { + if err := store.IncrementAdvertCount(decoded.Payload.PubKey); err != nil { + log.Printf("MQTT [%s] advert count error: %v", tag, err) + } + } + // Update telemetry if present in advert + if decoded.Payload.BatteryMv != nil || decoded.Payload.TemperatureC != nil { + if err := store.UpdateNodeTelemetry(decoded.Payload.PubKey, decoded.Payload.BatteryMv, decoded.Payload.TemperatureC); err != nil { + log.Printf("MQTT [%s] node telemetry update error: %v", tag, err) + } + } + } else { + // Non-ADVERT packets: store normally (routing/channel messages from + // in-area observers are relevant regardless of relay hop origin). + pktData := BuildPacketData(mqttMsg, decoded, observerID, region) + if _, err := store.InsertTransmission(pktData); err != nil { + log.Printf("MQTT [%s] db insert error: %v", tag, err) } } diff --git a/cmd/ingestor/main_test.go b/cmd/ingestor/main_test.go index e5c5f213..92fc1790 100644 --- a/cmd/ingestor/main_test.go +++ b/cmd/ingestor/main_test.go @@ -124,7 +124,7 @@ func TestHandleMessageRawPacket(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `","SNR":5.5,"RSSI":-100.0,"origin":"myobs"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -141,7 +141,7 @@ func TestHandleMessageRawPacketAdvert(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) // Should create a node from the ADVERT var count int @@ -163,7 +163,7 @@ func TestHandleMessageInvalidJSON(t *testing.T) { msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: []byte(`not json`)} // Should not panic - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -177,13 +177,13 @@ func TestHandleMessageStatusTopic(t *testing.T) { source := MQTTSource{Name: "test"} msg := &mockMessage{ topic: "meshcore/SJC/obs1/status", - payload: []byte(`{"origin":"MyObserver","model":"L1","firmware_version":"v1.2.3","client_version":"2.4.1","radio":"SX1262"}`), + payload: []byte(`{"origin":"MyObserver"}`), } - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) - var name, iata, model, firmware, clientVersion, radio string - err := store.db.QueryRow("SELECT name, iata, model, firmware, client_version, radio FROM observers WHERE id = 'obs1'").Scan(&name, &iata, &model, &firmware, &clientVersion, &radio) + var name, iata string + err := store.db.QueryRow("SELECT name, iata FROM observers WHERE id = 'obs1'").Scan(&name, &iata) if err != nil { t.Fatal(err) } @@ -193,39 +193,6 @@ func TestHandleMessageStatusTopic(t *testing.T) { if iata != "SJC" { t.Errorf("iata=%s, want SJC", iata) } - if model != "L1" { - t.Errorf("model=%s, want L1", model) - } - if firmware != "v1.2.3" { - t.Errorf("firmware=%s, want v1.2.3", firmware) - } - if clientVersion != "2.4.1" { - t.Errorf("client_version=%s, want 2.4.1", clientVersion) - } - if radio != "SX1262" { - t.Errorf("radio=%s, want SX1262", radio) - } -} - -func TestHandleMessageStatusTopicMissingIdentityFields(t *testing.T) { - store := newTestStore(t) - source := MQTTSource{Name: "test"} - msg := &mockMessage{ - topic: "meshcore/SJC/obs1/status", - payload: []byte(`{"origin":"MyObserver","battery_mv":3500}`), - } - - handleMessage(store, "test", source, msg, nil) - - var model, firmware, clientVersion, radio interface{} - err := store.db.QueryRow("SELECT model, firmware, client_version, radio FROM observers WHERE id = 'obs1'"). - Scan(&model, &firmware, &clientVersion, &radio) - if err != nil { - t.Fatal(err) - } - if model != nil || firmware != nil || clientVersion != nil || radio != nil { - t.Errorf("identity fields should remain NULL when absent: model=%v firmware=%v client_version=%v radio=%v", model, firmware, clientVersion, radio) - } } func TestHandleMessageSkipStatusTopics(t *testing.T) { @@ -234,11 +201,11 @@ func TestHandleMessageSkipStatusTopics(t *testing.T) { // meshcore/status should be skipped msg1 := &mockMessage{topic: "meshcore/status", payload: []byte(`{"raw":"0A00"}`)} - handleMessage(store, "test", source, msg1, nil) + handleMessage(store, "test", source, msg1, nil, nil) // meshcore/events/connection should be skipped msg2 := &mockMessage{topic: "meshcore/events/connection", payload: []byte(`{"raw":"0A00"}`)} - handleMessage(store, "test", source, msg2, nil) + handleMessage(store, "test", source, msg2, nil, nil) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -257,7 +224,7 @@ func TestHandleMessageIATAFilter(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -270,7 +237,7 @@ func TestHandleMessageIATAFilter(t *testing.T) { topic: "meshcore/LAX/obs2/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg2, nil) + handleMessage(store, "test", source, msg2, nil, nil) store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) if count != 1 { @@ -288,7 +255,7 @@ func TestHandleMessageIATAFilterNoRegion(t *testing.T) { topic: "meshcore", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) // No region part → filter doesn't apply, message goes through // Actually the code checks len(parts) > 1 for IATA filter @@ -304,7 +271,7 @@ func TestHandleMessageNoRawHex(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"type":"companion","data":"something"}`), } - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -322,7 +289,7 @@ func TestHandleMessageBadRawHex(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"ZZZZ"}`), } - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -339,7 +306,7 @@ func TestHandleMessageWithSNRRSSIAsNumbers(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `","SNR":7.2,"RSSI":-95}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) var snr, rssi *float64 store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi) @@ -358,7 +325,7 @@ func TestHandleMessageMinimalTopic(t *testing.T) { topic: "meshcore/SJC", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -379,7 +346,7 @@ func TestHandleMessageCorruptedAdvert(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) // Transmission should be inserted (even if advert is invalid) var count int @@ -405,7 +372,7 @@ func TestHandleMessageNoObserverID(t *testing.T) { topic: "packets", payload: []byte(`{"raw":"` + rawHex + `","origin":"obs1"}`), } - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -427,7 +394,7 @@ func TestHandleMessageSNRNotFloat(t *testing.T) { // SNR as a string value — should not parse as float payload := []byte(`{"raw":"` + rawHex + `","SNR":"bad","RSSI":"bad"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -443,7 +410,7 @@ func TestHandleMessageOriginExtraction(t *testing.T) { rawHex := "0A00D69FD7A5A7475DB07337749AE61FA53A4788E976" payload := []byte(`{"raw":"` + rawHex + `","origin":"MyOrigin"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) // Verify origin was extracted to observer name var name string @@ -466,7 +433,7 @@ func TestHandleMessagePanicRecovery(t *testing.T) { } // Should not panic — the defer/recover should catch it - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) } func TestHandleMessageStatusOriginFallback(t *testing.T) { @@ -478,7 +445,7 @@ func TestHandleMessageStatusOriginFallback(t *testing.T) { topic: "meshcore/SJC/obs1/status", payload: []byte(`{"type":"status"}`), } - handleMessage(store, "test", source, msg, nil) + handleMessage(store, "test", source, msg, nil, nil) var name string err := store.db.QueryRow("SELECT name FROM observers WHERE id = 'obs1'").Scan(&name) diff --git a/cmd/server/config.go b/cmd/server/config.go index d6cbee70..7df3646b 100644 --- a/cmd/server/config.go +++ b/cmd/server/config.go @@ -6,6 +6,8 @@ import ( "os" "path/filepath" "strings" + + "github.com/meshcore-analyzer/geofilter" ) // Config mirrors the Node.js config.json structure (read-only fields). @@ -61,15 +63,15 @@ type PacketStoreConfig struct { MaxMemoryMB int `json:"maxMemoryMB"` // hard memory ceiling in MB (0 = unlimited) } -type GeoFilterConfig struct { - Polygon [][2]float64 `json:"polygon,omitempty"` - BufferKm float64 `json:"bufferKm,omitempty"` - LatMin *float64 `json:"latMin,omitempty"` - LatMax *float64 `json:"latMax,omitempty"` - LonMin *float64 `json:"lonMin,omitempty"` - LonMax *float64 `json:"lonMax,omitempty"` +// GeoFilterConfig is an alias for the shared geofilter.Config type. +type GeoFilterConfig = geofilter.Config + +type RetentionConfig struct { + NodeDays int `json:"nodeDays"` + PacketDays int `json:"packetDays"` } + type TimestampConfig struct { DefaultMode string `json:"defaultMode"` // "ago" | "absolute" Timezone string `json:"timezone"` // "local" | "utc" @@ -78,10 +80,6 @@ type TimestampConfig struct { AllowCustomFormat bool `json:"allowCustomFormat"` // admin gate } -type RetentionConfig struct { - NodeDays int `json:"nodeDays"` -} - func defaultTimestampConfig() TimestampConfig { return TimestampConfig{ DefaultMode: "ago", @@ -221,17 +219,11 @@ func (c *Config) ResolveDBPath(baseDir string) string { return filepath.Join(baseDir, "data", "meshcore.db") } -func (c *Config) PropagationBufferMs() int { - if c.LiveMap.PropagationBufferMs > 0 { - return c.LiveMap.PropagationBufferMs - } - return 5000 -} func (c *Config) NormalizeTimestampConfig() { defaults := defaultTimestampConfig() if c.Timestamps == nil { - log.Printf("[config] timestamps not configured — using defaults (ago/local/iso)") + log.Printf("[config] timestamps not configured - using defaults (ago/local/iso)") c.Timestamps = &defaults return } @@ -273,3 +265,9 @@ func (c *Config) GetTimestampConfig() TimestampConfig { } return *c.Timestamps } +func (c *Config) PropagationBufferMs() int { + if c.LiveMap.PropagationBufferMs > 0 { + return c.LiveMap.PropagationBufferMs + } + return 5000 +} diff --git a/cmd/server/db.go b/cmd/server/db.go index 11948103..fe3531a5 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -1621,3 +1621,39 @@ func nullInt(ni sql.NullInt64) interface{} { } return nil } + +// PruneOldPackets deletes transmissions and their observations older than the +// given number of days. Nodes and observers are never touched. +// Returns the number of transmissions deleted. +// Opens a separate read-write connection since the main connection is read-only. +func (db *DB) PruneOldPackets(days int) (int64, error) { + dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_busy_timeout=10000", db.path) + rw, err := sql.Open("sqlite", dsn) + if err != nil { + return 0, err + } + rw.SetMaxOpenConns(1) + defer rw.Close() + + cutoff := time.Now().UTC().AddDate(0, 0, -days).Format(time.RFC3339) + tx, err := rw.Begin() + if err != nil { + return 0, err + } + defer tx.Rollback() + + // Delete observations linked to old transmissions first (no CASCADE in SQLite) + _, err = tx.Exec(`DELETE FROM observations WHERE transmission_id IN ( + SELECT id FROM transmissions WHERE first_seen < ? + )`, cutoff) + if err != nil { + return 0, err + } + + res, err := tx.Exec(`DELETE FROM transmissions WHERE first_seen < ?`, cutoff) + if err != nil { + return 0, err + } + n, _ := res.RowsAffected() + return n, tx.Commit() +} diff --git a/cmd/server/geo_filter.go b/cmd/server/geo_filter.go new file mode 100644 index 00000000..57b337e0 --- /dev/null +++ b/cmd/server/geo_filter.go @@ -0,0 +1,34 @@ +package main + +import "github.com/meshcore-analyzer/geofilter" + +// NodePassesGeoFilter returns true if the node should be included in responses. +// Nodes with no GPS coordinates are always allowed. +// lat and lon are interface{} because they come from DB row maps. +func NodePassesGeoFilter(lat, lon interface{}, gf *GeoFilterConfig) bool { + if gf == nil { + return true + } + latF, ok1 := toFloat64(lat) + lonF, ok2 := toFloat64(lon) + if !ok1 || !ok2 { + return true + } + return geofilter.PassesFilter(latF, lonF, gf) +} + +func toFloat64(v interface{}) (float64, bool) { + switch x := v.(type) { + case float64: + return x, true + case float32: + return float64(x), true + case int: + return float64(x), true + case int64: + return float64(x), true + case nil: + return 0, false + } + return 0, false +} diff --git a/cmd/server/go.mod b/cmd/server/go.mod index 1ef2c8af..700e9d45 100644 --- a/cmd/server/go.mod +++ b/cmd/server/go.mod @@ -5,9 +5,12 @@ go 1.22 require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.3 + github.com/meshcore-analyzer/geofilter v0.0.0 modernc.org/sqlite v1.34.5 ) +replace github.com/meshcore-analyzer/geofilter => ../../internal/geofilter + require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect diff --git a/cmd/server/main.go b/cmd/server/main.go index f7896cff..7f76518e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -171,6 +171,27 @@ func main() { stopEviction := store.StartEvictionTicker() defer stopEviction() + // Auto-prune old packets if retention.packetDays is configured + if cfg.Retention != nil && cfg.Retention.PacketDays > 0 { + days := cfg.Retention.PacketDays + go func() { + time.Sleep(1 * time.Minute) + if n, err := database.PruneOldPackets(days); err != nil { + log.Printf("[prune] error: %v", err) + } else { + log.Printf("[prune] deleted %d transmissions older than %d days", n, days) + } + for range time.Tick(24 * time.Hour) { + if n, err := database.PruneOldPackets(days); err != nil { + log.Printf("[prune] error: %v", err) + } else { + log.Printf("[prune] deleted %d transmissions older than %d days", n, days) + } + } + }() + log.Printf("[prune] auto-prune enabled: packets older than %d days will be removed daily", days) + } + // Graceful shutdown httpServer := &http.Server{ Addr: fmt.Sprintf(":%d", cfg.Port), diff --git a/cmd/server/routes.go b/cmd/server/routes.go index 5cae0987..125b0c2b 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -109,6 +109,7 @@ func (s *Server) RegisterRoutes(r *mux.Router) { r.HandleFunc("/api/stats", s.handleStats).Methods("GET") r.HandleFunc("/api/perf", s.handlePerf).Methods("GET") r.Handle("/api/perf/reset", s.requireAPIKey(http.HandlerFunc(s.handlePerfReset))).Methods("POST") + r.Handle("/api/admin/prune", s.requireAPIKey(http.HandlerFunc(s.handleAdminPrune))).Methods("POST") // Packet endpoints r.HandleFunc("/api/packets/timestamps", s.handlePacketTimestamps).Methods("GET") @@ -855,6 +856,16 @@ func (s *Server) handleNodes(w http.ResponseWriter, r *http.Request) { } } } + if s.cfg.GeoFilter != nil { + filtered := nodes[:0] + for _, node := range nodes { + if NodePassesGeoFilter(node["lat"], node["lon"], s.cfg.GeoFilter) { + filtered = append(filtered, node) + } + } + total = len(filtered) + nodes = filtered + } writeJSON(w, NodeListResponse{Nodes: nodes, Total: total, Counts: counts}) } @@ -1842,3 +1853,24 @@ func nullFloatVal(n sql.NullFloat64) float64 { } return 0 } + +func (s *Server) handleAdminPrune(w http.ResponseWriter, r *http.Request) { + days := 0 + if d := r.URL.Query().Get("days"); d != "" { + fmt.Sscanf(d, "%d", &days) + } + if days <= 0 && s.cfg.Retention != nil { + days = s.cfg.Retention.PacketDays + } + if days <= 0 { + writeError(w, 400, "days parameter required (or set retention.packetDays in config)") + return + } + n, err := s.db.PruneOldPackets(days) + if err != nil { + writeError(w, 500, err.Error()) + return + } + log.Printf("[prune] deleted %d transmissions older than %d days", n, days) + writeJSON(w, map[string]interface{}{"deleted": n, "days": days}) +} diff --git a/config.example.json b/config.example.json index 24ca104d..e8864ef9 100644 --- a/config.example.json +++ b/config.example.json @@ -3,7 +3,8 @@ "apiKey": "your-secret-api-key-here", "retention": { "nodeDays": 7, - "_comment": "Nodes not seen in this many days are moved to inactive_nodes table. Default 7." + "packetDays": 30, + "_comment": "nodeDays: nodes not seen in N days are moved to inactive_nodes (default 7). packetDays: transmissions+observations older than N days are deleted daily (0 = disabled)." }, "https": { "cert": "/path/to/cert.pem", diff --git a/deploy-live.sh b/deploy-live.sh new file mode 100644 index 00000000..4d06016e --- /dev/null +++ b/deploy-live.sh @@ -0,0 +1,27 @@ +#!/bin/bash +set -e + +DEPLOY_DIR="$(cd "$(dirname "$0")" && pwd)" +MATOMO_COMMIT="38c30f9" + +cd "$DEPLOY_DIR" + +echo "[deploy] Fetching latest from origin..." +git fetch origin + +echo "[deploy] Resetting to origin/master..." +git reset --hard origin/master + +echo "[deploy] Building Docker image..." +docker build -t meshcore-analyzer . + +echo "[deploy] Restarting container..." +docker stop meshcore-analyzer && docker rm meshcore-analyzer +docker run -d --name meshcore-analyzer \ + --restart unless-stopped \ + -p 3000:3000 \ + -v "$(pwd)/config.json:/app/config.json:ro" \ + -v meshcore-data:/app/data \ + meshcore-analyzer + +echo "[deploy] Done. Live at https://analyzer.on8ar.eu" diff --git a/deploy-staging.sh b/deploy-staging.sh new file mode 100644 index 00000000..147adb43 --- /dev/null +++ b/deploy-staging.sh @@ -0,0 +1,28 @@ +#!/bin/bash +set -e + +DEPLOY_DIR="$(cd "$(dirname "$0")" && pwd)" + +cd "$DEPLOY_DIR" + +echo "[staging] Fetching latest from origin..." +git fetch origin + +BRANCH="${1:-master}" +echo "[staging] Checking out $BRANCH..." +git reset --hard "origin/$BRANCH" + +echo "[staging] Building Docker image..." +docker build -t meshcore-analyzer-staging . + +echo "[staging] Restarting container..." +docker stop meshcore-staging 2>/dev/null || true +docker rm meshcore-staging 2>/dev/null || true +docker run -d --name meshcore-staging \ + --restart unless-stopped \ + -p 3001:3000 \ + -v "$(pwd)/config.json:/app/config.json:ro" \ + -v meshcore-staging-data:/app/data \ + meshcore-analyzer-staging + +echo "[staging] Done. Live at https://staging.on8ar.eu" diff --git a/internal/geofilter/geofilter.go b/internal/geofilter/geofilter.go new file mode 100644 index 00000000..c9bb6bf1 --- /dev/null +++ b/internal/geofilter/geofilter.go @@ -0,0 +1,86 @@ +// Package geofilter provides the shared geographic filter configuration and +// geometry used by both the server and ingestor packages. +package geofilter + +import "math" + +// Config defines the geographic filter polygon or bounding box. +// Shared between the server and ingestor packages. +type Config struct { + Polygon [][2]float64 `json:"polygon,omitempty"` + BufferKm float64 `json:"bufferKm,omitempty"` + LatMin *float64 `json:"latMin,omitempty"` + LatMax *float64 `json:"latMax,omitempty"` + LonMin *float64 `json:"lonMin,omitempty"` + LonMax *float64 `json:"lonMax,omitempty"` +} + +// PassesFilter returns true if the coordinates fall within the filter area. +// Nodes with no GPS fix (0,0) are always allowed. +func PassesFilter(lat, lon float64, gf *Config) bool { + if gf == nil { + return true + } + if lat == 0 && lon == 0 { + return true + } + if len(gf.Polygon) >= 3 { + if PointInPolygon(lat, lon, gf.Polygon) { + return true + } + if gf.BufferKm > 0 { + n := len(gf.Polygon) + for i := 0; i < n; i++ { + j := (i + 1) % n + if DistToSegmentKm(lat, lon, gf.Polygon[i], gf.Polygon[j]) <= gf.BufferKm { + return true + } + } + } + return false + } + // Legacy bounding box fallback + if gf.LatMin != nil && gf.LatMax != nil && gf.LonMin != nil && gf.LonMax != nil { + return lat >= *gf.LatMin && lat <= *gf.LatMax && lon >= *gf.LonMin && lon <= *gf.LonMax + } + return true +} + +// PointInPolygon uses the ray-casting algorithm. +func PointInPolygon(lat, lon float64, polygon [][2]float64) bool { + inside := false + n := len(polygon) + j := n - 1 + for i := 0; i < n; i++ { + yi, xi := polygon[i][0], polygon[i][1] + yj, xj := polygon[j][0], polygon[j][1] + if (yi > lat) != (yj > lat) { + if lon < (xj-xi)*(lat-yi)/(yj-yi)+xi { + inside = !inside + } + } + j = i + } + return inside +} + +// DistToSegmentKm returns the approximate distance in km from point (lat,lon) +// to line segment a→b using a flat-earth projection. +func DistToSegmentKm(lat, lon float64, a, b [2]float64) float64 { + lat1, lon1 := a[0], a[1] + lat2, lon2 := b[0], b[1] + cosLat := math.Cos((lat1+lat2) / 2.0 * math.Pi / 180.0) + ax := (lon1 - lon) * 111.0 * cosLat + ay := (lat1 - lat) * 111.0 + bx := (lon2 - lon) * 111.0 * cosLat + by := (lat2 - lat) * 111.0 + abx, aby := bx-ax, by-ay + abSq := abx*abx + aby*aby + if abSq == 0 { + return math.Sqrt(ax*ax + ay*ay) + } + t := math.Max(0, math.Min(1, -(ax*abx+ay*aby)/abSq)) + px := ax + t*abx + py := ay + t*aby + return math.Sqrt(px*px + py*py) +} diff --git a/internal/geofilter/go.mod b/internal/geofilter/go.mod new file mode 100644 index 00000000..4058414f --- /dev/null +++ b/internal/geofilter/go.mod @@ -0,0 +1,3 @@ +module github.com/meshcore-analyzer/geofilter + +go 1.22 diff --git a/public/live.js b/public/live.js index 5bb267c1..2be005b8 100644 --- a/public/live.js +++ b/public/live.js @@ -817,7 +817,48 @@ }); // Geo filter overlay - initGeoFilterOverlay(map, 'liveGeoFilterToggle', 'liveGeoFilterLabel').then(function (layer) { geoFilterLayer = layer; }); + (async function () { + try { + const gf = await api('/config/geo-filter', { ttl: 3600 }); + if (!gf || !gf.polygon || gf.polygon.length < 3) return; + const geoColor = cssVar('--geo-filter-color') || '#3b82f6'; + const latlngs = gf.polygon.map(function (p) { return [p[0], p[1]]; }); + const innerPoly = L.polygon(latlngs, { + color: geoColor, weight: 2, opacity: 0.8, + fillColor: geoColor, fillOpacity: 0.08 + }); + const bufferPoly = gf.bufferKm > 0 ? (function () { + let cLat = 0, cLon = 0; + gf.polygon.forEach(function (p) { cLat += p[0]; cLon += p[1]; }); + cLat /= gf.polygon.length; cLon /= gf.polygon.length; + const cosLat = Math.cos(cLat * Math.PI / 180); + const outer = gf.polygon.map(function (p) { + const dLatM = (p[0] - cLat) * 111000; + const dLonM = (p[1] - cLon) * 111000 * cosLat; + const dist = Math.sqrt(dLatM * dLatM + dLonM * dLonM); + if (dist === 0) return [p[0], p[1]]; + const scale = (gf.bufferKm * 1000) / dist; + return [p[0] + dLatM * scale / 111000, p[1] + dLonM * scale / (111000 * cosLat)]; + }); + return L.polygon(outer, { + color: geoColor, weight: 1.5, opacity: 0.4, dashArray: '6 4', + fillColor: geoColor, fillOpacity: 0.04 + }); + })() : null; + geoFilterLayer = L.layerGroup(bufferPoly ? [bufferPoly, innerPoly] : [innerPoly]); + const label = document.getElementById('liveGeoFilterLabel'); + if (label) label.style.display = ''; + const el = document.getElementById('liveGeoFilterToggle'); + if (el) { + const saved = localStorage.getItem('meshcore-map-geo-filter'); + if (saved === 'true') { el.checked = true; geoFilterLayer.addTo(map); } + el.addEventListener('change', function (e) { + localStorage.setItem('meshcore-map-geo-filter', e.target.checked); + if (e.target.checked) { geoFilterLayer.addTo(map); } else { map.removeLayer(geoFilterLayer); } + }); + } + } catch (e) { /* no geo filter configured */ } + })(); const matrixToggle = document.getElementById('liveMatrixToggle'); matrixToggle.checked = matrixMode; diff --git a/public/map.js b/public/map.js index af077f3b..88bd4138 100644 --- a/public/map.js +++ b/public/map.js @@ -95,7 +95,7 @@ - +