test(ingestor): red — RunAsyncMigration pending_async→done contract

Pins the recurring 'sync migration on large table blocks startup'
regression class (#791, #1483). Asserts:
  1. RunAsyncMigration registers the name as pending_async immediately.
  2. Returns without blocking on fn.
  3. Status transitions to done after fn completes.

Ships with a STUB RunAsyncMigration / AsyncMigrationStatus so the test
compiles + runs to assertion failure (not build failure). Green commit
follows with the real implementation + retroactive #1483 conversion +
docs.
This commit is contained in:
clawbot
2026-06-03 20:21:41 +00:00
parent ef1229a806
commit 2c6744ccea
2 changed files with 119 additions and 0 deletions
+54
View File
@@ -0,0 +1,54 @@
// Async migration helper — runs schema/backfill work that may take minutes on
// large prod tables WITHOUT blocking ingestor startup.
//
// MIGRATION ANNOTATION CONVENTION (read this before touching migrations):
//
// Sync schema/data migrations (CREATE INDEX, ALTER TABLE, UPDATE ... WHERE)
// that run inline during OpenStore() block the ingestor from accepting
// packets until they finish. On an empty dev DB they return in milliseconds;
// at prod scale (1.9M+ observations, 80K+ adverts) they can pin the boot
// for minutes and trigger restart loops. This regression class has bitten us
// repeatedly (#791 resolved_path backfill, #1483 obs_observer_ts_idx_v1).
//
// ANY new CREATE INDEX / ALTER TABLE / data-rewrite migration MUST EITHER:
// 1. Run via Store.RunAsyncMigration(...) below (preferred for backfills
// and any work that may touch >1K rows). The migration is recorded as
// `pending_async` immediately, returns to the caller (boot proceeds),
// and completes in a goroutine. Status flips to `done` (or `failed`
// with an error message) when fn returns.
// 2. Carry the preflight annotation comment immediately above the
// migration block, e.g.
// // PREFLIGHT: async=true reason="<one-line justification>"
// Use this for migrations that are genuinely cheap at any scale
// (e.g. ALTER TABLE ADD COLUMN, CREATE INDEX on a known-bounded
// table). The annotation is grepped by
// ~/.openclaw/skills/pr-preflight/scripts/check-async-migrations.sh
// — its absence on a touched migration block is a hard-fail gate.
//
// See MIGRATIONS.md in the repo root for the full policy and examples.
package main
import (
"context"
"database/sql"
)
// RunAsyncMigration registers `name` as a pending async migration and
// schedules `fn` to run in a background goroutine. It returns to the caller
// immediately so the ingestor can keep booting.
//
// STUB: red-commit version. Implementation lands in the green commit; the
// failing test in async_migration_test.go pins the contract.
func (s *Store) RunAsyncMigration(ctx context.Context, name string, fn func(context.Context, *sql.DB) error) error {
return nil
}
// AsyncMigrationStatus returns the current status of an async migration
// (one of "pending_async", "done", "failed") or sql.ErrNoRows if no such
// migration has been registered.
//
// STUB: red-commit version.
func (s *Store) AsyncMigrationStatus(name string) (string, error) {
return "", sql.ErrNoRows
}
+65
View File
@@ -0,0 +1,65 @@
package main
import (
"context"
"database/sql"
"testing"
"time"
)
// TestRunAsyncMigration_PendingThenDone pins the contract for RunAsyncMigration:
//
// 1. After calling, the migration name MUST be queryable in the migrations
// table with status `pending_async` IMMEDIATELY (no waiting for fn).
// 2. After fn returns, the status MUST transition to `done`.
// 3. RunAsyncMigration MUST return without blocking on fn.
//
// This is the regression test for the recurring "sync migration on large
// table blocks ingestor startup" class (#791, #1483, ...). If this test
// fails the contract is broken — do not relax it; fix the runner.
func TestRunAsyncMigration_PendingThenDone(t *testing.T) {
s := newTestStore(t)
ctx := context.Background()
started := make(chan struct{})
release := make(chan struct{})
const name = "test_async_migration_v1"
if err := s.RunAsyncMigration(ctx, name, func(ctx context.Context, db *sql.DB) error {
close(started)
<-release
return nil
}); err != nil {
t.Fatalf("RunAsyncMigration returned error: %v", err)
}
// Wait for the goroutine to actually start before checking status; this
// proves RunAsyncMigration did not block on fn and that fn is running
// concurrently.
select {
case <-started:
case <-time.After(2 * time.Second):
t.Fatal("async migration fn did not start within 2s — RunAsyncMigration may have blocked or never scheduled")
}
status, err := s.AsyncMigrationStatus(name)
if err != nil {
t.Fatalf("AsyncMigrationStatus while running: %v", err)
}
if status != "pending_async" {
t.Fatalf("status while fn running: got %q, want %q", status, "pending_async")
}
close(release)
// Poll for transition to done.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
status, err = s.AsyncMigrationStatus(name)
if err == nil && status == "done" {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("status never transitioned to done within 2s: got %q (err=%v)", status, err)
}