mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-25 22:14:02 +00:00
perf: index node path lookups instead of scanning all packets (#572)
## Summary Index node path lookups in `handleNodePaths()` instead of scanning all packets on every request. ## Problem `handleNodePaths()` iterated ALL packets in the store (`O(total_packets × avg_hops)`) with prefix string matching on every hop. This caused user-facing latency on every node detail page load with 30K+ packets. ## Fix Added a `byPathHop` index (`map[string][]*StoreTx`) that maps lowercase hop prefixes and resolved full pubkeys to their transmissions. The handler now does direct map lookups instead of a full scan. ### Index lifecycle - **Built** during `Load()` via `buildPathHopIndex()` - **Incrementally updated** during `IngestNewFromDB()` (new packets) and `IngestNewObservations()` (path changes) - **Cleaned up** during `EvictStale()` (packet removal) ### Query strategy The handler looks up candidates from the index using: 1. Full pubkey (matches resolved hops from `resolved_path`) 2. 2-char prefix (matches short raw hops) 3. 4-char prefix (matches medium raw hops) 4. Any longer raw hops starting with the 4-char prefix This reduces complexity from `O(total_packets × avg_hops)` to `O(matching_txs + unique_hop_keys)`. ## Tests - `TestNodePathsEndpointUsesIndex` — verifies the endpoint returns correct results using the index - `TestPathHopIndexIncrementalUpdate` — verifies add/remove operations on the index All existing tests pass. Fixes #359 Co-authored-by: you <you@example.com>
This commit is contained in:
+35
-22
@@ -1065,16 +1065,44 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
prefix1 := strings.ToLower(pubkey)
|
||||
if len(prefix1) > 2 {
|
||||
prefix1 = prefix1[:2]
|
||||
}
|
||||
prefix2 := strings.ToLower(pubkey)
|
||||
// Use the precomputed byPathHop index instead of scanning all packets.
|
||||
// Look up by full pubkey (resolved hops) and by short prefixes (raw hops).
|
||||
lowerPK := strings.ToLower(pubkey)
|
||||
prefix2 := lowerPK
|
||||
if len(prefix2) > 4 {
|
||||
prefix2 = prefix2[:4]
|
||||
}
|
||||
prefix1 := lowerPK
|
||||
if len(prefix1) > 2 {
|
||||
prefix1 = prefix1[:2]
|
||||
}
|
||||
|
||||
s.store.mu.RLock()
|
||||
_, pm := s.store.getCachedNodesAndPM()
|
||||
|
||||
// Collect candidate transmissions from the index, deduplicating by tx ID.
|
||||
seen := make(map[int]bool)
|
||||
var candidates []*StoreTx
|
||||
addCandidates := func(key string) {
|
||||
for _, tx := range s.store.byPathHop[key] {
|
||||
if !seen[tx.ID] {
|
||||
seen[tx.ID] = true
|
||||
candidates = append(candidates, tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
addCandidates(lowerPK) // full pubkey match (from resolved_path)
|
||||
addCandidates(prefix1) // 2-char raw hop match
|
||||
addCandidates(prefix2) // 4-char raw hop match
|
||||
// Also check any raw hops that start with prefix2 (longer prefixes).
|
||||
// Raw hops are typically 2 chars, so iterate only keys with HasPrefix
|
||||
// on the small set of index keys rather than all packets.
|
||||
for key := range s.store.byPathHop {
|
||||
if len(key) > 4 && len(key) < len(lowerPK) && strings.HasPrefix(key, prefix2) {
|
||||
addCandidates(key)
|
||||
}
|
||||
}
|
||||
|
||||
type pathAgg struct {
|
||||
Hops []PathHopResp
|
||||
Count int
|
||||
@@ -1092,24 +1120,9 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
|
||||
hopCache[hop] = r
|
||||
return r
|
||||
}
|
||||
for _, tx := range s.store.packets {
|
||||
hops := txGetParsedPath(tx)
|
||||
if len(hops) == 0 {
|
||||
continue
|
||||
}
|
||||
found := false
|
||||
for _, hop := range hops {
|
||||
hl := strings.ToLower(hop)
|
||||
if hl == prefix1 || hl == prefix2 || strings.HasPrefix(hl, prefix2) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, tx := range candidates {
|
||||
totalTransmissions++
|
||||
hops := txGetParsedPath(tx)
|
||||
resolvedHops := make([]PathHopResp, len(hops))
|
||||
sigParts := make([]string, len(hops))
|
||||
for i, hop := range hops {
|
||||
|
||||
@@ -3431,3 +3431,93 @@ func TestHashCollisionsOnlyRepeaters(t *testing.T) {
|
||||
t.Errorf("expected 2 nodes in collision, got %d", len(collisions[0].Nodes))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodePathsEndpointUsesIndex(t *testing.T) {
|
||||
srv, router := setupTestServer(t)
|
||||
|
||||
// Verify byPathHop index was built during Load
|
||||
srv.store.mu.RLock()
|
||||
hopKeys := len(srv.store.byPathHop)
|
||||
srv.store.mu.RUnlock()
|
||||
if hopKeys == 0 {
|
||||
t.Fatal("byPathHop index is empty after Load")
|
||||
}
|
||||
|
||||
// Query paths for TestRepeater (pubkey aabbccdd11223344, prefix "aa")
|
||||
// Should find transmissions with hop "aa" in path
|
||||
req := httptest.NewRequest("GET", "/api/nodes/aabbccdd11223344/paths", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp struct {
|
||||
Paths []json.RawMessage `json:"paths"`
|
||||
TotalTransmissions int `json:"totalTransmissions"`
|
||||
}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("bad JSON: %v", err)
|
||||
}
|
||||
|
||||
// Transmission 1 has path ["aa","bb"] which contains "aa" matching prefix of aabbccdd11223344
|
||||
if resp.TotalTransmissions == 0 {
|
||||
t.Error("expected at least 1 transmission matching node paths")
|
||||
}
|
||||
if len(resp.Paths) == 0 {
|
||||
t.Error("expected at least 1 path group")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPathHopIndexIncrementalUpdate(t *testing.T) {
|
||||
// Test that addTxToPathHopIndex and removeTxFromPathHopIndex work correctly
|
||||
idx := make(map[string][]*StoreTx)
|
||||
|
||||
pk1 := "fullpubkey1"
|
||||
tx1 := &StoreTx{
|
||||
ID: 1,
|
||||
PathJSON: `["ab","cd"]`,
|
||||
ResolvedPath: []*string{&pk1, nil},
|
||||
}
|
||||
|
||||
addTxToPathHopIndex(idx, tx1)
|
||||
|
||||
// Should be indexed under "ab", "cd", and "fullpubkey1"
|
||||
if len(idx["ab"]) != 1 {
|
||||
t.Errorf("expected 1 entry for 'ab', got %d", len(idx["ab"]))
|
||||
}
|
||||
if len(idx["cd"]) != 1 {
|
||||
t.Errorf("expected 1 entry for 'cd', got %d", len(idx["cd"]))
|
||||
}
|
||||
if len(idx["fullpubkey1"]) != 1 {
|
||||
t.Errorf("expected 1 entry for resolved pubkey, got %d", len(idx["fullpubkey1"]))
|
||||
}
|
||||
|
||||
// Add another tx with overlapping hop
|
||||
tx2 := &StoreTx{
|
||||
ID: 2,
|
||||
PathJSON: `["ab","ef"]`,
|
||||
}
|
||||
addTxToPathHopIndex(idx, tx2)
|
||||
|
||||
if len(idx["ab"]) != 2 {
|
||||
t.Errorf("expected 2 entries for 'ab', got %d", len(idx["ab"]))
|
||||
}
|
||||
if len(idx["ef"]) != 1 {
|
||||
t.Errorf("expected 1 entry for 'ef', got %d", len(idx["ef"]))
|
||||
}
|
||||
|
||||
// Remove tx1
|
||||
removeTxFromPathHopIndex(idx, tx1)
|
||||
|
||||
if len(idx["ab"]) != 1 {
|
||||
t.Errorf("expected 1 entry for 'ab' after removal, got %d", len(idx["ab"]))
|
||||
}
|
||||
if _, ok := idx["cd"]; ok {
|
||||
t.Error("expected 'cd' key to be deleted after removal")
|
||||
}
|
||||
if _, ok := idx["fullpubkey1"]; ok {
|
||||
t.Error("expected resolved pubkey key to be deleted after removal")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,6 +94,7 @@ type PacketStore struct {
|
||||
byObserver map[string][]*StoreObs // observer_id → observations
|
||||
byNode map[string][]*StoreTx // pubkey → transmissions
|
||||
nodeHashes map[string]map[string]bool // pubkey → Set<hash>
|
||||
byPathHop map[string][]*StoreTx // lowercase hop/pubkey → transmissions with that hop in path
|
||||
byPayloadType map[int][]*StoreTx // payload_type → transmissions
|
||||
loaded bool
|
||||
totalObs int
|
||||
@@ -208,6 +209,7 @@ func NewPacketStore(db *DB, cfg *PacketStoreConfig) *PacketStore {
|
||||
byObsID: make(map[int]*StoreObs, 65536),
|
||||
byObserver: make(map[string][]*StoreObs),
|
||||
byNode: make(map[string][]*StoreTx),
|
||||
byPathHop: make(map[string][]*StoreTx),
|
||||
nodeHashes: make(map[string]map[string]bool),
|
||||
byPayloadType: make(map[int][]*StoreTx),
|
||||
rfCache: make(map[string]*cachedResult),
|
||||
@@ -371,6 +373,9 @@ func (s *PacketStore) Load() error {
|
||||
// Build precomputed subpath index for O(1) analytics queries
|
||||
s.buildSubpathIndex()
|
||||
|
||||
// Build path-hop index for O(1) node path lookups
|
||||
s.buildPathHopIndex()
|
||||
|
||||
// Precompute distance analytics (hop distances, path totals)
|
||||
s.buildDistanceIndex()
|
||||
s.distLast = time.Now()
|
||||
@@ -680,6 +685,7 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} {
|
||||
obsIdx := len(s.byObsID)
|
||||
observerIdx := len(s.byObserver)
|
||||
nodeIdx := len(s.byNode)
|
||||
pathHopIdx := len(s.byPathHop)
|
||||
ptIdx := len(s.byPayloadType)
|
||||
|
||||
// Distinct advert pubkey count — precomputed incrementally (see trackAdvertPubkey).
|
||||
@@ -707,6 +713,7 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} {
|
||||
"byObsID": obsIdx,
|
||||
"byObserver": observerIdx,
|
||||
"byNode": nodeIdx,
|
||||
"byPathHop": pathHopIdx,
|
||||
"byPayloadType": ptIdx,
|
||||
"advertByObserver": advertByObsCount,
|
||||
},
|
||||
@@ -1237,6 +1244,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
|
||||
if addTxToSubpathIndex(s.spIndex, tx) {
|
||||
s.spTotalPaths++
|
||||
}
|
||||
addTxToPathHopIndex(s.byPathHop, tx)
|
||||
}
|
||||
|
||||
// Incrementally update precomputed distance index with new transmissions
|
||||
@@ -1565,8 +1573,10 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
|
||||
// Re-pick best observation for updated transmissions and update subpath index
|
||||
// if the path changed.
|
||||
oldPaths := make(map[int]string, len(updatedTxs))
|
||||
oldResolvedPaths := make(map[int][]*string, len(updatedTxs))
|
||||
for txID, tx := range updatedTxs {
|
||||
oldPaths[txID] = tx.PathJSON
|
||||
oldResolvedPaths[txID] = tx.ResolvedPath
|
||||
}
|
||||
for _, tx := range updatedTxs {
|
||||
pickBestObservation(tx)
|
||||
@@ -1584,11 +1594,22 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
|
||||
}
|
||||
tx.parsedPath, tx.pathParsed = saved, savedFlag
|
||||
}
|
||||
// Remove old path-hop index entries using old hops + old resolved path.
|
||||
if len(oldHops) > 0 {
|
||||
saved, savedFlag := tx.parsedPath, tx.pathParsed
|
||||
savedRP := tx.ResolvedPath
|
||||
tx.parsedPath, tx.pathParsed = oldHops, true
|
||||
tx.ResolvedPath = oldResolvedPaths[txID]
|
||||
removeTxFromPathHopIndex(s.byPathHop, tx)
|
||||
tx.parsedPath, tx.pathParsed = saved, savedFlag
|
||||
tx.ResolvedPath = savedRP
|
||||
}
|
||||
// pickBestObservation already set pathParsed=false so
|
||||
// addTxToSubpathIndex will re-parse the new path.
|
||||
if addTxToSubpathIndex(s.spIndex, tx) {
|
||||
s.spTotalPaths++
|
||||
}
|
||||
addTxToPathHopIndex(s.byPathHop, tx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2006,6 +2027,78 @@ func (s *PacketStore) buildSubpathIndex() {
|
||||
len(s.spIndex), s.spTotalPaths)
|
||||
}
|
||||
|
||||
// buildPathHopIndex scans all packets and populates byPathHop.
|
||||
// Must be called with s.mu held.
|
||||
func (s *PacketStore) buildPathHopIndex() {
|
||||
s.byPathHop = make(map[string][]*StoreTx, 4096)
|
||||
for _, tx := range s.packets {
|
||||
addTxToPathHopIndex(s.byPathHop, tx)
|
||||
}
|
||||
log.Printf("[store] Built path-hop index: %d unique keys", len(s.byPathHop))
|
||||
}
|
||||
|
||||
// addTxToPathHopIndex indexes a transmission under each unique hop key
|
||||
// (raw lowercase hop + resolved full pubkey from ResolvedPath).
|
||||
func addTxToPathHopIndex(idx map[string][]*StoreTx, tx *StoreTx) {
|
||||
hops := txGetParsedPath(tx)
|
||||
if len(hops) == 0 {
|
||||
return
|
||||
}
|
||||
seen := make(map[string]bool, len(hops)*2)
|
||||
for i, hop := range hops {
|
||||
key := strings.ToLower(hop)
|
||||
if !seen[key] {
|
||||
seen[key] = true
|
||||
idx[key] = append(idx[key], tx)
|
||||
}
|
||||
// Also index by resolved pubkey if available
|
||||
if tx.ResolvedPath != nil && i < len(tx.ResolvedPath) && tx.ResolvedPath[i] != nil {
|
||||
pk := *tx.ResolvedPath[i]
|
||||
if !seen[pk] {
|
||||
seen[pk] = true
|
||||
idx[pk] = append(idx[pk], tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// removeTxFromPathHopIndex removes a transmission from all its path-hop index entries.
|
||||
func removeTxFromPathHopIndex(idx map[string][]*StoreTx, tx *StoreTx) {
|
||||
hops := txGetParsedPath(tx)
|
||||
if len(hops) == 0 {
|
||||
return
|
||||
}
|
||||
seen := make(map[string]bool, len(hops)*2)
|
||||
for i, hop := range hops {
|
||||
key := strings.ToLower(hop)
|
||||
if !seen[key] {
|
||||
seen[key] = true
|
||||
removeTxFromSlice(idx, key, tx)
|
||||
}
|
||||
if tx.ResolvedPath != nil && i < len(tx.ResolvedPath) && tx.ResolvedPath[i] != nil {
|
||||
pk := *tx.ResolvedPath[i]
|
||||
if !seen[pk] {
|
||||
seen[pk] = true
|
||||
removeTxFromSlice(idx, pk, tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// removeTxFromSlice removes tx from idx[key] by ID, deleting the key if empty.
|
||||
func removeTxFromSlice(idx map[string][]*StoreTx, key string, tx *StoreTx) {
|
||||
list := idx[key]
|
||||
for i, t := range list {
|
||||
if t.ID == tx.ID {
|
||||
idx[key] = append(list[:i], list[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(idx[key]) == 0 {
|
||||
delete(idx, key)
|
||||
}
|
||||
}
|
||||
|
||||
// buildDistanceIndex precomputes haversine distances for all packets.
|
||||
// Must be called with s.mu held (Lock).
|
||||
func (s *PacketStore) buildDistanceIndex() {
|
||||
@@ -2182,6 +2275,8 @@ func (s *PacketStore) EvictStale() int {
|
||||
|
||||
// Remove from subpath index
|
||||
removeTxFromSubpathIndex(s.spIndex, tx)
|
||||
// Remove from path-hop index
|
||||
removeTxFromPathHopIndex(s.byPathHop, tx)
|
||||
}
|
||||
|
||||
// Remove from distance indexes — filter out records referencing evicted txs
|
||||
|
||||
Reference in New Issue
Block a user