Files
meshcore-analyzer/cmd/ingestor/neighbor_builder.go
T
Kpa-clawbot eeddf46bc9 fix(ingestor): neighbor-builder delta scan + watermark — recovers 97% packet loss from #1289 (fixes #1339) (#1341)
## Summary
PR #1289 moved neighbor-graph construction into the ingestor with a 60s
ticker. `buildAndPersistNeighborEdges` then issued an **unbounded**
`SELECT … FROM observations o JOIN transmissions t …` every tick. On
staging (3.7M observations) one tick took ~2 minutes; with
`max_open_conns=1`, the SQLite single-writer was held continuously and
MQTT ingest collapsed (~6,500 tx/day → ~180 tx/day, 97% loss).

## Fix
Watermark-bounded delta scan. Each call derives the watermark from
`MAX(neighbor_edges.last_seen)` and restricts the SELECT to `WHERE
o.timestamp > ? ORDER BY o.timestamp LIMIT 50000`. `neighbor_edges`
itself is the persistence — no new metadata table, no in-memory state,
restarts resume cleanly from whatever the table reflects.

- Empty edges table → watermark 0 → full warm-up scan (preserves #1289's
synchronous warm-up intent).
- Warm-up loops the builder until a call returns fewer than the batch
cap, so the first server snapshot load sees a fully-populated table even
on fresh DBs.
- 50k batch cap stops any single tick from monopolising the writer; a
backlog drains over successive ticks.
- Per-tick wallclock is logged (`tick: N edges in DUR`); a tick >5s is
logged loudly as a possible regression of #1339. Broader instrumentation
is tracked in #1340.
- Output schema unchanged — server's `neighbor_recomputer.go` is
unaffected.

## Trade-off
An anomalously-old observation that arrives after its timestamp has been
crossed by the watermark will be skipped. Acceptable for an approximate
neighbor graph; a periodic full-rebuild can land later if needed.

## TDD
- **RED** (`d88e2522`): `TestNeighborEdgesBuilderDeltaScan` seeds 100k
observations, asserts an empty-delta tick is a no-op (<1s), and a
100-row delta is upserted in <500ms with no rescan of baseline rows.
Baseline builder fails the empty-delta assertion (sees all 200k baseline
edges).
- **GREEN** (`cf6fbb4e`): watermark + LIMIT — all assertions pass.
- **Mutation**: revert the `WHERE o.timestamp > ?` clause → the test
hangs to lock-contention timeout, confirming the WHERE actually gates
the behavior.

## Benchmark (synthetic, 100k observations, local sqlite)
| | Scan duration |
|---|---|
| Baseline builder, full scan every tick | ~40s |
| Patched builder, empty-delta tick | <50ms |
| Patched builder, 100-row delta | <50ms |

Staging projection: 2–3 min ticks → <1s ticks; SQLite writer freed for
MQTT ingest.

Fixes #1339

---------

Co-authored-by: openclaw-bot <bot@openclaw.local>
2026-05-23 20:54:16 -07:00

