mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-13 20:53:33 +00:00
f4cf2acbc0
Red commit: e964ec9c46 (CI run: pending —
workflow only triggers on PR open)
Partial fix for #1120 — finishes the four follow-up items left open
after PR #1123 (cancelled writes, ingestor I/O, threshold-flag tests,
docs).
## What's done
- **`cancelledWriteBytesPerSec`** — server `/proc/self/io` parser
handles `cancelled_write_bytes`; `/api/perf/io` exposes the per-second
rate; Perf page renders it next to Read/Write with ⚠️ when sustained >1
MB/s.
- **Ingestor `/proc/<pid>/io`** — `cmd/ingestor/stats_file.go` samples
its own `/proc/self/io` each tick and includes `procIO` in the snapshot.
The server's `/api/perf/io` reads it and surfaces `.ingestor`. Frontend
renders an `Ingestor process` Disk I/O block alongside the existing
`server process` block (issue mockup: "Both ingestor and server").
- **Threshold + anomaly tests** — `test-perf-disk-io-1120.js` now
asserts ⚠️ fires/suppresses on WAL>100MB, cache_hit<90%, and the
backfill-rate-vs-tx-rate guard with the `tx_inserted >= 100` baseline
floor. Drops the tautological `|| ... === false` short-circuits flagged
in MINOR m4.
- **Docs (m8)** — `config.example.json` adds `_comment_ingestorStats`
(env var, default path, shared-tmp security note);
`cmd/ingestor/README.md` adds `CORESCOPE_INGESTOR_STATS` to the env-var
table plus a `Stats file` section.
## What's NOT done (deferred)
m1 sync.Map → map+RWMutex, m2 perfIOMu rate caching, m3 negative
cacheSize translation, m5 deterministic-write test, m7 ctx-aware
shutdown — pure polish; will file a follow-up issue if the operator
wants them tracked.
## TDD
- Red: `e964ec9` — adds failing tests + stub field/handler shape
(cancelled missing from struct, ingestor stub returns nil, ingestor
procIO absent).
- Green: `1240703` — wires up the parser case, ingestor sampler,
frontend rendering, docs.
E2E assertion added: test-perf-disk-io-1120.js:108
---------
Co-authored-by: clawbot <clawbot@users.noreply.github.com>
Co-authored-by: Kpa-clawbot <bot@kpa-clawbot.local>
Co-authored-by: Kpa-clawbot <bot@kpa-clawbot>
347 lines
12 KiB
Go
347 lines
12 KiB
Go
package main
|
||
|
||
import (
|
||
"bufio"
|
||
"encoding/json"
|
||
"net/http"
|
||
"os"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/meshcore-analyzer/perfio"
|
||
)
|
||
|
||
// PerfIOResponse holds per-process disk I/O metrics derived from /proc/self/io.
|
||
//
|
||
// `Ingestor` is the same shape as the top-level fields, sourced from the
|
||
// ingestor's own /proc/self/io snapshot (published via the ingestor stats file).
|
||
// Issue #1120 calls for "Both ingestor and server" — this is the ingestor half.
|
||
//
|
||
// `CancelledWriteBytesPerSec` surfaces `cancelled_write_bytes` from
|
||
// /proc/self/io — bytes the kernel discarded before they hit disk (e.g. file
|
||
// truncated/unlinked while dirty). Useful signal when chasing
|
||
// write-amplification anomalies (cf. the BackfillPathJSON loop in #1119).
|
||
type PerfIOResponse struct {
|
||
ReadBytesPerSec float64 `json:"readBytesPerSec"`
|
||
WriteBytesPerSec float64 `json:"writeBytesPerSec"`
|
||
CancelledWriteBytesPerSec float64 `json:"cancelledWriteBytesPerSec"`
|
||
SyscallsRead float64 `json:"syscallsRead"`
|
||
SyscallsWrite float64 `json:"syscallsWrite"`
|
||
Ingestor *PerfIOSample `json:"ingestor,omitempty"`
|
||
}
|
||
|
||
// PerfIOSample is the canonical per-process I/O rate sample, shared with the
|
||
// ingestor via internal/perfio. Sharing the type prevents silent JSON contract
|
||
// drift between the publisher (ingestor) and the consumer (server) (#1167).
|
||
type PerfIOSample = perfio.Sample
|
||
|
||
// PerfSqliteResponse is the JSON body describing SQLite-related perf metrics:
// WAL file size, page geometry, the configured cache size and a cache hit
// rate.
type PerfSqliteResponse struct {
	// WAL file size, in mebibytes and in raw bytes.
	WalSizeMB float64 `json:"walSizeMB"`
	WalSize   int64   `json:"walSize"`

	// Values reported by the page_count / page_size / cache_size PRAGMAs.
	PageCount int64 `json:"pageCount"`
	PageSize  int64 `json:"pageSize"`
	CacheSize int64 `json:"cacheSize"`

	// Hits divided by (hits + misses); zero when nothing has been counted.
	CacheHitRate float64 `json:"cacheHitRate"`
}
|
||
|
||
// procIOSample is one reading of the /proc/self/io counters together with the
// time it was taken. A zero `at` marks an unusable sample.
type procIOSample struct {
	at time.Time // when the counters were read

	// Cumulative byte counters.
	readBytes      int64
	writeBytes     int64
	cancelledWrite int64

	// Cumulative read / write syscall counters.
	syscR int64
	syscW int64
}
|
||
|
||
// perfIOTracker keeps the previous sample so handlePerfIO can compute deltas.
|
||
var (
|
||
perfIOMu sync.Mutex
|
||
perfIOLastSample procIOSample
|
||
)
|
||
|
||
// readIngestorStatsParseCalls counts the json.Unmarshal calls made by
// readIngestorIOSample, i.e. its cache-miss path. It is unexported but
// reachable from same-package tests, which use it to assert that the cache
// eliminates redundant decodes (Carmack must-fix #2).
var readIngestorStatsParseCalls atomic.Int64
|
||
|
||
// resetIngestorIOCache wipes the cached snapshot. Test-only helper.
|
||
func resetIngestorIOCache() {
|
||
ingestorIOCache.Lock()
|
||
ingestorIOCache.mtimeUnixNano = 0
|
||
ingestorIOCache.size = 0
|
||
ingestorIOCache.sample = nil
|
||
ingestorIOCache.Unlock()
|
||
}
|
||
|
||
// ingestorIOCache is the byte-stable snapshot cache for readIngestorIOSample
|
||
// (Carmack must-fix #2). Keyed by (file mtime nanoseconds, size); on hit we
|
||
// return the previously decoded sample without re-opening the file.
|
||
var ingestorIOCache struct {
|
||
sync.Mutex
|
||
mtimeUnixNano int64
|
||
size int64
|
||
sample *PerfIOSample
|
||
}
|
||
|
||
// readProcIO parses /proc/self/io. Returns a zero-time sample (at.IsZero())
|
||
// on non-Linux, read failure, or when no recognised keys were parsed
|
||
// (Carmack must-fix #6 — never publish a phantom-zero counter set, the
|
||
// next tick would treat the real counters as a giant delta).
|
||
func readProcIO() procIOSample {
|
||
s := procIOSample{at: time.Now()}
|
||
f, err := os.Open("/proc/self/io")
|
||
if err != nil {
|
||
return procIOSample{}
|
||
}
|
||
defer f.Close()
|
||
if !parseProcIOInto(bufio.NewScanner(f), &s) {
|
||
return procIOSample{}
|
||
}
|
||
return s
|
||
}
|
||
|
||
// parseProcIOInto reads /proc/self/io-shaped key:value lines from sc and
|
||
// populates the byte/syscall fields on s. Returns true iff at least one
|
||
// recognised key was successfully parsed (Carmack must-fix #6).
|
||
//
|
||
// Implementation delegates to perfio.ParseProcIO — single source of truth
|
||
// shared with the ingestor (Carmack must-fix #7; previously two divergent
|
||
// copies, which is how the empty-key gate was missing on this side).
|
||
func parseProcIOInto(sc *bufio.Scanner, s *procIOSample) bool {
|
||
var c perfio.Counters
|
||
ok := perfio.ParseProcIO(sc, &c)
|
||
s.readBytes = c.ReadBytes
|
||
s.writeBytes = c.WriteBytes
|
||
s.cancelledWrite = c.CancelledWriteBytes
|
||
s.syscR = c.SyscR
|
||
s.syscW = c.SyscW
|
||
return ok
|
||
}
|
||
|
||
// handlePerfIO returns delta-rate disk I/O for the server process (per-second).
|
||
// On the first call (no prior sample), rates are zero; subsequent calls
|
||
// report the delta divided by elapsed seconds.
|
||
func (s *Server) handlePerfIO(w http.ResponseWriter, r *http.Request) {
|
||
cur := readProcIO()
|
||
resp := PerfIOResponse{}
|
||
|
||
perfIOMu.Lock()
|
||
prev := perfIOLastSample
|
||
perfIOLastSample = cur
|
||
perfIOMu.Unlock()
|
||
|
||
if !prev.at.IsZero() {
|
||
dt := cur.at.Sub(prev.at).Seconds()
|
||
if dt < 0.001 {
|
||
dt = 0.001
|
||
}
|
||
resp.ReadBytesPerSec = float64(cur.readBytes-prev.readBytes) / dt
|
||
resp.WriteBytesPerSec = float64(cur.writeBytes-prev.writeBytes) / dt
|
||
resp.CancelledWriteBytesPerSec = float64(cur.cancelledWrite-prev.cancelledWrite) / dt
|
||
resp.SyscallsRead = float64(cur.syscR-prev.syscR) / dt
|
||
resp.SyscallsWrite = float64(cur.syscW-prev.syscW) / dt
|
||
}
|
||
// Ingestor block: GREEN commit replaces stub readIngestorIOSample with
|
||
// real parsing of the ingestor stats file's procIO section (#1120
|
||
// follow-up — "Both ingestor and server").
|
||
if ing := readIngestorIOSample(); ing != nil {
|
||
resp.Ingestor = ing
|
||
}
|
||
writeJSON(w, resp)
|
||
}
|
||
|
||
// IngestorStatsStaleThreshold bounds how old an ingestor stats snapshot may be
// (sampledAt → now) before it is considered dead and left out of the
// /api/perf/io response. The writer's default interval is ~1s, so 5× that
// catches a wedged writer goroutine without flapping on one missed tick.
//
// #1167 must-fix #1: serving stale procIO as live disguises a dead ingestor.
const IngestorStatsStaleThreshold = 5 * time.Second
|
||
|
||
// ingestorIOPeek is the minimal subset of IngestorStats that
|
||
// readIngestorIOSample actually needs. Decoding into this instead of the
|
||
// full IngestorStats avoids allocating BackfillUpdates (a map) and the
|
||
// ~10 unused counter fields on every /api/perf/io request (Carmack
|
||
// must-fix #1).
|
||
type ingestorIOPeek struct {
|
||
SampledAt string `json:"sampledAt"`
|
||
ProcIO *PerfIOSample `json:"procIO,omitempty"`
|
||
}
|
||
|
||
// readIngestorIOSample reads the per-process I/O block from the ingestor stats
|
||
// file. Returns nil if the file is missing, malformed, carries no proc-IO
|
||
// block (older ingestor builds), OR the snapshot is older than
|
||
// IngestorStatsStaleThreshold (#1167 must-fix #1 — operators must not see
|
||
// stale numbers under .ingestor when the ingestor is down). Never errors —
|
||
// diagnostics only.
|
||
//
|
||
// Cached by (file mtime nanoseconds, size): the underlying file is byte-stable
|
||
// between 1Hz writer ticks, so polling the endpoint at 1Hz from N tabs MUST
|
||
// NOT cause N file-opens + N json.Unmarshal per second on identical bytes
|
||
// (Carmack must-fix #2). The cache invalidates as soon as either mtime or
|
||
// size differs from the cached entry.
|
||
func readIngestorIOSample() *PerfIOSample {
|
||
path := IngestorStatsPath()
|
||
info, statErr := os.Stat(path)
|
||
if statErr != nil {
|
||
return nil
|
||
}
|
||
mtimeNs := info.ModTime().UnixNano()
|
||
size := info.Size()
|
||
|
||
ingestorIOCache.Lock()
|
||
if ingestorIOCache.mtimeUnixNano == mtimeNs && ingestorIOCache.size == size && ingestorIOCache.sample != nil {
|
||
s := ingestorIOCache.sample
|
||
ingestorIOCache.Unlock()
|
||
// Re-validate freshness on cache hit too: a stale-but-byte-stable
|
||
// file (writer wedged) MUST still drop after the threshold.
|
||
if s.SampledAt != "" {
|
||
if ts, err := time.Parse(time.RFC3339, s.SampledAt); err == nil {
|
||
if time.Since(ts) > IngestorStatsStaleThreshold {
|
||
return nil
|
||
}
|
||
}
|
||
}
|
||
return s
|
||
}
|
||
ingestorIOCache.Unlock()
|
||
|
||
data, err := os.ReadFile(path)
|
||
if err != nil {
|
||
return nil
|
||
}
|
||
readIngestorStatsParseCalls.Add(1)
|
||
var st ingestorIOPeek
|
||
if err := json.Unmarshal(data, &st); err != nil {
|
||
return nil
|
||
}
|
||
if st.ProcIO == nil {
|
||
return nil
|
||
}
|
||
stamp := st.SampledAt
|
||
if stamp == "" {
|
||
stamp = st.ProcIO.SampledAt
|
||
}
|
||
if stamp == "" {
|
||
return nil
|
||
}
|
||
ts, err := time.Parse(time.RFC3339, stamp)
|
||
if err != nil {
|
||
return nil
|
||
}
|
||
if time.Since(ts) > IngestorStatsStaleThreshold {
|
||
return nil
|
||
}
|
||
|
||
ingestorIOCache.Lock()
|
||
ingestorIOCache.mtimeUnixNano = mtimeNs
|
||
ingestorIOCache.size = size
|
||
ingestorIOCache.sample = st.ProcIO
|
||
ingestorIOCache.Unlock()
|
||
|
||
return st.ProcIO
|
||
}
|
||
|
||
// handlePerfSqlite returns SQLite WAL size + cache hit-rate stats.
|
||
func (s *Server) handlePerfSqlite(w http.ResponseWriter, r *http.Request) {
|
||
resp := PerfSqliteResponse{}
|
||
if s.db != nil && s.db.conn != nil {
|
||
var pageCount, pageSize int64
|
||
_ = s.db.conn.QueryRow("PRAGMA page_count").Scan(&pageCount)
|
||
_ = s.db.conn.QueryRow("PRAGMA page_size").Scan(&pageSize)
|
||
var cacheSize int64
|
||
_ = s.db.conn.QueryRow("PRAGMA cache_size").Scan(&cacheSize)
|
||
resp.PageCount = pageCount
|
||
resp.PageSize = pageSize
|
||
resp.CacheSize = cacheSize
|
||
|
||
// Cache hit rate: derived from PacketStore cache (rw_cache). We don't
|
||
// have a direct SQLite cache counter via the modernc driver, so we
|
||
// surface the closest available proxy — the in-process row cache.
|
||
if s.store != nil {
|
||
cs := s.store.GetCacheStatsTyped()
|
||
total := cs.Hits + cs.Misses
|
||
if total > 0 {
|
||
resp.CacheHitRate = float64(cs.Hits) / float64(total)
|
||
}
|
||
}
|
||
|
||
if s.db.path != "" && s.db.path != ":memory:" {
|
||
if info, err := os.Stat(s.db.path + "-wal"); err == nil {
|
||
resp.WalSize = info.Size()
|
||
resp.WalSizeMB = float64(info.Size()) / 1048576
|
||
}
|
||
}
|
||
}
|
||
writeJSON(w, resp)
|
||
}
|
||
|
||
// IngestorStats is the on-disk JSON shape the ingestor writes periodically
|
||
// for the server to expose via /api/perf/write-sources.
|
||
type IngestorStats struct {
|
||
SampledAt string `json:"sampledAt"`
|
||
TxInserted int64 `json:"tx_inserted"`
|
||
ObsInserted int64 `json:"obs_inserted"`
|
||
DuplicateTx int64 `json:"tx_dupes"`
|
||
NodeUpserts int64 `json:"node_upserts"`
|
||
ObserverUpserts int64 `json:"observer_upserts"`
|
||
WriteErrors int64 `json:"write_errors"`
|
||
SignatureDrops int64 `json:"sig_drops"`
|
||
WALCommits int64 `json:"walCommits"`
|
||
GroupCommitFlushes int64 `json:"groupCommitFlushes"`
|
||
BackfillUpdates map[string]int64 `json:"backfillUpdates"`
|
||
// ProcIO is the ingestor's own /proc/self/io rates (since its previous
|
||
// sample). Optional — older ingestor builds don't publish this. See #1120.
|
||
ProcIO *PerfIOSample `json:"procIO,omitempty"`
|
||
}
|
||
|
||
// IngestorStatsPath returns the location of the ingestor's rolling stats
// snapshot. A non-empty CORESCOPE_INGESTOR_STATS environment variable
// overrides the well-known default (used by tests).
func IngestorStatsPath() string {
	const defaultPath = "/tmp/corescope-ingestor-stats.json"

	override := os.Getenv("CORESCOPE_INGESTOR_STATS")
	if override == "" {
		return defaultPath
	}
	return override
}
|
||
|
||
// handlePerfWriteSources reads the ingestor's stats file and returns a flat
|
||
// map of source-name -> counter, plus the sample timestamp.
|
||
func (s *Server) handlePerfWriteSources(w http.ResponseWriter, r *http.Request) {
|
||
out := map[string]interface{}{
|
||
"sources": map[string]int64{},
|
||
"sampleAt": "",
|
||
}
|
||
|
||
data, err := os.ReadFile(IngestorStatsPath())
|
||
if err != nil {
|
||
writeJSON(w, out)
|
||
return
|
||
}
|
||
var st IngestorStats
|
||
if err := json.Unmarshal(data, &st); err != nil {
|
||
writeJSON(w, out)
|
||
return
|
||
}
|
||
sources := map[string]int64{
|
||
"tx_inserted": st.TxInserted,
|
||
"tx_dupes": st.DuplicateTx,
|
||
"obs_inserted": st.ObsInserted,
|
||
"node_upserts": st.NodeUpserts,
|
||
"observer_upserts": st.ObserverUpserts,
|
||
"write_errors": st.WriteErrors,
|
||
"sig_drops": st.SignatureDrops,
|
||
"walCommits": st.WALCommits,
|
||
"groupCommitFlushes": st.GroupCommitFlushes,
|
||
}
|
||
for name, v := range st.BackfillUpdates {
|
||
sources["backfill_"+name] = v
|
||
}
|
||
out["sources"] = sources
|
||
out["sampleAt"] = st.SampledAt
|
||
writeJSON(w, out)
|
||
}
|