From cb8a2e15c8f769f1259873f85e0e4a786223fc48 Mon Sep 17 00:00:00 2001
From: Kpa-clawbot
Date: Sat, 4 Apr 2026 09:25:18 -0700
Subject: [PATCH] perf: index node path lookups instead of scanning all packets (#572)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Summary

Index node path lookups in `handleNodePaths()` instead of scanning all packets on every request.

## Problem

`handleNodePaths()` iterated ALL packets in the store (`O(total_packets × avg_hops)`) with prefix string matching on every hop. This caused user-facing latency on every node detail page load with 30K+ packets.

## Fix

Added a `byPathHop` index (`map[string][]*StoreTx`) that maps lowercase hop prefixes and resolved full pubkeys to their transmissions. The handler now does direct map lookups instead of a full scan.

### Index lifecycle

- **Built** during `Load()` via `buildPathHopIndex()`
- **Incrementally updated** during `IngestNewFromDB()` (new packets) and `IngestNewObservations()` (path changes)
- **Cleaned up** during `EvictStale()` (packet removal)

### Query strategy

The handler looks up candidates from the index using:

1. Full pubkey (matches resolved hops from `resolved_path`)
2. 2-char prefix (matches short raw hops)
3. 4-char prefix (matches medium raw hops)
4. Any longer raw hops starting with the 4-char prefix

This reduces complexity from `O(total_packets × avg_hops)` to `O(matching_txs + unique_hop_keys)`.

## Tests

- `TestNodePathsEndpointUsesIndex` — verifies the endpoint returns correct results using the index
- `TestPathHopIndexIncrementalUpdate` — verifies add/remove operations on the index

All existing tests pass.
Fixes #359 Co-authored-by: you --- cmd/server/routes.go | 57 ++++++++++++++--------- cmd/server/routes_test.go | 90 +++++++++++++++++++++++++++++++++++++ cmd/server/store.go | 95 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 220 insertions(+), 22 deletions(-) diff --git a/cmd/server/routes.go b/cmd/server/routes.go index ef0b579e..591b2ae1 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -1065,16 +1065,44 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) { return } - prefix1 := strings.ToLower(pubkey) - if len(prefix1) > 2 { - prefix1 = prefix1[:2] - } - prefix2 := strings.ToLower(pubkey) + // Use the precomputed byPathHop index instead of scanning all packets. + // Look up by full pubkey (resolved hops) and by short prefixes (raw hops). + lowerPK := strings.ToLower(pubkey) + prefix2 := lowerPK if len(prefix2) > 4 { prefix2 = prefix2[:4] } + prefix1 := lowerPK + if len(prefix1) > 2 { + prefix1 = prefix1[:2] + } + s.store.mu.RLock() _, pm := s.store.getCachedNodesAndPM() + + // Collect candidate transmissions from the index, deduplicating by tx ID. + seen := make(map[int]bool) + var candidates []*StoreTx + addCandidates := func(key string) { + for _, tx := range s.store.byPathHop[key] { + if !seen[tx.ID] { + seen[tx.ID] = true + candidates = append(candidates, tx) + } + } + } + addCandidates(lowerPK) // full pubkey match (from resolved_path) + addCandidates(prefix1) // 2-char raw hop match + addCandidates(prefix2) // 4-char raw hop match + // Also check any raw hops that start with prefix2 (longer prefixes). + // Raw hops are typically 2 chars, so iterate only keys with HasPrefix + // on the small set of index keys rather than all packets. 
+ for key := range s.store.byPathHop { + if len(key) > 4 && len(key) < len(lowerPK) && strings.HasPrefix(key, prefix2) { + addCandidates(key) + } + } + type pathAgg struct { Hops []PathHopResp Count int @@ -1092,24 +1120,9 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) { hopCache[hop] = r return r } - for _, tx := range s.store.packets { - hops := txGetParsedPath(tx) - if len(hops) == 0 { - continue - } - found := false - for _, hop := range hops { - hl := strings.ToLower(hop) - if hl == prefix1 || hl == prefix2 || strings.HasPrefix(hl, prefix2) { - found = true - break - } - } - if !found { - continue - } - + for _, tx := range candidates { totalTransmissions++ + hops := txGetParsedPath(tx) resolvedHops := make([]PathHopResp, len(hops)) sigParts := make([]string, len(hops)) for i, hop := range hops { diff --git a/cmd/server/routes_test.go b/cmd/server/routes_test.go index 6d490ae4..cceda17f 100644 --- a/cmd/server/routes_test.go +++ b/cmd/server/routes_test.go @@ -3431,3 +3431,93 @@ func TestHashCollisionsOnlyRepeaters(t *testing.T) { t.Errorf("expected 2 nodes in collision, got %d", len(collisions[0].Nodes)) } } + +func TestNodePathsEndpointUsesIndex(t *testing.T) { + srv, router := setupTestServer(t) + + // Verify byPathHop index was built during Load + srv.store.mu.RLock() + hopKeys := len(srv.store.byPathHop) + srv.store.mu.RUnlock() + if hopKeys == 0 { + t.Fatal("byPathHop index is empty after Load") + } + + // Query paths for TestRepeater (pubkey aabbccdd11223344, prefix "aa") + // Should find transmissions with hop "aa" in path + req := httptest.NewRequest("GET", "/api/nodes/aabbccdd11223344/paths", nil) + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + if w.Code != 200 { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp struct { + Paths []json.RawMessage `json:"paths"` + TotalTransmissions int `json:"totalTransmissions"` + } + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil 
{ + t.Fatalf("bad JSON: %v", err) + } + + // Transmission 1 has path ["aa","bb"] which contains "aa" matching prefix of aabbccdd11223344 + if resp.TotalTransmissions == 0 { + t.Error("expected at least 1 transmission matching node paths") + } + if len(resp.Paths) == 0 { + t.Error("expected at least 1 path group") + } +} + +func TestPathHopIndexIncrementalUpdate(t *testing.T) { + // Test that addTxToPathHopIndex and removeTxFromPathHopIndex work correctly + idx := make(map[string][]*StoreTx) + + pk1 := "fullpubkey1" + tx1 := &StoreTx{ + ID: 1, + PathJSON: `["ab","cd"]`, + ResolvedPath: []*string{&pk1, nil}, + } + + addTxToPathHopIndex(idx, tx1) + + // Should be indexed under "ab", "cd", and "fullpubkey1" + if len(idx["ab"]) != 1 { + t.Errorf("expected 1 entry for 'ab', got %d", len(idx["ab"])) + } + if len(idx["cd"]) != 1 { + t.Errorf("expected 1 entry for 'cd', got %d", len(idx["cd"])) + } + if len(idx["fullpubkey1"]) != 1 { + t.Errorf("expected 1 entry for resolved pubkey, got %d", len(idx["fullpubkey1"])) + } + + // Add another tx with overlapping hop + tx2 := &StoreTx{ + ID: 2, + PathJSON: `["ab","ef"]`, + } + addTxToPathHopIndex(idx, tx2) + + if len(idx["ab"]) != 2 { + t.Errorf("expected 2 entries for 'ab', got %d", len(idx["ab"])) + } + if len(idx["ef"]) != 1 { + t.Errorf("expected 1 entry for 'ef', got %d", len(idx["ef"])) + } + + // Remove tx1 + removeTxFromPathHopIndex(idx, tx1) + + if len(idx["ab"]) != 1 { + t.Errorf("expected 1 entry for 'ab' after removal, got %d", len(idx["ab"])) + } + if _, ok := idx["cd"]; ok { + t.Error("expected 'cd' key to be deleted after removal") + } + if _, ok := idx["fullpubkey1"]; ok { + t.Error("expected resolved pubkey key to be deleted after removal") + } +} diff --git a/cmd/server/store.go b/cmd/server/store.go index 3237887c..08b95cc2 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -94,6 +94,7 @@ type PacketStore struct { byObserver map[string][]*StoreObs // observer_id → observations byNode 
map[string][]*StoreTx // pubkey → transmissions nodeHashes map[string]map[string]bool // pubkey → Set + byPathHop map[string][]*StoreTx // lowercase hop/pubkey → transmissions with that hop in path byPayloadType map[int][]*StoreTx // payload_type → transmissions loaded bool totalObs int @@ -208,6 +209,7 @@ func NewPacketStore(db *DB, cfg *PacketStoreConfig) *PacketStore { byObsID: make(map[int]*StoreObs, 65536), byObserver: make(map[string][]*StoreObs), byNode: make(map[string][]*StoreTx), + byPathHop: make(map[string][]*StoreTx), nodeHashes: make(map[string]map[string]bool), byPayloadType: make(map[int][]*StoreTx), rfCache: make(map[string]*cachedResult), @@ -371,6 +373,9 @@ func (s *PacketStore) Load() error { // Build precomputed subpath index for O(1) analytics queries s.buildSubpathIndex() + // Build path-hop index for O(1) node path lookups + s.buildPathHopIndex() + // Precompute distance analytics (hop distances, path totals) s.buildDistanceIndex() s.distLast = time.Now() @@ -680,6 +685,7 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} { obsIdx := len(s.byObsID) observerIdx := len(s.byObserver) nodeIdx := len(s.byNode) + pathHopIdx := len(s.byPathHop) ptIdx := len(s.byPayloadType) // Distinct advert pubkey count — precomputed incrementally (see trackAdvertPubkey). 
@@ -707,6 +713,7 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} { "byObsID": obsIdx, "byObserver": observerIdx, "byNode": nodeIdx, + "byPathHop": pathHopIdx, "byPayloadType": ptIdx, "advertByObserver": advertByObsCount, }, @@ -1237,6 +1244,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac if addTxToSubpathIndex(s.spIndex, tx) { s.spTotalPaths++ } + addTxToPathHopIndex(s.byPathHop, tx) } // Incrementally update precomputed distance index with new transmissions @@ -1565,8 +1573,10 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] // Re-pick best observation for updated transmissions and update subpath index // if the path changed. oldPaths := make(map[int]string, len(updatedTxs)) + oldResolvedPaths := make(map[int][]*string, len(updatedTxs)) for txID, tx := range updatedTxs { oldPaths[txID] = tx.PathJSON + oldResolvedPaths[txID] = tx.ResolvedPath } for _, tx := range updatedTxs { pickBestObservation(tx) @@ -1584,11 +1594,22 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] } tx.parsedPath, tx.pathParsed = saved, savedFlag } + // Remove old path-hop index entries using old hops + old resolved path. + if len(oldHops) > 0 { + saved, savedFlag := tx.parsedPath, tx.pathParsed + savedRP := tx.ResolvedPath + tx.parsedPath, tx.pathParsed = oldHops, true + tx.ResolvedPath = oldResolvedPaths[txID] + removeTxFromPathHopIndex(s.byPathHop, tx) + tx.parsedPath, tx.pathParsed = saved, savedFlag + tx.ResolvedPath = savedRP + } // pickBestObservation already set pathParsed=false so // addTxToSubpathIndex will re-parse the new path. if addTxToSubpathIndex(s.spIndex, tx) { s.spTotalPaths++ } + addTxToPathHopIndex(s.byPathHop, tx) } } @@ -2006,6 +2027,78 @@ func (s *PacketStore) buildSubpathIndex() { len(s.spIndex), s.spTotalPaths) } +// buildPathHopIndex scans all packets and populates byPathHop. +// Must be called with s.mu held. 
+func (s *PacketStore) buildPathHopIndex() { + s.byPathHop = make(map[string][]*StoreTx, 4096) + for _, tx := range s.packets { + addTxToPathHopIndex(s.byPathHop, tx) + } + log.Printf("[store] Built path-hop index: %d unique keys", len(s.byPathHop)) +} + +// addTxToPathHopIndex indexes a transmission under each unique hop key +// (raw lowercase hop + resolved full pubkey from ResolvedPath). +func addTxToPathHopIndex(idx map[string][]*StoreTx, tx *StoreTx) { + hops := txGetParsedPath(tx) + if len(hops) == 0 { + return + } + seen := make(map[string]bool, len(hops)*2) + for i, hop := range hops { + key := strings.ToLower(hop) + if !seen[key] { + seen[key] = true + idx[key] = append(idx[key], tx) + } + // Also index by resolved pubkey if available + if tx.ResolvedPath != nil && i < len(tx.ResolvedPath) && tx.ResolvedPath[i] != nil { + pk := *tx.ResolvedPath[i] + if !seen[pk] { + seen[pk] = true + idx[pk] = append(idx[pk], tx) + } + } + } +} + +// removeTxFromPathHopIndex removes a transmission from all its path-hop index entries. +func removeTxFromPathHopIndex(idx map[string][]*StoreTx, tx *StoreTx) { + hops := txGetParsedPath(tx) + if len(hops) == 0 { + return + } + seen := make(map[string]bool, len(hops)*2) + for i, hop := range hops { + key := strings.ToLower(hop) + if !seen[key] { + seen[key] = true + removeTxFromSlice(idx, key, tx) + } + if tx.ResolvedPath != nil && i < len(tx.ResolvedPath) && tx.ResolvedPath[i] != nil { + pk := *tx.ResolvedPath[i] + if !seen[pk] { + seen[pk] = true + removeTxFromSlice(idx, pk, tx) + } + } + } +} + +// removeTxFromSlice removes tx from idx[key] by ID, deleting the key if empty. +func removeTxFromSlice(idx map[string][]*StoreTx, key string, tx *StoreTx) { + list := idx[key] + for i, t := range list { + if t.ID == tx.ID { + idx[key] = append(list[:i], list[i+1:]...) + break + } + } + if len(idx[key]) == 0 { + delete(idx, key) + } +} + // buildDistanceIndex precomputes haversine distances for all packets. 
// Must be called with s.mu held (Lock). func (s *PacketStore) buildDistanceIndex() { @@ -2182,6 +2275,8 @@ func (s *PacketStore) EvictStale() int { // Remove from subpath index removeTxFromSubpathIndex(s.spIndex, tx) + // Remove from path-hop index + removeTxFromPathHopIndex(s.byPathHop, tx) } // Remove from distance indexes — filter out records referencing evicted txs