diff --git a/cmd/server/neighbor_persist.go b/cmd/server/neighbor_persist.go index fcb13253..c77ce554 100644 --- a/cmd/server/neighbor_persist.go +++ b/cmd/server/neighbor_persist.go @@ -144,14 +144,20 @@ func buildAndPersistEdges(store *PacketStore, rw *sql.DB) int { defer stmt.Close() edgeCount := 0 + var firstErr error for _, pkt := range packets { for _, obs := range pkt.Observations { for _, ec := range extractEdgesFromObs(obs, pkt, pm) { - stmt.Exec(ec.A, ec.B, ec.Timestamp) + if _, err := stmt.Exec(ec.A, ec.B, ec.Timestamp); err != nil && firstErr == nil { + firstErr = err + } edgeCount++ } } } + if firstErr != nil { + log.Printf("[neighbor] edge exec error (first): %v", firstErr) + } if err := tx.Commit(); err != nil { log.Printf("[neighbor] commit error: %v", err) @@ -337,8 +343,14 @@ func backfillResolvedPaths(store *PacketStore, dbPath string) int { } defer stmt.Close() + var firstErr error for _, r := range results { - stmt.Exec(r.rpJSON, r.obsID) + if _, err := stmt.Exec(r.rpJSON, r.obsID); err != nil && firstErr == nil { + firstErr = err + } + } + if firstErr != nil { + log.Printf("[store] backfill resolved_path exec error (first): %v", firstErr) } if err := sqlTx.Commit(); err != nil { diff --git a/cmd/server/neighbor_persist_test.go b/cmd/server/neighbor_persist_test.go index d3c72994..5f9bf13e 100644 --- a/cmd/server/neighbor_persist_test.go +++ b/cmd/server/neighbor_persist_test.go @@ -524,18 +524,4 @@ func TestExtractEdgesFromObs_SameNodeNoEdge(t *testing.T) { } } -func TestNeighborPersistFileCompiles(t *testing.T) { - _ = openRW - _ = ensureNeighborEdgesTable - _ = ensureResolvedPathColumn - _ = loadNeighborEdgesFromDB - _ = persistEdge - _ = resolvePathForObs - _ = marshalResolvedPath - _ = unmarshalResolvedPath - _ = backfillResolvedPaths - _ = buildAndPersistEdges - _ = neighborEdgesTableExists - _ = extractEdgesFromObs -} diff --git a/cmd/server/store.go b/cmd/server/store.go index 5d5e4ad5..5015c540 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -1490,6 +1490,10 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] updatedTxs := make(map[int]*StoreTx) broadcastMaps := make([]map[string]interface{}, 0, len(obsRows)) + // Hoist getCachedNodesAndPM() before the loop — same pattern as IngestNewFromDB (review fix #1). + _, pm := s.getCachedNodesAndPM() + graphRef := s.graph + for _, r := range obsRows { // Already ingested (e.g. by IngestNewFromDB in same cycle) if _, exists := s.byObsID[r.obsID]; exists { @@ -1525,7 +1529,6 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] // Resolve path at ingest time for late-arriving observations (review item #2). if r.pathJSON != "" && r.pathJSON != "[]" { - _, pm := s.getCachedNodesAndPM() if pm != nil { obs.ResolvedPath = resolvePathForObs(r.pathJSON, r.observerID, tx, pm, s.graph) } @@ -1573,6 +1576,9 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] "direction": strOrNil(obs.Direction), "observation_count": tx.ObservationCount, } + if obs.ResolvedPath != nil { + pkt["resolved_path"] = obs.ResolvedPath + } broadcastMap := make(map[string]interface{}, len(pkt)+2) for k, v := range pkt { broadcastMap[k] = v @@ -1637,6 +1643,96 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] }) } + // Persist resolved paths and neighbor edges asynchronously (review fix #3). + // Same pattern as IngestNewFromDB — late observations need persistence too. + if len(updatedTxs) > 0 && s.db != nil { + dbPath := s.db.path + type persistObs struct { + obsID int + resolvedPath string + } + type persistEdge struct { + a, b, ts string + } + var obsUpdates []persistObs + var edgeUpdates []persistEdge + + for _, tx := range updatedTxs { + for _, obs := range tx.Observations { + if obs.ResolvedPath != nil { + rpJSON := marshalResolvedPath(obs.ResolvedPath) + if rpJSON != "" { + obsUpdates = append(obsUpdates, persistObs{obs.ID, rpJSON}) + } + } + for _, ec := range extractEdgesFromObs(obs, tx, pm) { + edgeUpdates = append(edgeUpdates, persistEdge{ec.A, ec.B, ec.Timestamp}) + if graphRef != nil { + graphRef.upsertEdge(ec.A, ec.B, "", obs.ObserverID, obs.SNR, parseTimestamp(ec.Timestamp)) + } + } + } + } + + if len(obsUpdates) > 0 || len(edgeUpdates) > 0 { + go func() { + rw, err := openRW(dbPath) + if err != nil { + log.Printf("[store] obs-persist rw open error: %v", err) + return + } + defer rw.Close() + + if len(obsUpdates) > 0 { + sqlTx, err := rw.Begin() + if err == nil { + stmt, err := sqlTx.Prepare("UPDATE observations SET resolved_path = ? WHERE id = ?") + if err == nil { + var firstErr error + for _, u := range obsUpdates { + if _, err := stmt.Exec(u.resolvedPath, u.obsID); err != nil && firstErr == nil { + firstErr = err + } + } + stmt.Close() + if firstErr != nil { + log.Printf("[store] obs-persist resolved_path error (first): %v", firstErr) + } + } else { + log.Printf("[store] obs-persist resolved_path prepare error: %v", err) + } + sqlTx.Commit() + } + } + + if len(edgeUpdates) > 0 { + sqlTx, err := rw.Begin() + if err == nil { + stmt, err := sqlTx.Prepare(`INSERT INTO neighbor_edges (node_a, node_b, count, last_seen) + VALUES (?, ?, 1, ?) + ON CONFLICT(node_a, node_b) DO UPDATE SET + count = count + 1, last_seen = MAX(last_seen, excluded.last_seen)`) + if err == nil { + var firstErr error + for _, e := range edgeUpdates { + if _, err := stmt.Exec(e.a, e.b, e.ts); err != nil && firstErr == nil { + firstErr = err + } + } + stmt.Close() + if firstErr != nil { + log.Printf("[store] obs-persist edge error (first): %v", firstErr) + } + } else { + log.Printf("[store] obs-persist edge prepare error: %v", err) + } + sqlTx.Commit() + } + } + }() + } + } + return broadcastMaps }