Compare commits

...

3 Commits

Author SHA1 Message Date
you
5190139746 fix: data race on backfillTotal, O(n*chunks) rescan, unchecked stmt.Exec
- backfillTotal: plain int64 → atomic.Int64 (written by goroutine, read by
  stats handler — data race under -race)
- Collect all pending obs refs in a single pass upfront, then slice into
  chunks. Eliminates repeated full-scan of store.packets per chunk.
- Check stmt.Exec error return instead of silently discarding it.
- Scope X-CoreScope-Status header to /api/ paths only (skip static files).
2026-04-05 07:23:07 +00:00
you
2c25017185 fix: use atomic.Int64 for backfillTotal to prevent data race 2026-04-05 07:22:09 +00:00
you
8d3ff0ada3 perf: async chunked backfill — HTTP serves within 2 minutes (#612)
Move backfillResolvedPaths from blocking startup to a background goroutine
that runs after ListenAndServe. The HTTP server now starts immediately
after Load() + index builds (~2 min), regardless of database size.

Changes:
- New backfillResolvedPathsAsync() processes observations in chunks of 5K
  with 100ms yields between batches to avoid starving HTTP handlers
- Re-picks best observation per-chunk for affected transmissions
- Adds backfillComplete atomic flag + progress counters to PacketStore
- X-CoreScope-Status response header: 'backfilling' or 'ready' on all
  API responses via middleware
- /api/stats includes backfilling (bool) and backfillProgress (0.0-1.0)
- Logs progress: [store] backfill progress: 50000/7100000 observations (0.7%)

The original synchronous backfillResolvedPaths is preserved for potential
use in tests or one-off scripts.
2026-04-05 07:17:52 +00:00
6 changed files with 307 additions and 6 deletions

View File

@@ -0,0 +1,132 @@
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/gorilla/mux"
)
// TestBackfillAsyncChunked verifies that backfillResolvedPathsAsync sets the
// completion flag immediately when the store has no pending observations.
func TestBackfillAsyncChunked(t *testing.T) {
	emptyStore := &PacketStore{
		packets: []*StoreTx{},
		byHash:  map[string]*StoreTx{},
		byTxID:  map[int]*StoreTx{},
		byObsID: map[int]*StoreObs{},
	}

	// With nothing pending, the backfill must finish and flag completion.
	backfillResolvedPathsAsync(emptyStore, "", 100, time.Millisecond)

	if done := emptyStore.backfillComplete.Load(); !done {
		t.Fatal("expected backfillComplete to be true with empty store")
	}
}
// TestBackfillStatusHeader verifies the X-CoreScope-Status header reflects
// the store's backfill state on API responses.
func TestBackfillStatusHeader(t *testing.T) {
	store := &PacketStore{
		packets: []*StoreTx{},
		byHash:  map[string]*StoreTx{},
		byTxID:  map[int]*StoreTx{},
		byObsID: map[int]*StoreObs{},
	}
	srv := &Server{store: store}

	noop := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	handler := srv.backfillStatusMiddleware(noop)

	// checkStatus issues a fresh /api/stats request and asserts the header.
	checkStatus := func(want string) {
		req := httptest.NewRequest("GET", "/api/stats", nil)
		rec := httptest.NewRecorder()
		handler.ServeHTTP(rec, req)
		if got := rec.Header().Get("X-CoreScope-Status"); got != want {
			t.Fatalf("expected %q, got %q", want, got)
		}
	}

	// Before backfill completes → backfilling.
	checkStatus("backfilling")

	// After backfill completes → ready.
	store.backfillComplete.Store(true)
	checkStatus("ready")
}
// TestStatsBackfillFields verifies /api/stats exposes the backfilling and
// backfillProgress fields and that the status header tracks completion.
func TestStatsBackfillFields(t *testing.T) {
	db := setupTestDBv2(t)
	defer db.Close()
	seedV2Data(t, db)

	store := &PacketStore{
		db:      db,
		packets: []*StoreTx{},
		byHash:  map[string]*StoreTx{},
		byTxID:  map[int]*StoreTx{},
		byObsID: map[int]*StoreObs{},
		loaded:  true,
	}
	srv := NewServer(db, &Config{Port: 0}, NewHub())
	srv.store = store
	router := mux.NewRouter()
	srv.RegisterRoutes(router)

	// fetchStats hits /api/stats and returns the decoded body plus recorder.
	fetchStats := func() (map[string]interface{}, *httptest.ResponseRecorder) {
		req := httptest.NewRequest("GET", "/api/stats", nil)
		rec := httptest.NewRecorder()
		router.ServeHTTP(rec, req)
		var body map[string]interface{}
		if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
			t.Fatalf("failed to parse stats response: %v", err)
		}
		return body, rec
	}

	// While backfilling.
	resp, rec := fetchStats()
	switch backfilling, ok := resp["backfilling"]; {
	case !ok:
		t.Fatal("missing 'backfilling' field in stats response")
	case backfilling != true:
		t.Fatalf("expected backfilling=true, got %v", backfilling)
	}
	if _, ok := resp["backfillProgress"]; !ok {
		t.Fatal("missing 'backfillProgress' field in stats response")
	}
	if got := rec.Header().Get("X-CoreScope-Status"); got != "backfilling" {
		t.Fatalf("expected X-CoreScope-Status=backfilling, got %q", got)
	}

	// After backfill completes — drop the cached stats so the handler
	// recomputes with the new flag.
	store.backfillComplete.Store(true)
	srv.statsMu.Lock()
	srv.statsCache = nil
	srv.statsMu.Unlock()

	resp, rec = fetchStats()
	if backfilling, ok := resp["backfilling"]; !ok || backfilling != false {
		t.Fatalf("expected backfilling=false after completion, got %v", backfilling)
	}
	if got := rec.Header().Get("X-CoreScope-Status"); got != "ready" {
		t.Fatalf("expected X-CoreScope-Status=ready, got %q", got)
	}
}

