fix: use runtime heap stats for memory-based eviction (#564)

## Problem

Closes #563. Addresses the *Packet store estimated memory* item in #559.

`estimatedMemoryMB()` used a hardcoded formula:

```go
return float64(len(s.packets)*5120+s.totalObs*500) / 1048576.0
```

This ignored three data structures that grow continuously with every
ingest cycle:

| Structure | Production size | Heap not counted |
|---|---|---|
| `distHops []distHopRecord` | 1,556,833 records | ~300 MB |
| `distPaths []distPathRecord` | 93,090 records | ~25 MB |
| `spIndex map[string]int` | 4,113,234 entries | ~400 MB |

Result: formula reported ~1.2 GB while actual heap was ~5 GB. With
`maxMemoryMB: 1024`, eviction calculated it only needed to shed ~200 MB,
removed a handful of packets, and stopped. Memory kept growing until the
OOM killer fired.

## Fix

Replace `estimatedMemoryMB()` with `runtime.ReadMemStats` so all data
structures are automatically counted:

```go
func (s *PacketStore) estimatedMemoryMB() float64 {
    if s.memoryEstimator != nil {
        return s.memoryEstimator()
    }
    var ms runtime.MemStats
    runtime.ReadMemStats(&ms)
    return float64(ms.HeapAlloc) / 1048576.0
}
```

Replace the eviction simulation loop (which re-used the same wrong
formula) with a proportional calculation: if heap is N× over budget,
evict enough packets to keep `(1/N) × 0.9` of the current count. The 0.9
factor adds a 10% buffer so the next ingest cycle doesn't immediately
re-trigger. All major data structures (distHops, distPaths, spIndex)
scale with packet count, so removing a fraction of packets frees roughly
the same fraction of total heap.

## Testing

- Updated `TestEvictStale_MemoryBasedEviction` to inject a deterministic
estimator via the new `memoryEstimator` field.
- Added `TestEvictStale_MemoryBasedEviction_UnderestimatedHeap`:
verifies that when actual heap is 5× over limit (the production failure
scenario), eviction correctly removes ~80%+ of packets.

```
=== RUN   TestEvictStale_MemoryBasedEviction
[store] Evicted 538 packets (1076 obs)
--- PASS

=== RUN   TestEvictStale_MemoryBasedEviction_UnderestimatedHeap
[store] Evicted 820 packets (1640 obs)
--- PASS
```

Full suite: `go test ./...` — ok (10.3s)

## Perf note

`runtime.ReadMemStats` runs once per eviction tick (every 60 s) and once
per `/api/perf/store` call. Cost is negligible.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
efiten
2026-04-04 17:41:54 +02:00
committed by GitHub
parent cbfce41d7e
commit f897ce1b26
2 changed files with 66 additions and 40 deletions
+32 -6
View File
@@ -162,24 +162,50 @@ func TestEvictStale_NoEvictionWhenDisabled(t *testing.T) {
func TestEvictStale_MemoryBasedEviction(t *testing.T) {
now := time.Now().UTC()
// Create enough packets to exceed a small memory limit
// 1000 packets * 5KB + 2000 obs * 500B ≈ 6MB
store := makeTestStore(1000, now.Add(-1*time.Hour), 0)
// All packets are recent (1h old) so time-based won't trigger
// All packets are recent (1h old) so time-based won't trigger.
store.retentionHours = 24
store.maxMemoryMB = 3 // ~3MB limit, should evict roughly half
store.maxMemoryMB = 3
// Inject deterministic estimator: simulates 6MB (over 3MB limit).
// Uses packet count so it scales correctly after eviction.
store.memoryEstimator = func() float64 {
return float64(len(store.packets)*5120+store.totalObs*500) / 1048576.0
}
evicted := store.EvictStale()
if evicted == 0 {
t.Fatal("expected some evictions for memory cap")
}
// After eviction, estimated memory should be <= 3MB
estMB := store.estimatedMemoryMB()
if estMB > 3.5 { // small tolerance
if estMB > 3.5 {
t.Fatalf("expected <=3.5MB after eviction, got %.1fMB", estMB)
}
}
// TestEvictStale_MemoryBasedEviction_UnderestimatedHeap verifies that eviction
// fires correctly when actual heap is much larger than a formula-based estimate
// would report — the scenario that caused OOM kills in production.
func TestEvictStale_MemoryBasedEviction_UnderestimatedHeap(t *testing.T) {
now := time.Now().UTC()
store := makeTestStore(1000, now.Add(-1*time.Hour), 0)
store.retentionHours = 24
store.maxMemoryMB = 500
// Simulate actual heap 5x over budget (like production: ~5GB actual vs ~1GB limit).
store.memoryEstimator = func() float64 {
return 2500.0 // 2500MB actual vs 500MB limit
}
evicted := store.EvictStale()
if evicted == 0 {
t.Fatal("expected evictions when heap is 5x over limit")
}
// Should keep roughly 500/2500 * 0.9 = 18% of packets → ~180 of 1000.
remaining := len(store.packets)
if remaining > 250 {
t.Fatalf("expected most packets evicted (heap 5x over), but %d of 1000 remain", remaining)
}
}
func TestEvictStale_CleansNodeIndexes(t *testing.T) {
now := time.Now().UTC()
store := makeTestStore(10, now.Add(-48*time.Hour), 0)
+34 -34
View File
@@ -6,6 +6,7 @@ import (
"fmt"
"log"
"math"
"runtime"
"sort"
"strconv"
"strings"
@@ -152,9 +153,10 @@ type PacketStore struct {
graph *NeighborGraph
// Eviction config and stats
retentionHours float64 // 0 = unlimited
maxMemoryMB int // 0 = unlimited
evicted int64 // total packets evicted
retentionHours float64 // 0 = unlimited
maxMemoryMB int // 0 = unlimited
evicted int64 // total packets evicted
memoryEstimator func() float64 // injectable for tests; nil = use runtime.ReadMemStats
}
// Precomputed distance records for fast analytics aggregation.
@@ -367,9 +369,8 @@ func (s *PacketStore) Load() error {
s.loaded = true
elapsed := time.Since(t0)
estMB := (len(s.packets)*5120 + s.totalObs*500) / (1024 * 1024)
log.Printf("[store] Loaded %d transmissions (%d observations) in %v (~%dMB est)",
len(s.packets), s.totalObs, elapsed, estMB)
log.Printf("[store] Loaded %d transmissions (%d observations) in %v (heap ~%.0fMB)",
len(s.packets), s.totalObs, elapsed, s.estimatedMemoryMB())
return nil
}
@@ -677,8 +678,7 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} {
advertByObsCount := len(s.advertPubkeys)
s.mu.RUnlock()
// Realistic estimate: ~5KB per packet + ~500 bytes per observation
estimatedMB := math.Round(float64(totalLoaded*5120+totalObs*500)/1048576*10) / 10
estimatedMB := math.Round(s.estimatedMemoryMB()*10) / 10
evicted := atomic.LoadInt64(&s.evicted)
@@ -848,7 +848,7 @@ func (s *PacketStore) GetPerfStoreStatsTyped() PerfPacketStoreStats {
advertByObsCount := len(s.advertPubkeys)
s.mu.RUnlock()
estimatedMB := math.Round(float64(totalLoaded*5120+totalObs*500)/1048576*10) / 10
estimatedMB := math.Round(s.estimatedMemoryMB()*10) / 10
return PerfPacketStoreStats{
TotalLoaded: totalLoaded,
@@ -2046,9 +2046,17 @@ func (s *PacketStore) buildDistanceIndex() {
len(s.distHops), len(s.distPaths))
}
// estimatedMemoryMB returns estimated memory usage of the packet store.
// estimatedMemoryMB returns current Go heap allocation in MB.
// Uses runtime.ReadMemStats so it accounts for all data structures
// (distHops, distPaths, spIndex, map overhead) not just packets/observations.
// In tests, memoryEstimator can be set to inject a deterministic value.
func (s *PacketStore) estimatedMemoryMB() float64 {
return float64(len(s.packets)*5120+s.totalObs*500) / 1048576.0
if s.memoryEstimator != nil {
return s.memoryEstimator()
}
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
return float64(ms.HeapAlloc) / 1048576.0
}
// EvictStale removes packets older than the retention window and/or exceeding
@@ -2069,31 +2077,25 @@ func (s *PacketStore) EvictStale() int {
}
}
// Memory-based eviction: if still over budget, trim more from head
// Memory-based eviction: if heap exceeds budget, trim proportionally from head.
// All major data structures (distHops, distPaths, spIndex) scale with packet count,
// so evicting a fraction of packets frees roughly the same fraction of total heap.
// A 10% buffer avoids immediately re-triggering on the next ingest cycle.
if s.maxMemoryMB > 0 {
for cutoffIdx < len(s.packets) && s.estimatedMemoryMB() > float64(s.maxMemoryMB) {
// Estimate how many more to evict: rough binary approach
overMB := s.estimatedMemoryMB() - float64(s.maxMemoryMB)
// ~5KB per packet, so overMB * 1024*1024 / 5120 packets
extra := int(overMB * 1048576.0 / 5120.0)
if extra < 100 {
extra = 100
currentMB := s.estimatedMemoryMB()
if currentMB > float64(s.maxMemoryMB) && len(s.packets) > 0 {
fractionToKeep := (float64(s.maxMemoryMB) / currentMB) * 0.9
keepCount := int(float64(len(s.packets)) * fractionToKeep)
if keepCount < 0 {
keepCount = 0
}
newCutoff := len(s.packets) - keepCount
if newCutoff > cutoffIdx {
cutoffIdx = newCutoff
}
cutoffIdx += extra
if cutoffIdx > len(s.packets) {
cutoffIdx = len(s.packets)
}
// Recalculate estimated memory with fewer packets
// (we haven't actually removed yet, so simulate)
remainingPkts := len(s.packets) - cutoffIdx
remainingObs := s.totalObs
for _, tx := range s.packets[:cutoffIdx] {
remainingObs -= len(tx.Observations)
}
estMB := float64(remainingPkts*5120+remainingObs*500) / 1048576.0
if estMB <= float64(s.maxMemoryMB) {
break
}
}
}
@@ -2207,9 +2209,7 @@ func (s *PacketStore) EvictStale() int {
evictCount := cutoffIdx
atomic.AddInt64(&s.evicted, int64(evictCount))
freedMB := float64(evictCount*5120+evictedObs*500) / 1048576.0
log.Printf("[store] Evicted %d packets older than %.0fh (freed ~%.1fMB estimated)",
evictCount, s.retentionHours, freedMB)
log.Printf("[store] Evicted %d packets (%d obs)", evictCount, evictedObs)
// Eviction removes data — all caches may be affected
s.invalidateCachesFor(cacheInvalidation{eviction: true})