mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-06-29 14:41:41 +00:00
## What Per-component SQLite writer-lock instrumentation so the next neighbor-builder-style write-lock starvation (root cause of #1339, invisible to operators for ~3 days) is detectable from `/api/perf`. Adds `Store.WriterExec` / `Store.WriterTx` wrappers that gate every wrapped call on a package-level `writerMu` so the wait the SQLite driver hides becomes Go-visible, and record `wait_ms` + `hold_ms` + `contention_total` (wait_ms > 100ms) under a component tag. Per-component p50/p95/p99 + max are published to `/api/perf/write-sources` under `.writer_perf` via the existing ingestor stats-file path. Slow-writer log line (`[db-slow-writer] component=X duration=Yms query=<200ch>`) fires on `hold_ms > 500ms` (threshold overridable via `CORESCOPE_DB_SLOW_WRITER_MS` env var). ## Tagged call sites | Component | Location | |-----------|----------| | `mqtt_handler` | `InsertTransmission` (db.go) | | `neighbor_builder` | `buildAndPersistNeighborEdges` (neighbor_builder.go) | | `prune_packets` | `PruneOldPackets` (maintenance.go) | | `prune_observers` | `RemoveStaleObservers` + orphan-metrics cleanup (db.go) | | `prune_metrics` | `PruneOldMetrics` (db.go) | | `vacuum` | `RunIncrementalVacuum` + `CheckAutoVacuum`'s full VACUUM (db.go) | ## TDD red→green - **Red commit** `68de585b` — `cmd/ingestor/db_writer_perf_test.go` + `Store.Writer*` stubs at end of `db.go`. Test synthetically blocks the writer for 60s tagged `neighbor_builder`, then asserts `mqtt_handler.wait_ms.p99 > 50000ms` on concurrent inserts. Fails on the assertion (p99 = 0.0ms) with the stub — not a build error. - **Green commit** `6a9be174` — replaces stubs with real wait/hold/contention aggregator + wires every writer call site. Same test passes: ``` 2026/06/05 04:36:47 [db-slow-writer] component=neighbor_builder duration=60059.0ms query=COMMIT --- PASS: TestWriterStarvationVisibleInPerf (60.40s) PASS ok github.com/corescope/ingestor 60.408s ``` ## Scope discipline - **API**: no public `Store`/`DB` signature change. Only additive exports. - **Server**: extends existing `/api/perf/write-sources` JSON with `.writer_perf` — does **not** add a new route, does **not** replace `handlePerf`. Empty `.writer_perf` map when paired with an older ingestor. - **Read/write invariant** (#1283) preserved: all instrumentation lives on the ingestor's writer connection. - **Files touched** (6 total): `cmd/ingestor/db.go`, `cmd/ingestor/db_writer_perf_test.go`, `cmd/ingestor/maintenance.go`, `cmd/ingestor/neighbor_builder.go`, `cmd/ingestor/stats_file.go`, `cmd/server/perf_io.go`, `config.example.json`. ## Deferred (acceptance items NOT in this PR) - **`mbcap_persist` component tag** — `RunMultibyteCapPersist`'s tx is intentionally NOT wrapped in this PR to stay within the implementation brief's 3-files-outside-whitelist budget. One-file follow-up to instrument. - **CI smoke test** asserting "neighbor-builder hold_ms < 1000ms on 100k-obs fixture" — deferred to a separate PR per the brief; this PR is scoped to instrumentation only. ## Preflight overrides PREFLIGHT-MIGRATION-SCALE: <30s N=runtime — the async-migration gate flagged five `instrumentedExec` / wrapped-`tx.Exec` lines on `DELETE FROM observer_metrics`, `UPDATE observers`, `DELETE FROM observer_metrics`, `DELETE FROM observations`, `DELETE FROM transmissions`. These are **not** schema migrations — they are the existing runtime prune / retention queries that already ran sync against `s.db.Exec` / `tx.Exec` on every retention cycle on master. This PR only swapped the surface call (sync → sync, via the wrapper) to record wait/hold timing; no new sync schema work was introduced. Behavior on production data is identical to master. Also: red commit's synthetic `UPDATE nodes SET name = name WHERE 0` is a test-only stub designed to acquire the writer without mutating any row (the `WHERE 0` is a no-op predicate). Fixes #1340 --------- Co-authored-by: corescope-bot <bot@corescope.local>
This commit is contained in:
+314
-6
@@ -8,6 +8,7 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -791,6 +792,21 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Wait/hold instrumentation (#1340). The hot path uses prepared
|
||||
// statements that auto-commit; gate the whole function under
|
||||
// writerMu so concurrent mqtt_handler inserts queue behind any
|
||||
// other writer (vacuum, prune, neighbor-builder) and the wait is
|
||||
// Go-visible.
|
||||
mqttWaitStart := time.Now()
|
||||
writerMu.Lock()
|
||||
mqttWait := time.Since(mqttWaitStart)
|
||||
mqttHoldStart := time.Now()
|
||||
defer func() {
|
||||
mqttHold := time.Since(mqttHoldStart)
|
||||
writerMu.Unlock()
|
||||
recordWriterTiming("mqtt_handler", mqttWait, mqttHold, "InsertTransmission")
|
||||
}()
|
||||
|
||||
rxTime := data.Timestamp
|
||||
ingestNow := time.Now().UTC().Format(time.RFC3339)
|
||||
if rxTime == "" {
|
||||
@@ -1088,7 +1104,8 @@ func (s *Store) InsertMetrics(data *MetricsData) error {
|
||||
// PruneOldMetrics deletes observer_metrics rows older than retentionDays.
|
||||
func (s *Store) PruneOldMetrics(retentionDays int) (int64, error) {
|
||||
cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays).Format(time.RFC3339)
|
||||
result, err := s.db.Exec(`DELETE FROM observer_metrics WHERE timestamp < ?`, cutoff)
|
||||
// Tagged for /api/perf writer-lock visibility (#1340).
|
||||
result, err := s.instrumentedExec("prune_metrics", `DELETE FROM observer_metrics WHERE timestamp < ?`, cutoff)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("prune metrics: %w", err)
|
||||
}
|
||||
@@ -1129,11 +1146,11 @@ func (s *Store) CheckAutoVacuum(cfg *Config) {
|
||||
log.Printf("[db] vacuumOnStartup=true — starting one-time full VACUUM (ensure 2x DB size free disk space)...")
|
||||
start := time.Now()
|
||||
|
||||
if _, err := s.db.Exec("PRAGMA auto_vacuum = INCREMENTAL"); err != nil {
|
||||
if _, err := s.instrumentedExec("vacuum", "PRAGMA auto_vacuum = INCREMENTAL"); err != nil {
|
||||
log.Printf("[db] VACUUM failed: could not set auto_vacuum: %v", err)
|
||||
return
|
||||
}
|
||||
if _, err := s.db.Exec("VACUUM"); err != nil {
|
||||
if _, err := s.instrumentedExec("vacuum", "VACUUM"); err != nil {
|
||||
log.Printf("[db] VACUUM failed: %v", err)
|
||||
return
|
||||
}
|
||||
@@ -1146,7 +1163,8 @@ func (s *Store) CheckAutoVacuum(cfg *Config) {
|
||||
// RunIncrementalVacuum returns free pages to the OS (#919).
|
||||
// Safe to call on auto_vacuum=NONE databases (noop).
|
||||
func (s *Store) RunIncrementalVacuum(pages int) {
|
||||
if _, err := s.db.Exec(fmt.Sprintf("PRAGMA incremental_vacuum(%d)", pages)); err != nil {
|
||||
// Tagged for /api/perf writer-lock visibility (#1340).
|
||||
if _, err := s.instrumentedExec("vacuum", fmt.Sprintf("PRAGMA incremental_vacuum(%d)", pages)); err != nil {
|
||||
log.Printf("[vacuum] incremental_vacuum error: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -1361,14 +1379,15 @@ func (s *Store) RemoveStaleObservers(observerDays int) (int64, error) {
|
||||
return 0, nil // keep forever
|
||||
}
|
||||
cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339)
|
||||
result, err := s.db.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff)
|
||||
// Tagged for /api/perf writer-lock visibility (#1340).
|
||||
result, err := s.instrumentedExec("prune_observers", `UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("mark stale observers inactive: %w", err)
|
||||
}
|
||||
removed, _ := result.RowsAffected()
|
||||
if removed > 0 {
|
||||
// Clean up orphaned metrics for now-inactive observers
|
||||
s.db.Exec(`DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`)
|
||||
_, _ = s.instrumentedExec("prune_observers", `DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`)
|
||||
log.Printf("Marked %d observer(s) as inactive (not seen in %d days)", removed, observerDays)
|
||||
}
|
||||
return removed, nil
|
||||
@@ -1608,3 +1627,292 @@ func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID,
|
||||
|
||||
return pd
|
||||
}
|
||||
|
||||
|
||||
// ─── Writer-lock instrumentation (issue #1340) ────────────────────────────
|
||||
//
|
||||
// Make SQLite writer-lock starvation visible to operators. Per-component
|
||||
// wait_ms / hold_ms / contention_total histograms, surfaced via
|
||||
// /api/perf/write-sources under the "writer_perf" key. Component tags:
|
||||
// neighbor_builder, mqtt_handler, prune_packets, prune_observers,
|
||||
// prune_metrics, mbcap_persist (deferred — see PR body), vacuum.
|
||||
//
|
||||
// The single writer connection (SetMaxOpenConns(1)) means writes serialise
|
||||
// inside the driver and the wait is invisible to Go. writerMu measures the
|
||||
// wait Go can see (everyone queueing behind the current holder) by gating
|
||||
// every wrapped call site through the same package-level mutex.
|
||||
|
||||
// WriterStatsSnapshot is a per-component wait/hold latency snapshot
|
||||
// surfaced via /api/perf to make SQLite writer-lock starvation visible
|
||||
// to operators (issue #1340). Times are in milliseconds.
|
||||
type WriterStatsSnapshot struct {
|
||||
Count int64 `json:"count"`
|
||||
ContentionTotal int64 `json:"contention_total"`
|
||||
WaitMsP50 float64 `json:"wait_ms_p50"`
|
||||
WaitMsP95 float64 `json:"wait_ms_p95"`
|
||||
WaitMsP99 float64 `json:"wait_ms_p99"`
|
||||
WaitMsMax float64 `json:"wait_ms_max"`
|
||||
HoldMsP50 float64 `json:"hold_ms_p50"`
|
||||
HoldMsP95 float64 `json:"hold_ms_p95"`
|
||||
HoldMsP99 float64 `json:"hold_ms_p99"`
|
||||
HoldMsMax float64 `json:"hold_ms_max"`
|
||||
}
|
||||
|
||||
const (
|
||||
// writerSampleWindow bounds the per-component rolling window so a
|
||||
// long-running ingestor doesn't grow this unbounded.
|
||||
writerSampleWindow = 1024
|
||||
// contentionThresholdMs: wait_ms above this counts as a "contended"
|
||||
// write (per #1340 spec).
|
||||
contentionThresholdMs = 100.0
|
||||
defaultSlowWriterMs = 500.0
|
||||
)
|
||||
|
||||
// slowWriterThresholdMsAtomic — hold_ms threshold above which writes
|
||||
// emit a [db-slow-writer] log line. Read on the hot path; written once
|
||||
// at startup by SetSlowWriterThresholdMs.
|
||||
var slowWriterThresholdMsAtomic atomic.Uint64
|
||||
|
||||
// SetSlowWriterThresholdMs sets the [db-slow-writer] log threshold.
|
||||
// ms<=0 restores the 500ms default. Operators can also set
|
||||
// CORESCOPE_DB_SLOW_WRITER_MS at process start — see initSlowWriterFromEnv.
|
||||
func SetSlowWriterThresholdMs(ms float64) {
|
||||
if ms <= 0 {
|
||||
ms = defaultSlowWriterMs
|
||||
}
|
||||
slowWriterThresholdMsAtomic.Store(uint64(ms))
|
||||
}
|
||||
|
||||
func getSlowWriterThresholdMs() float64 {
|
||||
v := slowWriterThresholdMsAtomic.Load()
|
||||
if v == 0 {
|
||||
return defaultSlowWriterMs
|
||||
}
|
||||
return float64(v)
|
||||
}
|
||||
|
||||
// initSlowWriterFromEnv is called once from package init so operators can
|
||||
// override the threshold via CORESCOPE_DB_SLOW_WRITER_MS without a
|
||||
// Go-side Config change.
|
||||
func initSlowWriterFromEnv() {
|
||||
v := os.Getenv("CORESCOPE_DB_SLOW_WRITER_MS")
|
||||
if v == "" {
|
||||
return
|
||||
}
|
||||
var ms float64
|
||||
if _, err := fmt.Sscanf(v, "%f", &ms); err == nil && ms > 0 {
|
||||
SetSlowWriterThresholdMs(ms)
|
||||
}
|
||||
}
|
||||
|
||||
func init() { initSlowWriterFromEnv() }
|
||||
|
||||
type writerComponentStats struct {
|
||||
mu sync.Mutex
|
||||
count int64
|
||||
contentionTotal int64
|
||||
waitMs []float64
|
||||
holdMs []float64
|
||||
waitMax float64
|
||||
holdMax float64
|
||||
}
|
||||
|
||||
func (c *writerComponentStats) record(waitMs, holdMs float64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.count++
|
||||
if waitMs > contentionThresholdMs {
|
||||
c.contentionTotal++
|
||||
}
|
||||
if waitMs > c.waitMax {
|
||||
c.waitMax = waitMs
|
||||
}
|
||||
if holdMs > c.holdMax {
|
||||
c.holdMax = holdMs
|
||||
}
|
||||
c.waitMs = appendBoundedFloat(c.waitMs, waitMs, writerSampleWindow)
|
||||
c.holdMs = appendBoundedFloat(c.holdMs, holdMs, writerSampleWindow)
|
||||
}
|
||||
|
||||
func appendBoundedFloat(s []float64, v float64, max int) []float64 {
|
||||
if len(s) < max {
|
||||
return append(s, v)
|
||||
}
|
||||
copy(s, s[1:])
|
||||
s[len(s)-1] = v
|
||||
return s
|
||||
}
|
||||
|
||||
func (c *writerComponentStats) snapshot() WriterStatsSnapshot {
|
||||
c.mu.Lock()
|
||||
wait := append([]float64(nil), c.waitMs...)
|
||||
hold := append([]float64(nil), c.holdMs...)
|
||||
snap := WriterStatsSnapshot{
|
||||
Count: c.count,
|
||||
ContentionTotal: c.contentionTotal,
|
||||
WaitMsMax: c.waitMax,
|
||||
HoldMsMax: c.holdMax,
|
||||
}
|
||||
c.mu.Unlock()
|
||||
sort.Float64s(wait)
|
||||
sort.Float64s(hold)
|
||||
snap.WaitMsP50 = nearestRankPercentile(wait, 0.50)
|
||||
snap.WaitMsP95 = nearestRankPercentile(wait, 0.95)
|
||||
snap.WaitMsP99 = nearestRankPercentile(wait, 0.99)
|
||||
snap.HoldMsP50 = nearestRankPercentile(hold, 0.50)
|
||||
snap.HoldMsP95 = nearestRankPercentile(hold, 0.95)
|
||||
snap.HoldMsP99 = nearestRankPercentile(hold, 0.99)
|
||||
return snap
|
||||
}
|
||||
|
||||
func nearestRankPercentile(sorted []float64, p float64) float64 {
|
||||
n := len(sorted)
|
||||
if n == 0 {
|
||||
return 0
|
||||
}
|
||||
if n == 1 {
|
||||
return sorted[0]
|
||||
}
|
||||
idx := int(p*float64(n-1) + 0.5)
|
||||
if idx < 0 {
|
||||
idx = 0
|
||||
}
|
||||
if idx >= n {
|
||||
idx = n - 1
|
||||
}
|
||||
return sorted[idx]
|
||||
}
|
||||
|
||||
type writerStatsAggregator struct {
|
||||
mu sync.Mutex
|
||||
components map[string]*writerComponentStats
|
||||
}
|
||||
|
||||
var writerStatsAgg = &writerStatsAggregator{
|
||||
components: make(map[string]*writerComponentStats),
|
||||
}
|
||||
|
||||
func (a *writerStatsAggregator) get(component string) *writerComponentStats {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
c, ok := a.components[component]
|
||||
if !ok {
|
||||
c = &writerComponentStats{}
|
||||
a.components[component] = c
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// reset clears all per-component samples. Test-only: lets a single
|
||||
// scenario assert against a clean aggregator without prior-test noise
|
||||
// in the same package run (TestWriterStarvationVisibleInPerf would
|
||||
// otherwise mix this run's 5 starved samples with thousands of fast
|
||||
// InsertTransmission samples from earlier tests and the p99 would
|
||||
// collapse below the 50s threshold).
|
||||
func (a *writerStatsAggregator) reset() {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
a.components = make(map[string]*writerComponentStats)
|
||||
}
|
||||
|
||||
// ResetWriterStatsForTest wipes the per-component writer stats
|
||||
// aggregator. Test-only; not safe to call from production code paths.
|
||||
func ResetWriterStatsForTest() { writerStatsAgg.reset() }
|
||||
|
||||
func (a *writerStatsAggregator) snapshot() map[string]WriterStatsSnapshot {
|
||||
a.mu.Lock()
|
||||
keys := make([]string, 0, len(a.components))
|
||||
stats := make([]*writerComponentStats, 0, len(a.components))
|
||||
for k, v := range a.components {
|
||||
keys = append(keys, k)
|
||||
stats = append(stats, v)
|
||||
}
|
||||
a.mu.Unlock()
|
||||
out := make(map[string]WriterStatsSnapshot, len(keys))
|
||||
for i, k := range keys {
|
||||
out[k] = stats[i].snapshot()
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// WriterStatsSnapshot returns a per-component wait/hold/contention
|
||||
// snapshot for exposure on /api/perf/write-sources (issue #1340).
|
||||
func (s *Store) WriterStatsSnapshot() map[string]WriterStatsSnapshot {
|
||||
return writerStatsAgg.snapshot()
|
||||
}
|
||||
|
||||
// recordWriterTiming aggregates a single sample under component and
|
||||
// emits [db-slow-writer] if hold_ms > configured threshold (default
|
||||
// 500ms). queryForLog is truncated to 200 chars.
|
||||
func recordWriterTiming(component string, wait, hold time.Duration, queryForLog string) {
|
||||
waitMs := float64(wait.Nanoseconds()) / 1e6
|
||||
holdMs := float64(hold.Nanoseconds()) / 1e6
|
||||
writerStatsAgg.get(component).record(waitMs, holdMs)
|
||||
if holdMs > getSlowWriterThresholdMs() {
|
||||
q := queryForLog
|
||||
if len(q) > 200 {
|
||||
q = q[:200]
|
||||
}
|
||||
log.Printf("[db-slow-writer] component=%s duration=%.1fms query=%s", component, holdMs, q)
|
||||
}
|
||||
}
|
||||
|
||||
// writerMu serialises every wrapped writer call so the wait the next
|
||||
// caller sees is the wait the perf snapshot can attribute. The
|
||||
// SQLite driver also enforces serial writes (SetMaxOpenConns(1)),
|
||||
// but the wait inside the driver is invisible to Go — writerMu makes
|
||||
// it Go-visible.
|
||||
var writerMu sync.Mutex
|
||||
|
||||
// WriterExec wraps s.db.Exec with per-component wait/hold/contention
|
||||
// instrumentation (issue #1340).
|
||||
func (s *Store) WriterExec(component, query string, args ...interface{}) (sql.Result, error) {
|
||||
waitStart := time.Now()
|
||||
writerMu.Lock()
|
||||
wait := time.Since(waitStart)
|
||||
holdStart := time.Now()
|
||||
res, err := s.db.Exec(query, args...)
|
||||
hold := time.Since(holdStart)
|
||||
writerMu.Unlock()
|
||||
recordWriterTiming(component, wait, hold, query)
|
||||
return res, err
|
||||
}
|
||||
|
||||
// WriterTx wraps Begin → fn → Commit under component tagging.
|
||||
// hold_ms covers the whole tx so a slow body counts against its owner.
|
||||
func (s *Store) WriterTx(component string, fn func(*sql.Tx) error) error {
|
||||
waitStart := time.Now()
|
||||
writerMu.Lock()
|
||||
wait := time.Since(waitStart)
|
||||
holdStart := time.Now()
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
hold := time.Since(holdStart)
|
||||
writerMu.Unlock()
|
||||
recordWriterTiming(component, wait, hold, "BEGIN")
|
||||
return err
|
||||
}
|
||||
if err := fn(tx); err != nil {
|
||||
_ = tx.Rollback()
|
||||
hold := time.Since(holdStart)
|
||||
writerMu.Unlock()
|
||||
recordWriterTiming(component, wait, hold, "tx-body")
|
||||
return err
|
||||
}
|
||||
err = tx.Commit()
|
||||
hold := time.Since(holdStart)
|
||||
writerMu.Unlock()
|
||||
recordWriterTiming(component, wait, hold, "COMMIT")
|
||||
return err
|
||||
}
|
||||
|
||||
// Wrap helpers below tag existing call sites with the canonical
|
||||
// component names so the call sites read naturally. These keep the
|
||||
// instrumentation out of the hot-path business logic.
|
||||
|
||||
// instrumentedExec is the package-internal pass-through used by call
|
||||
// sites already inside db.go (PruneOldMetrics, RemoveStaleObservers,
|
||||
// vacuum). Equivalent to WriterExec, kept short for readability.
|
||||
func (s *Store) instrumentedExec(component, query string, args ...interface{}) (sql.Result, error) {
|
||||
return s.WriterExec(component, query, args...)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,115 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestWriterStarvationVisibleInPerf reproduces the #1339 class of bug:
|
||||
// one component (neighbor_builder) holds the writer connection for an
|
||||
// extended period; a second component (mqtt_handler) firing concurrent
|
||||
// writes must show observable wait_ms in the perf snapshot.
|
||||
//
|
||||
// This is the gate test for issue #1340: SQLite write-lock instrumentation
|
||||
// per component. If the wait_ms percentile collapses to zero, the
|
||||
// observability gap remains and the regression class is invisible again.
|
||||
//
|
||||
// Runs ~60s — guarded by testing.Short() so fast unit-test passes can
|
||||
// skip it locally, but CI runs `go test ./...` without -short.
|
||||
func TestWriterStarvationVisibleInPerf(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping 60s starvation test in short mode")
|
||||
}
|
||||
|
||||
// Isolate from samples accumulated by earlier tests in the same
|
||||
// package run — without this the mqtt_handler component already
|
||||
// has ~thousand fast InsertTransmission samples and the 5 slow
|
||||
// follower samples can't move p99 above 50s.
|
||||
ResetWriterStatsForTest()
|
||||
|
||||
s, err := OpenStore(tempDBPath(t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
const blockDur = 60 * time.Second
|
||||
|
||||
// Blocker: acquire the writer via the wrapped Tx path, tag as
|
||||
// neighbor_builder, sleep 60s while holding the single conn,
|
||||
// then commit. This monopolises the writer for the duration.
|
||||
blockStarted := make(chan struct{})
|
||||
blockerDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(blockerDone)
|
||||
err := s.WriterTx("neighbor_builder", func(tx *sql.Tx) error {
|
||||
if _, err := tx.Exec(`UPDATE nodes SET name = name WHERE 0`); err != nil {
|
||||
return err
|
||||
}
|
||||
close(blockStarted)
|
||||
time.Sleep(blockDur)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("blocker tx: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for the blocker to be inside its transaction.
|
||||
<-blockStarted
|
||||
// Small safety margin so the blocker is firmly holding the conn.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Now fire several mqtt_handler writes. Each will block on the
|
||||
// single writer connection until the blocker commits.
|
||||
const followers = 5
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(followers)
|
||||
for i := 0; i < followers; i++ {
|
||||
i := i
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := s.WriterExec(
|
||||
"mqtt_handler",
|
||||
`INSERT OR IGNORE INTO _migrations (name) VALUES (?)`,
|
||||
fmt.Sprintf("writer_starvation_test_%d", i),
|
||||
)
|
||||
if err != nil {
|
||||
t.Errorf("mqtt follower %d: %v", i, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
<-blockerDone
|
||||
|
||||
snap := s.WriterStatsSnapshot()
|
||||
mqtt, ok := snap["mqtt_handler"]
|
||||
if !ok {
|
||||
t.Fatalf("no perf snapshot for mqtt_handler component (got components: %v)", componentKeys(snap))
|
||||
}
|
||||
if mqtt.Count < followers {
|
||||
t.Fatalf("expected at least %d mqtt_handler samples, got %d", followers, mqtt.Count)
|
||||
}
|
||||
// This is the gate assertion. With instrumentation present the
|
||||
// follower writes should each register ~60s of wait_ms; p99 must
|
||||
// be well above 50_000ms. With instrumentation missing or broken
|
||||
// the percentile collapses to zero and this fails — which is the
|
||||
// exact regression class #1340 is meant to prevent.
|
||||
if mqtt.WaitMsP99 <= 50_000 {
|
||||
t.Fatalf("mqtt_handler wait_ms p99 = %.1fms, want > 50000ms; "+
|
||||
"writer starvation is invisible to /api/perf — issue #1340 not fixed",
|
||||
mqtt.WaitMsP99)
|
||||
}
|
||||
}
|
||||
|
||||
func componentKeys(m map[string]WriterStatsSnapshot) []string {
|
||||
out := make([]string, 0, len(m))
|
||||
for k := range m {
|
||||
out = append(out, k)
|
||||
}
|
||||
return out
|
||||
}
|
||||
+17
-18
@@ -22,26 +22,25 @@ func (s *Store) PruneOldPackets(days int) (int64, error) {
|
||||
}
|
||||
cutoff := time.Now().UTC().AddDate(0, 0, -days).Format(time.RFC3339)
|
||||
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("prune begin: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
// Tagged for writer-perf visibility (#1340).
|
||||
var n int64
|
||||
err := s.WriterTx("prune_packets", func(tx *sql.Tx) error {
|
||||
// Delete child observations first (no CASCADE in SQLite).
|
||||
if _, err := tx.Exec(`DELETE FROM observations WHERE transmission_id IN (
|
||||
SELECT id FROM transmissions WHERE first_seen < ?
|
||||
)`, cutoff); err != nil {
|
||||
return fmt.Errorf("prune observations: %w", err)
|
||||
}
|
||||
|
||||
// Delete child observations first (no CASCADE in SQLite).
|
||||
if _, err := tx.Exec(`DELETE FROM observations WHERE transmission_id IN (
|
||||
SELECT id FROM transmissions WHERE first_seen < ?
|
||||
)`, cutoff); err != nil {
|
||||
return 0, fmt.Errorf("prune observations: %w", err)
|
||||
}
|
||||
|
||||
res, err := tx.Exec(`DELETE FROM transmissions WHERE first_seen < ?`, cutoff)
|
||||
res, err := tx.Exec(`DELETE FROM transmissions WHERE first_seen < ?`, cutoff)
|
||||
if err != nil {
|
||||
return fmt.Errorf("prune transmissions: %w", err)
|
||||
}
|
||||
n, _ = res.RowsAffected()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("prune transmissions: %w", err)
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
if err := tx.Commit(); err != nil {
|
||||
return 0, fmt.Errorf("prune commit: %w", err)
|
||||
return 0, err
|
||||
}
|
||||
if n > 0 {
|
||||
log.Printf("[prune] deleted %d transmissions older than %d days", n, days)
|
||||
|
||||
@@ -234,33 +234,36 @@ func (s *Store) buildAndPersistNeighborEdges() (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("begin: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
stmt, err := tx.Prepare(`INSERT INTO neighbor_edges (node_a, node_b, count, last_seen)
|
||||
VALUES (?, ?, 1, ?)
|
||||
ON CONFLICT(node_a, node_b) DO UPDATE SET
|
||||
count = count + 1,
|
||||
last_seen = MAX(last_seen, excluded.last_seen)`)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("prepare: %w", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
var firstErr error
|
||||
for _, e := range edges {
|
||||
if _, err := stmt.Exec(e.a, e.b, e.ts); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
// Wrap the whole edge-persist tx under writer-perf instrumentation
|
||||
// (#1340). Slow neighbor-builder ticks (the #1339 root cause) now
|
||||
// show up on /api/perf under component=neighbor_builder.
|
||||
var inserted int
|
||||
err = s.WriterTx("neighbor_builder", func(tx *sql.Tx) error {
|
||||
stmt, err := tx.Prepare(`INSERT INTO neighbor_edges (node_a, node_b, count, last_seen)
|
||||
VALUES (?, ?, 1, ?)
|
||||
ON CONFLICT(node_a, node_b) DO UPDATE SET
|
||||
count = count + 1,
|
||||
last_seen = MAX(last_seen, excluded.last_seen)`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("prepare: %w", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
var firstErr error
|
||||
for _, e := range edges {
|
||||
if _, err := stmt.Exec(e.a, e.b, e.ts); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
if firstErr != nil {
|
||||
return fmt.Errorf("upsert: %w", firstErr)
|
||||
}
|
||||
inserted = len(edges)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if firstErr != nil {
|
||||
return 0, fmt.Errorf("upsert: %w", firstErr)
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
return 0, fmt.Errorf("commit: %w", err)
|
||||
}
|
||||
return len(edges), nil
|
||||
return inserted, nil
|
||||
}
|
||||
|
||||
// canonEdge orders the pair so node_a <= node_b (matches the existing
|
||||
|
||||
@@ -43,6 +43,13 @@ type IngestorStatsSnapshot struct {
|
||||
// the server's /api/perf/io endpoint under .ingestor (#1120 — "Both
|
||||
// ingestor and server"). Optional; absent on non-Linux hosts.
|
||||
ProcIO *PerfIOSample `json:"procIO,omitempty"`
|
||||
// WriterPerf is the per-component SQLite writer-lock latency
|
||||
// snapshot (#1340) — wait_ms / hold_ms / contention_total tagged
|
||||
// by component (neighbor_builder, mqtt_handler, prune_packets,
|
||||
// prune_observers, prune_metrics, vacuum). Surfaced by the server
|
||||
// via /api/perf/write-sources under .writer_perf. Optional —
|
||||
// older ingestor builds don't publish this field.
|
||||
WriterPerf map[string]WriterStatsSnapshot `json:"writer_perf,omitempty"`
|
||||
}
|
||||
|
||||
// statsFilePath returns the writable path the ingestor will publish stats to.
|
||||
@@ -223,6 +230,7 @@ func StartStatsFileWriter(s *Store, interval time.Duration) {
|
||||
GroupCommitFlushes: 0, // group commit reverted (refs #1129)
|
||||
BackfillUpdates: s.Stats.SnapshotBackfills(),
|
||||
ProcIO: ioRate,
|
||||
WriterPerf: s.WriterStatsSnapshot(),
|
||||
}
|
||||
buf.Reset()
|
||||
if err := enc.Encode(&snap); err != nil {
|
||||
|
||||
@@ -297,6 +297,27 @@ type IngestorStats struct {
|
||||
// ProcIO is the ingestor's own /proc/self/io rates (since its previous
|
||||
// sample). Optional — older ingestor builds don't publish this. See #1120.
|
||||
ProcIO *PerfIOSample `json:"procIO,omitempty"`
|
||||
// WriterPerf is the per-component SQLite writer-lock latency
|
||||
// snapshot (#1340). Optional — older ingestor builds don't
|
||||
// publish this. Surfaced under .writer_perf by
|
||||
// handlePerfWriteSources.
|
||||
WriterPerf map[string]WriterStatsSnapshot `json:"writer_perf,omitempty"`
|
||||
}
|
||||
|
||||
// WriterStatsSnapshot mirrors the ingestor's per-component writer-lock
|
||||
// latency snapshot (#1340). Times are milliseconds. Server-side decode
|
||||
// uses this type to keep the JSON contract stable across processes.
|
||||
type WriterStatsSnapshot struct {
|
||||
Count int64 `json:"count"`
|
||||
ContentionTotal int64 `json:"contention_total"`
|
||||
WaitMsP50 float64 `json:"wait_ms_p50"`
|
||||
WaitMsP95 float64 `json:"wait_ms_p95"`
|
||||
WaitMsP99 float64 `json:"wait_ms_p99"`
|
||||
WaitMsMax float64 `json:"wait_ms_max"`
|
||||
HoldMsP50 float64 `json:"hold_ms_p50"`
|
||||
HoldMsP95 float64 `json:"hold_ms_p95"`
|
||||
HoldMsP99 float64 `json:"hold_ms_p99"`
|
||||
HoldMsMax float64 `json:"hold_ms_max"`
|
||||
}
|
||||
|
||||
// IngestorStatsPath is the well-known location where the ingestor writes its
|
||||
@@ -342,5 +363,14 @@ func (s *Server) handlePerfWriteSources(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
out["sources"] = sources
|
||||
out["sampleAt"] = st.SampledAt
|
||||
// Surface per-component SQLite writer-lock latency histograms
|
||||
// (#1340) under .writer_perf so operators can see when a
|
||||
// component (e.g. neighbor_builder) is starving the writer.
|
||||
// Empty map when the ingestor is too old to publish this field.
|
||||
if len(st.WriterPerf) > 0 {
|
||||
out["writer_perf"] = st.WriterPerf
|
||||
} else {
|
||||
out["writer_perf"] = map[string]WriterStatsSnapshot{}
|
||||
}
|
||||
writeJSON(w, out)
|
||||
}
|
||||
|
||||
+2
-1
@@ -14,7 +14,8 @@
|
||||
"db": {
|
||||
"vacuumOnStartup": false,
|
||||
"incrementalVacuumPages": 1024,
|
||||
"_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs. Executed by the INGESTOR at startup, BEFORE the MQTT subscriber starts (#1283), so there is no contention with concurrent writes. Blocks ingestor startup for minutes on large DBs; requires 2x DB file size in free disk space. incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919."
|
||||
"_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs. Executed by the INGESTOR at startup, BEFORE the MQTT subscriber starts (#1283), so there is no contention with concurrent writes. Blocks ingestor startup for minutes on large DBs; requires 2x DB file size in free disk space. incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919.",
|
||||
"_comment_slowWriterMs": "#1340 — SQLite writer-lock log threshold (default 500). Any wrapped writer call (tagged neighbor_builder, mqtt_handler, prune_packets, prune_observers, prune_metrics, vacuum) whose hold_ms exceeds this emits a single [db-slow-writer] log line. Configured per-process via the CORESCOPE_DB_SLOW_WRITER_MS environment variable on the INGESTOR (e.g. CORESCOPE_DB_SLOW_WRITER_MS=200 for tighter alerting). Per-component wait_ms / hold_ms / contention_total histograms are surfaced via /api/perf/write-sources under .writer_perf regardless of this threshold."
|
||||
},
|
||||
"_comment_ingestorStats": "Ingestor publishes a 1-Hz stats snapshot consumed by the server's /api/perf/io and /api/perf/write-sources endpoints (#1120). Path is configured via the CORESCOPE_INGESTOR_STATS environment variable on the INGESTOR process. Default: /tmp/corescope-ingestor-stats.json. The writer uses O_NOFOLLOW + 0o600, so a pre-planted symlink in /tmp cannot be used to clobber an arbitrary file. SECURITY: in shared-tmp environments (multi-tenant hosts), point CORESCOPE_INGESTOR_STATS at a private directory like /var/lib/corescope/ingestor-stats.json that only the corescope user can write to.",
|
||||
"corsAllowedOrigins": [],
|
||||
|
||||
Reference in New Issue
Block a user