diff --git a/Dockerfile b/Dockerfile index a98fb745..0a8694fc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,6 +15,7 @@ COPY cmd/server/go.mod cmd/server/go.sum ./ COPY internal/geofilter/ ../../internal/geofilter/ COPY internal/sigvalidate/ ../../internal/sigvalidate/ COPY internal/packetpath/ ../../internal/packetpath/ +COPY internal/dbconfig/ ../../internal/dbconfig/ RUN go mod download COPY cmd/server/ ./ RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} \ @@ -26,6 +27,7 @@ COPY cmd/ingestor/go.mod cmd/ingestor/go.sum ./ COPY internal/geofilter/ ../../internal/geofilter/ COPY internal/sigvalidate/ ../../internal/sigvalidate/ COPY internal/packetpath/ ../../internal/packetpath/ +COPY internal/dbconfig/ ../../internal/dbconfig/ RUN go mod download COPY cmd/ingestor/ ./ RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} \ diff --git a/cmd/ingestor/config.go b/cmd/ingestor/config.go index a5c6547a..a43552af 100644 --- a/cmd/ingestor/config.go +++ b/cmd/ingestor/config.go @@ -9,6 +9,7 @@ import ( "strings" "sync" + "github.com/meshcore-analyzer/dbconfig" "github.com/meshcore-analyzer/geofilter" ) @@ -79,11 +80,8 @@ type MetricsConfig struct { SampleIntervalSec int `json:"sampleIntervalSec"` } -// DBConfig controls SQLite vacuum and maintenance behavior (#919). -type DBConfig struct { - VacuumOnStartup bool `json:"vacuumOnStartup"` // one-time full VACUUM on startup if auto_vacuum is not INCREMENTAL - IncrementalVacuumPages int `json:"incrementalVacuumPages"` // pages returned to OS per reaper cycle (default 1024) -} +// DBConfig is the shared SQLite vacuum/maintenance config (#919, #921). +type DBConfig = dbconfig.DBConfig // IncrementalVacuumPages returns the configured pages per vacuum or 1024 default. func (c *Config) IncrementalVacuumPages() int { diff --git a/cmd/ingestor/go.mod b/cmd/ingestor/go.mod index 87449594..010ba569 100644 --- a/cmd/ingestor/go.mod +++ b/cmd/ingestor/go.mod @@ -17,6 +17,10 @@ require github.com/meshcore-analyzer/packetpath v0.0.0 replace github.com/meshcore-analyzer/packetpath => ../../internal/packetpath +require github.com/meshcore-analyzer/dbconfig v0.0.0 + +replace github.com/meshcore-analyzer/dbconfig => ../../internal/dbconfig + require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect diff --git a/cmd/server/config.go b/cmd/server/config.go index 0c1a296c..31784b11 100644 --- a/cmd/server/config.go +++ b/cmd/server/config.go @@ -8,6 +8,7 @@ import ( "strings" "sync" + "github.com/meshcore-analyzer/dbconfig" "github.com/meshcore-analyzer/geofilter" ) @@ -145,11 +146,8 @@ type RetentionConfig struct { MetricsDays int `json:"metricsDays"` } -// DBConfig controls SQLite vacuum and maintenance behavior (#919). -type DBConfig struct { - VacuumOnStartup bool `json:"vacuumOnStartup"` // one-time full VACUUM on startup if auto_vacuum is not INCREMENTAL - IncrementalVacuumPages int `json:"incrementalVacuumPages"` // pages returned to OS per reaper cycle (default 1024) -} +// DBConfig is the shared SQLite vacuum/maintenance config (#919, #921). +type DBConfig = dbconfig.DBConfig // IncrementalVacuumPages returns the configured pages per vacuum or 1024 default. func (c *Config) IncrementalVacuumPages() int { diff --git a/cmd/server/db.go b/cmd/server/db.go index d970741c..0ee02baf 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -1873,11 +1873,10 @@ func nullInt(ni sql.NullInt64) interface{} { // Returns the number of transmissions deleted. // Opens a separate read-write connection since the main connection is read-only. func (db *DB) PruneOldPackets(days int) (int64, error) { - rw, err := openRW(db.path) + rw, err := cachedRW(db.path) if err != nil { return 0, err } - defer rw.Close() cutoff := time.Now().UTC().AddDate(0, 0, -days).Format(time.RFC3339) tx, err := rw.Begin() @@ -2220,11 +2219,10 @@ func (db *DB) GetMetricsSummary(since string) ([]MetricsSummaryRow, error) { // PruneOldMetrics deletes observer_metrics rows older than retentionDays. func (db *DB) PruneOldMetrics(retentionDays int) (int64, error) { - rw, err := openRW(db.path) + rw, err := cachedRW(db.path) if err != nil { return 0, err } - defer rw.Close() cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays).Format(time.RFC3339) res, err := rw.Exec(`DELETE FROM observer_metrics WHERE timestamp < ?`, cutoff) @@ -2247,11 +2245,10 @@ func (db *DB) RemoveStaleObservers(observerDays int) (int64, error) { if observerDays <= -1 { return 0, nil // keep forever } - rw, err := openRW(db.path) + rw, err := cachedRW(db.path) if err != nil { return 0, err } - defer rw.Close() cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339) res, err := rw.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff) diff --git a/cmd/server/go.mod b/cmd/server/go.mod index 05f99320..310d9375 100644 --- a/cmd/server/go.mod +++ b/cmd/server/go.mod @@ -18,6 +18,10 @@ require github.com/meshcore-analyzer/packetpath v0.0.0 replace github.com/meshcore-analyzer/packetpath => ../../internal/packetpath +require github.com/meshcore-analyzer/dbconfig v0.0.0 + +replace github.com/meshcore-analyzer/dbconfig => ../../internal/dbconfig + require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect diff --git a/cmd/server/main.go b/cmd/server/main.go index 942d2a75..07902efd 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -210,10 +210,9 @@ func main() { log.Printf("[neighbor] graph build panic recovered: %v", r) } }() - rw, rwErr := openRW(dbPath) + rw, rwErr := cachedRW(dbPath) if rwErr == nil { edgeCount := buildAndPersistEdges(store, rw) - rw.Close() log.Printf("[neighbor] persisted %d edges", edgeCount) } built := BuildFromStore(store) diff --git a/cmd/server/neighbor_persist.go b/cmd/server/neighbor_persist.go index 675772a6..4abb1ac7 100644 --- a/cmd/server/neighbor_persist.go +++ b/cmd/server/neighbor_persist.go @@ -20,11 +20,10 @@ var persistSem = make(chan struct{}, 1) // ensureNeighborEdgesTable creates the neighbor_edges table if it doesn't exist. // Uses a separate read-write connection since the main DB is read-only. func ensureNeighborEdgesTable(dbPath string) error { - rw, err := openRW(dbPath) + rw, err := cachedRW(dbPath) if err != nil { return fmt.Errorf("open rw for neighbor_edges: %w", err) } - defer rw.Close() _, err = rw.Exec(`CREATE TABLE IF NOT EXISTS neighbor_edges ( node_a TEXT NOT NULL, @@ -129,12 +128,11 @@ func asyncPersistResolvedPathsAndEdges(dbPath string, obsUpdates []persistObsUpd go func() { defer func() { <-persistSem }() - rw, err := openRW(dbPath) + rw, err := cachedRW(dbPath) if err != nil { log.Printf("[store] %s rw open error: %v", logPrefix, err) return } - defer rw.Close() if len(obsUpdates) > 0 { sqlTx, err := rw.Begin() @@ -249,11 +247,10 @@ func buildAndPersistEdges(store *PacketStore, rw *sql.DB) int { // ensureResolvedPathColumn adds the resolved_path column to observations if missing. func ensureResolvedPathColumn(dbPath string) error { - rw, err := openRW(dbPath) + rw, err := cachedRW(dbPath) if err != nil { return err } - defer rw.Close() // Check if column already exists rows, err := rw.Query("PRAGMA table_info(observations)") @@ -289,11 +286,10 @@ func ensureResolvedPathColumn(dbPath string) error { // GetStats) silently fail with "no such column: inactive" — leaving /api/observers // returning empty. func ensureObserverInactiveColumn(dbPath string) error { - rw, err := openRW(dbPath) + rw, err := cachedRW(dbPath) if err != nil { return err } - defer rw.Close() rows, err := rw.Query("PRAGMA table_info(observers)") if err != nil { @@ -327,11 +323,10 @@ func ensureObserverInactiveColumn(dbPath string) error { // the e2e fixture), the column is missing and read queries that reference it // (GetObservers, GetObserverByID) fail with "no such column: last_packet_at". func ensureLastPacketAtColumn(dbPath string) error { - rw, err := openRW(dbPath) + rw, err := cachedRW(dbPath) if err != nil { return err } - defer rw.Close() rows, err := rw.Query("PRAGMA table_info(observers)") if err != nil { @@ -361,12 +356,11 @@ func ensureLastPacketAtColumn(dbPath string) error { // softDeleteBlacklistedObservers marks observers matching the blacklist as // inactive=1 so they are hidden from API responses. Runs once at startup. func softDeleteBlacklistedObservers(dbPath string, blacklist []string) { - rw, err := openRW(dbPath) + rw, err := cachedRW(dbPath) if err != nil { log.Printf("[observer-blacklist] warning: could not open DB for soft-delete: %v", err) return } - defer rw.Close() placeholders := make([]string, 0, len(blacklist)) args := make([]interface{}, 0, len(blacklist)) @@ -528,16 +522,12 @@ func backfillResolvedPathsAsync(store *PacketStore, dbPath string, chunkSize int var rw *sql.DB if dbPath != "" { var err error - rw, err = openRW(dbPath) + rw, err = cachedRW(dbPath) if err != nil { log.Printf("[store] async backfill: open rw error: %v", err) } } - defer func() { - if rw != nil { - rw.Close() - } - }() + // rw is cached process-wide; do not close totalProcessed := 0 for totalProcessed < totalPending { @@ -762,11 +752,10 @@ func PruneNeighborEdges(dbPath string, graph *NeighborGraph, maxAgeDays int) (in // 1. Prune from SQLite using a read-write connection var dbPruned int64 - rw, err := openRW(dbPath) + rw, err := cachedRW(dbPath) if err != nil { return 0, fmt.Errorf("prune neighbor_edges: open rw: %w", err) } - defer rw.Close() res, err := rw.Exec("DELETE FROM neighbor_edges WHERE last_seen < ?", cutoff.Format(time.RFC3339)) if err != nil { return 0, fmt.Errorf("prune neighbor_edges: %w", err) diff --git a/cmd/server/rw_cache.go b/cmd/server/rw_cache.go new file mode 100644 index 00000000..b22fa11d --- /dev/null +++ b/cmd/server/rw_cache.go @@ -0,0 +1,59 @@ +package main + +import ( + "database/sql" + "fmt" + "sync" +) + +// rwCache holds a process-wide cached RW connection per database path. +// Instead of opening and closing a new RW connection on every call to openRW, +// we cache a single *sql.DB (which internally manages one connection due to +// SetMaxOpenConns(1)). This eliminates repeated open/close overhead for +// vacuum, prune, persist operations that run frequently (#921). +var rwCache = struct { + mu sync.Mutex + conns map[string]*sql.DB +}{conns: make(map[string]*sql.DB)} + +// cachedRW returns a cached read-write connection for the given dbPath. +// The connection is created on first call and reused thereafter. +// Callers MUST NOT call Close() on the returned *sql.DB. +func cachedRW(dbPath string) (*sql.DB, error) { + rwCache.mu.Lock() + defer rwCache.mu.Unlock() + + if db, ok := rwCache.conns[dbPath]; ok { + return db, nil + } + + dsn := fmt.Sprintf("file:%s?_journal_mode=WAL", dbPath) + db, err := sql.Open("sqlite", dsn) + if err != nil { + return nil, err + } + db.SetMaxOpenConns(1) + if _, err := db.Exec("PRAGMA busy_timeout = 5000"); err != nil { + db.Close() + return nil, fmt.Errorf("set busy_timeout: %w", err) + } + rwCache.conns[dbPath] = db + return db, nil +} + +// closeRWCache closes all cached RW connections (for tests/shutdown). +func closeRWCache() { + rwCache.mu.Lock() + defer rwCache.mu.Unlock() + for k, db := range rwCache.conns { + db.Close() + delete(rwCache.conns, k) + } +} + +// rwCacheLen returns the number of cached connections (for testing). +func rwCacheLen() int { + rwCache.mu.Lock() + defer rwCache.mu.Unlock() + return len(rwCache.conns) +} diff --git a/cmd/server/rw_cache_test.go b/cmd/server/rw_cache_test.go new file mode 100644 index 00000000..96c76369 --- /dev/null +++ b/cmd/server/rw_cache_test.go @@ -0,0 +1,55 @@ +package main + +import ( + "os" + "path/filepath" + "testing" +) + +func TestCachedRW_ReturnsSameHandle(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + + // Create the DB file + f, _ := os.Create(dbPath) + f.Close() + + defer closeRWCache() + + db1, err := cachedRW(dbPath) + if err != nil { + t.Fatalf("first cachedRW: %v", err) + } + db2, err := cachedRW(dbPath) + if err != nil { + t.Fatalf("second cachedRW: %v", err) + } + if db1 != db2 { + t.Fatalf("cachedRW returned different handles: %p vs %p", db1, db2) + } +} + +func TestCachedRW_100Calls_SingleConnection(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + f, _ := os.Create(dbPath) + f.Close() + + defer closeRWCache() + + var first interface{} + for i := 0; i < 100; i++ { + db, err := cachedRW(dbPath) + if err != nil { + t.Fatalf("call %d: %v", i, err) + } + if i == 0 { + first = db + } else if db != first { + t.Fatalf("call %d returned different handle", i) + } + } + if rwCacheLen() != 1 { + t.Fatalf("expected 1 cached connection, got %d", rwCacheLen()) + } +} diff --git a/cmd/server/vacuum.go b/cmd/server/vacuum.go index a53556a5..bea629dc 100644 --- a/cmd/server/vacuum.go +++ b/cmd/server/vacuum.go @@ -37,12 +37,11 @@ func checkAutoVacuum(db *DB, cfg *Config, dbPath string) { log.Printf("[db] vacuumOnStartup=true — starting one-time full VACUUM (ensure 2x DB size free disk space)...") start := time.Now() - rw, err := openRW(dbPath) + rw, err := cachedRW(dbPath) if err != nil { log.Printf("[db] VACUUM failed: could not open RW connection: %v", err) return } - defer rw.Close() if _, err := rw.Exec("PRAGMA auto_vacuum = INCREMENTAL"); err != nil { log.Printf("[db] VACUUM failed: could not set auto_vacuum: %v", err) @@ -71,12 +70,11 @@ func checkAutoVacuum(db *DB, cfg *Config, dbPath string) { // runIncrementalVacuum runs PRAGMA incremental_vacuum(N) on a read-write // connection. Safe to call on auto_vacuum=NONE databases (noop). func runIncrementalVacuum(dbPath string, pages int) { - rw, err := openRW(dbPath) + rw, err := cachedRW(dbPath) if err != nil { log.Printf("[vacuum] could not open RW connection: %v", err) return } - defer rw.Close() if _, err := rw.Exec(fmt.Sprintf("PRAGMA incremental_vacuum(%d)", pages)); err != nil { log.Printf("[vacuum] incremental_vacuum error: %v", err) diff --git a/internal/dbconfig/dbconfig.go b/internal/dbconfig/dbconfig.go new file mode 100644 index 00000000..ef9f7eac --- /dev/null +++ b/internal/dbconfig/dbconfig.go @@ -0,0 +1,17 @@ +// Package dbconfig provides the shared DBConfig struct used by both the server +// and ingestor binaries for SQLite vacuum and maintenance settings (#919, #921). +package dbconfig + +// DBConfig controls SQLite vacuum and maintenance behavior (#919). +type DBConfig struct { + VacuumOnStartup bool `json:"vacuumOnStartup"` // one-time full VACUUM on startup if auto_vacuum is not INCREMENTAL + IncrementalVacuumPages int `json:"incrementalVacuumPages"` // pages returned to OS per reaper cycle (default 1024) +} + +// GetIncrementalVacuumPages returns the configured pages or 1024 default. +func (c *DBConfig) GetIncrementalVacuumPages() int { + if c != nil && c.IncrementalVacuumPages > 0 { + return c.IncrementalVacuumPages + } + return 1024 +} diff --git a/internal/dbconfig/dbconfig_test.go b/internal/dbconfig/dbconfig_test.go new file mode 100644 index 00000000..cea6f4a4 --- /dev/null +++ b/internal/dbconfig/dbconfig_test.go @@ -0,0 +1,21 @@ +package dbconfig + +import "testing" + +func TestGetIncrementalVacuumPages_Default(t *testing.T) { + var c *DBConfig + if got := c.GetIncrementalVacuumPages(); got != 1024 { + t.Fatalf("nil DBConfig: got %d, want 1024", got) + } + c = &DBConfig{} + if got := c.GetIncrementalVacuumPages(); got != 1024 { + t.Fatalf("zero DBConfig: got %d, want 1024", got) + } +} + +func TestGetIncrementalVacuumPages_Configured(t *testing.T) { + c := &DBConfig{IncrementalVacuumPages: 512} + if got := c.GetIncrementalVacuumPages(); got != 512 { + t.Fatalf("got %d, want 512", got) + } +} diff --git a/internal/dbconfig/go.mod b/internal/dbconfig/go.mod new file mode 100644 index 00000000..314ff3b5 --- /dev/null +++ b/internal/dbconfig/go.mod @@ -0,0 +1,3 @@ +module github.com/meshcore-analyzer/dbconfig + +go 1.22