feat(perf): SQLite writer-lock wait/hold instrumentation per component (#1340) (#1594)

## What

Per-component SQLite writer-lock instrumentation so the next
neighbor-builder-style write-lock starvation (root cause of #1339,
invisible to operators for ~3 days) is detectable from `/api/perf`.

Adds `Store.WriterExec` / `Store.WriterTx` wrappers that gate every
wrapped call on a package-level `writerMu`, so the wait that the SQLite
driver hides becomes Go-visible. Each call records `wait_ms`, `hold_ms`,
and `contention_total` (incremented when `wait_ms` exceeds 100ms) under
a component tag. Per-component p50/p95/p99 and max are published to
`/api/perf/write-sources` under `.writer_perf` via the existing ingestor
stats-file path. A slow-writer log line (`[db-slow-writer] component=X
duration=Yms query=<200ch>`) fires when `hold_ms` exceeds 500ms; the
threshold is overridable via the `CORESCOPE_DB_SLOW_WRITER_MS` env var.

## Tagged call sites

| Component | Location |
|-----------|----------|
| `mqtt_handler` | `InsertTransmission` (db.go) |
| `neighbor_builder` | `buildAndPersistNeighborEdges` (neighbor_builder.go) |
| `prune_packets` | `PruneOldPackets` (maintenance.go) |
| `prune_observers` | `RemoveStaleObservers` + orphan-metrics cleanup (db.go) |
| `prune_metrics` | `PruneOldMetrics` (db.go) |
| `vacuum` | `RunIncrementalVacuum` + `CheckAutoVacuum`'s full VACUUM (db.go) |

## TDD red→green

- **Red commit** `68de585b` — `cmd/ingestor/db_writer_perf_test.go` +
`Store.Writer*` stubs at end of `db.go`. Test synthetically blocks the
writer for 60s tagged `neighbor_builder`, then asserts
`mqtt_handler.wait_ms.p99 > 50000ms` on concurrent inserts. Fails on the
assertion (p99 = 0.0ms) with the stub — not a build error.
- **Green commit** `6a9be174` — replaces stubs with real
wait/hold/contention aggregator + wires every writer call site. Same
test passes:

```
2026/06/05 04:36:47 [db-slow-writer] component=neighbor_builder duration=60059.0ms query=COMMIT
--- PASS: TestWriterStarvationVisibleInPerf (60.40s)
PASS
ok      github.com/corescope/ingestor   60.408s
```
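
Why the test needs a clean aggregator can be checked with a little arithmetic. The sketch below reuses the index formula from the PR's `nearestRankPercentile` (round `p*(n-1)` to the nearest index); `percentile` and `p99Of` are hypothetical helper names, and the figure of 1019 fast samples is an assumption chosen to fill the 1024-sample window alongside the 5 starved followers.

```go
package main

import (
	"fmt"
	"sort"
)

// percentile uses the same index formula as the PR's helper:
// round p*(n-1) to the nearest integer and clamp to the slice.
func percentile(sorted []float64, p float64) float64 {
	n := len(sorted)
	if n == 0 {
		return 0
	}
	idx := int(p*float64(n-1) + 0.5)
	if idx >= n {
		idx = n - 1
	}
	return sorted[idx]
}

// p99Of builds a window of `fast` 1ms waits plus `slow` 60000ms waits
// and returns its p99 in milliseconds.
func p99Of(fast, slow int) float64 {
	samples := make([]float64, 0, fast+slow)
	for i := 0; i < fast; i++ {
		samples = append(samples, 1)
	}
	for i := 0; i < slow; i++ {
		samples = append(samples, 60000)
	}
	sort.Float64s(samples)
	return percentile(samples, 0.99)
}

func main() {
	fmt.Println(p99Of(0, 5))    // clean aggregator: prints 60000
	fmt.Println(p99Of(1019, 5)) // window full of earlier fast samples: prints 1
}
```

With only the 5 starved samples the p99 index lands on the largest value, so the 50s gate passes. With the window dominated by earlier fast inserts the same 5 samples sit above index 1013 and the p99 stays at the fast value, which is the collapse the reset guards against.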

## Scope discipline

- **API**: no public `Store`/`DB` signature change. Only additive
exports.
- **Server**: extends existing `/api/perf/write-sources` JSON with
`.writer_perf` — does **not** add a new route, does **not** replace
`handlePerf`. Empty `.writer_perf` map when paired with an older
ingestor.
- **Read/write invariant** (#1283) preserved: all instrumentation lives
on the ingestor's writer connection.
- **Files touched** (7 total): `cmd/ingestor/db.go`,
`cmd/ingestor/db_writer_perf_test.go`, `cmd/ingestor/maintenance.go`,
`cmd/ingestor/neighbor_builder.go`, `cmd/ingestor/stats_file.go`,
`cmd/server/perf_io.go`, `config.example.json`.
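
The backward-compatibility behavior in the **Server** bullet can be sketched as follows. This is an illustration under stated assumptions: `writerStats` and `ingestorStats` are trimmed, hypothetical stand-ins for the real structs, `writerPerfOrEmpty` is a made-up helper, and the `other` key in the old payload is a placeholder for whatever fields an older ingestor publishes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// writerStats is a trimmed, hypothetical stand-in for WriterStatsSnapshot;
// only two of the published fields are decoded here.
type writerStats struct {
	Count     int64   `json:"count"`
	WaitMsP99 float64 `json:"wait_ms_p99"`
}

// ingestorStats is a hypothetical subset of the stats-file payload.
type ingestorStats struct {
	WriterPerf map[string]writerStats `json:"writer_perf,omitempty"`
}

// writerPerfOrEmpty decodes a stats payload and always returns a non-nil
// map, so the response carries {} rather than null for older ingestors.
func writerPerfOrEmpty(raw []byte) (map[string]writerStats, error) {
	var st ingestorStats
	if err := json.Unmarshal(raw, &st); err != nil {
		return nil, err
	}
	if len(st.WriterPerf) == 0 {
		return map[string]writerStats{}, nil
	}
	return st.WriterPerf, nil
}

func main() {
	payloads := [][]byte{
		[]byte(`{"other":1}`), // older ingestor: no writer_perf key
		[]byte(`{"writer_perf":{"neighbor_builder":{"count":1,"wait_ms_p99":0}}}`),
	}
	for _, p := range payloads {
		m, err := writerPerfOrEmpty(p)
		if err != nil {
			panic(err)
		}
		out, _ := json.Marshal(map[string]interface{}{"writer_perf": m})
		fmt.Println(string(out))
	}
}
```

Returning an empty map instead of a nil one keeps the key's JSON type stable for clients, so a dashboard can iterate over `.writer_perf` without a null check.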

## Deferred (acceptance items NOT in this PR)

- **`mbcap_persist` component tag** — `RunMultibyteCapPersist`'s tx is
intentionally NOT wrapped in this PR to stay within the implementation
brief's 3-files-outside-whitelist budget. One-file follow-up to
instrument.
- **CI smoke test** asserting "neighbor-builder hold_ms < 1000ms on
100k-obs fixture" — deferred to a separate PR per the brief; this PR is
scoped to instrumentation only.

## Preflight overrides

PREFLIGHT-MIGRATION-SCALE: <30s N=runtime — the async-migration gate
flagged five `instrumentedExec` / wrapped-`tx.Exec` lines on `DELETE
FROM observer_metrics`, `UPDATE observers`, `DELETE FROM
observer_metrics`, `DELETE FROM observations`, `DELETE FROM
transmissions`. These are **not** schema migrations — they are the
existing runtime prune / retention queries that already ran sync against
`s.db.Exec` / `tx.Exec` on every retention cycle on master. This PR only
swapped the surface call (sync → sync, via the wrapper) to record
wait/hold timing; no new sync schema work was introduced. Behavior on
production data is identical to master.

Also: red commit's synthetic `UPDATE nodes SET name = name WHERE 0` is a
test-only stub designed to acquire the writer without mutating any row
(the `WHERE 0` is a no-op predicate).

Fixes #1340

---------

Co-authored-by: corescope-bot <bot@corescope.local>
Kpa-clawbot
2026-06-06 21:05:59 -07:00
committed by GitHub
parent 1b112f0b08
commit 222bfdf6cf
7 changed files with 514 additions and 50 deletions
+314 -6
@@ -8,6 +8,7 @@ import (
"log"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
@@ -791,6 +792,21 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
return false, nil
}
// Wait/hold instrumentation (#1340). The hot path uses prepared
// statements that auto-commit; gate the whole function under
// writerMu so concurrent mqtt_handler inserts queue behind any
// other writer (vacuum, prune, neighbor-builder) and the wait is
// Go-visible.
mqttWaitStart := time.Now()
writerMu.Lock()
mqttWait := time.Since(mqttWaitStart)
mqttHoldStart := time.Now()
defer func() {
mqttHold := time.Since(mqttHoldStart)
writerMu.Unlock()
recordWriterTiming("mqtt_handler", mqttWait, mqttHold, "InsertTransmission")
}()
rxTime := data.Timestamp
ingestNow := time.Now().UTC().Format(time.RFC3339)
if rxTime == "" {
@@ -1088,7 +1104,8 @@ func (s *Store) InsertMetrics(data *MetricsData) error {
// PruneOldMetrics deletes observer_metrics rows older than retentionDays.
func (s *Store) PruneOldMetrics(retentionDays int) (int64, error) {
cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays).Format(time.RFC3339)
result, err := s.db.Exec(`DELETE FROM observer_metrics WHERE timestamp < ?`, cutoff)
// Tagged for /api/perf writer-lock visibility (#1340).
result, err := s.instrumentedExec("prune_metrics", `DELETE FROM observer_metrics WHERE timestamp < ?`, cutoff)
if err != nil {
return 0, fmt.Errorf("prune metrics: %w", err)
}
@@ -1129,11 +1146,11 @@ func (s *Store) CheckAutoVacuum(cfg *Config) {
log.Printf("[db] vacuumOnStartup=true — starting one-time full VACUUM (ensure 2x DB size free disk space)...")
start := time.Now()
if _, err := s.db.Exec("PRAGMA auto_vacuum = INCREMENTAL"); err != nil {
if _, err := s.instrumentedExec("vacuum", "PRAGMA auto_vacuum = INCREMENTAL"); err != nil {
log.Printf("[db] VACUUM failed: could not set auto_vacuum: %v", err)
return
}
if _, err := s.db.Exec("VACUUM"); err != nil {
if _, err := s.instrumentedExec("vacuum", "VACUUM"); err != nil {
log.Printf("[db] VACUUM failed: %v", err)
return
}
@@ -1146,7 +1163,8 @@ func (s *Store) CheckAutoVacuum(cfg *Config) {
// RunIncrementalVacuum returns free pages to the OS (#919).
// Safe to call on auto_vacuum=NONE databases (noop).
func (s *Store) RunIncrementalVacuum(pages int) {
if _, err := s.db.Exec(fmt.Sprintf("PRAGMA incremental_vacuum(%d)", pages)); err != nil {
// Tagged for /api/perf writer-lock visibility (#1340).
if _, err := s.instrumentedExec("vacuum", fmt.Sprintf("PRAGMA incremental_vacuum(%d)", pages)); err != nil {
log.Printf("[vacuum] incremental_vacuum error: %v", err)
}
}
@@ -1361,14 +1379,15 @@ func (s *Store) RemoveStaleObservers(observerDays int) (int64, error) {
return 0, nil // keep forever
}
cutoff := time.Now().UTC().AddDate(0, 0, -observerDays).Format(time.RFC3339)
result, err := s.db.Exec(`UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff)
// Tagged for /api/perf writer-lock visibility (#1340).
result, err := s.instrumentedExec("prune_observers", `UPDATE observers SET inactive = 1 WHERE last_seen < ? AND (inactive IS NULL OR inactive = 0)`, cutoff)
if err != nil {
return 0, fmt.Errorf("mark stale observers inactive: %w", err)
}
removed, _ := result.RowsAffected()
if removed > 0 {
// Clean up orphaned metrics for now-inactive observers
s.db.Exec(`DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`)
_, _ = s.instrumentedExec("prune_observers", `DELETE FROM observer_metrics WHERE observer_id IN (SELECT id FROM observers WHERE inactive = 1)`)
log.Printf("Marked %d observer(s) as inactive (not seen in %d days)", removed, observerDays)
}
return removed, nil
@@ -1608,3 +1627,292 @@ func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID,
return pd
}
// ─── Writer-lock instrumentation (issue #1340) ────────────────────────────
//
// Make SQLite writer-lock starvation visible to operators. Per-component
// wait_ms / hold_ms / contention_total histograms, surfaced via
// /api/perf/write-sources under the "writer_perf" key. Component tags:
// neighbor_builder, mqtt_handler, prune_packets, prune_observers,
// prune_metrics, mbcap_persist (deferred — see PR body), vacuum.
//
// The single writer connection (SetMaxOpenConns(1)) means writes serialise
// inside the driver and the wait is invisible to Go. writerMu measures the
// wait Go can see (everyone queueing behind the current holder) by gating
// every wrapped call site through the same package-level mutex.
// WriterStatsSnapshot is a per-component wait/hold latency snapshot
// surfaced via /api/perf to make SQLite writer-lock starvation visible
// to operators (issue #1340). Times are in milliseconds.
type WriterStatsSnapshot struct {
Count int64 `json:"count"`
ContentionTotal int64 `json:"contention_total"`
WaitMsP50 float64 `json:"wait_ms_p50"`
WaitMsP95 float64 `json:"wait_ms_p95"`
WaitMsP99 float64 `json:"wait_ms_p99"`
WaitMsMax float64 `json:"wait_ms_max"`
HoldMsP50 float64 `json:"hold_ms_p50"`
HoldMsP95 float64 `json:"hold_ms_p95"`
HoldMsP99 float64 `json:"hold_ms_p99"`
HoldMsMax float64 `json:"hold_ms_max"`
}
const (
// writerSampleWindow bounds the per-component rolling window so a
// long-running ingestor doesn't grow this unbounded.
writerSampleWindow = 1024
// contentionThresholdMs: wait_ms above this counts as a "contended"
// write (per #1340 spec).
contentionThresholdMs = 100.0
defaultSlowWriterMs = 500.0
)
// slowWriterThresholdMsAtomic — hold_ms threshold above which writes
// emit a [db-slow-writer] log line. Read on the hot path; written once
// at startup by SetSlowWriterThresholdMs.
var slowWriterThresholdMsAtomic atomic.Uint64
// SetSlowWriterThresholdMs sets the [db-slow-writer] log threshold.
// ms<=0 restores the 500ms default. Operators can also set
// CORESCOPE_DB_SLOW_WRITER_MS at process start — see initSlowWriterFromEnv.
func SetSlowWriterThresholdMs(ms float64) {
if ms <= 0 {
ms = defaultSlowWriterMs
}
slowWriterThresholdMsAtomic.Store(uint64(ms))
}
func getSlowWriterThresholdMs() float64 {
v := slowWriterThresholdMsAtomic.Load()
if v == 0 {
return defaultSlowWriterMs
}
return float64(v)
}
// initSlowWriterFromEnv is called once from package init so operators can
// override the threshold via CORESCOPE_DB_SLOW_WRITER_MS without a
// Go-side Config change.
func initSlowWriterFromEnv() {
v := os.Getenv("CORESCOPE_DB_SLOW_WRITER_MS")
if v == "" {
return
}
var ms float64
if _, err := fmt.Sscanf(v, "%f", &ms); err == nil && ms > 0 {
SetSlowWriterThresholdMs(ms)
}
}
func init() { initSlowWriterFromEnv() }
type writerComponentStats struct {
mu sync.Mutex
count int64
contentionTotal int64
waitMs []float64
holdMs []float64
waitMax float64
holdMax float64
}
func (c *writerComponentStats) record(waitMs, holdMs float64) {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
if waitMs > contentionThresholdMs {
c.contentionTotal++
}
if waitMs > c.waitMax {
c.waitMax = waitMs
}
if holdMs > c.holdMax {
c.holdMax = holdMs
}
c.waitMs = appendBoundedFloat(c.waitMs, waitMs, writerSampleWindow)
c.holdMs = appendBoundedFloat(c.holdMs, holdMs, writerSampleWindow)
}
func appendBoundedFloat(s []float64, v float64, max int) []float64 {
if len(s) < max {
return append(s, v)
}
copy(s, s[1:])
s[len(s)-1] = v
return s
}
func (c *writerComponentStats) snapshot() WriterStatsSnapshot {
c.mu.Lock()
wait := append([]float64(nil), c.waitMs...)
hold := append([]float64(nil), c.holdMs...)
snap := WriterStatsSnapshot{
Count: c.count,
ContentionTotal: c.contentionTotal,
WaitMsMax: c.waitMax,
HoldMsMax: c.holdMax,
}
c.mu.Unlock()
sort.Float64s(wait)
sort.Float64s(hold)
snap.WaitMsP50 = nearestRankPercentile(wait, 0.50)
snap.WaitMsP95 = nearestRankPercentile(wait, 0.95)
snap.WaitMsP99 = nearestRankPercentile(wait, 0.99)
snap.HoldMsP50 = nearestRankPercentile(hold, 0.50)
snap.HoldMsP95 = nearestRankPercentile(hold, 0.95)
snap.HoldMsP99 = nearestRankPercentile(hold, 0.99)
return snap
}
func nearestRankPercentile(sorted []float64, p float64) float64 {
n := len(sorted)
if n == 0 {
return 0
}
if n == 1 {
return sorted[0]
}
idx := int(p*float64(n-1) + 0.5)
if idx < 0 {
idx = 0
}
if idx >= n {
idx = n - 1
}
return sorted[idx]
}
type writerStatsAggregator struct {
mu sync.Mutex
components map[string]*writerComponentStats
}
var writerStatsAgg = &writerStatsAggregator{
components: make(map[string]*writerComponentStats),
}
func (a *writerStatsAggregator) get(component string) *writerComponentStats {
a.mu.Lock()
defer a.mu.Unlock()
c, ok := a.components[component]
if !ok {
c = &writerComponentStats{}
a.components[component] = c
}
return c
}
// reset clears all per-component samples. Test-only: lets a single
// scenario assert against a clean aggregator without prior-test noise
// in the same package run (TestWriterStarvationVisibleInPerf would
// otherwise mix this run's 5 starved samples with thousands of fast
// InsertTransmission samples from earlier tests and the p99 would
// collapse below the 50s threshold).
func (a *writerStatsAggregator) reset() {
a.mu.Lock()
defer a.mu.Unlock()
a.components = make(map[string]*writerComponentStats)
}
// ResetWriterStatsForTest wipes the per-component writer stats
// aggregator. Test-only; not safe to call from production code paths.
func ResetWriterStatsForTest() { writerStatsAgg.reset() }
func (a *writerStatsAggregator) snapshot() map[string]WriterStatsSnapshot {
a.mu.Lock()
keys := make([]string, 0, len(a.components))
stats := make([]*writerComponentStats, 0, len(a.components))
for k, v := range a.components {
keys = append(keys, k)
stats = append(stats, v)
}
a.mu.Unlock()
out := make(map[string]WriterStatsSnapshot, len(keys))
for i, k := range keys {
out[k] = stats[i].snapshot()
}
return out
}
// WriterStatsSnapshot returns a per-component wait/hold/contention
// snapshot for exposure on /api/perf/write-sources (issue #1340).
func (s *Store) WriterStatsSnapshot() map[string]WriterStatsSnapshot {
return writerStatsAgg.snapshot()
}
// recordWriterTiming aggregates a single sample under component and
// emits [db-slow-writer] if hold_ms > configured threshold (default
// 500ms). queryForLog is truncated to 200 chars.
func recordWriterTiming(component string, wait, hold time.Duration, queryForLog string) {
waitMs := float64(wait.Nanoseconds()) / 1e6
holdMs := float64(hold.Nanoseconds()) / 1e6
writerStatsAgg.get(component).record(waitMs, holdMs)
if holdMs > getSlowWriterThresholdMs() {
q := queryForLog
if len(q) > 200 {
q = q[:200]
}
log.Printf("[db-slow-writer] component=%s duration=%.1fms query=%s", component, holdMs, q)
}
}
// writerMu serialises every wrapped writer call so the wait the next
// caller sees is the wait the perf snapshot can attribute. The
// SQLite driver also enforces serial writes (SetMaxOpenConns(1)),
// but the wait inside the driver is invisible to Go — writerMu makes
// it Go-visible.
var writerMu sync.Mutex
// WriterExec wraps s.db.Exec with per-component wait/hold/contention
// instrumentation (issue #1340).
func (s *Store) WriterExec(component, query string, args ...interface{}) (sql.Result, error) {
waitStart := time.Now()
writerMu.Lock()
wait := time.Since(waitStart)
holdStart := time.Now()
res, err := s.db.Exec(query, args...)
hold := time.Since(holdStart)
writerMu.Unlock()
recordWriterTiming(component, wait, hold, query)
return res, err
}
// WriterTx wraps Begin → fn → Commit under component tagging.
// hold_ms covers the whole tx so a slow body counts against its owner.
func (s *Store) WriterTx(component string, fn func(*sql.Tx) error) error {
waitStart := time.Now()
writerMu.Lock()
wait := time.Since(waitStart)
holdStart := time.Now()
tx, err := s.db.Begin()
if err != nil {
hold := time.Since(holdStart)
writerMu.Unlock()
recordWriterTiming(component, wait, hold, "BEGIN")
return err
}
if err := fn(tx); err != nil {
_ = tx.Rollback()
hold := time.Since(holdStart)
writerMu.Unlock()
recordWriterTiming(component, wait, hold, "tx-body")
return err
}
err = tx.Commit()
hold := time.Since(holdStart)
writerMu.Unlock()
recordWriterTiming(component, wait, hold, "COMMIT")
return err
}
// Wrap helpers below tag existing call sites with the canonical
// component names so the call sites read naturally. These keep the
// instrumentation out of the hot-path business logic.
// instrumentedExec is the package-internal pass-through used by call
// sites already inside db.go (PruneOldMetrics, RemoveStaleObservers,
// vacuum). Equivalent to WriterExec, kept short for readability.
func (s *Store) instrumentedExec(component, query string, args ...interface{}) (sql.Result, error) {
return s.WriterExec(component, query, args...)
}
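
The fallback rules that `initSlowWriterFromEnv` applies to `CORESCOPE_DB_SLOW_WRITER_MS` (empty, unparseable, or non-positive values keep the 500ms default) can be exercised in isolation. The sketch below is a stricter variant and not the code in this PR: it swaps `fmt.Sscanf` for `strconv.ParseFloat`, which rejects trailing characters, and additionally rejects NaN and infinite values. `parseSlowWriterMs` is a hypothetical name.

```go
package main

import (
	"fmt"
	"math"
	"os"
	"strconv"
	"strings"
)

// defaultSlowWriterMs matches the default documented in the PR.
const defaultSlowWriterMs = 500.0

// parseSlowWriterMs (hypothetical helper) turns the raw env value into a
// threshold in milliseconds. Anything that is empty, not a number, not
// strictly positive, or not finite falls back to the default.
func parseSlowWriterMs(raw string) float64 {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return defaultSlowWriterMs
	}
	ms, err := strconv.ParseFloat(raw, 64)
	if err != nil || !(ms > 0) || math.IsInf(ms, 0) {
		return defaultSlowWriterMs
	}
	return ms
}

func main() {
	// Reads the same variable the ingestor reads at startup.
	raw := os.Getenv("CORESCOPE_DB_SLOW_WRITER_MS")
	fmt.Printf("slow-writer threshold: %.1fms\n", parseSlowWriterMs(raw))
}
```

Taking the raw string as a parameter, rather than calling `os.Getenv` inside the helper, keeps the parsing rules testable without touching the process environment.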
+115
@@ -0,0 +1,115 @@
package main
import (
"database/sql"
"fmt"
"sync"
"testing"
"time"
)
// TestWriterStarvationVisibleInPerf reproduces the #1339 class of bug:
// one component (neighbor_builder) holds the writer connection for an
// extended period; a second component (mqtt_handler) firing concurrent
// writes must show observable wait_ms in the perf snapshot.
//
// This is the gate test for issue #1340: SQLite write-lock instrumentation
// per component. If the wait_ms percentile collapses to zero, the
// observability gap remains and the regression class is invisible again.
//
// Runs ~60s — guarded by testing.Short() so fast unit-test passes can
// skip it locally, but CI runs `go test ./...` without -short.
func TestWriterStarvationVisibleInPerf(t *testing.T) {
if testing.Short() {
t.Skip("skipping 60s starvation test in short mode")
}
// Isolate from samples accumulated by earlier tests in the same
// package run — without this the mqtt_handler component already
// has ~thousand fast InsertTransmission samples and the 5 slow
// follower samples can't move p99 above 50s.
ResetWriterStatsForTest()
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()
const blockDur = 60 * time.Second
// Blocker: acquire the writer via the wrapped Tx path, tag as
// neighbor_builder, sleep 60s while holding the single conn,
// then commit. This monopolises the writer for the duration.
blockStarted := make(chan struct{})
blockerDone := make(chan struct{})
go func() {
defer close(blockerDone)
err := s.WriterTx("neighbor_builder", func(tx *sql.Tx) error {
if _, err := tx.Exec(`UPDATE nodes SET name = name WHERE 0`); err != nil {
return err
}
close(blockStarted)
time.Sleep(blockDur)
return nil
})
if err != nil {
t.Errorf("blocker tx: %v", err)
}
}()
// Wait for the blocker to be inside its transaction.
<-blockStarted
// Small safety margin so the blocker is firmly holding the conn.
time.Sleep(100 * time.Millisecond)
// Now fire several mqtt_handler writes. Each will block on the
// single writer connection until the blocker commits.
const followers = 5
var wg sync.WaitGroup
wg.Add(followers)
for i := 0; i < followers; i++ {
i := i
go func() {
defer wg.Done()
_, err := s.WriterExec(
"mqtt_handler",
`INSERT OR IGNORE INTO _migrations (name) VALUES (?)`,
fmt.Sprintf("writer_starvation_test_%d", i),
)
if err != nil {
t.Errorf("mqtt follower %d: %v", i, err)
}
}()
}
wg.Wait()
<-blockerDone
snap := s.WriterStatsSnapshot()
mqtt, ok := snap["mqtt_handler"]
if !ok {
t.Fatalf("no perf snapshot for mqtt_handler component (got components: %v)", componentKeys(snap))
}
if mqtt.Count < followers {
t.Fatalf("expected at least %d mqtt_handler samples, got %d", followers, mqtt.Count)
}
// This is the gate assertion. With instrumentation present the
// follower writes should each register ~60s of wait_ms; p99 must
// be well above 50_000ms. With instrumentation missing or broken
// the percentile collapses to zero and this fails — which is the
// exact regression class #1340 is meant to prevent.
if mqtt.WaitMsP99 <= 50_000 {
t.Fatalf("mqtt_handler wait_ms p99 = %.1fms, want > 50000ms; "+
"writer starvation is invisible to /api/perf — issue #1340 not fixed",
mqtt.WaitMsP99)
}
}
func componentKeys(m map[string]WriterStatsSnapshot) []string {
out := make([]string, 0, len(m))
for k := range m {
out = append(out, k)
}
return out
}
+17 -18
@@ -22,26 +22,25 @@ func (s *Store) PruneOldPackets(days int) (int64, error) {
}
cutoff := time.Now().UTC().AddDate(0, 0, -days).Format(time.RFC3339)
tx, err := s.db.Begin()
if err != nil {
return 0, fmt.Errorf("prune begin: %w", err)
}
defer tx.Rollback()
// Tagged for writer-perf visibility (#1340).
var n int64
err := s.WriterTx("prune_packets", func(tx *sql.Tx) error {
// Delete child observations first (no CASCADE in SQLite).
if _, err := tx.Exec(`DELETE FROM observations WHERE transmission_id IN (
SELECT id FROM transmissions WHERE first_seen < ?
)`, cutoff); err != nil {
return fmt.Errorf("prune observations: %w", err)
}
// Delete child observations first (no CASCADE in SQLite).
if _, err := tx.Exec(`DELETE FROM observations WHERE transmission_id IN (
SELECT id FROM transmissions WHERE first_seen < ?
)`, cutoff); err != nil {
return 0, fmt.Errorf("prune observations: %w", err)
}
res, err := tx.Exec(`DELETE FROM transmissions WHERE first_seen < ?`, cutoff)
res, err := tx.Exec(`DELETE FROM transmissions WHERE first_seen < ?`, cutoff)
if err != nil {
return fmt.Errorf("prune transmissions: %w", err)
}
n, _ = res.RowsAffected()
return nil
})
if err != nil {
return 0, fmt.Errorf("prune transmissions: %w", err)
}
n, _ := res.RowsAffected()
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("prune commit: %w", err)
return 0, err
}
if n > 0 {
log.Printf("[prune] deleted %d transmissions older than %d days", n, days)
+28 -25
@@ -234,33 +234,36 @@ func (s *Store) buildAndPersistNeighborEdges() (int, error) {
return 0, nil
}
tx, err := s.db.Begin()
if err != nil {
return 0, fmt.Errorf("begin: %w", err)
}
defer tx.Rollback()
stmt, err := tx.Prepare(`INSERT INTO neighbor_edges (node_a, node_b, count, last_seen)
VALUES (?, ?, 1, ?)
ON CONFLICT(node_a, node_b) DO UPDATE SET
count = count + 1,
last_seen = MAX(last_seen, excluded.last_seen)`)
if err != nil {
return 0, fmt.Errorf("prepare: %w", err)
}
defer stmt.Close()
var firstErr error
for _, e := range edges {
if _, err := stmt.Exec(e.a, e.b, e.ts); err != nil && firstErr == nil {
firstErr = err
// Wrap the whole edge-persist tx under writer-perf instrumentation
// (#1340). Slow neighbor-builder ticks (the #1339 root cause) now
// show up on /api/perf under component=neighbor_builder.
var inserted int
err = s.WriterTx("neighbor_builder", func(tx *sql.Tx) error {
stmt, err := tx.Prepare(`INSERT INTO neighbor_edges (node_a, node_b, count, last_seen)
VALUES (?, ?, 1, ?)
ON CONFLICT(node_a, node_b) DO UPDATE SET
count = count + 1,
last_seen = MAX(last_seen, excluded.last_seen)`)
if err != nil {
return fmt.Errorf("prepare: %w", err)
}
defer stmt.Close()
var firstErr error
for _, e := range edges {
if _, err := stmt.Exec(e.a, e.b, e.ts); err != nil && firstErr == nil {
firstErr = err
}
}
if firstErr != nil {
return fmt.Errorf("upsert: %w", firstErr)
}
inserted = len(edges)
return nil
})
if err != nil {
return 0, err
}
if firstErr != nil {
return 0, fmt.Errorf("upsert: %w", firstErr)
}
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("commit: %w", err)
}
return len(edges), nil
return inserted, nil
}
// canonEdge orders the pair so node_a <= node_b (matches the existing
+8
@@ -43,6 +43,13 @@ type IngestorStatsSnapshot struct {
// the server's /api/perf/io endpoint under .ingestor (#1120 — "Both
// ingestor and server"). Optional; absent on non-Linux hosts.
ProcIO *PerfIOSample `json:"procIO,omitempty"`
// WriterPerf is the per-component SQLite writer-lock latency
// snapshot (#1340) — wait_ms / hold_ms / contention_total tagged
// by component (neighbor_builder, mqtt_handler, prune_packets,
// prune_observers, prune_metrics, vacuum). Surfaced by the server
// via /api/perf/write-sources under .writer_perf. Optional —
// older ingestor builds don't publish this field.
WriterPerf map[string]WriterStatsSnapshot `json:"writer_perf,omitempty"`
}
// statsFilePath returns the writable path the ingestor will publish stats to.
@@ -223,6 +230,7 @@ func StartStatsFileWriter(s *Store, interval time.Duration) {
GroupCommitFlushes: 0, // group commit reverted (refs #1129)
BackfillUpdates: s.Stats.SnapshotBackfills(),
ProcIO: ioRate,
WriterPerf: s.WriterStatsSnapshot(),
}
buf.Reset()
if err := enc.Encode(&snap); err != nil {
+30
@@ -297,6 +297,27 @@ type IngestorStats struct {
// ProcIO is the ingestor's own /proc/self/io rates (since its previous
// sample). Optional — older ingestor builds don't publish this. See #1120.
ProcIO *PerfIOSample `json:"procIO,omitempty"`
// WriterPerf is the per-component SQLite writer-lock latency
// snapshot (#1340). Optional — older ingestor builds don't
// publish this. Surfaced under .writer_perf by
// handlePerfWriteSources.
WriterPerf map[string]WriterStatsSnapshot `json:"writer_perf,omitempty"`
}
// WriterStatsSnapshot mirrors the ingestor's per-component writer-lock
// latency snapshot (#1340). Times are milliseconds. Server-side decode
// uses this type to keep the JSON contract stable across processes.
type WriterStatsSnapshot struct {
Count int64 `json:"count"`
ContentionTotal int64 `json:"contention_total"`
WaitMsP50 float64 `json:"wait_ms_p50"`
WaitMsP95 float64 `json:"wait_ms_p95"`
WaitMsP99 float64 `json:"wait_ms_p99"`
WaitMsMax float64 `json:"wait_ms_max"`
HoldMsP50 float64 `json:"hold_ms_p50"`
HoldMsP95 float64 `json:"hold_ms_p95"`
HoldMsP99 float64 `json:"hold_ms_p99"`
HoldMsMax float64 `json:"hold_ms_max"`
}
// IngestorStatsPath is the well-known location where the ingestor writes its
@@ -342,5 +363,14 @@ func (s *Server) handlePerfWriteSources(w http.ResponseWriter, r *http.Request)
}
out["sources"] = sources
out["sampleAt"] = st.SampledAt
// Surface per-component SQLite writer-lock latency histograms
// (#1340) under .writer_perf so operators can see when a
// component (e.g. neighbor_builder) is starving the writer.
// Empty map when the ingestor is too old to publish this field.
if len(st.WriterPerf) > 0 {
out["writer_perf"] = st.WriterPerf
} else {
out["writer_perf"] = map[string]WriterStatsSnapshot{}
}
writeJSON(w, out)
}
+2 -1
@@ -14,7 +14,8 @@
"db": {
"vacuumOnStartup": false,
"incrementalVacuumPages": 1024,
"_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs. Executed by the INGESTOR at startup, BEFORE the MQTT subscriber starts (#1283), so there is no contention with concurrent writes. Blocks ingestor startup for minutes on large DBs; requires 2x DB file size in free disk space. incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919."
"_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs. Executed by the INGESTOR at startup, BEFORE the MQTT subscriber starts (#1283), so there is no contention with concurrent writes. Blocks ingestor startup for minutes on large DBs; requires 2x DB file size in free disk space. incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919.",
"_comment_slowWriterMs": "#1340 — SQLite writer-lock log threshold (default 500). Any wrapped writer call (tagged neighbor_builder, mqtt_handler, prune_packets, prune_observers, prune_metrics, vacuum) whose hold_ms exceeds this emits a single [db-slow-writer] log line. Configured per-process via the CORESCOPE_DB_SLOW_WRITER_MS environment variable on the INGESTOR (e.g. CORESCOPE_DB_SLOW_WRITER_MS=200 for tighter alerting). Per-component wait_ms / hold_ms / contention_total histograms are surfaced via /api/perf/write-sources under .writer_perf regardless of this threshold."
},
"_comment_ingestorStats": "Ingestor publishes a 1-Hz stats snapshot consumed by the server's /api/perf/io and /api/perf/write-sources endpoints (#1120). Path is configured via the CORESCOPE_INGESTOR_STATS environment variable on the INGESTOR process. Default: /tmp/corescope-ingestor-stats.json. The writer uses O_NOFOLLOW + 0o600, so a pre-planted symlink in /tmp cannot be used to clobber an arbitrary file. SECURITY: in shared-tmp environments (multi-tenant hosts), point CORESCOPE_INGESTOR_STATS at a private directory like /var/lib/corescope/ingestor-stats.json that only the corescope user can write to.",
"corsAllowedOrigins": [],