mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-06-10 13:41:42 +00:00
test(ingestor): red — RunAsyncMigration pending_async→done contract
Pins the recurring 'sync migration on large table blocks startup' regression class (#791, #1483). Asserts: 1. RunAsyncMigration registers the name as pending_async immediately. 2. Returns without blocking on fn. 3. Status transitions to done after fn completes. Ships with a STUB RunAsyncMigration / AsyncMigrationStatus so the test compiles + runs to assertion failure (not build failure). Green commit follows with the real implementation + retroactive #1483 conversion + docs.
This commit is contained in:
@@ -0,0 +1,54 @@
|
||||
// Async migration helper — runs schema/backfill work that may take minutes on
|
||||
// large prod tables WITHOUT blocking ingestor startup.
|
||||
//
|
||||
// MIGRATION ANNOTATION CONVENTION (read this before touching migrations):
|
||||
//
|
||||
// Sync schema/data migrations (CREATE INDEX, ALTER TABLE, UPDATE ... WHERE)
|
||||
// that run inline during OpenStore() block the ingestor from accepting
|
||||
// packets until they finish. On an empty dev DB they return in milliseconds;
|
||||
// at prod scale (1.9M+ observations, 80K+ adverts) they can pin the boot
|
||||
// for minutes and trigger restart loops. This regression class has bitten us
|
||||
// repeatedly (#791 resolved_path backfill, #1483 obs_observer_ts_idx_v1).
|
||||
//
|
||||
// ANY new CREATE INDEX / ALTER TABLE / data-rewrite migration MUST EITHER:
|
||||
// 1. Run via Store.RunAsyncMigration(...) below (preferred for backfills
|
||||
// and any work that may touch >1K rows). The migration is recorded as
|
||||
// `pending_async` immediately, returns to the caller (boot proceeds),
|
||||
// and completes in a goroutine. Status flips to `done` (or `failed`
|
||||
// with an error message) when fn returns.
|
||||
// 2. Carry the preflight annotation comment immediately above the
|
||||
// migration block, e.g.
|
||||
// // PREFLIGHT: async=true reason="<one-line justification>"
|
||||
// Use this for migrations that are genuinely cheap at any scale
|
||||
// (e.g. ALTER TABLE ADD COLUMN, CREATE INDEX on a known-bounded
|
||||
// table). The annotation is grepped by
|
||||
// ~/.openclaw/skills/pr-preflight/scripts/check-async-migrations.sh
|
||||
// — its absence on a touched migration block is a hard-fail gate.
|
||||
//
|
||||
// See MIGRATIONS.md in the repo root for the full policy and examples.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
// RunAsyncMigration registers `name` as a pending async migration and
|
||||
// schedules `fn` to run in a background goroutine. It returns to the caller
|
||||
// immediately so the ingestor can keep booting.
|
||||
//
|
||||
// STUB: red-commit version. Implementation lands in the green commit; the
|
||||
// failing test in async_migration_test.go pins the contract.
|
||||
func (s *Store) RunAsyncMigration(ctx context.Context, name string, fn func(context.Context, *sql.DB) error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AsyncMigrationStatus returns the current status of an async migration
|
||||
// (one of "pending_async", "done", "failed") or sql.ErrNoRows if no such
|
||||
// migration has been registered.
|
||||
//
|
||||
// STUB: red-commit version.
|
||||
func (s *Store) AsyncMigrationStatus(name string) (string, error) {
|
||||
return "", sql.ErrNoRows
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestRunAsyncMigration_PendingThenDone pins the contract for RunAsyncMigration:
|
||||
//
|
||||
// 1. After calling, the migration name MUST be queryable in the migrations
|
||||
// table with status `pending_async` IMMEDIATELY (no waiting for fn).
|
||||
// 2. After fn returns, the status MUST transition to `done`.
|
||||
// 3. RunAsyncMigration MUST return without blocking on fn.
|
||||
//
|
||||
// This is the regression test for the recurring "sync migration on large
|
||||
// table blocks ingestor startup" class (#791, #1483, ...). If this test
|
||||
// fails the contract is broken — do not relax it; fix the runner.
|
||||
func TestRunAsyncMigration_PendingThenDone(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
started := make(chan struct{})
|
||||
release := make(chan struct{})
|
||||
|
||||
const name = "test_async_migration_v1"
|
||||
if err := s.RunAsyncMigration(ctx, name, func(ctx context.Context, db *sql.DB) error {
|
||||
close(started)
|
||||
<-release
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("RunAsyncMigration returned error: %v", err)
|
||||
}
|
||||
|
||||
// Wait for the goroutine to actually start before checking status; this
|
||||
// proves RunAsyncMigration did not block on fn and that fn is running
|
||||
// concurrently.
|
||||
select {
|
||||
case <-started:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("async migration fn did not start within 2s — RunAsyncMigration may have blocked or never scheduled")
|
||||
}
|
||||
|
||||
status, err := s.AsyncMigrationStatus(name)
|
||||
if err != nil {
|
||||
t.Fatalf("AsyncMigrationStatus while running: %v", err)
|
||||
}
|
||||
if status != "pending_async" {
|
||||
t.Fatalf("status while fn running: got %q, want %q", status, "pending_async")
|
||||
}
|
||||
|
||||
close(release)
|
||||
|
||||
// Poll for transition to done.
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
status, err = s.AsyncMigrationStatus(name)
|
||||
if err == nil && status == "done" {
|
||||
return
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("status never transitioned to done within 2s: got %q (err=%v)", status, err)
|
||||
}
|
||||
Reference in New Issue
Block a user