From 7018ddeaf390e64193201daae18f4d2a24df255e Mon Sep 17 00:00:00 2001 From: you Date: Fri, 24 Apr 2026 22:49:53 +0000 Subject: [PATCH] feat(#690): add per-hash evidence and calibration summary to node clock-skew API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extend GET /api/nodes/{pubkey}/clock-skew to return recentHashEvidence (most recent 10 hashes with per-observer raw/corrected skew breakdown) and calibrationSummary (total/calibrated/uncalibrated sample counts). Evidence is cached during ClockSkewEngine.Recompute() so the route handler remains cheap. Fleet endpoint omits evidence to keep payload small. Test: TestNodeClockSkew_EvidencePayload — 3-observer scenario verifying per-hash array shape, corrected = raw + offset math, and median. --- cmd/server/clock_skew.go | 127 ++++++++++++++++++++++++++++++++-- cmd/server/clock_skew_test.go | 105 +++++++++++++++++++++++++++- 2 files changed, 226 insertions(+), 6 deletions(-) diff --git a/cmd/server/clock_skew.go b/cmd/server/clock_skew.go index 0b2c363..a6a8c64 100644 --- a/cmd/server/clock_skew.go +++ b/cmd/server/clock_skew.go @@ -120,6 +120,8 @@ type NodeClockSkew struct { GoodFraction float64 `json:"goodFraction"` // fraction of recent samples with |skew| <= 1h RecentBadSampleCount int `json:"recentBadSampleCount"` // count of recent samples with |skew| > 1h RecentSampleCount int `json:"recentSampleCount"` // total recent samples in window + RecentHashEvidence []HashEvidence `json:"recentHashEvidence,omitempty"` + CalibrationSummary *CalibrationSummary `json:"calibrationSummary,omitempty"` NodeName string `json:"nodeName,omitempty"` // populated in fleet responses NodeRole string `json:"nodeRole,omitempty"` // populated in fleet responses } @@ -130,6 +132,31 @@ type SkewSample struct { SkewSec float64 `json:"skew"` // corrected skew in seconds } +// HashEvidenceObserver is one observer's contribution to a per-hash evidence entry. +type HashEvidenceObserver struct { + ObserverID string `json:"observerID"` + ObserverName string `json:"observerName"` + RawSkewSec float64 `json:"rawSkewSec"` + CorrectedSkewSec float64 `json:"correctedSkewSec"` + ObserverOffsetSec float64 `json:"observerOffsetSec"` + Calibrated bool `json:"calibrated"` +} + +// HashEvidence is per-hash clock skew evidence showing individual observer contributions. +type HashEvidence struct { + Hash string `json:"hash"` + Observers []HashEvidenceObserver `json:"observers"` + MedianCorrectedSkewSec float64 `json:"medianCorrectedSkewSec"` + Timestamp int64 `json:"timestamp"` +} + +// CalibrationSummary counts how many samples were corrected via observer calibration. +type CalibrationSummary struct { + TotalSamples int `json:"totalSamples"` + CalibratedSamples int `json:"calibratedSamples"` + UncalibratedSamples int `json:"uncalibratedSamples"` +} + // txSkewResult maps tx hash → per-transmission skew stats. This is an // intermediate result keyed by hash (not pubkey); the store maps hash → pubkey // when building the final per-node view. @@ -143,15 +170,27 @@ type ClockSkewEngine struct { observerOffsets map[string]float64 // observerID → calibrated offset (seconds) observerSamples map[string]int // observerID → number of multi-observer packets used nodeSkew txSkewResult + hashEvidence map[string][]hashEvidenceEntry // hash → per-observer raw/corrected data lastComputed time.Time computeInterval time.Duration } +// hashEvidenceEntry stores raw evidence per observer per hash, cached during Recompute. +type hashEvidenceEntry struct { + observerID string + rawSkew float64 + corrected float64 + offset float64 + calibrated bool + observedTS int64 +} + func NewClockSkewEngine() *ClockSkewEngine { return &ClockSkewEngine{ observerOffsets: make(map[string]float64), observerSamples: make(map[string]int), nodeSkew: make(txSkewResult), + hashEvidence: make(map[string][]hashEvidenceEntry), computeInterval: 30 * time.Second, } } @@ -176,14 +215,16 @@ func (e *ClockSkewEngine) Recompute(store *PacketStore) { var newOffsets map[string]float64 var newSamples map[string]int var newNodeSkew txSkewResult + var newHashEvidence map[string][]hashEvidenceEntry if len(samples) > 0 { newOffsets, newSamples = calibrateObservers(samples) - newNodeSkew = computeNodeSkew(samples, newOffsets) + newNodeSkew, newHashEvidence = computeNodeSkew(samples, newOffsets) } else { newOffsets = make(map[string]float64) newSamples = make(map[string]int) newNodeSkew = make(txSkewResult) + newHashEvidence = make(map[string][]hashEvidenceEntry) } // Swap results under brief write lock. @@ -196,6 +237,7 @@ func (e *ClockSkewEngine) Recompute(store *PacketStore) { e.observerOffsets = newOffsets e.observerSamples = newSamples e.nodeSkew = newNodeSkew + e.hashEvidence = newHashEvidence e.lastComputed = time.Now() e.mu.Unlock() } @@ -332,7 +374,7 @@ func calibrateObservers(samples []skewSample) (map[string]float64, map[string]in // ── Phase 3: Per-Node Skew ───────────────────────────────────────────────────── // computeNodeSkew calculates corrected skew statistics for each node. -func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkewResult { +func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) (txSkewResult, map[string][]hashEvidenceEntry) { // Compute corrected skew per sample, grouped by hash (each hash = one // node's advert transmission). The caller maps hash → pubkey via byNode. type correctedSample struct { @@ -343,6 +385,7 @@ func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkew byHash := make(map[string][]correctedSample) hashAdvertTS := make(map[string]int64) + evidence := make(map[string][]hashEvidenceEntry) // hash → per-observer evidence for _, s := range samples { obsOffset, hasCal := obsOffsets[s.observerID] @@ -359,6 +402,14 @@ func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkew calibrated: hasCal, }) hashAdvertTS[s.hash] = s.advertTS + evidence[s.hash] = append(evidence[s.hash], hashEvidenceEntry{ + observerID: s.observerID, + rawSkew: round(rawSkew, 1), + corrected: round(corrected, 1), + offset: round(obsOffset, 1), + calibrated: hasCal, + observedTS: s.observedTS, + }) } // Each hash represents one advert from one node. Compute median corrected @@ -397,7 +448,7 @@ func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkew LastObservedTS: latestObsTS, } } - return result + return result, evidence } // ── Integration with PacketStore ─────────────────────────────────────────────── @@ -558,6 +609,70 @@ func (s *PacketStore) getNodeClockSkewLocked(pubkey string) *NodeClockSkew { samples[i] = SkewSample{Timestamp: p.ts, SkewSec: round(p.skew, 1)} } + // Build per-hash evidence (most recent 10 hashes with ≥1 observer). + // Observer name lookup from store observations. + obsNameMap := make(map[string]string) + type hashMeta struct { + hash string + ts int64 + } + var evidenceHashes []hashMeta + for _, tx := range txs { + if tx.PayloadType == nil || *tx.PayloadType != PayloadADVERT { + continue + } + ev, ok := s.clockSkew.hashEvidence[tx.Hash] + if !ok || len(ev) == 0 { + continue + } + // Collect observer names from tx observations. + for _, obs := range tx.Observations { + if obs.ObserverID != "" && obs.ObserverName != "" { + obsNameMap[obs.ObserverID] = obs.ObserverName + } + } + evidenceHashes = append(evidenceHashes, hashMeta{hash: tx.Hash, ts: ev[0].observedTS}) + } + // Sort by timestamp descending, take most recent 10. + sort.Slice(evidenceHashes, func(i, j int) bool { return evidenceHashes[i].ts > evidenceHashes[j].ts }) + if len(evidenceHashes) > 10 { + evidenceHashes = evidenceHashes[:10] + } + var recentEvidence []HashEvidence + var calSummary CalibrationSummary + for _, eh := range evidenceHashes { + entries := s.clockSkew.hashEvidence[eh.hash] + var observers []HashEvidenceObserver + var corrSkews []float64 + for _, e := range entries { + name := obsNameMap[e.observerID] + if name == "" { + name = e.observerID + } + observers = append(observers, HashEvidenceObserver{ + ObserverID: e.observerID, + ObserverName: name, + RawSkewSec: e.rawSkew, + CorrectedSkewSec: e.corrected, + ObserverOffsetSec: e.offset, + Calibrated: e.calibrated, + }) + corrSkews = append(corrSkews, e.corrected) + calSummary.TotalSamples++ + if e.calibrated { + calSummary.CalibratedSamples++ + } else { + calSummary.UncalibratedSamples++ + } + } + recentEvidence = append(recentEvidence, HashEvidence{ + Hash: eh.hash, + Observers: observers, + MedianCorrectedSkewSec: round(median(corrSkews), 1), + Timestamp: eh.ts, + }) + } + return &NodeClockSkew{ Pubkey: pubkey, MeanSkewSec: round(meanSkew, 1), @@ -574,6 +689,8 @@ func (s *PacketStore) getNodeClockSkewLocked(pubkey string) *NodeClockSkew { GoodFraction: round(goodFraction, 2), RecentBadSampleCount: recentBadCount, RecentSampleCount: recentSampleCount, + RecentHashEvidence: recentEvidence, + CalibrationSummary: &calSummary, } } @@ -601,8 +718,10 @@ func (s *PacketStore) GetFleetClockSkew() []*NodeClockSkew { cs.NodeName = ni.Name cs.NodeRole = ni.Role } - // Omit samples in fleet response (too much data). + // Omit samples and evidence in fleet response (too much data). cs.Samples = nil + cs.RecentHashEvidence = nil + cs.CalibrationSummary = nil results = append(results, cs) } return results diff --git a/cmd/server/clock_skew_test.go b/cmd/server/clock_skew_test.go index 4fb79bb..e64de32 100644 --- a/cmd/server/clock_skew_test.go +++ b/cmd/server/clock_skew_test.go @@ -191,7 +191,7 @@ func TestComputeNodeSkew_BasicCorrection(t *testing.T) { // So the median approach finds obs2 is +5 ahead (relative to median) // Now compute node skew with those offsets: - nodeSkew := computeNodeSkew(samples, offsets) + nodeSkew, _ := computeNodeSkew(samples, offsets) cs, ok := nodeSkew["h1"] if !ok { t.Fatal("expected skew data for hash h1") @@ -220,7 +220,7 @@ func TestComputeNodeSkew_ThreeObservers(t *testing.T) { t.Errorf("obs3 offset = %v, want 30", offsets["obs3"]) } - nodeSkew := computeNodeSkew(samples, offsets) + nodeSkew, _ := computeNodeSkew(samples, offsets) cs := nodeSkew["h1"] if cs == nil { t.Fatal("expected skew data for h1") @@ -954,3 +954,104 @@ func TestAllGood_OK_845(t *testing.T) { t.Errorf("recentBadSampleCount = %v, want 0", r.RecentBadSampleCount) } } + +func TestNodeClockSkew_EvidencePayload(t *testing.T) { + // 3-observer scenario: obs1 ahead by +2s, obs2 on time, obs3 behind by -1s. + // Node clock is 60s ahead. Raw skew = advertTS - obsTS. + // Hash has 3 observations, each observer sees same advert. + ps := NewPacketStore(nil, nil) + + pt := 4 // ADVERT + // Advert timestamp: 1700000060 (node 60s ahead of true time 1700000000) + // obs1 sees at 1700000002 (2s ahead of true time) → raw = 60 - 2 = 58 + // obs2 sees at 1700000000 (on time) → raw = 60 - 0 = 60 + // obs3 sees at 1699999999 (-1s, behind) → raw = 60 + 1 = 61 + // Median obsTS = 1700000000, so: + // obs1 offset = 1700000002 - 1700000000 = +2 + // obs2 offset = 0 + // obs3 offset = 1699999999 - 1700000000 = -1 + // Corrected: raw + offset → obs1: 58+2=60, obs2: 60+0=60, obs3: 61+(-1)=60 + + tx1 := &StoreTx{ + Hash: "evidence_hash_1", + PayloadType: &pt, + DecodedJSON: `{"payload":{"timestamp":1700000060}}`, + Observations: []*StoreObs{ + {ObserverID: "obs1", ObserverName: "Observer Alpha", Timestamp: "2023-11-14T22:13:22Z"}, + {ObserverID: "obs2", ObserverName: "Observer Beta", Timestamp: "2023-11-14T22:13:20Z"}, + {ObserverID: "obs3", ObserverName: "Observer Gamma", Timestamp: "2023-11-14T22:13:19Z"}, + }, + } + // Second hash to ensure we get multiple evidence entries. + tx2 := &StoreTx{ + Hash: "evidence_hash_2", + PayloadType: &pt, + DecodedJSON: `{"payload":{"timestamp":1700003660}}`, + Observations: []*StoreObs{ + {ObserverID: "obs1", ObserverName: "Observer Alpha", Timestamp: "2023-11-14T23:13:22Z"}, + {ObserverID: "obs2", ObserverName: "Observer Beta", Timestamp: "2023-11-14T23:13:20Z"}, + {ObserverID: "obs3", ObserverName: "Observer Gamma", Timestamp: "2023-11-14T23:13:19Z"}, + }, + } + + ps.mu.Lock() + ps.byNode["NODETEST"] = []*StoreTx{tx1, tx2} + ps.byPayloadType[4] = []*StoreTx{tx1, tx2} + ps.clockSkew.computeInterval = 0 + ps.mu.Unlock() + + r := ps.GetNodeClockSkew("NODETEST") + if r == nil { + t.Fatal("expected clock skew result") + } + + // Check recentHashEvidence exists. + if len(r.RecentHashEvidence) == 0 { + t.Fatal("expected recentHashEvidence to be populated") + } + if len(r.RecentHashEvidence) != 2 { + t.Errorf("recentHashEvidence length = %d, want 2", len(r.RecentHashEvidence)) + } + + // Check first evidence entry has 3 observers. + ev := r.RecentHashEvidence[0] + if len(ev.Observers) != 3 { + t.Fatalf("evidence observers = %d, want 3", len(ev.Observers)) + } + + // Verify corrected = raw + offset for each observer. + for _, o := range ev.Observers { + expected := o.RawSkewSec + o.ObserverOffsetSec + if math.Abs(o.CorrectedSkewSec-expected) > 0.2 { + t.Errorf("observer %s: corrected=%.1f, expected raw(%.1f)+offset(%.1f)=%.1f", + o.ObserverID, o.CorrectedSkewSec, o.RawSkewSec, o.ObserverOffsetSec, expected) + } + } + + // All corrected values should be ~60s (node is 60s ahead). + if math.Abs(ev.MedianCorrectedSkewSec-60) > 1 { + t.Errorf("median corrected = %.1f, want ~60", ev.MedianCorrectedSkewSec) + } + + // Check calibration summary. + if r.CalibrationSummary == nil { + t.Fatal("expected calibrationSummary") + } + if r.CalibrationSummary.TotalSamples != 6 { // 3 observers × 2 hashes + t.Errorf("calibration total = %d, want 6", r.CalibrationSummary.TotalSamples) + } + if r.CalibrationSummary.CalibratedSamples != 6 { + t.Errorf("calibrated = %d, want 6 (all multi-observer)", r.CalibrationSummary.CalibratedSamples) + } + + // Check observer names are populated. + nameFound := false + for _, o := range ev.Observers { + if o.ObserverName == "Observer Alpha" || o.ObserverName == "Observer Beta" { + nameFound = true + } + } + if !nameFound { + t.Error("expected observer names to be populated from tx observations") + } +}