mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-24 08:05:37 +00:00
perf: batch-remove from secondary indexes in EvictStale (#590)
## Summary `EvictStale()` was doing O(n) linear scans per evicted item to remove from secondary indexes (`byObserver`, `byPayloadType`, `byNode`). Evicting 1000 packets from an observer with 50K observations meant 1000 × 50K = 50M comparisons — all under a write lock. ## Fix Replace per-item removal with batch single-pass filtering: 1. **Collect phase**: Walk evicted packets once, building sets of evicted tx IDs, observation IDs, and affected index keys 2. **Filter phase**: For each affected index slice, do a single pass keeping only non-evicted entries **Before**: O(evicted_count × index_slice_size) per index — quadratic in practice **After**: O(evicted_count + index_slice_size) per affected key — linear ## Changes - `cmd/server/store.go`: Restructured `EvictStale()` eviction loop into collect + batch-filter pattern ## Testing - All existing tests pass (`cd cmd/server && go test ./...`) Fixes #368 Co-authored-by: you <you@example.com>
This commit is contained in:
+64
-37
@@ -2339,47 +2339,36 @@ func (s *PacketStore) EvictStale() int {
|
||||
evicting := s.packets[:cutoffIdx]
|
||||
evictedObs := 0
|
||||
|
||||
// Remove from all indexes
|
||||
// Build sets of evicted IDs for batch removal from secondary indexes
|
||||
evictedTxIDs := make(map[int]struct{}, cutoffIdx)
|
||||
evictedObsIDs := make(map[int]struct{}, cutoffIdx*2)
|
||||
// Track which observer IDs and payload types need filtering
|
||||
affectedObservers := make(map[string]struct{})
|
||||
affectedPayloadTypes := make(map[int]struct{})
|
||||
affectedNodes := make(map[string]struct{})
|
||||
|
||||
// First pass: remove from primary indexes (byHash, byTxID, byObsID),
|
||||
// collect IDs for batch secondary index cleanup, and handle non-index work
|
||||
for _, tx := range evicting {
|
||||
delete(s.byHash, tx.Hash)
|
||||
delete(s.byTxID, tx.ID)
|
||||
evictedTxIDs[tx.ID] = struct{}{}
|
||||
|
||||
// Remove observations from indexes
|
||||
for _, obs := range tx.Observations {
|
||||
delete(s.byObsID, obs.ID)
|
||||
// Remove from byObserver
|
||||
evictedObsIDs[obs.ID] = struct{}{}
|
||||
if obs.ObserverID != "" {
|
||||
obsList := s.byObserver[obs.ObserverID]
|
||||
for i, o := range obsList {
|
||||
if o.ID == obs.ID {
|
||||
s.byObserver[obs.ObserverID] = append(obsList[:i], obsList[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(s.byObserver[obs.ObserverID]) == 0 {
|
||||
delete(s.byObserver, obs.ObserverID)
|
||||
}
|
||||
affectedObservers[obs.ObserverID] = struct{}{}
|
||||
}
|
||||
evictedObs++
|
||||
}
|
||||
|
||||
// Remove from byPayloadType
|
||||
s.untrackAdvertPubkey(tx)
|
||||
if tx.PayloadType != nil {
|
||||
pt := *tx.PayloadType
|
||||
ptList := s.byPayloadType[pt]
|
||||
for i, t := range ptList {
|
||||
if t.ID == tx.ID {
|
||||
s.byPayloadType[pt] = append(ptList[:i], ptList[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(s.byPayloadType[pt]) == 0 {
|
||||
delete(s.byPayloadType, pt)
|
||||
}
|
||||
affectedPayloadTypes[*tx.PayloadType] = struct{}{}
|
||||
}
|
||||
|
||||
// Remove from byNode and nodeHashes
|
||||
// Remove from nodeHashes and collect affected node keys
|
||||
if tx.DecodedJSON != "" {
|
||||
var decoded map[string]interface{}
|
||||
if json.Unmarshal([]byte(tx.DecodedJSON), &decoded) == nil {
|
||||
@@ -2391,17 +2380,7 @@ func (s *PacketStore) EvictStale() int {
|
||||
delete(s.nodeHashes, v)
|
||||
}
|
||||
}
|
||||
// Remove tx from byNode
|
||||
nodeList := s.byNode[v]
|
||||
for i, t := range nodeList {
|
||||
if t.ID == tx.ID {
|
||||
s.byNode[v] = append(nodeList[:i], nodeList[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(s.byNode[v]) == 0 {
|
||||
delete(s.byNode, v)
|
||||
}
|
||||
affectedNodes[v] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2413,6 +2392,54 @@ func (s *PacketStore) EvictStale() int {
|
||||
removeTxFromPathHopIndex(s.byPathHop, tx)
|
||||
}
|
||||
|
||||
// Batch-remove from byObserver: single pass per affected observer slice
|
||||
for obsID := range affectedObservers {
|
||||
obsList := s.byObserver[obsID]
|
||||
filtered := obsList[:0]
|
||||
for _, o := range obsList {
|
||||
if _, evicted := evictedObsIDs[o.ID]; !evicted {
|
||||
filtered = append(filtered, o)
|
||||
}
|
||||
}
|
||||
if len(filtered) == 0 {
|
||||
delete(s.byObserver, obsID)
|
||||
} else {
|
||||
s.byObserver[obsID] = filtered
|
||||
}
|
||||
}
|
||||
|
||||
// Batch-remove from byPayloadType: single pass per affected type slice
|
||||
for pt := range affectedPayloadTypes {
|
||||
ptList := s.byPayloadType[pt]
|
||||
filtered := ptList[:0]
|
||||
for _, t := range ptList {
|
||||
if _, evicted := evictedTxIDs[t.ID]; !evicted {
|
||||
filtered = append(filtered, t)
|
||||
}
|
||||
}
|
||||
if len(filtered) == 0 {
|
||||
delete(s.byPayloadType, pt)
|
||||
} else {
|
||||
s.byPayloadType[pt] = filtered
|
||||
}
|
||||
}
|
||||
|
||||
// Batch-remove from byNode: single pass per affected node slice
|
||||
for nodeKey := range affectedNodes {
|
||||
nodeList := s.byNode[nodeKey]
|
||||
filtered := nodeList[:0]
|
||||
for _, t := range nodeList {
|
||||
if _, evicted := evictedTxIDs[t.ID]; !evicted {
|
||||
filtered = append(filtered, t)
|
||||
}
|
||||
}
|
||||
if len(filtered) == 0 {
|
||||
delete(s.byNode, nodeKey)
|
||||
} else {
|
||||
s.byNode[nodeKey] = filtered
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from distance indexes — filter out records referencing evicted txs
|
||||
evictedTxSet := make(map[*StoreTx]bool, cutoffIdx)
|
||||
for _, tx := range evicting {
|
||||
|
||||
Reference in New Issue
Block a user