View File

@@ -176,12 +176,9 @@ func main() {
store.graph = BuildFromStore(store)
}
// Backfill resolved_path for observations that don't have it yet
if backfilled := backfillResolvedPaths(store, dbPath); backfilled > 0 {
log.Printf("[store] backfilled resolved_path for %d observations", backfilled)
}
// Re-pick best observation now that resolved paths are populated
// Backfill resolved_path runs asynchronously after HTTP starts (see below).
// Initial pickBestObservation runs with whatever resolved_path data was
// loaded from SQLite; the async backfill will re-pick affected transmissions.
store.mu.Lock()
for _, tx := range store.packets {
pickBestObservation(tx)
@@ -325,6 +322,10 @@ func main() {
}()
log.Printf("[server] CoreScope (Go) listening on http://localhost:%d", cfg.Port)
// Start async backfill in background — HTTP is now available.
go backfillResolvedPathsAsync(store, dbPath, 5000, 100*time.Millisecond)
if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalf("[server] %v", err)
}

View File

@@ -451,6 +451,135 @@ func backfillResolvedPaths(store *PacketStore, dbPath string) int {
return count
}
// resolvedPathUpdate carries one newly resolved path destined for both the
// SQLite row and the in-memory observation.
type resolvedPathUpdate struct {
	obsID  int
	rp     []*string
	rpJSON string
	txHash string
}

