diff --git a/cmd/server/clock_skew.go b/cmd/server/clock_skew.go index f6fc024d..b3298b88 100644 --- a/cmd/server/clock_skew.go +++ b/cmd/server/clock_skew.go @@ -33,6 +33,31 @@ const ( // maxReasonableDriftPerDay caps drift display. Physically impossible // drift rates (> 1 day/day) indicate insufficient or outlier samples. maxReasonableDriftPerDay = 86400.0 + + // recentSkewWindowCount is the number of most-recent advert samples + // used to derive the "current" skew for severity classification (see + // issue #789). The all-time median is poisoned by historical bad + // samples (e.g. a node that was off and then GPS-corrected); severity + // must reflect current health, not lifetime statistics. + recentSkewWindowCount = 5 + + // recentSkewWindowSec bounds the recent-window in time as well: only + // samples from the last N seconds count as "recent" for severity. + // The effective window is min(recentSkewWindowCount, samples in 1h). + recentSkewWindowSec = 3600 + + // maxPlausibleSkewJumpSec is the largest skew change between + // consecutive samples that we treat as physical drift. Anything larger + // (e.g. a GPS sync that jumps the clock by minutes/days) is rejected + // as an outlier when computing drift. Real microcontroller drift is + // fractions of a second per advert; 60s is a generous safety factor. + maxPlausibleSkewJumpSec = 60.0 + + // theilSenMaxPoints caps the number of points fed to Theil-Sen + // regression (O(n²) in pairs). For nodes with thousands of samples we + // keep the most-recent points, which are also the most relevant for + // current drift. + theilSenMaxPoints = 200 ) // classifySkew maps absolute skew (seconds) to a severity level. @@ -76,6 +101,7 @@ type NodeClockSkew struct { MeanSkewSec float64 `json:"meanSkewSec"` // corrected mean skew (positive = node ahead) MedianSkewSec float64 `json:"medianSkewSec"` // corrected median skew LastSkewSec float64 `json:"lastSkewSec"` // most recent corrected skew + RecentMedianSkewSec float64 `json:"recentMedianSkewSec"` // median across most-recent samples (drives severity, see #789) DriftPerDaySec float64 `json:"driftPerDaySec"` // linear drift rate (sec/day) Severity SkewSeverity `json:"severity"` SampleCount int `json:"sampleCount"` @@ -419,8 +445,52 @@ func (s *PacketStore) getNodeClockSkewLocked(pubkey string) *NodeClockSkew { medSkew := median(allSkews) meanSkew := mean(allSkews) - absMedian := math.Abs(medSkew) - severity := classifySkew(absMedian) + + // Severity is derived from RECENT samples only (issue #789). The + // all-time median is poisoned by historical bad data — a node that + // was off for hours and then GPS-corrected can have median = -59M sec + // while its current skew is -0.8s. Operators need severity to reflect + // current health, so they trust the dashboard. + // + // Sort tsSkews by time and take the last recentSkewWindowCount samples + // (or all samples within recentSkewWindowSec of the latest, whichever + // gives FEWER samples — we want the more-current view; a chatty node + // can fit dozens of samples in 1h, in which case the count cap wins). + sort.Slice(tsSkews, func(i, j int) bool { return tsSkews[i].ts < tsSkews[j].ts }) + + recentSkew := lastSkew + if n := len(tsSkews); n > 0 { + latestTS := tsSkews[n-1].ts + // Index-based window: last K samples. + startByCount := n - recentSkewWindowCount + if startByCount < 0 { + startByCount = 0 + } + // Time-based window: samples newer than latestTS - windowSec. + startByTime := n - 1 + for i := n - 1; i >= 0; i-- { + if latestTS-tsSkews[i].ts <= recentSkewWindowSec { + startByTime = i + } else { + break + } + } + // Pick the narrower (larger-index) of the two windows — the most + // current view of the node's clock health. + start := startByCount + if startByTime > start { + start = startByTime + } + recentVals := make([]float64, 0, n-start) + for i := start; i < n; i++ { + recentVals = append(recentVals, tsSkews[i].skew) + } + if len(recentVals) > 0 { + recentSkew = median(recentVals) + } + } + + severity := classifySkew(math.Abs(recentSkew)) // For no_clock nodes (uninitialized RTC), skip drift — data is meaningless. var drift float64 @@ -432,25 +502,25 @@ func (s *PacketStore) getNodeClockSkewLocked(pubkey string) *NodeClockSkew { } } - // Build sparkline samples from tsSkews (sorted by time). - sort.Slice(tsSkews, func(i, j int) bool { return tsSkews[i].ts < tsSkews[j].ts }) + // Build sparkline samples from tsSkews (already sorted by time above). samples := make([]SkewSample, len(tsSkews)) for i, p := range tsSkews { samples[i] = SkewSample{Timestamp: p.ts, SkewSec: round(p.skew, 1)} } return &NodeClockSkew{ - Pubkey: pubkey, - MeanSkewSec: round(meanSkew, 1), - MedianSkewSec: round(medSkew, 1), - LastSkewSec: round(lastSkew, 1), - DriftPerDaySec: round(drift, 2), - Severity: severity, - SampleCount: totalSamples, - Calibrated: anyCal, - LastAdvertTS: lastAdvTS, - LastObservedTS: lastObsTS, - Samples: samples, + Pubkey: pubkey, + MeanSkewSec: round(meanSkew, 1), + MedianSkewSec: round(medSkew, 1), + LastSkewSec: round(lastSkew, 1), + RecentMedianSkewSec: round(recentSkew, 1), + DriftPerDaySec: round(drift, 2), + Severity: severity, + SampleCount: totalSamples, + Calibrated: anyCal, + LastAdvertTS: lastAdvTS, + LastObservedTS: lastObsTS, + Samples: samples, } } @@ -544,7 +614,18 @@ type tsSkewPair struct { } // computeDrift estimates linear drift in seconds per day from time-ordered -// (timestamp, skew) pairs using simple linear regression. +// (timestamp, skew) pairs. Issue #789: a single GPS-correction event (huge +// skew jump in seconds) used to dominate ordinary least squares and produce +// absurd drift like 1.7M sec/day. We now: +// +// 1. Drop pairs whose consecutive skew jump exceeds maxPlausibleSkewJumpSec +// (clock corrections, not physical drift). This protects both OLS-style +// consumers and Theil-Sen. +// 2. Use Theil-Sen regression — the slope is the median of all pairwise +// slopes, naturally robust to remaining outliers (breakdown point ~29%). +// +// For very small samples after filtering we fall back to a simple slope +// between first and last calibrated samples. func computeDrift(pairs []tsSkewPair) float64 { if len(pairs) < 2 { return 0 @@ -560,21 +641,55 @@ func computeDrift(pairs []tsSkewPair) float64 { return 0 } - // Simple linear regression: skew = a + b*t - n := float64(len(pairs)) - var sumX, sumY, sumXY, sumX2 float64 - for _, p := range pairs { - x := float64(p.ts - pairs[0].ts) // normalize to avoid large numbers - y := p.skew - sumX += x - sumY += y - sumXY += x * y - sumX2 += x * x + // Outlier filter: drop samples where the skew jumps more than + // maxPlausibleSkewJumpSec from the running "stable" baseline. + // We anchor on the first sample, then accept each subsequent point + // that's within the threshold of the most recent accepted point — + // this preserves a slow drift while rejecting correction events. + filtered := make([]tsSkewPair, 0, len(pairs)) + filtered = append(filtered, pairs[0]) + for i := 1; i < len(pairs); i++ { + prev := filtered[len(filtered)-1] + if math.Abs(pairs[i].skew-prev.skew) <= maxPlausibleSkewJumpSec { + filtered = append(filtered, pairs[i]) + } } - denom := n*sumX2 - sumX*sumX - if denom == 0 { + // If the filter killed too much (e.g. unstable node), fall back to the + // raw series so we at least produce *something* — it'll be capped by + // maxReasonableDriftPerDay downstream. + if len(filtered) < 2 || float64(filtered[len(filtered)-1].ts-filtered[0].ts) < 3600 { + filtered = pairs + } + + // Cap point count for Theil-Sen (O(n²) on pairs). Keep most-recent. + if len(filtered) > theilSenMaxPoints { + filtered = filtered[len(filtered)-theilSenMaxPoints:] + } + + return theilSenSlope(filtered) * 86400 // sec/sec → sec/day +} + +// theilSenSlope returns the Theil-Sen estimator: median of all pairwise +// slopes (yj - yi) / (tj - ti) for i < j. Naturally robust to outliers. +// Pairs must be sorted by timestamp ascending. +func theilSenSlope(pairs []tsSkewPair) float64 { + n := len(pairs) + if n < 2 { return 0 } - slope := (n*sumXY - sumX*sumY) / denom // seconds of drift per second - return slope * 86400 // convert to seconds per day + // Pre-allocate: n*(n-1)/2 pairs. + slopes := make([]float64, 0, n*(n-1)/2) + for i := 0; i < n; i++ { + for j := i + 1; j < n; j++ { + dt := float64(pairs[j].ts - pairs[i].ts) + if dt <= 0 { + continue + } + slopes = append(slopes, (pairs[j].skew-pairs[i].skew)/dt) + } + } + if len(slopes) == 0 { + return 0 + } + return median(slopes) } diff --git a/cmd/server/clock_skew_test.go b/cmd/server/clock_skew_test.go index bb8a3d6c..fc2e29e7 100644 --- a/cmd/server/clock_skew_test.go +++ b/cmd/server/clock_skew_test.go @@ -544,3 +544,159 @@ func TestGetNodeClockSkew_NormalNodeWithDrift(t *testing.T) { func formatInt64(n int64) string { return fmt.Sprintf("%d", n) } + +// ── #789: Recent-window severity & robust drift ─────────────────────────────── + +// TestSeverityUsesRecentNotMedian: 100 historical bad samples (skew=-60s, +// each ~5min apart) followed by 5 fresh good samples (skew=-1s). All-time +// median is still huge-ish but recent-window severity must reflect the +// current healthy state. +func TestSeverityUsesRecentNotMedian(t *testing.T) { + ps := NewPacketStore(nil, nil) + pt := 4 + + baseObs := int64(1700000000) + var txs []*StoreTx + for i := 0; i < 105; i++ { + obsTS := baseObs + int64(i)*300 // 5 min apart + var skew int64 = -60 + if i >= 100 { + skew = -1 // good samples at the tail + } + advTS := obsTS + skew + tx := &StoreTx{ + Hash: fmt.Sprintf("recent-h%03d", i), + PayloadType: &pt, + DecodedJSON: `{"payload":{"timestamp":` + formatInt64(advTS) + `}}`, + Observations: []*StoreObs{ + {ObserverID: "obs1", Timestamp: time.Unix(obsTS, 0).UTC().Format(time.RFC3339)}, + }, + } + txs = append(txs, tx) + } + ps.mu.Lock() + ps.byNode["RECENT"] = txs + for _, tx := range txs { + ps.byPayloadType[4] = append(ps.byPayloadType[4], tx) + } + ps.clockSkew.computeInterval = 0 + ps.mu.Unlock() + + r := ps.GetNodeClockSkew("RECENT") + if r == nil { + t.Fatal("nil result") + } + if r.Severity != SkewOK { + t.Errorf("severity = %v, want ok (recent samples are healthy)", r.Severity) + } + if math.Abs(r.RecentMedianSkewSec) > 5 { + t.Errorf("recentMedianSkewSec = %v, want ~-1", r.RecentMedianSkewSec) + } + // Historical median should still be retained for context. + if math.Abs(r.MedianSkewSec) < 30 { + t.Errorf("medianSkewSec = %v, expected historical median to remain large", r.MedianSkewSec) + } +} + +// TestDriftRejectsCorrectionJump: 30 minutes of clean linear drift, then a +// single 60-second skew jump. The pre-jump slope should win — drift must +// not be catastrophically inflated by the correction event. +func TestDriftRejectsCorrectionJump(t *testing.T) { + pairs := []tsSkewPair{} + // 30 min of stable, ~12 sec/day drift: 1s per 7200s. + for i := 0; i < 12; i++ { + ts := int64(i) * 300 + skew := float64(i) * (1.0 / 24.0) // ~0.04s per 5min step → 12 s/day + pairs = append(pairs, tsSkewPair{ts: ts, skew: skew}) + } + // Wait an hour, then a single 1000-sec correction jump (clearly outlier). + pairs = append(pairs, tsSkewPair{ts: 3600 + 12*300, skew: 1000}) + + drift := computeDrift(pairs) + // Without rejection this would be ~ (1000-0)/(end-0) * 86400 = enormous. + if math.Abs(drift) > 100 { + t.Errorf("drift = %v, expected small (~12 s/day), correction jump should be filtered", drift) + } +} + +// TestTheilSenMatchesOLSWhenClean: on clean linear data Theil-Sen should +// produce essentially the OLS answer. +func TestTheilSenMatchesOLSWhenClean(t *testing.T) { + // 1 sec drift per hour = 24 sec/day, 20 evenly-spaced samples. + pairs := []tsSkewPair{} + for i := 0; i < 20; i++ { + pairs = append(pairs, tsSkewPair{ + ts: int64(i) * 600, + skew: float64(i) * (600.0 / 3600.0), + }) + } + drift := computeDrift(pairs) + if math.Abs(drift-24.0) > 0.25 { // ~1% + t.Errorf("drift = %v, want ~24", drift) + } +} + +// TestReporterScenario_789: reproduce the exact scenario from issue #789. +// Reporter saw mean=-52565156, median=-59063561, last=-0.8, sample count +// 1662, drift +1793549.9 s/day, severity=absurd. After the fix, severity +// must be ok (recent samples are healthy) and drift must be sane. +func TestReporterScenario_789(t *testing.T) { + ps := NewPacketStore(nil, nil) + pt := 4 + + baseObs := int64(1700000000) + var txs []*StoreTx + // 1657 samples with the bad ~-683-day skew (the historical poison), + // then 5 freshly corrected samples at -0.8s — totals 1662. + for i := 0; i < 1662; i++ { + obsTS := baseObs + int64(i)*60 // 1 min apart + var skew int64 + if i < 1657 { + skew = -59063561 // ~ -683 days + } else { + skew = -1 // corrected (rounded; reporter saw -0.8) + } + advTS := obsTS + skew + tx := &StoreTx{ + Hash: fmt.Sprintf("rep-%04d", i), + PayloadType: &pt, + DecodedJSON: `{"payload":{"timestamp":` + formatInt64(advTS) + `}}`, + Observations: []*StoreObs{ + {ObserverID: "obs1", Timestamp: time.Unix(obsTS, 0).UTC().Format(time.RFC3339)}, + }, + } + txs = append(txs, tx) + } + ps.mu.Lock() + ps.byNode["REPNODE"] = txs + for _, tx := range txs { + ps.byPayloadType[4] = append(ps.byPayloadType[4], tx) + } + ps.clockSkew.computeInterval = 0 + ps.mu.Unlock() + + r := ps.GetNodeClockSkew("REPNODE") + if r == nil { + t.Fatal("nil result") + } + // Severity must reflect current health, not the all-time median. + if r.Severity != SkewOK && r.Severity != SkewWarning { + t.Errorf("severity = %v, want ok/warning (recent samples are healthy)", r.Severity) + } + if math.Abs(r.RecentMedianSkewSec) > 5 { + t.Errorf("recentMedianSkewSec = %v, want near 0", r.RecentMedianSkewSec) + } + // Drift must not be absurd. The historical jump is one event between + // the 1657th and 1658th sample; outlier rejection must contain it. + if math.Abs(r.DriftPerDaySec) > maxReasonableDriftPerDay { + t.Errorf("drift = %v, must be <= cap %v", r.DriftPerDaySec, maxReasonableDriftPerDay) + } + // And it should be close to zero (stable historical + stable corrected). + if math.Abs(r.DriftPerDaySec) > 1000 { + t.Errorf("drift = %v, expected near zero after outlier rejection", r.DriftPerDaySec) + } + // Historical median is preserved as context. + if math.Abs(r.MedianSkewSec) < 1e6 { + t.Errorf("medianSkewSec = %v, expected historical poison preserved as context", r.MedianSkewSec) + } +} diff --git a/public/analytics.js b/public/analytics.js index 316db9fa..9eb080d0 100644 --- a/public/analytics.js +++ b/public/analytics.js @@ -3448,7 +3448,7 @@ function destroy() { _analyticsData = {}; _channelData = null; if (_ngState && _ if (sortKey === 'severity') { v = (SKEW_SEVERITY_ORDER[a.severity] || 9) - (SKEW_SEVERITY_ORDER[b.severity] || 9); } else if (sortKey === 'skew') { - v = Math.abs(b.medianSkewSec || 0) - Math.abs(a.medianSkewSec || 0); + v = Math.abs(window.currentSkewValue(b) || 0) - Math.abs(window.currentSkewValue(a) || 0); } else if (sortKey === 'name') { v = (a.nodeName || '').localeCompare(b.nodeName || ''); } else if (sortKey === 'drift') { @@ -3475,12 +3475,13 @@ function destroy() { _analyticsData = {}; _channelData = null; if (_ngState && _ var rowsHtml = filtered.map(function(n) { var rowClass = 'clock-fleet-row--' + (n.severity || 'ok'); var lastAdv = n.lastObservedTS ? new Date(n.lastObservedTS * 1000).toISOString().replace('T', ' ').replace(/\.\d+Z/, ' UTC') : '—'; - var skewText = n.severity === 'no_clock' ? 'No Clock' : formatSkew(n.medianSkewSec); + var skewVal = window.currentSkewValue(n); + var skewText = n.severity === 'no_clock' ? 'No Clock' : formatSkew(skewVal); var driftText = n.severity === 'no_clock' || !n.driftPerDaySec ? '–' : formatDrift(n.driftPerDaySec); return '