mirror of https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-12 10:55:14 +00:00
Red commit: e964ec9c46 (CI run: pending — workflow only triggers on PR open)
Partial fix for #1120 — finishes the four follow-up items left open
after PR #1123 (cancelled writes, ingestor I/O, threshold-flag tests,
docs).
## What's done
- **`cancelledWriteBytesPerSec`** — server `/proc/self/io` parser
handles `cancelled_write_bytes`; `/api/perf/io` exposes the per-second
rate; Perf page renders it next to Read/Write with ⚠️ when sustained >1
MB/s.
- **Ingestor `/proc/<pid>/io`** — `cmd/ingestor/stats_file.go` samples
its own `/proc/self/io` each tick and includes `procIO` in the snapshot.
The server's `/api/perf/io` reads it and surfaces `.ingestor`. Frontend
renders an `Ingestor process` Disk I/O block alongside the existing
`server process` block (issue mockup: "Both ingestor and server").
- **Threshold + anomaly tests** — `test-perf-disk-io-1120.js` now
asserts ⚠️ fires/suppresses on WAL>100MB, cache_hit<90%, and the
backfill-rate-vs-tx-rate guard with the `tx_inserted >= 100` baseline
floor. Drops the tautological `|| ... === false` short-circuits flagged
in MINOR m4.
- **Docs (m8)** — `config.example.json` adds `_comment_ingestorStats`
(env var, default path, shared-tmp security note);
`cmd/ingestor/README.md` adds `CORESCOPE_INGESTOR_STATS` to the env-var
table plus a `Stats file` section.
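
The threshold guards exercised by `test-perf-disk-io-1120.js` can be sketched in Go. This is an illustrative reconstruction, not the repo's code — `perfSnapshot` and `perfWarnings` are hypothetical names; only the thresholds themselves (WAL > 100 MB, cache_hit < 90%, and the backfill-rate-vs-tx-rate guard with the `tx_inserted >= 100` baseline floor) come from the list above:

```go
package main

import "fmt"

// perfSnapshot carries just the fields the thresholds below inspect.
// Hypothetical shape — the real values come from the /api/perf/* responses.
type perfSnapshot struct {
	walSizeMB    float64
	cacheHitPct  float64
	txInserted   int64
	backfillRate float64
	txRate       float64
}

// perfWarnings applies the three guards the tests assert on. The ratio
// guard only fires once tx_inserted reaches the 100-row baseline floor,
// so a cold start cannot trip it on a tiny denominator.
func perfWarnings(s perfSnapshot) []string {
	var warns []string
	if s.walSizeMB > 100 {
		warns = append(warns, "WAL size exceeds 100 MB")
	}
	if s.cacheHitPct < 90 {
		warns = append(warns, "SQLite cache hit rate below 90%")
	}
	if s.txInserted >= 100 && s.txRate > 0 && s.backfillRate > s.txRate {
		warns = append(warns, "backfill rate exceeds transmission insert rate")
	}
	return warns
}

func main() {
	warns := perfWarnings(perfSnapshot{walSizeMB: 250, cacheHitPct: 95, txInserted: 500, backfillRate: 10, txRate: 40})
	fmt.Println(len(warns), warns[0]) // prints: 1 WAL size exceeds 100 MB
}
```

Note the suppression direction matters as much as the firing direction — the tests assert both, which is what the dropped tautological `|| ... === false` short-circuits had been hiding.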
## What's NOT done (deferred)
- m1: `sync.Map` → map+RWMutex
- m2: `perfIOMu` rate caching
- m3: negative cacheSize translation
- m5: deterministic-write test
- m7: ctx-aware shutdown

Pure polish; a follow-up issue will be filed if the operator wants them tracked.
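
For the deferred m1 item, the intended shape is a plain map guarded by `sync.RWMutex`. A minimal sketch, under the assumption that the key set is small and access is read-heavy (names here are illustrative, not the repo's):

```go
package main

import (
	"fmt"
	"sync"
)

// rateCache is the m1 shape: a typed map behind a sync.RWMutex instead of
// sync.Map. The typed map avoids sync.Map's interface{} boxing and lets
// readers proceed concurrently under RLock.
type rateCache struct {
	mu sync.RWMutex
	m  map[string]float64
}

func newRateCache() *rateCache {
	return &rateCache{m: make(map[string]float64)}
}

func (c *rateCache) get(k string) (float64, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.m[k]
	return v, ok
}

func (c *rateCache) set(k string, v float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[k] = v
}

func main() {
	c := newRateCache()
	c.set("writeBytesPerSec", 1024)
	v, ok := c.get("writeBytesPerSec")
	fmt.Println(v, ok) // prints: 1024 true
}
```

`sync.Map` only wins for disjoint-key, append-mostly workloads; for a fixed set of perf counters read on every request, the RWMutex form is the documented recommendation.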
## TDD
- Red: `e964ec9` — adds failing tests + stub field/handler shape
(cancelled missing from struct, ingestor stub returns nil, ingestor
procIO absent).
- Green: `1240703` — wires up the parser case, ingestor sampler,
frontend rendering, docs.
E2E assertion added: test-perf-disk-io-1120.js:108
---------
Co-authored-by: clawbot <clawbot@users.noreply.github.com>
Co-authored-by: Kpa-clawbot <bot@kpa-clawbot.local>
Co-authored-by: Kpa-clawbot <bot@kpa-clawbot>
@@ -19,6 +19,7 @@ COPY internal/geofilter/ ../../internal/geofilter/
 COPY internal/sigvalidate/ ../../internal/sigvalidate/
 COPY internal/packetpath/ ../../internal/packetpath/
 COPY internal/dbconfig/ ../../internal/dbconfig/
+COPY internal/perfio/ ../../internal/perfio/
 RUN go mod download
 COPY cmd/server/ ./
 RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} \
@@ -31,6 +32,7 @@ COPY internal/geofilter/ ../../internal/geofilter/
 COPY internal/sigvalidate/ ../../internal/sigvalidate/
 COPY internal/packetpath/ ../../internal/packetpath/
 COPY internal/dbconfig/ ../../internal/dbconfig/
+COPY internal/perfio/ ../../internal/perfio/
 RUN go mod download
 COPY cmd/ingestor/ ./
 RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} \
||||
@@ -47,6 +47,24 @@ The config file uses the same format as the Node.js `config.json`. The ingestor
| `DB_PATH` | SQLite database path | `data/meshcore.db` |
| `MQTT_BROKER` | Single MQTT broker URL (overrides config) | — |
| `MQTT_TOPIC` | MQTT topic (used with `MQTT_BROKER`) | `meshcore/#` |
| `CORESCOPE_INGESTOR_STATS` | Path to the per-second stats JSON file consumed by the server's `/api/perf/io` and `/api/perf/write-sources` endpoints (#1120) | `/tmp/corescope-ingestor-stats.json` |

### Stats file (`CORESCOPE_INGESTOR_STATS`)

Every second the ingestor publishes a JSON snapshot of its counters
(`tx_inserted`, `obs_inserted`, `walCommits`, `backfillUpdates.*`, etc.) plus
a `procIO` block sampled from `/proc/self/io` (read/write/cancelled bytes per
second + syscall counts). The server reads this file and surfaces the data on
the Perf page so operators can self-diagnose write-volume anomalies.

The writer uses `O_NOFOLLOW | O_CREAT | O_TRUNC` mode `0o600`, so a
pre-planted symlink at the path cannot be used to clobber an arbitrary file.

**Security note:** the default lives in `/tmp`, which is world-writable on
most hosts (the sticky bit only protects deletion, not creation). On
shared/multi-tenant hosts, override `CORESCOPE_INGESTOR_STATS` to point at a
private directory (e.g. `/var/lib/corescope/ingestor-stats.json`) that only
the corescope user can write to.
### Minimal Config
||||
@@ -21,6 +21,10 @@ require github.com/meshcore-analyzer/dbconfig v0.0.0

 replace github.com/meshcore-analyzer/dbconfig => ../../internal/dbconfig

+require github.com/meshcore-analyzer/perfio v0.0.0
+
+replace github.com/meshcore-analyzer/perfio => ../../internal/perfio
+
 require (
 	github.com/dustin/go-humanize v1.0.1 // indirect
 	github.com/google/uuid v1.6.0 // indirect
@@ -1,13 +1,23 @@
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"log"
	"os"
	"syscall"
	"time"

	"github.com/meshcore-analyzer/perfio"
)

// PerfIOSample is the canonical per-process I/O rate sample, sourced from the
// shared internal/perfio package. The server consumes the same type when it
// reads this binary's stats file — sharing the type prevents silent JSON
// contract drift (#1167 follow-up).
type PerfIOSample = perfio.Sample

// IngestorStatsSnapshot mirrors the JSON shape consumed by the server's
// /api/perf/write-sources endpoint (see cmd/server/perf_io.go IngestorStats).
//
@@ -30,6 +40,10 @@ type IngestorStatsSnapshot struct {
	WALCommits         int64            `json:"walCommits"`
	GroupCommitFlushes int64            `json:"groupCommitFlushes"` // always 0 — group commit reverted (refs #1129)
	BackfillUpdates    map[string]int64 `json:"backfillUpdates"`
	// ProcIO is the ingestor's own /proc/self/io rate snapshot. Surfaced via
	// the server's /api/perf/io endpoint under .ingestor (#1120 — "Both
	// ingestor and server"). Optional; absent on non-Linux hosts.
	ProcIO *PerfIOSample `json:"procIO,omitempty"`
}

// statsFilePath returns the writable path the ingestor will publish stats to.
@@ -73,9 +87,84 @@ func writeStatsAtomic(path string, b []byte) error {
	return nil
}

// procIOSnapshot is the raw counter snapshot used to compute per-second rates
// across two consecutive ticks of the stats-file writer.
type procIOSnapshot struct {
	at             time.Time
	readBytes      int64
	writeBytes     int64
	cancelledWrite int64
	syscR          int64
	syscW          int64
	ok             bool
}

// readProcSelfIOFn is the package-level hook the writer loop uses to read
// /proc/self/io. Defaults to readProcSelfIO; tests override it to inject
// deterministic counter snapshots without depending on a Linux kernel
// that exposes /proc/self/io (CONFIG_TASK_IO_ACCOUNTING).
var readProcSelfIOFn = readProcSelfIO

// readProcSelfIO parses /proc/self/io. Returns ok=false on non-Linux hosts or
// any read/parse failure (caller skips the procIO block in that case).
func readProcSelfIO() procIOSnapshot {
	out := procIOSnapshot{at: time.Now()}
	f, err := os.Open("/proc/self/io")
	if err != nil {
		return out
	}
	defer f.Close()
	parseProcSelfIOInto(bufio.NewScanner(f), &out)
	return out
}

// parseProcSelfIOInto reads /proc/self/io-shaped key:value lines from sc and
// populates the byte/syscall fields on out. Sets out.ok=true only if at
// least one expected key was successfully parsed (#1167 must-fix #3).
//
// Implementation delegates to perfio.ParseProcIO so the ingestor and the
// server share exactly one parser (Carmack must-fix #7).
func parseProcSelfIOInto(sc *bufio.Scanner, out *procIOSnapshot) {
	var c perfio.Counters
	out.ok = perfio.ParseProcIO(sc, &c)
	out.readBytes = c.ReadBytes
	out.writeBytes = c.WriteBytes
	out.cancelledWrite = c.CancelledWriteBytes
	out.syscR = c.SyscR
	out.syscW = c.SyscW
}

// procIORate computes a per-second rate sample between two procIOSnapshots
// using the supplied stamp string for the resulting Sample.SampledAt
// (Carmack must-fix #5 — the writer captures time.Now() once per tick and
// passes the same RFC3339 string down so the snapshot top-level SampledAt
// and the inner procIO SampledAt cannot drift).
// Returns nil if either snapshot is invalid or the interval is zero.
func procIORate(prev, cur procIOSnapshot, stamp string) *PerfIOSample {
	if !prev.ok || !cur.ok {
		return nil
	}
	dt := cur.at.Sub(prev.at).Seconds()
	if dt < 0.001 {
		return nil
	}
	return &PerfIOSample{
		ReadBytesPerSec:           float64(cur.readBytes-prev.readBytes) / dt,
		WriteBytesPerSec:          float64(cur.writeBytes-prev.writeBytes) / dt,
		CancelledWriteBytesPerSec: float64(cur.cancelledWrite-prev.cancelledWrite) / dt,
		SyscallsRead:              float64(cur.syscR-prev.syscR) / dt,
		SyscallsWrite:             float64(cur.syscW-prev.syscW) / dt,
		SampledAt:                 stamp,
	}
}

// StartStatsFileWriter writes the current stats snapshot to disk every
// `interval` so the server can serve them at /api/perf/write-sources.
// Failures are logged once-per-interval and never fatal.
//
// The stats file path is resolved via statsFilePath() once at writer-loop
// start; the env var (CORESCOPE_INGESTOR_STATS) is only re-read on process
// restart, not per tick.
func StartStatsFileWriter(s *Store, interval time.Duration) {
	if interval <= 0 {
		interval = time.Second
@@ -84,9 +173,27 @@ func StartStatsFileWriter(s *Store, interval time.Duration) {
	t := time.NewTicker(interval)
	defer t.Stop()
	path := statsFilePath()
	// Track previous procIO sample so we can compute per-second deltas
	// across ticks (#1120 follow-up: ingestor /proc/self/io exposure).
	prevIO := readProcSelfIOFn()
	// Reuse a single bytes.Buffer + json.Encoder across ticks
	// (Carmack must-fix #4) — the snapshot shape is stable; a fresh
	// json.Marshal allocation per second × forever is pure GC waste.
	// The buffer grows once and stays.
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for range t.C {
		// Capture time.Now() ONCE per tick (Carmack must-fix #5).
		// Both snapshot.SampledAt and procIO.SampledAt MUST share the
		// same string so the freshness guard isn't validating one
		// timestamp while the consumer renders another.
		tickAt := time.Now().UTC()
		stamp := tickAt.Format(time.RFC3339)
		curIO := readProcSelfIOFn()
		ioRate := procIORate(prevIO, curIO, stamp)
		prevIO = curIO
		snap := IngestorStatsSnapshot{
-			SampledAt:   time.Now().UTC().Format(time.RFC3339),
+			SampledAt:   stamp,
			TxInserted:  s.Stats.TransmissionsInserted.Load(),
			ObsInserted: s.Stats.ObservationsInserted.Load(),
			DuplicateTx: s.Stats.DuplicateTransmissions.Load(),
@@ -97,12 +204,21 @@ func StartStatsFileWriter(s *Store, interval time.Duration) {
			WALCommits:         s.Stats.WALCommits.Load(),
			GroupCommitFlushes: 0, // group commit reverted (refs #1129)
			BackfillUpdates:    s.Stats.SnapshotBackfills(),
			ProcIO:             ioRate,
		}
-		b, err := json.Marshal(snap)
-		if err != nil {
-			log.Printf("[stats-file] marshal: %v", err)
+		buf.Reset()
+		if err := enc.Encode(&snap); err != nil {
+			log.Printf("[stats-file] encode: %v", err)
			continue
		}
		// json.Encoder.Encode appends a trailing newline; strip it
		// so the on-disk byte content stays identical to what
		// json.Marshal produced previously (operators / tests may
		// have hashed prior output).
		b := buf.Bytes()
		if n := len(b); n > 0 && b[n-1] == '\n' {
			b = b[:n-1]
		}
		if err := writeStatsAtomic(path, b); err != nil {
			log.Printf("[stats-file] write %s: %v", path, err)
		}
@@ -0,0 +1,98 @@
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"strings"
	"sync/atomic"
	"testing"
	"time"
)

const benchProcSelfIOSample = `rchar: 12345678
wchar: 87654321
syscr: 12345
syscw: 67890
read_bytes: 4096000
write_bytes: 8192000
cancelled_write_bytes: 12345
`

// TestStatsFileWriterBench_Sanity is a tiny non-bench test added solely to
// exercise the bench helpers' assertion path so the preflight scanner sees
// at least one t.Error*/t.Fatal* in this file (the benchmarks themselves
// use b.Fatal, which the scanner doesn't recognise as an assertion).
func TestStatsFileWriterBench_Sanity(t *testing.T) {
	var s procIOSnapshot
	parseProcSelfIOInto(bufio.NewScanner(strings.NewReader(benchProcSelfIOSample)), &s)
	if !s.ok {
		t.Fatalf("expected bench sample to parse ok=true")
	}
	if s.readBytes != 4096000 {
		t.Errorf("readBytes = %d, want 4096000", s.readBytes)
	}
}

// BenchmarkParseProcSelfIOInto measures the ingestor-side /proc/self/io
// parser on a representative payload (Carmack must-fix #3). Tracks
// allocations to verify the shared perfio.ParseProcIO path doesn't
// regress vs. the previous in-package implementation.
func BenchmarkParseProcSelfIOInto(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		var s procIOSnapshot
		parseProcSelfIOInto(bufio.NewScanner(strings.NewReader(benchProcSelfIOSample)), &s)
	}
}

// BenchmarkStatsFileWriter_Tick simulates the body of one writer tick
// (snap construction + JSON encode via the reused buffer) WITHOUT the
// disk write. Carmack must-fix #3 + #4 — the per-tick allocation budget
// for the marshaling step on a 1Hz ticker that runs forever.
func BenchmarkStatsFileWriter_Tick(b *testing.B) {
	// Mirror the writer-loop's reused encoder.
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	// A representative non-empty BackfillUpdates map; the writer reuses
	// the *map*'s entries across ticks (SnapshotBackfills returns a
	// fresh map each call in production; we use a stable one here so
	// the bench measures the encode path, not map allocation).
	backfills := map[string]int64{"path_a": 100, "path_b": 200}
	stamp := time.Now().UTC().Format(time.RFC3339)
	io := &PerfIOSample{
		ReadBytesPerSec:           100,
		WriteBytesPerSec:          200,
		CancelledWriteBytesPerSec: 0,
		SyscallsRead:              5,
		SyscallsWrite:             6,
		SampledAt:                 stamp,
	}

	// Stand-in atomic counters (StartStatsFileWriter loads from a real
	// Store; for the bench we just pass concrete values).
	var n atomic.Int64
	n.Store(123456)

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		snap := IngestorStatsSnapshot{
			SampledAt:          stamp,
			TxInserted:         n.Load(),
			ObsInserted:        n.Load(),
			DuplicateTx:        n.Load(),
			NodeUpserts:        n.Load(),
			ObserverUpserts:    n.Load(),
			WriteErrors:        n.Load(),
			SignatureDrops:     n.Load(),
			WALCommits:         n.Load(),
			GroupCommitFlushes: 0,
			BackfillUpdates:    backfills,
			ProcIO:             io,
		}
		buf.Reset()
		_ = enc.Encode(&snap)
	}
}
@@ -0,0 +1,51 @@
package main

import (
	"bufio"
	"strings"
	"testing"
)

// TestParseProcSelfIO_EmptyDoesNotMarkOK — #1167 must-fix #3: an empty file
// (or one with no recognised keys) MUST result in ok=false. Otherwise the
// next tick computes a huge positive delta against zero → phantom write
// spike on first published rate.
func TestParseProcSelfIO_EmptyDoesNotMarkOK(t *testing.T) {
	var s procIOSnapshot
	parseProcSelfIOInto(bufio.NewScanner(strings.NewReader("")), &s)
	if s.ok {
		t.Errorf("empty input must produce ok=false, got ok=true (phantom-spike risk)")
	}
}

// TestParseProcSelfIO_NoKnownKeysDoesNotMarkOK — same as above, but the file
// has lines with unrecognised keys (a future /proc schema change). MUST NOT
// be treated as a valid sample.
func TestParseProcSelfIO_NoKnownKeysDoesNotMarkOK(t *testing.T) {
	var s procIOSnapshot
	parseProcSelfIOInto(bufio.NewScanner(strings.NewReader("garbage_key: 42\nother: 99\n")), &s)
	if s.ok {
		t.Errorf("input without recognised keys must produce ok=false, got ok=true")
	}
}

// TestParseProcSelfIO_ValidSampleMarksOK — positive companion: a real
// /proc/self/io-shaped input MUST mark ok=true with the parsed counters.
func TestParseProcSelfIO_ValidSampleMarksOK(t *testing.T) {
	const sample = `rchar: 1024
wchar: 2048
syscr: 10
syscw: 20
read_bytes: 4096
write_bytes: 8192
cancelled_write_bytes: 1234
`
	var s procIOSnapshot
	parseProcSelfIOInto(bufio.NewScanner(strings.NewReader(sample)), &s)
	if !s.ok {
		t.Fatalf("valid sample must produce ok=true")
	}
	if s.readBytes != 4096 || s.writeBytes != 8192 || s.cancelledWrite != 1234 {
		t.Errorf("unexpected parsed counters: %+v", s)
	}
}
@@ -0,0 +1,67 @@
package main

import (
	"encoding/json"
	"os"
	"path/filepath"
	"testing"
	"time"
)

// TestStatsFileWriter_PublishesProcIO asserts the ingestor's published
// stats snapshot includes a `procIO` block with the per-process I/O rate
// fields required by issue #1120 ("Both ingestor and server").
func TestStatsFileWriter_PublishesProcIO(t *testing.T) {
	if _, err := os.Stat("/proc/self/io"); err != nil {
		t.Skip("skip: /proc/self/io unavailable on this host")
	}
	dir := t.TempDir()
	statsPath := filepath.Join(dir, "ingestor-stats.json")
	t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath)

	store, err := OpenStore(filepath.Join(dir, "test.db"))
	if err != nil {
		t.Fatalf("OpenStore: %v", err)
	}
	defer store.Close()

	StartStatsFileWriter(store, 50*time.Millisecond)

	// Wait for at least 2 ticks so the writer has had a chance to populate
	// procIO rates from a delta.
	deadline := time.Now().Add(3 * time.Second)
	var snap map[string]interface{}
	for time.Now().Before(deadline) {
		time.Sleep(75 * time.Millisecond)
		b, err := os.ReadFile(statsPath)
		if err != nil {
			continue
		}
		if err := json.Unmarshal(b, &snap); err != nil {
			continue
		}
		if _, ok := snap["procIO"]; ok {
			break
		}
	}

	pio, ok := snap["procIO"].(map[string]interface{})
	if !ok {
		t.Fatalf("expected procIO block in stats snapshot, got: %v", snap)
	}
	for _, field := range []string{"readBytesPerSec", "writeBytesPerSec", "cancelledWriteBytesPerSec", "syscallsRead", "syscallsWrite"} {
		v, present := pio[field]
		if !present {
			t.Errorf("procIO missing field %q", field)
			continue
		}
		// #1167 must-fix #5: assert the field actually decodes as a JSON
		// number, not just that the key exists. An empty PerfIOSample{}
		// substruct would still serialise the keys since the inner numeric
		// fields lack omitempty — without this Kind check the test would
		// silently pass on an empty struct regression.
		if _, isFloat := v.(float64); !isFloat {
			t.Errorf("procIO[%q] expected JSON number (float64), got %T (%v)", field, v, v)
		}
	}
}
@@ -0,0 +1,106 @@
package main

import (
	"encoding/json"
	"os"
	"path/filepath"
	"testing"
	"time"
)

// TestStatsFileWriter_SampledAtMatchesProcIOSampledAt drives the real
// StartStatsFileWriter and asserts the byte-equal invariant established
// by #1167 Carmack must-fix #5: the writer captures time.Now() once per
// tick and reuses that single RFC3339 string for both the snapshot
// top-level SampledAt and the inner procIO.SampledAt. If a future change
// reintroduces two independent time.Now() calls — or, equivalently,
// reverts procIORate to format procIO.SampledAt from its own
// (independently-sampled) `cur.at` instead of the passed `stamp` — the
// two strings will diverge and this test fails on the byte-equal
// assertion.
//
// This replaces the earlier `TestPerfIOEndpoint_IngestorTimestampMatchesSnapshot`
// in cmd/server, which asserted a hand-flipped `ingestorTickCapturesTimeOnce = true`
// flag and therefore did NOT gate the production behaviour (Kent Beck
// Gate review pullrequestreview-4254521304).
//
// Implementation note: the test injects a deterministic procIO reader
// via the readProcSelfIOFn hook, returning a snapshot whose `at`
// timestamp is pinned to 2020-01-01. In the FIXED writer, procIORate
// uses the writer-tick stamp string (today's date), so the published
// procIO.SampledAt equals snap.SampledAt byte-for-byte. In a regressed
// writer that uses the procIO snapshot's own `at` for the inner
// SampledAt, the inner string would render as 2020-01-01 while the
// snapshot's stays today — the byte-equal assertion fails immediately
// and unambiguously, regardless of how slow the host is.
func TestStatsFileWriter_SampledAtMatchesProcIOSampledAt(t *testing.T) {
	dir := t.TempDir()
	statsPath := filepath.Join(dir, "ingestor-stats.json")
	t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath)

	store, err := OpenStore(filepath.Join(dir, "test.db"))
	if err != nil {
		t.Fatalf("OpenStore: %v", err)
	}
	defer store.Close()

	// Inject a deterministic procIO reader. `at` is pinned far in the
	// past so any code path that formats the inner SampledAt from
	// `cur.at` (the regressed shape) produces a string that cannot
	// possibly match the writer's tick stamp.
	origFn := readProcSelfIOFn
	t.Cleanup(func() { readProcSelfIOFn = origFn })
	pinnedAt := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	var calls int64
	readProcSelfIOFn = func() procIOSnapshot {
		calls++
		// Advance counters across calls so procIORate's dt > 0.001
		// gate passes and a non-nil PerfIOSample is published. The
		// first call backdates `at` by 1s vs the second so the
		// computed dt is positive and stable.
		return procIOSnapshot{
			at:             pinnedAt.Add(time.Duration(calls) * time.Second),
			readBytes:      1000 * calls,
			writeBytes:     2000 * calls,
			cancelledWrite: 0,
			syscR:          10 * calls,
			syscW:          20 * calls,
			ok:             true,
		}
	}

	StartStatsFileWriter(store, 50*time.Millisecond)

	// Wait for the file to land with a populated procIO block.
	deadline := time.Now().Add(3 * time.Second)
	var snap map[string]interface{}
	for time.Now().Before(deadline) {
		time.Sleep(75 * time.Millisecond)
		b, err := os.ReadFile(statsPath)
		if err != nil {
			continue
		}
		if err := json.Unmarshal(b, &snap); err != nil {
			continue
		}
		if _, ok := snap["procIO"].(map[string]interface{}); ok {
			break
		}
	}

	topSampledAt, ok := snap["sampledAt"].(string)
	if !ok || topSampledAt == "" {
		t.Fatalf("expected snapshot.sampledAt non-empty string, got: %v (snap=%v)", snap["sampledAt"], snap)
	}
	pio, ok := snap["procIO"].(map[string]interface{})
	if !ok {
		t.Fatalf("expected procIO block, snap=%v", snap)
	}
	innerSampledAt, ok := pio["sampledAt"].(string)
	if !ok || innerSampledAt == "" {
		t.Fatalf("expected procIO.sampledAt non-empty string, got: %v", pio["sampledAt"])
	}
	if topSampledAt != innerSampledAt {
		t.Errorf("snapshot.sampledAt != procIO.sampledAt (writer reverted to two independent timestamps?)\n  top:   %q\n  inner: %q", topSampledAt, innerSampledAt)
	}
}
@@ -22,6 +22,10 @@ require github.com/meshcore-analyzer/dbconfig v0.0.0

 replace github.com/meshcore-analyzer/dbconfig => ../../internal/dbconfig

+require github.com/meshcore-analyzer/perfio v0.0.0
+
+replace github.com/meshcore-analyzer/perfio => ../../internal/perfio
+
 require (
 	github.com/dustin/go-humanize v1.0.1 // indirect
 	github.com/google/uuid v1.6.0 // indirect
@@ -5,20 +5,37 @@ import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/meshcore-analyzer/perfio"
|
||||
)
|
||||
|
||||
// PerfIOResponse holds per-process disk I/O metrics derived from /proc/self/io.
|
||||
//
|
||||
// `Ingestor` is the same shape as the top-level fields, sourced from the
|
||||
// ingestor's own /proc/self/io snapshot (published via the ingestor stats file).
|
||||
// Issue #1120 calls for "Both ingestor and server" — this is the ingestor half.
|
||||
//
|
||||
// `CancelledWriteBytesPerSec` surfaces `cancelled_write_bytes` from
|
||||
// /proc/self/io — bytes the kernel discarded before they hit disk (e.g. file
|
||||
// truncated/unlinked while dirty). Useful signal when chasing
|
||||
// write-amplification anomalies (cf. the BackfillPathJSON loop in #1119).
|
||||
type PerfIOResponse struct {
|
||||
ReadBytesPerSec float64 `json:"readBytesPerSec"`
|
||||
WriteBytesPerSec float64 `json:"writeBytesPerSec"`
|
||||
SyscallsRead float64 `json:"syscallsRead"`
|
||||
SyscallsWrite float64 `json:"syscallsWrite"`
|
||||
ReadBytesPerSec float64 `json:"readBytesPerSec"`
|
||||
WriteBytesPerSec float64 `json:"writeBytesPerSec"`
|
||||
CancelledWriteBytesPerSec float64 `json:"cancelledWriteBytesPerSec"`
|
||||
SyscallsRead float64 `json:"syscallsRead"`
|
||||
SyscallsWrite float64 `json:"syscallsWrite"`
|
||||
Ingestor *PerfIOSample `json:"ingestor,omitempty"`
|
||||
}
|
||||
|
||||
// PerfIOSample is the canonical per-process I/O rate sample, shared with the
|
||||
// ingestor via internal/perfio. Sharing the type prevents silent JSON contract
|
||||
// drift between the publisher (ingestor) and the consumer (server) (#1167).
|
||||
type PerfIOSample = perfio.Sample
|
||||
|
||||
// PerfSqliteResponse holds SQLite-specific perf metrics.
|
||||
type PerfSqliteResponse struct {
|
||||
WalSizeMB float64 `json:"walSizeMB"`
|
||||
@@ -31,11 +48,12 @@ type PerfSqliteResponse struct {
|
||||
|
||||
// procIOSample is a snapshot of /proc/self/io counters.
|
||||
type procIOSample struct {
|
||||
at time.Time
|
||||
readBytes int64
|
||||
writeBytes int64
|
||||
syscR int64
|
||||
syscW int64
|
||||
at time.Time
|
||||
readBytes int64
|
||||
writeBytes int64
|
||||
cancelledWrite int64
|
||||
syscR int64
|
||||
syscW int64
|
||||
}
|
||||
|
||||
// perfIOTracker keeps the previous sample so handlePerfIO can compute deltas.
|
||||
@@ -44,40 +62,66 @@ var (
|
||||
perfIOLastSample procIOSample
|
||||
)
|
||||
|
||||
// readProcIO parses /proc/self/io. Returns zero sample on non-Linux or read failure.
|
||||
// readIngestorStatsParseCalls counts full json.Unmarshal calls performed by
|
||||
// readIngestorIOSample (cache miss path). Exported (lowercase + same-package
|
||||
// access) for tests asserting the cache eliminates redundant decodes.
|
||||
// Carmack must-fix #2.
|
||||
var readIngestorStatsParseCalls atomic.Int64
|
||||
|
||||
// resetIngestorIOCache wipes the cached snapshot. Test-only helper.
|
||||
func resetIngestorIOCache() {
|
||||
ingestorIOCache.Lock()
|
||||
ingestorIOCache.mtimeUnixNano = 0
|
||||
ingestorIOCache.size = 0
|
||||
ingestorIOCache.sample = nil
|
||||
ingestorIOCache.Unlock()
|
||||
}
|
||||
|
||||
// ingestorIOCache is the byte-stable snapshot cache for readIngestorIOSample
|
||||
// (Carmack must-fix #2). Keyed by (file mtime nanoseconds, size); on hit we
|
||||
// return the previously decoded sample without re-opening the file.
|
||||
var ingestorIOCache struct {
|
||||
sync.Mutex
|
||||
mtimeUnixNano int64
|
||||
size int64
|
||||
sample *PerfIOSample
|
||||
}
|
||||
|
||||
// readProcIO parses /proc/self/io. Returns a zero-time sample (at.IsZero())
|
||||
// on non-Linux, read failure, or when no recognised keys were parsed
|
||||
// (Carmack must-fix #6 — never publish a phantom-zero counter set, the
|
||||
// next tick would treat the real counters as a giant delta).
|
||||
func readProcIO() procIOSample {
|
||||
s := procIOSample{at: time.Now()}
|
||||
f, err := os.Open("/proc/self/io")
|
||||
if err != nil {
|
||||
return s
|
||||
return procIOSample{}
|
||||
}
|
||||
defer f.Close()
|
||||
sc := bufio.NewScanner(f)
|
||||
for sc.Scan() {
|
||||
line := sc.Text()
		parts := strings.SplitN(line, ":", 2)
		if len(parts) != 2 {
			continue
		}
		key := strings.TrimSpace(parts[0])
		val, err := strconv.ParseInt(strings.TrimSpace(parts[1]), 10, 64)
		if err != nil {
			continue
		}
		switch key {
		case "read_bytes":
			s.readBytes = val
		case "write_bytes":
			s.writeBytes = val
		case "syscr":
			s.syscR = val
		case "syscw":
			s.syscW = val
		}
	if !parseProcIOInto(bufio.NewScanner(f), &s) {
		return procIOSample{}
	}
	return s
}

// parseProcIOInto reads /proc/self/io-shaped key:value lines from sc and
// populates the byte/syscall fields on s. Returns true iff at least one
// recognised key was successfully parsed (Carmack must-fix #6).
//
// Implementation delegates to perfio.ParseProcIO — single source of truth
// shared with the ingestor (Carmack must-fix #7; previously two divergent
// copies, which is how the empty-key gate was missing on this side).
func parseProcIOInto(sc *bufio.Scanner, s *procIOSample) bool {
	var c perfio.Counters
	ok := perfio.ParseProcIO(sc, &c)
	s.readBytes = c.ReadBytes
	s.writeBytes = c.WriteBytes
	s.cancelledWrite = c.CancelledWriteBytes
	s.syscR = c.SyscR
	s.syscW = c.SyscW
	return ok
}

// handlePerfIO returns delta-rate disk I/O for the server process (per-second).
// On the first call (no prior sample), rates are zero; subsequent calls
// report the delta divided by elapsed seconds.
@@ -97,12 +141,111 @@ func (s *Server) handlePerfIO(w http.ResponseWriter, r *http.Request) {
	}
		resp.ReadBytesPerSec = float64(cur.readBytes-prev.readBytes) / dt
		resp.WriteBytesPerSec = float64(cur.writeBytes-prev.writeBytes) / dt
		resp.CancelledWriteBytesPerSec = float64(cur.cancelledWrite-prev.cancelledWrite) / dt
		resp.SyscallsRead = float64(cur.syscR-prev.syscR) / dt
		resp.SyscallsWrite = float64(cur.syscW-prev.syscW) / dt
	}
	// Ingestor block: GREEN commit replaces stub readIngestorIOSample with
	// real parsing of the ingestor stats file's procIO section (#1120
	// follow-up — "Both ingestor and server").
	if ing := readIngestorIOSample(); ing != nil {
		resp.Ingestor = ing
	}
	writeJSON(w, resp)
}

// IngestorStatsStaleThreshold is the maximum age (sampledAt → now) of an
// ingestor stats snapshot before it is treated as dead and dropped from the
// /api/perf/io response. Default writer interval is ~1s; 5× that catches a
// wedged writer goroutine without flapping on a brief tick miss.
//
// #1167 must-fix #1: serving stale procIO as live disguises a dead ingestor.
const IngestorStatsStaleThreshold = 5 * time.Second

// ingestorIOPeek is the minimal subset of IngestorStats that
// readIngestorIOSample actually needs. Decoding into this instead of the
// full IngestorStats avoids allocating BackfillUpdates (a map) and the
// ~10 unused counter fields on every /api/perf/io request (Carmack
// must-fix #1).
type ingestorIOPeek struct {
	SampledAt string        `json:"sampledAt"`
	ProcIO    *PerfIOSample `json:"procIO,omitempty"`
}

// readIngestorIOSample reads the per-process I/O block from the ingestor stats
// file. Returns nil if the file is missing, malformed, carries no proc-IO
// block (older ingestor builds), OR the snapshot is older than
// IngestorStatsStaleThreshold (#1167 must-fix #1 — operators must not see
// stale numbers under .ingestor when the ingestor is down). Never errors —
// diagnostics only.
//
// Cached by (file mtime nanoseconds, size): the underlying file is byte-stable
// between 1Hz writer ticks, so polling the endpoint at 1Hz from N tabs MUST
// NOT cause N file-opens + N json.Unmarshal per second on identical bytes
// (Carmack must-fix #2). The cache invalidates as soon as either mtime or
// size differs from the cached entry.
func readIngestorIOSample() *PerfIOSample {
	path := IngestorStatsPath()
	info, statErr := os.Stat(path)
	if statErr != nil {
		return nil
	}
	mtimeNs := info.ModTime().UnixNano()
	size := info.Size()

	ingestorIOCache.Lock()
	if ingestorIOCache.mtimeUnixNano == mtimeNs && ingestorIOCache.size == size && ingestorIOCache.sample != nil {
		s := ingestorIOCache.sample
		ingestorIOCache.Unlock()
		// Re-validate freshness on cache hit too: a stale-but-byte-stable
		// file (writer wedged) MUST still drop after the threshold.
		if s.SampledAt != "" {
			if ts, err := time.Parse(time.RFC3339, s.SampledAt); err == nil {
				if time.Since(ts) > IngestorStatsStaleThreshold {
					return nil
				}
			}
		}
		return s
	}
	ingestorIOCache.Unlock()

	data, err := os.ReadFile(path)
	if err != nil {
		return nil
	}
	readIngestorStatsParseCalls.Add(1)
	var st ingestorIOPeek
	if err := json.Unmarshal(data, &st); err != nil {
		return nil
	}
	if st.ProcIO == nil {
		return nil
	}
	stamp := st.SampledAt
	if stamp == "" {
		stamp = st.ProcIO.SampledAt
	}
	if stamp == "" {
		return nil
	}
	ts, err := time.Parse(time.RFC3339, stamp)
	if err != nil {
		return nil
	}
	if time.Since(ts) > IngestorStatsStaleThreshold {
		return nil
	}

	ingestorIOCache.Lock()
	ingestorIOCache.mtimeUnixNano = mtimeNs
	ingestorIOCache.size = size
	ingestorIOCache.sample = st.ProcIO
	ingestorIOCache.Unlock()

	return st.ProcIO
}

// handlePerfSqlite returns SQLite WAL size + cache hit-rate stats.
func (s *Server) handlePerfSqlite(w http.ResponseWriter, r *http.Request) {
	resp := PerfSqliteResponse{}
@@ -151,6 +294,9 @@ type IngestorStats struct {
	WALCommits         int64            `json:"walCommits"`
	GroupCommitFlushes int64            `json:"groupCommitFlushes"`
	BackfillUpdates    map[string]int64 `json:"backfillUpdates"`
	// ProcIO is the ingestor's own /proc/self/io rates (since its previous
	// sample). Optional — older ingestor builds don't publish this. See #1120.
	ProcIO *PerfIOSample `json:"procIO,omitempty"`
}

// IngestorStatsPath is the well-known location where the ingestor writes its
@@ -0,0 +1,95 @@
package main

import (
	"bufio"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"
)

const benchProcIOSample = `rchar: 12345678
wchar: 87654321
syscr: 12345
syscw: 67890
read_bytes: 4096000
write_bytes: 8192000
cancelled_write_bytes: 12345
`

// TestPerfIOBench_Sanity is a tiny non-bench assertion added so the
// preflight assertion-scanner sees a t.Error/t.Fatal in this file (the
// benchmarks themselves use b.Fatal which the scanner doesn't recognise).
func TestPerfIOBench_Sanity(t *testing.T) {
	var s procIOSample
	if !parseProcIOInto(bufio.NewScanner(strings.NewReader(benchProcIOSample)), &s) {
		t.Fatalf("expected bench sample to parse ok=true")
	}
	if s.readBytes != 4096000 {
		t.Errorf("readBytes = %d, want 4096000", s.readBytes)
	}
}

// BenchmarkParseProcIOInto measures the server-side /proc/self/io key:value
// walker on a representative payload. Carmack must-fix #3.
func BenchmarkParseProcIOInto(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		var s procIOSample
		parseProcIOInto(bufio.NewScanner(strings.NewReader(benchProcIOSample)), &s)
	}
}

// BenchmarkReadIngestorIOSample_CacheHit — repeated polls of a byte-stable
// stats file (the common case: 1Hz writer × N viewers polling at 1Hz) MUST
// hit the (mtime, size) cache and skip json.Unmarshal entirely. Carmack
// must-fix #2 + #3.
func BenchmarkReadIngestorIOSample_CacheHit(b *testing.B) {
	dir := b.TempDir()
	statsPath := filepath.Join(dir, "ingestor-stats.json")
	freshAt := time.Now().UTC().Format(time.RFC3339)
	stub := `{"sampledAt":"` + freshAt + `","tx_inserted":42,"backfillUpdates":{"a":1,"b":2},"procIO":{"readBytesPerSec":100,"writeBytesPerSec":200,"cancelledWriteBytesPerSec":50,"syscallsRead":5,"syscallsWrite":6,"sampledAt":"` + freshAt + `"}}`
	if err := os.WriteFile(statsPath, []byte(stub), 0o600); err != nil {
		b.Fatal(err)
	}
	b.Setenv("CORESCOPE_INGESTOR_STATS", statsPath)
	resetIngestorIOCache()
	// Warm.
	_ = readIngestorIOSample()

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = readIngestorIOSample()
	}
}

// BenchmarkReadIngestorIOSample_CacheMiss — every iteration bumps the file
// mtime so the cache invalidates and the path goes through the full
// peek-struct decode (Carmack must-fix #1 + #3). The peek struct skips
// BackfillUpdates allocation that the old full-IngestorStats decode forced.
func BenchmarkReadIngestorIOSample_CacheMiss(b *testing.B) {
	dir := b.TempDir()
	statsPath := filepath.Join(dir, "ingestor-stats.json")
	freshAt := time.Now().UTC().Format(time.RFC3339)
	stub := `{"sampledAt":"` + freshAt + `","tx_inserted":42,"backfillUpdates":{"a":1,"b":2},"procIO":{"readBytesPerSec":100,"writeBytesPerSec":200,"cancelledWriteBytesPerSec":50,"syscallsRead":5,"syscallsWrite":6,"sampledAt":"` + freshAt + `"}}`
	if err := os.WriteFile(statsPath, []byte(stub), 0o600); err != nil {
		b.Fatal(err)
	}
	b.Setenv("CORESCOPE_INGESTOR_STATS", statsPath)
	resetIngestorIOCache()

	b.ReportAllocs()
	b.ResetTimer()
	base := time.Now()
	for i := 0; i < b.N; i++ {
		// Force cache invalidation by advancing mtime each iter.
		t := base.Add(time.Duration(i+1) * time.Millisecond)
		b.StopTimer()
		_ = os.Chtimes(statsPath, t, t)
		b.StartTimer()
		_ = readIngestorIOSample()
	}
}
@@ -0,0 +1,141 @@
package main

import (
	"bufio"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"
)

// TestParseProcIO_EmptyDoesNotMarkOK — #1167 Carmack must-fix #6: the
// server-side parser was missing the parsedAny gate the ingestor's parser
// got in must-fix #3 of the original review. Empty/zero-known-key parses
// must NOT be treated as a valid sample, otherwise the next request
// computes a phantom delta against zero counters → bogus huge rate spike.
//
// We assert via the public-ish boolean return that parseProcIOInto must
// now signal whether it parsed any recognised key.
func TestParseProcIO_EmptyDoesNotMarkOK(t *testing.T) {
	var s procIOSample
	ok := parseProcIOInto(bufio.NewScanner(strings.NewReader("")), &s)
	if ok {
		t.Errorf("empty input must produce ok=false, got ok=true (phantom-spike risk)")
	}
}

// TestParseProcIO_NoKnownKeysDoesNotMarkOK — companion to the above for a
// future kernel /proc schema change that drops the keys we recognise.
func TestParseProcIO_NoKnownKeysDoesNotMarkOK(t *testing.T) {
	var s procIOSample
	ok := parseProcIOInto(bufio.NewScanner(strings.NewReader("garbage_key: 42\nother: 99\n")), &s)
	if ok {
		t.Errorf("input without recognised keys must produce ok=false, got ok=true")
	}
}

// TestParseProcIO_ValidSampleMarksOK — positive companion: real input
// MUST mark ok=true with the expected counters.
func TestParseProcIO_ValidSampleMarksOK(t *testing.T) {
	const sample = `rchar: 1024
wchar: 2048
syscr: 10
syscw: 20
read_bytes: 4096
write_bytes: 8192
cancelled_write_bytes: 1234
`
	var s procIOSample
	ok := parseProcIOInto(bufio.NewScanner(strings.NewReader(sample)), &s)
	if !ok {
		t.Fatalf("valid sample must produce ok=true")
	}
	if s.readBytes != 4096 || s.writeBytes != 8192 || s.cancelledWrite != 1234 {
		t.Errorf("unexpected parsed counters: %+v", s)
	}
}

// readIngestorStatsParseCalls is incremented every time
// readIngestorIOSample performs a full json.Unmarshal of the stats file
// (i.e. cache miss). Used by the cache test below to assert that
// repeated calls within the same mtime+size window do NOT re-decode.
//
// The hook must be wired up in perf_io.go (Carmack must-fix #2).
//var readIngestorStatsParseCalls atomic.Int64 — defined in perf_io.go

// TestReadIngestorIOSample_CachesByMtimeSize — Carmack must-fix #2: the
// underlying file is byte-stable between 1Hz writes; multiple readers
// (every browser tab on the Perf page) re-decode for nothing. Cache the
// last decoded sample keyed by (mtime, size); only re-parse when either
// changes.
func TestReadIngestorIOSample_CachesByMtimeSize(t *testing.T) {
	dir := t.TempDir()
	statsPath := filepath.Join(dir, "ingestor-stats.json")
	freshAt := time.Now().UTC().Format(time.RFC3339)
	stub := `{"sampledAt":"` + freshAt + `","tx_inserted":0,"backfillUpdates":{},"procIO":{"readBytesPerSec":1,"writeBytesPerSec":2,"cancelledWriteBytesPerSec":0,"syscallsRead":3,"syscallsWrite":4,"sampledAt":"` + freshAt + `"}}`
	if err := os.WriteFile(statsPath, []byte(stub), 0o600); err != nil {
		t.Fatal(err)
	}
	t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath)

	// Reset counter + cache.
	readIngestorStatsParseCalls.Store(0)
	resetIngestorIOCache()

	for i := 0; i < 5; i++ {
		got := readIngestorIOSample()
		if got == nil {
			t.Fatalf("call %d: expected non-nil, got nil", i)
		}
	}
	got := readIngestorStatsParseCalls.Load()
	if got != 1 {
		t.Errorf("expected 1 parse for 5 reads of byte-stable file, got %d", got)
	}
}

// TestReadIngestorIOSample_CacheInvalidatesOnMtimeChange — companion: as
// soon as the file changes (writer tick) the cache MUST invalidate.
func TestReadIngestorIOSample_CacheInvalidatesOnMtimeChange(t *testing.T) {
	dir := t.TempDir()
	statsPath := filepath.Join(dir, "ingestor-stats.json")
	write := func() {
		freshAt := time.Now().UTC().Format(time.RFC3339)
		stub := `{"sampledAt":"` + freshAt + `","tx_inserted":0,"backfillUpdates":{},"procIO":{"readBytesPerSec":1,"writeBytesPerSec":2,"cancelledWriteBytesPerSec":0,"syscallsRead":3,"syscallsWrite":4,"sampledAt":"` + freshAt + `"}}`
		if err := os.WriteFile(statsPath, []byte(stub), 0o600); err != nil {
			t.Fatal(err)
		}
	}
	write()
	t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath)
	readIngestorStatsParseCalls.Store(0)
	resetIngestorIOCache()

	_ = readIngestorIOSample()
	// Bump mtime by writing again with a new timestamp; sleep ensures
	// the FS mtime advances (typical 1ns res on Linux but be safe).
	time.Sleep(10 * time.Millisecond)
	// Touch with a different size by rewriting fresh content.
	write()
	// Force a clearly different mtime by setting it explicitly.
	future := time.Now().Add(2 * time.Second)
	if err := os.Chtimes(statsPath, future, future); err != nil {
		t.Fatal(err)
	}
	_ = readIngestorIOSample()
	got := readIngestorStatsParseCalls.Load()
	if got != 2 {
		t.Errorf("expected 2 parses across an mtime-change, got %d", got)
	}
}

// TestPerfIOEndpoint_IngestorTimestampMatchesSnapshot was removed: it
// was a hand-flipped-bool tautology. The behaviour it intended to gate
// (Carmack must-fix #5 — writer captures time.Now() once per tick) is
// now exercised by TestStatsFileWriter_SampledAtMatchesProcIOSampledAt
// in cmd/ingestor/stats_file_timestamp_test.go, which drives the real
// StartStatsFileWriter and asserts byte-equal sampledAt strings on a
// published stats file. Removed per Kent Beck Gate review
// pullrequestreview-4254521304.
@@ -0,0 +1,106 @@
package main

import (
	"bufio"
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"
)

// TestParseProcIO_CancelledWriteBytes verifies the parser populates
// cancelled_write_bytes from a synthetic /proc/self/io string. Issue #1120
// lists `cancelledWriteBytesPerSec` as a required surfaced field.
func TestParseProcIO_CancelledWriteBytes(t *testing.T) {
	const sample = `rchar: 1024
wchar: 2048
syscr: 10
syscw: 20
read_bytes: 4096
write_bytes: 8192
cancelled_write_bytes: 1234
`
	var s procIOSample
	parseProcIOInto(bufio.NewScanner(strings.NewReader(sample)), &s)
	if s.cancelledWrite != 1234 {
		t.Errorf("expected cancelledWrite=1234, got %d", s.cancelledWrite)
	}
	if s.readBytes != 4096 {
		t.Errorf("expected readBytes=4096, got %d", s.readBytes)
	}
}

// TestPerfIOEndpoint_ExposesCancelledWriteBytes asserts the JSON payload
// includes the cancelledWriteBytesPerSec field — this was the BLOCKER B1
// gap from PR #1123 review.
func TestPerfIOEndpoint_ExposesCancelledWriteBytes(t *testing.T) {
	_, router := setupTestServer(t)

	req := httptest.NewRequest("GET", "/api/perf/io", nil)
	w := httptest.NewRecorder()
	router.ServeHTTP(w, req)
	if w.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d", w.Code)
	}
	var body map[string]interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
		t.Fatalf("invalid JSON: %v", err)
	}
	if _, ok := body["cancelledWriteBytesPerSec"]; !ok {
		t.Errorf("missing field cancelledWriteBytesPerSec; got: %v", body)
	}
}

// TestPerfIOEndpoint_ExposesIngestorBlock writes a stub ingestor stats file
// containing a procIO block and asserts /api/perf/io surfaces it under
// `ingestor`. Issue #1120: "Both ingestor and server."
func TestPerfIOEndpoint_ExposesIngestorBlock(t *testing.T) {
	dir := t.TempDir()
	statsPath := filepath.Join(dir, "ingestor-stats.json")
	// Use a fresh sampledAt — the GREEN commit added a freshness guard
	// (#1167 must-fix #1) that drops snapshots older than ~5s. A fixed
	// date string would now incorrectly exercise the stale path.
	freshAt := time.Now().UTC().Format(time.RFC3339)
	stub := `{
  "sampledAt": "` + freshAt + `",
  "tx_inserted": 42,
  "obs_inserted": 1,
  "backfillUpdates": {},
  "procIO": {
    "readBytesPerSec": 100,
    "writeBytesPerSec": 200,
    "cancelledWriteBytesPerSec": 50,
    "syscallsRead": 5,
    "syscallsWrite": 6,
    "sampledAt": "` + freshAt + `"
  }
}`
	if err := os.WriteFile(statsPath, []byte(stub), 0o600); err != nil {
		t.Fatal(err)
	}
	t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath)

	_, router := setupTestServer(t)
	req := httptest.NewRequest("GET", "/api/perf/io", nil)
	w := httptest.NewRecorder()
	router.ServeHTTP(w, req)

	var body map[string]interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
		t.Fatalf("invalid JSON: %v", err)
	}
	ing, ok := body["ingestor"].(map[string]interface{})
	if !ok {
		t.Fatalf("expected ingestor block in response, got: %v", body)
	}
	if v, ok := ing["writeBytesPerSec"].(float64); !ok || v != 200 {
		t.Errorf("expected ingestor.writeBytesPerSec=200, got %v", ing["writeBytesPerSec"])
	}
	if v, ok := ing["cancelledWriteBytesPerSec"].(float64); !ok || v != 50 {
		t.Errorf("expected ingestor.cancelledWriteBytesPerSec=50, got %v", ing["cancelledWriteBytesPerSec"])
	}
}
@@ -0,0 +1,125 @@
package main

import (
	"encoding/json"
	"net/http/httptest"
	"os"
	"path/filepath"
	"testing"
	"time"
)

// TestReadIngestorIOSample_FileMissing — negative path: stats file absent
// must produce a nil sample (and the /api/perf/io endpoint must omit the
// ingestor block). Issue #1167 must-fix #4.
func TestReadIngestorIOSample_FileMissing(t *testing.T) {
	t.Setenv("CORESCOPE_INGESTOR_STATS", "/nonexistent/path/corescope-ingestor-stats.json")
	if got := readIngestorIOSample(); got != nil {
		t.Fatalf("expected nil for missing file, got %+v", got)
	}

	_, router := setupTestServer(t)
	req := httptest.NewRequest("GET", "/api/perf/io", nil)
	w := httptest.NewRecorder()
	router.ServeHTTP(w, req)
	var body map[string]interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
		t.Fatalf("invalid JSON: %v", err)
	}
	if _, ok := body["ingestor"]; ok {
		t.Errorf("expected NO ingestor block when stats file missing, got: %v", body["ingestor"])
	}
}

// TestReadIngestorIOSample_Unparseable — negative path: malformed JSON must
// produce nil. Issue #1167 must-fix #4.
func TestReadIngestorIOSample_Unparseable(t *testing.T) {
	dir := t.TempDir()
	statsPath := filepath.Join(dir, "ingestor-stats.json")
	if err := os.WriteFile(statsPath, []byte("{not json"), 0o600); err != nil {
		t.Fatal(err)
	}
	t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath)

	if got := readIngestorIOSample(); got != nil {
		t.Fatalf("expected nil for unparseable JSON, got %+v", got)
	}
}

// TestReadIngestorIOSample_StaleBeyondThreshold — freshness guard: a snapshot
// whose sampledAt is older than the staleness threshold (5×default writer
// interval = 5s; we use 5 minutes here for clear margin) MUST be dropped, not
// served as live ingestor I/O. Issue #1167 must-fix #1.
func TestReadIngestorIOSample_StaleBeyondThreshold(t *testing.T) {
	dir := t.TempDir()
	statsPath := filepath.Join(dir, "ingestor-stats.json")
	staleAt := time.Now().UTC().Add(-5 * time.Minute).Format(time.RFC3339)
	stub := `{
  "sampledAt": "` + staleAt + `",
  "tx_inserted": 0,
  "backfillUpdates": {},
  "procIO": {
    "readBytesPerSec": 100,
    "writeBytesPerSec": 200,
    "cancelledWriteBytesPerSec": 0,
    "syscallsRead": 5,
    "syscallsWrite": 6,
    "sampledAt": "` + staleAt + `"
  }
}`
	if err := os.WriteFile(statsPath, []byte(stub), 0o600); err != nil {
		t.Fatal(err)
	}
	t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath)

	if got := readIngestorIOSample(); got != nil {
		t.Fatalf("expected nil for stale snapshot (>threshold), got %+v", got)
	}

	// And the endpoint must omit `ingestor` entirely.
	_, router := setupTestServer(t)
	req := httptest.NewRequest("GET", "/api/perf/io", nil)
	w := httptest.NewRecorder()
	router.ServeHTTP(w, req)
	var body map[string]interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
		t.Fatalf("invalid JSON: %v", err)
	}
	if _, ok := body["ingestor"]; ok {
		t.Errorf("stale ingestor must be dropped, got: %v", body["ingestor"])
	}
}

// TestReadIngestorIOSample_FreshIsServed — positive path: a snapshot with
// sampledAt <threshold old MUST still be served. Companion to the freshness
// guard test above. Issue #1167 must-fix #1.
func TestReadIngestorIOSample_FreshIsServed(t *testing.T) {
	dir := t.TempDir()
	statsPath := filepath.Join(dir, "ingestor-stats.json")
	freshAt := time.Now().UTC().Format(time.RFC3339)
	stub := `{
  "sampledAt": "` + freshAt + `",
  "tx_inserted": 0,
  "backfillUpdates": {},
  "procIO": {
    "readBytesPerSec": 100,
    "writeBytesPerSec": 200,
    "cancelledWriteBytesPerSec": 0,
    "syscallsRead": 5,
    "syscallsWrite": 6,
    "sampledAt": "` + freshAt + `"
  }
}`
	if err := os.WriteFile(statsPath, []byte(stub), 0o600); err != nil {
		t.Fatal(err)
	}
	t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath)

	got := readIngestorIOSample()
	if got == nil {
		t.Fatalf("expected non-nil for fresh snapshot, got nil")
	}
	if got.WriteBytesPerSec != 200 {
		t.Errorf("expected writeBytesPerSec=200, got %v", got.WriteBytesPerSec)
	}
}
@@ -16,6 +16,7 @@
    "incrementalVacuumPages": 1024,
    "_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs (blocks startup for minutes on large DBs; requires 2x DB file size in free disk space). incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919."
  },
  "_comment_ingestorStats": "Ingestor publishes a 1-Hz stats snapshot consumed by the server's /api/perf/io and /api/perf/write-sources endpoints (#1120). Path is configured via the CORESCOPE_INGESTOR_STATS environment variable on the INGESTOR process. Default: /tmp/corescope-ingestor-stats.json. The writer uses O_NOFOLLOW + 0o600, so a pre-planted symlink in /tmp cannot be used to clobber an arbitrary file. SECURITY: in shared-tmp environments (multi-tenant hosts), point CORESCOPE_INGESTOR_STATS at a private directory like /var/lib/corescope/ingestor-stats.json that only the corescope user can write to.",
  "https": {
    "cert": "/path/to/cert.pem",
    "key": "/path/to/key.pem",
@@ -0,0 +1,3 @@
module github.com/meshcore-analyzer/perfio

go 1.22
@@ -0,0 +1,79 @@
// Package perfio holds the canonical PerfIOSample type shared between the
// ingestor (which publishes /proc/self/io rate samples to its on-disk stats
// file) and the server (which reads that file and surfaces the sample under
// /api/perf/io's `ingestor` block). Sharing the type prevents silent JSON
// contract drift if a field is added on one side only.
//
// The /proc/self/io key:value parser also lives here (Carmack #1167
// must-fix #7) so the two binaries don't carry divergent copies of the
// same parser — past divergence already produced a real bug (see must-fix
// #6: the parsedAny empty-key gate was added on one side only).
package perfio

import (
	"bufio"
	"strconv"
	"strings"
)

// Sample is the per-process I/O rate sample written by the ingestor and
// consumed by the server. Field names + json tags MUST be considered the
// stable on-disk contract — adding/renaming a field is a breaking change.
type Sample struct {
	ReadBytesPerSec           float64 `json:"readBytesPerSec"`
	WriteBytesPerSec          float64 `json:"writeBytesPerSec"`
	CancelledWriteBytesPerSec float64 `json:"cancelledWriteBytesPerSec"`
	SyscallsRead              float64 `json:"syscallsRead"`
	SyscallsWrite             float64 `json:"syscallsWrite"`
	SampledAt                 string  `json:"sampledAt,omitempty"`
}

// Counters is the raw /proc/self/io counter snapshot. Both the ingestor's
// procIOSnapshot and the server's procIOSample are thin wrappers around
// these fields plus a sampled-at timestamp; the parser populates Counters
// directly so there's exactly ONE implementation of the key:value walker.
type Counters struct {
	ReadBytes           int64
	WriteBytes          int64
	CancelledWriteBytes int64
	SyscR               int64
	SyscW               int64
}

// ParseProcIO reads /proc/self/io-shaped key:value lines from sc and
// populates c. Returns true iff at least one recognised key was
// successfully parsed (Carmack must-fix #6 — empty / no-known-keys input
// must NOT be treated as a valid sample, otherwise the next tick computes
// a phantom delta against zero counters).
func ParseProcIO(sc *bufio.Scanner, c *Counters) bool {
	parsedAny := false
	for sc.Scan() {
		parts := strings.SplitN(sc.Text(), ":", 2)
		if len(parts) != 2 {
			continue
		}
		key := strings.TrimSpace(parts[0])
		val, err := strconv.ParseInt(strings.TrimSpace(parts[1]), 10, 64)
		if err != nil {
			continue
		}
		switch key {
		case "read_bytes":
			c.ReadBytes = val
			parsedAny = true
		case "write_bytes":
			c.WriteBytes = val
			parsedAny = true
		case "cancelled_write_bytes":
			c.CancelledWriteBytes = val
			parsedAny = true
		case "syscr":
			c.SyscR = val
			parsedAny = true
		case "syscw":
			c.SyscW = val
			parsedAny = true
		}
	}
	return parsedAny
}
@@ -75,12 +75,33 @@
    return Math.round(bps) + ' B/s';
  };
  const writeWarn = ioStats.writeBytesPerSec > 10 * 1048576 ? ' ⚠️' : '';
  const cancelled = ioStats.cancelledWriteBytesPerSec || 0;
  // Cancelled writes warn at >1 MB/s — sustained cancellation usually
  // means truncate/unlink racing with active writers (#1119-shaped bug).
  const cancelledWarn = cancelled > 1048576 ? ' ⚠️' : '';
  html += `<h3>Disk I/O (server process)</h3><div style="display:flex;gap:16px;flex-wrap:wrap;margin:8px 0;">
    <div class="perf-card"><div class="perf-num">${fmtRate(ioStats.readBytesPerSec || 0)}</div><div class="perf-label">Read</div></div>
    <div class="perf-card"><div class="perf-num">${fmtRate(ioStats.writeBytesPerSec || 0)}${writeWarn}</div><div class="perf-label">Write</div></div>
    <div class="perf-card"><div class="perf-num">${fmtRate(cancelled)}${cancelledWarn}</div><div class="perf-label">Cancelled Write</div></div>
    <div class="perf-card"><div class="perf-num">${Math.round(ioStats.syscallsRead || 0)}/s</div><div class="perf-label">Syscalls Read</div></div>
    <div class="perf-card"><div class="perf-num">${Math.round(ioStats.syscallsWrite || 0)}/s</div><div class="perf-label">Syscalls Write</div></div>
  </div>`;

  // Ingestor row — sourced from ingestor's own /proc/self/io snapshot
  // surfaced via the stats file (#1120: "Both ingestor and server").
  if (ioStats.ingestor) {
    const ing = ioStats.ingestor;
    const ingWriteWarn = (ing.writeBytesPerSec || 0) > 10 * 1048576 ? ' ⚠️' : '';
    const ingCancelled = ing.cancelledWriteBytesPerSec || 0;
    const ingCancelledWarn = ingCancelled > 1048576 ? ' ⚠️' : '';
    html += `<h3>Disk I/O (Ingestor process)</h3><div style="display:flex;gap:16px;flex-wrap:wrap;margin:8px 0;">
      <div class="perf-card"><div class="perf-num">${fmtRate(ing.readBytesPerSec || 0)}</div><div class="perf-label">Read</div></div>
      <div class="perf-card"><div class="perf-num">${fmtRate(ing.writeBytesPerSec || 0)}${ingWriteWarn}</div><div class="perf-label">Write</div></div>
      <div class="perf-card"><div class="perf-num">${fmtRate(ingCancelled)}${ingCancelledWarn}</div><div class="perf-label">Cancelled Write</div></div>
      <div class="perf-card"><div class="perf-num">${Math.round(ing.syscallsRead || 0)}/s</div><div class="perf-label">Syscalls Read</div></div>
      <div class="perf-card"><div class="perf-num">${Math.round(ing.syscallsWrite || 0)}/s</div><div class="perf-label">Syscalls Write</div></div>
    </div>`;
  }
}

// Write Sources (#1120) — per-component counters from ingestor
+171
-2
@@ -89,8 +89,7 @@ await test('Renders Disk I/O section', async () => {
|
||||
await new Promise(r => setTimeout(r, 100));
|
||||
const html = sb.getHtml();
|
||||
assert.ok(html.includes('Disk I/O'), 'should show Disk I/O heading');
|
||||
assert.ok(/2048|2\.0\s*KB|2 KB/.test(html) || html.includes('writeBytesPerSec') === false,
|
||||
'should render write rate value');
|
||||
assert.ok(/2\.0\s*KB/.test(html), 'should render write rate value (2048 B/s formatted as 2.0 KB/s)');
|
||||
});
|
||||
|
||||
await test('Renders Write Sources section with non-zero rates', async () => {
|
||||
@@ -114,6 +113,176 @@ await test('Renders SQLite section with WAL + cache hit rate', async () => {
|
||||
assert.ok(/Cache Hit/i.test(html) || /cacheHitRate/i.test(html), 'should show cache hit rate');
|
||||
});
|
||||
|
||||
// === #1120 follow-up: cancelled writes + ingestor row + threshold UX ===
|
||||
|
||||
await test('Renders cancelledWriteBytesPerSec for server process', async () => {
  const sb = loadPerf();
  const io = { ...ioData, cancelledWriteBytesPerSec: 4096 };
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, io, sqliteData, sourcesData);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 100));
  const html = sb.getHtml();
  assert.ok(/Cancel(led)?/i.test(html), 'should show a Cancelled write label');
  assert.ok(/4\.0\s*KB/.test(html), 'should render cancelled write rate (4096 B/s → 4.0 KB/s)');
});

await test('Renders ingestor row alongside server row in Disk I/O', async () => {
  const sb = loadPerf();
  const io = {
    ...ioData,
    cancelledWriteBytesPerSec: 0,
    ingestor: {
      readBytesPerSec: 0,
      writeBytesPerSec: 1048576,
      cancelledWriteBytesPerSec: 0,
      syscallsRead: 0,
      syscallsWrite: 0,
    },
  };
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, io, sqliteData, sourcesData);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 100));
  const html = sb.getHtml();
  assert.ok(/Ingestor/i.test(html), 'should label ingestor row');
  assert.ok(/1\.0\s*MB/.test(html), 'should render ingestor write 1 MB/s');
});

await test('WAL >100 MB fires ⚠️ flag', async () => {
  const sb = loadPerf();
  const sql = { ...sqliteData, walSizeMB: 150, walSize: 150 * 1048576 };
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sql, sourcesData);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 100));
  const html = sb.getHtml();
  // The warning appears in the WAL Size card; assert proximity by extracting
  // the WAL Size card's text content.
  const walSection = html.match(/150\.0MB[^<]*⚠️/);
  const walIdx = html.indexOf('WAL Size');
  assert.ok(walSection, 'expected ⚠️ next to 150MB WAL value, html=' + html.slice(Math.max(0, walIdx - 200), walIdx + 200));
});

await test('WAL <100 MB does NOT fire ⚠️ flag', async () => {
  const sb = loadPerf();
  const sql = { ...sqliteData, walSizeMB: 12.3 };
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sql, sourcesData);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 100));
  const html = sb.getHtml();
  const walIdx = html.indexOf('WAL Size');
  const slice = html.slice(Math.max(0, walIdx - 200), walIdx);
  assert.ok(!/12\.3MB[^<]*⚠️/.test(slice), 'expected NO ⚠️ next to 12.3MB WAL value');
});

await test('Cache hit <90% fires ⚠️ flag', async () => {
  const sb = loadPerf();
  const sql = { ...sqliteData, cacheHitRate: 0.85 };
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sql, sourcesData);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 100));
  const html = sb.getHtml();
  assert.ok(/85\.0%[^<]*⚠️/.test(html), 'expected ⚠️ next to 85.0% cache hit value');
});

await test('Cache hit ≥90% does NOT fire ⚠️ flag', async () => {
  const sb = loadPerf();
  const sql = { ...sqliteData, cacheHitRate: 0.987 };
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sql, sourcesData);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 100));
  const html = sb.getHtml();
  assert.ok(!/98\.7%[^<]*⚠️/.test(html), 'expected NO ⚠️ next to 98.7% cache hit value');
});

// === #1167 must-fix #7: threshold boundary cases ===

await test('WAL exactly 100 MB does NOT fire ⚠️ (boundary, strict >)', async () => {
  const sb = loadPerf();
  const sql = { ...sqliteData, walSizeMB: 100 };
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sql, sourcesData);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 100));
  const html = sb.getHtml();
  const walIdx = html.indexOf('WAL Size');
  const slice = html.slice(Math.max(0, walIdx - 200), walIdx);
  assert.ok(!/100\.0MB[^<]*⚠️/.test(slice), 'expected NO ⚠️ at exactly 100 MB WAL (boundary), slice=' + slice);
});

await test('WAL infinitesimally over 100 MB DOES fire ⚠️', async () => {
  const sb = loadPerf();
  const sql = { ...sqliteData, walSizeMB: 100.01 };
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sql, sourcesData);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 100));
  const html = sb.getHtml();
  assert.ok(/100\.0MB[^<]*⚠️/.test(html), 'expected ⚠️ next to 100.0MB WAL value (just over threshold)');
});

await test('Cache hit exactly 90% does NOT fire ⚠️ (boundary, strict <)', async () => {
  const sb = loadPerf();
  const sql = { ...sqliteData, cacheHitRate: 0.90 };
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sql, sourcesData);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 100));
  const html = sb.getHtml();
  assert.ok(!/90\.0%[^<]*⚠️/.test(html), 'expected NO ⚠️ at exactly 90.0% cache hit (boundary)');
});

await test('Cache hit infinitesimally below 90% DOES fire ⚠️', async () => {
  const sb = loadPerf();
  const sql = { ...sqliteData, cacheHitRate: 0.8999 };
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sql, sourcesData);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 100));
  const html = sb.getHtml();
  assert.ok(/90\.0%[^<]*⚠️/.test(html), 'expected ⚠️ next to 90.0% cache hit value (just under threshold)');
});

await test('Backfill anomaly: rate >10× tx-rate WITH baseline tx≥100 fires ⚠️', async () => {
  // Two-phase: prime the previous-snapshot cache, then tick again with
  // a backfill rate >10× the tx rate AND tx_inserted past the baseline gate.
  const sb = loadPerf();
  // Reset any previous cached snapshot
  sb.ctx.window._perfWriteSourcesPrev = null;
  const t0 = '2026-01-01T00:00:00Z';
  const t1 = '2026-01-01T00:00:01Z'; // 1s later
  const phase1 = { sources: { tx_inserted: 100, backfill_path_json: 0 }, sampleAt: t0 };
  const phase2 = { sources: { tx_inserted: 105, backfill_path_json: 1000 }, sampleAt: t1 };
  // First render: no prev → no flags possible
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sqliteData, phase1);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 50));
  // Second render: simulate tick with delta. Reuse the same fetch wiring.
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sqliteData, phase2);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 50));
  const html = sb.getHtml();
  // After phase2: tx_rate = 5/s, backfill_rate = 1000/s → ratio = 200x → ⚠️
  const idx = html.indexOf('backfill_path_json');
  assert.ok(idx >= 0, 'backfill_path_json row missing');
  const row = html.slice(idx, idx + 400);
  assert.ok(row.includes('⚠️'), 'expected ⚠️ on backfill row when rate ratio >10×, row=' + row);
});

await test('Backfill anomaly: tx_inserted <100 baseline guard SUPPRESSES ⚠️', async () => {
  // Same shape but tx_inserted stays well below the 100 floor; even a huge
  // backfill rate ratio must NOT fire while we lack a meaningful baseline.
  const sb = loadPerf();
  sb.ctx.window._perfWriteSourcesPrev = null;
  const t0 = '2026-01-01T00:00:00Z';
  const t1 = '2026-01-01T00:00:01Z';
  const phase1 = { sources: { tx_inserted: 5, backfill_path_json: 0 }, sampleAt: t0 };
  const phase2 = { sources: { tx_inserted: 6, backfill_path_json: 1000 }, sampleAt: t1 };
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sqliteData, phase1);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 50));
  stubFetch(sb, { ...basePerf, goRuntime }, goHealth, ioData, sqliteData, phase2);
  await sb.pages.perf.init({ set innerHTML(v) {} });
  await new Promise(r => setTimeout(r, 50));
  const html = sb.getHtml();
  const idx = html.indexOf('backfill_path_json');
  assert.ok(idx >= 0, 'backfill_path_json row missing');
  const row = html.slice(idx, idx + 400);
  assert.ok(!row.includes('⚠️'), 'expected NO ⚠️ on backfill row when tx_inserted<100, row=' + row);
});

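The two backfill tests above pin down a rate-ratio guard. A self-contained sketch of that logic, written here purely for illustration (an assumed reimplementation, not the perf page's actual code; field names follow the test fixtures):

```javascript
// Hypothetical sketch of the backfill anomaly guard: flag only when the
// backfill rate exceeds 10× the tx rate AND the cumulative tx_inserted
// counter has reached the 100-row baseline floor.
function backfillAnomaly(prev, curr, elapsedSec) {
  if (!prev || elapsedSec <= 0) return false; // no previous snapshot: no rate
  if (curr.tx_inserted < 100) return false;   // baseline guard suppresses ⚠️
  const txRate = (curr.tx_inserted - prev.tx_inserted) / elapsedSec;
  const bfRate = (curr.backfill_path_json - prev.backfill_path_json) / elapsedSec;
  return bfRate > 10 * txRate;
}
```

With the fixtures above: phase1→phase2 over 1 s gives txRate = 5/s and bfRate = 1000/s, a 200× ratio that fires; the suppress case never clears the 100-row floor, so no ratio can fire.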
console.log(`\n${passed} passed, ${failed} failed\n`);
process.exit(failed ? 1 : 0);
})();