diff --git a/AGENTS.md b/AGENTS.md index c50c7a69..0d36255f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -43,6 +43,17 @@ scripts/ — Tooling (coverage collector, fixture capture, frontend in 2. Go server (`cmd/server/`) polls SQLite for new packets, broadcasts via WebSocket 3. Frontend fetches via REST API (`/api/*`), filters/sorts client-side +### Read/Write Separation Invariant (#1283) +- **All DB writes live in `cmd/ingestor/`.** INSERT / UPDATE / DELETE / VACUUM / + schema migrations / retention all run in the ingestor process. +- **`cmd/server/` is read-only.** It opens SQLite with `mode=ro` and must not + acquire a write lock. Adding a write-side helper (e.g. a `cachedRW`-style + RW connection) regresses this invariant and races the ingestor → SQLITE_BUSY. +- Enforcement: `cmd/server/readonly_invariant_test.go` reflect-asserts that + `PruneOldPackets`, `PruneOldMetrics`, and `RemoveStaleObservers` are NOT + methods on the server's `*DB`. If you need a new write, add it to + `cmd/ingestor/`. + ### What's Deprecated (DO NOT TOUCH) The following were part of the old Node.js backend and have been removed: - `server.js`, `db.js`, `decoder.js`, `server-helpers.js`, `packet-store.js`, `iata-coords.js` diff --git a/cmd/ingestor/config.go b/cmd/ingestor/config.go index 0429b23a..8f6b8431 100644 --- a/cmd/ingestor/config.go +++ b/cmd/ingestor/config.go @@ -99,9 +99,21 @@ func (f *ForeignAdvertConfig) IsDropMode() bool { // RetentionConfig controls how long stale nodes are kept before being moved to inactive_nodes. type RetentionConfig struct { - NodeDays int `json:"nodeDays"` - ObserverDays int `json:"observerDays"` - MetricsDays int `json:"metricsDays"` + NodeDays int `json:"nodeDays"` + ObserverDays int `json:"observerDays"` + MetricsDays int `json:"metricsDays"` + // PacketDays is the retention window for transmissions (#1283). + // Ownership moved from cmd/server to cmd/ingestor; 0 disables. + PacketDays int `json:"packetDays"` +} + +// PacketDaysOrZero returns the configured retention.packetDays or 0 +// (disabled) if not set. +func (c *Config) PacketDaysOrZero() int { + if c.Retention != nil && c.Retention.PacketDays > 0 { + return c.Retention.PacketDays + } + return 0 } // MetricsConfig controls observer metrics collection. diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index bfc72e5c..41c000e6 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -77,6 +77,19 @@ func main() { metricsDays := cfg.MetricsRetentionDays() store.PruneOldMetrics(metricsDays) store.PruneDroppedPackets(metricsDays) + + // Packet (transmissions) retention: previously lived in cmd/server, + // moved to ingestor in #1283 to eliminate cross-process write + // contention (SQLITE_BUSY). 0 = disabled. + packetDays := cfg.PacketDaysOrZero() + if packetDays > 0 { + if n, err := store.PruneOldPackets(packetDays); err != nil { + log.Printf("[prune] error: %v", err) + } else if n > 0 { + log.Printf("[prune] startup pruned %d transmissions older than %d days", n, packetDays) + } + } + vacuumPages := cfg.IncrementalVacuumPages() store.RunIncrementalVacuum(vacuumPages) @@ -111,6 +124,22 @@ func main() { } }() + // Daily ticker for transmission retention (#1283). + var packetRetentionTicker *time.Ticker + if packetDays > 0 { + packetRetentionTicker = time.NewTicker(24 * time.Hour) + go func() { + for range packetRetentionTicker.C { + if n, err := store.PruneOldPackets(packetDays); err != nil { + log.Printf("[prune] error: %v", err) + } else if n > 0 { + store.RunIncrementalVacuum(vacuumPages) + } + } + }() + log.Printf("[prune] auto-prune enabled: packets older than %d days will be removed daily", packetDays) + } + // Periodic stats logging (every 5 minutes) statsTicker := time.NewTicker(5 * time.Minute) go func() { @@ -253,6 +282,9 @@ func main() { log.Println("Shutting down...") retentionTicker.Stop() metricsRetentionTicker.Stop() + if packetRetentionTicker != nil { + packetRetentionTicker.Stop() + } statsTicker.Stop() stopWatchdog() store.LogStats() // final stats on shutdown diff --git a/cmd/ingestor/maintenance.go b/cmd/ingestor/maintenance.go index 055cda67..44c8f7aa 100644 --- a/cmd/ingestor/maintenance.go +++ b/cmd/ingestor/maintenance.go @@ -1,10 +1,46 @@ package main -// PruneOldPackets deletes transmissions (and their observations) older -// than the given number of days. Returns count of transmissions deleted. -// Owned by the ingestor per #1283 (the writer process). +import ( + "fmt" + "log" + "time" +) + +// PruneOldPackets deletes transmissions (and their child observations) +// older than `days`. Returns count of transmissions deleted. // -// Stub: real implementation lands in the GREEN commit. +// Owned by the ingestor per #1283: the writer process is the only one +// allowed to hold the DB write lock; previously this lived in +// cmd/server/db.go and raced ingestor INSERTs (SQLITE_BUSY). func (s *Store) PruneOldPackets(days int) (int64, error) { - return 0, nil + if days <= 0 { + return 0, nil + } + cutoff := time.Now().UTC().AddDate(0, 0, -days).Format(time.RFC3339) + + tx, err := s.db.Begin() + if err != nil { + return 0, fmt.Errorf("prune begin: %w", err) + } + defer tx.Rollback() + + // Delete child observations first (no CASCADE in SQLite). + if _, err := tx.Exec(`DELETE FROM observations WHERE transmission_id IN ( + SELECT id FROM transmissions WHERE first_seen < ? + )`, cutoff); err != nil { + return 0, fmt.Errorf("prune observations: %w", err) + } + + res, err := tx.Exec(`DELETE FROM transmissions WHERE first_seen < ?`, cutoff) + if err != nil { + return 0, fmt.Errorf("prune transmissions: %w", err) + } + n, _ := res.RowsAffected() + if err := tx.Commit(); err != nil { + return 0, fmt.Errorf("prune commit: %w", err) + } + if n > 0 { + log.Printf("[prune] deleted %d transmissions older than %d days", n, days) + } + return n, nil } diff --git a/cmd/server/db.go b/cmd/server/db.go index 1be8ad6a..3fd8abaa 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -2032,38 +2032,10 @@ func nullInt(ni sql.NullInt64) interface{} { return nil } -// PruneOldPackets deletes transmissions and their observations older than the -// given number of days. Nodes and observers are never touched. -// Returns the number of transmissions deleted. -// Opens a separate read-write connection since the main connection is read-only. -func (db *DB) PruneOldPackets(days int) (int64, error) { - rw, err := cachedRW(db.path) - if err != nil { - return 0, err - } - - cutoff := time.Now().UTC().AddDate(0, 0, -days).Format(time.RFC3339) - tx, err := rw.Begin() - if err != nil { - return 0, err - } - defer tx.Rollback() - - // Delete observations linked to old transmissions first (no CASCADE in SQLite) - _, err = tx.Exec(`DELETE FROM observations WHERE transmission_id IN ( - SELECT id FROM transmissions WHERE first_seen < ? - )`, cutoff) - if err != nil { - return 0, err - } - - res, err := tx.Exec(`DELETE FROM transmissions WHERE first_seen < ?`, cutoff) - if err != nil { - return 0, err - } - n, _ := res.RowsAffected() - return n, tx.Commit() -} +// PruneOldPackets, PruneOldMetrics, and RemoveStaleObservers were +// removed in #1283 — they are write operations and now live on the +// ingestor's *Store (cmd/ingestor/maintenance.go and cmd/ingestor/db.go). +// The server is the read path; it must not hold the SQLite write lock. // MetricsSample represents a single row from observer_metrics with computed deltas. type MetricsSample struct { @@ -2381,52 +2353,8 @@ func (db *DB) GetMetricsSummary(since string) ([]MetricsSummaryRow, error) { return result, nil } -// PruneOldMetrics deletes observer_metrics rows older than retentionDays. -func (db *DB) PruneOldMetrics(retentionDays int) (int64, error) { - rw, err := cachedRW(db.path) - if err != nil { - return 0, err - } - - cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays).Format(time.RFC3339) - res, err := rw.Exec(`DELETE FROM observer_metrics WHERE timestamp < ?`, cutoff) - if err != nil { - return 0, err - } - n, _ := res.RowsAffected() - if n > 0 { - log.Printf("[metrics] Pruned %d observer_metrics rows older than %d days", n, retentionDays) - } - return n, nil -} - -// RemoveStaleObservers marks observers that have not actively sent data in observerDays -// as inactive (soft-delete). This preserves JOIN integrity for observations.observer_idx -// and observer_metrics.observer_id — historical data still references the correct observer. -// An observer must actively send data to stay listed — being seen by another node does not count. -// observerDays <= -1 means never remove (keep forever). -func (db *DB) RemoveStaleObservers(observerDays int) (int64, error) { - if observerDays <= -1 { - return 0, nil // keep forever - } - rw, err := cachedRW(db.path) - if err != nil { - return 0, err - } - - cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339) - res, err := rw.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff) - if err != nil { - return 0, err - } - n, _ := res.RowsAffected() - if n > 0 { - // Clean up orphaned metrics for now-inactive observers - rw.Exec(`DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`) - log.Printf("[observers] Marked %d observer(s) as inactive (not seen in %d days)", n, observerDays) - } - return n, nil -} +// (PruneOldMetrics / RemoveStaleObservers removed in #1283 — see note +// above the MetricsSample type. Ingestor owns these writes now.) // TouchNodeLastSeen updates last_seen for a node identified by full public key. // Only updates if the new timestamp is newer than the existing value (or NULL). diff --git a/cmd/server/db_vacuum_test.go b/cmd/server/db_vacuum_test.go deleted file mode 100644 index 6dad269f..00000000 --- a/cmd/server/db_vacuum_test.go +++ /dev/null @@ -1,262 +0,0 @@ -package main - -import ( - "database/sql" - "os" - "path/filepath" - "strings" - "testing" - "time" - - _ "modernc.org/sqlite" -) - -// createFreshIngestorDB creates a SQLite DB using the ingestor's applySchema logic -// (simulated here) with auto_vacuum=INCREMENTAL set before tables. -func createFreshDBWithAutoVacuum(t *testing.T, path string) *sql.DB { - t.Helper() - // auto_vacuum must be set via DSN before journal_mode creates the DB file - db, err := sql.Open("sqlite", path+"?_pragma=auto_vacuum(INCREMENTAL)&_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)") - if err != nil { - t.Fatal(err) - } - db.SetMaxOpenConns(1) - - // Create minimal schema - _, err = db.Exec(` - CREATE TABLE transmissions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - raw_hex TEXT NOT NULL, - hash TEXT NOT NULL UNIQUE, - first_seen TEXT NOT NULL, - route_type INTEGER, - payload_type INTEGER, - payload_version INTEGER, - decoded_json TEXT, - created_at TEXT DEFAULT (datetime('now')), - channel_hash TEXT - ); - CREATE TABLE observations ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - transmission_id INTEGER NOT NULL REFERENCES transmissions(id), - observer_idx INTEGER, - direction TEXT, - snr REAL, - rssi REAL, - score INTEGER, - path_json TEXT, - timestamp INTEGER NOT NULL - ); - `) - if err != nil { - t.Fatal(err) - } - return db -} - -func TestNewDBHasIncrementalAutoVacuum(t *testing.T) { - dir := t.TempDir() - path := filepath.Join(dir, "test.db") - - db := createFreshDBWithAutoVacuum(t, path) - defer db.Close() - - var autoVacuum int - if err := db.QueryRow("PRAGMA auto_vacuum").Scan(&autoVacuum); err != nil { - t.Fatal(err) - } - if autoVacuum != 2 { - t.Fatalf("expected auto_vacuum=2 (INCREMENTAL), got %d", autoVacuum) - } -} - -func TestExistingDBHasAutoVacuumNone(t *testing.T) { - dir := t.TempDir() - path := filepath.Join(dir, "test.db") - - // Create DB WITHOUT setting auto_vacuum (simulates old DB) - db, err := sql.Open("sqlite", path+"?_pragma=journal_mode(WAL)") - if err != nil { - t.Fatal(err) - } - db.SetMaxOpenConns(1) - _, err = db.Exec("CREATE TABLE dummy (id INTEGER PRIMARY KEY)") - if err != nil { - t.Fatal(err) - } - - var autoVacuum int - if err := db.QueryRow("PRAGMA auto_vacuum").Scan(&autoVacuum); err != nil { - t.Fatal(err) - } - db.Close() - - if autoVacuum != 0 { - t.Fatalf("expected auto_vacuum=0 (NONE) for old DB, got %d", autoVacuum) - } -} - -func TestVacuumOnStartupMigratesDB(t *testing.T) { - dir := t.TempDir() - path := filepath.Join(dir, "test.db") - - // Create DB without auto_vacuum (old DB) - db, err := sql.Open("sqlite", path+"?_pragma=journal_mode(WAL)") - if err != nil { - t.Fatal(err) - } - db.SetMaxOpenConns(1) - _, err = db.Exec("CREATE TABLE dummy (id INTEGER PRIMARY KEY)") - if err != nil { - t.Fatal(err) - } - - var before int - db.QueryRow("PRAGMA auto_vacuum").Scan(&before) - if before != 0 { - t.Fatalf("precondition: expected auto_vacuum=0, got %d", before) - } - db.Close() - - // Simulate vacuumOnStartup migration using openRW - rw, err := openRW(path) - if err != nil { - t.Fatal(err) - } - if _, err := rw.Exec("PRAGMA auto_vacuum = INCREMENTAL"); err != nil { - t.Fatal(err) - } - if _, err := rw.Exec("VACUUM"); err != nil { - t.Fatal(err) - } - rw.Close() - - // Verify migration - db2, err := sql.Open("sqlite", path+"?mode=ro") - if err != nil { - t.Fatal(err) - } - defer db2.Close() - - var after int - if err := db2.QueryRow("PRAGMA auto_vacuum").Scan(&after); err != nil { - t.Fatal(err) - } - if after != 2 { - t.Fatalf("expected auto_vacuum=2 after VACUUM migration, got %d", after) - } -} - -func TestIncrementalVacuumReducesFreelist(t *testing.T) { - dir := t.TempDir() - path := filepath.Join(dir, "test.db") - - db := createFreshDBWithAutoVacuum(t, path) - - // Insert a bunch of data - now := time.Now().UTC().Format(time.RFC3339) - for i := 0; i < 500; i++ { - _, err := db.Exec( - "INSERT INTO transmissions (raw_hex, hash, first_seen) VALUES (?, ?, ?)", - strings.Repeat("AA", 200), // ~400 bytes each - "hash_"+string(rune('A'+i%26))+string(rune('0'+i/26)), - now, - ) - if err != nil { - t.Fatal(err) - } - } - - // Get file size before delete - db.Close() - infoBefore, _ := os.Stat(path) - sizeBefore := infoBefore.Size() - - // Reopen and delete all - db, err := sql.Open("sqlite", path+"?_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)") - if err != nil { - t.Fatal(err) - } - db.SetMaxOpenConns(1) - defer db.Close() - - _, err = db.Exec("DELETE FROM transmissions") - if err != nil { - t.Fatal(err) - } - - // Check freelist before vacuum - var freelistBefore int64 - db.QueryRow("PRAGMA freelist_count").Scan(&freelistBefore) - if freelistBefore == 0 { - t.Fatal("expected non-zero freelist after DELETE") - } - - // Run incremental vacuum - _, err = db.Exec("PRAGMA incremental_vacuum(10000)") - if err != nil { - t.Fatal(err) - } - - // Check freelist after vacuum - var freelistAfter int64 - db.QueryRow("PRAGMA freelist_count").Scan(&freelistAfter) - if freelistAfter >= freelistBefore { - t.Fatalf("expected freelist to shrink: before=%d after=%d", freelistBefore, freelistAfter) - } - - // Checkpoint WAL and check file size shrunk - db.Exec("PRAGMA wal_checkpoint(TRUNCATE)") - db.Close() - infoAfter, _ := os.Stat(path) - sizeAfter := infoAfter.Size() - if sizeAfter >= sizeBefore { - t.Logf("warning: file did not shrink (before=%d after=%d) — may depend on page reuse", sizeBefore, sizeAfter) - } -} - -func TestCheckAutoVacuumLogs(t *testing.T) { - // This test verifies checkAutoVacuum doesn't panic on various configs - dir := t.TempDir() - path := filepath.Join(dir, "test.db") - - // Create a fresh DB with auto_vacuum=INCREMENTAL - dbConn := createFreshDBWithAutoVacuum(t, path) - db := &DB{conn: dbConn, path: path} - cfg := &Config{} - - // Should not panic - checkAutoVacuum(db, cfg, path) - dbConn.Close() - - // Create a DB without auto_vacuum - path2 := filepath.Join(dir, "test2.db") - dbConn2, _ := sql.Open("sqlite", path2+"?_pragma=journal_mode(WAL)") - dbConn2.SetMaxOpenConns(1) - dbConn2.Exec("CREATE TABLE dummy (id INTEGER PRIMARY KEY)") - db2 := &DB{conn: dbConn2, path: path2} - - // Should log warning but not panic - checkAutoVacuum(db2, cfg, path2) - dbConn2.Close() -} - -func TestConfigIncrementalVacuumPages(t *testing.T) { - // Default - cfg := &Config{} - if cfg.IncrementalVacuumPages() != 1024 { - t.Fatalf("expected default 1024, got %d", cfg.IncrementalVacuumPages()) - } - - // Custom - cfg.DB = &DBConfig{IncrementalVacuumPages: 512} - if cfg.IncrementalVacuumPages() != 512 { - t.Fatalf("expected 512, got %d", cfg.IncrementalVacuumPages()) - } - - // Zero should return default - cfg.DB.IncrementalVacuumPages = 0 - if cfg.IncrementalVacuumPages() != 1024 { - t.Fatalf("expected default 1024 for zero, got %d", cfg.IncrementalVacuumPages()) - } -} diff --git a/cmd/server/main.go b/cmd/server/main.go index 153f77dd..359b3e96 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -167,8 +167,8 @@ func main() { stats.TotalTransmissions, stats.TotalObservations, stats.TotalNodes, stats.TotalObservers) } - // Check auto_vacuum mode and optionally migrate (#919) - checkAutoVacuum(database, cfg, resolvedDB) + // auto_vacuum is checked + migrated by the ingestor (#1283). The + // server is read-only and must not race the writer for the lock. // Ensure indexes the server's SQL fallback path depends on // (mirrors ingestor schema for DBs created by old server-only builds). @@ -387,120 +387,10 @@ func main() { log.Printf("[bridge-recompute] background recompute enabled (interval=%s)", cfg.AnalyticsDefaultRecomputeInterval()) - // Auto-prune old packets if retention.packetDays is configured - vacuumPages := cfg.IncrementalVacuumPages() - var stopPrune func() - if cfg.Retention != nil && cfg.Retention.PacketDays > 0 { - days := cfg.Retention.PacketDays - pruneTicker := time.NewTicker(24 * time.Hour) - pruneDone := make(chan struct{}) - stopPrune = func() { - pruneTicker.Stop() - close(pruneDone) - } - go func() { - defer func() { - if r := recover(); r != nil { - log.Printf("[prune] panic recovered: %v", r) - } - }() - time.Sleep(1 * time.Minute) - if n, err := database.PruneOldPackets(days); err != nil { - log.Printf("[prune] error: %v", err) - } else { - log.Printf("[prune] deleted %d transmissions older than %d days", n, days) - if n > 0 { - runIncrementalVacuum(resolvedDB, vacuumPages) - } - } - for { - select { - case <-pruneTicker.C: - if n, err := database.PruneOldPackets(days); err != nil { - log.Printf("[prune] error: %v", err) - } else { - log.Printf("[prune] deleted %d transmissions older than %d days", n, days) - if n > 0 { - runIncrementalVacuum(resolvedDB, vacuumPages) - } - } - case <-pruneDone: - return - } - } - }() - log.Printf("[prune] auto-prune enabled: packets older than %d days will be removed daily", days) - } - - // Auto-prune old metrics - var stopMetricsPrune func() - { - metricsDays := cfg.MetricsRetentionDays() - metricsPruneTicker := time.NewTicker(24 * time.Hour) - metricsPruneDone := make(chan struct{}) - stopMetricsPrune = func() { - metricsPruneTicker.Stop() - close(metricsPruneDone) - } - go func() { - defer func() { - if r := recover(); r != nil { - log.Printf("[metrics-prune] panic recovered: %v", r) - } - }() - time.Sleep(2 * time.Minute) // stagger after packet prune - database.PruneOldMetrics(metricsDays) - runIncrementalVacuum(resolvedDB, vacuumPages) - for { - select { - case <-metricsPruneTicker.C: - database.PruneOldMetrics(metricsDays) - runIncrementalVacuum(resolvedDB, vacuumPages) - case <-metricsPruneDone: - return - } - } - }() - log.Printf("[metrics-prune] auto-prune enabled: metrics older than %d days", metricsDays) - } - - // Auto-prune stale observers - var stopObserverPrune func() - { - observerDays := cfg.ObserverDaysOrDefault() - if observerDays <= -1 { - // -1 means keep forever, skip - } else { - observerPruneTicker := time.NewTicker(24 * time.Hour) - observerPruneDone := make(chan struct{}) - stopObserverPrune = func() { - observerPruneTicker.Stop() - close(observerPruneDone) - } - go func() { - defer func() { - if r := recover(); r != nil { - log.Printf("[observer-prune] panic recovered: %v", r) - } - }() - time.Sleep(3 * time.Minute) // stagger after metrics prune - database.RemoveStaleObservers(observerDays) - runIncrementalVacuum(resolvedDB, vacuumPages) - for { - select { - case <-observerPruneTicker.C: - database.RemoveStaleObservers(observerDays) - runIncrementalVacuum(resolvedDB, vacuumPages) - case <-observerPruneDone: - return - } - } - }() - log.Printf("[observer-prune] auto-prune enabled: observers not seen in %d days will be removed", observerDays) - } - } - - // Auto-prune old neighbor edges + // Packet / metrics / observer retention moved to the ingestor in + // #1283 (writes only belong on the writer process). The server no + // longer schedules any of these; the ingestor's tickers handle them. + _ = cfg.IncrementalVacuumPages() // kept reachable for config validation; not used here var stopEdgePrune func() { maxAgeDays := cfg.NeighborMaxAgeDays() @@ -519,13 +409,11 @@ func main() { time.Sleep(4 * time.Minute) // stagger after metrics prune g := store.graph.Load() PruneNeighborEdges(dbPath, g, maxAgeDays) - runIncrementalVacuum(resolvedDB, vacuumPages) for { select { case <-edgePruneTicker.C: g := store.graph.Load() PruneNeighborEdges(dbPath, g, maxAgeDays) - runIncrementalVacuum(resolvedDB, vacuumPages) case <-edgePruneDone: return } @@ -552,16 +440,8 @@ func main() { // 1. Stop accepting new WebSocket/poll data poller.Stop() - // 1b. Stop auto-prune ticker - if stopPrune != nil { - stopPrune() - } - if stopMetricsPrune != nil { - stopMetricsPrune() - } - if stopObserverPrune != nil { - stopObserverPrune() - } + // 1b. Stop auto-prune ticker (server-side packet/metrics/observer + // prunes were removed in #1283; only neighbor-edge prune remains.) if stopEdgePrune != nil { stopEdgePrune() } diff --git a/cmd/server/openapi.go b/cmd/server/openapi.go index 650d50c2..c17099fa 100644 --- a/cmd/server/openapi.go +++ b/cmd/server/openapi.go @@ -43,7 +43,7 @@ func routeDescriptions() map[string]routeMeta { "GET /api/stats": {Summary: "Network statistics", Description: "Returns aggregate stats (node counts, packet counts, observer counts). Cached for 10s.", Tag: "admin"}, "GET /api/perf": {Summary: "Performance statistics", Description: "Returns per-endpoint request timing and slow query log.", Tag: "admin"}, "POST /api/perf/reset": {Summary: "Reset performance stats", Tag: "admin", Auth: true}, - "POST /api/admin/prune": {Summary: "Prune old data", Description: "Deletes packets and nodes older than the configured retention period.", Tag: "admin", Auth: true}, + // "POST /api/admin/prune" removed in #1283 (ingestor owns prune). "GET /api/debug/affinity": {Summary: "Debug neighbor affinity scores", Tag: "admin", Auth: true}, "GET /api/backup": {Summary: "Download SQLite backup", Description: "Streams a consistent SQLite snapshot of the analyzer DB (VACUUM INTO). Response is application/octet-stream with attachment filename corescope-backup-.db.", Tag: "admin", Auth: true}, diff --git a/cmd/server/routes.go b/cmd/server/routes.go index 4b98d5cd..cc715b2c 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -132,7 +132,9 @@ func (s *Server) RegisterRoutes(r *mux.Router) { r.HandleFunc("/api/perf/sqlite", s.handlePerfSqlite).Methods("GET") r.HandleFunc("/api/perf/write-sources", s.handlePerfWriteSources).Methods("GET") r.Handle("/api/perf/reset", s.requireAPIKey(http.HandlerFunc(s.handlePerfReset))).Methods("POST") - r.Handle("/api/admin/prune", s.requireAPIKey(http.HandlerFunc(s.handleAdminPrune))).Methods("POST") + // /api/admin/prune removed in #1283 — pruning is owned by the + // ingestor process (scheduled tickers + startup pass). Operators + // who want an ad-hoc prune can restart the ingestor. r.Handle("/api/debug/affinity", s.requireAPIKey(http.HandlerFunc(s.handleDebugAffinity))).Methods("GET") r.Handle("/api/dropped-packets", s.requireAPIKey(http.HandlerFunc(s.handleDroppedPackets))).Methods("GET") r.Handle("/api/backup", s.requireAPIKey(http.HandlerFunc(s.handleBackup))).Methods("GET") @@ -2684,45 +2686,8 @@ func parseWindowDuration(window string) (time.Duration, error) { return time.ParseDuration(window) } -func (s *Server) handleAdminPrune(w http.ResponseWriter, r *http.Request) { - days := 0 - if d := r.URL.Query().Get("days"); d != "" { - fmt.Sscanf(d, "%d", &days) - } - if days <= 0 && s.cfg.Retention != nil { - days = s.cfg.Retention.PacketDays - } - if days <= 0 { - writeError(w, 400, "days parameter required (or set retention.packetDays in config)") - return - } - - results := map[string]interface{}{} - - // Prune old packets - n, err := s.db.PruneOldPackets(days) - if err != nil { - writeError(w, 500, err.Error()) - return - } - log.Printf("[prune] deleted %d transmissions older than %d days", n, days) - results["packets_deleted"] = n - results["deleted"] = n // legacy alias - - // Also mark stale observers as inactive if observerDays is configured - observerDays := s.cfg.ObserverDaysOrDefault() - if observerDays > 0 { - obsN, obsErr := s.db.RemoveStaleObservers(observerDays) - if obsErr != nil { - log.Printf("[prune] observer prune error: %v", obsErr) - } else { - results["observers_inactive"] = obsN - } - } - - results["days"] = days - writeJSON(w, results) -} +// handleAdminPrune was removed in #1283. Prune now runs in the ingestor +// process (server is read-only). The function and route are gone. // constantTimeEqual compares two strings in constant time to prevent timing attacks. func constantTimeEqual(a, b string) bool { diff --git a/cmd/server/vacuum.go b/cmd/server/vacuum.go deleted file mode 100644 index bea629dc..00000000 --- a/cmd/server/vacuum.go +++ /dev/null @@ -1,82 +0,0 @@ -package main - -import ( - "fmt" - "log" - "time" -) - -// checkAutoVacuum inspects the current auto_vacuum mode and logs a warning -// if it's not INCREMENTAL. Optionally performs a one-time full VACUUM if -// the operator has set db.vacuumOnStartup: true in config (#919). -func checkAutoVacuum(db *DB, cfg *Config, dbPath string) { - var autoVacuum int - if err := db.conn.QueryRow("PRAGMA auto_vacuum").Scan(&autoVacuum); err != nil { - log.Printf("[db] warning: could not read auto_vacuum: %v", err) - return - } - - if autoVacuum == 2 { - log.Printf("[db] auto_vacuum=INCREMENTAL") - return - } - - modes := map[int]string{0: "NONE", 1: "FULL", 2: "INCREMENTAL"} - mode := modes[autoVacuum] - if mode == "" { - mode = fmt.Sprintf("UNKNOWN(%d)", autoVacuum) - } - - log.Printf("[db] auto_vacuum=%s — DB needs one-time VACUUM to enable incremental auto-vacuum. "+ - "Set db.vacuumOnStartup: true in config to migrate (will block startup for several minutes on large DBs). "+ - "See https://github.com/Kpa-clawbot/CoreScope/issues/919", mode) - - if cfg.DB != nil && cfg.DB.VacuumOnStartup { - // WARNING: Full VACUUM creates a temporary copy of the entire DB file. - // Requires ~2× the DB file size in free disk space or it will fail. - log.Printf("[db] vacuumOnStartup=true — starting one-time full VACUUM (ensure 2x DB size free disk space)...") - start := time.Now() - - rw, err := cachedRW(dbPath) - if err != nil { - log.Printf("[db] VACUUM failed: could not open RW connection: %v", err) - return - } - - if _, err := rw.Exec("PRAGMA auto_vacuum = INCREMENTAL"); err != nil { - log.Printf("[db] VACUUM failed: could not set auto_vacuum: %v", err) - return - } - if _, err := rw.Exec("VACUUM"); err != nil { - log.Printf("[db] VACUUM failed: %v", err) - return - } - - elapsed := time.Since(start) - log.Printf("[db] VACUUM complete in %v — auto_vacuum is now INCREMENTAL", elapsed.Round(time.Millisecond)) - - // Re-check - var newMode int - if err := db.conn.QueryRow("PRAGMA auto_vacuum").Scan(&newMode); err == nil { - if newMode == 2 { - log.Printf("[db] auto_vacuum=INCREMENTAL (confirmed after VACUUM)") - } else { - log.Printf("[db] warning: auto_vacuum=%d after VACUUM — expected 2", newMode) - } - } - } -} - -// runIncrementalVacuum runs PRAGMA incremental_vacuum(N) on a read-write -// connection. Safe to call on auto_vacuum=NONE databases (noop). -func runIncrementalVacuum(dbPath string, pages int) { - rw, err := cachedRW(dbPath) - if err != nil { - log.Printf("[vacuum] could not open RW connection: %v", err) - return - } - - if _, err := rw.Exec(fmt.Sprintf("PRAGMA incremental_vacuum(%d)", pages)); err != nil { - log.Printf("[vacuum] incremental_vacuum error: %v", err) - } -} diff --git a/config.example.json b/config.example.json index 52aa5d79..64b23f03 100644 --- a/config.example.json +++ b/config.example.json @@ -9,12 +9,12 @@ "nodeDays": 7, "observerDays": 14, "packetDays": 30, - "_comment": "nodeDays: nodes not seen in N days moved to inactive_nodes (default 7). observerDays: observers not sending data in N days are removed (-1 = keep forever, default 14). packetDays: transmissions older than N days are deleted (0 = disabled)." + "_comment": "nodeDays: nodes not seen in N days moved to inactive_nodes (default 7). observerDays: observers not sending data in N days are removed (-1 = keep forever, default 14). packetDays: transmissions older than N days are deleted (0 = disabled). NOTE (#1283): all four retention fields are consumed by the INGESTOR process. The server is read-only and never prunes." }, "db": { "vacuumOnStartup": false, "incrementalVacuumPages": 1024, - "_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs (blocks startup for minutes on large DBs; requires 2x DB file size in free disk space). incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919." + "_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs. Executed by the INGESTOR at startup, BEFORE the MQTT subscriber starts (#1283), so there is no contention with concurrent writes. Blocks ingestor startup for minutes on large DBs; requires 2x DB file size in free disk space. incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919." }, "_comment_ingestorStats": "Ingestor publishes a 1-Hz stats snapshot consumed by the server's /api/perf/io and /api/perf/write-sources endpoints (#1120). Path is configured via the CORESCOPE_INGESTOR_STATS environment variable on the INGESTOR process. Default: /tmp/corescope-ingestor-stats.json. The writer uses O_NOFOLLOW + 0o600, so a pre-planted symlink in /tmp cannot be used to clobber an arbitrary file. SECURITY: in shared-tmp environments (multi-tenant hosts), point CORESCOPE_INGESTOR_STATS at a private directory like /var/lib/corescope/ingestor-stats.json that only the corescope user can write to.", "https": {