mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-07-03 22:51:58 +00:00
fix(#1811/#1809): tighten RunStartupLoad terminal states + #1809 invariant guard
Round-1 fix consolidating reviewer findings (adversarial + Dijkstra + Kent Beck) on PR #1811. Production behavior changes + new unit tests in one commit: Production fixes (chunked_load.go, main.go, store.go): * RunStartupLoad: inline the LoadChunked call (drop superfluous goroutine + channel — direct call is equivalent). * RunStartupLoad: on LoadChunked error, set backgroundLoadFailed=true with a captured error string. Pre-fix: undefined (done=false, failed=false) terminal state. * RunStartupLoad: when hotStartupHours==0 set backgroundLoadDone=true immediately (no bg work needed). Pre-fix: /api/healthz + backgroundLoadComplete stayed false forever on this branch. * RunStartupLoad: log an INFO line between LoadChunked completion and bg-loader start (the post-mortem of #1809 needed this signal). * main.go: stop logging 'background load will start' on the hotStartupHours==0 branch where it was a lie. Branch the log message based on the actual path taken. * loadBackgroundChunks: runtime assertion — panic if oldestLoaded is '' while packets are present. Future refactors that re-introduce the #1809 parallel spawn race fail loudly instead of silently shipping the same coverage-gate regression. * hotStartupHours: documented as immutable post-construction (no locks needed on readers). Tests (runstartup_load_test.go) — codify the new contracts: * TestRunStartupLoad_HotStartupHoursZero_SetsDoneImmediately (B3) * TestRunStartupLoad_LoadChunkedError_SetsFailedTerminal (B2) * TestRunStartupLoad_EmptyDB_SetsDoneTerminal (B4) * TestRunStartupLoad_BgLoaderRunsAfterLoadChunkedSets_OldestLoaded (B5/B6 — behavior-describing name, no issue number in fn name) * TestLoadBackgroundChunks_PanicsOnOldestLoadedEmpty_Invariant (A7) The original TDD red→green chain (c9c782b→f532d08) is preserved intact. Refs #1809, PR #1811.
This commit is contained in:
+44
-22
@@ -106,37 +106,59 @@ func (s *PacketStore) fireChunkCallbacks(rowsThisChunk, totalRows int) {
|
||||
}
|
||||
|
||||
// RunStartupLoad orchestrates the startup load sequence:
|
||||
// 1. start LoadChunked (async)
|
||||
// 2. caller waits for FirstChunkReady to bind the HTTP listener
|
||||
// 3. spawn the background fill loader AFTER LoadChunked completes,
|
||||
// so s.oldestLoaded is set before the bg loader reads it (#1809)
|
||||
// 1. run LoadChunked synchronously (FirstChunkReady is signaled
|
||||
// internally so callers waiting on it in parallel can bind HTTP)
|
||||
// 2. on success, run loadBackgroundChunks synchronously so
|
||||
// s.oldestLoaded is guaranteed set before the bg loader reads
|
||||
// it (#1809).
|
||||
//
|
||||
// chunkSize=0 uses the LoadChunked default. Blocks until LoadChunked
|
||||
// AND any background loader have finished. Callers that want to bind
|
||||
// the HTTP listener at FirstChunkReady should run this in a goroutine
|
||||
// and wait on FirstChunkReady() themselves.
|
||||
// chunkSize=0 uses the LoadChunked default. The function blocks until
|
||||
// LoadChunked AND any background loader have finished. Callers that
|
||||
// want to bind the HTTP listener at FirstChunkReady should run this
|
||||
// in a goroutine and wait on FirstChunkReady() themselves.
|
||||
//
|
||||
// Steady-state contracts post-return:
|
||||
// - LoadChunked error: backgroundLoadFailed=true, backgroundLoadDone
|
||||
// stays false, backgroundLoadErr non-empty. Returns the error.
|
||||
// - hotStartupHours == 0: backgroundLoadDone=true, failed=false
|
||||
// (no background work was needed).
|
||||
// - hotStartupHours > 0 success: terminal state is whatever
|
||||
// loadBackgroundChunks set (done=true on full coverage,
|
||||
// failed=true on partial / chunk errors — see #1690).
|
||||
//
|
||||
// Issue #1809 root cause: previously main.go spawned loadBackgroundChunks
|
||||
// at FirstChunkReady while LoadChunked was still merging the remainder
|
||||
// of the hot window. s.oldestLoaded is only assigned at the end of
|
||||
// LoadChunked (chunked_load.go:330), so the bg loader read "" and
|
||||
// bailed → coverage gate trips → backgroundLoadFailed=true. Gating the
|
||||
// bg loader on LoadChunked completion preserves the FirstChunkReady
|
||||
// HTTP-bind parallelism while ensuring oldestLoaded has a valid floor
|
||||
// when the bg loader starts.
|
||||
// LoadChunked, so the bg loader read "" and bailed → coverage gate
|
||||
// trips → backgroundLoadFailed=true. Running the bg loader after
|
||||
// LoadChunked returns preserves the FirstChunkReady HTTP-bind
|
||||
// parallelism while ensuring oldestLoaded has a valid floor when the
|
||||
// bg loader starts.
|
||||
func (s *PacketStore) RunStartupLoad(chunkSize int) error {
|
||||
loadErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
loadErrCh <- s.LoadChunked(chunkSize)
|
||||
}()
|
||||
// Block until LoadChunked returns. Callers that want to bind their
|
||||
// HTTP listener earlier can wait on FirstChunkReady() in parallel.
|
||||
if err := <-loadErrCh; err != nil {
|
||||
if err := s.LoadChunked(chunkSize); err != nil {
|
||||
// Pick a terminal state on the error path so /api/healthz +
|
||||
// backgroundLoadComplete don't stay undefined (done=false,
|
||||
// failed=false) forever.
|
||||
s.bgErrMu.Lock()
|
||||
s.backgroundLoadErr = fmt.Sprintf("LoadChunked failed: %v", err)
|
||||
s.bgErrMu.Unlock()
|
||||
s.backgroundLoadFailed.Store(true)
|
||||
return err
|
||||
}
|
||||
if s.hotStartupHours > 0 {
|
||||
s.loadBackgroundChunks()
|
||||
if s.hotStartupHours <= 0 {
|
||||
// No bg work required → terminal steady state is done=true,
|
||||
// failed=false. Without this the healthz probe would see
|
||||
// backgroundLoadComplete=false indefinitely.
|
||||
s.backgroundLoadDone.Store(true)
|
||||
s.backgroundLoadProgress.Store(100)
|
||||
return nil
|
||||
}
|
||||
// INFO signal between LoadChunked completion and the bg loader
|
||||
// kick-off. The post-mortem of #1809 needed exactly this line to
|
||||
// confirm the bg loader actually started after oldestLoaded was set.
|
||||
log.Printf("[store] LoadChunked complete (oldestLoaded=%q) — starting background fill loader (retentionHours=%gh, hotStartupHours=%gh)",
|
||||
s.oldestLoaded, s.retentionHours, s.hotStartupHours)
|
||||
s.loadBackgroundChunks()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
+4
-2
@@ -242,8 +242,10 @@ func main() {
|
||||
log.Printf("[store] RunStartupLoad completed before first-chunk signal (empty DB?)")
|
||||
}
|
||||
if store.hotStartupHours > 0 {
|
||||
log.Printf("[store] background load will start after LoadChunked completes: filling retentionHours=%gh from hotStartupHours=%gh",
|
||||
store.retentionHours, store.hotStartupHours)
|
||||
log.Printf("[store] hot-startup window configured: hotStartupHours=%gh, retentionHours=%gh (background fill loader runs after LoadChunked succeeds — see RunStartupLoad)",
|
||||
store.hotStartupHours, store.retentionHours)
|
||||
} else {
|
||||
log.Printf("[store] hot-startup disabled (hotStartupHours=0) — no background fill loader will run")
|
||||
}
|
||||
go func() {
|
||||
if err := <-loadErrCh; err != nil {
|
||||
|
||||
@@ -0,0 +1,248 @@
|
||||
package main
|
||||
|
||||
// Tests for RunStartupLoad branch behavior and #1809 invariants
|
||||
// (PR #1811 round-1 followups B2/B3/B4/B5).
|
||||
//
|
||||
// The pre-#1811 RunStartupLoad left several steady states undefined:
|
||||
// * hotStartupHours == 0 → backgroundLoadDone stayed false forever
|
||||
// * LoadChunked error → both done & failed stayed false
|
||||
// * empty DB + no bg work needed → backgroundLoadDone stayed false
|
||||
//
|
||||
// These tests codify the post-#1811 contract:
|
||||
// * LoadChunked error → backgroundLoadFailed=true, done=false
|
||||
// * hotStartupHours == 0 → backgroundLoadDone=true, failed=false,
|
||||
// bg loader NOT called
|
||||
// * empty DB + hot window → backgroundLoadDone reflects coverage
|
||||
// (1.0 on empty DB → done=true, failed=false)
|
||||
// * call ordering inside RunStartupLoad: LoadChunked completes
|
||||
// before loadBackgroundChunks executes (so oldestLoaded is set)
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestRunStartupLoad_HotStartupHoursZero_SetsDoneImmediately covers B3:
|
||||
// when hotStartupHours == 0 the bg loader has no work to do; healthz
|
||||
// must NOT be stuck on backgroundLoadComplete=false.
|
||||
func TestRunStartupLoad_HotStartupHoursZero_SetsDoneImmediately(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
dbPath := filepath.Join(dir, "test.db")
|
||||
nowSec := time.Now().UTC().Unix()
|
||||
createTestDBWithLastSeen(t, dbPath, 10, 1, nowSec,
|
||||
30*time.Minute, 30*time.Minute)
|
||||
|
||||
db, err := OpenDB(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("OpenDB: %v", err)
|
||||
}
|
||||
defer db.conn.Close()
|
||||
|
||||
store := NewPacketStore(db, &PacketStoreConfig{
|
||||
RetentionHours: 168,
|
||||
HotStartupHours: 0, // disable hot window → bg loader must not run
|
||||
})
|
||||
|
||||
if err := store.RunStartupLoad(500); err != nil {
|
||||
t.Fatalf("RunStartupLoad: %v", err)
|
||||
}
|
||||
if !store.backgroundLoadDone.Load() {
|
||||
t.Fatalf("backgroundLoadDone must be true when hotStartupHours=0 (no bg work needed)")
|
||||
}
|
||||
if store.backgroundLoadFailed.Load() {
|
||||
t.Fatalf("backgroundLoadFailed must be false on the no-bg-work path; got error=%q",
|
||||
store.BackgroundLoadError())
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunStartupLoad_LoadChunkedError_SetsFailedTerminal covers B2:
|
||||
// when LoadChunked errors, the steady state must be terminal
|
||||
// (failed=true) — not the pre-fix (done=false, failed=false).
|
||||
func TestRunStartupLoad_LoadChunkedError_SetsFailedTerminal(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
dbPath := filepath.Join(dir, "test.db")
|
||||
nowSec := time.Now().UTC().Unix()
|
||||
createTestDBWithLastSeen(t, dbPath, 5, 1, nowSec,
|
||||
30*time.Minute, 30*time.Minute)
|
||||
|
||||
db, err := OpenDB(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("OpenDB: %v", err)
|
||||
}
|
||||
// Close the underlying connection to force LoadChunked to fail on
|
||||
// its very first query. We're explicitly verifying the failure path
|
||||
// terminal state, not the success path.
|
||||
_ = db.conn.Close()
|
||||
|
||||
store := NewPacketStore(db, &PacketStoreConfig{
|
||||
RetentionHours: 168,
|
||||
HotStartupHours: 1,
|
||||
})
|
||||
|
||||
loadErr := store.RunStartupLoad(500)
|
||||
if loadErr == nil {
|
||||
t.Fatalf("RunStartupLoad must return an error when LoadChunked fails")
|
||||
}
|
||||
if !store.backgroundLoadFailed.Load() {
|
||||
t.Fatalf("backgroundLoadFailed must be true after LoadChunked error (terminal state)")
|
||||
}
|
||||
if store.backgroundLoadDone.Load() {
|
||||
t.Fatalf("backgroundLoadDone must remain false on LoadChunked error")
|
||||
}
|
||||
if store.BackgroundLoadError() == "" {
|
||||
t.Fatalf("BackgroundLoadError must be non-empty after LoadChunked failure")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunStartupLoad_EmptyDB_SetsDoneTerminal covers B4: empty DB with
|
||||
// hot window > 0 — oldestLoaded stays "" because there are no packets.
|
||||
// loadBackgroundChunks must reach its coverage block (totalInDB==0 →
|
||||
// ratio=1.0) and set done=true rather than leaving the store stuck.
|
||||
func TestRunStartupLoad_EmptyDB_SetsDoneTerminal(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
dbPath := filepath.Join(dir, "test.db")
|
||||
createTestDBWithLastSeen(t, dbPath, 0, 0, time.Now().UTC().Unix(),
|
||||
30*time.Minute, 30*time.Minute)
|
||||
|
||||
db, err := OpenDB(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("OpenDB: %v", err)
|
||||
}
|
||||
defer db.conn.Close()
|
||||
|
||||
store := NewPacketStore(db, &PacketStoreConfig{
|
||||
RetentionHours: 168,
|
||||
HotStartupHours: 1,
|
||||
})
|
||||
|
||||
if err := store.RunStartupLoad(500); err != nil {
|
||||
t.Fatalf("RunStartupLoad on empty DB: %v", err)
|
||||
}
|
||||
if !store.backgroundLoadDone.Load() {
|
||||
t.Fatalf("backgroundLoadDone must be true after empty-DB load (nothing to load == complete)")
|
||||
}
|
||||
if store.backgroundLoadFailed.Load() {
|
||||
t.Fatalf("backgroundLoadFailed must be false on empty DB; got %q",
|
||||
store.BackgroundLoadError())
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunStartupLoad_BgLoaderRunsAfterLoadChunkedSets_OldestLoaded
|
||||
// covers B5/B6: assert the in-process call ordering inside
|
||||
// RunStartupLoad. The OnChunkLoaded hook fires from LoadChunked; the
|
||||
// loadBackgroundChunks panic guard fires only if oldestLoaded=="" at
|
||||
// entry. So observing the chunk callback strictly before the bg loader
|
||||
// (which is exercised via the loop continuing without panic) is the
|
||||
// minimum guarantee. If a future refactor re-introduces the parallel
|
||||
// spawn pattern, the runtime assertion in loadBackgroundChunks will
|
||||
// trip and this test will fail.
|
||||
func TestRunStartupLoad_BgLoaderRunsAfterLoadChunkedSets_OldestLoaded(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
dbPath := filepath.Join(dir, "test.db")
|
||||
nowSec := time.Now().UTC().Unix()
|
||||
createTestDBWithLastSeen(t, dbPath, 50, 1, nowSec,
|
||||
30*time.Minute, 30*time.Minute)
|
||||
|
||||
db, err := OpenDB(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("OpenDB: %v", err)
|
||||
}
|
||||
defer db.conn.Close()
|
||||
|
||||
store := NewPacketStore(db, &PacketStoreConfig{
|
||||
RetentionHours: 168,
|
||||
HotStartupHours: 1,
|
||||
})
|
||||
|
||||
// Hook: LoadChunked fires OnChunkLoaded after each chunk merge.
|
||||
// We record whether it fired before RunStartupLoad returned. The
|
||||
// runtime assertion in loadBackgroundChunks ensures the bg loader
|
||||
// observes a non-empty oldestLoaded; if a future refactor parallels
|
||||
// the bg-loader spawn with LoadChunked, that assertion panics.
|
||||
chunkSeen := false
|
||||
store.OnChunkLoaded(func(rowsThisChunk, totalRows int) {
|
||||
chunkSeen = true
|
||||
})
|
||||
|
||||
if err := store.RunStartupLoad(500); err != nil {
|
||||
t.Fatalf("RunStartupLoad: %v", err)
|
||||
}
|
||||
if !chunkSeen {
|
||||
t.Fatalf("LoadChunked OnChunkLoaded did not fire — chunk loop did not execute before bg loader")
|
||||
}
|
||||
if store.oldestLoaded == "" {
|
||||
t.Fatalf("oldestLoaded is empty after RunStartupLoad — bg loader would have read \"\" and bailed")
|
||||
}
|
||||
}
|
||||
|
||||
// TestLoadBackgroundChunks_PanicsOnOldestLoadedEmpty_Invariant covers the
|
||||
// runtime assertion (A7). Manually populate s.packets without setting
|
||||
// oldestLoaded and call loadBackgroundChunks directly — the panic guard
|
||||
// must fire so future refactors cannot silently re-introduce the
|
||||
// #1809 race.
|
||||
func TestLoadBackgroundChunks_PanicsOnOldestLoadedEmpty_Invariant(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
dbPath := filepath.Join(dir, "test.db")
|
||||
conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Create the bare minimum schema so OpenDB succeeds; we don't care
|
||||
// about row count — only the guard at the top of the function.
|
||||
if _, err := conn.Exec(`CREATE TABLE transmissions (
|
||||
id INTEGER PRIMARY KEY, raw_hex TEXT, hash TEXT, first_seen TEXT,
|
||||
route_type INTEGER, payload_type INTEGER, payload_version INTEGER,
|
||||
decoded_json TEXT, last_seen INTEGER NOT NULL DEFAULT 0)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := conn.Exec(`CREATE TABLE observations (
|
||||
id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_id TEXT,
|
||||
observer_name TEXT, direction TEXT, snr REAL, rssi REAL,
|
||||
score INTEGER, path_json TEXT, timestamp TEXT, raw_hex TEXT)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := conn.Exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := conn.Exec(`CREATE TABLE nodes (pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, frequency REAL)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := conn.Exec(`CREATE TABLE schema_version (version INTEGER)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := conn.Exec(`INSERT INTO schema_version (version) VALUES (1)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
db, err := OpenDB(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("OpenDB: %v", err)
|
||||
}
|
||||
defer db.conn.Close()
|
||||
|
||||
store := NewPacketStore(db, &PacketStoreConfig{
|
||||
RetentionHours: 168,
|
||||
HotStartupHours: 1,
|
||||
})
|
||||
// Simulate the #1809 race: packets present, oldestLoaded never set.
|
||||
store.mu.Lock()
|
||||
store.packets = append(store.packets, &StoreTx{ID: 1, Hash: "deadbeef", FirstSeen: "2025-01-01T00:00:00Z"})
|
||||
store.oldestLoaded = ""
|
||||
store.mu.Unlock()
|
||||
|
||||
defer func() {
|
||||
r := recover()
|
||||
if r == nil {
|
||||
t.Fatalf("loadBackgroundChunks must panic when oldestLoaded=\"\" with packets in store (#1809 invariant)")
|
||||
}
|
||||
msg := fmt.Sprintf("%v", r)
|
||||
if msg == "" {
|
||||
t.Fatalf("panic message must be non-empty")
|
||||
}
|
||||
}()
|
||||
store.loadBackgroundChunks()
|
||||
}
|
||||
@@ -421,6 +421,13 @@ type PacketStore struct {
|
||||
// Read order MUST be: load backgroundLoadDone first; only if true
|
||||
// is backgroundLoadFailed meaningful.
|
||||
// 0 = disabled (current behavior). Background loader fills the rest.
|
||||
//
|
||||
// IMMUTABILITY: hotStartupHours is set ONCE in NewPacketStore (with
|
||||
// optional clamp against retentionHours) and is NEVER mutated
|
||||
// afterward. Readers (LoadChunked, RunStartupLoad,
|
||||
// loadBackgroundChunks, GetPerfStoreStats) intentionally read it
|
||||
// without s.mu. Do not add a write path without also adding the
|
||||
// lock to every reader — see #1809 / #1811.
|
||||
hotStartupHours float64
|
||||
backgroundLoadDone atomic.Bool
|
||||
backgroundLoadFailed atomic.Bool
|
||||
@@ -1418,6 +1425,22 @@ func (s *PacketStore) loadChunk(from, to time.Time) error {
|
||||
// chunks are merged it rebuilds analytics indexes once. Chunk errors are
|
||||
// handled by advancing past the failed window so the loop always terminates.
|
||||
func (s *PacketStore) loadBackgroundChunks() {
|
||||
// #1809 invariant: oldestLoaded MUST be set before the bg loader
|
||||
// runs whenever the in-memory store has packets. The original bug
|
||||
// was a parallel spawn that read oldestLoaded="" and silently
|
||||
// bailed → coverage gate trips → backgroundLoadFailed=true. Encode
|
||||
// the precondition here so a future refactor that re-introduces
|
||||
// the race fails loudly instead of silently shipping the same
|
||||
// regression. Empty store + empty oldestLoaded is the legitimate
|
||||
// "empty DB" path and is allowed.
|
||||
s.mu.RLock()
|
||||
oldestAtEntry := s.oldestLoaded
|
||||
packetCountAtEntry := len(s.packets)
|
||||
s.mu.RUnlock()
|
||||
if oldestAtEntry == "" && packetCountAtEntry > 0 {
|
||||
panic(fmt.Sprintf("loadBackgroundChunks: oldestLoaded=\"\" with %d packets in store — LoadChunked must run to completion first (#1809)", packetCountAtEntry))
|
||||
}
|
||||
|
||||
if s.retentionHours <= 0 {
|
||||
s.backgroundLoadDone.Store(true)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user