diff --git a/public/perf.js b/public/perf.js
index 611b9a32..ce3d1db6 100644
--- a/public/perf.js
+++ b/public/perf.js
@@ -3,6 +3,75 @@
var GH = 'https://github.com/Kpa-clawbot/corescope';
+// detectPerfAnomalies — pure, testable.
+// Computes per-component write rates over a rolling time window and flags any
+// component whose current per-second rate exceeds `factor` × its rolling
+// baseline rate. Issue #1120 acceptance: 5-minute window, 10× threshold.
+//
+// Inputs:
+// history: ordered array of snapshots [{ sampleAt: ISO, sources: { name: cum } }]
+// current: the freshest snapshot, same shape
+// opts:
+// windowMs (default 5*60*1000) — rolling baseline window
+// factor (default 10) — rate-multiplier threshold
+// minHistorySec (default 30) — refuse to flag until baseline is stable
+//
+// Returns: { rates, baselineRates, flags } — all keyed by source name.
+function detectPerfAnomalies(history, current, opts) {
+ opts = opts || {};
+ const windowMs = opts.windowMs || (5 * 60 * 1000);
+ const factor = opts.factor || 10;
+ const minHistorySec = opts.minHistorySec != null ? opts.minHistorySec : 30;
+ const out = { rates: {}, baselineRates: {}, flags: {} };
+ if (!current || !current.sources || !history || history.length === 0) return out;
+ const curT = Date.parse(current.sampleAt);
+ if (!isFinite(curT)) return out;
+
+ // Find the most recent prior sample (for the *current* per-second rate)
+ // and the oldest sample within the window (for the baseline).
+ const prior = history[history.length - 1];
+ const priorT = Date.parse(prior.sampleAt);
+ const curDt = (curT - priorT) / 1000;
+ if (!(curDt > 0)) return out;
+
+ // Baseline: oldest sample within window vs. prior (the snapshot just before
+ // `current`). Anything older than windowMs is excluded.
+ const cutoff = curT - windowMs;
+ let baseIdx = 0;
+ for (let i = history.length - 1; i >= 0; i--) {
+ if (Date.parse(history[i].sampleAt) < cutoff) { baseIdx = i + 1; break; }
+ }
+ if (baseIdx >= history.length) baseIdx = history.length - 1;
+ const baseSnap = history[baseIdx];
+ const baseT = Date.parse(baseSnap.sampleAt);
+ const baseDt = (priorT - baseT) / 1000;
+
+ // Compute rates for every source seen in current.
+ for (const k of Object.keys(current.sources)) {
+ const cur = current.sources[k] || 0;
+ const prev = (prior.sources && prior.sources[k]) || 0;
+ const rate = (cur - prev) / curDt;
+ out.rates[k] = rate;
+ if (baseDt <= 0 || baseDt < minHistorySec) {
+ out.baselineRates[k] = null;
+ continue;
+ }
+ const baseStart = (baseSnap.sources && baseSnap.sources[k]) || 0;
+ const baseEnd = prev; // baseline window = [baseSnap .. prior]
+ const baseRate = (baseEnd - baseStart) / baseDt;
+ out.baselineRates[k] = baseRate;
+ // Guard floor to avoid 0-baseline → infinite ratio false positives.
+ const floor = 0.05; // 1 event per 20s minimum baseline
+ if (rate > factor * Math.max(baseRate, floor) && rate > factor * floor) {
+ out.flags[k] = true;
+ }
+ }
+ return out;
+}
+if (typeof window !== 'undefined') {
+ window.detectPerfAnomalies = detectPerfAnomalies;
+}
+
function renderVersionCard(health) {
if (!health || (!health.version && !health.commit)) return '';
var ver = health.version && health.version !== 'unknown' ? health.version : null;
@@ -127,48 +196,33 @@ function renderVersionCard(health) {
if (keys.length === 0) {
html += '
No ingestor stats yet (waiting for /tmp/corescope-ingestor-stats.json)
';
} else {
- // Anomaly detection (#1123 polish):
- // Compare PER-SECOND DELTA RATES, not cumulative counts.
- // Cumulative-vs-cumulative was a tautology that fired ⚠️ at startup
- // (any backfill_* > 10 when tx_inserted=0 → baseline collapses to 1)
- // and false-cleared once tx grew past a one-shot backfill burst.
- // Now we cache the previous snapshot + sampleAt and only fire when:
- // 1) we have a real interval (≥ 0.5s) to compute deltas against
- // 2) tx_inserted has crossed MIN_SAMPLE so the baseline is meaningful
- // 3) the per-second backfill rate exceeds 10× the per-second tx rate
- const MIN_SAMPLE = 100;
- const prev = window._perfWriteSourcesPrev;
- let prevSrc = null, dtSec = 0;
- if (prev && prev.sampleAt && writeSources.sampleAt) {
- dtSec = (Date.parse(writeSources.sampleAt) - Date.parse(prev.sampleAt)) / 1000;
- if (dtSec >= 0.5) prevSrc = prev.sources;
- }
- const txTotal = src.tx_inserted || 0;
- const txDelta = prevSrc ? (txTotal - (prevSrc.tx_inserted || 0)) : 0;
- const txRate = (prevSrc && dtSec > 0) ? (txDelta / dtSec) : 0;
- html += '| Source | Total | Rate/s | Anomaly |
';
+ // Anomaly detection (#1120 acceptance): flag any component whose
+ // per-second write rate exceeds 10× its 5-minute rolling baseline.
+ // History is stashed on window so the detector has multi-sample
+ // context across the 5s refresh tick.
+ if (!window._perfWriteSourcesHistory) window._perfWriteSourcesHistory = [];
+ const history = window._perfWriteSourcesHistory;
+ const current = { sampleAt: writeSources.sampleAt || new Date().toISOString(), sources: { ...src } };
+ const anom = detectPerfAnomalies(history, current, { windowMs: 5 * 60 * 1000, factor: 10 });
+ // Append current and prune anything older than 6 minutes (keeps a
+ // little headroom past the 5-min window, bounded memory).
+ history.push(current);
+ const cutoff = Date.parse(current.sampleAt) - (6 * 60 * 1000);
+ while (history.length > 1 && Date.parse(history[0].sampleAt) < cutoff) history.shift();
+
+ html += '| Source | Total | Rate/s | Baseline/s | Anomaly |
';
for (const k of keys) {
const v = src[k] || 0;
- const isBackfill = k.startsWith('backfill_');
- let rate = 0;
- let flag = '';
- if (prevSrc && dtSec > 0) {
- const delta = v - (prevSrc[k] || 0);
- rate = delta / dtSec;
- // Only flag when tx baseline is statistically meaningful AND
- // backfill is actively running faster than 10× the live tx rate.
- if (isBackfill && txTotal >= MIN_SAMPLE && rate > 10 * Math.max(txRate, 1)) {
- flag = ' ⚠️';
- }
- }
- const rateStr = (prevSrc && dtSec > 0) ? rate.toFixed(1) : '—';
- html += `${k} | ${v.toLocaleString()} | ${rateStr} | ${flag} |
`;
+ const rate = anom.rates[k];
+ const base = anom.baselineRates[k];
+ const flag = anom.flags[k] ? ' ⚠️' : '';
+ const rateStr = (rate != null && isFinite(rate)) ? rate.toFixed(2) : '—';
+ const baseStr = (base != null && isFinite(base)) ? base.toFixed(2) : '—';
+ html += `${k} | ${v.toLocaleString()} | ${rateStr} | ${baseStr} | ${flag} |
`;
}
html += '
';
- // Stash for next tick's delta computation.
- window._perfWriteSourcesPrev = { sources: { ...src }, sampleAt: writeSources.sampleAt };
if (writeSources.sampleAt) {
- html += `Sampled: ${writeSources.sampleAt}
`;
+ html += `Sampled: ${writeSources.sampleAt} · baseline window: 5 min · threshold: 10×
`;
}
}
}
diff --git a/test-perf-anomaly.js b/test-perf-anomaly.js
new file mode 100644
index 00000000..c9710e51
--- /dev/null
+++ b/test-perf-anomaly.js
@@ -0,0 +1,107 @@
+/* Unit tests for perf.js anomaly detection — 5-minute rolling baseline.
+ *
+ * Issue #1120 acceptance criterion: "Per-component write rate > 10× steady-state
+ * baseline" flagged with ⚠️. The baseline must be a 5-minute rolling window,
+ * not a single sample-to-sample comparison (which gives false negatives during
+ * a slow ramp and false positives during natural bursts).
+ *
+ * This file exercises window.detectPerfAnomalies(history, current, opts).
+ */
+'use strict';
+const vm = require('vm');
+const fs = require('fs');
+
+const code = fs.readFileSync('public/perf.js', 'utf8');
+const ctx = {
+ window: {},
+ document: { addEventListener() {}, getElementById() { return null; }, hidden: true },
+ console,
+ fetch: () => Promise.resolve({ json: () => Promise.resolve(null) }),
+ setInterval: () => 0,
+ clearInterval: () => {},
+ registerPage: () => {},
+};
+vm.createContext(ctx);
+vm.runInContext(code, ctx);
+
+const detect = ctx.window.detectPerfAnomalies;
+if (typeof detect !== 'function') {
+ console.log('FAIL: window.detectPerfAnomalies is not a function (got ' + typeof detect + ')');
+ process.exit(1);
+}
+
+let pass = 0, fail = 0;
+function test(name, fn) {
+ try { fn(); pass++; console.log(' ✅ ' + name); }
+ catch (e) { fail++; console.log(' ❌ ' + name + ': ' + e.message); }
+}
+function assert(cond, msg) { if (!cond) throw new Error(msg || 'assertion failed'); }
+
+// Build a 5-minute history where backfill_path_json increments at a steady
+// 1/sec baseline (300 samples over 300s), tx_inserted at 5/sec.
+function buildHistory(startMs, durSec, perSec) {
+ const h = [];
+ let cum = {};
+ for (const k of Object.keys(perSec)) cum[k] = 0;
+ for (let i = 0; i <= durSec; i++) {
+ const ts = new Date(startMs + i * 1000).toISOString();
+ const snap = { sampleAt: ts, sources: {} };
+ for (const k of Object.keys(perSec)) {
+ cum[k] += perSec[k];
+ snap.sources[k] = cum[k];
+ }
+ h.push(snap);
+ }
+ return h;
+}
+
+test('⚠️ fires when backfill rate hits 11× the 5-minute baseline', () => {
+ const t0 = Date.UTC(2026, 5, 5, 0, 0, 0);
+ const history = buildHistory(t0, 300, { backfill_path_json: 1, tx_inserted: 5 });
+ // Now a fresh sample at t0+301s where backfill_path_json jumped from 300→311
+ // (11/sec over 1s), tx_inserted continues at 5/sec.
+ const last = history[history.length - 1];
+ const current = {
+ sampleAt: new Date(t0 + 301 * 1000).toISOString(),
+ sources: {
+ backfill_path_json: last.sources.backfill_path_json + 11,
+ tx_inserted: last.sources.tx_inserted + 5,
+ },
+ };
+ const r = detect(history, current, { windowMs: 5 * 60 * 1000, factor: 10 });
+ assert(r && r.flags, 'expected result with flags map');
+ assert(r.flags.backfill_path_json === true,
+ 'expected backfill_path_json flagged at 11× baseline, got flags=' + JSON.stringify(r.flags) +
+ ' rates=' + JSON.stringify(r.rates) + ' baselines=' + JSON.stringify(r.baselineRates));
+});
+
+test('no flag at 5× baseline (under threshold)', () => {
+ const t0 = Date.UTC(2026, 5, 5, 0, 0, 0);
+ const history = buildHistory(t0, 300, { backfill_path_json: 2, tx_inserted: 5 });
+ const last = history[history.length - 1];
+ const current = {
+ sampleAt: new Date(t0 + 301 * 1000).toISOString(),
+ sources: {
+ backfill_path_json: last.sources.backfill_path_json + 10, // 10/sec vs 2/sec baseline = 5×
+ tx_inserted: last.sources.tx_inserted + 5,
+ },
+ };
+ const r = detect(history, current, { windowMs: 5 * 60 * 1000, factor: 10 });
+ assert(!r.flags.backfill_path_json,
+ 'expected no flag at 5× baseline, got ' + JSON.stringify(r.flags));
+});
+
+test('no flag without enough history (< 30s of samples)', () => {
+ const t0 = Date.UTC(2026, 5, 5, 0, 0, 0);
+ const history = buildHistory(t0, 5, { backfill_path_json: 1 });
+ const last = history[history.length - 1];
+ const current = {
+ sampleAt: new Date(t0 + 6 * 1000).toISOString(),
+ sources: { backfill_path_json: last.sources.backfill_path_json + 100 },
+ };
+ const r = detect(history, current, { windowMs: 5 * 60 * 1000, factor: 10, minHistorySec: 30 });
+ assert(!r.flags.backfill_path_json, 'expected no flag with insufficient history');
+});
+
+console.log('\n' + pass + ' passed, ' + fail + ' failed');
+process.exit(fail === 0 ? 0 : 1);