fix(ingestor): add hourly WAL checkpoint to prevent unbounded WAL growth (#1435)

Fixes #1434.

## Problem

The ingestor's `Checkpoint()` (`PRAGMA wal_checkpoint(TRUNCATE)`) was
only called on shutdown. SQLite's built-in auto-checkpoint runs in
PASSIVE mode which cannot truncate the WAL while the server holds an
active read connection. Result: the WAL grows at ~40–50 MB/hour and is
never reset during a running instance.

Observed on analyzer.on8ar.eu: **183.4 MB WAL** after ~4h uptime.

## Changes

**`cmd/ingestor/main.go`**
- Add a periodic goroutine that calls `Checkpoint()` every hour,
staggered 30s after startup
- Hoist `walCheckpointTicker` to function scope so it is stopped cleanly
at shutdown alongside all other tickers

**`cmd/ingestor/db.go`**
- Switch `Checkpoint()` from `Exec` to `QueryRow(...).Scan` to capture
SQLite's 3-column result (`busy`, `log`, `checkpointed`)
- Return the checkpointed frame count (callers that discard it are
unaffected)
- Log only when `walFrames > 0` — silent when WAL is already empty,
avoiding log spam
- Log `blocked=true/false` instead of raw `busy` integer to make it
clear when the server's read lock is preventing full truncation

## Behaviour after fix

Each hourly tick flushes all WAL frames not held by an active server
reader. Worst-case WAL size is now bounded to roughly one hour of write
traffic (~45 MB) instead of unbounded growth. If the server holds a read
lock at checkpoint time, the log shows `blocked=true` and remaining
frames are retried on the next tick.

## Test plan

- [x] `go build ./...` (ingestor module)
- [x] `go test ./...` passes
- [x] Code review addressed (ticker stop on shutdown, log message
clarity)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
efiten
2026-05-29 00:01:54 +02:00
committed by GitHub
parent 29432d4fe0
commit 52f131e2dc
2 changed files with 28 additions and 6 deletions
+12 -6
View File
@@ -1045,14 +1045,20 @@ func (s *Store) RunIncrementalVacuum(pages int) {
}
}
// Checkpoint forces a WAL checkpoint to release the WAL lock file,
// preventing lock contention with a new process starting up.
func (s *Store) Checkpoint() {
if _, err := s.db.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
// Checkpoint runs a WAL checkpoint (TRUNCATE mode).
// Returns the number of WAL frames checkpointed (0 if WAL was already empty).
// TRUNCATE resets the WAL file to zero bytes when all frames are checkpointed;
// if active readers hold frames, it checkpoints what it can and leaves the rest.
func (s *Store) Checkpoint() int {
var busy, walFrames, checkpointed int
if err := s.db.QueryRow("PRAGMA wal_checkpoint(TRUNCATE)").Scan(&busy, &walFrames, &checkpointed); err != nil {
log.Printf("[db] WAL checkpoint error: %v", err)
} else {
log.Println("[db] WAL checkpoint complete")
return 0
}
if walFrames > 0 {
log.Printf("[db] WAL checkpoint: %d/%d frames checkpointed (blocked=%v)", checkpointed, walFrames, busy != 0)
}
return checkpointed
}
// BackfillPathJSONAsync launches the path_json backfill in a background goroutine.
+16
View File
@@ -150,6 +150,21 @@ func main() {
log.Printf("[prune] auto-prune enabled: packets older than %d days will be removed daily", packetDays)
}
// Hourly WAL checkpoint to prevent unbounded WAL growth.
// TRUNCATE resets the WAL file to zero bytes when all frames are flushed;
// if the server's read connection holds frames, remaining pages stay in the
// WAL until the next tick. Staggered 30s after startup to avoid competing
// with the initial burst of ingest writes.
walCheckpointTicker := time.NewTicker(1 * time.Hour)
go func() {
time.Sleep(30 * time.Second)
store.Checkpoint()
for range walCheckpointTicker.C {
store.Checkpoint()
}
}()
log.Printf("[db] WAL checkpoint scheduled every 1h")
// Daily neighbor_edges retention (#1287 — moved from cmd/server).
{
nDays := cfg.NeighborEdgesDaysOrDefault()
@@ -373,6 +388,7 @@ func main() {
}
statsTicker.Stop()
pruneQueueTicker.Stop()
walCheckpointTicker.Stop()
stopWatchdog()
store.LogStats() // final stats on shutdown
for _, c := range clients {