fix(#1811/#1809): tighten RunStartupLoad terminal states + #1809 invariant guard

Round-1 fix consolidating reviewer findings (adversarial + Dijkstra +
Kent Beck) on PR #1811. Production behavior changes + new unit tests
in one commit:

Production fixes (chunked_load.go, main.go, store.go):
  * RunStartupLoad: inline the LoadChunked call (drop superfluous
    goroutine + channel — direct call is equivalent).
  * RunStartupLoad: on LoadChunked error, set backgroundLoadFailed=true
    with a captured error string. Pre-fix: undefined (done=false,
    failed=false) terminal state.
  * RunStartupLoad: when hotStartupHours==0 set backgroundLoadDone=true
    immediately (no bg work needed). Pre-fix: /api/healthz +
    backgroundLoadComplete stayed false forever on this branch.
  * RunStartupLoad: log an INFO line between LoadChunked completion
    and bg-loader start (the post-mortem of #1809 needed this signal).
  * main.go: stop logging 'background load will start' on the
    hotStartupHours==0 branch where it was a lie. Branch the log
    message based on the actual path taken.
  * loadBackgroundChunks: runtime assertion — panic if oldestLoaded
    is '' while packets are present. Future refactors that
    re-introduce the #1809 parallel spawn race fail loudly instead
    of silently shipping the same coverage-gate regression.
  * hotStartupHours: documented as immutable post-construction (no
    locks needed on readers).

Tests (runstartup_load_test.go) — codify the new contracts:
  * TestRunStartupLoad_HotStartupHoursZero_SetsDoneImmediately (B3)
  * TestRunStartupLoad_LoadChunkedError_SetsFailedTerminal (B2)
  * TestRunStartupLoad_EmptyDB_SetsDoneTerminal (B4)
  * TestRunStartupLoad_BgLoaderRunsAfterLoadChunkedSets_OldestLoaded
    (B5/B6 — behavior-describing name, no issue number in fn name)
  * TestLoadBackgroundChunks_PanicsOnOldestLoadedEmpty_Invariant (A7)

The original TDD red→green chain (c9c782bf532d08) is preserved
intact.

Refs #1809, PR #1811.
This commit is contained in:
meshcore-bot
2026-06-30 14:28:47 -07:00
parent 08526d9e5f
commit db5592f653
4 changed files with 319 additions and 24 deletions
+44 -22
View File
@@ -106,37 +106,59 @@ func (s *PacketStore) fireChunkCallbacks(rowsThisChunk, totalRows int) {
}
// RunStartupLoad orchestrates the startup load sequence:
// 1. start LoadChunked (async)
// 2. caller waits for FirstChunkReady to bind the HTTP listener
// 3. spawn the background fill loader AFTER LoadChunked completes,
// so s.oldestLoaded is set before the bg loader reads it (#1809)
// 1. run LoadChunked synchronously (FirstChunkReady is signaled
// internally so callers waiting on it in parallel can bind HTTP)
// 2. on success, run loadBackgroundChunks synchronously so
// s.oldestLoaded is guaranteed set before the bg loader reads
// it (#1809).
//
// chunkSize=0 uses the LoadChunked default. Blocks until LoadChunked
// AND any background loader have finished. Callers that want to bind
// the HTTP listener at FirstChunkReady should run this in a goroutine
// and wait on FirstChunkReady() themselves.
// chunkSize=0 uses the LoadChunked default. The function blocks until
// LoadChunked AND any background loader have finished. Callers that
// want to bind the HTTP listener at FirstChunkReady should run this
// in a goroutine and wait on FirstChunkReady() themselves.
//
// Steady-state contracts post-return:
// - LoadChunked error: backgroundLoadFailed=true, backgroundLoadDone
// stays false, backgroundLoadErr non-empty. Returns the error.
// - hotStartupHours == 0: backgroundLoadDone=true, failed=false
// (no background work was needed).
// - hotStartupHours > 0 success: terminal state is whatever
// loadBackgroundChunks set (done=true on full coverage,
// failed=true on partial / chunk errors — see #1690).
//
// Issue #1809 root cause: previously main.go spawned loadBackgroundChunks
// at FirstChunkReady while LoadChunked was still merging the remainder
// of the hot window. s.oldestLoaded is only assigned at the end of
// LoadChunked (chunked_load.go:330), so the bg loader read "" and
// bailed → coverage gate trips → backgroundLoadFailed=true. Gating the
// bg loader on LoadChunked completion preserves the FirstChunkReady
// HTTP-bind parallelism while ensuring oldestLoaded has a valid floor
// when the bg loader starts.
// LoadChunked, so the bg loader read "" and bailed → coverage gate
// trips → backgroundLoadFailed=true. Running the bg loader after
// LoadChunked returns preserves the FirstChunkReady HTTP-bind
// parallelism while ensuring oldestLoaded has a valid floor when the
// bg loader starts.
func (s *PacketStore) RunStartupLoad(chunkSize int) error {
loadErrCh := make(chan error, 1)
go func() {
loadErrCh <- s.LoadChunked(chunkSize)
}()
// Block until LoadChunked returns. Callers that want to bind their
// HTTP listener earlier can wait on FirstChunkReady() in parallel.
if err := <-loadErrCh; err != nil {
if err := s.LoadChunked(chunkSize); err != nil {
// Pick a terminal state on the error path so /api/healthz +
// backgroundLoadComplete don't stay undefined (done=false,
// failed=false) forever.
s.bgErrMu.Lock()
s.backgroundLoadErr = fmt.Sprintf("LoadChunked failed: %v", err)
s.bgErrMu.Unlock()
s.backgroundLoadFailed.Store(true)
return err
}
if s.hotStartupHours > 0 {
s.loadBackgroundChunks()
if s.hotStartupHours <= 0 {
// No bg work required → terminal steady state is done=true,
// failed=false. Without this the healthz probe would see
// backgroundLoadComplete=false indefinitely.
s.backgroundLoadDone.Store(true)
s.backgroundLoadProgress.Store(100)
return nil
}
// INFO signal between LoadChunked completion and the bg loader
// kick-off. The post-mortem of #1809 needed exactly this line to
// confirm the bg loader actually started after oldestLoaded was set.
log.Printf("[store] LoadChunked complete (oldestLoaded=%q) — starting background fill loader (retentionHours=%gh, hotStartupHours=%gh)",
s.oldestLoaded, s.retentionHours, s.hotStartupHours)
s.loadBackgroundChunks()
return nil
}
+4 -2
View File
@@ -242,8 +242,10 @@ func main() {
log.Printf("[store] RunStartupLoad completed before first-chunk signal (empty DB?)")
}
if store.hotStartupHours > 0 {
log.Printf("[store] background load will start after LoadChunked completes: filling retentionHours=%gh from hotStartupHours=%gh",
store.retentionHours, store.hotStartupHours)
log.Printf("[store] hot-startup window configured: hotStartupHours=%gh, retentionHours=%gh (background fill loader runs after LoadChunked succeeds — see RunStartupLoad)",
store.hotStartupHours, store.retentionHours)
} else {
log.Printf("[store] hot-startup disabled (hotStartupHours=0) — no background fill loader will run")
}
go func() {
if err := <-loadErrCh; err != nil {
+248
View File
@@ -0,0 +1,248 @@
package main
// Tests for RunStartupLoad branch behavior and #1809 invariants
// (PR #1811 round-1 followups B2/B3/B4/B5).
//
// The pre-#1811 RunStartupLoad left several steady states undefined:
// * hotStartupHours == 0 → backgroundLoadDone stayed false forever
// * LoadChunked error → both done & failed stayed false
// * empty DB + no bg work needed → backgroundLoadDone stayed false
//
// These tests codify the post-#1811 contract:
// * LoadChunked error → backgroundLoadFailed=true, done=false
// * hotStartupHours == 0 → backgroundLoadDone=true, failed=false,
// bg loader NOT called
// * empty DB + hot window → backgroundLoadDone reflects coverage
// (1.0 on empty DB → done=true, failed=false)
// * call ordering inside RunStartupLoad: LoadChunked completes
// before loadBackgroundChunks executes (so oldestLoaded is set)
import (
"database/sql"
"fmt"
"path/filepath"
"testing"
"time"
)
// TestRunStartupLoad_HotStartupHoursZero_SetsDoneImmediately covers B3:
// when hotStartupHours == 0 the bg loader has no work to do; healthz
// must NOT be stuck on backgroundLoadComplete=false.
func TestRunStartupLoad_HotStartupHoursZero_SetsDoneImmediately(t *testing.T) {
dir := t.TempDir()
dbPath := filepath.Join(dir, "test.db")
nowSec := time.Now().UTC().Unix()
createTestDBWithLastSeen(t, dbPath, 10, 1, nowSec,
30*time.Minute, 30*time.Minute)
db, err := OpenDB(dbPath)
if err != nil {
t.Fatalf("OpenDB: %v", err)
}
defer db.conn.Close()
store := NewPacketStore(db, &PacketStoreConfig{
RetentionHours: 168,
HotStartupHours: 0, // disable hot window → bg loader must not run
})
if err := store.RunStartupLoad(500); err != nil {
t.Fatalf("RunStartupLoad: %v", err)
}
if !store.backgroundLoadDone.Load() {
t.Fatalf("backgroundLoadDone must be true when hotStartupHours=0 (no bg work needed)")
}
if store.backgroundLoadFailed.Load() {
t.Fatalf("backgroundLoadFailed must be false on the no-bg-work path; got error=%q",
store.BackgroundLoadError())
}
}
// TestRunStartupLoad_LoadChunkedError_SetsFailedTerminal covers B2:
// when LoadChunked errors, the steady state must be terminal
// (failed=true) — not the pre-fix (done=false, failed=false).
func TestRunStartupLoad_LoadChunkedError_SetsFailedTerminal(t *testing.T) {
dir := t.TempDir()
dbPath := filepath.Join(dir, "test.db")
nowSec := time.Now().UTC().Unix()
createTestDBWithLastSeen(t, dbPath, 5, 1, nowSec,
30*time.Minute, 30*time.Minute)
db, err := OpenDB(dbPath)
if err != nil {
t.Fatalf("OpenDB: %v", err)
}
// Close the underlying connection to force LoadChunked to fail on
// its very first query. We're explicitly verifying the failure path
// terminal state, not the success path.
_ = db.conn.Close()
store := NewPacketStore(db, &PacketStoreConfig{
RetentionHours: 168,
HotStartupHours: 1,
})
loadErr := store.RunStartupLoad(500)
if loadErr == nil {
t.Fatalf("RunStartupLoad must return an error when LoadChunked fails")
}
if !store.backgroundLoadFailed.Load() {
t.Fatalf("backgroundLoadFailed must be true after LoadChunked error (terminal state)")
}
if store.backgroundLoadDone.Load() {
t.Fatalf("backgroundLoadDone must remain false on LoadChunked error")
}
if store.BackgroundLoadError() == "" {
t.Fatalf("BackgroundLoadError must be non-empty after LoadChunked failure")
}
}
// TestRunStartupLoad_EmptyDB_SetsDoneTerminal covers B4: empty DB with
// hot window > 0 — oldestLoaded stays "" because there are no packets.
// loadBackgroundChunks must reach its coverage block (totalInDB==0 →
// ratio=1.0) and set done=true rather than leaving the store stuck.
func TestRunStartupLoad_EmptyDB_SetsDoneTerminal(t *testing.T) {
dir := t.TempDir()
dbPath := filepath.Join(dir, "test.db")
createTestDBWithLastSeen(t, dbPath, 0, 0, time.Now().UTC().Unix(),
30*time.Minute, 30*time.Minute)
db, err := OpenDB(dbPath)
if err != nil {
t.Fatalf("OpenDB: %v", err)
}
defer db.conn.Close()
store := NewPacketStore(db, &PacketStoreConfig{
RetentionHours: 168,
HotStartupHours: 1,
})
if err := store.RunStartupLoad(500); err != nil {
t.Fatalf("RunStartupLoad on empty DB: %v", err)
}
if !store.backgroundLoadDone.Load() {
t.Fatalf("backgroundLoadDone must be true after empty-DB load (nothing to load == complete)")
}
if store.backgroundLoadFailed.Load() {
t.Fatalf("backgroundLoadFailed must be false on empty DB; got %q",
store.BackgroundLoadError())
}
}
// TestRunStartupLoad_BgLoaderRunsAfterLoadChunkedSets_OldestLoaded
// covers B5/B6: assert the in-process call ordering inside
// RunStartupLoad. The OnChunkLoaded hook fires from LoadChunked; the
// loadBackgroundChunks panic guard fires only if oldestLoaded=="" at
// entry. So observing the chunk callback strictly before the bg loader
// (which is exercised via the loop continuing without panic) is the
// minimum guarantee. If a future refactor re-introduces the parallel
// spawn pattern, the runtime assertion in loadBackgroundChunks will
// trip and this test will fail.
func TestRunStartupLoad_BgLoaderRunsAfterLoadChunkedSets_OldestLoaded(t *testing.T) {
dir := t.TempDir()
dbPath := filepath.Join(dir, "test.db")
nowSec := time.Now().UTC().Unix()
createTestDBWithLastSeen(t, dbPath, 50, 1, nowSec,
30*time.Minute, 30*time.Minute)
db, err := OpenDB(dbPath)
if err != nil {
t.Fatalf("OpenDB: %v", err)
}
defer db.conn.Close()
store := NewPacketStore(db, &PacketStoreConfig{
RetentionHours: 168,
HotStartupHours: 1,
})
// Hook: LoadChunked fires OnChunkLoaded after each chunk merge.
// We record whether it fired before RunStartupLoad returned. The
// runtime assertion in loadBackgroundChunks ensures the bg loader
// observes a non-empty oldestLoaded; if a future refactor parallels
// the bg-loader spawn with LoadChunked, that assertion panics.
chunkSeen := false
store.OnChunkLoaded(func(rowsThisChunk, totalRows int) {
chunkSeen = true
})
if err := store.RunStartupLoad(500); err != nil {
t.Fatalf("RunStartupLoad: %v", err)
}
if !chunkSeen {
t.Fatalf("LoadChunked OnChunkLoaded did not fire — chunk loop did not execute before bg loader")
}
if store.oldestLoaded == "" {
t.Fatalf("oldestLoaded is empty after RunStartupLoad — bg loader would have read \"\" and bailed")
}
}
// TestLoadBackgroundChunks_PanicsOnOldestLoadedEmpty_Invariant covers the
// runtime assertion (A7). Manually populate s.packets without setting
// oldestLoaded and call loadBackgroundChunks directly — the panic guard
// must fire so future refactors cannot silently re-introduce the
// #1809 race.
func TestLoadBackgroundChunks_PanicsOnOldestLoadedEmpty_Invariant(t *testing.T) {
dir := t.TempDir()
dbPath := filepath.Join(dir, "test.db")
conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL")
if err != nil {
t.Fatal(err)
}
// Create the bare minimum schema so OpenDB succeeds; we don't care
// about row count — only the guard at the top of the function.
if _, err := conn.Exec(`CREATE TABLE transmissions (
id INTEGER PRIMARY KEY, raw_hex TEXT, hash TEXT, first_seen TEXT,
route_type INTEGER, payload_type INTEGER, payload_version INTEGER,
decoded_json TEXT, last_seen INTEGER NOT NULL DEFAULT 0)`); err != nil {
t.Fatal(err)
}
if _, err := conn.Exec(`CREATE TABLE observations (
id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_id TEXT,
observer_name TEXT, direction TEXT, snr REAL, rssi REAL,
score INTEGER, path_json TEXT, timestamp TEXT, raw_hex TEXT)`); err != nil {
t.Fatal(err)
}
if _, err := conn.Exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`); err != nil {
t.Fatal(err)
}
if _, err := conn.Exec(`CREATE TABLE nodes (pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, frequency REAL)`); err != nil {
t.Fatal(err)
}
if _, err := conn.Exec(`CREATE TABLE schema_version (version INTEGER)`); err != nil {
t.Fatal(err)
}
if _, err := conn.Exec(`INSERT INTO schema_version (version) VALUES (1)`); err != nil {
t.Fatal(err)
}
conn.Close()
db, err := OpenDB(dbPath)
if err != nil {
t.Fatalf("OpenDB: %v", err)
}
defer db.conn.Close()
store := NewPacketStore(db, &PacketStoreConfig{
RetentionHours: 168,
HotStartupHours: 1,
})
// Simulate the #1809 race: packets present, oldestLoaded never set.
store.mu.Lock()
store.packets = append(store.packets, &StoreTx{ID: 1, Hash: "deadbeef", FirstSeen: "2025-01-01T00:00:00Z"})
store.oldestLoaded = ""
store.mu.Unlock()
defer func() {
r := recover()
if r == nil {
t.Fatalf("loadBackgroundChunks must panic when oldestLoaded=\"\" with packets in store (#1809 invariant)")
}
msg := fmt.Sprintf("%v", r)
if msg == "" {
t.Fatalf("panic message must be non-empty")
}
}()
store.loadBackgroundChunks()
}
+23
View File
@@ -421,6 +421,13 @@ type PacketStore struct {
// Read order MUST be: load backgroundLoadDone first; only if true
// is backgroundLoadFailed meaningful.
// 0 = disabled (current behavior). Background loader fills the rest.
//
// IMMUTABILITY: hotStartupHours is set ONCE in NewPacketStore (with
// optional clamp against retentionHours) and is NEVER mutated
// afterward. Readers (LoadChunked, RunStartupLoad,
// loadBackgroundChunks, GetPerfStoreStats) intentionally read it
// without s.mu. Do not add a write path without also adding the
// lock to every reader — see #1809 / #1811.
hotStartupHours float64
backgroundLoadDone atomic.Bool
backgroundLoadFailed atomic.Bool
@@ -1418,6 +1425,22 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
// chunks are merged it rebuilds analytics indexes once. Chunk errors are
// handled by advancing past the failed window so the loop always terminates.
func (s *PacketStore) loadBackgroundChunks() {
// #1809 invariant: oldestLoaded MUST be set before the bg loader
// runs whenever the in-memory store has packets. The original bug
// was a parallel spawn that read oldestLoaded="" and silently
// bailed → coverage gate trips → backgroundLoadFailed=true. Encode
// the precondition here so a future refactor that re-introduces
// the race fails loudly instead of silently shipping the same
// regression. Empty store + empty oldestLoaded is the legitimate
// "empty DB" path and is allowed.
s.mu.RLock()
oldestAtEntry := s.oldestLoaded
packetCountAtEntry := len(s.packets)
s.mu.RUnlock()
if oldestAtEntry == "" && packetCountAtEntry > 0 {
panic(fmt.Sprintf("loadBackgroundChunks: oldestLoaded=\"\" with %d packets in store — LoadChunked must run to completion first (#1809)", packetCountAtEntry))
}
if s.retentionHours <= 0 {
s.backgroundLoadDone.Store(true)
return