// backfillResolvedPathsAsync processes observations with NULL resolved_path in
// chunks, yielding between batches so HTTP handlers remain responsive. It sets
// store.backfillComplete when finished and re-picks best observations for any
// transmissions affected by newly resolved paths.
//
// Persistence errors are logged and skipped (the in-memory update still
// proceeds); unresolved rows can be retried on the next restart.
func backfillResolvedPathsAsync(store *PacketStore, dbPath string, chunkSize int, yieldDuration time.Duration) {
	// obsRef snapshots everything needed to resolve one observation's path
	// without holding the store lock.
	type obsRef struct {
		obsID       int
		pathJSON    string
		observerID  string
		txJSON      string
		payloadType *int
		txHash      string
	}

	// Collect all pending observation refs in a single pass under the read
	// lock; resolving and persisting happen lock-free afterwards.
	store.mu.RLock()
	pm := store.nodePM
	graph := store.graph
	var pending []obsRef
	for _, tx := range store.packets {
		for _, obs := range tx.Observations {
			if obs.ResolvedPath == nil && obs.PathJSON != "" && obs.PathJSON != "[]" {
				pending = append(pending, obsRef{
					obsID:       obs.ID,
					pathJSON:    obs.PathJSON,
					observerID:  obs.ObserverID,
					txJSON:      tx.DecodedJSON,
					payloadType: tx.PayloadType,
					txHash:      tx.Hash,
				})
			}
		}
	}
	store.mu.RUnlock()

	if len(pending) == 0 || pm == nil {
		store.backfillComplete.Store(true)
		log.Printf("[store] async resolved_path backfill: nothing to do")
		return
	}

	store.backfillTotal.Store(int64(len(pending)))
	store.backfillProcessed.Store(0)
	log.Printf("[store] async resolved_path backfill starting: %d observations", len(pending))

	for i := 0; i < len(pending); i += chunkSize {
		end := i + chunkSize
		if end > len(pending) {
			end = len(pending)
		}

		// Resolve paths outside the lock.
		var results []resolvedPathUpdate
		for _, ref := range pending[i:end] {
			fakeTx := &StoreTx{DecodedJSON: ref.txJSON, PayloadType: ref.payloadType}
			rp := resolvePathForObs(ref.pathJSON, ref.observerID, fakeTx, pm, graph)
			if len(rp) == 0 {
				continue
			}
			if rpJSON := marshalResolvedPath(rp); rpJSON != "" {
				results = append(results, resolvedPathUpdate{ref.obsID, rp, rpJSON, ref.txHash})
			}
		}

		if len(results) > 0 {
			persistResolvedChunk(dbPath, results)
			applyResolvedChunk(store, results)
		}

		store.backfillProcessed.Store(int64(end))
		pct := float64(end) / float64(len(pending)) * 100
		log.Printf("[store] backfill progress: %d/%d observations (%.1f%%)", end, len(pending), pct)

		// Yield so HTTP handlers stay responsive — but not after the final
		// chunk, where sleeping would only delay the completion flag.
		if end < len(pending) {
			time.Sleep(yieldDuration)
		}
	}

	store.backfillComplete.Store(true)
	log.Printf("[store] async resolved_path backfill complete: %d observations processed", len(pending))
}

// persistResolvedChunk writes one chunk of resolved paths to SQLite in a
// single transaction. Errors are logged rather than propagated: the caller's
// in-memory update still proceeds regardless.
func persistResolvedChunk(dbPath string, results []resolvedPathUpdate) {
	rw, err := openRW(dbPath)
	if err != nil {
		log.Printf("[store] async backfill: open rw error: %v", err)
		return
	}
	defer rw.Close()

	sqlTx, err := rw.Begin()
	if err != nil {
		log.Printf("[store] async backfill: begin tx error: %v", err)
		return
	}
	stmt, err := sqlTx.Prepare("UPDATE observations SET resolved_path = ? WHERE id = ?")
	if err != nil {
		log.Printf("[store] async backfill: prepare error: %v", err)
		sqlTx.Rollback()
		return
	}
	for _, r := range results {
		// Per-row failures are logged and skipped; the rest of the chunk
		// still commits.
		if _, err := stmt.Exec(r.rpJSON, r.obsID); err != nil {
			log.Printf("[store] async backfill: exec error obs %d: %v", r.obsID, err)
		}
	}
	stmt.Close()
	if err := sqlTx.Commit(); err != nil {
		log.Printf("[store] async backfill: commit error: %v", err)
	}
}

