fix: cache RW SQLite connection + dedup DBConfig (closes #921) (#982)

Closes #921

## Summary

Follow-up to #920 (incremental auto-vacuum). Addresses both items from
the adversarial review:

### 1. RW connection caching

Previously, every call to `openRW(dbPath)` opened a new SQLite RW
connection and closed it after use. This happened in:
- `runIncrementalVacuum` (~4x/hour)
- `PruneOldPackets`, `PruneOldMetrics`, `RemoveStaleObservers`
- `buildAndPersistEdges`, `PruneNeighborEdges`
- All neighbor persist operations

Now a single `*sql.DB` handle (configured with `SetMaxOpenConns(1)`) is cached
process-wide via `cachedRW(dbPath)`; the pool serializes every caller onto that
one underlying connection, and `PRAGMA busy_timeout = 5000` covers contention
with other writers. The original `openRW()` function is retained for one-shot
test usage.
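
As a minimal sketch of the call-site change (the wrapper function
`pruneExample` and its `cutoff` argument are hypothetical; `cachedRW`, `*DB`,
and the SQL mirror the diff below):

```go
// Hypothetical maintenance helper showing the new pattern: fetch the cached
// RW handle instead of opening a fresh connection, and do NOT defer rw.Close().
func pruneExample(db *DB, cutoff string) (int64, error) {
	rw, err := cachedRW(db.path) // was: openRW(db.path) + defer rw.Close()
	if err != nil {
		return 0, err
	}
	res, err := rw.Exec(`DELETE FROM observer_metrics WHERE timestamp < ?`, cutoff)
	if err != nil {
		return 0, err
	}
	return res.RowsAffected()
}
```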

### 2. DBConfig dedup

`DBConfig` was defined identically in both `cmd/server/config.go` and
`cmd/ingestor/config.go`. It is now extracted into a shared
`internal/dbconfig/` package; both binaries use a type alias (`type DBConfig =
dbconfig.DBConfig`), so existing field references and JSON tags are unchanged.
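
A minimal sketch of the alias side (the embedding field name `DB` and its JSON
tag are illustrative assumptions, not taken from the diff):

```go
package main

import "github.com/meshcore-analyzer/dbconfig"

// DBConfig is the shared SQLite vacuum/maintenance config (#919, #921).
// Because this is a type alias rather than a new named type, existing call
// sites and JSON (un)marshalling behave exactly as before.
type DBConfig = dbconfig.DBConfig

// Config fragment showing where the shared struct plugs in; the field name
// and tag here are assumed for illustration only.
type Config struct {
	DB DBConfig `json:"db"`
}
```

Each binary's `go.mod` gains a `require`/`replace` pair pointing the import at
`../../internal/dbconfig`, as shown in the diff.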

## Tests added

| Test | File |
|------|------|
| `TestCachedRW_ReturnsSameHandle` | `cmd/server/rw_cache_test.go` |
| `TestCachedRW_100Calls_SingleConnection` | `cmd/server/rw_cache_test.go` |
| `TestGetIncrementalVacuumPages_Default` | `internal/dbconfig/dbconfig_test.go` |
| `TestGetIncrementalVacuumPages_Configured` | `internal/dbconfig/dbconfig_test.go` |

## Verification

```
ok  github.com/corescope/server    20.069s
ok  github.com/corescope/ingestor  47.117s
ok  github.com/meshcore-analyzer/dbconfig  0.003s
```

Both binaries build cleanly. 100 sequential `cachedRW()` calls return
the same handle with exactly 1 entry in the cache map.

---------

Co-authored-by: you <you@example.com>
+2
@@ -15,6 +15,7 @@ COPY cmd/server/go.mod cmd/server/go.sum ./
COPY internal/geofilter/ ../../internal/geofilter/
COPY internal/sigvalidate/ ../../internal/sigvalidate/
COPY internal/packetpath/ ../../internal/packetpath/
COPY internal/dbconfig/ ../../internal/dbconfig/
RUN go mod download
COPY cmd/server/ ./
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} \
@@ -26,6 +27,7 @@ COPY cmd/ingestor/go.mod cmd/ingestor/go.sum ./
COPY internal/geofilter/ ../../internal/geofilter/
COPY internal/sigvalidate/ ../../internal/sigvalidate/
COPY internal/packetpath/ ../../internal/packetpath/
COPY internal/dbconfig/ ../../internal/dbconfig/
RUN go mod download
COPY cmd/ingestor/ ./
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} \
+3 -5
@@ -9,6 +9,7 @@ import (
"strings"
"sync"
"github.com/meshcore-analyzer/dbconfig"
"github.com/meshcore-analyzer/geofilter"
)
@@ -79,11 +80,8 @@ type MetricsConfig struct {
SampleIntervalSec int `json:"sampleIntervalSec"`
}
// DBConfig controls SQLite vacuum and maintenance behavior (#919).
type DBConfig struct {
VacuumOnStartup bool `json:"vacuumOnStartup"` // one-time full VACUUM on startup if auto_vacuum is not INCREMENTAL
IncrementalVacuumPages int `json:"incrementalVacuumPages"` // pages returned to OS per reaper cycle (default 1024)
}
// DBConfig is the shared SQLite vacuum/maintenance config (#919, #921).
type DBConfig = dbconfig.DBConfig
// IncrementalVacuumPages returns the configured pages per vacuum or 1024 default.
func (c *Config) IncrementalVacuumPages() int {
+4
@@ -17,6 +17,10 @@ require github.com/meshcore-analyzer/packetpath v0.0.0
replace github.com/meshcore-analyzer/packetpath => ../../internal/packetpath
require github.com/meshcore-analyzer/dbconfig v0.0.0
replace github.com/meshcore-analyzer/dbconfig => ../../internal/dbconfig
require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
+3 -5
@@ -8,6 +8,7 @@ import (
"strings"
"sync"
"github.com/meshcore-analyzer/dbconfig"
"github.com/meshcore-analyzer/geofilter"
)
@@ -145,11 +146,8 @@ type RetentionConfig struct {
MetricsDays int `json:"metricsDays"`
}
// DBConfig controls SQLite vacuum and maintenance behavior (#919).
type DBConfig struct {
VacuumOnStartup bool `json:"vacuumOnStartup"` // one-time full VACUUM on startup if auto_vacuum is not INCREMENTAL
IncrementalVacuumPages int `json:"incrementalVacuumPages"` // pages returned to OS per reaper cycle (default 1024)
}
// DBConfig is the shared SQLite vacuum/maintenance config (#919, #921).
type DBConfig = dbconfig.DBConfig
// IncrementalVacuumPages returns the configured pages per vacuum or 1024 default.
func (c *Config) IncrementalVacuumPages() int {
+3 -6
@@ -1873,11 +1873,10 @@ func nullInt(ni sql.NullInt64) interface{} {
// Returns the number of transmissions deleted.
// Opens a separate read-write connection since the main connection is read-only.
func (db *DB) PruneOldPackets(days int) (int64, error) {
rw, err := openRW(db.path)
rw, err := cachedRW(db.path)
if err != nil {
return 0, err
}
defer rw.Close()
cutoff := time.Now().UTC().AddDate(0, 0, -days).Format(time.RFC3339)
tx, err := rw.Begin()
@@ -2220,11 +2219,10 @@ func (db *DB) GetMetricsSummary(since string) ([]MetricsSummaryRow, error) {
// PruneOldMetrics deletes observer_metrics rows older than retentionDays.
func (db *DB) PruneOldMetrics(retentionDays int) (int64, error) {
rw, err := openRW(db.path)
rw, err := cachedRW(db.path)
if err != nil {
return 0, err
}
defer rw.Close()
cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays).Format(time.RFC3339)
res, err := rw.Exec(`DELETE FROM observer_metrics WHERE timestamp < ?`, cutoff)
@@ -2247,11 +2245,10 @@ func (db *DB) RemoveStaleObservers(observerDays int) (int64, error) {
if observerDays <= -1 {
return 0, nil // keep forever
}
rw, err := openRW(db.path)
rw, err := cachedRW(db.path)
if err != nil {
return 0, err
}
defer rw.Close()
cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339)
res, err := rw.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff)
+4
@@ -18,6 +18,10 @@ require github.com/meshcore-analyzer/packetpath v0.0.0
replace github.com/meshcore-analyzer/packetpath => ../../internal/packetpath
require github.com/meshcore-analyzer/dbconfig v0.0.0
replace github.com/meshcore-analyzer/dbconfig => ../../internal/dbconfig
require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
+1 -2
@@ -210,10 +210,9 @@ func main() {
log.Printf("[neighbor] graph build panic recovered: %v", r)
}
}()
rw, rwErr := openRW(dbPath)
rw, rwErr := cachedRW(dbPath)
if rwErr == nil {
edgeCount := buildAndPersistEdges(store, rw)
rw.Close()
log.Printf("[neighbor] persisted %d edges", edgeCount)
}
built := BuildFromStore(store)
+9 -20
@@ -20,11 +20,10 @@ var persistSem = make(chan struct{}, 1)
// ensureNeighborEdgesTable creates the neighbor_edges table if it doesn't exist.
// Uses a separate read-write connection since the main DB is read-only.
func ensureNeighborEdgesTable(dbPath string) error {
rw, err := openRW(dbPath)
rw, err := cachedRW(dbPath)
if err != nil {
return fmt.Errorf("open rw for neighbor_edges: %w", err)
}
defer rw.Close()
_, err = rw.Exec(`CREATE TABLE IF NOT EXISTS neighbor_edges (
node_a TEXT NOT NULL,
@@ -129,12 +128,11 @@ func asyncPersistResolvedPathsAndEdges(dbPath string, obsUpdates []persistObsUpd
go func() {
defer func() { <-persistSem }()
rw, err := openRW(dbPath)
rw, err := cachedRW(dbPath)
if err != nil {
log.Printf("[store] %s rw open error: %v", logPrefix, err)
return
}
defer rw.Close()
if len(obsUpdates) > 0 {
sqlTx, err := rw.Begin()
@@ -249,11 +247,10 @@ func buildAndPersistEdges(store *PacketStore, rw *sql.DB) int {
// ensureResolvedPathColumn adds the resolved_path column to observations if missing.
func ensureResolvedPathColumn(dbPath string) error {
rw, err := openRW(dbPath)
rw, err := cachedRW(dbPath)
if err != nil {
return err
}
defer rw.Close()
// Check if column already exists
rows, err := rw.Query("PRAGMA table_info(observations)")
@@ -289,11 +286,10 @@ func ensureResolvedPathColumn(dbPath string) error {
// GetStats) silently fail with "no such column: inactive" — leaving /api/observers
// returning empty.
func ensureObserverInactiveColumn(dbPath string) error {
rw, err := openRW(dbPath)
rw, err := cachedRW(dbPath)
if err != nil {
return err
}
defer rw.Close()
rows, err := rw.Query("PRAGMA table_info(observers)")
if err != nil {
@@ -327,11 +323,10 @@ func ensureObserverInactiveColumn(dbPath string) error {
// the e2e fixture), the column is missing and read queries that reference it
// (GetObservers, GetObserverByID) fail with "no such column: last_packet_at".
func ensureLastPacketAtColumn(dbPath string) error {
rw, err := openRW(dbPath)
rw, err := cachedRW(dbPath)
if err != nil {
return err
}
defer rw.Close()
rows, err := rw.Query("PRAGMA table_info(observers)")
if err != nil {
@@ -361,12 +356,11 @@ func ensureLastPacketAtColumn(dbPath string) error {
// softDeleteBlacklistedObservers marks observers matching the blacklist as
// inactive=1 so they are hidden from API responses. Runs once at startup.
func softDeleteBlacklistedObservers(dbPath string, blacklist []string) {
rw, err := openRW(dbPath)
rw, err := cachedRW(dbPath)
if err != nil {
log.Printf("[observer-blacklist] warning: could not open DB for soft-delete: %v", err)
return
}
defer rw.Close()
placeholders := make([]string, 0, len(blacklist))
args := make([]interface{}, 0, len(blacklist))
@@ -528,16 +522,12 @@ func backfillResolvedPathsAsync(store *PacketStore, dbPath string, chunkSize int
var rw *sql.DB
if dbPath != "" {
var err error
rw, err = openRW(dbPath)
rw, err = cachedRW(dbPath)
if err != nil {
log.Printf("[store] async backfill: open rw error: %v", err)
}
}
defer func() {
if rw != nil {
rw.Close()
}
}()
// rw is cached process-wide; do not close
totalProcessed := 0
for totalProcessed < totalPending {
@@ -762,11 +752,10 @@ func PruneNeighborEdges(dbPath string, graph *NeighborGraph, maxAgeDays int) (in
// 1. Prune from SQLite using a read-write connection
var dbPruned int64
rw, err := openRW(dbPath)
rw, err := cachedRW(dbPath)
if err != nil {
return 0, fmt.Errorf("prune neighbor_edges: open rw: %w", err)
}
defer rw.Close()
res, err := rw.Exec("DELETE FROM neighbor_edges WHERE last_seen < ?", cutoff.Format(time.RFC3339))
if err != nil {
return 0, fmt.Errorf("prune neighbor_edges: %w", err)
+59
@@ -0,0 +1,59 @@
package main
import (
"database/sql"
"fmt"
"sync"
)
// rwCache holds a process-wide cached RW connection per database path.
// Instead of opening and closing a new RW connection on every call to openRW,
// we cache a single *sql.DB (which internally manages one connection due to
// SetMaxOpenConns(1)). This eliminates repeated open/close overhead for
// vacuum, prune, persist operations that run frequently (#921).
var rwCache = struct {
mu sync.Mutex
conns map[string]*sql.DB
}{conns: make(map[string]*sql.DB)}
// cachedRW returns a cached read-write connection for the given dbPath.
// The connection is created on first call and reused thereafter.
// Callers MUST NOT call Close() on the returned *sql.DB.
func cachedRW(dbPath string) (*sql.DB, error) {
rwCache.mu.Lock()
defer rwCache.mu.Unlock()
if db, ok := rwCache.conns[dbPath]; ok {
return db, nil
}
dsn := fmt.Sprintf("file:%s?_journal_mode=WAL", dbPath)
db, err := sql.Open("sqlite", dsn)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(1)
if _, err := db.Exec("PRAGMA busy_timeout = 5000"); err != nil {
db.Close()
return nil, fmt.Errorf("set busy_timeout: %w", err)
}
rwCache.conns[dbPath] = db
return db, nil
}
// closeRWCache closes all cached RW connections (for tests/shutdown).
func closeRWCache() {
rwCache.mu.Lock()
defer rwCache.mu.Unlock()
for k, db := range rwCache.conns {
db.Close()
delete(rwCache.conns, k)
}
}
// rwCacheLen returns the number of cached connections (for testing).
func rwCacheLen() int {
rwCache.mu.Lock()
defer rwCache.mu.Unlock()
return len(rwCache.conns)
}
+55
@@ -0,0 +1,55 @@
package main
import (
"os"
"path/filepath"
"testing"
)
func TestCachedRW_ReturnsSameHandle(t *testing.T) {
dir := t.TempDir()
dbPath := filepath.Join(dir, "test.db")
// Create the DB file
f, _ := os.Create(dbPath)
f.Close()
defer closeRWCache()
db1, err := cachedRW(dbPath)
if err != nil {
t.Fatalf("first cachedRW: %v", err)
}
db2, err := cachedRW(dbPath)
if err != nil {
t.Fatalf("second cachedRW: %v", err)
}
if db1 != db2 {
t.Fatalf("cachedRW returned different handles: %p vs %p", db1, db2)
}
}
func TestCachedRW_100Calls_SingleConnection(t *testing.T) {
dir := t.TempDir()
dbPath := filepath.Join(dir, "test.db")
f, _ := os.Create(dbPath)
f.Close()
defer closeRWCache()
var first interface{}
for i := 0; i < 100; i++ {
db, err := cachedRW(dbPath)
if err != nil {
t.Fatalf("call %d: %v", i, err)
}
if i == 0 {
first = db
} else if db != first {
t.Fatalf("call %d returned different handle", i)
}
}
if rwCacheLen() != 1 {
t.Fatalf("expected 1 cached connection, got %d", rwCacheLen())
}
}
+2 -4
@@ -37,12 +37,11 @@ func checkAutoVacuum(db *DB, cfg *Config, dbPath string) {
log.Printf("[db] vacuumOnStartup=true — starting one-time full VACUUM (ensure 2x DB size free disk space)...")
start := time.Now()
rw, err := openRW(dbPath)
rw, err := cachedRW(dbPath)
if err != nil {
log.Printf("[db] VACUUM failed: could not open RW connection: %v", err)
return
}
defer rw.Close()
if _, err := rw.Exec("PRAGMA auto_vacuum = INCREMENTAL"); err != nil {
log.Printf("[db] VACUUM failed: could not set auto_vacuum: %v", err)
@@ -71,12 +70,11 @@ func checkAutoVacuum(db *DB, cfg *Config, dbPath string) {
// runIncrementalVacuum runs PRAGMA incremental_vacuum(N) on a read-write
// connection. Safe to call on auto_vacuum=NONE databases (noop).
func runIncrementalVacuum(dbPath string, pages int) {
rw, err := openRW(dbPath)
rw, err := cachedRW(dbPath)
if err != nil {
log.Printf("[vacuum] could not open RW connection: %v", err)
return
}
defer rw.Close()
if _, err := rw.Exec(fmt.Sprintf("PRAGMA incremental_vacuum(%d)", pages)); err != nil {
log.Printf("[vacuum] incremental_vacuum error: %v", err)
+17
@@ -0,0 +1,17 @@
// Package dbconfig provides the shared DBConfig struct used by both the server
// and ingestor binaries for SQLite vacuum and maintenance settings (#919, #921).
package dbconfig
// DBConfig controls SQLite vacuum and maintenance behavior (#919).
type DBConfig struct {
VacuumOnStartup bool `json:"vacuumOnStartup"` // one-time full VACUUM on startup if auto_vacuum is not INCREMENTAL
IncrementalVacuumPages int `json:"incrementalVacuumPages"` // pages returned to OS per reaper cycle (default 1024)
}
// GetIncrementalVacuumPages returns the configured pages or 1024 default.
func (c *DBConfig) GetIncrementalVacuumPages() int {
if c != nil && c.IncrementalVacuumPages > 0 {
return c.IncrementalVacuumPages
}
return 1024
}
+21
@@ -0,0 +1,21 @@
package dbconfig
import "testing"
func TestGetIncrementalVacuumPages_Default(t *testing.T) {
var c *DBConfig
if got := c.GetIncrementalVacuumPages(); got != 1024 {
t.Fatalf("nil DBConfig: got %d, want 1024", got)
}
c = &DBConfig{}
if got := c.GetIncrementalVacuumPages(); got != 1024 {
t.Fatalf("zero DBConfig: got %d, want 1024", got)
}
}
func TestGetIncrementalVacuumPages_Configured(t *testing.T) {
c := &DBConfig{IncrementalVacuumPages: 512}
if got := c.GetIncrementalVacuumPages(); got != 512 {
t.Fatalf("got %d, want 512", got)
}
}
+3
@@ -0,0 +1,3 @@
module github.com/meshcore-analyzer/dbconfig
go 1.22