mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-04-25 13:22:13 +00:00
Compare commits
3 Commits
fix/public
...
fix/subpat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7ec2b77ccd | ||
|
|
8d89f7d3e9 | ||
|
|
7abd05ff7f |
@@ -2322,24 +2322,12 @@ func TestSubpathTxIndexPopulated(t *testing.T) {
|
||||
store := NewPacketStore(db, nil)
|
||||
store.Load()
|
||||
|
||||
// spTxIndex must be populated alongside spIndex
|
||||
if len(store.spTxIndex) == 0 {
|
||||
t.Fatal("expected spTxIndex to be populated after Load()")
|
||||
// spIndex must be populated after Load()
|
||||
if len(store.spIndex) == 0 {
|
||||
t.Fatal("expected spIndex to be populated after Load()")
|
||||
}
|
||||
|
||||
// Every key in spIndex must also exist in spTxIndex with matching count
|
||||
for key, count := range store.spIndex {
|
||||
txs, ok := store.spTxIndex[key]
|
||||
if !ok {
|
||||
t.Errorf("spTxIndex missing key %q that exists in spIndex", key)
|
||||
continue
|
||||
}
|
||||
if len(txs) != count {
|
||||
t.Errorf("spTxIndex[%q] has %d txs, spIndex count is %d", key, len(txs), count)
|
||||
}
|
||||
}
|
||||
|
||||
// GetSubpathDetail should return correct match count via indexed lookup
|
||||
// GetSubpathDetail should return correct match count via scan fallback
|
||||
detail := store.GetSubpathDetail([]string{"eeff", "0011"})
|
||||
if detail == nil {
|
||||
t.Fatal("expected non-nil detail for existing subpath")
|
||||
|
||||
@@ -176,8 +176,7 @@ type PacketStore struct {
|
||||
// Precomputed subpath index: raw comma-joined hops → occurrence count.
|
||||
// Built during Load(), incrementally updated on ingest. Avoids full
|
||||
// packet iteration at query time (O(unique_subpaths) vs O(total_packets)).
|
||||
spIndex map[string]int // "hop1,hop2" → count
|
||||
spTxIndex map[string][]*StoreTx // "hop1,hop2" → transmissions containing this subpath
|
||||
spIndex map[string]int // "hop1,hop2" → count
|
||||
spTotalPaths int // transmissions with paths >= 2 hops
|
||||
// Precomputed distance analytics: hop distances and path totals
|
||||
// computed during Load() and incrementally updated on ingest.
|
||||
@@ -311,7 +310,7 @@ func NewPacketStore(db *DB, cfg *PacketStoreConfig, cacheTTLs ...map[string]inte
|
||||
collisionCacheTTL: 3600 * time.Second,
|
||||
invCooldown: 300 * time.Second,
|
||||
spIndex: make(map[string]int, 4096),
|
||||
spTxIndex: make(map[string][]*StoreTx, 4096),
|
||||
|
||||
advertPubkeys: make(map[string]int),
|
||||
lastSeenTouched: make(map[string]time.Time),
|
||||
clockSkew: NewClockSkewEngine(),
|
||||
@@ -1537,7 +1536,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
|
||||
|
||||
// Incrementally update precomputed subpath index with new transmissions
|
||||
for _, tx := range broadcastTxs {
|
||||
if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
|
||||
if addTxToSubpathIndex(s.spIndex, tx) {
|
||||
s.spTotalPaths++
|
||||
}
|
||||
addTxToPathHopIndex(s.byPathHop, tx)
|
||||
@@ -1906,7 +1905,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
|
||||
// Temporarily set parsedPath to old hops for removal.
|
||||
saved, savedFlag := tx.parsedPath, tx.pathParsed
|
||||
tx.parsedPath, tx.pathParsed = oldHops, true
|
||||
if removeTxFromSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
|
||||
if removeTxFromSubpathIndex(s.spIndex, tx) {
|
||||
s.spTotalPaths--
|
||||
}
|
||||
tx.parsedPath, tx.pathParsed = saved, savedFlag
|
||||
@@ -1923,7 +1922,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
|
||||
}
|
||||
// pickBestObservation already set pathParsed=false so
|
||||
// addTxToSubpathIndex will re-parse the new path.
|
||||
if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
|
||||
if addTxToSubpathIndex(s.spIndex, tx) {
|
||||
s.spTotalPaths++
|
||||
}
|
||||
addTxToPathHopIndex(s.byPathHop, tx)
|
||||
@@ -2398,12 +2397,6 @@ func txGetParsedPath(tx *StoreTx) []string {
|
||||
// increments their counts in the index. Returns true if the tx contributed
|
||||
// (path had ≥ 2 hops).
|
||||
func addTxToSubpathIndex(idx map[string]int, tx *StoreTx) bool {
|
||||
return addTxToSubpathIndexFull(idx, nil, tx)
|
||||
}
|
||||
|
||||
// addTxToSubpathIndexFull is like addTxToSubpathIndex but also appends
|
||||
// tx to txIdx for each subpath key (if txIdx is non-nil).
|
||||
func addTxToSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx *StoreTx) bool {
|
||||
hops := txGetParsedPath(tx)
|
||||
if len(hops) < 2 {
|
||||
return false
|
||||
@@ -2413,9 +2406,6 @@ func addTxToSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx
|
||||
for start := 0; start <= len(hops)-l; start++ {
|
||||
key := strings.ToLower(strings.Join(hops[start:start+l], ","))
|
||||
idx[key]++
|
||||
if txIdx != nil {
|
||||
txIdx[key] = append(txIdx[key], tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
@@ -2425,12 +2415,6 @@ func addTxToSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx
|
||||
// decrements counts for all raw subpaths of tx. Returns true if the tx
|
||||
// had a path.
|
||||
func removeTxFromSubpathIndex(idx map[string]int, tx *StoreTx) bool {
|
||||
return removeTxFromSubpathIndexFull(idx, nil, tx)
|
||||
}
|
||||
|
||||
// removeTxFromSubpathIndexFull is like removeTxFromSubpathIndex but also
|
||||
// removes tx from txIdx for each subpath key (if txIdx is non-nil).
|
||||
func removeTxFromSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx *StoreTx) bool {
|
||||
hops := txGetParsedPath(tx)
|
||||
if len(hops) < 2 {
|
||||
return false
|
||||
@@ -2443,18 +2427,6 @@ func removeTxFromSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreT
|
||||
if idx[key] <= 0 {
|
||||
delete(idx, key)
|
||||
}
|
||||
if txIdx != nil {
|
||||
txs := txIdx[key]
|
||||
for i, t := range txs {
|
||||
if t == tx {
|
||||
txIdx[key] = append(txs[:i], txs[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(txIdx[key]) == 0 {
|
||||
delete(txIdx, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
@@ -2464,15 +2436,27 @@ func removeTxFromSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreT
|
||||
// Must be called with s.mu held.
|
||||
func (s *PacketStore) buildSubpathIndex() {
|
||||
s.spIndex = make(map[string]int, 4096)
|
||||
s.spTxIndex = make(map[string][]*StoreTx, 4096)
|
||||
s.spTotalPaths = 0
|
||||
for _, tx := range s.packets {
|
||||
if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
|
||||
if addTxToSubpathIndex(s.spIndex, tx) {
|
||||
s.spTotalPaths++
|
||||
}
|
||||
}
|
||||
log.Printf("[store] Built subpath index: %d unique raw subpaths from %d paths",
|
||||
len(s.spIndex), s.spTotalPaths)
|
||||
// Drop singleton entries (count==1) to save memory. At scale, 60-70% of
|
||||
// subpath keys appear only once — they're noise and not useful for analytics.
|
||||
// Singletons that appear after startup via incremental ingest will remain
|
||||
// in the index until the next restart; this is acceptable since they only
|
||||
// matter for display ranking and the count is still correct for non-singletons.
|
||||
total := len(s.spIndex)
|
||||
dropped := 0
|
||||
for key, count := range s.spIndex {
|
||||
if count == 1 {
|
||||
delete(s.spIndex, key)
|
||||
dropped++
|
||||
}
|
||||
}
|
||||
log.Printf("[store] Subpath index: kept %d/%d entries (dropped %d singletons)",
|
||||
total-dropped, total, dropped)
|
||||
}
|
||||
|
||||
// buildPathHopIndex scans all packets and populates byPathHop.
|
||||
@@ -2656,10 +2640,11 @@ func (s *PacketStore) buildDistanceIndex() {
|
||||
// usage and independent of GC state.
|
||||
//
|
||||
// Issue #743: Previous estimates missed major per-packet allocations:
|
||||
// - spTxIndex: O(path²) entries per tx (50-150MB at scale)
|
||||
// - ResolvedPath on observations (~25MB at scale)
|
||||
// - Per-tx maps: obsKeys, observerSet (~11MB at scale)
|
||||
// - byPathHop index entries (20-40MB at scale)
|
||||
// Note: spTxIndex was eliminated in #791 (saved ~280MB at 3.4M subpaths).
|
||||
// Singleton subpath entries are also dropped from spIndex (#791, saves ~150MB).
|
||||
const (
|
||||
storeTxBaseBytes = 384 // StoreTx struct fields + map headers + sync.Once + string headers
|
||||
storeObsBaseBytes = 192 // StoreObs struct fields + string headers
|
||||
@@ -2673,15 +2658,12 @@ const (
|
||||
// Per path hop: byPathHop index entry (pointer + map bucket)
|
||||
perPathHopBytes = 50
|
||||
|
||||
// Per subpath entry in spTxIndex: string key + slice append + pointer
|
||||
perSubpathEntryBytes = 40
|
||||
|
||||
// Per resolved path element on an observation
|
||||
perResolvedPathElemBytes = 24 // *string pointer + string header + avg pubkey length
|
||||
)
|
||||
|
||||
// estimateStoreTxBytes returns the estimated memory cost of a StoreTx (excluding observations).
|
||||
// Includes per-tx maps (obsKeys, observerSet), byPathHop entries, and spTxIndex subpath entries.
|
||||
// Includes per-tx maps (obsKeys, observerSet) and byPathHop entries.
|
||||
func estimateStoreTxBytes(tx *StoreTx) int64 {
|
||||
base := int64(storeTxBaseBytes)
|
||||
base += int64(len(tx.RawHex) + len(tx.Hash) + len(tx.DecodedJSON) + len(tx.PathJSON))
|
||||
@@ -2694,12 +2676,6 @@ func estimateStoreTxBytes(tx *StoreTx) int64 {
|
||||
hops := int64(len(txGetParsedPath(tx)))
|
||||
base += hops * perPathHopBytes
|
||||
|
||||
// spTxIndex: O(path²) subpath combinations
|
||||
if hops > 1 {
|
||||
subpaths := hops * (hops - 1) / 2
|
||||
base += subpaths * perSubpathEntryBytes
|
||||
}
|
||||
|
||||
return base
|
||||
}
|
||||
|
||||
@@ -2713,7 +2689,6 @@ func estimateStoreTxBytesTypical(numObs int) int64 {
|
||||
base += perTxMapsBytes
|
||||
hops := int64(3)
|
||||
base += hops * perPathHopBytes
|
||||
base += (hops * (hops - 1) / 2) * perSubpathEntryBytes
|
||||
// Add observation costs
|
||||
obsBase := int64(storeObsBaseBytes) + 30 + 30 + 60 // observer ID + name + path
|
||||
obsBase += int64(numIndexesPerObs * indexEntryBytes)
|
||||
@@ -2906,7 +2881,7 @@ func (s *PacketStore) EvictStale() int {
|
||||
}
|
||||
|
||||
// Remove from subpath index
|
||||
removeTxFromSubpathIndexFull(s.spIndex, s.spTxIndex, tx)
|
||||
removeTxFromSubpathIndex(s.spIndex, tx)
|
||||
// Remove from path-hop index
|
||||
removeTxFromPathHopIndex(s.byPathHop, tx)
|
||||
}
|
||||
@@ -7078,8 +7053,27 @@ func (s *PacketStore) GetSubpathDetail(rawHops []string) map[string]interface{}
|
||||
// Build the subpath key the same way the index does (lowercase, comma-joined)
|
||||
spKey := strings.ToLower(strings.Join(rawHops, ","))
|
||||
|
||||
// Direct lookup instead of scanning all packets
|
||||
matchedTxs := s.spTxIndex[spKey]
|
||||
// Scan all transmissions for matching subpaths (replaces former spTxIndex
|
||||
// lookup — the per-tx pointer index was eliminated in #791 to save ~280MB
|
||||
// at scale). This is O(packets) but only called on drill-down, not listing.
|
||||
var matchedTxs []*StoreTx
|
||||
for _, tx := range s.packets {
|
||||
hops := txGetParsedPath(tx)
|
||||
if len(hops) < 2 {
|
||||
continue
|
||||
}
|
||||
maxL := min(8, len(hops))
|
||||
for l := 2; l <= maxL; l++ {
|
||||
for start := 0; start <= len(hops)-l; start++ {
|
||||
key := strings.ToLower(strings.Join(hops[start:start+l], ","))
|
||||
if key == spKey {
|
||||
matchedTxs = append(matchedTxs, tx)
|
||||
goto nextTx
|
||||
}
|
||||
}
|
||||
}
|
||||
nextTx:
|
||||
}
|
||||
|
||||
hourBuckets := make([]int, 24)
|
||||
var snrSum, rssiSum float64
|
||||
|
||||
Reference in New Issue
Block a user