// applyResolvedChunk updates the in-memory store with a chunk of resolved
// paths and re-picks the best observation for each affected transmission.
func applyResolvedChunk(store *PacketStore, results []resolvedPathUpdate) {
	affectedTxs := make(map[string]bool, len(results))
	store.mu.Lock()
	defer store.mu.Unlock()
	for _, r := range results {
		if obs, ok := store.byObsID[r.obsID]; ok {
			obs.ResolvedPath = r.rp
		}
		affectedTxs[r.txHash] = true
	}
	for hash := range affectedTxs {
		if tx, ok := store.byHash[hash]; ok {
			pickBestObservation(tx)
		}
	}
}
// ─── Shared helpers ────────────────────────────────────────────────────────────
// edgeCandidate represents an extracted edge to be persisted.

View File

@@ -101,6 +101,9 @@ func (s *Server) RegisterRoutes(r *mux.Router) {
// Performance instrumentation middleware
r.Use(s.perfMiddleware)
// Backfill status header middleware
r.Use(s.backfillStatusMiddleware)
// Config endpoints
r.HandleFunc("/api/config/cache", s.handleConfigCache).Methods("GET")
r.HandleFunc("/api/config/client", s.handleConfigClient).Methods("GET")
@@ -164,6 +167,19 @@ func (s *Server) RegisterRoutes(r *mux.Router) {
r.HandleFunc("/api/audio-lab/buckets", s.handleAudioLabBuckets).Methods("GET")
}
// backfillStatusMiddleware stamps API responses with an X-CoreScope-Status
// header so clients can tell whether the async backfill has finished:
// "ready" once complete, "backfilling" otherwise. Non-API paths (static
// files) are passed through untouched.
func (s *Server) backfillStatusMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if strings.HasPrefix(r.URL.Path, "/api/") {
			status := "backfilling"
			if s.store != nil && s.store.backfillComplete.Load() {
				status = "ready"
			}
			w.Header().Set("X-CoreScope-Status", status)
		}
		next.ServeHTTP(w, r)
	})
}
func (s *Server) perfMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.URL.Path, "/api/") {
@@ -521,6 +537,19 @@ func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
return
}
counts := s.db.GetRoleCounts()
// Compute backfill progress
backfilling := s.store != nil && !s.store.backfillComplete.Load()
var backfillProgress float64
if backfilling && s.store != nil && s.store.backfillTotal.Load() > 0 {
backfillProgress = float64(s.store.backfillProcessed.Load()) / float64(s.store.backfillTotal.Load())
if backfillProgress > 1 {
backfillProgress = 1
}
} else if !backfilling {
backfillProgress = 1
}
resp := &StatsResponse{
TotalPackets: stats.TotalPackets,
TotalTransmissions: &stats.TotalTransmissions,
@@ -540,6 +569,8 @@ func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
Companions: counts["companions"],
Sensors: counts["sensors"],
},
Backfilling: backfilling,
BackfillProgress: backfillProgress,
}
s.statsMu.Lock()

View File

@@ -157,6 +157,12 @@ type PacketStore struct {
// Persisted neighbor graph for hop resolution at ingest time.
graph *NeighborGraph
// Async backfill state: set after backfillResolvedPathsAsync completes.
backfillComplete atomic.Bool
// Progress tracking for async backfill (total pending and processed so far).
backfillTotal atomic.Int64
backfillProcessed atomic.Int64
// Eviction config and stats
retentionHours float64 // 0 = unlimited
maxMemoryMB int // 0 = unlimited

View File

@@ -68,6 +68,8 @@ type StatsResponse struct {
Commit string `json:"commit"`
BuildTime string `json:"buildTime"`
Counts RoleCounts `json:"counts"`
Backfilling bool `json:"backfilling"`
BackfillProgress float64 `json:"backfillProgress"`
}
// ─── Health ────────────────────────────────────────────────────────────────────