mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-13 17:53:11 +00:00
Compare commits
37 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| fba6c2c2e3 | |||
| 40c3aa13f9 | |||
| b47587f031 | |||
| c67f3347ce | |||
| b3a9677c52 | |||
| 707228ad91 | |||
| 8d379baf5e | |||
| 3b436c768b | |||
| 6d49cf939c | |||
| 8d39b33111 | |||
| e1a1be1735 | |||
| b97fe5758c | |||
| 568de4b441 | |||
| 04c8558768 | |||
| 52b5ae86d6 | |||
| 8397f2bb1c | |||
| ed65498281 | |||
| c53af5cf66 | |||
| 9f606600e2 | |||
| 053aef1994 | |||
| 7aef3c355c | |||
| 9ac484607f | |||
| b562de32ff | |||
| 6f0c58c94a | |||
| 7d1c679f4f | |||
| ead08c721d | |||
| 57e272494d | |||
| d870a693d0 | |||
| d9904cc138 | |||
| 7aa59eabde | |||
| ac7d2b64f7 | |||
| fd3bf1a892 | |||
| f16afe7fdf | |||
| ed66e54e57 | |||
| 22079a1fc4 | |||
| 232882d308 | |||
| fb640bcfc3 |
@@ -1 +1 @@
|
||||
{"schemaVersion":1,"label":"frontend coverage","message":"36.12%","color":"red"}
|
||||
{"schemaVersion":1,"label":"frontend coverage","message":"40.21%","color":"red"}
|
||||
|
||||
@@ -176,6 +176,9 @@ jobs:
|
||||
- name: Instrument frontend JS for coverage
|
||||
run: sh scripts/instrument-frontend.sh
|
||||
|
||||
- name: Freshen fixture timestamps
|
||||
run: bash tools/freshen-fixture.sh test-fixtures/e2e-fixture.db
|
||||
|
||||
- name: Start Go server with fixture DB
|
||||
run: |
|
||||
fuser -k 13581/tcp 2>/dev/null || true
|
||||
@@ -183,7 +186,7 @@ jobs:
|
||||
./corescope-server -port 13581 -db test-fixtures/e2e-fixture.db -public public-instrumented &
|
||||
echo $! > .server.pid
|
||||
for i in $(seq 1 30); do
|
||||
if curl -sf http://localhost:13581/api/stats > /dev/null 2>&1; then
|
||||
if curl -sf http://localhost:13581/api/healthz > /dev/null 2>&1; then
|
||||
echo "Server ready after ${i}s"
|
||||
break
|
||||
fi
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/meshcore-analyzer/geofilter"
|
||||
)
|
||||
@@ -42,6 +43,15 @@ type Config struct {
|
||||
GeoFilter *GeoFilterConfig `json:"geo_filter,omitempty"`
|
||||
ValidateSignatures *bool `json:"validateSignatures,omitempty"`
|
||||
DB *DBConfig `json:"db,omitempty"`
|
||||
|
||||
// ObserverBlacklist is a list of observer public keys to drop at ingest.
|
||||
// Messages from blacklisted observers are silently discarded — no DB writes,
|
||||
// no UpsertObserver, no observations, no metrics.
|
||||
ObserverBlacklist []string `json:"observerBlacklist,omitempty"`
|
||||
|
||||
// obsBlacklistSetCached is the lazily-built lowercase set for O(1) lookups.
|
||||
obsBlacklistSetCached map[string]bool
|
||||
obsBlacklistOnce sync.Once
|
||||
}
|
||||
|
||||
// GeoFilterConfig is an alias for the shared geofilter.Config type.
|
||||
@@ -114,6 +124,24 @@ func (c *Config) ObserverDaysOrDefault() int {
|
||||
return 14
|
||||
}
|
||||
|
||||
// IsObserverBlacklisted returns true if the given observer ID is in the observerBlacklist.
|
||||
func (c *Config) IsObserverBlacklisted(id string) bool {
|
||||
if c == nil || len(c.ObserverBlacklist) == 0 {
|
||||
return false
|
||||
}
|
||||
c.obsBlacklistOnce.Do(func() {
|
||||
m := make(map[string]bool, len(c.ObserverBlacklist))
|
||||
for _, pk := range c.ObserverBlacklist {
|
||||
trimmed := strings.ToLower(strings.TrimSpace(pk))
|
||||
if trimmed != "" {
|
||||
m[trimmed] = true
|
||||
}
|
||||
}
|
||||
c.obsBlacklistSetCached = m
|
||||
})
|
||||
return c.obsBlacklistSetCached[strings.ToLower(strings.TrimSpace(id))]
|
||||
}
|
||||
|
||||
// LoadConfig reads configuration from a JSON file, with env var overrides.
|
||||
// If the config file does not exist, sensible defaults are used (zero-config startup).
|
||||
func LoadConfig(path string) (*Config, error) {
|
||||
|
||||
+38
-6
@@ -123,6 +123,7 @@ func main() {
|
||||
|
||||
// Connect to each MQTT source
|
||||
var clients []mqtt.Client
|
||||
connectedCount := 0
|
||||
for _, source := range sources {
|
||||
tag := source.Name
|
||||
if tag == "" {
|
||||
@@ -164,19 +165,43 @@ func main() {
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
token := client.Connect()
|
||||
token.Wait()
|
||||
if token.Error() != nil {
|
||||
log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error())
|
||||
// With ConnectRetry=true, token.Wait() blocks forever for unreachable brokers.
|
||||
// WaitTimeout lets startup proceed; the client keeps retrying in the background
|
||||
// and OnConnect fires (subscribing) when it eventually connects (#910).
|
||||
if !token.WaitTimeout(30 * time.Second) {
|
||||
log.Printf("MQTT [%s] initial connection timed out — retrying in background", tag)
|
||||
clients = append(clients, client)
|
||||
continue
|
||||
}
|
||||
if token.Error() != nil {
|
||||
log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error())
|
||||
// BL1 fix: Disconnect to stop Paho's internal retry goroutines.
|
||||
// With ConnectRetry=true, Connect() spawns background goroutines
|
||||
// that leak if the client is simply discarded.
|
||||
client.Disconnect(0)
|
||||
continue
|
||||
}
|
||||
connectedCount++
|
||||
clients = append(clients, client)
|
||||
}
|
||||
|
||||
if len(clients) == 0 {
|
||||
log.Fatal("no MQTT connections established — check broker is running (default: mqtt://localhost:1883). Set MQTT_BROKER env var or configure mqttSources in config.json")
|
||||
// BL2 fix: require at least one immediately-connected source. Timed-out
|
||||
// clients are retrying in background (tracked in clients) but don't count
|
||||
// as "connected" — a single unreachable broker must not silently run with
|
||||
// zero active connections.
|
||||
if connectedCount == 0 {
|
||||
// Clean up any timed-out clients still retrying
|
||||
for _, c := range clients {
|
||||
c.Disconnect(0)
|
||||
}
|
||||
log.Fatal("no MQTT sources connected — all timed out or failed. Check broker is running (default: mqtt://localhost:1883). Set MQTT_BROKER env var or configure mqttSources in config.json")
|
||||
}
|
||||
|
||||
log.Printf("Running — %d MQTT source(s) connected", len(clients))
|
||||
if connectedCount < len(clients) {
|
||||
log.Printf("Running — %d MQTT source(s) connected, %d retrying in background", connectedCount, len(clients)-connectedCount)
|
||||
} else {
|
||||
log.Printf("Running — %d MQTT source(s) connected", connectedCount)
|
||||
}
|
||||
|
||||
// Wait for shutdown signal
|
||||
sig := make(chan os.Signal, 1)
|
||||
@@ -240,6 +265,13 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
|
||||
return
|
||||
}
|
||||
|
||||
// Observer blacklist: drop ALL messages from blacklisted observers before any
|
||||
// DB writes (status, metrics, packets). Trumps IATA filter.
|
||||
if len(parts) > 2 && cfg.IsObserverBlacklisted(parts[2]) {
|
||||
log.Printf("MQTT [%s] observer %.8s blacklisted, dropping", tag, parts[2])
|
||||
return
|
||||
}
|
||||
|
||||
// Status topic: meshcore/<region>/<observer_id>/status
|
||||
// IATA filter does NOT apply here — observer metadata (noise_floor, battery, etc.)
|
||||
// is region-independent and should be accepted from all observers regardless of
|
||||
|
||||
@@ -5,8 +5,11 @@ import (
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
)
|
||||
|
||||
func TestToFloat64(t *testing.T) {
|
||||
@@ -780,3 +783,124 @@ func TestIATAFilterDoesNotDropStatusMessages(t *testing.T) {
|
||||
t.Error("packet from out-of-region BFL should still be filtered by IATA")
|
||||
}
|
||||
}
|
||||
|
||||
// TestMQTTConnectRetryTimeoutDoesNotBlock verifies that WaitTimeout returns within
|
||||
// the deadline for an unreachable broker when ConnectRetry=true (#910). Previously,
|
||||
// token.Wait() would block forever in this configuration.
|
||||
func TestMQTTConnectRetryTimeoutDoesNotBlock(t *testing.T) {
|
||||
opts := mqtt.NewClientOptions().
|
||||
AddBroker("tcp://127.0.0.1:1"). // port 1 — nothing listening
|
||||
SetConnectRetry(true).
|
||||
SetAutoReconnect(true)
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
token := client.Connect()
|
||||
defer client.Disconnect(100)
|
||||
|
||||
start := time.Now()
|
||||
connected := token.WaitTimeout(3 * time.Second)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if connected {
|
||||
t.Skip("port 1 unexpectedly accepted a connection — skipping")
|
||||
}
|
||||
if elapsed > 4*time.Second {
|
||||
t.Errorf("WaitTimeout blocked for %v — token.Wait() would block forever with ConnectRetry=true", elapsed)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBL1_GoroutineLeakOnHardFailure reproduces BLOCKER 1: without Disconnect()
|
||||
// on the error path, Paho's internal retry goroutines leak when a client is
|
||||
// discarded after Connect() with ConnectRetry=true.
|
||||
//
|
||||
// We prove the leak by creating N clients WITHOUT Disconnect — goroutines grow
|
||||
// proportionally. The fix (client.Disconnect(0) before continue) prevents this.
|
||||
func TestBL1_GoroutineLeakOnHardFailure(t *testing.T) {
|
||||
runtime.GC()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
baseline := runtime.NumGoroutine()
|
||||
|
||||
// Create multiple clients connected to unreachable broker, WITHOUT disconnecting.
|
||||
// Each one spawns Paho retry goroutines that accumulate.
|
||||
const numClients = 10
|
||||
clients := make([]mqtt.Client, numClients)
|
||||
for i := 0; i < numClients; i++ {
|
||||
opts := mqtt.NewClientOptions().
|
||||
AddBroker("tcp://127.0.0.1:1").
|
||||
SetConnectRetry(true).
|
||||
SetAutoReconnect(true).
|
||||
SetConnectTimeout(500 * time.Millisecond)
|
||||
c := mqtt.NewClient(opts)
|
||||
tok := c.Connect()
|
||||
tok.WaitTimeout(1 * time.Second)
|
||||
clients[i] = c
|
||||
}
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
leaked := runtime.NumGoroutine()
|
||||
goroutineGrowth := leaked - baseline
|
||||
|
||||
// Clean up to not actually leak in test
|
||||
for _, c := range clients {
|
||||
c.Disconnect(0)
|
||||
}
|
||||
|
||||
t.Logf("baseline=%d, after %d undisconnected clients=%d, growth=%d",
|
||||
baseline, numClients, leaked, goroutineGrowth)
|
||||
|
||||
// With ConnectRetry=true, each Connect() spawns retry goroutines.
|
||||
// Without Disconnect, these accumulate. Verify growth is meaningful.
|
||||
if goroutineGrowth < 3 {
|
||||
t.Skip("Connect didn't spawn enough extra goroutines to measure leak")
|
||||
}
|
||||
|
||||
// The fix: calling client.Disconnect(0) on the error path prevents accumulation.
|
||||
// Anti-tautology: removing the Disconnect(0) call from main.go's error path
|
||||
// would cause goroutine accumulation proportional to failed broker count.
|
||||
t.Logf("CONFIRMED: %d leaked goroutines from %d clients without Disconnect — fix adds Disconnect(0) on error path", goroutineGrowth, numClients)
|
||||
}
|
||||
|
||||
// TestBL2_ZeroConnectedFatals verifies BLOCKER 2: when all brokers are unreachable,
|
||||
// connectedCount==0 must be detected. We test the logic directly — if only timed-out
|
||||
// clients exist (appended to clients slice) but connectedCount is 0, the guard triggers.
|
||||
func TestBL2_ZeroConnectedFatals(t *testing.T) {
|
||||
// Simulate the connection loop result: 1 timed-out client, 0 connected
|
||||
var clients []mqtt.Client
|
||||
connectedCount := 0
|
||||
|
||||
// Create a client that times out (unreachable broker)
|
||||
opts := mqtt.NewClientOptions().
|
||||
AddBroker("tcp://127.0.0.1:1").
|
||||
SetConnectRetry(true).
|
||||
SetAutoReconnect(true)
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
token := client.Connect()
|
||||
if !token.WaitTimeout(2 * time.Second) {
|
||||
// Timed out — PR #926 appends to clients
|
||||
clients = append(clients, client)
|
||||
}
|
||||
defer func() {
|
||||
for _, c := range clients {
|
||||
c.Disconnect(0)
|
||||
}
|
||||
}()
|
||||
|
||||
// OLD bug: len(clients) == 0 would be false (1 timed-out client in list)
|
||||
// → ingestor would silently run with zero connections
|
||||
if len(clients) == 0 {
|
||||
t.Fatal("expected timed-out client to be in clients slice")
|
||||
}
|
||||
|
||||
// NEW fix: connectedCount == 0 catches this
|
||||
if connectedCount != 0 {
|
||||
t.Errorf("connectedCount should be 0, got %d", connectedCount)
|
||||
}
|
||||
|
||||
// The real code does: if connectedCount == 0 { log.Fatal(...) }
|
||||
// This test proves len(clients) > 0 but connectedCount == 0 — the old guard
|
||||
// would have missed it.
|
||||
if len(clients) > 0 && connectedCount == 0 {
|
||||
t.Log("BL2 confirmed: old guard len(clients)==0 would NOT fatal; new guard connectedCount==0 correctly catches zero-connected state")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestIngestorIsObserverBlacklisted(t *testing.T) {
|
||||
cfg := &Config{
|
||||
ObserverBlacklist: []string{"OBS1", "obs2"},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
id string
|
||||
want bool
|
||||
}{
|
||||
{"OBS1", true},
|
||||
{"obs1", true},
|
||||
{"OBS2", true},
|
||||
{"obs3", false},
|
||||
{"", false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
got := cfg.IsObserverBlacklisted(tt.id)
|
||||
if got != tt.want {
|
||||
t.Errorf("IsObserverBlacklisted(%q) = %v, want %v", tt.id, got, tt.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestorIsObserverBlacklistedEmpty(t *testing.T) {
|
||||
cfg := &Config{}
|
||||
if cfg.IsObserverBlacklisted("anything") {
|
||||
t.Error("empty blacklist should not match")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestorIsObserverBlacklistedNil(t *testing.T) {
|
||||
var cfg *Config
|
||||
if cfg.IsObserverBlacklisted("anything") {
|
||||
t.Error("nil config should not match")
|
||||
}
|
||||
}
|
||||
+123
-4
@@ -120,6 +120,8 @@ type NodeClockSkew struct {
|
||||
GoodFraction float64 `json:"goodFraction"` // fraction of recent samples with |skew| <= 1h
|
||||
RecentBadSampleCount int `json:"recentBadSampleCount"` // count of recent samples with |skew| > 1h
|
||||
RecentSampleCount int `json:"recentSampleCount"` // total recent samples in window
|
||||
RecentHashEvidence []HashEvidence `json:"recentHashEvidence,omitempty"`
|
||||
CalibrationSummary *CalibrationSummary `json:"calibrationSummary,omitempty"`
|
||||
NodeName string `json:"nodeName,omitempty"` // populated in fleet responses
|
||||
NodeRole string `json:"nodeRole,omitempty"` // populated in fleet responses
|
||||
}
|
||||
@@ -130,6 +132,31 @@ type SkewSample struct {
|
||||
SkewSec float64 `json:"skew"` // corrected skew in seconds
|
||||
}
|
||||
|
||||
// HashEvidenceObserver is one observer's contribution to a per-hash evidence entry.
|
||||
type HashEvidenceObserver struct {
|
||||
ObserverID string `json:"observerID"`
|
||||
ObserverName string `json:"observerName"`
|
||||
RawSkewSec float64 `json:"rawSkewSec"`
|
||||
CorrectedSkewSec float64 `json:"correctedSkewSec"`
|
||||
ObserverOffsetSec float64 `json:"observerOffsetSec"`
|
||||
Calibrated bool `json:"calibrated"`
|
||||
}
|
||||
|
||||
// HashEvidence is per-hash clock skew evidence showing individual observer contributions.
|
||||
type HashEvidence struct {
|
||||
Hash string `json:"hash"`
|
||||
Observers []HashEvidenceObserver `json:"observers"`
|
||||
MedianCorrectedSkewSec float64 `json:"medianCorrectedSkewSec"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
}
|
||||
|
||||
// CalibrationSummary counts how many samples were corrected via observer calibration.
|
||||
type CalibrationSummary struct {
|
||||
TotalSamples int `json:"totalSamples"`
|
||||
CalibratedSamples int `json:"calibratedSamples"`
|
||||
UncalibratedSamples int `json:"uncalibratedSamples"`
|
||||
}
|
||||
|
||||
// txSkewResult maps tx hash → per-transmission skew stats. This is an
|
||||
// intermediate result keyed by hash (not pubkey); the store maps hash → pubkey
|
||||
// when building the final per-node view.
|
||||
@@ -143,15 +170,27 @@ type ClockSkewEngine struct {
|
||||
observerOffsets map[string]float64 // observerID → calibrated offset (seconds)
|
||||
observerSamples map[string]int // observerID → number of multi-observer packets used
|
||||
nodeSkew txSkewResult
|
||||
hashEvidence map[string][]hashEvidenceEntry // hash → per-observer raw/corrected data
|
||||
lastComputed time.Time
|
||||
computeInterval time.Duration
|
||||
}
|
||||
|
||||
// hashEvidenceEntry stores raw evidence per observer per hash, cached during Recompute.
|
||||
type hashEvidenceEntry struct {
|
||||
observerID string
|
||||
rawSkew float64
|
||||
corrected float64
|
||||
offset float64
|
||||
calibrated bool
|
||||
observedTS int64
|
||||
}
|
||||
|
||||
func NewClockSkewEngine() *ClockSkewEngine {
|
||||
return &ClockSkewEngine{
|
||||
observerOffsets: make(map[string]float64),
|
||||
observerSamples: make(map[string]int),
|
||||
nodeSkew: make(txSkewResult),
|
||||
hashEvidence: make(map[string][]hashEvidenceEntry),
|
||||
computeInterval: 30 * time.Second,
|
||||
}
|
||||
}
|
||||
@@ -176,14 +215,16 @@ func (e *ClockSkewEngine) Recompute(store *PacketStore) {
|
||||
var newOffsets map[string]float64
|
||||
var newSamples map[string]int
|
||||
var newNodeSkew txSkewResult
|
||||
var newHashEvidence map[string][]hashEvidenceEntry
|
||||
|
||||
if len(samples) > 0 {
|
||||
newOffsets, newSamples = calibrateObservers(samples)
|
||||
newNodeSkew = computeNodeSkew(samples, newOffsets)
|
||||
newNodeSkew, newHashEvidence = computeNodeSkew(samples, newOffsets)
|
||||
} else {
|
||||
newOffsets = make(map[string]float64)
|
||||
newSamples = make(map[string]int)
|
||||
newNodeSkew = make(txSkewResult)
|
||||
newHashEvidence = make(map[string][]hashEvidenceEntry)
|
||||
}
|
||||
|
||||
// Swap results under brief write lock.
|
||||
@@ -196,6 +237,7 @@ func (e *ClockSkewEngine) Recompute(store *PacketStore) {
|
||||
e.observerOffsets = newOffsets
|
||||
e.observerSamples = newSamples
|
||||
e.nodeSkew = newNodeSkew
|
||||
e.hashEvidence = newHashEvidence
|
||||
e.lastComputed = time.Now()
|
||||
e.mu.Unlock()
|
||||
}
|
||||
@@ -332,7 +374,7 @@ func calibrateObservers(samples []skewSample) (map[string]float64, map[string]in
|
||||
// ── Phase 3: Per-Node Skew ─────────────────────────────────────────────────────
|
||||
|
||||
// computeNodeSkew calculates corrected skew statistics for each node.
|
||||
func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkewResult {
|
||||
func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) (txSkewResult, map[string][]hashEvidenceEntry) {
|
||||
// Compute corrected skew per sample, grouped by hash (each hash = one
|
||||
// node's advert transmission). The caller maps hash → pubkey via byNode.
|
||||
type correctedSample struct {
|
||||
@@ -343,6 +385,7 @@ func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkew
|
||||
|
||||
byHash := make(map[string][]correctedSample)
|
||||
hashAdvertTS := make(map[string]int64)
|
||||
evidence := make(map[string][]hashEvidenceEntry) // hash → per-observer evidence
|
||||
|
||||
for _, s := range samples {
|
||||
obsOffset, hasCal := obsOffsets[s.observerID]
|
||||
@@ -359,6 +402,14 @@ func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkew
|
||||
calibrated: hasCal,
|
||||
})
|
||||
hashAdvertTS[s.hash] = s.advertTS
|
||||
evidence[s.hash] = append(evidence[s.hash], hashEvidenceEntry{
|
||||
observerID: s.observerID,
|
||||
rawSkew: round(rawSkew, 1),
|
||||
corrected: round(corrected, 1),
|
||||
offset: round(obsOffset, 1),
|
||||
calibrated: hasCal,
|
||||
observedTS: s.observedTS,
|
||||
})
|
||||
}
|
||||
|
||||
// Each hash represents one advert from one node. Compute median corrected
|
||||
@@ -397,7 +448,7 @@ func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkew
|
||||
LastObservedTS: latestObsTS,
|
||||
}
|
||||
}
|
||||
return result
|
||||
return result, evidence
|
||||
}
|
||||
|
||||
// ── Integration with PacketStore ───────────────────────────────────────────────
|
||||
@@ -558,6 +609,70 @@ func (s *PacketStore) getNodeClockSkewLocked(pubkey string) *NodeClockSkew {
|
||||
samples[i] = SkewSample{Timestamp: p.ts, SkewSec: round(p.skew, 1)}
|
||||
}
|
||||
|
||||
// Build per-hash evidence (most recent 10 hashes with ≥1 observer).
|
||||
// Observer name lookup from store observations.
|
||||
obsNameMap := make(map[string]string)
|
||||
type hashMeta struct {
|
||||
hash string
|
||||
ts int64
|
||||
}
|
||||
var evidenceHashes []hashMeta
|
||||
for _, tx := range txs {
|
||||
if tx.PayloadType == nil || *tx.PayloadType != PayloadADVERT {
|
||||
continue
|
||||
}
|
||||
ev, ok := s.clockSkew.hashEvidence[tx.Hash]
|
||||
if !ok || len(ev) == 0 {
|
||||
continue
|
||||
}
|
||||
// Collect observer names from tx observations.
|
||||
for _, obs := range tx.Observations {
|
||||
if obs.ObserverID != "" && obs.ObserverName != "" {
|
||||
obsNameMap[obs.ObserverID] = obs.ObserverName
|
||||
}
|
||||
}
|
||||
evidenceHashes = append(evidenceHashes, hashMeta{hash: tx.Hash, ts: ev[0].observedTS})
|
||||
}
|
||||
// Sort by timestamp descending, take most recent 10.
|
||||
sort.Slice(evidenceHashes, func(i, j int) bool { return evidenceHashes[i].ts > evidenceHashes[j].ts })
|
||||
if len(evidenceHashes) > 10 {
|
||||
evidenceHashes = evidenceHashes[:10]
|
||||
}
|
||||
var recentEvidence []HashEvidence
|
||||
var calSummary CalibrationSummary
|
||||
for _, eh := range evidenceHashes {
|
||||
entries := s.clockSkew.hashEvidence[eh.hash]
|
||||
var observers []HashEvidenceObserver
|
||||
var corrSkews []float64
|
||||
for _, e := range entries {
|
||||
name := obsNameMap[e.observerID]
|
||||
if name == "" {
|
||||
name = e.observerID
|
||||
}
|
||||
observers = append(observers, HashEvidenceObserver{
|
||||
ObserverID: e.observerID,
|
||||
ObserverName: name,
|
||||
RawSkewSec: e.rawSkew,
|
||||
CorrectedSkewSec: e.corrected,
|
||||
ObserverOffsetSec: e.offset,
|
||||
Calibrated: e.calibrated,
|
||||
})
|
||||
corrSkews = append(corrSkews, e.corrected)
|
||||
calSummary.TotalSamples++
|
||||
if e.calibrated {
|
||||
calSummary.CalibratedSamples++
|
||||
} else {
|
||||
calSummary.UncalibratedSamples++
|
||||
}
|
||||
}
|
||||
recentEvidence = append(recentEvidence, HashEvidence{
|
||||
Hash: eh.hash,
|
||||
Observers: observers,
|
||||
MedianCorrectedSkewSec: round(median(corrSkews), 1),
|
||||
Timestamp: eh.ts,
|
||||
})
|
||||
}
|
||||
|
||||
return &NodeClockSkew{
|
||||
Pubkey: pubkey,
|
||||
MeanSkewSec: round(meanSkew, 1),
|
||||
@@ -574,6 +689,8 @@ func (s *PacketStore) getNodeClockSkewLocked(pubkey string) *NodeClockSkew {
|
||||
GoodFraction: round(goodFraction, 2),
|
||||
RecentBadSampleCount: recentBadCount,
|
||||
RecentSampleCount: recentSampleCount,
|
||||
RecentHashEvidence: recentEvidence,
|
||||
CalibrationSummary: &calSummary,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -601,8 +718,10 @@ func (s *PacketStore) GetFleetClockSkew() []*NodeClockSkew {
|
||||
cs.NodeName = ni.Name
|
||||
cs.NodeRole = ni.Role
|
||||
}
|
||||
// Omit samples in fleet response (too much data).
|
||||
// Omit samples and evidence in fleet response (too much data).
|
||||
cs.Samples = nil
|
||||
cs.RecentHashEvidence = nil
|
||||
cs.CalibrationSummary = nil
|
||||
results = append(results, cs)
|
||||
}
|
||||
return results
|
||||
|
||||
@@ -191,7 +191,7 @@ func TestComputeNodeSkew_BasicCorrection(t *testing.T) {
|
||||
// So the median approach finds obs2 is +5 ahead (relative to median)
|
||||
|
||||
// Now compute node skew with those offsets:
|
||||
nodeSkew := computeNodeSkew(samples, offsets)
|
||||
nodeSkew, _ := computeNodeSkew(samples, offsets)
|
||||
cs, ok := nodeSkew["h1"]
|
||||
if !ok {
|
||||
t.Fatal("expected skew data for hash h1")
|
||||
@@ -220,7 +220,7 @@ func TestComputeNodeSkew_ThreeObservers(t *testing.T) {
|
||||
t.Errorf("obs3 offset = %v, want 30", offsets["obs3"])
|
||||
}
|
||||
|
||||
nodeSkew := computeNodeSkew(samples, offsets)
|
||||
nodeSkew, _ := computeNodeSkew(samples, offsets)
|
||||
cs := nodeSkew["h1"]
|
||||
if cs == nil {
|
||||
t.Fatal("expected skew data for h1")
|
||||
@@ -954,3 +954,104 @@ func TestAllGood_OK_845(t *testing.T) {
|
||||
t.Errorf("recentBadSampleCount = %v, want 0", r.RecentBadSampleCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodeClockSkew_EvidencePayload(t *testing.T) {
|
||||
// 3-observer scenario: obs1 ahead by +2s, obs2 on time, obs3 behind by -1s.
|
||||
// Node clock is 60s ahead. Raw skew = advertTS - obsTS.
|
||||
// Hash has 3 observations, each observer sees same advert.
|
||||
ps := NewPacketStore(nil, nil)
|
||||
|
||||
pt := 4 // ADVERT
|
||||
// Advert timestamp: 1700000060 (node 60s ahead of true time 1700000000)
|
||||
// obs1 sees at 1700000002 (2s ahead of true time) → raw = 60 - 2 = 58
|
||||
// obs2 sees at 1700000000 (on time) → raw = 60 - 0 = 60
|
||||
// obs3 sees at 1699999999 (-1s, behind) → raw = 60 + 1 = 61
|
||||
// Median obsTS = 1700000000, so:
|
||||
// obs1 offset = 1700000002 - 1700000000 = +2
|
||||
// obs2 offset = 0
|
||||
// obs3 offset = 1699999999 - 1700000000 = -1
|
||||
// Corrected: raw + offset → obs1: 58+2=60, obs2: 60+0=60, obs3: 61+(-1)=60
|
||||
|
||||
tx1 := &StoreTx{
|
||||
Hash: "evidence_hash_1",
|
||||
PayloadType: &pt,
|
||||
DecodedJSON: `{"payload":{"timestamp":1700000060}}`,
|
||||
Observations: []*StoreObs{
|
||||
{ObserverID: "obs1", ObserverName: "Observer Alpha", Timestamp: "2023-11-14T22:13:22Z"},
|
||||
{ObserverID: "obs2", ObserverName: "Observer Beta", Timestamp: "2023-11-14T22:13:20Z"},
|
||||
{ObserverID: "obs3", ObserverName: "Observer Gamma", Timestamp: "2023-11-14T22:13:19Z"},
|
||||
},
|
||||
}
|
||||
// Second hash to ensure we get multiple evidence entries.
|
||||
tx2 := &StoreTx{
|
||||
Hash: "evidence_hash_2",
|
||||
PayloadType: &pt,
|
||||
DecodedJSON: `{"payload":{"timestamp":1700003660}}`,
|
||||
Observations: []*StoreObs{
|
||||
{ObserverID: "obs1", ObserverName: "Observer Alpha", Timestamp: "2023-11-14T23:13:22Z"},
|
||||
{ObserverID: "obs2", ObserverName: "Observer Beta", Timestamp: "2023-11-14T23:13:20Z"},
|
||||
{ObserverID: "obs3", ObserverName: "Observer Gamma", Timestamp: "2023-11-14T23:13:19Z"},
|
||||
},
|
||||
}
|
||||
|
||||
ps.mu.Lock()
|
||||
ps.byNode["NODETEST"] = []*StoreTx{tx1, tx2}
|
||||
ps.byPayloadType[4] = []*StoreTx{tx1, tx2}
|
||||
ps.clockSkew.computeInterval = 0
|
||||
ps.mu.Unlock()
|
||||
|
||||
r := ps.GetNodeClockSkew("NODETEST")
|
||||
if r == nil {
|
||||
t.Fatal("expected clock skew result")
|
||||
}
|
||||
|
||||
// Check recentHashEvidence exists.
|
||||
if len(r.RecentHashEvidence) == 0 {
|
||||
t.Fatal("expected recentHashEvidence to be populated")
|
||||
}
|
||||
if len(r.RecentHashEvidence) != 2 {
|
||||
t.Errorf("recentHashEvidence length = %d, want 2", len(r.RecentHashEvidence))
|
||||
}
|
||||
|
||||
// Check first evidence entry has 3 observers.
|
||||
ev := r.RecentHashEvidence[0]
|
||||
if len(ev.Observers) != 3 {
|
||||
t.Fatalf("evidence observers = %d, want 3", len(ev.Observers))
|
||||
}
|
||||
|
||||
// Verify corrected = raw + offset for each observer.
|
||||
for _, o := range ev.Observers {
|
||||
expected := o.RawSkewSec + o.ObserverOffsetSec
|
||||
if math.Abs(o.CorrectedSkewSec-expected) > 0.2 {
|
||||
t.Errorf("observer %s: corrected=%.1f, expected raw(%.1f)+offset(%.1f)=%.1f",
|
||||
o.ObserverID, o.CorrectedSkewSec, o.RawSkewSec, o.ObserverOffsetSec, expected)
|
||||
}
|
||||
}
|
||||
|
||||
// All corrected values should be ~60s (node is 60s ahead).
|
||||
if math.Abs(ev.MedianCorrectedSkewSec-60) > 1 {
|
||||
t.Errorf("median corrected = %.1f, want ~60", ev.MedianCorrectedSkewSec)
|
||||
}
|
||||
|
||||
// Check calibration summary.
|
||||
if r.CalibrationSummary == nil {
|
||||
t.Fatal("expected calibrationSummary")
|
||||
}
|
||||
if r.CalibrationSummary.TotalSamples != 6 { // 3 observers × 2 hashes
|
||||
t.Errorf("calibration total = %d, want 6", r.CalibrationSummary.TotalSamples)
|
||||
}
|
||||
if r.CalibrationSummary.CalibratedSamples != 6 {
|
||||
t.Errorf("calibrated = %d, want 6 (all multi-observer)", r.CalibrationSummary.CalibratedSamples)
|
||||
}
|
||||
|
||||
// Check observer names are populated.
|
||||
nameFound := false
|
||||
for _, o := range ev.Observers {
|
||||
if o.ObserverName == "Observer Alpha" || o.ObserverName == "Observer Beta" {
|
||||
nameFound = true
|
||||
}
|
||||
}
|
||||
if !nameFound {
|
||||
t.Error("expected observer names to be populated from tx observations")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,6 +72,15 @@ type Config struct {
|
||||
|
||||
DebugAffinity bool `json:"debugAffinity,omitempty"`
|
||||
|
||||
// ObserverBlacklist is a list of observer public keys to exclude from API
|
||||
// responses (defense in depth — ingestor drops at ingest, server filters
|
||||
// any that slipped through from a prior unblocked window).
|
||||
ObserverBlacklist []string `json:"observerBlacklist,omitempty"`
|
||||
|
||||
// obsBlacklistSetCached is the lazily-built set version of ObserverBlacklist.
|
||||
obsBlacklistSetCached map[string]bool
|
||||
obsBlacklistOnce sync.Once
|
||||
|
||||
ResolvedPath *ResolvedPathConfig `json:"resolvedPath,omitempty"`
|
||||
NeighborGraph *NeighborGraphConfig `json:"neighborGraph,omitempty"`
|
||||
}
|
||||
@@ -404,3 +413,29 @@ func (c *Config) IsBlacklisted(pubkey string) bool {
|
||||
}
|
||||
return c.blacklistSet()[strings.ToLower(strings.TrimSpace(pubkey))]
|
||||
}
|
||||
|
||||
// obsBlacklistSet lazily builds and caches the observerBlacklist as a set for O(1) lookups.
|
||||
func (c *Config) obsBlacklistSet() map[string]bool {
|
||||
c.obsBlacklistOnce.Do(func() {
|
||||
if len(c.ObserverBlacklist) == 0 {
|
||||
return
|
||||
}
|
||||
m := make(map[string]bool, len(c.ObserverBlacklist))
|
||||
for _, pk := range c.ObserverBlacklist {
|
||||
trimmed := strings.ToLower(strings.TrimSpace(pk))
|
||||
if trimmed != "" {
|
||||
m[trimmed] = true
|
||||
}
|
||||
}
|
||||
c.obsBlacklistSetCached = m
|
||||
})
|
||||
return c.obsBlacklistSetCached
|
||||
}
|
||||
|
||||
// IsObserverBlacklisted returns true if the given observer ID is in the observerBlacklist.
|
||||
func (c *Config) IsObserverBlacklisted(id string) bool {
|
||||
if c == nil || len(c.ObserverBlacklist) == 0 {
|
||||
return false
|
||||
}
|
||||
return c.obsBlacklistSet()[strings.ToLower(strings.TrimSpace(id))]
|
||||
}
|
||||
|
||||
@@ -35,7 +35,8 @@ func setupTestDBv2(t *testing.T) *DB {
|
||||
CREATE TABLE observers (
|
||||
id TEXT PRIMARY KEY, name TEXT, iata TEXT, last_seen TEXT, first_seen TEXT,
|
||||
packet_count INTEGER DEFAULT 0, model TEXT, firmware TEXT,
|
||||
client_version TEXT, radio TEXT, battery_mv INTEGER, uptime_secs INTEGER, noise_floor REAL
|
||||
client_version TEXT, radio TEXT, battery_mv INTEGER, uptime_secs INTEGER, noise_floor REAL,
|
||||
inactive INTEGER DEFAULT 0
|
||||
);
|
||||
CREATE TABLE transmissions (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT, raw_hex TEXT NOT NULL,
|
||||
|
||||
+3
-3
@@ -231,7 +231,7 @@ func (db *DB) GetStats() (*Stats, error) {
|
||||
sevenDaysAgo := time.Now().Add(-7 * 24 * time.Hour).Format(time.RFC3339)
|
||||
db.conn.QueryRow("SELECT COUNT(*) FROM nodes WHERE last_seen > ?", sevenDaysAgo).Scan(&s.TotalNodes)
|
||||
db.conn.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&s.TotalNodesAllTime)
|
||||
db.conn.QueryRow("SELECT COUNT(*) FROM observers").Scan(&s.TotalObservers)
|
||||
db.conn.QueryRow("SELECT COUNT(*) FROM observers WHERE inactive IS NULL OR inactive = 0").Scan(&s.TotalObservers)
|
||||
|
||||
oneHourAgo := time.Now().Add(-1 * time.Hour).Unix()
|
||||
db.conn.QueryRow("SELECT COUNT(*) FROM observations WHERE timestamp > ?", oneHourAgo).Scan(&s.PacketsLastHour)
|
||||
@@ -970,9 +970,9 @@ func (db *DB) getObservationsForTransmissions(txIDs []int) map[int][]map[string]
|
||||
return result
|
||||
}
|
||||
|
||||
// GetObservers returns all observers sorted by last_seen DESC.
|
||||
// GetObservers returns active observers (not soft-deleted) sorted by last_seen DESC.
|
||||
func (db *DB) GetObservers() ([]Observer, error) {
|
||||
rows, err := db.conn.Query("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers ORDER BY last_seen DESC")
|
||||
rows, err := db.conn.Query("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers WHERE inactive IS NULL OR inactive = 0 ORDER BY last_seen DESC")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
+27
-1
@@ -48,7 +48,8 @@ func setupTestDB(t *testing.T) *DB {
|
||||
radio TEXT,
|
||||
battery_mv INTEGER,
|
||||
uptime_secs INTEGER,
|
||||
noise_floor REAL
|
||||
noise_floor REAL,
|
||||
inactive INTEGER DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE TABLE transmissions (
|
||||
@@ -357,6 +358,31 @@ func TestGetObservers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Regression: GetObservers must exclude soft-deleted (inactive=1) rows.
|
||||
// Stale observers were appearing in /api/observers despite the auto-prune
|
||||
// marking them inactive, because the SELECT query had no WHERE filter.
|
||||
func TestGetObservers_ExcludesInactive(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
// Mark obs2 inactive — soft delete simulating a stale-observer prune.
|
||||
if _, err := db.conn.Exec(`UPDATE observers SET inactive = 1 WHERE id = ?`, "obs2"); err != nil {
|
||||
t.Fatalf("update inactive: %v", err)
|
||||
}
|
||||
observers, err := db.GetObservers()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(observers) != 1 {
|
||||
t.Errorf("expected 1 observer (obs1) after marking obs2 inactive, got %d", len(observers))
|
||||
}
|
||||
for _, o := range observers {
|
||||
if o.ID == "obs2" {
|
||||
t.Errorf("inactive observer obs2 should be excluded")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetObserverByID(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// readiness tracks whether background init goroutines have completed.
|
||||
// Set to 1 once store.Load, pickBestObservation, and neighbor graph build are done.
|
||||
var readiness atomic.Int32
|
||||
|
||||
// handleHealthz returns 200 when the server is ready to serve queries,
|
||||
// or 503 while background initialization is still running.
|
||||
func (s *Server) handleHealthz(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
if readiness.Load() == 0 {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"ready": false,
|
||||
"reason": "loading",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
var loadedTx, loadedObs int
|
||||
if s.store != nil {
|
||||
s.store.mu.RLock()
|
||||
loadedTx = len(s.store.packets)
|
||||
for _, p := range s.store.packets {
|
||||
loadedObs += len(p.Observations)
|
||||
}
|
||||
s.store.mu.RUnlock()
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"ready": true,
|
||||
"loadedTx": loadedTx,
|
||||
"loadedObs": loadedObs,
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestHealthzNotReady(t *testing.T) {
|
||||
// Ensure readiness is 0 (not ready)
|
||||
readiness.Store(0)
|
||||
defer readiness.Store(0)
|
||||
|
||||
srv := &Server{store: &PacketStore{}}
|
||||
req := httptest.NewRequest("GET", "/api/healthz", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
srv.handleHealthz(w, req)
|
||||
|
||||
if w.Code != http.StatusServiceUnavailable {
|
||||
t.Fatalf("expected 503, got %d", w.Code)
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("invalid JSON: %v", err)
|
||||
}
|
||||
if resp["ready"] != false {
|
||||
t.Fatalf("expected ready=false, got %v", resp["ready"])
|
||||
}
|
||||
if resp["reason"] != "loading" {
|
||||
t.Fatalf("expected reason=loading, got %v", resp["reason"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthzReady(t *testing.T) {
|
||||
readiness.Store(1)
|
||||
defer readiness.Store(0)
|
||||
|
||||
srv := &Server{store: &PacketStore{}}
|
||||
req := httptest.NewRequest("GET", "/api/healthz", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
srv.handleHealthz(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d", w.Code)
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("invalid JSON: %v", err)
|
||||
}
|
||||
if resp["ready"] != true {
|
||||
t.Fatalf("expected ready=true, got %v", resp["ready"])
|
||||
}
|
||||
if _, ok := resp["loadedTx"]; !ok {
|
||||
t.Fatal("missing loadedTx field")
|
||||
}
|
||||
if _, ok := resp["loadedObs"]; !ok {
|
||||
t.Fatal("missing loadedObs field")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthzAntiTautology(t *testing.T) {
|
||||
// When readiness is 0, must NOT return 200
|
||||
readiness.Store(0)
|
||||
defer readiness.Store(0)
|
||||
|
||||
srv := &Server{store: &PacketStore{}}
|
||||
req := httptest.NewRequest("GET", "/api/healthz", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
srv.handleHealthz(w, req)
|
||||
|
||||
if w.Code == http.StatusOK {
|
||||
t.Fatal("anti-tautology: handler returned 200 when readiness=0; gating is broken")
|
||||
}
|
||||
}
|
||||
@@ -174,6 +174,21 @@ func main() {
|
||||
database.hasResolvedPath = true // detectSchema ran before column was added; fix the flag
|
||||
}
|
||||
|
||||
// Ensure observers.inactive column exists (PR #954 filters on it; ingestor migration
|
||||
// adds it but server may run against DBs ingestor never touched, e.g. e2e fixture).
|
||||
if err := ensureObserverInactiveColumn(dbPath); err != nil {
|
||||
log.Printf("[store] warning: could not add observers.inactive column: %v", err)
|
||||
}
|
||||
|
||||
// Soft-delete observers that are in the blacklist (mark inactive=1) so
|
||||
// historical data from a prior unblocked window is hidden too.
|
||||
if len(cfg.ObserverBlacklist) > 0 {
|
||||
softDeleteBlacklistedObservers(dbPath, cfg.ObserverBlacklist)
|
||||
}
|
||||
|
||||
// WaitGroup for background init steps that gate /api/healthz readiness.
|
||||
var initWg sync.WaitGroup
|
||||
|
||||
// Load or build neighbor graph
|
||||
if neighborEdgesTableExists(database.conn) {
|
||||
store.graph = loadNeighborEdgesFromDB(database.conn)
|
||||
@@ -181,7 +196,9 @@ func main() {
|
||||
} else {
|
||||
log.Printf("[neighbor] no persisted edges found, will build in background...")
|
||||
store.graph = NewNeighborGraph() // empty graph — gets populated by background goroutine
|
||||
initWg.Add(1)
|
||||
go func() {
|
||||
defer initWg.Done()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("[neighbor] graph build panic recovered: %v", r)
|
||||
@@ -205,7 +222,9 @@ func main() {
|
||||
// API serves best-effort data until this completes (~10s for 100K txs).
|
||||
// Processes in chunks of 5000, releasing the lock between chunks so API
|
||||
// handlers remain responsive.
|
||||
initWg.Add(1)
|
||||
go func() {
|
||||
defer initWg.Done()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("[store] pickBestObservation panic recovered: %v", r)
|
||||
@@ -233,6 +252,13 @@ func main() {
|
||||
log.Printf("[store] initial pickBestObservation complete (%d transmissions)", totalPackets)
|
||||
}()
|
||||
|
||||
// Mark server ready once all background init completes.
|
||||
go func() {
|
||||
initWg.Wait()
|
||||
readiness.Store(1)
|
||||
log.Printf("[server] readiness: ready=true (background init complete)")
|
||||
}()
|
||||
|
||||
// WebSocket hub
|
||||
hub := NewHub()
|
||||
|
||||
|
||||
@@ -281,6 +281,80 @@ func ensureResolvedPathColumn(dbPath string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ensureObserverInactiveColumn adds the inactive column to observers if missing.
|
||||
// The column was originally added by ingestor migration (cmd/ingestor/db.go:344) to
|
||||
// support soft-delete via RemoveStaleObservers + filtered reads (PR #954). When the
|
||||
// server starts against a DB that was never touched by the ingestor (e.g. the e2e
|
||||
// fixture), the column is missing and read queries that filter on it (GetObservers,
|
||||
// GetStats) silently fail with "no such column: inactive" — leaving /api/observers
|
||||
// returning empty.
|
||||
func ensureObserverInactiveColumn(dbPath string) error {
|
||||
rw, err := openRW(dbPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rw.Close()
|
||||
|
||||
rows, err := rw.Query("PRAGMA table_info(observers)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var cid int
|
||||
var colName string
|
||||
var colType sql.NullString
|
||||
var notNull, pk int
|
||||
var dflt sql.NullString
|
||||
if rows.Scan(&cid, &colName, &colType, ¬Null, &dflt, &pk) == nil && colName == "inactive" {
|
||||
return nil // already exists
|
||||
}
|
||||
}
|
||||
|
||||
_, err = rw.Exec("ALTER TABLE observers ADD COLUMN inactive INTEGER DEFAULT 0")
|
||||
if err != nil {
|
||||
return fmt.Errorf("add inactive column: %w", err)
|
||||
}
|
||||
log.Println("[store] Added inactive column to observers")
|
||||
return nil
|
||||
}
|
||||
|
||||
// softDeleteBlacklistedObservers marks observers matching the blacklist as
|
||||
// inactive=1 so they are hidden from API responses. Runs once at startup.
|
||||
func softDeleteBlacklistedObservers(dbPath string, blacklist []string) {
|
||||
rw, err := openRW(dbPath)
|
||||
if err != nil {
|
||||
log.Printf("[observer-blacklist] warning: could not open DB for soft-delete: %v", err)
|
||||
return
|
||||
}
|
||||
defer rw.Close()
|
||||
|
||||
placeholders := make([]string, 0, len(blacklist))
|
||||
args := make([]interface{}, 0, len(blacklist))
|
||||
for _, pk := range blacklist {
|
||||
trimmed := strings.TrimSpace(pk)
|
||||
if trimmed == "" {
|
||||
continue
|
||||
}
|
||||
placeholders = append(placeholders, "LOWER(?)")
|
||||
args = append(args, trimmed)
|
||||
}
|
||||
if len(placeholders) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
query := "UPDATE observers SET inactive = 1 WHERE LOWER(id) IN (" + strings.Join(placeholders, ",") + ") AND (inactive IS NULL OR inactive = 0)"
|
||||
result, err := rw.Exec(query, args...)
|
||||
if err != nil {
|
||||
log.Printf("[observer-blacklist] warning: soft-delete failed: %v", err)
|
||||
return
|
||||
}
|
||||
if n, _ := result.RowsAffected(); n > 0 {
|
||||
log.Printf("[observer-blacklist] soft-deleted %d blacklisted observer(s)", n)
|
||||
}
|
||||
}
|
||||
|
||||
// resolvePathForObs resolves hop prefixes to full pubkeys for an observation.
|
||||
// Returns nil if path is empty.
|
||||
func resolvePathForObs(pathJSON, observerID string, tx *StoreTx, pm *prefixMap, graph *NeighborGraph) []*string {
|
||||
|
||||
@@ -0,0 +1,159 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestConfigIsObserverBlacklisted(t *testing.T) {
|
||||
cfg := &Config{
|
||||
ObserverBlacklist: []string{"OBS1", "obs2", " Obs3 "},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
id string
|
||||
want bool
|
||||
}{
|
||||
{"OBS1", true},
|
||||
{"obs1", true}, // case-insensitive
|
||||
{"OBS2", true},
|
||||
{"Obs3", true}, // whitespace trimmed
|
||||
{"obs4", false},
|
||||
{"", false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
got := cfg.IsObserverBlacklisted(tt.id)
|
||||
if got != tt.want {
|
||||
t.Errorf("IsObserverBlacklisted(%q) = %v, want %v", tt.id, got, tt.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigIsObserverBlacklistedEmpty(t *testing.T) {
|
||||
cfg := &Config{}
|
||||
if cfg.IsObserverBlacklisted("anything") {
|
||||
t.Error("empty blacklist should not match anything")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigIsObserverBlacklistedNil(t *testing.T) {
|
||||
var cfg *Config
|
||||
if cfg.IsObserverBlacklisted("anything") {
|
||||
t.Error("nil config should not match anything")
|
||||
}
|
||||
}
|
||||
|
||||
func TestObserverBlacklistFiltersHandleObservers(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
db.conn.Exec("INSERT OR IGNORE INTO observers (id, name, iata, last_seen) VALUES ('goodobs', 'GoodObs', 'SFO', datetime('now'))")
|
||||
db.conn.Exec("INSERT OR IGNORE INTO observers (id, name, iata, last_seen) VALUES ('badobs', 'BadObs', 'LAX', datetime('now'))")
|
||||
|
||||
cfg := &Config{
|
||||
ObserverBlacklist: []string{"badobs"},
|
||||
}
|
||||
srv := NewServer(db, cfg, NewHub())
|
||||
srv.RegisterRoutes(setupTestRouter(srv))
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/observers", nil)
|
||||
w := httptest.NewRecorder()
|
||||
srv.router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp ObserverListResponse
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to parse response: %v", err)
|
||||
}
|
||||
|
||||
for _, obs := range resp.Observers {
|
||||
if obs.ID == "badobs" {
|
||||
t.Error("blacklisted observer should not appear in observers list")
|
||||
}
|
||||
}
|
||||
|
||||
foundGood := false
|
||||
for _, obs := range resp.Observers {
|
||||
if obs.ID == "goodobs" {
|
||||
foundGood = true
|
||||
}
|
||||
}
|
||||
if !foundGood {
|
||||
t.Error("non-blacklisted observer should appear in observers list")
|
||||
}
|
||||
}
|
||||
|
||||
func TestObserverBlacklistFiltersObserverDetail(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
db.conn.Exec("INSERT OR IGNORE INTO observers (id, name, iata, last_seen) VALUES ('badobs', 'BadObs', 'LAX', datetime('now'))")
|
||||
|
||||
cfg := &Config{
|
||||
ObserverBlacklist: []string{"badobs"},
|
||||
}
|
||||
srv := NewServer(db, cfg, NewHub())
|
||||
srv.RegisterRoutes(setupTestRouter(srv))
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/observers/badobs", nil)
|
||||
w := httptest.NewRecorder()
|
||||
srv.router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404 for blacklisted observer detail, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNoObserverBlacklistPassesAll(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
db.conn.Exec("INSERT OR IGNORE INTO observers (id, name, iata, last_seen) VALUES ('someobs', 'SomeObs', 'SFO', datetime('now'))")
|
||||
|
||||
cfg := &Config{}
|
||||
srv := NewServer(db, cfg, NewHub())
|
||||
srv.RegisterRoutes(setupTestRouter(srv))
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/observers", nil)
|
||||
w := httptest.NewRecorder()
|
||||
srv.router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d", w.Code)
|
||||
}
|
||||
|
||||
var resp ObserverListResponse
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to parse response: %v", err)
|
||||
}
|
||||
|
||||
foundSome := false
|
||||
for _, obs := range resp.Observers {
|
||||
if obs.ID == "someobs" {
|
||||
foundSome = true
|
||||
}
|
||||
}
|
||||
if !foundSome {
|
||||
t.Error("without blacklist, observer should appear")
|
||||
}
|
||||
}
|
||||
|
||||
func TestObserverBlacklistConcurrent(t *testing.T) {
|
||||
cfg := &Config{
|
||||
ObserverBlacklist: []string{"AA", "BB", "CC"},
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
for i := 0; i < 50; i++ {
|
||||
go func() {
|
||||
defer func() { done <- struct{}{} }()
|
||||
for j := 0; j < 100; j++ {
|
||||
cfg.IsObserverBlacklisted("AA")
|
||||
cfg.IsObserverBlacklisted("DD")
|
||||
}
|
||||
}()
|
||||
}
|
||||
for i := 0; i < 50; i++ {
|
||||
<-done
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
// TestHandleNodePaths_PrefixCollisionExclusion verifies that paths through a node
|
||||
// sharing a 2-char prefix with another node are not returned as false positives
|
||||
// when they have no resolved_path data (issue #929).
|
||||
//
|
||||
// Setup:
|
||||
// - nodeA (target): pubkey starts with "7a", no GPS
|
||||
// - nodeB (other): pubkey starts with "7a", has GPS → "7a" resolves to nodeB
|
||||
// - tx1: path ["7a"], resolved_path NULL → false positive candidate, must be excluded
|
||||
// - tx2: path ["7a"], resolved_path contains nodeA pubkey → SQL-confirmed, must be included
|
||||
func TestHandleNodePaths_PrefixCollisionExclusion(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
recent := time.Now().Add(-1 * time.Hour).Format(time.RFC3339)
|
||||
recentEpoch := time.Now().Add(-1 * time.Hour).Unix()
|
||||
|
||||
nodeAPK := "7acb1111aaaabbbb"
|
||||
nodeBPK := "7aff2222ccccdddd" // same "7a" prefix, has GPS so resolveHop("7a") picks B
|
||||
|
||||
db.conn.Exec(`INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen, advert_count)
|
||||
VALUES (?, 'NodeA', 'repeater', 0, 0, ?, '2026-01-01', 1)`, nodeAPK, recent)
|
||||
db.conn.Exec(`INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen, advert_count)
|
||||
VALUES (?, 'NodeB', 'repeater', 37.5, -122.0, ?, '2026-01-01', 1)`, nodeBPK, recent)
|
||||
|
||||
// tx1: no resolved_path — should be excluded by hop-level check
|
||||
db.conn.Exec(`INSERT INTO transmissions (id, raw_hex, hash, first_seen) VALUES (10, 'AA', 'hash_fp', ?)`, recent)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, path_json, timestamp, resolved_path)
|
||||
VALUES (10, NULL, '["7a"]', ?, NULL)`, recentEpoch)
|
||||
|
||||
// tx2: resolved_path confirms nodeA — must be included
|
||||
db.conn.Exec(`INSERT INTO transmissions (id, raw_hex, hash, first_seen) VALUES (11, 'BB', 'hash_tp', ?)`, recent)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, path_json, timestamp, resolved_path)
|
||||
VALUES (11, NULL, '["7a"]', ?, ?)`, recentEpoch, `["`+nodeAPK+`"]`)
|
||||
|
||||
cfg := &Config{Port: 3000}
|
||||
hub := NewHub()
|
||||
srv := NewServer(db, cfg, hub)
|
||||
store := NewPacketStore(db, nil)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store.Load: %v", err)
|
||||
}
|
||||
srv.store = store
|
||||
router := mux.NewRouter()
|
||||
srv.RegisterRoutes(router)
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/nodes/"+nodeAPK+"/paths", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp NodePathsResponse
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("unmarshal: %v", err)
|
||||
}
|
||||
|
||||
// Only the SQL-confirmed path (tx2) should be present; tx1 (false positive) must be excluded.
|
||||
// tx1 and tx2 share the same raw path ["7a"] so they collapse into 1 unique path group.
|
||||
// If tx1 were included, TotalTransmissions would be 2.
|
||||
if resp.TotalPaths != 1 {
|
||||
t.Errorf("expected 1 path group, got %d", resp.TotalPaths)
|
||||
}
|
||||
if resp.TotalTransmissions != 1 {
|
||||
t.Errorf("expected 1 transmission (false positive tx1 excluded), got %d", resp.TotalTransmissions)
|
||||
}
|
||||
}
|
||||
+43
-6
@@ -118,6 +118,9 @@ func (s *Server) RegisterRoutes(r *mux.Router) {
|
||||
r.HandleFunc("/api/config/map", s.handleConfigMap).Methods("GET")
|
||||
r.HandleFunc("/api/config/geo-filter", s.handleConfigGeoFilter).Methods("GET")
|
||||
|
||||
// Readiness endpoint (gated on background init completion)
|
||||
r.HandleFunc("/api/healthz", s.handleHealthz).Methods("GET")
|
||||
|
||||
// System endpoints
|
||||
r.HandleFunc("/api/health", s.handleHealth).Methods("GET")
|
||||
r.HandleFunc("/api/stats", s.handleStats).Methods("GET")
|
||||
@@ -1258,25 +1261,31 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
|
||||
_, pm := s.store.getCachedNodesAndPM()
|
||||
|
||||
// Collect candidate transmissions from the index, deduplicating by tx ID.
|
||||
// confirmedByFullKey tracks TXs found via the full-pubkey index key — these are
|
||||
// already resolved_path-confirmed and bypass the hop-level check below.
|
||||
confirmedByFullKey := make(map[int]bool)
|
||||
seen := make(map[int]bool)
|
||||
var candidates []*StoreTx
|
||||
addCandidates := func(key string) {
|
||||
addCandidates := func(key string, confirmed bool) {
|
||||
for _, tx := range s.store.byPathHop[key] {
|
||||
if !seen[tx.ID] {
|
||||
seen[tx.ID] = true
|
||||
if confirmed {
|
||||
confirmedByFullKey[tx.ID] = true
|
||||
}
|
||||
candidates = append(candidates, tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
addCandidates(lowerPK) // full pubkey match (from resolved_path)
|
||||
addCandidates(prefix1) // 2-char raw hop match
|
||||
addCandidates(prefix2) // 4-char raw hop match
|
||||
addCandidates(lowerPK, true) // full pubkey match (from resolved_path) → confirmed
|
||||
addCandidates(prefix1, false) // 2-char raw hop match
|
||||
addCandidates(prefix2, false) // 4-char raw hop match
|
||||
// Also check any raw hops that start with prefix2 (longer prefixes).
|
||||
// Raw hops are typically 2 chars, so iterate only keys with HasPrefix
|
||||
// on the small set of index keys rather than all packets.
|
||||
for key := range s.store.byPathHop {
|
||||
if len(key) > 4 && len(key) < len(lowerPK) && strings.HasPrefix(key, prefix2) {
|
||||
addCandidates(key)
|
||||
addCandidates(key, false)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1313,6 +1322,7 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
|
||||
s.store.mu.RUnlock()
|
||||
|
||||
// Now run SQL checks outside the lock for candidates that need confirmation.
|
||||
confirmedBySQL := make(map[int]bool)
|
||||
filtered := candidates[:0]
|
||||
for _, cc := range checks {
|
||||
if cc.inIndex {
|
||||
@@ -1320,6 +1330,7 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
|
||||
} else if cc.hasReverse {
|
||||
if s.store.confirmResolvedPathContains(cc.tx.ID, lowerPK) {
|
||||
filtered = append(filtered, cc.tx)
|
||||
confirmedBySQL[cc.tx.ID] = true
|
||||
}
|
||||
}
|
||||
// else: not in index → exclude
|
||||
@@ -1347,10 +1358,14 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
|
||||
return r
|
||||
}
|
||||
for _, tx := range candidates {
|
||||
totalTransmissions++
|
||||
hops := txGetParsedPath(tx)
|
||||
resolvedHops := make([]PathHopResp, len(hops))
|
||||
sigParts := make([]string, len(hops))
|
||||
// For candidates not confirmed via full-pubkey index or SQL, verify that at
|
||||
// least one hop actually resolves to the target. This catches prefix collisions
|
||||
// (e.g. two nodes sharing a "7a" 1-byte prefix) that slipped through the
|
||||
// conservative resolved_path fallback.
|
||||
containsTarget := confirmedByFullKey[tx.ID] || confirmedBySQL[tx.ID]
|
||||
for i, hop := range hops {
|
||||
resolved := resolveHop(hop)
|
||||
entry := PathHopResp{Prefix: hop, Name: hop}
|
||||
@@ -1362,11 +1377,22 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
|
||||
entry.Lon = resolved.Lon
|
||||
}
|
||||
sigParts[i] = resolved.PublicKey
|
||||
if strings.ToLower(resolved.PublicKey) == lowerPK {
|
||||
containsTarget = true
|
||||
}
|
||||
} else {
|
||||
sigParts[i] = hop
|
||||
// Unresolvable hop: keep conservative if prefix could be the target.
|
||||
if strings.HasPrefix(lowerPK, strings.ToLower(hop)) {
|
||||
containsTarget = true
|
||||
}
|
||||
}
|
||||
resolvedHops[i] = entry
|
||||
}
|
||||
if !containsTarget {
|
||||
continue
|
||||
}
|
||||
totalTransmissions++
|
||||
|
||||
sig := strings.Join(sigParts, "→")
|
||||
agg := pathGroups[sig]
|
||||
@@ -1929,6 +1955,10 @@ func (s *Server) handleObservers(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
result := make([]ObserverResp, 0, len(observers))
|
||||
for _, o := range observers {
|
||||
// Defense in depth: skip observers that are in the blacklist
|
||||
if s.cfg != nil && s.cfg.IsObserverBlacklisted(o.ID) {
|
||||
continue
|
||||
}
|
||||
plh := 0
|
||||
if c, ok := pktCounts[o.ID]; ok {
|
||||
plh = c
|
||||
@@ -1960,6 +1990,13 @@ func (s *Server) handleObservers(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
func (s *Server) handleObserverDetail(w http.ResponseWriter, r *http.Request) {
|
||||
id := mux.Vars(r)["id"]
|
||||
|
||||
// Defense in depth: reject blacklisted observer
|
||||
if s.cfg != nil && s.cfg.IsObserverBlacklisted(id) {
|
||||
writeError(w, 404, "Observer not found")
|
||||
return
|
||||
}
|
||||
|
||||
obs, err := s.db.GetObserverByID(id)
|
||||
if err != nil || obs == nil {
|
||||
writeError(w, 404, "Observer not found")
|
||||
|
||||
+5
-4
@@ -4,7 +4,7 @@
|
||||
// --- Route/Payload name maps ---
|
||||
const ROUTE_TYPES = { 0: 'TRANSPORT_FLOOD', 1: 'FLOOD', 2: 'DIRECT', 3: 'TRANSPORT_DIRECT' };
|
||||
const PAYLOAD_TYPES = { 0: 'Request', 1: 'Response', 2: 'Direct Msg', 3: 'ACK', 4: 'Advert', 5: 'Channel Msg', 6: 'Group Data', 7: 'Anon Req', 8: 'Path', 9: 'Trace', 10: 'Multipart', 11: 'Control', 15: 'Raw Custom' };
|
||||
const PAYLOAD_COLORS = { 0: 'req', 1: 'response', 2: 'txt-msg', 3: 'ack', 4: 'advert', 5: 'grp-txt', 7: 'anon-req', 8: 'path', 9: 'trace' };
|
||||
const PAYLOAD_COLORS = { 0: 'req', 1: 'response', 2: 'txt-msg', 3: 'ack', 4: 'advert', 5: 'grp-txt', 6: 'grp-data', 7: 'anon-req', 8: 'path', 9: 'trace' };
|
||||
|
||||
function routeTypeName(n) { return ROUTE_TYPES[n] || 'UNKNOWN'; }
|
||||
function payloadTypeName(n) { return PAYLOAD_TYPES[n] || 'UNKNOWN'; }
|
||||
@@ -965,10 +965,11 @@ window.addEventListener('DOMContentLoaded', () => {
|
||||
}).catch(() => {
|
||||
window.SITE_CONFIG = { timestamps: { defaultMode: 'ago', timezone: 'local', formatPreset: 'iso', customFormat: '', allowCustomFormat: false } };
|
||||
if (window._customizerV2) window._customizerV2.init(window.SITE_CONFIG);
|
||||
}).finally(() => {
|
||||
if (!location.hash || location.hash === '#/') location.hash = '#/home';
|
||||
else navigate();
|
||||
});
|
||||
|
||||
// Navigate immediately — don't gate data-fetching pages on cosmetic theme fetch
|
||||
if (!location.hash || location.hash === '#/') location.hash = '#/home';
|
||||
else navigate();
|
||||
});
|
||||
|
||||
/**
|
||||
|
||||
@@ -595,6 +595,11 @@
|
||||
// Only fitBounds on subsequent data refreshes if user hasn't manually panned
|
||||
} catch (e) {
|
||||
console.error('Map load error:', e);
|
||||
} finally {
|
||||
// Always signal data-loaded — even on error — so E2E tests can proceed.
|
||||
// Otherwise an api() failure leaves the test waiting forever.
|
||||
var mapContainer = document.getElementById('leaflet-map');
|
||||
if (mapContainer) mapContainer.setAttribute('data-loaded', 'true');
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+41
-1
@@ -815,6 +815,41 @@
|
||||
* Shared between the full-screen detail page and the side panel (#813, #690).
|
||||
* No-op if the container is missing, the API errors, or the response lacks severity.
|
||||
*/
|
||||
/** Build collapsible evidence panel for node clock skew card */
function buildEvidencePanel(cs) {
  var hashes = cs.recentHashEvidence;
  if (!hashes || hashes.length === 0) return '';

  // Optional calibration summary line.
  var summary = cs.calibrationSummary || {};
  var calLine = '';
  if (summary.totalSamples) {
    calLine = '<div style="font-size:11px;color:var(--text-muted);margin-bottom:6px">Last ' + summary.totalSamples + ' samples: ' + (summary.calibratedSamples || 0) + ' corrected via observer calibration, ' + (summary.uncalibratedSamples || 0) + ' uncorrected (single-observer).</div>';
  }

  // Explain why this severity was assigned.
  var medianSkew = window.currentSkewValue(cs);
  var nSamples = (cs.samples || []).length;
  var severityLabel = SKEW_SEVERITY_LABELS[cs.severity] || cs.severity;
  var reasonLine = '<div style="font-size:12px;margin-bottom:8px"><strong>Recent ' + nSamples + ' adverts median ' + formatSkew(medianSkew) + ' → ' + severityLabel + '</strong></div>';

  // One header per hash, followed by one line per observer of that hash.
  var hashBlocks = '';
  hashes.forEach(function(ev) {
    var shortHash = (ev.hash || '').substring(0, 8) + '…';
    var obsCount = ev.observers ? ev.observers.length : 0;
    hashBlocks += '<div style="font-weight:600;font-size:12px;margin-top:6px">Hash ' + shortHash + ' · ' + obsCount + ' observer' + (obsCount !== 1 ? 's' : '') + ' · median corrected: ' + formatSkew(ev.medianCorrectedSkewSec) + '</div>';
    (ev.observers || []).forEach(function(o) {
      var name = o.observerName || o.observerID;
      hashBlocks += '<div style="font-size:11px;padding-left:16px;font-family:var(--mono)">' +
        name + ' raw=' + formatSkew(o.rawSkewSec) + ' corrected=' + formatSkew(o.correctedSkewSec) + ' (observer offset ' + formatSkew(o.observerOffsetSec) + ')' +
        '</div>';
    });
  });

  return '<details style="margin-top:10px"><summary style="cursor:pointer;font-size:12px;color:var(--text-muted)">Evidence (' + hashes.length + ' hashes)</summary>' +
    '<div style="margin-top:6px;padding:8px;background:var(--bg-secondary);border-radius:6px">' +
    reasonLine + calLine + hashBlocks +
    '</div></details>';
}
|
||||
|
||||
async function loadClockSkewInto(container, pubkey) {
|
||||
if (!container) return;
|
||||
try {
|
||||
@@ -841,7 +876,8 @@
|
||||
'</div>' +
|
||||
driftHtml +
|
||||
(sparkHtml ? '<div class="skew-sparkline-wrap" style="margin-top:8px">' + sparkHtml + '<div style="font-size:10px;color:var(--text-muted)">Skew over time (' + (cs.samples || []).length + ' samples)</div></div>' : '') +
|
||||
bimodalWarning;
|
||||
bimodalWarning +
|
||||
buildEvidencePanel(cs);
|
||||
} catch (e) {
|
||||
// Non-fatal — section stays hidden
|
||||
}
|
||||
@@ -955,6 +991,10 @@
|
||||
console.error('Failed to load nodes:', e);
|
||||
const tbody = document.getElementById('nodesBody');
|
||||
if (tbody) tbody.innerHTML = '<tr><td colspan="6" class="text-center" style="padding:24px;color:var(--error,#ef4444)"><div role="alert" aria-live="polite">Failed to load nodes. Please try again.</div></td></tr>';
|
||||
} finally {
|
||||
// Always signal data-loaded — even on error — so E2E tests can proceed.
|
||||
var nodesContainer = document.getElementById('nodesLeft') || document.getElementById('nodesBody');
|
||||
if (nodesContainer) nodesContainer.setAttribute('data-loaded', 'true');
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -70,18 +70,24 @@
|
||||
try {
|
||||
destroyCharts();
|
||||
chartDefaults();
|
||||
const [obs, analytics] = await Promise.all([
|
||||
const [obs, analytics, obsSkewArr] = await Promise.all([
|
||||
api('/observers/' + encodeURIComponent(currentId)),
|
||||
api('/observers/' + encodeURIComponent(currentId) + '/analytics?days=' + currentDays),
|
||||
api('/observers/clock-skew', { ttl: 30000 }).catch(function() { return []; }),
|
||||
]);
|
||||
renderDetail(obs, analytics);
|
||||
// Find this observer's calibration data.
|
||||
var obsSkew = null;
|
||||
(Array.isArray(obsSkewArr) ? obsSkewArr : []).forEach(function(s) {
|
||||
if (s && s.observerID === currentId) obsSkew = s;
|
||||
});
|
||||
renderDetail(obs, analytics, obsSkew);
|
||||
} catch (e) {
|
||||
document.getElementById('obsDetailContent').innerHTML =
|
||||
'<div class="text-muted" style="padding:40px">Error: ' + e.message + '</div>';
|
||||
}
|
||||
}
|
||||
|
||||
function renderDetail(obs, analytics) {
|
||||
function renderDetail(obs, analytics, obsSkew) {
|
||||
const el = document.getElementById('obsDetailContent');
|
||||
if (!el) return;
|
||||
|
||||
@@ -154,6 +160,18 @@
|
||||
<div class="mono" style="font-size:0.75em;color:var(--text-muted);margin-bottom:20px;word-break:break-all">
|
||||
ID: ${obs.id}
|
||||
</div>
|
||||
${obsSkew && obsSkew.samples > 0 ? `
|
||||
<div class="node-full-card skew-detail-section" style="margin-bottom:20px;padding:12px">
|
||||
<h4 style="margin:0 0 6px">⏰ Clock Offset</h4>
|
||||
<div style="display:flex;align-items:center;gap:12px;flex-wrap:wrap">
|
||||
<span style="font-size:18px;font-weight:700;font-family:var(--mono)">${formatSkew(obsSkew.offsetSec)}</span>
|
||||
${renderSkewBadge(observerSkewSeverity(obsSkew.offsetSec), obsSkew.offsetSec)}
|
||||
<span class="text-muted" style="font-size:12px">${obsSkew.samples} sample${obsSkew.samples !== 1 ? 's' : ''}</span>
|
||||
</div>
|
||||
<div style="font-size:12px;color:var(--text-muted);margin-top:8px;max-width:600px">
|
||||
<strong>How this is computed:</strong> when this observer and another observer see the same packet, we compare their receive timestamps. The median deviation across all multi-observer packets is this observer's offset.
|
||||
</div>
|
||||
</div>` : ''}
|
||||
<div class="obs-charts" style="display:grid;grid-template-columns:repeat(auto-fit,minmax(400px,1fr));gap:16px">
|
||||
<div class="chart-card" style="padding:12px">
|
||||
<h3 style="margin:0 0 8px;font-size:0.95em">Packets Over Time</h3>
|
||||
|
||||
+17
-2
@@ -3,6 +3,7 @@
|
||||
|
||||
(function () {
|
||||
let observers = [];
|
||||
let obsSkewMap = {}; // observerID → {offsetSec, samples}
|
||||
let wsHandler = null;
|
||||
let refreshTimer = null;
|
||||
let regionChangeHandler = null;
|
||||
@@ -51,12 +52,20 @@
|
||||
if (regionChangeHandler) RegionFilter.offChange(regionChangeHandler);
|
||||
regionChangeHandler = null;
|
||||
observers = [];
|
||||
obsSkewMap = {};
|
||||
}
|
||||
|
||||
async function loadObservers() {
|
||||
try {
|
||||
const data = await api('/observers', { ttl: CLIENT_TTL.observers });
|
||||
const [data, skewData] = await Promise.all([
|
||||
api('/observers', { ttl: CLIENT_TTL.observers }),
|
||||
api('/observers/clock-skew', { ttl: 30000 }).catch(function() { return []; })
|
||||
]);
|
||||
observers = data.observers || [];
|
||||
obsSkewMap = {};
|
||||
(Array.isArray(skewData) ? skewData : []).forEach(function(s) {
|
||||
if (s && s.observerID) obsSkewMap[s.observerID] = s;
|
||||
});
|
||||
render();
|
||||
} catch (e) {
|
||||
document.getElementById('obsContent').innerHTML =
|
||||
@@ -124,7 +133,7 @@
|
||||
<caption class="sr-only">Observer status and statistics</caption>
|
||||
<thead><tr>
|
||||
<th scope="col">Status</th><th scope="col">Name</th><th scope="col">Region</th><th scope="col">Last Seen</th>
|
||||
<th scope="col">Packets</th><th scope="col">Packets/Hour</th><th scope="col">Uptime</th>
|
||||
<th scope="col">Packets</th><th scope="col">Packets/Hour</th><th scope="col">Clock Offset</th><th scope="col">Uptime</th>
|
||||
</tr></thead>
|
||||
<tbody>${filtered.map(o => {
|
||||
const h = healthStatus(o.last_seen);
|
||||
@@ -136,6 +145,12 @@
|
||||
<td>${timeAgo(o.last_seen)}</td>
|
||||
<td>${(o.packet_count || 0).toLocaleString()}</td>
|
||||
<td>${sparkBar(o.packetsLastHour || 0, maxPktsHr)}</td>
|
||||
<td>${(function() {
|
||||
var sk = obsSkewMap[o.id];
|
||||
if (!sk || sk.samples == null || sk.samples === 0) return '<span class="text-muted">—</span>';
|
||||
var sev = observerSkewSeverity(sk.offsetSec);
|
||||
return renderSkewBadge(sev, sk.offsetSec) + ' <span class="text-muted" title="Computed from ' + sk.samples + ' multi-observer packets. Positive = observer ahead of consensus.">(' + sk.samples + ')</span>';
|
||||
})()}</td>
|
||||
<td>${uptimeStr(o.first_seen)}</td>
|
||||
</tr>`;
|
||||
}).join('')}</tbody>
|
||||
|
||||
+5
-1
@@ -26,7 +26,7 @@
|
||||
let observers = [];
|
||||
let observerMap = new Map(); // id → observer for O(1) lookups (#383)
|
||||
let regionMap = {};
|
||||
const TYPE_NAMES = { 0:'Request', 1:'Response', 2:'Direct Msg', 3:'ACK', 4:'Advert', 5:'Channel Msg', 7:'Anon Req', 8:'Path', 9:'Trace', 11:'Control' };
|
||||
const TYPE_NAMES = { 0:'Request', 1:'Response', 2:'Direct Msg', 3:'ACK', 4:'Advert', 5:'Channel Msg', 6:'Group Data', 7:'Anon Req', 8:'Path', 9:'Trace', 11:'Control' };
|
||||
function typeName(t) { return TYPE_NAMES[t] ?? `Type ${t}`; }
|
||||
const isMobile = window.innerWidth <= 1024;
|
||||
const PACKET_LIMIT = isMobile ? 1000 : 50000;
|
||||
@@ -748,6 +748,10 @@
|
||||
console.error('Failed to load packets:', e);
|
||||
const tbody = document.getElementById('pktBody');
|
||||
if (tbody) tbody.innerHTML = '<tr><td colspan="' + _getColCount() + '" class="text-center" style="padding:24px;color:var(--error,#ef4444)"><div role="alert" aria-live="polite">Failed to load packets. Please try again.</div></td></tr>';
|
||||
} finally {
|
||||
// Always signal data-loaded — even on error — so E2E tests can proceed.
|
||||
var pktContainer = document.getElementById('pktLeft') || document.getElementById('pktBody');
|
||||
if (pktContainer) pktContainer.setAttribute('data-loaded', 'true');
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+8
-2
@@ -15,14 +15,14 @@
|
||||
};
|
||||
|
||||
window.TYPE_COLORS = {
|
||||
ADVERT: '#22c55e', GRP_TXT: '#3b82f6', TXT_MSG: '#f59e0b', ACK: '#6b7280',
|
||||
ADVERT: '#22c55e', GRP_TXT: '#3b82f6', GRP_DATA: '#8b5cf6', TXT_MSG: '#f59e0b', ACK: '#6b7280',
|
||||
REQUEST: '#a855f7', RESPONSE: '#06b6d4', TRACE: '#ec4899', PATH: '#14b8a6',
|
||||
ANON_REQ: '#f43f5e', UNKNOWN: '#6b7280'
|
||||
};
|
||||
|
||||
// Badge CSS class name mapping
|
||||
const TYPE_BADGE_MAP = {
|
||||
ADVERT: 'advert', GRP_TXT: 'grp-txt', TXT_MSG: 'txt-msg', ACK: 'ack',
|
||||
ADVERT: 'advert', GRP_TXT: 'grp-txt', GRP_DATA: 'grp-data', TXT_MSG: 'txt-msg', ACK: 'ack',
|
||||
REQUEST: 'req', RESPONSE: 'response', TRACE: 'trace', PATH: 'path',
|
||||
ANON_REQ: 'anon-req', UNKNOWN: 'unknown'
|
||||
};
|
||||
@@ -455,6 +455,12 @@
|
||||
return '<span class="' + cls + '" title="Clock skew: ' + window.formatSkew(skewSec) + ' (' + (SKEW_SEVERITY_LABELS[severity] || severity) + ')">' + label + '</span>';
|
||||
};
|
||||
|
||||
/** Compute severity for an observer's clock offset (seconds). */
|
||||
window.observerSkewSeverity = function(offsetSec) {
|
||||
var abs = Math.abs(offsetSec);
|
||||
return abs >= 3600 ? 'critical' : abs >= 300 ? 'warning' : 'ok';
|
||||
};
|
||||
|
||||
/** Render a skew sparkline SVG (inline, word-sized) */
|
||||
window.renderSkewSparkline = function(samples, w, h) {
|
||||
w = w || 120; h = h || 24;
|
||||
|
||||
@@ -211,6 +211,7 @@ async function run() {
|
||||
// Test 2: Nodes page loads with data
|
||||
await test('Nodes page loads with data', async () => {
|
||||
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'domcontentloaded' });
|
||||
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
|
||||
await page.waitForSelector('table tbody tr');
|
||||
const headers = await page.$$eval('th', els => els.map(e => e.textContent.trim()));
|
||||
for (const col of ['Name', 'Public Key', 'Role']) {
|
||||
@@ -236,6 +237,7 @@ async function run() {
|
||||
// Test: Node side panel Details link navigates to full detail page (#778)
|
||||
await test('Node side panel Details link navigates', async () => {
|
||||
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'domcontentloaded' });
|
||||
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
|
||||
await page.waitForSelector('table tbody tr');
|
||||
await page.click('table tbody tr');
|
||||
await page.waitForSelector('.node-detail');
|
||||
@@ -257,6 +259,7 @@ async function run() {
|
||||
// Test: Nodes page has WebSocket auto-update listener (#131)
|
||||
await test('Nodes page has WebSocket auto-update', async () => {
|
||||
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'domcontentloaded' });
|
||||
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
|
||||
await page.waitForSelector('table tbody tr');
|
||||
// The live dot in navbar indicates WS connection status
|
||||
const liveDot = await page.$('#liveDot');
|
||||
@@ -282,11 +285,12 @@ async function run() {
|
||||
// Test 3: Map page loads with markers
|
||||
await test('Map page loads with markers', async () => {
|
||||
await page.goto(`${BASE}/#/map`, { waitUntil: 'domcontentloaded' });
|
||||
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
|
||||
await page.waitForSelector('.leaflet-container');
|
||||
await page.waitForSelector('.leaflet-tile-loaded');
|
||||
// Wait for markers/overlays to render (may not exist with empty DB)
|
||||
try {
|
||||
await page.waitForSelector('.leaflet-marker-icon, .leaflet-interactive, circle, .marker-cluster, .leaflet-marker-pane > *, .leaflet-overlay-pane svg path, .leaflet-overlay-pane svg circle', { timeout: 3000 });
|
||||
await page.waitForSelector('.leaflet-marker-icon, .leaflet-interactive, circle, .marker-cluster, .leaflet-marker-pane > *, .leaflet-overlay-pane svg path, .leaflet-overlay-pane svg circle', { timeout: 8000 });
|
||||
} catch (_) {
|
||||
// No markers with empty DB \u2014 assertion below handles it
|
||||
}
|
||||
@@ -362,7 +366,7 @@ async function run() {
|
||||
await page.waitForSelector('.leaflet-container');
|
||||
// Wait for markers (may not exist with empty DB)
|
||||
try {
|
||||
await page.waitForSelector('.leaflet-marker-icon, .leaflet-interactive', { timeout: 3000 });
|
||||
await page.waitForSelector('.leaflet-marker-icon, .leaflet-interactive', { timeout: 8000 });
|
||||
} catch (_) {
|
||||
// No markers with empty DB
|
||||
}
|
||||
@@ -394,6 +398,7 @@ async function run() {
|
||||
await page.goto(`${BASE}/#/packets`, { waitUntil: 'domcontentloaded' });
|
||||
await page.evaluate(() => localStorage.setItem('meshcore-time-window', '525600'));
|
||||
await page.reload({ waitUntil: 'load' });
|
||||
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
|
||||
await page.waitForSelector('table tbody tr', { timeout: 15000 });
|
||||
const rowsBefore = await page.$$('table tbody tr');
|
||||
assert(rowsBefore.length > 0, 'No packets visible');
|
||||
|
||||
Executable
+43
@@ -0,0 +1,43 @@
|
||||
#!/usr/bin/env bash
|
||||
# freshen-fixture.sh — Shift all timestamps in the fixture DB to be relative to now.
|
||||
# Preserves the relative ordering between timestamps.
|
||||
# Usage: bash tools/freshen-fixture.sh <path-to-fixture.db>
|
||||
set -euo pipefail
|
||||
|
||||
DB="${1:?Usage: freshen-fixture.sh <path-to-fixture.db>}"
|
||||
|
||||
if [ ! -f "$DB" ]; then
|
||||
echo "ERROR: DB file not found: $DB" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Find the max timestamp across all time columns, compute offset, shift everything forward.
|
||||
sqlite3 "$DB" <<'SQL'
|
||||
-- Shift all timestamps forward so the newest is ~now, preserving relative ordering.
|
||||
-- Use strftime to produce RFC3339 format (T separator + Z suffix) for correct comparison.
|
||||
UPDATE nodes SET last_seen = strftime('%Y-%m-%dT%H:%M:%SZ', last_seen,
|
||||
(SELECT printf('+%d seconds', CAST((julianday('now') - julianday(MAX(last_seen))) * 86400 AS INTEGER)) FROM nodes)
|
||||
) WHERE last_seen IS NOT NULL;
|
||||
|
||||
UPDATE transmissions SET first_seen = strftime('%Y-%m-%dT%H:%M:%SZ', first_seen,
|
||||
(SELECT printf('+%d seconds', CAST((julianday('now') - julianday(MAX(first_seen))) * 86400 AS INTEGER)) FROM transmissions)
|
||||
) WHERE first_seen IS NOT NULL;
|
||||
|
||||
-- Observers: shift last_seen too so they don't get auto-pruned by RemoveStaleObservers
|
||||
-- on server startup (default 14d threshold marks all >14d observers inactive=1, which
|
||||
-- the /api/observers filter then excludes — leaving the map page with no observer markers).
|
||||
UPDATE observers SET last_seen = strftime('%Y-%m-%dT%H:%M:%SZ', last_seen,
|
||||
(SELECT printf('+%d seconds', CAST((julianday('now') - julianday(MAX(last_seen))) * 86400 AS INTEGER)) FROM observers)
|
||||
) WHERE last_seen IS NOT NULL;
|
||||
SQL
|
||||
|
||||
# Defensive: clear any stale inactive=1 flags. Column may not exist on fresh fixtures
|
||||
# (added by server migration on first startup); silently no-op if missing.
|
||||
sqlite3 "$DB" "UPDATE observers SET inactive = 0 WHERE inactive = 1;" 2>/dev/null || true
|
||||
|
||||
# neighbor_edges may not exist in all fixture versions
|
||||
sqlite3 "$DB" "UPDATE neighbor_edges SET last_seen = strftime('%Y-%m-%dT%H:%M:%SZ', last_seen, (SELECT printf('+%d seconds', CAST((julianday('now') - julianday(MAX(last_seen))) * 86400 AS INTEGER)) FROM neighbor_edges)) WHERE last_seen IS NOT NULL;" 2>/dev/null || true
|
||||
|
||||
echo "Fixture timestamps freshened in $DB"
|
||||
sqlite3 "$DB" "SELECT 'nodes: min=' || MIN(last_seen) || ' max=' || MAX(last_seen) FROM nodes;"
|
||||
sqlite3 "$DB" "SELECT 'observers: count=' || COUNT(*) || ' max=' || MAX(last_seen) FROM observers;"
|
||||
Reference in New Issue
Block a user