fix: redesign memory eviction — self-accounting trackedBytes, watermarks, safety cap (#711)

## Problem

`HeapAlloc`-based eviction cascades on large databases — evicts down to
near-zero packets because Go runtime overhead exceeds `maxMemoryMB` even
with an empty packet store.

## Fix (per Carmack spec on #710)

1. **Self-accounting `trackedBytes`** — running counter maintained on
insert/evict, computed from actual struct sizes. No
`runtime.ReadMemStats`.
2. **High/low watermark hysteresis** (100%/85%) — evict to 85% of
budget, don't re-trigger until 100% crossed again.
3. **25% per-pass safety cap** — never evict more than a quarter of
packets in one cycle.
4. **Oldest-first** — evict from sorted head, O(1) candidate selection.

`maxMemoryMB` now means packet store budget, not total process heap.

Fixes #710

Co-authored-by: you <you@example.com>
This commit is contained in:
Kpa-clawbot
2026-04-11 23:06:48 -07:00
committed by GitHub
parent 7e0b904d09
commit 4a7e20a8cb
3 changed files with 302 additions and 42 deletions
+219 -20
View File
@@ -85,6 +85,12 @@ func makeTestStore(count int, startTime time.Time, intervalMin int) *PacketStore
// Subpath index
addTxToSubpathIndex(store.spIndex, tx)
// Track bytes for self-accounting
store.trackedBytes += estimateStoreTxBytes(tx)
for _, obs := range tx.Observations {
store.trackedBytes += estimateStoreObsBytes(obs)
}
}
return store
@@ -166,43 +172,43 @@ func TestEvictStale_MemoryBasedEviction(t *testing.T) {
// All packets are recent (1h old) so time-based won't trigger.
store.retentionHours = 24
store.maxMemoryMB = 3
// Inject deterministic estimator: simulates 6MB (over 3MB limit).
// Uses packet count so it scales correctly after eviction.
store.memoryEstimator = func() float64 {
return float64(len(store.packets)*5120+store.totalObs*500) / 1048576.0
}
// Set trackedBytes to simulate 6MB (over 3MB limit).
store.trackedBytes = 6 * 1048576
evicted := store.EvictStale()
if evicted == 0 {
t.Fatal("expected some evictions for memory cap")
}
estMB := store.estimatedMemoryMB()
if estMB > 3.5 {
t.Fatalf("expected <=3.5MB after eviction, got %.1fMB", estMB)
// 25% safety cap should limit to 250 per pass
if evicted > 250 {
t.Fatalf("25%% safety cap violated: evicted %d", evicted)
}
// trackedBytes should have decreased
if store.trackedBytes >= 6*1048576 {
t.Fatal("trackedBytes should have decreased after eviction")
}
}
// TestEvictStale_MemoryBasedEviction_UnderestimatedHeap verifies that eviction
// fires correctly when actual heap is much larger than a formula-based estimate
// would report — the scenario that caused OOM kills in production.
// TestEvictStale_MemoryBasedEviction_UnderestimatedHeap verifies that the 25%
// safety cap prevents cascading eviction even when trackedBytes is very high.
func TestEvictStale_MemoryBasedEviction_UnderestimatedHeap(t *testing.T) {
now := time.Now().UTC()
store := makeTestStore(1000, now.Add(-1*time.Hour), 0)
store.retentionHours = 24
store.maxMemoryMB = 500
// Simulate actual heap 5x over budget (like production: ~5GB actual vs ~1GB limit).
store.memoryEstimator = func() float64 {
return 2500.0 // 2500MB actual vs 500MB limit
}
// Simulate trackedBytes 5x over budget.
store.trackedBytes = 2500 * 1048576
evicted := store.EvictStale()
if evicted == 0 {
t.Fatal("expected evictions when heap is 5x over limit")
t.Fatal("expected evictions when tracked is 5x over limit")
}
// Should keep roughly 500/2500 * 0.9 = 18% of packets → ~180 of 1000.
remaining := len(store.packets)
if remaining > 250 {
t.Fatalf("expected most packets evicted (heap 5x over), but %d of 1000 remain", remaining)
// Safety cap: max 25% per pass = 250
if evicted > 250 {
t.Fatalf("25%% safety cap violated: evicted %d of 1000", evicted)
}
if evicted != 250 {
t.Fatalf("expected exactly 250 evicted (25%% cap), got %d", evicted)
}
}
@@ -375,3 +381,196 @@ func TestCacheTTLDefaults(t *testing.T) {
t.Fatalf("expected default rfCacheTTL=15s, got %v", store.rfCacheTTL)
}
}
// --- Self-accounting memory tracking tests ---
func TestTrackedBytes_IncreasesOnInsert(t *testing.T) {
now := time.Now().UTC()
store := makeTestStore(0, now, 0)
if store.trackedBytes != 0 {
t.Fatalf("expected 0 trackedBytes for empty store, got %d", store.trackedBytes)
}
store2 := makeTestStore(10, now, 1)
if store2.trackedBytes <= 0 {
t.Fatal("expected positive trackedBytes after inserting 10 packets")
}
// Each packet has 2 observations; should be roughly 10*(384+5*48) + 20*(192+2*48) = 10*624 + 20*288 = 12000
expectedMin := int64(10*600 + 20*250) // rough lower bound
if store2.trackedBytes < expectedMin {
t.Fatalf("trackedBytes %d seems too low (expected > %d)", store2.trackedBytes, expectedMin)
}
}
func TestTrackedBytes_DecreasesOnEvict(t *testing.T) {
now := time.Now().UTC()
store := makeTestStore(100, now.Add(-48*time.Hour), 0)
store.retentionHours = 24
beforeBytes := store.trackedBytes
if beforeBytes <= 0 {
t.Fatal("expected positive trackedBytes before eviction")
}
evicted := store.EvictStale()
if evicted != 100 {
t.Fatalf("expected 100 evicted, got %d", evicted)
}
if store.trackedBytes != 0 {
t.Fatalf("expected 0 trackedBytes after evicting all, got %d", store.trackedBytes)
}
}
func TestTrackedBytes_MatchesExpectedAfterMixedInsertEvict(t *testing.T) {
now := time.Now().UTC()
// Create 100 packets, 50 old + 50 recent
store := makeTestStore(100, now.Add(-48*time.Hour), 0)
for i := 50; i < 100; i++ {
store.packets[i].FirstSeen = now.Add(-1 * time.Hour).Format(time.RFC3339)
}
store.retentionHours = 24
totalBefore := store.trackedBytes
// Calculate expected bytes for first 50 packets (to be evicted)
var evictedBytes int64
for i := 0; i < 50; i++ {
tx := store.packets[i]
evictedBytes += estimateStoreTxBytes(tx)
for _, obs := range tx.Observations {
evictedBytes += estimateStoreObsBytes(obs)
}
}
store.EvictStale()
expectedAfter := totalBefore - evictedBytes
if store.trackedBytes != expectedAfter {
t.Fatalf("trackedBytes %d != expected %d (before=%d, evicted=%d)",
store.trackedBytes, expectedAfter, totalBefore, evictedBytes)
}
}
func TestWatermarkHysteresis(t *testing.T) {
now := time.Now().UTC()
store := makeTestStore(1000, now.Add(-1*time.Hour), 0)
store.retentionHours = 0 // no time-based eviction
store.maxMemoryMB = 1 // 1MB budget
// Set trackedBytes to just above high watermark
highWatermark := int64(1 * 1048576)
lowWatermark := int64(float64(highWatermark) * 0.85)
store.trackedBytes = highWatermark + 1
evicted := store.EvictStale()
if evicted == 0 {
t.Fatal("expected eviction when above high watermark")
}
if store.trackedBytes > lowWatermark+1024 {
t.Fatalf("expected trackedBytes near low watermark after eviction, got %d (low=%d)",
store.trackedBytes, lowWatermark)
}
// Now set trackedBytes to just below high watermark — should NOT trigger
store.trackedBytes = highWatermark - 1
evicted2 := store.EvictStale()
if evicted2 != 0 {
t.Fatalf("expected no eviction below high watermark, got %d", evicted2)
}
}
func TestSafetyCap25Percent(t *testing.T) {
now := time.Now().UTC()
store := makeTestStore(1000, now.Add(-1*time.Hour), 0)
store.retentionHours = 0
store.maxMemoryMB = 1
// Set trackedBytes way over limit to force maximum eviction
store.trackedBytes = 100 * 1048576 // 100MB vs 1MB limit
evicted := store.EvictStale()
// 25% of 1000 = 250
if evicted > 250 {
t.Fatalf("25%% safety cap violated: evicted %d of 1000 (max should be 250)", evicted)
}
if evicted != 250 {
t.Fatalf("expected exactly 250 evicted (25%% cap), got %d", evicted)
}
if len(store.packets) != 750 {
t.Fatalf("expected 750 remaining, got %d", len(store.packets))
}
}
func TestMultiplePassesConverge(t *testing.T) {
now := time.Now().UTC()
store := makeTestStore(1000, now.Add(-1*time.Hour), 0)
store.retentionHours = 0
// Set budget to half the actual tracked bytes — requires ~2 passes
actualBytes := store.trackedBytes
store.maxMemoryMB = int(float64(actualBytes) / 1048576.0 / 2)
if store.maxMemoryMB < 1 {
store.maxMemoryMB = 1
}
totalEvicted := 0
for pass := 0; pass < 20; pass++ {
evicted := store.EvictStale()
if evicted == 0 {
break
}
totalEvicted += evicted
}
// After convergence, trackedBytes should be at or below high watermark
// (may be between low and high due to hysteresis — that's fine)
highWatermark := int64(store.maxMemoryMB) * 1048576
if store.trackedBytes > highWatermark {
t.Fatalf("did not converge: trackedBytes=%d (%.1fMB) > highWatermark=%d after multiple passes",
store.trackedBytes, float64(store.trackedBytes)/1048576.0, highWatermark)
}
if totalEvicted == 0 {
t.Fatal("expected some evictions across multiple passes")
}
}
func TestEstimateStoreTxBytes(t *testing.T) {
tx := &StoreTx{
RawHex: "aabbcc",
Hash: "hash1234",
DecodedJSON: `{"pubKey":"pk1"}`,
PathJSON: `["aa","bb"]`,
}
est := estimateStoreTxBytes(tx)
// Verify the function returns a reasonable value matching our manual calculation
manualCalc := int64(storeTxBaseBytes) + int64(len(tx.RawHex)+len(tx.Hash)+len(tx.DecodedJSON)+len(tx.PathJSON)) + int64(numIndexesPerTx*indexEntryBytes)
if est != manualCalc {
t.Fatalf("estimateStoreTxBytes = %d, want %d (manual calc)", est, manualCalc)
}
if est < 600 || est > 800 {
t.Fatalf("estimateStoreTxBytes = %d, expected in range [600, 800]", est)
}
}
func TestEstimateStoreObsBytes(t *testing.T) {
obs := &StoreObs{
ObserverID: "obs123",
PathJSON: `["aa"]`,
}
est := estimateStoreObsBytes(obs)
// storeObsBaseBytes(192) + len(ObserverID=6) + len(PathJSON=6) + 2*48(96) = 300
expected := int64(192 + 6 + 6 + 2*48)
if est != expected {
t.Fatalf("estimateStoreObsBytes = %d, want %d", est, expected)
}
}
func BenchmarkEviction100K(b *testing.B) {
now := time.Now().UTC()
for i := 0; i < b.N; i++ {
b.StopTimer()
store := makeTestStore(100000, now.Add(-48*time.Hour), 0)
store.retentionHours = 24
b.StartTimer()
store.EvictStale()
}
}
+82 -22
View File
@@ -208,9 +208,10 @@ type PacketStore struct {
// Eviction config and stats
retentionHours float64 // 0 = unlimited
maxMemoryMB int // 0 = unlimited
maxMemoryMB int // 0 = unlimited (packet store memory budget)
evicted int64 // total packets evicted
memoryEstimator func() float64 // injectable for tests; nil = use runtime.ReadMemStats
trackedBytes int64 // running total of estimated packet store memory
memoryEstimator func() float64 // injectable for tests; nil = use runtime.ReadMemStats (stats only)
}
// Precomputed distance records for fast analytics aggregation.
@@ -407,6 +408,7 @@ func (s *PacketStore) Load() error {
s.byPayloadType[pt] = append(s.byPayloadType[pt], tx)
}
s.trackAdvertPubkey(tx)
s.trackedBytes += estimateStoreTxBytes(tx)
}
if obsID.Valid {
@@ -455,6 +457,7 @@ func (s *PacketStore) Load() error {
}
s.totalObs++
s.trackedBytes += estimateStoreObsBytes(obs)
}
}
@@ -474,8 +477,8 @@ func (s *PacketStore) Load() error {
s.loaded = true
elapsed := time.Since(t0)
log.Printf("[store] Loaded %d transmissions (%d observations) in %v (heap ~%.0fMB)",
len(s.packets), s.totalObs, elapsed, s.estimatedMemoryMB())
log.Printf("[store] Loaded %d transmissions (%d observations) in %v (tracked ~%.0fMB, heap ~%.0fMB)",
len(s.packets), s.totalObs, elapsed, s.trackedMemoryMB(), s.estimatedMemoryMB())
return nil
}
@@ -887,6 +890,7 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} {
s.mu.RUnlock()
estimatedMB := math.Round(s.estimatedMemoryMB()*10) / 10
trackedMB := math.Round(s.trackedMemoryMB()*10) / 10
evicted := atomic.LoadInt64(&s.evicted)
@@ -901,6 +905,7 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} {
"retentionHours": s.retentionHours,
"maxMemoryMB": s.maxMemoryMB,
"estimatedMB": estimatedMB,
"trackedMB": trackedMB,
"indexes": map[string]interface{}{
"byHash": hashIdx,
"byTxID": txIdx,
@@ -1058,6 +1063,7 @@ func (s *PacketStore) GetPerfStoreStatsTyped() PerfPacketStoreStats {
s.mu.RUnlock()
estimatedMB := math.Round(s.estimatedMemoryMB()*10) / 10
trackedMB := math.Round(s.trackedMemoryMB()*10) / 10
return PerfPacketStoreStats{
TotalLoaded: totalLoaded,
@@ -1069,6 +1075,7 @@ func (s *PacketStore) GetPerfStoreStatsTyped() PerfPacketStoreStats {
SqliteOnly: false,
MaxPackets: 2386092,
EstimatedMB: estimatedMB,
TrackedMB: trackedMB,
MaxMB: s.maxMemoryMB,
Indexes: PacketStoreIndexes{
ByHash: hashIdx,
@@ -1374,6 +1381,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
s.byPayloadType[pt] = append(s.byPayloadType[pt], tx)
}
s.trackAdvertPubkey(tx)
s.trackedBytes += estimateStoreTxBytes(tx)
if _, exists := broadcastTxs[r.txID]; !exists {
broadcastTxs[r.txID] = tx
@@ -1430,6 +1438,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
s.byObserver[r.observerID] = append(s.byObserver[r.observerID], obs)
}
s.totalObs++
s.trackedBytes += estimateStoreObsBytes(obs)
}
}
@@ -1744,6 +1753,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
s.byObserver[r.observerID] = append(s.byObserver[r.observerID], obs)
}
s.totalObs++
s.trackedBytes += estimateStoreObsBytes(obs)
updatedTxs[r.txID] = tx
decoded := map[string]interface{}{
@@ -2557,9 +2567,36 @@ func (s *PacketStore) buildDistanceIndex() {
len(s.distHops), len(s.distPaths))
}
// Self-accounting memory estimation constants.
// These estimate the in-memory cost of StoreTx and StoreObs structs including
// map/index overhead. They don't need to be exact — just proportional to actual
// usage and independent of GC state.
const (
storeTxBaseBytes = 384 // StoreTx struct fields + map headers + sync.Once + string headers
storeObsBaseBytes = 192 // StoreObs struct fields + string headers
indexEntryBytes = 48 // average cost of one index map entry (key + pointer + bucket overhead)
numIndexesPerTx = 5 // byHash, byTxID, byNode, byPayloadType, nodeHashes entries
numIndexesPerObs = 2 // byObsID, byObserver entries
)
// estimateStoreTxBytes returns the estimated memory cost of a StoreTx (excluding observations).
func estimateStoreTxBytes(tx *StoreTx) int64 {
base := int64(storeTxBaseBytes)
base += int64(len(tx.RawHex) + len(tx.Hash) + len(tx.DecodedJSON) + len(tx.PathJSON))
base += int64(numIndexesPerTx * indexEntryBytes)
return base
}
// estimateStoreObsBytes returns the estimated memory cost of a StoreObs.
func estimateStoreObsBytes(obs *StoreObs) int64 {
base := int64(storeObsBaseBytes)
base += int64(len(obs.PathJSON) + len(obs.ObserverID))
base += int64(numIndexesPerObs * indexEntryBytes)
return base
}
// estimatedMemoryMB returns current Go heap allocation in MB.
// Uses runtime.ReadMemStats so it accounts for all data structures
// (distHops, distPaths, spIndex, map overhead) not just packets/observations.
// Kept for stats/debug endpoints only — NOT used in eviction decisions.
// In tests, memoryEstimator can be set to inject a deterministic value.
func (s *PacketStore) estimatedMemoryMB() float64 {
if s.memoryEstimator != nil {
@@ -2570,6 +2607,11 @@ func (s *PacketStore) estimatedMemoryMB() float64 {
return float64(ms.HeapAlloc) / 1048576.0
}
// trackedMemoryMB returns the self-accounted packet store memory in MB.
func (s *PacketStore) trackedMemoryMB() float64 {
return float64(s.trackedBytes) / 1048576.0
}
// EvictStale removes packets older than the retention window and/or exceeding
// the memory cap. Must be called with s.mu held (Lock). Returns the number of
// packets evicted.
@@ -2588,24 +2630,34 @@ func (s *PacketStore) EvictStale() int {
}
}
// Memory-based eviction: if heap exceeds budget, trim proportionally from head.
// All major data structures (distHops, distPaths, spIndex) scale with packet count,
// so evicting a fraction of packets frees roughly the same fraction of total heap.
// A 10% buffer avoids immediately re-triggering on the next ingest cycle.
// Memory-based eviction: use self-accounted trackedBytes with watermark hysteresis.
// High watermark = maxMemoryMB (trigger), low watermark = 85% (stop).
// Safety cap: never evict more than 25% of packets in a single pass.
if s.maxMemoryMB > 0 {
currentMB := s.estimatedMemoryMB()
if currentMB > float64(s.maxMemoryMB) && len(s.packets) > 0 {
fractionToKeep := (float64(s.maxMemoryMB) / currentMB) * 0.9
keepCount := int(float64(len(s.packets)) * fractionToKeep)
if keepCount < 0 {
keepCount = 0
highWatermark := int64(s.maxMemoryMB) * 1048576
lowWatermark := int64(float64(highWatermark) * 0.85)
if s.trackedBytes > highWatermark && len(s.packets) > 0 {
// Evict from head until trackedBytes would drop below low watermark
var bytesToEvict int64
memCutoff := cutoffIdx
for memCutoff < len(s.packets) && (s.trackedBytes-bytesToEvict) > lowWatermark {
tx := s.packets[memCutoff]
bytesToEvict += estimateStoreTxBytes(tx)
for _, obs := range tx.Observations {
bytesToEvict += estimateStoreObsBytes(obs)
}
memCutoff++
}
newCutoff := len(s.packets) - keepCount
if newCutoff > cutoffIdx {
cutoffIdx = newCutoff
// Safety cap: never evict more than 25% in a single pass
maxEvict := len(s.packets) / 4
if maxEvict < 1 {
maxEvict = 1
}
if cutoffIdx > len(s.packets) {
cutoffIdx = len(s.packets)
if memCutoff > maxEvict {
memCutoff = maxEvict
}
if memCutoff > cutoffIdx {
cutoffIdx = memCutoff
}
}
}
@@ -2619,6 +2671,7 @@ func (s *PacketStore) EvictStale() int {
evicting := s.packets[:cutoffIdx]
evictedObs := 0
var evictedBytes int64
// Build sets of evicted IDs for batch removal from secondary indexes
evictedTxIDs := make(map[int]struct{}, cutoffIdx)
@@ -2634,10 +2687,12 @@ func (s *PacketStore) EvictStale() int {
delete(s.byHash, tx.Hash)
delete(s.byTxID, tx.ID)
evictedTxIDs[tx.ID] = struct{}{}
evictedBytes += estimateStoreTxBytes(tx)
for _, obs := range tx.Observations {
delete(s.byObsID, obs.ID)
evictedObsIDs[obs.ID] = struct{}{}
evictedBytes += estimateStoreObsBytes(obs)
if obs.ObserverID != "" {
affectedObservers[obs.ObserverID] = struct{}{}
}
@@ -2789,7 +2844,12 @@ func (s *PacketStore) EvictStale() int {
evictCount := cutoffIdx
atomic.AddInt64(&s.evicted, int64(evictCount))
log.Printf("[store] Evicted %d packets (%d obs)", evictCount, evictedObs)
s.trackedBytes -= evictedBytes
if s.trackedBytes < 0 {
s.trackedBytes = 0
}
log.Printf("[store] Evicted %d packets (%d obs, freed ~%.1fMB, tracked ~%.1fMB)",
evictCount, evictedObs, float64(evictedBytes)/1048576.0, s.trackedMemoryMB())
// Eviction removes data — all caches may be affected
s.invalidateCachesFor(cacheInvalidation{eviction: true})
+1
View File
@@ -174,6 +174,7 @@ type PerfPacketStoreStats struct {
SqliteOnly bool `json:"sqliteOnly"`
MaxPackets int `json:"maxPackets"`
EstimatedMB float64 `json:"estimatedMB"`
TrackedMB float64 `json:"trackedMB"`
MaxMB int `json:"maxMB"`
Indexes PacketStoreIndexes `json:"indexes"`
}