From 76d89e65788cd034fc8d98ce3102225d7e0fcd69 Mon Sep 17 00:00:00 2001 From: Kpa-clawbot Date: Tue, 5 May 2026 17:35:16 -0700 Subject: [PATCH] fix(ingestor): exclude path_json='[]' rows from backfill WHERE (#1119) (#1121) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary `BackfillPathJSONAsync` re-selected observations whose `path_json` was already `'[]'`, rewrote them to `'[]'`, and looped forever. The `len(batch) == 0` exit condition was never reached, the migration marker was never recorded, and the ingestor sustained 2–3 MB/s WAL writes at idle (76% of CPU in `sqlite.Exec` per pprof). ## Fix Drop `'[]'` from the WHERE clause: ```diff WHERE o.raw_hex IS NOT NULL AND o.raw_hex != '' - AND (o.path_json IS NULL OR o.path_json = '' OR o.path_json = '[]') + AND (o.path_json IS NULL OR o.path_json = '') ``` `'[]'` is the "already attempted, no hops" sentinel (still written at line 994 of `cmd/ingestor/db.go` when `DecodePathFromRawHex` returns no hops). Excluding it from the WHERE lets the loop terminate after one full pass and allows the migration marker `backfill_path_json_from_raw_hex_v1` to be recorded. ## TDD - **Red commit** (`19f8004`): `TestBackfillPathJSONAsync_BracketRowsTerminate` — seeds 100 observations with `path_json='[]'` and a `raw_hex` that decodes to zero hops, asserts the migration marker is written within 5s. Fails on master with *"backfill never recorded migration marker within 5s — infinite loop on path_json='[]' rows"*. - **Green commit** (`7019100`): WHERE-clause fix + updates `TestBackfillPathJsonFromRawHex` row 1 expectation (the pre-seeded `'[]'` row is now correctly skipped instead of being re-decoded). 
## Test results ``` ok github.com/corescope/ingestor 49.656s ``` ## Acceptance criteria from #1119 - [x] Backfill terminates within 1 polling cycle of having no progress to make - [x] Migration marker `backfill_path_json_from_raw_hex_v1` written after termination - [x] On restart, backfill recognizes migration done and exits immediately (existing behavior — the migration check at the top of `BackfillPathJSONAsync` was always correct; the bug was that the marker never got written) - [x] Test: seed DB with N observations all having `path_json = '[]'` → backfill runs once → no UPDATEs issued, migration marker written - [ ] Disk write rate on idle staging drops from 2–3 MB/s to <100 KB/s — to be verified by the user post-deploy Fixes #1119. --------- Co-authored-by: OpenClaw Bot --- cmd/ingestor/db.go | 4 +- cmd/ingestor/db_test.go | 157 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 157 insertions(+), 4 deletions(-) diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index edeee1fc..160f67dc 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -1092,7 +1092,9 @@ func (s *Store) BackfillPathJSONAsync() { FROM observations o JOIN transmissions t ON o.transmission_id = t.id WHERE o.raw_hex IS NOT NULL AND o.raw_hex != '' - AND (o.path_json IS NULL OR o.path_json = '' OR o.path_json = '[]') + -- NB: '[]' is the "already attempted, no hops" sentinel; excluded + -- to prevent the infinite re-UPDATE loop fixed in #1119. 
+ AND (o.path_json IS NULL OR o.path_json = '') AND t.payload_type != 9 LIMIT ?`, batchSize) if err != nil { diff --git a/cmd/ingestor/db_test.go b/cmd/ingestor/db_test.go index 82a5ce15..44b46b53 100644 --- a/cmd/ingestor/db_test.go +++ b/cmd/ingestor/db_test.go @@ -2232,11 +2232,13 @@ func TestBackfillPathJsonFromRawHex(t *testing.T) { t.Fatalf("migration not recorded") } - // Row 1 (was '[]') should now have decoded hops + // Row 1 (was '[]') is NOT re-processed by the backfill — '[]' means + // "already attempted, no hops" and is excluded by the WHERE to avoid the + // infinite-loop bug fixed in #1119. It must remain '[]'. var pj1 string s2.db.QueryRow("SELECT path_json FROM observations WHERE id = 1").Scan(&pj1) - if pj1 != `["AABB","CCDD"]` { - t.Errorf("row 1 path_json = %q, want %q", pj1, `["AABB","CCDD"]`) + if pj1 != "[]" { + t.Errorf("row 1 path_json = %q, want %q (must not re-process '[]' rows after #1119)", pj1, "[]") } // Row 2 (was NULL) should now have decoded hops @@ -2567,3 +2569,152 @@ func TestBackfillPathJSONAsyncMethodExists(t *testing.T) { // This is a compile-time check — if the method doesn't exist, the test won't compile. store.BackfillPathJSONAsync() } + +// TestBackfillPathJSONAsync_BracketRowsTerminate exercises the infinite-loop bug +// from issue #1119. Observations whose path_json is already '[]' (meaning a prior +// backfill pass attempted to decode them and found no hops) must NOT be re-selected +// by the WHERE clause — otherwise the loop rewrites the same '[]' value forever +// and never records the migration marker. +// +// This test seeds N rows with path_json='[]' and a raw_hex that DecodePathFromRawHex +// resolves to zero hops. With the bug, the backfill loops infinitely re-UPDATEing +// the same rows back to '[]', batch is never empty, migration marker is never +// written. With the fix, no rows match → the very first batch is empty → migration +// is recorded immediately. 
+func TestBackfillPathJSONAsync_BracketRowsTerminate(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "bracket_terminate.db") + + // Bootstrap a minimal schema directly so we can seed pre-existing '[]' rows + // before OpenStore runs. + db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)") + if err != nil { + t.Fatal(err) + } + _, err = db.Exec(` + CREATE TABLE _migrations (name TEXT PRIMARY KEY); + CREATE TABLE transmissions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + raw_hex TEXT NOT NULL, + hash TEXT NOT NULL UNIQUE, + first_seen TEXT NOT NULL, + route_type INTEGER, + payload_type INTEGER, + payload_version INTEGER, + decoded_json TEXT, + created_at TEXT DEFAULT (datetime('now')), + channel_hash TEXT + ); + CREATE TABLE observers ( + id TEXT PRIMARY KEY, name TEXT, iata TEXT, + last_seen TEXT, first_seen TEXT, packet_count INTEGER DEFAULT 0, + model TEXT, firmware TEXT, client_version TEXT, radio TEXT, + battery_mv INTEGER, uptime_secs INTEGER, noise_floor REAL, + inactive INTEGER DEFAULT 0, last_packet_at TEXT + ); + CREATE TABLE nodes ( + public_key TEXT PRIMARY KEY, name TEXT, role TEXT, + lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, + advert_count INTEGER DEFAULT 0, battery_mv INTEGER, temperature_c REAL + ); + CREATE TABLE inactive_nodes ( + public_key TEXT PRIMARY KEY, name TEXT, role TEXT, + lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, + advert_count INTEGER DEFAULT 0, battery_mv INTEGER, temperature_c REAL + ); + CREATE TABLE observations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + transmission_id INTEGER NOT NULL REFERENCES transmissions(id), + observer_idx INTEGER, direction TEXT, + snr REAL, rssi REAL, score INTEGER, + path_json TEXT, + timestamp INTEGER NOT NULL, + raw_hex TEXT + ); + CREATE UNIQUE INDEX idx_observations_dedup ON observations(transmission_id, observer_idx, COALESCE(path_json, '')); + CREATE INDEX idx_observations_transmission_id ON 
observations(transmission_id); + CREATE INDEX idx_observations_observer_idx ON observations(observer_idx); + CREATE INDEX idx_observations_timestamp ON observations(timestamp); + CREATE TABLE observer_metrics ( + observer_id TEXT NOT NULL, timestamp TEXT NOT NULL, + noise_floor REAL, tx_air_secs INTEGER, rx_air_secs INTEGER, + recv_errors INTEGER, battery_mv INTEGER, + packets_sent INTEGER, packets_recv INTEGER, + PRIMARY KEY (observer_id, timestamp) + ); + CREATE TABLE dropped_packets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + hash TEXT, raw_hex TEXT, reason TEXT NOT NULL, + observer_id TEXT, observer_name TEXT, + node_pubkey TEXT, node_name TEXT, + dropped_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + `) + if err != nil { + t.Fatal("bootstrap schema:", err) + } + + // Mark all migrations done EXCEPT backfill_path_json_from_raw_hex_v1. + for _, m := range []string{ + "advert_count_unique_v1", "noise_floor_real_v1", "node_telemetry_v1", + "obs_timestamp_index_v1", "observer_metrics_v1", "observer_metrics_ts_idx", + "observers_inactive_v1", "observer_metrics_packets_v1", "channel_hash_v1", + "dropped_packets_v1", "observations_raw_hex_v1", "observers_last_packet_at_v1", + "cleanup_legacy_null_hash_ts", + } { + db.Exec(`INSERT INTO _migrations (name) VALUES (?)`, m) + } + + // raw_hex producing ZERO hops via DecodePathFromRawHex: + // DIRECT route (type=2), payload_type=2, version=0 → header 0x0A; path byte 0x00. + // (See internal/packetpath/path_test.go: TestDecodePathFromRawHex_ZeroHops.) 
+ rawHex := "0A00DEADBEEF" + _, err = db.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type) VALUES (?, 'h_brackets', '2025-01-01T00:00:00Z', 2)`, rawHex) + if err != nil { + t.Fatal("insert tx:", err) + } + const seedCount = 100 + for i := 0; i < seedCount; i++ { + _, err = db.Exec(`INSERT INTO observations (transmission_id, observer_idx, timestamp, raw_hex, path_json) VALUES (1, ?, ?, ?, '[]')`, + i+1, 1700000000+i, rawHex) + if err != nil { + t.Fatalf("insert obs %d: %v", i, err) + } + } + db.Close() + + store, err := OpenStoreWithInterval(dbPath, 300) + if err != nil { + t.Fatal("OpenStore:", err) + } + defer store.Close() + + // Trigger backfill. With the bug, every iteration re-fetches all 100 rows + // (because '[]' matches the WHERE), rewrites them to '[]', sleeps 50ms, repeats. + // The loop never terminates and the migration marker is never written. + store.BackfillPathJSONAsync() + + // Generous deadline: with the fix the marker is written essentially immediately. + // With the bug the marker is never written within any bounded time. + deadline := time.Now().Add(5 * time.Second) + var done int + for time.Now().Before(deadline) { + err = store.db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'backfill_path_json_from_raw_hex_v1'").Scan(&done) + if err == nil { + break + } + time.Sleep(50 * time.Millisecond) + } + if err != nil { + t.Fatalf("issue #1119: backfill never recorded migration marker within 5s — infinite loop on path_json='[]' rows") + } + + // Verify the seeded '[]' rows still have '[]' (sanity — neither bug nor fix + // should change their value), and that there are no NULL/empty path_json rows + // the backfill should have processed. + var bracketCount int + store.db.QueryRow("SELECT COUNT(*) FROM observations WHERE path_json = '[]'").Scan(&bracketCount) + if bracketCount != seedCount { + t.Errorf("expected %d rows with path_json='[]', got %d", seedCount, bracketCount) + } +}