diff --git a/cmd/ingestor/config.go b/cmd/ingestor/config.go index 007d2d62..c39730a0 100644 --- a/cmd/ingestor/config.go +++ b/cmd/ingestor/config.go @@ -47,8 +47,9 @@ type GeoFilterConfig = geofilter.Config // RetentionConfig controls how long stale nodes are kept before being moved to inactive_nodes. type RetentionConfig struct { - NodeDays int `json:"nodeDays"` - MetricsDays int `json:"metricsDays"` + NodeDays int `json:"nodeDays"` + ObserverDays int `json:"observerDays"` + MetricsDays int `json:"metricsDays"` } // MetricsConfig controls observer metrics collection. @@ -80,6 +81,15 @@ func (c *Config) NodeDaysOrDefault() int { return 7 } +// ObserverDaysOrDefault returns the configured retention.observerDays or 14 if not set. +// A value of -1 means observers are never removed. +func (c *Config) ObserverDaysOrDefault() int { + if c.Retention != nil && c.Retention.ObserverDays != 0 { + return c.Retention.ObserverDays + } + return 14 +} + // LoadConfig reads configuration from a JSON file, with env var overrides. // If the config file does not exist, sensible defaults are used (zero-config startup). func LoadConfig(path string) (*Config, error) { diff --git a/cmd/ingestor/coverage_boost_test.go b/cmd/ingestor/coverage_boost_test.go index a4dea830..292f080a 100644 --- a/cmd/ingestor/coverage_boost_test.go +++ b/cmd/ingestor/coverage_boost_test.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "encoding/json" "testing" + "time" ) // hmacSHA256 computes HMAC-SHA256 for test use. @@ -1138,3 +1139,182 @@ func TestDecodeTraceWithPath(t *testing.T) { t.Errorf("flags=%v, want 3", p.TraceFlags) } } + +// --- db.go: RemoveStaleObservers (soft-delete) --- + +func TestRemoveStaleObservers(t *testing.T) { + store := newTestStore(t) + + // Insert an observer with last_seen 30 days ago + err := store.UpsertObserver("obs-old", "OldObserver", "LAX", nil) + if err != nil { + t.Fatal(err) + } + // Override last_seen to 30 days ago + cutoff := time.Now().UTC().AddDate(0, 0, -30).Format(time.RFC3339) + _, err = store.db.Exec("UPDATE observers SET last_seen = ? WHERE id = ?", cutoff, "obs-old") + if err != nil { + t.Fatal(err) + } + + // Insert a recent observer + err = store.UpsertObserver("obs-new", "NewObserver", "NYC", nil) + if err != nil { + t.Fatal(err) + } + + removed, err := store.RemoveStaleObservers(14) + if err != nil { + t.Fatal(err) + } + if removed != 1 { + t.Errorf("removed=%d, want 1", removed) + } + + // Observer should still be in the table (soft-delete), but marked inactive + var count int + if err := store.db.QueryRow("SELECT COUNT(*) FROM observers").Scan(&count); err != nil { + t.Fatal(err) + } + if count != 2 { + t.Errorf("observers count=%d, want 2 (soft-delete preserves row)", count) + } + + // Check that the old observer is marked inactive + var inactive int + if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-old").Scan(&inactive); err != nil { + t.Fatal(err) + } + if inactive != 1 { + t.Errorf("obs-old inactive=%d, want 1", inactive) + } + + // Check that the recent observer is still active + var newInactive int + if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-new").Scan(&newInactive); err != nil { + t.Fatal(err) + } + if newInactive != 0 { + t.Errorf("obs-new inactive=%d, want 0", newInactive) + } +} + +func TestRemoveStaleObserversNone(t *testing.T) { + store := newTestStore(t) + + removed, err := store.RemoveStaleObservers(14) + if err != nil { + t.Fatal(err) + } + if removed != 0 { + t.Errorf("removed=%d, want 0", removed) + } +} + +func TestRemoveStaleObserversKeepForever(t *testing.T) { + store := newTestStore(t) + + // Insert an old observer + err := store.UpsertObserver("obs-ancient", "AncientObserver", "LAX", nil) + if err != nil { + t.Fatal(err) + } + cutoff := time.Now().UTC().AddDate(0, 0, -365).Format(time.RFC3339) + _, err = store.db.Exec("UPDATE observers SET last_seen = ? WHERE id = ?", cutoff, "obs-ancient") + if err != nil { + t.Fatal(err) + } + + // observerDays = -1 means keep forever + removed, err := store.RemoveStaleObservers(-1) + if err != nil { + t.Fatal(err) + } + if removed != 0 { + t.Errorf("removed=%d, want 0 (keep forever)", removed) + } + + var count int + if err := store.db.QueryRow("SELECT COUNT(*) FROM observers").Scan(&count); err != nil { + t.Fatal(err) + } + if count != 1 { + t.Errorf("observers count=%d, want 1 (keep forever)", count) + } + + // Observer should NOT be marked inactive + var inactive int + if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-ancient").Scan(&inactive); err != nil { + t.Fatal(err) + } + if inactive != 0 { + t.Errorf("obs-ancient inactive=%d, want 0 (keep forever)", inactive) + } +} + +func TestRemoveStaleObserversReactivation(t *testing.T) { + store := newTestStore(t) + + // Insert and stale-mark an observer + err := store.UpsertObserver("obs-test", "TestObserver", "LAX", nil) + if err != nil { + t.Fatal(err) + } + cutoff := time.Now().UTC().AddDate(0, 0, -30).Format(time.RFC3339) + _, err = store.db.Exec("UPDATE observers SET last_seen = ? WHERE id = ?", cutoff, "obs-test") + if err != nil { + t.Fatal(err) + } + + removed, err := store.RemoveStaleObservers(14) + if err != nil { + t.Fatal(err) + } + if removed != 1 { + t.Errorf("removed=%d, want 1", removed) + } + + // Verify it's inactive + var inactive int + if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-test").Scan(&inactive); err != nil { + t.Fatal(err) + } + if inactive != 1 { + t.Errorf("inactive=%d, want 1 after soft-delete", inactive) + } + + // Now UpsertObserver should reactivate it + err = store.UpsertObserver("obs-test", "TestObserver", "LAX", nil) + if err != nil { + t.Fatal(err) + } + + if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-test").Scan(&inactive); err != nil { + t.Fatal(err) + } + if inactive != 0 { + t.Errorf("inactive=%d, want 0 after reactivation", inactive) + } +} + +func TestObserverDaysOrDefault(t *testing.T) { + tests := []struct { + name string + cfg *Config + want int + }{ + {"nil retention", &Config{}, 14}, + {"zero observer days", &Config{Retention: &RetentionConfig{ObserverDays: 0}}, 14}, + {"positive value", &Config{Retention: &RetentionConfig{ObserverDays: 30}}, 30}, + {"keep forever", &Config{Retention: &RetentionConfig{ObserverDays: -1}}, -1}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.cfg.ObserverDaysOrDefault() + if got != tt.want { + t.Errorf("ObserverDaysOrDefault() = %d, want %d", got, tt.want) + } + }) + } +} diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 439ef1a5..ad9b781f 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -110,7 +110,8 @@ func applySchema(db *sql.DB) error { radio TEXT, battery_mv INTEGER, uptime_secs INTEGER, - noise_floor REAL + noise_floor REAL, + inactive INTEGER DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen); @@ -195,7 +196,7 @@ func applySchema(db *sql.DB) error { t.created_at FROM observations o JOIN transmissions t ON t.id = o.transmission_id - LEFT JOIN observers obs ON obs.rowid = o.observer_idx + LEFT JOIN observers obs ON obs.rowid = o.observer_idx AND (obs.inactive IS NULL OR obs.inactive = 0) `) if vErr != nil { return fmt.Errorf("packets_v view: %w", vErr) @@ -335,6 +336,19 @@ func applySchema(db *sql.DB) error { log.Println("[migration] observer_metrics timestamp index created") } + // Migration: add inactive column to observers for soft-delete retention + row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observers_inactive_v1'") + if row.Scan(&migDone) != nil { + log.Println("[migration] Adding inactive column to observers...") + _, err := db.Exec(`ALTER TABLE observers ADD COLUMN inactive INTEGER DEFAULT 0`) + if err != nil { + // Column may already exist (e.g. fresh install with schema above) + log.Printf("[migration] observers.inactive: %v (may already exist)", err) + } + db.Exec(`INSERT INTO _migrations (name) VALUES ('observers_inactive_v1')`) + log.Println("[migration] observers.inactive column added") + } + // Migration: add packets_sent and packets_recv columns to observer_metrics row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observer_metrics_packets_v1'") if row.Scan(&migDone) != nil { @@ -644,10 +658,13 @@ func (s *Store) UpsertObserver(id, name, iata string, meta *ObserverMeta) error ) if err != nil { s.Stats.WriteErrors.Add(1) - } else { - s.Stats.ObserverUpserts.Add(1) + return err } - return err + s.Stats.ObserverUpserts.Add(1) + + // Reactivate if this observer was previously marked inactive + s.db.Exec(`UPDATE observers SET inactive = 0 WHERE id = ? AND inactive = 1`, id) + return nil } // Close checkpoints the WAL and closes the database. @@ -779,6 +796,29 @@ func (s *Store) MoveStaleNodes(nodeDays int) (int64, error) { return moved, nil } +// RemoveStaleObservers marks observers that have not actively sent data in observerDays +// as inactive (soft-delete). This preserves JOIN integrity for observations.observer_idx +// and observer_metrics.observer_id — historical data still references the correct observer. +// An observer must actively send data to stay listed — being seen by another node does not count. +// observerDays <= -1 means never remove (keep forever). +func (s *Store) RemoveStaleObservers(observerDays int) (int64, error) { + if observerDays <= -1 { + return 0, nil // keep forever + } + cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339) + result, err := s.db.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff) + if err != nil { + return 0, fmt.Errorf("mark stale observers inactive: %w", err) + } + removed, _ := result.RowsAffected() + if removed > 0 { + // Clean up orphaned metrics for now-inactive observers + s.db.Exec(`DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`) + log.Printf("Marked %d observer(s) as inactive (not seen in %d days)", removed, observerDays) + } + return removed, nil +} + // PacketData holds the data needed to insert a packet into the DB. type PacketData struct { RawHex string diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index d1da3059..67544865 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -61,6 +61,10 @@ func main() { nodeDays := cfg.NodeDaysOrDefault() store.MoveStaleNodes(nodeDays) + // Observer retention: remove stale observers on startup + observerDays := cfg.ObserverDaysOrDefault() + store.RemoveStaleObservers(observerDays) + // Metrics retention: prune old metrics on startup metricsDays := cfg.MetricsRetentionDays() store.PruneOldMetrics(metricsDays) @@ -73,6 +77,16 @@ func main() { } }() + // Daily ticker for observer retention (every 24h, staggered 90s after startup) + observerRetentionTicker := time.NewTicker(24 * time.Hour) + go func() { + time.Sleep(90 * time.Second) // stagger after metrics prune + store.RemoveStaleObservers(observerDays) + for range observerRetentionTicker.C { + store.RemoveStaleObservers(observerDays) + } + }() + // Daily ticker for metrics retention (every 24h) metricsRetentionTicker := time.NewTicker(24 * time.Hour) go func() { diff --git a/cmd/server/config.go b/cmd/server/config.go index 61f388d0..0c15bd6e 100644 --- a/cmd/server/config.go +++ b/cmd/server/config.go @@ -110,9 +110,10 @@ type PacketStoreConfig struct { type GeoFilterConfig = geofilter.Config type RetentionConfig struct { - NodeDays int `json:"nodeDays"` - PacketDays int `json:"packetDays"` - MetricsDays int `json:"metricsDays"` + NodeDays int `json:"nodeDays"` + ObserverDays int `json:"observerDays"` + PacketDays int `json:"packetDays"` + MetricsDays int `json:"metricsDays"` } // MetricsRetentionDays returns configured metrics retention or 30 days default. @@ -165,6 +166,15 @@ func (c *Config) NodeDaysOrDefault() int { return 7 } +// ObserverDaysOrDefault returns the configured retention.observerDays or 14 if not set. +// A value of -1 means observers are never removed. +func (c *Config) ObserverDaysOrDefault() int { + if c.Retention != nil && c.Retention.ObserverDays != 0 { + return c.Retention.ObserverDays + } + return 14 +} + type HealthThresholds struct { InfraDegradedHours float64 `json:"infraDegradedHours"` InfraSilentHours float64 `json:"infraSilentHours"` diff --git a/cmd/server/config_test.go b/cmd/server/config_test.go index cd2126b9..36e59ea9 100644 --- a/cmd/server/config_test.go +++ b/cmd/server/config_test.go @@ -365,3 +365,25 @@ func TestPropagationBufferMs(t *testing.T) { } }) } + +func TestObserverDaysOrDefault(t *testing.T) { + tests := []struct { + name string + cfg *Config + want int + }{ + {"nil retention", &Config{}, 14}, + {"zero observer days", &Config{Retention: &RetentionConfig{ObserverDays: 0}}, 14}, + {"positive value", &Config{Retention: &RetentionConfig{ObserverDays: 30}}, 30}, + {"keep forever", &Config{Retention: &RetentionConfig{ObserverDays: -1}}, -1}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.cfg.ObserverDaysOrDefault() + if got != tt.want { + t.Errorf("ObserverDaysOrDefault() = %d, want %d", got, tt.want) + } + }) + } +} diff --git a/cmd/server/db.go b/cmd/server/db.go index 227a1993..aadf7772 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -2213,6 +2213,35 @@ func (db *DB) PruneOldMetrics(retentionDays int) (int64, error) { return n, nil } +// RemoveStaleObservers marks observers that have not actively sent data in observerDays +// as inactive (soft-delete). This preserves JOIN integrity for observations.observer_idx +// and observer_metrics.observer_id — historical data still references the correct observer. +// An observer must actively send data to stay listed — being seen by another node does not count. +// observerDays <= -1 means never remove (keep forever). +func (db *DB) RemoveStaleObservers(observerDays int) (int64, error) { + if observerDays <= -1 { + return 0, nil // keep forever + } + rw, err := openRW(db.path) + if err != nil { + return 0, err + } + defer rw.Close() + + cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339) + res, err := rw.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff) + if err != nil { + return 0, err + } + n, _ := res.RowsAffected() + if n > 0 { + // Clean up orphaned metrics for now-inactive observers + rw.Exec(`DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`) + log.Printf("[observers] Marked %d observer(s) as inactive (not seen in %d days)", n, observerDays) + } + return n, nil +} + // TouchNodeLastSeen updates last_seen for a node identified by full public key. // Only updates if the new timestamp is newer than the existing value (or NULL). // Returns nil even if no rows are affected (node doesn't exist). diff --git a/cmd/server/main.go b/cmd/server/main.go index 6c45973c..c056c7b0 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -325,6 +325,40 @@ func main() { log.Printf("[metrics-prune] auto-prune enabled: metrics older than %d days", metricsDays) } + // Auto-prune stale observers + var stopObserverPrune func() + { + observerDays := cfg.ObserverDaysOrDefault() + if observerDays <= -1 { + // -1 means keep forever, skip + } else { + observerPruneTicker := time.NewTicker(24 * time.Hour) + observerPruneDone := make(chan struct{}) + stopObserverPrune = func() { + observerPruneTicker.Stop() + close(observerPruneDone) + } + go func() { + defer func() { + if r := recover(); r != nil { + log.Printf("[observer-prune] panic recovered: %v", r) + } + }() + time.Sleep(3 * time.Minute) // stagger after metrics prune + database.RemoveStaleObservers(observerDays) + for { + select { + case <-observerPruneTicker.C: + database.RemoveStaleObservers(observerDays) + case <-observerPruneDone: + return + } + } + }() + log.Printf("[observer-prune] auto-prune enabled: observers not seen in %d days will be removed", observerDays) + } + } + // Auto-prune old neighbor edges var stopEdgePrune func() { @@ -386,6 +420,9 @@ func main() { if stopMetricsPrune != nil { stopMetricsPrune() } + if stopObserverPrune != nil { + stopObserverPrune() + } if stopEdgePrune != nil { stopEdgePrune() } diff --git a/cmd/server/routes.go b/cmd/server/routes.go index f9face01..7661dd40 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -2385,13 +2385,32 @@ func (s *Server) handleAdminPrune(w http.ResponseWriter, r *http.Request) { writeError(w, 400, "days parameter required (or set retention.packetDays in config)") return } + + results := map[string]interface{}{} + + // Prune old packets n, err := s.db.PruneOldPackets(days) if err != nil { writeError(w, 500, err.Error()) return } log.Printf("[prune] deleted %d transmissions older than %d days", n, days) - writeJSON(w, map[string]interface{}{"deleted": n, "days": days}) + results["packets_deleted"] = n + results["deleted"] = n // legacy alias + + // Also mark stale observers as inactive if observerDays is configured + observerDays := s.cfg.ObserverDaysOrDefault() + if observerDays > 0 { + obsN, obsErr := s.db.RemoveStaleObservers(observerDays) + if obsErr != nil { + log.Printf("[prune] observer prune error: %v", obsErr) + } else { + results["observers_inactive"] = obsN + } + } + + results["days"] = days + writeJSON(w, results) } // constantTimeEqual compares two strings in constant time to prevent timing attacks. diff --git a/config.example.json b/config.example.json index 5bb64be0..c42cf3c7 100644 --- a/config.example.json +++ b/config.example.json @@ -3,8 +3,9 @@ "apiKey": "your-secret-api-key-here", "retention": { "nodeDays": 7, + "observerDays": 14, "packetDays": 30, - "_comment": "nodeDays: nodes not seen in N days are moved to inactive_nodes (default 7). packetDays: transmissions+observations older than N days are deleted daily (0 = disabled)." + "_comment": "nodeDays: nodes not seen in N days moved to inactive_nodes (default 7). observerDays: observers not sending data in N days are removed (-1 = keep forever, default 14). packetDays: transmissions older than N days are deleted (0 = disabled)." }, "https": { "cert": "/path/to/cert.pem",