From f897ce1b263dec0bf848a896c8e7f204cb72aa8b Mon Sep 17 00:00:00 2001 From: efiten Date: Sat, 4 Apr 2026 17:41:54 +0200 Subject: [PATCH] fix: use runtime heap stats for memory-based eviction (#564) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem Closes #563. Addresses the *Packet store estimated memory* item in #559. `estimatedMemoryMB()` used a hardcoded formula: ```go return float64(len(s.packets)*5120+s.totalObs*500) / 1048576.0 ``` This ignored three data structures that grow continuously with every ingest cycle: | Structure | Production size | Heap not counted | |---|---|---| | `distHops []distHopRecord` | 1,556,833 records | ~300 MB | | `distPaths []distPathRecord` | 93,090 records | ~25 MB | | `spIndex map[string]int` | 4,113,234 entries | ~400 MB | Result: formula reported ~1.2 GB while actual heap was ~5 GB. With `maxMemoryMB: 1024`, eviction calculated it only needed to shed ~200 MB, removed a handful of packets, and stopped. Memory kept growing until the OOM killer fired. ## Fix Replace `estimatedMemoryMB()` with `runtime.ReadMemStats` so all data structures are automatically counted: ```go func (s *PacketStore) estimatedMemoryMB() float64 { if s.memoryEstimator != nil { return s.memoryEstimator() } var ms runtime.MemStats runtime.ReadMemStats(&ms) return float64(ms.HeapAlloc) / 1048576.0 } ``` Replace the eviction simulation loop (which re-used the same wrong formula) with a proportional calculation: if heap is N× over budget, evict enough packets to keep `(1/N) × 0.9` of the current count. The 0.9 factor adds a 10% buffer so the next ingest cycle doesn't immediately re-trigger. All major data structures (distHops, distPaths, spIndex) scale with packet count, so removing a fraction of packets frees roughly the same fraction of total heap. ## Testing - Updated `TestEvictStale_MemoryBasedEviction` to inject a deterministic estimator via the new `memoryEstimator` field. - Added `TestEvictStale_MemoryBasedEviction_UnderestimatedHeap`: verifies that when actual heap is 5× over limit (the production failure scenario), eviction correctly removes ~80%+ of packets. ``` === RUN TestEvictStale_MemoryBasedEviction [store] Evicted 538 packets (1076 obs) --- PASS === RUN TestEvictStale_MemoryBasedEviction_UnderestimatedHeap [store] Evicted 820 packets (1640 obs) --- PASS ``` Full suite: `go test ./...` — ok (10.3s) ## Perf note `runtime.ReadMemStats` runs once per eviction tick (every 60 s) and once per `/api/perf/store` call. Cost is negligible. Co-authored-by: Claude Sonnet 4.6 --- cmd/server/eviction_test.go | 38 +++++++++++++++++---- cmd/server/store.go | 68 ++++++++++++++++++------------------- 2 files changed, 66 insertions(+), 40 deletions(-) diff --git a/cmd/server/eviction_test.go b/cmd/server/eviction_test.go index 4bba9c0b..9cb5ace6 100644 --- a/cmd/server/eviction_test.go +++ b/cmd/server/eviction_test.go @@ -162,24 +162,50 @@ func TestEvictStale_NoEvictionWhenDisabled(t *testing.T) { func TestEvictStale_MemoryBasedEviction(t *testing.T) { now := time.Now().UTC() - // Create enough packets to exceed a small memory limit - // 1000 packets * 5KB + 2000 obs * 500B ≈ 6MB store := makeTestStore(1000, now.Add(-1*time.Hour), 0) - // All packets are recent (1h old) so time-based won't trigger + // All packets are recent (1h old) so time-based won't trigger. store.retentionHours = 24 - store.maxMemoryMB = 3 // ~3MB limit, should evict roughly half + store.maxMemoryMB = 3 + // Inject deterministic estimator: simulates 6MB (over 3MB limit). + // Uses packet count so it scales correctly after eviction. + store.memoryEstimator = func() float64 { + return float64(len(store.packets)*5120+store.totalObs*500) / 1048576.0 + } evicted := store.EvictStale() if evicted == 0 { t.Fatal("expected some evictions for memory cap") } - // After eviction, estimated memory should be <= 3MB estMB := store.estimatedMemoryMB() - if estMB > 3.5 { // small tolerance + if estMB > 3.5 { t.Fatalf("expected <=3.5MB after eviction, got %.1fMB", estMB) } } +// TestEvictStale_MemoryBasedEviction_UnderestimatedHeap verifies that eviction +// fires correctly when actual heap is much larger than a formula-based estimate +// would report — the scenario that caused OOM kills in production. +func TestEvictStale_MemoryBasedEviction_UnderestimatedHeap(t *testing.T) { + now := time.Now().UTC() + store := makeTestStore(1000, now.Add(-1*time.Hour), 0) + store.retentionHours = 24 + store.maxMemoryMB = 500 + // Simulate actual heap 5x over budget (like production: ~5GB actual vs ~1GB limit). + store.memoryEstimator = func() float64 { + return 2500.0 // 2500MB actual vs 500MB limit + } + + evicted := store.EvictStale() + if evicted == 0 { + t.Fatal("expected evictions when heap is 5x over limit") + } + // Should keep roughly 500/2500 * 0.9 = 18% of packets → ~180 of 1000. + remaining := len(store.packets) + if remaining > 250 { + t.Fatalf("expected most packets evicted (heap 5x over), but %d of 1000 remain", remaining) + } +} + func TestEvictStale_CleansNodeIndexes(t *testing.T) { now := time.Now().UTC() store := makeTestStore(10, now.Add(-48*time.Hour), 0) diff --git a/cmd/server/store.go b/cmd/server/store.go index 0c5da5eb..3a78af2b 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "math" + "runtime" "sort" "strconv" "strings" @@ -152,9 +153,10 @@ type PacketStore struct { graph *NeighborGraph // Eviction config and stats - retentionHours float64 // 0 = unlimited - maxMemoryMB int // 0 = unlimited - evicted int64 // total packets evicted + retentionHours float64 // 0 = unlimited + maxMemoryMB int // 0 = unlimited + evicted int64 // total packets evicted + memoryEstimator func() float64 // injectable for tests; nil = use runtime.ReadMemStats } // Precomputed distance records for fast analytics aggregation. @@ -367,9 +369,8 @@ func (s *PacketStore) Load() error { s.loaded = true elapsed := time.Since(t0) - estMB := (len(s.packets)*5120 + s.totalObs*500) / (1024 * 1024) - log.Printf("[store] Loaded %d transmissions (%d observations) in %v (~%dMB est)", - len(s.packets), s.totalObs, elapsed, estMB) + log.Printf("[store] Loaded %d transmissions (%d observations) in %v (heap ~%.0fMB)", + len(s.packets), s.totalObs, elapsed, s.estimatedMemoryMB()) return nil } @@ -677,8 +678,7 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} { advertByObsCount := len(s.advertPubkeys) s.mu.RUnlock() - // Realistic estimate: ~5KB per packet + ~500 bytes per observation - estimatedMB := math.Round(float64(totalLoaded*5120+totalObs*500)/1048576*10) / 10 + estimatedMB := math.Round(s.estimatedMemoryMB()*10) / 10 evicted := atomic.LoadInt64(&s.evicted) @@ -848,7 +848,7 @@ func (s *PacketStore) GetPerfStoreStatsTyped() PerfPacketStoreStats { advertByObsCount := len(s.advertPubkeys) s.mu.RUnlock() - estimatedMB := math.Round(float64(totalLoaded*5120+totalObs*500)/1048576*10) / 10 + estimatedMB := math.Round(s.estimatedMemoryMB()*10) / 10 return PerfPacketStoreStats{ TotalLoaded: totalLoaded, @@ -2046,9 +2046,17 @@ func (s *PacketStore) buildDistanceIndex() { len(s.distHops), len(s.distPaths)) } -// estimatedMemoryMB returns estimated memory usage of the packet store. +// estimatedMemoryMB returns current Go heap allocation in MB. +// Uses runtime.ReadMemStats so it accounts for all data structures +// (distHops, distPaths, spIndex, map overhead) not just packets/observations. +// In tests, memoryEstimator can be set to inject a deterministic value. func (s *PacketStore) estimatedMemoryMB() float64 { - return float64(len(s.packets)*5120+s.totalObs*500) / 1048576.0 + if s.memoryEstimator != nil { + return s.memoryEstimator() + } + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + return float64(ms.HeapAlloc) / 1048576.0 } // EvictStale removes packets older than the retention window and/or exceeding @@ -2069,31 +2077,25 @@ func (s *PacketStore) EvictStale() int { } } - // Memory-based eviction: if still over budget, trim more from head + // Memory-based eviction: if heap exceeds budget, trim proportionally from head. + // All major data structures (distHops, distPaths, spIndex) scale with packet count, + // so evicting a fraction of packets frees roughly the same fraction of total heap. + // A 10% buffer avoids immediately re-triggering on the next ingest cycle. if s.maxMemoryMB > 0 { - for cutoffIdx < len(s.packets) && s.estimatedMemoryMB() > float64(s.maxMemoryMB) { - // Estimate how many more to evict: rough binary approach - overMB := s.estimatedMemoryMB() - float64(s.maxMemoryMB) - // ~5KB per packet, so overMB * 1024*1024 / 5120 packets - extra := int(overMB * 1048576.0 / 5120.0) - if extra < 100 { - extra = 100 + currentMB := s.estimatedMemoryMB() + if currentMB > float64(s.maxMemoryMB) && len(s.packets) > 0 { + fractionToKeep := (float64(s.maxMemoryMB) / currentMB) * 0.9 + keepCount := int(float64(len(s.packets)) * fractionToKeep) + if keepCount < 0 { + keepCount = 0 + } + newCutoff := len(s.packets) - keepCount + if newCutoff > cutoffIdx { + cutoffIdx = newCutoff } - cutoffIdx += extra if cutoffIdx > len(s.packets) { cutoffIdx = len(s.packets) } - // Recalculate estimated memory with fewer packets - // (we haven't actually removed yet, so simulate) - remainingPkts := len(s.packets) - cutoffIdx - remainingObs := s.totalObs - for _, tx := range s.packets[:cutoffIdx] { - remainingObs -= len(tx.Observations) - } - estMB := float64(remainingPkts*5120+remainingObs*500) / 1048576.0 - if estMB <= float64(s.maxMemoryMB) { - break - } } } @@ -2207,9 +2209,7 @@ func (s *PacketStore) EvictStale() int { evictCount := cutoffIdx atomic.AddInt64(&s.evicted, int64(evictCount)) - freedMB := float64(evictCount*5120+evictedObs*500) / 1048576.0 - log.Printf("[store] Evicted %d packets older than %.0fh (freed ~%.1fMB estimated)", - evictCount, s.retentionHours, freedMB) + log.Printf("[store] Evicted %d packets (%d obs)", evictCount, evictedObs) // Eviction removes data — all caches may be affected s.invalidateCachesFor(cacheInvalidation{eviction: true})