From 352967ca37fd5e2a1122dd652fa1e9043fa0a90a Mon Sep 17 00:00:00 2001 From: you Date: Sat, 4 Apr 2026 05:09:37 +0000 Subject: [PATCH] fix: address all 6 items from independent re-review 1. Hoist getCachedNodesAndPM() before observation loop in IngestNewObservations 2. Include resolved_path in IngestNewObservations broadcast maps 3. Persist resolved paths and edges to SQLite in IngestNewObservations 4. Add error logging for stmt.Exec in backfillResolvedPaths 5. Add error logging for stmt.Exec in buildAndPersistEdges 6. Remove tautological TestNeighborPersistFileCompiles test --- cmd/server/neighbor_persist.go | 16 ++++- cmd/server/neighbor_persist_test.go | 14 ----- cmd/server/store.go | 98 ++++++++++++++++++++++++++++- 3 files changed, 111 insertions(+), 17 deletions(-) diff --git a/cmd/server/neighbor_persist.go b/cmd/server/neighbor_persist.go index fcb13253..c77ce554 100644 --- a/cmd/server/neighbor_persist.go +++ b/cmd/server/neighbor_persist.go @@ -144,14 +144,20 @@ func buildAndPersistEdges(store *PacketStore, rw *sql.DB) int { defer stmt.Close() edgeCount := 0 + var firstErr error for _, pkt := range packets { for _, obs := range pkt.Observations { for _, ec := range extractEdgesFromObs(obs, pkt, pm) { - stmt.Exec(ec.A, ec.B, ec.Timestamp) + if _, err := stmt.Exec(ec.A, ec.B, ec.Timestamp); err != nil && firstErr == nil { + firstErr = err + } edgeCount++ } } } + if firstErr != nil { + log.Printf("[neighbor] edge exec error (first): %v", firstErr) + } if err := tx.Commit(); err != nil { log.Printf("[neighbor] commit error: %v", err) @@ -337,8 +343,14 @@ func backfillResolvedPaths(store *PacketStore, dbPath string) int { } defer stmt.Close() + var firstErr error for _, r := range results { - stmt.Exec(r.rpJSON, r.obsID) + if _, err := stmt.Exec(r.rpJSON, r.obsID); err != nil && firstErr == nil { + firstErr = err + } + } + if firstErr != nil { + log.Printf("[store] backfill resolved_path exec error (first): %v", firstErr) } if err := sqlTx.Commit(); err != nil { diff --git a/cmd/server/neighbor_persist_test.go b/cmd/server/neighbor_persist_test.go index d3c72994..5f9bf13e 100644 --- a/cmd/server/neighbor_persist_test.go +++ b/cmd/server/neighbor_persist_test.go @@ -524,18 +524,4 @@ func TestExtractEdgesFromObs_SameNodeNoEdge(t *testing.T) { } } -func TestNeighborPersistFileCompiles(t *testing.T) { - _ = openRW - _ = ensureNeighborEdgesTable - _ = ensureResolvedPathColumn - _ = loadNeighborEdgesFromDB - _ = persistEdge - _ = resolvePathForObs - _ = marshalResolvedPath - _ = unmarshalResolvedPath - _ = backfillResolvedPaths - _ = buildAndPersistEdges - _ = neighborEdgesTableExists - _ = extractEdgesFromObs -} diff --git a/cmd/server/store.go b/cmd/server/store.go index 5d5e4ad5..5015c540 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -1490,6 +1490,10 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] updatedTxs := make(map[int]*StoreTx) broadcastMaps := make([]map[string]interface{}, 0, len(obsRows)) + // Hoist getCachedNodesAndPM() before the loop — same pattern as IngestNewFromDB (review fix #1). + _, pm := s.getCachedNodesAndPM() + graphRef := s.graph + for _, r := range obsRows { // Already ingested (e.g. by IngestNewFromDB in same cycle) if _, exists := s.byObsID[r.obsID]; exists { @@ -1525,7 +1529,6 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] // Resolve path at ingest time for late-arriving observations (review item #2). if r.pathJSON != "" && r.pathJSON != "[]" { - _, pm := s.getCachedNodesAndPM() if pm != nil { obs.ResolvedPath = resolvePathForObs(r.pathJSON, r.observerID, tx, pm, s.graph) } @@ -1573,6 +1576,9 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] "direction": strOrNil(obs.Direction), "observation_count": tx.ObservationCount, } + if obs.ResolvedPath != nil { + pkt["resolved_path"] = obs.ResolvedPath + } broadcastMap := make(map[string]interface{}, len(pkt)+2) for k, v := range pkt { broadcastMap[k] = v @@ -1637,6 +1643,96 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] }) } + // Persist resolved paths and neighbor edges asynchronously (review fix #3). + // Same pattern as IngestNewFromDB — late observations need persistence too. + if len(updatedTxs) > 0 && s.db != nil { + dbPath := s.db.path + type persistObs struct { + obsID int + resolvedPath string + } + type persistEdge struct { + a, b, ts string + } + var obsUpdates []persistObs + var edgeUpdates []persistEdge + + for _, tx := range updatedTxs { + for _, obs := range tx.Observations { + if obs.ResolvedPath != nil { + rpJSON := marshalResolvedPath(obs.ResolvedPath) + if rpJSON != "" { + obsUpdates = append(obsUpdates, persistObs{obs.ID, rpJSON}) + } + } + for _, ec := range extractEdgesFromObs(obs, tx, pm) { + edgeUpdates = append(edgeUpdates, persistEdge{ec.A, ec.B, ec.Timestamp}) + if graphRef != nil { + graphRef.upsertEdge(ec.A, ec.B, "", obs.ObserverID, obs.SNR, parseTimestamp(ec.Timestamp)) + } + } + } + } + + if len(obsUpdates) > 0 || len(edgeUpdates) > 0 { + go func() { + rw, err := openRW(dbPath) + if err != nil { + log.Printf("[store] obs-persist rw open error: %v", err) + return + } + defer rw.Close() + + if len(obsUpdates) > 0 { + sqlTx, err := rw.Begin() + if err == nil { + stmt, err := sqlTx.Prepare("UPDATE observations SET resolved_path = ? WHERE id = ?") + if err == nil { + var firstErr error + for _, u := range obsUpdates { + if _, err := stmt.Exec(u.resolvedPath, u.obsID); err != nil && firstErr == nil { + firstErr = err + } + } + stmt.Close() + if firstErr != nil { + log.Printf("[store] obs-persist resolved_path error (first): %v", firstErr) + } + } else { + log.Printf("[store] obs-persist resolved_path prepare error: %v", err) + } + sqlTx.Commit() + } + } + + if len(edgeUpdates) > 0 { + sqlTx, err := rw.Begin() + if err == nil { + stmt, err := sqlTx.Prepare(`INSERT INTO neighbor_edges (node_a, node_b, count, last_seen) + VALUES (?, ?, 1, ?) + ON CONFLICT(node_a, node_b) DO UPDATE SET + count = count + 1, last_seen = MAX(last_seen, excluded.last_seen)`) + if err == nil { + var firstErr error + for _, e := range edgeUpdates { + if _, err := stmt.Exec(e.a, e.b, e.ts); err != nil && firstErr == nil { + firstErr = err + } + } + stmt.Close() + if firstErr != nil { + log.Printf("[store] obs-persist edge error (first): %v", firstErr) + } + } else { + log.Printf("[store] obs-persist edge prepare error: %v", err) + } + sqlTx.Commit() + } + } + }() + } + } + return broadcastMaps }