From a0fddb50aa89e3d0f2208540a73fe67faf2c9d0f Mon Sep 17 00:00:00 2001 From: Kpa-clawbot Date: Mon, 20 Apr 2026 22:47:10 -0700 Subject: [PATCH] fix(#789): severity from recent samples; Theil-Sen drift with outlier rejection (#828) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #789. ## The two bugs 1. **Severity from stale median.** `classifySkew(absMedian)` used the all-time `MedianSkewSec` over every advert ever recorded for the node. A repeater that was off for hours and then GPS-corrected stayed pinned to `absurd` because hundreds of historical bad samples poisoned the median. Reporter's case: `medianSkewSec: -59,063,561.8` while `lastSkewSec: -0.8` — current health was perfect, dashboard said catastrophic. 2. **Drift from a single correction jump.** Drift used OLS over every `(ts, skew)` pair, with no outlier rejection. A single GPS-correction event (skew jumps millions of seconds in ~30s) dominated the regression and produced `+1,793,549.9 s/day` — physically nonsense; the existing `maxReasonableDriftPerDay` cap then zeroed it (better than absurd, but still useless). ## The two fixes 1. **Recent-window severity.** New field `recentMedianSkewSec` = median over the last `N=5` samples or last `1h`, whichever is narrower (more current view). Severity now derives from `abs(recentMedianSkewSec)`. `MeanSkewSec`, `MedianSkewSec`, `LastSkewSec` are preserved unchanged so the frontend, fleet view, and any external consumers continue to work. 2. **Theil-Sen drift with outlier filter.** Drift now uses the Theil-Sen estimator (median of all pairwise slopes — textbook robust regression, ~29% breakdown point) on a series pre-filtered to drop samples whose skew jumps more than `maxPlausibleSkewJumpSec = 60s` from the previous accepted point. Real µC drift is fractions of a second per advert; clock corrections fall well outside. Capped at `theilSenMaxPoints = 200` (most-recent) so O(n²) stays bounded for chatty nodes. ## What stays the same - Epoch-0 / out-of-range advert filter (PR #769). - `minDriftSamples = 5` floor. - `maxReasonableDriftPerDay = 86400` hard backstop. - API shape: only additions (`recentMedianSkewSec`); no fields removed or renamed. ## Tests All in `cmd/server/clock_skew_test.go`: - `TestSeverityUsesRecentNotMedian` — 100 bad samples (-60s) + 5 good (-1s) → severity = `ok`, historical median still huge. - `TestDriftRejectsCorrectionJump` — 30 min of clean linear drift + one 1000s jump → drift small (~12 s/day). - `TestTheilSenMatchesOLSWhenClean` — clean linear data, Theil-Sen within ~1% of OLS. - `TestReporterScenario_789` — exact reproducer: 1662 samples, 1657 @ -683 days then 5 @ -1s → severity `ok`, `recentMedianSkewSec ≈ 0`, drift bounded; legacy `medianSkewSec` preserved as historical context. `go test ./... -count=1` (cmd/server) and `node test-frontend-helpers.js` both pass. --------- Co-authored-by: clawbot Co-authored-by: you --- cmd/server/clock_skew.go | 175 ++++++++++++++++++++++++++++------ cmd/server/clock_skew_test.go | 156 ++++++++++++++++++++++++++++++ public/analytics.js | 7 +- public/nodes.js | 9 +- public/roles.js | 9 ++ 5 files changed, 319 insertions(+), 37 deletions(-) diff --git a/cmd/server/clock_skew.go b/cmd/server/clock_skew.go index f6fc024d..b3298b88 100644 --- a/cmd/server/clock_skew.go +++ b/cmd/server/clock_skew.go @@ -33,6 +33,31 @@ const ( // maxReasonableDriftPerDay caps drift display. Physically impossible // drift rates (> 1 day/day) indicate insufficient or outlier samples. maxReasonableDriftPerDay = 86400.0 + + // recentSkewWindowCount is the number of most-recent advert samples + // used to derive the "current" skew for severity classification (see + // issue #789). The all-time median is poisoned by historical bad + // samples (e.g. a node that was off and then GPS-corrected); severity + // must reflect current health, not lifetime statistics. + recentSkewWindowCount = 5 + + // recentSkewWindowSec bounds the recent-window in time as well: only + // samples from the last N seconds count as "recent" for severity. + // The effective window is min(recentSkewWindowCount, samples in 1h). + recentSkewWindowSec = 3600 + + // maxPlausibleSkewJumpSec is the largest skew change between + // consecutive samples that we treat as physical drift. Anything larger + // (e.g. a GPS sync that jumps the clock by minutes/days) is rejected + // as an outlier when computing drift. Real microcontroller drift is + // fractions of a second per advert; 60s is a generous safety factor. + maxPlausibleSkewJumpSec = 60.0 + + // theilSenMaxPoints caps the number of points fed to Theil-Sen + // regression (O(n²) in pairs). For nodes with thousands of samples we + // keep the most-recent points, which are also the most relevant for + // current drift. + theilSenMaxPoints = 200 ) // classifySkew maps absolute skew (seconds) to a severity level. @@ -76,6 +101,7 @@ type NodeClockSkew struct { MeanSkewSec float64 `json:"meanSkewSec"` // corrected mean skew (positive = node ahead) MedianSkewSec float64 `json:"medianSkewSec"` // corrected median skew LastSkewSec float64 `json:"lastSkewSec"` // most recent corrected skew + RecentMedianSkewSec float64 `json:"recentMedianSkewSec"` // median across most-recent samples (drives severity, see #789) DriftPerDaySec float64 `json:"driftPerDaySec"` // linear drift rate (sec/day) Severity SkewSeverity `json:"severity"` SampleCount int `json:"sampleCount"` @@ -419,8 +445,52 @@ func (s *PacketStore) getNodeClockSkewLocked(pubkey string) *NodeClockSkew { medSkew := median(allSkews) meanSkew := mean(allSkews) - absMedian := math.Abs(medSkew) - severity := classifySkew(absMedian) + + // Severity is derived from RECENT samples only (issue #789). The + // all-time median is poisoned by historical bad data — a node that + // was off for hours and then GPS-corrected can have median = -59M sec + // while its current skew is -0.8s. Operators need severity to reflect + // current health, so they trust the dashboard. + // + // Sort tsSkews by time and take the last recentSkewWindowCount samples + // (or all samples within recentSkewWindowSec of the latest, whichever + // gives FEWER samples — we want the more-current view; a chatty node + // can fit dozens of samples in 1h, in which case the count cap wins). + sort.Slice(tsSkews, func(i, j int) bool { return tsSkews[i].ts < tsSkews[j].ts }) + + recentSkew := lastSkew + if n := len(tsSkews); n > 0 { + latestTS := tsSkews[n-1].ts + // Index-based window: last K samples. + startByCount := n - recentSkewWindowCount + if startByCount < 0 { + startByCount = 0 + } + // Time-based window: samples newer than latestTS - windowSec. + startByTime := n - 1 + for i := n - 1; i >= 0; i-- { + if latestTS-tsSkews[i].ts <= recentSkewWindowSec { + startByTime = i + } else { + break + } + } + // Pick the narrower (larger-index) of the two windows — the most + // current view of the node's clock health. + start := startByCount + if startByTime > start { + start = startByTime + } + recentVals := make([]float64, 0, n-start) + for i := start; i < n; i++ { + recentVals = append(recentVals, tsSkews[i].skew) + } + if len(recentVals) > 0 { + recentSkew = median(recentVals) + } + } + + severity := classifySkew(math.Abs(recentSkew)) // For no_clock nodes (uninitialized RTC), skip drift — data is meaningless. var drift float64 @@ -432,25 +502,25 @@ func (s *PacketStore) getNodeClockSkewLocked(pubkey string) *NodeClockSkew { } } - // Build sparkline samples from tsSkews (sorted by time). - sort.Slice(tsSkews, func(i, j int) bool { return tsSkews[i].ts < tsSkews[j].ts }) + // Build sparkline samples from tsSkews (already sorted by time above). samples := make([]SkewSample, len(tsSkews)) for i, p := range tsSkews { samples[i] = SkewSample{Timestamp: p.ts, SkewSec: round(p.skew, 1)} } return &NodeClockSkew{ - Pubkey: pubkey, - MeanSkewSec: round(meanSkew, 1), - MedianSkewSec: round(medSkew, 1), - LastSkewSec: round(lastSkew, 1), - DriftPerDaySec: round(drift, 2), - Severity: severity, - SampleCount: totalSamples, - Calibrated: anyCal, - LastAdvertTS: lastAdvTS, - LastObservedTS: lastObsTS, - Samples: samples, + Pubkey: pubkey, + MeanSkewSec: round(meanSkew, 1), + MedianSkewSec: round(medSkew, 1), + LastSkewSec: round(lastSkew, 1), + RecentMedianSkewSec: round(recentSkew, 1), + DriftPerDaySec: round(drift, 2), + Severity: severity, + SampleCount: totalSamples, + Calibrated: anyCal, + LastAdvertTS: lastAdvTS, + LastObservedTS: lastObsTS, + Samples: samples, } } @@ -544,7 +614,18 @@ type tsSkewPair struct { } // computeDrift estimates linear drift in seconds per day from time-ordered -// (timestamp, skew) pairs using simple linear regression. +// (timestamp, skew) pairs. Issue #789: a single GPS-correction event (huge +// skew jump in seconds) used to dominate ordinary least squares and produce +// absurd drift like 1.7M sec/day. We now: +// +// 1. Drop pairs whose consecutive skew jump exceeds maxPlausibleSkewJumpSec +// (clock corrections, not physical drift). This protects both OLS-style +// consumers and Theil-Sen. +// 2. Use Theil-Sen regression — the slope is the median of all pairwise +// slopes, naturally robust to remaining outliers (breakdown point ~29%). +// +// For very small samples after filtering we fall back to a simple slope +// between first and last calibrated samples. func computeDrift(pairs []tsSkewPair) float64 { if len(pairs) < 2 { return 0 @@ -560,21 +641,55 @@ func computeDrift(pairs []tsSkewPair) float64 { return 0 } - // Simple linear regression: skew = a + b*t - n := float64(len(pairs)) - var sumX, sumY, sumXY, sumX2 float64 - for _, p := range pairs { - x := float64(p.ts - pairs[0].ts) // normalize to avoid large numbers - y := p.skew - sumX += x - sumY += y - sumXY += x * y - sumX2 += x * x + // Outlier filter: drop samples where the skew jumps more than + // maxPlausibleSkewJumpSec from the running "stable" baseline. + // We anchor on the first sample, then accept each subsequent point + // that's within the threshold of the most recent accepted point — + // this preserves a slow drift while rejecting correction events. + filtered := make([]tsSkewPair, 0, len(pairs)) + filtered = append(filtered, pairs[0]) + for i := 1; i < len(pairs); i++ { + prev := filtered[len(filtered)-1] + if math.Abs(pairs[i].skew-prev.skew) <= maxPlausibleSkewJumpSec { + filtered = append(filtered, pairs[i]) + } } - denom := n*sumX2 - sumX*sumX - if denom == 0 { + // If the filter killed too much (e.g. unstable node), fall back to the + // raw series so we at least produce *something* — it'll be capped by + // maxReasonableDriftPerDay downstream. + if len(filtered) < 2 || float64(filtered[len(filtered)-1].ts-filtered[0].ts) < 3600 { + filtered = pairs + } + + // Cap point count for Theil-Sen (O(n²) on pairs). Keep most-recent. + if len(filtered) > theilSenMaxPoints { + filtered = filtered[len(filtered)-theilSenMaxPoints:] + } + + return theilSenSlope(filtered) * 86400 // sec/sec → sec/day +} + +// theilSenSlope returns the Theil-Sen estimator: median of all pairwise +// slopes (yj - yi) / (tj - ti) for i < j. Naturally robust to outliers. +// Pairs must be sorted by timestamp ascending. +func theilSenSlope(pairs []tsSkewPair) float64 { + n := len(pairs) + if n < 2 { return 0 } - slope := (n*sumXY - sumX*sumY) / denom // seconds of drift per second - return slope * 86400 // convert to seconds per day + // Pre-allocate: n*(n-1)/2 pairs. + slopes := make([]float64, 0, n*(n-1)/2) + for i := 0; i < n; i++ { + for j := i + 1; j < n; j++ { + dt := float64(pairs[j].ts - pairs[i].ts) + if dt <= 0 { + continue + } + slopes = append(slopes, (pairs[j].skew-pairs[i].skew)/dt) + } + } + if len(slopes) == 0 { + return 0 + } + return median(slopes) } diff --git a/cmd/server/clock_skew_test.go b/cmd/server/clock_skew_test.go index bb8a3d6c..fc2e29e7 100644 --- a/cmd/server/clock_skew_test.go +++ b/cmd/server/clock_skew_test.go @@ -544,3 +544,159 @@ func TestGetNodeClockSkew_NormalNodeWithDrift(t *testing.T) { func formatInt64(n int64) string { return fmt.Sprintf("%d", n) } + +// ── #789: Recent-window severity & robust drift ─────────────────────────────── + +// TestSeverityUsesRecentNotMedian: 100 historical bad samples (skew=-60s, +// each ~5min apart) followed by 5 fresh good samples (skew=-1s). All-time +// median is still huge-ish but recent-window severity must reflect the +// current healthy state. +func TestSeverityUsesRecentNotMedian(t *testing.T) { + ps := NewPacketStore(nil, nil) + pt := 4 + + baseObs := int64(1700000000) + var txs []*StoreTx + for i := 0; i < 105; i++ { + obsTS := baseObs + int64(i)*300 // 5 min apart + var skew int64 = -60 + if i >= 100 { + skew = -1 // good samples at the tail + } + advTS := obsTS + skew + tx := &StoreTx{ + Hash: fmt.Sprintf("recent-h%03d", i), + PayloadType: &pt, + DecodedJSON: `{"payload":{"timestamp":` + formatInt64(advTS) + `}}`, + Observations: []*StoreObs{ + {ObserverID: "obs1", Timestamp: time.Unix(obsTS, 0).UTC().Format(time.RFC3339)}, + }, + } + txs = append(txs, tx) + } + ps.mu.Lock() + ps.byNode["RECENT"] = txs + for _, tx := range txs { + ps.byPayloadType[4] = append(ps.byPayloadType[4], tx) + } + ps.clockSkew.computeInterval = 0 + ps.mu.Unlock() + + r := ps.GetNodeClockSkew("RECENT") + if r == nil { + t.Fatal("nil result") + } + if r.Severity != SkewOK { + t.Errorf("severity = %v, want ok (recent samples are healthy)", r.Severity) + } + if math.Abs(r.RecentMedianSkewSec) > 5 { + t.Errorf("recentMedianSkewSec = %v, want ~-1", r.RecentMedianSkewSec) + } + // Historical median should still be retained for context. + if math.Abs(r.MedianSkewSec) < 30 { + t.Errorf("medianSkewSec = %v, expected historical median to remain large", r.MedianSkewSec) + } +} + +// TestDriftRejectsCorrectionJump: 30 minutes of clean linear drift, then a +// single 60-second skew jump. The pre-jump slope should win — drift must +// not be catastrophically inflated by the correction event. +func TestDriftRejectsCorrectionJump(t *testing.T) { + pairs := []tsSkewPair{} + // 30 min of stable, ~12 sec/day drift: 1s per 7200s. + for i := 0; i < 12; i++ { + ts := int64(i) * 300 + skew := float64(i) * (1.0 / 24.0) // ~0.04s per 5min step → 12 s/day + pairs = append(pairs, tsSkewPair{ts: ts, skew: skew}) + } + // Wait an hour, then a single 1000-sec correction jump (clearly outlier). + pairs = append(pairs, tsSkewPair{ts: 3600 + 12*300, skew: 1000}) + + drift := computeDrift(pairs) + // Without rejection this would be ~ (1000-0)/(end-0) * 86400 = enormous. + if math.Abs(drift) > 100 { + t.Errorf("drift = %v, expected small (~12 s/day), correction jump should be filtered", drift) + } +} + +// TestTheilSenMatchesOLSWhenClean: on clean linear data Theil-Sen should +// produce essentially the OLS answer. +func TestTheilSenMatchesOLSWhenClean(t *testing.T) { + // 1 sec drift per hour = 24 sec/day, 20 evenly-spaced samples. + pairs := []tsSkewPair{} + for i := 0; i < 20; i++ { + pairs = append(pairs, tsSkewPair{ + ts: int64(i) * 600, + skew: float64(i) * (600.0 / 3600.0), + }) + } + drift := computeDrift(pairs) + if math.Abs(drift-24.0) > 0.25 { // ~1% + t.Errorf("drift = %v, want ~24", drift) + } +} + +// TestReporterScenario_789: reproduce the exact scenario from issue #789. +// Reporter saw mean=-52565156, median=-59063561, last=-0.8, sample count +// 1662, drift +1793549.9 s/day, severity=absurd. After the fix, severity +// must be ok (recent samples are healthy) and drift must be sane. +func TestReporterScenario_789(t *testing.T) { + ps := NewPacketStore(nil, nil) + pt := 4 + + baseObs := int64(1700000000) + var txs []*StoreTx + // 1657 samples with the bad ~-683-day skew (the historical poison), + // then 5 freshly corrected samples at -0.8s — totals 1662. + for i := 0; i < 1662; i++ { + obsTS := baseObs + int64(i)*60 // 1 min apart + var skew int64 + if i < 1657 { + skew = -59063561 // ~ -683 days + } else { + skew = -1 // corrected (rounded; reporter saw -0.8) + } + advTS := obsTS + skew + tx := &StoreTx{ + Hash: fmt.Sprintf("rep-%04d", i), + PayloadType: &pt, + DecodedJSON: `{"payload":{"timestamp":` + formatInt64(advTS) + `}}`, + Observations: []*StoreObs{ + {ObserverID: "obs1", Timestamp: time.Unix(obsTS, 0).UTC().Format(time.RFC3339)}, + }, + } + txs = append(txs, tx) + } + ps.mu.Lock() + ps.byNode["REPNODE"] = txs + for _, tx := range txs { + ps.byPayloadType[4] = append(ps.byPayloadType[4], tx) + } + ps.clockSkew.computeInterval = 0 + ps.mu.Unlock() + + r := ps.GetNodeClockSkew("REPNODE") + if r == nil { + t.Fatal("nil result") + } + // Severity must reflect current health, not the all-time median. + if r.Severity != SkewOK && r.Severity != SkewWarning { + t.Errorf("severity = %v, want ok/warning (recent samples are healthy)", r.Severity) + } + if math.Abs(r.RecentMedianSkewSec) > 5 { + t.Errorf("recentMedianSkewSec = %v, want near 0", r.RecentMedianSkewSec) + } + // Drift must not be absurd. The historical jump is one event between + // the 1657th and 1658th sample; outlier rejection must contain it. + if math.Abs(r.DriftPerDaySec) > maxReasonableDriftPerDay { + t.Errorf("drift = %v, must be <= cap %v", r.DriftPerDaySec, maxReasonableDriftPerDay) + } + // And it should be close to zero (stable historical + stable corrected). + if math.Abs(r.DriftPerDaySec) > 1000 { + t.Errorf("drift = %v, expected near zero after outlier rejection", r.DriftPerDaySec) + } + // Historical median is preserved as context. + if math.Abs(r.MedianSkewSec) < 1e6 { + t.Errorf("medianSkewSec = %v, expected historical poison preserved as context", r.MedianSkewSec) + } +} diff --git a/public/analytics.js b/public/analytics.js index 316db9fa..9eb080d0 100644 --- a/public/analytics.js +++ b/public/analytics.js @@ -3448,7 +3448,7 @@ function destroy() { _analyticsData = {}; _channelData = null; if (_ngState && _ if (sortKey === 'severity') { v = (SKEW_SEVERITY_ORDER[a.severity] || 9) - (SKEW_SEVERITY_ORDER[b.severity] || 9); } else if (sortKey === 'skew') { - v = Math.abs(b.medianSkewSec || 0) - Math.abs(a.medianSkewSec || 0); + v = Math.abs(window.currentSkewValue(b) || 0) - Math.abs(window.currentSkewValue(a) || 0); } else if (sortKey === 'name') { v = (a.nodeName || '').localeCompare(b.nodeName || ''); } else if (sortKey === 'drift') { @@ -3475,12 +3475,13 @@ function destroy() { _analyticsData = {}; _channelData = null; if (_ngState && _ var rowsHtml = filtered.map(function(n) { var rowClass = 'clock-fleet-row--' + (n.severity || 'ok'); var lastAdv = n.lastObservedTS ? new Date(n.lastObservedTS * 1000).toISOString().replace('T', ' ').replace(/\.\d+Z/, ' UTC') : '—'; - var skewText = n.severity === 'no_clock' ? 'No Clock' : formatSkew(n.medianSkewSec); + var skewVal = window.currentSkewValue(n); + var skewText = n.severity === 'no_clock' ? 'No Clock' : formatSkew(skewVal); var driftText = n.severity === 'no_clock' || !n.driftPerDaySec ? '–' : formatDrift(n.driftPerDaySec); return '' + '' + esc(n.nodeName || n.pubkey.slice(0, 12)) + '' + '' + skewText + '' + - '' + renderSkewBadge(n.severity, n.medianSkewSec) + '' + + '' + renderSkewBadge(n.severity, skewVal) + '' + '' + driftText + '' + '' + lastAdv + '' + ''; diff --git a/public/nodes.js b/public/nodes.js index 93ed3216..9d6d72e0 100644 --- a/public/nodes.js +++ b/public/nodes.js @@ -788,7 +788,7 @@ let _themeRefreshHandler = null; let _allNodes = null; // cached full node list - let _fleetSkew = null; // cached clock skew map: pubkey → {severity, medianSkewSec, ...} + let _fleetSkew = null; // cached clock skew map: pubkey → {severity, recentMedianSkewSec, medianSkewSec, ...} /** * Fetch per-node clock skew and render into the given container element. @@ -803,14 +803,15 @@ container.style.display = ''; var driftHtml = cs.driftPerDaySec ? '
Drift: ' + formatDrift(cs.driftPerDaySec) + '
' : ''; var sparkHtml = renderSkewSparkline(cs.samples, 200, 32); + var skewVal = window.currentSkewValue(cs); var skewDisplay = cs.severity === 'no_clock' ? 'No Clock' - : '' + formatSkew(cs.medianSkewSec) + ''; + : '' + formatSkew(skewVal) + ''; container.innerHTML = '

⏰ Clock Skew

' + '
' + skewDisplay + - renderSkewBadge(cs.severity, cs.medianSkewSec) + + renderSkewBadge(cs.severity, skewVal) + (cs.calibrated ? ' ✓ calibrated' : '') + '
' + driftHtml + @@ -1116,7 +1117,7 @@ const status = getNodeStatus(n.role || 'companion', lastSeenTime ? new Date(lastSeenTime).getTime() : 0); const lastSeenClass = status === 'active' ? 'last-seen-active' : 'last-seen-stale'; const cs = _fleetSkew && _fleetSkew[n.public_key]; - const skewBadgeHtml = cs && cs.severity && cs.severity !== 'ok' ? renderSkewBadge(cs.severity, cs.medianSkewSec) : ''; + const skewBadgeHtml = cs && cs.severity && cs.severity !== 'ok' ? renderSkewBadge(cs.severity, window.currentSkewValue(cs)) : ''; return ` ${favStar(n.public_key, 'node-fav')}${isClaimed ? ' ' : ''}${n.name || '(unnamed)'}${dupNameBadge(n.name, n.public_key, dupMap)}${skewBadgeHtml} ${truncate(n.public_key, 16)} diff --git a/public/roles.js b/public/roles.js index bbeeeb88..646f687c 100644 --- a/public/roles.js +++ b/public/roles.js @@ -429,6 +429,15 @@ return (secPerDay >= 0 ? '+' : '') + secPerDay.toFixed(1) + ' s/day'; }; + /** Pick the skew value that drives current-health UI: prefer the + * recent-window median (#789, current health) over the all-time median + * (poisoned by historical bad samples). Falls back gracefully if the + * field isn't present (older API responses). */ + window.currentSkewValue = function(cs) { + if (!cs) return null; + return cs.recentMedianSkewSec != null ? cs.recentMedianSkewSec : cs.medianSkewSec; + }; + /** Render a clock skew badge HTML */ window.renderSkewBadge = function(severity, skewSec) { if (!severity) return '';