mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-04-25 14:22:11 +00:00
Compare commits
3 Commits
feat/obser
...
fix/subpat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5c2c71c6ee | ||
|
|
8d89f7d3e9 | ||
|
|
7abd05ff7f |
@@ -2145,6 +2145,13 @@ func setupRichTestDB(t *testing.T) *DB {
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (5, 1, 14.0, -88, '["aa"]', ?)`, recentEpoch)
|
||||
|
||||
// Extra packet sharing subpath "eeff,0011" with hash_with_path_02 above,
|
||||
// so that subpath has count>=2 and survives singleton pruning.
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
VALUES ('0140eeff0011', 'hash_shared_subpath', ?, 1, 4, '{"pubKey":"eeff001199887766","name":"TestShared","type":"ADVERT"}')`, recent)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (6, 1, 9.0, -92, '["eeff","0011"]', ?)`, recentEpoch)
|
||||
|
||||
return db
|
||||
}
|
||||
|
||||
@@ -2276,14 +2283,11 @@ func TestSubpathPrecomputedIndex(t *testing.T) {
|
||||
t.Fatal("expected spTotalPaths > 0 after Load()")
|
||||
}
|
||||
|
||||
// The rich test DB has paths ["aa","bb"], ["aabb","ccdd"], and
|
||||
// ["eeff","0011","2233"]. That yields 5 unique raw subpaths.
|
||||
// The rich test DB has paths ["aa","bb"], ["aabb","ccdd"],
|
||||
// ["eeff","0011","2233"], and ["eeff","0011"]. After singleton pruning,
|
||||
// only subpaths with count>=2 survive. "eeff,0011" appears in two packets.
|
||||
expectedRaw := map[string]int{
|
||||
"aa,bb": 1,
|
||||
"aabb,ccdd": 1,
|
||||
"eeff,0011": 1,
|
||||
"0011,2233": 1,
|
||||
"eeff,0011,2233": 1,
|
||||
"eeff,0011": 2,
|
||||
}
|
||||
for key, want := range expectedRaw {
|
||||
got, ok := store.spIndex[key]
|
||||
@@ -2293,8 +2297,16 @@ func TestSubpathPrecomputedIndex(t *testing.T) {
|
||||
t.Errorf("spIndex[%q] = %d, want %d", key, got, want)
|
||||
}
|
||||
}
|
||||
if store.spTotalPaths != 3 {
|
||||
t.Errorf("spTotalPaths = %d, want 3", store.spTotalPaths)
|
||||
|
||||
// Singleton subpaths must have been pruned
|
||||
singletons := []string{"aa,bb", "aabb,ccdd", "0011,2233", "eeff,0011,2233"}
|
||||
for _, key := range singletons {
|
||||
if _, ok := store.spIndex[key]; ok {
|
||||
t.Errorf("expected singleton spIndex[%q] to be pruned", key)
|
||||
}
|
||||
}
|
||||
if store.spTotalPaths != 4 {
|
||||
t.Errorf("spTotalPaths = %d, want 4", store.spTotalPaths)
|
||||
}
|
||||
|
||||
// Fast-path (no region) and slow-path (with region) must return the
|
||||
@@ -2322,31 +2334,19 @@ func TestSubpathTxIndexPopulated(t *testing.T) {
|
||||
store := NewPacketStore(db, nil)
|
||||
store.Load()
|
||||
|
||||
// spTxIndex must be populated alongside spIndex
|
||||
if len(store.spTxIndex) == 0 {
|
||||
t.Fatal("expected spTxIndex to be populated after Load()")
|
||||
// spIndex must be populated after Load()
|
||||
if len(store.spIndex) == 0 {
|
||||
t.Fatal("expected spIndex to be populated after Load()")
|
||||
}
|
||||
|
||||
// Every key in spIndex must also exist in spTxIndex with matching count
|
||||
for key, count := range store.spIndex {
|
||||
txs, ok := store.spTxIndex[key]
|
||||
if !ok {
|
||||
t.Errorf("spTxIndex missing key %q that exists in spIndex", key)
|
||||
continue
|
||||
}
|
||||
if len(txs) != count {
|
||||
t.Errorf("spTxIndex[%q] has %d txs, spIndex count is %d", key, len(txs), count)
|
||||
}
|
||||
}
|
||||
|
||||
// GetSubpathDetail should return correct match count via indexed lookup
|
||||
// GetSubpathDetail should return correct match count via scan fallback
|
||||
detail := store.GetSubpathDetail([]string{"eeff", "0011"})
|
||||
if detail == nil {
|
||||
t.Fatal("expected non-nil detail for existing subpath")
|
||||
}
|
||||
matches, _ := detail["totalMatches"].(int)
|
||||
if matches != 1 {
|
||||
t.Errorf("totalMatches = %d, want 1", matches)
|
||||
if matches != 2 {
|
||||
t.Errorf("totalMatches = %d, want 2", matches)
|
||||
}
|
||||
|
||||
// Non-existent subpath should return 0 matches
|
||||
@@ -2394,6 +2394,55 @@ func TestSubpathDetailMixedCaseHops(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestSubpathSingletonDrop verifies that singleton entries are pruned from
|
||||
// spIndex while count>=2 entries are preserved.
|
||||
func TestSubpathSingletonDrop(t *testing.T) {
|
||||
db := setupRichTestDB(t)
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
store.Load()
|
||||
|
||||
// "eeff,0011" appears in 2 packets — must survive singleton pruning
|
||||
if count, ok := store.spIndex["eeff,0011"]; !ok {
|
||||
t.Fatal("expected spIndex[\"eeff,0011\"] to survive singleton pruning")
|
||||
} else if count != 2 {
|
||||
t.Errorf("spIndex[\"eeff,0011\"] = %d, want 2", count)
|
||||
}
|
||||
|
||||
// All count==1 entries must be gone
|
||||
for key, count := range store.spIndex {
|
||||
if count < 2 {
|
||||
t.Errorf("spIndex[%q] = %d, singletons should have been pruned", key, count)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestSubpathEmptyDB verifies that the store loads successfully on a DB
|
||||
// with no transmissions (no subpaths at all).
|
||||
func TestSubpathEmptyDB(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
store := NewPacketStore(db, nil)
|
||||
store.Load()
|
||||
|
||||
if len(store.spIndex) != 0 {
|
||||
t.Errorf("expected empty spIndex on empty DB, got %d entries", len(store.spIndex))
|
||||
}
|
||||
if store.spTotalPaths != 0 {
|
||||
t.Errorf("expected spTotalPaths=0 on empty DB, got %d", store.spTotalPaths)
|
||||
}
|
||||
|
||||
// GetSubpathDetail should still work (return zero matches)
|
||||
detail := store.GetSubpathDetail([]string{"aa", "bb"})
|
||||
if detail == nil {
|
||||
t.Fatal("expected non-nil detail even on empty DB")
|
||||
}
|
||||
matches, _ := detail["totalMatches"].(int)
|
||||
if matches != 0 {
|
||||
t.Errorf("totalMatches on empty DB = %d, want 0", matches)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreGetAnalyticsRFCacheHit(t *testing.T) {
|
||||
db := setupRichTestDB(t)
|
||||
defer db.Close()
|
||||
|
||||
@@ -546,9 +546,6 @@ func TestEstimateStoreTxBytes(t *testing.T) {
|
||||
manualCalc := int64(storeTxBaseBytes) + int64(len(tx.RawHex)+len(tx.Hash)+len(tx.DecodedJSON)+len(tx.PathJSON)) + int64(numIndexesPerTx*indexEntryBytes)
|
||||
manualCalc += perTxMapsBytes
|
||||
manualCalc += hops * perPathHopBytes
|
||||
if hops > 1 {
|
||||
manualCalc += (hops * (hops - 1) / 2) * perSubpathEntryBytes
|
||||
}
|
||||
if est != manualCalc {
|
||||
t.Fatalf("estimateStoreTxBytes = %d, want %d (manual calc)", est, manualCalc)
|
||||
}
|
||||
|
||||
@@ -176,8 +176,7 @@ type PacketStore struct {
|
||||
// Precomputed subpath index: raw comma-joined hops → occurrence count.
|
||||
// Built during Load(), incrementally updated on ingest. Avoids full
|
||||
// packet iteration at query time (O(unique_subpaths) vs O(total_packets)).
|
||||
spIndex map[string]int // "hop1,hop2" → count
|
||||
spTxIndex map[string][]*StoreTx // "hop1,hop2" → transmissions containing this subpath
|
||||
spIndex map[string]int // "hop1,hop2" → count
|
||||
spTotalPaths int // transmissions with paths >= 2 hops
|
||||
// Precomputed distance analytics: hop distances and path totals
|
||||
// computed during Load() and incrementally updated on ingest.
|
||||
@@ -311,7 +310,7 @@ func NewPacketStore(db *DB, cfg *PacketStoreConfig, cacheTTLs ...map[string]inte
|
||||
collisionCacheTTL: 3600 * time.Second,
|
||||
invCooldown: 300 * time.Second,
|
||||
spIndex: make(map[string]int, 4096),
|
||||
spTxIndex: make(map[string][]*StoreTx, 4096),
|
||||
|
||||
advertPubkeys: make(map[string]int),
|
||||
lastSeenTouched: make(map[string]time.Time),
|
||||
clockSkew: NewClockSkewEngine(),
|
||||
@@ -1537,7 +1536,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
|
||||
|
||||
// Incrementally update precomputed subpath index with new transmissions
|
||||
for _, tx := range broadcastTxs {
|
||||
if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
|
||||
if addTxToSubpathIndex(s.spIndex, tx) {
|
||||
s.spTotalPaths++
|
||||
}
|
||||
addTxToPathHopIndex(s.byPathHop, tx)
|
||||
@@ -1906,7 +1905,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
|
||||
// Temporarily set parsedPath to old hops for removal.
|
||||
saved, savedFlag := tx.parsedPath, tx.pathParsed
|
||||
tx.parsedPath, tx.pathParsed = oldHops, true
|
||||
if removeTxFromSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
|
||||
if removeTxFromSubpathIndex(s.spIndex, tx) {
|
||||
s.spTotalPaths--
|
||||
}
|
||||
tx.parsedPath, tx.pathParsed = saved, savedFlag
|
||||
@@ -1923,7 +1922,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
|
||||
}
|
||||
// pickBestObservation already set pathParsed=false so
|
||||
// addTxToSubpathIndex will re-parse the new path.
|
||||
if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
|
||||
if addTxToSubpathIndex(s.spIndex, tx) {
|
||||
s.spTotalPaths++
|
||||
}
|
||||
addTxToPathHopIndex(s.byPathHop, tx)
|
||||
@@ -2398,12 +2397,6 @@ func txGetParsedPath(tx *StoreTx) []string {
|
||||
// increments their counts in the index. Returns true if the tx contributed
|
||||
// (path had ≥ 2 hops).
|
||||
func addTxToSubpathIndex(idx map[string]int, tx *StoreTx) bool {
|
||||
return addTxToSubpathIndexFull(idx, nil, tx)
|
||||
}
|
||||
|
||||
// addTxToSubpathIndexFull is like addTxToSubpathIndex but also appends
|
||||
// tx to txIdx for each subpath key (if txIdx is non-nil).
|
||||
func addTxToSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx *StoreTx) bool {
|
||||
hops := txGetParsedPath(tx)
|
||||
if len(hops) < 2 {
|
||||
return false
|
||||
@@ -2413,9 +2406,6 @@ func addTxToSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx
|
||||
for start := 0; start <= len(hops)-l; start++ {
|
||||
key := strings.ToLower(strings.Join(hops[start:start+l], ","))
|
||||
idx[key]++
|
||||
if txIdx != nil {
|
||||
txIdx[key] = append(txIdx[key], tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
@@ -2425,12 +2415,6 @@ func addTxToSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx
|
||||
// decrements counts for all raw subpaths of tx. Returns true if the tx
|
||||
// had a path.
|
||||
func removeTxFromSubpathIndex(idx map[string]int, tx *StoreTx) bool {
|
||||
return removeTxFromSubpathIndexFull(idx, nil, tx)
|
||||
}
|
||||
|
||||
// removeTxFromSubpathIndexFull is like removeTxFromSubpathIndex but also
|
||||
// removes tx from txIdx for each subpath key (if txIdx is non-nil).
|
||||
func removeTxFromSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx *StoreTx) bool {
|
||||
hops := txGetParsedPath(tx)
|
||||
if len(hops) < 2 {
|
||||
return false
|
||||
@@ -2443,18 +2427,6 @@ func removeTxFromSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreT
|
||||
if idx[key] <= 0 {
|
||||
delete(idx, key)
|
||||
}
|
||||
if txIdx != nil {
|
||||
txs := txIdx[key]
|
||||
for i, t := range txs {
|
||||
if t == tx {
|
||||
txIdx[key] = append(txs[:i], txs[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(txIdx[key]) == 0 {
|
||||
delete(txIdx, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
@@ -2464,15 +2436,27 @@ func removeTxFromSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreT
|
||||
// Must be called with s.mu held.
|
||||
func (s *PacketStore) buildSubpathIndex() {
|
||||
s.spIndex = make(map[string]int, 4096)
|
||||
s.spTxIndex = make(map[string][]*StoreTx, 4096)
|
||||
s.spTotalPaths = 0
|
||||
for _, tx := range s.packets {
|
||||
if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
|
||||
if addTxToSubpathIndex(s.spIndex, tx) {
|
||||
s.spTotalPaths++
|
||||
}
|
||||
}
|
||||
log.Printf("[store] Built subpath index: %d unique raw subpaths from %d paths",
|
||||
len(s.spIndex), s.spTotalPaths)
|
||||
// Drop singleton entries (count==1) to save memory. At scale, 60-70% of
|
||||
// subpath keys appear only once — they're noise and not useful for analytics.
|
||||
// Singletons that appear after startup via incremental ingest will remain
|
||||
// in the index until the next restart; this is acceptable since they only
|
||||
// matter for display ranking and the count is still correct for non-singletons.
|
||||
total := len(s.spIndex)
|
||||
dropped := 0
|
||||
for key, count := range s.spIndex {
|
||||
if count == 1 {
|
||||
delete(s.spIndex, key)
|
||||
dropped++
|
||||
}
|
||||
}
|
||||
log.Printf("[store] Subpath index: kept %d/%d entries (dropped %d singletons)",
|
||||
total-dropped, total, dropped)
|
||||
}
|
||||
|
||||
// buildPathHopIndex scans all packets and populates byPathHop.
|
||||
@@ -2656,10 +2640,11 @@ func (s *PacketStore) buildDistanceIndex() {
|
||||
// usage and independent of GC state.
|
||||
//
|
||||
// Issue #743: Previous estimates missed major per-packet allocations:
|
||||
// - spTxIndex: O(path²) entries per tx (50-150MB at scale)
|
||||
// - ResolvedPath on observations (~25MB at scale)
|
||||
// - Per-tx maps: obsKeys, observerSet (~11MB at scale)
|
||||
// - byPathHop index entries (20-40MB at scale)
|
||||
// Note: spTxIndex was eliminated in #791 (saved ~280MB at 3.4M subpaths).
|
||||
// Singleton subpath entries are also dropped from spIndex (#791, saves ~150MB).
|
||||
const (
|
||||
storeTxBaseBytes = 384 // StoreTx struct fields + map headers + sync.Once + string headers
|
||||
storeObsBaseBytes = 192 // StoreObs struct fields + string headers
|
||||
@@ -2673,15 +2658,12 @@ const (
|
||||
// Per path hop: byPathHop index entry (pointer + map bucket)
|
||||
perPathHopBytes = 50
|
||||
|
||||
// Per subpath entry in spTxIndex: string key + slice append + pointer
|
||||
perSubpathEntryBytes = 40
|
||||
|
||||
// Per resolved path element on an observation
|
||||
perResolvedPathElemBytes = 24 // *string pointer + string header + avg pubkey length
|
||||
)
|
||||
|
||||
// estimateStoreTxBytes returns the estimated memory cost of a StoreTx (excluding observations).
|
||||
// Includes per-tx maps (obsKeys, observerSet), byPathHop entries, and spTxIndex subpath entries.
|
||||
// Includes per-tx maps (obsKeys, observerSet) and byPathHop entries.
|
||||
func estimateStoreTxBytes(tx *StoreTx) int64 {
|
||||
base := int64(storeTxBaseBytes)
|
||||
base += int64(len(tx.RawHex) + len(tx.Hash) + len(tx.DecodedJSON) + len(tx.PathJSON))
|
||||
@@ -2694,12 +2676,6 @@ func estimateStoreTxBytes(tx *StoreTx) int64 {
|
||||
hops := int64(len(txGetParsedPath(tx)))
|
||||
base += hops * perPathHopBytes
|
||||
|
||||
// spTxIndex: O(path²) subpath combinations
|
||||
if hops > 1 {
|
||||
subpaths := hops * (hops - 1) / 2
|
||||
base += subpaths * perSubpathEntryBytes
|
||||
}
|
||||
|
||||
return base
|
||||
}
|
||||
|
||||
@@ -2713,7 +2689,6 @@ func estimateStoreTxBytesTypical(numObs int) int64 {
|
||||
base += perTxMapsBytes
|
||||
hops := int64(3)
|
||||
base += hops * perPathHopBytes
|
||||
base += (hops * (hops - 1) / 2) * perSubpathEntryBytes
|
||||
// Add observation costs
|
||||
obsBase := int64(storeObsBaseBytes) + 30 + 30 + 60 // observer ID + name + path
|
||||
obsBase += int64(numIndexesPerObs * indexEntryBytes)
|
||||
@@ -2906,7 +2881,7 @@ func (s *PacketStore) EvictStale() int {
|
||||
}
|
||||
|
||||
// Remove from subpath index
|
||||
removeTxFromSubpathIndexFull(s.spIndex, s.spTxIndex, tx)
|
||||
removeTxFromSubpathIndex(s.spIndex, tx)
|
||||
// Remove from path-hop index
|
||||
removeTxFromPathHopIndex(s.byPathHop, tx)
|
||||
}
|
||||
@@ -7078,8 +7053,27 @@ func (s *PacketStore) GetSubpathDetail(rawHops []string) map[string]interface{}
|
||||
// Build the subpath key the same way the index does (lowercase, comma-joined)
|
||||
spKey := strings.ToLower(strings.Join(rawHops, ","))
|
||||
|
||||
// Direct lookup instead of scanning all packets
|
||||
matchedTxs := s.spTxIndex[spKey]
|
||||
// Scan all transmissions for matching subpaths (replaces former spTxIndex
|
||||
// lookup — the per-tx pointer index was eliminated in #791 to save ~280MB
|
||||
// at scale). This is O(packets) but only called on drill-down, not listing.
|
||||
var matchedTxs []*StoreTx
|
||||
for _, tx := range s.packets {
|
||||
hops := txGetParsedPath(tx)
|
||||
if len(hops) < 2 {
|
||||
continue
|
||||
}
|
||||
maxL := min(8, len(hops))
|
||||
for l := 2; l <= maxL; l++ {
|
||||
for start := 0; start <= len(hops)-l; start++ {
|
||||
key := strings.ToLower(strings.Join(hops[start:start+l], ","))
|
||||
if key == spKey {
|
||||
matchedTxs = append(matchedTxs, tx)
|
||||
goto nextTx
|
||||
}
|
||||
}
|
||||
}
|
||||
nextTx:
|
||||
}
|
||||
|
||||
hourBuckets := make([]int, 24)
|
||||
var snrSum, rssiSum float64
|
||||
|
||||
@@ -28,7 +28,7 @@ func TestEstimateStoreTxBytes_ReasonableValues(t *testing.T) {
|
||||
}
|
||||
|
||||
// TestEstimateStoreTxBytes_ManyHopsSubpaths verifies that packets with many
|
||||
// hops estimate significantly more due to O(path²) subpath index entries.
|
||||
// hops estimate more due to per-hop byPathHop index entries.
|
||||
func TestEstimateStoreTxBytes_ManyHopsSubpaths(t *testing.T) {
|
||||
tx2 := &StoreTx{
|
||||
Hash: "aabb",
|
||||
@@ -43,12 +43,14 @@ func TestEstimateStoreTxBytes_ManyHopsSubpaths(t *testing.T) {
|
||||
est2 := estimateStoreTxBytes(tx2)
|
||||
est10 := estimateStoreTxBytes(tx10)
|
||||
|
||||
// 10 hops → 45 subpath combos × 40 = 1800 bytes just for subpaths
|
||||
// 10 hops vs 2 hops → 8 extra byPathHop entries × perPathHopBytes
|
||||
if est10 <= est2 {
|
||||
t.Errorf("10-hop (%d) should estimate more than 2-hop (%d)", est10, est2)
|
||||
}
|
||||
if est10 < est2+1500 {
|
||||
t.Errorf("10-hop (%d) should estimate at least 1500 more than 2-hop (%d)", est10, est2)
|
||||
// spTxIndex eliminated in #791; cost difference is now linear (per-hop only)
|
||||
expectedDiff := int64(8) * perPathHopBytes // 8 extra hops
|
||||
if est10 < est2+expectedDiff {
|
||||
t.Errorf("10-hop (%d) should estimate at least %d more than 2-hop (%d)", est10, expectedDiff, est2)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user