312 lines
9.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"time"
)
// NeighborEdgesBuilderInterval is how often the ingestor rescans
// observations and refreshes neighbor_edges. Server reads with the
// same 60s cadence (see cmd/server/neighbor_recomputer.go); a 60s
// pulse here is sufficient to keep the snapshot fresh.
const NeighborEdgesBuilderInterval = 60 * time.Second
// neighborBuilderMaxBatch caps how many observation rows a single
// delta tick may process (#1339). With max_open_conns=1, an unbounded
// scan on a multi-million-row table holds the SQLite write lock for
// minutes and starves MQTT ingest. The cap keeps each tick bounded;
// if a backlog accumulates, successive ticks drain it 50k rows at a
// time without ever blocking ingest for long.
const neighborBuilderMaxBatch = 50000
// neighborBuilderSlowTickThreshold is the per-tick wallclock budget
// for the builder. Exceeding it is logged loudly so operators can
// catch a regression of #1339 quickly. The full instrumentation
// framework is tracked in #1340.
const neighborBuilderSlowTickThreshold = 5 * time.Second
// payloadADVERT mirrors the constant in cmd/server/decoder.go.
// Duplicated rather than imported so the ingestor binary stays
// independent of the server package.
const payloadADVERT = 0x04
// edgeRow is one row to upsert into neighbor_edges. (a, b) is already
// canonical-ordered (a <= b).
type edgeRow struct {
a, b, ts string
}
// StartNeighborEdgesBuilder launches the periodic builder. On each
// tick it rescans recent observations + transmissions and upserts
// derived neighbor_edges rows. Builder is the only writer to
// neighbor_edges (#1287).
//
// The function returns a stop closure. Initial build runs synchronously
// before the ticker starts so the server's first snapshot load picks
// up real data instead of an empty table.
func (s *Store) StartNeighborEdgesBuilder(interval time.Duration) func() {
if interval <= 0 {
interval = NeighborEdgesBuilderInterval
}
stop := make(chan struct{})
done := make(chan struct{})
// Synchronous warm-up: on a fresh DB this is a full scan; on a DB
// with persisted neighbor_edges (most restarts), the watermark
// short-circuits it into a delta scan. Loop until the per-tick
// batch cap stops triggering so we drain any backlog before
// returning — first server load needs a fully-populated table.
wuStart := time.Now()
var wuTotal int
for {
n, err := s.buildAndPersistNeighborEdges()
if err != nil {
log.Printf("[neighbor-build] initial build error: %v", err)
break
}
wuTotal += n
if n < neighborBuilderMaxBatch {
break
}
}
log.Printf("[neighbor-build] initial build: %d edges upserted in %s", wuTotal, time.Since(wuStart))
var stopOnce sync.Once
go func() {
defer close(done)
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-t.C:
start := time.Now()
n, err := s.buildAndPersistNeighborEdges()
dur := time.Since(start)
if err != nil {
log.Printf("[neighbor-build] tick error after %s: %v", dur, err)
} else if n > 0 {
log.Printf("[neighbor-build] tick: %d edges in %s (delta from watermark)", n, dur)
}
if dur > neighborBuilderSlowTickThreshold {
log.Printf("[neighbor-build] SLOW tick: %s — possible regression of #1339", dur)
}
case <-stop:
return
}
}
}()
return func() {
stopOnce.Do(func() { close(stop) })
select {
case <-done:
case <-time.After(5 * time.Second):
}
}
}
// buildAndPersistNeighborEdges scans transmissions + observations,
// extracts edge candidates (originator↔first-hop on ADVERTs;
// observer↔last-hop on all packet types) and upserts them into
// neighbor_edges. Returns count of attempted upserts.
//
// Watermark / delta semantics (#1339): the builder derives a watermark
// from MAX(neighbor_edges.last_seen). On an empty edges table (fresh
// DB), watermark is 0 and the builder does a full warm-up scan. On
// every subsequent call, the SELECT is restricted to observations
// whose timestamp is strictly greater than the watermark, bounded by
// neighborBuilderMaxBatch. neighbor_edges itself is the persistence —
// no metadata table or in-memory state is required, and restarts
// resume cleanly from whatever the table reflects.
//
// Trade-off (documented for #1340 follow-up): an anomalously-old
// observation that arrives AFTER its timestamp has already been
// crossed by the watermark will be skipped. Acceptable for an
// approximate neighbor graph; a periodic full-rebuild can be added
// later if needed.
//
// Resolution of hop-prefix → full pubkey is done via a one-shot
// SELECT of (lowered) pubkey prefixes from nodes. Prefixes with
// multiple candidates are skipped (matches the conservative
// resolution rule in cmd/server/extractEdgesFromObs).
func (s *Store) buildAndPersistNeighborEdges() (int, error) {
prefixIdx, err := buildPrefixIndex(s.db)
if err != nil {
return 0, fmt.Errorf("build prefix index: %w", err)
}
// Derive the watermark from the existing edges table. RFC3339
// → epoch seconds so it can be compared against observations.timestamp
// (stored as INTEGER unix epoch). On an empty edges table both the
// query and the parse return zero → full warm-up scan.
var watermarkRFC sql.NullString
if err := s.db.QueryRow(`SELECT MAX(last_seen) FROM neighbor_edges`).Scan(&watermarkRFC); err != nil {
return 0, fmt.Errorf("read watermark: %w", err)
}
var watermarkEpoch int64
if watermarkRFC.Valid && watermarkRFC.String != "" {
if t, parseErr := time.Parse(time.RFC3339, watermarkRFC.String); parseErr == nil {
watermarkEpoch = t.Unix()
}
}
rows, err := s.db.Query(`SELECT
t.payload_type,
t.decoded_json,
COALESCE(t.from_pubkey, ''),
COALESCE(o.path_json, ''),
COALESCE(obs.id, '') AS observer_id,
o.timestamp
FROM observations o
JOIN transmissions t ON t.id = o.transmission_id
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
WHERE o.timestamp > ?
ORDER BY o.timestamp
LIMIT ?`, watermarkEpoch, neighborBuilderMaxBatch)
if err != nil {
return 0, fmt.Errorf("scan observations: %w", err)
}
defer rows.Close()
var edges []edgeRow
for rows.Next() {
var payloadType sql.NullInt64
var decodedJSON, fromPubkey, pathJSON, observerID string
var epochTs int64
if err := rows.Scan(&payloadType, &decodedJSON, &fromPubkey, &pathJSON, &observerID, &epochTs); err != nil {
continue
}
fromNode := strings.ToLower(fromPubkey)
if fromNode == "" {
fromNode = strings.ToLower(extractPubkeyFromAdvertJSON(decodedJSON))
}
isAdvert := payloadType.Valid && payloadType.Int64 == int64(payloadADVERT)
ts := time.Unix(epochTs, 0).UTC().Format(time.RFC3339)
observerPK := strings.ToLower(observerID)
path := parsePathArray(pathJSON)
if len(path) == 0 {
if isAdvert && fromNode != "" && fromNode != observerPK && observerPK != "" {
edges = append(edges, canonEdge(fromNode, observerPK, ts))
}
continue
}
if isAdvert && fromNode != "" {
if resolved, ok := resolvePrefix(prefixIdx, path[0]); ok && resolved != fromNode {
edges = append(edges, canonEdge(fromNode, resolved, ts))
}
}
if observerPK != "" {
last := path[len(path)-1]
if resolved, ok := resolvePrefix(prefixIdx, last); ok && resolved != observerPK {
edges = append(edges, canonEdge(observerPK, resolved, ts))
}
}
}
if len(edges) == 0 {
return 0, nil
}
tx, err := s.db.Begin()
if err != nil {
return 0, fmt.Errorf("begin: %w", err)
}
defer tx.Rollback()
stmt, err := tx.Prepare(`INSERT INTO neighbor_edges (node_a, node_b, count, last_seen)
VALUES (?, ?, 1, ?)
ON CONFLICT(node_a, node_b) DO UPDATE SET
count = count + 1,
last_seen = MAX(last_seen, excluded.last_seen)`)
if err != nil {
return 0, fmt.Errorf("prepare: %w", err)
}
defer stmt.Close()
var firstErr error
for _, e := range edges {
if _, err := stmt.Exec(e.a, e.b, e.ts); err != nil && firstErr == nil {
firstErr = err
}
}
if firstErr != nil {
return 0, fmt.Errorf("upsert: %w", firstErr)
}
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("commit: %w", err)
}
return len(edges), nil
}
// canonEdge orders the pair so node_a <= node_b (matches the existing
// schema convention used by the loader and the bridge recomputer).
func canonEdge(a, b, ts string) edgeRow {
if a > b {
a, b = b, a
}
return edgeRow{a, b, ts}
}
// parsePathArray returns the hop strings from a path_json blob.
// Defensive against missing/invalid JSON.
func parsePathArray(s string) []string {
if s == "" || s == "[]" {
return nil
}
var arr []string
if json.Unmarshal([]byte(s), &arr) != nil {
return nil
}
return arr
}
// prefixIndex maps a hop prefix (lowercase) → all full pubkeys whose
// public_key starts with that prefix. Prefixes with > 1 candidate are
// considered ambiguous and skipped during resolution.
type prefixIndex map[string][]string
// buildPrefixIndex reads nodes.public_key and builds the prefix → pubkey
// map. We index every 1-byte (2 hex char) prefix length the firmware
// uses (1, 2, 3, 4, 6, 8). Memory cost is O(nodes × len(prefixLens)).
func buildPrefixIndex(db *sql.DB) (prefixIndex, error) {
rows, err := db.Query(`SELECT public_key FROM nodes`)
if err != nil {
return nil, err
}
defer rows.Close()
idx := make(prefixIndex, 1024)
var prefixLens = []int{1 * 2, 2 * 2, 3 * 2, 4 * 2, 6 * 2, 8 * 2}
for rows.Next() {
var pk string
if err := rows.Scan(&pk); err != nil {
continue
}
pkLower := strings.ToLower(pk)
for _, n := range prefixLens {
if len(pkLower) < n {
continue
}
prefix := pkLower[:n]
idx[prefix] = append(idx[prefix], pkLower)
}
}
return idx, nil
}
// resolvePrefix returns the single resolved pubkey if exactly one
// candidate matches, otherwise (zero || multiple), it returns ok=false
// (matches the conservative server-side resolver in
// cmd/server/extractEdgesFromObs).
func resolvePrefix(idx prefixIndex, hop string) (string, bool) {
h := strings.ToLower(hop)
candidates := idx[h]
if len(candidates) != 1 {
return "", false
}
return candidates[0], true
}