## What + why
`fetchResolvedPathForTxBest` (used by every API path that fills the
top-level `resolved_path`, including
`/api/nodes/{pk}/health.recentPackets`) picked the observation with the
longest `path_json` and queried SQL for that single obs ID. When the
longest-path obs had a NULL `resolved_path` but a shorter sibling had
one, the helper returned nil and the top-level field was dropped, even
though the data was sitting in SQLite. QA #809 §2.1 caught it on the
health endpoint because that page surfaces the field per-tx.

Fix: keep the LRU-friendly fast path (try the longest-path obs first),
then fall back to scanning all observations of the tx and picking the
one with the longest `path_json` that actually has a stored
`resolved_path`.
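To make the failure mode concrete, here is a minimal, self-contained
sketch (hypothetical `obs` struct and `ptr` helper, with string length
standing in for the real `pathLen`):

```go
package main

import "fmt"

// Hypothetical shape for illustration only; the real store types live
// in cmd/server and are richer than this.
type obs struct {
	ID           int
	PathJSON     string
	ResolvedPath *string // nil stands for a NULL resolved_path in SQLite
}

func ptr(s string) *string { return &s }

func main() {
	txObs := []obs{
		{ID: 1, PathJSON: `["aa","bb","cc"]`},                         // longest path, NULL resolved_path
		{ID: 2, PathJSON: `["aa","bb"]`, ResolvedPath: ptr(`["p1"]`)}, // shorter, but stored
	}
	// Old behavior: query only the longest-PathJSON obs (obs 1), yielding nil.
	// Fixed behavior: consider only obs with a stored path, so obs 2 wins.
	var best *obs
	for i := range txObs {
		o := &txObs[i]
		if o.ResolvedPath == nil {
			continue
		}
		if best == nil || len(o.PathJSON) > len(best.PathJSON) {
			best = o
		}
	}
	fmt.Println(*best.ResolvedPath) // ["p1"]
}
```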
## Changes
- `cmd/server/resolved_index.go`: extend `fetchResolvedPathForTxBest`
with a fallback through `fetchResolvedPathsForTx`.
- `cmd/server/issue810_repro_test.go`: regression test — seeds a tx
whose longest-path obs lacks `resolved_path` and a shorter sibling has
it, then asserts `/api/packets` and
`/api/nodes/{pk}/health.recentPackets` agree.
## Tests
`go test ./... -count=1` from `cmd/server` — PASS (full suite, ~19s).
## Perf
Fast path unchanged (single LRU/SQL lookup, dominant case). Fallback
only runs when the longest-path obs has NULL `resolved_path` — one
indexed query per affected tx, bounded by observations-per-tx (small).
Closes #810
---------
Co-authored-by: you <you@example.com>
cmd/server/resolved_index.go (476 lines, 14 KiB, Go)
package main

// Lock ordering contract (MUST be followed everywhere):
//
// s.mu first, then s.lruMu — and never both at once: release s.mu before
// taking s.lruMu.
//
// • Never acquire s.lruMu while holding s.mu.
// • fetchResolvedPathForObs takes lruMu independently — callers under s.mu
//   must NOT call it directly; instead collect IDs under s.mu, release, then
//   do LRU ops under lruMu separately.
// • The backfill path (backfillResolvedPathsAsync) follows this by collecting
//   obsIDs to invalidate under s.mu, releasing it, then taking lruMu.
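//
// Illustrative caller pattern (collectObsIDs is a hypothetical helper named
// here only to show the contract in action):
//
//	s.mu.Lock()
//	ids := collectObsIDs(s) // gather obs IDs while holding s.mu only
//	s.mu.Unlock()
//
//	s.lruMu.Lock()
//	for _, id := range ids {
//		s.lruDelete(id)
//	}
//	s.lruMu.Unlock()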

import (
	"database/sql"
	"hash/fnv"
	"log"
	"strings"
)

// resolvedPubkeyHash computes a fast 64-bit hash for membership index keying.
// Uses FNV-1a from stdlib — good distribution, no external dependency.
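// Lower-casing first makes the key case-insensitive: resolvedPubkeyHash("AB12")
// and resolvedPubkeyHash("ab12") return the same value.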
func resolvedPubkeyHash(pk string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(strings.ToLower(pk)))
	return h.Sum64()
}

// addToResolvedPubkeyIndex adds a txID under each resolved pubkey hash.
// Deduplicates both within a single call AND across calls — won't add the
// same (hash, txID) pair twice even when called multiple times for the same tx.
// Must be called under s.mu write lock.
func (s *PacketStore) addToResolvedPubkeyIndex(txID int, resolvedPubkeys []string) {
	if !s.useResolvedPathIndex {
		return
	}
	seen := make(map[uint64]bool, len(resolvedPubkeys))
	for _, pk := range resolvedPubkeys {
		if pk == "" {
			continue
		}
		h := resolvedPubkeyHash(pk)
		if seen[h] {
			continue
		}
		seen[h] = true

		// Cross-call dedup: check if (h, txID) already exists in forward index.
		existing := s.resolvedPubkeyIndex[h]
		alreadyPresent := false
		for _, id := range existing {
			if id == txID {
				alreadyPresent = true
				break
			}
		}
		if alreadyPresent {
			continue
		}

		s.resolvedPubkeyIndex[h] = append(existing, txID)
		s.resolvedPubkeyReverse[txID] = append(s.resolvedPubkeyReverse[txID], h)
	}
}

// removeFromResolvedPubkeyIndex removes all index entries for a txID using the reverse map.
// Must be called under s.mu write lock.
func (s *PacketStore) removeFromResolvedPubkeyIndex(txID int) {
	if !s.useResolvedPathIndex {
		return
	}
	hashes := s.resolvedPubkeyReverse[txID]
	for _, h := range hashes {
		list := s.resolvedPubkeyIndex[h]
		// Remove ALL occurrences of txID (not just the first) to prevent orphans.
		filtered := list[:0]
		for _, id := range list {
			if id != txID {
				filtered = append(filtered, id)
			}
		}
		if len(filtered) == 0 {
			delete(s.resolvedPubkeyIndex, h)
		} else {
			s.resolvedPubkeyIndex[h] = filtered
		}
	}
	delete(s.resolvedPubkeyReverse, txID)
}

// extractResolvedPubkeys extracts all non-nil, non-empty pubkeys from a resolved path.
func extractResolvedPubkeys(rp []*string) []string {
	if len(rp) == 0 {
		return nil
	}
	result := make([]string, 0, len(rp))
	for _, p := range rp {
		if p != nil && *p != "" {
			result = append(result, *p)
		}
	}
	return result
}

// mergeResolvedPubkeys collects unique non-empty pubkeys from multiple resolved paths.
func mergeResolvedPubkeys(paths ...[]*string) []string {
	seen := make(map[string]bool)
	var result []string
	for _, rp := range paths {
		for _, p := range rp {
			if p != nil && *p != "" && !seen[*p] {
				seen[*p] = true
				result = append(result, *p)
			}
		}
	}
	return result
}

// nodeInResolvedPathViaIndex checks whether a transmission is associated with
// a target pubkey using the membership index + collision-safety SQL check.
// Must be called under s.mu RLock at minimum.
func (s *PacketStore) nodeInResolvedPathViaIndex(tx *StoreTx, targetPK string) bool {
	if !s.useResolvedPathIndex {
		// Flag off: can't disambiguate, keep candidate (conservative)
		return true
	}

	// If this tx has no indexed pubkeys at all, we can't disambiguate —
	// keep the candidate (same as old behavior for NULL resolved_path).
	if _, hasReverse := s.resolvedPubkeyReverse[tx.ID]; !hasReverse {
		return true
	}

	h := resolvedPubkeyHash(targetPK)
	txIDs := s.resolvedPubkeyIndex[h]

	// Check if this tx's ID is in the candidate list
	for _, id := range txIDs {
		if id == tx.ID {
			// Found in index. Collision-safety: verify with SQL.
			if s.db != nil && s.db.conn != nil {
				return s.confirmResolvedPathContains(tx.ID, targetPK)
			}
			return true // no DB, trust the index
		}
	}

	return false
}

// confirmResolvedPathContains verifies an exact pubkey match in resolved_path
// via SQL. This is the collision-safety fallback for the membership index.
func (s *PacketStore) confirmResolvedPathContains(txID int, pubkey string) bool {
	if s.db == nil || s.db.conn == nil {
		return true
	}
	// Use INSTR with surrounding quotes for exact match — avoids LIKE escape issues.
	// resolved_path format: ["pubkey1","pubkey2",...]
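	// Example: resolved_path `["ab12","cd34"]` contains the needle `"ab12"`,
	// but a bare prefix needle `"ab1"` cannot match, because the closing
	// quote anchors the comparison to a complete array element.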
	needle := `"` + strings.ToLower(pubkey) + `"`
	var count int
	err := s.db.conn.QueryRow(
		`SELECT COUNT(*) FROM observations WHERE transmission_id = ? AND INSTR(LOWER(resolved_path), ?) > 0`,
		txID, needle,
	).Scan(&count)
	if err != nil {
		return true // on error, keep the candidate
	}
	return count > 0
}

// fetchResolvedPathsForTx fetches resolved_path from SQLite for all observations
// of a transmission. Used for on-demand API responses and eviction cleanup.
func (s *PacketStore) fetchResolvedPathsForTx(txID int) map[int][]*string {
	if s.db == nil || s.db.conn == nil {
		return nil
	}
	rows, err := s.db.conn.Query(
		`SELECT id, resolved_path FROM observations WHERE transmission_id = ? AND resolved_path IS NOT NULL`,
		txID,
	)
	if err != nil {
		return nil
	}
	defer rows.Close()

	result := make(map[int][]*string)
	for rows.Next() {
		var obsID int
		var rpJSON sql.NullString
		if err := rows.Scan(&obsID, &rpJSON); err != nil {
			continue
		}
		if rpJSON.Valid && rpJSON.String != "" {
			result[obsID] = unmarshalResolvedPath(rpJSON.String)
		}
	}
	return result
}

// fetchResolvedPathForObs fetches resolved_path for a single observation,
// using the LRU cache.
func (s *PacketStore) fetchResolvedPathForObs(obsID int) []*string {
	if s.db == nil || s.db.conn == nil {
		return nil
	}

	// Check LRU cache first
	s.lruMu.RLock()
	if s.apiResolvedPathLRU != nil {
		if entry, ok := s.apiResolvedPathLRU[obsID]; ok {
			s.lruMu.RUnlock()
			return entry
		}
	}
	s.lruMu.RUnlock()

	var rpJSON sql.NullString
	err := s.db.conn.QueryRow(
		`SELECT resolved_path FROM observations WHERE id = ?`, obsID,
	).Scan(&rpJSON)
	if err != nil || !rpJSON.Valid {
		return nil
	}
	rp := unmarshalResolvedPath(rpJSON.String)

	// Store in LRU
	s.lruMu.Lock()
	s.lruPut(obsID, rp)
	s.lruMu.Unlock()

	return rp
}

// fetchResolvedPathForTxBest returns the best observation's resolved_path for a tx.
//
// "Best" = the longest path_json among observations that actually have a stored
// resolved_path. Earlier versions picked the longest-path obs unconditionally
// and queried SQL for that single ID — if the longest-path obs had NULL
// resolved_path while a shorter sibling had one, the call returned nil and
// callers (e.g. /api/nodes/{pk}/health.recentPackets) lost the field. Fixes
// #810 by checking all observations and falling back to the longest sibling
// that has a stored path.
func (s *PacketStore) fetchResolvedPathForTxBest(tx *StoreTx) []*string {
	if tx == nil || len(tx.Observations) == 0 {
		return nil
	}
	// Fast path: try the longest-path obs first via the LRU/SQL helper.
	longest := tx.Observations[0]
	longestLen := pathLen(longest.PathJSON)
	for _, obs := range tx.Observations[1:] {
		if l := pathLen(obs.PathJSON); l > longestLen {
			longest = obs
			longestLen = l
		}
	}
	if rp := s.fetchResolvedPathForObs(longest.ID); rp != nil {
		return rp
	}
	// Fallback: longest-path obs has no stored resolved_path. Query all
	// observations for this tx and pick the one with the longest path_json
	// that actually has a stored resolved_path.
	rpMap := s.fetchResolvedPathsForTx(tx.ID)
	if len(rpMap) == 0 {
		return nil
	}
	var bestRP []*string
	bestObsID := 0
	bestLen := -1
	for _, obs := range tx.Observations {
		rp, ok := rpMap[obs.ID]
		if !ok || rp == nil {
			continue
		}
		if l := pathLen(obs.PathJSON); l > bestLen {
			bestLen = l
			bestRP = rp
			bestObsID = obs.ID
		}
	}
	// Populate LRU so repeat lookups for this tx don't re-issue the multi-row
	// SQL fallback (e.g. dashboard polling /api/nodes/{pk}/health).
	if bestRP != nil && bestObsID != 0 {
		s.lruMu.Lock()
		s.lruPut(bestObsID, bestRP)
		s.lruMu.Unlock()
	}
	return bestRP
}

// --- Simple LRU cache for resolved paths ---

const lruMaxSize = 10000

// lruPut adds an entry. Must be called under s.lruMu write lock.
func (s *PacketStore) lruPut(obsID int, rp []*string) {
	if s.apiResolvedPathLRU == nil {
		return
	}
	if _, exists := s.apiResolvedPathLRU[obsID]; exists {
		return
	}
	// Compact lruOrder if stale entries exceed 50% of capacity.
	// This prevents effective capacity degradation after bulk deletions.
	if len(s.lruOrder) >= lruMaxSize && len(s.apiResolvedPathLRU) < lruMaxSize/2 {
		compacted := make([]int, 0, len(s.apiResolvedPathLRU))
		for _, id := range s.lruOrder {
			if _, ok := s.apiResolvedPathLRU[id]; ok {
				compacted = append(compacted, id)
			}
		}
		s.lruOrder = compacted
	}
	if len(s.lruOrder) >= lruMaxSize {
		// Evict oldest, skipping stale entries
		for len(s.lruOrder) > 0 {
			evictID := s.lruOrder[0]
			s.lruOrder = s.lruOrder[1:]
			if _, ok := s.apiResolvedPathLRU[evictID]; ok {
				delete(s.apiResolvedPathLRU, evictID)
				break
			}
			// stale entry — skip and continue
		}
	}
	s.apiResolvedPathLRU[obsID] = rp
	s.lruOrder = append(s.lruOrder, obsID)
}

// lruDelete removes an entry. Must be called under s.lruMu write lock.
func (s *PacketStore) lruDelete(obsID int) {
	if s.apiResolvedPathLRU == nil {
		return
	}
	delete(s.apiResolvedPathLRU, obsID)
	// Don't scan lruOrder — eviction handles stale entries naturally.
}

// resolvedPubkeysForEvictionBatch fetches resolved pubkeys for multiple txIDs
// from SQL in batched queries. Returns a map from txID to unique pubkeys.
// MUST be called WITHOUT holding s.mu — this is the whole point of the batch approach.
// Chunks the IN clause to keep each statement under SQLite's host-parameter limit.
func (s *PacketStore) resolvedPubkeysForEvictionBatch(txIDs []int) map[int][]string {
	result := make(map[int][]string, len(txIDs))
	if len(txIDs) == 0 || s.db == nil || s.db.conn == nil {
		return result
	}

	const chunkSize = 499 // SQLite SQLITE_MAX_VARIABLE_NUMBER default is 999; stay well under
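	// e.g. 1,200 txIDs are fetched in three queries binding 499, 499, and
	// 202 parameters respectively.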
	for start := 0; start < len(txIDs); start += chunkSize {
		end := start + chunkSize
		if end > len(txIDs) {
			end = len(txIDs)
		}
		chunk := txIDs[start:end]

		// Build query with placeholders
		placeholders := make([]byte, 0, len(chunk)*2)
		args := make([]interface{}, len(chunk))
		for i, id := range chunk {
			if i > 0 {
				placeholders = append(placeholders, ',')
			}
			placeholders = append(placeholders, '?')
			args[i] = id
		}

		query := "SELECT transmission_id, resolved_path FROM observations WHERE transmission_id IN (" +
			string(placeholders) + ") AND resolved_path IS NOT NULL"

		rows, err := s.db.conn.Query(query, args...)
		if err != nil {
			continue
		}

		for rows.Next() {
			var txID int
			var rpJSON sql.NullString
			if err := rows.Scan(&txID, &rpJSON); err != nil {
				continue
			}
			if !rpJSON.Valid || rpJSON.String == "" {
				continue
			}
			rp := unmarshalResolvedPath(rpJSON.String)
			for _, p := range rp {
				if p != nil && *p != "" {
					result[txID] = append(result[txID], *p)
				}
			}
		}
		rows.Close()
	}

	// Deduplicate per-txID
	for txID, pks := range result {
		seen := make(map[string]bool, len(pks))
		deduped := pks[:0]
		for _, pk := range pks {
			if !seen[pk] {
				seen[pk] = true
				deduped = append(deduped, pk)
			}
		}
		result[txID] = deduped
	}

	return result
}

// initResolvedPathIndex initializes the resolved path index data structures.
func (s *PacketStore) initResolvedPathIndex() {
	s.resolvedPubkeyIndex = make(map[uint64][]int, 4096)
	s.resolvedPubkeyReverse = make(map[int][]uint64, 4096)
	s.apiResolvedPathLRU = make(map[int][]*string, lruMaxSize)
	s.lruOrder = make([]int, 0, lruMaxSize)
}

// CompactResolvedPubkeyIndex reclaims memory from the resolved pubkey index maps
// after eviction. It removes empty forward-index entries (shouldn't exist if
// removeFromResolvedPubkeyIndex is correct, but defense in depth) and clips
// oversized slice backing arrays where cap comfortably exceeds 2*len.
// Must be called under s.mu write lock.
func (s *PacketStore) CompactResolvedPubkeyIndex() {
	if !s.useResolvedPathIndex {
		return
	}
	for h, ids := range s.resolvedPubkeyIndex {
		if len(ids) == 0 {
			delete(s.resolvedPubkeyIndex, h)
			continue
		}
		// Clip oversized backing arrays: reallocate when cap exceeds 2*len
		// plus a small slack that spares tiny slices.
		if cap(ids) > 2*len(ids)+8 {
			clipped := make([]int, len(ids))
			copy(clipped, ids)
			s.resolvedPubkeyIndex[h] = clipped
		}
	}
	for txID, hashes := range s.resolvedPubkeyReverse {
		if len(hashes) == 0 {
			delete(s.resolvedPubkeyReverse, txID)
			continue
		}
		if cap(hashes) > 2*len(hashes)+8 {
			clipped := make([]uint64, len(hashes))
			copy(clipped, hashes)
			s.resolvedPubkeyReverse[txID] = clipped
		}
	}
}

// defaultMaxResolvedPubkeyIndexEntries is the default hard cap for the forward
// index. When exceeded, a warning is logged. No auto-eviction — that's the
// eviction ticker's job.
const defaultMaxResolvedPubkeyIndexEntries = 5_000_000

// CheckResolvedPubkeyIndexSize logs a warning if the resolved pubkey forward
// index exceeds the configured maximum entries. Must be called under s.mu
// read lock at minimum.
func (s *PacketStore) CheckResolvedPubkeyIndexSize() {
	if !s.useResolvedPathIndex {
		return
	}
	maxEntries := s.maxResolvedPubkeyIndexEntries
	if maxEntries <= 0 {
		maxEntries = defaultMaxResolvedPubkeyIndexEntries
	}
	fwdLen := len(s.resolvedPubkeyIndex)
	revLen := len(s.resolvedPubkeyReverse)
	if fwdLen > maxEntries || revLen > maxEntries {
		log.Printf("[store] WARNING: resolvedPubkeyIndex size exceeds limit — forward=%d reverse=%d limit=%d",
			fwdLen, revLen, maxEntries)
	}
}