meshcore-analyzer/cmd/ingestor/group_commit_test.go
Kpa-clawbot 45f2607f75 perf(ingestor): group commit observation INSERTs by time window (M1, refs #1115) (#1117)
## Summary

Implements **M1 from #1115**: batches observation/transmission INSERTs
into a single SQLite `BEGIN/COMMIT` window instead of fsyncing per
packet. At ~250 obs/sec this drops WAL fsync rate from ~20/s to ~1/s and
eliminates the `obs-persist skipped` / `SQLITE_BUSY` log spam that the
issue documents.

This is a **partial fix** — it ships the group-commit mechanism.
Acceptance items 6–7 (measured fsync rate / measured `obs-persist
skipped` rate at staging steady-state) require post-deploy observation,
and M2 (per-`tx_hash` observation buffering) is intentionally deferred.
The issue stays open for the user to verify on staging.

> Partial fix for #1115 — does not auto-close. Refs #1115.

## Mechanism

- `Store` gains an active `*sql.Tx`, `pendingRows` counter, `gcMu`, and
the `groupCommitMs` / `groupCommitMaxRows` knobs. `SetGroupCommit(ms,
maxRows)` enables the mode; `FlushGroupTx()` commits the in-flight tx.
- `InsertTransmission` lazily opens a tx on the first call after each
flush, then issues all writes through `tx.Stmt()` bindings of the
existing prepared statements. With `MaxOpenConns(1)` the connection is
already serialized; `gcMu` serializes group-commit state without
contention.
- A goroutine in `cmd/ingestor/main.go` calls `FlushGroupTx()` every
`groupCommitMs` ms. `pendingRows >= groupCommitMaxRows` triggers an
eager flush. `Close()` flushes before the WAL checkpoint so no rows are
lost on graceful shutdown.
- `groupCommitMs == 0` short-circuits to the legacy per-call auto-commit
path (statements bound to `s.db`, no tx) — current behavior preserved
byte-for-byte for operators who opt out.

## Config

Two new optional fields (ingestor-only), both documented in
`config.example.json`:

| Field | Default | Effect |
|---|---|---|
| `groupCommitMs` | `1000` | Flush window in ms. `0` disables batching (legacy per-packet auto-commit). |
| `groupCommitMaxRows` | `1000` | Safety cap; when reached, the pending batch flushes immediately to bound memory and the crash-loss window. |
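
With the defaults from the table, the relevant fragment of `config.example.json` would look roughly like this (surrounding fields omitted; exact placement within the file is an assumption):

```json
{
  "groupCommitMs": 1000,
  "groupCommitMaxRows": 1000
}
```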

No DB schema change. No required config change on upgrade.

## Tests (TDD red → green visible in commits)

`cmd/ingestor/group_commit_test.go` — three assertions, written first as
the red commit:

- `TestGroupCommit_BatchesInsertsIntoOneTx` — 50 `InsertTransmission`
calls inside a wide window produce **0** commits until `FlushGroupTx`,
then exactly **1**; all 50 rows visible after flush. (This is the spec's
"50 observations → 1 SQLite write transaction" assertion.)
- `TestGroupCommit_Disabled` — `groupCommitMs=0` keeps every insert
immediately visible and `GroupCommitFlushes` never advances. (Spec's
"groupCommitMs=0 reverts to per-packet behavior" assertion.)
- `TestGroupCommit_MaxRowsForcesEarlyFlush` — cap=3, 7 inserts → 2
auto-flushes from the cap + 1 final manual flush = 3 total.

Red commit: `e2b0370` (stubs `SetGroupCommit` / `FlushGroupTx` so the
tests compile and fail on **assertions**, not import errors).
Green commit: `73f3559`.

Full ingestor suite (`go test ./...` in `cmd/ingestor`) stays green (~49 s).

## Performance

This PR is the perf change itself. Local micro-test (the new
`TestGroupCommit_BatchesInsertsIntoOneTx`) shows the structural
property: 50 inserts → 1 commit. The fsync-rate measurement called out
in the M1 acceptance criteria (`~20/s → ~1/s` at 250 obs/sec) requires
staging deployment to confirm — that's the remaining open item that
keeps #1115 open after this merges.

No hot-path regressions: when `groupCommitMs > 0` we acquire one mutex
per insert (uncontended in the steady state — the connection was already
single-threaded via `MaxOpenConns(1)`). When `groupCommitMs == 0` the
code path is identical to before plus one nil-tx check.

## What this PR does NOT do (per spec)

- Does not collapse "30 observations of one packet" into 1 row write —
that's M2.
- Does not eliminate dual-writer contention with `cmd/server`'s
`resolved_path` writes.
- Does not change observation ordering or live broadcast latency.

---------

Co-authored-by: corescope-bot <bot@corescope.local>
2026-05-05 16:38:43 -07:00


package main

import (
	"fmt"
	"testing"
)

// makePacket returns a minimal valid PacketData with a unique hash so
// each call is treated as a distinct transmission by InsertTransmission.
func makePacket(i int) *PacketData {
	snr := 1.0
	rssi := -90.0
	return &PacketData{
		RawHex:         fmt.Sprintf("AABB%04X", i),
		Timestamp:      "2026-05-01T00:00:00Z",
		ObserverID:     "obsGC",
		Hash:           fmt.Sprintf("gchash%010d", i),
		RouteType:      2,
		PayloadType:    2,
		PayloadVersion: 0,
		PathJSON:       "[]",
		DecodedJSON:    `{"type":"TXT_MSG"}`,
		SNR:            &snr,
		RSSI:           &rssi,
	}
}

// TestGroupCommit_BatchesInsertsIntoOneTx verifies M1 behavior: with
// groupCommitMs > 0, 50 InsertTransmission calls should produce ZERO
// commits until FlushGroupTx is called, then exactly 1 commit.
func TestGroupCommit_BatchesInsertsIntoOneTx(t *testing.T) {
	s, err := OpenStore(tempDBPath(t))
	if err != nil {
		t.Fatal(err)
	}
	defer s.Close()
	if err := s.UpsertObserver("obsGC", "GC Observer", "SJC", nil); err != nil {
		t.Fatal(err)
	}
	// Enable group commit with a wide window so the test ticker doesn't fire.
	s.SetGroupCommit(60_000, 1000)
	startFlushes := s.Stats.GroupCommitFlushes.Load()
	for i := 0; i < 50; i++ {
		if _, err := s.InsertTransmission(makePacket(i)); err != nil {
			t.Fatalf("insert %d: %v", i, err)
		}
	}
	// Before flush, no commits should have occurred. (max=1000, count=50.)
	if got := s.Stats.GroupCommitFlushes.Load() - startFlushes; got != 0 {
		t.Fatalf("flushes before manual flush: got %d, want 0", got)
	}
	// Manual flush — exactly one commit for all 50 inserts.
	if err := s.FlushGroupTx(); err != nil {
		t.Fatalf("FlushGroupTx: %v", err)
	}
	if got := s.Stats.GroupCommitFlushes.Load() - startFlushes; got != 1 {
		t.Fatalf("flushes after manual flush: got %d, want 1", got)
	}
	// All 50 rows must be visible after commit.
	var n int
	if err := s.db.QueryRow("SELECT COUNT(*) FROM transmissions WHERE hash LIKE 'gchash%'").Scan(&n); err != nil {
		t.Fatal(err)
	}
	if n != 50 {
		t.Fatalf("transmissions after flush: got %d, want 50", n)
	}
	if err := s.db.QueryRow("SELECT COUNT(*) FROM observations").Scan(&n); err != nil {
		t.Fatal(err)
	}
	if n != 50 {
		t.Fatalf("observations after flush: got %d, want 50", n)
	}
}

// TestGroupCommit_Disabled verifies that with groupCommitMs == 0, every
// InsertTransmission commits immediately (current behavior preserved) and
// the GroupCommitFlushes counter never advances.
func TestGroupCommit_Disabled(t *testing.T) {
	s, err := OpenStore(tempDBPath(t))
	if err != nil {
		t.Fatal(err)
	}
	defer s.Close()
	if err := s.UpsertObserver("obsGC", "GC Observer", "SJC", nil); err != nil {
		t.Fatal(err)
	}
	// Explicitly disable.
	s.SetGroupCommit(0, 1000)
	startFlushes := s.Stats.GroupCommitFlushes.Load()
	for i := 0; i < 5; i++ {
		if _, err := s.InsertTransmission(makePacket(i)); err != nil {
			t.Fatalf("insert %d: %v", i, err)
		}
		// Each insert is immediately visible — no flush required.
		var n int
		if err := s.db.QueryRow("SELECT COUNT(*) FROM transmissions WHERE hash LIKE 'gchash%'").Scan(&n); err != nil {
			t.Fatal(err)
		}
		if n != i+1 {
			t.Fatalf("after insert %d: got %d transmissions, want %d", i, n, i+1)
		}
	}
	if got := s.Stats.GroupCommitFlushes.Load() - startFlushes; got != 0 {
		t.Fatalf("flushes with group commit disabled: got %d, want 0", got)
	}
}

// TestGroupCommit_MaxRowsForcesEarlyFlush verifies that reaching the
// row cap triggers an immediate flush even before the ticker fires.
func TestGroupCommit_MaxRowsForcesEarlyFlush(t *testing.T) {
	s, err := OpenStore(tempDBPath(t))
	if err != nil {
		t.Fatal(err)
	}
	defer s.Close()
	if err := s.UpsertObserver("obsGC", "GC Observer", "SJC", nil); err != nil {
		t.Fatal(err)
	}
	// Window large; cap small (3) so the 3rd insert triggers an auto-flush.
	s.SetGroupCommit(60_000, 3)
	startFlushes := s.Stats.GroupCommitFlushes.Load()
	for i := 0; i < 7; i++ {
		if _, err := s.InsertTransmission(makePacket(i)); err != nil {
			t.Fatalf("insert %d: %v", i, err)
		}
	}
	// 7 inserts with cap 3 → 2 auto-flushes (after 3 and after 6); 1 still pending.
	if got := s.Stats.GroupCommitFlushes.Load() - startFlushes; got != 2 {
		t.Fatalf("auto-flushes: got %d, want 2", got)
	}
	if err := s.FlushGroupTx(); err != nil {
		t.Fatal(err)
	}
	if got := s.Stats.GroupCommitFlushes.Load() - startFlushes; got != 3 {
		t.Fatalf("flushes after final manual flush: got %d, want 3", got)
	}
}