mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-14 00:25:03 +00:00
Compare commits
41 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 37a2b71fc7 | |||
| 9ba45e6fb4 | |||
| 33198b8012 | |||
| 60893b6418 | |||
| 7ac36178d6 | |||
| f56f1368be | |||
| cfb7b7297d | |||
| c67f3347ce | |||
| b3a9677c52 | |||
| 707228ad91 | |||
| 8d379baf5e | |||
| 3b436c768b | |||
| 6d49cf939c | |||
| 8d39b33111 | |||
| e1a1be1735 | |||
| b97fe5758c | |||
| 568de4b441 | |||
| 04c8558768 | |||
| 52b5ae86d6 | |||
| 8397f2bb1c | |||
| ed65498281 | |||
| c53af5cf66 | |||
| 9f606600e2 | |||
| 053aef1994 | |||
| 7aef3c355c | |||
| 9ac484607f | |||
| b562de32ff | |||
| 6f0c58c94a | |||
| 7d1c679f4f | |||
| ead08c721d | |||
| 57e272494d | |||
| d870a693d0 | |||
| d9904cc138 | |||
| 7aa59eabde | |||
| ac7d2b64f7 | |||
| fd3bf1a892 | |||
| f16afe7fdf | |||
| ed66e54e57 | |||
| 22079a1fc4 | |||
| 232882d308 | |||
| fb640bcfc3 |
@@ -1 +1 @@
|
||||
{"schemaVersion":1,"label":"frontend coverage","message":"36.12%","color":"red"}
|
||||
{"schemaVersion":1,"label":"frontend coverage","message":"40.21%","color":"red"}
|
||||
|
||||
@@ -176,6 +176,9 @@ jobs:
|
||||
- name: Instrument frontend JS for coverage
|
||||
run: sh scripts/instrument-frontend.sh
|
||||
|
||||
- name: Freshen fixture timestamps
|
||||
run: bash tools/freshen-fixture.sh test-fixtures/e2e-fixture.db
|
||||
|
||||
- name: Start Go server with fixture DB
|
||||
run: |
|
||||
fuser -k 13581/tcp 2>/dev/null || true
|
||||
@@ -183,7 +186,7 @@ jobs:
|
||||
./corescope-server -port 13581 -db test-fixtures/e2e-fixture.db -public public-instrumented &
|
||||
echo $! > .server.pid
|
||||
for i in $(seq 1 30); do
|
||||
if curl -sf http://localhost:13581/api/stats > /dev/null 2>&1; then
|
||||
if curl -sf http://localhost:13581/api/healthz > /dev/null 2>&1; then
|
||||
echo "Server ready after ${i}s"
|
||||
break
|
||||
fi
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/meshcore-analyzer/geofilter"
|
||||
)
|
||||
@@ -42,6 +43,15 @@ type Config struct {
|
||||
GeoFilter *GeoFilterConfig `json:"geo_filter,omitempty"`
|
||||
ValidateSignatures *bool `json:"validateSignatures,omitempty"`
|
||||
DB *DBConfig `json:"db,omitempty"`
|
||||
|
||||
// ObserverBlacklist is a list of observer public keys to drop at ingest.
|
||||
// Messages from blacklisted observers are silently discarded — no DB writes,
|
||||
// no UpsertObserver, no observations, no metrics.
|
||||
ObserverBlacklist []string `json:"observerBlacklist,omitempty"`
|
||||
|
||||
// obsBlacklistSetCached is the lazily-built lowercase set for O(1) lookups.
|
||||
obsBlacklistSetCached map[string]bool
|
||||
obsBlacklistOnce sync.Once
|
||||
}
|
||||
|
||||
// GeoFilterConfig is an alias for the shared geofilter.Config type.
|
||||
@@ -114,6 +124,24 @@ func (c *Config) ObserverDaysOrDefault() int {
|
||||
return 14
|
||||
}
|
||||
|
||||
// IsObserverBlacklisted returns true if the given observer ID is in the observerBlacklist.
|
||||
func (c *Config) IsObserverBlacklisted(id string) bool {
|
||||
if c == nil || len(c.ObserverBlacklist) == 0 {
|
||||
return false
|
||||
}
|
||||
c.obsBlacklistOnce.Do(func() {
|
||||
m := make(map[string]bool, len(c.ObserverBlacklist))
|
||||
for _, pk := range c.ObserverBlacklist {
|
||||
trimmed := strings.ToLower(strings.TrimSpace(pk))
|
||||
if trimmed != "" {
|
||||
m[trimmed] = true
|
||||
}
|
||||
}
|
||||
c.obsBlacklistSetCached = m
|
||||
})
|
||||
return c.obsBlacklistSetCached[strings.ToLower(strings.TrimSpace(id))]
|
||||
}
|
||||
|
||||
// LoadConfig reads configuration from a JSON file, with env var overrides.
|
||||
// If the config file does not exist, sensible defaults are used (zero-config startup).
|
||||
func LoadConfig(path string) (*Config, error) {
|
||||
|
||||
+27
-4
@@ -116,7 +116,8 @@ func applySchema(db *sql.DB) error {
|
||||
battery_mv INTEGER,
|
||||
uptime_secs INTEGER,
|
||||
noise_floor REAL,
|
||||
inactive INTEGER DEFAULT 0
|
||||
inactive INTEGER DEFAULT 0,
|
||||
last_packet_at TEXT DEFAULT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen);
|
||||
@@ -421,6 +422,28 @@ func applySchema(db *sql.DB) error {
|
||||
log.Println("[migration] observations.raw_hex column added")
|
||||
}
|
||||
|
||||
// Migration: add last_packet_at column to observers (#last-packet-at)
|
||||
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'observers_last_packet_at_v1'")
|
||||
if row.Scan(&migDone) != nil {
|
||||
log.Println("[migration] Adding last_packet_at column to observers...")
|
||||
_, alterErr := db.Exec(`ALTER TABLE observers ADD COLUMN last_packet_at TEXT DEFAULT NULL`)
|
||||
if alterErr != nil && !strings.Contains(alterErr.Error(), "duplicate column") {
|
||||
return fmt.Errorf("observers last_packet_at ALTER: %w", alterErr)
|
||||
}
|
||||
// Backfill: set last_packet_at = last_seen only for observers that actually have
|
||||
// observation rows (packet_count alone is unreliable — UpsertObserver sets it to 1
|
||||
// on INSERT even for status-only observers).
|
||||
res, err := db.Exec(`UPDATE observers SET last_packet_at = last_seen
|
||||
WHERE last_packet_at IS NULL
|
||||
AND rowid IN (SELECT DISTINCT observer_idx FROM observations WHERE observer_idx IS NOT NULL)`)
|
||||
if err == nil {
|
||||
n, _ := res.RowsAffected()
|
||||
log.Printf("[migration] Backfilled last_packet_at for %d observers with packets", n)
|
||||
}
|
||||
db.Exec(`INSERT INTO _migrations (name) VALUES ('observers_last_packet_at_v1')`)
|
||||
log.Println("[migration] observers.last_packet_at column added")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -504,7 +527,7 @@ func (s *Store) prepareStatements() error {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stmtUpdateObserverLastSeen, err = s.db.Prepare("UPDATE observers SET last_seen = ? WHERE rowid = ?")
|
||||
s.stmtUpdateObserverLastSeen, err = s.db.Prepare("UPDATE observers SET last_seen = ?, last_packet_at = ? WHERE rowid = ?")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -583,9 +606,9 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
|
||||
err := s.stmtGetObserverRowid.QueryRow(data.ObserverID).Scan(&rowid)
|
||||
if err == nil {
|
||||
observerIdx = &rowid
|
||||
// Update observer last_seen on every packet to prevent
|
||||
// Update observer last_seen and last_packet_at on every packet to prevent
|
||||
// low-traffic observers from appearing offline (#463)
|
||||
_, _ = s.stmtUpdateObserverLastSeen.Exec(now, rowid)
|
||||
_, _ = s.stmtUpdateObserverLastSeen.Exec(now, now, rowid)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -569,6 +569,61 @@ func TestInsertTransmissionUpdatesObserverLastSeen(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLastPacketAtUpdatedOnPacketOnly(t *testing.T) {
|
||||
s, err := OpenStore(tempDBPath(t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
// Insert observer via status path — last_packet_at should be NULL
|
||||
if err := s.UpsertObserver("obs1", "Observer1", "SJC", nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var lastPacketAt sql.NullString
|
||||
s.db.QueryRow("SELECT last_packet_at FROM observers WHERE id = ?", "obs1").Scan(&lastPacketAt)
|
||||
if lastPacketAt.Valid {
|
||||
t.Fatalf("expected last_packet_at to be NULL after UpsertObserver, got %s", lastPacketAt.String)
|
||||
}
|
||||
|
||||
// Insert a packet from this observer — last_packet_at should be set
|
||||
data := &PacketData{
|
||||
RawHex: "0A00D69F",
|
||||
Timestamp: "2026-04-24T12:00:00Z",
|
||||
ObserverID: "obs1",
|
||||
Hash: "lastpackettest123456",
|
||||
RouteType: 2,
|
||||
PayloadType: 2,
|
||||
PathJSON: "[]",
|
||||
DecodedJSON: `{"type":"TXT_MSG"}`,
|
||||
}
|
||||
if _, err := s.InsertTransmission(data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s.db.QueryRow("SELECT last_packet_at FROM observers WHERE id = ?", "obs1").Scan(&lastPacketAt)
|
||||
if !lastPacketAt.Valid {
|
||||
t.Fatal("expected last_packet_at to be non-NULL after InsertTransmission")
|
||||
}
|
||||
// InsertTransmission uses `now = data.Timestamp || time.Now()`, so last_packet_at
|
||||
// should match the packet's Timestamp when provided (same source-of-truth as last_seen).
|
||||
if lastPacketAt.String != "2026-04-24T12:00:00Z" {
|
||||
t.Errorf("expected last_packet_at=2026-04-24T12:00:00Z, got %s", lastPacketAt.String)
|
||||
}
|
||||
|
||||
// UpsertObserver again (status path) — last_packet_at should NOT change
|
||||
if err := s.UpsertObserver("obs1", "Observer1", "SJC", nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var lastPacketAtAfterStatus sql.NullString
|
||||
s.db.QueryRow("SELECT last_packet_at FROM observers WHERE id = ?", "obs1").Scan(&lastPacketAtAfterStatus)
|
||||
if !lastPacketAtAfterStatus.Valid || lastPacketAtAfterStatus.String != lastPacketAt.String {
|
||||
t.Errorf("UpsertObserver should not change last_packet_at; expected %s, got %v", lastPacketAt.String, lastPacketAtAfterStatus)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndToEndIngest(t *testing.T) {
|
||||
s, err := OpenStore(tempDBPath(t))
|
||||
if err != nil {
|
||||
|
||||
@@ -240,6 +240,13 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
|
||||
return
|
||||
}
|
||||
|
||||
// Observer blacklist: drop ALL messages from blacklisted observers before any
|
||||
// DB writes (status, metrics, packets). Trumps IATA filter.
|
||||
if len(parts) > 2 && cfg.IsObserverBlacklisted(parts[2]) {
|
||||
log.Printf("MQTT [%s] observer %.8s blacklisted, dropping", tag, parts[2])
|
||||
return
|
||||
}
|
||||
|
||||
// Status topic: meshcore/<region>/<observer_id>/status
|
||||
// IATA filter does NOT apply here — observer metadata (noise_floor, battery, etc.)
|
||||
// is region-independent and should be accepted from all observers regardless of
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestIngestorIsObserverBlacklisted(t *testing.T) {
|
||||
cfg := &Config{
|
||||
ObserverBlacklist: []string{"OBS1", "obs2"},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
id string
|
||||
want bool
|
||||
}{
|
||||
{"OBS1", true},
|
||||
{"obs1", true},
|
||||
{"OBS2", true},
|
||||
{"obs3", false},
|
||||
{"", false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
got := cfg.IsObserverBlacklisted(tt.id)
|
||||
if got != tt.want {
|
||||
t.Errorf("IsObserverBlacklisted(%q) = %v, want %v", tt.id, got, tt.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestorIsObserverBlacklistedEmpty(t *testing.T) {
|
||||
cfg := &Config{}
|
||||
if cfg.IsObserverBlacklisted("anything") {
|
||||
t.Error("empty blacklist should not match")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestorIsObserverBlacklistedNil(t *testing.T) {
|
||||
var cfg *Config
|
||||
if cfg.IsObserverBlacklisted("anything") {
|
||||
t.Error("nil config should not match")
|
||||
}
|
||||
}
|
||||
@@ -72,6 +72,15 @@ type Config struct {
|
||||
|
||||
DebugAffinity bool `json:"debugAffinity,omitempty"`
|
||||
|
||||
// ObserverBlacklist is a list of observer public keys to exclude from API
|
||||
// responses (defense in depth — ingestor drops at ingest, server filters
|
||||
// any that slipped through from a prior unblocked window).
|
||||
ObserverBlacklist []string `json:"observerBlacklist,omitempty"`
|
||||
|
||||
// obsBlacklistSetCached is the lazily-built set version of ObserverBlacklist.
|
||||
obsBlacklistSetCached map[string]bool
|
||||
obsBlacklistOnce sync.Once
|
||||
|
||||
ResolvedPath *ResolvedPathConfig `json:"resolvedPath,omitempty"`
|
||||
NeighborGraph *NeighborGraphConfig `json:"neighborGraph,omitempty"`
|
||||
}
|
||||
@@ -404,3 +413,29 @@ func (c *Config) IsBlacklisted(pubkey string) bool {
|
||||
}
|
||||
return c.blacklistSet()[strings.ToLower(strings.TrimSpace(pubkey))]
|
||||
}
|
||||
|
||||
// obsBlacklistSet lazily builds and caches the observerBlacklist as a set for O(1) lookups.
|
||||
func (c *Config) obsBlacklistSet() map[string]bool {
|
||||
c.obsBlacklistOnce.Do(func() {
|
||||
if len(c.ObserverBlacklist) == 0 {
|
||||
return
|
||||
}
|
||||
m := make(map[string]bool, len(c.ObserverBlacklist))
|
||||
for _, pk := range c.ObserverBlacklist {
|
||||
trimmed := strings.ToLower(strings.TrimSpace(pk))
|
||||
if trimmed != "" {
|
||||
m[trimmed] = true
|
||||
}
|
||||
}
|
||||
c.obsBlacklistSetCached = m
|
||||
})
|
||||
return c.obsBlacklistSetCached
|
||||
}
|
||||
|
||||
// IsObserverBlacklisted returns true if the given observer ID is in the observerBlacklist.
|
||||
func (c *Config) IsObserverBlacklisted(id string) bool {
|
||||
if c == nil || len(c.ObserverBlacklist) == 0 {
|
||||
return false
|
||||
}
|
||||
return c.obsBlacklistSet()[strings.ToLower(strings.TrimSpace(id))]
|
||||
}
|
||||
|
||||
@@ -35,7 +35,8 @@ func setupTestDBv2(t *testing.T) *DB {
|
||||
CREATE TABLE observers (
|
||||
id TEXT PRIMARY KEY, name TEXT, iata TEXT, last_seen TEXT, first_seen TEXT,
|
||||
packet_count INTEGER DEFAULT 0, model TEXT, firmware TEXT,
|
||||
client_version TEXT, radio TEXT, battery_mv INTEGER, uptime_secs INTEGER, noise_floor REAL
|
||||
client_version TEXT, radio TEXT, battery_mv INTEGER, uptime_secs INTEGER, noise_floor REAL,
|
||||
inactive INTEGER DEFAULT 0
|
||||
);
|
||||
CREATE TABLE transmissions (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT, raw_hex TEXT NOT NULL,
|
||||
|
||||
+7
-6
@@ -170,6 +170,7 @@ type Observer struct {
|
||||
BatteryMv *int `json:"battery_mv"`
|
||||
UptimeSecs *int64 `json:"uptime_secs"`
|
||||
NoiseFloor *float64 `json:"noise_floor"`
|
||||
LastPacketAt *string `json:"last_packet_at"`
|
||||
}
|
||||
|
||||
// Transmission represents a row from the transmissions table.
|
||||
@@ -231,7 +232,7 @@ func (db *DB) GetStats() (*Stats, error) {
|
||||
sevenDaysAgo := time.Now().Add(-7 * 24 * time.Hour).Format(time.RFC3339)
|
||||
db.conn.QueryRow("SELECT COUNT(*) FROM nodes WHERE last_seen > ?", sevenDaysAgo).Scan(&s.TotalNodes)
|
||||
db.conn.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&s.TotalNodesAllTime)
|
||||
db.conn.QueryRow("SELECT COUNT(*) FROM observers").Scan(&s.TotalObservers)
|
||||
db.conn.QueryRow("SELECT COUNT(*) FROM observers WHERE inactive IS NULL OR inactive = 0").Scan(&s.TotalObservers)
|
||||
|
||||
oneHourAgo := time.Now().Add(-1 * time.Hour).Unix()
|
||||
db.conn.QueryRow("SELECT COUNT(*) FROM observations WHERE timestamp > ?", oneHourAgo).Scan(&s.PacketsLastHour)
|
||||
@@ -970,9 +971,9 @@ func (db *DB) getObservationsForTransmissions(txIDs []int) map[int][]map[string]
|
||||
return result
|
||||
}
|
||||
|
||||
// GetObservers returns all observers sorted by last_seen DESC.
|
||||
// GetObservers returns active observers (not soft-deleted) sorted by last_seen DESC.
|
||||
func (db *DB) GetObservers() ([]Observer, error) {
|
||||
rows, err := db.conn.Query("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers ORDER BY last_seen DESC")
|
||||
rows, err := db.conn.Query("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor, last_packet_at FROM observers WHERE inactive IS NULL OR inactive = 0 ORDER BY last_seen DESC")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -983,7 +984,7 @@ func (db *DB) GetObservers() ([]Observer, error) {
|
||||
var o Observer
|
||||
var batteryMv, uptimeSecs sql.NullInt64
|
||||
var noiseFloor sql.NullFloat64
|
||||
if err := rows.Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &batteryMv, &uptimeSecs, &noiseFloor); err != nil {
|
||||
if err := rows.Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &batteryMv, &uptimeSecs, &noiseFloor, &o.LastPacketAt); err != nil {
|
||||
continue
|
||||
}
|
||||
if batteryMv.Valid {
|
||||
@@ -1006,8 +1007,8 @@ func (db *DB) GetObserverByID(id string) (*Observer, error) {
|
||||
var o Observer
|
||||
var batteryMv, uptimeSecs sql.NullInt64
|
||||
var noiseFloor sql.NullFloat64
|
||||
err := db.conn.QueryRow("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers WHERE id = ?", id).
|
||||
Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &batteryMv, &uptimeSecs, &noiseFloor)
|
||||
err := db.conn.QueryRow("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor, last_packet_at FROM observers WHERE id = ?", id).
|
||||
Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &batteryMv, &uptimeSecs, &noiseFloor, &o.LastPacketAt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
+76
-2
@@ -48,7 +48,9 @@ func setupTestDB(t *testing.T) *DB {
|
||||
radio TEXT,
|
||||
battery_mv INTEGER,
|
||||
uptime_secs INTEGER,
|
||||
noise_floor REAL
|
||||
noise_floor REAL,
|
||||
inactive INTEGER DEFAULT 0,
|
||||
last_packet_at TEXT DEFAULT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE transmissions (
|
||||
@@ -355,6 +357,35 @@ func TestGetObservers(t *testing.T) {
|
||||
if observers[0].ID != "obs1" {
|
||||
t.Errorf("expected obs1 first (most recent), got %s", observers[0].ID)
|
||||
}
|
||||
// last_packet_at should be nil since seedTestData doesn't set it
|
||||
if observers[0].LastPacketAt != nil {
|
||||
t.Errorf("expected nil LastPacketAt for obs1 from seed, got %v", *observers[0].LastPacketAt)
|
||||
}
|
||||
}
|
||||
|
||||
// Regression: GetObservers must exclude soft-deleted (inactive=1) rows.
|
||||
// Stale observers were appearing in /api/observers despite the auto-prune
|
||||
// marking them inactive, because the SELECT query had no WHERE filter.
|
||||
func TestGetObservers_ExcludesInactive(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
// Mark obs2 inactive — soft delete simulating a stale-observer prune.
|
||||
if _, err := db.conn.Exec(`UPDATE observers SET inactive = 1 WHERE id = ?`, "obs2"); err != nil {
|
||||
t.Fatalf("update inactive: %v", err)
|
||||
}
|
||||
observers, err := db.GetObservers()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(observers) != 1 {
|
||||
t.Errorf("expected 1 observer (obs1) after marking obs2 inactive, got %d", len(observers))
|
||||
}
|
||||
for _, o := range observers {
|
||||
if o.ID == "obs2" {
|
||||
t.Errorf("inactive observer obs2 should be excluded")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetObserverByID(t *testing.T) {
|
||||
@@ -369,6 +400,48 @@ func TestGetObserverByID(t *testing.T) {
|
||||
if obs.ID != "obs1" {
|
||||
t.Errorf("expected obs1, got %s", obs.ID)
|
||||
}
|
||||
// Verify last_packet_at is nil by default
|
||||
if obs.LastPacketAt != nil {
|
||||
t.Errorf("expected nil LastPacketAt, got %v", *obs.LastPacketAt)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetObserverLastPacketAt(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
|
||||
// Set last_packet_at for obs1
|
||||
ts := "2026-04-24T12:00:00Z"
|
||||
db.conn.Exec(`UPDATE observers SET last_packet_at = ? WHERE id = ?`, ts, "obs1")
|
||||
|
||||
// Verify via GetObservers
|
||||
observers, err := db.GetObservers()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var obs1 *Observer
|
||||
for i := range observers {
|
||||
if observers[i].ID == "obs1" {
|
||||
obs1 = &observers[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
if obs1 == nil {
|
||||
t.Fatal("obs1 not found")
|
||||
}
|
||||
if obs1.LastPacketAt == nil || *obs1.LastPacketAt != ts {
|
||||
t.Errorf("expected LastPacketAt=%s via GetObservers, got %v", ts, obs1.LastPacketAt)
|
||||
}
|
||||
|
||||
// Verify via GetObserverByID
|
||||
obs, err := db.GetObserverByID("obs1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if obs.LastPacketAt == nil || *obs.LastPacketAt != ts {
|
||||
t.Errorf("expected LastPacketAt=%s via GetObserverByID, got %v", ts, obs.LastPacketAt)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetObserverByIDNotFound(t *testing.T) {
|
||||
@@ -1109,7 +1182,8 @@ func setupTestDBV2(t *testing.T) *DB {
|
||||
iata TEXT,
|
||||
last_seen TEXT,
|
||||
first_seen TEXT,
|
||||
packet_count INTEGER DEFAULT 0
|
||||
packet_count INTEGER DEFAULT 0,
|
||||
last_packet_at TEXT DEFAULT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE transmissions (
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// readiness tracks whether background init goroutines have completed.
|
||||
// Set to 1 once store.Load, pickBestObservation, and neighbor graph build are done.
|
||||
var readiness atomic.Int32
|
||||
|
||||
// handleHealthz returns 200 when the server is ready to serve queries,
|
||||
// or 503 while background initialization is still running.
|
||||
func (s *Server) handleHealthz(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
if readiness.Load() == 0 {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"ready": false,
|
||||
"reason": "loading",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
var loadedTx, loadedObs int
|
||||
if s.store != nil {
|
||||
s.store.mu.RLock()
|
||||
loadedTx = len(s.store.packets)
|
||||
for _, p := range s.store.packets {
|
||||
loadedObs += len(p.Observations)
|
||||
}
|
||||
s.store.mu.RUnlock()
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"ready": true,
|
||||
"loadedTx": loadedTx,
|
||||
"loadedObs": loadedObs,
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestHealthzNotReady(t *testing.T) {
|
||||
// Ensure readiness is 0 (not ready)
|
||||
readiness.Store(0)
|
||||
defer readiness.Store(0)
|
||||
|
||||
srv := &Server{store: &PacketStore{}}
|
||||
req := httptest.NewRequest("GET", "/api/healthz", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
srv.handleHealthz(w, req)
|
||||
|
||||
if w.Code != http.StatusServiceUnavailable {
|
||||
t.Fatalf("expected 503, got %d", w.Code)
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("invalid JSON: %v", err)
|
||||
}
|
||||
if resp["ready"] != false {
|
||||
t.Fatalf("expected ready=false, got %v", resp["ready"])
|
||||
}
|
||||
if resp["reason"] != "loading" {
|
||||
t.Fatalf("expected reason=loading, got %v", resp["reason"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthzReady(t *testing.T) {
|
||||
readiness.Store(1)
|
||||
defer readiness.Store(0)
|
||||
|
||||
srv := &Server{store: &PacketStore{}}
|
||||
req := httptest.NewRequest("GET", "/api/healthz", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
srv.handleHealthz(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d", w.Code)
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("invalid JSON: %v", err)
|
||||
}
|
||||
if resp["ready"] != true {
|
||||
t.Fatalf("expected ready=true, got %v", resp["ready"])
|
||||
}
|
||||
if _, ok := resp["loadedTx"]; !ok {
|
||||
t.Fatal("missing loadedTx field")
|
||||
}
|
||||
if _, ok := resp["loadedObs"]; !ok {
|
||||
t.Fatal("missing loadedObs field")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthzAntiTautology(t *testing.T) {
|
||||
// When readiness is 0, must NOT return 200
|
||||
readiness.Store(0)
|
||||
defer readiness.Store(0)
|
||||
|
||||
srv := &Server{store: &PacketStore{}}
|
||||
req := httptest.NewRequest("GET", "/api/healthz", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
srv.handleHealthz(w, req)
|
||||
|
||||
if w.Code == http.StatusOK {
|
||||
t.Fatal("anti-tautology: handler returned 200 when readiness=0; gating is broken")
|
||||
}
|
||||
}
|
||||
@@ -174,6 +174,27 @@ func main() {
|
||||
database.hasResolvedPath = true // detectSchema ran before column was added; fix the flag
|
||||
}
|
||||
|
||||
// Ensure observers.inactive column exists (PR #954 filters on it; ingestor migration
|
||||
// adds it but server may run against DBs ingestor never touched, e.g. e2e fixture).
|
||||
if err := ensureObserverInactiveColumn(dbPath); err != nil {
|
||||
log.Printf("[store] warning: could not add observers.inactive column: %v", err)
|
||||
}
|
||||
|
||||
// Ensure observers.last_packet_at column exists (PR #905 reads it; ingestor migration
|
||||
// adds it but server may run against DBs ingestor never touched, e.g. e2e fixture).
|
||||
if err := ensureLastPacketAtColumn(dbPath); err != nil {
|
||||
log.Printf("[store] warning: could not add observers.last_packet_at column: %v", err)
|
||||
}
|
||||
|
||||
// Soft-delete observers that are in the blacklist (mark inactive=1) so
|
||||
// historical data from a prior unblocked window is hidden too.
|
||||
if len(cfg.ObserverBlacklist) > 0 {
|
||||
softDeleteBlacklistedObservers(dbPath, cfg.ObserverBlacklist)
|
||||
}
|
||||
|
||||
// WaitGroup for background init steps that gate /api/healthz readiness.
|
||||
var initWg sync.WaitGroup
|
||||
|
||||
// Load or build neighbor graph
|
||||
if neighborEdgesTableExists(database.conn) {
|
||||
store.graph = loadNeighborEdgesFromDB(database.conn)
|
||||
@@ -181,7 +202,9 @@ func main() {
|
||||
} else {
|
||||
log.Printf("[neighbor] no persisted edges found, will build in background...")
|
||||
store.graph = NewNeighborGraph() // empty graph — gets populated by background goroutine
|
||||
initWg.Add(1)
|
||||
go func() {
|
||||
defer initWg.Done()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("[neighbor] graph build panic recovered: %v", r)
|
||||
@@ -205,7 +228,9 @@ func main() {
|
||||
// API serves best-effort data until this completes (~10s for 100K txs).
|
||||
// Processes in chunks of 5000, releasing the lock between chunks so API
|
||||
// handlers remain responsive.
|
||||
initWg.Add(1)
|
||||
go func() {
|
||||
defer initWg.Done()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("[store] pickBestObservation panic recovered: %v", r)
|
||||
@@ -233,6 +258,13 @@ func main() {
|
||||
log.Printf("[store] initial pickBestObservation complete (%d transmissions)", totalPackets)
|
||||
}()
|
||||
|
||||
// Mark server ready once all background init completes.
|
||||
go func() {
|
||||
initWg.Wait()
|
||||
readiness.Store(1)
|
||||
log.Printf("[server] readiness: ready=true (background init complete)")
|
||||
}()
|
||||
|
||||
// WebSocket hub
|
||||
hub := NewHub()
|
||||
|
||||
|
||||
@@ -281,6 +281,118 @@ func ensureResolvedPathColumn(dbPath string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ensureObserverInactiveColumn adds the inactive column to observers if missing.
|
||||
// The column was originally added by ingestor migration (cmd/ingestor/db.go:344) to
|
||||
// support soft-delete via RemoveStaleObservers + filtered reads (PR #954). When the
|
||||
// server starts against a DB that was never touched by the ingestor (e.g. the e2e
|
||||
// fixture), the column is missing and read queries that filter on it (GetObservers,
|
||||
// GetStats) silently fail with "no such column: inactive" — leaving /api/observers
|
||||
// returning empty.
|
||||
func ensureObserverInactiveColumn(dbPath string) error {
|
||||
rw, err := openRW(dbPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rw.Close()
|
||||
|
||||
rows, err := rw.Query("PRAGMA table_info(observers)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var cid int
|
||||
var colName string
|
||||
var colType sql.NullString
|
||||
var notNull, pk int
|
||||
var dflt sql.NullString
|
||||
if rows.Scan(&cid, &colName, &colType, ¬Null, &dflt, &pk) == nil && colName == "inactive" {
|
||||
return nil // already exists
|
||||
}
|
||||
}
|
||||
|
||||
_, err = rw.Exec("ALTER TABLE observers ADD COLUMN inactive INTEGER DEFAULT 0")
|
||||
if err != nil {
|
||||
return fmt.Errorf("add inactive column: %w", err)
|
||||
}
|
||||
log.Println("[store] Added inactive column to observers")
|
||||
return nil
|
||||
}
|
||||
|
||||
// ensureLastPacketAtColumn adds the last_packet_at column to observers if missing.
|
||||
// The column was originally added by ingestor migration (observers_last_packet_at_v1)
|
||||
// to track the most recent packet observation time separately from status updates.
|
||||
// When the server starts against a DB that was never touched by the ingestor (e.g.
|
||||
// the e2e fixture), the column is missing and read queries that reference it
|
||||
// (GetObservers, GetObserverByID) fail with "no such column: last_packet_at".
|
||||
func ensureLastPacketAtColumn(dbPath string) error {
|
||||
rw, err := openRW(dbPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rw.Close()
|
||||
|
||||
rows, err := rw.Query("PRAGMA table_info(observers)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var cid int
|
||||
var colName string
|
||||
var colType sql.NullString
|
||||
var notNull, pk int
|
||||
var dflt sql.NullString
|
||||
if rows.Scan(&cid, &colName, &colType, ¬Null, &dflt, &pk) == nil && colName == "last_packet_at" {
|
||||
return nil // already exists
|
||||
}
|
||||
}
|
||||
|
||||
_, err = rw.Exec("ALTER TABLE observers ADD COLUMN last_packet_at TEXT")
|
||||
if err != nil {
|
||||
return fmt.Errorf("add last_packet_at column: %w", err)
|
||||
}
|
||||
log.Println("[store] Added last_packet_at column to observers")
|
||||
return nil
|
||||
}
|
||||
|
||||
// softDeleteBlacklistedObservers marks observers matching the blacklist as
|
||||
// inactive=1 so they are hidden from API responses. Runs once at startup.
|
||||
func softDeleteBlacklistedObservers(dbPath string, blacklist []string) {
|
||||
rw, err := openRW(dbPath)
|
||||
if err != nil {
|
||||
log.Printf("[observer-blacklist] warning: could not open DB for soft-delete: %v", err)
|
||||
return
|
||||
}
|
||||
defer rw.Close()
|
||||
|
||||
placeholders := make([]string, 0, len(blacklist))
|
||||
args := make([]interface{}, 0, len(blacklist))
|
||||
for _, pk := range blacklist {
|
||||
trimmed := strings.TrimSpace(pk)
|
||||
if trimmed == "" {
|
||||
continue
|
||||
}
|
||||
placeholders = append(placeholders, "LOWER(?)")
|
||||
args = append(args, trimmed)
|
||||
}
|
||||
if len(placeholders) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
query := "UPDATE observers SET inactive = 1 WHERE LOWER(id) IN (" + strings.Join(placeholders, ",") + ") AND (inactive IS NULL OR inactive = 0)"
|
||||
result, err := rw.Exec(query, args...)
|
||||
if err != nil {
|
||||
log.Printf("[observer-blacklist] warning: soft-delete failed: %v", err)
|
||||
return
|
||||
}
|
||||
if n, _ := result.RowsAffected(); n > 0 {
|
||||
log.Printf("[observer-blacklist] soft-deleted %d blacklisted observer(s)", n)
|
||||
}
|
||||
}
|
||||
|
||||
// resolvePathForObs resolves hop prefixes to full pubkeys for an observation.
|
||||
// Returns nil if path is empty.
|
||||
func resolvePathForObs(pathJSON, observerID string, tx *StoreTx, pm *prefixMap, graph *NeighborGraph) []*string {
|
||||
|
||||
@@ -538,3 +538,62 @@ func TestOpenRW_BusyTimeout(t *testing.T) {
|
||||
t.Errorf("expected busy_timeout=5000, got %d", timeout)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnsureLastPacketAtColumn(t *testing.T) {
|
||||
// Create a temp DB with observers table missing last_packet_at
|
||||
dir := t.TempDir()
|
||||
dbPath := dir + "/test.db"
|
||||
db, err := sql.Open("sqlite", dbPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = db.Exec(`CREATE TABLE observers (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT,
|
||||
last_seen TEXT,
|
||||
lat REAL,
|
||||
lon REAL,
|
||||
inactive INTEGER DEFAULT 0
|
||||
)`)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
db.Close()
|
||||
|
||||
// First call: should add the column
|
||||
if err := ensureLastPacketAtColumn(dbPath); err != nil {
|
||||
t.Fatalf("first call failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify column exists
|
||||
db2, err := sql.Open("sqlite", dbPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer db2.Close()
|
||||
|
||||
var found bool
|
||||
rows, err := db2.Query("PRAGMA table_info(observers)")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var cid int
|
||||
var colName string
|
||||
var colType sql.NullString
|
||||
var notNull, pk int
|
||||
var dflt sql.NullString
|
||||
if rows.Scan(&cid, &colName, &colType, ¬Null, &dflt, &pk) == nil && colName == "last_packet_at" {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatal("last_packet_at column not found after migration")
|
||||
}
|
||||
|
||||
// Idempotency: second call should succeed without error
|
||||
if err := ensureLastPacketAtColumn(dbPath); err != nil {
|
||||
t.Fatalf("idempotent call failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,159 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestConfigIsObserverBlacklisted(t *testing.T) {
|
||||
cfg := &Config{
|
||||
ObserverBlacklist: []string{"OBS1", "obs2", " Obs3 "},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
id string
|
||||
want bool
|
||||
}{
|
||||
{"OBS1", true},
|
||||
{"obs1", true}, // case-insensitive
|
||||
{"OBS2", true},
|
||||
{"Obs3", true}, // whitespace trimmed
|
||||
{"obs4", false},
|
||||
{"", false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
got := cfg.IsObserverBlacklisted(tt.id)
|
||||
if got != tt.want {
|
||||
t.Errorf("IsObserverBlacklisted(%q) = %v, want %v", tt.id, got, tt.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigIsObserverBlacklistedEmpty(t *testing.T) {
|
||||
cfg := &Config{}
|
||||
if cfg.IsObserverBlacklisted("anything") {
|
||||
t.Error("empty blacklist should not match anything")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigIsObserverBlacklistedNil(t *testing.T) {
|
||||
var cfg *Config
|
||||
if cfg.IsObserverBlacklisted("anything") {
|
||||
t.Error("nil config should not match anything")
|
||||
}
|
||||
}
|
||||
|
||||
func TestObserverBlacklistFiltersHandleObservers(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
db.conn.Exec("INSERT OR IGNORE INTO observers (id, name, iata, last_seen) VALUES ('goodobs', 'GoodObs', 'SFO', datetime('now'))")
|
||||
db.conn.Exec("INSERT OR IGNORE INTO observers (id, name, iata, last_seen) VALUES ('badobs', 'BadObs', 'LAX', datetime('now'))")
|
||||
|
||||
cfg := &Config{
|
||||
ObserverBlacklist: []string{"badobs"},
|
||||
}
|
||||
srv := NewServer(db, cfg, NewHub())
|
||||
srv.RegisterRoutes(setupTestRouter(srv))
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/observers", nil)
|
||||
w := httptest.NewRecorder()
|
||||
srv.router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp ObserverListResponse
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to parse response: %v", err)
|
||||
}
|
||||
|
||||
for _, obs := range resp.Observers {
|
||||
if obs.ID == "badobs" {
|
||||
t.Error("blacklisted observer should not appear in observers list")
|
||||
}
|
||||
}
|
||||
|
||||
foundGood := false
|
||||
for _, obs := range resp.Observers {
|
||||
if obs.ID == "goodobs" {
|
||||
foundGood = true
|
||||
}
|
||||
}
|
||||
if !foundGood {
|
||||
t.Error("non-blacklisted observer should appear in observers list")
|
||||
}
|
||||
}
|
||||
|
||||
func TestObserverBlacklistFiltersObserverDetail(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
db.conn.Exec("INSERT OR IGNORE INTO observers (id, name, iata, last_seen) VALUES ('badobs', 'BadObs', 'LAX', datetime('now'))")
|
||||
|
||||
cfg := &Config{
|
||||
ObserverBlacklist: []string{"badobs"},
|
||||
}
|
||||
srv := NewServer(db, cfg, NewHub())
|
||||
srv.RegisterRoutes(setupTestRouter(srv))
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/observers/badobs", nil)
|
||||
w := httptest.NewRecorder()
|
||||
srv.router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404 for blacklisted observer detail, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNoObserverBlacklistPassesAll(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
db.conn.Exec("INSERT OR IGNORE INTO observers (id, name, iata, last_seen) VALUES ('someobs', 'SomeObs', 'SFO', datetime('now'))")
|
||||
|
||||
cfg := &Config{}
|
||||
srv := NewServer(db, cfg, NewHub())
|
||||
srv.RegisterRoutes(setupTestRouter(srv))
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/observers", nil)
|
||||
w := httptest.NewRecorder()
|
||||
srv.router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d", w.Code)
|
||||
}
|
||||
|
||||
var resp ObserverListResponse
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to parse response: %v", err)
|
||||
}
|
||||
|
||||
foundSome := false
|
||||
for _, obs := range resp.Observers {
|
||||
if obs.ID == "someobs" {
|
||||
foundSome = true
|
||||
}
|
||||
}
|
||||
if !foundSome {
|
||||
t.Error("without blacklist, observer should appear")
|
||||
}
|
||||
}
|
||||
|
||||
func TestObserverBlacklistConcurrent(t *testing.T) {
|
||||
cfg := &Config{
|
||||
ObserverBlacklist: []string{"AA", "BB", "CC"},
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
for i := 0; i < 50; i++ {
|
||||
go func() {
|
||||
defer func() { done <- struct{}{} }()
|
||||
for j := 0; j < 100; j++ {
|
||||
cfg.IsObserverBlacklisted("AA")
|
||||
cfg.IsObserverBlacklisted("DD")
|
||||
}
|
||||
}()
|
||||
}
|
||||
for i := 0; i < 50; i++ {
|
||||
<-done
|
||||
}
|
||||
}
|
||||
@@ -118,6 +118,9 @@ func (s *Server) RegisterRoutes(r *mux.Router) {
|
||||
r.HandleFunc("/api/config/map", s.handleConfigMap).Methods("GET")
|
||||
r.HandleFunc("/api/config/geo-filter", s.handleConfigGeoFilter).Methods("GET")
|
||||
|
||||
// Readiness endpoint (gated on background init completion)
|
||||
r.HandleFunc("/api/healthz", s.handleHealthz).Methods("GET")
|
||||
|
||||
// System endpoints
|
||||
r.HandleFunc("/api/health", s.handleHealth).Methods("GET")
|
||||
r.HandleFunc("/api/stats", s.handleStats).Methods("GET")
|
||||
@@ -1929,6 +1932,10 @@ func (s *Server) handleObservers(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
result := make([]ObserverResp, 0, len(observers))
|
||||
for _, o := range observers {
|
||||
// Defense in depth: skip observers that are in the blacklist
|
||||
if s.cfg != nil && s.cfg.IsObserverBlacklisted(o.ID) {
|
||||
continue
|
||||
}
|
||||
plh := 0
|
||||
if c, ok := pktCounts[o.ID]; ok {
|
||||
plh = c
|
||||
@@ -1948,6 +1955,7 @@ func (s *Server) handleObservers(w http.ResponseWriter, r *http.Request) {
|
||||
ClientVersion: o.ClientVersion, Radio: o.Radio,
|
||||
BatteryMv: o.BatteryMv, UptimeSecs: o.UptimeSecs,
|
||||
NoiseFloor: o.NoiseFloor,
|
||||
LastPacketAt: o.LastPacketAt,
|
||||
PacketsLastHour: plh,
|
||||
Lat: lat, Lon: lon, NodeRole: nodeRole,
|
||||
})
|
||||
@@ -1960,6 +1968,13 @@ func (s *Server) handleObservers(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
func (s *Server) handleObserverDetail(w http.ResponseWriter, r *http.Request) {
|
||||
id := mux.Vars(r)["id"]
|
||||
|
||||
// Defense in depth: reject blacklisted observer
|
||||
if s.cfg != nil && s.cfg.IsObserverBlacklisted(id) {
|
||||
writeError(w, 404, "Observer not found")
|
||||
return
|
||||
}
|
||||
|
||||
obs, err := s.db.GetObserverByID(id)
|
||||
if err != nil || obs == nil {
|
||||
writeError(w, 404, "Observer not found")
|
||||
@@ -1982,6 +1997,7 @@ func (s *Server) handleObserverDetail(w http.ResponseWriter, r *http.Request) {
|
||||
ClientVersion: obs.ClientVersion, Radio: obs.Radio,
|
||||
BatteryMv: obs.BatteryMv, UptimeSecs: obs.UptimeSecs,
|
||||
NoiseFloor: obs.NoiseFloor,
|
||||
LastPacketAt: obs.LastPacketAt,
|
||||
PacketsLastHour: plh,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -859,6 +859,7 @@ type ObserverResp struct {
|
||||
BatteryMv interface{} `json:"battery_mv"`
|
||||
UptimeSecs interface{} `json:"uptime_secs"`
|
||||
NoiseFloor interface{} `json:"noise_floor"`
|
||||
LastPacketAt interface{} `json:"last_packet_at"`
|
||||
PacketsLastHour int `json:"packetsLastHour"`
|
||||
Lat interface{} `json:"lat"`
|
||||
Lon interface{} `json:"lon"`
|
||||
|
||||
+5
-4
@@ -4,7 +4,7 @@
|
||||
// --- Route/Payload name maps ---
|
||||
const ROUTE_TYPES = { 0: 'TRANSPORT_FLOOD', 1: 'FLOOD', 2: 'DIRECT', 3: 'TRANSPORT_DIRECT' };
|
||||
const PAYLOAD_TYPES = { 0: 'Request', 1: 'Response', 2: 'Direct Msg', 3: 'ACK', 4: 'Advert', 5: 'Channel Msg', 6: 'Group Data', 7: 'Anon Req', 8: 'Path', 9: 'Trace', 10: 'Multipart', 11: 'Control', 15: 'Raw Custom' };
|
||||
const PAYLOAD_COLORS = { 0: 'req', 1: 'response', 2: 'txt-msg', 3: 'ack', 4: 'advert', 5: 'grp-txt', 7: 'anon-req', 8: 'path', 9: 'trace' };
|
||||
const PAYLOAD_COLORS = { 0: 'req', 1: 'response', 2: 'txt-msg', 3: 'ack', 4: 'advert', 5: 'grp-txt', 6: 'grp-data', 7: 'anon-req', 8: 'path', 9: 'trace' };
|
||||
|
||||
function routeTypeName(n) { return ROUTE_TYPES[n] || 'UNKNOWN'; }
|
||||
function payloadTypeName(n) { return PAYLOAD_TYPES[n] || 'UNKNOWN'; }
|
||||
@@ -965,10 +965,11 @@ window.addEventListener('DOMContentLoaded', () => {
|
||||
}).catch(() => {
|
||||
window.SITE_CONFIG = { timestamps: { defaultMode: 'ago', timezone: 'local', formatPreset: 'iso', customFormat: '', allowCustomFormat: false } };
|
||||
if (window._customizerV2) window._customizerV2.init(window.SITE_CONFIG);
|
||||
}).finally(() => {
|
||||
if (!location.hash || location.hash === '#/') location.hash = '#/home';
|
||||
else navigate();
|
||||
});
|
||||
|
||||
// Navigate immediately — don't gate data-fetching pages on cosmetic theme fetch
|
||||
if (!location.hash || location.hash === '#/') location.hash = '#/home';
|
||||
else navigate();
|
||||
});
|
||||
|
||||
/**
|
||||
|
||||
@@ -595,6 +595,11 @@
|
||||
// Only fitBounds on subsequent data refreshes if user hasn't manually panned
|
||||
} catch (e) {
|
||||
console.error('Map load error:', e);
|
||||
} finally {
|
||||
// Always signal data-loaded — even on error — so E2E tests can proceed.
|
||||
// Otherwise an api() failure leaves the test waiting forever.
|
||||
var mapContainer = document.getElementById('leaflet-map');
|
||||
if (mapContainer) mapContainer.setAttribute('data-loaded', 'true');
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -955,6 +955,10 @@
|
||||
console.error('Failed to load nodes:', e);
|
||||
const tbody = document.getElementById('nodesBody');
|
||||
if (tbody) tbody.innerHTML = '<tr><td colspan="6" class="text-center" style="padding:24px;color:var(--error,#ef4444)"><div role="alert" aria-live="polite">Failed to load nodes. Please try again.</div></td></tr>';
|
||||
} finally {
|
||||
// Always signal data-loaded — even on error — so E2E tests can proceed.
|
||||
var nodesContainer = document.getElementById('nodesLeft') || document.getElementById('nodesBody');
|
||||
if (nodesContainer) nodesContainer.setAttribute('data-loaded', 'true');
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -150,6 +150,14 @@
|
||||
<div class="stat-label">First Seen</div>
|
||||
<div class="stat-value" style="font-size:0.85em">${obs.first_seen ? new Date(obs.first_seen).toLocaleDateString() : '—'}</div>
|
||||
</div>
|
||||
<div class="stat-card">
|
||||
<div class="stat-label">Last Status Update</div>
|
||||
<div class="stat-value" style="font-size:0.85em">${obs.last_seen ? timeAgo(obs.last_seen) + '<br><span style="font-size:0.8em;color:var(--text-muted)">' + new Date(obs.last_seen).toLocaleString() + '</span>' : '—'}</div>
|
||||
</div>
|
||||
<div class="stat-card">
|
||||
<div class="stat-label">Last Packet Observation</div>
|
||||
<div class="stat-value" style="font-size:0.85em">${obs.last_packet_at ? timeAgo(obs.last_packet_at) + '<br><span style="font-size:0.8em;color:var(--text-muted)">' + new Date(obs.last_packet_at).toLocaleString() + '</span>' : '<span style="color:var(--text-muted)">never</span>'}</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="mono" style="font-size:0.75em;color:var(--text-muted);margin-bottom:20px;word-break:break-all">
|
||||
ID: ${obs.id}
|
||||
|
||||
+13
-1
@@ -75,6 +75,17 @@
|
||||
return { cls: 'health-red', label: 'Offline' };
|
||||
}
|
||||
|
||||
function packetBadge(o) {
|
||||
if (!o.last_packet_at) return '<span title="No packets ever observed">📡⚠ never</span>';
|
||||
const pktAgo = Date.now() - new Date(o.last_packet_at).getTime();
|
||||
const statusAgo = o.last_seen ? Date.now() - new Date(o.last_seen).getTime() : Infinity;
|
||||
const gap = pktAgo - statusAgo;
|
||||
if (gap > 600000) {
|
||||
return `<span title="Last packet ${timeAgo(o.last_packet_at)} — status is newer by ${Math.round(gap/60000)}min. Observer may be alive but not forwarding packets.">📡⚠ ${timeAgo(o.last_packet_at)}</span>`;
|
||||
}
|
||||
return timeAgo(o.last_packet_at);
|
||||
}
|
||||
|
||||
function uptimeStr(firstSeen) {
|
||||
if (!firstSeen) return '—';
|
||||
const ms = Date.now() - new Date(firstSeen).getTime();
|
||||
@@ -123,7 +134,7 @@
|
||||
<div class="obs-table-scroll"><table class="data-table obs-table" id="obsTable">
|
||||
<caption class="sr-only">Observer status and statistics</caption>
|
||||
<thead><tr>
|
||||
<th scope="col">Status</th><th scope="col">Name</th><th scope="col">Region</th><th scope="col">Last Seen</th>
|
||||
<th scope="col">Status</th><th scope="col">Name</th><th scope="col">Region</th><th scope="col">Last Status</th><th scope="col">Last Packet</th>
|
||||
<th scope="col">Packets</th><th scope="col">Packets/Hour</th><th scope="col">Uptime</th>
|
||||
</tr></thead>
|
||||
<tbody>${filtered.map(o => {
|
||||
@@ -134,6 +145,7 @@
|
||||
<td class="mono">${o.name || o.id}</td>
|
||||
<td>${o.iata ? `<span class="badge-region">${o.iata}</span>` : '—'}</td>
|
||||
<td>${timeAgo(o.last_seen)}</td>
|
||||
<td>${packetBadge(o)}</td>
|
||||
<td>${(o.packet_count || 0).toLocaleString()}</td>
|
||||
<td>${sparkBar(o.packetsLastHour || 0, maxPktsHr)}</td>
|
||||
<td>${uptimeStr(o.first_seen)}</td>
|
||||
|
||||
+5
-1
@@ -26,7 +26,7 @@
|
||||
let observers = [];
|
||||
let observerMap = new Map(); // id → observer for O(1) lookups (#383)
|
||||
let regionMap = {};
|
||||
const TYPE_NAMES = { 0:'Request', 1:'Response', 2:'Direct Msg', 3:'ACK', 4:'Advert', 5:'Channel Msg', 7:'Anon Req', 8:'Path', 9:'Trace', 11:'Control' };
|
||||
const TYPE_NAMES = { 0:'Request', 1:'Response', 2:'Direct Msg', 3:'ACK', 4:'Advert', 5:'Channel Msg', 6:'Group Data', 7:'Anon Req', 8:'Path', 9:'Trace', 11:'Control' };
|
||||
function typeName(t) { return TYPE_NAMES[t] ?? `Type ${t}`; }
|
||||
const isMobile = window.innerWidth <= 1024;
|
||||
const PACKET_LIMIT = isMobile ? 1000 : 50000;
|
||||
@@ -748,6 +748,10 @@
|
||||
console.error('Failed to load packets:', e);
|
||||
const tbody = document.getElementById('pktBody');
|
||||
if (tbody) tbody.innerHTML = '<tr><td colspan="' + _getColCount() + '" class="text-center" style="padding:24px;color:var(--error,#ef4444)"><div role="alert" aria-live="polite">Failed to load packets. Please try again.</div></td></tr>';
|
||||
} finally {
|
||||
// Always signal data-loaded — even on error — so E2E tests can proceed.
|
||||
var pktContainer = document.getElementById('pktLeft') || document.getElementById('pktBody');
|
||||
if (pktContainer) pktContainer.setAttribute('data-loaded', 'true');
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+2
-2
@@ -15,14 +15,14 @@
|
||||
};
|
||||
|
||||
window.TYPE_COLORS = {
|
||||
ADVERT: '#22c55e', GRP_TXT: '#3b82f6', TXT_MSG: '#f59e0b', ACK: '#6b7280',
|
||||
ADVERT: '#22c55e', GRP_TXT: '#3b82f6', GRP_DATA: '#8b5cf6', TXT_MSG: '#f59e0b', ACK: '#6b7280',
|
||||
REQUEST: '#a855f7', RESPONSE: '#06b6d4', TRACE: '#ec4899', PATH: '#14b8a6',
|
||||
ANON_REQ: '#f43f5e', UNKNOWN: '#6b7280'
|
||||
};
|
||||
|
||||
// Badge CSS class name mapping
|
||||
const TYPE_BADGE_MAP = {
|
||||
ADVERT: 'advert', GRP_TXT: 'grp-txt', TXT_MSG: 'txt-msg', ACK: 'ack',
|
||||
ADVERT: 'advert', GRP_TXT: 'grp-txt', GRP_DATA: 'grp-data', TXT_MSG: 'txt-msg', ACK: 'ack',
|
||||
REQUEST: 'req', RESPONSE: 'response', TRACE: 'trace', PATH: 'path',
|
||||
ANON_REQ: 'anon-req', UNKNOWN: 'unknown'
|
||||
};
|
||||
|
||||
+1
-1
@@ -1558,7 +1558,7 @@ tr[data-hops]:hover { background: rgba(59,130,246,0.1); }
|
||||
|
||||
/* #20 — Observers table horizontal scroll on mobile */
|
||||
.obs-table-scroll { overflow-x: auto; -webkit-overflow-scrolling: touch; }
|
||||
.obs-table-scroll .obs-table { min-width: 640px; }
|
||||
.obs-table-scroll .obs-table { min-width: 720px; }
|
||||
|
||||
/* #206 — Analytics/Compare tables scroll wrappers on mobile */
|
||||
.analytics-table-scroll { overflow-x: auto; -webkit-overflow-scrolling: touch; }
|
||||
|
||||
@@ -211,6 +211,7 @@ async function run() {
|
||||
// Test 2: Nodes page loads with data
|
||||
await test('Nodes page loads with data', async () => {
|
||||
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'domcontentloaded' });
|
||||
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
|
||||
await page.waitForSelector('table tbody tr');
|
||||
const headers = await page.$$eval('th', els => els.map(e => e.textContent.trim()));
|
||||
for (const col of ['Name', 'Public Key', 'Role']) {
|
||||
@@ -236,6 +237,7 @@ async function run() {
|
||||
// Test: Node side panel Details link navigates to full detail page (#778)
|
||||
await test('Node side panel Details link navigates', async () => {
|
||||
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'domcontentloaded' });
|
||||
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
|
||||
await page.waitForSelector('table tbody tr');
|
||||
await page.click('table tbody tr');
|
||||
await page.waitForSelector('.node-detail');
|
||||
@@ -257,6 +259,7 @@ async function run() {
|
||||
// Test: Nodes page has WebSocket auto-update listener (#131)
|
||||
await test('Nodes page has WebSocket auto-update', async () => {
|
||||
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'domcontentloaded' });
|
||||
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
|
||||
await page.waitForSelector('table tbody tr');
|
||||
// The live dot in navbar indicates WS connection status
|
||||
const liveDot = await page.$('#liveDot');
|
||||
@@ -282,11 +285,12 @@ async function run() {
|
||||
// Test 3: Map page loads with markers
|
||||
await test('Map page loads with markers', async () => {
|
||||
await page.goto(`${BASE}/#/map`, { waitUntil: 'domcontentloaded' });
|
||||
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
|
||||
await page.waitForSelector('.leaflet-container');
|
||||
await page.waitForSelector('.leaflet-tile-loaded');
|
||||
// Wait for markers/overlays to render (may not exist with empty DB)
|
||||
try {
|
||||
await page.waitForSelector('.leaflet-marker-icon, .leaflet-interactive, circle, .marker-cluster, .leaflet-marker-pane > *, .leaflet-overlay-pane svg path, .leaflet-overlay-pane svg circle', { timeout: 3000 });
|
||||
await page.waitForSelector('.leaflet-marker-icon, .leaflet-interactive, circle, .marker-cluster, .leaflet-marker-pane > *, .leaflet-overlay-pane svg path, .leaflet-overlay-pane svg circle', { timeout: 8000 });
|
||||
} catch (_) {
|
||||
// No markers with empty DB \u2014 assertion below handles it
|
||||
}
|
||||
@@ -362,7 +366,7 @@ async function run() {
|
||||
await page.waitForSelector('.leaflet-container');
|
||||
// Wait for markers (may not exist with empty DB)
|
||||
try {
|
||||
await page.waitForSelector('.leaflet-marker-icon, .leaflet-interactive', { timeout: 3000 });
|
||||
await page.waitForSelector('.leaflet-marker-icon, .leaflet-interactive', { timeout: 8000 });
|
||||
} catch (_) {
|
||||
// No markers with empty DB
|
||||
}
|
||||
@@ -394,6 +398,7 @@ async function run() {
|
||||
await page.goto(`${BASE}/#/packets`, { waitUntil: 'domcontentloaded' });
|
||||
await page.evaluate(() => localStorage.setItem('meshcore-time-window', '525600'));
|
||||
await page.reload({ waitUntil: 'load' });
|
||||
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
|
||||
await page.waitForSelector('table tbody tr', { timeout: 15000 });
|
||||
const rowsBefore = await page.$$('table tbody tr');
|
||||
assert(rowsBefore.length > 0, 'No packets visible');
|
||||
|
||||
Executable
+43
@@ -0,0 +1,43 @@
|
||||
#!/usr/bin/env bash
|
||||
# freshen-fixture.sh — Shift all timestamps in the fixture DB to be relative to now.
|
||||
# Preserves the relative ordering between timestamps.
|
||||
# Usage: bash tools/freshen-fixture.sh <path-to-fixture.db>
|
||||
set -euo pipefail
|
||||
|
||||
DB="${1:?Usage: freshen-fixture.sh <path-to-fixture.db>}"
|
||||
|
||||
if [ ! -f "$DB" ]; then
|
||||
echo "ERROR: DB file not found: $DB" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Find the max timestamp across all time columns, compute offset, shift everything forward.
|
||||
sqlite3 "$DB" <<'SQL'
|
||||
-- Shift all timestamps forward so the newest is ~now, preserving relative ordering.
|
||||
-- Use strftime to produce RFC3339 format (T separator + Z suffix) for correct comparison.
|
||||
UPDATE nodes SET last_seen = strftime('%Y-%m-%dT%H:%M:%SZ', last_seen,
|
||||
(SELECT printf('+%d seconds', CAST((julianday('now') - julianday(MAX(last_seen))) * 86400 AS INTEGER)) FROM nodes)
|
||||
) WHERE last_seen IS NOT NULL;
|
||||
|
||||
UPDATE transmissions SET first_seen = strftime('%Y-%m-%dT%H:%M:%SZ', first_seen,
|
||||
(SELECT printf('+%d seconds', CAST((julianday('now') - julianday(MAX(first_seen))) * 86400 AS INTEGER)) FROM transmissions)
|
||||
) WHERE first_seen IS NOT NULL;
|
||||
|
||||
-- Observers: shift last_seen too so they don't get auto-pruned by RemoveStaleObservers
|
||||
-- on server startup (default 14d threshold marks all >14d observers inactive=1, which
|
||||
-- the /api/observers filter then excludes — leaving the map page with no observer markers).
|
||||
UPDATE observers SET last_seen = strftime('%Y-%m-%dT%H:%M:%SZ', last_seen,
|
||||
(SELECT printf('+%d seconds', CAST((julianday('now') - julianday(MAX(last_seen))) * 86400 AS INTEGER)) FROM observers)
|
||||
) WHERE last_seen IS NOT NULL;
|
||||
SQL
|
||||
|
||||
# Defensive: clear any stale inactive=1 flags. Column may not exist on fresh fixtures
|
||||
# (added by server migration on first startup); silently no-op if missing.
|
||||
sqlite3 "$DB" "UPDATE observers SET inactive = 0 WHERE inactive = 1;" 2>/dev/null || true
|
||||
|
||||
# neighbor_edges may not exist in all fixture versions
|
||||
sqlite3 "$DB" "UPDATE neighbor_edges SET last_seen = strftime('%Y-%m-%dT%H:%M:%SZ', last_seen, (SELECT printf('+%d seconds', CAST((julianday('now') - julianday(MAX(last_seen))) * 86400 AS INTEGER)) FROM neighbor_edges)) WHERE last_seen IS NOT NULL;" 2>/dev/null || true
|
||||
|
||||
echo "Fixture timestamps freshened in $DB"
|
||||
sqlite3 "$DB" "SELECT 'nodes: min=' || MIN(last_seen) || ' max=' || MAX(last_seen) FROM nodes;"
|
||||
sqlite3 "$DB" "SELECT 'observers: count=' || COUNT(*) || ' max=' || MAX(last_seen) FROM observers;"
|
||||
Reference in New Issue
Block a user