diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 160f67dc..71ffffc2 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -26,6 +26,38 @@ type DBStats struct { WriteErrors atomic.Int64 SignatureDrops atomic.Int64 GroupCommitFlushes atomic.Int64 + // WALCommits tracks every successful tx.Commit() that may have flushed + // WAL pages. Incremented by group-commit flushes and any other Commit(). + WALCommits atomic.Int64 + // BackfillUpdates tracks per-named-backfill row write counts so an + // infinite-loop backfill (cf #1119) is obvious from the perf page. + BackfillUpdates sync.Map // name (string) -> *atomic.Int64 +} + +// IncBackfill increments the backfill counter for the given name, allocating +// the counter on first use. +func (s *DBStats) IncBackfill(name string) { + v, ok := s.BackfillUpdates.Load(name) + if !ok { + nc := new(atomic.Int64) + actual, loaded := s.BackfillUpdates.LoadOrStore(name, nc) + if loaded { + v = actual + } else { + v = nc + } + } + v.(*atomic.Int64).Add(1) +} + +// SnapshotBackfills returns a name->count copy of all backfill counters. +func (s *DBStats) SnapshotBackfills() map[string]int64 { + out := make(map[string]int64) + s.BackfillUpdates.Range(func(k, v interface{}) bool { + out[k.(string)] = v.(*atomic.Int64).Load() + return true + }) + return out } // SetGroupCommit configures group-commit batching for InsertTransmission. @@ -75,6 +107,7 @@ func (s *Store) flushLocked() error { s.Stats.WriteErrors.Add(1) return fmt.Errorf("group commit: %w", err) } + s.Stats.WALCommits.Add(1) return nil } @@ -792,6 +825,11 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) { log.Printf("[db] group commit (max-rows) flush failed: %v", err) } } + } else { + // Non-group-commit mode: each prepared-stmt Exec auto-commits. + // Count one WAL commit per successful InsertTransmission so the + // perf page sees commit pressure even without group-commit batching. 
+ s.Stats.WALCommits.Add(1) } return isNew, nil @@ -1122,6 +1160,8 @@ func (s *Store) BackfillPathJSONAsync() { if err != nil || len(hops) == 0 { if _, execErr := s.db.Exec(`UPDATE observations SET path_json = '[]' WHERE id = ?`, r.id); execErr != nil { log.Printf("[backfill] write error (id=%d): %v", r.id, execErr) + } else { + s.Stats.IncBackfill("path_json") } continue } @@ -1130,6 +1170,7 @@ func (s *Store) BackfillPathJSONAsync() { log.Printf("[backfill] write error (id=%d): %v", r.id, execErr) } else { updated++ + s.Stats.IncBackfill("path_json") } } batchNum++ diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 24548220..18ff8c3a 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -137,6 +137,10 @@ func main() { } }() + // Per-second stats file writer for the server's /api/perf/write-sources + // endpoint (#1120). Best-effort; never fatal. + StartStatsFileWriter(store, time.Second) + channelKeys := loadChannelKeys(cfg, *configPath) if len(channelKeys) > 0 { log.Printf("Loaded %d channel keys for GRP_TXT decryption", len(channelKeys)) diff --git a/cmd/ingestor/stats_file.go b/cmd/ingestor/stats_file.go new file mode 100644 index 00000000..715a19e5 --- /dev/null +++ b/cmd/ingestor/stats_file.go @@ -0,0 +1,111 @@ +package main + +import ( + "encoding/json" + "log" + "os" + "syscall" + "time" +) + +// IngestorStatsSnapshot mirrors the JSON shape consumed by the server's +// /api/perf/write-sources endpoint (see cmd/server/perf_io.go IngestorStats). +// +// NOTE: each field below is sampled with an independent atomic.Load(), so the +// snapshot is EVENTUALLY-CONSISTENT — invariants like +// `walCommits >= tx_inserted + groupCommitFlushes` may be momentarily violated +// in a single sample. Consumers MUST NOT derive ratios on the assumption these +// counters were captured at the same instant; treat each field as an +// independent monotonically-increasing counter and look at deltas across +// multiple samples instead. 
+type IngestorStatsSnapshot struct {
+	SampledAt          string           `json:"sampledAt"`
+	TxInserted         int64            `json:"tx_inserted"`
+	ObsInserted        int64            `json:"obs_inserted"`
+	DuplicateTx        int64            `json:"tx_dupes"`
+	NodeUpserts        int64            `json:"node_upserts"`
+	ObserverUpserts    int64            `json:"observer_upserts"`
+	WriteErrors        int64            `json:"write_errors"`
+	SignatureDrops     int64            `json:"sig_drops"`
+	WALCommits         int64            `json:"walCommits"`
+	GroupCommitFlushes int64            `json:"groupCommitFlushes"`
+	BackfillUpdates    map[string]int64 `json:"backfillUpdates"`
+}
+
+// statsFilePath returns the writable path the ingestor will publish stats to.
+// Override via env CORESCOPE_INGESTOR_STATS for tests / non-default deploys.
+//
+// SECURITY: the default lives in /tmp which is world-writable. The writer
+// creates its tmp file with O_EXCL|O_NOFOLLOW and mode 0o600, so neither a
+// pre-planted symlink nor a pre-planted file at the tmp path is ever written
+// through. Operators who want stronger guarantees should point
+// CORESCOPE_INGESTOR_STATS at a private directory (e.g. /var/lib/corescope/).
+func statsFilePath() string {
+	if p := os.Getenv("CORESCOPE_INGESTOR_STATS"); p != "" {
+		return p
+	}
+	return "/tmp/corescope-ingestor-stats.json"
+}
+
+// writeStatsAtomic writes b to path via a tmp-then-rename, so the published
+// file is replaced in one step rather than rewritten in place.
+//
+// The tmp file (path + ".tmp") is always created fresh: any leftover entry is
+// unlinked first and the open uses O_EXCL, so a pre-existing symlink, regular
+// file or FIFO at the tmp path is never opened. Returns nil on success. On a
+// failure after the tmp file was created, the tmp file is removed and the
+// error is returned.
+func writeStatsAtomic(path string, b []byte) error {
+	tmp := path + ".tmp"
+	// Clear whatever a crashed run (or another user) left at the tmp path.
+	// Remove unlinks the directory entry itself; it does not follow symlinks.
+	// If the entry cannot be removed we stop here rather than write into it.
+	if err := os.Remove(tmp); err != nil && !os.IsNotExist(err) {
+		return err
+	}
+	// O_EXCL: fail with EEXIST if anything re-appeared at tmp between the
+	// Remove above and this open, instead of opening a file we did not create.
+	// O_NOFOLLOW is kept as a second guard against symlinks. 0o600 — no need
+	// for world-readable.
+	f, err := os.OpenFile(tmp, os.O_CREATE|os.O_EXCL|os.O_WRONLY|syscall.O_NOFOLLOW, 0o600)
+	if err != nil {
+		return err
+	}
+	if _, err := f.Write(b); err != nil {
+		f.Close()
+		os.Remove(tmp)
+		return err
+	}
+	if err := f.Close(); err != nil {
+		os.Remove(tmp)
+		return err
+	}
+	if err := os.Rename(tmp, path); err != nil {
+		os.Remove(tmp)
+		return err
+	}
+	return nil
+}
+
+// StartStatsFileWriter starts a goroutine that writes the current stats
+// snapshot to statsFilePath() on every tick of `interval`, so the server can
+// serve it at /api/perf/write-sources. A non-positive interval falls back to
+// one second. The first write happens after the first tick. Marshal and write
+// failures are logged for that tick and never fatal. The goroutine has no
+// stop signal and runs until the process exits.
+func StartStatsFileWriter(s *Store, interval time.Duration) {
+	if interval <= 0 {
+		interval = time.Second
+	}
+	go func() {
+		t := time.NewTicker(interval)
+		defer t.Stop()
+		path := statsFilePath()
+		for range t.C {
+			// Each counter is loaded independently; the snapshot is not a
+			// single consistent point in time.
+			snap := IngestorStatsSnapshot{
+				SampledAt:          time.Now().UTC().Format(time.RFC3339),
+				TxInserted:         s.Stats.TransmissionsInserted.Load(),
+				ObsInserted:        s.Stats.ObservationsInserted.Load(),
+				DuplicateTx:        s.Stats.DuplicateTransmissions.Load(),
+				NodeUpserts:        s.Stats.NodeUpserts.Load(),
+				ObserverUpserts:    s.Stats.ObserverUpserts.Load(),
+				WriteErrors:        s.Stats.WriteErrors.Load(),
+				SignatureDrops:     s.Stats.SignatureDrops.Load(),
+				WALCommits:         s.Stats.WALCommits.Load(),
+				GroupCommitFlushes: s.Stats.GroupCommitFlushes.Load(),
+				BackfillUpdates:    s.Stats.SnapshotBackfills(),
+			}
+			b, err := json.Marshal(snap)
+			if err != nil {
+				log.Printf("[stats-file] marshal: %v", err)
+				continue
+			}
+			if err := writeStatsAtomic(path, b); err != nil {
+				log.Printf("[stats-file] write %s: %v", path, err)
+			}
+		}
+	}()
+}
diff --git a/cmd/server/perf_io.go b/cmd/server/perf_io.go
new file mode 100644
index 00000000..78a95148
--- /dev/null
+++ b/cmd/server/perf_io.go
@@ -0,0 +1,200 @@
+package main
+
+import (
+	"bufio"
+	"encoding/json"
+	"net/http"
+	"os"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+// PerfIOResponse holds per-process disk I/O metrics derived from /proc/self/io.
+type PerfIOResponse struct {
+	ReadBytesPerSec  float64 `json:"readBytesPerSec"`
+	WriteBytesPerSec float64 `json:"writeBytesPerSec"`
+	SyscallsRead     float64 `json:"syscallsRead"`
+	SyscallsWrite    float64 `json:"syscallsWrite"`
+}
+
+// PerfSqliteResponse holds SQLite-specific perf metrics.
+type PerfSqliteResponse struct {
+	WalSizeMB    float64 `json:"walSizeMB"`
+	WalSize      int64   `json:"walSize"`
+	PageCount    int64   `json:"pageCount"`
+	PageSize     int64   `json:"pageSize"`
+	CacheSize    int64   `json:"cacheSize"`
+	CacheHitRate float64 `json:"cacheHitRate"`
+}
+
+// procIOSample is a snapshot of /proc/self/io counters.
+type procIOSample struct {
+	at         time.Time // when the sample was taken; set even if the read failed
+	readBytes  int64     // "read_bytes" line
+	writeBytes int64     // "write_bytes" line
+	syscR      int64     // "syscr" line
+	syscW      int64     // "syscw" line
+}
+
+// perfIOLastSample holds the previous sample so handlePerfIO can compute
+// deltas; perfIOMu guards it.
+var (
+	perfIOMu         sync.Mutex
+	perfIOLastSample procIOSample
+)
+
+// readProcIO parses /proc/self/io into a procIOSample.
+//
+// The returned sample always carries the current time in `at`. When the file
+// cannot be opened (e.g. non-Linux) or a read error interrupts the scan, all
+// counters are zero. Lines that are not "key: integer" are skipped, and keys
+// other than read_bytes, write_bytes, syscr and syscw are ignored.
+func readProcIO() procIOSample {
+	s := procIOSample{at: time.Now()}
+	f, err := os.Open("/proc/self/io")
+	if err != nil {
+		return s
+	}
+	defer f.Close()
+	sc := bufio.NewScanner(f)
+	for sc.Scan() {
+		key, raw, found := strings.Cut(sc.Text(), ":")
+		if !found {
+			continue
+		}
+		val, err := strconv.ParseInt(strings.TrimSpace(raw), 10, 64)
+		if err != nil {
+			continue
+		}
+		switch strings.TrimSpace(key) {
+		case "read_bytes":
+			s.readBytes = val
+		case "write_bytes":
+			s.writeBytes = val
+		case "syscr":
+			s.syscR = val
+		case "syscw":
+			s.syscW = val
+		}
+	}
+	if sc.Err() != nil {
+		// A read error mid-file would leave a mix of fresh and zero counters.
+		// Report the same all-zero sample as an open failure instead.
+		return procIOSample{at: s.at}
+	}
+	return s
+}
+
+// handlePerfIO returns delta-rate disk I/O for the server process (per-second).
+// On the first call (no prior sample), rates are zero; subsequent calls
+// report the delta divided by elapsed seconds.
+func (s *Server) handlePerfIO(w http.ResponseWriter, r *http.Request) {
+	resp := PerfIOResponse{}
+
+	// Sample while holding the lock so concurrent requests store and consume
+	// samples in timestamp order. Sampling before taking the lock lets a
+	// request pair an older "current" sample with a newer "previous" one.
+	perfIOMu.Lock()
+	cur := readProcIO()
+	prev := perfIOLastSample
+	perfIOLastSample = cur
+	perfIOMu.Unlock()
+
+	if !prev.at.IsZero() {
+		dt := cur.at.Sub(prev.at).Seconds()
+		if dt < 0.001 {
+			// Floor the interval so back-to-back requests cannot divide by ~0.
+			dt = 0.001
+		}
+		// readProcIO reports zero counters when /proc/self/io cannot be read,
+		// so a counter that appears to go backwards means "no data for this
+		// interval". Report 0 rather than a negative rate.
+		perSec := func(now, before int64) float64 {
+			if now < before {
+				return 0
+			}
+			return float64(now-before) / dt
+		}
+		resp.ReadBytesPerSec = perSec(cur.readBytes, prev.readBytes)
+		resp.WriteBytesPerSec = perSec(cur.writeBytes, prev.writeBytes)
+		resp.SyscallsRead = perSec(cur.syscR, prev.syscR)
+		resp.SyscallsWrite = perSec(cur.syscW, prev.syscW)
+	}
+	writeJSON(w, resp)
+}
+
+// handlePerfSqlite returns SQLite page/cache PRAGMA values, the on-disk WAL
+// size, and a cache hit-rate taken from the in-process store cache. Every
+// lookup is best-effort: a value that cannot be obtained is reported as zero.
+func (s *Server) handlePerfSqlite(w http.ResponseWriter, r *http.Request) {
+	resp := PerfSqliteResponse{}
+	if s.db != nil && s.db.conn != nil {
+		// Scan errors are deliberately ignored: this is a diagnostics
+		// endpoint and a failed PRAGMA simply leaves the field at zero.
+		_ = s.db.conn.QueryRow("PRAGMA page_count").Scan(&resp.PageCount)
+		_ = s.db.conn.QueryRow("PRAGMA page_size").Scan(&resp.PageSize)
+		_ = s.db.conn.QueryRow("PRAGMA cache_size").Scan(&resp.CacheSize)
+
+		// Cache hit rate: derived from PacketStore cache (rw_cache). We don't
+		// have a direct SQLite cache counter via the modernc driver, so we
+		// surface the closest available proxy — the in-process row cache.
+		if s.store != nil {
+			cs := s.store.GetCacheStatsTyped()
+			total := cs.Hits + cs.Misses
+			if total > 0 {
+				resp.CacheHitRate = float64(cs.Hits) / float64(total)
+			}
+		}
+
+		// The WAL file sits next to the database as "<path>-wal"; an
+		// in-memory or unnamed database has no such file to stat.
+		if s.db.path != "" && s.db.path != ":memory:" {
+			if info, err := os.Stat(s.db.path + "-wal"); err == nil {
+				resp.WalSize = info.Size()
+				resp.WalSizeMB = float64(info.Size()) / 1048576
+			}
+		}
+	}
+	writeJSON(w, resp)
+}
+
+// IngestorStats is the on-disk JSON shape the ingestor writes periodically
+// for the server to expose via /api/perf/write-sources.
+type IngestorStats struct {
+	SampledAt          string           `json:"sampledAt"`
+	TxInserted         int64            `json:"tx_inserted"`
+	ObsInserted        int64            `json:"obs_inserted"`
+	DuplicateTx        int64            `json:"tx_dupes"`
+	NodeUpserts        int64            `json:"node_upserts"`
+	ObserverUpserts    int64            `json:"observer_upserts"`
+	WriteErrors        int64            `json:"write_errors"`
+	SignatureDrops     int64            `json:"sig_drops"`
+	WALCommits         int64            `json:"walCommits"`
+	GroupCommitFlushes int64            `json:"groupCommitFlushes"`
+	BackfillUpdates    map[string]int64 `json:"backfillUpdates"`
+}
+
+// IngestorStatsPath reports where the server looks for the ingestor's rolling
+// stats snapshot: the value of env CORESCOPE_INGESTOR_STATS when it is set to
+// a non-empty string, otherwise the default file under /tmp.
+func IngestorStatsPath() string {
+	p := os.Getenv("CORESCOPE_INGESTOR_STATS")
+	if p == "" {
+		p = "/tmp/corescope-ingestor-stats.json"
+	}
+	return p
+}
+
+// handlePerfWriteSources serves the ingestor's stats file as a flat
+// source-name -> counter map under "sources", with the snapshot timestamp
+// under "sampleAt". Backfill counters are exposed as "backfill_<name>". If
+// the file is missing or is not valid JSON, the response carries an empty
+// "sources" map and an empty "sampleAt".
+func (s *Server) handlePerfWriteSources(w http.ResponseWriter, r *http.Request) {
+	sources := map[string]int64{}
+	sampleAt := ""
+
+	var st IngestorStats
+	raw, err := os.ReadFile(IngestorStatsPath())
+	if err == nil {
+		err = json.Unmarshal(raw, &st)
+	}
+	if err == nil {
+		sources["tx_inserted"] = st.TxInserted
+		sources["tx_dupes"] = st.DuplicateTx
+		sources["obs_inserted"] = st.ObsInserted
+		sources["node_upserts"] = st.NodeUpserts
+		sources["observer_upserts"] = st.ObserverUpserts
+		sources["write_errors"] = st.WriteErrors
+		sources["sig_drops"] = st.SignatureDrops
+		sources["walCommits"] = st.WALCommits
+		sources["groupCommitFlushes"] = st.GroupCommitFlushes
+		for name, count := range st.BackfillUpdates {
+			sources["backfill_"+name] = count
+		}
+		sampleAt = st.SampledAt
+	}
+
+	writeJSON(w, map[string]interface{}{
+		"sources":  sources,
+		"sampleAt": sampleAt,
+	})
+}
diff --git a/cmd/server/perf_io_test.go b/cmd/server/perf_io_test.go
new file mode 100644
index 00000000..ecb4f884
--- 
/dev/null
+++ b/cmd/server/perf_io_test.go
@@ -0,0 +1,121 @@
+package main
+
+import (
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"testing"
+)
+
+func TestPerfIOEndpoint_ReturnsValidJSON(t *testing.T) {
+	_, router := setupTestServer(t)
+
+	req := httptest.NewRequest("GET", "/api/perf/io", nil)
+	w := httptest.NewRecorder()
+	router.ServeHTTP(w, req)
+
+	if w.Code != http.StatusOK {
+		t.Fatalf("expected 200, got %d", w.Code)
+	}
+	var body map[string]interface{}
+	if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
+		t.Fatalf("invalid JSON: %v", err)
+	}
+	rateFields := []string{"readBytesPerSec", "writeBytesPerSec", "syscallsRead", "syscallsWrite"}
+	for _, field := range rateFields {
+		if _, ok := body[field]; !ok {
+			t.Errorf("missing field %q", field)
+		}
+	}
+
+	// /proc/self/io only exists on Linux. When absent (e.g. some test
+	// containers) we still expect well-formed JSON but skip the non-zero
+	// delta assertion.
+	if _, err := os.Stat("/proc/self/io"); err != nil {
+		t.Skip("skip non-zero rate assertion: /proc/self/io unavailable")
+	}
+
+	// Drive a second request so the delta tracker has a previous sample to
+	// diff against. The handler itself reads a file for every sample, which
+	// should be enough to move at least one counter between the two requests.
+	req2 := httptest.NewRequest("GET", "/api/perf/io", nil)
+	w2 := httptest.NewRecorder()
+	router.ServeHTTP(w2, req2)
+	var body2 map[string]interface{}
+	if err := json.Unmarshal(w2.Body.Bytes(), &body2); err != nil {
+		t.Fatalf("invalid JSON on second sample: %v", err)
+	}
+	sawNonZero := false
+	for _, k := range rateFields {
+		if v, ok := body2[k].(float64); ok && v > 0 {
+			sawNonZero = true
+			break
+		}
+	}
+	if !sawNonZero {
+		t.Errorf("expected at least one non-zero rate after second sample, got %v", body2)
+	}
+}
+
+func TestPerfSqliteEndpoint_ReturnsValidJSON(t *testing.T) {
+	_, router := setupTestServer(t)
+
+	req := httptest.NewRequest("GET", "/api/perf/sqlite", nil)
+	w := httptest.NewRecorder()
+	router.ServeHTTP(w, req)
+
+	if w.Code != http.StatusOK {
+		t.Fatalf("expected 200, got %d", w.Code)
+	}
+	var body map[string]interface{}
+	if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
+		t.Fatalf("invalid JSON: %v", err)
+	}
+	for _, field := range []string{"walSize", "pageCount", "pageSize", "cacheHitRate"} {
+		if _, ok := body[field]; !ok {
+			t.Errorf("missing field %q", field)
+		}
+	}
+	// pageSize must be > 0 for any open SQLite DB
+	if v, ok := body["pageSize"].(float64); !ok || v <= 0 {
+		t.Errorf("expected pageSize > 0, got %v", body["pageSize"])
+	}
+}
+
+func TestPerfWriteSourcesEndpoint_ReturnsSources(t *testing.T) {
+	// Point the handler at a fixture instead of whatever happens to be in the
+	// host's /tmp, so the assertions below exercise the real parsing path.
+	// NOTE(review): t.Setenv requires that this test is not run in parallel.
+	statsPath := t.TempDir() + "/ingestor-stats.json"
+	fixture := `{"sampledAt":"2026-01-01T00:00:00Z","tx_inserted":7,"backfillUpdates":{"path_json":3}}`
+	if err := os.WriteFile(statsPath, []byte(fixture), 0o600); err != nil {
+		t.Fatalf("write fixture: %v", err)
+	}
+	t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath)
+
+	_, router := setupTestServer(t)
+
+	req := httptest.NewRequest("GET", "/api/perf/write-sources", nil)
+	w := httptest.NewRecorder()
+	router.ServeHTTP(w, req)
+	if w.Code != http.StatusOK {
+		t.Fatalf("expected 200, got %d", w.Code)
+	}
+	var resp struct {
+		Sources  map[string]int64 `json:"sources"`
+		SampleAt string           `json:"sampleAt"`
+	}
+	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+		t.Fatalf("invalid JSON: %v", err)
+	}
+	if got := resp.Sources["tx_inserted"]; got != 7 {
+		t.Errorf("sources[tx_inserted] = %d, want 7", got)
+	}
+	if got := resp.Sources["backfill_path_json"]; got != 3 {
+		t.Errorf("sources[backfill_path_json] = %d, want 3", got)
+	}
+	if resp.SampleAt != "2026-01-01T00:00:00Z" {
+		t.Errorf("sampleAt = %q, want %q", resp.SampleAt, "2026-01-01T00:00:00Z")
+	}
+}
diff --git a/cmd/server/routes.go b/cmd/server/routes.go
index bc82af87..549ae4f7 100644
--- a/cmd/server/routes.go
+++ b/cmd/server/routes.go
@@ -128,6 +128,9 @@ func (s *Server) RegisterRoutes(r *mux.Router) {
 	r.HandleFunc("/api/health", s.handleHealth).Methods("GET")
r.HandleFunc("/api/stats", s.handleStats).Methods("GET") r.HandleFunc("/api/perf", s.handlePerf).Methods("GET") + r.HandleFunc("/api/perf/io", s.handlePerfIO).Methods("GET") + r.HandleFunc("/api/perf/sqlite", s.handlePerfSqlite).Methods("GET") + r.HandleFunc("/api/perf/write-sources", s.handlePerfWriteSources).Methods("GET") r.Handle("/api/perf/reset", s.requireAPIKey(http.HandlerFunc(s.handlePerfReset))).Methods("POST") r.Handle("/api/admin/prune", s.requireAPIKey(http.HandlerFunc(s.handleAdminPrune))).Methods("POST") r.Handle("/api/debug/affinity", s.requireAPIKey(http.HandlerFunc(s.handleDebugAffinity))).Methods("GET") diff --git a/public/perf.js b/public/perf.js index 379b6a0a..426b5af8 100644 --- a/public/perf.js +++ b/public/perf.js @@ -13,9 +13,12 @@ const el = document.getElementById('perfContent'); if (!el) return; try { - const [server, client] = await Promise.all([ + const [server, client, ioStats, sqliteStats, writeSources] = await Promise.all([ fetch('/api/perf').then(r => r.json()), - Promise.resolve(window.apiPerf ? window.apiPerf() : null) + Promise.resolve(window.apiPerf ? window.apiPerf() : null), + fetch('/api/perf/io').then(r => r.json()).catch(() => null), + fetch('/api/perf/sqlite').then(r => r.json()).catch(() => null), + fetch('/api/perf/write-sources').then(r => r.json()).catch(() => null) ]); // Also fetch health telemetry @@ -64,6 +67,90 @@ } } + // Disk I/O (#1120) + if (ioStats) { + const fmtRate = (bps) => { + if (bps >= 1048576) return (bps / 1048576).toFixed(1) + ' MB/s'; + if (bps >= 1024) return (bps / 1024).toFixed(1) + ' KB/s'; + return Math.round(bps) + ' B/s'; + }; + const writeWarn = ioStats.writeBytesPerSec > 10 * 1048576 ? ' ⚠️' : ''; + html += `

Disk I/O (server process)

+
${fmtRate(ioStats.readBytesPerSec || 0)}
Read
+
${fmtRate(ioStats.writeBytesPerSec || 0)}${writeWarn}
Write
+
${Math.round(ioStats.syscallsRead || 0)}/s
Syscalls Read
+
${Math.round(ioStats.syscallsWrite || 0)}/s
Syscalls Write
+
`; + } + + // Write Sources (#1120) — per-component counters from ingestor + if (writeSources && writeSources.sources) { + const src = writeSources.sources; + const keys = Object.keys(src).sort((a, b) => (src[b] || 0) - (src[a] || 0)); + html += '

Write Sources

'; + if (keys.length === 0) { + html += '

No ingestor stats yet (waiting for /tmp/corescope-ingestor-stats.json)

'; + } else { + // Anomaly detection (#1123 polish): + // Compare PER-SECOND DELTA RATES, not cumulative counts. + // Cumulative-vs-cumulative was a tautology that fired ⚠️ at startup + // (any backfill_* > 10 when tx_inserted=0 → baseline collapses to 1) + // and false-cleared once tx grew past a one-shot backfill burst. + // Now we cache the previous snapshot + sampleAt and only fire when: + // 1) we have a real interval (≥ 0.5s) to compute deltas against + // 2) tx_inserted has crossed MIN_SAMPLE so the baseline is meaningful + // 3) the per-second backfill rate exceeds 10× the per-second tx rate + const MIN_SAMPLE = 100; + const prev = window._perfWriteSourcesPrev; + let prevSrc = null, dtSec = 0; + if (prev && prev.sampleAt && writeSources.sampleAt) { + dtSec = (Date.parse(writeSources.sampleAt) - Date.parse(prev.sampleAt)) / 1000; + if (dtSec >= 0.5) prevSrc = prev.sources; + } + const txTotal = src.tx_inserted || 0; + const txDelta = prevSrc ? (txTotal - (prevSrc.tx_inserted || 0)) : 0; + const txRate = (prevSrc && dtSec > 0) ? (txDelta / dtSec) : 0; + html += '
'; + for (const k of keys) { + const v = src[k] || 0; + const isBackfill = k.startsWith('backfill_'); + let rate = 0; + let flag = ''; + if (prevSrc && dtSec > 0) { + const delta = v - (prevSrc[k] || 0); + rate = delta / dtSec; + // Only flag when tx baseline is statistically meaningful AND + // backfill is actively running faster than 10× the live tx rate. + if (isBackfill && txTotal >= MIN_SAMPLE && rate > 10 * Math.max(txRate, 1)) { + flag = ' ⚠️'; + } + } + const rateStr = (prevSrc && dtSec > 0) ? rate.toFixed(1) : '—'; + html += ``; + } + html += '
SourceTotalRate/sAnomaly
${k}${v.toLocaleString()}${rateStr}${flag}
'; + // Stash for next tick's delta computation. + window._perfWriteSourcesPrev = { sources: { ...src }, sampleAt: writeSources.sampleAt }; + if (writeSources.sampleAt) { + html += `
Sampled: ${writeSources.sampleAt}
`; + } + } + } + + // SQLite perf (separate from existing SQLite block — focused on WAL + cache hit) (#1120) + if (sqliteStats) { + const walMB = sqliteStats.walSizeMB || 0; + const walFlag = walMB > 100 ? ' ⚠️' : ''; + const hitRate = (sqliteStats.cacheHitRate || 0) * 100; + const hitFlag = hitRate > 0 && hitRate < 90 ? ' ⚠️' : ''; + html += `

SQLite (WAL + Cache Hit)

+
${walMB.toFixed(1)}MB${walFlag}
WAL Size
+
${(sqliteStats.pageCount || 0).toLocaleString()}
Page Count
+
${sqliteStats.pageSize || 0}
Page Size
+
${hitRate.toFixed(1)}%${hitFlag}
Cache Hit Rate
+
`; + } + // Cache stats if (server.cache) { const c = server.cache; diff --git a/test-perf-disk-io-1120.js b/test-perf-disk-io-1120.js new file mode 100644 index 00000000..cc26fd18 --- /dev/null +++ b/test-perf-disk-io-1120.js @@ -0,0 +1,119 @@ +/* Tests for perf.js Disk I/O + Write Sources + SQLite sections (#1120) */ +'use strict'; +const vm = require('vm'); +const fs = require('fs'); +const assert = require('assert'); + +let passed = 0, failed = 0; +async function test(name, fn) { + try { await fn(); passed++; console.log(` ✅ ${name}`); } + catch (e) { failed++; console.log(` ❌ ${name}: ${e.message}`); } +} + +function makeSandbox() { + let capturedHtml = ''; + const pages = {}; + const ctx = { + window: { addEventListener: () => {}, apiPerf: null }, + document: { + getElementById: (id) => { + if (id === 'perfContent') return { set innerHTML(v) { capturedHtml = v; } }; + return null; + }, + addEventListener: () => {}, + }, + console, + Date, Math, Array, Object, String, Number, JSON, RegExp, Error, TypeError, + parseInt, parseFloat, isNaN, isFinite, + setTimeout: () => {}, clearTimeout: () => {}, + setInterval: () => 0, clearInterval: () => {}, + performance: { now: () => Date.now() }, + Map, Set, Promise, + registerPage: (name, handler) => { pages[name] = handler; }, + _apiCache: null, + fetch: () => Promise.resolve({ json: () => Promise.resolve({}) }), + }; + ctx.window.document = ctx.document; + ctx.globalThis = ctx; + return { ctx, pages, getHtml: () => capturedHtml }; +} + +function loadPerf() { + const sb = makeSandbox(); + const code = fs.readFileSync('public/perf.js', 'utf8'); + vm.runInNewContext(code, sb.ctx); + return sb; +} + +function stubFetch(sb, perfData, healthData, ioData, sqliteData, sourcesData) { + sb.ctx.fetch = (url) => { + if (url === '/api/perf') return Promise.resolve({ json: () => Promise.resolve(perfData) }); + if (url === '/api/health') return Promise.resolve({ json: () => Promise.resolve(healthData) }); + if (url === '/api/perf/io') 
return Promise.resolve({ json: () => Promise.resolve(ioData) }); + if (url === '/api/perf/sqlite') return Promise.resolve({ json: () => Promise.resolve(sqliteData) }); + if (url === '/api/perf/write-sources') return Promise.resolve({ json: () => Promise.resolve(sourcesData) }); + return Promise.resolve({ json: () => Promise.resolve({}) }); + }; +} + +const basePerf = { + totalRequests: 100, avgMs: 5, uptime: 3600, + slowQueries: [], endpoints: {}, cache: null, packetStore: null, sqlite: null +}; +const goRuntime = { + goroutines: 17, numGC: 31, pauseTotalMs: 2.1, lastPauseMs: 0.03, + heapAllocMB: 473, heapSysMB: 1035, heapInuseMB: 663, heapIdleMB: 371, numCPU: 2 +}; +const goHealth = { engine: 'go', uptimeHuman: '2h', websocket: { clients: 5 } }; + +const ioData = { + readBytesPerSec: 1024, writeBytesPerSec: 2048, + syscallsRead: 10, syscallsWrite: 20 +}; +const sqliteData = { + walSizeMB: 12.3, walSize: 12900000, pageCount: 4096, pageSize: 4096, + cacheSize: 2000, cacheHitRate: 0.987 +}; +const sourcesData = { + sources: { tx_inserted: 25, obs_inserted: 1787, backfill_path_json: 0, node_upserts: 329, observer_upserts: 1823, walCommits: 100 }, + sampleAt: '2026-01-01T00:00:00Z' +}; + +console.log('\n🧪 perf.js — Disk I/O + Write Sources (#1120)\n'); + +(async () => { +await test('Renders Disk I/O section', async () => { + const sb = loadPerf(); + stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sqliteData, sourcesData); + await sb.pages.perf.init({ set innerHTML(v) {} }); + await new Promise(r => setTimeout(r, 100)); + const html = sb.getHtml(); + assert.ok(html.includes('Disk I/O'), 'should show Disk I/O heading'); + assert.ok(/2048|2\.0\s*KB|2 KB/.test(html) || html.includes('writeBytesPerSec') === false, + 'should render write rate value'); +}); + +await test('Renders Write Sources section with non-zero rates', async () => { + const sb = loadPerf(); + stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sqliteData, sourcesData); + await 
sb.pages.perf.init({ set innerHTML(v) {} }); + await new Promise(r => setTimeout(r, 100)); + const html = sb.getHtml(); + assert.ok(html.includes('Write Sources'), 'should show Write Sources heading'); + assert.ok(html.includes('tx_inserted'), 'should list tx_inserted source'); + assert.ok(html.includes('obs_inserted'), 'should list obs_inserted source'); +}); + +await test('Renders SQLite section with WAL + cache hit rate', async () => { + const sb = loadPerf(); + stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sqliteData, sourcesData); + await sb.pages.perf.init({ set innerHTML(v) {} }); + await new Promise(r => setTimeout(r, 100)); + const html = sb.getHtml(); + assert.ok(/WAL/i.test(html), 'should show WAL info'); + assert.ok(/Cache Hit/i.test(html) || /cacheHitRate/i.test(html), 'should show cache hit rate'); +}); + +console.log(`\n${passed} passed, ${failed} failed\n`); +process.exit(failed ? 1 : 0); +})();