From 97f49a0c7f1927fc8d41d8363b253c4a328148c2 Mon Sep 17 00:00:00 2001 From: CoreScope Bot Date: Mon, 18 May 2026 16:28:18 +0000 Subject: [PATCH] =?UTF-8?q?test(#1265):=20RED=20=E2=80=94=20clock-skew=20h?= =?UTF-8?q?andlers=20not=20on=20recomputer=20(p99=20fails)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/server/clock_skew_recompute_test.go | 101 ++++++++++++++++++++++++ cmd/server/store.go | 2 + 2 files changed, 103 insertions(+) create mode 100644 cmd/server/clock_skew_recompute_test.go diff --git a/cmd/server/clock_skew_recompute_test.go b/cmd/server/clock_skew_recompute_test.go new file mode 100644 index 00000000..7cabe9fd --- /dev/null +++ b/cmd/server/clock_skew_recompute_test.go @@ -0,0 +1,101 @@ +package main + +import ( + "net/http" + "net/http/httptest" + "sort" + "sync" + "testing" + "time" +) + +// Issue #1265: /api/observers/clock-skew (3.3s) and /api/nodes/clock-skew (8.9s) +// must be wired into the steady-state analytics recomputer so reads serve +// from an atomic-pointer snapshot in <100ms p99 under concurrent load. + +func TestClockSkewRecomputersRegistered(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + store := NewPacketStore(db, nil) + + stop := store.StartAnalyticsRecomputers(50 * time.Millisecond) + defer stop() + time.Sleep(100 * time.Millisecond) + + store.analyticsRecomputerMu.RLock() + rcObs := store.recompObserversClockSkew + rcNodes := store.recompNodesClockSkew + store.analyticsRecomputerMu.RUnlock() + + if rcObs == nil { + t.Fatalf("recompObserversClockSkew not registered after StartAnalyticsRecomputers (issue #1265 not fixed)") + } + if rcNodes == nil { + t.Fatalf("recompNodesClockSkew not registered after StartAnalyticsRecomputers (issue #1265 not fixed)") + } + if rcObs.Load() == nil { + t.Fatalf("recompObserversClockSkew snapshot is nil after initial compute") + } + if rcNodes.Load() == nil { + t.Fatalf("recompNodesClockSkew snapshot is nil after initial compute") + } +} + +func TestClockSkewHandlersSteadyStateLatency(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + store := NewPacketStore(db, nil) + stop := store.StartAnalyticsRecomputers(50 * time.Millisecond) + defer stop() + time.Sleep(100 * time.Millisecond) + + s := &Server{store: store} + + endpoints := []struct { + name string + path string + handler http.HandlerFunc + }{ + {"observers", "/api/observers/clock-skew", s.handleObserverClockSkew}, + {"nodes", "/api/nodes/clock-skew", s.handleFleetClockSkew}, + } + + for _, ep := range endpoints { + ep := ep + t.Run(ep.name, func(t *testing.T) { + const readers = 8 + const perReader = 25 + var ( + mu sync.Mutex + samples []time.Duration + wg sync.WaitGroup + ) + wg.Add(readers) + for r := 0; r < readers; r++ { + go func() { + defer wg.Done() + for i := 0; i < perReader; i++ { + rr := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, ep.path, nil) + t0 := time.Now() + ep.handler(rr, req) + dt := time.Since(t0) + if rr.Code != http.StatusOK { + t.Errorf("%s status = %d, want 200", ep.path, rr.Code) + } + mu.Lock() + samples = append(samples, dt) + mu.Unlock() + } + }() + } + wg.Wait() + + sort.Slice(samples, func(i, j int) bool { return samples[i] < samples[j] }) + p99 := samples[int(float64(len(samples))*0.99)] + if p99 > 100*time.Millisecond { + t.Fatalf("%s p99 latency = %v over %d reqs, want <100ms (recomputer snapshot)", ep.path, p99, len(samples)) + } + }) + } +} diff --git a/cmd/server/store.go b/cmd/server/store.go index cd382844..f19676a0 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -165,6 +165,8 @@ type PacketStore struct { recompHashCollisions *analyticsRecomputer recompHashSizes *analyticsRecomputer recompRoles *analyticsRecomputer + recompObserversClockSkew *analyticsRecomputer + recompNodesClockSkew *analyticsRecomputer cacheHits int64 cacheMisses int64 // Rate-limited invalidation (fixes #533: caches cleared faster than hit)