perf: index subpath detail lookups instead of scanning all packets (#571)

## Summary

`GetSubpathDetail()` iterated ALL packets to find those containing a
specific subpath — `O(packets × hops × subpath_length)`. With 30K+
packets this caused user-visible latency on every subpath detail click.

## Changes

### `cmd/server/store.go`
- Added `spTxIndex map[string][]*StoreTx` alongside existing `spIndex` —
tracks which transmissions contain each subpath key
- Extended `addTxToSubpathIndexFull()` and
`removeTxFromSubpathIndexFull()` to maintain both indexes simultaneously
- Original `addTxToSubpathIndex()`/`removeTxFromSubpathIndex()` wrappers
preserved for backward compatibility
- `buildSubpathIndex()` now populates both `spIndex` and `spTxIndex`
during `Load()`
- All incremental update sites (ingest, path change, eviction) use the
`Full` variants
- `GetSubpathDetail()` rewritten: direct `O(1)` map lookup on
`spTxIndex[key]` instead of scanning all packets

### `cmd/server/coverage_test.go`
- Added `TestSubpathTxIndexPopulated`: verifies `spTxIndex` is
populated, counts match `spIndex`, and `GetSubpathDetail` returns
correct results for both existing and non-existent subpaths

## Complexity

- **Before:** `O(total_packets × avg_hops × subpath_length)` per request
- **After:** `O(matched_txs)` per request (direct map lookup)

## Tests

All tests pass: `cmd/server` (4.6s), `cmd/ingestor` (25.6s)

Fixes #358

---------

Co-authored-by: you <you@example.com>
This commit is contained in:
Kpa-clawbot
2026-04-04 09:35:00 -07:00
committed by GitHub
parent 37300bf5c8
commit d4f2c3ac66
2 changed files with 126 additions and 37 deletions

View File

@@ -2192,6 +2192,84 @@ func TestSubpathPrecomputedIndex(t *testing.T) {
}
}
func TestSubpathTxIndexPopulated(t *testing.T) {
db := setupRichTestDB(t)
defer db.Close()
store := NewPacketStore(db, nil)
store.Load()
// spTxIndex must be populated alongside spIndex
if len(store.spTxIndex) == 0 {
t.Fatal("expected spTxIndex to be populated after Load()")
}
// Every key in spIndex must also exist in spTxIndex with matching count
for key, count := range store.spIndex {
txs, ok := store.spTxIndex[key]
if !ok {
t.Errorf("spTxIndex missing key %q that exists in spIndex", key)
continue
}
if len(txs) != count {
t.Errorf("spTxIndex[%q] has %d txs, spIndex count is %d", key, len(txs), count)
}
}
// GetSubpathDetail should return correct match count via indexed lookup
detail := store.GetSubpathDetail([]string{"eeff", "0011"})
if detail == nil {
t.Fatal("expected non-nil detail for existing subpath")
}
matches, _ := detail["totalMatches"].(int)
if matches != 1 {
t.Errorf("totalMatches = %d, want 1", matches)
}
// Non-existent subpath should return 0 matches
detail2 := store.GetSubpathDetail([]string{"zzzz", "yyyy"})
if detail2 == nil {
t.Fatal("expected non-nil result even for non-existent subpath")
}
matches2, _ := detail2["totalMatches"].(int)
if matches2 != 0 {
t.Errorf("totalMatches for non-existent subpath = %d, want 0", matches2)
}
}
func TestSubpathDetailMixedCaseHops(t *testing.T) {
db := setupRichTestDB(t)
defer db.Close()
store := NewPacketStore(db, nil)
store.Load()
// Query with lowercase hops to establish baseline
lower := store.GetSubpathDetail([]string{"eeff", "0011"})
if lower == nil {
t.Fatal("expected non-nil detail for lowercase subpath")
}
lowerMatches, _ := lower["totalMatches"].(int)
if lowerMatches == 0 {
t.Fatal("expected >0 matches for lowercase subpath")
}
// Query with mixed-case hops — must return the same results (case-insensitive)
mixed := store.GetSubpathDetail([]string{"EEFF", "0011"})
if mixed == nil {
t.Fatal("expected non-nil detail for mixed-case subpath")
}
mixedMatches, _ := mixed["totalMatches"].(int)
if mixedMatches != lowerMatches {
t.Errorf("mixed-case totalMatches = %d, want %d (same as lowercase)", mixedMatches, lowerMatches)
}
// All-uppercase should also match
upper := store.GetSubpathDetail([]string{"EEFF", "0011"})
upperMatches, _ := upper["totalMatches"].(int)
if upperMatches != lowerMatches {
t.Errorf("uppercase totalMatches = %d, want %d", upperMatches, lowerMatches)
}
}
func TestStoreGetAnalyticsRFCacheHit(t *testing.T) {
db := setupRichTestDB(t)
defer db.Close()

View File

@@ -134,8 +134,9 @@ type PacketStore struct {
// Precomputed subpath index: raw comma-joined hops → occurrence count.
// Built during Load(), incrementally updated on ingest. Avoids full
// packet iteration at query time (O(unique_subpaths) vs O(total_packets)).
spIndex map[string]int // "hop1,hop2" → count
spTotalPaths int // transmissions with paths >= 2 hops
spIndex map[string]int // "hop1,hop2" → count
spTxIndex map[string][]*StoreTx // "hop1,hop2" → transmissions containing this subpath
spTotalPaths int // transmissions with paths >= 2 hops
// Precomputed distance analytics: hop distances and path totals
// computed during Load() and incrementally updated on ingest.
distHops []distHopRecord
@@ -224,6 +225,7 @@ func NewPacketStore(db *DB, cfg *PacketStoreConfig) *PacketStore {
collisionCacheTTL: 60 * time.Second,
invCooldown: 10 * time.Second,
spIndex: make(map[string]int, 4096),
spTxIndex: make(map[string][]*StoreTx, 4096),
advertPubkeys: make(map[string]int),
}
if cfg != nil {
@@ -1241,7 +1243,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
// Incrementally update precomputed subpath index with new transmissions
for _, tx := range broadcastTxs {
if addTxToSubpathIndex(s.spIndex, tx) {
if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
s.spTotalPaths++
}
addTxToPathHopIndex(s.byPathHop, tx)
@@ -1589,7 +1591,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
// Temporarily set parsedPath to old hops for removal.
saved, savedFlag := tx.parsedPath, tx.pathParsed
tx.parsedPath, tx.pathParsed = oldHops, true
if removeTxFromSubpathIndex(s.spIndex, tx) {
if removeTxFromSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
s.spTotalPaths--
}
tx.parsedPath, tx.pathParsed = saved, savedFlag
@@ -1606,7 +1608,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
}
// pickBestObservation already set pathParsed=false so
// addTxToSubpathIndex will re-parse the new path.
if addTxToSubpathIndex(s.spIndex, tx) {
if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
s.spTotalPaths++
}
addTxToPathHopIndex(s.byPathHop, tx)
@@ -1978,6 +1980,12 @@ func txGetParsedPath(tx *StoreTx) []string {
// increments their counts in the index. Returns true if the tx contributed
// (path had ≥ 2 hops).
func addTxToSubpathIndex(idx map[string]int, tx *StoreTx) bool {
return addTxToSubpathIndexFull(idx, nil, tx)
}
// addTxToSubpathIndexFull is like addTxToSubpathIndex but also appends
// tx to txIdx for each subpath key (if txIdx is non-nil).
func addTxToSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx *StoreTx) bool {
hops := txGetParsedPath(tx)
if len(hops) < 2 {
return false
@@ -1985,8 +1993,11 @@ func addTxToSubpathIndex(idx map[string]int, tx *StoreTx) bool {
maxL := min(8, len(hops))
for l := 2; l <= maxL; l++ {
for start := 0; start <= len(hops)-l; start++ {
key := strings.Join(hops[start:start+l], ",")
key := strings.ToLower(strings.Join(hops[start:start+l], ","))
idx[key]++
if txIdx != nil {
txIdx[key] = append(txIdx[key], tx)
}
}
}
return true
@@ -1996,6 +2007,12 @@ func addTxToSubpathIndex(idx map[string]int, tx *StoreTx) bool {
// decrements counts for all raw subpaths of tx. Returns true if the tx
// had a path.
func removeTxFromSubpathIndex(idx map[string]int, tx *StoreTx) bool {
return removeTxFromSubpathIndexFull(idx, nil, tx)
}
// removeTxFromSubpathIndexFull is like removeTxFromSubpathIndex but also
// removes tx from txIdx for each subpath key (if txIdx is non-nil).
func removeTxFromSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx *StoreTx) bool {
hops := txGetParsedPath(tx)
if len(hops) < 2 {
return false
@@ -2003,11 +2020,23 @@ func removeTxFromSubpathIndex(idx map[string]int, tx *StoreTx) bool {
maxL := min(8, len(hops))
for l := 2; l <= maxL; l++ {
for start := 0; start <= len(hops)-l; start++ {
key := strings.Join(hops[start:start+l], ",")
key := strings.ToLower(strings.Join(hops[start:start+l], ","))
idx[key]--
if idx[key] <= 0 {
delete(idx, key)
}
if txIdx != nil {
txs := txIdx[key]
for i, t := range txs {
if t == tx {
txIdx[key] = append(txs[:i], txs[i+1:]...)
break
}
}
if len(txIdx[key]) == 0 {
delete(txIdx, key)
}
}
}
}
return true
@@ -2017,9 +2046,10 @@ func removeTxFromSubpathIndex(idx map[string]int, tx *StoreTx) bool {
// Must be called with s.mu held.
func (s *PacketStore) buildSubpathIndex() {
s.spIndex = make(map[string]int, 4096)
s.spTxIndex = make(map[string][]*StoreTx, 4096)
s.spTotalPaths = 0
for _, tx := range s.packets {
if addTxToSubpathIndex(s.spIndex, tx) {
if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
s.spTotalPaths++
}
}
@@ -2274,7 +2304,7 @@ func (s *PacketStore) EvictStale() int {
}
// Remove from subpath index
removeTxFromSubpathIndex(s.spIndex, tx)
removeTxFromSubpathIndexFull(s.spIndex, s.spTxIndex, tx)
// Remove from path-hop index
removeTxFromPathHopIndex(s.byPathHop, tx)
}
@@ -5944,40 +5974,21 @@ func (s *PacketStore) GetSubpathDetail(rawHops []string) map[string]interface{}
nodes[i] = entry
}
// Build the subpath key the same way the index does (lowercase, comma-joined)
spKey := strings.ToLower(strings.Join(rawHops, ","))
// Direct lookup instead of scanning all packets
matchedTxs := s.spTxIndex[spKey]
hourBuckets := make([]int, 24)
var snrSum, rssiSum float64
var snrCount, rssiCount int
observers := map[string]int{}
parentPaths := map[string]int{}
var matchCount int
matchCount := len(matchedTxs)
var firstSeen, lastSeen string
for _, tx := range s.packets {
hops := txGetParsedPath(tx)
if len(hops) < len(rawHops) {
continue
}
// Check if rawHops appears as contiguous subsequence
found := false
for i := 0; i <= len(hops)-len(rawHops); i++ {
match := true
for j := 0; j < len(rawHops); j++ {
if !strings.EqualFold(hops[i+j], rawHops[j]) {
match = false
break
}
}
if match {
found = true
break
}
}
if !found {
continue
}
matchCount++
for _, tx := range matchedTxs {
ts := tx.FirstSeen
if ts != "" {
if firstSeen == "" || ts < firstSeen {
@@ -5986,7 +5997,6 @@ func (s *PacketStore) GetSubpathDetail(rawHops []string) map[string]interface{}
if lastSeen == "" || ts > lastSeen {
lastSeen = ts
}
// Parse hour from timestamp for hourly distribution
t, err := time.Parse(time.RFC3339, ts)
if err != nil {
t, err = time.Parse("2006-01-02 15:04:05", ts)
@@ -6008,6 +6018,7 @@ func (s *PacketStore) GetSubpathDetail(rawHops []string) map[string]interface{}
}
// Full parent path (resolved)
hops := txGetParsedPath(tx)
resolved := make([]string, len(hops))
for i, h := range hops {
r, _, _ := pm.resolveWithContext(h, nil, s.graph)