mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-06-29 00:21:44 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| caab3ae57b | |||
| afc0ed10bf | |||
| 4976fb418d | |||
| afb860a5d2 | |||
| 38354f3283 | |||
| 2c6744ccea |
+168
@@ -0,0 +1,168 @@
|
||||
# MIGRATIONS — async vs sync policy
|
||||
|
||||
CoreScope's ingestor applies schema/data migrations inline at boot in
|
||||
`cmd/ingestor/db.go`. Every migration that runs synchronously blocks the
|
||||
ingestor from accepting packets until it returns. On a dev DB that's
|
||||
milliseconds; at prod scale (1.9M+ observations, 80K+ adverts, 2600+ nodes
|
||||
on Cascadia) it can pin the boot for minutes and trigger restart loops —
|
||||
the "upgrade broke prod" failure class (#791, #1483, and others).
|
||||
|
||||
## The rule
|
||||
|
||||
**Any new `CREATE INDEX`, `ALTER TABLE`, or data-rewriting `UPDATE`/`DELETE`
|
||||
in a migration file MUST do ONE of the following:**
|
||||
|
||||
### Option 1 — Run via `Store.RunAsyncMigration` (preferred for backfills)
|
||||
|
||||
```go
|
||||
// Scheduled in OpenStore() AFTER the *Store is constructed.
|
||||
if err := s.RunAsyncMigration(ctx, "my_migration_v1",
|
||||
func(ctx context.Context, db *sql.DB) error {
|
||||
_, err := db.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS ...`)
|
||||
return err
|
||||
}); err != nil {
|
||||
log.Printf("[migration/async] scheduling failed: %v", err)
|
||||
}
|
||||
```
|
||||
|
||||
- The migration is recorded as `pending_async` in the `_async_migrations`
|
||||
table **immediately** — the ingestor boots and starts ingesting.
|
||||
- `fn` runs in a goroutine; the WaitGroup is shared with the rest of the
|
||||
ingestor (`Store.WaitForAsyncMigrations()` waits for everything).
|
||||
- On success the row flips to `done`; on error/panic to `failed` with the
|
||||
error message captured.
|
||||
- Idempotent: rows in `done` state short-circuit; `failed`/`pending_async`
|
||||
rows are retried on the next boot.
|
||||
|
||||
Reference implementations: `Store.BackfillPathJSONAsync` (path_json
|
||||
backfill) and the converted `obs_observer_ts_idx_v1` index build in
|
||||
`OpenStore`.
|
||||
|
||||
### Option 2 — Annotate as preflight-cheap
|
||||
|
||||
Some migrations are genuinely cheap at any scale (e.g. `ALTER TABLE ADD
|
||||
COLUMN`, `CREATE INDEX` on a table you know is bounded to a few thousand
|
||||
rows). Annotate the migration block with a comment **on the line
|
||||
immediately above the migration block** so the preflight gate recognises
|
||||
the opt-out:
|
||||
|
||||
```go
|
||||
// PREFLIGHT: async=true reason="ALTER ADD COLUMN — O(1) sqlite operation"
|
||||
if r := db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'foo_v1'"); ...
|
||||
```
|
||||
|
||||
The reason MUST be a real one-line justification you can defend in
|
||||
review. "It's fine" is not a reason.
|
||||
|
||||
### Option 3 — Opt out per PR
|
||||
|
||||
If the migration is genuinely safe and you don't want to add an inline
|
||||
annotation, put a single line in the PR body:
|
||||
|
||||
```
|
||||
PREFLIGHT-MIGRATION-SCALE: <30s N=80K verified on Cascadia staging snapshot
|
||||
```
|
||||
|
||||
This must include both `<30s` and `N=<some scale>` so a reviewer can
|
||||
challenge the measurement.
|
||||
|
||||
## The gate
|
||||
|
||||
`~/.openclaw/skills/pr-preflight/scripts/check-async-migrations.sh` runs
|
||||
on every PR via the preflight orchestrator. It greps the diff for new or
|
||||
modified migration blocks (files matching `cmd/ingestor/db.go`,
|
||||
`cmd/ingestor/maintenance.go`, `internal/dbschema/**`, `**/migrations/**`,
|
||||
`**/*.sql`, plus any Go file touching `CREATE INDEX` / `ALTER TABLE` /
|
||||
`CREATE UNIQUE INDEX`). For each hit it requires one of the three
|
||||
opt-outs above. Hard-fail (exit 1) — no warning-only mode.
|
||||
|
||||
## Concurrency model
|
||||
|
||||
CoreScope runs **one ingestor process** per deployment (`cmd/ingestor/`,
|
||||
single binary, single `*Store`). There is no cluster mode, no leader
|
||||
election, no second writer. SQLite is opened with `SetMaxOpenConns(1)`
|
||||
and a 5s `busy_timeout`; all writes (live MQTT ingest + async migration
|
||||
goroutines + maintenance backfills) serialize through the one connection
|
||||
in a single process.
|
||||
|
||||
What this means for async migrations:
|
||||
|
||||
- **Within a single process**, concurrent `RunAsyncMigration(name=X)`
|
||||
callers are deduplicated by two layers:
|
||||
1. An atomic `INSERT ... ON CONFLICT(name) DO UPDATE ... RETURNING`
|
||||
replaces the previous SELECT-then-INSERT pattern, eliminating the
|
||||
TOCTOU race where two callers both saw `ErrNoRows` and the loser
|
||||
hit `UNIQUE constraint failed`.
|
||||
2. An in-process `sync.Map` guard tracks which names currently have a
|
||||
goroutine executing `fn` IN THIS PROCESS. A second caller that
|
||||
sees `pending_async` (e.g. because the first caller just inserted
|
||||
it) short-circuits via the in-flight map and does NOT launch a
|
||||
duplicate goroutine.
|
||||
See `TestRunAsyncMigration_SameNameConcurrent_FnRunsOnce` for the
|
||||
contract: 5 concurrent callers ⇒ fn runs exactly once, all 5 return
|
||||
`nil`.
|
||||
- **Across processes** (two ingestor instances against the same DB —
|
||||
not a supported deployment shape, but verified for defense-in-depth):
|
||||
SQLite's file-level write lock serializes the atomic INSERT. The
|
||||
losing process's `ON CONFLICT` sees the winner's `pending_async` row
|
||||
and short-circuits. The in-process `sync.Map` does NOT cross
|
||||
processes, so this property comes solely from SQLite's write
|
||||
serialization. See `TestRunAsyncMigration_CrossProcessSerialized` for
|
||||
the pinned contract.
|
||||
- **Terminal status durability.** The final `UPDATE` that flips the row
|
||||
to `done` / `failed` is retried with backoff (3 attempts, 100ms /
|
||||
500ms / 2s) to survive transient write contention. If all retries
|
||||
fail, a `[async-migration] CRITICAL: cannot record terminal status…`
|
||||
line is logged so the stuck row is visible in ops aggregation —
|
||||
otherwise the migration would silently re-run on every subsequent
|
||||
boot.
|
||||
- **WARNING: `fn` blocks ALL live ingest writes for its duration.**
|
||||
Because `MaxOpenConns=1`, a long `CREATE INDEX` / `UPDATE` /
|
||||
backfill inside `fn` holds the only write connection until it
|
||||
yields. Estimate impact before approving any migration that takes
|
||||
>5 seconds at production scale. For multi-minute work the canonical
|
||||
pattern is chunked / batched fn implementations with inter-batch
|
||||
`time.Sleep` so live writers can interleave (see
|
||||
`BackfillPathJSONAsync` for a reference implementation).
|
||||
- **`fn` may run more than once across boots.** Crashed-mid-fn rows
|
||||
are reset to `pending_async` and retried on next boot. `fn` MUST be
|
||||
idempotent (`CREATE INDEX IF NOT EXISTS`, `INSERT ... ON CONFLICT DO
|
||||
NOTHING`, etc.). See `RunAsyncMigration` godoc for the full caller
|
||||
contract.
|
||||
|
||||
## Scale budgets
|
||||
|
||||
Per-migration target: **<30s** at current prod scale (Cascadia: ~2,600
|
||||
nodes, ~80K observations; previous prod snapshot: ~1.9M observations).
|
||||
|
||||
Worked example (#1483, `obs_observer_ts_idx_v1`): composite index build
|
||||
on `observations(observer_idx, timestamp)`. At ~1.9M rows the sync build
|
||||
pinned ingestor boot for several minutes → restart loop. Converted to
|
||||
async via `RunAsyncMigration` in `OpenStore` so boot returns immediately
|
||||
and the index materializes in the background; the existing `_migrations`
|
||||
short-circuit at the top of the migration block ensures DBs that already
|
||||
completed the sync v3.8.3 build do NOT re-run it through the goroutine
|
||||
path on subsequent boots.
|
||||
|
||||
If you cannot meet the <30s budget, document the expected upper bound
|
||||
and operator runbook expectation (e.g. "index build expected ~10 min on
|
||||
a 5M-row table; ingestor remains responsive; monitor via
|
||||
`SELECT status, error FROM _async_migrations WHERE name = ...`").
|
||||
|
||||
## Why this exists
|
||||
|
||||
Pattern that keeps repeating:
|
||||
|
||||
1. Author writes `CREATE INDEX foo ON observations(...)` in a migration.
|
||||
2. Local dev DB has ~100 rows. Migration returns in 1ms. CI is green.
|
||||
3. Reviewer focuses on plan correctness, not scale.
|
||||
4. Ship.
|
||||
5. Prod boots, sqlite scans 1.9M rows, the ingestor sits at `[migration]
|
||||
Adding index...` for 8 minutes, healthcheck times out, container
|
||||
restarts, loops.
|
||||
6. Operator pages. Hotfix. Apology.
|
||||
|
||||
The gate doesn't try to detect table size (undecidable from a diff). It
|
||||
enforces **annotation discipline**: every author who adds a migration
|
||||
must consciously decide which bucket it falls into and write that down.
|
||||
That is the cheapest possible intervention that breaks the cycle.
|
||||
@@ -0,0 +1,263 @@
|
||||
// Async migration helper — runs schema/backfill work that may take minutes on
|
||||
// large prod tables WITHOUT blocking ingestor startup.
|
||||
//
|
||||
// MIGRATION ANNOTATION CONVENTION (read this before touching migrations):
|
||||
//
|
||||
// Sync schema/data migrations (CREATE INDEX, ALTER TABLE, UPDATE ... WHERE)
|
||||
// that run inline during OpenStore() block the ingestor from accepting
|
||||
// packets until they finish. On an empty dev DB they return in milliseconds;
|
||||
// at prod scale (1.9M+ observations, 80K+ adverts) they can pin the boot
|
||||
// for minutes and trigger restart loops. This regression class has bitten us
|
||||
// repeatedly (#791 resolved_path backfill, #1483 obs_observer_ts_idx_v1).
|
||||
//
|
||||
// ANY new CREATE INDEX / ALTER TABLE / data-rewrite migration MUST EITHER:
|
||||
// 1. Run via Store.RunAsyncMigration(...) below (preferred for backfills
|
||||
// and any work that may touch >1K rows). The migration is recorded as
|
||||
// `pending_async` immediately, returns to the caller (boot proceeds),
|
||||
// and completes in a goroutine. Status flips to `done` (or `failed`
|
||||
// with an error message) when fn returns.
|
||||
// 2. Carry the preflight annotation comment immediately above the
|
||||
// migration block, e.g.
|
||||
// // PREFLIGHT: async=true reason="<one-line justification>"
|
||||
// Use this for migrations that are genuinely cheap at any scale
|
||||
// (e.g. ALTER TABLE ADD COLUMN, CREATE INDEX on a known-bounded
|
||||
// table). The annotation is grepped by
|
||||
// ~/.openclaw/skills/pr-preflight/scripts/check-async-migrations.sh
|
||||
// — its absence on a touched migration block is a hard-fail gate.
|
||||
//
|
||||
// See MIGRATIONS.md in the repo root for the full policy and examples.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ErrNilStoreDB is returned by RunAsyncMigration when called on a *Store
|
||||
// whose db handle is nil. This is a programmer error (the Store was never
|
||||
// opened, or it was closed) but we surface it as a normal error rather than
|
||||
// panicking so a misconfigured caller can degrade gracefully.
|
||||
var ErrNilStoreDB = errors.New("async migration: store has nil db")
|
||||
|
||||
// inflightAsyncMigrations tracks which migration names currently have a
|
||||
// goroutine executing fn IN THIS PROCESS. The SQL row's `pending_async`
|
||||
// status alone is not sufficient — a row may be `pending_async` because
|
||||
// (a) the goroutine is actively running right now in this process, or
|
||||
// (b) a previous boot crashed mid-fn and left the row stuck.
|
||||
// Case (a) must NOT re-launch fn (would run twice in parallel and corrupt
|
||||
// state). Case (b) must re-launch fn (otherwise the migration is stuck
|
||||
// forever). The in-process guard is the discriminator: if a name is in the
|
||||
// map, case (a); if not, case (b).
|
||||
//
|
||||
// Cross-process serialization (a second ingestor instance against the same
|
||||
// DB) is handled by SQLite's write lock + the atomic INSERT ON CONFLICT
|
||||
// pattern in RunAsyncMigration — see TestRunAsyncMigration_CrossProcess.
|
||||
var inflightAsyncMigrations sync.Map // map[string]struct{}
|
||||
|
||||
// ensureAsyncMigrationsTable creates the bookkeeping table used by
|
||||
// RunAsyncMigration / AsyncMigrationStatus. Idempotent.
|
||||
func ensureAsyncMigrationsTable(db *sql.DB) error {
|
||||
_, err := db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS _async_migrations (
|
||||
name TEXT PRIMARY KEY,
|
||||
status TEXT NOT NULL, -- pending_async | done | failed
|
||||
started_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
ended_at TEXT,
|
||||
error TEXT
|
||||
)
|
||||
`)
|
||||
return err
|
||||
}
|
||||
|
||||
// RunAsyncMigration registers `name` as a pending async migration and
|
||||
// schedules `fn` to run in a background goroutine. It returns to the caller
|
||||
// immediately so the ingestor can keep booting.
|
||||
//
|
||||
// Caller's obligations on `fn`:
|
||||
//
|
||||
// - `fn` MUST be idempotent. Across crashes, restarts, and retries (and
|
||||
// in older buggy versions of this helper, possibly within a single
|
||||
// process), `fn` can be invoked more than once for the same name.
|
||||
// Always use `IF NOT EXISTS` / `INSERT ... ON CONFLICT DO NOTHING` /
|
||||
// equivalent constructs. Never rely on "this only ever runs once".
|
||||
// - `fn` SHOULD respect ctx cancellation. The ctx passed in is the
|
||||
// ctx passed to RunAsyncMigration; if the caller cancels it (e.g.
|
||||
// graceful shutdown), `fn` is expected to return `ctx.Err()` promptly.
|
||||
// When `fn` returns a context cancellation error the row is marked
|
||||
// `failed` so a future boot will retry.
|
||||
// - `fn` will hold the single SQLite write connection for its entire
|
||||
// duration. The ingestor opens SQLite with `SetMaxOpenConns(1)`,
|
||||
// which means any `fn` that issues a write blocks ALL live ingest
|
||||
// writes until it yields. For multi-minute work, use chunked /
|
||||
// batched patterns with `time.Sleep` between batches (see
|
||||
// `BackfillPathJSONAsync` for the canonical pattern).
|
||||
//
|
||||
// Contract (pinned by async_migration_test.go):
|
||||
// - status is `pending_async` IMMEDIATELY after this returns.
|
||||
// - fn runs in a goroutine; on success status becomes `done`, on error
|
||||
// or panic status becomes `failed` and the error is recorded.
|
||||
// - Idempotent at the call site: if a row with the same name already
|
||||
// exists in `done` state, fn is NOT re-run. If a goroutine for this
|
||||
// name is already in flight IN THIS PROCESS, fn is NOT re-run
|
||||
// (the in-process guard short-circuits). If in `failed` or
|
||||
// `pending_async` (from a crashed prior boot) state, fn IS
|
||||
// re-scheduled.
|
||||
// - Concurrency-safe: concurrent calls with the same name execute fn
|
||||
// AT MOST ONCE. See TestRunAsyncMigration_SameNameConcurrent_FnRunsOnce.
|
||||
// - The caller's WaitGroup tracks the goroutine so tests/shutdown can
|
||||
// wait via Store.WaitForAsyncMigrations().
|
||||
func (s *Store) RunAsyncMigration(ctx context.Context, name string, fn func(context.Context, *sql.DB) error) error {
|
||||
if s == nil || s.db == nil {
|
||||
return ErrNilStoreDB
|
||||
}
|
||||
if err := ensureAsyncMigrationsTable(s.db); err != nil {
|
||||
return fmt.Errorf("ensure _async_migrations: %w", err)
|
||||
}
|
||||
|
||||
// Atomic INSERT-or-fetch via SQLite's ON CONFLICT ... DO UPDATE ...
|
||||
// RETURNING. SQLite returns the post-statement row state, so:
|
||||
// - On INSERT: returns 'pending_async' (the just-inserted value).
|
||||
// - On CONFLICT: the DO UPDATE is a no-op (SET status=status),
|
||||
// and RETURNING gives the EXISTING status.
|
||||
// This collapses the previous SELECT-then-INSERT/UPDATE into a
|
||||
// single statement, eliminating the TOCTOU race where two
|
||||
// concurrent callers both saw ErrNoRows and one hit a UNIQUE
|
||||
// constraint failure on the second INSERT.
|
||||
var status string
|
||||
err := s.db.QueryRowContext(ctx, `
|
||||
INSERT INTO _async_migrations (name, status) VALUES (?, 'pending_async')
|
||||
ON CONFLICT(name) DO UPDATE SET status = _async_migrations.status
|
||||
RETURNING status`,
|
||||
name).Scan(&status)
|
||||
if err != nil {
|
||||
return fmt.Errorf("register async migration %q: %w", name, err)
|
||||
}
|
||||
|
||||
if status == "done" {
|
||||
return nil // already complete, nothing to do
|
||||
}
|
||||
|
||||
// In-process guard: prevent re-entry when a goroutine for this name
|
||||
// is already executing fn in THIS process. Without this guard, two
|
||||
// concurrent callers both see `pending_async` (one inserted it, one
|
||||
// hit ON CONFLICT) and both interpret it as "previous run crashed,
|
||||
// retry" — running fn twice in parallel.
|
||||
//
|
||||
// LoadOrStore is atomic; the loser of the race takes the early-return
|
||||
// path. The winner clears the guard with `defer` in the goroutine,
|
||||
// guaranteeing no leak even on panic.
|
||||
if _, loaded := inflightAsyncMigrations.LoadOrStore(name, struct{}{}); loaded {
|
||||
// Another goroutine in this process is already running fn for
|
||||
// this name. Don't launch a duplicate.
|
||||
return nil
|
||||
}
|
||||
|
||||
// We own the in-flight slot. From here on we MUST launch the
|
||||
// goroutine (which clears the slot via defer) so the slot doesn't
|
||||
// leak on an early error return. If the reset UPDATE fails we
|
||||
// still launch a goroutine that immediately exits with the error
|
||||
// recorded; otherwise an error here would orphan the inflight map
|
||||
// entry and lock out all future retries.
|
||||
|
||||
// Reset the row to a fresh pending_async (clear ended_at/error from
|
||||
// any prior failed run). Safe to do AFTER the in-process guard
|
||||
// because we now hold exclusive run rights for this name.
|
||||
if _, err := s.db.ExecContext(ctx, `
|
||||
UPDATE _async_migrations
|
||||
SET status = 'pending_async', started_at = datetime('now'), ended_at = NULL, error = NULL
|
||||
WHERE name = ?`, name); err != nil {
|
||||
// Release the guard and surface the error — we never launched fn.
|
||||
inflightAsyncMigrations.Delete(name)
|
||||
return fmt.Errorf("reset async migration %q: %w", name, err)
|
||||
}
|
||||
|
||||
s.backfillWg.Add(1)
|
||||
go func() {
|
||||
defer s.backfillWg.Done()
|
||||
defer inflightAsyncMigrations.Delete(name)
|
||||
var runErr error
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
runErr = fmt.Errorf("panic: %v", r)
|
||||
log.Printf("[async-migration] %q panic recovered: %v", name, r)
|
||||
}
|
||||
if runErr != nil {
|
||||
recordTerminalStatus(s.db, name, "failed", runErr.Error())
|
||||
log.Printf("[async-migration] %q FAILED: %v", name, runErr)
|
||||
return
|
||||
}
|
||||
recordTerminalStatus(s.db, name, "done", "")
|
||||
log.Printf("[async-migration] %q done", name)
|
||||
}()
|
||||
log.Printf("[async-migration] %q starting (boot continues)", name)
|
||||
runErr = fn(ctx, s.db)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// recordTerminalStatus writes the final status (`done` | `failed`) for an
|
||||
// async migration, retrying with exponential-ish backoff on transient
|
||||
// errors (disk full briefly, db locked beyond busy_timeout). If all retries
|
||||
// fail it logs a CRITICAL message so the row's stuck state is visible in
|
||||
// ops dashboards / log aggregation — otherwise a row stuck in
|
||||
// `pending_async` causes the migration to re-run on every subsequent boot,
|
||||
// silently.
|
||||
func recordTerminalStatus(db *sql.DB, name, status, errMsg string) {
|
||||
backoffs := []time.Duration{100 * time.Millisecond, 500 * time.Millisecond, 2 * time.Second}
|
||||
var lastErr error
|
||||
for attempt := 0; attempt <= len(backoffs); attempt++ {
|
||||
var err error
|
||||
if status == "done" {
|
||||
_, err = db.Exec(`
|
||||
UPDATE _async_migrations
|
||||
SET status = 'done', ended_at = datetime('now'), error = NULL
|
||||
WHERE name = ?`, name)
|
||||
} else {
|
||||
_, err = db.Exec(`
|
||||
UPDATE _async_migrations
|
||||
SET status = ?, ended_at = datetime('now'), error = ?
|
||||
WHERE name = ?`, status, errMsg, name)
|
||||
}
|
||||
if err == nil {
|
||||
if attempt > 0 {
|
||||
log.Printf("[async-migration] recorded terminal status %q for %q on retry %d", status, name, attempt)
|
||||
}
|
||||
return
|
||||
}
|
||||
lastErr = err
|
||||
if attempt < len(backoffs) {
|
||||
log.Printf("[async-migration] transient error recording terminal status %q for %q (attempt %d/%d): %v", status, name, attempt+1, len(backoffs)+1, err)
|
||||
time.Sleep(backoffs[attempt])
|
||||
}
|
||||
}
|
||||
log.Printf("[async-migration] CRITICAL: cannot record terminal status %q for %q after %d attempts, manual intervention required (last error: %v)", status, name, len(backoffs)+1, lastErr)
|
||||
}
|
||||
|
||||
// AsyncMigrationStatus returns the current status of an async migration
|
||||
// (one of "pending_async", "done", "failed") or sql.ErrNoRows if no such
|
||||
// migration has been registered.
|
||||
func (s *Store) AsyncMigrationStatus(name string) (string, error) {
|
||||
if s == nil || s.db == nil {
|
||||
return "", ErrNilStoreDB
|
||||
}
|
||||
if err := ensureAsyncMigrationsTable(s.db); err != nil {
|
||||
return "", err
|
||||
}
|
||||
var status string
|
||||
err := s.db.QueryRow(`SELECT status FROM _async_migrations WHERE name = ?`, name).Scan(&status)
|
||||
return status, err
|
||||
}
|
||||
|
||||
// WaitForAsyncMigrations blocks until all currently-scheduled async migrations
|
||||
// finish. Intended for tests + graceful shutdown; production boot path does NOT
|
||||
// call this (that's the whole point).
|
||||
func (s *Store) WaitForAsyncMigrations() {
|
||||
s.backfillWg.Wait()
|
||||
}
|
||||
@@ -0,0 +1,474 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// waitForStatus polls AsyncMigrationStatus until it matches `want` or `deadline` passes.
|
||||
func waitForStatus(t *testing.T, s *Store, name, want string, timeout time.Duration) string {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
var status string
|
||||
var err error
|
||||
for time.Now().Before(deadline) {
|
||||
status, err = s.AsyncMigrationStatus(name)
|
||||
if err == nil && status == want {
|
||||
return status
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("status never reached %q within %s: got %q (err=%v)", want, timeout, status, err)
|
||||
return status
|
||||
}
|
||||
|
||||
// TestRunAsyncMigration_PendingThenDone pins the contract for RunAsyncMigration:
|
||||
//
|
||||
// 1. After calling, the migration name MUST be queryable in the migrations
|
||||
// table with status `pending_async` IMMEDIATELY (no waiting for fn).
|
||||
// 2. After fn returns, the status MUST transition to `done`.
|
||||
// 3. RunAsyncMigration MUST return without blocking on fn.
|
||||
//
|
||||
// This is the regression test for the recurring "sync migration on large
|
||||
// table blocks ingestor startup" class (#791, #1483, ...). If this test
|
||||
// fails the contract is broken — do not relax it; fix the runner.
|
||||
func TestRunAsyncMigration_PendingThenDone(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
started := make(chan struct{})
|
||||
release := make(chan struct{})
|
||||
|
||||
const name = "test_async_migration_v1"
|
||||
// Wrap the call in a goroutine + select so that a SYNCHRONOUS
|
||||
// implementation (one that blocks on fn before returning) would
|
||||
// deadlock or hang — proving non-blocking behaviour rather than
|
||||
// just "fn started concurrently". A sync impl would never deliver
|
||||
// to resultCh because fn blocks on `<-release` which we haven't
|
||||
// closed yet.
|
||||
resultCh := make(chan error, 1)
|
||||
go func() {
|
||||
resultCh <- s.RunAsyncMigration(ctx, name, func(ctx context.Context, db *sql.DB) error {
|
||||
close(started)
|
||||
<-release
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-resultCh:
|
||||
if err != nil {
|
||||
t.Fatalf("RunAsyncMigration returned error: %v", err)
|
||||
}
|
||||
// Returned successfully without blocking — that's the contract.
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("RunAsyncMigration did not return within 2s — implementation appears to block on fn (sync regression)")
|
||||
}
|
||||
|
||||
// Wait for the goroutine to actually start before checking status; this
|
||||
// proves RunAsyncMigration did not block on fn and that fn is running
|
||||
// concurrently.
|
||||
select {
|
||||
case <-started:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("async migration fn did not start within 2s — RunAsyncMigration may never have scheduled")
|
||||
}
|
||||
|
||||
status, err := s.AsyncMigrationStatus(name)
|
||||
if err != nil {
|
||||
t.Fatalf("AsyncMigrationStatus while running: %v", err)
|
||||
}
|
||||
if status != "pending_async" {
|
||||
t.Fatalf("status while fn running: got %q, want %q", status, "pending_async")
|
||||
}
|
||||
|
||||
close(release)
|
||||
|
||||
// Poll for transition to done.
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
status, err = s.AsyncMigrationStatus(name)
|
||||
if err == nil && status == "done" {
|
||||
return
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("status never transitioned to done within 2s: got %q (err=%v)", status, err)
|
||||
}
|
||||
|
||||
// TestRunAsyncMigration_PanicCapture proves that a panic inside fn does NOT
|
||||
// leak past the recover, AND that the migration row transitions to
|
||||
// "failed" with the panic message captured — NOT silently to "done".
|
||||
// Operator visibility into mid-migration crashes is the whole point.
|
||||
func TestRunAsyncMigration_PanicCapture(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
const name = "test_panic_capture_v1"
|
||||
|
||||
if err := s.RunAsyncMigration(context.Background(), name,
|
||||
func(ctx context.Context, db *sql.DB) error {
|
||||
panic("synthetic boom")
|
||||
}); err != nil {
|
||||
t.Fatalf("RunAsyncMigration returned error: %v", err)
|
||||
}
|
||||
|
||||
s.WaitForAsyncMigrations()
|
||||
|
||||
status, err := s.AsyncMigrationStatus(name)
|
||||
if err != nil {
|
||||
t.Fatalf("status lookup: %v", err)
|
||||
}
|
||||
if status != "failed" {
|
||||
t.Fatalf("status after panic: got %q, want %q (silent-done would be catastrophic)", status, "failed")
|
||||
}
|
||||
|
||||
var errMsg sql.NullString
|
||||
if err := s.db.QueryRow(`SELECT error FROM _async_migrations WHERE name = ?`, name).Scan(&errMsg); err != nil {
|
||||
t.Fatalf("error column lookup: %v", err)
|
||||
}
|
||||
if !errMsg.Valid || errMsg.String == "" {
|
||||
t.Fatalf("error column empty after panic — operator has no clue what failed")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunAsyncMigration_IdempotentSecondCallNoOps verifies that calling
|
||||
// RunAsyncMigration a second time with the same name AFTER it has reached
|
||||
// "done" status does NOT re-run fn. This protects the prod path: ingestor
|
||||
// restarts must not rebuild already-built indexes.
|
||||
func TestRunAsyncMigration_IdempotentSecondCallNoOps(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
const name = "test_idempotent_v1"
|
||||
|
||||
var calls int32
|
||||
fn := func(ctx context.Context, db *sql.DB) error {
|
||||
atomic.AddInt32(&calls, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := s.RunAsyncMigration(context.Background(), name, fn); err != nil {
|
||||
t.Fatalf("first call: %v", err)
|
||||
}
|
||||
s.WaitForAsyncMigrations()
|
||||
waitForStatus(t, s, name, "done", 2*time.Second)
|
||||
|
||||
// Second call must short-circuit; fn must not be invoked again.
|
||||
if err := s.RunAsyncMigration(context.Background(), name, fn); err != nil {
|
||||
t.Fatalf("second call: %v", err)
|
||||
}
|
||||
s.WaitForAsyncMigrations()
|
||||
|
||||
if got := atomic.LoadInt32(&calls); got != 1 {
|
||||
t.Fatalf("fn invoked %d times, want 1 (done-state row must short-circuit)", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunAsyncMigration_RestartSafetyFailedIsRetried simulates a crashed
|
||||
// previous run: a row exists in `failed` state from a prior boot. The next
|
||||
// RunAsyncMigration call MUST re-schedule fn (reset to pending_async, then
|
||||
// run it), not leave the migration stuck in `failed` forever.
|
||||
func TestRunAsyncMigration_RestartSafetyFailedIsRetried(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
const name = "test_restart_failed_v1"
|
||||
|
||||
if err := ensureAsyncMigrationsTable(s.db); err != nil {
|
||||
t.Fatalf("ensure table: %v", err)
|
||||
}
|
||||
if _, err := s.db.Exec(`INSERT INTO _async_migrations (name, status, error) VALUES (?, 'failed', 'simulated prior crash')`, name); err != nil {
|
||||
t.Fatalf("seed failed row: %v", err)
|
||||
}
|
||||
|
||||
var calls int32
|
||||
if err := s.RunAsyncMigration(context.Background(), name,
|
||||
func(ctx context.Context, db *sql.DB) error {
|
||||
atomic.AddInt32(&calls, 1)
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("RunAsyncMigration on failed row: %v", err)
|
||||
}
|
||||
s.WaitForAsyncMigrations()
|
||||
waitForStatus(t, s, name, "done", 2*time.Second)
|
||||
|
||||
if got := atomic.LoadInt32(&calls); got != 1 {
|
||||
t.Fatalf("fn invoked %d times, want 1 (failed-state row must be retried)", got)
|
||||
}
|
||||
|
||||
// And the error column must be cleared on success.
|
||||
var errCol sql.NullString
|
||||
if err := s.db.QueryRow(`SELECT error FROM _async_migrations WHERE name = ?`, name).Scan(&errCol); err != nil {
|
||||
t.Fatalf("error col: %v", err)
|
||||
}
|
||||
if errCol.Valid && errCol.String != "" {
|
||||
t.Fatalf("error column not cleared on retry success: %q", errCol.String)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunAsyncMigration_RestartSafetyPendingIsRetried simulates the
|
||||
// ingestor crashing while a migration was still in `pending_async` (the
|
||||
// goroutine never finished). On next boot the migration MUST be re-picked-up
|
||||
// — leaving it stuck in pending forever would be a silent prod outage.
|
||||
func TestRunAsyncMigration_RestartSafetyPendingIsRetried(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
const name = "test_restart_pending_v1"
|
||||
|
||||
if err := ensureAsyncMigrationsTable(s.db); err != nil {
|
||||
t.Fatalf("ensure table: %v", err)
|
||||
}
|
||||
if _, err := s.db.Exec(`INSERT INTO _async_migrations (name, status) VALUES (?, 'pending_async')`, name); err != nil {
|
||||
t.Fatalf("seed pending row: %v", err)
|
||||
}
|
||||
|
||||
var calls int32
|
||||
if err := s.RunAsyncMigration(context.Background(), name,
|
||||
func(ctx context.Context, db *sql.DB) error {
|
||||
atomic.AddInt32(&calls, 1)
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("RunAsyncMigration on pending row: %v", err)
|
||||
}
|
||||
s.WaitForAsyncMigrations()
|
||||
waitForStatus(t, s, name, "done", 2*time.Second)
|
||||
|
||||
if got := atomic.LoadInt32(&calls); got != 1 {
|
||||
t.Fatalf("fn invoked %d times, want 1 (pending row must be retried after crash)", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunAsyncMigration_FnErrorRecorded covers the non-panic failure path:
|
||||
// fn returns an error → status MUST be "failed" with the error captured.
|
||||
func TestRunAsyncMigration_FnErrorRecorded(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
const name = "test_fn_error_v1"
|
||||
|
||||
if err := s.RunAsyncMigration(context.Background(), name,
|
||||
func(ctx context.Context, db *sql.DB) error {
|
||||
return fmt.Errorf("simulated migration error")
|
||||
}); err != nil {
|
||||
t.Fatalf("RunAsyncMigration: %v", err)
|
||||
}
|
||||
s.WaitForAsyncMigrations()
|
||||
|
||||
status, err := s.AsyncMigrationStatus(name)
|
||||
if err != nil {
|
||||
t.Fatalf("status: %v", err)
|
||||
}
|
||||
if status != "failed" {
|
||||
t.Fatalf("status: got %q, want failed", status)
|
||||
}
|
||||
|
||||
var errCol sql.NullString
|
||||
if err := s.db.QueryRow(`SELECT error FROM _async_migrations WHERE name = ?`, name).Scan(&errCol); err != nil {
|
||||
t.Fatalf("error col: %v", err)
|
||||
}
|
||||
if !errCol.Valid || errCol.String == "" {
|
||||
t.Fatalf("error column empty after fn error")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunAsyncMigration_SameNameConcurrent_FnRunsOnce validates the
|
||||
// single-process concurrency invariant: many goroutines calling
|
||||
// RunAsyncMigration(name=X) at the same instant must NEVER execute fn more
|
||||
// than once, AND every caller must receive a nil error (none should hit
|
||||
// the "UNIQUE constraint failed" race that the previous SELECT-then-INSERT
|
||||
// implementation was vulnerable to).
|
||||
//
|
||||
// The fix relies on:
|
||||
// - atomic INSERT ... ON CONFLICT DO UPDATE RETURNING (no SELECT-then-INSERT TOCTOU)
|
||||
// - sync.Map in-process guard (no double-launch on shared pending_async)
|
||||
//
|
||||
// Previously named TestRunAsyncMigration_ConcurrentSameNameSerialized; the
|
||||
// old test was tautological (it asserted calls ∈ [1..5], satisfiable by
|
||||
// any broken impl).
|
||||
func TestRunAsyncMigration_SameNameConcurrent_FnRunsOnce(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
const name = "test_concurrent_serialize_v1"
|
||||
const callers = 5
|
||||
|
||||
var calls int32
|
||||
fn := func(ctx context.Context, db *sql.DB) error {
|
||||
atomic.AddInt32(&calls, 1)
|
||||
// Hold long enough to guarantee all other goroutines have
|
||||
// observed the in-flight state and exited their RunAsyncMigration
|
||||
// returns. Without this the in-process guard cleanup could race
|
||||
// with late entrants.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start-barrier so all goroutines wake at the same instant — maximizes
|
||||
// the chance of triggering any latent race.
|
||||
start := make(chan struct{})
|
||||
errs := make(chan error, callers)
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < callers; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-start
|
||||
errs <- s.RunAsyncMigration(context.Background(), name, fn)
|
||||
}()
|
||||
}
|
||||
close(start)
|
||||
wg.Wait()
|
||||
close(errs)
|
||||
|
||||
// Capture errors — every caller MUST get nil. Old impl was prone to
|
||||
// "UNIQUE constraint failed: _async_migrations.name" on the loser of
|
||||
// the SELECT-then-INSERT race.
|
||||
for err := range errs {
|
||||
if err != nil {
|
||||
t.Fatalf("RunAsyncMigration returned error from concurrent caller: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
s.WaitForAsyncMigrations()
|
||||
waitForStatus(t, s, name, "done", 2*time.Second)
|
||||
|
||||
if got := atomic.LoadInt32(&calls); got != 1 {
|
||||
t.Fatalf("fn invoked %d times, want exactly 1 (concurrent same-name calls must serialize to one execution)", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunAsyncMigration_NilDBReturnsError ensures that calling
|
||||
// RunAsyncMigration on a *Store with a nil db handle returns a clear
|
||||
// sentinel error rather than panicking. This is a programmer-error guard:
|
||||
// the helper is concurrency-critical and a nil-deref panic would crash
|
||||
// the ingestor in a confusing way.
|
||||
func TestRunAsyncMigration_NilDBReturnsError(t *testing.T) {
|
||||
var s Store // zero value, db is nil
|
||||
err := s.RunAsyncMigration(context.Background(), "nil_db_test",
|
||||
func(ctx context.Context, db *sql.DB) error { return nil })
|
||||
if err == nil {
|
||||
t.Fatal("RunAsyncMigration on nil db: got nil error, want ErrNilStoreDB")
|
||||
}
|
||||
if err != ErrNilStoreDB {
|
||||
t.Fatalf("RunAsyncMigration on nil db: got %v, want ErrNilStoreDB", err)
|
||||
}
|
||||
|
||||
// Same for AsyncMigrationStatus — must not panic.
|
||||
if _, err := s.AsyncMigrationStatus("nil_db_test"); err != ErrNilStoreDB {
|
||||
t.Fatalf("AsyncMigrationStatus on nil db: got %v, want ErrNilStoreDB", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunAsyncMigration_CtxCancelRecorded verifies that when ctx is
|
||||
// cancelled and fn returns ctx.Err(), the migration is recorded as
|
||||
// `failed` with "context canceled" in the error column. This is the
|
||||
// graceful-shutdown contract: a cancelled migration should NOT be marked
|
||||
// done (it didn't finish), but the failure mode must be observable.
|
||||
func TestRunAsyncMigration_CtxCancelRecorded(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
const name = "test_ctx_cancel_v1"
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
started := make(chan struct{})
|
||||
|
||||
if err := s.RunAsyncMigration(ctx, name, func(ctx context.Context, db *sql.DB) error {
|
||||
close(started)
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}); err != nil {
|
||||
t.Fatalf("RunAsyncMigration: %v", err)
|
||||
}
|
||||
|
||||
<-started
|
||||
cancel()
|
||||
s.WaitForAsyncMigrations()
|
||||
|
||||
status, err := s.AsyncMigrationStatus(name)
|
||||
if err != nil {
|
||||
t.Fatalf("status: %v", err)
|
||||
}
|
||||
if status != "failed" {
|
||||
t.Fatalf("status after ctx cancel: got %q, want failed", status)
|
||||
}
|
||||
|
||||
var errCol sql.NullString
|
||||
if err := s.db.QueryRow(`SELECT error FROM _async_migrations WHERE name = ?`, name).Scan(&errCol); err != nil {
|
||||
t.Fatalf("error col: %v", err)
|
||||
}
|
||||
if !errCol.Valid || !strings.Contains(errCol.String, "context canceled") {
|
||||
t.Fatalf("error column should contain %q, got %q", "context canceled", errCol.String)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunAsyncMigration_CrossProcessSerialized verifies the cross-process
|
||||
// concurrency claim made in MIGRATIONS.md: two distinct *sql.DB handles
|
||||
// (simulating two ingestor instances) opened against the same file,
|
||||
// concurrently calling RunAsyncMigration(name=X), must NEVER execute fn
|
||||
// more than once total. SQLite serializes writes via its file-level lock,
|
||||
// and the atomic INSERT ON CONFLICT pattern means the loser sees the
|
||||
// winner's `pending_async` row and short-circuits.
|
||||
//
|
||||
// NOTE: the in-process sync.Map guard does NOT cross processes; the
|
||||
// cross-process safety comes solely from SQLite's write serialization +
|
||||
// the atomic ON CONFLICT. This test pins that property.
|
||||
func TestRunAsyncMigration_CrossProcessSerialized(t *testing.T) {
|
||||
// Two stores against the same DB file simulate two processes.
|
||||
dir := t.TempDir()
|
||||
dbPath := dir + "/cross.db"
|
||||
|
||||
s1, err := OpenStore(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("open s1: %v", err)
|
||||
}
|
||||
defer s1.Close()
|
||||
|
||||
s2, err := OpenStore(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("open s2: %v", err)
|
||||
}
|
||||
defer s2.Close()
|
||||
|
||||
// Sanity: distinct *sql.DB handles.
|
||||
if s1.db == s2.db {
|
||||
t.Fatal("expected distinct *sql.DB handles")
|
||||
}
|
||||
|
||||
const name = "test_cross_process_v1"
|
||||
var calls int32
|
||||
fn := func(ctx context.Context, db *sql.DB) error {
|
||||
atomic.AddInt32(&calls, 1)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
return nil
|
||||
}
|
||||
|
||||
start := make(chan struct{})
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-start
|
||||
if err := s1.RunAsyncMigration(context.Background(), name, fn); err != nil {
|
||||
t.Errorf("s1.RunAsyncMigration: %v", err)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-start
|
||||
if err := s2.RunAsyncMigration(context.Background(), name, fn); err != nil {
|
||||
t.Errorf("s2.RunAsyncMigration: %v", err)
|
||||
}
|
||||
}()
|
||||
close(start)
|
||||
wg.Wait()
|
||||
|
||||
s1.WaitForAsyncMigrations()
|
||||
s2.WaitForAsyncMigrations()
|
||||
|
||||
// Both stores should agree status==done; SQLite write serialization
|
||||
// guarantees the second INSERT sees the first's row.
|
||||
st1, _ := s1.AsyncMigrationStatus(name)
|
||||
st2, _ := s2.AsyncMigrationStatus(name)
|
||||
if st1 != "done" || st2 != "done" {
|
||||
t.Fatalf("expected done/done, got s1=%q s2=%q", st1, st2)
|
||||
}
|
||||
|
||||
if got := atomic.LoadInt32(&calls); got != 1 {
|
||||
t.Fatalf("fn invoked %d times across two processes, want exactly 1 (SQLite write serialization + ON CONFLICT must dedupe)", got)
|
||||
}
|
||||
}
|
||||
+28
-7
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@@ -124,6 +125,27 @@ func OpenStoreWithInterval(dbPath string, sampleIntervalSec int) (*Store, error)
|
||||
return nil, fmt.Errorf("preparing statements: %w", err)
|
||||
}
|
||||
|
||||
// Schedule async migrations. These must NOT block boot. See
|
||||
// async_migration.go for the convention.
|
||||
// PREFLIGHT: async=true reason="composite index build on observations (1.9M+ rows in prod) — converted from sync after v3.8.3"
|
||||
var idxDone int
|
||||
if s.db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'obs_observer_ts_idx_v1'").Scan(&idxDone) != nil {
|
||||
if err := s.RunAsyncMigration(context.Background(), "obs_observer_ts_idx_v1",
|
||||
func(ctx context.Context, d *sql.DB) error {
|
||||
log.Println("[migration/async] Building (observer_idx, timestamp) composite index on observations...")
|
||||
if _, err := d.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS idx_observations_observer_idx_timestamp ON observations(observer_idx, timestamp)`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := d.ExecContext(ctx, `INSERT OR IGNORE INTO _migrations (name) VALUES ('obs_observer_ts_idx_v1')`); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Println("[migration/async] observations(observer_idx, timestamp) index created")
|
||||
return nil
|
||||
}); err != nil {
|
||||
log.Printf("[migration/async] scheduling obs_observer_ts_idx_v1 failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
@@ -368,13 +390,12 @@ func applySchema(db *sql.DB) error {
|
||||
// timestamp WHERE filter; a composite (observer_idx, timestamp)
|
||||
// index lets SQLite resolve the grouping + range filter from the
|
||||
// index alone instead of a 1.9M-row scan.
|
||||
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'obs_observer_ts_idx_v1'")
|
||||
if row.Scan(&migDone) != nil {
|
||||
log.Println("[migration] Adding (observer_idx, timestamp) composite index on observations...")
|
||||
db.Exec(`CREATE INDEX IF NOT EXISTS idx_observations_observer_idx_timestamp ON observations(observer_idx, timestamp)`)
|
||||
db.Exec(`INSERT INTO _migrations (name) VALUES ('obs_observer_ts_idx_v1')`)
|
||||
log.Println("[migration] observations(observer_idx, timestamp) index created")
|
||||
}
|
||||
//
|
||||
// CONVERTED TO ASYNC (preflight-async-migration-gate). Scheduling
|
||||
// happens in OpenStore() once the real *Store exists so the
|
||||
// backfill WaitGroup is shared with the rest of the ingestor.
|
||||
// The legacy `_migrations` gate is preserved by the async fn so
|
||||
// DBs that already completed the sync build stay no-op.
|
||||
|
||||
// #1483: normalize nodes.public_key to lowercase. The server's
|
||||
// GetNodeLocationsByKeys lookup dropped LOWER(public_key) for perf
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
// Fixture: migration block WITHOUT an async annotation and WITHOUT being
|
||||
// wrapped in the async-migration helper. This file exists ONLY so that
|
||||
// ~/.openclaw/skills/pr-preflight/scripts/check-async-migrations.sh
|
||||
// has a known-bad sample to test against (the script is invoked with
|
||||
// BASE pointing at master and FIXTURE_DIR pointing here).
|
||||
//
|
||||
// DO NOT add a PREFLIGHT annotation to this file. DO NOT wrap the
|
||||
// migration via the async helper. The check script's correctness
|
||||
// depends on this staying BAD.
|
||||
//
|
||||
// IMPORTANT: this file must NOT contain the literal identifier of the
|
||||
// async-helper function anywhere (comments, strings, identifiers). The
|
||||
// preflight gate greps a window of lines above the migration for that
|
||||
// identifier as an "OK" signal, so mentioning it here would cause the
|
||||
// gate to *pass* this fixture — defeating its purpose. Refer to the
|
||||
// helper only obliquely as "the async-migration helper" in prose.
|
||||
package fixtures
|
||||
|
||||
const _ = `
|
||||
CREATE INDEX idx_observations_bad_sync_v1 ON observations(observer_idx, timestamp);
|
||||
`
|
||||
@@ -0,0 +1,9 @@
|
||||
// Fixture: migration block WITH an async annotation. Companion to
|
||||
// bad_sync_migration.go. The preflight check script must accept this
|
||||
// because of the PREFLIGHT line directly above the migration.
|
||||
package fixtures
|
||||
|
||||
// PREFLIGHT: async=true reason="fixture-only — ALTER ADD COLUMN is O(1) in sqlite"
|
||||
const _ = `
|
||||
ALTER TABLE observations ADD COLUMN annotated_good_fixture_col INTEGER DEFAULT 0;
|
||||
`
|
||||
Reference in New Issue
Block a user