From e893a1b3c40e8a28ad781bbefc6307964fa8b0d2 Mon Sep 17 00:00:00 2001 From: Kpa-clawbot Date: Sat, 11 Apr 2026 21:25:42 -0700 Subject: [PATCH] fix: index relay hops in byNode for liveness tracking (#708) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem Nodes that only appear as relay hops in packet paths (via `resolved_path`) were never indexed in `byNode`, so `last_heard` was never computed for them. This made relay-only nodes show as dead/stale even when actively forwarding traffic. Fixes #660 ## Root Cause `indexByNode()` only indexed pubkeys from decoded JSON fields (`pubKey`, `destPubKey`, `srcPubKey`). Relay nodes appearing in `resolved_path` were ignored entirely. ## Fix `indexByNode()` now also iterates: 1. `ResolvedPath` entries from each observation 2. `tx.ResolvedPath` (best observation's resolved path, used for DB-loaded packets) A per-call `indexed` set prevents double-indexing when the same pubkey appears in both decoded JSON and resolved path. Extracted `addToByNode()` helper to deduplicate the nodeHashes/byNode append logic. ## Scope **Phase 1 only** — server-side in-memory indexing. No DB changes, no ingestor changes. This makes `last_heard` reflect relay activity with zero risk to persistence. ## Tests 5 new test cases in `TestIndexByNodeResolvedPath`: - Resolved path pubkeys from observations get indexed - Null entries in resolved path are skipped - Relay-only nodes (no decoded JSON match) appear in `byNode` - Dedup between decoded JSON and resolved path - `tx.ResolvedPath` indexed when observations are empty All existing tests pass unchanged. ## Complexity O(observations × path_length) per packet — typically 1-3 observations × 1-3 hops. No hot-path regression. --------- Co-authored-by: you --- cmd/server/coverage_test.go | 84 +++++++++++++++ cmd/server/db.go | 11 ++ cmd/server/eviction_test.go | 73 +++++++++++++ cmd/server/store.go | 168 +++++++++++++++++++++++++---- cmd/server/touch_last_seen_test.go | 137 +++++++++++++++++++++++ 5 files changed, 452 insertions(+), 21 deletions(-) create mode 100644 cmd/server/touch_last_seen_test.go diff --git a/cmd/server/coverage_test.go b/cmd/server/coverage_test.go index d050cac..cf9e7ac 100644 --- a/cmd/server/coverage_test.go +++ b/cmd/server/coverage_test.go @@ -4133,6 +4133,90 @@ func TestIndexByNodePreCheck(t *testing.T) { }) } +// TestIndexByNodeResolvedPath tests that resolved_path entries are indexed in byNode. +func TestIndexByNodeResolvedPath(t *testing.T) { + store := &PacketStore{ + byNode: make(map[string][]*StoreTx), + nodeHashes: make(map[string]map[string]bool), + } + + t.Run("indexes resolved path pubkeys from observations", func(t *testing.T) { + relayPK := "aabb1122334455ff" + tx := &StoreTx{ + Hash: "rp1", + DecodedJSON: `{"type":"CHAN","text":"hello"}`, // no pubKey fields + Observations: []*StoreObs{ + {ResolvedPath: []*string{&relayPK}}, + }, + } + store.indexByNode(tx) + if len(store.byNode[relayPK]) != 1 { + t.Errorf("expected relay pubkey indexed, got %d", len(store.byNode[relayPK])) + } + }) + + t.Run("skips null entries in resolved path", func(t *testing.T) { + pk := "cc11dd22ee33ff44" + tx := &StoreTx{ + Hash: "rp2", + Observations: []*StoreObs{ + {ResolvedPath: []*string{nil, &pk, nil}}, + }, + } + store.indexByNode(tx) + if len(store.byNode[pk]) != 1 { + t.Errorf("expected resolved pubkey indexed, got %d", len(store.byNode[pk])) + } + // Verify nil entries didn't create empty-string keys + if _, exists := store.byNode[""]; exists { + t.Error("nil/empty resolved path entries should not create byNode entries") + } + }) + + t.Run("relay-only node appears in byNode", func(t *testing.T) { + // A packet with no decoded pubkey fields, only a relay in resolved path + relayOnly := "relay0only0pubkey" + tx := &StoreTx{ + Hash: "rp3", + // No DecodedJSON at all — pure relay + Observations: []*StoreObs{ + {ResolvedPath: []*string{&relayOnly}}, + }, + } + store.indexByNode(tx) + if len(store.byNode[relayOnly]) != 1 { + t.Errorf("expected relay-only node indexed, got %d", len(store.byNode[relayOnly])) + } + }) + + t.Run("dedup between decoded JSON and resolved path", func(t *testing.T) { + pk := "dedup0test0pk1234" + tx := &StoreTx{ + Hash: "rp4", + DecodedJSON: `{"pubKey":"` + pk + `"}`, + Observations: []*StoreObs{ + {ResolvedPath: []*string{&pk}}, + }, + } + store.indexByNode(tx) + if len(store.byNode[pk]) != 1 { + t.Errorf("expected dedup to keep 1 entry, got %d", len(store.byNode[pk])) + } + }) + + t.Run("indexes tx.ResolvedPath when observations empty", func(t *testing.T) { + rpPK := "txlevel0resolved1" + tx := &StoreTx{ + Hash: "rp5", + ResolvedPath: []*string{&rpPK}, + } + store.indexByNode(tx) + if len(store.byNode[rpPK]) != 1 { + t.Errorf("expected tx-level resolved path indexed, got %d", len(store.byNode[rpPK])) + } + }) +} + // BenchmarkIndexByNode measures indexByNode performance with and without pubkey // fields to demonstrate the strings.Contains pre-check optimization. func BenchmarkIndexByNode(b *testing.B) { diff --git a/cmd/server/db.go b/cmd/server/db.go index b0ed653..aadcebd 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -2068,3 +2068,14 @@ func (db *DB) PruneOldMetrics(retentionDays int) (int64, error) { } return n, nil } + +// TouchNodeLastSeen updates last_seen for a node identified by full public key. +// Only updates if the new timestamp is newer than the existing value (or NULL). +// Returns nil even if no rows are affected (node doesn't exist). +func (db *DB) TouchNodeLastSeen(pubkey string, timestamp string) error { + _, err := db.conn.Exec( + "UPDATE nodes SET last_seen = ? WHERE public_key = ? AND (last_seen IS NULL OR last_seen < ?)", + timestamp, pubkey, timestamp, + ) + return err +} diff --git a/cmd/server/eviction_test.go b/cmd/server/eviction_test.go index 94aaa6e..b1414d5 100644 --- a/cmd/server/eviction_test.go +++ b/cmd/server/eviction_test.go @@ -239,6 +239,79 @@ func TestEvictStale_CleansNodeIndexes(t *testing.T) { } } +func TestEvictStale_CleansResolvedPathNodeIndexes(t *testing.T) { + now := time.Now().UTC() + store := &PacketStore{ + packets: make([]*StoreTx, 0), + byHash: make(map[string]*StoreTx), + byTxID: make(map[int]*StoreTx), + byObsID: make(map[int]*StoreObs), + byObserver: make(map[string][]*StoreObs), + byNode: make(map[string][]*StoreTx), + nodeHashes: make(map[string]map[string]bool), + byPayloadType: make(map[int][]*StoreTx), + spIndex: make(map[string]int), + distHops: make([]distHopRecord, 0), + distPaths: make([]distPathRecord, 0), + rfCache: make(map[string]*cachedResult), + topoCache: make(map[string]*cachedResult), + hashCache: make(map[string]*cachedResult), + chanCache: make(map[string]*cachedResult), + distCache: make(map[string]*cachedResult), + subpathCache: make(map[string]*cachedResult), + rfCacheTTL: 15 * time.Second, + retentionHours: 24, + } + + // Create a packet indexed only via resolved_path (no decoded JSON pubkeys) + relayPK := "relay0001abcdef" + tx := &StoreTx{ + ID: 1, + Hash: "hash_rp_001", + FirstSeen: now.Add(-48 * time.Hour).UTC().Format(time.RFC3339), + } + rpPtr := &relayPK + obs := &StoreObs{ + ID: 100, + TransmissionID: 1, + ObserverID: "obs0", + Timestamp: tx.FirstSeen, + ResolvedPath: []*string{rpPtr}, + } + tx.Observations = append(tx.Observations, obs) + tx.ResolvedPath = []*string{rpPtr} + + store.packets = append(store.packets, tx) + store.byHash[tx.Hash] = tx + store.byTxID[tx.ID] = tx + store.byObsID[obs.ID] = obs + store.byObserver["obs0"] = append(store.byObserver["obs0"], obs) + + // Index via resolved_path + store.indexByNode(tx) + + // Verify indexed + if len(store.byNode[relayPK]) != 1 { + t.Fatalf("expected 1 entry in byNode[%s], got %d", relayPK, len(store.byNode[relayPK])) + } + if !store.nodeHashes[relayPK][tx.Hash] { + t.Fatalf("expected nodeHashes[%s] to contain %s", relayPK, tx.Hash) + } + + evicted := store.EvictStale() + if evicted != 1 { + t.Fatalf("expected 1 evicted, got %d", evicted) + } + + // Verify resolved_path entries are cleaned up + if len(store.byNode[relayPK]) != 0 { + t.Fatalf("expected byNode[%s] to be empty after eviction, got %d", relayPK, len(store.byNode[relayPK])) + } + if _, exists := store.nodeHashes[relayPK]; exists { + t.Fatalf("expected nodeHashes[%s] to be deleted after eviction", relayPK) + } +} + func TestEvictStale_RunEvictionThreadSafe(t *testing.T) { now := time.Now().UTC() store := makeTestStore(20, now.Add(-48*time.Hour), 0) diff --git a/cmd/server/store.go b/cmd/server/store.go index f86cef5..e56a5c8 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -193,6 +193,10 @@ type PacketStore struct { // Updated incrementally during Load/Ingest/Evict — avoids JSON parsing in GetPerfStoreStats. advertPubkeys map[string]int // pubkey → number of advert packets referencing it + // Debounce map for touchRelayLastSeen: pubkey → last time we wrote last_seen to DB. + // Limits DB writes to at most 1 per node per 5 minutes. + lastSeenTouched map[string]time.Time + // Persisted neighbor graph for hop resolution at ingest time. graph *NeighborGraph @@ -297,7 +301,8 @@ func NewPacketStore(db *DB, cfg *PacketStoreConfig, cacheTTLs ...map[string]inte invCooldown: 10 * time.Second, spIndex: make(map[string]int, 4096), spTxIndex: make(map[string][]*StoreTx, 4096), - advertPubkeys: make(map[string]int), + advertPubkeys: make(map[string]int), + lastSeenTouched: make(map[string]time.Time), } if cfg != nil { ps.retentionHours = cfg.RetentionHours @@ -512,28 +517,103 @@ func pathLen(pathJSON string) int { // indexByNode extracts pubkeys from decoded_json and indexes the transmission. func (s *PacketStore) indexByNode(tx *StoreTx) { - if tx.DecodedJSON == "" { - return - } - // All three target fields ("pubKey", "destPubKey", "srcPubKey") share the - // common suffix "ubKey" — skip JSON parse for packets that have none of them. - if !strings.Contains(tx.DecodedJSON, "ubKey") { - return - } - decoded := tx.ParsedDecoded() - if decoded == nil { - return - } - for _, field := range []string{"pubKey", "destPubKey", "srcPubKey"} { - if v, ok := decoded[field].(string); ok && v != "" { - if s.nodeHashes[v] == nil { - s.nodeHashes[v] = make(map[string]bool) + // Track which pubkeys have been indexed for this packet to avoid duplicates + // when the same pubkey appears in both decoded JSON and resolved path. + indexed := make(map[string]bool) + + // Index by decoded JSON fields (pubKey, destPubKey, srcPubKey). + if tx.DecodedJSON != "" && strings.Contains(tx.DecodedJSON, "ubKey") { + if decoded := tx.ParsedDecoded(); decoded != nil { + for _, field := range []string{"pubKey", "destPubKey", "srcPubKey"} { + if v, ok := decoded[field].(string); ok && v != "" { + s.addToByNode(tx, v) + indexed[v] = true + } } - if s.nodeHashes[v][tx.Hash] { + } + } + + // Index by resolved path entries — relay nodes that forwarded this packet. + for _, obs := range tx.Observations { + for _, rp := range obs.ResolvedPath { + if rp == nil { continue } - s.nodeHashes[v][tx.Hash] = true - s.byNode[v] = append(s.byNode[v], tx) + pk := *rp + if pk == "" || indexed[pk] { + continue + } + s.addToByNode(tx, pk) + indexed[pk] = true + } + } + // Also check tx.ResolvedPath (best observation's resolved path) for packets + // loaded from DB where Observations may be empty. + for _, rp := range tx.ResolvedPath { + if rp == nil { + continue + } + pk := *rp + if pk == "" || indexed[pk] { + continue + } + s.addToByNode(tx, pk) + indexed[pk] = true + } +} + +// addToByNode adds tx to byNode[pubkey] with dedup via nodeHashes. +func (s *PacketStore) addToByNode(tx *StoreTx, pubkey string) { + if s.nodeHashes[pubkey] == nil { + s.nodeHashes[pubkey] = make(map[string]bool) + } + if s.nodeHashes[pubkey][tx.Hash] { + return + } + s.nodeHashes[pubkey][tx.Hash] = true + s.byNode[pubkey] = append(s.byNode[pubkey], tx) +} + +// touchRelayLastSeen updates last_seen in the DB for relay nodes that appear +// in resolved_path entries. Debounced to at most 1 write per node per 5 minutes. +// Must be called under s.mu write lock (reads/writes lastSeenTouched). +func (s *PacketStore) touchRelayLastSeen(tx *StoreTx, now time.Time) { + if s.db == nil { + return + } + const debounceInterval = 5 * time.Minute + + seen := make(map[string]bool) + // Collect unique non-nil resolved pubkeys from all observations. + for _, obs := range tx.Observations { + for _, rp := range obs.ResolvedPath { + if rp == nil { + continue + } + pk := *rp + if pk != "" { + seen[pk] = true + } + } + } + // Also check tx.ResolvedPath (best observation, used after Load). + for _, rp := range tx.ResolvedPath { + if rp == nil { + continue + } + pk := *rp + if pk != "" { + seen[pk] = true + } + } + + ts := now.UTC().Format(time.RFC3339) + for pk := range seen { + if last, ok := s.lastSeenTouched[pk]; ok && now.Sub(last) < debounceInterval { + continue + } + if err := s.db.TouchNodeLastSeen(pk, ts); err == nil { + s.lastSeenTouched[pk] = now } } } @@ -1358,6 +1438,12 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac pickBestObservation(tx) } + // Phase 2 of #660: update last_seen in DB for relay nodes seen in resolved_path. + now := time.Now() + for _, tx := range broadcastTxs { + s.touchRelayLastSeen(tx, now) + } + // Incrementally update precomputed subpath index with new transmissions for _, tx := range broadcastTxs { if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) { @@ -2563,7 +2649,9 @@ func (s *PacketStore) EvictStale() int { affectedPayloadTypes[*tx.PayloadType] = struct{}{} } - // Remove from nodeHashes and collect affected node keys + // Remove from nodeHashes and collect affected node keys. + // Must mirror indexByNode: process decoded JSON fields AND resolved_path pubkeys. + evictedFromNode := make(map[string]bool) if tx.DecodedJSON != "" { var decoded map[string]interface{} if json.Unmarshal([]byte(tx.DecodedJSON), &decoded) == nil { @@ -2576,10 +2664,48 @@ func (s *PacketStore) EvictStale() int { } } affectedNodes[v] = struct{}{} + evictedFromNode[v] = true } } } } + // Clean up resolved_path pubkeys from byNode/nodeHashes + for _, obs := range tx.Observations { + for _, rp := range obs.ResolvedPath { + if rp == nil { + continue + } + pk := *rp + if pk == "" || evictedFromNode[pk] { + continue + } + if hashes, ok := s.nodeHashes[pk]; ok { + delete(hashes, tx.Hash) + if len(hashes) == 0 { + delete(s.nodeHashes, pk) + } + } + affectedNodes[pk] = struct{}{} + evictedFromNode[pk] = true + } + } + for _, rp := range tx.ResolvedPath { + if rp == nil { + continue + } + pk := *rp + if pk == "" || evictedFromNode[pk] { + continue + } + if hashes, ok := s.nodeHashes[pk]; ok { + delete(hashes, tx.Hash) + if len(hashes) == 0 { + delete(s.nodeHashes, pk) + } + } + affectedNodes[pk] = struct{}{} + evictedFromNode[pk] = true + } // Remove from subpath index removeTxFromSubpathIndexFull(s.spIndex, s.spTxIndex, tx) diff --git a/cmd/server/touch_last_seen_test.go b/cmd/server/touch_last_seen_test.go new file mode 100644 index 0000000..f4b2be6 --- /dev/null +++ b/cmd/server/touch_last_seen_test.go @@ -0,0 +1,137 @@ +package main + +import ( + "database/sql" + "testing" + "time" + + _ "modernc.org/sqlite" +) + +func TestTouchNodeLastSeen_UpdatesDB(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + // Insert a node with no last_seen + db.conn.Exec("INSERT INTO nodes (public_key, name, role) VALUES (?, ?, ?)", "abc123", "relay1", "REPEATER") + + err := db.TouchNodeLastSeen("abc123", "2026-04-12T04:00:00Z") + if err != nil { + t.Fatalf("TouchNodeLastSeen returned error: %v", err) + } + + var lastSeen sql.NullString + db.conn.QueryRow("SELECT last_seen FROM nodes WHERE public_key = ?", "abc123").Scan(&lastSeen) + if !lastSeen.Valid || lastSeen.String != "2026-04-12T04:00:00Z" { + t.Fatalf("expected last_seen=2026-04-12T04:00:00Z, got %v", lastSeen) + } +} + +func TestTouchNodeLastSeen_DoesNotGoBackwards(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", + "abc123", "relay1", "REPEATER", "2026-04-12T05:00:00Z") + + // Try to set an older timestamp + err := db.TouchNodeLastSeen("abc123", "2026-04-12T04:00:00Z") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var lastSeen string + db.conn.QueryRow("SELECT last_seen FROM nodes WHERE public_key = ?", "abc123").Scan(&lastSeen) + if lastSeen != "2026-04-12T05:00:00Z" { + t.Fatalf("last_seen went backwards: got %s", lastSeen) + } +} + +func TestTouchNodeLastSeen_NonExistentNode(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + // Should not error for non-existent node + err := db.TouchNodeLastSeen("nonexistent", "2026-04-12T04:00:00Z") + if err != nil { + t.Fatalf("unexpected error for non-existent node: %v", err) + } +} + +func TestTouchRelayLastSeen_Debouncing(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + db.conn.Exec("INSERT INTO nodes (public_key, name, role) VALUES (?, ?, ?)", "relay1", "R1", "REPEATER") + + s := &PacketStore{ + db: db, + lastSeenTouched: make(map[string]time.Time), + } + + pk := "relay1" + tx := &StoreTx{ + ResolvedPath: []*string{&pk}, + } + + now := time.Now() + s.touchRelayLastSeen(tx, now) + + // Verify it was written + var lastSeen sql.NullString + db.conn.QueryRow("SELECT last_seen FROM nodes WHERE public_key = ?", "relay1").Scan(&lastSeen) + if !lastSeen.Valid { + t.Fatal("expected last_seen to be set after first touch") + } + + // Reset last_seen to check debounce prevents second write + db.conn.Exec("UPDATE nodes SET last_seen = NULL WHERE public_key = ?", "relay1") + + // Call again within 5 minutes — should be debounced (no write) + s.touchRelayLastSeen(tx, now.Add(2*time.Minute)) + + db.conn.QueryRow("SELECT last_seen FROM nodes WHERE public_key = ?", "relay1").Scan(&lastSeen) + if lastSeen.Valid { + t.Fatal("expected debounce to prevent second write within 5 minutes") + } + + // Call after 5 minutes — should write again + s.touchRelayLastSeen(tx, now.Add(6*time.Minute)) + db.conn.QueryRow("SELECT last_seen FROM nodes WHERE public_key = ?", "relay1").Scan(&lastSeen) + if !lastSeen.Valid { + t.Fatal("expected write after debounce interval expired") + } +} + +func TestTouchRelayLastSeen_SkipsNilResolvedPath(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + s := &PacketStore{ + db: db, + lastSeenTouched: make(map[string]time.Time), + } + + // tx with nil entries and empty resolved_path + tx := &StoreTx{ + ResolvedPath: []*string{nil, nil}, + } + + // Should not panic or error + s.touchRelayLastSeen(tx, time.Now()) +} + +func TestTouchRelayLastSeen_NilDB(t *testing.T) { + s := &PacketStore{ + db: nil, + lastSeenTouched: make(map[string]time.Time), + } + + pk := "abc" + tx := &StoreTx{ + ResolvedPath: []*string{&pk}, + } + + // Should not panic with nil db + s.touchRelayLastSeen(tx, time.Now()) +}