diff --git a/cmd/server/chunked_load.go b/cmd/server/chunked_load.go index e04b03f7..143044e4 100644 --- a/cmd/server/chunked_load.go +++ b/cmd/server/chunked_load.go @@ -106,37 +106,59 @@ func (s *PacketStore) fireChunkCallbacks(rowsThisChunk, totalRows int) { } // RunStartupLoad orchestrates the startup load sequence: -// 1. start LoadChunked (async) -// 2. caller waits for FirstChunkReady to bind the HTTP listener -// 3. spawn the background fill loader AFTER LoadChunked completes, -// so s.oldestLoaded is set before the bg loader reads it (#1809) +// 1. run LoadChunked synchronously (FirstChunkReady is signaled +// internally so callers waiting on it in parallel can bind HTTP) +// 2. on success, run loadBackgroundChunks synchronously so +// s.oldestLoaded is guaranteed set before the bg loader reads +// it (#1809). // -// chunkSize=0 uses the LoadChunked default. Blocks until LoadChunked -// AND any background loader have finished. Callers that want to bind -// the HTTP listener at FirstChunkReady should run this in a goroutine -// and wait on FirstChunkReady() themselves. +// chunkSize=0 uses the LoadChunked default. The function blocks until +// LoadChunked AND any background loader have finished. Callers that +// want to bind the HTTP listener at FirstChunkReady should run this +// in a goroutine and wait on FirstChunkReady() themselves. +// +// Steady-state contracts post-return: +// - LoadChunked error: backgroundLoadFailed=true, backgroundLoadDone +// stays false, backgroundLoadErr non-empty. Returns the error. +// - hotStartupHours == 0: backgroundLoadDone=true, failed=false +// (no background work was needed). +// - hotStartupHours > 0 success: terminal state is whatever +// loadBackgroundChunks set (done=true on full coverage, +// failed=true on partial / chunk errors — see #1690). // // Issue #1809 root cause: previously main.go spawned loadBackgroundChunks // at FirstChunkReady while LoadChunked was still merging the remainder // of the hot window. s.oldestLoaded is only assigned at the end of -// LoadChunked (chunked_load.go:330), so the bg loader read "" and -// bailed → coverage gate trips → backgroundLoadFailed=true. Gating the -// bg loader on LoadChunked completion preserves the FirstChunkReady -// HTTP-bind parallelism while ensuring oldestLoaded has a valid floor -// when the bg loader starts. +// LoadChunked, so the bg loader read "" and bailed → coverage gate +// trips → backgroundLoadFailed=true. Running the bg loader after +// LoadChunked returns preserves the FirstChunkReady HTTP-bind +// parallelism while ensuring oldestLoaded has a valid floor when the +// bg loader starts. func (s *PacketStore) RunStartupLoad(chunkSize int) error { - loadErrCh := make(chan error, 1) - go func() { - loadErrCh <- s.LoadChunked(chunkSize) - }() - // Block until LoadChunked returns. Callers that want to bind their - // HTTP listener earlier can wait on FirstChunkReady() in parallel. - if err := <-loadErrCh; err != nil { + if err := s.LoadChunked(chunkSize); err != nil { + // Pick a terminal state on the error path so /api/healthz + + // backgroundLoadComplete don't stay undefined (done=false, + // failed=false) forever. + s.bgErrMu.Lock() + s.backgroundLoadErr = fmt.Sprintf("LoadChunked failed: %v", err) + s.bgErrMu.Unlock() + s.backgroundLoadFailed.Store(true) return err } - if s.hotStartupHours > 0 { - s.loadBackgroundChunks() + if s.hotStartupHours <= 0 { + // No bg work required → terminal steady state is done=true, + // failed=false. Without this the healthz probe would see + // backgroundLoadComplete=false indefinitely. + s.backgroundLoadDone.Store(true) + s.backgroundLoadProgress.Store(100) + return nil } + // INFO signal between LoadChunked completion and the bg loader + // kick-off. The post-mortem of #1809 needed exactly this line to + // confirm the bg loader actually started after oldestLoaded was set. + log.Printf("[store] LoadChunked complete (oldestLoaded=%q) — starting background fill loader (retentionHours=%gh, hotStartupHours=%gh)", + s.oldestLoaded, s.retentionHours, s.hotStartupHours) + s.loadBackgroundChunks() return nil } diff --git a/cmd/server/main.go b/cmd/server/main.go index 522ae660..3e0219e5 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -242,8 +242,10 @@ func main() { log.Printf("[store] RunStartupLoad completed before first-chunk signal (empty DB?)") } if store.hotStartupHours > 0 { - log.Printf("[store] background load will start after LoadChunked completes: filling retentionHours=%gh from hotStartupHours=%gh", - store.retentionHours, store.hotStartupHours) + log.Printf("[store] hot-startup window configured: hotStartupHours=%gh, retentionHours=%gh (background fill loader runs after LoadChunked succeeds — see RunStartupLoad)", + store.hotStartupHours, store.retentionHours) + } else { + log.Printf("[store] hot-startup disabled (hotStartupHours=0) — no background fill loader will run") } go func() { if err := <-loadErrCh; err != nil { diff --git a/cmd/server/runstartup_load_test.go b/cmd/server/runstartup_load_test.go new file mode 100644 index 00000000..9b686668 --- /dev/null +++ b/cmd/server/runstartup_load_test.go @@ -0,0 +1,248 @@ +package main + +// Tests for RunStartupLoad branch behavior and #1809 invariants +// (PR #1811 round-1 followups B2/B3/B4/B5). +// +// The pre-#1811 RunStartupLoad left several steady states undefined: +// * hotStartupHours == 0 → backgroundLoadDone stayed false forever +// * LoadChunked error → both done & failed stayed false +// * empty DB + no bg work needed → backgroundLoadDone stayed false +// +// These tests codify the post-#1811 contract: +// * LoadChunked error → backgroundLoadFailed=true, done=false +// * hotStartupHours == 0 → backgroundLoadDone=true, failed=false, +// bg loader NOT called +// * empty DB + hot window → backgroundLoadDone reflects coverage +// (1.0 on empty DB → done=true, failed=false) +// * call ordering inside RunStartupLoad: LoadChunked completes +// before loadBackgroundChunks executes (so oldestLoaded is set) + +import ( + "database/sql" + "fmt" + "path/filepath" + "testing" + "time" +) + +// TestRunStartupLoad_HotStartupHoursZero_SetsDoneImmediately covers B3: +// when hotStartupHours == 0 the bg loader has no work to do; healthz +// must NOT be stuck on backgroundLoadComplete=false. +func TestRunStartupLoad_HotStartupHoursZero_SetsDoneImmediately(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + nowSec := time.Now().UTC().Unix() + createTestDBWithLastSeen(t, dbPath, 10, 1, nowSec, + 30*time.Minute, 30*time.Minute) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatalf("OpenDB: %v", err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 168, + HotStartupHours: 0, // disable hot window → bg loader must not run + }) + + if err := store.RunStartupLoad(500); err != nil { + t.Fatalf("RunStartupLoad: %v", err) + } + if !store.backgroundLoadDone.Load() { + t.Fatalf("backgroundLoadDone must be true when hotStartupHours=0 (no bg work needed)") + } + if store.backgroundLoadFailed.Load() { + t.Fatalf("backgroundLoadFailed must be false on the no-bg-work path; got error=%q", + store.BackgroundLoadError()) + } +} + +// TestRunStartupLoad_LoadChunkedError_SetsFailedTerminal covers B2: +// when LoadChunked errors, the steady state must be terminal +// (failed=true) — not the pre-fix (done=false, failed=false). +func TestRunStartupLoad_LoadChunkedError_SetsFailedTerminal(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + nowSec := time.Now().UTC().Unix() + createTestDBWithLastSeen(t, dbPath, 5, 1, nowSec, + 30*time.Minute, 30*time.Minute) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatalf("OpenDB: %v", err) + } + // Close the underlying connection to force LoadChunked to fail on + // its very first query. We're explicitly verifying the failure path + // terminal state, not the success path. + _ = db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 168, + HotStartupHours: 1, + }) + + loadErr := store.RunStartupLoad(500) + if loadErr == nil { + t.Fatalf("RunStartupLoad must return an error when LoadChunked fails") + } + if !store.backgroundLoadFailed.Load() { + t.Fatalf("backgroundLoadFailed must be true after LoadChunked error (terminal state)") + } + if store.backgroundLoadDone.Load() { + t.Fatalf("backgroundLoadDone must remain false on LoadChunked error") + } + if store.BackgroundLoadError() == "" { + t.Fatalf("BackgroundLoadError must be non-empty after LoadChunked failure") + } +} + +// TestRunStartupLoad_EmptyDB_SetsDoneTerminal covers B4: empty DB with +// hot window > 0 — oldestLoaded stays "" because there are no packets. +// loadBackgroundChunks must reach its coverage block (totalInDB==0 → +// ratio=1.0) and set done=true rather than leaving the store stuck. +func TestRunStartupLoad_EmptyDB_SetsDoneTerminal(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + createTestDBWithLastSeen(t, dbPath, 0, 0, time.Now().UTC().Unix(), + 30*time.Minute, 30*time.Minute) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatalf("OpenDB: %v", err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 168, + HotStartupHours: 1, + }) + + if err := store.RunStartupLoad(500); err != nil { + t.Fatalf("RunStartupLoad on empty DB: %v", err) + } + if !store.backgroundLoadDone.Load() { + t.Fatalf("backgroundLoadDone must be true after empty-DB load (nothing to load == complete)") + } + if store.backgroundLoadFailed.Load() { + t.Fatalf("backgroundLoadFailed must be false on empty DB; got %q", + store.BackgroundLoadError()) + } +} + +// TestRunStartupLoad_BgLoaderRunsAfterLoadChunkedSets_OldestLoaded +// covers B5/B6: assert the in-process call ordering inside +// RunStartupLoad. The OnChunkLoaded hook fires from LoadChunked; the +// loadBackgroundChunks panic guard fires only if oldestLoaded=="" at +// entry. So observing the chunk callback strictly before the bg loader +// (which is exercised via the loop continuing without panic) is the +// minimum guarantee. If a future refactor re-introduces the parallel +// spawn pattern, the runtime assertion in loadBackgroundChunks will +// trip and this test will fail. +func TestRunStartupLoad_BgLoaderRunsAfterLoadChunkedSets_OldestLoaded(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + nowSec := time.Now().UTC().Unix() + createTestDBWithLastSeen(t, dbPath, 50, 1, nowSec, + 30*time.Minute, 30*time.Minute) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatalf("OpenDB: %v", err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 168, + HotStartupHours: 1, + }) + + // Hook: LoadChunked fires OnChunkLoaded after each chunk merge. + // We record whether it fired before RunStartupLoad returned. The + // runtime assertion in loadBackgroundChunks ensures the bg loader + // observes a non-empty oldestLoaded; if a future refactor parallels + // the bg-loader spawn with LoadChunked, that assertion panics. + chunkSeen := false + store.OnChunkLoaded(func(rowsThisChunk, totalRows int) { + chunkSeen = true + }) + + if err := store.RunStartupLoad(500); err != nil { + t.Fatalf("RunStartupLoad: %v", err) + } + if !chunkSeen { + t.Fatalf("LoadChunked OnChunkLoaded did not fire — chunk loop did not execute before bg loader") + } + if store.oldestLoaded == "" { + t.Fatalf("oldestLoaded is empty after RunStartupLoad — bg loader would have read \"\" and bailed") + } +} + +// TestLoadBackgroundChunks_PanicsOnOldestLoadedEmpty_Invariant covers the +// runtime assertion (A7). Manually populate s.packets without setting +// oldestLoaded and call loadBackgroundChunks directly — the panic guard +// must fire so future refactors cannot silently re-introduce the +// #1809 race. +func TestLoadBackgroundChunks_PanicsOnOldestLoadedEmpty_Invariant(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL") + if err != nil { + t.Fatal(err) + } + // Create the bare minimum schema so OpenDB succeeds; we don't care + // about row count — only the guard at the top of the function. + if _, err := conn.Exec(`CREATE TABLE transmissions ( + id INTEGER PRIMARY KEY, raw_hex TEXT, hash TEXT, first_seen TEXT, + route_type INTEGER, payload_type INTEGER, payload_version INTEGER, + decoded_json TEXT, last_seen INTEGER NOT NULL DEFAULT 0)`); err != nil { + t.Fatal(err) + } + if _, err := conn.Exec(`CREATE TABLE observations ( + id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_id TEXT, + observer_name TEXT, direction TEXT, snr REAL, rssi REAL, + score INTEGER, path_json TEXT, timestamp TEXT, raw_hex TEXT)`); err != nil { + t.Fatal(err) + } + if _, err := conn.Exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`); err != nil { + t.Fatal(err) + } + if _, err := conn.Exec(`CREATE TABLE nodes (pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, frequency REAL)`); err != nil { + t.Fatal(err) + } + if _, err := conn.Exec(`CREATE TABLE schema_version (version INTEGER)`); err != nil { + t.Fatal(err) + } + if _, err := conn.Exec(`INSERT INTO schema_version (version) VALUES (1)`); err != nil { + t.Fatal(err) + } + conn.Close() + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatalf("OpenDB: %v", err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 168, + HotStartupHours: 1, + }) + // Simulate the #1809 race: packets present, oldestLoaded never set. + store.mu.Lock() + store.packets = append(store.packets, &StoreTx{ID: 1, Hash: "deadbeef", FirstSeen: "2025-01-01T00:00:00Z"}) + store.oldestLoaded = "" + store.mu.Unlock() + + defer func() { + r := recover() + if r == nil { + t.Fatalf("loadBackgroundChunks must panic when oldestLoaded=\"\" with packets in store (#1809 invariant)") + } + msg := fmt.Sprintf("%v", r) + if msg == "" { + t.Fatalf("panic message must be non-empty") + } + }() + store.loadBackgroundChunks() +} diff --git a/cmd/server/store.go b/cmd/server/store.go index 4b4322e9..3a9967f6 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -421,6 +421,13 @@ type PacketStore struct { // Read order MUST be: load backgroundLoadDone first; only if true // is backgroundLoadFailed meaningful. // 0 = disabled (current behavior). Background loader fills the rest. + // + // IMMUTABILITY: hotStartupHours is set ONCE in NewPacketStore (with + // optional clamp against retentionHours) and is NEVER mutated + // afterward. Readers (LoadChunked, RunStartupLoad, + // loadBackgroundChunks, GetPerfStoreStats) intentionally read it + // without s.mu. Do not add a write path without also adding the + // lock to every reader — see #1809 / #1811. hotStartupHours float64 backgroundLoadDone atomic.Bool backgroundLoadFailed atomic.Bool @@ -1418,6 +1425,22 @@ func (s *PacketStore) loadChunk(from, to time.Time) error { // chunks are merged it rebuilds analytics indexes once. Chunk errors are // handled by advancing past the failed window so the loop always terminates. func (s *PacketStore) loadBackgroundChunks() { + // #1809 invariant: oldestLoaded MUST be set before the bg loader + // runs whenever the in-memory store has packets. The original bug + // was a parallel spawn that read oldestLoaded="" and silently + // bailed → coverage gate trips → backgroundLoadFailed=true. Encode + // the precondition here so a future refactor that re-introduces + // the race fails loudly instead of silently shipping the same + // regression. Empty store + empty oldestLoaded is the legitimate + // "empty DB" path and is allowed. + s.mu.RLock() + oldestAtEntry := s.oldestLoaded + packetCountAtEntry := len(s.packets) + s.mu.RUnlock() + if oldestAtEntry == "" && packetCountAtEntry > 0 { + panic(fmt.Sprintf("loadBackgroundChunks: oldestLoaded=\"\" with %d packets in store — LoadChunked must run to completion first (#1809)", packetCountAtEntry)) + } + if s.retentionHours <= 0 { s.backgroundLoadDone.Store(true) return