From fa3f623bd68d84441a7f34715c150b971e843dcf Mon Sep 17 00:00:00 2001 From: Joel Claw Date: Fri, 17 Apr 2026 18:24:40 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20add=20observer=20retention=20=E2=80=94?= =?UTF-8?q?=20remove=20stale=20observers=20after=20configurable=20days=20(?= =?UTF-8?q?#764)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Observers that stop actively sending data now get removed after a configurable retention period (default 14 days). Previously, observers remained in the `observers` table forever. This meant nodes that were once observers for an instance but are no longer connected (even if still active in the mesh elsewhere) would continue appearing in the observer list indefinitely. ## Key Design Decisions - **Active data requirement**: `last_seen` is only updated when the observer itself sends packets (via `stmtUpdateObserverLastSeen`). Being seen by another node does NOT update this field. So an observer must actively send data to stay listed. 
- **Default: 14 days** — observers not seen in 14 days are marked inactive (soft-delete; the row is kept) - **`-1` = keep forever** — for users who want observers to never be removed - **`0` = use default (14 days)** — same as not setting the field - **Runs on startup + daily ticker** — staggered after the metrics prune (90 seconds in the ingestor, 3 minutes in the server) to avoid DB contention ## Changes | File | Change | |------|--------| | `cmd/ingestor/config.go` | Add `ObserverDays` to `RetentionConfig`, add `ObserverDaysOrDefault()` | | `cmd/ingestor/db.go` | Add `inactive` column to `observers` (schema + migration); add `RemoveStaleObservers()` — marks observers with `last_seen` before cutoff as inactive; `UpsertObserver()` reactivates an inactive observer | | `cmd/ingestor/main.go` | Wire up startup + daily ticker for observer retention | | `cmd/server/config.go` | Add `ObserverDays` to `RetentionConfig`, add `ObserverDaysOrDefault()` | | `cmd/server/db.go` | Add `RemoveStaleObservers()` (server-side, uses read-write connection) | | `cmd/server/main.go` | Wire up startup + daily ticker, shutdown cleanup | | `cmd/server/routes.go` | Admin prune API now also marks stale observers inactive | | `config.example.json` | Add `observerDays: 14` with documentation | | `cmd/ingestor/coverage_boost_test.go` | 5 tests: soft-delete of stale observers, empty store, keep forever (-1), reactivation on upsert, `ObserverDaysOrDefault` edge cases | | `cmd/server/config_test.go` | 1 table-driven test (4 cases): `ObserverDaysOrDefault` edge cases | ## Config Example ```json { "retention": { "nodeDays": 7, "observerDays": 14, "packetDays": 30, "_comment": "observerDays: -1 = keep forever, 0 = use default (14)" } } ``` ## Admin API The `/api/admin/prune` endpoint now also marks stale observers inactive (using `observerDays` from config) and reports `observers_inactive` in the response alongside `packets_deleted`. 
## Test Plan - [x] `TestRemoveStaleObservers` — old observer marked inactive (row preserved), recent observer stays active - [x] `TestRemoveStaleObserversNone` — empty store, no errors - [x] `TestRemoveStaleObserversKeepForever` — `-1` keeps even year-old observers - [x] `TestRemoveStaleObserversReactivation` — `UpsertObserver` clears `inactive` on a soft-deleted observer - [x] `TestObserverDaysOrDefault` (ingestor) — nil/zero/positive/keep-forever - [x] `TestObserverDaysOrDefault` (server) — nil/zero/positive/keep-forever - [x] Both binaries compile cleanly (`go build`) - [ ] Manual: verify observer count decreases after retention period on a live instance --- cmd/ingestor/config.go | 14 ++- cmd/ingestor/coverage_boost_test.go | 180 ++++++++++++++++++++++++++++ cmd/ingestor/db.go | 50 +++++++- cmd/ingestor/main.go | 14 +++ cmd/server/config.go | 16 ++- cmd/server/config_test.go | 22 ++++ cmd/server/db.go | 29 +++++ cmd/server/main.go | 37 ++++++ cmd/server/routes.go | 21 +++- config.example.json | 3 +- 10 files changed, 374 insertions(+), 12 deletions(-) diff --git a/cmd/ingestor/config.go b/cmd/ingestor/config.go index 007d2d62..c39730a0 100644 --- a/cmd/ingestor/config.go +++ b/cmd/ingestor/config.go @@ -47,8 +47,9 @@ type GeoFilterConfig = geofilter.Config // RetentionConfig controls how long stale nodes are kept before being moved to inactive_nodes. type RetentionConfig struct { - NodeDays int `json:"nodeDays"` - MetricsDays int `json:"metricsDays"` + NodeDays int `json:"nodeDays"` + ObserverDays int `json:"observerDays"` + MetricsDays int `json:"metricsDays"` } // MetricsConfig controls observer metrics collection. @@ -80,6 +81,15 @@ func (c *Config) NodeDaysOrDefault() int { return 7 } +// ObserverDaysOrDefault returns the configured retention.observerDays or 14 if not set. +// A value of -1 means observers are never removed. 
+func (c *Config) ObserverDaysOrDefault() int { + if c.Retention != nil && c.Retention.ObserverDays != 0 { + return c.Retention.ObserverDays + } + return 14 +} + // LoadConfig reads configuration from a JSON file, with env var overrides. // If the config file does not exist, sensible defaults are used (zero-config startup). func LoadConfig(path string) (*Config, error) { diff --git a/cmd/ingestor/coverage_boost_test.go b/cmd/ingestor/coverage_boost_test.go index a4dea830..292f080a 100644 --- a/cmd/ingestor/coverage_boost_test.go +++ b/cmd/ingestor/coverage_boost_test.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "encoding/json" "testing" + "time" ) // hmacSHA256 computes HMAC-SHA256 for test use. @@ -1138,3 +1139,182 @@ func TestDecodeTraceWithPath(t *testing.T) { t.Errorf("flags=%v, want 3", p.TraceFlags) } } + +// --- db.go: RemoveStaleObservers (soft-delete) --- + +func TestRemoveStaleObservers(t *testing.T) { + store := newTestStore(t) + + // Insert an observer with last_seen 30 days ago + err := store.UpsertObserver("obs-old", "OldObserver", "LAX", nil) + if err != nil { + t.Fatal(err) + } + // Override last_seen to 30 days ago + cutoff := time.Now().UTC().AddDate(0, 0, -30).Format(time.RFC3339) + _, err = store.db.Exec("UPDATE observers SET last_seen = ? 
WHERE id = ?", cutoff, "obs-old") + if err != nil { + t.Fatal(err) + } + + // Insert a recent observer + err = store.UpsertObserver("obs-new", "NewObserver", "NYC", nil) + if err != nil { + t.Fatal(err) + } + + removed, err := store.RemoveStaleObservers(14) + if err != nil { + t.Fatal(err) + } + if removed != 1 { + t.Errorf("removed=%d, want 1", removed) + } + + // Observer should still be in the table (soft-delete), but marked inactive + var count int + if err := store.db.QueryRow("SELECT COUNT(*) FROM observers").Scan(&count); err != nil { + t.Fatal(err) + } + if count != 2 { + t.Errorf("observers count=%d, want 2 (soft-delete preserves row)", count) + } + + // Check that the old observer is marked inactive + var inactive int + if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-old").Scan(&inactive); err != nil { + t.Fatal(err) + } + if inactive != 1 { + t.Errorf("obs-old inactive=%d, want 1", inactive) + } + + // Check that the recent observer is still active + var newInactive int + if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-new").Scan(&newInactive); err != nil { + t.Fatal(err) + } + if newInactive != 0 { + t.Errorf("obs-new inactive=%d, want 0", newInactive) + } +} + +func TestRemoveStaleObserversNone(t *testing.T) { + store := newTestStore(t) + + removed, err := store.RemoveStaleObservers(14) + if err != nil { + t.Fatal(err) + } + if removed != 0 { + t.Errorf("removed=%d, want 0", removed) + } +} + +func TestRemoveStaleObserversKeepForever(t *testing.T) { + store := newTestStore(t) + + // Insert an old observer + err := store.UpsertObserver("obs-ancient", "AncientObserver", "LAX", nil) + if err != nil { + t.Fatal(err) + } + cutoff := time.Now().UTC().AddDate(0, 0, -365).Format(time.RFC3339) + _, err = store.db.Exec("UPDATE observers SET last_seen = ? 
WHERE id = ?", cutoff, "obs-ancient") + if err != nil { + t.Fatal(err) + } + + // observerDays = -1 means keep forever + removed, err := store.RemoveStaleObservers(-1) + if err != nil { + t.Fatal(err) + } + if removed != 0 { + t.Errorf("removed=%d, want 0 (keep forever)", removed) + } + + var count int + if err := store.db.QueryRow("SELECT COUNT(*) FROM observers").Scan(&count); err != nil { + t.Fatal(err) + } + if count != 1 { + t.Errorf("observers count=%d, want 1 (keep forever)", count) + } + + // Observer should NOT be marked inactive + var inactive int + if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-ancient").Scan(&inactive); err != nil { + t.Fatal(err) + } + if inactive != 0 { + t.Errorf("obs-ancient inactive=%d, want 0 (keep forever)", inactive) + } +} + +func TestRemoveStaleObserversReactivation(t *testing.T) { + store := newTestStore(t) + + // Insert and stale-mark an observer + err := store.UpsertObserver("obs-test", "TestObserver", "LAX", nil) + if err != nil { + t.Fatal(err) + } + cutoff := time.Now().UTC().AddDate(0, 0, -30).Format(time.RFC3339) + _, err = store.db.Exec("UPDATE observers SET last_seen = ? 
WHERE id = ?", cutoff, "obs-test") + if err != nil { + t.Fatal(err) + } + + removed, err := store.RemoveStaleObservers(14) + if err != nil { + t.Fatal(err) + } + if removed != 1 { + t.Errorf("removed=%d, want 1", removed) + } + + // Verify it's inactive + var inactive int + if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-test").Scan(&inactive); err != nil { + t.Fatal(err) + } + if inactive != 1 { + t.Errorf("inactive=%d, want 1 after soft-delete", inactive) + } + + // Now UpsertObserver should reactivate it + err = store.UpsertObserver("obs-test", "TestObserver", "LAX", nil) + if err != nil { + t.Fatal(err) + } + + if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-test").Scan(&inactive); err != nil { + t.Fatal(err) + } + if inactive != 0 { + t.Errorf("inactive=%d, want 0 after reactivation", inactive) + } +} + +func TestObserverDaysOrDefault(t *testing.T) { + tests := []struct { + name string + cfg *Config + want int + }{ + {"nil retention", &Config{}, 14}, + {"zero observer days", &Config{Retention: &RetentionConfig{ObserverDays: 0}}, 14}, + {"positive value", &Config{Retention: &RetentionConfig{ObserverDays: 30}}, 30}, + {"keep forever", &Config{Retention: &RetentionConfig{ObserverDays: -1}}, -1}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.cfg.ObserverDaysOrDefault() + if got != tt.want { + t.Errorf("ObserverDaysOrDefault() = %d, want %d", got, tt.want) + } + }) + } +} diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 439ef1a5..ad9b781f 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -110,7 +110,8 @@ func applySchema(db *sql.DB) error { radio TEXT, battery_mv INTEGER, uptime_secs INTEGER, - noise_floor REAL + noise_floor REAL, + inactive INTEGER DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen); @@ -195,7 +196,7 @@ func applySchema(db *sql.DB) error { t.created_at FROM observations o JOIN transmissions t ON 
t.id = o.transmission_id - LEFT JOIN observers obs ON obs.rowid = o.observer_idx + LEFT JOIN observers obs ON obs.rowid = o.observer_idx AND (obs.inactive IS NULL OR obs.inactive = 0) `) if vErr != nil { return fmt.Errorf("packets_v view: %w", vErr) @@ -335,6 +336,19 @@ func applySchema(db *sql.DB) error { log.Println("[migration] observer_metrics timestamp index created") } + // Migration: add inactive column to observers for soft-delete retention + row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observers_inactive_v1'") + if row.Scan(&migDone) != nil { + log.Println("[migration] Adding inactive column to observers...") + _, err := db.Exec(`ALTER TABLE observers ADD COLUMN inactive INTEGER DEFAULT 0`) + if err != nil { + // Column may already exist (e.g. fresh install with schema above) + log.Printf("[migration] observers.inactive: %v (may already exist)", err) + } + db.Exec(`INSERT INTO _migrations (name) VALUES ('observers_inactive_v1')`) + log.Println("[migration] observers.inactive column added") + } + // Migration: add packets_sent and packets_recv columns to observer_metrics row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observer_metrics_packets_v1'") if row.Scan(&migDone) != nil { @@ -644,10 +658,13 @@ func (s *Store) UpsertObserver(id, name, iata string, meta *ObserverMeta) error ) if err != nil { s.Stats.WriteErrors.Add(1) - } else { - s.Stats.ObserverUpserts.Add(1) + return err } - return err + s.Stats.ObserverUpserts.Add(1) + + // Reactivate if this observer was previously marked inactive + s.db.Exec(`UPDATE observers SET inactive = 0 WHERE id = ? AND inactive = 1`, id) + return nil } // Close checkpoints the WAL and closes the database. @@ -779,6 +796,29 @@ func (s *Store) MoveStaleNodes(nodeDays int) (int64, error) { return moved, nil } +// RemoveStaleObservers marks observers that have not actively sent data in observerDays +// as inactive (soft-delete). 
This preserves JOIN integrity for observations.observer_idx +// and observer_metrics.observer_id — historical data still references the correct observer. +// An observer must actively send data to stay listed — being seen by another node does not count. +// observerDays <= -1 means never remove (keep forever). +func (s *Store) RemoveStaleObservers(observerDays int) (int64, error) { + if observerDays <= -1 { + return 0, nil // keep forever + } + cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339) + result, err := s.db.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff) + if err != nil { + return 0, fmt.Errorf("mark stale observers inactive: %w", err) + } + removed, _ := result.RowsAffected() + if removed > 0 { + // Clean up orphaned metrics for now-inactive observers + s.db.Exec(`DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`) + log.Printf("Marked %d observer(s) as inactive (not seen in %d days)", removed, observerDays) + } + return removed, nil +} + // PacketData holds the data needed to insert a packet into the DB. 
type PacketData struct { RawHex string diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index d1da3059..67544865 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -61,6 +61,10 @@ func main() { nodeDays := cfg.NodeDaysOrDefault() store.MoveStaleNodes(nodeDays) + // Observer retention: remove stale observers on startup + observerDays := cfg.ObserverDaysOrDefault() + store.RemoveStaleObservers(observerDays) + // Metrics retention: prune old metrics on startup metricsDays := cfg.MetricsRetentionDays() store.PruneOldMetrics(metricsDays) @@ -73,6 +77,16 @@ func main() { } }() + // Daily ticker for observer retention (every 24h, staggered 90s after startup) + observerRetentionTicker := time.NewTicker(24 * time.Hour) + go func() { + time.Sleep(90 * time.Second) // stagger after metrics prune + store.RemoveStaleObservers(observerDays) + for range observerRetentionTicker.C { + store.RemoveStaleObservers(observerDays) + } + }() + // Daily ticker for metrics retention (every 24h) metricsRetentionTicker := time.NewTicker(24 * time.Hour) go func() { diff --git a/cmd/server/config.go b/cmd/server/config.go index 61f388d0..0c15bd6e 100644 --- a/cmd/server/config.go +++ b/cmd/server/config.go @@ -110,9 +110,10 @@ type PacketStoreConfig struct { type GeoFilterConfig = geofilter.Config type RetentionConfig struct { - NodeDays int `json:"nodeDays"` - PacketDays int `json:"packetDays"` - MetricsDays int `json:"metricsDays"` + NodeDays int `json:"nodeDays"` + ObserverDays int `json:"observerDays"` + PacketDays int `json:"packetDays"` + MetricsDays int `json:"metricsDays"` } // MetricsRetentionDays returns configured metrics retention or 30 days default. @@ -165,6 +166,15 @@ func (c *Config) NodeDaysOrDefault() int { return 7 } +// ObserverDaysOrDefault returns the configured retention.observerDays or 14 if not set. +// A value of -1 means observers are never removed. 
+func (c *Config) ObserverDaysOrDefault() int { + if c.Retention != nil && c.Retention.ObserverDays != 0 { + return c.Retention.ObserverDays + } + return 14 +} + type HealthThresholds struct { InfraDegradedHours float64 `json:"infraDegradedHours"` InfraSilentHours float64 `json:"infraSilentHours"` diff --git a/cmd/server/config_test.go b/cmd/server/config_test.go index cd2126b9..36e59ea9 100644 --- a/cmd/server/config_test.go +++ b/cmd/server/config_test.go @@ -365,3 +365,25 @@ func TestPropagationBufferMs(t *testing.T) { } }) } + +func TestObserverDaysOrDefault(t *testing.T) { + tests := []struct { + name string + cfg *Config + want int + }{ + {"nil retention", &Config{}, 14}, + {"zero observer days", &Config{Retention: &RetentionConfig{ObserverDays: 0}}, 14}, + {"positive value", &Config{Retention: &RetentionConfig{ObserverDays: 30}}, 30}, + {"keep forever", &Config{Retention: &RetentionConfig{ObserverDays: -1}}, -1}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.cfg.ObserverDaysOrDefault() + if got != tt.want { + t.Errorf("ObserverDaysOrDefault() = %d, want %d", got, tt.want) + } + }) + } +} diff --git a/cmd/server/db.go b/cmd/server/db.go index 227a1993..aadf7772 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -2213,6 +2213,35 @@ func (db *DB) PruneOldMetrics(retentionDays int) (int64, error) { return n, nil } +// RemoveStaleObservers marks observers that have not actively sent data in observerDays +// as inactive (soft-delete). This preserves JOIN integrity for observations.observer_idx +// and observer_metrics.observer_id — historical data still references the correct observer. +// An observer must actively send data to stay listed — being seen by another node does not count. +// observerDays <= -1 means never remove (keep forever). 
+func (db *DB) RemoveStaleObservers(observerDays int) (int64, error) { + if observerDays <= -1 { + return 0, nil // keep forever + } + rw, err := openRW(db.path) + if err != nil { + return 0, err + } + defer rw.Close() + + cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339) + res, err := rw.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff) + if err != nil { + return 0, err + } + n, _ := res.RowsAffected() + if n > 0 { + // Clean up orphaned metrics for now-inactive observers + rw.Exec(`DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`) + log.Printf("[observers] Marked %d observer(s) as inactive (not seen in %d days)", n, observerDays) + } + return n, nil +} + // TouchNodeLastSeen updates last_seen for a node identified by full public key. // Only updates if the new timestamp is newer than the existing value (or NULL). // Returns nil even if no rows are affected (node doesn't exist). 
diff --git a/cmd/server/main.go b/cmd/server/main.go index 6c45973c..c056c7b0 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -325,6 +325,40 @@ func main() { log.Printf("[metrics-prune] auto-prune enabled: metrics older than %d days", metricsDays) } + // Auto-prune stale observers + var stopObserverPrune func() + { + observerDays := cfg.ObserverDaysOrDefault() + if observerDays <= -1 { + // -1 means keep forever, skip + } else { + observerPruneTicker := time.NewTicker(24 * time.Hour) + observerPruneDone := make(chan struct{}) + stopObserverPrune = func() { + observerPruneTicker.Stop() + close(observerPruneDone) + } + go func() { + defer func() { + if r := recover(); r != nil { + log.Printf("[observer-prune] panic recovered: %v", r) + } + }() + time.Sleep(3 * time.Minute) // stagger after metrics prune + database.RemoveStaleObservers(observerDays) + for { + select { + case <-observerPruneTicker.C: + database.RemoveStaleObservers(observerDays) + case <-observerPruneDone: + return + } + } + }() + log.Printf("[observer-prune] auto-prune enabled: observers not seen in %d days will be removed", observerDays) + } + } + // Auto-prune old neighbor edges var stopEdgePrune func() { @@ -386,6 +420,9 @@ func main() { if stopMetricsPrune != nil { stopMetricsPrune() } + if stopObserverPrune != nil { + stopObserverPrune() + } if stopEdgePrune != nil { stopEdgePrune() } diff --git a/cmd/server/routes.go b/cmd/server/routes.go index f9face01..7661dd40 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -2385,13 +2385,32 @@ func (s *Server) handleAdminPrune(w http.ResponseWriter, r *http.Request) { writeError(w, 400, "days parameter required (or set retention.packetDays in config)") return } + + results := map[string]interface{}{} + + // Prune old packets n, err := s.db.PruneOldPackets(days) if err != nil { writeError(w, 500, err.Error()) return } log.Printf("[prune] deleted %d transmissions older than %d days", n, days) - writeJSON(w, 
map[string]interface{}{"deleted": n, "days": days}) + results["packets_deleted"] = n + results["deleted"] = n // legacy alias + + // Also mark stale observers as inactive if observerDays is configured + observerDays := s.cfg.ObserverDaysOrDefault() + if observerDays > 0 { + obsN, obsErr := s.db.RemoveStaleObservers(observerDays) + if obsErr != nil { + log.Printf("[prune] observer prune error: %v", obsErr) + } else { + results["observers_inactive"] = obsN + } + } + + results["days"] = days + writeJSON(w, results) } // constantTimeEqual compares two strings in constant time to prevent timing attacks. diff --git a/config.example.json b/config.example.json index 5bb64be0..c42cf3c7 100644 --- a/config.example.json +++ b/config.example.json @@ -3,8 +3,9 @@ "apiKey": "your-secret-api-key-here", "retention": { "nodeDays": 7, + "observerDays": 14, "packetDays": 30, - "_comment": "nodeDays: nodes not seen in N days are moved to inactive_nodes (default 7). packetDays: transmissions+observations older than N days are deleted daily (0 = disabled)." + "_comment": "nodeDays: nodes not seen in N days moved to inactive_nodes (default 7). observerDays: observers not sending data in N days are removed (-1 = keep forever, default 14). packetDays: transmissions older than N days are deleted (0 = disabled)." }, "https": { "cert": "/path/to/cert.pem",