diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 0f128775..31ab0b94 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -1045,14 +1045,20 @@ func (s *Store) RunIncrementalVacuum(pages int) { } } -// Checkpoint forces a WAL checkpoint to release the WAL lock file, -// preventing lock contention with a new process starting up. -func (s *Store) Checkpoint() { - if _, err := s.db.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil { +// Checkpoint runs a WAL checkpoint (TRUNCATE mode). +// Returns SQLite's checkpointed-frame count; this is 0 on error and also after a fully successful TRUNCATE, which resets the counters. +// TRUNCATE resets the WAL file to zero bytes when all frames are checkpointed; +// if active readers hold frames, it checkpoints what it can and leaves the rest. +func (s *Store) Checkpoint() int { + var busy, walFrames, checkpointed int + if err := s.db.QueryRow("PRAGMA wal_checkpoint(TRUNCATE)").Scan(&busy, &walFrames, &checkpointed); err != nil { log.Printf("[db] WAL checkpoint error: %v", err) - } else { - log.Println("[db] WAL checkpoint complete") + return 0 } + if walFrames > 0 { + log.Printf("[db] WAL checkpoint: %d/%d frames checkpointed (blocked=%v)", checkpointed, walFrames, busy != 0) + } + return checkpointed } // BackfillPathJSONAsync launches the path_json backfill in a background goroutine. diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 3f2fbd5b..ae905528 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -150,6 +150,21 @@ func main() { log.Printf("[prune] auto-prune enabled: packets older than %d days will be removed daily", packetDays) } + // Hourly WAL checkpoint to prevent unbounded WAL growth. + // TRUNCATE resets the WAL file to zero bytes when all frames are flushed; + // if the server's read connection holds frames, remaining pages stay in the + // WAL until the next tick. Staggered 30s after startup to avoid competing + // with the initial burst of ingest writes. 
+ walCheckpointTicker := time.NewTicker(1 * time.Hour) + go func() { + time.Sleep(30 * time.Second) + store.Checkpoint() + for range walCheckpointTicker.C { + store.Checkpoint() + } + }() + log.Printf("[db] WAL checkpoint scheduled every 1h") + // Daily neighbor_edges retention (#1287 — moved from cmd/server). { nDays := cfg.NeighborEdgesDaysOrDefault() @@ -373,6 +388,7 @@ func main() { } statsTicker.Stop() pruneQueueTicker.Stop() + walCheckpointTicker.Stop() stopWatchdog() store.LogStats() // final stats on shutdown for _, c := range clients {