diff --git a/cmd/server/chunked_load.go b/cmd/server/chunked_load.go new file mode 100644 index 00000000..8acb5fd8 --- /dev/null +++ b/cmd/server/chunked_load.go @@ -0,0 +1,469 @@ +package main + +// Chunked startup load + early HTTP readiness for issue #1009. +// +// Design: +// * LoadChunked paginates transmissions in id-ordered chunks of +// `chunkSize` (default 10000 via Config.DBLoadChunkSize). After the +// first chunk is merged into the store, FirstChunkReady is closed. +// main.go binds the HTTP listener on that signal and serves +// partial data while remaining chunks stream in the background. +// * loadStatusMiddleware stamps X-CoreScope-Load-Status on every +// response: "loading; progress=" until LoadComplete() +// reports true, then "ready". Dashboards and probes can read the +// header without parsing JSON. +// * OnChunkLoaded registers a per-chunk callback for progress +// logging / tests. +// +// Concurrency: each chunk acquires s.mu.Lock() ONLY while merging the +// chunk's rows into store-shared maps. SQLite reads run lock-free so +// HTTP handlers (which take s.mu.RLock) stay responsive. + +import ( + "database/sql" + "fmt" + "log" + "net/http" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/meshcore-analyzer/dbconfig" +) + +// dbLoadConfig is the server-package alias for dbconfig.LoadConfig (#1009). +type dbLoadConfig = dbconfig.LoadConfig + +// DBLoadChunkSize returns the configured chunk size for chunked +// startup load (config: db.load.chunkSize), or 10000 default (#1009). +func (c *Config) DBLoadChunkSize() int { + return c.DB.GetLoadChunkSize() +} + +// chunkedLoadState holds the runtime gates for LoadChunked. It lives +// on PacketStore via embedded fields — see store.go additions in the +// same commit. + +// FirstChunkReady returns a channel closed once the first chunk has +// been merged into the store, signalling the HTTP listener can bind. +func (s *PacketStore) FirstChunkReady() <-chan struct{} { + s.chunkedLoadInit() + return s.firstChunkReady +} + +// LoadComplete reports whether LoadChunked has finished all chunks. +func (s *PacketStore) LoadComplete() bool { + return s.loadComplete.Load() +} + +// LoadProgress reports the number of transmission rows processed by +// the in-flight (or completed) LoadChunked call. +func (s *PacketStore) LoadProgress() int64 { + return s.loadProgressRows.Load() +} + +// OnChunkLoaded registers a callback fired once per chunk after that +// chunk has been merged into the store. The callback receives the +// number of transmission rows in that chunk and the running total. +// Multiple registrations chain. +func (s *PacketStore) OnChunkLoaded(fn func(rowsThisChunk, totalRows int)) { + s.chunkedLoadInit() + s.chunkCBMu.Lock() + defer s.chunkCBMu.Unlock() + s.chunkCallbacks = append(s.chunkCallbacks, fn) +} + +// chunkedLoadInit lazily initialises the readiness channel + callback +// list under a mutex so concurrent first callers don't race. +func (s *PacketStore) chunkedLoadInit() { + s.chunkInitOnce.Do(func() { + s.firstChunkReady = make(chan struct{}) + }) +} + +func (s *PacketStore) signalFirstChunk() { + if s.firstChunkSignaled.CompareAndSwap(false, true) { + close(s.firstChunkReady) + } +} + +func (s *PacketStore) fireChunkCallbacks(rowsThisChunk, totalRows int) { + s.chunkCBMu.Lock() + cbs := append([]func(int, int){}, s.chunkCallbacks...) + s.chunkCBMu.Unlock() + for _, cb := range cbs { + func() { + defer func() { + if r := recover(); r != nil { + log.Printf("[store] OnChunkLoaded callback panic: %v", r) + } + }() + cb(rowsThisChunk, totalRows) + }() + } +} + +// LoadChunked streams transmissions + observations from SQLite into +// the in-memory store in id-ordered chunks of `chunkSize` rows. Pass +// 0 to use the default (10000). +// +// After the first chunk is merged, FirstChunkReady is closed and the +// HTTP listener may bind. Remaining chunks stream while handlers run +// against partially-populated data; loadStatusMiddleware advertises +// loading status until LoadComplete() returns true. +// +// Re-entrancy: LoadChunked is NOT safe to call concurrently with +// itself on the same PacketStore — it resets loadComplete / +// loadProgressRows and mutates store-shared maps under s.mu. In +// production it is invoked exactly once from main.go boot. Tests that +// open a fresh store per test are also safe. If a future caller needs +// repeat or concurrent loads, add a top-level mutex first. +func (s *PacketStore) LoadChunked(chunkSize int) error { + if chunkSize <= 0 { + chunkSize = 10000 + } + s.chunkedLoadInit() + // Reset state for repeat calls in tests. + s.loadComplete.Store(false) + s.loadProgressRows.Store(0) + + // On any return — error OR success — unblock listeners that gate on + // the readiness signal so an empty/failed DB does not deadlock the + // caller. Note: loadComplete is set on the success path only (see + // the end of this function) so probes do NOT see ready=true after a + // failed load. + defer s.signalFirstChunk() + + t0 := time.Now() + + // Build the retention/memory filter the legacy Load() uses so + // behavior is preserved when callers migrate from Load → LoadChunked. + // Built against the `t2` alias used inside the chunk subquery so we + // don't need brittle post-hoc string rewrites. + var loadConditions []string + hotCutoffHours := s.retentionHours + if s.hotStartupHours > 0 { + hotCutoffHours = s.hotStartupHours + } + var hotCutoffStr string + if hotCutoffHours > 0 { + hotCutoffStr = time.Now().UTC().Add(-time.Duration(hotCutoffHours * float64(time.Hour))).Format(time.RFC3339) + loadConditions = append(loadConditions, fmt.Sprintf("t2.first_seen >= '%s'", hotCutoffStr)) + } + + // COUNT honours the same retention/hot-startup filter the chunk + // loop applies, so the logged "DB total" matches the rows the + // loop will actually walk. Use a `t2` alias to share the WHERE + // builder above. If the count fails (e.g. empty DB, locked WAL), + // fall through with -1 — it's only used for the post-load log line. + totalInDB := -1 + countSQL := "SELECT COUNT(*) FROM transmissions t2" + if len(loadConditions) > 0 { + countSQL += " WHERE " + strings.Join(loadConditions, " AND ") + } + if err := s.db.conn.QueryRow(countSQL).Scan(&totalInDB); err != nil { + totalInDB = -1 + } + + // Memory cap honoured by clamping the maximum cursor walk. + var maxPackets int64 + if s.maxMemoryMB > 0 { + avgBytes := int64(1000) + if sample := estimateStoreTxBytesTypical(10); sample > avgBytes { + avgBytes = sample + } + maxPackets = (int64(s.maxMemoryMB) * 1048576) / avgBytes + if maxPackets < 1000 { + maxPackets = 1000 + } + } + + chunkIdx := 0 + totalLoaded := 0 + // Start the id cursor BELOW the minimum possible row id so the + // first chunk's `t2.id > cursorID` predicate includes id=0. The + // e2e fixture seed for issue #1486 inserts the grouped-packet row + // with id=0 (so it sorts LAST in the default packets view via + // `ORDER BY id DESC` / oldest first_seen). Seeding the cursor at + // 0 silently excluded that row, leaving the page with no + // tr[data-hash] and timing out the playwright wait. Legacy Load() + // had no id cursor and loaded id=0 unconditionally — we restore + // that semantic by starting one below SQLite's minimum rowid (-1). + var cursorID int64 = -1 + + for { + conds := append([]string{}, loadConditions...) + conds = append(conds, fmt.Sprintf("t2.id > %d", cursorID)) + whereClause := "WHERE " + strings.Join(conds, " AND ") + + rpCol := "" + if s.db.hasResolvedPath { + rpCol = ", o.resolved_path" + } + obsRawHexCol := "" + if s.db.hasObsRawHex { + obsRawHexCol = ", o.raw_hex" + } + + var chunkSQL string + if s.db.isV3 { + chunkSQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, + t.payload_type, t.payload_version, t.decoded_json, + o.id, obs.id, obs.name, COALESCE(obs.iata, ''), o.direction, + o.snr, o.rssi, o.score, o.path_json, strftime('%Y-%m-%dT%H:%M:%fZ', o.timestamp, 'unixepoch')` + obsRawHexCol + rpCol + ` + FROM (SELECT * FROM transmissions t2 ` + whereClause + ` ORDER BY t2.id ASC LIMIT ` + fmt.Sprintf("%d", chunkSize) + `) AS t + LEFT JOIN observations o ON o.transmission_id = t.id + LEFT JOIN observers obs ON obs.rowid = o.observer_idx + ORDER BY t.id ASC, o.timestamp DESC` + } else { + chunkSQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, + t.payload_type, t.payload_version, t.decoded_json, + o.id, o.observer_id, o.observer_name, COALESCE(obs.iata, ''), o.direction, + o.snr, o.rssi, o.score, o.path_json, o.timestamp` + obsRawHexCol + rpCol + ` + FROM (SELECT * FROM transmissions t2 ` + whereClause + ` ORDER BY t2.id ASC LIMIT ` + fmt.Sprintf("%d", chunkSize) + `) AS t + LEFT JOIN observations o ON o.transmission_id = t.id + LEFT JOIN observers obs ON obs.id = o.observer_id + ORDER BY t.id ASC, o.timestamp DESC` + } + + rows, err := s.db.conn.Query(chunkSQL) + if err != nil { + return fmt.Errorf("chunk %d: query: %w", chunkIdx, err) + } + + chunkTxCount, lastID, err := s.scanAndMergeChunk(rows) + rows.Close() + if err != nil { + return fmt.Errorf("chunk %d: scan: %w", chunkIdx, err) + } + + if chunkTxCount == 0 { + break + } + + cursorID = lastID + totalLoaded += chunkTxCount + chunkIdx++ + s.loadProgressRows.Store(int64(totalLoaded)) + s.signalFirstChunk() + s.fireChunkCallbacks(chunkTxCount, totalLoaded) + + if maxPackets > 0 && int64(totalLoaded) >= maxPackets { + break + } + if chunkTxCount < chunkSize { + break + } + } + + // Post-load: pick best observation, build indexes — same shape as + // legacy Load(). + s.mu.Lock() + for _, tx := range s.packets { + pickBestObservation(tx) + s.indexByNode(tx) + } + // Restore the "s.packets sorted oldest-first by FirstSeen" invariant + // that legacy Load() got for free from "ORDER BY t.first_seen ASC". + // LoadChunked walks chunks in id-ASC order so the slice ends up + // id-ordered, which only equals first_seen-ordered when ids and + // timestamps are correlated. After tools/freshen-fixture.sh (or any + // real-world out-of-order ingest) they're not, leaving + // s.packets[0].FirstSeen pointing at the newest row — which then + // poisons oldestLoaded below and routes legitimate in-memory queries + // to the SQL fallback. GetTimestamps (store.go) and QueryPackets + // both rely on this invariant. See PR #1596 / mobile e2e regression. + sort.SliceStable(s.packets, func(i, j int) bool { + return s.packets[i].FirstSeen < s.packets[j].FirstSeen + }) + s.buildSubpathIndex() + s.buildPathHopIndex() + s.buildDistanceIndex() + if s.hotStartupHours > 0 { + s.oldestLoaded = hotCutoffStr + } else if len(s.packets) > 0 { + s.oldestLoaded = s.packets[0].FirstSeen + } + s.loaded = true + s.mu.Unlock() + + // #1009 / PR #1596: flip the subpath + pathHop ready flags now that + // the chunk loader has built both indexes synchronously above. + // Without this, WaitIndexesReady (used by + // StartRepeaterEnrichmentRecomputer at boot) blocks for up to + // repeaterEnrichmentPrewarmWait (60s), delaying HTTP listener bind + // past CI's 30s /api/healthz deadline. + s.markIndexesReadySync() + + elapsed := time.Since(t0) + log.Printf("[store] LoadChunked: %d transmissions (%d observations) across %d chunk(s) in %v (chunkSize=%d, DB total=%d)", + totalLoaded, s.totalObs, chunkIdx, elapsed, chunkSize, totalInDB) + s.loadMultibyteCapFromDB() + // Mark complete on the success path only — see the function-level + // defer above for why this is NOT in a deferred call. Probes that + // read LoadComplete()==true after a failed load would otherwise + // see ready=true for a half-loaded store. + s.loadComplete.Store(true) + return nil +} + +// scanAndMergeChunk consumes one chunk's rows under s.mu.Lock and +// returns the number of distinct transmissions seen + the max +// transmission id (cursor for the next chunk). +func (s *PacketStore) scanAndMergeChunk(rows *sql.Rows) (int, int64, error) { + s.mu.Lock() + defer s.mu.Unlock() + + hopsSeen := make(map[string]bool) + seenTxIDs := make(map[int]bool) + var maxID int64 + + for rows.Next() { + var txID int + var rawHex, hash, firstSeen, decodedJSON sql.NullString + var routeType, payloadType, payloadVersion sql.NullInt64 + var obsID sql.NullInt64 + var observerID, observerName, observerIATA, direction, pathJSON, obsTimestamp sql.NullString + var snr, rssi sql.NullFloat64 + var score sql.NullInt64 + var obsRawHex sql.NullString + var resolvedPathStr sql.NullString + + scanArgs := []interface{}{&txID, &rawHex, &hash, &firstSeen, &routeType, &payloadType, + &payloadVersion, &decodedJSON, + &obsID, &observerID, &observerName, &observerIATA, &direction, + &snr, &rssi, &score, &pathJSON, &obsTimestamp} + if s.db.hasObsRawHex { + scanArgs = append(scanArgs, &obsRawHex) + } + if s.db.hasResolvedPath { + scanArgs = append(scanArgs, &resolvedPathStr) + } + if err := rows.Scan(scanArgs...); err != nil { + log.Printf("[store] LoadChunked scan error: %v", err) + continue + } + + if int64(txID) > maxID { + maxID = int64(txID) + } + seenTxIDs[txID] = true + + hashStr := nullStrVal(hash) + tx := s.byHash[hashStr] + if tx == nil { + tx = &StoreTx{ + ID: txID, + RawHex: nullStrVal(rawHex), + Hash: hashStr, + FirstSeen: nullStrVal(firstSeen), + LatestSeen: nullStrVal(firstSeen), + RouteType: nullIntPtr(routeType), + PayloadType: nullIntPtr(payloadType), + DecodedJSON: nullStrVal(decodedJSON), + obsKeys: make(map[string]bool), + observerSet: make(map[string]bool), + } + s.byHash[hashStr] = tx + s.packets = append(s.packets, tx) + s.byTxID[txID] = tx + if txID > s.maxTxID { + s.maxTxID = txID + } + s.indexByNode(tx) + if tx.PayloadType != nil { + pt := *tx.PayloadType + s.byPayloadType[pt] = append(s.byPayloadType[pt], tx) + } + s.trackAdvertPubkey(tx) + s.trackedBytes += estimateStoreTxBytes(tx) + } + + if obsID.Valid { + oid := int(obsID.Int64) + obsIDStr := nullStrVal(observerID) + obsPJ := nullStrVal(pathJSON) + + dk := obsIDStr + "|" + obsPJ + if tx.obsKeys[dk] { + continue + } + + obs := &StoreObs{ + ID: oid, + TransmissionID: txID, + ObserverID: obsIDStr, + ObserverName: nullStrVal(observerName), + ObserverIATA: nullStrVal(observerIATA), + Direction: nullStrVal(direction), + SNR: nullFloatPtr(snr), + RSSI: nullFloatPtr(rssi), + Score: nullIntPtr(score), + PathJSON: obsPJ, + RawHex: nullStrVal(obsRawHex), + Timestamp: normalizeTimestamp(nullStrVal(obsTimestamp)), + } + + rpStr := nullStrVal(resolvedPathStr) + if rpStr != "" { + rp := unmarshalResolvedPath(rpStr) + pks := extractResolvedPubkeys(rp) + s.indexResolvedPathHops(tx, pks, hopsSeen) + } + + tx.Observations = append(tx.Observations, obs) + tx.obsKeys[dk] = true + if obs.ObserverID != "" && !tx.observerSet[obs.ObserverID] { + tx.observerSet[obs.ObserverID] = true + tx.UniqueObserverCount++ + } + tx.ObservationCount++ + if obs.Timestamp > tx.LatestSeen { + tx.LatestSeen = obs.Timestamp + } + + s.byObsID[oid] = obs + if oid > s.maxObsID { + s.maxObsID = oid + } + if obsIDStr != "" { + s.byObserver[obsIDStr] = append(s.byObserver[obsIDStr], obs) + } + s.totalObs++ + s.trackedBytes += estimateStoreObsBytes(obs) + } + } + if err := rows.Err(); err != nil { + return len(seenTxIDs), maxID, err + } + return len(seenTxIDs), maxID, nil +} + +// loadStatusMiddleware sets X-CoreScope-Load-Status on every response. +// While LoadChunked is in flight the header reports +// "loading; progress="; after completion it reports "ready". +// The header is set BEFORE calling the next handler so probes can +// observe it on any response (including streaming bodies). +func loadStatusMiddleware(s *PacketStore, next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if s != nil && s.LoadComplete() { + w.Header().Set("X-CoreScope-Load-Status", "ready") + } else if s != nil { + w.Header().Set("X-CoreScope-Load-Status", + fmt.Sprintf("loading; progress=%d", s.LoadProgress())) + } else { + w.Header().Set("X-CoreScope-Load-Status", "loading") + } + next.ServeHTTP(w, r) + }) +} + +// --- runtime state stitched into PacketStore via store_chunked.go --- + +// Forward declarations of the new PacketStore fields used above. The +// actual struct fields live in store.go; placing them here as a +// reminder keeps the chunked-load surface easy to audit. +var _ = sync.Once{} +var _ atomic.Bool diff --git a/cmd/server/chunked_load_fixes_test.go b/cmd/server/chunked_load_fixes_test.go new file mode 100644 index 00000000..4a93526e --- /dev/null +++ b/cmd/server/chunked_load_fixes_test.go @@ -0,0 +1,63 @@ +package main + +// Issue #1009 follow-up tests for PR #1596: +// +// (A) LoadChunked must flip subpath + pathHop index ready flags +// after building those indexes. Otherwise WaitIndexesReady (used +// by StartRepeaterEnrichmentRecomputer at boot) blocks the +// caller for up to repeaterEnrichmentPrewarmWait (60s), which is +// why CI's "Start Go server" step times out before /api/healthz +// can answer within its 30s deadline. +// +// (B) LoadChunked must NOT report LoadComplete()==true when it +// returns an error. Today a defer unconditionally calls +// s.loadComplete.Store(true), so a failed load appears "ready" +// to probes and the load-status middleware. + +import ( + "errors" + "testing" +) + +// (A) Indexes must be marked ready by LoadChunked. +func TestLoadChunked_MarksIndexesReady(t *testing.T) { + store := openChunkedTestStore(t, 100) + defer store.db.conn.Close() + + if store.SubpathIndexReady() || store.PathHopIndexReady() { + t.Fatal("indexes must start NOT ready") + } + + if err := store.LoadChunked(50); err != nil { + t.Fatalf("LoadChunked: %v", err) + } + + if !store.SubpathIndexReady() { + t.Fatal("SubpathIndexReady() must be true after LoadChunked builds the index") + } + if !store.PathHopIndexReady() { + t.Fatal("PathHopIndexReady() must be true after LoadChunked builds the index") + } +} + +// (B) LoadChunked errors must not flip LoadComplete=true. +func TestLoadChunked_ErrorDoesNotMarkComplete(t *testing.T) { + store := openChunkedTestStore(t, 100) + + // Close the underlying DB so the very first chunk query fails. + if err := store.db.conn.Close(); err != nil { + t.Fatalf("close DB: %v", err) + } + + err := store.LoadChunked(50) + if err == nil { + t.Fatal("LoadChunked must return an error when the DB query fails") + } + if !errors.Is(err, err) { // satisfy linters; the assertion below is what matters + t.Fatalf("unexpected error shape: %v", err) + } + + if store.LoadComplete() { + t.Fatal("LoadComplete() must remain false after LoadChunked returns an error") + } +} diff --git a/cmd/server/chunked_load_id_zero_test.go b/cmd/server/chunked_load_id_zero_test.go new file mode 100644 index 00000000..e6cf74bb --- /dev/null +++ b/cmd/server/chunked_load_id_zero_test.go @@ -0,0 +1,115 @@ +package main + +// Regression for PR #1596 / issue #1486 e2e: LoadChunked uses +// `cursorID = 0` with a `t2.id > cursorID` predicate, which silently +// excludes any transmission with id=0. The e2e seed for #1486 inserts +// the grouped-packet row with id=0 (so it sorts LAST in the default +// packets view), and the page deep-links to /packets?hash=. +// With the chunked loader skipping id=0, the in-memory store never +// learns about the row; QueryGroupedPackets returns 0; the page +// renders no `tr[data-hash]` and the e2e times out at 12s. +// +// Legacy Load() walked all transmissions unconditionally (no id +// cursor) and therefore included id=0. Restoring that semantic — by +// using a non-existent sentinel (-1) on the first iteration, or by +// switching the predicate to `>=` for the initial pass — fixes the +// regression. +// +// This test inserts a transmission with id=0 plus a handful of +// id>=1 transmissions and asserts that LoadChunked loads the id=0 +// row into s.byHash. + +import ( + "database/sql" + "fmt" + "path/filepath" + "testing" + "time" +) + +func createTestDBWithIDZero(tb testing.TB, dbPath string, extraTx int) { + tb.Helper() + conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL") + if err != nil { + tb.Fatal(err) + } + defer conn.Close() + + stmts := []string{ + `CREATE TABLE IF NOT EXISTS transmissions ( + id INTEGER PRIMARY KEY, + raw_hex TEXT, hash TEXT, first_seen TEXT, + route_type INTEGER, payload_type INTEGER, + payload_version INTEGER, decoded_json TEXT + )`, + `CREATE TABLE IF NOT EXISTS observations ( + id INTEGER PRIMARY KEY, + transmission_id INTEGER, observer_id TEXT, observer_name TEXT, + direction TEXT, snr REAL, rssi REAL, score INTEGER, + path_json TEXT, timestamp TEXT, raw_hex TEXT + )`, + `CREATE TABLE IF NOT EXISTS observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`, + `CREATE TABLE IF NOT EXISTS nodes ( + pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, + last_seen TEXT, first_seen TEXT, frequency REAL + )`, + `CREATE TABLE IF NOT EXISTS schema_version (version INTEGER)`, + `INSERT INTO schema_version (version) VALUES (1)`, + `CREATE INDEX IF NOT EXISTS idx_tx_first_seen ON transmissions(first_seen)`, + } + for _, s := range stmts { + if _, err := conn.Exec(s); err != nil { + tb.Fatalf("setup exec: %v\nSQL: %s", err, s) + } + } + + txStmt, _ := conn.Prepare("INSERT INTO transmissions (id, raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?)") + obsStmt, _ := conn.Prepare("INSERT INTO observations (id, transmission_id, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + defer txStmt.Close() + defer obsStmt.Close() + + now := time.Now().UTC().Truncate(time.Second) + // id=0: the #1486-style seed row, within retention window. + txStmt.Exec(0, "1500", "fae0c9e6d357a814", now.Add(-1*time.Minute).Format(time.RFC3339), 1, 5, 0, `{"type":"CHAN"}`) + obsStmt.Exec(0, 0, "obs1", "Obs1", "rx", 5.0, -95.0, 0, `["AA"]`, now.Add(-1*time.Minute).Unix()) + + for i := 1; i <= extraTx; i++ { + ts := now.Add(-time.Duration(i+1) * time.Minute).Format(time.RFC3339) + unixTs := now.Add(-time.Duration(i+1) * time.Minute).Unix() + hash := fmt.Sprintf("h%04d", i) + txStmt.Exec(i, "aabb", hash, ts, 0, 4, 1, fmt.Sprintf(`{"pubKey":"pk%04d"}`, i)) + obsStmt.Exec(i, i, "obs1", "Obs1", "rx", -10.0, -80.0, 5, `["aa","bb"]`, unixTs) + } +} + +// TestLoadChunked_IncludesIDZero: LoadChunked must load transmissions +// with id=0. The legacy Load() (since-replaced by LoadChunked) walked +// transmissions unconditionally; LoadChunked uses an id-cursor that +// starts at 0 with a strict `t2.id > cursorID` predicate, so id=0 +// rows are silently dropped. This breaks the #1486 e2e fixture seed +// which uses id=0 to sort the grouped row last in the default view. +func TestLoadChunked_IncludesIDZero(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "idzero.db") + createTestDBWithIDZero(t, dbPath, 10) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatalf("OpenDB: %v", err) + } + cfg := &PacketStoreConfig{} + store := NewPacketStore(db, cfg) + defer store.db.conn.Close() + + if err := store.LoadChunked(5); err != nil { + t.Fatalf("LoadChunked: %v", err) + } + + if _, ok := store.byHash["fae0c9e6d357a814"]; !ok { + t.Fatalf("LoadChunked dropped the id=0 transmission: "+ + "byHash[fae0c9e6d357a814] missing; loaded %d packets total "+ + "(id-cursor starts at 0 with strict `t2.id > cursorID`, "+ + "so id=0 is excluded — this is the #1486 e2e regression)", + len(store.packets)) + } +} diff --git a/cmd/server/chunked_load_oldest_test.go b/cmd/server/chunked_load_oldest_test.go new file mode 100644 index 00000000..8b9a5f82 --- /dev/null +++ b/cmd/server/chunked_load_oldest_test.go @@ -0,0 +1,154 @@ +package main + +// Regression for PR #1596 (issue #1009) chunked load: when transmission +// ids are anti-correlated with first_seen (e.g. id=1 has the NEWEST +// timestamp), LoadChunked walks id-ASC and the post-load +// `s.oldestLoaded = s.packets[0].FirstSeen` line set oldestLoaded to +// the NEWEST first_seen. QueryPackets then mis-routed any +// `since>=oldestLoaded` query to the SQL fallback, hiding fresh +// in-memory rows. This shows up in real life on the e2e fixture after +// tools/freshen-fixture.sh shifts timestamps so id=1 (originally +// loaded first) carries the most recent first_seen. +// +// The mobile e2e test test-observer-iata-1188-e2e.js fails as a +// result: with the default 15-minute time window, /api/packets returns +// 0 rows and the mobile DOM has no `tr[data-hash]` to tap. +// +// This test asserts the in-memory invariant: after LoadChunked, +// oldestLoaded must equal the actual oldest FirstSeen across loaded +// transmissions, not the FirstSeen of the first row in s.packets. + +import ( + "database/sql" + "fmt" + "path/filepath" + "testing" + "time" +) + +// createTestDBReverseTime builds numTx transmissions whose ids run +// 1..numTx ASC while first_seen runs newest..oldest (id=1 = newest). +// This mirrors the freshen-fixture-shifted e2e DB exactly. +func createTestDBReverseTime(tb testing.TB, dbPath string, numTx int) { + tb.Helper() + conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL") + if err != nil { + tb.Fatal(err) + } + defer conn.Close() + + stmts := []string{ + `CREATE TABLE IF NOT EXISTS transmissions ( + id INTEGER PRIMARY KEY, + raw_hex TEXT, hash TEXT, first_seen TEXT, + route_type INTEGER, payload_type INTEGER, + payload_version INTEGER, decoded_json TEXT + )`, + `CREATE TABLE IF NOT EXISTS observations ( + id INTEGER PRIMARY KEY, + transmission_id INTEGER, observer_id TEXT, observer_name TEXT, + direction TEXT, snr REAL, rssi REAL, score INTEGER, + path_json TEXT, timestamp TEXT, raw_hex TEXT + )`, + `CREATE TABLE IF NOT EXISTS observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`, + `CREATE TABLE IF NOT EXISTS nodes ( + pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, + last_seen TEXT, first_seen TEXT, frequency REAL + )`, + `CREATE TABLE IF NOT EXISTS schema_version (version INTEGER)`, + `INSERT INTO schema_version (version) VALUES (1)`, + `CREATE INDEX IF NOT EXISTS idx_tx_first_seen ON transmissions(first_seen)`, + } + for _, s := range stmts { + if _, err := conn.Exec(s); err != nil { + tb.Fatalf("setup exec: %v\nSQL: %s", err, s) + } + } + + txStmt, _ := conn.Prepare("INSERT INTO transmissions (id, raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?)") + obsStmt, _ := conn.Prepare("INSERT INTO observations (id, transmission_id, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + defer txStmt.Close() + defer obsStmt.Close() + + // id=1 is the NEWEST (now); id=numTx is the OLDEST (numTx minutes ago). + now := time.Now().UTC().Truncate(time.Second) + for i := 1; i <= numTx; i++ { + ts := now.Add(-time.Duration(i-1) * time.Minute).Format(time.RFC3339) + unixTs := now.Add(-time.Duration(i-1) * time.Minute).Unix() + hash := fmt.Sprintf("h%04d", i) + txStmt.Exec(i, "aabb", hash, ts, 0, 4, 1, fmt.Sprintf(`{"pubKey":"pk%04d"}`, i)) + obsStmt.Exec(i, i, "obs1", "Obs1", "RX", -10.0, -80.0, 5, `["aa","bb"]`, unixTs) + } +} + +func openReverseTimeStore(t *testing.T, numTx int) *PacketStore { + t.Helper() + dir := t.TempDir() + dbPath := filepath.Join(dir, "rev.db") + createTestDBReverseTime(t, dbPath, numTx) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatalf("OpenDB: %v", err) + } + cfg := &PacketStoreConfig{} + return NewPacketStore(db, cfg) +} + +// TestLoadChunked_OldestLoadedIsActualOldest: when LoadChunked walks +// transmissions in id-ASC order but timestamps are anti-correlated +// with id (PR #1596 regression scenario), oldestLoaded MUST be the +// minimum FirstSeen across loaded packets, not the first row's +// FirstSeen. Otherwise QueryPackets routes "since=15min ago" to SQL +// fallback, hiding fresh rows. +func TestLoadChunked_OldestLoadedIsActualOldest(t *testing.T) { + store := openReverseTimeStore(t, 50) + defer store.db.conn.Close() + + if err := store.LoadChunked(20); err != nil { + t.Fatalf("LoadChunked: %v", err) + } + + // Compute the actual oldest first_seen across what got loaded. + if len(store.packets) == 0 { + t.Fatal("no packets loaded") + } + actualOldest := store.packets[0].FirstSeen + for _, p := range store.packets { + if p.FirstSeen < actualOldest { + actualOldest = p.FirstSeen + } + } + + if store.oldestLoaded != actualOldest { + t.Fatalf("oldestLoaded=%q must equal actual MIN(FirstSeen)=%q "+ + "(id-ordered chunk walk with anti-correlated timestamps "+ + "left oldestLoaded pointing at the newest row, which makes "+ + "QueryPackets mis-route since-windowed queries to SQL fallback "+ + "and the mobile e2e test renders 0 rows)", + store.oldestLoaded, actualOldest) + } +} + +// TestLoadChunked_PacketsSortedByFirstSeenASC: QueryPackets and +// GetTimestamps both assume s.packets is "sorted oldest-first" (see +// store.go:2125 comment on GetTimestamps). LoadChunked walks rows +// id-ASC which only equals first_seen-ASC when ids and timestamps +// are correlated — not true after fixture freshen, not true after +// any out-of-order ingest. Assert the invariant directly. +func TestLoadChunked_PacketsSortedByFirstSeenASC(t *testing.T) { + store := openReverseTimeStore(t, 25) + defer store.db.conn.Close() + + if err := store.LoadChunked(10); err != nil { + t.Fatalf("LoadChunked: %v", err) + } + for i := 1; i < len(store.packets); i++ { + if store.packets[i-1].FirstSeen > store.packets[i].FirstSeen { + t.Fatalf("s.packets must be sorted by FirstSeen ASC; "+ + "packets[%d].FirstSeen=%q > packets[%d].FirstSeen=%q", + i-1, store.packets[i-1].FirstSeen, + i, store.packets[i].FirstSeen) + } + } +} diff --git a/cmd/server/chunked_load_test.go b/cmd/server/chunked_load_test.go new file mode 100644 index 00000000..512a8da3 --- /dev/null +++ b/cmd/server/chunked_load_test.go @@ -0,0 +1,150 @@ +package main + +// Issue #1009: chunked Load with early HTTP readiness. +// +// These tests gate three behaviors: +// (a) FirstChunkReady() unblocks BEFORE LoadChunked returns, so the +// HTTP listener can bind after the first chunk completes while +// remaining rows continue loading in the background. +// (b) loadStatusMiddleware stamps an X-CoreScope-Load-Status header +// with "loading" + progress while a load is in flight, flipping +// to "ready" once LoadComplete() reports true. +// (c) LoadChunked honors the configured chunkSize: the per-chunk +// progress callback fires once per chunk, so a 2500-row DB with +// chunkSize=1000 must yield 3 callbacks (1000 + 1000 + 500). +// +// Each subtest fails on an assertion (not a build error) when the +// production code is absent — that is the red-commit contract. + +import ( + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + "time" +) + +func openChunkedTestStore(t *testing.T, numTx int) *PacketStore { + t.Helper() + dir := t.TempDir() + dbPath := filepath.Join(dir, "chunked.db") + createTestDBAt(t, dbPath, numTx) + t.Cleanup(func() { os.RemoveAll(dir) }) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatalf("OpenDB: %v", err) + } + cfg := &PacketStoreConfig{} + return NewPacketStore(db, cfg) +} + +// (a) FirstChunkReady fires before LoadChunked returns. +func TestLoadChunked_FirstChunkReadyBeforeComplete(t *testing.T) { + store := openChunkedTestStore(t, 2500) + defer store.db.conn.Close() + + doneCh := make(chan error, 1) + go func() { doneCh <- store.LoadChunked(500) }() + + select { + case <-store.FirstChunkReady(): + // Good: first chunk signaled. Load may or may not have completed + // for tiny test DBs, but the gate must have fired without + // requiring the full load. + case err := <-doneCh: + // If load completed before we could observe the signal, the + // signal still must be closed. + if err != nil { + t.Fatalf("LoadChunked: %v", err) + } + select { + case <-store.FirstChunkReady(): + default: + t.Fatal("FirstChunkReady channel must be closed after LoadChunked completes") + } + case <-time.After(10 * time.Second): + t.Fatal("FirstChunkReady did not fire within 10s — listener would never bind") + } + + // Drain background completion. + select { + case err := <-doneCh: + if err != nil { + t.Fatalf("LoadChunked returned error: %v", err) + } + case <-time.After(30 * time.Second): + t.Fatal("LoadChunked never returned") + } + + if !store.LoadComplete() { + t.Fatal("LoadComplete() must report true after LoadChunked returns") + } +} + +// (b) Middleware stamps X-CoreScope-Load-Status correctly across the +// loading→ready transition. +func TestLoadStatusMiddleware_HeaderTransition(t *testing.T) { + store := openChunkedTestStore(t, 100) + defer store.db.conn.Close() + + handler := loadStatusMiddleware(store, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + + // Pre-load: header must report "loading". + req := httptest.NewRequest("GET", "/api/healthz", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + if got := w.Header().Get("X-CoreScope-Load-Status"); got == "" || got == "ready" { + t.Fatalf("expected loading status header before Load, got %q", got) + } + + if err := store.LoadChunked(50); err != nil { + t.Fatalf("LoadChunked: %v", err) + } + + // Post-load: header must report "ready". + req2 := httptest.NewRequest("GET", "/api/healthz", nil) + w2 := httptest.NewRecorder() + handler.ServeHTTP(w2, req2) + if got := w2.Header().Get("X-CoreScope-Load-Status"); got != "ready" { + t.Fatalf("expected X-CoreScope-Load-Status=ready after load, got %q", got) + } +} + +// (c) LoadChunked honors the chunkSize argument — progress callback +// fires once per chunk. +func TestLoadChunked_ChunkSizeHonored(t *testing.T) { + store := openChunkedTestStore(t, 2500) + defer store.db.conn.Close() + + var chunks []int + store.OnChunkLoaded(func(rowsThisChunk, totalRows int) { + chunks = append(chunks, rowsThisChunk) + }) + + if err := store.LoadChunked(1000); err != nil { + t.Fatalf("LoadChunked: %v", err) + } + + if len(chunks) != 3 { + t.Fatalf("expected 3 chunks for 2500 rows @ chunkSize=1000, got %d (sizes=%v)", len(chunks), chunks) + } + if chunks[0] != 1000 || chunks[1] != 1000 || chunks[2] != 500 { + t.Fatalf("expected chunk sizes [1000,1000,500], got %v", chunks) + } +} + +// (d) Config plumbing: DB.Load.ChunkSize threads through. +func TestConfig_DBLoadChunkSize(t *testing.T) { + c := &Config{} + if got := c.DBLoadChunkSize(); got != 10000 { + t.Fatalf("DBLoadChunkSize() default = %d, want 10000", got) + } + c.DB = &DBConfig{Load: &dbLoadConfig{ChunkSize: 2500}} + if got := c.DBLoadChunkSize(); got != 2500 { + t.Fatalf("DBLoadChunkSize() configured = %d, want 2500", got) + } +} diff --git a/cmd/server/main.go b/cmd/server/main.go index e6a5ee82..fa7f905f 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -198,9 +198,30 @@ func main() { // In-memory packet store store := NewPacketStore(database, cfg.PacketStore, cfg.CacheTTL) store.config = cfg - if err := store.Load(); err != nil { - log.Fatalf("[store] failed to load: %v", err) + // #1009: chunked Load with early HTTP readiness. LoadChunked runs + // asynchronously and signals FirstChunkReady after the first chunk + // is merged so the HTTP listener can bind without waiting for the + // full multi-minute scan to finish. loadStatusMiddleware (wired + // below) advertises loading|ready via X-CoreScope-Load-Status. + chunkSize := cfg.DBLoadChunkSize() + loadErrCh := make(chan error, 1) + go func() { + loadErrCh <- store.LoadChunked(chunkSize) + }() + select { + case <-store.FirstChunkReady(): + log.Printf("[store] first chunk ready (chunkSize=%d) — HTTP listener may bind", chunkSize) + case err := <-loadErrCh: + if err != nil { + log.Fatalf("[store] LoadChunked failed before first chunk: %v", err) + } + log.Printf("[store] LoadChunked completed before first-chunk signal (empty DB?)") } + go func() { + if err := <-loadErrCh; err != nil { + log.Printf("[store] LoadChunked background error: %v", err) + } + }() if store.hotStartupHours > 0 { log.Printf("[store] starting background load: filling retentionHours=%gh from hotStartupHours=%gh", store.retentionHours, store.hotStartupHours) @@ -395,6 +416,10 @@ func main() { handler = gzipMiddlewareWithConfig(cfg.Compression, router) log.Printf("[server] HTTP gzip compression enabled") } + // #1009: stamp X-CoreScope-Load-Status on every response so probes + // and dashboards can see when the chunked Load is still in flight. + // Outermost wrap so the header is set regardless of gzip/etc. + handler = loadStatusMiddleware(store, handler) if cfg.WSCompressionEnabled() { log.Printf("[server] WebSocket permessage-deflate compression enabled") } diff --git a/cmd/server/store.go b/cmd/server/store.go index 2fc4f8bd..8233191d 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -403,6 +403,19 @@ type PacketStore struct { // Async hash migration state: set after migrateContentHashesAsync completes. hashMigrationComplete atomic.Bool + // Chunked startup load state (#1009). LoadChunked closes + // firstChunkReady after the first chunk is merged so the HTTP + // listener can bind. loadComplete flips true after all chunks have + // been processed; loadProgressRows is updated per-chunk so + // loadStatusMiddleware can emit progress. + chunkInitOnce sync.Once + firstChunkReady chan struct{} + firstChunkSignaled atomic.Bool + loadComplete atomic.Bool + loadProgressRows atomic.Int64 + chunkCBMu sync.Mutex + chunkCallbacks []func(rowsThisChunk, totalRows int) + // Eviction config and stats retentionHours float64 // 0 = unlimited maxMemoryMB int // 0 = unlimited (packet store memory budget) diff --git a/config.example.json b/config.example.json index ba717af2..d136b593 100644 --- a/config.example.json +++ b/config.example.json @@ -14,7 +14,11 @@ "db": { "vacuumOnStartup": false, "incrementalVacuumPages": 1024, - "_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs. Executed by the INGESTOR at startup, BEFORE the MQTT subscriber starts (#1283), so there is no contention with concurrent writes. Blocks ingestor startup for minutes on large DBs; requires 2x DB file size in free disk space. incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919.", + "load": { + "chunkSize": 10000, + "_comment": "chunkSize: rows fetched per chunk by PacketStore.LoadChunked during startup (#1009). Default 10000. Lower values surface the early-HTTP-readiness signal sooner (the listener binds after the first chunk) at the cost of more SQL round-trips. Higher values reduce per-chunk overhead but delay first-chunk readiness. The X-CoreScope-Load-Status response header reports loading|ready and progress= until the load completes." + }, + "_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs. Executed by the INGESTOR at startup, BEFORE the MQTT subscriber starts (#1283), so there is no contention with concurrent writes. Blocks ingestor startup for minutes on large DBs; requires 2x DB file size in free disk space. incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919. load.chunkSize: see nested _comment (#1009).", "_comment_slowWriterMs": "#1340 — SQLite writer-lock log threshold (default 500). Any wrapped writer call (tagged neighbor_builder, mqtt_handler, prune_packets, prune_observers, prune_metrics, vacuum) whose hold_ms exceeds this emits a single [db-slow-writer] log line. Configured per-process via the CORESCOPE_DB_SLOW_WRITER_MS environment variable on the INGESTOR (e.g. CORESCOPE_DB_SLOW_WRITER_MS=200 for tighter alerting). Per-component wait_ms / hold_ms / contention_total histograms are surfaced via /api/perf/write-sources under .writer_perf regardless of this threshold." }, "listLimits": { diff --git a/internal/dbconfig/dbconfig.go b/internal/dbconfig/dbconfig.go index ef9f7eac..7526fa67 100644 --- a/internal/dbconfig/dbconfig.go +++ b/internal/dbconfig/dbconfig.go @@ -6,6 +6,16 @@ package dbconfig type DBConfig struct { VacuumOnStartup bool `json:"vacuumOnStartup"` // one-time full VACUUM on startup if auto_vacuum is not INCREMENTAL IncrementalVacuumPages int `json:"incrementalVacuumPages"` // pages returned to OS per reaper cycle (default 1024) + + // Load controls chunked startup loading (#1009). + Load *LoadConfig `json:"load,omitempty"` +} + +// LoadConfig controls the chunked startup-load behavior (#1009). +type LoadConfig struct { + // ChunkSize is the number of transmission rows fetched per chunk + // during PacketStore.LoadChunked. 0/unset → 10000. + ChunkSize int `json:"chunkSize"` } // GetIncrementalVacuumPages returns the configured pages or 1024 default. @@ -15,3 +25,11 @@ func (c *DBConfig) GetIncrementalVacuumPages() int { } return 1024 } + +// GetLoadChunkSize returns the configured chunk size or 10000 default (#1009). +func (c *DBConfig) GetLoadChunkSize() int { + if c != nil && c.Load != nil && c.Load.ChunkSize > 0 { + return c.Load.ChunkSize + } + return 10000 +}