fix: add mutex synchronization to PerfStats to eliminate data races (#469)

## Summary

Fixes #361 — `perfMiddleware()` wrote to shared `PerfStats` fields
(`Requests`, `TotalMs`, `Endpoints` map, `SlowQueries` slice) without
any synchronization, causing data races under concurrent HTTP requests.

## Changes

### `cmd/server/routes.go`
- **Added `sync.Mutex` to `PerfStats` struct** — single mutex protects
all fields
- **`perfMiddleware`** — all shared state mutations (counter increments,
endpoint map access, slice appends) now happen under lock. Key
normalization (regex, mux route lookup) moved outside the lock since it
uses no shared state
- **`handleHealth`** — snapshots `Requests`, `TotalMs`, `SlowQueries`
under lock before building response
- **`handlePerf`** — copies all endpoint data and slow queries under
lock into local snapshots, then does expensive work (sorting, percentile
calculation) outside the lock
- **`handlePerfReset`** — resets fields in-place instead of replacing
the pointer (avoids unlocking a different mutex)

### `cmd/server/perfstats_race_test.go` (new)
- Regression test: 50 concurrent writer goroutines + 10 concurrent
reader goroutines hammering `PerfStats` simultaneously
- Verifies no race conditions (via `-race` flag) and counter consistency

## Design Decisions

- **Single mutex over atomics**: The issue suggested `atomic.Int64` for
counters, but since slices/maps need a mutex anyway, a single mutex is
simpler and the critical section is small (microseconds). No measurable
contention at CoreScope's scale.
- **Copy-under-lock pattern**: Expensive operations (sorting, percentile
computation) happen outside the lock to minimize hold time.
- **In-place reset**: `handlePerfReset` clears fields rather than
replacing the `PerfStats` pointer, ensuring the mutex remains valid for
concurrent goroutines.

## Testing

- `go test -race -count=1 ./cmd/server/...` — **PASS** (all existing
tests + new race test)
- New `TestPerfStatsConcurrentAccess` specifically validates concurrent
access patterns

Co-authored-by: you <you@example.com>
This commit is contained in:
Kpa-clawbot
2026-04-01 19:26:11 -07:00
committed by GitHub
parent 0b1924d401
commit 623ebc879b
2 changed files with 156 additions and 30 deletions
+95
View File
@@ -0,0 +1,95 @@
package main
import (
"sync"
"testing"
"time"
)
// TestPerfStatsConcurrentAccess is a regression test for the PerfStats data
// race: it hammers one shared PerfStats from many goroutines at once so that
// `go test -race` flags any unsynchronized access, and then checks that the
// request counter stayed consistent under contention.
func TestPerfStatsConcurrentAccess(t *testing.T) {
	const (
		writers    = 50
		readers    = 10
		iterations = 200
	)

	stats := NewPerfStats()
	var wg sync.WaitGroup

	// Writers mimic perfMiddleware: bump the global counters, update the
	// per-endpoint aggregates, and record slow queries — all under the mutex.
	for w := 0; w < writers; w++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			// The endpoint key is fixed per goroutine, so it can be
			// computed once outside the iteration loop.
			path := "/api/test"
			if worker%2 == 0 {
				path = "/api/other"
			}
			for n := 0; n < iterations; n++ {
				elapsed := float64(n) * 0.5

				stats.mu.Lock()
				stats.Requests++
				stats.TotalMs += elapsed
				ep, ok := stats.Endpoints[path]
				if !ok {
					ep = &EndpointPerf{Recent: make([]float64, 0, 100)}
					stats.Endpoints[path] = ep
				}
				ep.Count++
				ep.TotalMs += elapsed
				if elapsed > ep.MaxMs {
					ep.MaxMs = elapsed
				}
				// Recent is a bounded ring of the last 100 samples.
				ep.Recent = append(ep.Recent, elapsed)
				if len(ep.Recent) > 100 {
					ep.Recent = ep.Recent[1:]
				}
				// Anything over 50ms counts as a slow query; keep the
				// last 50 entries only.
				if elapsed > 50 {
					stats.SlowQueries = append(stats.SlowQueries, SlowQuery{
						Path: path,
						Ms:   elapsed,
						Time: time.Now().UTC().Format(time.RFC3339),
					})
					if len(stats.SlowQueries) > 50 {
						stats.SlowQueries = stats.SlowQueries[1:]
					}
				}
				stats.mu.Unlock()
			}
		}(w)
	}

	// Readers mimic handlePerf / handleHealth: read the counters and take
	// defensive copies of the slices while holding the lock.
	for r := 0; r < readers; r++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := 0; n < iterations; n++ {
				stats.mu.Lock()
				_ = stats.Requests
				_ = stats.TotalMs
				for _, ep := range stats.Endpoints {
					_ = ep.Count
					_ = ep.MaxMs
					recentCopy := append([]float64(nil), ep.Recent...)
					_ = recentCopy
				}
				slowCopy := append([]SlowQuery(nil), stats.SlowQueries...)
				_ = slowCopy
				stats.mu.Unlock()
			}
		}()
	}

	wg.Wait()

	// Final consistency check: every writer iteration incremented Requests
	// exactly once.
	stats.mu.Lock()
	defer stats.mu.Unlock()
	if want := int64(writers * iterations); stats.Requests != want {
		t.Errorf("expected %d requests, got %d", want, stats.Requests)
	}
	if len(stats.Endpoints) == 0 {
		t.Error("expected endpoints to be populated")
	}
}
+61 -30
View File
@@ -42,6 +42,7 @@ type Server struct {
// PerfStats tracks request performance.
type PerfStats struct {
mu sync.Mutex
Requests int64
TotalMs float64
Endpoints map[string]*EndpointPerf
@@ -162,10 +163,7 @@ func (s *Server) perfMiddleware(next http.Handler) http.Handler {
next.ServeHTTP(w, r)
ms := float64(time.Since(start).Microseconds()) / 1000.0
s.perfStats.Requests++
s.perfStats.TotalMs += ms
// Normalize key: prefer mux route template (like Node.js req.route.path)
// Normalize key outside lock (no shared state needed)
key := r.URL.Path
if route := mux.CurrentRoute(r); route != nil {
if tmpl, err := route.GetPathTemplate(); err == nil {
@@ -175,6 +173,11 @@ func (s *Server) perfMiddleware(next http.Handler) http.Handler {
if key == r.URL.Path {
key = perfHexFallback.ReplaceAllString(key, ":id")
}
s.perfStats.mu.Lock()
s.perfStats.Requests++
s.perfStats.TotalMs += ms
if _, ok := s.perfStats.Endpoints[key]; !ok {
s.perfStats.Endpoints[key] = &EndpointPerf{Recent: make([]float64, 0, 100)}
}
@@ -200,6 +203,7 @@ func (s *Server) perfMiddleware(next http.Handler) http.Handler {
s.perfStats.SlowQueries = s.perfStats.SlowQueries[1:]
}
}
s.perfStats.mu.Unlock()
})
}
@@ -365,7 +369,8 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
lastPauseMs = float64(m.PauseNs[(m.NumGC+255)%256]) / 1e6
}
// Build slow queries list
// Build slow queries list (copy under lock)
s.perfStats.mu.Lock()
recentSlow := make([]SlowQuery, 0)
sliceEnd := s.perfStats.SlowQueries
if len(sliceEnd) > 5 {
@@ -374,6 +379,10 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
for _, sq := range sliceEnd {
recentSlow = append(recentSlow, sq)
}
perfRequests := s.perfStats.Requests
perfTotalMs := s.perfStats.TotalMs
perfSlowCount := len(s.perfStats.SlowQueries)
s.perfStats.mu.Unlock()
writeJSON(w, HealthResponse{
Status: "ok",
@@ -403,9 +412,9 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
EstimatedMB: pktEstMB,
},
Perf: HealthPerfStats{
TotalRequests: int(s.perfStats.Requests),
AvgMs: safeAvg(s.perfStats.TotalMs, float64(s.perfStats.Requests)),
SlowQueries: len(s.perfStats.SlowQueries),
TotalRequests: int(perfRequests),
AvgMs: safeAvg(perfTotalMs, float64(perfRequests)),
SlowQueries: perfSlowCount,
RecentSlow: recentSlow,
},
})
@@ -465,22 +474,50 @@ func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
}
func (s *Server) handlePerf(w http.ResponseWriter, r *http.Request) {
// Endpoint performance summary
// Copy perfStats under lock to avoid data races
s.perfStats.mu.Lock()
type epSnapshot struct {
path string
count int
totalMs float64
maxMs float64
recent []float64
}
epSnapshots := make([]epSnapshot, 0, len(s.perfStats.Endpoints))
for path, ep := range s.perfStats.Endpoints {
recentCopy := make([]float64, len(ep.Recent))
copy(recentCopy, ep.Recent)
epSnapshots = append(epSnapshots, epSnapshot{path, ep.Count, ep.TotalMs, ep.MaxMs, recentCopy})
}
uptimeSec := int(time.Since(s.perfStats.StartedAt).Seconds())
totalRequests := s.perfStats.Requests
totalMs := s.perfStats.TotalMs
slowQueries := make([]SlowQuery, 0)
sliceEnd := s.perfStats.SlowQueries
if len(sliceEnd) > 20 {
sliceEnd = sliceEnd[len(sliceEnd)-20:]
}
for _, sq := range sliceEnd {
slowQueries = append(slowQueries, sq)
}
s.perfStats.mu.Unlock()
// Process snapshots outside lock
type epEntry struct {
path string
data *EndpointStatsResp
}
var entries []epEntry
for path, ep := range s.perfStats.Endpoints {
sorted := sortedCopy(ep.Recent)
for _, snap := range epSnapshots {
sorted := sortedCopy(snap.recent)
d := &EndpointStatsResp{
Count: ep.Count,
AvgMs: safeAvg(ep.TotalMs, float64(ep.Count)),
Count: snap.count,
AvgMs: safeAvg(snap.totalMs, float64(snap.count)),
P50Ms: round(percentile(sorted, 0.5), 1),
P95Ms: round(percentile(sorted, 0.95), 1),
MaxMs: round(ep.MaxMs, 1),
MaxMs: round(snap.maxMs, 1),
}
entries = append(entries, epEntry{path, d})
entries = append(entries, epEntry{snap.path, d})
}
// Sort by total time spent (count * avg) descending, matching Node.js
sort.Slice(entries, func(i, j int) bool {
@@ -521,22 +558,10 @@ func (s *Server) handlePerf(w http.ResponseWriter, r *http.Request) {
sqliteStats = &ss
}
uptimeSec := int(time.Since(s.perfStats.StartedAt).Seconds())
// Convert slow queries
slowQueries := make([]SlowQuery, 0)
sliceEnd := s.perfStats.SlowQueries
if len(sliceEnd) > 20 {
sliceEnd = sliceEnd[len(sliceEnd)-20:]
}
for _, sq := range sliceEnd {
slowQueries = append(slowQueries, sq)
}
writeJSON(w, PerfResponse{
Uptime: uptimeSec,
TotalRequests: s.perfStats.Requests,
AvgMs: safeAvg(s.perfStats.TotalMs, float64(s.perfStats.Requests)),
TotalRequests: totalRequests,
AvgMs: safeAvg(totalMs, float64(totalRequests)),
Endpoints: summary,
SlowQueries: slowQueries,
Cache: perfCS,
@@ -560,7 +585,13 @@ func (s *Server) handlePerf(w http.ResponseWriter, r *http.Request) {
}
func (s *Server) handlePerfReset(w http.ResponseWriter, r *http.Request) {
s.perfStats = NewPerfStats()
s.perfStats.mu.Lock()
s.perfStats.Requests = 0
s.perfStats.TotalMs = 0
s.perfStats.Endpoints = make(map[string]*EndpointPerf)
s.perfStats.SlowQueries = make([]SlowQuery, 0)
s.perfStats.StartedAt = time.Now()
s.perfStats.mu.Unlock()
writeJSON(w, OkResp{Ok: true})
}