refactor(db): move all server writes to ingestor; server truly read-only (#1283)

Eliminates the SQLITE_BUSY VACUUM bug from #1283 by making cmd/server
truly read-only. The bug surfaced when supervisord launched both
ingestor + server in one container: the ingestor took the write lock for
INSERTs, then the server's VACUUM-on-startup immediately failed with
SQLITE_BUSY. Same race latently affected three other server-side writes.

Four write operations moved out of cmd/server/:

1. VACUUM / auto_vacuum migration (cmd/server/vacuum.go, entire file)
   → cmd/ingestor/db.go Store.CheckAutoVacuum (already existed;
     ingestor runs it BEFORE the MQTT subscriber starts so there is
     no contention with concurrent writes).

2. PruneOldPackets (DELETE FROM transmissions)
   cmd/server/db.go → cmd/ingestor/maintenance.go (new file,
     Store.PruneOldPackets) + main.go scheduler.

3. PruneOldMetrics (DELETE FROM observer_metrics)
   cmd/server/db.go → cmd/ingestor/db.go Store.PruneOldMetrics
     (already existed).

4. RemoveStaleObservers (UPDATE observers SET inactive = 1)
   cmd/server/db.go → cmd/ingestor/db.go Store.RemoveStaleObservers
     (already existed).

Server-side changes:
- vacuum.go deleted; checkAutoVacuum / runIncrementalVacuum gone.
- cmd/server/db.go: PruneOldPackets, PruneOldMetrics, RemoveStaleObservers
  deleted.
- cmd/server/main.go: packet/metrics/observer prune schedulers removed;
  the neighbor-edge prune scheduler (PruneNeighborEdges) is intentionally
  left in place — outside scope of #1283, tracked separately.
- routes.go + openapi.go: /api/admin/prune endpoint removed (prune is
  scheduled by the ingestor now; operators restart the ingestor for an
  ad-hoc pass).

Ingestor changes:
- New cmd/ingestor/maintenance.go with Store.PruneOldPackets.
- cmd/ingestor/config.go gains RetentionConfig.PacketDays and
  Config.PacketDaysOrZero().
- cmd/ingestor/main.go runs PruneOldPackets at startup (if
  packetDays > 0) and on a 24h ticker.

Docs:
- AGENTS.md: documents the read/write separation invariant.
- config.example.json: notes that retention + vacuumOnStartup are
  consumed by the ingestor.

TDD:
- Red: bb1d749a — invariant tests + Store.PruneOldPackets stub.
- Green: this commit — real implementation + server-side removals.

Note: cachedRW() still has three out-of-scope callers in cmd/server
(neighbor_persist.go, ensure_indexes.go, from_pubkey_migration.go).
Those are pre-existing write paths not covered by issue #1283 and are
left untouched per the issue scope. Future work can relocate them
under the same invariant.
This commit is contained in:
MeshCore Bot
2026-05-19 06:30:16 +00:00
parent f6290b6373
commit dbadef3e2f
11 changed files with 121 additions and 601 deletions
+11
View File
@@ -43,6 +43,17 @@ scripts/ — Tooling (coverage collector, fixture capture, frontend in
2. Go server (`cmd/server/`) polls SQLite for new packets, broadcasts via WebSocket
3. Frontend fetches via REST API (`/api/*`), filters/sorts client-side
### Read/Write Separation Invariant (#1283)
- **All DB writes live in `cmd/ingestor/`.** INSERT / UPDATE / DELETE / VACUUM /
schema migrations / retention all run in the ingestor process.
- **`cmd/server/` is read-only.** It opens SQLite with `mode=ro` and must not
acquire a write lock. Adding a write-side helper (e.g. a `cachedRW`-style
RW connection) regresses this invariant and races the ingestor → SQLITE_BUSY.
- Enforcement: `cmd/server/readonly_invariant_test.go` reflect-asserts that
`PruneOldPackets`, `PruneOldMetrics`, and `RemoveStaleObservers` are NOT
methods on the server's `*DB`. If you need a new write, add it to
`cmd/ingestor/`.
### What's Deprecated (DO NOT TOUCH)
The following were part of the old Node.js backend and have been removed:
- `server.js`, `db.js`, `decoder.js`, `server-helpers.js`, `packet-store.js`, `iata-coords.js`
+15 -3
View File
@@ -99,9 +99,21 @@ func (f *ForeignAdvertConfig) IsDropMode() bool {
// RetentionConfig controls how long stale nodes are kept before being moved to inactive_nodes.
type RetentionConfig struct {
NodeDays int `json:"nodeDays"`
ObserverDays int `json:"observerDays"`
MetricsDays int `json:"metricsDays"`
NodeDays int `json:"nodeDays"`
ObserverDays int `json:"observerDays"`
MetricsDays int `json:"metricsDays"`
// PacketDays is the retention window for transmissions (#1283).
// Ownership moved from cmd/server to cmd/ingestor; 0 disables.
PacketDays int `json:"packetDays"`
}
// PacketDaysOrZero returns the configured retention.packetDays or 0
// (disabled) if not set.
func (c *Config) PacketDaysOrZero() int {
if c.Retention != nil && c.Retention.PacketDays > 0 {
return c.Retention.PacketDays
}
return 0
}
// MetricsConfig controls observer metrics collection.
+32
View File
@@ -77,6 +77,19 @@ func main() {
metricsDays := cfg.MetricsRetentionDays()
store.PruneOldMetrics(metricsDays)
store.PruneDroppedPackets(metricsDays)
// Packet (transmissions) retention: previously lived in cmd/server,
// moved to ingestor in #1283 to eliminate cross-process write
// contention (SQLITE_BUSY). 0 = disabled.
packetDays := cfg.PacketDaysOrZero()
if packetDays > 0 {
if n, err := store.PruneOldPackets(packetDays); err != nil {
log.Printf("[prune] error: %v", err)
} else if n > 0 {
log.Printf("[prune] startup pruned %d transmissions older than %d days", n, packetDays)
}
}
vacuumPages := cfg.IncrementalVacuumPages()
store.RunIncrementalVacuum(vacuumPages)
@@ -111,6 +124,22 @@ func main() {
}
}()
// Daily ticker for transmission retention (#1283).
var packetRetentionTicker *time.Ticker
if packetDays > 0 {
packetRetentionTicker = time.NewTicker(24 * time.Hour)
go func() {
for range packetRetentionTicker.C {
if n, err := store.PruneOldPackets(packetDays); err != nil {
log.Printf("[prune] error: %v", err)
} else if n > 0 {
store.RunIncrementalVacuum(vacuumPages)
}
}
}()
log.Printf("[prune] auto-prune enabled: packets older than %d days will be removed daily", packetDays)
}
// Periodic stats logging (every 5 minutes)
statsTicker := time.NewTicker(5 * time.Minute)
go func() {
@@ -253,6 +282,9 @@ func main() {
log.Println("Shutting down...")
retentionTicker.Stop()
metricsRetentionTicker.Stop()
if packetRetentionTicker != nil {
packetRetentionTicker.Stop()
}
statsTicker.Stop()
stopWatchdog()
store.LogStats() // final stats on shutdown
+41 -5
View File
@@ -1,10 +1,46 @@
package main
// PruneOldPackets deletes transmissions (and their observations) older
// than the given number of days. Returns count of transmissions deleted.
// Owned by the ingestor per #1283 (the writer process).
import (
"fmt"
"log"
"time"
)
// PruneOldPackets deletes transmissions (and their child observations)
// older than `days`. Returns count of transmissions deleted.
//
// Stub: real implementation lands in the GREEN commit.
// Owned by the ingestor per #1283: the writer process is the only one
// allowed to hold the DB write lock; previously this lived in
// cmd/server/db.go and raced ingestor INSERTs (SQLITE_BUSY).
func (s *Store) PruneOldPackets(days int) (int64, error) {
return 0, nil
if days <= 0 {
return 0, nil
}
cutoff := time.Now().UTC().AddDate(0, 0, -days).Format(time.RFC3339)
tx, err := s.db.Begin()
if err != nil {
return 0, fmt.Errorf("prune begin: %w", err)
}
defer tx.Rollback()
// Delete child observations first (no CASCADE in SQLite).
if _, err := tx.Exec(`DELETE FROM observations WHERE transmission_id IN (
SELECT id FROM transmissions WHERE first_seen < ?
)`, cutoff); err != nil {
return 0, fmt.Errorf("prune observations: %w", err)
}
res, err := tx.Exec(`DELETE FROM transmissions WHERE first_seen < ?`, cutoff)
if err != nil {
return 0, fmt.Errorf("prune transmissions: %w", err)
}
n, _ := res.RowsAffected()
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("prune commit: %w", err)
}
if n > 0 {
log.Printf("[prune] deleted %d transmissions older than %d days", n, days)
}
return n, nil
}
+6 -78
View File
@@ -2032,38 +2032,10 @@ func nullInt(ni sql.NullInt64) interface{} {
return nil
}
// PruneOldPackets deletes transmissions and their observations older than the
// given number of days. Nodes and observers are never touched.
// Returns the number of transmissions deleted.
// Opens a separate read-write connection since the main connection is read-only.
func (db *DB) PruneOldPackets(days int) (int64, error) {
rw, err := cachedRW(db.path)
if err != nil {
return 0, err
}
cutoff := time.Now().UTC().AddDate(0, 0, -days).Format(time.RFC3339)
tx, err := rw.Begin()
if err != nil {
return 0, err
}
defer tx.Rollback()
// Delete observations linked to old transmissions first (no CASCADE in SQLite)
_, err = tx.Exec(`DELETE FROM observations WHERE transmission_id IN (
SELECT id FROM transmissions WHERE first_seen < ?
)`, cutoff)
if err != nil {
return 0, err
}
res, err := tx.Exec(`DELETE FROM transmissions WHERE first_seen < ?`, cutoff)
if err != nil {
return 0, err
}
n, _ := res.RowsAffected()
return n, tx.Commit()
}
// PruneOldPackets, PruneOldMetrics, and RemoveStaleObservers were
// removed in #1283 — they are write operations and now live on the
// ingestor's *Store (cmd/ingestor/maintenance.go and cmd/ingestor/db.go).
// The server is the read path; it must not hold the SQLite write lock.
// MetricsSample represents a single row from observer_metrics with computed deltas.
type MetricsSample struct {
@@ -2381,52 +2353,8 @@ func (db *DB) GetMetricsSummary(since string) ([]MetricsSummaryRow, error) {
return result, nil
}
// PruneOldMetrics deletes observer_metrics rows older than retentionDays.
func (db *DB) PruneOldMetrics(retentionDays int) (int64, error) {
rw, err := cachedRW(db.path)
if err != nil {
return 0, err
}
cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays).Format(time.RFC3339)
res, err := rw.Exec(`DELETE FROM observer_metrics WHERE timestamp < ?`, cutoff)
if err != nil {
return 0, err
}
n, _ := res.RowsAffected()
if n > 0 {
log.Printf("[metrics] Pruned %d observer_metrics rows older than %d days", n, retentionDays)
}
return n, nil
}
// RemoveStaleObservers marks observers that have not actively sent data in observerDays
// as inactive (soft-delete). This preserves JOIN integrity for observations.observer_idx
// and observer_metrics.observer_id — historical data still references the correct observer.
// An observer must actively send data to stay listed — being seen by another node does not count.
// observerDays <= -1 means never remove (keep forever).
func (db *DB) RemoveStaleObservers(observerDays int) (int64, error) {
if observerDays <= -1 {
return 0, nil // keep forever
}
rw, err := cachedRW(db.path)
if err != nil {
return 0, err
}
cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339)
res, err := rw.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff)
if err != nil {
return 0, err
}
n, _ := res.RowsAffected()
if n > 0 {
// Clean up orphaned metrics for now-inactive observers
rw.Exec(`DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`)
log.Printf("[observers] Marked %d observer(s) as inactive (not seen in %d days)", n, observerDays)
}
return n, nil
}
// (PruneOldMetrics / RemoveStaleObservers removed in #1283 — see note
// above the MetricsSample type. Ingestor owns these writes now.)
// TouchNodeLastSeen updates last_seen for a node identified by full public key.
// Only updates if the new timestamp is newer than the existing value (or NULL).
-262
View File
@@ -1,262 +0,0 @@
package main
import (
"database/sql"
"os"
"path/filepath"
"strings"
"testing"
"time"
_ "modernc.org/sqlite"
)
// createFreshIngestorDB creates a SQLite DB using the ingestor's applySchema logic
// (simulated here) with auto_vacuum=INCREMENTAL set before tables.
func createFreshDBWithAutoVacuum(t *testing.T, path string) *sql.DB {
t.Helper()
// auto_vacuum must be set via DSN before journal_mode creates the DB file
db, err := sql.Open("sqlite", path+"?_pragma=auto_vacuum(INCREMENTAL)&_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)")
if err != nil {
t.Fatal(err)
}
db.SetMaxOpenConns(1)
// Create minimal schema
_, err = db.Exec(`
CREATE TABLE transmissions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
raw_hex TEXT NOT NULL,
hash TEXT NOT NULL UNIQUE,
first_seen TEXT NOT NULL,
route_type INTEGER,
payload_type INTEGER,
payload_version INTEGER,
decoded_json TEXT,
created_at TEXT DEFAULT (datetime('now')),
channel_hash TEXT
);
CREATE TABLE observations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transmission_id INTEGER NOT NULL REFERENCES transmissions(id),
observer_idx INTEGER,
direction TEXT,
snr REAL,
rssi REAL,
score INTEGER,
path_json TEXT,
timestamp INTEGER NOT NULL
);
`)
if err != nil {
t.Fatal(err)
}
return db
}
func TestNewDBHasIncrementalAutoVacuum(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.db")
db := createFreshDBWithAutoVacuum(t, path)
defer db.Close()
var autoVacuum int
if err := db.QueryRow("PRAGMA auto_vacuum").Scan(&autoVacuum); err != nil {
t.Fatal(err)
}
if autoVacuum != 2 {
t.Fatalf("expected auto_vacuum=2 (INCREMENTAL), got %d", autoVacuum)
}
}
func TestExistingDBHasAutoVacuumNone(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.db")
// Create DB WITHOUT setting auto_vacuum (simulates old DB)
db, err := sql.Open("sqlite", path+"?_pragma=journal_mode(WAL)")
if err != nil {
t.Fatal(err)
}
db.SetMaxOpenConns(1)
_, err = db.Exec("CREATE TABLE dummy (id INTEGER PRIMARY KEY)")
if err != nil {
t.Fatal(err)
}
var autoVacuum int
if err := db.QueryRow("PRAGMA auto_vacuum").Scan(&autoVacuum); err != nil {
t.Fatal(err)
}
db.Close()
if autoVacuum != 0 {
t.Fatalf("expected auto_vacuum=0 (NONE) for old DB, got %d", autoVacuum)
}
}
func TestVacuumOnStartupMigratesDB(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.db")
// Create DB without auto_vacuum (old DB)
db, err := sql.Open("sqlite", path+"?_pragma=journal_mode(WAL)")
if err != nil {
t.Fatal(err)
}
db.SetMaxOpenConns(1)
_, err = db.Exec("CREATE TABLE dummy (id INTEGER PRIMARY KEY)")
if err != nil {
t.Fatal(err)
}
var before int
db.QueryRow("PRAGMA auto_vacuum").Scan(&before)
if before != 0 {
t.Fatalf("precondition: expected auto_vacuum=0, got %d", before)
}
db.Close()
// Simulate vacuumOnStartup migration using openRW
rw, err := openRW(path)
if err != nil {
t.Fatal(err)
}
if _, err := rw.Exec("PRAGMA auto_vacuum = INCREMENTAL"); err != nil {
t.Fatal(err)
}
if _, err := rw.Exec("VACUUM"); err != nil {
t.Fatal(err)
}
rw.Close()
// Verify migration
db2, err := sql.Open("sqlite", path+"?mode=ro")
if err != nil {
t.Fatal(err)
}
defer db2.Close()
var after int
if err := db2.QueryRow("PRAGMA auto_vacuum").Scan(&after); err != nil {
t.Fatal(err)
}
if after != 2 {
t.Fatalf("expected auto_vacuum=2 after VACUUM migration, got %d", after)
}
}
func TestIncrementalVacuumReducesFreelist(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.db")
db := createFreshDBWithAutoVacuum(t, path)
// Insert a bunch of data
now := time.Now().UTC().Format(time.RFC3339)
for i := 0; i < 500; i++ {
_, err := db.Exec(
"INSERT INTO transmissions (raw_hex, hash, first_seen) VALUES (?, ?, ?)",
strings.Repeat("AA", 200), // ~400 bytes each
"hash_"+string(rune('A'+i%26))+string(rune('0'+i/26)),
now,
)
if err != nil {
t.Fatal(err)
}
}
// Get file size before delete
db.Close()
infoBefore, _ := os.Stat(path)
sizeBefore := infoBefore.Size()
// Reopen and delete all
db, err := sql.Open("sqlite", path+"?_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)")
if err != nil {
t.Fatal(err)
}
db.SetMaxOpenConns(1)
defer db.Close()
_, err = db.Exec("DELETE FROM transmissions")
if err != nil {
t.Fatal(err)
}
// Check freelist before vacuum
var freelistBefore int64
db.QueryRow("PRAGMA freelist_count").Scan(&freelistBefore)
if freelistBefore == 0 {
t.Fatal("expected non-zero freelist after DELETE")
}
// Run incremental vacuum
_, err = db.Exec("PRAGMA incremental_vacuum(10000)")
if err != nil {
t.Fatal(err)
}
// Check freelist after vacuum
var freelistAfter int64
db.QueryRow("PRAGMA freelist_count").Scan(&freelistAfter)
if freelistAfter >= freelistBefore {
t.Fatalf("expected freelist to shrink: before=%d after=%d", freelistBefore, freelistAfter)
}
// Checkpoint WAL and check file size shrunk
db.Exec("PRAGMA wal_checkpoint(TRUNCATE)")
db.Close()
infoAfter, _ := os.Stat(path)
sizeAfter := infoAfter.Size()
if sizeAfter >= sizeBefore {
t.Logf("warning: file did not shrink (before=%d after=%d) — may depend on page reuse", sizeBefore, sizeAfter)
}
}
func TestCheckAutoVacuumLogs(t *testing.T) {
// This test verifies checkAutoVacuum doesn't panic on various configs
dir := t.TempDir()
path := filepath.Join(dir, "test.db")
// Create a fresh DB with auto_vacuum=INCREMENTAL
dbConn := createFreshDBWithAutoVacuum(t, path)
db := &DB{conn: dbConn, path: path}
cfg := &Config{}
// Should not panic
checkAutoVacuum(db, cfg, path)
dbConn.Close()
// Create a DB without auto_vacuum
path2 := filepath.Join(dir, "test2.db")
dbConn2, _ := sql.Open("sqlite", path2+"?_pragma=journal_mode(WAL)")
dbConn2.SetMaxOpenConns(1)
dbConn2.Exec("CREATE TABLE dummy (id INTEGER PRIMARY KEY)")
db2 := &DB{conn: dbConn2, path: path2}
// Should log warning but not panic
checkAutoVacuum(db2, cfg, path2)
dbConn2.Close()
}
func TestConfigIncrementalVacuumPages(t *testing.T) {
// Default
cfg := &Config{}
if cfg.IncrementalVacuumPages() != 1024 {
t.Fatalf("expected default 1024, got %d", cfg.IncrementalVacuumPages())
}
// Custom
cfg.DB = &DBConfig{IncrementalVacuumPages: 512}
if cfg.IncrementalVacuumPages() != 512 {
t.Fatalf("expected 512, got %d", cfg.IncrementalVacuumPages())
}
// Zero should return default
cfg.DB.IncrementalVacuumPages = 0
if cfg.IncrementalVacuumPages() != 1024 {
t.Fatalf("expected default 1024 for zero, got %d", cfg.IncrementalVacuumPages())
}
}
+8 -128
View File
@@ -167,8 +167,8 @@ func main() {
stats.TotalTransmissions, stats.TotalObservations, stats.TotalNodes, stats.TotalObservers)
}
// Check auto_vacuum mode and optionally migrate (#919)
checkAutoVacuum(database, cfg, resolvedDB)
// auto_vacuum is checked + migrated by the ingestor (#1283). The
// server is read-only and must not race the writer for the lock.
// Ensure indexes the server's SQL fallback path depends on
// (mirrors ingestor schema for DBs created by old server-only builds).
@@ -387,120 +387,10 @@ func main() {
log.Printf("[bridge-recompute] background recompute enabled (interval=%s)",
cfg.AnalyticsDefaultRecomputeInterval())
// Auto-prune old packets if retention.packetDays is configured
vacuumPages := cfg.IncrementalVacuumPages()
var stopPrune func()
if cfg.Retention != nil && cfg.Retention.PacketDays > 0 {
days := cfg.Retention.PacketDays
pruneTicker := time.NewTicker(24 * time.Hour)
pruneDone := make(chan struct{})
stopPrune = func() {
pruneTicker.Stop()
close(pruneDone)
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("[prune] panic recovered: %v", r)
}
}()
time.Sleep(1 * time.Minute)
if n, err := database.PruneOldPackets(days); err != nil {
log.Printf("[prune] error: %v", err)
} else {
log.Printf("[prune] deleted %d transmissions older than %d days", n, days)
if n > 0 {
runIncrementalVacuum(resolvedDB, vacuumPages)
}
}
for {
select {
case <-pruneTicker.C:
if n, err := database.PruneOldPackets(days); err != nil {
log.Printf("[prune] error: %v", err)
} else {
log.Printf("[prune] deleted %d transmissions older than %d days", n, days)
if n > 0 {
runIncrementalVacuum(resolvedDB, vacuumPages)
}
}
case <-pruneDone:
return
}
}
}()
log.Printf("[prune] auto-prune enabled: packets older than %d days will be removed daily", days)
}
// Auto-prune old metrics
var stopMetricsPrune func()
{
metricsDays := cfg.MetricsRetentionDays()
metricsPruneTicker := time.NewTicker(24 * time.Hour)
metricsPruneDone := make(chan struct{})
stopMetricsPrune = func() {
metricsPruneTicker.Stop()
close(metricsPruneDone)
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("[metrics-prune] panic recovered: %v", r)
}
}()
time.Sleep(2 * time.Minute) // stagger after packet prune
database.PruneOldMetrics(metricsDays)
runIncrementalVacuum(resolvedDB, vacuumPages)
for {
select {
case <-metricsPruneTicker.C:
database.PruneOldMetrics(metricsDays)
runIncrementalVacuum(resolvedDB, vacuumPages)
case <-metricsPruneDone:
return
}
}
}()
log.Printf("[metrics-prune] auto-prune enabled: metrics older than %d days", metricsDays)
}
// Auto-prune stale observers
var stopObserverPrune func()
{
observerDays := cfg.ObserverDaysOrDefault()
if observerDays <= -1 {
// -1 means keep forever, skip
} else {
observerPruneTicker := time.NewTicker(24 * time.Hour)
observerPruneDone := make(chan struct{})
stopObserverPrune = func() {
observerPruneTicker.Stop()
close(observerPruneDone)
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("[observer-prune] panic recovered: %v", r)
}
}()
time.Sleep(3 * time.Minute) // stagger after metrics prune
database.RemoveStaleObservers(observerDays)
runIncrementalVacuum(resolvedDB, vacuumPages)
for {
select {
case <-observerPruneTicker.C:
database.RemoveStaleObservers(observerDays)
runIncrementalVacuum(resolvedDB, vacuumPages)
case <-observerPruneDone:
return
}
}
}()
log.Printf("[observer-prune] auto-prune enabled: observers not seen in %d days will be removed", observerDays)
}
}
// Auto-prune old neighbor edges
// Packet / metrics / observer retention moved to the ingestor in
// #1283 (writes only belong on the writer process). The server no
// longer schedules any of these; the ingestor's tickers handle them.
_ = cfg.IncrementalVacuumPages() // kept reachable for config validation; not used here
var stopEdgePrune func()
{
maxAgeDays := cfg.NeighborMaxAgeDays()
@@ -519,13 +409,11 @@ func main() {
time.Sleep(4 * time.Minute) // stagger after metrics prune
g := store.graph.Load()
PruneNeighborEdges(dbPath, g, maxAgeDays)
runIncrementalVacuum(resolvedDB, vacuumPages)
for {
select {
case <-edgePruneTicker.C:
g := store.graph.Load()
PruneNeighborEdges(dbPath, g, maxAgeDays)
runIncrementalVacuum(resolvedDB, vacuumPages)
case <-edgePruneDone:
return
}
@@ -552,16 +440,8 @@ func main() {
// 1. Stop accepting new WebSocket/poll data
poller.Stop()
// 1b. Stop auto-prune ticker
if stopPrune != nil {
stopPrune()
}
if stopMetricsPrune != nil {
stopMetricsPrune()
}
if stopObserverPrune != nil {
stopObserverPrune()
}
// 1b. Stop auto-prune ticker (server-side packet/metrics/observer
// prunes were removed in #1283; only neighbor-edge prune remains.)
if stopEdgePrune != nil {
stopEdgePrune()
}
+1 -1
View File
@@ -43,7 +43,7 @@ func routeDescriptions() map[string]routeMeta {
"GET /api/stats": {Summary: "Network statistics", Description: "Returns aggregate stats (node counts, packet counts, observer counts). Cached for 10s.", Tag: "admin"},
"GET /api/perf": {Summary: "Performance statistics", Description: "Returns per-endpoint request timing and slow query log.", Tag: "admin"},
"POST /api/perf/reset": {Summary: "Reset performance stats", Tag: "admin", Auth: true},
"POST /api/admin/prune": {Summary: "Prune old data", Description: "Deletes packets and nodes older than the configured retention period.", Tag: "admin", Auth: true},
// "POST /api/admin/prune" removed in #1283 (ingestor owns prune).
"GET /api/debug/affinity": {Summary: "Debug neighbor affinity scores", Tag: "admin", Auth: true},
"GET /api/backup": {Summary: "Download SQLite backup", Description: "Streams a consistent SQLite snapshot of the analyzer DB (VACUUM INTO). Response is application/octet-stream with attachment filename corescope-backup-<unix>.db.", Tag: "admin", Auth: true},
+5 -40
View File
@@ -132,7 +132,9 @@ func (s *Server) RegisterRoutes(r *mux.Router) {
r.HandleFunc("/api/perf/sqlite", s.handlePerfSqlite).Methods("GET")
r.HandleFunc("/api/perf/write-sources", s.handlePerfWriteSources).Methods("GET")
r.Handle("/api/perf/reset", s.requireAPIKey(http.HandlerFunc(s.handlePerfReset))).Methods("POST")
r.Handle("/api/admin/prune", s.requireAPIKey(http.HandlerFunc(s.handleAdminPrune))).Methods("POST")
// /api/admin/prune removed in #1283 — pruning is owned by the
// ingestor process (scheduled tickers + startup pass). Operators
// who want an ad-hoc prune can restart the ingestor.
r.Handle("/api/debug/affinity", s.requireAPIKey(http.HandlerFunc(s.handleDebugAffinity))).Methods("GET")
r.Handle("/api/dropped-packets", s.requireAPIKey(http.HandlerFunc(s.handleDroppedPackets))).Methods("GET")
r.Handle("/api/backup", s.requireAPIKey(http.HandlerFunc(s.handleBackup))).Methods("GET")
@@ -2684,45 +2686,8 @@ func parseWindowDuration(window string) (time.Duration, error) {
return time.ParseDuration(window)
}
func (s *Server) handleAdminPrune(w http.ResponseWriter, r *http.Request) {
days := 0
if d := r.URL.Query().Get("days"); d != "" {
fmt.Sscanf(d, "%d", &days)
}
if days <= 0 && s.cfg.Retention != nil {
days = s.cfg.Retention.PacketDays
}
if days <= 0 {
writeError(w, 400, "days parameter required (or set retention.packetDays in config)")
return
}
results := map[string]interface{}{}
// Prune old packets
n, err := s.db.PruneOldPackets(days)
if err != nil {
writeError(w, 500, err.Error())
return
}
log.Printf("[prune] deleted %d transmissions older than %d days", n, days)
results["packets_deleted"] = n
results["deleted"] = n // legacy alias
// Also mark stale observers as inactive if observerDays is configured
observerDays := s.cfg.ObserverDaysOrDefault()
if observerDays > 0 {
obsN, obsErr := s.db.RemoveStaleObservers(observerDays)
if obsErr != nil {
log.Printf("[prune] observer prune error: %v", obsErr)
} else {
results["observers_inactive"] = obsN
}
}
results["days"] = days
writeJSON(w, results)
}
// handleAdminPrune was removed in #1283. Prune now runs in the ingestor
// process (server is read-only). The function and route are gone.
// constantTimeEqual compares two strings in constant time to prevent timing attacks.
func constantTimeEqual(a, b string) bool {
-82
View File
@@ -1,82 +0,0 @@
package main
import (
"fmt"
"log"
"time"
)
// checkAutoVacuum inspects the current auto_vacuum mode and logs a warning
// if it's not INCREMENTAL. Optionally performs a one-time full VACUUM if
// the operator has set db.vacuumOnStartup: true in config (#919).
func checkAutoVacuum(db *DB, cfg *Config, dbPath string) {
var autoVacuum int
if err := db.conn.QueryRow("PRAGMA auto_vacuum").Scan(&autoVacuum); err != nil {
log.Printf("[db] warning: could not read auto_vacuum: %v", err)
return
}
if autoVacuum == 2 {
log.Printf("[db] auto_vacuum=INCREMENTAL")
return
}
modes := map[int]string{0: "NONE", 1: "FULL", 2: "INCREMENTAL"}
mode := modes[autoVacuum]
if mode == "" {
mode = fmt.Sprintf("UNKNOWN(%d)", autoVacuum)
}
log.Printf("[db] auto_vacuum=%s — DB needs one-time VACUUM to enable incremental auto-vacuum. "+
"Set db.vacuumOnStartup: true in config to migrate (will block startup for several minutes on large DBs). "+
"See https://github.com/Kpa-clawbot/CoreScope/issues/919", mode)
if cfg.DB != nil && cfg.DB.VacuumOnStartup {
// WARNING: Full VACUUM creates a temporary copy of the entire DB file.
// Requires ~2× the DB file size in free disk space or it will fail.
log.Printf("[db] vacuumOnStartup=true — starting one-time full VACUUM (ensure 2x DB size free disk space)...")
start := time.Now()
rw, err := cachedRW(dbPath)
if err != nil {
log.Printf("[db] VACUUM failed: could not open RW connection: %v", err)
return
}
if _, err := rw.Exec("PRAGMA auto_vacuum = INCREMENTAL"); err != nil {
log.Printf("[db] VACUUM failed: could not set auto_vacuum: %v", err)
return
}
if _, err := rw.Exec("VACUUM"); err != nil {
log.Printf("[db] VACUUM failed: %v", err)
return
}
elapsed := time.Since(start)
log.Printf("[db] VACUUM complete in %v — auto_vacuum is now INCREMENTAL", elapsed.Round(time.Millisecond))
// Re-check
var newMode int
if err := db.conn.QueryRow("PRAGMA auto_vacuum").Scan(&newMode); err == nil {
if newMode == 2 {
log.Printf("[db] auto_vacuum=INCREMENTAL (confirmed after VACUUM)")
} else {
log.Printf("[db] warning: auto_vacuum=%d after VACUUM — expected 2", newMode)
}
}
}
}
// runIncrementalVacuum runs PRAGMA incremental_vacuum(N) on a read-write
// connection. Safe to call on auto_vacuum=NONE databases (noop).
func runIncrementalVacuum(dbPath string, pages int) {
rw, err := cachedRW(dbPath)
if err != nil {
log.Printf("[vacuum] could not open RW connection: %v", err)
return
}
if _, err := rw.Exec(fmt.Sprintf("PRAGMA incremental_vacuum(%d)", pages)); err != nil {
log.Printf("[vacuum] incremental_vacuum error: %v", err)
}
}
+2 -2
View File
@@ -9,12 +9,12 @@
"nodeDays": 7,
"observerDays": 14,
"packetDays": 30,
"_comment": "nodeDays: nodes not seen in N days moved to inactive_nodes (default 7). observerDays: observers not sending data in N days are removed (-1 = keep forever, default 14). packetDays: transmissions older than N days are deleted (0 = disabled)."
"_comment": "nodeDays: nodes not seen in N days moved to inactive_nodes (default 7). observerDays: observers not sending data in N days are removed (-1 = keep forever, default 14). packetDays: transmissions older than N days are deleted (0 = disabled). NOTE (#1283): all four retention fields are consumed by the INGESTOR process. The server is read-only and never prunes."
},
"db": {
"vacuumOnStartup": false,
"incrementalVacuumPages": 1024,
"_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs (blocks startup for minutes on large DBs; requires 2x DB file size in free disk space). incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919."
"_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs. Executed by the INGESTOR at startup, BEFORE the MQTT subscriber starts (#1283), so there is no contention with concurrent writes. Blocks ingestor startup for minutes on large DBs; requires 2x DB file size in free disk space. incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919."
},
"_comment_ingestorStats": "Ingestor publishes a 1-Hz stats snapshot consumed by the server's /api/perf/io and /api/perf/write-sources endpoints (#1120). Path is configured via the CORESCOPE_INGESTOR_STATS environment variable on the INGESTOR process. Default: /tmp/corescope-ingestor-stats.json. The writer uses O_NOFOLLOW + 0o600, so a pre-planted symlink in /tmp cannot be used to clobber an arbitrary file. SECURITY: in shared-tmp environments (multi-tenant hosts), point CORESCOPE_INGESTOR_STATS at a private directory like /var/lib/corescope/ingestor-stats.json that only the corescope user can write to.",
"https": {