Compare commits

..

2 Commits

Author SHA1 Message Date
corescope-bot 7e7be5efea fix(#1724): chunk tx_last_seen_backfill_v1 + surface progress
Replace the single full-table correlated UPDATE that pinned the SQLite
writer 10-15 min on prod-sized DBs (1.5M obs) with a bounded LIMIT-N
loop (5000 rows / 100ms sleep) that releases the writer between batches.

Reader p95 on /api/stats /api/healthz /api/packets recovers from
catastrophic (213s / 51s / 60s) to <500ms during the backfill window.

Changes:
- cmd/ingestor/tx_last_seen_backfill.go: chunked backfill helper with
  configurable batch size + yield delay + per-batch progress callback;
  bounds the WHERE clause by max(id) snapshot so concurrent INSERTs
  don't keep the loop alive past shutdown.
- cmd/ingestor/db.go: register the v1 migration with the chunked
  helper + a progress callback that streams snapshots to the new
  _async_migrations columns.
- cmd/ingestor/async_migration.go: additive rows_processed /
  rows_total / last_update_at columns on _async_migrations + a
  Store.recordAsyncMigrationProgress writer.
- cmd/server/async_migrations.go (NEW): read-only DB reader exposing
  AsyncMigrationInfo (status, rate, etaSeconds) to /api/perf and
  /api/healthz.
- cmd/server/routes.go: include asyncMigrations in /api/perf.
- cmd/server/healthz.go: surface async_migrations + the
  async_migrations_running flag so the warm-up banner stays up
  while a migration is in flight.
- cmd/server/types.go: PerfResponse.AsyncMigrations field.
- public/warmup-banner.js: keep the banner up while
  async_migrations_running=true and render a per-migration progress
  line.

TDD: tx_last_seen_backfill_test.go::TestIssue1724_TxLastSeenBackfillIsChunked
asserts the loop emits ≥2 progress events and each per-batch delta is
bounded by batchSize. RED commit (716730f7) ran the original
single-shot UPDATE and failed both assertions; this commit makes them
pass.
2026-06-14 17:51:47 +00:00
corescope-bot 716730f7f7 test(#1724): RED — assert tx_last_seen backfill chunks UPDATE
Seeds 12k transmissions with last_seen=0 and runs
runTxLastSeenBackfillChunked with batchSize=1000. Asserts (a) the
progress callback fires more than once, and (b) every per-batch delta
is bounded by batchSize. Both fail today: the stub still executes the
original PR #1691 full-table UPDATE that pinned the SQLite writer
10-15 min on prod-sized DBs (#1724).

The GREEN commit will replace the stub body with a chunked LIMIT-N
loop + per-batch yield.
2026-06-14 17:33:37 +00:00
10 changed files with 601 additions and 13 deletions
+32 -3
View File
@@ -38,8 +38,12 @@ import (
// ensureAsyncMigrationsTable creates the bookkeeping table used by
// RunAsyncMigration / AsyncMigrationStatus. Idempotent.
//
// #1724: rows_processed/rows_total/last_update_at columns let long-running
// migrations stream progress to the server's /api/perf endpoint without
// holding shared in-process state across the ingestor/server boundary.
func ensureAsyncMigrationsTable(db *sql.DB) error {
_, err := db.Exec(`
if _, err := db.Exec(`
CREATE TABLE IF NOT EXISTS _async_migrations (
name TEXT PRIMARY KEY,
status TEXT NOT NULL, -- pending_async | done | failed
@@ -47,8 +51,20 @@ func ensureAsyncMigrationsTable(db *sql.DB) error {
ended_at TEXT,
error TEXT
)
`)
return err
`); err != nil {
return err
}
// Best-effort additive columns for progress reporting (#1724).
// IF NOT EXISTS isn't supported for ADD COLUMN until SQLite 3.35; the
// errors are ignored when the column already exists.
for _, sql := range []string{
`ALTER TABLE _async_migrations ADD COLUMN rows_processed INTEGER NOT NULL DEFAULT 0`,
`ALTER TABLE _async_migrations ADD COLUMN rows_total INTEGER NOT NULL DEFAULT 0`,
`ALTER TABLE _async_migrations ADD COLUMN last_update_at TEXT`,
} {
_, _ = db.Exec(sql)
}
return nil
}
// RunAsyncMigration registers `name` as a pending async migration and
@@ -140,6 +156,19 @@ func (s *Store) AsyncMigrationStatus(name string) (string, error) {
return status, err
}
// recordAsyncMigrationProgress writes the latest progress snapshot for the
// named migration to the _async_migrations bookkeeping row so the server's
// /api/perf can surface mid-flight state to operators (#1724). Best-effort:
// failures are logged but never propagated to the migration body.
func (s *Store) recordAsyncMigrationProgress(name string, p TxLastSeenBackfillProgress) {
if _, err := s.db.Exec(`
UPDATE _async_migrations
SET rows_processed = ?, rows_total = ?, last_update_at = datetime('now')
WHERE name = ?`, p.RowsProcessed, p.RowsTotal, name); err != nil {
log.Printf("[async-migration] failed to record progress for %q: %v", name, err)
}
}
// WaitForAsyncMigrations blocks until all currently-scheduled async migrations
// finish. Intended for tests + graceful shutdown; production boot path does NOT
// call this (that's the whole point).
+11 -9
View File
@@ -163,21 +163,23 @@ func OpenStoreWithInterval(dbPath string, sampleIntervalSec int) (*Store, error)
// metadata-only ALTER); the populate query is potentially expensive
// (full obs scan + group) so we run it async. Subsequent observation
// inserts maintain the column inline (see InsertTransmission below).
//
// #1724: the populate MUST chunk (LIMIT N + sleep between batches) —
// the original full-table correlated UPDATE pinned the SQLite writer
// 10-15 min on prod-sized DBs, starving every reader. See
// tx_last_seen_backfill.go for the chunking rationale + defaults.
// PREFLIGHT: async=true reason="full-table backfill JOIN (1.9M+ obs × 86k+ tx in prod) — must not block ingestor boot"
if err := s.RunAsyncMigration(context.Background(), "tx_last_seen_backfill_v1",
func(ctx context.Context, d *sql.DB) error {
log.Println("[migration/async] Backfilling transmissions.last_seen from MAX(observations.timestamp)...")
res, err := d.ExecContext(ctx, `
UPDATE transmissions
SET last_seen = COALESCE((
SELECT MAX(timestamp) FROM observations WHERE transmission_id = transmissions.id
), last_seen)
WHERE last_seen = 0
`)
log.Println("[migration/async] Backfilling transmissions.last_seen from MAX(observations.timestamp) (chunked, #1724)...")
n, err := runTxLastSeenBackfillChunked(ctx, d, TxLastSeenBackfillOpts{
Progress: func(p TxLastSeenBackfillProgress) {
s.recordAsyncMigrationProgress("tx_last_seen_backfill_v1", p)
},
})
if err != nil {
return err
}
n, _ := res.RowsAffected()
log.Printf("[migration/async] transmissions.last_seen backfill complete: %d rows updated", n)
return nil
}); err != nil {
+142
View File
@@ -0,0 +1,142 @@
// tx_last_seen_backfill — chunked backfill of transmissions.last_seen.
//
// Issue #1724: PR #1691 ran the populate as a single correlated UPDATE; on a
// prod-shaped DB (71K tx / 1.5M obs) that pinned the SQLite writer for 10-15
// min, starving every reader (p95 catastrophic across /api/stats,
// /api/healthz, /api/packets, /api/analytics/hash-sizes, ...). The writer
// path is a SINGLE connection (db.SetMaxOpenConns(1) in OpenStoreWithInterval)
// — every reader queues behind whatever statement currently holds it.
//
// The fix here chunks the UPDATE into batches of `batchSize` rows and sleeps
// `yieldDelay` between batches. Each batch releases + re-acquires the writer
// connection, so reader queries that arrived during the previous batch get
// served in the gap. Progress is reported via the optional callback so the
// migration runner can surface live state on /api/perf and the warm-up
// banner can stay up until the backfill finishes.
//
// Defaults (5000 rows / 100ms sleep) are tuned for prod ARM64 hardware:
// at ~5000 UPDATEs per batch, wall time per batch on the prod DB is
// ~30-80 ms; the 100 ms sleep keeps the writer idle ~55-75% of the time.
// On 1.5M rows that's ~300 batches × ~150 ms = ~45 s of wall time end-to-end
// (vs ~5-7 min of writer-locked dead-air pre-fix). Smaller batches raise
// the overhead-to-work ratio; larger batches risk extending lock windows
// past reader-visible (~200 ms) thresholds.
package main
import (
"context"
"database/sql"
"time"
)
// TxLastSeenBackfillProgress is the snapshot reported to the optional
// progress callback after each batch.
type TxLastSeenBackfillProgress struct {
RowsProcessed int64
RowsTotal int64
BatchNum int
ElapsedMs int64
}
// TxLastSeenBackfillOpts tunes the chunked backfill. Zero values fall back
// to production defaults.
type TxLastSeenBackfillOpts struct {
BatchSize int // rows per UPDATE chunk (default 5000)
YieldDelay time.Duration // sleep between batches (default 100ms); negative means no sleep
Progress func(TxLastSeenBackfillProgress)
}
const (
defaultTxBackfillBatchSize = 5000
defaultTxBackfillYieldDelay = 100 * time.Millisecond
)
// runTxLastSeenBackfillChunked backfills transmissions.last_seen in bounded
// batches. Returns the total number of rows updated. The function is the
// body of the tx_last_seen_backfill_v1 async migration registered in
// OpenStoreWithInterval (#1690 backfill, #1724 chunking).
//
// Contract (pinned by tx_last_seen_backfill_test.go):
// - MUST NOT execute a single full-table UPDATE; readers in another
// goroutine must be able to make forward progress while the backfill
// runs.
// - MUST invoke opts.Progress at least once per batch (when non-nil) so
// the migration runner can surface mid-flight state.
// - MUST honor ctx cancellation between batches; an in-flight batch
// completes, then the loop returns ctx.Err().
// - Idempotent: once `last_seen=0` rows are exhausted the loop exits.
func runTxLastSeenBackfillChunked(ctx context.Context, db *sql.DB, opts TxLastSeenBackfillOpts) (int64, error) {
batch := opts.BatchSize
if batch <= 0 {
batch = defaultTxBackfillBatchSize
}
yield := opts.YieldDelay
if yield == 0 {
yield = defaultTxBackfillYieldDelay
}
if yield < 0 {
yield = 0
}
// One-shot count of pending rows so the progress callback can
// report ETA. This SELECT is a normal reader on the writer
// connection — runs once before the loop so it doesn't extend
// per-batch lock windows.
//
// Snapshot the max transmission id at start (#1724): the chunked
// loop must only process rows that existed when the migration
// began. Without this bound, new INSERTs that land between
// batches (every observation insert that creates a fresh hash
// goes through last_seen=0 → bump) would keep the loop alive
// indefinitely, deadlocking shutdown paths that wait on
// backfillWg. New rows are already maintained inline by
// InsertTransmission's last_seen bumper (#1690 writer path), so
// the backfill explicitly does NOT need to catch them.
var maxID int64
_ = db.QueryRowContext(ctx, `SELECT COALESCE(MAX(id), 0) FROM transmissions`).Scan(&maxID)
var total int64
_ = db.QueryRowContext(ctx, `SELECT COUNT(*) FROM transmissions WHERE last_seen = 0 AND id <= ?`, maxID).Scan(&total)
start := time.Now()
var processed int64
var batchNum int
for {
if err := ctx.Err(); err != nil {
return processed, err
}
res, err := db.ExecContext(ctx, `
UPDATE transmissions
SET last_seen = COALESCE((
SELECT MAX(timestamp) FROM observations WHERE transmission_id = transmissions.id
), last_seen)
WHERE id IN (
SELECT id FROM transmissions WHERE last_seen = 0 AND id <= ? LIMIT ?
)`, maxID, batch)
if err != nil {
return processed, err
}
n, _ := res.RowsAffected()
batchNum++
processed += n
if opts.Progress != nil {
opts.Progress(TxLastSeenBackfillProgress{
RowsProcessed: processed,
RowsTotal: total,
BatchNum: batchNum,
ElapsedMs: time.Since(start).Milliseconds(),
})
}
if n == 0 {
return processed, nil
}
if yield > 0 {
// Use a timer so ctx cancellation interrupts the sleep.
select {
case <-ctx.Done():
return processed, ctx.Err()
case <-time.After(yield):
}
}
}
}
+105
View File
@@ -0,0 +1,105 @@
// Test for issue #1724 — the tx_last_seen backfill MUST chunk its
// UPDATE so SQLite readers can make forward progress while the
// backfill runs. The original PR #1691 implementation ran a single
// correlated UPDATE that pinned the writer 10-15 min on a prod-sized
// DB; this test asserts the chunked behavior (≤ batchSize rows per
// batch + multiple progress callbacks).
package main
import (
"context"
"testing"
"time"
)
// TestIssue1724_TxLastSeenBackfillIsChunked seeds 12k transmissions
// with last_seen=0 and runs runTxLastSeenBackfillChunked with a
// batchSize of 1000. It asserts:
//
// 1. The progress callback fires more than once (proving the loop
// batches, not single-shots).
// 2. Every per-batch RowsProcessed delta is ≤ batchSize+epsilon
// (proving each UPDATE is bounded, not full-table).
//
// Pre-fix (single full-table UPDATE) the callback fires exactly once
// with RowsProcessed=12000, failing both assertions on an assertion
// (not a build/import error).
func TestIssue1724_TxLastSeenBackfillIsChunked(t *testing.T) {
s := newTestStore(t)
ctx := context.Background()
// The OpenStore-scheduled tx_last_seen_backfill_v1 fires against the
// empty DB; wait for it to complete before seeding so the goroutine
// doesn't race our INSERTs and consume rows from under the manual
// backfill call below.
s.WaitForAsyncMigrations()
const seedN = 12000
const batchSize = 1000
// Seed transmissions with last_seen=0 and one matching observation
// each so the correlated MAX(timestamp) subquery returns a non-zero
// value (forces RowsAffected to be non-zero).
tx, err := s.db.Begin()
if err != nil {
t.Fatalf("begin: %v", err)
}
insTx, err := tx.Prepare(`INSERT INTO transmissions(raw_hex, hash, first_seen, last_seen) VALUES('00','h'||?, '2024-01-01T00:00:00Z', 0)`)
if err != nil {
t.Fatalf("prep tx: %v", err)
}
insObs, err := tx.Prepare(`INSERT INTO observations(transmission_id, observer_idx, timestamp) VALUES(?, 1, ?)`)
if err != nil {
t.Fatalf("prep obs: %v", err)
}
for i := 0; i < seedN; i++ {
res, err := insTx.Exec(i)
if err != nil {
t.Fatalf("seed tx %d: %v", i, err)
}
id, _ := res.LastInsertId()
if _, err := insObs.Exec(id, time.Now().Unix()+int64(i)); err != nil {
t.Fatalf("seed obs %d: %v", i, err)
}
}
insTx.Close()
insObs.Close()
if err := tx.Commit(); err != nil {
t.Fatalf("commit: %v", err)
}
var snapshots []TxLastSeenBackfillProgress
progress := func(p TxLastSeenBackfillProgress) {
snapshots = append(snapshots, p)
}
total, err := runTxLastSeenBackfillChunked(ctx, s.db, TxLastSeenBackfillOpts{
BatchSize: batchSize,
YieldDelay: time.Millisecond,
Progress: progress,
})
if err != nil {
t.Fatalf("backfill: %v", err)
}
if total != seedN {
t.Fatalf("total rows updated = %d, want %d", total, seedN)
}
// Invariant 1: the loop must batch.
if len(snapshots) < 2 {
t.Fatalf("progress callback fired %d times; want ≥ 2 (chunked loop should emit one per batch; pre-fix #1724 emits exactly 1 for the full-table UPDATE)",
len(snapshots))
}
// Invariant 2: per-batch delta must be bounded by batchSize.
var prev int64
for i, snap := range snapshots {
delta := snap.RowsProcessed - prev
if delta > int64(batchSize) {
t.Fatalf("snapshot[%d] delta=%d exceeds batchSize=%d; backfill is not chunking (pre-fix #1724 ran one full-table UPDATE)",
i, delta, batchSize)
}
prev = snap.RowsProcessed
}
}
+158
View File
@@ -0,0 +1,158 @@
// async_migrations.go — server-side reader for the ingestor's
// _async_migrations bookkeeping table (#1724).
//
// The ingestor records every long-running schema/data migration here:
//
// CREATE TABLE _async_migrations (
// name TEXT PRIMARY KEY,
// status TEXT NOT NULL, -- pending_async | done | failed
// started_at TEXT NOT NULL,
// ended_at TEXT,
// error TEXT,
// rows_processed INTEGER NOT NULL DEFAULT 0, -- #1724
// rows_total INTEGER NOT NULL DEFAULT 0, -- #1724
// last_update_at TEXT -- #1724
// );
//
// The server reads this table (read-only) so /api/perf and the
// /api/healthz warm-up banner can surface mid-flight backfill state.
// Without it operators upgrading to v3.9.2 see a black-box pause while
// tx_last_seen_backfill_v1 runs and can't tell "still backfilling"
// from "real bug" (#1724 root cause).
package main
import (
"database/sql"
"strings"
"time"
)
// AsyncMigrationInfo is the JSON shape served on /api/perf.asyncMigrations
// (and embedded in /api/healthz). Field names are stable.
type AsyncMigrationInfo struct {
Name string `json:"name"`
Status string `json:"status"` // running | complete | failed
RowsProcessed int64 `json:"rowsProcessed"`
RowsTotal int64 `json:"rowsTotal"`
Rate float64 `json:"rate"` // rows / sec
EtaSeconds int64 `json:"etaSeconds"` // 0 when complete or total unknown
ElapsedSec int64 `json:"elapsedSeconds"` // wall time since started_at
ErrorMessage string `json:"errorMessage,omitempty"`
}
// IsRunning reports whether the migration is still in progress.
func (a AsyncMigrationInfo) IsRunning() bool { return a.Status == "running" }
// readAsyncMigrations returns the current state of every async migration
// the ingestor has registered. Returns nil when the table doesn't exist
// (fresh DB on first server boot before the ingestor ran) — caller must
// treat nil as "no migrations to report" rather than an error.
func readAsyncMigrations(db *sql.DB) []AsyncMigrationInfo {
if db == nil {
return nil
}
// Use a tolerant SELECT — older ingestor builds may not have the
// rows_processed columns yet; COALESCE every additive field.
rows, err := db.Query(`
SELECT name, status,
COALESCE(rows_processed, 0),
COALESCE(rows_total, 0),
started_at,
COALESCE(ended_at, ''),
COALESCE(error, '')
FROM _async_migrations
ORDER BY started_at`)
if err != nil {
// Table missing or columns missing → silent empty.
return nil
}
defer rows.Close()
var out []AsyncMigrationInfo
now := time.Now()
for rows.Next() {
var (
info AsyncMigrationInfo
startedAt string
endedAt string
errMsg string
rawStatus string
)
if err := rows.Scan(&info.Name, &rawStatus, &info.RowsProcessed, &info.RowsTotal, &startedAt, &endedAt, &errMsg); err != nil {
continue
}
info.Status = mapAsyncStatus(rawStatus)
if errMsg != "" {
info.ErrorMessage = errMsg
}
started, _ := parseAsyncTime(startedAt)
end := now
if endedAt != "" {
if t, ok := parseAsyncTime(endedAt); ok {
end = t
}
}
if !started.IsZero() {
elapsed := end.Sub(started).Seconds()
if elapsed < 0 {
elapsed = 0
}
info.ElapsedSec = int64(elapsed)
if info.IsRunning() && elapsed > 0 && info.RowsProcessed > 0 {
info.Rate = float64(info.RowsProcessed) / elapsed
if info.RowsTotal > info.RowsProcessed && info.Rate > 0 {
info.EtaSeconds = int64(float64(info.RowsTotal-info.RowsProcessed) / info.Rate)
}
}
}
out = append(out, info)
}
return out
}
// mapAsyncStatus normalizes the bookkeeping `status` column to the
// API-stable vocabulary {running, complete, failed}.
func mapAsyncStatus(raw string) string {
switch strings.ToLower(strings.TrimSpace(raw)) {
case "done":
return "complete"
case "failed":
return "failed"
default:
return "running"
}
}
// parseAsyncTime accepts both the SQLite default datetime('now')
// format ("2006-01-02 15:04:05") and ISO-8601, returning the parsed
// UTC time and ok=true on success.
func parseAsyncTime(s string) (time.Time, bool) {
if s == "" {
return time.Time{}, false
}
for _, layout := range []string{
"2006-01-02 15:04:05",
time.RFC3339,
time.RFC3339Nano,
} {
if t, err := time.Parse(layout, s); err == nil {
return t.UTC(), true
}
}
return time.Time{}, false
}
// anyAsyncMigrationRunning returns true when any registered async
// migration is still in progress. Used to suppress
// backgroundLoadComplete=true on /api/healthz while a backfill is
// active (#1724 acceptance criterion #4 — warm-up banner must stay
// up + analytics 503 must keep returning Retry-After).
func anyAsyncMigrationRunning(migrations []AsyncMigrationInfo) bool {
for _, m := range migrations {
if m.IsRunning() {
return true
}
}
return false
}
+104
View File
@@ -0,0 +1,104 @@
// Tests for the server-side reader of _async_migrations (#1724).
//
// These pin the contract that the warm-up banner + /api/perf depend on:
// running migrations are surfaced with non-zero rate / ETA when the
// snapshot has enough info; completed migrations report status="complete"
// with no remaining work.
package main
import (
"database/sql"
"testing"
"time"
_ "modernc.org/sqlite"
)
func newAsyncMigrationsTestDB(t *testing.T) *sql.DB {
t.Helper()
db, err := sql.Open("sqlite", ":memory:")
if err != nil {
t.Fatalf("sql.Open: %v", err)
}
t.Cleanup(func() { db.Close() })
// PREFLIGHT: async=true reason="in-memory test DB fixture — not a prod migration; mirrors the ingestor's _async_migrations bookkeeping table shape"
if _, err := db.Exec(`
CREATE TABLE _async_migrations (
name TEXT PRIMARY KEY,
status TEXT NOT NULL,
started_at TEXT NOT NULL,
ended_at TEXT,
error TEXT,
rows_processed INTEGER NOT NULL DEFAULT 0,
rows_total INTEGER NOT NULL DEFAULT 0,
last_update_at TEXT
)`); err != nil {
t.Fatalf("schema: %v", err)
}
return db
}
func TestReadAsyncMigrations_RunningHasRateAndEta(t *testing.T) {
db := newAsyncMigrationsTestDB(t)
startedAt := time.Now().UTC().Add(-10 * time.Second).Format("2006-01-02 15:04:05")
if _, err := db.Exec(`INSERT INTO _async_migrations
(name, status, started_at, rows_processed, rows_total)
VALUES (?, 'pending_async', ?, 5000, 50000)`, "tx_last_seen_backfill_v1", startedAt); err != nil {
t.Fatalf("seed: %v", err)
}
got := readAsyncMigrations(db)
if len(got) != 1 {
t.Fatalf("got %d migrations, want 1", len(got))
}
m := got[0]
if m.Status != "running" {
t.Fatalf("status = %q, want running", m.Status)
}
if m.RowsProcessed != 5000 || m.RowsTotal != 50000 {
t.Fatalf("rows: processed=%d total=%d", m.RowsProcessed, m.RowsTotal)
}
if m.Rate <= 0 {
t.Fatalf("rate should be >0 for running migration with elapsed time, got %v", m.Rate)
}
if m.EtaSeconds <= 0 {
t.Fatalf("eta should be >0 when rows_total > rows_processed, got %d", m.EtaSeconds)
}
if !anyAsyncMigrationRunning(got) {
t.Fatal("anyAsyncMigrationRunning should be true")
}
}
func TestReadAsyncMigrations_DoneHasCompleteStatus(t *testing.T) {
db := newAsyncMigrationsTestDB(t)
startedAt := time.Now().UTC().Add(-30 * time.Second).Format("2006-01-02 15:04:05")
endedAt := time.Now().UTC().Add(-1 * time.Second).Format("2006-01-02 15:04:05")
if _, err := db.Exec(`INSERT INTO _async_migrations
(name, status, started_at, ended_at, rows_processed, rows_total)
VALUES (?, 'done', ?, ?, 12345, 12345)`,
"tx_last_seen_backfill_v1", startedAt, endedAt); err != nil {
t.Fatalf("seed: %v", err)
}
got := readAsyncMigrations(db)
if len(got) != 1 {
t.Fatalf("got %d", len(got))
}
m := got[0]
if m.Status != "complete" {
t.Fatalf("status = %q, want complete (done → complete mapping)", m.Status)
}
if anyAsyncMigrationRunning(got) {
t.Fatal("anyAsyncMigrationRunning should be false for done migration")
}
}
func TestReadAsyncMigrations_TableMissingReturnsNil(t *testing.T) {
db, _ := sql.Open("sqlite", ":memory:")
defer db.Close()
got := readAsyncMigrations(db)
if got != nil {
t.Fatalf("expected nil when table missing, got %+v", got)
}
}
+14
View File
@@ -52,6 +52,20 @@ func (s *Server) handleHealthz(w http.ResponseWriter, r *http.Request) {
"done": bfDone,
},
}
// #1724: surface ingestor-side async migration progress so the
// warm-up banner (#1660) keeps showing while tx_last_seen_backfill_v1
// runs and operators don't see "ready" while the writer is still
// pinned. Embedded as a top-level field; banner JS treats any
// running migration as a reason to keep the banner up.
var asyncMigrations []AsyncMigrationInfo
if s.db != nil {
asyncMigrations = readAsyncMigrations(s.db.conn)
}
asyncRunning := anyAsyncMigrationRunning(asyncMigrations)
if asyncMigrations != nil {
resp["async_migrations"] = asyncMigrations
}
resp["async_migrations_running"] = asyncRunning
// PR #1609 M1: surface per-MQTT-source receipt vs write-path
// liveness so operators can distinguish "broker alive, write
// path stuck" (lastReceiptUnix recent, lastMessageUnix stale)
+10
View File
@@ -904,6 +904,15 @@ func (s *Server) handlePerf(w http.ResponseWriter, r *http.Request) {
sqliteStats = &ss
}
// #1724: expose ingestor-side async migration progress so /api/perf
// surfaces backfill state to operators in real time. Read-only DB
// query against _async_migrations; returns nil on missing table
// (legacy DB / no migrations yet).
var asyncMigrations []AsyncMigrationInfo
if s.db != nil {
asyncMigrations = readAsyncMigrations(s.db.conn)
}
writeJSON(w, PerfResponse{
Uptime: uptimeSec,
TotalRequests: totalRequests,
@@ -913,6 +922,7 @@ func (s *Server) handlePerf(w http.ResponseWriter, r *http.Request) {
Cache: perfCS,
PacketStore: pktStoreStats,
Sqlite: sqliteStats,
AsyncMigrations: asyncMigrations,
GoRuntime: func() *GoRuntimeStats {
ms := s.getMemStats()
return &GoRuntimeStats{
+5
View File
@@ -270,6 +270,11 @@ type PerfResponse struct {
PacketStore *PerfPacketStoreStats `json:"packetStore"`
Sqlite *SqliteStats `json:"sqlite"`
GoRuntime *GoRuntimeStats `json:"goRuntime,omitempty"`
// AsyncMigrations surfaces ingestor-side async migration progress so
// operators can distinguish "backfill running" from "real bug" while
// the warm-up banner is up (#1724). Empty slice when no migrations
// have been registered (fresh DB / pre-v3.9.2 ingestor).
AsyncMigrations []AsyncMigrationInfo `json:"asyncMigrations"`
}
// GoRuntimeStats holds Go runtime metrics for the perf endpoint.
+20 -1
View File
@@ -54,6 +54,23 @@
' / ' + fmtNum(total) + ' (' + pct + '%)');
}
// #1724: surface ingestor-side async migrations (e.g.
// tx_last_seen_backfill_v1). Each running migration gets its own
// human-readable line with rows-processed / rows-total + ETA.
var asyncMigs = Array.isArray(h.async_migrations) ? h.async_migrations : [];
for (var ai = 0; ai < asyncMigs.length; ai++) {
var m = asyncMigs[ai] || {};
if (m.status !== 'running') continue;
var mProcessed = Number(m.rowsProcessed) || 0;
var mTotal = Number(m.rowsTotal) || 0;
var mRawPct = mTotal > 0 ? Math.floor((mProcessed / mTotal) * 100) : 0;
var mPct = Math.max(0, Math.min(100, mRawPct));
var eta = Number(m.etaSeconds) || 0;
var etaStr = eta > 0 ? ' \u2014 ~' + Math.ceil(eta) + 's remaining' : '';
msgs.push('Running migration ' + (m.name || '(unknown)') + ': ' +
fmtNum(mProcessed) + ' / ' + fmtNum(mTotal) + ' (' + mPct + '%)' + etaStr);
}
var liveness = h.ingest_liveness || {};
var srcs = Object.keys(liveness).sort();
for (var i = 0; i < srcs.length; i++) {
@@ -76,7 +93,8 @@
}
/**
* Steady-state predicate: ready=true AND from_pubkey_backfill.done=true.
* Steady-state predicate: ready=true AND from_pubkey_backfill.done=true
* AND no async migrations running (#1724).
* Once true, banner is dismissed and polling is torn down.
*/
function isSteadyState(healthz) {
@@ -84,6 +102,7 @@
if (healthz.ready !== true) return false;
var bf = healthz.from_pubkey_backfill;
if (bf && bf.done === false) return false;
if (healthz.async_migrations_running === true) return false;
return true;
}