Compare commits

...

3 Commits

Author SHA1 Message Date
you
5c2c71c6ee fix(#791): update memory-accounting to reflect eliminated spTxIndex
Remove perSubpathEntryBytes constant and all O(path²) subpath cost
calculations from estimateStoreTxBytes and estimateStoreTxBytesTypical.
Update comments to note that spTxIndex was eliminated in #791 and
singleton subpath entries are dropped from spIndex.
2026-04-19 00:08:28 +00:00
you
8d89f7d3e9 fix(#791): drop singleton subpath entries to save ~150MB at scale
After the initial subpath index build, iterate and delete all entries
where count==1. At scale, 60-70% of subpath keys are singletons — they
add no analytical value and consume significant
memory (each entry is a string key + map bucket overhead).

Newly-seen singletons during incremental ingest will remain in the
index until the next restart. This is acceptable since singleton
subpaths are noise for ranking/display purposes.

Add a startup log line showing how many entries were kept vs dropped.
2026-04-19 00:03:37 +00:00
you
7abd05ff7f fix(#791): eliminate spTxIndex to save ~280MB at 3.4M subpaths
Remove the spTxIndex map (map[string][]*StoreTx) that stored per-subpath
transmission pointer slices. This was the largest single heap consumer
during startup on databases with many unique subpaths.

Replace the only read site (GetSubpathDetail) with an O(packets) scan
that matches subpaths on the fly. This is acceptable because the
endpoint is only called on drill-down, not listing.

Remove all write sites (addTxToSubpathIndexFull, removeTxFromSubpathIndexFull)
and collapse them back into the simpler addTxToSubpathIndex/removeTxFromSubpathIndex
functions that only maintain counts.
2026-04-19 00:03:13 +00:00
4 changed files with 128 additions and 86 deletions

View File

@@ -2145,6 +2145,13 @@ func setupRichTestDB(t *testing.T) *DB {
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (5, 1, 14.0, -88, '["aa"]', ?)`, recentEpoch)
// Extra packet sharing subpath "eeff,0011" with hash_with_path_02 above,
// so that subpath has count>=2 and survives singleton pruning.
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
VALUES ('0140eeff0011', 'hash_shared_subpath', ?, 1, 4, '{"pubKey":"eeff001199887766","name":"TestShared","type":"ADVERT"}')`, recent)
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (6, 1, 9.0, -92, '["eeff","0011"]', ?)`, recentEpoch)
return db
}
@@ -2276,14 +2283,11 @@ func TestSubpathPrecomputedIndex(t *testing.T) {
t.Fatal("expected spTotalPaths > 0 after Load()")
}
// The rich test DB has paths ["aa","bb"], ["aabb","ccdd"], and
// ["eeff","0011","2233"]. That yields 5 unique raw subpaths.
// The rich test DB has paths ["aa","bb"], ["aabb","ccdd"],
// ["eeff","0011","2233"], and ["eeff","0011"]. After singleton pruning,
// only subpaths with count>=2 survive. "eeff,0011" appears in two packets.
expectedRaw := map[string]int{
"aa,bb": 1,
"aabb,ccdd": 1,
"eeff,0011": 1,
"0011,2233": 1,
"eeff,0011,2233": 1,
"eeff,0011": 2,
}
for key, want := range expectedRaw {
got, ok := store.spIndex[key]
@@ -2293,8 +2297,16 @@ func TestSubpathPrecomputedIndex(t *testing.T) {
t.Errorf("spIndex[%q] = %d, want %d", key, got, want)
}
}
if store.spTotalPaths != 3 {
t.Errorf("spTotalPaths = %d, want 3", store.spTotalPaths)
// Singleton subpaths must have been pruned
singletons := []string{"aa,bb", "aabb,ccdd", "0011,2233", "eeff,0011,2233"}
for _, key := range singletons {
if _, ok := store.spIndex[key]; ok {
t.Errorf("expected singleton spIndex[%q] to be pruned", key)
}
}
if store.spTotalPaths != 4 {
t.Errorf("spTotalPaths = %d, want 4", store.spTotalPaths)
}
// Fast-path (no region) and slow-path (with region) must return the
@@ -2322,31 +2334,19 @@ func TestSubpathTxIndexPopulated(t *testing.T) {
store := NewPacketStore(db, nil)
store.Load()
// spTxIndex must be populated alongside spIndex
if len(store.spTxIndex) == 0 {
t.Fatal("expected spTxIndex to be populated after Load()")
// spIndex must be populated after Load()
if len(store.spIndex) == 0 {
t.Fatal("expected spIndex to be populated after Load()")
}
// Every key in spIndex must also exist in spTxIndex with matching count
for key, count := range store.spIndex {
txs, ok := store.spTxIndex[key]
if !ok {
t.Errorf("spTxIndex missing key %q that exists in spIndex", key)
continue
}
if len(txs) != count {
t.Errorf("spTxIndex[%q] has %d txs, spIndex count is %d", key, len(txs), count)
}
}
// GetSubpathDetail should return correct match count via indexed lookup
// GetSubpathDetail should return correct match count via scan fallback
detail := store.GetSubpathDetail([]string{"eeff", "0011"})
if detail == nil {
t.Fatal("expected non-nil detail for existing subpath")
}
matches, _ := detail["totalMatches"].(int)
if matches != 1 {
t.Errorf("totalMatches = %d, want 1", matches)
if matches != 2 {
t.Errorf("totalMatches = %d, want 2", matches)
}
// Non-existent subpath should return 0 matches
@@ -2394,6 +2394,55 @@ func TestSubpathDetailMixedCaseHops(t *testing.T) {
}
}
// TestSubpathSingletonDrop verifies that singleton entries are pruned from
// spIndex while count>=2 entries are preserved.
func TestSubpathSingletonDrop(t *testing.T) {
	db := setupRichTestDB(t)
	defer db.Close()

	store := NewPacketStore(db, nil)
	store.Load()

	// "eeff,0011" occurs in two packets, so pruning must leave it intact.
	got, ok := store.spIndex["eeff,0011"]
	if !ok {
		t.Fatal("expected spIndex[\"eeff,0011\"] to survive singleton pruning")
	}
	if got != 2 {
		t.Errorf("spIndex[\"eeff,0011\"] = %d, want 2", got)
	}

	// After pruning, no surviving entry may have a count below 2.
	for sp, n := range store.spIndex {
		if n < 2 {
			t.Errorf("spIndex[%q] = %d, singletons should have been pruned", sp, n)
		}
	}
}
// TestSubpathEmptyDB verifies that the store loads successfully on a DB
// with no transmissions (no subpaths at all).
func TestSubpathEmptyDB(t *testing.T) {
	db := setupTestDB(t)
	defer db.Close()

	store := NewPacketStore(db, nil)
	store.Load()

	if n := len(store.spIndex); n != 0 {
		t.Errorf("expected empty spIndex on empty DB, got %d entries", n)
	}
	if store.spTotalPaths != 0 {
		t.Errorf("expected spTotalPaths=0 on empty DB, got %d", store.spTotalPaths)
	}

	// Even with nothing indexed, drill-down must not blow up — it should
	// simply report zero matches.
	detail := store.GetSubpathDetail([]string{"aa", "bb"})
	if detail == nil {
		t.Fatal("expected non-nil detail even on empty DB")
	}
	if matches, _ := detail["totalMatches"].(int); matches != 0 {
		t.Errorf("totalMatches on empty DB = %d, want 0", matches)
	}
}
func TestStoreGetAnalyticsRFCacheHit(t *testing.T) {
db := setupRichTestDB(t)
defer db.Close()

View File

@@ -546,9 +546,6 @@ func TestEstimateStoreTxBytes(t *testing.T) {
manualCalc := int64(storeTxBaseBytes) + int64(len(tx.RawHex)+len(tx.Hash)+len(tx.DecodedJSON)+len(tx.PathJSON)) + int64(numIndexesPerTx*indexEntryBytes)
manualCalc += perTxMapsBytes
manualCalc += hops * perPathHopBytes
if hops > 1 {
manualCalc += (hops * (hops - 1) / 2) * perSubpathEntryBytes
}
if est != manualCalc {
t.Fatalf("estimateStoreTxBytes = %d, want %d (manual calc)", est, manualCalc)
}

View File

@@ -176,8 +176,7 @@ type PacketStore struct {
// Precomputed subpath index: raw comma-joined hops → occurrence count.
// Built during Load(), incrementally updated on ingest. Avoids full
// packet iteration at query time (O(unique_subpaths) vs O(total_packets)).
spIndex map[string]int // "hop1,hop2" → count
spTxIndex map[string][]*StoreTx // "hop1,hop2" → transmissions containing this subpath
spIndex map[string]int // "hop1,hop2" → count
spTotalPaths int // transmissions with paths >= 2 hops
// Precomputed distance analytics: hop distances and path totals
// computed during Load() and incrementally updated on ingest.
@@ -311,7 +310,7 @@ func NewPacketStore(db *DB, cfg *PacketStoreConfig, cacheTTLs ...map[string]inte
collisionCacheTTL: 3600 * time.Second,
invCooldown: 300 * time.Second,
spIndex: make(map[string]int, 4096),
spTxIndex: make(map[string][]*StoreTx, 4096),
advertPubkeys: make(map[string]int),
lastSeenTouched: make(map[string]time.Time),
clockSkew: NewClockSkewEngine(),
@@ -1537,7 +1536,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
// Incrementally update precomputed subpath index with new transmissions
for _, tx := range broadcastTxs {
if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
if addTxToSubpathIndex(s.spIndex, tx) {
s.spTotalPaths++
}
addTxToPathHopIndex(s.byPathHop, tx)
@@ -1906,7 +1905,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
// Temporarily set parsedPath to old hops for removal.
saved, savedFlag := tx.parsedPath, tx.pathParsed
tx.parsedPath, tx.pathParsed = oldHops, true
if removeTxFromSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
if removeTxFromSubpathIndex(s.spIndex, tx) {
s.spTotalPaths--
}
tx.parsedPath, tx.pathParsed = saved, savedFlag
@@ -1923,7 +1922,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
}
// pickBestObservation already set pathParsed=false so
// addTxToSubpathIndex will re-parse the new path.
if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
if addTxToSubpathIndex(s.spIndex, tx) {
s.spTotalPaths++
}
addTxToPathHopIndex(s.byPathHop, tx)
@@ -2398,12 +2397,6 @@ func txGetParsedPath(tx *StoreTx) []string {
// increments their counts in the index. Returns true if the tx contributed
// (path had ≥ 2 hops).
func addTxToSubpathIndex(idx map[string]int, tx *StoreTx) bool {
return addTxToSubpathIndexFull(idx, nil, tx)
}
// addTxToSubpathIndexFull is like addTxToSubpathIndex but also appends
// tx to txIdx for each subpath key (if txIdx is non-nil).
func addTxToSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx *StoreTx) bool {
hops := txGetParsedPath(tx)
if len(hops) < 2 {
return false
@@ -2413,9 +2406,6 @@ func addTxToSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx
for start := 0; start <= len(hops)-l; start++ {
key := strings.ToLower(strings.Join(hops[start:start+l], ","))
idx[key]++
if txIdx != nil {
txIdx[key] = append(txIdx[key], tx)
}
}
}
return true
@@ -2425,12 +2415,6 @@ func addTxToSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx
// decrements counts for all raw subpaths of tx. Returns true if the tx
// had a path.
func removeTxFromSubpathIndex(idx map[string]int, tx *StoreTx) bool {
return removeTxFromSubpathIndexFull(idx, nil, tx)
}
// removeTxFromSubpathIndexFull is like removeTxFromSubpathIndex but also
// removes tx from txIdx for each subpath key (if txIdx is non-nil).
func removeTxFromSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx *StoreTx) bool {
hops := txGetParsedPath(tx)
if len(hops) < 2 {
return false
@@ -2443,18 +2427,6 @@ func removeTxFromSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreT
if idx[key] <= 0 {
delete(idx, key)
}
if txIdx != nil {
txs := txIdx[key]
for i, t := range txs {
if t == tx {
txIdx[key] = append(txs[:i], txs[i+1:]...)
break
}
}
if len(txIdx[key]) == 0 {
delete(txIdx, key)
}
}
}
}
return true
@@ -2464,15 +2436,27 @@ func removeTxFromSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreT
// Must be called with s.mu held.
func (s *PacketStore) buildSubpathIndex() {
s.spIndex = make(map[string]int, 4096)
s.spTxIndex = make(map[string][]*StoreTx, 4096)
s.spTotalPaths = 0
for _, tx := range s.packets {
if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) {
if addTxToSubpathIndex(s.spIndex, tx) {
s.spTotalPaths++
}
}
log.Printf("[store] Built subpath index: %d unique raw subpaths from %d paths",
len(s.spIndex), s.spTotalPaths)
// Drop singleton entries (count==1) to save memory. At scale, 60-70% of
// subpath keys appear only once — they're noise and not useful for analytics.
// Singletons that appear after startup via incremental ingest will remain
// in the index until the next restart; this is acceptable since they only
// matter for display ranking and the count is still correct for non-singletons.
total := len(s.spIndex)
dropped := 0
for key, count := range s.spIndex {
if count == 1 {
delete(s.spIndex, key)
dropped++
}
}
log.Printf("[store] Subpath index: kept %d/%d entries (dropped %d singletons)",
total-dropped, total, dropped)
}
// buildPathHopIndex scans all packets and populates byPathHop.
@@ -2656,10 +2640,11 @@ func (s *PacketStore) buildDistanceIndex() {
// usage and independent of GC state.
//
// Issue #743: Previous estimates missed major per-packet allocations:
// - spTxIndex: O(path²) entries per tx (50-150MB at scale)
// - ResolvedPath on observations (~25MB at scale)
// - Per-tx maps: obsKeys, observerSet (~11MB at scale)
// - byPathHop index entries (20-40MB at scale)
// Note: spTxIndex was eliminated in #791 (saved ~280MB at 3.4M subpaths).
// Singleton subpath entries are also dropped from spIndex (#791, saves ~150MB).
const (
storeTxBaseBytes = 384 // StoreTx struct fields + map headers + sync.Once + string headers
storeObsBaseBytes = 192 // StoreObs struct fields + string headers
@@ -2673,15 +2658,12 @@ const (
// Per path hop: byPathHop index entry (pointer + map bucket)
perPathHopBytes = 50
// Per subpath entry in spTxIndex: string key + slice append + pointer
perSubpathEntryBytes = 40
// Per resolved path element on an observation
perResolvedPathElemBytes = 24 // *string pointer + string header + avg pubkey length
)
// estimateStoreTxBytes returns the estimated memory cost of a StoreTx (excluding observations).
// Includes per-tx maps (obsKeys, observerSet), byPathHop entries, and spTxIndex subpath entries.
// Includes per-tx maps (obsKeys, observerSet) and byPathHop entries.
func estimateStoreTxBytes(tx *StoreTx) int64 {
base := int64(storeTxBaseBytes)
base += int64(len(tx.RawHex) + len(tx.Hash) + len(tx.DecodedJSON) + len(tx.PathJSON))
@@ -2694,12 +2676,6 @@ func estimateStoreTxBytes(tx *StoreTx) int64 {
hops := int64(len(txGetParsedPath(tx)))
base += hops * perPathHopBytes
// spTxIndex: O(path²) subpath combinations
if hops > 1 {
subpaths := hops * (hops - 1) / 2
base += subpaths * perSubpathEntryBytes
}
return base
}
@@ -2713,7 +2689,6 @@ func estimateStoreTxBytesTypical(numObs int) int64 {
base += perTxMapsBytes
hops := int64(3)
base += hops * perPathHopBytes
base += (hops * (hops - 1) / 2) * perSubpathEntryBytes
// Add observation costs
obsBase := int64(storeObsBaseBytes) + 30 + 30 + 60 // observer ID + name + path
obsBase += int64(numIndexesPerObs * indexEntryBytes)
@@ -2906,7 +2881,7 @@ func (s *PacketStore) EvictStale() int {
}
// Remove from subpath index
removeTxFromSubpathIndexFull(s.spIndex, s.spTxIndex, tx)
removeTxFromSubpathIndex(s.spIndex, tx)
// Remove from path-hop index
removeTxFromPathHopIndex(s.byPathHop, tx)
}
@@ -7078,8 +7053,27 @@ func (s *PacketStore) GetSubpathDetail(rawHops []string) map[string]interface{}
// Build the subpath key the same way the index does (lowercase, comma-joined)
spKey := strings.ToLower(strings.Join(rawHops, ","))
// Direct lookup instead of scanning all packets
matchedTxs := s.spTxIndex[spKey]
// Scan all transmissions for matching subpaths (replaces former spTxIndex
// lookup — the per-tx pointer index was eliminated in #791 to save ~280MB
// at scale). This is O(packets) but only called on drill-down, not listing.
var matchedTxs []*StoreTx
for _, tx := range s.packets {
hops := txGetParsedPath(tx)
if len(hops) < 2 {
continue
}
maxL := min(8, len(hops))
for l := 2; l <= maxL; l++ {
for start := 0; start <= len(hops)-l; start++ {
key := strings.ToLower(strings.Join(hops[start:start+l], ","))
if key == spKey {
matchedTxs = append(matchedTxs, tx)
goto nextTx
}
}
}
nextTx:
}
hourBuckets := make([]int, 24)
var snrSum, rssiSum float64

View File

@@ -28,7 +28,7 @@ func TestEstimateStoreTxBytes_ReasonableValues(t *testing.T) {
}
// TestEstimateStoreTxBytes_ManyHopsSubpaths verifies that packets with many
// hops estimate significantly more due to O(path²) subpath index entries.
// hops estimate more due to per-hop byPathHop index entries.
func TestEstimateStoreTxBytes_ManyHopsSubpaths(t *testing.T) {
tx2 := &StoreTx{
Hash: "aabb",
@@ -43,12 +43,14 @@ func TestEstimateStoreTxBytes_ManyHopsSubpaths(t *testing.T) {
est2 := estimateStoreTxBytes(tx2)
est10 := estimateStoreTxBytes(tx10)
// 10 hops → 45 subpath combos × 40 = 1800 bytes just for subpaths
// 10 hops vs 2 hops → 8 extra byPathHop entries × perPathHopBytes
if est10 <= est2 {
t.Errorf("10-hop (%d) should estimate more than 2-hop (%d)", est10, est2)
}
if est10 < est2+1500 {
t.Errorf("10-hop (%d) should estimate at least 1500 more than 2-hop (%d)", est10, est2)
// spTxIndex eliminated in #791; cost difference is now linear (per-hop only)
expectedDiff := int64(8) * perPathHopBytes // 8 extra hops
if est10 < est2+expectedDiff {
t.Errorf("10-hop (%d) should estimate at least %d more than 2-hop (%d)", est10, expectedDiff, est2)
}
}