Compare commits

6 Commits

Author SHA1 Message Date
corescope-bot caab3ae57b docs(migrations): explicit warning on MaxOpenConns=1 live-write blocking
Address round-1 review on PR #1541 (dijkstra #5). Rewrite the
'Concurrency model' section of MIGRATIONS.md:

- Soft 'live-ingest serialization caveat' replaced with explicit
  WARNING: with MaxOpenConns=1 a long fn blocks ALL live ingest
  writes for its duration. Estimate impact before approving any
  migration that takes >5 seconds at production scale.
- Document the two-layer in-process dedup (atomic INSERT ON CONFLICT
  RETURNING + sync.Map in-flight guard) and the cross-process
  guarantee provided by SQLite's file-level write lock.
- Document the terminal-status retry+CRITICAL log behavior.
- Reference the new tests
  (SameNameConcurrent_FnRunsOnce, CrossProcessSerialized) as the
  pinned contracts.
2026-06-03 21:38:09 +00:00
corescope-bot afc0ed10bf test(ingestor): tighten async-migration tests against new concurrency invariants
Address round-1 review on PR #1541. Test quality fixes:

- TestRunAsyncMigration_PendingThenDone now wraps the RunAsyncMigration
  call in a goroutine + select. A synchronous implementation that
  blocked on fn (the regression class this test is meant to gate)
  would never deliver to resultCh because fn waits on <-release.
  Old shape would silently pass against a sync impl. (kb #1)

- ConcurrentSameNameSerialized → renamed
  SameNameConcurrent_FnRunsOnce. Old assertion calls ∈ [1..5] was
  tautological (every broken impl satisfies it). New assertion is
  calls == 1 AND every caller error is nil. Adds a chan struct{}
  start-barrier so all 5 goroutines wake at the same instant,
  maximising the race window. Captures return errors instead of
  discarding them. (adv #3, kb #2, dijkstra #4)

- NilDBReturnsError pins the nil-db guard contract (RunAsyncMigration
  AND AsyncMigrationStatus return ErrNilStoreDB, no panic).
  (kb should-fix)

- CtxCancelRecorded pins graceful-shutdown contract: fn that returns
  ctx.Err() ⇒ status 'failed', error column contains 'context
  canceled'. (kb should-fix)

- CrossProcessSerialized verifies dijkstra #2: two *sql.DB handles
  to the same file, both calling RunAsyncMigration concurrently, fn
  runs exactly once. SQLite's write lock + atomic INSERT ON CONFLICT
  RETURNING handle cross-process dedup without the in-process
  sync.Map. (dijkstra #2)
2026-06-03 21:38:03 +00:00
corescope-bot 4976fb418d fix(ingestor): async_migration concurrency + terminal-status durability
Address round-1 review on PR #1541. Concurrency fixes:

- Atomic INSERT ... ON CONFLICT(name) DO UPDATE ... RETURNING replaces
  the SELECT-then-INSERT/UPDATE TOCTOU race. Two concurrent callers
  hitting ErrNoRows could both INSERT and the loser would hit
  UNIQUE constraint failed: _async_migrations.name. Now a single
  statement atomically reserves the slot OR fetches the existing
  status. (adv #1, kb should-fix)

- In-process sync.Map guard prevents same-name re-entry from launching
  fn twice in parallel within one process. Without this, a second
  caller that sees the first caller's fresh 'pending_async' row would
  interpret it as 'crashed prior boot, retry' and spin a duplicate
  goroutine. defer-clear inside the goroutine guarantees no leak on
  panic. (adv #2)

- recordTerminalStatus retries the final UPDATE with backoff
  (100ms / 500ms / 2s) and logs CRITICAL on persistent failure so a
  row stuck in pending_async because of a transient write error is
  visible in ops instead of silently re-running on every boot.
  (dijkstra #1)

- Public ErrNilStoreDB + nil-db guards in RunAsyncMigration and
  AsyncMigrationStatus turn a nil-deref into a clear sentinel error.
  (kb should-fix)

- Godoc on RunAsyncMigration now documents caller's obligations on
  fn: idempotency (may run >1 time across crashes/retries),
  ctx-cancellation responsiveness, and the
  blocks-all-live-writes-because-MaxOpenConns=1 caveat.
  (dijkstra #3)

Public signature unchanged.
2026-06-03 21:37:54 +00:00
clawbot afb860a5d2 test(ingestor): expand async-migration coverage + fix preflight bad-fixture
Self-review findings:

1. The preflight 'bad' fixture (testdata/preflight-migrations/bad_sync_migration.go)
   contained the literal string 'RunAsyncMigration' in its descriptive comments.
   The gate script (check-async-migrations.sh) greps a 25-line window above the
   migration line for that identifier as an OK signal, so the fixture FALSELY
   PASSED when it was supposed to be the negative-test sample. Rewrote the
   comment to refer to the helper obliquely ('the async-migration helper').
   Verified: bad fixture now exits 1, good fixture still exits 0.

2. async_migration_test.go covered only PendingThenDone. Added:
   - PanicCapture: fn panic -> status='failed', error column populated
   - IdempotentSecondCallNoOps: second call after done short-circuits
   - RestartSafetyFailedIsRetried: seeded 'failed' row -> re-runs, error cleared
   - RestartSafetyPendingIsRetried: seeded 'pending_async' row -> re-runs
   - FnErrorRecorded: non-panic error path -> status='failed' + error captured
   - ConcurrentSameNameSerialized: bounded fn-call count under N concurrent callers

3. MIGRATIONS.md: added 'Concurrency model' section documenting the single-process
   / single-writer reality (MaxOpenConns=1 + 5s busy_timeout), the cross-process
   non-concern, and the live-ingest serialization implication for long fn. Added
   'Scale budgets' section with the <30s target, the #1483 worked example, and
   guidance for migrations that legitimately exceed the budget.

No public API changes to RunAsyncMigration.
2026-06-03 20:56:15 +00:00
clawbot 38354f3283 feat(ingestor): async migration helper + retro convert obs_observer_ts_idx_v1
Adds Store.RunAsyncMigration(ctx, name, fn) — registers the migration
as pending_async in a new _async_migrations bookkeeping table, returns
to the caller immediately, runs fn in a goroutine on the shared
backfill WaitGroup, transitions status to done/failed on completion.
Idempotent (done short-circuits; pending_async/failed are retried on
next boot).

Retroactive fix: obs_observer_ts_idx_v1 (the composite index build
that pinned the v3.8.3 first boot for minutes on Cascadia's 80K+ /
prod's 1.9M+ observations) is now scheduled via the new helper from
OpenStore() so the ingestor accepts packets immediately.

MIGRATIONS.md documents the annotation convention enforced by the
pr-preflight gate (~/.openclaw/skills/pr-preflight/scripts/
check-async-migrations.sh): every new CREATE INDEX / ALTER TABLE /
data-rewrite must EITHER be wrapped in RunAsyncMigration, OR carry a
// PREFLIGHT: async=true reason="..." comment directly above, OR
include a PREFLIGHT-MIGRATION-SCALE: <30s N=<scale> line in the PR
body.

testdata/preflight-migrations/ provides bad-and-good fixture
migrations the gate script can be unit-tested against.

Test: TestRunAsyncMigration_PendingThenDone — pins the contract
(pending_async immediately, RunAsyncMigration does not block on fn,
status transitions to done). The red-commit stub satisfied the build
but failed the assertion; this commit makes it green.
2026-06-03 20:27:39 +00:00
clawbot 2c6744ccea test(ingestor): red — RunAsyncMigration pending_async→done contract
Pins the recurring 'sync migration on large table blocks startup'
regression class (#791, #1483). Asserts:
  1. RunAsyncMigration registers the name as pending_async immediately.
  2. Returns without blocking on fn.
  3. Status transitions to done after fn completes.

Ships with a STUB RunAsyncMigration / AsyncMigrationStatus so the test
compiles + runs to assertion failure (not build failure). Green commit
follows with the real implementation + retroactive #1483 conversion +
docs.
2026-06-03 20:21:41 +00:00
6 changed files with 963 additions and 7 deletions
+168
@@ -0,0 +1,168 @@
# MIGRATIONS — async vs sync policy
CoreScope's ingestor applies schema/data migrations inline at boot in
`cmd/ingestor/db.go`. Every migration that runs synchronously blocks the
ingestor from accepting packets until it returns. On a dev DB that's
milliseconds; at prod scale (1.9M+ observations, 80K+ adverts, 2600+ nodes
on Cascadia) it can pin the boot for minutes and trigger restart loops —
the "upgrade broke prod" failure class (#791, #1483, and others).
## The rule
**Any new `CREATE INDEX`, `ALTER TABLE`, or data-rewriting `UPDATE`/`DELETE`
in a migration file MUST do ONE of the following:**
### Option 1 — Run via `Store.RunAsyncMigration` (preferred for backfills)
```go
// Scheduled in OpenStore() AFTER the *Store is constructed.
if err := s.RunAsyncMigration(ctx, "my_migration_v1",
	func(ctx context.Context, db *sql.DB) error {
		_, err := db.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS ...`)
		return err
	}); err != nil {
	log.Printf("[migration/async] scheduling failed: %v", err)
}
```
- The migration is recorded as `pending_async` in the `_async_migrations`
table **immediately** — the ingestor boots and starts ingesting.
- `fn` runs in a goroutine tracked on the Store's shared backfill WaitGroup,
so `Store.WaitForAsyncMigrations()` waits for async migrations and other
backfills alike.
- On success the row flips to `done`; on error/panic to `failed` with the
error message captured.
- Idempotent: rows in `done` state short-circuit; `failed`/`pending_async`
rows are retried on the next boot.
Reference implementations: `Store.BackfillPathJSONAsync` (path_json
backfill) and the converted `obs_observer_ts_idx_v1` index build in
`OpenStore`.
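
The scheduling rules in the bullets above reduce to a small decision table. The sketch below restates them as a pure function. `shouldLaunch` and its parameters are hypothetical names used only for illustration; the ingestor makes this decision inline inside `RunAsyncMigration`, and an empty status here simply stands for "no row yet".

```go
package main

import "fmt"

// shouldLaunch is a hypothetical helper (not part of the ingestor) that
// restates the documented scheduling rules as a pure function.
//   - rowStatus is the status stored in _async_migrations, or "" when no
//     row exists yet (an assumption of this sketch).
//   - inFlight reports whether this process already runs fn for the name.
func shouldLaunch(rowStatus string, inFlight bool) bool {
	switch {
	case rowStatus == "done":
		return false // already complete, short-circuit
	case inFlight:
		return false // a goroutine in this process owns the name
	default:
		return true // new, failed, or pending_async left by a crashed boot
	}
}

func main() {
	cases := []struct {
		status   string
		inFlight bool
	}{
		{"", false},
		{"pending_async", true},
		{"pending_async", false},
		{"failed", false},
		{"done", false},
	}
	for _, c := range cases {
		fmt.Printf("status=%q inFlight=%v launch=%v\n",
			c.status, c.inFlight, shouldLaunch(c.status, c.inFlight))
	}
}
```

The only ambiguous case is `pending_async`: the row alone cannot say whether `fn` is running right now or a previous boot crashed, which is why the in-process flag is a separate input.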
### Option 2 — Annotate as preflight-cheap
Some migrations are genuinely cheap at any scale (e.g. `ALTER TABLE ADD
COLUMN`, `CREATE INDEX` on a table you know is bounded to a few thousand
rows). Place the annotation comment **on the line immediately above the
migration block** so the preflight gate recognises the opt-out:
```go
// PREFLIGHT: async=true reason="ALTER ADD COLUMN — O(1) sqlite operation"
if r := db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'foo_v1'"); ...
```
The reason MUST be a real one-line justification you can defend in
review. "It's fine" is not a reason.
### Option 3 — Opt out per PR
If the migration is genuinely safe and you don't want to add an inline
annotation, put a single line in the PR body:
```
PREFLIGHT-MIGRATION-SCALE: <30s N=80K verified on Cascadia staging snapshot
```
This must include both `<30s` and `N=<some scale>` so a reviewer can
challenge the measurement.
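
As an illustration of the "both `<30s` and `N=<some scale>`" requirement, the following sketch checks a PR body for a well-formed opt-out line. The real gate is a shell script whose matching logic is not shown here, so `hasScaleOptOut` and the `N=` pattern are assumptions, not the gate's actual implementation.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// scaleValue matches an "N=<something>" token. The exact token shape the
// real gate accepts is an assumption of this sketch.
var scaleValue = regexp.MustCompile(`\bN=\S+`)

// hasScaleOptOut reports whether prBody contains a
// PREFLIGHT-MIGRATION-SCALE line carrying both "<30s" and an N= token.
func hasScaleOptOut(prBody string) bool {
	for _, line := range strings.Split(prBody, "\n") {
		line = strings.TrimSpace(line)
		if !strings.HasPrefix(line, "PREFLIGHT-MIGRATION-SCALE:") {
			continue
		}
		if strings.Contains(line, "<30s") && scaleValue.MatchString(line) {
			return true
		}
	}
	return false
}

func main() {
	body := "Adds an index.\n" +
		"PREFLIGHT-MIGRATION-SCALE: <30s N=80K verified on Cascadia staging snapshot\n"
	fmt.Println("opt-out accepted:", hasScaleOptOut(body))
}
```

A line that states the time budget without a scale, or a scale without the budget, is rejected, which keeps the measurement challengeable in review.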
## The gate
`~/.openclaw/skills/pr-preflight/scripts/check-async-migrations.sh` runs
on every PR via the preflight orchestrator. It greps the diff for new or
modified migration blocks (files matching `cmd/ingestor/db.go`,
`cmd/ingestor/maintenance.go`, `internal/dbschema/**`, `**/migrations/**`,
`**/*.sql`, plus any Go file touching `CREATE INDEX` / `ALTER TABLE` /
`CREATE UNIQUE INDEX`). For each hit it requires one of the three
options above. Hard-fail (exit 1); there is no warning-only mode.
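
The gate itself is a shell script that is not reproduced in this document. The Go sketch below only approximates its window-grep idea, using the 25-line look-back mentioned in the commit history; `unannotated`, the regular expression, and the treatment of both signals as plain substrings are simplifying assumptions.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// migrationStmt flags lines that look like schema migrations.
var migrationStmt = regexp.MustCompile(`(?i)\b(CREATE\s+(UNIQUE\s+)?INDEX|ALTER\s+TABLE)\b`)

// window is how many lines above a hit are searched for an OK signal.
const window = 25

// unannotated returns the 1-based line numbers of migration statements
// that have neither signal in the preceding window.
func unannotated(src string) []int {
	lines := strings.Split(src, "\n")
	var bad []int
	for i, line := range lines {
		if !migrationStmt.MatchString(line) {
			continue
		}
		start := i - window
		if start < 0 {
			start = 0
		}
		ok := false
		for _, prev := range lines[start:i] {
			if strings.Contains(prev, "RunAsyncMigration") ||
				strings.Contains(prev, `// PREFLIGHT: async=true reason="`) {
				ok = true
				break
			}
		}
		if !ok {
			bad = append(bad, i+1)
		}
	}
	return bad
}

func main() {
	src := "// see the async-migration helper\n" +
		"db.Exec(\"CREATE INDEX foo ON observations(a)\")\n"
	fmt.Println("unannotated lines:", unannotated(src))
}
```

A substring check of this kind cannot tell a call from a comment: a comment that merely mentions `RunAsyncMigration` counts as an OK signal, which is the false pass the bad fixture hit before its comment was reworded.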
## Concurrency model
CoreScope runs **one ingestor process** per deployment (`cmd/ingestor/`,
single binary, single `*Store`). There is no cluster mode, no leader
election, no second writer. SQLite is opened with `SetMaxOpenConns(1)`
and a 5s `busy_timeout`; all writes (live MQTT ingest + async migration
goroutines + maintenance backfills) serialize through the one connection
in a single process.
What this means for async migrations:
- **Within a single process**, concurrent `RunAsyncMigration(name=X)`
  callers are deduplicated by two layers:
  1. An atomic `INSERT ... ON CONFLICT(name) DO UPDATE ... RETURNING`
     replaces the previous SELECT-then-INSERT pattern, eliminating the
     TOCTOU race where two callers both saw `ErrNoRows` and the loser
     hit `UNIQUE constraint failed`.
  2. An in-process `sync.Map` guard tracks which names currently have a
     goroutine executing `fn` IN THIS PROCESS. A second caller that
     sees `pending_async` (e.g. because the first caller just inserted
     it) short-circuits via the in-flight map and does NOT launch a
     duplicate goroutine.

  See `TestRunAsyncMigration_SameNameConcurrent_FnRunsOnce` for the
  contract: 5 concurrent callers ⇒ fn runs exactly once, all 5 return
  `nil`.
- **Across processes** (two ingestor instances against the same DB —
not a supported deployment shape, but verified for defense-in-depth):
SQLite's file-level write lock serializes the atomic INSERT. The
losing process's `ON CONFLICT` sees the winner's `pending_async` row
and short-circuits. The in-process `sync.Map` does NOT cross
processes, so this property comes solely from SQLite's write
serialization. See `TestRunAsyncMigration_CrossProcessSerialized` for
the pinned contract.
- **Terminal status durability.** The final `UPDATE` that flips the row
to `done` / `failed` is attempted up to 4 times (the initial write plus
3 retries, after waits of 100ms / 500ms / 2s) to survive transient write
contention. If every attempt fails, a
`[async-migration] CRITICAL: cannot record terminal status…` line is
logged so the stuck row is visible in ops aggregation; otherwise the
migration would silently re-run on every subsequent boot.
- **WARNING: `fn` blocks ALL live ingest writes for its duration.**
Because `MaxOpenConns=1`, a long `CREATE INDEX` / `UPDATE` /
backfill inside `fn` holds the only write connection until it
yields. Estimate impact before approving any migration that takes
>5 seconds at production scale. For multi-minute work the canonical
pattern is chunked / batched fn implementations with inter-batch
`time.Sleep` so live writers can interleave (see
`BackfillPathJSONAsync` for a reference implementation).
- **`fn` may run more than once across boots.** A row left in
`pending_async` by a crash mid-`fn`, or marked `failed`, is reset to a
fresh `pending_async` and re-launched on the next boot. `fn` MUST be
idempotent (`CREATE INDEX IF NOT EXISTS`, `INSERT ... ON CONFLICT DO
NOTHING`, etc.). See `RunAsyncMigration` godoc for the full caller
contract.
## Scale budgets
Per-migration target: **<30s** at current prod scale (Cascadia: ~2,600
nodes, ~80K observations; previous prod snapshot: ~1.9M observations).
Worked example (#1483, `obs_observer_ts_idx_v1`): composite index build
on `observations(observer_idx, timestamp)`. At ~1.9M rows the sync build
pinned ingestor boot for several minutes → restart loop. Converted to
async via `RunAsyncMigration` in `OpenStore` so boot returns immediately
and the index materializes in the background; the existing `_migrations`
short-circuit at the top of the migration block ensures DBs that already
completed the sync v3.8.3 build do NOT re-run it through the goroutine
path on subsequent boots.
If you cannot meet the <30s budget, document the expected upper bound
and operator runbook expectation (e.g. "index build expected ~10 min on
a 5M-row table; ingestor remains responsive; monitor via
`SELECT status, error FROM _async_migrations WHERE name = ...`").
## Why this exists
Pattern that keeps repeating:
1. Author writes `CREATE INDEX foo ON observations(...)` in a migration.
2. Local dev DB has ~100 rows. Migration returns in 1ms. CI is green.
3. Reviewer focuses on plan correctness, not scale.
4. Ship.
5. Prod boots, sqlite scans 1.9M rows, the ingestor sits at `[migration]
Adding index...` for 8 minutes, healthcheck times out, container
restarts, loops.
6. Operator pages. Hotfix. Apology.
The gate doesn't try to detect table size (undecidable from a diff). It
enforces **annotation discipline**: every author who adds a migration
must consciously decide which bucket it falls into and write that down.
That is the cheapest possible intervention that breaks the cycle.
+263
@@ -0,0 +1,263 @@
// Async migration helper — runs schema/backfill work that may take minutes on
// large prod tables WITHOUT blocking ingestor startup.
//
// MIGRATION ANNOTATION CONVENTION (read this before touching migrations):
//
// Sync schema/data migrations (CREATE INDEX, ALTER TABLE, UPDATE ... WHERE)
// that run inline during OpenStore() block the ingestor from accepting
// packets until they finish. On an empty dev DB they return in milliseconds;
// at prod scale (1.9M+ observations, 80K+ adverts) they can pin the boot
// for minutes and trigger restart loops. This regression class has bitten us
// repeatedly (#791 resolved_path backfill, #1483 obs_observer_ts_idx_v1).
//
// ANY new CREATE INDEX / ALTER TABLE / data-rewrite migration MUST EITHER:
// 1. Run via Store.RunAsyncMigration(...) below (preferred for backfills
// and any work that may touch >1K rows). The migration is recorded as
// `pending_async` immediately, returns to the caller (boot proceeds),
// and completes in a goroutine. Status flips to `done` (or `failed`
// with an error message) when fn returns.
// 2. Carry the preflight annotation comment immediately above the
// migration block, e.g.
// // PREFLIGHT: async=true reason="<one-line justification>"
// Use this for migrations that are genuinely cheap at any scale
// (e.g. ALTER TABLE ADD COLUMN, CREATE INDEX on a known-bounded
// table). The annotation is grepped by
// ~/.openclaw/skills/pr-preflight/scripts/check-async-migrations.sh
// — its absence on a touched migration block is a hard-fail gate.
//
// See MIGRATIONS.md in the repo root for the full policy and examples.
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"sync"
"time"
)
// ErrNilStoreDB is returned by RunAsyncMigration and AsyncMigrationStatus
// when called on a nil *Store or on a *Store whose db handle is nil. This is
// a programmer error (the Store was never opened, or it was closed) but we
// surface it as a normal error rather than panicking so a misconfigured
// caller can degrade gracefully.
var ErrNilStoreDB = errors.New("async migration: store has nil db")
// inflightAsyncMigrations tracks which migration names currently have a
// goroutine executing fn IN THIS PROCESS. The SQL row's `pending_async`
// status alone is not sufficient — a row may be `pending_async` because
// (a) the goroutine is actively running right now in this process, or
// (b) a previous boot crashed mid-fn and left the row stuck.
// Case (a) must NOT re-launch fn (would run twice in parallel and corrupt
// state). Case (b) must re-launch fn (otherwise the migration is stuck
// forever). The in-process guard is the discriminator: if a name is in the
// map, case (a); if not, case (b).
//
// Cross-process serialization (a second ingestor instance against the same
// DB) is handled by SQLite's write lock + the atomic INSERT ON CONFLICT
// pattern in RunAsyncMigration; see
// TestRunAsyncMigration_CrossProcessSerialized.
var inflightAsyncMigrations sync.Map // map[string]struct{}
// ensureAsyncMigrationsTable creates the bookkeeping table used by
// RunAsyncMigration / AsyncMigrationStatus. Idempotent.
func ensureAsyncMigrationsTable(db *sql.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS _async_migrations (
name TEXT PRIMARY KEY,
status TEXT NOT NULL, -- pending_async | done | failed
started_at TEXT NOT NULL DEFAULT (datetime('now')),
ended_at TEXT,
error TEXT
)
`)
return err
}
// RunAsyncMigration registers `name` as a pending async migration and
// schedules `fn` to run in a background goroutine. It returns to the caller
// immediately so the ingestor can keep booting.
//
// Caller's obligations on `fn`:
//
// - `fn` MUST be idempotent. Across crashes, restarts, and retries (and
// in older buggy versions of this helper, possibly within a single
// process), `fn` can be invoked more than once for the same name.
// Always use `IF NOT EXISTS` / `INSERT ... ON CONFLICT DO NOTHING` /
// equivalent constructs. Never rely on "this only ever runs once".
// - `fn` SHOULD respect ctx cancellation. The ctx passed in is the
// ctx passed to RunAsyncMigration; if the caller cancels it (e.g.
// graceful shutdown), `fn` is expected to return `ctx.Err()` promptly.
// When `fn` returns a context cancellation error the row is marked
// `failed` so a future boot will retry.
// - `fn` will hold the single SQLite write connection for its entire
// duration. The ingestor opens SQLite with `SetMaxOpenConns(1)`,
// which means any `fn` that issues a write blocks ALL live ingest
// writes until it yields. For multi-minute work, use chunked /
// batched patterns with `time.Sleep` between batches (see
// `BackfillPathJSONAsync` for the canonical pattern).
//
// Contract (pinned by async_migration_test.go):
// - status is `pending_async` IMMEDIATELY after this returns.
// - fn runs in a goroutine; on success status becomes `done`, on error
// or panic status becomes `failed` and the error is recorded.
// - Idempotent at the call site: if a row with the same name already
// exists in `done` state, fn is NOT re-run. If a goroutine for this
// name is already in flight IN THIS PROCESS, fn is NOT re-run
// (the in-process guard short-circuits). If in `failed` or
// `pending_async` (from a crashed prior boot) state, fn IS
// re-scheduled.
// - Concurrency-safe: concurrent calls with the same name execute fn
// AT MOST ONCE. See TestRunAsyncMigration_SameNameConcurrent_FnRunsOnce.
// - The Store's shared backfill WaitGroup tracks the goroutine so
// tests/shutdown can wait via Store.WaitForAsyncMigrations().
func (s *Store) RunAsyncMigration(ctx context.Context, name string, fn func(context.Context, *sql.DB) error) error {
if s == nil || s.db == nil {
return ErrNilStoreDB
}
if err := ensureAsyncMigrationsTable(s.db); err != nil {
return fmt.Errorf("ensure _async_migrations: %w", err)
}
// Atomic INSERT-or-fetch via SQLite's ON CONFLICT ... DO UPDATE ...
// RETURNING. SQLite returns the post-statement row state, so:
// - On INSERT: returns 'pending_async' (the just-inserted value).
// - On CONFLICT: the DO UPDATE is a no-op (SET status=status),
// and RETURNING gives the EXISTING status.
// This collapses the previous SELECT-then-INSERT/UPDATE into a
// single statement, eliminating the TOCTOU race where two
// concurrent callers both saw ErrNoRows and one hit a UNIQUE
// constraint failure on the second INSERT.
var status string
err := s.db.QueryRowContext(ctx, `
INSERT INTO _async_migrations (name, status) VALUES (?, 'pending_async')
ON CONFLICT(name) DO UPDATE SET status = _async_migrations.status
RETURNING status`,
name).Scan(&status)
if err != nil {
return fmt.Errorf("register async migration %q: %w", name, err)
}
if status == "done" {
return nil // already complete, nothing to do
}
// In-process guard: prevent re-entry when a goroutine for this name
// is already executing fn in THIS process. Without this guard, two
// concurrent callers both see `pending_async` (one inserted it, one
// hit ON CONFLICT) and both interpret it as "previous run crashed,
// retry" — running fn twice in parallel.
//
// LoadOrStore is atomic; the loser of the race takes the early-return
// path. The winner clears the guard with `defer` in the goroutine,
// guaranteeing no leak even on panic.
if _, loaded := inflightAsyncMigrations.LoadOrStore(name, struct{}{}); loaded {
// Another goroutine in this process is already running fn for
// this name. Don't launch a duplicate.
return nil
}
// We own the in-flight slot. From here on every exit path MUST release
// it, otherwise the orphaned map entry would lock out all future
// retries in this process: the goroutine clears the slot via defer,
// and the early error return below deletes it explicitly.
//
// Reset the row to a fresh pending_async (clear ended_at/error from
// any prior failed run). Safe to do AFTER the in-process guard
// because we now hold exclusive run rights for this name.
if _, err := s.db.ExecContext(ctx, `
UPDATE _async_migrations
SET status = 'pending_async', started_at = datetime('now'), ended_at = NULL, error = NULL
WHERE name = ?`, name); err != nil {
// Release the guard and surface the error — we never launched fn.
inflightAsyncMigrations.Delete(name)
return fmt.Errorf("reset async migration %q: %w", name, err)
}
s.backfillWg.Add(1)
go func() {
defer s.backfillWg.Done()
defer inflightAsyncMigrations.Delete(name)
var runErr error
defer func() {
if r := recover(); r != nil {
runErr = fmt.Errorf("panic: %v", r)
log.Printf("[async-migration] %q panic recovered: %v", name, r)
}
if runErr != nil {
recordTerminalStatus(s.db, name, "failed", runErr.Error())
log.Printf("[async-migration] %q FAILED: %v", name, runErr)
return
}
recordTerminalStatus(s.db, name, "done", "")
log.Printf("[async-migration] %q done", name)
}()
log.Printf("[async-migration] %q starting (boot continues)", name)
runErr = fn(ctx, s.db)
}()
return nil
}
// recordTerminalStatus writes the final status (`done` | `failed`) for an
// async migration, retrying with exponential-ish backoff on transient
// errors (disk full briefly, db locked beyond busy_timeout). If all retries
// fail it logs a CRITICAL message so the row's stuck state is visible in
// ops dashboards / log aggregation — otherwise a row stuck in
// `pending_async` causes the migration to re-run on every subsequent boot,
// silently.
func recordTerminalStatus(db *sql.DB, name, status, errMsg string) {
backoffs := []time.Duration{100 * time.Millisecond, 500 * time.Millisecond, 2 * time.Second}
var lastErr error
for attempt := 0; attempt <= len(backoffs); attempt++ {
var err error
if status == "done" {
_, err = db.Exec(`
UPDATE _async_migrations
SET status = 'done', ended_at = datetime('now'), error = NULL
WHERE name = ?`, name)
} else {
_, err = db.Exec(`
UPDATE _async_migrations
SET status = ?, ended_at = datetime('now'), error = ?
WHERE name = ?`, status, errMsg, name)
}
if err == nil {
if attempt > 0 {
log.Printf("[async-migration] recorded terminal status %q for %q on retry %d", status, name, attempt)
}
return
}
lastErr = err
if attempt < len(backoffs) {
log.Printf("[async-migration] transient error recording terminal status %q for %q (attempt %d/%d): %v", status, name, attempt+1, len(backoffs)+1, err)
time.Sleep(backoffs[attempt])
}
}
log.Printf("[async-migration] CRITICAL: cannot record terminal status %q for %q after %d attempts, manual intervention required (last error: %v)", status, name, len(backoffs)+1, lastErr)
}
// AsyncMigrationStatus returns the current status of an async migration
// (one of "pending_async", "done", "failed"), sql.ErrNoRows if no such
// migration has been registered, or ErrNilStoreDB if the store has no db.
func (s *Store) AsyncMigrationStatus(name string) (string, error) {
if s == nil || s.db == nil {
return "", ErrNilStoreDB
}
if err := ensureAsyncMigrationsTable(s.db); err != nil {
return "", err
}
var status string
err := s.db.QueryRow(`SELECT status FROM _async_migrations WHERE name = ?`, name).Scan(&status)
return status, err
}
// WaitForAsyncMigrations blocks until all currently-scheduled async migrations
// finish. Intended for tests + graceful shutdown; production boot path does NOT
// call this (that's the whole point).
func (s *Store) WaitForAsyncMigrations() {
s.backfillWg.Wait()
}
+474
@@ -0,0 +1,474 @@
package main
import (
"context"
"database/sql"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
// waitForStatus polls AsyncMigrationStatus until it matches `want` or `deadline` passes.
func waitForStatus(t *testing.T, s *Store, name, want string, timeout time.Duration) string {
t.Helper()
deadline := time.Now().Add(timeout)
var status string
var err error
for time.Now().Before(deadline) {
status, err = s.AsyncMigrationStatus(name)
if err == nil && status == want {
return status
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("status never reached %q within %s: got %q (err=%v)", want, timeout, status, err)
return status
}
// TestRunAsyncMigration_PendingThenDone pins the contract for RunAsyncMigration:
//
// 1. After calling, the migration name MUST be queryable in the
// `_async_migrations` table with status `pending_async` IMMEDIATELY (no
// waiting for fn).
// 2. After fn returns, the status MUST transition to `done`.
// 3. RunAsyncMigration MUST return without blocking on fn.
//
// This is the regression test for the recurring "sync migration on large
// table blocks ingestor startup" class (#791, #1483, ...). If this test
// fails the contract is broken — do not relax it; fix the runner.
func TestRunAsyncMigration_PendingThenDone(t *testing.T) {
s := newTestStore(t)
ctx := context.Background()
started := make(chan struct{})
release := make(chan struct{})
const name = "test_async_migration_v1"
// Wrap the call in a goroutine + select so that a SYNCHRONOUS
// implementation (one that blocks on fn before returning) would
// deadlock or hang — proving non-blocking behaviour rather than
// just "fn started concurrently". A sync impl would never deliver
// to resultCh because fn blocks on `<-release` which we haven't
// closed yet.
resultCh := make(chan error, 1)
go func() {
resultCh <- s.RunAsyncMigration(ctx, name, func(ctx context.Context, db *sql.DB) error {
close(started)
<-release
return nil
})
}()
select {
case err := <-resultCh:
if err != nil {
t.Fatalf("RunAsyncMigration returned error: %v", err)
}
// Returned successfully without blocking — that's the contract.
case <-time.After(2 * time.Second):
t.Fatal("RunAsyncMigration did not return within 2s — implementation appears to block on fn (sync regression)")
}
// Wait for the goroutine to actually start before checking status; this
// proves RunAsyncMigration did not block on fn and that fn is running
// concurrently.
select {
case <-started:
case <-time.After(2 * time.Second):
t.Fatal("async migration fn did not start within 2s — RunAsyncMigration may never have scheduled")
}
status, err := s.AsyncMigrationStatus(name)
if err != nil {
t.Fatalf("AsyncMigrationStatus while running: %v", err)
}
if status != "pending_async" {
t.Fatalf("status while fn running: got %q, want %q", status, "pending_async")
}
close(release)
// Poll for transition to done; waitForStatus fails the test on timeout.
waitForStatus(t, s, name, "done", 2*time.Second)
}
// TestRunAsyncMigration_PanicCapture proves that a panic inside fn does NOT
// leak past the recover, AND that the migration row transitions to
// "failed" with the panic message captured — NOT silently to "done".
// Operator visibility into mid-migration crashes is the whole point.
func TestRunAsyncMigration_PanicCapture(t *testing.T) {
s := newTestStore(t)
const name = "test_panic_capture_v1"
if err := s.RunAsyncMigration(context.Background(), name,
func(ctx context.Context, db *sql.DB) error {
panic("synthetic boom")
}); err != nil {
t.Fatalf("RunAsyncMigration returned error: %v", err)
}
s.WaitForAsyncMigrations()
status, err := s.AsyncMigrationStatus(name)
if err != nil {
t.Fatalf("status lookup: %v", err)
}
if status != "failed" {
t.Fatalf("status after panic: got %q, want %q (silent-done would be catastrophic)", status, "failed")
}
var errMsg sql.NullString
if err := s.db.QueryRow(`SELECT error FROM _async_migrations WHERE name = ?`, name).Scan(&errMsg); err != nil {
t.Fatalf("error column lookup: %v", err)
}
if !errMsg.Valid || errMsg.String == "" {
t.Fatalf("error column empty after panic — operator has no clue what failed")
}
}
// TestRunAsyncMigration_IdempotentSecondCallNoOps verifies that calling
// RunAsyncMigration a second time with the same name AFTER it has reached
// "done" status does NOT re-run fn. This protects the prod path: ingestor
// restarts must not rebuild already-built indexes.
func TestRunAsyncMigration_IdempotentSecondCallNoOps(t *testing.T) {
s := newTestStore(t)
const name = "test_idempotent_v1"
var calls int32
fn := func(ctx context.Context, db *sql.DB) error {
atomic.AddInt32(&calls, 1)
return nil
}
if err := s.RunAsyncMigration(context.Background(), name, fn); err != nil {
t.Fatalf("first call: %v", err)
}
s.WaitForAsyncMigrations()
waitForStatus(t, s, name, "done", 2*time.Second)
// Second call must short-circuit; fn must not be invoked again.
if err := s.RunAsyncMigration(context.Background(), name, fn); err != nil {
t.Fatalf("second call: %v", err)
}
s.WaitForAsyncMigrations()
if got := atomic.LoadInt32(&calls); got != 1 {
t.Fatalf("fn invoked %d times, want 1 (done-state row must short-circuit)", got)
}
}
// TestRunAsyncMigration_RestartSafetyFailedIsRetried simulates a crashed
// previous run: a row exists in `failed` state from a prior boot. The next
// RunAsyncMigration call MUST re-schedule fn (reset to pending_async, then
// run it), not leave the migration stuck in `failed` forever.
func TestRunAsyncMigration_RestartSafetyFailedIsRetried(t *testing.T) {
s := newTestStore(t)
const name = "test_restart_failed_v1"
if err := ensureAsyncMigrationsTable(s.db); err != nil {
t.Fatalf("ensure table: %v", err)
}
if _, err := s.db.Exec(`INSERT INTO _async_migrations (name, status, error) VALUES (?, 'failed', 'simulated prior crash')`, name); err != nil {
t.Fatalf("seed failed row: %v", err)
}
var calls int32
if err := s.RunAsyncMigration(context.Background(), name,
func(ctx context.Context, db *sql.DB) error {
atomic.AddInt32(&calls, 1)
return nil
}); err != nil {
t.Fatalf("RunAsyncMigration on failed row: %v", err)
}
s.WaitForAsyncMigrations()
waitForStatus(t, s, name, "done", 2*time.Second)
if got := atomic.LoadInt32(&calls); got != 1 {
t.Fatalf("fn invoked %d times, want 1 (failed-state row must be retried)", got)
}
// And the error column must be cleared on success.
var errCol sql.NullString
if err := s.db.QueryRow(`SELECT error FROM _async_migrations WHERE name = ?`, name).Scan(&errCol); err != nil {
t.Fatalf("error col: %v", err)
}
if errCol.Valid && errCol.String != "" {
t.Fatalf("error column not cleared on retry success: %q", errCol.String)
}
}
// TestRunAsyncMigration_RestartSafetyPendingIsRetried simulates the
// ingestor crashing while a migration was still in `pending_async` (the
// goroutine never finished). On next boot the migration MUST be re-picked-up
// — leaving it stuck in pending forever would be a silent prod outage.
func TestRunAsyncMigration_RestartSafetyPendingIsRetried(t *testing.T) {
s := newTestStore(t)
const name = "test_restart_pending_v1"
if err := ensureAsyncMigrationsTable(s.db); err != nil {
t.Fatalf("ensure table: %v", err)
}
if _, err := s.db.Exec(`INSERT INTO _async_migrations (name, status) VALUES (?, 'pending_async')`, name); err != nil {
t.Fatalf("seed pending row: %v", err)
}
var calls int32
if err := s.RunAsyncMigration(context.Background(), name,
func(ctx context.Context, db *sql.DB) error {
atomic.AddInt32(&calls, 1)
return nil
}); err != nil {
t.Fatalf("RunAsyncMigration on pending row: %v", err)
}
s.WaitForAsyncMigrations()
waitForStatus(t, s, name, "done", 2*time.Second)
if got := atomic.LoadInt32(&calls); got != 1 {
t.Fatalf("fn invoked %d times, want 1 (pending row must be retried after crash)", got)
}
}
// TestRunAsyncMigration_FnErrorRecorded covers the non-panic failure path:
// fn returns an error → status MUST be "failed" with the error captured.
func TestRunAsyncMigration_FnErrorRecorded(t *testing.T) {
s := newTestStore(t)
const name = "test_fn_error_v1"
if err := s.RunAsyncMigration(context.Background(), name,
func(ctx context.Context, db *sql.DB) error {
return fmt.Errorf("simulated migration error")
}); err != nil {
t.Fatalf("RunAsyncMigration: %v", err)
}
s.WaitForAsyncMigrations()
status, err := s.AsyncMigrationStatus(name)
if err != nil {
t.Fatalf("status: %v", err)
}
if status != "failed" {
t.Fatalf("status: got %q, want failed", status)
}
var errCol sql.NullString
if err := s.db.QueryRow(`SELECT error FROM _async_migrations WHERE name = ?`, name).Scan(&errCol); err != nil {
t.Fatalf("error col: %v", err)
}
if !errCol.Valid || errCol.String == "" {
t.Fatalf("error column empty after fn error")
}
}
// TestRunAsyncMigration_SameNameConcurrent_FnRunsOnce validates the
// single-process concurrency invariant: many goroutines calling
// RunAsyncMigration(name=X) at the same instant must NEVER execute fn more
// than once, AND every caller must receive a nil error (none should hit
// the "UNIQUE constraint failed" race that the previous SELECT-then-INSERT
// implementation was vulnerable to).
//
// The fix relies on:
// - atomic INSERT ... ON CONFLICT DO UPDATE RETURNING (no SELECT-then-INSERT TOCTOU)
// - sync.Map in-process guard (no double-launch on shared pending_async)
//
// Previously named TestRunAsyncMigration_ConcurrentSameNameSerialized; the
// old test was tautological (it asserted calls ∈ [1..5], satisfiable by
// any broken impl).
func TestRunAsyncMigration_SameNameConcurrent_FnRunsOnce(t *testing.T) {
s := newTestStore(t)
const name = "test_concurrent_serialize_v1"
const callers = 5
var calls int32
fn := func(ctx context.Context, db *sql.DB) error {
atomic.AddInt32(&calls, 1)
// Hold fn open long enough that the other goroutines almost certainly
// observe the in-flight state and return from RunAsyncMigration
// before fn completes. Without this, the in-process guard cleanup
// could race with late entrants.
time.Sleep(50 * time.Millisecond)
return nil
}
// Start-barrier so all goroutines wake at the same instant — maximizes
// the chance of triggering any latent race.
start := make(chan struct{})
errs := make(chan error, callers)
var wg sync.WaitGroup
for i := 0; i < callers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-start
errs <- s.RunAsyncMigration(context.Background(), name, fn)
}()
}
close(start)
wg.Wait()
close(errs)
// Capture errors — every caller MUST get nil. Old impl was prone to
// "UNIQUE constraint failed: _async_migrations.name" on the loser of
// the SELECT-then-INSERT race.
for err := range errs {
if err != nil {
t.Fatalf("RunAsyncMigration returned error from concurrent caller: %v", err)
}
}
s.WaitForAsyncMigrations()
waitForStatus(t, s, name, "done", 2*time.Second)
if got := atomic.LoadInt32(&calls); got != 1 {
t.Fatalf("fn invoked %d times, want exactly 1 (concurrent same-name calls must serialize to one execution)", got)
}
}
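The in-process half of the dedup described above (the sync.Map guard) can be sketched without a database. This is an illustrative sketch under assumptions: the `guard` type, `launch`, and `countLaunches` are hypothetical names, and the atomic INSERT ... ON CONFLICT layer is omitted. Here fn is held open with a channel rather than a sleep, so the outcome is deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// guard (hypothetical) deduplicates concurrent launches by name within
// a single process.
type guard struct {
	inFlight sync.Map // name -> struct{}
	wg       sync.WaitGroup
}

// launch starts fn in the background unless name is already in flight.
// It reports whether this call was the one that started fn.
func (g *guard) launch(name string, fn func()) bool {
	if _, loaded := g.inFlight.LoadOrStore(name, struct{}{}); loaded {
		return false // another caller already owns this name
	}
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		defer g.inFlight.Delete(name)
		fn()
	}()
	return true
}

// countLaunches fires `callers` goroutines at the same instant and returns
// how many of them started fn and how many times fn actually ran.
func countLaunches(callers int) (launched, calls int32) {
	var g guard
	release := make(chan struct{})
	fn := func() {
		atomic.AddInt32(&calls, 1)
		<-release // stay in flight until every caller has returned
	}
	start := make(chan struct{}) // start barrier
	var callersWG sync.WaitGroup
	for i := 0; i < callers; i++ {
		callersWG.Add(1)
		go func() {
			defer callersWG.Done()
			<-start
			if g.launch("example_migration_v1", fn) {
				atomic.AddInt32(&launched, 1)
			}
		}()
	}
	close(start)
	callersWG.Wait()
	close(release)
	g.wg.Wait()
	return launched, calls
}

func main() {
	launched, calls := countLaunches(5)
	fmt.Println(launched, calls) // prints: 1 1
}
```

Note that the guard alone is not sufficient: once fn finishes and the key is deleted, a late caller could launch again. In the design described by the test comment, the persisted row status is what closes that gap.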
// TestRunAsyncMigration_NilDBReturnsError ensures that calling
// RunAsyncMigration on a *Store with a nil db handle returns a clear
// sentinel error rather than panicking. This is a programmer-error guard:
// the helper is concurrency-critical and a nil-deref panic would crash
// the ingestor in a confusing way.
func TestRunAsyncMigration_NilDBReturnsError(t *testing.T) {
var s Store // zero value, db is nil
err := s.RunAsyncMigration(context.Background(), "nil_db_test",
func(ctx context.Context, db *sql.DB) error { return nil })
if err == nil {
t.Fatal("RunAsyncMigration on nil db: got nil error, want ErrNilStoreDB")
}
if err != ErrNilStoreDB {
t.Fatalf("RunAsyncMigration on nil db: got %v, want ErrNilStoreDB", err)
}
// Same for AsyncMigrationStatus — must not panic.
if _, err := s.AsyncMigrationStatus("nil_db_test"); err != ErrNilStoreDB {
t.Fatalf("AsyncMigrationStatus on nil db: got %v, want ErrNilStoreDB", err)
}
}
// TestRunAsyncMigration_CtxCancelRecorded verifies that when ctx is
// cancelled and fn returns ctx.Err(), the migration is recorded as
// `failed` with "context canceled" in the error column. This is the
// graceful-shutdown contract: a cancelled migration should NOT be marked
// done (it didn't finish), but the failure mode must be observable.
func TestRunAsyncMigration_CtxCancelRecorded(t *testing.T) {
s := newTestStore(t)
const name = "test_ctx_cancel_v1"
ctx, cancel := context.WithCancel(context.Background())
started := make(chan struct{})
if err := s.RunAsyncMigration(ctx, name, func(ctx context.Context, db *sql.DB) error {
close(started)
<-ctx.Done()
return ctx.Err()
}); err != nil {
t.Fatalf("RunAsyncMigration: %v", err)
}
<-started
cancel()
s.WaitForAsyncMigrations()
status, err := s.AsyncMigrationStatus(name)
if err != nil {
t.Fatalf("status: %v", err)
}
if status != "failed" {
t.Fatalf("status after ctx cancel: got %q, want failed", status)
}
var errCol sql.NullString
if err := s.db.QueryRow(`SELECT error FROM _async_migrations WHERE name = ?`, name).Scan(&errCol); err != nil {
t.Fatalf("error col: %v", err)
}
if !errCol.Valid || !strings.Contains(errCol.String, "context canceled") {
t.Fatalf("error column should contain %q, got %q", "context canceled", errCol.String)
}
}
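The assertion on the literal "context canceled" relies on the text of the standard library's `context.Canceled` error. A minimal sketch of the same cancel sequence, with the database write replaced by a returned pair (`cancelOutcome` is a hypothetical name used only for illustration):

```go
package main

import (
	"context"
	"fmt"
)

// cancelOutcome (hypothetical) mirrors the test's sequence: start fn, wait
// until it is running, cancel the context, then classify the result.
func cancelOutcome() (status, errText string) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	started := make(chan struct{})
	done := make(chan error, 1)
	go func() {
		close(started)
		<-ctx.Done()
		done <- ctx.Err() // context.Canceled after cancel()
	}()

	<-started
	cancel()
	if err := <-done; err != nil {
		return "failed", err.Error()
	}
	return "done", ""
}

func main() {
	status, errText := cancelOutcome()
	fmt.Println(status, errText) // prints: failed context canceled
}
```

Waiting on `started` before calling `cancel` matters: it ensures fn is genuinely in progress when the cancellation arrives, which is the shutdown scenario the test is meant to cover.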
// TestRunAsyncMigration_CrossProcessSerialized verifies the cross-process
// concurrency claim made in MIGRATIONS.md: two distinct *sql.DB handles
// (simulating two ingestor instances) opened against the same file,
// concurrently calling RunAsyncMigration(name=X), must NEVER execute fn
// more than once total. SQLite serializes writes via its file-level lock,
// and the atomic INSERT ON CONFLICT pattern means the loser sees the
// winner's `pending_async` row and short-circuits.
//
// NOTE: the in-process sync.Map guard does NOT cross processes; the
// cross-process safety comes solely from SQLite's write serialization +
// the atomic ON CONFLICT. This test pins that property.
func TestRunAsyncMigration_CrossProcessSerialized(t *testing.T) {
// Two stores against the same DB file simulate two processes.
dir := t.TempDir()
dbPath := dir + "/cross.db"
s1, err := OpenStore(dbPath)
if err != nil {
t.Fatalf("open s1: %v", err)
}
defer s1.Close()
s2, err := OpenStore(dbPath)
if err != nil {
t.Fatalf("open s2: %v", err)
}
defer s2.Close()
// Sanity: distinct *sql.DB handles.
if s1.db == s2.db {
t.Fatal("expected distinct *sql.DB handles")
}
const name = "test_cross_process_v1"
var calls int32
fn := func(ctx context.Context, db *sql.DB) error {
atomic.AddInt32(&calls, 1)
time.Sleep(50 * time.Millisecond)
return nil
}
start := make(chan struct{})
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
<-start
if err := s1.RunAsyncMigration(context.Background(), name, fn); err != nil {
t.Errorf("s1.RunAsyncMigration: %v", err)
}
}()
go func() {
defer wg.Done()
<-start
if err := s2.RunAsyncMigration(context.Background(), name, fn); err != nil {
t.Errorf("s2.RunAsyncMigration: %v", err)
}
}()
close(start)
wg.Wait()
s1.WaitForAsyncMigrations()
s2.WaitForAsyncMigrations()
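// Both stores must report status == "done"; SQLite write serialization
// guarantees the second INSERT sees the first's row.
st1, err := s1.AsyncMigrationStatus(name)
if err != nil {
t.Fatalf("s1 status: %v", err)
}
st2, err := s2.AsyncMigrationStatus(name)
if err != nil {
t.Fatalf("s2 status: %v", err)
}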
if st1 != "done" || st2 != "done" {
t.Fatalf("expected done/done, got s1=%q s2=%q", st1, st2)
}
if got := atomic.LoadInt32(&calls); got != 1 {
t.Fatalf("fn invoked %d times across two processes, want exactly 1 (SQLite write serialization + ON CONFLICT must dedupe)", got)
}
}
+28 -7
@@ -1,6 +1,7 @@
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
@@ -124,6 +125,27 @@ func OpenStoreWithInterval(dbPath string, sampleIntervalSec int) (*Store, error)
return nil, fmt.Errorf("preparing statements: %w", err)
}
// Schedule async migrations. These must NOT block boot. See
// async_migration.go for the convention.
// PREFLIGHT: async=true reason="composite index build on observations (1.9M+ rows in prod) — converted from sync after v3.8.3"
var idxDone int
if s.db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'obs_observer_ts_idx_v1'").Scan(&idxDone) != nil {
if err := s.RunAsyncMigration(context.Background(), "obs_observer_ts_idx_v1",
func(ctx context.Context, d *sql.DB) error {
log.Println("[migration/async] Building (observer_idx, timestamp) composite index on observations...")
if _, err := d.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS idx_observations_observer_idx_timestamp ON observations(observer_idx, timestamp)`); err != nil {
return err
}
if _, err := d.ExecContext(ctx, `INSERT OR IGNORE INTO _migrations (name) VALUES ('obs_observer_ts_idx_v1')`); err != nil {
return err
}
log.Println("[migration/async] observations(observer_idx, timestamp) index created")
return nil
}); err != nil {
log.Printf("[migration/async] scheduling obs_observer_ts_idx_v1 failed: %v", err)
}
}
return s, nil
}
@@ -368,13 +390,12 @@ func applySchema(db *sql.DB) error {
// timestamp WHERE filter; a composite (observer_idx, timestamp)
// index lets SQLite resolve the grouping + range filter from the
// index alone instead of a 1.9M-row scan.
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'obs_observer_ts_idx_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding (observer_idx, timestamp) composite index on observations...")
db.Exec(`CREATE INDEX IF NOT EXISTS idx_observations_observer_idx_timestamp ON observations(observer_idx, timestamp)`)
db.Exec(`INSERT INTO _migrations (name) VALUES ('obs_observer_ts_idx_v1')`)
log.Println("[migration] observations(observer_idx, timestamp) index created")
}
//
// CONVERTED TO ASYNC (preflight-async-migration-gate). Scheduling
// happens in OpenStore() once the real *Store exists so the
// backfill WaitGroup is shared with the rest of the ingestor.
// The legacy `_migrations` gate is preserved by the async fn so
// DBs that already completed the sync build stay no-op.
// #1483: normalize nodes.public_key to lowercase. The server's
// GetNodeLocationsByKeys lookup dropped LOWER(public_key) for perf
@@ -0,0 +1,21 @@
// Fixture: migration block WITHOUT an async annotation and WITHOUT being
// wrapped in the async-migration helper. This file exists ONLY so that
// ~/.openclaw/skills/pr-preflight/scripts/check-async-migrations.sh
// has a known-bad sample to test against (the script is invoked with
// BASE pointing at master and FIXTURE_DIR pointing here).
//
// DO NOT add a PREFLIGHT annotation to this file. DO NOT wrap the
// migration via the async helper. The check script's correctness
// depends on this staying BAD.
//
// IMPORTANT: this file must NOT contain the literal identifier of the
// async-helper function anywhere (comments, strings, identifiers). The
// preflight gate greps a window of lines above the migration for that
// identifier as an "OK" signal, so mentioning it here would cause the
// gate to *pass* this fixture — defeating its purpose. Refer to the
// helper only obliquely as "the async-migration helper" in prose.
package fixtures
const _ = `
CREATE INDEX idx_observations_bad_sync_v1 ON observations(observer_idx, timestamp);
`
@@ -0,0 +1,9 @@
// Fixture: migration block WITH an async annotation. Companion to
// bad_sync_migration.go. The preflight check script must accept this
// because of the PREFLIGHT line directly above the migration.
package fixtures
// PREFLIGHT: async=true reason="fixture-only — ALTER ADD COLUMN is O(1) in sqlite"
const _ = `
ALTER TABLE observations ADD COLUMN annotated_good_fixture_col INTEGER DEFAULT 0;
`
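The two fixtures describe a gate that looks in a window of lines above a migration statement for an annotation. The real check is a shell script whose logic is not shown here, so the following Go sketch is only an assumption-laden illustration of that idea: the function name `flagged`, the window size, and the choice to match only `CREATE INDEX` are all hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// flagged (hypothetical) returns the 1-based positions of lines that contain
// a CREATE INDEX statement with no PREFLIGHT annotation in the `window`
// lines directly above them.
func flagged(src string, window int) []int {
	lines := strings.Split(src, "\n")
	var bad []int
	for i, line := range lines {
		if !strings.Contains(line, "CREATE INDEX") {
			continue
		}
		annotated := false
		for j := i - 1; j >= 0 && j >= i-window; j-- {
			if strings.Contains(lines[j], "PREFLIGHT: async=true") {
				annotated = true
				break
			}
		}
		if !annotated {
			bad = append(bad, i+1)
		}
	}
	return bad
}

func main() {
	bad := "const _ = `\nCREATE INDEX idx_bad ON t(a);\n`"
	good := "// PREFLIGHT: async=true reason=\"example\"\nconst _ = `\nCREATE INDEX idx_ok ON t(a);\n`"
	fmt.Println(len(flagged(bad, 3)), len(flagged(good, 3))) // prints: 1 0
}
```

A window-based text match of this kind explains why the bad fixture must avoid mentioning the accepted signal anywhere near the statement: any occurrence inside the window, even in a comment, would count as an annotation.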