diff --git a/cmd/server/eviction_test.go b/cmd/server/eviction_test.go index b1414d52..d4090e90 100644 --- a/cmd/server/eviction_test.go +++ b/cmd/server/eviction_test.go @@ -85,6 +85,12 @@ func makeTestStore(count int, startTime time.Time, intervalMin int) *PacketStore // Subpath index addTxToSubpathIndex(store.spIndex, tx) + + // Track bytes for self-accounting + store.trackedBytes += estimateStoreTxBytes(tx) + for _, obs := range tx.Observations { + store.trackedBytes += estimateStoreObsBytes(obs) + } } return store @@ -166,43 +172,43 @@ func TestEvictStale_MemoryBasedEviction(t *testing.T) { // All packets are recent (1h old) so time-based won't trigger. store.retentionHours = 24 store.maxMemoryMB = 3 - // Inject deterministic estimator: simulates 6MB (over 3MB limit). - // Uses packet count so it scales correctly after eviction. - store.memoryEstimator = func() float64 { - return float64(len(store.packets)*5120+store.totalObs*500) / 1048576.0 - } + // Set trackedBytes to simulate 6MB (over 3MB limit). + store.trackedBytes = 6 * 1048576 evicted := store.EvictStale() if evicted == 0 { t.Fatal("expected some evictions for memory cap") } - estMB := store.estimatedMemoryMB() - if estMB > 3.5 { - t.Fatalf("expected <=3.5MB after eviction, got %.1fMB", estMB) + // 25% safety cap should limit to 250 per pass + if evicted > 250 { + t.Fatalf("25%% safety cap violated: evicted %d", evicted) + } + // trackedBytes should have decreased + if store.trackedBytes >= 6*1048576 { + t.Fatal("trackedBytes should have decreased after eviction") } } -// TestEvictStale_MemoryBasedEviction_UnderestimatedHeap verifies that eviction -// fires correctly when actual heap is much larger than a formula-based estimate -// would report — the scenario that caused OOM kills in production. +// TestEvictStale_MemoryBasedEviction_UnderestimatedHeap verifies that the 25% +// safety cap prevents cascading eviction even when trackedBytes is very high. func TestEvictStale_MemoryBasedEviction_UnderestimatedHeap(t *testing.T) { now := time.Now().UTC() store := makeTestStore(1000, now.Add(-1*time.Hour), 0) store.retentionHours = 24 store.maxMemoryMB = 500 - // Simulate actual heap 5x over budget (like production: ~5GB actual vs ~1GB limit). - store.memoryEstimator = func() float64 { - return 2500.0 // 2500MB actual vs 500MB limit - } + // Simulate trackedBytes 5x over budget. + store.trackedBytes = 2500 * 1048576 evicted := store.EvictStale() if evicted == 0 { - t.Fatal("expected evictions when heap is 5x over limit") + t.Fatal("expected evictions when tracked is 5x over limit") } - // Should keep roughly 500/2500 * 0.9 = 18% of packets → ~180 of 1000. - remaining := len(store.packets) - if remaining > 250 { - t.Fatalf("expected most packets evicted (heap 5x over), but %d of 1000 remain", remaining) + // Safety cap: max 25% per pass = 250 + if evicted > 250 { + t.Fatalf("25%% safety cap violated: evicted %d of 1000", evicted) + } + if evicted != 250 { + t.Fatalf("expected exactly 250 evicted (25%% cap), got %d", evicted) } } @@ -375,3 +381,196 @@ func TestCacheTTLDefaults(t *testing.T) { t.Fatalf("expected default rfCacheTTL=15s, got %v", store.rfCacheTTL) } } + +// --- Self-accounting memory tracking tests --- + +func TestTrackedBytes_IncreasesOnInsert(t *testing.T) { + now := time.Now().UTC() + store := makeTestStore(0, now, 0) + if store.trackedBytes != 0 { + t.Fatalf("expected 0 trackedBytes for empty store, got %d", store.trackedBytes) + } + + store2 := makeTestStore(10, now, 1) + if store2.trackedBytes <= 0 { + t.Fatal("expected positive trackedBytes after inserting 10 packets") + } + // Each packet has 2 observations; should be roughly 10*(384+5*48) + 20*(192+2*48) = 10*624 + 20*288 = 12000 + expectedMin := int64(10*600 + 20*250) // rough lower bound + if store2.trackedBytes < expectedMin { + t.Fatalf("trackedBytes %d seems too low (expected > %d)", store2.trackedBytes, expectedMin) + } +} + +func TestTrackedBytes_DecreasesOnEvict(t *testing.T) { + now := time.Now().UTC() + store := makeTestStore(100, now.Add(-48*time.Hour), 0) + store.retentionHours = 24 + + beforeBytes := store.trackedBytes + if beforeBytes <= 0 { + t.Fatal("expected positive trackedBytes before eviction") + } + + evicted := store.EvictStale() + if evicted != 100 { + t.Fatalf("expected 100 evicted, got %d", evicted) + } + if store.trackedBytes != 0 { + t.Fatalf("expected 0 trackedBytes after evicting all, got %d", store.trackedBytes) + } +} + +func TestTrackedBytes_MatchesExpectedAfterMixedInsertEvict(t *testing.T) { + now := time.Now().UTC() + // Create 100 packets, 50 old + 50 recent + store := makeTestStore(100, now.Add(-48*time.Hour), 0) + for i := 50; i < 100; i++ { + store.packets[i].FirstSeen = now.Add(-1 * time.Hour).Format(time.RFC3339) + } + store.retentionHours = 24 + + totalBefore := store.trackedBytes + + // Calculate expected bytes for first 50 packets (to be evicted) + var evictedBytes int64 + for i := 0; i < 50; i++ { + tx := store.packets[i] + evictedBytes += estimateStoreTxBytes(tx) + for _, obs := range tx.Observations { + evictedBytes += estimateStoreObsBytes(obs) + } + } + + store.EvictStale() + + expectedAfter := totalBefore - evictedBytes + if store.trackedBytes != expectedAfter { + t.Fatalf("trackedBytes %d != expected %d (before=%d, evicted=%d)", + store.trackedBytes, expectedAfter, totalBefore, evictedBytes) + } +} + +func TestWatermarkHysteresis(t *testing.T) { + now := time.Now().UTC() + store := makeTestStore(1000, now.Add(-1*time.Hour), 0) + store.retentionHours = 0 // no time-based eviction + store.maxMemoryMB = 1 // 1MB budget + + // Set trackedBytes to just above high watermark + highWatermark := int64(1 * 1048576) + lowWatermark := int64(float64(highWatermark) * 0.85) + store.trackedBytes = highWatermark + 1 + + evicted := store.EvictStale() + if evicted == 0 { + t.Fatal("expected eviction when above high watermark") + } + if store.trackedBytes > lowWatermark+1024 { + t.Fatalf("expected trackedBytes near low watermark after eviction, got %d (low=%d)", + store.trackedBytes, lowWatermark) + } + + // Now set trackedBytes to just below high watermark — should NOT trigger + store.trackedBytes = highWatermark - 1 + evicted2 := store.EvictStale() + if evicted2 != 0 { + t.Fatalf("expected no eviction below high watermark, got %d", evicted2) + } +} + +func TestSafetyCap25Percent(t *testing.T) { + now := time.Now().UTC() + store := makeTestStore(1000, now.Add(-1*time.Hour), 0) + store.retentionHours = 0 + store.maxMemoryMB = 1 + + // Set trackedBytes way over limit to force maximum eviction + store.trackedBytes = 100 * 1048576 // 100MB vs 1MB limit + + evicted := store.EvictStale() + // 25% of 1000 = 250 + if evicted > 250 { + t.Fatalf("25%% safety cap violated: evicted %d of 1000 (max should be 250)", evicted) + } + if evicted != 250 { + t.Fatalf("expected exactly 250 evicted (25%% cap), got %d", evicted) + } + if len(store.packets) != 750 { + t.Fatalf("expected 750 remaining, got %d", len(store.packets)) + } +} + +func TestMultiplePassesConverge(t *testing.T) { + now := time.Now().UTC() + store := makeTestStore(1000, now.Add(-1*time.Hour), 0) + store.retentionHours = 0 + // Set budget to half the actual tracked bytes — requires ~2 passes + actualBytes := store.trackedBytes + store.maxMemoryMB = int(float64(actualBytes) / 1048576.0 / 2) + if store.maxMemoryMB < 1 { + store.maxMemoryMB = 1 + } + + totalEvicted := 0 + for pass := 0; pass < 20; pass++ { + evicted := store.EvictStale() + if evicted == 0 { + break + } + totalEvicted += evicted + } + + // After convergence, trackedBytes should be at or below high watermark + // (may be between low and high due to hysteresis — that's fine) + highWatermark := int64(store.maxMemoryMB) * 1048576 + if store.trackedBytes > highWatermark { + t.Fatalf("did not converge: trackedBytes=%d (%.1fMB) > highWatermark=%d after multiple passes", + store.trackedBytes, float64(store.trackedBytes)/1048576.0, highWatermark) + } + if totalEvicted == 0 { + t.Fatal("expected some evictions across multiple passes") + } +} + +func TestEstimateStoreTxBytes(t *testing.T) { + tx := &StoreTx{ + RawHex: "aabbcc", + Hash: "hash1234", + DecodedJSON: `{"pubKey":"pk1"}`, + PathJSON: `["aa","bb"]`, + } + est := estimateStoreTxBytes(tx) + // Verify the function returns a reasonable value matching our manual calculation + manualCalc := int64(storeTxBaseBytes) + int64(len(tx.RawHex)+len(tx.Hash)+len(tx.DecodedJSON)+len(tx.PathJSON)) + int64(numIndexesPerTx*indexEntryBytes) + if est != manualCalc { + t.Fatalf("estimateStoreTxBytes = %d, want %d (manual calc)", est, manualCalc) + } + if est < 600 || est > 800 { + t.Fatalf("estimateStoreTxBytes = %d, expected in range [600, 800]", est) + } +} + +func TestEstimateStoreObsBytes(t *testing.T) { + obs := &StoreObs{ + ObserverID: "obs123", + PathJSON: `["aa"]`, + } + est := estimateStoreObsBytes(obs) + // storeObsBaseBytes(192) + len(ObserverID=6) + len(PathJSON=6) + 2*48(96) = 300 + expected := int64(192 + 6 + 6 + 2*48) + if est != expected { + t.Fatalf("estimateStoreObsBytes = %d, want %d", est, expected) + } +} + +func BenchmarkEviction100K(b *testing.B) { + now := time.Now().UTC() + for i := 0; i < b.N; i++ { + b.StopTimer() + store := makeTestStore(100000, now.Add(-48*time.Hour), 0) + store.retentionHours = 24 + b.StartTimer() + store.EvictStale() + } +} diff --git a/cmd/server/store.go b/cmd/server/store.go index e56a5c8a..ad14bcdd 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -208,9 +208,10 @@ type PacketStore struct { // Eviction config and stats retentionHours float64 // 0 = unlimited - maxMemoryMB int // 0 = unlimited + maxMemoryMB int // 0 = unlimited (packet store memory budget) evicted int64 // total packets evicted - memoryEstimator func() float64 // injectable for tests; nil = use runtime.ReadMemStats + trackedBytes int64 // running total of estimated packet store memory + memoryEstimator func() float64 // injectable for tests; nil = use runtime.ReadMemStats (stats only) } // Precomputed distance records for fast analytics aggregation. @@ -407,6 +408,7 @@ func (s *PacketStore) Load() error { s.byPayloadType[pt] = append(s.byPayloadType[pt], tx) } s.trackAdvertPubkey(tx) + s.trackedBytes += estimateStoreTxBytes(tx) } if obsID.Valid { @@ -455,6 +457,7 @@ func (s *PacketStore) Load() error { } s.totalObs++ + s.trackedBytes += estimateStoreObsBytes(obs) } } @@ -474,8 +477,8 @@ func (s *PacketStore) Load() error { s.loaded = true elapsed := time.Since(t0) - log.Printf("[store] Loaded %d transmissions (%d observations) in %v (heap ~%.0fMB)", - len(s.packets), s.totalObs, elapsed, s.estimatedMemoryMB()) + log.Printf("[store] Loaded %d transmissions (%d observations) in %v (tracked ~%.0fMB, heap ~%.0fMB)", + len(s.packets), s.totalObs, elapsed, s.trackedMemoryMB(), s.estimatedMemoryMB()) return nil } @@ -887,6 +890,7 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} { s.mu.RUnlock() estimatedMB := math.Round(s.estimatedMemoryMB()*10) / 10 + trackedMB := math.Round(s.trackedMemoryMB()*10) / 10 evicted := atomic.LoadInt64(&s.evicted) @@ -901,6 +905,7 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} { "retentionHours": s.retentionHours, "maxMemoryMB": s.maxMemoryMB, "estimatedMB": estimatedMB, + "trackedMB": trackedMB, "indexes": map[string]interface{}{ "byHash": hashIdx, "byTxID": txIdx, @@ -1058,6 +1063,7 @@ func (s *PacketStore) GetPerfStoreStatsTyped() PerfPacketStoreStats { s.mu.RUnlock() estimatedMB := math.Round(s.estimatedMemoryMB()*10) / 10 + trackedMB := math.Round(s.trackedMemoryMB()*10) / 10 return PerfPacketStoreStats{ TotalLoaded: totalLoaded, @@ -1069,6 +1075,7 @@ func (s *PacketStore) GetPerfStoreStatsTyped() PerfPacketStoreStats { SqliteOnly: false, MaxPackets: 2386092, EstimatedMB: estimatedMB, + TrackedMB: trackedMB, MaxMB: s.maxMemoryMB, Indexes: PacketStoreIndexes{ ByHash: hashIdx, @@ -1374,6 +1381,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac s.byPayloadType[pt] = append(s.byPayloadType[pt], tx) } s.trackAdvertPubkey(tx) + s.trackedBytes += estimateStoreTxBytes(tx) if _, exists := broadcastTxs[r.txID]; !exists { broadcastTxs[r.txID] = tx @@ -1430,6 +1438,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac s.byObserver[r.observerID] = append(s.byObserver[r.observerID], obs) } s.totalObs++ + s.trackedBytes += estimateStoreObsBytes(obs) } } @@ -1744,6 +1753,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] s.byObserver[r.observerID] = append(s.byObserver[r.observerID], obs) } s.totalObs++ + s.trackedBytes += estimateStoreObsBytes(obs) updatedTxs[r.txID] = tx decoded := map[string]interface{}{ @@ -2557,9 +2567,36 @@ func (s *PacketStore) buildDistanceIndex() { len(s.distHops), len(s.distPaths)) } +// Self-accounting memory estimation constants. +// These estimate the in-memory cost of StoreTx and StoreObs structs including +// map/index overhead. They don't need to be exact — just proportional to actual +// usage and independent of GC state. +const ( + storeTxBaseBytes = 384 // StoreTx struct fields + map headers + sync.Once + string headers + storeObsBaseBytes = 192 // StoreObs struct fields + string headers + indexEntryBytes = 48 // average cost of one index map entry (key + pointer + bucket overhead) + numIndexesPerTx = 5 // byHash, byTxID, byNode, byPayloadType, nodeHashes entries + numIndexesPerObs = 2 // byObsID, byObserver entries +) + +// estimateStoreTxBytes returns the estimated memory cost of a StoreTx (excluding observations). +func estimateStoreTxBytes(tx *StoreTx) int64 { + base := int64(storeTxBaseBytes) + base += int64(len(tx.RawHex) + len(tx.Hash) + len(tx.DecodedJSON) + len(tx.PathJSON)) + base += int64(numIndexesPerTx * indexEntryBytes) + return base +} + +// estimateStoreObsBytes returns the estimated memory cost of a StoreObs. +func estimateStoreObsBytes(obs *StoreObs) int64 { + base := int64(storeObsBaseBytes) + base += int64(len(obs.PathJSON) + len(obs.ObserverID)) + base += int64(numIndexesPerObs * indexEntryBytes) + return base +} + // estimatedMemoryMB returns current Go heap allocation in MB. -// Uses runtime.ReadMemStats so it accounts for all data structures -// (distHops, distPaths, spIndex, map overhead) not just packets/observations. +// Kept for stats/debug endpoints only — NOT used in eviction decisions. // In tests, memoryEstimator can be set to inject a deterministic value. func (s *PacketStore) estimatedMemoryMB() float64 { if s.memoryEstimator != nil { @@ -2570,6 +2607,11 @@ func (s *PacketStore) estimatedMemoryMB() float64 { return float64(ms.HeapAlloc) / 1048576.0 } +// trackedMemoryMB returns the self-accounted packet store memory in MB. +func (s *PacketStore) trackedMemoryMB() float64 { + return float64(s.trackedBytes) / 1048576.0 +} + // EvictStale removes packets older than the retention window and/or exceeding // the memory cap. Must be called with s.mu held (Lock). Returns the number of // packets evicted. @@ -2588,24 +2630,34 @@ func (s *PacketStore) EvictStale() int { } } - // Memory-based eviction: if heap exceeds budget, trim proportionally from head. - // All major data structures (distHops, distPaths, spIndex) scale with packet count, - // so evicting a fraction of packets frees roughly the same fraction of total heap. - // A 10% buffer avoids immediately re-triggering on the next ingest cycle. + // Memory-based eviction: use self-accounted trackedBytes with watermark hysteresis. + // High watermark = maxMemoryMB (trigger), low watermark = 85% (stop). + // Safety cap: never evict more than 25% of packets in a single pass. if s.maxMemoryMB > 0 { - currentMB := s.estimatedMemoryMB() - if currentMB > float64(s.maxMemoryMB) && len(s.packets) > 0 { - fractionToKeep := (float64(s.maxMemoryMB) / currentMB) * 0.9 - keepCount := int(float64(len(s.packets)) * fractionToKeep) - if keepCount < 0 { - keepCount = 0 + highWatermark := int64(s.maxMemoryMB) * 1048576 + lowWatermark := int64(float64(highWatermark) * 0.85) + if s.trackedBytes > highWatermark && len(s.packets) > 0 { + // Evict from head until trackedBytes would drop below low watermark + var bytesToEvict int64 + memCutoff := cutoffIdx + for memCutoff < len(s.packets) && (s.trackedBytes-bytesToEvict) > lowWatermark { + tx := s.packets[memCutoff] + bytesToEvict += estimateStoreTxBytes(tx) + for _, obs := range tx.Observations { + bytesToEvict += estimateStoreObsBytes(obs) + } + memCutoff++ } - newCutoff := len(s.packets) - keepCount - if newCutoff > cutoffIdx { - cutoffIdx = newCutoff + // Safety cap: never evict more than 25% in a single pass + maxEvict := len(s.packets) / 4 + if maxEvict < 1 { + maxEvict = 1 } - if cutoffIdx > len(s.packets) { - cutoffIdx = len(s.packets) + if memCutoff > maxEvict { + memCutoff = maxEvict + } + if memCutoff > cutoffIdx { + cutoffIdx = memCutoff } } } @@ -2619,6 +2671,7 @@ func (s *PacketStore) EvictStale() int { evicting := s.packets[:cutoffIdx] evictedObs := 0 + var evictedBytes int64 // Build sets of evicted IDs for batch removal from secondary indexes evictedTxIDs := make(map[int]struct{}, cutoffIdx) @@ -2634,10 +2687,12 @@ func (s *PacketStore) EvictStale() int { delete(s.byHash, tx.Hash) delete(s.byTxID, tx.ID) evictedTxIDs[tx.ID] = struct{}{} + evictedBytes += estimateStoreTxBytes(tx) for _, obs := range tx.Observations { delete(s.byObsID, obs.ID) evictedObsIDs[obs.ID] = struct{}{} + evictedBytes += estimateStoreObsBytes(obs) if obs.ObserverID != "" { affectedObservers[obs.ObserverID] = struct{}{} } @@ -2789,7 +2844,12 @@ func (s *PacketStore) EvictStale() int { evictCount := cutoffIdx atomic.AddInt64(&s.evicted, int64(evictCount)) - log.Printf("[store] Evicted %d packets (%d obs)", evictCount, evictedObs) + s.trackedBytes -= evictedBytes + if s.trackedBytes < 0 { + s.trackedBytes = 0 + } + log.Printf("[store] Evicted %d packets (%d obs, freed ~%.1fMB, tracked ~%.1fMB)", + evictCount, evictedObs, float64(evictedBytes)/1048576.0, s.trackedMemoryMB()) // Eviction removes data — all caches may be affected s.invalidateCachesFor(cacheInvalidation{eviction: true}) diff --git a/cmd/server/types.go b/cmd/server/types.go index 5f350d53..0b78ba63 100644 --- a/cmd/server/types.go +++ b/cmd/server/types.go @@ -174,6 +174,7 @@ type PerfPacketStoreStats struct { SqliteOnly bool `json:"sqliteOnly"` MaxPackets int `json:"maxPackets"` EstimatedMB float64 `json:"estimatedMB"` + TrackedMB float64 `json:"trackedMB"` MaxMB int `json:"maxMB"` Indexes PacketStoreIndexes `json:"indexes"` }