mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-06-15 15:31:56 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7e7be5efea | |||
| 716730f7f7 |
@@ -38,8 +38,12 @@ import (
|
||||
|
||||
// ensureAsyncMigrationsTable creates the bookkeeping table used by
|
||||
// RunAsyncMigration / AsyncMigrationStatus. Idempotent.
|
||||
//
|
||||
// #1724: rows_processed/rows_total/last_update_at columns let long-running
|
||||
// migrations stream progress to the server's /api/perf endpoint without
|
||||
// holding shared in-process state across the ingestor/server boundary.
|
||||
func ensureAsyncMigrationsTable(db *sql.DB) error {
|
||||
_, err := db.Exec(`
|
||||
if _, err := db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS _async_migrations (
|
||||
name TEXT PRIMARY KEY,
|
||||
status TEXT NOT NULL, -- pending_async | done | failed
|
||||
@@ -47,8 +51,20 @@ func ensureAsyncMigrationsTable(db *sql.DB) error {
|
||||
ended_at TEXT,
|
||||
error TEXT
|
||||
)
|
||||
`)
|
||||
return err
|
||||
`); err != nil {
|
||||
return err
|
||||
}
|
||||
// Best-effort additive columns for progress reporting (#1724).
|
||||
// IF NOT EXISTS isn't supported for ADD COLUMN until SQLite 3.35; the
|
||||
// errors are ignored when the column already exists.
|
||||
for _, sql := range []string{
|
||||
`ALTER TABLE _async_migrations ADD COLUMN rows_processed INTEGER NOT NULL DEFAULT 0`,
|
||||
`ALTER TABLE _async_migrations ADD COLUMN rows_total INTEGER NOT NULL DEFAULT 0`,
|
||||
`ALTER TABLE _async_migrations ADD COLUMN last_update_at TEXT`,
|
||||
} {
|
||||
_, _ = db.Exec(sql)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunAsyncMigration registers `name` as a pending async migration and
|
||||
@@ -140,6 +156,19 @@ func (s *Store) AsyncMigrationStatus(name string) (string, error) {
|
||||
return status, err
|
||||
}
|
||||
|
||||
// recordAsyncMigrationProgress writes the latest progress snapshot for the
|
||||
// named migration to the _async_migrations bookkeeping row so the server's
|
||||
// /api/perf can surface mid-flight state to operators (#1724). Best-effort:
|
||||
// failures are logged but never propagated to the migration body.
|
||||
func (s *Store) recordAsyncMigrationProgress(name string, p TxLastSeenBackfillProgress) {
|
||||
if _, err := s.db.Exec(`
|
||||
UPDATE _async_migrations
|
||||
SET rows_processed = ?, rows_total = ?, last_update_at = datetime('now')
|
||||
WHERE name = ?`, p.RowsProcessed, p.RowsTotal, name); err != nil {
|
||||
log.Printf("[async-migration] failed to record progress for %q: %v", name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// WaitForAsyncMigrations blocks until all currently-scheduled async migrations
|
||||
// finish. Intended for tests + graceful shutdown; production boot path does NOT
|
||||
// call this (that's the whole point).
|
||||
|
||||
+11
-9
@@ -163,21 +163,23 @@ func OpenStoreWithInterval(dbPath string, sampleIntervalSec int) (*Store, error)
|
||||
// metadata-only ALTER); the populate query is potentially expensive
|
||||
// (full obs scan + group) so we run it async. Subsequent observation
|
||||
// inserts maintain the column inline (see InsertTransmission below).
|
||||
//
|
||||
// #1724: the populate MUST chunk (LIMIT N + sleep between batches) —
|
||||
// the original full-table correlated UPDATE pinned the SQLite writer
|
||||
// 10-15 min on prod-sized DBs, starving every reader. See
|
||||
// tx_last_seen_backfill.go for the chunking rationale + defaults.
|
||||
// PREFLIGHT: async=true reason="full-table backfill JOIN (1.9M+ obs × 86k+ tx in prod) — must not block ingestor boot"
|
||||
if err := s.RunAsyncMigration(context.Background(), "tx_last_seen_backfill_v1",
|
||||
func(ctx context.Context, d *sql.DB) error {
|
||||
log.Println("[migration/async] Backfilling transmissions.last_seen from MAX(observations.timestamp)...")
|
||||
res, err := d.ExecContext(ctx, `
|
||||
UPDATE transmissions
|
||||
SET last_seen = COALESCE((
|
||||
SELECT MAX(timestamp) FROM observations WHERE transmission_id = transmissions.id
|
||||
), last_seen)
|
||||
WHERE last_seen = 0
|
||||
`)
|
||||
log.Println("[migration/async] Backfilling transmissions.last_seen from MAX(observations.timestamp) (chunked, #1724)...")
|
||||
n, err := runTxLastSeenBackfillChunked(ctx, d, TxLastSeenBackfillOpts{
|
||||
Progress: func(p TxLastSeenBackfillProgress) {
|
||||
s.recordAsyncMigrationProgress("tx_last_seen_backfill_v1", p)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
log.Printf("[migration/async] transmissions.last_seen backfill complete: %d rows updated", n)
|
||||
return nil
|
||||
}); err != nil {
|
||||
|
||||
@@ -0,0 +1,142 @@
|
||||
// tx_last_seen_backfill — chunked backfill of transmissions.last_seen.
|
||||
//
|
||||
// Issue #1724: PR #1691 ran the populate as a single correlated UPDATE; on a
|
||||
// prod-shaped DB (71K tx / 1.5M obs) that pinned the SQLite writer for 10-15
|
||||
// min, starving every reader (p95 catastrophic across /api/stats,
|
||||
// /api/healthz, /api/packets, /api/analytics/hash-sizes, ...). The writer
|
||||
// path is a SINGLE connection (db.SetMaxOpenConns(1) in OpenStoreWithInterval)
|
||||
// — every reader queues behind whatever statement currently holds it.
|
||||
//
|
||||
// The fix here chunks the UPDATE into batches of `batchSize` rows and sleeps
|
||||
// `yieldDelay` between batches. Each batch releases + re-acquires the writer
|
||||
// connection, so reader queries that arrived during the previous batch get
|
||||
// served in the gap. Progress is reported via the optional callback so the
|
||||
// migration runner can surface live state on /api/perf and the warm-up
|
||||
// banner can stay up until the backfill finishes.
|
||||
//
|
||||
// Defaults (5000 rows / 100ms sleep) are tuned for prod ARM64 hardware:
|
||||
// at ~5000 UPDATEs per batch, wall time per batch on the prod DB is
|
||||
// ~30-80 ms; the 100 ms sleep keeps the writer idle ~55-75% of the time.
|
||||
// On 1.5M rows that's ~300 batches × ~150 ms = ~45 s of wall time end-to-end
|
||||
// (vs ~5-7 min of writer-locked dead-air pre-fix). Smaller batches raise
|
||||
// the overhead-to-work ratio; larger batches risk extending lock windows
|
||||
// past reader-visible (~200 ms) thresholds.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TxLastSeenBackfillProgress is the snapshot reported to the optional
|
||||
// progress callback after each batch.
|
||||
type TxLastSeenBackfillProgress struct {
|
||||
RowsProcessed int64
|
||||
RowsTotal int64
|
||||
BatchNum int
|
||||
ElapsedMs int64
|
||||
}
|
||||
|
||||
// TxLastSeenBackfillOpts tunes the chunked backfill. Zero values fall back
|
||||
// to production defaults.
|
||||
type TxLastSeenBackfillOpts struct {
|
||||
BatchSize int // rows per UPDATE chunk (default 5000)
|
||||
YieldDelay time.Duration // sleep between batches (default 100ms); negative means no sleep
|
||||
Progress func(TxLastSeenBackfillProgress)
|
||||
}
|
||||
|
||||
const (
|
||||
defaultTxBackfillBatchSize = 5000
|
||||
defaultTxBackfillYieldDelay = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
// runTxLastSeenBackfillChunked backfills transmissions.last_seen in bounded
|
||||
// batches. Returns the total number of rows updated. The function is the
|
||||
// body of the tx_last_seen_backfill_v1 async migration registered in
|
||||
// OpenStoreWithInterval (#1690 backfill, #1724 chunking).
|
||||
//
|
||||
// Contract (pinned by tx_last_seen_backfill_test.go):
|
||||
// - MUST NOT execute a single full-table UPDATE; readers in another
|
||||
// goroutine must be able to make forward progress while the backfill
|
||||
// runs.
|
||||
// - MUST invoke opts.Progress at least once per batch (when non-nil) so
|
||||
// the migration runner can surface mid-flight state.
|
||||
// - MUST honor ctx cancellation between batches; an in-flight batch
|
||||
// completes, then the loop returns ctx.Err().
|
||||
// - Idempotent: once `last_seen=0` rows are exhausted the loop exits.
|
||||
func runTxLastSeenBackfillChunked(ctx context.Context, db *sql.DB, opts TxLastSeenBackfillOpts) (int64, error) {
|
||||
batch := opts.BatchSize
|
||||
if batch <= 0 {
|
||||
batch = defaultTxBackfillBatchSize
|
||||
}
|
||||
yield := opts.YieldDelay
|
||||
if yield == 0 {
|
||||
yield = defaultTxBackfillYieldDelay
|
||||
}
|
||||
if yield < 0 {
|
||||
yield = 0
|
||||
}
|
||||
|
||||
// One-shot count of pending rows so the progress callback can
|
||||
// report ETA. This SELECT is a normal reader on the writer
|
||||
// connection — runs once before the loop so it doesn't extend
|
||||
// per-batch lock windows.
|
||||
//
|
||||
// Snapshot the max transmission id at start (#1724): the chunked
|
||||
// loop must only process rows that existed when the migration
|
||||
// began. Without this bound, new INSERTs that land between
|
||||
// batches (every observation insert that creates a fresh hash
|
||||
// goes through last_seen=0 → bump) would keep the loop alive
|
||||
// indefinitely, deadlocking shutdown paths that wait on
|
||||
// backfillWg. New rows are already maintained inline by
|
||||
// InsertTransmission's last_seen bumper (#1690 writer path), so
|
||||
// the backfill explicitly does NOT need to catch them.
|
||||
var maxID int64
|
||||
_ = db.QueryRowContext(ctx, `SELECT COALESCE(MAX(id), 0) FROM transmissions`).Scan(&maxID)
|
||||
var total int64
|
||||
_ = db.QueryRowContext(ctx, `SELECT COUNT(*) FROM transmissions WHERE last_seen = 0 AND id <= ?`, maxID).Scan(&total)
|
||||
|
||||
start := time.Now()
|
||||
var processed int64
|
||||
var batchNum int
|
||||
for {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return processed, err
|
||||
}
|
||||
res, err := db.ExecContext(ctx, `
|
||||
UPDATE transmissions
|
||||
SET last_seen = COALESCE((
|
||||
SELECT MAX(timestamp) FROM observations WHERE transmission_id = transmissions.id
|
||||
), last_seen)
|
||||
WHERE id IN (
|
||||
SELECT id FROM transmissions WHERE last_seen = 0 AND id <= ? LIMIT ?
|
||||
)`, maxID, batch)
|
||||
if err != nil {
|
||||
return processed, err
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
batchNum++
|
||||
processed += n
|
||||
if opts.Progress != nil {
|
||||
opts.Progress(TxLastSeenBackfillProgress{
|
||||
RowsProcessed: processed,
|
||||
RowsTotal: total,
|
||||
BatchNum: batchNum,
|
||||
ElapsedMs: time.Since(start).Milliseconds(),
|
||||
})
|
||||
}
|
||||
if n == 0 {
|
||||
return processed, nil
|
||||
}
|
||||
if yield > 0 {
|
||||
// Use a timer so ctx cancellation interrupts the sleep.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return processed, ctx.Err()
|
||||
case <-time.After(yield):
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,105 @@
|
||||
// Test for issue #1724 — the tx_last_seen backfill MUST chunk its
|
||||
// UPDATE so SQLite readers can make forward progress while the
|
||||
// backfill runs. The original PR #1691 implementation ran a single
|
||||
// correlated UPDATE that pinned the writer 10-15 min on a prod-sized
|
||||
// DB; this test asserts the chunked behavior (≤ batchSize rows per
|
||||
// batch + multiple progress callbacks).
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestIssue1724_TxLastSeenBackfillIsChunked seeds 12k transmissions
|
||||
// with last_seen=0 and runs runTxLastSeenBackfillChunked with a
|
||||
// batchSize of 1000. It asserts:
|
||||
//
|
||||
// 1. The progress callback fires more than once (proving the loop
|
||||
// batches, not single-shots).
|
||||
// 2. Every per-batch RowsProcessed delta is ≤ batchSize+epsilon
|
||||
// (proving each UPDATE is bounded, not full-table).
|
||||
//
|
||||
// Pre-fix (single full-table UPDATE) the callback fires exactly once
|
||||
// with RowsProcessed=12000, failing both assertions on an assertion
|
||||
// (not a build/import error).
|
||||
func TestIssue1724_TxLastSeenBackfillIsChunked(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// The OpenStore-scheduled tx_last_seen_backfill_v1 fires against the
|
||||
// empty DB; wait for it to complete before seeding so the goroutine
|
||||
// doesn't race our INSERTs and consume rows from under the manual
|
||||
// backfill call below.
|
||||
s.WaitForAsyncMigrations()
|
||||
|
||||
const seedN = 12000
|
||||
const batchSize = 1000
|
||||
|
||||
// Seed transmissions with last_seen=0 and one matching observation
|
||||
// each so the correlated MAX(timestamp) subquery returns a non-zero
|
||||
// value (forces RowsAffected to be non-zero).
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
t.Fatalf("begin: %v", err)
|
||||
}
|
||||
insTx, err := tx.Prepare(`INSERT INTO transmissions(raw_hex, hash, first_seen, last_seen) VALUES('00','h'||?, '2024-01-01T00:00:00Z', 0)`)
|
||||
if err != nil {
|
||||
t.Fatalf("prep tx: %v", err)
|
||||
}
|
||||
insObs, err := tx.Prepare(`INSERT INTO observations(transmission_id, observer_idx, timestamp) VALUES(?, 1, ?)`)
|
||||
if err != nil {
|
||||
t.Fatalf("prep obs: %v", err)
|
||||
}
|
||||
for i := 0; i < seedN; i++ {
|
||||
res, err := insTx.Exec(i)
|
||||
if err != nil {
|
||||
t.Fatalf("seed tx %d: %v", i, err)
|
||||
}
|
||||
id, _ := res.LastInsertId()
|
||||
if _, err := insObs.Exec(id, time.Now().Unix()+int64(i)); err != nil {
|
||||
t.Fatalf("seed obs %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
insTx.Close()
|
||||
insObs.Close()
|
||||
if err := tx.Commit(); err != nil {
|
||||
t.Fatalf("commit: %v", err)
|
||||
}
|
||||
|
||||
var snapshots []TxLastSeenBackfillProgress
|
||||
progress := func(p TxLastSeenBackfillProgress) {
|
||||
snapshots = append(snapshots, p)
|
||||
}
|
||||
|
||||
total, err := runTxLastSeenBackfillChunked(ctx, s.db, TxLastSeenBackfillOpts{
|
||||
BatchSize: batchSize,
|
||||
YieldDelay: time.Millisecond,
|
||||
Progress: progress,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("backfill: %v", err)
|
||||
}
|
||||
if total != seedN {
|
||||
t.Fatalf("total rows updated = %d, want %d", total, seedN)
|
||||
}
|
||||
|
||||
// Invariant 1: the loop must batch.
|
||||
if len(snapshots) < 2 {
|
||||
t.Fatalf("progress callback fired %d times; want ≥ 2 (chunked loop should emit one per batch; pre-fix #1724 emits exactly 1 for the full-table UPDATE)",
|
||||
len(snapshots))
|
||||
}
|
||||
|
||||
// Invariant 2: per-batch delta must be bounded by batchSize.
|
||||
var prev int64
|
||||
for i, snap := range snapshots {
|
||||
delta := snap.RowsProcessed - prev
|
||||
if delta > int64(batchSize) {
|
||||
t.Fatalf("snapshot[%d] delta=%d exceeds batchSize=%d; backfill is not chunking (pre-fix #1724 ran one full-table UPDATE)",
|
||||
i, delta, batchSize)
|
||||
}
|
||||
prev = snap.RowsProcessed
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,158 @@
|
||||
// async_migrations.go — server-side reader for the ingestor's
|
||||
// _async_migrations bookkeeping table (#1724).
|
||||
//
|
||||
// The ingestor records every long-running schema/data migration here:
|
||||
//
|
||||
// CREATE TABLE _async_migrations (
|
||||
// name TEXT PRIMARY KEY,
|
||||
// status TEXT NOT NULL, -- pending_async | done | failed
|
||||
// started_at TEXT NOT NULL,
|
||||
// ended_at TEXT,
|
||||
// error TEXT,
|
||||
// rows_processed INTEGER NOT NULL DEFAULT 0, -- #1724
|
||||
// rows_total INTEGER NOT NULL DEFAULT 0, -- #1724
|
||||
// last_update_at TEXT -- #1724
|
||||
// );
|
||||
//
|
||||
// The server reads this table (read-only) so /api/perf and the
|
||||
// /api/healthz warm-up banner can surface mid-flight backfill state.
|
||||
// Without it operators upgrading to v3.9.2 see a black-box pause while
|
||||
// tx_last_seen_backfill_v1 runs and can't tell "still backfilling"
|
||||
// from "real bug" (#1724 root cause).
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// AsyncMigrationInfo is the JSON shape served on /api/perf.asyncMigrations
|
||||
// (and embedded in /api/healthz). Field names are stable.
|
||||
type AsyncMigrationInfo struct {
|
||||
Name string `json:"name"`
|
||||
Status string `json:"status"` // running | complete | failed
|
||||
RowsProcessed int64 `json:"rowsProcessed"`
|
||||
RowsTotal int64 `json:"rowsTotal"`
|
||||
Rate float64 `json:"rate"` // rows / sec
|
||||
EtaSeconds int64 `json:"etaSeconds"` // 0 when complete or total unknown
|
||||
ElapsedSec int64 `json:"elapsedSeconds"` // wall time since started_at
|
||||
ErrorMessage string `json:"errorMessage,omitempty"`
|
||||
}
|
||||
|
||||
// IsRunning reports whether the migration is still in progress.
|
||||
func (a AsyncMigrationInfo) IsRunning() bool { return a.Status == "running" }
|
||||
|
||||
// readAsyncMigrations returns the current state of every async migration
|
||||
// the ingestor has registered. Returns nil when the table doesn't exist
|
||||
// (fresh DB on first server boot before the ingestor ran) — caller must
|
||||
// treat nil as "no migrations to report" rather than an error.
|
||||
func readAsyncMigrations(db *sql.DB) []AsyncMigrationInfo {
|
||||
if db == nil {
|
||||
return nil
|
||||
}
|
||||
// Use a tolerant SELECT — older ingestor builds may not have the
|
||||
// rows_processed columns yet; COALESCE every additive field.
|
||||
rows, err := db.Query(`
|
||||
SELECT name, status,
|
||||
COALESCE(rows_processed, 0),
|
||||
COALESCE(rows_total, 0),
|
||||
started_at,
|
||||
COALESCE(ended_at, ''),
|
||||
COALESCE(error, '')
|
||||
FROM _async_migrations
|
||||
ORDER BY started_at`)
|
||||
if err != nil {
|
||||
// Table missing or columns missing → silent empty.
|
||||
return nil
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []AsyncMigrationInfo
|
||||
now := time.Now()
|
||||
for rows.Next() {
|
||||
var (
|
||||
info AsyncMigrationInfo
|
||||
startedAt string
|
||||
endedAt string
|
||||
errMsg string
|
||||
rawStatus string
|
||||
)
|
||||
if err := rows.Scan(&info.Name, &rawStatus, &info.RowsProcessed, &info.RowsTotal, &startedAt, &endedAt, &errMsg); err != nil {
|
||||
continue
|
||||
}
|
||||
info.Status = mapAsyncStatus(rawStatus)
|
||||
if errMsg != "" {
|
||||
info.ErrorMessage = errMsg
|
||||
}
|
||||
|
||||
started, _ := parseAsyncTime(startedAt)
|
||||
end := now
|
||||
if endedAt != "" {
|
||||
if t, ok := parseAsyncTime(endedAt); ok {
|
||||
end = t
|
||||
}
|
||||
}
|
||||
if !started.IsZero() {
|
||||
elapsed := end.Sub(started).Seconds()
|
||||
if elapsed < 0 {
|
||||
elapsed = 0
|
||||
}
|
||||
info.ElapsedSec = int64(elapsed)
|
||||
if info.IsRunning() && elapsed > 0 && info.RowsProcessed > 0 {
|
||||
info.Rate = float64(info.RowsProcessed) / elapsed
|
||||
if info.RowsTotal > info.RowsProcessed && info.Rate > 0 {
|
||||
info.EtaSeconds = int64(float64(info.RowsTotal-info.RowsProcessed) / info.Rate)
|
||||
}
|
||||
}
|
||||
}
|
||||
out = append(out, info)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// mapAsyncStatus normalizes the bookkeeping `status` column to the
|
||||
// API-stable vocabulary {running, complete, failed}.
|
||||
func mapAsyncStatus(raw string) string {
|
||||
switch strings.ToLower(strings.TrimSpace(raw)) {
|
||||
case "done":
|
||||
return "complete"
|
||||
case "failed":
|
||||
return "failed"
|
||||
default:
|
||||
return "running"
|
||||
}
|
||||
}
|
||||
|
||||
// parseAsyncTime accepts both the SQLite default datetime('now')
|
||||
// format ("2006-01-02 15:04:05") and ISO-8601, returning the parsed
|
||||
// UTC time and ok=true on success.
|
||||
func parseAsyncTime(s string) (time.Time, bool) {
|
||||
if s == "" {
|
||||
return time.Time{}, false
|
||||
}
|
||||
for _, layout := range []string{
|
||||
"2006-01-02 15:04:05",
|
||||
time.RFC3339,
|
||||
time.RFC3339Nano,
|
||||
} {
|
||||
if t, err := time.Parse(layout, s); err == nil {
|
||||
return t.UTC(), true
|
||||
}
|
||||
}
|
||||
return time.Time{}, false
|
||||
}
|
||||
|
||||
// anyAsyncMigrationRunning returns true when any registered async
|
||||
// migration is still in progress. Used to suppress
|
||||
// backgroundLoadComplete=true on /api/healthz while a backfill is
|
||||
// active (#1724 acceptance criterion #4 — warm-up banner must stay
|
||||
// up + analytics 503 must keep returning Retry-After).
|
||||
func anyAsyncMigrationRunning(migrations []AsyncMigrationInfo) bool {
|
||||
for _, m := range migrations {
|
||||
if m.IsRunning() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -0,0 +1,104 @@
|
||||
// Tests for the server-side reader of _async_migrations (#1724).
|
||||
//
|
||||
// These pin the contract that the warm-up banner + /api/perf depend on:
|
||||
// running migrations are surfaced with non-zero rate / ETA when the
|
||||
// snapshot has enough info; completed migrations report status="complete"
|
||||
// with no remaining work.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
func newAsyncMigrationsTestDB(t *testing.T) *sql.DB {
|
||||
t.Helper()
|
||||
db, err := sql.Open("sqlite", ":memory:")
|
||||
if err != nil {
|
||||
t.Fatalf("sql.Open: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { db.Close() })
|
||||
// PREFLIGHT: async=true reason="in-memory test DB fixture — not a prod migration; mirrors the ingestor's _async_migrations bookkeeping table shape"
|
||||
if _, err := db.Exec(`
|
||||
CREATE TABLE _async_migrations (
|
||||
name TEXT PRIMARY KEY,
|
||||
status TEXT NOT NULL,
|
||||
started_at TEXT NOT NULL,
|
||||
ended_at TEXT,
|
||||
error TEXT,
|
||||
rows_processed INTEGER NOT NULL DEFAULT 0,
|
||||
rows_total INTEGER NOT NULL DEFAULT 0,
|
||||
last_update_at TEXT
|
||||
)`); err != nil {
|
||||
t.Fatalf("schema: %v", err)
|
||||
}
|
||||
return db
|
||||
}
|
||||
|
||||
func TestReadAsyncMigrations_RunningHasRateAndEta(t *testing.T) {
|
||||
db := newAsyncMigrationsTestDB(t)
|
||||
startedAt := time.Now().UTC().Add(-10 * time.Second).Format("2006-01-02 15:04:05")
|
||||
if _, err := db.Exec(`INSERT INTO _async_migrations
|
||||
(name, status, started_at, rows_processed, rows_total)
|
||||
VALUES (?, 'pending_async', ?, 5000, 50000)`, "tx_last_seen_backfill_v1", startedAt); err != nil {
|
||||
t.Fatalf("seed: %v", err)
|
||||
}
|
||||
|
||||
got := readAsyncMigrations(db)
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("got %d migrations, want 1", len(got))
|
||||
}
|
||||
m := got[0]
|
||||
if m.Status != "running" {
|
||||
t.Fatalf("status = %q, want running", m.Status)
|
||||
}
|
||||
if m.RowsProcessed != 5000 || m.RowsTotal != 50000 {
|
||||
t.Fatalf("rows: processed=%d total=%d", m.RowsProcessed, m.RowsTotal)
|
||||
}
|
||||
if m.Rate <= 0 {
|
||||
t.Fatalf("rate should be >0 for running migration with elapsed time, got %v", m.Rate)
|
||||
}
|
||||
if m.EtaSeconds <= 0 {
|
||||
t.Fatalf("eta should be >0 when rows_total > rows_processed, got %d", m.EtaSeconds)
|
||||
}
|
||||
if !anyAsyncMigrationRunning(got) {
|
||||
t.Fatal("anyAsyncMigrationRunning should be true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadAsyncMigrations_DoneHasCompleteStatus(t *testing.T) {
|
||||
db := newAsyncMigrationsTestDB(t)
|
||||
startedAt := time.Now().UTC().Add(-30 * time.Second).Format("2006-01-02 15:04:05")
|
||||
endedAt := time.Now().UTC().Add(-1 * time.Second).Format("2006-01-02 15:04:05")
|
||||
if _, err := db.Exec(`INSERT INTO _async_migrations
|
||||
(name, status, started_at, ended_at, rows_processed, rows_total)
|
||||
VALUES (?, 'done', ?, ?, 12345, 12345)`,
|
||||
"tx_last_seen_backfill_v1", startedAt, endedAt); err != nil {
|
||||
t.Fatalf("seed: %v", err)
|
||||
}
|
||||
|
||||
got := readAsyncMigrations(db)
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("got %d", len(got))
|
||||
}
|
||||
m := got[0]
|
||||
if m.Status != "complete" {
|
||||
t.Fatalf("status = %q, want complete (done → complete mapping)", m.Status)
|
||||
}
|
||||
if anyAsyncMigrationRunning(got) {
|
||||
t.Fatal("anyAsyncMigrationRunning should be false for done migration")
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadAsyncMigrations_TableMissingReturnsNil(t *testing.T) {
|
||||
db, _ := sql.Open("sqlite", ":memory:")
|
||||
defer db.Close()
|
||||
got := readAsyncMigrations(db)
|
||||
if got != nil {
|
||||
t.Fatalf("expected nil when table missing, got %+v", got)
|
||||
}
|
||||
}
|
||||
@@ -52,6 +52,20 @@ func (s *Server) handleHealthz(w http.ResponseWriter, r *http.Request) {
|
||||
"done": bfDone,
|
||||
},
|
||||
}
|
||||
// #1724: surface ingestor-side async migration progress so the
|
||||
// warm-up banner (#1660) keeps showing while tx_last_seen_backfill_v1
|
||||
// runs and operators don't see "ready" while the writer is still
|
||||
// pinned. Embedded as a top-level field; banner JS treats any
|
||||
// running migration as a reason to keep the banner up.
|
||||
var asyncMigrations []AsyncMigrationInfo
|
||||
if s.db != nil {
|
||||
asyncMigrations = readAsyncMigrations(s.db.conn)
|
||||
}
|
||||
asyncRunning := anyAsyncMigrationRunning(asyncMigrations)
|
||||
if asyncMigrations != nil {
|
||||
resp["async_migrations"] = asyncMigrations
|
||||
}
|
||||
resp["async_migrations_running"] = asyncRunning
|
||||
// PR #1609 M1: surface per-MQTT-source receipt vs write-path
|
||||
// liveness so operators can distinguish "broker alive, write
|
||||
// path stuck" (lastReceiptUnix recent, lastMessageUnix stale)
|
||||
|
||||
@@ -904,6 +904,15 @@ func (s *Server) handlePerf(w http.ResponseWriter, r *http.Request) {
|
||||
sqliteStats = &ss
|
||||
}
|
||||
|
||||
// #1724: expose ingestor-side async migration progress so /api/perf
|
||||
// surfaces backfill state to operators in real time. Read-only DB
|
||||
// query against _async_migrations; returns nil on missing table
|
||||
// (legacy DB / no migrations yet).
|
||||
var asyncMigrations []AsyncMigrationInfo
|
||||
if s.db != nil {
|
||||
asyncMigrations = readAsyncMigrations(s.db.conn)
|
||||
}
|
||||
|
||||
writeJSON(w, PerfResponse{
|
||||
Uptime: uptimeSec,
|
||||
TotalRequests: totalRequests,
|
||||
@@ -913,6 +922,7 @@ func (s *Server) handlePerf(w http.ResponseWriter, r *http.Request) {
|
||||
Cache: perfCS,
|
||||
PacketStore: pktStoreStats,
|
||||
Sqlite: sqliteStats,
|
||||
AsyncMigrations: asyncMigrations,
|
||||
GoRuntime: func() *GoRuntimeStats {
|
||||
ms := s.getMemStats()
|
||||
return &GoRuntimeStats{
|
||||
|
||||
@@ -270,6 +270,11 @@ type PerfResponse struct {
|
||||
PacketStore *PerfPacketStoreStats `json:"packetStore"`
|
||||
Sqlite *SqliteStats `json:"sqlite"`
|
||||
GoRuntime *GoRuntimeStats `json:"goRuntime,omitempty"`
|
||||
// AsyncMigrations surfaces ingestor-side async migration progress so
|
||||
// operators can distinguish "backfill running" from "real bug" while
|
||||
// the warm-up banner is up (#1724). Empty slice when no migrations
|
||||
// have been registered (fresh DB / pre-v3.9.2 ingestor).
|
||||
AsyncMigrations []AsyncMigrationInfo `json:"asyncMigrations"`
|
||||
}
|
||||
|
||||
// GoRuntimeStats holds Go runtime metrics for the perf endpoint.
|
||||
|
||||
+20
-1
@@ -54,6 +54,23 @@
|
||||
' / ' + fmtNum(total) + ' (' + pct + '%)');
|
||||
}
|
||||
|
||||
// #1724: surface ingestor-side async migrations (e.g.
|
||||
// tx_last_seen_backfill_v1). Each running migration gets its own
|
||||
// human-readable line with rows-processed / rows-total + ETA.
|
||||
var asyncMigs = Array.isArray(h.async_migrations) ? h.async_migrations : [];
|
||||
for (var ai = 0; ai < asyncMigs.length; ai++) {
|
||||
var m = asyncMigs[ai] || {};
|
||||
if (m.status !== 'running') continue;
|
||||
var mProcessed = Number(m.rowsProcessed) || 0;
|
||||
var mTotal = Number(m.rowsTotal) || 0;
|
||||
var mRawPct = mTotal > 0 ? Math.floor((mProcessed / mTotal) * 100) : 0;
|
||||
var mPct = Math.max(0, Math.min(100, mRawPct));
|
||||
var eta = Number(m.etaSeconds) || 0;
|
||||
var etaStr = eta > 0 ? ' \u2014 ~' + Math.ceil(eta) + 's remaining' : '';
|
||||
msgs.push('Running migration ' + (m.name || '(unknown)') + ': ' +
|
||||
fmtNum(mProcessed) + ' / ' + fmtNum(mTotal) + ' (' + mPct + '%)' + etaStr);
|
||||
}
|
||||
|
||||
var liveness = h.ingest_liveness || {};
|
||||
var srcs = Object.keys(liveness).sort();
|
||||
for (var i = 0; i < srcs.length; i++) {
|
||||
@@ -76,7 +93,8 @@
|
||||
}
|
||||
|
||||
/**
|
||||
* Steady-state predicate: ready=true AND from_pubkey_backfill.done=true.
|
||||
* Steady-state predicate: ready=true AND from_pubkey_backfill.done=true
|
||||
* AND no async migrations running (#1724).
|
||||
* Once true, banner is dismissed and polling is torn down.
|
||||
*/
|
||||
function isSteadyState(healthz) {
|
||||
@@ -84,6 +102,7 @@
|
||||
if (healthz.ready !== true) return false;
|
||||
var bf = healthz.from_pubkey_backfill;
|
||||
if (bf && bf.done === false) return false;
|
||||
if (healthz.async_migrations_running === true) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user