fix: address all 6 items from independent re-review

1. Hoist getCachedNodesAndPM() before observation loop in IngestNewObservations
2. Include resolved_path in IngestNewObservations broadcast maps
3. Persist resolved paths and edges to SQLite in IngestNewObservations
4. Add error logging for stmt.Exec in backfillResolvedPaths
5. Add error logging for stmt.Exec in buildAndPersistEdges
6. Remove tautological TestNeighborPersistFileCompiles test
This commit is contained in:
you
2026-04-04 05:09:37 +00:00
parent 232c637f44
commit 352967ca37
3 changed files with 111 additions and 17 deletions
+14 -2
View File
@@ -144,14 +144,20 @@ func buildAndPersistEdges(store *PacketStore, rw *sql.DB) int {
defer stmt.Close()
edgeCount := 0
var firstErr error
for _, pkt := range packets {
for _, obs := range pkt.Observations {
for _, ec := range extractEdgesFromObs(obs, pkt, pm) {
stmt.Exec(ec.A, ec.B, ec.Timestamp)
if _, err := stmt.Exec(ec.A, ec.B, ec.Timestamp); err != nil && firstErr == nil {
firstErr = err
}
edgeCount++
}
}
}
if firstErr != nil {
log.Printf("[neighbor] edge exec error (first): %v", firstErr)
}
if err := tx.Commit(); err != nil {
log.Printf("[neighbor] commit error: %v", err)
@@ -337,8 +343,14 @@ func backfillResolvedPaths(store *PacketStore, dbPath string) int {
}
defer stmt.Close()
var firstErr error
for _, r := range results {
stmt.Exec(r.rpJSON, r.obsID)
if _, err := stmt.Exec(r.rpJSON, r.obsID); err != nil && firstErr == nil {
firstErr = err
}
}
if firstErr != nil {
log.Printf("[store] backfill resolved_path exec error (first): %v", firstErr)
}
if err := sqlTx.Commit(); err != nil {
-14
View File
@@ -524,18 +524,4 @@ func TestExtractEdgesFromObs_SameNodeNoEdge(t *testing.T) {
}
}
func TestNeighborPersistFileCompiles(t *testing.T) {
_ = openRW
_ = ensureNeighborEdgesTable
_ = ensureResolvedPathColumn
_ = loadNeighborEdgesFromDB
_ = persistEdge
_ = resolvePathForObs
_ = marshalResolvedPath
_ = unmarshalResolvedPath
_ = backfillResolvedPaths
_ = buildAndPersistEdges
_ = neighborEdgesTableExists
_ = extractEdgesFromObs
}
+97 -1
View File
@@ -1490,6 +1490,10 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
updatedTxs := make(map[int]*StoreTx)
broadcastMaps := make([]map[string]interface{}, 0, len(obsRows))
// Hoist getCachedNodesAndPM() before the loop — same pattern as IngestNewFromDB (review fix #1).
_, pm := s.getCachedNodesAndPM()
graphRef := s.graph
for _, r := range obsRows {
// Already ingested (e.g. by IngestNewFromDB in same cycle)
if _, exists := s.byObsID[r.obsID]; exists {
@@ -1525,7 +1529,6 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
// Resolve path at ingest time for late-arriving observations (review item #2).
if r.pathJSON != "" && r.pathJSON != "[]" {
_, pm := s.getCachedNodesAndPM()
if pm != nil {
obs.ResolvedPath = resolvePathForObs(r.pathJSON, r.observerID, tx, pm, s.graph)
}
@@ -1573,6 +1576,9 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
"direction": strOrNil(obs.Direction),
"observation_count": tx.ObservationCount,
}
if obs.ResolvedPath != nil {
pkt["resolved_path"] = obs.ResolvedPath
}
broadcastMap := make(map[string]interface{}, len(pkt)+2)
for k, v := range pkt {
broadcastMap[k] = v
@@ -1637,6 +1643,96 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
})
}
// Persist resolved paths and neighbor edges asynchronously (review fix #3).
// Same pattern as IngestNewFromDB — late observations need persistence too.
if len(updatedTxs) > 0 && s.db != nil {
dbPath := s.db.path
type persistObs struct {
obsID int
resolvedPath string
}
type persistEdge struct {
a, b, ts string
}
var obsUpdates []persistObs
var edgeUpdates []persistEdge
for _, tx := range updatedTxs {
for _, obs := range tx.Observations {
if obs.ResolvedPath != nil {
rpJSON := marshalResolvedPath(obs.ResolvedPath)
if rpJSON != "" {
obsUpdates = append(obsUpdates, persistObs{obs.ID, rpJSON})
}
}
for _, ec := range extractEdgesFromObs(obs, tx, pm) {
edgeUpdates = append(edgeUpdates, persistEdge{ec.A, ec.B, ec.Timestamp})
if graphRef != nil {
graphRef.upsertEdge(ec.A, ec.B, "", obs.ObserverID, obs.SNR, parseTimestamp(ec.Timestamp))
}
}
}
}
if len(obsUpdates) > 0 || len(edgeUpdates) > 0 {
go func() {
rw, err := openRW(dbPath)
if err != nil {
log.Printf("[store] obs-persist rw open error: %v", err)
return
}
defer rw.Close()
if len(obsUpdates) > 0 {
sqlTx, err := rw.Begin()
if err == nil {
stmt, err := sqlTx.Prepare("UPDATE observations SET resolved_path = ? WHERE id = ?")
if err == nil {
var firstErr error
for _, u := range obsUpdates {
if _, err := stmt.Exec(u.resolvedPath, u.obsID); err != nil && firstErr == nil {
firstErr = err
}
}
stmt.Close()
if firstErr != nil {
log.Printf("[store] obs-persist resolved_path error (first): %v", firstErr)
}
} else {
log.Printf("[store] obs-persist resolved_path prepare error: %v", err)
}
sqlTx.Commit()
}
}
if len(edgeUpdates) > 0 {
sqlTx, err := rw.Begin()
if err == nil {
stmt, err := sqlTx.Prepare(`INSERT INTO neighbor_edges (node_a, node_b, count, last_seen)
VALUES (?, ?, 1, ?)
ON CONFLICT(node_a, node_b) DO UPDATE SET
count = count + 1, last_seen = MAX(last_seen, excluded.last_seen)`)
if err == nil {
var firstErr error
for _, e := range edgeUpdates {
if _, err := stmt.Exec(e.a, e.b, e.ts); err != nil && firstErr == nil {
firstErr = err
}
}
stmt.Close()
if firstErr != nil {
log.Printf("[store] obs-persist edge error (first): %v", firstErr)
}
} else {
log.Printf("[store] obs-persist edge prepare error: %v", err)
}
sqlTx.Commit()
}
}
}()
}
}
return broadcastMaps
}