mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-06-02 12:44:07 +00:00
feat: add observer retention — remove stale observers after configurable days (#764)
## Summary
Observers that stop actively sending data now get removed after a
configurable retention period (default 14 days).
Previously, observers remained in the `observers` table forever. This
meant nodes that were once observers for an instance but are no longer
connected (even if still active in the mesh elsewhere) would continue
appearing in the observer list indefinitely.
## Key Design Decisions
- **Active data requirement**: `last_seen` is only updated when the
observer itself sends packets (via `stmtUpdateObserverLastSeen`). Being
seen by another node does NOT update this field. So an observer must
actively send data to stay listed.
- **Default: 14 days** — observers not seen in 14 days are removed
- **`-1` = keep forever** — for users who want observers to never be
removed
- **`0` = use default (14 days)** — same as not setting the field
- **Runs on startup + daily ticker** — staggered 3 minutes after metrics
prune to avoid DB contention
## Changes
| File | Change |
|------|--------|
| `cmd/ingestor/config.go` | Add `ObserverDays` to `RetentionConfig`,
add `ObserverDaysOrDefault()` |
| `cmd/ingestor/db.go` | Add `RemoveStaleObservers()` — deletes
observers with `last_seen` before cutoff |
| `cmd/ingestor/main.go` | Wire up startup + daily ticker for observer
retention |
| `cmd/server/config.go` | Add `ObserverDays` to `RetentionConfig`, add
`ObserverDaysOrDefault()` |
| `cmd/server/db.go` | Add `RemoveStaleObservers()` (server-side, uses
read-write connection) |
| `cmd/server/main.go` | Wire up startup + daily ticker, shutdown
cleanup |
| `cmd/server/routes.go` | Admin prune API now also removes stale
observers |
| `config.example.json` | Add `observerDays: 14` with documentation |
| `cmd/ingestor/coverage_boost_test.go` | 4 tests: basic removal, empty
store, keep forever (-1), default (0→14) |
| `cmd/server/config_test.go` | 4 tests: `ObserverDaysOrDefault` edge
cases |
## Config Example
```json
{
"retention": {
"nodeDays": 7,
"observerDays": 14,
"packetDays": 30,
"_comment": "observerDays: -1 = keep forever, 0 = use default (14)"
}
}
```
## Admin API
The `/api/admin/prune` endpoint now also removes stale observers (using
`observerDays` from config) and reports `observers_removed` in the
response alongside `packets_deleted`.
## Test Plan
- [x] `TestRemoveStaleObservers` — old observer removed, recent observer
kept
- [x] `TestRemoveStaleObserversNone` — empty store, no errors
- [x] `TestRemoveStaleObserversKeepForever` — `-1` keeps even year-old
observers
- [x] `TestRemoveStaleObserversDefault` — `0` defaults to 14 days
- [x] `TestObserverDaysOrDefault` (ingestor) —
nil/zero/positive/keep-forever
- [x] `TestObserverDaysOrDefault` (server) —
nil/zero/positive/keep-forever
- [x] Both binaries compile cleanly (`go build`)
- [ ] Manual: verify observer count decreases after retention period on
a live instance
This commit is contained in:
+12
-2
@@ -47,8 +47,9 @@ type GeoFilterConfig = geofilter.Config
|
||||
|
||||
// RetentionConfig controls how long stale nodes are kept before being moved to inactive_nodes.
|
||||
type RetentionConfig struct {
|
||||
NodeDays int `json:"nodeDays"`
|
||||
MetricsDays int `json:"metricsDays"`
|
||||
NodeDays int `json:"nodeDays"`
|
||||
ObserverDays int `json:"observerDays"`
|
||||
MetricsDays int `json:"metricsDays"`
|
||||
}
|
||||
|
||||
// MetricsConfig controls observer metrics collection.
|
||||
@@ -80,6 +81,15 @@ func (c *Config) NodeDaysOrDefault() int {
|
||||
return 7
|
||||
}
|
||||
|
||||
// ObserverDaysOrDefault returns the configured retention.observerDays or 14 if not set.
|
||||
// A value of -1 means observers are never removed.
|
||||
func (c *Config) ObserverDaysOrDefault() int {
|
||||
if c.Retention != nil && c.Retention.ObserverDays != 0 {
|
||||
return c.Retention.ObserverDays
|
||||
}
|
||||
return 14
|
||||
}
|
||||
|
||||
// LoadConfig reads configuration from a JSON file, with env var overrides.
|
||||
// If the config file does not exist, sensible defaults are used (zero-config startup).
|
||||
func LoadConfig(path string) (*Config, error) {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// hmacSHA256 computes HMAC-SHA256 for test use.
|
||||
@@ -1138,3 +1139,182 @@ func TestDecodeTraceWithPath(t *testing.T) {
|
||||
t.Errorf("flags=%v, want 3", p.TraceFlags)
|
||||
}
|
||||
}
|
||||
|
||||
// --- db.go: RemoveStaleObservers (soft-delete) ---
|
||||
|
||||
func TestRemoveStaleObservers(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
|
||||
// Insert an observer with last_seen 30 days ago
|
||||
err := store.UpsertObserver("obs-old", "OldObserver", "LAX", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Override last_seen to 30 days ago
|
||||
cutoff := time.Now().UTC().AddDate(0, 0, -30).Format(time.RFC3339)
|
||||
_, err = store.db.Exec("UPDATE observers SET last_seen = ? WHERE id = ?", cutoff, "obs-old")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Insert a recent observer
|
||||
err = store.UpsertObserver("obs-new", "NewObserver", "NYC", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
removed, err := store.RemoveStaleObservers(14)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if removed != 1 {
|
||||
t.Errorf("removed=%d, want 1", removed)
|
||||
}
|
||||
|
||||
// Observer should still be in the table (soft-delete), but marked inactive
|
||||
var count int
|
||||
if err := store.db.QueryRow("SELECT COUNT(*) FROM observers").Scan(&count); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if count != 2 {
|
||||
t.Errorf("observers count=%d, want 2 (soft-delete preserves row)", count)
|
||||
}
|
||||
|
||||
// Check that the old observer is marked inactive
|
||||
var inactive int
|
||||
if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-old").Scan(&inactive); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if inactive != 1 {
|
||||
t.Errorf("obs-old inactive=%d, want 1", inactive)
|
||||
}
|
||||
|
||||
// Check that the recent observer is still active
|
||||
var newInactive int
|
||||
if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-new").Scan(&newInactive); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if newInactive != 0 {
|
||||
t.Errorf("obs-new inactive=%d, want 0", newInactive)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveStaleObserversNone(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
|
||||
removed, err := store.RemoveStaleObservers(14)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if removed != 0 {
|
||||
t.Errorf("removed=%d, want 0", removed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveStaleObserversKeepForever(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
|
||||
// Insert an old observer
|
||||
err := store.UpsertObserver("obs-ancient", "AncientObserver", "LAX", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cutoff := time.Now().UTC().AddDate(0, 0, -365).Format(time.RFC3339)
|
||||
_, err = store.db.Exec("UPDATE observers SET last_seen = ? WHERE id = ?", cutoff, "obs-ancient")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// observerDays = -1 means keep forever
|
||||
removed, err := store.RemoveStaleObservers(-1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if removed != 0 {
|
||||
t.Errorf("removed=%d, want 0 (keep forever)", removed)
|
||||
}
|
||||
|
||||
var count int
|
||||
if err := store.db.QueryRow("SELECT COUNT(*) FROM observers").Scan(&count); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if count != 1 {
|
||||
t.Errorf("observers count=%d, want 1 (keep forever)", count)
|
||||
}
|
||||
|
||||
// Observer should NOT be marked inactive
|
||||
var inactive int
|
||||
if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-ancient").Scan(&inactive); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if inactive != 0 {
|
||||
t.Errorf("obs-ancient inactive=%d, want 0 (keep forever)", inactive)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveStaleObserversReactivation(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
|
||||
// Insert and stale-mark an observer
|
||||
err := store.UpsertObserver("obs-test", "TestObserver", "LAX", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cutoff := time.Now().UTC().AddDate(0, 0, -30).Format(time.RFC3339)
|
||||
_, err = store.db.Exec("UPDATE observers SET last_seen = ? WHERE id = ?", cutoff, "obs-test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
removed, err := store.RemoveStaleObservers(14)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if removed != 1 {
|
||||
t.Errorf("removed=%d, want 1", removed)
|
||||
}
|
||||
|
||||
// Verify it's inactive
|
||||
var inactive int
|
||||
if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-test").Scan(&inactive); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if inactive != 1 {
|
||||
t.Errorf("inactive=%d, want 1 after soft-delete", inactive)
|
||||
}
|
||||
|
||||
// Now UpsertObserver should reactivate it
|
||||
err = store.UpsertObserver("obs-test", "TestObserver", "LAX", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := store.db.QueryRow("SELECT inactive FROM observers WHERE id = ?", "obs-test").Scan(&inactive); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if inactive != 0 {
|
||||
t.Errorf("inactive=%d, want 0 after reactivation", inactive)
|
||||
}
|
||||
}
|
||||
|
||||
func TestObserverDaysOrDefault(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
cfg *Config
|
||||
want int
|
||||
}{
|
||||
{"nil retention", &Config{}, 14},
|
||||
{"zero observer days", &Config{Retention: &RetentionConfig{ObserverDays: 0}}, 14},
|
||||
{"positive value", &Config{Retention: &RetentionConfig{ObserverDays: 30}}, 30},
|
||||
{"keep forever", &Config{Retention: &RetentionConfig{ObserverDays: -1}}, -1},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := tt.cfg.ObserverDaysOrDefault()
|
||||
if got != tt.want {
|
||||
t.Errorf("ObserverDaysOrDefault() = %d, want %d", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
+45
-5
@@ -110,7 +110,8 @@ func applySchema(db *sql.DB) error {
|
||||
radio TEXT,
|
||||
battery_mv INTEGER,
|
||||
uptime_secs INTEGER,
|
||||
noise_floor REAL
|
||||
noise_floor REAL,
|
||||
inactive INTEGER DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen);
|
||||
@@ -195,7 +196,7 @@ func applySchema(db *sql.DB) error {
|
||||
t.created_at
|
||||
FROM observations o
|
||||
JOIN transmissions t ON t.id = o.transmission_id
|
||||
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
|
||||
LEFT JOIN observers obs ON obs.rowid = o.observer_idx AND (obs.inactive IS NULL OR obs.inactive = 0)
|
||||
`)
|
||||
if vErr != nil {
|
||||
return fmt.Errorf("packets_v view: %w", vErr)
|
||||
@@ -335,6 +336,19 @@ func applySchema(db *sql.DB) error {
|
||||
log.Println("[migration] observer_metrics timestamp index created")
|
||||
}
|
||||
|
||||
// Migration: add inactive column to observers for soft-delete retention
|
||||
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observers_inactive_v1'")
|
||||
if row.Scan(&migDone) != nil {
|
||||
log.Println("[migration] Adding inactive column to observers...")
|
||||
_, err := db.Exec(`ALTER TABLE observers ADD COLUMN inactive INTEGER DEFAULT 0`)
|
||||
if err != nil {
|
||||
// Column may already exist (e.g. fresh install with schema above)
|
||||
log.Printf("[migration] observers.inactive: %v (may already exist)", err)
|
||||
}
|
||||
db.Exec(`INSERT INTO _migrations (name) VALUES ('observers_inactive_v1')`)
|
||||
log.Println("[migration] observers.inactive column added")
|
||||
}
|
||||
|
||||
// Migration: add packets_sent and packets_recv columns to observer_metrics
|
||||
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observer_metrics_packets_v1'")
|
||||
if row.Scan(&migDone) != nil {
|
||||
@@ -644,10 +658,13 @@ func (s *Store) UpsertObserver(id, name, iata string, meta *ObserverMeta) error
|
||||
)
|
||||
if err != nil {
|
||||
s.Stats.WriteErrors.Add(1)
|
||||
} else {
|
||||
s.Stats.ObserverUpserts.Add(1)
|
||||
return err
|
||||
}
|
||||
return err
|
||||
s.Stats.ObserverUpserts.Add(1)
|
||||
|
||||
// Reactivate if this observer was previously marked inactive
|
||||
s.db.Exec(`UPDATE observers SET inactive = 0 WHERE id = ? AND inactive = 1`, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close checkpoints the WAL and closes the database.
|
||||
@@ -779,6 +796,29 @@ func (s *Store) MoveStaleNodes(nodeDays int) (int64, error) {
|
||||
return moved, nil
|
||||
}
|
||||
|
||||
// RemoveStaleObservers marks observers that have not actively sent data in observerDays
|
||||
// as inactive (soft-delete). This preserves JOIN integrity for observations.observer_idx
|
||||
// and observer_metrics.observer_id — historical data still references the correct observer.
|
||||
// An observer must actively send data to stay listed — being seen by another node does not count.
|
||||
// observerDays <= -1 means never remove (keep forever).
|
||||
func (s *Store) RemoveStaleObservers(observerDays int) (int64, error) {
|
||||
if observerDays <= -1 {
|
||||
return 0, nil // keep forever
|
||||
}
|
||||
cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339)
|
||||
result, err := s.db.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("mark stale observers inactive: %w", err)
|
||||
}
|
||||
removed, _ := result.RowsAffected()
|
||||
if removed > 0 {
|
||||
// Clean up orphaned metrics for now-inactive observers
|
||||
s.db.Exec(`DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`)
|
||||
log.Printf("Marked %d observer(s) as inactive (not seen in %d days)", removed, observerDays)
|
||||
}
|
||||
return removed, nil
|
||||
}
|
||||
|
||||
// PacketData holds the data needed to insert a packet into the DB.
|
||||
type PacketData struct {
|
||||
RawHex string
|
||||
|
||||
@@ -61,6 +61,10 @@ func main() {
|
||||
nodeDays := cfg.NodeDaysOrDefault()
|
||||
store.MoveStaleNodes(nodeDays)
|
||||
|
||||
// Observer retention: remove stale observers on startup
|
||||
observerDays := cfg.ObserverDaysOrDefault()
|
||||
store.RemoveStaleObservers(observerDays)
|
||||
|
||||
// Metrics retention: prune old metrics on startup
|
||||
metricsDays := cfg.MetricsRetentionDays()
|
||||
store.PruneOldMetrics(metricsDays)
|
||||
@@ -73,6 +77,16 @@ func main() {
|
||||
}
|
||||
}()
|
||||
|
||||
// Daily ticker for observer retention (every 24h, staggered 90s after startup)
|
||||
observerRetentionTicker := time.NewTicker(24 * time.Hour)
|
||||
go func() {
|
||||
time.Sleep(90 * time.Second) // stagger after metrics prune
|
||||
store.RemoveStaleObservers(observerDays)
|
||||
for range observerRetentionTicker.C {
|
||||
store.RemoveStaleObservers(observerDays)
|
||||
}
|
||||
}()
|
||||
|
||||
// Daily ticker for metrics retention (every 24h)
|
||||
metricsRetentionTicker := time.NewTicker(24 * time.Hour)
|
||||
go func() {
|
||||
|
||||
+13
-3
@@ -110,9 +110,10 @@ type PacketStoreConfig struct {
|
||||
type GeoFilterConfig = geofilter.Config
|
||||
|
||||
type RetentionConfig struct {
|
||||
NodeDays int `json:"nodeDays"`
|
||||
PacketDays int `json:"packetDays"`
|
||||
MetricsDays int `json:"metricsDays"`
|
||||
NodeDays int `json:"nodeDays"`
|
||||
ObserverDays int `json:"observerDays"`
|
||||
PacketDays int `json:"packetDays"`
|
||||
MetricsDays int `json:"metricsDays"`
|
||||
}
|
||||
|
||||
// MetricsRetentionDays returns configured metrics retention or 30 days default.
|
||||
@@ -165,6 +166,15 @@ func (c *Config) NodeDaysOrDefault() int {
|
||||
return 7
|
||||
}
|
||||
|
||||
// ObserverDaysOrDefault returns the configured retention.observerDays or 14 if not set.
|
||||
// A value of -1 means observers are never removed.
|
||||
func (c *Config) ObserverDaysOrDefault() int {
|
||||
if c.Retention != nil && c.Retention.ObserverDays != 0 {
|
||||
return c.Retention.ObserverDays
|
||||
}
|
||||
return 14
|
||||
}
|
||||
|
||||
type HealthThresholds struct {
|
||||
InfraDegradedHours float64 `json:"infraDegradedHours"`
|
||||
InfraSilentHours float64 `json:"infraSilentHours"`
|
||||
|
||||
@@ -365,3 +365,25 @@ func TestPropagationBufferMs(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestObserverDaysOrDefault(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
cfg *Config
|
||||
want int
|
||||
}{
|
||||
{"nil retention", &Config{}, 14},
|
||||
{"zero observer days", &Config{Retention: &RetentionConfig{ObserverDays: 0}}, 14},
|
||||
{"positive value", &Config{Retention: &RetentionConfig{ObserverDays: 30}}, 30},
|
||||
{"keep forever", &Config{Retention: &RetentionConfig{ObserverDays: -1}}, -1},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := tt.cfg.ObserverDaysOrDefault()
|
||||
if got != tt.want {
|
||||
t.Errorf("ObserverDaysOrDefault() = %d, want %d", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2213,6 +2213,35 @@ func (db *DB) PruneOldMetrics(retentionDays int) (int64, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// RemoveStaleObservers marks observers that have not actively sent data in observerDays
|
||||
// as inactive (soft-delete). This preserves JOIN integrity for observations.observer_idx
|
||||
// and observer_metrics.observer_id — historical data still references the correct observer.
|
||||
// An observer must actively send data to stay listed — being seen by another node does not count.
|
||||
// observerDays <= -1 means never remove (keep forever).
|
||||
func (db *DB) RemoveStaleObservers(observerDays int) (int64, error) {
|
||||
if observerDays <= -1 {
|
||||
return 0, nil // keep forever
|
||||
}
|
||||
rw, err := openRW(db.path)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer rw.Close()
|
||||
|
||||
cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339)
|
||||
res, err := rw.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
if n > 0 {
|
||||
// Clean up orphaned metrics for now-inactive observers
|
||||
rw.Exec(`DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`)
|
||||
log.Printf("[observers] Marked %d observer(s) as inactive (not seen in %d days)", n, observerDays)
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// TouchNodeLastSeen updates last_seen for a node identified by full public key.
|
||||
// Only updates if the new timestamp is newer than the existing value (or NULL).
|
||||
// Returns nil even if no rows are affected (node doesn't exist).
|
||||
|
||||
@@ -325,6 +325,40 @@ func main() {
|
||||
log.Printf("[metrics-prune] auto-prune enabled: metrics older than %d days", metricsDays)
|
||||
}
|
||||
|
||||
// Auto-prune stale observers
|
||||
var stopObserverPrune func()
|
||||
{
|
||||
observerDays := cfg.ObserverDaysOrDefault()
|
||||
if observerDays <= -1 {
|
||||
// -1 means keep forever, skip
|
||||
} else {
|
||||
observerPruneTicker := time.NewTicker(24 * time.Hour)
|
||||
observerPruneDone := make(chan struct{})
|
||||
stopObserverPrune = func() {
|
||||
observerPruneTicker.Stop()
|
||||
close(observerPruneDone)
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("[observer-prune] panic recovered: %v", r)
|
||||
}
|
||||
}()
|
||||
time.Sleep(3 * time.Minute) // stagger after metrics prune
|
||||
database.RemoveStaleObservers(observerDays)
|
||||
for {
|
||||
select {
|
||||
case <-observerPruneTicker.C:
|
||||
database.RemoveStaleObservers(observerDays)
|
||||
case <-observerPruneDone:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
log.Printf("[observer-prune] auto-prune enabled: observers not seen in %d days will be removed", observerDays)
|
||||
}
|
||||
}
|
||||
|
||||
// Auto-prune old neighbor edges
|
||||
var stopEdgePrune func()
|
||||
{
|
||||
@@ -386,6 +420,9 @@ func main() {
|
||||
if stopMetricsPrune != nil {
|
||||
stopMetricsPrune()
|
||||
}
|
||||
if stopObserverPrune != nil {
|
||||
stopObserverPrune()
|
||||
}
|
||||
if stopEdgePrune != nil {
|
||||
stopEdgePrune()
|
||||
}
|
||||
|
||||
+20
-1
@@ -2385,13 +2385,32 @@ func (s *Server) handleAdminPrune(w http.ResponseWriter, r *http.Request) {
|
||||
writeError(w, 400, "days parameter required (or set retention.packetDays in config)")
|
||||
return
|
||||
}
|
||||
|
||||
results := map[string]interface{}{}
|
||||
|
||||
// Prune old packets
|
||||
n, err := s.db.PruneOldPackets(days)
|
||||
if err != nil {
|
||||
writeError(w, 500, err.Error())
|
||||
return
|
||||
}
|
||||
log.Printf("[prune] deleted %d transmissions older than %d days", n, days)
|
||||
writeJSON(w, map[string]interface{}{"deleted": n, "days": days})
|
||||
results["packets_deleted"] = n
|
||||
results["deleted"] = n // legacy alias
|
||||
|
||||
// Also mark stale observers as inactive if observerDays is configured
|
||||
observerDays := s.cfg.ObserverDaysOrDefault()
|
||||
if observerDays > 0 {
|
||||
obsN, obsErr := s.db.RemoveStaleObservers(observerDays)
|
||||
if obsErr != nil {
|
||||
log.Printf("[prune] observer prune error: %v", obsErr)
|
||||
} else {
|
||||
results["observers_inactive"] = obsN
|
||||
}
|
||||
}
|
||||
|
||||
results["days"] = days
|
||||
writeJSON(w, results)
|
||||
}
|
||||
|
||||
// constantTimeEqual compares two strings in constant time to prevent timing attacks.
|
||||
|
||||
+2
-1
@@ -3,8 +3,9 @@
|
||||
"apiKey": "your-secret-api-key-here",
|
||||
"retention": {
|
||||
"nodeDays": 7,
|
||||
"observerDays": 14,
|
||||
"packetDays": 30,
|
||||
"_comment": "nodeDays: nodes not seen in N days are moved to inactive_nodes (default 7). packetDays: transmissions+observations older than N days are deleted daily (0 = disabled)."
|
||||
"_comment": "nodeDays: nodes not seen in N days moved to inactive_nodes (default 7). observerDays: observers not sending data in N days are removed (-1 = keep forever, default 14). packetDays: transmissions older than N days are deleted (0 = disabled)."
|
||||
},
|
||||
"https": {
|
||||
"cert": "/path/to/cert.pem",
|
||||
|
||||
Reference in New Issue
Block a user